v3 21
This commit is contained in:
parent
91ae79e388
commit
278d7a4e6e
@ -5,13 +5,12 @@ import no.iktdev.exfl.coroutines.CoroutinesDefault
|
||||
import no.iktdev.exfl.coroutines.CoroutinesIO
|
||||
import no.iktdev.exfl.observable.Observables
|
||||
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
|
||||
import no.iktdev.eventi.database.MySqlDataSource
|
||||
import no.iktdev.mediaprocessing.shared.common.getAppVersion
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.*
|
||||
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication
|
||||
import org.springframework.boot.runApplication
|
||||
import org.springframework.context.ApplicationContext
|
||||
|
||||
@SpringBootApplication
|
||||
class ConvertApplication
|
||||
|
||||
@ -6,7 +6,7 @@ import no.iktdev.exfl.coroutines.CoroutinesDefault
|
||||
import no.iktdev.exfl.coroutines.CoroutinesIO
|
||||
import no.iktdev.exfl.observable.Observables
|
||||
import no.iktdev.mediaprocessing.shared.common.*
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
|
||||
import no.iktdev.eventi.database.MySqlDataSource
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.*
|
||||
import no.iktdev.streamit.library.db.tables.*
|
||||
import no.iktdev.streamit.library.db.tables.helper.cast_errors
|
||||
|
||||
@ -5,7 +5,9 @@ import no.iktdev.eventi.data.derivedFromEventId
|
||||
import no.iktdev.eventi.data.eventId
|
||||
import no.iktdev.eventi.data.referenceId
|
||||
import no.iktdev.eventi.data.toJson
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.*
|
||||
import no.iktdev.eventi.database.DataSource
|
||||
import no.iktdev.eventi.database.isCausedByDuplicateError
|
||||
import no.iktdev.eventi.database.isExposedSqlException
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.*
|
||||
import no.iktdev.mediaprocessing.shared.contract.Events
|
||||
import no.iktdev.mediaprocessing.shared.contract.EventsManagerContract
|
||||
@ -19,7 +21,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
|
||||
override fun storeEvent(event: Event): Boolean {
|
||||
|
||||
withTransaction(dataSource.database) {
|
||||
no.iktdev.eventi.database.withTransaction(dataSource.database) {
|
||||
allEvents.insert {
|
||||
it[referenceId] = event.referenceId()
|
||||
it[eventId] = event.eventId()
|
||||
@ -40,7 +42,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
}
|
||||
|
||||
|
||||
val exception = executeOrException(dataSource.database) {
|
||||
val exception = no.iktdev.eventi.database.executeOrException(dataSource.database) {
|
||||
events.insert {
|
||||
it[referenceId] = event.referenceId()
|
||||
it[eventId] = event.eventId()
|
||||
@ -88,7 +90,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
|
||||
|
||||
override fun readAvailableEvents(): List<List<Event>> {
|
||||
return withTransaction (dataSource.database) {
|
||||
return no.iktdev.eventi.database.withTransaction(dataSource.database) {
|
||||
events.selectAll()
|
||||
.groupBy { it[events.referenceId] }
|
||||
.mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }
|
||||
@ -96,7 +98,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
}
|
||||
|
||||
override fun readAvailableEventsFor(referenceId: String): List<Event> {
|
||||
val events = withTransaction(dataSource.database) {
|
||||
val events = no.iktdev.eventi.database.withTransaction(dataSource.database) {
|
||||
events.select { events.referenceId eq referenceId }
|
||||
.mapNotNull { it.toEvent() }
|
||||
} ?: emptyList()
|
||||
@ -104,7 +106,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
}
|
||||
|
||||
override fun getAllEvents(): List<List<Event>> {
|
||||
val events = withTransaction(dataSource.database) {
|
||||
val events = no.iktdev.eventi.database.withTransaction(dataSource.database) {
|
||||
events.selectAll()
|
||||
.groupBy { it[events.referenceId] }
|
||||
.mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }
|
||||
@ -114,7 +116,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
|
||||
|
||||
override fun getEventsWith(referenceId: String): List<Event> {
|
||||
return withTransaction(dataSource.database) {
|
||||
return no.iktdev.eventi.database.withTransaction(dataSource.database) {
|
||||
events.select {
|
||||
(events.referenceId eq referenceId)
|
||||
}
|
||||
@ -151,12 +153,12 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
* Deletes the events
|
||||
*/
|
||||
private fun deleteSupersededEvents(superseded: List<Event>) {
|
||||
withTransaction(dataSource) {
|
||||
no.iktdev.eventi.database.withTransaction(dataSource) {
|
||||
superseded.forEach { duplicate ->
|
||||
events.deleteWhere {
|
||||
(events.referenceId eq duplicate.referenceId()) and
|
||||
(events.eventId eq duplicate.eventId()) and
|
||||
(events.event eq duplicate.eventType.event)
|
||||
(referenceId eq duplicate.referenceId()) and
|
||||
(eventId eq duplicate.eventId()) and
|
||||
(event eq duplicate.eventType.event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,22 +1,5 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
|
||||
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
|
||||
import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper
|
||||
import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
|
||||
import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails
|
||||
import no.iktdev.streamit.library.db.query.*
|
||||
import no.iktdev.streamit.library.db.tables.titles
|
||||
import org.jetbrains.exposed.exceptions.ExposedSQLException
|
||||
import org.jetbrains.exposed.sql.insertIgnore
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
import java.sql.SQLIntegrityConstraintViolationException
|
||||
/*
|
||||
@Service
|
||||
class CollectAndStoreTask() {
|
||||
|
||||
@ -6,9 +6,9 @@ import no.iktdev.eventi.data.*
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
|
||||
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
|
||||
import no.iktdev.eventi.database.executeOrException
|
||||
import no.iktdev.eventi.database.executeWithStatus
|
||||
import no.iktdev.eventi.database.withTransaction
|
||||
import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper
|
||||
import no.iktdev.mediaprocessing.shared.contract.Events
|
||||
import no.iktdev.mediaprocessing.shared.contract.data.*
|
||||
@ -31,6 +31,16 @@ import java.sql.SQLIntegrityConstraintViolationException
|
||||
class CompletedTaskListener: CoordinatorEventListener() {
|
||||
val log = KotlinLogging.logger {}
|
||||
|
||||
var doNotProduceComplete = System.getenv("DISABLE_COMPLETE").toBoolean() ?: false
|
||||
|
||||
|
||||
override fun onReady() {
|
||||
super.onReady()
|
||||
if (doNotProduceComplete) {
|
||||
log.warn { "DoNotProduceComplete is set!\n\tNo complete event will be triggered!\n\tTo enable production of complete vents, remove this line in your environment: \"DISABLE_COMPLETE\"" }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Autowired
|
||||
override var coordinator: Coordinator? = null
|
||||
@ -121,6 +131,26 @@ class CompletedTaskListener: CoordinatorEventListener() {
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if metadata has cover, if so, 2 events are expected
|
||||
*/
|
||||
fun req4(events: List<Event>): Boolean {
|
||||
val metadata = events.find { it.eventType == Events.EventMediaMetadataSearchPerformed }
|
||||
if (metadata?.isSuccessful() != true) {
|
||||
return true
|
||||
}
|
||||
|
||||
val hasCover = metadata.dataAs<pyMetadata>()?.cover != null
|
||||
if (hasCover == false) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (events.any { it.eventType == Events.EventMediaReadOutCover } && events.any { it.eventType == Events.EventWorkDownloadCoverPerformed }) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
override fun isPrerequisitesFulfilled(incomingEvent: Event, events: List<Event>): Boolean {
|
||||
val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az<MediaProcessStartEvent>()
|
||||
@ -145,6 +175,13 @@ class CompletedTaskListener: CoordinatorEventListener() {
|
||||
//log.info { "${this::class.java.simpleName} Failed Req3" }
|
||||
return false
|
||||
}
|
||||
|
||||
if (!req4(events)) {
|
||||
log.info { "${this::class.java.simpleName} Failed Req4" }
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
return super.isPrerequisitesFulfilled(incomingEvent, events)
|
||||
}
|
||||
|
||||
@ -258,18 +295,18 @@ class CompletedTaskListener: CoordinatorEventListener() {
|
||||
try {
|
||||
withTransaction(getStoreDatabase()) {
|
||||
titles.insertIgnore {
|
||||
it[titles.masterTitle] = metadata.collection
|
||||
it[titles.title] = NameHelper.normalize(usedTitle)
|
||||
it[titles.type] = 1
|
||||
it[masterTitle] = metadata.collection
|
||||
it[title] = NameHelper.normalize(usedTitle)
|
||||
it[type] = 1
|
||||
}
|
||||
titles.insertIgnore {
|
||||
it[titles.masterTitle] = usedTitle
|
||||
it[titles.title] = NameHelper.normalize(usedTitle)
|
||||
it[titles.type] = 2
|
||||
it[masterTitle] = usedTitle
|
||||
it[title] = NameHelper.normalize(usedTitle)
|
||||
it[type] = 2
|
||||
}
|
||||
metadata.titles.forEach { title ->
|
||||
titles.insertIgnore {
|
||||
it[titles.masterTitle] = usedTitle
|
||||
it[masterTitle] = usedTitle
|
||||
it[titles.title] = title
|
||||
}
|
||||
}
|
||||
@ -336,6 +373,9 @@ class CompletedTaskListener: CoordinatorEventListener() {
|
||||
}
|
||||
|
||||
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
|
||||
if (doNotProduceComplete) {
|
||||
return false
|
||||
}
|
||||
val result = super.shouldIProcessAndHandleEvent(incomingEvent, events)
|
||||
return result
|
||||
}
|
||||
@ -355,6 +395,7 @@ class CompletedTaskListener: CoordinatorEventListener() {
|
||||
} else null
|
||||
|
||||
|
||||
|
||||
storeSubtitles(subtitles)
|
||||
metadata?.let {
|
||||
storeTitles(metadata = metadata, usedTitle = metadata.title)
|
||||
@ -372,4 +413,5 @@ class CompletedTaskListener: CoordinatorEventListener() {
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
}
|
||||
@ -4,14 +4,11 @@ import mu.KotlinLogging
|
||||
import no.iktdev.eventi.core.ConsumableEvent
|
||||
import no.iktdev.eventi.core.WGson
|
||||
import no.iktdev.eventi.data.EventStatus
|
||||
import no.iktdev.eventi.implementations.EventCoordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
|
||||
import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper
|
||||
import no.iktdev.mediaprocessing.shared.common.parsing.Regexes
|
||||
import no.iktdev.mediaprocessing.shared.contract.Events
|
||||
import no.iktdev.mediaprocessing.shared.contract.EventsListenerContract
|
||||
import no.iktdev.mediaprocessing.shared.contract.EventsManagerContract
|
||||
import no.iktdev.mediaprocessing.shared.contract.data.*
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
@ -35,6 +32,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
|
||||
events.any { it.eventType == Events.EventMediaReadOutNameAndType }
|
||||
}
|
||||
|
||||
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
|
||||
return super.shouldIProcessAndHandleEvent(incomingEvent, events) && incomingEvent.eventType in listensForEvents
|
||||
|
||||
}
|
||||
|
||||
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
|
||||
val event = incomingEvent.consume()
|
||||
if (event == null) {
|
||||
|
||||
@ -7,7 +7,7 @@ import no.iktdev.eventi.data.EventMetadata
|
||||
import no.iktdev.eventi.data.EventStatus
|
||||
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds
|
||||
import no.iktdev.eventi.database.toEpochSeconds
|
||||
import no.iktdev.mediaprocessing.shared.contract.Events
|
||||
import no.iktdev.mediaprocessing.shared.contract.data.BaseInfoEvent
|
||||
import no.iktdev.mediaprocessing.shared.contract.data.Event
|
||||
|
||||
@ -5,7 +5,7 @@ import no.iktdev.exfl.coroutines.CoroutinesDefault
|
||||
import no.iktdev.exfl.coroutines.CoroutinesIO
|
||||
import no.iktdev.exfl.observable.Observables
|
||||
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
|
||||
import no.iktdev.eventi.database.MySqlDataSource
|
||||
import no.iktdev.mediaprocessing.shared.common.getAppVersion
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.*
|
||||
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
|
||||
|
||||
@ -2,17 +2,10 @@ package no.iktdev.mediaprocessing.processer
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.shared.common.*
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.ActiveMode
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.RunnerManager
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.tasks
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.toTask
|
||||
import no.iktdev.mediaprocessing.shared.common.task.TaskType
|
||||
import no.iktdev.mediaprocessing.shared.contract.data.Event
|
||||
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
|
||||
import org.jetbrains.exposed.sql.select
|
||||
import org.springframework.beans.factory.annotation.Value
|
||||
import org.springframework.scheduling.annotation.EnableScheduling
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
|
||||
@ -91,7 +91,7 @@ class EventsPullerThread(threading.Thread):
|
||||
if connection:
|
||||
connection.close()
|
||||
# Introduce a small sleep to reduce CPU usage
|
||||
time.sleep(1000)
|
||||
time.sleep(5)
|
||||
if (self.shutdown.is_set()):
|
||||
logger.info("Shutdown is set..")
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
|
||||
import no.iktdev.eventi.database.DatabaseConnectionConfig
|
||||
import no.iktdev.eventi.database.MySqlDataSource
|
||||
import java.io.File
|
||||
|
||||
object SharedConfig {
|
||||
@ -26,21 +26,23 @@ object DatabaseEnvConfig {
|
||||
}
|
||||
|
||||
fun DatabaseEnvConfig.toStoredDatabase(): MySqlDataSource {
|
||||
return MySqlDataSource(DatabaseConnectionConfig(
|
||||
val config = DatabaseConnectionConfig(
|
||||
databaseName = this.storedDatabase ?: "streamit",
|
||||
address = this.address ?: "localhost",
|
||||
port = this.port,
|
||||
username = this.username ?: "root",
|
||||
password = this.password ?: ""
|
||||
))
|
||||
)
|
||||
return MySqlDataSource(config)
|
||||
}
|
||||
|
||||
fun DatabaseEnvConfig.toEventsDatabase(): MySqlDataSource {
|
||||
return MySqlDataSource(DatabaseConnectionConfig(
|
||||
val config = DatabaseConnectionConfig(
|
||||
databaseName = this.eventBasedDatabase ?: "persistentEvents",
|
||||
address = this.address ?: "localhost",
|
||||
port = this.port,
|
||||
username = this.username ?: "root",
|
||||
password = this.password ?: ""
|
||||
))
|
||||
)
|
||||
return MySqlDataSource(config)
|
||||
}
|
||||
@ -1,43 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import java.time.Instant
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZoneId
|
||||
import java.time.ZoneOffset
|
||||
|
||||
abstract class DataSource(val config: DatabaseConnectionConfig) {
|
||||
open var database: Database? = null
|
||||
|
||||
abstract fun connect()
|
||||
|
||||
abstract fun createDatabase(): Database?
|
||||
|
||||
abstract fun createTables(vararg tables: Table)
|
||||
|
||||
abstract fun createDatabaseStatement(): String
|
||||
|
||||
abstract fun toConnectionUrl(): String
|
||||
|
||||
abstract fun toDatabaseConnectionUrl(database: String): String
|
||||
|
||||
fun toPortedAddress(): String {
|
||||
var baseAddress = config.address
|
||||
if (!config.port.isNullOrBlank()) {
|
||||
baseAddress += ":${config.port}"
|
||||
}
|
||||
return baseAddress
|
||||
}
|
||||
|
||||
abstract fun toDatabase(): Database
|
||||
|
||||
}
|
||||
|
||||
fun timestampToLocalDateTime(timestamp: Int): LocalDateTime {
|
||||
return Instant.ofEpochSecond(timestamp.toLong()).atZone(ZoneId.systemDefault()).toLocalDateTime()
|
||||
}
|
||||
|
||||
fun LocalDateTime.toEpochSeconds(): Long {
|
||||
return this.toEpochSecond(ZoneOffset.ofTotalSeconds(ZoneOffset.systemDefault().rules.getOffset(LocalDateTime.now()).totalSeconds))
|
||||
}
|
||||
@ -1,9 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
|
||||
data class DatabaseConnectionConfig(
|
||||
val address: String,
|
||||
val port: String?,
|
||||
val username: String,
|
||||
val password: String,
|
||||
val databaseName: String
|
||||
)
|
||||
@ -1,85 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
|
||||
import mu.KotlinLogging
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.SchemaUtils
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
|
||||
|
||||
open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) {
|
||||
val log = KotlinLogging.logger {}
|
||||
override fun connect() {
|
||||
this.toDatabase()
|
||||
}
|
||||
|
||||
override fun createDatabase(): Database? {
|
||||
val ok = transaction(toDatabaseServerConnection()) {
|
||||
val tmc = TransactionManager.current().connection
|
||||
val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.databaseName}';"
|
||||
val stmt = tmc.prepareStatement(query, true)
|
||||
|
||||
val resultSet = stmt.executeQuery()
|
||||
val databaseExists = resultSet.next()
|
||||
|
||||
if (!databaseExists) {
|
||||
try {
|
||||
exec(createDatabaseStatement())
|
||||
log.info { "Database ${config.databaseName} created." }
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
} else {
|
||||
log.info { "Database ${config.databaseName} already exists." }
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
return if (ok) toDatabase() else {
|
||||
log.error { "No database to create or connect to" }
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
override fun createTables(vararg tables: Table) {
|
||||
transaction(this.database) {
|
||||
SchemaUtils.createMissingTablesAndColumns(*tables)
|
||||
log.info { "Database transaction completed" }
|
||||
}
|
||||
}
|
||||
|
||||
override fun createDatabaseStatement(): String {
|
||||
return "CREATE DATABASE ${config.databaseName};"
|
||||
}
|
||||
|
||||
protected fun toDatabaseServerConnection(): Database {
|
||||
database = Database.connect(
|
||||
toConnectionUrl(),
|
||||
user = config.username,
|
||||
password = config.password
|
||||
)
|
||||
return database!!
|
||||
}
|
||||
|
||||
override fun toDatabase(): Database {
|
||||
val database = Database.connect(
|
||||
toDatabaseConnectionUrl(config.databaseName),
|
||||
user = config.username,
|
||||
password = config.password
|
||||
)
|
||||
this.database = database
|
||||
return database
|
||||
}
|
||||
|
||||
override fun toDatabaseConnectionUrl(database: String): String {
|
||||
return toConnectionUrl() + "/$database"
|
||||
}
|
||||
|
||||
override fun toConnectionUrl(): String {
|
||||
return "jdbc:mysql://${toPortedAddress()}"
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,153 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
|
||||
import org.jetbrains.exposed.exceptions.ExposedSQLException
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLIntegrityConstraintViolationException
|
||||
|
||||
open class TableDefaultOperations<T : Table> {
|
||||
|
||||
}
|
||||
|
||||
fun <T> withDirtyRead(db: Database? = null, block: () -> T): T? {
|
||||
return try {
|
||||
transaction(db = db, transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED) {
|
||||
try {
|
||||
block()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> withDirtyRead(db: DataSource? = null, block: () -> T): T? {
|
||||
return withDirtyRead(db?.database, block)
|
||||
}
|
||||
|
||||
|
||||
fun <T> withTransaction(db: Database? = null, block: () -> T): T? {
|
||||
return try {
|
||||
transaction(db) {
|
||||
try {
|
||||
block()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
null
|
||||
}
|
||||
}
|
||||
fun <T> withTransaction(db: DataSource? = null, block: () -> T): T? {
|
||||
return withTransaction(db?.database, block)
|
||||
}
|
||||
|
||||
|
||||
|
||||
fun <T> insertWithSuccess(db: Database? = null, block: () -> T): Boolean {
|
||||
return try {
|
||||
transaction(db) {
|
||||
try {
|
||||
block()
|
||||
commit()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> executeOrException(db: Database? = null, rollbackOnFailure: Boolean = false, block: () -> T): Exception? {
|
||||
return try {
|
||||
transaction(db) {
|
||||
try {
|
||||
block()
|
||||
commit()
|
||||
null
|
||||
} catch (e: Exception) {
|
||||
// log the error here or handle the exception as needed
|
||||
if (rollbackOnFailure)
|
||||
rollback()
|
||||
e
|
||||
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> executeWithResult(db: Database? = null, block: () -> T): Pair<T?, Exception?> {
|
||||
return try {
|
||||
transaction(db) {
|
||||
try {
|
||||
val res = block()
|
||||
commit()
|
||||
res to null
|
||||
} catch (e: Exception) {
|
||||
// log the error here or handle the exception as needed
|
||||
rollback()
|
||||
null to e
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
return null to e
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> executeWithStatus(db: Database? = null, block: () -> T): Boolean {
|
||||
return try {
|
||||
transaction(db) {
|
||||
try {
|
||||
block()
|
||||
commit()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> executeWithStatus(db: DataSource? = null, block: () -> T): Boolean {
|
||||
return executeWithStatus(db?.database, block)
|
||||
}
|
||||
|
||||
fun Exception.isExposedSqlException(): Boolean {
|
||||
return this is ExposedSQLException
|
||||
}
|
||||
|
||||
fun ExposedSQLException.isCausedByDuplicateError(): Boolean {
|
||||
return if (this.cause is SQLIntegrityConstraintViolationException) {
|
||||
return this.errorCode == 1062
|
||||
} else false
|
||||
}
|
||||
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.persistance
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
|
||||
import no.iktdev.eventi.database.DataSource
|
||||
import no.iktdev.eventi.database.executeOrException
|
||||
import no.iktdev.eventi.database.withDirtyRead
|
||||
import no.iktdev.mediaprocessing.shared.common.getAppVersion
|
||||
import org.jetbrains.exposed.sql.and
|
||||
import org.jetbrains.exposed.sql.insert
|
||||
|
||||
@ -4,14 +4,13 @@ import mu.KotlinLogging
|
||||
import no.iktdev.eventi.data.eventId
|
||||
import no.iktdev.eventi.data.referenceId
|
||||
import no.iktdev.eventi.data.toJson
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.*
|
||||
import no.iktdev.eventi.database.*
|
||||
import no.iktdev.mediaprocessing.shared.common.task.Task
|
||||
import no.iktdev.mediaprocessing.shared.common.task.TaskType
|
||||
import no.iktdev.mediaprocessing.shared.common.task.TaskDoz
|
||||
import no.iktdev.mediaprocessing.shared.contract.data.Event
|
||||
import org.jetbrains.exposed.exceptions.ExposedSQLException
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
|
||||
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
||||
import java.security.MessageDigest
|
||||
import java.time.LocalDateTime
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
|
||||
import no.iktdev.eventi.database.DatabaseConnectionConfig
|
||||
import no.iktdev.eventi.database.MySqlDataSource
|
||||
import org.h2.jdbcx.JdbcDataSource
|
||||
import java.io.PrintWriter
|
||||
import java.sql.Connection
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
|
||||
import no.iktdev.eventi.database.DatabaseConnectionConfig
|
||||
import no.iktdev.eventi.database.MySqlDataSource
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
|
||||
class H2DataSource2(conf: DatabaseConnectionConfig): MySqlDataSource(conf) {
|
||||
|
||||
@ -65,7 +65,7 @@ class FileNameParserTest {
|
||||
@Test
|
||||
fun findTitleWithYear() {
|
||||
val input = "Dette er (en) tekst med (flere) paranteser som (potet) inneholder (år) som (2024) (2025).";
|
||||
val result = FileNameParser(input).guessSearchableTitle()
|
||||
val result = FileNameParser(input).guessSearchableTitle().first()
|
||||
assertThat(result).isEqualTo("Dette er tekst med paranteser som inneholder som (2024) (2025)")
|
||||
}
|
||||
|
||||
@ -83,11 +83,4 @@ class FileNameParserTest {
|
||||
assertThat(result.first()).isEqualTo("Urusei Yatsura")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testName() {
|
||||
val input = "The.Boys.S04E02.Life.Among.the.Septics.1080p.AMZN.WEB-DL.DDP5.1.H.264-NTb"
|
||||
val result = FileNameParser(input).guessSearchableTitle()
|
||||
assertThat(result.first()).isEqualTo("Urusei Yatsura (2022)")
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,14 +1,5 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.tests
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.H2DataSource2
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.events
|
||||
import org.junit.jupiter.api.Test
|
||||
import java.util.UUID
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.jetbrains.exposed.sql.deleteAll
|
||||
|
||||
/*
|
||||
class PersistentEventMangerTestBase {
|
||||
val defaultReferenceId = UUID.randomUUID().toString()
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.shared.contract
|
||||
|
||||
import no.iktdev.eventi.implementations.EventsManagerImpl
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
|
||||
import no.iktdev.eventi.database.DataSource
|
||||
import no.iktdev.mediaprocessing.shared.contract.data.Event
|
||||
|
||||
abstract class EventsManagerContract(dataSource: DataSource) : EventsManagerImpl<Event>(dataSource) {
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
package no.iktdev.eventi.database
|
||||
|
||||
import no.iktdev.eventi.database.DatabaseConnectionConfig
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import java.time.Instant
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
package no.iktdev.eventi.database
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.eventi.database.DatabaseConnectionConfig
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.SchemaUtils
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
package no.iktdev.eventi.database
|
||||
|
||||
import org.jetbrains.exposed.exceptions.ExposedSQLException
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
|
||||
@ -2,6 +2,7 @@ package no.iktdev.eventi.implementations
|
||||
|
||||
import no.iktdev.eventi.core.ConsumableEvent
|
||||
import no.iktdev.eventi.data.*
|
||||
import javax.annotation.PostConstruct
|
||||
|
||||
abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
|
||||
abstract val coordinator: EventCoordinator<T, E>?
|
||||
@ -9,6 +10,16 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
|
||||
abstract val produceEvent: Any
|
||||
abstract val listensForEvents: List<Any>
|
||||
|
||||
open fun onReady() {
|
||||
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
fun onCreated() {
|
||||
onReady()
|
||||
}
|
||||
|
||||
|
||||
protected open fun onProduceEvent(event: T) {
|
||||
coordinator?.produceNewEvent(event) ?: {
|
||||
println("No Coordinator set")
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
package no.iktdev.eventi.implementations
|
||||
|
||||
import no.iktdev.eventi.data.EventImpl
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
|
||||
import no.iktdev.eventi.database.DataSource
|
||||
|
||||
/**
|
||||
* Interacts with the database, needs to be within the Coordinator
|
||||
|
||||
@ -5,24 +5,13 @@
|
||||
package no.iktdev.eventi
|
||||
|
||||
import no.iktdev.eventi.data.EventImpl
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
|
||||
import no.iktdev.eventi.database.DatabaseConnectionConfig
|
||||
import no.iktdev.eventi.implementations.EventListenerImpl
|
||||
import no.iktdev.eventi.implementations.EventsManagerImpl
|
||||
import no.iktdev.eventi.mock.MockEventManager
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication
|
||||
import org.springframework.boot.runApplication
|
||||
import org.springframework.context.ApplicationContext
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.stereotype.Component
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@SpringBootApplication
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
package no.iktdev.eventi.mock
|
||||
|
||||
import no.iktdev.eventi.database.DatabaseConnectionConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
|
||||
import no.iktdev.eventi.database.DataSource
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user