From 072cb2b1922d03f4ca89e7b177d9238ab1e8ac78 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sat, 20 Jul 2024 12:03:00 +0200 Subject: [PATCH] v3 36 --- .../{ConvertServiceV2.kt => ConvertService.kt} | 14 ++++++++++---- .../coordinator/CoordinatorEventCoordinator.kt | 10 ++++++++-- .../listeners/BaseInfoFromFileTaskListener.kt | 10 ++++++---- .../tasksV2/listeners/CompletedTaskListener.kt | 5 ++++- .../tasksV2/listeners/ConvertWorkTaskListener.kt | 8 ++++++-- .../listeners/CoverDownloadTaskListener.kt | 8 ++++++-- .../listeners/CoverFromMetadataTaskListener.kt | 7 +++++-- .../listeners/EncodeWorkArgumentsTaskListener.kt | 7 +++++-- .../tasksV2/listeners/EncodeWorkTaskListener.kt | 6 +++++- .../listeners/ExtractWorkArgumentsTaskListener.kt | 8 ++++++-- .../tasksV2/listeners/ExtractWorkTaskListener.kt | 8 ++++++-- .../listeners/MediaOutInformationTaskListener.kt | 11 ++++++++--- .../MetadataWaitOrDefaultTaskListener.kt | 8 +++++++- .../ParseMediaFileStreamsTaskListener.kt | 8 ++++++-- .../listeners/ReadMediaFileStreamsTaskListener.kt | 9 +++++++-- .../{EncodeServiceV2.kt => EncodeService.kt} | 15 +++++++++------ .../{ExtractServiceV2.kt => ExtractService.kt} | 12 +++++++++--- apps/pyMetadata/app.py | 9 +++++---- apps/pyMetadata/clazz/shared.py | 4 +++- .../kotlin/no/iktdev/eventi/data/EventImpl.kt | 3 ++- .../eventi/implementations/EventListenerImpl.kt | 6 ++++-- .../eventi/mock/listeners/FirstEventListener.kt | 7 ++++++- .../eventi/mock/listeners/ForthEventListener.kt | 7 ++++++- .../eventi/mock/listeners/SecondEventListener.kt | 7 ++++++- .../eventi/mock/listeners/ThirdEventListener.kt | 9 +++++++-- 25 files changed, 152 insertions(+), 54 deletions(-) rename apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/{ConvertServiceV2.kt => ConvertService.kt} (92%) rename apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/{EncodeServiceV2.kt => EncodeService.kt} (96%) rename apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/{ExtractServiceV2.kt => ExtractService.kt} (95%) diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertServiceV2.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt similarity index 92% rename from apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertServiceV2.kt rename to apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index 994ffc08..d4945e21 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertServiceV2.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -14,14 +14,18 @@ import no.iktdev.mediaprocessing.shared.contract.data.ConvertData import no.iktdev.mediaprocessing.shared.contract.data.ConvertWorkPerformed import no.iktdev.mediaprocessing.shared.contract.data.ConvertedData import org.springframework.beans.factory.annotation.Autowired -import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.stereotype.Service @Service -class ConvertServiceV2( +class ConvertService( @Autowired var tasks: TaskCoordinator, ) : TaskService(), ConvertListener, TaskCoordinator.TaskEvents { + + fun getProducerName(): String { + return this::class.java.simpleName + } + override val log = KotlinLogging.logger {} override val logDir = ConverterEnv.logDirectory @@ -86,7 +90,8 @@ class ConvertServiceV2( metadata = EventMetadata( referenceId = task.referenceId, derivedFromEventId = task.eventId, - status = EventStatus.Success + status = EventStatus.Success, + source = getProducerName() ), data = ConvertedData( outputFiles = outputFiles @@ -105,7 +110,8 @@ class ConvertServiceV2( metadata = EventMetadata( referenceId = task.referenceId, derivedFromEventId = task.eventId, - status = EventStatus.Failed + status = EventStatus.Failed, + source = getProducerName() ) )) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt index 949a1b09..476e150c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt @@ -29,6 +29,10 @@ class Coordinator( init { } + fun getProducerName(): String { + return this::class.java.simpleName + } + public fun startProcess(file: File, type: ProcessType) { val operations: List = listOf( StartOperationEvents.ENCODE, @@ -43,7 +47,8 @@ class Coordinator( val event = MediaProcessStartEvent( metadata = EventMetadata( referenceId = referenceId.toString(), - status = EventStatus.Success + status = EventStatus.Success, + source = getProducerName() ), data = StartEventData( file = file.absolutePath, @@ -71,7 +76,8 @@ class Coordinator( metadata = EventMetadata( referenceId = referenceId, derivedFromEventId = eventToAttachTo.eventId(), - status = EventStatus.Success + status = EventStatus.Success, + source = getProducerName() ), data = message )) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt index 862741ff..ebd7fbff 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt @@ -26,7 +26,9 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() { override val produceEvent: Events = Events.EventMediaReadBaseInfoPerformed override val listensForEvents: List = listOf(Events.EventMediaProcessStarted) - + override fun getProducerName(): String { + return this::class.java.simpleName + } override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { val event = incomingEvent.consume() @@ -36,11 +38,11 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() { } val message = try { readFileInfo(event.data as StartEventData, event.metadata.eventId)?.let { - BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Success), data = it) - } ?: BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed)) + BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = it) + } ?: BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())) } catch (e: Exception) { e.printStackTrace() - BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed)) + BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())) } onProduceEvent(message) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt index 634edacf..472f611d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt @@ -33,6 +33,9 @@ class CompletedTaskListener: CoordinatorEventListener() { var doNotProduceComplete = System.getenv("DISABLE_COMPLETE").toBoolean() ?: false + override fun getProducerName(): String { + return this::class.java.simpleName + } override fun onReady() { super.onReady() @@ -410,7 +413,7 @@ class CompletedTaskListener: CoordinatorEventListener() { onProduceEvent(MediaProcessCompletedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = CompletedEventData( events.map { it.eventId() } ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt index efe07e43..ca89dfc1 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt @@ -24,6 +24,10 @@ import java.io.File class ConvertWorkTaskListener: WorkTaskListener() { val log = KotlinLogging.logger {} + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null override val produceEvent: Events = Events.EventWorkConvertCreated @@ -73,7 +77,7 @@ class ConvertWorkTaskListener: WorkTaskListener() { val convertFile = file?.let { File(it) } if (convertFile == null || !convertFile.exists()) { onProduceEvent(ConvertWorkCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed) + metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) )) return } else { @@ -86,7 +90,7 @@ class ConvertWorkTaskListener: WorkTaskListener() { ConvertWorkCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = convertData ).also { event -> onProduceEvent(event) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt index 4986b0b4..215ef80b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt @@ -21,6 +21,10 @@ import java.io.File class CoverDownloadTaskListener : CoordinatorEventListener() { val log = KotlinLogging.logger {} + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null override val produceEvent: Events = Events.EventWorkDownloadCoverPerformed @@ -34,7 +38,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() { val failedEventDefault = MediaCoverDownloadedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed) + metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) ) val data = event.az()?.data @@ -83,7 +87,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() { return } onProduceEvent(MediaCoverDownloadedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = DownloadedCover(result.absolutePath) )) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt index b9e3b73e..e839a54f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt @@ -18,6 +18,9 @@ import org.springframework.stereotype.Service class CoverFromMetadataTaskListener: CoordinatorEventListener() { val log = KotlinLogging.logger {} + override fun getProducerName(): String { + return this::class.java.simpleName + } @Autowired override var coordinator: Coordinator? = null @@ -73,11 +76,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { val result = if (coverUrl.isNullOrBlank()) { log.warn { "No cover available for ${baseInfo.title}" } MediaCoverInfoReceivedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Skipped) + metadata = event.makeDerivedEventInfo(EventStatus.Skipped, getProducerName()) ) } else { MediaCoverInfoReceivedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = CoverDetails( url = coverUrl, outFileBaseName = NameHelper.normalize(coverTitle), diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt index c66316ea..a4051f4d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt @@ -22,6 +22,9 @@ import java.io.File class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() { val log = KotlinLogging.logger {} + override fun getProducerName(): String { + return this::class.java.simpleName + } @Autowired override var coordinator: Coordinator? = null @@ -74,11 +77,11 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() { val result = mapper.getArguments() if (result == null) { onProduceEvent(EncodeArgumentCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed) + metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) )) } else { onProduceEvent(EncodeArgumentCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = result )) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt index ae420a76..455d6f0c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt @@ -22,6 +22,10 @@ import org.springframework.stereotype.Service class EncodeWorkTaskListener : WorkTaskListener() { private val log = KotlinLogging.logger {} + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null override val produceEvent: Events = Events.EventWorkEncodeCreated @@ -55,7 +59,7 @@ class EncodeWorkTaskListener : WorkTaskListener() { return } EncodeWorkCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = encodeArguments ).also { event -> onProduceEvent(event) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt index 3e9a3a31..6d461c16 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt @@ -21,6 +21,10 @@ import java.io.File class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { val log = KotlinLogging.logger {} + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null override val produceEvent: Events = Events.EventMediaParameterExtractCreated @@ -68,11 +72,11 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { val result = mapper.getArguments() if (result.isEmpty()) { onProduceEvent(ExtractArgumentCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Skipped) + metadata = event.makeDerivedEventInfo(EventStatus.Skipped, getProducerName()) )) } else { onProduceEvent(ExtractArgumentCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = result )) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt index 83fd3ea9..1a55936f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt @@ -22,6 +22,10 @@ import org.springframework.stereotype.Service class ExtractWorkTaskListener: WorkTaskListener() { private val log = KotlinLogging.logger {} + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null override val produceEvent: Events = Events.EventWorkExtractCreated @@ -62,14 +66,14 @@ class ExtractWorkTaskListener: WorkTaskListener() { } if (arguments.isEmpty()) { ExtractWorkCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed) + metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) ) return } arguments.mapNotNull { ExtractWorkCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = it ) }.forEach { event -> diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt index 4a809626..59d3b579 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt @@ -27,6 +27,11 @@ import java.io.FileFilter @Service class MediaOutInformationTaskListener: CoordinatorEventListener() { + + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null @@ -52,7 +57,7 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { log.error { "Required event ${Events.EventMediaReadBaseInfoPerformed} is not present" } coordinator?.produceNewEvent( MediaOutInformationConstructedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed) + metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) ) ) return @@ -65,12 +70,12 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { outDirectory = pm.getOutputDirectory().absolutePath, info = vi ).let { MediaOutInformationConstructedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = it ) } } else { MediaOutInformationConstructedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed) + metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) ) } onProduceEvent(result) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index f24d90cb..c54d8bf1 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -29,6 +29,11 @@ val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull @Service @EnableScheduling class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { + + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null @@ -101,7 +106,8 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { metadata = EventMetadata( referenceId = it.key, derivedFromEventId = it.value.eventId, - status = EventStatus.Skipped + status = EventStatus.Skipped, + source = getProducerName() ) ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt index 580cf1ee..863abdca 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt @@ -27,6 +27,10 @@ import org.springframework.stereotype.Service class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() { val log = KotlinLogging.logger {} + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null @@ -50,13 +54,13 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() { val readData = event.dataAs() val result = try { MediaFileStreamsParsedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = parseStreams(readData) ) } catch (e: Exception) { e.printStackTrace() MediaFileStreamsParsedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed) + metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) ) } onProduceEvent(result) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt index 662e815a..f840fdcc 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt @@ -27,6 +27,11 @@ import java.io.File @Service class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() { + + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: Coordinator? = null @@ -57,13 +62,13 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() { try { val data = fileReadStreams(startEvent, event.metadata.eventId) MediaFileStreamsReadEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = data ) } catch (e: Exception) { e.printStackTrace() MediaFileStreamsReadEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed) + metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) ) } } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceV2.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt similarity index 96% rename from apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceV2.kt rename to apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 2eb18ca5..b50f9db9 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeServiceV2.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -4,7 +4,6 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import mu.KotlinLogging -import no.iktdev.eventi.core.WGson import no.iktdev.eventi.data.EventMetadata import no.iktdev.eventi.data.EventStatus import no.iktdev.mediaprocessing.processer.ProcesserEnv @@ -15,9 +14,7 @@ import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegTaskService import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.taskManager import no.iktdev.mediaprocessing.shared.common.persistance.Status -import no.iktdev.mediaprocessing.shared.common.persistance.events import no.iktdev.mediaprocessing.shared.common.task.Task -import no.iktdev.mediaprocessing.shared.contract.Events import no.iktdev.mediaprocessing.shared.contract.data.EncodeArgumentData import no.iktdev.mediaprocessing.shared.contract.data.EncodeWorkPerformedEvent import no.iktdev.mediaprocessing.shared.contract.data.EncodedData @@ -29,11 +26,15 @@ import java.io.File import java.time.Duration @Service -class EncodeServiceV2( +class EncodeService( @Autowired var tasks: TaskCoordinator, @Autowired private val reporter: Reporter ) : FfmpegTaskService(), TaskCoordinator.TaskEvents { + fun getProducerName(): String { + return this::class.java.simpleName + } + override val log = KotlinLogging.logger {} override val logDir = ProcesserEnv.encodeLogDirectory @@ -139,7 +140,8 @@ class EncodeServiceV2( metadata = EventMetadata( referenceId = task.referenceId, derivedFromEventId = task.eventId, - status = EventStatus.Success + status = EventStatus.Success, + source = getProducerName() ), data = EncodedData( outputFile @@ -168,7 +170,8 @@ class EncodeServiceV2( metadata = EventMetadata( referenceId = task.referenceId, derivedFromEventId = task.eventId, - status = EventStatus.Failed + status = EventStatus.Failed, + source = getProducerName() ) )) sendProgress( diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractServiceV2.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt similarity index 95% rename from apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractServiceV2.kt rename to apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index c764636a..db20234f 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractServiceV2.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -24,11 +24,15 @@ import org.springframework.stereotype.Service import java.io.File @Service -class ExtractServiceV2( +class ExtractService( @Autowired var tasks: TaskCoordinator, @Autowired private val reporter: Reporter ) : FfmpegTaskService(), TaskCoordinator.TaskEvents { + fun getProducerName(): String { + return this::class.java.simpleName + } + override val log = KotlinLogging.logger {} override val logDir = ProcesserEnv.encodeLogDirectory @@ -111,7 +115,8 @@ class ExtractServiceV2( metadata = EventMetadata( referenceId = task.referenceId, derivedFromEventId = task.eventId, - status = EventStatus.Success + status = EventStatus.Success, + source = getProducerName() ), data = ExtractedData( outputFile @@ -141,7 +146,8 @@ class ExtractServiceV2( metadata = EventMetadata( referenceId = task.referenceId, derivedFromEventId = task.eventId, - status = EventStatus.Failed + status = EventStatus.Failed, + source = getProducerName() ) ) ) diff --git a/apps/pyMetadata/app.py b/apps/pyMetadata/app.py index a2f2b54e..012c9b03 100644 --- a/apps/pyMetadata/app.py +++ b/apps/pyMetadata/app.py @@ -34,7 +34,7 @@ events_server_address = os.environ.get("DATABASE_ADDRESS") or "192.168.2.250" # events_server_port = os.environ.get("DATABASE_PORT") or "3306" events_server_database_name = os.environ.get("DATABASE_NAME_E") or "eventsV3" # "events" events_server_username = os.environ.get("DATABASE_USERNAME") or "root" -events_server_password = os.environ.get("DATABASE_PASSWORD") or "shFZ27eL2x2NoxyEDBMfDWkvFO" #"root" +events_server_password = os.environ.get("DATABASE_PASSWORD") or "shFZ27eL2x2NoxyEDBMfDWkvFO" #"root" // default password @@ -65,8 +65,8 @@ class EventsPullerThread(threading.Thread): GROUP BY referenceId HAVING SUM(event = 'event:media-read-base-info:performed') > 0 - AND SUM(event = 'event:media-metadata-search:performed') != 0 - AND SUM(event = 'event:media-process:completed') != 0 + AND SUM(event = 'event:media-metadata-search:performed') = 0 + AND SUM(event = 'event:media-process:completed') = 0 ) AND event = 'event:media-read-base-info:performed'; """) @@ -211,7 +211,8 @@ class MetadataEventHandler: eventId=str(uuid.uuid4()), derivedFromEventId=event.metadata.eventId, status="Failed" if result is None else "Success", - created=datetime.now().isoformat() + created=datetime.now().isoformat(), + source="metadataApp" ), data=result, eventType="EventMediaMetadataSearchPerformed" diff --git a/apps/pyMetadata/clazz/shared.py b/apps/pyMetadata/clazz/shared.py index 0eca316c..30a0be07 100644 --- a/apps/pyMetadata/clazz/shared.py +++ b/apps/pyMetadata/clazz/shared.py @@ -11,6 +11,7 @@ class EventMetadata: referenceId: str status: str created: datetime + source: str def to_dict(self): return asdict(self) @@ -53,7 +54,8 @@ def json_to_media_event(json_data: str) -> MediaEvent: eventId=metadata_dict['eventId'], referenceId=metadata_dict['referenceId'], status=metadata_dict['status'], - created=parse_datetime(metadata_dict['created']) + created=parse_datetime(metadata_dict['created']), + source=metadata_dict['source'] ) event_data = EventData( diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/data/EventImpl.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/data/EventImpl.kt index 6d6713e6..ba34d132 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/data/EventImpl.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/data/EventImpl.kt @@ -24,7 +24,8 @@ data class EventMetadata( val eventId: String = UUID.randomUUID().toString(), val referenceId: String, val status: EventStatus, - val created: LocalDateTime = LocalDateTime.now() + val created: LocalDateTime = LocalDateTime.now(), + val source: String = "Unknown producer" ) enum class EventStatus { diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt index b962b22e..0f95d928 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt @@ -19,6 +19,7 @@ abstract class EventListenerImpl> { onReady() } + abstract fun getProducerName(): String protected open fun onProduceEvent(event: T) { coordinator?.produceNewEvent(event) ?: { @@ -82,11 +83,12 @@ abstract class EventListenerImpl> { */ abstract fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) - fun T.makeDerivedEventInfo(status: EventStatus): EventMetadata { + fun T.makeDerivedEventInfo(status: EventStatus, source: String): EventMetadata { return EventMetadata( referenceId = this.metadata.referenceId, derivedFromEventId = this.metadata.eventId, - status = status + status = status, + source = source ) } diff --git a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/FirstEventListener.kt b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/FirstEventListener.kt index 0c3aa0be..ca50a2a5 100644 --- a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/FirstEventListener.kt +++ b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/FirstEventListener.kt @@ -12,6 +12,11 @@ import org.springframework.stereotype.Service @Service class FirstEventListener() : MockDataEventListener() { + + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: MockEventCoordinator? = null @@ -30,7 +35,7 @@ class FirstEventListener() : MockDataEventListener() { } override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { - val info = incomingEvent.consume()!!.makeDerivedEventInfo(EventStatus.Success) + val info = incomingEvent.consume()!!.makeDerivedEventInfo(EventStatus.Success, getProducerName()) onProduceEvent(FirstEvent( eventType = produceEvent, metadata = info, diff --git a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/ForthEventListener.kt b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/ForthEventListener.kt index 92e6a424..01968147 100644 --- a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/ForthEventListener.kt +++ b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/ForthEventListener.kt @@ -14,6 +14,11 @@ import org.springframework.stereotype.Service @Service class ForthEventListener() : MockDataEventListener() { + + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: MockEventCoordinator? = null @@ -35,7 +40,7 @@ class ForthEventListener() : MockDataEventListener() { val event = incomingEvent.consume() if (event == null) return - val info = event.makeDerivedEventInfo(EventStatus.Success) + val info = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()) onProduceEvent(InitEvent( eventType = produceEvent, metadata = info, diff --git a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/SecondEventListener.kt b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/SecondEventListener.kt index f07a3c37..7a721174 100644 --- a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/SecondEventListener.kt +++ b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/SecondEventListener.kt @@ -12,6 +12,11 @@ import org.springframework.stereotype.Service @Service class SecondEventListener() : MockDataEventListener() { + + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: MockEventCoordinator? = null @@ -33,7 +38,7 @@ class SecondEventListener() : MockDataEventListener() { val event = incomingEvent.consume() if (event == null) return - val info = event.makeDerivedEventInfo(EventStatus.Success) + val info = event.makeDerivedEventInfo(EventStatus.Success,getProducerName()) onProduceEvent(SecondEvent( eventType = produceEvent, metadata = info diff --git a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/ThirdEventListener.kt b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/ThirdEventListener.kt index 155df44b..60f10826 100644 --- a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/ThirdEventListener.kt +++ b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/listeners/ThirdEventListener.kt @@ -15,6 +15,11 @@ import org.springframework.stereotype.Service @Service class ThirdEventListener() : MockDataEventListener() { + + override fun getProducerName(): String { + return this::class.java.simpleName + } + @Autowired override var coordinator: MockEventCoordinator? = null @@ -36,11 +41,11 @@ class ThirdEventListener() : MockDataEventListener() { val event = incomingEvent.consume() if (event == null) return - val info = event.makeDerivedEventInfo(EventStatus.Success) + val info = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()) (event as SecondEvent).data.elements.forEach { element -> onProduceEvent(ThirdEvent( eventType = produceEvent, - metadata = event.makeDerivedEventInfo(EventStatus.Success), + metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = element ) )