diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt index 49c330f0..54803894 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt @@ -29,8 +29,9 @@ class EventBasedMessageListener { */ fun forwardEventMessageToListeners(newEvent: PersistentMessage, events: List) { val waitingListeners = waitingListeners(events) - val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners) - availableListeners.forEach { + //val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners) + //availableListeners.forEach { + waitingListeners.forEach { try { it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events) } catch (e: Exception) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index dab956fa..39b1f727 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -12,6 +12,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import org.springframework.stereotype.Service @Service @@ -32,8 +33,9 @@ class ParseVideoFileStreams() : TaskCreator() { override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} triggered by ${event.event}" } + val desiredEvent = events.find { it.data is ReaderPerformed } ?: return null - return parseStreams(event.data as ReaderPerformed) + return parseStreams(desiredEvent.data as ReaderPerformed) } fun parseStreams(data: ReaderPerformed): MessageDataWrapper { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index 24800d0c..128777c8 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -41,8 +41,8 @@ class ReadVideoFileStreams(): TaskCreator() { override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} triggered by ${event.event}" } - - return runBlocking { fileReadStreams(event.data as ProcessStarted) } + val desiredEvent = events.find { it.data is ProcessStarted } ?: return null + return runBlocking { fileReadStreams(desiredEvent.data as ProcessStarted) } } suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper { diff --git a/apps/pyMetadata/app.py b/apps/pyMetadata/app.py index ceb9ef58..763268b6 100644 --- a/apps/pyMetadata/app.py +++ b/apps/pyMetadata/app.py @@ -126,8 +126,11 @@ class MessageHandlerThread(threading.Thread): baseName = self.message.value["data"]["sanitizedName"] title = self.message.value['data']["title"] + logger.info("Searching for %s", title) result = self.get_metadata(title) if (result is None): + logger.info("No result for %s", title) + logger.info("Searching for %s", baseName) result = self.get_metadata(baseName) producerMessage = self.compose_message(referenceId=self.message.value["referenceId"], result=result) @@ -143,6 +146,8 @@ class MessageHandlerThread(threading.Thread): ) producer.send(kafka_topic, key="event:media-metadata-search:performed", value=result_json) producer.close() + else: + logger.info("Message status is not of 'COMPLETED', %s", self.message.value) def get_metadata(self, name: str) -> Optional[DataResult]: result = None