This commit is contained in:
bskjon 2024-04-02 22:27:10 +02:00
parent b9a10e7585
commit 98527ec02f
20 changed files with 275 additions and 87 deletions

2
.idea/vcs.xml generated
View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="VcsDirectoryMappings"> <component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" /> <mapping directory="" vcs="Git" />
</component> </component>
</project> </project>

View File

@ -1,16 +1,19 @@
package no.iktdev.mediaprocessing.processer package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation import no.iktdev.mediaprocessing.shared.common.Defaults
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import import org.springframework.context.annotation.Import
@Configuration //@Configuration
class SocketLocalInit: SocketImplementation() //class SocketLocalInit: SocketImplementation()
@Configuration @Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class) @Import(CoordinatorProducer::class, DefaultMessageListener::class)
class KafkaLocalInit: KafkaImplementation() { class KafkaLocalInit: KafkaImplementation() {
} }
@Configuration
class DefaultConfiguration: Defaults()

View File

@ -0,0 +1,28 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import org.springframework.web.client.RestTemplate
@Service
class Reporter(@Autowired private val restTemplate: RestTemplate) {
fun sendEncodeProgress(progress: ProcesserEventInfo) {
try {
restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java)
} catch (e: Exception) {
e.printStackTrace()
}
}
fun sendExtractProgress(progress: ProcesserEventInfo) {
try {
restTemplate.postForEntity(SharedConfig.uiUrl + "/extract/progress", progress, String::class.java)
} catch (e: Exception) {
e.printStackTrace()
}
}
}

View File

@ -1,5 +1,7 @@
package no.iktdev.mediaprocessing.processer.ffmpeg package no.iktdev.mediaprocessing.processer.ffmpeg
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserProgress
data class FfmpegDecodedProgress( data class FfmpegDecodedProgress(
val progress: Int = -1, val progress: Int = -1,
val time: String, val time: String,
@ -7,6 +9,16 @@ data class FfmpegDecodedProgress(
val speed: String, val speed: String,
val estimatedCompletionSeconds: Long = -1, val estimatedCompletionSeconds: Long = -1,
val estimatedCompletion: String = "Unknown", val estimatedCompletion: String = "Unknown",
) ) {
fun toProcessProgress(): ProcesserProgress {
return ProcesserProgress(
progress = this.progress,
speed = this.speed,
timeWorkedOn = this.time,
timeLeft = this.estimatedCompletion
)
}
}
data class ECT(val day: Int = 0, val hour: Int = 0, val minute: Int = 0, val second: Int = 0) data class ECT(val day: Int = 0, val hour: Int = 0, val minute: Int = 0, val second: Int = 0)

View File

@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.processer.ffmpeg
import com.github.pgreze.process.Redirect import com.github.pgreze.process.Redirect
import com.github.pgreze.process.process import com.github.pgreze.process.process
import com.google.gson.Gson
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import mu.KotlinLogging import mu.KotlinLogging
@ -59,7 +58,7 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
logFile.createNewFile() logFile.createNewFile()
} }
listener.onStarted(info) listener.onStarted(referenceId, eventId, info)
val processOp = process( val processOp = process(
ProcesserEnv.ffmpeg, *args.toTypedArray(), ProcesserEnv.ffmpeg, *args.toTypedArray(),
stdout = Redirect.CAPTURE, stdout = Redirect.CAPTURE,
@ -73,17 +72,29 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe
val result = processOp val result = processOp
onOutputChanged("Received exit code: ${result.resultCode}") onOutputChanged("Received exit code: ${result.resultCode}")
if (result.resultCode != 0) { if (result.resultCode != 0) {
listener.onError(info, result.output.joinToString("\n")) listener.onError(referenceId, eventId, info, result.output.joinToString("\n"))
} else { } else {
listener.onCompleted(info) listener.onCompleted(referenceId, eventId, info)
} }
} }
private var progress: FfmpegDecodedProgress? = null
fun onOutputChanged(line: String) { fun onOutputChanged(line: String) {
outputCache.add(line) outputCache.add(line)
writeToLog(line) writeToLog(line)
// toList is needed to prevent mutability. // toList is needed to prevent mutability.
val progress = decoder.parseVideoProgress(outputCache.toList()) decoder.parseVideoProgress(outputCache.toList())?.let { decoded ->
try {
val _progress = decoder.getProgress(decoded)
if (progress == null || _progress.progress > (progress?.progress ?: -1) ) {
progress = _progress
listener.onProgressChanged(referenceId, eventId, info, _progress)
}
} catch (e: Exception) {
e.printStackTrace()
}
}
} }
fun writeToLog(line: String) { fun writeToLog(line: String) {
@ -96,8 +107,8 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe
} }
interface FfmpegWorkerEvents { interface FfmpegWorkerEvents {
fun onStarted(info: FfmpegWorkRequestCreated,) fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated,)
fun onCompleted(info: FfmpegWorkRequestCreated) fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated)
fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String)
fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress)
} }

View File

@ -7,13 +7,13 @@ import no.iktdev.mediaprocessing.processer.*
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
@ -24,7 +24,7 @@ import java.util.*
import javax.annotation.PreDestroy import javax.annotation.PreDestroy
@Service @Service
class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreator(coordinator) { class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired private val reporter: Reporter): TaskCreator(coordinator) {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.encodeLogDirectory private val logDir = ProcesserEnv.encodeLogDirectory
@ -83,7 +83,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
log.info { "Claim successful for ${event.referenceId} encode" } log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents )
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming // Setting consumed to prevent spamming
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
return return
@ -98,7 +98,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
} }
val ffmpegWorkerEvents = object : FfmpegWorkerEvents { val ffmpegWorkerEvents = object : FfmpegWorkerEvents {
override fun onStarted(info: FfmpegWorkRequestCreated) { override fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
val runner = this@EncodeService.runner val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) { if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce start message when the referenceId is not present" } log.error { "Can't produce start message when the referenceId is not present" }
@ -106,7 +106,13 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
} }
log.info { "Encode started for ${runner.referenceId}" } log.info { "Encode started for ${runner.referenceId}" }
persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendProgress(info, null, false) sendProgress(referenceId, eventId, status = WorkStatus.Started, info, FfmpegDecodedProgress(
progress = 0,
time = "Unkown",
duration = "Unknown",
speed = "0",
)
)
scope.launch { scope.launch {
while (runnerJob?.isActive == true) { while (runnerJob?.isActive == true) {
@ -116,7 +122,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
} }
} }
override fun onCompleted(info: FfmpegWorkRequestCreated) { override fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
val runner = this@EncodeService.runner val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) { if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce completion message when the referenceId is not present" } log.error { "Can't produce completion message when the referenceId is not present" }
@ -139,12 +145,18 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
data = ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile) data = ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile)
) )
sendProgress(referenceId, eventId, status = WorkStatus.Completed, info, FfmpegDecodedProgress(
progress = 100,
time = "",
duration = "",
speed = "0",
))
clearWorker() clearWorker()
} }
} }
override fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) { override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) {
val runner = this@EncodeService.runner val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) { if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce error message when the referenceId is not present" } log.error { "Can't produce error message when the referenceId is not present" }
@ -154,18 +166,31 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
data = ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId) data = ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId)
) )
sendProgress(info = info, ended = true) sendProgress(referenceId, eventId, status = WorkStatus.Failed, info = info, progress = FfmpegDecodedProgress(
progress = 0,
time = "",
duration = "",
speed = "0",
))
clearWorker() clearWorker()
} }
override fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) { override fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) {
sendProgress(info, progress, false) sendProgress(referenceId, eventId, WorkStatus.Working, info, progress)
} }
} }
fun sendProgress(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null, ended: Boolean) { fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null) {
// TODO: Implementation val processerEventInfo = ProcesserEventInfo(
referenceId = referenceId,
eventId = eventId,
status = status,
inputFile = info.inputFile,
outputFiles = listOf(info.outFile),
progress = progress?.toProcessProgress()
)
reporter.sendEncodeProgress(processerEventInfo)
} }

View File

@ -8,13 +8,13 @@ import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
import no.iktdev.mediaprocessing.shared.common.limitedWhile import no.iktdev.mediaprocessing.shared.common.limitedWhile
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
@ -25,7 +25,7 @@ import java.util.*
import javax.annotation.PreDestroy import javax.annotation.PreDestroy
@Service @Service
class ExtractService(@Autowired override var coordinator: Coordinator): TaskCreator(coordinator) { class ExtractService(@Autowired override var coordinator: Coordinator, @Autowired private val reporter: Reporter): TaskCreator(coordinator) {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.extractLogDirectory private val logDir = ProcesserEnv.extractLogDirectory
@ -88,7 +88,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents) runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents)
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming // Setting consumed to prevent spamming
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
return return
@ -103,7 +103,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
} }
val ffmpegWorkerEvents = object : FfmpegWorkerEvents { val ffmpegWorkerEvents = object : FfmpegWorkerEvents {
override fun onStarted(info: FfmpegWorkRequestCreated) { override fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
val runner = this@ExtractService.runner val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) { if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce start message when the referenceId is not present" } log.error { "Can't produce start message when the referenceId is not present" }
@ -111,10 +111,10 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
} }
log.info { "Extract started for ${runner.referenceId}" } log.info { "Extract started for ${runner.referenceId}" }
persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendState(info, false) sendProgress(referenceId, eventId, WorkStatus.Started, info)
} }
override fun onCompleted(info: FfmpegWorkRequestCreated) { override fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
val runner = this@ExtractService.runner val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) { if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce completion message when the referenceId is not present" } log.error { "Can't produce completion message when the referenceId is not present" }
@ -149,12 +149,13 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
derivedFromEventId = runner.eventId, derivedFromEventId = runner.eventId,
outFile = runner.info.outFile) outFile = runner.info.outFile)
) )
sendProgress(referenceId, eventId, WorkStatus.Completed, info)
log.info { "Extract is releasing worker" } log.info { "Extract is releasing worker" }
clearWorker() clearWorker()
} }
} }
override fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) { override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) {
val runner = this@ExtractService.runner val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) { if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce error message when the referenceId is not present" } log.error { "Can't produce error message when the referenceId is not present" }
@ -164,18 +165,26 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId) ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId)
) )
sendState(info, ended= true) sendProgress(referenceId, eventId, WorkStatus.Failed, info)
clearWorker() clearWorker()
} }
override fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) { override fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) {
// None as this will not be running with progress sendProgress(referenceId, eventId, WorkStatus.Working, info, progress)
} }
} }
fun sendState(info: FfmpegWorkRequestCreated, ended: Boolean) { fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null) {
val processerEventInfo = ProcesserEventInfo(
referenceId = referenceId,
eventId = eventId,
status = status,
inputFile = info.inputFile,
outputFiles = listOf(info.outFile),
progress = progress?.toProcessProgress()
)
reporter.sendExtractProgress(processerEventInfo)
} }

View File

@ -1,12 +1,16 @@
package no.iktdev.mediaprocessing.ui package no.iktdev.mediaprocessing.ui
import no.iktdev.mediaprocessing.shared.common.Defaults
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.beans.factory.annotation.Value import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory
import org.springframework.boot.web.server.WebServerFactoryCustomizer import org.springframework.boot.web.server.WebServerFactoryCustomizer
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.messaging.simp.config.MessageBrokerRegistry import org.springframework.context.annotation.Import
import org.springframework.web.bind.annotation.RestController import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate import org.springframework.web.client.RestTemplate
import org.springframework.web.method.HandlerTypePredicate import org.springframework.web.method.HandlerTypePredicate
@ -14,9 +18,6 @@ import org.springframework.web.servlet.config.annotation.CorsRegistry
import org.springframework.web.servlet.config.annotation.PathMatchConfigurer import org.springframework.web.servlet.config.annotation.PathMatchConfigurer
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer import org.springframework.web.servlet.config.annotation.WebMvcConfigurer
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
@Configuration @Configuration
@ -63,3 +64,11 @@ class ApiCommunicationConfig {
@Configuration @Configuration
class SocketImplemented: SocketImplementation() { class SocketImplemented: SocketImplementation() {
} }
@Configuration
class DefaultConfiguration: Defaults()
@Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class)
class KafkaLocalInit: KafkaImplementation() {
}

View File

@ -1,7 +1,6 @@
package no.iktdev.mediaprocessing.ui package no.iktdev.mediaprocessing.ui
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
@ -9,22 +8,24 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.ui.coordinator.PersistentEventBasedMessageListener import no.iktdev.mediaprocessing.ui.coordinator.PersistentEventBasedMessageListener
import no.iktdev.mediaprocessing.ui.dto.EventSummary
import no.iktdev.mediaprocessing.ui.dto.EventSummarySubItem import no.iktdev.mediaprocessing.ui.dto.EventSummarySubItem
import no.iktdev.mediaprocessing.ui.dto.SummaryState import no.iktdev.mediaprocessing.ui.dto.SummaryState
import no.iktdev.mediaprocessing.ui.socket.EventbasedTopic
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@Service @Service
@EnableScheduling @EnableScheduling
class Coordinator(@Autowired private val template: SimpMessagingTemplate?) : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() { class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
override val listeners = PersistentEventBasedMessageListener() override val listeners = PersistentEventBasedMessageListener()
val dbReader = PersistentDataReader(getEventsDatabase())
override fun onCoordinatorReady() { override fun onCoordinatorReady() {
@ -41,14 +42,6 @@ class Coordinator(@Autowired private val template: SimpMessagingTemplate?) : Coo
) { ) {
} }
fun readAllEvents() {
val messages = persistentReader.getAllMessages()
}
fun readAllProcesserEvents() {
val messages = persistentReader.getProcessEvents()
}
@Scheduled(fixedDelay = (5_000)) @Scheduled(fixedDelay = (5_000))
fun refreshDatabaseData() { fun refreshDatabaseData() {
@ -106,15 +99,62 @@ class Coordinator(@Autowired private val template: SimpMessagingTemplate?) : Coo
return SummaryState.Preparing return SummaryState.Preparing
} }
// EVENT_MEDIA_METADATA_SEARCH_PERFORMED val analyzed2 = events.findLast { it.event in listOf(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) }
if (analyzed2 != null) {
return SummaryState.Analyzing
}
val waitingForMeta = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED }
if (waitingForMeta != null) {
return SummaryState.Metadata
}
val analyzed = events.findLast { it.event in listOf(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) }
if (analyzed != null) {
return SummaryState.Analyzing
}
val readEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED }
if (readEvent != null) {
return SummaryState.Read
}
return SummaryState.Started return SummaryState.Started
} }
fun buildSummaries() { fun buildSummaries() {
val processerMessages = persistentReader.getProcessEvents().groupBy { it.referenceId }
val messages = persistentReader.getAllMessages() val messages = persistentReader.getAllMessages()
val mapped = messages.mapNotNull { it ->
val referenceId = it.firstOrNull()?.referenceId
if (referenceId != null) {
val procM = processerMessages.getOrDefault(referenceId, emptyList())
val processesStatuses = getCurrentStateFromProcesserEvents(procM)
val messageStatus = getCurrentState(it, processesStatuses)
val baseNameEvent = it.lastOrNull {ke -> ke.event == KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED }?.data.let { data ->
if (data is BaseInfoPerformed) data else null
}
val mediaNameEvent = it.lastOrNull { ke -> ke.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }?.data.let { data ->
if (data is VideoInfoPerformed) data else null
}
val baseName = if (mediaNameEvent == null) baseNameEvent?.sanitizedName else mediaNameEvent.toValueObject()?.fullName
EventSummary(
referenceId = referenceId,
baseName = baseName,
collection = mediaNameEvent?.toValueObject()?.title,
events = it.map { ke -> ke.event },
status = messageStatus,
activeEvens = processesStatuses
)
} else null
}
} }
} }

View File

@ -7,19 +7,15 @@ import no.iktdev.exfl.observable.ObservableMap
import no.iktdev.exfl.observable.Observables import no.iktdev.exfl.observable.Observables
import no.iktdev.exfl.observable.observableMapOf import no.iktdev.exfl.observable.observableMapOf
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import no.iktdev.mediaprocessing.ui.dto.EventDataObject
import no.iktdev.mediaprocessing.ui.dto.ExplorerItem import no.iktdev.mediaprocessing.ui.dto.ExplorerItem
import no.iktdev.mediaprocessing.ui.dto.SimpleEventDataObject
import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication import org.springframework.boot.runApplication
import org.springframework.context.ApplicationContext import org.springframework.context.ApplicationContext
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@ -44,8 +40,6 @@ fun getContext(): ApplicationContext? {
return context return context
} }
val memSimpleConvertedEventsMap: ObservableMap<String, SimpleEventDataObject> = observableMapOf()
val memActiveEventMap: ObservableMap<String, EventDataObject> = observableMapOf()
val fileRegister: ObservableMap<String, ExplorerItem> = observableMapOf() val fileRegister: ObservableMap<String, ExplorerItem> = observableMapOf()
fun main(args: Array<String>) { fun main(args: Array<String>) {

View File

@ -4,10 +4,10 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
data class EventSummary( data class EventSummary(
val referenceId: String, val referenceId: String,
val baseName: String, val baseName: String? = null,
val collection: String, val collection: String? = null,
val events: List<KafkaEvents>, val events: List<KafkaEvents> = emptyList(),
val status: SummaryState, val status: SummaryState = SummaryState.Started,
val activeEvens: Map<String, EventSummarySubItem> val activeEvens: Map<String, EventSummarySubItem>
) )
@ -26,7 +26,7 @@ enum class SummaryState {
Preparing, Preparing,
Metadata, Metadata,
Analyzing, Analyzing,
Reading, Read,
Started Started
} }

View File

@ -1,21 +1,26 @@
package no.iktdev.mediaprocessing.ui.socket package no.iktdev.mediaprocessing.ui.socket
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.observable.ObservableList
import no.iktdev.exfl.observable.ObservableMap import no.iktdev.exfl.observable.ObservableMap
import no.iktdev.exfl.observable.observableListOf
import no.iktdev.exfl.observable.observableMapOf
import no.iktdev.mediaprocessing.ui.dto.EventDataObject import no.iktdev.mediaprocessing.ui.dto.EventDataObject
import no.iktdev.mediaprocessing.ui.dto.EventSummary
import no.iktdev.mediaprocessing.ui.dto.SimpleEventDataObject import no.iktdev.mediaprocessing.ui.dto.SimpleEventDataObject
import no.iktdev.mediaprocessing.ui.memActiveEventMap
import no.iktdev.mediaprocessing.ui.memSimpleConvertedEventsMap
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.MessageMapping import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.stereotype.Controller import org.springframework.stereotype.Controller
@Controller @Controller
class UISocketService( class EventbasedTopic(
@Autowired private val template: SimpMessagingTemplate? @Autowired private val template: SimpMessagingTemplate?
) { ) {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
val summaryList: ObservableList<EventSummary> = observableListOf()
val memSimpleConvertedEventsMap: ObservableMap<String, SimpleEventDataObject> = observableMapOf()
val memActiveEventMap: ObservableMap<String, EventDataObject> = observableMapOf()
init { init {
memActiveEventMap.addListener(object : ObservableMap.Listener<String, EventDataObject> { memActiveEventMap.addListener(object : ObservableMap.Listener<String, EventDataObject> {
@ -38,6 +43,12 @@ class UISocketService(
} }
} }
}) })
summaryList.addListener(object: ObservableList.Listener<EventSummary> {
override fun onListChanged(items: List<EventSummary>) {
super.onListChanged(items)
template?.convertAndSend("/topic/summary", items)
}
})
} }
@MessageMapping("/items") @MessageMapping("/items")

View File

@ -0,0 +1,15 @@
package no.iktdev.mediaprocessing.shared.common
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.client.RestTemplate
@Configuration
open class Defaults {
@Bean
fun restTemplate(): RestTemplate {
val restTemplate = RestTemplate()
return restTemplate
}
}

View File

@ -10,6 +10,7 @@ object SharedConfig {
val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe" val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe"
val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg" val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
val uiUrl: String = System.getenv("APP_URL_UI") ?: "http://ui:8080"
val preference: File = File("/data/config/preference.json") val preference: File = File("/data/config/preference.json")
} }

View File

@ -16,6 +16,7 @@ data class PersistentMessage(
val created: LocalDateTime val created: LocalDateTime
) )
fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean { fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean {
return this.event == event return this.event == event
} }

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.common package no.iktdev.mediaprocessing.shared.common
import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import org.h2.jdbcx.JdbcDataSource import org.h2.jdbcx.JdbcDataSource
import java.io.PrintWriter import java.io.PrintWriter
@ -8,16 +9,11 @@ import java.sql.SQLFeatureNotSupportedException
import java.util.logging.Logger import java.util.logging.Logger
import javax.sql.DataSource import javax.sql.DataSource
class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: String) : DataSource, MySqlDataSource(databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = jdbcDataSource.password) { class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: String) : DataSource, MySqlDataSource(
companion object { DatabaseConnectionConfig(
fun fromDatabaseEnv(): H2DataSource { databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = jdbcDataSource.password, port = null
if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'") )
return H2DataSource( ) {
JdbcDataSource(),
databaseName = DatabaseConfig.database!!,
)
}
}
override fun getConnection(): Connection { override fun getConnection(): Connection {
return jdbcDataSource.connection return jdbcDataSource.connection
} }
@ -61,7 +57,7 @@ class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: Str
} }
override fun createDatabaseStatement(): String { override fun createDatabaseStatement(): String {
return "CREATE SCHEMA $databaseName" return "CREATE SCHEMA ${config.databaseName}"
} }
override fun toConnectionUrl(): String { override fun toConnectionUrl(): String {

View File

@ -0,0 +1,7 @@
package no.iktdev.mediaprocessing.shared.contract.dto
data class ConverterEventInfo(
val status: WorkStatus = WorkStatus.Pending,
val inputFile: String,
val outputFiles: List<String> = emptyList()
)

View File

@ -1,5 +1,13 @@
package no.iktdev.mediaprocessing.shared.contract.dto package no.iktdev.mediaprocessing.shared.contract.dto
enum class WorkStatus {
Pending,
Started,
Working,
Completed,
Failed
}
enum class SubtitleFormats { enum class SubtitleFormats {
ASS, ASS,

View File

@ -0,0 +1,17 @@
package no.iktdev.mediaprocessing.shared.contract.dto
data class ProcesserEventInfo(
val referenceId: String,
val eventId: String,
val status: WorkStatus = WorkStatus.Pending,
val progress: ProcesserProgress? = null,
val inputFile: String,
val outputFiles: List<String>
)
data class ProcesserProgress(
val progress: Int = -1,
val speed: String? = null,
val timeWorkedOn: String? = null,
val timeLeft: String? = "Unknown", // HH mm
)

View File

@ -27,18 +27,18 @@ data class VideoInfoPerformed(
data class EpisodeInfo( data class EpisodeInfo(
override val type: String = "serie", override val type: String = "serie",
val title: String, override val title: String,
val episode: Int, val episode: Int,
val season: Int, val season: Int,
val episodeTitle: String?, val episodeTitle: String?,
override val fullName: String override val fullName: String
): VideoInfo(type, fullName) ): VideoInfo(type, title, fullName)
data class MovieInfo( data class MovieInfo(
override val type: String = "movie", override val type: String = "movie",
val title: String, override val title: String,
override val fullName: String override val fullName: String
) : VideoInfo(type, fullName) ) : VideoInfo(type, title, fullName)
data class SubtitleInfo( data class SubtitleInfo(
val inputFile: String, val inputFile: String,
@ -49,6 +49,7 @@ data class SubtitleInfo(
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE)
open class VideoInfo( open class VideoInfo(
@Transient open val type: String, @Transient open val type: String,
@Transient open val title: String,
@Transient open val fullName: String @Transient open val fullName: String
) { ) {
fun toJsonObject(): JsonObject { fun toJsonObject(): JsonObject {