UI + Adjustments

This commit is contained in:
Brage 2024-03-30 14:25:35 +01:00
parent 68f3d05d56
commit 98ca3e239f
34 changed files with 646 additions and 108 deletions

View File

@ -9,10 +9,9 @@ import no.iktdev.mediaprocessing.converter.convert.Converter
import no.iktdev.mediaprocessing.converter.persistentReader import no.iktdev.mediaprocessing.converter.persistentReader
import no.iktdev.mediaprocessing.converter.persistentWriter import no.iktdev.mediaprocessing.converter.persistentWriter
import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.common.helper.DerivedProcessIterationHolder
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
@ -20,10 +19,13 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerReq
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.util.* import java.util.*
@EnableScheduling
@Service @Service
class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : TaskCreator(coordinator) { class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : TaskCreator(coordinator) {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
@ -42,9 +44,13 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
get() = KafkaEvents.EVENT_WORK_CONVERT_PERFORMED get() = KafkaEvents.EVENT_WORK_CONVERT_PERFORMED
fun getRequiredExtractProcessForContinuation(referenceId: String, requiresEventId: String): PersistentProcessDataMessage? { fun getRequiredExtractProcessForContinuation(
referenceId: String,
requiresEventId: String
): PersistentProcessDataMessage? {
return persistentReader.getProcessEvent(referenceId, requiresEventId) return persistentReader.getProcessEvent(referenceId, requiresEventId)
} }
fun canConvert(extract: PersistentProcessDataMessage?): Boolean { fun canConvert(extract: PersistentProcessDataMessage?): Boolean {
return extract?.consumed == true && extract.data.isSuccess() return extract?.consumed == true && extract.data.isSuccess()
} }
@ -54,7 +60,8 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
event: PersistentProcessDataMessage, event: PersistentProcessDataMessage,
events: List<PersistentProcessDataMessage> events: List<PersistentProcessDataMessage>
): MessageDataWrapper? { ): MessageDataWrapper? {
val convertEvent = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED && it.data is ConvertWorkerRequest } val convertEvent =
events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED && it.data is ConvertWorkerRequest }
if (convertEvent == null) { if (convertEvent == null) {
// No convert here.. // No convert here..
return null return null
@ -63,22 +70,42 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
val requiredEventId = convertRequest.requiresEventId val requiredEventId = convertRequest.requiresEventId
if (requiredEventId != null) { if (requiredEventId != null) {
// Requires the eventId to be defined as consumed // Requires the eventId to be defined as consumed
val requiredEventToBeCompleted = val requiredEventToBeCompleted = getRequiredExtractProcessForContinuation(
getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiredEventId) referenceId = event.referenceId,
?: return SimpleMessageData(Status.SKIPPED, "Required event: $requiredEventId is not found. Skipping convert work for referenceId: ${event.referenceId}") requiresEventId = requiredEventId
)
if (requiredEventToBeCompleted == null) {
log.warn { "$requiredEventId extract event with eventId: $requiredEventId was not found" }
log.info { "Sending ${event.eventId} @ ${event.referenceId} to deferred check" }
val existing = scheduled_deferred_events[event.referenceId]
val newList = (existing ?: listOf()) + listOf(
DerivedProcessIterationHolder(
eventId = event.eventId,
event = convertEvent
)
)
scheduled_deferred_events[event.referenceId] = newList
return null
}
if (!canConvert(requiredEventToBeCompleted)) { if (!canConvert(requiredEventToBeCompleted)) {
// Waiting for required event to be completed // Waiting for required event to be completed
return null return null
} }
} }
val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) val isAlreadyClaimed =
persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) { if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" } log.warn { "Process is already claimed!" }
return null return null
} }
val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) val setClaim = persistentWriter.setProcessEventClaim(
referenceId = event.referenceId,
eventId = event.eventId,
claimedBy = serviceId
)
if (!setClaim) { if (!setClaim) {
return null return null
} }
@ -87,27 +114,39 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload) val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload)
if (!converter.canRead()) { if (!converter.canRead()) {
// Make claim regardless but push to schedule // Make claim regardless but push to schedule
return SimpleMessageData(Status.ERROR, "Can't read the file..") return ConvertWorkPerformed(
status = Status.ERROR,
message = "Can't read the file..",
derivedFromEventId = converter.eventId,
producedBy = serviceId
)
} }
val result = try { val result = try {
performConvert(converter) performConvert(converter)
} catch (e: Exception) { } catch (e: Exception) {
SimpleMessageData(status = Status.ERROR, message = e.message) ConvertWorkPerformed(
status = Status.ERROR, message = e.message,
derivedFromEventId = converter.eventId,
producedBy = serviceId
)
} }
val consumedIsSuccessful = persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) val consumedIsSuccessful =
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
runBlocking { runBlocking {
delay(1000) delay(1000)
if (!consumedIsSuccessful) { if (!consumedIsSuccessful) {
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
} }
delay(1000) delay(1000)
var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) var readbackIsSuccess =
persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
while (!readbackIsSuccess) { while (!readbackIsSuccess) {
delay(1000) delay(1000)
readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) readbackIsSuccess =
persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
} }
} }
return result return result
@ -132,7 +171,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
derivedFromEventId = converter.eventId, derivedFromEventId = converter.eventId,
outFiles = emptyList() outFiles = emptyList()
) )
} catch (e : Converter.FileIsNullOrEmpty) { } catch (e: Converter.FileIsNullOrEmpty) {
e.printStackTrace() e.printStackTrace()
ConvertWorkPerformed( ConvertWorkPerformed(
status = Status.ERROR, status = Status.ERROR,
@ -143,4 +182,51 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
) )
} }
} }
val scheduled_deferred_events: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf()
@Scheduled(fixedDelay = (300_000))
fun validatePresenceOfRequiredEvent() {
val removal = mutableMapOf<String, List<DerivedProcessIterationHolder>>()
for ((referenceId, eventList) in scheduled_deferred_events) {
val failed = mutableListOf<DerivedProcessIterationHolder>()
for (event in eventList) {
val ce = if (event.event.data is ConvertWorkerRequest) event.event.data as ConvertWorkerRequest else null
try {
val requiredEventToBeCompleted = getRequiredExtractProcessForContinuation(
referenceId = referenceId,
requiresEventId = ce?.requiresEventId!!
)
if (requiredEventToBeCompleted == null && event.iterated > 4) {
throw RuntimeException("Iterated overshot")
} else {
event.iterated++
"Iteration ${event.iterated} for event ${event.eventId} in deferred check"
}
} catch (e: Exception) {
persistentWriter.setProcessEventCompleted(referenceId, event.eventId, serviceId)
failed.add(event)
log.error { "Canceling event ${event.eventId}\n\t by declaring it as consumed." }
producer.sendMessage(
referenceId = referenceId,
event = producesEvent,
data = SimpleMessageData(Status.SKIPPED, "Required event: ${ce?.requiresEventId} is not found. Skipping convert work for referenceId: ${referenceId}")
)
}
}
removal[referenceId] = failed
}
for ((referenceId, events) in removal) {
val list = scheduled_deferred_events[referenceId] ?: continue
list.toMutableList().removeAll(events)
scheduled_deferred_events[referenceId] = list
}
}
} }

View File

@ -4,6 +4,7 @@ plugins {
kotlin("plugin.spring") version "1.5.31" kotlin("plugin.spring") version "1.5.31"
id("org.springframework.boot") version "2.5.5" id("org.springframework.boot") version "2.5.5"
id("io.spring.dependency-management") version "1.0.11.RELEASE" id("io.spring.dependency-management") version "1.0.11.RELEASE"
id("org.jetbrains.kotlin.plugin.serialization") version "1.5.0" // Legg til Kotlin Serialization-plugin
} }
group = "no.iktdev.mediaprocessing" group = "no.iktdev.mediaprocessing"
@ -84,7 +85,7 @@ dependencies {
testImplementation("junit:junit:4.13.2") testImplementation("junit:junit:4.13.2")
testImplementation("org.mockito:mockito-core:3.+") testImplementation("org.mockito:mockito-core:3.+")
testImplementation("org.assertj:assertj-core:3.4.1") testImplementation("org.assertj:assertj-core:3.4.1")
testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0")
} }
tasks.withType<Test> { tasks.withType<Test> {

View File

@ -1,6 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.mapping package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.contract.reader.OutputFilesDto import no.iktdev.mediaprocessing.shared.contract.reader.OutputFilesDto
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
@ -14,15 +15,15 @@ class OutputFilesMapping(val events: List<PersistentMessage>) {
val videoResult = events.filter { it.data is ProcesserEncodeWorkPerformed } val videoResult = events.filter { it.data is ProcesserEncodeWorkPerformed }
.map { it.data as ProcesserEncodeWorkPerformed } .map { it.data as ProcesserEncodeWorkPerformed }
val subtitleResult = events.filter { it.data is ProcesserExtractWorkPerformed && it.data.isSuccess() }.map { it.data as ProcesserExtractWorkPerformed }.filter { !it.outFile.isNullOrBlank() } val subtitleResult = events.filter { it.data is ProcesserExtractWorkPerformed && it.isSuccess() }.map { it.data as ProcesserExtractWorkPerformed }.filter { !it.outFile.isNullOrBlank() }
val convertedSubtitleResult = events.filter { it.data is ConvertWorkPerformed && it.data.isSuccess() }.map { it.data as ConvertWorkPerformed } val convertedSubtitleResult = events.filter { it.data is ConvertWorkPerformed && it.isSuccess() }.map { it.data as ConvertWorkPerformed }
val referenceId = events.first().referenceId val referenceId = events.firstOrNull()?.referenceId ?: throw RuntimeException("No Id")
val subtitles = try { val subtitles = try {
toSubtitleList(subtitleResult, convertedSubtitleResult) toSubtitleList(subtitleResult, convertedSubtitleResult)
} catch (e: Exception) { } catch (e: Exception) {
System.err.println("Exception of $referenceId") System.err.println("Exception of $referenceId")
System.err.print("EventIds:\n" + events.joinToString("\n") { it.eventId }) System.err.print("EventIds:\n" + events.joinToString("\n") { it.eventId } + "\n")
e.printStackTrace() e.printStackTrace()
throw e throw e
} }

View File

@ -0,0 +1,56 @@
package no.iktdev.mediaprocessing
import kotlinx.serialization.json.*
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.json.JSONArray
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
class PersistentMessageFromJsonDump(events: String) {
private var data: JsonArray?
init {
val jsonArray = Json.parseToJsonElement(events) as JsonArray
data = jsonArray.firstOrNull { it.jsonObject["data"] != null }?.jsonObject?.get("data") as? JsonArray
}
fun getPersistentMessages(): List<PersistentMessage> {
return data?.mapNotNull {
try {
mapToPersistentMessage(it)
} catch (e: Exception) {
System.err.print(it.toString())
e.printStackTrace()
null
}
} ?: emptyList()
}
val dzz = DeserializingRegistry()
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
private fun mapToPersistentMessage(e: JsonElement): PersistentMessage? {
val referenceId: String = e.jsonObject["referenceId"]?.jsonPrimitive?.content ?: throw RuntimeException("No ReferenceId found")
val eventId: String = e.jsonObject["eventId"]?.jsonPrimitive?.content ?: throw RuntimeException("No EventId")
val event: String = e.jsonObject["event"]?.jsonPrimitive?.content ?: throw RuntimeException("No Event")
val data: String = e.jsonObject["data"]?.jsonPrimitive?.content ?: throw RuntimeException("No data")
val created: String = e.jsonObject["created"]?.jsonPrimitive?.content ?: throw RuntimeException("No Created date time found")
val kev = KafkaEvents.toEvent(event) ?: throw RuntimeException("Not able to convert event to Enum")
val dzdata = dzz.deserializeData(kev, data)
return PersistentMessage(
referenceId = referenceId,
eventId = eventId,
event = kev,
data = dzdata,
created = LocalDateTime.parse(created, formatter)
)
}
}

View File

@ -12,6 +12,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@Service @Service
@ -78,4 +79,10 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
) )
@Scheduled(fixedDelay = (5_000))
fun checkForWork() {
log.info { "Checking if there is any work to do.." }
readAllAvailableInQueue()
}
} }

View File

@ -39,6 +39,9 @@ dependencies {
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation(project(mapOf("path" to ":shared"))) implementation(project(mapOf("path" to ":shared")))
implementation(project(mapOf("path" to ":shared:common"))) implementation(project(mapOf("path" to ":shared:common")))
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:kafka")))
testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")

View File

@ -1,4 +1,4 @@
package no.iktdev.streamit.content.ui package no.iktdev.mediaprocessing.ui
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import org.springframework.beans.factory.annotation.Value import org.springframework.beans.factory.annotation.Value
@ -8,6 +8,7 @@ import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.messaging.simp.config.MessageBrokerRegistry import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.web.bind.annotation.RestController import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
import org.springframework.web.method.HandlerTypePredicate import org.springframework.web.method.HandlerTypePredicate
import org.springframework.web.servlet.config.annotation.CorsRegistry import org.springframework.web.servlet.config.annotation.CorsRegistry
import org.springframework.web.servlet.config.annotation.PathMatchConfigurer import org.springframework.web.servlet.config.annotation.PathMatchConfigurer
@ -22,7 +23,7 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo
class WebConfig: WebMvcConfigurer { class WebConfig: WebMvcConfigurer {
override fun addCorsMappings(registry: CorsRegistry) { override fun addCorsMappings(registry: CorsRegistry) {
registry.addMapping("/**") registry.addMapping("/**")
.allowedOrigins("*") .allowedOrigins("localhost", "*://localhost:3000", "localhost:80")
.allowCredentials(false) .allowCredentials(false)
} }
@ -48,7 +49,17 @@ class WebConfig: WebMvcConfigurer {
} }
} }
class SocketImplemented: SocketImplementation() { @Configuration
class ApiCommunicationConfig {
@Bean
fun coordinatorTemplate(): RestTemplate {
val restTemplate = RestTemplate()
return restTemplate
}
} }
@Configuration
class SocketImplemented: SocketImplementation() {
}

View File

@ -0,0 +1,120 @@
package no.iktdev.mediaprocessing.ui
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.ui.coordinator.PersistentEventBasedMessageListener
import no.iktdev.mediaprocessing.ui.dto.EventSummarySubItem
import no.iktdev.mediaprocessing.ui.dto.SummaryState
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@Service
@EnableScheduling
class Coordinator(@Autowired private val template: SimpMessagingTemplate?) : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
override val listeners = PersistentEventBasedMessageListener()
val dbReader = PersistentDataReader(getEventsDatabase())
override fun onCoordinatorReady() {
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
}
override fun createTasksBasedOnEventsAndPersistence(
referenceId: String,
eventId: String,
messages: List<PersistentMessage>
) {
}
fun readAllEvents() {
val messages = persistentReader.getAllMessages()
}
fun readAllProcesserEvents() {
val messages = persistentReader.getProcessEvents()
}
@Scheduled(fixedDelay = (5_000))
fun refreshDatabaseData() {
}
private fun getCurrentStateFromProcesserEvents(events: List<PersistentProcessDataMessage>): Map<String, EventSummarySubItem> {
return events.associate {
it.event.event to EventSummarySubItem(
eventId = it.eventId,
status = if (it.consumed) SummaryState.Completed else if (it.claimed) SummaryState.Working else SummaryState.Pending
)
}
}
private fun getCurrentState(events: List<PersistentMessage>, processes: Map<String, EventSummarySubItem>): SummaryState {
val stored = events.findLast { it.event == KafkaEvents.EVENT_COLLECT_AND_STORE }
val started = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }
val completedMediaEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED }
val completedRequestEvent = events.findLast { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED }
if (stored != null && stored.data.isSuccess()) {
return SummaryState.Completed
}
if (completedMediaEvent?.data.isSuccess() || completedRequestEvent?.data.isSuccess()) {
return SummaryState.AwaitingStore
}
if (processes.values.all { it.status == SummaryState.Completed }) {
return SummaryState.AwaitingStore
} else if (processes.values.any { it.status == SummaryState.Working }) {
return SummaryState.Working
} else if (processes.values.any { it.status == SummaryState.Pending }) {
return SummaryState.Pending
}
val workPrepared = events.filter { it.event in listOf(
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
KafkaEvents.EVENT_WORK_CONVERT_CREATED,
KafkaEvents.EVENT_WORK_ENCODE_CREATED
) }
if (workPrepared.isNotEmpty()) {
return SummaryState.Pending
}
if (started != null && (started.data as MediaProcessStarted).type == ProcessType.MANUAL) {
return SummaryState.AwaitingConfirmation
}
val perparation = events.filter { it.event in listOf(
KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED,
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED,
) }
if (perparation.isNotEmpty()) {
return SummaryState.Preparing
}
// EVENT_MEDIA_METADATA_SEARCH_PERFORMED
return SummaryState.Started
}
fun buildSummaries() {
val messages = persistentReader.getAllMessages()
}
}

View File

@ -1,4 +1,4 @@
package no.iktdev.streamit.content.ui package no.iktdev.mediaprocessing.ui
import mu.KotlinLogging import mu.KotlinLogging
@ -6,11 +6,17 @@ import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.observable.ObservableMap import no.iktdev.exfl.observable.ObservableMap
import no.iktdev.exfl.observable.Observables import no.iktdev.exfl.observable.Observables
import no.iktdev.exfl.observable.observableMapOf import no.iktdev.exfl.observable.observableMapOf
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.streamit.content.ui.dto.EventDataObject import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.streamit.content.ui.dto.ExplorerItem import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.streamit.content.ui.dto.SimpleEventDataObject import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import no.iktdev.mediaprocessing.ui.dto.EventDataObject
import no.iktdev.mediaprocessing.ui.dto.ExplorerItem
import no.iktdev.mediaprocessing.ui.dto.SimpleEventDataObject
import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.ApplicationContext import org.springframework.context.ApplicationContext
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -22,6 +28,14 @@ private val logger = KotlinLogging.logger {}
class UIApplication { class UIApplication {
} }
private lateinit var eventsDatabase: MySqlDataSource
fun getEventsDatabase(): MySqlDataSource {
return eventsDatabase
}
lateinit var persistentReader: PersistentDataReader
lateinit var persistentWriter: PersistentDataStore
private var context: ApplicationContext? = null private var context: ApplicationContext? = null
private val kafkaClearedLatch = CountDownLatch(1) private val kafkaClearedLatch = CountDownLatch(1)
@ -35,6 +49,14 @@ val memActiveEventMap: ObservableMap<String, EventDataObject> = observableMapOf(
val fileRegister: ObservableMap<String, ExplorerItem> = observableMapOf() val fileRegister: ObservableMap<String, ExplorerItem> = observableMapOf()
fun main(args: Array<String>) { fun main(args: Array<String>) {
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
eventsDatabase.connect()
persistentReader = PersistentDataReader(eventsDatabase)
persistentWriter = PersistentDataStore(eventsDatabase)
Coroutines.addListener(object : Observables.ObservableValue.ValueListener<Throwable> { Coroutines.addListener(object : Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) { override fun onUpdated(value: Throwable) {
logger.error { "Received error: ${value.message}" } logger.error { "Received error: ${value.message}" }
@ -60,13 +82,14 @@ fun main(args: Array<String>) {
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
kafkaClearedLatch.countDown() // kafkaClearedLatch.countDown()
} }
logger.info { "Waiting for kafka to clear offset!" } // logger.info { "Waiting for kafka to clear offset!" }
kafkaClearedLatch.await(5, TimeUnit.MINUTES) // kafkaClearedLatch.await(5, TimeUnit.MINUTES)
logger.info { "Offset cleared!" } // logger.info { "Offset cleared!" }
Thread.sleep(10000) // Thread.sleep(10000)
context = runApplication<UIApplication>(*args)
} }

View File

@ -0,0 +1,10 @@
package no.iktdev.mediaprocessing.ui
import java.io.File
object UIEnv {
var storedContent: File = if (!System.getenv("DIRECTORY_CONTENT_STORED").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_STORED")) else File("/src/output")
val socketEncoder: String = if (System.getenv("EncoderWs").isNullOrBlank()) System.getenv("EncoderWs") else "ws://encoder:8080"
val coordinatorUrl: String = if (System.getenv("Coordinator").isNullOrBlank()) System.getenv("Coordinator") else "http://coordinator"
}

View File

@ -0,0 +1,33 @@
package no.iktdev.mediaprocessing.ui.coordinator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
class PersistentEventBasedMessageListener: EventBasedMessageListener<PersistentMessage>() {
override fun listenerWantingEvent(
event: PersistentMessage,
waitingListeners: List<Tasks<PersistentMessage>>
): List<Tasks<PersistentMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentMessage,
history: List<PersistentMessage>,
listeners: List<ITaskCreatorListener<PersistentMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
override fun waitingListeners(events: List<PersistentMessage>): List<Tasks<PersistentMessage>> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
}

View File

@ -1,4 +1,4 @@
package no.iktdev.streamit.content.ui.dto package no.iktdev.mediaprocessing.ui.dto
enum class SimpleEventDataState { enum class SimpleEventDataState {
NA, NA,

View File

@ -0,0 +1,32 @@
package no.iktdev.mediaprocessing.ui.dto
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
data class EventSummary(
val referenceId: String,
val baseName: String,
val collection: String,
val events: List<KafkaEvents>,
val status: SummaryState,
val activeEvens: Map<String, EventSummarySubItem>
)
data class EventSummarySubItem(
val eventId: String,
val status: SummaryState,
val progress: Int = 0
)
enum class SummaryState {
Completed,
AwaitingStore,
Working,
Pending,
AwaitingConfirmation,
Preparing,
Metadata,
Analyzing,
Reading,
Started
}

View File

@ -1,4 +1,4 @@
package no.iktdev.streamit.content.ui.dto package no.iktdev.mediaprocessing.ui.dto
interface ExplorerAttr { interface ExplorerAttr {
val created: Long val created: Long

View File

@ -1,4 +1,4 @@
package no.iktdev.streamit.content.ui.dto package no.iktdev.mediaprocessing.ui.dto
data class ExplorerCursor ( data class ExplorerCursor (
val name: String, val name: String,

View File

@ -1,7 +1,11 @@
package no.iktdev.streamit.content.ui.explorer package no.iktdev.mediaprocessing.ui.explorer
import no.iktdev.streamit.content.ui.UIEnv import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.streamit.content.ui.dto.* import no.iktdev.mediaprocessing.ui.UIEnv
import no.iktdev.mediaprocessing.ui.dto.ExplorerAttributes
import no.iktdev.mediaprocessing.ui.dto.ExplorerCursor
import no.iktdev.mediaprocessing.ui.dto.ExplorerItem
import no.iktdev.mediaprocessing.ui.dto.ExplorerItemType
import java.io.File import java.io.File
import java.io.FileFilter import java.io.FileFilter
import java.nio.file.Files import java.nio.file.Files
@ -67,7 +71,7 @@ class ExplorerCore {
} }
fun getHomeCursor(): ExplorerCursor? { fun getHomeCursor(): ExplorerCursor? {
return getCursor(UIEnv.incomingContent.absolutePath) return getCursor(SharedConfig.incomingContent.absolutePath)
} }
} }

View File

@ -1,4 +1,4 @@
package no.iktdev.streamit.content.ui.service package no.iktdev.mediaprocessing.ui.service
import dev.vishna.watchservice.KWatchEvent import dev.vishna.watchservice.KWatchEvent
import dev.vishna.watchservice.asWatchChannel import dev.vishna.watchservice.asWatchChannel
@ -6,8 +6,8 @@ import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.streamit.content.ui.explorer.ExplorerCore import no.iktdev.mediaprocessing.ui.explorer.ExplorerCore
import no.iktdev.streamit.content.ui.fileRegister import no.iktdev.mediaprocessing.ui.fileRegister
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
import java.math.BigInteger import java.math.BigInteger

View File

@ -1,15 +1,21 @@
package no.iktdev.streamit.content.ui.socket package no.iktdev.mediaprocessing.ui.socket
import no.iktdev.streamit.content.ui.explorer.ExplorerCore import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.contract.dto.ConvertRequest
import no.iktdev.mediaprocessing.ui.UIEnv
import no.iktdev.mediaprocessing.ui.explorer.ExplorerCore
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.MessageMapping import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.handler.annotation.Payload import org.springframework.messaging.handler.annotation.Payload
import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.stereotype.Controller import org.springframework.stereotype.Controller
import org.springframework.web.client.RestTemplate
val log = KotlinLogging.logger {}
@Controller @Controller
class ExplorerTopic( class ExplorerTopic(
@Autowired private val template: SimpMessagingTemplate?, @Autowired private val template: SimpMessagingTemplate?,
@Autowired private val coordinatorTemplate: RestTemplate,
val explorer: ExplorerCore = ExplorerCore() val explorer: ExplorerCore = ExplorerCore()
): TopicSupport() { ): TopicSupport() {
@ -28,5 +34,15 @@ class ExplorerTopic(
} }
} }
@MessageMapping("/request/convert")
fun requestConvert(@Payload data: ConvertRequest) {
val req = coordinatorTemplate.postForEntity(UIEnv.coordinatorUrl, data, String.javaClass)
log.info { req }
}
@MessageMapping("/request/all")
fun requestAllAvailableActions() {
}
} }

View File

@ -1,4 +1,4 @@
package no.iktdev.streamit.content.ui.socket package no.iktdev.mediaprocessing.ui.socket
import com.google.gson.Gson import com.google.gson.Gson

View File

@ -1,11 +1,11 @@
package no.iktdev.streamit.content.ui.socket package no.iktdev.mediaprocessing.ui.socket
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.observable.ObservableMap import no.iktdev.exfl.observable.ObservableMap
import no.iktdev.streamit.content.ui.dto.EventDataObject import no.iktdev.mediaprocessing.ui.dto.EventDataObject
import no.iktdev.streamit.content.ui.dto.SimpleEventDataObject import no.iktdev.mediaprocessing.ui.dto.SimpleEventDataObject
import no.iktdev.streamit.content.ui.memActiveEventMap import no.iktdev.mediaprocessing.ui.memActiveEventMap
import no.iktdev.streamit.content.ui.memSimpleConvertedEventsMap import no.iktdev.mediaprocessing.ui.memSimpleConvertedEventsMap
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.MessageMapping import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.SimpMessagingTemplate

View File

@ -1,12 +1,12 @@
package no.iktdev.streamit.content.ui.socket.internal package no.iktdev.mediaprocessing.ui.socket.internal
import com.google.gson.Gson import com.google.gson.Gson
import com.google.gson.reflect.TypeToken import com.google.gson.reflect.TypeToken
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.streamit.content.ui.UIEnv import no.iktdev.mediaprocessing.ui.UIEnv
import no.iktdev.streamit.content.ui.dto.EventDataObject import no.iktdev.mediaprocessing.ui.dto.EventDataObject
import no.iktdev.streamit.content.ui.memActiveEventMap import no.iktdev.mediaprocessing.ui.memActiveEventMap
import no.iktdev.streamit.content.ui.memSimpleConvertedEventsMap import no.iktdev.mediaprocessing.ui.memSimpleConvertedEventsMap
import org.springframework.messaging.simp.stomp.StompFrameHandler import org.springframework.messaging.simp.stomp.StompFrameHandler
import org.springframework.messaging.simp.stomp.StompHeaders import org.springframework.messaging.simp.stomp.StompHeaders
import org.springframework.messaging.simp.stomp.StompSession import org.springframework.messaging.simp.stomp.StompSession

View File

@ -1,10 +0,0 @@
package no.iktdev.streamit.content.ui
import java.io.File
class UIEnv {
companion object {
var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input")
val socketEncoder: String = if (System.getenv("WS_ENCODER").isNullOrBlank()) System.getenv("WS_ENCODER") else "ws://encoder:8080"
}
}

View File

@ -0,0 +1,3 @@
spring.output.ansi.enabled=always
logging.level.org.apache.kafka=INFO
logging.level.root=INFO

View File

@ -0,0 +1,21 @@
.contextmenu {
display: block;
border: black 1px solid;
border-radius: 5px;
position: absolute;
}
.contextMenuItem {
cursor: pointer;
padding-top: 5px;
padding-bottom: 5px;
padding-left: 25px;
padding-right: 25px;
}
.contextMenuItem:hover {
background-color: black;
}

View File

@ -1,4 +1,4 @@
import React, { useEffect } from 'react'; import React, { useEffect, useState } from 'react';
import logo from './logo.svg'; import logo from './logo.svg';
import './App.css'; import './App.css';
import { Box, CssBaseline } from '@mui/material'; import { Box, CssBaseline } from '@mui/material';

View File

@ -18,16 +18,13 @@ type NullableTableRowActionEvents<T> = TableRowActionEvents<T> | null;
export interface TableRowActionEvents<T> { export interface TableRowActionEvents<T> {
click: (row: T) => void; click: (row: T) => void;
doubleClick: (row: T) => void; doubleClick: (row: T) => void;
contextMenu: (row: T) => void; contextMenu?: (row: T, x: number, y: number) => void;
} }
export default function SimpleTable<T>({ items, columns, customizer, onRowClickedEvent }: { items: Array<T>, columns: Array<TablePropetyConfig>, customizer?: TableCellCustomizer<T>, onRowClickedEvent?: TableRowActionEvents<T> }) { export default function SimpleTable<T>({ items, columns, customizer, onRowClickedEvent }: { items: Array<T>, columns: Array<TablePropetyConfig>, customizer?: TableCellCustomizer<T>, onRowClickedEvent?: TableRowActionEvents<T> }) {
const muiTheme = useTheme(); const muiTheme = useTheme();
const [contextMenuVisible, setContextMenuVisible] = useState(false);
const [contextMenuPosition, setContextMenuPosition] = useState({ top: 0, left: 0 });
const [order, setOrder] = useState<'asc' | 'desc'>('asc'); const [order, setOrder] = useState<'asc' | 'desc'>('asc');
const [orderBy, setOrderBy] = useState<string>(''); const [orderBy, setOrderBy] = useState<string>('');
const [selectedRow, setSelectedRow] = useState<T | null>(null); const [selectedRow, setSelectedRow] = useState<T | null>(null);
@ -45,6 +42,13 @@ export default function SimpleTable<T>({ items, columns, customizer, onRowClicke
} }
} }
const tableRowContextMenu = (e: React.MouseEvent<HTMLTableRowElement, MouseEvent> , row: T | null) => {
if (row && onRowClickedEvent && onRowClickedEvent.contextMenu) {
e.preventDefault()
onRowClickedEvent.contextMenu(row, e.pageX, e.pageY)
}
}
const handleSort = (property: string) => { const handleSort = (property: string) => {
const isAsc = orderBy === property && order === 'asc'; const isAsc = orderBy === property && order === 'asc';
setOrder(isAsc ? 'desc' : 'asc'); setOrder(isAsc ? 'desc' : 'asc');
@ -114,6 +118,10 @@ export default function SimpleTable<T>({ items, columns, customizer, onRowClicke
<TableRow key={rowIndex} <TableRow key={rowIndex}
onClick={() => tableRowSingleClicked(row)} onClick={() => tableRowSingleClicked(row)}
onDoubleClick={() => tableRowDoubleClicked(row)} onDoubleClick={() => tableRowDoubleClicked(row)}
onContextMenu={(e) => {
tableRowContextMenu(e, row);
tableRowSingleClicked(row);
}}
style={{ cursor: "pointer", backgroundColor: selectedRow === row ? muiTheme.palette.action.selected : '' }} style={{ cursor: "pointer", backgroundColor: selectedRow === row ? muiTheme.palette.action.selected : '' }}
> >
{columns.map((column) => ( {columns.map((column) => (

View File

@ -1,4 +1,4 @@
import { useEffect } from 'react'; import { useEffect, useState } from 'react';
import { UnixTimestamp } from '../features/UxTc'; import { UnixTimestamp } from '../features/UxTc';
import { Box, Button, Typography, useTheme } from '@mui/material'; import { Box, Button, Typography, useTheme } from '@mui/material';
import { useDispatch, useSelector } from 'react-redux'; import { useDispatch, useSelector } from 'react-redux';
@ -7,10 +7,13 @@ import SimpleTable, { TableCellCustomizer, TablePropetyConfig, TableRowActionEve
import { useStompClient } from 'react-stomp-hooks'; import { useStompClient } from 'react-stomp-hooks';
import { useWsSubscription } from '../ws/subscriptions'; import { useWsSubscription } from '../ws/subscriptions';
import { updateItems } from '../store/explorer-slice'; import { updateItems } from '../store/explorer-slice';
import { setContextMenuPosition, setContextMenuVisible } from '../store/context-menu-slice';
import FolderIcon from '@mui/icons-material/Folder'; import FolderIcon from '@mui/icons-material/Folder';
import IconForward from '@mui/icons-material/ArrowForwardIosRounded'; import IconForward from '@mui/icons-material/ArrowForwardIosRounded';
import IconHome from '@mui/icons-material/Home'; import IconHome from '@mui/icons-material/Home';
import { ExplorerItem, ExplorerCursor, ExplorerItemType } from '../../types'; import { ExplorerItem, ExplorerCursor, ExplorerItemType } from '../../types';
import ContextMenu, { ContextMenuActionEvent, ContextMenuItem } from '../features/ContextMenu';
import { canConvert, canEncode, canExtract } from '../../fileUtil';
const createTableCell: TableCellCustomizer<ExplorerItem> = (accessor, data) => { const createTableCell: TableCellCustomizer<ExplorerItem> = (accessor, data) => {
@ -80,12 +83,52 @@ function getSegmentedNaviagatablePath(navigateTo: (path: string | null) => void,
) )
} }
function getContextMenuFileActionMenuItems(row: ExplorerItem | null): ContextMenuItem[] {
const ext = row?.extension;
const items: Array<ContextMenuItem> = [];
if (!ext) {return items;}
if (canEncode(ext) && canExtract(ext)) {
items.push({
actionIndex: 0,
icon: null,
text: "All available"
} as ContextMenuItem)
}
if (canEncode(ext)) {
items.push({
actionIndex: 1,
icon: null,
text: "Encode"
} as ContextMenuItem)
}
if (canExtract(ext)) {
items.push({
actionIndex: 2,
icon: null,
text: "Extract"
} as ContextMenuItem)
}
if (canConvert(ext)) {
items.push({
actionIndex: 3,
icon: null,
text: "Convert"
} as ContextMenuItem)
}
console.log(items);
return items;
}
export default function ExplorePage() { export default function ExplorePage() {
const muiTheme = useTheme(); const muiTheme = useTheme();
const dispatch = useDispatch(); const dispatch = useDispatch();
const client = useStompClient(); const client = useStompClient();
const cursor = useSelector((state: RootState) => state.explorer) const cursor = useSelector((state: RootState) => state.explorer)
const [selectedRow, setSelectedRow] = useState<ExplorerItem|null>(null);
const [actionableItems, setActionableItems] = useState<Array<ContextMenuItem>>([]);
const navigateTo = (path: string | null) => { const navigateTo = (path: string | null) => {
console.log(path) console.log(path)
@ -98,14 +141,52 @@ export default function ExplorePage() {
} }
const onItemSelectedEvent: TableRowActionEvents<ExplorerItem> = { const onItemSelectedEvent: TableRowActionEvents<ExplorerItem> = {
click: (row: ExplorerItem) => null, click: (row: ExplorerItem) => {
setSelectedRow(row)
},
doubleClick: (row: ExplorerItem) => { doubleClick: (row: ExplorerItem) => {
console.log(row); console.log(row);
if (row.type === "FOLDER") { if (row.type === "FOLDER") {
navigateTo(row.path); navigateTo(row.path);
} }
}, },
contextMenu: (row: ExplorerItem) => null contextMenu: (row: ExplorerItem, x: number, y: number) => {
if (row.type === "FOLDER") {
return;
}
dispatch(setContextMenuVisible(true))
dispatch(setContextMenuPosition({x: x, y: y}))
setActionableItems(getContextMenuFileActionMenuItems(row))
}
}
const onContextMenuItemClickedEvent: ContextMenuActionEvent<ExplorerItem> = {
selected:(actionIndex: number | null, value: ExplorerItem | null) => {
switch(actionIndex) {
case 0: {
console.log("All");
break;
}
case 1: {
console.log("Encode")
break;
}
case 2: {
console.log("Extract")
break;
}
case 3: {
console.log("Convert")
break;
}
default: {
}
}
}
} }
const onHomeClick = () => { const onHomeClick = () => {
@ -139,6 +220,7 @@ export default function ExplorePage() {
return ( return (
<>
<Box display="block"> <Box display="block">
<Box sx={{ <Box sx={{
height: 50, height: 50,
@ -176,5 +258,9 @@ export default function ExplorePage() {
<SimpleTable items={cursor?.items ?? []} columns={columns} customizer={createTableCell} onRowClickedEvent={onItemSelectedEvent} /> <SimpleTable items={cursor?.items ?? []} columns={columns} customizer={createTableCell} onRowClickedEvent={onItemSelectedEvent} />
</Box> </Box>
</Box> </Box>
<ContextMenu row={selectedRow} actionItems={actionableItems} onContextMenuItemClicked={onContextMenuItemClickedEvent} />
</>
) )
} }

View File

@ -2,13 +2,15 @@ import { configureStore, ThunkAction, Action } from '@reduxjs/toolkit';
import composedSlice from './store/composed-slice'; import composedSlice from './store/composed-slice';
import explorerSlice from './store/explorer-slice'; import explorerSlice from './store/explorer-slice';
import kafkaItemsFlatSlice from './store/kafka-items-flat-slice'; import kafkaItemsFlatSlice from './store/kafka-items-flat-slice';
import contextMenuSlice from './store/context-menu-slice';
export const store = configureStore({ export const store = configureStore({
reducer: { reducer: {
composed: composedSlice, composed: composedSlice,
explorer: explorerSlice, explorer: explorerSlice,
kafkaComposedFlat: kafkaItemsFlatSlice kafkaComposedFlat: kafkaItemsFlatSlice,
contextMenu: contextMenuSlice
}, },
}); });

View File

@ -1,4 +1,5 @@
import { PayloadAction, createSlice } from "@reduxjs/toolkit" import { PayloadAction, createSlice } from "@reduxjs/toolkit"
import { EventDataObject } from "../../types"
interface ComposedState { interface ComposedState {
items: Array<EventDataObject> items: Array<EventDataObject>

View File

@ -1,4 +1,5 @@
import { PayloadAction, createSlice } from "@reduxjs/toolkit" import { PayloadAction, createSlice } from "@reduxjs/toolkit"
import { SimpleEventDataObject } from "../../types"
interface ComposedState { interface ComposedState {
items: Array<SimpleEventDataObject> items: Array<SimpleEventDataObject>

View File

@ -0,0 +1,9 @@
package no.iktdev.mediaprocessing.shared.common.helper
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
data class DerivedProcessIterationHolder(
val eventId: String,
val event: PersistentProcessDataMessage,
var iterated: Int = 0
)

View File

@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.ResultRow
import java.time.LocalDateTime import java.time.LocalDateTime
@ -19,6 +20,15 @@ fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean {
return this.event == event return this.event == event
} }
fun PersistentMessage.isSuccess(): Boolean {
return try {
this.data.isSuccess()
} catch (e: Exception) {
false
}
}
fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? { fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
val kev = try { val kev = try {
KafkaEvents.toEvent(row[events.event]) KafkaEvents.toEvent(row[events.event])

View File

@ -15,7 +15,11 @@ data class SimpleMessageData(
fun MessageDataWrapper?.isSuccess(): Boolean { fun MessageDataWrapper?.isSuccess(): Boolean {
return this != null && this.status != Status.ERROR return this != null && this.status == Status.COMPLETED
}
fun MessageDataWrapper?.isFailed(): Boolean {
return if (this == null) true else this.status != Status.COMPLETED
} }
fun MessageDataWrapper?.isSkipped(): Boolean { fun MessageDataWrapper?.isSkipped(): Boolean {

View File

@ -11,5 +11,5 @@ data class ConvertWorkPerformed(
override val message: String? = null, override val message: String? = null,
val producedBy: String, val producedBy: String,
val derivedFromEventId: String, val derivedFromEventId: String,
val outFiles: List<String> val outFiles: List<String> = listOf()
): MessageDataWrapper(status, message) ): MessageDataWrapper(status, message)