Lib update test

This commit is contained in:
Brage 2023-07-19 03:02:52 +02:00
parent 894d8717c0
commit f245f14291
3 changed files with 42 additions and 50 deletions

View File

@ -23,7 +23,7 @@ repositories {
} }
dependencies { dependencies {
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha23") implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha26")
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.3.1") implementation("com.github.pgreze:kotlin-process:1.3.1")

View File

@ -3,3 +3,4 @@ rootProject.name = "Reader"
include(":CommonCode") include(":CommonCode")
project(":CommonCode").projectDir = File("../CommonCode") project(":CommonCode").projectDir = File("../CommonCode")
include(":streamit-library-kafka")

View File

@ -29,58 +29,49 @@ class StreamsReader {
init { init {
val listener = StreamReaderListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) object: SimpleMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
listener.listen()
/*object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
override fun onMessage(data: ConsumerRecord<String, Message>) { override fun onMessage(data: ConsumerRecord<String, Message>) {
logger.info { "RECORD: ${data.key()}" }
logger.info { "Active filters: ${this.accepts.joinToString(",") }}" }
if (data.value().status.statusType != StatusType.SUCCESS) {
logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
return
}
val dataValue = data.value().dataAs(FileWatcher.FileResult::class.java)
if (dataValue == null) {
logger.info { "Ignoring event: ${data.key()} as values is not of expected type!" }
return
}
logger.info { "Preparing Probe for ${dataValue.file}" }
val output = mutableListOf<String>()
val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", dataValue.file), daemonInterface = object:
IDaemon {
override fun onOutputChanged(line: String) {
output.add(line)
}
override fun onStarted() {
logger.info { "Probe started for ${dataValue.file}" }
}
override fun onError() {
logger.error { "An error occurred for ${dataValue.file}" }
}
override fun onEnded() {
logger.info { "Probe ended for ${dataValue.file}" }
}
})
val resultCode = runBlocking {
d.run()
}
val message = Message(status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n"))
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
} }
}.listen()*/ }.listen()
} }
inner class StreamReaderListener(topic: String, consumer: DefaultConsumer, accepts: List<String>): SimpleMessageListener(topic = topic, consumer = consumer, accepts = accepts) {
override fun onMessage(data: ConsumerRecord<String, Message>) {
logger.info { "RECORD: ${data.key()}" }
logger.info { "Active filters: ${this.accepts.joinToString(",") }}" }
if (data.value().status.statusType != StatusType.SUCCESS) {
logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
return
}
val dataValue = data.value().dataAs(FileWatcher.FileResult::class.java)
if (dataValue == null) {
logger.info { "Ignoring event: ${data.key()} as values is not of expected type!" }
return
}
logger.info { "Preparing Probe for ${dataValue.file}" }
val output = mutableListOf<String>()
val d = Daemon(executable = ReaderEnv.ffprobe, parameters = listOf("-v", "quiet", "-print_format", "json", "-show_streams", dataValue.file), daemonInterface = object:
IDaemon {
override fun onOutputChanged(line: String) {
output.add(line)
}
override fun onStarted() {
logger.info { "Probe started for ${dataValue.file}" }
}
override fun onError() {
logger.error { "An error occurred for ${dataValue.file}" }
}
override fun onEnded() {
logger.info { "Probe ended for ${dataValue.file}" }
}
})
val resultCode = runBlocking {
d.run()
}
val message = Message(status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n"))
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
}
}
} }