Updated logging
This commit is contained in:
parent
d6f9945576
commit
c13510f786
@ -1,5 +1,6 @@
|
|||||||
package no.iktdev.streamit.content.encode
|
package no.iktdev.streamit.content.encode
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.streamit.content.common.CommonConfig
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||||
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
||||||
@ -14,8 +15,11 @@ import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSucce
|
|||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
|
|
||||||
|
private val logger = KotlinLogging.logger {}
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : DefaultKafkaReader("encodeWork") {
|
class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : DefaultKafkaReader("encodeWork") {
|
||||||
|
|
||||||
lateinit var encodeInstructionsListener: EncodeInformationListener
|
lateinit var encodeInstructionsListener: EncodeInformationListener
|
||||||
|
|
||||||
init {
|
init {
|
||||||
@ -45,6 +49,7 @@ class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : Def
|
|||||||
accepts
|
accepts
|
||||||
) {
|
) {
|
||||||
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||||
|
logger.info { "${data.value().referenceId}: ${data.key()}" }
|
||||||
val message = data.value().apply {
|
val message = data.value().apply {
|
||||||
this.data = EncodeWorkDeserializer().deserializeIfSuccessful(data.value())
|
this.data = EncodeWorkDeserializer().deserializeIfSuccessful(data.value())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package no.iktdev.streamit.content.encode
|
package no.iktdev.streamit.content.encode
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.streamit.content.common.CommonConfig
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||||
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
||||||
@ -14,6 +15,7 @@ import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeseri
|
|||||||
import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful
|
import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
|
private val logger = KotlinLogging.logger {}
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : DefaultKafkaReader("extractWork") {
|
class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : DefaultKafkaReader("extractWork") {
|
||||||
@ -46,6 +48,7 @@ class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : De
|
|||||||
accepts
|
accepts
|
||||||
) {
|
) {
|
||||||
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||||
|
logger.info { "${data.value().referenceId}: ${data.key()}" }
|
||||||
val message = data.value().apply {
|
val message = data.value().apply {
|
||||||
this.data = ExtractWorkDeserializer().deserializeIfSuccessful(data.value())
|
this.data = ExtractWorkDeserializer().deserializeIfSuccessful(data.value())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,6 +17,7 @@ import java.util.concurrent.ExecutorService
|
|||||||
import java.util.concurrent.LinkedBlockingQueue
|
import java.util.concurrent.LinkedBlockingQueue
|
||||||
import java.util.concurrent.ThreadPoolExecutor
|
import java.util.concurrent.ThreadPoolExecutor
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
private val logger = KotlinLogging.logger {}
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class RunnerCoordinator {
|
class RunnerCoordinator {
|
||||||
@ -47,12 +48,14 @@ class RunnerCoordinator {
|
|||||||
if (message.data is EncodeWork) {
|
if (message.data is EncodeWork) {
|
||||||
val data: EncodeWork = message.data as EncodeWork
|
val data: EncodeWork = message.data as EncodeWork
|
||||||
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
|
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
|
||||||
|
logger.info { "${message.referenceId} Starting encoding ${data.workId}" }
|
||||||
encodeDaemon.runUsingWorkItem()
|
encodeDaemon.runUsingWorkItem()
|
||||||
} else {
|
} else {
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork")))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
e.printStackTrace()
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,11 +70,13 @@ class RunnerCoordinator {
|
|||||||
if (message.data is ExtractWork) {
|
if (message.data is ExtractWork) {
|
||||||
val data: ExtractWork = message.data as ExtractWork
|
val data: ExtractWork = message.data as ExtractWork
|
||||||
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
|
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
|
||||||
|
logger.info { "${message.referenceId} Starting extraction ${data.workId}" }
|
||||||
extractDaemon.runUsingWorkItem()
|
extractDaemon.runUsingWorkItem()
|
||||||
} else {
|
} else {
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")))
|
||||||
}
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
e.printStackTrace()
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user