Changed events and deleting replaced ones

This commit is contained in:
bskjon 2024-04-03 01:43:22 +02:00
parent 98527ec02f
commit 6090c2e8c0
6 changed files with 127 additions and 21 deletions

View File

@ -1,22 +1,22 @@
package no.iktdev.mediaprocessing.coordinator package no.iktdev.mediaprocessing.coordinator
import com.google.gson.Gson
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.* import no.iktdev.mediaprocessing.shared.kafka.dto.*
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
import java.util.UUID import java.util.UUID
@ -34,8 +34,11 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
if (!success) { if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().config.databaseName}" } log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().config.databaseName}" }
} else { } else {
deleteOlderEventsIfSuperseded(event.key, event.value)
io.launch { io.launch {
delay(500) // Give the database a few sec to update delay(1000) // Give the database a few sec to update
readAllMessagesFor(event.value.referenceId, event.value.eventId) readAllMessagesFor(event.value.referenceId, event.value.eventId)
} }
} }
@ -91,7 +94,11 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
} }
fun permitWorkToProceedOn(referenceId: String, message: String) { fun permitWorkToProceedOn(referenceId: String, message: String) {
producer.sendMessage(referenceId = referenceId, KafkaEvents.EVENT_MEDIA_WORK_PROCEED_PERMITTED, SimpleMessageData(Status.COMPLETED, message)) producer.sendMessage(
referenceId = referenceId,
KafkaEvents.EVENT_MEDIA_WORK_PROCEED_PERMITTED,
SimpleMessageData(Status.COMPLETED, message)
)
} }
@ -135,21 +142,68 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
} }
io.launch {
buildModelBasedOnMessagesFor(referenceId, messages)
}
} }
fun getProcessStarted(messages: List<PersistentMessage>): MediaProcessStarted? { fun getProcessStarted(messages: List<PersistentMessage>): MediaProcessStarted? {
return messages.find { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted return messages.find { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted
} }
suspend fun buildModelBasedOnMessagesFor(referenceId: String, messages: List<PersistentMessage>) {
if (messages.any { it.data is ProcessCompleted }) { fun deleteOlderEventsIfSuperseded(event: KafkaEvents, value: Message<out MessageDataWrapper>) {
// TODO: Build and insert into database var existingMessages = persistentReader.getMessagesFor(value.referenceId)
if (!KafkaEvents.isOfWork(event)) {
val superseded = existingMessages.filter { it.event == event && it.eventId != value.eventId }
superseded.forEach {
persistentWriter.deleteStoredEventDataMessage(
referenceId = it.referenceId,
eventId = it.eventId,
event = it.event
)
}
}
existingMessages = persistentReader.getMessagesFor(value.referenceId)
val workItems = existingMessages.filter { KafkaEvents.isOfWork(it.event) }
for (item: PersistentMessage in workItems) {
val originatorId = if (item.isOfEvent(KafkaEvents.EVENT_WORK_ENCODE_CREATED) ||
item.isOfEvent(KafkaEvents.EVENT_WORK_EXTRACT_CREATED)
) {
val ec = item.data as FfmpegWorkRequestCreated
ec.derivedFromEventId
} else if (item.isOfEvent(KafkaEvents.EVENT_WORK_ENCODE_PERFORMED)) {
try {
(item.data as ProcesserEncodeWorkPerformed).derivedFromEventId
} catch (e: Exception) {
null
}
} else if (item.isOfEvent(KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED)) {
try {
(item.data as ProcesserExtractWorkPerformed).derivedFromEventId
} catch (e: Exception) {
null
}
} else null
originatorId?.let { originator ->
deleteEventsIfNoOriginator(item.referenceId, item.eventId, item.event, originator, existingMessages)
} }
} }
} }
private fun deleteEventsIfNoOriginator(
referenceId: String,
eventId: String,
event: KafkaEvents,
originatorId: String,
existingMessages: List<PersistentMessage>
) {
val originator = existingMessages.find { it.eventId == originatorId }
if (originator == null) {
persistentWriter.deleteStoredEventDataMessage(referenceId, eventId, event)
}
}
}

View File

@ -1,6 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.mapping package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isSkipped
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto
@ -33,7 +34,7 @@ class ProcessMapping(val events: List<PersistentMessage>) {
val created = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED} val created = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED}
val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED } val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED }
val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_SKIPPED } val isSkipped = events.filter { it.isSkipped() }
return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
} }
@ -44,7 +45,7 @@ class ProcessMapping(val events: List<PersistentMessage>) {
val created = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED } val created = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED } val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED }
val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED } val isSkipped = events.filter { it.isSkipped() }
return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
} }
@ -52,7 +53,7 @@ class ProcessMapping(val events: List<PersistentMessage>) {
fun waitsForConvert(): Boolean { fun waitsForConvert(): Boolean {
val created = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED } val created = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED }
val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED } val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED }
val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_SKIPPED } val isSkipped = events.filter { it.isSkipped() }
return created.size > performed.size + isSkipped.size return created.size > performed.size + isSkipped.size
} }

View File

@ -48,6 +48,8 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
if (!success) { if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}" } log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}" }
} else { } else {
deleteOlderEventsIfSuperseded(event.key, event.value)
io.launch { io.launch {
delay(500) delay(500)
readAllMessagesFor(event.value.referenceId, event.value.eventId) readAllMessagesFor(event.value.referenceId, event.value.eventId)
@ -55,6 +57,24 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
} }
} }
fun deleteOlderEventsIfSuperseded(event: KafkaEvents, value: Message<out MessageDataWrapper>) {
val existingMessages = persistentReader.getMessagesFor(value.referenceId)
val workItems = existingMessages.filter { KafkaEvents.isOfWork(it.event) }
if (KafkaEvents.isOfWork(event)) {
// Here i would need to list all of the work events, then proceed to check which one of the derivedId does not correspond to a entry
// Nonmatching has been superseded
val superseded = existingMessages.filter { it.event == event && it.eventId != value.eventId }
superseded.forEach {
persistentWriter.deleteStoredEventDataMessage(referenceId = it.referenceId, eventId = it.eventId, event= it.event )
}
}
}

View File

@ -5,9 +5,12 @@ import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import org.jetbrains.exposed.exceptions.ExposedSQLException import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.deleteWhere
import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.javatime.CurrentDateTime import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import org.jetbrains.exposed.sql.update import org.jetbrains.exposed.sql.update
@ -42,6 +45,16 @@ open class PersistentDataStore(var dataSource: DataSource) {
} }
} }
fun deleteStoredEventDataMessage(referenceId: String, eventId: String, event: KafkaEvents): Boolean {
return executeWithStatus(dataSource.database) {
events.deleteWhere {
(events.referenceId eq referenceId) and
(events.eventId eq eventId) and
(events.event eq event.event)
}
}
}
fun storeProcessDataMessage(event: String, message: Message<*>): Boolean { fun storeProcessDataMessage(event: String, message: Message<*>): Boolean {
val exception = executeOrException(dataSource.database) { val exception = executeOrException(dataSource.database) {
processerEvents.insert { processerEvents.insert {

View File

@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.isSkipped
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.ResultRow
import java.time.LocalDateTime import java.time.LocalDateTime
@ -29,6 +30,15 @@ fun PersistentMessage.isSuccess(): Boolean {
} }
} }
fun PersistentMessage.isSkipped(): Boolean {
return try {
this.data.isSkipped()
} catch (e: Exception) {
false
}
}
fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? { fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
val kev = try { val kev = try {

View File

@ -28,11 +28,6 @@ enum class KafkaEvents(val event: String) {
EVENT_WORK_DOWNLOAD_COVER_PERFORMED("event:work-download-cover:performed"), EVENT_WORK_DOWNLOAD_COVER_PERFORMED("event:work-download-cover:performed"),
EVENT_WORK_ENCODE_SKIPPED("event:work-encode:skipped"),
EVENT_WORK_EXTRACT_SKIPPED("event:work-extract:skipped"),
EVENT_WORK_CONVERT_SKIPPED("event:work-convert:skipped"),
EVENT_STORE_VIDEO_PERFORMED("event:store-video:performed"), EVENT_STORE_VIDEO_PERFORMED("event:store-video:performed"),
EVENT_STORE_SUBTITLE_PERFORMED("event:store-subtitle:performed"), EVENT_STORE_SUBTITLE_PERFORMED("event:store-subtitle:performed"),
EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"), EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"),
@ -46,5 +41,18 @@ enum class KafkaEvents(val event: String) {
fun toEvent(event: String): KafkaEvents? { fun toEvent(event: String): KafkaEvents? {
return KafkaEvents.entries.find { it.event == event } return KafkaEvents.entries.find { it.event == event }
} }
fun isOfWork(event: KafkaEvents): Boolean {
return event in listOf(
EVENT_WORK_CONVERT_CREATED,
EVENT_WORK_EXTRACT_CREATED,
EVENT_WORK_ENCODE_CREATED,
EVENT_WORK_ENCODE_PERFORMED,
EVENT_WORK_CONVERT_PERFORMED,
EVENT_WORK_EXTRACT_PERFORMED
)
}
} }
} }