diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c4469563..ccf6e74e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -106,9 +106,9 @@ jobs: context: ./Encode push: true tags: | - bskjon/media-processing--encode:latest - bskjon/media-processing--encode:${{ github.sha }} - bskjon/media-processing--encode:${{ steps.docker-tag.outputs.tag }} + bskjon/media-processing-encode:latest + bskjon/media-processing-encode:${{ github.sha }} + bskjon/media-processing-encode:${{ steps.docker-tag.outputs.tag }} build-reader: needs: build-commoncode diff --git a/CommonCode/build.gradle.kts b/CommonCode/build.gradle.kts index b9574453..4cb1a560 100644 --- a/CommonCode/build.gradle.kts +++ b/CommonCode/build.gradle.kts @@ -7,12 +7,26 @@ version = "1.0-SNAPSHOT" repositories { mavenCentral() + maven("https://jitpack.io") + maven { + url = uri("https://reposilite.iktdev.no/releases") + } + maven { + url = uri("https://reposilite.iktdev.no/snapshots") + } } dependencies { implementation("com.github.pgreze:kotlin-process:1.3.1") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha63") + implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") + + implementation("com.google.code.gson:gson:2.8.9") + implementation("org.json:json:20230227") + + testImplementation("junit:junit:4.13.2") testImplementation("org.junit.jupiter:junit-jupiter") diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/CommonConfig.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/CommonConfig.kt index 5e358952..b533ee99 100644 --- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/CommonConfig.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/CommonConfig.kt @@ -4,6 +4,6 @@ import java.io.File object CommonConfig { var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents" - var incomingContent: File? = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input") - + var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input") + val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output") } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/DefaultKafkaReader.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt similarity index 62% rename from Reader/src/main/kotlin/no/iktdev/streamit/content/reader/DefaultKafkaReader.kt rename to CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt index a6cafb70..3cf0730a 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/DefaultKafkaReader.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt @@ -1,4 +1,4 @@ -package no.iktdev.streamit.content.reader +package no.iktdev.streamit.content.common import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.library.kafka.KafkaEvents @@ -6,25 +6,34 @@ import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.producer.DefaultProducer abstract class DefaultKafkaReader(val subId: String) { val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) val defaultConsumer = DefaultConsumer(subId = subId) - fun produceErrorMessage(baseMessage: Message, reason: String) { + abstract fun loadDeserializers(): Map> + + fun produceErrorMessage(event: KafkaEvents, baseMessage: Message, reason: String) { val message = Message( referenceId = baseMessage.referenceId, - actionType = baseMessage.actionType, Status(statusType = StatusType.ERROR, message = reason) ) - messageProducer.sendMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED.event, message) + messageProducer.sendMessage(event.event, message) + } + + fun produceErrorMessage(event: KafkaEvents, referenceId: String, reason: String) { + val message = Message( + referenceId = referenceId, + Status(statusType = StatusType.ERROR, message = reason) + ) + messageProducer.sendMessage(event.event, message) } fun produceMessage(event: KafkaEvents, baseMessage: Message, data: Any?) { val message = Message( referenceId = baseMessage.referenceId, - actionType = baseMessage.actionType, Status(statusType = if (data != null) StatusType.SUCCESS else StatusType.IGNORED), data = data ) diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/Downloader.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/Downloader.kt new file mode 100644 index 00000000..19023a23 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/Downloader.kt @@ -0,0 +1,96 @@ +package no.iktdev.streamit.content.common + +import no.iktdev.exfl.using +import java.io.File +import java.io.FileOutputStream +import java.net.HttpURLConnection +import java.net.URL +import kotlin.math.sign + +open class Downloader(val url: String, val outDir: File, val baseName: String) { + protected val http: HttpURLConnection = openConnection() + private val BUFFER_SIZE = 4096 + + private fun openConnection(): HttpURLConnection { + try { + return URL(url).openConnection() as HttpURLConnection + } catch (e: Exception) { + e.printStackTrace() + throw BadAddressException("Provided url is either not provided (null) or is not a valid http url") + } + } + + protected fun getLength(): Int { + return http.contentLength + } + + protected fun getProgress(read: Int, total: Int = getLength()): Int { + return ((read * 100) / total) + } + + suspend fun download(): File? { + val extension = getExtension() + ?: throw UnsupportedFormatException("Provided url does not contain a supported file extension") + val outFile = outDir.using("$baseName.$extension") + val inputStream = http.inputStream + val fos = FileOutputStream(outFile, false) + + var totalBytesRead = 0 + val buffer = ByteArray(BUFFER_SIZE) + inputStream.apply { + fos.use { fout -> + run { + var bytesRead = read(buffer) + while (bytesRead >= 0) { + fout.write(buffer, 0, bytesRead) + totalBytesRead += bytesRead + bytesRead = read(buffer) + // System.out.println(getProgress(totalBytesRead)) + } + } + } + } + inputStream.close() + fos.close() + return outFile + } + + open fun getExtension(): String? { + val possiblyExtension = url.lastIndexOf(".") + 1 + return if (possiblyExtension > 1) { + return url.toString().substring(possiblyExtension) + } else { + val mimeType = http.contentType ?: null + contentTypeToExtension()[mimeType] + } + } + + open fun contentTypeToExtension(): Map { + return mapOf( + "image/png" to "png", + "image/jpeg" to "jpg", + "image/webp" to "webp", + "image/bmp" to "bmp", + "image/tiff" to "tiff" + ) + } + + + class BadAddressException : java.lang.Exception { + constructor() : super() {} + constructor(message: String?) : super(message) {} + constructor(message: String?, cause: Throwable?) : super(message, cause) {} + } + + class UnsupportedFormatException : Exception { + constructor() : super() {} + constructor(message: String?) : super(message) {} + constructor(message: String?, cause: Throwable?) : super(message, cause) {} + } + + class InvalidFileException : Exception { + constructor() : super() {} + constructor(message: String?) : super(message) {} + constructor(message: String?, cause: Throwable?) : super(message, cause) {} + } +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/SequentialKafkaReader.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/SequentialKafkaReader.kt new file mode 100644 index 00000000..53eb2f4b --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/SequentialKafkaReader.kt @@ -0,0 +1,20 @@ +package no.iktdev.streamit.content.common + +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.Status +import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent +import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener +import no.iktdev.streamit.library.kafka.producer.DefaultProducer + +abstract class SequentialKafkaReader(subId: String): DefaultKafkaReader(subId), ISequentialMessageEvent { + + abstract val accept: KafkaEvents + abstract val subAccepts: List + + +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/Daemon.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/Daemon.kt index f8ae4d8f..7c50f532 100644 --- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/Daemon.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/Daemon.kt @@ -7,9 +7,9 @@ import mu.KotlinLogging private val logger = KotlinLogging.logger {} -class Daemon(open val executable: String, val parameters: List, val daemonInterface: IDaemon) { +open class Daemon(open val executable: String, val daemonInterface: IDaemon) { var executor: ProcessResult? = null - suspend fun run(): Int { + open suspend fun run(parameters: List): Int { daemonInterface.onStarted() executor = process(executable, *parameters.toTypedArray(), stdout = Redirect.CAPTURE, @@ -20,7 +20,7 @@ class Daemon(open val executable: String, val parameters: List, val daem val resultCode = executor?.resultCode ?: -1 if (resultCode == 0) { daemonInterface.onEnded() - } else daemonInterface.onError() + } else daemonInterface.onError(resultCode) return resultCode } } \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/IDaemon.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/IDaemon.kt index fd29328b..0f8316e6 100644 --- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/IDaemon.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/IDaemon.kt @@ -8,6 +8,6 @@ interface IDaemon { fun onEnded() {} - fun onError() + fun onError(code: Int) } \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/ContentOutNameDeserializer.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/ContentOutNameDeserializer.kt new file mode 100644 index 00000000..322a2909 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/ContentOutNameDeserializer.kt @@ -0,0 +1,11 @@ +package no.iktdev.streamit.content.common.deserializers + +import no.iktdev.streamit.content.common.dto.ContentOutName +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class ContentOutNameDeserializer: IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): ContentOutName? { + return incomingMessage.dataAs(ContentOutName::class.java) + } +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt new file mode 100644 index 00000000..4be9aab1 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/DeserializerRegistry.kt @@ -0,0 +1,36 @@ +package no.iktdev.streamit.content.common.deserializers + +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class DeserializerRegistry { + companion object { + private val _registry = mutableMapOf>( + KafkaEvents.EVENT_READER_RECEIVED_FILE to FileResultDeserializer(), + KafkaEvents.EVENT_METADATA_OBTAINED to MetadataResultDeserializer(), + KafkaEvents.EVENT_READER_DETERMINED_SERIE to EpisodeInfoDeserializer(), + KafkaEvents.EVENT_READER_DETERMINED_MOVIE to MovieInfoDeserializer(), + + KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO to EncodeWorkDeserializer(), + KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE to EncodeWorkDeserializer(), + KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE to ExtractWorkDeserializer(), + KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE to ExtractWorkDeserializer(), + + ) + fun getRegistry(): Map> = _registry.toMap() + fun getEventToDeserializer(vararg keys: KafkaEvents): Map> { + val missingFields = keys.filter { !getRegistry().keys.contains(it) } + + if (missingFields.isNotEmpty()) { + throw MissingDeserializerException("Missing deserializers for: ${missingFields.joinToString(", ")}") + } + return getRegistry().filter { keys.contains(it.key) }.map { it.key.event to it.value }.toMap() + } + fun addDeserializer(key: KafkaEvents, deserializer: IMessageDataDeserialization<*>) { + _registry[key] = deserializer + } + + } +} + +class MissingDeserializerException(override val message: String): RuntimeException() \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/EncodeWorkDeserializer.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/EncodeWorkDeserializer.kt new file mode 100644 index 00000000..847fa1f5 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/EncodeWorkDeserializer.kt @@ -0,0 +1,11 @@ +package no.iktdev.streamit.content.common.deserializers + +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class EncodeWorkDeserializer: IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): EncodeWork? { + return incomingMessage.dataAs(EncodeWork::class.java) + } +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/EpisodeInfoDeserializer.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/EpisodeInfoDeserializer.kt new file mode 100644 index 00000000..8c6cddf8 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/EpisodeInfoDeserializer.kt @@ -0,0 +1,11 @@ +package no.iktdev.streamit.content.common.deserializers + +import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class EpisodeInfoDeserializer: IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): EpisodeInfo? { + return incomingMessage.dataAs(EpisodeInfo::class.java) + } +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/ExtractWorkDeserializer.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/ExtractWorkDeserializer.kt new file mode 100644 index 00000000..155fb56b --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/ExtractWorkDeserializer.kt @@ -0,0 +1,12 @@ +package no.iktdev.streamit.content.common.deserializers + +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class ExtractWorkDeserializer: IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): ExtractWork? { + return incomingMessage.dataAs(ExtractWork::class.java) + } +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/FileResultDeserializer.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/FileResultDeserializer.kt new file mode 100644 index 00000000..97809fe4 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/FileResultDeserializer.kt @@ -0,0 +1,13 @@ +package no.iktdev.streamit.content.common.deserializers + +import no.iktdev.streamit.content.common.dto.reader.FileResult +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class FileResultDeserializer: IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): FileResult? { + return incomingMessage.dataAs(FileResult::class.java) + } +} diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MediaStreamsDeserializer.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MediaStreamsDeserializer.kt new file mode 100644 index 00000000..f2248f32 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MediaStreamsDeserializer.kt @@ -0,0 +1,47 @@ +package no.iktdev.streamit.content.common.deserializers + +import com.google.gson.Gson +import com.google.gson.JsonObject +import no.iktdev.streamit.content.common.streams.AudioStream +import no.iktdev.streamit.content.common.streams.MediaStreams +import no.iktdev.streamit.content.common.streams.SubtitleStream +import no.iktdev.streamit.content.common.streams.VideoStream +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class MediaStreamsDeserializer: IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): MediaStreams? { + return try { + val gson = Gson() + val jsonObject = if (incomingMessage.data is String) { + gson.fromJson(incomingMessage.data as String, JsonObject::class.java) + } else { + gson.fromJson(incomingMessage.dataAsJson(), JsonObject::class.java) + } + + val streamsJsonArray = jsonObject.getAsJsonArray("streams") + + val rstreams = streamsJsonArray.mapNotNull { streamJson -> + val streamObject = streamJson.asJsonObject + + val codecType = streamObject.get("codec_type").asString + if (streamObject.has("codec_name") && streamObject.get("codec_name").asString == "mjpeg") { + null + } else { + when (codecType) { + "video" -> gson.fromJson(streamObject, VideoStream::class.java) + "audio" -> gson.fromJson(streamObject, AudioStream::class.java) + "subtitle" -> gson.fromJson(streamObject, SubtitleStream::class.java) + else -> null //throw IllegalArgumentException("Unknown stream type: $codecType") + } + } + } + + return MediaStreams(rstreams) + } catch (e: Exception) { + e.printStackTrace() + null + } + } +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MetadataResultDeserializer.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MetadataResultDeserializer.kt new file mode 100644 index 00000000..d0ee0b5e --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MetadataResultDeserializer.kt @@ -0,0 +1,11 @@ +package no.iktdev.streamit.content.common.deserializers + +import no.iktdev.streamit.content.common.dto.Metadata +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class MetadataResultDeserializer: IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): Metadata? { + return incomingMessage.dataAs(Metadata::class.java) + } +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MovieInfoDeserializer.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MovieInfoDeserializer.kt new file mode 100644 index 00000000..fcc7b2b0 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deserializers/MovieInfoDeserializer.kt @@ -0,0 +1,11 @@ +package no.iktdev.streamit.content.common.deserializers + +import no.iktdev.streamit.content.common.dto.reader.MovieInfo +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization + +class MovieInfoDeserializer: IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): MovieInfo? { + return incomingMessage.dataAs(MovieInfo::class.java) + } +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/EpisodeInfo.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/EpisodeInfo.kt new file mode 100644 index 00000000..ce8fc0f1 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/EpisodeInfo.kt @@ -0,0 +1,9 @@ +package no.iktdev.streamit.content.common.dto.reader + +data class EpisodeInfo( + val title: String, + val episode: Int, + val season: Int, + val episodeTitle: String?, + override val fullName: String +): VideoInfo(fullName) \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/FileResult.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/FileResult.kt new file mode 100644 index 00000000..30d4685a --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/FileResult.kt @@ -0,0 +1,7 @@ +package no.iktdev.streamit.content.common.dto.reader + +data class FileResult( + val file: String, + val title: String = "", + val sanitizedName: String = "" +) \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/MovieInfo.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/MovieInfo.kt new file mode 100644 index 00000000..4c2b6794 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/MovieInfo.kt @@ -0,0 +1,6 @@ +package no.iktdev.streamit.content.common.dto.reader + +data class MovieInfo( + val title: String, + override val fullName: String +) : VideoInfo(fullName) \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/SubtitleInfo.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/SubtitleInfo.kt new file mode 100644 index 00000000..8d6a5bae --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/SubtitleInfo.kt @@ -0,0 +1,4 @@ +package no.iktdev.streamit.content.common.dto.reader + +class SubtitleInfo { +} \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/VideoInfo.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/VideoInfo.kt new file mode 100644 index 00000000..1bed99bc --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/VideoInfo.kt @@ -0,0 +1,5 @@ +package no.iktdev.streamit.content.common.dto.reader + +abstract class VideoInfo( + @Transient open val fullName: String +) \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/ConvertWork.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/ConvertWork.kt new file mode 100644 index 00000000..4bcdf4f8 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/ConvertWork.kt @@ -0,0 +1,8 @@ +package no.iktdev.streamit.content.common.dto.reader.work + +data class ConvertWork( + override val collection: String, + val language: String, + override val inFile: String, + override val outFile: String, +) : WorkBase(collection = collection, inFile = inFile, outFile = outFile) \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/EncodeWork.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/EncodeWork.kt new file mode 100644 index 00000000..b43b4d49 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/EncodeWork.kt @@ -0,0 +1,8 @@ +package no.iktdev.streamit.content.common.dto.reader.work + +data class EncodeWork( + override val collection: String, + override val inFile: String, + override val outFile: String, + val arguments: List +) : WorkBase(collection = collection, inFile = inFile, outFile = outFile) \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/ExtractWork.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/ExtractWork.kt new file mode 100644 index 00000000..f6c5c413 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/ExtractWork.kt @@ -0,0 +1,10 @@ +package no.iktdev.streamit.content.common.dto.reader.work + +data class ExtractWork( + override val collection: String, + val language: String, + override val inFile: String, + val arguments: List, + override val outFile: String, + var produceConvertEvent: Boolean = true +) : WorkBase(collection = collection, inFile = inFile, outFile = outFile) \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/WorkBase.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/WorkBase.kt new file mode 100644 index 00000000..a162ebcf --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/dto/reader/work/WorkBase.kt @@ -0,0 +1,10 @@ +package no.iktdev.streamit.content.common.dto.reader.work + +import java.util.UUID + +abstract class WorkBase( + @Transient open val workId: String = UUID.randomUUID().toString(), + @Transient open val collection: String, + @Transient open val inFile: String, + @Transient open val outFile: String +) \ No newline at end of file diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleStreamSelector.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleStreamSelector.kt new file mode 100644 index 00000000..d3d870d8 --- /dev/null +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/streams/SubtitleStreamSelector.kt @@ -0,0 +1,22 @@ +package no.iktdev.streamit.content.common.streams + +class SubtitleStreamSelector(val streams: List) { + + fun getDesiredStreams(): List { + val codecFiltered = streams.filter { getFormatToCodec(it.codec_name) != null } + // TODO: Expand and remove stuff like sign and songs etc.. + return codecFiltered + } + + + fun getFormatToCodec(codecName: String): String? { + return when(codecName) { + "ass" -> "ass" + "subrip" -> "srt" + "webvtt", "vtt" -> "vtt" + "smi" -> "smi" + "hdmv_pgs_subtitle" -> null + else -> null + } + } +} \ No newline at end of file diff --git a/Convert/gradle/wrapper/gradle-wrapper.jar b/Convert/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 00000000..249e5832 Binary files /dev/null and b/Convert/gradle/wrapper/gradle-wrapper.jar differ diff --git a/Convert/gradle/wrapper/gradle-wrapper.properties b/Convert/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 00000000..249c54e8 --- /dev/null +++ b/Convert/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Sun Jul 23 01:48:17 CEST 2023 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/Convert/gradlew b/Convert/gradlew new file mode 100644 index 00000000..1b6c7873 --- /dev/null +++ b/Convert/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/Convert/gradlew.bat b/Convert/gradlew.bat new file mode 100644 index 00000000..107acd32 --- /dev/null +++ b/Convert/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertApplication.kt b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertApplication.kt new file mode 100644 index 00000000..4486842d --- /dev/null +++ b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertApplication.kt @@ -0,0 +1,19 @@ +package no.iktdev.streamit.content.convert + +import mu.KotlinLogging +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.runApplication +import org.springframework.context.ApplicationContext + +@SpringBootApplication +class ConvertApplication + +private var context: ApplicationContext? = null +@Suppress("unused") +fun getContext(): ApplicationContext? { + return context +} +fun main(args: Array) { + context = runApplication(*args) +} +private val logger = KotlinLogging.logger {} \ No newline at end of file diff --git a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertEnv.kt b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertEnv.kt new file mode 100644 index 00000000..5852786c --- /dev/null +++ b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/ConvertEnv.kt @@ -0,0 +1,4 @@ +package no.iktdev.streamit.content.convert + +class ConvertEnv { +} \ No newline at end of file diff --git a/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt new file mode 100644 index 00000000..8cd7a9d6 --- /dev/null +++ b/Convert/src/main/kotlin/no/iktdev/streamit/content/convert/kafka/SubtitleConsumer.kt @@ -0,0 +1,19 @@ +package no.iktdev.streamit.content.convert.kafka + +import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import org.springframework.stereotype.Service + +@Service +class SubtitleConsumer: DefaultKafkaReader("convertHandler") { + + /*init { + object: SimpleMessageListener(topic =b ) + }*/ + + override fun loadDeserializers(): Map> { + TODO("Not yet implemented") + } + +} \ No newline at end of file diff --git a/Encode/Dockerfile b/Encode/Dockerfile new file mode 100644 index 00000000..279d2d06 --- /dev/null +++ b/Encode/Dockerfile @@ -0,0 +1,4 @@ +FROM bskjon/debian-azuljava17-ffmpeg:latest +EXPOSE 8080 + +COPY ./build/libs/encode.jar /usr/share/app/app.jar \ No newline at end of file diff --git a/Encode/build.gradle.kts b/Encode/build.gradle.kts index fe3ef1fd..6b5d83e2 100644 --- a/Encode/build.gradle.kts +++ b/Encode/build.gradle.kts @@ -1,5 +1,8 @@ plugins { kotlin("jvm") version "1.8.21" + id("org.springframework.boot") version "2.5.5" + id("io.spring.dependency-management") version "1.0.11.RELEASE" + kotlin("plugin.spring") version "1.5.31" } group = "no.iktdev.streamit.content" @@ -7,9 +10,32 @@ version = "1.0-SNAPSHOT" repositories { mavenCentral() + maven("https://jitpack.io") + maven { + url = uri("https://reposilite.iktdev.no/releases") + } + maven { + url = uri("https://reposilite.iktdev.no/snapshots") + } } - dependencies { + implementation(project(":CommonCode")) + + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha63") + implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") + + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") + + implementation("com.github.pgreze:kotlin-process:1.3.1") + implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") + + implementation("com.google.code.gson:gson:2.8.9") + + implementation("org.springframework.boot:spring-boot-starter-web") + implementation("org.springframework.boot:spring-boot-starter:2.7.0") + implementation("org.springframework.kafka:spring-kafka:2.8.5") + + testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation("org.junit.jupiter:junit-jupiter") } diff --git a/Encode/settings.gradle.kts b/Encode/settings.gradle.kts index 2b84cfd8..783ff466 100644 --- a/Encode/settings.gradle.kts +++ b/Encode/settings.gradle.kts @@ -1,2 +1,4 @@ rootProject.name = "Encode" +include(":CommonCode") +project(":CommonCode").projectDir = File("../CommonCode") \ No newline at end of file diff --git a/Encode/src/main/kotlin/EncodeEnv.kt b/Encode/src/main/kotlin/EncodeEnv.kt new file mode 100644 index 00000000..04344d8e --- /dev/null +++ b/Encode/src/main/kotlin/EncodeEnv.kt @@ -0,0 +1,7 @@ +class EncodeEnv { + companion object { + val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg" + val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false + val maxRunners: Int = System.getenv("SIMULTANEOUS_ENCODE_RUNNERS").toIntOrNull() ?: 1 + } +} \ No newline at end of file diff --git a/Encode/src/main/kotlin/EncoderApplication.kt b/Encode/src/main/kotlin/EncoderApplication.kt new file mode 100644 index 00000000..1baedf30 --- /dev/null +++ b/Encode/src/main/kotlin/EncoderApplication.kt @@ -0,0 +1,28 @@ +import mu.KotlinLogging +import no.iktdev.exfl.observable.ObservableMap +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.runApplication +import org.springframework.context.ApplicationContext + +@SpringBootApplication +class EncoderApplication + +private var context: ApplicationContext? = null + +@Suppress("unused") +fun getContext(): ApplicationContext? { + return context +} +fun main(args: Array) { + context = runApplication(*args) +} +private val logger = KotlinLogging.logger {} + +/*val progress = ObservableMap().also { + it.addListener(object: ObservableMap.Listener { + override fun onPut(key: String, value: EncodeInformation) { + super.onPut(key, value) + logger.info { "$key with progress: $value." } + } + }) +}*/ \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt new file mode 100644 index 00000000..f5905aec --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncodeWorkConsumer.kt @@ -0,0 +1,54 @@ +package no.iktdev.streamit.content.encode + +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry +import no.iktdev.streamit.content.common.deserializers.EncodeWorkDeserializer +import no.iktdev.streamit.content.encode.runner.RunnerCoordinator +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.stereotype.Service + +@Service +class EncodeWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : DefaultKafkaReader("encodeWork") { + lateinit var encodeInstructionsListener: EncodeInformationListener + + init { + encodeInstructionsListener = EncodeInformationListener( + topic = CommonConfig.kafkaTopic, + defaultConsumer, + accepts = listOf(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO.event), + runnerCoordinator + ) + encodeInstructionsListener.listen() + } + + override fun loadDeserializers(): Map> { + return DeserializerRegistry.getEventToDeserializer( + KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO + ) + } + + + class EncodeInformationListener( + topic: String, + consumer: DefaultConsumer, + accepts: List, + val runnerCoordinator: RunnerCoordinator + ) : SimpleMessageListener( + topic, consumer, + accepts + ) { + override fun onMessageReceived(data: ConsumerRecord) { + val message = data.value().apply { + this.data = EncodeWorkDeserializer().deserializeIfSuccessful(data.value()) + } + runnerCoordinator.addEncodeMessageToQueue(message) + } + } +} \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt new file mode 100644 index 00000000..a4b673d4 --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/ExtractWorkConsumer.kt @@ -0,0 +1,55 @@ +package no.iktdev.streamit.content.encode + +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry +import no.iktdev.streamit.content.common.deserializers.ExtractWorkDeserializer +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.content.encode.runner.RunnerCoordinator +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.stereotype.Service + +@Service +class ExtractWorkConsumer(private val runnerCoordinator: RunnerCoordinator) : DefaultKafkaReader("extractWork") { + lateinit var encodeInstructionsListener: ExtractWorkListener + + init { + encodeInstructionsListener = ExtractWorkListener( + topic = CommonConfig.kafkaTopic, + defaultConsumer, + accepts = listOf(KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE.event), + runnerCoordinator + ) + encodeInstructionsListener.listen() + } + + override fun loadDeserializers(): Map> { + return DeserializerRegistry.getEventToDeserializer( + KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE + ) + } + + + class ExtractWorkListener( + topic: String, + consumer: DefaultConsumer, + accepts: List, + val runnerCoordinator: RunnerCoordinator + ) : SimpleMessageListener( + topic, consumer, + accepts + ) { + override fun onMessageReceived(data: ConsumerRecord) { + val message = data.value().apply { + this.data = ExtractWorkDeserializer().deserializeIfSuccessful(data.value()) + } + runnerCoordinator.addExtractMessageToQueue(message) + } + } +} \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt new file mode 100644 index 00000000..aaec4eeb --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt @@ -0,0 +1,16 @@ +package no.iktdev.streamit.content.encode.progress + +data class Progress( + val frame: Int?, + val fps: Double?, + val stream_0_0_q: Double?, + val bitrate: String?, + val total_size: Int?, + val out_time_us: Long?, + val out_time_ms: Long?, + val out_time: String?, + val dup_frames: Int?, + val drop_frames: Int?, + val speed: Double?, + val progress: String? +) diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt new file mode 100644 index 00000000..0abe0471 --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt @@ -0,0 +1,35 @@ +package no.iktdev.streamit.content.encode.progress + +class ProgressDecoder { + fun parseVideoProgress(lines: List): Progress? { + var frame: Int? = null + var progress: String? = null + val metadataMap = mutableMapOf() + + for (line in lines) { + val keyValuePairs = line.split(" ") + for (keyValuePair in keyValuePairs) { + val (key, value) = keyValuePair.split("=") + metadataMap[key] = value + } + + if (frame == null) { + frame = metadataMap["frame"]?.toIntOrNull() + } + + progress = metadataMap["progress"] + } + + return if (progress != null) { + // When "progress" is found, build and return the VideoMetadata object + Progress( + frame, metadataMap["fps"]?.toDoubleOrNull(), metadataMap["stream_0_0_q"]?.toDoubleOrNull(), + metadataMap["bitrate"], metadataMap["total_size"]?.toIntOrNull(), metadataMap["out_time_us"]?.toLongOrNull(), + metadataMap["out_time_ms"]?.toLongOrNull(), metadataMap["out_time"], metadataMap["dup_frames"]?.toIntOrNull(), + metadataMap["drop_frames"]?.toIntOrNull(), metadataMap["speed"]?.toDoubleOrNull(), progress + ) + } else { + null // If "progress" is not found, return null + } + } +} \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt new file mode 100644 index 00000000..3eef4f08 --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt @@ -0,0 +1,68 @@ +package no.iktdev.streamit.content.encode.runner + +import EncodeEnv +import no.iktdev.exfl.observable.ObservableList +import no.iktdev.exfl.observable.observableListOf +import no.iktdev.streamit.content.common.deamon.Daemon +import no.iktdev.streamit.content.common.deamon.IDaemon +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.content.encode.progress.Progress +import no.iktdev.streamit.content.encode.progress.ProgressDecoder +import java.io.File + +class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInterface: IEncodeListener): IDaemon { + var outputCache = observableListOf() + private val decoder = ProgressDecoder() + private fun produceProgress(items: List) { + val progress = decoder.parseVideoProgress(items) + if (progress != null) { + daemonInterface.onProgress(referenceId, work, progress) + outputCache.clear() + } + } + + init { + outputCache.addListener(object : ObservableList.Listener { + override fun onAdded(item: String) { + produceProgress(outputCache) + } + }) + } + + suspend fun runUsingWorkItem(): Int { + if (!File(work.outFile).parentFile.exists()) { + File(work.outFile).parentFile.mkdirs() + } + val adjustedArgs = listOf( + "-hide_banner", "-i", work.inFile, *work.arguments.toTypedArray(), work.outFile, + "-progress", "pipe:1" + ) + if (EncodeEnv.allowOverwrite) listOf("-y") else emptyList() + return Daemon(EncodeEnv.ffmpeg, this).run(adjustedArgs) + } + + override fun onStarted() { + super.onStarted() + daemonInterface.onStarted(referenceId, work) + } + + override fun onEnded() { + super.onEnded() + daemonInterface.onEnded(referenceId, work) + } + + override fun onError(code: Int) { + daemonInterface.onError(referenceId, work, code) + } + override fun onOutputChanged(line: String) { + super.onOutputChanged(line) + outputCache.add(line) + } + +} + +interface IEncodeListener { + fun onStarted(referenceId: String, work: EncodeWork) + fun onError(referenceId: String, work: EncodeWork, code: Int) + fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) + fun onEnded(referenceId: String, work: EncodeWork) +} \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt new file mode 100644 index 00000000..ed3b0da2 --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt @@ -0,0 +1,54 @@ +package no.iktdev.streamit.content.encode.runner + +import EncodeEnv +import no.iktdev.exfl.observable.ObservableList +import no.iktdev.exfl.observable.observableListOf +import no.iktdev.streamit.content.common.deamon.Daemon +import no.iktdev.streamit.content.common.deamon.IDaemon +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.content.encode.progress.Progress +import no.iktdev.streamit.content.encode.progress.ProgressDecoder +import java.io.File + +class ExtractDaemon(val referenceId: String, val work: ExtractWork, val daemonInterface: IExtractListener): IDaemon { + var outputCache = observableListOf() + + + suspend fun runUsingWorkItem(): Int { + if (!File(work.outFile).parentFile.exists()) { + File(work.outFile).parentFile.mkdirs() + } + val adjustedArgs = listOf( + "-hide_banner", "-i", work.inFile, *work.arguments.toTypedArray(), work.outFile, + "-progress", "pipe:1" + ) + if (EncodeEnv.allowOverwrite) listOf("-y") else emptyList() + return Daemon(EncodeEnv.ffmpeg, this).run(adjustedArgs) + } + + override fun onStarted() { + super.onStarted() + daemonInterface.onStarted(referenceId, work) + } + + override fun onEnded() { + super.onEnded() + daemonInterface.onEnded(referenceId, work) + } + + override fun onError(code: Int) { + daemonInterface.onError(referenceId, work, code) + } + override fun onOutputChanged(line: String) { + super.onOutputChanged(line) + outputCache.add(line) + } + +} + +interface IExtractListener { + fun onStarted(referenceId: String, work: ExtractWork) + fun onError(referenceId: String, work: ExtractWork, code: Int) + fun onProgress(referenceId: String, work: ExtractWork, progress: Progress) {} + fun onEnded(referenceId: String, work: ExtractWork) +} \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt new file mode 100644 index 00000000..a329c98e --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -0,0 +1,109 @@ +package no.iktdev.streamit.content.encode.runner + +import EncodeEnv +import kotlinx.coroutines.runBlocking +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.deamon.IDaemon +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.content.encode.progress.Progress +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.Status +import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.producer.DefaultProducer +import org.springframework.stereotype.Service +import java.util.concurrent.ExecutorService +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit + +@Service +class RunnerCoordinator { + + val producer = DefaultProducer(CommonConfig.kafkaTopic) + + val encodeExecutor: ExecutorService = ThreadPoolExecutor( + EncodeEnv.maxRunners, + EncodeEnv.maxRunners, + 0L, + TimeUnit.MILLISECONDS, + LinkedBlockingQueue() + ) + + val extractExecutor: ExecutorService = ThreadPoolExecutor( + EncodeEnv.maxRunners, + EncodeEnv.maxRunners, + 0L, + TimeUnit.MILLISECONDS, + LinkedBlockingQueue() + ) + + fun addEncodeMessageToQueue(message: Message) { + encodeExecutor.execute { + runBlocking { + if (message.data is EncodeWork) { + val data: EncodeWork = message.data as EncodeWork + val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener) + encodeDaemon.runUsingWorkItem() + } else { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork"))) + } + } + } + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) + } + + fun addExtractMessageToQueue(message: Message) { + extractExecutor.execute { + runBlocking { + if (message.data is ExtractWork) { + val data: ExtractWork = message.data as ExtractWork + val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener) + extractDaemon.runUsingWorkItem() + } else { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork"))) + } + } + } + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING))) + } + + + + + + val encodeListener = object: IEncodeListener { + override fun onStarted(referenceId: String, work: EncodeWork) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work)) + } + + override fun onError(referenceId: String, work: EncodeWork, code: Int) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work)) + } + + override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) { + // TODO: Pass to listener + } + + override fun onEnded(referenceId: String, work: EncodeWork) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work)) + } + } + + val extractListener = object : IExtractListener { + override fun onStarted(referenceId: String, work: ExtractWork) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work)) + } + + override fun onError(referenceId: String, work: ExtractWork, code: Int) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.ERROR), work)) + } + + override fun onEnded(referenceId: String, work: ExtractWork) { + producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.SUCCESS), work)) + } + + } + +} diff --git a/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoderTest.kt b/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoderTest.kt new file mode 100644 index 00000000..a3bb969f --- /dev/null +++ b/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoderTest.kt @@ -0,0 +1,33 @@ +package no.iktdev.streamit.content.encode.progress + +class ProgressDecoderTest { + + + val text = """ + frame=16811 fps= 88 q=40.0 size= 9984kB time=00:01:10.79 bitrate=1155.3kbits/s speed=3.71x + fps=88.03 + stream_0_0_q=40.0 + bitrate=1155.3kbits/s + total_size=10223752 + out_time_us=70798005 + out_time_ms=70798005 + out_time=00:01:10.798005 + dup_frames=0 + drop_frames=0 + speed=3.71x + progress=continue + frame= 1710 fps= 84 q=-1.0 Lsize= 12124kB time=00:01:11.91 bitrate=1381.2kbits/s speed=3.53x + frame=1710 + fps=84.01 + stream_0_0_q=-1.0 + bitrate=1381.2kbits/s + total_size=12415473 + out_time_us=71910998 + out_time_ms=71910998 + out_time=00:01:11.910998 + dup_frames=0 + drop_frames=0 + speed=3.53x + progress=end + """.trimIndent() +} \ No newline at end of file diff --git a/Reader/Dockerfile b/Reader/Dockerfile index f7790028..f244074f 100644 --- a/Reader/Dockerfile +++ b/Reader/Dockerfile @@ -1,8 +1,4 @@ -FROM bskjon/azuljava:17 +FROM bskjon/debian-azuljava17-ffmpeg:latest EXPOSE 8080 -RUN mkdir -p /src/input -RUN apt update -y -RUN apt install -y ffmpeg - COPY ./build/libs/reader.jar /usr/share/app/app.jar \ No newline at end of file diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index e0828256..13ddfd31 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -23,8 +23,11 @@ repositories { } dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha48") - implementation("no.iktdev:exfl:0.0.8-SNAPSHOT") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha63") + implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") + + implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha7") + implementation("com.github.pgreze:kotlin-process:1.3.1") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt index 9a6ba22c..62be068f 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt @@ -2,14 +2,20 @@ package no.iktdev.streamit.content.reader.analyzer.contentDeterminator import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.deserializers.FileResultDeserializer +import no.iktdev.streamit.content.common.deserializers.MetadataResultDeserializer import no.iktdev.streamit.content.common.dto.ContentOutName import no.iktdev.streamit.content.common.dto.Metadata -import no.iktdev.streamit.content.reader.DefaultKafkaReader -import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher +import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo +import no.iktdev.streamit.content.common.dto.reader.FileResult +import no.iktdev.streamit.content.common.dto.reader.MovieInfo +import no.iktdev.streamit.content.common.dto.reader.VideoInfo import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener import org.springframework.stereotype.Service @@ -24,8 +30,8 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM consumer = defaultConsumer, accept = KafkaEvents.EVENT_READER_RECEIVED_FILE.event, subAccepts = listOf(KafkaEvents.EVENT_METADATA_OBTAINED.event), - deserializers = Deserializers().getDeserializers(), - this + deserializers = loadDeserializers(), + listener = this ) {} init { @@ -42,20 +48,20 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM logger.info { "All messages are received" } val initMessage = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event] - if (initMessage == null) { - produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "Initiator message not found!") + if (initMessage == null || initMessage.status.statusType != StatusType.SUCCESS) { + produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "Initiator message not found!") return } - val fileResult = initMessage.data as FileWatcher.FileResult? + val fileResult = initMessage.data as FileResult? if (fileResult == null) { - produceErrorMessage(initMessage, "FileResult is either null or not deserializable!") + produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, "FileResult is either null or not deserializable!") return } val metadataMessage = result[KafkaEvents.EVENT_METADATA_OBTAINED.event] val metadata = if (metadataMessage?.status?.statusType == StatusType.SUCCESS) metadataMessage.data as Metadata? else null - val baseFileName = if (metadata?.type == null) { + val videoInfo = if (metadata?.type == null) { FileNameDeterminate(fileResult.title, fileResult.sanitizedName).getDeterminedFileName() } else if (metadata.type.lowercase() == "movie") { FileNameDeterminate(fileResult.title, fileResult.sanitizedName, FileNameDeterminate.ContentType.MOVIE).getDeterminedFileName() @@ -63,9 +69,26 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM FileNameDeterminate(fileResult.title, fileResult.sanitizedName, FileNameDeterminate.ContentType.SERIE).getDeterminedFileName() } - val out = ContentOutName(baseFileName) + if (videoInfo == null) { + produceErrorMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, "VideoInfo is null." ) + return + } + + val out = ContentOutName(videoInfo.fullName) produceMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, out) + if (videoInfo is EpisodeInfo) { + produceMessage(KafkaEvents.EVENT_READER_DETERMINED_SERIE, initMessage, videoInfo) + } else if (videoInfo is MovieInfo) { + produceMessage(KafkaEvents.EVENT_READER_DETERMINED_MOVIE, initMessage, videoInfo) + } + } + + final override fun loadDeserializers(): Map> { + return mutableMapOf( + KafkaEvents.EVENT_READER_RECEIVED_FILE.event to FileResultDeserializer(), + KafkaEvents.EVENT_METADATA_OBTAINED.event to MetadataResultDeserializer() + ) } } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/Deserializers.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/Deserializers.kt deleted file mode 100644 index b3ec0d4f..00000000 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/Deserializers.kt +++ /dev/null @@ -1,37 +0,0 @@ -package no.iktdev.streamit.content.reader.analyzer.contentDeterminator - -import no.iktdev.streamit.content.common.dto.Metadata -import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher -import no.iktdev.streamit.library.kafka.KafkaEvents -import no.iktdev.streamit.library.kafka.dto.Message -import no.iktdev.streamit.library.kafka.dto.StatusType -import no.iktdev.streamit.library.kafka.listener.sequential.IMessageDataDeserialization - -class Deserializers { - - val fileReceived = object : IMessageDataDeserialization { - override fun deserialize(incomingMessage: Message): FileWatcher.FileResult? { - if (incomingMessage.status.statusType != StatusType.SUCCESS) { - return null - } - return incomingMessage.dataAs(FileWatcher.FileResult::class.java) - } - } - - val metadataReceived = object: IMessageDataDeserialization { - override fun deserialize(incomingMessage: Message): Metadata? { - if (incomingMessage.status.statusType != StatusType.SUCCESS) { - return null - } - return incomingMessage.dataAs(Metadata::class.java) - } - - } - fun getDeserializers(): Map> { - return mutableMapOf( - KafkaEvents.EVENT_READER_RECEIVED_FILE.event to fileReceived, - KafkaEvents.EVENT_METADATA_OBTAINED.event to metadataReceived - ) - } - -} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/FileNameDeterminate.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/FileNameDeterminate.kt index 2a4e21e5..61076fca 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/FileNameDeterminate.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/FileNameDeterminate.kt @@ -1,5 +1,9 @@ package no.iktdev.streamit.content.reader.analyzer.contentDeterminator +import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo +import no.iktdev.streamit.content.common.dto.reader.MovieInfo +import no.iktdev.streamit.content.common.dto.reader.VideoInfo + class FileNameDeterminate(val title: String, val sanitizedName: String, val ctype: ContentType = ContentType.UNDEFINED) { enum class ContentType { @@ -8,7 +12,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp UNDEFINED } - fun getDeterminedFileName(): String { + fun getDeterminedFileName(): VideoInfo? { return when (ctype) { ContentType.MOVIE -> determineMovieFileName() ContentType.SERIE -> determineSerieFileName() @@ -16,23 +20,23 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp } } - private fun determineMovieFileName(): String { + private fun determineMovieFileName(): MovieInfo? { val movieEx = MovieEx(title, sanitizedName) val result = when { movieEx.isDefinedWithYear() != null -> sanitizedName.replace(movieEx.isDefinedWithYear()!!, "").trim() movieEx.doesContainMovieKeywords() -> sanitizedName.replace(Regex("(?i)\\s*\\(\\s*movie\\s*\\)\\s*"), "").trim() else -> title } - return result + return MovieInfo(title, result) } - private fun determineSerieFileName(): String { + private fun determineSerieFileName(): EpisodeInfo? { val serieEx = SerieEx(title, sanitizedName) val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName) val episodeNumberSingle = serieEx.findEpisodeNumber() val seasonNumber = season ?: "1" - val episodeNumber = episode ?: (episodeNumberSingle ?: return sanitizedName) + val episodeNumber = episode ?: (episodeNumberSingle ?: return null) val seasonEpisodeCombined = serieEx.getSeasonEpisodeCombined(seasonNumber, episodeNumber) val episodeTitle = serieEx.findEpisodeTitle() @@ -50,11 +54,11 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp } } else title - - return "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim() + val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim() + return EpisodeInfo(title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName) } - private fun determineUndefinedFileName(): String { + private fun determineUndefinedFileName(): VideoInfo? { val serieEx = SerieEx(title, sanitizedName) val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName) return if (sanitizedName.contains(" - ") || season != null || episode != null) { diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedDeserializers.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedDeserializers.kt deleted file mode 100644 index eb69f8c0..00000000 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedDeserializers.kt +++ /dev/null @@ -1,86 +0,0 @@ -package no.iktdev.streamit.content.reader.analyzer.encoding - -import com.google.gson.Gson -import com.google.gson.JsonObject -import no.iktdev.streamit.content.common.dto.ContentOutName -import no.iktdev.streamit.content.common.streams.AudioStream -import no.iktdev.streamit.content.common.streams.MediaStreams -import no.iktdev.streamit.content.common.streams.SubtitleStream -import no.iktdev.streamit.content.common.streams.VideoStream -import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher -import no.iktdev.streamit.library.kafka.KafkaEvents -import no.iktdev.streamit.library.kafka.dto.Message -import no.iktdev.streamit.library.kafka.dto.StatusType -import no.iktdev.streamit.library.kafka.listener.sequential.IMessageDataDeserialization - -class EncodedDeserializers { - val gson = Gson() - - val fileReceived = object : IMessageDataDeserialization { - override fun deserialize(incomingMessage: Message): FileWatcher.FileResult? { - if (incomingMessage.status.statusType != StatusType.SUCCESS) { - return null - } - return incomingMessage.dataAs(FileWatcher.FileResult::class.java) - } - } - - - val determinedFileNameReceived = object: IMessageDataDeserialization { - override fun deserialize(incomingMessage: Message): ContentOutName? { - if (incomingMessage.status.statusType != StatusType.SUCCESS) { - return null - } - return incomingMessage.dataAs(ContentOutName::class.java) - } - - } - - val mediaStreams = object : IMessageDataDeserialization { - override fun deserialize(incomingMessage: Message): MediaStreams? { - return try { - if (incomingMessage.status.statusType != StatusType.SUCCESS) { - return null - } - val jsonObject = if (incomingMessage.data is String) { - gson.fromJson(incomingMessage.data as String, JsonObject::class.java) - } else { - gson.fromJson(incomingMessage.dataAsJson(), JsonObject::class.java) - } - - val streamsJsonArray = jsonObject.getAsJsonArray("streams") - - val rstreams = streamsJsonArray.mapNotNull { streamJson -> - val streamObject = streamJson.asJsonObject - - val codecType = streamObject.get("codec_type").asString - if (streamObject.has("codec_name") && streamObject.get("codec_name").asString == "mjpeg") { - null - } else { - when (codecType) { - "video" -> gson.fromJson(streamObject, VideoStream::class.java) - "audio" -> gson.fromJson(streamObject, AudioStream::class.java) - "subtitle" -> gson.fromJson(streamObject, SubtitleStream::class.java) - else -> null //throw IllegalArgumentException("Unknown stream type: $codecType") - } - } - } - - return MediaStreams(rstreams) - } catch (e: Exception) { - e.printStackTrace() - null - } - } - - } - - fun getDeserializers(): Map> { - return mutableMapOf( - KafkaEvents.EVENT_READER_RECEIVED_FILE.event to fileReceived, - KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event to mediaStreams, - KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event to determinedFileNameReceived - ) - } - -} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt index 24f127ce..1289c8dd 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt @@ -2,30 +2,27 @@ package no.iktdev.streamit.content.reader.analyzer.encoding import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry import no.iktdev.streamit.content.common.dto.ContentOutName +import no.iktdev.streamit.content.common.dto.reader.FileResult import no.iktdev.streamit.content.common.streams.MediaStreams -import no.iktdev.streamit.content.reader.analyzer.encoding.dto.EncodeInformation import no.iktdev.streamit.content.reader.analyzer.encoding.helpers.EncodeArgumentSelector -import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher import no.iktdev.streamit.library.kafka.KafkaEvents -import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener -import no.iktdev.streamit.library.kafka.producer.DefaultProducer import org.springframework.stereotype.Service import java.io.File private val logger = KotlinLogging.logger {} @Service -class EncodedStreams : ISequentialMessageEvent { +class EncodedStreams : DefaultKafkaReader("encodedStreams"), ISequentialMessageEvent { - val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) - - final val defaultConsumer = DefaultConsumer(subId = "encodedStreams") final val mainListener = object : SequentialMessageListener( @@ -36,8 +33,8 @@ class EncodedStreams : ISequentialMessageEvent { KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event, KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event ), - deserializers = EncodedDeserializers().getDeserializers(), - this + deserializers = loadDeserializers(), + listener = this ) {} init { @@ -51,69 +48,87 @@ class EncodedStreams : ISequentialMessageEvent { override fun onAllMessagesProcessed(referenceId: String, result: Map) { logger.info { "All messages are received" } - val baseMessage = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event] - if (baseMessage == null) { - produceErrorMessage( - Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), - "Initiator message not found!" - ) - return + val fileResultEvent = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event] + val determinedFileNameEvent = result[KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event] + val streamEvent = result[KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event] + + val fileResult = if (fileResultEvent != null && fileResultEvent.isSuccessful()) { + fileResultEvent.data as FileResult? + } else null + + val outFileNameWithoutExtension = if (determinedFileNameEvent != null && determinedFileNameEvent.isSuccessful()) { + (determinedFileNameEvent.data as ContentOutName).baseName + } else fileResult?.sanitizedName + + val streams = if (streamEvent != null && streamEvent.isSuccessful()) { + streamEvent.data as MediaStreams + } else null + + createEncodeWork(referenceId, fileResult?.title, fileResult?.file, streams, outFileNameWithoutExtension) + createExtractWork(referenceId, fileResult?.title, fileResult?.file, streams, outFileNameWithoutExtension) + } + + fun createEncodeWork(referenceId: String, collection: String?, inFile: String?, streams: MediaStreams?, outFileName: String?) { + if (inFile.isNullOrBlank()) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, "No input file received"); return } - - if (result.values.any { it?.status?.statusType != StatusType.SUCCESS }) { - produceErrorMessage( - Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), - "Failed messages found!" - ) - return - } - val fileResult = baseMessage.data as FileWatcher.FileResult? - if (fileResult == null) { - produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!") - return - } - - val determinedfnm = result[KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event] - val determinedFileName = determinedfnm?.data as ContentOutName - - val outFileName = if (determinedfnm.status.statusType == StatusType.SUCCESS) - determinedFileName.baseName - else fileResult.sanitizedName.ifBlank { File(fileResult.file).nameWithoutExtension } - - - val streams = result[KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams? if (streams == null) { - produceErrorMessage(baseMessage, "No streams received!") - return + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, "No input streams received"); return + } + if (outFileName.isNullOrBlank()) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, "No output file name received!"); return + } + if (collection.isNullOrBlank()) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, "No collection provided for file!"); return } val encodeInformation = - EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName) - produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments()) - encodeInformation.getSubtitleArguments().forEach { s -> - produceEncodeMessage(baseMessage, s) + EncodeArgumentSelector(collection = collection, inputFile = inFile, streams = streams, outFileName = outFileName) + + val videoInstructions = encodeInformation.getVideoAndAudioArguments() + if (videoInstructions == null) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, "Failed to generate Video Arguments Bundle") + return } + produceMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, Message(referenceId, Status(StatusType.SUCCESS)), videoInstructions) + + } + + fun createExtractWork(referenceId: String, collection: String?, inFile: String?, streams: MediaStreams?, outFileName: String?) { + if (inFile.isNullOrBlank()) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE, referenceId, "No input file received"); return + } + if (streams == null) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE, referenceId, "No input streams received"); return + } + if (outFileName.isNullOrBlank()) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE, referenceId, "No output file name received!"); return + } + if (collection.isNullOrBlank()) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE, referenceId, "No collection provided for file!"); return + } + + val argsSelector = EncodeArgumentSelector(collection = collection, inputFile = inFile, streams = streams, outFileName = outFileName) + val items = argsSelector.getSubtitleArguments() + if (argsSelector == null || items.isEmpty()) { + produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE, referenceId, "Failed to generate Subtitle Arguments Bundle") + return + } + + argsSelector.getSubtitleArguments().forEach { + produceMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE, Message(referenceId, Status(StatusType.SUCCESS)), it) + + } + } - private fun produceErrorMessage(baseMessage: Message, reason: String) { - val message = Message( - referenceId = baseMessage.referenceId, - actionType = baseMessage.actionType, - Status(statusType = StatusType.ERROR, message = reason) + final override fun loadDeserializers(): Map> { + return DeserializerRegistry.getEventToDeserializer( + KafkaEvents.EVENT_READER_RECEIVED_FILE, + KafkaEvents.EVENT_READER_RECEIVED_STREAMS, + KafkaEvents.EVENT_READER_DETERMINED_FILENAME ) - messageProducer.sendMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED.event, message) } - private fun produceEncodeMessage(baseMessage: Message, data: EncodeInformation?) { - val message = Message( - referenceId = baseMessage.referenceId, - actionType = baseMessage.actionType, - Status(statusType = if (data != null) StatusType.SUCCESS else StatusType.IGNORED), - data = data - ) - messageProducer.sendMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED.event, message) - } - - } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/EncodeInformation.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/EncodeInformation.kt deleted file mode 100644 index d45a44f8..00000000 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/EncodeInformation.kt +++ /dev/null @@ -1,8 +0,0 @@ -package no.iktdev.streamit.content.reader.analyzer.encoding.dto - -data class EncodeInformation( - val inputFile: String, - val outFileName: String, - val language: String, - val arguments: List -) diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/SubtitleEncodeArguments.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/SubtitleEncodeArguments.kt index 3999bc7d..d4966f90 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/SubtitleEncodeArguments.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/dto/SubtitleEncodeArguments.kt @@ -11,15 +11,4 @@ class SubtitleEncodeArguments(val subtitle: SubtitleStream) { return result } - fun getFormatToCodec(): String? { - return when(subtitle.codec_name) { - "ass" -> "ass" - "subrip" -> "srt" - "webvtt", "vtt" -> "vtt" - "smi" -> "smi" - "hdmv_pgs_subtitle" -> null - else -> null - } - } - } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/helpers/EncodeArgumentSelector.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/helpers/EncodeArgumentSelector.kt index ef1e86b3..90b8bc6d 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/helpers/EncodeArgumentSelector.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/helpers/EncodeArgumentSelector.kt @@ -1,16 +1,16 @@ package no.iktdev.streamit.content.reader.analyzer.encoding.helpers -import no.iktdev.streamit.content.common.streams.AudioStream -import no.iktdev.streamit.content.common.streams.MediaStreams -import no.iktdev.streamit.content.common.streams.SubtitleStream -import no.iktdev.streamit.content.common.streams.VideoStream +import no.iktdev.exfl.using +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.content.common.streams.* import no.iktdev.streamit.content.reader.analyzer.encoding.dto.AudioEncodeArguments -import no.iktdev.streamit.content.reader.analyzer.encoding.dto.EncodeInformation import no.iktdev.streamit.content.reader.analyzer.encoding.dto.SubtitleEncodeArguments import no.iktdev.streamit.content.reader.analyzer.encoding.dto.VideoEncodeArguments import no.iktdev.streamit.content.reader.preference -class EncodeArgumentSelector(val inputFile: String, val streams: MediaStreams, val outFileName: String) { +class EncodeArgumentSelector(val collection: String, val inputFile: String, val streams: MediaStreams, val outFileName: String) { var defaultSelectedVideo: VideoStream? = defaultSelectedVideo() var defaultSelectedAudio: AudioStream? = defaultSelectedAudio() @@ -43,30 +43,40 @@ class EncodeArgumentSelector(val inputFile: String, val streams: MediaStreams, v } - fun getVideoAndAudioArguments(): EncodeInformation? { + fun getVideoAndAudioArguments(): EncodeWork? { val selectedVideo = defaultSelectedVideo val selectedAudio = getSelectedAudioBasedOnPreference() ?: defaultSelectedAudio return if (selectedVideo == null || selectedAudio == null) return null else { - EncodeInformation( - inputFile = inputFile, - outFileName = "$outFileName.mp4", - language = selectedAudio.tags.language ?: "eng", + val outFileName = "$outFileName.mp4" + val outFile = CommonConfig.outgoingContent.using(collection, outFileName) + EncodeWork( + collection = collection, + inFile = inputFile, arguments = VideoEncodeArguments(selectedVideo).getVideoArguments() + - AudioEncodeArguments(selectedAudio).getAudioArguments() + AudioEncodeArguments(selectedAudio).getAudioArguments(), + outFile = outFile.absolutePath ) } } - fun getSubtitleArguments(): List { - return streams.streams.filterIsInstance().map { - val subArgs = SubtitleEncodeArguments(it) - EncodeInformation( - inputFile = inputFile, - outFileName = "$outFileName.${subArgs.getFormatToCodec()}", - language = it.tags.language ?: "eng", - arguments = subArgs.getSubtitleArguments() + fun getSubtitleArguments(): List { + val subtitleStreams = SubtitleStreamSelector(streams.streams.filterIsInstance()) + + return subtitleStreams.getDesiredStreams().map { + val args = SubtitleEncodeArguments(it) + val language = it.tags.language ?: "eng" + val outFileName = "$outFileName.${subtitleStreams.getFormatToCodec(it.codec_name)}" + val outFile = CommonConfig.outgoingContent.using(collection, "sub", language, outFileName) + + ExtractWork( + collection = collection, + language = language, + inFile = inputFile, + outFile = outFile.absolutePath, + arguments = args.getSubtitleArguments(), ) } + } } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt new file mode 100644 index 00000000..54feb555 --- /dev/null +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt @@ -0,0 +1,116 @@ +package no.iktdev.streamit.content.reader.collector + +import kotlinx.coroutines.runBlocking +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.Downloader +import no.iktdev.streamit.content.common.SequentialKafkaReader +import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry +import no.iktdev.streamit.content.common.dto.Metadata +import no.iktdev.streamit.content.common.dto.reader.EpisodeInfo +import no.iktdev.streamit.content.common.dto.reader.FileResult +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.library.db.query.* +import no.iktdev.streamit.library.db.tables.catalog +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener +import org.springframework.stereotype.Service +import java.io.File + +@Service +class EncodedVideoConsumer: SequentialKafkaReader("collectorConsumerVideo") { + override val accept: KafkaEvents + get() = KafkaEvents.EVENT_READER_RECEIVED_FILE + override val subAccepts: List + get() = listOf( + KafkaEvents.EVENT_METADATA_OBTAINED, + KafkaEvents.EVENT_READER_DETERMINED_SERIE, + KafkaEvents.EVENT_READER_DETERMINED_MOVIE, + KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE + ) + + + val listener = object: SequentialMessageListener( + topic = CommonConfig.kafkaTopic, + consumer = defaultConsumer, + accept = accept.event, + subAccepts = subAccepts.map { it.event }, + deserializers = loadDeserializers(), + validity = 86400000, + listener =this + ) {} + + init { + listener.listen() + } + + + override fun getRequiredMessages(): List { + return listOf(accept.event) + subAccepts.map { it.event } + } + + override fun onAllMessagesProcessed(referenceId: String, result: Map) { + val metadata = result[KafkaEvents.EVENT_METADATA_OBTAINED.event]?.data as Metadata? + val fileData = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event]?.data as FileResult? + val encodeStatus = result[KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event]?.status?.statusType ?: StatusType.ERROR + val encodeWork = result[KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event]?.data as EncodeWork? + + + if (fileData == null || encodeStatus != StatusType.SUCCESS || encodeWork == null) { + return + } + + val videoFileNameWithExtension = File(encodeWork.outFile).name + + val contentType = metadata?.type ?: return + val iid = if (contentType == "movie") MovieQuery(videoFileNameWithExtension).insertAndGetId() else null + + val serieData = result[KafkaEvents.EVENT_READER_DETERMINED_SERIE.event]?.data as EpisodeInfo? + if (serieData != null) { + val success = SerieQuery(serieData.title, serieData.episode, serieData.season, fileData.title, videoFileNameWithExtension).insertAndGetStatus() + if (!success) + return + } + + val coverFile = metadata?.cover?.let { coverUrl -> + runBlocking { + try { + Downloader(coverUrl, CommonConfig.outgoingContent, fileData.title).download() + } catch (e: Exception) { + // No cover + null + } + } + } + val metaGenre = metadata.genres + val gq = GenreQuery(*metaGenre.toTypedArray()) + gq.insertAndGetIds() + val gids = gq.getIds().joinToString(",") + + val cq = CatalogQuery( + title = fileData.title, + cover = coverFile?.name, + type = contentType, + collection = fileData.title, + iid = iid, + genres = gids + ) + val cid = cq.insertAndGetId() ?: cq.getId() ?: return + if (!metadata.summary.isNullOrBlank()) { + val summary = metadata.summary ?: return + SummaryQuery( + cid = cid, + language = "eng", // TODO: Fix later, + description = summary + ) + } + } + + + + override fun loadDeserializers(): Map> { + return DeserializerRegistry.getEventToDeserializer(*subAccepts.toTypedArray()) + } +} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt new file mode 100644 index 00000000..5468abce --- /dev/null +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt @@ -0,0 +1,54 @@ +package no.iktdev.streamit.content.reader.collector + +import kotlinx.coroutines.runBlocking +import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.DefaultKafkaReader +import no.iktdev.streamit.content.common.Downloader +import no.iktdev.streamit.content.common.SequentialKafkaReader +import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry +import no.iktdev.streamit.content.common.dto.Metadata + +import no.iktdev.streamit.content.common.dto.reader.FileResult +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.library.db.query.* +import no.iktdev.streamit.library.kafka.KafkaEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener +import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization +import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener +import org.apache.kafka.clients.consumer.ConsumerRecord +import java.io.File + +class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerSubtitle") { + + private val listener = object: SimpleMessageListener( + topic = CommonConfig.kafkaTopic, + consumer = defaultConsumer, + accepts = listOf(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event) + ) { + override fun onMessageReceived(data: ConsumerRecord) { + val workResult = data.value().dataAs(ExtractWork::class.java) + if (!data.value().isSuccessful() || workResult == null) { + return + } + + val of = File(workResult.outFile) + SubtitleQuery( + title = of.nameWithoutExtension, + language = workResult.language, + collection = workResult.collection, + format = of.extension.uppercase() + ).insertAndGetStatus() + } + } + + init { + listener.listen() + } + + override fun loadDeserializers(): Map> { + return DeserializerRegistry.getEventToDeserializer(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE) + } +} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt index 80169a6a..469ba6b8 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt @@ -9,6 +9,7 @@ import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.Naming +import no.iktdev.streamit.content.common.dto.reader.FileResult import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.dto.Message @@ -106,10 +107,6 @@ class FileWatcher: FileWatcherEvents { messageProducer.sendMessage(KafkaEvents.EVENT_READER_RECEIVED_FILE.event , message) } - data class FileResult( - val file: String, - val title: String = "", - val sanitizedName: String = "" - ) + } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt index 1750a2f4..e9f8b8c1 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt @@ -5,6 +5,7 @@ import mu.KotlinLogging import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.deamon.Daemon import no.iktdev.streamit.content.common.deamon.IDaemon +import no.iktdev.streamit.content.common.dto.reader.FileResult import no.iktdev.streamit.content.reader.ReaderEnv import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher import no.iktdev.streamit.library.kafka.KafkaEvents @@ -35,7 +36,7 @@ class StreamsReader { logger.info { "Ignoring event: ${data.key()} as status is not Success!" } return } - val dataValue = data.value().dataAs(FileWatcher.FileResult::class.java) + val dataValue = data.value().dataAs(FileResult::class.java) if (dataValue == null) { logger.info { "Ignoring event: ${data.key()} as values is not of expected type!" } @@ -43,7 +44,7 @@ class StreamsReader { } logger.info { "Preparing Probe for ${dataValue.file}" } val output = mutableListOf() - val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", dataValue.file), daemonInterface = object: + val d = Daemon(executable = ReaderEnv.ffprobe, daemonInterface = object: IDaemon { override fun onOutputChanged(line: String) { output.add(line) @@ -53,7 +54,7 @@ class StreamsReader { logger.info { "Probe started for ${dataValue.file}" } } - override fun onError() { + override fun onError(code: Int) { logger.error { "An error occurred for ${dataValue.file}" } } @@ -63,7 +64,8 @@ class StreamsReader { }) val resultCode = runBlocking { - d.run() + val args = listOf("-v", "quiet", "-print_format", "json", "-show_streams", dataValue.file) + d.run(args) } val message = Message(referenceId = data.value().referenceId, status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n")) diff --git a/Reader/src/test/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedDeserializersTest.kt b/Reader/src/test/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedDeserializersTest.kt index 43c42b65..18b41a09 100644 --- a/Reader/src/test/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedDeserializersTest.kt +++ b/Reader/src/test/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedDeserializersTest.kt @@ -1,7 +1,7 @@ package no.iktdev.streamit.content.reader.analyzer +import no.iktdev.streamit.content.common.deserializers.MediaStreamsDeserializer import no.iktdev.streamit.content.common.streams.MediaStreams -import no.iktdev.streamit.content.reader.analyzer.encoding.EncodedDeserializers import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Assertions.* @@ -9,13 +9,12 @@ import org.junit.jupiter.api.Test class EncodedDeserializersTest { - val deserializer = EncodedDeserializers() val consumer = DefaultConsumer.GsonDeserializer() @Test fun testDeserializationOfMediaStreams() { val message = consumer.deserialize("demo", messageMediaStream.toByteArray()) - val result = deserializer.mediaStreams.deserialize(message) + val result = MediaStreamsDeserializer().deserialize(message) assertInstanceOf(MediaStreams::class.java, result) assertThat(result?.streams).isNotNull() assertThat(result?.streams).isNotEmpty() diff --git a/Reader/src/test/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReaderTest.kt b/Reader/src/test/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReaderTest.kt index c3aacf84..7e32146f 100644 --- a/Reader/src/test/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReaderTest.kt +++ b/Reader/src/test/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReaderTest.kt @@ -1,6 +1,7 @@ package no.iktdev.streamit.content.reader.streams import com.google.gson.Gson +import no.iktdev.streamit.content.common.dto.reader.FileResult import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher import no.iktdev.streamit.library.kafka.dto.Message import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat @@ -26,7 +27,7 @@ class StreamsReaderTest { """.trimIndent() assertDoesNotThrow { val message = Gson().fromJson(data, Message::class.java) - val result = message.dataAs(FileWatcher.FileResult::class.java) + val result = message.dataAs(FileResult::class.java) assertThat(result?.title).isEqualTo("Iseleve") } }