From 69796409b7d3c4439b6fce48ff69a45820df9dd7 Mon Sep 17 00:00:00 2001 From: Brage Date: Tue, 25 Jul 2023 19:41:52 +0200 Subject: [PATCH] Using Sequential again --- .../analyzer/encoding/EncodedStreams.kt | 86 +++++++++++-------- .../analyzer/encoding/ResultCollection.kt | 13 --- 2 files changed, 50 insertions(+), 49 deletions(-) diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt index 07c6431b..ef736a39 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt @@ -3,7 +3,10 @@ package no.iktdev.streamit.content.reader.analyzer.encoding import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.deserializers.ContentOutNameDeserializer import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry +import no.iktdev.streamit.content.common.deserializers.FileResultDeserializer +import no.iktdev.streamit.content.common.deserializers.MediaStreamsDeserializer import no.iktdev.streamit.content.common.dto.ContentOutName import no.iktdev.streamit.content.common.dto.reader.FileResult import no.iktdev.streamit.content.common.streams.MediaStreams @@ -14,8 +17,8 @@ import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent -import no.iktdev.streamit.library.kafka.listener.collector.NeedyMessageListener import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener import org.apache.kafka.clients.consumer.ConsumerRecord @@ -25,24 +28,22 @@ import java.io.File private val logger = KotlinLogging.logger {} @Service -class EncodedStreams : DefaultKafkaReader("streamSelector"), ICollectedMessagesEvent { +class EncodedStreams : DefaultKafkaReader("streamSelector"), ISequentialMessageEvent { - val collectionListener = NeedyMessageListener( + val listener = object : SequentialMessageListener( topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, - initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE, - completionEvent = KafkaEvents.EVENT_READER_DETERMINED_FILENAME, - needs = listOf( - KafkaEvents.EVENT_READER_RECEIVED_FILE, - KafkaEvents.EVENT_READER_RECEIVED_STREAMS, - KafkaEvents.EVENT_READER_DETERMINED_FILENAME - ), + accept = KafkaEvents.EVENT_READER_RECEIVED_FILE.event, + subAccepts = listOf( + KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event, + KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event, + ), listener = this, - eventCollectionClass = ResultCollection::class.java - ) + deserializers = this.loadDeserializers() + ) {} init { - collectionListener.listen() + listener.listen() } fun createEncodeWork(referenceId: String, collection: String?, inFile: String?, streams: MediaStreams?, outFileName: String?) { @@ -108,29 +109,42 @@ class EncodedStreams : DefaultKafkaReader("streamSelector"), ICollectedMessagesE ) } - override fun onCollectionCompleted(collection: ResultCollection?) { - logger.info { "Collection received" } - val collectedEvents = collection?.getRecords()?.map { it.key() }?.toList() ?: emptyList() - if (collectedEvents.isEmpty()) { - logger.error { "\nConsumer $subId collected: is null or empty!" } - } else { - logger.info { "\nConsumer $subId collected:\n ${collectedEvents.joinToString("\n\t")}" } - } - - val referenceId = collection?.getRecords()?.firstOrNull()?.value()?.referenceId - if (referenceId == null) { - logger.warn { "referenceId is null, throwing collection" } - return - } - val outFileNameWithoutExtension: String? = if (collection.getFileName() != null) { - collection.getFileName()?.baseName - } else { - logger.info { "Getting filename from ${KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event} resulted in null. Falling back to sanitized name" } - collection.getFileResult()?.sanitizedName - } - - createEncodeWork(referenceId, collection.getFileResult()?.title, collection.getFileResult()?.file, collection.getStreams(), outFileNameWithoutExtension) - createExtractWork(referenceId, collection.getFileResult()?.title, collection.getFileResult()?.file, collection.getStreams(), outFileNameWithoutExtension) + override fun getRequiredMessages(): List { + return listener.subAccepts + listOf(listener.accept) } + override fun onAllMessagesProcessed(referenceId: String, result: Map) { + logger.info { "Collection received" } + if (result.keys.isEmpty()) { + logger.error { "\nConsumer $subId collected: is null or empty!" } + } else { + logger.info { "\nConsumer $subId collected:\n ${result.keys.joinToString("\n\t")}" } + } + + val outFileNameWithoutExtension: String? = if (getFileName(result) != null) { + getFileName(result)?.baseName + } else { + logger.info { "Getting filename from ${KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event} resulted in null. Falling back to sanitized name" } + getFileResult(result)?.sanitizedName + } + + val fileResult = getFileResult(result) + createEncodeWork(referenceId, fileResult?.title, fileResult?.file, getStreams(result), outFileNameWithoutExtension) + createExtractWork(referenceId, fileResult?.title, fileResult?.file, getStreams(result), outFileNameWithoutExtension) + } + + fun getFileResult(result: Map): FileResult? { + val record = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event] ?: return null + return FileResultDeserializer().deserializeIfSuccessful(record) + } + + fun getFileName(result: Map): ContentOutName? { + val record = result[KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event] ?: return null + return ContentOutNameDeserializer().deserializeIfSuccessful(record) + } + + fun getStreams(result: Map): MediaStreams? { + val record = result[KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event] ?: return null + return MediaStreamsDeserializer().deserializeIfSuccessful(record) + } } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/ResultCollection.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/ResultCollection.kt index 7fa6a1bc..2ad56484 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/ResultCollection.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/ResultCollection.kt @@ -17,18 +17,5 @@ class ResultCollection: DefaultEventCollection() { fun getFirstOrNull(events: KafkaEvents): ConsumerRecord? { return getRecords().firstOrNull { it.key() == events.event } } - fun getFileResult(): FileResult? { - val record = getRecords().firstOrNull { it.key() == KafkaEvents.EVENT_READER_RECEIVED_FILE.event } ?: return null - return FileResultDeserializer().deserializeIfSuccessful(record.value()) - } - fun getFileName(): ContentOutName? { - val record = getFirstOrNull(KafkaEvents.EVENT_READER_DETERMINED_FILENAME) ?: return null - return ContentOutNameDeserializer().deserializeIfSuccessful(record.value()) - } - - fun getStreams(): MediaStreams? { - val record = getFirstOrNull(KafkaEvents.EVENT_READER_RECEIVED_STREAMS) ?: return null - return MediaStreamsDeserializer().deserializeIfSuccessful(record.value()) - } } \ No newline at end of file