diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/service/PersistentEventsTableService.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/service/PersistentEventsTableService.kt new file mode 100644 index 00000000..961e31f4 --- /dev/null +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/service/PersistentEventsTableService.kt @@ -0,0 +1,57 @@ +package no.iktdev.mediaprocessing.ui.service + +import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction +import no.iktdev.mediaprocessing.shared.common.persistance.events +import no.iktdev.mediaprocessing.shared.contract.dto.EventsDto +import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry +import no.iktdev.mediaprocessing.ui.getEventsDatabase +import org.jetbrains.exposed.sql.* +import org.springframework.stereotype.Service +import java.time.LocalDateTime +import javax.annotation.PostConstruct + +@Service +class PersistentEventsTableService { + val dzz = DeserializingRegistry() + private var latestPull: LocalDateTime = LocalDateTime.MIN + + val cachedEvents: MutableMap> = mutableMapOf() + + + fun pullEvents() { + val pulled = withTransaction(getEventsDatabase()) { + val cached = latestPull + latestPull = LocalDateTime.now() + events.select { + (events.created greaterEq cached) + } + .orderBy(events.created, SortOrder.ASC) + .toEvent(dzz) + .groupBy { it.referenceId } + } ?: emptyMap() + pulled.forEach { (rid, events) -> + val cEvents = cachedEvents[rid] ?: emptyList() + cachedEvents[rid] = cEvents + events + } + } + + + fun Query?.toEvent(dzz: DeserializingRegistry): List { + return this?.mapNotNull { fromRow(it, dzz) } ?: emptyList() + } + fun fromRow(row: ResultRow, dez: DeserializingRegistry): EventsDto? { + return EventsDto( + referenceId = row[events.referenceId], + eventId = row[events.eventId], + event = row[events.event], + data = row[events.data], + created = row[events.created] + ) + } + + + @PostConstruct + fun onInitializationCompleted() { + pullEvents() + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/helper/DerivedProcessIterationHolder.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/helper/DerivedProcessIterationHolder.kt deleted file mode 100644 index 788cd4d4..00000000 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/helper/DerivedProcessIterationHolder.kt +++ /dev/null @@ -1,9 +0,0 @@ -package no.iktdev.mediaprocessing.shared.common.helper - -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage - -data class DerivedProcessIterationHolder( - val eventId: String, - val event: PersistentProcessDataMessage, - var iterated: Int = 0 -) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt index 2db8ca9a..a3b12ec0 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -40,68 +40,8 @@ class PersistentDataReader(var dataSource: DataSource) { return result } - @Deprecated(message = "Use PersistentEventManager.isProcessEventCompleted") - fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean { - val result = withDirtyRead(dataSource.database) { - processerEvents.select { - (processerEvents.referenceId eq referenceId) and - (processerEvents.eventId eq eventId) - }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }.singleOrNull() - } - return result?.claimed ?: true - } - @Deprecated(message = "Use PersistentEventManager.isProcessEventCompleted") - fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean { - return withDirtyRead(dataSource.database) { - processerEvents.select { - (processerEvents.referenceId eq referenceId) and - (processerEvents.eventId eq eventId) and - (processerEvents.claimedBy eq claimedBy) - }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } - }?.singleOrNull()?.consumed ?: false - } - @Deprecated(message = "Use PersistentEventManager.getProcessEventsClaimable") - fun getAvailableProcessEvents(): List { - return withDirtyRead(dataSource.database) { - processerEvents.select { - (processerEvents.claimed eq false) and - (processerEvents.consumed eq false) - }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } - } ?: emptyList() - } - - @Deprecated("Use PersistentEventManager.getProcessEventsWithExpiredClaim") - fun getExpiredClaimsProcessEvents(): List { - val deadline = LocalDateTime.now() - val entries = withTransaction(dataSource.database) { - processerEvents.select { - (processerEvents.claimed eq true) and - (processerEvents.consumed neq true) - }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } - } ?: emptyList() - return entries.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline } - } - - @Deprecated("Use PersistentEventManager.getProcessEventWith") - fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? { - val message = withDirtyRead(dataSource.database) { - processerEvents.select { - (processerEvents.referenceId eq referenceId) and - (processerEvents.eventId eq eventId) - }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } - }?.singleOrNull() - return message - } - - @Deprecated("Use PersistentEventManager.getAllEventsProcesser") - fun getProcessEvents(): List { - return withTransaction(dataSource.database) { - processerEvents.selectAll() - .mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } - } ?: emptyList() - } } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt index 3782fc78..4813dbe6 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -187,7 +187,3 @@ fun List?.toGrouped(): List> { fun Query?.toPersistentMessage(dzz: DeserializingRegistry): List { return this?.mapNotNull { fromRowToPersistentMessage(it, dzz) } ?: emptyList() } - -fun Query?.toPersistentProcesserMessage(dzz: DeserializingRegistry): List { - return this?.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } ?: emptyList() -} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentProcessDataMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentProcessDataMessage.kt deleted file mode 100644 index 498c9b72..00000000 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentProcessDataMessage.kt +++ /dev/null @@ -1,40 +0,0 @@ -package no.iktdev.mediaprocessing.shared.common.persistance - -import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper -import org.jetbrains.exposed.sql.ResultRow -import java.time.LocalDateTime - -data class PersistentProcessDataMessage( - val referenceId: String, - val eventId: String, - val event: KafkaEvents, - val data: MessageDataWrapper, - val created: LocalDateTime, - val claimedBy: String? = null, - val claimed: Boolean = false, - val consumed: Boolean = false, - val lastCheckIn: LocalDateTime? = null -) - -fun fromRowToPersistentProcessDataMessage(row: ResultRow, dez: DeserializingRegistry): PersistentProcessDataMessage? { - val kev = try { - KafkaEvents.toEvent(row[event]) - } catch (e: IllegalArgumentException) { - e.printStackTrace() - return null - }?: return null - val dzdata = dez.deserializeData(kev, row[data]) - return PersistentProcessDataMessage( - referenceId = row[referenceId], - eventId = row[eventId], - event = kev, - data = dzdata, - created = row[created], - claimed = row[claimed], - claimedBy = row[claimedBy], - consumed = row[consumed], - lastCheckIn = row[lastCheckIn] - ) -} \ No newline at end of file