From d270f56936729a096e4ff5cb9acf11a9f07c81e4 Mon Sep 17 00:00:00 2001
From: Brage
Date: Sun, 23 Jul 2023 21:20:53 +0200
Subject: [PATCH] Fixed tagging (again) + made python a bit more sleepy + More

---
 .github/workflows/main.yml                    |  6 +-
 .../deserializers/DeserializerRegistry.kt     |  1 +
 .../streamit/content/encode}/EncodeEnv.kt     |  4 +-
 .../content/encode}/EncoderApplication.kt     |  5 +-
 .../content/encode/runner/EncodeDaemon.kt     |  2 +-
 .../content/encode/runner/ExtractDaemon.kt    |  5 +-
 .../encode/runner/RunnerCoordinator.kt        |  3 +-
 Reader/build.gradle.kts                       |  2 +-
 .../contentDeterminator/ContentDeterminate.kt |  6 +-
 .../analyzer/encoding/EncodedStreams.kt       | 61 +++++++------------
 .../analyzer/encoding/ResultCollection.kt     | 34 +++++++++++
 pyMetadata/app.py                             |  2 +-
 12 files changed, 74 insertions(+), 57 deletions(-)
 rename Encode/src/main/kotlin/{ => no/iktdev/streamit/content/encode}/EncodeEnv.kt (56%)
 rename Encode/src/main/kotlin/{ => no/iktdev/streamit/content/encode}/EncoderApplication.kt (86%)
 create mode 100644 Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/ResultCollection.kt

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index da338a2e..03331e26 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -106,9 +106,9 @@ jobs:
           context: ./Encode
           push: true
           tags: |
-            bskjon/media-processing.encoder:latest
-            bskjon/media-processing.encoder:${{ github.sha }}
-            bskjon/media-processing.encoder:${{ steps.docker-tag.outputs.tag }}
+            bskjon/mediaprocessing-encoder:latest
+            bskjon/mediaprocessing-encoder:${{ github.sha }}
+            bskjon/mediaprocessing-encoder:${{ steps.docker-tag.outputs.tag }}

   build-reader:
     needs: build-commoncode
diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt
index 4be9aab1..59a3c706 100644
--- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt
+++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt
@@ -7,6 +7,7 @@ class DeserializerRegistry {
     companion object {
         private val _registry = mutableMapOf>(
             KafkaEvents.EVENT_READER_RECEIVED_FILE to FileResultDeserializer(),
+            KafkaEvents.EVENT_READER_RECEIVED_STREAMS to MediaStreamsDeserializer(),
             KafkaEvents.EVENT_METADATA_OBTAINED to MetadataResultDeserializer(),
             KafkaEvents.EVENT_READER_DETERMINED_SERIE to EpisodeInfoDeserializer(),
             KafkaEvents.EVENT_READER_DETERMINED_MOVIE to MovieInfoDeserializer(),
diff --git a/Encode/src/main/kotlin/EncodeEnv.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeEnv.kt
similarity index 56%
rename from Encode/src/main/kotlin/EncodeEnv.kt
rename to Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeEnv.kt
index 04344d8e..ac83adda 100644
--- a/Encode/src/main/kotlin/EncodeEnv.kt
+++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeEnv.kt
@@ -1,7 +1,9 @@
+package no.iktdev.streamit.content.encode
+
 class EncodeEnv {
     companion object {
         val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
         val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false
-        val maxRunners: Int = System.getenv("SIMULTANEOUS_ENCODE_RUNNERS").toIntOrNull() ?: 1
+        val maxRunners: Int = try {System.getenv("SIMULTANEOUS_ENCODE_RUNNERS").toIntOrNull() ?: 1 } catch (e: Exception) {1}
     }
 }
\ No newline at end of file
diff --git a/Encode/src/main/kotlin/EncoderApplication.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt
similarity index 86%
rename from Encode/src/main/kotlin/EncoderApplication.kt
rename to Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt
index 1baedf30..c58ff211 100644
--- a/Encode/src/main/kotlin/EncoderApplication.kt
+++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt
@@ -1,5 +1,5 @@
-import mu.KotlinLogging
-import no.iktdev.exfl.observable.ObservableMap
+package no.iktdev.streamit.content.encode
+
 import org.springframework.boot.autoconfigure.SpringBootApplication
 import org.springframework.boot.runApplication
 import org.springframework.context.ApplicationContext
@@ -16,7 +16,6 @@ fun getContext(): ApplicationContext? {
 fun main(args: Array) {
     context = runApplication(*args)
 }
-private val logger = KotlinLogging.logger {}

 /*val progress = ObservableMap().also {
     it.addListener(object: ObservableMap.Listener {
diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt
index 3eef4f08..1a37446d 100644
--- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt
+++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt
@@ -1,6 +1,6 @@
 package no.iktdev.streamit.content.encode.runner

-import EncodeEnv
+import no.iktdev.streamit.content.encode.EncodeEnv
 import no.iktdev.exfl.observable.ObservableList
 import no.iktdev.exfl.observable.observableListOf
 import no.iktdev.streamit.content.common.deamon.Daemon
diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt
index ed3b0da2..e9673527 100644
--- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt
+++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt
@@ -1,14 +1,11 @@
 package no.iktdev.streamit.content.encode.runner

-import EncodeEnv
-import no.iktdev.exfl.observable.ObservableList
+import no.iktdev.streamit.content.encode.EncodeEnv
 import no.iktdev.exfl.observable.observableListOf
 import no.iktdev.streamit.content.common.deamon.Daemon
 import no.iktdev.streamit.content.common.deamon.IDaemon
-import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
 import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
 import no.iktdev.streamit.content.encode.progress.Progress
-import no.iktdev.streamit.content.encode.progress.ProgressDecoder
 import java.io.File

 class ExtractDaemon(val referenceId: String, val work: ExtractWork, val daemonInterface: IExtractListener): IDaemon {
diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt
index a329c98e..67f6b0d5 100644
--- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt
+++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt
@@ -1,9 +1,8 @@
 package no.iktdev.streamit.content.encode.runner

-import EncodeEnv
+import no.iktdev.streamit.content.encode.EncodeEnv
 import kotlinx.coroutines.runBlocking
 import no.iktdev.streamit.content.common.CommonConfig
-import no.iktdev.streamit.content.common.deamon.IDaemon
 import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
 import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
 import no.iktdev.streamit.content.encode.progress.Progress
diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts
index 13ddfd31..d510e2c1 100644
--- a/Reader/build.gradle.kts
+++ b/Reader/build.gradle.kts
@@ -23,7 +23,7 @@ repositories {
 }

 dependencies {
-    implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha63")
+    implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha66")
     implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")

     implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha7")
diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt
index 17fb2e5e..36f6f3fc 100644
--- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt
+++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt
@@ -74,14 +74,16 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM
             return
         }

-        val out = ContentOutName(videoInfo.fullName)
-        produceMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, out)
+
         if (videoInfo is EpisodeInfo) {
             produceMessage(KafkaEvents.EVENT_READER_DETERMINED_SERIE, initMessage, videoInfo)
         } else if (videoInfo is MovieInfo) {
             produceMessage(KafkaEvents.EVENT_READER_DETERMINED_MOVIE, initMessage, videoInfo)
         }
+
+        val out = ContentOutName(videoInfo.fullName)
+        produceMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, out)
     }

     final override fun loadDeserializers(): Map> {
diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt
index 1289c8dd..f571b420 100644
--- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt
+++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt
@@ -12,60 +12,30 @@ import no.iktdev.streamit.library.kafka.KafkaEvents
 import no.iktdev.streamit.library.kafka.dto.Message
 import no.iktdev.streamit.library.kafka.dto.Status
 import no.iktdev.streamit.library.kafka.dto.StatusType
+import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener
+import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent
 import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
 import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent
 import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.springframework.stereotype.Service
 import java.io.File

 private val logger = KotlinLogging.logger {}

 @Service
-class EncodedStreams : DefaultKafkaReader("encodedStreams"), ISequentialMessageEvent {
+class EncodedStreams : DefaultKafkaReader("streamSelector"), ICollectedMessagesEvent {

-
-
-    final val mainListener = object : SequentialMessageListener(
+    val collectionListener = CollectorMessageListener(
         topic = CommonConfig.kafkaTopic,
         consumer = defaultConsumer,
-        accept = KafkaEvents.EVENT_READER_RECEIVED_FILE.event,
-        subAccepts = listOf(
-            KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event,
-            KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event
-        ),
-        deserializers = loadDeserializers(),
+        initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE,
+        completionEvent = KafkaEvents.EVENT_READER_DETERMINED_FILENAME,
         listener = this
-    ) {}
+    )

     init {
-        mainListener.listen()
-    }
-
-
-    override fun getRequiredMessages(): List {
-        return mainListener.subAccepts + listOf(mainListener.accept)
-    }
-
-    override fun onAllMessagesProcessed(referenceId: String, result: Map) {
-        logger.info { "All messages are received" }
-        val fileResultEvent = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event]
-        val determinedFileNameEvent = result[KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event]
-        val streamEvent = result[KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event]
-
-        val fileResult = if (fileResultEvent != null && fileResultEvent.isSuccessful()) {
-            fileResultEvent.data as FileResult?
-        } else null
-
-        val outFileNameWithoutExtension = if (determinedFileNameEvent != null && determinedFileNameEvent.isSuccessful()) {
-            (determinedFileNameEvent.data as ContentOutName).baseName
-        } else fileResult?.sanitizedName
-
-        val streams = if (streamEvent != null && streamEvent.isSuccessful()) {
-            streamEvent.data as MediaStreams
-        } else null
-
-        createEncodeWork(referenceId, fileResult?.title, fileResult?.file, streams, outFileNameWithoutExtension)
-        createExtractWork(referenceId, fileResult?.title, fileResult?.file, streams, outFileNameWithoutExtension)
+        collectionListener.listen()
     }

     fun createEncodeWork(referenceId: String, collection: String?, inFile: String?, streams: MediaStreams?, outFileName: String?) {
@@ -131,4 +101,17 @@ class EncodedStreams : DefaultKafkaReader("encodedStreams"), ISequentialMessageE
         )
     }

+    override fun onCollectionCompleted(collection: ResultCollection?) {
+        logger.info { "Collection received" }
+        val referenceId = collection?.getRecords()?.firstOrNull()?.value()?.referenceId
+        if (referenceId == null) {
+            logger.warn { "referenceId is null, throwing collection" }
+            return
+        }
+        val outFileNameWithoutExtension: String? = collection.getFileName()?.baseName ?: collection.getFileResult()?.sanitizedName
+
+        createEncodeWork(referenceId, collection.getFileResult()?.title, collection.getFileResult()?.file, collection.getStreams(), outFileNameWithoutExtension)
+        createExtractWork(referenceId, collection.getFileResult()?.title, collection.getFileResult()?.file, collection.getStreams(), outFileNameWithoutExtension)
+    }
+
 }
\ No newline at end of file
diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/ResultCollection.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/ResultCollection.kt
new file mode 100644
index 00000000..ab953675
--- /dev/null
+++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/ResultCollection.kt
@@ -0,0 +1,34 @@
+package no.iktdev.streamit.content.reader.analyzer.encoding
+
+import no.iktdev.streamit.content.common.deserializers.ContentOutNameDeserializer
+import no.iktdev.streamit.content.common.deserializers.FileResultDeserializer
+import no.iktdev.streamit.content.common.deserializers.MediaStreamsDeserializer
+import no.iktdev.streamit.content.common.dto.ContentOutName
+import no.iktdev.streamit.content.common.dto.reader.FileResult
+import no.iktdev.streamit.content.common.streams.MediaStreams
+import no.iktdev.streamit.library.kafka.KafkaEvents
+import no.iktdev.streamit.library.kafka.dto.Message
+import no.iktdev.streamit.library.kafka.listener.collector.EventCollection
+import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+class ResultCollection: EventCollection() {
+
+    fun getFirstOrNull(events: KafkaEvents): ConsumerRecord? {
+        return getRecords().firstOrNull { it.key() == events.event }
+    }
+    fun getFileResult(): FileResult? {
+        val record = getRecords().firstOrNull { it.key() == KafkaEvents.EVENT_READER_RECEIVED_FILE.event } ?: return null
+        return FileResultDeserializer().deserializeIfSuccessful(record.value())
+    }
+
+    fun getFileName(): ContentOutName? {
+        val record = getFirstOrNull(KafkaEvents.EVENT_READER_DETERMINED_FILENAME) ?: return null
+        return ContentOutNameDeserializer().deserializeIfSuccessful(record.value())
+    }
+
+    fun getStreams(): MediaStreams? {
+        val record = getFirstOrNull(KafkaEvents.EVENT_READER_RECEIVED_STREAMS) ?: return null
+        return MediaStreamsDeserializer().deserializeIfSuccessful(record.value())
+    }
+}
\ No newline at end of file
diff --git a/pyMetadata/app.py b/pyMetadata/app.py
index 7fd85c43..aef479d9 100644
--- a/pyMetadata/app.py
+++ b/pyMetadata/app.py
@@ -243,7 +243,7 @@ def main():

     # Vent til should_stop er satt til True for å avslutte applikasjonen
     while not should_stop:
-        pass
+        time.sleep(60)

     # Stopp consumer-tråden
     consumer_thread.stop()
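
Note on the EncodeEnv hunk above: System.getenv() returns null when a variable is unset, so chaining toIntOrNull() directly on the platform-typed result can throw at runtime, which is presumably why the patch guards SIMULTANEOUS_ENCODE_RUNNERS with try/catch. The sketch below is illustrative only and is not part of the patch; the object name is hypothetical. It shows the same default-fallback behaviour expressed with null-safe calls.

// Illustrative sketch, not part of the patch. "EnvDefaults" is a hypothetical name.
object EnvDefaults {
    // Null-safe lookup: unset variable -> null -> fall back to the default,
    // matching the fallback EncodeEnv reaches via its try/catch wrapper.
    val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
    val allowOverwrite: Boolean = System.getenv("ALLOW_OVERWRITE")?.toBoolean() ?: false
    val maxRunners: Int = System.getenv("SIMULTANEOUS_ENCODE_RUNNERS")?.toIntOrNull() ?: 1
}

fun main() {
    // Prints the resolved values; with no environment set this yields the defaults.
    println("ffmpeg=${EnvDefaults.ffmpeg} maxRunners=${EnvDefaults.maxRunners} overwrite=${EnvDefaults.allowOverwrite}")
}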