From 84bc18ab4138e0ee3cc91545d564061e7cb47d5b Mon Sep 17 00:00:00 2001 From: bskjon Date: Fri, 19 Apr 2024 02:22:25 +0200 Subject: [PATCH] Update --- .../coordinator/Coordinator.kt | 36 +++++++++---------- .../coordinator/Implementations.kt | 4 ++- .../controller/RequestEventController.kt | 7 ++-- .../tasks/event/CompleteMediaTask.kt | 16 ++++----- .../event/ffmpeg/EncodeArgumentCreatorTask.kt | 6 ++-- .../ffmpeg/ExtractArgumentCreatorTask.kt | 6 ++-- .../mediaprocessing/processer/Coordinator.kt | 12 +++++-- .../mediaprocessing/processer/Reporter.kt | 5 +++ .../iktdev/mediaprocessing/ui/Coordinator.kt | 3 +- .../shared/common/CoordinatorBase.kt | 3 ++ .../common/persistance/processerEvents.kt | 2 +- .../shared/contract/dto/Enums.kt | 7 +--- .../shared/kafka/core/KafkaEvents.kt | 4 --- .../dto/events_result/MediaProcessStarted.kt | 10 +++--- .../events_result/RequestProcessStarted.kt | 18 ---------- 15 files changed, 63 insertions(+), 76 deletions(-) delete mode 100644 shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/RequestProcessStarted.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index a5e17ecb..44be3eb6 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -8,15 +8,17 @@ import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMe import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.contract.ProcessType -import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents -import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents +import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* +import org.springframework.scheduling.annotation.EnableScheduling +import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service import java.io.File import java.util.UUID +@EnableScheduling @Service class Coordinator() : CoordinatorBase() { val io = Coroutines.io() @@ -58,32 +60,22 @@ class Coordinator() : CoordinatorBase = listOf( - ProcessStartOperationEvents.ENCODE, - ProcessStartOperationEvents.EXTRACT, - ProcessStartOperationEvents.CONVERT + val operations: List = listOf( + StartOperationEvents.ENCODE, + StartOperationEvents.EXTRACT, + StartOperationEvents.CONVERT ) startProcess(file, type, operations) } - fun startProcess(file: File, type: ProcessType, operations: List) { + fun startProcess(file: File, type: ProcessType, operations: List): UUID { + val referenceId: UUID = UUID.randomUUID() val processStartEvent = MediaProcessStarted( status = Status.COMPLETED, file = file.absolutePath, type = type ) producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EventMediaProcessStarted, processStartEvent) - - } - - public fun startRequestProcess(file: File, operations: List): UUID { - val referenceId: UUID = UUID.randomUUID() - val start = RequestProcessStarted( - status = Status.COMPLETED, - file = file.absolutePath, - operations = operations - ) - producer.sendMessage(referenceId = referenceId.toString(), KafkaEvents.EVENT_REQUEST_PROCESS_STARTED, start) return referenceId } @@ -150,6 +142,14 @@ class Coordinator() : CoordinatorBase = mutableMapOf( - ProcessStartOperationEvents.ENCODE to hasEncodeAndIsRequired, - ProcessStartOperationEvents.EXTRACT to hasExtractAndIsRequired, - ProcessStartOperationEvents.CONVERT to hasConvertAndIsRequired + val missingRequired: MutableMap = mutableMapOf( + StartOperationEvents.ENCODE to hasEncodeAndIsRequired, + StartOperationEvents.EXTRACT to hasExtractAndIsRequired, + StartOperationEvents.CONVERT to hasConvertAndIsRequired ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index 4a7051e2..2c441119 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -7,7 +7,7 @@ import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.Preference import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage -import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents +import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.contract.ffmpeg.* import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -44,8 +44,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator log.info { "${event.referenceId} triggered by ${event.event}" } val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted - if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) { - log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" } + if (!started.operations.contains(StartOperationEvents.ENCODE)) { + log.info { "Couldn't find operation event ${StartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" } return null } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index 1eb36c4c..38a222b7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -8,7 +8,7 @@ import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.* import no.iktdev.mediaprocessing.shared.common.Preference import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage -import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents +import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleArgumentsDto import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream @@ -52,8 +52,8 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato return null } val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted - if (!started.operations.contains(ProcessStartOperationEvents.EXTRACT)) { - log.info { "Couldn't find operation event ${ProcessStartOperationEvents.EXTRACT} in ${Gson().toJson(started.operations)}\n\tExtract Arguments will not be created" } + if (!started.operations.contains(StartOperationEvents.EXTRACT)) { + log.info { "Couldn't find operation event ${StartOperationEvents.EXTRACT} in ${Gson().toJson(started.operations)}\n\tExtract Arguments will not be created" } return null } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt index c34d63cb..0fcc7149 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -7,10 +7,12 @@ import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.NotificationOfDeletionPerformed import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled @@ -101,9 +103,13 @@ class Coordinator(): CoordinatorBase lists.any { it.event in processKafkaEvents } } - val validMessages = usableMessages.filter { lists -> lists.any { it.event == KafkaEvents.EventMediaProcessStarted } || - (lists.any { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_STARTED } && lists.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted } ) - }.flatten().filter { it.event in processKafkaEvents } + val validMessages = usableMessages.filter { lists -> + lists.any { it.event == KafkaEvents.EventMediaProcessStarted && (it.data as MediaProcessStarted).type == ProcessType.FLOW } || + lists.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted } + } + .flatten() + .filter { it.event in processKafkaEvents } + validMessages.filter { it.eventId !in existing }.forEach { eventManager.setProcessEvent(it.event, Message( diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt index 2544e514..8b85ff8d 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.processer import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo import org.springframework.beans.factory.annotation.Autowired +import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.stereotype.Service import org.springframework.web.client.RestTemplate @@ -10,9 +11,12 @@ import org.springframework.web.client.RestTemplate class Reporter() { @Autowired lateinit var restTemplate: RestTemplate + @Autowired + lateinit var messageTemplate: SimpMessagingTemplate fun sendEncodeProgress(progress: ProcesserEventInfo) { try { restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java) + messageTemplate.convertAndSend("/topic/encode/progress", progress) } catch (e: Exception) { e.printStackTrace() } @@ -21,6 +25,7 @@ class Reporter() { fun sendExtractProgress(progress: ProcesserEventInfo) { try { restTemplate.postForEntity(SharedConfig.uiUrl + "/extract/progress", progress, String::class.java) + messageTemplate.convertAndSend("/topic/extract/progress", progress) } catch (e: Exception) { e.printStackTrace() } diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt index 00118bbe..e3104aac 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt @@ -61,13 +61,12 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo val stored = events.findLast { it.event == KafkaEvents.EventCollectAndStore } val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted } val completedMediaEvent = events.findLast { it.event == KafkaEvents.EventMediaProcessCompleted } - val completedRequestEvent = events.findLast { it.event == KafkaEvents.EventRequestProcessCompleted } if (stored != null && stored.data.isSuccess()) { return SummaryState.Completed } - if (completedMediaEvent?.data.isSuccess() || completedRequestEvent?.data.isSuccess()) { + if (completedMediaEvent?.data.isSuccess()) { return SummaryState.AwaitingStore } if (processes.values.all { it.status == SummaryState.Completed }) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt index 93a054b8..440ad732 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt @@ -19,6 +19,8 @@ import org.springframework.stereotype.Service import javax.annotation.PostConstruct abstract class CoordinatorBase> { + private var ready: Boolean = false + fun isReady() = ready private val log = KotlinLogging.logger {} abstract val listeners: L @@ -37,6 +39,7 @@ abstract class CoordinatorBase> { log.info { "Attaching listeners to Coordinator" } listener.onMessageReceived = { event -> onMessageReceived(event)} listener.listen(KafkaEnv.kafkaTopic) + ready = true } abstract fun onMessageReceived(event: DeserializedConsumerRecord>) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt index 9777113c..7602d78c 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt @@ -19,6 +19,6 @@ object processerEvents: IntIdTable() { val lastCheckIn: Column = datetime("lastCheckIn").nullable() init { - uniqueIndex(referenceId, event) + uniqueIndex(referenceId, eventId, event) } } \ No newline at end of file diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt index 9f687a80..ce7b69d1 100644 --- a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt @@ -16,13 +16,8 @@ enum class SubtitleFormats { SMI } -enum class ProcessStartOperationEvents { +enum class StartOperationEvents { ENCODE, EXTRACT, CONVERT -} - -enum class RequestStartOperationEvents { - CONVERT, - EXTRACT, } \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index ac4d74c5..e2c4a000 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.shared.kafka.core enum class KafkaEvents(val event: String) { EventMediaProcessStarted("event:media-process:started"), - EVENT_REQUEST_PROCESS_STARTED("event:request-process:started"), EventMediaReadStreamPerformed("event:media-read-stream:performed"), EventMediaParseStreamPerformed("event:media-parse-stream:performed"), @@ -18,7 +17,6 @@ enum class KafkaEvents(val event: String) { EventMediaWorkProceedPermitted("event:media-work-proceed:permitted"), - // This event is to be used for commuincating across all appss taht an event has ben removed and to rterminate existint events EventNotificationOfWorkItemRemoval("event:notification-work-item-removal"), EventWorkEncodeCreated("event:work-encode:created"), @@ -37,7 +35,6 @@ enum class KafkaEvents(val event: String) { EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"), EventMediaProcessCompleted("event:media-process:completed"), - EventRequestProcessCompleted("event:request-process:completed"), EventCollectAndStore("event::save"), ; @@ -63,7 +60,6 @@ enum class KafkaEvents(val event: String) { fun isOfFinalize(event: KafkaEvents): Boolean { return event in listOf( EventMediaProcessCompleted, - EventRequestProcessCompleted, EventCollectAndStore ) } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt index caba7b5e..58505519 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt @@ -1,7 +1,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result import no.iktdev.mediaprocessing.shared.contract.ProcessType -import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents +import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -11,10 +11,10 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status data class MediaProcessStarted( override val status: Status, val type: ProcessType = ProcessType.FLOW, - val operations: List = listOf( - ProcessStartOperationEvents.ENCODE, - ProcessStartOperationEvents.EXTRACT, - ProcessStartOperationEvents.CONVERT + val operations: List = listOf( + StartOperationEvents.ENCODE, + StartOperationEvents.EXTRACT, + StartOperationEvents.CONVERT ), val file: String // AbsolutePath ) : MessageDataWrapper(status) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/RequestProcessStarted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/RequestProcessStarted.kt deleted file mode 100644 index 1392f722..00000000 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/RequestProcessStarted.kt +++ /dev/null @@ -1,18 +0,0 @@ -package no.iktdev.mediaprocessing.shared.kafka.dto.events_result - -import no.iktdev.mediaprocessing.shared.contract.ProcessType -import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents -import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper -import no.iktdev.mediaprocessing.shared.kafka.dto.Status - -@KafkaBelongsToEvent(KafkaEvents.EVENT_REQUEST_PROCESS_STARTED) -data class RequestProcessStarted( - override val status: Status, - val operations: List = listOf( - RequestStartOperationEvents.CONVERT - ), - val file: String // AbsolutePath -) : MessageDataWrapper(status) \ No newline at end of file