Minor update
This commit is contained in:
parent
5fd910b78d
commit
3211cb2608
@ -6,10 +6,12 @@ import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
|
||||
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
@ -40,11 +42,48 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
|
||||
|
||||
val receivedEvents = events.map { it.event }
|
||||
// TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event.
|
||||
|
||||
|
||||
|
||||
|
||||
val ffmpegEvents = listOf(KafkaEvents.EventMediaParameterEncodeCreated, EventMediaParameterExtractCreated)
|
||||
if (ffmpegEvents.any { receivedEvents.contains(it) } && events.none { e -> KafkaEvents.isOfWork(e.event) }) {
|
||||
return null
|
||||
}
|
||||
|
||||
val startedData: MediaProcessStarted? = started.data as MediaProcessStarted?
|
||||
if (startedData == null) {
|
||||
log.error { "${event.referenceId} contains a started event without proper data object" }
|
||||
return null
|
||||
}
|
||||
|
||||
|
||||
val hasEncodeAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.ENCODE)) {
|
||||
events.any { it.event == EventWorkEncodePerformed }
|
||||
} else true
|
||||
|
||||
val hasExtractAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.EXTRACT)) {
|
||||
events.any { it.event == EventWorkExtractPerformed}
|
||||
} else true
|
||||
|
||||
val hasConvertAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.CONVERT)) {
|
||||
events.any { it.event == EventWorkConvertPerformed }
|
||||
} else true
|
||||
|
||||
val missingRequired: MutableMap<ProcessStartOperationEvents, Boolean> = mutableMapOf(
|
||||
ProcessStartOperationEvents.ENCODE to hasEncodeAndIsRequired,
|
||||
ProcessStartOperationEvents.EXTRACT to hasExtractAndIsRequired,
|
||||
ProcessStartOperationEvents.CONVERT to hasConvertAndIsRequired
|
||||
)
|
||||
|
||||
|
||||
|
||||
if (missingRequired.values.any { !it }) {
|
||||
log.info { "Waiting for ${missingRequired.entries.filter { !it.value }.map { it.key.name }}" }
|
||||
return null
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user