Removed class from log
This commit is contained in:
parent 3e8924af8f
commit fc5bb6a71c
@@ -33,7 +33,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC
     }

     override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
-        log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
+        log.info { "${event.referenceId} triggered by ${event.event}" }
        val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) ?: return null
        return readFileInfo(selected.data as MediaProcessStarted)
    }
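Every task hunk below makes the same change: the `${this.javaClass.simpleName}` prefix is dropped from the trigger log line. Presumably this is safe because kotlin-logging derives the logger name from the enclosing class, so the class name still appears in the log output even when the message only carries the reference id. A minimal sketch of that assumption (hypothetical class, not part of this commit):

```kotlin
import mu.KotlinLogging

class ExampleTask {
    // kotlin-logging names this logger after the enclosing class ("ExampleTask"),
    // so the class shows up in the logger field of each log line anyway.
    private val log = KotlinLogging.logger {}

    fun onEvent(referenceId: String, event: String) {
        log.info { "$referenceId triggered by $event" }
    }
}
```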
@@ -0,0 +1,62 @@
+package no.iktdev.mediaprocessing.coordinator.tasks.event
+
+import mu.KotlinLogging
+import no.iktdev.mediaprocessing.coordinator.Coordinator
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
+import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
+import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted
+import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Service
+
+@Service
+class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
+    val log = KotlinLogging.logger {}
+
+    override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED
+
+    override val requiredEvents: List<KafkaEvents> = listOf(
+        EVENT_REQUEST_PROCESS_STARTED,
+    )
+    override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
+
+
+
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null
+        if (!started.data.isSuccess()) {
+            return null
+        }
+
+        val receivedEvents = events.map { it.event }
+
+        // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event.
+
+        val requiresOneOf = listOf(
+            EVENT_WORK_CONVERT_PERFORMED,
+            EVENT_WORK_EXTRACT_PERFORMED,
+            EVENT_WORK_ENCODE_PERFORMED
+        )
+
+        if (requiresOneOf.none { it in receivedEvents }) {
+            val missing = requiresOneOf.filter { !receivedEvents.contains(it) }
+            log.info { "Can't complete at this moment. Missing required event(s)" + missing.joinToString("\n\t") }
+            return null //SimpleMessageData(Status.SKIPPED, "Can't collect at this moment. Missing required event")
+        }
+
+
+
+
+        val mapper = ProcessMapping(events)
+        if (mapper.canCollect()) {
+            return ProcessCompleted(Status.COMPLETED)
+        }
+        return null
+    }
+}
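The new `CompleteRequestTask` gates completion on simple set membership over the accumulated event history: at least one "work performed" event must be present before the process can be marked complete. A standalone sketch of that check, with plain strings standing in for `KafkaEvents` (the example data is made up):

```kotlin
fun main() {
    // Events already recorded for a reference id (assumed example data).
    val receivedEvents = listOf("EVENT_REQUEST_PROCESS_STARTED", "EVENT_WORK_ENCODE_PERFORMED")

    // At least one "performed" work event must exist before completion.
    val requiresOneOf = listOf(
        "EVENT_WORK_CONVERT_PERFORMED",
        "EVENT_WORK_EXTRACT_PERFORMED",
        "EVENT_WORK_ENCODE_PERFORMED",
    )

    if (requiresOneOf.none { it in receivedEvents }) {
        val missing = requiresOneOf.filter { it !in receivedEvents }
        println("Can't complete yet. Missing one of:\n\t" + missing.joinToString("\n\t"))
    } else {
        println("Completion gate passed")
    }
}
```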
@@ -35,7 +35,7 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi
     }

     override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
-        log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
+        log.info { "${event.referenceId} triggered by ${event.event}" }

        val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
        val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed? ?: return null
@@ -13,17 +13,21 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.hasValidData
 import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.scheduling.annotation.EnableScheduling
 import org.springframework.scheduling.annotation.Scheduled
 import org.springframework.stereotype.Service
 import java.time.LocalDateTime
+import java.time.ZoneOffset
+import java.time.format.DateTimeFormatter
+import java.util.*


 /**
  *
@@ -32,6 +36,7 @@ import java.time.LocalDateTime
 @EnableScheduling
 class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
     val log = KotlinLogging.logger {}
+    val metadataTimeout = KafkaEnv.metadataTimeoutMinutes * 60

     override val producesEvent: KafkaEvents
         get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
@@ -44,7 +49,7 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
     )

     override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
-        log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
+        log.info { "${event.referenceId} triggered by ${event.event}" }

        val baseInfo = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
        val meta = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED) { it.data is MetadataPerformed }?.data as MetadataPerformed?
@@ -54,7 +59,11 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
             return null
         }
         if (baseInfo.isSuccess() && meta == null) {
-            log.info { "Sending ${baseInfo?.title} to waiting queue" }
+            val estimatedTimeout = LocalDateTime.now().toEpochSeconds() + metadataTimeout
+            val dateTime = LocalDateTime.ofEpochSecond(estimatedTimeout, 0, ZoneOffset.UTC)
+
+            val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)
+            log.info { "Sending ${baseInfo?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
             if (!waitingProcessesForMeta.containsKey(event.referenceId)) {
                 waitingProcessesForMeta[event.referenceId] = LocalDateTime.now()
             }
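The expiry stamp added to the "waiting queue" log line is plain epoch-second arithmetic plus formatting. A self-contained sketch of the same computation using only the standard library (the repo's `toEpochSeconds()` extension is assumed to wrap `toEpochSecond(ZoneOffset.UTC)`; the 30-minute timeout is an example value, not the configured one):

```kotlin
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.Locale

fun main() {
    val metadataTimeout = 30 * 60L // example: 30 minutes, in seconds

    // Epoch seconds now, plus the timeout window.
    val estimatedTimeout = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) + metadataTimeout

    // Convert back to a LocalDateTime and format it for the log message.
    val dateTime = LocalDateTime.ofEpochSecond(estimatedTimeout, 0, ZoneOffset.UTC)
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)

    println("Expiry ${dateTime.format(formatter)}")
}
```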
@@ -94,7 +103,7 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
     @Scheduled(fixedDelay = (1_000))
     fun sendErrorMessageForMetadata() {
         val expired = waitingProcessesForMeta.filter {
-            LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + KafkaEnv.metadataTimeoutMinutes * 60)
+            LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + metadataTimeout)
         }
         expired.forEach {
             log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" }
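The timeout handling this hunk touches is a fixed-delay poll over an in-memory map of waiting processes, now reusing the shared `metadataTimeout` value. A minimal sketch of that pattern under the same assumptions (class, field, and method names here are hypothetical, not from this commit; the real task produces a timeout event instead of printing):

```kotlin
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.concurrent.ConcurrentHashMap

@Service
@EnableScheduling
class MetadataTimeoutSweeper {
    // Processes still waiting for metadata, keyed by reference id.
    val waiting = ConcurrentHashMap<String, LocalDateTime>()

    val timeoutSeconds = 30 * 60L // example value

    @Scheduled(fixedDelay = 1_000)
    fun sweep() {
        val now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)
        val expired = waiting.filter { now > it.value.toEpochSecond(ZoneOffset.UTC) + timeoutSeconds }
        expired.keys.forEach { referenceId ->
            waiting.remove(referenceId)
            // The real task would produce a timeout event here.
            println("Producing timeout for $referenceId")
        }
    }
}
```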
@@ -38,7 +38,7 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) :
     }

     override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
-        log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
+        log.info { "${event.referenceId} triggered by ${event.event}" }
        val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) ?: return null
        return parseStreams(desiredEvent.data as ReaderPerformed)
    }
@@ -41,7 +41,7 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T


     override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
-        log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
+        log.info { "${event.referenceId} triggered by ${event.event}" }
        val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null
        return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted) }
    }
@@ -41,7 +41,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
     }

     override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
-        log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
+        log.info { "${event.referenceId} triggered by ${event.event}" }
        val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
        if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) {
            log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" }
@@ -45,9 +45,9 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinato
     }

     override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
-        log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
+        log.info { "${event.referenceId} triggered by ${event.event}" }
         if (!requiredEvents.contains(event.event)) {
-            log.info { "${this.javaClass.simpleName} ignores ${event.event} @ ${event.eventId}" }
+            log.info { "Ignored ${event.event} @ ${event.eventId}" }
             return null
         }
        val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted