diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index aca1b4d6..c4021673 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -26,8 +26,8 @@ jobs: build-encode: needs: build-commoncode runs-on: ubuntu-latest - - if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }} + if: false + #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }} steps: - name: Checkout repository diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 8021c861..af2f31e0 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -23,7 +23,7 @@ repositories { } dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha34") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha35") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("com.github.pgreze:kotlin-process:1.3.1") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt index a9503a95..22175f78 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt @@ -12,6 +12,7 @@ import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer import no.iktdev.streamit.library.kafka.dto.ActionType +import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener import no.iktdev.streamit.library.kafka.producer.DefaultProducer import org.springframework.stereotype.Service @@ -20,62 +21,23 @@ import java.io.File private val logger = KotlinLogging.logger {} @Service -class EncodedStreams { +class EncodedStreams : ISequentialMessageEvent { val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) - val defaultConsumer = DefaultConsumer(subId = "encodedStreams").apply { + final val defaultConsumer = DefaultConsumer(subId = "encodedStreams").apply { autoCommit = false } - val mainListener = object : SequentialMessageListener( + final val mainListener = object : SequentialMessageListener( topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, accept = KnownEvents.EVENT_READER_RECEIVED_FILE.event, subAccepts = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event), deserializers = EncodedDeserializers().getDeserializers(), - ) { - override fun areAllMessagesPresent(currentEvents: List): Boolean { - val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event) - val waitingFor = expected.filter { !currentEvents.contains(it) } - logger.info { "Waiting for events: \n ${waitingFor.joinToString("\n\t")}" } - return expected.containsAll(currentEvents) - } - - override fun onAllMessagesProcessed(referenceId: String, result: Map) { - logger.info { "All messages are received" } - val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event] - if (baseMessage == null) { - produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!") - return - } - - if (result.values.all { it?.status?.statusType == StatusType.SUCCESS }) { - - return - } - val fileResult = baseMessage?.data as FileWatcher.FileResult? - if (fileResult == null) { - produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!") - return - } - - val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension } - - val streams = result[KnownEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams? - if (streams == null) { - produceErrorMessage(baseMessage, "No streams received!") - return - } - - val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName) - produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments()) - encodeInformation.getSubtitleArguments().forEach { s -> - produceEncodeMessage(baseMessage, s) - } - } - } + this + ) {} init { mainListener.listen() @@ -101,5 +63,45 @@ class EncodedStreams { messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message) } + override fun areAllMessagesPresent(currentEvents: List): Boolean { + val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event) + val waitingFor = expected.filter { !currentEvents.contains(it) } + logger.info { "Waiting for events: \n ${waitingFor.joinToString("\n\t")}" } + return expected.containsAll(currentEvents) + } + + override fun onAllMessagesProcessed(referenceId: String, result: Map) { + logger.info { "All messages are received" } + val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event] + if (baseMessage == null) { + produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!") + return + } + + if (result.values.all { it?.status?.statusType == StatusType.SUCCESS }) { + + return + } + val fileResult = baseMessage.data as FileWatcher.FileResult? + if (fileResult == null) { + produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!") + return + } + + val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension } + + val streams = result[KnownEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams? + if (streams == null) { + produceErrorMessage(baseMessage, "No streams received!") + return + } + + val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName) + produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments()) + encodeInformation.getSubtitleArguments().forEach { s -> + produceEncodeMessage(baseMessage, s) + } + } + } \ No newline at end of file