Encoder queue update

This commit is contained in:
Brage 2023-07-29 14:35:36 +02:00
parent a9145740e9
commit 9daf8b6636
2 changed files with 45 additions and 19 deletions

View File

@ -52,11 +52,12 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
logger.info("Worker is waiting for a work item...")
val workItem = queue.receive() // Coroutine will wait here until a work item is available
logger.info("Worker received a work item.")
if (jobsInProgress.incrementAndGet() <= maxConcurrentJobs) {
if (jobsInProgress.get() <= maxConcurrentJobs) {
jobsInProgress.incrementAndGet()
val job = processWorkItem(workItem)
job.invokeOnCompletion {
jobsInProgress.decrementAndGet()
logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" }
val currentJobsInProgress = jobsInProgress.decrementAndGet()
logger.info { "Available workers: ${maxConcurrentJobs - currentJobsInProgress}" }
}
}
logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" }
@ -65,6 +66,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
}
private suspend fun processWorkItem(workItem: ExecutionBlock): Job {
logger.info { "Processing work: ${workItem.type}" }
workItem.work()
return Job()
}
@ -76,9 +78,9 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING)))
try {
if (message.data != null && message.data is EncodeWork) {
val work = message.data as EncodeWork
val workBlock = suspend {
val data: EncodeWork = message.data as EncodeWork
val data: EncodeWork = work
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" }
encodeDaemon.runUsingWorkItem()
@ -88,7 +90,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert
false -> {
if (result.isSuccess) {
StatusType.PENDING // Jobben ble sendt til køen
StatusType.SUCCESS // Jobben ble sendt til køen
} else {
StatusType.ERROR // Feil ved sending av jobben
}
@ -119,7 +121,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert
false -> {
if (result.isSuccess) {
StatusType.PENDING // Jobben ble sendt til køen
StatusType.SUCCESS // Jobben ble sendt til køen
} else {
StatusType.ERROR // Feil ved sending av jobben
}

View File

@ -10,7 +10,6 @@ import no.iktdev.streamit.content.common.dto.Metadata
import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo
import no.iktdev.streamit.content.common.dto.reader.FileResult
import no.iktdev.streamit.content.common.dto.reader.MovieInfo
import no.iktdev.streamit.content.common.dto.reader.VideoInfo
import no.iktdev.streamit.library.kafka.KafkaEvents
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
@ -49,26 +48,51 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM
val initMessage = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event]
if (initMessage == null || initMessage.status.statusType != StatusType.SUCCESS) {
produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "Initiator message not found!")
produceErrorMessage(
KafkaEvents.EVENT_READER_DETERMINED_FILENAME,
Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)),
"Initiator message not found!"
)
return
}
val fileResult = initMessage.data as FileResult?
if (fileResult == null) {
produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, "FileResult is either null or not deserializable!")
produceErrorMessage(
KafkaEvents.EVENT_READER_DETERMINED_FILENAME,
initMessage,
"FileResult is either null or not deserializable!"
)
return
}
val metadataMessage = result[KafkaEvents.EVENT_METADATA_OBTAINED.event]
val metadata = if (metadataMessage?.status?.statusType == StatusType.SUCCESS) metadataMessage.data as Metadata? else null
val metadata =
if (metadataMessage?.status?.statusType == StatusType.SUCCESS) metadataMessage.data as Metadata? else null
val videoInfo = if (metadata?.type == null) {
FileNameDeterminate(fileResult.title, fileResult.sanitizedName).getDeterminedVideoInfo()
} else if (metadata.type.lowercase() == "movie") {
FileNameDeterminate(fileResult.title, fileResult.sanitizedName, FileNameDeterminate.ContentType.MOVIE).getDeterminedVideoInfo()
} else {
FileNameDeterminate(fileResult.title, fileResult.sanitizedName, FileNameDeterminate.ContentType.SERIE).getDeterminedVideoInfo()
// Due to the fact that the sources might say serie, but it is not a serie input we will give serie a try then default to movie
val videoInfo = when (metadata?.type) {
"serie" -> {
FileNameDeterminate(
fileResult.title,
fileResult.sanitizedName,
FileNameDeterminate.ContentType.SERIE
).getDeterminedVideoInfo()
}
"movie" -> {
FileNameDeterminate(
fileResult.title,
fileResult.sanitizedName,
FileNameDeterminate.ContentType.MOVIE
).getDeterminedVideoInfo()
}
else -> null
} ?: FileNameDeterminate(fileResult.title, fileResult.sanitizedName).getDeterminedVideoInfo()
if (videoInfo == null) {
produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, "VideoInfo is null.")
return