From b8562f48e77ee78e60c775cd8836f71ec6c8cb7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Mon, 2 Feb 2026 04:36:38 +0100 Subject: [PATCH] Collect fix 2 --- .../listeners/events/CollectEventsListener.kt | 31 ++-- .../no/iktdev/mediaprocessing/TestBase.kt | 16 +- .../events/CollectEventsListenerTest.kt | 155 ++++++++++++------ .../common/listeners/SummaryEventListener.kt | 26 +++ .../shared/database/InMemoryEventStore.kt | 37 +++++ 5 files changed, 200 insertions(+), 65 deletions(-) create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/listeners/SummaryEventListener.kt create mode 100644 shared/database/src/test/kotlin/no/iktdev/mediaprocessing/shared/database/InMemoryEventStore.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListener.kt index 528c7304..a92e4649 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListener.kt @@ -1,30 +1,33 @@ package no.iktdev.mediaprocessing.coordinator.listeners.events import mu.KotlinLogging -import no.iktdev.eventi.events.EventListener import no.iktdev.eventi.models.Event import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent +import no.iktdev.mediaprocessing.shared.common.listeners.SummaryEventListener import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection +import no.iktdev.mediaprocessing.shared.database.stores.EventStore import org.springframework.stereotype.Component @Component -class CollectEventsListener : EventListener() { +class CollectEventsListener(eventStore: no.iktdev.eventi.stores.EventStore = EventStore) : SummaryEventListener(eventStore) { + private val log = KotlinLogging.logger {} + override fun shouldSummarize(fullHistory: List): Boolean { + val projection = CollectProjection(fullHistory) + if (projection.startedWith == null) return false + if (!projection.isWorkflowComplete()) return false + return true + } - override fun onEvent(event: Event, history: List): Event? { - // Avoid double-collection - if (event is CollectedEvent || history.any { it is CollectedEvent }) return null - - val projection = CollectProjection(history) - - // Must have a StartProcessingEvent - if (projection.startedWith == null) return null - - + override fun produceSummary(fullHistory: List): Event { // Must have all relevant tasks completed - if (!projection.isWorkflowComplete()) return null + val eventIds = fullHistory.map { it.eventId }.toSet() - return CollectedEvent(history.map { it.eventId }.toSet()).derivedOf(event) + return CollectedEvent(eventIds).derivedOf(fullHistory.last()) + } + + override fun summaryAlreadyExists(fullHistory: List): Boolean { + return fullHistory.any { it is CollectedEvent } } } diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt index f2bd1671..ab86bc68 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt @@ -1,14 +1,17 @@ package no.iktdev.mediaprocessing import io.mockk.* +import no.iktdev.eventi.events.EventTypeRegistry import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task import no.iktdev.mediaprocessing.coordinator.* import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec +import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartData import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent +import no.iktdev.mediaprocessing.shared.database.InMemoryEventStore import no.iktdev.mediaprocessing.shared.database.stores.TaskStore import org.junit.jupiter.api.BeforeEach @@ -16,6 +19,9 @@ import java.io.File import java.util.* open class TestBase { + val eventStore = InMemoryEventStore() + + class DummyEvent: Event() class DummyTask: Task() @@ -35,14 +41,14 @@ open class TestBase { every { coordinatorEnv.incomingContent } returns File("./tmp/input") every { coordinatorEnv.cachedContent } returns File("./tmp/cached") every { coordinatorEnv.streamitAddress } returns "http://streamit.lan" + + EventRegistry.getEvents().let { + EventTypeRegistry.register(it) + } + eventStore.clear() } - fun mockkIO() { - mockkConstructor(File::class) - every { anyConstructed().exists() } returns true - } - fun defaultStartEvent(): StartProcessingEvent { val start = StartProcessingEvent( data = StartData( diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListenerTest.kt index dea6c69c..dd118004 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListenerTest.kt @@ -17,17 +17,17 @@ import org.junit.jupiter.api.Test class CollectEventsListenerTest : TestBase() { + private val listener = CollectEventsListener(eventStore) - private val listener = CollectEventsListener() @Test @DisplayName( """ - Hvis historikken har alle påkrevde hendelser og alle oppgaver er i en gyldig tisltand - Når onEvent kalles og projeksjonen tilsier gyldig status - Så: - Opprettes CollectEvent basert på historikken - """ + Hvis historikken har alle påkrevde hendelser og alle oppgaver er i en gyldig tilstand + Når onEvent kalles og projeksjonen tilsier gyldig status + Så: + Opprettes CollectEvent basert på historikken + """ ) fun success1() { val started = defaultStartEvent() @@ -54,24 +54,22 @@ class CollectEventsListenerTest : TestBase() { *convert.toTypedArray(), *cover.toTypedArray(), ) - + eventStore.setHistory(history) val result = listener.onEvent(history.last(), history) - assertThat(result).isNotNull() - assertThat { - result is CollectedEvent - } + assertThat(result).isInstanceOf(CollectedEvent::class.java) } + @Test @DisplayName( """ - Hvis vi har kun encoded hendelse, men vi har sagt at vi også skal ha extract, men ikke har opprettet extract - Når encode result kommer inn - Så: - Opprettes CollectEvent basert på historikken - """ + Hvis vi har kun encoded hendelse, men vi har sagt at vi også skal ha extract, men ikke har opprettet extract + Når encode result kommer inn + Så: + Opprettes ikke CollectEvent + """ ) fun success2() { val started = defaultStartEvent().let { ev -> @@ -101,6 +99,7 @@ class CollectEventsListenerTest : TestBase() { metadata, *encode.toTypedArray(), ) + eventStore.setHistory(history) val result = listener.onEvent(history.last(), history) @@ -108,14 +107,13 @@ class CollectEventsListenerTest : TestBase() { } - @Test @DisplayName( - """ - Hvis vi har kun convert hendelse - Når convert har kommet inn - Så: - Opprettes CollectEvent basert på historikken + """ + Hvis vi har kun convert hendelse + Når convert har kommet inn + Så: + Opprettes CollectEvent basert på historikken """ ) fun success3() { @@ -150,6 +148,7 @@ class CollectEventsListenerTest : TestBase() { *metadata.toTypedArray(), *convert.toTypedArray(), ) + eventStore.setHistory(history) val result = listener.onEvent(history.last(), history) @@ -161,12 +160,12 @@ class CollectEventsListenerTest : TestBase() { @Test @DisplayName( """ - Hvis vi har kun encoded og extracted hendelser, men vi har sagt at vi også skal konvertere - Når extract result kommer inn - Så: - Skal vi si pending på convert - Listener skal returnerere null - """ + Hvis vi har kun encoded og extracted hendelser, men vi har sagt at vi også skal konvertere + Når extract result kommer inn + Så: + Skal vi si pending på convert + Listener skal returnere null + """ ) fun failure1() { val started = defaultStartEvent() @@ -186,6 +185,8 @@ class CollectEventsListenerTest : TestBase() { *encode.toTypedArray(), *extract.toTypedArray(), ) + eventStore.setHistory(history) + val result = listener.onEvent(history.last(), history) assertThat(result).isNull() } @@ -193,11 +194,11 @@ class CollectEventsListenerTest : TestBase() { @Test @DisplayName( """ - Hvis historikken har alle påkrevde media hendelser, men venter på metadata - Når onEvent kalles og projeksjonen tilsier ugyldig tilstand - Så: - Returerer vi failure - """ + Hvis historikken har alle påkrevde media hendelser, men venter på metadata + Når onEvent kalles og projeksjonen tilsier ugyldig tilstand + Så: + Returnerer vi null + """ ) fun failure2() { val started = defaultStartEvent() @@ -221,21 +222,21 @@ class CollectEventsListenerTest : TestBase() { *extract.toTypedArray(), *convert.toTypedArray(), ) + eventStore.setHistory(history) val result = listener.onEvent(history.last(), history) assertThat(result).isNull() } - @Test @DisplayName( """ - Hvis historikken har alle påkrevde hendelser og encode feilet - Når onEvent kalles og projeksjonen tilsier ugyldig tilstand - Så: - Collect feiler - """ + Hvis historikken har alle påkrevde hendelser og encode feilet + Når onEvent kalles og projeksjonen tilsier ugyldig tilstand + Så: + Collect returnerer null + """ ) fun failure3() { val started = defaultStartEvent() @@ -262,19 +263,22 @@ class CollectEventsListenerTest : TestBase() { *convert.toTypedArray(), *cover.toTypedArray(), ) + eventStore.setHistory(history) val result = listener.onEvent(history.last(), history) assertThat(result).isNull() } + + @Test @DisplayName( """ - Hvis ingen oppgaver har blitt gjort - Når onEvent kalles - Så: - Skal projeksjonen gi ugyldig tilstand og returnere null - """ + Hvis ingen oppgaver har blitt gjort + Når onEvent kalles + Så: + Skal projeksjonen gi ugyldig tilstand og returnere null + """ ) fun failure4() { val started = defaultStartEvent().let { ev -> @@ -287,17 +291,76 @@ class CollectEventsListenerTest : TestBase() { mediaType = MediaType.Movie ).derivedOf(started) - - val history = listOf( started, parsed, ) + eventStore.setHistory(history) val result = listener.onEvent(history.last(), history) assertThat(result).isNull() } + @Test + @DisplayName( + """ + Summarizer skal være idempotent: + - Første kjøring skal produsere CollectedEvent + - Andre kjøring skal returnere null + - Re-feed skal ikke produsere flere CollectedEvent + """ + ) + fun summarizerDoesNotGoHaywire() { + val started = defaultStartEvent() + + val parsed = mediaParsedEvent( + collection = "MyCollection", + fileName = "MyCollection 1", + mediaType = MediaType.Movie + ).derivedOf(started) + + val metadata = metadataEvent(parsed) + val encode = encodeEvent("/tmp/video.mp4", parsed) + val extract = extractEvent("en", "/tmp/sub1.srt", encode.last()) + val convert = convertEvent( + language = "en", + baseName = "sub1", + outputFiles = listOf("/tmp/sub1.vtt"), + derivedFrom = extract.last() + ) + val cover = coverEvent("/tmp/cover.jpg", metadata.last()) + + val history = listOf( + started, + parsed, + *metadata.toTypedArray(), + *encode.toTypedArray(), + *extract.toTypedArray(), + *convert.toTypedArray(), + *cover.toTypedArray(), + ) + + // Gi summarizeren full historikk + eventStore.setHistory(history) + + // Første kjøring: skal produsere CollectedEvent + val first = listener.onEvent(history.last(), history) + assertThat(first).isInstanceOf(CollectedEvent::class.java) + + // Simuler at summarizer-eventet ble lagret i historikken + val collected = first as CollectedEvent + val newHistory = history + collected + eventStore.setHistory(newHistory) + + // Andre kjøring: summarizer skal se at CollectedEvent finnes → returnere null + val second = listener.onEvent(collected, newHistory) + assertThat(second).isNull() + + // Tredje kjøring: re-feed av siste event → fortsatt null + val third = listener.onEvent(collected, newHistory) + assertThat(third).isNull() + } + } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/listeners/SummaryEventListener.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/listeners/SummaryEventListener.kt new file mode 100644 index 00000000..ff896ecc --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/listeners/SummaryEventListener.kt @@ -0,0 +1,26 @@ +package no.iktdev.mediaprocessing.shared.common.listeners + +import no.iktdev.eventi.ZDS.toEvent +import no.iktdev.eventi.events.EventListener +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.stores.EventStore + +abstract class SummaryEventListener( + private val eventStore: EventStore +) : EventListener() { + + final override fun onEvent(event: Event, history: List): Event? { + val fullHistory = eventStore.getPersistedEventsFor(event.referenceId) + val events = fullHistory.map { it.toEvent() }.filterNotNull() + + if (!shouldSummarize(events)) return null + if (summaryAlreadyExists(events)) return null + + return produceSummary(events).derivedOf(event) + } + + abstract fun shouldSummarize(fullHistory: List): Boolean + abstract fun produceSummary(fullHistory: List): Event + abstract fun summaryAlreadyExists(fullHistory: List): Boolean + +} diff --git a/shared/database/src/test/kotlin/no/iktdev/mediaprocessing/shared/database/InMemoryEventStore.kt b/shared/database/src/test/kotlin/no/iktdev/mediaprocessing/shared/database/InMemoryEventStore.kt new file mode 100644 index 00000000..97e9f624 --- /dev/null +++ b/shared/database/src/test/kotlin/no/iktdev/mediaprocessing/shared/database/InMemoryEventStore.kt @@ -0,0 +1,37 @@ +package no.iktdev.mediaprocessing.shared.database + +import no.iktdev.eventi.MyTime +import no.iktdev.eventi.ZDS.toPersisted +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.store.PersistedEvent +import no.iktdev.eventi.stores.EventStore +import java.time.Instant +import java.util.* + +class InMemoryEventStore : EventStore { + private val persisted = mutableListOf() + private var nextId = 1L + + override fun getPersistedEventsAfter(timestamp: Instant): List = + persisted.filter { it.persistedAt > timestamp } + + override fun getPersistedEventsFor(referenceId: UUID): List = + persisted.filter { it.referenceId == referenceId } + + override fun persist(event: Event) { + val persistedEvent = event.toPersisted(nextId++, MyTime.utcNow()) + persisted += persistedEvent!! + } + + fun persistAt(event: Event, persistedAt: Instant) { + val persistedEvent = event.toPersisted(nextId++, persistedAt) + persisted += persistedEvent!! + } + + fun setHistory(events: List) { + events.forEach { persist(it) } + } + + fun all(): List = persisted + fun clear() { persisted.clear(); nextId = 1L } +}