From fac82f06ebed7f4719fef2a5b808b715fe107441 Mon Sep 17 00:00:00 2001 From: bskjon Date: Wed, 19 Mar 2025 22:57:44 +0100 Subject: [PATCH] Wip --- .../processer/services/EncodeService.kt | 7 ++++++ .../ui/socket/a2a/ProcesserListenerService.kt | 22 ++++++++++++++++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 919cf9e4..7ec5fe23 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.processer.services +import com.google.gson.Gson import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -209,6 +210,12 @@ class EncodeService( outputFiles = listOf(runner.outputFile), progress = progress?.toProcessProgress() ) + try { + log.info { "Reporting encode progress ${Gson().toJson(processerEventInfo)}" } + + } catch (e: Exception) { + e.printStackTrace() + } try { reporter.sendEncodeProgress(processerEventInfo) } catch (e: Exception) { diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt index ca93ae3e..d794a520 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt @@ -7,6 +7,7 @@ import no.iktdev.mediaprocessing.ui.UIEnv import no.iktdev.mediaprocessing.ui.WebSocketMonitoringService import no.iktdev.mediaprocessing.ui.log import org.springframework.beans.factory.annotation.Autowired +import org.springframework.messaging.converter.MappingJackson2MessageConverter import org.springframework.messaging.converter.StringMessageConverter import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.stomp.* @@ -41,7 +42,7 @@ class ProcesserListenerService( private final fun connectAndListen() { log.info { "EncoderWsUrl: ${UIEnv.socketEncoder}" } - client.messageConverter = StringMessageConverter() + client.messageConverter = MappingJackson2MessageConverter() client.connect(UIEnv.socketEncoder, object : StompSessionHandlerAdapter() { override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) { super.afterConnected(session, connectedHeaders) @@ -73,11 +74,13 @@ class ProcesserListenerService( } private fun scheduleReconnect() { + log.info { "Scheduling reconnect!" } (reconnectTask ?: kotlinx.coroutines.Runnable { logger.info { "Attempting to reconnect" } connectAndListen() }).let { scheduler.scheduleAtFixedRate(it, 5, 5, TimeUnit.SECONDS) + reconnectTask = it } } @@ -99,9 +102,22 @@ class ProcesserListenerService( } override fun handleFrame(headers: StompHeaders, payload: Any?) { - val response = gson.fromJson(payload.toString(), ProcesserEventInfo::class.java) if (webSocketMonitoringService.anyListening()) { - message?.convertAndSend("/topic/processer/encode/progress", response) + message?.convertAndSend("/topic/processer/encode/progress", payload) + } + val response = when (payload) { + is String -> { + log.info { "Received string of: $payload" } + gson.fromJson(payload.toString(), ProcesserEventInfo::class.java) + } + is ProcesserEventInfo -> { + log.info { "Received object of ${Gson().toJson(payload)}" } + payload + } + else -> { + log.error { "Unsupported payload $payload" } + return + } } } }