From d2263aaad49ec5df2f220c75520ed1eea52efedc Mon Sep 17 00:00:00 2001 From: Brage Date: Tue, 18 Jul 2023 02:48:09 +0200 Subject: [PATCH] Updated app.py --- pyMetadata/app.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pyMetadata/app.py b/pyMetadata/app.py index 9cdcad0d..29e095de 100644 --- a/pyMetadata/app.py +++ b/pyMetadata/app.py @@ -56,6 +56,12 @@ class ProducerDataValueSchema: return cls(referenceId, statusType, errorMessage, data) +def decode_key(key_bytes): + return key_bytes.decode('utf-8') if key_bytes else None + +def decode_value(value_bytes): + return json.loads(value_bytes.decode('utf-8')) if value_bytes else None + # Kafka consumer-klasse class KafkaConsumerThread(threading.Thread): @@ -67,7 +73,13 @@ class KafkaConsumerThread(threading.Thread): self.shutdown = threading.Event() def run(self): - consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers, group_id=self.consumer_group) + consumer = KafkaConsumer( + self.topic, + bootstrap_servers=self.bootstrap_servers, + group_id=self.consumer_group, + key_deserializer=lambda x: decode_key(x), + value_deserializer=lambda x: decode_value(x) + ) logger.info("Kafka Consumer started")