Task Reset

This commit is contained in:
Brage Skjønborg 2026-01-29 19:05:16 +01:00
parent 66f5e12a51
commit dc87aa394c
11 changed files with 203 additions and 97 deletions

View File

@ -2,19 +2,22 @@ package no.iktdev.mediaprocessing.coordinator.controller
import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.mediaprocessing.coordinator.services.EventService
import no.iktdev.mediaprocessing.coordinator.services.TaskService
import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
import no.iktdev.mediaprocessing.shared.common.dto.ResetTaskResponse
import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import java.util.*
@RestController
@RequestMapping("/tasks")
class TaskController(
private val taskService: TaskService,
private val eventService: EventService
) {
@GetMapping("/active")
@ -30,4 +33,41 @@ class TaskController(
@GetMapping("/{id}")
fun getTask(@PathVariable id: UUID): PersistedTask? =
taskService.getTaskById(id)
@PostMapping("/{taskId}/reset")
fun resetTask(@PathVariable taskId: UUID, forced: Boolean = false): ResponseEntity<ResetTaskResponse> {
val task = taskService.getTaskById(taskId)
?: return ResponseEntity.notFound().build()
val referenceId = task.referenceId
// 1. Opprett DeleteEvent
val deletedId = eventService.deleteTaskFailureForReset(referenceId, taskId)
if (deletedId == null) {
if (forced) {
eventService.createForcedTaskResetAuditEvent(referenceId, taskId)
} else {
return ResponseEntity.status(HttpStatus.CONFLICT).build()
}
}
// 2. Reset task
taskService.resetFailedTask(taskId)
return ResponseEntity.ok(
ResetTaskResponse(
taskId = taskId,
referenceId = referenceId,
status = "reset",
deletedEventId = deletedId,
resetAt = UtcNow()
)
)
}
@PostMapping("/{taskId}/reset/force")
fun resetTaskForce(@PathVariable taskId: UUID): ResponseEntity<ResetTaskResponse> {
return resetTask(taskId, true)
}
}

View File

@ -45,4 +45,11 @@ class EventService {
return EventStore.getPagedEvents(query)
}
fun deleteTaskFailureForReset(referenceId: UUID, taskId: UUID): UUID? {
return EventStore.deleteFailedEventForTask(referenceId, taskId)
}
fun createForcedTaskResetAuditEvent(referenceId: UUID, taskId: UUID): UUID? {
return EventStore.createTaskResetAudioEvent(referenceId, taskId)
}
}

View File

@ -23,4 +23,9 @@ class TaskService {
fun getTaskById(taskId: UUID): PersistedTask? {
return TaskStore.findByTaskId(taskId)
}
fun resetFailedTask(taskId: UUID): Boolean {
val resetSuccess = TaskStore.resetTaskById(taskId).isSuccess
return resetSuccess
}
}

View File

@ -0,0 +1,12 @@
package no.iktdev.mediaprocessing.shared.common.dto
import java.time.Instant
import java.util.*
data class ResetTaskResponse(
val taskId: UUID,
val referenceId: UUID,
val deletedEventId: UUID?,
val status: String,
val resetAt: Instant
)

View File

@ -17,6 +17,9 @@ object EventRegistry {
CoverDownloadTaskCreatedEvent::class.java,
CoverDownloadResultEvent::class.java,
DeletedTaskResultEvent::class.java,
ForcedTaskResetAuditEvent::class.java,
FileAddedEvent::class.java,
FileReadyEvent::class.java,
FileRemovedEvent::class.java,

View File

@ -0,0 +1,7 @@
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
import no.iktdev.eventi.models.DeleteEvent
import java.util.*
class DeletedTaskResultEvent(override var deletedEventId: UUID): DeleteEvent() {
}

View File

@ -0,0 +1,8 @@
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
import no.iktdev.eventi.models.Event
import java.util.*
data class ForcedTaskResetAuditEvent(
val taskId: UUID,
): Event()

View File

@ -1,22 +1,27 @@
package no.iktdev.mediaprocessing.shared.database.stores
import mu.KotlinLogging
import no.iktdev.eventi.ZDS
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.eventi.stores.EventStore
import no.iktdev.mediaprocessing.shared.common.UtcNow
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.DeletedTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ForcedTaskResetAuditEvent
import no.iktdev.mediaprocessing.shared.database.likeAny
import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery
import no.iktdev.mediaprocessing.shared.database.tables.EventsTable
import no.iktdev.mediaprocessing.shared.database.withTransaction
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.selectAll
import java.time.Instant
import java.util.*
object EventStore: EventStore {
val log = KotlinLogging.logger {}
fun getPagedEvents(query: EventQuery): Paginated<PersistedEvent> =
pagedQuery(
@ -65,17 +70,8 @@ object EventStore: EventStore {
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> {
val result = withTransaction {
EventsTable.selectAll()
.where { EventsTable.persistedAt greater timestamp }
.map {
PersistedEvent(
id = it[EventsTable.id].value.toLong(),
referenceId = UUID.fromString(it[EventsTable.referenceId]),
eventId = UUID.fromString(it[EventsTable.eventId]),
event = it[EventsTable.event],
data = it[EventsTable.data],
persistedAt = it[EventsTable.persistedAt]
)
EventsTable.getWhere {
EventsTable.persistedAt greater timestamp
}
}
return result.getOrDefault(emptyList())
@ -83,18 +79,8 @@ object EventStore: EventStore {
override fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent> {
val result = withTransaction {
EventsTable.selectAll()
.where { EventsTable.referenceId eq referenceId.toString()}
.map {
PersistedEvent(
id = it[EventsTable.id].value.toLong(),
referenceId = UUID.fromString(it[EventsTable.referenceId]),
eventId = UUID.fromString(it[EventsTable.eventId]),
event = it[EventsTable.event],
data = it[EventsTable.data],
persistedAt = it[EventsTable.persistedAt]
)
}
EventsTable
.getWhere { EventsTable.referenceId eq referenceId.toString()}
}
return result.getOrDefault(emptyList())
}
@ -114,4 +100,31 @@ object EventStore: EventStore {
}
}
}
fun deleteFailedEventForTask(referenceId: UUID, taskId: UUID): UUID? {
val sequenceEvents = withTransaction {
EventsTable.getWhere {
EventsTable.referenceId eq referenceId.toString()
}
}.getOrDefault(emptyList())
val serialized = sequenceEvents.map { it.toEvent() }
val targetedEvent = serialized.find { it?.metadata?.derivedFromId?.any { uUID -> uUID == taskId } == true }
if (targetedEvent == null) {
log.error { "TaskId $taskId does not exist within the metadata of any events within the scope of $referenceId" }
} else {
log.info { "Identified ${targetedEvent.eventId} in ${targetedEvent.referenceId} as being derived from $taskId" }
val preparedDeleteEvent = DeletedTaskResultEvent(targetedEvent.eventId)
persist(preparedDeleteEvent)
return preparedDeleteEvent.deletedEventId
}
return null
}
fun createTaskResetAudioEvent(referenceId: UUID, taskId: UUID): UUID {
val auditEvent = ForcedTaskResetAuditEvent(taskId)
.usingReferenceId(referenceId)
persist(auditEvent)
return auditEvent.eventId
}
}

View File

@ -107,66 +107,27 @@ object TaskStore: TaskStore {
override fun findByReferenceId(referenceId: UUID): List<PersistedTask> {
return withTransaction {
TasksTable.selectAll()
.where { TasksTable.referenceId eq referenceId.toString() }
.map {
PersistedTask(
id = it[TasksTable.id].value.toLong(),
referenceId = UUID.fromString(it[TasksTable.referenceId]),
status = it[TasksTable.status],
taskId = UUID.fromString(it[TasksTable.taskId]),
task = it[TasksTable.task],
data = it[TasksTable.data],
claimed = it[TasksTable.claimed],
claimedBy = it[TasksTable.claimedBy],
consumed = it[TasksTable.consumed],
lastCheckIn = it[TasksTable.lastCheckIn],
persistedAt = it[TasksTable.persistedAt]
)
TasksTable.getWhere {
TasksTable.referenceId eq referenceId.toString()
}
}.getOrDefault(emptyList())
}
override fun findUnclaimed(referenceId: UUID): List<PersistedTask> {
return withTransaction {
TasksTable.selectAll()
.where { (TasksTable.referenceId eq referenceId.toString()) and (TasksTable.claimed eq false) and (TasksTable.consumed eq false) }
.map {
PersistedTask(
id = it[TasksTable.id].value.toLong(),
referenceId = UUID.fromString(it[TasksTable.referenceId]),
status = it[TasksTable.status],
taskId = UUID.fromString(it[TasksTable.taskId]),
task = it[TasksTable.task],
data = it[TasksTable.data],
claimed = it[TasksTable.claimed],
claimedBy = it[TasksTable.claimedBy],
consumed = it[TasksTable.consumed],
lastCheckIn = it[TasksTable.lastCheckIn],
persistedAt = it[TasksTable.persistedAt]
)
TasksTable.getWhere {
(TasksTable.referenceId eq referenceId.toString()) and
(TasksTable.claimed eq false) and
(TasksTable.consumed eq false)
}
}.getOrDefault(emptyList())
}
fun findActiveTasks(): List<PersistedTask> {
return withTransaction {
TasksTable.selectAll()
.where { (TasksTable.status inList listOf(TaskStatus.Pending, TaskStatus.InProgress)) and (TasksTable.consumed eq false) }
.map {
PersistedTask(
id = it[TasksTable.id].value.toLong(),
referenceId = UUID.fromString(it[TasksTable.referenceId]),
status = it[TasksTable.status],
taskId = UUID.fromString(it[TasksTable.taskId]),
task = it[TasksTable.task],
data = it[TasksTable.data],
claimed = it[TasksTable.claimed],
claimedBy = it[TasksTable.claimedBy],
consumed = it[TasksTable.consumed],
lastCheckIn = it[TasksTable.lastCheckIn],
persistedAt = it[TasksTable.persistedAt]
)
TasksTable.getWhere {
(TasksTable.status inList listOf(TaskStatus.Pending, TaskStatus.InProgress)) and
(TasksTable.consumed eq false)
}
}.getOrDefault(emptyList())
}
@ -220,24 +181,28 @@ object TaskStore: TaskStore {
}
}
fun resetTaskById(taskId: UUID): Result<Int> {
return withTransaction {
TasksTable.update({
(TasksTable.claimed eq true) and
(TasksTable.consumed eq false) and
(TasksTable.status eq TaskStatus.Failed) and
(TasksTable.taskId eq taskId.toString())
}) {
it[claimed] = false
it[claimedBy] = null
it[consumed] = false
it[lastCheckIn] = null
it[status] = TaskStatus.Pending
}
}
}
override fun getPendingTasks(): List<PersistedTask> {
return withTransaction {
TasksTable.selectAll()
.where { (TasksTable.consumed eq false) and (TasksTable.claimed eq false) }
.map {
PersistedTask(
id = it[TasksTable.id].value.toLong(),
referenceId = UUID.fromString(it[TasksTable.referenceId]),
status = it[TasksTable.status],
taskId = UUID.fromString(it[TasksTable.taskId]),
task = it[TasksTable.task],
data = it[TasksTable.data],
claimed = it[TasksTable.claimed],
claimedBy = it[TasksTable.claimedBy],
consumed = it[TasksTable.consumed],
lastCheckIn = it[TasksTable.lastCheckIn],
persistedAt = it[TasksTable.persistedAt]
)
TasksTable.getWhere {
(TasksTable.consumed eq false) and
(TasksTable.claimed eq false)
}
}.getOrDefault(emptyList())
}

View File

@ -1,10 +1,15 @@
package no.iktdev.mediaprocessing.shared.database.tables
import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.mediaprocessing.shared.common.UtcNow
import no.iktdev.mediaprocessing.shared.database.LongTextColumnType
import org.jetbrains.exposed.dao.id.IntIdTable
import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.Op
import org.jetbrains.exposed.sql.SqlExpressionBuilder
import org.jetbrains.exposed.sql.javatime.timestamp
import org.jetbrains.exposed.sql.selectAll
import java.util.*
object EventsTable: IntIdTable(name = "EVENTS") {
val referenceId: Column<String> = varchar("REFERENCE_ID", 36)
@ -18,4 +23,19 @@ object EventsTable: IntIdTable(name = "EVENTS") {
init {
uniqueIndex(referenceId, eventId, event)
}
fun getWhere(predicate: SqlExpressionBuilder.() -> Op<Boolean>): List<PersistedEvent> {
return EventsTable.selectAll()
.where(predicate)
.map {
PersistedEvent(
id = it[EventsTable.id].value.toLong(),
referenceId = UUID.fromString(it[EventsTable.referenceId]),
eventId = UUID.fromString(it[EventsTable.eventId]),
event = it[EventsTable.event],
data = it[EventsTable.data],
persistedAt = it[EventsTable.persistedAt]
)
}
}
}

View File

@ -1,12 +1,17 @@
package no.iktdev.mediaprocessing.shared.database.tables
import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.mediaprocessing.shared.common.UtcNow
import no.iktdev.mediaprocessing.shared.database.LongTextColumnType
import org.jetbrains.exposed.dao.id.IntIdTable
import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.Op
import org.jetbrains.exposed.sql.SqlExpressionBuilder
import org.jetbrains.exposed.sql.javatime.timestamp
import org.jetbrains.exposed.sql.selectAll
import java.time.Instant
import java.util.*
object TasksTable: IntIdTable(name = "TASKS") {
val referenceId: Column<String> = varchar("REFERENCE_ID", 36)
@ -20,4 +25,25 @@ object TasksTable: IntIdTable(name = "TASKS") {
val lastCheckIn: Column<Instant?> = timestamp("LAST_CHECK_IN").nullable()
val persistedAt = timestamp("PERSISTED_AT")
.clientDefault { UtcNow() }
fun getWhere(predicate: SqlExpressionBuilder.() -> Op<Boolean>): List<PersistedTask> {
return TasksTable.selectAll()
.where(predicate)
.map {
PersistedTask(
id = it[TasksTable.id].value.toLong(),
referenceId = UUID.fromString(it[TasksTable.referenceId]),
status = it[TasksTable.status],
taskId = UUID.fromString(it[TasksTable.taskId]),
task = it[TasksTable.task],
data = it[TasksTable.data],
claimed = it[TasksTable.claimed],
claimedBy = it[TasksTable.claimedBy],
consumed = it[TasksTable.consumed],
lastCheckIn = it[TasksTable.lastCheckIn],
persistedAt = it[TasksTable.persistedAt]
)
}
}
}