292 lines
11 KiB
Python
292 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Rock watcher — 'when do I switch asteroids?' alerts from your Survey Scanner.

EVE has no API for asteroid contents, so this reads the **Survey Scanner Results**
window off the screen (the only place that shows units remaining per rock). It tracks
each rock's amount over time and tells you, in Discord, when the rock you're mining is
about to run dry — and which rock to jump to next.

Two ways it knows your depletion rate (best available wins):
  1. Direct — if the survey scanner is left running, each rock's number drops every
     cycle; it measures units/sec straight from that (most accurate, multi-rock).
  2. Hold-fill fallback — if there's a single rock and the scanner is static, it uses
     the live ore-hold fill rate (written by eve_orehold_watcher to .mining_rate)
     divided by the ore's m³/unit.

Only active during a mining session: start one with `!mining on` in Discord
(`!mining off` to stop). Runs headless, launched by start-all.ps1.

    python eve_rock_watcher.py          # normal (waits for !mining on)
    python eve_rock_watcher.py --snip   # manually box the survey window (if auto fails)
    python eve_rock_watcher.py --test   # parse one screen grab and print rows
"""
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
HERE = os.path.dirname(os.path.abspath(__file__))
|
|
sys.path.insert(0, HERE)
|
|
import eve_orehold_watcher as w # load_config, notify, grab_region, heartbeat, bot_mining
|
|
|
|
# Ore unit volumes (m³ per unit), keyed by lowercase base ore name. Used by the
# hold-fill fallback to turn an m³ fill rate into a units-per-second rate.
ORE_VOL = {
    "dark ochre": 8.0,
    "veldspar": 0.1,
    "scordite": 0.15,
    "pyroxeres": 0.3,
    "plagioclase": 0.35,
    "omber": 0.6,
    "kernite": 1.2,
    "jaspet": 2.0,
    "hemorphite": 3.0,
    "hedbergite": 3.0,
    "gneiss": 5.0,
    "crokite": 16.0,
    "spodumain": 16.0,
    "bistot": 16.0,
    "arkonor": 16.0,
    "mercoxit": 40.0,
}

# Base names ordered longest first, so a longer name is always tried before any
# shorter name it could contain. Names of equal length keep ORE_VOL's order.
ORE_NAMES = sorted(ORE_VOL, key=lambda ore_name: -len(ore_name))
|
|
|
|
|
|
def match_ore(line_lower):
    """Return the first known ore base name found inside *line_lower*, else None.

    Candidates are tried in ``ORE_NAMES`` order and the test is a plain
    substring check, so the text should already be lowercased by the caller.
    """
    return next((ore_name for ore_name in ORE_NAMES if ore_name in line_lower), None)
|
|
|
|
|
|
def parse_survey(text):
    """Convert OCR text of the Survey Scanner Results into ``[(ore, units), ...]``.

    Each non-blank line that mentions a known ore contributes one row. The
    units value is the largest integer on that line after removing thousands
    commas; integers below 10 are ignored as distance fragments. Lines with no
    ore name or no usable number are skipped.
    """
    parsed = []
    for entry in (candidate.strip() for candidate in text.splitlines()):
        if not entry:
            continue
        ore = match_ore(entry.lower())
        if not ore:
            continue
        amounts = []
        for token in re.findall(r"\d[\d,]*", entry):
            value = int(token.replace(",", ""))
            if value >= 10:  # smaller values are treated as distance fragments
                amounts.append(value)
        if amounts:
            # the units column is assumed to be the biggest number on the row
            parsed.append((ore, max(amounts)))
    return parsed
|
|
|
|
|
|
def save_rock_region(cp, l, t, wd, ht):
    """Record the survey box in ``[rock] region`` and rewrite the config file.

    The box is stored as the comma-separated string ``left,top,width,height``.
    The ``[rock]`` section is created if missing, then the whole config is
    written to ``w.CONFIG_PATH``.
    """
    region_value = ",".join(str(part) for part in (l, t, wd, ht))
    if not cp.has_section("rock"):
        cp.add_section("rock")
    cp.set("rock", "region", region_value)
    with open(w.CONFIG_PATH, "w") as cfg_file:
        cp.write(cfg_file)
|
|
|
|
|
|
def detect_survey_region(cp):
    """Locate the Survey Scanner Results window by finding >=2 'ore name + number' rows.

    Grabs ``sct.monitors[0]``, OCRs it word by word, regroups the words into
    text lines, and keeps the lines that contain a known ore name and at least
    one digit. With two or more such lines, the padded bounding box around
    them is saved to the config and returned as ``[left, top, width, height]``.
    Returns None when the grab/OCR fails or fewer than two rows are found.
    """
    import mss
    import pytesseract
    from PIL import Image

    tesseract_path = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tesseract_path:
        pytesseract.pytesseract.tesseract_cmd = tesseract_path

    try:
        with mss.mss() as sct:
            mon = sct.monitors[0]
            shot = sct.grab(mon)
            img = Image.frombytes("RGB", shot.size, shot.bgra, "raw", "BGRX")
            data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
    except Exception as e:
        print(f"[rock] detect error: {e}")
        return None

    # Regroup non-empty OCR words by (block, paragraph, line) to rebuild text lines.
    words_by_line = {}
    for idx, word in enumerate(data["text"]):
        if word.strip():
            line_key = (data["block_num"][idx], data["par_num"][idx], data["line_num"][idx])
            words_by_line.setdefault(line_key, []).append(idx)

    # One (left, top, right, bottom) box per line that looks like a survey row.
    boxes = []
    for word_ids in words_by_line.values():
        joined = " ".join(data["text"][i] for i in word_ids).lower()
        if not (match_ore(joined) and re.search(r"\d", joined)):
            continue
        left = min(data["left"][i] for i in word_ids)
        top = min(data["top"][i] for i in word_ids)
        right = max(data["left"][i] + data["width"][i] for i in word_ids)
        bottom = max(data["top"][i] + data["height"][i] for i in word_ids)
        boxes.append((left, top, right, bottom))

    if len(boxes) < 2:
        return None

    pad = 12
    x_min = min(box[0] for box in boxes)
    y_min = min(box[1] for box in boxes)
    x_max = max(box[2] for box in boxes)
    y_max = max(box[3] for box in boxes)
    # OCR coordinates are relative to the grabbed image; offset by the monitor origin.
    l = mon["left"] + x_min - pad
    t = mon["top"] + y_min - pad
    wd = x_max - x_min + 2 * pad
    ht = y_max - y_min + 2 * pad
    save_rock_region(cp, l, t, wd, ht)
    print(f"[rock] auto-detected survey window -> {l},{t},{wd},{ht} ({len(boxes)} rows)")
    return [l, t, wd, ht]
|
|
|
|
|
|
def hold_rate():
    """Live ore-hold fill rate (m³/min) written by eve_orehold_watcher, if fresh.

    Reads the JSON file at ``w.RATE_PATH`` and returns its ``rate_m3min`` value
    as a float when the file's ``ts`` timestamp is at most 120 seconds old.
    Returns 0.0 when the file is missing, unreadable, malformed or stale.
    """
    try:
        # ``with`` closes the handle promptly; this runs on every poll cycle.
        with open(w.RATE_PATH) as rate_file:
            d = json.load(rate_file)
        if time.time() - d.get("ts", 0) <= 120:
            return float(d.get("rate_m3min", 0) or 0)
    except Exception:
        # Deliberately best-effort: the file is written by another process and
        # may be absent or half-written; any failure just means "no rate".
        pass
    return 0.0
|
|
|
|
|
|
class Tracker:
    """Per-rock unit history -> measured depletion rate (units/sec).

    Rocks are identified by ore name plus their position among rows of the
    same ore (``"veldspar#1"``, ``"veldspar#2"``, ...), in the order the rows
    are passed to :meth:`update`.

    Args:
        window_secs: how far back (seconds) samples are kept per rock.
        min_span_secs: minimum time between the oldest and newest kept sample
            before :meth:`rate` reports a rate.
    """

    def __init__(self, window_secs=120, min_span_secs=15):
        self.window_secs = window_secs
        self.min_span_secs = min_span_secs
        self.h = {}  # key -> [(t, units)], oldest sample first

    def update(self, rows, now):
        """Record one scan and return ``[(key, ore, units), ...]`` in row order.

        Args:
            rows: iterable of ``(ore_name, units)`` pairs from one scan.
            now: timestamp of the scan, in seconds.

        A rock whose units rose by more than 1 since its last sample is treated
        as a new rock / rescan and its history restarts. Samples older than
        ``window_secs`` are dropped, and rocks missing from this scan are
        forgotten.
        """
        counts, seen, keyed = {}, set(), []
        for ore, units in rows:
            ore = ore.lower()
            counts[ore] = counts.get(ore, 0) + 1
            key = f"{ore}#{counts[ore]}"
            keyed.append((key, ore, units))
            seen.add(key)
            hist = self.h.get(key, [])
            if hist and units > hist[-1][1] + 1:  # went up = new rock / rescan -> reset
                hist = []
            hist.append((now, units))
            self.h[key] = [(t, u) for (t, u) in hist if now - t <= self.window_secs]
        for key in list(self.h):  # forget rocks no longer listed
            if key not in seen:
                del self.h[key]
        return keyed

    def rate(self, key):
        """Return the depletion rate of *key* in units/sec, or 0.0 if unknown.

        The rate is (oldest units - newest units) / elapsed time over the kept
        history. It is 0.0 with fewer than two samples, with less than
        ``min_span_secs`` of history, or when the amount did not decrease.
        """
        hist = self.h.get(key, [])
        if len(hist) < 2:
            return 0.0
        (t_first, u_first), (t_last, u_last) = hist[0], hist[-1]
        dt = t_last - t_first
        # dt <= 0 guards the division when min_span_secs is configured as 0
        if dt <= 0 or dt < self.min_span_secs:
            return 0.0
        du = u_first - u_last
        return du / dt if du > 0 else 0.0
|
|
|
|
|
|
def get_region(cp):
    """Return the survey-window box ``[left, top, width, height]``, or None.

    Uses the saved ``[rock] region`` when it parses as exactly four integers
    with a positive width and height. Otherwise (nothing saved, or the saved
    value is malformed) it falls back to ``detect_survey_region``, which
    returns None when the window cannot be found.
    """
    # get() with a fallback also covers a missing [rock] section
    region_s = cp.get("rock", "region", fallback="").strip()
    if region_s:
        try:
            region = [int(x) for x in region_s.split(",")]
        except ValueError:
            region = []
        # left/top may be negative on multi-monitor layouts; size must be positive
        if len(region) == 4 and region[2] > 0 and region[3] > 0:
            return region
        print(f"[rock] saved region {region_s!r} is invalid — auto-detecting the "
              "Survey Scanner Results window...")
    else:
        print("[rock] no saved region — auto-detecting the Survey Scanner Results window...")
    return detect_survey_region(cp)
|
|
|
|
|
|
def _configure_tesseract(cp):
    """Import pytesseract, apply ``[ocr] tesseract_cmd`` if set, and return the module."""
    import pytesseract

    tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
    if tcmd:
        pytesseract.pytesseract.tesseract_cmd = tcmd
    return pytesseract


def main():
    """Entry point: handle ``--snip`` / ``--test``, otherwise run the watch loop.

    ``--snip`` lets the user box the survey window by hand and exits.
    ``--test`` OCRs the survey region once, prints the parsed rows and exits.
    Otherwise it loops forever: while ``w.bot_mining(cp)`` is true it OCRs the
    survey region every ``poll_secs``, tracks each rock, and sends a
    "Switch rocks" notification when a rock's estimated time to empty is at
    most ``switch_secs`` (at most one alert per rock per ``cooldown_secs``).
    """
    cp = w.load_config()
    if "--snip" in sys.argv:
        print("Drag a box around the whole Survey Scanner Results list.")
        _snip_to_rock(cp)
        return

    # getint's fallback also covers a missing [rock] section
    poll = cp.getint("rock", "poll_secs", fallback=5)
    switch_secs = cp.getint("rock", "switch_secs", fallback=45)
    cooldown = cp.getint("rock", "cooldown_secs", fallback=60)

    pytesseract = _configure_tesseract(cp)

    if "--test" in sys.argv:
        region = get_region(cp)
        if not region:
            print("Couldn't find the survey window. Open it (with rocks scanned) and retry, "
                  "or run --snip.")
            return
        text = pytesseract.image_to_string(w.grab_region(region))
        print("rows:", parse_survey(text))
        return

    print(f"[rock] started; switch<{switch_secs}s, poll {poll}s (waits for !mining on)")
    tracker = Tracker()
    last_alert = {}  # rock key -> time of its last alert
    region = None
    misses = 0  # consecutive polls that parsed no rows
    redetect_after_misses = 3
    while True:
        if not w.bot_mining(cp):  # only during a mining session
            tracker.h.clear()
            time.sleep(15)
            continue
        if region is None:
            region = get_region(cp)
            if region is None:
                print("[rock] survey window not visible — open it (rocks scanned). Retrying...")
                time.sleep(max(poll, 15))
                continue
        w.heartbeat(cp, "rock")
        try:
            text = pytesseract.image_to_string(w.grab_region(region))
            rows = parse_survey(text)
            if not rows:
                # get_region() would only hand back the same saved box, so after
                # several empty polls in a row run a real full-screen detection
                # (the window may have moved); keep the old box if that fails.
                misses += 1
                if misses >= redetect_after_misses:
                    misses = 0
                    region = detect_survey_region(cp) or region
                time.sleep(poll)
                continue
            misses = 0
            now = time.time()
            keyed = tracker.update(rows, now)
            hr = hold_rate()
            for key, ore, units in keyed:
                r = tracker.rate(key)  # measured units/sec
                if r <= 0 and len(keyed) == 1 and hr > 0 and ORE_VOL.get(ore):
                    r = hr / 60.0 / ORE_VOL[ore]  # single-rock hold-fill fallback
                if r <= 0:
                    continue
                tleft = units / r
                if tleft <= switch_secs and now - last_alert.get(key, 0) > cooldown:
                    last_alert[key] = now
                    others = [(o, u) for (k, o, u) in keyed if k != key]
                    # suggest the listed rock with the most units left
                    nxt = max(others, key=lambda x: x[1]) if others else None
                    msg = (f"{ore.title()} rock ~{int(tleft)}s from empty "
                           f"({units:,} u left).")
                    if nxt:
                        msg += f" Next: {nxt[0].title()} ({nxt[1]:,} u)."
                    w.notify(cp, "Switch rocks", msg, priority="high", tags="pick,gem")
                    print(f"[rock] ALERT {msg}")
        except Exception as e:
            # top-level boundary: log and keep the watcher alive
            print(f"[rock] error: {e}")
        time.sleep(poll)
|
|
|
|
|
|
def _snip_to_rock(cp):
    """Drag-select the survey window, save its box to [rock] region.

    Shows a translucent fullscreen overlay; the user drags a rectangle with the
    left mouse button. On release the box is saved via ``save_rock_region`` in
    root-window coordinates. Escape cancels. A selection with zero width or
    height is not saved, because such a box can never be grabbed.
    """
    import tkinter as tk

    coords = {}
    root = tk.Tk()
    root.attributes("-fullscreen", True)
    root.attributes("-alpha", 0.25)
    root.configure(bg="black")
    canvas = tk.Canvas(root, cursor="cross", bg="black", highlightthickness=0)
    canvas.pack(fill=tk.BOTH, expand=True)
    rect = {"id": None, "x0": 0, "y0": 0}

    def on_press(e):
        rect["x0"], rect["y0"] = e.x_root, e.y_root
        rect["id"] = canvas.create_rectangle(e.x, e.y, e.x, e.y, outline="red", width=2)

    def on_drag(e):
        if rect["id"] is None:  # motion without a recorded press
            return
        canvas.coords(rect["id"], rect["x0"] - root.winfo_rootx(),
                      rect["y0"] - root.winfo_rooty(), e.x, e.y)

    def on_release(e):
        if rect["id"] is None:  # release without a recorded press: no start point
            return
        x0, y0, x1, y1 = rect["x0"], rect["y0"], e.x_root, e.y_root
        wd, ht = abs(x1 - x0), abs(y1 - y0)
        if wd > 0 and ht > 0:
            coords["r"] = (min(x0, x1), min(y0, y1), wd, ht)
        else:
            coords["empty"] = True  # plain click, nothing was boxed
        root.destroy()

    canvas.bind("<ButtonPress-1>", on_press)
    canvas.bind("<B1-Motion>", on_drag)
    canvas.bind("<ButtonRelease-1>", on_release)
    root.bind("<Escape>", lambda e: root.destroy())
    root.mainloop()
    if "r" in coords:
        l, t, wd, ht = coords["r"]
        save_rock_region(cp, l, t, wd, ht)
        print(f"Saved survey region {l},{t},{wd},{ht} to config.ini")
    elif "empty" in coords:
        print("Empty selection — nothing saved. Run --snip again and drag a box.")
    else:
        print("Cancelled.")
|
|
|
|
|
|
# Script entry point: run the watcher only when executed directly, not on import.
if __name__ == "__main__":
    main()
|