Migrated coordinator to shared listener
This commit is contained in:
parent
08fc781d86
commit
6e1cc17235
@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.converter
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.exfl.coroutines.Coroutines
|
||||
import no.iktdev.mediaprocessing.converter.flow.EventBasedProcessMessageListener
|
||||
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
|
||||
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
|
||||
@ -17,6 +18,7 @@ import org.springframework.stereotype.Service
|
||||
|
||||
@Service
|
||||
class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, EventBasedProcessMessageListener>() {
|
||||
val io = Coroutines.io()
|
||||
|
||||
private val log = KotlinLogging.logger {}
|
||||
|
||||
|
||||
@ -5,7 +5,8 @@ import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.exfl.coroutines.Coroutines
|
||||
import no.iktdev.mediaprocessing.coordinator.coordination.EventBasedMessageListener
|
||||
import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener
|
||||
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
|
||||
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
|
||||
@ -15,9 +16,8 @@ import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.*
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
@ -25,17 +25,54 @@ import java.util.UUID
|
||||
import javax.annotation.PostConstruct
|
||||
|
||||
@Service
|
||||
class Coordinator() {
|
||||
class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
|
||||
val io = Coroutines.io()
|
||||
|
||||
@Autowired
|
||||
private lateinit var producer: CoordinatorProducer
|
||||
override fun onCoordinatorReady() {
|
||||
readAllUncompletedMessagesInQueue()
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private lateinit var listener: DefaultMessageListener
|
||||
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
|
||||
val success = PersistentDataStore().storeEventDataMessage(event.key.event, event.value)
|
||||
if (!success) {
|
||||
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
|
||||
} else {
|
||||
io.launch {
|
||||
delay(500) // Give the database a few sec to update
|
||||
readAllMessagesFor(event.value.referenceId, event.value.eventId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun createTasksBasedOnEventsAndPersistence(
|
||||
referenceId: String,
|
||||
eventId: String,
|
||||
messages: List<PersistentMessage>
|
||||
) {
|
||||
val triggered = messages.find { it.eventId == eventId }
|
||||
if (triggered == null) {
|
||||
log.error { "Could not find $eventId in provided messages" }
|
||||
return
|
||||
}
|
||||
listeners.forwardEventMessageToListeners(triggered, messages)
|
||||
|
||||
if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) {
|
||||
if (getProcessStarted(messages)?.type == ProcessType.FLOW) {
|
||||
forwarder.produceAllMissingProcesserEvents(
|
||||
producer = producer,
|
||||
referenceId = referenceId,
|
||||
eventId = eventId,
|
||||
messages = messages
|
||||
)
|
||||
} else {
|
||||
log.info { "Process for $referenceId was started manually and will require user input for continuation" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val log = KotlinLogging.logger {}
|
||||
|
||||
val listeners = EventBasedMessageListener()
|
||||
override val listeners = PersistentEventBasedMessageListener()
|
||||
|
||||
private val forwarder = Forwarder()
|
||||
|
||||
@ -48,9 +85,6 @@ class Coordinator() {
|
||||
producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_PROCESS_STARTED, processStartEvent)
|
||||
}
|
||||
|
||||
|
||||
val io = Coroutines.io()
|
||||
|
||||
fun readAllUncompletedMessagesInQueue() {
|
||||
val messages = PersistentDataReader().getUncompletedMessages()
|
||||
io.launch {
|
||||
@ -82,7 +116,7 @@ class Coordinator() {
|
||||
}
|
||||
|
||||
fun operationToRunOnMessages(referenceId: String, eventId: String, messages: List<PersistentMessage>) {
|
||||
createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages)
|
||||
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
|
||||
|
||||
io.launch {
|
||||
buildModelBasedOnMessagesFor(referenceId, messages)
|
||||
@ -99,40 +133,6 @@ class Coordinator() {
|
||||
}
|
||||
}
|
||||
|
||||
fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List<PersistentMessage>) {
|
||||
val triggered = messages.find { it.eventId == eventId }
|
||||
if (triggered == null) {
|
||||
log.error { "Could not find $eventId in provided messages" }
|
||||
return
|
||||
}
|
||||
listeners.forwardEventMessageToListeners(triggered, messages)
|
||||
|
||||
if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) {
|
||||
if (getProcessStarted(messages)?.type == ProcessType.FLOW) {
|
||||
forwarder.produceAllMissingProcesserEvents(producer = producer, referenceId = referenceId, eventId = eventId, messages = messages)
|
||||
} else {
|
||||
log.info { "Process for $referenceId was started manually and will require user input for continuation" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
fun onReady() {
|
||||
io.launch {
|
||||
listener.onMessageReceived = { event ->
|
||||
val success = PersistentDataStore().storeEventDataMessage(event.key.event, event.value)
|
||||
if (!success) {
|
||||
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
|
||||
} else
|
||||
io.launch {
|
||||
delay(500) // Give the database a few sec to update
|
||||
readAllMessagesFor(event.value.referenceId, event.value.eventId)
|
||||
}
|
||||
}
|
||||
listener.listen(KafkaEnv.kafkaTopic)
|
||||
}
|
||||
readAllUncompletedMessagesInQueue()
|
||||
}
|
||||
|
||||
|
||||
class Forwarder() {
|
||||
@ -141,19 +141,26 @@ class Coordinator() {
|
||||
)
|
||||
|
||||
fun hasAnyRequiredEventToCreateProcesserEvents(messages: List<PersistentMessage>): Boolean {
|
||||
return messages.filter { forwardOnEventReceived.contains(it.event) && it.data.isSuccess() }.map { it.event }.isNotEmpty()
|
||||
return messages.filter { forwardOnEventReceived.contains(it.event) && it.data.isSuccess() }.map { it.event }
|
||||
.isNotEmpty()
|
||||
}
|
||||
|
||||
fun isMissingEncodeWorkCreated(messages: List<PersistentMessage>): Boolean {
|
||||
val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED }
|
||||
return existingWorkEncodeCreated.isEmpty() && existingWorkEncodeCreated.none { it.data.isSuccess() }
|
||||
}
|
||||
|
||||
fun isMissingExtractWorkCreated(messages: List<PersistentMessage>): Boolean {
|
||||
val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
|
||||
return existingWorkCreated.isEmpty() && existingWorkCreated.none { it.data.isSuccess() }
|
||||
}
|
||||
|
||||
fun produceAllMissingProcesserEvents(producer: CoordinatorProducer, referenceId: String, eventId: String, messages: List<PersistentMessage>) {
|
||||
fun produceAllMissingProcesserEvents(
|
||||
producer: CoordinatorProducer,
|
||||
referenceId: String,
|
||||
eventId: String,
|
||||
messages: List<PersistentMessage>
|
||||
) {
|
||||
val currentMessage = messages.find { it.eventId == eventId }
|
||||
if (!currentMessage?.data.isSuccess()) {
|
||||
return
|
||||
@ -164,11 +171,13 @@ class Coordinator() {
|
||||
produceEncodeWork(producer, currentMessage)
|
||||
}
|
||||
}
|
||||
|
||||
KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED -> {
|
||||
if (isMissingExtractWorkCreated(messages)) {
|
||||
produceExtractWork(producer, currentMessage)
|
||||
}
|
||||
}
|
||||
|
||||
else -> {}
|
||||
}
|
||||
}
|
||||
@ -189,10 +198,12 @@ class Coordinator() {
|
||||
arguments = it.arguments,
|
||||
outFile = it.outputFile
|
||||
).let { createdRequest ->
|
||||
producer.sendMessage(message.referenceId,
|
||||
producer.sendMessage(
|
||||
message.referenceId,
|
||||
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
|
||||
eventId = message.eventId,
|
||||
createdRequest)
|
||||
createdRequest
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -212,7 +223,8 @@ class Coordinator() {
|
||||
arguments = it.arguments,
|
||||
outFile = it.outputFile
|
||||
).let { createdRequest ->
|
||||
producer.sendMessage(message.referenceId,
|
||||
producer.sendMessage(
|
||||
message.referenceId,
|
||||
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
|
||||
eventId = message.eventId,
|
||||
createdRequest
|
||||
@ -227,8 +239,10 @@ class Coordinator() {
|
||||
outFileBaseName = outFile.nameWithoutExtension,
|
||||
outDirectory = outFile.parentFile.absolutePath
|
||||
).let { createdRequest ->
|
||||
producer.sendMessage(message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED,
|
||||
createdRequest)
|
||||
producer.sendMessage(
|
||||
message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED,
|
||||
createdRequest
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,99 +1,44 @@
|
||||
package no.iktdev.mediaprocessing.coordinator
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.coordinator.coordination.Tasks
|
||||
import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
|
||||
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import javax.annotation.PostConstruct
|
||||
|
||||
abstract class TaskCreator: TaskCreatorListener {
|
||||
abstract class TaskCreator(coordinator: Coordinator):
|
||||
TaskCreatorImpl<Coordinator, PersistentMessage, PersistentEventBasedMessageListener>(coordinator) {
|
||||
val log = KotlinLogging.logger {}
|
||||
abstract val producesEvent: KafkaEvents
|
||||
|
||||
@Autowired
|
||||
lateinit var producer: CoordinatorProducer
|
||||
|
||||
@Autowired
|
||||
lateinit var coordinator: Coordinator
|
||||
|
||||
open val requiredEvents: List<KafkaEvents> = listOf()
|
||||
open val listensForEvents: List<KafkaEvents> = listOf()
|
||||
|
||||
open fun isPrerequisiteEventsOk(events: List<PersistentMessage>): Boolean {
|
||||
override fun isPrerequisiteEventsOk(events: List<PersistentMessage>): Boolean {
|
||||
val currentEvents = events.map { it.event }
|
||||
return requiredEvents.all { currentEvents.contains(it) }
|
||||
}
|
||||
open fun isPrerequisiteDataPresent(events: List<PersistentMessage>): Boolean {
|
||||
override fun isPrerequisiteDataPresent(events: List<PersistentMessage>): Boolean {
|
||||
val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
|
||||
return failed.isEmpty()
|
||||
}
|
||||
|
||||
open fun isEventOfSingle(event: PersistentMessage, singleOne: KafkaEvents): Boolean {
|
||||
override fun isEventOfSingle(event: PersistentMessage, singleOne: KafkaEvents): Boolean {
|
||||
return event.event == singleOne
|
||||
}
|
||||
|
||||
fun getListener(): Tasks {
|
||||
/*override fun getListener(): Tasks {
|
||||
val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
|
||||
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter)
|
||||
}
|
||||
}*/
|
||||
|
||||
|
||||
open fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
|
||||
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
|
||||
return listOf {
|
||||
isPrerequisiteEventsOk(events)
|
||||
}
|
||||
}
|
||||
|
||||
open fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> {
|
||||
override fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> {
|
||||
return listOf()
|
||||
}
|
||||
|
||||
|
||||
private val context: MutableMap<String, Any> = mutableMapOf()
|
||||
private val context_key_reference = "reference"
|
||||
private val context_key_producesEvent = "event"
|
||||
final override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
|
||||
context[context_key_reference] = referenceId
|
||||
getListener().producesEvent.let {
|
||||
context[context_key_producesEvent] = it
|
||||
}
|
||||
|
||||
if (prerequisitesRequired(events).all { it.invoke() } && prerequisiteRequired(event).all { it.invoke() }) {
|
||||
val result = onProcessEvents(event, events)
|
||||
if (result != null) {
|
||||
onResult(result)
|
||||
}
|
||||
} else {
|
||||
// TODO: Re-enable this
|
||||
// log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" }
|
||||
}
|
||||
}
|
||||
|
||||
abstract fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper?
|
||||
|
||||
|
||||
private fun onResult(data: MessageDataWrapper) {
|
||||
producer.sendMessage(
|
||||
referenceId = context[context_key_reference] as String,
|
||||
event = context[context_key_producesEvent] as KafkaEvents,
|
||||
data = data
|
||||
)
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
fun postConstruct() {
|
||||
coordinator.listeners.add(getListener())
|
||||
}
|
||||
}
|
||||
|
||||
fun interface Prerequisite {
|
||||
fun execute(value: Any): Boolean
|
||||
}
|
||||
|
||||
interface TaskCreatorListener {
|
||||
fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>): Unit
|
||||
}
|
||||
@ -1,64 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.coordination
|
||||
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
|
||||
class EventBasedMessageListener {
|
||||
private val listeners: MutableList<Tasks> = mutableListOf()
|
||||
|
||||
fun add(produces: KafkaEvents, listener: TaskCreatorListener) {
|
||||
listeners.add(Tasks(producesEvent = produces, taskHandler = listener))
|
||||
}
|
||||
|
||||
fun add(task: Tasks) {
|
||||
listeners.add(task)
|
||||
}
|
||||
|
||||
private fun waitingListeners(events: List<PersistentMessage>): List<Tasks> {
|
||||
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
|
||||
return nonCreators
|
||||
}
|
||||
|
||||
private fun listenerWantingEvent(event: PersistentMessage, waitingListeners: List<Tasks>): List<Tasks> {
|
||||
return waitingListeners.filter { event.event in it.listensForEvents }
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called in sequence, thus some messages might be made a duplicate of.
|
||||
*/
|
||||
fun forwardEventMessageToListeners(newEvent: PersistentMessage, events: List<PersistentMessage>) {
|
||||
val waitingListeners = waitingListeners(events)
|
||||
//val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners)
|
||||
//availableListeners.forEach {
|
||||
waitingListeners.forEach {
|
||||
try {
|
||||
it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events)
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called with all messages at once, thus it should reflect kafka topic and database
|
||||
*/
|
||||
fun forwardBatchEventMessagesToListeners(events: List<PersistentMessage>) {
|
||||
val waitingListeners = waitingListeners(events)
|
||||
waitingListeners.forEach {
|
||||
try {
|
||||
val last = events.last()
|
||||
it.taskHandler.onEventReceived(last.referenceId, last, events)
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
data class Tasks(
|
||||
val producesEvent: KafkaEvents,
|
||||
val listensForEvents: List<KafkaEvents> = listOf(),
|
||||
val taskHandler: TaskCreatorListener
|
||||
)
|
||||
@ -0,0 +1,33 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.coordination
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
|
||||
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
|
||||
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
|
||||
class PersistentEventBasedMessageListener: EventBasedMessageListener<PersistentMessage>() {
|
||||
|
||||
override fun listenerWantingEvent(
|
||||
event: PersistentMessage,
|
||||
waitingListeners: List<Tasks<PersistentMessage>>
|
||||
): List<Tasks<PersistentMessage>> {
|
||||
return waitingListeners.filter { event.event in it.listensForEvents }
|
||||
}
|
||||
|
||||
override fun onForward(
|
||||
event: PersistentMessage,
|
||||
history: List<PersistentMessage>,
|
||||
listeners: List<ITaskCreatorListener<PersistentMessage>>
|
||||
) {
|
||||
listeners.forEach {
|
||||
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
|
||||
}
|
||||
}
|
||||
|
||||
override fun waitingListeners(events: List<PersistentMessage>): List<Tasks<PersistentMessage>> {
|
||||
val nonCreators = listeners.filter { !events.filter { event -> !event.data.isSuccess() }.map { e -> e.event }.contains(it.producesEvent) }
|
||||
return nonCreators
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,6 +1,8 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.shared.common.lastOrSuccess
|
||||
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
@ -8,11 +10,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
|
||||
@Service
|
||||
class BaseInfoFromFile() : TaskCreator() {
|
||||
class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
|
||||
override val producesEvent: KafkaEvents
|
||||
get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED
|
||||
@ -26,9 +29,10 @@ class BaseInfoFromFile() : TaskCreator() {
|
||||
}
|
||||
}
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper {
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
|
||||
return readFileInfo(event.data as ProcessStarted)
|
||||
val selected = events.filter { it.event == KafkaEvents.EVENT_PROCESS_STARTED }.lastOrSuccess() ?: return null
|
||||
return readFileInfo(selected.data as ProcessStarted)
|
||||
}
|
||||
|
||||
fun readFileInfo(started: ProcessStarted): MessageDataWrapper {
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
|
||||
@ -16,12 +17,13 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
import no.iktdev.streamit.library.db.query.*
|
||||
import org.jetbrains.exposed.exceptions.ExposedSQLException
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
import java.sql.SQLIntegrityConstraintViolationException
|
||||
|
||||
@Service
|
||||
class CollectAndStoreTask() : TaskCreator() {
|
||||
class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE
|
||||
|
||||
override val requiredEvents: List<KafkaEvents> = listOf(
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
@ -9,10 +10,11 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
@Service
|
||||
class CompleteTask() : TaskCreator() {
|
||||
class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_PROCESS_COMPLETED
|
||||
|
||||
override val requiredEvents: List<KafkaEvents> = listOf(
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.shared.common.DownloadClient
|
||||
import no.iktdev.mediaprocessing.shared.common.getComputername
|
||||
@ -11,12 +12,13 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverDownloadWorkPerformed
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerformed
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
import java.util.*
|
||||
|
||||
@Service
|
||||
class DownloadAndStoreCoverTask: TaskCreator() {
|
||||
class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
|
||||
override val producesEvent: KafkaEvents
|
||||
get() = KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
@ -9,10 +10,11 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerform
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
@Service
|
||||
class MetadataAndBaseInfoToCoverTask : TaskCreator() {
|
||||
class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
|
||||
override val producesEvent: KafkaEvents
|
||||
get() = KafkaEvents.EVENT_MEDIA_READ_OUT_COVER
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
|
||||
import no.iktdev.exfl.using
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.shared.common.SharedConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds
|
||||
@ -15,6 +16,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerform
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.hasValidData
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.scheduling.annotation.EnableScheduling
|
||||
import org.springframework.scheduling.annotation.Scheduled
|
||||
import org.springframework.stereotype.Service
|
||||
@ -25,7 +27,7 @@ import java.time.LocalDateTime
|
||||
*/
|
||||
@Service
|
||||
@EnableScheduling
|
||||
class MetadataAndBaseInfoToFileOut(): TaskCreator() {
|
||||
class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
override val producesEvent: KafkaEvents
|
||||
get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
|
||||
import com.google.gson.Gson
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
|
||||
@ -13,10 +14,11 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsPars
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
@Service
|
||||
class ParseVideoFileStreams() : TaskCreator() {
|
||||
class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
|
||||
override val producesEvent: KafkaEvents
|
||||
get() = KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED
|
||||
|
||||
@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
|
||||
import com.google.gson.Gson
|
||||
import com.google.gson.JsonObject
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.shared.common.SharedConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
@ -13,11 +14,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
|
||||
@Service
|
||||
class ReadVideoFileStreams(): TaskCreator() {
|
||||
class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
|
||||
override val producesEvent: KafkaEvents
|
||||
get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
|
||||
|
||||
import no.iktdev.exfl.using
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.shared.common.Preference
|
||||
import no.iktdev.mediaprocessing.shared.common.SharedConfig
|
||||
@ -10,11 +11,12 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
|
||||
@Service
|
||||
class EncodeArgumentCreatorTask : TaskCreator() {
|
||||
class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
val preference = Preference.getPreference()
|
||||
override val producesEvent: KafkaEvents
|
||||
get() = KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
|
||||
|
||||
import no.iktdev.exfl.using
|
||||
import no.iktdev.mediaprocessing.coordinator.Coordinator
|
||||
import no.iktdev.mediaprocessing.coordinator.TaskCreator
|
||||
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.*
|
||||
import no.iktdev.mediaprocessing.shared.common.Preference
|
||||
@ -13,11 +14,12 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
|
||||
@Service
|
||||
class ExtractArgumentCreatorTask : TaskCreator() {
|
||||
class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||
|
||||
val preference = Preference.getPreference()
|
||||
|
||||
|
||||
@ -16,7 +16,6 @@ import javax.annotation.PostConstruct
|
||||
abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
|
||||
abstract val listeners: L
|
||||
|
||||
val io = Coroutines.io()
|
||||
|
||||
@Autowired
|
||||
lateinit var producer: CoordinatorProducer
|
||||
|
||||
@ -2,6 +2,9 @@ package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import kotlinx.coroutines.delay
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
import java.io.File
|
||||
import java.io.RandomAccessFile
|
||||
import java.net.InetAddress
|
||||
@ -22,6 +25,10 @@ fun isFileAvailable(file: File): Boolean {
|
||||
return false
|
||||
}
|
||||
|
||||
fun List<PersistentMessage>.lastOrSuccess(): PersistentMessage? {
|
||||
return this.lastOrNull { it.data.isSuccess() } ?: this.lastOrNull()
|
||||
}
|
||||
|
||||
|
||||
suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60, delayed: Long = 500, block: () -> Unit) {
|
||||
var elapsedDelay = 0L
|
||||
|
||||
Loading…
Reference in New Issue
Block a user