Small adjustments

This commit is contained in:
Brage 2024-02-29 23:34:51 +01:00
parent fc29b57cbe
commit 08f0a66b20
2 changed files with 26 additions and 5 deletions

View File

@ -3,11 +3,31 @@ package no.iktdev.mediaprocessing.shared.common.datasource
import org.jetbrains.exposed.sql.Table import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.transactions.transaction
import java.sql.Connection
open class TableDefaultOperations<T : Table> { open class TableDefaultOperations<T : Table> {
} }
fun <T> withDirtyRead(block: () -> T): T? {
return try {
transaction(transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED) {
try {
block()
} catch (e: Exception) {
e.printStackTrace()
// log the error here or handle the exception as needed
throw e // Optionally, you can rethrow the exception if needed
}
}
} catch (e: Exception) {
e.printStackTrace()
// log the error here or handle the exception as needed
null
}
}
fun <T> withTransaction(block: () -> T): T? { fun <T> withTransaction(block: () -> T): T? {
return try { return try {
transaction { transaction {

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.common.persistance package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
@ -26,7 +27,7 @@ class PersistentDataReader {
} }
fun getUncompletedMessages(): List<List<PersistentMessage>> { fun getUncompletedMessages(): List<List<PersistentMessage>> {
val result = withTransaction { val result = withDirtyRead {
events.selectAll() events.selectAll()
.andWhere { events.event neq KafkaEvents.EVENT_PROCESS_COMPLETED.event } .andWhere { events.event neq KafkaEvents.EVENT_PROCESS_COMPLETED.event }
.groupBy { it[events.referenceId] } .groupBy { it[events.referenceId] }
@ -36,7 +37,7 @@ class PersistentDataReader {
} }
fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean { fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean {
val result = withTransaction { val result = withDirtyRead {
processerEvents.select { processerEvents.select {
(processerEvents.referenceId eq referenceId) and (processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) (processerEvents.eventId eq eventId)
@ -46,7 +47,7 @@ class PersistentDataReader {
} }
fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean { fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withTransaction { return withDirtyRead {
processerEvents.select { processerEvents.select {
(processerEvents.referenceId eq referenceId) and (processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and (processerEvents.eventId eq eventId) and
@ -56,7 +57,7 @@ class PersistentDataReader {
} }
fun getAvailableProcessEvents(): List<PersistentProcessDataMessage> { fun getAvailableProcessEvents(): List<PersistentProcessDataMessage> {
return withTransaction { return withDirtyRead {
processerEvents.select { processerEvents.select {
(processerEvents.claimed eq false) and (processerEvents.claimed eq false) and
(processerEvents.consumed eq false) (processerEvents.consumed eq false)
@ -76,7 +77,7 @@ class PersistentDataReader {
} }
fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? { fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? {
val message = withTransaction { val message = withDirtyRead {
processerEvents.select { processerEvents.select {
(processerEvents.referenceId eq referenceId) and (processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) (processerEvents.eventId eq eventId)