Query changes
This commit is contained in:
parent
7343d51b15
commit
49a3002259
@ -1,6 +1,9 @@
|
|||||||
package no.iktdev.mediaprocessing.coordinator.controller
|
package no.iktdev.mediaprocessing.coordinator.controller
|
||||||
|
|
||||||
import no.iktdev.mediaprocessing.coordinator.services.EventPagingService
|
import no.iktdev.eventi.models.store.PersistedEvent
|
||||||
|
import no.iktdev.mediaprocessing.coordinator.services.EventService
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent
|
import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent
|
||||||
import org.springframework.web.bind.annotation.*
|
import org.springframework.web.bind.annotation.*
|
||||||
import java.util.*
|
import java.util.*
|
||||||
@ -8,18 +11,22 @@ import java.util.*
|
|||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/events")
|
@RequestMapping("/events")
|
||||||
class EventsController(
|
class EventsController(
|
||||||
private val paging: EventPagingService
|
private val paging: EventService
|
||||||
) {
|
) {
|
||||||
|
|
||||||
|
@GetMapping()
|
||||||
|
fun getEvents(query: EventQuery): Paginated<PersistedEvent> {
|
||||||
|
return paging.getEvents(query)
|
||||||
|
}
|
||||||
|
|
||||||
@GetMapping("/sequence/{referenceId}")
|
@GetMapping("/sequence/{referenceId}")
|
||||||
fun getEvents(
|
fun getEventSequence(
|
||||||
@PathVariable referenceId: UUID,
|
@PathVariable referenceId: UUID,
|
||||||
@RequestParam(required = false) beforeEventId: UUID?,
|
@RequestParam(required = false) beforeEventId: UUID?,
|
||||||
@RequestParam(required = false) afterEventId: UUID?,
|
@RequestParam(required = false) afterEventId: UUID?,
|
||||||
@RequestParam(defaultValue = "50") limit: Int
|
@RequestParam(defaultValue = "50") limit: Int
|
||||||
): List<SequenceEvent> {
|
): List<SequenceEvent> {
|
||||||
return paging.getEvents(
|
return paging.getPagedEvents(
|
||||||
referenceId = referenceId,
|
referenceId = referenceId,
|
||||||
beforeEventId = beforeEventId,
|
beforeEventId = beforeEventId,
|
||||||
afterEventId = afterEventId,
|
afterEventId = afterEventId,
|
||||||
|
|||||||
@ -3,8 +3,12 @@ package no.iktdev.mediaprocessing.coordinator.controller
|
|||||||
|
|
||||||
import no.iktdev.eventi.models.store.PersistedTask
|
import no.iktdev.eventi.models.store.PersistedTask
|
||||||
import no.iktdev.mediaprocessing.coordinator.services.TaskService
|
import no.iktdev.mediaprocessing.coordinator.services.TaskService
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
import org.springframework.web.bind.annotation.*
|
import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping
|
||||||
|
import org.springframework.web.bind.annotation.PathVariable
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping
|
||||||
|
import org.springframework.web.bind.annotation.RestController
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@ -18,11 +22,10 @@ class TaskController(
|
|||||||
taskService.getActiveTasks()
|
taskService.getActiveTasks()
|
||||||
|
|
||||||
@GetMapping
|
@GetMapping
|
||||||
fun getPagedTasks(
|
fun getPagedTasks(query: TaskQuery): Paginated<PersistedTask> =
|
||||||
@RequestParam(defaultValue = "0") page: Int,
|
taskService.getPagedTasks(query)
|
||||||
@RequestParam(defaultValue = "50") size: Int,
|
|
||||||
): PagedTasks =
|
|
||||||
taskService.getPagedTasks(page, size)
|
|
||||||
|
|
||||||
@GetMapping("/{id}")
|
@GetMapping("/{id}")
|
||||||
fun getTask(@PathVariable id: UUID): PersistedTask? =
|
fun getTask(@PathVariable id: UUID): PersistedTask? =
|
||||||
|
|||||||
@ -1,6 +1,9 @@
|
|||||||
package no.iktdev.mediaprocessing.coordinator.services
|
package no.iktdev.mediaprocessing.coordinator.services
|
||||||
|
|
||||||
import no.iktdev.eventi.ZDS.toEvent
|
import no.iktdev.eventi.ZDS.toEvent
|
||||||
|
import no.iktdev.eventi.models.store.PersistedEvent
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent
|
import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.toDto
|
import no.iktdev.mediaprocessing.shared.common.dto.toDto
|
||||||
import no.iktdev.mediaprocessing.shared.database.stores.EventStore
|
import no.iktdev.mediaprocessing.shared.database.stores.EventStore
|
||||||
@ -8,9 +11,9 @@ import org.springframework.stereotype.Service
|
|||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class EventPagingService {
|
class EventService {
|
||||||
|
|
||||||
fun getEvents(
|
fun getPagedEvents(
|
||||||
referenceId: UUID,
|
referenceId: UUID,
|
||||||
beforeEventId: UUID?,
|
beforeEventId: UUID?,
|
||||||
afterEventId: UUID?,
|
afterEventId: UUID?,
|
||||||
@ -38,4 +41,8 @@ class EventPagingService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun getEvents(query: EventQuery): Paginated<PersistedEvent> {
|
||||||
|
return EventStore.getPagedEvents(query)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1,7 +1,8 @@
|
|||||||
package no.iktdev.mediaprocessing.coordinator.services
|
package no.iktdev.mediaprocessing.coordinator.services
|
||||||
|
|
||||||
import no.iktdev.eventi.models.store.PersistedTask
|
import no.iktdev.eventi.models.store.PersistedTask
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery
|
||||||
import no.iktdev.mediaprocessing.shared.database.stores.TaskStore
|
import no.iktdev.mediaprocessing.shared.database.stores.TaskStore
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
import java.util.*
|
import java.util.*
|
||||||
@ -15,8 +16,8 @@ class TaskService {
|
|||||||
return TaskStore.findActiveTasks()
|
return TaskStore.findActiveTasks()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getPagedTasks(page: Int, size: Int): PagedTasks {
|
fun getPagedTasks(page: TaskQuery): Paginated<PersistedTask> {
|
||||||
return TaskStore.getPagedTasks(page, size)
|
return TaskStore.getPagedTasks(page)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getTaskById(taskId: UUID): PersistedTask? {
|
fun getTaskById(taskId: UUID): PersistedTask? {
|
||||||
|
|||||||
@ -0,0 +1,16 @@
|
|||||||
|
package no.iktdev.mediaprocessing.shared.common.dto
|
||||||
|
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
|
data class EventQuery(
|
||||||
|
val referenceId: String? = null,
|
||||||
|
val eventId: String? = null,
|
||||||
|
val event: String? = null,
|
||||||
|
val from: Instant? = null,
|
||||||
|
val to: Instant? = null,
|
||||||
|
override val sort: String = "persistedAt",
|
||||||
|
override val order: Sort = Sort.DESC,
|
||||||
|
override val page: Int = 0,
|
||||||
|
override val pageSize: Int = 50
|
||||||
|
) : PagedQuery
|
||||||
|
|
||||||
@ -0,0 +1,8 @@
|
|||||||
|
package no.iktdev.mediaprocessing.shared.common.dto
|
||||||
|
|
||||||
|
interface PagedQuery {
|
||||||
|
val page: Int
|
||||||
|
val pageSize: Int
|
||||||
|
val sort: String
|
||||||
|
val order: Sort
|
||||||
|
}
|
||||||
@ -1,9 +1,7 @@
|
|||||||
package no.iktdev.mediaprocessing.shared.common.dto
|
package no.iktdev.mediaprocessing.shared.common.dto
|
||||||
|
|
||||||
import no.iktdev.eventi.models.store.PersistedTask
|
data class Paginated<T>(
|
||||||
|
val items: List<T>,
|
||||||
data class PagedTasks(
|
|
||||||
val items: List<PersistedTask>,
|
|
||||||
val page: Int,
|
val page: Int,
|
||||||
val size: Int,
|
val size: Int,
|
||||||
val total: Long
|
val total: Long
|
||||||
@ -0,0 +1,6 @@
|
|||||||
|
package no.iktdev.mediaprocessing.shared.common.dto
|
||||||
|
|
||||||
|
enum class Sort {
|
||||||
|
ASC,
|
||||||
|
DESC
|
||||||
|
}
|
||||||
@ -0,0 +1,37 @@
|
|||||||
|
package no.iktdev.mediaprocessing.shared.common.dto
|
||||||
|
|
||||||
|
import org.springframework.util.LinkedMultiValueMap
|
||||||
|
import org.springframework.util.MultiValueMap
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
|
data class TaskQuery(
|
||||||
|
val status: List<String>? = null,
|
||||||
|
val claimed: Boolean? = null,
|
||||||
|
val consumed: Boolean? = null,
|
||||||
|
val referenceId: String? = null,
|
||||||
|
val from: Instant? = null,
|
||||||
|
val to: Instant? = null,
|
||||||
|
override val sort: String = "persistedAt",
|
||||||
|
override val order: Sort = Sort.DESC,
|
||||||
|
override val page: Int = 0,
|
||||||
|
override val pageSize: Int = 50
|
||||||
|
): PagedQuery {
|
||||||
|
fun toQueryParams(): MultiValueMap<String, String> {
|
||||||
|
val params = LinkedMultiValueMap<String, String>()
|
||||||
|
|
||||||
|
status?.forEach { params.add("status", it) }
|
||||||
|
claimed?.let { params.add("claimed", it.toString()) }
|
||||||
|
consumed?.let { params.add("consumed", it.toString()) }
|
||||||
|
referenceId?.let { params.add("referenceId", it) }
|
||||||
|
from?.let { params.add("from", it.toString()) }
|
||||||
|
to?.let { params.add("to", it.toString()) }
|
||||||
|
params.add("sort", sort)
|
||||||
|
params.add("order", order.name)
|
||||||
|
params.add("page", page.toString())
|
||||||
|
params.add("pageSize", pageSize.toString())
|
||||||
|
|
||||||
|
return params
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@ -0,0 +1,48 @@
|
|||||||
|
package no.iktdev.mediaprocessing.shared.database.queries
|
||||||
|
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.PagedQuery
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.Sort
|
||||||
|
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
||||||
|
import org.jetbrains.exposed.sql.*
|
||||||
|
|
||||||
|
fun <T> pagedQuery(
|
||||||
|
table: Table,
|
||||||
|
query: PagedQuery,
|
||||||
|
sortColumns: Map<String, Column<*>>,
|
||||||
|
applyFilters: QueryBuilder.() -> Unit,
|
||||||
|
mapper: (ResultRow) -> T
|
||||||
|
): Paginated<T> {
|
||||||
|
|
||||||
|
return withTransaction {
|
||||||
|
|
||||||
|
// 1. Start query
|
||||||
|
var base = table.selectAll()
|
||||||
|
|
||||||
|
// 2. Apply filters
|
||||||
|
val builder = QueryBuilder(base)
|
||||||
|
builder.applyFilters()
|
||||||
|
base = builder.build()
|
||||||
|
|
||||||
|
// 3. Count before paging
|
||||||
|
val total = base.count()
|
||||||
|
|
||||||
|
// 4. Sorting
|
||||||
|
val sortColumn = sortColumns[query.sort] ?: error("Unknown sort: ${query.sort}")
|
||||||
|
val sortOrder = if (query.order == Sort.ASC) SortOrder.ASC else SortOrder.DESC
|
||||||
|
|
||||||
|
// 5. Paging + mapping
|
||||||
|
val items = base
|
||||||
|
.orderBy(sortColumn, sortOrder)
|
||||||
|
.limit(query.pageSize)
|
||||||
|
.offset((query.page * query.pageSize).toLong())
|
||||||
|
.map(mapper)
|
||||||
|
|
||||||
|
Paginated(
|
||||||
|
items = items,
|
||||||
|
page = query.page,
|
||||||
|
size = query.pageSize,
|
||||||
|
total = total
|
||||||
|
)
|
||||||
|
}.getOrDefault(Paginated(emptyList(), query.page, query.pageSize, 0))
|
||||||
|
}
|
||||||
@ -0,0 +1,15 @@
|
|||||||
|
package no.iktdev.mediaprocessing.shared.database.queries
|
||||||
|
|
||||||
|
import org.jetbrains.exposed.sql.Op
|
||||||
|
import org.jetbrains.exposed.sql.Query
|
||||||
|
import org.jetbrains.exposed.sql.SqlExpressionBuilder
|
||||||
|
import org.jetbrains.exposed.sql.andWhere
|
||||||
|
|
||||||
|
class QueryBuilder(private var query: Query) {
|
||||||
|
|
||||||
|
fun where(condition: SqlExpressionBuilder.() -> Op<Boolean>) {
|
||||||
|
query = query.andWhere(condition)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun build(): Query = query
|
||||||
|
}
|
||||||
@ -5,6 +5,9 @@ import no.iktdev.eventi.models.Event
|
|||||||
import no.iktdev.eventi.models.store.PersistedEvent
|
import no.iktdev.eventi.models.store.PersistedEvent
|
||||||
import no.iktdev.eventi.stores.EventStore
|
import no.iktdev.eventi.stores.EventStore
|
||||||
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
|
import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery
|
||||||
import no.iktdev.mediaprocessing.shared.database.tables.EventsTable
|
import no.iktdev.mediaprocessing.shared.database.tables.EventsTable
|
||||||
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
||||||
import org.jetbrains.exposed.sql.insert
|
import org.jetbrains.exposed.sql.insert
|
||||||
@ -13,6 +16,52 @@ import java.time.Instant
|
|||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
object EventStore: EventStore {
|
object EventStore: EventStore {
|
||||||
|
|
||||||
|
fun getPagedEvents(query: EventQuery): Paginated<PersistedEvent> =
|
||||||
|
pagedQuery(
|
||||||
|
table = EventsTable,
|
||||||
|
query = query,
|
||||||
|
sortColumns = mapOf(
|
||||||
|
"referenceId" to EventsTable.referenceId,
|
||||||
|
"eventId" to EventsTable.eventId,
|
||||||
|
"event" to EventsTable.event,
|
||||||
|
"persistedAt" to EventsTable.persistedAt
|
||||||
|
),
|
||||||
|
applyFilters = {
|
||||||
|
|
||||||
|
query.referenceId?.let { ref ->
|
||||||
|
where { EventsTable.referenceId like "%$ref%" }
|
||||||
|
}
|
||||||
|
|
||||||
|
query.eventId?.let { id ->
|
||||||
|
where { EventsTable.eventId like "%$id%" }
|
||||||
|
}
|
||||||
|
|
||||||
|
query.event?.let { ev ->
|
||||||
|
where { EventsTable.event like "%$ev%" }
|
||||||
|
}
|
||||||
|
|
||||||
|
query.from?.let { from ->
|
||||||
|
where { EventsTable.persistedAt greaterEq from }
|
||||||
|
}
|
||||||
|
|
||||||
|
query.to?.let { to ->
|
||||||
|
where { EventsTable.persistedAt lessEq to }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
mapper = { row ->
|
||||||
|
PersistedEvent(
|
||||||
|
id = row[EventsTable.id].value.toLong(),
|
||||||
|
referenceId = UUID.fromString(row[EventsTable.referenceId]),
|
||||||
|
eventId = UUID.fromString(row[EventsTable.eventId]),
|
||||||
|
event = row[EventsTable.event],
|
||||||
|
data = row[EventsTable.data],
|
||||||
|
persistedAt = row[EventsTable.persistedAt]
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> {
|
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> {
|
||||||
val result = withTransaction {
|
val result = withTransaction {
|
||||||
EventsTable.selectAll()
|
EventsTable.selectAll()
|
||||||
|
|||||||
@ -6,45 +6,60 @@ import no.iktdev.eventi.models.store.PersistedTask
|
|||||||
import no.iktdev.eventi.models.store.TaskStatus
|
import no.iktdev.eventi.models.store.TaskStatus
|
||||||
import no.iktdev.eventi.stores.TaskStore
|
import no.iktdev.eventi.stores.TaskStore
|
||||||
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.PagedTasks
|
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery
|
||||||
|
import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery
|
||||||
import no.iktdev.mediaprocessing.shared.database.tables.TasksTable
|
import no.iktdev.mediaprocessing.shared.database.tables.TasksTable
|
||||||
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
||||||
import org.jetbrains.exposed.sql.*
|
import org.jetbrains.exposed.sql.and
|
||||||
|
import org.jetbrains.exposed.sql.insert
|
||||||
|
import org.jetbrains.exposed.sql.selectAll
|
||||||
|
import org.jetbrains.exposed.sql.update
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
object TaskStore: TaskStore {
|
object TaskStore: TaskStore {
|
||||||
|
|
||||||
fun getPagedTasks(page: Int, size: Int): PagedTasks {
|
fun getPagedTasks(query: TaskQuery): Paginated<PersistedTask> =
|
||||||
return withTransaction {
|
pagedQuery(
|
||||||
val total = TasksTable.selectAll().count()
|
table = TasksTable,
|
||||||
val rows = TasksTable
|
query = query,
|
||||||
.selectAll()
|
sortColumns = mapOf(
|
||||||
.orderBy(TasksTable.persistedAt, SortOrder.DESC)
|
"taskId" to TasksTable.taskId,
|
||||||
.limit(size).offset(start = (page * size).toLong())
|
"referenceId" to TasksTable.referenceId,
|
||||||
.map { it ->
|
"status" to TasksTable.status,
|
||||||
PersistedTask(
|
"persistedAt" to TasksTable.persistedAt,
|
||||||
id = it[TasksTable.id].value.toLong(),
|
"lastCheckIn" to TasksTable.lastCheckIn
|
||||||
referenceId = UUID.fromString(it[TasksTable.referenceId]),
|
),
|
||||||
status = it[TasksTable.status],
|
applyFilters = {
|
||||||
taskId = UUID.fromString(it[TasksTable.taskId]),
|
query.status?.let { statuses ->
|
||||||
task = it[TasksTable.task],
|
val enums = statuses.map { TaskStatus.valueOf(it) }
|
||||||
data = it[TasksTable.data],
|
where { TasksTable.status inList enums }
|
||||||
claimed = it[TasksTable.claimed],
|
|
||||||
claimedBy = it[TasksTable.claimedBy],
|
|
||||||
consumed = it[TasksTable.consumed],
|
|
||||||
lastCheckIn = it[TasksTable.lastCheckIn],
|
|
||||||
persistedAt = it[TasksTable.persistedAt]
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
PagedTasks(
|
|
||||||
items = rows,
|
query.claimed?.let { where { TasksTable.claimed eq it } }
|
||||||
page = page,
|
query.consumed?.let { where { TasksTable.consumed eq it } }
|
||||||
size = size,
|
query.referenceId?.let { where { TasksTable.referenceId like "%$it%" } }
|
||||||
total = total
|
query.from?.let { where { TasksTable.persistedAt greaterEq it } }
|
||||||
)
|
query.to?.let { where { TasksTable.persistedAt lessEq it } }
|
||||||
}.getOrDefault(PagedTasks(emptyList(), page, size, 0))
|
},
|
||||||
}
|
mapper = { row ->
|
||||||
|
PersistedTask(
|
||||||
|
id = row[TasksTable.id].value.toLong(),
|
||||||
|
referenceId = UUID.fromString(row[TasksTable.referenceId]),
|
||||||
|
status = row[TasksTable.status],
|
||||||
|
taskId = UUID.fromString(row[TasksTable.taskId]),
|
||||||
|
task = row[TasksTable.task],
|
||||||
|
data = row[TasksTable.data],
|
||||||
|
claimed = row[TasksTable.claimed],
|
||||||
|
claimedBy = row[TasksTable.claimedBy],
|
||||||
|
consumed = row[TasksTable.consumed],
|
||||||
|
lastCheckIn = row[TasksTable.lastCheckIn],
|
||||||
|
persistedAt = row[TasksTable.persistedAt]
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
override fun persist(task: Task) {
|
override fun persist(task: Task) {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user