394 lines
16 KiB
Python
394 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""Rock watcher — 'when do I switch asteroids?' alerts from your Survey Scanner.
|
|
|
|
EVE has no API for asteroid contents, so this reads the units remaining per rock
off the screen: from the Selected Item panel's "Quantity N Units" line (tried
first) or, failing that, from the **Survey Scanner Results** window. It tracks
each rock's amount over time and tells you, in Discord, when the rock you're
mining is about to run dry — and which rock to jump to next.
|
|
|
|
Two ways it knows your depletion rate (best available wins):
|
|
1. Direct — if the survey scanner is left running, each rock's number drops every
|
|
cycle; it measures units/sec straight from that (most accurate, multi-rock).
|
|
2. Hold-fill fallback — if there's a single rock and the scanner is static, it uses
|
|
the live ore-hold fill rate (written by eve_orehold_watcher to .mining_rate)
|
|
divided by the ore's m³/unit.
|
|
|
|
Only active during a mining session: start one with `!mining on` in Discord
|
|
(`!mining off` to stop). Runs headless, launched by start-all.ps1.
|
|
|
|
python eve_rock_watcher.py # normal (waits for !mining on)
|
|
python eve_rock_watcher.py --snip # manually box the survey window (if auto fails)
|
|
python eve_rock_watcher.py --test # parse one screen grab and print rows
|
|
"""
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import sys
|
|
import time
|
|
|
|
HERE = os.path.dirname(os.path.abspath(__file__))
|
|
sys.path.insert(0, HERE)
|
|
import eve_orehold_watcher as w # load_config, notify, grab_region, heartbeat, bot_mining
|
|
|
|
# Volume of a single ore unit in m³, keyed by lower-case base ore name.
# main() uses it for the hold-fill fallback (m³ rate -> units rate).
# NOTE: insertion order matters — it breaks ties between equally long names
# in ORE_NAMES below.
ORE_VOL = {
    "dark ochre": 8.0,
    "veldspar": 0.1,
    "scordite": 0.15,
    "pyroxeres": 0.3,
    "plagioclase": 0.35,
    "omber": 0.6,
    "kernite": 1.2,
    "jaspet": 2.0,
    "hemorphite": 3.0,
    "hedbergite": 3.0,
    "gneiss": 5.0,
    "crokite": 16.0,
    "spodumain": 16.0,
    "bistot": 16.0,
    "arkonor": 16.0,
    "mercoxit": 40.0,
}

# Base names ordered longest-first, so the longest (most specific) name is
# tried first when substring-matching OCR text.
ORE_NAMES = sorted(ORE_VOL.keys(), key=lambda ore: -len(ore))
|
|
|
|
|
|
def match_ore(line_lower):
    """Return the first ORE_NAMES entry contained in *line_lower*, or None.

    No case folding happens here: callers pass text that is already
    lower-cased. ORE_NAMES is scanned in its own (longest-first) order.
    """
    return next((ore for ore in ORE_NAMES if ore in line_lower), None)
|
|
|
|
|
|
def parse_survey(text):
    """Convert OCR text of the Survey Scanner Results into ``[(ore, units), ...]``.

    Each non-empty line that names a known ore (see match_ore) contributes one
    row. The row's unit count is the largest number on the line; numbers below
    10 are discarded first, and a line left with no numbers is skipped.
    """
    parsed = []
    for entry in (chunk.strip() for chunk in text.splitlines()):
        if not entry:
            continue
        ore_name = match_ore(entry.lower())
        if not ore_name:
            continue
        # Thousands separators are removed before conversion ("40,370" -> 40370).
        values = (int(tok.replace(",", "")) for tok in re.findall(r"\d[\d,]*", entry))
        # drop tiny numbers (distance fragments)
        amounts = [value for value in values if value >= 10]
        if amounts:
            # units is the big number on the row
            parsed.append((ore_name, max(amounts)))
    return parsed
|
|
|
|
|
|
# Selected Item panel (works without a survey scanner): "...Quantity 40,370 Units".
# Only the NUMBER directly before "Unit(s)" is matched; the word "Quantity" is
# deliberately not required because OCR tends to drop it.
QTY_RE = re.compile(r"([\d.,]{2,})\s*units?\b", re.I)


def parse_selected(text):
    """Return the selected asteroid's remaining units from its info panel text.

    Newlines are flattened to spaces and the first ``<number> unit(s)`` hit is
    used. Returns None when there is no hit or the number has fewer than two
    digits.
    """
    flattened = " ".join(text.split("\n"))
    found = QTY_RE.search(flattened)
    if found is None:
        return None
    # OCR confuses "," and "." inside the number, so both are simply removed.
    digits = found.group(1).replace(",", "").replace(".", "")
    if len(digits) < 2:
        return None
    return int(digits)
|
|
|
|
|
|
def save_rock_region(cp, l, t, wd, ht, mode="survey"):
    """Store a capture box and its read mode under ``[rock]`` and rewrite the config file.

    The box is written as ``region = l,t,wd,ht``. *mode* is 'survey' (scanner
    rows) or 'selected' (Selected Item panel). The whole config held in *cp*
    is written to ``w.CONFIG_PATH``.
    """
    if not cp.has_section("rock"):
        cp.add_section("rock")
    cp.set("rock", "region", ",".join(f"{value}" for value in (l, t, wd, ht)))
    cp.set("rock", "mode", mode)  # 'survey' rows or 'selected' item
    with open(w.CONFIG_PATH, "w") as cfg_file:
        cp.write(cfg_file)
|
|
|
|
|
|
def detect_selected_region(cp):
    """Find the Selected Item 'Quantity N Units' line WITHIN the EVE window capture.

    OCRs a 2x-upscaled ``w.capture_window()`` image, takes the first word
    containing "unit" that has at least one digit-bearing word to its left on
    the same line, and boxes the number together with that word. The box is
    saved to ``[rock]`` with ``mode="selected"``.

    Returns a WINDOW-RELATIVE tight ``[l, t, w, h]`` list, or None when the
    window cannot be captured, OCR fails, or no such line is found. Capture and
    OCR errors are reported and treated as "not found" — the same policy as
    detect_survey_region — so a missing Tesseract binary cannot crash a caller
    that invokes detection outside its own error handling.
    """
    import pytesseract

    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd

    scale = 2  # OCR runs on an upscaled image; boxes are divided back by this
    try:
        img = w.capture_window()
        if img is None:
            return None
        d = pytesseract.image_to_data(
            img.resize((img.width * scale, img.height * scale)),
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        print(f"[rock] detect error: {e}")
        return None

    words = d["text"]
    for i, word in enumerate(words):
        if "unit" not in word.strip().lower():
            continue
        uy, uh = d["top"][i], d["height"][i]
        ux = d["left"][i]
        ur = ux + d["width"][i]
        # the number is the word(s) just LEFT of "Units" on the same line:
        # vertically within one word-height, at most 340 (unscaled) px away
        nums = [
            j
            for j, other in enumerate(words)
            if other.strip()
            and abs(d["top"][j] - uy) < uh
            and 0 < (ux - d["left"][j]) < 340 * scale
            and re.search(r"\d", other)
        ]
        if not nums:
            continue
        l = min(d["left"][j] for j in nums) // scale
        # pad the box a little and clamp its origin to the window
        reg = [max(0, l - 20), max(0, uy // scale - 6), (ur // scale - l) + 45, uh // scale + 12]
        save_rock_region(cp, *reg, mode="selected")
        print(f"[rock] Selected-Item quantity region (window-relative) -> {reg}")
        return reg
    return None
|
|
|
|
|
|
def detect_survey_region(cp):
    """Locate the Survey Scanner Results window from >=2 'ore name + number' text rows.

    Grabs ``monitors[0]`` with mss, OCRs it, and keeps every OCR line that both
    names a known ore and contains a digit. With at least two such lines, their
    padded bounding box (offset by the monitor's left/top) is saved to
    ``[rock]`` and returned as ``[l, t, wd, ht]``. Returns None on a grab/OCR
    error or when fewer than two rows are found.
    """
    import mss
    import pytesseract
    from PIL import Image

    tess_path = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tess_path:
        pytesseract.pytesseract.tesseract_cmd = tess_path

    try:
        with mss.mss() as grabber:
            # NOTE(review): per the mss docs monitors[0] is the combined area
            # of all monitors — confirm for the installed version.
            mon = grabber.monitors[0]
            shot = grabber.grab(mon)
        screen = Image.frombytes("RGB", shot.size, shot.bgra, "raw", "BGRX")
        data = pytesseract.image_to_data(screen, output_type=pytesseract.Output.DICT)
    except Exception as e:
        print(f"[rock] detect error: {e}")
        return None

    # Group non-blank OCR words by the text line Tesseract assigned them to.
    words_by_line = {}
    for idx, word in enumerate(data["text"]):
        if word.strip():
            line_id = (data["block_num"][idx], data["par_num"][idx], data["line_num"][idx])
            words_by_line.setdefault(line_id, []).append(idx)

    # One (left, top, right, bottom) box per line that looks like a survey row.
    row_boxes = []
    for members in words_by_line.values():
        line_text = " ".join(data["text"][i] for i in members).lower()
        if not (match_ore(line_text) and re.search(r"\d", line_text)):
            continue
        row_boxes.append((
            min(data["left"][i] for i in members),
            min(data["top"][i] for i in members),
            max(data["left"][i] + data["width"][i] for i in members),
            max(data["top"][i] + data["height"][i] for i in members),
        ))
    if len(row_boxes) < 2:
        return None

    pad = 12
    x_min = min(box[0] for box in row_boxes)
    y_min = min(box[1] for box in row_boxes)
    x_max = max(box[2] for box in row_boxes)
    y_max = max(box[3] for box in row_boxes)
    l = mon["left"] + x_min - pad
    t = mon["top"] + y_min - pad
    wd = x_max - x_min + 2 * pad
    ht = y_max - y_min + 2 * pad
    save_rock_region(cp, l, t, wd, ht)
    print(f"[rock] auto-detected survey window -> {l},{t},{wd},{ht} ({len(row_boxes)} rows)")
    return [l, t, wd, ht]
|
|
|
|
|
|
def hold_rate():
    """Live ore-hold fill rate (m³/min) written by eve_orehold_watcher, if fresh.

    Reads the JSON file at ``w.RATE_PATH`` and returns its ``rate_m3min`` value
    as a float when the file's ``ts`` stamp is at most 120 seconds old.
    Returns 0.0 when the file is missing, unreadable, malformed or stale.
    """
    try:
        # `with` closes the handle straight away; this runs on every poll, so
        # a bare open() would leak one descriptor per cycle until GC.
        with open(w.RATE_PATH) as f:
            d = json.load(f)
        if time.time() - d.get("ts", 0) <= 120:
            return float(d.get("rate_m3min", 0) or 0)
    except Exception:
        # Deliberate best-effort: the rate is only an optional fallback, so
        # any read/parse problem simply means "no rate available".
        pass
    return 0.0
|
|
|
|
|
|
class Tracker:
    """Keeps a short unit-count history per rock and derives a depletion rate.

    Rocks are identified as ``"<ore>#<n>"`` where *n* is the 1-based position of
    that ore among the rows of a single update.
    """

    def __init__(self):
        # rock key -> list of (timestamp, units) samples, oldest first
        self.h = {}

    def update(self, rows, now):
        """Record one reading of ``[(ore, units), ...]`` taken at time *now*.

        Returns ``[(key, ore, units), ...]`` in row order with *ore*
        lower-cased. Samples older than 120 time units are dropped, a rock
        whose count rose by more than 1 starts a fresh history, and rocks
        missing from *rows* are forgotten.
        """
        per_ore = {}
        keyed = []
        for ore_name, amount in rows:
            base = ore_name.lower()
            nth = per_ore.get(base, 0) + 1
            per_ore[base] = nth
            rock_key = f"{base}#{nth}"
            keyed.append((rock_key, base, amount))

            samples = self.h.get(rock_key, [])
            if samples and amount > samples[-1][1] + 1:
                # went up = new rock / rescan -> start over
                samples = []
            samples.append((now, amount))
            self.h[rock_key] = [sample for sample in samples if now - sample[0] <= 120]

        # forget rocks no longer listed
        live = {entry[0] for entry in keyed}
        for stale in [known for known in self.h if known not in live]:
            del self.h[stale]
        return keyed

    def rate(self, key):
        """Return units lost per time unit between the oldest and newest sample.

        Gives 0.0 when there are fewer than two samples, they span less than
        15 time units, or the count did not drop.
        """
        samples = self.h.get(key, [])
        if len(samples) < 2:
            return 0.0
        (t_first, u_first), (t_last, u_last) = samples[0], samples[-1]
        elapsed = t_last - t_first
        if elapsed < 15:
            return 0.0
        dropped = u_first - u_last
        if dropped > 0:
            return dropped / elapsed
        return 0.0
|
|
|
|
|
|
def get_region(cp):
    """Return ``(region, mode)``; mode is 'survey' (scanner rows) or 'selected' (Selected Item).

    A region saved under ``[rock]`` wins when it parses as four
    comma-separated integers; its mode comes from ``[rock] mode`` (default
    'survey'). Otherwise detection runs: the Selected Item quantity line
    first, then a Survey Scanner window. Returns ``(None, None)`` when nothing
    is found.

    A malformed saved region (non-numeric, or not exactly four values — every
    writer in this file stores four) is reported and ignored instead of
    raising ``ValueError`` or handing an unusable box to the grabber.
    """
    saved = cp.get("rock", "region", fallback="").strip() if cp.has_section("rock") else ""
    if saved:
        try:
            region = [int(part) for part in saved.split(",")]
        except ValueError:
            region = None
        if region is not None and len(region) == 4:
            return region, cp.get("rock", "mode", fallback="survey")
        print(f"[rock] ignoring malformed saved region {saved!r} — re-detecting...")
    else:
        print("[rock] no saved region — locating the Selected-Item Quantity line...")

    r = detect_selected_region(cp)  # primary: the rock you have selected
    if r:
        return r, "selected"
    r = detect_survey_region(cp)  # fallback: a Survey Scanner window
    if r:
        return r, "survey"
    return None, None
|
|
|
|
|
|
def read_rows(cp, region, mode):
    """Grab *region*, OCR it, and return the rock rows for the given *mode*.

    In 'selected' mode the result is ``[("rock", units)]`` or ``[]`` when no
    quantity was read; in any other mode it is whatever parse_survey returns.
    """
    import pytesseract

    tess_path = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tess_path:
        # Tesseract isn't always on PATH; set it explicitly
        pytesseract.pytesseract.tesseract_cmd = tess_path

    shot = w.grab_region(region)
    try:
        # upscale: small panel text reads better
        doubled = (shot.width * 2, shot.height * 2)
        shot = shot.resize(doubled)
    except Exception:
        pass  # fall back to OCR on the grab as-is

    ocr_text = pytesseract.image_to_string(shot, config="--psm 6")
    if mode != "selected":
        return parse_survey(ocr_text)
    qty = parse_selected(ocr_text)
    if qty:
        return [("rock", qty)]
    return []
|
|
|
|
|
|
def _fmt_dur(secs):
|
|
"""Human time-left, e.g. '1h 04m', '7m 20s', '45s'."""
|
|
secs = int(max(0, secs))
|
|
h, rem = divmod(secs, 3600)
|
|
m, s = divmod(rem, 60)
|
|
if h:
|
|
return f"{h}h {m:02d}m"
|
|
if m:
|
|
return f"{m}m {s:02d}s"
|
|
return f"{s}s"
|
|
|
|
|
|
def _next_rock(keyed, current_key):
    """Return ``(ore, units)`` of the fullest rock other than *current_key*, or None."""
    others = [(o, u) for (k, o, u) in keyed if k != current_key]
    return max(others, key=lambda x: x[1]) if others else None


def main():
    """Command-line entry point.

    * ``--snip``: drag-select the capture box by hand and save it, then exit.
    * ``--test``: resolve the region, OCR it once, print the parsed rows, exit.
    * otherwise: loop forever. While ``w.bot_mining(cp)`` is true, read the
      rows every ``poll_secs``, estimate each rock's time left, send a
      "Switch rocks" notification when a rock is within ``switch_secs`` of
      empty (at most once per ``cooldown_secs`` per rock), and post a live
      status line every ``status_secs`` unless muted.

    All four timings come from the ``[rock]`` config section, with defaults of
    5, 45, 60 and 90 seconds.
    """
    cp = w.load_config()
    if "--snip" in sys.argv:
        print("Drag a box around the whole Survey Scanner Results list.")
        _snip_to_rock(cp)
        return

    # getint's fallback also covers a missing [rock] section. poll is clamped
    # because a negative value would make time.sleep() raise outside the
    # loop's error handling and kill the watcher.
    poll = max(1, cp.getint("rock", "poll_secs", fallback=5))
    switch_secs = cp.getint("rock", "switch_secs", fallback=45)
    cooldown = cp.getint("rock", "cooldown_secs", fallback=60)
    status_secs = cp.getint("rock", "status_secs", fallback=90)

    # Needed by both --test and the normal loop, so it is set up once here.
    import pytesseract
    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd

    if "--test" in sys.argv:
        region, mode = get_region(cp)
        if not region:
            print("Couldn't find a Survey Scanner window or a Selected-Item 'Quantity N "
                  "Units'. Select a rock (or open the survey results) and retry.")
            return
        print(f"mode={mode} rows:", read_rows(cp, region, mode))
        return

    print(f"[rock] started; switch<{switch_secs}s, poll {poll}s (waits for !mining on)")
    tracker = Tracker()
    last_alert = {}    # rock key -> time of its last "Switch rocks" alert
    last_status = 0.0  # time of the last live status post
    region = mode = None
    while True:
        if not w.bot_mining(cp):  # only during a mining session
            tracker.h.clear()
            time.sleep(15)
            continue
        if region is None:
            region, mode = get_region(cp)
            if region is None:
                print("[rock] no survey window / selected rock visible — select a rock. "
                      "Retrying...")
                time.sleep(max(poll, 15))
                continue
        w.heartbeat(cp, "rock")
        try:
            rows = read_rows(cp, region, mode)
            if not rows:
                # nothing selected at the moment — KEEP the fixed region (positions are
                # stable; don't burn CPU re-hunting on a 9000px screen), just retry.
                time.sleep(poll)
                continue
            now = time.time()
            keyed = tracker.update(rows, now)
            hr = hold_rate()
            actives = []  # (key, ore, units, tleft)
            for key, ore, units in keyed:
                r = tracker.rate(key)  # measured units/sec
                if r <= 0 and len(keyed) == 1 and hr > 0 and ORE_VOL.get(ore):
                    # single-rock hold-fill fallback: m³/min -> m³/s -> units/s
                    r = hr / 60.0 / ORE_VOL[ore]
                if r <= 0:
                    continue
                actives.append((key, ore, units, units / r))
            # the rock you're emptying = the one with the least time left
            actives.sort(key=lambda x: x[3])
            for key, ore, units, tleft in actives:
                if tleft <= switch_secs and now - last_alert.get(key, 0) > cooldown:
                    last_alert[key] = now
                    nxt = _next_rock(keyed, key)
                    empty_at = int(now + tleft)
                    msg = f"{ore.title()} rock empties in ~{_fmt_dur(tleft)} (<t:{empty_at}:R>) — {units:,} u left."
                    if nxt:
                        msg += f" Switch to {nxt[0].title()} ({nxt[1]:,} u)."
                    w.notify(cp, "Switch rocks", msg, priority="high", tags="pick,gem")
                    print(f"[rock] ALERT {msg}")
            # periodic live readout of the rock you're on (so you see the countdown)
            if actives and not w.bot_muted(cp) and now - last_status >= status_secs:
                key, ore, units, tleft = actives[0]
                empty_at = int(now + tleft)
                nxt = _next_rock(keyed, key)
                line = f"🪨 {ore.title()} {units:,} u · ~{_fmt_dur(tleft)} left (empties <t:{empty_at}:R>)"
                if nxt:
                    line += f" · next: {nxt[0].title()} ({nxt[1]:,} u)"
                w._discord_live(cp, "rock", f"⛏️ {socket.gethostname()} current rock", line)
                last_status = now
        except Exception as e:
            # Top-level boundary of a long-running watcher: log and keep polling.
            print(f"[rock] error: {e}")
        time.sleep(poll)
|
|
|
|
|
|
def _snip_to_rock(cp):
    """Drag-select the survey window and save its box to the ``[rock]`` region.

    Shows a translucent fullscreen overlay; press, drag and release the left
    mouse button to draw the box, or press Escape to abort. The box is stored
    in root (screen) coordinates via save_rock_region (default 'survey' mode).

    A selection with zero width or height (a plain click), or a release that
    was never preceded by a press, is treated as cancelled: such a box could
    never be captured, yet would otherwise be saved and reused on every run.
    """
    import tkinter as tk

    coords = {}
    root = tk.Tk()
    root.attributes("-fullscreen", True)
    root.attributes("-alpha", 0.25)
    root.configure(bg="black")
    canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0)
    canvas.pack(fill=tk.BOTH, expand=True)
    # id: canvas rectangle being drawn; x0/y0: press position in root coords
    rect = {"id": None, "x0": 0, "y0": 0}

    def on_press(e):
        rect["x0"], rect["y0"] = e.x_root, e.y_root
        rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, outline="red", width=2)

    def on_drag(e):
        if rect["id"] is None:  # motion without a recorded press: nothing to resize
            return
        # the press point is kept in root coords; convert to canvas coords
        canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(),
                      rect["y0"] - root.winfo_rooty(), e.x, e.y)

    def on_release(e):
        x0, y0, x1, y1 = rect["x0"], rect["y0"], e.x_root, e.y_root
        wd, ht = abs(x1 - x0), abs(y1 - y0)
        if rect["id"] is not None and wd > 0 and ht > 0:
            coords["r"] = (min(x0, x1), min(y0, y1), wd, ht)
        root.destroy()

    canvas.bind("<ButtonPress-1>", on_press)
    canvas.bind("<B1-Motion>", on_drag)
    canvas.bind("<ButtonRelease-1>", on_release)
    root.bind("<Escape>", lambda e: root.destroy())
    root.mainloop()

    if "r" in coords:
        l, t, wd, ht = coords["r"]
        save_rock_region(cp, l, t, wd, ht)
        print(f"Saved survey region {l},{t},{wd},{ht} to config.ini")
    else:
        print("Cancelled.")
|
|
|
|
|
|
# Script entry point: start the watcher only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()
|