From fe6a3defdc6364b013220f9885cc03d318f1b7ae Mon Sep 17 00:00:00 2001 From: Brage Date: Tue, 18 Jul 2023 14:18:52 +0200 Subject: [PATCH] Updated reader --- Reader/Dockerfile | 2 ++ .../content/reader/streams/StreamsReader.kt | 15 +++++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/Reader/Dockerfile b/Reader/Dockerfile index 1c29eab6..639627ed 100644 --- a/Reader/Dockerfile +++ b/Reader/Dockerfile @@ -2,5 +2,7 @@ FROM bskjon/azuljava:17 EXPOSE 8080 RUN mkdir -p /src/input +RUN apt update -y +RUN apt install -y libav-tools COPY ./build/libs/reader.jar /usr/share/app/app.jar \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt index a07f5e13..27a07cdc 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/streams/StreamsReader.kt @@ -6,6 +6,7 @@ import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.deamon.Daemon import no.iktdev.streamit.content.common.deamon.IDaemon import no.iktdev.streamit.content.reader.ReaderEnv +import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher import no.iktdev.streamit.library.kafka.KnownEvents import no.iktdev.streamit.library.kafka.KnownEvents.EVENT_READER_RECEIVED_FILE import no.iktdev.streamit.library.kafka.Message @@ -28,31 +29,33 @@ class StreamsReader { init { object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) { override fun onMessage(data: ConsumerRecord) { + if (data.value().status.statusType != StatusType.SUCCESS) { logger.info { "Ignoring event: ${data.key()} as status is not Success!" } return - } else if (data.value().data !is String) { + } else if (data.value().data !is FileWatcher.FileResult) { logger.info { "Ignoring event: ${data.key()} as values is not of expected type!" } return } - logger.info { "Preparing Probe for ${data.value().data}" } + val dataValue = data.value().data as FileWatcher.FileResult + logger.info { "Preparing Probe for ${dataValue.file}" } val output = mutableListOf() - val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", data.value().data as String), daemonInterface = object: + val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", dataValue.file), daemonInterface = object: IDaemon { override fun onOutputChanged(line: String) { output.add(line) } override fun onStarted() { - logger.info { "Probe started for ${data.value().data}" } + logger.info { "Probe started for ${dataValue.file}" } } override fun onError() { - logger.error { "An error occurred for ${data.value().data}" } + logger.error { "An error occurred for ${dataValue.file}" } } override fun onEnded() { - logger.info { "Probe ended for ${data.value().data}" } + logger.info { "Probe ended for ${dataValue.file}" } } })