Changed behaviour and verification for timeout listener
This commit is contained in:
parent
d1e96f5d2a
commit
816e268b2d
@ -47,24 +47,35 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
|
||||
Events.ProcessCompleted
|
||||
)
|
||||
|
||||
private val internalFilter = listOf(
|
||||
Events.BaseInfoRead,
|
||||
Events.MetadataSearchPerformed,
|
||||
Events.ProcessCompleted
|
||||
)
|
||||
|
||||
val metadataTimeout = metadataTimeoutMinutes * 60
|
||||
|
||||
private val timeoutScope = CoroutineScope(Dispatchers.Default)
|
||||
val timeoutJobs = ConcurrentHashMap<String, Job>()
|
||||
|
||||
override fun shouldIHandleFailedEvents(incomingEvent: Event): Boolean {
|
||||
return true
|
||||
}
|
||||
|
||||
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
|
||||
if (!super.shouldIProcessAndHandleEvent(incomingEvent, events))
|
||||
if (!isOfEventsIListenFor(incomingEvent))
|
||||
return false
|
||||
|
||||
val childOf = events.filter { it.derivedFromEventId() == incomingEvent.eventId() }
|
||||
val haveListenerProduced = childOf.any { it.eventType == produceEvent }
|
||||
if (haveListenerProduced)
|
||||
return false
|
||||
|
||||
val metadataEvent = events.findEventOf<MediaMetadataReceivedEvent>()
|
||||
val metadataSource = metadataEvent?.metadata?.source
|
||||
|
||||
if (events.any { it.eventType == produceEvent } && !canProduceMultipleEvents() && metadataSource == getProducerName()) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (!havProducedDerivedEventOnIncomingEvent(incomingEvent, events) && canProduceMultipleEvents()) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (haveProducedExpectedMessageBasedOnEvent(incomingEvent, events))
|
||||
return false
|
||||
|
||||
return (events.any { it.eventType == Events.BaseInfoRead })
|
||||
}
|
||||
|
||||
|
||||
@ -2,8 +2,8 @@ package no.iktdev.mediaprocessing.coordinator
|
||||
|
||||
import no.iktdev.eventi.data.EventMetadata
|
||||
import no.iktdev.eventi.data.EventStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.contract.data.MediaProcessStartEvent
|
||||
import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData
|
||||
import no.iktdev.mediaprocessing.coordinator.tasksV2.listeners.MetadataWaitOrDefaultTaskListener
|
||||
import no.iktdev.mediaprocessing.shared.common.contract.data.*
|
||||
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
|
||||
import java.util.UUID
|
||||
|
||||
@ -19,6 +19,37 @@ fun defaultStartEvent(): MediaProcessStartEvent {
|
||||
)
|
||||
}
|
||||
|
||||
fun defaultBaseInfoEvent(): BaseInfoEvent {
|
||||
return BaseInfoEvent(
|
||||
metadata = defaultMetadata(),
|
||||
data = BaseInfo(
|
||||
title = "Potetmos",
|
||||
sanitizedName = "Potetmos mannen",
|
||||
searchTitles = listOf("Potetmos mannen")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
fun metadataSearchTimedOutEvent(): MediaMetadataReceivedEvent {
|
||||
return MediaMetadataReceivedEvent(
|
||||
metadata = defaultMetadata()
|
||||
.copy(status = EventStatus.Skipped)
|
||||
.copy(source = MetadataWaitOrDefaultTaskListener::class.java.simpleName),
|
||||
data = null
|
||||
)
|
||||
}
|
||||
|
||||
fun defaultMetadataSearchEvent(): MediaMetadataReceivedEvent {
|
||||
return MediaMetadataReceivedEvent(
|
||||
metadata = defaultMetadata(),
|
||||
data = pyMetadata(
|
||||
title = "Potetmos",
|
||||
type = "movie",
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
fun defaultMetadata(): EventMetadata {
|
||||
return EventMetadata(
|
||||
referenceId = defaultReferenceId,
|
||||
|
||||
@ -0,0 +1,52 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
|
||||
|
||||
import no.iktdev.mediaprocessing.coordinator.defaultBaseInfoEvent
|
||||
import no.iktdev.mediaprocessing.coordinator.defaultMetadataSearchEvent
|
||||
import no.iktdev.mediaprocessing.coordinator.defaultStartEvent
|
||||
import no.iktdev.mediaprocessing.coordinator.metadataSearchTimedOutEvent
|
||||
import no.iktdev.mediaprocessing.shared.common.contract.data.Event
|
||||
import org.junit.jupiter.api.Assertions.*
|
||||
import org.junit.jupiter.api.DisplayName
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
class MetadataWaitOrDefaultTaskListenerTest {
|
||||
|
||||
@Test
|
||||
@DisplayName("""
|
||||
When incoming event is of base name, and there is no search performed,
|
||||
Validation check should proceed
|
||||
""")
|
||||
fun validate_shouldIProcessAndHandleEvent1() {
|
||||
val listener = MetadataWaitOrDefaultTaskListener()
|
||||
val events = listOf<Event>(defaultStartEvent(), defaultBaseInfoEvent())
|
||||
val result = listener.shouldIProcessAndHandleEvent(incomingEvent = events.last(), events)
|
||||
assertTrue(result)
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("""
|
||||
When incoming event is of MetadataReceivedEvent,
|
||||
And timeout listener is the origin,
|
||||
Then validation should abort
|
||||
""")
|
||||
fun validate_shouldIProcessAndHandleEvent2() {
|
||||
val listener = MetadataWaitOrDefaultTaskListener()
|
||||
val events = listOf<Event>(defaultStartEvent(), defaultBaseInfoEvent(), metadataSearchTimedOutEvent())
|
||||
val result = listener.shouldIProcessAndHandleEvent(incomingEvent = events.last(), events)
|
||||
assertFalse(result)
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("""
|
||||
When incoming event is of MetadataReceivedEvent,
|
||||
And metadata service has produced the event,
|
||||
Then validation should allow, due to cleanup
|
||||
""")
|
||||
fun validate_shouldIProcessAndHandleEvent3() {
|
||||
val listener = MetadataWaitOrDefaultTaskListener()
|
||||
val events = listOf<Event>(defaultStartEvent(), defaultBaseInfoEvent(), defaultMetadataSearchEvent())
|
||||
val result = listener.shouldIProcessAndHandleEvent(incomingEvent = events.last(), events)
|
||||
assertTrue(result)
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user