Added socket

This commit is contained in:
Brage 2023-09-13 01:23:14 +02:00
parent db8b5db636
commit 20478420be
8 changed files with 284 additions and 24 deletions

View File

@ -0,0 +1,19 @@
package no.iktdev.streamit.content.common.dto
data class WorkOrderItem(
val id: String,
val inputFile: String,
val outputFile: String,
val collection: String,
val state: State,
val progress: Int = 0,
val remainingTime: Long? = null
)
enum class State {
QUEUED,
STARTED,
UPDATED,
FAILURE,
ENDED
}

View File

@ -37,6 +37,8 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0") implementation("org.springframework.boot:spring-boot-starter:2.7.0")
implementation("org.springframework.kafka:spring-kafka:2.8.5") implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
testImplementation("junit:junit:4.13.2") testImplementation("junit:junit:4.13.2")

View File

@ -0,0 +1,33 @@
package no.iktdev.streamit.content.encode
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory
import org.springframework.boot.web.server.WebServerFactoryCustomizer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.method.HandlerTypePredicate
import org.springframework.web.servlet.config.annotation.CorsRegistry
import org.springframework.web.servlet.config.annotation.PathMatchConfigurer
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/ws")
// .setAllowedOrigins("*")
.withSockJS()
}
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.enableSimpleBroker("/topic")
registry.setApplicationDestinationPrefixes("/app")
}
}

View File

@ -1,6 +1,8 @@
package no.iktdev.streamit.content.encode package no.iktdev.streamit.content.encode
import no.iktdev.exfl.observable.ObservableMap
import no.iktdev.exfl.observable.observableMapOf import no.iktdev.exfl.observable.observableMapOf
import no.iktdev.streamit.content.common.dto.WorkOrderItem
import no.iktdev.streamit.content.encode.progress.Progress import no.iktdev.streamit.content.encode.progress.Progress
import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication import org.springframework.boot.runApplication
@ -20,6 +22,9 @@ fun main(args: Array<String>) {
context = runApplication<EncoderApplication>(*args) context = runApplication<EncoderApplication>(*args)
} }
val encoderItems = ObservableMap<String, WorkOrderItem>()
val extractItems = ObservableMap<String, WorkOrderItem>()
/*val progress = ObservableMap<String, EncodeInformation>().also { /*val progress = ObservableMap<String, EncodeInformation>().also {
it.addListener(object: ObservableMap.Listener<String, EncodeInformation> { it.addListener(object: ObservableMap.Listener<String, EncodeInformation> {
override fun onPut(key: String, value: EncodeInformation) { override fun onPut(key: String, value: EncodeInformation) {

View File

@ -7,5 +7,6 @@ data class Progress(
val time: String, val time: String,
val duration: String, val duration: String,
val speed: String, val speed: String,
val estimatedCompletionSeconds: Long = -1,
val estimatedCompletion: String = "Unknown", val estimatedCompletion: String = "Unknown",
) )

View File

@ -92,6 +92,7 @@ class ProgressDecoder(val workBase: WorkBase) {
return Progress( return Progress(
workId = workBase.workId, outFileName = File(workBase.outFile).name, workId = workBase.workId, outFileName = File(workBase.outFile).name,
progress = progress, progress = progress,
estimatedCompletionSeconds = ect,
estimatedCompletion = getETA(ect), estimatedCompletion = getETA(ect),
duration = durationTime, duration = durationTime,
time = decoded.out_time ?: "NA", time = decoded.out_time ?: "NA",

View File

@ -7,9 +7,12 @@ import no.iktdev.streamit.content.encode.EncodeEnv
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.dto.State
import no.iktdev.streamit.content.common.dto.WorkOrderItem
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
import no.iktdev.streamit.content.encode.progress.DecodedProgressData import no.iktdev.streamit.content.encode.encoderItems
import no.iktdev.streamit.content.encode.extractItems
import no.iktdev.streamit.content.encode.progress.Progress import no.iktdev.streamit.content.encode.progress.Progress
import no.iktdev.streamit.content.encode.progressMap import no.iktdev.streamit.content.encode.progressMap
import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.KafkaEvents
@ -29,7 +32,9 @@ data class ExecutionBlock(
) )
@Service @Service
class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { class RunnerCoordinator(
private var maxConcurrentJobs: Int = 1,
) {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
val producer = DefaultProducer(CommonConfig.kafkaTopic) val producer = DefaultProducer(CommonConfig.kafkaTopic)
@ -76,13 +81,24 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
} }
fun addEncodeMessageToQueue(message: Message) { fun addEncodeMessageToQueue(message: Message) {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event,
message.withNewStatus(Status(StatusType.PENDING))
)
try { try {
if (message.data != null && message.data is EncodeWork) { if (message.data != null && message.data is EncodeWork) {
val work = message.data as EncodeWork val work = message.data as EncodeWork
encoderItems.put(
message.referenceId, WorkOrderItem(
id = message.referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.QUEUED
)
)
val workBlock = suspend { val workBlock = suspend {
val data: EncodeWork = work val data: EncodeWork = work
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
@ -100,21 +116,42 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
} }
} }
} }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(statusType))) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event,
message.withNewStatus(Status(statusType))
)
} else { } else {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event,
message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))
)
} }
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event,
message.withNewStatus(Status(StatusType.ERROR, e.message))
)
} }
} }
fun addExtractMessageToQueue(message: Message) { fun addExtractMessageToQueue(message: Message) {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING))) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event,
message.withNewStatus(Status(StatusType.PENDING))
)
try { try {
if (message.data != null && message.data is ExtractWork) { if (message.data != null && message.data is ExtractWork) {
val work = message.data as ExtractWork val work = message.data as ExtractWork
extractItems.put(
message.referenceId, WorkOrderItem(
id = message.referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.QUEUED
)
)
val workBlock = suspend { val workBlock = suspend {
val data: ExtractWork = work val data: ExtractWork = work
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
@ -132,57 +169,154 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
} }
} }
} }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(statusType))) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event,
message.withNewStatus(Status(statusType))
)
} else { } else {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event,
message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))
)
} }
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event,
message.withNewStatus(Status(StatusType.ERROR, e.message))
)
} }
} }
val encodeListener = object : IEncodeListener { val encodeListener = object : IEncodeListener {
override fun onStarted(referenceId: String, work: EncodeWork) { override fun onStarted(referenceId: String, work: EncodeWork) {
logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Started\n${work.outFile}" } logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Started\n${work.outFile}" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event,
Message(referenceId, Status(statusType = StatusType.SUCCESS), work)
)
encoderItems.put(
referenceId, WorkOrderItem(
id = referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.STARTED
)
)
} }
override fun onError(referenceId: String, work: EncodeWork, code: Int) { override fun onError(referenceId: String, work: EncodeWork, code: Int) {
logger.error { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Failed\n${work.outFile} \nError: $code" } logger.error { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Failed\n${work.outFile} \nError: $code" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work)) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event,
Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work)
)
encoderItems.put(
referenceId, WorkOrderItem(
id = referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.FAILURE
)
)
} }
override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) { override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) {
logger.debug { "Work progress for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Progress: ${Gson().toJson(progress)}" } logger.debug {
"Work progress for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Progress: ${
Gson().toJson(
progress
)
}"
}
progressMap.put(work.workId, progress) progressMap.put(work.workId, progress)
encoderItems.put(
referenceId, WorkOrderItem(
id = referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.UPDATED,
progress = progress.progress,
remainingTime = progress.estimatedCompletionSeconds
)
)
} }
override fun onEnded(referenceId: String, work: EncodeWork) { override fun onEnded(referenceId: String, work: EncodeWork) {
logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Ended\n${work.outFile}" } logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Ended\n${work.outFile}" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event,
Message(referenceId, Status(statusType = StatusType.SUCCESS), work)
)
encoderItems.put(
referenceId, WorkOrderItem(
id = referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.ENDED,
progress = 100,
remainingTime = null
)
)
} }
} }
val extractListener = object : IExtractListener { val extractListener = object : IExtractListener {
override fun onStarted(referenceId: String, work: ExtractWork) { override fun onStarted(referenceId: String, work: ExtractWork) {
logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Started\n${work.outFile}" } logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Started\n${work.outFile}" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event,
Message(referenceId, Status(statusType = StatusType.SUCCESS), work)
)
extractItems.put(
referenceId, WorkOrderItem(
id = referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.STARTED
)
)
} }
override fun onError(referenceId: String, work: ExtractWork, code: Int) { override fun onError(referenceId: String, work: ExtractWork, code: Int) {
logger.error { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Failed\n${work.outFile} \nError: $code" } logger.error { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Failed\n${work.outFile} \nError: $code" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work)) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event,
Message(referenceId, Status(StatusType.ERROR, code.toString()), work)
)
extractItems.put(
referenceId, WorkOrderItem(
id = referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.FAILURE
)
)
} }
override fun onEnded(referenceId: String, work: ExtractWork) { override fun onEnded(referenceId: String, work: ExtractWork) {
logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Ended\n${work.outFile}" } logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Ended\n${work.outFile}" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) producer.sendMessage(
KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event,
Message(referenceId, Status(statusType = StatusType.SUCCESS), work)
)
extractItems.put(
referenceId, WorkOrderItem(
id = referenceId,
inputFile = work.inFile,
outputFile = work.outFile,
collection = work.collection,
state = State.ENDED
)
)
} }
} }

View File

@ -0,0 +1,65 @@
package no.iktdev.streamit.content.encode.topics
import no.iktdev.exfl.observable.ObservableMap
import no.iktdev.streamit.content.common.dto.WorkOrderItem
import no.iktdev.streamit.content.encode.encoderItems
import no.iktdev.streamit.content.encode.extractItems
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.stereotype.Controller
@Controller
class EncoderTopic(
@Autowired val template: SimpMessagingTemplate?,
) {
init {
encoderItems.addListener(object : ObservableMap.Listener<String, WorkOrderItem> {
override fun onMapUpdated(map: Map<String, WorkOrderItem>) {
super.onMapUpdated(map)
pushEncoderQueue()
}
override fun onPut(key: String, value: WorkOrderItem) {
super.onPut(key, value)
pushEncoderWorkOrder(value)
}
})
extractItems.addListener(object : ObservableMap.Listener<String, WorkOrderItem> {
override fun onMapUpdated(map: Map<String, WorkOrderItem>) {
super.onMapUpdated(map)
pushExtractorQueue()
}
override fun onPut(key: String, value: WorkOrderItem) {
super.onPut(key, value)
pushExtractorWorkOrder(value)
}
})
}
fun pushEncoderWorkOrder(item: WorkOrderItem) {
template?.convertAndSend("/topic/encoder/workorder", item)
}
fun pushExtractorWorkOrder(item: WorkOrderItem) {
template?.convertAndSend("/topic/extractor/workorder", item)
}
@MessageMapping("/encoder/queue")
fun pushEncoderQueue() {
template?.convertAndSend("/topic/encoder/queue", encoderItems.values)
}
@MessageMapping("/extractor/queue")
fun pushExtractorQueue() {
template?.convertAndSend("/topic/extractor/queue", extractItems.values)
}
}