Projection correction

This commit is contained in:
Brage Skjønborg 2026-01-30 15:09:58 +01:00
parent 6a423365cd
commit 6735ab54a5
9 changed files with 91 additions and 24 deletions

View File

@ -0,0 +1,19 @@
package no.iktdev.mediaprocessing.coordinator.listeners.events
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.models.Event
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskResultEvent
import org.springframework.stereotype.Component
@Component
class CompletedListener: EventListener() {
override fun onEvent(
event: Event,
history: List<Event>
): Event? {
if (event !is StoreContentAndMetadataTaskResultEvent)
return null
}
}

View File

@ -1,12 +1,12 @@
package no.iktdev.mediaprocessing.coordinator.services package no.iktdev.mediaprocessing.coordinator.services
import no.iktdev.eventi.ZDS.toEvent import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.models.DeleteEvent
import no.iktdev.eventi.models.store.PersistedEvent import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
import no.iktdev.mediaprocessing.shared.common.dto.Paginated import no.iktdev.mediaprocessing.shared.common.dto.Paginated
import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent
import no.iktdev.mediaprocessing.shared.common.dto.toDto import no.iktdev.mediaprocessing.shared.common.dto.toDto
import no.iktdev.mediaprocessing.shared.common.effectivePersisted
import no.iktdev.mediaprocessing.shared.database.stores.EventStore import no.iktdev.mediaprocessing.shared.database.stores.EventStore
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.util.* import java.util.*
@ -55,27 +55,11 @@ class EventService {
} }
fun getEffectiveHistory(referenceId: UUID): List<PersistedEvent> { fun getEffectiveHistory(referenceId: UUID): List<PersistedEvent> {
val persisted = EventStore.getPersistedEventsFor(referenceId) return EventStore
.getPersistedEventsFor(referenceId)
// Parse alle events (kan være null hvis ukjent type) .effectivePersisted()
val parsed = persisted.mapNotNull { pe ->
pe.toEvent()?.let { ev -> pe to ev }
}
// Finn alle eventIds som er slettet
val deletedIds = parsed
.map { it.second }
.filterIsInstance<DeleteEvent>()
.map { it.deletedEventId }
.toSet()
// Filtrer persisted basert på event-logikken
val filtered = parsed
.filter { (_, ev) -> ev.eventId !in deletedIds } // fjern slettede
.filter { (_, ev) -> ev !is DeleteEvent } // fjern selve DeleteEvent
.map { it.first } // behold kun PersistedEvent
return filtered.sortedByDescending { it.persistedAt }
} }
} }

View File

@ -5,7 +5,9 @@ import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.mediaprocessing.shared.common.dto.CurrentState import no.iktdev.mediaprocessing.shared.common.dto.CurrentState
import no.iktdev.mediaprocessing.shared.common.dto.Mode import no.iktdev.mediaprocessing.shared.common.dto.Mode
import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary
import no.iktdev.mediaprocessing.shared.common.effective
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CompletedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow
import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
import no.iktdev.mediaprocessing.shared.database.stores.EventStore import no.iktdev.mediaprocessing.shared.database.stores.EventStore
@ -23,7 +25,7 @@ class SequenceAggregatorService() {
return grouped.values return grouped.values
// aktive = ingen CollectedEvent // aktive = ingen CollectedEvent
.filter { events -> events.none { it.event == CollectedEvent::class.java.simpleName } } .filter { events -> events.none { it.event == CompletedEvent::class.java.simpleName } }
.mapNotNull { events -> buildSummary(events) } .mapNotNull { events -> buildSummary(events) }
.sortedByDescending { it.lastEventTime } .sortedByDescending { it.lastEventTime }
} }
@ -44,6 +46,7 @@ class SequenceAggregatorService() {
// Deserialiser kun eventene for denne sekvensen // Deserialiser kun eventene for denne sekvensen
val domainEvents = events.mapNotNull { it.toEvent() } val domainEvents = events.mapNotNull { it.toEvent() }
.effective()
val projection = CollectProjection(domainEvents) val projection = CollectProjection(domainEvents)
@ -62,6 +65,8 @@ class SequenceAggregatorService() {
extractTaskStatus = projection.extreactTaskStatus, extractTaskStatus = projection.extreactTaskStatus,
convertTaskStatus = projection.convertTaskStatus, convertTaskStatus = projection.convertTaskStatus,
coverDownloadTaskStatus = projection.coverDownloadTaskStatus, coverDownloadTaskStatus = projection.coverDownloadTaskStatus,
contentMigratedTaskStatus = projection.contentMigratedTaskStatus,
contentStoredTaskStatus = projection.contentStoredTaskStatus,
mode = when (projection.startedWith?.mode) { mode = when (projection.startedWith?.mode) {
StartFlow.Auto -> Mode.Auto StartFlow.Auto -> Mode.Auto
StartFlow.Manual -> Mode.Manual StartFlow.Manual -> Mode.Manual

View File

@ -2,7 +2,10 @@ package no.iktdev.mediaprocessing.shared.common
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.models.DeleteEvent
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.PersistedEvent
import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.web.client.RestTemplate import org.springframework.web.client.RestTemplate
import java.io.File import java.io.File
@ -216,4 +219,33 @@ fun File.resolveConflict(): File {
return candidate return candidate
} }
fun UtcNow(): Instant = Instant.now() fun UtcNow(): Instant = Instant.now()
fun List<Event>.effective(): List<Event> {
val deletedIds = this
.filterIsInstance<DeleteEvent>()
.map { it.deletedEventId }
.toSet()
return this
.filter { it.eventId !in deletedIds }
.filterNot { it is DeleteEvent }
}
fun List<PersistedEvent>.effectivePersisted(): List<PersistedEvent> {
val parsed = this.mapNotNull { pe ->
pe.toEvent()?.let { ev -> pe to ev }
}
val effectiveEvents = parsed
.map { it.second }
.effective() // bruker extension over
val effectiveIds = effectiveEvents.map { it.eventId }.toSet()
return parsed
.filter { (_, ev) -> ev.eventId in effectiveIds }
.map { it.first }
.sortedBy { it.persistedAt }
}

View File

@ -15,6 +15,8 @@ data class SequenceSummary(
val extractTaskStatus: CollectProjection.TaskStatus, val extractTaskStatus: CollectProjection.TaskStatus,
val convertTaskStatus: CollectProjection.TaskStatus, val convertTaskStatus: CollectProjection.TaskStatus,
val coverDownloadTaskStatus: CollectProjection.TaskStatus, val coverDownloadTaskStatus: CollectProjection.TaskStatus,
val contentMigratedTaskStatus: CollectProjection.TaskStatus,
val contentStoredTaskStatus: CollectProjection.TaskStatus,
val mode: Mode, val mode: Mode,
val currentState: CurrentState, val currentState: CurrentState,
val hasErrors: Boolean, val hasErrors: Boolean,

View File

@ -7,6 +7,7 @@ object EventRegistry {
fun getEvents(): List<Class<out Event>> { fun getEvents(): List<Class<out Event>> {
return listOf( return listOf(
CollectedEvent::class.java, CollectedEvent::class.java,
CompletedEvent::class.java,
ConvertTaskCreatedEvent::class.java, ConvertTaskCreatedEvent::class.java,
ConvertTaskResultEvent::class.java, ConvertTaskResultEvent::class.java,

View File

@ -0,0 +1,5 @@
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
import no.iktdev.eventi.models.Event
class CompletedEvent: Event()

View File

@ -19,6 +19,10 @@ class CollectProjection(val events: List<Event>) {
private set private set
var coverDownloadTaskStatus: TaskStatus = TaskStatus.NotInitiated var coverDownloadTaskStatus: TaskStatus = TaskStatus.NotInitiated
private set private set
var contentMigratedTaskStatus: TaskStatus = TaskStatus.NotInitiated
private set
var contentStoredTaskStatus: TaskStatus = TaskStatus.NotInitiated
private set
val metadata: MetadataProjection? by lazy { projectMetadata() } val metadata: MetadataProjection? by lazy { projectMetadata() }
val processedMedia: ProcessedMediaProjection? by lazy { projectProcessedMedia() } val processedMedia: ProcessedMediaProjection? by lazy { projectProcessedMedia() }
val parsedFileInfo: ParsedFileInfoProjection? by lazy { projectParsedFileInfo() } val parsedFileInfo: ParsedFileInfoProjection? by lazy { projectParsedFileInfo() }
@ -30,7 +34,8 @@ class CollectProjection(val events: List<Event>) {
extreactTaskStatus = taskProjection.projectExtractSubtitleStatus() extreactTaskStatus = taskProjection.projectExtractSubtitleStatus()
convertTaskStatus = taskProjection.projectConvertStatus() convertTaskStatus = taskProjection.projectConvertStatus()
coverDownloadTaskStatus = taskProjection.projectCoverDownloadStatus() coverDownloadTaskStatus = taskProjection.projectCoverDownloadStatus()
contentMigratedTaskStatus = taskProjection.projectMigrateContentStatus()
contentStoredTaskStatus = taskProjection.projectStoreContentAndMetadataStatus()
} }
fun getTaskStatus(): List<TaskStatus> = listOf( fun getTaskStatus(): List<TaskStatus> = listOf(

View File

@ -90,6 +90,20 @@ class TaskProjection(val events: List<Event>) {
} }
} }
fun projectMigrateContentStatus(): TaskStatus {
return projectStatus<MigrateContentToStoreTaskCreatedEvent, MigrateContentToStoreTaskResultEvent>(
createdIds = { it.map { e -> e.taskId }},
resultStatus = { it.status },
resultIds = { it.flatMap { e -> e.metadata.derivedFromId?.toList() ?: emptyList() } }
)
}
fun projectStoreContentAndMetadataStatus(): TaskStatus {
return projectStatus<StoreContentAndMetadataTaskCreatedEvent, StoreContentAndMetadataTaskResultEvent>(
createdIds = { it.map { e -> e.taskId }},
resultStatus = {it.taskStatus},
resultIds = { it.flatMap { e -> e.metadata.derivedFromId?.toList() ?: emptyList() } }
)
}
} }