Fixed reconnect issue
This commit is contained in:
parent
c464b110da
commit
e663c743ab
@ -10,6 +10,8 @@ import org.springframework.boot.web.server.WebServerFactoryCustomizer
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.core.io.Resource
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter
|
||||
import org.springframework.messaging.converter.StringMessageConverter
|
||||
import org.springframework.stereotype.Component
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.web.bind.annotation.RestController
|
||||
|
||||
@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.ui
|
||||
import java.io.File
|
||||
|
||||
object UIEnv {
|
||||
val socketEncoder: String = System.getenv("EncoderWs")?.takeIf { it.isNotBlank() } ?: "ws://encoder:8080"
|
||||
val socketEncoder: String = System.getenv("EncoderWs")?.takeIf { it.isNotBlank() } ?: "ws://encoder:8080/ws"
|
||||
val coordinatorUrl: String = System.getenv("Coordinator")?.takeIf { it.isNotBlank() } ?: "http://coordinator"
|
||||
val wsAllowedOrigins: String = System.getenv("AllowedOriginsWebsocket")?.takeIf { it.isNotBlank() } ?: ""
|
||||
}
|
||||
@ -0,0 +1,108 @@
|
||||
package no.iktdev.mediaprocessing.ui.socket
|
||||
|
||||
import mu.KotlinLogging
|
||||
import org.springframework.messaging.simp.stomp.StompCommand
|
||||
import org.springframework.messaging.simp.stomp.StompHeaders
|
||||
import org.springframework.messaging.simp.stomp.StompSession
|
||||
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter
|
||||
import org.springframework.web.socket.client.standard.StandardWebSocketClient
|
||||
import org.springframework.web.socket.messaging.WebSocketStompClient
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class SocketClient(private val url: String, val listener: SocketEvents? = null) {
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
private val client = WebSocketStompClient(StandardWebSocketClient())
|
||||
private val subscriptions: MutableList<SocketSubscription> = mutableListOf()
|
||||
private var session: StompSession? = null
|
||||
private val scheduler = Executors.newSingleThreadScheduledExecutor()
|
||||
private var reconnectFuture: ScheduledFuture<*>? = null
|
||||
|
||||
|
||||
|
||||
fun subscribe(topic: String, handler: StompSessionHandlerAdapter) {
|
||||
subscriptions.add(SocketSubscription(topic, handler))
|
||||
session?.subscribe(topic, handler)
|
||||
}
|
||||
|
||||
|
||||
private fun reconnect() {
|
||||
if (reconnectFuture != null && !reconnectFuture!!.isDone) return // Allerede reconnecting
|
||||
|
||||
reconnectFuture?.cancel(true) // Kansellerer tidligere reconnect-task hvis den kjører
|
||||
|
||||
logger.info { "Scheduling reconnect in 30 seconds" }
|
||||
|
||||
reconnectFuture = scheduler.scheduleWithFixedDelay({
|
||||
try {
|
||||
logger.info { "Attempting to reconnect... $url" }
|
||||
listener?.onReconnecting()
|
||||
connect()
|
||||
} catch (e: Exception) {
|
||||
logger.error(e) { "Reconnect attempt failed" }
|
||||
}
|
||||
}, 5, 30, TimeUnit.SECONDS) // Starter etter 5 sekunder, med 30 sekunders intervall
|
||||
}
|
||||
|
||||
private fun resetReconnector() {
|
||||
reconnectFuture?.cancel(true)
|
||||
reconnectFuture = null
|
||||
}
|
||||
|
||||
fun disconnect() {
|
||||
if (!scheduler.isShutdown) {
|
||||
try {
|
||||
reconnectFuture?.cancel(true)
|
||||
scheduler.shutdownNow()
|
||||
} catch (e: Exception) {}
|
||||
}
|
||||
session?.disconnect()
|
||||
listener?.onDisconnected()
|
||||
}
|
||||
|
||||
private val connectAdapter = object: StompSessionHandlerAdapter() {
|
||||
override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) {
|
||||
super.afterConnected(session, connectedHeaders)
|
||||
resetReconnector()
|
||||
listener?.onConnected()
|
||||
subscriptions.forEach {
|
||||
session.subscribe(it.destination, it.handler)
|
||||
}
|
||||
}
|
||||
|
||||
override fun handleTransportError(session: StompSession, exception: Throwable) {
|
||||
super.handleTransportError(session, exception)
|
||||
listener?.onException(exception)
|
||||
this@SocketClient.session = null
|
||||
reconnect()
|
||||
}
|
||||
|
||||
override fun handleException(
|
||||
session: StompSession,
|
||||
command: StompCommand?,
|
||||
headers: StompHeaders,
|
||||
payload: ByteArray,
|
||||
exception: Throwable
|
||||
) {
|
||||
super.handleException(session, command, headers, payload, exception)
|
||||
listener?.onException(exception)
|
||||
}
|
||||
}
|
||||
|
||||
fun connect() {
|
||||
client.connect(url, connectAdapter)
|
||||
}
|
||||
|
||||
interface SocketEvents {
|
||||
fun onConnected(): Unit {}
|
||||
fun onReconnecting(): Unit {}
|
||||
fun onDisconnected(): Unit {}
|
||||
fun onException(e: Throwable) {}
|
||||
}
|
||||
data class SocketSubscription(
|
||||
val destination: String,
|
||||
val handler: StompSessionHandlerAdapter
|
||||
)
|
||||
}
|
||||
@ -0,0 +1,23 @@
|
||||
package no.iktdev.mediaprocessing.ui.socket
|
||||
|
||||
import org.springframework.messaging.simp.stomp.StompHeaders
|
||||
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter
|
||||
import java.lang.reflect.Type
|
||||
|
||||
open class SocketMessageHandler: StompSessionHandlerAdapter() {
|
||||
override fun getPayloadType(headers: StompHeaders): Type {
|
||||
return ByteArray::class.java
|
||||
}
|
||||
|
||||
override fun handleFrame(headers: StompHeaders, payload: Any?) {
|
||||
super.handleFrame(headers, payload)
|
||||
if (payload is ByteArray) {
|
||||
onMessage(String(payload))
|
||||
} else if (payload is String) {
|
||||
onMessage(payload)
|
||||
}
|
||||
}
|
||||
|
||||
open fun onMessage(socketMessage: String) {
|
||||
}
|
||||
}
|
||||
@ -6,17 +6,11 @@ import no.iktdev.mediaprocessing.shared.common.contract.dto.ProcesserEventInfo
|
||||
import no.iktdev.mediaprocessing.ui.UIEnv
|
||||
import no.iktdev.mediaprocessing.ui.WebSocketMonitoringService
|
||||
import no.iktdev.mediaprocessing.ui.log
|
||||
import no.iktdev.mediaprocessing.ui.socket.SocketClient
|
||||
import no.iktdev.mediaprocessing.ui.socket.SocketMessageHandler
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter
|
||||
import org.springframework.messaging.converter.StringMessageConverter
|
||||
import org.springframework.messaging.simp.SimpMessagingTemplate
|
||||
import org.springframework.messaging.simp.stomp.*
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.web.socket.client.standard.StandardWebSocketClient
|
||||
import org.springframework.web.socket.messaging.WebSocketStompClient
|
||||
import java.lang.reflect.Type
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
@Service
|
||||
class ProcesserListenerService(
|
||||
@ -26,112 +20,52 @@ class ProcesserListenerService(
|
||||
private val logger = KotlinLogging.logger {}
|
||||
private val listeners: MutableList<A2AProcesserListener> = mutableListOf()
|
||||
|
||||
private val scheduler = Executors.newSingleThreadScheduledExecutor()
|
||||
private var reconnectTask: Runnable? = null
|
||||
private var socketClient: SocketClient? = null
|
||||
|
||||
fun attachListener(listener: A2AProcesserListener) {
|
||||
listeners.add(listener)
|
||||
}
|
||||
|
||||
val gson = Gson()
|
||||
val client = WebSocketStompClient(StandardWebSocketClient())
|
||||
|
||||
private final val socketEvent = object : SocketClient.SocketEvents {
|
||||
override fun onConnected() {
|
||||
super.onConnected()
|
||||
log.info { "EncoderWsUrl: ${UIEnv.socketEncoder}" }
|
||||
logger.info { "Tilkoblet processer" }
|
||||
|
||||
socketClient?.subscribe("/topic/encode/progress", encodeProcessMessage)
|
||||
socketClient?.subscribe("/topic/extract/progress", extractProcessFrameHandler)
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
connectAndListen()
|
||||
}
|
||||
|
||||
private final fun connectAndListen() {
|
||||
log.info { "EncoderWsUrl: ${UIEnv.socketEncoder}" }
|
||||
client.messageConverter = MappingJackson2MessageConverter()
|
||||
client.connect(UIEnv.socketEncoder, object : StompSessionHandlerAdapter() {
|
||||
override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) {
|
||||
super.afterConnected(session, connectedHeaders)
|
||||
logger.info { "Tilkoblet processer" }
|
||||
stopReconnect()
|
||||
subscribeToTopics(session)
|
||||
}
|
||||
|
||||
override fun handleTransportError(session: StompSession, exception: Throwable) {
|
||||
super.handleTransportError(session, exception)
|
||||
scheduleReconnect()
|
||||
}
|
||||
|
||||
override fun handleFrame(headers: StompHeaders, payload: Any?) {
|
||||
super.handleFrame(headers, payload)
|
||||
}
|
||||
|
||||
override fun handleException(
|
||||
session: StompSession,
|
||||
command: StompCommand?,
|
||||
headers: StompHeaders,
|
||||
payload: ByteArray,
|
||||
exception: Throwable
|
||||
) {
|
||||
super.handleException(session, command, headers, payload, exception)
|
||||
logger.error { "Feil ved tilkobling: ${exception.message}" }
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private fun scheduleReconnect() {
|
||||
log.info { "Scheduling reconnect!" }
|
||||
(reconnectTask ?: kotlinx.coroutines.Runnable {
|
||||
logger.info { "Attempting to reconnect" }
|
||||
connectAndListen()
|
||||
}).let {
|
||||
scheduler.scheduleAtFixedRate(it, 5, 5, TimeUnit.SECONDS)
|
||||
reconnectTask = it
|
||||
SocketClient(UIEnv.socketEncoder, socketEvent).also {
|
||||
it.connect()
|
||||
this.socketClient = it
|
||||
}
|
||||
}
|
||||
|
||||
private fun stopReconnect() {
|
||||
reconnectTask?.let {
|
||||
scheduler.shutdownNow()
|
||||
reconnectTask = null
|
||||
}
|
||||
}
|
||||
|
||||
private fun subscribeToTopics(session: StompSession) {
|
||||
session.subscribe("/topic/encode/progress", encodeProcessFrameHandler)
|
||||
session.subscribe("/topic/extract/progress", extractProcessFrameHandler)
|
||||
}
|
||||
|
||||
private val encodeProcessFrameHandler = object : StompFrameHandler {
|
||||
override fun getPayloadType(headers: StompHeaders): Type {
|
||||
return ProcesserEventInfo::class.java
|
||||
}
|
||||
|
||||
override fun handleFrame(headers: StompHeaders, payload: Any?) {
|
||||
private val encodeProcessMessage = object : SocketMessageHandler() {
|
||||
override fun onMessage(socketMessage: String) {
|
||||
super.onMessage(socketMessage)
|
||||
message?.convertAndSend("/topic/processer/encode/progress", socketMessage)
|
||||
val response = gson.fromJson(socketMessage, ProcesserEventInfo::class.java)
|
||||
if (webSocketMonitoringService.anyListening()) {
|
||||
message?.convertAndSend("/topic/processer/encode/progress", payload)
|
||||
}
|
||||
val response = when (payload) {
|
||||
is String -> {
|
||||
log.info { "Received string of: $payload" }
|
||||
gson.fromJson(payload.toString(), ProcesserEventInfo::class.java)
|
||||
}
|
||||
is ProcesserEventInfo -> {
|
||||
log.info { "Received object of ${Gson().toJson(payload)}" }
|
||||
payload
|
||||
}
|
||||
else -> {
|
||||
log.error { "Unsupported payload $payload" }
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val extractProcessFrameHandler = object : StompFrameHandler {
|
||||
override fun getPayloadType(headers: StompHeaders): Type {
|
||||
return ProcesserEventInfo::class.java
|
||||
}
|
||||
|
||||
override fun handleFrame(headers: StompHeaders, payload: Any?) {
|
||||
val response = gson.fromJson(payload.toString(), ProcesserEventInfo::class.java)
|
||||
private val extractProcessFrameHandler = object : SocketMessageHandler() {
|
||||
override fun onMessage(socketMessage: String) {
|
||||
super.onMessage(socketMessage)
|
||||
message?.convertAndSend("/topic/processer/extract/progress", socketMessage)
|
||||
if (webSocketMonitoringService.anyListening()) {
|
||||
message?.convertAndSend("/topic/processer/extract/progress", response)
|
||||
}
|
||||
//val stringPayload = (if (payload is ByteArray) String(payload) else payload as String)
|
||||
//val response = gson.fromJson(stringPayload, ProcesserEventInfo::class.java)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -49,6 +49,10 @@ export default function ProcesserTasksPage() {
|
||||
dispatch(update(response))
|
||||
});
|
||||
|
||||
useWsSubscription<any>("/topic/processer/encode/progress", (response) => {
|
||||
console.log(response)
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
client?.publish({
|
||||
destination: "/app/tasks/all"
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.socket
|
||||
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter
|
||||
import org.springframework.messaging.converter.StringMessageConverter
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
|
||||
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
|
||||
@ -22,4 +25,5 @@ open class SocketImplementation: WebSocketMessageBrokerConfigurer {
|
||||
registry.enableSimpleBroker("/topic")
|
||||
registry.setApplicationDestinationPrefixes("/app")
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user