diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index f38d69e1..9196cd61 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -49,13 +49,18 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { fun launchWorker() = defaultScope.launch { while (true) { + logger.info("Worker is waiting for a work item...") val workItem = queue.receive() // Coroutine will wait here until a work item is available + logger.info("Worker received a work item.") if (jobsInProgress.incrementAndGet() <= maxConcurrentJobs) { val job = processWorkItem(workItem) job.invokeOnCompletion { jobsInProgress.decrementAndGet() + logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" } } } + logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" } + } } @@ -78,8 +83,18 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" } encodeDaemon.runUsingWorkItem() } - queue.trySend(ExecutionBlock("encode", workBlock)) - producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS))) + val result = queue.trySend(ExecutionBlock("encode", workBlock)) + val statusType = when (result.isClosed) { + true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert + false -> { + if (result.isSuccess) { + StatusType.PENDING // Jobben ble sendt til køen + } else { + StatusType.ERROR // Feil ved sending av jobben + } + } + } + producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(statusType))) } else { producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))) } @@ -99,8 +114,18 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { logger.info { "\nreferenceId: ${message.referenceId} \nStarting extracting. \nWorkId: ${data.workId}" } extractDaemon.runUsingWorkItem() } - queue.trySend(ExecutionBlock("extract", workBlock)) - producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS))) + val result = queue.trySend(ExecutionBlock("extract", workBlock)) + val statusType = when (result.isClosed) { + true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert + false -> { + if (result.isSuccess) { + StatusType.PENDING // Jobben ble sendt til køen + } else { + StatusType.ERROR // Feil ved sending av jobben + } + } + } + producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(statusType))) } else { producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) }