Updated reader

This commit is contained in:
Brage 2023-07-18 01:41:38 +02:00
parent d5d4542919
commit 105410566c
3 changed files with 29 additions and 2 deletions

View File

@ -23,7 +23,7 @@ repositories {
} }
dependencies { dependencies {
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha11") implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha12")
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT") implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.3.1") implementation("com.github.pgreze:kotlin-process:1.3.1")

View File

@ -1,5 +1,6 @@
package no.iktdev.streamit.content.reader.fileWatcher package no.iktdev.streamit.content.reader.fileWatcher
import com.google.gson.Gson
import dev.vishna.watchservice.KWatchEvent import dev.vishna.watchservice.KWatchEvent
import dev.vishna.watchservice.asWatchChannel import dev.vishna.watchservice.asWatchChannel
import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.channels.consumeEach
@ -14,13 +15,18 @@ import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.Message import no.iktdev.streamit.library.kafka.Message
import no.iktdev.streamit.library.kafka.Status import no.iktdev.streamit.library.kafka.Status
import no.iktdev.streamit.library.kafka.StatusType import no.iktdev.streamit.library.kafka.StatusType
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
import no.iktdev.streamit.library.kafka.listener.EventMessageListener
import no.iktdev.streamit.library.kafka.producer.DefaultProducer import no.iktdev.streamit.library.kafka.producer.DefaultProducer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@Service @Service
class FileWatcher: FileWatcherEvents { class FileWatcher: FileWatcherEvents {
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic) val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
val defaultConsumer = DefaultConsumer(subId = "0a")
val queue = FileWatcherQueue() val queue = FileWatcherQueue()
@ -45,16 +51,36 @@ class FileWatcher: FileWatcherEvents {
} }
} }
} }
object : EventMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(KnownEvents.REQUEST_FILE_READ.event)) {
override fun onMessage(data: ConsumerRecord<String, Message>) {
if (data.value().status.statusType == StatusType.SUCCESS) {
if (data.value().data is String) {
val file = File(CommonConfig.incomingContent, data.value().data as String)
Coroutines.io().launch {
watcherChannel?.send(KWatchEvent(
file = file,
kind = KWatchEvent.Kind.Initialized,
tag = null
))
}
}
}
}
}
} }
override fun onFileAvailable(file: PendingFile) { override fun onFileAvailable(file: PendingFile) {
logger.debug { "onFileAvailable har mottatt pendingFile ${file.file.name}" }
val naming = Naming(file.file.nameWithoutExtension) val naming = Naming(file.file.nameWithoutExtension)
val message = Message( val message = Message(
referenceId = file.id, referenceId = file.id,
status = Status(StatusType.SUCCESS), status = Status(StatusType.SUCCESS),
data = FileResult(file = file.file.absolutePath, title = naming.guessDesiredTitle(), desiredNewName = naming.guessDesiredFileName()) data = FileResult(file = file.file.absolutePath, title = naming.guessDesiredTitle(), desiredNewName = naming.guessDesiredFileName())
) )
logger.debug { "Producing message: ${Gson().toJson(message)}" }
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_FILE.event, message) messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_FILE.event, message)
} }

View File

@ -1,2 +1,3 @@
spring.output.ansi.enabled=always spring.output.ansi.enabled=always
logging.level.org.apache.kafka=WARN logging.level.org.apache.kafka=WARN
logging.level.root=DEBUG