From f8a69ee6205f9ce83c1431ef2f04b281915d18f1 Mon Sep 17 00:00:00 2001 From: bskjon Date: Fri, 21 Feb 2025 22:48:02 +0100 Subject: [PATCH] New completion --- .../CoordinatorEventCoordinator.kt | 12 ++-- .../coordinator/EventsDatabase.kt | 2 +- .../controller/RequestEventController.kt | 6 +- .../listeners/CompletedTaskListener.kt | 19 ++---- .../listeners/ConvertWorkTaskListener.kt | 8 +-- .../EncodeWorkArgumentsTaskListener.kt | 8 +-- .../ExtractWorkArgumentsTaskListener.kt | 8 +-- .../ReadMediaFileStreamsTaskListener.kt | 7 +- .../tasksV2/mapping/EventsSummaryMapping.kt | 68 +++++++++++++++++++ ...sedItemsStore.kt => ProcessedFileStore.kt} | 20 +++--- .../tasksV2/validator/CompletionValidator.kt | 22 +++--- .../contract/data/MediaProcessStartEvent.kt | 10 +-- .../shared/common/contract/dto/Enums.kt | 4 +- .../common/contract/dto/EventSummary.kt | 20 ++++++ .../tables/{processed.kt => processedFile.kt} | 10 +-- 15 files changed, 141 insertions(+), 83 deletions(-) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EventsSummaryMapping.kt rename apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/{ProcessedItemsStore.kt => ProcessedFileStore.kt} (57%) create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/dto/EventSummary.kt rename shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/{processed.kt => processedFile.kt} (52%) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt index b6017365..4b6251b6 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt @@ -11,7 +11,7 @@ import no.iktdev.mediaprocessing.shared.common.contract.data.Event import no.iktdev.mediaprocessing.shared.common.contract.data.MediaProcessStartEvent import no.iktdev.mediaprocessing.shared.common.contract.data.PermitWorkCreationEvent import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import no.iktdev.mediaprocessing.shared.common.database.cal.EventsManager import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.ApplicationContext @@ -36,15 +36,15 @@ class Coordinator( } public fun startProcess(file: File, type: ProcessType) { - val operations: List = listOf( - StartOperationEvents.ENCODE, - StartOperationEvents.EXTRACT, - StartOperationEvents.CONVERT + val operations: List = listOf( + OperationEvents.ENCODE, + OperationEvents.EXTRACT, + OperationEvents.CONVERT ) startProcess(file, type, operations) } - fun startProcess(file: File, type: ProcessType, operations: List): UUID { + fun startProcess(file: File, type: ProcessType, operations: List): UUID { val referenceId: UUID = UUID.randomUUID() val event = MediaProcessStartEvent( metadata = EventMetadata( diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsDatabase.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsDatabase.kt index 9c6fe31f..5a0f97ec 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsDatabase.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsDatabase.kt @@ -11,7 +11,7 @@ class EventsDatabase() { allEvents, tasks, runners, - processed, + processedFile, files ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt index 2487da2d..7edeb71e 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt @@ -4,7 +4,7 @@ import com.google.gson.Gson import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.shared.common.contract.ProcessType import no.iktdev.mediaprocessing.shared.common.contract.dto.EventRequest -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity @@ -30,7 +30,7 @@ class RequestEventController(@Autowired var coordinator: Coordinator) { if (!file.exists()) { return ResponseEntity.status(HttpStatus.NO_CONTENT).body(convert.file) } - referenceId = coordinator.startProcess(file, ProcessType.FLOW, listOf(StartOperationEvents.CONVERT)).toString() + referenceId = coordinator.startProcess(file, ProcessType.FLOW, listOf(OperationEvents.CONVERT)).toString() } catch (e: Exception) { return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Gson().toJson(convert)) @@ -49,7 +49,7 @@ class RequestEventController(@Autowired var coordinator: Coordinator) { if (!file.exists()) { return ResponseEntity.status(HttpStatus.NO_CONTENT).body(payload) } - referenceId = coordinator.startProcess(file, ProcessType.MANUAL, listOf(StartOperationEvents.EXTRACT)).toString() + referenceId = coordinator.startProcess(file, ProcessType.MANUAL, listOf(OperationEvents.EXTRACT)).toString() } catch (e: Exception) { return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(payload) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt index c9ce1eb1..5d01c1b8 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt @@ -5,26 +5,16 @@ import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.data.* import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener -import no.iktdev.mediaprocessing.coordinator.getStoreDatabase -import no.iktdev.eventi.database.executeOrException -import no.iktdev.eventi.database.withTransaction +import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.EventsSummaryMapping import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store.* import no.iktdev.mediaprocessing.coordinator.tasksV2.validator.CompletionValidator import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper import no.iktdev.mediaprocessing.shared.common.contract.Events import no.iktdev.mediaprocessing.shared.common.contract.data.* import no.iktdev.mediaprocessing.shared.common.contract.reader.* -import no.iktdev.streamit.library.db.query.SummaryQuery -import no.iktdev.streamit.library.db.tables.catalog -import no.iktdev.streamit.library.db.tables.titles -import org.jetbrains.exposed.exceptions.ExposedSQLException -import org.jetbrains.exposed.sql.insertIgnore -import org.jetbrains.exposed.sql.select -import org.jetbrains.exposed.sql.update import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File -import java.sql.SQLIntegrityConstraintViolationException @Service class CompletedTaskListener : CoordinatorEventListener() { @@ -186,10 +176,13 @@ class CompletedTaskListener : CoordinatorEventListener() { e.printStackTrace() } - ProcessedItemsStore.store( + val summary = EventsSummaryMapping().map(events) + + + ProcessedFileStore.store( mediaInfo.title, events, - (listOfNotNull(newVideoPath?.second) + (newSubtitles?.map { it.destination } ?: emptyList())) + summary ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt index 0ee103ba..e53e5589 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt @@ -1,20 +1,16 @@ package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners -import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson import no.iktdev.eventi.data.* -import no.iktdev.eventi.implementations.EventCoordinator import no.iktdev.mediaprocessing.coordinator.Coordinator -import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.coordinator.taskManager import no.iktdev.mediaprocessing.coordinator.tasksV2.implementations.WorkTaskListener import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.shared.common.contract.Events -import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract import no.iktdev.mediaprocessing.shared.common.contract.data.* -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import no.iktdev.mediaprocessing.shared.common.contract.dto.isOnly import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @@ -74,7 +70,7 @@ class ConvertWorkTaskListener: WorkTaskListener() { foundEvent?.outputFile } else if (event.eventType == Events.EventMediaProcessStarted) { val startEvent = event.az()?.data - if (startEvent?.operations?.isOnly(StartOperationEvents.CONVERT) == true) { + if (startEvent?.operations?.isOnly(OperationEvents.CONVERT) == true) { startEvent.file } else null } else { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt index af87ad37..9a0357a5 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt @@ -4,19 +4,15 @@ import mu.KotlinLogging import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson import no.iktdev.eventi.data.EventStatus -import no.iktdev.eventi.implementations.EventCoordinator import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.EncodeWorkArgumentsMapping import no.iktdev.mediaprocessing.shared.common.Preference import no.iktdev.mediaprocessing.shared.common.contract.Events -import no.iktdev.mediaprocessing.shared.common.contract.EventsListenerContract -import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract import no.iktdev.mediaprocessing.shared.common.contract.data.* -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import java.io.File @Service class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() { @@ -55,7 +51,7 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() { active = false return } - if (started.data == null || started.data?.operations?.contains(StartOperationEvents.ENCODE) == false) { + if (started.data == null || started.data?.operations?.contains(OperationEvents.ENCODE) == false) { active = false return } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt index 4888e1d4..28f79b62 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt @@ -4,18 +4,14 @@ import mu.KotlinLogging import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson import no.iktdev.eventi.data.EventStatus -import no.iktdev.eventi.implementations.EventCoordinator import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.ExtractWorkArgumentsMapping import no.iktdev.mediaprocessing.shared.common.contract.Events -import no.iktdev.mediaprocessing.shared.common.contract.EventsListenerContract -import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract import no.iktdev.mediaprocessing.shared.common.contract.data.* -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import java.io.File @Service class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { @@ -47,7 +43,7 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { } active = true val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az() ?: return - if (started.data == null || started.data?.operations?.contains(StartOperationEvents.EXTRACT) == false) { + if (started.data == null || started.data?.operations?.contains(OperationEvents.EXTRACT) == false) { active = false return } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt index f2b23e09..37acdbdf 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt @@ -8,19 +8,16 @@ import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson import no.iktdev.eventi.data.EventStatus import no.iktdev.eventi.data.dataAs -import no.iktdev.eventi.implementations.EventCoordinator import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing import no.iktdev.mediaprocessing.shared.common.contract.Events -import no.iktdev.mediaprocessing.shared.common.contract.EventsListenerContract -import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract import no.iktdev.mediaprocessing.shared.common.contract.data.Event import no.iktdev.mediaprocessing.shared.common.contract.data.MediaFileStreamsReadEvent import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @@ -36,7 +33,7 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() { override var coordinator: Coordinator? = null val log = KotlinLogging.logger {} - val requiredOperations = listOf(StartOperationEvents.ENCODE, StartOperationEvents.EXTRACT) + val requiredOperations = listOf(OperationEvents.ENCODE, OperationEvents.EXTRACT) override val produceEvent: Events = Events.EventMediaReadStreamPerformed override val listensForEvents: List = listOf(Events.EventMediaProcessStarted) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EventsSummaryMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EventsSummaryMapping.kt new file mode 100644 index 00000000..bc0d717c --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EventsSummaryMapping.kt @@ -0,0 +1,68 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.mapping + +import no.iktdev.eventi.data.dataAs +import no.iktdev.eventi.data.isSuccessful +import no.iktdev.mediaprocessing.shared.common.contract.Events +import no.iktdev.mediaprocessing.shared.common.contract.data.* +import no.iktdev.mediaprocessing.shared.common.contract.dto.EventSummary +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationsSummary +import no.iktdev.mediaprocessing.shared.common.contract.dto.OutputFiles +import no.iktdev.mediaprocessing.shared.common.getChecksum + +class EventsSummaryMapping { + + fun map(events: List): EventSummary { + val startOperations = events.find { it.eventType == Events.EventMediaProcessStarted }?.dataAs() ?: throw RuntimeException("No start event found") + val successOperations = listOfNotNull( + if (isEncodedSuccessful(events)) OperationEvents.ENCODE else null, + if (isExtractedSuccessful(events)) OperationEvents.EXTRACT else null, + if (isConvertedSuccessful(events)) OperationEvents.CONVERT else null + ) + + + return EventSummary( + inputFile = startOperations.file, + inputFileChecksum = getChecksum(startOperations.file), + operationsSummary = OperationsSummary( + requestedOperations = startOperations.operations, + completedOperations = successOperations + ), + outputFiles = getProducesFiles(events) + ) + } + + + fun isEncodedSuccessful(events: List) = events.filter { it.eventType == Events.EventWorkEncodePerformed }.any { it.isSuccessful() } + fun isExtractedSuccessful(events: List) = events.filter { it.eventType == Events.EventWorkExtractPerformed }.any { it.isSuccessful() } + fun isConvertedSuccessful(events: List) = events.filter { it.eventType == Events.EventWorkConvertPerformed }.any { it.isSuccessful() } + + fun getProducesFiles(events: List): OutputFiles { + val encoded = if (isEncodedSuccessful(events)) { + events.filter { it.eventType == Events.EventWorkEncodePerformed } + .filter { it.isSuccessful() } + .mapNotNull { it.dataAs()?.outputFile } + } else emptyList() + + val extracted = if (isExtractedSuccessful(events)) { + events.filter { it.eventType == Events.EventWorkExtractPerformed } + .filter { it.isSuccessful() } + .mapNotNull { it.dataAs() } + .map { it.outputFile } + } else emptyList() + + val converted = if (isConvertedSuccessful(events)) { + events.filter { it.eventType == Events.EventWorkConvertPerformed } + .filter { it.isSuccessful() } + .mapNotNull { it.dataAs() } + .flatMap { it.outputFiles } + } else emptyList() + + return OutputFiles( + encoded = encoded, + extracted = extracted, + converted = converted + ) + } + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ProcessedItemsStore.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ProcessedFileStore.kt similarity index 57% rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ProcessedItemsStore.kt rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ProcessedFileStore.kt index f45d9918..0ac67d60 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ProcessedItemsStore.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ProcessedFileStore.kt @@ -4,30 +4,26 @@ import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.eventi.data.isSuccessful import no.iktdev.mediaprocessing.coordinator.eventDatabase -import no.iktdev.mediaprocessing.coordinator.getStoreDatabase import no.iktdev.mediaprocessing.shared.common.contract.data.* -import no.iktdev.mediaprocessing.shared.common.database.tables.processed +import no.iktdev.mediaprocessing.shared.common.contract.dto.EventSummary +import no.iktdev.mediaprocessing.shared.common.database.tables.processedFile import no.iktdev.mediaprocessing.shared.common.getChecksum -import no.iktdev.streamit.library.db.executeOrException import no.iktdev.streamit.library.db.withTransaction import org.jetbrains.exposed.sql.insert -object ProcessedItemsStore { +object ProcessedFileStore { val log = KotlinLogging.logger {} - fun store(title: String, events: List, processedFiles: List) { + fun store(title: String, events: List, summary: EventSummary) { val inputFilePath = events.findFirstEventOf()?.data?.file ?: return val checksum = getChecksum(inputFilePath) - val isEncoded = events.findEventsOf().any { it.isSuccessful() } - val isExtracted = events.findEventsOf().any { it.isSuccessful() } + withTransaction(eventDatabase.database.database, block = { - processed.insert { + processedFile.insert { it[this.title] = title - it[this.fileName] = inputFilePath - it[this.processedFiles] = Gson().toJson(processedFiles) - it[this.encoded] = isEncoded - it[this.extracted] = isExtracted + it[this.inputFile] = inputFilePath + it[this.data] = Gson().toJson(summary) it[this.checksum] = checksum } }) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt index 9033a1ac..16e3e443 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt @@ -4,7 +4,7 @@ import no.iktdev.eventi.data.dataAs import no.iktdev.eventi.data.isSuccessful import no.iktdev.mediaprocessing.shared.common.contract.Events import no.iktdev.mediaprocessing.shared.common.contract.data.* -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import no.iktdev.mediaprocessing.shared.common.contract.dto.SubtitleFormats import java.io.File @@ -17,11 +17,11 @@ object CompletionValidator { * Checks whether it requires encode or extract or both, and it has created events with args */ fun req1(started: MediaProcessStartEvent, events: List): Boolean { - val encodeFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.ENCODE) == true) { + val encodeFulfilledOrSkipped = if (started.data?.operations?.contains(OperationEvents.ENCODE) == true) { events.any { it.eventType == Events.EventMediaParameterEncodeCreated } } else true - val extractFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.EXTRACT) == true) { + val extractFulfilledOrSkipped = if (started.data?.operations?.contains(OperationEvents.EXTRACT) == true) { events.any { it.eventType == Events.EventMediaParameterExtractCreated } } else true @@ -34,8 +34,8 @@ object CompletionValidator { * Checks whether work that was supposed to be created has been created. * Checks if all subtitles that can be processed has been created if convert is set. */ - fun req2(operations: List, events: List): Boolean { - if (StartOperationEvents.ENCODE in operations) { + fun req2(operations: List, events: List): Boolean { + if (OperationEvents.ENCODE in operations) { val encodeParamter = events.find { it.eventType == Events.EventMediaParameterEncodeCreated }?.az() val encodeWork = events.find { it.eventType == Events.EventWorkEncodeCreated } if (encodeParamter?.isSuccessful() == true && (encodeWork == null)) @@ -44,12 +44,12 @@ object CompletionValidator { val extractParamter = events.find { it.eventType == Events.EventMediaParameterExtractCreated }?.az() val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated } - if (StartOperationEvents.EXTRACT in operations) { + if (OperationEvents.EXTRACT in operations) { if (extractParamter?.isSuccessful() == true && extractParamter.data?.size != extractWork.size) return false } - if (StartOperationEvents.CONVERT in operations) { + if (OperationEvents.CONVERT in operations) { val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated } val supportedSubtitleFormats = SubtitleFormats.entries.map { it.name } @@ -66,22 +66,22 @@ object CompletionValidator { /** * Checks whether all work that has been created has been completed */ - fun req3(operations: List, events: List): Boolean { - if (StartOperationEvents.ENCODE in operations) { + fun req3(operations: List, events: List): Boolean { + if (OperationEvents.ENCODE in operations) { val encodeWork = events.filter { it.eventType == Events.EventWorkEncodeCreated } val encodePerformed = events.filter { it.eventType == Events.EventWorkEncodePerformed } if (encodePerformed.size < encodeWork.size) return false } - if (StartOperationEvents.EXTRACT in operations) { + if (OperationEvents.EXTRACT in operations) { val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated } val extractPerformed = events.filter { it.eventType == Events.EventWorkExtractPerformed } if (extractPerformed.size < extractWork.size) return false } - if (StartOperationEvents.CONVERT in operations) { + if (OperationEvents.CONVERT in operations) { val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated } val convertPerformed = events.filter { it.eventType == Events.EventWorkConvertPerformed } if (convertPerformed.size < convertWork.size) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaProcessStartEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaProcessStartEvent.kt index 8ddc4c7f..c91ad00e 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaProcessStartEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaProcessStartEvent.kt @@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.common.contract.data import no.iktdev.eventi.data.EventMetadata import no.iktdev.mediaprocessing.shared.common.contract.Events import no.iktdev.mediaprocessing.shared.common.contract.ProcessType -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents data class MediaProcessStartEvent( override val metadata: EventMetadata, @@ -13,10 +13,10 @@ data class MediaProcessStartEvent( data class StartEventData( val type: ProcessType = ProcessType.FLOW, - val operations: List = listOf( - StartOperationEvents.ENCODE, - StartOperationEvents.EXTRACT, - StartOperationEvents.CONVERT + val operations: List = listOf( + OperationEvents.ENCODE, + OperationEvents.EXTRACT, + OperationEvents.CONVERT ), val file: String // AbsolutePath ) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/dto/Enums.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/dto/Enums.kt index b9362b9d..b4f888a1 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/dto/Enums.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/dto/Enums.kt @@ -16,12 +16,12 @@ enum class SubtitleFormats { SMI } -enum class StartOperationEvents { +enum class OperationEvents { ENCODE, EXTRACT, CONVERT } -fun List.isOnly(expected: StartOperationEvents): Boolean { +fun List.isOnly(expected: OperationEvents): Boolean { return this.size == 1 && this.firstOrNull { it == expected } != null } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/dto/EventSummary.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/dto/EventSummary.kt new file mode 100644 index 00000000..584bba32 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/dto/EventSummary.kt @@ -0,0 +1,20 @@ +package no.iktdev.mediaprocessing.shared.common.contract.dto + +data class EventSummary( + val operationsSummary: OperationsSummary, + val inputFile: String, + val inputFileChecksum: String, + val outputFiles: OutputFiles +) + + +data class OperationsSummary( + val requestedOperations: List, + val completedOperations: List, +) + +data class OutputFiles( + val encoded: List, + val extracted: List, + val converted: List +) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/processed.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/processedFile.kt similarity index 52% rename from shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/processed.kt rename to shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/processedFile.kt index 2b949392..24a63213 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/processed.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/processedFile.kt @@ -1,19 +1,15 @@ package no.iktdev.mediaprocessing.shared.common.database.tables -import no.iktdev.mediaprocessing.shared.common.database.tables.runners.defaultExpression -import no.iktdev.mediaprocessing.shared.common.database.tables.tasks.default import org.jetbrains.exposed.dao.id.IntIdTable import org.jetbrains.exposed.sql.Column import org.jetbrains.exposed.sql.javatime.CurrentDateTime import org.jetbrains.exposed.sql.javatime.datetime import java.time.LocalDateTime -object processed: IntIdTable() { +object processedFile: IntIdTable() { val title: Column = varchar("title", 256) - val fileName: Column = varchar("fileName", 256) - val processedFiles: Column = text("processedFilesJson") - val encoded: Column = bool("encoded").default(false) - val extracted: Column = bool("extracted").default(false) + val inputFile: Column = varchar("fileName", 512) + val data: Column = text("data") val created: Column = datetime("created").defaultExpression(CurrentDateTime) val checksum: Column = varchar("checksum", 256).nullable() } \ No newline at end of file