This commit is contained in:
Brage 2023-07-28 01:59:21 +02:00
parent b38b6b83dd
commit a72a1be19c
5 changed files with 18 additions and 16 deletions

View File

@ -12,7 +12,7 @@ open class Daemon(open val executable: String, val daemonInterface: IDaemon) {
var executor: ProcessResult? = null var executor: ProcessResult? = null
open suspend fun run(parameters: List<String>): Int { open suspend fun run(parameters: List<String>): Int {
daemonInterface.onStarted() daemonInterface.onStarted()
logger.info { "Daemon arguments: $executable ${parameters.joinToString(" ")}" } logger.info { "\nDaemon arguments: $executable \nParamters:\n${parameters.joinToString(" ")}" }
executor = process(executable, *parameters.toTypedArray(), executor = process(executable, *parameters.toTypedArray(),
stdout = Redirect.CAPTURE, stdout = Redirect.CAPTURE,
stderr = Redirect.CAPTURE, stderr = Redirect.CAPTURE,
@ -23,7 +23,7 @@ open class Daemon(open val executable: String, val daemonInterface: IDaemon) {
if (resultCode == 0) { if (resultCode == 0) {
daemonInterface.onEnded() daemonInterface.onEnded()
} else daemonInterface.onError(resultCode) } else daemonInterface.onError(resultCode)
logger.info { "Daemon ended: $resultCode" } logger.info { "$executable result: $resultCode" }
return resultCode return resultCode
} }
} }

View File

@ -50,7 +50,7 @@ class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : Def
accepts accepts
) { ) {
override fun onMessageReceived(data: ConsumerRecord<String, Message>) { override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
logger.info { "${data.value().referenceId}: ${data.key()} ${Gson().toJson(data.value())}" } logger.info { "\nreferenceId: ${data.value().referenceId} \nEvent: ${data.key()} \nData:\n${Gson().toJson(data.value())}" }
val message = data.value().apply { val message = data.value().apply {
this.data = EncodeWorkDeserializer().deserializeIfSuccessful(data.value()) this.data = EncodeWorkDeserializer().deserializeIfSuccessful(data.value())
} }

View File

@ -49,7 +49,7 @@ class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : De
accepts accepts
) { ) {
override fun onMessageReceived(data: ConsumerRecord<String, Message>) { override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
logger.info { "${data.value().referenceId}: ${data.key()} ${Gson().toJson(data.value())}" } logger.info { "\nreferenceId: ${data.value().referenceId} \nEvent: ${data.key()} \nData:\n${Gson().toJson(data.value())}" }
val message = data.value().apply { val message = data.value().apply {
this.data = ExtractWorkDeserializer().deserializeIfSuccessful(data.value()) this.data = ExtractWorkDeserializer().deserializeIfSuccessful(data.value())
} }

View File

@ -25,6 +25,7 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte
} }
} catch (e: Exception) { } catch (e: Exception) {
//logger.error { e.message } //logger.error { e.message }
e.printStackTrace()
} }
} }

View File

@ -66,17 +66,17 @@ class RunnerCoordinator {
val workBlock = suspend { val workBlock = suspend {
val data: EncodeWork = message.data as EncodeWork val data: EncodeWork = message.data as EncodeWork
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
logger.info { "${message.referenceId} Starting encoding ${data.workId}" } logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" }
encodeDaemon.runUsingWorkItem() encodeDaemon.runUsingWorkItem()
} }
queue.trySend(ExecutionBlock("encode", workBlock)) queue.trySend(ExecutionBlock("encode", workBlock))
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS)))
} else { } else {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null"))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null")))
} }
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
} }
} }
@ -87,17 +87,17 @@ class RunnerCoordinator {
val workBlock = suspend { val workBlock = suspend {
val data: ExtractWork = message.data as ExtractWork val data: ExtractWork = message.data as ExtractWork
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
logger.info { "${message.referenceId} Starting extraction ${data.workId}" } logger.info { "\nreferenceId: ${message.referenceId} \nStarting extracting. \nWorkId: ${data.workId}" }
extractDaemon.runUsingWorkItem() extractDaemon.runUsingWorkItem()
} }
queue.trySend(ExecutionBlock("extract", workBlock)) queue.trySend(ExecutionBlock("extract", workBlock))
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS)))
} else { } else {
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")))
} }
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, e.message))) producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
} }
} }
@ -107,12 +107,12 @@ class RunnerCoordinator {
val encodeListener = object: IEncodeListener { val encodeListener = object: IEncodeListener {
override fun onStarted(referenceId: String, work: EncodeWork) { override fun onStarted(referenceId: String, work: EncodeWork) {
logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}" } logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Started\n${work.outFile}" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
} }
override fun onError(referenceId: String, work: EncodeWork, code: Int) { override fun onError(referenceId: String, work: EncodeWork, code: Int) {
logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" } logger.error { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Failed\n${work.outFile} \nError: $code" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work)) producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work))
} }
@ -121,24 +121,25 @@ class RunnerCoordinator {
} }
override fun onEnded(referenceId: String, work: EncodeWork) { override fun onEnded(referenceId: String, work: EncodeWork) {
logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}" } logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nEncode: Ended\n${work.outFile}" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
} }
} }
val extractListener = object : IExtractListener { val extractListener = object : IExtractListener {
override fun onStarted(referenceId: String, work: ExtractWork) { override fun onStarted(referenceId: String, work: ExtractWork) {
logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Started" } logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Started\n${work.outFile}" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
} }
override fun onError(referenceId: String, work: ExtractWork, code: Int) { override fun onError(referenceId: String, work: ExtractWork, code: Int) {
logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" } logger.error { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Failed\n${work.outFile} \nError: $code" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work)) producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work))
} }
override fun onEnded(referenceId: String, work: ExtractWork) { override fun onEnded(referenceId: String, work: ExtractWork) {
logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Ended" } logger.info { "\nreferenceId: $referenceId \nWorkId ${work.workId} \nExtract: Ended\n${work.outFile}" }
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work)) producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
} }