diff --git a/apps/py-metadata/db/repository.py b/apps/py-metadata/db/repository.py index c325c475..a36c70c9 100644 --- a/apps/py-metadata/db/repository.py +++ b/apps/py-metadata/db/repository.py @@ -1,17 +1,19 @@ import json from typing import Optional +from uuid import UUID from db.database import Database from models.enums import TaskStatus from models.event import MetadataSearchResultEvent -from models.task import MetadataSearchPayload, Task, MetadataSearchTask, MetadataSearchData +from models.task import MetadataSearchPayload, MetadataSearchTask from utils.logger import logger from utils.time import parse_mysql_ts, utc_now -def fetch_next_task(db: Database) -> Optional[Task]: +def fetch_next_task(db: Database) -> Optional[MetadataSearchTask]: db.validate() cursor = db.conn.cursor(dictionary=True) cursor.execute( - "SELECT * FROM TASKS WHERE TASK='MetadataSearchTask' AND STATUS='Pending' AND CLAIMED=0 AND CONSUMED=0 " + "SELECT * FROM TASKS " + "WHERE TASK='MetadataSearchTask' AND STATUS='Pending' AND CLAIMED=0 AND CONSUMED=0 " "ORDER BY PERSISTED_AT ASC LIMIT 1" ) row = cursor.fetchone() @@ -19,29 +21,41 @@ def fetch_next_task(db: Database) -> Optional[Task]: return None try: - if row["TASK"] == "MetadataSearchTask": - # hele JSON ligger i DATA - payload = MetadataSearchPayload.model_validate_json(row["DATA"]) - return MetadataSearchTask( - referenceId=row["REFERENCE_ID"], - taskId=row["TASK_ID"], - task=row["TASK"], - status=row["STATUS"], - data=payload.data, - claimed=row["CLAIMED"], - claimedBy=row["CLAIMED_BY"], - consumed=row["CONSUMED"], - lastCheckIn=parse_mysql_ts(row["LAST_CHECK_IN"]), - persistedAt=parse_mysql_ts(row["PERSISTED_AT"]) - ) - else: - return None - except Exception as e: - logger.error(f"❌ Feil ved deserialisering av task {row.get('TASK_ID')}: {e}") + # 1. Parse payload (JSON → Pydantic) + payload = MetadataSearchPayload.model_validate_json(row["DATA"]) + + # 2. Parse UUIDs (DB gir str → modellen krever UUID) + reference_id = UUID(row["REFERENCE_ID"]) + task_id = UUID(row["TASK_ID"]) + + # 3. Parse enum (DB gir str → modellen krever TaskStatus) + status = TaskStatus(row["STATUS"]) + + # 4. Parse timestamps (DB gir str → modellen krever datetime) + last_check_in = parse_mysql_ts(row["LAST_CHECK_IN"]) + persisted_at = parse_mysql_ts(row["PERSISTED_AT"]) + + # 5. Bygg modellen med riktige typer + return MetadataSearchTask( + referenceId=reference_id, + taskId=task_id, + task=row["TASK"], + status=status, + data=payload.data, + claimed=row["CLAIMED"], + claimedBy=row["CLAIMED_BY"], + consumed=row["CONSUMED"], + lastCheckIn=last_check_in, + persistedAt=persisted_at + ) + + except Exception: + logger.exception(f"❌ Feil ved deserialisering av task {row.get('TASK_ID')}") mark_failed(db, row["TASK_ID"]) return None + def mark_failed(db: Database, task_id: str) -> None: cursor = db.conn.cursor() cursor.execute( diff --git a/apps/py-metadata/utils/time.py b/apps/py-metadata/utils/time.py index 3445b7f2..0b8baf19 100644 --- a/apps/py-metadata/utils/time.py +++ b/apps/py-metadata/utils/time.py @@ -8,3 +8,4 @@ def parse_mysql_ts(value): if value is None: return None return datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc) +