From 6e1cc17235be8ec054240de5da18c01508968450 Mon Sep 17 00:00:00 2001 From: Brage Date: Mon, 15 Jan 2024 00:28:48 +0100 Subject: [PATCH] Migrated coordinator to shared listener --- .../converter/ConverterCoordinator.kt | 2 + .../coordinator/Coordinator.kt | 126 ++++++++++-------- .../mediaprocessing/coordinator/Task.kt | 77 ++--------- .../coordination/EventBasedMessageListener.kt | 64 --------- .../PersistentEventBasedMessageListener.kt | 33 +++++ .../tasks/event/BaseInfoFromFile.kt | 10 +- .../tasks/event/CollectAndStoreTask.kt | 4 +- .../coordinator/tasks/event/CompleteTask.kt | 4 +- .../tasks/event/DownloadAndStoreCoverTask.kt | 4 +- .../event/MetadataAndBaseInfoToCoverTask.kt | 4 +- .../event/MetadataAndBaseInfoToFileOut.kt | 4 +- .../tasks/event/ParseVideoFileStreams.kt | 4 +- .../tasks/event/ReadVideoFileStreams.kt | 4 +- .../event/ffmpeg/EncodeArgumentCreatorTask.kt | 4 +- .../ffmpeg/ExtractArgumentCreatorTask.kt | 4 +- .../shared/common/CoordinatorBase.kt | 1 - .../mediaprocessing/shared/common/Utils.kt | 7 + 17 files changed, 157 insertions(+), 199 deletions(-) delete mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/PersistentEventBasedMessageListener.kt diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt index f3581946..4411941e 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.converter import kotlinx.coroutines.delay import kotlinx.coroutines.launch import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.converter.flow.EventBasedProcessMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.DatabaseConfig @@ -17,6 +18,7 @@ import org.springframework.stereotype.Service @Service class ConverterCoordinator() : CoordinatorBase() { + val io = Coroutines.io() private val log = KotlinLogging.logger {} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index 967d4781..d48155fd 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -5,7 +5,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.coordinator.coordination.EventBasedMessageListener +import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener +import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.DatabaseConfig import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore @@ -15,9 +16,8 @@ import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv +import no.iktdev.mediaprocessing.shared.kafka.dto.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* -import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess -import no.iktdev.mediaprocessing.shared.kafka.dto.Status import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @@ -25,17 +25,54 @@ import java.util.UUID import javax.annotation.PostConstruct @Service -class Coordinator() { +class Coordinator() : CoordinatorBase() { + val io = Coroutines.io() - @Autowired - private lateinit var producer: CoordinatorProducer + override fun onCoordinatorReady() { + readAllUncompletedMessagesInQueue() + } - @Autowired - private lateinit var listener: DefaultMessageListener + override fun onMessageReceived(event: DeserializedConsumerRecord>) { + val success = PersistentDataStore().storeEventDataMessage(event.key.event, event.value) + if (!success) { + log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" } + } else { + io.launch { + delay(500) // Give the database a few sec to update + readAllMessagesFor(event.value.referenceId, event.value.eventId) + } + } + } + + override fun createTasksBasedOnEventsAndPersistence( + referenceId: String, + eventId: String, + messages: List + ) { + val triggered = messages.find { it.eventId == eventId } + if (triggered == null) { + log.error { "Could not find $eventId in provided messages" } + return + } + listeners.forwardEventMessageToListeners(triggered, messages) + + if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) { + if (getProcessStarted(messages)?.type == ProcessType.FLOW) { + forwarder.produceAllMissingProcesserEvents( + producer = producer, + referenceId = referenceId, + eventId = eventId, + messages = messages + ) + } else { + log.info { "Process for $referenceId was started manually and will require user input for continuation" } + } + } + } private val log = KotlinLogging.logger {} - val listeners = EventBasedMessageListener() + override val listeners = PersistentEventBasedMessageListener() private val forwarder = Forwarder() @@ -48,9 +85,6 @@ class Coordinator() { producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_PROCESS_STARTED, processStartEvent) } - - val io = Coroutines.io() - fun readAllUncompletedMessagesInQueue() { val messages = PersistentDataReader().getUncompletedMessages() io.launch { @@ -70,7 +104,7 @@ class Coordinator() { delay(fixedDelay) var delayed = 0L var msc = PersistentDataReader().getMessagesFor(referenceId) - while (msc.find { it.eventId == eventId } != null || delayed < 1000*60) { + while (msc.find { it.eventId == eventId } != null || delayed < 1000 * 60) { delayed += fixedDelay msc = PersistentDataReader().getMessagesFor(referenceId) } @@ -82,7 +116,7 @@ class Coordinator() { } fun operationToRunOnMessages(referenceId: String, eventId: String, messages: List) { - createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages) + createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages) io.launch { buildModelBasedOnMessagesFor(referenceId, messages) @@ -99,40 +133,6 @@ class Coordinator() { } } - fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List) { - val triggered = messages.find { it.eventId == eventId } - if (triggered == null) { - log.error { "Could not find $eventId in provided messages" } - return - } - listeners.forwardEventMessageToListeners(triggered, messages) - - if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) { - if (getProcessStarted(messages)?.type == ProcessType.FLOW) { - forwarder.produceAllMissingProcesserEvents(producer = producer, referenceId = referenceId, eventId = eventId, messages = messages) - } else { - log.info { "Process for $referenceId was started manually and will require user input for continuation" } - } - } - } - - @PostConstruct - fun onReady() { - io.launch { - listener.onMessageReceived = { event -> - val success = PersistentDataStore().storeEventDataMessage(event.key.event, event.value) - if (!success) { - log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" } - } else - io.launch { - delay(500) // Give the database a few sec to update - readAllMessagesFor(event.value.referenceId, event.value.eventId) - } - } - listener.listen(KafkaEnv.kafkaTopic) - } - readAllUncompletedMessagesInQueue() - } class Forwarder() { @@ -141,19 +141,26 @@ class Coordinator() { ) fun hasAnyRequiredEventToCreateProcesserEvents(messages: List): Boolean { - return messages.filter { forwardOnEventReceived.contains(it.event) && it.data.isSuccess() }.map { it.event }.isNotEmpty() + return messages.filter { forwardOnEventReceived.contains(it.event) && it.data.isSuccess() }.map { it.event } + .isNotEmpty() } fun isMissingEncodeWorkCreated(messages: List): Boolean { val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED } return existingWorkEncodeCreated.isEmpty() && existingWorkEncodeCreated.none { it.data.isSuccess() } } + fun isMissingExtractWorkCreated(messages: List): Boolean { val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED } return existingWorkCreated.isEmpty() && existingWorkCreated.none { it.data.isSuccess() } } - fun produceAllMissingProcesserEvents(producer: CoordinatorProducer, referenceId: String, eventId: String, messages: List) { + fun produceAllMissingProcesserEvents( + producer: CoordinatorProducer, + referenceId: String, + eventId: String, + messages: List + ) { val currentMessage = messages.find { it.eventId == eventId } if (!currentMessage?.data.isSuccess()) { return @@ -164,11 +171,13 @@ class Coordinator() { produceEncodeWork(producer, currentMessage) } } + KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED -> { if (isMissingExtractWorkCreated(messages)) { produceExtractWork(producer, currentMessage) } } + else -> {} } } @@ -188,11 +197,13 @@ class Coordinator() { inputFile = data.inputFile, arguments = it.arguments, outFile = it.outputFile - ).let { createdRequest -> - producer.sendMessage(message.referenceId, + ).let { createdRequest -> + producer.sendMessage( + message.referenceId, KafkaEvents.EVENT_WORK_ENCODE_CREATED, eventId = message.eventId, - createdRequest) + createdRequest + ) } } } @@ -212,7 +223,8 @@ class Coordinator() { arguments = it.arguments, outFile = it.outputFile ).let { createdRequest -> - producer.sendMessage(message.referenceId, + producer.sendMessage( + message.referenceId, KafkaEvents.EVENT_WORK_EXTRACT_CREATED, eventId = message.eventId, createdRequest @@ -227,8 +239,10 @@ class Coordinator() { outFileBaseName = outFile.nameWithoutExtension, outDirectory = outFile.parentFile.absolutePath ).let { createdRequest -> - producer.sendMessage(message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED, - createdRequest) + producer.sendMessage( + message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED, + createdRequest + ) } } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt index 57a8bb74..3280d46d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt @@ -1,99 +1,44 @@ package no.iktdev.mediaprocessing.coordinator import mu.KotlinLogging -import no.iktdev.mediaprocessing.coordinator.coordination.Tasks +import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage -import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess -import org.springframework.beans.factory.annotation.Autowired -import javax.annotation.PostConstruct -abstract class TaskCreator: TaskCreatorListener { +abstract class TaskCreator(coordinator: Coordinator): + TaskCreatorImpl(coordinator) { val log = KotlinLogging.logger {} - abstract val producesEvent: KafkaEvents - @Autowired - lateinit var producer: CoordinatorProducer - @Autowired - lateinit var coordinator: Coordinator - - open val requiredEvents: List = listOf() - open val listensForEvents: List = listOf() - - open fun isPrerequisiteEventsOk(events: List): Boolean { + override fun isPrerequisiteEventsOk(events: List): Boolean { val currentEvents = events.map { it.event } return requiredEvents.all { currentEvents.contains(it) } } - open fun isPrerequisiteDataPresent(events: List): Boolean { + override fun isPrerequisiteDataPresent(events: List): Boolean { val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() } return failed.isEmpty() } - open fun isEventOfSingle(event: PersistentMessage, singleOne: KafkaEvents): Boolean { + override fun isEventOfSingle(event: PersistentMessage, singleOne: KafkaEvents): Boolean { return event.event == singleOne } - fun getListener(): Tasks { + /*override fun getListener(): Tasks { val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents } return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter) - } + }*/ - open fun prerequisitesRequired(events: List): List<() -> Boolean> { + override fun prerequisitesRequired(events: List): List<() -> Boolean> { return listOf { isPrerequisiteEventsOk(events) } } - open fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> { + override fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> { return listOf() } - - private val context: MutableMap = mutableMapOf() - private val context_key_reference = "reference" - private val context_key_producesEvent = "event" - final override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List) { - context[context_key_reference] = referenceId - getListener().producesEvent.let { - context[context_key_producesEvent] = it - } - - if (prerequisitesRequired(events).all { it.invoke() } && prerequisiteRequired(event).all { it.invoke() }) { - val result = onProcessEvents(event, events) - if (result != null) { - onResult(result) - } - } else { - // TODO: Re-enable this - // log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" } - } - } - - abstract fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? - - - private fun onResult(data: MessageDataWrapper) { - producer.sendMessage( - referenceId = context[context_key_reference] as String, - event = context[context_key_producesEvent] as KafkaEvents, - data = data - ) - } - - @PostConstruct - fun postConstruct() { - coordinator.listeners.add(getListener()) - } } - -fun interface Prerequisite { - fun execute(value: Any): Boolean -} - -interface TaskCreatorListener { - fun onEventReceived(referenceId: String, event: PersistentMessage, events: List): Unit -} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt deleted file mode 100644 index 54803894..00000000 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt +++ /dev/null @@ -1,64 +0,0 @@ -package no.iktdev.mediaprocessing.coordinator.coordination - -import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents - -class EventBasedMessageListener { - private val listeners: MutableList = mutableListOf() - - fun add(produces: KafkaEvents, listener: TaskCreatorListener) { - listeners.add(Tasks(producesEvent = produces, taskHandler = listener)) - } - - fun add(task: Tasks) { - listeners.add(task) - } - - private fun waitingListeners(events: List): List { - val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) } - return nonCreators - } - - private fun listenerWantingEvent(event: PersistentMessage, waitingListeners: List): List { - return waitingListeners.filter { event.event in it.listensForEvents } - } - - /** - * This will be called in sequence, thus some messages might be made a duplicate of. - */ - fun forwardEventMessageToListeners(newEvent: PersistentMessage, events: List) { - val waitingListeners = waitingListeners(events) - //val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners) - //availableListeners.forEach { - waitingListeners.forEach { - try { - it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events) - } catch (e: Exception) { - e.printStackTrace() - } - } - } - - /** - * This will be called with all messages at once, thus it should reflect kafka topic and database - */ - fun forwardBatchEventMessagesToListeners(events: List) { - val waitingListeners = waitingListeners(events) - waitingListeners.forEach { - try { - val last = events.last() - it.taskHandler.onEventReceived(last.referenceId, last, events) - } catch (e: Exception) { - e.printStackTrace() - } - } - } - -} - -data class Tasks( - val producesEvent: KafkaEvents, - val listensForEvents: List = listOf(), - val taskHandler: TaskCreatorListener -) \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/PersistentEventBasedMessageListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/PersistentEventBasedMessageListener.kt new file mode 100644 index 00000000..0b6d3768 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/PersistentEventBasedMessageListener.kt @@ -0,0 +1,33 @@ +package no.iktdev.mediaprocessing.coordinator.coordination + +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener +import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener +import no.iktdev.mediaprocessing.shared.common.tasks.Tasks +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess + +class PersistentEventBasedMessageListener: EventBasedMessageListener() { + + override fun listenerWantingEvent( + event: PersistentMessage, + waitingListeners: List> + ): List> { + return waitingListeners.filter { event.event in it.listensForEvents } + } + + override fun onForward( + event: PersistentMessage, + history: List, + listeners: List> + ) { + listeners.forEach { + it.onEventReceived(referenceId = event.referenceId, event = event, events = history) + } + } + + override fun waitingListeners(events: List): List> { + val nonCreators = listeners.filter { !events.filter { event -> !event.data.isSuccess() }.map { e -> e.event }.contains(it.producesEvent) } + return nonCreators + } + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt index 40e30de7..638a5d2e 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt @@ -1,6 +1,8 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator +import no.iktdev.mediaprocessing.shared.common.lastOrSuccess import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents @@ -8,11 +10,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @Service -class BaseInfoFromFile() : TaskCreator() { +class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED @@ -26,9 +29,10 @@ class BaseInfoFromFile() : TaskCreator() { } } - override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper { + override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} triggered by ${event.event}" } - return readFileInfo(event.data as ProcessStarted) + val selected = events.filter { it.event == KafkaEvents.EVENT_PROCESS_STARTED }.lastOrSuccess() ?: return null + return readFileInfo(selected.data as ProcessStarted) } fun readFileInfo(started: ProcessStarted): MessageDataWrapper { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt index e6ef7b9b..c8980c01 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException @@ -16,12 +17,13 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.streamit.library.db.query.* import org.jetbrains.exposed.exceptions.ExposedSQLException +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File import java.sql.SQLIntegrityConstraintViolationException @Service -class CollectAndStoreTask() : TaskCreator() { +class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE override val requiredEvents: List = listOf( diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt index a973dcf7..f7bda0eb 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage @@ -9,10 +10,11 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @Service -class CompleteTask() : TaskCreator() { +class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { override val producesEvent: KafkaEvents = KafkaEvents.EVENT_PROCESS_COMPLETED override val requiredEvents: List = listOf( diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt index 95a4d31f..daee2cc3 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import kotlinx.coroutines.runBlocking +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.DownloadClient import no.iktdev.mediaprocessing.shared.common.getComputername @@ -11,12 +12,13 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverDownloadWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerformed +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File import java.util.* @Service -class DownloadAndStoreCoverTask: TaskCreator() { +class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt index 2d61e9fa..87cdb593 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents @@ -9,10 +10,11 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerform import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @Service -class MetadataAndBaseInfoToCoverTask : TaskCreator() { +class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_OUT_COVER diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt index 916fb7b6..c7212b84 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import no.iktdev.exfl.using +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds @@ -15,6 +16,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerform import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.hasValidData import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service @@ -25,7 +27,7 @@ import java.time.LocalDateTime */ @Service @EnableScheduling -class MetadataAndBaseInfoToFileOut(): TaskCreator() { +class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index 39b1f727..269364a2 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import com.google.gson.Gson +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream @@ -13,10 +14,11 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsPars import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @Service -class ParseVideoFileStreams() : TaskCreator() { +class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index 128777c8..cf2caf4c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import com.google.gson.Gson import com.google.gson.JsonObject import kotlinx.coroutines.runBlocking +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage @@ -13,11 +14,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @Service -class ReadVideoFileStreams(): TaskCreator() { +class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index 0c877b92..ac9dae71 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg import no.iktdev.exfl.using +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.Preference import no.iktdev.mediaprocessing.shared.common.SharedConfig @@ -10,11 +11,12 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @Service -class EncodeArgumentCreatorTask : TaskCreator() { +class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { val preference = Preference.getPreference() override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index c7c87a8b..7d17ab52 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg import no.iktdev.exfl.using +import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.* import no.iktdev.mediaprocessing.shared.common.Preference @@ -13,11 +14,12 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @Service -class ExtractArgumentCreatorTask : TaskCreator() { +class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { val preference = Preference.getPreference() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt index 1ba607f8..f292a573 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt @@ -16,7 +16,6 @@ import javax.annotation.PostConstruct abstract class CoordinatorBase> { abstract val listeners: L - val io = Coroutines.io() @Autowired lateinit var producer: CoordinatorProducer diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index 0f5cf88a..e515ca90 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -2,6 +2,9 @@ package no.iktdev.mediaprocessing.shared.common import kotlinx.coroutines.delay import mu.KotlinLogging +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import java.io.File import java.io.RandomAccessFile import java.net.InetAddress @@ -22,6 +25,10 @@ fun isFileAvailable(file: File): Boolean { return false } +fun List.lastOrSuccess(): PersistentMessage? { + return this.lastOrNull { it.data.isSuccess() } ?: this.lastOrNull() +} + suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60, delayed: Long = 500, block: () -> Unit) { var elapsedDelay = 0L