diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f29abade..c468bbfe 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -17,6 +17,7 @@ jobs: commonCode: ${{ steps.filter.outputs.commonCode }} reader: ${{ steps.filter.outputs.reader }} encode: ${{ steps.filter.outputs.encode }} + convert: ${{ steps.filter.outputs.convert }} steps: - name: Checkout repository @@ -32,6 +33,8 @@ jobs: - 'Reader/**' encode: - 'Encode/**' + convert: + - 'Convert/**' commonCode: - 'CommonCode/**' # Step to print the outputs from "pre-check" job @@ -41,6 +44,7 @@ jobs: echo "commonCode: ${{ needs.pre-check.outputs.commonCode }}" echo "reader: ${{ needs.pre-check.outputs.reader }}" echo "encode: ${{ needs.pre-check.outputs.encode }}" + echo "convert: ${{ needs.pre-check.outputs.convert }}" build-commoncode: runs-on: ubuntu-latest @@ -196,3 +200,51 @@ jobs: bskjon/mediaprocessing-pymetadata:latest bskjon/mediaprocessing-pymetadata:${{ github.sha }} bskjon/mediaprocessing-pymetadata:${{ steps.docker-tag.outputs.tag }} + + + build-convert: + needs: build-commoncode + if: ${{ needs.pre-check.outputs.convert == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.commonCode == 'true' }} + runs-on: ubuntu-latest + #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }} + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Cache CommonCode Gradle dependencies + id: cache-gradle + uses: actions/cache@v2 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('CommonCode/gradle/wrapper/gradle-wrapper.properties') }} + + + - name: Build Convert module + id: build-convert + run: | + cd Convert + chmod +x ./gradlew + ./gradlew build + echo "Build completed" + + + - name: Generate Docker image tag + id: docker-tag + run: echo "::set-output name=tag::$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" + + - name: Docker login + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{ secrets.DOCKER_HUB_NAME }} + password: ${{ secrets.DOCKER_HUB_TOKEN }} + + - name: Build and push Docker image + uses: docker/build-push-action@v2 + with: + context: ./Encode + push: true + tags: | + bskjon/mediaprocessing-converter:latest + bskjon/mediaprocessing-converter:${{ github.sha }} + bskjon/mediaprocessing-converter:${{ steps.docker-tag.outputs.tag }} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt index 3cf0730a..e4d41c4b 100644 --- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt @@ -13,7 +13,9 @@ abstract class DefaultKafkaReader(val subId: String) { val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) val defaultConsumer = DefaultConsumer(subId = subId) - abstract fun loadDeserializers(): Map> + open fun loadDeserializers(): Map> { + return emptyMap() + } fun produceErrorMessage(event: KafkaEvents, baseMessage: Message, reason: String) { val message = Message( diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleStreamSelector.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleStreamSelector.kt index d3d870d8..1c362194 100644 --- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleStreamSelector.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleStreamSelector.kt @@ -2,10 +2,35 @@ package no.iktdev.streamit.content.common.streams class SubtitleStreamSelector(val streams: List) { + fun getCandidateForConversion(): List { + val languageGrouped = getDesiredStreams().groupBy { it.tags.language ?: "eng" } + val priority = listOf("subrip", "srt", "webvtt", "vtt", "ass") + + val result = mutableListOf() + for ((language, streams) in languageGrouped) { + val selectedStream = streams.firstOrNull { it.codec_name in priority } + if (selectedStream != null) { + result.add(selectedStream) + } + } + return result + } + fun getDesiredStreams(): List { + val desiredTypes = listOf(SubtitleType.DEFAULT, SubtitleType.CC, SubtitleType.SHD) + val typeGuesser = SubtitleTypeGuesser() val codecFiltered = streams.filter { getFormatToCodec(it.codec_name) != null } - // TODO: Expand and remove stuff like sign and songs etc.. - return codecFiltered + + val mappedToType = codecFiltered.map { typeGuesser.guessType(it) to it }.filter { it.first in desiredTypes } + .groupBy { it.second.tags.language ?: "eng" } + .mapValues { entry -> + val languageStreams = entry.value + val sortedStreams = languageStreams.sortedBy { desiredTypes.indexOf(it.first) } + sortedStreams.firstOrNull()?.second + }.mapNotNull { it.value } + + + return mappedToType } diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleTypeGuesser.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleTypeGuesser.kt new file mode 100644 index 00000000..595a48c0 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleTypeGuesser.kt @@ -0,0 +1,56 @@ +package no.iktdev.streamit.content.common.streams + +/** + * @property SHD is Hard of hearing + * @property CC is Closed-Captions + * @property NON_DIALOGUE is for Signs or Song (as in lyrics) + * @property DEFAULT is default subtitle as dialog + */ +enum class SubtitleType { + SHD, + CC, + NON_DIALOGUE, + DEFAULT +} + +class SubtitleTypeGuesser { + fun guessType(subtitle: SubtitleStream): SubtitleType { + if (subtitle.tags != null && subtitle.tags.title?.isBlank() == false) { + val title = subtitle.tags.title!! + if (title.lowercase().contains("song") + || title.lowercase().contains("songs") + || title.lowercase().contains("sign") + || title.lowercase().contains("signs") + ) { + return SubtitleType.NON_DIALOGUE + } + if (getSubtitleType(title, listOf("cc", "closed caption"), + SubtitleType.CC + ) == SubtitleType.CC + ) return SubtitleType.CC + if (getSubtitleType(title, listOf("shd", "hh", "Hard-of-Hearing", "Hard of Hearing"), + SubtitleType.SHD + ) == SubtitleType.SHD + ) return SubtitleType.SHD + } + + return SubtitleType.DEFAULT + } + + private fun getSubtitleType(title: String, keys: List, expected: SubtitleType): SubtitleType { + val bracedText = Regex.fromLiteral("[(](?<=\\().*?(?=\\))[)]").find(title) + val brakedText = Regex.fromLiteral("[(](?<=\\().*?(?=\\))[)]").find(title) + + if (bracedText == null || brakedText == null) + return SubtitleType.DEFAULT + + var text = bracedText.value.ifBlank { brakedText.value } + text = Regex.fromLiteral("[\\[\\]()-.,_+]").replace(text, "") + + return if (keys.find { item -> + item.lowercase().contains(text.lowercase()) || text.lowercase().contains(item.lowercase()) + }.isNullOrEmpty()) SubtitleType.DEFAULT else expected + + } +} + diff --git a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertRunner.kt b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertRunner.kt new file mode 100644 index 00000000..042c4e12 --- /dev/null +++ b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertRunner.kt @@ -0,0 +1,47 @@ +package no.iktdev.streamit.content.convert + +import no.iktdev.library.subtitle.Syncro +import no.iktdev.library.subtitle.export.Export +import no.iktdev.library.subtitle.reader.BaseReader +import no.iktdev.library.subtitle.reader.Reader +import no.iktdev.streamit.content.common.dto.reader.work.ConvertWork +import java.io.File + +class ConvertRunner(val referenceId: String, val listener: IConvertListener) { + + private fun getReade(inputFile: File): BaseReader? { + return Reader(inputFile).getSubtitleReader() + } + + suspend fun readAndConvert (subtitleInfo: SubtitleInfo) { + val reader = getReade(subtitleInfo.inputFile) + val dialogs = reader?.read() + if (dialogs.isNullOrEmpty()) { + listener.onError(referenceId, subtitleInfo, "Dialogs read from file is null or empty!") + return + } + listener.onStarted(referenceId, subtitleInfo) + + val syncedDialogs = Syncro().sync(dialogs) + + val converted = Export(subtitleInfo.inputFile, syncedDialogs).write() + converted.forEach { + val item = ConvertWork( + inFile = subtitleInfo.inputFile.absolutePath, + collection = subtitleInfo.collection, + language = subtitleInfo.language, + outFile = it.absolutePath + ) + listener.onEnded(referenceId, subtitleInfo, work = item) + } + + } + + +} + +interface IConvertListener { + fun onStarted(referenceId: String, info: SubtitleInfo) + fun onError(referenceId: String, info: SubtitleInfo, message: String) + fun onEnded(referenceId: String, info: SubtitleInfo, work: ConvertWork) +} \ No newline at end of file diff --git a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/SubtitleInfo.kt b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/SubtitleInfo.kt new file mode 100644 index 00000000..aa411491 --- /dev/null +++ b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/SubtitleInfo.kt @@ -0,0 +1,9 @@ +package no.iktdev.streamit.content.convert + +import java.io.File + +data class SubtitleInfo( + val inputFile: File, + val collection: String, + val language: String +) \ No newline at end of file diff --git a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt index 8cd7a9d6..6bdd7055 100644 --- a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt +++ b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt @@ -1,19 +1,67 @@ package no.iktdev.streamit.content.convert.kafka +import kotlinx.coroutines.launch +import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.dto.reader.work.ConvertWork +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.content.convert.ConvertRunner +import no.iktdev.streamit.content.convert.IConvertListener +import no.iktdev.streamit.content.convert.SubtitleInfo +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.Status +import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener -import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.stereotype.Service +import java.io.File + +private val logger = KotlinLogging.logger {} @Service -class SubtitleConsumer: DefaultKafkaReader("convertHandler") { +class SubtitleConsumer: DefaultKafkaReader("convertHandlerSubtitle"), IConvertListener { - /*init { - object: SimpleMessageListener(topic =b ) - }*/ + private final val listener = object : SimpleMessageListener( + topic = CommonConfig.kafkaTopic, + consumer = defaultConsumer, + accepts = listOf(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event) + ) { + override fun onMessageReceived(data: ConsumerRecord) { + val referenceId = data.value().referenceId + val workResult = data.value().dataAs(ExtractWork::class.java) - override fun loadDeserializers(): Map> { - TODO("Not yet implemented") + if (workResult?.produceConvertEvent == true) { + val convertWork = SubtitleInfo( + inputFile = File(workResult.outFile), + collection = workResult.collection, + language = workResult.language, + ) + Coroutines.io().launch { + ConvertRunner(referenceId, this@SubtitleConsumer).readAndConvert(convertWork) + } + } else { + logger.info { "Skipping ${data.value().referenceId} ${workResult?.outFile} as it is not a convert candidate" } + } + } + } + + init { + listener.listen() + } + + override fun onStarted(referenceId: String, info: SubtitleInfo) { + produceMessage(KafkaEvents.EVENT_CONVERTER_STARTED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), info) + } + + override fun onError(referenceId: String, info: SubtitleInfo, message: String) { + produceMessage(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.ERROR)), null) + } + + override fun onEnded(referenceId: String, info: SubtitleInfo, work: ConvertWork) { + produceMessage(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), work) } } \ No newline at end of file diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index ba51f4e4..44ad7a41 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -23,7 +23,7 @@ repositories { } dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha72") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha74") implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha7") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/VideoEncodeArguments.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/VideoEncodeArguments.kt index bad55ab8..a216eb9a 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/VideoEncodeArguments.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/VideoEncodeArguments.kt @@ -5,7 +5,7 @@ import no.iktdev.streamit.content.reader.preference class VideoEncodeArguments(val video: VideoStream, val index: Int) { - fun isVideoCodecEqual() = video.codec_name == getCorrectCodec() + fun isVideoCodecEqual() = getCodec(video.codec_name) == getCodec(preference.video.codec.lowercase()) fun getVideoArguments(): List { @@ -13,7 +13,7 @@ class VideoEncodeArguments(val video: VideoStream, val index: Int) { if (isVideoCodecEqual()) result.addAll(listOf( "-vcodec", "copy" )) else { - result.addAll(listOf("-c:v", getCorrectCodec())) + result.addAll(listOf("-c:v", getCodec(preference.video.codec.lowercase()))) result.addAll(listOf("-crf", preference.video.threshold.toString())) } if (preference.video.pixelFormatPassthrough.none { it == video.pix_fmt }) { @@ -24,16 +24,13 @@ class VideoEncodeArguments(val video: VideoStream, val index: Int) { } - protected fun getCorrectCodec(): String { - return when(preference.video.codec.lowercase()) { - "hevc" -> "libx265" - "h265" -> "libx265" - "h.265" -> "libx265" - - "h.264" -> "libx264" - "h264" -> "libx264" - - else -> preference.video.codec.lowercase() + protected fun getCodec(name: String): String { + return when(name) { + "hevc", "hevec", "h265", "h.265", "libx265" + -> "libx265" + "h.264", "h264", "libx264" + -> "libx264" + else -> name } } } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/helpers/EncodeArgumentSelector.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/helpers/EncodeArgumentSelector.kt index 99a46592..d215dd59 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/helpers/EncodeArgumentSelector.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/helpers/EncodeArgumentSelector.kt @@ -66,6 +66,8 @@ class EncodeArgumentSelector(val collection: String, val inputFile: String, val val availableSubtitleStreams = streams.streams.filterIsInstance() val subtitleStreams = SubtitleStreamSelector(availableSubtitleStreams) + val conversionCandidates = subtitleStreams.getCandidateForConversion() + return subtitleStreams.getDesiredStreams().map { val args = SubtitleEncodeArguments(it, availableSubtitleStreams.indexOf(it)) val language = it.tags.language ?: "eng" @@ -78,6 +80,7 @@ class EncodeArgumentSelector(val collection: String, val inputFile: String, val inFile = inputFile, outFile = outFile.absolutePath, arguments = args.getSubtitleArguments(), + produceConvertEvent = conversionCandidates.contains(it) ) } diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt new file mode 100644 index 00000000..7eb5181f --- /dev/null +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt @@ -0,0 +1,45 @@ +package no.iktdev.streamit.content.reader.collector + +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.library.db.query.SubtitleQuery +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import org.apache.kafka.clients.consumer.ConsumerRecord +import java.io.File + +class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConvertedSubtitle") { + + private val listener = object: SimpleMessageListener( + topic = CommonConfig.kafkaTopic, + consumer = defaultConsumer, + accepts = listOf(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event) + ) { + override fun onMessageReceived(data: ConsumerRecord) { + val workResult = data.value().dataAs(ExtractWork::class.java) + if (!data.value().isSuccessful() || workResult == null) { + return + } + + val of = File(workResult.outFile) + SubtitleQuery( + title = of.nameWithoutExtension, + language = workResult.language, + collection = workResult.collection, + format = of.extension.uppercase() + ).insertAndGetStatus() + } + } + + init { + listener.listen() + } + + override fun loadDeserializers(): Map> { + return DeserializerRegistry.getEventToDeserializer(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE) + } +} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt index 54feb555..95525048 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt @@ -1,73 +1,73 @@ package no.iktdev.streamit.content.reader.collector import kotlinx.coroutines.runBlocking +import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.DefaultKafkaReader import no.iktdev.streamit.content.common.Downloader -import no.iktdev.streamit.content.common.SequentialKafkaReader import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry -import no.iktdev.streamit.content.common.dto.Metadata -import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo -import no.iktdev.streamit.content.common.dto.reader.FileResult -import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork import no.iktdev.streamit.library.db.query.* -import no.iktdev.streamit.library.db.tables.catalog import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener +import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization -import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener import org.springframework.stereotype.Service import java.io.File +private val logger = KotlinLogging.logger {} + @Service -class EncodedVideoConsumer: SequentialKafkaReader("collectorConsumerVideo") { - override val accept: KafkaEvents - get() = KafkaEvents.EVENT_READER_RECEIVED_FILE - override val subAccepts: List - get() = listOf( +class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), ICollectedMessagesEvent { + + val listener = CollectorMessageListener( + topic = CommonConfig.kafkaTopic, + consumer = defaultConsumer, + initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE, + completionEvent = KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE, + acceptsFilter = listOf( KafkaEvents.EVENT_METADATA_OBTAINED, KafkaEvents.EVENT_READER_DETERMINED_SERIE, KafkaEvents.EVENT_READER_DETERMINED_MOVIE, - KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE - ) + ), + listener = this, + eventCollectionClass = ResultCollection::class.java + ) - val listener = object: SequentialMessageListener( - topic = CommonConfig.kafkaTopic, - consumer = defaultConsumer, - accept = accept.event, - subAccepts = subAccepts.map { it.event }, - deserializers = loadDeserializers(), - validity = 86400000, - listener =this - ) {} - init { listener.listen() } - override fun getRequiredMessages(): List { - return listOf(accept.event) + subAccepts.map { it.event } + + + + override fun loadDeserializers(): Map> { + return DeserializerRegistry.getEventToDeserializer(*listener.acceptsFilter.toTypedArray(), listener.initiatorEvent, listener.completionEvent) } - override fun onAllMessagesProcessed(referenceId: String, result: Map) { - val metadata = result[KafkaEvents.EVENT_METADATA_OBTAINED.event]?.data as Metadata? - val fileData = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event]?.data as FileResult? - val encodeStatus = result[KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event]?.status?.statusType ?: StatusType.ERROR - val encodeWork = result[KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event]?.data as EncodeWork? + override fun onCollectionCompleted(collection: ResultCollection?) { + val metadata = collection?.getMetadata() + val fileData = collection?.getFileResult() + val encodeWork = collection?.getEncodeWork() + val serieData = collection?.getSerieInfo() + val movieData = collection?.getMovieInfo() - if (fileData == null || encodeStatus != StatusType.SUCCESS || encodeWork == null) { + if (fileData == null || encodeWork == null || collection.getReferenceId() == null) { + logger.error { "Required data is null, as it has either status as non successful or simply missing" } return } val videoFileNameWithExtension = File(encodeWork.outFile).name + val contentType = metadata?.type ?: return val iid = if (contentType == "movie") MovieQuery(videoFileNameWithExtension).insertAndGetId() else null - val serieData = result[KafkaEvents.EVENT_READER_DETERMINED_SERIE.event]?.data as EpisodeInfo? if (serieData != null) { val success = SerieQuery(serieData.title, serieData.episode, serieData.season, fileData.title, videoFileNameWithExtension).insertAndGetStatus() if (!success) @@ -106,11 +106,9 @@ class EncodedVideoConsumer: SequentialKafkaReader("collectorConsumerVideo") { description = summary ) } - } - - - override fun loadDeserializers(): Map> { - return DeserializerRegistry.getEventToDeserializer(*subAccepts.toTypedArray()) + val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(StatusType.SUCCESS)) + produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) + logger.info { "Stored ${metadata.title} video" } } } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt index 5468abce..efe948a0 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt @@ -1,27 +1,18 @@ package no.iktdev.streamit.content.reader.collector -import kotlinx.coroutines.runBlocking import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.DefaultKafkaReader -import no.iktdev.streamit.content.common.Downloader -import no.iktdev.streamit.content.common.SequentialKafkaReader import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry -import no.iktdev.streamit.content.common.dto.Metadata - -import no.iktdev.streamit.content.common.dto.reader.FileResult -import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork -import no.iktdev.streamit.library.db.query.* +import no.iktdev.streamit.library.db.query.SubtitleQuery import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.dto.Message -import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization -import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener import org.apache.kafka.clients.consumer.ConsumerRecord import java.io.File -class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerSubtitle") { +class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") { private val listener = object: SimpleMessageListener( topic = CommonConfig.kafkaTopic, diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ResultCollection.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ResultCollection.kt new file mode 100644 index 00000000..a2c37255 --- /dev/null +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ResultCollection.kt @@ -0,0 +1,72 @@ +package no.iktdev.streamit.content.reader.collector + +import no.iktdev.streamit.content.common.deserializers.* +import no.iktdev.streamit.content.common.dto.ContentOutName +import no.iktdev.streamit.content.common.dto.Metadata +import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo +import no.iktdev.streamit.content.common.dto.reader.FileResult +import no.iktdev.streamit.content.common.dto.reader.MovieInfo +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.collector.DefaultEventCollection +import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful +import org.apache.kafka.clients.consumer.ConsumerRecord + +class ResultCollection: DefaultEventCollection() { + + fun getFirstOrNull(events: KafkaEvents): ConsumerRecord? { + return getRecords().firstOrNull { it.key() == events.event } + } + + fun getReferenceId(): String? { + return getRecords().firstOrNull()?.value()?.referenceId + } + + /** + * @see KafkaEvents.EVENT_READER_RECEIVED_FILE + * @see FileResult for data structure + */ + fun getFileResult(): FileResult? { + val record = getRecords().firstOrNull { it.key() == KafkaEvents.EVENT_READER_RECEIVED_FILE.event } ?: return null + return FileResultDeserializer().deserializeIfSuccessful(record.value()) + } + + /** + * @see KafkaEvents.EVENT_READER_DETERMINED_FILENAME + * @see ContentOutName for data structure + */ + fun getFileName(): ContentOutName? { + val record = getFirstOrNull(KafkaEvents.EVENT_READER_DETERMINED_FILENAME) ?: return null + return ContentOutNameDeserializer().deserializeIfSuccessful(record.value()) + } + + /** + * @see KafkaEvents.EVENT_METADATA_OBTAINED and + * @see Metadata for datastructure + */ + fun getMetadata(): Metadata? { + return firstOrNull(KafkaEvents.EVENT_METADATA_OBTAINED)?.let { + MetadataResultDeserializer().deserializeIfSuccessful(it.value()) + } + } + + fun getMovieInfo(): MovieInfo? { + return firstOrNull(KafkaEvents.EVENT_READER_DETERMINED_MOVIE)?.let { + MovieInfoDeserializer().deserializeIfSuccessful(it.value()) + } + } + + fun getSerieInfo(): EpisodeInfo? { + return firstOrNull(KafkaEvents.EVENT_READER_DETERMINED_SERIE)?.let { + EpisodeInfoDeserializer().deserializeIfSuccessful(it.value()) + } + } + + fun getEncodeWork(): EncodeWork? { + return firstOrNull(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE)?.let { + EncodeWorkDeserializer().deserializeIfSuccessful(it.value()) + } + } + +} \ No newline at end of file