Metadata
This commit is contained in:
parent
921f724157
commit
7f8e7c44bc
@ -14,8 +14,8 @@ from worker.poller import run_worker
|
|||||||
shutdown_flag = False
|
shutdown_flag = False
|
||||||
|
|
||||||
# Heartbeat state (nå med full backoff-tracking)
|
# Heartbeat state (nå med full backoff-tracking)
|
||||||
worker_heartbeat = None
|
worker_heartbeat = time.time()
|
||||||
worker_in_backoff = None
|
worker_in_backoff = False
|
||||||
worker_error = None
|
worker_error = None
|
||||||
|
|
||||||
backoff_entered_at = None
|
backoff_entered_at = None
|
||||||
@ -38,7 +38,7 @@ def set_heartbeat(ts, in_backoff=False, error=None):
|
|||||||
global backoff_entered_at, backoff_exited_at
|
global backoff_entered_at, backoff_exited_at
|
||||||
global backoff_entered_human, backoff_exited_human
|
global backoff_entered_human, backoff_exited_human
|
||||||
|
|
||||||
prev = worker_in_backoff
|
prev: bool = worker_in_backoff
|
||||||
|
|
||||||
# Går INN i backoff
|
# Går INN i backoff
|
||||||
if in_backoff and prev is False:
|
if in_backoff and prev is False:
|
||||||
|
|||||||
@ -1,12 +1,11 @@
|
|||||||
from typing import Set
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import uuid
|
||||||
|
import time
|
||||||
from models.event import MetadataSearchResultEvent, EventMetadata
|
from models.event import MetadataSearchResultEvent, EventMetadata
|
||||||
from worker.poller import run_worker, run_iteration
|
from worker.poller import run_worker
|
||||||
from models.task import MetadataSearchTask, MetadataSearchData
|
from models.task import MetadataSearchTask, MetadataSearchData
|
||||||
from models.enums import TaskStatus
|
from models.enums import TaskStatus
|
||||||
import uuid
|
|
||||||
from utils.time import utc_now
|
from utils.time import utc_now
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
def make_dummy_event():
|
def make_dummy_event():
|
||||||
@ -71,8 +70,8 @@ def test_run_worker_processes_one(monkeypatch):
|
|||||||
monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: events.append("failed"))
|
monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: events.append("failed"))
|
||||||
monkeypatch.setattr("worker.poller.time.sleep", lambda s: None)
|
monkeypatch.setattr("worker.poller.time.sleep", lambda s: None)
|
||||||
|
|
||||||
# NEW: dummy heartbeat
|
|
||||||
monkeypatch.setattr("worker.poller.time.time", lambda: 123)
|
monkeypatch.setattr("worker.poller.time.time", lambda: 123)
|
||||||
|
|
||||||
dummy_hb = lambda ts, in_backoff=False, error=None: None
|
dummy_hb = lambda ts, in_backoff=False, error=None: None
|
||||||
|
|
||||||
run_worker(
|
run_worker(
|
||||||
@ -93,15 +92,7 @@ def test_backoff(monkeypatch):
|
|||||||
|
|
||||||
monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: None)
|
monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: None)
|
||||||
|
|
||||||
def fake_sleep(seconds):
|
monkeypatch.setattr(time, "sleep", lambda s: intervals.append(s))
|
||||||
intervals.append(seconds)
|
|
||||||
|
|
||||||
monkeypatch.setattr(time, "sleep", fake_sleep)
|
|
||||||
|
|
||||||
monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True)
|
|
||||||
monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event")
|
|
||||||
monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None)
|
|
||||||
monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None)
|
|
||||||
|
|
||||||
dummy_hb = lambda ts, in_backoff=False, error=None: None
|
dummy_hb = lambda ts, in_backoff=False, error=None: None
|
||||||
|
|
||||||
@ -123,20 +114,9 @@ def test_backoff_on_connection_error(monkeypatch):
|
|||||||
reconnects.append("reconnect")
|
reconnects.append("reconnect")
|
||||||
def close(self): pass
|
def close(self): pass
|
||||||
|
|
||||||
def failing_fetch(db):
|
monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: (_ for _ in ()).throw(RuntimeError("lost")))
|
||||||
raise RuntimeError("DB connection lost")
|
|
||||||
|
|
||||||
monkeypatch.setattr("worker.poller.fetch_next_task", failing_fetch)
|
monkeypatch.setattr(time, "sleep", lambda s: intervals.append(s))
|
||||||
|
|
||||||
def fake_sleep(seconds):
|
|
||||||
intervals.append(seconds)
|
|
||||||
|
|
||||||
monkeypatch.setattr(time, "sleep", fake_sleep)
|
|
||||||
|
|
||||||
monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True)
|
|
||||||
monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event")
|
|
||||||
monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None)
|
|
||||||
monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None)
|
|
||||||
|
|
||||||
dummy_hb = lambda ts, in_backoff=False, error=None: None
|
dummy_hb = lambda ts, in_backoff=False, error=None: None
|
||||||
|
|
||||||
@ -147,4 +127,43 @@ def test_backoff_on_connection_error(monkeypatch):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert reconnects == ["reconnect", "reconnect"]
|
assert reconnects == ["reconnect", "reconnect"]
|
||||||
assert all(interval == 5 for interval in intervals)
|
assert intervals == [5, 5]
|
||||||
|
|
||||||
|
|
||||||
|
def test_backoff_enter_exit(monkeypatch):
|
||||||
|
calls = []
|
||||||
|
|
||||||
|
class FakeDB:
|
||||||
|
def connect(self): pass
|
||||||
|
def close(self): pass
|
||||||
|
|
||||||
|
seq = [True, False, True]
|
||||||
|
|
||||||
|
def fake_fetch(db):
|
||||||
|
return make_task() if seq.pop(0) else None
|
||||||
|
|
||||||
|
import worker.poller as poller
|
||||||
|
|
||||||
|
monkeypatch.setattr("worker.poller.fetch_next_task", fake_fetch)
|
||||||
|
monkeypatch.setattr("worker.poller.claim_task", lambda *a, **k: True)
|
||||||
|
|
||||||
|
async def fake_process_task(db, task):
|
||||||
|
return make_dummy_event()
|
||||||
|
|
||||||
|
monkeypatch.setattr("worker.poller.process_task", fake_process_task)
|
||||||
|
monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda *a, **k: None)
|
||||||
|
monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: None)
|
||||||
|
monkeypatch.setattr(time, "sleep", lambda s: None)
|
||||||
|
|
||||||
|
def record_hb(ts, in_backoff=False, error=None):
|
||||||
|
calls.append(in_backoff)
|
||||||
|
|
||||||
|
db = FakeDB()
|
||||||
|
poll_interval = 5
|
||||||
|
worker_id = "test-worker"
|
||||||
|
|
||||||
|
poller.run_iteration(db, worker_id, poll_interval, record_hb)
|
||||||
|
poller.run_iteration(db, worker_id, poll_interval, record_hb)
|
||||||
|
poller.run_iteration(db, worker_id, poll_interval, record_hb)
|
||||||
|
|
||||||
|
assert calls == [False, True, False]
|
||||||
|
|||||||
@ -15,20 +15,12 @@ from worker.processor import process_task
|
|||||||
from utils.logger import logger
|
from utils.logger import logger
|
||||||
from models.task import MetadataSearchTask, Task
|
from models.task import MetadataSearchTask, Task
|
||||||
|
|
||||||
# Viktig: dette er heartbeat_ref fra app.py
|
|
||||||
from app import set_heartbeat as heartbeat_ref
|
|
||||||
|
|
||||||
|
def run_iteration(db: Database, worker_id: str, poll_interval: int, heartbeat_ref) -> tuple[int, int]:
|
||||||
def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int, int]:
|
|
||||||
"""
|
|
||||||
Kjør én iterasjon av poller-loopen.
|
|
||||||
Returnerer (sleep_interval, next_poll_interval).
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
task: Optional[Task] = fetch_next_task(db)
|
task: Optional[Task] = fetch_next_task(db)
|
||||||
|
|
||||||
if task:
|
if task:
|
||||||
# Worker er aktiv → ikke i backoff
|
|
||||||
heartbeat_ref(time.time(), in_backoff=False, error=None)
|
heartbeat_ref(time.time(), in_backoff=False, error=None)
|
||||||
|
|
||||||
if not isinstance(task, MetadataSearchTask):
|
if not isinstance(task, MetadataSearchTask):
|
||||||
@ -54,11 +46,9 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int
|
|||||||
mark_failed(db, str(task.taskId))
|
mark_failed(db, str(task.taskId))
|
||||||
heartbeat_ref(time.time(), in_backoff=False, error=str(task_error))
|
heartbeat_ref(time.time(), in_backoff=False, error=str(task_error))
|
||||||
|
|
||||||
# Etter en task → reset poll interval
|
|
||||||
return poll_interval, 5
|
return poll_interval, 5
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Ingen tasks → worker går i backoff
|
|
||||||
logger.debug("Ingen nye tasks.")
|
logger.debug("Ingen nye tasks.")
|
||||||
heartbeat_ref(time.time(), in_backoff=True, error=None)
|
heartbeat_ref(time.time(), in_backoff=True, error=None)
|
||||||
return poll_interval, min(poll_interval * 2, 60)
|
return poll_interval, min(poll_interval * 2, 60)
|
||||||
@ -70,12 +60,12 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int
|
|||||||
return poll_interval, 5
|
return poll_interval, 5
|
||||||
|
|
||||||
|
|
||||||
def run_worker(db: Database, shutdown_flag_ref=lambda: False) -> None:
|
def run_worker(db: Database, shutdown_flag_ref=lambda: False, heartbeat_ref=None) -> None:
|
||||||
poll_interval: int = 5
|
poll_interval: int = 5
|
||||||
worker_id = f"PyMetadata-{uuid.uuid4()}"
|
worker_id = f"PyMetadata-{uuid.uuid4()}"
|
||||||
|
|
||||||
while not shutdown_flag_ref():
|
while not shutdown_flag_ref():
|
||||||
sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval)
|
sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval, heartbeat_ref)
|
||||||
time.sleep(sleep_interval)
|
time.sleep(sleep_interval)
|
||||||
|
|
||||||
logger.info("👋 run_worker loop avsluttet")
|
logger.info("👋 run_worker loop avsluttet")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user