Updated reader

This commit is contained in:
Brage 2023-07-18 14:18:52 +02:00
parent 14bbb1bc71
commit fe6a3defdc
2 changed files with 11 additions and 6 deletions

View File

@ -2,5 +2,7 @@ FROM bskjon/azuljava:17
EXPOSE 8080 EXPOSE 8080
RUN mkdir -p /src/input RUN mkdir -p /src/input
RUN apt update -y
RUN apt install -y libav-tools
COPY ./build/libs/reader.jar /usr/share/app/app.jar COPY ./build/libs/reader.jar /usr/share/app/app.jar

View File

@ -6,6 +6,7 @@ import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.deamon.Daemon import no.iktdev.streamit.content.common.deamon.Daemon
import no.iktdev.streamit.content.common.deamon.IDaemon import no.iktdev.streamit.content.common.deamon.IDaemon
import no.iktdev.streamit.content.reader.ReaderEnv import no.iktdev.streamit.content.reader.ReaderEnv
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
import no.iktdev.streamit.library.kafka.KnownEvents import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.KnownEvents.EVENT_READER_RECEIVED_FILE import no.iktdev.streamit.library.kafka.KnownEvents.EVENT_READER_RECEIVED_FILE
import no.iktdev.streamit.library.kafka.Message import no.iktdev.streamit.library.kafka.Message
@ -28,31 +29,33 @@ class StreamsReader {
init { init {
object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) { object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
override fun onMessage(data: ConsumerRecord<String, Message>) { override fun onMessage(data: ConsumerRecord<String, Message>) {
if (data.value().status.statusType != StatusType.SUCCESS) { if (data.value().status.statusType != StatusType.SUCCESS) {
logger.info { "Ignoring event: ${data.key()} as status is not Success!" } logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
return return
} else if (data.value().data !is String) { } else if (data.value().data !is FileWatcher.FileResult) {
logger.info { "Ignoring event: ${data.key()} as values is not of expected type!" } logger.info { "Ignoring event: ${data.key()} as values is not of expected type!" }
return return
} }
logger.info { "Preparing Probe for ${data.value().data}" } val dataValue = data.value().data as FileWatcher.FileResult
logger.info { "Preparing Probe for ${dataValue.file}" }
val output = mutableListOf<String>() val output = mutableListOf<String>()
val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", data.value().data as String), daemonInterface = object: val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", dataValue.file), daemonInterface = object:
IDaemon { IDaemon {
override fun onOutputChanged(line: String) { override fun onOutputChanged(line: String) {
output.add(line) output.add(line)
} }
override fun onStarted() { override fun onStarted() {
logger.info { "Probe started for ${data.value().data}" } logger.info { "Probe started for ${dataValue.file}" }
} }
override fun onError() { override fun onError() {
logger.error { "An error occurred for ${data.value().data}" } logger.error { "An error occurred for ${dataValue.file}" }
} }
override fun onEnded() { override fun onEnded() {
logger.info { "Probe ended for ${data.value().data}" } logger.info { "Probe ended for ${dataValue.file}" }
} }
}) })