This commit is contained in:
bskjon 2025-03-19 19:21:43 +01:00
parent d8f0f5db85
commit 278763be31
3 changed files with 40 additions and 4 deletions

View File

@ -7,12 +7,15 @@ import no.iktdev.mediaprocessing.ui.UIEnv
import no.iktdev.mediaprocessing.ui.WebSocketMonitoringService import no.iktdev.mediaprocessing.ui.WebSocketMonitoringService
import no.iktdev.mediaprocessing.ui.log import no.iktdev.mediaprocessing.ui.log
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.converter.StringMessageConverter
import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.messaging.simp.stomp.* import org.springframework.messaging.simp.stomp.*
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import org.springframework.web.socket.client.standard.StandardWebSocketClient import org.springframework.web.socket.client.standard.StandardWebSocketClient
import org.springframework.web.socket.messaging.WebSocketStompClient import org.springframework.web.socket.messaging.WebSocketStompClient
import java.lang.reflect.Type import java.lang.reflect.Type
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
@Service @Service
class ProcesserListenerService( class ProcesserListenerService(
@ -22,12 +25,14 @@ class ProcesserListenerService(
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private val listeners: MutableList<A2AProcesserListener> = mutableListOf() private val listeners: MutableList<A2AProcesserListener> = mutableListOf()
private val scheduler = Executors.newSingleThreadScheduledExecutor()
private var reconnectTask: Runnable? = null
fun attachListener(listener: A2AProcesserListener) { fun attachListener(listener: A2AProcesserListener) {
listeners.add(listener) listeners.add(listener)
} }
val gson = Gson() val gson = Gson()
val client = WebSocketStompClient(StandardWebSocketClient()) val client = WebSocketStompClient(StandardWebSocketClient())
init { init {
@ -36,13 +41,24 @@ class ProcesserListenerService(
private final fun connectAndListen() { private final fun connectAndListen() {
log.info { "EncoderWsUrl: ${UIEnv.socketEncoder}" } log.info { "EncoderWsUrl: ${UIEnv.socketEncoder}" }
client.messageConverter = StringMessageConverter()
client.connect(UIEnv.socketEncoder, object : StompSessionHandlerAdapter() { client.connect(UIEnv.socketEncoder, object : StompSessionHandlerAdapter() {
override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) { override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) {
super.afterConnected(session, connectedHeaders) super.afterConnected(session, connectedHeaders)
logger.info { "Tilkoblet processer" } logger.info { "Tilkoblet processer" }
stopReconnect()
subscribeToTopics(session) subscribeToTopics(session)
} }
override fun handleTransportError(session: StompSession, exception: Throwable) {
super.handleTransportError(session, exception)
scheduleReconnect()
}
override fun handleFrame(headers: StompHeaders, payload: Any?) {
super.handleFrame(headers, payload)
}
override fun handleException( override fun handleException(
session: StompSession, session: StompSession,
command: StompCommand?, command: StompCommand?,
@ -56,6 +72,22 @@ class ProcesserListenerService(
}) })
} }
private fun scheduleReconnect() {
(reconnectTask ?: kotlinx.coroutines.Runnable {
logger.info { "Attempting to reconnect" }
connectAndListen()
}).let {
scheduler.scheduleAtFixedRate(it, 5, 5, TimeUnit.SECONDS)
}
}
private fun stopReconnect() {
reconnectTask?.let {
scheduler.shutdownNow()
reconnectTask = null
}
}
private fun subscribeToTopics(session: StompSession) { private fun subscribeToTopics(session: StompSession) {
session.subscribe("/topic/encode/progress", encodeProcessFrameHandler) session.subscribe("/topic/encode/progress", encodeProcessFrameHandler)
session.subscribe("/topic/extract/progress", extractProcessFrameHandler) session.subscribe("/topic/extract/progress", extractProcessFrameHandler)

View File

@ -114,7 +114,7 @@ function getSegments(absolutePath: string): Array<Segment> {
function getSegmentedNaviagatablePath(rootClick: () => void, navigateTo: (path: string | null) => void, path: string | null): JSX.Element { function getSegmentedNaviagatablePath(rootClick: () => void, navigateTo: (path: string | null) => void, path: string | null): JSX.Element {
const segments = getSegments(path!) const segments = getSegments("/src/input/completed")
const utElements = segments.map((segment: Segment, index: number) => { const utElements = segments.map((segment: Segment, index: number) => {
return ( return (

View File

@ -175,6 +175,10 @@ inline fun <reified T> RestTemplate.tryPost(url: String, data: Any, noinline onE
} }
} }
fun SimpMessagingTemplate.trySend(destination: String, data: Any) { fun SimpMessagingTemplate.trySend(destination: String, data: Any, onError: ((Exception) -> Unit)? = null) {
try {
this.convertAndSend(destination, data) this.convertAndSend(destination, data)
} catch (e: Exception) {
onError?.invoke(e)
}
} }