diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt index 4411941e..997ddf8a 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener import no.iktdev.mediaprocessing.converter.flow.EventBasedProcessMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.DatabaseConfig @@ -17,12 +18,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import org.springframework.stereotype.Service @Service -class ConverterCoordinator() : CoordinatorBase() { +class ConverterCoordinator() : CoordinatorBase() { val io = Coroutines.io() private val log = KotlinLogging.logger {} - override val listeners: EventBasedProcessMessageListener = EventBasedProcessMessageListener() + override val listeners = PersistentEventProcessBasedMessageListener() override fun createTasksBasedOnEventsAndPersistence( referenceId: String, eventId: String, @@ -36,26 +37,12 @@ class ConverterCoordinator() : CoordinatorBase>) { if (event.key == KafkaEvents.EVENT_WORK_CONVERT_CREATED) { val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value) @@ -69,7 +56,21 @@ class ConverterCoordinator() : CoordinatorBase(coordinator) { + + override fun isPrerequisiteEventsOk(events: List): Boolean { + val currentEvents = events.map { it.event } + return requiredEvents.all { currentEvents.contains(it) } + } + override fun isPrerequisiteDataPresent(events: List): Boolean { + val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() } + return failed.isEmpty() + } + + override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean { + return event.event == singleOne + } + + /*override fun getListener(): Tasks { + val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents } + return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter) + }*/ + + + override fun prerequisitesRequired(events: List): List<() -> Boolean> { + return listOf { + isPrerequisiteEventsOk(events) + } + } + + override fun prerequisiteRequired(event: PersistentProcessDataMessage): List<() -> Boolean> { + return listOf() + } +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/coordination/PersistentEventProcessBasedMessageListener.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/coordination/PersistentEventProcessBasedMessageListener.kt new file mode 100644 index 00000000..7d1f5a3a --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/coordination/PersistentEventProcessBasedMessageListener.kt @@ -0,0 +1,32 @@ +package no.iktdev.mediaprocessing.converter.coordination + +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener +import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener +import no.iktdev.mediaprocessing.shared.common.tasks.Tasks + +class PersistentEventProcessBasedMessageListener: EventBasedMessageListener() { + + override fun listenerWantingEvent( + event: PersistentProcessDataMessage, + waitingListeners: List> + ): List> { + return waitingListeners.filter { event.event in it.listensForEvents } + } + + override fun onForward( + event: PersistentProcessDataMessage, + history: List, + listeners: List> + ) { + listeners.forEach { + it.onEventReceived(referenceId = event.referenceId, event = event, events = history) + } + } + + override fun waitingListeners(events: List): List> { + val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) } + return nonCreators + } + +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/ProcesserTaskCreator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/ProcesserTaskCreator.kt deleted file mode 100644 index 64f34ba5..00000000 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/ProcesserTaskCreator.kt +++ /dev/null @@ -1,29 +0,0 @@ -package no.iktdev.mediaprocessing.converter.flow - -import no.iktdev.mediaprocessing.converter.ConverterCoordinator -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage -import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper -import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess - -abstract class ProcesserTaskCreator(coordinator: ConverterCoordinator): - TaskCreatorImpl(coordinator) { - - override fun isPrerequisiteEventsOk(events: List): Boolean { - val currentEvents = events.map { it.event } - return requiredEvents.all { currentEvents.contains(it) } - } - - override fun isPrerequisiteDataPresent(events: List): Boolean { - val failed = events - .filter { e -> e.event in requiredEvents } - .filter { !it.data.isSuccess() } - return failed.isEmpty() - } - - override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean { - return event.event == singleOne - } - -} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index fb7720dc..6a7021cc 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -5,8 +5,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import mu.KotlinLogging import no.iktdev.mediaprocessing.converter.ConverterCoordinator +import no.iktdev.mediaprocessing.converter.TaskCreator import no.iktdev.mediaprocessing.converter.convert.Converter -import no.iktdev.mediaprocessing.converter.flow.ProcesserTaskCreator import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore @@ -22,9 +22,10 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.util.* +private val log = KotlinLogging.logger {} + @Service -class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : ProcesserTaskCreator(coordinator) { - private val log = KotlinLogging.logger {} +class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : TaskCreator(coordinator) { val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" init { diff --git a/apps/converter/src/main/resources/application.properties b/apps/converter/src/main/resources/application.properties new file mode 100644 index 00000000..a1f43aa1 --- /dev/null +++ b/apps/converter/src/main/resources/application.properties @@ -0,0 +1,3 @@ +spring.output.ansi.enabled=always +logging.level.org.apache.kafka=WARN +logging.level.root=INFO diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt index 5c05946d..22312561 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt @@ -1,18 +1,10 @@ package no.iktdev.mediaprocessing.processer -import com.google.gson.Gson -import mu.KotlinLogging import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl -import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.messaging.simp.SimpMessagingTemplate -import javax.annotation.PostConstruct abstract class TaskCreator(coordinator: Coordinator) : TaskCreatorImpl(coordinator) {