This commit is contained in:
Brage 2023-07-25 02:43:34 +02:00
parent 4c57e9da0b
commit 07eab0c129
3 changed files with 40 additions and 9 deletions

View File

@ -1,5 +1,6 @@
package no.iktdev.streamit.content.reader.collector package no.iktdev.streamit.content.reader.collector
import mu.KotlinLogging
import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.DefaultKafkaReader
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
@ -8,15 +9,20 @@ import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
import no.iktdev.streamit.library.db.query.SubtitleQuery import no.iktdev.streamit.library.db.query.SubtitleQuery
import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.KafkaEvents
import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
@Service @Service
class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConvertedSubtitle") { class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConvertedSubtitle") {
private val logger = KotlinLogging.logger {}
private val listener = object: SimpleMessageListener( private val listener = object: SimpleMessageListener(
topic = CommonConfig.kafkaTopic, topic = CommonConfig.kafkaTopic,
consumer = defaultConsumer, consumer = defaultConsumer,
@ -29,6 +35,7 @@ class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConverted
} }
val of = File(workResult.outFile) val of = File(workResult.outFile)
val status = transaction {
SubtitleQuery( SubtitleQuery(
title = of.nameWithoutExtension, title = of.nameWithoutExtension,
language = workResult.language, language = workResult.language,
@ -36,6 +43,16 @@ class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConverted
format = of.extension.uppercase() format = of.extension.uppercase()
).insertAndGetStatus() ).insertAndGetStatus()
} }
val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS))
if (status) {
produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null)
logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" }
} else {
produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log")
logger.error { "Failed to store ${File(workResult.outFile).absolutePath} subtitle" }
}
}
} }
init { init {

View File

@ -117,8 +117,8 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"),
} }
} }
val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(StatusType.SUCCESS)) val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS))
produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null)
logger.info { "Stored ${metadata.title} video" } logger.info { "Stored ${encodeWork.outFile} video" }
} }
} }

View File

@ -1,5 +1,6 @@
package no.iktdev.streamit.content.reader.collector package no.iktdev.streamit.content.reader.collector
import mu.KotlinLogging
import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.DefaultKafkaReader
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
@ -7,12 +8,16 @@ import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
import no.iktdev.streamit.library.db.query.SubtitleQuery import no.iktdev.streamit.library.db.query.SubtitleQuery
import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.KafkaEvents
import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.transactions.transaction
import java.io.File import java.io.File
private val logger = KotlinLogging.logger {}
class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") { class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") {
private val listener = object: SimpleMessageListener( private val listener = object: SimpleMessageListener(
@ -27,7 +32,7 @@ class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtracted
} }
val of = File(workResult.outFile) val of = File(workResult.outFile)
transaction { val status = transaction {
SubtitleQuery( SubtitleQuery(
title = of.nameWithoutExtension, title = of.nameWithoutExtension,
language = workResult.language, language = workResult.language,
@ -35,6 +40,15 @@ class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtracted
format = of.extension.uppercase() format = of.extension.uppercase()
).insertAndGetStatus() ).insertAndGetStatus()
} }
val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS))
if (status) {
produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null)
logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" }
} else {
produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log")
logger.error { "Failed to store ${File(workResult.outFile).absolutePath} subtitle" }
}
} }
} }