From 105410566c19dcd2bfab2b5d0b1170022368d5a2 Mon Sep 17 00:00:00 2001 From: Brage Date: Tue, 18 Jul 2023 01:41:38 +0200 Subject: [PATCH] Updated reader --- Reader/build.gradle.kts | 2 +- .../content/reader/fileWatcher/FileWatcher.kt | 26 +++++++++++++++++++ .../src/main/resources/application.properties | 3 ++- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index f2cafc32..eb6f229c 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -23,7 +23,7 @@ repositories { } dependencies { - implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha11") + implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha12") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("com.github.pgreze:kotlin-process:1.3.1") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt index aea2f0c3..98bab9d5 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/fileWatcher/FileWatcher.kt @@ -1,5 +1,6 @@ package no.iktdev.streamit.content.reader.fileWatcher +import com.google.gson.Gson import dev.vishna.watchservice.KWatchEvent import dev.vishna.watchservice.asWatchChannel import kotlinx.coroutines.channels.consumeEach @@ -14,13 +15,18 @@ import no.iktdev.streamit.library.kafka.KnownEvents import no.iktdev.streamit.library.kafka.Message import no.iktdev.streamit.library.kafka.Status import no.iktdev.streamit.library.kafka.StatusType +import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer +import no.iktdev.streamit.library.kafka.listener.EventMessageListener import no.iktdev.streamit.library.kafka.producer.DefaultProducer +import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.stereotype.Service +import java.io.File private val logger = KotlinLogging.logger {} @Service class FileWatcher: FileWatcherEvents { val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) + val defaultConsumer = DefaultConsumer(subId = "0a") val queue = FileWatcherQueue() @@ -45,16 +51,36 @@ class FileWatcher: FileWatcherEvents { } } } + + object : EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(KnownEvents.REQUEST_FILE_READ.event)) { + override fun onMessage(data: ConsumerRecord) { + if (data.value().status.statusType == StatusType.SUCCESS) { + if (data.value().data is String) { + val file = File(CommonConfig.incomingContent, data.value().data as String) + Coroutines.io().launch { + watcherChannel?.send(KWatchEvent( + file = file, + kind = KWatchEvent.Kind.Initialized, + tag = null + )) + } + } + } + } + + } } override fun onFileAvailable(file: PendingFile) { + logger.debug { "onFileAvailable har mottatt pendingFile ${file.file.name}" } val naming = Naming(file.file.nameWithoutExtension) val message = Message( referenceId = file.id, status = Status(StatusType.SUCCESS), data = FileResult(file = file.file.absolutePath, title = naming.guessDesiredTitle(), desiredNewName = naming.guessDesiredFileName()) ) + logger.debug { "Producing message: ${Gson().toJson(message)}" } messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_FILE.event, message) } diff --git a/Reader/src/main/resources/application.properties b/Reader/src/main/resources/application.properties index 172f9ade..ea098386 100644 --- a/Reader/src/main/resources/application.properties +++ b/Reader/src/main/resources/application.properties @@ -1,2 +1,3 @@ spring.output.ansi.enabled=always -logging.level.org.apache.kafka=WARN \ No newline at end of file +logging.level.org.apache.kafka=WARN +logging.level.root=DEBUG