diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 7e843b84..857d3e47 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -24,7 +24,7 @@ repositories { val exposedVersion = "0.38.2" dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha74") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha9") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt index 1247b548..07c6431b 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt @@ -14,6 +14,7 @@ import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent +import no.iktdev.streamit.library.kafka.listener.collector.NeedyMessageListener import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener @@ -26,11 +27,16 @@ private val logger = KotlinLogging.logger {} @Service class EncodedStreams : DefaultKafkaReader("streamSelector"), ICollectedMessagesEvent { - val collectionListener = CollectorMessageListener( + val collectionListener = NeedyMessageListener( topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE, completionEvent = KafkaEvents.EVENT_READER_DETERMINED_FILENAME, + needs = listOf( + KafkaEvents.EVENT_READER_RECEIVED_FILE, + KafkaEvents.EVENT_READER_RECEIVED_STREAMS, + KafkaEvents.EVENT_READER_DETERMINED_FILENAME + ), listener = this, eventCollectionClass = ResultCollection::class.java )