This commit is contained in:
Brage 2023-07-19 17:37:16 +02:00
parent 609ef6f41a
commit c5e4823d3a
2 changed files with 8 additions and 3 deletions

View File

@ -23,7 +23,7 @@ repositories {
} }
dependencies { dependencies {
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha33") implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha34")
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.3.1") implementation("com.github.pgreze:kotlin-process:1.3.1")

View File

@ -1,5 +1,6 @@
package no.iktdev.streamit.content.reader.analyzer package no.iktdev.streamit.content.reader.analyzer
import mu.KotlinLogging
import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.streams.MediaStreams import no.iktdev.streamit.content.common.streams.MediaStreams
import no.iktdev.streamit.content.reader.analyzer.encoding.EncodeArgumentSelector import no.iktdev.streamit.content.reader.analyzer.encoding.EncodeArgumentSelector
@ -16,6 +17,8 @@ import no.iktdev.streamit.library.kafka.producer.DefaultProducer
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
private val logger = KotlinLogging.logger {}
@Service @Service
class EncodedStreams { class EncodedStreams {
@ -34,12 +37,14 @@ class EncodedStreams {
deserializers = EncodedDeserializers().getDeserializers(), deserializers = EncodedDeserializers().getDeserializers(),
) { ) {
override fun areAllMessagesPresent(currentEvents: List<String>): Boolean { override fun areAllMessagesPresent(currentEvents: List<String>): Boolean {
val expected = val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event)
listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event) val waitingFor = expected.filter { !currentEvents.contains(it) }
logger.info { "Waiting for events: \n ${waitingFor.joinToString("\n\t")}" }
return expected.containsAll(currentEvents) return expected.containsAll(currentEvents)
} }
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) { override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
logger.info { "All messages are received" }
val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event] val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event]
if (baseMessage == null) { if (baseMessage == null) {
produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!") produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "No base message found!")