Fixes + ignore duplicate for multi events

This commit is contained in:
bskjon 2024-04-17 22:30:41 +02:00
parent 3b8ce8f86a
commit 607142cc75
6 changed files with 132 additions and 16 deletions

View File

@ -39,17 +39,6 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
val receivedEvents = events.map { it.event } val receivedEvents = events.map { it.event }
// TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event. // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event.
val requiresOneOf = listOf(
EventWorkConvertPerformed,
EventWorkExtractPerformed,
EventWorkEncodePerformed
)
if (requiresOneOf.none { it in receivedEvents }) {
val missing = requiresOneOf.subtract(receivedEvents.toSet())
log.info { "Can't complete at this moment. Missing required event(s)\n\t" + missing.joinToString("\n\t") }
return null
}

View File

@ -39,6 +39,8 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator
} }
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${event.referenceId} triggered by ${event.event}" }
val cover = events.find { it.event == KafkaEvents.EventMediaReadOutCover } val cover = events.find { it.event == KafkaEvents.EventMediaReadOutCover }
if (cover == null || cover.data !is CoverInfoPerformed) { if (cover == null || cover.data !is CoverInfoPerformed) {
return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId", event.eventId) return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId", event.eventId)

View File

@ -13,6 +13,7 @@ object SharedConfig {
val uiUrl: String = System.getenv("APP_URL_UI") ?: "http://ui:8080" val uiUrl: String = System.getenv("APP_URL_UI") ?: "http://ui:8080"
val preference: File = File("/data/config/preference.json") val preference: File = File("/data/config/preference.json")
val verbose: Boolean = System.getenv("VERBOSE")?.let { it.toBoolean() } ?: false
} }
object DatabaseEnvConfig { object DatabaseEnvConfig {

View File

@ -34,18 +34,31 @@ class PersistentEventManager(private val dataSource: DataSource) {
} }
private val duplicatable = listOf(
KafkaEvents.EventWorkConvertCreated,
KafkaEvents.EventWorkExtractCreated,
KafkaEvents.EventWorkConvertPerformed,
KafkaEvents.EventWorkExtractPerformed
)
/** /**
* @param referenceId Reference * @param referenceId Reference
* @param eventId Current eventId for the message, required to prevent deletion of itself * @param eventId Current eventId for the message, required to prevent deletion of itself
* @param event Current event for the message * @param event Current event for the message
*/ */
private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents) { private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents, derivedFromId: String?) {
val present = getEventsWith(referenceId).filter { it.eventId != eventId } val present = getEventsWith(referenceId).filter { it.eventId != eventId }
val superseded = present.filter { it.event == event && it.eventId != eventId }
val availableForRemoval = mutableListOf<PersistentMessage>() val availableForRemoval = mutableListOf<PersistentMessage>()
val helper = PersistentMessageHelper(present) val helper = PersistentMessageHelper(present)
superseded.forEach { availableForRemoval.addAll(helper.getCascadingFrom(it.eventId)) }
val superseded = present.filter { it.event == event && it.eventId != eventId }
val notSuperseded = if (derivedFromId != null && event in duplicatable) {
present.filter { it.event == event && (it.data.derivedFromEventId == derivedFromId) }
} else emptyList()
superseded.filter { !notSuperseded.contains(it) }.forEach { availableForRemoval.addAll(helper.getCascadingFrom(it.eventId)) }
deleteSupersededEvents(availableForRemoval) deleteSupersededEvents(availableForRemoval)
@ -176,7 +189,7 @@ class PersistentEventManager(private val dataSource: DataSource) {
true true
} }
if (success) { if (success) {
deleteSupersededEvents(referenceId = message.referenceId, eventId = message.eventId, event = event) deleteSupersededEvents(referenceId = message.referenceId, eventId = message.eventId, event = event, derivedFromId = message.data?.derivedFromEventId)
} }
return success return success
} }

View File

@ -230,6 +230,115 @@ class PersistentEventMangerTest {
} }
} }
@Test
fun testSupersededButKeepWork() {
val startEventPayload = createMessage()
val keepStack = listOf(
EventToMessage(KafkaEvents.EventMediaProcessStarted, startEventPayload),
EventToMessage(KafkaEvents.EventMediaReadStreamPerformed,
createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEventPayload.eventId)),
EventToMessage(KafkaEvents.EventMediaParseStreamPerformed,
createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")),
EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed,
createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")),
EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed,
createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")),
EventToMessage(KafkaEvents.EventMediaReadOutCover,
createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated,
createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventMediaParameterExtractCreated,
createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "ad93a41a-db08-436b-84e4-55adb4752f38", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val newEvents = listOf(
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "cfeee961-69c1-4eed-8ec5-82ebca01c9e1", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "64625872-bbfe-4604-85cd-02f58e904267", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "0ab96b32-45a5-4517-b0c0-c03d48145340", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "cabd9038-307f-48e4-ac99-88232b1a817c", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "10c0fd42-b5be-42b2-a27b-12ecccc51635", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "b69fb306-e390-4a9e-8d11-89d0688dff16", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val result = eventManager.getEventsWith(defaultReferenceId)
val expected = (keepStack + newEvents).map { it.message.eventId }
val missing = expected - result.map { it.eventId }
assertThat(missing).isEmpty()
assertThat(expected.size).isEqualTo(result.size)
withTransaction(dataSource) {
events.deleteAll()
}
}
@Test
fun testSupersededWork() {
val startEventPayload = createMessage()
val keepStack = listOf(
EventToMessage(KafkaEvents.EventMediaProcessStarted, startEventPayload),
EventToMessage(KafkaEvents.EventMediaReadStreamPerformed,
createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEventPayload.eventId)),
EventToMessage(KafkaEvents.EventMediaParseStreamPerformed,
createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")),
EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed,
createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")),
EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed,
createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")),
EventToMessage(KafkaEvents.EventMediaReadOutCover,
createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated,
createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val newEvents = listOf(
EventToMessage(KafkaEvents.EventMediaParameterExtractCreated,
createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "ad93a41a-db08-436b-84e4-55adb4752f38", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "cfeee961-69c1-4eed-8ec5-82ebca01c9e1", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "64625872-bbfe-4604-85cd-02f58e904267", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "0ab96b32-45a5-4517-b0c0-c03d48145340", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "cabd9038-307f-48e4-ac99-88232b1a817c", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "10c0fd42-b5be-42b2-a27b-12ecccc51635", derivedFromEventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val replacedWith = listOf(
EventToMessage(KafkaEvents.EventMediaParameterExtractCreated,
createMessage(eventId = "e40b2096-2e6f-4672-9c5a-6c81fe8fc302", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventWorkExtractCreated,
createMessage(eventId = "b69fb306-e390-4a9e-8d11-89d0688dff16", derivedFromEventId = "e40b2096-2e6f-4672-9c5a-6c81fe8fc302")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val result = eventManager.getEventsWith(defaultReferenceId)
val expected = (keepStack + replacedWith).map { it.message.eventId }
val missing = expected - result.map { it.eventId }
assertThat(missing).isEmpty()
assertThat(expected.size).isEqualTo(result.size)
withTransaction(dataSource) {
events.deleteAll()
}
}
@Test @Test
fun testDerivedOrphanNotInserted() { fun testDerivedOrphanNotInserted() {
val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also { val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also {

View File

@ -38,7 +38,9 @@ enum class KafkaEvents(val event: String) {
EventMediaProcessCompleted("event:media-process:completed"), EventMediaProcessCompleted("event:media-process:completed"),
EventRequestProcessCompleted("event:request-process:completed"), EventRequestProcessCompleted("event:request-process:completed"),
EventCollectAndStore("event::save"); EventCollectAndStore("event::save"),
;
companion object { companion object {
fun toEvent(event: String): KafkaEvents? { fun toEvent(event: String): KafkaEvents? {