diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index 9c01693a..d5a7313c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -39,17 +39,6 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task val receivedEvents = events.map { it.event } // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event. - val requiresOneOf = listOf( - EventWorkConvertPerformed, - EventWorkExtractPerformed, - EventWorkEncodePerformed - ) - - if (requiresOneOf.none { it in receivedEvents }) { - val missing = requiresOneOf.subtract(receivedEvents.toSet()) - log.info { "Can't complete at this moment. Missing required event(s)\n\t" + missing.joinToString("\n\t") } - return null - } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt index 9caa7c25..5b609c6f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt @@ -39,6 +39,8 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + log.info { "${event.referenceId} triggered by ${event.event}" } + val cover = events.find { it.event == KafkaEvents.EventMediaReadOutCover } if (cover == null || cover.data !is CoverInfoPerformed) { return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId", event.eventId) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt index 9d5a4f9c..b2c90676 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt @@ -13,6 +13,7 @@ object SharedConfig { val uiUrl: String = System.getenv("APP_URL_UI") ?: "http://ui:8080" val preference: File = File("/data/config/preference.json") + val verbose: Boolean = System.getenv("VERBOSE")?.let { it.toBoolean() } ?: false } object DatabaseEnvConfig { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt index 1e9b2acb..9aac7ef9 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -34,18 +34,31 @@ class PersistentEventManager(private val dataSource: DataSource) { } + private val duplicatable = listOf( + KafkaEvents.EventWorkConvertCreated, + KafkaEvents.EventWorkExtractCreated, + KafkaEvents.EventWorkConvertPerformed, + KafkaEvents.EventWorkExtractPerformed + ) + /** * @param referenceId Reference * @param eventId Current eventId for the message, required to prevent deletion of itself * @param event Current event for the message */ - private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents) { + private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents, derivedFromId: String?) { val present = getEventsWith(referenceId).filter { it.eventId != eventId } - val superseded = present.filter { it.event == event && it.eventId != eventId } val availableForRemoval = mutableListOf() val helper = PersistentMessageHelper(present) - superseded.forEach { availableForRemoval.addAll(helper.getCascadingFrom(it.eventId)) } + + val superseded = present.filter { it.event == event && it.eventId != eventId } + + val notSuperseded = if (derivedFromId != null && event in duplicatable) { + present.filter { it.event == event && (it.data.derivedFromEventId == derivedFromId) } + } else emptyList() + + superseded.filter { !notSuperseded.contains(it) }.forEach { availableForRemoval.addAll(helper.getCascadingFrom(it.eventId)) } deleteSupersededEvents(availableForRemoval) @@ -176,7 +189,7 @@ class PersistentEventManager(private val dataSource: DataSource) { true } if (success) { - deleteSupersededEvents(referenceId = message.referenceId, eventId = message.eventId, event = event) + deleteSupersededEvents(referenceId = message.referenceId, eventId = message.eventId, event = event, derivedFromId = message.data?.derivedFromEventId) } return success } diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt index 5220eebd..a98498bf 100644 --- a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt @@ -230,6 +230,115 @@ class PersistentEventMangerTest { } } + @Test + fun testSupersededButKeepWork() { + val startEventPayload = createMessage() + val keepStack = listOf( + EventToMessage(KafkaEvents.EventMediaProcessStarted, startEventPayload), + EventToMessage(KafkaEvents.EventMediaReadStreamPerformed, + createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEventPayload.eventId)), + EventToMessage(KafkaEvents.EventMediaParseStreamPerformed, + createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")), + EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed, + createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")), + EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed, + createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")), + EventToMessage(KafkaEvents.EventMediaReadOutCover, + createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated, + createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventMediaParameterExtractCreated, + createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "ad93a41a-db08-436b-84e4-55adb4752f38", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val newEvents = listOf( + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "cfeee961-69c1-4eed-8ec5-82ebca01c9e1", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "64625872-bbfe-4604-85cd-02f58e904267", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "0ab96b32-45a5-4517-b0c0-c03d48145340", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "cabd9038-307f-48e4-ac99-88232b1a817c", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "10c0fd42-b5be-42b2-a27b-12ecccc51635", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "b69fb306-e390-4a9e-8d11-89d0688dff16", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val result = eventManager.getEventsWith(defaultReferenceId) + + val expected = (keepStack + newEvents).map { it.message.eventId } + val missing = expected - result.map { it.eventId } + assertThat(missing).isEmpty() + assertThat(expected.size).isEqualTo(result.size) + + withTransaction(dataSource) { + events.deleteAll() + } + } + + @Test + fun testSupersededWork() { + val startEventPayload = createMessage() + val keepStack = listOf( + EventToMessage(KafkaEvents.EventMediaProcessStarted, startEventPayload), + EventToMessage(KafkaEvents.EventMediaReadStreamPerformed, + createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEventPayload.eventId)), + EventToMessage(KafkaEvents.EventMediaParseStreamPerformed, + createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")), + EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed, + createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")), + EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed, + createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")), + EventToMessage(KafkaEvents.EventMediaReadOutCover, + createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated, + createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val newEvents = listOf( + EventToMessage(KafkaEvents.EventMediaParameterExtractCreated, + createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "ad93a41a-db08-436b-84e4-55adb4752f38", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "cfeee961-69c1-4eed-8ec5-82ebca01c9e1", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "64625872-bbfe-4604-85cd-02f58e904267", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "0ab96b32-45a5-4517-b0c0-c03d48145340", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "cabd9038-307f-48e4-ac99-88232b1a817c", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "10c0fd42-b5be-42b2-a27b-12ecccc51635", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val replacedWith = listOf( + EventToMessage(KafkaEvents.EventMediaParameterExtractCreated, + createMessage(eventId = "e40b2096-2e6f-4672-9c5a-6c81fe8fc302", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "b69fb306-e390-4a9e-8d11-89d0688dff16", derivedFromEventId = "e40b2096-2e6f-4672-9c5a-6c81fe8fc302")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val result = eventManager.getEventsWith(defaultReferenceId) + + val expected = (keepStack + replacedWith).map { it.message.eventId } + val missing = expected - result.map { it.eventId } + assertThat(missing).isEmpty() + assertThat(expected.size).isEqualTo(result.size) + + withTransaction(dataSource) { + events.deleteAll() + } + } + @Test fun testDerivedOrphanNotInserted() { val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also { diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index 2ef6fa6f..ac4d74c5 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -38,7 +38,9 @@ enum class KafkaEvents(val event: String) { EventMediaProcessCompleted("event:media-process:completed"), EventRequestProcessCompleted("event:request-process:completed"), - EventCollectAndStore("event::save"); + EventCollectAndStore("event::save"), + + ; companion object { fun toEvent(event: String): KafkaEvents? {