From ce9e1c8271af3cb73a168ee2fb9960859670283c Mon Sep 17 00:00:00 2001
From: Brage
Date: Wed, 19 Jul 2023 01:36:22 +0200
Subject: [PATCH] Lib update + test iml

---
 Reader/build.gradle.kts                            |  2 +-
 .../content/reader/streams/StreamsReader.kt        | 97 +++++++++++--------
 2 files changed, 55 insertions(+), 44 deletions(-)

diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts
index 4e14e9bd..fc8f829c 100644
--- a/Reader/build.gradle.kts
+++ b/Reader/build.gradle.kts
@@ -23,7 +23,7 @@ repositories {
 }
 
 dependencies {
-    implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha21")
+    implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha22")
     implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
 
     implementation("com.github.pgreze:kotlin-process:1.3.1")
diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt
index d64d6bb4..8664fc32 100644
--- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt
+++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt
@@ -15,6 +15,7 @@ import no.iktdev.streamit.library.kafka.dto.Status
 import no.iktdev.streamit.library.kafka.dto.StatusType
 import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
 import no.iktdev.streamit.library.kafka.listener.EventMessageListener
+import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
 import no.iktdev.streamit.library.kafka.producer.DefaultProducer
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.springframework.stereotype.Service
@@ -24,52 +25,62 @@ private val logger = KotlinLogging.logger {}
 class StreamsReader {
     val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
-    val defaultConsumer = DefaultConsumer(subId = "streamReader").apply {
-        // autoCommit = false
-    }
+    val defaultConsumer = DefaultConsumer(subId = "streamReader")
+
+
     init {
-        object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
+        val listener = StreamReaderListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event))
+        listener.listen()
+
+        /*object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
             override fun onMessage(data: ConsumerRecord<String, Message>) {
-                logger.info { "RECORD: ${data.key()}" }
-                logger.info { "Active filters: ${this.acceptsEvents.joinToString(",") }}" }
-                if (data.value().status.statusType != StatusType.SUCCESS) {
-                    logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
-                    return
-                }
-                val dataValue = data.value().dataAs(FileWatcher.FileResult::class.java)
-                if (dataValue == null) {
-                    logger.info { "Ignoring event: ${data.key()} as values is not of expected type!" }
-                    return
-                }
-                logger.info { "Preparing Probe for ${dataValue.file}" }
-                val output = mutableListOf<String>()
-                val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", dataValue.file), daemonInterface = object:
-                    IDaemon {
-                    override fun onOutputChanged(line: String) {
-                        output.add(line)
-                    }
-
-                    override fun onStarted() {
-                        logger.info { "Probe started for ${dataValue.file}" }
-                    }
-
-                    override fun onError() {
-                        logger.error { "An error occurred for ${dataValue.file}" }
-                    }
-
-                    override fun onEnded() {
-                        logger.info { "Probe ended for ${dataValue.file}" }
-                    }
-
-                })
-                val resultCode = runBlocking {
-                    d.run()
-                }
-
-                val message = Message(status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n"))
-                messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
             }
-        }.listen()
+        }.listen()*/
+    }
+
+    inner class StreamReaderListener(topic: String, consumer: DefaultConsumer, accepts: List<String>): SimpleMessageListener(topic = topic, consumer = consumer, accepts = accepts) {
+        override fun onMessage(data: ConsumerRecord<String, Message>) {
+            logger.info { "RECORD: ${data.key()}" }
+            logger.info { "Active filters: ${this.accepts.joinToString(",") }}" }
+            if (data.value().status.statusType != StatusType.SUCCESS) {
+                logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
+                return
+            }
+            val dataValue = data.value().dataAs(FileWatcher.FileResult::class.java)
+
+            if (dataValue == null) {
+                logger.info { "Ignoring event: ${data.key()} as values is not of expected type!" }
+                return
+            }
+            logger.info { "Preparing Probe for ${dataValue.file}" }
+            val output = mutableListOf<String>()
+            val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", dataValue.file), daemonInterface = object:
+                IDaemon {
+                override fun onOutputChanged(line: String) {
+                    output.add(line)
+                }
+
+                override fun onStarted() {
+                    logger.info { "Probe started for ${dataValue.file}" }
+                }
+
+                override fun onError() {
+                    logger.error { "An error occurred for ${dataValue.file}" }
+                }
+
+                override fun onEnded() {
+                    logger.info { "Probe ended for ${dataValue.file}" }
+                }
+
+            })
+            val resultCode = runBlocking {
+                d.run()
+            }
+
+            val message = Message(status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n"))
+            messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
+        }
+    }
 }
\ No newline at end of file