Updated and replaced code from lib

This commit is contained in:
Brage 2023-07-19 17:00:35 +02:00
parent ea685568ff
commit 3da6c81655
4 changed files with 130 additions and 55 deletions

View File

@ -23,7 +23,7 @@ repositories {
}
dependencies {
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha28")
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha31")
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.3.1")

View File

@ -18,14 +18,10 @@ class EncodeStreamsMessageParser {
return file.value().dataAs(FileWatcher.FileResult::class.java)
}
fun getMediaStreamsFromEvent(records: MutableList<ConsumerRecord<String, Message>>): MediaStreams? {
val streams = records.find { it.key() == KnownEvents.EVENT_READER_RECEIVED_STREAMS.event } ?: return null
if (streams.value().status.statusType != StatusType.SUCCESS || streams.value().data !is String) return null
val json = streams.value().data as String
fun getMediaStreamsFromJsonString(streamAsJson: String): MediaStreams? {
val gson = Gson()
/*return gson.fromJson(streams.value().data as String, MediaStreams::class.java)*/
val jsonObject = gson.fromJson(json, JsonObject::class.java)
val jsonObject = gson.fromJson(streamAsJson, JsonObject::class.java)
val streamsJsonArray = jsonObject.getAsJsonArray("streams")

View File

@ -0,0 +1,66 @@
package no.iktdev.streamit.content.reader.analyzer
import com.google.gson.Gson
import com.google.gson.JsonObject
import com.sun.net.httpserver.Authenticator.Success
import no.iktdev.streamit.content.common.streams.AudioStream
import no.iktdev.streamit.content.common.streams.MediaStreams
import no.iktdev.streamit.content.common.streams.SubtitleStream
import no.iktdev.streamit.content.common.streams.VideoStream
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.sequential.IMessageDataDeserialization
class EncodedDeserializers {
val gson = Gson()
val fileReceived = object : IMessageDataDeserialization<FileWatcher.FileResult> {
override fun deserialize(incomingMessage: Message): FileWatcher.FileResult? {
if (incomingMessage.status.statusType != StatusType.SUCCESS) {
return null
}
return incomingMessage.dataAs(FileWatcher.FileResult::class.java)
}
}
val mediaStreams = object : IMessageDataDeserialization<MediaStreams> {
override fun deserialize(incomingMessage: Message): MediaStreams? {
if (incomingMessage.status.statusType != StatusType.SUCCESS) {
return null
}
/*return gson.fromJson(streams.value().data as String, MediaStreams::class.java)*/
val jsonObject = gson.fromJson(incomingMessage.dataAsJson(), JsonObject::class.java)
val streamsJsonArray = jsonObject.getAsJsonArray("streams")
val rstreams = streamsJsonArray.mapNotNull { streamJson ->
val streamObject = streamJson.asJsonObject
val codecType = streamObject.get("codec_type").asString
if (streamObject.has("codec_name") && streamObject.get("codec_name").asString == "mjpeg") {
null
} else {
when (codecType) {
"video" -> gson.fromJson(streamObject, VideoStream::class.java)
"audio" -> gson.fromJson(streamObject, AudioStream::class.java)
"subtitle" -> gson.fromJson(streamObject, SubtitleStream::class.java)
else -> null //throw IllegalArgumentException("Unknown stream type: $codecType")
}
}
}
return MediaStreams(rstreams)
}
}
fun getDeserializers(): Map<String, IMessageDataDeserialization<*>> {
return mutableMapOf(
KnownEvents.EVENT_READER_RECEIVED_FILE.event to fileReceived,
KnownEvents.EVENT_READER_RECEIVED_STREAMS.event to mediaStreams
)
}
}

View File

@ -1,22 +1,23 @@
package no.iktdev.streamit.content.reader.analyzer
import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.streams.MediaStreams
import no.iktdev.streamit.content.reader.analyzer.encoding.EncodeArgumentSelector
import no.iktdev.streamit.content.reader.analyzer.encoding.dto.EncodeInformation
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
import no.iktdev.streamit.library.kafka.listener.pooled.IPooledEvents
import no.iktdev.streamit.library.kafka.listener.pooled.PooledEventMessageListener
import no.iktdev.streamit.library.kafka.dto.ActionType
import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.stereotype.Service
import java.io.File
@Service
class EncodedStreams: IPooledEvents.OnEventsReceived {
class EncodedStreams {
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
@ -24,64 +25,76 @@ class EncodedStreams: IPooledEvents.OnEventsReceived {
autoCommit = false
}
val mainListener = object : SequentialMessageListener(
topic = CommonConfig.kafkaTopic,
consumer = defaultConsumer,
accept = KnownEvents.EVENT_READER_RECEIVED_FILE.event,
subAccepts = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event),
deserializers = EncodedDeserializers().getDeserializers(),
) {
override fun areAllMessagesPresent(currentEvents: List<String>): Boolean {
val expected =
listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event)
return expected.containsAll(currentEvents)
}
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event]
if (baseMessage == null) {
produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!")
return
}
if (result.values.all { it?.status?.statusType == StatusType.SUCCESS }) {
return
}
val fileResult = baseMessage?.data as FileWatcher.FileResult?
if (fileResult == null) {
produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!")
return
}
val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension }
val streams = result[KnownEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams?
if (streams == null) {
produceErrorMessage(baseMessage, "No streams received!")
return
}
val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName)
produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments())
encodeInformation.getSubtitleArguments().forEach { s ->
produceEncodeMessage(baseMessage, s)
}
}
}
init {
val ackListener = PooledEventMessageListener(
topic = CommonConfig.kafkaTopic, consumer = defaultConsumer,
mainFilter = KnownEvents.EVENT_READER_RECEIVED_FILE.event,
subFilter = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event),
event = this
)
ackListener.listen()
mainListener.listen()
}
override fun areAllMessagesReceived(recordedEvents: MutableMap<String, StatusType>): Boolean {
val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event)
return expected.containsAll(recordedEvents.keys)
}
private fun produceErrorMessage(referenceId: String, reason: String) {
val message = Message(referenceId = referenceId,
private fun produceErrorMessage(baseMessage: Message, reason: String) {
val message = Message(
referenceId = baseMessage.referenceId,
actionType = baseMessage.actionType,
Status(statusType = StatusType.ERROR, message = reason)
)
messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message)
}
private fun produceEncodeMessage(referenceId: String, data: EncodeInformation?) {
val message = Message(referenceId = referenceId,
private fun produceEncodeMessage(baseMessage: Message, data: EncodeInformation?) {
val message = Message(
referenceId = baseMessage.referenceId,
actionType = baseMessage.actionType,
Status(statusType = if (data != null) StatusType.SUCCESS else StatusType.IGNORED),
data = data
)
messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message)
}
override fun onAllEventsConsumed(referenceId: String, records: MutableList<ConsumerRecord<String, Message>>) {
val parser = EncodeStreamsMessageParser()
val fileResult = parser.getFileNameFromEvent(records)
if (fileResult == null) {
produceErrorMessage(referenceId, "FileResult is either null or not deserializable!")
return
}
val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension }
val streams = parser.getMediaStreamsFromEvent(records)
if (streams == null) {
produceErrorMessage(referenceId, "No streams received!")
return
}
val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName)
produceEncodeMessage(referenceId, encodeInformation.getVideoAndAudioArguments())
encodeInformation.getSubtitleArguments().forEach { s ->
produceEncodeMessage(referenceId, s)
}
}
}