Migrated to shared codebase for coordinator

Brage 2024-03-13 21:26:22 +01:00
parent d38003f7f9
commit 5c4e8f7de8
7 changed files with 114 additions and 159 deletions

View File

@@ -4,14 +4,20 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
@@ -19,36 +25,16 @@ import javax.annotation.PostConstruct
@Service
@EnableScheduling
class Coordinator() {
@Autowired
private lateinit var producer: CoordinatorProducer
@Autowired
private lateinit var listener: DefaultMessageListener
class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>() {
private val log = KotlinLogging.logger {}
val listeners = EventBasedMessageListener()
val io = Coroutines.io()
override val listeners = PersistentEventProcessBasedMessageListener()
fun readAllAvailableInQueue() {
val messages = PersistentDataReader().getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
createTasksBasedOnEventsAndPersistance(referenceId = it.referenceId, eventId = it.eventId, messages)
}
}
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages)
}
fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List<PersistentProcessDataMessage>) {
override fun createTasksBasedOnEventsAndPersistence(
referenceId: String,
eventId: String,
messages: List<PersistentProcessDataMessage>
) {
val triggered = messages.find { it.eventId == eventId }
if (triggered == null) {
log.error { "Could not find $eventId in provided messages" }
@@ -57,30 +43,44 @@ class Coordinator() {
listeners.forwardEventMessageToListeners(triggered, messages)
}
override fun onCoordinatorReady() {
readAllAvailableInQueue()
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
} else {
io.launch {
delay(500)
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
}
}
fun readAllAvailableInQueue() {
val messages = PersistentDataReader().getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages)
}
}
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}
val processKafkaEvents = listOf(
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
)
@PostConstruct
fun onReady() {
io.launch {
listener.onMessageReceived = { event ->
if (event.key in processKafkaEvents) {
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}!" }
} else
readAllMessagesFor(event.value.referenceId, event.value.eventId)
} else if (event.key in listOf(KafkaEvents.EVENT_WORK_ENCODE_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED, KafkaEvents.EVENT_WORK_ENCODE_SKIPPED)) {
readAllAvailableInQueue()
} else {
log.debug { "Skipping ${event.key}" }
}
}
listener.listen(KafkaEnv.kafkaTopic)
}
readAllAvailableInQueue()
}
}
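
Note: the shared base class itself is not part of this diff. Going by the members the new Coordinator overrides, a minimal sketch of the contract CoordinatorBase in shared.common would have to expose looks like the following; the actual class may carry more (Kafka wiring, the io scope), so treat every body here as an assumption rather than the real implementation.

package no.iktdev.mediaprocessing.shared.common

import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper

abstract class CoordinatorBase<V, L : EventBasedMessageListener<V>> {
    // Each concrete coordinator supplies its own listener registry.
    abstract val listeners: L

    // Called once the consumer is attached; Coordinator drains the queue here.
    abstract fun onCoordinatorReady()

    // Raw consumer callback; Coordinator persists the record, then re-dispatches.
    abstract fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>)

    // Decides which task creators may run for the event that triggered this call.
    abstract fun createTasksBasedOnEventsAndPersistence(
        referenceId: String,
        eventId: String,
        messages: List<V>
    )
}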

View File

@@ -1,37 +0,0 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
class EventBasedMessageListener {
private val listeners: MutableList<Tasks> = mutableListOf()
fun add(produces: KafkaEvents, listener: TaskCreatorListener) {
listeners.add(Tasks(produces, listener))
}
fun add(task: Tasks) {
listeners.add(task)
}
private fun waitingListeners(events: List<PersistentProcessDataMessage>): List<Tasks> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
fun forwardEventMessageToListeners(newEvent: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>) {
waitingListeners(events).forEach {
try {
it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}
data class Tasks(
val producesEvent: KafkaEvents,
val taskHandler: TaskCreatorListener
)

View File

@@ -2,7 +2,10 @@ package no.iktdev.mediaprocessing.processer
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@@ -11,84 +14,35 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.simp.SimpMessagingTemplate
import javax.annotation.PostConstruct
abstract class TaskCreator: TaskCreatorListener {
private val log = KotlinLogging.logger {}
abstract class TaskCreator(coordinator: Coordinator) :
TaskCreatorImpl<Coordinator, PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>(coordinator) {
@Autowired
lateinit var producer: CoordinatorProducer
@Autowired
lateinit var coordinator: Coordinator
@Autowired
lateinit var socketMessage: SimpMessagingTemplate
open val requiredEvents: List<KafkaEvents> = listOf()
open fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
override fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
val currentEvents = events.map { it.event }
return requiredEvents.all { currentEvents.contains(it) }
}
open fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
override fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
return failed.isEmpty()
}
open fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
return event.event == singleOne
}
abstract fun getListener(): Tasks
/*override fun getListener(): Tasks {
val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter)
}*/
open fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return listOf {
isPrerequisiteEventsOk(events)
}
}
private val context: MutableMap<String, Any> = mutableMapOf()
private val context_key_reference = "reference"
private val context_key_producesEvent = "event"
final override fun onEventReceived(referenceId: String, event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>) {
context[context_key_reference] = referenceId
getListener().producesEvent.let {
context[context_key_producesEvent] = it
}
if (prerequisitesRequired(events).all { it.invoke() }) {
val result = onProcessEvents(event, events)
if (result != null) {
log.info { "Event handled on ${this::class.simpleName} ${event.eventId} is: \nSOM\n${Gson().toJson(result)}\nEOM" }
onResult(result)
}
} else {
log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" }
}
}
abstract fun onProcessEvents(event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>): MessageDataWrapper?
private fun onResult(data: MessageDataWrapper) {
producer.sendMessage(
referenceId = context[context_key_reference] as String,
event = context[context_key_producesEvent] as KafkaEvents,
data = data
)
}
@PostConstruct
fun postConstruct() {
coordinator.listeners.add(getListener())
override fun prerequisiteRequired(event: PersistentProcessDataMessage): List<() -> Boolean> {
return listOf()
}
}
fun interface Prerequisite {
fun execute(value: Any): Boolean
}
interface TaskCreatorListener {
fun onEventReceived(referenceId: String, event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>): Unit
}
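
The generic TaskCreatorImpl that TaskCreator now extends lives in shared.common.tasks and is likewise absent from this diff. Inferred from the overrides above, a sketch of its surface could look like this; the final onEventReceived/onResult orchestration the old TaskCreator carried presumably moved into it as well, and everything below is an assumption, not the shared library's actual code.

package no.iktdev.mediaprocessing.shared.common.tasks

import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper

abstract class TaskCreatorImpl<C, V, L : EventBasedMessageListener<V>>(
    open var coordinator: C
) : ITaskCreatorListener<V> {
    // The event this creator emits; EncodeService and ExtractService override it.
    abstract val producesEvent: KafkaEvents

    // Gate checks the coordinator evaluates before onProcessEvents runs.
    abstract fun isPrerequisiteEventsOk(events: List<V>): Boolean
    abstract fun isPrerequisiteDataPresent(events: List<V>): Boolean
    abstract fun isEventOfSingle(event: V, singleOne: KafkaEvents): Boolean
    abstract fun prerequisitesRequired(events: List<V>): List<() -> Boolean>
    abstract fun prerequisiteRequired(event: V): List<() -> Boolean>

    // Produces the outgoing payload, or null when nothing should be emitted.
    abstract fun onProcessEvents(event: V, events: List<V>): MessageDataWrapper?
}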

View File

@@ -0,0 +1,32 @@
package no.iktdev.mediaprocessing.processer.coordination
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
class PersistentEventProcessBasedMessageListener: EventBasedMessageListener<PersistentProcessDataMessage>() {
override fun listenerWantingEvent(
event: PersistentProcessDataMessage,
waitingListeners: List<Tasks<PersistentProcessDataMessage>>
): List<Tasks<PersistentProcessDataMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentProcessDataMessage,
history: List<PersistentProcessDataMessage>,
listeners: List<ITaskCreatorListener<PersistentProcessDataMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
override fun waitingListeners(events: List<PersistentProcessDataMessage>): List<Tasks<PersistentProcessDataMessage>> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
}
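
Neither the generic EventBasedMessageListener nor the Tasks and ITaskCreatorListener types it builds on are shown in this commit; the sketch below is pieced together from the overrides above and from the processer-local listener the commit deletes. The names match the imports, but the bodies are assumptions.

package no.iktdev.mediaprocessing.shared.common.tasks

import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents

interface ITaskCreatorListener<V> {
    fun onEventReceived(referenceId: String, event: V, events: List<V>)
}

data class Tasks<V>(
    val producesEvent: KafkaEvents,
    val taskHandler: ITaskCreatorListener<V>,
    val listensForEvents: List<KafkaEvents> = listOf()
)

abstract class EventBasedMessageListener<V> {
    protected val listeners: MutableList<Tasks<V>> = mutableListOf()

    fun add(task: Tasks<V>) {
        listeners.add(task)
    }

    // Tasks whose produced event is not yet in the history, i.e. still pending.
    abstract fun waitingListeners(events: List<V>): List<Tasks<V>>

    // Of the pending tasks, those that subscribe to the triggering event.
    abstract fun listenerWantingEvent(event: V, waitingListeners: List<Tasks<V>>): List<Tasks<V>>

    // Hands the event and its history to the selected handlers.
    abstract fun onForward(event: V, history: List<V>, listeners: List<ITaskCreatorListener<V>>)

    fun forwardEventMessageToListeners(newEvent: V, events: List<V>) {
        val wanting = listenerWantingEvent(newEvent, waitingListeners(events))
        onForward(newEvent, events, wanting.map { it.taskHandler })
    }
}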

View File

@@ -0,0 +1,11 @@
package no.iktdev.mediaprocessing.processer.coordination
/**
* Class to handle messages from websockets produced by Processer instances,
* in order to keep an overview of each Processer's progress.
*/
class ProcesserSocketMessageListener {
}

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.Tasks
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.TaskCreator
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
@@ -19,17 +19,18 @@ import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
import javax.annotation.PreDestroy
@Service
class EncodeService: TaskCreator() {
class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.encodeLogDirectory
val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
override val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
val scope = Coroutines.io()
private var runner: FfmpegWorker? = null
@@ -42,10 +43,6 @@ class EncodeService: TaskCreator() {
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_WORK_ENCODE_CREATED)
override fun getListener(): Tasks {
return Tasks(producesEvent, this)
}
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {

View File

@@ -3,8 +3,8 @@ package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.TaskCreator
import no.iktdev.mediaprocessing.processer.Tasks
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
@@ -20,18 +20,19 @@ import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
import javax.annotation.PreDestroy
@Service
class ExtractService: TaskCreator() {
class ExtractService(@Autowired override var coordinator: Coordinator): TaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.extractLogDirectory
val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
override val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
val scope = Coroutines.io()
@@ -42,9 +43,6 @@ class ExtractService: TaskCreator() {
init {
log.info { "Starting with id: $serviceId" }
}
override fun getListener(): Tasks {
return Tasks(producesEvent, this)
}
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_WORK_EXTRACT_CREATED)
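
Both services now receive the Coordinator through the constructor instead of a field-injected @Autowired property, and the getListener()/Tasks plumbing is gone: registration presumably happens once in the shared TaskCreatorImpl rather than in each service's @PostConstruct hook. A new worker would follow the same shape. In this hypothetical sketch the ConvertService class and the EVENT_WORK_CONVERT_* names are invented for illustration and do not exist in this commit:

package no.iktdev.mediaprocessing.processer.services

import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.TaskCreator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

@Service
class ConvertService(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
    // Placeholder event names, not defined anywhere in this commit.
    override val producesEvent = KafkaEvents.EVENT_WORK_CONVERT_PERFORMED
    override val requiredEvents: List<KafkaEvents>
        get() = listOf(KafkaEvents.EVENT_WORK_CONVERT_CREATED)

    override fun onProcessEvents(
        event: PersistentProcessDataMessage,
        events: List<PersistentProcessDataMessage>
    ): MessageDataWrapper? {
        // Real work (ffmpeg invocation, progress reporting) would start here.
        return null
    }
}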