Updated app.py

Brage 2023-07-17 01:40:54 +02:00
parent eaa93cd9c1
commit 34c2e4d059

@@ -1,5 +1,8 @@
+import logging
 import signal
-import sys, os, uuid
+import sys
+import os
+import uuid
 import threading
 import json
 from kafka import KafkaConsumer, KafkaProducer
@@ -9,9 +12,19 @@ from sources.anii import metadata as AniiMetadata
 from sources.imdb import metadata as ImdbMetadata

 # Configure the Kafka connection
-bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") if os.environ.get("KAFKA_BOOTSTRAP_SERVER") != None else "127.0.0.1:9092"
+bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") or "127.0.0.1:9092"
-consumer_group = os.environ.get("KAFKA_CONSUMER_ID") if os.environ.get("KAFKA_CONSUMER_ID") != None else f"Metadata-{uuid.uuid4()}"
+consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"Metadata-{uuid.uuid4()}"
-kafka_topic = os.environ.get("KAFKA_TOPIC") if os.environ.get("KAFKA_TOPIC") != None else "127.0.0.1:9092"
+kafka_topic = os.environ.get("KAFKA_TOPIC") or "127.0.0.1:9092"
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(message)s",
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+logger = logging.getLogger(__name__)

 class ProducerDataValueSchema:
     def __init__(self, referenceId, statusType, errorMessage, data):
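
The switch from the "x if x != None else default" pattern to "or" is mostly equivalent, with one behavioral difference worth noting: "or" also falls back when the variable is set but empty. A minimal standalone illustration of that edge case (not part of the commit):

    import os

    # With the `or` form, an empty (falsy) value also triggers the fallback,
    # whereas the previous `!= None` check would have kept the empty string.
    os.environ["KAFKA_BOOTSTRAP_SERVER"] = ""
    bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") or "127.0.0.1:9092"
    print(bootstrap_servers)  # -> 127.0.0.1:9092
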
@@ -44,7 +57,6 @@ class ProducerDataValueSchema:
         return cls(referenceId, statusType, errorMessage, data)

 # Kafka consumer class
 class KafkaConsumerThread(threading.Thread):
     def __init__(self, bootstrap_servers, topic):
@@ -56,11 +68,15 @@ class KafkaConsumerThread(threading.Thread):
     def run(self):
         consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers)
+        logger.info("Kafka Consumer started")

         while not self.shutdown.is_set():
             for message in consumer:
                 if self.shutdown.is_set():
                     break

+                logger.info("Received message: key=%s, value=%s", message.key, message.value)
+
                 # Check whether the message has the target key
                 if message.key == "request:metadata:obtain" or message.key == "event:reader:received-file":
                     # Create a new thread to handle the message
@@ -68,6 +84,7 @@ class KafkaConsumerThread(threading.Thread):
                     handler_thread.start()

         consumer.close()
+        logger.info("Kafka Consumer stopped")

     def stop(self):
         self.shutdown.set()
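
One assumption behind the key comparison in run(): kafka-python delivers message.key as raw bytes unless the consumer is constructed with a key_deserializer, so matching against strings such as "request:metadata:obtain" presumes the keys are decoded somewhere. A minimal sketch of a consumer that decodes keys up front (illustrative only; the topic name and UTF-8 encoding are assumptions, not taken from this commit):

    from kafka import KafkaConsumer

    # Illustrative consumer construction: decode UTF-8 keys to str so comparisons
    # like message.key == "request:metadata:obtain" can match.
    consumer = KafkaConsumer(
        "metadata",  # hypothetical topic name
        bootstrap_servers="127.0.0.1:9092",
        key_deserializer=lambda k: k.decode("utf-8") if k is not None else None,
    )
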
@@ -79,6 +96,8 @@ class MessageHandlerThread(threading.Thread):
         self.message = message

     def run(self):
+        logger.info("Handling message: key=%s, value=%s", self.message.key, self.message.value)
+
         # Deserialize the message value from JSON into a Python dictionary
         message_value = json.loads(self.message.value)
@@ -90,6 +109,8 @@ class MessageHandlerThread(threading.Thread):
         if status_type == 'SUCCESS':
             data_value = message_value['data']["title"]
+            logger.info("Performing action for data: %s", data_value)
+
             # Perform the action based on the value
             result = self.perform_action(title=data_value)
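
For context, json.loads accepts bytes directly in Python 3.6+, so the handler can parse self.message.value without decoding it first. A rough sketch of the message shape the SUCCESS branch reads (the surrounding fields are assumptions based on ProducerDataValueSchema, not shown in this hunk):

    import json

    # Hypothetical payload: only data["title"] is read in the hunk above.
    raw_value = b'{"referenceId": "abc-123", "statusType": "SUCCESS", "data": {"title": "Some Title"}}'
    message_value = json.loads(raw_value)  # works on bytes in Python 3.6+
    print(message_value["data"]["title"])  # -> Some Title
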
@@ -137,16 +158,13 @@ class MessageHandlerThread(threading.Thread):
         # Return the most likely result
         return most_likely_result

     def compose_message(self, referenceId: str, result: DataResult) -> ProducerDataValueSchema:
-        """"""
         return ProducerDataValueSchema(
             referenceId=referenceId,
             statusType=result.statusType,
             errorMessage=result.errorMessage,
             data=result.data
         )

 # Global variable indicating whether the application should shut down
@@ -166,6 +184,8 @@ def main():
     consumer_thread = KafkaConsumerThread(bootstrap_servers, kafka_topic)
     consumer_thread.start()
+    logger.info("App started")
+
     # Wait until should_stop is set to True before shutting down the application
     while not should_stop:
         pass
@@ -174,5 +194,7 @@ def main():
     consumer_thread.stop()
     consumer_thread.join()
+    logger.info("App stopped")
+

 if __name__ == '__main__':
     main()
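
The main loop still waits with a busy "while not should_stop: pass" spin. A minimal sketch of the same shutdown flow using threading.Event and the already-imported signal module (an illustration under those assumptions, not part of this commit):

    import signal
    import threading

    stop_event = threading.Event()

    def _handle_signal(signum, frame):
        # Request shutdown when SIGINT/SIGTERM arrives.
        stop_event.set()

    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)

    stop_event.wait()  # blocks without busy-waiting until a signal fires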