Fixed consumer id

This commit is contained in:
Brage 2023-07-17 01:50:14 +02:00
parent b310ecbd28
commit 56e41dc3ef
3 changed files with 6 additions and 5 deletions

View File

@ -22,7 +22,7 @@ class EncodeStreamsProducer: IPooledEvents.OnEventsReceived {
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
val defaultConsumer = DefaultConsumer().apply {
val defaultConsumer = DefaultConsumer(subId = "0m").apply {
autoCommit = false
}

View File

@ -22,7 +22,7 @@ private val logger = KotlinLogging.logger {}
class StreamsReader {
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
val defaultConsumer = DefaultConsumer().apply {
val defaultConsumer = DefaultConsumer(subId = "0a").apply {
// autoCommit = false
}
init {

View File

@ -59,14 +59,15 @@ class ProducerDataValueSchema:
# Kafka consumer-klasse
class KafkaConsumerThread(threading.Thread):
def __init__(self, bootstrap_servers, topic):
def __init__(self, bootstrap_servers, topic, consumer_group):
super().__init__()
self.bootstrap_servers = bootstrap_servers
self.consumer_group = consumer_group
self.topic = topic
self.shutdown = threading.Event()
def run(self):
consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers)
consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers, group_id=self.consumer_group)
logger.info("Kafka Consumer started")
@ -181,7 +182,7 @@ def main():
signal.signal(signal.SIGINT, signal_handler)
# Opprett og start consumer-tråden
consumer_thread = KafkaConsumerThread(bootstrap_servers, kafka_topic)
consumer_thread = KafkaConsumerThread(bootstrap_servers, kafka_topic, consumer_group)
consumer_thread.start()
logger.info("App started")