Worker listener should not consume if it can't start

This commit is contained in:
bskjon 2025-02-23 19:46:53 +01:00
parent cec4e2aca2
commit f9600c0745
4 changed files with 8 additions and 19 deletions

View File

@ -11,6 +11,10 @@ import no.iktdev.mediaprocessing.shared.common.contract.data.az
abstract class WorkTaskListener: CoordinatorEventListener() { abstract class WorkTaskListener: CoordinatorEventListener() {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
return canStart(incomingEvent, events)
}
fun canStart(incomingEvent: Event, events: List<Event>): Boolean { fun canStart(incomingEvent: Event, events: List<Event>): Boolean {
val autoStart = events.find { it.eventType == Events.ProcessStarted }?.az<MediaProcessStartEvent>()?.data val autoStart = events.find { it.eventType == Events.ProcessStarted }?.az<MediaProcessStartEvent>()?.data
if (autoStart == null) { if (autoStart == null) {

View File

@ -35,6 +35,10 @@ class ConvertWorkTaskListener: WorkTaskListener() {
return true return true
} }
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean { override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
if (!super.shouldIProcessAndHandleEvent(incomingEvent, events)) {
return false
}
if (!isOfEventsIListenFor(incomingEvent)) if (!isOfEventsIListenFor(incomingEvent))
return false return false
if (!incomingEvent.isSuccessful() && !shouldIHandleFailedEvents(incomingEvent)) { if (!incomingEvent.isSuccessful() && !shouldIHandleFailedEvents(incomingEvent)) {
@ -54,10 +58,6 @@ class ConvertWorkTaskListener: WorkTaskListener() {
return return
} }
active = true active = true
if (!canStart(event, events)) {
active = false
return
}
var language: String? = null var language: String? = null
var storeAsFile: String? = null var storeAsFile: String? = null

View File

@ -42,10 +42,6 @@ class EncodeWorkTaskListener : WorkTaskListener() {
return return
} }
active = true active = true
if (!canStart(event, events)) {
active = false
return
}
val encodeArguments = if (event.eventType == Events.ParameterEncodeCreated) { val encodeArguments = if (event.eventType == Events.ParameterEncodeCreated) {
event.az<EncodeArgumentCreatedEvent>()?.data event.az<EncodeArgumentCreatedEvent>()?.data

View File

@ -36,11 +36,6 @@ class ExtractWorkTaskListener: WorkTaskListener() {
return true return true
} }
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
val state = super.shouldIProcessAndHandleEvent(incomingEvent, events)
return state
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) { override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume() val event = incomingEvent.consume()
if (event == null) { if (event == null) {
@ -50,12 +45,6 @@ class ExtractWorkTaskListener: WorkTaskListener() {
} }
active = true active = true
if (!canStart(event, events)) {
active = false
return
}
val arguments = if (event.eventType == Events.ParameterExtractCreated) { val arguments = if (event.eventType == Events.ParameterExtractCreated) {
event.az<ExtractArgumentCreatedEvent>()?.data event.az<ExtractArgumentCreatedEvent>()?.data
} else { } else {