Updated encode runner
This commit is contained in:
parent
4bb2cef402
commit
92d55631b4
@ -2,6 +2,7 @@ package no.iktdev.streamit.content.encode.runner
|
|||||||
|
|
||||||
import com.google.gson.Gson
|
import com.google.gson.Gson
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
import no.iktdev.streamit.content.encode.EncodeEnv
|
import no.iktdev.streamit.content.encode.EncodeEnv
|
||||||
import mu.KotlinLogging
|
import mu.KotlinLogging
|
||||||
import no.iktdev.streamit.content.common.CommonConfig
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
@ -14,10 +15,8 @@ import no.iktdev.streamit.library.kafka.dto.Status
|
|||||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
import java.util.concurrent.ExecutorService
|
import java.util.concurrent.*
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
private val logger = KotlinLogging.logger {}
|
private val logger = KotlinLogging.logger {}
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@ -36,9 +35,12 @@ class RunnerCoordinator {
|
|||||||
val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher()
|
val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher()
|
||||||
val scope = CoroutineScope(dispatcher)
|
val scope = CoroutineScope(dispatcher)
|
||||||
|
|
||||||
|
val semaphore = Semaphore(EncodeEnv.maxRunners)
|
||||||
|
|
||||||
fun addEncodeMessageToQueue(message: Message) {
|
fun addEncodeMessageToQueue(message: Message) {
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
|
||||||
scope.launch {
|
scope.launch {
|
||||||
|
semaphore.acquire()
|
||||||
try {
|
try {
|
||||||
if (message.data != null && message.data is EncodeWork) {
|
if (message.data != null && message.data is EncodeWork) {
|
||||||
val data: EncodeWork = message.data as EncodeWork
|
val data: EncodeWork = message.data as EncodeWork
|
||||||
@ -59,6 +61,7 @@ class RunnerCoordinator {
|
|||||||
fun addExtractMessageToQueue(message: Message) {
|
fun addExtractMessageToQueue(message: Message) {
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
|
||||||
scope.launch {
|
scope.launch {
|
||||||
|
semaphore.acquire()
|
||||||
try {
|
try {
|
||||||
if (message.data != null && message.data is ExtractWork) {
|
if (message.data != null && message.data is ExtractWork) {
|
||||||
val data: ExtractWork = message.data as ExtractWork
|
val data: ExtractWork = message.data as ExtractWork
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user