diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt index a9d0cf3e..d7b892d8 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt @@ -1,5 +1,6 @@ package no.iktdev.streamit.content.reader.collector +import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry @@ -8,15 +9,20 @@ import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork import no.iktdev.streamit.library.db.query.SubtitleQuery import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.Status +import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import org.apache.kafka.clients.consumer.ConsumerRecord +import org.jetbrains.exposed.sql.transactions.transaction import org.springframework.stereotype.Service import java.io.File @Service class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConvertedSubtitle") { + private val logger = KotlinLogging.logger {} + private val listener = object: SimpleMessageListener( topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, @@ -29,12 +35,23 @@ class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConverted } val of = File(workResult.outFile) - SubtitleQuery( - title = of.nameWithoutExtension, - language = workResult.language, - collection = workResult.collection, - format = of.extension.uppercase() - ).insertAndGetStatus() + val status = transaction { + SubtitleQuery( + title = of.nameWithoutExtension, + language = workResult.language, + collection = workResult.collection, + format = of.extension.uppercase() + ).insertAndGetStatus() + } + val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) + + if (status) { + produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) + logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" } + } else { + produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log") + logger.error { "Failed to store ${File(workResult.outFile).absolutePath} subtitle" } + } } } diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt index 1dafdd54..4b1d5df8 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt @@ -117,8 +117,8 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), } } - val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(StatusType.SUCCESS)) + val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) - logger.info { "Stored ${metadata.title} video" } + logger.info { "Stored ${encodeWork.outFile} video" } } } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt index 388b2578..edce3dda 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt @@ -1,5 +1,6 @@ package no.iktdev.streamit.content.reader.collector +import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry @@ -7,12 +8,16 @@ import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork import no.iktdev.streamit.library.db.query.SubtitleQuery import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.Status +import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import org.apache.kafka.clients.consumer.ConsumerRecord import org.jetbrains.exposed.sql.transactions.transaction import java.io.File +private val logger = KotlinLogging.logger {} + class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") { private val listener = object: SimpleMessageListener( @@ -27,7 +32,7 @@ class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtracted } val of = File(workResult.outFile) - transaction { + val status = transaction { SubtitleQuery( title = of.nameWithoutExtension, language = workResult.language, @@ -35,6 +40,15 @@ class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtracted format = of.extension.uppercase() ).insertAndGetStatus() } + val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) + + if (status) { + produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) + logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" } + } else { + produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log") + logger.error { "Failed to store ${File(workResult.outFile).absolutePath} subtitle" } + } } }