diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/EventsController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/EventsController.kt index 840309d0..05cfcfec 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/EventsController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/EventsController.kt @@ -1,6 +1,9 @@ package no.iktdev.mediaprocessing.coordinator.controller -import no.iktdev.mediaprocessing.coordinator.services.EventPagingService +import no.iktdev.eventi.models.store.PersistedEvent +import no.iktdev.mediaprocessing.coordinator.services.EventService +import no.iktdev.mediaprocessing.shared.common.dto.EventQuery +import no.iktdev.mediaprocessing.shared.common.dto.Paginated import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent import org.springframework.web.bind.annotation.* import java.util.* @@ -8,18 +11,22 @@ import java.util.* @RestController @RequestMapping("/events") class EventsController( - private val paging: EventPagingService + private val paging: EventService ) { + @GetMapping() + fun getEvents(query: EventQuery): Paginated { + return paging.getEvents(query) + } @GetMapping("/sequence/{referenceId}") - fun getEvents( + fun getEventSequence( @PathVariable referenceId: UUID, @RequestParam(required = false) beforeEventId: UUID?, @RequestParam(required = false) afterEventId: UUID?, @RequestParam(defaultValue = "50") limit: Int ): List { - return paging.getEvents( + return paging.getPagedEvents( referenceId = referenceId, beforeEventId = beforeEventId, afterEventId = afterEventId, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt index fc432f50..7feba536 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt @@ -3,8 +3,12 @@ package no.iktdev.mediaprocessing.coordinator.controller import no.iktdev.eventi.models.store.PersistedTask import no.iktdev.mediaprocessing.coordinator.services.TaskService -import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks -import org.springframework.web.bind.annotation.* +import no.iktdev.mediaprocessing.shared.common.dto.Paginated +import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RestController import java.util.* @RestController @@ -18,11 +22,10 @@ class TaskController( taskService.getActiveTasks() @GetMapping - fun getPagedTasks( - @RequestParam(defaultValue = "0") page: Int, - @RequestParam(defaultValue = "50") size: Int, - ): PagedTasks = - taskService.getPagedTasks(page, size) + fun getPagedTasks(query: TaskQuery): Paginated = + taskService.getPagedTasks(query) + + @GetMapping("/{id}") fun getTask(@PathVariable id: UUID): PersistedTask? = diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventPagingService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt similarity index 77% rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventPagingService.kt rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt index edd2edbb..921fcdd0 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventPagingService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt @@ -1,6 +1,9 @@ package no.iktdev.mediaprocessing.coordinator.services import no.iktdev.eventi.ZDS.toEvent +import no.iktdev.eventi.models.store.PersistedEvent +import no.iktdev.mediaprocessing.shared.common.dto.EventQuery +import no.iktdev.mediaprocessing.shared.common.dto.Paginated import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent import no.iktdev.mediaprocessing.shared.common.dto.toDto import no.iktdev.mediaprocessing.shared.database.stores.EventStore @@ -8,9 +11,9 @@ import org.springframework.stereotype.Service import java.util.* @Service -class EventPagingService { +class EventService { - fun getEvents( + fun getPagedEvents( referenceId: UUID, beforeEventId: UUID?, afterEventId: UUID?, @@ -38,4 +41,8 @@ class EventPagingService { } } + fun getEvents(query: EventQuery): Paginated { + return EventStore.getPagedEvents(query) + } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt index 648590e8..863dd573 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/TaskService.kt @@ -1,7 +1,8 @@ package no.iktdev.mediaprocessing.coordinator.services import no.iktdev.eventi.models.store.PersistedTask -import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks +import no.iktdev.mediaprocessing.shared.common.dto.Paginated +import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery import no.iktdev.mediaprocessing.shared.database.stores.TaskStore import org.springframework.stereotype.Service import java.util.* @@ -15,8 +16,8 @@ class TaskService { return TaskStore.findActiveTasks() } - fun getPagedTasks(page: Int, size: Int): PagedTasks { - return TaskStore.getPagedTasks(page, size) + fun getPagedTasks(page: TaskQuery): Paginated { + return TaskStore.getPagedTasks(page) } fun getTaskById(taskId: UUID): PersistedTask? { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/EventQuery.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/EventQuery.kt new file mode 100644 index 00000000..3758be7a --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/EventQuery.kt @@ -0,0 +1,16 @@ +package no.iktdev.mediaprocessing.shared.common.dto + +import java.time.Instant + +data class EventQuery( + val referenceId: String? = null, + val eventId: String? = null, + val event: String? = null, + val from: Instant? = null, + val to: Instant? = null, + override val sort: String = "persistedAt", + override val order: Sort = Sort.DESC, + override val page: Int = 0, + override val pageSize: Int = 50 +) : PagedQuery + diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedQuery.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedQuery.kt new file mode 100644 index 00000000..444cc3f9 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedQuery.kt @@ -0,0 +1,8 @@ +package no.iktdev.mediaprocessing.shared.common.dto + +interface PagedQuery { + val page: Int + val pageSize: Int + val sort: String + val order: Sort +} diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedTasks.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Paginated.kt similarity index 50% rename from shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedTasks.kt rename to shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Paginated.kt index 92c12c6e..3b31acd3 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/PagedTasks.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Paginated.kt @@ -1,9 +1,7 @@ package no.iktdev.mediaprocessing.shared.common.dto -import no.iktdev.eventi.models.store.PersistedTask - -data class PagedTasks( - val items: List, +data class Paginated( + val items: List, val page: Int, val size: Int, val total: Long diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Sort.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Sort.kt new file mode 100644 index 00000000..a1275127 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Sort.kt @@ -0,0 +1,6 @@ +package no.iktdev.mediaprocessing.shared.common.dto + +enum class Sort { + ASC, + DESC +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/TaskQuery.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/TaskQuery.kt new file mode 100644 index 00000000..ed52a7e6 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/TaskQuery.kt @@ -0,0 +1,37 @@ +package no.iktdev.mediaprocessing.shared.common.dto + +import org.springframework.util.LinkedMultiValueMap +import org.springframework.util.MultiValueMap +import java.time.Instant + +data class TaskQuery( + val status: List? = null, + val claimed: Boolean? = null, + val consumed: Boolean? = null, + val referenceId: String? = null, + val from: Instant? = null, + val to: Instant? = null, + override val sort: String = "persistedAt", + override val order: Sort = Sort.DESC, + override val page: Int = 0, + override val pageSize: Int = 50 +): PagedQuery { + fun toQueryParams(): MultiValueMap { + val params = LinkedMultiValueMap() + + status?.forEach { params.add("status", it) } + claimed?.let { params.add("claimed", it.toString()) } + consumed?.let { params.add("consumed", it.toString()) } + referenceId?.let { params.add("referenceId", it) } + from?.let { params.add("from", it.toString()) } + to?.let { params.add("to", it.toString()) } + params.add("sort", sort) + params.add("order", order.name) + params.add("page", page.toString()) + params.add("pageSize", pageSize.toString()) + + return params + } + +} + diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/queries/Queries.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/queries/Queries.kt new file mode 100644 index 00000000..35c97258 --- /dev/null +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/queries/Queries.kt @@ -0,0 +1,48 @@ +package no.iktdev.mediaprocessing.shared.database.queries + +import no.iktdev.mediaprocessing.shared.common.dto.PagedQuery +import no.iktdev.mediaprocessing.shared.common.dto.Paginated +import no.iktdev.mediaprocessing.shared.common.dto.Sort +import no.iktdev.mediaprocessing.shared.database.withTransaction +import org.jetbrains.exposed.sql.* + +fun pagedQuery( + table: Table, + query: PagedQuery, + sortColumns: Map>, + applyFilters: QueryBuilder.() -> Unit, + mapper: (ResultRow) -> T +): Paginated { + + return withTransaction { + + // 1. Start query + var base = table.selectAll() + + // 2. Apply filters + val builder = QueryBuilder(base) + builder.applyFilters() + base = builder.build() + + // 3. Count before paging + val total = base.count() + + // 4. Sorting + val sortColumn = sortColumns[query.sort] ?: error("Unknown sort: ${query.sort}") + val sortOrder = if (query.order == Sort.ASC) SortOrder.ASC else SortOrder.DESC + + // 5. Paging + mapping + val items = base + .orderBy(sortColumn, sortOrder) + .limit(query.pageSize) + .offset((query.page * query.pageSize).toLong()) + .map(mapper) + + Paginated( + items = items, + page = query.page, + size = query.pageSize, + total = total + ) + }.getOrDefault(Paginated(emptyList(), query.page, query.pageSize, 0)) +} diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/queries/QueryBuilder.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/queries/QueryBuilder.kt new file mode 100644 index 00000000..51e12e42 --- /dev/null +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/queries/QueryBuilder.kt @@ -0,0 +1,15 @@ +package no.iktdev.mediaprocessing.shared.database.queries + +import org.jetbrains.exposed.sql.Op +import org.jetbrains.exposed.sql.Query +import org.jetbrains.exposed.sql.SqlExpressionBuilder +import org.jetbrains.exposed.sql.andWhere + +class QueryBuilder(private var query: Query) { + + fun where(condition: SqlExpressionBuilder.() -> Op) { + query = query.andWhere(condition) + } + + fun build(): Query = query +} diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt index 88cbf3c4..ed67355c 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt @@ -5,6 +5,9 @@ import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.PersistedEvent import no.iktdev.eventi.stores.EventStore import no.iktdev.mediaprocessing.shared.common.UtcNow +import no.iktdev.mediaprocessing.shared.common.dto.EventQuery +import no.iktdev.mediaprocessing.shared.common.dto.Paginated +import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery import no.iktdev.mediaprocessing.shared.database.tables.EventsTable import no.iktdev.mediaprocessing.shared.database.withTransaction import org.jetbrains.exposed.sql.insert @@ -13,6 +16,52 @@ import java.time.Instant import java.util.* object EventStore: EventStore { + + fun getPagedEvents(query: EventQuery): Paginated = + pagedQuery( + table = EventsTable, + query = query, + sortColumns = mapOf( + "referenceId" to EventsTable.referenceId, + "eventId" to EventsTable.eventId, + "event" to EventsTable.event, + "persistedAt" to EventsTable.persistedAt + ), + applyFilters = { + + query.referenceId?.let { ref -> + where { EventsTable.referenceId like "%$ref%" } + } + + query.eventId?.let { id -> + where { EventsTable.eventId like "%$id%" } + } + + query.event?.let { ev -> + where { EventsTable.event like "%$ev%" } + } + + query.from?.let { from -> + where { EventsTable.persistedAt greaterEq from } + } + + query.to?.let { to -> + where { EventsTable.persistedAt lessEq to } + } + }, + mapper = { row -> + PersistedEvent( + id = row[EventsTable.id].value.toLong(), + referenceId = UUID.fromString(row[EventsTable.referenceId]), + eventId = UUID.fromString(row[EventsTable.eventId]), + event = row[EventsTable.event], + data = row[EventsTable.data], + persistedAt = row[EventsTable.persistedAt] + ) + } + ) + + override fun getPersistedEventsAfter(timestamp: Instant): List { val result = withTransaction { EventsTable.selectAll() diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt index c99835a6..8dd8ba37 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt @@ -6,45 +6,60 @@ import no.iktdev.eventi.models.store.PersistedTask import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.stores.TaskStore import no.iktdev.mediaprocessing.shared.common.UtcNow -import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks +import no.iktdev.mediaprocessing.shared.common.dto.Paginated +import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery +import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery import no.iktdev.mediaprocessing.shared.database.tables.TasksTable import no.iktdev.mediaprocessing.shared.database.withTransaction -import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.insert +import org.jetbrains.exposed.sql.selectAll +import org.jetbrains.exposed.sql.update import java.time.Duration import java.util.* object TaskStore: TaskStore { - fun getPagedTasks(page: Int, size: Int): PagedTasks { - return withTransaction { - val total = TasksTable.selectAll().count() - val rows = TasksTable - .selectAll() - .orderBy(TasksTable.persistedAt, SortOrder.DESC) - .limit(size).offset(start = (page * size).toLong()) - .map { it -> - PersistedTask( - id = it[TasksTable.id].value.toLong(), - referenceId = UUID.fromString(it[TasksTable.referenceId]), - status = it[TasksTable.status], - taskId = UUID.fromString(it[TasksTable.taskId]), - task = it[TasksTable.task], - data = it[TasksTable.data], - claimed = it[TasksTable.claimed], - claimedBy = it[TasksTable.claimedBy], - consumed = it[TasksTable.consumed], - lastCheckIn = it[TasksTable.lastCheckIn], - persistedAt = it[TasksTable.persistedAt] - ) + fun getPagedTasks(query: TaskQuery): Paginated = + pagedQuery( + table = TasksTable, + query = query, + sortColumns = mapOf( + "taskId" to TasksTable.taskId, + "referenceId" to TasksTable.referenceId, + "status" to TasksTable.status, + "persistedAt" to TasksTable.persistedAt, + "lastCheckIn" to TasksTable.lastCheckIn + ), + applyFilters = { + query.status?.let { statuses -> + val enums = statuses.map { TaskStatus.valueOf(it) } + where { TasksTable.status inList enums } } - PagedTasks( - items = rows, - page = page, - size = size, - total = total - ) - }.getOrDefault(PagedTasks(emptyList(), page, size, 0)) - } + + query.claimed?.let { where { TasksTable.claimed eq it } } + query.consumed?.let { where { TasksTable.consumed eq it } } + query.referenceId?.let { where { TasksTable.referenceId like "%$it%" } } + query.from?.let { where { TasksTable.persistedAt greaterEq it } } + query.to?.let { where { TasksTable.persistedAt lessEq it } } + }, + mapper = { row -> + PersistedTask( + id = row[TasksTable.id].value.toLong(), + referenceId = UUID.fromString(row[TasksTable.referenceId]), + status = row[TasksTable.status], + taskId = UUID.fromString(row[TasksTable.taskId]), + task = row[TasksTable.task], + data = row[TasksTable.data], + claimed = row[TasksTable.claimed], + claimedBy = row[TasksTable.claimedBy], + consumed = row[TasksTable.consumed], + lastCheckIn = row[TasksTable.lastCheckIn], + persistedAt = row[TasksTable.persistedAt] + ) + } + ) + override fun persist(task: Task) {