diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/WorkOrderItem.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/WorkOrderItem.kt new file mode 100644 index 00000000..2bcccb1b --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/WorkOrderItem.kt @@ -0,0 +1,19 @@ +package no.iktdev.streamit.content.common.dto + +data class WorkOrderItem( + val id: String, + val inputFile: String, + val outputFile: String, + val collection: String, + val state: State, + val progress: Int = 0, + val remainingTime: Long? = null +) + +enum class State { + QUEUED, + STARTED, + UPDATED, + FAILURE, + ENDED +} \ No newline at end of file diff --git a/Encode/build.gradle.kts b/Encode/build.gradle.kts index 1e677ee4..bc6f21ae 100644 --- a/Encode/build.gradle.kts +++ b/Encode/build.gradle.kts @@ -37,6 +37,8 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter:2.7.0") implementation("org.springframework.kafka:spring-kafka:2.8.5") + implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") + testImplementation("junit:junit:4.13.2") diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/Configuration.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/Configuration.kt new file mode 100644 index 00000000..c135c342 --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/Configuration.kt @@ -0,0 +1,33 @@ +package no.iktdev.streamit.content.encode + +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory +import org.springframework.boot.web.server.WebServerFactoryCustomizer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.simp.config.MessageBrokerRegistry +import org.springframework.web.bind.annotation.RestController +import org.springframework.web.method.HandlerTypePredicate +import org.springframework.web.servlet.config.annotation.CorsRegistry +import org.springframework.web.servlet.config.annotation.PathMatchConfigurer +import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer +import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker +import org.springframework.web.socket.config.annotation.StompEndpointRegistry +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer + +@Configuration +@EnableWebSocketMessageBroker +class WebSocketConfig : WebSocketMessageBrokerConfigurer { + + override fun registerStompEndpoints(registry: StompEndpointRegistry) { + registry.addEndpoint("/ws") + // .setAllowedOrigins("*") + .withSockJS() + } + + override fun configureMessageBroker(registry: MessageBrokerRegistry) { + registry.enableSimpleBroker("/topic") + registry.setApplicationDestinationPrefixes("/app") + } +} \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt index 2c59cde4..4177968d 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt @@ -1,6 +1,8 @@ package no.iktdev.streamit.content.encode +import no.iktdev.exfl.observable.ObservableMap import no.iktdev.exfl.observable.observableMapOf +import no.iktdev.streamit.content.common.dto.WorkOrderItem import no.iktdev.streamit.content.encode.progress.Progress import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication @@ -20,6 +22,9 @@ fun main(args: Array) { context = runApplication(*args) } +val encoderItems = ObservableMap() +val extractItems = ObservableMap() + /*val progress = ObservableMap().also { it.addListener(object: ObservableMap.Listener { override fun onPut(key: String, value: EncodeInformation) { diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt index 3b2b9cd4..bebf3271 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt @@ -7,5 +7,6 @@ data class Progress( val time: String, val duration: String, val speed: String, + val estimatedCompletionSeconds: Long = -1, val estimatedCompletion: String = "Unknown", ) \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt index 3db77f80..9349c8fd 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt @@ -92,6 +92,7 @@ class ProgressDecoder(val workBase: WorkBase) { return Progress( workId = workBase.workId, outFileName = File(workBase.outFile).name, progress = progress, + estimatedCompletionSeconds = ect, estimatedCompletion = getETA(ect), duration = durationTime, time = decoded.out_time ?: "NA", diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index a0fbf40c..a7f3885f 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -7,9 +7,12 @@ import no.iktdev.streamit.content.encode.EncodeEnv import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.dto.State +import no.iktdev.streamit.content.common.dto.WorkOrderItem import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork -import no.iktdev.streamit.content.encode.progress.DecodedProgressData +import no.iktdev.streamit.content.encode.encoderItems +import no.iktdev.streamit.content.encode.extractItems import no.iktdev.streamit.content.encode.progress.Progress import no.iktdev.streamit.content.encode.progressMap import no.iktdev.streamit.library.kafka.KafkaEvents @@ -29,7 +32,9 @@ data class ExecutionBlock( ) @Service -class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { +class RunnerCoordinator( + private var maxConcurrentJobs: Int = 1, +) { private val logger = KotlinLogging.logger {} val producer = DefaultProducer(CommonConfig.kafkaTopic) @@ -76,13 +81,24 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { } - - fun addEncodeMessageToQueue(message: Message) { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, + message.withNewStatus(Status(StatusType.PENDING)) + ) try { if (message.data != null && message.data is EncodeWork) { val work = message.data as EncodeWork + encoderItems.put( + message.referenceId, WorkOrderItem( + id = message.referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.QUEUED + ) + ) + val workBlock = suspend { val data: EncodeWork = work val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) @@ -100,28 +116,49 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { } } } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(statusType))) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, + message.withNewStatus(Status(statusType)) + ) } else { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, + message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null")) + ) } } catch (e: Exception) { e.printStackTrace() - producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, + message.withNewStatus(Status(StatusType.ERROR, e.message)) + ) } } fun addExtractMessageToQueue(message: Message) { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, + message.withNewStatus(Status(StatusType.PENDING)) + ) try { if (message.data != null && message.data is ExtractWork) { val work = message.data as ExtractWork + extractItems.put( + message.referenceId, WorkOrderItem( + id = message.referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.QUEUED + ) + ) val workBlock = suspend { val data: ExtractWork = work val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) logger.info { "\nreferenceId: ${message.referenceId} \nStarting extracting. \nWorkId: ${data.workId}" } extractDaemon.runUsingWorkItem() } - val result = queue.trySend(ExecutionBlock(work.workId,"extract", workBlock)) + val result = queue.trySend(ExecutionBlock(work.workId, "extract", workBlock)) val statusType = when (result.isClosed) { true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert false -> { @@ -132,57 +169,154 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { } } } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(statusType))) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, + message.withNewStatus(Status(statusType)) + ) } else { - producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, + message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")) + ) } } catch (e: Exception) { e.printStackTrace() - producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, + message.withNewStatus(Status(StatusType.ERROR, e.message)) + ) } } - - - - val encodeListener = object: IEncodeListener { + val encodeListener = object : IEncodeListener { override fun onStarted(referenceId: String, work: EncodeWork) { logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Started\n${work.outFile}" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, + Message(referenceId, Status(statusType = StatusType.SUCCESS), work) + ) + encoderItems.put( + referenceId, WorkOrderItem( + id = referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.STARTED + ) + ) } override fun onError(referenceId: String, work: EncodeWork, code: Int) { logger.error { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Failed\n${work.outFile} \nError: $code" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work)) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, + Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work) + ) + encoderItems.put( + referenceId, WorkOrderItem( + id = referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.FAILURE + ) + ) } override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) { - logger.debug { "Work progress for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Progress: ${Gson().toJson(progress)}" } + logger.debug { + "Work progress for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Progress: ${ + Gson().toJson( + progress + ) + }" + } progressMap.put(work.workId, progress) + encoderItems.put( + referenceId, WorkOrderItem( + id = referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.UPDATED, + progress = progress.progress, + remainingTime = progress.estimatedCompletionSeconds + ) + ) } override fun onEnded(referenceId: String, work: EncodeWork) { logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Ended\n${work.outFile}" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, + Message(referenceId, Status(statusType = StatusType.SUCCESS), work) + ) + encoderItems.put( + referenceId, WorkOrderItem( + id = referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.ENDED, + progress = 100, + remainingTime = null + ) + ) } } val extractListener = object : IExtractListener { override fun onStarted(referenceId: String, work: ExtractWork) { logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Started\n${work.outFile}" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, + Message(referenceId, Status(statusType = StatusType.SUCCESS), work) + ) + extractItems.put( + referenceId, WorkOrderItem( + id = referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.STARTED + ) + ) } override fun onError(referenceId: String, work: ExtractWork, code: Int) { logger.error { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Failed\n${work.outFile} \nError: $code" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work)) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, + Message(referenceId, Status(StatusType.ERROR, code.toString()), work) + ) + extractItems.put( + referenceId, WorkOrderItem( + id = referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.FAILURE + ) + ) } override fun onEnded(referenceId: String, work: ExtractWork) { logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Ended\n${work.outFile}" } - producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) + producer.sendMessage( + KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, + Message(referenceId, Status(statusType = StatusType.SUCCESS), work) + ) + extractItems.put( + referenceId, WorkOrderItem( + id = referenceId, + inputFile = work.inFile, + outputFile = work.outFile, + collection = work.collection, + state = State.ENDED + ) + ) } } diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/topics/EncoderTopic.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/topics/EncoderTopic.kt new file mode 100644 index 00000000..e4d8b5c1 --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/topics/EncoderTopic.kt @@ -0,0 +1,65 @@ +package no.iktdev.streamit.content.encode.topics + +import no.iktdev.exfl.observable.ObservableMap +import no.iktdev.streamit.content.common.dto.WorkOrderItem +import no.iktdev.streamit.content.encode.encoderItems +import no.iktdev.streamit.content.encode.extractItems +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.messaging.handler.annotation.MessageMapping +import org.springframework.messaging.simp.SimpMessagingTemplate +import org.springframework.stereotype.Controller + +@Controller +class EncoderTopic( + @Autowired val template: SimpMessagingTemplate?, +) { + + init { + encoderItems.addListener(object : ObservableMap.Listener { + override fun onMapUpdated(map: Map) { + super.onMapUpdated(map) + pushEncoderQueue() + } + + override fun onPut(key: String, value: WorkOrderItem) { + super.onPut(key, value) + pushEncoderWorkOrder(value) + } + }) + extractItems.addListener(object : ObservableMap.Listener { + override fun onMapUpdated(map: Map) { + super.onMapUpdated(map) + pushExtractorQueue() + } + + override fun onPut(key: String, value: WorkOrderItem) { + super.onPut(key, value) + pushExtractorWorkOrder(value) + } + }) + } + + fun pushEncoderWorkOrder(item: WorkOrderItem) { + template?.convertAndSend("/topic/encoder/workorder", item) + } + + fun pushExtractorWorkOrder(item: WorkOrderItem) { + template?.convertAndSend("/topic/extractor/workorder", item) + } + + @MessageMapping("/encoder/queue") + fun pushEncoderQueue() { + template?.convertAndSend("/topic/encoder/queue", encoderItems.values) + } + + @MessageMapping("/extractor/queue") + fun pushExtractorQueue() { + template?.convertAndSend("/topic/extractor/queue", extractItems.values) + + } + + + + + +} \ No newline at end of file