From d9c8cab1fe836e98e1d49b8488fd9b423fb941cd Mon Sep 17 00:00:00 2001 From: Brage Date: Thu, 27 Jul 2023 20:31:07 +0200 Subject: [PATCH] Update --- CommonCode/build.gradle.kts | 2 +- .../deserializers/DeserializerRegistry.kt | 6 +- Convert/build.gradle.kts | 2 +- .../content/convert/kafka/SubtitleConsumer.kt | 10 +-- Encode/build.gradle.kts | 2 +- .../encode/runner/RunnerCoordinator.kt | 90 ++++++++++++------- Reader/build.gradle.kts | 2 +- .../reader/collector/ResultCollection.kt | 2 +- .../reader/collector/SubtitleConsumer.kt | 16 ++-- .../content/reader/collector/VideoConsumer.kt | 4 +- 10 files changed, 82 insertions(+), 54 deletions(-) diff --git a/CommonCode/build.gradle.kts b/CommonCode/build.gradle.kts index 4cb1a560..e86f2f1c 100644 --- a/CommonCode/build.gradle.kts +++ b/CommonCode/build.gradle.kts @@ -20,7 +20,7 @@ dependencies { implementation("com.github.pgreze:kotlin-process:1.3.1") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha63") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha76") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("com.google.code.gson:gson:2.8.9") diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt index 07911bc1..792afb15 100644 --- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt @@ -14,10 +14,10 @@ class DeserializerRegistry { KafkaEvents.EVENT_READER_DETERMINED_FILENAME to ContentOutNameDeserializer(), KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO to EncodeWorkDeserializer(), - KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE to EncodeWorkDeserializer(), + KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED to EncodeWorkDeserializer(), KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE to ExtractWorkDeserializer(), - KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE to ExtractWorkDeserializer(), - KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE to ConvertWorkDeserializer() + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED to ExtractWorkDeserializer(), + KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED to ConvertWorkDeserializer() ) fun getRegistry(): Map> = _registry.toMap() diff --git a/Convert/build.gradle.kts b/Convert/build.gradle.kts index 219f7176..ff2c72d1 100644 --- a/Convert/build.gradle.kts +++ b/Convert/build.gradle.kts @@ -25,7 +25,7 @@ dependencies { implementation("no.iktdev.library:subtitle:1.7-SNAPSHOT") - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha74") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha76") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") diff --git a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt index 9d04b0c8..8196470c 100644 --- a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt +++ b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt @@ -27,7 +27,7 @@ class SubtitleConsumer: DefaultKafkaReader("convertHandlerSubtitle"), IConvertLi private final val listener = object : SimpleMessageListener( topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, - accepts = listOf(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event) + accepts = listOf(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event) ) { override fun onMessageReceived(data: ConsumerRecord) { val referenceId = data.value().referenceId @@ -40,7 +40,7 @@ class SubtitleConsumer: DefaultKafkaReader("convertHandlerSubtitle"), IConvertLi collection = workResult.collection, language = workResult.language, ) - produceMessage(KafkaEvents.EVENT_CONVERTER_STARTED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.PENDING)), convertWork) + produceMessage(KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_STARTED, Message(referenceId = referenceId, Status(statusType = StatusType.PENDING)), convertWork) Coroutines.io().launch { ConvertRunner(referenceId, this@SubtitleConsumer).readAndConvert(convertWork) } @@ -55,15 +55,15 @@ class SubtitleConsumer: DefaultKafkaReader("convertHandlerSubtitle"), IConvertLi } override fun onStarted(referenceId: String) { - produceMessage(KafkaEvents.EVENT_CONVERTER_STARTED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), null) + produceMessage(KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_STARTED, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), null) } override fun onError(referenceId: String, info: SubtitleInfo, message: String) { - produceMessage(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.ERROR)), null) + produceMessage(KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED, Message(referenceId = referenceId, Status(statusType = StatusType.ERROR)), null) } override fun onEnded(referenceId: String, info: SubtitleInfo, work: ConvertWork) { - produceMessage(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), work) + produceMessage(KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), work) } } \ No newline at end of file diff --git a/Encode/build.gradle.kts b/Encode/build.gradle.kts index 8e533be1..f42cc660 100644 --- a/Encode/build.gradle.kts +++ b/Encode/build.gradle.kts @@ -23,7 +23,7 @@ repositories { dependencies { implementation(project(":CommonCode")) - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha76") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index 0e8906a5..9df044d8 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -2,9 +2,11 @@ package no.iktdev.streamit.content.encode.runner import com.google.gson.Gson import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.sync.Mutex import no.iktdev.streamit.content.encode.EncodeEnv import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork @@ -14,18 +16,27 @@ import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.producer.DefaultProducer +import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties.Env import org.springframework.stereotype.Service import java.util.concurrent.* private val logger = KotlinLogging.logger {} +data class ExecutionBlock( + val type: String, + val work: suspend () -> Int +) + @Service class RunnerCoordinator { private val logger = KotlinLogging.logger {} val producer = DefaultProducer(CommonConfig.kafkaTopic) + final val defaultScope = Coroutines.default() + val queue = Channel() - val executor: ExecutorService = ThreadPoolExecutor( + + /*val executor: ExecutorService = ThreadPoolExecutor( EncodeEnv.maxRunners, EncodeEnv.maxRunners, 0L, @@ -33,43 +44,60 @@ class RunnerCoordinator { LinkedBlockingQueue() ) val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher() - val scope = CoroutineScope(dispatcher) + val scope = CoroutineScope(dispatcher)*/ - fun addEncodeMessageToQueue(message: Message) { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) - scope.launch { - try { - if (message.data != null && message.data is EncodeWork) { - val data: EncodeWork = message.data as EncodeWork - val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) - logger.info { "${message.referenceId} Starting encoding ${data.workId}" } - encodeDaemon.runUsingWorkItem() - } else { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))) + init { + defaultScope.launch { + repeat(EncodeEnv.maxRunners) { + launch { + for (item in queue) { + item.work() + } } - } catch (e: Exception) { - e.printStackTrace() - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) } } } + fun addEncodeMessageToQueue(message: Message) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) + try { + if (message.data != null && message.data is EncodeWork) { + + val workBlock = suspend { + val data: EncodeWork = message.data as EncodeWork + val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) + logger.info { "${message.referenceId} Starting encoding ${data.workId}" } + encodeDaemon.runUsingWorkItem() + } + queue.trySend(ExecutionBlock("encode", workBlock)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS))) + } else { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))) + } + } catch (e: Exception) { + e.printStackTrace() + producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) + } + } + fun addExtractMessageToQueue(message: Message) { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) - scope.launch { - try { - if (message.data != null && message.data is ExtractWork) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) + try { + if (message.data != null && message.data is ExtractWork) { + val workBlock = suspend { val data: ExtractWork = message.data as ExtractWork val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) logger.info { "${message.referenceId} Starting extraction ${data.workId}" } extractDaemon.runUsingWorkItem() - } else { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) } - } catch (e: Exception) { - e.printStackTrace() - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) + queue.trySend(ExecutionBlock("extract", workBlock)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS))) + } else { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) } + } catch (e: Exception) { + e.printStackTrace() + producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) } } @@ -80,12 +108,12 @@ class RunnerCoordinator { val encodeListener = object: IEncodeListener { override fun onStarted(referenceId: String, work: EncodeWork) { logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) } override fun onError(referenceId: String, work: EncodeWork, code: Int) { logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work)) } override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) { @@ -94,24 +122,24 @@ class RunnerCoordinator { override fun onEnded(referenceId: String, work: EncodeWork) { logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) } } val extractListener = object : IExtractListener { override fun onStarted(referenceId: String, work: ExtractWork) { logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Started" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) } override fun onError(referenceId: String, work: ExtractWork, code: Int) { logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work)) } override fun onEnded(referenceId: String, work: ExtractWork) { logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Ended" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) + producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) } } diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 86cda78b..aab588ca 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -24,7 +24,7 @@ repositories { val exposedVersion = "0.38.2" dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha76") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha14") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ResultCollection.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ResultCollection.kt index a2c37255..41b35e4e 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ResultCollection.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ResultCollection.kt @@ -64,7 +64,7 @@ class ResultCollection: DefaultEventCollection() { } fun getEncodeWork(): EncodeWork? { - return firstOrNull(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE)?.let { + return firstOrNull(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED)?.let { EncodeWorkDeserializer().deserializeIfSuccessful(it.value()) } } diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/SubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/SubtitleConsumer.kt index 0626fd5c..807888d3 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/SubtitleConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/SubtitleConsumer.kt @@ -27,20 +27,20 @@ class SubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle" topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, accepts = listOf( - KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, - KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, + KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED.event ) ) { override fun onMessageReceived(data: ConsumerRecord) { val referenceId = data.value().referenceId - if (data.key() == KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event) { + if (data.key() == KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event) { val work = data.value().dataAs(ExtractWork::class.java) if (work == null) { logger.info { "Event: ${data.key()} value is null" } } else { storeExtractWork(referenceId, work) } - } else if (data.key() == KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event) { + } else if (data.key() == KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED.event) { val work = data.value().dataAs(ConvertWork::class.java) if (work == null) { logger.info { "Event: ${data.key()} value is null" } @@ -63,10 +63,10 @@ class SubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle" fun produceMessage(referenceId: String, outFile: String, statusType: StatusType, result: Any?) { if (statusType == StatusType.SUCCESS) { - produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, referenceId) + produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_STORED_SUBTITLE, referenceId) logger.info { "Stored ${File(outFile).absolutePath} subtitle" } } else { - produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, Message(referenceId, Status(statusType), result), "See log") + produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_STORED_SUBTITLE, Message(referenceId, Status(statusType), result), "See log") logger.error { "Failed to store ${File(outFile).absolutePath} subtitle" } } } @@ -104,8 +104,8 @@ class SubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle" override fun loadDeserializers(): Map> { return DeserializerRegistry.getEventToDeserializer( - KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE, - KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED, + KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED ) } } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/VideoConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/VideoConsumer.kt index 0bb884e1..fb139450 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/VideoConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/VideoConsumer.kt @@ -33,7 +33,7 @@ class VideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), IColle topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE, - completionEvent = KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE, + completionEvent = KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED, acceptsFilter = listOf( KafkaEvents.EVENT_METADATA_OBTAINED, KafkaEvents.EVENT_READER_DETERMINED_SERIE, @@ -133,7 +133,7 @@ class VideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), IColle e.printStackTrace() } - produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, collection.getReferenceId() ?: "M.I.A", status) + produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_STORED_VIDEO, collection.getReferenceId() ?: "M.I.A", status) logger.info { "Stored ${encodeWork.outFile} video" } }