Encoder queue update

This commit is contained in:
Brage 2023-07-29 14:12:16 +02:00
parent 123522f58d
commit a9145740e9

View File

@ -49,13 +49,18 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
fun launchWorker() = defaultScope.launch { fun launchWorker() = defaultScope.launch {
while (true) { while (true) {
logger.info("Worker is waiting for a work item...")
val workItem = queue.receive() // Coroutine will wait here until a work item is available val workItem = queue.receive() // Coroutine will wait here until a work item is available
logger.info("Worker received a work item.")
if (jobsInProgress.incrementAndGet() <= maxConcurrentJobs) { if (jobsInProgress.incrementAndGet() <= maxConcurrentJobs) {
val job = processWorkItem(workItem) val job = processWorkItem(workItem)
job.invokeOnCompletion { job.invokeOnCompletion {
jobsInProgress.decrementAndGet() jobsInProgress.decrementAndGet()
logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" }
} }
} }
logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" }
} }
} }
@ -78,8 +83,18 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" } logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" }
encodeDaemon.runUsingWorkItem() encodeDaemon.runUsingWorkItem()
} }
queue.trySend(ExecutionBlock("encode", workBlock)) val result = queue.trySend(ExecutionBlock("encode", workBlock))
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS))) val statusType = when (result.isClosed) {
true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert
false -> {
if (result.isSuccess) {
StatusType.PENDING // Jobben ble sendt til køen
} else {
StatusType.ERROR // Feil ved sending av jobben
}
}
}
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(statusType)))
} else { } else {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null")))
} }
@ -99,8 +114,18 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
logger.info { "\nreferenceId: ${message.referenceId} \nStarting extracting. \nWorkId: ${data.workId}" } logger.info { "\nreferenceId: ${message.referenceId} \nStarting extracting. \nWorkId: ${data.workId}" }
extractDaemon.runUsingWorkItem() extractDaemon.runUsingWorkItem()
} }
queue.trySend(ExecutionBlock("extract", workBlock)) val result = queue.trySend(ExecutionBlock("extract", workBlock))
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS))) val statusType = when (result.isClosed) {
true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert
false -> {
if (result.isSuccess) {
StatusType.PENDING // Jobben ble sendt til køen
} else {
StatusType.ERROR // Feil ved sending av jobben
}
}
}
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(statusType)))
} else { } else {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")))
} }