This commit is contained in:
bskjon 2024-04-11 01:33:42 +02:00
parent f0a8e14aaa
commit 5eb94df884
7 changed files with 25 additions and 26 deletions

9
.idea/misc.xml generated
View File

@ -1,12 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="EntryPointsManager"> <component name="ExternalStorageConfigurationManager" enabled="true" />
<list size="2"> <component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="azul-17" project-jdk-type="JavaSDK">
<item index="0" class="java.lang.String" itemvalue="org.springframework.scheduling.annotation.Scheduled" />
<item index="1" class="java.lang.String" itemvalue="org.springframework.stereotype.Service" />
</list>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>

View File

@ -182,12 +182,10 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
val scheduled_deferred_events: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf() val scheduled_deferred_events: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf()
@Scheduled(fixedDelay = (300_000)) @Scheduled(fixedDelay = (300_000))
fun validatePresenceOfRequiredEvent() { fun validatePresenceOfRequiredEvent() {
val removal = mutableMapOf<String, List<DerivedProcessIterationHolder>>() val continueDeferral: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf()
for ((referenceId, eventList) in scheduled_deferred_events) { for ((referenceId, eventList) in scheduled_deferred_events) {
val failed = mutableListOf<DerivedProcessIterationHolder>() val keepable = mutableListOf<DerivedProcessIterationHolder>()
for (event in eventList) { for (event in eventList) {
val ce = if (event.event.data is ConvertWorkerRequest) event.event.data as ConvertWorkerRequest else null val ce = if (event.event.data is ConvertWorkerRequest) event.event.data as ConvertWorkerRequest else null
try { try {
@ -199,12 +197,12 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
throw RuntimeException("Iterated overshot") throw RuntimeException("Iterated overshot")
} else { } else {
event.iterated++ event.iterated++
keepable.add(event)
"Iteration ${event.iterated} for event ${event.eventId} in deferred check" "Iteration ${event.iterated} for event ${event.eventId} in deferred check"
} }
} catch (e: Exception) { } catch (e: Exception) {
eventManager.setProcessEventCompleted(referenceId, event.eventId) eventManager.setProcessEventCompleted(referenceId, event.eventId)
failed.add(event)
log.error { "Canceling event ${event.eventId}\n\t by declaring it as consumed." } log.error { "Canceling event ${event.eventId}\n\t by declaring it as consumed." }
producer.sendMessage( producer.sendMessage(
referenceId = referenceId, referenceId = referenceId,
@ -213,14 +211,11 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
) )
} }
} }
removal[referenceId] = failed continueDeferral[referenceId] = keepable
} }
for ((referenceId, events) in removal) { scheduled_deferred_events.clear()
val list = scheduled_deferred_events[referenceId] ?: continue scheduled_deferred_events.putAll(continueDeferral)
list.toMutableList().removeAll(events)
scheduled_deferred_events[referenceId] = list
}
} }

View File

@ -4,8 +4,10 @@ import no.iktdev.mediaprocessing.shared.common.Defaults
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import import org.springframework.context.annotation.Import
import org.springframework.web.client.RestTemplate
//@Configuration //@Configuration
//class SocketLocalInit: SocketImplementation() //class SocketLocalInit: SocketImplementation()
@ -16,4 +18,5 @@ class KafkaLocalInit: KafkaImplementation() {
} }
@Configuration @Configuration
class DefaultConfiguration: Defaults() public class DefaultProcesserConfiguration: Defaults() {
}

View File

@ -7,8 +7,9 @@ import org.springframework.stereotype.Service
import org.springframework.web.client.RestTemplate import org.springframework.web.client.RestTemplate
@Service @Service
class Reporter(@Autowired private val restTemplate: RestTemplate) { class Reporter() {
@Autowired
lateinit var restTemplate: RestTemplate
fun sendEncodeProgress(progress: ProcesserEventInfo) { fun sendEncodeProgress(progress: ProcesserEventInfo) {
try { try {
restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java) restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java)

View File

@ -3,10 +3,7 @@ package no.iktdev.mediaprocessing.ui.socket.internal
import com.google.gson.Gson import com.google.gson.Gson
import com.google.gson.reflect.TypeToken import com.google.gson.reflect.TypeToken
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.mediaprocessing.ui.UIEnv
import no.iktdev.mediaprocessing.ui.dto.EventDataObject
import no.iktdev.mediaprocessing.ui.memActiveEventMap
import no.iktdev.mediaprocessing.ui.memSimpleConvertedEventsMap
import org.springframework.messaging.simp.stomp.StompFrameHandler import org.springframework.messaging.simp.stomp.StompFrameHandler
import org.springframework.messaging.simp.stomp.StompHeaders import org.springframework.messaging.simp.stomp.StompHeaders
import org.springframework.messaging.simp.stomp.StompSession import org.springframework.messaging.simp.stomp.StompSession

View File

@ -8,7 +8,7 @@ import org.springframework.web.client.RestTemplate
open class Defaults { open class Defaults {
@Bean @Bean
fun restTemplate(): RestTemplate { open fun restTemplate(): RestTemplate {
val restTemplate = RestTemplate() val restTemplate = RestTemplate()
return restTemplate return restTemplate
} }

View File

@ -57,5 +57,13 @@ enum class KafkaEvents(val event: String) {
EventWorkExtractPerformed EventWorkExtractPerformed
) )
} }
fun isOfFinalize(event: KafkaEvents): Boolean {
return event in listOf(
EVENT_MEDIA_PROCESS_COMPLETED,
EVENT_REQUEST_PROCESS_COMPLETED,
EVENT_COLLECT_AND_STORE
)
}
} }
} }