diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 46867666..c7b24f0c 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -23,7 +23,7 @@ repositories { } dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha26") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha28") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("com.github.pgreze:kotlin-process:1.3.1") diff --git a/Reader/settings.gradle.kts b/Reader/settings.gradle.kts index b01a11b8..ffe04905 100644 --- a/Reader/settings.gradle.kts +++ b/Reader/settings.gradle.kts @@ -1,6 +1,4 @@ rootProject.name = "Reader" include(":CommonCode") -project(":CommonCode").projectDir = File("../CommonCode") - -include(":streamit-library-kafka") \ No newline at end of file +project(":CommonCode").projectDir = File("../CommonCode") \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt index 3f43b736..846cfa2f 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt @@ -17,6 +17,7 @@ import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer import no.iktdev.streamit.library.kafka.listener.EventMessageListener +import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener import no.iktdev.streamit.library.kafka.producer.DefaultProducer import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.stereotype.Service @@ -52,9 +53,8 @@ class FileWatcher: FileWatcherEvents { } } - object : EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(KnownEvents.REQUEST_FILE_READ.event)) { - override fun onMessage(data: ConsumerRecord) { - + object : SimpleMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(KnownEvents.REQUEST_FILE_READ.event)) { + override fun onMessageReceived(data: ConsumerRecord) { if (data.value().status.statusType == StatusType.SUCCESS) { if (data.value().data is String) { val file = File(CommonConfig.incomingContent, data.value().data as String) @@ -68,7 +68,6 @@ class FileWatcher: FileWatcherEvents { } } } - } } diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt index 098951bd..b770687d 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt @@ -29,10 +29,9 @@ class StreamsReader { init { - object: SimpleMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) { - override fun onMessage(data: ConsumerRecord) { + object: SimpleMessageListener(topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, accepts = listOf(EVENT_READER_RECEIVED_FILE.event)) { + override fun onMessageReceived(data: ConsumerRecord) { logger.info { "RECORD: ${data.key()}" } - logger.info { "Active filters: ${this.accepts.joinToString(",") }}" } if (data.value().status.statusType != StatusType.SUCCESS) { logger.info { "Ignoring event: ${data.key()} as status is not Success!" } return @@ -72,20 +71,6 @@ class StreamsReader { messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message) } - override fun filter(consumerRecord: ConsumerRecord): Boolean { - return shouldIgnoreMessageO(consumerRecord) - } - - fun shouldIgnoreMessageO(consumerRecord: ConsumerRecord): Boolean { - logger.info { "Consumer filter validating against ${consumerRecord.key()}" } - if (consumerRecord.key().isNullOrBlank()) { - logger.info { "Consumer event is null or blank, ignoring!" } - return true - } - val isEventAccepted = accepts.any { it == consumerRecord.key() } - logger.info { "Consumer event is ${if (isEventAccepted) "found" else "not found"} within accepts" } - return !isEventAccepted - } }.listen() }