From ce4019a1e6ce203ab9450264d2466b979a3cb83f Mon Sep 17 00:00:00 2001 From: Brage Date: Thu, 28 Mar 2024 04:05:02 +0100 Subject: [PATCH] multi database --- .github/workflows/v2.yml | 5 +- .../converter/ClaimsService.kt | 4 +- .../converter/ConverterApplication.kt | 30 +++- .../converter/ConverterCoordinator.kt | 12 +- .../converter/tasks/ConvertService.kt | 16 +- .../coordinator/Coordinator.kt | 157 +++--------------- .../coordinator/CoordinatorApplication.kt | 40 ++++- .../controller/ActionEventController.kt | 32 ++++ .../controller/RequestEventController.kt | 34 ++++ .../tasks/event/CompleteMediaTask.kt | 5 +- .../mediaprocessing/processer/Coordinator.kt | 17 +- .../processer/ProcesserApplication.kt | 30 +++- .../processer/services/ClaimsService.kt | 6 +- .../processer/services/EncodeService.kt | 22 ++- .../processer/services/ExtractService.kt | 20 +-- .../shared/common/SharedConfig.kt | 27 ++- .../shared/common/datasource/DataSource.kt | 13 +- .../datasource/DatabaseConnectionConfig.kt | 9 + .../common/datasource/MySqlDataSource.kt | 48 ++---- .../datasource/TableDefaultOperations.kt | 25 +-- .../persistance/PersistentDataReader.kt | 21 +-- .../common/persistance/PersistentDataStore.kt | 15 +- .../common/socket/SocketImplementation.kt | 1 - .../shared/contract/dto/ConvertRequest.kt | 4 +- .../shared/contract/dto/Enums.kt | 3 +- .../shared/contract/dto/RequestWorkProceed.kt | 6 + .../shared/contract/dto/Requester.kt | 5 + 27 files changed, 316 insertions(+), 291 deletions(-) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/ActionEventController.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DatabaseConnectionConfig.kt create mode 100644 shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/RequestWorkProceed.kt create mode 100644 shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Requester.kt diff --git a/.github/workflows/v2.yml b/.github/workflows/v2.yml index e43c3555..2dba63d0 100644 --- a/.github/workflows/v2.yml +++ b/.github/workflows/v2.yml @@ -50,7 +50,10 @@ jobs: echo "Shared" echo "shared: ${{ needs.pre-check.outputs.shared }}" - + echo "\n" + echo "${{ needs.pre-check.outputs }}" + echo "${{ needs.pre-check }}" + build-shared: runs-on: ubuntu-latest needs: pre-check diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt index 38f6136f..2bc0331d 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt @@ -18,11 +18,11 @@ class ClaimsService() { @Scheduled(fixedDelay = (300_000)) fun validateClaims() { - val expiredClaims = PersistentDataReader().getExpiredClaimsProcessEvents() + val expiredClaims = persistentReader.getExpiredClaimsProcessEvents() expiredClaims.forEach { log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" } } - val store = PersistentDataStore() + val store = persistentWriter expiredClaims.forEach { val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId) if (result) { diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt index 79ce293d..2a6dfcd6 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt @@ -1,9 +1,11 @@ package no.iktdev.mediaprocessing.converter -import kotlinx.coroutines.launch -import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents +import no.iktdev.mediaprocessing.shared.common.toEventsDatabase import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication import org.springframework.context.ApplicationContext @@ -16,14 +18,24 @@ private var context: ApplicationContext? = null fun getContext(): ApplicationContext? { return context } + + +lateinit var persistentReader: PersistentDataReader +lateinit var persistentWriter: PersistentDataStore + +private lateinit var eventsDatabase: MySqlDataSource +fun getEventsDatabase(): MySqlDataSource { + return eventsDatabase +} + fun main(args: Array) { - val dataSource = MySqlDataSource.fromDatabaseEnv() - Coroutines.default().launch { - dataSource.createDatabase() - dataSource.createTables( - processerEvents - ) - } + eventsDatabase = DatabaseEnvConfig.toEventsDatabase() + eventsDatabase.createDatabase() + eventsDatabase.createTables(processerEvents) + + persistentReader = PersistentDataReader(eventsDatabase) + persistentWriter = PersistentDataStore(eventsDatabase) + context = runApplication(*args) } //private val logger = KotlinLogging.logger {} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt index 997ddf8a..c9d949f1 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt @@ -5,11 +5,7 @@ import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener -import no.iktdev.mediaprocessing.converter.flow.EventBasedProcessMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase -import no.iktdev.mediaprocessing.shared.common.DatabaseConfig -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord @@ -45,9 +41,9 @@ class ConverterCoordinator() : CoordinatorBase>) { if (event.key == KafkaEvents.EVENT_WORK_CONVERT_CREATED) { - val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value) + val success = persistentWriter.storeProcessDataMessage(event.key.event, event.value) if (!success) { - log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}!" } + log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}!" } } else { readAllMessagesFor(event.value.referenceId, event.value.eventId) } @@ -59,7 +55,7 @@ class ConverterCoordinator() : CoordinatorBase>) { - val success = PersistentDataStore().storeEventDataMessage(event.key.event, event.value) + val success = persistentWriter.storeEventDataMessage(event.key.event, event.value) if (!success) { - log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" } + log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().config.databaseName}" } } else { io.launch { delay(500) // Give the database a few sec to update @@ -52,17 +52,6 @@ class Coordinator() : CoordinatorBase = listOf( + ProcessStartOperationEvents.ENCODE, + ProcessStartOperationEvents.EXTRACT, + ProcessStartOperationEvents.CONVERT + ) + startProcess(file, type, operations) + } + + fun startProcess(file: File, type: ProcessType, operations: List) { val processStartEvent = MediaProcessStarted( status = Status.COMPLETED, file = file.absolutePath, type = type ) producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, processStartEvent) + } public fun startRequestProcess(file: File, operations: List): UUID { @@ -91,8 +90,13 @@ class Coordinator() : CoordinatorBase): Boolean { - return messages.filter { forwardOnEventReceived.contains(it.event) && it.data.isSuccess() }.map { it.event } - .isNotEmpty() - } - - fun isMissingEncodeWorkCreated(messages: List): PersistentMessage? { - val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED } - return if (existingWorkEncodeCreated.isEmpty() && existingWorkEncodeCreated.none { it.data.isSuccess() }) { - messages.lastOrNull { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED } - } else null - } - - fun isMissingExtractWorkCreated(messages: List): PersistentMessage? { - val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED } - return if (existingWorkCreated.isEmpty() && existingWorkCreated.none { it.data.isSuccess() }) { - messages.lastOrNull { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED } - } else null - } - - - fun produceAllMissingProcesserEvents( - producer: CoordinatorProducer, - messages: List - ) { - val missingEncode = isMissingEncodeWorkCreated(messages) - val missingExtract = isMissingExtractWorkCreated(messages) - - if (missingEncode != null && missingEncode.data.isSuccess()) { - produceEncodeWork(producer, missingEncode) - } - if (missingExtract != null && missingExtract.data.isSuccess()) { - produceExtractWork(producer, missingExtract) - - } - } - - - fun produceEncodeWork(producer: CoordinatorProducer, message: PersistentMessage) { - if (message.event != KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) { - throw RuntimeException("Incorrect event passed ${message.event}") - } - if (message.data !is FfmpegWorkerArgumentsCreated) { - throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}") - } - val data = message.data as FfmpegWorkerArgumentsCreated - data.entries.forEach { - FfmpegWorkRequestCreated( - status = Status.COMPLETED, - inputFile = data.inputFile, - arguments = it.arguments, - outFile = it.outputFile - ).let { createdRequest -> - producer.sendMessage( - message.referenceId, - KafkaEvents.EVENT_WORK_ENCODE_CREATED, - eventId = message.eventId, - createdRequest - ) - } - } - } - - fun produceExtractWork(producer: CoordinatorProducer, message: PersistentMessage) { - if (message.event != KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) { - throw RuntimeException("Incorrect event passed ${message.event}") - } - if (message.data !is FfmpegWorkerArgumentsCreated) { - throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}") - } - val data = message.data as FfmpegWorkerArgumentsCreated - data.entries.forEach { - FfmpegWorkRequestCreated( - status = Status.COMPLETED, - inputFile = data.inputFile, - arguments = it.arguments, - outFile = it.outputFile - ).let { createdRequest -> - producer.sendMessage( - message.referenceId, - KafkaEvents.EVENT_WORK_EXTRACT_CREATED, - eventId = message.eventId, - createdRequest - ) - } - val outFile = File(it.outputFile) - ConvertWorkerRequest( - status = Status.COMPLETED, - requiresEventId = message.eventId, - inputFile = it.outputFile, - true, - outFileBaseName = outFile.nameWithoutExtension, - outDirectory = outFile.parentFile.absolutePath - ).let { createdRequest -> - producer.sendMessage( - message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED, - createdRequest - ) - } - } - } - }*/ } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt index 258d819b..d22ef220 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt @@ -4,10 +4,14 @@ package no.iktdev.mediaprocessing.coordinator import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.observable.Observables -import no.iktdev.mediaprocessing.shared.common.DatabaseConfig +import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.events +import no.iktdev.mediaprocessing.shared.common.toEventsDatabase +import no.iktdev.mediaprocessing.shared.common.toStoredDatabase import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.streamit.library.db.tables.* import no.iktdev.streamit.library.db.tables.helper.cast_errors @@ -26,26 +30,43 @@ class CoordinatorApplication { } private var context: ApplicationContext? = null +private lateinit var storeDatabase: MySqlDataSource @Suppress("unused") fun getContext(): ApplicationContext? { return context } +fun getStoreDatabase(): MySqlDataSource { + return storeDatabase +} + +private lateinit var eventsDatabase: MySqlDataSource +fun getEventsDatabase(): MySqlDataSource { + return eventsDatabase +} + +lateinit var persistentReader: PersistentDataReader +lateinit var persistentWriter: PersistentDataStore + fun main(args: Array) { Coroutines.addListener(listener = object: Observables.ObservableValue.ValueListener { override fun onUpdated(value: Throwable) { value.printStackTrace() } }) - val dataSource = MySqlDataSource.fromDatabaseEnv(); - dataSource.createDatabase() + + eventsDatabase = DatabaseEnvConfig.toEventsDatabase() + storeDatabase = DatabaseEnvConfig.toStoredDatabase() + + eventsDatabase.createDatabase() + storeDatabase.createDatabase() val kafkaTables = listOf( events, // For kafka ) - dataSource.createTables(*kafkaTables.toTypedArray()) + eventsDatabase.createTables(*kafkaTables.toTypedArray()) val tables = arrayOf( catalog, @@ -60,9 +81,10 @@ fun main(args: Array) { data_video, cast_errors ) - transaction { - SchemaUtils.createMissingTablesAndColumns(*tables) - } + storeDatabase.createTables(*tables) + + persistentReader = PersistentDataReader(eventsDatabase) + persistentWriter = PersistentDataStore(eventsDatabase) context = runApplication(*args) printSharedConfig() @@ -75,7 +97,7 @@ fun printSharedConfig() { log.info { "Ffprobe: ${SharedConfig.ffprobe}" } log.info { "Ffmpeg: ${SharedConfig.ffmpeg}" } - log.info { "Database: ${DatabaseConfig.database} @ ${DatabaseConfig.address}:${DatabaseConfig.port}" } + /*log.info { "Database: ${DatabaseConfig.database} @ ${DatabaseConfig.address}:${DatabaseConfig.port}" } log.info { "Username: ${DatabaseConfig.username}" } - log.info { "Password: ${if (DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" } + log.info { "Password: ${if (DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" }*/ } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/ActionEventController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/ActionEventController.kt new file mode 100644 index 00000000..8da0a096 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/ActionEventController.kt @@ -0,0 +1,32 @@ +package no.iktdev.mediaprocessing.coordinator.controller + +import com.google.gson.Gson +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.persistentReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.contract.dto.RequestWorkProceed +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.stereotype.Controller +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping + +@Controller +@RequestMapping(path = ["/action"]) +class ActionEventController(@Autowired var coordinator: Coordinator) { + + + @RequestMapping("/flow/proceed") + fun permitRunOnSequence(@RequestBody data: RequestWorkProceed): ResponseEntity { + + val set = persistentReader.getMessagesFor(data.referenceId) + if (set.isEmpty()) { + return ResponseEntity.status(HttpStatus.NO_CONTENT).body(Gson().toJson(data)) + } + coordinator.permitWorkToProceedOn(data.referenceId, "Requested by ${data.source}") + + //EVENT_MEDIA_WORK_PROCEED_PERMITTED("event:media-work-proceed:permitted") + return ResponseEntity.ok(null) + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt index a43b23c0..0b587d96 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt @@ -2,7 +2,9 @@ package no.iktdev.mediaprocessing.coordinator.controller import com.google.gson.Gson import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.dto.ConvertRequest +import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.HttpStatus @@ -33,4 +35,36 @@ class RequestEventController(@Autowired var coordinator: Coordinator) { } return ResponseEntity.ok(null) } + + @PostMapping("/extract") + @ResponseStatus(HttpStatus.OK) + fun requestExtract(@RequestBody selectedFile: String): ResponseEntity { + try { + val file = File(selectedFile) + if (!file.exists()) { + return ResponseEntity.status(HttpStatus.NO_CONTENT).body(selectedFile) + } + coordinator.startProcess(file, ProcessType.MANUAL, listOf(ProcessStartOperationEvents.EXTRACT)) + + } catch (e: Exception) { + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(selectedFile) + } + return ResponseEntity.ok(null) + } + + @PostMapping("/all") + @ResponseStatus(HttpStatus.OK) + fun requestAll(@RequestBody selectedFile: String): ResponseEntity { + try { + val file = File(selectedFile) + if (!file.exists()) { + return ResponseEntity.status(HttpStatus.NO_CONTENT).body(selectedFile) + } + coordinator.startProcess(file, type = ProcessType.MANUAL) + + } catch (e: Exception) { + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(selectedFile) + } + return ResponseEntity.ok(null) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index ce00d43b..e05f5e3e 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -37,7 +37,6 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task } val receivedEvents = events.map { it.event } - // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event. val requiresOneOf = listOf( @@ -47,9 +46,9 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task ) if (requiresOneOf.none { it in receivedEvents }) { - val missing = requiresOneOf.filter { !receivedEvents.contains(it) } + val missing = requiresOneOf.subtract(receivedEvents.toSet()) log.info { "Can't complete at this moment. Missing required event(s)\n\t" + missing.joinToString("\n\t") } - return null //SimpleMessageData(Status.SKIPPED, "Can't collect at this moment. Missing required event") + return null } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt index 08bf2eed..21b116d1 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -6,22 +6,13 @@ import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase -import no.iktdev.mediaprocessing.shared.common.DatabaseConfig -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage -import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer -import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper -import org.springframework.beans.factory.annotation.Autowired import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.stereotype.Service -import javax.annotation.PostConstruct @Service @EnableScheduling @@ -52,9 +43,9 @@ class Coordinator(): CoordinatorBase) { - dataSource = MySqlDataSource.fromDatabaseEnv() - dataSource.createDatabase() - dataSource.createTables( - processerEvents - ) + eventsDatabase = DatabaseEnvConfig.toEventsDatabase() + eventsDatabase.createDatabase() + eventsDatabase.createTables(processerEvents) + + persistentReader = PersistentDataReader(eventsDatabase) + persistentWriter = PersistentDataStore(eventsDatabase) + val context = runApplication(*args) } @@ -37,7 +49,7 @@ class DatabaseReconnect() { fun checkIfConnected() { if (TransactionManager.currentOrNull() == null) { lostConnectionCount++ - dataSource.toDatabase() + eventsDatabase.toDatabase() } } } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt index 4bae0be2..c30a5960 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt @@ -2,6 +2,8 @@ package no.iktdev.mediaprocessing.processer.services import mu.KotlinLogging import no.iktdev.mediaprocessing.processer.Coordinator +import no.iktdev.mediaprocessing.processer.persistentReader +import no.iktdev.mediaprocessing.processer.persistentWriter import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import org.springframework.beans.factory.annotation.Autowired @@ -19,11 +21,11 @@ class ClaimsService() { @Scheduled(fixedDelay = (300_000)) fun validateClaims() { - val expiredClaims = PersistentDataReader().getExpiredClaimsProcessEvents() + val expiredClaims = persistentReader.getExpiredClaimsProcessEvents() expiredClaims.forEach { log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" } } - val store = PersistentDataStore() + val store = persistentWriter expiredClaims.forEach { val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId) if (result) { diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index ef2cf64c..80e35e0e 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -3,8 +3,7 @@ package no.iktdev.mediaprocessing.processer.services import kotlinx.coroutines.* import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.processer.Coordinator -import no.iktdev.mediaprocessing.processer.TaskCreator +import no.iktdev.mediaprocessing.processer.* import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents @@ -14,7 +13,6 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessData import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated -import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.Status @@ -58,7 +56,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}") } - val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) if (isAlreadyClaimed) { log.warn { "Process is already claimed!" } return null @@ -80,14 +78,14 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat logDir.mkdirs() } - val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) + val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} encode" } runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") // Setting consumed to prevent spamming - PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) return } runnerJob = scope.launch { @@ -107,13 +105,13 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat return } log.info { "Encode started for ${runner.referenceId}" } - PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) + persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) sendProgress(info, null, false) scope.launch { while (runnerJob?.isActive == true) { delay(java.time.Duration.ofMinutes(5).toMillis()) - PersistentDataStore().updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, serviceId) + persistentWriter.updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, serviceId) } } } @@ -125,18 +123,18 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat return } log.info { "Encode completed for ${runner.referenceId}" } - val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) + val consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) runBlocking { delay(1000) if (!consumedIsSuccessful) { - PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) + persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) } delay(1000) - var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) + var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) while (!readbackIsSuccess) { delay(1000) - readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) + readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) } producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, data = ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index 5cb3e19e..d796b845 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -3,8 +3,7 @@ package no.iktdev.mediaprocessing.processer.services import kotlinx.coroutines.* import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.processer.Coordinator -import no.iktdev.mediaprocessing.processer.TaskCreator +import no.iktdev.mediaprocessing.processer.* import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents @@ -15,7 +14,6 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessData import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated -import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.Status @@ -61,7 +59,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}") } - val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) if (isAlreadyClaimed) { log.warn { "Process is already claimed!" } return null @@ -84,7 +82,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea } - val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) + val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} extract" } runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents) @@ -92,7 +90,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") // Setting consumed to prevent spamming - PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) return } runnerJob = scope.launch { @@ -112,7 +110,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea return } log.info { "Extract started for ${runner.referenceId}" } - PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) + persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) sendState(info, false) } @@ -123,12 +121,12 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea return } log.info { "Extract completed for ${runner.referenceId}" } - var consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) + var consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) runBlocking { delay(1000) limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) { - consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) + consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId) } log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" } @@ -136,9 +134,9 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea - var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) + var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) { - readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) + readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId) log.info { readbackIsSuccess } } log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt index e831a2f0..15446170 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt @@ -1,5 +1,7 @@ package no.iktdev.mediaprocessing.shared.common +import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig +import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import java.io.File object SharedConfig { @@ -12,10 +14,31 @@ object SharedConfig { val preference: File = File("/data/config/preference.json") } -object DatabaseConfig { +object DatabaseEnvConfig { val address: String? = System.getenv("DATABASE_ADDRESS") val port: String? = System.getenv("DATABASE_PORT") val username: String? = System.getenv("DATABASE_USERNAME") val password: String? = System.getenv("DATABASE_PASSWORD") - val database: String? = System.getenv("DATABASE_NAME") + val eventBasedDatabase: String? = System.getenv("DATABASE_NAME_E") + val storedDatabase: String? = System.getenv("DATABASE_NAME_S") +} + +fun DatabaseEnvConfig.toStoredDatabase(): MySqlDataSource { + return MySqlDataSource(DatabaseConnectionConfig( + databaseName = this.storedDatabase ?: "streamit", + address = this.address ?: "localhost", + port = this.port, + username = this.username ?: "root", + password = this.password ?: "" + )) +} + +fun DatabaseEnvConfig.toEventsDatabase(): MySqlDataSource { + return MySqlDataSource(DatabaseConnectionConfig( + databaseName = this.eventBasedDatabase ?: "persistentEvents", + address = this.address ?: "localhost", + port = this.port, + username = this.username ?: "root", + password = this.password ?: "" + )) } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt index b737135b..24efc90f 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt @@ -7,7 +7,10 @@ import java.time.LocalDateTime import java.time.ZoneId import java.time.ZoneOffset -abstract class DataSource(val databaseName: String, val address: String, val port: String?, val username: String, val password: String) { +abstract class DataSource(val config: DatabaseConnectionConfig) { + open var database: Database? = null + + abstract fun connect() abstract fun createDatabase(): Database? @@ -18,11 +21,13 @@ abstract class DataSource(val databaseName: String, val address: String, val por abstract fun toConnectionUrl(): String fun toPortedAddress(): String { - return if (!address.contains(":") && port?.isBlank() != true) { - "$address:$port" - } else address + return if (!config.address.contains(":") && config.port?.isBlank() != true) { + "$config.address:$config.port" + } else config.address } + abstract fun toDatabase(): Database + } fun timestampToLocalDateTime(timestamp: Int): LocalDateTime { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DatabaseConnectionConfig.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DatabaseConnectionConfig.kt new file mode 100644 index 00000000..183d431b --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DatabaseConnectionConfig.kt @@ -0,0 +1,9 @@ +package no.iktdev.mediaprocessing.shared.common.datasource + +data class DatabaseConnectionConfig( + val address: String, + val port: String?, + val username: String, + val password: String, + val databaseName: String +) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt index cb901db5..243d39a5 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt @@ -1,7 +1,6 @@ package no.iktdev.mediaprocessing.shared.common.datasource import mu.KotlinLogging -import no.iktdev.mediaprocessing.shared.common.DatabaseConfig import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.Table @@ -9,30 +8,16 @@ import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction -open class MySqlDataSource(databaseName: String, address: String, port: String = "", username: String, password: String): DataSource(databaseName = databaseName, address = address, port = port, username = username, password = password) { +open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) { val log = KotlinLogging.logger {} - var database: Database? = null - private set - - companion object { - fun fromDatabaseEnv(): MySqlDataSource { - if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'") - if (DatabaseConfig.username.isNullOrBlank()) throw RuntimeException("Database username is not defined in 'DATABASE_USERNAME'") - if (DatabaseConfig.address.isNullOrBlank()) throw RuntimeException("Database address is not defined in 'DATABASE_ADDRESS'") - return MySqlDataSource( - databaseName = DatabaseConfig.database, - address = DatabaseConfig.address, - port = DatabaseConfig.port ?: "", - username = DatabaseConfig.username, - password = DatabaseConfig.password ?: "" - ) - } + override fun connect() { + this.toDatabase() } override fun createDatabase(): Database? { val ok = transaction(toDatabaseServerConnection()) { val tmc = TransactionManager.current().connection - val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '$databaseName'" + val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.databaseName}'" val stmt = tmc.prepareStatement(query, true) val resultSet = stmt.executeQuery() @@ -41,14 +26,14 @@ open class MySqlDataSource(databaseName: String, address: String, port: String = if (!databaseExists) { try { exec(createDatabaseStatement()) - log.info { "Database $databaseName created." } + log.info { "Database ${config.databaseName} created." } true } catch (e: Exception) { e.printStackTrace() false } } else { - log.info { "Database $databaseName already exists." } + log.info { "Database ${config.databaseName} already exists." } true } } @@ -60,32 +45,33 @@ open class MySqlDataSource(databaseName: String, address: String, port: String = } override fun createTables(vararg tables: Table) { - transaction { + transaction(this.database) { SchemaUtils.createMissingTablesAndColumns(*tables) log.info { "Database transaction completed" } } } override fun createDatabaseStatement(): String { - return "CREATE DATABASE $databaseName" + return "CREATE DATABASE ${config.databaseName}" } protected fun toDatabaseServerConnection(): Database { database = Database.connect( toConnectionUrl(), - user = username, - password = password + user = config.username, + password = config.password ) return database!! } - fun toDatabase(): Database { - database = Database.connect( - "${toConnectionUrl()}/$databaseName", - user = username, - password = password + override fun toDatabase(): Database { + val database = Database.connect( + "${toConnectionUrl()}/${config.databaseName}", + user = config.username, + password = config.password ) - return database!! + this.database = database + return database } override fun toConnectionUrl(): String { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt index 4d3d71a5..cbd2c6be 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.shared.common.datasource +import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Table import org.jetbrains.exposed.sql.transactions.transaction @@ -9,9 +10,9 @@ open class TableDefaultOperations { } -fun withDirtyRead(block: () -> T): T? { +fun withDirtyRead(db: Database? = null, block: () -> T): T? { return try { - transaction(transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED) { + transaction(db = db, transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED) { try { block() } catch (e: Exception) { @@ -28,9 +29,9 @@ fun withDirtyRead(block: () -> T): T? { } -fun withTransaction(block: () -> T): T? { +fun withTransaction(db: Database? = null, block: () -> T): T? { return try { - transaction { + transaction(db) { try { block() } catch (e: Exception) { @@ -46,9 +47,9 @@ fun withTransaction(block: () -> T): T? { } } -fun insertWithSuccess(block: () -> T): Boolean { +fun insertWithSuccess(db: Database? = null, block: () -> T): Boolean { return try { - transaction { + transaction(db) { try { block() commit() @@ -65,9 +66,9 @@ fun insertWithSuccess(block: () -> T): Boolean { } } -fun executeOrException(rollbackOnFailure: Boolean = false, block: () -> T): Exception? { +fun executeOrException(db: Database? = null, rollbackOnFailure: Boolean = false, block: () -> T): Exception? { return try { - transaction { + transaction(db) { try { block() commit() @@ -86,9 +87,9 @@ fun executeOrException(rollbackOnFailure: Boolean = false, block: () -> T): } } -fun executeWithResult(block: () -> T): Pair { +fun executeWithResult(db: Database? = null, block: () -> T): Pair { return try { - transaction { + transaction(db) { try { val res = block() commit() @@ -105,9 +106,9 @@ fun executeWithResult(block: () -> T): Pair { } } -fun executeWithStatus(block: () -> T): Boolean { +fun executeWithStatus(db: Database? = null, block: () -> T): Boolean { return try { - transaction { + transaction(db) { try { block() commit() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt index 5ce7ffdb..c9442016 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.shared.common.persistance +import no.iktdev.mediaprocessing.shared.common.datasource.DataSource import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry @@ -7,11 +8,11 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import org.jetbrains.exposed.sql.* import java.time.LocalDateTime -class PersistentDataReader { +class PersistentDataReader(var dataSource: DataSource) { val dzz = DeserializingRegistry() fun getAllMessages(): List> { - val events = withTransaction { + val events = withTransaction(dataSource.database) { events.selectAll() .groupBy { it[events.referenceId] } } @@ -19,7 +20,7 @@ class PersistentDataReader { } fun getMessagesFor(referenceId: String): List { - return withTransaction { + return withTransaction(dataSource.database) { events.select { events.referenceId eq referenceId } .orderBy(events.created, SortOrder.ASC) .mapNotNull { fromRowToPersistentMessage(it, dzz) } @@ -27,7 +28,7 @@ class PersistentDataReader { } fun getUncompletedMessages(): List> { - val result = withDirtyRead { + val result = withDirtyRead(dataSource.database) { events.selectAll() .andWhere { events.event neq KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED.event } .groupBy { it[events.referenceId] } @@ -37,7 +38,7 @@ class PersistentDataReader { } fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean { - val result = withDirtyRead { + val result = withDirtyRead(dataSource.database) { processerEvents.select { (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) @@ -47,7 +48,7 @@ class PersistentDataReader { } fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean { - return withDirtyRead { + return withDirtyRead(dataSource.database) { processerEvents.select { (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) and @@ -57,7 +58,7 @@ class PersistentDataReader { } fun getAvailableProcessEvents(): List { - return withDirtyRead { + return withDirtyRead(dataSource.database) { processerEvents.select { (processerEvents.claimed eq false) and (processerEvents.consumed eq false) @@ -67,7 +68,7 @@ class PersistentDataReader { fun getExpiredClaimsProcessEvents(): List { val deadline = LocalDateTime.now() - val entries = withTransaction { + val entries = withTransaction(dataSource.database) { processerEvents.select { (processerEvents.claimed eq true) and (processerEvents.consumed neq true) @@ -77,7 +78,7 @@ class PersistentDataReader { } fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? { - val message = withDirtyRead { + val message = withDirtyRead(dataSource.database) { processerEvents.select { (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) @@ -87,7 +88,7 @@ class PersistentDataReader { } fun getProcessEvents(): List { - return withTransaction { + return withTransaction(dataSource.database) { processerEvents.selectAll() .mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } } ?: emptyList() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt index ec278777..364c9da5 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.shared.common.persistance import mu.KotlinLogging +import no.iktdev.mediaprocessing.shared.common.datasource.DataSource import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction @@ -13,9 +14,9 @@ import org.jetbrains.exposed.sql.update import java.sql.SQLIntegrityConstraintViolationException private val log = KotlinLogging.logger {} -open class PersistentDataStore { +open class PersistentDataStore(var dataSource: DataSource) { fun storeEventDataMessage(event: String, message: Message<*>): Boolean { - val exception = executeOrException { + val exception = executeOrException(dataSource.database) { events.insert { it[events.referenceId] = message.referenceId it[events.eventId] = message.eventId @@ -42,7 +43,7 @@ open class PersistentDataStore { } fun storeProcessDataMessage(event: String, message: Message<*>): Boolean { - val exception = executeOrException { + val exception = executeOrException(dataSource.database) { processerEvents.insert { it[processerEvents.referenceId] = message.referenceId it[processerEvents.eventId] = message.eventId @@ -62,7 +63,7 @@ open class PersistentDataStore { } fun setProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean { - return withTransaction { + return withTransaction(dataSource.database) { processerEvents.update({ (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) and @@ -76,7 +77,7 @@ open class PersistentDataStore { } fun setProcessEventCompleted(referenceId: String, eventId: String, claimedBy: String): Boolean { - return withTransaction { + return withTransaction(dataSource.database) { processerEvents.update({ (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) and @@ -89,7 +90,7 @@ open class PersistentDataStore { } fun updateCurrentProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean { - return executeWithStatus { + return executeWithStatus(dataSource.database) { processerEvents.update({ (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) and @@ -102,7 +103,7 @@ open class PersistentDataStore { } fun releaseProcessEventClaim(referenceId: String, eventId: String): Boolean { - val exception = executeOrException { + val exception = executeOrException(dataSource.database) { processerEvents.update({ (processerEvents.referenceId eq referenceId) and (processerEvents.eventId eq eventId) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt index f96296e1..1f491212 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt @@ -6,7 +6,6 @@ import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBr import org.springframework.web.socket.config.annotation.StompEndpointRegistry import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer -@Configuration @EnableWebSocketMessageBroker open class SocketImplementation: WebSocketMessageBrokerConfigurer { diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConvertRequest.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConvertRequest.kt index 4dafcf7d..6c8aed58 100644 --- a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConvertRequest.kt +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConvertRequest.kt @@ -2,5 +2,5 @@ package no.iktdev.mediaprocessing.shared.contract.dto data class ConvertRequest( val file: String, // FullPath - val formats: List -) \ No newline at end of file + override val source: String +): Requester() \ No newline at end of file diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt index 562f1d73..fc2aaed5 100644 --- a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt @@ -15,5 +15,6 @@ enum class ProcessStartOperationEvents { } enum class RequestStartOperationEvents { - CONVERT + CONVERT, + EXTRACT, } \ No newline at end of file diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/RequestWorkProceed.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/RequestWorkProceed.kt new file mode 100644 index 00000000..554ac47f --- /dev/null +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/RequestWorkProceed.kt @@ -0,0 +1,6 @@ +package no.iktdev.mediaprocessing.shared.contract.dto + +data class RequestWorkProceed( + val referenceId: String, + override val source: String +): Requester() diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Requester.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Requester.kt new file mode 100644 index 00000000..9ad41197 --- /dev/null +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Requester.kt @@ -0,0 +1,5 @@ +package no.iktdev.mediaprocessing.shared.contract.dto + +abstract class Requester { + abstract val source: String +} \ No newline at end of file