This commit is contained in:
Brage 2024-03-26 03:06:42 +01:00
parent 6fd490689e
commit 0c2a6f3c1c
2 changed files with 57 additions and 76 deletions

View File

@ -26,7 +26,7 @@ dependencies {
/*Spring boot*/ /*Spring boot*/
implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0") implementation("org.springframework.boot:spring-boot-starter:2.7.0")
// implementation("org.springframework.kafka:spring-kafka:3.0.1") // implementation("org.springframework.kafka:spring-kafka:3.0.1")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("org.springframework.kafka:spring-kafka:2.8.5") implementation("org.springframework.kafka:spring-kafka:2.8.5")
@ -40,7 +40,6 @@ dependencies {
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("com.google.code.gson:gson:2.8.9") implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20210307") implementation("org.json:json:20210307")
//implementation(project(mapOf("path" to ":shared")))
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("no.iktdev.library:subtitle:1.7.8-SNAPSHOT") implementation("no.iktdev.library:subtitle:1.7.8-SNAPSHOT")

View File

@ -1,6 +1,5 @@
package no.iktdev.mediaprocessing.converter.tasks package no.iktdev.mediaprocessing.converter.tasks
import com.google.gson.Gson
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import mu.KotlinLogging import mu.KotlinLogging
@ -53,82 +52,65 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
event: PersistentProcessDataMessage, event: PersistentProcessDataMessage,
events: List<PersistentProcessDataMessage> events: List<PersistentProcessDataMessage>
): MessageDataWrapper? { ): MessageDataWrapper? {
try { val convertEvent = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED && it.data is ConvertWorkerRequest }
if (event.data !is ConvertWorkerRequest) if (convertEvent == null) {
return null // No convert here..
log.info { Gson().toJson(event) } return null
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
val payload = event.data as ConvertWorkerRequest
val requiresEventId: String? = payload.requiresEventId
val awaitingFor: PersistentProcessDataMessage? = if (requiresEventId != null) {
val existing = getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiresEventId)
if (existing == null) {
skipConvertEvent(event, requiresEventId)
return null
}
existing
} else null
val converter = if (requiresEventId.isNullOrBlank() || canConvert(awaitingFor)) {
Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload)
} else null
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
if (!setClaim) {
return null
}
if (converter == null || !converter.canRead()) {
// Make claim regardless but push to schedule
return null
}
val result = try {
performConvert(converter)
} catch (e: Exception) {
SimpleMessageData(status = Status.ERROR, message = e.message)
}
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
}
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
}
}
return result
} catch (e: Exception) {
e.printStackTrace()
} }
return null val convertRequest = convertEvent.data as ConvertWorkerRequest? ?: return null
val requiredEventId = convertRequest.requiresEventId
if (requiredEventId != null) {
// Requires the eventId to be defined as consumed
val requiredEventToBeCompleted =
getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiredEventId)
?: return SimpleMessageData(Status.SKIPPED, "Required event: $requiredEventId is not found. Skipping convert work for referenceId: ${event.referenceId}")
if (!canConvert(requiredEventToBeCompleted)) {
// Waiting for required event to be completed
return null
}
}
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
if (!setClaim) {
return null
}
val payload = event.data as ConvertWorkerRequest
val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload)
if (!converter.canRead()) {
// Make claim regardless but push to schedule
return SimpleMessageData(Status.ERROR, "Can't read the file..")
}
val result = try {
performConvert(converter)
} catch (e: Exception) {
SimpleMessageData(status = Status.ERROR, message = e.message)
}
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
}
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
}
}
return result
} }
fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) {
if (event.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED)
return
val producesPayload = SimpleMessageData(status = Status.COMPLETED, message = "Convert event contains a payload stating that it waits for eventId: $requiresEventId with referenceId: ${event.referenceId}")
coordinator.producer.sendMessage(
referenceId = event.referenceId,
event = KafkaEvents.EVENT_WORK_CONVERT_SKIPPED,
data = producesPayload
)
}
fun performConvert(converter: Converter): ConvertWorkPerformed { fun performConvert(converter: Converter): ConvertWorkPerformed {
return try { return try {