From 749194f9e1803146473d1a2d0f45af39f77edef5 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sat, 22 Mar 2025 15:38:34 +0100 Subject: [PATCH] minor adjustments --- .../ui/socket/ProcesserTasksTopic.kt | 114 ++++++++++++++++-- .../ui/socket/UnprocessedFilesTopic.kt | 2 +- .../ui/socket/a2a/ProcesserListenerService.kt | 16 ++- apps/ui/web/package-lock.json | 63 +++++----- apps/ui/web/src/App.tsx | 24 ++-- .../app/features/table/expandableTable.tsx | 22 +++- apps/ui/web/src/app/page/EventsChainPage.tsx | 10 +- apps/ui/web/src/app/page/ExplorePage.tsx | 16 +-- apps/ui/web/src/app/page/LaunchPage.tsx | 6 +- .../web/src/app/page/UnprocessedFilesPage.tsx | 93 +++++++++++++- apps/ui/web/src/app/store.ts | 5 +- .../web/src/app/store/chained-events-slice.ts | 3 +- .../src/app/store/kafka-items-flat-slice.ts | 23 ---- .../common/database/cal/TasksManager.kt | 8 +- 14 files changed, 295 insertions(+), 110 deletions(-) delete mode 100644 apps/ui/web/src/app/store/kafka-items-flat-slice.ts diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/ProcesserTasksTopic.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/ProcesserTasksTopic.kt index aef1d94a..c875673c 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/ProcesserTasksTopic.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/ProcesserTasksTopic.kt @@ -1,10 +1,20 @@ package no.iktdev.mediaprocessing.ui.socket +import no.iktdev.eventi.data.referenceId +import no.iktdev.eventi.database.toEpochSeconds +import no.iktdev.eventi.database.withDirtyRead import no.iktdev.eventi.database.withTransaction +import no.iktdev.mediaprocessing.shared.common.contract.Events +import no.iktdev.mediaprocessing.shared.common.contract.ProcessType +import no.iktdev.mediaprocessing.shared.common.contract.data.* +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import no.iktdev.mediaprocessing.shared.common.contract.dto.ProcesserEventInfo +import no.iktdev.mediaprocessing.shared.common.database.cal.toEvent import no.iktdev.mediaprocessing.shared.common.database.cal.toTask +import no.iktdev.mediaprocessing.shared.common.database.tables.events import no.iktdev.mediaprocessing.shared.common.database.tables.tasks import no.iktdev.mediaprocessing.shared.common.task.Task +import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.ui.WebSocketMonitoringService import no.iktdev.mediaprocessing.ui.eventDatabase import no.iktdev.mediaprocessing.ui.socket.a2a.ProcesserListenerService @@ -13,25 +23,33 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.messaging.handler.annotation.MessageMapping import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.stereotype.Service +import java.io.File @Service class ProcesserTasksTopic( @Autowired a2AProcesserService: ProcesserListenerService, @Autowired private val webSocketMonitoringService: WebSocketMonitoringService, - @Autowired override var template: SimpMessagingTemplate?, -): SocketListener(template) { + @Autowired private val message: SimpMessagingTemplate?, +): SocketListener(message) { + final val a2a = object : ProcesserListenerService.A2AProcesserListener { override fun onExtractProgress(info: ProcesserEventInfo) { + message?.convertAndSend("/topic/processer/extract/progress", info) + pullAllTasks() } override fun onEncodeProgress(info: ProcesserEventInfo) { + message?.convertAndSend("/topic/processer/encode/progress", info) + pullAllTasks() } override fun onEncodeAssigned() { + pullAllTasks() } override fun onExtractAssigned() { + pullAllTasks() } } @@ -39,18 +57,96 @@ class ProcesserTasksTopic( a2AProcesserService.attachListener(a2a) } - data class TaskGroup( + enum class Status { + Skipped, + Awaiting, // Waiting for tasks to be created + NeedsApproval, + Pending, + InProgress, + Completed, + Failed, + } + + data class ContentEventState( val referenceId: String, - val tasks: List - ) + val title: String, + val encode: Status = Status.Skipped, + val extract: Status = Status.Skipped, + val convert: Status = Status.Skipped, + val created: Long + ) {} + @MessageMapping("/tasks/all") - fun pullAllTasks() { + fun pullAllTaskss() { + val states = update() + template?.convertAndSend("/topic/tasks/all", states) + } + + fun getOperationState(tasks: List, hasOperation: Boolean, canStart: Boolean): Status { + if (!hasOperation) return Status.Skipped + if (tasks.isEmpty()) return Status.Awaiting + + if (!canStart) return Status.NeedsApproval + + if (tasks.any { it.consumed }) { + return Status.Completed + } + if (tasks.any { it.claimed }) { + return Status.InProgress + } + if (tasks.any{ it.status == "ERROR"}) { + return Status.Failed + } + return Status.Pending + + } + + + fun update(): MutableList { + val eventStates: MutableList = mutableListOf() + + val tasks = pullAllTasks() + val availableEvents = pullAllEvents() + + for ((referenceId, events) in availableEvents) { + val startEvent = events.findFirstEventOf() ?: continue + val startData = startEvent.data ?: continue + val title = events.findFirstEventOf()?.data?.sanitizedName ?: startData.file.let { File(it).nameWithoutExtension } + val canStart = if (startData.type == ProcessType.FLOW) true else { + events.findEventsOf().isNotEmpty() + } + val tasksCreated = tasks[referenceId] + val encode = tasksCreated?.filter { it.task == TaskType.Encode } ?: emptyList() + val extract = tasksCreated?.filter { it.task == TaskType.Extract } ?: emptyList() + val convert = tasksCreated?.filter { it.task == TaskType.Convert } ?: emptyList() + + eventStates.add(ContentEventState( + title = title, + referenceId = referenceId, + encode = getOperationState(encode, startData.operations.contains(OperationEvents.ENCODE), canStart), + extract = getOperationState(extract, startData.operations.contains(OperationEvents.EXTRACT), canStart), + convert = getOperationState(convert, startData.operations.contains(OperationEvents.CONVERT), canStart), + created = startEvent.metadata.created.toEpochSeconds() * 1000L + )) + } + return eventStates + } + + fun pullAllTasks(): Map> { val result = withTransaction(eventDatabase.database) { tasks.selectAll().toTask() - .groupBy { it.referenceId }.map { g -> TaskGroup(g.key, g.value) } - } ?: emptyList() - template?.convertAndSend("/topic/tasks/all", result) + .groupBy { it.referenceId } + } ?: emptyMap() + return result + } + + fun pullAllEvents(): Map> { + val result = withDirtyRead(eventDatabase.database) { + events.selectAll().toEvent() + .groupBy { it.referenceId() } + } ?: emptyMap() + return result } } \ No newline at end of file diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/UnprocessedFilesTopic.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/UnprocessedFilesTopic.kt index c4cff844..c8f3591e 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/UnprocessedFilesTopic.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/UnprocessedFilesTopic.kt @@ -68,7 +68,7 @@ class UnprocessedFilesTopic( FileInfo( it[files.baseName], it[files.fileName], - it[files.checksum] + it[files.checksum], ) } unprocessedFiles = found diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt index 46d6dd78..30691703 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/a2a/ProcesserListenerService.kt @@ -15,7 +15,6 @@ import org.springframework.stereotype.Service @Service class ProcesserListenerService( @Autowired private val webSocketMonitoringService: WebSocketMonitoringService, - @Autowired private val message: SimpMessagingTemplate?, ) { private val logger = KotlinLogging.logger {} private val listeners: MutableList = mutableListOf() @@ -50,9 +49,11 @@ class ProcesserListenerService( private val encodeProcessMessage = object : SocketMessageHandler() { override fun onMessage(socketMessage: String) { super.onMessage(socketMessage) - message?.convertAndSend("/topic/processer/encode/progress", socketMessage) val response = gson.fromJson(socketMessage, ProcesserEventInfo::class.java) - if (webSocketMonitoringService.anyListening()) { + listeners.forEach { listener -> + run { + listener.onEncodeProgress(response) + } } } } @@ -61,11 +62,14 @@ class ProcesserListenerService( private val extractProcessFrameHandler = object : SocketMessageHandler() { override fun onMessage(socketMessage: String) { super.onMessage(socketMessage) - message?.convertAndSend("/topic/processer/extract/progress", socketMessage) if (webSocketMonitoringService.anyListening()) { } - //val stringPayload = (if (payload is ByteArray) String(payload) else payload as String) - //val response = gson.fromJson(stringPayload, ProcesserEventInfo::class.java) + val response = gson.fromJson(socketMessage, ProcesserEventInfo::class.java) + listeners.forEach { listener -> + run { + listener.onEncodeProgress(response) + } + } } } diff --git a/apps/ui/web/package-lock.json b/apps/ui/web/package-lock.json index cce91581..1b99bd33 100644 --- a/apps/ui/web/package-lock.json +++ b/apps/ui/web/package-lock.json @@ -574,13 +574,13 @@ } }, "node_modules/@babel/helpers": { - "version": "7.22.6", - "resolved": "https://registry.npmjs.org/@babel/helpers/-/helpers-7.22.6.tgz", - "integrity": "sha512-YjDs6y/fVOYFV8hAf1rxd1QvR9wJe1pDBZ2AREKq/SDayfPzgk0PBnVuTCE5X1acEpMMNOVUqoe+OwiZGJ+OaA==", + "version": "7.26.10", + "resolved": "https://registry.npmjs.org/@babel/helpers/-/helpers-7.26.10.tgz", + "integrity": "sha512-UPYc3SauzZ3JGgj87GgZ89JVdC5dj0AoetR5Bw6wj4niittNyFh6+eOGonYvJ1ao6B8lEa3Q3klS7ADZ53bc5g==", + "license": "MIT", "dependencies": { - "@babel/template": "^7.22.5", - "@babel/traverse": "^7.22.6", - "@babel/types": "^7.22.5" + "@babel/template": "^7.26.9", + "@babel/types": "^7.26.10" }, "engines": { "node": ">=6.9.0" @@ -2085,9 +2085,10 @@ "integrity": "sha512-x/rqGMdzj+fWZvCOYForTghzbtqPDZ5gPwaoNGHdgDfF2QA/XZbCBp4Moo5scrkAMPhB7z26XM/AaHuIJdgauA==" }, "node_modules/@babel/runtime": { - "version": "7.25.0", - "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.25.0.tgz", - "integrity": "sha512-7dRy4DwXwtzBrPbZflqxnvfxLF8kdZXPkhymtDeFoFqE6ldzjQFgYTtYIFARcLEYDrqfBfYcZt1WqFxRoyC9Rw==", + "version": "7.26.10", + "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.26.10.tgz", + "integrity": "sha512-2WJMeRQPHKSPemqk/awGrAiuFfzBmOIPXKizAsVhWH9YJqLZ0H+HS4c8loHGgW6utJ3E/ejXQUsiGaQy2NZ9Fw==", + "license": "MIT", "dependencies": { "regenerator-runtime": "^0.14.0" }, @@ -2133,9 +2134,9 @@ } }, "node_modules/@babel/types": { - "version": "7.26.9", - "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.26.9.tgz", - "integrity": "sha512-Y3IR1cRnOxOCDvMmNiym7XpXQ93iGDDPHx+Zj+NM+rg0fBaShfQLkg+hKPaZCEvg5N/LeCo4+Rj/i3FuJsIQaw==", + "version": "7.26.10", + "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.26.10.tgz", + "integrity": "sha512-emqcG3vHrpxUKTrxcblR36dcrcoRDvKmnL/dCL6ZsHaShW80qxCAcNhzQZrpeM765VzEos+xOi4s+r4IXzTwdQ==", "license": "MIT", "dependencies": { "@babel/helper-string-parser": "^7.25.9", @@ -18009,11 +18010,12 @@ } }, "node_modules/use-sync-external-store": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/use-sync-external-store/-/use-sync-external-store-1.2.0.tgz", - "integrity": "sha512-eEgnFxGQ1Ife9bzYs6VLi8/4X6CObHMw9Qr9tPY43iKwsPw8xE8+EFsf/2cFZ5S3esXgpWgtSCtLNS41F+sKPA==", + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/use-sync-external-store/-/use-sync-external-store-1.4.0.tgz", + "integrity": "sha512-9WXSPC5fMv61vaupRkCKCxsPxBocVnwakBEkMIHHpkTTg6icbJtg6jzgtLDm4bl3cSHAca52rYWih0k4K3PfHw==", + "license": "MIT", "peerDependencies": { - "react": "^16.8.0 || ^17.0.0 || ^18.0.0" + "react": "^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0" } }, "node_modules/utf-8-validate": { @@ -19510,13 +19512,12 @@ } }, "@babel/helpers": { - "version": "7.22.6", - "resolved": "https://registry.npmjs.org/@babel/helpers/-/helpers-7.22.6.tgz", - "integrity": "sha512-YjDs6y/fVOYFV8hAf1rxd1QvR9wJe1pDBZ2AREKq/SDayfPzgk0PBnVuTCE5X1acEpMMNOVUqoe+OwiZGJ+OaA==", + "version": "7.26.10", + "resolved": "https://registry.npmjs.org/@babel/helpers/-/helpers-7.26.10.tgz", + "integrity": "sha512-UPYc3SauzZ3JGgj87GgZ89JVdC5dj0AoetR5Bw6wj4niittNyFh6+eOGonYvJ1ao6B8lEa3Q3klS7ADZ53bc5g==", "requires": { - "@babel/template": "^7.22.5", - "@babel/traverse": "^7.22.6", - "@babel/types": "^7.22.5" + "@babel/template": "^7.26.9", + "@babel/types": "^7.26.10" } }, "@babel/parser": { @@ -20481,9 +20482,9 @@ "integrity": "sha512-x/rqGMdzj+fWZvCOYForTghzbtqPDZ5gPwaoNGHdgDfF2QA/XZbCBp4Moo5scrkAMPhB7z26XM/AaHuIJdgauA==" }, "@babel/runtime": { - "version": "7.25.0", - "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.25.0.tgz", - "integrity": "sha512-7dRy4DwXwtzBrPbZflqxnvfxLF8kdZXPkhymtDeFoFqE6ldzjQFgYTtYIFARcLEYDrqfBfYcZt1WqFxRoyC9Rw==", + "version": "7.26.10", + "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.26.10.tgz", + "integrity": "sha512-2WJMeRQPHKSPemqk/awGrAiuFfzBmOIPXKizAsVhWH9YJqLZ0H+HS4c8loHGgW6utJ3E/ejXQUsiGaQy2NZ9Fw==", "requires": { "regenerator-runtime": "^0.14.0" }, @@ -20520,9 +20521,9 @@ } }, "@babel/types": { - "version": "7.26.9", - "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.26.9.tgz", - "integrity": "sha512-Y3IR1cRnOxOCDvMmNiym7XpXQ93iGDDPHx+Zj+NM+rg0fBaShfQLkg+hKPaZCEvg5N/LeCo4+Rj/i3FuJsIQaw==", + "version": "7.26.10", + "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.26.10.tgz", + "integrity": "sha512-emqcG3vHrpxUKTrxcblR36dcrcoRDvKmnL/dCL6ZsHaShW80qxCAcNhzQZrpeM765VzEos+xOi4s+r4IXzTwdQ==", "requires": { "@babel/helper-string-parser": "^7.25.9", "@babel/helper-validator-identifier": "^7.25.9" @@ -31684,9 +31685,9 @@ } }, "use-sync-external-store": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/use-sync-external-store/-/use-sync-external-store-1.2.0.tgz", - "integrity": "sha512-eEgnFxGQ1Ife9bzYs6VLi8/4X6CObHMw9Qr9tPY43iKwsPw8xE8+EFsf/2cFZ5S3esXgpWgtSCtLNS41F+sKPA==", + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/use-sync-external-store/-/use-sync-external-store-1.4.0.tgz", + "integrity": "sha512-9WXSPC5fMv61vaupRkCKCxsPxBocVnwakBEkMIHHpkTTg6icbJtg6jzgtLDm4bl3cSHAca52rYWih0k4K3PfHw==", "requires": {} }, "utf-8-validate": { diff --git a/apps/ui/web/src/App.tsx b/apps/ui/web/src/App.tsx index 6caf972f..50eaef7c 100644 --- a/apps/ui/web/src/App.tsx +++ b/apps/ui/web/src/App.tsx @@ -12,7 +12,6 @@ import { updateItems } from './app/store/composed-slice'; import ExplorePage from './app/page/ExplorePage'; import { ThemeProvider } from '@mui/material'; import theme from './theme'; -import { simpleEventsUpdate } from './app/store/kafka-items-flat-slice'; import { EventDataObject, SimpleEventDataObject } from './types'; import EventsChainPage from './app/page/EventsChainPage'; import UnprocessedFilesPage from './app/page/UnprocessedFilesPage'; @@ -22,6 +21,14 @@ import QueueIcon from '@mui/icons-material/Queue'; import AppsIcon from '@mui/icons-material/Apps'; import ConstructionIcon from '@mui/icons-material/Construction'; import ProcesserTasksPage from './app/page/ProcesserTasksPage'; +import DashboardIcon from '@mui/icons-material/Dashboard'; +import GraphicEqIcon from '@mui/icons-material/GraphicEq'; +import HomeRepairServiceIcon from '@mui/icons-material/HomeRepairService'; +import InboxIcon from '@mui/icons-material/Inbox'; +import InputIcon from '@mui/icons-material/Input'; +import NotStartedIcon from '@mui/icons-material/NotStarted'; +import EventsPage from './app/page/EventsPage'; +import TableChartIcon from '@mui/icons-material/TableChart'; function App() { const client = useStompClient(); @@ -31,9 +38,6 @@ function App() { dispatch(updateItems(response)) }); - useWsSubscription>("/topic/event/flat", (response) => { - dispatch(simpleEventsUpdate(response)) - }); useEffect(() => { @@ -71,7 +75,12 @@ function App() { }}> - window.location.href = "/events"} sx={{ + window.location.href = "/processer"} sx={{ + ...iconHeight + }}> + + + window.location.href = "/eventsflow"} sx={{ ...iconHeight }}> @@ -89,7 +98,7 @@ function App() { window.location.href = "/tasks"} sx={{ ...iconHeight }}> - + } /> + } /> } /> } /> - } /> + } /> } />