diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/OperationsController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/OperationsController.kt index 8529d35a..5de269cf 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/OperationsController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/OperationsController.kt @@ -16,7 +16,6 @@ class OperationsController( @PostMapping("/start") fun startProcess(@RequestBody req: StartProcessRequest): ResponseEntity> { - val referenceId = commandService.startProcess(req) return when (val result = commandService.startProcess(req)) { is CommandService.StartResult.Accepted -> ResponseEntity .accepted() diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt new file mode 100644 index 00000000..fc432f50 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt @@ -0,0 +1,30 @@ +package no.iktdev.mediaprocessing.coordinator.controller + + +import no.iktdev.eventi.models.store.PersistedTask +import no.iktdev.mediaprocessing.coordinator.services.TaskService +import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks +import org.springframework.web.bind.annotation.* +import java.util.* + +@RestController +@RequestMapping("/tasks") +class TaskController( + private val taskService: TaskService, +) { + + @GetMapping("/active") + fun getActiveTasks(): List = + taskService.getActiveTasks() + + @GetMapping + fun getPagedTasks( + @RequestParam(defaultValue = "0") page: Int, + @RequestParam(defaultValue = "50") size: Int, + ): PagedTasks = + taskService.getPagedTasks(page, size) + + @GetMapping("/{id}") + fun getTask(@PathVariable id: UUID): PersistedTask? = + taskService.getTaskById(id) +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StoreContentAndMetadataListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StoreContentAndMetadataListener.kt index b14c3b5c..5b2c0140 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StoreContentAndMetadataListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StoreContentAndMetadataListener.kt @@ -4,6 +4,7 @@ import mu.KotlinLogging import no.iktdev.eventi.events.EventListener import no.iktdev.eventi.models.Event import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ManualAllowCompletionEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskCreatedEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.StoreContentAndMetadataTask @@ -21,7 +22,16 @@ class StoreContentAndMetadataListener: EventListener() { event: Event, history: List ): Event? { - val useEvent = event as? MigrateContentToStoreTaskResultEvent ?: return null + if (event !is MigrateContentToStoreTaskResultEvent && event !is ManualAllowCompletionEvent) + return null + + val useEvent = if (event is ManualAllowCompletionEvent) { + history.lastOrNull { it is MigrateContentToStoreTaskResultEvent } as? MigrateContentToStoreTaskResultEvent + ?: return null + } else { + event as MigrateContentToStoreTaskResultEvent + } + val collectionEvent = history.lastOrNull { it is CollectedEvent } as? CollectedEvent ?: return null @@ -39,6 +49,12 @@ class StoreContentAndMetadataListener: EventListener() { return null } + if (!projection.canStoreAutomatically()) { + log.info { "Not storing content and metadata automatically for collection: $collection @ ${useEvent.referenceId}" } + log.info { "A manual allow completion event is required to proceed." } + return null + } + val exportInfo = ContentExport( collection = collection, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt new file mode 100644 index 00000000..648590e8 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt @@ -0,0 +1,25 @@ +package no.iktdev.mediaprocessing.coordinator.services + +import no.iktdev.eventi.models.store.PersistedTask +import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks +import no.iktdev.mediaprocessing.shared.database.stores.TaskStore +import org.springframework.stereotype.Service +import java.util.* + + +@Service +class TaskService { + + + fun getActiveTasks(): List { + return TaskStore.findActiveTasks() + } + + fun getPagedTasks(page: Int, size: Int): PagedTasks { + return TaskStore.getPagedTasks(page, size) + } + + fun getTaskById(taskId: UUID): PersistedTask? { + return TaskStore.findByTaskId(taskId) + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedTasks.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedTasks.kt new file mode 100644 index 00000000..fc790f1e --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedTasks.kt @@ -0,0 +1,10 @@ +package no.iktdev.mediaprocessing.shared.common.dto + +import no.iktdev.eventi.models.store.PersistedTask + +data class PagedTasks( + val content: List, + val page: Int, + val size: Int, + val total: Long +) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt index 19a02760..c7b42c56 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt @@ -1,34 +1,7 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ConvertTaskResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.FileAddedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.FileReadyEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.FileRemovedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaParsedInfoEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent import no.iktdev.eventi.models.Event -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ConvertTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaStreamParsedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksDetermineSubtitleTypeEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksEncodeSelectedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksExtractSelectedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodePerformedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractPerformedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskResultEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.* object EventRegistry { fun getEvents(): List> { @@ -48,6 +21,8 @@ object EventRegistry { FileReadyEvent::class.java, FileRemovedEvent::class.java, + ManualAllowCompletionEvent::class.java, + MediaParsedInfoEvent::class.java, MediaStreamParsedEvent::class.java, MediaTracksDetermineSubtitleTypeEvent::class.java, diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ManualAllowCompletionEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ManualAllowCompletionEvent.kt new file mode 100644 index 00000000..d93a143c --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ManualAllowCompletionEvent.kt @@ -0,0 +1,6 @@ +package no.iktdev.mediaprocessing.shared.common.event_task_contract.events + +import no.iktdev.eventi.models.Event + +class ManualAllowCompletionEvent: Event() { +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/StoreProjection.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/StoreProjection.kt index fc60f1a7..f496b95b 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/StoreProjection.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/StoreProjection.kt @@ -4,12 +4,18 @@ import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaParsedInfoEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent import no.iktdev.mediaprocessing.shared.common.model.ContentExport import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus import java.io.File class StoreProjection(val events: List) { + fun canStoreAutomatically(): Boolean { + val manualEvent = events.filterIsInstance().lastOrNull() + return manualEvent?.data?.flow != StartFlow.Manual + } fun projectMetadata(): ContentExport.MetadataExport? { val metadata = CollectProjection(events).metadata diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt index 3487e57e..2d44bb1b 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt @@ -5,17 +5,48 @@ import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.PersistedTask import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.stores.TaskStore +import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks import no.iktdev.mediaprocessing.shared.database.tables.TasksTable import no.iktdev.mediaprocessing.shared.database.withTransaction -import org.jetbrains.exposed.sql.and -import org.jetbrains.exposed.sql.insert -import org.jetbrains.exposed.sql.selectAll -import org.jetbrains.exposed.sql.update +import org.jetbrains.exposed.sql.* import java.time.Duration import java.time.LocalDateTime -import java.util.UUID +import java.util.* object TaskStore: TaskStore { + + fun getPagedTasks(page: Int, size: Int): PagedTasks { + return withTransaction { + val total = TasksTable.selectAll().count() + val rows = TasksTable + .selectAll() + .orderBy(TasksTable.persistedAt, SortOrder.DESC) + .limit(size).offset(start = (page * size).toLong()) + .map { it -> + PersistedTask( + id = it[TasksTable.id].value.toLong(), + referenceId = it[TasksTable.referenceId], + status = it[TasksTable.status], + taskId = it[TasksTable.taskId], + task = it[TasksTable.task], + data = it[TasksTable.data], + claimed = it[TasksTable.claimed], + claimedBy = it[TasksTable.claimedBy], + consumed = it[TasksTable.consumed], + lastCheckIn = it[TasksTable.lastCheckIn], + persistedAt = it[TasksTable.persistedAt] + ) + } + PagedTasks( + content = rows, + page = page, + size = size, + total = total + ) + }.getOrDefault(PagedTasks(emptyList(), page, size, 0)) + } + + override fun persist(task: Task) { val asData = ZDS.WGson.toJson(task) val taskName = task::class.simpleName ?: run { @@ -99,6 +130,28 @@ object TaskStore: TaskStore { }.getOrDefault(emptyList()) } + fun findActiveTasks(): List { + return withTransaction { + TasksTable.selectAll() + .where { (TasksTable.status inList listOf(TaskStatus.Pending, TaskStatus.InProgress)) and (TasksTable.consumed eq false) } + .map { + PersistedTask( + id = it[TasksTable.id].value.toLong(), + referenceId = it[TasksTable.referenceId], + status = it[TasksTable.status], + taskId = it[TasksTable.taskId], + task = it[TasksTable.task], + data = it[TasksTable.data], + claimed = it[TasksTable.claimed], + claimedBy = it[TasksTable.claimedBy], + consumed = it[TasksTable.consumed], + lastCheckIn = it[TasksTable.lastCheckIn], + persistedAt = it[TasksTable.persistedAt] + ) + } + }.getOrDefault(emptyList()) + } + override fun claim(taskId: UUID, workerId: String): Boolean { return withTransaction { TasksTable.update({