bskjon 2024-07-19 15:07:10 +02:00
parent 4a47823b09
commit 40918ad008
10 changed files with 181 additions and 107 deletions

View File

@ -31,6 +31,9 @@ class ConvertWorkTaskListener: WorkTaskListener() {
Events.EventWorkExtractPerformed
)
override fun canProduceMultipleEvents(): Boolean {
return true
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
if (!isOfEventsIListenFor(incomingEvent))
return false

View File

@ -4,6 +4,7 @@ import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.isSuccessful
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper
@ -23,13 +24,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
override val produceEvent: Events = Events.EventMediaReadOutCover
override val listensForEvents: List<Events> = listOf(
Events.EventMediaMetadataSearchPerformed,
Events.EventMediaReadOutNameAndType
Events.EventMediaMetadataSearchPerformed
)
override fun isPrerequisitesFulfilled(incomingEvent: Event, events: List<Event>): Boolean {
return events.any { it.eventType == Events.EventMediaMetadataSearchPerformed } &&
events.any { it.eventType == Events.EventMediaReadOutNameAndType }
return (events.any { it.eventType == Events.EventMediaReadOutNameAndType && it.isSuccessful() })
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
@ -37,6 +36,8 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
if (!state) {
return false
}
if (!incomingEvent.isSuccessful())
return false
return incomingEvent.eventType in listensForEvents
}
@ -54,11 +55,9 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
return
}
val metadata = events.findLast { it.eventType == Events.EventMediaMetadataSearchPerformed }?.az<MediaMetadataReceivedEvent>()?.data
if (metadata == null) {
//log.info { "No metadata.." }
return
}
val metadataEvent = if (event.eventType == Events.EventMediaMetadataSearchPerformed) event else events.findLast { it.eventType == Events.EventMediaMetadataSearchPerformed }
val metadata = metadataEvent?.az<MediaMetadataReceivedEvent>()?.data
?: return
val mediaOutInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az<MediaOutInformationConstructedEvent>()?.data
if (mediaOutInfo == null) {
log.info { "No Media out info" }

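The rewritten lookup prefers the incoming event when it is itself the metadata event, and only falls back to the stored history otherwise. A minimal sketch of that incoming-or-history pattern; `Event` here is a simplified stand-in for the eventi type, not the project's real class:

```kotlin
// Simplified stand-in for the eventi Event type; illustration only.
data class Event(val eventType: String, val payload: String)

// Prefer the event that triggered this invocation; otherwise fall back to the
// most recent matching event already recorded for this referenceId.
fun resolveMetadataEvent(incoming: Event, history: List<Event>): Event? =
    if (incoming.eventType == "event:media-metadata-search:performed") incoming
    else history.findLast { it.eventType == "event:media-metadata-search:performed" }

fun main() {
    val history = listOf(Event("event:media-read-base-info:performed", "base"))
    val fresh = Event("event:media-metadata-search:performed", "metadata")
    println(resolveMetadataEvent(fresh, history)?.payload) // prints "metadata"
}
```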
View File

@ -30,6 +30,9 @@ class EncodeWorkTaskListener : WorkTaskListener() {
Events.EventMediaWorkProceedPermitted
)
override fun canProduceMultipleEvents(): Boolean {
return true
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {

View File

@ -30,6 +30,10 @@ class ExtractWorkTaskListener: WorkTaskListener() {
Events.EventMediaWorkProceedPermitted
)
override fun canProduceMultipleEvents(): Boolean {
return true
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
val state = super.shouldIProcessAndHandleEvent(incomingEvent, events)
return state

View File

@ -5,6 +5,7 @@ import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.isSuccessful
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.eventi.database.toEpochSeconds
@ -44,22 +45,31 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
val metadataTimeout = metadataTimeoutMinutes * 60
val waitingProcessesForMeta: MutableMap<String, MetadataTriggerData> = mutableMapOf()
/**
* This listener gets special treatment: since it only produces a timeout event, it does not need to consume the incoming event
*/
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val hasReadBaseInfo = events.any { it.eventType == Events.EventMediaReadBaseInfoPerformed && it.isSuccessful() }
val hasMetadataSearched = events.any { it.eventType == Events.EventMediaMetadataSearchPerformed }
val hasPollerForMetadataEvent = waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)
if (!hasReadBaseInfo) {
return
}
if (events.any { it.eventType == Events.EventMediaReadBaseInfoPerformed } &&
events.none { it.eventType == Events.EventMediaMetadataSearchPerformed } &&
!waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)) {
if (hasPollerForMetadataEvent && hasMetadataSearched) {
waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId)
return
}
if (!hasMetadataSearched && !hasPollerForMetadataEvent) {
val consumedIncoming = incomingEvent.consume()
if (consumedIncoming == null) {
log.error { "Event is null and should not be available nor provided! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val baseInfo = events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed}?.az<BaseInfoEvent>()?.data
if (baseInfo == null) {
log.error { "BaseInfoEvent is null for referenceId: ${consumedIncoming.metadata.referenceId} on eventId: ${consumedIncoming.metadata.eventId}" }
@ -70,21 +80,16 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
val dateTime = LocalDateTime.ofEpochSecond(estimatedTimeout, 0, ZoneOffset.UTC)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)
if (!waitingProcessesForMeta.containsKey(consumedIncoming.metadata.referenceId)) {
waitingProcessesForMeta[consumedIncoming.metadata.referenceId] =
MetadataTriggerData(consumedIncoming.metadata.eventId, LocalDateTime.now())
log.info { "Sending ${baseInfo.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
}
}
if (events.any { it.eventType == Events.EventMediaMetadataSearchPerformed }
&& waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)) {
waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId)
waitingProcessesForMeta[consumedIncoming.metadata.referenceId] =
MetadataTriggerData(consumedIncoming.metadata.eventId, LocalDateTime.now())
log.info { "Sending ${baseInfo.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
}
}
@Scheduled(fixedDelay = (1_000))
@Scheduled(fixedDelay = (5_000))
fun sendErrorMessageForMetadata() {
val expired = waitingProcessesForMeta.filter {
LocalDateTime.now().toEpochSeconds() > (it.value.executed.toEpochSeconds() + metadataTimeout)

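The flow above distills to a wait-or-default queue: a referenceId is parked when base info arrives without metadata, released when the metadata search lands, and expired by the scheduled sweep (now every 5 seconds rather than every 1). A minimal sketch of that bookkeeping in plain Kotlin; the class and function names are illustrative, not the project's API:

```kotlin
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap

// Minimal wait-or-default queue: park on base-info, release on metadata,
// and let a fixed-delay sweep expire whatever never received metadata.
class MetadataTimeoutQueue(private val timeoutSeconds: Long) {
    private val waiting = ConcurrentHashMap<String, LocalDateTime>()

    fun park(referenceId: String) {
        waiting.putIfAbsent(referenceId, LocalDateTime.now())
    }

    fun release(referenceId: String) {
        waiting.remove(referenceId)
    }

    // The listener drives this from @Scheduled(fixedDelay = 5_000).
    fun sweep(onTimeout: (String) -> Unit) {
        val now = LocalDateTime.now()
        waiting.entries.removeIf { (referenceId, parkedAt) ->
            val expired = parkedAt.plusSeconds(timeoutSeconds).isBefore(now)
            if (expired) onTimeout(referenceId) // emit the timeout/default event
            expired
        }
    }
}
```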
View File

@ -163,7 +163,7 @@ class EncodeServiceV2(
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
log.info { "Encode failed for ${task.referenceId}\n$message" }
log.error { "Encode failed for ${task.referenceId}\n$message" }
tasks.onProduceEvent(EncodeWorkPerformedEvent(
metadata = EventMetadata(
referenceId = task.referenceId,

View File

@ -135,7 +135,7 @@ class ExtractServiceV2(
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
log.info { "Encode failed for ${task.referenceId}\n$message" }
log.error { "Extract failed for ${task.referenceId}\n$message" }
tasks.onProduceEvent(
ExtractWorkPerformedEvent(
metadata = EventMetadata(

View File

@ -11,6 +11,8 @@ from fuzzywuzzy import fuzz
import mysql.connector
from datetime import datetime
import mysql.connector.cursor
from algo.AdvancedMatcher import AdvancedMatcher
from algo.SimpleMatcher import SimpleMatcher
from algo.PrefixMatcher import PrefixMatcher
@ -21,15 +23,17 @@ from sources.anii import Anii
from sources.imdb import Imdb
from sources.mal import Mal
from mysql.connector.abstracts import MySQLConnectionAbstract
from mysql.connector.pooling import PooledMySQLConnection
from mysql.connector.types import RowType as MySqlRowType
# Configure the database connection
events_server_address = os.environ.get("DATABASE_ADDRESS") or "127.0.0.1"
events_server_address = os.environ.get("DATABASE_ADDRESS") or "192.168.2.250" # "127.0.0.1"
events_server_port = os.environ.get("DATABASE_PORT") or "3306"
events_server_database_name = os.environ.get("DATABASE_NAME_E") or "events"
events_server_database_name = os.environ.get("DATABASE_NAME_E") or "eventsV3" # "events"
events_server_username = os.environ.get("DATABASE_USERNAME") or "root"
events_server_password = os.environ.get("DATABASE_PASSWORD") or "root"
events_server_password = os.environ.get("DATABASE_PASSWORD") or "shFZ27eL2x2NoxyEDBMfDWkvFO" #"root"
@ -44,88 +48,143 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
class EventsPullerThread(threading.Thread):
connector = None
def __init__(self):
super().__init__()
self.shutdown = threading.Event()
def getEventsAvailable(self, connection: PooledMySQLConnection | MySQLConnectionAbstract) -> List[MySqlRowType]:
cursor = connection.cursor(dictionary=True)
cursor.execute("""
SELECT *
FROM events
WHERE referenceId IN (
SELECT referenceId
FROM events
GROUP BY referenceId
HAVING
SUM(event = 'event:media-read-base-info:performed') > 0
AND SUM(event = 'event:media-metadata-search:performed') = 0
AND SUM(event = 'event:media-process:completed') = 0
)
AND event = 'event:media-read-base-info:performed';
""")
rows = cursor.fetchall()
cursor.close()
return rows
def storeProducedEvent(self, connection: PooledMySQLConnection | MySQLConnectionAbstract, event: MediaEvent) -> bool:
try:
cursor = connection.cursor()
query = """
INSERT INTO events (referenceId, eventId, event, data)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(query, (
event.metadata.referenceId,
event.metadata.eventId,
"event:media-metadata-search:performed",
event_data_to_json(event)
))
connection.commit()
cursor.close()
return True
except mysql.connector.Error as err:
logger.error("Error inserting into database: %s", err)
return False
def run(self) -> None:
logger.info(f"Using {events_server_address}:{events_server_port} on table: {events_server_database_name}")
while not self.shutdown.is_set():
connection = None
cursor = None
try:
connection = mysql.connector.connect(
producedMessage: str = ""
connection = mysql.connector.connect(
host=events_server_address,
port=events_server_port,
database=events_server_database_name,
user=events_server_username,
password=events_server_password
)
cursor = connection.cursor(dictionary=True)
cursor.execute("""
SELECT *
FROM events
WHERE referenceId IN (
SELECT referenceId
FROM events
GROUP BY referenceId
HAVING
SUM(event = 'event:media-read-base-info:performed') > 0
AND SUM(event = 'event:media-metadata-search:performed') = 0
AND SUM(event = 'event:media-process:completed') = 0
)
AND event = 'event:media-read-base-info:performed';
""")
# not event:media-metadata-search:performed
for row in cursor.fetchall():
if self.shutdown.is_set():
break
logger.info("Event found!")
handler_thread = MessageHandlerThread(row)
handler_thread.start()
)
try:
rows = self.getEventsAvailable(connection=connection)
for row in rows:
if (row is not None):
try:
referenceId = row["referenceId"]
event = row["event"]
logMessage = f"""
============================================================================
Found message for: {referenceId} @ {event}
============================================================================"""
logger.info(logMessage)
event: MediaEvent = json_to_media_event(row["data"])
producedEvent = MetadataEventHandler(event).run()
producedMessage = f"""
============================================================================
Producing message for: {referenceId} @ {event}
{event_data_to_json(producedEvent)}
============================================================================"""
logger.info(producedMessage)
self.storeProducedEvent(connection=connection, event=producedEvent)
except Exception as e:
"""Produce failure here"""
logger.exception(e)
producedEvent = MediaEvent(
metadata = EventMetadata(
referenceId=row["referenceId"],
eventId=str(uuid.uuid4()),
derivedFromEventId=row["eventId"],
status= "Failed",
created= datetime.now().isoformat()
),
data=None,
eventType="EventMediaMetadataSearchPerformed"
)
self.storeProducedEvent(connection=connection, event=producedEvent)
except mysql.connector.Error as err:
logger.error("Database error: %s", err)
finally:
if cursor:
cursor.close()
if connection:
connection.close()
connection = None
# Introduce a small sleep to reduce CPU usage
time.sleep(5)
if (self.shutdown.is_set()):
logger.info("Shutdown is set..")
def stop(self):
self.shutdown.set()
global should_stop
should_stop = True
# Kafka message handler class
class MessageHandlerThread(threading.Thread):
mediaEvent: MediaEvent|None = None
def __init__(self, row):
class MetadataEventHandler():
mediaEvent: MediaEvent | None = None
def __init__(self, data: MediaEvent):
super().__init__()
jsonData = row['data']
logger.info(jsonData)
self.mediaEvent = json_to_media_event(jsonData)
self.mediaEvent = None
self.mediaEvent = data
logger.info(self.mediaEvent)
def run(self):
logger.info("Starting processing")
def run(self) -> MediaEvent | None:
logger.info("Starting search")
if (self.mediaEvent is None):
logger.error("Event does not contain anything...")
return
event: MediaEvent = self.mediaEvent
logger.info("Processing event: event=%s, value=%s", event.eventType, event)
searchableTitles: List[str] = event.data.searchTitles
searchableTitles.extend([
event.data.title,
@ -154,11 +213,7 @@ class MessageHandlerThread(threading.Thread):
data=result,
eventType="EventMediaMetadataSearchPerformed"
)
logger.info("<== Outgoing message: %s \n%s", event.eventType, event_data_to_json(producedEvent))
self.insert_into_database(producedEvent, "event:media-metadata-search:performed")
return producedEvent
def __getMetadata(self, titles: List[str]) -> Metadata | None:
@ -186,34 +241,6 @@ class MessageHandlerThread(threading.Thread):
return prefixSelector
return None
def insert_into_database(self, event: MediaEvent, eventKey: str):
try:
connection = mysql.connector.connect(
host=events_server_address,
port=events_server_port,
database=events_server_database_name,
user=events_server_username,
password=events_server_password
)
cursor = connection.cursor()
query = """
INSERT INTO events (referenceId, eventId, event, data)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(query, (
event.metadata.referenceId,
event.metadata.eventId,
eventKey,
event_data_to_json(event)
))
connection.commit()
cursor.close()
connection.close()
logger.info("Storing event")
except mysql.connector.Error as err:
logger.error("Error inserting into database: %s", err)
# Global variable indicating whether the application should shut down
should_stop = False

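On failure, the puller still stores an EventMediaMetadataSearchPerformed with status Failed, so coordinator listeners keyed on that event type always see a terminal event. A rough Kotlin rendition of that fallback shape; MediaEvent and EventMetadata below are simplified stand-ins, not the project's real types:

```kotlin
import java.util.UUID

data class EventMetadata(
    val referenceId: String,
    val eventId: String,
    val derivedFromEventId: String,
    val status: String
)

data class MediaEvent(val metadata: EventMetadata, val data: String?)

// A metadata search must yield exactly one stored event, Success or Failed,
// so nothing downstream waits forever on a search that threw.
fun searchOrFailed(referenceId: String, sourceEventId: String, search: () -> String): MediaEvent =
    try {
        MediaEvent(
            EventMetadata(referenceId, UUID.randomUUID().toString(), sourceEventId, "Success"),
            data = search()
        )
    } catch (e: Exception) {
        MediaEvent(
            EventMetadata(referenceId, UUID.randomUUID().toString(), sourceEventId, "Failed"),
            data = null
        )
    }
```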
View File

@ -31,6 +31,7 @@ class SourceBase(ABC):
partialMatch = fuzz.ratio(title, clean_foundTitle) if clean_foundTitle is not None else 0
if directMatch >= 60:
log.info(f"{source} -> Direct Match for '{title}' of '{foundTitle}' on part '{clean_foundTitle}' with direct score: {directMatch} and partial {partialMatch}")
return True
elif partialMatch >= 80:
log.info(f"{source} -> Partial Match for '{title}' of '{foundTitle}' on part '{clean_foundTitle}' with direct score: {directMatch} and partial {partialMatch}")
@ -40,13 +41,36 @@ class SourceBase(ABC):
return False
def getMatchingOnTitleWords(self, idToTitle: dict[str, str], titles: List[str]) -> dict[str, str]:
matched_idToTitle = {}
for title in titles:
title_words = set(title.split())
for id, stored_title in idToTitle.items():
stored_title_words = set(stored_title.split())
if title_words & stored_title_words: # check whether the titles share at least one word
score = fuzz.token_set_ratio(title, stored_title)
if score >= 75:
matched_idToTitle[id] = stored_title
# Fall back to the original candidates if no title shared a word and scored high enough
if not matched_idToTitle:
return idToTitle
return matched_idToTitle
def findBestMatchAcrossTitles(self, idToTitle: dict[str, str], titles: List[str]) -> Tuple[str, str]:
# Use the filtered (or original) idToTitle based on word matching
filtered_idToTitle = self.getMatchingOnTitleWords(idToTitle, titles)
best_match_id = ""
best_match_title = ""
best_ratio = 0
for title in titles:
for id, stored_title in idToTitle.items():
for id, stored_title in filtered_idToTitle.items():
ratio = fuzz.ratio(title, stored_title)
if ratio > best_ratio:
best_ratio = ratio

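getMatchingOnTitleWords narrows the candidate set to titles that share at least one word and score at least 75 before the best-match pass, falling back to all candidates when nothing qualifies. A sketch of that prefilter in Kotlin; a plain Levenshtein similarity stands in for fuzzywuzzy's token_set_ratio, and all names here are illustrative:

```kotlin
// Classic single-row Levenshtein edit distance.
fun levenshtein(a: String, b: String): Int {
    val dp = IntArray(b.length + 1) { it }
    for (i in 1..a.length) {
        var prev = dp[0]; dp[0] = i
        for (j in 1..b.length) {
            val tmp = dp[j]
            dp[j] = minOf(dp[j] + 1, dp[j - 1] + 1,
                prev + if (a[i - 1] == b[j - 1]) 0 else 1)
            prev = tmp
        }
    }
    return dp[b.length]
}

// 0..100 similarity, a crude stand-in for fuzz.token_set_ratio.
fun ratio(a: String, b: String): Int {
    val max = maxOf(a.length, b.length)
    return if (max == 0) 100 else (100 * (max - levenshtein(a, b))) / max
}

// Keep candidates sharing a word with any search title and scoring >= 75;
// if nothing qualifies, return every candidate so matching can still proceed.
fun prefilter(candidates: Map<String, String>, titles: List<String>): Map<String, String> {
    val matched = candidates.filter { (_, stored) ->
        titles.any { title ->
            title.split(" ").intersect(stored.split(" ").toSet()).isNotEmpty()
                    && ratio(title, stored) >= 75
        }
    }
    return matched.ifEmpty { candidates }
}
```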
View File

@ -38,6 +38,11 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
return false
}
open fun canProduceMultipleEvents(): Boolean {
return false
}
open fun haveProducedExpectedMessageBasedOnEvent(incomingEvent: T, events: List<T>): Boolean {
val eventsProducedByListener = events.filter { it.eventType == produceEvent }
val triggeredBy = events.filter { it.eventType in listensForEvents }
@ -62,6 +67,10 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
if (haveProducedExpectedMessageBasedOnEvent(incomingEvent, events))
return false
if (events.any { it.eventType == produceEvent } && !canProduceMultipleEvents()) {
return false
}
//val isDerived = events.any { it.metadata.derivedFromEventId == incomingEvent.metadata.eventId } // && incomingEvent.eventType == produceEvent
return true
}
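This guard is what the new canProduceMultipleEvents overrides in the work-task listeners hook into: by default a listener is skipped once its produced event already appears in the history, and opting in lifts that restriction. A condensed sketch with simplified types (strings instead of the real Event/Events classes):

```kotlin
// Condensed sketch of the production gate, with strings standing in for events.
abstract class SketchListener {
    abstract val produceEvent: String
    abstract val listensForEvents: List<String>
    open fun canProduceMultipleEvents(): Boolean = false

    fun shouldProcess(incoming: String, history: List<String>): Boolean {
        if (incoming !in listensForEvents) return false
        // Single-shot listeners bail out once their product is in the history.
        if (history.any { it == produceEvent } && !canProduceMultipleEvents()) return false
        return true
    }
}

// Work-task listeners such as ConvertWorkTaskListener opt in, so one incoming
// proceed-permit can yield a task per pending work item.
class MultiShot : SketchListener() {
    override val produceEvent = "work:created"
    override val listensForEvents = listOf("work:permitted")
    override fun canProduceMultipleEvents() = true
}
```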