From 605d8630dfc64cc95fadcde76c17d1f2d783d87c Mon Sep 17 00:00:00 2001 From: Brage Date: Wed, 26 Jul 2023 16:25:30 +0200 Subject: [PATCH] Fix --- Reader/build.gradle.kts | 2 +- .../collector/ConvertedSubtitleConsumer.kt | 65 ----------- .../collector/ExtractedSubtitleConsumer.kt | 62 ---------- .../reader/collector/SubtitleConsumer.kt | 109 ++++++++++++++++++ ...codedVideoConsumer.kt => VideoConsumer.kt} | 92 ++++++++------- 5 files changed, 159 insertions(+), 171 deletions(-) delete mode 100644 Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt delete mode 100644 Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt create mode 100644 Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/SubtitleConsumer.kt rename Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/{EncodedVideoConsumer.kt => VideoConsumer.kt} (58%) diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 857d3e47..11bb37a9 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -27,7 +27,7 @@ dependencies { implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") - implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha9") + implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha10") implementation("org.jetbrains.exposed:exposed-core:$exposedVersion") implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt deleted file mode 100644 index 122097e4..00000000 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt +++ /dev/null @@ -1,65 +0,0 @@ -package no.iktdev.streamit.content.reader.collector - -import mu.KotlinLogging -import no.iktdev.streamit.content.common.CommonConfig -import no.iktdev.streamit.content.common.DefaultKafkaReader -import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry -import no.iktdev.streamit.content.common.dto.reader.work.ConvertWork -import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork -import no.iktdev.streamit.library.db.query.SubtitleQuery -import no.iktdev.streamit.library.kafka.KafkaEvents -import no.iktdev.streamit.library.kafka.dto.Message -import no.iktdev.streamit.library.kafka.dto.Status -import no.iktdev.streamit.library.kafka.dto.StatusType -import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener -import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.jetbrains.exposed.sql.transactions.transaction -import org.springframework.stereotype.Service -import java.io.File - -@Service -class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConvertedSubtitle") { - - private val logger = KotlinLogging.logger {} - - private val listener = object: SimpleMessageListener( - topic = CommonConfig.kafkaTopic, - consumer = defaultConsumer, - accepts = listOf(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event) - ) { - override fun onMessageReceived(data: ConsumerRecord) { - val workResult = data.value().dataAs(ConvertWork::class.java) - if (!data.value().isSuccessful() || workResult == null) { - return - } - - val of = File(workResult.outFile) - val status = transaction { - SubtitleQuery( - title = of.nameWithoutExtension, - language = workResult.language, - collection = workResult.collection, - format = of.extension.uppercase() - ).insertAndGetStatus() - } - val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) - - if (status) { - produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, data.value().referenceId) - logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" } - } else { - produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log") - logger.error { "Failed to store ${File(workResult.outFile).absolutePath} subtitle" } - } - } - } - - init { - listener.listen() - } - - override fun loadDeserializers(): Map> { - return DeserializerRegistry.getEventToDeserializer(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE) - } -} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt deleted file mode 100644 index 912abde0..00000000 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt +++ /dev/null @@ -1,62 +0,0 @@ -package no.iktdev.streamit.content.reader.collector - -import mu.KotlinLogging -import no.iktdev.streamit.content.common.CommonConfig -import no.iktdev.streamit.content.common.DefaultKafkaReader -import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry -import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork -import no.iktdev.streamit.library.db.query.SubtitleQuery -import no.iktdev.streamit.library.kafka.KafkaEvents -import no.iktdev.streamit.library.kafka.dto.Message -import no.iktdev.streamit.library.kafka.dto.Status -import no.iktdev.streamit.library.kafka.dto.StatusType -import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener -import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.jetbrains.exposed.sql.transactions.transaction -import java.io.File - -private val logger = KotlinLogging.logger {} - -class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") { - - private val listener = object: SimpleMessageListener( - topic = CommonConfig.kafkaTopic, - consumer = defaultConsumer, - accepts = listOf(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event) - ) { - override fun onMessageReceived(data: ConsumerRecord) { - val workResult = data.value().dataAs(ExtractWork::class.java) - if (!data.value().isSuccessful() || workResult == null) { - return - } - - val of = File(workResult.outFile) - val status = transaction { - SubtitleQuery( - title = of.nameWithoutExtension, - language = workResult.language, - collection = workResult.collection, - format = of.extension.uppercase() - ).insertAndGetStatus() - } - val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) - - if (status) { - produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, data.value().referenceId) - logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" } - } else { - produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log") - logger.error { "Failed to store ${File(workResult.outFile).absolutePath} subtitle" } - } - } - } - - init { - listener.listen() - } - - override fun loadDeserializers(): Map> { - return DeserializerRegistry.getEventToDeserializer(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE) - } -} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/SubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/SubtitleConsumer.kt new file mode 100644 index 00000000..db3ab1f2 --- /dev/null +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/SubtitleConsumer.kt @@ -0,0 +1,109 @@ +package no.iktdev.streamit.content.reader.collector + +import mu.KotlinLogging +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry +import no.iktdev.streamit.content.common.dto.reader.work.ConvertWork +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.library.db.query.SubtitleQuery +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.Status +import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.jetbrains.exposed.sql.transactions.transaction +import org.springframework.stereotype.Service +import java.io.File + +private val logger = KotlinLogging.logger {} + +@Service +class SubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") { + + private val listener = object: SimpleMessageListener( + topic = CommonConfig.kafkaTopic, + consumer = defaultConsumer, + accepts = listOf( + KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, + KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event + ) + ) { + override fun onMessageReceived(data: ConsumerRecord) { + val referenceId = data.value().referenceId + if (data.key() == KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event) { + val work = data.value().dataAs(ExtractWork::class.java) + if (work == null) { + logger.info { "Event: ${data.key()} value is null" } + } else { + storeExtractWork(referenceId, work) + } + } else if (data.key() == KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event) { + val work = data.value().dataAs(ConvertWork::class.java) + if (work == null) { + logger.info { "Event: ${data.key()} value is null" } + } else { + storeConvertWork(referenceId, work) + } + } else { + if (data.value().isSuccessful()) { + logger.warn { "Event: ${data.key()} is not captured" } + } else { + logger.info { "Event: ${data.key()} is not ${StatusType.SUCCESS.name}" } + } + } + } + } + + init { + listener.listen() + } + + fun produceMessage(referenceId: String, outFile: String, statusType: StatusType, result: Any?) { + if (statusType == StatusType.SUCCESS) { + produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, referenceId) + logger.info { "Stored ${File(outFile).absolutePath} subtitle" } + } else { + produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, Message(referenceId, Status(statusType), result), "See log") + logger.error { "Failed to store ${File(outFile).absolutePath} subtitle" } + } + } + + fun storeExtractWork(referenceId: String, work: ExtractWork) { + val of = File(work.outFile) + val status = transaction { + SubtitleQuery( + title = of.nameWithoutExtension, + language = work.language, + collection = work.collection, + format = of.extension.uppercase() + ) + .insertAndGetStatus() + } + produceMessage(referenceId, work.outFile, if (status) StatusType.SUCCESS else StatusType.ERROR, "Store Extracted: $status") + } + + fun storeConvertWork(referenceId: String, work: ConvertWork) { + val of = File(work.outFile) + val status = transaction { + SubtitleQuery( + title = of.nameWithoutExtension, + language = work.language, + collection = work.collection, + format = of.extension.uppercase() + ) + .insertAndGetStatus() + } + produceMessage(referenceId, work.outFile, if (status) StatusType.SUCCESS else StatusType.ERROR, "Store Converted: $status") + } + + + override fun loadDeserializers(): Map> { + return DeserializerRegistry.getEventToDeserializer( + KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE, + KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE + ) + } +} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/VideoConsumer.kt similarity index 58% rename from Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt rename to Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/VideoConsumer.kt index 3452f2e9..ee2cbd2d 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/VideoConsumer.kt @@ -6,11 +6,10 @@ import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.Downloader import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry +import no.iktdev.streamit.content.common.dto.Metadata +import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo import no.iktdev.streamit.library.db.query.* import no.iktdev.streamit.library.kafka.KafkaEvents -import no.iktdev.streamit.library.kafka.dto.Message -import no.iktdev.streamit.library.kafka.dto.Status -import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization @@ -21,7 +20,7 @@ import java.io.File private val logger = KotlinLogging.logger {} @Service -class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), ICollectedMessagesEvent { +class VideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), ICollectedMessagesEvent { val listener = CollectorMessageListener( topic = CommonConfig.kafkaTopic, @@ -43,9 +42,6 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), } - - - override fun loadDeserializers(): Map> { return DeserializerRegistry.getEventToDeserializer(*listener.acceptsFilter.toTypedArray(), listener.initiatorEvent, listener.completionEvent) } @@ -62,24 +58,18 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), logger.error { "Required data is null, as it has either status as non successful or simply missing" } return } - val videoFileNameWithExtension = File(encodeWork.outFile).name - - val contentType = metadata?.type ?: return - val iid = if (contentType == "movie") transaction { - MovieQuery(videoFileNameWithExtension).insertAndGetId() - } else null - - if (serieData != null) { - val success = transaction { - SerieQuery(serieData.title, serieData.episode, serieData.season, fileData.title, videoFileNameWithExtension) - .insertAndGetStatus() - } - if (!success) - return + val iid = transaction { + val serieStatus = if (serieData != null) { + getSerieQueryInstance(serieData, videoFileNameWithExtension)?.insertAndGetStatus() ?: false + } else true + if (serieData == null || metadata?.type == "movie") { + MovieQuery(videoFileNameWithExtension).insertAndGetId() + } else null } + val coverFile = metadata?.cover?.let { coverUrl -> runBlocking { try { @@ -90,34 +80,50 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), } } } - val metaGenre = metadata.genres - val gq = GenreQuery(*metaGenre.toTypedArray()) - transaction { - gq.insertAndGetIds() - } - val gids = transaction { gq.getIds().joinToString(",") } - val cq = CatalogQuery( - title = fileData.title, - cover = coverFile?.name, - type = contentType, - collection = fileData.title, - iid = iid, - genres = gids - ) - val cid = transaction { cq.insertAndGetId() ?: cq.getId() } ?: return - if (!metadata.summary.isNullOrBlank()) { - val summary = metadata.summary ?: return + // Serie må alltid fullføres før catalog. dette i tilfelle catalog allerede eksisterer og den thrower slik at transaskjonen blir versertert! + + val status = try { transaction { - SummaryQuery( - cid = cid, - language = "eng", // TODO: Fix later, - description = summary + val genres = metadata?.let { insertAndGetGenres(it) } + + val cq = CatalogQuery( + title = fileData.title, + cover = coverFile?.name, + type = if (serieData == null) "movie" else "serie", + collection = fileData.title, + iid = iid, + genres = genres ) + cq.insert() + val cqId = cq.getId() ?: throw RuntimeException("No Catalog id found!") + metadata?.let { + val summary = it.summary + if (summary != null) { + SummaryQuery(cid = cqId, language = "eng", description = summary) + } + } } + } catch (e: Exception) { + e.printStackTrace() } - produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, collection.getReferenceId() ?: "M.I.A", null) + produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, collection.getReferenceId() ?: "M.I.A", status) logger.info { "Stored ${encodeWork.outFile} video" } } + + /** + * Needs to be wrapped in transaction + */ + fun insertAndGetGenres(meta: Metadata): String? { + val gq = GenreQuery(*meta.genres.toTypedArray()) + gq.insertAndGetIds() + return gq.getIds().joinToString(",") + } + + fun getSerieQueryInstance(data: EpisodeInfo?, baseName: String?): SerieQuery? { + if (data == null || baseName == null) return null + return SerieQuery(data.title, data.episode, data.season, data.title, baseName) + } + } \ No newline at end of file