diff --git a/apps/py-metadata/api/health_api.py b/apps/py-metadata/api/health_api.py index bce2cc14..60854541 100644 --- a/apps/py-metadata/api/health_api.py +++ b/apps/py-metadata/api/health_api.py @@ -20,7 +20,6 @@ async def health(): worker_ok = False db_error = None worker_error = None - in_backoff = None # --- Database check --- try: @@ -30,18 +29,18 @@ async def health(): db_ok = False db_error = str(e) - # --- Worker heartbeat check --- + # --- Worker check --- try: hb = get_worker_heartbeat() + last = hb["ts"] in_backoff = hb["inBackoff"] worker_error = hb["error"] now = time.time() - diff = now - last + diff = now - last if last else None - # Worker må ha iterert siste 90 sekunder - worker_ok = diff < 90 and worker_error is None + worker_ok = diff is not None and diff < 90 and worker_error is None if not worker_ok and worker_error is None: worker_error = f"Heartbeat too old: {diff:.2f}s" @@ -49,6 +48,7 @@ async def health(): except Exception as e: worker_ok = False worker_error = str(e) + hb = {} status = db_ok and worker_ok @@ -60,6 +60,6 @@ async def health(): "database_error": db_error, "worker": worker_ok, "worker_error": worker_error, - "worker_in_backoff": in_backoff + **hb # inkluderer alle backoff-feltene } ) diff --git a/apps/py-metadata/app.py b/apps/py-metadata/app.py index 4f115cf4..5a4ccbd6 100644 --- a/apps/py-metadata/app.py +++ b/apps/py-metadata/app.py @@ -1,6 +1,7 @@ import signal import sys import time +from datetime import datetime from threading import Thread import uvicorn @@ -12,11 +13,16 @@ from worker.poller import run_worker shutdown_flag = False -# Heartbeat state -worker_heartbeat = time.time() -worker_in_backoff = False +# Heartbeat state (nå med full backoff-tracking) +worker_heartbeat = None +worker_in_backoff = None worker_error = None +backoff_entered_at = None +backoff_exited_at = None +backoff_entered_human = None +backoff_exited_human = None + def handle_shutdown(signum, frame): global shutdown_flag @@ -25,17 +31,56 @@ def handle_shutdown(signum, frame): def set_heartbeat(ts, in_backoff=False, error=None): + """ + Oppdaterer worker-state, inkludert når backoff starter og slutter. + """ global worker_heartbeat, worker_in_backoff, worker_error + global backoff_entered_at, backoff_exited_at + global backoff_entered_human, backoff_exited_human + + prev = worker_in_backoff + + # Går INN i backoff + if in_backoff and prev is False: + backoff_entered_at = ts + backoff_entered_human = datetime.fromtimestamp(ts).isoformat() + + # Går UT av backoff + if not in_backoff and prev is True: + backoff_exited_at = ts + backoff_exited_human = datetime.fromtimestamp(ts).isoformat() + worker_heartbeat = ts worker_in_backoff = in_backoff worker_error = error def get_heartbeat(): + """ + Returnerer all metadata health_api trenger. + """ + now = time.time() + + # Beregn varighet i backoff + if backoff_entered_at and not backoff_exited_at: + duration = now - backoff_entered_at + elif backoff_entered_at and backoff_exited_at: + duration = backoff_exited_at - backoff_entered_at + else: + duration = None + + duration_human = f"{duration:.2f}s" if duration else None + return { "ts": worker_heartbeat, "inBackoff": worker_in_backoff, - "error": worker_error + "error": worker_error, + "backoffEnteredAt": backoff_entered_at, + "backoffExitedAt": backoff_exited_at, + "backoffEnteredHuman": backoff_entered_human, + "backoffExitedHuman": backoff_exited_human, + "backoffDurationSeconds": duration, + "backoffDurationHuman": duration_human, } diff --git a/apps/py-metadata/worker/poller.py b/apps/py-metadata/worker/poller.py index 417c2546..d8ed1860 100644 --- a/apps/py-metadata/worker/poller.py +++ b/apps/py-metadata/worker/poller.py @@ -1,25 +1,34 @@ import asyncio import time -from typing import Optional import uuid +from typing import Optional + from db.database import Database -from db.repository import claim_task, fetch_next_task, mark_failed, persist_event_and_mark_consumed +from db.repository import ( + claim_task, + fetch_next_task, + mark_failed, + persist_event_and_mark_consumed +) from models.event import MetadataSearchResultEvent from worker.processor import process_task from utils.logger import logger from models.task import MetadataSearchTask, Task +# Viktig: dette er heartbeat_ref fra app.py +from app import set_heartbeat as heartbeat_ref -def run_iteration(db: Database, worker_id: str, poll_interval: int, heartbeat_ref) -> tuple[int, int]: + +def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int, int]: """ Kjør én iterasjon av poller-loopen. - Returnerer (sleep_interval, next_interval). + Returnerer (sleep_interval, next_poll_interval). """ try: task: Optional[Task] = fetch_next_task(db) if task: - # Poller er aktiv → ikke i backoff + # Worker er aktiv → ikke i backoff heartbeat_ref(time.time(), in_backoff=False, error=None) if not isinstance(task, MetadataSearchTask): @@ -41,14 +50,15 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int, heartbeat_re raise RuntimeError("process_task returned nothing!") except Exception as task_error: - logger.error(f"❌ Task {task.taskId} feilet under prosessering: {task_error}") + logger.error(f"❌ Task {task.taskId} feilet: {task_error}") mark_failed(db, str(task.taskId)) heartbeat_ref(time.time(), in_backoff=False, error=str(task_error)) - return poll_interval, 5 # reset interval + # Etter en task → reset poll interval + return poll_interval, 5 else: - # Ingen tasks → poller er i backoff + # Ingen tasks → worker går i backoff logger.debug("Ingen nye tasks.") heartbeat_ref(time.time(), in_backoff=True, error=None) return poll_interval, min(poll_interval * 2, 60) @@ -60,12 +70,12 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int, heartbeat_re return poll_interval, 5 -def run_worker(db: Database, shutdown_flag_ref=lambda: False, heartbeat_ref=None) -> None: +def run_worker(db: Database, shutdown_flag_ref=lambda: False) -> None: poll_interval: int = 5 worker_id = f"PyMetadata-{uuid.uuid4()}" while not shutdown_flag_ref(): - sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval, heartbeat_ref) + sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval) time.sleep(sleep_interval) logger.info("👋 run_worker loop avsluttet")