From 7088f0221b519fd41f77bf5511e6dc63a59b6f21 Mon Sep 17 00:00:00 2001 From: bskjon Date: Thu, 11 Apr 2024 22:08:46 +0200 Subject: [PATCH] Updated --- .../coordinator/mapping/ProcessMapping.kt | 20 +++++++++++++------ .../tasks/event/CollectAndStoreTask.kt | 8 ++++---- .../tasks/event/CompleteMediaTask.kt | 2 +- .../tasks/event/CompleteRequestTask.kt | 2 +- .../tasks/event/DownloadAndStoreCoverTask.kt | 9 +++++++-- .../iktdev/mediaprocessing/ui/Coordinator.kt | 6 +++--- .../persistance/PersistentDataReader.kt | 2 +- .../persistance/PersistentEventManager.kt | 11 +++------- .../kafka/core/DeserializingRegistry.kt | 2 +- .../shared/kafka/core/KafkaEvents.kt | 12 +++++------ .../dto/events_result/ProcessCompleted.kt | 2 +- 11 files changed, 42 insertions(+), 34 deletions(-) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt index b84c0830..d7f304ed 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt @@ -2,6 +2,7 @@ package no.iktdev.mediaprocessing.coordinator.mapping import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.isSkipped +import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto @@ -33,8 +34,10 @@ class ProcessMapping(val events: List) { val arguments = events.filter { it.event == KafkaEvents.EventMediaParameterEncodeCreated } val created = events.filter { it.event == KafkaEvents.EventWorkEncodeCreated} - val performed = events.filter { it.event == KafkaEvents.EventWorkEncodePerformed } - val isSkipped = events.filter { it.isSkipped() } + val performedEvents = events.filter { it.event == KafkaEvents.EventWorkEncodePerformed } + + val performed = performedEvents.filter { it.isSuccess() } + val isSkipped = performedEvents.filter { it.isSkipped() } return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size } @@ -44,16 +47,21 @@ class ProcessMapping(val events: List) { val arguments = events.filter { it.event == KafkaEvents.EventMediaParameterExtractCreated }.filter { it.data.isSuccess() } val created = events.filter { it.event == KafkaEvents.EventWorkExtractCreated } - val performed = events.filter { it.event == KafkaEvents.EventWorkExtractPerformed } - val isSkipped = events.filter { it.isSkipped() } + val performedEvents = events.filter { it.event == KafkaEvents.EventWorkExtractPerformed } + + val performed = performedEvents.filter { it.isSuccess() } + val isSkipped = performedEvents.filter { it.isSkipped() } + return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size } fun waitsForConvert(): Boolean { val created = events.filter { it.event == KafkaEvents.EventWorkConvertCreated } - val performed = events.filter { it.event == KafkaEvents.EventWorkConvertPerformed } - val isSkipped = events.filter { it.isSkipped() } + val performedEvents = events.filter { it.event == KafkaEvents.EventWorkConvertPerformed } + + val performed = performedEvents.filter { it.isSuccess() } + val isSkipped = performedEvents.filter { it.isSkipped() } return created.size > performed.size + isSkipped.size } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt index e24422d4..1c01bc67 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt @@ -30,11 +30,11 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta val log = KotlinLogging.logger {} - override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE + override val producesEvent: KafkaEvents = KafkaEvents.EventCollectAndStore override val requiredEvents: List = listOf( EventMediaProcessStarted, - EVENT_MEDIA_PROCESS_COMPLETED + EventMediaProcessCompleted ) override val listensForEvents: List = KafkaEvents.entries @@ -42,8 +42,8 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null - val completed = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_COMPLETED) ?: return null - if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) { + val completed = events.lastOrSuccessOf(EventMediaProcessCompleted) ?: return null + if (!started.data.isSuccess() || !completed.data.isSuccess()) { return null } val mapped = ProcessMapping(events).map() ?: return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index 0e23d45d..9c01693a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -19,7 +19,7 @@ import org.springframework.stereotype.Service class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { val log = KotlinLogging.logger {} - override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED + override val producesEvent: KafkaEvents = KafkaEvents.EventMediaProcessCompleted override val requiredEvents: List = listOf( EventMediaProcessStarted, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt index cd291b83..ce871a79 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt @@ -19,7 +19,7 @@ import org.springframework.stereotype.Service class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { val log = KotlinLogging.logger {} - override val producesEvent: KafkaEvents = KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED + override val producesEvent: KafkaEvents = KafkaEvents.EventRequestProcessCompleted override val requiredEvents: List = listOf( EVENT_REQUEST_PROCESS_STARTED, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt index 771b7e01..9caa7c25 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt @@ -58,11 +58,14 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator var message: String? = null + var status = Status.COMPLETED val result = if (outFile?.exists() == true) { message = "${outFile.name} already exists" + status = Status.SKIPPED outFile } else if (coversInDifferentFormats.isNotEmpty()) { - coversInDifferentFormats.random() + status = Status.SKIPPED + coversInDifferentFormats.random() } else if (outFile != null) { runBlocking { client.download(outFile) @@ -74,7 +77,9 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator return if (result == null) { SimpleMessageData(Status.ERROR, "Could not download cover, check logs", event.eventId) } else { - val status = if (result.exists() && result.canRead()) Status.COMPLETED else Status.ERROR + if (!result.exists() || !result.canRead()) { + status = Status.ERROR + } CoverDownloadWorkPerformed(status = status, message = message, coverFile = result.absolutePath, event.eventId) } } diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt index 3edde598..8e176538 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt @@ -58,10 +58,10 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo } private fun getCurrentState(events: List, processes: Map): SummaryState { - val stored = events.findLast { it.event == KafkaEvents.EVENT_COLLECT_AND_STORE } + val stored = events.findLast { it.event == KafkaEvents.EventCollectAndStore } val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted } - val completedMediaEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED } - val completedRequestEvent = events.findLast { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED } + val completedMediaEvent = events.findLast { it.event == KafkaEvents.EventMediaProcessCompleted } + val completedRequestEvent = events.findLast { it.event == KafkaEvents.EventRequestProcessCompleted } if (stored != null && stored.data.isSuccess()) { return SummaryState.Completed diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt index 40973623..2db8ca9a 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -33,7 +33,7 @@ class PersistentDataReader(var dataSource: DataSource) { fun getUncompletedMessages(): List> { val result = withDirtyRead(dataSource.database) { events.selectAll() - .andWhere { events.event neq KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED.event } + .andWhere { events.event neq KafkaEvents.EventMediaProcessCompleted.event } .groupBy { it[events.referenceId] } .mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } } ?: emptyList() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt index b6f06a86..e53f755a 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -1,8 +1,6 @@ package no.iktdev.mediaprocessing.shared.common.persistance -import kotlinx.coroutines.launch import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.datasource.* import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents @@ -11,10 +9,7 @@ import org.jetbrains.exposed.exceptions.ExposedSQLException import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.javatime.CurrentDateTime -import java.sql.SQLIntegrityConstraintViolationException import java.time.LocalDateTime -import javax.xml.crypto.Data -import kotlin.coroutines.coroutineContext private val log = KotlinLogging.logger {} @@ -97,9 +92,9 @@ class PersistentEventManager(private val dataSource: DataSource) { fun getEventsUncompleted(): List> { val identifiesAsCompleted = listOf( - KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED, - KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED, - KafkaEvents.EVENT_COLLECT_AND_STORE + KafkaEvents.EventRequestProcessCompleted, + KafkaEvents.EventMediaProcessCompleted, + KafkaEvents.EventCollectAndStore ) val all = getAllEventsGrouped() return all.filter { entry -> entry.none { it.event in identifiesAsCompleted } } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt index 47a136ca..b0c6b904 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt @@ -39,7 +39,7 @@ class DeserializingRegistry { KafkaEvents.EventWorkDownloadCoverPerformed to CoverDownloadWorkPerformed::class.java, - KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED to ProcessCompleted::class.java + KafkaEvents.EventMediaProcessCompleted to ProcessCompleted::class.java ) } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index 7177df5b..2ef6fa6f 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -36,9 +36,9 @@ enum class KafkaEvents(val event: String) { EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"), EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"), - EVENT_MEDIA_PROCESS_COMPLETED("event:media-process:completed"), - EVENT_REQUEST_PROCESS_COMPLETED("event:request-process:completed"), - EVENT_COLLECT_AND_STORE("event::save"); + EventMediaProcessCompleted("event:media-process:completed"), + EventRequestProcessCompleted("event:request-process:completed"), + EventCollectAndStore("event::save"); companion object { fun toEvent(event: String): KafkaEvents? { @@ -60,9 +60,9 @@ enum class KafkaEvents(val event: String) { fun isOfFinalize(event: KafkaEvents): Boolean { return event in listOf( - EVENT_MEDIA_PROCESS_COMPLETED, - EVENT_REQUEST_PROCESS_COMPLETED, - EVENT_COLLECT_AND_STORE + EventMediaProcessCompleted, + EventRequestProcessCompleted, + EventCollectAndStore ) } } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt index 92ab2f04..ff5cc64e 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt @@ -5,7 +5,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED) +@KafkaBelongsToEvent(KafkaEvents.EventMediaProcessCompleted) data class ProcessCompleted( override val status: Status, override val derivedFromEventId: String?