Updated filter
This commit is contained in:
parent
4f3be9a642
commit
876d900e9b
@ -19,7 +19,7 @@ import org.springframework.stereotype.Service
|
|||||||
class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
|
||||||
val log = KotlinLogging.logger {}
|
val log = KotlinLogging.logger {}
|
||||||
|
|
||||||
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED
|
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED
|
||||||
|
|
||||||
override val requiredEvents: List<KafkaEvents> = listOf(
|
override val requiredEvents: List<KafkaEvents> = listOf(
|
||||||
EVENT_REQUEST_PROCESS_STARTED,
|
EVENT_REQUEST_PROCESS_STARTED,
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package no.iktdev.mediaprocessing.shared.common.persistance
|
package no.iktdev.mediaprocessing.shared.common.persistance
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
|
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
|
||||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
|
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
|
||||||
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
|
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
|
||||||
@ -11,6 +12,7 @@ import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
|||||||
import org.jetbrains.exposed.sql.update
|
import org.jetbrains.exposed.sql.update
|
||||||
import java.sql.SQLIntegrityConstraintViolationException
|
import java.sql.SQLIntegrityConstraintViolationException
|
||||||
|
|
||||||
|
private val log = KotlinLogging.logger {}
|
||||||
open class PersistentDataStore {
|
open class PersistentDataStore {
|
||||||
fun storeEventDataMessage(event: String, message: Message<*>): Boolean {
|
fun storeEventDataMessage(event: String, message: Message<*>): Boolean {
|
||||||
val exception = executeOrException {
|
val exception = executeOrException {
|
||||||
@ -23,7 +25,13 @@ open class PersistentDataStore {
|
|||||||
}
|
}
|
||||||
return if (exception == null) true else {
|
return if (exception == null) true else {
|
||||||
if (exception.cause is SQLIntegrityConstraintViolationException) {
|
if (exception.cause is SQLIntegrityConstraintViolationException) {
|
||||||
exception.printStackTrace()
|
log.info { "Error is of SQLIntegrityConstraintViolationException" }
|
||||||
|
try {
|
||||||
|
log.info { "Error code is: ${ (exception as ExposedSQLException).errorCode}" }
|
||||||
|
} catch (e: Exception) {
|
||||||
|
|
||||||
|
}
|
||||||
|
//exception.printStackTrace()
|
||||||
(exception as ExposedSQLException).errorCode == 1062
|
(exception as ExposedSQLException).errorCode == 1062
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|||||||
@ -20,9 +20,9 @@ abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessa
|
|||||||
@Autowired
|
@Autowired
|
||||||
lateinit var producer: CoordinatorProducer
|
lateinit var producer: CoordinatorProducer
|
||||||
fun getListener(): Tasks<V> {
|
fun getListener(): Tasks<V> {
|
||||||
val reactableEvents: Set<KafkaEvents> = requiredEvents.toSet() + listensForEvents.toSet()
|
val reactableEvents = (requiredEvents + listensForEvents).distinct()
|
||||||
//val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
|
//val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
|
||||||
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = reactableEvents.toList())
|
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = reactableEvents)
|
||||||
}
|
}
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
fun attachListener() {
|
fun attachListener() {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user