574 lines
23 KiB
Python
574 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
eve_orehold_watcher.py - Ping me when my Retriever's ore hold is full.
|
|
|
|
Runs on Goliath (Windows). Two detection modes (set in config.ini):
|
|
|
|
mode = ocr Reads the in-game "Ore Hold" inventory window every POLL_SECS
|
|
via screen capture + OCR, parses "<current> / <capacity> m3",
|
|
and alerts when fill% >= ALERT_PCT. Most accurate. Requires
|
|
Tesseract-OCR installed and a one-time region calibration
|
|
(run with --snip to draw the box).
|
|
|
|
mode = timer No screen reading. You give it your ore-hold size and effective
|
|
yield (m3/min); it counts down from when you press Enter and
|
|
alerts at the projected fill time. Dead reliable, zero setup.
|
|
Only blind spot: a rock depleting earlier than estimated.
|
|
|
|
Both modes deliver to:
|
|
- ntfy (push to your phone; reuses the same ntfy app you already have)
|
|
- Windows toast (on-screen on Goliath, so you see it without alt-tabbing)
- Discord webhook (posts the same alert to the shared fleet channel)
|
|
|
|
Alerts re-arm automatically (OCR: when the hold drops back below RESET_PCT,
|
|
e.g. after you unload; timer: when you start a new run) and respect a cooldown.
|
|
|
|
Usage:
|
|
python eve_orehold_watcher.py # run the watcher (mode from config)
|
|
python eve_orehold_watcher.py --snip # draw the OCR capture region, save it
|
|
python eve_orehold_watcher.py --test # fire one test alert and exit
|
|
"""
|
|
|
|
import configparser
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
|
|
# Stop child processes (Tesseract on every OCR scan, etc.) from flashing a console
|
|
# window. Patches subprocess so every spawned process gets CREATE_NO_WINDOW on Windows.
|
|
# All watchers import this module, so this covers the whole fleet.
|
|
if os.name == "nt":
    # Win32 process-creation flag: do not allocate a console window for the child.
    _CREATE_NO_WINDOW = 0x08000000
    _orig_popen_init = subprocess.Popen.__init__

    def _popen_no_window(self, *args, **kwargs):
        """Popen.__init__ replacement that always ORs in CREATE_NO_WINDOW.

        Any creationflags the caller passed by keyword are kept; the extra bit
        is merged in before delegating to the original initializer.
        """
        requested = kwargs.get("creationflags", 0)
        kwargs["creationflags"] = requested | _CREATE_NO_WINDOW
        _orig_popen_init(self, *args, **kwargs)

    subprocess.Popen.__init__ = _popen_no_window
|
|
|
|
# Directory this script lives in; config.ini and the watcher's state files sit beside it.
HERE = os.path.dirname(os.path.abspath(__file__))
|
|
# Full path of the config file read by load_config() and rewritten by save_region().
CONFIG_PATH = os.path.join(HERE, "config.ini")
|
|
|
|
|
|
def heartbeat(cp, source):
    """Print a local '<source> online' log line; nothing is sent to Discord.

    'Watcher online' pings were non-information spam in the channel, and a
    watcher being up is evident from its real alerts/status, so this is kept
    as a console line only. `cp` is accepted but not used.
    """
    print("[{}] online".format(source))
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Config
|
|
# --------------------------------------------------------------------------- #
|
|
def load_config():
    """Read config.ini from CONFIG_PATH and return the ConfigParser.

    Exits the process with an explanatory message when the file is missing.
    """
    if os.path.exists(CONFIG_PATH):
        # inline_comment_prefixes: strip ' ; comment' after values, otherwise
        # getint() crashes on lines like "poll_secs = 10 ; how often"
        # (config.ini.example has these).
        parser = configparser.ConfigParser(inline_comment_prefixes=(";",))
        parser.read(CONFIG_PATH, encoding="utf-8-sig")  # tolerate a UTF-8 BOM
        return parser
    sys.exit(f"No config.ini found. Copy config.ini.example -> config.ini "
             f"and edit it.\n(looked in {CONFIG_PATH})")
|
|
|
|
|
|
def save_region(cp, left, top, width, height):
    """Store the OCR capture region in `cp` and rewrite config.ini.

    The region is saved as "[ocr] region = left,top,width,height", creating the
    [ocr] section if needed, and the whole parser is written back to
    CONFIG_PATH. ConfigParser.write() does not preserve comments, so comments
    in the existing file are lost on rewrite.

    The file is written as UTF-8 to match how load_config() reads it. With the
    platform default encoding (cp1252 on many Windows installs) a non-ASCII
    value elsewhere in the config could raise mid-write after the file had
    already been truncated.
    """
    if not cp.has_section("ocr"):
        cp.add_section("ocr")
    cp["ocr"]["region"] = f"{left},{top},{width},{height}"
    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        cp.write(f)
|
|
|
|
|
|
def snip_region(prompt="Drag a box around the area to watch. Esc to cancel."):
    """Full-screen drag-select; returns (left, top, width, height) or None.

    Shows a translucent full-screen overlay, lets the user drag a rectangle,
    and returns it in absolute screen coordinates. Returns None when the user
    presses Esc. Reusable by any watcher that needs a one-time region (survey
    scanner, Local window, ...).

    A plain click (zero width or height) is ignored and the overlay stays open
    so the user can try again; a zero-area region cannot be screen-captured.
    """
    import tkinter as tk
    coords = {}
    root = tk.Tk()
    root.attributes("-fullscreen", True)
    root.attributes("-alpha", 0.25)
    root.configure(bg="black")
    canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0)
    canvas.pack(fill=tk.BOTH, expand=True)
    rect = {"id": None, "x0": 0, "y0": 0}

    def on_press(e):
        if rect["id"] is not None:
            canvas.delete(rect["id"])  # stale box from an earlier attempt
        # Remember the anchor in screen coordinates; draw in canvas coordinates.
        rect["x0"], rect["y0"] = e.x_root, e.y_root
        rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, outline="red", width=2)

    def on_drag(e):
        if rect["id"] is None:
            return
        canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(),
                      rect["y0"] - root.winfo_rooty(), e.x, e.y)

    def on_release(e):
        if rect["id"] is None:
            return
        x0, y0, x1, y1 = rect["x0"], rect["y0"], e.x_root, e.y_root
        width, height = abs(x1 - x0), abs(y1 - y0)
        if width == 0 or height == 0:
            # Click without a drag: discard it and wait for a real selection.
            canvas.delete(rect["id"])
            rect["id"] = None
            return
        coords["r"] = (min(x0, x1), min(y0, y1), width, height)
        root.destroy()

    canvas.bind("<ButtonPress-1>", on_press)
    canvas.bind("<B1-Motion>", on_drag)
    canvas.bind("<ButtonRelease-1>", on_release)
    root.bind("<Escape>", lambda e: root.destroy())
    print(prompt)
    root.mainloop()
    return coords.get("r")
|
|
|
|
|
|
# JSON file that write_rate() refreshes so the rock watcher can read the live fill rate.
RATE_PATH = os.path.join(HERE, ".mining_rate")
|
|
|
|
|
|
def write_rate(m3_per_min, cur, cap):
    """Publish the live ore-hold fill rate so the rock watcher can estimate how
    long until the asteroid you're on is depleted. Written every OCR tick.

    Writes {"rate_m3min", "cur", "cap", "ts"} as JSON to RATE_PATH. The data is
    written to a sibling temp file and moved into place with os.replace(), so a
    reader never sees a half-written file. This is best-effort: a failure is
    logged and swallowed so it cannot take down the watcher loop.
    """
    tmp_path = RATE_PATH + ".tmp"
    try:
        payload = {"rate_m3min": round(m3_per_min, 1), "cur": cur, "cap": cap,
                   "ts": int(time.time())}
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f)
        os.replace(tmp_path, RATE_PATH)
    except (OSError, TypeError, ValueError) as e:
        print(f"[rate] could not write {RATE_PATH}: {e}")
        try:
            os.remove(tmp_path)  # don't leave a stray temp file behind
        except OSError:
            pass
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Delivery
|
|
# --------------------------------------------------------------------------- #
|
|
# Cached copy of the bot-published alert state; "t" is the time of the last fetch attempt.
_BOT_STATE = {"t": 0, "muted": False, "mining": False}


DEFAULT_STATE_URL = ("https://git.armoredarmadillo.com/brockdarnold/eve-watcher/"
                     "raw/branch/main/alert_state.json")


def _bot_state(cp):
    """Poll the shared alert-state the Discord bot publishes (mute/quiet/mining),
    cached 30s, so a Discord `!mute` / `!mining` also drives the local watchers.

    The URL comes from [coordination] bot_state_url and defaults to the public
    alert_state.json (no config needed). Returns the module-level cache dict
    with keys "t", "muted" and "mining".

    A failed fetch keeps the last known flags and still refreshes the cache
    timestamp. Without that, every call during an outage would block on the
    5-second timeout again, stalling the watcher loop on each tick.
    """
    if time.time() - _BOT_STATE["t"] < 30:
        return _BOT_STATE
    configured = cp.get("coordination", "bot_state_url", fallback="").strip()
    url = configured or DEFAULT_STATE_URL
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "eve-watcher"})
        with urllib.request.urlopen(req, timeout=5) as resp:
            d = json.loads(resp.read())
        _BOT_STATE["muted"] = bool(d.get("muted"))
        _BOT_STATE["mining"] = bool(d.get("mining"))
    except Exception as e:
        print(f"[bot-state] fetch failed, keeping last known state: {e}")
    finally:
        # Stamp the attempt, success or not, so retries are rate-limited to the cache window.
        _BOT_STATE["t"] = time.time()
    return _BOT_STATE
|
|
|
|
|
|
def bot_muted(cp):
    """Return the "muted" flag from the shared bot alert state."""
    state = _bot_state(cp)
    return state["muted"]
|
|
|
|
|
|
def bot_mining(cp):
    """True when a mining session is active (started via `!mining on` in Discord)."""
    state = _bot_state(cp)
    return state["mining"]
|
|
|
|
|
|
def notify(cp, title, message, priority="high", tags="rock,bell"):
    """Fan out to ntfy (phone), a Windows toast, and a Discord channel.

    Nothing is delivered while the bot reports the fleet as muted; a
    "[muted by bot]" line is printed instead.
    """
    if not bot_muted(cp):
        _ntfy(cp, title, message, priority, tags)
        _toast(title, message)
        _discord(cp, title, message)
        return
    print("[muted by bot]")
|
|
|
|
|
|
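# Shared fleet-channel webhook — fallback so alerts reach Discord even if an older
# config.ini has no webhook set (config is gitignored, so updates can't fill it).
# SECURITY: a webhook URL is a credential — anyone who can read this source can post
# to the channel. Rotate it if the file is ever shared; prefer [discord] webhook in config.ini.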
|
|
DEFAULT_WEBHOOK = ("https://discord.com/api/webhooks/1515603432583598172/"
|
|
"7g2A9Lfg1afbZGoxBENu9TpxSxE4zfpg16nRqE08qzyI3a0uttADL6wyJ2ERHRfsHlK9")
|
|
|
|
|
|
def _discord(cp, title, message):
    """Post to a Discord channel via webhook (shared with fleetmates).

    Uses [discord] webhook when set, otherwise DEFAULT_WEBHOOK. An optional
    [discord] mention is prefixed to the message. Failures are printed, never
    raised.
    """
    configured = cp.get("discord", "webhook", fallback="").strip()
    webhook = configured if configured else DEFAULT_WEBHOOK
    if not webhook:
        return
    mention = cp.get("discord", "mention", fallback="").strip()  # e.g. <@USERID> or @here
    content = f"{mention} **{title}**\n{message}".strip()
    payload = json.dumps({"content": content})
    try:
        # Discord 403s a missing/blank User-Agent — must set one
        request_headers = {"Content-Type": "application/json", "User-Agent": "eve-watcher"}
        req = urllib.request.Request(webhook, data=payload.encode("utf-8"),
                                     headers=request_headers, method="POST")
        with urllib.request.urlopen(req, timeout=10) as resp:
            resp.read()
        print("[discord] sent")
    except Exception as e:
        print(f"[discord] FAILED: {e}")
|
|
|
|
|
|
def _discord_live(cp, key, title, message):
    """Post-or-EDIT a single 'live' message so status updates IN PLACE (no spam).

    Stores the message id per key in .live_<key> next to the script. Skips when
    muted. When an id is stored, the message is edited with PATCH. A fresh
    message is posted only when no id is stored or Discord answers 404 (the
    message was deleted). Any other edit failure (timeout, rate limit, server
    error) is logged and the update is skipped, because reposting on a
    transient error would leave duplicate status messages in the channel.
    """
    import urllib.error

    if bot_muted(cp):
        return
    webhook = cp.get("discord", "webhook", fallback="").strip() or DEFAULT_WEBHOOK
    if not webhook:
        return
    idfile = os.path.join(HERE, f".live_{key}")
    body = json.dumps({"content": f"**{title}**\n{message}"}).encode("utf-8")
    hdr = {"Content-Type": "application/json", "User-Agent": "eve-watcher"}

    mid = None
    try:
        with open(idfile, encoding="utf-8") as f:
            mid = f.read().strip() or None
    except (OSError, ValueError):
        pass  # no id stored yet (or unreadable) -> post a new message

    if mid:  # try to edit the existing message
        try:
            req = urllib.request.Request(f"{webhook}/messages/{mid}", data=body,
                                         headers=hdr, method="PATCH")
            with urllib.request.urlopen(req, timeout=10) as r:
                r.read()
            return
        except urllib.error.HTTPError as e:
            if e.code != 404:
                print(f"[discord-live] {e}")
                return
            # 404: message gone -> repost below
        except Exception as e:
            print(f"[discord-live] {e}")
            return

    try:
        # wait=true makes Discord return the created message, including its id
        req = urllib.request.Request(f"{webhook}?wait=true", data=body,
                                     headers=hdr, method="POST")
        with urllib.request.urlopen(req, timeout=10) as r:
            resp = json.loads(r.read())
        with open(idfile, "w", encoding="utf-8") as f:
            f.write(str(resp.get("id", "")))
    except Exception as e:
        print(f"[discord-live] {e}")
|
|
|
|
|
|
def _ntfy_header(value):
    """Return `value` as an HTTP-header-safe string, RFC 2047 encoding non-ASCII text."""
    import base64
    text = str(value)
    try:
        text.encode("ascii")
        return text
    except UnicodeEncodeError:
        encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
        return "=?UTF-8?B?" + encoded + "?="


def _ntfy(cp, title, message, priority, tags):
    """Push a notification to the ntfy topic configured in [ntfy].

    Reads server (default https://ntfy.sh), topic and optional auth
    ("user:pass", sent as HTTP Basic). Does nothing but log when no topic is
    set. Failures are printed, never raised.

    Title/Priority/Tags travel as HTTP headers, which http.client encodes as
    latin-1. A title such as "Hold full — compress" (em dash) would raise
    UnicodeEncodeError and the push would never be sent, so non-ASCII header
    values are sent as RFC 2047 encoded-words, which ntfy decodes.
    """
    server = cp.get("ntfy", "server", fallback="https://ntfy.sh").rstrip("/")
    topic = cp.get("ntfy", "topic", fallback="").strip()
    if not topic:
        print("[ntfy] no topic set, skipping push")
        return
    url = f"{server}/{topic}"
    headers = {
        "Title": _ntfy_header(title),
        "Priority": _ntfy_header(priority),
        "Tags": _ntfy_header(tags),
    }
    auth = cp.get("ntfy", "auth", fallback="").strip()  # "user:pass" for self-hosted
    if auth:
        import base64
        headers["Authorization"] = "Basic " + base64.b64encode(
            auth.encode()).decode()
    try:
        req = urllib.request.Request(url, data=message.encode("utf-8"),
                                     headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=10) as r:
            r.read()
        print(f"[ntfy] sent -> {url}")
    except Exception as e:
        print(f"[ntfy] FAILED: {e}")
|
|
|
|
|
|
def _toast(title, message):
    """Show a Windows toast via winotify; any failure is printed, never raised."""
    try:
        from winotify import Notification, audio
        options = {"app_id": "Eve Ore Watcher", "title": title, "msg": message,
                   "duration": "long"}
        popup = Notification(**options)
        popup.set_audio(audio.Default, loop=False)
        popup.show()
        print("[toast] shown")
    except Exception as e:
        print(f"[toast] FAILED (is 'winotify' installed?): {e}")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# OCR mode
|
|
# --------------------------------------------------------------------------- #
|
|
def parse_orehold_text(text):
    """Pull (current, capacity) m3 out of OCR text like '12,345 / 22,000 m3'.

    Commas, periods and spaces are stripped first, then the first
    '<2-7 digits>/<2-7 digits>' pair is taken. Returns None when no pair is
    found, when capacity is zero, or when current exceeds capacity by more
    than 20%.
    """
    squeezed = "".join(ch for ch in text if ch not in ",. ")
    match = re.search(r"(\d{2,7})/(\d{2,7})", squeezed)
    if match is None:
        return None
    current, capacity = (int(group) for group in match.groups())
    plausible = capacity > 0 and current <= capacity * 1.2
    return (current, capacity) if plausible else None
|
|
|
|
|
|
def grab_region(region):
    """Screen-capture `region` (left, top, width, height) and return it as an RGB PIL image."""
    import mss
    from PIL import Image
    left, top, width, height = region
    bbox = {"left": left, "top": top, "width": width, "height": height}
    with mss.mss() as grabber:
        shot = grabber.grab(bbox)
    # mss hands back BGRA bytes; "BGRX" reads them as RGB and skips the 4th byte.
    return Image.frombytes("RGB", shot.size, shot.bgra, "raw", "BGRX")
|
|
|
|
|
|
def _detect_readout(cp, img, mon_left=0, mon_top=0, save=True, scale=1):
    """Find the ore-hold 'cur / cap m3' readout inside a screenshot.

    Runs Tesseract word detection on `img`, regroups the words into text lines,
    and keeps lines that parse as a readout with a capacity between 500 and
    2,000,000. Among those, the line with the largest capacity wins. Returns
    that line's bounding box as an absolute-screen region [l, t, w, h], padded
    by 8 px per side, or None when nothing matched. With `save` true the region
    is also written to the config.

    `mon_left` / `mon_top` are the screen offsets of the image's top-left
    corner. `scale` is how much `img` was upscaled vs the screen; coordinates
    are divided back by it. Split out so it's unit-testable.
    """
    import pytesseract
    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip() if cp else ""
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd
    data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)

    # Bucket the indices of non-blank words by (block, paragraph, line).
    words_by_line = {}
    for idx, word in enumerate(data["text"]):
        if word.strip():
            line_key = (data["block_num"][idx], data["par_num"][idx],
                        data["line_num"][idx])
            words_by_line.setdefault(line_key, []).append(idx)

    pad = 8
    best_cap, best_region = None, None
    for members in words_by_line.values():
        parsed = parse_orehold_text(" ".join(data["text"][i] for i in members))
        if not parsed:
            continue
        cap = parsed[1]
        if cap < 500 or cap > 2_000_000:  # not a plausible ore/gas hold capacity
            continue
        x_min = min(data["left"][i] // scale for i in members)
        y_min = min(data["top"][i] // scale for i in members)
        x_max = max((data["left"][i] + data["width"][i]) // scale for i in members)
        y_max = max((data["top"][i] + data["height"][i]) // scale for i in members)
        candidate = [mon_left + x_min - pad, mon_top + y_min - pad,
                     (x_max - x_min) + 2 * pad, (y_max - y_min) + 2 * pad]
        if best_region is None or cap > best_cap:  # ore hold cap is the big number
            best_cap, best_region = cap, candidate

    if best_region is None:
        return None
    l, t, w, h = best_region
    if save:
        save_region(cp, l, t, w, h)
    print(f"[ocr] auto-detected ore-hold readout -> region={l},{t},{w},{h} "
          f"(cap~{best_cap:,})")
    return best_region
|
|
|
|
|
|
def auto_region(cp):
    """Scan the whole screen for the ore-hold readout and save its location —
    replaces the manual --snip. Just have the in-game Ore Hold window open.

    Returns the detected region [l, t, w, h], or None when the capture fails
    or no readout is found.
    """
    import mss
    from PIL import Image
    try:
        with mss.mss() as grabber:
            desktop = grabber.monitors[0]  # full virtual desktop (all screens)
            shot = grabber.grab(desktop)
            frame = Image.frombytes("RGB", shot.size, shot.bgra, "raw", "BGRX")
        # crop the center band (where EVE sits) then upscale, so small m³ text is readable
        full_w, full_h = frame.size
        left_cut, top_cut = int(full_w * 0.22), 0
        band = frame.crop((left_cut, top_cut, int(full_w * 0.82), int(full_h * 0.97)))
        enlarged = band.resize((band.width * 2, band.height * 2))
    except Exception as e:
        print(f"[ocr] auto-detect error: {e}")
        return None
    origin_x = desktop["left"] + left_cut
    origin_y = desktop["top"] + top_cut
    return _detect_readout(cp, enlarged, origin_x, origin_y, scale=2)
|
|
|
|
|
|
def _parse_saved_region(region_s):
    """Parse a config 'left,top,width,height' string; None if blank, malformed or zero-area."""
    try:
        parts = [int(x) for x in region_s.split(",")]
    except ValueError:
        return None
    if len(parts) != 4 or parts[2] <= 0 or parts[3] <= 0:
        return None
    return parts


def run_ocr(cp):
    """OCR mode: poll the ore-hold readout forever and raise alerts.

    Every `poll_secs` the saved screen region is captured, OCR'd and parsed
    into (current, capacity). From those readings the loop:
      * estimates the live fill rate over a ~90s window and publishes it via
        write_rate();
      * during a bot-declared mining session, keeps one Discord status message
        updated every `status_secs`;
      * sends "Mining stopped?" when the hold has not grown for `stall_secs`;
      * sends "Hold full" at `alert_pct`, then stays disarmed until the hold
        drops below `reset_pct`. Both alerts respect `cooldown_secs`.

    The region comes from [ocr] region. A blank, malformed or zero-area value
    falls back to on-screen auto-detection instead of crashing at startup or
    failing on every tick. After 12 consecutive failed ticks, whether unreadable
    text or a capture/OCR error, the screen is re-scanned for the readout.
    Never returns.
    """
    import pytesseract
    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd

    poll = cp.getint("watcher", "poll_secs", fallback=10)
    alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0)
    reset_pct = cp.getfloat("watcher", "reset_pct", fallback=50.0)
    cooldown = cp.getint("watcher", "cooldown_secs", fallback=120)
    # stall = hold not growing -> lasers/drones stopped (depleted rock, idle drones)
    stall_secs = cp.getint("watcher", "stall_secs", fallback=75)
    status_secs = cp.getint("watcher", "status_secs", fallback=90)  # live Discord readout

    region_s = cp.get("ocr", "region", fallback="").strip()
    region = _parse_saved_region(region_s)
    if region is None:
        if region_s:
            print(f"[ocr] saved region {region_s!r} is unusable — "
                  f"auto-detecting the Ore Hold readout on screen...")
        else:
            print("[ocr] no saved region — auto-detecting the Ore Hold readout on screen...")
        region = auto_region(cp)
        while region is None:
            print(f"[ocr] not visible yet — open your in-game Ore Hold window. "
                  f"Retrying in {max(poll, 15)}s...")
            time.sleep(max(poll, 15))
            region = auto_region(cp)

    print(f"[ocr] watching region={region} every {poll}s; "
          f"alert>={alert_pct}% reset<{reset_pct}% stall>{stall_secs}s")
    armed = True              # False after a full alert until the hold drops below reset_pct
    last_alert = 0.0
    last_cur = -1
    last_grow = time.time()   # last time the reading increased
    last_stall_alert = 0.0
    misses = 0                # consecutive ticks without a usable reading
    samples = []              # recent (t, cur) for the live fill-rate estimate
    rate_m3min = 0.0
    last_status = 0.0
    was_mining = False
    while True:
        try:
            img = grab_region(region)
            text = pytesseract.image_to_string(img, config="--psm 7")
            parsed = parse_orehold_text(text)
            if parsed:
                misses = 0
                cur, cap = parsed
                pct = 100.0 * cur / cap
                print(f"[ocr] {cur}/{cap} m3 ({pct:.1f}%) armed={armed}")
                # --- live fill rate (m3/min) over a ~90s window -> .mining_rate ---
                now = time.time()
                samples.append((now, cur))
                # Drop old samples and any above the current reading (unload / misread).
                samples = [(t, c) for (t, c) in samples if now - t <= 90 and c <= cur]
                if len(samples) >= 2 and (samples[-1][0] - samples[0][0]) >= 20:
                    dc = samples[-1][1] - samples[0][1]
                    dt = samples[-1][0] - samples[0][0]
                    rate_m3min = dc / dt * 60 if dc > 0 else 0.0
                write_rate(rate_m3min, cur, cap)
                # --- live status feed to Discord during a mining session ---
                mining = bot_mining(cp)
                if mining and not bot_muted(cp) and \
                        (not was_mining or time.time() - last_status >= status_secs):
                    free = cap - cur
                    if rate_m3min > 0:
                        full_at = int(time.time() + free / rate_m3min * 60)
                        tail = f"filling {rate_m3min:,.0f} m³/min · full <t:{full_at}:R>"
                    else:
                        tail = "not growing yet — lasers/drones idle?"
                    act = " · 🟠 **COMPRESS NOW**" if pct >= alert_pct else \
                        (" · compress soon" if pct >= 80 else "")
                    _discord_live(cp, "hold", f"⛏️ {socket.gethostname()} ore hold",
                                  f"{pct:.0f}% ({cur:,}/{cap:,} m³) · {tail}{act}")
                    last_status = time.time()
                was_mining = mining
                if pct < reset_pct:
                    armed = True
                # --- still mining? (hold should be growing) ---
                if cur > last_cur:
                    last_grow = time.time()
                last_cur = cur
                if pct < alert_pct - 1 and time.time() - last_grow > stall_secs \
                        and time.time() - last_stall_alert > cooldown:
                    notify(cp, "Mining stopped?",
                           f"Hold hasn't grown in {stall_secs}s at {pct:.0f}% — rock "
                           f"depleted, drones idle, or lasers offlined? Check.",
                           priority="high", tags="warning")
                    last_stall_alert = time.time()
                # --- hold full ---
                if armed and pct >= alert_pct and \
                        time.time() - last_alert > cooldown:
                    notify(cp, "Hold full — compress",
                           f"Hold at {pct:.0f}% ({cur:,}/{cap:,} m3). "
                           f"Compress / unload / swap.")
                    armed = False
                    last_alert = time.time()
            else:
                misses += 1
                print(f"[ocr] no reading (text={text!r})")
        except Exception as e:
            # A failed capture/OCR counts as a miss, so a region that can no longer
            # be grabbed eventually triggers the re-scan below.
            misses += 1
            print(f"[ocr] error: {e}")
        # window moved / closed? after ~2 min of misses, re-find it on screen
        if misses and misses % 12 == 0:
            print("[ocr] readout lost — re-scanning screen for the Ore Hold...")
            try:
                new = auto_region(cp)
            except Exception as e:
                print(f"[ocr] error: {e}")
                new = None
            if new:
                region = new
        time.sleep(poll)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Timer mode
|
|
# --------------------------------------------------------------------------- #
|
|
def run_timer(cp):
    """Timer mode: alert at the projected fill time instead of reading the screen.

    Reads [timer] ore_hold_m3 and yield_m3_per_min plus [watcher] alert_pct,
    computes how long the hold takes to reach alert_pct at that yield, then
    loops forever: wait for Enter, count down, send the alert. Exits the
    process when no positive yield is configured.
    """
    hold = cp.getfloat("timer", "ore_hold_m3", fallback=22000.0)
    yield_pm = cp.getfloat("timer", "yield_m3_per_min", fallback=0.0)
    alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0)
    if yield_pm <= 0:
        sys.exit("Set [timer] yield_m3_per_min in config.ini (your m3/min).")

    target = hold * alert_pct / 100.0   # m3 in the hold when the alert should fire
    secs = target / yield_pm * 60.0     # seconds of mining needed to get there
    while True:
        input(f"\nPress Enter when lasers go hot "
              f"(will ping in {secs/60:.1f} min at {alert_pct:.0f}% / "
              f"{target:,.0f} m3)... ")
        start = time.time()
        while True:
            elapsed = time.time() - start
            if elapsed >= secs:
                break
            remaining = secs - elapsed
            print(f" [timer] {remaining/60:5.1f} min to full", end="\r")
            time.sleep(5)
        notify(cp, "Retriever ore hold full (est.)",
               f"~{alert_pct:.0f}% full ({target:,.0f} m3 at "
               f"{yield_pm:.0f} m3/min). Check & unload.")
        print("\n[timer] alert fired. Loop again for the next run.")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Region snip helper (Tkinter drag-to-select)
|
|
# --------------------------------------------------------------------------- #
|
|
def run_snip(cp):
    """Let the user draw the OCR capture box (--snip) and save it to config.ini.

    Delegates the full-screen drag-select to snip_region() rather than keeping
    a second copy of the same Tk overlay. The selection is saved through
    save_region(). Cancelling with Esc saves nothing, and neither does a
    zero-width or zero-height box, because such a region cannot be captured.
    """
    picked = snip_region(
        "Drag a tight box around the '12,345 / 22,000 m3' text. Esc to cancel.")
    if picked is None or picked[2] <= 0 or picked[3] <= 0:
        print("Cancelled, nothing saved.")
        return
    l, t, w, h = picked
    save_region(cp, l, t, w, h)
    print(f"Saved region={l},{t},{w},{h} to config.ini")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
def main():
    """Command-line entry point: handle --snip / --test, else run the configured mode."""
    cp = load_config()
    args = sys.argv
    if "--snip" in args:
        run_snip(cp)
        return
    if "--test" in args:
        notify(cp, "Eve watcher test",
               "If you got this on phone + toast, delivery works.")
        return

    mode = cp.get("watcher", "mode", fallback="ocr").strip().lower()
    print(f"=== eve_orehold_watcher starting (mode={mode}) ===")
    heartbeat(cp, "orehold")
    runners = {"ocr": run_ocr, "timer": run_timer}
    if mode not in runners:
        sys.exit(f"Unknown mode '{mode}' (use 'ocr' or 'timer')")
    runners[mode](cp)


if __name__ == "__main__":
    main()
|