From 73c97dd73ad45b3a541ed2d72bc00a5364cfa7e1 Mon Sep 17 00:00:00 2001 From: Brage Date: Tue, 25 Jul 2023 19:21:56 +0200 Subject: [PATCH] Updated lib version + changed listener --- Reader/build.gradle.kts | 2 +- .../content/reader/analyzer/encoding/EncodedStreams.kt | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 7e843b84..857d3e47 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -24,7 +24,7 @@ repositories { val exposedVersion = "0.38.2" dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha74") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha9") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt index 1247b548..07c6431b 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt @@ -14,6 +14,7 @@ import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent +import no.iktdev.streamit.library.kafka.listener.collector.NeedyMessageListener import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener @@ -26,11 +27,16 @@ private val logger = KotlinLogging.logger {} @Service class EncodedStreams : DefaultKafkaReader("streamSelector"), ICollectedMessagesEvent { - val collectionListener = CollectorMessageListener( + val collectionListener = NeedyMessageListener( topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE, completionEvent = KafkaEvents.EVENT_READER_DETERMINED_FILENAME, + needs = listOf( + KafkaEvents.EVENT_READER_RECEIVED_FILE, + KafkaEvents.EVENT_READER_RECEIVED_STREAMS, + KafkaEvents.EVENT_READER_DETERMINED_FILENAME + ), listener = this, eventCollectionClass = ResultCollection::class.java )