From 72b2d30fab3cbf1245b08efb50a23749fbb74fd5 Mon Sep 17 00:00:00 2001
From: bskjon
Date: Sat, 13 Jul 2024 17:44:22 +0200
Subject: [PATCH] v3 - db polling for pyMetadata
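Replace the Kafka consumer/producer pipeline with database polling. An
EventsPullerThread periodically selects 'event:media-read-base-info:performed'
events that have no corresponding 'event:media-metadata-search:performed'
event for the same referenceId, hands each row to a MessageHandlerThread,
and the handler writes its result back to the events table instead of
producing to a Kafka topic.

The events table itself is not created by this patch. A minimal sketch of
the schema the queries assume (the column names are taken from the code;
the types are assumptions):

    CREATE TABLE events (
        referenceId VARCHAR(64)  NOT NULL,
        eventId     VARCHAR(64)  NOT NULL,
        event       VARCHAR(128) NOT NULL,
        data        JSON         NOT NULL
    );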
---
 apps/pyMetadata/app.py           | 185 ++++++++++++++++++-------------
 apps/pyMetadata/clazz/shared.py  | 123 +++++++++-----------
 apps/pyMetadata/requirements.txt |   4 +-
 3 files changed, 164 insertions(+), 148 deletions(-)

diff --git a/apps/pyMetadata/app.py b/apps/pyMetadata/app.py
index 9a52154f..817a0388 100644
--- a/apps/pyMetadata/app.py
+++ b/apps/pyMetadata/app.py
@@ -8,11 +8,13 @@
 import threading
 import json
 import time
 from fuzzywuzzy import fuzz
+import mysql.connector
+import uuid
+from datetime import datetime
 from algo.AdvancedMatcher import AdvancedMatcher
 from algo.SimpleMatcher import SimpleMatcher
 from algo.PrefixMatcher import PrefixMatcher
-from clazz.shared import ConsumerRecord, MediaEvent, decode_key, decode_value, suppress_ignore, consume_on_key
+from clazz.shared import EventMetadata, MediaEvent, event_data_to_json, json_to_media_event
-from clazz.KafkaMessageSchema import KafkaMessage, MessageDataWrapper
 from clazz.Metadata import Metadata
-from kafka import KafkaConsumer, KafkaProducer
@@ -21,11 +23,22 @@ from sources.anii import Anii
 from sources.imdb import Imdb
 from sources.mal import Mal
+
+
 # Configure the Kafka connection
 bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") or "127.0.0.1:9092"
 consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"MetadataConsumer"
 kafka_topic = os.environ.get("KAFKA_TOPIC") or "mediaEvents"
+# Configure the events database connection
+events_server_address = os.environ.get("DATABASE_ADDRESS") or "127.0.0.1"
+events_server_port = os.environ.get("DATABASE_PORT") or "3306"
+events_server_database_name = os.environ.get("DATABASE_NAME_E") or "events"
+events_server_username = os.environ.get("DATABASE_USERNAME") or "root"
+events_server_password = os.environ.get("DATABASE_PASSWORD") or "root"
+
+
 
 # Configure logging
 logging.basicConfig(
@@ -38,54 +51,50 @@ logging.basicConfig(
 logger = logging.getLogger(__name__)
 
 
-# Kafka consumer class
-class KafkaConsumerThread(threading.Thread):
-    def __init__(self, bootstrap_servers, topic, consumer_group):
+# Events poller class (replaces the Kafka consumer)
+class EventsPullerThread(threading.Thread):
+    def __init__(self):
         super().__init__()
-        self.bootstrap_servers = bootstrap_servers
-        self.consumer_group = consumer_group
-        self.topic = topic
         self.shutdown = threading.Event()
 
-    def run(self):
-        consumer = None
-        try:
-            consumer = KafkaConsumer(
-                self.topic,
-                bootstrap_servers=self.bootstrap_servers,
-                group_id=self.consumer_group,
-                key_deserializer=lambda x: decode_key(x),
-                value_deserializer=lambda x: decode_value(x)
-            )
-            logger.info("Kafka Consumer started")
-
-        except:
-            logger.exception("Kafka Consumer failed to start")
-            self.stop()
-            #sys.exit(1)
-
-
+    def run(self) -> None:
         while not self.shutdown.is_set():
-            for cm in consumer:
-                if self.shutdown.is_set():
-                    break
-                message: ConsumerRecord = ConsumerRecord(cm)
-
-
-                # Check whether the message carries a target key
-                if message.key in consume_on_key:
-                    logger.info("==> Incoming message: %s \n%s", message.key, message.value)
-                    # Create a new thread to handle the message
-                    handler_thread = MessageHandlerThread(message)
+            connection = None
+            cursor = None
+            try:
+                connection = mysql.connector.connect(
+                    host=events_server_address,
+                    port=events_server_port,
+                    database=events_server_database_name,
+                    user=events_server_username,
+                    password=events_server_password
+                )
+                cursor = connection.cursor(dictionary=True)
+                # Anti-join: pick up base-info events that do not yet have a
+                # corresponding metadata-search event for the same referenceId
+                cursor.execute("""
+                    SELECT e1.*
+                    FROM events e1
+                    LEFT JOIN events e2
+                        ON e1.referenceId = e2.referenceId
+                        AND e2.event = 'event:media-metadata-search:performed'
+                    WHERE e1.event = 'event:media-read-base-info:performed'
+                        AND e2.referenceId IS NULL;
+                """)
+                for row in cursor.fetchall():
+                    if self.shutdown.is_set():
+                        break
+                    handler_thread = MessageHandlerThread(row)
                     handler_thread.start()
-                else:
-                    if (message.key not in suppress_ignore):
-                        logger.debug("Ignored message: key=%s", message.key)
-            # Introduce a small sleep to reduce CPU usage
-            time.sleep(1)
-        if consumer is not None:
-            consumer.close()
-        logger.info("Kafka Consumer stopped")
+
+                # Sleep between polling cycles to reduce CPU and database load
+                time.sleep(5)
+            except mysql.connector.Error as err:
+                logger.error("Database error: %s", err)
+            finally:
+                if cursor:
+                    cursor.close()
+                if connection:
+                    connection.close()
 
     def stop(self):
         self.shutdown.set()
@@ -94,29 +103,25 @@ class KafkaConsumerThread(threading.Thread):
 
 
 # Kafka message handler class
 class MessageHandlerThread(threading.Thread):
-    producerMessageKey = "event:media-metadata-search:performed"
-    def __init__(self, message: ConsumerRecord):
+    mediaEvent: MediaEvent | None = None
+    def __init__(self, row):
         super().__init__()
-        self.message = message
+        self.mediaEvent = json_to_media_event(row['data'])
 
     def run(self):
-
-        mediaEvent = MediaEvent(message=self.message)
-
-        if mediaEvent.data is None:
-            logger.error("No data present for %s", self.message.value)
-            return
-        if mediaEvent.isConsumable() == False:
-            logger.info("Message status is not of 'COMPLETED', %s", self.message.value)
+        if self.mediaEvent is None:
+            logger.error("Event does not contain any data")
             return
+
+        event: MediaEvent = self.mediaEvent
 
-        logger.info("Processing record: key=%s, value=%s", self.message.key, self.message.value)
+        logger.info("Processing event: event=%s, value=%s", event.eventType, event)
 
-        searchableTitles: List[str] = mediaEvent.data["searchTitles"]
+        searchableTitles: List[str] = event.data.searchTitles
         searchableTitles.extend([
-            mediaEvent.data["title"],
-            mediaEvent.data["sanitizedName"]
+            event.data.title,
+            event.data.sanitizedName
         ])
@@ -129,26 +134,22 @@
             result_message = f"No result for {joinedTitles}"
             logger.info(result_message)
 
-        messageData = MessageDataWrapper(
-            status = "ERROR" if result is None else "COMPLETED",
-            message = result_message,
-            data = result,
-            derivedFromEventId = mediaEvent.eventId
+
+        producedEvent = MediaEvent(
+            metadata=EventMetadata(
+                referenceId=event.metadata.referenceId,
+                eventId=str(uuid.uuid4()),
+                derivedFromEventId=event.metadata.eventId,
+                status="Failed" if result is None else "Success",
+                created=datetime.now()
+            ),
+            data=result,
+            eventType="event:media-metadata-search:performed"
         )
-        producerMessage = KafkaMessage(referenceId=mediaEvent.referenceId, data=messageData).to_json()
-
-        logger.info("<== Outgoing message: %s \n%s", self.producerMessageKey, producerMessage)
-
-        # Send the result back using the Kafka producer
-        producer = KafkaProducer(
-            bootstrap_servers=bootstrap_servers,
-            key_serializer=lambda k: k.encode('utf-8') if isinstance(k, str) else None,
-            value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else None
-        )
-        producer.send(kafka_topic, key=self.producerMessageKey, value=producerMessage)
-        producer.close()
-
+
+        logger.info("<== Outgoing message: %s \n%s", producedEvent.eventType, event_data_to_json(producedEvent))
+
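+        # Persist the result; downstream consumers poll the events table,
+        # which replaces the former Kafka producer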
+        self.insert_into_database(producedEvent)
@@ -176,6 +177,34 @@
         if prefixSelector is not None:
             return prefixSelector
         return None
+
+    def insert_into_database(self, event: MediaEvent):
+        connection = None
+        cursor = None
+        try:
+            connection = mysql.connector.connect(
+                host=events_server_address,
+                port=events_server_port,
+                database=events_server_database_name,
+                user=events_server_username,
+                password=events_server_password
+            )
+            cursor = connection.cursor()
+
+            query = """
+                INSERT INTO events (referenceId, eventId, event, data)
+                VALUES (%s, %s, %s, %s)
+            """
+            cursor.execute(query, (
+                event.metadata.referenceId,
+                event.metadata.eventId,
+                event.eventType,
+                event_data_to_json(event)
+            ))
+            connection.commit()
+            logger.info("Stored event %s", event.metadata.eventId)
+        except mysql.connector.Error as err:
+            logger.error("Error inserting into database: %s", err)
+        finally:
+            if cursor:
+                cursor.close()
+            if connection:
+                connection.close()
 
 
 # Global variable indicating whether the application should shut down
@@ -193,7 +222,7 @@ def main():
     signal.signal(signal.SIGINT, signal_handler)
 
     # Create and start the consumer thread
-    consumer_thread = KafkaConsumerThread(bootstrap_servers, kafka_topic, consumer_group)
+    consumer_thread = EventsPullerThread()
     consumer_thread.start()
 
     logger.info("App started")
diff --git a/apps/pyMetadata/clazz/shared.py b/apps/pyMetadata/clazz/shared.py
index ced213ba..ddafec73 100644
--- a/apps/pyMetadata/clazz/shared.py
+++ b/apps/pyMetadata/clazz/shared.py
@@ -1,74 +1,61 @@
-
-from typing import Any, List
 import json
+from dataclasses import dataclass
+from typing import Any, List, Optional
+from datetime import datetime
 
-suppress_ignore: List[str] = [
-    "event:media-process:started",
-    "event:request-process:started",
-    "event::save",
-    "event:media-process:completed",
-    "event:work-encode:created",
-    "event:work-extract:created",
-    "event:work-convert:created",
-    "event:work-encode:performed",
-    "event:work-extract:performed",
-    "event:work-convert:performed",
-    "event:media-read-out-cover:performed",
-    "event:work-download-cover:performed",
-    "event:media-read-out-name-and-type:performed",
-    "event:media-parse-stream:performed",
-    "event:media-extract-parameter:created",
-    "event:media-encode-parameter:created",
-    "event:media-metadata-search:performed"
-]
-
-consume_on_key: List[str] = [
-    "request:metadata:obtain",
-    "event:media-read-base-info:performed"
-]
-
-def decode_key(key_bytes: bytes | None):
-    return key_bytes.decode('utf-8') if key_bytes else None
-
-def decode_value(value_bytes: bytes | None):
-    return json.loads(value_bytes.decode('utf-8')) if value_bytes else None
-
-
-class ConsumerRecord:
-    topic: str
-    partition: int
-    offset: int
-    key: str
-    value: str | None
-    timestamp: int
-
-    def __init__(self, message: Any) -> None:
-        if message is not None:
-            self.key = message.key
-            self.value = message.value
-            self.topic = message.topic
-            self.offset = message.offset
-            self.partition = message.partition
-            self.timestamp = message.timestamp
-
-
-class MediaEvent():
-    __consumerRecord: ConsumerRecord
-    referenceId: str
+# Dataclasses describing the event structure
+@dataclass
+class EventMetadata:
+    derivedFromEventId: str
     eventId: str
-    data: dict | None
+    referenceId: str
+    status: str
+    created: datetime
 
-    def __init__(self, message: ConsumerRecord) -> None:
-        self.__consumerRecord = message
-        self.referenceId = message.value["referenceId"]
-        self.eventId = message.value["eventId"]
-        self.data = message.value["data"] if "data" in message.value else None
+@dataclass
+class EventData:
+    title: str
+    sanitizedName: str
+    searchTitles: List[str]
 
-    def isConsumable(self) -> bool:
-        if "status" in self.data:
-            if self.data["status"] == "COMPLETED":
-                return True
-        return False
+@dataclass
+class MediaEvent:
+    metadata: EventMetadata
+    eventType: str
+    data: Any | EventData
 
+# Parse a datetime from an ISO 8601 string
+def parse_datetime(datetime_str: str) -> datetime:
+    return datetime.fromisoformat(datetime_str)
+
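+# Serialize an event to JSON. MediaEvent nests EventMetadata/EventData
+# dataclasses and a datetime, so a plain __dict__ dump is not enough; the
+# default hook below unwraps nested objects and formats datetimes.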
+def event_data_to_json(event: Any) -> str:
+    def _default(obj: Any):
+        if isinstance(obj, datetime):
+            return obj.isoformat()
+        return obj.__dict__
+    return json.dumps(event, default=_default)
+
+# Convert JSON into the event classes
+def json_to_media_event(json_data: str) -> MediaEvent:
+    data_dict = json.loads(json_data)
+
+    metadata_dict = data_dict['metadata']
+    event_data_dict = data_dict['data']
+
+    metadata = EventMetadata(
+        derivedFromEventId=metadata_dict['derivedFromEventId'],
+        eventId=metadata_dict['eventId'],
+        referenceId=metadata_dict['referenceId'],
+        status=metadata_dict['status'],
+        created=parse_datetime(metadata_dict['created'])
+    )
+
+    event_data = EventData(
+        title=event_data_dict['title'],
+        sanitizedName=event_data_dict['sanitizedName'],
+        searchTitles=event_data_dict['searchTitles']
+    )
+
+    media_event = MediaEvent(
+        metadata=metadata,
+        eventType=data_dict['eventType'],
+        data=event_data
+    )
+
+    return media_event
\ No newline at end of file
diff --git a/apps/pyMetadata/requirements.txt b/apps/pyMetadata/requirements.txt
index 018a69c6..c4037f8d 100644
--- a/apps/pyMetadata/requirements.txt
+++ b/apps/pyMetadata/requirements.txt
@@ -1,9 +1,9 @@
 cinemagoer>=2023.5.1
 AnilistPython>=0.1.3
-kafka-python>=2.0.2
 fuzzywuzzy>=0.18.0
 requests>=2.31.0
 python-Levenshtein>=0.21.1
 mal-api>=0.5.3
 Unidecode>=1.3.8
-tabulate>=0.9.0
\ No newline at end of file
+tabulate>=0.9.0
+mysql-connector-python>=9.0.0
\ No newline at end of file