This commit is contained in:
Brage 2023-11-07 02:13:35 +01:00
parent c5b6582c64
commit c59d406493

View File

@ -78,13 +78,17 @@ class KafkaConsumerThread(threading.Thread):
self.shutdown = threading.Event()
def run(self):
consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.bootstrap_servers,
group_id=self.consumer_group,
key_deserializer=lambda x: decode_key(x),
value_deserializer=lambda x: decode_value(x)
)
consumer = None
try:
consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.bootstrap_servers,
group_id=self.consumer_group,
key_deserializer=lambda x: decode_key(x),
value_deserializer=lambda x: decode_value(x)
)
except:
self.stop()
logger.info("Kafka Consumer started")