This commit is contained in:
Brage 2023-07-19 17:57:07 +02:00
parent c5e4823d3a
commit a7fb4e6b45
3 changed files with 49 additions and 47 deletions

View File

@ -26,8 +26,8 @@ jobs:
build-encode: build-encode:
needs: build-commoncode needs: build-commoncode
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: false
if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }} #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }}
steps: steps:
- name: Checkout repository - name: Checkout repository

View File

@ -23,7 +23,7 @@ repositories {
} }
dependencies { dependencies {
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha34") implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha35")
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.3.1") implementation("com.github.pgreze:kotlin-process:1.3.1")

View File

@ -12,6 +12,7 @@ import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
import no.iktdev.streamit.library.kafka.dto.ActionType import no.iktdev.streamit.library.kafka.dto.ActionType
import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent
import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
import no.iktdev.streamit.library.kafka.producer.DefaultProducer import no.iktdev.streamit.library.kafka.producer.DefaultProducer
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -20,62 +21,23 @@ import java.io.File
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@Service @Service
class EncodedStreams { class EncodedStreams : ISequentialMessageEvent {
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
val defaultConsumer = DefaultConsumer(subId = "encodedStreams").apply { final val defaultConsumer = DefaultConsumer(subId = "encodedStreams").apply {
autoCommit = false autoCommit = false
} }
val mainListener = object : SequentialMessageListener( final val mainListener = object : SequentialMessageListener(
topic = CommonConfig.kafkaTopic, topic = CommonConfig.kafkaTopic,
consumer = defaultConsumer, consumer = defaultConsumer,
accept = KnownEvents.EVENT_READER_RECEIVED_FILE.event, accept = KnownEvents.EVENT_READER_RECEIVED_FILE.event,
subAccepts = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event), subAccepts = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event),
deserializers = EncodedDeserializers().getDeserializers(), deserializers = EncodedDeserializers().getDeserializers(),
) { this
override fun areAllMessagesPresent(currentEvents: List<String>): Boolean { ) {}
val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event)
val waitingFor = expected.filter { !currentEvents.contains(it) }
logger.info { "Waiting for events: \n ${waitingFor.joinToString("\n\t")}" }
return expected.containsAll(currentEvents)
}
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
logger.info { "All messages are received" }
val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event]
if (baseMessage == null) {
produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!")
return
}
if (result.values.all { it?.status?.statusType == StatusType.SUCCESS }) {
return
}
val fileResult = baseMessage?.data as FileWatcher.FileResult?
if (fileResult == null) {
produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!")
return
}
val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension }
val streams = result[KnownEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams?
if (streams == null) {
produceErrorMessage(baseMessage, "No streams received!")
return
}
val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName)
produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments())
encodeInformation.getSubtitleArguments().forEach { s ->
produceEncodeMessage(baseMessage, s)
}
}
}
init { init {
mainListener.listen() mainListener.listen()
@ -101,5 +63,45 @@ class EncodedStreams {
messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message) messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message)
} }
override fun areAllMessagesPresent(currentEvents: List<String>): Boolean {
val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event)
val waitingFor = expected.filter { !currentEvents.contains(it) }
logger.info { "Waiting for events: \n ${waitingFor.joinToString("\n\t")}" }
return expected.containsAll(currentEvents)
}
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
logger.info { "All messages are received" }
val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event]
if (baseMessage == null) {
produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!")
return
}
if (result.values.all { it?.status?.statusType == StatusType.SUCCESS }) {
return
}
val fileResult = baseMessage.data as FileWatcher.FileResult?
if (fileResult == null) {
produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!")
return
}
val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension }
val streams = result[KnownEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams?
if (streams == null) {
produceErrorMessage(baseMessage, "No streams received!")
return
}
val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName)
produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments())
encodeInformation.getSubtitleArguments().forEach { s ->
produceEncodeMessage(baseMessage, s)
}
}
} }