Small changes

This commit is contained in:
bskjon 2024-06-19 19:56:31 +02:00
parent 8c71df05b9
commit 9244a65235
9 changed files with 51 additions and 171 deletions

View File

@ -4,6 +4,7 @@ import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.common.persistance.lastOf
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.isOnly
@ -31,11 +32,10 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" }
val startedEvent = events.lastOf(KafkaEvents.EventMediaProcessStarted)
val startedEventData = events.lastOf(KafkaEvents.EventMediaProcessStarted)?.data?.az<MediaProcessStarted>()
if (startedEventData?.operations?.isOnly(StartOperationEvents.CONVERT) == true) {
if (event.isOfEvent(KafkaEvents.EventMediaProcessStarted) && startedEventData?.operations?.isOnly(StartOperationEvents.CONVERT) == true) {
val subtitleFile = File(startedEventData.file)
return produceConvertWorkRequest(subtitleFile, null, startedEvent?.eventId)
return produceConvertWorkRequest(subtitleFile, null, event.eventId)
} else {
val derivedInfoObject = if (event.event in requiredEvents) {
DerivedInfoObject.fromExtractWorkCreated(event)

View File

@ -6,11 +6,8 @@ import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.springframework.boot.autoconfigure.SpringBootApplication

View File

@ -201,9 +201,6 @@ export default function ExplorePage() {
useEffect(() => {
if (cursor)
// Kjør din funksjon her når komponenten lastes inn for første gang
// Sjekk om cursor er null
if (cursor.path === null && client !== null) {

View File

@ -3,6 +3,7 @@ import composedSlice from './store/composed-slice';
import explorerSlice from './store/explorer-slice';
import kafkaItemsFlatSlice from './store/kafka-items-flat-slice';
import contextMenuSlice from './store/context-menu-slice';
import persistentEventsSlice from './store/persistent-events-slice';
export const store = configureStore({
@ -10,7 +11,8 @@ export const store = configureStore({
composed: composedSlice,
explorer: explorerSlice,
kafkaComposedFlat: kafkaItemsFlatSlice,
contextMenu: contextMenuSlice
contextMenu: contextMenuSlice,
persistentEvents: persistentEventsSlice
},
});

View File

@ -72,3 +72,21 @@ enum SimpleEventDataState {
encodingProgress?: number | null;
encodingTimeLeft?: number | null;
}
interface EventObject {
referenceId: string;
eventId: string;
event: string;
data: string;
created: string;
}
interface EventsObjectList {
referenceId: string;
events: Array<EventObject>;
}
interface EventsObjectListResponse {
lastPull: string;
items: Array<EventsObjectList>;
}

View File

@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.shared.common
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer

View File

@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.shared.common
import kotlinx.coroutines.delay
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import java.io.File

View File

@ -1,132 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.deleteWhere
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import org.jetbrains.exposed.sql.update
import java.sql.SQLIntegrityConstraintViolationException
private val log = KotlinLogging.logger {}
open class PersistentDataStore(var dataSource: DataSource) {
fun storeEventDataMessage(event: String, message: Message<*>): Boolean {
val exception = executeOrException(dataSource.database) {
events.insert {
it[events.referenceId] = message.referenceId
it[events.eventId] = message.eventId
it[events.event] = event
it[events.data] = message.dataAsJson()
}
}
return if (exception == null) true else {
if (exception.cause is SQLIntegrityConstraintViolationException) {
log.info { "Error is of SQLIntegrityConstraintViolationException" }
try {
log.info { "Error code is: ${ (exception as ExposedSQLException).errorCode}" }
} catch (e: Exception) {
}
//exception.printStackTrace()
(exception as ExposedSQLException).errorCode == 1062
}
else {
exception.printStackTrace()
false
}
}
}
fun deleteStoredEventDataMessage(referenceId: String, eventId: String, event: KafkaEvents): Boolean {
return executeWithStatus(dataSource.database) {
events.deleteWhere {
(events.referenceId eq referenceId) and
(events.eventId eq eventId) and
(events.event eq event.event)
}
}
}
fun storeProcessDataMessage(event: String, message: Message<*>): Boolean {
val exception = executeOrException(dataSource.database) {
processerEvents.insert {
it[processerEvents.referenceId] = message.referenceId
it[processerEvents.eventId] = message.eventId
it[processerEvents.event] = event
it[processerEvents.data] = message.dataAsJson()
}
}
return if (exception == null) true else {
if (exception.cause is SQLIntegrityConstraintViolationException) {
(exception as ExposedSQLException).errorCode == 1062
}
else {
exception.printStackTrace()
false
}
}
}
fun setProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withTransaction(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimed eq false)
}) {
it[processerEvents.claimedBy] = claimedBy
it[lastCheckIn] = CurrentDateTime
it[claimed] = true
}
} == 1
}
fun setProcessEventCompleted(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withTransaction(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimedBy eq claimedBy) and
(processerEvents.claimed eq true)
}) {
it[processerEvents.consumed] = true
}
} == 1
}
fun updateCurrentProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean {
return executeWithStatus(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimed eq false) and
(processerEvents.claimedBy eq claimedBy)
}) {
it[lastCheckIn] = CurrentDateTime
}
}
}
fun releaseProcessEventClaim(referenceId: String, eventId: String): Boolean {
val exception = executeOrException(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}) {
it[claimedBy] = null
it[lastCheckIn] = null
it[claimed] = false
}
}
return exception == null
}
}