Updated app.py
This commit is contained in:
parent
d2263aaad4
commit
69e938d10e
@ -114,32 +114,28 @@ class MessageHandlerThread(threading.Thread):
|
||||
def run(self):
|
||||
logger.info("Handling message: key=%s, value=%s", self.message.key, self.message.value)
|
||||
|
||||
# Deserialiser meldingsverdien fra JSON til et Python-dictionary
|
||||
message_value = json.loads(self.message.value)
|
||||
|
||||
# Sjekk om meldingen har en Status
|
||||
if 'status' in message_value:
|
||||
status_type = message_value['status']['statusType']
|
||||
if 'status' in self.message.value:
|
||||
status_type = self.message.value['status']['statusType']
|
||||
|
||||
# Sjekk om statusen er SUCCESS
|
||||
if status_type == 'SUCCESS':
|
||||
data_value = message_value['data']["title"]
|
||||
|
||||
logger.info("Performing action for data: %s", data_value)
|
||||
data_value = self.message.value['data']["title"]
|
||||
|
||||
# Utfør handlingen basert på verdien
|
||||
result = self.perform_action(title=data_value)
|
||||
|
||||
producerMessage = self.compose_message(referenceId=message_value["referenceId"], result=result)
|
||||
producerMessage = self.compose_message(referenceId=self.message.value["referenceId"], result=result)
|
||||
|
||||
# Serialiser resultatet til JSON
|
||||
result_json = json.dumps(producerMessage.to_json())
|
||||
result_json = producerMessage.to_json()
|
||||
|
||||
# Send resultatet tilbake ved hjelp av Kafka-producer
|
||||
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
|
||||
producer.send(kafka_topic, key="event:metadata:obtained", value=result_json)
|
||||
producer.close()
|
||||
|
||||
|
||||
def perform_action(self, title) -> DataResult:
|
||||
anii = AniiMetadata(title)
|
||||
imdb = ImdbMetadata(title)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user