Changing replay
This commit is contained in:
parent
a365d16962
commit
af5e1613be
@ -29,8 +29,9 @@ class EventBasedMessageListener {
|
|||||||
*/
|
*/
|
||||||
fun forwardEventMessageToListeners(newEvent: PersistentMessage, events: List<PersistentMessage>) {
|
fun forwardEventMessageToListeners(newEvent: PersistentMessage, events: List<PersistentMessage>) {
|
||||||
val waitingListeners = waitingListeners(events)
|
val waitingListeners = waitingListeners(events)
|
||||||
val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners)
|
//val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners)
|
||||||
availableListeners.forEach {
|
//availableListeners.forEach {
|
||||||
|
waitingListeners.forEach {
|
||||||
try {
|
try {
|
||||||
it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events)
|
it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
|||||||
@ -12,6 +12,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
|||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed
|
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
|
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||||
|
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@ -32,8 +33,9 @@ class ParseVideoFileStreams() : TaskCreator() {
|
|||||||
|
|
||||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
|
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
|
||||||
|
val desiredEvent = events.find { it.data is ReaderPerformed } ?: return null
|
||||||
|
|
||||||
return parseStreams(event.data as ReaderPerformed)
|
return parseStreams(desiredEvent.data as ReaderPerformed)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun parseStreams(data: ReaderPerformed): MessageDataWrapper {
|
fun parseStreams(data: ReaderPerformed): MessageDataWrapper {
|
||||||
|
|||||||
@ -41,8 +41,8 @@ class ReadVideoFileStreams(): TaskCreator() {
|
|||||||
|
|
||||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
|
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
|
||||||
|
val desiredEvent = events.find { it.data is ProcessStarted } ?: return null
|
||||||
return runBlocking { fileReadStreams(event.data as ProcessStarted) }
|
return runBlocking { fileReadStreams(desiredEvent.data as ProcessStarted) }
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper {
|
suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper {
|
||||||
|
|||||||
@ -126,8 +126,11 @@ class MessageHandlerThread(threading.Thread):
|
|||||||
baseName = self.message.value["data"]["sanitizedName"]
|
baseName = self.message.value["data"]["sanitizedName"]
|
||||||
title = self.message.value['data']["title"]
|
title = self.message.value['data']["title"]
|
||||||
|
|
||||||
|
logger.info("Searching for %s", title)
|
||||||
result = self.get_metadata(title)
|
result = self.get_metadata(title)
|
||||||
if (result is None):
|
if (result is None):
|
||||||
|
logger.info("No result for %s", title)
|
||||||
|
logger.info("Searching for %s", baseName)
|
||||||
result = self.get_metadata(baseName)
|
result = self.get_metadata(baseName)
|
||||||
|
|
||||||
producerMessage = self.compose_message(referenceId=self.message.value["referenceId"], result=result)
|
producerMessage = self.compose_message(referenceId=self.message.value["referenceId"], result=result)
|
||||||
@ -143,6 +146,8 @@ class MessageHandlerThread(threading.Thread):
|
|||||||
)
|
)
|
||||||
producer.send(kafka_topic, key="event:media-metadata-search:performed", value=result_json)
|
producer.send(kafka_topic, key="event:media-metadata-search:performed", value=result_json)
|
||||||
producer.close()
|
producer.close()
|
||||||
|
else:
|
||||||
|
logger.info("Message status is not of 'COMPLETED', %s", self.message.value)
|
||||||
|
|
||||||
def get_metadata(self, name: str) -> Optional[DataResult]:
|
def get_metadata(self, name: str) -> Optional[DataResult]:
|
||||||
result = None
|
result = None
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user