diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index 2b478046..d8cd2776 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger private val logger = KotlinLogging.logger {} data class ExecutionBlock( + val workId: String, val type: String, val work: suspend () -> Int ) @@ -56,8 +57,9 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { val job = processWorkItem(workItem) inProgressJobs.add(job) job.invokeOnCompletion { + logger.info { "OnCompletion invoked!\n\nWorkId: ${workItem.workId}-${workItem.type} \n\tCurrent active worksers: ${jobsInProgress.get()}" } val workers = jobsInProgress.decrementAndGet() - logger.info { "Available workers: ${workers}/${maxConcurrentJobs}" } + logger.info { "Worker Released: Available: ${workers}/${maxConcurrentJobs}" } inProgressJobs.remove(job) } } @@ -86,7 +88,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" } encodeDaemon.runUsingWorkItem() } - val result = queue.trySend(ExecutionBlock("encode", workBlock)) + val result = queue.trySend(ExecutionBlock(work.workId, "encode", workBlock)) val statusType = when (result.isClosed) { true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert false -> { @@ -111,13 +113,14 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) try { if (message.data != null && message.data is ExtractWork) { + val work = message.data as ExtractWork val workBlock = suspend { - val data: ExtractWork = message.data as ExtractWork + val data: ExtractWork = work val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) logger.info { "\nreferenceId: ${message.referenceId} \nStarting extracting. \nWorkId: ${data.workId}" } extractDaemon.runUsingWorkItem() } - val result = queue.trySend(ExecutionBlock("extract", workBlock)) + val result = queue.trySend(ExecutionBlock(work.workId,"extract", workBlock)) val statusType = when (result.isClosed) { true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert false -> {