From 9244a6523557ce38b0a08b446aa2170c738422f7 Mon Sep 17 00:00:00 2001 From: bskjon Date: Wed, 19 Jun 2024 19:56:31 +0200 Subject: [PATCH] Small changes --- .../tasks/event/CreateConvertWorkTask.kt | 6 +- .../processer/ProcesserApplication.kt | 3 - apps/ui/web/src/app/page/ExplorePage.tsx | 3 - apps/ui/web/src/app/store.ts | 4 +- apps/ui/web/src/types.d.ts | 22 ++- .../shared/common/CoordinatorBase.kt | 1 - .../mediaprocessing/shared/common/Utils.kt | 1 - .../common/persistance/PersistentDataStore.kt | 132 ------------------ .../shared/kafka/core/KafkaEvents.kt | 50 +++---- 9 files changed, 51 insertions(+), 171 deletions(-) delete mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt index a1e1f15c..cf3cc60b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt @@ -4,6 +4,7 @@ import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent import no.iktdev.mediaprocessing.shared.common.persistance.lastOf import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.contract.dto.isOnly @@ -31,11 +32,10 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" } - val startedEvent = events.lastOf(KafkaEvents.EventMediaProcessStarted) val startedEventData = events.lastOf(KafkaEvents.EventMediaProcessStarted)?.data?.az() - if (startedEventData?.operations?.isOnly(StartOperationEvents.CONVERT) == true) { + if (event.isOfEvent(KafkaEvents.EventMediaProcessStarted) && startedEventData?.operations?.isOnly(StartOperationEvents.CONVERT) == true) { val subtitleFile = File(startedEventData.file) - return produceConvertWorkRequest(subtitleFile, null, startedEvent?.eventId) + return produceConvertWorkRequest(subtitleFile, null, event.eventId) } else { val derivedInfoObject = if (event.event in requiredEvents) { DerivedInfoObject.fromExtractWorkCreated(event) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt index 0d8edc02..40be7802 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt @@ -6,11 +6,8 @@ import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.observable.Observables import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents -import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation import no.iktdev.mediaprocessing.shared.common.toEventsDatabase import org.jetbrains.exposed.sql.transactions.TransactionManager import org.springframework.boot.autoconfigure.SpringBootApplication diff --git a/apps/ui/web/src/app/page/ExplorePage.tsx b/apps/ui/web/src/app/page/ExplorePage.tsx index 79267044..1d59f6c3 100644 --- a/apps/ui/web/src/app/page/ExplorePage.tsx +++ b/apps/ui/web/src/app/page/ExplorePage.tsx @@ -201,9 +201,6 @@ export default function ExplorePage() { useEffect(() => { - if (cursor) - - // Kjør din funksjon her når komponenten lastes inn for første gang // Sjekk om cursor er null if (cursor.path === null && client !== null) { diff --git a/apps/ui/web/src/app/store.ts b/apps/ui/web/src/app/store.ts index 83ade210..a0b790e3 100644 --- a/apps/ui/web/src/app/store.ts +++ b/apps/ui/web/src/app/store.ts @@ -3,6 +3,7 @@ import composedSlice from './store/composed-slice'; import explorerSlice from './store/explorer-slice'; import kafkaItemsFlatSlice from './store/kafka-items-flat-slice'; import contextMenuSlice from './store/context-menu-slice'; +import persistentEventsSlice from './store/persistent-events-slice'; export const store = configureStore({ @@ -10,7 +11,8 @@ export const store = configureStore({ composed: composedSlice, explorer: explorerSlice, kafkaComposedFlat: kafkaItemsFlatSlice, - contextMenu: contextMenuSlice + contextMenu: contextMenuSlice, + persistentEvents: persistentEventsSlice }, }); diff --git a/apps/ui/web/src/types.d.ts b/apps/ui/web/src/types.d.ts index 1afe602b..78bf141e 100644 --- a/apps/ui/web/src/types.d.ts +++ b/apps/ui/web/src/types.d.ts @@ -57,7 +57,7 @@ enum SimpleEventDataState { FAILED, } - interface SimpleEventDataObject { +interface SimpleEventDataObject { id: string; name?: string | null; path?: string | null; @@ -71,4 +71,22 @@ enum SimpleEventDataState { eventCollected: SimpleEventDataState; encodingProgress?: number | null; encodingTimeLeft?: number | null; - } \ No newline at end of file +} + +interface EventObject { + referenceId: string; + eventId: string; + event: string; + data: string; + created: string; +} + +interface EventsObjectList { + referenceId: string; + events: Array; +} + +interface EventsObjectListResponse { + lastPull: string; + items: Array; +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt index 249cf920..4b151cd1 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt @@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.shared.common import kotlinx.coroutines.* import mu.KotlinLogging import no.iktdev.exfl.coroutines.CoroutinesDefault -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index ba4dbe49..5d17c48c 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.shared.common import kotlinx.coroutines.delay import mu.KotlinLogging import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import java.io.File diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt deleted file mode 100644 index e1c9b1e4..00000000 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt +++ /dev/null @@ -1,132 +0,0 @@ -package no.iktdev.mediaprocessing.shared.common.persistance - -import mu.KotlinLogging -import no.iktdev.mediaprocessing.shared.common.datasource.DataSource -import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException -import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus -import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.Message -import org.jetbrains.exposed.exceptions.ExposedSQLException -import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq -import org.jetbrains.exposed.sql.and -import org.jetbrains.exposed.sql.deleteWhere -import org.jetbrains.exposed.sql.insert -import org.jetbrains.exposed.sql.javatime.CurrentDateTime -import org.jetbrains.exposed.sql.update -import java.sql.SQLIntegrityConstraintViolationException - -private val log = KotlinLogging.logger {} -open class PersistentDataStore(var dataSource: DataSource) { - fun storeEventDataMessage(event: String, message: Message<*>): Boolean { - val exception = executeOrException(dataSource.database) { - events.insert { - it[events.referenceId] = message.referenceId - it[events.eventId] = message.eventId - it[events.event] = event - it[events.data] = message.dataAsJson() - } - } - return if (exception == null) true else { - if (exception.cause is SQLIntegrityConstraintViolationException) { - log.info { "Error is of SQLIntegrityConstraintViolationException" } - try { - log.info { "Error code is: ${ (exception as ExposedSQLException).errorCode}" } - } catch (e: Exception) { - - } - //exception.printStackTrace() - (exception as ExposedSQLException).errorCode == 1062 - } - else { - exception.printStackTrace() - false - } - } - } - - fun deleteStoredEventDataMessage(referenceId: String, eventId: String, event: KafkaEvents): Boolean { - return executeWithStatus(dataSource.database) { - events.deleteWhere { - (events.referenceId eq referenceId) and - (events.eventId eq eventId) and - (events.event eq event.event) - } - } - } - - fun storeProcessDataMessage(event: String, message: Message<*>): Boolean { - val exception = executeOrException(dataSource.database) { - processerEvents.insert { - it[processerEvents.referenceId] = message.referenceId - it[processerEvents.eventId] = message.eventId - it[processerEvents.event] = event - it[processerEvents.data] = message.dataAsJson() - } - } - return if (exception == null) true else { - if (exception.cause is SQLIntegrityConstraintViolationException) { - (exception as ExposedSQLException).errorCode == 1062 - } - else { - exception.printStackTrace() - false - } - } - } - - fun setProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean { - return withTransaction(dataSource.database) { - processerEvents.update({ - (processerEvents.referenceId eq referenceId) and - (processerEvents.eventId eq eventId) and - (processerEvents.claimed eq false) - }) { - it[processerEvents.claimedBy] = claimedBy - it[lastCheckIn] = CurrentDateTime - it[claimed] = true - } - } == 1 - } - - fun setProcessEventCompleted(referenceId: String, eventId: String, claimedBy: String): Boolean { - return withTransaction(dataSource.database) { - processerEvents.update({ - (processerEvents.referenceId eq referenceId) and - (processerEvents.eventId eq eventId) and - (processerEvents.claimedBy eq claimedBy) and - (processerEvents.claimed eq true) - }) { - it[processerEvents.consumed] = true - } - } == 1 - } - - fun updateCurrentProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean { - return executeWithStatus(dataSource.database) { - processerEvents.update({ - (processerEvents.referenceId eq referenceId) and - (processerEvents.eventId eq eventId) and - (processerEvents.claimed eq false) and - (processerEvents.claimedBy eq claimedBy) - }) { - it[lastCheckIn] = CurrentDateTime - } - } - } - - fun releaseProcessEventClaim(referenceId: String, eventId: String): Boolean { - val exception = executeOrException(dataSource.database) { - processerEvents.update({ - (processerEvents.referenceId eq referenceId) and - (processerEvents.eventId eq eventId) - }) { - it[claimedBy] = null - it[lastCheckIn] = null - it[claimed] = false - } - } - return exception == null - } - -} \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index e2c4a000..a1704e0f 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -1,41 +1,41 @@ package no.iktdev.mediaprocessing.shared.kafka.core enum class KafkaEvents(val event: String) { - EventMediaProcessStarted("event:media-process:started"), + EventMediaProcessStarted ("event:media-process:started"), - EventMediaReadStreamPerformed("event:media-read-stream:performed"), - EventMediaParseStreamPerformed("event:media-parse-stream:performed"), - EventMediaReadBaseInfoPerformed("event:media-read-base-info:performed"), - EventMediaMetadataSearchPerformed("event:media-metadata-search:performed"), - EventMediaReadOutNameAndType("event:media-read-out-name-and-type:performed"), - EventMediaReadOutCover("event:media-read-out-cover:performed"), + EventMediaReadStreamPerformed ("event:media-read-stream:performed"), + EventMediaParseStreamPerformed ("event:media-parse-stream:performed"), + EventMediaReadBaseInfoPerformed ("event:media-read-base-info:performed"), + EventMediaMetadataSearchPerformed ("event:media-metadata-search:performed"), + EventMediaReadOutNameAndType ("event:media-read-out-name-and-type:performed"), + EventMediaReadOutCover ("event:media-read-out-cover:performed"), - EventMediaParameterEncodeCreated("event:media-encode-parameter:created"), - EventMediaParameterExtractCreated("event:media-extract-parameter:created"), - EventMediaParameterConvertCreated("event:media-convert-parameter:created"), - EventMediaParameterDownloadCoverCreated("event:media-download-cover-parameter:created"), + EventMediaParameterEncodeCreated ("event:media-encode-parameter:created"), + EventMediaParameterExtractCreated ("event:media-extract-parameter:created"), + EventMediaParameterConvertCreated ("event:media-convert-parameter:created"), + EventMediaParameterDownloadCoverCreated ("event:media-download-cover-parameter:created"), - EventMediaWorkProceedPermitted("event:media-work-proceed:permitted"), + EventMediaWorkProceedPermitted ("event:media-work-proceed:permitted"), EventNotificationOfWorkItemRemoval("event:notification-work-item-removal"), - EventWorkEncodeCreated("event:work-encode:created"), - EventWorkExtractCreated("event:work-extract:created"), - EventWorkConvertCreated("event:work-convert:created"), + EventWorkEncodeCreated ("event:work-encode:created"), + EventWorkExtractCreated ("event:work-extract:created"), + EventWorkConvertCreated ("event:work-convert:created"), - EventWorkEncodePerformed("event:work-encode:performed"), - EventWorkExtractPerformed("event:work-extract:performed"), - EventWorkConvertPerformed("event:work-convert:performed"), - EventWorkDownloadCoverPerformed("event:work-download-cover:performed"), + EventWorkEncodePerformed ("event:work-encode:performed"), + EventWorkExtractPerformed ("event:work-extract:performed"), + EventWorkConvertPerformed ("event:work-convert:performed"), + EventWorkDownloadCoverPerformed ("event:work-download-cover:performed"), - EVENT_STORE_VIDEO_PERFORMED("event:store-video:performed"), - EVENT_STORE_SUBTITLE_PERFORMED("event:store-subtitle:performed"), - EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"), - EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"), + EVENT_STORE_VIDEO_PERFORMED ("event:store-video:performed"), + EVENT_STORE_SUBTITLE_PERFORMED ("event:store-subtitle:performed"), + EVENT_STORE_COVER_PERFORMED ("event:store-cover:performed"), + EVENT_STORE_METADATA_PERFORMED ("event:store-metadata:performed"), - EventMediaProcessCompleted("event:media-process:completed"), - EventCollectAndStore("event::save"), + EventMediaProcessCompleted ("event:media-process:completed"), + EventCollectAndStore ("event::save"), ;