Removed old checking

This commit is contained in:
bskjon 2024-07-01 01:22:02 +02:00
parent 9ebcaacc38
commit 8ccefc1843
3 changed files with 55 additions and 3 deletions

View File

@ -5,6 +5,7 @@ import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.EventCoordinator import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingPrecondition
import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingTask import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingTask
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
@ -80,12 +81,20 @@ class CompleteMediaTask(@Autowired override var coordinator: EventCoordinator) :
} }
} }
val isWaitingForPrecondition = isAwaitingPrecondition(taskEvents, events)
if (isWaitingForPrecondition) {
log.info { "Waiting for preconditions" }
return null
}
val isWaiting = taskEvents.map { val isWaiting = taskEvents.map {
isAwaitingTask(it, events) isAwaitingTask(it, events)
}.any { it } }.any { it }
//val mapper = ProcessMapping(events) //val mapper = ProcessMapping(events)

View File

@ -15,6 +15,7 @@ import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.isOnly import no.iktdev.mediaprocessing.shared.contract.dto.isOnly
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.az import no.iktdev.mediaprocessing.shared.kafka.dto.az
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
@ -38,6 +39,10 @@ class CreateConvertWorkTask(@Autowired override var coordinator: EventCoordinato
super.onProcessEventsAccepted(event, events) super.onProcessEventsAccepted(event, events)
val startedEventData = events.lastOf(KafkaEvents.EventMediaProcessStarted)?.data?.az<MediaProcessStarted>() val startedEventData = events.lastOf(KafkaEvents.EventMediaProcessStarted)?.data?.az<MediaProcessStarted>()
if (event.event == KafkaEvents.EventWorkExtractPerformed && !event.isSuccess()) {
return SimpleMessageData(status = Status.SKIPPED, "Extract failed, skipping..", derivedFromEventId = event.eventId)
}
val result = if (event.isOfEvent(KafkaEvents.EventMediaProcessStarted) && val result = if (event.isOfEvent(KafkaEvents.EventMediaProcessStarted) &&
event.data.az<MediaProcessStarted>()?.operations?.isOnly(StartOperationEvents.CONVERT) == true event.data.az<MediaProcessStarted>()?.operations?.isOnly(StartOperationEvents.CONVERT) == true
) { ) {

View File

@ -4,9 +4,49 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessageHelper import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessageHelper
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage>): Boolean {
if (tasks.contains(TaskType.Encode)) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventMediaParameterEncodeCreated
) } == null) {
return true
}
}
if (tasks.contains(TaskType.Convert) && !tasks.contains(TaskType.Extract)) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventWorkConvertCreated
) } == null) {
return true
}
}
if (tasks.contains(TaskType.Extract)) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventMediaParameterExtractCreated
) } == null) {
return true
}
}
if (tasks.contains(TaskType.Convert)) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventMediaParameterExtractCreated
) } == null) {
return true
}
}
return false
}
fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean { fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
return when (task) { return when (task) {
TaskType.Encode -> { TaskType.Encode -> {
@ -46,16 +86,14 @@ fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
} }
TaskType.Convert -> { TaskType.Convert -> {
val argumentEvent = KafkaEvents.EventMediaParameterConvertCreated
val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated
val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed
val argument = events.findLast { it.event == argumentEvent } ?: return true val argument = events.findLast { it.event == taskCreatedEvent } ?: return true
if (!argument.isSuccess()) return false if (!argument.isSuccess()) return false
val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter {
it.event in listOf( it.event in listOf(
argumentEvent,
taskCreatedEvent, taskCreatedEvent,
taskCompletedEvent taskCompletedEvent
) )