From bc7f557a3e79dc150febc6b2191fc338fab06011 Mon Sep 17 00:00:00 2001 From: bskjon Date: Fri, 19 Apr 2024 20:41:51 +0200 Subject: [PATCH] Update --- .../persistance/PersistentEventManager.kt | 22 +-- .../common/persistance/PersistentMessage.kt | 4 +- .../common/tests/PersistentEventMangerTest.kt | 130 ++++++++++++++++++ 3 files changed, 145 insertions(+), 11 deletions(-) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt index 9aac7ef9..0524721b 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -34,33 +34,37 @@ class PersistentEventManager(private val dataSource: DataSource) { } - private val duplicatable = listOf( + private val exemptedFromSingleEvent = listOf( KafkaEvents.EventWorkConvertCreated, KafkaEvents.EventWorkExtractCreated, KafkaEvents.EventWorkConvertPerformed, KafkaEvents.EventWorkExtractPerformed ) + private fun isExempted(event: KafkaEvents): Boolean { + return event in exemptedFromSingleEvent + } + + /** * @param referenceId Reference * @param eventId Current eventId for the message, required to prevent deletion of itself * @param event Current event for the message */ private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents, derivedFromId: String?) { - val present = getEventsWith(referenceId).filter { it.eventId != eventId } + val forRemoval = mutableListOf() - val availableForRemoval = mutableListOf() + val present = getEventsWith(referenceId).filter { it.data.derivedFromEventId != null } val helper = PersistentMessageHelper(present) - val superseded = present.filter { it.event == event && it.eventId != eventId } + val replaced = if (!isExempted(event)) present.find { it.eventId != eventId && it.event == event } else null + val orphaned = replaced?.let { helper.getEventsRelatedTo(it.eventId) } ?: emptyList() - val notSuperseded = if (derivedFromId != null && event in duplicatable) { - present.filter { it.event == event && (it.data.derivedFromEventId == derivedFromId) } - } else emptyList() + forRemoval.addAll(orphaned) - superseded.filter { !notSuperseded.contains(it) }.forEach { availableForRemoval.addAll(helper.getCascadingFrom(it.eventId)) } + //superseded.filter { !notSuperseded.contains(it) }.forEach { availableForRemoval.addAll(helper.getEventsRelatedTo(it.eventId)) } - deleteSupersededEvents(availableForRemoval) + deleteSupersededEvents(forRemoval) } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index 05f8e867..932d019d 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -46,7 +46,7 @@ class PersistentMessageHelper(val messages: List) { return withDerivedId.filter { it.data.derivedFromEventId !in idsFlat } } - fun getCascadingFrom(eventId: String): List { + fun getEventsRelatedTo(eventId: String): List { val triggered = messages.firstOrNull { it.eventId == eventId } ?: return emptyList() val usableEvents = messages.filter { it.eventId != eventId && it.data.derivedFromEventId != null } @@ -65,7 +65,7 @@ class PersistentMessageHelper(val messages: List) { /** * @param eventId Initial eventId */ - fun dfs(eventId: String, derivedEventsMap: Map>, eventsToDelete: MutableSet) { + private fun dfs(eventId: String, derivedEventsMap: Map>, eventsToDelete: MutableSet) { eventsToDelete.add(eventId) derivedEventsMap[eventId]?.forEach { derivedEventId -> dfs(derivedEventId, derivedEventsMap, eventsToDelete) diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt index a98498bf..2a425dd7 100644 --- a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt @@ -14,6 +14,7 @@ import org.junit.jupiter.api.Test import java.util.UUID import org.assertj.core.api.Assertions.assertThat import org.jetbrains.exposed.sql.deleteAll +import kotlin.math.sin class PersistentEventMangerTest { @@ -339,6 +340,135 @@ class PersistentEventMangerTest { } } + @Test + fun testConvertBatchFromExtract() { + val startEventPayload = createMessage() + val keepStack = listOf( + EventToMessage(KafkaEvents.EventMediaProcessStarted, startEventPayload), + EventToMessage(KafkaEvents.EventMediaReadStreamPerformed, + createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEventPayload.eventId)), + EventToMessage(KafkaEvents.EventMediaParseStreamPerformed, + createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")), + EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed, + createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")), + EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed, + createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")), + EventToMessage(KafkaEvents.EventMediaReadOutCover, + createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated, + createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventMediaParameterExtractCreated, + createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val convertEvents = mutableListOf(); + + val extractEvents = listOf( + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "ad93a41a-db08-436b-84e4-55adb4752f38", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "cfeee961-69c1-4eed-8ec5-82ebca01c9e1", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "64625872-bbfe-4604-85cd-02f58e904267", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "0ab96b32-45a5-4517-b0c0-c03d48145340", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "cabd9038-307f-48e4-ac99-88232b1a817c", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "10c0fd42-b5be-42b2-a27b-12ecccc51635", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "b69fb306-e390-4a9e-8d11-89d0688dff16", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + ).onEach { entry -> + run { + eventManager.setEvent(entry.event, entry.message) + convertEvents.add(EventToMessage(KafkaEvents.EventWorkConvertCreated, + createMessage(derivedFromEventId = entry.message.eventId))) + } + } + + val simpleCascade = eventManager.getEventsWith(defaultReferenceId) + assertThat(simpleCascade.size).isEqualTo(keepStack.size+extractEvents.size) + + assertThat(convertEvents.size).isEqualTo(extractEvents.size) + convertEvents.forEach { + eventManager.setEvent(it.event, it.message) + } + + val result = eventManager.getEventsWith(defaultReferenceId) + + assertThat(result.size).isEqualTo(keepStack.size+extractEvents.size+convertEvents.size) + + withTransaction(dataSource) { + events.deleteAll() + } + } + + + @Test + fun testSomeAreSingleSomeAreNot() { + val startEventPayload = createMessage() + val keepStack = listOf( + EventToMessage(KafkaEvents.EventMediaProcessStarted, startEventPayload), + EventToMessage(KafkaEvents.EventMediaReadStreamPerformed, + createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEventPayload.eventId)), + EventToMessage(KafkaEvents.EventMediaParseStreamPerformed, + createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")), + EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed, + createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")), + EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed, + createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")), + EventToMessage(KafkaEvents.EventMediaReadOutCover, + createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated, + createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventMediaParameterExtractCreated, + createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "ad93a41a-db08-436b-84e4-55adb4752f38", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val newEvents = listOf( + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "cfeee961-69c1-4eed-8ec5-82ebca01c9e1", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "64625872-bbfe-4604-85cd-02f58e904267", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "0ab96b32-45a5-4517-b0c0-c03d48145340", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventWorkExtractCreated, + createMessage(eventId = "cabd9038-307f-48e4-ac99-88232b1a817c", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")), + EventToMessage(KafkaEvents.EventMediaProcessCompleted, + createMessage(eventId = "10c0fd42-b5be-42b2-a27b-12ecccc51635", derivedFromEventId = "cabd9038-307f-48e4-ac99-88232b1a817c")), + EventToMessage(KafkaEvents.EventMediaProcessCompleted, + createMessage(eventId = "3519af2e-0767-4dbb-b0c5-f19cb926900d", derivedFromEventId = "cabd9038-307f-48e4-ac99-88232b1a817c")), + + EventToMessage(KafkaEvents.EventCollectAndStore, + createMessage(eventId = "b69fb306-e390-4a9e-8d11-89d0688dff16", derivedFromEventId = "3519af2e-0767-4dbb-b0c5-f19cb926900d")), + EventToMessage(KafkaEvents.EventCollectAndStore, + createMessage(eventId = "4e6d3a6a-ab89-4627-9158-3c3f92ff7b4c", derivedFromEventId = "3519af2e-0767-4dbb-b0c5-f19cb926900d")), + EventToMessage(KafkaEvents.EventCollectAndStore, + createMessage(eventId = "4e6d3a6a-ab89-4627-9158-3c3f92ff7b4c", derivedFromEventId = "3519af2e-0767-4dbb-b0c5-f19cb926900d")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val result = eventManager.getEventsWith(defaultReferenceId) + val singles = result.filter { it.event != KafkaEvents.EventWorkExtractCreated } + singles.forEach { + val instancesOfMe = singles.filter { sit -> it.event == sit.event } + assertThat(instancesOfMe).hasSize(1) + } + assertThat(result.filter { it.event == KafkaEvents.EventCollectAndStore }).hasSize(1) + + + + withTransaction(dataSource) { + events.deleteAll() + } + } + @Test fun testDerivedOrphanNotInserted() { val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also {