336 lines
12 KiB
Python
336 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
eve_orehold_watcher.py - Ping me when my Retriever's ore hold is full.
|
|
|
|
Runs on Goliath (Windows). Two detection modes (set in config.ini):
|
|
|
|
mode = ocr Reads the in-game "Ore Hold" inventory window every POLL_SECS
|
|
via screen capture + OCR, parses "<current> / <capacity> m3",
|
|
and alerts when fill% >= ALERT_PCT. Most accurate. Requires
|
|
Tesseract-OCR installed and a one-time region calibration
|
|
(run with --snip to draw the box).
|
|
|
|
mode = timer No screen reading. You give it your ore-hold size and effective
|
|
yield (m3/min); it counts down from when you press Enter and
|
|
alerts at the projected fill time. Dead reliable, zero setup.
|
|
Only blind spot: a rock depleting earlier than estimated.
|
|
|
|
Both modes deliver to:
|
|
- ntfy (push to your phone; reuses the same ntfy app you already have)
|
|
- Windows toast (on-screen on Goliath, so you see it without alt-tabbing)
|
|
|
|
Alerts re-arm automatically (OCR: when the hold drops back below RESET_PCT,
|
|
e.g. after you unload; timer: when you start a new run) and respect a cooldown.
|
|
|
|
Usage:
|
|
python eve_orehold_watcher.py # run the watcher (mode from config)
|
|
python eve_orehold_watcher.py --snip # draw the OCR capture region, save it
|
|
python eve_orehold_watcher.py --test # fire one test alert and exit
|
|
"""
|
|
|
|
import configparser
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
|
|
HERE = os.path.dirname(os.path.abspath(__file__))
|
|
CONFIG_PATH = os.path.join(HERE, "config.ini")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Config
|
|
# --------------------------------------------------------------------------- #
|
|
def load_config():
|
|
if not os.path.exists(CONFIG_PATH):
|
|
sys.exit(f"No config.ini found. Copy config.ini.example -> config.ini "
|
|
f"and edit it.\n(looked in {CONFIG_PATH})")
|
|
cp = configparser.ConfigParser()
|
|
cp.read(CONFIG_PATH, encoding="utf-8-sig") # tolerate a UTF-8 BOM
|
|
return cp
|
|
|
|
|
|
def save_region(cp, left, top, width, height):
|
|
if not cp.has_section("ocr"):
|
|
cp.add_section("ocr")
|
|
cp["ocr"]["region"] = f"{left},{top},{width},{height}"
|
|
with open(CONFIG_PATH, "w") as f:
|
|
cp.write(f)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Delivery
|
|
# --------------------------------------------------------------------------- #
|
|
_BOT_STATE = {"t": 0, "muted": False}
|
|
|
|
|
|
def bot_muted(cp):
|
|
"""Coordinate with the Discord bot: poll the shared alert-state it publishes, so a
|
|
`!mute` in Discord also silences these local watchers. Cached 30s."""
|
|
url = cp.get("coordination", "bot_state_url", fallback="").strip() \
|
|
if cp.has_section("coordination") else ""
|
|
if not url:
|
|
return False
|
|
if time.time() - _BOT_STATE["t"] < 30:
|
|
return _BOT_STATE["muted"]
|
|
try:
|
|
d = json.loads(urllib.request.urlopen(url, timeout=5).read())
|
|
_BOT_STATE["muted"] = bool(d.get("muted"))
|
|
_BOT_STATE["t"] = time.time()
|
|
except Exception:
|
|
pass
|
|
return _BOT_STATE["muted"]
|
|
|
|
|
|
def notify(cp, title, message, priority="high", tags="rock,bell"):
|
|
"""Fan out to ntfy (phone), a Windows toast, and a Discord channel."""
|
|
if bot_muted(cp):
|
|
print("[muted by bot]")
|
|
return
|
|
_ntfy(cp, title, message, priority, tags)
|
|
_toast(title, message)
|
|
_discord(cp, title, message)
|
|
|
|
|
|
def _discord(cp, title, message):
|
|
"""Post to a Discord channel via webhook (shared with fleetmates)."""
|
|
webhook = cp.get("discord", "webhook", fallback="").strip()
|
|
if not webhook:
|
|
return
|
|
mention = cp.get("discord", "mention", fallback="").strip() # e.g. <@USERID> or @here
|
|
payload = json.dumps({"content": f"{mention} **{title}**\n{message}".strip()})
|
|
try:
|
|
req = urllib.request.Request(
|
|
webhook, data=payload.encode("utf-8"),
|
|
headers={"Content-Type": "application/json"}, method="POST")
|
|
with urllib.request.urlopen(req, timeout=10) as r:
|
|
r.read()
|
|
print("[discord] sent")
|
|
except Exception as e:
|
|
print(f"[discord] FAILED: {e}")
|
|
|
|
|
|
def _ntfy(cp, title, message, priority, tags):
|
|
server = cp.get("ntfy", "server", fallback="https://ntfy.sh").rstrip("/")
|
|
topic = cp.get("ntfy", "topic", fallback="").strip()
|
|
if not topic:
|
|
print("[ntfy] no topic set, skipping push")
|
|
return
|
|
url = f"{server}/{topic}"
|
|
headers = {
|
|
"Title": title,
|
|
"Priority": priority,
|
|
"Tags": tags,
|
|
}
|
|
auth = cp.get("ntfy", "auth", fallback="").strip() # "user:pass" for self-hosted
|
|
if auth:
|
|
import base64
|
|
headers["Authorization"] = "Basic " + base64.b64encode(
|
|
auth.encode()).decode()
|
|
try:
|
|
req = urllib.request.Request(url, data=message.encode("utf-8"),
|
|
headers=headers, method="POST")
|
|
with urllib.request.urlopen(req, timeout=10) as r:
|
|
r.read()
|
|
print(f"[ntfy] sent -> {url}")
|
|
except Exception as e:
|
|
print(f"[ntfy] FAILED: {e}")
|
|
|
|
|
|
def _toast(title, message):
|
|
try:
|
|
from winotify import Notification, audio
|
|
t = Notification(app_id="Eve Ore Watcher", title=title, msg=message,
|
|
duration="long")
|
|
t.set_audio(audio.Default, loop=False)
|
|
t.show()
|
|
print("[toast] shown")
|
|
except Exception as e:
|
|
print(f"[toast] FAILED (is 'winotify' installed?): {e}")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# OCR mode
|
|
# --------------------------------------------------------------------------- #
|
|
def parse_orehold_text(text):
|
|
"""Pull (current, capacity) m3 out of OCR text like '12,345 / 22,000 m3'."""
|
|
cleaned = text.replace(",", "").replace(".", "").replace(" ", "")
|
|
m = re.search(r"(\d{2,7})/(\d{2,7})", cleaned)
|
|
if not m:
|
|
return None
|
|
cur, cap = int(m.group(1)), int(m.group(2))
|
|
if cap <= 0 or cur > cap * 1.2:
|
|
return None
|
|
return cur, cap
|
|
|
|
|
|
def grab_region(region):
|
|
import mss
|
|
from PIL import Image
|
|
left, top, width, height = region
|
|
with mss.mss() as sct:
|
|
raw = sct.grab({"left": left, "top": top, "width": width,
|
|
"height": height})
|
|
return Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX")
|
|
|
|
|
|
def run_ocr(cp):
|
|
import pytesseract
|
|
tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
|
|
if tcmd:
|
|
pytesseract.pytesseract.tesseract_cmd = tcmd
|
|
|
|
region_s = cp.get("ocr", "region", fallback="").strip()
|
|
if not region_s:
|
|
sys.exit("No OCR region set. Run: python eve_orehold_watcher.py --snip")
|
|
region = [int(x) for x in region_s.split(",")]
|
|
|
|
poll = cp.getint("watcher", "poll_secs", fallback=10)
|
|
alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0)
|
|
reset_pct = cp.getfloat("watcher", "reset_pct", fallback=50.0)
|
|
cooldown = cp.getint("watcher", "cooldown_secs", fallback=120)
|
|
# stall = hold not growing -> lasers/drones stopped (depleted rock, idle drones)
|
|
stall_secs = cp.getint("watcher", "stall_secs", fallback=150)
|
|
|
|
print(f"[ocr] watching region={region} every {poll}s; "
|
|
f"alert>={alert_pct}% reset<{reset_pct}% stall>{stall_secs}s")
|
|
armed = True
|
|
last_alert = 0.0
|
|
last_cur = -1
|
|
last_grow = time.time()
|
|
last_stall_alert = 0.0
|
|
while True:
|
|
try:
|
|
img = grab_region(region)
|
|
text = pytesseract.image_to_string(img, config="--psm 7")
|
|
parsed = parse_orehold_text(text)
|
|
if parsed:
|
|
cur, cap = parsed
|
|
pct = 100.0 * cur / cap
|
|
print(f"[ocr] {cur}/{cap} m3 ({pct:.1f}%) armed={armed}")
|
|
if pct < reset_pct:
|
|
armed = True
|
|
# --- still mining? (hold should be growing) ---
|
|
if cur > last_cur:
|
|
last_grow = time.time()
|
|
last_cur = cur
|
|
if pct < alert_pct - 1 and time.time() - last_grow > stall_secs \
|
|
and time.time() - last_stall_alert > cooldown:
|
|
notify(cp, "Mining stopped?",
|
|
f"Hold hasn't grown in {stall_secs}s at {pct:.0f}% — rock "
|
|
f"depleted, drones idle, or lasers offlined? Check.",
|
|
priority="high", tags="warning")
|
|
last_stall_alert = time.time()
|
|
# --- hold full ---
|
|
if armed and pct >= alert_pct and \
|
|
time.time() - last_alert > cooldown:
|
|
notify(cp, "Hold full — compress",
|
|
f"Hold at {pct:.0f}% ({cur:,}/{cap:,} m3). "
|
|
f"Compress / unload / swap.")
|
|
armed = False
|
|
last_alert = time.time()
|
|
else:
|
|
print(f"[ocr] no reading (text={text!r})")
|
|
except Exception as e:
|
|
print(f"[ocr] error: {e}")
|
|
time.sleep(poll)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Timer mode
|
|
# --------------------------------------------------------------------------- #
|
|
def run_timer(cp):
|
|
hold = cp.getfloat("timer", "ore_hold_m3", fallback=22000.0)
|
|
yield_pm = cp.getfloat("timer", "yield_m3_per_min", fallback=0.0)
|
|
alert_pct = cp.getfloat("watcher", "alert_pct", fallback=95.0)
|
|
if yield_pm <= 0:
|
|
sys.exit("Set [timer] yield_m3_per_min in config.ini (your m3/min).")
|
|
|
|
target = hold * alert_pct / 100.0
|
|
secs = target / yield_pm * 60.0
|
|
while True:
|
|
input(f"\nPress Enter when lasers go hot "
|
|
f"(will ping in {secs/60:.1f} min at {alert_pct:.0f}% / "
|
|
f"{target:,.0f} m3)... ")
|
|
start = time.time()
|
|
while time.time() - start < secs:
|
|
remaining = secs - (time.time() - start)
|
|
print(f" [timer] {remaining/60:5.1f} min to full", end="\r")
|
|
time.sleep(5)
|
|
notify(cp, "Retriever ore hold full (est.)",
|
|
f"~{alert_pct:.0f}% full ({target:,.0f} m3 at "
|
|
f"{yield_pm:.0f} m3/min). Check & unload.")
|
|
print("\n[timer] alert fired. Loop again for the next run.")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Region snip helper (Tkinter drag-to-select)
|
|
# --------------------------------------------------------------------------- #
|
|
def run_snip(cp):
|
|
import tkinter as tk
|
|
coords = {}
|
|
|
|
root = tk.Tk()
|
|
root.attributes("-fullscreen", True)
|
|
root.attributes("-alpha", 0.25)
|
|
root.configure(bg="black")
|
|
root.title("Drag a box over the Ore Hold 'cur / cap m3' text")
|
|
canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0)
|
|
canvas.pack(fill=tk.BOTH, expand=True)
|
|
rect = {"id": None, "x0": 0, "y0": 0}
|
|
|
|
def on_press(e):
|
|
rect["x0"], rect["y0"] = e.x_root, e.y_root
|
|
rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y,
|
|
outline="red", width=2)
|
|
|
|
def on_drag(e):
|
|
canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(),
|
|
rect["y0"] - root.winfo_rooty(), e.x, e.y)
|
|
|
|
def on_release(e):
|
|
x0, y0 = rect["x0"], rect["y0"]
|
|
x1, y1 = e.x_root, e.y_root
|
|
coords["region"] = (min(x0, x1), min(y0, y1),
|
|
abs(x1 - x0), abs(y1 - y0))
|
|
root.destroy()
|
|
|
|
canvas.bind("<ButtonPress-1>", on_press)
|
|
canvas.bind("<B1-Motion>", on_drag)
|
|
canvas.bind("<ButtonRelease-1>", on_release)
|
|
root.bind("<Escape>", lambda e: root.destroy())
|
|
print("Drag a tight box around the '12,345 / 22,000 m3' text. Esc to cancel.")
|
|
root.mainloop()
|
|
|
|
if "region" in coords:
|
|
l, t, w, h = coords["region"]
|
|
save_region(cp, l, t, w, h)
|
|
print(f"Saved region={l},{t},{w},{h} to config.ini")
|
|
else:
|
|
print("Cancelled, nothing saved.")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
def main():
|
|
cp = load_config()
|
|
if "--snip" in sys.argv:
|
|
run_snip(cp)
|
|
return
|
|
if "--test" in sys.argv:
|
|
notify(cp, "Eve watcher test",
|
|
"If you got this on phone + toast, delivery works.")
|
|
return
|
|
mode = cp.get("watcher", "mode", fallback="ocr").strip().lower()
|
|
print(f"=== eve_orehold_watcher starting (mode={mode}) ===")
|
|
if mode == "ocr":
|
|
run_ocr(cp)
|
|
elif mode == "timer":
|
|
run_timer(cp)
|
|
else:
|
|
sys.exit(f"Unknown mode '{mode}' (use 'ocr' or 'timer')")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|