This commit is contained in:
bskjon 2024-04-19 02:22:25 +02:00
parent 3211cb2608
commit 84bc18ab41
15 changed files with 63 additions and 76 deletions

View File

@ -8,15 +8,17 @@ import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMe
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.*
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import java.io.File
import java.util.UUID
@EnableScheduling
@Service
class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
val io = Coroutines.io()
@ -58,32 +60,22 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
//private val forwarder = Forwarder()
public fun startProcess(file: File, type: ProcessType) {
val operations: List<ProcessStartOperationEvents> = listOf(
ProcessStartOperationEvents.ENCODE,
ProcessStartOperationEvents.EXTRACT,
ProcessStartOperationEvents.CONVERT
val operations: List<StartOperationEvents> = listOf(
StartOperationEvents.ENCODE,
StartOperationEvents.EXTRACT,
StartOperationEvents.CONVERT
)
startProcess(file, type, operations)
}
fun startProcess(file: File, type: ProcessType, operations: List<ProcessStartOperationEvents>) {
fun startProcess(file: File, type: ProcessType, operations: List<StartOperationEvents>): UUID {
val referenceId: UUID = UUID.randomUUID()
val processStartEvent = MediaProcessStarted(
status = Status.COMPLETED,
file = file.absolutePath,
type = type
)
producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EventMediaProcessStarted, processStartEvent)
}
public fun startRequestProcess(file: File, operations: List<RequestStartOperationEvents>): UUID {
val referenceId: UUID = UUID.randomUUID()
val start = RequestProcessStarted(
status = Status.COMPLETED,
file = file.absolutePath,
operations = operations
)
producer.sendMessage(referenceId = referenceId.toString(), KafkaEvents.EVENT_REQUEST_PROCESS_STARTED, start)
return referenceId
}
@ -150,6 +142,14 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
return messages.find { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted
}
@Scheduled(fixedDelay = (5*6_0000))
fun checkForWork() {
if (isReady()) {
log.info { "Checking if there is any uncompleted event sets" }
readAllUncompletedMessagesInQueue()
}
}
}

View File

@ -8,7 +8,9 @@ import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
@Configuration
class SocketLocalInit: SocketImplementation()
class SocketLocalInit: SocketImplementation() {
}
@Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class)

View File

@ -4,8 +4,7 @@ import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ConvertRequest
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
@ -28,7 +27,7 @@ class RequestEventController(@Autowired var coordinator: Coordinator) {
if (!file.exists()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(convert.file)
}
val referenceId = coordinator.startRequestProcess(file, listOf(RequestStartOperationEvents.CONVERT))
val referenceId = coordinator.startProcess(file, ProcessType.MANUAL, listOf(StartOperationEvents.CONVERT))
} catch (e: Exception) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Gson().toJson(convert))
@ -44,7 +43,7 @@ class RequestEventController(@Autowired var coordinator: Coordinator) {
if (!file.exists()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(selectedFile)
}
coordinator.startProcess(file, ProcessType.MANUAL, listOf(ProcessStartOperationEvents.EXTRACT))
coordinator.startProcess(file, ProcessType.MANUAL, listOf(StartOperationEvents.EXTRACT))
} catch (e: Exception) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(selectedFile)

View File

@ -6,7 +6,7 @@ import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -58,22 +58,22 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
}
val hasEncodeAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.ENCODE)) {
val hasEncodeAndIsRequired = if (startedData.operations.contains(StartOperationEvents.ENCODE)) {
events.any { it.event == EventWorkEncodePerformed }
} else true
val hasExtractAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.EXTRACT)) {
val hasExtractAndIsRequired = if (startedData.operations.contains(StartOperationEvents.EXTRACT)) {
events.any { it.event == EventWorkExtractPerformed}
} else true
val hasConvertAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.CONVERT)) {
val hasConvertAndIsRequired = if (startedData.operations.contains(StartOperationEvents.CONVERT)) {
events.any { it.event == EventWorkConvertPerformed }
} else true
val missingRequired: MutableMap<ProcessStartOperationEvents, Boolean> = mutableMapOf(
ProcessStartOperationEvents.ENCODE to hasEncodeAndIsRequired,
ProcessStartOperationEvents.EXTRACT to hasExtractAndIsRequired,
ProcessStartOperationEvents.CONVERT to hasConvertAndIsRequired
val missingRequired: MutableMap<StartOperationEvents, Boolean> = mutableMapOf(
StartOperationEvents.ENCODE to hasEncodeAndIsRequired,
StartOperationEvents.EXTRACT to hasExtractAndIsRequired,
StartOperationEvents.CONVERT to hasConvertAndIsRequired
)

View File

@ -7,7 +7,7 @@ import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -44,8 +44,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
log.info { "${event.referenceId} triggered by ${event.event}" }
val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) {
log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" }
if (!started.operations.contains(StartOperationEvents.ENCODE)) {
log.info { "Couldn't find operation event ${StartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" }
return null
}

View File

@ -8,7 +8,7 @@ import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.*
import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleArgumentsDto
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
@ -52,8 +52,8 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
return null
}
val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
if (!started.operations.contains(ProcessStartOperationEvents.EXTRACT)) {
log.info { "Couldn't find operation event ${ProcessStartOperationEvents.EXTRACT} in ${Gson().toJson(started.operations)}\n\tExtract Arguments will not be created" }
if (!started.operations.contains(StartOperationEvents.EXTRACT)) {
log.info { "Couldn't find operation event ${StartOperationEvents.EXTRACT} in ${Gson().toJson(started.operations)}\n\tExtract Arguments will not be created" }
return null
}

View File

@ -7,10 +7,12 @@ import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.NotificationOfDeletionPerformed
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
@ -101,9 +103,13 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
val usableMessages = messages.filter { lists -> lists.any { it.event in processKafkaEvents } }
val validMessages = usableMessages.filter { lists -> lists.any { it.event == KafkaEvents.EventMediaProcessStarted } ||
(lists.any { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_STARTED } && lists.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted } )
}.flatten().filter { it.event in processKafkaEvents }
val validMessages = usableMessages.filter { lists ->
lists.any { it.event == KafkaEvents.EventMediaProcessStarted && (it.data as MediaProcessStarted).type == ProcessType.FLOW } ||
lists.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted }
}
.flatten()
.filter { it.event in processKafkaEvents }
validMessages.filter { it.eventId !in existing }.forEach {
eventManager.setProcessEvent(it.event, Message(

View File

@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.stereotype.Service
import org.springframework.web.client.RestTemplate
@ -10,9 +11,12 @@ import org.springframework.web.client.RestTemplate
class Reporter() {
@Autowired
lateinit var restTemplate: RestTemplate
@Autowired
lateinit var messageTemplate: SimpMessagingTemplate
fun sendEncodeProgress(progress: ProcesserEventInfo) {
try {
restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java)
messageTemplate.convertAndSend("/topic/encode/progress", progress)
} catch (e: Exception) {
e.printStackTrace()
}
@ -21,6 +25,7 @@ class Reporter() {
fun sendExtractProgress(progress: ProcesserEventInfo) {
try {
restTemplate.postForEntity(SharedConfig.uiUrl + "/extract/progress", progress, String::class.java)
messageTemplate.convertAndSend("/topic/extract/progress", progress)
} catch (e: Exception) {
e.printStackTrace()
}

View File

@ -61,13 +61,12 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo
val stored = events.findLast { it.event == KafkaEvents.EventCollectAndStore }
val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }
val completedMediaEvent = events.findLast { it.event == KafkaEvents.EventMediaProcessCompleted }
val completedRequestEvent = events.findLast { it.event == KafkaEvents.EventRequestProcessCompleted }
if (stored != null && stored.data.isSuccess()) {
return SummaryState.Completed
}
if (completedMediaEvent?.data.isSuccess() || completedRequestEvent?.data.isSuccess()) {
if (completedMediaEvent?.data.isSuccess()) {
return SummaryState.AwaitingStore
}
if (processes.values.all { it.status == SummaryState.Completed }) {

View File

@ -19,6 +19,8 @@ import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
private var ready: Boolean = false
fun isReady() = ready
private val log = KotlinLogging.logger {}
abstract val listeners: L
@ -37,6 +39,7 @@ abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
log.info { "Attaching listeners to Coordinator" }
listener.onMessageReceived = { event -> onMessageReceived(event)}
listener.listen(KafkaEnv.kafkaTopic)
ready = true
}
abstract fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>)

View File

@ -19,6 +19,6 @@ object processerEvents: IntIdTable() {
val lastCheckIn: Column<LocalDateTime?> = datetime("lastCheckIn").nullable()
init {
uniqueIndex(referenceId, event)
uniqueIndex(referenceId, eventId, event)
}
}

View File

@ -16,13 +16,8 @@ enum class SubtitleFormats {
SMI
}
enum class ProcessStartOperationEvents {
enum class StartOperationEvents {
ENCODE,
EXTRACT,
CONVERT
}
enum class RequestStartOperationEvents {
CONVERT,
EXTRACT,
}

View File

@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.shared.kafka.core
enum class KafkaEvents(val event: String) {
EventMediaProcessStarted("event:media-process:started"),
EVENT_REQUEST_PROCESS_STARTED("event:request-process:started"),
EventMediaReadStreamPerformed("event:media-read-stream:performed"),
EventMediaParseStreamPerformed("event:media-parse-stream:performed"),
@ -18,7 +17,6 @@ enum class KafkaEvents(val event: String) {
EventMediaWorkProceedPermitted("event:media-work-proceed:permitted"),
// This event is to be used for communicating across all apps that an event has been removed and to terminate existing events
EventNotificationOfWorkItemRemoval("event:notification-work-item-removal"),
EventWorkEncodeCreated("event:work-encode:created"),
@ -37,7 +35,6 @@ enum class KafkaEvents(val event: String) {
EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"),
EventMediaProcessCompleted("event:media-process:completed"),
EventRequestProcessCompleted("event:request-process:completed"),
EventCollectAndStore("event::save"),
;
@ -63,7 +60,6 @@ enum class KafkaEvents(val event: String) {
fun isOfFinalize(event: KafkaEvents): Boolean {
return event in listOf(
EventMediaProcessCompleted,
EventRequestProcessCompleted,
EventCollectAndStore
)
}

View File

@ -1,7 +1,7 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -11,10 +11,10 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status
data class MediaProcessStarted(
override val status: Status,
val type: ProcessType = ProcessType.FLOW,
val operations: List<ProcessStartOperationEvents> = listOf(
ProcessStartOperationEvents.ENCODE,
ProcessStartOperationEvents.EXTRACT,
ProcessStartOperationEvents.CONVERT
val operations: List<StartOperationEvents> = listOf(
StartOperationEvents.ENCODE,
StartOperationEvents.EXTRACT,
StartOperationEvents.CONVERT
),
val file: String // AbsolutePath
) : MessageDataWrapper(status)

View File

@ -1,18 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_REQUEST_PROCESS_STARTED)
data class RequestProcessStarted(
override val status: Status,
val operations: List<RequestStartOperationEvents> = listOf(
RequestStartOperationEvents.CONVERT
),
val file: String // AbsolutePath
) : MessageDataWrapper(status)