Changed behaviour

This commit is contained in:
Brage 2024-03-26 00:41:40 +01:00
parent 07111f7b98
commit 173859cffc
7 changed files with 101 additions and 59 deletions

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.converter.flow.EventBasedProcessMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
@ -17,12 +18,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.stereotype.Service
@Service
class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, EventBasedProcessMessageListener>() {
class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>() {
val io = Coroutines.io()
private val log = KotlinLogging.logger {}
override val listeners: EventBasedProcessMessageListener = EventBasedProcessMessageListener()
override val listeners = PersistentEventProcessBasedMessageListener()
override fun createTasksBasedOnEventsAndPersistence(
referenceId: String,
eventId: String,
@ -36,26 +37,12 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Eve
listeners.forwardEventMessageToListeners(triggeredMessage, messages)
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}
fun readAllInQueue() {
val messages = PersistentDataReader().getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages)
}
}
}
override fun onCoordinatorReady() {
log.info { "Converter Coordinator is ready" }
readAllInQueue()
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (event.key == KafkaEvents.EVENT_WORK_CONVERT_CREATED) {
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
@ -69,7 +56,21 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Eve
} else {
log.debug { "Skipping ${event.key}" }
}
//log.info { Gson().toJson(event.value) }
}
fun readAllInQueue() {
val messages = PersistentDataReader().getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages)
}
}
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}
}

View File

@ -0,0 +1,42 @@
package no.iktdev.mediaprocessing.converter
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class TaskCreator(coordinator: ConverterCoordinator) :
TaskCreatorImpl<ConverterCoordinator, PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>(coordinator) {
override fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
val currentEvents = events.map { it.event }
return requiredEvents.all { currentEvents.contains(it) }
}
override fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
return failed.isEmpty()
}
override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
return event.event == singleOne
}
/*override fun getListener(): Tasks {
val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter)
}*/
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return listOf {
isPrerequisiteEventsOk(events)
}
}
override fun prerequisiteRequired(event: PersistentProcessDataMessage): List<() -> Boolean> {
return listOf()
}
}

View File

@ -0,0 +1,32 @@
package no.iktdev.mediaprocessing.converter.coordination
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
class PersistentEventProcessBasedMessageListener: EventBasedMessageListener<PersistentProcessDataMessage>() {
override fun listenerWantingEvent(
event: PersistentProcessDataMessage,
waitingListeners: List<Tasks<PersistentProcessDataMessage>>
): List<Tasks<PersistentProcessDataMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentProcessDataMessage,
history: List<PersistentProcessDataMessage>,
listeners: List<ITaskCreatorListener<PersistentProcessDataMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
override fun waitingListeners(events: List<PersistentProcessDataMessage>): List<Tasks<PersistentProcessDataMessage>> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
}

View File

@ -1,29 +0,0 @@
package no.iktdev.mediaprocessing.converter.flow
import no.iktdev.mediaprocessing.converter.ConverterCoordinator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class ProcesserTaskCreator(coordinator: ConverterCoordinator):
TaskCreatorImpl<ConverterCoordinator, PersistentProcessDataMessage, EventBasedProcessMessageListener>(coordinator) {
override fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
val currentEvents = events.map { it.event }
return requiredEvents.all { currentEvents.contains(it) }
}
override fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
val failed = events
.filter { e -> e.event in requiredEvents }
.filter { !it.data.isSuccess() }
return failed.isEmpty()
}
override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
return event.event == singleOne
}
}

View File

@ -5,8 +5,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.ConverterCoordinator
import no.iktdev.mediaprocessing.converter.TaskCreator
import no.iktdev.mediaprocessing.converter.convert.Converter
import no.iktdev.mediaprocessing.converter.flow.ProcesserTaskCreator
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
@ -22,9 +22,10 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.util.*
private val log = KotlinLogging.logger {}
@Service
class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : ProcesserTaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : TaskCreator(coordinator) {
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init {

View File

@ -0,0 +1,3 @@
spring.output.ansi.enabled=always
logging.level.org.apache.kafka=WARN
logging.level.root=INFO

View File

@ -1,18 +1,10 @@
package no.iktdev.mediaprocessing.processer
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.simp.SimpMessagingTemplate
import javax.annotation.PostConstruct
abstract class TaskCreator(coordinator: Coordinator) :
TaskCreatorImpl<Coordinator, PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>(coordinator) {