eve-watcher/eve_rock_watcher.py

394 lines
16 KiB
Python

#!/usr/bin/env python3
"""Rock watcher — 'when do I switch asteroids?' alerts from your Survey Scanner.
EVE has no API for asteroid contents, so this reads the units remaining off the
screen: from the Selected Item panel's 'Quantity N Units' line (tried first when no
region is saved) or from the **Survey Scanner Results** window (the fallback, and the
only source here that lists several rocks at once). It tracks each rock's amount over
time and tells you, in Discord, when the rock you're mining is about to run dry — and
which rock to jump to next.
Two ways it knows your depletion rate (best available wins):
  1. Direct — if the survey scanner is left running, each rock's number drops every
     cycle; it measures units/sec straight from that (most accurate, multi-rock).
  2. Hold-fill fallback — if there's a single rock and the scanner is static, it uses
     the live ore-hold fill rate (written by eve_orehold_watcher to .mining_rate)
     divided by the ore's m³/unit.
Only active during a mining session: start one with `!mining on` in Discord
(`!mining off` to stop). Runs headless, launched by start-all.ps1.

Usage:
    python eve_rock_watcher.py          # normal (waits for !mining on)
    python eve_rock_watcher.py --snip   # manually box the survey window (if auto fails)
    python eve_rock_watcher.py --test   # parse one screen grab and print rows
"""
import json
import os
import re
import socket
import sys
import time
HERE = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, HERE)
import eve_orehold_watcher as w # load_config, notify, grab_region, heartbeat, bot_mining
# Volume of a single unit of each ore (m³/unit), keyed by lowercase base ore name.
# The hold-fill fallback divides an m³ fill rate by these to get units.
ORE_VOL = {
    "dark ochre": 8.0,
    "veldspar": 0.1,
    "scordite": 0.15,
    "pyroxeres": 0.3,
    "plagioclase": 0.35,
    "omber": 0.6,
    "kernite": 1.2,
    "jaspet": 2.0,
    "hemorphite": 3.0,
    "hedbergite": 3.0,
    "gneiss": 5.0,
    "crokite": 16.0,
    "spodumain": 16.0,
    "bistot": 16.0,
    "arkonor": 16.0,
    "mercoxit": 40.0,
}
# Names to try when matching OCR text, longest first (e.g. "dark ochre" before
# "ochre"); names of equal length keep their ORE_VOL order (stable sort).
ORE_NAMES = sorted(ORE_VOL.keys(), key=lambda name: -len(name))
def match_ore(line_lower):
    """Return the first ORE_NAMES entry that occurs as a substring of *line_lower*.

    Names are tried in ORE_NAMES order. The comparison is a plain ``in`` test,
    so the caller must pass text that is already lowercased. Returns None when
    no name occurs in the text.
    """
    return next((ore for ore in ORE_NAMES if ore in line_lower), None)
def parse_survey(text):
    """Turn OCR text of the Survey Scanner Results into ``[(ore_base_name, units), ...]``.

    Each non-blank line that contains a known ore name contributes one row.
    The units value is the largest integer on that line after discarding
    values below 10 (treated as distance fragments); thousands separators
    (",") are stripped before conversion. Lines without an ore name or
    without a qualifying number are skipped.
    """
    number_pat = re.compile(r"\d[\d,]*")
    found = []
    for candidate in (piece.strip() for piece in text.splitlines()):
        if not candidate:
            continue
        ore_name = match_ore(candidate.lower())
        if not ore_name:
            continue
        values = [int(tok.replace(",", "")) for tok in number_pat.findall(candidate)]
        sizeable = [v for v in values if v >= 10]  # tiny numbers are not unit counts
        if sizeable:
            # the unit count is the biggest number on the row
            found.append((ore_name, max(sizeable)))
    return found
# Selected Item panel text looks like "...Quantity 40,370 Units" and needs no
# survey scanner. The pattern keys on the NUMBER directly before "Unit(s)" and
# deliberately does not require the word "Quantity", which OCR tends to drop.
QTY_RE = re.compile(r"([\d.,]{2,})\s*units?\b", re.I)


def parse_selected(text):
    """Extract the selected asteroid's remaining units from its info-panel text.

    Newlines are flattened to spaces before searching. Both "," and "." inside
    the number are discarded, so OCR confusing the two does not matter.
    Returns the quantity as an int, or None when no match is found or the
    match holds fewer than two digits.
    """
    flattened = text.replace("\n", " ")
    hit = QTY_RE.search(flattened)
    if hit is None:
        return None
    # the captured group can only hold digits, "." and ","
    digits = hit.group(1).replace(",", "").replace(".", "")
    if len(digits) < 2:
        return None
    return int(digits)
def save_rock_region(cp, l, t, wd, ht, mode="survey"):
    """Store a capture box in the config's [rock] section and write the config to disk.

    The box is saved as ``region = l,t,wd,ht`` and *mode* ('survey' rows or
    'selected' item) as ``mode``. The [rock] section is created if missing.
    The whole config is then rewritten to ``w.CONFIG_PATH``.
    """
    if not cp.has_section("rock"):
        cp.add_section("rock")
    cp.set("rock", "region", f"{l},{t},{wd},{ht}")
    cp.set("rock", "mode", mode)
    with open(w.CONFIG_PATH, "w") as cfg_file:
        cp.write(cfg_file)
def detect_selected_region(cp):
    """Locate the Selected Item 'Quantity N Units' number in the EVE window capture.

    OCRs the image returned by ``w.capture_window()`` (upscaled 2x), finds a
    word containing "unit", and boxes the digit-bearing word(s) to its left on
    the same line. On success the box is saved via ``save_rock_region`` with
    mode 'selected' and returned as ``[l, t, w, h]`` in coordinates of the
    captured image (which the original author describes as window-relative).

    Returns None when the capture is unavailable, when no such line is found,
    or when capture/OCR raises. The error is printed rather than propagated,
    matching ``detect_survey_region``, because the caller polls this from a
    long-running loop.
    """
    import pytesseract

    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd
    scale = 2  # OCR runs on a 2x upscaled image; results are divided back down
    try:
        img = w.capture_window()
        if img is None:
            return None
        d = pytesseract.image_to_data(
            img.resize((img.width * scale, img.height * scale)),
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        print(f"[rock] detect error: {e}")
        return None

    words = d["text"]
    for i, word in enumerate(words):
        if "unit" not in word.strip().lower():
            continue
        uy, uh, ux = d["top"][i], d["height"][i], d["left"][i]
        ur = ux + d["width"][i]
        # The number is the word(s) just LEFT of "Units" on the same line:
        # roughly the same top, containing a digit, and starting no more than
        # 340 unscaled pixels to the left.
        nums = [
            j for j, other in enumerate(words)
            if other.strip()
            and abs(d["top"][j] - uy) < uh
            and 0 < (ux - d["left"][j]) < 340 * scale
            and re.search(r"\d", other)
        ]
        if not nums:
            continue
        num_left = min(d["left"][j] for j in nums) // scale
        # pad the box a little on every side, clamping left/top at the image edge
        reg = [
            max(0, num_left - 20),
            max(0, uy // scale - 6),
            (ur // scale - num_left) + 45,
            uh // scale + 12,
        ]
        save_rock_region(cp, *reg, mode="selected")
        print(f"[rock] Selected-Item quantity region (window-relative) -> {reg}")
        return reg
    return None
def detect_survey_region(cp):
    """Auto-locate the Survey Scanner Results list on screen.

    Grabs ``monitors[0]`` with mss, OCRs it, and collects every OCR text line
    that contains both a known ore name and a digit. With at least two such
    rows, their combined bounding box (padded by 12 px, offset by the
    monitor's left/top) is saved via ``save_rock_region`` and returned as
    ``[l, t, wd, ht]``. Returns None when fewer than two rows are found or
    when the grab/OCR raises (the error is printed).
    """
    import mss
    import pytesseract
    from PIL import Image

    tess_path = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tess_path:
        pytesseract.pytesseract.tesseract_cmd = tess_path
    try:
        with mss.mss() as grabber:
            mon = grabber.monitors[0]
            shot = grabber.grab(mon)
            img = Image.frombytes("RGB", shot.size, shot.bgra, "raw", "BGRX")
        data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
    except Exception as e:
        print(f"[rock] detect error: {e}")
        return None

    # group word indices by the OCR engine's (block, paragraph, line) ids
    words_by_line = {}
    for idx, word in enumerate(data["text"]):
        if word.strip():
            line_key = (data["block_num"][idx], data["par_num"][idx], data["line_num"][idx])
            words_by_line.setdefault(line_key, []).append(idx)

    # one (left, top, right, bottom) box per line that looks like "ore name + number"
    row_boxes = []
    for members in words_by_line.values():
        row_text = " ".join(data["text"][i] for i in members).lower()
        if not match_ore(row_text) or not re.search(r"\d", row_text):
            continue
        row_boxes.append((
            min(data["left"][i] for i in members),
            min(data["top"][i] for i in members),
            max(data["left"][i] + data["width"][i] for i in members),
            max(data["top"][i] + data["height"][i] for i in members),
        ))
    if len(row_boxes) < 2:
        return None

    lefts, tops, rights, bottoms = zip(*row_boxes)
    pad = 12
    left = mon["left"] + min(lefts) - pad
    top = mon["top"] + min(tops) - pad
    width = max(rights) - min(lefts) + 2 * pad
    height = max(bottoms) - min(tops) + 2 * pad
    save_rock_region(cp, left, top, width, height)
    print(f"[rock] auto-detected survey window -> {left},{top},{width},{height} ({len(row_boxes)} rows)")
    return [left, top, width, height]
def hold_rate():
    """Return the live ore-hold fill rate (m³/min) from ``w.RATE_PATH``, if fresh.

    The file is JSON with a ``ts`` timestamp and a ``rate_m3min`` value. The
    rate is returned only when ``ts`` is at most 120 seconds old. Any problem
    (missing or unreadable file, invalid JSON, unexpected shape or value type)
    yields 0.0 — this is a best-effort fallback source, never an error.
    """
    try:
        # context manager so the handle is closed on every poll
        with open(w.RATE_PATH) as f:
            d = json.load(f)
        if time.time() - d.get("ts", 0) <= 120:
            return float(d.get("rate_m3min", 0) or 0)
    except (OSError, ValueError, TypeError, AttributeError):
        # OSError: file missing/unreadable; ValueError: bad JSON or number;
        # TypeError/AttributeError: payload is not the expected dict of numbers
        pass
    return 0.0
class Tracker:
    """Keeps a short unit-count history per rock and derives units/sec from it."""

    def __init__(self):
        # rock key ("<ore>#<n>") -> list of (timestamp, units) samples, oldest first
        self.h = {}

    def update(self, rows, now):
        """Record one scan and return its rows tagged with stable keys.

        *rows* is ``[(ore, units), ...]``; the n-th row of a given ore gets the
        key ``"<ore>#<n>"`` (ore lowercased). Returns ``[(key, ore, units), ...]``
        in input order. A key's history is restarted when its units rise by
        more than 1 (new rock / rescan), samples older than 120 s are dropped,
        and keys absent from this scan are forgotten.
        """
        per_ore = {}
        keyed = []
        for ore, units in rows:
            ore = ore.lower()
            per_ore[ore] = per_ore.get(ore, 0) + 1
            key = f"{ore}#{per_ore[ore]}"
            keyed.append((key, ore, units))
            samples = self.h.get(key, [])
            if samples and units > samples[-1][1] + 1:
                samples = []  # count went up: different rock, start over
            samples = samples + [(now, units)]
            self.h[key] = [s for s in samples if now - s[0] <= 120]
        listed = {key for key, _, _ in keyed}
        for stale in set(self.h) - listed:  # rocks no longer on the list
            del self.h[stale]
        return keyed

    def rate(self, key):
        """Return the measured depletion rate for *key* in units/sec.

        Uses the oldest and newest retained samples. Returns 0.0 when there
        are fewer than two samples, when they span less than 15 seconds, or
        when the count has not dropped.
        """
        samples = self.h.get(key, [])
        if len(samples) < 2:
            return 0.0
        (t_first, u_first), (t_last, u_last) = samples[0], samples[-1]
        elapsed = t_last - t_first
        if elapsed < 15:
            return 0.0
        mined = u_first - u_last
        if mined > 0:
            return mined / elapsed
        return 0.0
def get_region(cp):
    """Return ``(region, mode)`` for the capture box, or ``(None, None)``.

    *mode* is 'survey' (scanner rows) or 'selected' (Selected Item panel).
    A region saved in the config's [rock] section wins; its mode defaults to
    'survey' when not stored. Otherwise the Selected Item quantity line is
    searched for first, then a Survey Scanner window.

    A saved region that is not exactly four comma-separated integers is
    ignored (with a message) and auto-detection runs instead, so a hand-edited
    or corrupted config cannot crash the caller.
    """
    saved = cp.get("rock", "region", fallback="").strip() if cp.has_section("rock") else ""
    if saved:
        try:
            region = [int(x) for x in saved.split(",")]
        except ValueError:
            region = None
        # save_rock_region always writes "l,t,wd,ht", i.e. exactly four values
        if region is not None and len(region) == 4:
            return region, cp.get("rock", "mode", fallback="survey")
        print(f"[rock] ignoring malformed saved region {saved!r} — locating the "
              "Selected-Item Quantity line...")
    else:
        print("[rock] no saved region — locating the Selected-Item Quantity line...")
    r = detect_selected_region(cp)  # primary: the rock you have selected
    if r:
        return r, "selected"
    r = detect_survey_region(cp)  # fallback: a Survey Scanner window
    if r:
        return r, "survey"
    return None, None
def read_rows(cp, region, mode):
    """Grab *region*, OCR it, and return the parsed ``[(ore, units), ...]`` rows.

    In 'selected' mode the result is a single ``("rock", quantity)`` row, or an
    empty list when no quantity could be read. Any other mode parses the text
    as Survey Scanner rows.
    """
    import pytesseract

    # Tesseract isn't always on PATH; honour an explicit path from the config
    tess_path = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tess_path:
        pytesseract.pytesseract.tesseract_cmd = tess_path

    shot = w.grab_region(region)
    try:
        # small panel text reads better at 2x; if upscaling fails, OCR the grab as-is
        shot = shot.resize((shot.width * 2, shot.height * 2))
    except Exception:
        pass
    ocr_text = pytesseract.image_to_string(shot, config="--psm 6")

    if mode != "selected":
        return parse_survey(ocr_text)
    quantity = parse_selected(ocr_text)
    if not quantity:
        return []
    return [("rock", quantity)]
def _fmt_dur(secs):
"""Human time-left, e.g. '1h 04m', '7m 20s', '45s'."""
secs = int(max(0, secs))
h, rem = divmod(secs, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h {m:02d}m"
if m:
return f"{m}m {s:02d}s"
return f"{s}s"
def _next_rock(keyed, current_key):
    """Return the (ore, units) pair with the most units among rocks other than *current_key*.

    *keyed* is ``[(key, ore, units), ...]``. Returns None when no other rock is listed.
    """
    others = [(ore, units) for (key, ore, units) in keyed if key != current_key]
    return max(others, key=lambda pair: pair[1]) if others else None


def main():
    """Entry point: handle ``--snip`` / ``--test``, otherwise poll forever.

    Settings come from the config's [rock] section: ``poll_secs`` (5),
    ``switch_secs`` (45), ``cooldown_secs`` (60), ``status_secs`` (90).

    While ``w.bot_mining(cp)`` is true, each poll reads the rows, updates the
    tracker, and estimates time-left per rock from the measured rate. With a
    single row and no measured rate it falls back to the hold-fill rate
    divided by the ore's unit volume. A rock whose time-left is at or below
    ``switch_secs`` triggers a "Switch rocks" notification (at most once per
    ``cooldown_secs`` per key). Every ``status_secs`` a live status line for
    the soonest-to-empty rock is posted unless ``w.bot_muted(cp)``.
    """
    cp = w.load_config()
    if "--snip" in sys.argv:
        print("Drag a box around the whole Survey Scanner Results list.")
        _snip_to_rock(cp)
        return

    # getint's fallback also applies when the [rock] section is missing
    poll = cp.getint("rock", "poll_secs", fallback=5)
    switch_secs = cp.getint("rock", "switch_secs", fallback=45)
    cooldown = cp.getint("rock", "cooldown_secs", fallback=60)
    status_secs = cp.getint("rock", "status_secs", fallback=90)

    # Import up front so a missing pytesseract fails immediately, and point it
    # at the configured binary (Tesseract isn't always on PATH).
    import pytesseract
    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd

    if "--test" in sys.argv:
        region, mode = get_region(cp)
        if not region:
            print("Couldn't find a Survey Scanner window or a Selected-Item 'Quantity N "
                  "Units'. Select a rock (or open the survey results) and retry.")
            return
        print(f"mode={mode} rows:", read_rows(cp, region, mode))
        return

    print(f"[rock] started; switch<{switch_secs}s, poll {poll}s (waits for !mining on)")
    tracker = Tracker()
    last_alert = {}    # rock key -> time of its last switch alert
    last_status = 0.0  # time of the last live status post
    region = mode = None
    while True:
        if not w.bot_mining(cp):  # only during a mining session
            tracker.h.clear()
            time.sleep(15)
            continue
        if region is None:
            try:
                region, mode = get_region(cp)
            except Exception as e:
                # detection does screen capture + OCR; a failure there must not
                # kill the headless watcher — log it and retry like any other poll
                print(f"[rock] error: {e}")
                region = mode = None
            if region is None:
                print("[rock] no survey window / selected rock visible — select a rock. "
                      "Retrying...")
                time.sleep(max(poll, 15))
                continue
        w.heartbeat(cp, "rock")
        try:
            rows = read_rows(cp, region, mode)
            if not rows:
                # nothing selected at the moment — KEEP the fixed region (positions are
                # stable; don't burn CPU re-hunting on a 9000px screen), just retry.
                time.sleep(poll)
                continue
            now = time.time()
            keyed = tracker.update(rows, now)
            hr = hold_rate()
            actives = []  # (key, ore, units, tleft)
            for key, ore, units in keyed:
                r = tracker.rate(key)  # measured units/sec
                if r <= 0 and len(keyed) == 1 and hr > 0 and ORE_VOL.get(ore):
                    # single-rock hold-fill fallback: m³/min -> m³/sec -> units/sec
                    r = hr / 60.0 / ORE_VOL[ore]
                if r <= 0:
                    continue
                actives.append((key, ore, units, units / r))
            # the rock you're emptying = the one with the least time left
            actives.sort(key=lambda x: x[3])
            for key, ore, units, tleft in actives:
                if tleft <= switch_secs and now - last_alert.get(key, 0) > cooldown:
                    last_alert[key] = now
                    nxt = _next_rock(keyed, key)
                    empty_at = int(now + tleft)
                    msg = f"{ore.title()} rock empties in ~{_fmt_dur(tleft)} (<t:{empty_at}:R>) — {units:,} u left."
                    if nxt:
                        msg += f" Switch to {nxt[0].title()} ({nxt[1]:,} u)."
                    w.notify(cp, "Switch rocks", msg, priority="high", tags="pick,gem")
                    print(f"[rock] ALERT {msg}")
            # periodic live readout of the rock you're on (so you see the countdown)
            if actives and not w.bot_muted(cp) and now - last_status >= status_secs:
                key, ore, units, tleft = actives[0]
                empty_at = int(now + tleft)
                nxt = _next_rock(keyed, key)
                line = f"🪨 {ore.title()} {units:,} u · ~{_fmt_dur(tleft)} left (empties <t:{empty_at}:R>)"
                if nxt:
                    line += f" · next: {nxt[0].title()} ({nxt[1]:,} u)"
                w._discord_live(cp, "rock", f"⛏️ {socket.gethostname()} current rock", line)
                last_status = now
        except Exception as e:
            # keep polling through transient capture/OCR/notify failures
            print(f"[rock] error: {e}")
        time.sleep(poll)
def _snip_to_rock(cp):
    """Let the user drag a box over the survey window and save it as the [rock] region.

    Shows a translucent fullscreen overlay. Press-drag-release selects a box in
    screen (root) coordinates; Escape cancels. The box is saved through
    ``save_rock_region`` (mode 'survey'). A selection with zero width or height
    (a click without a drag) is rejected rather than saved, since such a region
    cannot be captured.
    """
    import tkinter as tk

    coords = {}
    root = tk.Tk()
    root.attributes("-fullscreen", True)
    root.attributes("-alpha", 0.25)
    root.configure(bg="black")
    canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0)
    canvas.pack(fill=tk.BOTH, expand=True)
    rect = {"id": None, "x0": 0, "y0": 0}  # rubber-band item + press point (screen coords)

    def on_press(e):
        rect["x0"], rect["y0"] = e.x_root, e.y_root
        rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, outline="red", width=2)

    def on_drag(e):
        if rect["id"] is None:  # motion without a recorded press: nothing to resize
            return
        # the press point is in screen coords; convert to canvas coords for drawing
        canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(),
                      rect["y0"] - root.winfo_rooty(), e.x, e.y)

    def on_release(e):
        if rect["id"] is None:  # release without a recorded press: ignore
            return
        x0, y0, x1, y1 = rect["x0"], rect["y0"], e.x_root, e.y_root
        coords["r"] = (min(x0, x1), min(y0, y1), abs(x1 - x0), abs(y1 - y0))
        root.destroy()

    canvas.bind("<ButtonPress-1>", on_press)
    canvas.bind("<B1-Motion>", on_drag)
    canvas.bind("<ButtonRelease-1>", on_release)
    root.bind("<Escape>", lambda e: root.destroy())
    root.mainloop()

    if "r" not in coords:
        print("Cancelled.")
        return
    l, t, wd, ht = coords["r"]
    if wd == 0 or ht == 0:
        print("Selection is empty (click without drag) — nothing saved.")
        return
    save_rock_region(cp, l, t, wd, ht)
    print(f"Saved survey region {l},{t},{wd},{ht} to config.ini")
# Start the watcher only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()