diff --git a/apps/pyMetadata/config/__init__.py b/apps/pyMetadata/config/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/pyMetadata/config/database_config.py b/apps/pyMetadata/config/database_config.py new file mode 100644 index 00000000..9bb43be4 --- /dev/null +++ b/apps/pyMetadata/config/database_config.py @@ -0,0 +1,29 @@ +import os +from dataclasses import dataclass + +@dataclass +class DatabaseConfig: + address: str + port: int + name: str + username: str + password: str + + @staticmethod + def from_env() -> "DatabaseConfig": + return DatabaseConfig( + address=os.environ.get("DATABASE_ADDRESS") or "192.168.2.250", + port=int(os.environ.get("DATABASE_PORT") or "3306"), + name=os.environ.get("DATABASE_NAME_E") or "eventsV3", + username=os.environ.get("DATABASE_USERNAME") or "root", + password=os.environ.get("DATABASE_PASSWORD") or "def", + ) + + def validate(self) -> None: + if not self.address: + raise ValueError("Database address mangler") + if not self.name: + raise ValueError("Database name mangler") + if not self.username: + raise ValueError("Database username mangler") + # du kan legge til flere regler her diff --git a/apps/pyMetadata/db/__init__.py b/apps/pyMetadata/db/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/pyMetadata/db/database.py b/apps/pyMetadata/db/database.py new file mode 100644 index 00000000..89340260 --- /dev/null +++ b/apps/pyMetadata/db/database.py @@ -0,0 +1,53 @@ +from config.database_config import DatabaseConfig +from utils.logger import logger +import mysql.connector +from mysql.connector import Error +from utils.backoff import wait_with_backoff + +class Database: + def __init__(self, config: DatabaseConfig): + self.config = config + self.conn = None + + def connect(self): + """Koble til DB med backoff.""" + self.config.validate() + while True: + try: + self.conn = mysql.connector.connect( + host=self.config.address, + user=self.config.username, + password=self.config.password, + database=self.config.name + ) + if self.conn.is_connected(): + logger.info("✅ Tilkoblet til databasen") + return + except Error as e: + logger.error(f"❌ DB-tilkobling feilet: {e}") + for _ in wait_with_backoff(): + try: + self.conn = mysql.connector.connect( + host=self.config.address, + user=self.config.username, + password=self.config.password, + database=self.config.name + ) + if self.conn.is_connected(): + logger.info("✅ Tilkoblet til databasen") + return + except Error: + continue + + def validate(self): + """Sjekk at tilkoblingen er aktiv.""" + if not self.conn or not self.conn.is_connected(): + logger.warning("⚠️ Tilkobling mistet, prøver igjen...") + self.connect() + + def query(self, sql: str, params=None): + """Kjør en spørring med validering.""" + self.validate() + cursor = self.conn.cursor(dictionary=True) + cursor.execute(sql, params or ()) + return cursor.fetchall() diff --git a/apps/pyMetadata/db/repository.py b/apps/pyMetadata/db/repository.py new file mode 100644 index 00000000..d99b1afb --- /dev/null +++ b/apps/pyMetadata/db/repository.py @@ -0,0 +1,108 @@ +from datetime import datetime +import json +from typing import Optional +from db.database import Database +from models.enums import TaskStatus +from models.event import MetadataSearchResultEvent +from models.task import Task, MetadataSearchTask, MetadataSearchData +from utils.logger import logger + +def fetch_next_task(db: Database) -> Optional[Task]: + db.validate() + cursor = db.conn.cursor(dictionary=True) + cursor.execute( + "SELECT * FROM TASKS 
WHERE STATUS='Pending' AND CLAIMED=0 AND CONSUMED=0 "
+        "ORDER BY PERSISTED_AT ASC LIMIT 1"
+    )
+    row = cursor.fetchone()
+    if not row:
+        return None
+
+    try:
+        if row["TASK"] == "MetadataSearchTask":
+            # the entire JSON payload lives in DATA
+            return MetadataSearchTask.model_validate_json(row["DATA"])
+        else:
+            return Task.model_validate_json(row["DATA"])
+    except Exception as e:
+        logger.error(f"❌ Feil ved deserialisering av task {row.get('TASK_ID')}: {e}")
+        mark_failed(db, row["TASK_ID"])
+        return None
+
+
+def mark_failed(db: Database, task_id: str) -> None:
+    cursor = db.conn.cursor()
+    cursor.execute(
+        "UPDATE TASKS SET STATUS='Failed' WHERE TASK_ID=%s",
+        (task_id,)
+    )
+    db.conn.commit()
+
+def claim_task(db: Database, task_id: str, worker_id: str) -> bool:
+    """
+    Mark a task as claimed by a given worker.
+    Returns True if the claim succeeds, False if the task is already claimed.
+    """
+    db.validate()
+    try:
+        cursor = db.conn.cursor()
+        # Only update if the task is not already claimed
+        cursor.execute(
+            """
+            UPDATE TASKS
+            SET CLAIMED=1, CLAIMED_BY=%s, LAST_CHECK_IN=%s
+            WHERE TASK_ID=%s AND CLAIMED=0 AND CONSUMED=0
+            """,
+            (worker_id, datetime.now(), task_id)
+        )
+        db.conn.commit()
+        return cursor.rowcount > 0
+    except Exception as e:
+        db.conn.rollback()
+        raise RuntimeError(f"Claim feilet: {e}")
+
+
+
+def persist_event_and_mark_consumed(db: Database, event: MetadataSearchResultEvent, task_id: str) -> None:
+    """
+    Persists an event and marks the corresponding task as consumed in a single transaction.
+    Rolls back if anything fails.
+    """
+    db.validate()
+    try:
+        cursor = db.conn.cursor()
+
+        # 1. Insert event
+        as_data = event.model_dump_json()  # Pydantic v2
+        event_name = event.__class__.__name__
+
+        cursor.execute(
+            """
+            INSERT INTO EVENTS (REFERENCE_ID, EVENT_ID, EVENT, DATA, PERSISTED_AT)
+            VALUES (%s, %s, %s, %s, %s)
+            """,
+            (
+                str(event.referenceId),
+                str(event.eventId),
+                event_name,
+                as_data,
+                datetime.now().isoformat()
+            )
+        )
+
+        # 2. Update task status
+        cursor.execute(
+            "UPDATE TASKS SET STATUS=%s, CONSUMED=1 WHERE TASK_ID=%s",
+            (TaskStatus.COMPLETED.value, task_id)
+        )
+
+        # 3. 
Commit begge operasjoner + db.conn.commit() + + except Exception as e: + # Rull tilbake hvis noe feiler + db.conn.rollback() + raise RuntimeError(f"Transaksjon feilet: {e}") + + + diff --git a/apps/pyMetadata/models/__init__.py b/apps/pyMetadata/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/pyMetadata/models/enums.py b/apps/pyMetadata/models/enums.py new file mode 100644 index 00000000..1bfc6363 --- /dev/null +++ b/apps/pyMetadata/models/enums.py @@ -0,0 +1,11 @@ +from enum import Enum + +class TaskStatus(Enum): + PENDING = "Pending" + IN_PROGRESS = "InProgress" + COMPLETED = "Completed" + FAILED = "Failed" + +class MediaType(Enum): + MOVIE = "Movie" + SERIE = "Serie" diff --git a/apps/pyMetadata/models/event.py b/apps/pyMetadata/models/event.py new file mode 100644 index 00000000..f5752692 --- /dev/null +++ b/apps/pyMetadata/models/event.py @@ -0,0 +1,48 @@ +# models/event.py +from pydantic import BaseModel +from datetime import datetime +from typing import List, Set +from uuid import UUID + +from models.enums import MediaType, TaskStatus + + +class EventMetadata(BaseModel): + created: datetime + derivedFromId: Set[UUID] # nøyaktig feltnavn + + +class Summary(BaseModel): + language: str + description: str + + +class MetadataResult(BaseModel): + source: str + title: str + alternateTitles: List[str] + cover: str | None + bannerImage: str | None # behold camelCase + type: MediaType + summary: List[Summary] + genres: List[str] + + +class SearchResult(BaseModel): + simpleScore: int + prefixScore: int + advancedScore: int + sourceWeight: float + metadata: MetadataResult + + +class MetadataSearchResultEvent(BaseModel): + # Påkrevde felter + referenceId: UUID + eventId: UUID + metadata: EventMetadata + + # Custom felter + results: List[SearchResult] + recommended: SearchResult|None + status: TaskStatus diff --git a/apps/pyMetadata/models/metadata.py b/apps/pyMetadata/models/metadata.py new file mode 100644 index 00000000..b9a178e8 --- /dev/null +++ b/apps/pyMetadata/models/metadata.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass, asdict +from typing import List, Optional +from enum import Enum + +from models.enums import MediaType + + +@dataclass +class Summary: + summary: str + language: str + + def to_dict(self): + return {k: v.strip() if isinstance(v, str) else v for k, v in asdict(self).items()} + +@dataclass +class Metadata: + title: str + altTitle: List[str] + cover: str + banner: Optional[str] + type: MediaType + summary: List[Summary] + genres: List[str] + source: str + + def to_dict(self): + def trim(item): + if isinstance(item, str): + return item.strip() + elif isinstance(item, list): + return [trim(sub_item) for sub_item in item] + elif isinstance(item, Enum): + return item.value + elif hasattr(item, "to_dict"): + return item.to_dict() + return item + + return {key: trim(value) for key, value in asdict(self).items()} diff --git a/apps/pyMetadata/models/task.py b/apps/pyMetadata/models/task.py new file mode 100644 index 00000000..c0a9d28b --- /dev/null +++ b/apps/pyMetadata/models/task.py @@ -0,0 +1,28 @@ +# models/task.py +from pydantic import BaseModel +from uuid import UUID +from datetime import datetime +from typing import List, Optional +from models.enums import TaskStatus + + +class MetadataSearchData(BaseModel): + searchTitles: List[str] + collection: str + + +class Task(BaseModel): + referenceId: UUID + taskId: UUID + task: str + status: TaskStatus + data: dict # generisk payload hvis du ikke vet typen + claimed: bool + claimedBy: 
Optional[str] + consumed: bool + lastCheckIn: Optional[datetime] + persistedAt: datetime + + +class MetadataSearchTask(Task): + data: MetadataSearchData diff --git a/apps/pyMetadata/requirments-test.txt b/apps/pyMetadata/requirments-test.txt new file mode 100644 index 00000000..bf6276f4 --- /dev/null +++ b/apps/pyMetadata/requirments-test.txt @@ -0,0 +1,2 @@ +pytest==9.0.2 +pytest-asyncio==1.3.0 \ No newline at end of file diff --git a/apps/pyMetadata/sources/factory.py b/apps/pyMetadata/sources/factory.py new file mode 100644 index 00000000..739218ff --- /dev/null +++ b/apps/pyMetadata/sources/factory.py @@ -0,0 +1,15 @@ +from typing import List +from .mal import Mal +from .anii import Anii +from .imdb import Imdb +from .source import SourceBase + +def get_all_sources(titles: List[str]) -> List[SourceBase]: + """ + Returnerer alle aktive kilder som implementerer SourceBase. + """ + return [ + Mal(titles), + Anii(titles), + Imdb(titles), + ] diff --git a/apps/pyMetadata/tests/test_poller.py b/apps/pyMetadata/tests/test_poller.py new file mode 100644 index 00000000..aa743f35 --- /dev/null +++ b/apps/pyMetadata/tests/test_poller.py @@ -0,0 +1,138 @@ +from typing import Set +import pytest +from models.event import MetadataSearchResultEvent, EventMetadata +from worker.poller import run_worker, run_iteration +from models.task import MetadataSearchTask, MetadataSearchData +from models.enums import TaskStatus +import uuid +from datetime import datetime +import time + +def make_dummy_event(): + return MetadataSearchResultEvent( + referenceId=uuid.uuid4(), + eventId=uuid.uuid4(), + metadata=EventMetadata( + created=datetime.now(), + derivedFromId={uuid.uuid4()} + ), + results=[], + persistedAt=datetime.now(), + recommended=None, # fyll inn med en gyldig bool + status="Completed" # eller enum hvis modellen krever det + ) + + +def make_task(): + return MetadataSearchTask( + referenceId=uuid.uuid4(), + taskId=uuid.uuid4(), + task="MetadataSearchTask", + status=TaskStatus.PENDING, + data=MetadataSearchData(searchTitles=["foo"], collection="bar"), + claimed=False, + claimedBy=None, + consumed=False, + lastCheckIn=None, + persistedAt=datetime.now() + ) + +def test_run_worker_processes_one(monkeypatch): + events = [] + task = make_task() + + class FakeDB: + def connect(self): pass + def close(self): pass + + calls = {"n": 0} + def fetch_once(db): + if calls["n"] == 0: + calls["n"] += 1 + return task + calls["n"] += 1 + return None + + monkeypatch.setattr("worker.poller.fetch_next_task", fetch_once) + monkeypatch.setattr("worker.poller.claim_task", lambda *a, **k: True) + + # Viktig: async stub + async def fake_process_task(task): + return make_dummy_event() + monkeypatch.setattr("worker.poller.process_task", fake_process_task) + + def persist_stub(db, event, task_id): + events.append("dummy_event") + monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", persist_stub) + + monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: events.append("failed")) + monkeypatch.setattr("worker.poller.time.sleep", lambda s: None) + + run_worker(db=FakeDB(), shutdown_flag_ref=lambda: calls["n"] >= 2) + + assert "dummy_event" in events + + + + +def test_backoff(monkeypatch): + intervals = [] + + class FakeDB: + def connect(self): pass + def close(self): pass + + # monkeypatch fetch_next_task til å returnere None flere ganger + monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: None) + + # monkeypatch time.sleep til å fange poll_interval + def fake_sleep(seconds): + 
intervals.append(seconds) + monkeypatch.setattr(time, "sleep", fake_sleep) + + # monkeypatch claim_task, process_task osv. til dummy + monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True) + monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event") + monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None) + monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None) + + # kjør bare noen få iterasjoner ved å stoppe med shutdown_flag_ref + run_worker(db=FakeDB(), shutdown_flag_ref=lambda: len(intervals) >= 4) + + # verifiser at intervallet øker (5 → 10 → 20 → 40) + assert intervals == [5, 10, 20, 40] + +def test_backoff_on_connection_error(monkeypatch): + intervals = [] + reconnects = [] + + class FakeDB: + def connect(self): + reconnects.append("reconnect") + def close(self): pass + + # Først: fetch_next_task kaster exception + def failing_fetch(db): + raise RuntimeError("DB connection lost") + + monkeypatch.setattr("worker.poller.fetch_next_task", failing_fetch) + + # monkeypatch time.sleep til å fange poll_interval + def fake_sleep(seconds): + intervals.append(seconds) + monkeypatch.setattr(time, "sleep", fake_sleep) + + # dummy funksjoner + monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True) + monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event") + monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None) + monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None) + + # kjør bare noen få iterasjoner + run_worker(db=FakeDB(), shutdown_flag_ref=lambda: len(reconnects) >= 2) + + # verifiser at reconnect ble kalt + assert reconnects == ["reconnect", "reconnect"] + + # verifiser at poll_interval ble reset til 5 etter feil + assert all(interval == 5 for interval in intervals) diff --git a/apps/pyMetadata/tests/test_processor.py b/apps/pyMetadata/tests/test_processor.py new file mode 100644 index 00000000..e7ead244 --- /dev/null +++ b/apps/pyMetadata/tests/test_processor.py @@ -0,0 +1,113 @@ +import asyncio +import uuid +from datetime import datetime +import pytest + +import worker.processor as processor +from models.task import MetadataSearchTask, MetadataSearchData, TaskStatus +from models.metadata import Metadata, Summary, MediaType + +# --- Helpers --- +def make_dummy_metadata(source="mal", title="Foo"): + return Metadata( + title=title, + altTitle=[], + cover="cover.jpg", + banner=None, + type=MediaType.MOVIE, + summary=[Summary(summary="A fake summary", language="en")], + genres=["Drama"], + source=source, + ) + +def make_dummy_task(): + return MetadataSearchTask( + referenceId=uuid.uuid4(), + taskId=uuid.uuid4(), + task="MetadataSearchTask", + status=TaskStatus.PENDING, + data=MetadataSearchData(searchTitles=["Foo"], collection="bar"), + claimed=False, + claimedBy=None, + consumed=False, + lastCheckIn=None, + persistedAt=datetime.now() + ) + +# --- Tests --- + +@pytest.mark.asyncio +async def test_process_task_success(monkeypatch): + # Async stub for run_search + async def good_search(titles): + return [make_dummy_metadata("mal"), make_dummy_metadata("imdb")] + + monkeypatch.setattr(processor, "run_search", good_search) + + # Matchers return fixed scores + class DummyMatcher: + def __init__(self, title, m): pass + def getScore(self): return 50 + monkeypatch.setattr(processor, "SimpleMatcher", DummyMatcher) + monkeypatch.setattr(processor, "PrefixMatcher", DummyMatcher) + monkeypatch.setattr(processor, 
"AdvancedMatcher", DummyMatcher) + + # Fake DB and mark_failed + class FakeDB: pass + called = {} + monkeypatch.setattr(processor, "mark_failed", lambda db, tid: called.setdefault("failed", True)) + + task = make_dummy_task() + event = await processor.process_task(FakeDB(), task) + + assert isinstance(event, processor.MetadataSearchResultEvent) + assert event.status == TaskStatus.COMPLETED + assert event.recommended is not None + assert "failed" not in called + + +@pytest.mark.asyncio +async def test_process_task_no_results(monkeypatch): + async def empty_search(titles): + return [] + monkeypatch.setattr(processor, "run_search", empty_search) + + class FakeDB: pass + called = {} + monkeypatch.setattr(processor, "mark_failed", lambda db, tid: called.setdefault("failed", True)) + + task = make_dummy_task() + event = await processor.process_task(FakeDB(), task) + + assert event is None + assert "failed" in called + + +@pytest.mark.asyncio +async def test_process_task_exception(monkeypatch): + async def bad_search(titles): + raise RuntimeError("boom") + monkeypatch.setattr(processor, "run_search", bad_search) + + class FakeDB: pass + called = {} + monkeypatch.setattr(processor, "mark_failed", lambda db, tid: called.setdefault("failed", True)) + + task = make_dummy_task() + event = await processor.process_task(FakeDB(), task) + + assert event is None + assert "failed" in called + + + +@pytest.mark.asyncio +async def test_choose_recommended_prefers_advanced(monkeypatch): + # Lag tre SearchResult med ulike scorer + m = make_dummy_metadata("mal") + r1 = processor.SearchResult(simpleScore=10, prefixScore=10, advancedScore=90, sourceWeight=1.0, metadata=processor.MetadataResult(source="mal", title="Foo", alternateTitles=None, cover="", bannerImage=None, type=MediaType.MOVIE, summary=[], genres=[])) + r2 = processor.SearchResult(simpleScore=50, prefixScore=50, advancedScore=20, sourceWeight=1.0, metadata=processor.MetadataResult(source="imdb", title="Foo", alternateTitles=None, cover="", bannerImage=None, type=MediaType.MOVIE, summary=[], genres=[])) + r3 = processor.SearchResult(simpleScore=80, prefixScore=80, advancedScore=80, sourceWeight=1.0, metadata=processor.MetadataResult(source="anii", title="Foo", alternateTitles=None, cover="", bannerImage=None, type=MediaType.MOVIE, summary=[], genres=[])) + + recommended = processor.choose_recommended([r1, r2, r3]) + assert recommended is r1 # høyest advancedScore vinner diff --git a/apps/pyMetadata/tests/test_repository.py b/apps/pyMetadata/tests/test_repository.py new file mode 100644 index 00000000..bd597837 --- /dev/null +++ b/apps/pyMetadata/tests/test_repository.py @@ -0,0 +1,135 @@ +import json +import uuid +from datetime import datetime +import pytest +from db import repository +from models.event import MetadataSearchResultEvent, EventMetadata, SearchResult, MetadataResult, Summary +from models.enums import MediaType, TaskStatus +from db.repository import persist_event_and_mark_consumed +from models.task import MetadataSearchData, MetadataSearchTask + +class FakeCursor: + def __init__(self): + self.executed = [] + self.rowcount = 1 + def execute(self, sql, params=None): + self.executed.append((sql, params)) + def close(self): pass + +class FakeConn: + def __init__(self): + self.cursor_obj = FakeCursor() + self.committed = False + self.rolled_back = False + def cursor(self, dictionary=False): + return self.cursor_obj + def commit(self): self.committed = True + def rollback(self): self.rolled_back = True + +class FakeDB: + def __init__(self): + 
self.conn = FakeConn() + + def validate(self): pass + + +def make_event() -> MetadataSearchResultEvent: + return MetadataSearchResultEvent( + referenceId=uuid.uuid4(), + eventId=uuid.uuid4(), + metadata=EventMetadata( + created=datetime.now(), + derivedFromId={uuid.uuid4()} + ), + results=[], + recommended=SearchResult( + simpleScore=1, + prefixScore=2, + advancedScore=3, + sourceWeight=1.0, + metadata=MetadataResult( + source="test", + title="title", + alternateTitles=None, + cover=None, + bannerImage=None, + type=MediaType.SERIE, + summary=[Summary(language="en", description="desc")], + genres=["action"] + ) + ), + status=TaskStatus.PENDING + ) + +def test_persist_event_and_mark_consumed_success(): + db = FakeDB() + event = make_event() + persist_event_and_mark_consumed(db, event, str(event.eventId)) + # verifiser at commit ble kalt + assert db.conn.committed + # verifiser at to SQL statements ble kjørt + assert len(db.conn.cursor_obj.executed) == 2 + + +def make_row(task_id, ref_id): + # Simulerer en DB-rad slik den faktisk ligger i Tasks-tabellen + return { + "REFERENCE_ID": str(ref_id), + "TASK_ID": str(task_id), + "TASK": "MetadataSearchTask", + "STATUS": TaskStatus.PENDING.value, + "DATA": json.dumps({ + "searchTitles": ["Foo", "Bar"], + "collection": "anime" + }), + "CLAIMED": False, + "CLAIMED_BY": None, + "CONSUMED": False, + "LAST_CHECK_IN": None, + "PERSISTED_AT": datetime.now().isoformat() + } + +def test_fetch_next_task_maps_correctly(monkeypatch): + task_id = uuid.uuid4() + ref_id = uuid.uuid4() + fake_row = make_row(task_id, ref_id) + + # Fake DB som returnerer radene + class FakeDB: + def execute(self, query, *args, **kwargs): + return [fake_row] + + # Monkeypatch fetch_next_task til å bruke fake_row direkte + def fake_fetch_next_task(db): + row = fake_row + data = json.loads(row["DATA"]) + return MetadataSearchTask( + referenceId=uuid.UUID(row["REFERENCE_ID"]), + taskId=uuid.UUID(row["TASK_ID"]), + task=row["TASK"], + status=TaskStatus(row["STATUS"]), + data=MetadataSearchData( + searchTitles=data["searchTitles"], + collection=data["collection"] + ), + claimed=row["CLAIMED"], + claimedBy=row["CLAIMED_BY"], + consumed=row["CONSUMED"], + lastCheckIn=row["LAST_CHECK_IN"], + persistedAt=datetime.fromisoformat(row["PERSISTED_AT"]) + ) + + monkeypatch.setattr(repository, "fetch_next_task", fake_fetch_next_task) + + db = FakeDB() + task = repository.fetch_next_task(db) + + # Verifiser at mappingen er korrekt + assert isinstance(task, MetadataSearchTask) + assert task.taskId == task_id + assert task.referenceId == ref_id + assert task.status == TaskStatus.PENDING + assert task.data.collection == "anime" + assert task.data.searchTitles == ["Foo", "Bar"] + assert task.claimed is False + assert task.consumed is False \ No newline at end of file diff --git a/apps/pyMetadata/tests/test_search_runner.py b/apps/pyMetadata/tests/test_search_runner.py new file mode 100644 index 00000000..a35ccb2f --- /dev/null +++ b/apps/pyMetadata/tests/test_search_runner.py @@ -0,0 +1,75 @@ +import asyncio +import pytest +import uuid +from datetime import datetime + +from worker.search_runner import run_search +from models.metadata import Metadata, Summary, MediaType + +# Dummy Metadata factory +def make_dummy_metadata(source: str, title: str = "Dummy Title") -> Metadata: + return Metadata( + title=title, + altTitle=[f"{title} alt"], + cover="http://example.com/cover.jpg", + banner=None, + type=MediaType.MOVIE, # bruk en gyldig enum fra din kode + summary=[Summary(summary="A fake summary", 
language="en")], + genres=["Drama", "Action"], + source=source, + ) + +# Dummy Source that mimics SourceBase +class DummySource: + def __init__(self, titles, result=None, raise_exc=False): + self.titles = titles + self._result = result + self._raise_exc = raise_exc + + async def search(self): + if self._raise_exc: + raise RuntimeError("Search failed") + return self._result + +@pytest.mark.asyncio +async def test_run_search_all_results(monkeypatch): + sources = [ + DummySource(["foo"], make_dummy_metadata("mal")), + DummySource(["foo"], make_dummy_metadata("imdb")), + DummySource(["foo"], make_dummy_metadata("anii")), + ] + monkeypatch.setattr("worker.search_runner.get_all_sources", lambda titles: sources) + + results = await run_search(["foo"]) + assert len(results) == 3 + assert all(isinstance(r, Metadata) for r in results) + assert {r.source for r in results} == {"mal", "imdb", "anii"} + +@pytest.mark.asyncio +async def test_run_search_filters_none(monkeypatch): + sources = [ + DummySource(["foo"], make_dummy_metadata("mal")), + DummySource(["foo"], None), + DummySource(["foo"], make_dummy_metadata("imdb")), + ] + monkeypatch.setattr("worker.search_runner.get_all_sources", lambda titles: sources) + + results = await run_search(["foo"]) + assert len(results) == 2 + assert {r.source for r in results} == {"mal", "imdb"} + +@pytest.mark.asyncio +async def test_run_search_handles_exception(monkeypatch): + sources = [ + DummySource(["foo"], make_dummy_metadata("mal")), + DummySource(["foo"], raise_exc=True), + DummySource(["foo"], make_dummy_metadata("imdb")), + ] + monkeypatch.setattr("worker.search_runner.get_all_sources", lambda titles: sources) + + results = await run_search(["foo"]) + + # Nå skal vi få bare de gyldige Metadata-resultatene + assert all(isinstance(r, Metadata) for r in results) + assert {r.source for r in results} == {"mal", "imdb"} + diff --git a/apps/pyMetadata/utils/__init__.py b/apps/pyMetadata/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/pyMetadata/utils/backoff.py b/apps/pyMetadata/utils/backoff.py new file mode 100644 index 00000000..61ec8d44 --- /dev/null +++ b/apps/pyMetadata/utils/backoff.py @@ -0,0 +1,11 @@ +from utils.logger import logger +import time + +def retry_delays(): + return [5, 15, 30, 60] + +def wait_with_backoff(): + for delay in retry_delays(): + logger.info(f"⏳ Venter {delay} sekunder...") + time.sleep(delay) + yield diff --git a/apps/pyMetadata/utils/logger.py b/apps/pyMetadata/utils/logger.py new file mode 100644 index 00000000..561e0dd6 --- /dev/null +++ b/apps/pyMetadata/utils/logger.py @@ -0,0 +1,32 @@ +import logging +import sys + +# ANSI farger +COLORS = { + "INFO": "\033[94m", # blå + "DEBUG": "\033[92m", # grønn + "WARNING": "\033[93m", # gul + "ERROR": "\033[91m", # rød + "RESET": "\033[0m" +} + +class ColoredFormatter(logging.Formatter): + def format(self, record): + levelname = record.levelname + color = COLORS.get(levelname, COLORS["RESET"]) + prefix = f"[{levelname}]" + message = super().format(record) + return f"{color}{prefix}{COLORS['RESET']} {message}" + +def setup_logger(level=logging.INFO): + handler = logging.StreamHandler(sys.stdout) + formatter = ColoredFormatter("%(asctime)s - %(name)s - %(message)s") + handler.setFormatter(formatter) + + logger = logging.getLogger() + logger.setLevel(level) + logger.handlers = [handler] + return logger + +# Opprett global logger +logger: logging.Logger = setup_logger() diff --git a/apps/pyMetadata/worker/__init__.py b/apps/pyMetadata/worker/__init__.py 
new file mode 100644 index 00000000..e69de29b diff --git a/apps/pyMetadata/worker/poller.py b/apps/pyMetadata/worker/poller.py new file mode 100644 index 00000000..c743eea0 --- /dev/null +++ b/apps/pyMetadata/worker/poller.py @@ -0,0 +1,59 @@ +import asyncio +import time +from typing import Optional +import uuid +from db.database import Database +from db.repository import claim_task, fetch_next_task, mark_failed, persist_event_and_mark_consumed +from models.event import MetadataSearchResultEvent +from worker.processor import process_task +from utils.logger import logger +from config.database_config import DatabaseConfig +from models.task import MetadataSearchTask, Task + +def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int, int]: + """ + Kjør én iterasjon av poller-loopen. + Returnerer (sleep_interval, next_interval). + """ + try: + task: Optional[Task] = fetch_next_task(db) + if task: + if not isinstance(task, MetadataSearchTask): + logger.warning(f"⚠️ Ukjent task-type {type(task)} for {task.taskId}, hopper over.") + return poll_interval, poll_interval + + if not claim_task(db, str(task.taskId), worker_id): + logger.info(f"⏩ Task {task.taskId} ble claimet av en annen worker.") + return poll_interval, poll_interval + + logger.info(f"🔔 Fant task {task.taskId} ({task.task}), claimed by {worker_id}") + try: + event: MetadataSearchResultEvent = asyncio.run(process_task(task)) + if event: + persist_event_and_mark_consumed(db, event, str(task.taskId)) + logger.info(f"✅ Task {task.taskId} ferdig prosessert") + else: + logger.error(f"❌ Task returned nothing! {task.taskId}") + raise RuntimeError("process_task returned nothing!") + except Exception as task_error: + logger.error(f"❌ Task {task.taskId} feilet under prosessering: {task_error}") + mark_failed(db, str(task.taskId)) + return poll_interval, 5 # sov med nåværende, reset til 5 + else: + logger.debug("Ingen nye tasks.") + return poll_interval, min(poll_interval * 2, 60) + except Exception as e: + logger.error(f"⚠️ Feil i worker: {e}") + db.connect() + return poll_interval, 5 + +def run_worker(db: Database, shutdown_flag_ref=lambda: False) -> None: + poll_interval: int = 5 + worker_id = f"worker-{uuid.uuid4()}" + + while not shutdown_flag_ref(): + sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval) + time.sleep(sleep_interval) + + logger.info("👋 run_worker loop avsluttet") + db.close() \ No newline at end of file diff --git a/apps/pyMetadata/worker/processor.py b/apps/pyMetadata/worker/processor.py new file mode 100644 index 00000000..330379f0 --- /dev/null +++ b/apps/pyMetadata/worker/processor.py @@ -0,0 +1,128 @@ +import uuid +from datetime import datetime + +from tabulate import tabulate +from models.metadata import Metadata +from models.task import MetadataSearchTask +from utils.logger import logger +from models.event import ( + EventMetadata, + MetadataSearchResultEvent, + SearchResult, + MetadataResult, + Summary, + TaskStatus, + MediaType, +) +from worker.search_runner import run_search +from algo.SimpleMatcher import SimpleMatcher +from algo.PrefixMatcher import PrefixMatcher +from algo.AdvancedMatcher import AdvancedMatcher +from db.repository import mark_failed + +def source_priority(source: str) -> int: + """Domene-spesifikk kildevekting.""" + priority_map = {'mal': 1, 'anii': 2, 'imdb': 3} + return priority_map.get(source, 4) + + +def score_metadata_against_title(title, m: Metadata) -> SearchResult: + simple = SimpleMatcher(title, m).getScore() + prefix = PrefixMatcher(title, 
m).getScore() + advanced = AdvancedMatcher(title, m).getScore() + + # IMPORTANT: map exactly to bannerImage, not banner. + metadata_result = MetadataResult( + source=m.source, + title=m.title, + alternateTitles=m.altTitle if m.altTitle else [], + cover=getattr(m, "cover", None), + bannerImage=getattr(m, "bannerImage", None), # no renaming + type=m.type, # must already be MediaType + summary=[Summary(language=s.language, description=s.summary) for s in m.summary], + genres=m.genres, + ) + + return SearchResult( + simpleScore=simple, + prefixScore=prefix, + advancedScore=advanced, + sourceWeight=1.0, + metadata=metadata_result + ) + + +def print_summary(results: list[SearchResult], titles: list[str]) -> None: + """Print tabell med scorer for alle kombinasjoner.""" + rows = [] + for r in results: + rows.append(( + # NB: metadata.title er matched title, search_title kan du lagre i SearchResult hvis du vil + r.metadata.title, + r.metadata.source, + r.simpleScore, + r.prefixScore, + r.advancedScore + )) + headers = ["Matched Title", "Source", "Simple", "Prefix", "Advanced"] + print(tabulate(rows, headers=headers)) + + +def choose_recommended(results: list[SearchResult]) -> SearchResult: + """Velg recommended basert på scorer og kildevekting.""" + return max( + results, + key=lambda r: ( + r.advancedScore, + r.simpleScore, + r.prefixScore, + -source_priority(r.metadata.source) + ) + ) + + +async def process_task(db, task: MetadataSearchTask) -> MetadataSearchResultEvent|None: + titles = task.data.searchTitles + logger.info(f"Prosesserer task {task.taskId} med titler: {titles}") + + try: + metadata_list = await run_search(titles) + if not metadata_list: + mark_failed(db, task.taskId) + return + + # 1) Score alle kombinasjoner + results = [] + for m in metadata_list: + for t in titles: + results.append(score_metadata_against_title(t, m)) + + # 2) Print tabell + print_summary(results, titles) + + # 3) Velg recommended + recommended = choose_recommended(results) + + # 4) Bygg event + core_metadata = EventMetadata( + created=datetime.now(), + derivedFromId={task.referenceId, task.taskId} + ) + + event = MetadataSearchResultEvent( + referenceId=task.referenceId, + eventId=uuid.uuid4(), + metadata=core_metadata, + results=results, + recommended=recommended, + status=TaskStatus.COMPLETED + ) + + # 5) Returner + logger.info(f"✅ Task {task.taskId} ferdig prosessert med {len(results)} resultater") + return event + + except Exception as e: + logger.error(f"❌ Task {task.taskId} feilet: {e}") + mark_failed(db, task.taskId) + return None \ No newline at end of file diff --git a/apps/pyMetadata/worker/search_runner.py b/apps/pyMetadata/worker/search_runner.py new file mode 100644 index 00000000..f39e4752 --- /dev/null +++ b/apps/pyMetadata/worker/search_runner.py @@ -0,0 +1,31 @@ +# search_runner.py +import asyncio +from typing import List +from models.metadata import Metadata +from utils.logger import logger +from sources.factory import get_all_sources + +async def run_search(titles: List[str]) -> List[Metadata]: + """ + Kjører alle kilder parallelt på gitt titler. + Returnerer en liste av Metadata fra alle kilder. + Ingen mapping eller scoring gjøres her. 
+ """ + + sources = get_all_sources(titles) + + # Kjør alle kildesøk parallelt + results = await asyncio.gather(*(s.search() for s in sources), return_exceptions=True) + + metadata_results: List[Metadata] = [] + for source, r in zip(sources, results): + if isinstance(r, Exception): + logger.warning( + f"Kilde '{source.__class__.__name__}' feilet under søk " + f"med titler={source.titles}: {r}" + ) + elif r is not None: + metadata_results.append(r) + + logger.info(f"Søk ferdig: {len(metadata_results)} resultater fra {len(sources)} kilder") + return metadata_results diff --git a/apps/pyWatcher/api/health_api.py b/apps/pyWatcher/api/health_api.py new file mode 100644 index 00000000..8e70b572 --- /dev/null +++ b/apps/pyWatcher/api/health_api.py @@ -0,0 +1,19 @@ +from fastapi import FastAPI +from fastapi.responses import JSONResponse + +def create_health_app(observers_ref): + """ + Returnerer en FastAPI-app med /health endpoint. + observers_ref: en funksjon eller lambda som gir listen av observers. + """ + app = FastAPI() + + @app.get("/health") + def health(): + observers = observers_ref() + healthy = all(obs.is_alive() for obs in observers) + status = "healthy" if healthy else "unhealthy" + code = 200 if healthy else 500 + return JSONResponse({"status": status}, status_code=code) + + return app