This commit is contained in:
bskjon 2024-04-19 20:41:51 +02:00
parent 5235e0838c
commit bc7f557a3e
3 changed files with 145 additions and 11 deletions

View File

@ -34,33 +34,37 @@ class PersistentEventManager(private val dataSource: DataSource) {
} }
private val duplicatable = listOf( private val exemptedFromSingleEvent = listOf(
KafkaEvents.EventWorkConvertCreated, KafkaEvents.EventWorkConvertCreated,
KafkaEvents.EventWorkExtractCreated, KafkaEvents.EventWorkExtractCreated,
KafkaEvents.EventWorkConvertPerformed, KafkaEvents.EventWorkConvertPerformed,
KafkaEvents.EventWorkExtractPerformed KafkaEvents.EventWorkExtractPerformed
) )
private fun isExempted(event: KafkaEvents): Boolean {
return event in exemptedFromSingleEvent
}
/** /**
* @param referenceId Reference * @param referenceId Reference
* @param eventId Current eventId for the message, required to prevent deletion of itself * @param eventId Current eventId for the message, required to prevent deletion of itself
* @param event Current event for the message * @param event Current event for the message
*/ */
private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents, derivedFromId: String?) { private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents, derivedFromId: String?) {
val present = getEventsWith(referenceId).filter { it.eventId != eventId } val forRemoval = mutableListOf<PersistentMessage>()
val availableForRemoval = mutableListOf<PersistentMessage>() val present = getEventsWith(referenceId).filter { it.data.derivedFromEventId != null }
val helper = PersistentMessageHelper(present) val helper = PersistentMessageHelper(present)
val superseded = present.filter { it.event == event && it.eventId != eventId } val replaced = if (!isExempted(event)) present.find { it.eventId != eventId && it.event == event } else null
val orphaned = replaced?.let { helper.getEventsRelatedTo(it.eventId) } ?: emptyList()
val notSuperseded = if (derivedFromId != null && event in duplicatable) { forRemoval.addAll(orphaned)
present.filter { it.event == event && (it.data.derivedFromEventId == derivedFromId) }
} else emptyList()
superseded.filter { !notSuperseded.contains(it) }.forEach { availableForRemoval.addAll(helper.getCascadingFrom(it.eventId)) } //superseded.filter { !notSuperseded.contains(it) }.forEach { availableForRemoval.addAll(helper.getEventsRelatedTo(it.eventId)) }
deleteSupersededEvents(availableForRemoval) deleteSupersededEvents(forRemoval)
} }

View File

@ -46,7 +46,7 @@ class PersistentMessageHelper(val messages: List<PersistentMessage>) {
return withDerivedId.filter { it.data.derivedFromEventId !in idsFlat } return withDerivedId.filter { it.data.derivedFromEventId !in idsFlat }
} }
fun getCascadingFrom(eventId: String): List<PersistentMessage> { fun getEventsRelatedTo(eventId: String): List<PersistentMessage> {
val triggered = messages.firstOrNull { it.eventId == eventId } ?: return emptyList() val triggered = messages.firstOrNull { it.eventId == eventId } ?: return emptyList()
val usableEvents = messages.filter { it.eventId != eventId && it.data.derivedFromEventId != null } val usableEvents = messages.filter { it.eventId != eventId && it.data.derivedFromEventId != null }
@ -65,7 +65,7 @@ class PersistentMessageHelper(val messages: List<PersistentMessage>) {
/** /**
* @param eventId Initial eventId * @param eventId Initial eventId
*/ */
fun dfs(eventId: String, derivedEventsMap: Map<String, List<String>>, eventsToDelete: MutableSet<String>) { private fun dfs(eventId: String, derivedEventsMap: Map<String, List<String>>, eventsToDelete: MutableSet<String>) {
eventsToDelete.add(eventId) eventsToDelete.add(eventId)
derivedEventsMap[eventId]?.forEach { derivedEventId -> derivedEventsMap[eventId]?.forEach { derivedEventId ->
dfs(derivedEventId, derivedEventsMap, eventsToDelete) dfs(derivedEventId, derivedEventsMap, eventsToDelete)

View File

@ -14,6 +14,7 @@ import org.junit.jupiter.api.Test
import java.util.UUID import java.util.UUID
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.deleteAll import org.jetbrains.exposed.sql.deleteAll
import kotlin.math.sin
class PersistentEventMangerTest { class PersistentEventMangerTest {
@ -339,6 +340,135 @@ class PersistentEventMangerTest {
} }
} }
@Test
fun testConvertBatchFromExtract() {
val startEventPayload = createMessage()
val keepStack = listOf(
EventToMessage(KafkaEvents.EventMediaProcessStarted, startEventPayload),
EventToMessage(KafkaEvents.EventMediaReadStreamPerformed,
createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEventPayload.eventId)),
EventToMessage(KafkaEvents.EventMediaParseStreamPerformed,
createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")),
EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed,
createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")),
EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed,
createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")),
EventToMessage(KafkaEvents.EventMediaReadOutCover,
createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated,
createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventMediaParameterExtractCreated,
createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val convertEvents = mutableListOf<PersistentEventMangerTest.EventToMessage>();
val extractEvents = listOf(
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "ad93a41a-db08-436b-84e4-55adb4752f38", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "cfeee961-69c1-4eed-8ec5-82ebca01c9e1", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "64625872-bbfe-4604-85cd-02f58e904267", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "0ab96b32-45a5-4517-b0c0-c03d48145340", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "cabd9038-307f-48e4-ac99-88232b1a817c", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "10c0fd42-b5be-42b2-a27b-12ecccc51635", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "b69fb306-e390-4a9e-8d11-89d0688dff16", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
).onEach { entry ->
run {
eventManager.setEvent(entry.event, entry.message)
convertEvents.add(EventToMessage(KafkaEvents.EventWorkConvertCreated,
createMessage(derivedFromEventId = entry.message.eventId)))
}
}
val simpleCascade = eventManager.getEventsWith(defaultReferenceId)
assertThat(simpleCascade.size).isEqualTo(keepStack.size+extractEvents.size)
assertThat(convertEvents.size).isEqualTo(extractEvents.size)
convertEvents.forEach {
eventManager.setEvent(it.event, it.message)
}
val result = eventManager.getEventsWith(defaultReferenceId)
assertThat(result.size).isEqualTo(keepStack.size+extractEvents.size+convertEvents.size)
withTransaction(dataSource) {
events.deleteAll()
}
}
@Test
fun testSomeAreSingleSomeAreNot() {
val startEventPayload = createMessage()
val keepStack = listOf(
EventToMessage(KafkaEvents.EventMediaProcessStarted, startEventPayload),
EventToMessage(KafkaEvents.EventMediaReadStreamPerformed,
createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEventPayload.eventId)),
EventToMessage(KafkaEvents.EventMediaParseStreamPerformed,
createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")),
EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed,
createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")),
EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed,
createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")),
EventToMessage(KafkaEvents.EventMediaReadOutCover,
createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated,
createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventMediaParameterExtractCreated,
createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "ad93a41a-db08-436b-84e4-55adb4752f38", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val newEvents = listOf(
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "cfeee961-69c1-4eed-8ec5-82ebca01c9e1", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "64625872-bbfe-4604-85cd-02f58e904267", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "0ab96b32-45a5-4517-b0c0-c03d48145340", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "cabd9038-307f-48e4-ac99-88232b1a817c", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventMediaProcessCompleted,
createMessage(eventId = "10c0fd42-b5be-42b2-a27b-12ecccc51635", derivedFromEventId = "cabd9038-307f-48e4-ac99-88232b1a817c")),
EventToMessage(KafkaEvents.EventMediaProcessCompleted,
createMessage(eventId = "3519af2e-0767-4dbb-b0c5-f19cb926900d", derivedFromEventId = "cabd9038-307f-48e4-ac99-88232b1a817c")),
EventToMessage(KafkaEvents.EventCollectAndStore,
createMessage(eventId = "b69fb306-e390-4a9e-8d11-89d0688dff16", derivedFromEventId = "3519af2e-0767-4dbb-b0c5-f19cb926900d")),
EventToMessage(KafkaEvents.EventCollectAndStore,
createMessage(eventId = "4e6d3a6a-ab89-4627-9158-3c3f92ff7b4c", derivedFromEventId = "3519af2e-0767-4dbb-b0c5-f19cb926900d")),
EventToMessage(KafkaEvents.EventCollectAndStore,
createMessage(eventId = "4e6d3a6a-ab89-4627-9158-3c3f92ff7b4c", derivedFromEventId = "3519af2e-0767-4dbb-b0c5-f19cb926900d")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val result = eventManager.getEventsWith(defaultReferenceId)
val singles = result.filter { it.event != KafkaEvents.EventWorkExtractCreated }
singles.forEach {
val instancesOfMe = singles.filter { sit -> it.event == sit.event }
assertThat(instancesOfMe).hasSize(1)
}
assertThat(result.filter { it.event == KafkaEvents.EventCollectAndStore }).hasSize(1)
withTransaction(dataSource) {
events.deleteAll()
}
}
@Test @Test
fun testDerivedOrphanNotInserted() { fun testDerivedOrphanNotInserted() {
val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also { val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also {