Adjustments
This commit is contained in:
parent
4b63d70443
commit
8993546cb3
@ -21,9 +21,12 @@ import java.time.format.DateTimeFormatter
|
||||
object ZDS {
|
||||
val gson = WGson.gson
|
||||
|
||||
fun Event.toPersisted(id: Long, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedEvent {
|
||||
fun Event.toPersisted(id: Long, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedEvent? {
|
||||
val payloadJson = gson.toJson(this)
|
||||
val eventName = this::class.simpleName ?: error("Missing class name")
|
||||
val eventName = this::class.simpleName ?: run {
|
||||
println("Missing class name for event: $this")
|
||||
return null
|
||||
}
|
||||
return PersistedEvent(
|
||||
id = id,
|
||||
referenceId = referenceId,
|
||||
@ -37,15 +40,21 @@ object ZDS {
|
||||
/**
|
||||
* Convert a PersistedEvent back to its original Event type using the event type registry and Gson for deserialization.
|
||||
*/
|
||||
fun PersistedEvent.toEvent(): Event {
|
||||
fun PersistedEvent.toEvent(): Event? {
|
||||
val clazz = EventTypeRegistry.resolve(event)
|
||||
?: error("Unknown event type: $event")
|
||||
?: run {
|
||||
println("Missing class name for event: $this")
|
||||
return null
|
||||
}
|
||||
return gson.fromJson(data, clazz)
|
||||
}
|
||||
|
||||
fun Task.toPersisted(id: Long, status: TaskStatus = TaskStatus.Pending, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedTask {
|
||||
fun Task.toPersisted(id: Long, status: TaskStatus = TaskStatus.Pending, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedTask? {
|
||||
val payloadJson = gson.toJson(this)
|
||||
val taskName = this::class.simpleName ?: error("Missing class name")
|
||||
val taskName = this::class.simpleName ?: run {
|
||||
println("Missing class name for task: $this")
|
||||
return null
|
||||
}
|
||||
return PersistedTask(
|
||||
id = id,
|
||||
referenceId = referenceId,
|
||||
|
||||
@ -14,8 +14,8 @@ abstract class AbstractEventPoller(
|
||||
private val dispatcher: EventDispatcher
|
||||
) {
|
||||
var lastSeenTime: LocalDateTime = LocalDateTime.MIN
|
||||
var backoff = Duration.ofSeconds(2)
|
||||
private set
|
||||
open var backoff = Duration.ofSeconds(2)
|
||||
protected set
|
||||
private val maxBackoff = Duration.ofMinutes(1)
|
||||
|
||||
|
||||
@ -42,7 +42,7 @@ abstract class AbstractEventPoller(
|
||||
if (dispatchQueue.isProcessing(referenceId)) continue
|
||||
|
||||
val fullLog = eventStore.getPersistedEventsFor(referenceId)
|
||||
val events = fullLog.map { it.toEvent() }
|
||||
val events = fullLog.mapNotNull { it.toEvent() }
|
||||
|
||||
dispatchQueue.dispatch(referenceId, events, dispatcher)
|
||||
lastSeenTime = fullLog.maxOf { it.persistedAt }
|
||||
|
||||
@ -10,7 +10,8 @@ abstract class AbstractTaskPoller(
|
||||
private val taskStore: TaskStore,
|
||||
private val reporterFactory: (Task) -> TaskReporter
|
||||
) {
|
||||
var backoff = Duration.ofSeconds(2)
|
||||
|
||||
open var backoff = Duration.ofSeconds(2)
|
||||
protected set
|
||||
private val maxBackoff = Duration.ofMinutes(1)
|
||||
|
||||
|
||||
@ -15,7 +15,7 @@ import kotlin.coroutines.cancellation.CancellationException
|
||||
* @param T The type of result produced by processing the task.
|
||||
* @param reporter An instance of [TaskReporter] for reporting task status and events.
|
||||
*/
|
||||
abstract class TaskListener<T>(val taskType: TaskType): TaskListenerImplementation<T> {
|
||||
abstract class TaskListener<T>(val taskType: TaskType = TaskType.CPU_INTENSIVE): TaskListenerImplementation<T> {
|
||||
|
||||
init {
|
||||
TaskListenerRegistry.registerListener(this)
|
||||
|
||||
@ -52,7 +52,7 @@ class EventDispatcherTest: TestBase() {
|
||||
assertNotNull(produced)
|
||||
|
||||
val event = produced!!.toEvent()
|
||||
assertEquals(trigger.eventId, event.metadata.derivedFromId)
|
||||
assertEquals(trigger.eventId, event!!.metadata.derivedFromId)
|
||||
assertTrue(event is DerivedEvent)
|
||||
}
|
||||
|
||||
@ -62,9 +62,10 @@ class EventDispatcherTest: TestBase() {
|
||||
|
||||
val trigger = TriggerEvent()
|
||||
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, LocalDateTime.now())
|
||||
eventStore.persist(derived.toEvent()) // simulate prior production
|
||||
|
||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived.toEvent()))
|
||||
eventStore.persist(derived!!.toEvent()!!) // simulate prior production
|
||||
|
||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived!!.toEvent()!!))
|
||||
|
||||
assertEquals(1, eventStore.all().size) // no new event produced
|
||||
}
|
||||
@ -86,7 +87,7 @@ class EventDispatcherTest: TestBase() {
|
||||
|
||||
val trigger = TriggerEvent()
|
||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
||||
val replayContext = listOf(trigger) + eventStore.all().map { it.toEvent() }
|
||||
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
|
||||
|
||||
dispatcher.dispatch(trigger.referenceId, replayContext)
|
||||
|
||||
|
||||
@ -19,7 +19,7 @@ class InMemoryEventStore : EventStore {
|
||||
|
||||
override fun persist(event: Event) {
|
||||
val persistedEvent = event.toPersisted(nextId++, LocalDateTime.now())
|
||||
persisted += persistedEvent
|
||||
persisted += persistedEvent!!
|
||||
}
|
||||
|
||||
fun all(): List<PersistedEvent> = persisted
|
||||
|
||||
@ -15,7 +15,7 @@ open class InMemoryTaskStore : TaskStore {
|
||||
|
||||
override fun persist(task: Task) {
|
||||
val persistedTask = task.toPersisted(nextId++)
|
||||
tasks += persistedTask
|
||||
tasks += persistedTask!!
|
||||
}
|
||||
|
||||
override fun findByTaskId(taskId: UUID) = tasks.find { it.taskId == taskId }
|
||||
|
||||
@ -32,7 +32,7 @@ class ZDSTest {
|
||||
val echo = EchoEvent("hello")
|
||||
val persisted = echo.toPersisted(id = 1L)
|
||||
|
||||
val restored = persisted.toEvent()
|
||||
val restored = persisted!!.toEvent()
|
||||
assert(restored is EchoEvent)
|
||||
assert((restored as EchoEvent).data == "hello")
|
||||
|
||||
@ -53,7 +53,7 @@ class ZDSTest {
|
||||
|
||||
val persisted = task.toPersisted(id = 1L)
|
||||
|
||||
val restored = persisted.toTask()
|
||||
val restored = persisted!!.toTask()
|
||||
assert(restored is TestTask)
|
||||
assert((restored as TestTask).data == "Potato")
|
||||
assert(restored.metadata.created == task.metadata.created)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user