This commit is contained in:
Brage 2023-07-26 16:25:30 +02:00
parent c20ce36c41
commit 605d8630df
5 changed files with 159 additions and 171 deletions

View File

@@ -27,7 +27,7 @@ dependencies {
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75") implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75")
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha9") implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha10")
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion") implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion") implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")

View File

@@ -1,65 +0,0 @@
package no.iktdev.streamit.content.reader.collector
import mu.KotlinLogging
import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.DefaultKafkaReader
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
import no.iktdev.streamit.content.common.dto.reader.work.ConvertWork
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
import no.iktdev.streamit.library.db.query.SubtitleQuery
import no.iktdev.streamit.library.kafka.KafkaEvents
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.stereotype.Service
import java.io.File
@Service
class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConvertedSubtitle") {
    private val logger = KotlinLogging.logger {}

    // Listens for completed subtitle-conversion events and persists the result.
    private val listener = object : SimpleMessageListener(
        topic = CommonConfig.kafkaTopic,
        consumer = defaultConsumer,
        accepts = listOf(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event)
    ) {
        override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
            val workResult = data.value().dataAs(ConvertWork::class.java)
            // Ignore failed conversions and messages without a usable payload.
            if (!data.value().isSuccessful() || workResult == null) {
                return
            }
            val of = File(workResult.outFile)
            // Derive title/format from the produced file name; persist inside one transaction.
            val status = transaction {
                SubtitleQuery(
                    title = of.nameWithoutExtension,
                    language = workResult.language,
                    collection = workResult.collection,
                    format = of.extension.uppercase()
                ).insertAndGetStatus()
            }
            val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS))
            if (status) {
                // BUG FIX: previously emitted EVENT_COLLECTOR_VIDEO_STORED here, although this
                // consumer stores a subtitle. The error branch and the extracted-subtitle
                // consumer both use EVENT_COLLECTOR_SUBTITLE_STORED.
                produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, data.value().referenceId)
                logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" }
            } else {
                produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log")
                logger.error { "Failed to store ${File(workResult.outFile).absolutePath} subtitle" }
            }
        }
    }

    init {
        // Start consuming as soon as the Spring bean is constructed.
        listener.listen()
    }

    override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
        return DeserializerRegistry.getEventToDeserializer(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE)
    }
}

View File

@@ -1,62 +0,0 @@
package no.iktdev.streamit.content.reader.collector
import mu.KotlinLogging
import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.DefaultKafkaReader
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
import no.iktdev.streamit.library.db.query.SubtitleQuery
import no.iktdev.streamit.library.kafka.KafkaEvents
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.transactions.transaction
import java.io.File
private val logger = KotlinLogging.logger {}

/**
 * Persists subtitles produced by the encoder's extraction step and reports
 * the outcome back onto the Kafka topic as EVENT_COLLECTOR_SUBTITLE_STORED.
 */
class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") {
    private val listener = object : SimpleMessageListener(
        topic = CommonConfig.kafkaTopic,
        consumer = defaultConsumer,
        accepts = listOf(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event)
    ) {
        override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
            val work = data.value().dataAs(ExtractWork::class.java)
            // Only persist successful extractions that carry a payload.
            if (!data.value().isSuccessful() || work == null) {
                return
            }
            val subtitleFile = File(work.outFile)
            // File name carries the title; extension doubles as the stored format.
            val stored = transaction {
                SubtitleQuery(
                    title = subtitleFile.nameWithoutExtension,
                    language = work.language,
                    collection = work.collection,
                    format = subtitleFile.extension.uppercase()
                ).insertAndGetStatus()
            }
            val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS))
            if (!stored) {
                produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log")
                logger.error { "Failed to store ${File(work.outFile).absolutePath} subtitle" }
            } else {
                produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, data.value().referenceId)
                logger.info { "Stored ${File(work.outFile).absolutePath} subtitle" }
            }
        }
    }

    init {
        // Begin listening immediately on construction.
        listener.listen()
    }

    override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
        return DeserializerRegistry.getEventToDeserializer(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE)
    }
}

View File

@@ -0,0 +1,109 @@
package no.iktdev.streamit.content.reader.collector
import mu.KotlinLogging
import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.DefaultKafkaReader
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
import no.iktdev.streamit.content.common.dto.reader.work.ConvertWork
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
import no.iktdev.streamit.library.db.query.SubtitleQuery
import no.iktdev.streamit.library.kafka.KafkaEvents
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.stereotype.Service
import java.io.File
private val logger = KotlinLogging.logger {}

/**
 * Unified collector for subtitle results from both the encoder (extract) and
 * the converter pipelines. Persists each subtitle to the database and emits
 * EVENT_COLLECTOR_SUBTITLE_STORED with the outcome.
 */
@Service
class SubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") {
    private val listener = object : SimpleMessageListener(
        topic = CommonConfig.kafkaTopic,
        consumer = defaultConsumer,
        accepts = listOf(
            KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event,
            KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event
        )
    ) {
        override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
            // BUG FIX: the two consumers this class replaced rejected non-successful
            // messages before persisting; without this guard a failed work message
            // that still carries a payload would be stored as a subtitle.
            if (!data.value().isSuccessful()) {
                logger.info { "Event: ${data.key()} is not ${StatusType.SUCCESS.name}" }
                return
            }
            val referenceId = data.value().referenceId
            when (data.key()) {
                KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event -> {
                    val work = data.value().dataAs(ExtractWork::class.java)
                    if (work == null) {
                        logger.info { "Event: ${data.key()} value is null" }
                    } else {
                        storeExtractWork(referenceId, work)
                    }
                }
                KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event -> {
                    val work = data.value().dataAs(ConvertWork::class.java)
                    if (work == null) {
                        logger.info { "Event: ${data.key()} value is null" }
                    } else {
                        storeConvertWork(referenceId, work)
                    }
                }
                else -> logger.warn { "Event: ${data.key()} is not captured" }
            }
        }
    }

    init {
        // Start consuming as soon as the Spring bean is constructed.
        listener.listen()
    }

    /**
     * Emits the collector result: a success message when [statusType] is SUCCESS,
     * otherwise an error message carrying [result] as diagnostic payload.
     */
    fun produceMessage(referenceId: String, outFile: String, statusType: StatusType, result: Any?) {
        if (statusType == StatusType.SUCCESS) {
            produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, referenceId)
            logger.info { "Stored ${File(outFile).absolutePath} subtitle" }
        } else {
            produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, Message(referenceId, Status(statusType), result), "See log")
            logger.error { "Failed to store ${File(outFile).absolutePath} subtitle" }
        }
    }

    /** Persists a subtitle produced by the encoder's extraction step. */
    fun storeExtractWork(referenceId: String, work: ExtractWork) {
        val of = File(work.outFile)
        val status = transaction {
            SubtitleQuery(
                title = of.nameWithoutExtension,
                language = work.language,
                collection = work.collection,
                format = of.extension.uppercase()
            ).insertAndGetStatus()
        }
        produceMessage(referenceId, work.outFile, if (status) StatusType.SUCCESS else StatusType.ERROR, "Store Extracted: $status")
    }

    /** Persists a subtitle produced by the converter. */
    fun storeConvertWork(referenceId: String, work: ConvertWork) {
        val of = File(work.outFile)
        val status = transaction {
            SubtitleQuery(
                title = of.nameWithoutExtension,
                language = work.language,
                collection = work.collection,
                format = of.extension.uppercase()
            ).insertAndGetStatus()
        }
        produceMessage(referenceId, work.outFile, if (status) StatusType.SUCCESS else StatusType.ERROR, "Store Converted: $status")
    }

    override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
        return DeserializerRegistry.getEventToDeserializer(
            KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE,
            KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE
        )
    }
}

View File

@ -6,11 +6,10 @@ import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.DefaultKafkaReader
import no.iktdev.streamit.content.common.Downloader import no.iktdev.streamit.content.common.Downloader
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
import no.iktdev.streamit.content.common.dto.Metadata
import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo
import no.iktdev.streamit.library.db.query.* import no.iktdev.streamit.library.db.query.*
import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.KafkaEvents
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener
import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
@ -21,7 +20,7 @@ import java.io.File
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@Service @Service
class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), ICollectedMessagesEvent<ResultCollection> { class VideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), ICollectedMessagesEvent<ResultCollection> {
val listener = CollectorMessageListener<ResultCollection>( val listener = CollectorMessageListener<ResultCollection>(
topic = CommonConfig.kafkaTopic, topic = CommonConfig.kafkaTopic,
@ -43,9 +42,6 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"),
} }
override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> { override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
return DeserializerRegistry.getEventToDeserializer(*listener.acceptsFilter.toTypedArray(), listener.initiatorEvent, listener.completionEvent) return DeserializerRegistry.getEventToDeserializer(*listener.acceptsFilter.toTypedArray(), listener.initiatorEvent, listener.completionEvent)
} }
@ -62,23 +58,17 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"),
logger.error { "Required data is null, as it has either status as non successful or simply missing" } logger.error { "Required data is null, as it has either status as non successful or simply missing" }
return return
} }
val videoFileNameWithExtension = File(encodeWork.outFile).name val videoFileNameWithExtension = File(encodeWork.outFile).name
val iid = transaction {
val contentType = metadata?.type ?: return val serieStatus = if (serieData != null) {
val iid = if (contentType == "movie") transaction { getSerieQueryInstance(serieData, videoFileNameWithExtension)?.insertAndGetStatus() ?: false
} else true
if (serieData == null || metadata?.type == "movie") {
MovieQuery(videoFileNameWithExtension).insertAndGetId() MovieQuery(videoFileNameWithExtension).insertAndGetId()
} else null } else null
}
if (serieData != null) {
val success = transaction {
SerieQuery(serieData.title, serieData.episode, serieData.season, fileData.title, videoFileNameWithExtension)
.insertAndGetStatus()
}
if (!success)
return
}
val coverFile = metadata?.cover?.let { coverUrl -> val coverFile = metadata?.cover?.let { coverUrl ->
runBlocking { runBlocking {
@ -90,34 +80,50 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"),
} }
} }
} }
val metaGenre = metadata.genres
val gq = GenreQuery(*metaGenre.toTypedArray()) // Serie må alltid fullføres før catalog. dette i tilfelle catalog allerede eksisterer og den thrower slik at transaskjonen blir versertert!
val status = try {
transaction { transaction {
gq.insertAndGetIds() val genres = metadata?.let { insertAndGetGenres(it) }
}
val gids = transaction { gq.getIds().joinToString(",") }
val cq = CatalogQuery( val cq = CatalogQuery(
title = fileData.title, title = fileData.title,
cover = coverFile?.name, cover = coverFile?.name,
type = contentType, type = if (serieData == null) "movie" else "serie",
collection = fileData.title, collection = fileData.title,
iid = iid, iid = iid,
genres = gids genres = genres
)
val cid = transaction { cq.insertAndGetId() ?: cq.getId() } ?: return
if (!metadata.summary.isNullOrBlank()) {
val summary = metadata.summary ?: return
transaction {
SummaryQuery(
cid = cid,
language = "eng", // TODO: Fix later,
description = summary
) )
cq.insert()
val cqId = cq.getId() ?: throw RuntimeException("No Catalog id found!")
metadata?.let {
val summary = it.summary
if (summary != null) {
SummaryQuery(cid = cqId, language = "eng", description = summary)
} }
} }
}
} catch (e: Exception) {
e.printStackTrace()
}
produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, collection.getReferenceId() ?: "M.I.A", null) produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, collection.getReferenceId() ?: "M.I.A", status)
logger.info { "Stored ${encodeWork.outFile} video" } logger.info { "Stored ${encodeWork.outFile} video" }
} }
/**
* Needs to be wrapped in transaction
*/
fun insertAndGetGenres(meta: Metadata): String? {
val gq = GenreQuery(*meta.genres.toTypedArray())
gq.insertAndGetIds()
return gq.getIds().joinToString(",")
}
fun getSerieQueryInstance(data: EpisodeInfo?, baseName: String?): SerieQuery? {
if (data == null || baseName == null) return null
return SerieQuery(data.title, data.episode, data.season, data.title, baseName)
}
} }