Collect fix 2

This commit is contained in:
Brage Skjønborg 2026-02-02 04:36:38 +01:00
parent 1116f80066
commit b8562f48e7
5 changed files with 200 additions and 65 deletions

View File

@ -1,30 +1,33 @@
package no.iktdev.mediaprocessing.coordinator.listeners.events package no.iktdev.mediaprocessing.coordinator.listeners.events
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent
import no.iktdev.mediaprocessing.shared.common.listeners.SummaryEventListener
import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
import no.iktdev.mediaprocessing.shared.database.stores.EventStore
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
@Component @Component
class CollectEventsListener : EventListener() { class CollectEventsListener(eventStore: no.iktdev.eventi.stores.EventStore = EventStore) : SummaryEventListener(eventStore) {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
override fun shouldSummarize(fullHistory: List<Event>): Boolean {
val projection = CollectProjection(fullHistory)
if (projection.startedWith == null) return false
if (!projection.isWorkflowComplete()) return false
return true
}
override fun onEvent(event: Event, history: List<Event>): Event? { override fun produceSummary(fullHistory: List<Event>): Event {
// Avoid double-collection
if (event is CollectedEvent || history.any { it is CollectedEvent }) return null
val projection = CollectProjection(history)
// Must have a StartProcessingEvent
if (projection.startedWith == null) return null
// Must have all relevant tasks completed // Must have all relevant tasks completed
if (!projection.isWorkflowComplete()) return null val eventIds = fullHistory.map { it.eventId }.toSet()
return CollectedEvent(history.map { it.eventId }.toSet()).derivedOf(event) return CollectedEvent(eventIds).derivedOf(fullHistory.last())
}
override fun summaryAlreadyExists(fullHistory: List<Event>): Boolean {
return fullHistory.any { it is CollectedEvent }
} }
} }

View File

@ -1,14 +1,17 @@
package no.iktdev.mediaprocessing package no.iktdev.mediaprocessing
import io.mockk.* import io.mockk.*
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.mediaprocessing.coordinator.* import no.iktdev.mediaprocessing.coordinator.*
import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec
import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec
import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartData import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartData
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent
import no.iktdev.mediaprocessing.shared.database.InMemoryEventStore
import no.iktdev.mediaprocessing.shared.database.stores.TaskStore import no.iktdev.mediaprocessing.shared.database.stores.TaskStore
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
@ -16,6 +19,9 @@ import java.io.File
import java.util.* import java.util.*
open class TestBase { open class TestBase {
val eventStore = InMemoryEventStore()
class DummyEvent: Event() class DummyEvent: Event()
class DummyTask: Task() class DummyTask: Task()
@ -35,14 +41,14 @@ open class TestBase {
every { coordinatorEnv.incomingContent } returns File("./tmp/input") every { coordinatorEnv.incomingContent } returns File("./tmp/input")
every { coordinatorEnv.cachedContent } returns File("./tmp/cached") every { coordinatorEnv.cachedContent } returns File("./tmp/cached")
every { coordinatorEnv.streamitAddress } returns "http://streamit.lan" every { coordinatorEnv.streamitAddress } returns "http://streamit.lan"
EventRegistry.getEvents().let {
EventTypeRegistry.register(it)
}
eventStore.clear()
} }
fun mockkIO() {
mockkConstructor(File::class)
every { anyConstructed<File>().exists() } returns true
}
fun defaultStartEvent(): StartProcessingEvent { fun defaultStartEvent(): StartProcessingEvent {
val start = StartProcessingEvent( val start = StartProcessingEvent(
data = StartData( data = StartData(

View File

@ -17,17 +17,17 @@ import org.junit.jupiter.api.Test
class CollectEventsListenerTest : TestBase() { class CollectEventsListenerTest : TestBase() {
private val listener = CollectEventsListener(eventStore)
private val listener = CollectEventsListener()
@Test @Test
@DisplayName( @DisplayName(
""" """
Hvis historikken har alle påkrevde hendelser og alle oppgaver er i en gyldig tisltand Hvis historikken har alle påkrevde hendelser og alle oppgaver er i en gyldig tilstand
Når onEvent kalles og projeksjonen tilsier gyldig status Når onEvent kalles og projeksjonen tilsier gyldig status
: :
Opprettes CollectEvent basert historikken Opprettes CollectEvent basert historikken
""" """
) )
fun success1() { fun success1() {
val started = defaultStartEvent() val started = defaultStartEvent()
@ -54,24 +54,22 @@ class CollectEventsListenerTest : TestBase() {
*convert.toTypedArray(), *convert.toTypedArray(),
*cover.toTypedArray(), *cover.toTypedArray(),
) )
eventStore.setHistory(history)
val result = listener.onEvent(history.last(), history) val result = listener.onEvent(history.last(), history)
assertThat(result).isNotNull() assertThat(result).isInstanceOf(CollectedEvent::class.java)
assertThat {
result is CollectedEvent
}
} }
@Test @Test
@DisplayName( @DisplayName(
""" """
Hvis vi har kun encoded hendelse, men vi har sagt at vi også skal ha extract, men ikke har opprettet extract Hvis vi har kun encoded hendelse, men vi har sagt at vi også skal ha extract, men ikke har opprettet extract
Når encode result kommer inn Når encode result kommer inn
: :
Opprettes CollectEvent basert historikken Opprettes ikke CollectEvent
""" """
) )
fun success2() { fun success2() {
val started = defaultStartEvent().let { ev -> val started = defaultStartEvent().let { ev ->
@ -101,6 +99,7 @@ class CollectEventsListenerTest : TestBase() {
metadata, metadata,
*encode.toTypedArray(), *encode.toTypedArray(),
) )
eventStore.setHistory(history)
val result = listener.onEvent(history.last(), history) val result = listener.onEvent(history.last(), history)
@ -108,14 +107,13 @@ class CollectEventsListenerTest : TestBase() {
} }
@Test @Test
@DisplayName( @DisplayName(
""" """
Hvis vi har kun convert hendelse Hvis vi har kun convert hendelse
Når convert har kommet inn Når convert har kommet inn
: :
Opprettes CollectEvent basert historikken Opprettes CollectEvent basert historikken
""" """
) )
fun success3() { fun success3() {
@ -150,6 +148,7 @@ class CollectEventsListenerTest : TestBase() {
*metadata.toTypedArray(), *metadata.toTypedArray(),
*convert.toTypedArray(), *convert.toTypedArray(),
) )
eventStore.setHistory(history)
val result = listener.onEvent(history.last(), history) val result = listener.onEvent(history.last(), history)
@ -161,12 +160,12 @@ class CollectEventsListenerTest : TestBase() {
@Test @Test
@DisplayName( @DisplayName(
""" """
Hvis vi har kun encoded og extracted hendelser, men vi har sagt at vi også skal konvertere Hvis vi har kun encoded og extracted hendelser, men vi har sagt at vi også skal konvertere
Når extract result kommer inn Når extract result kommer inn
: :
Skal vi si pending convert Skal vi si pending convert
Listener skal returnerere null Listener skal returnere null
""" """
) )
fun failure1() { fun failure1() {
val started = defaultStartEvent() val started = defaultStartEvent()
@ -186,6 +185,8 @@ class CollectEventsListenerTest : TestBase() {
*encode.toTypedArray(), *encode.toTypedArray(),
*extract.toTypedArray(), *extract.toTypedArray(),
) )
eventStore.setHistory(history)
val result = listener.onEvent(history.last(), history) val result = listener.onEvent(history.last(), history)
assertThat(result).isNull() assertThat(result).isNull()
} }
@ -193,11 +194,11 @@ class CollectEventsListenerTest : TestBase() {
@Test @Test
@DisplayName( @DisplayName(
""" """
Hvis historikken har alle påkrevde media hendelser, men venter metadata Hvis historikken har alle påkrevde media hendelser, men venter metadata
Når onEvent kalles og projeksjonen tilsier ugyldig tilstand Når onEvent kalles og projeksjonen tilsier ugyldig tilstand
: :
Returerer vi failure Returnerer vi null
""" """
) )
fun failure2() { fun failure2() {
val started = defaultStartEvent() val started = defaultStartEvent()
@ -221,21 +222,21 @@ class CollectEventsListenerTest : TestBase() {
*extract.toTypedArray(), *extract.toTypedArray(),
*convert.toTypedArray(), *convert.toTypedArray(),
) )
eventStore.setHistory(history)
val result = listener.onEvent(history.last(), history) val result = listener.onEvent(history.last(), history)
assertThat(result).isNull() assertThat(result).isNull()
} }
@Test @Test
@DisplayName( @DisplayName(
""" """
Hvis historikken har alle påkrevde hendelser og encode feilet Hvis historikken har alle påkrevde hendelser og encode feilet
Når onEvent kalles og projeksjonen tilsier ugyldig tilstand Når onEvent kalles og projeksjonen tilsier ugyldig tilstand
: :
Collect feiler Collect returnerer null
""" """
) )
fun failure3() { fun failure3() {
val started = defaultStartEvent() val started = defaultStartEvent()
@ -262,19 +263,22 @@ class CollectEventsListenerTest : TestBase() {
*convert.toTypedArray(), *convert.toTypedArray(),
*cover.toTypedArray(), *cover.toTypedArray(),
) )
eventStore.setHistory(history)
val result = listener.onEvent(history.last(), history) val result = listener.onEvent(history.last(), history)
assertThat(result).isNull() assertThat(result).isNull()
} }
@Test @Test
@DisplayName( @DisplayName(
""" """
Hvis ingen oppgaver har blitt gjort Hvis ingen oppgaver har blitt gjort
Når onEvent kalles Når onEvent kalles
: :
Skal projeksjonen gi ugyldig tilstand og returnere null Skal projeksjonen gi ugyldig tilstand og returnere null
""" """
) )
fun failure4() { fun failure4() {
val started = defaultStartEvent().let { ev -> val started = defaultStartEvent().let { ev ->
@ -287,17 +291,76 @@ class CollectEventsListenerTest : TestBase() {
mediaType = MediaType.Movie mediaType = MediaType.Movie
).derivedOf(started) ).derivedOf(started)
val history = listOf( val history = listOf(
started, started,
parsed, parsed,
) )
eventStore.setHistory(history)
val result = listener.onEvent(history.last(), history) val result = listener.onEvent(history.last(), history)
assertThat(result).isNull() assertThat(result).isNull()
} }
@Test
@DisplayName(
"""
Summarizer skal være idempotent:
- Første kjøring skal produsere CollectedEvent
- Andre kjøring skal returnere null
- Re-feed skal ikke produsere flere CollectedEvent
"""
)
fun summarizerDoesNotGoHaywire() {
val started = defaultStartEvent()
val parsed = mediaParsedEvent(
collection = "MyCollection",
fileName = "MyCollection 1",
mediaType = MediaType.Movie
).derivedOf(started)
val metadata = metadataEvent(parsed)
val encode = encodeEvent("/tmp/video.mp4", parsed)
val extract = extractEvent("en", "/tmp/sub1.srt", encode.last())
val convert = convertEvent(
language = "en",
baseName = "sub1",
outputFiles = listOf("/tmp/sub1.vtt"),
derivedFrom = extract.last()
)
val cover = coverEvent("/tmp/cover.jpg", metadata.last())
val history = listOf(
started,
parsed,
*metadata.toTypedArray(),
*encode.toTypedArray(),
*extract.toTypedArray(),
*convert.toTypedArray(),
*cover.toTypedArray(),
)
// Gi summarizeren full historikk
eventStore.setHistory(history)
// Første kjøring: skal produsere CollectedEvent
val first = listener.onEvent(history.last(), history)
assertThat(first).isInstanceOf(CollectedEvent::class.java)
// Simuler at summarizer-eventet ble lagret i historikken
val collected = first as CollectedEvent
val newHistory = history + collected
eventStore.setHistory(newHistory)
// Andre kjøring: summarizer skal se at CollectedEvent finnes → returnere null
val second = listener.onEvent(collected, newHistory)
assertThat(second).isNull()
// Tredje kjøring: re-feed av siste event → fortsatt null
val third = listener.onEvent(collected, newHistory)
assertThat(third).isNull()
}
} }

View File

@ -0,0 +1,26 @@
package no.iktdev.mediaprocessing.shared.common.listeners
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.stores.EventStore
abstract class SummaryEventListener(
private val eventStore: EventStore
) : EventListener() {
final override fun onEvent(event: Event, history: List<Event>): Event? {
val fullHistory = eventStore.getPersistedEventsFor(event.referenceId)
val events = fullHistory.map { it.toEvent() }.filterNotNull()
if (!shouldSummarize(events)) return null
if (summaryAlreadyExists(events)) return null
return produceSummary(events).derivedOf(event)
}
abstract fun shouldSummarize(fullHistory: List<Event>): Boolean
abstract fun produceSummary(fullHistory: List<Event>): Event
abstract fun summaryAlreadyExists(fullHistory: List<Event>): Boolean
}

View File

@ -0,0 +1,37 @@
package no.iktdev.mediaprocessing.shared.database
import no.iktdev.eventi.MyTime
import no.iktdev.eventi.ZDS.toPersisted
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.eventi.stores.EventStore
import java.time.Instant
import java.util.*
class InMemoryEventStore : EventStore {
private val persisted = mutableListOf<PersistedEvent>()
private var nextId = 1L
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> =
persisted.filter { it.persistedAt > timestamp }
override fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent> =
persisted.filter { it.referenceId == referenceId }
override fun persist(event: Event) {
val persistedEvent = event.toPersisted(nextId++, MyTime.utcNow())
persisted += persistedEvent!!
}
fun persistAt(event: Event, persistedAt: Instant) {
val persistedEvent = event.toPersisted(nextId++, persistedAt)
persisted += persistedEvent!!
}
fun setHistory(events: List<Event>) {
events.forEach { persist(it) }
}
fun all(): List<PersistedEvent> = persisted
fun clear() { persisted.clear(); nextId = 1L }
}