diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index c7b24f0c..d19b4df2 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -23,7 +23,7 @@ repositories { } dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha28") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha31") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("com.github.pgreze:kotlin-process:1.3.1") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsMessageParser.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsMessageParser.kt index 6e4920af..88d7cd25 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsMessageParser.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodeStreamsMessageParser.kt @@ -18,14 +18,10 @@ class EncodeStreamsMessageParser { return file.value().dataAs(FileWatcher.FileResult::class.java) } - fun getMediaStreamsFromEvent(records: MutableList>): MediaStreams? { - val streams = records.find { it.key() == KnownEvents.EVENT_READER_RECEIVED_STREAMS.event } ?: return null - if (streams.value().status.statusType != StatusType.SUCCESS || streams.value().data !is String) return null - val json = streams.value().data as String + fun getMediaStreamsFromJsonString(streamAsJson: String): MediaStreams? { val gson = Gson() /*return gson.fromJson(streams.value().data as String, MediaStreams::class.java)*/ - - val jsonObject = gson.fromJson(json, JsonObject::class.java) + val jsonObject = gson.fromJson(streamAsJson, JsonObject::class.java) val streamsJsonArray = jsonObject.getAsJsonArray("streams") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedDeserializers.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedDeserializers.kt new file mode 100644 index 00000000..9b5e30f9 --- /dev/null +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedDeserializers.kt @@ -0,0 +1,66 @@ +package no.iktdev.streamit.content.reader.analyzer + +import com.google.gson.Gson +import com.google.gson.JsonObject +import com.sun.net.httpserver.Authenticator.Success +import no.iktdev.streamit.content.common.streams.AudioStream +import no.iktdev.streamit.content.common.streams.MediaStreams +import no.iktdev.streamit.content.common.streams.SubtitleStream +import no.iktdev.streamit.content.common.streams.VideoStream +import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher +import no.iktdev.streamit.library.kafka.KnownEvents +import no.iktdev.streamit.library.kafka.dto.Message +import no.iktdev.streamit.library.kafka.dto.StatusType +import no.iktdev.streamit.library.kafka.listener.sequential.IMessageDataDeserialization + +class EncodedDeserializers { + val gson = Gson() + + val fileReceived = object : IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): FileWatcher.FileResult? { + if (incomingMessage.status.statusType != StatusType.SUCCESS) { + return null + } + return incomingMessage.dataAs(FileWatcher.FileResult::class.java) + } + } + + val mediaStreams = object : IMessageDataDeserialization { + override fun deserialize(incomingMessage: Message): MediaStreams? { + if (incomingMessage.status.statusType != StatusType.SUCCESS) { + return null + } + /*return gson.fromJson(streams.value().data as String, MediaStreams::class.java)*/ + val jsonObject = gson.fromJson(incomingMessage.dataAsJson(), JsonObject::class.java) + + val streamsJsonArray = jsonObject.getAsJsonArray("streams") + + val rstreams = streamsJsonArray.mapNotNull { streamJson -> + val streamObject = streamJson.asJsonObject + + val codecType = streamObject.get("codec_type").asString + if (streamObject.has("codec_name") && streamObject.get("codec_name").asString == "mjpeg") { + null + } else { + when (codecType) { + "video" -> gson.fromJson(streamObject, VideoStream::class.java) + "audio" -> gson.fromJson(streamObject, AudioStream::class.java) + "subtitle" -> gson.fromJson(streamObject, SubtitleStream::class.java) + else -> null //throw IllegalArgumentException("Unknown stream type: $codecType") + } + } + } + + return MediaStreams(rstreams) + } + + } + + fun getDeserializers(): Map> { + return mutableMapOf( + KnownEvents.EVENT_READER_RECEIVED_FILE.event to fileReceived, + KnownEvents.EVENT_READER_RECEIVED_STREAMS.event to mediaStreams + ) + } + +} \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt index 1c1c807e..e7d3782f 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/EncodedStreams.kt @@ -1,22 +1,23 @@ package no.iktdev.streamit.content.reader.analyzer import no.iktdev.streamit.content.common.CommonConfig +import no.iktdev.streamit.content.common.streams.MediaStreams import no.iktdev.streamit.content.reader.analyzer.encoding.EncodeArgumentSelector import no.iktdev.streamit.content.reader.analyzer.encoding.dto.EncodeInformation +import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher import no.iktdev.streamit.library.kafka.KnownEvents import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer -import no.iktdev.streamit.library.kafka.listener.pooled.IPooledEvents -import no.iktdev.streamit.library.kafka.listener.pooled.PooledEventMessageListener +import no.iktdev.streamit.library.kafka.dto.ActionType +import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener import no.iktdev.streamit.library.kafka.producer.DefaultProducer -import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.stereotype.Service import java.io.File @Service -class EncodedStreams: IPooledEvents.OnEventsReceived { +class EncodedStreams { val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) @@ -24,64 +25,76 @@ class EncodedStreams: IPooledEvents.OnEventsReceived { autoCommit = false } + + val mainListener = object : SequentialMessageListener( + topic = CommonConfig.kafkaTopic, + consumer = defaultConsumer, + accept = KnownEvents.EVENT_READER_RECEIVED_FILE.event, + subAccepts = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event), + deserializers = EncodedDeserializers().getDeserializers(), + ) { + override fun areAllMessagesPresent(currentEvents: List): Boolean { + val expected = + listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event) + return expected.containsAll(currentEvents) + } + + override fun onAllMessagesProcessed(referenceId: String, result: Map) { + val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event] + if (baseMessage == null) { + produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!") + return + } + + if (result.values.all { it?.status?.statusType == StatusType.SUCCESS }) { + + return + } + val fileResult = baseMessage?.data as FileWatcher.FileResult? + if (fileResult == null) { + produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!") + return + } + + val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension } + + val streams = result[KnownEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams? + if (streams == null) { + produceErrorMessage(baseMessage, "No streams received!") + return + } + + val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName) + produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments()) + encodeInformation.getSubtitleArguments().forEach { s -> + produceEncodeMessage(baseMessage, s) + } + } + } + init { - val ackListener = PooledEventMessageListener( - topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, - mainFilter = KnownEvents.EVENT_READER_RECEIVED_FILE.event, - subFilter = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event), - event = this - ) - ackListener.listen() + mainListener.listen() } - override fun areAllMessagesReceived(recordedEvents: MutableMap): Boolean { - val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event) - return expected.containsAll(recordedEvents.keys) - } - private fun produceErrorMessage(referenceId: String, reason: String) { - val message = Message(referenceId = referenceId, + private fun produceErrorMessage(baseMessage: Message, reason: String) { + val message = Message( + referenceId = baseMessage.referenceId, + actionType = baseMessage.actionType, Status(statusType = StatusType.ERROR, message = reason) ) messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message) } - private fun produceEncodeMessage(referenceId: String, data: EncodeInformation?) { - val message = Message(referenceId = referenceId, + private fun produceEncodeMessage(baseMessage: Message, data: EncodeInformation?) { + val message = Message( + referenceId = baseMessage.referenceId, + actionType = baseMessage.actionType, Status(statusType = if (data != null) StatusType.SUCCESS else StatusType.IGNORED), data = data ) messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message) } - override fun onAllEventsConsumed(referenceId: String, records: MutableList>) { - val parser = EncodeStreamsMessageParser() - val fileResult = parser.getFileNameFromEvent(records) - if (fileResult == null) { - produceErrorMessage(referenceId, "FileResult is either null or not deserializable!") - return - } - val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension } - val streams = parser.getMediaStreamsFromEvent(records) - if (streams == null) { - produceErrorMessage(referenceId, "No streams received!") - return - } - - val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName) - produceEncodeMessage(referenceId, encodeInformation.getVideoAndAudioArguments()) - encodeInformation.getSubtitleArguments().forEach { s -> - produceEncodeMessage(referenceId, s) - } - - - - - } - - - - - } \ No newline at end of file