Using Sequential again

This commit is contained in:
Brage 2023-07-25 19:41:52 +02:00
parent 73c97dd73a
commit 69796409b7
2 changed files with 50 additions and 49 deletions

View File

@ -3,7 +3,10 @@ package no.iktdev.streamit.content.reader.analyzer.encoding
import mu.KotlinLogging
import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.DefaultKafkaReader
import no.iktdev.streamit.content.common.deserializers.ContentOutNameDeserializer
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
import no.iktdev.streamit.content.common.deserializers.FileResultDeserializer
import no.iktdev.streamit.content.common.deserializers.MediaStreamsDeserializer
import no.iktdev.streamit.content.common.dto.ContentOutName
import no.iktdev.streamit.content.common.dto.reader.FileResult
import no.iktdev.streamit.content.common.streams.MediaStreams
@ -14,8 +17,8 @@ import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener
import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent
import no.iktdev.streamit.library.kafka.listener.collector.NeedyMessageListener
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful
import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent
import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
import org.apache.kafka.clients.consumer.ConsumerRecord
@ -25,24 +28,22 @@ import java.io.File
private val logger = KotlinLogging.logger {}
@Service
class EncodedStreams : DefaultKafkaReader("streamSelector"), ICollectedMessagesEvent<ResultCollection> {
class EncodedStreams : DefaultKafkaReader("streamSelector"), ISequentialMessageEvent {
val collectionListener = NeedyMessageListener<ResultCollection>(
val listener = object : SequentialMessageListener(
topic = CommonConfig.kafkaTopic,
consumer = defaultConsumer,
initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE,
completionEvent = KafkaEvents.EVENT_READER_DETERMINED_FILENAME,
needs = listOf(
KafkaEvents.EVENT_READER_RECEIVED_FILE,
KafkaEvents.EVENT_READER_RECEIVED_STREAMS,
KafkaEvents.EVENT_READER_DETERMINED_FILENAME
accept = KafkaEvents.EVENT_READER_RECEIVED_FILE.event,
subAccepts = listOf(
KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event,
KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event,
),
listener = this,
eventCollectionClass = ResultCollection::class.java
)
deserializers = this.loadDeserializers()
) {}
init {
collectionListener.listen()
listener.listen()
}
fun createEncodeWork(referenceId: String, collection: String?, inFile: String?, streams: MediaStreams?, outFileName: String?) {
@ -108,29 +109,42 @@ class EncodedStreams : DefaultKafkaReader("streamSelector"), ICollectedMessagesE
)
}
override fun onCollectionCompleted(collection: ResultCollection?) {
override fun getRequiredMessages(): List<String> {
return listener.subAccepts + listOf(listener.accept)
}
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
logger.info { "Collection received" }
val collectedEvents = collection?.getRecords()?.map { it.key() }?.toList() ?: emptyList()
if (collectedEvents.isEmpty()) {
if (result.keys.isEmpty()) {
logger.error { "\nConsumer $subId collected: is null or empty!" }
} else {
logger.info { "\nConsumer $subId collected:\n ${collectedEvents.joinToString("\n\t")}" }
logger.info { "\nConsumer $subId collected:\n ${result.keys.joinToString("\n\t")}" }
}
val referenceId = collection?.getRecords()?.firstOrNull()?.value()?.referenceId
if (referenceId == null) {
logger.warn { "referenceId is null, throwing collection" }
return
}
val outFileNameWithoutExtension: String? = if (collection.getFileName() != null) {
collection.getFileName()?.baseName
val outFileNameWithoutExtension: String? = if (getFileName(result) != null) {
getFileName(result)?.baseName
} else {
logger.info { "Getting filename from ${KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event} resulted in null. Falling back to sanitized name" }
collection.getFileResult()?.sanitizedName
getFileResult(result)?.sanitizedName
}
createEncodeWork(referenceId, collection.getFileResult()?.title, collection.getFileResult()?.file, collection.getStreams(), outFileNameWithoutExtension)
createExtractWork(referenceId, collection.getFileResult()?.title, collection.getFileResult()?.file, collection.getStreams(), outFileNameWithoutExtension)
val fileResult = getFileResult(result)
createEncodeWork(referenceId, fileResult?.title, fileResult?.file, getStreams(result), outFileNameWithoutExtension)
createExtractWork(referenceId, fileResult?.title, fileResult?.file, getStreams(result), outFileNameWithoutExtension)
}
fun getFileResult(result: Map<String, Message?>): FileResult? {
val record = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event] ?: return null
return FileResultDeserializer().deserializeIfSuccessful(record)
}
fun getFileName(result: Map<String, Message?>): ContentOutName? {
val record = result[KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event] ?: return null
return ContentOutNameDeserializer().deserializeIfSuccessful(record)
}
fun getStreams(result: Map<String, Message?>): MediaStreams? {
val record = result[KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event] ?: return null
return MediaStreamsDeserializer().deserializeIfSuccessful(record)
}
}

View File

@ -17,18 +17,5 @@ class ResultCollection: DefaultEventCollection() {
fun getFirstOrNull(events: KafkaEvents): ConsumerRecord<String, Message>? {
return getRecords().firstOrNull { it.key() == events.event }
}
fun getFileResult(): FileResult? {
val record = getRecords().firstOrNull { it.key() == KafkaEvents.EVENT_READER_RECEIVED_FILE.event } ?: return null
return FileResultDeserializer().deserializeIfSuccessful(record.value())
}
fun getFileName(): ContentOutName? {
val record = getFirstOrNull(KafkaEvents.EVENT_READER_DETERMINED_FILENAME) ?: return null
return ContentOutNameDeserializer().deserializeIfSuccessful(record.value())
}
fun getStreams(): MediaStreams? {
val record = getFirstOrNull(KafkaEvents.EVENT_READER_RECEIVED_STREAMS) ?: return null
return MediaStreamsDeserializer().deserializeIfSuccessful(record.value())
}
}