From 40918ad00810114b7c25b9a9c6d97c549ddd214a Mon Sep 17 00:00:00 2001 From: bskjon Date: Fri, 19 Jul 2024 15:07:10 +0200 Subject: [PATCH] v3 28 --- .../listeners/ConvertWorkTaskListener.kt | 3 + .../CoverFromMetadataTaskListener.kt | 17 +- .../listeners/EncodeWorkTaskListener.kt | 3 + .../listeners/ExtractWorkTaskListener.kt | 4 + .../MetadataWaitOrDefaultTaskListener.kt | 33 +-- .../processer/services/EncodeServiceV2.kt | 2 +- .../processer/services/ExtractServiceV2.kt | 2 +- apps/pyMetadata/app.py | 189 ++++++++++-------- apps/pyMetadata/sources/source.py | 26 ++- .../implementations/EventListenerImpl.kt | 9 + 10 files changed, 181 insertions(+), 107 deletions(-) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt index 305cd78c..efe07e43 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt @@ -31,6 +31,9 @@ class ConvertWorkTaskListener: WorkTaskListener() { Events.EventWorkExtractPerformed ) + override fun canProduceMultipleEvents(): Boolean { + return true + } override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { if (!isOfEventsIListenFor(incomingEvent)) return false diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt index 6eaa9740..b9e3b73e 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt @@ -4,6 +4,7 @@ import mu.KotlinLogging import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson import no.iktdev.eventi.data.EventStatus +import no.iktdev.eventi.data.isSuccessful import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper @@ -23,13 +24,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { override val produceEvent: Events = Events.EventMediaReadOutCover override val listensForEvents: List = listOf( - Events.EventMediaMetadataSearchPerformed, - Events.EventMediaReadOutNameAndType + Events.EventMediaMetadataSearchPerformed ) override fun isPrerequisitesFulfilled(incomingEvent: Event, events: List): Boolean { - return events.any { it.eventType == Events.EventMediaMetadataSearchPerformed } && - events.any { it.eventType == Events.EventMediaReadOutNameAndType } + return (events.any { it.eventType == Events.EventMediaReadOutNameAndType && it.isSuccessful() }) } override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { @@ -37,6 +36,8 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { if (!state) { return false } + if (!incomingEvent.isSuccessful()) + return false return incomingEvent.eventType in listensForEvents } @@ -54,11 +55,9 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { return } - val metadata = 
events.findLast { it.eventType == Events.EventMediaMetadataSearchPerformed }?.az()?.data - if (metadata == null) { - //log.info { "No metadata.." } - return - } + val metadataEvent = if (event.eventType == Events.EventMediaMetadataSearchPerformed) event else events.findLast { it.eventType == Events.EventMediaMetadataSearchPerformed } + val metadata = metadataEvent?.az()?.data + ?: return val mediaOutInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az()?.data if (mediaOutInfo == null) { log.info { "No Media out info" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt index e3cf44bd..ae420a76 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt @@ -30,6 +30,9 @@ class EncodeWorkTaskListener : WorkTaskListener() { Events.EventMediaWorkProceedPermitted ) + override fun canProduceMultipleEvents(): Boolean { + return true + } override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { val event = incomingEvent.consume() if (event == null) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt index 63bb1a1c..83fd3ea9 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt @@ -30,6 +30,10 @@ class ExtractWorkTaskListener: WorkTaskListener() { Events.EventMediaWorkProceedPermitted ) + override fun canProduceMultipleEvents(): Boolean { + return true + } + override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { val state = super.shouldIProcessAndHandleEvent(incomingEvent, events) return state diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index ccf9989c..f24d90cb 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -5,6 +5,7 @@ import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson import no.iktdev.eventi.data.EventMetadata import no.iktdev.eventi.data.EventStatus +import no.iktdev.eventi.data.isSuccessful import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.eventi.database.toEpochSeconds @@ -44,22 +45,31 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { val metadataTimeout = metadataTimeoutMinutes * 60 val waitingProcessesForMeta: MutableMap = mutableMapOf() + /** * This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event */ 
override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { + val hasReadBaseInfo = events.any { it.eventType == Events.EventMediaReadBaseInfoPerformed && it.isSuccessful() } + val hasMetadataSearched = events.any { it.eventType == Events.EventMediaMetadataSearchPerformed } + val hasPollerForMetadataEvent = waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId) + if (!hasReadBaseInfo) { + return + } - if (events.any { it.eventType == Events.EventMediaReadBaseInfoPerformed } && - events.none { it.eventType == Events.EventMediaMetadataSearchPerformed } && - !waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)) { + if (hasPollerForMetadataEvent && hasMetadataSearched) { + waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId) + return + } + + if (!hasMetadataSearched && !hasPollerForMetadataEvent) { val consumedIncoming = incomingEvent.consume() if (consumedIncoming == null) { log.error { "Event is null and should not be available nor provided! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } - val baseInfo = events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed}?.az()?.data if (baseInfo == null) { log.error { "BaseInfoEvent is null for referenceId: ${consumedIncoming.metadata.referenceId} on eventId: ${consumedIncoming.metadata.eventId}" } @@ -70,21 +80,16 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { val dateTime = LocalDateTime.ofEpochSecond(estimatedTimeout, 0, ZoneOffset.UTC) val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH) - if (!waitingProcessesForMeta.containsKey(consumedIncoming.metadata.referenceId)) { - waitingProcessesForMeta[consumedIncoming.metadata.referenceId] = - MetadataTriggerData(consumedIncoming.metadata.eventId, LocalDateTime.now()) - log.info { "Sending ${baseInfo.title} to waiting queue. Expiry ${dateTime.format(formatter)}" } - } - } - if (events.any { it.eventType == Events.EventMediaMetadataSearchPerformed } - && waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)) { - waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId) + waitingProcessesForMeta[consumedIncoming.metadata.referenceId] = + MetadataTriggerData(consumedIncoming.metadata.eventId, LocalDateTime.now()) + + log.info { "Sending ${baseInfo.title} to waiting queue. 
Expiry ${dateTime.format(formatter)}" } } } - @Scheduled(fixedDelay = (1_000)) + @Scheduled(fixedDelay = (5_000)) fun sendErrorMessageForMetadata() { val expired = waitingProcessesForMeta.filter { LocalDateTime.now().toEpochSeconds() > (it.value.executed.toEpochSeconds() + metadataTimeout) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceV2.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceV2.kt index b5858b93..2eb18ca5 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceV2.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceV2.kt @@ -163,7 +163,7 @@ class EncodeServiceV2( taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR) - log.info { "Encode failed for ${task.referenceId}\n$message" } + log.error { "Encode failed for ${task.referenceId}\n$message" } tasks.onProduceEvent(EncodeWorkPerformedEvent( metadata = EventMetadata( referenceId = task.referenceId, diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractServiceV2.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractServiceV2.kt index 5f4b36b7..c764636a 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractServiceV2.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractServiceV2.kt @@ -135,7 +135,7 @@ class ExtractServiceV2( taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR) - log.info { "Encode failed for ${task.referenceId}\n$message" } + log.error { "Extract failed for ${task.referenceId}\n$message" } tasks.onProduceEvent( ExtractWorkPerformedEvent( metadata = EventMetadata( diff --git a/apps/pyMetadata/app.py b/apps/pyMetadata/app.py index b8ada766..2b8441f1 100644 --- a/apps/pyMetadata/app.py +++ b/apps/pyMetadata/app.py @@ -11,6 +11,8 @@ from fuzzywuzzy import fuzz import mysql.connector from datetime import datetime +import mysql.connector.cursor + from algo.AdvancedMatcher import AdvancedMatcher from algo.SimpleMatcher import SimpleMatcher from algo.PrefixMatcher import PrefixMatcher @@ -21,15 +23,17 @@ from sources.anii import Anii from sources.imdb import Imdb from sources.mal import Mal - +from mysql.connector.abstracts import MySQLConnectionAbstract +from mysql.connector.pooling import PooledMySQLConnection +from mysql.connector.types import RowType as MySqlRowType # Konfigurer Database -events_server_address = os.environ.get("DATABASE_ADDRESS") or "127.0.0.1" +events_server_address = os.environ.get("DATABASE_ADDRESS") or "192.168.2.250" # "127.0.0.1" events_server_port = os.environ.get("DATABASE_PORT") or "3306" -events_server_database_name = os.environ.get("DATABASE_NAME_E") or "events" +events_server_database_name = os.environ.get("DATABASE_NAME_E") or "eventsV3" # "events" events_server_username = os.environ.get("DATABASE_USERNAME") or "root" -events_server_password = os.environ.get("DATABASE_PASSWORD") or "root" +events_server_password = os.environ.get("DATABASE_PASSWORD") or "shFZ27eL2x2NoxyEDBMfDWkvFO" #"root" @@ -44,87 +48,142 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) - class EventsPullerThread(threading.Thread): - connector = None def __init__(self): super().__init__() self.shutdown = threading.Event() + + def getEventsAvailable(self, connection: PooledMySQLConnection | MySQLConnectionAbstract) -> 
List[MySqlRowType]:
+        cursor = connection.cursor(dictionary=True)
+        cursor.execute("""
+            SELECT *
+            FROM events
+            WHERE referenceId IN (
+                SELECT referenceId
+                FROM events
+                GROUP BY referenceId
+                HAVING
+                    SUM(event = 'event:media-read-base-info:performed') > 0
+                    AND SUM(event = 'event:media-metadata-search:performed') = 0
+                    AND SUM(event = 'event:media-process:completed') = 0
+            )
+            AND event = 'event:media-read-base-info:performed';
+        """)
+        rows = cursor.fetchall()
+        cursor.close()
+        return rows
+
+    def storeProducedEvent(self, connection: PooledMySQLConnection | MySQLConnectionAbstract, event: MediaEvent) -> bool:
+        try:
+            cursor = connection.cursor()
+
+            query = """
+                INSERT INTO events (referenceId, eventId, event, data)
+                VALUES (%s, %s, %s, %s)
+            """
+            cursor.execute(query, (
+                event.metadata.referenceId,
+                event.metadata.eventId,
+                "event:media-metadata-search:performed",
+                event_data_to_json(event)
+            ))
+            connection.commit()
+            cursor.close()
+            return True
+        except mysql.connector.Error as err:
+            logger.error("Error inserting into database: %s", err)
+            return False

     def run(self) -> None:
         logger.info(f"Using {events_server_address}:{events_server_port} on table: {events_server_database_name}")
         while not self.shutdown.is_set():
-            connection = None
-            cursor = None
-            try:
-                connection = mysql.connector.connect(
+            connection = mysql.connector.connect(
                 host=events_server_address,
                 port=events_server_port,
                 database=events_server_database_name,
                 user=events_server_username,
                 password=events_server_password
-                )
-                cursor = connection.cursor(dictionary=True)
-                cursor.execute("""
-                    SELECT *
-                    FROM events
-                    WHERE referenceId IN (
-                        SELECT referenceId
-                        FROM events
-                        GROUP BY referenceId
-                        HAVING
-                            SUM(event = 'event:media-read-base-info:performed') > 0
-                            AND SUM(event = 'event:media-metadata-search:performed') = 0
-                            AND SUM(event = 'event:media-process:completed') = 0
-                    )
-                    AND event = 'event:media-read-base-info:performed';
-                """)
-                # not event:media-metadata-search:performed
-                for row in cursor.fetchall():
-                    if self.shutdown.is_set():
-                        break
-                    logger.info("Event found!")
-                    handler_thread = MessageHandlerThread(row)
-                    handler_thread.start()
-
+            )
+            try:
+                rows = self.getEventsAvailable(connection=connection)
+                for row in rows:
+                    if (row is not None):
+                        try:
+                            referenceId = row["referenceId"]
+                            eventName = row["event"]
+                            logMessage = f"""
+============================================================================
+Found message for: {referenceId} @ {eventName}
+============================================================================"""
+                            logger.info(logMessage)
+
+                            mediaEvent: MediaEvent = json_to_media_event(row["data"])
+                            producedEvent = MetadataEventHandler(mediaEvent).run()
+                            producedMessage = f"""
+============================================================================
+Producing message for: {referenceId} @ {eventName}
+{event_data_to_json(producedEvent)}
+============================================================================"""
+                            logger.info(producedMessage)
+
+                            self.storeProducedEvent(connection=connection, event=producedEvent)
+
+                        except Exception as e:
+                            # Produce a failure event so the reference does not stall in the queue
+                            logger.exception(e)
+                            producedEvent = MediaEvent(
+                                metadata=EventMetadata(
+                                    referenceId=row["referenceId"],
+                                    eventId=str(uuid.uuid4()),
+                                    derivedFromEventId=row["eventId"],
+                                    status="Failed",
+                                    created=datetime.now().isoformat()
+                                ),
+                                data=None,
+                                eventType="EventMediaMetadataSearchPerformed"
+                            )
+                            self.storeProducedEvent(connection=connection, event=producedEvent)
+
             except mysql.connector.Error as err:
                 logger.error("Database error: %s", err)
             finally:
-                if cursor:
-                    cursor.close()
                 if connection:
                     connection.close()
+                    connection = None

             # Introduce a small sleep to reduce CPU usage
             time.sleep(5)
         if (self.shutdown.is_set()):
             logger.info("Shutdown is set..")
+
     def stop(self):
         self.shutdown.set()
         global should_stop
         should_stop = True

-# Kafka message handler-klasse
-class MessageHandlerThread(threading.Thread):
-    mediaEvent: MediaEvent|None = None
-    def __init__(self, row):
+class MetadataEventHandler():
+    mediaEvent: MediaEvent | None = None
+    def __init__(self, data: MediaEvent):
         super().__init__()
-        jsonData = row['data']
-        logger.info(jsonData)
-        self.mediaEvent = json_to_media_event(jsonData)
+        self.mediaEvent = data
         logger.info(self.mediaEvent)

-    def run(self):
-        logger.info("Starting processing")
+    def run(self) -> MediaEvent | None:
+        logger.info("Starting search")
         if (self.mediaEvent is None):
             logger.error("Event does not contain anything...")
             return

         event: MediaEvent = self.mediaEvent
-
-        logger.info("Processing event: event=%s, value=%s", event.eventType, event)

         searchableTitles: List[str] = event.data.searchTitles
         searchableTitles.extend([
@@ -154,11 +213,7 @@ class MessageHandlerThread(threading.Thread):
             data=result,
             eventType="EventMediaMetadataSearchPerformed"
         )
-
-
-        logger.info("<== Outgoing message: %s \n%s", event.eventType, event_data_to_json(producedEvent))
-        self.insert_into_database(producedEvent, "event:media-metadata-search:performed")
-
+        return producedEvent

     def __getMetadata(self, titles: List[str]) -> Metadata | None:
@@ -185,34 +240,6 @@ class MessageHandlerThread(threading.Thread):
         if prefixSelector is not None:
             return prefixSelector
         return None
-
-    def insert_into_database(self, event: MediaEvent, eventKey: str):
-        try:
-            connection = mysql.connector.connect(
-                host=events_server_address,
-                port=events_server_port,
-                database=events_server_database_name,
-                user=events_server_username,
-                password=events_server_password
-            )
-            cursor = connection.cursor()
-
-            query = """
-                INSERT INTO events (referenceId, eventId, event, data)
-                VALUES (%s, %s, %s, %s)
-            """
-            cursor.execute(query, (
-                event.metadata.referenceId,
-                event.metadata.eventId,
-                eventKey,
-                event_data_to_json(event)
-            ))
-            connection.commit()
-            cursor.close()
-            connection.close()
-            logger.info("Storing event")
-        except mysql.connector.Error as err:
-            logger.error("Error inserting into database: %s", err)


 # Global variabel for å indikere om applikasjonen skal avsluttes
diff --git a/apps/pyMetadata/sources/source.py b/apps/pyMetadata/sources/source.py
index 44349a0e..aaa09637 100644
--- a/apps/pyMetadata/sources/source.py
+++ b/apps/pyMetadata/sources/source.py
@@ -31,6 +31,7 @@ class SourceBase(ABC):
             partialMatch = fuzz.ratio(title, clean_foundTitle) if clean_foundTitle is not None else 0

             if directMatch >= 60:
+                log.info(f"{source} -> Direct Match for '{title}' of '{foundTitle}' on part '{clean_foundTitle}' with direct score: {directMatch} and partial {partialMatch}")
                 return True
             elif partialMatch >= 80:
                 log.info(f"{source} -> Partial Match for '{title}' of '{foundTitle}' on part '{clean_foundTitle}' with direct score: {directMatch} and partial {partialMatch}")
@@ -40,13 +41,36 @@ class SourceBase(ABC):

         return False

+    def getMatchingOnTitleWords(self, idToTitle: dict[str, str], titles: List[str]) -> dict[str, str]:
+        matched_idToTitle = {}
+
+        for title in titles:
+            title_words = set(title.split())
+            for id, stored_title in idToTitle.items():
+                stored_title_words = set(stored_title.split())
+                if title_words & stored_title_words:  # only compare titles that share at least one word
+                    score = fuzz.token_set_ratio(title, stored_title)
+                    if score >= 75:
+                        matched_idToTitle[id] = stored_title
+
+        # Fall back to the original candidates if nothing shares a word and scores high enough
+        return matched_idToTitle if matched_idToTitle else idToTitle
+
     def findBestMatchAcrossTitles(self, idToTitle: dict[str, str], titles: List[str]) -> Tuple[str, str]:
+        # Use the word-filtered candidates, or the original mapping when nothing matched
+        filtered_idToTitle = self.getMatchingOnTitleWords(idToTitle, titles)
+
         best_match_id = ""
         best_match_title = ""
         best_ratio = 0

         for title in titles:
-            for id, stored_title in idToTitle.items():
+            for id, stored_title in filtered_idToTitle.items():
                 ratio = fuzz.ratio(title, stored_title)
                 if ratio > best_ratio:
                     best_ratio = ratio
diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt
index d3e0de72..b962b22e 100644
--- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt
+++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt
@@ -38,6 +38,11 @@ abstract class EventListenerImpl> {
         return false
     }

+    open fun canProduceMultipleEvents(): Boolean {
+        return false
+    }
+
+
     open fun haveProducedExpectedMessageBasedOnEvent(incomingEvent: T, events: List): Boolean {
         val eventsProducedByListener = events.filter { it.eventType == produceEvent }
         val triggeredBy = events.filter { it.eventType in listensForEvents }
@@ -62,6 +67,10 @@ abstract class EventListenerImpl> {
         if (haveProducedExpectedMessageBasedOnEvent(incomingEvent, events))
             return false

+        if (events.any { it.eventType == produceEvent } && !canProduceMultipleEvents()) {
+            return false
+        }
+
         //val isDerived = events.any { it.metadata.derivedFromEventId == incomingEvent.metadata.eventId } // && incomingEvent.eventType == produceEvent
         return true
     }
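--

For readers skimming the new polling query in getEventsAvailable(): a minimal sketch of what the HAVING/SUM filter selects, exercised against an in-memory SQLite database standing in for MySQL. The table layout and event names follow the patch; the referenceIds and rows below are invented sample data.

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (referenceId TEXT, eventId TEXT, event TEXT, data TEXT)")
    conn.executemany("INSERT INTO events VALUES (?, ?, ?, ?)", [
        ("ref-1", "e1", "event:media-read-base-info:performed", "{}"),   # pending, should be picked up
        ("ref-2", "e2", "event:media-read-base-info:performed", "{}"),
        ("ref-2", "e3", "event:media-metadata-search:performed", "{}"),  # metadata already searched
        ("ref-3", "e4", "event:media-read-base-info:performed", "{}"),
        ("ref-3", "e5", "event:media-process:completed", "{}"),          # process already completed
    ])

    rows = conn.execute("""
        SELECT * FROM events
        WHERE referenceId IN (
            SELECT referenceId FROM events
            GROUP BY referenceId
            HAVING SUM(event = 'event:media-read-base-info:performed') > 0
               AND SUM(event = 'event:media-metadata-search:performed') = 0
               AND SUM(event = 'event:media-process:completed') = 0
        )
        AND event = 'event:media-read-base-info:performed'
    """).fetchall()

    print(rows)  # only the ref-1 base-info row qualifies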
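Likewise, a small usage sketch of the new word-overlap pre-filter in SourceBase: getMatchingOnTitleWords() keeps only candidates that share a word with a search title and score at least 75 on token_set_ratio, and findBestMatchAcrossTitles() then picks the best plain ratio among them. The ids and titles below are invented; the thresholds follow the patch.

    from fuzzywuzzy import fuzz

    idToTitle = {
        "101": "Fullmetal Alchemist Brotherhood",
        "102": "Attack on Titan",
        "103": "Steel Alchemist",
    }
    titles = ["Fullmetal Alchemist: Brotherhood"]

    # Word-overlap pre-filter, same idea as getMatchingOnTitleWords()
    filtered = {
        id_: stored
        for id_, stored in idToTitle.items()
        if set(titles[0].split()) & set(stored.split())
        and fuzz.token_set_ratio(titles[0], stored) >= 75
    } or idToTitle

    # Best match by plain ratio, same idea as findBestMatchAcrossTitles()
    best_id, best_title = max(filtered.items(), key=lambda kv: fuzz.ratio(titles[0], kv[1]))
    print(best_id, best_title)  # "101" is expected to win on ratio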
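Finally, the behavioural intent of the new canProduceMultipleEvents() gate in EventListenerImpl, paraphrased here in Python for brevity (the real logic is the Kotlin shown above): once an event of a listener's produceEvent type already exists for the reference, the listener is skipped unless it explicitly opts in, as the Convert/Encode/Extract work listeners now do, presumably so they can emit more than one work event per reference. The event type strings below are placeholders, not the real enum values.

    def should_process(produce_event: str, existing_event_types: list[str],
                       can_produce_multiple: bool) -> bool:
        # Mirrors the added check in shouldIProcessAndHandleEvent()
        if produce_event in existing_event_types and not can_produce_multiple:
            return False
        return True

    # Single-output listener: a second pass over the same reference is suppressed
    print(should_process("cover-created", ["cover-created"], can_produce_multiple=False))  # False
    # Multi-output listener (e.g. ExtractWorkTaskListener): still allowed
    print(should_process("extract-work-created", ["extract-work-created"], can_produce_multiple=True))  # True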