Updated check

This commit is contained in:
bskjon 2024-07-01 19:27:43 +02:00
parent 8ccefc1843
commit 46b053eaf0
4 changed files with 26 additions and 27 deletions

View File

@ -1,16 +1,13 @@
package no.iktdev.mediaprocessing.coordinator.utils
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessageHelper
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.common.persistance.*
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage>): Boolean {
if (tasks.contains(TaskType.Encode)) {
if (tasks.any { it == TaskType.Encode }) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventMediaParameterEncodeCreated
) } == null) {
@ -18,23 +15,21 @@ fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage
}
}
if (tasks.contains(TaskType.Convert) && !tasks.contains(TaskType.Extract)) {
if (tasks.any { it == TaskType.Convert } && tasks.none {it == TaskType.Extract}) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventWorkConvertCreated
) } == null) {
return true
}
}
if (tasks.contains(TaskType.Extract)) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventMediaParameterExtractCreated
) } == null) {
} else if (tasks.any { it == TaskType.Convert }) {
val extractEvent = events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) }
if (extractEvent == null || extractEvent.isSuccess()) {
return true
}
}
if (tasks.contains(TaskType.Convert)) {
if (tasks.contains(TaskType.Extract)) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventMediaParameterExtractCreated
) } == null) {
@ -86,6 +81,11 @@ fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
}
TaskType.Convert -> {
val extractEvents = events.findLast { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) }
if (extractEvents == null || extractEvents.isSkipped()) {
false
} else {
val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated
val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed
@ -99,7 +99,7 @@ fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
)
}
trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
}
}
}
}

View File

@ -1,4 +1,4 @@
package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher
package no.iktdev.mediaprocessing.coordinator.watcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay

View File

@ -1,4 +1,4 @@
package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher
package no.iktdev.mediaprocessing.coordinator.watcher
import dev.vishna.watchservice.KWatchEvent.Kind.Deleted
import dev.vishna.watchservice.KWatchEvent.Kind.Initialized

View File

@ -28,7 +28,6 @@ enum class KafkaEvents(val event: String) {
EventWorkConvertPerformed ("event:work-convert:performed"),
EventWorkDownloadCoverPerformed ("event:work-download-cover:performed"),
EVENT_STORE_VIDEO_PERFORMED ("event:store-video:performed"),
EVENT_STORE_SUBTITLE_PERFORMED ("event:store-subtitle:performed"),
EVENT_STORE_COVER_PERFORMED ("event:store-cover:performed"),