diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt index 2bc0331d..0f434d4f 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt @@ -18,13 +18,12 @@ class ClaimsService() { @Scheduled(fixedDelay = (300_000)) fun validateClaims() { - val expiredClaims = persistentReader.getExpiredClaimsProcessEvents() + val expiredClaims = eventManager.getProcessEventsWithExpiredClaim() expiredClaims.forEach { log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" } } - val store = persistentWriter expiredClaims.forEach { - val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId) + val result = eventManager.deleteProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId) if (result) { log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" } } else { diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt index 2a6dfcd6..4504527f 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt @@ -4,6 +4,7 @@ import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents import no.iktdev.mediaprocessing.shared.common.toEventsDatabase import org.springframework.boot.autoconfigure.SpringBootApplication @@ -20,8 +21,7 @@ fun getContext(): ApplicationContext? { } -lateinit var persistentReader: PersistentDataReader -lateinit var persistentWriter: PersistentDataStore +lateinit var eventManager: PersistentEventManager private lateinit var eventsDatabase: MySqlDataSource fun getEventsDatabase(): MySqlDataSource { @@ -33,8 +33,7 @@ fun main(args: Array) { eventsDatabase.createDatabase() eventsDatabase.createTables(processerEvents) - persistentReader = PersistentDataReader(eventsDatabase) - persistentWriter = PersistentDataStore(eventsDatabase) + eventManager = PersistentEventManager(eventsDatabase) context = runApplication(*args) } diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt index c9d949f1..3ebc1a9c 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt @@ -40,14 +40,14 @@ class ConverterCoordinator() : CoordinatorBase>) { - if (event.key == KafkaEvents.EVENT_WORK_CONVERT_CREATED) { - val success = persistentWriter.storeProcessDataMessage(event.key.event, event.value) + if (event.key == KafkaEvents.EventWorkConvertCreated) { + val success = eventManager.setProcessEvent(event.key, event.value) if (!success) { log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}!" } } else { readAllMessagesFor(event.value.referenceId, event.value.eventId) } - } else if (event.key == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED) { + } else if (event.key == KafkaEvents.EventWorkExtractPerformed) { readAllInQueue() } else { log.debug { "Skipping ${event.key}" } @@ -55,7 +55,7 @@ class ConverterCoordinator() : CoordinatorBase get() = listOf( - KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED, - KafkaEvents.EVENT_WORK_CONVERT_CREATED + KafkaEvents.EventWorkExtractPerformed, + KafkaEvents.EventWorkConvertCreated ) override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_WORK_CONVERT_PERFORMED + get() = KafkaEvents.EventWorkConvertPerformed fun getRequiredExtractProcessForContinuation( referenceId: String, requiresEventId: String ): PersistentProcessDataMessage? { - return persistentReader.getProcessEvent(referenceId, requiresEventId) + return eventManager.getProcessEventWith(referenceId, requiresEventId) } fun canConvert(extract: PersistentProcessDataMessage?): Boolean { @@ -61,7 +58,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) events: List ): MessageDataWrapper? { val convertEvent = - events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED && it.data is ConvertWorkerRequest } + events.find { it.event == KafkaEvents.EventWorkConvertCreated && it.data is ConvertWorkerRequest } if (convertEvent == null) { // No convert here.. return null @@ -94,17 +91,16 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) } } - val isAlreadyClaimed = - persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId) if (isAlreadyClaimed) { log.warn { "Process is already claimed!" } return null } - val setClaim = persistentWriter.setProcessEventClaim( + val setClaim = eventManager.setProcessEventClaim( referenceId = event.referenceId, eventId = event.eventId, - claimedBy = serviceId + claimer = serviceId ) if (!setClaim) { return null @@ -133,20 +129,19 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) } val consumedIsSuccessful = - persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + eventManager.setProcessEventCompleted(event.referenceId, event.eventId) runBlocking { delay(1000) if (!consumedIsSuccessful) { - persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + eventManager.setProcessEventCompleted(event.referenceId, event.eventId) } delay(1000) - var readbackIsSuccess = - persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + var readbackIsSuccess = eventManager.isProcessEventCompleted(event.referenceId, event.eventId) while (!readbackIsSuccess) { delay(1000) readbackIsSuccess = - persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + eventManager.isProcessEventCompleted(event.referenceId, event.eventId) } } return result @@ -208,13 +203,13 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) } } catch (e: Exception) { - persistentWriter.setProcessEventCompleted(referenceId, event.eventId, serviceId) + eventManager.setProcessEventCompleted(referenceId, event.eventId) failed.add(event) log.error { "Canceling event ${event.eventId}\n\t by declaring it as consumed." } producer.sendMessage( referenceId = referenceId, event = producesEvent, - data = SimpleMessageData(Status.SKIPPED, "Required event: ${ce?.requiresEventId} is not found. Skipping convert work for referenceId: ${referenceId}") + data = SimpleMessageData(Status.SKIPPED, "Required event: ${ce?.requiresEventId} is not found. Skipping convert work for referenceId: ${referenceId}", derivedFromEventId = event.eventId) ) } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index 551fe713..7fea98e7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -7,16 +7,12 @@ import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage -import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent -import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed import org.springframework.stereotype.Service import java.io.File import java.util.UUID @@ -30,13 +26,10 @@ class Coordinator() : CoordinatorBase>) { - val success = persistentWriter.storeEventDataMessage(event.key.event, event.value) + val success = eventManager.setEvent(event.key, event.value) if (!success) { log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().config.databaseName}" } } else { - deleteOlderEventsIfSuperseded(event.key, event.value) - - io.launch { delay(1000) // Give the database a few sec to update readAllMessagesFor(event.value.referenceId, event.value.eventId) @@ -78,7 +71,7 @@ class Coordinator() : CoordinatorBase): MediaProcessStarted? { - return messages.find { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted - } - - - fun deleteOlderEventsIfSuperseded(event: KafkaEvents, value: Message) { - var existingMessages = persistentReader.getMessagesFor(value.referenceId) - - if (!KafkaEvents.isOfWork(event)) { - val superseded = existingMessages.filter { it.event == event && it.eventId != value.eventId } - superseded.forEach { - persistentWriter.deleteStoredEventDataMessage( - referenceId = it.referenceId, - eventId = it.eventId, - event = it.event - ) - } - } - - existingMessages = persistentReader.getMessagesFor(value.referenceId) - val workItems = existingMessages.filter { KafkaEvents.isOfWork(it.event) } - for (item: PersistentMessage in workItems) { - val originatorId = if (item.isOfEvent(KafkaEvents.EVENT_WORK_ENCODE_CREATED) || - item.isOfEvent(KafkaEvents.EVENT_WORK_EXTRACT_CREATED) - ) { - val ec = item.data as FfmpegWorkRequestCreated - ec.derivedFromEventId - } else if (item.isOfEvent(KafkaEvents.EVENT_WORK_ENCODE_PERFORMED)) { - try { - (item.data as ProcesserEncodeWorkPerformed).derivedFromEventId - } catch (e: Exception) { - null - } - } else if (item.isOfEvent(KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED)) { - try { - (item.data as ProcesserExtractWorkPerformed).derivedFromEventId - } catch (e: Exception) { - null - } - } else null - - originatorId?.let { originator -> - deleteEventsIfNoOriginator(item.referenceId, item.eventId, item.event, originator, existingMessages) - } - } - } - - private fun deleteEventsIfNoOriginator( - referenceId: String, - eventId: String, - event: KafkaEvents, - originatorId: String, - existingMessages: List - ) { - val originator = existingMessages.find { it.eventId == originatorId } - if (originator == null) { - persistentWriter.deleteStoredEventDataMessage(referenceId, eventId, event) - } + return messages.find { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt index d22ef220..ef520823 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt @@ -9,6 +9,7 @@ import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager import no.iktdev.mediaprocessing.shared.common.persistance.events import no.iktdev.mediaprocessing.shared.common.toEventsDatabase import no.iktdev.mediaprocessing.shared.common.toStoredDatabase @@ -46,8 +47,7 @@ fun getEventsDatabase(): MySqlDataSource { return eventsDatabase } -lateinit var persistentReader: PersistentDataReader -lateinit var persistentWriter: PersistentDataStore +lateinit var eventManager: PersistentEventManager fun main(args: Array) { Coroutines.addListener(listener = object: Observables.ObservableValue.ValueListener { @@ -57,16 +57,19 @@ fun main(args: Array) { }) eventsDatabase = DatabaseEnvConfig.toEventsDatabase() - storeDatabase = DatabaseEnvConfig.toStoredDatabase() - eventsDatabase.createDatabase() + + storeDatabase = DatabaseEnvConfig.toStoredDatabase() storeDatabase.createDatabase() + + eventManager = PersistentEventManager(eventsDatabase) + + val kafkaTables = listOf( events, // For kafka ) - eventsDatabase.createTables(*kafkaTables.toTypedArray()) val tables = arrayOf( catalog, @@ -83,9 +86,8 @@ fun main(args: Array) { ) storeDatabase.createTables(*tables) - persistentReader = PersistentDataReader(eventsDatabase) - persistentWriter = PersistentDataStore(eventsDatabase) + eventsDatabase.createTables(*kafkaTables.toTypedArray()) context = runApplication(*args) printSharedConfig() } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/ActionEventController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/ActionEventController.kt index 8da0a096..297f8ec7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/ActionEventController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/ActionEventController.kt @@ -2,8 +2,7 @@ package no.iktdev.mediaprocessing.coordinator.controller import com.google.gson.Gson import no.iktdev.mediaprocessing.coordinator.Coordinator -import no.iktdev.mediaprocessing.coordinator.persistentReader -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.coordinator.eventManager import no.iktdev.mediaprocessing.shared.contract.dto.RequestWorkProceed import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.HttpStatus @@ -20,7 +19,7 @@ class ActionEventController(@Autowired var coordinator: Coordinator) { @RequestMapping("/flow/proceed") fun permitRunOnSequence(@RequestBody data: RequestWorkProceed): ResponseEntity { - val set = persistentReader.getMessagesFor(data.referenceId) + val set = eventManager.getEventsWith(data.referenceId) if (set.isEmpty()) { return ResponseEntity.status(HttpStatus.NO_CONTENT).body(Gson().toJson(data)) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt index b869a0e7..b84c0830 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt @@ -30,10 +30,10 @@ class ProcessMapping(val events: List) { fun waitsForEncode(): Boolean { - val arguments = events.filter { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED } - val created = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED} + val arguments = events.filter { it.event == KafkaEvents.EventMediaParameterEncodeCreated } + val created = events.filter { it.event == KafkaEvents.EventWorkEncodeCreated} - val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED } + val performed = events.filter { it.event == KafkaEvents.EventWorkEncodePerformed } val isSkipped = events.filter { it.isSkipped() } return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size @@ -41,18 +41,18 @@ class ProcessMapping(val events: List) { fun waitsForExtract(): Boolean { // Check if message is declared as skipped with statis - val arguments = events.filter { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED }.filter { it.data.isSuccess() } - val created = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED } + val arguments = events.filter { it.event == KafkaEvents.EventMediaParameterExtractCreated }.filter { it.data.isSuccess() } + val created = events.filter { it.event == KafkaEvents.EventWorkExtractCreated } - val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED } + val performed = events.filter { it.event == KafkaEvents.EventWorkExtractPerformed } val isSkipped = events.filter { it.isSkipped() } return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size } fun waitsForConvert(): Boolean { - val created = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED } - val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED } + val created = events.filter { it.event == KafkaEvents.EventWorkConvertCreated } + val performed = events.filter { it.event == KafkaEvents.EventWorkConvertPerformed } val isSkipped = events.filter { it.isSkipped() } return created.size > performed.size + isSkipped.size diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt index a5c3548c..136b53e9 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt @@ -21,9 +21,9 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC val log = KotlinLogging.logger {} override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED + get() = KafkaEvents.EventMediaReadBaseInfoPerformed - override val requiredEvents: List = listOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) + override val requiredEvents: List = listOf(KafkaEvents.EventMediaProcessStarted) override fun prerequisitesRequired(events: List): List<() -> Boolean> { @@ -34,22 +34,23 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${event.referenceId} triggered by ${event.event}" } - val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) ?: return null - return readFileInfo(selected.data as MediaProcessStarted) + val selected = events.lastOrSuccessOf(KafkaEvents.EventMediaProcessStarted) ?: return null + return readFileInfo(selected.data as MediaProcessStarted, event.eventId) } - fun readFileInfo(started: MediaProcessStarted): MessageDataWrapper { + fun readFileInfo(started: MediaProcessStarted, eventId: String): MessageDataWrapper { val result = try { val fileName = File(started.file).nameWithoutExtension val fileNameParser = FileNameParser(fileName) BaseInfoPerformed( Status.COMPLETED, title = fileNameParser.guessDesiredTitle(), - sanitizedName = fileNameParser.guessDesiredFileName() + sanitizedName = fileNameParser.guessDesiredFileName(), + derivedFromEventId = eventId ) } catch (e: Exception) { e.printStackTrace() - SimpleMessageData(Status.ERROR, e.message ?: "Unable to obtain proper info from file") + SimpleMessageData(Status.ERROR, e.message ?: "Unable to obtain proper info from file", eventId) } return result } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt index 80301b58..e24422d4 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator +import no.iktdev.mediaprocessing.coordinator.getStoreDatabase import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus @@ -32,7 +33,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE override val requiredEvents: List = listOf( - EVENT_MEDIA_PROCESS_STARTED, + EventMediaProcessStarted, EVENT_MEDIA_PROCESS_COMPLETED ) override val listensForEvents: List = KafkaEvents.entries @@ -40,7 +41,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null + val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null val completed = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_COMPLETED) ?: return null if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) { return null @@ -65,20 +66,20 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta null else storeCatalog(metadata = meta,genres = genres, videoFile = videoFile, videoDetails = videoInfo) - } ?: return SimpleMessageData(Status.ERROR, "Unable to store catalog when metadata is null") + } ?: return SimpleMessageData(Status.ERROR, "Unable to store catalog when metadata is null", event.eventId) mapped.metadata?.let { storeMetadata(catalogId = catalogId, metadata = it) } - return SimpleMessageData(Status.COMPLETED) + return SimpleMessageData(Status.COMPLETED, derivedFromEventId = event.eventId) } private fun storeSubtitles(collection: String, subtitles: List): Boolean { val result = subtitles.map { subtitle -> val subtitleFile = File(subtitle) val language = subtitleFile.parentFile.name - subtitle to executeWithStatus { + subtitle to executeWithStatus(getStoreDatabase()) { SubtitleQuery( collection = collection, associatedWithVideo = subtitleFile.nameWithoutExtension, @@ -93,7 +94,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta private fun storeMetadata(catalogId: Int, metadata: MetadataDto) { metadata.summary.forEach { - withTransaction { + withTransaction(getStoreDatabase()) { SummaryQuery( cid = catalogId, language = it.language, @@ -104,7 +105,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta } private fun storeAndGetGenres(genres: List): String? { - return withTransaction { + return withTransaction(getStoreDatabase()) { val gq = GenreQuery( *genres.toTypedArray() ) gq.insertAndGetIds() gq.getIds().joinToString(",") @@ -141,7 +142,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta } val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062 return if (result == null || ignoreException ) { - return withTransaction { + return withTransaction(getStoreDatabase()) { precreatedCatalogQuery.getId() } } else null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index e05f5e3e..0e23d45d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -22,16 +22,16 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED override val requiredEvents: List = listOf( - EVENT_MEDIA_PROCESS_STARTED, - EVENT_MEDIA_READ_BASE_INFO_PERFORMED, - EVENT_MEDIA_READ_OUT_NAME_AND_TYPE + EventMediaProcessStarted, + EventMediaReadBaseInfoPerformed, + EventMediaReadOutNameAndType ) override val listensForEvents: List = KafkaEvents.entries override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null + val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null if (!started.data.isSuccess()) { return null } @@ -40,9 +40,9 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event. val requiresOneOf = listOf( - EVENT_WORK_CONVERT_PERFORMED, - EVENT_WORK_EXTRACT_PERFORMED, - EVENT_WORK_ENCODE_PERFORMED + EventWorkConvertPerformed, + EventWorkExtractPerformed, + EventWorkEncodePerformed ) if (requiresOneOf.none { it in receivedEvents }) { @@ -56,7 +56,7 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task val mapper = ProcessMapping(events) if (mapper.canCollect()) { - return ProcessCompleted(Status.COMPLETED) + return ProcessCompleted(Status.COMPLETED, event.eventId) } return null } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt index 0654b00f..cd291b83 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt @@ -39,9 +39,9 @@ class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : Ta // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event. val requiresOneOf = listOf( - EVENT_WORK_CONVERT_PERFORMED, - EVENT_WORK_EXTRACT_PERFORMED, - EVENT_WORK_ENCODE_PERFORMED + EventWorkConvertPerformed, + EventWorkExtractPerformed, + EventWorkEncodePerformed ) if (requiresOneOf.none { it in receivedEvents }) { @@ -55,7 +55,7 @@ class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : Ta val mapper = ProcessMapping(events) if (mapper.canCollect()) { - return ProcessCompleted(Status.COMPLETED) + return ProcessCompleted(Status.COMPLETED, event.eventId) } return null } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt index 185969ca..9b009f32 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt @@ -16,11 +16,11 @@ import java.io.File @Service class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_WORK_CONVERT_CREATED + get() = KafkaEvents.EventWorkConvertCreated override val requiredEvents: List get() = listOf( - KafkaEvents.EVENT_WORK_EXTRACT_CREATED + KafkaEvents.EventWorkExtractCreated // TODO: Add event for request as well ) @@ -30,7 +30,7 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : } val eventData = event.data as FfmpegWorkRequestCreated? ?: return null - val requiredEventId = if (event.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED) { + val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) { event.eventId } else null; @@ -41,7 +41,8 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : inputFile = eventData.outFile, allowOverwrite = true, outFileBaseName = outFile.nameWithoutExtension, - outDirectory = outFile.parentFile.absolutePath + outDirectory = outFile.parentFile.absolutePath, + derivedFromEventId = event.eventId ) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt index 83f4d28a..c62ef747 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt @@ -9,9 +9,9 @@ import org.springframework.stereotype.Service @Service class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) { override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_WORK_ENCODE_CREATED + get() = KafkaEvents.EventWorkEncodeCreated override val requiredEvents: List - get() = listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) + get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated) } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt index 551ffd72..186f19d0 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt @@ -9,8 +9,8 @@ import org.springframework.stereotype.Service @Service class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) { override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_WORK_EXTRACT_CREATED + get() = KafkaEvents.EventWorkExtractCreated override val requiredEvents: List - get() = listOf(KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) + get() = listOf(KafkaEvents.EventMediaParameterExtractCreated) } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt index 329350f8..ace87498 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt @@ -24,13 +24,13 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED + get() = KafkaEvents.EventWorkDownloadCoverPerformed override val requiredEvents: List get() = listOf( - KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, - KafkaEvents.EVENT_MEDIA_READ_OUT_COVER, - KafkaEvents.EVENT_WORK_ENCODE_PERFORMED + KafkaEvents.EventMediaMetadataSearchPerformed, + KafkaEvents.EventMediaReadOutCover, + KafkaEvents.EventWorkEncodePerformed ) override fun prerequisitesRequired(events: List): List<() -> Boolean> { return super.prerequisitesRequired(events) + listOf { @@ -39,14 +39,14 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - val cover = events.find { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_COVER } + val cover = events.find { it.event == KafkaEvents.EventMediaReadOutCover } if (cover == null || cover.data !is CoverInfoPerformed) { - return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId") + return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId", event.eventId) } val coverData = cover.data as CoverInfoPerformed val outDir = File(coverData.outDir) if (!outDir.exists()) - return SimpleMessageData(Status.ERROR, "Check for output directory for cover storage failed for $serviceId") + return SimpleMessageData(Status.ERROR, "Check for output directory for cover storage failed for $serviceId", event.eventId) val client = DownloadClient(coverData.url, File(coverData.outDir), coverData.outFileBaseName) @@ -67,10 +67,10 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator } return if (result == null) { - SimpleMessageData(Status.ERROR, "Could not download cover, check logs") + SimpleMessageData(Status.ERROR, "Could not download cover, check logs", event.eventId) } else { val status = if (result.exists() && result.canRead()) Status.COMPLETED else Status.ERROR - CoverDownloadWorkPerformed(status = status, message = message, coverFile = result.absolutePath) + CoverDownloadWorkPerformed(status = status, message = message, coverFile = result.absolutePath, event.eventId) } } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt index 65895bd0..4190d380 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt @@ -20,12 +20,12 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_MEDIA_READ_OUT_COVER + get() = KafkaEvents.EventMediaReadOutCover override val requiredEvents: List = listOf( - KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, - KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE, - KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED + KafkaEvents.EventMediaReadBaseInfoPerformed, + KafkaEvents.EventMediaReadOutNameAndType, + KafkaEvents.EventMediaMetadataSearchPerformed ) override fun prerequisitesRequired(events: List): List<() -> Boolean> { @@ -50,7 +50,8 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi status = Status.COMPLETED, url = coverUrl, outFileBaseName = baseInfo.title, - outDir = fileOut.outDirectory + outDir = fileOut.outDirectory, + derivedFromEventId = event.eventId ) } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt index 34003a32..a5c6ae78 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt @@ -39,23 +39,23 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina val metadataTimeout = KafkaEnv.metadataTimeoutMinutes * 60 override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE + get() = KafkaEvents.EventMediaReadOutNameAndType - val waitingProcessesForMeta: MutableMap = mutableMapOf() + val waitingProcessesForMeta: MutableMap = mutableMapOf() override val listensForEvents: List = listOf( - KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, - KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED + KafkaEvents.EventMediaReadBaseInfoPerformed, + KafkaEvents.EventMediaMetadataSearchPerformed ) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${event.referenceId} triggered by ${event.event}" } - val baseInfo = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed? - val meta = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED) { it.data is MetadataPerformed }?.data as MetadataPerformed? + val baseInfo = events.lastOrSuccessOf(KafkaEvents.EventMediaReadBaseInfoPerformed) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed? + val meta = events.lastOrSuccessOf(KafkaEvents.EventMediaMetadataSearchPerformed) { it.data is MetadataPerformed }?.data as MetadataPerformed? // Only Return here as both baseInfo events are required to continue - if (!baseInfo.isSuccess() || !baseInfo.hasValidData() || events.any { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }) { + if (!baseInfo.isSuccess() || !baseInfo.hasValidData() || events.any { it.event == KafkaEvents.EventMediaReadOutNameAndType }) { return null } if (baseInfo.isSuccess() && meta == null) { @@ -65,7 +65,7 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH) log.info { "Sending ${baseInfo?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" } if (!waitingProcessesForMeta.containsKey(event.referenceId)) { - waitingProcessesForMeta[event.referenceId] = LocalDateTime.now() + waitingProcessesForMeta[event.referenceId] = MetadataTriggerData(event.eventId, LocalDateTime.now()) } return null } @@ -92,9 +92,9 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina val vi = fileDeterminate.getDeterminedVideoInfo()?.toJsonObject() return if (vi != null) { - VideoInfoPerformed(Status.COMPLETED, vi, outDirectory = outputDirectory.absolutePath) + VideoInfoPerformed(Status.COMPLETED, vi, outDirectory = outputDirectory.absolutePath, event.eventId) } else { - SimpleMessageData(Status.ERROR, "No VideoInfo found...") + SimpleMessageData(Status.ERROR, "No VideoInfo found...", event.eventId) } } @@ -103,13 +103,15 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina @Scheduled(fixedDelay = (1_000)) fun sendErrorMessageForMetadata() { val expired = waitingProcessesForMeta.filter { - LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + metadataTimeout) + LocalDateTime.now().toEpochSeconds() > (it.value.executed.toEpochSeconds() + metadataTimeout) } expired.forEach { log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" } - producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOut::class.simpleName}")) + producer.sendMessage(it.key, KafkaEvents.EventMediaMetadataSearchPerformed, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOut::class.simpleName}", derivedFromEventId = it.value.eventId)) waitingProcessesForMeta.remove(it.key) } } + data class MetadataTriggerData(val eventId: String, val executed: LocalDateTime) + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index d61703eb..ad3cb44c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -25,10 +25,10 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED + get() = KafkaEvents.EventMediaParseStreamPerformed override val requiredEvents: List = listOf( - KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED + KafkaEvents.EventMediaReadStreamPerformed ) override fun prerequisitesRequired(events: List): List<() -> Boolean> { @@ -39,11 +39,11 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${event.referenceId} triggered by ${event.event}" } - val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) ?: return null - return parseStreams(desiredEvent.data as ReaderPerformed) + val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null + return parseStreams(desiredEvent.data as ReaderPerformed, desiredEvent.eventId) } - fun parseStreams(data: ReaderPerformed): MessageDataWrapper { + fun parseStreams(data: ReaderPerformed, eventId: String): MessageDataWrapper { val gson = Gson() return try { val jStreams = data.output.getAsJsonArray("streams") @@ -71,11 +71,11 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : audioStream = audioStreams, subtitleStream = subtitleStreams ) - MediaStreamsParsePerformed(Status.COMPLETED, parsedStreams) + MediaStreamsParsePerformed(Status.COMPLETED, parsedStreams, eventId) } catch (e: Exception) { e.printStackTrace() - SimpleMessageData(Status.ERROR, message = e.message) + SimpleMessageData(Status.ERROR, message = e.message, eventId) } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index 99214955..22043288 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -26,10 +26,10 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED + get() = KafkaEvents.EventMediaReadStreamPerformed override val requiredEvents: List = listOf( - KafkaEvents.EVENT_MEDIA_PROCESS_STARTED + KafkaEvents.EventMediaProcessStarted ) @@ -43,18 +43,18 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null - return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted) } + return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted, desiredEvent.eventId) } } - suspend fun fileReadStreams(started: MediaProcessStarted): MessageDataWrapper { + suspend fun fileReadStreams(started: MediaProcessStarted, eventId: String): MessageDataWrapper { val file = File(started.file) return if (file.exists() && file.isFile) { val result = readStreams(file) val joined = result.output.joinToString(" ") val jsoned = Gson().fromJson(joined, JsonObject::class.java) - ReaderPerformed(Status.COMPLETED, file = started.file, output = jsoned) + ReaderPerformed(Status.COMPLETED, file = started.file, output = jsoned, derivedFromEventId = eventId) } else { - SimpleMessageData(Status.ERROR, "File in data is not a file or does not exist") + SimpleMessageData(Status.ERROR, "File in data is not a file or does not exist", eventId) } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt index 9f016e47..7fb13fe7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt @@ -12,12 +12,11 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkReques import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess -import org.springframework.beans.factory.annotation.Autowired abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : TaskCreator(coordinator) { override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - val started = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted? + val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted? if (started == null) { return null } @@ -26,7 +25,7 @@ abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : return null } - val proceed = events.find { it.event == KafkaEvents.EVENT_MEDIA_WORK_PROCEED_PERMITTED } + val proceed = events.find { it.event == KafkaEvents.EventMediaWorkProceedPermitted } if (proceed == null && started.type == ProcessType.MANUAL) { log.warn { "${event.referenceId} waiting for Proceed event due to Manual process" } return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index 6cec7169..0fd5494f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -24,14 +24,14 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator val preference = Preference.getPreference() override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED + get() = KafkaEvents.EventMediaParameterEncodeCreated override val requiredEvents: List = listOf( - KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, - KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, - KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, - KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE + KafkaEvents.EventMediaProcessStarted, + KafkaEvents.EventMediaReadBaseInfoPerformed, + KafkaEvents.EventMediaParseStreamPerformed, + KafkaEvents.EventMediaReadOutNameAndType ) override fun prerequisitesRequired(events: List): List<() -> Boolean> { @@ -61,7 +61,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator } if (videoInfoWrapper == null || videoInfo == null) { - log.error { "${KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE} result is read as null" } + log.error { "${KafkaEvents.EventMediaReadOutNameAndType} result is read as null" } return null } @@ -74,7 +74,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator outDir = File(videoInfoWrapper.outDirectory), preference = preference.encodePreference, baseInfo = baseInfo, - serializedParsedStreams = serializedParsedStreams + serializedParsedStreams = serializedParsedStreams, + eventId = event.eventId ) } @@ -84,7 +85,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator outDir: File, preference: EncodingPreference, baseInfo: BaseInfoPerformed, - serializedParsedStreams: ParsedMediaStreams + serializedParsedStreams: ParsedMediaStreams, + eventId: String ): MessageDataWrapper { val outVideoFile = outDir.using("${outFullName}.mp4").absolutePath @@ -97,7 +99,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator val vaArgs = toFfmpegWorkerArguments(vArg, aArg) return if (vaArgs.isEmpty()) { - SimpleMessageData(Status.ERROR, message = "Unable to produce arguments") + SimpleMessageData(Status.ERROR, message = "Unable to produce arguments", derivedFromEventId = eventId) } else { FfmpegWorkerArgumentsCreated( status = Status.COMPLETED, @@ -107,7 +109,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator outputFile = outVideoFile, arguments = vaArgs ) - ) + ), + derivedFromEventId = eventId ) } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index ef3fcfaa..04d598ef 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -28,13 +28,13 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato val preference = Preference.getPreference() override val producesEvent: KafkaEvents - get() = KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED + get() = KafkaEvents.EventMediaParameterExtractCreated override val requiredEvents: List = listOf( - KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, - KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, - KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, - KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE + KafkaEvents.EventMediaProcessStarted, + KafkaEvents.EventMediaReadBaseInfoPerformed, + KafkaEvents.EventMediaParseStreamPerformed, + KafkaEvents.EventMediaReadOutNameAndType ) @@ -64,7 +64,7 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato val videoInfo = videoInfoWrapper?.toValueObject() if (videoInfoWrapper == null || videoInfo == null) { - log.error { "${KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE} result is read as null" } + log.error { "${KafkaEvents.EventMediaReadOutNameAndType} result is read as null" } return null } @@ -73,7 +73,8 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato outFullName = videoInfo.fullName, outDir = File(videoInfoWrapper.outDirectory), baseInfo = baseInfo, - serializedParsedStreams = serializedParsedStreams + serializedParsedStreams = serializedParsedStreams, + eventId = event.eventId ) } @@ -82,7 +83,8 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato outFullName: String, outDir: File, baseInfo: BaseInfoPerformed, - serializedParsedStreams: ParsedMediaStreams + serializedParsedStreams: ParsedMediaStreams, + eventId: String ): MessageDataWrapper? { val subRootDir = outDir.using("sub") val sArg = SubtitleArguments(serializedParsedStreams.subtitleStream).getSubtitleArguments() @@ -94,12 +96,13 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato ) } if (entries.isEmpty()) { - return SimpleMessageData(status = Status.SKIPPED, "No entries found!") + return SimpleMessageData(status = Status.SKIPPED, "No entries found!", derivedFromEventId = eventId) } return FfmpegWorkerArgumentsCreated( status = Status.COMPLETED, inputFile = inputFile, - entries = entries + entries = entries, + derivedFromEventId = eventId ) } diff --git a/apps/processer/build.gradle.kts b/apps/processer/build.gradle.kts index dbe9a62a..c5ecb4aa 100644 --- a/apps/processer/build.gradle.kts +++ b/apps/processer/build.gradle.kts @@ -53,10 +53,17 @@ dependencies { implementation(project(mapOf("path" to ":shared:kafka"))) - - testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation("org.junit.jupiter:junit-jupiter") + + testImplementation("io.mockk:mockk:1.12.0") + testImplementation("com.h2database:h2:1.4.200") + testImplementation("org.assertj:assertj-core:3.4.1") + + testImplementation("org.junit.jupiter:junit-jupiter-api:5.7.2") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.7.2") + testImplementation("io.kotlintest:kotlintest-assertions:3.3.2") + testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0") implementation(kotlin("stdlib-jdk8")) } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt index 5300c93d..d9a19fbb 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -11,6 +11,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.NotificationOfDeletionPerformed import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service @@ -22,6 +23,15 @@ class Coordinator(): CoordinatorBase = mutableListOf() + fun getRegisteredEventListeners() = coordinatorEventListeners.toList() + fun addCoordinatorEventListener(listener: CoordinatorEvents) { + coordinatorEventListeners.add(listener) + } + fun removeCoordinatorEventListener(listener: CoordinatorEvents) { + coordinatorEventListeners.remove(listener) + } + override fun createTasksBasedOnEventsAndPersistence( referenceId: String, eventId: String, @@ -40,16 +50,18 @@ class Coordinator(): CoordinatorBase>) { - if (!processKafkaEvents.contains(event.key)) { + if (!acceptEvents.contains(event.key)) { + return + } + if (event.key == KafkaEvents.EventNotificationOfWorkItemRemoval) { + handleDeletionOfEvents(event) return } - val success = persistentWriter.storeProcessDataMessage(event.key.event, event.value) + val success = eventManager.setProcessEvent(event.key, event.value) if (!success) { log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}" } } else { - deleteOlderEventsIfSuperseded(event.key, event.value) - io.launch { delay(500) readAllMessagesFor(event.value.referenceId, event.value.eventId) @@ -57,29 +69,20 @@ class Coordinator(): CoordinatorBase) { - val existingMessages = persistentReader.getMessagesFor(value.referenceId) - - val workItems = existingMessages.filter { KafkaEvents.isOfWork(it.event) } - - - if (KafkaEvents.isOfWork(event)) { - // Here i would need to list all of the work events, then proceed to check which one of the derivedId does not correspond to a entry - // Nonmatching has been superseded - - - - val superseded = existingMessages.filter { it.event == event && it.eventId != value.eventId } - superseded.forEach { - persistentWriter.deleteStoredEventDataMessage(referenceId = it.referenceId, eventId = it.eventId, event= it.event ) + private fun handleDeletionOfEvents(kafkaPayload: DeserializedConsumerRecord>) { + if (kafkaPayload.value.data is NotificationOfDeletionPerformed) { + val data = kafkaPayload.value.data as NotificationOfDeletionPerformed + if (data.deletedEvent in processKafkaEvents) { + coordinatorEventListeners.forEach { it.onCancelOrStopProcess(data.deletedEventId) } + eventManager.deleteProcessEvent(kafkaPayload.value.referenceId, data.deletedEventId) } + } else { + log.warn { "Deletion handling was triggered with wrong data" } } } - - fun readAllAvailableInQueue() { - val messages = persistentReader.getAvailableProcessEvents() + val messages = eventManager.getProcessEventsClaimable() io.launch { messages.forEach { delay(1000) @@ -89,15 +92,19 @@ class Coordinator(): CoordinatorBase) { eventsDatabase = DatabaseEnvConfig.toEventsDatabase() eventsDatabase.createDatabase() eventsDatabase.createTables(processerEvents) - persistentReader = PersistentDataReader(eventsDatabase) - persistentWriter = PersistentDataStore(eventsDatabase) + + eventManager = PersistentEventManager(eventsDatabase) + val context = runApplication(*args) } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/CancelController.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/CancelController.kt new file mode 100644 index 00000000..5e6b4ed5 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/CancelController.kt @@ -0,0 +1,23 @@ +package no.iktdev.mediaprocessing.processer.controller + +import no.iktdev.mediaprocessing.processer.Coordinator +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.stereotype.Controller +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping + +@Controller +class CancelController(@Autowired var coordinator: Coordinator) { + + @RequestMapping(path = ["/cancel"]) + fun cancelProcess(@RequestBody eventId: String? = null): ResponseEntity { + if (eventId.isNullOrBlank()) { + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("No eventId provided!") + } + coordinator.getRegisteredEventListeners().forEach { it.onCancelOrStopProcess(eventId) } + return ResponseEntity.ok(null) + } + +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt index f88fbf72..65e25a8b 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt @@ -2,17 +2,30 @@ package no.iktdev.mediaprocessing.processer.ffmpeg import com.github.pgreze.process.Redirect import com.github.pgreze.process.process -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.withContext +import kotlinx.coroutines.* import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.using import no.iktdev.mediaprocessing.processer.ProcesserEnv +import no.iktdev.mediaprocessing.processer.eventManager import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import java.io.File +import java.time.Duration + +class FfmpegWorker( + val referenceId: String, + val eventId: String, + val info: FfmpegWorkRequestCreated, + val listener: FfmpegWorkerEvents, + val logDir: File +) { + private val scope = Coroutines.io() + private var job: Job? = null + + fun isWorking(): Boolean { + return job != null && (job?.isCompleted != true) && scope.isActive + } -class FfmpegWorker(val referenceId: String, val eventId: String, val info: FfmpegWorkRequestCreated, val listener: FfmpegWorkerEvents, val logDir: File) { - val scope = Coroutines.io() val decoder = FfmpegProgressDecoder() private val outputCache = mutableListOf() private val log = KotlinLogging.logger {} @@ -44,20 +57,41 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe } } - suspend fun run() { + fun run() { val args = FfmpegWorkerArgumentsBuilder().using(info).build() - execute(args) + job = scope.launch { + execute(args) + } } - suspend fun runWithProgress() { + fun runWithProgress() { val args = FfmpegWorkerArgumentsBuilder().using(info).buildWithProgress() - execute(args) + job = scope.launch { + execute(args) + } } + private suspend fun startIAmAlive() { + scope.launch { + while (scope.isActive && job?.isCompleted != true) { + delay(Duration.ofMinutes(5).toMillis()) + listener.onIAmAlive(referenceId, eventId) + } + } + } + + fun cancel(message: String = "Work was interrupted as requested") { + job?.cancel() + scope.cancel(message) + listener.onError(referenceId, eventId, info, message) + } + + private suspend fun execute(args: List) { withContext(Dispatchers.IO) { logFile.createNewFile() } + startIAmAlive() listener.onStarted(referenceId, eventId, info) val processOp = process( ProcesserEnv.ffmpeg, *args.toTypedArray(), @@ -67,7 +101,8 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe //log.info { it } onOutputChanged(it) }, - destroyForcibly = true) + destroyForcibly = true + ) val result = processOp onOutputChanged("Received exit code: ${result.resultCode}") @@ -86,7 +121,7 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe decoder.parseVideoProgress(outputCache.toList())?.let { decoded -> try { val _progress = decoder.getProgress(decoded) - if (progress == null || _progress.progress > (progress?.progress ?: -1) ) { + if (progress == null || _progress.progress > (progress?.progress ?: -1)) { progress = _progress listener.onProgressChanged(referenceId, eventId, info, _progress) } @@ -107,8 +142,14 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe } interface FfmpegWorkerEvents { - fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated,) + fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) - fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) + fun onProgressChanged( + referenceId: String, + eventId: String, + info: FfmpegWorkRequestCreated, + progress: FfmpegDecodedProgress + ) + fun onIAmAlive(referenceId: String, eventId: String) {} } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt index c30a5960..980620e2 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt @@ -2,10 +2,7 @@ package no.iktdev.mediaprocessing.processer.services import mu.KotlinLogging import no.iktdev.mediaprocessing.processer.Coordinator -import no.iktdev.mediaprocessing.processer.persistentReader -import no.iktdev.mediaprocessing.processer.persistentWriter -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.processer.eventManager import org.springframework.beans.factory.annotation.Autowired import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled @@ -21,13 +18,12 @@ class ClaimsService() { @Scheduled(fixedDelay = (300_000)) fun validateClaims() { - val expiredClaims = persistentReader.getExpiredClaimsProcessEvents() + val expiredClaims = eventManager.getProcessEventsWithExpiredClaim() expiredClaims.forEach { log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" } } - val store = persistentWriter expiredClaims.forEach { - val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId) + val result = eventManager.deleteProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId) if (result) { log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" } } else { diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index fe735877..89a792d0 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -28,19 +28,29 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired private val log = KotlinLogging.logger {} private val logDir = ProcesserEnv.encodeLogDirectory - override val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED + override val producesEvent = KafkaEvents.EventWorkEncodePerformed override val requiredEvents: List = listOf( - KafkaEvents.EVENT_WORK_ENCODE_CREATED + KafkaEvents.EventWorkEncodeCreated ) - val scope = Coroutines.io() private var runner: FfmpegWorker? = null - private var runnerJob: Job? = null val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" + + private final val coordinatorEvents = object: Coordinator.CoordinatorEvents { + override fun onCancelOrStopProcess(eventId: String) { + cancelWorkIfRunning(eventId) + } + } + init { log.info { "Starting with id: $serviceId" } } + override fun attachListener() { + super.attachListener() + coordinator.addCoordinatorEventListener(listener = coordinatorEvents) + } + override fun prerequisitesRequired(events: List): List<() -> Boolean> { return super.prerequisitesRequired(events) + listOf { @@ -53,16 +63,16 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired return null } if (event.data !is FfmpegWorkRequestCreated) { - return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}") + return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}", event.eventId) } - val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId) if (isAlreadyClaimed) { log.warn { "Process is already claimed!" } return null } - if (runnerJob?.isActive != true) { + if (runner?.isWorking() != true) { startEncode(event) } else { log.warn { "Worker is already running.." } @@ -78,19 +88,17 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired logDir.mkdirs() } - val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) + val setClaim = eventManager.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} encode" } runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") // Setting consumed to prevent spamming - persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + eventManager.setProcessEventCompleted(event.referenceId, event.eventId) return } - runnerJob = scope.launch { - runner!!.runWithProgress() - } + runner?.runWithProgress() } else { log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" } @@ -105,7 +113,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired return } log.info { "Encode started for ${runner.referenceId}" } - persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) + eventManager.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) sendProgress(referenceId, eventId, status = WorkStatus.Started, info, FfmpegDecodedProgress( progress = 0, time = "Unkown", @@ -113,13 +121,6 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired speed = "0", ) ) - - scope.launch { - while (runnerJob?.isActive == true) { - delay(java.time.Duration.ofMinutes(5).toMillis()) - persistentWriter.updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, serviceId) - } - } } override fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) { @@ -129,18 +130,18 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired return } log.info { "Encode completed for ${runner.referenceId}" } - val consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) + val consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId) runBlocking { delay(1000) if (!consumedIsSuccessful) { - persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) + eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId) } delay(1000) - var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) + var readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId) while (!readbackIsSuccess) { delay(1000) - readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) + readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId) } producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, data = ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile) @@ -179,6 +180,10 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired sendProgress(referenceId, eventId, WorkStatus.Working, info, progress) } + override fun onIAmAlive(referenceId: String, eventId: String) { + super.onIAmAlive(referenceId, eventId) + eventManager.setProcessEventClaimRefresh(referenceId, eventId, serviceId) + } } fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null) { @@ -195,13 +200,20 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired fun clearWorker() { - this.runner?.scope?.cancel() this.runner = null } @PreDestroy fun shutdown() { - scope.cancel() - runner?.scope?.cancel("Stopping application") + runner?.cancel("Stopping application") } + + fun cancelWorkIfRunning(eventId: String) { + if (runner?.eventId == eventId) { + runner?.cancel() + } + } + + + } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index aeb37615..47e39c74 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -30,12 +30,11 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire private val logDir = ProcesserEnv.extractLogDirectory - override val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED + override val producesEvent = KafkaEvents.EventWorkExtractPerformed val scope = Coroutines.io() private var runner: FfmpegWorker? = null - private var runnerJob: Job? = null val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" init { @@ -43,7 +42,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire } override val requiredEvents: List - get() = listOf(KafkaEvents.EVENT_WORK_EXTRACT_CREATED) + get() = listOf(KafkaEvents.EventWorkExtractCreated) override fun prerequisitesRequired(events: List): List<() -> Boolean> { return super.prerequisitesRequired(events) + listOf { @@ -56,16 +55,16 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire return null } if (event.data !is FfmpegWorkRequestCreated) { - return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}") + return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}", event.eventId) } - val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId) if (isAlreadyClaimed) { log.warn { "Process is already claimed!" } return null } - if (runnerJob?.isActive != true) { + if (runner?.isWorking() != true) { startExtract(event) } else { log.warn { "Worker is already running.." } @@ -82,7 +81,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire } - val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) + val setClaim = eventManager.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} extract" } runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents) @@ -90,12 +89,10 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") // Setting consumed to prevent spamming - persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + eventManager.setProcessEventCompleted(event.referenceId, event.eventId) return } - runnerJob = scope.launch { - runner!!.run() - } + runner!!.run() } else { log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" } } @@ -110,7 +107,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire return } log.info { "Extract started for ${runner.referenceId}" } - persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) + eventManager.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) sendProgress(referenceId, eventId, WorkStatus.Started, info) } @@ -121,12 +118,12 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire return } log.info { "Extract completed for ${runner.referenceId}" } - var consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) + var consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId) runBlocking { delay(1000) limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) { - consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) + consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId) } log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" } @@ -134,9 +131,9 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire - var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) + var readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId) limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) { - readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) + readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId) log.info { readbackIsSuccess } } log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" } @@ -189,13 +186,12 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire fun clearWorker() { - this.runner?.scope?.cancel() this.runner = null } @PreDestroy fun shutdown() { scope.cancel() - runner?.scope?.cancel("Stopping application") + runner?.cancel("Stopping application") } } \ No newline at end of file diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceTest.kt new file mode 100644 index 00000000..22f0f943 --- /dev/null +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceTest.kt @@ -0,0 +1,4 @@ +package no.iktdev.mediaprocessing.processer.services + +class EncodeServiceTest { +} \ No newline at end of file diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt index b08d20d8..3edde598 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt @@ -59,7 +59,7 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo private fun getCurrentState(events: List, processes: Map): SummaryState { val stored = events.findLast { it.event == KafkaEvents.EVENT_COLLECT_AND_STORE } - val started = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED } + val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted } val completedMediaEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED } val completedRequestEvent = events.findLast { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED } @@ -79,9 +79,9 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo } val workPrepared = events.filter { it.event in listOf( - KafkaEvents.EVENT_WORK_EXTRACT_CREATED, - KafkaEvents.EVENT_WORK_CONVERT_CREATED, - KafkaEvents.EVENT_WORK_ENCODE_CREATED + KafkaEvents.EventWorkExtractCreated, + KafkaEvents.EventWorkConvertCreated, + KafkaEvents.EventWorkEncodeCreated ) } if (workPrepared.isNotEmpty()) { return SummaryState.Pending @@ -92,29 +92,29 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo } val perparation = events.filter { it.event in listOf( - KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED, - KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, + KafkaEvents.EventMediaParameterExtractCreated, + KafkaEvents.EventMediaParameterEncodeCreated, ) } if (perparation.isNotEmpty()) { return SummaryState.Preparing } - val analyzed2 = events.findLast { it.event in listOf(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) } + val analyzed2 = events.findLast { it.event in listOf(KafkaEvents.EventMediaReadOutNameAndType) } if (analyzed2 != null) { return SummaryState.Analyzing } - val waitingForMeta = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED } + val waitingForMeta = events.findLast { it.event == KafkaEvents.EventMediaMetadataSearchPerformed } if (waitingForMeta != null) { return SummaryState.Metadata } - val analyzed = events.findLast { it.event in listOf(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) } + val analyzed = events.findLast { it.event in listOf(KafkaEvents.EventMediaParseStreamPerformed, KafkaEvents.EventMediaReadBaseInfoPerformed, KafkaEvents.EventMediaReadOutNameAndType) } if (analyzed != null) { return SummaryState.Analyzing } - val readEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED } + val readEvent = events.findLast { it.event == KafkaEvents.EventMediaReadStreamPerformed } if (readEvent != null) { return SummaryState.Read } @@ -133,10 +133,10 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo val processesStatuses = getCurrentStateFromProcesserEvents(procM) val messageStatus = getCurrentState(it, processesStatuses) - val baseNameEvent = it.lastOrNull {ke -> ke.event == KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED }?.data.let { data -> + val baseNameEvent = it.lastOrNull {ke -> ke.event == KafkaEvents.EventMediaReadBaseInfoPerformed }?.data.let { data -> if (data is BaseInfoPerformed) data else null } - val mediaNameEvent = it.lastOrNull { ke -> ke.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }?.data.let { data -> + val mediaNameEvent = it.lastOrNull { ke -> ke.event == KafkaEvents.EventMediaReadOutNameAndType }?.data.let { data -> if (data is VideoInfoPerformed) data else null } diff --git a/shared/common/build.gradle.kts b/shared/common/build.gradle.kts index 0e9a1311..0af13992 100644 --- a/shared/common/build.gradle.kts +++ b/shared/common/build.gradle.kts @@ -1,7 +1,7 @@ plugins { id("java") kotlin("jvm") - + id("org.jetbrains.kotlin.plugin.serialization") version "1.5.0" // Legg til Kotlin Serialization-plugin } group = "no.iktdev.mediaprocessing.shared" @@ -48,9 +48,13 @@ dependencies { testImplementation("io.mockk:mockk:1.12.0") testImplementation("com.h2database:h2:1.4.200") + testImplementation("org.assertj:assertj-core:3.4.1") + testImplementation("org.junit.jupiter:junit-jupiter-api:5.7.2") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.7.2") testImplementation("io.kotlintest:kotlintest-assertions:3.3.2") + testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0") + } tasks.test { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt index b055b9c7..890988f5 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt @@ -20,6 +20,8 @@ abstract class DataSource(val config: DatabaseConnectionConfig) { abstract fun toConnectionUrl(): String + abstract fun toDatabaseConnectionUrl(database: String): String + fun toPortedAddress(): String { var baseAddress = config.address if (!config.port.isNullOrBlank()) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt index 243d39a5..53b0065b 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt @@ -17,7 +17,7 @@ open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) { override fun createDatabase(): Database? { val ok = transaction(toDatabaseServerConnection()) { val tmc = TransactionManager.current().connection - val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.databaseName}'" + val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.databaseName}';" val stmt = tmc.prepareStatement(query, true) val resultSet = stmt.executeQuery() @@ -52,7 +52,7 @@ open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) { } override fun createDatabaseStatement(): String { - return "CREATE DATABASE ${config.databaseName}" + return "CREATE DATABASE ${config.databaseName};" } protected fun toDatabaseServerConnection(): Database { @@ -66,7 +66,7 @@ open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) { override fun toDatabase(): Database { val database = Database.connect( - "${toConnectionUrl()}/${config.databaseName}", + toDatabaseConnectionUrl(config.databaseName), user = config.username, password = config.password ) @@ -74,6 +74,10 @@ open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) { return database } + override fun toDatabaseConnectionUrl(database: String): String { + return toConnectionUrl() + "/$database" + } + override fun toConnectionUrl(): String { return "jdbc:mysql://${toPortedAddress()}" } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt index cbd2c6be..28e409a5 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt @@ -1,10 +1,12 @@ package no.iktdev.mediaprocessing.shared.common.datasource +import org.jetbrains.exposed.exceptions.ExposedSQLException import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Table import org.jetbrains.exposed.sql.transactions.transaction import java.sql.Connection +import java.sql.SQLIntegrityConstraintViolationException open class TableDefaultOperations { @@ -46,6 +48,11 @@ fun withTransaction(db: Database? = null, block: () -> T): T? { null } } +fun withTransaction(db: DataSource? = null, block: () -> T): T? { + return withTransaction(db?.database, block) +} + + fun insertWithSuccess(db: Database? = null, block: () -> T): Boolean { return try { @@ -125,6 +132,18 @@ fun executeWithStatus(db: Database? = null, block: () -> T): Boolean { } } - +fun executeWithStatus(db: DataSource? = null, block: () -> T): Boolean { + return executeWithStatus(db?.database, block) +} + +fun Exception.isExposedSqlException(): Boolean { + return this is ExposedSQLException +} + +fun ExposedSQLException.isCausedByDuplicateError(): Boolean { + return if (this.cause is SQLIntegrityConstraintViolationException) { + return this.errorCode == 1062 + } else false +} diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt index c9442016..40973623 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -11,6 +11,7 @@ import java.time.LocalDateTime class PersistentDataReader(var dataSource: DataSource) { val dzz = DeserializingRegistry() + @Deprecated("Use PersistentEventManager.getAllEventsGrouped") fun getAllMessages(): List> { val events = withTransaction(dataSource.database) { events.selectAll() @@ -19,6 +20,7 @@ class PersistentDataReader(var dataSource: DataSource) { return events?.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } ?: emptyList() } + @Deprecated("Use PersistentEventManager.getEvetnsWith") fun getMessagesFor(referenceId: String): List { return withTransaction(dataSource.database) { events.select { events.referenceId eq referenceId } @@ -27,6 +29,7 @@ class PersistentDataReader(var dataSource: DataSource) { } ?: emptyList() } + @Deprecated("Use PersistentEventManager.getEventsUncompleted") fun getUncompletedMessages(): List> { val result = withDirtyRead(dataSource.database) { events.selectAll() @@ -37,6 +40,7 @@ class PersistentDataReader(var dataSource: DataSource) { return result } + @Deprecated(message = "Use PersistentEventManager.isProcessEventCompleted") fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean { val result = withDirtyRead(dataSource.database) { processerEvents.select { @@ -47,6 +51,7 @@ class PersistentDataReader(var dataSource: DataSource) { return result?.claimed ?: true } + @Deprecated(message = "Use PersistentEventManager.isProcessEventCompleted") fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean { return withDirtyRead(dataSource.database) { processerEvents.select { @@ -57,6 +62,7 @@ class PersistentDataReader(var dataSource: DataSource) { }?.singleOrNull()?.consumed ?: false } + @Deprecated(message = "Use PersistentEventManager.getProcessEventsClaimable") fun getAvailableProcessEvents(): List { return withDirtyRead(dataSource.database) { processerEvents.select { @@ -66,6 +72,7 @@ class PersistentDataReader(var dataSource: DataSource) { } ?: emptyList() } + @Deprecated("Use PersistentEventManager.getProcessEventsWithExpiredClaim") fun getExpiredClaimsProcessEvents(): List { val deadline = LocalDateTime.now() val entries = withTransaction(dataSource.database) { @@ -77,6 +84,7 @@ class PersistentDataReader(var dataSource: DataSource) { return entries.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline } } + @Deprecated("Use PersistentEventManager.getProcessEventWith") fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? { val message = withDirtyRead(dataSource.database) { processerEvents.select { @@ -87,6 +95,7 @@ class PersistentDataReader(var dataSource: DataSource) { return message } + @Deprecated("Use PersistentEventManager.getAllEventsProcesser") fun getProcessEvents(): List { return withTransaction(dataSource.database) { processerEvents.selectAll() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt new file mode 100644 index 00000000..b6f06a86 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -0,0 +1,292 @@ +package no.iktdev.mediaprocessing.shared.common.persistance + +import kotlinx.coroutines.launch +import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.shared.common.datasource.* +import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import org.jetbrains.exposed.exceptions.ExposedSQLException +import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq +import org.jetbrains.exposed.sql.javatime.CurrentDateTime +import java.sql.SQLIntegrityConstraintViolationException +import java.time.LocalDateTime +import javax.xml.crypto.Data +import kotlin.coroutines.coroutineContext + +private val log = KotlinLogging.logger {} + +class PersistentEventManager(private val dataSource: DataSource) { + val dzz = DeserializingRegistry() + + + /** + * Deletes the events + */ + private fun deleteSupersededEvents(superseded: List) { + withTransaction(dataSource) { + superseded.forEach { duplicate -> + events.deleteWhere { + (events.referenceId eq duplicate.referenceId) and + (events.eventId eq duplicate.eventId) and + (events.event eq duplicate.event.event) + } + } + } + } + + + /** + * @param referenceId Reference + * @param eventId Current eventId for the message, required to prevent deletion of itself + * @param event Current event for the message + */ + private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents) { + val present = getEventsWith(referenceId).filter { it.eventId != eventId } + + val superseded = present.filter { it.event == event && it.eventId != eventId } + val availableForRemoval = mutableListOf() + val helper = PersistentMessageHelper(present) + superseded.forEach { availableForRemoval.addAll(helper.getCascadingFrom(it.eventId)) } + + deleteSupersededEvents(availableForRemoval) + + } + + + //region Database read + + fun getEventsWith(referenceId: String): List { + return withDirtyRead(dataSource.database) { + events.select { + (events.referenceId eq referenceId) + } + .orderBy(events.created, SortOrder.ASC) + .toPersistentMessage(dzz) + } ?: emptyList() + } + + fun getProcessEventWith(referenceId: String, eventId: String): PersistentProcessDataMessage? { + return withDirtyRead(dataSource.database) { + processerEvents.select { + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) + }.toPersistentProcesserMessage(dzz) + }?.singleOrNull() + } + + fun getAllEvents(): List { + return withDirtyRead(dataSource.database) { + events.selectAll() + .toPersistentMessage(dzz) + } ?: emptyList() + } + + fun getAllEventsGrouped(): List> { + return getAllEvents().toGrouped() + } + + fun getAllProcessEvents(): List { + return withDirtyRead(dataSource.database) { + processerEvents.selectAll() + .toPersistentProcesserMessage(dzz) + } ?: emptyList() + } + + fun getEventsUncompleted(): List> { + val identifiesAsCompleted = listOf( + KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED, + KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED, + KafkaEvents.EVENT_COLLECT_AND_STORE + ) + val all = getAllEventsGrouped() + return all.filter { entry -> entry.none { it.event in identifiesAsCompleted } } + } + + fun getProcessEventsUncompleted(): List { + return withTransaction(dataSource.database) { + processerEvents.select { + (processerEvents.consumed eq false) + }.toPersistentProcesserMessage(dzz) + } ?: emptyList() + } + + fun getProcessEventsClaimable(): List { + return withTransaction(dataSource.database) { + processerEvents.select { + (processerEvents.consumed eq false) and + (processerEvents.claimed eq false) + }.toPersistentProcesserMessage(dzz) + } ?: emptyList() + } + + fun getProcessEventsWithExpiredClaim(): List { + val deadline = LocalDateTime.now() + return getProcessEventsUncompleted() + .filter { it.claimed && if (it.lastCheckIn != null) it.lastCheckIn.plusMinutes(15) < deadline else true } + } + + fun isProcessEventClaimed(referenceId: String, eventId: String): Boolean { + return getProcessEventWith(referenceId, eventId)?.claimed ?: false + } + + fun isProcessEventCompleted(referenceId: String, eventId: String): Boolean { + return getProcessEventWith(referenceId, eventId)?.consumed ?: false + } + + //endregion + + //region Database write + + /** + * Stores the kafka event and its data in the database as PersistentMessage + * @param event KafkaEvents + * @param message Kafka message object + */ + fun setEvent(event: KafkaEvents, message: Message<*>): Boolean { + val existing = getEventsWith(message.referenceId) + val derivedId = message.data?.derivedFromEventId + if (derivedId != null) { + val isNewEventOrphan = existing.none { it.eventId == derivedId } + if (isNewEventOrphan) { + log.warn { "Message not saved! ${message.referenceId} with eventId(${message.eventId}) has derivedEventId($derivedId) which does not exist!" } + return false + } + } + + val exception = executeOrException(dataSource.database) { + events.insert { + it[referenceId] = message.referenceId + it[eventId] = message.eventId + it[events.event] = event.event + it[data] = message.dataAsJson() + } + } + val success = if (exception != null) { + if (exception.isExposedSqlException()) { + if ((exception as ExposedSQLException).isCausedByDuplicateError()) { + log.info { "Error is of SQLIntegrityConstraintViolationException" } + } else { + log.info { "Error code is: ${exception.errorCode}" } + exception.printStackTrace() + } + } else { + exception.printStackTrace() + } + false + } else { + true + } + if (success) { + deleteSupersededEvents(referenceId = message.referenceId, eventId = message.eventId, event = event) + } + return success + } + + fun setProcessEvent(event: KafkaEvents, message: Message<*>): Boolean { + val exception = executeOrException(dataSource.database) { + processerEvents.insert { + it[processerEvents.referenceId] = message.referenceId + it[processerEvents.eventId] = message.eventId + it[processerEvents.event] = event.event + it[processerEvents.data] = message.dataAsJson() + } + } + return if (exception != null) { + if (exception.isExposedSqlException()) { + if ((exception as ExposedSQLException).isCausedByDuplicateError()) { + log.info { "Error is of SQLIntegrityConstraintViolationException" } + } else { + log.info { "Error code is: ${exception.errorCode}" } + exception.printStackTrace() + } + } + false + } else { + true + } + } + + fun setProcessEventClaim(referenceId: String, eventId: String, claimer: String): Boolean { + return executeWithStatus(dataSource.database) { + processerEvents.update({ + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) and + (processerEvents.claimed eq false) and + (processerEvents.consumed eq false) + }) { + it[claimedBy] = claimer + it[lastCheckIn] = CurrentDateTime + it[claimed] = true + } + } + } + + fun setProcessEventCompleted(referenceId: String, eventId: String): Boolean { + return executeWithStatus(dataSource) { + processerEvents.update({ + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) + }) { + it[consumed] = true + it[claimed] = true + } + } + } + + fun setProcessEventClaimRefresh(referenceId: String, eventId: String, claimer: String): Boolean { + return executeWithStatus(dataSource) { + processerEvents.update({ + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) and + (processerEvents.claimed eq true) and + (processerEvents.claimedBy eq claimer) + }) { + it[lastCheckIn] = CurrentDateTime + } + } + } + + /** + * Removes the claim set on the process event + */ + fun deleteProcessEventClaim(referenceId: String, eventId: String): Boolean { + return executeWithStatus(dataSource) { + processerEvents.update({ + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) + }) { + it[claimed] = false + it[claimedBy] = null + it[lastCheckIn] = null + } + } + } + + fun deleteProcessEvent(referenceId: String, eventId: String): Boolean { + return executeWithStatus (dataSource) { + processerEvents.deleteWhere { + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) + } + } + } + + //endregion + +} + + +fun List?.toGrouped(): List> { + return this?.groupBy { it.referenceId }?.mapNotNull { it.value } ?: emptyList() +} + +fun Query?.toPersistentMessage(dzz: DeserializingRegistry): List { + return this?.mapNotNull { fromRowToPersistentMessage(it, dzz) } ?: emptyList() +} + +fun Query?.toPersistentProcesserMessage(dzz: DeserializingRegistry): List { + return this?.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } ?: emptyList() +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index 8fdedfd6..05f8e867 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -38,7 +38,40 @@ fun PersistentMessage.isSkipped(): Boolean { } } +class PersistentMessageHelper(val messages: List) { + fun findOrphanedEvents(): List { + val withDerivedId = messages.filter { it.data.derivedFromEventId != null } + val idsFlat = messages.map { it.eventId } + return withDerivedId.filter { it.data.derivedFromEventId !in idsFlat } + } + + fun getCascadingFrom(eventId: String): List { + val triggered = messages.firstOrNull { it.eventId == eventId } ?: return emptyList() + val usableEvents = messages.filter { it.eventId != eventId && it.data.derivedFromEventId != null } + + val derivedEventsMap = mutableMapOf>() + for (event in usableEvents) { + derivedEventsMap.getOrPut(event.data.derivedFromEventId!!) { mutableListOf() }.add(event.eventId) + } + val eventsToDelete = mutableSetOf() + + // Utfør DFS for å finne alle avledede hendelser som skal slettes + dfs(triggered.eventId, derivedEventsMap, eventsToDelete) + + return messages.filter { it.eventId in eventsToDelete } + } + + /** + * @param eventId Initial eventId + */ + fun dfs(eventId: String, derivedEventsMap: Map>, eventsToDelete: MutableSet) { + eventsToDelete.add(eventId) + derivedEventsMap[eventId]?.forEach { derivedEventId -> + dfs(derivedEventId, derivedEventsMap, eventsToDelete) + } + } +} fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? { val kev = try { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt index 635e9a62..736c935a 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt @@ -25,7 +25,7 @@ abstract class TaskCreatorImpl, V, L : EventBasedMessa return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = reactableEvents) } @PostConstruct - fun attachListener() { + open fun attachListener() { coordinator.listeners.add(getListener()) } diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt index bffcaa7a..e9abdea1 100644 --- a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt @@ -11,9 +11,21 @@ import javax.sql.DataSource class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: String) : DataSource, MySqlDataSource( DatabaseConnectionConfig( - databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = jdbcDataSource.password, port = null + databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = "", port = null ) ) { + + companion object { + val connectionUrl = "jdbc:h2:test;MODE=MySQL" //"jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;" + fun getDatasource(): JdbcDataSource { + val ds = JdbcDataSource() + ds.setUrl(connectionUrl) + ds.user = "test" + ds.password = "" + return ds + } + } + override fun getConnection(): Connection { return jdbcDataSource.connection } @@ -61,6 +73,6 @@ class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: Str } override fun toConnectionUrl(): String { - return "jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;" + return connectionUrl // "jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;" } } diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource2.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource2.kt new file mode 100644 index 00000000..ce799afa --- /dev/null +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource2.kt @@ -0,0 +1,23 @@ +package no.iktdev.mediaprocessing.shared.common + +import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig +import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource +import org.jetbrains.exposed.sql.Database + +class H2DataSource2(conf: DatabaseConnectionConfig): MySqlDataSource(conf) { + + override fun createDatabaseStatement(): String { + return "CREATE SCHEMA ${config.databaseName};" + } + + override fun toDatabaseConnectionUrl(database: String): String { + return toConnectionUrl() + } + override fun toDatabase(): Database { + return super.toDatabase() + } + override fun toConnectionUrl(): String { + return "jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;CASE_INSENSITIVE_IDENTIFIERS=TRUE;" + } + +} \ No newline at end of file diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/PersistentMessageFromJsonDump.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/PersistentMessageFromJsonDump.kt new file mode 100644 index 00000000..e328b2ca --- /dev/null +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/PersistentMessageFromJsonDump.kt @@ -0,0 +1,56 @@ +package no.iktdev.mediaprocessing.shared.common + +import kotlinx.serialization.json.* +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.persistance.events +import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import org.json.JSONArray +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter + + +class PersistentMessageFromJsonDump(events: String) { + private var data: JsonArray? + + init { + val jsonArray = Json.parseToJsonElement(events) as JsonArray + data = jsonArray.firstOrNull { it.jsonObject["data"] != null }?.jsonObject?.get("data") as? JsonArray + } + + fun getPersistentMessages(): List { + return data?.mapNotNull { + try { + mapToPersistentMessage(it) + } catch (e: Exception) { + System.err.print(it.toString()) + e.printStackTrace() + null + } + } ?: emptyList() + } + + val dzz = DeserializingRegistry() + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS") + private fun mapToPersistentMessage(e: JsonElement): PersistentMessage? { + val referenceId: String = e.jsonObject["referenceId"]?.jsonPrimitive?.content ?: throw RuntimeException("No ReferenceId found") + val eventId: String = e.jsonObject["eventId"]?.jsonPrimitive?.content ?: throw RuntimeException("No EventId") + val event: String = e.jsonObject["event"]?.jsonPrimitive?.content ?: throw RuntimeException("No Event") + val data: String = e.jsonObject["data"]?.jsonPrimitive?.content ?: throw RuntimeException("No data") + val created: String = e.jsonObject["created"]?.jsonPrimitive?.content ?: throw RuntimeException("No Created date time found") + + val kev = KafkaEvents.toEvent(event) ?: throw RuntimeException("Not able to convert event to Enum") + val dzdata = dzz.deserializeData(kev, data) + + return PersistentMessage( + referenceId = referenceId, + eventId = eventId, + event = kev, + data = dzdata, + created = LocalDateTime.parse(created, formatter) + ) + + } + + +} \ No newline at end of file diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt new file mode 100644 index 00000000..5220eebd --- /dev/null +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/tests/PersistentEventMangerTest.kt @@ -0,0 +1,257 @@ +package no.iktdev.mediaprocessing.shared.common.tests + +import no.iktdev.mediaprocessing.shared.common.H2DataSource2 +import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig +import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager +import no.iktdev.mediaprocessing.shared.common.persistance.events +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData +import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted +import org.junit.jupiter.api.Test +import java.util.UUID +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.exposed.sql.deleteAll + + +class PersistentEventMangerTest { + val defaultReferenceId = UUID.randomUUID().toString() + val dataSource = H2DataSource2(DatabaseConnectionConfig( + address = "", + username = "", + password = "", + databaseName = "test", + port = null + )) + val eventManager: PersistentEventManager = PersistentEventManager(dataSource) + + init { + val kafkaTables = listOf( + events, // For kafka + ) + dataSource.createDatabase() + dataSource.createTables(*kafkaTables.toTypedArray()) + } + + @Test + fun testDatabaseIsCreated() { + val success = dataSource.createDatabase() + assertThat(success).isNotNull() + } + + @Test + fun testDatabaseInit() { + val referenceId = UUID.randomUUID().toString() + val mStart = Message( + referenceId = referenceId, + eventId = UUID.randomUUID().toString(), + data = MediaProcessStarted( + status = Status.COMPLETED, + file = "Nan" + ) + ) + eventManager.setEvent(KafkaEvents.EventMediaProcessStarted, mStart) + val stored = eventManager.getEventsWith(referenceId); + assertThat(stored).isNotEmpty() + } + + @Test + fun testSuperseded1() { + val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()) + val oldStack = listOf( + EventToMessage(KafkaEvents.EventMediaReadStreamPerformed, + createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEvent.message.eventId)), + EventToMessage(KafkaEvents.EventMediaParseStreamPerformed, + createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")), + EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed, + createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")), + EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed, + createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")), + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaReadOutCover, + createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + + EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated, + createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventMediaParameterExtractCreated, + createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + ) + eventManager.setEvent(startEvent.event, startEvent.message) + for (entry in oldStack) { + eventManager.setEvent(entry.event, entry.message) + } + val currentTableWithOldStack = eventManager.getEventsWith(defaultReferenceId) + assertThat(currentTableWithOldStack).hasSize(oldStack.size +1) + + val supersedingStack = listOf( + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "2c3a40bb-2225-4dd4-a8c3-32c6356f8764", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")) + ).forEach {entry -> eventManager.setEvent(entry.event, entry.message)} + + + // Final check + + val result = eventManager.getEventsWith(defaultReferenceId) + val idsThatShouldBeRemoved = listOf( + "9e8f2e04-4950-437f-a203-cfd566203078", + "af7f2519-0f1d-4679-82bd-0314d1b97b68" + ) + val search = result.filter { it.eventId in idsThatShouldBeRemoved } + assertThat(search).isEmpty() + + + val expectedInList = listOf( + startEvent.message.eventId, + "48c72454-6c7b-406b-b598-fc0a961dabde", + "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", + "f6cae204-7c8e-4003-b598-f7b4e566d03e", + "cbb1e871-e9a5-496d-a655-db719ac4903c", + "98a39721-41ff-4d79-905e-ced260478524", + "2c3a40bb-2225-4dd4-a8c3-32c6356f8764" + ) + val searchForExpected = result.map { it.eventId } + assertThat(expectedInList).isEqualTo(searchForExpected) + withTransaction(dataSource) { + events.deleteAll() + } + } + + @Test + fun testSuperseded2() { + val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also { + eventManager.setEvent(it.event, it.message) + } + val keepStack = listOf( + EventToMessage(KafkaEvents.EventMediaReadStreamPerformed, + createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEvent.message.eventId)), + EventToMessage(KafkaEvents.EventMediaParseStreamPerformed, + createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")), + EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed, + createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")), + EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed, + createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")), + EventToMessage(KafkaEvents.EventMediaReadOutCover, + createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val toBeReplaced = listOf( + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated, + createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventMediaParameterExtractCreated, + createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + + val currentTableWithOldStack = eventManager.getEventsWith(defaultReferenceId) + assertThat(currentTableWithOldStack).hasSize(keepStack.size + toBeReplaced.size +1) + + val supersedingStack = listOf( + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "2c3a40bb-2225-4dd4-a8c3-32c6356f8764", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")) + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + + // Final check + + val result = eventManager.getEventsWith(defaultReferenceId) + + val idsRemoved = toBeReplaced.map { it.message.eventId } + val search = result.filter { it.eventId in idsRemoved } + assertThat(search).isEmpty() + + + val expectedInList = listOf(startEvent.message.eventId) + keepStack.map { it.message.eventId } + supersedingStack.map { it.message.eventId } + val searchForExpected = result.map { it.eventId } + assertThat(expectedInList).isEqualTo(searchForExpected) + + withTransaction(dataSource) { + events.deleteAll() + } + } + + @Test + fun testSuperseded3() { + val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also { + eventManager.setEvent(it.event, it.message) + } + val keepStack = listOf( + EventToMessage(KafkaEvents.EventMediaReadStreamPerformed, + createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEvent.message.eventId)), + + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + val toBeReplaced = listOf( + EventToMessage(KafkaEvents.EventMediaParseStreamPerformed, + createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")), + EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed, + createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")), + EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed, + createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")), + EventToMessage(KafkaEvents.EventMediaReadOutCover, + createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaReadOutNameAndType, + createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")), + EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated, + createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + EventToMessage(KafkaEvents.EventMediaParameterExtractCreated, + createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")), + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + + val currentTableWithOldStack = eventManager.getEventsWith(defaultReferenceId) + assertThat(currentTableWithOldStack).hasSize(keepStack.size + toBeReplaced.size +1) + + val supersedingStack = listOf( + EventToMessage(KafkaEvents.EventMediaParseStreamPerformed, + createMessage(eventId = "2c3a40bb-2225-4dd4-a8c3-32c6356f8764", derivedFromEventId = "48c72454-6c7b-406b-b598-fc0a961dabde")) + ).onEach { entry -> eventManager.setEvent(entry.event, entry.message) } + + + // Final check + + val result = eventManager.getEventsWith(defaultReferenceId) + + val idsRemoved = toBeReplaced.map { it.message.eventId } + val search = result.filter { it.eventId in idsRemoved } + assertThat(search).isEmpty() + + + val expectedInList = listOf(startEvent.message.eventId) + keepStack.map { it.message.eventId } + supersedingStack.map { it.message.eventId } + val searchForExpected = result.map { it.eventId } + assertThat(expectedInList).isEqualTo(searchForExpected) + + withTransaction(dataSource) { + events.deleteAll() + } + } + + @Test + fun testDerivedOrphanNotInserted() { + val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also { + eventManager.setEvent(it.event, it.message) + } + val result = eventManager.setEvent(KafkaEvents.EventMediaReadStreamPerformed, + createMessage(derivedFromEventId = UUID.randomUUID().toString())) + assertThat(result).isFalse() + } + + data class EventToMessage(val event: KafkaEvents, val message: Message<*>) + + private fun createMessage(referenceId: String = defaultReferenceId, eventId: String = UUID.randomUUID().toString(), derivedFromEventId: String? = null): Message{ + return Message( + referenceId = referenceId, + eventId = eventId, + data = SimpleMessageData( + status = Status.COMPLETED, + message = "Potato", + derivedFromEventId = derivedFromEventId + ) + ) + } + +} \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt index eec256e3..47a136ca 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt @@ -15,30 +15,29 @@ class DeserializingRegistry { companion object { val deserializables = mutableMapOf( - KafkaEvents.EVENT_MEDIA_PROCESS_STARTED to MediaProcessStarted::class.java, - KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED to ReaderPerformed::class.java, - KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class.java, - KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class.java, - KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED to MetadataPerformed::class.java, - KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE to VideoInfoPerformed::class.java, - KafkaEvents.EVENT_MEDIA_READ_OUT_COVER to CoverInfoPerformed::class.java, - KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED to FfmpegWorkerArgumentsCreated::class.java, - KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED to FfmpegWorkerArgumentsCreated::class.java, - KafkaEvents.EVENT_MEDIA_CONVERT_PARAMETER_CREATED to null, - KafkaEvents.EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED to null, + KafkaEvents.EventMediaProcessStarted to MediaProcessStarted::class.java, + KafkaEvents.EventMediaReadStreamPerformed to ReaderPerformed::class.java, + KafkaEvents.EventMediaParseStreamPerformed to MediaStreamsParsePerformed::class.java, + KafkaEvents.EventMediaReadBaseInfoPerformed to BaseInfoPerformed::class.java, + KafkaEvents.EventMediaMetadataSearchPerformed to MetadataPerformed::class.java, + KafkaEvents.EventMediaReadOutNameAndType to VideoInfoPerformed::class.java, + KafkaEvents.EventMediaReadOutCover to CoverInfoPerformed::class.java, + KafkaEvents.EventMediaParameterEncodeCreated to FfmpegWorkerArgumentsCreated::class.java, + KafkaEvents.EventMediaParameterExtractCreated to FfmpegWorkerArgumentsCreated::class.java, + KafkaEvents.EventMediaParameterConvertCreated to null, + KafkaEvents.EventMediaParameterDownloadCoverCreated to null, - KafkaEvents.EVENT_WORK_ENCODE_CREATED to FfmpegWorkRequestCreated::class.java, - KafkaEvents.EVENT_WORK_EXTRACT_CREATED to FfmpegWorkRequestCreated::class.java, - KafkaEvents.EVENT_WORK_CONVERT_CREATED to ConvertWorkerRequest::class.java, + KafkaEvents.EventNotificationOfWorkItemRemoval to NotificationOfDeletionPerformed::class.java, - KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to ProcesserEncodeWorkPerformed::class.java, - KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to ProcesserExtractWorkPerformed::class.java, - KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to ConvertWorkPerformed::class.java, - KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to CoverDownloadWorkPerformed::class.java, + KafkaEvents.EventWorkEncodeCreated to FfmpegWorkRequestCreated::class.java, + KafkaEvents.EventWorkExtractCreated to FfmpegWorkRequestCreated::class.java, + KafkaEvents.EventWorkConvertCreated to ConvertWorkerRequest::class.java, + + KafkaEvents.EventWorkEncodePerformed to ProcesserEncodeWorkPerformed::class.java, + KafkaEvents.EventWorkExtractPerformed to ProcesserExtractWorkPerformed::class.java, + KafkaEvents.EventWorkConvertPerformed to ConvertWorkPerformed::class.java, + KafkaEvents.EventWorkDownloadCoverPerformed to CoverDownloadWorkPerformed::class.java, - KafkaEvents.EVENT_WORK_ENCODE_SKIPPED to null, - KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null, - KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null, KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED to ProcessCompleted::class.java ) diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index d82d0f84..06adb2c8 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -1,31 +1,34 @@ package no.iktdev.mediaprocessing.shared.kafka.core enum class KafkaEvents(val event: String) { - EVENT_MEDIA_PROCESS_STARTED("event:media-process:started"), + EventMediaProcessStarted("event:media-process:started"), EVENT_REQUEST_PROCESS_STARTED("event:request-process:started"), - EVENT_MEDIA_READ_STREAM_PERFORMED("event:media-read-stream:performed"), - EVENT_MEDIA_PARSE_STREAM_PERFORMED("event:media-parse-stream:performed"), - EVENT_MEDIA_READ_BASE_INFO_PERFORMED("event:media-read-base-info:performed"), - EVENT_MEDIA_METADATA_SEARCH_PERFORMED("event:media-metadata-search:performed"), - EVENT_MEDIA_READ_OUT_NAME_AND_TYPE("event:media-read-out-name-and-type:performed"), - EVENT_MEDIA_READ_OUT_COVER("event:media-read-out-cover:performed"), + EventMediaReadStreamPerformed("event:media-read-stream:performed"), + EventMediaParseStreamPerformed("event:media-parse-stream:performed"), + EventMediaReadBaseInfoPerformed("event:media-read-base-info:performed"), + EventMediaMetadataSearchPerformed("event:media-metadata-search:performed"), + EventMediaReadOutNameAndType("event:media-read-out-name-and-type:performed"), + EventMediaReadOutCover("event:media-read-out-cover:performed"), - EVENT_MEDIA_ENCODE_PARAMETER_CREATED("event:media-encode-parameter:created"), - EVENT_MEDIA_EXTRACT_PARAMETER_CREATED("event:media-extract-parameter:created"), - EVENT_MEDIA_CONVERT_PARAMETER_CREATED("event:media-convert-parameter:created"), - EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED("event:media-download-cover-parameter:created"), + EventMediaParameterEncodeCreated("event:media-encode-parameter:created"), + EventMediaParameterExtractCreated("event:media-extract-parameter:created"), + EventMediaParameterConvertCreated("event:media-convert-parameter:created"), + EventMediaParameterDownloadCoverCreated("event:media-download-cover-parameter:created"), - EVENT_MEDIA_WORK_PROCEED_PERMITTED("event:media-work-proceed:permitted"), + EventMediaWorkProceedPermitted("event:media-work-proceed:permitted"), - EVENT_WORK_ENCODE_CREATED("event:work-encode:created"), - EVENT_WORK_EXTRACT_CREATED("event:work-extract:created"), - EVENT_WORK_CONVERT_CREATED("event:work-convert:created"), + // This event is to be used for commuincating across all appss taht an event has ben removed and to rterminate existint events + EventNotificationOfWorkItemRemoval("event:notification-work-item-removal"), - EVENT_WORK_ENCODE_PERFORMED("event:work-encode:performed"), - EVENT_WORK_EXTRACT_PERFORMED("event:work-extract:performed"), - EVENT_WORK_CONVERT_PERFORMED("event:work-convert:performed"), - EVENT_WORK_DOWNLOAD_COVER_PERFORMED("event:work-download-cover:performed"), + EventWorkEncodeCreated("event:work-encode:created"), + EventWorkExtractCreated("event:work-extract:created"), + EventWorkConvertCreated("event:work-convert:created"), + + EventWorkEncodePerformed("event:work-encode:performed"), + EventWorkExtractPerformed("event:work-extract:performed"), + EventWorkConvertPerformed("event:work-convert:performed"), + EventWorkDownloadCoverPerformed("event:work-download-cover:performed"), EVENT_STORE_VIDEO_PERFORMED("event:store-video:performed"), @@ -45,13 +48,13 @@ enum class KafkaEvents(val event: String) { fun isOfWork(event: KafkaEvents): Boolean { return event in listOf( - EVENT_WORK_CONVERT_CREATED, - EVENT_WORK_EXTRACT_CREATED, - EVENT_WORK_ENCODE_CREATED, + EventWorkConvertCreated, + EventWorkExtractCreated, + EventWorkEncodeCreated, - EVENT_WORK_ENCODE_PERFORMED, - EVENT_WORK_CONVERT_PERFORMED, - EVENT_WORK_EXTRACT_PERFORMED + EventWorkEncodePerformed, + EventWorkConvertPerformed, + EventWorkExtractPerformed ) } } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt index 56174eaa..e16e623c 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt @@ -3,15 +3,17 @@ package no.iktdev.mediaprocessing.shared.kafka.dto abstract class MessageDataWrapper( @Transient open val status: Status = Status.ERROR, - @Transient open val message: String? = null + @Transient open val message: String? = null, + @Transient open val derivedFromEventId: String? = null ) data class SimpleMessageData( override val status: Status, - override val message: String? = null -) : MessageDataWrapper(status, message) + override val message: String? = null, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, message, derivedFromEventId) fun MessageDataWrapper?.isSuccess(): Boolean { diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/BaseInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/BaseInfoPerformed.kt index b3e19fd9..7efc60fc 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/BaseInfoPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/BaseInfoPerformed.kt @@ -5,12 +5,13 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED) +@KafkaBelongsToEvent(KafkaEvents.EventMediaReadBaseInfoPerformed) data class BaseInfoPerformed( override val status: Status, val title: String, - val sanitizedName: String -) : MessageDataWrapper(status) + val sanitizedName: String, + override val derivedFromEventId: String +) : MessageDataWrapper(status = status, derivedFromEventId = derivedFromEventId) fun BaseInfoPerformed?.hasValidData(): Boolean { return this != null && this.title.isNotBlank() && this.sanitizedName.isNotBlank() diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt index f53f3693..dfceeba8 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt @@ -5,11 +5,11 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_PERFORMED) +@KafkaBelongsToEvent(KafkaEvents.EventWorkConvertPerformed) data class ConvertWorkPerformed( override val status: Status, override val message: String? = null, val producedBy: String, - val derivedFromEventId: String, + override val derivedFromEventId: String, val outFiles: List = listOf() -): MessageDataWrapper(status, message) \ No newline at end of file +): MessageDataWrapper(status, message, derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt index 38d32b95..2646b497 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt @@ -6,13 +6,14 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_CREATED) +@KafkaBelongsToEvent(KafkaEvents.EventWorkConvertCreated) data class ConvertWorkerRequest( override val status: Status, val requiresEventId: String? = null, + override val derivedFromEventId: String? = null, val inputFile: String, val allowOverwrite: Boolean, val outFileBaseName: String, val outDirectory: String, val outFormats: List = listOf() -): MessageDataWrapper(status) \ No newline at end of file +): MessageDataWrapper(status, derivedFromEventId = derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverDownloadWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverDownloadWorkPerformed.kt index 4565bf8f..ba90b57f 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverDownloadWorkPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverDownloadWorkPerformed.kt @@ -5,9 +5,10 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED) +@KafkaBelongsToEvent(KafkaEvents.EventWorkDownloadCoverPerformed) data class CoverDownloadWorkPerformed( override val status: Status, override val message: String? = null, - val coverFile: String -): MessageDataWrapper(status, message) + val coverFile: String, + override val derivedFromEventId: String? +): MessageDataWrapper(status, message, derivedFromEventId = derivedFromEventId) diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverInfoPerformed.kt index 851d76d6..567e929a 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverInfoPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverInfoPerformed.kt @@ -5,11 +5,11 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_COVER) +@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutCover) data class CoverInfoPerformed( override val status: Status, val url: String, val outDir: String, - val outFileBaseName: String -) - : MessageDataWrapper(status) \ No newline at end of file + val outFileBaseName: String, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt index dfe44cf7..dbb09c27 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt @@ -6,13 +6,13 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status @KafkaBelongsToEvent( - KafkaEvents.EVENT_WORK_ENCODE_CREATED, - KafkaEvents.EVENT_WORK_EXTRACT_CREATED + KafkaEvents.EventWorkEncodeCreated, + KafkaEvents.EventWorkExtractCreated ) data class FfmpegWorkRequestCreated( override val status: Status, - val derivedFromEventId: String, val inputFile: String, val arguments: List, - val outFile: String -): MessageDataWrapper(status) \ No newline at end of file + val outFile: String, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkerArgumentsCreated.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkerArgumentsCreated.kt index e0b8e355..891da313 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkerArgumentsCreated.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkerArgumentsCreated.kt @@ -12,15 +12,15 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status * @param arguments Requires arguments, instructions for what ffmpeg should do */ @KafkaBelongsToEvent( - KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, - KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED + KafkaEvents.EventMediaParameterEncodeCreated, + KafkaEvents.EventMediaParameterExtractCreated ) data class FfmpegWorkerArgumentsCreated( override val status: Status, val inputFile: String, // absolutePath - val entries: List -): - MessageDataWrapper(status) + val entries: List, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) data class FfmpegWorkerArgument( val outputFile: String, diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt index de05cebd..caba7b5e 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt @@ -7,7 +7,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) +@KafkaBelongsToEvent(KafkaEvents.EventMediaProcessStarted) data class MediaProcessStarted( override val status: Status, val type: ProcessType = ProcessType.FLOW, diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt index 7164a584..fe86829a 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt @@ -6,9 +6,9 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED) +@KafkaBelongsToEvent(KafkaEvents.EventMediaParseStreamPerformed) data class MediaStreamsParsePerformed( override val status: Status, - val streams: ParsedMediaStreams - -): MessageDataWrapper(status) \ No newline at end of file + val streams: ParsedMediaStreams, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MetadataPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MetadataPerformed.kt index f7133b61..f7f1a5ea 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MetadataPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MetadataPerformed.kt @@ -5,12 +5,13 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED) +@KafkaBelongsToEvent(KafkaEvents.EventMediaMetadataSearchPerformed) data class MetadataPerformed( override val status: Status, override val message: String? = null, - val data: pyMetadata? = null - ) : MessageDataWrapper(status, message) + val data: pyMetadata? = null, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) data class pyMetadata( val title: String, diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/NotificationOfDeletionPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/NotificationOfDeletionPerformed.kt new file mode 100644 index 00000000..a83f8552 --- /dev/null +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/NotificationOfDeletionPerformed.kt @@ -0,0 +1,15 @@ +package no.iktdev.mediaprocessing.shared.kafka.dto.events_result + +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.Status + +@KafkaBelongsToEvent(KafkaEvents.EventNotificationOfWorkItemRemoval) +data class NotificationOfDeletionPerformed( + override val status: Status = Status.COMPLETED, + override val message: String? = null, + override val derivedFromEventId: String? = null, // Skal aldri settes derived + val deletedEventId: String, + val deletedEvent: KafkaEvents +): MessageDataWrapper() diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt index f356c213..92ab2f04 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt @@ -7,6 +7,6 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED) data class ProcessCompleted( - override val status: Status -) : MessageDataWrapper(status) { -} \ No newline at end of file + override val status: Status, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt index 8382fd42..6d535ea0 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt @@ -6,9 +6,10 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) +@KafkaBelongsToEvent(KafkaEvents.EventMediaReadStreamPerformed) data class ReaderPerformed( override val status: Status, val file: String, //AbsolutePath - val output: JsonObject -) : MessageDataWrapper(status) \ No newline at end of file + val output: JsonObject, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt index 3f0f7154..470dbcbc 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt @@ -7,13 +7,13 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) +@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutNameAndType) data class VideoInfoPerformed( override val status: Status, val info: JsonObject, - val outDirectory: String -) - : MessageDataWrapper(status) { + val outDirectory: String, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) { fun toValueObject(): VideoInfo? { val type = info.get("type").asString return when (type) { @@ -46,7 +46,7 @@ data class SubtitleInfo( val language: String ) -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) +@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutNameAndType) open class VideoInfo( @Transient open val type: String, @Transient open val title: String, diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserEncodeWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserEncodeWorkPerformed.kt index eed1469e..476fb182 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserEncodeWorkPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserEncodeWorkPerformed.kt @@ -7,12 +7,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status // Derived from ffmpeg work @KafkaBelongsToEvent( - KafkaEvents.EVENT_WORK_ENCODE_PERFORMED + KafkaEvents.EventWorkEncodePerformed ) data class ProcesserEncodeWorkPerformed( override val status: Status, override val message: String? = null, val producedBy: String, - val derivedFromEventId: String, - val outFile: String? = null -): MessageDataWrapper(status, message) \ No newline at end of file + val outFile: String? = null, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserExtractWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserExtractWorkPerformed.kt index 8a5a4d8f..c9589977 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserExtractWorkPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserExtractWorkPerformed.kt @@ -7,12 +7,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status // Derived from ffmpeg work @KafkaBelongsToEvent( - KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED + KafkaEvents.EventWorkExtractPerformed ) data class ProcesserExtractWorkPerformed( override val status: Status, override val message: String? = null, val producedBy: String, - val derivedFromEventId: String, - val outFile: String? = null -): MessageDataWrapper(status, message) \ No newline at end of file + val outFile: String? = null, + override val derivedFromEventId: String? +) : MessageDataWrapper(status, derivedFromEventId) \ No newline at end of file diff --git a/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt index 71d96c53..0f606c28 100644 --- a/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt +++ b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt @@ -28,7 +28,7 @@ class SerializationTest { val json = gson.toJson(message) val deserializer = DeserializingRegistry() - val result = deserializer.deserialize(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, json) + val result = deserializer.deserialize(KafkaEvents.EventMediaProcessStarted, json) assertThat(result.data).isInstanceOf(MediaProcessStarted::class.java)