Encoder queue change

This commit is contained in:
Brage 2023-07-29 13:35:25 +02:00
parent ac7c252cee
commit 123522f58d

View File

@ -3,6 +3,7 @@ package no.iktdev.streamit.content.encode.runner
import com.google.gson.Gson import com.google.gson.Gson
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import no.iktdev.streamit.content.encode.EncodeEnv import no.iktdev.streamit.content.encode.EncodeEnv
import mu.KotlinLogging import mu.KotlinLogging
@ -19,6 +20,7 @@ import no.iktdev.streamit.library.kafka.producer.DefaultProducer
import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties.Env import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties.Env
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.util.concurrent.* import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@ -28,36 +30,43 @@ data class ExecutionBlock(
) )
@Service @Service
class RunnerCoordinator { class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
val producer = DefaultProducer(CommonConfig.kafkaTopic) val producer = DefaultProducer(CommonConfig.kafkaTopic)
final val defaultScope = Coroutines.default() final val defaultScope = Coroutines.default()
val queue = Channel<ExecutionBlock>()
private val jobsInProgress = AtomicInteger(0)
val queue = Channel<ExecutionBlock>(Channel.UNLIMITED)
/*val executor: ExecutorService = ThreadPoolExecutor(
EncodeEnv.maxRunners,
EncodeEnv.maxRunners,
0L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue()
)
val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher()
val scope = CoroutineScope(dispatcher)*/
init { init {
defaultScope.launch { maxConcurrentJobs = EncodeEnv.maxRunners
repeat(EncodeEnv.maxRunners) { repeat(EncodeEnv.maxRunners) {
launch { launchWorker()
for (item in queue) { }
item.work() }
}
fun launchWorker() = defaultScope.launch {
while (true) {
val workItem = queue.receive() // Coroutine will wait here until a work item is available
if (jobsInProgress.incrementAndGet() <= maxConcurrentJobs) {
val job = processWorkItem(workItem)
job.invokeOnCompletion {
jobsInProgress.decrementAndGet()
} }
} }
} }
} }
private suspend fun processWorkItem(workItem: ExecutionBlock): Job {
workItem.work()
return Job()
}
fun addEncodeMessageToQueue(message: Message) { fun addEncodeMessageToQueue(message: Message) {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING)))
try { try {