Updated encode runner

This commit is contained in:
Brage 2023-07-25 23:20:42 +02:00
parent 27372f37e5
commit 0cc9b28591

View File

@ -35,12 +35,9 @@ class RunnerCoordinator {
val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher() val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher()
val scope = CoroutineScope(dispatcher) val scope = CoroutineScope(dispatcher)
val semaphore = Semaphore(EncodeEnv.maxRunners)
fun addEncodeMessageToQueue(message: Message) { fun addEncodeMessageToQueue(message: Message) {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
scope.launch { scope.launch {
semaphore.acquire()
try { try {
if (message.data != null && message.data is EncodeWork) { if (message.data != null && message.data is EncodeWork) {
val data: EncodeWork = message.data as EncodeWork val data: EncodeWork = message.data as EncodeWork
@ -53,17 +50,13 @@ class RunnerCoordinator {
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
} finally {
semaphore.release()
} }
} }
} }
fun addExtractMessageToQueue(message: Message) { fun addExtractMessageToQueue(message: Message) {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
scope.launch { scope.launch {
semaphore.acquire()
try { try {
if (message.data != null && message.data is ExtractWork) { if (message.data != null && message.data is ExtractWork) {
val data: ExtractWork = message.data as ExtractWork val data: ExtractWork = message.data as ExtractWork
@ -76,8 +69,6 @@ class RunnerCoordinator {
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
} finally {
semaphore.release()
} }
} }
} }