#!/usr/bin/env python3 """ eve_orehold_watcher.py - Ping me when my Retriever's ore hold is full. Runs on Goliath (Windows). Two detection modes (set in config.ini): mode = ocr Reads the in-game "Ore Hold" inventory window every POLL_SECS via screen capture + OCR, parses " / m3", and alerts when fill% >= ALERT_PCT. Most accurate. Requires Tesseract-OCR installed and a one-time region calibration (run with --snip to draw the box). mode = timer No screen reading. You give it your ore-hold size and effective yield (m3/min); it counts down from when you press Enter and alerts at the projected fill time. Dead reliable, zero setup. Only blind spot: a rock depleting earlier than estimated. Both modes deliver to: - ntfy (push to your phone; reuses the same ntfy app you already have) - Windows toast (on-screen on Goliath, so you see it without alt-tabbing) Alerts re-arm automatically (OCR: when the hold drops back below RESET_PCT, e.g. after you unload; timer: when you start a new run) and respect a cooldown. Usage: python eve_orehold_watcher.py # run the watcher (mode from config) python eve_orehold_watcher.py --snip # draw the OCR capture region, save it python eve_orehold_watcher.py --test # fire one test alert and exit """ import configparser import json import os import re import socket import subprocess import sys import time import urllib.request # Stop child processes (Tesseract on every OCR scan, etc.) from flashing a console # window. Patches subprocess so every spawned process gets CREATE_NO_WINDOW on Windows. # All watchers import this module, so this covers the whole fleet. if os.name == "nt": _CREATE_NO_WINDOW = 0x08000000 _orig_popen_init = subprocess.Popen.__init__ def _popen_no_window(self, *a, **k): k["creationflags"] = k.get("creationflags", 0) | _CREATE_NO_WINDOW _orig_popen_init(self, *a, **k) subprocess.Popen.__init__ = _popen_no_window HERE = os.path.dirname(os.path.abspath(__file__)) CONFIG_PATH = os.path.join(HERE, "config.ini") def heartbeat(cp, source): """No-op in Discord — 'watcher online' pings were non-information spam. Watchers being up is evident from the actual alerts/status. Kept as a local log line only.""" print(f"[{source}] online") # --------------------------------------------------------------------------- # # Config # --------------------------------------------------------------------------- # def load_config(): if not os.path.exists(CONFIG_PATH): sys.exit(f"No config.ini found. Copy config.ini.example -> config.ini " f"and edit it.\n(looked in {CONFIG_PATH})") cp = configparser.ConfigParser() cp.read(CONFIG_PATH, encoding="utf-8-sig") # tolerate a UTF-8 BOM return cp def save_region(cp, left, top, width, height): if not cp.has_section("ocr"): cp.add_section("ocr") cp["ocr"]["region"] = f"{left},{top},{width},{height}" with open(CONFIG_PATH, "w") as f: cp.write(f) def snip_region(prompt="Drag a box around the area to watch. Esc to cancel."): """Full-screen drag-select; returns (left, top, width, height) or None. Reusable by any watcher that needs a one-time region (survey scanner, Local window, ...).""" import tkinter as tk coords = {} root = tk.Tk() root.attributes("-fullscreen", True) root.attributes("-alpha", 0.25) root.configure(bg="black") canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0) canvas.pack(fill=tk.BOTH, expand=True) rect = {"id": None, "x0": 0, "y0": 0} def on_press(e): rect["x0"], rect["y0"] = e.x_root, e.y_root rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, outline="red", width=2) def on_drag(e): canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(), rect["y0"] - root.winfo_rooty(), e.x, e.y) def on_release(e): x0, y0, x1, y1 = rect["x0"], rect["y0"], e.x_root, e.y_root coords["r"] = (min(x0, x1), min(y0, y1), abs(x1 - x0), abs(y1 - y0)) root.destroy() canvas.bind("", on_press) canvas.bind("", on_drag) canvas.bind("", on_release) root.bind("", lambda e: root.destroy()) print(prompt) root.mainloop() return coords.get("r") RATE_PATH = os.path.join(HERE, ".mining_rate") def write_rate(m3_per_min, cur, cap): """Publish the live ore-hold fill rate so the rock watcher can estimate how long until the asteroid you're on is depleted. Written every OCR tick.""" try: with open(RATE_PATH, "w") as f: json.dump({"rate_m3min": round(m3_per_min, 1), "cur": cur, "cap": cap, "ts": int(time.time())}, f) except Exception: pass # --------------------------------------------------------------------------- # # Delivery # --------------------------------------------------------------------------- # _BOT_STATE = {"t": 0, "muted": False, "mining": False} DEFAULT_STATE_URL = ("https://git.armoredarmadillo.com/brockdarnold/eve-watcher/" "raw/branch/main/alert_state.json") def _bot_state(cp): """Poll the shared alert-state the Discord bot publishes (mute/quiet/mining), cached 30s, so a Discord `!mute` / `!mining` also drives the local watchers. Defaults to the public alert_state.json (no config needed).""" url = (cp.get("coordination", "bot_state_url", fallback="").strip() if cp.has_section("coordination") else "") or DEFAULT_STATE_URL if time.time() - _BOT_STATE["t"] < 30: return _BOT_STATE try: req = urllib.request.Request(url, headers={"User-Agent": "eve-watcher"}) d = json.loads(urllib.request.urlopen(req, timeout=5).read()) _BOT_STATE["muted"] = bool(d.get("muted")) _BOT_STATE["mining"] = bool(d.get("mining")) _BOT_STATE["t"] = time.time() except Exception: pass return _BOT_STATE def bot_muted(cp): return _bot_state(cp)["muted"] def bot_mining(cp): """True when a mining session is active (started via `!mining on` in Discord).""" return _bot_state(cp)["mining"] def notify(cp, title, message, priority="high", tags="rock,bell"): """Fan out to ntfy (phone), a Windows toast, and a Discord channel.""" if bot_muted(cp): print("[muted by bot]") return _ntfy(cp, title, message, priority, tags) _toast(title, message) _discord(cp, title, message) # Shared fleet-channel webhook — fallback so alerts reach Discord even if an older # config.ini has no webhook set (config is gitignored, so updates can't fill it). DEFAULT_WEBHOOK = ("https://discord.com/api/webhooks/1515603432583598172/" "7g2A9Lfg1afbZGoxBENu9TpxSxE4zfpg16nRqE08qzyI3a0uttADL6wyJ2ERHRfsHlK9") def _discord(cp, title, message): """Post to a Discord channel via webhook (shared with fleetmates).""" webhook = cp.get("discord", "webhook", fallback="").strip() or DEFAULT_WEBHOOK if not webhook: return mention = cp.get("discord", "mention", fallback="").strip() # e.g. <@USERID> or @here payload = json.dumps({"content": f"{mention} **{title}**\n{message}".strip()}) try: req = urllib.request.Request( webhook, data=payload.encode("utf-8"), headers={"Content-Type": "application/json", "User-Agent": "eve-watcher"}, method="POST") # Discord 403s a missing/blank User-Agent — must set one with urllib.request.urlopen(req, timeout=10) as r: r.read() print("[discord] sent") except Exception as e: print(f"[discord] FAILED: {e}") def _discord_live(cp, key, title, message): """Post-or-EDIT a single 'live' message so status updates IN PLACE (no spam). Stores the message id per key in .live_. Skips when muted.""" if bot_muted(cp): return webhook = cp.get("discord", "webhook", fallback="").strip() or DEFAULT_WEBHOOK if not webhook: return idfile = os.path.join(HERE, f".live_{key}") body = json.dumps({"content": f"**{title}**\n{message}"}).encode("utf-8") hdr = {"Content-Type": "application/json", "User-Agent": "eve-watcher"} mid = None try: mid = (open(idfile).read().strip() or None) except Exception: pass if mid: # try to edit the existing message try: urllib.request.urlopen(urllib.request.Request( f"{webhook}/messages/{mid}", data=body, headers=hdr, method="PATCH"), timeout=10).read() return except Exception: mid = None # message gone -> repost below try: resp = json.loads(urllib.request.urlopen(urllib.request.Request( f"{webhook}?wait=true", data=body, headers=hdr, method="POST"), timeout=10).read()) with open(idfile, "w") as f: f.write(str(resp.get("id", ""))) except Exception as e: print(f"[discord-live] {e}") def _ntfy(cp, title, message, priority, tags): server = cp.get("ntfy", "server", fallback="https://ntfy.sh").rstrip("/") topic = cp.get("ntfy", "topic", fallback="").strip() if not topic: print("[ntfy] no topic set, skipping push") return url = f"{server}/{topic}" headers = { "Title": title, "Priority": priority, "Tags": tags, } auth = cp.get("ntfy", "auth", fallback="").strip() # "user:pass" for self-hosted if auth: import base64 headers["Authorization"] = "Basic " + base64.b64encode( auth.encode()).decode() try: req = urllib.request.Request(url, data=message.encode("utf-8"), headers=headers, method="POST") with urllib.request.urlopen(req, timeout=10) as r: r.read() print(f"[ntfy] sent -> {url}") except Exception as e: print(f"[ntfy] FAILED: {e}") def _toast(title, message): try: from winotify import Notification, audio t = Notification(app_id="Eve Ore Watcher", title=title, msg=message, duration="long") t.set_audio(audio.Default, loop=False) t.show() print("[toast] shown") except Exception as e: print(f"[toast] FAILED (is 'winotify' installed?): {e}") # --------------------------------------------------------------------------- # # OCR mode # --------------------------------------------------------------------------- # def parse_orehold_text(text): """Pull (current, capacity) m3 out of OCR text like '12,345 / 22,000 m3'.""" cleaned = text.replace(",", "").replace(".", "").replace(" ", "") m = re.search(r"(\d{2,7})/(\d{2,7})", cleaned) if not m: return None cur, cap = int(m.group(1)), int(m.group(2)) if cap <= 0 or cur > cap * 1.2: return None return cur, cap def grab_region(region): import mss from PIL import Image left, top, width, height = region with mss.mss() as sct: raw = sct.grab({"left": left, "top": top, "width": width, "height": height}) return Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX") def _detect_readout(cp, img, mon_left=0, mon_top=0, save=True): """Find the ore-hold 'cur / cap m3' readout inside a screenshot. Returns the absolute-screen region [l,t,w,h] (and saves it) or None. Split out from the screen-grab so it can be unit-tested against a rendered image.""" import pytesseract data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT) lines = {} for i in range(len(data["text"])): if not data["text"][i].strip(): continue key = (data["block_num"][i], data["par_num"][i], data["line_num"][i]) lines.setdefault(key, []).append(i) best = None # (cap, [l,t,w,h]) for idxs in lines.values(): joined = " ".join(data["text"][i] for i in idxs) parsed = parse_orehold_text(joined) if not parsed: continue cur, cap = parsed if not (500 <= cap <= 2_000_000): # plausible ore/gas hold capacity continue xs = [data["left"][i] for i in idxs] ys = [data["top"][i] for i in idxs] rights = [data["left"][i] + data["width"][i] for i in idxs] bots = [data["top"][i] + data["height"][i] for i in idxs] pad = 8 region = [mon_left + min(xs) - pad, mon_top + min(ys) - pad, (max(rights) - min(xs)) + 2 * pad, (max(bots) - min(ys)) + 2 * pad] if best is None or cap > best[0]: # ore hold cap is the big number best = (cap, region) if best: l, t, w, h = best[1] if save: save_region(cp, l, t, w, h) print(f"[ocr] auto-detected ore-hold readout -> region={l},{t},{w},{h} " f"(cap~{best[0]:,})") return best[1] return None def auto_region(cp): """Scan the whole screen for the ore-hold readout and save its location — replaces the manual --snip. Just have the in-game Ore Hold window open.""" import mss from PIL import Image try: with mss.mss() as sct: mon = sct.monitors[0] # full virtual desktop (all screens) raw = sct.grab(mon) img = Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX") except Exception as e: print(f"[ocr] auto-detect error: {e}") return None return _detect_readout(cp, img, mon["left"], mon["top"]) def run_ocr(cp): import pytesseract tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip() if tcmd: pytesseract.pytesseract.tesseract_cmd = tcmd poll = cp.getint("watcher", "poll_secs", fallback=10) alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0) reset_pct = cp.getfloat("watcher", "reset_pct", fallback=50.0) cooldown = cp.getint("watcher", "cooldown_secs", fallback=120) # stall = hold not growing -> lasers/drones stopped (depleted rock, idle drones) stall_secs = cp.getint("watcher", "stall_secs", fallback=75) status_secs = cp.getint("watcher", "status_secs", fallback=90) # live Discord readout region_s = cp.get("ocr", "region", fallback="").strip() if region_s: region = [int(x) for x in region_s.split(",")] else: print("[ocr] no saved region — auto-detecting the Ore Hold readout on screen...") region = auto_region(cp) while region is None: print(f"[ocr] not visible yet — open your in-game Ore Hold window. " f"Retrying in {max(poll, 15)}s...") time.sleep(max(poll, 15)) region = auto_region(cp) print(f"[ocr] watching region={region} every {poll}s; " f"alert>={alert_pct}% reset<{reset_pct}% stall>{stall_secs}s") armed = True last_alert = 0.0 last_cur = -1 last_grow = time.time() last_stall_alert = 0.0 misses = 0 samples = [] # recent (t, cur) for the live fill-rate estimate rate_m3min = 0.0 last_status = 0.0 was_mining = False while True: try: img = grab_region(region) text = pytesseract.image_to_string(img, config="--psm 7") parsed = parse_orehold_text(text) if parsed: misses = 0 cur, cap = parsed pct = 100.0 * cur / cap print(f"[ocr] {cur}/{cap} m3 ({pct:.1f}%) armed={armed}") # --- live fill rate (m3/min) over a ~90s window -> .mining_rate --- now = time.time() samples.append((now, cur)) samples = [(t, c) for (t, c) in samples if now - t <= 90 and c <= cur] if len(samples) >= 2 and (samples[-1][0] - samples[0][0]) >= 20: dc = samples[-1][1] - samples[0][1] dt = samples[-1][0] - samples[0][0] rate_m3min = dc / dt * 60 if dc > 0 else 0.0 write_rate(rate_m3min, cur, cap) # --- live status feed to Discord during a mining session --- mining = bot_mining(cp) if mining and not bot_muted(cp) and \ (not was_mining or time.time() - last_status >= status_secs): free = cap - cur if rate_m3min > 0: full_at = int(time.time() + free / rate_m3min * 60) tail = f"filling {rate_m3min:,.0f} m³/min · full " else: tail = "not growing yet — lasers/drones idle?" act = " · 🟠 **COMPRESS NOW**" if pct >= alert_pct else \ (" · compress soon" if pct >= 80 else "") _discord_live(cp, "hold", f"⛏️ {socket.gethostname()} ore hold", f"{pct:.0f}% ({cur:,}/{cap:,} m³) · {tail}{act}") last_status = time.time() was_mining = mining if pct < reset_pct: armed = True # --- still mining? (hold should be growing) --- if cur > last_cur: last_grow = time.time() last_cur = cur if pct < alert_pct - 1 and time.time() - last_grow > stall_secs \ and time.time() - last_stall_alert > cooldown: notify(cp, "Mining stopped?", f"Hold hasn't grown in {stall_secs}s at {pct:.0f}% — rock " f"depleted, drones idle, or lasers offlined? Check.", priority="high", tags="warning") last_stall_alert = time.time() # --- hold full --- if armed and pct >= alert_pct and \ time.time() - last_alert > cooldown: notify(cp, "Hold full — compress", f"Hold at {pct:.0f}% ({cur:,}/{cap:,} m3). " f"Compress / unload / swap.") armed = False last_alert = time.time() else: misses += 1 print(f"[ocr] no reading (text={text!r})") # window moved / closed? after ~2 min of misses, re-find it on screen if misses % 12 == 0: print("[ocr] readout lost — re-scanning screen for the Ore Hold...") new = auto_region(cp) if new: region = new except Exception as e: print(f"[ocr] error: {e}") time.sleep(poll) # --------------------------------------------------------------------------- # # Timer mode # --------------------------------------------------------------------------- # def run_timer(cp): hold = cp.getfloat("timer", "ore_hold_m3", fallback=22000.0) yield_pm = cp.getfloat("timer", "yield_m3_per_min", fallback=0.0) alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0) if yield_pm <= 0: sys.exit("Set [timer] yield_m3_per_min in config.ini (your m3/min).") target = hold * alert_pct / 100.0 secs = target / yield_pm * 60.0 while True: input(f"\nPress Enter when lasers go hot " f"(will ping in {secs/60:.1f} min at {alert_pct:.0f}% / " f"{target:,.0f} m3)... ") start = time.time() while time.time() - start < secs: remaining = secs - (time.time() - start) print(f" [timer] {remaining/60:5.1f} min to full", end="\r") time.sleep(5) notify(cp, "Retriever ore hold full (est.)", f"~{alert_pct:.0f}% full ({target:,.0f} m3 at " f"{yield_pm:.0f} m3/min). Check & unload.") print("\n[timer] alert fired. Loop again for the next run.") # --------------------------------------------------------------------------- # # Region snip helper (Tkinter drag-to-select) # --------------------------------------------------------------------------- # def run_snip(cp): import tkinter as tk coords = {} root = tk.Tk() root.attributes("-fullscreen", True) root.attributes("-alpha", 0.25) root.configure(bg="black") root.title("Drag a box over the Ore Hold 'cur / cap m3' text") canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0) canvas.pack(fill=tk.BOTH, expand=True) rect = {"id": None, "x0": 0, "y0": 0} def on_press(e): rect["x0"], rect["y0"] = e.x_root, e.y_root rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, outline="red", width=2) def on_drag(e): canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(), rect["y0"] - root.winfo_rooty(), e.x, e.y) def on_release(e): x0, y0 = rect["x0"], rect["y0"] x1, y1 = e.x_root, e.y_root coords["region"] = (min(x0, x1), min(y0, y1), abs(x1 - x0), abs(y1 - y0)) root.destroy() canvas.bind("", on_press) canvas.bind("", on_drag) canvas.bind("", on_release) root.bind("", lambda e: root.destroy()) print("Drag a tight box around the '12,345 / 22,000 m3' text. Esc to cancel.") root.mainloop() if "region" in coords: l, t, w, h = coords["region"] save_region(cp, l, t, w, h) print(f"Saved region={l},{t},{w},{h} to config.ini") else: print("Cancelled, nothing saved.") # --------------------------------------------------------------------------- # def main(): cp = load_config() if "--snip" in sys.argv: run_snip(cp) return if "--test" in sys.argv: notify(cp, "Eve watcher test", "If you got this on phone + toast, delivery works.") return mode = cp.get("watcher", "mode", fallback="ocr").strip().lower() print(f"=== eve_orehold_watcher starting (mode={mode}) ===") heartbeat(cp, "orehold") if mode == "ocr": run_ocr(cp) elif mode == "timer": run_timer(cp) else: sys.exit(f"Unknown mode '{mode}' (use 'ocr' or 'timer')") if __name__ == "__main__": main()