Wip 4 - Pushing Python

This commit is contained in:
Brage Skjønborg 2026-01-02 01:49:21 +01:00
parent f85fbde89d
commit 8096a979ea
24 changed files with 1074 additions and 0 deletions

View File

View File

@@ -0,0 +1,29 @@
import os
from dataclasses import dataclass
@dataclass
class DatabaseConfig:
address: str
port: int
name: str
username: str
password: str
@staticmethod
def from_env() -> "DatabaseConfig":
return DatabaseConfig(
address=os.environ.get("DATABASE_ADDRESS") or "192.168.2.250",
port=int(os.environ.get("DATABASE_PORT") or "3306"),
name=os.environ.get("DATABASE_NAME_E") or "eventsV3",
username=os.environ.get("DATABASE_USERNAME") or "root",
password=os.environ.get("DATABASE_PASSWORD") or "def",
)
def validate(self) -> None:
        if not self.address:
            raise ValueError("Database address is missing")
        if not self.name:
            raise ValueError("Database name is missing")
        if not self.username:
            raise ValueError("Database username is missing")
        # more validation rules can be added here
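A minimal usage sketch for this config class; the port override below is only an illustrative value, not something this file sets itself:

import os
from config.database_config import DatabaseConfig

os.environ["DATABASE_PORT"] = "3307"  # environment variables override the hard-coded fallbacks
config = DatabaseConfig.from_env()
config.validate()  # raises ValueError if a required field is empty
print(config.address, config.port)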

View File

View File

@@ -0,0 +1,53 @@
from config.database_config import DatabaseConfig
from utils.logger import logger
import mysql.connector
from mysql.connector import Error
from utils.backoff import wait_with_backoff
class Database:
def __init__(self, config: DatabaseConfig):
self.config = config
self.conn = None
    def connect(self):
        """Connect to the DB, retrying with backoff until it succeeds."""
        self.config.validate()
        while True:
            if self._attempt_connect():
                return
            # Wait out the backoff schedule, retrying after each delay,
            # then start the loop over if we are still not connected.
            for _ in wait_with_backoff():
                if self._attempt_connect():
                    return

    def _attempt_connect(self) -> bool:
        """Single connection attempt; returns True when connected."""
        try:
            self.conn = mysql.connector.connect(
                host=self.config.address,
                user=self.config.username,
                password=self.config.password,
                database=self.config.name
            )
            if self.conn.is_connected():
                logger.info("✅ Connected to the database")
                return True
        except Error as e:
            logger.error(f"❌ DB connection failed: {e}")
        return False
    def validate(self):
        """Check that the connection is alive; reconnect if it is not."""
        if not self.conn or not self.conn.is_connected():
            logger.warning("⚠️ Connection lost, reconnecting...")
            self.connect()
    def query(self, sql: str, params=None):
        """Run a query after validating the connection."""
        self.validate()
        cursor = self.conn.cursor(dictionary=True)
        cursor.execute(sql, params or ())
        return cursor.fetchall()

    def close(self) -> None:
        """Close the connection if it is open; run_worker calls this on shutdown."""
        if self.conn and self.conn.is_connected():
            self.conn.close()
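A short usage sketch for the class above; the query and table are only illustrative:

from config.database_config import DatabaseConfig
from db.database import Database

db = Database(DatabaseConfig.from_env())
db.connect()  # blocks until a connection is established
rows = db.query("SELECT * FROM TASKS WHERE STATUS=%s LIMIT 5", ("Pending",))
for row in rows:
    print(row["TASK_ID"], row["STATUS"])
db.close()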

View File

@@ -0,0 +1,108 @@
from datetime import datetime
import json
from typing import Optional
from db.database import Database
from models.enums import TaskStatus
from models.event import MetadataSearchResultEvent
from models.task import Task, MetadataSearchTask, MetadataSearchData
from utils.logger import logger
def fetch_next_task(db: Database) -> Optional[Task]:
db.validate()
cursor = db.conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM TASKS WHERE STATUS='Pending' AND CLAIMED=0 AND CONSUMED=0 "
"ORDER BY PERSISTED_AT ASC LIMIT 1"
)
row = cursor.fetchone()
if not row:
return None
try:
if row["TASK"] == "MetadataSearchTask":
            # the full task JSON lives in the DATA column
return MetadataSearchTask.model_validate_json(row["DATA"])
else:
return Task.model_validate_json(row["DATA"])
except Exception as e:
logger.error(f"❌ Feil ved deserialisering av task {row.get('TASK_ID')}: {e}")
mark_failed(db, row["TASK_ID"])
return None
def mark_failed(db: Database, task_id: str) -> None:
cursor = db.conn.cursor()
cursor.execute(
"UPDATE TASKS SET STATUS='Failed' WHERE TASK_ID=%s",
(task_id,)
)
db.conn.commit()
def claim_task(db: Database, task_id: str, worker_id: str) -> bool:
"""
Marker en task som claimed av en gitt worker.
Returnerer True hvis claim lykkes, False hvis task allerede er claimed.
"""
db.validate()
try:
cursor = db.conn.cursor()
        # Only update the row if the task is not already claimed
cursor.execute(
"""
UPDATE TASKS
SET CLAIMED=1, CLAIMED_BY=%s, LAST_CHECK_IN=%s
WHERE TASK_ID=%s AND CLAIMED=0 AND CONSUMED=0
""",
(worker_id, datetime.now(), task_id)
)
db.conn.commit()
return cursor.rowcount > 0
except Exception as e:
db.conn.rollback()
raise RuntimeError(f"Claim feilet: {e}")
def persist_event_and_mark_consumed(db: Database, event: MetadataSearchResultEvent, task_id: str) -> None:
"""
Persisterer et event og markerer tilhørende task som consumed i én transaksjon.
Ruller tilbake hvis noe feiler.
"""
db.validate()
try:
cursor = db.conn.cursor()
# 1. Insert event
as_data = event.model_dump_json() # Pydantic v2
event_name = event.__class__.__name__
cursor.execute(
"""
INSERT INTO EVENTS (REFERENCE_ID, EVENT_ID, EVENT, DATA, PERSISTED_AT)
VALUES (%s, %s, %s, %s, %s)
""",
(
str(event.referenceId),
str(event.eventId),
event_name,
as_data,
datetime.now().isoformat()
)
)
# 2. Update task status
cursor.execute(
"UPDATE TASKS SET STATUS=%s, CONSUMED=1 WHERE TASK_ID=%s",
(TaskStatus.COMPLETED.value, task_id)
)
        # 3. Commit both operations
db.conn.commit()
    except Exception as e:
        # Roll back if anything fails
        db.conn.rollback()
        raise RuntimeError(f"Transaction failed: {e}")

View File

View File

@@ -0,0 +1,11 @@
from enum import Enum
class TaskStatus(Enum):
PENDING = "Pending"
IN_PROGRESS = "InProgress"
COMPLETED = "Completed"
FAILED = "Failed"
class MediaType(Enum):
MOVIE = "Movie"
SERIE = "Serie"

View File

@@ -0,0 +1,48 @@
# models/event.py
from pydantic import BaseModel
from datetime import datetime
from typing import List, Set
from uuid import UUID
from models.enums import MediaType, TaskStatus
class EventMetadata(BaseModel):
created: datetime
    derivedFromId: Set[UUID]  # exact field name expected in the payload
class Summary(BaseModel):
language: str
description: str
class MetadataResult(BaseModel):
source: str
title: str
alternateTitles: List[str]
cover: str | None
    bannerImage: str | None  # keep camelCase
type: MediaType
summary: List[Summary]
genres: List[str]
class SearchResult(BaseModel):
simpleScore: int
prefixScore: int
advancedScore: int
sourceWeight: float
metadata: MetadataResult
class MetadataSearchResultEvent(BaseModel):
    # Required fields
referenceId: UUID
eventId: UUID
metadata: EventMetadata
    # Custom fields
results: List[SearchResult]
recommended: SearchResult|None
status: TaskStatus

View File

@@ -0,0 +1,39 @@
from dataclasses import dataclass, asdict
from typing import List, Optional
from enum import Enum
from models.enums import MediaType
@dataclass
class Summary:
summary: str
language: str
def to_dict(self):
return {k: v.strip() if isinstance(v, str) else v for k, v in asdict(self).items()}
@dataclass
class Metadata:
title: str
altTitle: List[str]
cover: str
banner: Optional[str]
type: MediaType
summary: List[Summary]
genres: List[str]
source: str
def to_dict(self):
def trim(item):
if isinstance(item, str):
return item.strip()
elif isinstance(item, list):
return [trim(sub_item) for sub_item in item]
elif isinstance(item, Enum):
return item.value
elif hasattr(item, "to_dict"):
return item.to_dict()
return item
return {key: trim(value) for key, value in asdict(self).items()}

View File

@@ -0,0 +1,28 @@
# models/task.py
from pydantic import BaseModel
from uuid import UUID
from datetime import datetime
from typing import List, Optional
from models.enums import TaskStatus
class MetadataSearchData(BaseModel):
searchTitles: List[str]
collection: str
class Task(BaseModel):
referenceId: UUID
taskId: UUID
task: str
status: TaskStatus
    data: dict  # generic payload when the concrete type is unknown
claimed: bool
claimedBy: Optional[str]
consumed: bool
lastCheckIn: Optional[datetime]
persistedAt: datetime
class MetadataSearchTask(Task):
data: MetadataSearchData

View File

@@ -0,0 +1,2 @@
pytest==9.0.2
pytest-asyncio==1.3.0

View File

@@ -0,0 +1,15 @@
from typing import List
from .mal import Mal
from .anii import Anii
from .imdb import Imdb
from .source import SourceBase
def get_all_sources(titles: List[str]) -> List[SourceBase]:
"""
Returnerer alle aktive kilder som implementerer SourceBase.
"""
return [
Mal(titles),
Anii(titles),
Imdb(titles),
]
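The SourceBase module itself is not part of this diff; judging from the factory above and from how worker.search_runner awaits each source, the interface presumably looks roughly like this sketch (names and typing are assumptions):

from abc import ABC, abstractmethod
from typing import List, Optional
from models.metadata import Metadata

class SourceBase(ABC):
    """Assumed base interface: each source holds the search titles and exposes an async search()."""

    def __init__(self, titles: List[str]):
        self.titles = titles

    @abstractmethod
    async def search(self) -> Optional[Metadata]:
        """Return a Metadata hit for the titles, or None when nothing matches."""
        ...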

View File

@@ -0,0 +1,138 @@
from typing import Set
import pytest
from models.event import MetadataSearchResultEvent, EventMetadata
from worker.poller import run_worker, run_iteration
from models.task import MetadataSearchTask, MetadataSearchData
from models.enums import TaskStatus
import uuid
from datetime import datetime
import time
def make_dummy_event():
return MetadataSearchResultEvent(
referenceId=uuid.uuid4(),
eventId=uuid.uuid4(),
metadata=EventMetadata(
created=datetime.now(),
derivedFromId={uuid.uuid4()}
),
results=[],
        recommended=None,  # Optional[SearchResult]; None is valid here
        status="Completed"  # coerced to TaskStatus.COMPLETED by Pydantic
)
def make_task():
return MetadataSearchTask(
referenceId=uuid.uuid4(),
taskId=uuid.uuid4(),
task="MetadataSearchTask",
status=TaskStatus.PENDING,
data=MetadataSearchData(searchTitles=["foo"], collection="bar"),
claimed=False,
claimedBy=None,
consumed=False,
lastCheckIn=None,
persistedAt=datetime.now()
)
def test_run_worker_processes_one(monkeypatch):
events = []
task = make_task()
class FakeDB:
def connect(self): pass
def close(self): pass
calls = {"n": 0}
def fetch_once(db):
if calls["n"] == 0:
calls["n"] += 1
return task
calls["n"] += 1
return None
monkeypatch.setattr("worker.poller.fetch_next_task", fetch_once)
monkeypatch.setattr("worker.poller.claim_task", lambda *a, **k: True)
    # Important: process_task is async and takes (db, task), so the stub must match
    async def fake_process_task(db, task):
        return make_dummy_event()
monkeypatch.setattr("worker.poller.process_task", fake_process_task)
def persist_stub(db, event, task_id):
events.append("dummy_event")
monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", persist_stub)
monkeypatch.setattr("worker.poller.mark_failed", lambda *a, **k: events.append("failed"))
monkeypatch.setattr("worker.poller.time.sleep", lambda s: None)
run_worker(db=FakeDB(), shutdown_flag_ref=lambda: calls["n"] >= 2)
assert "dummy_event" in events
def test_backoff(monkeypatch):
intervals = []
class FakeDB:
def connect(self): pass
def close(self): pass
    # monkeypatch fetch_next_task to keep returning None
monkeypatch.setattr("worker.poller.fetch_next_task", lambda db: None)
    # monkeypatch time.sleep to capture the poll interval
def fake_sleep(seconds):
intervals.append(seconds)
monkeypatch.setattr(time, "sleep", fake_sleep)
    # monkeypatch claim_task, process_task etc. with dummies
monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True)
monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event")
monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None)
monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None)
    # run only a few iterations by stopping via shutdown_flag_ref
run_worker(db=FakeDB(), shutdown_flag_ref=lambda: len(intervals) >= 4)
    # verify that the interval doubles (5 → 10 → 20 → 40)
assert intervals == [5, 10, 20, 40]
def test_backoff_on_connection_error(monkeypatch):
intervals = []
reconnects = []
class FakeDB:
def connect(self):
reconnects.append("reconnect")
def close(self): pass
    # First: fetch_next_task raises an exception
def failing_fetch(db):
raise RuntimeError("DB connection lost")
monkeypatch.setattr("worker.poller.fetch_next_task", failing_fetch)
    # monkeypatch time.sleep to capture the poll interval
def fake_sleep(seconds):
intervals.append(seconds)
monkeypatch.setattr(time, "sleep", fake_sleep)
    # dummy functions
monkeypatch.setattr("worker.poller.claim_task", lambda db, tid, wid: True)
monkeypatch.setattr("worker.poller.process_task", lambda t: "dummy_event")
monkeypatch.setattr("worker.poller.persist_event_and_mark_consumed", lambda db, e, tid: None)
monkeypatch.setattr("worker.poller.mark_failed", lambda db, tid: None)
    # run only a few iterations
run_worker(db=FakeDB(), shutdown_flag_ref=lambda: len(reconnects) >= 2)
    # verify that reconnect was called
assert reconnects == ["reconnect", "reconnect"]
    # verify that poll_interval was reset to 5 after the failure
assert all(interval == 5 for interval in intervals)

View File

@@ -0,0 +1,113 @@
import asyncio
import uuid
from datetime import datetime
import pytest
import worker.processor as processor
from models.task import MetadataSearchTask, MetadataSearchData, TaskStatus
from models.metadata import Metadata, Summary, MediaType
# --- Helpers ---
def make_dummy_metadata(source="mal", title="Foo"):
return Metadata(
title=title,
altTitle=[],
cover="cover.jpg",
banner=None,
type=MediaType.MOVIE,
summary=[Summary(summary="A fake summary", language="en")],
genres=["Drama"],
source=source,
)
def make_dummy_task():
return MetadataSearchTask(
referenceId=uuid.uuid4(),
taskId=uuid.uuid4(),
task="MetadataSearchTask",
status=TaskStatus.PENDING,
data=MetadataSearchData(searchTitles=["Foo"], collection="bar"),
claimed=False,
claimedBy=None,
consumed=False,
lastCheckIn=None,
persistedAt=datetime.now()
)
# --- Tests ---
@pytest.mark.asyncio
async def test_process_task_success(monkeypatch):
# Async stub for run_search
async def good_search(titles):
return [make_dummy_metadata("mal"), make_dummy_metadata("imdb")]
monkeypatch.setattr(processor, "run_search", good_search)
# Matchers return fixed scores
class DummyMatcher:
def __init__(self, title, m): pass
def getScore(self): return 50
monkeypatch.setattr(processor, "SimpleMatcher", DummyMatcher)
monkeypatch.setattr(processor, "PrefixMatcher", DummyMatcher)
monkeypatch.setattr(processor, "AdvancedMatcher", DummyMatcher)
# Fake DB and mark_failed
class FakeDB: pass
called = {}
monkeypatch.setattr(processor, "mark_failed", lambda db, tid: called.setdefault("failed", True))
task = make_dummy_task()
event = await processor.process_task(FakeDB(), task)
assert isinstance(event, processor.MetadataSearchResultEvent)
assert event.status == TaskStatus.COMPLETED
assert event.recommended is not None
assert "failed" not in called
@pytest.mark.asyncio
async def test_process_task_no_results(monkeypatch):
async def empty_search(titles):
return []
monkeypatch.setattr(processor, "run_search", empty_search)
class FakeDB: pass
called = {}
monkeypatch.setattr(processor, "mark_failed", lambda db, tid: called.setdefault("failed", True))
task = make_dummy_task()
event = await processor.process_task(FakeDB(), task)
assert event is None
assert "failed" in called
@pytest.mark.asyncio
async def test_process_task_exception(monkeypatch):
async def bad_search(titles):
raise RuntimeError("boom")
monkeypatch.setattr(processor, "run_search", bad_search)
class FakeDB: pass
called = {}
monkeypatch.setattr(processor, "mark_failed", lambda db, tid: called.setdefault("failed", True))
task = make_dummy_task()
event = await processor.process_task(FakeDB(), task)
assert event is None
assert "failed" in called
@pytest.mark.asyncio
async def test_choose_recommended_prefers_advanced(monkeypatch):
    # Create three SearchResults with different scores
m = make_dummy_metadata("mal")
    r1 = processor.SearchResult(simpleScore=10, prefixScore=10, advancedScore=90, sourceWeight=1.0, metadata=processor.MetadataResult(source="mal", title="Foo", alternateTitles=[], cover="", bannerImage=None, type=MediaType.MOVIE, summary=[], genres=[]))
    r2 = processor.SearchResult(simpleScore=50, prefixScore=50, advancedScore=20, sourceWeight=1.0, metadata=processor.MetadataResult(source="imdb", title="Foo", alternateTitles=[], cover="", bannerImage=None, type=MediaType.MOVIE, summary=[], genres=[]))
    r3 = processor.SearchResult(simpleScore=80, prefixScore=80, advancedScore=80, sourceWeight=1.0, metadata=processor.MetadataResult(source="anii", title="Foo", alternateTitles=[], cover="", bannerImage=None, type=MediaType.MOVIE, summary=[], genres=[]))
recommended = processor.choose_recommended([r1, r2, r3])
    assert recommended is r1  # highest advancedScore wins

View File

@@ -0,0 +1,135 @@
import json
import uuid
from datetime import datetime
import pytest
from db import repository
from models.event import MetadataSearchResultEvent, EventMetadata, SearchResult, MetadataResult, Summary
from models.enums import MediaType, TaskStatus
from db.repository import persist_event_and_mark_consumed
from models.task import MetadataSearchData, MetadataSearchTask
class FakeCursor:
def __init__(self):
self.executed = []
self.rowcount = 1
def execute(self, sql, params=None):
self.executed.append((sql, params))
def close(self): pass
class FakeConn:
def __init__(self):
self.cursor_obj = FakeCursor()
self.committed = False
self.rolled_back = False
def cursor(self, dictionary=False):
return self.cursor_obj
def commit(self): self.committed = True
def rollback(self): self.rolled_back = True
class FakeDB:
def __init__(self):
self.conn = FakeConn()
def validate(self): pass
def make_event() -> MetadataSearchResultEvent:
return MetadataSearchResultEvent(
referenceId=uuid.uuid4(),
eventId=uuid.uuid4(),
metadata=EventMetadata(
created=datetime.now(),
derivedFromId={uuid.uuid4()}
),
results=[],
recommended=SearchResult(
simpleScore=1,
prefixScore=2,
advancedScore=3,
sourceWeight=1.0,
metadata=MetadataResult(
source="test",
title="title",
                alternateTitles=[],
cover=None,
bannerImage=None,
type=MediaType.SERIE,
summary=[Summary(language="en", description="desc")],
genres=["action"]
)
),
status=TaskStatus.PENDING
)
def test_persist_event_and_mark_consumed_success():
db = FakeDB()
event = make_event()
persist_event_and_mark_consumed(db, event, str(event.eventId))
    # verify that commit was called
assert db.conn.committed
    # verify that two SQL statements were executed
assert len(db.conn.cursor_obj.executed) == 2
def make_row(task_id, ref_id):
    # Simulates a DB row as it actually sits in the TASKS table
return {
"REFERENCE_ID": str(ref_id),
"TASK_ID": str(task_id),
"TASK": "MetadataSearchTask",
"STATUS": TaskStatus.PENDING.value,
"DATA": json.dumps({
"searchTitles": ["Foo", "Bar"],
"collection": "anime"
}),
"CLAIMED": False,
"CLAIMED_BY": None,
"CONSUMED": False,
"LAST_CHECK_IN": None,
"PERSISTED_AT": datetime.now().isoformat()
}
def test_fetch_next_task_maps_correctly(monkeypatch):
task_id = uuid.uuid4()
ref_id = uuid.uuid4()
fake_row = make_row(task_id, ref_id)
    # Fake DB that returns the rows
class FakeDB:
def execute(self, query, *args, **kwargs):
return [fake_row]
    # Monkeypatch fetch_next_task to build the task from fake_row directly
def fake_fetch_next_task(db):
row = fake_row
data = json.loads(row["DATA"])
return MetadataSearchTask(
referenceId=uuid.UUID(row["REFERENCE_ID"]),
taskId=uuid.UUID(row["TASK_ID"]),
task=row["TASK"],
status=TaskStatus(row["STATUS"]),
data=MetadataSearchData(
searchTitles=data["searchTitles"],
collection=data["collection"]
),
claimed=row["CLAIMED"],
claimedBy=row["CLAIMED_BY"],
consumed=row["CONSUMED"],
lastCheckIn=row["LAST_CHECK_IN"],
persistedAt=datetime.fromisoformat(row["PERSISTED_AT"])
)
monkeypatch.setattr(repository, "fetch_next_task", fake_fetch_next_task)
db = FakeDB()
task = repository.fetch_next_task(db)
    # Verify that the mapping is correct
assert isinstance(task, MetadataSearchTask)
assert task.taskId == task_id
assert task.referenceId == ref_id
assert task.status == TaskStatus.PENDING
assert task.data.collection == "anime"
assert task.data.searchTitles == ["Foo", "Bar"]
assert task.claimed is False
assert task.consumed is False

View File

@@ -0,0 +1,75 @@
import asyncio
import pytest
import uuid
from datetime import datetime
from worker.search_runner import run_search
from models.metadata import Metadata, Summary, MediaType
# Dummy Metadata factory
def make_dummy_metadata(source: str, title: str = "Dummy Title") -> Metadata:
return Metadata(
title=title,
altTitle=[f"{title} alt"],
cover="http://example.com/cover.jpg",
banner=None,
        type=MediaType.MOVIE,  # use a valid enum from the codebase
summary=[Summary(summary="A fake summary", language="en")],
genres=["Drama", "Action"],
source=source,
)
# Dummy Source that mimics SourceBase
class DummySource:
def __init__(self, titles, result=None, raise_exc=False):
self.titles = titles
self._result = result
self._raise_exc = raise_exc
async def search(self):
if self._raise_exc:
raise RuntimeError("Search failed")
return self._result
@pytest.mark.asyncio
async def test_run_search_all_results(monkeypatch):
sources = [
DummySource(["foo"], make_dummy_metadata("mal")),
DummySource(["foo"], make_dummy_metadata("imdb")),
DummySource(["foo"], make_dummy_metadata("anii")),
]
monkeypatch.setattr("worker.search_runner.get_all_sources", lambda titles: sources)
results = await run_search(["foo"])
assert len(results) == 3
assert all(isinstance(r, Metadata) for r in results)
assert {r.source for r in results} == {"mal", "imdb", "anii"}
@pytest.mark.asyncio
async def test_run_search_filters_none(monkeypatch):
sources = [
DummySource(["foo"], make_dummy_metadata("mal")),
DummySource(["foo"], None),
DummySource(["foo"], make_dummy_metadata("imdb")),
]
monkeypatch.setattr("worker.search_runner.get_all_sources", lambda titles: sources)
results = await run_search(["foo"])
assert len(results) == 2
assert {r.source for r in results} == {"mal", "imdb"}
@pytest.mark.asyncio
async def test_run_search_handles_exception(monkeypatch):
sources = [
DummySource(["foo"], make_dummy_metadata("mal")),
DummySource(["foo"], raise_exc=True),
DummySource(["foo"], make_dummy_metadata("imdb")),
]
monkeypatch.setattr("worker.search_runner.get_all_sources", lambda titles: sources)
results = await run_search(["foo"])
    # only the valid Metadata results should remain
assert all(isinstance(r, Metadata) for r in results)
assert {r.source for r in results} == {"mal", "imdb"}

View File

View File

@@ -0,0 +1,11 @@
from utils.logger import logger
import time
def retry_delays():
return [5, 15, 30, 60]
def wait_with_backoff():
for delay in retry_delays():
logger.info(f"⏳ Venter {delay} sekunder...")
time.sleep(delay)
yield
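The generator sleeps before each yield, so a caller gets one retry per yield on the 5/15/30/60-second schedule; db/database.py consumes it that way. A trimmed illustration (the success condition is made up):

from utils.backoff import wait_with_backoff

attempt = 0
for _ in wait_with_backoff():  # sleeps 5, 15, 30 and 60 seconds before successive retries
    attempt += 1
    print(f"retry #{attempt}")
    if attempt == 2:  # pretend the operation succeeded; stop retrying
        break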

View File

@@ -0,0 +1,32 @@
import logging
import sys
# ANSI colors
COLORS = {
    "INFO": "\033[94m",     # blue
    "DEBUG": "\033[92m",    # green
    "WARNING": "\033[93m",  # yellow
    "ERROR": "\033[91m",    # red
    "RESET": "\033[0m"
}
class ColoredFormatter(logging.Formatter):
def format(self, record):
levelname = record.levelname
color = COLORS.get(levelname, COLORS["RESET"])
prefix = f"[{levelname}]"
message = super().format(record)
return f"{color}{prefix}{COLORS['RESET']} {message}"
def setup_logger(level=logging.INFO):
handler = logging.StreamHandler(sys.stdout)
formatter = ColoredFormatter("%(asctime)s - %(name)s - %(message)s")
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.setLevel(level)
logger.handlers = [handler]
return logger
# Create the shared global logger
logger: logging.Logger = setup_logger()

View File

View File

@@ -0,0 +1,59 @@
import asyncio
import time
from typing import Optional
import uuid
from db.database import Database
from db.repository import claim_task, fetch_next_task, mark_failed, persist_event_and_mark_consumed
from models.event import MetadataSearchResultEvent
from worker.processor import process_task
from utils.logger import logger
from config.database_config import DatabaseConfig
from models.task import MetadataSearchTask, Task
def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int, int]:
"""
Kjør én iterasjon av poller-loopen.
Returnerer (sleep_interval, next_interval).
"""
try:
task: Optional[Task] = fetch_next_task(db)
if task:
if not isinstance(task, MetadataSearchTask):
logger.warning(f"⚠️ Ukjent task-type {type(task)} for {task.taskId}, hopper over.")
return poll_interval, poll_interval
if not claim_task(db, str(task.taskId), worker_id):
logger.info(f"⏩ Task {task.taskId} ble claimet av en annen worker.")
return poll_interval, poll_interval
logger.info(f"🔔 Fant task {task.taskId} ({task.task}), claimed by {worker_id}")
try:
                event: MetadataSearchResultEvent = asyncio.run(process_task(db, task))
if event:
persist_event_and_mark_consumed(db, event, str(task.taskId))
logger.info(f"✅ Task {task.taskId} ferdig prosessert")
else:
logger.error(f"❌ Task returned nothing! {task.taskId}")
raise RuntimeError("process_task returned nothing!")
except Exception as task_error:
logger.error(f"❌ Task {task.taskId} feilet under prosessering: {task_error}")
mark_failed(db, str(task.taskId))
            return poll_interval, 5  # sleep with the current interval, then reset the poll interval to 5
else:
logger.debug("Ingen nye tasks.")
return poll_interval, min(poll_interval * 2, 60)
except Exception as e:
logger.error(f"⚠️ Feil i worker: {e}")
db.connect()
return poll_interval, 5
def run_worker(db: Database, shutdown_flag_ref=lambda: False) -> None:
poll_interval: int = 5
worker_id = f"worker-{uuid.uuid4()}"
while not shutdown_flag_ref():
sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval)
time.sleep(sleep_interval)
logger.info("👋 run_worker loop avsluttet")
db.close()
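The entrypoint that wires these pieces together is not shown in this commit; a plausible minimal sketch (module name, signal handling and the shutdown flag are assumptions):

# main.py — hypothetical entrypoint, not part of this diff
import signal

from config.database_config import DatabaseConfig
from db.database import Database
from worker.poller import run_worker

shutdown = False

def request_shutdown(signum, frame):
    global shutdown
    shutdown = True

signal.signal(signal.SIGINT, request_shutdown)
signal.signal(signal.SIGTERM, request_shutdown)

db = Database(DatabaseConfig.from_env())
db.connect()
# run_worker polls until shutdown_flag_ref() returns True, then closes the DB.
run_worker(db=db, shutdown_flag_ref=lambda: shutdown)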

View File

@@ -0,0 +1,128 @@
import uuid
from datetime import datetime
from tabulate import tabulate
from models.metadata import Metadata
from models.task import MetadataSearchTask
from utils.logger import logger
from models.event import (
EventMetadata,
MetadataSearchResultEvent,
SearchResult,
MetadataResult,
Summary,
TaskStatus,
MediaType,
)
from worker.search_runner import run_search
from algo.SimpleMatcher import SimpleMatcher
from algo.PrefixMatcher import PrefixMatcher
from algo.AdvancedMatcher import AdvancedMatcher
from db.repository import mark_failed
def source_priority(source: str) -> int:
"""Domene-spesifikk kildevekting."""
priority_map = {'mal': 1, 'anii': 2, 'imdb': 3}
return priority_map.get(source, 4)
def score_metadata_against_title(title, m: Metadata) -> SearchResult:
simple = SimpleMatcher(title, m).getScore()
prefix = PrefixMatcher(title, m).getScore()
advanced = AdvancedMatcher(title, m).getScore()
# IMPORTANT: map exactly to bannerImage, not banner.
metadata_result = MetadataResult(
source=m.source,
title=m.title,
alternateTitles=m.altTitle if m.altTitle else [],
cover=getattr(m, "cover", None),
        bannerImage=m.banner,  # Metadata stores this as "banner"; the event field keeps the name bannerImage
type=m.type, # must already be MediaType
summary=[Summary(language=s.language, description=s.summary) for s in m.summary],
genres=m.genres,
)
return SearchResult(
simpleScore=simple,
prefixScore=prefix,
advancedScore=advanced,
sourceWeight=1.0,
metadata=metadata_result
)
def print_summary(results: list[SearchResult], titles: list[str]) -> None:
"""Print tabell med scorer for alle kombinasjoner."""
rows = []
for r in results:
rows.append((
            # NB: metadata.title is the matched title; the search title could also be stored on SearchResult
r.metadata.title,
r.metadata.source,
r.simpleScore,
r.prefixScore,
r.advancedScore
))
headers = ["Matched Title", "Source", "Simple", "Prefix", "Advanced"]
print(tabulate(rows, headers=headers))
def choose_recommended(results: list[SearchResult]) -> SearchResult:
"""Velg recommended basert på scorer og kildevekting."""
return max(
results,
key=lambda r: (
r.advancedScore,
r.simpleScore,
r.prefixScore,
-source_priority(r.metadata.source)
)
)
async def process_task(db, task: MetadataSearchTask) -> MetadataSearchResultEvent|None:
titles = task.data.searchTitles
logger.info(f"Prosesserer task {task.taskId} med titler: {titles}")
try:
metadata_list = await run_search(titles)
if not metadata_list:
            mark_failed(db, str(task.taskId))
            return None
        # 1) Score every title/metadata combination
results = []
for m in metadata_list:
for t in titles:
results.append(score_metadata_against_title(t, m))
        # 2) Print the score table
print_summary(results, titles)
        # 3) Pick the recommended result
recommended = choose_recommended(results)
        # 4) Build the event
core_metadata = EventMetadata(
created=datetime.now(),
derivedFromId={task.referenceId, task.taskId}
)
event = MetadataSearchResultEvent(
referenceId=task.referenceId,
eventId=uuid.uuid4(),
metadata=core_metadata,
results=results,
recommended=recommended,
status=TaskStatus.COMPLETED
)
        # 5) Return the event
logger.info(f"✅ Task {task.taskId} ferdig prosessert med {len(results)} resultater")
return event
except Exception as e:
logger.error(f"❌ Task {task.taskId} feilet: {e}")
mark_failed(db, task.taskId)
return None

View File

@@ -0,0 +1,31 @@
# search_runner.py
import asyncio
from typing import List
from models.metadata import Metadata
from utils.logger import logger
from sources.factory import get_all_sources
async def run_search(titles: List[str]) -> List[Metadata]:
"""
Kjører alle kilder parallelt gitt titler.
Returnerer en liste av Metadata fra alle kilder.
Ingen mapping eller scoring gjøres her.
"""
sources = get_all_sources(titles)
    # Run all source searches concurrently
results = await asyncio.gather(*(s.search() for s in sources), return_exceptions=True)
metadata_results: List[Metadata] = []
for source, r in zip(sources, results):
if isinstance(r, Exception):
            logger.warning(
                f"Source '{source.__class__.__name__}' failed while searching "
                f"with titles={source.titles}: {r}"
)
elif r is not None:
metadata_results.append(r)
logger.info(f"Søk ferdig: {len(metadata_results)} resultater fra {len(sources)} kilder")
return metadata_results

View File

@@ -0,0 +1,19 @@
from fastapi import FastAPI
from fastapi.responses import JSONResponse
def create_health_app(observers_ref):
"""
Returnerer en FastAPI-app med /health endpoint.
observers_ref: en funksjon eller lambda som gir listen av observers.
"""
app = FastAPI()
@app.get("/health")
def health():
observers = observers_ref()
healthy = all(obs.is_alive() for obs in observers)
status = "healthy" if healthy else "unhealthy"
code = 200 if healthy else 500
return JSONResponse({"status": status}, status_code=code)
return app
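A hedged sketch of serving this app next to the worker; uvicorn, the module path and the port are assumptions, and the thread below is only a stand-in observer (anything with is_alive() works):

import threading
import time

import uvicorn  # assumed ASGI server; not listed in the requirements shown above

from health import create_health_app  # module path is an assumption

# Stand-in observer; in practice this would be the worker thread.
observer = threading.Thread(target=time.sleep, args=(3600,), daemon=True)
observer.start()

app = create_health_app(lambda: [observer])  # /health returns 200 while every observer is alive
uvicorn.run(app, host="0.0.0.0", port=8080)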