From 4bb2cef402c9a27367b8f6e8c35e709c420aabd0 Mon Sep 17 00:00:00 2001 From: Brage Date: Tue, 25 Jul 2023 22:50:19 +0200 Subject: [PATCH] Updated encode runner --- .../encode/runner/RunnerCoordinator.kt | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index 4d65411f..0f54f794 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -37,6 +37,7 @@ class RunnerCoordinator { val scope = CoroutineScope(dispatcher) fun addEncodeMessageToQueue(message: Message) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) scope.launch { try { if (message.data != null && message.data is EncodeWork) { @@ -53,28 +54,25 @@ class RunnerCoordinator { } } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) } fun addExtractMessageToQueue(message: Message) { - executor.execute { - runBlocking { - try { - if (message.data != null && message.data is ExtractWork) { - val data: ExtractWork = message.data as ExtractWork - val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) - logger.info { "${message.referenceId} Starting extraction ${data.workId}" } - extractDaemon.runUsingWorkItem() - } else { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) - } - } catch (e: Exception) { - e.printStackTrace() - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) + scope.launch { + try { + if (message.data != null && message.data is ExtractWork) { + val data: ExtractWork = message.data as ExtractWork + val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) + logger.info { "${message.referenceId} Starting extraction ${data.workId}" } + extractDaemon.runUsingWorkItem() + } else { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) } + } catch (e: Exception) { + e.printStackTrace() + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) } } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) } @@ -84,7 +82,7 @@ class RunnerCoordinator { val encodeListener = object: IEncodeListener { override fun onStarted(referenceId: String, work: EncodeWork) { logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) } override fun onError(referenceId: String, work: EncodeWork, code: Int) { @@ -98,24 +96,24 @@ class RunnerCoordinator { override fun onEnded(referenceId: String, work: EncodeWork) { logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) } } val extractListener = object : IExtractListener { override fun onStarted(referenceId: String, work: ExtractWork) { logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Started" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) } override fun onError(referenceId: String, work: ExtractWork, code: Int) { logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.ERROR), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work)) } override fun onEnded(referenceId: String, work: ExtractWork) { logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Ended" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) } }