diff --git a/apps/py-metadata/.vscode/settings.json b/apps/py-metadata/.vscode/settings.json index 0c8df84d..7acdbcec 100644 --- a/apps/py-metadata/.vscode/settings.json +++ b/apps/py-metadata/.vscode/settings.json @@ -1,4 +1,5 @@ { "python.defaultInterpreterPath": "venv/bin/python", - "python.terminal.activateEnvironment": true + "python.terminal.activateEnvironment": true, + "python-envs.pythonProjects": [] } diff --git a/apps/py-metadata/api/health_api.py b/apps/py-metadata/api/health_api.py index c098c006..bce2cc14 100644 --- a/apps/py-metadata/api/health_api.py +++ b/apps/py-metadata/api/health_api.py @@ -1,4 +1,3 @@ -# health_api.py import time from fastapi import FastAPI from fastapi.responses import JSONResponse @@ -21,6 +20,7 @@ async def health(): worker_ok = False db_error = None worker_error = None + in_backoff = None # --- Database check --- try: @@ -32,13 +32,17 @@ async def health(): # --- Worker heartbeat check --- try: - last = get_worker_heartbeat() + hb = get_worker_heartbeat() + last = hb["ts"] + in_backoff = hb["inBackoff"] + worker_error = hb["error"] + now = time.time() diff = now - last - worker_ok = diff < 90 # 90 sekunder toleranse pga at worker kan være inaktiv ved lav belastning + # Worker må ha iterert siste 90 sekunder + worker_ok = diff < 90 and worker_error is None - # Hvis worker er false og ingen exception ble kastet → legg diff i worker_error if not worker_ok and worker_error is None: worker_error = f"Heartbeat too old: {diff:.2f}s" @@ -55,6 +59,7 @@ async def health(): "database": db_ok, "database_error": db_error, "worker": worker_ok, - "worker_error": worker_error + "worker_error": worker_error, + "worker_in_backoff": in_backoff } ) diff --git a/apps/py-metadata/app.py b/apps/py-metadata/app.py index 6c449517..4f115cf4 100644 --- a/apps/py-metadata/app.py +++ b/apps/py-metadata/app.py @@ -11,7 +11,11 @@ from utils.logger import logger from worker.poller import run_worker shutdown_flag = False + +# Heartbeat state worker_heartbeat = time.time() +worker_in_backoff = False +worker_error = None def handle_shutdown(signum, frame): @@ -20,27 +24,25 @@ def handle_shutdown(signum, frame): shutdown_flag = True -def set_heartbeat(ts): - global worker_heartbeat +def set_heartbeat(ts, in_backoff=False, error=None): + global worker_heartbeat, worker_in_backoff, worker_error worker_heartbeat = ts + worker_in_backoff = in_backoff + worker_error = error def get_heartbeat(): - return worker_heartbeat + return { + "ts": worker_heartbeat, + "inBackoff": worker_in_backoff, + "error": worker_error + } def start_health_server(): uvicorn.run(health_app, host="0.0.0.0", port=8080, log_level="error") -def start_worker(db): - run_worker( - db=db, - shutdown_flag_ref=lambda: shutdown_flag, - heartbeat_ref=lambda ts: set_heartbeat(ts) - ) - - def main(): signal.signal(signal.SIGINT, handle_shutdown) signal.signal(signal.SIGTERM, handle_shutdown) @@ -59,13 +61,13 @@ def main(): Thread(target=start_health_server, daemon=True).start() logger.info("🌡️ Health API startet på port 8080") - # Start worker i egen tråd - Thread(target=start_worker, args=(db,), daemon=True).start() - logger.info("🔁 Worker startet i egen tråd") - - # Hold main-tråden i live - while not shutdown_flag: - time.sleep(1) + # Worker kjører i main-tråden + logger.info("🔁 Starter worker i main-tråden") + run_worker( + db=db, + shutdown_flag_ref=lambda: shutdown_flag, + heartbeat_ref=set_heartbeat + ) except Exception as e: logger.error(f"❌ Kritisk feil i app: {e}") diff --git a/apps/py-metadata/tests/test_poller.py b/apps/py-metadata/tests/test_poller.py index 8b65469d..baf6e296 100644 --- a/apps/py-metadata/tests/test_poller.py +++ b/apps/py-metadata/tests/test_poller.py @@ -8,6 +8,7 @@ import uuid from utils.time import utc_now import time + def make_dummy_event(): return MetadataSearchResultEvent( referenceId=uuid.uuid4(), @@ -18,8 +19,8 @@ def make_dummy_event(): ), results=[], persistedAt=utc_now(), - recommended=None, # fyll inn med en gyldig bool - status="Completed" # eller enum hvis modellen krever det + recommended=None, + status="Completed" ) @@ -37,6 +38,7 @@ def make_task(): persistedAt=utc_now() ) + def test_run_worker_processes_one(monkeypatch): events = [] task = make_task() @@ -46,6 +48,7 @@ def test_run_worker_processes_one(monkeypatch): def close(self): pass calls = {"n": 0} + def fetch_once(db): if calls["n"] == 0: calls["n"] += 1 @@ -56,25 +59,31 @@ def test_run_worker_processes_one(monkeypatch): monkeypatch.setattr("worker.poller.fetch_next_task", fetch_once) monkeypatch.setattr("worker.poller.claim_task", lambda *a, **k: True) - # Viktig: async stub async def fake_process_task(db, task): return make_dummy_event() + monkeypatch.setattr("worker.poller.process_task", fake_process_task) def persist_stub(db, event, task_id): events.append("dummy_event") - monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", persist_stub) + monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", persist_stub) monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: events.append("failed")) monkeypatch.setattr("worker.poller.time.sleep", lambda s: None) - run_worker(db=FakeDB(), shutdown_flag_ref=lambda: calls["n"] >= 2) + # NEW: dummy heartbeat + monkeypatch.setattr("worker.poller.time.time", lambda: 123) + dummy_hb = lambda ts, in_backoff=False, error=None: None + + run_worker( + db=FakeDB(), + shutdown_flag_ref=lambda: calls["n"] >= 2, + heartbeat_ref=dummy_hb + ) assert "dummy_event" in events - - def test_backoff(monkeypatch): intervals = [] @@ -82,26 +91,29 @@ def test_backoff(monkeypatch): def connect(self): pass def close(self): pass - # monkeypatch fetch_next_task til å returnere None flere ganger monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: None) - # monkeypatch time.sleep til å fange poll_interval def fake_sleep(seconds): intervals.append(seconds) + monkeypatch.setattr(time, "sleep", fake_sleep) - # monkeypatch claim_task, process_task osv. til dummy monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True) monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event") monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None) monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None) - # kjør bare noen få iterasjoner ved å stoppe med shutdown_flag_ref - run_worker(db=FakeDB(), shutdown_flag_ref=lambda: len(intervals) >= 4) + dummy_hb = lambda ts, in_backoff=False, error=None: None + + run_worker( + db=FakeDB(), + shutdown_flag_ref=lambda: len(intervals) >= 4, + heartbeat_ref=dummy_hb + ) - # verifiser at intervallet øker (5 → 10 → 20 → 40) assert intervals == [5, 10, 20, 40] + def test_backoff_on_connection_error(monkeypatch): intervals = [] reconnects = [] @@ -111,28 +123,28 @@ def test_backoff_on_connection_error(monkeypatch): reconnects.append("reconnect") def close(self): pass - # Først: fetch_next_task kaster exception def failing_fetch(db): raise RuntimeError("DB connection lost") monkeypatch.setattr("worker.poller.fetch_next_task", failing_fetch) - # monkeypatch time.sleep til å fange poll_interval def fake_sleep(seconds): intervals.append(seconds) + monkeypatch.setattr(time, "sleep", fake_sleep) - # dummy funksjoner monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True) monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event") monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None) monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None) - # kjør bare noen få iterasjoner - run_worker(db=FakeDB(), shutdown_flag_ref=lambda: len(reconnects) >= 2) + dummy_hb = lambda ts, in_backoff=False, error=None: None + + run_worker( + db=FakeDB(), + shutdown_flag_ref=lambda: len(reconnects) >= 2, + heartbeat_ref=dummy_hb + ) - # verifiser at reconnect ble kalt assert reconnects == ["reconnect", "reconnect"] - - # verifiser at poll_interval ble reset til 5 etter feil assert all(interval == 5 for interval in intervals) diff --git a/apps/py-metadata/worker/poller.py b/apps/py-metadata/worker/poller.py index 22e963b4..417c2546 100644 --- a/apps/py-metadata/worker/poller.py +++ b/apps/py-metadata/worker/poller.py @@ -7,17 +7,21 @@ from db.repository import claim_task, fetch_next_task, mark_failed, persist_even from models.event import MetadataSearchResultEvent from worker.processor import process_task from utils.logger import logger -from config.database_config import DatabaseConfig from models.task import MetadataSearchTask, Task -def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int, int]: + +def run_iteration(db: Database, worker_id: str, poll_interval: int, heartbeat_ref) -> tuple[int, int]: """ Kjør én iterasjon av poller-loopen. Returnerer (sleep_interval, next_interval). """ try: task: Optional[Task] = fetch_next_task(db) + if task: + # Poller er aktiv → ikke i backoff + heartbeat_ref(time.time(), in_backoff=False, error=None) + if not isinstance(task, MetadataSearchTask): logger.warning(f"⚠️ Ukjent task-type {type(task)} for {task.taskId}, hopper over.") return poll_interval, poll_interval @@ -27,36 +31,42 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int return poll_interval, poll_interval logger.info(f"🔔 Fant task {task.taskId} ({task.task}), claimed by {worker_id}") + try: event: MetadataSearchResultEvent = asyncio.run(process_task(db, task)) if event: persist_event_and_mark_consumed(db, event, str(task.taskId)) logger.info(f"✅ Task {task.taskId} ferdig prosessert") else: - logger.error(f"❌ Task returned nothing! {task.taskId}") raise RuntimeError("process_task returned nothing!") + except Exception as task_error: logger.error(f"❌ Task {task.taskId} feilet under prosessering: {task_error}") mark_failed(db, str(task.taskId)) - return poll_interval, 5 # sov med nåværende, reset til 5 + heartbeat_ref(time.time(), in_backoff=False, error=str(task_error)) + + return poll_interval, 5 # reset interval + else: + # Ingen tasks → poller er i backoff logger.debug("Ingen nye tasks.") + heartbeat_ref(time.time(), in_backoff=True, error=None) return poll_interval, min(poll_interval * 2, 60) + except Exception as e: logger.error(f"⚠️ Feil i worker: {e}") db.connect() + heartbeat_ref(time.time(), in_backoff=True, error=str(e)) return poll_interval, 5 + def run_worker(db: Database, shutdown_flag_ref=lambda: False, heartbeat_ref=None) -> None: poll_interval: int = 5 worker_id = f"PyMetadata-{uuid.uuid4()}" while not shutdown_flag_ref(): - if heartbeat_ref: - heartbeat_ref(time.time()) - - sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval) + sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval, heartbeat_ref) time.sleep(sleep_interval) logger.info("👋 run_worker loop avsluttet") - db.close() \ No newline at end of file + db.close()