diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index 1a9da83d..6aedb5ba 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -53,65 +53,70 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) event: PersistentProcessDataMessage, events: List ): MessageDataWrapper? { - if (event.data !is ConvertWorkerRequest) - return null - log.info { Gson().toJson(event) } + try { + if (event.data !is ConvertWorkerRequest) + return null + log.info { Gson().toJson(event) } - val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) - if (isAlreadyClaimed) { - log.warn { "Process is already claimed!" } - return null - } - - - val payload = event.data as ConvertWorkerRequest - val requiresEventId: String? = payload.requiresEventId - - val awaitingFor: PersistentProcessDataMessage? = if (requiresEventId != null) { - val existing = getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiresEventId) - if (existing == null) { - skipConvertEvent(event, requiresEventId) + val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + if (isAlreadyClaimed) { + log.warn { "Process is already claimed!" } return null } - existing - } else null - - val converter = if (requiresEventId.isNullOrBlank() || canConvert(awaitingFor)) { - Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload) - } else null - val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) - if (!setClaim) { - return null - } + val payload = event.data as ConvertWorkerRequest + val requiresEventId: String? = payload.requiresEventId - if (converter == null || !converter.canRead()) { - // Make claim regardless but push to schedule - return null - } + val awaitingFor: PersistentProcessDataMessage? = if (requiresEventId != null) { + val existing = getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiresEventId) + if (existing == null) { + skipConvertEvent(event, requiresEventId) + return null + } + existing + } else null - val result = try { - performConvert(converter) - } catch (e: Exception) { - SimpleMessageData(status = Status.ERROR, message = e.message) - } + val converter = if (requiresEventId.isNullOrBlank() || canConvert(awaitingFor)) { + Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload) + } else null - val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) - runBlocking { - delay(1000) - if (!consumedIsSuccessful) { - PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + + val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) + if (!setClaim) { + return null } - delay(1000) - var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) - while (!readbackIsSuccess) { + if (converter == null || !converter.canRead()) { + // Make claim regardless but push to schedule + return null + } + + val result = try { + performConvert(converter) + } catch (e: Exception) { + SimpleMessageData(status = Status.ERROR, message = e.message) + } + + val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + runBlocking { delay(1000) - readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + if (!consumedIsSuccessful) { + PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + } + delay(1000) + var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + + while (!readbackIsSuccess) { + delay(1000) + readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + } } + return result + } catch (e: Exception) { + e.printStackTrace() } - return result + return null } fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) {