More logging
This commit is contained in:
parent
c13510f786
commit
ae07e18a84
@ -1,5 +1,6 @@
|
||||
package no.iktdev.streamit.content.encode
|
||||
|
||||
import com.google.gson.Gson
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||
@ -49,7 +50,7 @@ class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : Def
|
||||
accepts
|
||||
) {
|
||||
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||
logger.info { "${data.value().referenceId}: ${data.key()}" }
|
||||
logger.info { "${data.value().referenceId}: ${data.key()} ${Gson().toJson(data.value())}" }
|
||||
val message = data.value().apply {
|
||||
this.data = EncodeWorkDeserializer().deserializeIfSuccessful(data.value())
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package no.iktdev.streamit.content.encode
|
||||
|
||||
import com.google.gson.Gson
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||
@ -48,7 +49,7 @@ class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : De
|
||||
accepts
|
||||
) {
|
||||
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||
logger.info { "${data.value().referenceId}: ${data.key()}" }
|
||||
logger.info { "${data.value().referenceId}: ${data.key()} ${Gson().toJson(data.value())}" }
|
||||
val message = data.value().apply {
|
||||
this.data = ExtractWorkDeserializer().deserializeIfSuccessful(data.value())
|
||||
}
|
||||
|
||||
@ -45,13 +45,13 @@ class RunnerCoordinator {
|
||||
encodeExecutor.execute {
|
||||
try {
|
||||
runBlocking {
|
||||
if (message.data is EncodeWork) {
|
||||
if (message.data != null && message.data is EncodeWork) {
|
||||
val data: EncodeWork = message.data as EncodeWork
|
||||
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
|
||||
logger.info { "${message.referenceId} Starting encoding ${data.workId}" }
|
||||
encodeDaemon.runUsingWorkItem()
|
||||
} else {
|
||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork")))
|
||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null")))
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
@ -67,7 +67,7 @@ class RunnerCoordinator {
|
||||
extractExecutor.execute {
|
||||
runBlocking {
|
||||
try {
|
||||
if (message.data is ExtractWork) {
|
||||
if (message.data != null && message.data is ExtractWork) {
|
||||
val data: ExtractWork = message.data as ExtractWork
|
||||
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
|
||||
logger.info { "${message.referenceId} Starting extraction ${data.workId}" }
|
||||
|
||||
Loading…
Reference in New Issue
Block a user