diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt index 8b1f6110..098951bd 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt @@ -71,6 +71,21 @@ class StreamsReader { val message = Message(status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n")) messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message) } + + override fun filter(consumerRecord: ConsumerRecord): Boolean { + return shouldIgnoreMessageO(consumerRecord) + } + + fun shouldIgnoreMessageO(consumerRecord: ConsumerRecord): Boolean { + logger.info { "Consumer filter validating against ${consumerRecord.key()}" } + if (consumerRecord.key().isNullOrBlank()) { + logger.info { "Consumer event is null or blank, ignoring!" } + return true + } + val isEventAccepted = accepts.any { it == consumerRecord.key() } + logger.info { "Consumer event is ${if (isEventAccepted) "found" else "not found"} within accepts" } + return !isEventAccepted + } }.listen() }