Added Convert
This commit is contained in:
parent
3e8d71c682
commit
2f18e7bd31
52
.github/workflows/main.yml
vendored
52
.github/workflows/main.yml
vendored
@ -17,6 +17,7 @@ jobs:
|
||||
commonCode: ${{ steps.filter.outputs.commonCode }}
|
||||
reader: ${{ steps.filter.outputs.reader }}
|
||||
encode: ${{ steps.filter.outputs.encode }}
|
||||
convert: ${{ steps.filter.outputs.convert }}
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
@ -32,6 +33,8 @@ jobs:
|
||||
- 'Reader/**'
|
||||
encode:
|
||||
- 'Encode/**'
|
||||
convert:
|
||||
- 'Convert/**'
|
||||
commonCode:
|
||||
- 'CommonCode/**'
|
||||
# Step to print the outputs from "pre-check" job
|
||||
@ -41,6 +44,7 @@ jobs:
|
||||
echo "commonCode: ${{ needs.pre-check.outputs.commonCode }}"
|
||||
echo "reader: ${{ needs.pre-check.outputs.reader }}"
|
||||
echo "encode: ${{ needs.pre-check.outputs.encode }}"
|
||||
echo "convert: ${{ needs.pre-check.outputs.convert }}"
|
||||
|
||||
build-commoncode:
|
||||
runs-on: ubuntu-latest
|
||||
@ -196,3 +200,51 @@ jobs:
|
||||
bskjon/mediaprocessing-pymetadata:latest
|
||||
bskjon/mediaprocessing-pymetadata:${{ github.sha }}
|
||||
bskjon/mediaprocessing-pymetadata:${{ steps.docker-tag.outputs.tag }}
|
||||
|
||||
|
||||
build-convert:
|
||||
needs: build-commoncode
|
||||
if: ${{ needs.pre-check.outputs.convert == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.commonCode == 'true' }}
|
||||
runs-on: ubuntu-latest
|
||||
#if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }}
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Cache CommonCode Gradle dependencies
|
||||
id: cache-gradle
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ~/.gradle/caches
|
||||
key: ${{ runner.os }}-gradle-${{ hashFiles('CommonCode/gradle/wrapper/gradle-wrapper.properties') }}
|
||||
|
||||
|
||||
- name: Build Convert module
|
||||
id: build-convert
|
||||
run: |
|
||||
cd Convert
|
||||
chmod +x ./gradlew
|
||||
./gradlew build
|
||||
echo "Build completed"
|
||||
|
||||
|
||||
- name: Generate Docker image tag
|
||||
id: docker-tag
|
||||
run: echo "::set-output name=tag::$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)"
|
||||
|
||||
- name: Docker login
|
||||
uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_HUB_NAME }}
|
||||
password: ${{ secrets.DOCKER_HUB_TOKEN }}
|
||||
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v2
|
||||
with:
|
||||
context: ./Encode
|
||||
push: true
|
||||
tags: |
|
||||
bskjon/mediaprocessing-converter:latest
|
||||
bskjon/mediaprocessing-converter:${{ github.sha }}
|
||||
bskjon/mediaprocessing-converter:${{ steps.docker-tag.outputs.tag }}
|
||||
@ -13,7 +13,9 @@ abstract class DefaultKafkaReader(val subId: String) {
|
||||
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
|
||||
val defaultConsumer = DefaultConsumer(subId = subId)
|
||||
|
||||
abstract fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>>
|
||||
open fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
|
||||
return emptyMap()
|
||||
}
|
||||
|
||||
fun produceErrorMessage(event: KafkaEvents, baseMessage: Message, reason: String) {
|
||||
val message = Message(
|
||||
|
||||
@ -2,10 +2,35 @@ package no.iktdev.streamit.content.common.streams
|
||||
|
||||
class SubtitleStreamSelector(val streams: List<SubtitleStream>) {
|
||||
|
||||
fun getCandidateForConversion(): List<SubtitleStream> {
|
||||
val languageGrouped = getDesiredStreams().groupBy { it.tags.language ?: "eng" }
|
||||
val priority = listOf("subrip", "srt", "webvtt", "vtt", "ass")
|
||||
|
||||
val result = mutableListOf<SubtitleStream>()
|
||||
for ((language, streams) in languageGrouped) {
|
||||
val selectedStream = streams.firstOrNull { it.codec_name in priority }
|
||||
if (selectedStream != null) {
|
||||
result.add(selectedStream)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
fun getDesiredStreams(): List<SubtitleStream> {
|
||||
val desiredTypes = listOf(SubtitleType.DEFAULT, SubtitleType.CC, SubtitleType.SHD)
|
||||
val typeGuesser = SubtitleTypeGuesser()
|
||||
val codecFiltered = streams.filter { getFormatToCodec(it.codec_name) != null }
|
||||
// TODO: Expand and remove stuff like sign and songs etc..
|
||||
return codecFiltered
|
||||
|
||||
val mappedToType = codecFiltered.map { typeGuesser.guessType(it) to it }.filter { it.first in desiredTypes }
|
||||
.groupBy { it.second.tags.language ?: "eng" }
|
||||
.mapValues { entry ->
|
||||
val languageStreams = entry.value
|
||||
val sortedStreams = languageStreams.sortedBy { desiredTypes.indexOf(it.first) }
|
||||
sortedStreams.firstOrNull()?.second
|
||||
}.mapNotNull { it.value }
|
||||
|
||||
|
||||
return mappedToType
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,56 @@
|
||||
package no.iktdev.streamit.content.common.streams
|
||||
|
||||
/**
|
||||
* @property SHD is Hard of hearing
|
||||
* @property CC is Closed-Captions
|
||||
* @property NON_DIALOGUE is for Signs or Song (as in lyrics)
|
||||
* @property DEFAULT is default subtitle as dialog
|
||||
*/
|
||||
enum class SubtitleType {
|
||||
SHD,
|
||||
CC,
|
||||
NON_DIALOGUE,
|
||||
DEFAULT
|
||||
}
|
||||
|
||||
class SubtitleTypeGuesser {
|
||||
fun guessType(subtitle: SubtitleStream): SubtitleType {
|
||||
if (subtitle.tags != null && subtitle.tags.title?.isBlank() == false) {
|
||||
val title = subtitle.tags.title!!
|
||||
if (title.lowercase().contains("song")
|
||||
|| title.lowercase().contains("songs")
|
||||
|| title.lowercase().contains("sign")
|
||||
|| title.lowercase().contains("signs")
|
||||
) {
|
||||
return SubtitleType.NON_DIALOGUE
|
||||
}
|
||||
if (getSubtitleType(title, listOf("cc", "closed caption"),
|
||||
SubtitleType.CC
|
||||
) == SubtitleType.CC
|
||||
) return SubtitleType.CC
|
||||
if (getSubtitleType(title, listOf("shd", "hh", "Hard-of-Hearing", "Hard of Hearing"),
|
||||
SubtitleType.SHD
|
||||
) == SubtitleType.SHD
|
||||
) return SubtitleType.SHD
|
||||
}
|
||||
|
||||
return SubtitleType.DEFAULT
|
||||
}
|
||||
|
||||
private fun getSubtitleType(title: String, keys: List<String>, expected: SubtitleType): SubtitleType {
|
||||
val bracedText = Regex.fromLiteral("[(](?<=\\().*?(?=\\))[)]").find(title)
|
||||
val brakedText = Regex.fromLiteral("[(](?<=\\().*?(?=\\))[)]").find(title)
|
||||
|
||||
if (bracedText == null || brakedText == null)
|
||||
return SubtitleType.DEFAULT
|
||||
|
||||
var text = bracedText.value.ifBlank { brakedText.value }
|
||||
text = Regex.fromLiteral("[\\[\\]()-.,_+]").replace(text, "")
|
||||
|
||||
return if (keys.find { item ->
|
||||
item.lowercase().contains(text.lowercase()) || text.lowercase().contains(item.lowercase())
|
||||
}.isNullOrEmpty()) SubtitleType.DEFAULT else expected
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,47 @@
|
||||
package no.iktdev.streamit.content.convert
|
||||
|
||||
import no.iktdev.library.subtitle.Syncro
|
||||
import no.iktdev.library.subtitle.export.Export
|
||||
import no.iktdev.library.subtitle.reader.BaseReader
|
||||
import no.iktdev.library.subtitle.reader.Reader
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.ConvertWork
|
||||
import java.io.File
|
||||
|
||||
class ConvertRunner(val referenceId: String, val listener: IConvertListener) {
|
||||
|
||||
private fun getReade(inputFile: File): BaseReader? {
|
||||
return Reader(inputFile).getSubtitleReader()
|
||||
}
|
||||
|
||||
suspend fun readAndConvert (subtitleInfo: SubtitleInfo) {
|
||||
val reader = getReade(subtitleInfo.inputFile)
|
||||
val dialogs = reader?.read()
|
||||
if (dialogs.isNullOrEmpty()) {
|
||||
listener.onError(referenceId, subtitleInfo, "Dialogs read from file is null or empty!")
|
||||
return
|
||||
}
|
||||
listener.onStarted(referenceId, subtitleInfo)
|
||||
|
||||
val syncedDialogs = Syncro().sync(dialogs)
|
||||
|
||||
val converted = Export(subtitleInfo.inputFile, syncedDialogs).write()
|
||||
converted.forEach {
|
||||
val item = ConvertWork(
|
||||
inFile = subtitleInfo.inputFile.absolutePath,
|
||||
collection = subtitleInfo.collection,
|
||||
language = subtitleInfo.language,
|
||||
outFile = it.absolutePath
|
||||
)
|
||||
listener.onEnded(referenceId, subtitleInfo, work = item)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
interface IConvertListener {
|
||||
fun onStarted(referenceId: String, info: SubtitleInfo)
|
||||
fun onError(referenceId: String, info: SubtitleInfo, message: String)
|
||||
fun onEnded(referenceId: String, info: SubtitleInfo, work: ConvertWork)
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
package no.iktdev.streamit.content.convert
|
||||
|
||||
import java.io.File
|
||||
|
||||
data class SubtitleInfo(
|
||||
val inputFile: File,
|
||||
val collection: String,
|
||||
val language: String
|
||||
)
|
||||
@ -1,19 +1,67 @@
|
||||
package no.iktdev.streamit.content.convert.kafka
|
||||
|
||||
import kotlinx.coroutines.launch
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.exfl.coroutines.Coroutines
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.ConvertWork
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
|
||||
import no.iktdev.streamit.content.convert.ConvertRunner
|
||||
import no.iktdev.streamit.content.convert.IConvertListener
|
||||
import no.iktdev.streamit.content.convert.SubtitleInfo
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import no.iktdev.streamit.library.kafka.dto.Status
|
||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
|
||||
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
@Service
|
||||
class SubtitleConsumer: DefaultKafkaReader("convertHandler") {
|
||||
class SubtitleConsumer: DefaultKafkaReader("convertHandlerSubtitle"), IConvertListener {
|
||||
|
||||
/*init {
|
||||
object: SimpleMessageListener(topic =b )
|
||||
}*/
|
||||
private final val listener = object : SimpleMessageListener(
|
||||
topic = CommonConfig.kafkaTopic,
|
||||
consumer = defaultConsumer,
|
||||
accepts = listOf(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event)
|
||||
) {
|
||||
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||
val referenceId = data.value().referenceId
|
||||
val workResult = data.value().dataAs(ExtractWork::class.java)
|
||||
|
||||
override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
|
||||
TODO("Not yet implemented")
|
||||
if (workResult?.produceConvertEvent == true) {
|
||||
val convertWork = SubtitleInfo(
|
||||
inputFile = File(workResult.outFile),
|
||||
collection = workResult.collection,
|
||||
language = workResult.language,
|
||||
)
|
||||
Coroutines.io().launch {
|
||||
ConvertRunner(referenceId, this@SubtitleConsumer).readAndConvert(convertWork)
|
||||
}
|
||||
} else {
|
||||
logger.info { "Skipping ${data.value().referenceId} ${workResult?.outFile} as it is not a convert candidate" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
listener.listen()
|
||||
}
|
||||
|
||||
override fun onStarted(referenceId: String, info: SubtitleInfo) {
|
||||
produceMessage(KafkaEvents.EVENT_CONVERTER_STARTED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), info)
|
||||
}
|
||||
|
||||
override fun onError(referenceId: String, info: SubtitleInfo, message: String) {
|
||||
produceMessage(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.ERROR)), null)
|
||||
}
|
||||
|
||||
override fun onEnded(referenceId: String, info: SubtitleInfo, work: ConvertWork) {
|
||||
produceMessage(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), work)
|
||||
}
|
||||
|
||||
}
|
||||
@ -23,7 +23,7 @@ repositories {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha72")
|
||||
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha74")
|
||||
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
|
||||
|
||||
implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha7")
|
||||
|
||||
@ -5,7 +5,7 @@ import no.iktdev.streamit.content.reader.preference
|
||||
|
||||
class VideoEncodeArguments(val video: VideoStream, val index: Int) {
|
||||
|
||||
fun isVideoCodecEqual() = video.codec_name == getCorrectCodec()
|
||||
fun isVideoCodecEqual() = getCodec(video.codec_name) == getCodec(preference.video.codec.lowercase())
|
||||
|
||||
|
||||
fun getVideoArguments(): List<String> {
|
||||
@ -13,7 +13,7 @@ class VideoEncodeArguments(val video: VideoStream, val index: Int) {
|
||||
if (isVideoCodecEqual()) result.addAll(listOf(
|
||||
"-vcodec", "copy"
|
||||
)) else {
|
||||
result.addAll(listOf("-c:v", getCorrectCodec()))
|
||||
result.addAll(listOf("-c:v", getCodec(preference.video.codec.lowercase())))
|
||||
result.addAll(listOf("-crf", preference.video.threshold.toString()))
|
||||
}
|
||||
if (preference.video.pixelFormatPassthrough.none { it == video.pix_fmt }) {
|
||||
@ -24,16 +24,13 @@ class VideoEncodeArguments(val video: VideoStream, val index: Int) {
|
||||
}
|
||||
|
||||
|
||||
protected fun getCorrectCodec(): String {
|
||||
return when(preference.video.codec.lowercase()) {
|
||||
"hevc" -> "libx265"
|
||||
"h265" -> "libx265"
|
||||
"h.265" -> "libx265"
|
||||
|
||||
"h.264" -> "libx264"
|
||||
"h264" -> "libx264"
|
||||
|
||||
else -> preference.video.codec.lowercase()
|
||||
protected fun getCodec(name: String): String {
|
||||
return when(name) {
|
||||
"hevc", "hevec", "h265", "h.265", "libx265"
|
||||
-> "libx265"
|
||||
"h.264", "h264", "libx264"
|
||||
-> "libx264"
|
||||
else -> name
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -66,6 +66,8 @@ class EncodeArgumentSelector(val collection: String, val inputFile: String, val
|
||||
val availableSubtitleStreams = streams.streams.filterIsInstance<SubtitleStream>()
|
||||
val subtitleStreams = SubtitleStreamSelector(availableSubtitleStreams)
|
||||
|
||||
val conversionCandidates = subtitleStreams.getCandidateForConversion()
|
||||
|
||||
return subtitleStreams.getDesiredStreams().map {
|
||||
val args = SubtitleEncodeArguments(it, availableSubtitleStreams.indexOf(it))
|
||||
val language = it.tags.language ?: "eng"
|
||||
@ -78,6 +80,7 @@ class EncodeArgumentSelector(val collection: String, val inputFile: String, val
|
||||
inFile = inputFile,
|
||||
outFile = outFile.absolutePath,
|
||||
arguments = args.getSubtitleArguments(),
|
||||
produceConvertEvent = conversionCandidates.contains(it)
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
package no.iktdev.streamit.content.reader.collector
|
||||
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
|
||||
import no.iktdev.streamit.library.db.query.SubtitleQuery
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
|
||||
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import java.io.File
|
||||
|
||||
class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConvertedSubtitle") {
|
||||
|
||||
private val listener = object: SimpleMessageListener(
|
||||
topic = CommonConfig.kafkaTopic,
|
||||
consumer = defaultConsumer,
|
||||
accepts = listOf(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event)
|
||||
) {
|
||||
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||
val workResult = data.value().dataAs(ExtractWork::class.java)
|
||||
if (!data.value().isSuccessful() || workResult == null) {
|
||||
return
|
||||
}
|
||||
|
||||
val of = File(workResult.outFile)
|
||||
SubtitleQuery(
|
||||
title = of.nameWithoutExtension,
|
||||
language = workResult.language,
|
||||
collection = workResult.collection,
|
||||
format = of.extension.uppercase()
|
||||
).insertAndGetStatus()
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
listener.listen()
|
||||
}
|
||||
|
||||
override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
|
||||
return DeserializerRegistry.getEventToDeserializer(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE)
|
||||
}
|
||||
}
|
||||
@ -1,73 +1,73 @@
|
||||
package no.iktdev.streamit.content.reader.collector
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||
import no.iktdev.streamit.content.common.Downloader
|
||||
import no.iktdev.streamit.content.common.SequentialKafkaReader
|
||||
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
||||
import no.iktdev.streamit.content.common.dto.Metadata
|
||||
import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo
|
||||
import no.iktdev.streamit.content.common.dto.reader.FileResult
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||
import no.iktdev.streamit.library.db.query.*
|
||||
import no.iktdev.streamit.library.db.tables.catalog
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import no.iktdev.streamit.library.kafka.dto.Status
|
||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||
import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener
|
||||
import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent
|
||||
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
|
||||
import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
@Service
|
||||
class EncodedVideoConsumer: SequentialKafkaReader("collectorConsumerVideo") {
|
||||
override val accept: KafkaEvents
|
||||
get() = KafkaEvents.EVENT_READER_RECEIVED_FILE
|
||||
override val subAccepts: List<KafkaEvents>
|
||||
get() = listOf(
|
||||
class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), ICollectedMessagesEvent<ResultCollection> {
|
||||
|
||||
val listener = CollectorMessageListener<ResultCollection>(
|
||||
topic = CommonConfig.kafkaTopic,
|
||||
consumer = defaultConsumer,
|
||||
initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE,
|
||||
completionEvent = KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE,
|
||||
acceptsFilter = listOf(
|
||||
KafkaEvents.EVENT_METADATA_OBTAINED,
|
||||
KafkaEvents.EVENT_READER_DETERMINED_SERIE,
|
||||
KafkaEvents.EVENT_READER_DETERMINED_MOVIE,
|
||||
KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE
|
||||
),
|
||||
listener = this,
|
||||
eventCollectionClass = ResultCollection::class.java
|
||||
)
|
||||
|
||||
|
||||
val listener = object: SequentialMessageListener(
|
||||
topic = CommonConfig.kafkaTopic,
|
||||
consumer = defaultConsumer,
|
||||
accept = accept.event,
|
||||
subAccepts = subAccepts.map { it.event },
|
||||
deserializers = loadDeserializers(),
|
||||
validity = 86400000,
|
||||
listener =this
|
||||
) {}
|
||||
|
||||
init {
|
||||
listener.listen()
|
||||
}
|
||||
|
||||
|
||||
override fun getRequiredMessages(): List<String> {
|
||||
return listOf(accept.event) + subAccepts.map { it.event }
|
||||
|
||||
|
||||
|
||||
override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
|
||||
return DeserializerRegistry.getEventToDeserializer(*listener.acceptsFilter.toTypedArray(), listener.initiatorEvent, listener.completionEvent)
|
||||
}
|
||||
|
||||
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
|
||||
val metadata = result[KafkaEvents.EVENT_METADATA_OBTAINED.event]?.data as Metadata?
|
||||
val fileData = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event]?.data as FileResult?
|
||||
val encodeStatus = result[KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event]?.status?.statusType ?: StatusType.ERROR
|
||||
val encodeWork = result[KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event]?.data as EncodeWork?
|
||||
override fun onCollectionCompleted(collection: ResultCollection?) {
|
||||
val metadata = collection?.getMetadata()
|
||||
val fileData = collection?.getFileResult()
|
||||
val encodeWork = collection?.getEncodeWork()
|
||||
val serieData = collection?.getSerieInfo()
|
||||
val movieData = collection?.getMovieInfo()
|
||||
|
||||
|
||||
if (fileData == null || encodeStatus != StatusType.SUCCESS || encodeWork == null) {
|
||||
if (fileData == null || encodeWork == null || collection.getReferenceId() == null) {
|
||||
logger.error { "Required data is null, as it has either status as non successful or simply missing" }
|
||||
return
|
||||
}
|
||||
|
||||
val videoFileNameWithExtension = File(encodeWork.outFile).name
|
||||
|
||||
|
||||
val contentType = metadata?.type ?: return
|
||||
val iid = if (contentType == "movie") MovieQuery(videoFileNameWithExtension).insertAndGetId() else null
|
||||
|
||||
val serieData = result[KafkaEvents.EVENT_READER_DETERMINED_SERIE.event]?.data as EpisodeInfo?
|
||||
if (serieData != null) {
|
||||
val success = SerieQuery(serieData.title, serieData.episode, serieData.season, fileData.title, videoFileNameWithExtension).insertAndGetStatus()
|
||||
if (!success)
|
||||
@ -106,11 +106,9 @@ class EncodedVideoConsumer: SequentialKafkaReader("collectorConsumerVideo") {
|
||||
description = summary
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
|
||||
return DeserializerRegistry.getEventToDeserializer(*subAccepts.toTypedArray())
|
||||
val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(StatusType.SUCCESS))
|
||||
produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null)
|
||||
logger.info { "Stored ${metadata.title} video" }
|
||||
}
|
||||
}
|
||||
@ -1,27 +1,18 @@
|
||||
package no.iktdev.streamit.content.reader.collector
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||
import no.iktdev.streamit.content.common.Downloader
|
||||
import no.iktdev.streamit.content.common.SequentialKafkaReader
|
||||
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
||||
import no.iktdev.streamit.content.common.dto.Metadata
|
||||
|
||||
import no.iktdev.streamit.content.common.dto.reader.FileResult
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
|
||||
import no.iktdev.streamit.library.db.query.*
|
||||
import no.iktdev.streamit.library.db.query.SubtitleQuery
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
|
||||
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
|
||||
import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import java.io.File
|
||||
|
||||
class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerSubtitle") {
|
||||
class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") {
|
||||
|
||||
private val listener = object: SimpleMessageListener(
|
||||
topic = CommonConfig.kafkaTopic,
|
||||
|
||||
@ -0,0 +1,72 @@
|
||||
package no.iktdev.streamit.content.reader.collector
|
||||
|
||||
import no.iktdev.streamit.content.common.deserializers.*
|
||||
import no.iktdev.streamit.content.common.dto.ContentOutName
|
||||
import no.iktdev.streamit.content.common.dto.Metadata
|
||||
import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo
|
||||
import no.iktdev.streamit.content.common.dto.reader.FileResult
|
||||
import no.iktdev.streamit.content.common.dto.reader.MovieInfo
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import no.iktdev.streamit.library.kafka.listener.collector.DefaultEventCollection
|
||||
import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
|
||||
class ResultCollection: DefaultEventCollection() {
|
||||
|
||||
fun getFirstOrNull(events: KafkaEvents): ConsumerRecord<String, Message>? {
|
||||
return getRecords().firstOrNull { it.key() == events.event }
|
||||
}
|
||||
|
||||
fun getReferenceId(): String? {
|
||||
return getRecords().firstOrNull()?.value()?.referenceId
|
||||
}
|
||||
|
||||
/**
|
||||
* @see KafkaEvents.EVENT_READER_RECEIVED_FILE
|
||||
* @see FileResult for data structure
|
||||
*/
|
||||
fun getFileResult(): FileResult? {
|
||||
val record = getRecords().firstOrNull { it.key() == KafkaEvents.EVENT_READER_RECEIVED_FILE.event } ?: return null
|
||||
return FileResultDeserializer().deserializeIfSuccessful(record.value())
|
||||
}
|
||||
|
||||
/**
|
||||
* @see KafkaEvents.EVENT_READER_DETERMINED_FILENAME
|
||||
* @see ContentOutName for data structure
|
||||
*/
|
||||
fun getFileName(): ContentOutName? {
|
||||
val record = getFirstOrNull(KafkaEvents.EVENT_READER_DETERMINED_FILENAME) ?: return null
|
||||
return ContentOutNameDeserializer().deserializeIfSuccessful(record.value())
|
||||
}
|
||||
|
||||
/**
|
||||
* @see KafkaEvents.EVENT_METADATA_OBTAINED and
|
||||
* @see Metadata for datastructure
|
||||
*/
|
||||
fun getMetadata(): Metadata? {
|
||||
return firstOrNull(KafkaEvents.EVENT_METADATA_OBTAINED)?.let {
|
||||
MetadataResultDeserializer().deserializeIfSuccessful(it.value())
|
||||
}
|
||||
}
|
||||
|
||||
fun getMovieInfo(): MovieInfo? {
|
||||
return firstOrNull(KafkaEvents.EVENT_READER_DETERMINED_MOVIE)?.let {
|
||||
MovieInfoDeserializer().deserializeIfSuccessful(it.value())
|
||||
}
|
||||
}
|
||||
|
||||
fun getSerieInfo(): EpisodeInfo? {
|
||||
return firstOrNull(KafkaEvents.EVENT_READER_DETERMINED_SERIE)?.let {
|
||||
EpisodeInfoDeserializer().deserializeIfSuccessful(it.value())
|
||||
}
|
||||
}
|
||||
|
||||
fun getEncodeWork(): EncodeWork? {
|
||||
return firstOrNull(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE)?.let {
|
||||
EncodeWorkDeserializer().deserializeIfSuccessful(it.value())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user