Autowired producer

Brage 2023-12-10 21:30:39 +01:00
parent fd2483629a
commit d775f91c3e
9 changed files with 125 additions and 121 deletions

View File

@@ -1,7 +1,17 @@
package no.iktdev.mediaprocessing.coordinator
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.stereotype.Component
/*class SocketImplemented: SocketImplementation() {
    @Configuration
    class SocketLocalInit: SocketImplementation()
}*/
@Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class)
class KafkaLocalInit: KafkaImplementation() {
}
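
With `KafkaLocalInit` extending `KafkaImplementation` and importing `CoordinatorProducer` and `DefaultMessageListener`, the producer becomes an injectable bean throughout the coordinator's context. A minimal sketch of what that enables (the `ProcessStarter` service is hypothetical, not part of this commit):

import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

// Hypothetical service: any bean in the coordinator context can now
// field-inject the producer instead of constructing a producer itself.
@Service
class ProcessStarter {
    @Autowired
    lateinit var producer: CoordinatorProducer
}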

View File

@@ -44,6 +44,11 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
    init {
        io.launch {
            watcherChannel.consumeEach {
+               if (it.file == SharedConfig.incomingContent) {
+                   logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" }
+               } else {
+                   logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
+               }
                when (it.kind) {
                    Deleted -> queue.removeFromQueue(it.file, this@InputDirectoryWatcher::onFileRemoved)
                    else -> {
@@ -64,9 +69,9 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
    }
    override fun onFileAvailable(file: PendingFile) {
-       logger.info { "File pending availability ${file.file.name}" }
+       logger.info { "File available ${file.file.name}" }
-       // This sens it to coordinator to start the process
+       // This sends it to coordinator to start the process
        coordinator.startProcess(file.file, ProcessType.FLOW)
    }

View File

@@ -1,24 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.kafka
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.streamit.library.kafka.dto.Status
open class CoordinatorProducer(): DefaultProducer(SharedConfig.kafkaTopic) {
    fun sendMessage(referenceId: String, event: KafkaEvents, data: MessageDataWrapper) {
        super.sendMessage(event.event, Message(
            referenceId = referenceId,
            data = data
        ))
    }
    fun sendMessage(referenceId: String, event: KafkaEvents, eventId: String, data: MessageDataWrapper) {
        super.sendMessage(event.event, Message(
            referenceId = referenceId,
            eventId = eventId,
            data = data
        ))
    }
}

View File

@@ -0,0 +1,49 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import com.google.gson.Gson
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
open class CoordinatorProducer() {
    @Autowired
    lateinit var kafkaTemplate: KafkaTemplate<String, String>

    fun sendMessage(referenceId: String, event: KafkaEvents, data: MessageDataWrapper) {
        send(KafkaEnv.kafkaTopic,
            event.event, Message(
                referenceId = referenceId,
                data = data
            )
        )
    }

    fun sendMessage(referenceId: String, event: KafkaEvents, eventId: String, data: MessageDataWrapper) {
        send(KafkaEnv.kafkaTopic,
            event.event, Message(
                referenceId = referenceId,
                eventId = eventId,
                data = data
            )
        )
    }

    open fun send(topic: String, key: String, message: Message<MessageDataWrapper>) {
        val serializedMessage = serializeMessage(message)
        try {
            kafkaTemplate.send(ProducerRecord(topic, key, serializedMessage))
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    private fun serializeMessage(message: Message<MessageDataWrapper>): String {
        val gson = Gson()
        return gson.toJson(message)
    }
}
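
Because `CoordinatorProducer` is now a `@Component` that autowires the shared `KafkaTemplate`, callers just inject it and call `sendMessage`; Gson serialization and publishing to `KafkaEnv.kafkaTopic` happen inside `send`. A usage sketch (the `ProcessReporter` service is hypothetical):

import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.stereotype.Service
import java.util.UUID

// Hypothetical caller: the injected producer keys the record by the event
// name and publishes the Gson-serialized Message to KafkaEnv.kafkaTopic.
@Service
class ProcessReporter(val producer: CoordinatorProducer) {
    fun reportCompleted(data: MessageDataWrapper) {
        producer.sendMessage(
            referenceId = UUID.randomUUID().toString(),
            event = KafkaEvents.EVENT_PROCESS_COMPLETED,
            data = data
        )
    }
}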

View File

@@ -1,46 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ContainerProperties.AckMode
import kotlin.reflect.full.findAnnotation
import java.util.UUID
import kotlin.reflect.KClass
open class DefaultConsumer(val subId: String = UUID.randomUUID().toString()) {
    val log = KotlinLogging.logger {}
    var autoCommit: Boolean = true
    var ackModeOverride: AckMode? = null

    open fun consumerFactory(): DefaultKafkaConsumerFactory<String, String> {
        val config: MutableMap<String, Any> = HashMap()
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
        config[ConsumerConfig.GROUP_ID_CONFIG] = "${KafkaEnv.consumerId}:$subId"
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages
        return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
    }

    fun consumerFactoryListener(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        ackModeOverride?.let {
            factory.containerProperties.ackMode = it
        }
        return factory
    }
}

View File

@@ -1,43 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import com.google.gson.Gson
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
open class DefaultProducer(val topic: String) {
    private var kafkaTemplate: KafkaTemplate<String, String>? = null

    open fun createKafkaTemplate(): KafkaTemplate<String, String> {
        val producerFactory: ProducerFactory<String, String>
        val config: MutableMap<String, Any> = HashMap()
        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        producerFactory = DefaultKafkaProducerFactory(config)
        return KafkaTemplate(producerFactory)
    }

    open fun usingKafkaTemplate(): KafkaTemplate<String, String> {
        return kafkaTemplate ?: createKafkaTemplate().also { kafkaTemplate = it }
    }

    open fun sendMessage(key: String, message: Message<MessageDataWrapper>) {
        val kafkaTemplate = usingKafkaTemplate()
        val serializedMessage = serializeMessage(message)
        kafkaTemplate.send(ProducerRecord(topic, key, serializedMessage))
    }

    private fun serializeMessage(message: Message<MessageDataWrapper>): String {
        val gson = Gson()
        return gson.toJson(message)
    }
}

View File

@@ -8,5 +8,8 @@ class KafkaEnv {
        var consumerId: String = System.getenv("KAFKA_CONSUMER_ID") ?: "LibGenerated-${UUID.randomUUID()}"
        var enabled: Boolean = System.getenv("KAFKA_ENABLED").toBoolean()
        val loadMessages: String = System.getenv("KAFKA_MESSAGES_USE") ?: "earliest"
+       var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents"
    }
}
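
The new `kafkaTopic` property is what the autowired producer publishes to, falling back to `contentEvents` when `KAFKA_TOPIC` is unset. A quick sanity-check sketch (assuming `KafkaEnv` exposes these statically, as the producer's `KafkaEnv.kafkaTopic` references above suggest):

import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv

fun main() {
    // Prints "contentEvents" unless the KAFKA_TOPIC environment variable overrides it.
    println(KafkaEnv.kafkaTopic)
}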

View File

@@ -34,8 +34,11 @@ enum class KafkaEvents(val event: String) {
    EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"),
    EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"),
-   EVENT_PROCESS_COMPLETED("event:process:completed"),
-}
-fun toEvent(event: String): KafkaEvents? {
+   EVENT_PROCESS_COMPLETED("event:process:completed");
+   companion object {
+       fun toEvent(event: String): KafkaEvents? {
            return KafkaEvents.entries.find { it.event == event }
        }
    }
}
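
With `toEvent` moved from a top-level function into the enum's `companion object`, lookups hang off the type itself, and unknown names return null:

import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents

fun main() {
    // Resolve an event name back to its enum constant.
    check(KafkaEvents.toEvent("event:process:completed") == KafkaEvents.EVENT_PROCESS_COMPLETED)
    // Unmapped names yield null rather than throwing.
    check(KafkaEvents.toEvent("event:no-such-event") == null)
}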

View File

@@ -0,0 +1,47 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.*
@Configuration
open class KafkaImplementation {
    private val log = KotlinLogging.logger {}

    @Bean
    open fun admin() = KafkaAdmin(mapOf(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to KafkaEnv.servers
    ))

    @Bean
    open fun producerFactory(): ProducerFactory<String, String> {
        val config: MutableMap<String, Any> = HashMap()
        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        log.info { config }
        return DefaultKafkaProducerFactory(config)
    }

    @Bean
    open fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

    @Bean
    open fun consumerFactory(): ConsumerFactory<String, String> {
        val config: MutableMap<String, Any> = HashMap()
        config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
        config[ConsumerConfig.GROUP_ID_CONFIG] = KafkaEnv.consumerId
        config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages
        log.info { config }
        return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
    }
}
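
These `@Bean` definitions supply the `KafkaTemplate` that `CoordinatorProducer` autowires, plus a `ConsumerFactory` for listeners. A sketch of wiring a listener container on top of them (`ListenerConfig` and `TopicLogger` are hypothetical; in this codebase, the imported `DefaultMessageListener` presumably fills that role):

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.stereotype.Component

// Hypothetical wiring: build a listener container factory from the
// ConsumerFactory bean declared in KafkaImplementation.
@EnableKafka
@Configuration
open class ListenerConfig(val consumerFactory: ConsumerFactory<String, String>) {
    @Bean
    open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory
        return factory
    }
}

// Hypothetical listener on the default topic from KafkaEnv.
@Component
class TopicLogger {
    @KafkaListener(topics = ["contentEvents"])
    fun onMessage(value: String) {
        println("received: $value")
    }
}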