From a13b949c9bd69e02c09fb0e222b58be9801e2b46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Sun, 1 Feb 2026 05:26:29 +0100 Subject: [PATCH] Adjusted Collection --- .../coordinator/ProcesserClient.kt | 6 +- ...Config.kt => ProcesserClientProperties.kt} | 15 +---- .../coordinator/config/WebClients.kt | 21 ++++++ .../coordinator/controller/LogController.kt | 21 ++++++ .../coordinator/controller/TaskController.kt | 18 ++++-- .../coordinator/dto/LogAssociatedIds.kt | 9 +++ .../translate/CoordinatorTaskTransferDto.kt | 9 ++- .../listeners/events/CollectEventsListener.kt | 38 ++++------- .../MediaCreateMetadataSearchTaskListener.kt | 12 ++-- .../listeners/events/StartedListener.kt | 3 +- .../coordinator/services/EventService.kt | 31 +++++++++ .../no/iktdev/mediaprocessing/TestBase.kt | 2 +- .../events/CollectEventsListenerTest.kt | 64 +++++++++++++------ .../event_task_contract/TaskResultEvent.kt | 3 +- .../events/ProcesserEncodeResultEvent.kt | 4 +- .../events/ProcesserExtractResultEvent.kt | 5 +- .../events/StartProcessingEvent.kt | 3 +- .../common/projection/CollectProjection.kt | 31 +++++++++ .../shared/database/stores/EventStore.kt | 11 ++++ 19 files changed, 229 insertions(+), 77 deletions(-) rename apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/{ProcesserClientConfig.kt => ProcesserClientProperties.kt} (61%) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/WebClients.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/LogController.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/LogAssociatedIds.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/ProcesserClient.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/ProcesserClient.kt index 0e14dbbc..8dfcdc30 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/ProcesserClient.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/ProcesserClient.kt @@ -6,17 +6,17 @@ import reactor.core.publisher.Mono @Component class ProcesserClient( - private val webClient: WebClient + private val processerWebClient: WebClient ) { fun fetchLog(path: String): Mono = - webClient.get() + processerWebClient.get() .uri { it.path("/state/log").queryParam("path", path).build() } .retrieve() .bodyToMono(String::class.java) fun ping(): Mono = - webClient.get() + processerWebClient.get() .uri("/actuator/health") .retrieve() .bodyToMono(String::class.java) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientConfig.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientProperties.kt similarity index 61% rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientConfig.kt rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientProperties.kt index 61c4dca9..bf7d0bb3 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientConfig.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientProperties.kt @@ -5,21 +5,8 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.web.reactive.function.client.WebClient -@Configuration -class ProcesserWebClientConfig { - - @Bean - fun processerWebClient( - builder: WebClient.Builder, - props: ProcesserClientProperties - ): WebClient = - builder - .baseUrl(props.baseUrl) - .build() -} - - @ConfigurationProperties(prefix = "processer") data class ProcesserClientProperties( val baseUrl: String ) + diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/WebClients.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/WebClients.kt new file mode 100644 index 00000000..47b8144f --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/WebClients.kt @@ -0,0 +1,21 @@ +package no.iktdev.mediaprocessing.coordinator.config + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.web.reactive.function.client.WebClient + +@Configuration +class WebClients( + private val processerClientProperties: ProcesserClientProperties + +) { + @Bean + fun webClient(): WebClient.Builder = + WebClient + .builder() + .codecs { it.defaultCodecs().maxInMemorySize(10 * 1024 * 1024) } + @Bean + fun processerWebClient(builder: WebClient.Builder): WebClient { + return builder.baseUrl(processerClientProperties.baseUrl).build() + } +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/LogController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/LogController.kt new file mode 100644 index 00000000..2a1dc08f --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/LogController.kt @@ -0,0 +1,21 @@ +package no.iktdev.mediaprocessing.coordinator.controller + +import no.iktdev.mediaprocessing.coordinator.ProcesserClient +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RequestParam +import org.springframework.web.bind.annotation.RestController +import reactor.core.publisher.Mono + +@RestController +@RequestMapping("/log") +class LogController( + private val processerClient: ProcesserClient +) { + + @GetMapping + fun getLog(@RequestParam path: String): Mono { + return processerClient.fetchLog(path) + } +} + diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt index b6dd12f2..d4c623be 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt @@ -26,20 +26,28 @@ class TaskController( ) { @GetMapping("/active") - fun getActiveTasks(): List = - taskService.getActiveTasks().map { it.toCoordinatorTransferDto() } + fun getActiveTasks(): List { + val tasks = taskService.getActiveTasks() + val logEvents = eventService.getTaskEventResultsWithLogs(tasks.map { it.referenceId }.toSet()) + return tasks.map { it.toCoordinatorTransferDto(logEvents) } + } @GetMapping fun getPagedTasks(query: TaskQuery): Paginated { val paginatedTasks = taskService.getPagedTasks(query) - return paginatedTasks.map { it.toCoordinatorTransferDto() } + val logEvents = eventService.getTaskEventResultsWithLogs(paginatedTasks.items.map { it.referenceId }.toSet()) + + return paginatedTasks.map { it.toCoordinatorTransferDto(logEvents) } } @GetMapping("/{id}") - fun getTask(@PathVariable id: UUID): CoordinatorTaskTransferDto? = - taskService.getTaskById(id)?.toCoordinatorTransferDto() + fun getTask(@PathVariable id: UUID): CoordinatorTaskTransferDto? { + val tasks = taskService.getTaskById(id) ?: return null + val logEvents = eventService.getTaskEventResultsWithLogs(setOf(tasks.referenceId)) + return tasks.toCoordinatorTransferDto(logEvents) + } @GetMapping("/{taskId}/reset") diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/LogAssociatedIds.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/LogAssociatedIds.kt new file mode 100644 index 00000000..13929f85 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/LogAssociatedIds.kt @@ -0,0 +1,9 @@ +package no.iktdev.mediaprocessing.coordinator.dto + +import java.util.* + +data class LogAssociatedIds( + val referenceId: UUID, + val ids: Set, + val logFile: String +) \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/translate/CoordinatorTaskTransferDto.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/translate/CoordinatorTaskTransferDto.kt index 483151d6..94251b06 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/translate/CoordinatorTaskTransferDto.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/translate/CoordinatorTaskTransferDto.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.coordinator.dto.translate import no.iktdev.eventi.models.store.PersistedTask +import no.iktdev.mediaprocessing.coordinator.dto.LogAssociatedIds import no.iktdev.mediaprocessing.shared.common.rules.TaskLifecycleRules import java.time.Instant import java.util.* @@ -17,11 +18,16 @@ data class CoordinatorTaskTransferDto( val consumed: Boolean, val lastCheckIn: Instant?, val persistedAt: Instant, + val logs: List = emptyList(), val abandoned: Boolean, ) { } -fun PersistedTask.toCoordinatorTransferDto(): CoordinatorTaskTransferDto { +fun PersistedTask.toCoordinatorTransferDto(logs: List): CoordinatorTaskTransferDto { + val matchingLogs = logs + .filter { log -> log.ids.contains(taskId) } + .map { it.logFile } + return CoordinatorTaskTransferDto( id = id, referenceId = referenceId, @@ -34,6 +40,7 @@ fun PersistedTask.toCoordinatorTransferDto(): CoordinatorTaskTransferDto { consumed = consumed, lastCheckIn = lastCheckIn, persistedAt = persistedAt, + logs = matchingLogs, abandoned = TaskLifecycleRules.isAbandoned(consumed, persistedAt, lastCheckIn) ) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListener.kt index df395d11..6e921207 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListener.kt @@ -8,35 +8,25 @@ import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection import org.springframework.stereotype.Component @Component -class CollectEventsListener: EventListener() { +class CollectEventsListener : EventListener() { private val log = KotlinLogging.logger {} - val undesiredStates = listOf(CollectProjection.TaskStatus.Failed, CollectProjection.TaskStatus.Pending) - override fun onEvent( - event: Event, - history: List - ): Event? { - // Prevent Rouge trigger when replayed + override fun onEvent(event: Event, history: List): Event? { + // Avoid double-collection if (event is CollectedEvent || history.any { it is CollectedEvent }) return null - val collectProjection = CollectProjection(history) - log.info { collectProjection.prettyPrint() } + val projection = CollectProjection(history) - val taskStatus = collectProjection.getTaskStatus() - if (taskStatus.all { it == CollectProjection.TaskStatus.NotInitiated }) { - // No work has been done, so we are not ready - return null - } - val statusAcceptable = taskStatus.none { it in undesiredStates } - if (!statusAcceptable) { - if (taskStatus.any { it == CollectProjection.TaskStatus.Failed }) { - log.warn { "One or more tasks have failed in ${event.referenceId}" } - } else { - log.info { "One or more tasks are still pending in ${event.referenceId}" } - } - return null - } + // Must have a StartProcessingEvent + if (projection.startedWith == null) return null + + // Must be allowed to store (Auto or Manual + AllowCompletion) + if (!projection.isStorePermitted()) return null + + // Must have all relevant tasks completed + if (!projection.isWorkflowComplete()) return null return CollectedEvent(history.map { it.eventId }.toSet()).derivedOf(event) } -} \ No newline at end of file +} + diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateMetadataSearchTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateMetadataSearchTaskListener.kt index 37fc7bc2..5a35a69f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateMetadataSearchTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateMetadataSearchTaskListener.kt @@ -4,12 +4,9 @@ import no.iktdev.eventi.ListenerOrder import no.iktdev.eventi.events.EventListener import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaParsedInfoEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchTaskCreatedEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.* import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MetadataSearchTask import no.iktdev.mediaprocessing.shared.database.stores.TaskStore - import org.jetbrains.annotations.VisibleForTesting import org.springframework.stereotype.Component import java.util.* @@ -30,6 +27,13 @@ class MediaCreateMetadataSearchTaskListener: EventListener() { event: Event, history: List ): Event? { + + val startedEvent = history.filterIsInstance().firstOrNull() ?: return null + if (startedEvent.data.operation.isNotEmpty()) { + if (!startedEvent.data.operation.contains(OperationType.Metadata)) + return null + } + // For replay if (event is MetadataSearchTaskCreatedEvent) { val hasResult = history.filter { it is MetadataSearchResultEvent } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StartedListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StartedListener.kt index b2180ae5..8bd3987f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StartedListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StartedListener.kt @@ -22,7 +22,8 @@ class StartedListener : EventListener() { operation = setOf( OperationType.ExtractSubtitles, OperationType.ConvertSubtitles, - OperationType.Encode + OperationType.Encode, + OperationType.Metadata ) ) ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt index a3ca025a..d204b4a1 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt @@ -2,11 +2,14 @@ package no.iktdev.mediaprocessing.coordinator.services import no.iktdev.eventi.ZDS.toEvent import no.iktdev.eventi.models.store.PersistedEvent +import no.iktdev.mediaprocessing.coordinator.dto.LogAssociatedIds import no.iktdev.mediaprocessing.shared.common.dto.EventQuery import no.iktdev.mediaprocessing.shared.common.dto.Paginated import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent import no.iktdev.mediaprocessing.shared.common.dto.toDto import no.iktdev.mediaprocessing.shared.common.effectivePersisted +import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent import no.iktdev.mediaprocessing.shared.database.stores.EventStore import org.springframework.stereotype.Service import java.time.Instant @@ -77,4 +80,32 @@ class EventService { return EventStore.getDeletedSequences(referenceIds) } + val taskResultEventTypes: List = + EventRegistry.getEvents() + .filter { TaskResultEvent::class.java.isAssignableFrom(it) } + .map { it.simpleName } + + + fun getTaskEventResultsWithLogs(referenceIds: Set): List { + // 1. Hent persisted events som matcher TaskResultEvent-typene + val persisted = EventStore.getPersistedEventsFor(referenceIds, taskResultEventTypes) + + // 2. Deserialiser til domeneklasse + val domainEvents = persisted.map { it.toEvent() } + + // 3. Filtrer til TaskResultEvent-instansene som har logg + return domainEvents + .filterIsInstance() + .filter { it.logFile != null } + .map { + LogAssociatedIds( + referenceId = it.referenceId, + ids = setOf( it.eventId, *(it.metadata.derivedFromId?.toTypedArray() ?: emptyArray())), + logFile = it.logFile!! + ) + } + } + + + } \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt index b9034d53..32d20a00 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt @@ -46,7 +46,7 @@ open class TestBase { fun defaultStartEvent(): StartProcessingEvent { val start = StartProcessingEvent( data = StartData( - operation = setOf(OperationType.Encode, OperationType.ExtractSubtitles, OperationType.ConvertSubtitles), + operation = setOf(OperationType.Encode, OperationType.ExtractSubtitles, OperationType.ConvertSubtitles, OperationType.Metadata), fileUri = "file:///unit/${UUID.randomUUID()}.mkv" ) ) diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListenerTest.kt index 6bfc89ae..80b6954c 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CollectEventsListenerTest.kt @@ -67,71 +67,97 @@ class CollectEventsListenerTest : TestBase() { @Test @DisplayName( """ - Hvis vi har kun encoded hendelse, men vi har sagt at vi også skal ha extract, men ikke har opprettet extract - Når encode result kommer inn - Så: - Opprettes CollectEvent basert på historikken + Hvis vi har kun encoded hendelse, men vi har sagt at vi også skal ha extract, men ikke har opprettet extract + Når encode result kommer inn + Så: + Opprettes CollectEvent basert på historikken """ ) fun success2() { val started = defaultStartEvent().let { ev -> - ev.copy(data = ev.data.copy(operation = setOf(OperationType.Encode, OperationType.ExtractSubtitles))) + ev.copy( + data = ev.data.copy( + operation = setOf( + OperationType.Metadata, + OperationType.Encode, + OperationType.ExtractSubtitles + ) + ) + ) } + val parsed = mediaParsedEvent( collection = "MyCollection", fileName = "MyCollection 1", mediaType = MediaType.Movie ).derivedOf(started) + val metadata = metadataEvent(parsed).first() val encode = encodeEvent("/tmp/video.mp4", parsed) val history = listOf( started, parsed, + metadata, *encode.toTypedArray(), ) + val result = listener.onEvent(history.last(), history) - assertThat(result).isNotNull() - assertThat { - result is CollectedEvent - } + + assertThat(result).isNull() } + @Test @DisplayName( - """ + """ Hvis vi har kun convert hendelse - Når convert har komment inn + Når convert har kommet inn Så: Opprettes CollectEvent basert på historikken - """ + """ ) fun success3() { val started = defaultStartEvent().let { ev -> - ev.copy(data = ev.data.copy(operation = setOf(OperationType.ConvertSubtitles))) + ev.copy( + data = ev.data.copy( + operation = setOf( + OperationType.Metadata, + OperationType.ConvertSubtitles + ) + ) + ) } + val parsed = mediaParsedEvent( collection = "MyCollection", fileName = "MyCollection 1", mediaType = MediaType.Movie ).derivedOf(started) - val convert = encodeEvent("/tmp/fancy.srt", parsed) + val metadata = metadataEvent(parsed) + val convert = convertEvent( + language = "en", + baseName = "sub1", + outputFiles = listOf("/tmp/sub1.vtt"), + derivedFrom = parsed + ) val history = listOf( started, parsed, + *metadata.toTypedArray(), *convert.toTypedArray(), ) + val result = listener.onEvent(history.last(), history) - assertThat(result).isNotNull() - assertThat { - result is CollectedEvent - } + + assertThat(result).isInstanceOf(CollectedEvent::class.java) } + @Test @DisplayName( """ @@ -200,6 +226,8 @@ class CollectEventsListenerTest : TestBase() { assertThat(result).isNull() } + + @Test @DisplayName( """ diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/TaskResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/TaskResultEvent.kt index f00cbf42..21dc7de3 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/TaskResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/TaskResultEvent.kt @@ -8,5 +8,6 @@ import no.iktdev.eventi.models.store.TaskStatus */ open class TaskResultEvent( val status: TaskStatus, - val error: String? = null + val error: String? = null, + val logFile: String? = null ) : Event() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt index 28e58f8d..13d44b15 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt @@ -5,10 +5,10 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEve class ProcesserEncodeResultEvent( val data: EncodeResult? = null, - val logFile: String? = null, + logFile: String? = null, status: TaskStatus, error: String? = null -) : TaskResultEvent(status, error) { +) : TaskResultEvent(status, error, logFile) { data class EncodeResult( val cachedOutputFile: String? = null ) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractResultEvent.kt index 406c018d..8d840fe2 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractResultEvent.kt @@ -6,8 +6,9 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEve class ProcesserExtractResultEvent( val data: ExtractResult? = null, status: TaskStatus, - error: String? = null -) : TaskResultEvent(status, error) { + error: String? = null, + logFile: String? = null, +) : TaskResultEvent(status, error, logFile) { data class ExtractResult( val language: String, val cachedOutputFile: String diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/StartProcessingEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/StartProcessingEvent.kt index bbcacccd..eb68b459 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/StartProcessingEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/StartProcessingEvent.kt @@ -22,5 +22,6 @@ enum class StartFlow { enum class OperationType { ExtractSubtitles, Encode, - ConvertSubtitles + ConvertSubtitles, + Metadata } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt index 467bf441..163ffb79 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt @@ -49,6 +49,37 @@ class CollectProjection(val events: List) { coverDownloadTaskStatus ) + fun getRelevantTaskStatuses(): List { + val required = startedWith?.tasks ?: emptySet() + + val statusMap = mapOf( + OperationType.Encode to encodeTaskStatus, + OperationType.ExtractSubtitles to extreactTaskStatus, + OperationType.ConvertSubtitles to convertTaskStatus, + OperationType.Metadata to metadataTaskStatus, + ) + + return required.map { statusMap[it] ?: TaskStatus.NotInitiated } + } + + fun isWorkflowComplete(): Boolean { + val statuses = getRelevantTaskStatuses() + + if (statuses.isEmpty()) return false + + val anyFailed = statuses.any { it == TaskStatus.Failed } + val anyPending = statuses.any { it == TaskStatus.Pending } + val allCompleted = statuses.all { it == TaskStatus.Completed } + + if (anyFailed) return false + if (anyPending) return false + + return allCompleted + } + + + + fun isStorePermitted(): Boolean { val start = events.filterIsInstance().firstOrNull() ?: return false // ingen start → ingen store diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt index f838b95f..19c59025 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt @@ -88,6 +88,17 @@ object EventStore: EventStore { return result.getOrDefault(emptyList()) } + fun getPersistedEventsFor(referenceId: Set, eventNames: List): List { + val deleted = getDeletedSequences(referenceId).map { it.toString() } + val result = withTransaction { + EventsTable + .getWhere { (EventsTable.referenceId eq referenceId.toString()) and + (EventsTable.referenceId notInList deleted.toList()) and + (EventsTable.event inList eventNames )} + } + return result.getOrDefault(emptyList()) + } + override fun persist(event: Event) { val asData = ZDS.WGson.toJson(event) val eventName = event::class.simpleName ?: run {