diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt index e08d9422..4d3d71a5 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt @@ -3,11 +3,31 @@ package no.iktdev.mediaprocessing.shared.common.datasource import org.jetbrains.exposed.sql.Table import org.jetbrains.exposed.sql.transactions.transaction +import java.sql.Connection open class TableDefaultOperations { } +fun withDirtyRead(block: () -> T): T? { + return try { + transaction(transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED) { + try { + block() + } catch (e: Exception) { + e.printStackTrace() + // log the error here or handle the exception as needed + throw e // Optionally, you can rethrow the exception if needed + } + } + } catch (e: Exception) { + e.printStackTrace() + // log the error here or handle the exception as needed + null + } +} + + fun withTransaction(block: () -> T): T? { return try { transaction { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt index 4d89540f..f8e4e7dd 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.shared.common.persistance +import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents @@ -26,7 +27,7 @@ class PersistentDataReader { } fun getUncompletedMessages(): List> { - val result = withTransaction { + val result = withDirtyRead { events.selectAll() .andWhere { events.event neq KafkaEvents.EVENT_PROCESS_COMPLETED.event } .groupBy { it[events.referenceId] } @@ -36,7 +37,7 @@ class PersistentDataReader { } fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean { - val result = withTransaction { + val result = withDirtyRead { processerEvents.select { (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) @@ -46,7 +47,7 @@ class PersistentDataReader { } fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean { - return withTransaction { + return withDirtyRead { processerEvents.select { (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) and @@ -56,7 +57,7 @@ class PersistentDataReader { } fun getAvailableProcessEvents(): List { - return withTransaction { + return withDirtyRead { processerEvents.select { (processerEvents.claimed eq false) and (processerEvents.consumed eq false) @@ -76,7 +77,7 @@ class PersistentDataReader { } fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? { - val message = withTransaction { + val message = withDirtyRead { processerEvents.select { (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId)