diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index 9196cd61..e2b7553e 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -52,11 +52,12 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { logger.info("Worker is waiting for a work item...") val workItem = queue.receive() // Coroutine will wait here until a work item is available logger.info("Worker received a work item.") - if (jobsInProgress.incrementAndGet() <= maxConcurrentJobs) { + if (jobsInProgress.get() <= maxConcurrentJobs) { + jobsInProgress.incrementAndGet() val job = processWorkItem(workItem) job.invokeOnCompletion { - jobsInProgress.decrementAndGet() - logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" } + val currentJobsInProgress = jobsInProgress.decrementAndGet() + logger.info { "Available workers: ${maxConcurrentJobs - currentJobsInProgress}" } } } logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" } @@ -65,6 +66,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { } private suspend fun processWorkItem(workItem: ExecutionBlock): Job { + logger.info { "Processing work: ${workItem.type}" } workItem.work() return Job() } @@ -76,9 +78,9 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) try { if (message.data != null && message.data is EncodeWork) { - + val work = message.data as EncodeWork val workBlock = suspend { - val data: EncodeWork = message.data as EncodeWork + val data: EncodeWork = work val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" } encodeDaemon.runUsingWorkItem() @@ -88,7 +90,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert false -> { if (result.isSuccess) { - StatusType.PENDING // Jobben ble sendt til køen + StatusType.SUCCESS // Jobben ble sendt til køen } else { StatusType.ERROR // Feil ved sending av jobben } @@ -119,7 +121,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert false -> { if (result.isSuccess) { - StatusType.PENDING // Jobben ble sendt til køen + StatusType.SUCCESS // Jobben ble sendt til køen } else { StatusType.ERROR // Feil ved sending av jobben } diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt index e2b362ff..489f08ce 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt @@ -10,7 +10,6 @@ import no.iktdev.streamit.content.common.dto.Metadata import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo import no.iktdev.streamit.content.common.dto.reader.FileResult import no.iktdev.streamit.content.common.dto.reader.MovieInfo -import no.iktdev.streamit.content.common.dto.reader.VideoInfo import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Status @@ -49,28 +48,53 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM val initMessage = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event] if (initMessage == null || initMessage.status.statusType != StatusType.SUCCESS) { - produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "Initiator message not found!") + produceErrorMessage( + KafkaEvents.EVENT_READER_DETERMINED_FILENAME, + Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), + "Initiator message not found!" + ) return } val fileResult = initMessage.data as FileResult? if (fileResult == null) { - produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, "FileResult is either null or not deserializable!") + produceErrorMessage( + KafkaEvents.EVENT_READER_DETERMINED_FILENAME, + initMessage, + "FileResult is either null or not deserializable!" + ) return } val metadataMessage = result[KafkaEvents.EVENT_METADATA_OBTAINED.event] - val metadata = if (metadataMessage?.status?.statusType == StatusType.SUCCESS) metadataMessage.data as Metadata? else null + val metadata = + if (metadataMessage?.status?.statusType == StatusType.SUCCESS) metadataMessage.data as Metadata? else null - val videoInfo = if (metadata?.type == null) { - FileNameDeterminate(fileResult.title, fileResult.sanitizedName).getDeterminedVideoInfo() - } else if (metadata.type.lowercase() == "movie") { - FileNameDeterminate(fileResult.title, fileResult.sanitizedName, FileNameDeterminate.ContentType.MOVIE).getDeterminedVideoInfo() - } else { - FileNameDeterminate(fileResult.title, fileResult.sanitizedName, FileNameDeterminate.ContentType.SERIE).getDeterminedVideoInfo() - } + + // Due to the fact that the sources might say serie, but it is not a serie input we will give serie a try then default to movie + + + val videoInfo = when (metadata?.type) { + "serie" -> { + FileNameDeterminate( + fileResult.title, + fileResult.sanitizedName, + FileNameDeterminate.ContentType.SERIE + ).getDeterminedVideoInfo() + } + + "movie" -> { + FileNameDeterminate( + fileResult.title, + fileResult.sanitizedName, + FileNameDeterminate.ContentType.MOVIE + ).getDeterminedVideoInfo() + } + + else -> null + } ?: FileNameDeterminate(fileResult.title, fileResult.sanitizedName).getDeterminedVideoInfo() if (videoInfo == null) { - produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, "VideoInfo is null." ) + produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, "VideoInfo is null.") return }