Changed behaviour

This commit is contained in:
bskjon 2024-04-21 20:43:03 +02:00
parent 7663f4d7e5
commit b245cf6f94
3 changed files with 22 additions and 10 deletions

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.processer package no.iktdev.mediaprocessing.processer
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
@ -13,12 +14,15 @@ class Reporter() {
lateinit var restTemplate: RestTemplate lateinit var restTemplate: RestTemplate
@Autowired @Autowired
lateinit var messageTemplate: SimpMessagingTemplate lateinit var messageTemplate: SimpMessagingTemplate
private val log = KotlinLogging.logger {}
fun sendEncodeProgress(progress: ProcesserEventInfo) { fun sendEncodeProgress(progress: ProcesserEventInfo) {
try { try {
restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java) restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java)
messageTemplate.convertAndSend("/topic/encode/progress", progress) messageTemplate.convertAndSend("/topic/encode/progress", progress)
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() log.error { e.message }
} }
} }
@ -27,7 +31,7 @@ class Reporter() {
restTemplate.postForEntity(SharedConfig.uiUrl + "/extract/progress", progress, String::class.java) restTemplate.postForEntity(SharedConfig.uiUrl + "/extract/progress", progress, String::class.java)
messageTemplate.convertAndSend("/topic/extract/progress", progress) messageTemplate.convertAndSend("/topic/extract/progress", progress)
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() log.error { e.message }
} }
} }

View File

@ -83,7 +83,8 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
fun startEncode(event: PersistentProcessDataMessage) { fun startEncode(event: PersistentProcessDataMessage) {
val ffwrc = event.data as FfmpegWorkRequestCreated val ffwrc = event.data as FfmpegWorkRequestCreated
File(ffwrc.outFile).parentFile.mkdirs() val outFile = File(ffwrc.outFile)
outFile.parentFile.mkdirs()
if (!logDir.exists()) { if (!logDir.exists()) {
logDir.mkdirs() logDir.mkdirs()
} }
@ -92,12 +93,14 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
if (setClaim) { if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" } log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents )
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { if (outFile.exists()) {
if (ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming // Setting consumed to prevent spamming
eventManager.setProcessEventCompleted(event.referenceId, event.eventId) eventManager.setProcessEventCompleted(event.referenceId, event.eventId, Status.ERROR)
return return
} }
}
runner?.runWithProgress() runner?.runWithProgress()
} else { } else {
@ -196,7 +199,11 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
outputFiles = listOf(info.outFile), outputFiles = listOf(info.outFile),
progress = progress?.toProcessProgress() progress = progress?.toProcessProgress()
) )
try {
reporter.sendEncodeProgress(processerEventInfo) reporter.sendEncodeProgress(processerEventInfo)
} catch (e: Exception) {
e.printStackTrace()
}
} }

View File

@ -142,7 +142,8 @@ class PersistentEventManager(private val dataSource: DataSource) {
} }
fun isProcessEventClaimed(referenceId: String, eventId: String): Boolean { fun isProcessEventClaimed(referenceId: String, eventId: String): Boolean {
return getProcessEventWith(referenceId, eventId)?.claimed ?: false val info = getProcessEventWith(referenceId, eventId)
return info?.claimed ?: true && info?.consumed ?: true
} }
fun isProcessEventCompleted(referenceId: String, eventId: String): Boolean { fun isProcessEventCompleted(referenceId: String, eventId: String): Boolean {