Adjustment

This commit is contained in:
Brage 2024-01-14 15:55:52 +01:00
parent b663bcaf05
commit b4036d10c4
4 changed files with 27 additions and 30 deletions

View File

@ -27,6 +27,10 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init {
log.info { "Starting with id: $serviceId" }
}
override val listensForEvents: List<KafkaEvents> override val listensForEvents: List<KafkaEvents>
get() = listOf( get() = listOf(
KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED,

View File

@ -26,13 +26,6 @@ fun main(args: Array<String>) {
val context = runApplication<ProcesserApplication>(*args) val context = runApplication<ProcesserApplication>(*args)
} }
fun getComputername(): String {
return listOfNotNull(
System.getenv("hostname"),
System.getenv("computername")
).first()
}
class SocketImplemented: SocketImplementation() { class SocketImplemented: SocketImplementation() {
} }

View File

@ -8,7 +8,6 @@ import no.iktdev.mediaprocessing.processer.TaskCreator
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
import no.iktdev.mediaprocessing.processer.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
@ -16,6 +15,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -33,9 +33,9 @@ class EncodeService: TaskCreator() {
val scope = Coroutines.io() val scope = Coroutines.io()
private var runner: FfmpegWorker? = null private var runner: FfmpegWorker? = null
private var runnerJob: Job? = null private var runnerJob: Job? = null
val encodeServiceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init { init {
log.info { "Starting encode service with id: $encodeServiceId" } log.info { "Starting with id: $serviceId" }
} }
override val requiredEvents: List<KafkaEvents> override val requiredEvents: List<KafkaEvents>
@ -79,7 +79,7 @@ class EncodeService: TaskCreator() {
logDir.mkdirs() logDir.mkdirs()
} }
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = encodeServiceId) val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
if (setClaim) { if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" } log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents )
@ -104,13 +104,13 @@ class EncodeService: TaskCreator() {
return return
} }
log.info { "Encode started for ${runner.referenceId}" } log.info { "Encode started for ${runner.referenceId}" }
PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, encodeServiceId) PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendProgress(info, null, false) sendProgress(info, null, false)
scope.launch { scope.launch {
while (runnerJob?.isActive == true) { while (runnerJob?.isActive == true) {
delay(java.time.Duration.ofMinutes(5).toMillis()) delay(java.time.Duration.ofMinutes(5).toMillis())
PersistentDataStore().updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, encodeServiceId) PersistentDataStore().updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
} }
} }
} }
@ -122,21 +122,21 @@ class EncodeService: TaskCreator() {
return return
} }
log.info { "Encode completed for ${runner.referenceId}" } log.info { "Encode completed for ${runner.referenceId}" }
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, encodeServiceId) val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
runBlocking { runBlocking {
delay(1000) delay(1000)
if (!consumedIsSuccessful) { if (!consumedIsSuccessful) {
PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, encodeServiceId) PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
} }
delay(1000) delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId) var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
while (!readbackIsSuccess) { while (!readbackIsSuccess) {
delay(1000) delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId) readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
} }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = encodeServiceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile) ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile)
) )
clearWorker() clearWorker()
} }
@ -151,7 +151,7 @@ class EncodeService: TaskCreator() {
} }
log.info { "Encode failed for ${runner.referenceId}" } log.info { "Encode failed for ${runner.referenceId}" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = encodeServiceId, derivedFromEventId = runner.eventId) ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId)
) )
sendProgress(info = info, ended = true) sendProgress(info = info, ended = true)
clearWorker() clearWorker()

View File

@ -8,7 +8,6 @@ import no.iktdev.mediaprocessing.processer.Tasks
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
import no.iktdev.mediaprocessing.processer.getComputername
import no.iktdev.mediaprocessing.shared.common.limitedWhile import no.iktdev.mediaprocessing.shared.common.limitedWhile
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
@ -17,6 +16,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -37,9 +37,9 @@ class ExtractService: TaskCreator() {
private var runner: FfmpegWorker? = null private var runner: FfmpegWorker? = null
private var runnerJob: Job? = null private var runnerJob: Job? = null
val extractServiceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init { init {
log.info { "Starting extract service with id: $extractServiceId" } log.info { "Starting with id: $serviceId" }
} }
override fun getListener(): Tasks { override fun getListener(): Tasks {
return Tasks(producesEvent, this) return Tasks(producesEvent, this)
@ -82,7 +82,7 @@ class ExtractService: TaskCreator() {
} }
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = extractServiceId) val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
if (setClaim) { if (setClaim) {
log.info { "Claim successful for ${event.referenceId} extract" } log.info { "Claim successful for ${event.referenceId} extract" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents) runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents)
@ -108,7 +108,7 @@ class ExtractService: TaskCreator() {
return return
} }
log.info { "Extract started for ${runner.referenceId}" } log.info { "Extract started for ${runner.referenceId}" }
PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, extractServiceId) PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendState(info, false) sendState(info, false)
} }
@ -119,12 +119,12 @@ class ExtractService: TaskCreator() {
return return
} }
log.info { "Extract completed for ${runner.referenceId}" } log.info { "Extract completed for ${runner.referenceId}" }
var consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, extractServiceId) var consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
runBlocking { runBlocking {
delay(1000) delay(1000)
limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) { limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) {
consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, extractServiceId) consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
} }
log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" } log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" }
@ -132,9 +132,9 @@ class ExtractService: TaskCreator() {
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, extractServiceId) var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) { limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) {
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, extractServiceId) readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
log.info { readbackIsSuccess } log.info { readbackIsSuccess }
} }
log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" } log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" }
@ -143,7 +143,7 @@ class ExtractService: TaskCreator() {
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
ProcesserExtractWorkPerformed( ProcesserExtractWorkPerformed(
status = Status.COMPLETED, status = Status.COMPLETED,
producedBy = extractServiceId, producedBy = serviceId,
derivedFromEventId = runner.eventId, derivedFromEventId = runner.eventId,
outFile = runner.info.outFile) outFile = runner.info.outFile)
) )
@ -160,7 +160,7 @@ class ExtractService: TaskCreator() {
} }
log.info { "Extract failed for ${runner.referenceId}" } log.info { "Extract failed for ${runner.referenceId}" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = extractServiceId, derivedFromEventId = runner.eventId) ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId)
) )
sendState(info, ended= true) sendState(info, ended= true)
clearWorker() clearWorker()