diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CompletedListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CompletedListener.kt new file mode 100644 index 00000000..7e1f4bc1 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/CompletedListener.kt @@ -0,0 +1,19 @@ +package no.iktdev.mediaprocessing.coordinator.listeners.events + +import no.iktdev.eventi.events.EventListener +import no.iktdev.eventi.models.Event +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskResultEvent +import org.springframework.stereotype.Component + +@Component +class CompletedListener: EventListener() { + override fun onEvent( + event: Event, + history: List + ): Event? { + if (event !is StoreContentAndMetadataTaskResultEvent) + return null + + + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt index 82672f21..0563e686 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt @@ -1,12 +1,12 @@ package no.iktdev.mediaprocessing.coordinator.services import no.iktdev.eventi.ZDS.toEvent -import no.iktdev.eventi.models.DeleteEvent import no.iktdev.eventi.models.store.PersistedEvent import no.iktdev.mediaprocessing.shared.common.dto.EventQuery import no.iktdev.mediaprocessing.shared.common.dto.Paginated import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent import no.iktdev.mediaprocessing.shared.common.dto.toDto +import no.iktdev.mediaprocessing.shared.common.effectivePersisted import no.iktdev.mediaprocessing.shared.database.stores.EventStore import org.springframework.stereotype.Service import java.util.* @@ -55,27 +55,11 @@ class EventService { } fun getEffectiveHistory(referenceId: UUID): List { - val persisted = EventStore.getPersistedEventsFor(referenceId) - - // Parse alle events (kan være null hvis ukjent type) - val parsed = persisted.mapNotNull { pe -> - pe.toEvent()?.let { ev -> pe to ev } - } - - // Finn alle eventIds som er slettet - val deletedIds = parsed - .map { it.second } - .filterIsInstance() - .map { it.deletedEventId } - .toSet() - - // Filtrer persisted basert på event-logikken - val filtered = parsed - .filter { (_, ev) -> ev.eventId !in deletedIds } // fjern slettede - .filter { (_, ev) -> ev !is DeleteEvent } // fjern selve DeleteEvent - .map { it.first } // behold kun PersistedEvent - return filtered.sortedByDescending { it.persistedAt } + return EventStore + .getPersistedEventsFor(referenceId) + .effectivePersisted() } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt index 4012770b..51e073ab 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt @@ -5,7 +5,9 @@ import no.iktdev.eventi.models.store.PersistedEvent import no.iktdev.mediaprocessing.shared.common.dto.CurrentState import no.iktdev.mediaprocessing.shared.common.dto.Mode import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary +import no.iktdev.mediaprocessing.shared.common.effective import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CompletedEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection import no.iktdev.mediaprocessing.shared.database.stores.EventStore @@ -23,7 +25,7 @@ class SequenceAggregatorService() { return grouped.values // aktive = ingen CollectedEvent - .filter { events -> events.none { it.event == CollectedEvent::class.java.simpleName } } + .filter { events -> events.none { it.event == CompletedEvent::class.java.simpleName } } .mapNotNull { events -> buildSummary(events) } .sortedByDescending { it.lastEventTime } } @@ -44,6 +46,7 @@ class SequenceAggregatorService() { // Deserialiser kun eventene for denne sekvensen val domainEvents = events.mapNotNull { it.toEvent() } + .effective() val projection = CollectProjection(domainEvents) @@ -62,6 +65,8 @@ class SequenceAggregatorService() { extractTaskStatus = projection.extreactTaskStatus, convertTaskStatus = projection.convertTaskStatus, coverDownloadTaskStatus = projection.coverDownloadTaskStatus, + contentMigratedTaskStatus = projection.contentMigratedTaskStatus, + contentStoredTaskStatus = projection.contentStoredTaskStatus, mode = when (projection.startedWith?.mode) { StartFlow.Auto -> Mode.Auto StartFlow.Manual -> Mode.Manual diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index b0da6f8a..300ef74f 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -2,7 +2,10 @@ package no.iktdev.mediaprocessing.shared.common import kotlinx.coroutines.delay import mu.KotlinLogging +import no.iktdev.eventi.ZDS.toEvent +import no.iktdev.eventi.models.DeleteEvent import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.store.PersistedEvent import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.web.client.RestTemplate import java.io.File @@ -216,4 +219,33 @@ fun File.resolveConflict(): File { return candidate } -fun UtcNow(): Instant = Instant.now() \ No newline at end of file +fun UtcNow(): Instant = Instant.now() + +fun List.effective(): List { + val deletedIds = this + .filterIsInstance() + .map { it.deletedEventId } + .toSet() + + return this + .filter { it.eventId !in deletedIds } + .filterNot { it is DeleteEvent } +} + +fun List.effectivePersisted(): List { + val parsed = this.mapNotNull { pe -> + pe.toEvent()?.let { ev -> pe to ev } + } + + val effectiveEvents = parsed + .map { it.second } + .effective() // bruker extension over + + val effectiveIds = effectiveEvents.map { it.eventId }.toSet() + + return parsed + .filter { (_, ev) -> ev.eventId in effectiveIds } + .map { it.first } + .sortedBy { it.persistedAt } +} + diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceSummary.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceSummary.kt index 7e6c6618..a777510f 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceSummary.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceSummary.kt @@ -15,6 +15,8 @@ data class SequenceSummary( val extractTaskStatus: CollectProjection.TaskStatus, val convertTaskStatus: CollectProjection.TaskStatus, val coverDownloadTaskStatus: CollectProjection.TaskStatus, + val contentMigratedTaskStatus: CollectProjection.TaskStatus, + val contentStoredTaskStatus: CollectProjection.TaskStatus, val mode: Mode, val currentState: CurrentState, val hasErrors: Boolean, diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt index e0d4fab7..6dc5e5c1 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt @@ -7,6 +7,7 @@ object EventRegistry { fun getEvents(): List> { return listOf( CollectedEvent::class.java, + CompletedEvent::class.java, ConvertTaskCreatedEvent::class.java, ConvertTaskResultEvent::class.java, diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CompletedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CompletedEvent.kt new file mode 100644 index 00000000..dc9739e7 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CompletedEvent.kt @@ -0,0 +1,5 @@ +package no.iktdev.mediaprocessing.shared.common.event_task_contract.events + +import no.iktdev.eventi.models.Event + +class CompletedEvent: Event() \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt index 615ea114..e96ec0f7 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt @@ -19,6 +19,10 @@ class CollectProjection(val events: List) { private set var coverDownloadTaskStatus: TaskStatus = TaskStatus.NotInitiated private set + var contentMigratedTaskStatus: TaskStatus = TaskStatus.NotInitiated + private set + var contentStoredTaskStatus: TaskStatus = TaskStatus.NotInitiated + private set val metadata: MetadataProjection? by lazy { projectMetadata() } val processedMedia: ProcessedMediaProjection? by lazy { projectProcessedMedia() } val parsedFileInfo: ParsedFileInfoProjection? by lazy { projectParsedFileInfo() } @@ -30,7 +34,8 @@ class CollectProjection(val events: List) { extreactTaskStatus = taskProjection.projectExtractSubtitleStatus() convertTaskStatus = taskProjection.projectConvertStatus() coverDownloadTaskStatus = taskProjection.projectCoverDownloadStatus() - + contentMigratedTaskStatus = taskProjection.projectMigrateContentStatus() + contentStoredTaskStatus = taskProjection.projectStoreContentAndMetadataStatus() } fun getTaskStatus(): List = listOf( diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/TaskProjection.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/TaskProjection.kt index 16c85116..11a55d2b 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/TaskProjection.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/TaskProjection.kt @@ -90,6 +90,20 @@ class TaskProjection(val events: List) { } } + fun projectMigrateContentStatus(): TaskStatus { + return projectStatus( + createdIds = { it.map { e -> e.taskId }}, + resultStatus = { it.status }, + resultIds = { it.flatMap { e -> e.metadata.derivedFromId?.toList() ?: emptyList() } } + ) + } + fun projectStoreContentAndMetadataStatus(): TaskStatus { + return projectStatus( + createdIds = { it.map { e -> e.taskId }}, + resultStatus = {it.taskStatus}, + resultIds = { it.flatMap { e -> e.metadata.derivedFromId?.toList() ?: emptyList() } } + ) + } } \ No newline at end of file