#!/usr/bin/env python3
"""Rock watcher — 'when do I switch asteroids?' alerts from your Survey Scanner.

EVE has no API for asteroid contents, so this reads the **Survey Scanner Results**
window off the screen (the only place that shows units remaining per rock). It
tracks each rock's amount over time and tells you, in Discord, when the rock
you're mining is about to run dry — and which rock to jump to next.

Two ways it knows your depletion rate (best available wins):
  1. Direct — if the survey scanner is left running, each rock's number drops
     every cycle; it measures units/sec straight from that (most accurate,
     multi-rock).
  2. Hold-fill fallback — if there's a single rock and the scanner is static, it
     uses the live ore-hold fill rate (written by eve_orehold_watcher to
     .mining_rate) divided by the ore's m³/unit.

Only active during a mining session: start one with `!mining on` in Discord
(`!mining off` to stop). Runs headless, launched by start-all.ps1.

    python eve_rock_watcher.py          # normal (waits for !mining on)
    python eve_rock_watcher.py --snip   # manually box the survey window (if auto fails)
    python eve_rock_watcher.py --test   # parse one screen grab and print rows
"""
import json
import os
import re
import socket
import sys
import time

HERE = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, HERE)
import eve_orehold_watcher as w  # load_config, notify, grab_region, heartbeat, bot_mining

# ore unit volumes (m³/unit) for the hold-fill fallback; base names, longest first
ORE_VOL = {
    "dark ochre": 8.0,
    "veldspar": 0.1,
    "scordite": 0.15,
    "pyroxeres": 0.3,
    "plagioclase": 0.35,
    "omber": 0.6,
    "kernite": 1.2,
    "jaspet": 2.0,
    "hemorphite": 3.0,
    "hedbergite": 3.0,
    "gneiss": 5.0,
    "crokite": 16.0,
    "spodumain": 16.0,
    "bistot": 16.0,
    "arkonor": 16.0,
    "mercoxit": 40.0,
}
ORE_NAMES = sorted(ORE_VOL, key=len, reverse=True)  # match "dark ochre" before "ochre"


def match_ore(line_lower):
    """Return the first known ore base name contained in *line_lower*, else None.

    *line_lower* must already be lower-cased. Names are tried longest first
    (see ORE_NAMES).
    """
    for name in ORE_NAMES:
        if name in line_lower:
            return name
    return None


def parse_survey(text):
    """OCR text of the Survey Scanner Results -> [(ore_base_name, units), ...].

    Lines without a known ore name, or without any number >= 10, are skipped.
    """
    rows = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        ore = match_ore(line.lower())
        if not ore:
            continue
        ints = [int(n.replace(",", "")) for n in re.findall(r"\d[\d,]*", line)]
        ints = [n for n in ints if n >= 10]  # drop tiny (distance fragments)
        if not ints:
            continue
        rows.append((ore, max(ints)))  # units is the big number on the row
    return rows


# Selected-Item panel (no survey scanner needed): "...Quantity 40,370 Units".
# Match the NUMBER before "Units" — don't require the word "Quantity" (OCR drops it).
QTY_RE = re.compile(r"([\d.,]{2,})\s*units?\b", re.I)


def parse_selected(text):
    """The selected asteroid's remaining units from its info panel, or None."""
    m = QTY_RE.search(text.replace("\n", " "))
    if not m:
        return None
    digits = re.sub(r"\D", "", m.group(1))  # tolerate OCR , vs . in the number
    return int(digits) if digits and len(digits) >= 2 else None


def _configure_tesseract(cp):
    """Import pytesseract, apply the optional [ocr] tesseract_cmd, return the module.

    Tesseract isn't always on PATH, so the config may point at the binary
    explicitly. The import stays lazy so `--snip` works without pytesseract.
    """
    import pytesseract

    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd
    return pytesseract


def save_rock_region(cp, l, t, wd, ht, mode="survey"):
    """Store the capture box and its mode under [rock] and rewrite the config file.

    Args:
        cp: the loaded ConfigParser (mutated in place).
        l, t, wd, ht: left, top, width, height of the region.
        mode: 'survey' (scanner rows) or 'selected' (Selected Item panel).
    """
    if not cp.has_section("rock"):
        cp.add_section("rock")
    cp["rock"]["region"] = f"{l},{t},{wd},{ht}"
    cp["rock"]["mode"] = mode  # 'survey' rows or 'selected' item
    with open(w.CONFIG_PATH, "w") as f:
        cp.write(f)


def detect_selected_region(cp):
    """Find the Selected Item 'Quantity N Units' line WITHIN the EVE window capture
    (clean, occlusion-proof).

    Returns a WINDOW-RELATIVE tight [l,t,w,h] or None. On success the region is
    also saved to the config with mode 'selected'.
    """
    pytesseract = _configure_tesseract(cp)
    img = w.capture_window()
    if img is None:
        return None
    scale = 2  # OCR runs on a 2x upscaled image; boxes are divided back down below
    d = pytesseract.image_to_data(
        img.resize((img.width * scale, img.height * scale)),
        output_type=pytesseract.Output.DICT,
    )
    n = len(d["text"])
    for i in range(n):
        if "unit" not in d["text"][i].strip().lower():
            continue
        uy, uh = d["top"][i], d["height"][i]
        ux, ur = d["left"][i], d["left"][i] + d["width"][i]
        # the number is the word(s) just LEFT of "Units" on the same line
        nums = [
            j
            for j in range(n)
            if d["text"][j].strip()
            and abs(d["top"][j] - uy) < uh
            and 0 < (ux - d["left"][j]) < 340 * scale
            and re.search(r"\d", d["text"][j])
        ]
        if not nums:
            continue
        l = min(d["left"][j] for j in nums) // scale
        reg = [
            max(0, l - 20),
            max(0, uy // scale - 6),
            (ur // scale - l) + 45,
            uh // scale + 12,
        ]
        save_rock_region(cp, *reg, mode="selected")
        print(f"[rock] Selected-Item quantity region (window-relative) -> {reg}")
        return reg
    return None


def detect_survey_region(cp):
    """Find the Survey Scanner Results window by locating >=2 'ore name + number' rows.

    Grabs all monitors, OCRs the image, and returns the padded bounding box
    [l, t, w, h] around every matching text line (also saved to the config),
    or None when fewer than two rows are found or the grab/OCR fails.
    """
    import mss
    from PIL import Image

    pytesseract = _configure_tesseract(cp)
    try:
        with mss.mss() as sct:
            mon = sct.monitors[0]
            raw = sct.grab(mon)
        img = Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX")
        data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
    except Exception as e:  # best-effort detection: report and let the caller retry
        print(f"[rock] detect error: {e}")
        return None

    # group OCR words into text lines
    lines = {}
    for i in range(len(data["text"])):
        if not data["text"][i].strip():
            continue
        k = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        lines.setdefault(k, []).append(i)

    boxes = []
    for idxs in lines.values():
        joined = " ".join(data["text"][i] for i in idxs).lower()
        if match_ore(joined) and re.search(r"\d", joined):
            xs = [data["left"][i] for i in idxs]
            ys = [data["top"][i] for i in idxs]
            rs = [data["left"][i] + data["width"][i] for i in idxs]
            bs = [data["top"][i] + data["height"][i] for i in idxs]
            boxes.append((min(xs), min(ys), max(rs), max(bs)))
    if len(boxes) < 2:
        return None

    pad = 12
    left = min(b[0] for b in boxes)
    top = min(b[1] for b in boxes)
    l = mon["left"] + left - pad
    t = mon["top"] + top - pad
    wd = max(b[2] for b in boxes) - left + 2 * pad
    ht = max(b[3] for b in boxes) - top + 2 * pad
    save_rock_region(cp, l, t, wd, ht)
    print(f"[rock] auto-detected survey window -> {l},{t},{wd},{ht} ({len(boxes)} rows)")
    return [l, t, wd, ht]


def hold_rate():
    """Live ore-hold fill rate (m³/min) written by eve_orehold_watcher, if fresh.

    Returns 0.0 when the rate file is missing, unreadable, malformed, or its
    'ts' is more than 120 seconds old.
    """
    try:
        with open(w.RATE_PATH) as f:  # context manager: don't leak the handle
            d = json.load(f)
        if time.time() - d.get("ts", 0) <= 120:
            return float(d.get("rate_m3min", 0) or 0)
    except (OSError, ValueError, TypeError, AttributeError):
        # missing/partial file or unexpected JSON shape: treat as "no rate"
        pass
    return 0.0


class Tracker:
    """Per-rock unit history -> measured depletion rate (units/sec)."""

    def __init__(self):
        self.h = {}  # key -> [(t, units)]

    def update(self, rows, now):
        """Record one reading per row and return [(key, ore, units), ...].

        Keys are '<ore>#<n>', where n counts rows of the same ore in listing
        order. History older than 120 s is dropped, and rocks missing from
        *rows* are forgotten.
        """
        counts, seen, keyed = {}, set(), []
        for ore, units in rows:
            ore = ore.lower()
            counts[ore] = counts.get(ore, 0) + 1
            key = f"{ore}#{counts[ore]}"
            keyed.append((key, ore, units))
            seen.add(key)
            hist = self.h.setdefault(key, [])
            if hist and units > hist[-1][1] + 1:  # went up = new rock / rescan -> reset
                hist.clear()
            hist.append((now, units))
            self.h[key] = [(t, u) for (t, u) in hist if now - t <= 120]
        for key in list(self.h):  # forget rocks no longer listed
            if key not in seen:
                del self.h[key]
        return keyed

    def rate(self, key):
        """Units/sec lost between the oldest and newest kept samples of *key*.

        Returns 0.0 with fewer than two samples, less than 15 s of history,
        or no decrease.
        """
        hist = self.h.get(key, [])
        if len(hist) < 2 or hist[-1][0] - hist[0][0] < 15:
            return 0.0
        du = hist[0][1] - hist[-1][1]
        dt = hist[-1][0] - hist[0][0]
        return du / dt if du > 0 else 0.0


def get_region(cp):
    """Return (region, mode). mode = 'survey' (scanner rows) or 'selected' (Selected Item).

    Uses the saved [rock] region when present and well-formed; otherwise tries
    auto-detection (Selected Item first, then the survey window). Returns
    (None, None) when nothing can be located.
    """
    if cp.has_section("rock"):
        rs = cp.get("rock", "region", fallback="").strip()
        if rs:
            try:
                region = [int(x) for x in rs.split(",")]
            except ValueError:
                region = None
            if region is not None and len(region) == 4:
                return region, cp.get("rock", "mode", fallback="survey")
            # a hand-edited / corrupt value must not crash the watcher loop
            print(f"[rock] ignoring malformed saved region {rs!r}")
    print("[rock] no saved region — locating the Selected-Item Quantity line...")
    r = detect_selected_region(cp)  # primary: the rock you have selected
    if r:
        return r, "selected"
    r = detect_survey_region(cp)  # fallback: a Survey Scanner window
    if r:
        return r, "survey"
    return None, None


def read_rows(cp, region, mode):
    """Grab *region*, OCR it, and return [(ore, units), ...] for the given mode.

    In 'selected' mode the single row is named "rock" (the panel OCR does not
    yield an ore name); an empty list means nothing readable was found.
    """
    pytesseract = _configure_tesseract(cp)
    img = w.grab_region(region)
    try:  # upscale: small panel text reads better
        img = img.resize((img.width * 2, img.height * 2))
    except Exception:
        pass  # deliberate best-effort: OCR the original-size image instead
    text = pytesseract.image_to_string(img, config="--psm 6")
    if mode == "selected":
        q = parse_selected(text)
        return [("rock", q)] if q else []
    return parse_survey(text)


def _next_rock(keyed, current_key):
    """Return (ore, units) of the largest rock other than *current_key*, or None."""
    others = [(o, u) for (k, o, u) in keyed if k != current_key]
    return max(others, key=lambda x: x[1]) if others else None


def main():
    """Entry point: handle --snip / --test, otherwise run the watcher loop forever."""
    cp = w.load_config()
    if "--snip" in sys.argv:
        print("Drag a box around the whole Survey Scanner Results list.")
        _snip_to_rock(cp)
        return

    sec = cp.has_section("rock")
    poll = cp.getint("rock", "poll_secs", fallback=5) if sec else 5
    switch_secs = cp.getint("rock", "switch_secs", fallback=45) if sec else 45
    cooldown = cp.getint("rock", "cooldown_secs", fallback=60) if sec else 60
    status_secs = cp.getint("rock", "status_secs", fallback=90) if sec else 90

    if "--test" in sys.argv:
        _configure_tesseract(cp)
        region, mode = get_region(cp)
        if not region:
            print("Couldn't find a Survey Scanner window or a Selected-Item 'Quantity N "
                  "Units'. Select a rock (or open the survey results) and retry.")
            return
        print(f"mode={mode} rows:", read_rows(cp, region, mode))
        return

    _configure_tesseract(cp)  # fail fast at startup if pytesseract is missing
    print(f"[rock] started; switch<{switch_secs}s, poll {poll}s (waits for !mining on)")
    tracker = Tracker()
    last_alert = {}
    last_status = 0.0
    region = mode = None
    while True:
        if not w.bot_mining(cp):  # only during a mining session
            tracker.h.clear()
            time.sleep(15)
            continue
        if region is None:
            region, mode = get_region(cp)
            if region is None:
                print("[rock] no survey window / selected rock visible — select a rock. "
                      "Retrying...")
                time.sleep(max(poll, 15))
                continue
        w.heartbeat(cp, "rock")
        try:
            rows = read_rows(cp, region, mode)
            if not rows:
                # nothing selected at the moment — KEEP the fixed region (positions are
                # stable; don't burn CPU re-hunting on a 9000px screen), just retry.
                time.sleep(poll)
                continue
            now = time.time()
            keyed = tracker.update(rows, now)
            hr = hold_rate()
            actives = []  # (key, ore, units, tleft)
            for key, ore, units in keyed:
                r = tracker.rate(key)  # measured units/sec
                if r <= 0 and len(keyed) == 1 and hr > 0 and ORE_VOL.get(ore):
                    r = hr / 60.0 / ORE_VOL[ore]  # single-rock hold-fill fallback
                if r <= 0:
                    continue
                actives.append((key, ore, units, units / r))
            # the rock you're emptying = the one with the least time left
            actives.sort(key=lambda x: x[3])
            for key, ore, units, tleft in actives:
                if tleft <= switch_secs and now - last_alert.get(key, 0) > cooldown:
                    last_alert[key] = now
                    nxt = _next_rock(keyed, key)
                    empty_at = int(now + tleft)
                    # <t:UNIX:R> is rendered by Discord as a live relative time
                    msg = f"{ore.title()} rock empties <t:{empty_at}:R> ({units:,} u left)."
                    if nxt:
                        msg += f" Switch to {nxt[0].title()} ({nxt[1]:,} u)."
                    w.notify(cp, "Switch rocks", msg, priority="high", tags="pick,gem")
                    print(f"[rock] ALERT {msg}")
            # periodic live readout of the rock you're on (so you see the countdown)
            if actives and not w.bot_muted(cp) and now - last_status >= status_secs:
                key, ore, units, tleft = actives[0]
                empty_at = int(now + tleft)
                nxt = _next_rock(keyed, key)
                line = f"🪨 {ore.title()} {units:,} u · empties <t:{empty_at}:R>"
                if nxt:
                    line += f" · next: {nxt[0].title()} ({nxt[1]:,} u)"
                w._discord_live(cp, "rock", f"⛏️ {socket.gethostname()} current rock", line)
                last_status = now
        except Exception as e:  # top-level loop boundary: log and keep watching
            print(f"[rock] error: {e}")
        time.sleep(poll)


def _snip_to_rock(cp):
    """Drag-select the survey window, save its box to [rock] region.

    Shows a translucent fullscreen overlay; press-drag-release draws the box,
    Escape cancels without saving.
    """
    import tkinter as tk

    coords = {}
    root = tk.Tk()
    root.attributes("-fullscreen", True)
    root.attributes("-alpha", 0.25)
    root.configure(bg="black")
    canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0)
    canvas.pack(fill=tk.BOTH, expand=True)
    rect = {"id": None, "x0": 0, "y0": 0}

    def on_press(e):
        rect["x0"], rect["y0"] = e.x_root, e.y_root
        rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, outline="red", width=2)

    def on_drag(e):
        canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(),
                      rect["y0"] - root.winfo_rooty(), e.x, e.y)

    def on_release(e):
        x0, y0, x1, y1 = rect["x0"], rect["y0"], e.x_root, e.y_root
        coords["r"] = (min(x0, x1), min(y0, y1), abs(x1 - x0), abs(y1 - y0))
        root.destroy()

    # Tk event sequences must be named explicitly; an empty sequence binds nothing
    canvas.bind("<ButtonPress-1>", on_press)
    canvas.bind("<B1-Motion>", on_drag)
    canvas.bind("<ButtonRelease-1>", on_release)
    root.bind("<Escape>", lambda e: root.destroy())
    root.mainloop()

    if "r" in coords:
        l, t, wd, ht = coords["r"]
        save_rock_region(cp, l, t, wd, ht)
        print(f"Saved survey region {l},{t},{wd},{ht} to config.ini")
    else:
        print("Cancelled.")


if __name__ == "__main__":
    main()