This commit is contained in:
Brage 2024-03-26 01:16:06 +01:00
parent 4b4aac9496
commit 6fd490689e

View File

@ -53,65 +53,70 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
event: PersistentProcessDataMessage, event: PersistentProcessDataMessage,
events: List<PersistentProcessDataMessage> events: List<PersistentProcessDataMessage>
): MessageDataWrapper? { ): MessageDataWrapper? {
if (event.data !is ConvertWorkerRequest) try {
return null if (event.data !is ConvertWorkerRequest)
log.info { Gson().toJson(event) } return null
log.info { Gson().toJson(event) }
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) { if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" } log.warn { "Process is already claimed!" }
return null
}
val payload = event.data as ConvertWorkerRequest
val requiresEventId: String? = payload.requiresEventId
val awaitingFor: PersistentProcessDataMessage? = if (requiresEventId != null) {
val existing = getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiresEventId)
if (existing == null) {
skipConvertEvent(event, requiresEventId)
return null return null
} }
existing
} else null
val converter = if (requiresEventId.isNullOrBlank() || canConvert(awaitingFor)) {
Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload)
} else null
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) val payload = event.data as ConvertWorkerRequest
if (!setClaim) { val requiresEventId: String? = payload.requiresEventId
return null
}
if (converter == null || !converter.canRead()) { val awaitingFor: PersistentProcessDataMessage? = if (requiresEventId != null) {
// Make claim regardless but push to schedule val existing = getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiresEventId)
return null if (existing == null) {
} skipConvertEvent(event, requiresEventId)
return null
}
existing
} else null
val result = try { val converter = if (requiresEventId.isNullOrBlank() || canConvert(awaitingFor)) {
performConvert(converter) Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload)
} catch (e: Exception) { } else null
SimpleMessageData(status = Status.ERROR, message = e.message)
}
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
runBlocking { val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
delay(1000) if (!setClaim) {
if (!consumedIsSuccessful) { return null
PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
} }
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
while (!readbackIsSuccess) { if (converter == null || !converter.canRead()) {
// Make claim regardless but push to schedule
return null
}
val result = try {
performConvert(converter)
} catch (e: Exception) {
SimpleMessageData(status = Status.ERROR, message = e.message)
}
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
runBlocking {
delay(1000) delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) if (!consumedIsSuccessful) {
PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
}
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
}
} }
return result
} catch (e: Exception) {
e.printStackTrace()
} }
return result return null
} }
fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) { fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) {