diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Configuration.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Configuration.kt index bbb85507..fd3394b6 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Configuration.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Configuration.kt @@ -10,6 +10,8 @@ import org.springframework.boot.web.server.WebServerFactoryCustomizer import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.io.Resource +import org.springframework.messaging.converter.MappingJackson2MessageConverter +import org.springframework.messaging.converter.StringMessageConverter import org.springframework.stereotype.Component import org.springframework.stereotype.Service import org.springframework.web.bind.annotation.RestController diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIEnv.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIEnv.kt index da4f9e65..66e159b0 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIEnv.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIEnv.kt @@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.ui import java.io.File object UIEnv { - val socketEncoder: String = System.getenv("EncoderWs")?.takeIf { it.isNotBlank() } ?: "ws://encoder:8080" + val socketEncoder: String = System.getenv("EncoderWs")?.takeIf { it.isNotBlank() } ?: "ws://encoder:8080/ws" val coordinatorUrl: String = System.getenv("Coordinator")?.takeIf { it.isNotBlank() } ?: "http://coordinator" val wsAllowedOrigins: String = System.getenv("AllowedOriginsWebsocket")?.takeIf { it.isNotBlank() } ?: "" } \ No newline at end of file diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/SocketClient.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/SocketClient.kt new file mode 100644 index 00000000..6f81b1e5 --- /dev/null +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/SocketClient.kt @@ -0,0 +1,108 @@ +package no.iktdev.mediaprocessing.ui.socket + +import mu.KotlinLogging +import org.springframework.messaging.simp.stomp.StompCommand +import org.springframework.messaging.simp.stomp.StompHeaders +import org.springframework.messaging.simp.stomp.StompSession +import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter +import org.springframework.web.socket.client.standard.StandardWebSocketClient +import org.springframework.web.socket.messaging.WebSocketStompClient +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit + +class SocketClient(private val url: String, val listener: SocketEvents? = null) { + private val logger = KotlinLogging.logger {} + + private val client = WebSocketStompClient(StandardWebSocketClient()) + private val subscriptions: MutableList = mutableListOf() + private var session: StompSession? = null + private val scheduler = Executors.newSingleThreadScheduledExecutor() + private var reconnectFuture: ScheduledFuture<*>? = null + + + + fun subscribe(topic: String, handler: StompSessionHandlerAdapter) { + subscriptions.add(SocketSubscription(topic, handler)) + session?.subscribe(topic, handler) + } + + + private fun reconnect() { + if (reconnectFuture != null && !reconnectFuture!!.isDone) return // Allerede reconnecting + + reconnectFuture?.cancel(true) // Kansellerer tidligere reconnect-task hvis den kjører + + logger.info { "Scheduling reconnect in 30 seconds" } + + reconnectFuture = scheduler.scheduleWithFixedDelay({ + try { + logger.info { "Attempting to reconnect... $url" } + listener?.onReconnecting() + connect() + } catch (e: Exception) { + logger.error(e) { "Reconnect attempt failed" } + } + }, 5, 30, TimeUnit.SECONDS) // Starter etter 5 sekunder, med 30 sekunders intervall + } + + private fun resetReconnector() { + reconnectFuture?.cancel(true) + reconnectFuture = null + } + + fun disconnect() { + if (!scheduler.isShutdown) { + try { + reconnectFuture?.cancel(true) + scheduler.shutdownNow() + } catch (e: Exception) {} + } + session?.disconnect() + listener?.onDisconnected() + } + + private val connectAdapter = object: StompSessionHandlerAdapter() { + override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) { + super.afterConnected(session, connectedHeaders) + resetReconnector() + listener?.onConnected() + subscriptions.forEach { + session.subscribe(it.destination, it.handler) + } + } + + override fun handleTransportError(session: StompSession, exception: Throwable) { + super.handleTransportError(session, exception) + listener?.onException(exception) + this@SocketClient.session = null + reconnect() + } + + override fun handleException( + session: StompSession, + command: StompCommand?, + headers: StompHeaders, + payload: ByteArray, + exception: Throwable + ) { + super.handleException(session, command, headers, payload, exception) + listener?.onException(exception) + } + } + + fun connect() { + client.connect(url, connectAdapter) + } + + interface SocketEvents { + fun onConnected(): Unit {} + fun onReconnecting(): Unit {} + fun onDisconnected(): Unit {} + fun onException(e: Throwable) {} + } + data class SocketSubscription( + val destination: String, + val handler: StompSessionHandlerAdapter + ) +} \ No newline at end of file diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/SocketMessageHandler.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/SocketMessageHandler.kt new file mode 100644 index 00000000..556684da --- /dev/null +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/SocketMessageHandler.kt @@ -0,0 +1,23 @@ +package no.iktdev.mediaprocessing.ui.socket + +import org.springframework.messaging.simp.stomp.StompHeaders +import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter +import java.lang.reflect.Type + +open class SocketMessageHandler: StompSessionHandlerAdapter() { + override fun getPayloadType(headers: StompHeaders): Type { + return ByteArray::class.java + } + + override fun handleFrame(headers: StompHeaders, payload: Any?) { + super.handleFrame(headers, payload) + if (payload is ByteArray) { + onMessage(String(payload)) + } else if (payload is String) { + onMessage(payload) + } + } + + open fun onMessage(socketMessage: String) { + } +} \ No newline at end of file diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt index d794a520..46d6dd78 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt @@ -6,17 +6,11 @@ import no.iktdev.mediaprocessing.shared.common.contract.dto.ProcesserEventInfo import no.iktdev.mediaprocessing.ui.UIEnv import no.iktdev.mediaprocessing.ui.WebSocketMonitoringService import no.iktdev.mediaprocessing.ui.log +import no.iktdev.mediaprocessing.ui.socket.SocketClient +import no.iktdev.mediaprocessing.ui.socket.SocketMessageHandler import org.springframework.beans.factory.annotation.Autowired -import org.springframework.messaging.converter.MappingJackson2MessageConverter -import org.springframework.messaging.converter.StringMessageConverter import org.springframework.messaging.simp.SimpMessagingTemplate -import org.springframework.messaging.simp.stomp.* import org.springframework.stereotype.Service -import org.springframework.web.socket.client.standard.StandardWebSocketClient -import org.springframework.web.socket.messaging.WebSocketStompClient -import java.lang.reflect.Type -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit @Service class ProcesserListenerService( @@ -26,112 +20,52 @@ class ProcesserListenerService( private val logger = KotlinLogging.logger {} private val listeners: MutableList = mutableListOf() - private val scheduler = Executors.newSingleThreadScheduledExecutor() - private var reconnectTask: Runnable? = null + private var socketClient: SocketClient? = null fun attachListener(listener: A2AProcesserListener) { listeners.add(listener) } val gson = Gson() - val client = WebSocketStompClient(StandardWebSocketClient()) + + private final val socketEvent = object : SocketClient.SocketEvents { + override fun onConnected() { + super.onConnected() + log.info { "EncoderWsUrl: ${UIEnv.socketEncoder}" } + logger.info { "Tilkoblet processer" } + + socketClient?.subscribe("/topic/encode/progress", encodeProcessMessage) + socketClient?.subscribe("/topic/extract/progress", extractProcessFrameHandler) + } + } init { - connectAndListen() - } - - private final fun connectAndListen() { - log.info { "EncoderWsUrl: ${UIEnv.socketEncoder}" } - client.messageConverter = MappingJackson2MessageConverter() - client.connect(UIEnv.socketEncoder, object : StompSessionHandlerAdapter() { - override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) { - super.afterConnected(session, connectedHeaders) - logger.info { "Tilkoblet processer" } - stopReconnect() - subscribeToTopics(session) - } - - override fun handleTransportError(session: StompSession, exception: Throwable) { - super.handleTransportError(session, exception) - scheduleReconnect() - } - - override fun handleFrame(headers: StompHeaders, payload: Any?) { - super.handleFrame(headers, payload) - } - - override fun handleException( - session: StompSession, - command: StompCommand?, - headers: StompHeaders, - payload: ByteArray, - exception: Throwable - ) { - super.handleException(session, command, headers, payload, exception) - logger.error { "Feil ved tilkobling: ${exception.message}" } - } - }) - } - - private fun scheduleReconnect() { - log.info { "Scheduling reconnect!" } - (reconnectTask ?: kotlinx.coroutines.Runnable { - logger.info { "Attempting to reconnect" } - connectAndListen() - }).let { - scheduler.scheduleAtFixedRate(it, 5, 5, TimeUnit.SECONDS) - reconnectTask = it + SocketClient(UIEnv.socketEncoder, socketEvent).also { + it.connect() + this.socketClient = it } } - private fun stopReconnect() { - reconnectTask?.let { - scheduler.shutdownNow() - reconnectTask = null - } - } - private fun subscribeToTopics(session: StompSession) { - session.subscribe("/topic/encode/progress", encodeProcessFrameHandler) - session.subscribe("/topic/extract/progress", extractProcessFrameHandler) - } - - private val encodeProcessFrameHandler = object : StompFrameHandler { - override fun getPayloadType(headers: StompHeaders): Type { - return ProcesserEventInfo::class.java - } - - override fun handleFrame(headers: StompHeaders, payload: Any?) { + private val encodeProcessMessage = object : SocketMessageHandler() { + override fun onMessage(socketMessage: String) { + super.onMessage(socketMessage) + message?.convertAndSend("/topic/processer/encode/progress", socketMessage) + val response = gson.fromJson(socketMessage, ProcesserEventInfo::class.java) if (webSocketMonitoringService.anyListening()) { - message?.convertAndSend("/topic/processer/encode/progress", payload) - } - val response = when (payload) { - is String -> { - log.info { "Received string of: $payload" } - gson.fromJson(payload.toString(), ProcesserEventInfo::class.java) - } - is ProcesserEventInfo -> { - log.info { "Received object of ${Gson().toJson(payload)}" } - payload - } - else -> { - log.error { "Unsupported payload $payload" } - return - } } } } - private val extractProcessFrameHandler = object : StompFrameHandler { - override fun getPayloadType(headers: StompHeaders): Type { - return ProcesserEventInfo::class.java - } - override fun handleFrame(headers: StompHeaders, payload: Any?) { - val response = gson.fromJson(payload.toString(), ProcesserEventInfo::class.java) + private val extractProcessFrameHandler = object : SocketMessageHandler() { + override fun onMessage(socketMessage: String) { + super.onMessage(socketMessage) + message?.convertAndSend("/topic/processer/extract/progress", socketMessage) if (webSocketMonitoringService.anyListening()) { - message?.convertAndSend("/topic/processer/extract/progress", response) } + //val stringPayload = (if (payload is ByteArray) String(payload) else payload as String) + //val response = gson.fromJson(stringPayload, ProcesserEventInfo::class.java) } } diff --git a/apps/ui/web/src/app/page/ProcesserTasksPage.tsx b/apps/ui/web/src/app/page/ProcesserTasksPage.tsx index 93b63404..fdd381fb 100644 --- a/apps/ui/web/src/app/page/ProcesserTasksPage.tsx +++ b/apps/ui/web/src/app/page/ProcesserTasksPage.tsx @@ -49,6 +49,10 @@ export default function ProcesserTasksPage() { dispatch(update(response)) }); + useWsSubscription("/topic/processer/encode/progress", (response) => { + console.log(response) + }); + useEffect(() => { client?.publish({ destination: "/app/tasks/all" diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt index df07263b..0197e857 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt @@ -1,6 +1,9 @@ package no.iktdev.mediaprocessing.shared.common.socket +import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.messaging.converter.MappingJackson2MessageConverter +import org.springframework.messaging.converter.StringMessageConverter import org.springframework.messaging.simp.config.MessageBrokerRegistry import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker import org.springframework.web.socket.config.annotation.StompEndpointRegistry @@ -22,4 +25,5 @@ open class SocketImplementation: WebSocketMessageBrokerConfigurer { registry.enableSimpleBroker("/topic") registry.setApplicationDestinationPrefixes("/app") } + } \ No newline at end of file