From 92d55631b4d01e0dc1ec659ca97b0cd1731bb867 Mon Sep 17 00:00:00 2001 From: Brage Date: Tue, 25 Jul 2023 23:06:12 +0200 Subject: [PATCH] Updated encode runner --- .../content/encode/runner/RunnerCoordinator.kt | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index 0f54f794..308dbf25 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -2,6 +2,7 @@ package no.iktdev.streamit.content.encode.runner import com.google.gson.Gson import kotlinx.coroutines.* +import kotlinx.coroutines.sync.Mutex import no.iktdev.streamit.content.encode.EncodeEnv import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig @@ -14,10 +15,8 @@ import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.producer.DefaultProducer import org.springframework.stereotype.Service -import java.util.concurrent.ExecutorService -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit +import java.util.concurrent.* + private val logger = KotlinLogging.logger {} @Service @@ -36,9 +35,12 @@ class RunnerCoordinator { val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher() val scope = CoroutineScope(dispatcher) + val semaphore = Semaphore(EncodeEnv.maxRunners) + fun addEncodeMessageToQueue(message: Message) { producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) scope.launch { + semaphore.acquire() try { if (message.data != null && message.data is EncodeWork) { val data: EncodeWork = message.data as EncodeWork @@ -59,6 +61,7 @@ class RunnerCoordinator { fun addExtractMessageToQueue(message: Message) { producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) scope.launch { + semaphore.acquire() try { if (message.data != null && message.data is ExtractWork) { val data: ExtractWork = message.data as ExtractWork