diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt index 195fa01b..0645d190 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -4,14 +4,20 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener +import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.DatabaseConfig import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import org.springframework.beans.factory.annotation.Autowired import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.stereotype.Service @@ -19,36 +25,16 @@ import javax.annotation.PostConstruct @Service @EnableScheduling -class Coordinator() { - - @Autowired - private lateinit var producer: CoordinatorProducer - - @Autowired - private lateinit var listener: DefaultMessageListener - +class 
Coordinator(): CoordinatorBase() { private val log = KotlinLogging.logger {} - - val listeners = EventBasedMessageListener() - val io = Coroutines.io() + override val listeners = PersistentEventProcessBasedMessageListener() - fun readAllAvailableInQueue() { - val messages = PersistentDataReader().getAvailableProcessEvents() - io.launch { - messages.forEach { - delay(1000) - createTasksBasedOnEventsAndPersistance(referenceId = it.referenceId, eventId = it.eventId, messages) - } - } - } - - fun readAllMessagesFor(referenceId: String, eventId: String) { - val messages = PersistentDataReader().getAvailableProcessEvents() - createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages) - } - - fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List) { + override fun createTasksBasedOnEventsAndPersistence( + referenceId: String, + eventId: String, + messages: List + ) { val triggered = messages.find { it.eventId == eventId } if (triggered == null) { log.error { "Could not find $eventId in provided messages" } @@ -57,30 +43,44 @@ class Coordinator() { listeners.forwardEventMessageToListeners(triggered, messages) } + override fun onCoordinatorReady() { + readAllAvailableInQueue() + } + + override fun onMessageReceived(event: DeserializedConsumerRecord>) { + val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value) + if (!success) { + log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" } + } else { + io.launch { + delay(500) + readAllMessagesFor(event.value.referenceId, event.value.eventId) + } + } + } + + + + + fun readAllAvailableInQueue() { + val messages = PersistentDataReader().getAvailableProcessEvents() + io.launch { + messages.forEach { + delay(1000) + createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages) + } + } + } + + fun readAllMessagesFor(referenceId: String, eventId: String) { + val messages 
= PersistentDataReader().getAvailableProcessEvents() + createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages) + } + val processKafkaEvents = listOf( KafkaEvents.EVENT_WORK_ENCODE_CREATED, KafkaEvents.EVENT_WORK_EXTRACT_CREATED, ) - @PostConstruct - fun onReady() { - io.launch { - listener.onMessageReceived = { event -> - if (event.key in processKafkaEvents) { - val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value) - if (!success) { - log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}!" } - } else - readAllMessagesFor(event.value.referenceId, event.value.eventId) - } else if (event.key in listOf(KafkaEvents.EVENT_WORK_ENCODE_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED, KafkaEvents.EVENT_WORK_ENCODE_SKIPPED)) { - readAllAvailableInQueue() - } else { - log.debug { "Skipping ${event.key}" } - } - } - listener.listen(KafkaEnv.kafkaTopic) - } - readAllAvailableInQueue() - } } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EventBasedMessageListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EventBasedMessageListener.kt deleted file mode 100644 index b306c9ff..00000000 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EventBasedMessageListener.kt +++ /dev/null @@ -1,37 +0,0 @@ -package no.iktdev.mediaprocessing.processer - -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents - -class EventBasedMessageListener { - private val listeners: MutableList = mutableListOf() - - fun add(produces: KafkaEvents, listener: TaskCreatorListener) { - listeners.add(Tasks(produces, listener)) - } - - fun add(task: Tasks) { - listeners.add(task) - } - - private fun waitingListeners(events: List): List { - val nonCreators = listeners.filter { 
!events.map { e -> e.event }.contains(it.producesEvent) } - return nonCreators - } - - fun forwardEventMessageToListeners(newEvent: PersistentProcessDataMessage, events: List) { - waitingListeners(events).forEach { - try { - it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events) - } catch (e: Exception) { - e.printStackTrace() - } - } - } - -} - -data class Tasks( - val producesEvent: KafkaEvents, - val taskHandler: TaskCreatorListener -) \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt index b0dfb51c..5c05946d 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt @@ -2,7 +2,10 @@ package no.iktdev.mediaprocessing.processer import com.google.gson.Gson import mu.KotlinLogging +import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -11,84 +14,35 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.messaging.simp.SimpMessagingTemplate import javax.annotation.PostConstruct -abstract class TaskCreator: TaskCreatorListener { - private val log = KotlinLogging.logger {} +abstract class TaskCreator(coordinator: Coordinator) : + TaskCreatorImpl(coordinator) { - @Autowired - lateinit var producer: CoordinatorProducer - - @Autowired - lateinit var coordinator: Coordinator - - @Autowired - 
lateinit var socketMessage: SimpMessagingTemplate - - open val requiredEvents: List = listOf() - - open fun isPrerequisiteEventsOk(events: List): Boolean { + override fun isPrerequisiteEventsOk(events: List): Boolean { val currentEvents = events.map { it.event } return requiredEvents.all { currentEvents.contains(it) } } - open fun isPrerequisiteDataPresent(events: List): Boolean { + override fun isPrerequisiteDataPresent(events: List): Boolean { val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() } return failed.isEmpty() } - open fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean { + override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean { return event.event == singleOne } - abstract fun getListener(): Tasks + /*override fun getListener(): Tasks { + val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents } + return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter) + }*/ - open fun prerequisitesRequired(events: List): List<() -> Boolean> { + + override fun prerequisitesRequired(events: List): List<() -> Boolean> { return listOf { isPrerequisiteEventsOk(events) } } - - - private val context: MutableMap = mutableMapOf() - private val context_key_reference = "reference" - private val context_key_producesEvent = "event" - final override fun onEventReceived(referenceId: String, event: PersistentProcessDataMessage, events: List) { - context[context_key_reference] = referenceId - getListener().producesEvent.let { - context[context_key_producesEvent] = it - } - - if (prerequisitesRequired(events).all { it.invoke() }) { - val result = onProcessEvents(event, events) - if (result != null) { - log.info { "Event handled on ${this::class.simpleName} ${event.eventId} is: \nSOM\n${Gson().toJson(result)}\nEOM" } - onResult(result) - } - } else { - log.info { "Skipping: ${event.event} as it does not 
fulfill the requirements for ${context[context_key_producesEvent]}" } - } + override fun prerequisiteRequired(event: PersistentProcessDataMessage): List<() -> Boolean> { + return listOf() } - - abstract fun onProcessEvents(event: PersistentProcessDataMessage, events: List): MessageDataWrapper? - - - private fun onResult(data: MessageDataWrapper) { - producer.sendMessage( - referenceId = context[context_key_reference] as String, - event = context[context_key_producesEvent] as KafkaEvents, - data = data - ) - } - - @PostConstruct - fun postConstruct() { - coordinator.listeners.add(getListener()) - } -} - -fun interface Prerequisite { - fun execute(value: Any): Boolean -} - -interface TaskCreatorListener { - fun onEventReceived(referenceId: String, event: PersistentProcessDataMessage, events: List): Unit } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/coordination/PersistentEventProcessBasedMessageListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/coordination/PersistentEventProcessBasedMessageListener.kt new file mode 100644 index 00000000..af22fc57 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/coordination/PersistentEventProcessBasedMessageListener.kt @@ -0,0 +1,32 @@ +package no.iktdev.mediaprocessing.processer.coordination + +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener +import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener +import no.iktdev.mediaprocessing.shared.common.tasks.Tasks + +class PersistentEventProcessBasedMessageListener: EventBasedMessageListener() { + + override fun listenerWantingEvent( + event: PersistentProcessDataMessage, + waitingListeners: List> + ): List> { + return waitingListeners.filter { event.event in it.listensForEvents } + } + + override fun onForward( + event: 
PersistentProcessDataMessage, + history: List, + listeners: List> + ) { + listeners.forEach { + it.onEventReceived(referenceId = event.referenceId, event = event, events = history) + } + } + + override fun waitingListeners(events: List): List> { + val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) } + return nonCreators + } + +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/coordination/ProcesserSocketMessageListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/coordination/ProcesserSocketMessageListener.kt new file mode 100644 index 00000000..ca74bfff --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/coordination/ProcesserSocketMessageListener.kt @@ -0,0 +1,11 @@ +package no.iktdev.mediaprocessing.processer.coordination + +/** + * Class to handle messages from websockets, produced by Processer instances. + * This is to keep an overview of progress by processer + */ +class ProcesserSocketMessageListener { + + + +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index f3daa551..ac091ce4 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.processer.services import kotlinx.coroutines.* import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.processer.Tasks +import no.iktdev.mediaprocessing.processer.Coordinator import no.iktdev.mediaprocessing.processer.TaskCreator import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import 
no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker @@ -19,17 +19,18 @@ import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File import java.util.* import javax.annotation.PreDestroy @Service -class EncodeService: TaskCreator() { +class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreator(coordinator) { private val log = KotlinLogging.logger {} private val logDir = ProcesserEnv.encodeLogDirectory - val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED + override val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED val scope = Coroutines.io() private var runner: FfmpegWorker? = null @@ -42,10 +43,6 @@ class EncodeService: TaskCreator() { override val requiredEvents: List get() = listOf(KafkaEvents.EVENT_WORK_ENCODE_CREATED) - override fun getListener(): Tasks { - return Tasks(producesEvent, this) - } - override fun prerequisitesRequired(events: List): List<() -> Boolean> { return super.prerequisitesRequired(events) + listOf { diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index 504a6142..854b7cd7 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -3,8 +3,8 @@ package no.iktdev.mediaprocessing.processer.services import kotlinx.coroutines.* import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.processer.Coordinator 
import no.iktdev.mediaprocessing.processer.TaskCreator -import no.iktdev.mediaprocessing.processer.Tasks import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents @@ -20,18 +20,19 @@ import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File import java.util.* import javax.annotation.PreDestroy @Service -class ExtractService: TaskCreator() { +class ExtractService(@Autowired override var coordinator: Coordinator): TaskCreator(coordinator) { private val log = KotlinLogging.logger {} private val logDir = ProcesserEnv.extractLogDirectory - val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED + override val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED val scope = Coroutines.io() @@ -42,9 +43,6 @@ class ExtractService: TaskCreator() { init { log.info { "Starting with id: $serviceId" } } - override fun getListener(): Tasks { - return Tasks(producesEvent, this) - } override val requiredEvents: List get() = listOf(KafkaEvents.EVENT_WORK_EXTRACT_CREATED)