pyMetadata changes
parent 8e2489c31e
commit 66f5e12a51
apps/py-metadata/.vscode/settings.json (vendored, 3 changes)

@@ -1,4 +1,5 @@
 {
     "python.defaultInterpreterPath": "venv/bin/python",
-    "python.terminal.activateEnvironment": true
+    "python.terminal.activateEnvironment": true,
+    "python-envs.pythonProjects": []
 }
@@ -1,4 +1,3 @@
-# health_api.py
 import time
 from fastapi import FastAPI
 from fastapi.responses import JSONResponse

@@ -21,6 +20,7 @@ async def health():
     worker_ok = False
     db_error = None
     worker_error = None
+    in_backoff = None

     # --- Database check ---
     try:

@@ -32,13 +32,17 @@ async def health():

     # --- Worker heartbeat check ---
     try:
-        last = get_worker_heartbeat()
+        hb = get_worker_heartbeat()
+        last = hb["ts"]
+        in_backoff = hb["inBackoff"]
+        worker_error = hb["error"]

         now = time.time()
         diff = now - last

-        worker_ok = diff < 90  # 90-second tolerance, since the worker can be idle under low load
+        # The worker must have iterated within the last 90 seconds
+        worker_ok = diff < 90 and worker_error is None

-        # If the worker is not ok and no exception was raised → put the diff in worker_error
         if not worker_ok and worker_error is None:
             worker_error = f"Heartbeat too old: {diff:.2f}s"

@@ -55,6 +59,7 @@ async def health():
             "database": db_ok,
             "database_error": db_error,
             "worker": worker_ok,
-            "worker_error": worker_error
+            "worker_error": worker_error,
+            "worker_in_backoff": in_backoff
         }
     )
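With this hunk the health endpoint also exposes whether the worker is merely backing off between polls. A minimal sketch of reading the new payload from outside; the /health path and localhost host are assumptions (the diff only shows the handler body and the port 8080 the health server binds to), and the field names match the response built above:

# Sketch only: probe the health endpoint and interpret the new fields.
import json
import urllib.request

def check_health(base_url: str = "http://localhost:8080") -> bool:
    with urllib.request.urlopen(f"{base_url}/health") as resp:
        payload = json.load(resp)
    if payload.get("worker_in_backoff"):
        # Worker is idle between polls; not necessarily unhealthy.
        print("worker in backoff, last error:", payload.get("worker_error"))
    return bool(payload.get("database")) and bool(payload.get("worker"))

if __name__ == "__main__":
    print("healthy:", check_health())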
@@ -11,7 +11,11 @@ from utils.logger import logger
 from worker.poller import run_worker

 shutdown_flag = False

+# Heartbeat state
 worker_heartbeat = time.time()
+worker_in_backoff = False
+worker_error = None
+

 def handle_shutdown(signum, frame):

@@ -20,27 +24,25 @@ def handle_shutdown(signum, frame):
     shutdown_flag = True


-def set_heartbeat(ts):
-    global worker_heartbeat
+def set_heartbeat(ts, in_backoff=False, error=None):
+    global worker_heartbeat, worker_in_backoff, worker_error
     worker_heartbeat = ts
+    worker_in_backoff = in_backoff
+    worker_error = error


 def get_heartbeat():
-    return worker_heartbeat
+    return {
+        "ts": worker_heartbeat,
+        "inBackoff": worker_in_backoff,
+        "error": worker_error
+    }


 def start_health_server():
     uvicorn.run(health_app, host="0.0.0.0", port=8080, log_level="error")


-def start_worker(db):
-    run_worker(
-        db=db,
-        shutdown_flag_ref=lambda: shutdown_flag,
-        heartbeat_ref=lambda ts: set_heartbeat(ts)
-    )
-
-
 def main():
     signal.signal(signal.SIGINT, handle_shutdown)
     signal.signal(signal.SIGTERM, handle_shutdown)

@@ -59,13 +61,13 @@ def main():
         Thread(target=start_health_server, daemon=True).start()
         logger.info("🌡️ Health API started on port 8080")

-        # Start the worker in its own thread
-        Thread(target=start_worker, args=(db,), daemon=True).start()
-        logger.info("🔁 Worker started in its own thread")
-
-        # Keep the main thread alive
-        while not shutdown_flag:
-            time.sleep(1)
+        # The worker runs in the main thread
+        logger.info("🔁 Starting worker in the main thread")
+        run_worker(
+            db=db,
+            shutdown_flag_ref=lambda: shutdown_flag,
+            heartbeat_ref=set_heartbeat
+        )

     except Exception as e:
         logger.error(f"❌ Critical error in the app: {e}")
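The commit drops the dedicated worker thread, runs run_worker in the main thread, and widens the heartbeat to carry backoff and error state. A condensed, standalone sketch of that hand-off between the worker loop and the health thread; the names mirror the diff, but the snippet is an illustration, not the module itself:

# Illustrative sketch of the heartbeat hand-off: the worker reports via
# set_heartbeat, the health side reads a snapshot dict via get_heartbeat.
import time

worker_heartbeat = time.time()
worker_in_backoff = False
worker_error = None

def set_heartbeat(ts, in_backoff=False, error=None):
    global worker_heartbeat, worker_in_backoff, worker_error
    worker_heartbeat = ts
    worker_in_backoff = in_backoff
    worker_error = error

def get_heartbeat():
    return {"ts": worker_heartbeat, "inBackoff": worker_in_backoff, "error": worker_error}

# A worker iteration that found no task reports that it is backing off:
set_heartbeat(time.time(), in_backoff=True)
print(get_heartbeat())  # {'ts': ..., 'inBackoff': True, 'error': None}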
@@ -8,6 +8,7 @@ import uuid
 from utils.time import utc_now
 import time
+

 def make_dummy_event():
     return MetadataSearchResultEvent(
         referenceId=uuid.uuid4(),

@@ -18,8 +19,8 @@ def make_dummy_event():
         ),
         results=[],
         persistedAt=utc_now(),
-        recommended=None,  # fill in with a valid bool
-        status="Completed"  # or an enum if the model requires it
+        recommended=None,
+        status="Completed"
     )


@@ -37,6 +38,7 @@ def make_task():
         persistedAt=utc_now()
     )

+
 def test_run_worker_processes_one(monkeypatch):
     events = []
     task = make_task()

@@ -46,6 +48,7 @@ def test_run_worker_processes_one(monkeypatch):
         def close(self): pass

     calls = {"n": 0}
+
     def fetch_once(db):
         if calls["n"] == 0:
             calls["n"] += 1

@@ -56,25 +59,31 @@ def test_run_worker_processes_one(monkeypatch):
     monkeypatch.setattr("worker.poller.fetch_next_task", fetch_once)
     monkeypatch.setattr("worker.poller.claim_task", lambda *a, **k: True)

-    # Important: async stub
     async def fake_process_task(db, task):
         return make_dummy_event()

     monkeypatch.setattr("worker.poller.process_task", fake_process_task)

     def persist_stub(db, event, task_id):
         events.append("dummy_event")
-    monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", persist_stub)

+    monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", persist_stub)
     monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: events.append("failed"))
     monkeypatch.setattr("worker.poller.time.sleep", lambda s: None)

-    run_worker(db=FakeDB(), shutdown_flag_ref=lambda: calls["n"] >= 2)
+    # NEW: dummy heartbeat
+    monkeypatch.setattr("worker.poller.time.time", lambda: 123)
+    dummy_hb = lambda ts, in_backoff=False, error=None: None
+
+    run_worker(
+        db=FakeDB(),
+        shutdown_flag_ref=lambda: calls["n"] >= 2,
+        heartbeat_ref=dummy_hb
+    )

     assert "dummy_event" in events


def test_backoff(monkeypatch):
     intervals = []

@@ -82,26 +91,29 @@ def test_backoff(monkeypatch):
         def connect(self): pass
         def close(self): pass

-    # monkeypatch fetch_next_task to return None several times
     monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: None)

-    # monkeypatch time.sleep to capture poll_interval
     def fake_sleep(seconds):
         intervals.append(seconds)

     monkeypatch.setattr(time, "sleep", fake_sleep)

-    # monkeypatch claim_task, process_task etc. to dummies
     monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True)
     monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event")
     monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None)
     monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None)

-    # run only a few iterations by stopping via shutdown_flag_ref
-    run_worker(db=FakeDB(), shutdown_flag_ref=lambda: len(intervals) >= 4)
+    dummy_hb = lambda ts, in_backoff=False, error=None: None
+
+    run_worker(
+        db=FakeDB(),
+        shutdown_flag_ref=lambda: len(intervals) >= 4,
+        heartbeat_ref=dummy_hb
+    )

-    # verify that the interval increases (5 → 10 → 20 → 40)
     assert intervals == [5, 10, 20, 40]


def test_backoff_on_connection_error(monkeypatch):
     intervals = []
     reconnects = []

@@ -111,28 +123,28 @@ def test_backoff_on_connection_error(monkeypatch):
             reconnects.append("reconnect")
         def close(self): pass

-    # First: fetch_next_task raises an exception
     def failing_fetch(db):
         raise RuntimeError("DB connection lost")

     monkeypatch.setattr("worker.poller.fetch_next_task", failing_fetch)

-    # monkeypatch time.sleep to capture poll_interval
     def fake_sleep(seconds):
         intervals.append(seconds)

     monkeypatch.setattr(time, "sleep", fake_sleep)

-    # dummy functions
     monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True)
     monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event")
     monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None)
     monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None)

-    # run only a few iterations
-    run_worker(db=FakeDB(), shutdown_flag_ref=lambda: len(reconnects) >= 2)
+    dummy_hb = lambda ts, in_backoff=False, error=None: None
+
+    run_worker(
+        db=FakeDB(),
+        shutdown_flag_ref=lambda: len(reconnects) >= 2,
+        heartbeat_ref=dummy_hb
+    )

-    # verify that reconnect was called
     assert reconnects == ["reconnect", "reconnect"]

-    # verify that poll_interval was reset to 5 after the error
     assert all(interval == 5 for interval in intervals)
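The updated tests pass a no-op dummy_hb so run_worker's new heartbeat_ref parameter is satisfied. A hypothetical extension, not part of the commit: a recording stub would also let a test assert on the backoff flag and error that run_iteration reports.

# Hypothetical recording stub: capture heartbeat calls so a test can assert
# on the in_backoff / error values passed by the poller.
def make_recording_heartbeat():
    calls = []
    def hb(ts, in_backoff=False, error=None):
        calls.append({"ts": ts, "inBackoff": in_backoff, "error": error})
    return hb, calls

hb, calls = make_recording_heartbeat()
hb(123.0, in_backoff=True)             # what an empty poll would report
assert calls[-1]["inBackoff"] is True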
@@ -7,17 +7,21 @@ from db.repository import claim_task, fetch_next_task, mark_failed, persist_even
 from models.event import MetadataSearchResultEvent
 from worker.processor import process_task
 from utils.logger import logger
-from config.database_config import DatabaseConfig
 from models.task import MetadataSearchTask, Task

-def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int, int]:
+
+def run_iteration(db: Database, worker_id: str, poll_interval: int, heartbeat_ref) -> tuple[int, int]:
     """
     Run one iteration of the poller loop.
     Returns (sleep_interval, next_interval).
     """
     try:
         task: Optional[Task] = fetch_next_task(db)

         if task:
+            # The poller is active → not in backoff
+            heartbeat_ref(time.time(), in_backoff=False, error=None)
+
             if not isinstance(task, MetadataSearchTask):
                 logger.warning(f"⚠️ Unknown task type {type(task)} for {task.taskId}, skipping.")
                 return poll_interval, poll_interval

@@ -27,36 +31,42 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int
                 return poll_interval, poll_interval

             logger.info(f"🔔 Found task {task.taskId} ({task.task}), claimed by {worker_id}")

             try:
                 event: MetadataSearchResultEvent = asyncio.run(process_task(db, task))
                 if event:
                     persist_event_and_mark_consumed(db, event, str(task.taskId))
                     logger.info(f"✅ Task {task.taskId} processed successfully")
                 else:
-                    logger.error(f"❌ Task returned nothing! {task.taskId}")
                     raise RuntimeError("process_task returned nothing!")

             except Exception as task_error:
                 logger.error(f"❌ Task {task.taskId} failed during processing: {task_error}")
                 mark_failed(db, str(task.taskId))
-                return poll_interval, 5  # sleep with the current interval, reset to 5
+                heartbeat_ref(time.time(), in_backoff=False, error=str(task_error))
+
+                return poll_interval, 5  # reset interval
+
         else:
+            # No tasks → the poller is in backoff
             logger.debug("No new tasks.")
+            heartbeat_ref(time.time(), in_backoff=True, error=None)
             return poll_interval, min(poll_interval * 2, 60)

     except Exception as e:
         logger.error(f"⚠️ Error in worker: {e}")
         db.connect()
+        heartbeat_ref(time.time(), in_backoff=True, error=str(e))
         return poll_interval, 5


 def run_worker(db: Database, shutdown_flag_ref=lambda: False, heartbeat_ref=None) -> None:
     poll_interval: int = 5
     worker_id = f"PyMetadata-{uuid.uuid4()}"

     while not shutdown_flag_ref():
-        if heartbeat_ref:
-            heartbeat_ref(time.time())
-
-        sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval)
+        sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval, heartbeat_ref)
+
         time.sleep(sleep_interval)

     logger.info("👋 run_worker loop ended")
     db.close()
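run_iteration now also feeds the heartbeat, but the backoff behaviour the tests assert on is unchanged: an empty poll doubles the interval, capped at 60 s, and a failed task resets it to 5 s (per the except branch above). A small sketch of that progression, illustration only, not code from the commit:

# Sketch only: how poll_interval evolves across consecutive empty polls,
# matching the intervals the backoff test expects (5, 10, 20, 40, then capped at 60).
def next_interval(current: int) -> int:
    return min(current * 2, 60)

interval = 5
slept = []
for _ in range(6):          # six empty polls in a row
    slept.append(interval)  # run_worker sleeps the current interval first
    interval = next_interval(interval)
print(slept)                # [5, 10, 20, 40, 60, 60]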