From b136a9afb1d243d46ccb51966468ac0181f2bf0d Mon Sep 17 00:00:00 2001 From: brockdarnold Date: Sun, 14 Jun 2026 07:17:59 +0000 Subject: [PATCH] publish eve_orehold_watcher.py --- eve_orehold_watcher.py | 336 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 eve_orehold_watcher.py diff --git a/eve_orehold_watcher.py b/eve_orehold_watcher.py new file mode 100644 index 0000000..4dbca2d --- /dev/null +++ b/eve_orehold_watcher.py @@ -0,0 +1,336 @@ +#!/usr/bin/env python3 +""" +eve_orehold_watcher.py - Ping me when my Retriever's ore hold is full. + +Runs on Goliath (Windows). Two detection modes (set in config.ini): + + mode = ocr Reads the in-game "Ore Hold" inventory window every POLL_SECS + via screen capture + OCR, parses " / m3", + and alerts when fill% >= ALERT_PCT. Most accurate. Requires + Tesseract-OCR installed and a one-time region calibration + (run with --snip to draw the box). + + mode = timer No screen reading. You give it your ore-hold size and effective + yield (m3/min); it counts down from when you press Enter and + alerts at the projected fill time. Dead reliable, zero setup. + Only blind spot: a rock depleting earlier than estimated. + +Both modes deliver to: + - ntfy (push to your phone; reuses the same ntfy app you already have) + - Windows toast (on-screen on Goliath, so you see it without alt-tabbing) + +Alerts re-arm automatically (OCR: when the hold drops back below RESET_PCT, +e.g. after you unload; timer: when you start a new run) and respect a cooldown. + +Usage: + python eve_orehold_watcher.py # run the watcher (mode from config) + python eve_orehold_watcher.py --snip # draw the OCR capture region, save it + python eve_orehold_watcher.py --test # fire one test alert and exit +""" + +import configparser +import json +import os +import re +import sys +import time +import urllib.request + +HERE = os.path.dirname(os.path.abspath(__file__)) +CONFIG_PATH = os.path.join(HERE, "config.ini") + + +# --------------------------------------------------------------------------- # +# Config +# --------------------------------------------------------------------------- # +def load_config(): + if not os.path.exists(CONFIG_PATH): + sys.exit(f"No config.ini found. Copy config.ini.example -> config.ini " + f"and edit it.\n(looked in {CONFIG_PATH})") + cp = configparser.ConfigParser() + cp.read(CONFIG_PATH) + return cp + + +def save_region(cp, left, top, width, height): + if not cp.has_section("ocr"): + cp.add_section("ocr") + cp["ocr"]["region"] = f"{left},{top},{width},{height}" + with open(CONFIG_PATH, "w") as f: + cp.write(f) + + +# --------------------------------------------------------------------------- # +# Delivery +# --------------------------------------------------------------------------- # +_BOT_STATE = {"t": 0, "muted": False} + + +def bot_muted(cp): + """Coordinate with the Discord bot: poll the shared alert-state it publishes, so a + `!mute` in Discord also silences these local watchers. Cached 30s.""" + url = cp.get("coordination", "bot_state_url", fallback="").strip() \ + if cp.has_section("coordination") else "" + if not url: + return False + if time.time() - _BOT_STATE["t"] < 30: + return _BOT_STATE["muted"] + try: + d = json.loads(urllib.request.urlopen(url, timeout=5).read()) + _BOT_STATE["muted"] = bool(d.get("muted")) + _BOT_STATE["t"] = time.time() + except Exception: + pass + return _BOT_STATE["muted"] + + +def notify(cp, title, message, priority="high", tags="rock,bell"): + """Fan out to ntfy (phone), a Windows toast, and a Discord channel.""" + if bot_muted(cp): + print("[muted by bot]") + return + _ntfy(cp, title, message, priority, tags) + _toast(title, message) + _discord(cp, title, message) + + +def _discord(cp, title, message): + """Post to a Discord channel via webhook (shared with fleetmates).""" + webhook = cp.get("discord", "webhook", fallback="").strip() + if not webhook: + return + mention = cp.get("discord", "mention", fallback="").strip() # e.g. <@USERID> or @here + payload = json.dumps({"content": f"{mention} **{title}**\n{message}".strip()}) + try: + req = urllib.request.Request( + webhook, data=payload.encode("utf-8"), + headers={"Content-Type": "application/json"}, method="POST") + with urllib.request.urlopen(req, timeout=10) as r: + r.read() + print("[discord] sent") + except Exception as e: + print(f"[discord] FAILED: {e}") + + +def _ntfy(cp, title, message, priority, tags): + server = cp.get("ntfy", "server", fallback="https://ntfy.sh").rstrip("/") + topic = cp.get("ntfy", "topic", fallback="").strip() + if not topic: + print("[ntfy] no topic set, skipping push") + return + url = f"{server}/{topic}" + headers = { + "Title": title, + "Priority": priority, + "Tags": tags, + } + auth = cp.get("ntfy", "auth", fallback="").strip() # "user:pass" for self-hosted + if auth: + import base64 + headers["Authorization"] = "Basic " + base64.b64encode( + auth.encode()).decode() + try: + req = urllib.request.Request(url, data=message.encode("utf-8"), + headers=headers, method="POST") + with urllib.request.urlopen(req, timeout=10) as r: + r.read() + print(f"[ntfy] sent -> {url}") + except Exception as e: + print(f"[ntfy] FAILED: {e}") + + +def _toast(title, message): + try: + from winotify import Notification, audio + t = Notification(app_id="Eve Ore Watcher", title=title, msg=message, + duration="long") + t.set_audio(audio.Default, loop=False) + t.show() + print("[toast] shown") + except Exception as e: + print(f"[toast] FAILED (is 'winotify' installed?): {e}") + + +# --------------------------------------------------------------------------- # +# OCR mode +# --------------------------------------------------------------------------- # +def parse_orehold_text(text): + """Pull (current, capacity) m3 out of OCR text like '12,345 / 22,000 m3'.""" + cleaned = text.replace(",", "").replace(".", "").replace(" ", "") + m = re.search(r"(\d{2,7})/(\d{2,7})", cleaned) + if not m: + return None + cur, cap = int(m.group(1)), int(m.group(2)) + if cap <= 0 or cur > cap * 1.2: + return None + return cur, cap + + +def grab_region(region): + import mss + from PIL import Image + left, top, width, height = region + with mss.mss() as sct: + raw = sct.grab({"left": left, "top": top, "width": width, + "height": height}) + return Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX") + + +def run_ocr(cp): + import pytesseract + tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip() + if tcmd: + pytesseract.pytesseract.tesseract_cmd = tcmd + + region_s = cp.get("ocr", "region", fallback="").strip() + if not region_s: + sys.exit("No OCR region set. Run: python eve_orehold_watcher.py --snip") + region = [int(x) for x in region_s.split(",")] + + poll = cp.getint("watcher", "poll_secs", fallback=10) + alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0) + reset_pct = cp.getfloat("watcher", "reset_pct", fallback=50.0) + cooldown = cp.getint("watcher", "cooldown_secs", fallback=120) + # stall = hold not growing -> lasers/drones stopped (depleted rock, idle drones) + stall_secs = cp.getint("watcher", "stall_secs", fallback=150) + + print(f"[ocr] watching region={region} every {poll}s; " + f"alert>={alert_pct}% reset<{reset_pct}% stall>{stall_secs}s") + armed = True + last_alert = 0.0 + last_cur = -1 + last_grow = time.time() + last_stall_alert = 0.0 + while True: + try: + img = grab_region(region) + text = pytesseract.image_to_string(img, config="--psm 7") + parsed = parse_orehold_text(text) + if parsed: + cur, cap = parsed + pct = 100.0 * cur / cap + print(f"[ocr] {cur}/{cap} m3 ({pct:.1f}%) armed={armed}") + if pct < reset_pct: + armed = True + # --- still mining? (hold should be growing) --- + if cur > last_cur: + last_grow = time.time() + last_cur = cur + if pct < alert_pct - 1 and time.time() - last_grow > stall_secs \ + and time.time() - last_stall_alert > cooldown: + notify(cp, "Mining stopped?", + f"Hold hasn't grown in {stall_secs}s at {pct:.0f}% — rock " + f"depleted, drones idle, or lasers offlined? Check.", + priority="high", tags="warning") + last_stall_alert = time.time() + # --- hold full --- + if armed and pct >= alert_pct and \ + time.time() - last_alert > cooldown: + notify(cp, "Hold full — compress", + f"Hold at {pct:.0f}% ({cur:,}/{cap:,} m3). " + f"Compress / unload / swap.") + armed = False + last_alert = time.time() + else: + print(f"[ocr] no reading (text={text!r})") + except Exception as e: + print(f"[ocr] error: {e}") + time.sleep(poll) + + +# --------------------------------------------------------------------------- # +# Timer mode +# --------------------------------------------------------------------------- # +def run_timer(cp): + hold = cp.getfloat("timer", "ore_hold_m3", fallback=22000.0) + yield_pm = cp.getfloat("timer", "yield_m3_per_min", fallback=0.0) + alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0) + if yield_pm <= 0: + sys.exit("Set [timer] yield_m3_per_min in config.ini (your m3/min).") + + target = hold * alert_pct / 100.0 + secs = target / yield_pm * 60.0 + while True: + input(f"\nPress Enter when lasers go hot " + f"(will ping in {secs/60:.1f} min at {alert_pct:.0f}% / " + f"{target:,.0f} m3)... ") + start = time.time() + while time.time() - start < secs: + remaining = secs - (time.time() - start) + print(f" [timer] {remaining/60:5.1f} min to full", end="\r") + time.sleep(5) + notify(cp, "Retriever ore hold full (est.)", + f"~{alert_pct:.0f}% full ({target:,.0f} m3 at " + f"{yield_pm:.0f} m3/min). Check & unload.") + print("\n[timer] alert fired. Loop again for the next run.") + + +# --------------------------------------------------------------------------- # +# Region snip helper (Tkinter drag-to-select) +# --------------------------------------------------------------------------- # +def run_snip(cp): + import tkinter as tk + coords = {} + + root = tk.Tk() + root.attributes("-fullscreen", True) + root.attributes("-alpha", 0.25) + root.configure(bg="black") + root.title("Drag a box over the Ore Hold 'cur / cap m3' text") + canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0) + canvas.pack(fill=tk.BOTH, expand=True) + rect = {"id": None, "x0": 0, "y0": 0} + + def on_press(e): + rect["x0"], rect["y0"] = e.x_root, e.y_root + rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, + outline="red", width=2) + + def on_drag(e): + canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(), + rect["y0"] - root.winfo_rooty(), e.x, e.y) + + def on_release(e): + x0, y0 = rect["x0"], rect["y0"] + x1, y1 = e.x_root, e.y_root + coords["region"] = (min(x0, x1), min(y0, y1), + abs(x1 - x0), abs(y1 - y0)) + root.destroy() + + canvas.bind("", on_press) + canvas.bind("", on_drag) + canvas.bind("", on_release) + root.bind("", lambda e: root.destroy()) + print("Drag a tight box around the '12,345 / 22,000 m3' text. Esc to cancel.") + root.mainloop() + + if "region" in coords: + l, t, w, h = coords["region"] + save_region(cp, l, t, w, h) + print(f"Saved region={l},{t},{w},{h} to config.ini") + else: + print("Cancelled, nothing saved.") + + +# --------------------------------------------------------------------------- # +def main(): + cp = load_config() + if "--snip" in sys.argv: + run_snip(cp) + return + if "--test" in sys.argv: + notify(cp, "Eve watcher test", + "If you got this on phone + toast, delivery works.") + return + mode = cp.get("watcher", "mode", fallback="ocr").strip().lower() + print(f"=== eve_orehold_watcher starting (mode={mode}) ===") + if mode == "ocr": + run_ocr(cp) + elif mode == "timer": + run_timer(cp) + else: + sys.exit(f"Unknown mode '{mode}' (use 'ocr' or 'timer')") + + +if __name__ == "__main__": + main()