diff --git a/apps/converter/build.gradle.kts b/apps/converter/build.gradle.kts index 5fe4f5f4..eb193fd7 100644 --- a/apps/converter/build.gradle.kts +++ b/apps/converter/build.gradle.kts @@ -26,7 +26,7 @@ dependencies { /*Spring boot*/ implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter:2.7.0") - // implementation("org.springframework.kafka:spring-kafka:3.0.1") + // implementation("org.springframework.kafka:spring-kafka:3.0.1") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") implementation("org.springframework.kafka:spring-kafka:2.8.5") @@ -40,7 +40,6 @@ dependencies { implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("com.google.code.gson:gson:2.8.9") implementation("org.json:json:20210307") - //implementation(project(mapOf("path" to ":shared"))) implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("no.iktdev.library:subtitle:1.7.8-SNAPSHOT") diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index 6aedb5ba..a2649099 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -1,6 +1,5 @@ package no.iktdev.mediaprocessing.converter.tasks -import com.google.gson.Gson import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import mu.KotlinLogging @@ -53,82 +52,65 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) event: PersistentProcessDataMessage, events: List ): MessageDataWrapper? { - try { - if (event.data !is ConvertWorkerRequest) - return null - log.info { Gson().toJson(event) } - - val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) - if (isAlreadyClaimed) { - log.warn { "Process is already claimed!" } - return null - } - - - val payload = event.data as ConvertWorkerRequest - val requiresEventId: String? = payload.requiresEventId - - val awaitingFor: PersistentProcessDataMessage? = if (requiresEventId != null) { - val existing = getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiresEventId) - if (existing == null) { - skipConvertEvent(event, requiresEventId) - return null - } - existing - } else null - - val converter = if (requiresEventId.isNullOrBlank() || canConvert(awaitingFor)) { - Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload) - } else null - - - val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) - if (!setClaim) { - return null - } - - if (converter == null || !converter.canRead()) { - // Make claim regardless but push to schedule - return null - } - - val result = try { - performConvert(converter) - } catch (e: Exception) { - SimpleMessageData(status = Status.ERROR, message = e.message) - } - - val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) - runBlocking { - delay(1000) - if (!consumedIsSuccessful) { - PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) - } - delay(1000) - var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) - - while (!readbackIsSuccess) { - delay(1000) - readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) - } - } - return result - } catch (e: Exception) { - e.printStackTrace() + val convertEvent = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED && it.data is ConvertWorkerRequest } + if (convertEvent == null) { + // No convert here.. + return null } - return null + val convertRequest = convertEvent.data as ConvertWorkerRequest? ?: return null + val requiredEventId = convertRequest.requiresEventId + if (requiredEventId != null) { + // Requires the eventId to be defined as consumed + val requiredEventToBeCompleted = + getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiredEventId) + ?: return SimpleMessageData(Status.SKIPPED, "Required event: $requiredEventId is not found. Skipping convert work for referenceId: ${event.referenceId}") + if (!canConvert(requiredEventToBeCompleted)) { + // Waiting for required event to be completed + return null + } + } + + val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + if (isAlreadyClaimed) { + log.warn { "Process is already claimed!" } + return null + } + + val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) + if (!setClaim) { + return null + } + + val payload = event.data as ConvertWorkerRequest + val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload) + if (!converter.canRead()) { + // Make claim regardless but push to schedule + return SimpleMessageData(Status.ERROR, "Can't read the file..") + } + + val result = try { + performConvert(converter) + } catch (e: Exception) { + SimpleMessageData(status = Status.ERROR, message = e.message) + } + + val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + runBlocking { + delay(1000) + if (!consumedIsSuccessful) { + PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + } + delay(1000) + var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + + while (!readbackIsSuccess) { + delay(1000) + readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + } + } + return result } - fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) { - if (event.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED) - return - val producesPayload = SimpleMessageData(status = Status.COMPLETED, message = "Convert event contains a payload stating that it waits for eventId: $requiresEventId with referenceId: ${event.referenceId}") - coordinator.producer.sendMessage( - referenceId = event.referenceId, - event = KafkaEvents.EVENT_WORK_CONVERT_SKIPPED, - data = producesPayload - ) - } fun performConvert(converter: Converter): ConvertWorkPerformed { return try {