Lib update + test iml

This commit is contained in:
Brage 2023-07-19 01:36:22 +02:00
parent b0830d8cb4
commit ce9e1c8271
2 changed files with 55 additions and 44 deletions

View File

@ -23,7 +23,7 @@ repositories {
}
dependencies {
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha21")
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha22")
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.3.1")

View File

@ -15,6 +15,7 @@ import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
import no.iktdev.streamit.library.kafka.listener.EventMessageListener
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.stereotype.Service
@ -24,14 +25,24 @@ private val logger = KotlinLogging.logger {}
class StreamsReader {
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
val defaultConsumer = DefaultConsumer(subId = "streamReader").apply {
// autoCommit = false
}
val defaultConsumer = DefaultConsumer(subId = "streamReader")
init {
object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
val listener = StreamReaderListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event))
listener.listen()
/*object: EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
override fun onMessage(data: ConsumerRecord<String, Message>) {
}
}.listen()*/
}
inner class StreamReaderListener(topic: String, consumer: DefaultConsumer, accepts: List<String>): SimpleMessageListener(topic = topic, consumer = consumer, accepts = accepts) {
override fun onMessage(data: ConsumerRecord<String, Message>) {
logger.info { "RECORD: ${data.key()}" }
logger.info { "Active filters: ${this.acceptsEvents.joinToString(",") }}" }
logger.info { "Active filters: ${this.accepts.joinToString(",") }}" }
if (data.value().status.statusType != StatusType.SUCCESS) {
logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
return
@ -70,6 +81,6 @@ class StreamsReader {
val message = Message(status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n"))
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
}
}.listen()
}
}