Refactor python

This commit is contained in:
bskjon 2024-06-18 18:40:37 +02:00
parent 583c0613a5
commit e46029bedd
8 changed files with 275 additions and 364 deletions

View File

@ -7,15 +7,19 @@ import uuid
import threading
import json
import time
from kafka import KafkaConsumer, KafkaProducer
from shared import ConsumerRecord, MediaEvent, decode_key, decode_value, suppress_ignore, consume_on_key
from fuzzywuzzy import fuzz
from sources.result import DataResult, Metadata
from sources.anii import metadata as AniiMetadata
from sources.imdb import metadata as ImdbMetadata
from sources.mal import metadata as MalMetadata
from sources.cache import ResultCache
from sources.select import UseSource
from algo.AdvancedMatcher import AdvancedMatcher
from algo.SimpleMatcher import SimpleMatcher
from algo.PrefixMatcher import PrefixMatcher
from clazz.KafkaMessageSchema import KafkaMessage, MessageDataWrapper
from clazz.Metadata import Metadata
from kafka import KafkaConsumer, KafkaProducer
from sources.anii import Anii
from sources.imdb import Imdb
from sources.mal import Mal
# Konfigurer Kafka-forbindelsen
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") or "127.0.0.1:9092"
@ -23,26 +27,6 @@ consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"MetadataConsumer"
kafka_topic = os.environ.get("KAFKA_TOPIC") or "mediaEvents"
suppress_ignore: List[str] = [
"event:media-process:started",
"event:request-process:started",
"event::save",
"event:media-process:completed",
"event:work-encode:created",
"event:work-extract:created",
"event:work-convert:created",
"event:work-encode:performed",
"event:work-extract:performed",
"event:work-convert:performed",
"event:media-read-out-cover:performed",
"event:work-download-cover:performed",
"event:media-read-out-name-and-type:performed",
"event:media-parse-stream:performed",
"event:media-extract-parameter:created",
"event:media-encode-parameter:created",
"event:media-metadata-search:performed"
]
# Konfigurer logging
logging.basicConfig(
level=logging.INFO,
@ -53,28 +37,6 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
class ProducerDataValueSchema:
def __init__(self, referenceId, data: DataResult):
self.referenceId = referenceId
self.data = data
def to_dict(self):
return {
'referenceId': self.referenceId,
'eventId': str(uuid.uuid4()),
'data': self.data.to_dict() if self.data else None
}
def to_json(self):
data_dict = self.to_dict()
return json.dumps(data_dict)
def decode_key(key_bytes):
return key_bytes.decode('utf-8') if key_bytes else None
def decode_value(value_bytes):
return json.loads(value_bytes.decode('utf-8')) if value_bytes else None
# Kafka consumer-klasse
class KafkaConsumerThread(threading.Thread):
@ -104,21 +66,21 @@ class KafkaConsumerThread(threading.Thread):
while not self.shutdown.is_set():
for message in consumer:
for cm in consumer:
if self.shutdown.is_set():
break
message: ConsumerRecord = ConsumerRecord(cm)
# Sjekk om meldingen har målnøkkelen
if message.key == "request:metadata:obtain" or message.key == "event:media-read-base-info:performed":
if message.key in consume_on_key:
logger.info("==> Incoming message: %s \n%s", message.key, message.value)
# Opprett en ny tråd for å håndtere meldingen
handler_thread = MessageHandlerThread(message)
handler_thread.start()
else:
if (message.key not in suppress_ignore):
logger.info("Ignored message: key=%s", message.key)
logger.debug("Ignored message: key=%s", message.key)
# Introduce a small sleep to reduce CPU usage
time.sleep(1)
if consumer is not None:
@ -132,89 +94,86 @@ class KafkaConsumerThread(threading.Thread):
# Kafka message handler-klasse
class MessageHandlerThread(threading.Thread):
def __init__(self, message):
producerMessageKey = "event:media-metadata-search:performed"
def __init__(self, message: ConsumerRecord):
super().__init__()
self.message = message
def run(self):
logger.info("Handling message: key=%s, value=%s", self.message.key, self.message.value)
if 'data' not in self.message.value:
logger.error("data is not present in message!")
messageData = self.message.value["data"]
# Sjekk om meldingen har en Status
if 'status' in messageData:
status_type = messageData['status']
# Sjekk om statusen er COMPLETED
if status_type == 'COMPLETED':
baseName = messageData["sanitizedName"]
title = messageData["title"]
mediaEvent = MediaEvent(message=self.message)
eventId = self.message.value["eventId"]
logger.info("Searching for %s", title)
result = self.get_metadata(title, baseName, eventId)
if (result is None):
logger.info("No result for %s or %s", title, baseName)
producerMessage = self.compose_message(referenceId=self.message.value["referenceId"], result=result)
# Serialiser resultatet til JSON som strenger
result_json = json.dumps(producerMessage.to_dict())
# Send resultatet tilbake ved hjelp av Kafka-producer
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
key_serializer=lambda k: k.encode('utf-8') if isinstance(k, str) else None,
value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else None
)
producer.send(kafka_topic, key="event:media-metadata-search:performed", value=result_json)
producer.close()
else:
logger.info("Message status is not of 'COMPLETED', %s", self.message.value)
else:
logger.warn("No status present for %s", self.message.value)
def get_metadata(self, name: str, baseName: str, evnetId: str) -> Optional[DataResult]:
result = None
logger.info("Checking cache")
titleCache = ResultCache.get(name)
if (titleCache is None):
titleCache = UseSource(title=name, eventId=evnetId).select_result()
if titleCache is not None:
logger.info("Storing response for %s in in-memory cache", name)
ResultCache.add(title=name, result=titleCache)
else:
logger.info("Cache hit for %s", name)
baseNameCache = ResultCache.get(baseName)
if (baseNameCache is None):
baseNameCache = UseSource(title=baseName, eventId=evnetId).select_result()
if baseNameCache is not None:
logger.info("Storing response for %s in in-memory cache", baseName)
ResultCache.add(title=baseName, result=baseNameCache)
else:
logger.info("Cache hit for %s", baseName)
if titleCache is not None and baseNameCache is not None:
if (titleCache.data.type.lower() == "movie" or baseNameCache.data.type.lower() == "movie"):
result = baseNameCache
else:
result = titleCache
elif titleCache is not None:
result = titleCache
elif baseNameCache is not None:
result = baseNameCache
return result
if mediaEvent.data is None:
logger.error("No data present for %s", self.message.value)
return
if mediaEvent.isConsumable() == False:
logger.info("Message status is not of 'COMPLETED', %s", self.message.value)
return
logger.info("Processing record: key=%s, value=%s", self.message.key, self.message.value)
def compose_message(self, referenceId: str, result: DataResult) -> ProducerDataValueSchema:
return ProducerDataValueSchema(
referenceId=referenceId,
data=result
searchableTitles: List[str] = mediaEvent.data["searchTitles"]
joinedTitles = ", ".join(searchableTitles)
logger.info("Searching for %s", joinedTitles)
result: Metadata | None = self.__getMetadata(searchableTitles)
result_message: str | None = None
if (result is None):
result_message = f"No result for {joinedTitles}"
logger.info(result_message)
messageData = MessageDataWrapper(
status = "ERROR" if result is None else "COMPLETED",
message = result_message,
data = result,
derivedFromEventId = mediaEvent.eventId
)
producerMessage = KafkaMessage(referenceId=mediaEvent.referenceId, data=messageData).to_json()
# Serialiser resultatet til JSON som strenger
result_json = json.dumps(producerMessage)
logger.info("<== Outgoing message: %s \n%s", self.producerMessageKey, result_json)
# Send resultatet tilbake ved hjelp av Kafka-producer
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
key_serializer=lambda k: k.encode('utf-8') if isinstance(k, str) else None,
value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else None
)
producer.send(kafka_topic, key=self.producerMessageKey, value=result_json)
producer.close()
def __getMetadata(self, titles: List[str]) -> Metadata | None:
mal = Mal(titles=titles)
anii = Anii(titles=titles)
imdb = Imdb(titles=titles)
results: List[Metadata] = [
mal.search(),
anii.search(),
imdb.search()
]
filtered_results = [result for result in results if result is not None]
logger.info("Simple matcher")
simpleSelector = SimpleMatcher(titles=titles, metadata=filtered_results).getBestMatch()
logger.info("Advanced matcher")
advancedSelector = AdvancedMatcher(titles=titles, metadata=filtered_results).getBestMatch()
logger.info("Prefrix matcher")
prefixSelector = PrefixMatcher(titles=titles, metadata=filtered_results).getBestMatch()
if simpleSelector is not None:
return simpleSelector
if advancedSelector is not None:
return advancedSelector
if prefixSelector is not None:
return prefixSelector
return None
# Global variabel for å indikere om applikasjonen skal avsluttes
should_stop = False

View File

@ -5,4 +5,5 @@ fuzzywuzzy>=0.18.0
requests>=2.31.0
python-Levenshtein>=0.21.1
mal-api>=0.5.3
Unidecode>=1.3.8
Unidecode>=1.3.8
tabulate>=0.9.0

View File

@ -1,40 +1,83 @@
import logging, sys
import hashlib
from typing import List
from clazz.Metadata import Metadata, Summary
from .source import SourceBase
from AnilistPython import Anilist
from .result import Metadata, DataResult, Summary
class metadata():
name: str = None
anilist = Anilist()
log = logging.getLogger(__name__)
def __init__(self, name) -> None:
self.name = name
def lookup(self) -> DataResult:
""""""
class Anii(SourceBase):
def __init__(self, titles: List[str]) -> None:
super().__init__(titles)
def search(self) -> Metadata | None:
idToTitle: dict[str, str] = {}
results: dict[str, str] = {}
try:
result = self.anilist.get_anime(self.name)
for title in self.titles:
try:
result = Anilist().get_anime(title)
if result:
_title = result.get("name_english", None)
givenId = self.generate_id(_title)
idToTitle[givenId] = _title
results[givenId] = result
except IndexError as notFound:
pass
except Exception as e:
log.exception(e)
except IndexError as notFound:
self.logNoMatch("Anii", titles=self.titles)
pass
except Exception as e:
log.exception(e)
meta = Metadata(
if not idToTitle or not results:
self.logNoMatch("Anii", titles=self.titles)
return None
best_match_id, best_match_title = self.findBestMatchAcrossTitles(idToTitle, self.titles)
return self.__getMetadata(results[best_match_id])
def queryIds(self, title: str) -> dict[str, str]:
return super().queryIds(title)
def __getMetadata(self, result: dict) -> Metadata:
try:
summary = result.get("desc", None)
return Metadata(
title = result.get("name_english", None),
altTitle = [result.get("name_romaji", [])],
cover = result.get("cover_image", None),
summary = [
banner = None,
summary = [] if summary is None else [
Summary(
language = "eng",
summary = result.get("desc", None)
summary = summary
)
],
type = 'movie' if result.get('airing_format', '').lower() == 'movie' else 'serie',
type = self.getMediaType(result.get('airing_format', '')),
genres = result.get('genres', []),
source="anii",
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
return DataResult(status="COMPLETED", message= None, data= None)
return DataResult(status="COMPLETED", message= None, data=meta)
except IndexError as ingore:
return DataResult(status="COMPLETED", message=f"No result for {self.name}")
except Exception as e:
return DataResult(status="ERROR", message=str(e))
log.exception(e)
return None
def generate_id(self, text: str):
return hashlib.md5(text.encode()).hexdigest()
def getMediaType(self, type: str) -> str:
return 'movie' if type.lower() == 'movie' else 'serie'

View File

@ -1,14 +0,0 @@
from typing import Optional
from .result import DataResult
class ResultCache:
_cache = {}
@classmethod
def add(cls, title: str, result: DataResult):
cls._cache[title] = result
@classmethod
def get(cls, title) -> Optional[DataResult]:
return cls._cache.get(title)

View File

@ -1,38 +1,70 @@
import imdb
from .result import Metadata, DataResult, Summary
import logging
from imdb import Cinemagoer
from imdb.Movie import Movie
class metadata():
name: str = None
imdb = imdb.Cinemagoer()
from typing import List
def __init__(self, name) -> None:
self.name = name
from clazz.Metadata import Metadata, Summary
from .source import SourceBase
log = logging.getLogger(__name__)
class Imdb(SourceBase):
def __init__(self, titles: List[str]) -> None:
super().__init__(titles)
def search(self) -> Metadata | None:
idToTitle: dict[str, str] = {}
for title in self.titles:
receivedIds = self.queryIds(title)
for id, title in receivedIds.items():
idToTitle[id] = title
if not idToTitle:
self.logNoMatch("Imdb", titles=self.titles)
return None
best_match_id, best_match_title = self.findBestMatchAcrossTitles(idToTitle, self.titles)
return self.__getMetadata(best_match_id)
def lookup(self) -> DataResult:
""""""
def queryIds(self, title: str) -> dict[str, str]:
idToTitle: dict[str, str] = {}
try:
query = self.imdb.search_movie(self.name)
imdbId = query[0].movieID
result = self.imdb.get_movie(imdbId)
meta = Metadata(
search = Cinemagoer().search_movie(title)
cappedResult: List[Movie] = search[:5]
usable: List[Movie] = [found for found in cappedResult if self.isMatchOrPartial("Imdb", title, found._getitem("title"))]
for item in usable:
idToTitle[item.movieID] = item._getitem("title")
except Exception as e:
log.exception(e)
return idToTitle
def __getMetadata(self, id: str) -> Metadata | None:
try:
result = Cinemagoer().get_movie(id)
summary = result.get("plot outline", None)
return Metadata(
title = result.get("title", None),
altTitle = [result.get("localized title", [])],
cover = result.get("cover url", None),
summary = [
banner = None,
summary = [] if summary is None else [
Summary(
language = "eng",
summary = result.get("plot outline", None)
summary = summary
)
],
type = 'movie' if result.get('kind', '').lower() == 'movie' else 'serie',
type = self.getMediaType(result.get('kind', '')),
genres = result.get('genres', []),
source="imdb",
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
return DataResult(status="COMPLETED", message= None, data= None)
return DataResult(status="COMPLETED", message= None, data= meta)
except Exception as e:
return DataResult(status="ERROR", data=None, message=str(e))
log.exception(e)
return None
def getMediaType(self, type: str) -> str:
return 'movie' if type.lower() == 'movie' else 'serie'

View File

@ -1,36 +1,71 @@
from mal import *
from .result import Metadata, DataResult, Summary
import logging, sys
from typing import List
class metadata():
name: str = None
from clazz.Metadata import Metadata, Summary
from .source import SourceBase
from mal import Anime, AnimeSearch, AnimeSearchResult
log = logging.getLogger(__name__)
class Mal(SourceBase):
""""""
def __init__(self, titles: List[str]) -> None:
super().__init__(titles)
def search(self) -> Metadata | None:
idToTitle: dict[str, str] = {}
for title in self.titles:
receivedIds = self.queryIds(title)
for id, title in receivedIds.items():
idToTitle[id] = title
if not idToTitle:
self.logNoMatch("MAL", titles=self.titles)
return None
best_match_id, best_match_title = self.findBestMatchAcrossTitles(idToTitle, self.titles)
return self.__getMetadata(best_match_id)
def __init__(self, name: str) -> None:
self.name = name
def queryIds(self, title: str) -> dict[str, str]:
idToTitle: dict[str, str] = {}
def lookup(self) -> DataResult:
try:
search = AnimeSearch(self.name)
if (len(search.results) == 0):
return DataResult(status="SKIPPED", message="No results")
anime = Anime(search.results[0].mal_id)
meta = Metadata(
title = anime.title,
altTitle = [altName for altName in [anime.title_english, *anime.title_synonyms] if altName],
cover = anime.image_url,
summary = [
Summary(
language = "eng",
summary = anime.synopsis
)
],
type = 'movie' if anime.type.lower() == 'movie' else 'serie',
genres = anime.genres,
source="mal",
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
return DataResult(status="COMPLETED", message = None, data = None)
return DataResult(status = "COMPLETED", message = None, data = meta)
search = AnimeSearch(title)
cappedResult: List[AnimeSearchResult] = search.results[:5]
usable: List[AnimeSearchResult] = [found for found in cappedResult if self.isMatchOrPartial("MAL", title, found.title)]
for item in usable:
log.info(f"malId: {item.mal_id} to {item.title}")
idToTitle[item.mal_id] = item.title
except Exception as e:
return DataResult(status="ERROR", message=str(e))
log.exception(e)
return idToTitle
def __getMetadata(self, id: str):
try:
anime = Anime(id)
return Metadata(
title = anime.title,
altTitle = [altName for altName in [anime.title_english, *anime.title_synonyms] if altName],
cover = anime.image_url,
banner = None,
summary = [] if anime.synopsis is None else [
Summary(
language = "eng",
summary = anime.synopsis
)
],
type = self.getMediaType(anime.type),
genres = anime.genres,
source="mal",
)
except Exception as e:
log.exception(e)
return None
def getMediaType(self, type: str) -> str:
return 'movie' if type.lower() == 'movie' else 'serie'

View File

@ -1,33 +0,0 @@
from typing import List, Optional
from dataclasses import dataclass, asdict
@dataclass
class Summary:
summary: str
language: str
def to_dict(self):
return asdict(self)
@dataclass
class Metadata:
title: str
altTitle: List[str]
cover: str
type: str # Serie/Movie
summary: List[Summary]
genres: List[str]
source: str
usedTitle: str
def to_dict(self):
return asdict(self)
@dataclass
class DataResult:
status: str # COMPLETED / ERROR
message: str | None = None
data: Metadata = None
def to_dict(self):
return asdict(self)

View File

@ -1,112 +0,0 @@
import logging
from dataclasses import dataclass, asdict
from typing import List, Optional
from .result import Metadata, DataResult
from .anii import metadata as AniiMetadata
from .imdb import metadata as ImdbMetadata
from .mal import metadata as MalMetadata
from fuzzywuzzy import fuzz
from unidecode import unidecode
import json
import re
logger = logging.getLogger(__name__)
@dataclass
class WeightedData:
result: DataResult
weight: int = 1
def to_dict(self):
return asdict(self)
@dataclass
class DataAndScore:
result: DataResult = None
score: int = 0
def to_dict(self):
return asdict(self)
class UseSource():
title: str
eventId: str
def __init__(self, title, eventId) -> None:
self.title = title
self.eventId = eventId
def stripped(self, input_string) -> str:
unitext = unidecode(input_string)
unitext = re.sub(r'[^a-zA-Z0-9\s]', ' ', unitext)
unitext = re.sub(r'\s{2,}', ' ', unitext)
return unitext
def __perform_search(self, title)-> List[WeightedData]:
anii = AniiMetadata(title).lookup()
imdb = ImdbMetadata(title).lookup()
mal = MalMetadata(title).lookup()
result: List[WeightedData] = []
if (anii is not None) and (anii.status == "COMPLETED" and anii.data is not None):
result.append(WeightedData(anii, 1.2))
if (imdb is not None) and (imdb.status == "COMPLETED" and imdb.data is not None):
imdb_weight = 1
if (imdb.data.title == title or self.stripped(imdb.data.title) == self.stripped(title)):
imdb_weight = 100
result.append(WeightedData(imdb, imdb_weight))
if (mal is not None) and (mal.status == "COMPLETED" and mal.data is not None):
result.append(WeightedData(mal, 1.8))
return result
def __calculate_score(self, title: str, weightData: List[WeightedData]) -> List[DataAndScore]:
""""""
result: List[WeightedData] = []
for wd in weightData:
highScore = fuzz.ratio(self.stripped(title.lower()), self.stripped(wd.result.data.title.lower()))
logger.info(f"[H:{highScore}]\t{self.stripped(wd.result.data.title.lower())} => {self.stripped(title.lower())}")
for name in wd.result.data.altTitle:
altScore = fuzz.ratio(self.stripped(title.lower()), self.stripped(name.lower()))
if (altScore > highScore):
highScore = altScore
logger.info(f"[A:{highScore}]\t{self.stripped(wd.result.data.title.lower())} => {self.stripped(title.lower())}")
givenScore = highScore * wd.weight
logger.info(f"[G:{givenScore}]\t{self.stripped(wd.result.data.title.lower())} => {self.stripped(title.lower())} Weight:{wd.weight}")
result.append(DataAndScore(wd.result, givenScore))
return result
def select_result(self) -> Optional[DataResult]:
""""""
result = self.__perform_search(title=self.title)
scoredResult = self.__calculate_score(title=self.title, weightData=result)
scoredResult.sort(key=lambda x: x.score, reverse=True)
selected: DataResult|None = scoredResult[0].result if len(scoredResult) > 0 else None
jsr = ""
try:
jsr = json.dumps([obj.to_dict() for obj in scoredResult], indent=4)
with open(f"./logs/{self.eventId}-{self.title}.json", "w", encoding="utf-8") as f:
f.write(jsr)
except Exception as e:
logger.info("Couldn't dump log..")
logger.error(e)
logger.info(jsr)
try:
titles: List[str] = []
for wd in scoredResult:
titles.append(wd.result.data.title)
titles.extend(wd.result.data.altTitle)
joinedTitles = "\n\t" + "\n\t".join(titles)
logger.info(f"\nName: {self.title} \n \nFound: {joinedTitles} \nTitle selected: \n\t{selected.data.title} \n")
except Exception as e:
logger.error(e)
pass
# Return the result with the highest score (most likely result)
return selected