diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index c739e6c9..551fe713 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -1,22 +1,22 @@ package no.iktdev.mediaprocessing.coordinator -import com.google.gson.Gson import kotlinx.coroutines.delay import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent +import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents -import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed import org.springframework.stereotype.Service import java.io.File import java.util.UUID @@ -34,8 +34,11 @@ class Coordinator() : CoordinatorBase): 
MediaProcessStarted? { return messages.find { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted } - suspend fun buildModelBasedOnMessagesFor(referenceId: String, messages: List) { - if (messages.any { it.data is ProcessCompleted }) { - // TODO: Build and insert into database + + fun deleteOlderEventsIfSuperseded(event: KafkaEvents, value: Message) { + var existingMessages = persistentReader.getMessagesFor(value.referenceId) + + if (!KafkaEvents.isOfWork(event)) { + val superseded = existingMessages.filter { it.event == event && it.eventId != value.eventId } + superseded.forEach { + persistentWriter.deleteStoredEventDataMessage( + referenceId = it.referenceId, + eventId = it.eventId, + event = it.event + ) + } + } + + existingMessages = persistentReader.getMessagesFor(value.referenceId) + val workItems = existingMessages.filter { KafkaEvents.isOfWork(it.event) } + for (item: PersistentMessage in workItems) { + val originatorId = if (item.isOfEvent(KafkaEvents.EVENT_WORK_ENCODE_CREATED) || + item.isOfEvent(KafkaEvents.EVENT_WORK_EXTRACT_CREATED) + ) { + val ec = item.data as FfmpegWorkRequestCreated + ec.derivedFromEventId + } else if (item.isOfEvent(KafkaEvents.EVENT_WORK_ENCODE_PERFORMED)) { + try { + (item.data as ProcesserEncodeWorkPerformed).derivedFromEventId + } catch (e: Exception) { + null + } + } else if (item.isOfEvent(KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED)) { + try { + (item.data as ProcesserExtractWorkPerformed).derivedFromEventId + } catch (e: Exception) { + null + } + } else null + + originatorId?.let { originator -> + deleteEventsIfNoOriginator(item.referenceId, item.eventId, item.event, originator, existingMessages) + } } } + + private fun deleteEventsIfNoOriginator( + referenceId: String, + eventId: String, + event: KafkaEvents, + originatorId: String, + existingMessages: List + ) { + val originator = existingMessages.find { it.eventId == originatorId } + if (originator == null) { + 
persistentWriter.deleteStoredEventDataMessage(referenceId, eventId, event) + } + } + } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt index d32a357c..b869a0e7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.coordinator.mapping import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.persistance.isSkipped import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto @@ -33,7 +34,7 @@ class ProcessMapping(val events: List) { val created = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED} val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED } - val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_SKIPPED } + val isSkipped = events.filter { it.isSkipped() } return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size } @@ -44,7 +45,7 @@ class ProcessMapping(val events: List) { val created = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED } val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED } - val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED } + val isSkipped = events.filter { it.isSkipped() } return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size } @@ -52,7 +53,7 @@ class ProcessMapping(val events: List) { fun waitsForConvert(): 
Boolean { val created = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED } val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED } - val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_SKIPPED } + val isSkipped = events.filter { it.isSkipped() } return created.size > performed.size + isSkipped.size } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt index 2d5823b5..5300c93d 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -48,6 +48,8 @@ class Coordinator(): CoordinatorBase) { + val existingMessages = persistentReader.getMessagesFor(value.referenceId) + + val workItems = existingMessages.filter { KafkaEvents.isOfWork(it.event) } + + + if (KafkaEvents.isOfWork(event)) { + // Here I would need to list all of the work events, then proceed to check which one of the derivedId does not correspond to an entry + // Non-matching entries have been superseded + + + + val superseded = existingMessages.filter { it.event == event && it.eventId != value.eventId } + superseded.forEach { + persistentWriter.deleteStoredEventDataMessage(referenceId = it.referenceId, eventId = it.eventId, event= it.event ) + } + } + } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt index 364c9da5..e1c9b1e4 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt @@ -5,9 +5,12 @@ import
no.iktdev.mediaprocessing.shared.common.datasource.DataSource import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.Message import org.jetbrains.exposed.exceptions.ExposedSQLException +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.deleteWhere import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.javatime.CurrentDateTime import org.jetbrains.exposed.sql.update @@ -42,6 +45,16 @@ open class PersistentDataStore(var dataSource: DataSource) { } } + fun deleteStoredEventDataMessage(referenceId: String, eventId: String, event: KafkaEvents): Boolean { + return executeWithStatus(dataSource.database) { + events.deleteWhere { + (events.referenceId eq referenceId) and + (events.eventId eq eventId) and + (events.event eq event.event) + } + } + } + fun storeProcessDataMessage(event: String, message: Message<*>): Boolean { val exception = executeOrException(dataSource.database) { processerEvents.insert { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index f357d878..8fdedfd6 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.shared.common.persistance import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import 
no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.isSkipped import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import org.jetbrains.exposed.sql.ResultRow import java.time.LocalDateTime @@ -29,6 +30,15 @@ fun PersistentMessage.isSuccess(): Boolean { } } +fun PersistentMessage.isSkipped(): Boolean { + return try { + this.data.isSkipped() + } catch (e: Exception) { + false + } +} + + fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? { val kev = try { diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index 9987f11b..d82d0f84 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -28,11 +28,6 @@ enum class KafkaEvents(val event: String) { EVENT_WORK_DOWNLOAD_COVER_PERFORMED("event:work-download-cover:performed"), - EVENT_WORK_ENCODE_SKIPPED("event:work-encode:skipped"), - EVENT_WORK_EXTRACT_SKIPPED("event:work-extract:skipped"), - EVENT_WORK_CONVERT_SKIPPED("event:work-convert:skipped"), - - EVENT_STORE_VIDEO_PERFORMED("event:store-video:performed"), EVENT_STORE_SUBTITLE_PERFORMED("event:store-subtitle:performed"), EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"), @@ -46,5 +41,18 @@ enum class KafkaEvents(val event: String) { fun toEvent(event: String): KafkaEvents? { return KafkaEvents.entries.find { it.event == event } } + + fun isOfWork(event: KafkaEvents): Boolean { + return event in listOf( + + EVENT_WORK_CONVERT_CREATED, + EVENT_WORK_EXTRACT_CREATED, + EVENT_WORK_ENCODE_CREATED, + + EVENT_WORK_ENCODE_PERFORMED, + EVENT_WORK_CONVERT_PERFORMED, + EVENT_WORK_EXTRACT_PERFORMED + ) + } } } \ No newline at end of file