This commit is contained in:
bskjon 2024-06-30 23:55:19 +02:00
parent 8153d8e9b3
commit 13681fb696
5 changed files with 100 additions and 9 deletions

View File

@ -5,8 +5,10 @@ import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.EventCoordinator import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingTask
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.* import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
@ -16,6 +18,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStar
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.support.TaskUtils
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@Service @Service
@ -92,10 +95,26 @@ class CompleteMediaTask(@Autowired override var coordinator: EventCoordinator) :
return null return null
} }
val taskEvents = startedData.operations.map {
when(it) {
StartOperationEvents.ENCODE -> TaskType.Encode
StartOperationEvents.EXTRACT -> TaskType.Extract
StartOperationEvents.CONVERT -> TaskType.Convert
}
}
val mapper = ProcessMapping(events) val isWaiting = taskEvents.map {
if (mapper.canCollect()) { isAwaitingTask(it, events)
}.any { it }
//val mapper = ProcessMapping(events)
//if (mapper.canCollect()) {
if (isWaiting) {
return ProcessCompleted(Status.COMPLETED, event.eventId) return ProcessCompleted(Status.COMPLETED, event.eventId)
} }
return null return null

View File

@ -12,6 +12,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.az import no.iktdev.mediaprocessing.shared.kafka.dto.az
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -39,7 +40,7 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato
sevent ?: event sevent ?: event
} else event } else event
forwardEvent.data.az<FfmpegWorkerArgumentsCreated>()?.let { forwardEvent.data.az<FfmpegWorkerArgumentsCreated>()?.takeIf { it.isSuccess() }?.let {
it.entries.forEach { argsGroup -> it.entries.forEach { argsGroup ->
val ffmpegTask = FfmpegTaskData( val ffmpegTask = FfmpegTaskData(
inputFile = it.inputFile, inputFile = it.inputFile,

View File

@ -0,0 +1,67 @@
package no.iktdev.mediaprocessing.coordinator.utils
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessageHelper
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
return when (task) {
TaskType.Encode -> {
val argumentEvent = KafkaEvents.EventMediaParameterEncodeCreated
val taskCreatedEvent = KafkaEvents.EventWorkEncodeCreated
val taskCompletedEvent = KafkaEvents.EventWorkEncodePerformed
val argument = events.findLast { it.event == argumentEvent } ?: return true
if (!argument.isSuccess()) return false
val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter {
it.event in listOf(
argumentEvent,
taskCreatedEvent,
taskCompletedEvent
)
}
trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
}
TaskType.Extract -> {
val argumentEvent = KafkaEvents.EventMediaParameterExtractCreated
val taskCreatedEvent = KafkaEvents.EventWorkExtractCreated
val taskCompletedEvent = KafkaEvents.EventWorkExtractPerformed
val argument = events.findLast { it.event == argumentEvent } ?: return true
if (!argument.isSuccess()) return false
val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter {
it.event in listOf(
argumentEvent,
taskCreatedEvent,
taskCompletedEvent
)
}
trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
}
TaskType.Convert -> {
val argumentEvent = KafkaEvents.EventMediaParameterConvertCreated
val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated
val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed
val argument = events.findLast { it.event == argumentEvent } ?: return true
if (!argument.isSuccess()) return false
val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter {
it.event in listOf(
argumentEvent,
taskCreatedEvent,
taskCompletedEvent
)
}
trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
}
}
}

View File

@ -58,21 +58,21 @@ class PersistentMessageHelper(val messages: List<PersistentMessage>) {
for (event in usableEvents) { for (event in usableEvents) {
derivedEventsMap.getOrPut(event.data.derivedFromEventId!!) { mutableListOf() }.add(event.eventId) derivedEventsMap.getOrPut(event.data.derivedFromEventId!!) { mutableListOf() }.add(event.eventId)
} }
val eventsToDelete = mutableSetOf<String>() val eventsToFind = mutableSetOf<String>()
// Utfør DFS for å finne alle avledede hendelser som skal slettes // Utfør DFS for å finne alle avledede hendelser som skal slettes
dfs(triggered.eventId, derivedEventsMap, eventsToDelete) dfs(triggered.eventId, derivedEventsMap, eventsToFind)
return messages.filter { it.eventId in eventsToDelete } return messages.filter { it.eventId in eventsToFind }
} }
/** /**
* @param eventId Initial eventId * @param eventId Initial eventId
*/ */
private fun dfs(eventId: String, derivedEventsMap: Map<String, List<String>>, eventsToDelete: MutableSet<String>) { private fun dfs(eventId: String, derivedEventsMap: Map<String, List<String>>, eventsToFind: MutableSet<String>) {
eventsToDelete.add(eventId) eventsToFind.add(eventId)
derivedEventsMap[eventId]?.forEach { derivedEventId -> derivedEventsMap[eventId]?.forEach { derivedEventId ->
dfs(derivedEventId, derivedEventsMap, eventsToDelete) dfs(derivedEventId, derivedEventsMap, eventsToFind)
} }
} }
} }

View File

@ -4,4 +4,8 @@ enum class Status {
SKIPPED, SKIPPED,
COMPLETED, COMPLETED,
ERROR ERROR
}
fun Status.isCompleted(): Boolean {
return this == Status.COMPLETED
} }