From 56e41dc3efe58ce70b7a376e0ea9db1171b556db Mon Sep 17 00:00:00 2001 From: Brage Date: Mon, 17 Jul 2023 01:50:14 +0200 Subject: [PATCH] Fixed consumer id --- .../content/reader/analyzer/EncodeStreamsProducer.kt | 2 +- .../streamit/content/reader/streams/StreamsReader.kt | 2 +- pyMetadata/app.py | 7 ++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsProducer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsProducer.kt index e4691275..aca20c64 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsProducer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsProducer.kt @@ -22,7 +22,7 @@ class EncodeStreamsProducer: IPooledEvents.OnEventsReceived { val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) - val defaultConsumer = DefaultConsumer().apply { + val defaultConsumer = DefaultConsumer(subId = "0m").apply { autoCommit = false } diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt index aea1a5a3..a07f5e13 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt @@ -22,7 +22,7 @@ private val logger = KotlinLogging.logger {} class StreamsReader { val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) - val defaultConsumer = DefaultConsumer().apply { + val defaultConsumer = DefaultConsumer(subId = "0a").apply { // autoCommit = false } init { diff --git a/pyMetadata/app.py b/pyMetadata/app.py index d945fa2a..46cfa9d4 100644 --- a/pyMetadata/app.py +++ b/pyMetadata/app.py @@ -59,14 +59,15 @@ class ProducerDataValueSchema: # Kafka consumer-klasse class KafkaConsumerThread(threading.Thread): - def __init__(self, bootstrap_servers, topic): + def __init__(self, bootstrap_servers, topic, consumer_group): super().__init__() self.bootstrap_servers = bootstrap_servers + self.consumer_group = consumer_group self.topic = topic self.shutdown = threading.Event() def run(self): - consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers) + consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers, group_id=self.consumer_group) logger.info("Kafka Consumer started") @@ -181,7 +182,7 @@ def main(): signal.signal(signal.SIGINT, signal_handler) # Opprett og start consumer-trĂ¥den - consumer_thread = KafkaConsumerThread(bootstrap_servers, kafka_topic) + consumer_thread = KafkaConsumerThread(bootstrap_servers, kafka_topic, consumer_group) consumer_thread.start() logger.info("App started")