eve-watcher/eve_orehold_watcher.py

659 lines
27 KiB
Python

#!/usr/bin/env python3
"""
eve_orehold_watcher.py - Ping me when my Retriever's ore hold is full.
Runs on Goliath (Windows). Two detection modes (set in config.ini):
mode = ocr Reads the in-game "Ore Hold" inventory window every POLL_SECS
via screen capture + OCR, parses "<current> / <capacity> m3",
and alerts when fill% >= ALERT_PCT. Most accurate. Requires
Tesseract-OCR installed and a one-time region calibration
(run with --snip to draw the box).
mode = timer No screen reading. You give it your ore-hold size and effective
yield (m3/min); it counts down from when you press Enter and
alerts at the projected fill time. Dead reliable, zero setup.
Only blind spot: a rock depleting earlier than estimated.
Both modes deliver to:
- ntfy (push to your phone; reuses the same ntfy app you already have)
- Windows toast (on-screen on Goliath, so you see it without alt-tabbing)
Alerts re-arm automatically (OCR: when the hold drops back below RESET_PCT,
e.g. after you unload; timer: when you start a new run) and respect a cooldown.
Usage:
python eve_orehold_watcher.py # run the watcher (mode from config)
python eve_orehold_watcher.py --snip # draw the OCR capture region, save it
python eve_orehold_watcher.py --test # fire one test alert and exit
"""
import configparser
import json
import os
import re
import socket
import subprocess
import sys
import time
import urllib.request
# Every OCR scan spawns Tesseract; without this each spawn would flash a console
# window. On Windows, wrap Popen.__init__ so all child processes are created with
# CREATE_NO_WINDOW. The patch is applied at import time, so it covers every
# watcher that imports this module.
if os.name == "nt":
    _CREATE_NO_WINDOW = 0x08000000
    _orig_popen_init = subprocess.Popen.__init__

    def _popen_no_window(self, *args, **kwargs):
        """Popen.__init__ replacement that ORs CREATE_NO_WINDOW into creationflags."""
        kwargs["creationflags"] = kwargs.get("creationflags", 0) | _CREATE_NO_WINDOW
        _orig_popen_init(self, *args, **kwargs)

    subprocess.Popen.__init__ = _popen_no_window

# Directory containing this script; config.ini is looked up beside it.
HERE = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(HERE, "config.ini")
def heartbeat(cp, source):
    """Log that the watcher named *source* has started.

    Nothing is sent to Discord: "watcher online" pings were noise, and a running
    watcher is evident from its real alerts. *cp* is accepted but not used.
    """
    print("[{}] online".format(source))
# --------------------------------------------------------------------------- #
# Config
# --------------------------------------------------------------------------- #
def load_config():
    """Parse config.ini (next to this script) and return the ConfigParser.

    Exits the process with a setup hint when the file does not exist.
    """
    if os.path.exists(CONFIG_PATH):
        # inline_comment_prefixes strips trailing ' ; comment' text; without it
        # getint() fails on lines like "poll_secs = 10 ; how often".
        parser = configparser.ConfigParser(inline_comment_prefixes=(";",))
        # utf-8-sig: accept a file saved with a UTF-8 byte-order mark.
        parser.read(CONFIG_PATH, encoding="utf-8-sig")
        return parser
    sys.exit("No config.ini found. Copy config.ini.example -> config.ini "
             f"and edit it.\n(looked in {CONFIG_PATH})")
def save_region(cp, left, top, width, height):
    """Store the capture region as ``[ocr] region = left,top,width,height``.

    The whole of *cp* is written back to config.ini. The file is written as
    UTF-8 (load_config reads it as utf-8-sig) to a temporary sibling and then
    moved into place, so a failed write cannot leave config.ini truncated.
    """
    if not cp.has_section("ocr"):
        cp.add_section("ocr")
    cp["ocr"]["region"] = f"{left},{top},{width},{height}"
    tmp_path = CONFIG_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        cp.write(f)
    os.replace(tmp_path, CONFIG_PATH)
def snip_region(prompt="Drag a box around the area to watch. Esc to cancel."):
    """Show a translucent full-screen overlay and let the user drag a rectangle.

    Returns (left, top, width, height) built from the pointer's root coordinates,
    or None when the overlay is closed with Esc. Shared by any watcher that needs
    a one-time region pick.
    """
    import tkinter as tk

    window = tk.Tk()
    window.attributes("-fullscreen", True)
    window.attributes("-alpha", 0.25)
    window.configure(bg="black")
    surface = tk.Canvas(window, cursor="cross", bg="black", highlightthickness=0)
    surface.pack(fill=tk.BOTH, expand=True)

    selection = None          # final (left, top, width, height), if any
    box_id = None             # canvas id of the rubber-band rectangle
    anchor_x = anchor_y = 0   # root coordinates where the drag started

    def begin(event):
        nonlocal box_id, anchor_x, anchor_y
        anchor_x, anchor_y = event.x_root, event.y_root
        box_id = surface.create_rectangle(event.x, event.y, event.x, event.y,
                                          outline="red", width=2)

    def stretch(event):
        # The anchor is in root coordinates; subtract the window origin to draw it.
        surface.coords(box_id,
                       anchor_x - window.winfo_rootx(),
                       anchor_y - window.winfo_rooty(),
                       event.x, event.y)

    def finish(event):
        nonlocal selection
        end_x, end_y = event.x_root, event.y_root
        selection = (min(anchor_x, end_x), min(anchor_y, end_y),
                     abs(end_x - anchor_x), abs(end_y - anchor_y))
        window.destroy()

    surface.bind("<ButtonPress-1>", begin)
    surface.bind("<B1-Motion>", stretch)
    surface.bind("<ButtonRelease-1>", finish)
    window.bind("<Escape>", lambda _event: window.destroy())
    print(prompt)
    window.mainloop()
    return selection
RATE_PATH = os.path.join(HERE, ".mining_rate")


def write_rate(m3_per_min, cur, cap):
    """Publish the live ore-hold fill rate for the rock watcher.

    Writes ``{"rate_m3min", "cur", "cap", "ts"}`` as JSON to RATE_PATH. The JSON
    goes to a temporary sibling first and is then moved into place, so a reader
    never sees an empty or half-written file. Best-effort: any failure is
    swallowed so a rate-file problem cannot interrupt the OCR loop.
    """
    tmp_path = RATE_PATH + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump({"rate_m3min": round(m3_per_min, 1), "cur": cur, "cap": cap,
                       "ts": int(time.time())}, f)
        os.replace(tmp_path, RATE_PATH)
    except Exception:
        # Deliberately silent: the rate file is advisory only.
        pass
# --------------------------------------------------------------------------- #
# Delivery
# --------------------------------------------------------------------------- #
# Last known bot state plus the time ("t") of the most recent poll attempt.
_BOT_STATE = {"t": 0, "muted": False, "mining": False}
DEFAULT_STATE_URL = ("https://git.armoredarmadillo.com/brockdarnold/eve-watcher/"
                     "raw/branch/main/alert_state.json")


def _bot_state(cp):
    """Return the shared alert state (mute/mining) published by the Discord bot.

    The URL comes from ``[coordination] bot_state_url`` when set, otherwise
    DEFAULT_STATE_URL. At most one poll is attempted every 30 s. A failed poll
    is also counted as an attempt, so an unreachable server costs one 5 s
    timeout per 30 s instead of one per call. On failure the last known values
    are kept.
    """
    url = (cp.get("coordination", "bot_state_url", fallback="").strip()
           if cp.has_section("coordination") else "") or DEFAULT_STATE_URL
    now = time.time()
    if now - _BOT_STATE["t"] < 30:
        return _BOT_STATE
    # Stamp the attempt before fetching so failures are throttled as well.
    _BOT_STATE["t"] = now
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "eve-watcher"})
        with urllib.request.urlopen(req, timeout=5) as resp:
            d = json.loads(resp.read())
        _BOT_STATE["muted"] = bool(d.get("muted"))
        _BOT_STATE["mining"] = bool(d.get("mining"))
    except Exception:
        # Best-effort: network or parse trouble must never break a watcher.
        pass
    return _BOT_STATE


def bot_muted(cp):
    """True when the bot state says alerts are muted."""
    return _bot_state(cp)["muted"]


def bot_mining(cp):
    """True when a mining session is active (started via `!mining on` in Discord)."""
    return _bot_state(cp)["mining"]
def notify(cp, title, message, priority="high", tags="rock,bell"):
    """Deliver one alert to ntfy, a Windows toast and Discord, in that order.

    When the bot state reports "muted" nothing is delivered; only a log line is
    printed.
    """
    if not bot_muted(cp):
        _ntfy(cp, title, message, priority, tags)
        _toast(title, message)
        _discord(cp, title, message)
    else:
        print("[muted by bot]")
# Shared fleet-channel webhook — fallback so alerts reach Discord even if an older
# config.ini has no webhook set (config is gitignored, so updates can't fill it).
# NOTE(review): this URL contains the webhook token, i.e. a credential committed
# to source. The code below posts with nothing but this URL, so anyone who can
# read the file can post to the channel. Consider rotating it and loading the
# fallback from an untracked file or an environment variable.
DEFAULT_WEBHOOK = ("https://discord.com/api/webhooks/1515603432583598172/"
"7g2A9Lfg1afbZGoxBENu9TpxSxE4zfpg16nRqE08qzyI3a0uttADL6wyJ2ERHRfsHlK9")
def _discord(cp, title, message):
    """Post an alert to a Discord channel via webhook (shared with fleetmates).

    Uses ``[discord] webhook`` when set, otherwise DEFAULT_WEBHOOK, and prefixes
    the optional ``[discord] mention`` (e.g. <@USERID> or @here). Content is
    clamped to Discord's 2000-character message limit so an oversized alert is
    shortened rather than rejected. Failures are logged, never raised.
    """
    webhook = cp.get("discord", "webhook", fallback="").strip() or DEFAULT_WEBHOOK
    if not webhook:
        return
    mention = cp.get("discord", "mention", fallback="").strip()
    content = f"{mention} **{title}**\n{message}".strip()[:2000]
    payload = json.dumps({"content": content})
    try:
        req = urllib.request.Request(
            webhook, data=payload.encode("utf-8"),
            # Discord 403s a missing/blank User-Agent, so one must be set.
            headers={"Content-Type": "application/json", "User-Agent": "eve-watcher"},
            method="POST")
        with urllib.request.urlopen(req, timeout=10) as r:
            r.read()
        print("[discord] sent")
    except Exception as e:
        print(f"[discord] FAILED: {e}")
def _discord_live(cp, key, title, message):
    """Post-or-edit a single "live" Discord message so status updates in place.

    The id of the posted message is stored per *key* in ``.live_<key>`` next to
    this script. If an id is stored, the message is edited (PATCH). A new
    message is posted only when the edit is rejected outright, e.g. because the
    message was deleted. A transient failure (timeout, HTTP 429, HTTP 5xx)
    skips this update instead of reposting, which would otherwise create a
    duplicate message on every network hiccup. Does nothing while muted.
    """
    import urllib.error

    if bot_muted(cp):
        return
    webhook = cp.get("discord", "webhook", fallback="").strip() or DEFAULT_WEBHOOK
    if not webhook:
        return
    idfile = os.path.join(HERE, f".live_{key}")
    body = json.dumps({"content": f"**{title}**\n{message}"}).encode("utf-8")
    hdr = {"Content-Type": "application/json", "User-Agent": "eve-watcher"}

    try:
        with open(idfile, encoding="utf-8") as f:
            mid = f.read().strip() or None
    except Exception:
        mid = None   # no id stored yet

    if mid:   # try to edit the existing message
        try:
            req = urllib.request.Request(
                f"{webhook}/messages/{mid}", data=body, headers=hdr, method="PATCH")
            with urllib.request.urlopen(req, timeout=10) as r:
                r.read()
            return
        except urllib.error.HTTPError as e:
            if e.code == 429 or e.code >= 500:
                # Rate limit / server trouble: the message still exists.
                print(f"[discord-live] edit failed, will retry: {e}")
                return
            # Any other rejection (e.g. 404 message gone) -> repost below.
        except Exception as e:
            # Timeout or connection error: keep the stored id, try again later.
            print(f"[discord-live] edit failed, will retry: {e}")
            return

    try:
        req = urllib.request.Request(
            f"{webhook}?wait=true", data=body, headers=hdr, method="POST")
        with urllib.request.urlopen(req, timeout=10) as r:
            resp = json.loads(r.read())
        with open(idfile, "w", encoding="utf-8") as f:
            f.write(str(resp.get("id", "")))
    except Exception as e:
        print(f"[discord-live] {e}")
def _ntfy_header(value):
    """Return *value* in a form that can be sent as an HTTP header.

    http.client encodes header values as latin-1, so a title such as
    "Hold full — compress" (em dash) raises UnicodeEncodeError and the push is
    never sent. ASCII values pass through unchanged. Anything else is wrapped
    as an RFC 2047 encoded-word (base64 of the UTF-8 bytes), the form the ntfy
    publishing docs describe for non-ASCII headers.
    """
    import base64
    value = str(value)
    try:
        value.encode("ascii")
    except UnicodeEncodeError:
        packed = base64.b64encode(value.encode("utf-8")).decode("ascii")
        return f"=?UTF-8?B?{packed}?="
    return value


def _ntfy(cp, title, message, priority, tags):
    """Push one notification to ``<[ntfy] server>/<[ntfy] topic>``.

    Skips (with a log line) when no topic is configured. ``[ntfy] auth``, if
    set, is sent as HTTP Basic credentials ("user:pass", for self-hosted
    servers). Failures are logged, never raised.
    """
    server = cp.get("ntfy", "server", fallback="https://ntfy.sh").rstrip("/")
    topic = cp.get("ntfy", "topic", fallback="").strip()
    if not topic:
        print("[ntfy] no topic set, skipping push")
        return
    url = f"{server}/{topic}"
    headers = {
        "Title": _ntfy_header(title),
        "Priority": _ntfy_header(priority),
        "Tags": _ntfy_header(tags),
    }
    auth = cp.get("ntfy", "auth", fallback="").strip()
    if auth:
        import base64
        headers["Authorization"] = "Basic " + base64.b64encode(
            auth.encode()).decode()
    try:
        req = urllib.request.Request(url, data=message.encode("utf-8"),
                                     headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=10) as r:
            r.read()
        print(f"[ntfy] sent -> {url}")
    except Exception as e:
        print(f"[ntfy] FAILED: {e}")
def _toast(title, message):
    """Show a Windows toast via winotify; log (do not raise) if that fails."""
    try:
        from winotify import Notification, audio
        popup = Notification(app_id="Eve Ore Watcher", title=title, msg=message,
                             duration="long")
        popup.set_audio(audio.Default, loop=False)
        popup.show()
    except Exception as err:
        print(f"[toast] FAILED (is 'winotify' installed?): {err}")
    else:
        print("[toast] shown")
# --------------------------------------------------------------------------- #
# OCR mode
# --------------------------------------------------------------------------- #
def parse_orehold_text(text):
    """Extract (current, capacity) from OCR text such as '12,345 / 22,000 m3'.

    Commas, periods and spaces are removed before matching, so both numbers are
    read as plain digit runs (2-7 digits each). Returns None when no
    "<digits>/<digits>" pair is found, when capacity is zero, or when current
    exceeds capacity by more than 20%.
    """
    squeezed = text.translate(str.maketrans("", "", ",. "))
    match = re.search(r"(\d{2,7})/(\d{2,7})", squeezed)
    if match is None:
        return None
    current = int(match.group(1))
    capacity = int(match.group(2))
    if capacity > 0 and not current > capacity * 1.2:
        return current, capacity
    return None
def capture_window(cls="triuiScreen"):
    """Capture the EVE client window's client area via PrintWindow.

    The window is looked up by class name *cls*, falling back to the first
    top-level window whose title starts with "EVE -". PrintWindow renders the
    window itself, so the capture does not depend on it being focused,
    unobscured or at a particular screen position.

    Returns a PIL Image in window-relative coordinates, or None when not on
    Windows, when no window is found, or when its client area is empty. All GDI
    handles are released in a ``finally`` block, so an exception cannot leak
    them.
    """
    if os.name != "nt":
        return None
    import ctypes
    import ctypes.wintypes as wt
    from PIL import Image

    u = ctypes.windll.user32
    g = ctypes.windll.gdi32
    try:
        u.SetProcessDPIAware()
    except Exception:
        pass

    hwnd = u.FindWindowW(cls, None)
    if not hwnd:
        res = []

        @ctypes.WINFUNCTYPE(ctypes.c_bool, wt.HWND, wt.LPARAM)
        def _cb(h, _l):
            n = u.GetWindowTextLengthW(h)
            if n:
                b = ctypes.create_unicode_buffer(n + 1)
                u.GetWindowTextW(h, b, n + 1)
                if b.value.startswith("EVE -"):
                    res.append(h)
            return True   # keep enumerating

        u.EnumWindows(_cb, 0)
        hwnd = res[0] if res else 0
    if not hwnd:
        return None

    r = wt.RECT()
    u.GetClientRect(hwnd, ctypes.byref(r))
    w, h = r.right, r.bottom
    if w <= 0 or h <= 0:
        return None

    class BMIH(ctypes.Structure):
        _fields_ = [("biSize", wt.DWORD), ("biWidth", ctypes.c_long),
                    ("biHeight", ctypes.c_long), ("biPlanes", wt.WORD),
                    ("biBitCount", wt.WORD), ("biCompression", wt.DWORD),
                    ("biSizeImage", wt.DWORD), ("p1", ctypes.c_long),
                    ("p2", ctypes.c_long), ("p3", wt.DWORD), ("p4", wt.DWORD)]

    hdc = u.GetDC(hwnd)
    mdc = bmp = old = None
    try:
        mdc = g.CreateCompatibleDC(hdc)
        bmp = g.CreateCompatibleBitmap(hdc, w, h)
        old = g.SelectObject(mdc, bmp)
        u.PrintWindow(hwnd, mdc, 2)   # PW_RENDERFULLCONTENT (works for DX windows)
        # GetDIBits requires that the bitmap is not selected into a DC.
        g.SelectObject(mdc, old)
        old = None

        bmi = BMIH()
        bmi.biSize = ctypes.sizeof(BMIH)
        bmi.biWidth = w
        bmi.biHeight = -h   # negative height = top-down rows
        bmi.biPlanes = 1
        bmi.biBitCount = 32
        bmi.biCompression = 0
        buf = ctypes.create_string_buffer(w * h * 4)
        g.GetDIBits(mdc, bmp, 0, h, buf, ctypes.byref(bmi), 0)
        return Image.frombuffer("RGB", (w, h), buf, "raw", "BGRX", 0, 1)
    finally:
        # Release everything that was created, even on an exception.
        if old:
            g.SelectObject(mdc, old)
        if bmp:
            g.DeleteObject(bmp)
        if mdc:
            g.DeleteDC(mdc)
        if hdc:
            u.ReleaseDC(hwnd, hdc)
def grab_region(region):
    """Return the pixels of *region* = (left, top, width, height) as a PIL Image.

    Preferred path: crop the region, interpreted as window-relative, out of the
    EVE window capture. If that capture is unavailable, grab the same numbers as
    absolute screen coordinates with mss.
    """
    shot = capture_window()
    left, top, width, height = region
    if shot is not None:
        return shot.crop((left, top, left + width, top + height))

    import mss
    from PIL import Image
    box = {"left": left, "top": top, "width": width, "height": height}
    with mss.mss() as grabber:
        pixels = grabber.grab(box)
    return Image.frombytes("RGB", pixels.size, pixels.bgra, "raw", "BGRX")
def _detect_readout(cp, img, mon_left=0, mon_top=0, save=True, scale=1):
    """Find the ore-hold 'cur / cap m3' readout inside a screenshot.

    OCRs *img* word by word, regroups the words into text lines, and keeps every
    line that parses as a readout with a plausible capacity (500..2,000,000).
    Among those the line with the largest capacity wins.

    Args:
        cp: ConfigParser, or None (e.g. in a unit test). Used for
            ``[ocr] tesseract_cmd`` and for saving the region.
        img: image to scan.
        mon_left, mon_top: offset added to the detected box.
        save: persist the region via save_region(); skipped when cp is None.
        scale: factor by which *img* was upscaled; box coordinates are divided
            back by it.

    Returns:
        [left, top, width, height] padded by 8 px on every side, or None when
        no readout was found.
    """
    import pytesseract
    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip() if cp else ""
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd
    data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)

    # Group word indices by the (block, paragraph, line) they belong to.
    lines = {}
    for i, word in enumerate(data["text"]):
        if not word.strip():
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        lines.setdefault(key, []).append(i)

    best = None   # (cap, [l, t, w, h])
    pad = 8
    for idxs in lines.values():
        joined = " ".join(data["text"][i] for i in idxs)
        parsed = parse_orehold_text(joined)
        if not parsed:
            continue
        cur, cap = parsed
        if not (500 <= cap <= 2_000_000):   # plausible ore/gas hold capacity
            continue
        xs = [data["left"][i] // scale for i in idxs]
        ys = [data["top"][i] // scale for i in idxs]
        rights = [(data["left"][i] + data["width"][i]) // scale for i in idxs]
        bots = [(data["top"][i] + data["height"][i]) // scale for i in idxs]
        region = [mon_left + min(xs) - pad, mon_top + min(ys) - pad,
                  (max(rights) - min(xs)) + 2 * pad, (max(bots) - min(ys)) + 2 * pad]
        if best is None or cap > best[0]:   # ore hold cap is the big number
            best = (cap, region)

    if best is None:
        return None
    cap, region = best
    l, t, w, h = region
    # cp may be None (see tcmd above); there is nothing to save into then.
    if save and cp is not None:
        save_region(cp, l, t, w, h)
    print(f"[ocr] auto-detected ore-hold readout -> region={l},{t},{w},{h} "
          f"(cap~{cap:,})")
    return region
def auto_region(cp):
    """Locate the ore-hold 'X / Y m³' readout inside the EVE window capture.

    Returns a window-relative region, or None when the window cannot be captured
    or no readout is detected. The Ore Hold window must be open in game.
    """
    shot = capture_window()
    if shot is None:
        return None
    # Double the size first so the small m³ text is readable by OCR.
    doubled = shot.resize((shot.width * 2, shot.height * 2))
    # Origin (0, 0): the result stays relative to the window, not the screen.
    return _detect_readout(cp, doubled, 0, 0, scale=2)
def run_ocr(cp):
# OCR mode main loop: every poll_secs, grab the saved region, OCR it, parse
# "cur / cap", then (a) publish the fill rate, (b) optionally update the live
# Discord status, (c) raise "mining stopped?" and "hold full" alerts.
# Never returns; errors inside one tick are printed and the loop continues.
# NOTE(review): leading indentation was lost in this copy of the file, so the
# nesting of a few statements below cannot be confirmed from here; those spots
# carry their own NOTE(review).
import pytesseract
tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
if tcmd:
pytesseract.pytesseract.tesseract_cmd = tcmd
# [watcher] settings, each with a built-in default
poll = cp.getint("watcher", "poll_secs", fallback=10)
alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0)
reset_pct = cp.getfloat("watcher", "reset_pct", fallback=50.0)
cooldown = cp.getint("watcher", "cooldown_secs", fallback=120)
# stall = hold not growing -> lasers/drones stopped (depleted rock, idle drones)
stall_secs = cp.getint("watcher", "stall_secs", fallback=75)
status_secs = cp.getint("watcher", "status_secs", fallback=90) # live Discord readout
# The "X/Y m³" text vanishes when items are selected/compressed, so OCR can lock
# onto the wrong gauge (e.g. hull) and post a bogus low %. Until the visual fill-bar
# read is in, only post the hold % to Discord when the read is *trusted* — i.e. we've
# actually watched it grow this session. Off by default; flip [watcher] post_hold=true
# once the fill-bar method proves out.
# NOTE(review): post_hold also gates both notify() calls further down, so with the
# default (False) neither the stall alert nor the hold-full alert can fire —
# confirm that this is intended.
post_hold = cp.getboolean("watcher", "post_hold", fallback=False)
ever_grew = False
# Region: use the saved [ocr] region if present, otherwise auto-detect and keep
# retrying (at least 15 s apart) until the readout is found.
region_s = cp.get("ocr", "region", fallback="").strip()
if region_s:
region = [int(x) for x in region_s.split(",")]
else:
print("[ocr] no saved region — auto-detecting the Ore Hold readout on screen...")
region = auto_region(cp)
while region is None:
print(f"[ocr] not visible yet — open your in-game Ore Hold window. "
f"Retrying in {max(poll, 15)}s...")
time.sleep(max(poll, 15))
region = auto_region(cp)
print(f"[ocr] watching region={region} every {poll}s; "
f"alert>={alert_pct}% reset<{reset_pct}% stall>{stall_secs}s")
# Loop state
armed = True  # hold-full alert may fire; cleared after firing, set again below reset_pct
last_alert = 0.0  # time of the last hold-full alert
last_cur = -1  # previous reading; -1 = no reading yet
last_grow = time.time()  # last time the reading increased
last_stall_alert = 0.0  # time of the last stall alert
misses = 0  # failed parses since the last good reading
samples = [] # recent (t, cur) for the live fill-rate estimate
rate_m3min = 0.0
last_status = 0.0  # time of the last live Discord status update
was_mining = False  # bot "mining" flag as seen on an earlier tick
while True:
try:
img = grab_region(region)
# --psm 7: Tesseract page-segmentation mode "single text line"
text = pytesseract.image_to_string(img, config="--psm 7")
parsed = parse_orehold_text(text)
if parsed:
misses = 0
cur, cap = parsed
pct = 100.0 * cur / cap
print(f"[ocr] {cur}/{cap} m3 ({pct:.1f}%) armed={armed}")
# --- live fill rate (m3/min) over a ~90s window -> .mining_rate ---
now = time.time()
samples.append((now, cur))
# keep samples from the last 90 s whose value does not exceed the current
# reading (so readings from before an unload drop out)
samples = [(t, c) for (t, c) in samples if now - t <= 90 and c <= cur]
# a rate is only computed once the kept samples span at least 20 s
if len(samples) >= 2 and (samples[-1][0] - samples[0][0]) >= 20:
dc = samples[-1][1] - samples[0][1]
dt = samples[-1][0] - samples[0][0]
rate_m3min = dc / dt * 60 if dc > 0 else 0.0
# NOTE(review): write_rate's docstring says "Written every OCR tick", so this
# call presumably sits outside the `if` above — confirm against the original.
write_rate(rate_m3min, cur, cap)
# --- live status feed to Discord during a mining session ---
# trust the read only once we've watched the hold actually grow; a static
# low % that never moves is the wrong-gauge / selection-summary misread.
mining = bot_mining(cp)
# post when the session has just started, or every status_secs after that
if mining and post_hold and ever_grew and not bot_muted(cp) and \
(not was_mining or time.time() - last_status >= status_secs):
free = cap - cur
if rate_m3min > 0:
# projected unix time at which the hold is full at the current rate
full_at = int(time.time() + free / rate_m3min * 60)
tail = f"filling {rate_m3min:,.0f} m³/min · full <t:{full_at}:R>"
else:
tail = "not growing yet — lasers/drones idle?"
act = " · 🟠 **COMPRESS NOW**" if pct >= alert_pct else \
(" · compress soon" if pct >= 80 else "")
_discord_live(cp, "hold", f"⛏️ {socket.gethostname()} ore hold",
f"{pct:.0f}% ({cur:,}/{cap:,} m³) · {tail}{act}")
last_status = time.time()
# NOTE(review): presumably updated on every good reading, not only after a
# status post — confirm.
was_mining = mining
# re-arm the hold-full alert once the hold has dropped below reset_pct
if pct < reset_pct:
armed = True
# --- still mining? (hold should be growing) ---
if cur > last_cur:
last_grow = time.time()
# the first reading (last_cur == -1) does not count as growth
if last_cur >= 0:
ever_grew = True
# NOTE(review): presumably runs on every good reading (outside the `if`), so a
# drop after unloading is tracked too — confirm.
last_cur = cur
# stall alert: no growth for stall_secs while below (alert_pct - 1), rate-limited
# by cooldown
if post_hold and ever_grew and pct < alert_pct - 1 \
and time.time() - last_grow > stall_secs \
and time.time() - last_stall_alert > cooldown:
notify(cp, "Mining stopped?",
f"Hold hasn't grown in {stall_secs}s at {pct:.0f}% — rock "
f"depleted, drones idle, or lasers offlined? Check.",
priority="high", tags="warning")
last_stall_alert = time.time()
# --- hold full ---
# fires once per arm cycle and respects the cooldown
if post_hold and ever_grew and armed and pct >= alert_pct and \
time.time() - last_alert > cooldown:
notify(cp, "Hold full — compress",
f"Hold at {pct:.0f}% ({cur:,}/{cap:,} m3). "
f"Compress / unload / swap.")
armed = False
last_alert = time.time()
else:
misses += 1
print(f"[ocr] no reading (text={text!r})")
# window moved / closed? after ~2 min of misses, re-find it on screen
# (every 12th miss; that is ~2 min only at the default poll_secs of 10)
if misses % 12 == 0:
print("[ocr] readout lost — re-scanning screen for the Ore Hold...")
new = auto_region(cp)
if new:
region = new
except Exception as e:
# one bad tick (capture, OCR or network error) must not stop the watcher
print(f"[ocr] error: {e}")
time.sleep(poll)
# --------------------------------------------------------------------------- #
# Timer mode
# --------------------------------------------------------------------------- #
def run_timer(cp):
    """Timer mode: estimate the fill time from hold size and yield, then alert.

    Reads ``[timer] ore_hold_m3`` and ``[timer] yield_m3_per_min`` plus
    ``[watcher] alert_pct``. Each run waits for Enter, counts down (refreshing
    the remaining time every 5 s) and then sends one alert. Repeats forever.
    Exits the process when no positive yield is configured.
    """
    hold = cp.getfloat("timer", "ore_hold_m3", fallback=22000.0)
    yield_pm = cp.getfloat("timer", "yield_m3_per_min", fallback=0.0)
    alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0)
    if yield_pm <= 0:
        sys.exit("Set [timer] yield_m3_per_min in config.ini (your m3/min).")

    target = hold * alert_pct / 100.0    # m3 in the hold when the alert is due
    secs = target / yield_pm * 60.0      # seconds of mining needed to get there
    prompt = (f"\nPress Enter when lasers go hot "
              f"(will ping in {secs/60:.1f} min at {alert_pct:.0f}% / "
              f"{target:,.0f} m3)... ")
    while True:
        input(prompt)
        begun = time.time()
        while time.time() - begun < secs:
            left = secs - (time.time() - begun)
            # "\r" keeps the countdown on a single console line
            print(f" [timer] {left/60:5.1f} min to full", end="\r")
            time.sleep(5)
        notify(cp, "Retriever ore hold full (est.)",
               f"~{alert_pct:.0f}% full ({target:,.0f} m3 at "
               f"{yield_pm:.0f} m3/min). Check & unload.")
        print("\n[timer] alert fired. Loop again for the next run.")
# --------------------------------------------------------------------------- #
# Region snip helper (Tkinter drag-to-select)
# --------------------------------------------------------------------------- #
def run_snip(cp):
    """--snip: let the user draw the OCR capture box and save it to config.ini.

    Delegates the drag-select overlay to snip_region() instead of carrying a
    second copy of the same Tkinter code. A cancelled selection (Esc) or an
    empty one (click without dragging) saves nothing, because a zero-area
    region can never be OCR'd.

    NOTE(review): the saved numbers are screen coordinates, while grab_region()
    applies a saved region to the window capture as window-relative. The two
    only agree when the EVE client area starts at the screen origin — confirm
    this matches how --snip is used.
    """
    region = snip_region(
        "Drag a tight box around the '12,345 / 22,000 m3' text. Esc to cancel.")
    if region is None:
        print("Cancelled, nothing saved.")
        return
    l, t, w, h = region
    if w <= 0 or h <= 0:
        print("Empty selection (click without drag), nothing saved.")
        return
    save_region(cp, l, t, w, h)
    print(f"Saved region={l},{t},{w},{h} to config.ini")
# --------------------------------------------------------------------------- #
def _read_hold_once(cp):
    """--read: OCR the ore hold a single time and print cur/cap/% (or why not)."""
    import pytesseract
    tesseract_path = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tesseract_path:
        pytesseract.pytesseract.tesseract_cmd = tesseract_path

    saved = cp.get("ocr", "region", fallback="").strip()
    if saved:
        region = [int(part) for part in saved.split(",")]
    else:
        region = auto_region(cp)
    if not region:
        print("READ none — Ore Hold readout not visible (open your Ore Hold window)")
        return

    crop = grab_region(region)
    crop = crop.resize((crop.width * 2, crop.height * 2))
    raw = pytesseract.image_to_string(crop, config="--psm 7")
    reading = parse_orehold_text(raw)
    if not reading:
        print(f"READ fail — raw OCR={raw!r}")
        return
    cur, cap = reading
    print(f"READ {cur}/{cap} m3 ({100.0*cur/cap:.1f}%)")


def main():
    """Entry point: handle --snip / --test / --read, else run the configured mode."""
    cp = load_config()
    argv = sys.argv

    if "--snip" in argv:
        run_snip(cp)
        return
    if "--test" in argv:
        notify(cp, "Eve watcher test",
               "If you got this on phone + toast, delivery works.")
        return
    if "--read" in argv:
        _read_hold_once(cp)
        return

    mode = cp.get("watcher", "mode", fallback="ocr").strip().lower()
    print(f"=== eve_orehold_watcher starting (mode={mode}) ===")
    heartbeat(cp, "orehold")
    if mode == "timer":
        run_timer(cp)
    elif mode == "ocr":
        run_ocr(cp)
    else:
        sys.exit(f"Unknown mode '{mode}' (use 'ocr' or 'timer')")


if __name__ == "__main__":
    main()