From c5e4823d3a72c0212acf257e754f178d834af18b Mon Sep 17 00:00:00 2001 From: Brage Date: Wed, 19 Jul 2023 17:37:16 +0200 Subject: [PATCH] Update --- Reader/build.gradle.kts | 2 +- .../streamit/content/reader/analyzer/EncodedStreams.kt | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 71ef4d6b..8021c861 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -23,7 +23,7 @@ repositories { } dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha33") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha34") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("com.github.pgreze:kotlin-process:1.3.1") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt index e7d3782f..a9503a95 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt @@ -1,5 +1,6 @@ package no.iktdev.streamit.content.reader.analyzer +import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.streams.MediaStreams import no.iktdev.streamit.content.reader.analyzer.encoding.EncodeArgumentSelector @@ -16,6 +17,8 @@ import no.iktdev.streamit.library.kafka.producer.DefaultProducer import org.springframework.stereotype.Service import java.io.File +private val logger = KotlinLogging.logger {} + @Service class EncodedStreams { @@ -34,12 +37,14 @@ class EncodedStreams { deserializers = EncodedDeserializers().getDeserializers(), ) { override fun areAllMessagesPresent(currentEvents: List): Boolean { - val expected = - listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event) + val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event) + val waitingFor = expected.filter { !currentEvents.contains(it) } + logger.info { "Waiting for events: \n ${waitingFor.joinToString("\n\t")}" } return expected.containsAll(currentEvents) } override fun onAllMessagesProcessed(referenceId: String, result: Map) { + logger.info { "All messages are received" } val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event] if (baseMessage == null) { produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!")