From 816e268b2d66813ec248c747ce43a750f33a1fc6 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sat, 12 Apr 2025 01:03:53 +0200 Subject: [PATCH] Changed behaviour and verification for timeout listener --- .../MetadataWaitOrDefaultTaskListener.kt | 31 +++++++---- .../mediaprocessing/coordinator/TestData.kt | 35 ++++++++++++- .../MetadataWaitOrDefaultTaskListenerTest.kt | 52 +++++++++++++++++++ 3 files changed, 106 insertions(+), 12 deletions(-) create mode 100644 apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListenerTest.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index fdc907ed..c98eb72a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -47,24 +47,35 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { Events.ProcessCompleted ) - private val internalFilter = listOf( - Events.BaseInfoRead, - Events.MetadataSearchPerformed, - Events.ProcessCompleted - ) - val metadataTimeout = metadataTimeoutMinutes * 60 private val timeoutScope = CoroutineScope(Dispatchers.Default) val timeoutJobs = ConcurrentHashMap() - override fun shouldIHandleFailedEvents(incomingEvent: Event): Boolean { - return true - } override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { - if (!super.shouldIProcessAndHandleEvent(incomingEvent, events)) + if (!isOfEventsIListenFor(incomingEvent)) return false + + val childOf = events.filter { it.derivedFromEventId() == incomingEvent.eventId() } + val haveListenerProduced = childOf.any { it.eventType == produceEvent } + if (haveListenerProduced) + return false + + val metadataEvent = events.findEventOf() + val metadataSource = metadataEvent?.metadata?.source + + if (events.any { it.eventType == produceEvent } && !canProduceMultipleEvents() && metadataSource == getProducerName()) { + return false + } + + if (!havProducedDerivedEventOnIncomingEvent(incomingEvent, events) && canProduceMultipleEvents()) { + return true + } + + if (haveProducedExpectedMessageBasedOnEvent(incomingEvent, events)) + return false + return (events.any { it.eventType == Events.BaseInfoRead }) } diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/TestData.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/TestData.kt index 1074aa87..3a0eb00c 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/TestData.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/TestData.kt @@ -2,8 +2,8 @@ package no.iktdev.mediaprocessing.coordinator import no.iktdev.eventi.data.EventMetadata import no.iktdev.eventi.data.EventStatus -import no.iktdev.mediaprocessing.shared.common.contract.data.MediaProcessStartEvent -import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData +import no.iktdev.mediaprocessing.coordinator.tasksV2.listeners.MetadataWaitOrDefaultTaskListener +import no.iktdev.mediaprocessing.shared.common.contract.data.* import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import java.util.UUID @@ -19,6 +19,37 @@ fun defaultStartEvent(): MediaProcessStartEvent { ) } +fun defaultBaseInfoEvent(): BaseInfoEvent { + return BaseInfoEvent( + metadata = defaultMetadata(), + data = BaseInfo( + title = "Potetmos", + sanitizedName = "Potetmos mannen", + searchTitles = listOf("Potetmos mannen") + ) + ) +} + +fun metadataSearchTimedOutEvent(): MediaMetadataReceivedEvent { + return MediaMetadataReceivedEvent( + metadata = defaultMetadata() + .copy(status = EventStatus.Skipped) + .copy(source = MetadataWaitOrDefaultTaskListener::class.java.simpleName), + data = null + ) +} + +fun defaultMetadataSearchEvent(): MediaMetadataReceivedEvent { + return MediaMetadataReceivedEvent( + metadata = defaultMetadata(), + data = pyMetadata( + title = "Potetmos", + type = "movie", + ) + ) +} + + fun defaultMetadata(): EventMetadata { return EventMetadata( referenceId = defaultReferenceId, diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListenerTest.kt new file mode 100644 index 00000000..f6fdb268 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListenerTest.kt @@ -0,0 +1,52 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners + +import no.iktdev.mediaprocessing.coordinator.defaultBaseInfoEvent +import no.iktdev.mediaprocessing.coordinator.defaultMetadataSearchEvent +import no.iktdev.mediaprocessing.coordinator.defaultStartEvent +import no.iktdev.mediaprocessing.coordinator.metadataSearchTimedOutEvent +import no.iktdev.mediaprocessing.shared.common.contract.data.Event +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +class MetadataWaitOrDefaultTaskListenerTest { + + @Test + @DisplayName(""" + When incoming event is of base name, and there is no search performed, + Validation check should proceed + """) + fun validate_shouldIProcessAndHandleEvent1() { + val listener = MetadataWaitOrDefaultTaskListener() + val events = listOf(defaultStartEvent(), defaultBaseInfoEvent()) + val result = listener.shouldIProcessAndHandleEvent(incomingEvent = events.last(), events) + assertTrue(result) + } + + @Test + @DisplayName(""" + When incoming event is of MetadataReceivedEvent, + And timeout listener is the origin, + Then validation should abort + """) + fun validate_shouldIProcessAndHandleEvent2() { + val listener = MetadataWaitOrDefaultTaskListener() + val events = listOf(defaultStartEvent(), defaultBaseInfoEvent(), metadataSearchTimedOutEvent()) + val result = listener.shouldIProcessAndHandleEvent(incomingEvent = events.last(), events) + assertFalse(result) + } + + @Test + @DisplayName(""" + When incoming event is of MetadataReceivedEvent, + And metadata service has produced the event, + Then validation should allow, due to cleanup + """) + fun validate_shouldIProcessAndHandleEvent3() { + val listener = MetadataWaitOrDefaultTaskListener() + val events = listOf(defaultStartEvent(), defaultBaseInfoEvent(), defaultMetadataSearchEvent()) + val result = listener.shouldIProcessAndHandleEvent(incomingEvent = events.last(), events) + assertTrue(result) + } + +} \ No newline at end of file