diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt index f5905aec..01ae2558 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt @@ -1,5 +1,6 @@ package no.iktdev.streamit.content.encode +import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry @@ -14,8 +15,11 @@ import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSucce import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.stereotype.Service +private val logger = KotlinLogging.logger {} + @Service class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : DefaultKafkaReader("encodeWork") { + lateinit var encodeInstructionsListener: EncodeInformationListener init { @@ -45,6 +49,7 @@ class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : Def accepts ) { override fun onMessageReceived(data: ConsumerRecord) { + logger.info { "${data.value().referenceId}: ${data.key()}" } val message = data.value().apply { this.data = EncodeWorkDeserializer().deserializeIfSuccessful(data.value()) } diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt index a4b673d4..2046aec8 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt @@ -1,5 +1,6 @@ package no.iktdev.streamit.content.encode +import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry @@ -14,6 +15,7 @@ import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeseri import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.stereotype.Service +private val logger = KotlinLogging.logger {} @Service class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : DefaultKafkaReader("extractWork") { @@ -46,6 +48,7 @@ class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : De accepts ) { override fun onMessageReceived(data: ConsumerRecord) { + logger.info { "${data.value().referenceId}: ${data.key()}" } val message = data.value().apply { this.data = ExtractWorkDeserializer().deserializeIfSuccessful(data.value()) } diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index 668caba7..6e6ab3be 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -17,6 +17,7 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +private val logger = KotlinLogging.logger {} @Service class RunnerCoordinator { @@ -47,12 +48,14 @@ class RunnerCoordinator { if (message.data is EncodeWork) { val data: EncodeWork = message.data as EncodeWork val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) + logger.info { "${message.referenceId} Starting encoding ${data.workId}" } encodeDaemon.runUsingWorkItem() } else { producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork"))) } } } catch (e: Exception) { + e.printStackTrace() producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) } @@ -67,11 +70,13 @@ class RunnerCoordinator { if (message.data is ExtractWork) { val data: ExtractWork = message.data as ExtractWork val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) + logger.info { "${message.referenceId} Starting extraction ${data.workId}" } extractDaemon.runUsingWorkItem() } else { producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) } } catch (e: Exception) { + e.printStackTrace() producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) } }