With upate from lib
This commit is contained in:
parent
63423666c7
commit
ea685568ff
@ -23,7 +23,7 @@ repositories {
|
|||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha26")
|
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha28")
|
||||||
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
|
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
|
||||||
|
|
||||||
implementation("com.github.pgreze:kotlin-process:1.3.1")
|
implementation("com.github.pgreze:kotlin-process:1.3.1")
|
||||||
|
|||||||
@ -1,6 +1,4 @@
|
|||||||
rootProject.name = "Reader"
|
rootProject.name = "Reader"
|
||||||
|
|
||||||
include(":CommonCode")
|
include(":CommonCode")
|
||||||
project(":CommonCode").projectDir = File("../CommonCode")
|
project(":CommonCode").projectDir = File("../CommonCode")
|
||||||
|
|
||||||
include(":streamit-library-kafka")
|
|
||||||
@ -17,6 +17,7 @@ import no.iktdev.streamit.library.kafka.dto.Status
|
|||||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
||||||
import no.iktdev.streamit.library.kafka.listener.EventMessageListener
|
import no.iktdev.streamit.library.kafka.listener.EventMessageListener
|
||||||
|
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
|
||||||
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
@ -52,9 +53,8 @@ class FileWatcher: FileWatcherEvents {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object : EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(KnownEvents.REQUEST_FILE_READ.event)) {
|
object : SimpleMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(KnownEvents.REQUEST_FILE_READ.event)) {
|
||||||
override fun onMessage(data: ConsumerRecord<String, Message>) {
|
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||||
|
|
||||||
if (data.value().status.statusType == StatusType.SUCCESS) {
|
if (data.value().status.statusType == StatusType.SUCCESS) {
|
||||||
if (data.value().data is String) {
|
if (data.value().data is String) {
|
||||||
val file = File(CommonConfig.incomingContent, data.value().data as String)
|
val file = File(CommonConfig.incomingContent, data.value().data as String)
|
||||||
@ -68,7 +68,6 @@ class FileWatcher: FileWatcherEvents {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -29,10 +29,9 @@ class StreamsReader {
|
|||||||
|
|
||||||
|
|
||||||
init {
|
init {
|
||||||
object: SimpleMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(EVENT_READER_RECEIVED_FILE.event)) {
|
object: SimpleMessageListener(topic = CommonConfig.kafkaTopic, consumer = defaultConsumer, accepts = listOf(EVENT_READER_RECEIVED_FILE.event)) {
|
||||||
override fun onMessage(data: ConsumerRecord<String, Message>) {
|
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||||
logger.info { "RECORD: ${data.key()}" }
|
logger.info { "RECORD: ${data.key()}" }
|
||||||
logger.info { "Active filters: ${this.accepts.joinToString(",") }}" }
|
|
||||||
if (data.value().status.statusType != StatusType.SUCCESS) {
|
if (data.value().status.statusType != StatusType.SUCCESS) {
|
||||||
logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
|
logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
|
||||||
return
|
return
|
||||||
@ -72,20 +71,6 @@ class StreamsReader {
|
|||||||
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
|
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun filter(consumerRecord: ConsumerRecord<String, Message>): Boolean {
|
|
||||||
return shouldIgnoreMessageO(consumerRecord)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun shouldIgnoreMessageO(consumerRecord: ConsumerRecord<String, Message>): Boolean {
|
|
||||||
logger.info { "Consumer filter validating against ${consumerRecord.key()}" }
|
|
||||||
if (consumerRecord.key().isNullOrBlank()) {
|
|
||||||
logger.info { "Consumer event is null or blank, ignoring!" }
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
val isEventAccepted = accepts.any { it == consumerRecord.key() }
|
|
||||||
logger.info { "Consumer event is ${if (isEventAccepted) "found" else "not found"} within accepts" }
|
|
||||||
return !isEventAccepted
|
|
||||||
}
|
|
||||||
}.listen()
|
}.listen()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user