This commit is contained in:
bskjon 2024-06-28 00:45:10 +02:00
parent bbae7d932d
commit ddfd2a0e5d
5 changed files with 57 additions and 113 deletions

View File

@ -0,0 +1,57 @@
package no.iktdev.mediaprocessing.ui.service
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.contract.dto.EventsDto
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.ui.getEventsDatabase
import org.jetbrains.exposed.sql.*
import org.springframework.stereotype.Service
import java.time.LocalDateTime
import javax.annotation.PostConstruct
@Service
class PersistentEventsTableService {
val dzz = DeserializingRegistry()
private var latestPull: LocalDateTime = LocalDateTime.MIN
val cachedEvents: MutableMap<String, List<EventsDto>> = mutableMapOf()
fun pullEvents() {
val pulled = withTransaction(getEventsDatabase()) {
val cached = latestPull
latestPull = LocalDateTime.now()
events.select {
(events.created greaterEq cached)
}
.orderBy(events.created, SortOrder.ASC)
.toEvent(dzz)
.groupBy { it.referenceId }
} ?: emptyMap()
pulled.forEach { (rid, events) ->
val cEvents = cachedEvents[rid] ?: emptyList()
cachedEvents[rid] = cEvents + events
}
}
fun Query?.toEvent(dzz: DeserializingRegistry): List<EventsDto> {
return this?.mapNotNull { fromRow(it, dzz) } ?: emptyList()
}
fun fromRow(row: ResultRow, dez: DeserializingRegistry): EventsDto? {
return EventsDto(
referenceId = row[events.referenceId],
eventId = row[events.eventId],
event = row[events.event],
data = row[events.data],
created = row[events.created]
)
}
@PostConstruct
fun onInitializationCompleted() {
pullEvents()
}
}

View File

@ -1,9 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.helper
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
data class DerivedProcessIterationHolder(
val eventId: String,
val event: PersistentProcessDataMessage,
var iterated: Int = 0
)

View File

@ -40,68 +40,8 @@ class PersistentDataReader(var dataSource: DataSource) {
return result
}
@Deprecated(message = "Use PersistentEventManager.isProcessEventCompleted")
fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean {
val result = withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }.singleOrNull()
}
return result?.claimed ?: true
}
@Deprecated(message = "Use PersistentEventManager.isProcessEventCompleted")
fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimedBy eq claimedBy)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
}?.singleOrNull()?.consumed ?: false
}
@Deprecated(message = "Use PersistentEventManager.getProcessEventsClaimable")
fun getAvailableProcessEvents(): List<PersistentProcessDataMessage> {
return withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.claimed eq false) and
(processerEvents.consumed eq false)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
} ?: emptyList()
}
@Deprecated("Use PersistentEventManager.getProcessEventsWithExpiredClaim")
fun getExpiredClaimsProcessEvents(): List<PersistentProcessDataMessage> {
val deadline = LocalDateTime.now()
val entries = withTransaction(dataSource.database) {
processerEvents.select {
(processerEvents.claimed eq true) and
(processerEvents.consumed neq true)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
} ?: emptyList()
return entries.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline }
}
@Deprecated("Use PersistentEventManager.getProcessEventWith")
fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? {
val message = withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
}?.singleOrNull()
return message
}
@Deprecated("Use PersistentEventManager.getAllEventsProcesser")
fun getProcessEvents(): List<PersistentProcessDataMessage> {
return withTransaction(dataSource.database) {
processerEvents.selectAll()
.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
} ?: emptyList()
}
}

View File

@ -187,7 +187,3 @@ fun List<PersistentMessage>?.toGrouped(): List<List<PersistentMessage>> {
fun Query?.toPersistentMessage(dzz: DeserializingRegistry): List<PersistentMessage> {
return this?.mapNotNull { fromRowToPersistentMessage(it, dzz) } ?: emptyList()
}
fun Query?.toPersistentProcesserMessage(dzz: DeserializingRegistry): List<PersistentProcessDataMessage> {
return this?.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } ?: emptyList()
}

View File

@ -1,40 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.jetbrains.exposed.sql.ResultRow
import java.time.LocalDateTime
data class PersistentProcessDataMessage(
val referenceId: String,
val eventId: String,
val event: KafkaEvents,
val data: MessageDataWrapper,
val created: LocalDateTime,
val claimedBy: String? = null,
val claimed: Boolean = false,
val consumed: Boolean = false,
val lastCheckIn: LocalDateTime? = null
)
fun fromRowToPersistentProcessDataMessage(row: ResultRow, dez: DeserializingRegistry): PersistentProcessDataMessage? {
val kev = try {
KafkaEvents.toEvent(row[event])
} catch (e: IllegalArgumentException) {
e.printStackTrace()
return null
}?: return null
val dzdata = dez.deserializeData(kev, row[data])
return PersistentProcessDataMessage(
referenceId = row[referenceId],
eventId = row[eventId],
event = kev,
data = dzdata,
created = row[created],
claimed = row[claimed],
claimedBy = row[claimedBy],
consumed = row[consumed],
lastCheckIn = row[lastCheckIn]
)
}