diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index cfac3afd..f38d69e1 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -3,6 +3,7 @@ package no.iktdev.streamit.content.encode.runner import com.google.gson.Gson import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.sync.Mutex import no.iktdev.streamit.content.encode.EncodeEnv import mu.KotlinLogging @@ -19,6 +20,7 @@ import no.iktdev.streamit.library.kafka.producer.DefaultProducer import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties.Env import org.springframework.stereotype.Service import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicInteger private val logger = KotlinLogging.logger {} @@ -28,36 +30,43 @@ data class ExecutionBlock( ) @Service -class RunnerCoordinator { +class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { private val logger = KotlinLogging.logger {} val producer = DefaultProducer(CommonConfig.kafkaTopic) final val defaultScope = Coroutines.default() - val queue = Channel() + private val jobsInProgress = AtomicInteger(0) + val queue = Channel(Channel.UNLIMITED) - /*val executor: ExecutorService = ThreadPoolExecutor( - EncodeEnv.maxRunners, - EncodeEnv.maxRunners, - 0L, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue() - ) - val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher() - val scope = CoroutineScope(dispatcher)*/ init { - defaultScope.launch { - repeat(EncodeEnv.maxRunners) { - launch { - for (item in queue) { - item.work() - } + maxConcurrentJobs = EncodeEnv.maxRunners + repeat(EncodeEnv.maxRunners) { + launchWorker() + } + } + + fun launchWorker() = defaultScope.launch { + while (true) { + val workItem = queue.receive() // Coroutine will wait here until a work item is available + if (jobsInProgress.incrementAndGet() <= maxConcurrentJobs) { + val job = processWorkItem(workItem) + job.invokeOnCompletion { + jobsInProgress.decrementAndGet() } } } } + private suspend fun processWorkItem(workItem: ExecutionBlock): Job { + workItem.work() + return Job() + } + + + + fun addEncodeMessageToQueue(message: Message) { producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) try {