From 7f8e7c44bc93808698f70dc697e7be2d11221d51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Thu, 29 Jan 2026 23:44:38 +0100 Subject: [PATCH] Metadata --- apps/py-metadata/app.py | 6 +-- apps/py-metadata/tests/test_poller.py | 75 +++++++++++++++++---------- apps/py-metadata/worker/poller.py | 16 ++---- 3 files changed, 53 insertions(+), 44 deletions(-) diff --git a/apps/py-metadata/app.py b/apps/py-metadata/app.py index 5a4ccbd6..3e315d4e 100644 --- a/apps/py-metadata/app.py +++ b/apps/py-metadata/app.py @@ -14,8 +14,8 @@ from worker.poller import run_worker shutdown_flag = False # Heartbeat state (nå med full backoff-tracking) -worker_heartbeat = None -worker_in_backoff = None +worker_heartbeat = time.time() +worker_in_backoff = False worker_error = None backoff_entered_at = None @@ -38,7 +38,7 @@ def set_heartbeat(ts, in_backoff=False, error=None): global backoff_entered_at, backoff_exited_at global backoff_entered_human, backoff_exited_human - prev = worker_in_backoff + prev: bool = worker_in_backoff # Går INN i backoff if in_backoff and prev is False: diff --git a/apps/py-metadata/tests/test_poller.py b/apps/py-metadata/tests/test_poller.py index baf6e296..c362436e 100644 --- a/apps/py-metadata/tests/test_poller.py +++ b/apps/py-metadata/tests/test_poller.py @@ -1,12 +1,11 @@ -from typing import Set import pytest +import uuid +import time from models.event import MetadataSearchResultEvent, EventMetadata -from worker.poller import run_worker, run_iteration +from worker.poller import run_worker from models.task import MetadataSearchTask, MetadataSearchData from models.enums import TaskStatus -import uuid from utils.time import utc_now -import time def make_dummy_event(): @@ -71,8 +70,8 @@ def test_run_worker_processes_one(monkeypatch): monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: events.append("failed")) monkeypatch.setattr("worker.poller.time.sleep", lambda s: None) - # NEW: dummy heartbeat monkeypatch.setattr("worker.poller.time.time", lambda: 123) + dummy_hb = lambda ts, in_backoff=False, error=None: None run_worker( @@ -93,15 +92,7 @@ def test_backoff(monkeypatch): monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: None) - def fake_sleep(seconds): - intervals.append(seconds) - - monkeypatch.setattr(time, "sleep", fake_sleep) - - monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True) - monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event") - monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None) - monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None) + monkeypatch.setattr(time, "sleep", lambda s: intervals.append(s)) dummy_hb = lambda ts, in_backoff=False, error=None: None @@ -123,20 +114,9 @@ def test_backoff_on_connection_error(monkeypatch): reconnects.append("reconnect") def close(self): pass - def failing_fetch(db): - raise RuntimeError("DB connection lost") + monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: (_ for _ in ()).throw(RuntimeError("lost"))) - monkeypatch.setattr("worker.poller.fetch_next_task", failing_fetch) - - def fake_sleep(seconds): - intervals.append(seconds) - - monkeypatch.setattr(time, "sleep", fake_sleep) - - monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True) - monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event") - monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None) - monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None) + monkeypatch.setattr(time, "sleep", lambda s: intervals.append(s)) dummy_hb = lambda ts, in_backoff=False, error=None: None @@ -147,4 +127,43 @@ def test_backoff_on_connection_error(monkeypatch): ) assert reconnects == ["reconnect", "reconnect"] - assert all(interval == 5 for interval in intervals) + assert intervals == [5, 5] + + +def test_backoff_enter_exit(monkeypatch): + calls = [] + + class FakeDB: + def connect(self): pass + def close(self): pass + + seq = [True, False, True] + + def fake_fetch(db): + return make_task() if seq.pop(0) else None + + import worker.poller as poller + + monkeypatch.setattr("worker.poller.fetch_next_task", fake_fetch) + monkeypatch.setattr("worker.poller.claim_task", lambda *a, **k: True) + + async def fake_process_task(db, task): + return make_dummy_event() + + monkeypatch.setattr("worker.poller.process_task", fake_process_task) + monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda *a, **k: None) + monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: None) + monkeypatch.setattr(time, "sleep", lambda s: None) + + def record_hb(ts, in_backoff=False, error=None): + calls.append(in_backoff) + + db = FakeDB() + poll_interval = 5 + worker_id = "test-worker" + + poller.run_iteration(db, worker_id, poll_interval, record_hb) + poller.run_iteration(db, worker_id, poll_interval, record_hb) + poller.run_iteration(db, worker_id, poll_interval, record_hb) + + assert calls == [False, True, False] diff --git a/apps/py-metadata/worker/poller.py b/apps/py-metadata/worker/poller.py index d8ed1860..7e9eff05 100644 --- a/apps/py-metadata/worker/poller.py +++ b/apps/py-metadata/worker/poller.py @@ -15,20 +15,12 @@ from worker.processor import process_task from utils.logger import logger from models.task import MetadataSearchTask, Task -# Viktig: dette er heartbeat_ref fra app.py -from app import set_heartbeat as heartbeat_ref - -def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int, int]: - """ - Kjør én iterasjon av poller-loopen. - Returnerer (sleep_interval, next_poll_interval). - """ +def run_iteration(db: Database, worker_id: str, poll_interval: int, heartbeat_ref) -> tuple[int, int]: try: task: Optional[Task] = fetch_next_task(db) if task: - # Worker er aktiv → ikke i backoff heartbeat_ref(time.time(), in_backoff=False, error=None) if not isinstance(task, MetadataSearchTask): @@ -54,11 +46,9 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int mark_failed(db, str(task.taskId)) heartbeat_ref(time.time(), in_backoff=False, error=str(task_error)) - # Etter en task → reset poll interval return poll_interval, 5 else: - # Ingen tasks → worker går i backoff logger.debug("Ingen nye tasks.") heartbeat_ref(time.time(), in_backoff=True, error=None) return poll_interval, min(poll_interval * 2, 60) @@ -70,12 +60,12 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int return poll_interval, 5 -def run_worker(db: Database, shutdown_flag_ref=lambda: False) -> None: +def run_worker(db: Database, shutdown_flag_ref=lambda: False, heartbeat_ref=None) -> None: poll_interval: int = 5 worker_id = f"PyMetadata-{uuid.uuid4()}" while not shutdown_flag_ref(): - sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval) + sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval, heartbeat_ref) time.sleep(sleep_interval) logger.info("👋 run_worker loop avsluttet")