Updated references

This commit is contained in:
Brage 2023-07-17 01:16:32 +02:00
parent f3e66a1a6b
commit 457494b9b0
2 changed files with 4 additions and 4 deletions

View File

@ -20,7 +20,7 @@ import java.io.File
@Service
class EncodeStreamsProducer: IPooledEvents.OnEventsReceived {
val messageProducer = DefaultProducer(CommonConfig.kafkaConsumerId)
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
val defaultConsumer = DefaultConsumer().apply {
autoCommit = false
@ -28,7 +28,7 @@ class EncodeStreamsProducer: IPooledEvents.OnEventsReceived {
init {
val ackListener = PooledEventMessageListener(
topic = CommonConfig.kafkaConsumerId, consumer = defaultConsumer,
topic = CommonConfig.kafkaTopic, consumer = defaultConsumer,
mainFilter = KnownEvents.EVENT_READER_RECEIVED_FILE.event,
subFilter = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event),
event = this

View File

@ -21,12 +21,12 @@ private val logger = KotlinLogging.logger {}
@Service
class StreamsReader {
val messageProducer = DefaultProducer(CommonConfig.kafkaConsumerId)
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
val defaultConsumer = DefaultConsumer().apply {
// autoCommit = false
}
init {
object: EventMessageListener(CommonConfig.kafkaConsumerId, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
override fun onMessage(data: ConsumerRecord<String, Message>) {
if (data.value().status.statusType != StatusType.SUCCESS) {
logger.info { "Ignoring event: ${data.key()} as status is not Success!" }