Updated encode runner
This commit is contained in:
parent
af6d58e7bd
commit
4bb2cef402
@ -37,6 +37,7 @@ class RunnerCoordinator {
|
|||||||
val scope = CoroutineScope(dispatcher)
|
val scope = CoroutineScope(dispatcher)
|
||||||
|
|
||||||
fun addEncodeMessageToQueue(message: Message) {
|
fun addEncodeMessageToQueue(message: Message) {
|
||||||
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
|
||||||
scope.launch {
|
scope.launch {
|
||||||
try {
|
try {
|
||||||
if (message.data != null && message.data is EncodeWork) {
|
if (message.data != null && message.data is EncodeWork) {
|
||||||
@ -53,28 +54,25 @@ class RunnerCoordinator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fun addExtractMessageToQueue(message: Message) {
|
fun addExtractMessageToQueue(message: Message) {
|
||||||
executor.execute {
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
|
||||||
runBlocking {
|
scope.launch {
|
||||||
try {
|
try {
|
||||||
if (message.data != null && message.data is ExtractWork) {
|
if (message.data != null && message.data is ExtractWork) {
|
||||||
val data: ExtractWork = message.data as ExtractWork
|
val data: ExtractWork = message.data as ExtractWork
|
||||||
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
|
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
|
||||||
logger.info { "${message.referenceId} Starting extraction ${data.workId}" }
|
logger.info { "${message.referenceId} Starting extraction ${data.workId}" }
|
||||||
extractDaemon.runUsingWorkItem()
|
extractDaemon.runUsingWorkItem()
|
||||||
} else {
|
} else {
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")))
|
||||||
}
|
|
||||||
} catch (e: Exception) {
|
|
||||||
e.printStackTrace()
|
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
|
|
||||||
}
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
e.printStackTrace()
|
||||||
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -84,7 +82,7 @@ class RunnerCoordinator {
|
|||||||
val encodeListener = object: IEncodeListener {
|
val encodeListener = object: IEncodeListener {
|
||||||
override fun onStarted(referenceId: String, work: EncodeWork) {
|
override fun onStarted(referenceId: String, work: EncodeWork) {
|
||||||
logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}" }
|
logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}" }
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onError(referenceId: String, work: EncodeWork, code: Int) {
|
override fun onError(referenceId: String, work: EncodeWork, code: Int) {
|
||||||
@ -98,24 +96,24 @@ class RunnerCoordinator {
|
|||||||
|
|
||||||
override fun onEnded(referenceId: String, work: EncodeWork) {
|
override fun onEnded(referenceId: String, work: EncodeWork) {
|
||||||
logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}" }
|
logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}" }
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val extractListener = object : IExtractListener {
|
val extractListener = object : IExtractListener {
|
||||||
override fun onStarted(referenceId: String, work: ExtractWork) {
|
override fun onStarted(referenceId: String, work: ExtractWork) {
|
||||||
logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Started" }
|
logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Started" }
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onError(referenceId: String, work: ExtractWork, code: Int) {
|
override fun onError(referenceId: String, work: ExtractWork, code: Int) {
|
||||||
logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" }
|
logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" }
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.ERROR), work))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work))
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onEnded(referenceId: String, work: ExtractWork) {
|
override fun onEnded(referenceId: String, work: ExtractWork) {
|
||||||
logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Ended" }
|
logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Ended" }
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user