diff --git a/eve_rock_watcher.py b/eve_rock_watcher.py new file mode 100644 index 0000000..4fcd99e --- /dev/null +++ b/eve_rock_watcher.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +"""Rock watcher — 'when do I switch asteroids?' alerts from your Survey Scanner. + +EVE has no API for asteroid contents, so this reads the **Survey Scanner Results** +window off the screen (the only place that shows units remaining per rock). It tracks +each rock's amount over time and tells you, in Discord, when the rock you're mining is +about to run dry — and which rock to jump to next. + +Two ways it knows your depletion rate (best available wins): + 1. Direct — if the survey scanner is left running, each rock's number drops every + cycle; it measures units/sec straight from that (most accurate, multi-rock). + 2. Hold-fill fallback — if there's a single rock and the scanner is static, it uses + the live ore-hold fill rate (written by eve_orehold_watcher to .mining_rate) + divided by the ore's m³/unit. + +Only active during a mining session: start one with `!mining on` in Discord +(`!mining off` to stop). Runs headless, launched by start-all.ps1. + + python eve_rock_watcher.py # normal (waits for !mining on) + python eve_rock_watcher.py --snip # manually box the survey window (if auto fails) + python eve_rock_watcher.py --test # parse one screen grab and print rows +""" +import json +import os +import re +import sys +import time + +HERE = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, HERE) +import eve_orehold_watcher as w # load_config, notify, grab_region, heartbeat, bot_mining + +# ore unit volumes (m³/unit) for the hold-fill fallback; base names, longest first +ORE_VOL = { + "dark ochre": 8.0, "veldspar": 0.1, "scordite": 0.15, "pyroxeres": 0.3, + "plagioclase": 0.35, "omber": 0.6, "kernite": 1.2, "jaspet": 2.0, + "hemorphite": 3.0, "hedbergite": 3.0, "gneiss": 5.0, "crokite": 16.0, + "spodumain": 16.0, "bistot": 16.0, "arkonor": 16.0, "mercoxit": 40.0, +} +ORE_NAMES = sorted(ORE_VOL, key=len, reverse=True) # match "dark ochre" before "ochre" + + +def match_ore(line_lower): + for name in ORE_NAMES: + if name in line_lower: + return name + return None + + +def parse_survey(text): + """OCR text of the Survey Scanner Results -> [(ore_base_name, units), ...].""" + rows = [] + for raw in text.splitlines(): + line = raw.strip() + if not line: + continue + ore = match_ore(line.lower()) + if not ore: + continue + ints = [int(n.replace(",", "")) for n in re.findall(r"\d[\d,]*", line)] + ints = [n for n in ints if n >= 10] # drop tiny (distance fragments) + if not ints: + continue + rows.append((ore, max(ints))) # units is the big number on the row + return rows + + +def save_rock_region(cp, l, t, wd, ht): + if not cp.has_section("rock"): + cp.add_section("rock") + cp["rock"]["region"] = f"{l},{t},{wd},{ht}" + with open(w.CONFIG_PATH, "w") as f: + cp.write(f) + + +def detect_survey_region(cp): + """Find the Survey Scanner Results window by locating >=2 'ore name + number' rows.""" + import mss + import pytesseract + from PIL import Image + tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip() + if tcmd: + pytesseract.pytesseract.tesseract_cmd = tcmd + try: + with mss.mss() as sct: + mon = sct.monitors[0] + raw = sct.grab(mon) + img = Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX") + data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT) + except Exception as e: + print(f"[rock] detect error: {e}") + return None + lines = {} + for i in range(len(data["text"])): + if not data["text"][i].strip(): + continue + k = (data["block_num"][i], data["par_num"][i], data["line_num"][i]) + lines.setdefault(k, []).append(i) + boxes = [] + for idxs in lines.values(): + joined = " ".join(data["text"][i] for i in idxs).lower() + if match_ore(joined) and re.search(r"\d", joined): + xs = [data["left"][i] for i in idxs] + ys = [data["top"][i] for i in idxs] + rs = [data["left"][i] + data["width"][i] for i in idxs] + bs = [data["top"][i] + data["height"][i] for i in idxs] + boxes.append((min(xs), min(ys), max(rs), max(bs))) + if len(boxes) < 2: + return None + pad = 12 + l = mon["left"] + min(b[0] for b in boxes) - pad + t = mon["top"] + min(b[1] for b in boxes) - pad + wd = max(b[2] for b in boxes) - min(b[0] for b in boxes) + 2 * pad + ht = max(b[3] for b in boxes) - min(b[1] for b in boxes) + 2 * pad + save_rock_region(cp, l, t, wd, ht) + print(f"[rock] auto-detected survey window -> {l},{t},{wd},{ht} ({len(boxes)} rows)") + return [l, t, wd, ht] + + +def hold_rate(): + """Live ore-hold fill rate (m³/min) written by eve_orehold_watcher, if fresh.""" + try: + d = json.load(open(w.RATE_PATH)) + if time.time() - d.get("ts", 0) <= 120: + return float(d.get("rate_m3min", 0) or 0) + except Exception: + pass + return 0.0 + + +class Tracker: + """Per-rock unit history -> measured depletion rate (units/sec).""" + + def __init__(self): + self.h = {} # key -> [(t, units)] + + def update(self, rows, now): + counts, seen, keyed = {}, set(), [] + for ore, units in rows: + ore = ore.lower() + counts[ore] = counts.get(ore, 0) + 1 + key = f"{ore}#{counts[ore]}" + keyed.append((key, ore, units)) + seen.add(key) + hist = self.h.setdefault(key, []) + if hist and units > hist[-1][1] + 1: # went up = new rock / rescan -> reset + hist.clear() + hist.append((now, units)) + self.h[key] = [(t, u) for (t, u) in hist if now - t <= 120] + for key in list(self.h): # forget rocks no longer listed + if key not in seen: + del self.h[key] + return keyed + + def rate(self, key): + hist = self.h.get(key, []) + if len(hist) < 2 or hist[-1][0] - hist[0][0] < 15: + return 0.0 + du = hist[0][1] - hist[-1][1] + dt = hist[-1][0] - hist[0][0] + return du / dt if du > 0 else 0.0 + + +def get_region(cp): + region_s = cp.get("rock", "region", fallback="").strip() if cp.has_section("rock") else "" + if region_s: + return [int(x) for x in region_s.split(",")] + print("[rock] no saved region — auto-detecting the Survey Scanner Results window...") + return detect_survey_region(cp) + + +def main(): + cp = w.load_config() + if "--snip" in sys.argv: + import importlib + # reuse the ore-hold snipper UI but save to [rock] + l = w.run_snip.__doc__ # noqa: F841 (keep import side effect minimal) + print("Drag a box around the whole Survey Scanner Results list.") + _snip_to_rock(cp) + return + + poll = cp.getint("rock", "poll_secs", fallback=5) if cp.has_section("rock") else 5 + switch_secs = cp.getint("rock", "switch_secs", fallback=45) if cp.has_section("rock") else 45 + cooldown = cp.getint("rock", "cooldown_secs", fallback=60) if cp.has_section("rock") else 60 + + if "--test" in sys.argv: + import pytesseract + tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip() + if tcmd: + pytesseract.pytesseract.tesseract_cmd = tcmd + region = get_region(cp) + if not region: + print("Couldn't find the survey window. Open it (with rocks scanned) and retry, " + "or run --snip.") + return + text = pytesseract.image_to_string(w.grab_region(region)) + print("rows:", parse_survey(text)) + return + + import pytesseract + tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip() + if tcmd: + pytesseract.pytesseract.tesseract_cmd = tcmd + + print(f"[rock] started; switch<{switch_secs}s, poll {poll}s (waits for !mining on)") + tracker = Tracker() + last_alert = {} + region = None + while True: + if not w.bot_mining(cp): # only during a mining session + tracker.h.clear() + time.sleep(15) + continue + if region is None: + region = get_region(cp) + if region is None: + print("[rock] survey window not visible — open it (rocks scanned). Retrying...") + time.sleep(max(poll, 15)) + continue + w.heartbeat(cp, "rock") + try: + text = pytesseract.image_to_string(w.grab_region(region)) + rows = parse_survey(text) + if not rows: + region = None # lost it; re-detect next loop + time.sleep(poll) + continue + now = time.time() + keyed = tracker.update(rows, now) + hr = hold_rate() + for key, ore, units in keyed: + r = tracker.rate(key) # measured units/sec + if r <= 0 and len(keyed) == 1 and hr > 0 and ORE_VOL.get(ore): + r = hr / 60.0 / ORE_VOL[ore] # single-rock hold-fill fallback + if r <= 0: + continue + tleft = units / r + if tleft <= switch_secs and now - last_alert.get(key, 0) > cooldown: + last_alert[key] = now + others = [(o, u) for (k, o, u) in keyed if k != key] + nxt = max(others, key=lambda x: x[1]) if others else None + msg = (f"{ore.title()} rock ~{int(tleft)}s from empty " + f"({units:,} u left).") + if nxt: + msg += f" Next: {nxt[0].title()} ({nxt[1]:,} u)." + w.notify(cp, "Switch rocks", msg, priority="high", tags="pick,gem") + print(f"[rock] ALERT {msg}") + except Exception as e: + print(f"[rock] error: {e}") + time.sleep(poll) + + +def _snip_to_rock(cp): + """Drag-select the survey window, save its box to [rock] region.""" + import tkinter as tk + coords = {} + root = tk.Tk() + root.attributes("-fullscreen", True) + root.attributes("-alpha", 0.25) + root.configure(bg="black") + canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0) + canvas.pack(fill=tk.BOTH, expand=True) + rect = {"id": None, "x0": 0, "y0": 0} + + def on_press(e): + rect["x0"], rect["y0"] = e.x_root, e.y_root + rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, outline="red", width=2) + + def on_drag(e): + canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(), + rect["y0"] - root.winfo_rooty(), e.x, e.y) + + def on_release(e): + x0, y0, x1, y1 = rect["x0"], rect["y0"], e.x_root, e.y_root + coords["r"] = (min(x0, x1), min(y0, y1), abs(x1 - x0), abs(y1 - y0)) + root.destroy() + + canvas.bind("", on_press) + canvas.bind("", on_drag) + canvas.bind("", on_release) + root.bind("", lambda e: root.destroy()) + root.mainloop() + if "r" in coords: + l, t, wd, ht = coords["r"] + save_rock_region(cp, l, t, wd, ht) + print(f"Saved survey region {l},{t},{wd},{ht} to config.ini") + else: + print("Cancelled.") + + +if __name__ == "__main__": + main()