diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/SequencesController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/SequencesController.kt index fb07e981..b0e530d0 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/SequencesController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/SequencesController.kt @@ -54,4 +54,31 @@ class SequenceController( } } + @PostMapping("/{referenceId}/delete") + fun deleteSequences( + @PathVariable referenceId: UUID + ): ResponseEntity { + return try { + + val id = EventStore.deleteSequence(referenceId) + + ResponseEntity.ok( + ApiResponse( + ok = true, + message = "Sequence deleted, Event id for deletion marking is $id" + ) + ) + + } catch (ex: Exception) { + ResponseEntity + .status(HttpStatus.INTERNAL_SERVER_ERROR) + .body( + ApiResponse( + ok = false, + message = ex.message ?: "Unknown error" + ) + ) + } + } + } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt index f5c21c2a..b6dd12f2 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt @@ -1,10 +1,10 @@ package no.iktdev.mediaprocessing.coordinator.controller -import no.iktdev.mediaprocessing.coordinator.services.EventService -import no.iktdev.mediaprocessing.coordinator.services.TaskService import no.iktdev.mediaprocessing.coordinator.dto.translate.CoordinatorTaskTransferDto import no.iktdev.mediaprocessing.coordinator.dto.translate.toCoordinatorTransferDto +import no.iktdev.mediaprocessing.coordinator.services.EventService +import no.iktdev.mediaprocessing.coordinator.services.TaskService import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow import no.iktdev.mediaprocessing.shared.common.dto.Paginated import no.iktdev.mediaprocessing.shared.common.dto.ResetTaskResponse @@ -48,6 +48,9 @@ class TaskController( ?: return ResponseEntity.notFound().build() val referenceId = task.referenceId + if (eventService.isSequenceDeleted(referenceId)) { + return ResponseEntity.status(HttpStatus.METHOD_NOT_ALLOWED).build() + } // 1. Opprett DeleteEvent val deletedId = eventService.deleteTaskFailureForReset(referenceId, taskId) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt index 081fd140..a3ca025a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt @@ -70,5 +70,11 @@ class EventService { } fun getLastEventTimestamp(): Instant? = EventStore.getLastEventTimestamp() + fun isSequenceDeleted(referenceId: UUID): Boolean { + return EventStore.isEventSequenceDeleted(referenceId) + } + fun getDeletedSequences(referenceIds: Set): Set { + return EventStore.getDeletedSequences(referenceIds) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt index 52de885d..dedf8e6f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt @@ -15,15 +15,20 @@ import org.springframework.stereotype.Service import java.time.Instant @Service -class SequenceAggregatorService() { +class SequenceAggregatorService( + private val eventService: EventService +) { fun getActiveSequences(): List { val allEvents = EventStore.getPersistedEventsAfter(Instant.EPOCH) // Gruppér først, deserialiser senere val grouped = allEvents.groupBy { it.referenceId } + val deleted = eventService.getDeletedSequences(grouped.keys) - return grouped.values + return grouped + .filterNot { (referenceId, _) -> referenceId in deleted } + .values // aktive = ingen CollectedEvent .filter { events -> events.none { it.event == CompletedEvent::class.java.simpleName } } .mapNotNull { events -> buildSummary(events) } @@ -34,8 +39,11 @@ class SequenceAggregatorService() { val allEvents = EventStore.getPersistedEventsAfter(Instant.EPOCH) val grouped = allEvents.groupBy { it.referenceId } + val deleted = eventService.getDeletedSequences(grouped.keys) - return grouped.values + return grouped + .filterNot { (referenceId, _) -> referenceId in deleted } + .values .mapNotNull { events -> buildSummary(events) } .sortedByDescending { it.lastEventTime } .take(limit) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt index 6d07276b..46d8b736 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt @@ -9,19 +9,24 @@ import java.util.* @Service -class TaskService { +class TaskService( + private val eventService: EventService +) { fun getActiveTasks(): List { - return TaskStore.findActiveTasks() + return getNonDeleted(TaskStore.findActiveTasks()) } fun getPagedTasks(page: TaskQuery): Paginated { - return TaskStore.getPagedTasks(page) + return TaskStore.getPagedTasks(page).let { + it.copy(items = getNonDeleted(it.items)) + } } fun getTaskById(taskId: UUID): PersistedTask? { - return TaskStore.findByTaskId(taskId) + val task = TaskStore.findByTaskId(taskId) ?: return null + return if (eventService.isSequenceDeleted(task.referenceId)) null else task } fun resetFailedTask(taskId: UUID): Boolean { @@ -30,6 +35,20 @@ class TaskService { } fun getFailedTasks(): List { - return TaskStore.getFailedTasks() + return getNonDeleted(TaskStore.getFailedTasks()) } + + private fun getNonDeleted(tasks: List): List { + if (tasks.isEmpty()) return emptyList() + + // 1. Finn alle referenceId i batchen + val referenceIds = tasks.map { it.referenceId }.toSet() + + // 2. Slå opp alle slettede sekvenser i ett kall + val deleted = eventService.getDeletedSequences(referenceIds) + + // 3. Filtrer bort tasks som tilhører slettede sekvenser + return tasks.filterNot { it.referenceId in deleted } + } + } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt index a82016d1..d4433b82 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt @@ -18,6 +18,7 @@ object EventRegistry { CoverDownloadTaskCreatedEvent::class.java, CoverDownloadResultEvent::class.java, + DeleteSequenceEvent::class.java, DeletedTaskResultEvent::class.java, ForcedTaskResetAuditEvent::class.java, diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/DeleteSequenceEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/DeleteSequenceEvent.kt new file mode 100644 index 00000000..272fb6fc --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/DeleteSequenceEvent.kt @@ -0,0 +1,6 @@ +package no.iktdev.mediaprocessing.shared.common.event_task_contract.events + +import no.iktdev.eventi.models.SignalEvent + +class DeleteSequenceEvent: SignalEvent() { +} \ No newline at end of file diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt index af5534fa..f838b95f 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt @@ -9,16 +9,14 @@ import no.iktdev.eventi.stores.EventStore import no.iktdev.mediaprocessing.shared.common.UtcNow import no.iktdev.mediaprocessing.shared.common.dto.EventQuery import no.iktdev.mediaprocessing.shared.common.dto.Paginated -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CompletedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.DeletedTaskResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ForcedTaskResetAuditEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ManualAllowCompletionEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.* import no.iktdev.mediaprocessing.shared.common.getName import no.iktdev.mediaprocessing.shared.database.likeAny import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery import no.iktdev.mediaprocessing.shared.database.tables.EventsTable import no.iktdev.mediaprocessing.shared.database.withTransaction import org.jetbrains.exposed.sql.SortOrder +import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.insert import java.time.Instant import java.time.temporal.ChronoUnit @@ -74,15 +72,15 @@ object EventStore: EventStore { override fun getPersistedEventsAfter(timestamp: Instant): List { - val result = withTransaction { + return withTransaction { EventsTable.getWhere { EventsTable.persistedAt greater timestamp } - } - return result.getOrDefault(emptyList()) + }.getOrDefault(emptyList()) } override fun getPersistedEventsFor(referenceId: UUID): List { + if (isEventSequenceDeleted(referenceId)) return emptyList() val result = withTransaction { EventsTable .getWhere { EventsTable.referenceId eq referenceId.toString()} @@ -164,7 +162,8 @@ object EventStore: EventStore { EventsTable.getWhere { EventsTable.referenceId notInList completedReferences } - }.getOrDefault(emptyList()) + }.getOrDefault(emptyList()).groupBy { it.referenceId } + .filterNot { (referenceId, _) -> isEventSequenceDeleted(referenceId) }.values.flatten() } fun getLastEventTimestamp(): Instant? { @@ -176,4 +175,33 @@ object EventStore: EventStore { ?.get(EventsTable.persistedAt) }.getOrDefault(null) } + + fun isEventSequenceDeleted(referenceId: UUID): Boolean { + return withTransaction { + EventsTable.getWhere { + (EventsTable.referenceId eq referenceId.toString()) and + (EventsTable.event eq DeleteSequenceEvent::class.getName()) + }.count() > 0 + }.getOrDefault(false) + } + + fun getDeletedSequences(referenceIds: Set): Set { + if (referenceIds.isEmpty()) return emptySet() + return withTransaction { + EventsTable.select(EventsTable.referenceId) + .where { + (EventsTable.referenceId inList referenceIds.map { it.toString() }) and + (EventsTable.event eq DeleteSequenceEvent::class.getName()) + } + .map { UUID.fromString(it[EventsTable.referenceId]) } + .toSet() + }.getOrDefault(emptySet()) + } + + fun deleteSequence(referenceId: UUID): UUID { + val deleteSequenceEvent = DeleteSequenceEvent().usingReferenceId(referenceId) + persist(deleteSequenceEvent) + return deleteSequenceEvent.eventId + } + } \ No newline at end of file