publish eve_local_watcher.py

This commit is contained in:
brockdarnold 2026-06-15 00:44:13 +00:00
parent febade756b
commit e97dfa2e0b

View file

@ -1,44 +1,88 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Local watcher — 'someone just entered Local' alerts for isolated systems. """Local watcher — warns only about DANGEROUS pilots entering Local.
EVE has no API for who's in Local, and the chat logs only record *messages*, not who You're aware of how many people are in system; a raw head-count is just noise. So this
jumps in/out. So this reads your **Local member list** off the screen and pings you reads your Local member list off the screen, and when a NEW pilot appears it looks them
when the pilot count rises exactly what matters when you're mining somewhere quiet up on zKillboard + ESI and pings you only if they're a threat — an active killer, lots
like Solitude and a stranger lands in system. Best in low-population systems (the whole of kills, or flashy/negative sec. Quiet otherwise. (Plain 'a ship died in your system'
member list is visible); in a busy hub it just tracks what's on screen. is already handled by danger_watch.)
Only runs during a mining session (`!mining on` in Discord). Point it at your Local Only runs during a mining session (`!mining on`). Point it at your Local window once:
window once: python eve_local_watcher.py --snip (or send Brock a screenshot and he python eve_local_watcher.py --snip (or send a screenshot and Brock sets the region)
sets [local] region in config.ini). Auto-found if the window shows the word 'Local'.
python eve_local_watcher.py # normal (waits for !mining on) python eve_local_watcher.py # normal (waits for !mining on)
python eve_local_watcher.py --snip # box your Local member-list window python eve_local_watcher.py --test # read once, print the names it sees
python eve_local_watcher.py --test # read once and print the pilot count
""" """
import json
import os import os
import re import re
import socket
import sys import sys
import time import time
import urllib.request
HERE = os.path.dirname(os.path.abspath(__file__)) HERE = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, HERE) sys.path.insert(0, HERE)
import eve_orehold_watcher as w # load_config, notify, grab_region, heartbeat, bot_mining, snip_region import eve_orehold_watcher as w # load_config, notify, grab_region, heartbeat, bot_mining, snip_region
UA = "eve-watcher-local (brockdarnold@gmail.com)"
NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9 '\-\.]{2,36}$") # plausible EVE pilot name
def count_pilots(text):
"""Count name-like lines in the OCR'd member list (a row per pilot).""" def extract_names(text):
n = 0 """Pull plausible pilot names out of the OCR'd member list."""
names = []
for line in text.splitlines(): for line in text.splitlines():
s = line.strip() s = line.strip()
if len(s) < 3: if len(s) < 3 or s.lower().startswith("local"):
continue continue
if s.lower().startswith("local"): # channel header, not a pilot if NAME_RE.match(s) and sum(c.isalpha() for c in s) >= 3:
continue names.append(s)
if sum(c.isalpha() for c in s) >= 2: # looks like a name return names
n += 1
return n
def _get(url, data=None, method="GET"):
hdr = {"User-Agent": UA}
if data is not None:
hdr["Content-Type"] = "application/json"
data = json.dumps(data).encode()
return json.loads(urllib.request.urlopen(
urllib.request.Request(url, data=data, headers=hdr, method=method), timeout=15).read())
def resolve_char(name):
"""Name -> character_id via ESI (None if it's not a real character / OCR garbage)."""
try:
d = _get("https://esi.evetech.net/latest/universe/ids/?datasource=tranquility",
data=[name], method="POST")
chars = d.get("characters") or []
return chars[0]["id"] if chars else None
except Exception:
return None
def threat(cid):
"""Return a threat dict for a character, or None if not notable."""
kills = danger = recent = 0
try:
z = _get(f"https://zkillboard.com/api/stats/characterID/{cid}/")
kills = int(z.get("shipsDestroyed") or 0)
danger = int(z.get("dangerRatio") or 0)
recent = int(((z.get("activepvp") or {}).get("kills") or {}).get("count") or 0)
except Exception:
pass
sec = None
try:
sec = _get(f"https://esi.evetech.net/latest/characters/{cid}/?datasource=tranquility").get("security_status")
except Exception:
pass
notable = recent >= 1 or kills >= 25 or danger >= 50 or (sec is not None and sec <= -1.5)
if not notable:
return None
return {"kills": kills, "danger": danger, "recent": recent, "sec": sec}
# ---- screen region (unchanged: auto-locate by the 'Local' label, or --snip) ----
def save_local_region(cp, l, t, wd, ht): def save_local_region(cp, l, t, wd, ht):
if not cp.has_section("local"): if not cp.has_section("local"):
cp.add_section("local") cp.add_section("local")
@ -48,8 +92,6 @@ def save_local_region(cp, l, t, wd, ht):
def detect_local_region(cp): def detect_local_region(cp):
"""Best-effort: find the Local window by locating the 'Local' label, take the
column beneath it as the member list. Falls back to None (use --snip)."""
import mss import mss
import pytesseract import pytesseract
from PIL import Image from PIL import Image
@ -69,11 +111,9 @@ def detect_local_region(cp):
if data["text"][i].strip().lower() == "local": if data["text"][i].strip().lower() == "local":
x = mon["left"] + data["left"][i] - 10 x = mon["left"] + data["left"][i] - 10
y = mon["top"] + data["top"][i] - 6 y = mon["top"] + data["top"][i] - 6
wd = max(220, data["width"][i] + 220) save_local_region(cp, x, y, max(220, data["width"][i] + 220), 600)
ht = 600 # capture the list beneath the label
save_local_region(cp, x, y, wd, ht)
print(f"[local] auto-located Local window near {x},{y}") print(f"[local] auto-located Local window near {x},{y}")
return [x, y, wd, ht] return [x, y, max(220, data["width"][i] + 220), 600]
return None return None
@ -101,30 +141,24 @@ def main():
if tcmd: if tcmd:
pytesseract.pytesseract.tesseract_cmd = tcmd pytesseract.pytesseract.tesseract_cmd = tcmd
sec = cp.has_section("local") poll = cp.getint("local", "poll_secs", fallback=12) if cp.has_section("local") else 12
poll = cp.getint("local", "poll_secs", fallback=8) if sec else 8
cooldown = cp.getint("local", "cooldown_secs", fallback=180) if sec else 180
# only meaningful in QUIET systems (Solitude etc.); above this it's just noise
max_pop = cp.getint("local", "max_pop", fallback=12) if sec else 12
confirm = cp.getint("local", "confirm_reads", fallback=3) if sec else 3
if "--test" in sys.argv: if "--test" in sys.argv:
region = get_region(cp) region = get_region(cp)
if not region: if not region:
print("No Local region. Run --snip (or send a screenshot).") print("No Local region. Run --snip (or send a screenshot).")
return return
print("pilots:", count_pilots(pytesseract.image_to_string(w.grab_region(region)))) print("names:", extract_names(pytesseract.image_to_string(w.grab_region(region))))
return return
print(f"[local] started; poll {poll}s (waits for !mining on)") print(f"[local] started; threat-only (kills/killers/sec). poll {poll}s (waits for !mining on)")
region = None region = None
prev = None seen = set() # names already evaluated (don't re-query)
pending = None primed = False # first scan just records who's already here (no alerts)
pend_n = 0
last_alert = 0.0
while True: while True:
if not w.bot_mining(cp): if not w.bot_mining(cp):
prev = None seen.clear()
primed = False
time.sleep(15) time.sleep(15)
continue continue
if region is None: if region is None:
@ -135,29 +169,26 @@ def main():
continue continue
w.heartbeat(cp, "local") w.heartbeat(cp, "local")
try: try:
count = count_pilots(pytesseract.image_to_string(w.grab_region(region))) names = extract_names(pytesseract.image_to_string(w.grab_region(region)))
now = time.time() new = [n for n in names if n not in seen]
if prev is None: for n in new[:6]: # cap lookups per scan
prev = count seen.add(n)
print(f"[local] baseline {count} pilots") if not primed:
elif count > max_pop: continue # first pass = baseline, no alerts
prev = count # busy system — track, never alert cid = resolve_char(n)
pending, pend_n = None, 0 if not cid:
elif count > prev: continue # OCR garbage / not a real pilot
# require the higher count to persist across `confirm` reads (OCR jitter) t = threat(cid)
pend_n = pend_n + 1 if pending == count else 1 if not t:
pending = count continue # entered, but not a threat -> stay quiet
if pend_n >= confirm and now - last_alert > cooldown: sec = f"{t['sec']:.1f}" if t['sec'] is not None else "?"
w.notify(cp, "Someone entered Local", w.notify(cp, "⚠ Threat in Local",
f"+{count - prev} in Local — {count} pilots now (was {prev}). " f"**{n}** entered — {t['recent']} recent kills, {t['kills']} total, "
"Check d-scan.", priority="high", tags="eyes") f"danger {t['danger']}%, sec {sec}. "
last_alert = now f"https://zkillboard.com/character/{cid}/",
prev = count priority="urgent", tags="skull,warning")
pending, pend_n = None, 0 print(f"[local] THREAT {n} (kills {t['kills']}, danger {t['danger']})")
else: primed = True
if count < prev:
prev = count # someone left; rebaseline down
pending, pend_n = None, 0
except Exception as e: except Exception as e:
print(f"[local] error: {e}") print(f"[local] error: {e}")
time.sleep(poll) time.sleep(poll)