Hiding deleted
This commit is contained in:
parent
a49661736b
commit
f00c45d38b
@ -54,4 +54,31 @@ class SequenceController(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping("/{referenceId}/delete")
|
||||||
|
fun deleteSequences(
|
||||||
|
@PathVariable referenceId: UUID
|
||||||
|
): ResponseEntity<ApiResponse> {
|
||||||
|
return try {
|
||||||
|
|
||||||
|
val id = EventStore.deleteSequence(referenceId)
|
||||||
|
|
||||||
|
ResponseEntity.ok(
|
||||||
|
ApiResponse(
|
||||||
|
ok = true,
|
||||||
|
message = "Sequence deleted, Event id for deletion marking is $id"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
ResponseEntity
|
||||||
|
.status(HttpStatus.INTERNAL_SERVER_ERROR)
|
||||||
|
.body(
|
||||||
|
ApiResponse(
|
||||||
|
ok = false,
|
||||||
|
message = ex.message ?: "Unknown error"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,10 @@
|
|||||||
package no.iktdev.mediaprocessing.coordinator.controller
|
package no.iktdev.mediaprocessing.coordinator.controller
|
||||||
|
|
||||||
|
|
||||||
import no.iktdev.mediaprocessing.coordinator.services.EventService
|
|
||||||
import no.iktdev.mediaprocessing.coordinator.services.TaskService
|
|
||||||
import no.iktdev.mediaprocessing.coordinator.dto.translate.CoordinatorTaskTransferDto
|
import no.iktdev.mediaprocessing.coordinator.dto.translate.CoordinatorTaskTransferDto
|
||||||
import no.iktdev.mediaprocessing.coordinator.dto.translate.toCoordinatorTransferDto
|
import no.iktdev.mediaprocessing.coordinator.dto.translate.toCoordinatorTransferDto
|
||||||
|
import no.iktdev.mediaprocessing.coordinator.services.EventService
|
||||||
|
import no.iktdev.mediaprocessing.coordinator.services.TaskService
|
||||||
import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow
|
import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.ResetTaskResponse
|
import no.iktdev.mediaprocessing.shared.common.dto.ResetTaskResponse
|
||||||
@ -48,6 +48,9 @@ class TaskController(
|
|||||||
?: return ResponseEntity.notFound().build()
|
?: return ResponseEntity.notFound().build()
|
||||||
|
|
||||||
val referenceId = task.referenceId
|
val referenceId = task.referenceId
|
||||||
|
if (eventService.isSequenceDeleted(referenceId)) {
|
||||||
|
return ResponseEntity.status(HttpStatus.METHOD_NOT_ALLOWED).build()
|
||||||
|
}
|
||||||
|
|
||||||
// 1. Opprett DeleteEvent
|
// 1. Opprett DeleteEvent
|
||||||
val deletedId = eventService.deleteTaskFailureForReset(referenceId, taskId)
|
val deletedId = eventService.deleteTaskFailureForReset(referenceId, taskId)
|
||||||
|
|||||||
@ -70,5 +70,11 @@ class EventService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun getLastEventTimestamp(): Instant? = EventStore.getLastEventTimestamp()
|
fun getLastEventTimestamp(): Instant? = EventStore.getLastEventTimestamp()
|
||||||
|
fun isSequenceDeleted(referenceId: UUID): Boolean {
|
||||||
|
return EventStore.isEventSequenceDeleted(referenceId)
|
||||||
|
}
|
||||||
|
fun getDeletedSequences(referenceIds: Set<UUID>): Set<UUID> {
|
||||||
|
return EventStore.getDeletedSequences(referenceIds)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -15,15 +15,20 @@ import org.springframework.stereotype.Service
|
|||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class SequenceAggregatorService() {
|
class SequenceAggregatorService(
|
||||||
|
private val eventService: EventService
|
||||||
|
) {
|
||||||
|
|
||||||
fun getActiveSequences(): List<SequenceSummary> {
|
fun getActiveSequences(): List<SequenceSummary> {
|
||||||
val allEvents = EventStore.getPersistedEventsAfter(Instant.EPOCH)
|
val allEvents = EventStore.getPersistedEventsAfter(Instant.EPOCH)
|
||||||
|
|
||||||
// Gruppér først, deserialiser senere
|
// Gruppér først, deserialiser senere
|
||||||
val grouped = allEvents.groupBy { it.referenceId }
|
val grouped = allEvents.groupBy { it.referenceId }
|
||||||
|
val deleted = eventService.getDeletedSequences(grouped.keys)
|
||||||
|
|
||||||
return grouped.values
|
return grouped
|
||||||
|
.filterNot { (referenceId, _) -> referenceId in deleted }
|
||||||
|
.values
|
||||||
// aktive = ingen CollectedEvent
|
// aktive = ingen CollectedEvent
|
||||||
.filter { events -> events.none { it.event == CompletedEvent::class.java.simpleName } }
|
.filter { events -> events.none { it.event == CompletedEvent::class.java.simpleName } }
|
||||||
.mapNotNull { events -> buildSummary(events) }
|
.mapNotNull { events -> buildSummary(events) }
|
||||||
@ -34,8 +39,11 @@ class SequenceAggregatorService() {
|
|||||||
val allEvents = EventStore.getPersistedEventsAfter(Instant.EPOCH)
|
val allEvents = EventStore.getPersistedEventsAfter(Instant.EPOCH)
|
||||||
|
|
||||||
val grouped = allEvents.groupBy { it.referenceId }
|
val grouped = allEvents.groupBy { it.referenceId }
|
||||||
|
val deleted = eventService.getDeletedSequences(grouped.keys)
|
||||||
|
|
||||||
return grouped.values
|
return grouped
|
||||||
|
.filterNot { (referenceId, _) -> referenceId in deleted }
|
||||||
|
.values
|
||||||
.mapNotNull { events -> buildSummary(events) }
|
.mapNotNull { events -> buildSummary(events) }
|
||||||
.sortedByDescending { it.lastEventTime }
|
.sortedByDescending { it.lastEventTime }
|
||||||
.take(limit)
|
.take(limit)
|
||||||
|
|||||||
@ -9,19 +9,24 @@ import java.util.*
|
|||||||
|
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class TaskService {
|
class TaskService(
|
||||||
|
private val eventService: EventService
|
||||||
|
) {
|
||||||
|
|
||||||
|
|
||||||
fun getActiveTasks(): List<PersistedTask> {
|
fun getActiveTasks(): List<PersistedTask> {
|
||||||
return TaskStore.findActiveTasks()
|
return getNonDeleted(TaskStore.findActiveTasks())
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getPagedTasks(page: TaskQuery): Paginated<PersistedTask> {
|
fun getPagedTasks(page: TaskQuery): Paginated<PersistedTask> {
|
||||||
return TaskStore.getPagedTasks(page)
|
return TaskStore.getPagedTasks(page).let {
|
||||||
|
it.copy(items = getNonDeleted(it.items))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getTaskById(taskId: UUID): PersistedTask? {
|
fun getTaskById(taskId: UUID): PersistedTask? {
|
||||||
return TaskStore.findByTaskId(taskId)
|
val task = TaskStore.findByTaskId(taskId) ?: return null
|
||||||
|
return if (eventService.isSequenceDeleted(task.referenceId)) null else task
|
||||||
}
|
}
|
||||||
|
|
||||||
fun resetFailedTask(taskId: UUID): Boolean {
|
fun resetFailedTask(taskId: UUID): Boolean {
|
||||||
@ -30,6 +35,20 @@ class TaskService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun getFailedTasks(): List<PersistedTask> {
|
fun getFailedTasks(): List<PersistedTask> {
|
||||||
return TaskStore.getFailedTasks()
|
return getNonDeleted(TaskStore.getFailedTasks())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun getNonDeleted(tasks: List<PersistedTask>): List<PersistedTask> {
|
||||||
|
if (tasks.isEmpty()) return emptyList()
|
||||||
|
|
||||||
|
// 1. Finn alle referenceId i batchen
|
||||||
|
val referenceIds = tasks.map { it.referenceId }.toSet()
|
||||||
|
|
||||||
|
// 2. Slå opp alle slettede sekvenser i ett kall
|
||||||
|
val deleted = eventService.getDeletedSequences(referenceIds)
|
||||||
|
|
||||||
|
// 3. Filtrer bort tasks som tilhører slettede sekvenser
|
||||||
|
return tasks.filterNot { it.referenceId in deleted }
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -18,6 +18,7 @@ object EventRegistry {
|
|||||||
CoverDownloadTaskCreatedEvent::class.java,
|
CoverDownloadTaskCreatedEvent::class.java,
|
||||||
CoverDownloadResultEvent::class.java,
|
CoverDownloadResultEvent::class.java,
|
||||||
|
|
||||||
|
DeleteSequenceEvent::class.java,
|
||||||
DeletedTaskResultEvent::class.java,
|
DeletedTaskResultEvent::class.java,
|
||||||
ForcedTaskResetAuditEvent::class.java,
|
ForcedTaskResetAuditEvent::class.java,
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,6 @@
|
|||||||
|
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||||
|
|
||||||
|
import no.iktdev.eventi.models.SignalEvent
|
||||||
|
|
||||||
|
class DeleteSequenceEvent: SignalEvent() {
|
||||||
|
}
|
||||||
@ -9,16 +9,14 @@ import no.iktdev.eventi.stores.EventStore
|
|||||||
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
|
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CompletedEvent
|
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.*
|
||||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.DeletedTaskResultEvent
|
|
||||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ForcedTaskResetAuditEvent
|
|
||||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ManualAllowCompletionEvent
|
|
||||||
import no.iktdev.mediaprocessing.shared.common.getName
|
import no.iktdev.mediaprocessing.shared.common.getName
|
||||||
import no.iktdev.mediaprocessing.shared.database.likeAny
|
import no.iktdev.mediaprocessing.shared.database.likeAny
|
||||||
import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery
|
import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery
|
||||||
import no.iktdev.mediaprocessing.shared.database.tables.EventsTable
|
import no.iktdev.mediaprocessing.shared.database.tables.EventsTable
|
||||||
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
||||||
import org.jetbrains.exposed.sql.SortOrder
|
import org.jetbrains.exposed.sql.SortOrder
|
||||||
|
import org.jetbrains.exposed.sql.and
|
||||||
import org.jetbrains.exposed.sql.insert
|
import org.jetbrains.exposed.sql.insert
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.time.temporal.ChronoUnit
|
import java.time.temporal.ChronoUnit
|
||||||
@ -74,15 +72,15 @@ object EventStore: EventStore {
|
|||||||
|
|
||||||
|
|
||||||
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> {
|
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> {
|
||||||
val result = withTransaction {
|
return withTransaction {
|
||||||
EventsTable.getWhere {
|
EventsTable.getWhere {
|
||||||
EventsTable.persistedAt greater timestamp
|
EventsTable.persistedAt greater timestamp
|
||||||
}
|
}
|
||||||
}
|
}.getOrDefault(emptyList())
|
||||||
return result.getOrDefault(emptyList())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent> {
|
override fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent> {
|
||||||
|
if (isEventSequenceDeleted(referenceId)) return emptyList()
|
||||||
val result = withTransaction {
|
val result = withTransaction {
|
||||||
EventsTable
|
EventsTable
|
||||||
.getWhere { EventsTable.referenceId eq referenceId.toString()}
|
.getWhere { EventsTable.referenceId eq referenceId.toString()}
|
||||||
@ -164,7 +162,8 @@ object EventStore: EventStore {
|
|||||||
EventsTable.getWhere {
|
EventsTable.getWhere {
|
||||||
EventsTable.referenceId notInList completedReferences
|
EventsTable.referenceId notInList completedReferences
|
||||||
}
|
}
|
||||||
}.getOrDefault(emptyList())
|
}.getOrDefault(emptyList()).groupBy { it.referenceId }
|
||||||
|
.filterNot { (referenceId, _) -> isEventSequenceDeleted(referenceId) }.values.flatten()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getLastEventTimestamp(): Instant? {
|
fun getLastEventTimestamp(): Instant? {
|
||||||
@ -176,4 +175,33 @@ object EventStore: EventStore {
|
|||||||
?.get(EventsTable.persistedAt)
|
?.get(EventsTable.persistedAt)
|
||||||
}.getOrDefault(null)
|
}.getOrDefault(null)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun isEventSequenceDeleted(referenceId: UUID): Boolean {
|
||||||
|
return withTransaction {
|
||||||
|
EventsTable.getWhere {
|
||||||
|
(EventsTable.referenceId eq referenceId.toString()) and
|
||||||
|
(EventsTable.event eq DeleteSequenceEvent::class.getName())
|
||||||
|
}.count() > 0
|
||||||
|
}.getOrDefault(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun getDeletedSequences(referenceIds: Set<UUID>): Set<UUID> {
|
||||||
|
if (referenceIds.isEmpty()) return emptySet()
|
||||||
|
return withTransaction {
|
||||||
|
EventsTable.select(EventsTable.referenceId)
|
||||||
|
.where {
|
||||||
|
(EventsTable.referenceId inList referenceIds.map { it.toString() }) and
|
||||||
|
(EventsTable.event eq DeleteSequenceEvent::class.getName())
|
||||||
|
}
|
||||||
|
.map { UUID.fromString(it[EventsTable.referenceId]) }
|
||||||
|
.toSet()
|
||||||
|
}.getOrDefault(emptySet())
|
||||||
|
}
|
||||||
|
|
||||||
|
fun deleteSequence(referenceId: UUID): UUID {
|
||||||
|
val deleteSequenceEvent = DeleteSequenceEvent().usingReferenceId(referenceId)
|
||||||
|
persist(deleteSequenceEvent)
|
||||||
|
return deleteSequenceEvent.eventId
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue
Block a user