From d775f91c3eed6d65564ee8e5986549e26851e800 Mon Sep 17 00:00:00 2001
From: Brage
Date: Sun, 10 Dec 2023 21:30:39 +0100
Subject: [PATCH] Autowired producer

---
 .../coordinator/Implementations.kt            | 14 +++++-
 .../input/watcher/InputDirectoryWatcher.kt    |  9 +++-
 .../common/kafka/CoordinatorProducer.kt       | 24 ---------
 .../shared/kafka/core/CoordinatorProducer.kt  | 49 +++++++++++++++++++
 .../shared/kafka/core/DefaultConsumer.kt      | 46 -----------------
 .../shared/kafka/core/DefaultProducer.kt      | 43 ----------------
 .../shared/kafka/core/KafkaEnv.kt             |  3 ++
 .../shared/kafka/core/KafkaEvents.kt          | 11 +++--
 .../shared/kafka/core/KafkaImplementation.kt  | 47 ++++++++++++++++++
 9 files changed, 125 insertions(+), 121 deletions(-)
 delete mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/kafka/CoordinatorProducer.kt
 create mode 100644 shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/CoordinatorProducer.kt
 delete mode 100644 shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultConsumer.kt
 delete mode 100644 shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultProducer.kt
 create mode 100644 shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt

diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt
index 14f8fe69..9212a982 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt
@@ -1,7 +1,17 @@
 package no.iktdev.mediaprocessing.coordinator
 
 import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
+import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
+import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
+import org.springframework.context.annotation.Configuration
+import org.springframework.context.annotation.Import
+import org.springframework.stereotype.Component
 
-/*class SocketImplemented: SocketImplementation() {
+@Configuration
+class SocketLocalInit: SocketImplementation()
 
-}*/
\ No newline at end of file
+@Configuration
+@Import(CoordinatorProducer::class, DefaultMessageListener::class)
+class KafkaLocalInit: KafkaImplementation() {
+}
\ No newline at end of file
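Note on the wiring above: because KafkaLocalInit is a @Configuration that @Imports CoordinatorProducer and DefaultMessageListener, beans in the coordinator application can now receive the producer by injection instead of constructing it against a topic. A minimal sketch of such a consumer of the bean; the ProcessReporter class is hypothetical and not part of this patch:

    package no.iktdev.mediaprocessing.coordinator

    import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
    import org.springframework.beans.factory.annotation.Autowired
    import org.springframework.stereotype.Service

    // Hypothetical example component; not part of this patch.
    // Spring injects the CoordinatorProducer imported by KafkaLocalInit.
    @Service
    class ProcessReporter(@Autowired val producer: CoordinatorProducer)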
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt
index 31887f26..1ee60ca6 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt
@@ -44,6 +44,11 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
     init {
         io.launch {
             watcherChannel.consumeEach {
+                if (it.file == SharedConfig.incomingContent) {
+                    logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" }
+                } else {
+                    logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
+                }
                 when (it.kind) {
                     Deleted -> queue.removeFromQueue(it.file, this@InputDirectoryWatcher::onFileRemoved)
                     else -> {
@@ -64,9 +69,9 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
     }
 
     override fun onFileAvailable(file: PendingFile) {
-        logger.info { "File pending availability ${file.file.name}" }
+        logger.info { "File available ${file.file.name}" }
 
-        // This sens it to coordinator to start the process
+        // This sends it to coordinator to start the process
         coordinator.startProcess(file.file, ProcessType.FLOW)
     }
 
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/kafka/CoordinatorProducer.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/kafka/CoordinatorProducer.kt
deleted file mode 100644
index 62dac523..00000000
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/kafka/CoordinatorProducer.kt
+++ /dev/null
@@ -1,24 +0,0 @@
-package no.iktdev.mediaprocessing.shared.common.kafka
-
-import no.iktdev.mediaprocessing.shared.common.SharedConfig
-import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.Message
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
-
-open class CoordinatorProducer(): DefaultProducer(SharedConfig.kafkaTopic) {
-    fun sendMessage(referenceId: String, event: KafkaEvents, data: MessageDataWrapper) {
-        super.sendMessage(event.event, Message(
-            referenceId = referenceId,
-            data = data
-        ))
-    }
-    fun sendMessage(referenceId: String, event: KafkaEvents, eventId: String, data: MessageDataWrapper) {
-        super.sendMessage(event.event, Message(
-            referenceId = referenceId,
-            eventId = eventId,
-            data = data
-        ))
-    }
-}
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/CoordinatorProducer.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/CoordinatorProducer.kt
new file mode 100644
index 00000000..380f1894
--- /dev/null
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/CoordinatorProducer.kt
@@ -0,0 +1,49 @@
+package no.iktdev.mediaprocessing.shared.kafka.core
+
+import com.google.gson.Gson
+import no.iktdev.mediaprocessing.shared.kafka.dto.Message
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.stereotype.Component
+
+@Component
+open class CoordinatorProducer() {
+
+    @Autowired
+    lateinit var kafkaTemplate: KafkaTemplate<String, String>
+
+    fun sendMessage(referenceId: String, event: KafkaEvents, data: MessageDataWrapper) {
+        send( KafkaEnv.kafkaTopic,
+            event.event, Message(
+                referenceId = referenceId,
+                data = data
+            )
+        )
+    }
+
+    fun sendMessage(referenceId: String, event: KafkaEvents, eventId: String, data: MessageDataWrapper) {
+        send( KafkaEnv.kafkaTopic,
+            event.event, Message(
+                referenceId = referenceId,
+                eventId = eventId,
+                data = data
+            )
+        )
+    }
+
+    open fun send(topic: String, key: String, message: Message) {
+        val serializedMessage = serializeMessage(message)
+        try {
+            kafkaTemplate.send(ProducerRecord(topic, key, serializedMessage))
+        } catch (e: Exception) {
+            e.printStackTrace()
+        }
+    }
+
+    private fun serializeMessage(message: Message): String {
+        val gson = Gson()
+        return gson.toJson(message)
+    }
+}
\ No newline at end of file
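For reference, a hedged usage sketch of the two sendMessage overloads above. The referenceId and eventId values are made up, and construction of MessageDataWrapper is left to the caller because its shape is not shown in this patch:

    import java.util.UUID
    import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
    import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
    import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper

    // Illustrative only; the producer instance comes from the Spring context.
    fun emit(producer: CoordinatorProducer, data: MessageDataWrapper) {
        val referenceId = UUID.randomUUID().toString()
        // Overload without an eventId: Message decides how the event id is handled (not shown here).
        producer.sendMessage(referenceId, KafkaEvents.EVENT_STORE_METADATA_PERFORMED, data)
        // Overload with an explicit eventId, e.g. to correlate a result with an earlier event.
        producer.sendMessage(referenceId, KafkaEvents.EVENT_PROCESS_COMPLETED, UUID.randomUUID().toString(), data)
    }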
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultConsumer.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultConsumer.kt
deleted file mode 100644
index dbbc6d0d..00000000
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultConsumer.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-package no.iktdev.mediaprocessing.shared.kafka.core
-
-import com.google.gson.Gson
-import com.google.gson.reflect.TypeToken
-import mu.KotlinLogging
-import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
-import no.iktdev.mediaprocessing.shared.kafka.dto.Message
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
-import org.springframework.kafka.listener.ContainerProperties.AckMode
-import kotlin.reflect.full.findAnnotation
-import java.util.UUID
-import kotlin.reflect.KClass
-
-open class DefaultConsumer(val subId: String = UUID.randomUUID().toString()) {
-    val log = KotlinLogging.logger {}
-
-    var autoCommit: Boolean = true
-    var ackModeOverride: AckMode? = null
-
-    open fun consumerFactory(): DefaultKafkaConsumerFactory<String, String> {
-        val config: MutableMap<String, Any> = HashMap()
-        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
-        config[ConsumerConfig.GROUP_ID_CONFIG] = "${KafkaEnv.consumerId}:$subId"
-        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
-        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
-        config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
-        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages
-
-        return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
-
-    }
-
-    fun consumerFactoryListener(): ConcurrentKafkaListenerContainerFactory<String, String> {
-        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
-        factory.consumerFactory = consumerFactory()
-        ackModeOverride?.let {
-            factory.containerProperties.ackMode = it
-        }
-        return factory
-    }
-}
\ No newline at end of file
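DefaultConsumer used to build both the consumer factory and the listener container factory. This patch only moves the consumer factory into KafkaImplementation (see the new file further below); a listener container factory equivalent to the deleted consumerFactoryListener() could presumably be declared as a bean along these lines. The configuration class and bean name are assumptions and not part of this patch:

    import org.springframework.beans.factory.annotation.Autowired
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory

    // Hypothetical replacement for the deleted consumerFactoryListener(); not part of this patch.
    // Assumed to live in the same package as KafkaImplementation.
    @Configuration
    open class ListenerFactoryConfig(@Autowired val kafka: KafkaImplementation) {
        @Bean
        open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
            val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
            // Reuse the consumer factory bean defined in KafkaImplementation.
            factory.consumerFactory = kafka.consumerFactory()
            return factory
        }
    }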
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultProducer.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultProducer.kt
deleted file mode 100644
index 6c9b662c..00000000
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultProducer.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-package no.iktdev.mediaprocessing.shared.kafka.core
-
-import com.google.gson.Gson
-import no.iktdev.mediaprocessing.shared.kafka.dto.Message
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.StringSerializer
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.core.ProducerFactory
-
-open class DefaultProducer(val topic: String) {
-    private var kafkaTemplate: KafkaTemplate<String, String>? = null
-
-
-    open fun createKafkaTemplate(): KafkaTemplate<String, String> {
-        val producerFactory: ProducerFactory<String, String>
-
-        val config: MutableMap<String, Any> = HashMap()
-        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
-        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-
-        producerFactory = DefaultKafkaProducerFactory(config)
-        return KafkaTemplate(producerFactory)
-    }
-
-    open fun usingKafkaTemplate(): KafkaTemplate<String, String> {
-        return kafkaTemplate ?: createKafkaTemplate().also { kafkaTemplate = it }
-    }
-
-    open fun sendMessage(key: String, message: Message) {
-        val kafkaTemplate = usingKafkaTemplate()
-        val serializedMessage = serializeMessage(message)
-        kafkaTemplate.send(ProducerRecord(topic, key, serializedMessage))
-    }
-
-    private fun serializeMessage(message: Message): String {
-        val gson = Gson()
-        return gson.toJson(message)
-    }
-}
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEnv.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEnv.kt
index bdf01c91..50a24e31 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEnv.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEnv.kt
@@ -8,5 +8,8 @@ class KafkaEnv {
         var consumerId: String = System.getenv("KAFKA_CONSUMER_ID") ?: "LibGenerated-${UUID.randomUUID()}"
         var enabled: Boolean = System.getenv("KAFKA_ENABLED").toBoolean()
         val loadMessages: String = System.getenv("KAFKA_MESSAGES_USE") ?: "earliest"
+
+        var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents"
+
     }
 }
\ No newline at end of file
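With the new KAFKA_TOPIC variable, the topic is resolved from the environment (defaulting to "contentEvents") rather than being passed to the producer as a constructor argument, which is what made the topic-less CoordinatorProducer above possible. A quick sketch of the fallback behaviour, assuming KAFKA_TOPIC is not set in the environment:

    import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv

    // Prints "contentEvents" unless the KAFKA_TOPIC environment variable overrides it.
    fun main() {
        println(KafkaEnv.kafkaTopic)
    }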
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
index 2b071c83..792c747b 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
@@ -34,8 +34,11 @@ enum class KafkaEvents(val event: String) {
     EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"),
     EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"),
 
-    EVENT_PROCESS_COMPLETED("event:process:completed"),
-}
-fun toEvent(event: String): KafkaEvents? {
-    return KafkaEvents.entries.find { it.event == event }
+    EVENT_PROCESS_COMPLETED("event:process:completed");
+
+    companion object {
+        fun toEvent(event: String): KafkaEvents? {
+            return KafkaEvents.entries.find { it.event == event }
+        }
+    }
 }
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt
new file mode 100644
index 00000000..2d7ce3aa
--- /dev/null
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt
@@ -0,0 +1,47 @@
+package no.iktdev.mediaprocessing.shared.kafka.core
+
+import mu.KotlinLogging
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.core.*
+
+@Configuration
+open class KafkaImplementation {
+    private val log = KotlinLogging.logger {}
+
+    @Bean
+    open fun admin() = KafkaAdmin(mapOf(
+        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to KafkaEnv.servers
+    ))
+
+    @Bean
+    open fun producerFactory(): ProducerFactory<String, String> {
+        val config: MutableMap<String, Any> = HashMap()
+        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
+        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        log.info { config }
+        return DefaultKafkaProducerFactory(config)
+    }
+    @Bean
+    open fun kafkaTemplate(): KafkaTemplate<String, String> {
+        return KafkaTemplate(producerFactory())
+    }
+
+    @Bean
+    open fun consumerFactory(): ConsumerFactory<String, String> {
+        val config: MutableMap<String, Any> = HashMap()
+        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
+        config[ConsumerConfig.GROUP_ID_CONFIG] = KafkaEnv.consumerId
+        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages
+        log.info { config }
+        return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
+    }
+}
\ No newline at end of file
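One call-site consequence of the KafkaEvents change above: toEvent() is no longer a top-level function, so lookups now go through the enum's companion object. A small illustrative sketch (values and the surrounding function are made up):

    import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents

    fun resolveEvents() {
        // Previously toEvent("..."), now resolved via the companion object.
        val known = KafkaEvents.toEvent("event:process:completed")   // KafkaEvents.EVENT_PROCESS_COMPLETED
        val unknown = KafkaEvents.toEvent("event:not-defined")       // null
        println("$known / $unknown")
    }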