# MediaProcessing/apps/py-metadata/tests/test_repository.py

import json
import uuid
from datetime import datetime

import pytest

from db import repository
from db.repository import persist_event_and_mark_consumed
from models.enums import MediaType, TaskStatus
from models.event import EventMetadata, MetadataResult, MetadataSearchResultEvent, SearchResult, Summary
from models.task import MetadataSearchData, MetadataSearchTask
from utils.time import utc_now


# Lightweight test doubles standing in for the cursor, connection and DB wrapper
# that db.repository expects.
class FakeCursor:
    def __init__(self):
        self.executed = []
        self.rowcount = 1

    def execute(self, sql, params=None):
        self.executed.append((sql, params))

    def close(self):
        pass


class FakeConn:
    def __init__(self):
        self.cursor_obj = FakeCursor()
        self.committed = False
        self.rolled_back = False

    def cursor(self, dictionary=False):
        return self.cursor_obj

    def commit(self):
        self.committed = True

    def rollback(self):
        self.rolled_back = True


class FakeDB:
    def __init__(self):
        self.conn = FakeConn()

    def validate(self):
        pass


def make_event() -> MetadataSearchResultEvent:
    return MetadataSearchResultEvent(
        referenceId=uuid.uuid4(),
        eventId=uuid.uuid4(),
        metadata=EventMetadata(
            created=utc_now(),
            derivedFromId={uuid.uuid4()}
        ),
        results=[],
        recommended=SearchResult(
            simpleScore=1,
            prefixScore=2,
            advancedScore=3,
            sourceWeight=1.0,
            metadata=MetadataResult(
                source="test",
                title="title",
                alternateTitles=[],
                cover=None,
                bannerImage=None,
                type=MediaType.SERIE,
                summary=[Summary(language="en", description="desc")],
                genres=["action"]
            )
        ),
        status=TaskStatus.PENDING
    )


def test_persist_event_and_mark_consumed_success():
    db = FakeDB()
    event = make_event()
    persist_event_and_mark_consumed(db, event, str(event.eventId))
    # Verify that commit was called
    assert db.conn.committed
    # Verify that two SQL statements were executed
    assert len(db.conn.cursor_obj.executed) == 2


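# Hedged sketch of the failure path: it assumes persist_event_and_mark_consumed
# rolls back and re-raises when a cursor.execute call fails. Adjust or drop this
# test if db.repository handles execute errors differently.
def test_persist_event_and_mark_consumed_rolls_back_on_error():
    db = FakeDB()
    event = make_event()

    def failing_execute(sql, params=None):
        raise RuntimeError("boom")

    # Make every SQL statement fail so the error-handling path is exercised
    db.conn.cursor_obj.execute = failing_execute

    with pytest.raises(RuntimeError):
        persist_event_and_mark_consumed(db, event, str(event.eventId))

    # Assumption: the repository rolls back instead of committing on failure
    assert db.conn.rolled_back
    assert not db.conn.committed

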
def make_row(task_id, ref_id):
    # Simulates a DB row exactly as it is stored in the Tasks table
    return {
        "REFERENCE_ID": str(ref_id),
        "TASK_ID": str(task_id),
        "TASK": "MetadataSearchTask",
        "STATUS": TaskStatus.PENDING.value,
        "DATA": json.dumps({
            "searchTitles": ["Foo", "Bar"],
            "collection": "anime"
        }),
        "CLAIMED": False,
        "CLAIMED_BY": None,
        "CONSUMED": False,
        "LAST_CHECK_IN": None,
        "PERSISTED_AT": utc_now().isoformat()
    }


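# A small, self-contained check that the JSON stored in the DATA column parses
# into MetadataSearchData the same way the fetch mapping below expects. Field
# names mirror make_row(); nothing beyond the models already used in this file
# is assumed.
def test_make_row_data_parses_into_metadata_search_data():
    row = make_row(uuid.uuid4(), uuid.uuid4())
    data = json.loads(row["DATA"])

    parsed = MetadataSearchData(
        searchTitles=data["searchTitles"],
        collection=data["collection"]
    )

    assert parsed.searchTitles == ["Foo", "Bar"]
    assert parsed.collection == "anime"

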
def test_fetch_next_task_maps_correctly(monkeypatch):
    task_id = uuid.uuid4()
    ref_id = uuid.uuid4()
    fake_row = make_row(task_id, ref_id)

    # Fake DB that would return the row; unused once fetch_next_task is patched,
    # but kept so the call matches the real repository signature
    class FakeQueryDB:
        def execute(self, query, *args, **kwargs):
            return [fake_row]

    # Monkeypatch fetch_next_task to map fake_row directly, so the test exercises
    # the row-to-model mapping without a real database
    def fake_fetch_next_task(db):
        row = fake_row
        data = json.loads(row["DATA"])
        return MetadataSearchTask(
            referenceId=uuid.UUID(row["REFERENCE_ID"]),
            taskId=uuid.UUID(row["TASK_ID"]),
            task=row["TASK"],
            status=TaskStatus(row["STATUS"]),
            data=MetadataSearchData(
                searchTitles=data["searchTitles"],
                collection=data["collection"]
            ),
            claimed=row["CLAIMED"],
            claimedBy=row["CLAIMED_BY"],
            consumed=row["CONSUMED"],
            lastCheckIn=row["LAST_CHECK_IN"],
            persistedAt=datetime.fromisoformat(row["PERSISTED_AT"])
        )

    monkeypatch.setattr(repository, "fetch_next_task", fake_fetch_next_task)
    db = FakeQueryDB()
    task = repository.fetch_next_task(db)

    # Verify that the mapping is correct
    assert isinstance(task, MetadataSearchTask)
    assert task.taskId == task_id
    assert task.referenceId == ref_id
    assert task.status == TaskStatus.PENDING
    assert task.data.collection == "anime"
    assert task.data.searchTitles == ["Foo", "Bar"]
    assert task.claimed is False
    assert task.consumed is False