Integrity update
This commit is contained in:
parent
9dd1825064
commit
b3366a2b5f
@ -164,7 +164,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
|
||||
log.error { "Can't produce error message when the referenceId is not present" }
|
||||
return
|
||||
}
|
||||
log.info { "Encode failed for ${runner.referenceId}" }
|
||||
log.info { "Encode failed for ${runner.referenceId}\n$errorMessage" }
|
||||
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
|
||||
data = ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId)
|
||||
)
|
||||
|
||||
@ -159,7 +159,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
|
||||
log.error { "Can't produce error message when the referenceId is not present" }
|
||||
return
|
||||
}
|
||||
log.info { "Extract failed for ${runner.referenceId}" }
|
||||
log.info { "Extract failed for ${runner.referenceId}\n$errorMessage" }
|
||||
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
|
||||
ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId)
|
||||
)
|
||||
|
||||
@ -10,7 +10,9 @@ import org.jetbrains.exposed.exceptions.ExposedSQLException
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
|
||||
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
||||
import java.security.MessageDigest
|
||||
import java.time.LocalDateTime
|
||||
import kotlin.text.Charsets.UTF_8
|
||||
|
||||
private val log = KotlinLogging.logger {}
|
||||
|
||||
@ -110,8 +112,6 @@ class PersistentEventManager(private val dataSource: DataSource) {
|
||||
|
||||
fun getEventsUncompleted(): List<List<PersistentMessage>> {
|
||||
val identifiesAsCompleted = listOf(
|
||||
/*KafkaEvents.EventRequestProcessCompleted,
|
||||
KafkaEvents.EventMediaProcessCompleted,*/
|
||||
KafkaEvents.EventCollectAndStore
|
||||
)
|
||||
val all = getAllEventsGrouped()
|
||||
@ -153,6 +153,13 @@ class PersistentEventManager(private val dataSource: DataSource) {
|
||||
|
||||
//region Database write
|
||||
|
||||
val digest = MessageDigest.getInstance("MD5")
|
||||
@OptIn(ExperimentalStdlibApi::class)
|
||||
private fun getIntegrityOfData(data : String) : String {
|
||||
return digest.digest(data.toByteArray(kotlin.text.Charsets.UTF_8))
|
||||
.toHexString()
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the kafka event and its data in the database as PersistentMessage
|
||||
* @param event KafkaEvents
|
||||
@ -174,6 +181,7 @@ class PersistentEventManager(private val dataSource: DataSource) {
|
||||
it[referenceId] = message.referenceId
|
||||
it[eventId] = message.eventId
|
||||
it[events.event] = event.event
|
||||
it[integrity] = getIntegrityOfData(message.dataAsJson())
|
||||
it[data] = message.dataAsJson()
|
||||
}
|
||||
}
|
||||
|
||||
@ -11,10 +11,11 @@ object events: IntIdTable() {
|
||||
val eventId: Column<String> = varchar("eventId", 50)
|
||||
val event: Column<String> = varchar("event",100)
|
||||
val data: Column<String> = text("data")
|
||||
val integrity: Column<String> = varchar("integrity", 250)
|
||||
//val success: Column<Boolean> = bool("success").default(false)
|
||||
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
|
||||
|
||||
init {
|
||||
uniqueIndex(referenceId, eventId, event)
|
||||
uniqueIndex(referenceId, eventId, event, integrity)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user