This commit is contained in:
bskjon 2024-04-11 22:08:46 +02:00
parent 72e73c2c20
commit 7088f0221b
11 changed files with 42 additions and 34 deletions

View File

@@ -2,6 +2,7 @@ package no.iktdev.mediaprocessing.coordinator.mapping
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
 import no.iktdev.mediaprocessing.shared.common.persistance.isSkipped
+import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
 import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto
@@ -33,8 +34,10 @@ class ProcessMapping(val events: List<PersistentMessage>) {
         val arguments = events.filter { it.event == KafkaEvents.EventMediaParameterEncodeCreated }
         val created = events.filter { it.event == KafkaEvents.EventWorkEncodeCreated}
-        val performed = events.filter { it.event == KafkaEvents.EventWorkEncodePerformed }
-        val isSkipped = events.filter { it.isSkipped() }
+        val performedEvents = events.filter { it.event == KafkaEvents.EventWorkEncodePerformed }
+        val performed = performedEvents.filter { it.isSuccess() }
+        val isSkipped = performedEvents.filter { it.isSkipped() }
         return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
     }
@@ -44,16 +47,21 @@ class ProcessMapping(val events: List<PersistentMessage>) {
         val arguments = events.filter { it.event == KafkaEvents.EventMediaParameterExtractCreated }.filter { it.data.isSuccess() }
         val created = events.filter { it.event == KafkaEvents.EventWorkExtractCreated }
-        val performed = events.filter { it.event == KafkaEvents.EventWorkExtractPerformed }
-        val isSkipped = events.filter { it.isSkipped() }
+        val performedEvents = events.filter { it.event == KafkaEvents.EventWorkExtractPerformed }
+        val performed = performedEvents.filter { it.isSuccess() }
+        val isSkipped = performedEvents.filter { it.isSkipped() }
         return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
     }

     fun waitsForConvert(): Boolean {
         val created = events.filter { it.event == KafkaEvents.EventWorkConvertCreated }
-        val performed = events.filter { it.event == KafkaEvents.EventWorkConvertPerformed }
-        val isSkipped = events.filter { it.isSkipped() }
+        val performedEvents = events.filter { it.event == KafkaEvents.EventWorkConvertPerformed }
+        val performed = performedEvents.filter { it.isSuccess() }
+        val isSkipped = performedEvents.filter { it.isSkipped() }
         return created.size > performed.size + isSkipped.size
     }
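Note for readers skimming the diff: each waits-for check now derives performed and isSkipped from the same performedEvents list, so failed work no longer counts toward completion. A minimal, self-contained Kotlin sketch of that counting rule follows; Event, Msg and the helpers are stand-ins for KafkaEvents, PersistentMessage, isSuccess() and isSkipped() from the shared modules, and the arguments/created guard is left out for brevity.

// Sketch only; stand-in types, not the project's real classes.
enum class Status { COMPLETED, SKIPPED, ERROR }
enum class Event { WorkEncodeCreated, WorkEncodePerformed }
data class Msg(val event: Event, val status: Status)

fun Msg.isSuccess() = status == Status.COMPLETED
fun Msg.isSkipped() = status == Status.SKIPPED

// Work is still pending while more jobs were created than have been
// either performed successfully or explicitly skipped.
fun waitsForEncode(events: List<Msg>): Boolean {
    val created = events.filter { it.event == Event.WorkEncodeCreated }
    val performedEvents = events.filter { it.event == Event.WorkEncodePerformed }
    val performed = performedEvents.filter { it.isSuccess() }
    val isSkipped = performedEvents.filter { it.isSkipped() }
    return created.size > performed.size + isSkipped.size
}

fun main() {
    val events = listOf(
        Msg(Event.WorkEncodeCreated, Status.COMPLETED),
        Msg(Event.WorkEncodePerformed, Status.ERROR), // failed runs no longer count as done
    )
    println(waitsForEncode(events)) // true: the encode job is still outstanding
}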

View File

@@ -30,11 +30,11 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
     val log = KotlinLogging.logger {}
-    override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE
+    override val producesEvent: KafkaEvents = KafkaEvents.EventCollectAndStore
     override val requiredEvents: List<KafkaEvents> = listOf(
         EventMediaProcessStarted,
-        EVENT_MEDIA_PROCESS_COMPLETED
+        EventMediaProcessCompleted
     )
     override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
@@ -42,8 +42,8 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
     override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
         val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
-        val completed = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_COMPLETED) ?: return null
-        if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) {
+        val completed = events.lastOrSuccessOf(EventMediaProcessCompleted) ?: return null
+        if (!started.data.isSuccess() || !completed.data.isSuccess()) {
             return null
         }
         val mapped = ProcessMapping(events).map() ?: return null
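The second hunk tightens the gate in onProcessEvents: previously a completed event carrying Status.SKIPPED still passed, now both the started and completed events must report success. A hedged stand-alone sketch of that guard; Data, Msg and shouldCollectAndStore are illustrative stand-ins, not the project's actual types.

// Sketch of the tightened guard introduced in this hunk.
enum class Status { COMPLETED, SKIPPED, ERROR }
data class Data(val status: Status) {
    fun isSuccess() = status == Status.COMPLETED
}
data class Msg(val data: Data)

// Bail out unless both the started and the completed event are successful.
fun shouldCollectAndStore(started: Msg?, completed: Msg?): Boolean {
    if (started == null || completed == null) return false
    return started.data.isSuccess() && completed.data.isSuccess()
}

fun main() {
    val ok = Msg(Data(Status.COMPLETED))
    val skipped = Msg(Data(Status.SKIPPED))
    println(shouldCollectAndStore(ok, ok))      // true
    println(shouldCollectAndStore(ok, skipped)) // false: SKIPPED no longer passes the gate
}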

View File

@@ -19,7 +19,7 @@ import org.springframework.stereotype.Service
 class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
     val log = KotlinLogging.logger {}
-    override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED
+    override val producesEvent: KafkaEvents = KafkaEvents.EventMediaProcessCompleted
     override val requiredEvents: List<KafkaEvents> = listOf(
         EventMediaProcessStarted,

View File

@@ -19,7 +19,7 @@ import org.springframework.stereotype.Service
 class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
     val log = KotlinLogging.logger {}
-    override val producesEvent: KafkaEvents = KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED
+    override val producesEvent: KafkaEvents = KafkaEvents.EventRequestProcessCompleted
     override val requiredEvents: List<KafkaEvents> = listOf(
         EVENT_REQUEST_PROCESS_STARTED,

View File

@@ -58,10 +58,13 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator
         var message: String? = null
+        var status = Status.COMPLETED
         val result = if (outFile?.exists() == true) {
             message = "${outFile.name} already exists"
+            status = Status.SKIPPED
             outFile
         } else if (coversInDifferentFormats.isNotEmpty()) {
+            status = Status.SKIPPED
             coversInDifferentFormats.random()
         } else if (outFile != null) {
             runBlocking {
@@ -74,7 +77,9 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator
         return if (result == null) {
             SimpleMessageData(Status.ERROR, "Could not download cover, check logs", event.eventId)
         } else {
-            val status = if (result.exists() && result.canRead()) Status.COMPLETED else Status.ERROR
+            if (!result.exists() || !result.canRead()) {
+                status = Status.ERROR
+            }
             CoverDownloadWorkPerformed(status = status, message = message, coverFile = result.absolutePath, event.eventId)
         }
     }
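This hunk replaces the locally computed status with a mutable status that starts as COMPLETED, is downgraded to SKIPPED when a cover is already present (or exists in another format), and to ERROR when the resulting file is missing or unreadable. A simplified sketch of the resulting flow; resolveCoverStatus is a hypothetical helper used only for illustration, and the actual download step is omitted.

import java.io.File

enum class Status { COMPLETED, SKIPPED, ERROR }

// Sketch of the status handling introduced here: SKIPPED when the cover was
// already available, ERROR when the final file cannot be read.
fun resolveCoverStatus(outFile: File?, alternateFormats: List<File>): Pair<Status, File?> {
    var status = Status.COMPLETED
    val result: File? = when {
        outFile?.exists() == true -> {
            status = Status.SKIPPED          // cover already downloaded
            outFile
        }
        alternateFormats.isNotEmpty() -> {
            status = Status.SKIPPED          // usable cover exists in another format
            alternateFormats.random()
        }
        else -> outFile                      // the real task would download the cover here
    }
    if (result != null && (!result.exists() || !result.canRead())) {
        status = Status.ERROR
    }
    return status to result
}

fun main() {
    // No existing cover, no alternates, nothing actually downloaded: reported as ERROR.
    println(resolveCoverStatus(File("does-not-exist.jpg"), emptyList()))
}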

View File

@@ -58,10 +58,10 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo
     }
     private fun getCurrentState(events: List<PersistentMessage>, processes: Map<String, EventSummarySubItem>): SummaryState {
-        val stored = events.findLast { it.event == KafkaEvents.EVENT_COLLECT_AND_STORE }
+        val stored = events.findLast { it.event == KafkaEvents.EventCollectAndStore }
         val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }
-        val completedMediaEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED }
-        val completedRequestEvent = events.findLast { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED }
+        val completedMediaEvent = events.findLast { it.event == KafkaEvents.EventMediaProcessCompleted }
+        val completedRequestEvent = events.findLast { it.event == KafkaEvents.EventRequestProcessCompleted }
         if (stored != null && stored.data.isSuccess()) {
             return SummaryState.Completed

View File

@@ -33,7 +33,7 @@ class PersistentDataReader(var dataSource: DataSource) {
     fun getUncompletedMessages(): List<List<PersistentMessage>> {
         val result = withDirtyRead(dataSource.database) {
             events.selectAll()
-                .andWhere { events.event neq KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED.event }
+                .andWhere { events.event neq KafkaEvents.EventMediaProcessCompleted.event }
                .groupBy { it[events.referenceId] }
                .mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } }
        } ?: emptyList()

View File

@@ -1,8 +1,6 @@
 package no.iktdev.mediaprocessing.shared.common.persistance
-import kotlinx.coroutines.launch
 import mu.KotlinLogging
-import no.iktdev.exfl.coroutines.Coroutines
 import no.iktdev.mediaprocessing.shared.common.datasource.*
 import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
@@ -11,10 +9,7 @@ import org.jetbrains.exposed.exceptions.ExposedSQLException
 import org.jetbrains.exposed.sql.*
 import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
 import org.jetbrains.exposed.sql.javatime.CurrentDateTime
-import java.sql.SQLIntegrityConstraintViolationException
 import java.time.LocalDateTime
-import javax.xml.crypto.Data
-import kotlin.coroutines.coroutineContext
 private val log = KotlinLogging.logger {}
@@ -97,9 +92,9 @@ class PersistentEventManager(private val dataSource: DataSource) {
     fun getEventsUncompleted(): List<List<PersistentMessage>> {
         val identifiesAsCompleted = listOf(
-            KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED,
-            KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED,
-            KafkaEvents.EVENT_COLLECT_AND_STORE
+            KafkaEvents.EventRequestProcessCompleted,
+            KafkaEvents.EventMediaProcessCompleted,
+            KafkaEvents.EventCollectAndStore
         )
         val all = getAllEventsGrouped()
         return all.filter { entry -> entry.none { it.event in identifiesAsCompleted } }
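getEventsUncompleted treats the renamed finalize events as completion markers: any event group containing one of them is filtered out. A small stand-alone sketch of that filter; the Event enum and the sample data are illustrative stand-ins for KafkaEvents and the grouped PersistentMessage lists.

// Sketch only; stand-in enum, not the project's KafkaEvents.
enum class Event {
    EventMediaProcessStarted, EventWorkEncodePerformed,
    EventMediaProcessCompleted, EventRequestProcessCompleted, EventCollectAndStore
}

// A group of persisted messages is "uncompleted" when none of its events
// is one of the finalize markers.
fun uncompletedGroups(grouped: List<List<Event>>): List<List<Event>> {
    val identifiesAsCompleted = listOf(
        Event.EventRequestProcessCompleted,
        Event.EventMediaProcessCompleted,
        Event.EventCollectAndStore
    )
    return grouped.filter { entry -> entry.none { it in identifiesAsCompleted } }
}

fun main() {
    val grouped = listOf(
        listOf(Event.EventMediaProcessStarted, Event.EventWorkEncodePerformed),   // still running
        listOf(Event.EventMediaProcessStarted, Event.EventMediaProcessCompleted), // finished
    )
    println(uncompletedGroups(grouped).size) // 1
}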

View File

@@ -39,7 +39,7 @@ class DeserializingRegistry {
         KafkaEvents.EventWorkDownloadCoverPerformed to CoverDownloadWorkPerformed::class.java,
-        KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED to ProcessCompleted::class.java
+        KafkaEvents.EventMediaProcessCompleted to ProcessCompleted::class.java
     )
 }

View File

@@ -36,9 +36,9 @@ enum class KafkaEvents(val event: String) {
     EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"),
     EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"),
-    EVENT_MEDIA_PROCESS_COMPLETED("event:media-process:completed"),
-    EVENT_REQUEST_PROCESS_COMPLETED("event:request-process:completed"),
-    EVENT_COLLECT_AND_STORE("event::save");
+    EventMediaProcessCompleted("event:media-process:completed"),
+    EventRequestProcessCompleted("event:request-process:completed"),
+    EventCollectAndStore("event::save");
     companion object {
         fun toEvent(event: String): KafkaEvents? {
@@ -60,9 +60,9 @@ enum class KafkaEvents(val event: String) {
         fun isOfFinalize(event: KafkaEvents): Boolean {
             return event in listOf(
-                EVENT_MEDIA_PROCESS_COMPLETED,
-                EVENT_REQUEST_PROCESS_COMPLETED,
-                EVENT_COLLECT_AND_STORE
+                EventMediaProcessCompleted,
+                EventRequestProcessCompleted,
+                EventCollectAndStore
             )
         }
     }
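The rename only touches the constant names; the underlying event strings stay the same, so string-based lookups over previously persisted rows are unaffected. A condensed sketch of the renamed members and the isOfFinalize helper; only the constants touched by this commit are included, and the toEvent body shown here is a plain lookup that may differ from the project's actual implementation.

// Condensed sketch of KafkaEvents after the rename; the wire strings are unchanged.
enum class KafkaEvents(val event: String) {
    EventMediaProcessCompleted("event:media-process:completed"),
    EventRequestProcessCompleted("event:request-process:completed"),
    EventCollectAndStore("event::save");

    companion object {
        // Old rows store the string value, so lookups keep resolving after the rename.
        fun toEvent(event: String): KafkaEvents? = entries.firstOrNull { it.event == event }

        fun isOfFinalize(event: KafkaEvents): Boolean = event in listOf(
            EventMediaProcessCompleted,
            EventRequestProcessCompleted,
            EventCollectAndStore
        )
    }
}

fun main() {
    println(KafkaEvents.toEvent("event:media-process:completed")) // EventMediaProcessCompleted
    println(KafkaEvents.isOfFinalize(KafkaEvents.EventCollectAndStore)) // true
}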

View File

@@ -5,7 +5,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.Status
-@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED)
+@KafkaBelongsToEvent(KafkaEvents.EventMediaProcessCompleted)
 data class ProcessCompleted(
     override val status: Status,
     override val derivedFromEventId: String?