From dc87aa394ca8c464e61b1e272129421e45191f27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Thu, 29 Jan 2026 19:05:16 +0100 Subject: [PATCH] Task Reset --- .../coordinator/controller/TaskController.kt | 48 ++++++++- .../coordinator/services/EventService.kt | 7 ++ .../coordinator/services/TaskService.kt | 5 + .../shared/common/dto/ResetTaskResponse.kt | 12 +++ .../event_task_contract/EventRegistry.kt | 3 + .../events/DeletedTaskResultEvent.kt | 7 ++ .../events/ForcedTaskResetAuditEvent.kt | 8 ++ .../shared/database/stores/EventStore.kt | 63 ++++++----- .../shared/database/stores/TaskStore.kt | 101 ++++++------------ .../shared/database/tables/EventsTable.kt | 20 ++++ .../shared/database/tables/TasksTable.kt | 26 +++++ 11 files changed, 203 insertions(+), 97 deletions(-) create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/ResetTaskResponse.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/DeletedTaskResultEvent.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ForcedTaskResetAuditEvent.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt index 7feba536..02bec58c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt @@ -2,19 +2,22 @@ package no.iktdev.mediaprocessing.coordinator.controller import no.iktdev.eventi.models.store.PersistedTask +import no.iktdev.mediaprocessing.coordinator.services.EventService import no.iktdev.mediaprocessing.coordinator.services.TaskService +import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow import no.iktdev.mediaprocessing.shared.common.dto.Paginated +import no.iktdev.mediaprocessing.shared.common.dto.ResetTaskResponse import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery -import org.springframework.web.bind.annotation.GetMapping -import org.springframework.web.bind.annotation.PathVariable -import org.springframework.web.bind.annotation.RequestMapping -import org.springframework.web.bind.annotation.RestController +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.* import java.util.* @RestController @RequestMapping("/tasks") class TaskController( private val taskService: TaskService, + private val eventService: EventService ) { @GetMapping("/active") @@ -30,4 +33,41 @@ class TaskController( @GetMapping("/{id}") fun getTask(@PathVariable id: UUID): PersistedTask? = taskService.getTaskById(id) + + + @PostMapping("/{taskId}/reset") + fun resetTask(@PathVariable taskId: UUID, forced: Boolean = false): ResponseEntity { + val task = taskService.getTaskById(taskId) + ?: return ResponseEntity.notFound().build() + + val referenceId = task.referenceId + + // 1. Opprett DeleteEvent + val deletedId = eventService.deleteTaskFailureForReset(referenceId, taskId) + if (deletedId == null) { + if (forced) { + eventService.createForcedTaskResetAuditEvent(referenceId, taskId) + } else { + return ResponseEntity.status(HttpStatus.CONFLICT).build() + } + } + + // 2. Reset task + taskService.resetFailedTask(taskId) + + return ResponseEntity.ok( + ResetTaskResponse( + taskId = taskId, + referenceId = referenceId, + status = "reset", + deletedEventId = deletedId, + resetAt = UtcNow() + ) + ) + } + @PostMapping("/{taskId}/reset/force") + fun resetTaskForce(@PathVariable taskId: UUID): ResponseEntity { + return resetTask(taskId, true) + } + } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt index 921fcdd0..5dee4f2f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt @@ -45,4 +45,11 @@ class EventService { return EventStore.getPagedEvents(query) } + fun deleteTaskFailureForReset(referenceId: UUID, taskId: UUID): UUID? { + return EventStore.deleteFailedEventForTask(referenceId, taskId) + } + + fun createForcedTaskResetAuditEvent(referenceId: UUID, taskId: UUID): UUID? { + return EventStore.createTaskResetAudioEvent(referenceId, taskId) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt index 863dd573..f12fb2eb 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt @@ -23,4 +23,9 @@ class TaskService { fun getTaskById(taskId: UUID): PersistedTask? { return TaskStore.findByTaskId(taskId) } + + fun resetFailedTask(taskId: UUID): Boolean { + val resetSuccess = TaskStore.resetTaskById(taskId).isSuccess + return resetSuccess + } } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/ResetTaskResponse.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/ResetTaskResponse.kt new file mode 100644 index 00000000..47eb14ba --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/ResetTaskResponse.kt @@ -0,0 +1,12 @@ +package no.iktdev.mediaprocessing.shared.common.dto + +import java.time.Instant +import java.util.* + +data class ResetTaskResponse( + val taskId: UUID, + val referenceId: UUID, + val deletedEventId: UUID?, + val status: String, + val resetAt: Instant +) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt index c7b42c56..e0d4fab7 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt @@ -17,6 +17,9 @@ object EventRegistry { CoverDownloadTaskCreatedEvent::class.java, CoverDownloadResultEvent::class.java, + DeletedTaskResultEvent::class.java, + ForcedTaskResetAuditEvent::class.java, + FileAddedEvent::class.java, FileReadyEvent::class.java, FileRemovedEvent::class.java, diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/DeletedTaskResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/DeletedTaskResultEvent.kt new file mode 100644 index 00000000..9550844d --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/DeletedTaskResultEvent.kt @@ -0,0 +1,7 @@ +package no.iktdev.mediaprocessing.shared.common.event_task_contract.events + +import no.iktdev.eventi.models.DeleteEvent +import java.util.* + +class DeletedTaskResultEvent(override var deletedEventId: UUID): DeleteEvent() { +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ForcedTaskResetAuditEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ForcedTaskResetAuditEvent.kt new file mode 100644 index 00000000..e837f51c --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ForcedTaskResetAuditEvent.kt @@ -0,0 +1,8 @@ +package no.iktdev.mediaprocessing.shared.common.event_task_contract.events + +import no.iktdev.eventi.models.Event +import java.util.* + +data class ForcedTaskResetAuditEvent( + val taskId: UUID, +): Event() \ No newline at end of file diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt index 5df84d4a..eaf3fc2c 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt @@ -1,22 +1,27 @@ package no.iktdev.mediaprocessing.shared.database.stores +import mu.KotlinLogging import no.iktdev.eventi.ZDS +import no.iktdev.eventi.ZDS.toEvent import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.PersistedEvent import no.iktdev.eventi.stores.EventStore import no.iktdev.mediaprocessing.shared.common.UtcNow import no.iktdev.mediaprocessing.shared.common.dto.EventQuery import no.iktdev.mediaprocessing.shared.common.dto.Paginated +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.DeletedTaskResultEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ForcedTaskResetAuditEvent import no.iktdev.mediaprocessing.shared.database.likeAny import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery import no.iktdev.mediaprocessing.shared.database.tables.EventsTable import no.iktdev.mediaprocessing.shared.database.withTransaction import org.jetbrains.exposed.sql.insert -import org.jetbrains.exposed.sql.selectAll import java.time.Instant import java.util.* + object EventStore: EventStore { + val log = KotlinLogging.logger {} fun getPagedEvents(query: EventQuery): Paginated = pagedQuery( @@ -65,36 +70,17 @@ object EventStore: EventStore { override fun getPersistedEventsAfter(timestamp: Instant): List { val result = withTransaction { - EventsTable.selectAll() - .where { EventsTable.persistedAt greater timestamp } - .map { - PersistedEvent( - id = it[EventsTable.id].value.toLong(), - referenceId = UUID.fromString(it[EventsTable.referenceId]), - eventId = UUID.fromString(it[EventsTable.eventId]), - event = it[EventsTable.event], - data = it[EventsTable.data], - persistedAt = it[EventsTable.persistedAt] - ) - } + EventsTable.getWhere { + EventsTable.persistedAt greater timestamp + } } return result.getOrDefault(emptyList()) } override fun getPersistedEventsFor(referenceId: UUID): List { val result = withTransaction { - EventsTable.selectAll() - .where { EventsTable.referenceId eq referenceId.toString()} - .map { - PersistedEvent( - id = it[EventsTable.id].value.toLong(), - referenceId = UUID.fromString(it[EventsTable.referenceId]), - eventId = UUID.fromString(it[EventsTable.eventId]), - event = it[EventsTable.event], - data = it[EventsTable.data], - persistedAt = it[EventsTable.persistedAt] - ) - } + EventsTable + .getWhere { EventsTable.referenceId eq referenceId.toString()} } return result.getOrDefault(emptyList()) } @@ -114,4 +100,31 @@ object EventStore: EventStore { } } } + + + fun deleteFailedEventForTask(referenceId: UUID, taskId: UUID): UUID? { + val sequenceEvents = withTransaction { + EventsTable.getWhere { + EventsTable.referenceId eq referenceId.toString() + } + }.getOrDefault(emptyList()) + val serialized = sequenceEvents.map { it.toEvent() } + val targetedEvent = serialized.find { it?.metadata?.derivedFromId?.any { uUID -> uUID == taskId } == true } + if (targetedEvent == null) { + log.error { "TaskId $taskId does not exist within the metadata of any events within the scope of $referenceId" } + } else { + log.info { "Identified ${targetedEvent.eventId} in ${targetedEvent.referenceId} as being derived from $taskId" } + val preparedDeleteEvent = DeletedTaskResultEvent(targetedEvent.eventId) + persist(preparedDeleteEvent) + return preparedDeleteEvent.deletedEventId + } + return null + } + + fun createTaskResetAudioEvent(referenceId: UUID, taskId: UUID): UUID { + val auditEvent = ForcedTaskResetAuditEvent(taskId) + .usingReferenceId(referenceId) + persist(auditEvent) + return auditEvent.eventId + } } \ No newline at end of file diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt index 90dc6c3a..380b680d 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt @@ -107,67 +107,28 @@ object TaskStore: TaskStore { override fun findByReferenceId(referenceId: UUID): List { return withTransaction { - TasksTable.selectAll() - .where { TasksTable.referenceId eq referenceId.toString() } - .map { - PersistedTask( - id = it[TasksTable.id].value.toLong(), - referenceId = UUID.fromString(it[TasksTable.referenceId]), - status = it[TasksTable.status], - taskId = UUID.fromString(it[TasksTable.taskId]), - task = it[TasksTable.task], - data = it[TasksTable.data], - claimed = it[TasksTable.claimed], - claimedBy = it[TasksTable.claimedBy], - consumed = it[TasksTable.consumed], - lastCheckIn = it[TasksTable.lastCheckIn], - persistedAt = it[TasksTable.persistedAt] - ) - } + TasksTable.getWhere { + TasksTable.referenceId eq referenceId.toString() + } }.getOrDefault(emptyList()) } override fun findUnclaimed(referenceId: UUID): List { return withTransaction { - TasksTable.selectAll() - .where { (TasksTable.referenceId eq referenceId.toString()) and (TasksTable.claimed eq false) and (TasksTable.consumed eq false) } - .map { - PersistedTask( - id = it[TasksTable.id].value.toLong(), - referenceId = UUID.fromString(it[TasksTable.referenceId]), - status = it[TasksTable.status], - taskId = UUID.fromString(it[TasksTable.taskId]), - task = it[TasksTable.task], - data = it[TasksTable.data], - claimed = it[TasksTable.claimed], - claimedBy = it[TasksTable.claimedBy], - consumed = it[TasksTable.consumed], - lastCheckIn = it[TasksTable.lastCheckIn], - persistedAt = it[TasksTable.persistedAt] - ) - } + TasksTable.getWhere { + (TasksTable.referenceId eq referenceId.toString()) and + (TasksTable.claimed eq false) and + (TasksTable.consumed eq false) + } }.getOrDefault(emptyList()) } fun findActiveTasks(): List { return withTransaction { - TasksTable.selectAll() - .where { (TasksTable.status inList listOf(TaskStatus.Pending, TaskStatus.InProgress)) and (TasksTable.consumed eq false) } - .map { - PersistedTask( - id = it[TasksTable.id].value.toLong(), - referenceId = UUID.fromString(it[TasksTable.referenceId]), - status = it[TasksTable.status], - taskId = UUID.fromString(it[TasksTable.taskId]), - task = it[TasksTable.task], - data = it[TasksTable.data], - claimed = it[TasksTable.claimed], - claimedBy = it[TasksTable.claimedBy], - consumed = it[TasksTable.consumed], - lastCheckIn = it[TasksTable.lastCheckIn], - persistedAt = it[TasksTable.persistedAt] - ) - } + TasksTable.getWhere { + (TasksTable.status inList listOf(TaskStatus.Pending, TaskStatus.InProgress)) and + (TasksTable.consumed eq false) + } }.getOrDefault(emptyList()) } @@ -220,25 +181,29 @@ object TaskStore: TaskStore { } } + fun resetTaskById(taskId: UUID): Result { + return withTransaction { + TasksTable.update({ + (TasksTable.claimed eq true) and + (TasksTable.consumed eq false) and + (TasksTable.status eq TaskStatus.Failed) and + (TasksTable.taskId eq taskId.toString()) + }) { + it[claimed] = false + it[claimedBy] = null + it[consumed] = false + it[lastCheckIn] = null + it[status] = TaskStatus.Pending + } + } + } + override fun getPendingTasks(): List { return withTransaction { - TasksTable.selectAll() - .where { (TasksTable.consumed eq false) and (TasksTable.claimed eq false) } - .map { - PersistedTask( - id = it[TasksTable.id].value.toLong(), - referenceId = UUID.fromString(it[TasksTable.referenceId]), - status = it[TasksTable.status], - taskId = UUID.fromString(it[TasksTable.taskId]), - task = it[TasksTable.task], - data = it[TasksTable.data], - claimed = it[TasksTable.claimed], - claimedBy = it[TasksTable.claimedBy], - consumed = it[TasksTable.consumed], - lastCheckIn = it[TasksTable.lastCheckIn], - persistedAt = it[TasksTable.persistedAt] - ) - } + TasksTable.getWhere { + (TasksTable.consumed eq false) and + (TasksTable.claimed eq false) + } }.getOrDefault(emptyList()) } } \ No newline at end of file diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/EventsTable.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/EventsTable.kt index 58d69680..cedb26e8 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/EventsTable.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/EventsTable.kt @@ -1,10 +1,15 @@ package no.iktdev.mediaprocessing.shared.database.tables +import no.iktdev.eventi.models.store.PersistedEvent import no.iktdev.mediaprocessing.shared.common.UtcNow import no.iktdev.mediaprocessing.shared.database.LongTextColumnType import org.jetbrains.exposed.dao.id.IntIdTable import org.jetbrains.exposed.sql.Column +import org.jetbrains.exposed.sql.Op +import org.jetbrains.exposed.sql.SqlExpressionBuilder import org.jetbrains.exposed.sql.javatime.timestamp +import org.jetbrains.exposed.sql.selectAll +import java.util.* object EventsTable: IntIdTable(name = "EVENTS") { val referenceId: Column = varchar("REFERENCE_ID", 36) @@ -18,4 +23,19 @@ object EventsTable: IntIdTable(name = "EVENTS") { init { uniqueIndex(referenceId, eventId, event) } + + fun getWhere(predicate: SqlExpressionBuilder.() -> Op): List { + return EventsTable.selectAll() + .where(predicate) + .map { + PersistedEvent( + id = it[EventsTable.id].value.toLong(), + referenceId = UUID.fromString(it[EventsTable.referenceId]), + eventId = UUID.fromString(it[EventsTable.eventId]), + event = it[EventsTable.event], + data = it[EventsTable.data], + persistedAt = it[EventsTable.persistedAt] + ) + } + } } \ No newline at end of file diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/TasksTable.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/TasksTable.kt index ad29d860..edbb4e1b 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/TasksTable.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/TasksTable.kt @@ -1,12 +1,17 @@ package no.iktdev.mediaprocessing.shared.database.tables +import no.iktdev.eventi.models.store.PersistedTask import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.mediaprocessing.shared.common.UtcNow import no.iktdev.mediaprocessing.shared.database.LongTextColumnType import org.jetbrains.exposed.dao.id.IntIdTable import org.jetbrains.exposed.sql.Column +import org.jetbrains.exposed.sql.Op +import org.jetbrains.exposed.sql.SqlExpressionBuilder import org.jetbrains.exposed.sql.javatime.timestamp +import org.jetbrains.exposed.sql.selectAll import java.time.Instant +import java.util.* object TasksTable: IntIdTable(name = "TASKS") { val referenceId: Column = varchar("REFERENCE_ID", 36) @@ -20,4 +25,25 @@ object TasksTable: IntIdTable(name = "TASKS") { val lastCheckIn: Column = timestamp("LAST_CHECK_IN").nullable() val persistedAt = timestamp("PERSISTED_AT") .clientDefault { UtcNow() } + + fun getWhere(predicate: SqlExpressionBuilder.() -> Op): List { + return TasksTable.selectAll() + .where(predicate) + .map { + PersistedTask( + id = it[TasksTable.id].value.toLong(), + referenceId = UUID.fromString(it[TasksTable.referenceId]), + status = it[TasksTable.status], + taskId = UUID.fromString(it[TasksTable.taskId]), + task = it[TasksTable.task], + data = it[TasksTable.data], + claimed = it[TasksTable.claimed], + claimedBy = it[TasksTable.claimedBy], + consumed = it[TasksTable.consumed], + lastCheckIn = it[TasksTable.lastCheckIn], + persistedAt = it[TasksTable.persistedAt] + ) + } + } + } \ No newline at end of file