This commit is contained in:
Brage Skjønborg 2026-01-23 00:49:51 +01:00
parent a0f1908a1a
commit a9a06a41f9
16 changed files with 99 additions and 75 deletions

View File

@ -1,12 +1,14 @@
package no.iktdev.eventi
import java.time.Clock
import java.time.LocalDateTime
import java.time.Instant
object MyTime {
private val clock: Clock = Clock.systemUTC()
@JvmStatic
fun UtcNow(): LocalDateTime =
LocalDateTime.now(clock)
fun utcNow(): Instant =
Instant.now(clock)
}

View File

@ -1,5 +1,6 @@
package no.iktdev.eventi
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonDeserializationContext
import com.google.gson.JsonDeserializer
@ -15,13 +16,14 @@ import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskTypeRegistry
import java.lang.reflect.Type
import java.time.Instant
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
object ZDS {
val gson = WGson.gson
fun Event.toPersisted(id: Long, persistedAt: LocalDateTime = MyTime.UtcNow()): PersistedEvent? {
fun Event.toPersisted(id: Long, persistedAt: Instant = MyTime.utcNow()): PersistedEvent? {
val payloadJson = gson.toJson(this)
val eventName = this::class.simpleName ?: run {
throw IllegalStateException("Missing class name for event: $this")
@ -47,7 +49,7 @@ object ZDS {
return gson.fromJson(data, clazz)
}
fun Task.toPersisted(id: Long, status: TaskStatus = TaskStatus.Pending, persistedAt: LocalDateTime = MyTime.UtcNow()): PersistedTask? {
fun Task.toPersisted(id: Long, status: TaskStatus = TaskStatus.Pending, persistedAt: Instant = MyTime.utcNow()): PersistedTask? {
val payloadJson = gson.toJson(this)
val taskName = this::class.simpleName ?: run {
throw IllegalStateException("Missing class name for task: $this")
@ -80,26 +82,47 @@ object ZDS {
object WGson {
val gson = GsonBuilder()
.registerTypeAdapter(Instant::class.java, InstantAdapter())
// hvis du fortsatt har LocalDateTime et sted:
.registerTypeAdapter(LocalDateTime::class.java, LocalDateTimeAdapter())
.create()
fun toJson(data: Any?): String {
return gson.toJson(data)
fun toJson(data: Any?): String =
gson.toJson(data)
class InstantAdapter : JsonSerializer<Instant>, JsonDeserializer<Instant> {
override fun serialize(
src: Instant,
typeOfSrc: Type,
context: JsonSerializationContext
): JsonElement =
JsonPrimitive(src.toString()) // ISO-8601, UTC
override fun deserialize(
json: JsonElement,
typeOfT: Type,
context: JsonDeserializationContext
): Instant =
Instant.parse(json.asString)
}
class LocalDateTimeAdapter : JsonSerializer<LocalDateTime>, JsonDeserializer<LocalDateTime> {
private val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME
override fun serialize(
src: LocalDateTime, typeOfSrc: Type, context: JsonSerializationContext
): JsonElement {
return JsonPrimitive(src.format(formatter))
}
src: LocalDateTime,
typeOfSrc: Type,
context: JsonSerializationContext
): JsonElement =
JsonPrimitive(src.format(formatter))
override fun deserialize(
json: JsonElement, typeOfT: Type, context: JsonDeserializationContext
): LocalDateTime {
return LocalDateTime.parse(json.asString, formatter)
}
json: JsonElement,
typeOfT: Type,
context: JsonDeserializationContext
): LocalDateTime =
LocalDateTime.parse(json.asString, formatter)
}
}
}

View File

@ -6,7 +6,7 @@ import no.iktdev.eventi.MyTime
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.stores.EventStore
import java.time.Duration
import java.time.LocalDateTime
import java.time.Instant
import java.util.UUID
import kotlin.collections.iterator
@ -16,10 +16,10 @@ abstract class EventPollerImplementation(
private val dispatcher: EventDispatcher
) {
// Erstatter ikke lastSeenTime, men supplerer den
protected val refWatermark = mutableMapOf<UUID, LocalDateTime>()
protected val refWatermark = mutableMapOf<UUID, Instant>()
// lastSeenTime brukes kun som scan hint
var lastSeenTime: LocalDateTime = LocalDateTime.of(1970, 1, 1, 0, 0)
var lastSeenTime: Instant = Instant.EPOCH
open var backoff = Duration.ofSeconds(2)
protected set
@ -40,7 +40,7 @@ abstract class EventPollerImplementation(
}
suspend fun pollOnce() {
val pollStartedAt = MyTime.UtcNow()
val pollStartedAt = MyTime.utcNow()
log.debug { "🔍 Polling for new events" }
// Global scan hint: kombiner refWatermark og lastSeenTime
@ -70,7 +70,7 @@ abstract class EventPollerImplementation(
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
for ((ref, eventsForRef) in grouped) {
val refSeen = refWatermark[ref] ?: LocalDateTime.of(1970, 1, 1, 0, 0)
val refSeen = refWatermark[ref] ?: Instant.EPOCH
// Finn kun nye events for denne refen
val newForRef = eventsForRef.filter { it.persistedAt > refSeen }

View File

@ -1,11 +1,11 @@
package no.iktdev.eventi.models
import no.iktdev.eventi.MyTime
import java.time.LocalDateTime
import java.time.Instant
import java.util.UUID
class Metadata {
val created: LocalDateTime = MyTime.UtcNow()
val created: Instant = MyTime.utcNow()
var derivedFromId: Set<UUID>? = null
private set
fun derivedFromEventId(vararg id: UUID) = apply {

View File

@ -1,6 +1,5 @@
package no.iktdev.eventi.models
import java.time.LocalDateTime
import java.util.UUID

View File

@ -1,6 +1,6 @@
package no.iktdev.eventi.models.store
import java.time.LocalDateTime
import java.time.Instant
import java.util.UUID
data class PersistedEvent(
@ -9,5 +9,5 @@ data class PersistedEvent(
val eventId: UUID,
val event: String,
val data: String,
val persistedAt: LocalDateTime
val persistedAt: Instant
)

View File

@ -1,6 +1,6 @@
package no.iktdev.eventi.models.store
import java.time.LocalDateTime
import java.time.Instant
import java.util.UUID
data class PersistedTask(
@ -13,8 +13,8 @@ data class PersistedTask(
val claimed: Boolean,
val claimedBy: String? = null,
val consumed: Boolean,
val lastCheckIn: LocalDateTime? = null,
val persistedAt: LocalDateTime
val lastCheckIn: Instant? = null,
val persistedAt: Instant
) {}
enum class TaskStatus {

View File

@ -2,11 +2,11 @@ package no.iktdev.eventi.stores
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.PersistedEvent
import java.time.LocalDateTime
import java.time.Instant
import java.util.UUID
interface EventStore {
fun getPersistedEventsAfter(timestamp: LocalDateTime): List<PersistedEvent>
fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent>
fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent>
fun persist(event: Event)
}

View File

@ -18,7 +18,6 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.time.LocalDateTime
import java.util.UUID
class EventDispatcherTest: TestBase() {
@ -71,7 +70,7 @@ class EventDispatcherTest: TestBase() {
val listener = ProducingListener()
val trigger = TriggerEvent()
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.UtcNow())
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
eventStore.persist(derived!!.toEvent()!!) // simulate prior production

View File

@ -4,25 +4,25 @@ import no.iktdev.eventi.ZDS.toPersisted
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.eventi.stores.EventStore
import java.time.LocalDateTime
import java.time.Instant
import java.util.UUID
class InMemoryEventStore : EventStore {
private val persisted = mutableListOf<PersistedEvent>()
private var nextId = 1L
override fun getPersistedEventsAfter(timestamp: LocalDateTime): List<PersistedEvent> =
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> =
persisted.filter { it.persistedAt > timestamp }
override fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent> =
persisted.filter { it.referenceId == referenceId }
override fun persist(event: Event) {
val persistedEvent = event.toPersisted(nextId++, MyTime.UtcNow())
val persistedEvent = event.toPersisted(nextId++, MyTime.utcNow())
persisted += persistedEvent!!
}
fun persistAt(event: Event, persistedAt: LocalDateTime) {
fun persistAt(event: Event, persistedAt: Instant) {
val persistedEvent = event.toPersisted(nextId++, persistedAt)
persisted += persistedEvent!!
}

View File

@ -6,7 +6,7 @@ import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.stores.TaskStore
import java.time.Duration
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import java.util.UUID
import kotlin.concurrent.atomics.AtomicReference
@ -30,13 +30,13 @@ open class InMemoryTaskStore : TaskStore {
override fun claim(taskId: UUID, workerId: String): Boolean {
val task = findByTaskId(taskId) ?: return false
if (task.claimed && !isExpired(task)) return false
update(task.copy(claimed = true, claimedBy = workerId, lastCheckIn = MyTime.UtcNow()))
update(task.copy(claimed = true, claimedBy = workerId, lastCheckIn = MyTime.utcNow()))
return true
}
override fun heartbeat(taskId: UUID) {
val task = findByTaskId(taskId) ?: return
update(task.copy(lastCheckIn = MyTime.UtcNow()))
update(task.copy(lastCheckIn = MyTime.utcNow()))
}
override fun markConsumed(taskId: UUID, status: TaskStatus) {
@ -45,7 +45,7 @@ open class InMemoryTaskStore : TaskStore {
}
override fun releaseExpiredTasks(timeout: Duration) {
val now = MyTime.UtcNow()
val now = MyTime.utcNow()
tasks.filter {
it.claimed && !it.consumed && it.lastCheckIn?.isBefore(now.minus(timeout)) == true
}.forEach {
@ -60,8 +60,8 @@ open class InMemoryTaskStore : TaskStore {
}
private fun isExpired(task: PersistedTask): Boolean {
val now = MyTime.UtcNow()
return task.lastCheckIn?.isBefore(now.minusMinutes(15)) == true
val now = MyTime.utcNow()
return task.lastCheckIn?.isBefore(now.minus(15, ChronoUnit.MINUTES)) == true
}
private fun serialize(data: Any?): String = data?.toString() ?: "{}"

View File

@ -12,7 +12,6 @@ import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.time.LocalDateTime
class ZDSTest {

View File

@ -22,7 +22,6 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.time.Duration
import java.time.LocalDateTime
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
@ -138,7 +137,7 @@ class EventPollerImplementationTest : TestBase() {
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
init {
lastSeenTime = MyTime.UtcNow().plusSeconds(1)
lastSeenTime = MyTime.utcNow().plusSeconds(1)
}
}

View File

@ -8,12 +8,13 @@ import no.iktdev.eventi.InMemoryEventStore
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.time.LocalDateTime
import java.util.UUID
import kotlinx.coroutines.*
import no.iktdev.eventi.MyTime
import no.iktdev.eventi.ZDS.toPersisted
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Metadata
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
@ -59,9 +60,9 @@ class TestEvent : Event() {
}
class FakeClock(var now: LocalDateTime) {
class FakeClock(var now: Instant) {
fun advanceSeconds(sec: Long) {
now = now.plusSeconds(sec)
now = MyTime.utcNow().plusSeconds(sec)
}
}
@ -88,7 +89,7 @@ class RunSimulationTestTest {
}
}
private fun persistEvent(ref: UUID, time: LocalDateTime) {
private fun persistEvent(ref: UUID, time: Instant) {
val e = TestEvent().withReference(ref)
store.persist(e.setMetadata(Metadata()))
}
@ -96,14 +97,14 @@ class RunSimulationTestTest {
@Test
fun `poller updates lastSeenTime when dispatch happens`() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(ref, t)
poller.pollOnce()
advanceUntilIdle()
assertThat(poller.lastSeenTime).isAfter(LocalDateTime.of(1970,1,1,0,0))
assertThat(poller.lastSeenTime).isGreaterThan(Instant.EPOCH)
assertThat(dispatcher.dispatched).hasSize(1)
}
@ -116,7 +117,7 @@ class RunSimulationTestTest {
@Test
fun `poller DOES update lastSeenTime even when queue is busy`() = runTest {
val ref = UUID.randomUUID()
val t = LocalDateTime.of(2026,1,22,12,0,0)
val t = Instant.parse("2026-01-22T12:00:00Z")
store.persistAt(TestEvent().withReference(ref), t)
@ -129,7 +130,7 @@ class RunSimulationTestTest {
// Etter livelock-fixen skal lastSeenTime være *etter* eventet
assertThat(poller.lastSeenTime)
.isAfter(t)
.isGreaterThan(t)
}
@ -138,7 +139,7 @@ class RunSimulationTestTest {
@Test
fun `poller does not double-dispatch`() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(ref, t)
@ -155,7 +156,7 @@ class RunSimulationTestTest {
fun `poller handles multiple referenceIds`() = runTest(testDispatcher) {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(refA, t)
persistEvent(refB, t.plusSeconds(1))
@ -170,7 +171,7 @@ class RunSimulationTestTest {
fun `poller handles identical timestamps`() = runTest(testDispatcher) {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(refA, t)
persistEvent(refB, t)
@ -212,7 +213,7 @@ class RunSimulationTestTest {
@Test
fun `poller processes events arriving while queue is busy`() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t1 = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
val t1 = Instant.parse("2026-01-22T12:00:00Z")
val t2 = t1.plusSeconds(5)
persistEvent(ref, t1)

View File

@ -2,6 +2,7 @@ package no.iktdev.eventi.events.poller
import kotlinx.coroutines.test.*
import no.iktdev.eventi.InMemoryEventStore
import no.iktdev.eventi.MyTime
import no.iktdev.eventi.TestBase
import no.iktdev.eventi.events.EventDispatcher
import no.iktdev.eventi.events.EventTypeRegistry
@ -13,12 +14,13 @@ import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Metadata
import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.eventi.stores.EventStore
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.time.LocalDateTime
import java.time.Instant
import java.util.UUID
import org.assertj.core.api.Assertions.assertThat
class PollerStartLoopTest: TestBase() {
@ -29,8 +31,8 @@ class PollerStartLoopTest: TestBase() {
private lateinit var queue: RunSimulationTestTest.ControlledDispatchQueue
private lateinit var poller: TestablePoller
private fun t(seconds: Long): LocalDateTime =
LocalDateTime.of(2024, 1, 1, 12, 0).plusSeconds(seconds)
private fun t(seconds: Long): Instant =
Instant.parse("2024-01-01T12:00:00Z").plusSeconds(seconds)
@BeforeEach
@ -45,7 +47,7 @@ class PollerStartLoopTest: TestBase() {
poller = TestablePoller(store, queue, dispatcher, scope)
}
private fun persistAt(ref: UUID, time: LocalDateTime) {
private fun persistAt(ref: UUID, time: Instant) {
val e = TestEvent().withReference(ref).setMetadata(Metadata())
store.persistAt(e, time)
}
@ -80,7 +82,7 @@ class PollerStartLoopTest: TestBase() {
val before = poller.backoff
val ref = UUID.randomUUID()
persistAt(ref, LocalDateTime.now())
persistAt(ref, MyTime.utcNow())
poller.startFor(iterations = 1)
@ -93,7 +95,7 @@ class PollerStartLoopTest: TestBase() {
poller.startFor(iterations = 3)
persistAt(ref, LocalDateTime.now())
persistAt(ref, MyTime.utcNow())
poller.startFor(iterations = 1)
@ -108,7 +110,7 @@ class PollerStartLoopTest: TestBase() {
queue.busyRefs += ref
// Legg inn et event
val t = LocalDateTime.now()
val t = MyTime.utcNow()
persistAt(ref, t)
// Første poll: ingen dispatch fordi ref er busy
@ -152,7 +154,7 @@ class PollerStartLoopTest: TestBase() {
queue.busyRefs += ref
val t1 = LocalDateTime.now()
val t1 = MyTime.utcNow()
persistAt(ref, t1)
poller.startFor(iterations = 1)
@ -223,7 +225,7 @@ class PollerStartLoopTest: TestBase() {
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
// B skal ha flyttet watermark
assertThat(poller.watermarkFor(refB)).isAfter(wmB1)
assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1)
}
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
@ -346,7 +348,7 @@ class PollerStartLoopTest: TestBase() {
// Fake EventStore som alltid returnerer samme event
val fakeStore = object : EventStore {
override fun getPersistedEventsAfter(ts: LocalDateTime): List<PersistedEvent> {
override fun getPersistedEventsAfter(ts: Instant): List<PersistedEvent> {
// Alltid returner én event som ligger før watermark
return listOf(
PersistedEvent(
@ -392,8 +394,8 @@ class PollerStartLoopTest: TestBase() {
poller.pollOnce()
// Fixen skal flytte lastSeenTime forbi eventen
assertThat(poller.lastSeenTime)
.isAfter(t(50))
assertThat<Instant>(poller.lastSeenTime)
.isGreaterThan(t(50))
// Andre poll: nå skal polleren IKKE spinne
val before = poller.lastSeenTime

View File

@ -6,7 +6,7 @@ import no.iktdev.eventi.events.EventDispatcher
import no.iktdev.eventi.events.EventPollerImplementation
import no.iktdev.eventi.events.SequenceDispatchQueue
import no.iktdev.eventi.stores.EventStore
import java.time.LocalDateTime
import java.time.Instant
import java.util.UUID
class TestablePoller(
@ -31,19 +31,19 @@ class TestablePoller(
}
}
override fun watermarkFor(ref: UUID): LocalDateTime? {
override fun watermarkFor(ref: UUID): Instant? {
return refWatermark[ref]?.let {
return it
}
}
override fun setWatermarkFor(ref: UUID, time: LocalDateTime) {
override fun setWatermarkFor(ref: UUID, time: Instant) {
refWatermark[ref] = time
}
}
interface WatermarkDebugView {
fun watermarkFor(ref: UUID): LocalDateTime?
fun setWatermarkFor(ref: UUID, time: LocalDateTime)
fun watermarkFor(ref: UUID): Instant?
fun setWatermarkFor(ref: UUID, time: Instant)
}