diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index 5948d465..fb7720dc 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -27,6 +27,10 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) private val log = KotlinLogging.logger {} val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" + init { + log.info { "Starting with id: $serviceId" } + } + override val listensForEvents: List get() = listOf( KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED, diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt index 0bb63ff9..c22fadde 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt @@ -26,13 +26,6 @@ fun main(args: Array) { val context = runApplication(*args) } -fun getComputername(): String { - return listOfNotNull( - System.getenv("hostname"), - System.getenv("computername") - ).first() -} - class SocketImplemented: SocketImplementation() { } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 839809aa..1e0fb449 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -8,7 +8,6 @@ import no.iktdev.mediaprocessing.processer.TaskCreator import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents -import no.iktdev.mediaprocessing.processer.getComputername import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage @@ -16,6 +15,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.processer.ProcesserEnv +import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed import org.springframework.stereotype.Service @@ -33,9 +33,9 @@ class EncodeService: TaskCreator() { val scope = Coroutines.io() private var runner: FfmpegWorker? = null private var runnerJob: Job? = null - val encodeServiceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" + val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" init { - log.info { "Starting encode service with id: $encodeServiceId" } + log.info { "Starting with id: $serviceId" } } override val requiredEvents: List @@ -79,7 +79,7 @@ class EncodeService: TaskCreator() { logDir.mkdirs() } - val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = encodeServiceId) + val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} encode" } runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) @@ -104,13 +104,13 @@ class EncodeService: TaskCreator() { return } log.info { "Encode started for ${runner.referenceId}" } - PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, encodeServiceId) + PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) sendProgress(info, null, false) scope.launch { while (runnerJob?.isActive == true) { delay(java.time.Duration.ofMinutes(5).toMillis()) - PersistentDataStore().updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, encodeServiceId) + PersistentDataStore().updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, serviceId) } } } @@ -122,21 +122,21 @@ class EncodeService: TaskCreator() { return } log.info { "Encode completed for ${runner.referenceId}" } - val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, encodeServiceId) + val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) runBlocking { delay(1000) if (!consumedIsSuccessful) { - PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, encodeServiceId) + PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) } delay(1000) - var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId) + var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) while (!readbackIsSuccess) { delay(1000) - readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId) + readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) } producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, - ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = encodeServiceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile) + ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile) ) clearWorker() } @@ -151,7 +151,7 @@ class EncodeService: TaskCreator() { } log.info { "Encode failed for ${runner.referenceId}" } producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, - ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = encodeServiceId, derivedFromEventId = runner.eventId) + ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId) ) sendProgress(info = info, ended = true) clearWorker() diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index 7404e825..11c484ed 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -8,7 +8,6 @@ import no.iktdev.mediaprocessing.processer.Tasks import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents -import no.iktdev.mediaprocessing.processer.getComputername import no.iktdev.mediaprocessing.shared.common.limitedWhile import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore @@ -17,6 +16,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.processer.ProcesserEnv +import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed import org.springframework.stereotype.Service @@ -37,9 +37,9 @@ class ExtractService: TaskCreator() { private var runner: FfmpegWorker? = null private var runnerJob: Job? = null - val extractServiceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" + val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" init { - log.info { "Starting extract service with id: $extractServiceId" } + log.info { "Starting with id: $serviceId" } } override fun getListener(): Tasks { return Tasks(producesEvent, this) @@ -82,7 +82,7 @@ class ExtractService: TaskCreator() { } - val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = extractServiceId) + val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} extract" } runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents) @@ -108,7 +108,7 @@ class ExtractService: TaskCreator() { return } log.info { "Extract started for ${runner.referenceId}" } - PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, extractServiceId) + PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) sendState(info, false) } @@ -119,12 +119,12 @@ class ExtractService: TaskCreator() { return } log.info { "Extract completed for ${runner.referenceId}" } - var consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, extractServiceId) + var consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) runBlocking { delay(1000) limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) { - consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, extractServiceId) + consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) } log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" } @@ -132,9 +132,9 @@ class ExtractService: TaskCreator() { - var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, extractServiceId) + var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) { - readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, extractServiceId) + readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) log.info { readbackIsSuccess } } log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" } @@ -143,7 +143,7 @@ class ExtractService: TaskCreator() { producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, ProcesserExtractWorkPerformed( status = Status.COMPLETED, - producedBy = extractServiceId, + producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile) ) @@ -160,7 +160,7 @@ class ExtractService: TaskCreator() { } log.info { "Extract failed for ${runner.referenceId}" } producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, - ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = extractServiceId, derivedFromEventId = runner.eventId) + ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId) ) sendState(info, ended= true) clearWorker()