Minor update
This commit is contained in:
parent
607142cc75
commit
528d07f645
@ -98,6 +98,14 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
|
|||||||
|
|
||||||
fun readAllUncompletedMessagesInQueue() {
|
fun readAllUncompletedMessagesInQueue() {
|
||||||
val messages = eventManager.getEventsUncompleted()
|
val messages = eventManager.getEventsUncompleted()
|
||||||
|
if (messages.isNotEmpty()) {
|
||||||
|
log.info { "Found ${messages.size} uncompleted items" }
|
||||||
|
}
|
||||||
|
messages.onEach {
|
||||||
|
it.firstOrNull()?.let {
|
||||||
|
log.info { "Found uncompleted: ${it.referenceId}" }
|
||||||
|
}
|
||||||
|
}
|
||||||
io.launch {
|
io.launch {
|
||||||
messages.forEach {
|
messages.forEach {
|
||||||
delay(1000)
|
delay(1000)
|
||||||
|
|||||||
@ -45,6 +45,8 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
|
|||||||
|
|
||||||
|
|
||||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
|
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||||
|
|
||||||
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
|
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
|
||||||
val completed = events.lastOrSuccessOf(EventMediaProcessCompleted) ?: return null
|
val completed = events.lastOrSuccessOf(EventMediaProcessCompleted) ?: return null
|
||||||
if (!started.data.isSuccess() || !completed.data.isSuccess()) {
|
if (!started.data.isSuccess() || !completed.data.isSuccess()) {
|
||||||
|
|||||||
@ -31,6 +31,8 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
|
|||||||
|
|
||||||
|
|
||||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
|
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||||
|
|
||||||
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
|
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
|
||||||
if (!started.data.isSuccess()) {
|
if (!started.data.isSuccess()) {
|
||||||
return null
|
return null
|
||||||
|
|||||||
@ -29,6 +29,8 @@ class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : Ta
|
|||||||
|
|
||||||
|
|
||||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
|
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||||
|
|
||||||
val started = events.lastOrSuccessOf(EVENT_REQUEST_PROCESS_STARTED) ?: return null
|
val started = events.lastOrSuccessOf(EVENT_REQUEST_PROCESS_STARTED) ?: return null
|
||||||
if (!started.data.isSuccess()) {
|
if (!started.data.isSuccess()) {
|
||||||
return null
|
return null
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||||
@ -15,6 +16,7 @@ import java.io.File
|
|||||||
|
|
||||||
@Service
|
@Service
|
||||||
class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||||
|
val log = KotlinLogging.logger {}
|
||||||
override val producesEvent: KafkaEvents
|
override val producesEvent: KafkaEvents
|
||||||
get() = KafkaEvents.EventWorkConvertCreated
|
get() = KafkaEvents.EventWorkConvertCreated
|
||||||
|
|
||||||
@ -25,6 +27,8 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
|
|||||||
)
|
)
|
||||||
|
|
||||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
|
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||||
|
|
||||||
if (!event.data.isSuccess()) {
|
if (!event.data.isSuccess()) {
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,17 +1,26 @@
|
|||||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||||
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask
|
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||||
|
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||||
import org.springframework.beans.factory.annotation.Autowired
|
import org.springframework.beans.factory.annotation.Autowired
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
|
class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
|
||||||
|
val log = KotlinLogging.logger {}
|
||||||
override val producesEvent: KafkaEvents
|
override val producesEvent: KafkaEvents
|
||||||
get() = KafkaEvents.EventWorkEncodeCreated
|
get() = KafkaEvents.EventWorkEncodeCreated
|
||||||
|
|
||||||
override val requiredEvents: List<KafkaEvents>
|
override val requiredEvents: List<KafkaEvents>
|
||||||
get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated)
|
get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated)
|
||||||
|
|
||||||
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
|
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||||
|
return super.onProcessEvents(event, events)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1,16 +1,26 @@
|
|||||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||||
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask
|
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||||
|
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||||
import org.springframework.beans.factory.annotation.Autowired
|
import org.springframework.beans.factory.annotation.Autowired
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
|
class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
|
||||||
|
val log = KotlinLogging.logger {}
|
||||||
override val producesEvent: KafkaEvents
|
override val producesEvent: KafkaEvents
|
||||||
get() = KafkaEvents.EventWorkExtractCreated
|
get() = KafkaEvents.EventWorkExtractCreated
|
||||||
|
|
||||||
override val requiredEvents: List<KafkaEvents>
|
override val requiredEvents: List<KafkaEvents>
|
||||||
get() = listOf(KafkaEvents.EventMediaParameterExtractCreated)
|
get() = listOf(KafkaEvents.EventMediaParameterExtractCreated)
|
||||||
|
|
||||||
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
|
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||||
|
|
||||||
|
return super.onProcessEvents(event, events)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@ -42,6 +42,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
|
|||||||
|
|
||||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||||
|
|
||||||
val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
|
val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
|
||||||
if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) {
|
if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) {
|
||||||
log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" }
|
log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" }
|
||||||
|
|||||||
@ -46,6 +46,7 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
|
|||||||
|
|
||||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||||
|
|
||||||
if (!requiredEvents.contains(event.event)) {
|
if (!requiredEvents.contains(event.event)) {
|
||||||
log.info { "Ignored ${event.event} @ ${event.eventId}" }
|
log.info { "Ignored ${event.event} @ ${event.eventId}" }
|
||||||
return null
|
return null
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user