diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsProducer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsProducer.kt index c74d92f8..e4691275 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsProducer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsProducer.kt @@ -20,7 +20,7 @@ import java.io.File @Service class EncodeStreamsProducer: IPooledEvents.OnEventsReceived { - val messageProducer = DefaultProducer(CommonConfig.kafkaConsumerId) + val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) val defaultConsumer = DefaultConsumer().apply { autoCommit = false @@ -28,7 +28,7 @@ class EncodeStreamsProducer: IPooledEvents.OnEventsReceived { init { val ackListener = PooledEventMessageListener( - topic = CommonConfig.kafkaConsumerId, consumer = defaultConsumer, + topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, mainFilter = KnownEvents.EVENT_READER_RECEIVED_FILE.event, subFilter = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event), event = this diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt index e184f01f..aea1a5a3 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt @@ -21,12 +21,12 @@ private val logger = KotlinLogging.logger {} @Service class StreamsReader { - val messageProducer = DefaultProducer(CommonConfig.kafkaConsumerId) + val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) val defaultConsumer = DefaultConsumer().apply { // autoCommit = false } init { - object: EventMessageListener(CommonConfig.kafkaConsumerId, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) { + object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) { override fun onMessage(data: ConsumerRecord) { if (data.value().status.statusType != StatusType.SUCCESS) { logger.info { "Ignoring event: ${data.key()} as status is not Success!" }