eve-watcher/eve_local_watcher.py

198 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""Local watcher — warns only about DANGEROUS pilots entering Local.
You're aware of how many people are in system; a raw head-count is just noise. So this
reads your Local member list off the screen, and when a NEW pilot appears it looks them
up on zKillboard + ESI and pings you only if they're a threat — an active killer, lots
of kills, or flashy/negative sec. Quiet otherwise. (Plain 'a ship died in your system'
is already handled by danger_watch.)
Only runs during a mining session (`!mining on`). Point it at your Local window once:
python eve_local_watcher.py --snip (or send a screenshot and Brock sets the region)
python eve_local_watcher.py # normal (waits for !mining on)
python eve_local_watcher.py --test # read once, print the names it sees
"""
import json
import os
import re
import socket
import sys
import time
import urllib.request
HERE = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, HERE)
import eve_orehold_watcher as w # load_config, notify, grab_region, heartbeat, bot_mining, snip_region
UA = "eve-watcher-local (brockdarnold@gmail.com)"
NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9 '\-\.]{2,36}$") # plausible EVE pilot name
def extract_names(text):
"""Pull plausible pilot names out of the OCR'd member list."""
names = []
for line in text.splitlines():
s = line.strip()
if len(s) < 3 or s.lower().startswith("local"):
continue
if NAME_RE.match(s) and sum(c.isalpha() for c in s) >= 3:
names.append(s)
return names
def _get(url, data=None, method="GET"):
hdr = {"User-Agent": UA}
if data is not None:
hdr["Content-Type"] = "application/json"
data = json.dumps(data).encode()
return json.loads(urllib.request.urlopen(
urllib.request.Request(url, data=data, headers=hdr, method=method), timeout=15).read())
def resolve_char(name):
"""Name -> character_id via ESI (None if it's not a real character / OCR garbage)."""
try:
d = _get("https://esi.evetech.net/latest/universe/ids/?datasource=tranquility",
data=[name], method="POST")
chars = d.get("characters") or []
return chars[0]["id"] if chars else None
except Exception:
return None
def threat(cid):
"""Return a threat dict for a character, or None if not notable."""
kills = danger = recent = 0
try:
z = _get(f"https://zkillboard.com/api/stats/characterID/{cid}/")
kills = int(z.get("shipsDestroyed") or 0)
danger = int(z.get("dangerRatio") or 0)
recent = int(((z.get("activepvp") or {}).get("kills") or {}).get("count") or 0)
except Exception:
pass
sec = None
try:
sec = _get(f"https://esi.evetech.net/latest/characters/{cid}/?datasource=tranquility").get("security_status")
except Exception:
pass
notable = recent >= 1 or kills >= 25 or danger >= 50 or (sec is not None and sec <= -1.5)
if not notable:
return None
return {"kills": kills, "danger": danger, "recent": recent, "sec": sec}
# ---- screen region (unchanged: auto-locate by the 'Local' label, or --snip) ----
def save_local_region(cp, l, t, wd, ht):
if not cp.has_section("local"):
cp.add_section("local")
cp["local"]["region"] = f"{l},{t},{wd},{ht}"
with open(w.CONFIG_PATH, "w") as f:
cp.write(f)
def detect_local_region(cp):
import mss
import pytesseract
from PIL import Image
tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
if tcmd:
pytesseract.pytesseract.tesseract_cmd = tcmd
try:
with mss.mss() as sct:
mon = sct.monitors[0]
raw = sct.grab(mon)
img = Image.frombytes("RGB", raw.size, raw.bgra, "raw", "BGRX")
data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
except Exception as e:
print(f"[local] detect error: {e}")
return None
for i in range(len(data["text"])):
if data["text"][i].strip().lower() == "local":
x = mon["left"] + data["left"][i] - 10
y = mon["top"] + data["top"][i] - 6
save_local_region(cp, x, y, max(220, data["width"][i] + 220), 600)
print(f"[local] auto-located Local window near {x},{y}")
return [x, y, max(220, data["width"][i] + 220), 600]
return None
def get_region(cp):
rs = cp.get("local", "region", fallback="").strip() if cp.has_section("local") else ""
if rs:
return [int(v) for v in rs.split(",")]
print("[local] no saved region — trying to auto-locate the Local window...")
return detect_local_region(cp)
def main():
cp = w.load_config()
if "--snip" in sys.argv:
r = w.snip_region("Drag a box around your whole Local member-list window. Esc to cancel.")
if r:
save_local_region(cp, *r)
print(f"Saved Local region {r} to config.ini")
else:
print("Cancelled.")
return
import pytesseract
tcmd = cp.get("ocr", "tesseract_cmd", fallback="").strip()
if tcmd:
pytesseract.pytesseract.tesseract_cmd = tcmd
poll = cp.getint("local", "poll_secs", fallback=12) if cp.has_section("local") else 12
if "--test" in sys.argv:
region = get_region(cp)
if not region:
print("No Local region. Run --snip (or send a screenshot).")
return
print("names:", extract_names(pytesseract.image_to_string(w.grab_region(region))))
return
print(f"[local] started; threat-only (kills/killers/sec). poll {poll}s (waits for !mining on)")
region = None
seen = set() # names already evaluated (don't re-query)
primed = False # first scan just records who's already here (no alerts)
while True:
if not w.bot_mining(cp):
seen.clear()
primed = False
time.sleep(15)
continue
if region is None:
region = get_region(cp)
if region is None:
print("[local] Local window not found — open it / run --snip. Retrying...")
time.sleep(max(poll, 20))
continue
w.heartbeat(cp, "local")
try:
names = extract_names(pytesseract.image_to_string(w.grab_region(region)))
new = [n for n in names if n not in seen]
for n in new[:6]: # cap lookups per scan
seen.add(n)
if not primed:
continue # first pass = baseline, no alerts
cid = resolve_char(n)
if not cid:
continue # OCR garbage / not a real pilot
t = threat(cid)
if not t:
continue # entered, but not a threat -> stay quiet
sec = f"{t['sec']:.1f}" if t['sec'] is not None else "?"
w.notify(cp, "⚠ Threat in Local",
f"**{n}** entered — {t['recent']} recent kills, {t['kills']} total, "
f"danger {t['danger']}%, sec {sec}. "
f"https://zkillboard.com/character/{cid}/",
priority="urgent", tags="skull,warning")
print(f"[local] THREAT {n} (kills {t['kills']}, danger {t['danger']})")
primed = True
except Exception as e:
print(f"[local] error: {e}")
time.sleep(poll)
if __name__ == "__main__":
main()