diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt index 01ae2558..a783f1ca 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt @@ -1,5 +1,6 @@ package no.iktdev.streamit.content.encode +import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader @@ -49,7 +50,7 @@ class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : Def accepts ) { override fun onMessageReceived(data: ConsumerRecord) { - logger.info { "${data.value().referenceId}: ${data.key()}" } + logger.info { "${data.value().referenceId}: ${data.key()} ${Gson().toJson(data.value())}" } val message = data.value().apply { this.data = EncodeWorkDeserializer().deserializeIfSuccessful(data.value()) } diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt index 2046aec8..347bf5b7 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt @@ -1,5 +1,6 @@ package no.iktdev.streamit.content.encode +import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader @@ -48,7 +49,7 @@ class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : De accepts ) { override fun onMessageReceived(data: ConsumerRecord) { - logger.info { "${data.value().referenceId}: ${data.key()}" } + logger.info { "${data.value().referenceId}: ${data.key()} ${Gson().toJson(data.value())}" } val message = data.value().apply { this.data = ExtractWorkDeserializer().deserializeIfSuccessful(data.value()) } diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index 6e6ab3be..f21c94ab 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -45,13 +45,13 @@ class RunnerCoordinator { encodeExecutor.execute { try { runBlocking { - if (message.data is EncodeWork) { + if (message.data != null && message.data is EncodeWork) { val data: EncodeWork = message.data as EncodeWork val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) logger.info { "${message.referenceId} Starting encoding ${data.workId}" } encodeDaemon.runUsingWorkItem() } else { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork"))) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))) } } } catch (e: Exception) { @@ -67,7 +67,7 @@ class RunnerCoordinator { extractExecutor.execute { runBlocking { try { - if (message.data is ExtractWork) { + if (message.data != null && message.data is ExtractWork) { val data: ExtractWork = message.data as ExtractWork val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) logger.info { "${message.referenceId} Starting extraction ${data.workId}" }