v3 - db polling for pyMetadata

commit 72b2d30fab
parent c58b00a236
bskjon committed 2024-07-13 17:44:22 +02:00
3 changed files with 164 additions and 148 deletions


@@ -8,11 +8,13 @@ import threading
 import json
 import time
 from fuzzywuzzy import fuzz
+import mysql.connector
+from datetime import datetime
 from algo.AdvancedMatcher import AdvancedMatcher
 from algo.SimpleMatcher import SimpleMatcher
 from algo.PrefixMatcher import PrefixMatcher
-from clazz.shared import ConsumerRecord, MediaEvent, decode_key, decode_value, suppress_ignore, consume_on_key
+from clazz.shared import EventMetadata, MediaEvent, event_data_to_json, json_to_media_event
 from clazz.KafkaMessageSchema import KafkaMessage, MessageDataWrapper
 from clazz.Metadata import Metadata
 from kafka import KafkaConsumer, KafkaProducer
@@ -21,11 +23,22 @@ from sources.anii import Anii
 from sources.imdb import Imdb
 from sources.mal import Mal
 
 # Configure the Kafka connection
 bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") or "127.0.0.1:9092"
 consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"MetadataConsumer"
 kafka_topic = os.environ.get("KAFKA_TOPIC") or "mediaEvents"
+
+# Database connection settings for the events store
+events_server_address = os.environ.get("DATABASE_ADDRESS") or "127.0.0.1"
+events_server_port = os.environ.get("DATABASE_PORT") or "3306"
+events_server_database_name = os.environ.get("DATABASE_NAME_E") or "events"
+events_server_username = os.environ.get("DATABASE_USERNAME") or "root"
+events_server_password = os.environ.get("DATABASE_PASSWORD") or "root"
 
 # Configure logging
 logging.basicConfig(
@@ -38,54 +51,50 @@ logging.basicConfig(
 
 logger = logging.getLogger(__name__)
 
 # Kafka consumer class
-class KafkaConsumerThread(threading.Thread):
-    def __init__(self, bootstrap_servers, topic, consumer_group):
+class EventsPullerThread(threading.Thread):
+    connector = None
+
+    def __init__(self):
         super().__init__()
-        self.bootstrap_servers = bootstrap_servers
-        self.consumer_group = consumer_group
-        self.topic = topic
         self.shutdown = threading.Event()
 
-    def run(self):
-        consumer = None
-        try:
-            consumer = KafkaConsumer(
-                self.topic,
-                bootstrap_servers=self.bootstrap_servers,
-                group_id=self.consumer_group,
-                key_deserializer=lambda x: decode_key(x),
-                value_deserializer=lambda x: decode_value(x)
-            )
-            logger.info("Kafka Consumer started")
-        except:
-            logger.exception("Kafka Consumer failed to start")
-            self.stop()
-            #sys.exit(1)
-
+    def run(self) -> None:
         while not self.shutdown.is_set():
-            for cm in consumer:
+            connection = None
+            cursor = None
+            try:
+                connection = mysql.connector.connect(
+                    host=events_server_address,
+                    port=events_server_port,
+                    database=events_server_database_name,
+                    user=events_server_username,
+                    password=events_server_password
+                )
+                cursor = connection.cursor(dictionary=True)
+                # Anti-join: base-info events that do not yet have a
+                # corresponding event:media-metadata-search:performed event
+                cursor.execute("""
+                    SELECT e1.*
+                    FROM events e1
+                    LEFT JOIN events e2
+                        ON e1.referenceId = e2.referenceId
+                        AND e2.event = 'event:media-metadata-search:performed'
+                    WHERE e1.event = 'event:media-read-base-info:performed'
+                        AND e2.referenceId IS NULL;
+                """)
+                for row in cursor.fetchall():
                     if self.shutdown.is_set():
                         break
-                message: ConsumerRecord = ConsumerRecord(cm)
-                # Check whether the message carries one of the target keys
-                if message.key in consume_on_key:
-                    logger.info("==> Incoming message: %s \n%s", message.key, message.value)
                     # Create a new thread to handle the message
-                    handler_thread = MessageHandlerThread(message)
+                    handler_thread = MessageHandlerThread(row)
                     handler_thread.start()
-                else:
-                    if (message.key not in suppress_ignore):
-                        logger.debug("Ignored message: key=%s", message.key)
-            # Introduce a small sleep to reduce CPU usage
-            time.sleep(1)
-
-        if consumer is not None:
-            consumer.close()
-        logger.info("Kafka Consumer stopped")
+                # NOTE: time.sleep takes seconds, so this waits ~83 minutes
+                # between polls; a value of 5 was presumably intended
+                time.sleep(5000)
+            except mysql.connector.Error as err:
+                logger.error("Database error: %s", err)
+            finally:
+                if cursor:
+                    cursor.close()
+                if connection:
+                    connection.close()
 
     def stop(self):
         self.shutdown.set()
@@ -94,29 +103,25 @@ class KafkaConsumerThread(threading.Thread):
 
 # Kafka message handler class
 class MessageHandlerThread(threading.Thread):
-    producerMessageKey = "event:media-metadata-search:performed"
+    mediaEvent: MediaEvent | None = None
 
-    def __init__(self, message: ConsumerRecord):
+    def __init__(self, row):
         super().__init__()
-        self.message = message
+        # row['data'] already holds the event's JSON string;
+        # json_to_media_event does the json.loads itself
+        self.mediaEvent = json_to_media_event(row['data'])
 
     def run(self):
-        mediaEvent = MediaEvent(message=self.message)
-        if mediaEvent.data is None:
-            logger.error("No data present for %s", self.message.value)
-            return
-        if mediaEvent.isConsumable() == False:
-            logger.info("Message status is not of 'COMPLETED', %s", self.message.value)
+        if self.mediaEvent is None:
+            logger.error("Event does not contain anything...")
             return
 
-        logger.info("Processing record: key=%s, value=%s", self.message.key, self.message.value)
+        event: MediaEvent = self.mediaEvent
+        logger.info("Processing event: event=%s, value=%s", event.eventType, event)
 
-        searchableTitles: List[str] = mediaEvent.data["searchTitles"]
+        searchableTitles: List[str] = event.data.searchTitles
         searchableTitles.extend([
-            mediaEvent.data["title"],
-            mediaEvent.data["sanitizedName"]
+            event.data.title,
+            event.data.sanitizedName
         ])
@@ -129,26 +134,22 @@ class MessageHandlerThread(threading.Thread):
             result_message = f"No result for {joinedTitles}"
             logger.info(result_message)
 
-        messageData = MessageDataWrapper(
-            status = "ERROR" if result is None else "COMPLETED",
-            message = result_message,
+        producedEvent = MediaEvent(
+            metadata=EventMetadata(
+                referenceId=event.metadata.referenceId,
+                eventId=str(uuid.uuid4()),
+                derivedFromEventId=event.metadata.eventId,
+                status="Failed" if result is None else "Success",
+                created=datetime.now().isoformat()
+            ),
             data=result,
-            derivedFromEventId = mediaEvent.eventId
+            eventType="event:media-metadata-search:performed"
         )
 
-        producerMessage = KafkaMessage(referenceId=mediaEvent.referenceId, data=messageData).to_json()
-        logger.info("<== Outgoing message: %s \n%s", self.producerMessageKey, producerMessage)
-
-        # Send the result back using the Kafka producer
-        producer = KafkaProducer(
-            bootstrap_servers=bootstrap_servers,
-            key_serializer=lambda k: k.encode('utf-8') if isinstance(k, str) else None,
-            value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else None
-        )
-        producer.send(kafka_topic, key=self.producerMessageKey, value=producerMessage)
-        producer.close()
+        logger.info("<== Outgoing message: %s \n%s", event.eventType, event_data_to_json(producedEvent))
+        # Persist the produced event instead of publishing it to Kafka
+        self.insert_into_database(producedEvent)
@@ -177,6 +178,34 @@ class MessageHandlerThread(threading.Thread):
             return prefixSelector
         return None
 
+    def insert_into_database(self, event: MediaEvent):
+        try:
+            connection = mysql.connector.connect(
+                host=events_server_address,
+                port=events_server_port,
+                database=events_server_database_name,
+                user=events_server_username,
+                password=events_server_password
+            )
+            cursor = connection.cursor()
+            query = """
+                INSERT INTO events (referenceId, eventId, event, data)
+                VALUES (%s, %s, %s, %s)
+            """
+            cursor.execute(query, (
+                event.metadata.referenceId,
+                event.metadata.eventId,
+                event.eventType,
+                event_data_to_json(event)
+            ))
+            connection.commit()
+            cursor.close()
+            connection.close()
+            logger.info("Storing event")
+        except mysql.connector.Error as err:
+            logger.error("Error inserting into database: %s", err)
 
 # Global variable indicating whether the application should shut down
 should_stop = False
@@ -193,7 +222,7 @@ def main():
     signal.signal(signal.SIGINT, signal_handler)
 
     # Create and start the consumer thread
-    consumer_thread = KafkaConsumerThread(bootstrap_servers, kafka_topic, consumer_group)
+    consumer_thread = EventsPullerThread()
     consumer_thread.start()
 
     logger.info("App started")
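The heart of this change is the anti-join in EventsPullerThread.run: a base-info event is returned only while no metadata-search event shares its referenceId, so once insert_into_database has stored a result, that event drops out of the next poll automatically. A minimal, runnable sketch of the same pattern, substituting sqlite3 for mysql-connector-python so it needs no server (table layout and event names mirror the diff above):

import sqlite3

# In-memory stand-in for the MySQL events table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (referenceId TEXT, eventId TEXT, event TEXT, data TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?)",
    [
        ("ref-1", "e1", "event:media-read-base-info:performed", "{}"),
        ("ref-2", "e2", "event:media-read-base-info:performed", "{}"),
        # ref-1 already has a search result, so it must not be polled again
        ("ref-1", "e3", "event:media-metadata-search:performed", "{}"),
    ],
)

# Same LEFT JOIN anti-join as in EventsPullerThread.run: keep base-info
# events whose referenceId has no metadata-search event yet.
rows = conn.execute("""
    SELECT e1.*
    FROM events e1
    LEFT JOIN events e2
      ON e1.referenceId = e2.referenceId
      AND e2.event = 'event:media-metadata-search:performed'
    WHERE e1.event = 'event:media-read-base-info:performed'
      AND e2.referenceId IS NULL
""").fetchall()

print(rows)  # only the ref-2 row remains

This makes the database itself the work queue: shutdown still goes through the threading.Event checked between rows, but no consumer offsets or key filtering are needed anymore.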


@@ -1,74 +1,61 @@
-from typing import Any, List
 import json
+from dataclasses import dataclass, asdict
+from typing import Any, List, Optional
+from datetime import datetime
 
-suppress_ignore: List[str] = [
-    "event:media-process:started",
-    "event:request-process:started",
-    "event::save",
-    "event:media-process:completed",
-    "event:work-encode:created",
-    "event:work-extract:created",
-    "event:work-convert:created",
-    "event:work-encode:performed",
-    "event:work-extract:performed",
-    "event:work-convert:performed",
-    "event:media-read-out-cover:performed",
-    "event:work-download-cover:performed",
-    "event:media-read-out-name-and-type:performed",
-    "event:media-parse-stream:performed",
-    "event:media-extract-parameter:created",
-    "event:media-encode-parameter:created",
-    "event:media-metadata-search:performed"
-]
-
-consume_on_key: List[str] = [
-    "request:metadata:obtain",
-    "event:media-read-base-info:performed"
-]
-
-def decode_key(key_bytes: bytes | None):
-    return key_bytes.decode('utf-8') if key_bytes else None
-
-def decode_value(value_bytes: bytes | None):
-    return json.loads(value_bytes.decode('utf-8')) if value_bytes else None
-
-class ConsumerRecord:
-    topic: str
-    partition: int
-    offset: int
-    key: str
-    value: str | None
-    timestamp: int
-
-    def __init__(self, message: Any) -> None:
-        if message is not None:
-            self.key = message.key
-            self.value = message.value
-            self.topic = message.topic
-            self.offset = message.offset
-            self.partition = message.partition
-            self.timestamp = message.timestamp
-
-class MediaEvent():
-    __consumerRecord: ConsumerRecord
-    referenceId: str
-    eventId: str
-    data: dict | None
-
-    def __init__(self, message: ConsumerRecord) -> None:
-        self.__consumerRecord = message
-        self.referenceId = message.value["referenceId"]
-        self.eventId = message.value["eventId"]
-        self.data = message.value["data"] if "data" in message.value else None
-
-    def isConsumable(self) -> bool:
-        if "status" in self.data:
-            if self.data["status"] == "COMPLETED":
-                return True
-        return False
+# Define the dataclasses for the event structure
+@dataclass
+class EventMetadata:
+    derivedFromEventId: str
+    eventId: str
+    referenceId: str
+    status: str
+    created: datetime
+
+@dataclass
+class EventData:
+    title: str
+    sanitizedName: str
+    searchTitles: List[str]
+
+@dataclass
+class MediaEvent:
+    metadata: EventMetadata
+    eventType: str
+    data: Any | EventData
+
+# Function to parse a datetime from a string
+def parse_datetime(datetime_str: str) -> datetime:
+    return datetime.fromisoformat(datetime_str)
+
+def event_data_to_json(event_data: Any) -> str:
+    # asdict flattens nested dataclasses; default=str covers datetime values
+    return json.dumps(asdict(event_data), default=str)
+
+# Function to convert JSON to the classes above
+def json_to_media_event(json_data: str) -> MediaEvent:
+    data_dict = json.loads(json_data)
+    metadata_dict = data_dict['metadata']
+    event_data_dict = data_dict['data']
+
+    metadata = EventMetadata(
+        derivedFromEventId=metadata_dict['derivedFromEventId'],
+        eventId=metadata_dict['eventId'],
+        referenceId=metadata_dict['referenceId'],
+        status=metadata_dict['status'],
+        created=parse_datetime(metadata_dict['created'])
+    )
+    event_data = EventData(
+        title=event_data_dict['title'],
+        sanitizedName=event_data_dict['sanitizedName'],
+        searchTitles=event_data_dict['searchTitles']
+    )
+    media_event = MediaEvent(
+        metadata=metadata,
+        eventType=data_dict['eventType'],
+        data=event_data
+    )
+    return media_event
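Since produced events are stored through event_data_to_json and read back through json_to_media_event, the two converters must round-trip cleanly. A small sketch, assuming the definitions above are importable from clazz.shared as the consumer's imports suggest (all sample values are made up):

import uuid
from datetime import datetime

from clazz.shared import (
    EventData, EventMetadata, MediaEvent,
    event_data_to_json, json_to_media_event,
)

event = MediaEvent(
    metadata=EventMetadata(
        derivedFromEventId=str(uuid.uuid4()),
        eventId=str(uuid.uuid4()),
        referenceId="ref-1",
        status="Success",
        created=datetime.now(),
    ),
    eventType="event:media-read-base-info:performed",
    data=EventData(
        title="Example Title",
        sanitizedName="example-title",
        searchTitles=["Example", "Example Title"],
    ),
)

payload = event_data_to_json(event)      # what insert_into_database stores
restored = json_to_media_event(payload)  # what EventsPullerThread reads back
assert restored.metadata.referenceId == event.metadata.referenceId

Note that created is serialized as a plain string (via default=str) but comes back as a datetime, because json_to_media_event re-parses it with parse_datetime.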


@@ -1,9 +1,9 @@
 cinemagoer>=2023.5.1
 AnilistPython>=0.1.3
 kafka-python>=2.0.2
 fuzzywuzzy>=0.18.0
 requests>=2.31.0
 python-Levenshtein>=0.21.1
 mal-api>=0.5.3
 Unidecode>=1.3.8
 tabulate>=0.9.0
+mysql-connector-python>=9.0.0