diff --git a/pyMetadata/app.py b/pyMetadata/app.py
index aff64ed5..d945fa2a 100644
--- a/pyMetadata/app.py
+++ b/pyMetadata/app.py
@@ -1,5 +1,8 @@
+import logging
 import signal
-import sys, os, uuid
+import sys
+import os
+import uuid
 import threading
 import json
 from kafka import KafkaConsumer, KafkaProducer
@@ -9,9 +12,19 @@ from sources.anii import metadata as AniiMetadata
 from sources.imdb import metadata as ImdbMetadata
 
 # Configure the Kafka connection
-bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") if os.environ.get("KAFKA_BOOTSTRAP_SERVER") != None else "127.0.0.1:9092"
-consumer_group = os.environ.get("KAFKA_CONSUMER_ID") if os.environ.get("KAFKA_CONSUMER_ID") != None else f"Metadata-{uuid.uuid4()}"
-kafka_topic = os.environ.get("KAFKA_TOPIC") if os.environ.get("KAFKA_TOPIC") != None else "127.0.0.1:9092"
+bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") or "127.0.0.1:9092"
+consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"Metadata-{uuid.uuid4()}"
+kafka_topic = os.environ.get("KAFKA_TOPIC") or "127.0.0.1:9092"
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(message)s",
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+logger = logging.getLogger(__name__)
 
 class ProducerDataValueSchema:
     def __init__(self, referenceId, statusType, errorMessage, data):
@@ -44,7 +57,6 @@ class ProducerDataValueSchema:
 
         return cls(referenceId, statusType, errorMessage, data)
 
-
 # Kafka consumer class
 class KafkaConsumerThread(threading.Thread):
     def __init__(self, bootstrap_servers, topic):
@@ -56,11 +68,15 @@ class KafkaConsumerThread(threading.Thread):
 
     def run(self):
        consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers)
+        logger.info("Kafka Consumer started")
+
        while not self.shutdown.is_set():
            for message in consumer:
                if self.shutdown.is_set():
                    break
 
+                logger.info("Received message: key=%s, value=%s", message.key, message.value)
+
                # Check whether the message has the target key
                if message.key == "request:metadata:obtain" or message.key == "event:reader:received-file":
                    # Create a new thread to handle the message
@@ -68,6 +84,7 @@
                    handler_thread.start()
 
        consumer.close()
+        logger.info("Kafka Consumer stopped")
 
    def stop(self):
        self.shutdown.set()
@@ -79,6 +96,8 @@ class MessageHandlerThread(threading.Thread):
        self.message = message
 
    def run(self):
+        logger.info("Handling message: key=%s, value=%s", self.message.key, self.message.value)
+
        # Deserialize the message value from JSON into a Python dictionary
        message_value = json.loads(self.message.value)
 
@@ -90,6 +109,8 @@
 
        if status_type == 'SUCCESS':
            data_value = message_value['data']["title"]
 
+            logger.info("Performing action for data: %s", data_value)
+
            # Perform the action based on the value
            result = self.perform_action(title=data_value)
@@ -137,16 +158,13 @@
 
        # Return the most likely result
        return most_likely_result
-
    def compose_message(self, referenceId: str, result: DataResult) -> ProducerDataValueSchema:
-        """"""
        return ProducerDataValueSchema(
            referenceId=referenceId,
            statusType=result.statusType,
            errorMessage=result.errorMessage,
            data=result.data
        )
-
 
 
 # Global variable indicating whether the application should shut down
@@ -166,6 +184,8 @@ def main():
    consumer_thread = KafkaConsumerThread(bootstrap_servers, kafka_topic)
    consumer_thread.start()
 
+    logger.info("App started")
+
    # Wait until should_stop is set to True to shut down the application
    while not should_stop:
        pass
@@ -174,5 +194,7 @@
    consumer_thread.stop()
    consumer_thread.join()
 
+    logger.info("App stopped")
+
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()
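
For context, the patch layers logging onto an existing shutdown pattern: KafkaConsumerThread polls a threading.Event, and main() spins on a should_stop flag until a signal handler flips it. The code below is not part of the diff; it is a minimal, self-contained sketch of that pattern using only the standard library, with hypothetical names (Worker standing in for KafkaConsumerThread) and a short sleep in place of real message consumption.

import logging
import signal
import sys
import threading
import time

# Same logging setup as the patch: INFO-level, timestamped, written to stdout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)


class Worker(threading.Thread):
    """Stand-in for KafkaConsumerThread: loops until its Event is set."""

    def __init__(self):
        super().__init__()
        self.shutdown = threading.Event()

    def run(self):
        logger.info("Worker started")
        while not self.shutdown.is_set():
            time.sleep(0.1)  # placeholder for consuming and dispatching messages
        logger.info("Worker stopped")

    def stop(self):
        self.shutdown.set()


# Global flag, mirroring the should_stop variable in app.py
should_stop = False


def handle_signal(signum, frame):
    global should_stop
    should_stop = True


def main():
    # Register handlers for Ctrl-C and termination; which signals app.py
    # actually hooks is not shown in the diff.
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    worker = Worker()
    worker.start()
    logger.info("App started")

    # Wait for the flag; app.py uses `while not should_stop: pass` here.
    while not should_stop:
        time.sleep(0.1)

    worker.stop()
    worker.join()
    logger.info("App stopped")


if __name__ == '__main__':
    main()

Sleeping briefly in the wait loops keeps the sketch from pegging a CPU core; the patched app.py keeps its bare `pass` busy-wait unchanged.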