Pagination + store event

This commit is contained in:
Brage Skjønborg 2026-01-20 03:20:33 +01:00
parent 181b1630be
commit d246403f74
9 changed files with 155 additions and 35 deletions

View File

@ -16,7 +16,6 @@ class OperationsController(
@PostMapping("/start")
fun startProcess(@RequestBody req: StartProcessRequest): ResponseEntity<Map<String, String>> {
val referenceId = commandService.startProcess(req)
return when (val result = commandService.startProcess(req)) {
is CommandService.StartResult.Accepted -> ResponseEntity
.accepted()

View File

@ -0,0 +1,30 @@
package no.iktdev.mediaprocessing.coordinator.controller
import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.mediaprocessing.coordinator.services.TaskService
import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks
import org.springframework.web.bind.annotation.*
import java.util.*
@RestController
@RequestMapping("/tasks")
class TaskController(
private val taskService: TaskService,
) {
@GetMapping("/active")
fun getActiveTasks(): List<PersistedTask> =
taskService.getActiveTasks()
@GetMapping
fun getPagedTasks(
@RequestParam(defaultValue = "0") page: Int,
@RequestParam(defaultValue = "50") size: Int,
): PagedTasks =
taskService.getPagedTasks(page, size)
@GetMapping("/{id}")
fun getTask(@PathVariable id: UUID): PersistedTask? =
taskService.getTaskById(id)
}

View File

@ -4,6 +4,7 @@ import mu.KotlinLogging
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.models.Event
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ManualAllowCompletionEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.StoreContentAndMetadataTask
@ -21,7 +22,16 @@ class StoreContentAndMetadataListener: EventListener() {
event: Event,
history: List<Event>
): Event? {
val useEvent = event as? MigrateContentToStoreTaskResultEvent ?: return null
if (event !is MigrateContentToStoreTaskResultEvent && event !is ManualAllowCompletionEvent)
return null
val useEvent = if (event is ManualAllowCompletionEvent) {
history.lastOrNull { it is MigrateContentToStoreTaskResultEvent } as? MigrateContentToStoreTaskResultEvent
?: return null
} else {
event as MigrateContentToStoreTaskResultEvent
}
val collectionEvent = history.lastOrNull { it is CollectedEvent } as? CollectedEvent
?: return null
@ -39,6 +49,12 @@ class StoreContentAndMetadataListener: EventListener() {
return null
}
if (!projection.canStoreAutomatically()) {
log.info { "Not storing content and metadata automatically for collection: $collection @ ${useEvent.referenceId}" }
log.info { "A manual allow completion event is required to proceed." }
return null
}
val exportInfo = ContentExport(
collection = collection,

View File

@ -0,0 +1,25 @@
package no.iktdev.mediaprocessing.coordinator.services
import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks
import no.iktdev.mediaprocessing.shared.database.stores.TaskStore
import org.springframework.stereotype.Service
import java.util.*
@Service
class TaskService {
fun getActiveTasks(): List<PersistedTask> {
return TaskStore.findActiveTasks()
}
fun getPagedTasks(page: Int, size: Int): PagedTasks {
return TaskStore.getPagedTasks(page, size)
}
fun getTaskById(taskId: UUID): PersistedTask? {
return TaskStore.findByTaskId(taskId)
}
}

View File

@ -0,0 +1,10 @@
package no.iktdev.mediaprocessing.shared.common.dto
import no.iktdev.eventi.models.store.PersistedTask
data class PagedTasks(
val content: List<PersistedTask>,
val page: Int,
val size: Int,
val total: Long
)

View File

@ -1,34 +1,7 @@
package no.iktdev.mediaprocessing.shared.common.event_task_contract
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ConvertTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.FileAddedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.FileReadyEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.FileRemovedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaParsedInfoEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent
import no.iktdev.eventi.models.Event
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ConvertTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaStreamParsedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksDetermineSubtitleTypeEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksEncodeSelectedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksExtractSelectedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodePerformedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractPerformedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.*
object EventRegistry {
fun getEvents(): List<Class<out Event>> {
@ -48,6 +21,8 @@ object EventRegistry {
FileReadyEvent::class.java,
FileRemovedEvent::class.java,
ManualAllowCompletionEvent::class.java,
MediaParsedInfoEvent::class.java,
MediaStreamParsedEvent::class.java,
MediaTracksDetermineSubtitleTypeEvent::class.java,

View File

@ -0,0 +1,6 @@
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
import no.iktdev.eventi.models.Event
class ManualAllowCompletionEvent: Event() {
}

View File

@ -4,12 +4,18 @@ import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaParsedInfoEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent
import no.iktdev.mediaprocessing.shared.common.model.ContentExport
import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus
import java.io.File
class StoreProjection(val events: List<Event>) {
fun canStoreAutomatically(): Boolean {
val manualEvent = events.filterIsInstance<StartProcessingEvent>().lastOrNull()
return manualEvent?.data?.flow != StartFlow.Manual
}
fun projectMetadata(): ContentExport.MetadataExport? {
val metadata = CollectProjection(events).metadata

View File

@ -5,17 +5,48 @@ import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.stores.TaskStore
import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks
import no.iktdev.mediaprocessing.shared.database.tables.TasksTable
import no.iktdev.mediaprocessing.shared.database.withTransaction
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.selectAll
import org.jetbrains.exposed.sql.update
import org.jetbrains.exposed.sql.*
import java.time.Duration
import java.time.LocalDateTime
import java.util.UUID
import java.util.*
object TaskStore: TaskStore {
fun getPagedTasks(page: Int, size: Int): PagedTasks {
return withTransaction {
val total = TasksTable.selectAll().count()
val rows = TasksTable
.selectAll()
.orderBy(TasksTable.persistedAt, SortOrder.DESC)
.limit(size).offset(start = (page * size).toLong())
.map { it ->
PersistedTask(
id = it[TasksTable.id].value.toLong(),
referenceId = it[TasksTable.referenceId],
status = it[TasksTable.status],
taskId = it[TasksTable.taskId],
task = it[TasksTable.task],
data = it[TasksTable.data],
claimed = it[TasksTable.claimed],
claimedBy = it[TasksTable.claimedBy],
consumed = it[TasksTable.consumed],
lastCheckIn = it[TasksTable.lastCheckIn],
persistedAt = it[TasksTable.persistedAt]
)
}
PagedTasks(
content = rows,
page = page,
size = size,
total = total
)
}.getOrDefault(PagedTasks(emptyList(), page, size, 0))
}
override fun persist(task: Task) {
val asData = ZDS.WGson.toJson(task)
val taskName = task::class.simpleName ?: run {
@ -99,6 +130,28 @@ object TaskStore: TaskStore {
}.getOrDefault(emptyList())
}
fun findActiveTasks(): List<PersistedTask> {
return withTransaction {
TasksTable.selectAll()
.where { (TasksTable.status inList listOf(TaskStatus.Pending, TaskStatus.InProgress)) and (TasksTable.consumed eq false) }
.map {
PersistedTask(
id = it[TasksTable.id].value.toLong(),
referenceId = it[TasksTable.referenceId],
status = it[TasksTable.status],
taskId = it[TasksTable.taskId],
task = it[TasksTable.task],
data = it[TasksTable.data],
claimed = it[TasksTable.claimed],
claimedBy = it[TasksTable.claimedBy],
consumed = it[TasksTable.consumed],
lastCheckIn = it[TasksTable.lastCheckIn],
persistedAt = it[TasksTable.persistedAt]
)
}
}.getOrDefault(emptyList())
}
override fun claim(taskId: UUID, workerId: String): Boolean {
return withTransaction {
TasksTable.update({