multi database

This commit is contained in:
Brage 2024-03-28 04:05:02 +01:00
parent 0c2a6f3c1c
commit ce4019a1e6
27 changed files with 316 additions and 291 deletions

View File

@ -50,7 +50,10 @@ jobs:
echo "Shared"
echo "shared: ${{ needs.pre-check.outputs.shared }}"
echo "\n"
echo "${{ needs.pre-check.outputs }}"
echo "${{ needs.pre-check }}"
build-shared:
runs-on: ubuntu-latest
needs: pre-check

View File

@ -18,11 +18,11 @@ class ClaimsService() {
@Scheduled(fixedDelay = (300_000))
fun validateClaims() {
val expiredClaims = PersistentDataReader().getExpiredClaimsProcessEvents()
val expiredClaims = persistentReader.getExpiredClaimsProcessEvents()
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
}
val store = PersistentDataStore()
val store = persistentWriter
expiredClaims.forEach {
val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {

View File

@ -1,9 +1,11 @@
package no.iktdev.mediaprocessing.converter
import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.ApplicationContext
@ -16,14 +18,24 @@ private var context: ApplicationContext? = null
fun getContext(): ApplicationContext? {
return context
}
lateinit var persistentReader: PersistentDataReader
lateinit var persistentWriter: PersistentDataStore
private lateinit var eventsDatabase: MySqlDataSource
fun getEventsDatabase(): MySqlDataSource {
return eventsDatabase
}
fun main(args: Array<String>) {
val dataSource = MySqlDataSource.fromDatabaseEnv()
Coroutines.default().launch {
dataSource.createDatabase()
dataSource.createTables(
processerEvents
)
}
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
eventsDatabase.createDatabase()
eventsDatabase.createTables(processerEvents)
persistentReader = PersistentDataReader(eventsDatabase)
persistentWriter = PersistentDataStore(eventsDatabase)
context = runApplication<ConvertApplication>(*args)
}
//private val logger = KotlinLogging.logger {}

View File

@ -5,11 +5,7 @@ import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.converter.flow.EventBasedProcessMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
@ -45,9 +41,9 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (event.key == KafkaEvents.EVENT_WORK_CONVERT_CREATED) {
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
val success = persistentWriter.storeProcessDataMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}!" }
log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}!" }
} else {
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
@ -59,7 +55,7 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
}
fun readAllInQueue() {
val messages = PersistentDataReader().getAvailableProcessEvents()
val messages = persistentReader.getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
@ -69,7 +65,7 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getAvailableProcessEvents()
val messages = persistentReader.getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}

View File

@ -6,6 +6,8 @@ import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.ConverterCoordinator
import no.iktdev.mediaprocessing.converter.TaskCreator
import no.iktdev.mediaprocessing.converter.convert.Converter
import no.iktdev.mediaprocessing.converter.persistentReader
import no.iktdev.mediaprocessing.converter.persistentWriter
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
@ -41,7 +43,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
fun getRequiredExtractProcessForContinuation(referenceId: String, requiresEventId: String): PersistentProcessDataMessage? {
return PersistentDataReader().getProcessEvent(referenceId, requiresEventId)
return persistentReader.getProcessEvent(referenceId, requiresEventId)
}
fun canConvert(extract: PersistentProcessDataMessage?): Boolean {
return extract?.consumed == true && extract.data.isSuccess()
@ -70,13 +72,13 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
}
}
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
if (!setClaim) {
return null
}
@ -94,18 +96,18 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
SimpleMessageData(status = Status.ERROR, message = e.message)
}
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
val consumedIsSuccessful = persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
}
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
}
}
return result

View File

@ -7,11 +7,11 @@ import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
@ -30,9 +30,9 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
val success = PersistentDataStore().storeEventDataMessage(event.key.event, event.value)
val success = persistentWriter.storeEventDataMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().config.databaseName}" }
} else {
io.launch {
delay(500) // Give the database a few sec to update
@ -52,17 +52,6 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
return
}
listeners.forwardEventMessageToListeners(triggered, messages)
/*if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) {
if (getProcessStarted(messages)?.type == ProcessType.FLOW) {
forwarder.produceAllMissingProcesserEvents(
producer = producer,
messages = messages
)
} else {
log.info { "Process for $referenceId was started manually and will require user input for continuation" }
}
}*/
}
private val log = KotlinLogging.logger {}
@ -72,12 +61,22 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
//private val forwarder = Forwarder()
public fun startProcess(file: File, type: ProcessType) {
val operations: List<ProcessStartOperationEvents> = listOf(
ProcessStartOperationEvents.ENCODE,
ProcessStartOperationEvents.EXTRACT,
ProcessStartOperationEvents.CONVERT
)
startProcess(file, type, operations)
}
fun startProcess(file: File, type: ProcessType, operations: List<ProcessStartOperationEvents>) {
val processStartEvent = MediaProcessStarted(
status = Status.COMPLETED,
file = file.absolutePath,
type = type
)
producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, processStartEvent)
}
public fun startRequestProcess(file: File, operations: List<RequestStartOperationEvents>): UUID {
@ -91,8 +90,13 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
return referenceId
}
fun permitWorkToProceedOn(referenceId: String, message: String) {
producer.sendMessage(referenceId = referenceId, KafkaEvents.EVENT_MEDIA_WORK_PROCEED_PERMITTED, SimpleMessageData(Status.COMPLETED, message))
}
fun readAllUncompletedMessagesInQueue() {
val messages = PersistentDataReader().getUncompletedMessages()
val messages = persistentReader.getUncompletedMessages()
io.launch {
messages.forEach {
delay(1000)
@ -101,30 +105,22 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
} catch (e: Exception) {
e.printStackTrace()
}
/*if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(it)) {
if (getProcessStarted(it)?.type == ProcessType.FLOW) {
forwarder.produceAllMissingProcesserEvents(
producer = producer,
messages = it
)
}
}*/
}
}
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getMessagesFor(referenceId)
val messages = persistentReader.getMessagesFor(referenceId)
if (messages.find { it.eventId == eventId && it.referenceId == referenceId } == null) {
log.warn { "EventId ($eventId) for ReferenceId ($referenceId) has not been made available in the database yet." }
io.launch {
val fixedDelay = 1000L
delay(fixedDelay)
var delayed = 0L
var msc = PersistentDataReader().getMessagesFor(referenceId)
var msc = persistentReader.getMessagesFor(referenceId)
while (msc.find { it.eventId == eventId } != null || delayed < 1000 * 60) {
delayed += fixedDelay
msc = PersistentDataReader().getMessagesFor(referenceId)
msc = persistentReader.getMessagesFor(referenceId)
}
operationToRunOnMessages(referenceId, eventId, msc)
}
@ -154,115 +150,6 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
// TODO: Build and insert into database
}
}
/*class Forwarder() {
val forwardOnEventReceived = listOf(
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED
)
fun hasAnyRequiredEventToCreateProcesserEvents(messages: List<PersistentMessage>): Boolean {
return messages.filter { forwardOnEventReceived.contains(it.event) && it.data.isSuccess() }.map { it.event }
.isNotEmpty()
}
fun isMissingEncodeWorkCreated(messages: List<PersistentMessage>): PersistentMessage? {
val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED }
return if (existingWorkEncodeCreated.isEmpty() && existingWorkEncodeCreated.none { it.data.isSuccess() }) {
messages.lastOrNull { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED }
} else null
}
fun isMissingExtractWorkCreated(messages: List<PersistentMessage>): PersistentMessage? {
val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
return if (existingWorkCreated.isEmpty() && existingWorkCreated.none { it.data.isSuccess() }) {
messages.lastOrNull { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED }
} else null
}
fun produceAllMissingProcesserEvents(
producer: CoordinatorProducer,
messages: List<PersistentMessage>
) {
val missingEncode = isMissingEncodeWorkCreated(messages)
val missingExtract = isMissingExtractWorkCreated(messages)
if (missingEncode != null && missingEncode.data.isSuccess()) {
produceEncodeWork(producer, missingEncode)
}
if (missingExtract != null && missingExtract.data.isSuccess()) {
produceExtractWork(producer, missingExtract)
}
}
fun produceEncodeWork(producer: CoordinatorProducer, message: PersistentMessage) {
if (message.event != KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) {
throw RuntimeException("Incorrect event passed ${message.event}")
}
if (message.data !is FfmpegWorkerArgumentsCreated) {
throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
}
val data = message.data as FfmpegWorkerArgumentsCreated
data.entries.forEach {
FfmpegWorkRequestCreated(
status = Status.COMPLETED,
inputFile = data.inputFile,
arguments = it.arguments,
outFile = it.outputFile
).let { createdRequest ->
producer.sendMessage(
message.referenceId,
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
eventId = message.eventId,
createdRequest
)
}
}
}
fun produceExtractWork(producer: CoordinatorProducer, message: PersistentMessage) {
if (message.event != KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
throw RuntimeException("Incorrect event passed ${message.event}")
}
if (message.data !is FfmpegWorkerArgumentsCreated) {
throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
}
val data = message.data as FfmpegWorkerArgumentsCreated
data.entries.forEach {
FfmpegWorkRequestCreated(
status = Status.COMPLETED,
inputFile = data.inputFile,
arguments = it.arguments,
outFile = it.outputFile
).let { createdRequest ->
producer.sendMessage(
message.referenceId,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
eventId = message.eventId,
createdRequest
)
}
val outFile = File(it.outputFile)
ConvertWorkerRequest(
status = Status.COMPLETED,
requiresEventId = message.eventId,
inputFile = it.outputFile,
true,
outFileBaseName = outFile.nameWithoutExtension,
outDirectory = outFile.parentFile.absolutePath
).let { createdRequest ->
producer.sendMessage(
message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED,
createdRequest
)
}
}
}
}*/
}

View File

@ -4,10 +4,14 @@ package no.iktdev.mediaprocessing.coordinator
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import no.iktdev.mediaprocessing.shared.common.toStoredDatabase
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.streamit.library.db.tables.*
import no.iktdev.streamit.library.db.tables.helper.cast_errors
@ -26,26 +30,43 @@ class CoordinatorApplication {
}
private var context: ApplicationContext? = null
private lateinit var storeDatabase: MySqlDataSource
@Suppress("unused")
fun getContext(): ApplicationContext? {
return context
}
fun getStoreDatabase(): MySqlDataSource {
return storeDatabase
}
private lateinit var eventsDatabase: MySqlDataSource
fun getEventsDatabase(): MySqlDataSource {
return eventsDatabase
}
lateinit var persistentReader: PersistentDataReader
lateinit var persistentWriter: PersistentDataStore
fun main(args: Array<String>) {
Coroutines.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
}
})
val dataSource = MySqlDataSource.fromDatabaseEnv();
dataSource.createDatabase()
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
storeDatabase = DatabaseEnvConfig.toStoredDatabase()
eventsDatabase.createDatabase()
storeDatabase.createDatabase()
val kafkaTables = listOf(
events, // For kafka
)
dataSource.createTables(*kafkaTables.toTypedArray())
eventsDatabase.createTables(*kafkaTables.toTypedArray())
val tables = arrayOf(
catalog,
@ -60,9 +81,10 @@ fun main(args: Array<String>) {
data_video,
cast_errors
)
transaction {
SchemaUtils.createMissingTablesAndColumns(*tables)
}
storeDatabase.createTables(*tables)
persistentReader = PersistentDataReader(eventsDatabase)
persistentWriter = PersistentDataStore(eventsDatabase)
context = runApplication<CoordinatorApplication>(*args)
printSharedConfig()
@ -75,7 +97,7 @@ fun printSharedConfig() {
log.info { "Ffprobe: ${SharedConfig.ffprobe}" }
log.info { "Ffmpeg: ${SharedConfig.ffmpeg}" }
log.info { "Database: ${DatabaseConfig.database} @ ${DatabaseConfig.address}:${DatabaseConfig.port}" }
/*log.info { "Database: ${DatabaseConfig.database} @ ${DatabaseConfig.address}:${DatabaseConfig.port}" }
log.info { "Username: ${DatabaseConfig.username}" }
log.info { "Password: ${if (DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" }
log.info { "Password: ${if (DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" }*/
}

View File

@ -0,0 +1,32 @@
package no.iktdev.mediaprocessing.coordinator.controller
import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.persistentReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.contract.dto.RequestWorkProceed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
@Controller
@RequestMapping(path = ["/action"])
class ActionEventController(@Autowired var coordinator: Coordinator) {
@RequestMapping("/flow/proceed")
fun permitRunOnSequence(@RequestBody data: RequestWorkProceed): ResponseEntity<String> {
val set = persistentReader.getMessagesFor(data.referenceId)
if (set.isEmpty()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(Gson().toJson(data))
}
coordinator.permitWorkToProceedOn(data.referenceId, "Requested by ${data.source}")
//EVENT_MEDIA_WORK_PROCEED_PERMITTED("event:media-work-proceed:permitted")
return ResponseEntity.ok(null)
}
}

View File

@ -2,7 +2,9 @@ package no.iktdev.mediaprocessing.coordinator.controller
import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ConvertRequest
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
@ -33,4 +35,36 @@ class RequestEventController(@Autowired var coordinator: Coordinator) {
}
return ResponseEntity.ok(null)
}
@PostMapping("/extract")
@ResponseStatus(HttpStatus.OK)
fun requestExtract(@RequestBody selectedFile: String): ResponseEntity<String> {
try {
val file = File(selectedFile)
if (!file.exists()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(selectedFile)
}
coordinator.startProcess(file, ProcessType.MANUAL, listOf(ProcessStartOperationEvents.EXTRACT))
} catch (e: Exception) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(selectedFile)
}
return ResponseEntity.ok(null)
}
@PostMapping("/all")
@ResponseStatus(HttpStatus.OK)
fun requestAll(@RequestBody selectedFile: String): ResponseEntity<String> {
try {
val file = File(selectedFile)
if (!file.exists()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(selectedFile)
}
coordinator.startProcess(file, type = ProcessType.MANUAL)
} catch (e: Exception) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(selectedFile)
}
return ResponseEntity.ok(null)
}
}

View File

@ -37,7 +37,6 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
}
val receivedEvents = events.map { it.event }
// TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event.
val requiresOneOf = listOf(
@ -47,9 +46,9 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
)
if (requiresOneOf.none { it in receivedEvents }) {
val missing = requiresOneOf.filter { !receivedEvents.contains(it) }
val missing = requiresOneOf.subtract(receivedEvents.toSet())
log.info { "Can't complete at this moment. Missing required event(s)\n\t" + missing.joinToString("\n\t") }
return null //SimpleMessageData(Status.SKIPPED, "Can't collect at this moment. Missing required event")
return null
}

View File

@ -6,22 +6,13 @@ import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
@Service
@EnableScheduling
@ -52,9 +43,9 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
return
}
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
val success = persistentWriter.storeProcessDataMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}" }
} else {
io.launch {
delay(500)
@ -67,7 +58,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
fun readAllAvailableInQueue() {
val messages = PersistentDataReader().getAvailableProcessEvents()
val messages = persistentReader.getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
@ -77,7 +68,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getAvailableProcessEvents()
val messages = persistentReader.getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}

View File

@ -1,11 +1,13 @@
package no.iktdev.mediaprocessing.processer
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@ -17,13 +19,23 @@ private val logger = KotlinLogging.logger {}
@SpringBootApplication
class ProcesserApplication {
}
lateinit var dataSource: MySqlDataSource
private lateinit var eventsDatabase: MySqlDataSource
fun getEventsDatabase(): MySqlDataSource {
return eventsDatabase
}
lateinit var persistentReader: PersistentDataReader
lateinit var persistentWriter: PersistentDataStore
fun main(args: Array<String>) {
dataSource = MySqlDataSource.fromDatabaseEnv()
dataSource.createDatabase()
dataSource.createTables(
processerEvents
)
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
eventsDatabase.createDatabase()
eventsDatabase.createTables(processerEvents)
persistentReader = PersistentDataReader(eventsDatabase)
persistentWriter = PersistentDataStore(eventsDatabase)
val context = runApplication<ProcesserApplication>(*args)
}
@ -37,7 +49,7 @@ class DatabaseReconnect() {
fun checkIfConnected() {
if (TransactionManager.currentOrNull() == null) {
lostConnectionCount++
dataSource.toDatabase()
eventsDatabase.toDatabase()
}
}
}

View File

@ -2,6 +2,8 @@ package no.iktdev.mediaprocessing.processer.services
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.persistentReader
import no.iktdev.mediaprocessing.processer.persistentWriter
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import org.springframework.beans.factory.annotation.Autowired
@ -19,11 +21,11 @@ class ClaimsService() {
@Scheduled(fixedDelay = (300_000))
fun validateClaims() {
val expiredClaims = PersistentDataReader().getExpiredClaimsProcessEvents()
val expiredClaims = persistentReader.getExpiredClaimsProcessEvents()
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
}
val store = PersistentDataStore()
val store = persistentWriter
expiredClaims.forEach {
val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {

View File

@ -3,8 +3,7 @@ package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.TaskCreator
import no.iktdev.mediaprocessing.processer.*
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
@ -14,7 +13,6 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessData
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@ -58,7 +56,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}")
}
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
@ -80,14 +78,14 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
logDir.mkdirs()
}
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents )
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming
PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
return
}
runnerJob = scope.launch {
@ -107,13 +105,13 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
return
}
log.info { "Encode started for ${runner.referenceId}" }
PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendProgress(info, null, false)
scope.launch {
while (runnerJob?.isActive == true) {
delay(java.time.Duration.ofMinutes(5).toMillis())
PersistentDataStore().updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
persistentWriter.updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
}
}
}
@ -125,18 +123,18 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
return
}
log.info { "Encode completed for ${runner.referenceId}" }
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
val consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
}
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
}
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
data = ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile)

View File

@ -3,8 +3,7 @@ package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.TaskCreator
import no.iktdev.mediaprocessing.processer.*
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
@ -15,7 +14,6 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessData
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@ -61,7 +59,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}")
}
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
@ -84,7 +82,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
}
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} extract" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents)
@ -92,7 +90,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming
PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
return
}
runnerJob = scope.launch {
@ -112,7 +110,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
return
}
log.info { "Extract started for ${runner.referenceId}" }
PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendState(info, false)
}
@ -123,12 +121,12 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
return
}
log.info { "Extract completed for ${runner.referenceId}" }
var consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
var consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
runBlocking {
delay(1000)
limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) {
consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
}
log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" }
@ -136,9 +134,9 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) {
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
log.info { readbackIsSuccess }
}
log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" }

View File

@ -1,5 +1,7 @@
package no.iktdev.mediaprocessing.shared.common
import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import java.io.File
object SharedConfig {
@ -12,10 +14,31 @@ object SharedConfig {
val preference: File = File("/data/config/preference.json")
}
object DatabaseConfig {
object DatabaseEnvConfig {
val address: String? = System.getenv("DATABASE_ADDRESS")
val port: String? = System.getenv("DATABASE_PORT")
val username: String? = System.getenv("DATABASE_USERNAME")
val password: String? = System.getenv("DATABASE_PASSWORD")
val database: String? = System.getenv("DATABASE_NAME")
val eventBasedDatabase: String? = System.getenv("DATABASE_NAME_E")
val storedDatabase: String? = System.getenv("DATABASE_NAME_S")
}
fun DatabaseEnvConfig.toStoredDatabase(): MySqlDataSource {
return MySqlDataSource(DatabaseConnectionConfig(
databaseName = this.storedDatabase ?: "streamit",
address = this.address ?: "localhost",
port = this.port,
username = this.username ?: "root",
password = this.password ?: ""
))
}
fun DatabaseEnvConfig.toEventsDatabase(): MySqlDataSource {
return MySqlDataSource(DatabaseConnectionConfig(
databaseName = this.eventBasedDatabase ?: "persistentEvents",
address = this.address ?: "localhost",
port = this.port,
username = this.username ?: "root",
password = this.password ?: ""
))
}

View File

@ -7,7 +7,10 @@ import java.time.LocalDateTime
import java.time.ZoneId
import java.time.ZoneOffset
abstract class DataSource(val databaseName: String, val address: String, val port: String?, val username: String, val password: String) {
abstract class DataSource(val config: DatabaseConnectionConfig) {
open var database: Database? = null
abstract fun connect()
abstract fun createDatabase(): Database?
@ -18,11 +21,13 @@ abstract class DataSource(val databaseName: String, val address: String, val por
abstract fun toConnectionUrl(): String
fun toPortedAddress(): String {
return if (!address.contains(":") && port?.isBlank() != true) {
"$address:$port"
} else address
return if (!config.address.contains(":") && config.port?.isBlank() != true) {
"$config.address:$config.port"
} else config.address
}
abstract fun toDatabase(): Database
}
fun timestampToLocalDateTime(timestamp: Int): LocalDateTime {

View File

@ -0,0 +1,9 @@
package no.iktdev.mediaprocessing.shared.common.datasource
data class DatabaseConnectionConfig(
val address: String,
val port: String?,
val username: String,
val password: String,
val databaseName: String
)

View File

@ -1,7 +1,6 @@
package no.iktdev.mediaprocessing.shared.common.datasource
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.Table
@ -9,30 +8,16 @@ import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
open class MySqlDataSource(databaseName: String, address: String, port: String = "", username: String, password: String): DataSource(databaseName = databaseName, address = address, port = port, username = username, password = password) {
open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) {
val log = KotlinLogging.logger {}
var database: Database? = null
private set
companion object {
fun fromDatabaseEnv(): MySqlDataSource {
if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'")
if (DatabaseConfig.username.isNullOrBlank()) throw RuntimeException("Database username is not defined in 'DATABASE_USERNAME'")
if (DatabaseConfig.address.isNullOrBlank()) throw RuntimeException("Database address is not defined in 'DATABASE_ADDRESS'")
return MySqlDataSource(
databaseName = DatabaseConfig.database,
address = DatabaseConfig.address,
port = DatabaseConfig.port ?: "",
username = DatabaseConfig.username,
password = DatabaseConfig.password ?: ""
)
}
override fun connect() {
this.toDatabase()
}
override fun createDatabase(): Database? {
val ok = transaction(toDatabaseServerConnection()) {
val tmc = TransactionManager.current().connection
val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '$databaseName'"
val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.databaseName}'"
val stmt = tmc.prepareStatement(query, true)
val resultSet = stmt.executeQuery()
@ -41,14 +26,14 @@ open class MySqlDataSource(databaseName: String, address: String, port: String =
if (!databaseExists) {
try {
exec(createDatabaseStatement())
log.info { "Database $databaseName created." }
log.info { "Database ${config.databaseName} created." }
true
} catch (e: Exception) {
e.printStackTrace()
false
}
} else {
log.info { "Database $databaseName already exists." }
log.info { "Database ${config.databaseName} already exists." }
true
}
}
@ -60,32 +45,33 @@ open class MySqlDataSource(databaseName: String, address: String, port: String =
}
override fun createTables(vararg tables: Table) {
transaction {
transaction(this.database) {
SchemaUtils.createMissingTablesAndColumns(*tables)
log.info { "Database transaction completed" }
}
}
override fun createDatabaseStatement(): String {
return "CREATE DATABASE $databaseName"
return "CREATE DATABASE ${config.databaseName}"
}
protected fun toDatabaseServerConnection(): Database {
database = Database.connect(
toConnectionUrl(),
user = username,
password = password
user = config.username,
password = config.password
)
return database!!
}
fun toDatabase(): Database {
database = Database.connect(
"${toConnectionUrl()}/$databaseName",
user = username,
password = password
override fun toDatabase(): Database {
val database = Database.connect(
"${toConnectionUrl()}/${config.databaseName}",
user = config.username,
password = config.password
)
return database!!
this.database = database
return database
}
override fun toConnectionUrl(): String {

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.common.datasource
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.transactions.transaction
@ -9,9 +10,9 @@ open class TableDefaultOperations<T : Table> {
}
fun <T> withDirtyRead(block: () -> T): T? {
fun <T> withDirtyRead(db: Database? = null, block: () -> T): T? {
return try {
transaction(transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED) {
transaction(db = db, transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED) {
try {
block()
} catch (e: Exception) {
@ -28,9 +29,9 @@ fun <T> withDirtyRead(block: () -> T): T? {
}
fun <T> withTransaction(block: () -> T): T? {
fun <T> withTransaction(db: Database? = null, block: () -> T): T? {
return try {
transaction {
transaction(db) {
try {
block()
} catch (e: Exception) {
@ -46,9 +47,9 @@ fun <T> withTransaction(block: () -> T): T? {
}
}
fun <T> insertWithSuccess(block: () -> T): Boolean {
fun <T> insertWithSuccess(db: Database? = null, block: () -> T): Boolean {
return try {
transaction {
transaction(db) {
try {
block()
commit()
@ -65,9 +66,9 @@ fun <T> insertWithSuccess(block: () -> T): Boolean {
}
}
fun <T> executeOrException(rollbackOnFailure: Boolean = false, block: () -> T): Exception? {
fun <T> executeOrException(db: Database? = null, rollbackOnFailure: Boolean = false, block: () -> T): Exception? {
return try {
transaction {
transaction(db) {
try {
block()
commit()
@ -86,9 +87,9 @@ fun <T> executeOrException(rollbackOnFailure: Boolean = false, block: () -> T):
}
}
fun <T> executeWithResult(block: () -> T): Pair<T?, Exception?> {
fun <T> executeWithResult(db: Database? = null, block: () -> T): Pair<T?, Exception?> {
return try {
transaction {
transaction(db) {
try {
val res = block()
commit()
@ -105,9 +106,9 @@ fun <T> executeWithResult(block: () -> T): Pair<T?, Exception?> {
}
}
fun <T> executeWithStatus(block: () -> T): Boolean {
fun <T> executeWithStatus(db: Database? = null, block: () -> T): Boolean {
return try {
transaction {
transaction(db) {
try {
block()
commit()

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
@ -7,11 +8,11 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.jetbrains.exposed.sql.*
import java.time.LocalDateTime
class PersistentDataReader {
class PersistentDataReader(var dataSource: DataSource) {
val dzz = DeserializingRegistry()
fun getAllMessages(): List<List<PersistentMessage>> {
val events = withTransaction {
val events = withTransaction(dataSource.database) {
events.selectAll()
.groupBy { it[events.referenceId] }
}
@ -19,7 +20,7 @@ class PersistentDataReader {
}
fun getMessagesFor(referenceId: String): List<PersistentMessage> {
return withTransaction {
return withTransaction(dataSource.database) {
events.select { events.referenceId eq referenceId }
.orderBy(events.created, SortOrder.ASC)
.mapNotNull { fromRowToPersistentMessage(it, dzz) }
@ -27,7 +28,7 @@ class PersistentDataReader {
}
fun getUncompletedMessages(): List<List<PersistentMessage>> {
val result = withDirtyRead {
val result = withDirtyRead(dataSource.database) {
events.selectAll()
.andWhere { events.event neq KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED.event }
.groupBy { it[events.referenceId] }
@ -37,7 +38,7 @@ class PersistentDataReader {
}
fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean {
val result = withDirtyRead {
val result = withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
@ -47,7 +48,7 @@ class PersistentDataReader {
}
fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withDirtyRead {
return withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
@ -57,7 +58,7 @@ class PersistentDataReader {
}
fun getAvailableProcessEvents(): List<PersistentProcessDataMessage> {
return withDirtyRead {
return withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.claimed eq false) and
(processerEvents.consumed eq false)
@ -67,7 +68,7 @@ class PersistentDataReader {
fun getExpiredClaimsProcessEvents(): List<PersistentProcessDataMessage> {
val deadline = LocalDateTime.now()
val entries = withTransaction {
val entries = withTransaction(dataSource.database) {
processerEvents.select {
(processerEvents.claimed eq true) and
(processerEvents.consumed neq true)
@ -77,7 +78,7 @@ class PersistentDataReader {
}
fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? {
val message = withDirtyRead {
val message = withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
@ -87,7 +88,7 @@ class PersistentDataReader {
}
fun getProcessEvents(): List<PersistentProcessDataMessage> {
return withTransaction {
return withTransaction(dataSource.database) {
processerEvents.selectAll()
.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
} ?: emptyList()

View File

@ -1,6 +1,7 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
@ -13,9 +14,9 @@ import org.jetbrains.exposed.sql.update
import java.sql.SQLIntegrityConstraintViolationException
private val log = KotlinLogging.logger {}
open class PersistentDataStore {
open class PersistentDataStore(var dataSource: DataSource) {
fun storeEventDataMessage(event: String, message: Message<*>): Boolean {
val exception = executeOrException {
val exception = executeOrException(dataSource.database) {
events.insert {
it[events.referenceId] = message.referenceId
it[events.eventId] = message.eventId
@ -42,7 +43,7 @@ open class PersistentDataStore {
}
fun storeProcessDataMessage(event: String, message: Message<*>): Boolean {
val exception = executeOrException {
val exception = executeOrException(dataSource.database) {
processerEvents.insert {
it[processerEvents.referenceId] = message.referenceId
it[processerEvents.eventId] = message.eventId
@ -62,7 +63,7 @@ open class PersistentDataStore {
}
fun setProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withTransaction {
return withTransaction(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
@ -76,7 +77,7 @@ open class PersistentDataStore {
}
fun setProcessEventCompleted(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withTransaction {
return withTransaction(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
@ -89,7 +90,7 @@ open class PersistentDataStore {
}
fun updateCurrentProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean {
return executeWithStatus {
return executeWithStatus(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
@ -102,7 +103,7 @@ open class PersistentDataStore {
}
fun releaseProcessEventClaim(referenceId: String, eventId: String): Boolean {
val exception = executeOrException {
val exception = executeOrException(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)

View File

@ -6,7 +6,6 @@ import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBr
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
@Configuration
@EnableWebSocketMessageBroker
open class SocketImplementation: WebSocketMessageBrokerConfigurer {

View File

@ -2,5 +2,5 @@ package no.iktdev.mediaprocessing.shared.contract.dto
data class ConvertRequest(
val file: String, // FullPath
val formats: List<SubtitleFormats>
)
override val source: String
): Requester()

View File

@ -15,5 +15,6 @@ enum class ProcessStartOperationEvents {
}
enum class RequestStartOperationEvents {
CONVERT
CONVERT,
EXTRACT,
}

View File

@ -0,0 +1,6 @@
package no.iktdev.mediaprocessing.shared.contract.dto
data class RequestWorkProceed(
val referenceId: String,
override val source: String
): Requester()

View File

@ -0,0 +1,5 @@
package no.iktdev.mediaprocessing.shared.contract.dto
abstract class Requester {
abstract val source: String
}