From 278763be316b063df3b849418cfc6c506cc54643 Mon Sep 17 00:00:00 2001 From: bskjon Date: Wed, 19 Mar 2025 19:21:43 +0100 Subject: [PATCH] Fixes --- .../ui/socket/a2a/ProcesserListenerService.kt | 34 ++++++++++++++++++- apps/ui/web/src/app/page/ExplorePage.tsx | 2 +- .../mediaprocessing/shared/common/Utils.kt | 8 +++-- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt index a0fb3596..ca93ae3e 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt @@ -7,12 +7,15 @@ import no.iktdev.mediaprocessing.ui.UIEnv import no.iktdev.mediaprocessing.ui.WebSocketMonitoringService import no.iktdev.mediaprocessing.ui.log import org.springframework.beans.factory.annotation.Autowired +import org.springframework.messaging.converter.StringMessageConverter import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.stomp.* import org.springframework.stereotype.Service import org.springframework.web.socket.client.standard.StandardWebSocketClient import org.springframework.web.socket.messaging.WebSocketStompClient import java.lang.reflect.Type +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit @Service class ProcesserListenerService( @@ -22,12 +25,14 @@ class ProcesserListenerService( private val logger = KotlinLogging.logger {} private val listeners: MutableList = mutableListOf() + private val scheduler = Executors.newSingleThreadScheduledExecutor() + private var reconnectTask: Runnable? = null + fun attachListener(listener: A2AProcesserListener) { listeners.add(listener) } val gson = Gson() - val client = WebSocketStompClient(StandardWebSocketClient()) init { @@ -36,13 +41,24 @@ class ProcesserListenerService( private final fun connectAndListen() { log.info { "EncoderWsUrl: ${UIEnv.socketEncoder}" } + client.messageConverter = StringMessageConverter() client.connect(UIEnv.socketEncoder, object : StompSessionHandlerAdapter() { override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) { super.afterConnected(session, connectedHeaders) logger.info { "Tilkoblet processer" } + stopReconnect() subscribeToTopics(session) } + override fun handleTransportError(session: StompSession, exception: Throwable) { + super.handleTransportError(session, exception) + scheduleReconnect() + } + + override fun handleFrame(headers: StompHeaders, payload: Any?) { + super.handleFrame(headers, payload) + } + override fun handleException( session: StompSession, command: StompCommand?, @@ -56,6 +72,22 @@ class ProcesserListenerService( }) } + private fun scheduleReconnect() { + (reconnectTask ?: kotlinx.coroutines.Runnable { + logger.info { "Attempting to reconnect" } + connectAndListen() + }).let { + scheduler.scheduleAtFixedRate(it, 5, 5, TimeUnit.SECONDS) + } + } + + private fun stopReconnect() { + reconnectTask?.let { + scheduler.shutdownNow() + reconnectTask = null + } + } + private fun subscribeToTopics(session: StompSession) { session.subscribe("/topic/encode/progress", encodeProcessFrameHandler) session.subscribe("/topic/extract/progress", extractProcessFrameHandler) diff --git a/apps/ui/web/src/app/page/ExplorePage.tsx b/apps/ui/web/src/app/page/ExplorePage.tsx index 53820ceb..a3d801c9 100644 --- a/apps/ui/web/src/app/page/ExplorePage.tsx +++ b/apps/ui/web/src/app/page/ExplorePage.tsx @@ -114,7 +114,7 @@ function getSegments(absolutePath: string): Array { function getSegmentedNaviagatablePath(rootClick: () => void, navigateTo: (path: string | null) => void, path: string | null): JSX.Element { - const segments = getSegments(path!) + const segments = getSegments("/src/input/completed") const utElements = segments.map((segment: Segment, index: number) => { return ( diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index 1b219195..d3ef1a3c 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -175,6 +175,10 @@ inline fun RestTemplate.tryPost(url: String, data: Any, noinline onE } } -fun SimpMessagingTemplate.trySend(destination: String, data: Any) { - this.convertAndSend(destination, data) +fun SimpMessagingTemplate.trySend(destination: String, data: Any, onError: ((Exception) -> Unit)? = null) { + try { + this.convertAndSend(destination, data) + } catch (e: Exception) { + onError?.invoke(e) + } } \ No newline at end of file