Task polling

bskjon 2024-06-27 02:12:51 +02:00
parent f0251fae55
commit bbae7d932d
72 changed files with 1681 additions and 1458 deletions

View File

@@ -9,6 +9,13 @@ plugins {
group = "no.iktdev.mediaprocessing.apps"
version = "1.0-SNAPSHOT"
val appVersion = "1.0.0"
tasks.processResources {
expand(mapOf("appVersion" to appVersion))
}
repositories {
mavenCentral()
maven("https://jitpack.io")

View File

@@ -1,33 +0,0 @@
package no.iktdev.mediaprocessing.converter
import mu.KotlinLogging
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@Service
@EnableScheduling
class ClaimsService() {
private val log = KotlinLogging.logger {}
@Autowired
lateinit var coordinator: ConverterCoordinator
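// Every 5 minutes (300_000 ms): release expired claims, then re-read the queue.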
@Scheduled(fixedDelay = (300_000))
fun validateClaims() {
val expiredClaims = eventManager.getProcessEventsWithExpiredClaim()
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
}
expiredClaims.forEach {
val result = eventManager.deleteProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {
log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" }
} else {
log.error { "Failed to release claim on ${it.referenceId}::${it.eventId}::${it.event}" }
}
}
coordinator.readAllInQueue()
}
}

View File

@@ -6,7 +6,8 @@ import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import no.iktdev.mediaprocessing.shared.common.persistance.TasksManager
import no.iktdev.mediaprocessing.shared.common.persistance.tasks
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@@ -23,8 +24,7 @@ fun getContext(): ApplicationContext? {
return context
}
lateinit var eventManager: PersistentEventManager
lateinit var taskManager: TasksManager
private lateinit var eventsDatabase: MySqlDataSource
fun getEventsDatabase(): MySqlDataSource {
@@ -46,9 +46,8 @@ fun main(args: Array<String>) {
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
eventsDatabase.createDatabase()
eventsDatabase.createTables(processerEvents)
eventManager = PersistentEventManager(eventsDatabase)
eventsDatabase.createTables(tasks)
taskManager = TasksManager(eventsDatabase)
context = runApplication<ConvertApplication>(*args)
}

View File

@@ -1,104 +0,0 @@
package no.iktdev.mediaprocessing.converter
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@EnableScheduling
@Service
class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>() {
private val log = KotlinLogging.logger {}
override val listeners = PersistentEventProcessBasedMessageListener()
override fun createTasksBasedOnEventsAndPersistence(
referenceId: String,
eventId: String,
messages: List<PersistentProcessDataMessage>
) {
val triggeredMessage = messages.find { it.eventId == eventId }
if (triggeredMessage == null) {
log.error { "Could not find $eventId in provided messages" }
return
}
listeners.forwardEventMessageToListeners(triggeredMessage, messages)
}
override fun onCoordinatorReady() {
super.onCoordinatorReady()
log.info { "Converter Coordinator is ready" }
generateMissingEvents()
readAllInQueue()
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (event.key == KafkaEvents.EventWorkConvertCreated) {
val success = eventManager.setProcessEvent(event.key, event.value)
if (!success) {
log.error { "Unable to store message event: ${event.key.event} with eventId ${event.value.eventId} with referenceId ${event.value.referenceId} in database ${getEventsDatabase().database}!" }
} else {
ioCoroutine.launch {
delay(500)
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
}
} else if (event.key == KafkaEvents.EventWorkExtractPerformed) {
readAllInQueue()
} else {
log.debug { "Skipping ${event.key}" }
}
}
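// Replays every claimable process event, spaced one second apart so listeners are not flooded.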
fun readAllInQueue() {
val messages = eventManager.getProcessEventsClaimable() // persistentReader.getAvailableProcessEvents()
ioCoroutine.launch {
messages.forEach {
delay(1000)
createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages)
}
}
}
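// Re-creates process events for uncompleted EventWorkConvertCreated messages missing from the process table.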
private fun generateMissingEvents() {
val existing = eventManager.getAllProcessEvents().filter { it.event == KafkaEvents.EventWorkConvertCreated }.map { it.eventId }
val messages = eventManager.getEventsUncompleted()
val myEvents = messages.flatten()
.filter { it.event == KafkaEvents.EventWorkConvertCreated }
.filter { existing.none { en -> en == it.eventId } }
myEvents.forEach {
eventManager.setProcessEvent(it.event, Message(
referenceId = it.referenceId,
eventId = it.eventId,
data = it.data
))
}
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = eventManager.getProcessEventsClaimable() // persistentReader.getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}
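// Scheduled safety net: 5 * 6_0000 ms = 300_000 ms, i.e. every 5 minutes.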
@Scheduled(fixedDelay = (5*6_0000))
fun checkForWork() {
log.info { "Checking if there is any work to do.." }
readAllInQueue()
generateMissingEvents()
}
}

View File

@@ -1,47 +0,0 @@
package no.iktdev.mediaprocessing.converter
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class TaskCreator(coordinator: ConverterCoordinator) :
TaskCreatorImpl<ConverterCoordinator, PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>(coordinator) {
override fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
val currentEvents = events.map { it.event }
return requiredEvents.all { currentEvents.contains(it) }
}
override fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
return failed.isEmpty()
}
override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
return event.event == singleOne
}
/*override fun getListener(): Tasks {
val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter)
}*/
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return listOf {
isPrerequisiteEventsOk(events)
}
}
override fun prerequisiteRequired(event: PersistentProcessDataMessage): List<() -> Boolean> {
return listOf()
}
override fun containsUnprocessedEvents(events: List<PersistentProcessDataMessage>): Boolean {
return true
}
}

View File

@@ -0,0 +1,85 @@
package no.iktdev.mediaprocessing.converter
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.*
import no.iktdev.mediaprocessing.shared.common.persistance.ActiveMode
import no.iktdev.mediaprocessing.shared.common.persistance.RunnerManager
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
@Service
@EnableScheduling
class TaskCoordinator(): TaskCoordinatorBase() {
private val log = KotlinLogging.logger {}
lateinit var runnerManager: RunnerManager
@Value("\${appVersion}")
private lateinit var version: String
override fun onCoordinatorReady() {
super.onCoordinatorReady()
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ConvertApplication::class.java.simpleName, version = version)
runnerManager.assignRunner()
}
override val taskAvailabilityEventListener: MutableMap<TaskType, MutableList<TaskQueueListener>> = mutableMapOf(
TaskType.Convert to mutableListOf(),
)
private val taskListeners: MutableSet<TaskEvents> = mutableSetOf()
fun getTaskListeners(): List<TaskEvents> {
return taskListeners.toList()
}
fun addTaskEventListener(listener: TaskEvents) {
taskListeners.add(listener)
}
fun addConvertTaskListener(listener: TaskQueueListener) {
addTaskListener(TaskType.Convert, listener)
}
override fun addTaskListener(type: TaskType, listener: TaskQueueListener) {
super.addTaskListener(type, listener)
pullForAvailableTasks()
}
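// Offers claimable tasks to the registered listeners, stopping per type once one is claimed; a superseded runner flips to passive mode instead.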
override fun pullForAvailableTasks() {
if (runnerManager.iAmSuperseded()) {
// This lets the application complete its current work but not consume new tasks
taskMode = ActiveMode.Passive
return
}
val available = taskManager.getClaimableTasks().asClaimable()
available.forEach { (type, list) ->
taskAvailabilityEventListener[type]?.forEach { listener ->
list.foreachOrUntilClaimed {
listener.onTaskAvailable(it)
}
}
}
}
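// Logs and releases expired claims on Convert tasks so another runner can pick them up.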
override fun clearExpiredClaims() {
val expiredClaims = taskManager.getTasksWithExpiredClaim().filter { it.task in listOf(TaskType.Convert) }
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.task}" }
}
expiredClaims.forEach {
val result = taskManager.deleteTaskClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {
log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.task}" }
} else {
log.error { "Failed to release claim on ${it.referenceId}::${it.eventId}::${it.task}" }
}
}
}
interface TaskEvents {
fun onCancelOrStopProcess(eventId: String)
}
}

View File

@@ -0,0 +1,7 @@
package no.iktdev.mediaprocessing.converter.convert
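/**
 * Lifecycle callbacks for a single conversion run: start, completion with the
 * produced files, and an optional error hook (no-op by default).
 */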
interface ConvertListener {
fun onStarted(inputFile: String)
fun onCompleted(inputFile: String, outputFiles: List<String>)
fun onError(inputFile: String, message: String) {}
}
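
For reference, a minimal listener implementation against the interface above might look like this sketch (the println logging is illustrative only, not part of the commit):

package no.iktdev.mediaprocessing.converter.convert

// Hypothetical sketch: a listener that just logs each lifecycle callback.
class LoggingConvertListener : ConvertListener {
    override fun onStarted(inputFile: String) {
        println("Convert started: $inputFile")
    }

    override fun onCompleted(inputFile: String, outputFiles: List<String>) {
        println("Convert completed: $inputFile -> ${outputFiles.joinToString()}")
    }

    override fun onError(inputFile: String, message: String) {
        println("Convert failed: $inputFile ($message)")
    }
}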

View File

@@ -8,12 +8,15 @@ import no.iktdev.library.subtitle.export.Export
import no.iktdev.library.subtitle.reader.BaseReader
import no.iktdev.library.subtitle.reader.Reader
import no.iktdev.mediaprocessing.converter.ConverterEnv
import no.iktdev.mediaprocessing.shared.common.task.ConvertTaskData
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import java.io.File
import kotlin.jvm.Throws
class Converter(val referenceId: String, val eventId: String, val data: ConvertWorkerRequest) {
class Converter2(val data: ConvertTaskData,
private val listener: ConvertListener
) {
@Throws(FileUnavailableException::class)
private fun getReader(): BaseReader? {
@@ -37,31 +40,37 @@ class Converter(val referenceId: String, val eventId: String, val data: ConvertW
}
@Throws(FileUnavailableException::class, FileIsNullOrEmpty::class)
fun execute(): List<File> {
fun execute() {
val file = File(data.inputFile)
Configuration.exportJson = true
val read = getReader()?.read() ?: throw FileIsNullOrEmpty()
if (read.isEmpty())
throw FileIsNullOrEmpty()
val filtered = read.filter { !it.ignore && it.type !in listOf(DialogType.SIGN_SONG, DialogType.CAPTION) }
val syncOrNotSync = syncDialogs(filtered)
listener.onStarted(file.absolutePath)
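// Everything below runs inside try/catch so failures surface through listener.onError instead of being thrown.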
try {
Configuration.exportJson = true
val read = getReader()?.read() ?: throw FileIsNullOrEmpty()
if (read.isEmpty())
throw FileIsNullOrEmpty()
val filtered = read.filter { !it.ignore && it.type !in listOf(DialogType.SIGN_SONG, DialogType.CAPTION) }
val syncOrNotSync = syncDialogs(filtered)
val exporter = Export(file, File(data.outDirectory), data.outFileBaseName)
val exporter = Export(file, File(data.outDirectory), data.outFileBaseName)
return if (data.outFormats.isEmpty()) {
exporter.write(syncOrNotSync)
} else {
val exported = mutableListOf<File>()
if (data.outFormats.contains(SubtitleFormats.SRT)) {
exported.add(exporter.writeSrt(syncOrNotSync))
val outFiles = if (data.outFormats.isEmpty()) {
exporter.write(syncOrNotSync)
} else {
val exported = mutableListOf<File>()
if (data.outFormats.contains(SubtitleFormats.SRT)) {
exported.add(exporter.writeSrt(syncOrNotSync))
}
if (data.outFormats.contains(SubtitleFormats.SMI)) {
exported.add(exporter.writeSmi(syncOrNotSync))
}
if (data.outFormats.contains(SubtitleFormats.VTT)) {
exported.add(exporter.writeVtt(syncOrNotSync))
}
exported
}
if (data.outFormats.contains(SubtitleFormats.SMI)) {
exported.add(exporter.writeSmi(syncOrNotSync))
}
if (data.outFormats.contains(SubtitleFormats.VTT)) {
exported.add(exporter.writeVtt(syncOrNotSync))
}
exported
listener.onCompleted(file.absolutePath, outFiles.map { it.absolutePath })
} catch (e: Exception) {
listener.onError(file.absolutePath, e.message ?: e.localizedMessage)
}
}

View File

@@ -1,32 +0,0 @@
package no.iktdev.mediaprocessing.converter.coordination
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
class PersistentEventProcessBasedMessageListener: EventBasedMessageListener<PersistentProcessDataMessage>() {
override fun listenerWantingEvent(
event: PersistentProcessDataMessage,
waitingListeners: List<Tasks<PersistentProcessDataMessage>>
): List<Tasks<PersistentProcessDataMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentProcessDataMessage,
history: List<PersistentProcessDataMessage>,
listeners: List<ITaskCreatorListener<PersistentProcessDataMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
override fun waitingListeners(events: List<PersistentProcessDataMessage>): List<Tasks<PersistentProcessDataMessage>> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
}

View File

@@ -1,30 +0,0 @@
package no.iktdev.mediaprocessing.converter.flow
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
class EventBasedProcessMessageListener: EventBasedMessageListener<PersistentProcessDataMessage>() {
override fun waitingListeners(events: List<PersistentProcessDataMessage>): List<Tasks<PersistentProcessDataMessage>> {
val nonCreators = listeners
.filter { !events.map { e -> e.event }
.contains(it.producesEvent) }
return nonCreators
}
override fun listenerWantingEvent(event: PersistentProcessDataMessage, waitingListeners: List<Tasks<PersistentProcessDataMessage>>): List<Tasks<PersistentProcessDataMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentProcessDataMessage,
history: List<PersistentProcessDataMessage>,
listeners: List<ITaskCreatorListener<PersistentProcessDataMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
}

View File

@@ -1,233 +0,0 @@
package no.iktdev.mediaprocessing.converter.tasks
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.*
import no.iktdev.mediaprocessing.converter.convert.Converter
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.common.helper.DerivedProcessIterationHolder
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import java.util.*
@EnableScheduling
@Service
class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : TaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init {
log.info { "Starting with id: $serviceId" }
}
override val listensForEvents: List<KafkaEvents>
get() = listOf(
KafkaEvents.EventWorkExtractPerformed,
KafkaEvents.EventWorkConvertCreated
)
override val producesEvent: KafkaEvents
get() = KafkaEvents.EventWorkConvertPerformed
fun getRequiredExtractProcessForContinuation(
referenceId: String,
requiresEventId: String
): PersistentProcessDataMessage? {
return eventManager.getProcessEventWith(referenceId, requiresEventId)
}
fun canConvert(extract: PersistentProcessDataMessage?): Boolean {
return extract?.consumed == true && extract.data.isSuccess()
}
override fun onProcessEvents(
event: PersistentProcessDataMessage,
events: List<PersistentProcessDataMessage>
): MessageDataWrapper? {
val waitsForEventId = if (event.event == KafkaEvents.EventWorkConvertCreated) {
// Do convert check
val convertRequest = event.data as ConvertWorkerRequest? ?: return null
convertRequest.requiresEventId
} else if (event.event == KafkaEvents.EventWorkExtractPerformed) {
if (event.data is ProcesserExtractWorkPerformed) event.data.derivedFromEventId else return null
} else null
val convertData = if (event.event == KafkaEvents.EventWorkConvertCreated) {
event.data as ConvertWorkerRequest? ?: return null
} else {
val convertEvent = events.find { it.referenceId == event.referenceId && it.event == KafkaEvents.EventWorkConvertCreated } ?: return null
convertEvent.data as ConvertWorkerRequest? ?: return null
}
if (waitsForEventId != null) {
// Requires the eventId to be defined as consumed
val requiredEventToBeCompleted = getRequiredExtractProcessForContinuation(
referenceId = event.referenceId,
requiresEventId = waitsForEventId
)
if (requiredEventToBeCompleted == null) {
/*log.info { "Sending ${event.eventId} @ ${event.referenceId} to deferred check" }
val existing = scheduled_deferred_events[event.referenceId]
val newList = (existing ?: listOf()) + listOf(
DerivedProcessIterationHolder(
eventId = event.eventId,
event = convertEvent
)
)
scheduled_deferred_events[event.referenceId] = newList*/
return null
}
if (!canConvert(requiredEventToBeCompleted)) {
// Waiting for required event to be completed
return null
}
}
val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
val setClaim = eventManager.setProcessEventClaim(
referenceId = event.referenceId,
eventId = event.eventId,
claimer = serviceId
)
if (!setClaim) {
return null
}
val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = convertData)
if (!converter.canRead()) {
// Make claim regardless but push to schedule
return ConvertWorkPerformed(
status = Status.ERROR,
message = "Can't read the file..",
derivedFromEventId = converter.eventId,
producedBy = serviceId
)
}
val result = try {
performConvert(converter)
} catch (e: Exception) {
ConvertWorkPerformed(
status = Status.ERROR, message = e.message,
derivedFromEventId = converter.eventId,
producedBy = serviceId
)
}
val consumedIsSuccessful =
eventManager.setProcessEventCompleted(event.referenceId, event.eventId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
eventManager.setProcessEventCompleted(event.referenceId, event.eventId)
}
delay(1000)
var readbackIsSuccess = eventManager.isProcessEventCompleted(event.referenceId, event.eventId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess =
eventManager.isProcessEventCompleted(event.referenceId, event.eventId)
}
}
return result
}
fun performConvert(converter: Converter): ConvertWorkPerformed {
return try {
val result = converter.execute()
ConvertWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
outFiles = result.map { it.absolutePath }
)
} catch (e: Converter.FileUnavailableException) {
e.printStackTrace()
ConvertWorkPerformed(
status = Status.ERROR,
message = e.message,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
outFiles = emptyList()
)
} catch (e: Converter.FileIsNullOrEmpty) {
e.printStackTrace()
ConvertWorkPerformed(
status = Status.ERROR,
message = e.message,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
outFiles = emptyList()
)
}
}
/*val scheduled_deferred_events: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf()
@Scheduled(fixedDelay = (300_000))
fun validatePresenceOfRequiredEvent() {
val continueDeferral: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf()
for ((referenceId, eventList) in scheduled_deferred_events) {
val keepable = mutableListOf<DerivedProcessIterationHolder>()
for (event in eventList) {
val ce = if (event.event.data is ConvertWorkerRequest) event.event.data as ConvertWorkerRequest else null
try {
val requiredEventToBeCompleted = getRequiredExtractProcessForContinuation(
referenceId = referenceId,
requiresEventId = ce?.requiresEventId!!
)
if (requiredEventToBeCompleted == null && event.iterated > 4) {
throw RuntimeException("Iterated overshot")
} else {
event.iterated++
keepable.add(event)
"Iteration ${event.iterated} for event ${event.eventId} in deferred check"
}
} catch (e: Exception) {
eventManager.setProcessEventCompleted(referenceId, event.eventId, Status.SKIPPED)
log.error { "Canceling event ${event.eventId}\n\t by declaring it as consumed." }
producer.sendMessage(
referenceId = referenceId,
event = producesEvent,
data = SimpleMessageData(Status.SKIPPED, "Required event: ${ce?.requiresEventId} is not found. Skipping convert work for referenceId: ${referenceId}", derivedFromEventId = event.eventId)
)
}
}
continueDeferral[referenceId] = keepable
}
scheduled_deferred_events.clear()
scheduled_deferred_events.putAll(continueDeferral)
}*/
}

View File

@@ -0,0 +1,126 @@
package no.iktdev.mediaprocessing.converter.tasks
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.*
import no.iktdev.mediaprocessing.converter.convert.ConvertListener
import no.iktdev.mediaprocessing.converter.convert.Converter2
import no.iktdev.mediaprocessing.shared.common.services.TaskService
import no.iktdev.mediaprocessing.shared.common.task.ConvertTaskData
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
@EnableScheduling
@Service
class ConvertServiceV2(
@Autowired var tasks: TaskCoordinator,
) : TaskService(), ConvertListener, TaskCoordinator.TaskEvents {
override val log = KotlinLogging.logger {}
override val logDir = ConverterEnv.logDirectory
override fun getServiceId(serviceName: String): String {
return super.getServiceId(this::class.java.simpleName)
}
var worker: Converter2? = null
override fun onAttachListener() {
tasks.addConvertTaskListener(this)
tasks.addTaskEventListener(this)
}
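// Single conversion at a time: ready to consume only while no worker is active.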
override fun isReadyToConsume(): Boolean {
return worker == null
}
override fun isTaskClaimable(task: Task): Boolean {
return !taskManager.isTaskClaimed(referenceId = task.referenceId, eventId = task.eventId)
}
override fun onTaskAssigned(task: Task) {
startConvert(task)
}
fun startConvert(task: Task) {
val convert = task.data as ConvertTaskData
worker = Converter2(convert, this)
worker?.execute()
}
override fun onStarted(inputFile: String) {
val task = assignedTask ?: return
taskManager.markTaskAsClaimed(task.referenceId, task.eventId, serviceId)
log.info { "Convert started for ${task.referenceId}" }
}
override fun onCompleted(inputFile: String, outputFiles: List<String>) {
val task = assignedTask ?: return
log.info { "Convert completed for ${task.referenceId}" }
val claimSuccessful = taskManager.markTaskAsCompleted(task.referenceId, task.eventId)
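// Retry the completion mark once if needed, then poll until a readback confirms it before publishing the result.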
runBlocking {
delay(1000)
if (!claimSuccessful) {
taskManager.markTaskAsCompleted(task.referenceId, task.eventId)
delay(1000)
}
var readbackIsSuccess = taskManager.isTaskCompleted(task.referenceId, task.eventId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = taskManager.isTaskCompleted(task.referenceId, task.eventId)
}
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkConvertPerformed,
data = ConvertWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = task.eventId,
outFiles = outputFiles
)
)
onClearTask()
}
}
override fun onError(inputFile: String, message: String) {
val task = assignedTask ?: return
super.onError(inputFile, message)
log.info { "Convert error for ${task.referenceId}" }
val data = ConvertWorkPerformed(
status = Status.ERROR,
message = message,
producedBy = serviceId,
derivedFromEventId = task.eventId,
outFiles = emptyList()
)
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkConvertPerformed,
data = data
)
}
override fun onClearTask() {
super.onClearTask()
worker = null
}
override fun onCancelOrStopProcess(eventId: String) {
TODO("Not yet implemented")
}
}
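
The onCompleted path above marks the task completed, retries once on failure, then polls isTaskCompleted until a readback confirms it before publishing to Kafka. A self-contained sketch of that pattern, with hypothetical names (completeWithReadback, markCompleted, isCompleted are not part of the commit):

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper mirroring the readback loop in onCompleted above:
// try the action, retry once on failure, then poll until the check confirms.
suspend fun completeWithReadback(
    intervalMs: Long = 1_000,
    markCompleted: () -> Boolean,
    isCompleted: () -> Boolean
) {
    if (!markCompleted()) {
        delay(intervalMs)
        markCompleted() // single retry, as in the original code
    }
    while (!isCompleted()) {
        delay(intervalMs) // wait for the database readback to confirm completion
    }
}

fun main() = runBlocking {
    var done = false
    completeWithReadback(markCompleted = { done = true; true }, isCompleted = { done })
    println("completion confirmed")
}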

View File

@@ -48,6 +48,7 @@ fun getEventsDatabase(): MySqlDataSource {
}
lateinit var eventManager: PersistentEventManager
lateinit var taskManager: TasksManager
fun main(args: Array<String>) {
ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
@@ -69,11 +70,13 @@ fun main(args: Array<String>) {
eventManager = PersistentEventManager(eventsDatabase)
taskManager = TasksManager(eventsDatabase)
val kafkaTables = listOf(
events, // For kafka
allEvents
allEvents,
tasks
)

View File

@@ -1,10 +1,9 @@
package no.iktdev.mediaprocessing.coordinator
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.EventCoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
@@ -19,7 +18,7 @@ import java.util.UUID
@EnableScheduling
@Service
class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
class EventCoordinator() : EventCoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
override fun onCoordinatorReady() {
super.onCoordinatorReady()

View File

@@ -4,11 +4,10 @@ import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMe
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class TaskCreator(coordinator: Coordinator):
TaskCreatorImpl<Coordinator, PersistentMessage, PersistentEventBasedMessageListener>(coordinator) {
abstract class TaskCreator(coordinator: EventCoordinator):
TaskCreatorImpl<EventCoordinator, PersistentMessage, PersistentEventBasedMessageListener>(coordinator) {

View File

@@ -1,7 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.controller
import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.eventManager
import no.iktdev.mediaprocessing.shared.contract.dto.RequestWorkProceed
import org.springframework.beans.factory.annotation.Autowired
@@ -13,7 +13,7 @@ import org.springframework.web.bind.annotation.RequestMapping
@Controller
@RequestMapping(path = ["/action"])
class ActionEventController(@Autowired var coordinator: Coordinator) {
class ActionEventController(@Autowired var coordinator: EventCoordinator) {
@RequestMapping("/flow/proceed")

View File

@@ -1,7 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.controller
import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.EventRequest
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
@@ -17,7 +17,7 @@ import java.io.File
@Controller
@RequestMapping(path = ["/request"])
class RequestEventController(@Autowired var coordinator: Coordinator) {
class RequestEventController(@Autowired var coordinator: EventCoordinator) {
@PostMapping("/convert")
@ResponseStatus(HttpStatus.OK)

View File

@@ -1,7 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
@@ -17,7 +17,7 @@ import org.springframework.stereotype.Service
import java.io.File
@Service
class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class BaseInfoFromFile(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents

View File

@@ -1,7 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
@@ -27,10 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.sql.SQLIntegrityConstraintViolationException
import java.text.Normalizer
@Service
class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class CollectAndStoreTask(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}

View File

@@ -2,7 +2,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
@@ -19,7 +19,7 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class CompleteMediaTask(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents = KafkaEvents.EventMediaProcessCompleted

View File

@@ -1,11 +1,16 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.taskManager
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.common.persistance.lastOf
import no.iktdev.mediaprocessing.shared.common.task.ConvertTaskData
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.isOnly
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
@@ -15,46 +20,71 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.az
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@Service
class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class CreateConvertWorkTask(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EventWorkConvertCreated
override val listensForEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EventMediaProcessStarted, KafkaEvents.EventWorkExtractCreated)
get() = listOf(KafkaEvents.EventMediaProcessStarted, KafkaEvents.EventWorkExtractPerformed)
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" }
val startedEventData = events.lastOf(KafkaEvents.EventMediaProcessStarted)?.data?.az<MediaProcessStarted>()
if (event.isOfEvent(KafkaEvents.EventMediaProcessStarted) && startedEventData?.operations?.isOnly(StartOperationEvents.CONVERT) == true) {
val subtitleFile = File(startedEventData.file)
return produceConvertWorkRequest(subtitleFile, null, event.eventId)
} else {
val derivedInfoObject = if (event.event in requiredEvents) {
DerivedInfoObject.fromExtractWorkCreated(event)
} else {
val extractEvent = events.lastOf(KafkaEvents.EventWorkExtractCreated)
extractEvent?.let { it -> DerivedInfoObject.fromExtractWorkCreated(it) }
} ?: return null
val result = if (event.isOfEvent(KafkaEvents.EventMediaProcessStarted) &&
event.data.az<MediaProcessStarted>()?.operations?.isOnly(StartOperationEvents.CONVERT) == true
) {
startedEventData?.file
} else if (event.isOfEvent(KafkaEvents.EventWorkExtractPerformed) && startedEventData?.operations?.contains(
StartOperationEvents.CONVERT
) == true
) {
val innerData = event.data.az<ProcesserExtractWorkPerformed>()
innerData?.outFile
} else null
val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) {
event.eventId
} else null
val convertFile = result?.let { File(it) } ?: return null
val outFile = File(derivedInfoObject.outputFile)
return produceConvertWorkRequest(outFile, requiredEventId, event.eventId)
}
val taskData = ConvertTaskData(
allowOverwrite = true,
inputFile = convertFile.absolutePath,
outFileBaseName = convertFile.nameWithoutExtension,
outDirectory = convertFile.parentFile.absolutePath,
outFormats = emptyList()
)
taskManager.createTask(
referenceId = event.referenceId,
eventId = event.eventId,
task = TaskType.Convert,
data = Gson().toJson(taskData)
)
return if (event.isOfEvent(KafkaEvents.EventMediaProcessStarted) &&
event.data.az<MediaProcessStarted>()?.operations?.isOnly(StartOperationEvents.CONVERT) == true
) {
produceConvertWorkRequest(convertFile, null, event.eventId)
} else if (event.isOfEvent(KafkaEvents.EventWorkExtractPerformed) && startedEventData?.operations?.contains(
StartOperationEvents.CONVERT
) == true
) {
return produceConvertWorkRequest(convertFile, event.referenceId, event.eventId)
} else null
}
private fun produceConvertWorkRequest(file: File, requiresEventId: String?, derivedFromEventId: String?): ConvertWorkerRequest {
private fun produceConvertWorkRequest(
file: File,
requiresEventId: String?,
derivedFromEventId: String?
): ConvertWorkerRequest {
return ConvertWorkerRequest(
status = Status.COMPLETED,
requiresEventId = requiresEventId,
@@ -67,7 +97,6 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
}
private data class DerivedInfoObject(
val outputFile: String,
val derivedFromEventId: String,

View File

@@ -1,16 +1,23 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.taskManager
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.TasksManager
import no.iktdev.mediaprocessing.shared.common.task.FfmpegTaskData
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.az
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
class CreateEncodeWorkTask(@Autowired override var coordinator: EventCoordinator) : CreateProcesserWorkTask(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EventWorkEncodeCreated
@@ -34,6 +41,20 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : C
} else event
forwardEvent.data.az<FfmpegWorkerArgumentsCreated>()?.let {
val entries = it.entries.firstOrNull() ?: return@let
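// Only the first argument group becomes an Encode task, persisted for task pollers to claim.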
val ffmpegTask = FfmpegTaskData(
inputFile = it.inputFile,
outFile = entries.outputFile,
arguments = entries.arguments
)
val status = taskManager.createTask(event.referenceId, forwardEvent.eventId, TaskType.Encode, Gson().toJson(ffmpegTask))
if (!status) {
log.error { "Failed to create Encode task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" }
}
}
return super.onProcessEvents(forwardEvent, events)
}

View File

@@ -1,16 +1,22 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.taskManager
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.task.FfmpegTaskData
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.az
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinator) : CreateProcesserWorkTask(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EventWorkExtractCreated
@@ -33,6 +39,20 @@ class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) :
sevent ?: event
} else event
forwardEvent.data.az<FfmpegWorkerArgumentsCreated>()?.let {
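// Each ffmpeg argument group becomes its own claimable Extract task.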
it.entries.forEach { argsGroup ->
val ffmpegTask = FfmpegTaskData(
inputFile = it.inputFile,
outFile = argsGroup.outputFile,
arguments = argsGroup.arguments
)
val status = taskManager.createTask(event.referenceId, forwardEvent.eventId, TaskType.Extract, Gson().toJson(ffmpegTask))
if (!status) {
log.error { "Failed to create Extract task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" }
}
}
}
return super.onProcessEvents(forwardEvent, events)
}
}

View File

@@ -2,7 +2,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.DownloadClient
import no.iktdev.mediaprocessing.shared.common.getComputername
@@ -19,7 +19,7 @@ import java.io.File
import java.util.*
@Service
class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class DownloadAndStoreCoverTask(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"

View File

@@ -1,7 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper
import no.iktdev.mediaprocessing.shared.common.parsing.Regexes
@@ -17,7 +17,7 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.JsonObject
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds
@@ -35,7 +35,7 @@ import java.util.*
*/
@Service
@EnableScheduling
class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
val metadataTimeout = KafkaEnv.metadataTimeoutMinutes * 60

View File

@@ -2,11 +2,10 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
@@ -21,7 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class ParseVideoFileStreams(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}

View File

@@ -4,7 +4,7 @@ import com.google.gson.Gson
import com.google.gson.JsonObject
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
@@ -22,7 +22,7 @@ import org.springframework.stereotype.Service
import java.io.File
@Service
class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class ReadVideoFileStreams(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
val requiredOperations = listOf(StartOperationEvents.ENCODE, StartOperationEvents.EXTRACT)

View File

@@ -1,9 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.log
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
@@ -12,9 +11,8 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : TaskCreator(coordinator) {
abstract class CreateProcesserWorkTask(override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted?

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
@@ -19,7 +19,7 @@ import org.springframework.stereotype.Service
import java.io.File
@Service
class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class EncodeArgumentCreatorTask(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
val preference = Preference.getPreference()

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.*
import no.iktdev.mediaprocessing.shared.common.Preference
@@ -22,7 +22,7 @@ import org.springframework.stereotype.Service
import java.io.File
@Service
class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
class ExtractArgumentCreatorTask(@Autowired override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
val preference = Preference.getPreference()

View File

@@ -34,7 +34,7 @@ interface FileWatcherEvents {
@Service
class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatcherEvents {
class InputDirectoryWatcher(@Autowired var coordinator: EventCoordinator): FileWatcherEvents {
private val logger = KotlinLogging.logger {}
val watcherChannel = SharedConfig.incomingContent.asWatchChannel()
val queue = FileWatcherQueue()

View File

@@ -0,0 +1,45 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import no.iktdev.mediaprocessing.PersistentMessageFromJsonDump
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
class MetadataAndBaseInfoToFileOutTest {
fun testData(): String {
return """
[
{"type":"header","version":"5.2.1","comment":"Export to JSON plugin for PHPMyAdmin"},
{"type":"database","name":"eventsV2"},
{"type":"table","name":"events","database":"eventsV2","data":
[
{"id":"9","referenceId":"f015ad8a-8210-4040-993b-bdaa5bd25d80","eventId":"3cea9a98-2e65-4e70-96bc-4b6933c06af7","event":"event:media-read-base-info:performed","data":"{\"status\":\"COMPLETED\",\"title\":\"Psycho-Pass Movie\",\"sanitizedName\":\"Psycho-Pass Movie - Providence\",\"derivedFromEventId\":\"62408248-d457-4f4d-a2c7-9b17e5701336\"}","created":"2024-04-15 22:24:07.406088"},
{"id":"19","referenceId":"f015ad8a-8210-4040-993b-bdaa5bd25d80","eventId":"0edaa265-fc85-41bc-952a-acb21771feb9","event":"event:media-metadata-search:performed","data":"{\"status\":\"COMPLETED\",\"data\":{\"title\":\"Psycho-Pass Movie: Providence\",\"altTitle\":[],\"cover\":\"https:\/\/cdn.myanimelist.net\/images\/anime\/1244\/134653.jpg\",\"type\":\"movie\",\"summary\":[{\"summary\":\"In 2113, the Ministry of Foreign Affairs (MFA) dissolved their secret paramilitary unit known as the Peacebreakers. However, the squad disappeared, and their activities remained a mystery. Five years later, the Peacebreakers resurface when they murder Milcia Stronskaya, a scientist in possession of highly classified documents essential to the future of the Sybil System—Japan\\u0027s surveillance structure that detects potential criminals in society. To investigate the incident and prepare for a clash against the Peacebreakers in coordination with the MFA, the chief of the Public Safety Bureau decides to recruit the former Enforcer Shinya Kougami back into the force. Having defected years ago, Kougami currently works for the MFA under Frederica Hanashiro\\u0027s command. Kougami\\u0027s return creates tensions between him and his former colleagues Akane Tsunemori and Nobuchika Ginoza, but they must set aside their past grudges to focus on ensuring the security of the Sybil System. [Written by MAL Rewrite]\",\"language\":\"eng\"}],\"genres\":[\"Action\",\"Mystery\",\"Sci-Fi\",\"Suspense\"]}}","created":"2024-04-15 22:24:18.339106"}
]
}
]
""".trimIndent()
}
@Test
fun testVideoData() {
val pmdj = PersistentMessageFromJsonDump(testData())
val events = pmdj.getPersistentMessages()
val baseInfo = events.lastOrSuccessOf(KafkaEvents.EventMediaReadBaseInfoPerformed) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val meta = events.lastOrSuccessOf(KafkaEvents.EventMediaMetadataSearchPerformed) { it.data is MetadataPerformed }?.data as MetadataPerformed?
val pm = MetadataAndBaseInfoToFileOut.ProcessMediaInfoAndMetadata(baseInfo, meta)
val vi = pm.getVideoPayload()
assertThat(vi).isNotNull()
}
}

View File

@@ -9,6 +9,13 @@ plugins {
group = "no.iktdev.mediaprocessing.apps"
version = "1.0-SNAPSHOT"
val appVersion = "1.0.0"
tasks.processResources {
expand(mapOf("appVersion" to appVersion))
}
repositories {
mavenCentral()
maven("https://jitpack.io")
@@ -79,4 +86,4 @@ tasks.bootJar {
tasks.jar {
archiveFileName.set("app.jar")
archiveBaseName.set("app")
}
}

View File

@@ -1,154 +0,0 @@
package no.iktdev.mediaprocessing.processer
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.NotificationOfDeletionPerformed
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@Service
@EnableScheduling
class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>() {
private val log = KotlinLogging.logger {}
override val listeners = PersistentEventProcessBasedMessageListener()
private val coordinatorEventListeners: MutableList<CoordinatorEvents> = mutableListOf()
fun getRegisteredEventListeners() = coordinatorEventListeners.toList()
fun addCoordinatorEventListener(listener: CoordinatorEvents) {
coordinatorEventListeners.add(listener)
}
fun removeCoordinatorEventListener(listener: CoordinatorEvents) {
coordinatorEventListeners.remove(listener)
}
override fun createTasksBasedOnEventsAndPersistence(
referenceId: String,
eventId: String,
messages: List<PersistentProcessDataMessage>
) {
val triggered = messages.find { it.eventId == eventId }
if (triggered == null) {
log.error { "Could not find $eventId in provided messages" }
return
}
listeners.forwardEventMessageToListeners(triggered, messages)
}
override fun onCoordinatorReady() {
super.onCoordinatorReady()
generateMissingEvents()
readAllAvailableInQueue()
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (!acceptEvents.contains(event.key)) {
return
}
if (event.key == KafkaEvents.EventNotificationOfWorkItemRemoval) {
handleDeletionOfEvents(event)
return
}
val success = eventManager.setProcessEvent(event.key, event.value)
if (!success) {
log.error { "Unable to store message event: ${event.key.event} with eventId ${event.value.eventId} with referenceId ${event.value.referenceId} in database ${getEventsDatabase().database}!" }
} else {
ioCoroutine.launch {
delay(500)
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
}
}
private fun handleDeletionOfEvents(kafkaPayload: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (kafkaPayload.value.data is NotificationOfDeletionPerformed) {
val data = kafkaPayload.value.data as NotificationOfDeletionPerformed
if (data.deletedEvent in processKafkaEvents) {
coordinatorEventListeners.forEach { it.onCancelOrStopProcess(data.deletedEventId) }
eventManager.deleteProcessEvent(kafkaPayload.value.referenceId, data.deletedEventId)
}
} else {
log.warn { "Deletion handling was triggered with wrong data" }
}
}
private fun readAllAvailableInQueue() {
val messages = eventManager.getProcessEventsClaimable()
ioCoroutine.launch {
messages.forEach {
delay(500)
listeners.forwardBatchEventMessagesToListeners(listOf(it))
//createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages)
}
}
}
private fun generateMissingEvents() {
val existing = eventManager.getAllProcessEvents().filter { it.event in processKafkaEvents }.map { it.eventId }
val messages = eventManager.getEventsUncompleted()
val myEvents = messages.flatten()
.filter { it.event in processKafkaEvents }
.filter { existing.none { en -> en == it.eventId } }
myEvents.forEach {
log.info { "Generating missing process event (${it.event.event}) for referenceId ${it.referenceId}," }
eventManager.setProcessEvent(it.event, Message(
referenceId = it.referenceId,
eventId = it.eventId,
data = it.data
))
}
}
/**
 * Picks up the next claimable message of the given event type, for the case
 * where two events arrived at the same time and one was left unclaimed.
 */
fun readNextAvailableMessageWithEvent(kafkaEvents: KafkaEvents) {
    eventManager.getProcessEventsClaimable().firstOrNull { it.event == kafkaEvents }?.let {
        readAllMessagesFor(referenceId = it.referenceId, eventId = it.eventId)
    }
}
private fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = eventManager.getProcessEventsClaimable()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}
private final val processKafkaEvents = listOf(
KafkaEvents.EventWorkEncodeCreated,
KafkaEvents.EventWorkExtractCreated,
)
private final val acceptEvents = listOf(
KafkaEvents.EventNotificationOfWorkItemRemoval
) + processKafkaEvents
@Scheduled(fixedDelay = (5 * 60_000))
fun checkForWork() {
log.info { "Checking if there is any work to do.." }
readAllAvailableInQueue()
generateMissingEvents()
}
interface CoordinatorEvents {
fun onCancelOrStopProcess(eventId: String)
}
}

View File

@ -6,8 +6,7 @@ import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import no.iktdev.mediaprocessing.shared.common.persistance.*
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.springframework.boot.autoconfigure.SpringBootApplication
@ -15,9 +14,15 @@ import org.springframework.boot.runApplication
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
private val logger = KotlinLogging.logger {}
val ioCoroutine = CoroutinesIO()
val defaultCoroutine = CoroutinesDefault()
var taskMode: ActiveMode = ActiveMode.Active
@SpringBootApplication
class ProcesserApplication {
}
@ -27,7 +32,7 @@ fun getEventsDatabase(): MySqlDataSource {
return eventsDatabase
}
lateinit var eventManager: PersistentEventManager
lateinit var taskManager: TasksManager
fun main(args: Array<String>) {
@ -45,10 +50,9 @@ fun main(args: Array<String>) {
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
eventsDatabase.createDatabase()
eventsDatabase.createTables(processerEvents)
eventsDatabase.createTables(tasks)
eventManager = PersistentEventManager(eventsDatabase)
taskManager = TasksManager(eventsDatabase)
val context = runApplication<ProcesserApplication>(*args)

View File

@ -1,44 +0,0 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class TaskCreator(coordinator: Coordinator) :
TaskCreatorImpl<Coordinator, PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>(coordinator) {
override fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
val currentEvents = events.map { it.event }
return requiredEvents.all { currentEvents.contains(it) }
}
override fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
return failed.isEmpty()
}
override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
return event.event == singleOne
}
/*override fun getListener(): Tasks {
val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter)
}*/
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return listOf {
isPrerequisiteEventsOk(events)
}
}
override fun prerequisiteRequired(event: PersistentProcessDataMessage): List<() -> Boolean> {
return listOf()
}
override fun containsUnprocessedEvents(events: List<PersistentProcessDataMessage>): Boolean {
return true
}
}

View File

@ -0,0 +1,89 @@
package no.iktdev.mediaprocessing.processer
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.*
import no.iktdev.mediaprocessing.shared.common.persistance.ActiveMode
import no.iktdev.mediaprocessing.shared.common.persistance.RunnerManager
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
@Service
@EnableScheduling
class TaskCoordinator(): TaskCoordinatorBase() {
private val log = KotlinLogging.logger {}
lateinit var runnerManager: RunnerManager
@Value("\${appVersion}")
private lateinit var version: String
override fun onCoordinatorReady() {
super.onCoordinatorReady()
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ProcesserApplication::class.java.simpleName, version = version)
runnerManager.assignRunner()
}
override val taskAvailabilityEventListener: MutableMap<TaskType, MutableList<TaskQueueListener>> = mutableMapOf(
TaskType.Encode to mutableListOf(),
TaskType.Extract to mutableListOf()
)
private val taskListeners: MutableSet<TaskEvents> = mutableSetOf()
fun getTaskListeners(): List<TaskEvents> {
return taskListeners.toList()
}
fun addTaskEventListener(listener: TaskEvents) {
taskListeners.add(listener)
}
fun addEncodeTaskListener(listener: TaskQueueListener) {
addTaskListener(TaskType.Encode, listener)
}
fun addExtractTaskListener(listener: TaskQueueListener) {
addTaskListener(TaskType.Extract, listener)
}
override fun addTaskListener(type: TaskType, listener: TaskQueueListener) {
super.addTaskListener(type, listener)
pullForAvailableTasks()
}
override fun pullForAvailableTasks() {
if (runnerManager.iAmSuperseded()) {
// Let the application finish its current work, but stop consuming new tasks
taskMode = ActiveMode.Passive
return
}
val available = taskManager.getClaimableTasks().asClaimable()
available.forEach { (type, list) ->
taskAvailabilityEventListener[type]?.forEach { listener ->
list.foreachOrUntilClaimed {
listener.onTaskAvailable(it)
}
}
}
}
override fun clearExpiredClaims() {
val expiredClaims = taskManager.getTasksWithExpiredClaim().filter { it.task in listOf(TaskType.Encode, TaskType.Extract) }
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.task}" }
}
expiredClaims.forEach {
val result = taskManager.deleteTaskClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {
log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.task}" }
} else {
log.error { "Failed to release claim on ${it.referenceId}::${it.eventId}::${it.task}" }
}
}
}
interface TaskEvents {
fun onCancelOrStopProcess(eventId: String)
}
}
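For orientation, a minimal sketch of how a worker hooks into this coordinator. The worker class is hypothetical; TaskCoordinator, TaskQueueListener and ClaimableTask are the types introduced in this commit:

import no.iktdev.mediaprocessing.shared.common.ClaimableTask
import no.iktdev.mediaprocessing.shared.common.TaskQueueListener

// Hypothetical worker: registers for encode tasks and claims the first one offered.
class SketchEncodeWorker(coordinator: TaskCoordinator) : TaskQueueListener {
    init {
        coordinator.addEncodeTaskListener(this)
    }

    override fun onTaskAvailable(data: ClaimableTask) {
        // consume() returns null if another listener claimed the task first
        val task = data.consume() ?: return
        println("Would start encoding ${task.referenceId}")
    }
}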

View File

@ -1,6 +1,6 @@
package no.iktdev.mediaprocessing.processer.controller
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.TaskCoordinator
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
@ -9,14 +9,14 @@ import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
@Controller
class CancelController(@Autowired var coordinator: Coordinator) {
class CancelController(@Autowired var task: TaskCoordinator) {
@RequestMapping(path = ["/cancel"])
fun cancelProcess(@RequestBody eventId: String? = null): ResponseEntity<String> {
if (eventId.isNullOrBlank()) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("No eventId provided!")
}
coordinator.getRegisteredEventListeners().forEach { it.onCancelOrStopProcess(eventId) }
task.getTaskListeners().forEach { it.onCancelOrStopProcess(eventId) }
return ResponseEntity.ok(null)
}

View File

@ -1,32 +0,0 @@
package no.iktdev.mediaprocessing.processer.coordination
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
class PersistentEventProcessBasedMessageListener: EventBasedMessageListener<PersistentProcessDataMessage>() {
override fun listenerWantingEvent(
event: PersistentProcessDataMessage,
waitingListeners: List<Tasks<PersistentProcessDataMessage>>
): List<Tasks<PersistentProcessDataMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentProcessDataMessage,
history: List<PersistentProcessDataMessage>,
listeners: List<ITaskCreatorListener<PersistentProcessDataMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
override fun waitingListeners(events: List<PersistentProcessDataMessage>): List<Tasks<PersistentProcessDataMessage>> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
}

View File

@ -1,11 +0,0 @@
package no.iktdev.mediaprocessing.processer.coordination
/**
 * Class to handle messages from websockets, produced by Processer instances.
 * This exists to keep an overview of progress per Processer instance.
 */
class ProcesserSocketMessageListener {
}

View File

@ -0,0 +1,60 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
class FfmpegArgumentsBuilder() {
private val defaultArguments = listOf(
"-nostdin",
"-hide_banner"
)
private var inputFile: String? = null
private var outputFile: String? = null
private var overwrite: Boolean = false
private var progress: Boolean = false
private var suppliedArgs: List<String> = emptyList()
fun inputFile(inputFile: String) = apply {
this.inputFile = inputFile
}
fun outputFile(outputFile: String) = apply {
this.outputFile = outputFile
}
fun allowOverwrite(allowOverwrite: Boolean) = apply {
this.overwrite = allowOverwrite
}
fun withProgress(withProgress: Boolean) = apply {
this.progress = withProgress
}
fun args(args: List<String>) = apply {
this.suppliedArgs = args
}
fun build(): List<String> {
    val inFile = inputFile?.takeIf { it.isNotBlank() }
        ?: throw RuntimeException("Input file is required")
    val outFile = outputFile?.takeIf { it.isNotBlank() }
        ?: throw RuntimeException("Output file is required")
    val args = mutableListOf<String>()
    if (overwrite) {
        args.add("-y")
    }
    args.addAll(defaultArguments)
    if (progress) {
        // ffmpeg ignores trailing options after the last output file,
        // so progress reporting must be requested before the output.
        args.addAll(listOf("-progress", "pipe:1"))
    }
    args.addAll(listOf("-i", inFile))
    args.addAll(suppliedArgs)
    args.add(outFile)
    return args
}
}
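A usage sketch for the builder above; the paths and codec arguments are hypothetical:

// Hypothetical invocation; with the overwrite and progress flags set, build() yields:
// -y -nostdin -hide_banner -progress pipe:1 -i <in> <codec args> <out>
val args = FfmpegArgumentsBuilder()
    .inputFile("/data/in/movie.mkv")
    .outputFile("/data/out/movie.mp4")
    .args(listOf("-c:v", "libx264", "-c:a", "aac"))
    .allowOverwrite(true)
    .withProgress(true)
    .build()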

View File

@ -0,0 +1,10 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
interface FfmpegListener {
fun onStarted(inputFile: String)
fun onCompleted(inputFile: String, outputFile: String)
fun onProgressChanged(inputFile: String, progress: FfmpegDecodedProgress)
fun onError(inputFile: String, message: String) {}
}
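A minimal sketch of an implementation of the interface above, useful for wiring up FfmpegRunner in isolation; the logging bodies are illustrative:

import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress

// Illustrative listener that only logs the lifecycle callbacks.
class LoggingFfmpegListener : FfmpegListener {
    override fun onStarted(inputFile: String) =
        println("Started: $inputFile")
    override fun onCompleted(inputFile: String, outputFile: String) =
        println("Completed: $inputFile -> $outputFile")
    override fun onProgressChanged(inputFile: String, progress: FfmpegDecodedProgress) =
        println("Progress on $inputFile: ${progress.progress}%")
    override fun onError(inputFile: String, message: String) =
        println("Error on $inputFile: $message")
}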

View File

@ -0,0 +1,115 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
import com.github.pgreze.process.Redirect
import com.github.pgreze.process.process
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegProgressDecoder
import java.io.File
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.*
private val log = KotlinLogging.logger {}
class FfmpegRunner(
val inputFile: String,
val outputFile: String,
val arguments: List<String>,
private val listener: FfmpegListener,
val logDir: File
) {
val currentDateTime = LocalDateTime.now()
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd.HH.mm")
val formattedDateTime = currentDateTime.format(formatter)
val logFile = logDir.using("$formattedDateTime-${File(inputFile).nameWithoutExtension}.log")
val scope = CoroutineScope(Dispatchers.Unconfined + Job())
private var job: Job? = null
val decoder = FfmpegProgressDecoder()
private val outputCache = mutableListOf<String>()
fun isWorking(): Boolean {
return job != null && (job?.isCompleted != true) && scope.isActive
}
fun run(progress: Boolean = false) {
log.info { "Starting ffmpeg on file $inputFile" }
val args = FfmpegArgumentsBuilder()
.inputFile(inputFile)
.outputFile(outputFile)
.args(arguments)
.allowOverwrite(ProcesserEnv.allowOverwrite)
.withProgress(progress)
.build()
job = scope.launch {
execute(args)
}
}
fun isAlive(): Boolean {
return scope.isActive && job?.isCompleted != true
}
private suspend fun execute(args: List<String>) {
withContext(Dispatchers.IO) {
logFile.createNewFile()
}
listener.onStarted(inputFile)
val result = process(
    ProcesserEnv.ffmpeg, *args.toTypedArray(),
    stdout = Redirect.CAPTURE,
    stderr = Redirect.CAPTURE,
    consumer = { onOutputChanged(it) },
    destroyForcibly = true
)
onOutputChanged("Received exit code: ${result.resultCode}")
if (result.resultCode != 0) {
listener.onError(inputFile, result.output.joinToString("\n"))
} else {
listener.onCompleted(inputFile, outputFile)
}
}
fun cancel(message: String = "Work was interrupted as requested") {
job?.cancel()
scope.cancel(message)
listener.onError(inputFile, message)
}
private var progress: FfmpegDecodedProgress? = null
fun onOutputChanged(line: String) {
outputCache.add(line)
writeToLog(line)
// toList is needed to prevent mutability.
decoder.parseVideoProgress(outputCache.toList())?.let { decoded ->
try {
val _progress = decoder.getProgress(decoded)
if (progress == null || _progress.progress > (progress?.progress ?: -1)) {
progress = _progress
listener.onProgressChanged(inputFile, _progress)
}
} catch (e: Exception) {
e.printStackTrace()
}
}
}
fun writeToLog(line: String) {
    // printWriter() would truncate the log file on every call; append instead.
    logFile.appendText(line + System.lineSeparator())
}
}
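And a sketch of driving the runner with such a listener. The paths are hypothetical, ProcesserEnv.ffmpeg must resolve to an ffmpeg binary, and LoggingFfmpegListener is the sketch under FfmpegListener above:

import java.io.File

val runner = FfmpegRunner(
    inputFile = "/data/in/movie.mkv",
    outputFile = "/data/out/movie.mp4",
    arguments = listOf("-c:v", "libx264"),
    listener = LoggingFfmpegListener(),
    logDir = File("/var/log/processer").apply { mkdirs() }
)
runner.run(progress = true) // launches asynchronously on the runner's scope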

View File

@ -0,0 +1,67 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
import kotlinx.coroutines.cancel
import mu.KLogger
import no.iktdev.mediaprocessing.processer.taskManager
import no.iktdev.mediaprocessing.shared.common.ClaimableTask
import no.iktdev.mediaprocessing.shared.common.TaskQueueListener
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.services.TaskService
import no.iktdev.mediaprocessing.shared.common.task.Task
import java.io.File
import java.util.*
import javax.annotation.PostConstruct
abstract class FfmpegTaskService: TaskService(), FfmpegListener {
abstract override val logDir: File
abstract override val log: KLogger
protected var runner: FfmpegRunner? = null
override fun onTaskAvailable(data: ClaimableTask) {
if (runner?.isWorking() == true) {
    log.info { "Worker is already running, will not consume" }
    return
}
if (assignedTask != null) {
    log.info { "A task is already assigned, will not consume" }
    return
}
val task = data.consume()
if (task == null) {
log.error { "Task is already consumed!" }
return
}
val isAlreadyClaimed = taskManager.isTaskClaimed(referenceId = task.referenceId, eventId = task.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return
}
task.let {
this.assignedTask = it
onTaskAssigned(it)
}
}
@PostConstruct
private fun onCreated() {
log.info { "Starting with id: $serviceId" }
onAttachListener()
}
fun clearWorker() {
this.runner?.scope?.cancel()
this.assignedTask = null
this.runner = null
}
fun cancelWorkIfRunning(eventId: String) {
if (assignedTask?.eventId == eventId) {
runner?.cancel()
}
}
}

View File

@ -4,10 +4,10 @@ import com.github.pgreze.process.Redirect
import com.github.pgreze.process.process
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.eventManager
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegProgressDecoder
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import java.io.File
import java.time.Duration

View File

@ -1,4 +1,4 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
package no.iktdev.mediaprocessing.processer.ffmpeg.progress
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserProgress

View File

@ -1,4 +1,4 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
package no.iktdev.mediaprocessing.processer.ffmpeg.progress
import java.lang.StringBuilder
import java.time.LocalTime

View File

@ -1,43 +0,0 @@
package no.iktdev.mediaprocessing.processer.services
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.eventManager
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@Service
@EnableScheduling
class ClaimsService() {
private val log = KotlinLogging.logger {}
@Autowired
lateinit var coordinator: Coordinator
/**
 * If this service triggers a full readback on the coordinator, it will cause a full-on crash.
 */
@Scheduled(fixedDelay = (300_000))
fun validateClaims() {
val expiredClaims = eventManager.getProcessEventsWithExpiredClaim()
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
}
val released = expiredClaims.mapNotNull {
val result = eventManager.deleteProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {
log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" }
} else {
log.error { "Failed to release claim on ${it.referenceId}::${it.eventId}::${it.event}" }
}
it
}
/*released.forEach {
log.info { "Sending released ${it.referenceId} ${it.event} into queue" }
coordinator.readAllMessagesFor(it.referenceId, it.eventId)
}*/
}
}

View File

@ -1,227 +0,0 @@
package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.*
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
import javax.annotation.PreDestroy
@Service
class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired private val reporter: Reporter): TaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.encodeLogDirectory
override val producesEvent = KafkaEvents.EventWorkEncodePerformed
override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EventWorkEncodeCreated
)
private var runner: FfmpegWorker? = null
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
private final val coordinatorEvents = object: Coordinator.CoordinatorEvents {
override fun onCancelOrStopProcess(eventId: String) {
cancelWorkIfRunning(eventId)
}
}
init {
log.info { "Starting with id: $serviceId" }
}
override fun attachListener() {
super.attachListener()
coordinator.addCoordinatorEventListener(listener = coordinatorEvents)
}
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
isPrerequisiteDataPresent(events)
}
}
override fun onProcessEvents(event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>): MessageDataWrapper? {
if (event.event !in requiredEvents) {
return null
}
if (event.data !is FfmpegWorkRequestCreated) {
return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}", event.eventId)
}
val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
if (runner?.isWorking() != true) {
startEncode(event)
} else {
log.warn { "Worker is already running.." }
}
// This should never return anything other than continue or skipped
return null
}
fun startEncode(event: PersistentProcessDataMessage) {
val ffwrc = event.data as FfmpegWorkRequestCreated
val outFile = File(ffwrc.outFile)
outFile.parentFile.mkdirs()
if (!logDir.exists()) {
logDir.mkdirs()
}
val setClaim = eventManager.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents )
if (outFile.exists()) {
if (ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming
eventManager.setProcessEventCompleted(event.referenceId, event.eventId, Status.ERROR)
return
}
}
runner?.runWithProgress()
} else {
log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" }
}
}
val ffmpegWorkerEvents = object : FfmpegWorkerEvents {
override fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce start message when the referenceId is not present" }
return
}
log.info { "Encode started for ${runner.referenceId}" }
eventManager.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendProgress(referenceId, eventId, status = WorkStatus.Started, info, FfmpegDecodedProgress(
progress = 0,
time = "Unkown",
duration = "Unknown",
speed = "0",
)
)
}
override fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce completion message when the referenceId is not present" }
return
}
log.info { "Encode completed for ${runner.referenceId}" }
val consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId)
}
delay(1000)
var readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId)
}
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
data = ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile)
)
sendProgress(referenceId, eventId, status = WorkStatus.Completed, info, FfmpegDecodedProgress(
progress = 100,
time = "",
duration = "",
speed = "0",
))
clearWorker()
}
}
override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) {
eventManager.setProcessEventCompleted(referenceId, eventId, Status.ERROR)
val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce error message when the referenceId is not present" }
return
}
log.info { "Encode failed for ${runner.referenceId}\n$errorMessage" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
data = ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId)
)
sendProgress(referenceId, eventId, status = WorkStatus.Failed, info = info, progress = FfmpegDecodedProgress(
progress = 0,
time = "",
duration = "",
speed = "0",
))
clearWorker()
}
override fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) {
sendProgress(referenceId, eventId, WorkStatus.Working, info, progress)
}
override fun onIAmAlive(referenceId: String, eventId: String) {
super.onIAmAlive(referenceId, eventId)
eventManager.setProcessEventClaimRefresh(referenceId, eventId, serviceId)
}
}
fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null) {
val processerEventInfo = ProcesserEventInfo(
referenceId = referenceId,
eventId = eventId,
status = status,
inputFile = info.inputFile,
outputFiles = listOf(info.outFile),
progress = progress?.toProcessProgress()
)
try {
reporter.sendEncodeProgress(processerEventInfo)
} catch (e: Exception) {
e.printStackTrace()
}
}
fun clearWorker() {
this.runner = null
coordinator.readNextAvailableMessageWithEvent(KafkaEvents.EventWorkEncodeCreated)
}
@PreDestroy
fun shutdown() {
runner?.cancel("Stopping application")
}
fun cancelWorkIfRunning(eventId: String) {
if (runner?.eventId == eventId) {
runner?.cancel()
}
}
}

View File

@ -0,0 +1,213 @@
package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.Reporter
import no.iktdev.mediaprocessing.processer.TaskCoordinator
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegRunner
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegTaskService
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.taskManager
import no.iktdev.mediaprocessing.shared.common.ClaimableTask
import no.iktdev.mediaprocessing.shared.common.task.FfmpegTaskData
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.time.Duration
@Service
class EncodeServiceV2(
@Autowired var tasks: TaskCoordinator,
@Autowired private val reporter: Reporter
) : FfmpegTaskService(), TaskCoordinator.TaskEvents {
override val log = KotlinLogging.logger {}
override val logDir = ProcesserEnv.encodeLogDirectory
override fun getServiceId(serviceName: String): String {
return super.getServiceId(this::class.java.simpleName)
}
override fun onAttachListener() {
tasks.addEncodeTaskListener(this)
tasks.addTaskEventListener(this)
}
override fun isReadyToConsume(): Boolean {
return runner?.isWorking() == false
}
override fun isTaskClaimable(task: Task): Boolean {
return !taskManager.isTaskClaimed(referenceId = task.referenceId, eventId = task.eventId)
}
override fun onTaskAssigned(task: Task) {
startEncode(task)
}
fun startEncode(event: Task) {
val ffwrc = event.data as FfmpegTaskData
val outFile = File(ffwrc.outFile)
outFile.parentFile.mkdirs()
if (!logDir.exists()) {
logDir.mkdirs()
}
val setClaim = taskManager.markTaskAsClaimed(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegRunner(
inputFile = ffwrc.inputFile,
outputFile = ffwrc.outFile,
arguments = ffwrc.arguments,
logDir = logDir, listener = this
)
if (outFile.exists()) {
if (ffwrc.arguments.firstOrNull() != "-y") {
this.onError(
ffwrc.inputFile,
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}"
)
// Setting consumed to prevent spamming
taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR)
return
}
}
runner?.run(true)
} else {
log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.task}" }
}
}
override fun onStarted(inputFile: String) {
val task = assignedTask ?: return
taskManager.markTaskAsClaimed(task.referenceId, task.eventId, serviceId)
sendProgress(
task.referenceId, task.eventId, status = WorkStatus.Started, FfmpegDecodedProgress(
progress = 0,
time = "Unkown",
duration = "Unknown",
speed = "0",
)
)
log.info { "Encode started for ${task.referenceId}" }
runner?.scope?.launch {
log.info { "Encode keep-alive started for ${task.referenceId}" }
while (runner?.isAlive() == true) {
delay(Duration.ofMinutes(5).toMillis())
taskManager.refreshTaskClaim(task.referenceId, task.eventId, serviceId)
}
}
}
override fun onCompleted(inputFile: String, outputFile: String) {
val task = assignedTask ?: return
log.info { "Encode completed for ${task.referenceId}" }
val claimSuccessful = taskManager.markTaskAsCompleted(task.referenceId, task.eventId)
runBlocking {
delay(1000)
if (!claimSuccessful) {
taskManager.markTaskAsCompleted(task.referenceId, task.eventId)
delay(1000)
}
var readbackIsSuccess = taskManager.isTaskCompleted(task.referenceId, task.eventId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = taskManager.isTaskCompleted(task.referenceId, task.eventId)
}
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkEncodePerformed,
data = ProcesserEncodeWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = task.eventId,
outFile = outputFile
)
)
sendProgress(
task.referenceId, task.eventId, status = WorkStatus.Completed, FfmpegDecodedProgress(
progress = 100,
time = "",
duration = "",
speed = "0",
)
)
clearWorker()
}
}
override fun onError(inputFile: String, message: String) {
val task = assignedTask ?: return
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
log.info { "Encode failed for ${task.referenceId}\n$message" }
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkEncodePerformed,
data = ProcesserEncodeWorkPerformed(
status = Status.ERROR,
message = message,
producedBy = serviceId,
derivedFromEventId = task.eventId
)
)
sendProgress(
task.referenceId, task.eventId, status = WorkStatus.Failed, progress = FfmpegDecodedProgress(
progress = 0,
time = "",
duration = "",
speed = "0",
)
)
clearWorker()
}
override fun onProgressChanged(inputFile: String, progress: FfmpegDecodedProgress) {
val task = assignedTask ?: return
sendProgress(task.referenceId, task.eventId, WorkStatus.Working, progress)
}
fun sendProgress(
referenceId: String,
eventId: String,
status: WorkStatus,
progress: FfmpegDecodedProgress? = null
) {
val runner = runner ?: return
val processerEventInfo = ProcesserEventInfo(
referenceId = referenceId,
eventId = eventId,
status = status,
inputFile = runner.inputFile,
outputFiles = listOf(runner.outputFile),
progress = progress?.toProcessProgress()
)
try {
reporter.sendEncodeProgress(processerEventInfo)
} catch (e: Exception) {
e.printStackTrace()
}
}
override fun onCancelOrStopProcess(eventId: String) {
cancelWorkIfRunning(eventId)
}
}

View File

@ -1,195 +0,0 @@
package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.*
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
import no.iktdev.mediaprocessing.shared.common.limitedWhile
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
import javax.annotation.PreDestroy
@Service
class ExtractService(@Autowired override var coordinator: Coordinator, @Autowired private val reporter: Reporter): TaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.extractLogDirectory
override val producesEvent = KafkaEvents.EventWorkExtractPerformed
private var runner: FfmpegWorker? = null
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init {
log.info { "Starting with id: $serviceId" }
}
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EventWorkExtractCreated)
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
isPrerequisiteDataPresent(events)
}
}
override fun onProcessEvents(event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>): MessageDataWrapper? {
if (!requiredEvents.contains(event.event)) {
return null
}
if (event.data !is FfmpegWorkRequestCreated) {
return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}", event.eventId)
}
val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
if (runner?.isWorking() != true) {
startExtract(event)
} else {
log.warn { "Worker is already running.." }
}
// This should never return anything other than continue or skipped
return null
}
fun startExtract(event: PersistentProcessDataMessage) {
val ffwrc = event.data as FfmpegWorkRequestCreated
File(ffwrc.outFile).parentFile.mkdirs()
if (!logDir.exists()) {
logDir.mkdirs()
}
val setClaim = eventManager.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} extract" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents)
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming
eventManager.setProcessEventCompleted(event.referenceId, event.eventId)
return
}
runner!!.run()
} else {
log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" }
}
}
val ffmpegWorkerEvents = object : FfmpegWorkerEvents {
override fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce start message when the referenceId is not present" }
return
}
log.info { "Extract started for ${runner.referenceId}" }
eventManager.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendProgress(referenceId, eventId, WorkStatus.Started, info)
}
override fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce completion message when the referenceId is not present" }
return
}
log.info { "Extract completed for ${runner.referenceId}" }
var consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId)
runBlocking {
delay(1000)
limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) {
consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId)
}
log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" }
delay(1000)
var readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId)
limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) {
readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId)
log.info { readbackIsSuccess }
}
log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
ProcesserExtractWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = runner.eventId,
outFile = runner.info.outFile)
)
sendProgress(referenceId, eventId, WorkStatus.Completed, info)
log.info { "Extract is releasing worker" }
clearWorker()
}
}
override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) {
eventManager.setProcessEventCompleted(referenceId, eventId, Status.ERROR)
val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce error message when the referenceId is not present" }
return
}
log.info { "Extract failed for ${runner.referenceId}\n$errorMessage" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId)
)
sendProgress(referenceId, eventId, WorkStatus.Failed, info)
clearWorker()
}
override fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) {
sendProgress(referenceId, eventId, WorkStatus.Working, info, progress)
}
}
fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null) {
val processerEventInfo = ProcesserEventInfo(
referenceId = referenceId,
eventId = eventId,
status = status,
inputFile = info.inputFile,
outputFiles = listOf(info.outFile),
progress = progress?.toProcessProgress()
)
reporter.sendExtractProgress(processerEventInfo)
}
fun clearWorker() {
this.runner = null
coordinator.readNextAvailableMessageWithEvent(KafkaEvents.EventWorkExtractCreated)
}
@PreDestroy
fun shutdown() {
runner?.cancel("Stopping application")
}
}

View File

@ -0,0 +1,182 @@
package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.Reporter
import no.iktdev.mediaprocessing.processer.TaskCoordinator
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegRunner
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegTaskService
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.taskManager
import no.iktdev.mediaprocessing.shared.common.limitedWhile
import no.iktdev.mediaprocessing.shared.common.task.FfmpegTaskData
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@Service
class ExtractServiceV2(
@Autowired var tasks: TaskCoordinator,
@Autowired private val reporter: Reporter
) : FfmpegTaskService(), TaskCoordinator.TaskEvents {
override val log = KotlinLogging.logger {}
override val logDir = ProcesserEnv.extractLogDirectory
override fun getServiceId(serviceName: String): String {
return super.getServiceId(this::class.java.simpleName)
}
override fun onAttachListener() {
tasks.addExtractTaskListener(this)
tasks.addTaskEventListener(this)
}
override fun isReadyToConsume(): Boolean {
return runner?.isWorking() == false
}
override fun isTaskClaimable(task: Task): Boolean {
return !taskManager.isTaskClaimed(referenceId = task.referenceId, eventId = task.eventId)
}
override fun onTaskAssigned(task: Task) {
startExtract(task)
}
fun startExtract(event: Task) {
val ffwrc = event.data as FfmpegTaskData
val outFile = File(ffwrc.outFile).also {
it.parentFile.mkdirs()
}
if (!logDir.exists()) {
logDir.mkdirs()
}
val setClaim = taskManager.markTaskAsClaimed(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} extract" }
runner = FfmpegRunner(
inputFile = ffwrc.inputFile,
outputFile = ffwrc.outFile,
arguments = ffwrc.arguments,
logDir = logDir,
listener = this
)
if (outFile.exists()) {
if (ffwrc.arguments.firstOrNull() != "-y") {
this.onError(
ffwrc.inputFile,
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}"
)
// Setting consumed to prevent spamming
taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR)
return
}
}
runner?.run()
} else {
log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.task}" }
}
}
override fun onStarted(inputFile: String) {
val task = assignedTask ?: return
taskManager.markTaskAsClaimed(task.referenceId, task.eventId, serviceId)
sendProgress(task.referenceId, task.eventId, WorkStatus.Started)
}
override fun onCompleted(inputFile: String, outputFile: String) {
val task = assignedTask ?: return
log.info { "Extract completed for ${task.referenceId}" }
runBlocking {
var successfulComplete = false
limitedWhile({!successfulComplete}, 1000 * 10, 1000) {
taskManager.markTaskAsCompleted(task.referenceId, task.eventId)
successfulComplete = taskManager.isTaskCompleted(task.referenceId, task.eventId)
}
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkExtractPerformed,
data = ProcesserExtractWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = task.eventId,
outFile = outputFile
)
)
sendProgress(
task.referenceId, task.eventId, status = WorkStatus.Completed, FfmpegDecodedProgress(
progress = 100,
time = "",
duration = "",
speed = "0",
)
)
clearWorker()
}
}
override fun onError(inputFile: String, message: String) {
val task = assignedTask ?: return
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
log.info { "Encode failed for ${task.referenceId}\n$message" }
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkExtractPerformed,
data = ProcesserExtractWorkPerformed(
status = Status.ERROR,
message = message,
producedBy = serviceId,
derivedFromEventId = task.eventId
)
)
sendProgress(
task.referenceId, task.eventId, status = WorkStatus.Failed, progress = FfmpegDecodedProgress(
progress = 0,
time = "",
duration = "",
speed = "0",
)
)
clearWorker()
}
override fun onProgressChanged(inputFile: String, progress: FfmpegDecodedProgress) {
val task = assignedTask ?: return
sendProgress(task.referenceId, task.eventId, WorkStatus.Working, progress)
}
fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, progress: FfmpegDecodedProgress? = null) {
val runner = runner ?: return
val processerEventInfo = ProcesserEventInfo(
referenceId = referenceId,
eventId = eventId,
status = status,
inputFile = runner.inputFile,
outputFiles = listOf(runner.outputFile),
progress = progress?.toProcessProgress()
)
try {
reporter.sendExtractProgress(processerEventInfo)
} catch (e: Exception) {
e.printStackTrace()
}
}
override fun onCancelOrStopProcess(eventId: String) {
cancelWorkIfRunning(eventId)
}
}

View File

@ -1,3 +1,3 @@
spring.output.ansi.enabled=always
logging.level.org.apache.kafka=WARN
logging.level.root=INFO
logging.level.root=INFO

View File

@ -1,6 +1,6 @@
package no.iktdev.mediaprocessing.ui
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.EventCoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
@ -24,7 +24,7 @@ import org.springframework.stereotype.Service
@Service
@EnableScheduling
class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
class EventCoordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : EventCoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
override val listeners = PersistentEventBasedMessageListener()
override fun onCoordinatorReady() {

View File

@ -17,7 +17,7 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
abstract class EventCoordinatorBase<V, L: EventBasedMessageListener<V>> {
val defaultCoroutine = CoroutinesDefault()
private var ready: Boolean = false
fun isReady() = ready

View File

@ -0,0 +1,91 @@
package no.iktdev.mediaprocessing.shared.common
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.ActiveMode
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import javax.annotation.PostConstruct
@EnableScheduling
abstract class TaskCoordinatorBase() {
private val log = KotlinLogging.logger {}
var taskMode: ActiveMode = ActiveMode.Active
private var ready: Boolean = false
fun isReady() = ready
@Autowired
lateinit var producer: CoordinatorProducer
abstract val taskAvailabilityEventListener: MutableMap<TaskType, MutableList<TaskQueueListener>>
open fun addTaskListener(type: TaskType, listener: TaskQueueListener) {
    val listeners = taskAvailabilityEventListener.getOrPut(type) { mutableListOf() }
    listeners.add(listener)
}
open fun onCoordinatorReady() {
ready = true
}
@PostConstruct
fun onInitializationCompleted() {
onCoordinatorReady()
pullAvailable()
}
abstract fun pullForAvailableTasks()
@Scheduled(fixedDelay = (5_000))
fun pullAvailable() {
if (taskMode != ActiveMode.Active) {
return
}
pullForAvailableTasks()
}
abstract fun clearExpiredClaims()
@Scheduled(fixedDelay = (300_000))
fun resetExpiredClaims() {
if (taskMode != ActiveMode.Active) {
return
}
clearExpiredClaims()
}
}
class ClaimableTask(private var task: Task) {
var isConsumed: Boolean = false
private set
fun consume(): Task? {
return if (!isConsumed) {
isConsumed = true
task
} else null
}
}
interface TaskQueueListener {
fun onTaskAvailable(data: ClaimableTask)
}
fun Map<TaskType, List<Task>>.asClaimable(): Map<TaskType, List<ClaimableTask>> {
return this.mapValues { v -> v.value.map { ClaimableTask(it) } }
}
fun List<ClaimableTask>.foreachOrUntilClaimed(block: (task: ClaimableTask) -> Unit) {
    for (task in this) {
        if (task.isConsumed) continue
        block(task)
        if (task.isConsumed) {
            // A listener claimed this task; stop offering the rest, as the name implies.
            return
        }
    }
}
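The claim-once contract of ClaimableTask, shown in isolation. A sketch: Task construction is not shown in this commit, so the task is taken as a parameter:

import no.iktdev.mediaprocessing.shared.common.task.Task

// Only the first consume() wins; later calls observe isConsumed and get null.
fun demonstrateClaim(task: Task) {
    val offered = ClaimableTask(task)
    check(offered.consume() != null) // first consumer receives the task
    check(offered.consume() == null) // subsequent attempts get null
    check(offered.isConsumed)
}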

View File

@ -30,6 +30,10 @@ fun <T> withDirtyRead(db: Database? = null, block: () -> T): T? {
}
}
fun <T> withDirtyRead(db: DataSource? = null, block: () -> T): T? {
return withDirtyRead(db?.database, block)
}
fun <T> withTransaction(db: Database? = null, block: () -> T): T? {
return try {

View File

@ -83,15 +83,6 @@ class PersistentEventManager(private val dataSource: DataSource) {
} ?: emptyList()
}
fun getProcessEventWith(referenceId: String, eventId: String): PersistentProcessDataMessage? {
return withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}.toPersistentProcesserMessage(dzz)
}?.singleOrNull()
}
fun getAllEvents(): List<PersistentMessage> {
return withDirtyRead(dataSource.database) {
events.selectAll()
@ -103,13 +94,6 @@ class PersistentEventManager(private val dataSource: DataSource) {
return getAllEvents().toGrouped()
}
fun getAllProcessEvents(): List<PersistentProcessDataMessage> {
return withDirtyRead(dataSource.database) {
processerEvents.selectAll()
.toPersistentProcesserMessage(dzz)
} ?: emptyList()
}
fun getEventsUncompleted(): List<List<PersistentMessage>> {
val identifiesAsCompleted = listOf(
KafkaEvents.EventCollectAndStore
@ -118,38 +102,6 @@ class PersistentEventManager(private val dataSource: DataSource) {
return all.filter { entry -> entry.none { it.event in identifiesAsCompleted } }
}
fun getProcessEventsUncompleted(): List<PersistentProcessDataMessage> {
return withTransaction(dataSource.database) {
processerEvents.select {
(processerEvents.consumed eq false)
}.toPersistentProcesserMessage(dzz)
} ?: emptyList()
}
fun getProcessEventsClaimable(): List<PersistentProcessDataMessage> {
return withTransaction(dataSource.database) {
processerEvents.select {
(processerEvents.consumed eq false) and
(processerEvents.claimed eq false)
}.toPersistentProcesserMessage(dzz)
} ?: emptyList()
}
fun getProcessEventsWithExpiredClaim(): List<PersistentProcessDataMessage> {
val deadline = LocalDateTime.now()
return getProcessEventsUncompleted()
.filter { it.claimed && if (it.lastCheckIn != null) it.lastCheckIn.plusMinutes(15) < deadline else true }
}
fun isProcessEventClaimed(referenceId: String, eventId: String): Boolean {
val info = getProcessEventWith(referenceId, eventId)
return (info?.claimed ?: true) && (info?.consumed ?: true)
}
fun isProcessEventCompleted(referenceId: String, eventId: String): Boolean {
return getProcessEventWith(referenceId, eventId)?.consumed ?: false
}
//endregion
//region Database write
@ -222,97 +174,6 @@ class PersistentEventManager(private val dataSource: DataSource) {
return success
}
fun setProcessEvent(event: KafkaEvents, message: Message<*>): Boolean {
val exception = executeOrException(dataSource.database) {
processerEvents.insert {
it[processerEvents.referenceId] = message.referenceId
it[processerEvents.eventId] = message.eventId
it[processerEvents.event] = event.event
it[processerEvents.data] = message.dataAsJson()
}
}
return if (exception != null) {
if (exception.isExposedSqlException()) {
if ((exception as ExposedSQLException).isCausedByDuplicateError()) {
log.info { "Error is of SQLIntegrityConstraintViolationException" }
} else {
log.info { "Error code is: ${exception.errorCode}" }
exception.printStackTrace()
}
} else {
exception.printStackTrace()
}
false
} else {
true
}
}
fun setProcessEventClaim(referenceId: String, eventId: String, claimer: String): Boolean {
return executeWithStatus(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimed eq false) and
(processerEvents.consumed eq false)
}) {
it[claimedBy] = claimer
it[lastCheckIn] = CurrentDateTime
it[claimed] = true
}
}
}
fun setProcessEventCompleted(referenceId: String, eventId: String, status: Status = Status.COMPLETED): Boolean {
return executeWithStatus(dataSource) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}) {
it[consumed] = true
it[claimed] = true
it[processerEvents.status] = status.name
}
}
}
fun setProcessEventClaimRefresh(referenceId: String, eventId: String, claimer: String): Boolean {
return executeWithStatus(dataSource) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimed eq true) and
(processerEvents.claimedBy eq claimer)
}) {
it[lastCheckIn] = CurrentDateTime
}
}
}
/**
* Removes the claim set on the process event
*/
fun deleteProcessEventClaim(referenceId: String, eventId: String): Boolean {
return executeWithStatus(dataSource) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}) {
it[claimed] = false
it[claimedBy] = null
it[lastCheckIn] = null
}
}
}
fun deleteProcessEvent(referenceId: String, eventId: String): Boolean {
return executeWithStatus (dataSource) {
processerEvents.deleteWhere {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}
}
}
//endregion

View File

@ -1,14 +1,5 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.claimed
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.claimedBy
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.consumed
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.created
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.data
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.event
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.eventId
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.lastCheckIn
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.referenceId
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper

View File

@@ -0,0 +1,46 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.select
import java.util.UUID
class RunnerManager(private val dataSource: DataSource, val startId: String = UUID.randomUUID().toString(), val name: String, val version: String) {
private val log = KotlinLogging.logger {}
fun assignRunner(): Boolean {
return executeOrException(dataSource.database) {
runners.insert {
it[runners.startId] = this@RunnerManager.startId
it[runners.application] = this@RunnerManager.name
it[runners.version] = this@RunnerManager.version
}
} == null
}
/**
 * Naive version code: "1.0.0" -> 100. Note this assumes single-digit
 * segments; "1.10.0" (1100) would incorrectly outrank "2.0.0" (200).
 */
private fun versionToVersionCode(version: String?): Int? {
    return version?.replace(".", "")?.toIntOrNull()
}
fun iAmSuperseded(): Boolean {
    return withDirtyRead(dataSource.database) {
        val runnerVersionCodes = runners.select {
            (runners.application eq this@RunnerManager.name) and
            (runners.startId neq this@RunnerManager.startId)
        }.map { it[runners.version] }.mapNotNull { versionToVersionCode(it) }
        val myVersion = versionToVersionCode(this@RunnerManager.version)
        myVersion?.let {
            runnerVersionCodes.any { rv -> rv > it }
        } ?: true
    } ?: true
}
}
enum class ActiveMode {
Active,
Passive
}
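A minimal usage sketch (not part of this commit) of how an application might pair assignRunner with iAmSuperseded to stand down when a newer runner registers; the poll interval and loop shape are illustrative assumptions:

// Sketch only: cadence and shutdown behaviour are assumed, not prescribed.
fun awaitSupersession(manager: RunnerManager) {
    if (!manager.assignRunner()) return   // could not register this instance
    while (!manager.iAmSuperseded()) {
        Thread.sleep(60_000)              // assumed poll interval
    }
    // A newer version has registered; switch this instance to ActiveMode.Passive.
}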

View File

@@ -0,0 +1,147 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.common.task.TaskDoz
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import java.time.LocalDateTime
class TasksManager(private val dataSource: DataSource) {
private val log = KotlinLogging.logger {}
fun getClaimableTasks(): Map<TaskType, List<Task>> {
return withTransaction(dataSource.database) {
tasks.select {
(tasks.consumed eq false) and
(tasks.claimed eq false)
}.toTaskTypeGroup()
} ?: emptyMap()
}
fun getTasksFor(referenceId: String): List<Task> {
return withDirtyRead(dataSource) {
tasks.select {
(tasks.referenceId eq referenceId)
}.toTask()
} ?: emptyList()
}
fun getTaskWith(referenceId: String, eventId: String): Task? {
return withDirtyRead(dataSource.database) {
tasks.select {
(tasks.referenceId eq referenceId) and
(tasks.eventId eq eventId)
}.toTask()
}?.singleOrNull()
}
fun getTasksWithExpiredClaim(): List<Task> {
    val deadline = LocalDateTime.now()
    // A claimed task without any check-in is treated as expired.
    return getUncompletedTasks()
        .filter { it.claimed && (it.lastCheckIn?.plusMinutes(15)?.isBefore(deadline) ?: true) }
}
fun isTaskClaimed(referenceId: String, eventId: String): Boolean {
    val info = getTaskWith(referenceId, eventId)
    // A missing row is treated as claimed so callers never attempt to claim it.
    return info == null || info.claimed || info.consumed
}
fun isTaskCompleted(referenceId: String, eventId: String): Boolean {
return getTaskWith(referenceId, eventId)?.consumed ?: false
}
fun getUncompletedTasks(): List<Task> {
return withTransaction(dataSource.database) {
tasks.select {
(tasks.consumed eq false)
}.toTask()
} ?: emptyList()
}
fun markTaskAsClaimed(referenceId: String, eventId: String, claimer: String): Boolean {
return executeWithStatus(dataSource.database) {
tasks.update({
(tasks.referenceId eq referenceId) and
(tasks.eventId eq eventId) and
(tasks.claimed eq false) and
(tasks.consumed eq false)
}) {
it[claimedBy] = claimer
it[lastCheckIn] = CurrentDateTime
it[claimed] = true
}
}
}
fun markTaskAsCompleted(referenceId: String, eventId: String, status: Status = Status.COMPLETED): Boolean {
return executeWithStatus(dataSource) {
tasks.update({
(tasks.referenceId eq referenceId) and
(tasks.eventId eq eventId)
}) {
it[consumed] = true
it[claimed] = true
it[tasks.status] = status.name
}
}
}
fun refreshTaskClaim(referenceId: String, eventId: String, claimer: String): Boolean {
return executeWithStatus(dataSource) {
tasks.update({
(tasks.referenceId eq referenceId) and
(tasks.eventId eq eventId) and
(tasks.claimed eq true) and
(tasks.claimedBy eq claimer)
}) {
it[lastCheckIn] = CurrentDateTime
}
}
}
fun deleteTaskClaim(referenceId: String, eventId: String): Boolean {
return executeWithStatus(dataSource) {
tasks.update({
(tasks.referenceId eq referenceId) and
(tasks.eventId eq eventId)
}) {
it[claimed] = false
it[claimedBy] = null
it[lastCheckIn] = null
}
}
}
fun createTask(referenceId: String, eventId: String, task: TaskType, data: String): Boolean {
return executeWithStatus(dataSource) {
tasks.insert {
it[tasks.referenceId] = referenceId
it[tasks.eventId] = eventId
it[tasks.task] = task.name
it[tasks.data] = data
}
}
}
}
fun Query?.toTaskTypeGroup(): Map<TaskType, List<Task>> {
    val dz = TaskDoz()
    return this?.mapNotNull { dz.deserializeTask(it) }?.groupBy { it.task } ?: emptyMap()
}
fun Query?.toTask(): List<Task> {
    val dz = TaskDoz()
    return this?.mapNotNull { dz.deserializeTask(it) } ?: emptyList()
}
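Taken together, these methods form the claim lifecycle: create, claim, check in periodically, then complete or release. A hedged worker-loop sketch, where the worker id and task type are assumptions for illustration:

// Sketch: one worker pass over TasksManager (method names match the class above).
fun workOneTask(manager: TasksManager, workerId: String) {
    val candidate = manager.getClaimableTasks()[TaskType.Convert]?.firstOrNull() ?: return
    // The claim UPDATE only matches rows with claimed = false, so two
    // concurrent workers cannot both win the same task.
    if (!manager.markTaskAsClaimed(candidate.referenceId, candidate.eventId, workerId)) return
    try {
        // ... perform the work, calling refreshTaskClaim before the
        // 15-minute deadline used by getTasksWithExpiredClaim ...
        manager.refreshTaskClaim(candidate.referenceId, candidate.eventId, workerId)
        manager.markTaskAsCompleted(candidate.referenceId, candidate.eventId)
    } catch (e: Exception) {
        // Release the claim so another worker (or the expiry sweep) can retry.
        manager.deleteTaskClaim(candidate.referenceId, candidate.eventId)
    }
}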

View File

@@ -0,0 +1,15 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import org.jetbrains.exposed.dao.id.IntIdTable
import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import org.jetbrains.exposed.sql.javatime.datetime
import java.time.LocalDateTime
object runners: IntIdTable() {
val startId: Column<String> = varchar("startId", 50)
val application: Column<String> = varchar("application", 50)
val version: Column<String> = varchar("version", 8)
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
}

View File

@@ -6,19 +6,19 @@ import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import org.jetbrains.exposed.sql.javatime.datetime
import java.time.LocalDateTime
object processerEvents: IntIdTable() {
object tasks: IntIdTable() {
val referenceId: Column<String> = varchar("referenceId", 50)
val status: Column<String?> = varchar("status", 10).nullable()
val claimed: Column<Boolean> = bool("claimed").default(false)
val claimedBy: Column<String?> = varchar("claimedBy", 100).nullable()
val event: Column<String> = varchar("event",100)
val consumed: Column<Boolean> = bool("consumed").default(false)
val task: Column<String> = varchar("task",50)
val eventId: Column<String> = varchar("eventId", 50)
val data: Column<String> = text("data")
val consumed: Column<Boolean> = bool("consumed").default(false)
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
val lastCheckIn: Column<LocalDateTime?> = datetime("lastCheckIn").nullable()
init {
uniqueIndex(referenceId, eventId, event)
uniqueIndex(referenceId, eventId, task)
}
}

View File

@@ -0,0 +1,68 @@
package no.iktdev.mediaprocessing.shared.common.services
import mu.KLogger
import no.iktdev.mediaprocessing.shared.common.ClaimableTask
import no.iktdev.mediaprocessing.shared.common.TaskQueueListener
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.task.Task
import java.io.File
import java.util.*
import javax.annotation.PostConstruct
abstract class TaskService: TaskQueueListener {
abstract val logDir: File
abstract val log: KLogger
lateinit var serviceId: String
open fun getServiceId(serviceName: String = this.javaClass.simpleName): String {
return "${getComputername()}::${serviceName}::${UUID.randomUUID()}"
}
protected var assignedTask: Task? = null
/**
 * Returns false while the service is still working on an assigned task.
 */
abstract fun isReadyToConsume(): Boolean
abstract fun isTaskClaimable(task: Task): Boolean
abstract fun onTaskAssigned(task: Task)
abstract fun onAttachListener()
override fun onTaskAvailable(data: ClaimableTask) {
if (!isReadyToConsume()) {
return
}
if (assignedTask != null) {
log.info { "Assigned task is not unassigned.., will not consume" }
return
}
val task = data.consume()
if (task == null) {
log.error { "Task is already consumed!" }
return
}
if (!isTaskClaimable(task)) {
log.warn { "Process is already claimed!" }
return
}
task.let {
this.assignedTask = it
onTaskAssigned(it)
}
}
@PostConstruct
private fun onCreated() {
    if (!this::serviceId.isInitialized) {
        serviceId = getServiceId()
    }
    log.info { "Starting with id: $serviceId" }
    onAttachListener()
}
open fun onClearTask() {
this.assignedTask = null
}
}
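A minimal concrete subclass sketch; the class name, the TasksManager collaborator, and the completion call are assumptions, and only the overridden members come from TaskService:

// Sketch: wiring the abstract hooks; ConvertTaskService and `manager` are hypothetical.
class ConvertTaskService(
    private val manager: TasksManager,
    override val logDir: File
) : TaskService() {
    override val log = KotlinLogging.logger {}
    override fun isReadyToConsume(): Boolean = assignedTask == null
    override fun isTaskClaimable(task: Task): Boolean =
        !manager.isTaskClaimed(task.referenceId, task.eventId)
    override fun onTaskAssigned(task: Task) {
        log.info { "Working on ${task.referenceId}::${task.eventId}" }
        // ... perform the conversion, then mark it done and free the slot:
        manager.markTaskAsCompleted(task.referenceId, task.eventId)
        onClearTask()
    }
    override fun onAttachListener() {
        // Register this service with whatever queue produces ClaimableTask instances.
    }
}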

View File

@@ -0,0 +1,16 @@
package no.iktdev.mediaprocessing.shared.common.task
import java.time.LocalDateTime
data class Task(
val referenceId: String,
val status: String? = null,
val claimed: Boolean = false,
val claimedBy: String? = null,
val consumed: Boolean = false,
val task: TaskType,
val eventId: String,
val data: TaskData? = null,
val created: LocalDateTime,
val lastCheckIn: LocalDateTime? = null
)

View File

@@ -0,0 +1,28 @@
package no.iktdev.mediaprocessing.shared.common.task
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import java.io.Serializable
open class TaskData(
    val inputFile: String
) : Serializable
class FfmpegTaskData(
inputFile: String,
val arguments: List<String>,
val outFile: String
): TaskData(inputFile = inputFile)
class ConvertTaskData(
inputFile: String,
val allowOverwrite: Boolean,
val outFileBaseName: String,
val outDirectory: String,
val outFormats: List<SubtitleFormats> = listOf()
): TaskData(inputFile = inputFile)
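The data column on the tasks table holds these payloads as JSON; a quick Gson sketch with made-up field values:

// Sketch: the kind of string TasksManager.createTask would store in tasks.data.
val payload = ConvertTaskData(
    inputFile = "/data/input/episode.mkv",   // made-up path
    allowOverwrite = false,
    outFileBaseName = "episode",
    outDirectory = "/data/out",
    outFormats = listOf()
)
val json: String = Gson().toJson(payload)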

View File

@@ -0,0 +1,43 @@
package no.iktdev.mediaprocessing.shared.common.task
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import no.iktdev.mediaprocessing.shared.common.persistance.tasks
import org.jetbrains.exposed.sql.ResultRow
class TaskDoz {
val gson = Gson()
fun <T: TaskData> dzdata(type: TaskType, data: String): T? {
val clazz: Class<out TaskData> = when(type) {
TaskType.Encode, TaskType.Extract -> {
FfmpegTaskData::class.java
}
TaskType.Convert -> {
ConvertTaskData::class.java
}
else -> TaskData::class.java
}
val typeToken = TypeToken.getParameterized(clazz).type
return gson.fromJson(data, typeToken)
}
fun deserializeTask(row: ResultRow): Task? {
val taskType = row[tasks.task].let { TaskType.valueOf(it) }
val data = row[tasks.data]
return Task(
referenceId = row[tasks.referenceId],
status = row[tasks.status],
claimed = row[tasks.claimed],
claimedBy = row[tasks.claimedBy],
consumed = row[tasks.consumed],
task = taskType,
eventId = row[tasks.eventId],
data = dzdata(taskType, data),
created = row[tasks.created],
lastCheckIn = row[tasks.lastCheckIn]
)
}
}
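TaskDoz picks the concrete TaskData subclass from the TaskType before handing the JSON to Gson; a small sketch with an assumed JSON literal:

// Sketch: Encode/Extract map to FfmpegTaskData, Convert to ConvertTaskData.
val doz = TaskDoz()
val data: FfmpegTaskData? = doz.dzdata(
    TaskType.Encode,
    """{"inputFile":"/in/a.mkv","arguments":["-c:v","libx264"],"outFile":"/out/a.mp4"}"""
)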

View File

@@ -0,0 +1,7 @@
package no.iktdev.mediaprocessing.shared.common.task
enum class TaskType {
Encode,
Extract,
Convert
}

View File

@@ -1,7 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.tasks
enum class ProcesserTask {
Encode,
Extract,
Convert
}

View File

@@ -1,15 +1,14 @@
package no.iktdev.mediaprocessing.shared.common.tasks
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.EventCoordinatorBase
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.beans.factory.annotation.Autowired
import javax.annotation.PostConstruct
abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessageListener<V>>(
abstract class TaskCreatorImpl<C : EventCoordinatorBase<V, L>, V, L : EventBasedMessageListener<V>>(
open var coordinator: C
) : ITaskCreatorListener<V> {
private val log = KotlinLogging.logger {}

View File

@@ -83,4 +83,11 @@ class FileNameParserTest {
assertThat(result.first()).isEqualTo("Urusei Yatsura")
}
@Test
fun testSearchableTitleWithReleaseTags() {
    val input = "The.Boys.S04E02.Life.Among.the.Septics.1080p.AMZN.WEB-DL.DDP5.1.H.264-NTb"
    val result = FileNameParser(input).guessSearchableTitle()
    assertThat(result.first()).isEqualTo("The Boys")
}
}