This commit is contained in:
Brage Skjønborg 2025-10-11 12:21:28 +02:00
parent 9de1600771
commit 0576b1134e
39 changed files with 972 additions and 86 deletions

60
.github/workflows/build.yml vendored Normal file
View File

@ -0,0 +1,60 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
# This workflow will build a Java project with Gradle and cache/restore any dependencies to improve the workflow execution time
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-gradle
name: Build and Publish to reposilite
on:
release:
types: [created]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'zulu'
- name: Setup Gradle
uses: gradle/gradle-build-action@67421db6bd0bf253fb4bd25b31ebb98943c375e1
- name: Set executable permission on gradlew
run: chmod +x ./gradlew
- name: Test
working-directory: ${{ github.workspace }}
run: ls -la
- name: Set library version
working-directory: ${{ github.workspace }}
run: |
if [ -n "${{ github.event.release.tag_name }}" ]; then
version=$(echo ${{ github.event.release.tag_name }} | sed 's/^v//')
sed -i "s/version = \".*\"/version = \"$version\"/g" build.gradle.kts
grep -o "version = \"$version\"" build.gradle.kts
else
echo "No release tag found. Skipping library version update."
fi
- name: Initialize Gradle wrapper
run: ./gradlew wrapper
- name: Gradle Build
run: ./gradlew build
- name: Publish to Reposilite
run: ./gradlew publish
env:
reposiliteUsername: ${{ secrets.reposiliteUsername }}
reposilitePassword: ${{ secrets.reposilitePassword }}

6
.idea/copilot.data.migration.agent.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AgentMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

6
.idea/copilot.data.migration.ask.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AskMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Ask2AgentMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

6
.idea/copilot.data.migration.edit.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="EditMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

View File

@ -1,9 +1,13 @@
import java.io.ByteArrayOutputStream
plugins {
kotlin("jvm") version "2.2.10"
id("maven-publish")
}
group = "no.iktdev"
version = "1.0-SNAPSHOT"
version = "1.0-rc1"
val named = "eventi"
repositories {
mavenCentral()
@ -33,7 +37,7 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
testImplementation("io.mockk:mockk:1.13.5")
testImplementation("com.h2database:h2:2.2.220")
}
@ -43,3 +47,80 @@ tasks.test {
kotlin {
jvmToolchain(21)
}
val reposiliteUrl = if (version.toString().endsWith("SNAPSHOT")) {
"https://reposilite.iktdev.no/snapshots"
} else {
"https://reposilite.iktdev.no/releases"
}
publishing {
publications {
create<MavenPublication>("reposilite") {
versionMapping {
usage("java-api") {
fromResolutionOf("runtimeClasspath")
}
usage("java-runtime") {
fromResolutionResult()
}
}
pom {
name.set(named)
version = project.version.toString()
url.set(reposiliteUrl)
}
from(components["kotlin"])
}
}
repositories {
mavenLocal()
maven {
name = named
url = uri(reposiliteUrl)
credentials {
username = System.getenv("reposiliteUsername")
password = System.getenv("reposilitePassword")
}
}
}
}
fun findLatestTag(): String {
val stdout = ByteArrayOutputStream()
exec {
commandLine = listOf("git", "describe", "--tags", "--abbrev=0")
standardOutput = stdout
isIgnoreExitValue = true
}
return stdout.toString().trim().removePrefix("v")
}
fun isSnapshotBuild(): Boolean {
// Use environment variable or branch name to detect snapshot
val ref = System.getenv("GITHUB_REF") ?: ""
return ref.endsWith("/master") || ref.endsWith("/main")
}
fun getCommitsSinceTag(tag: String): Int {
val stdout = ByteArrayOutputStream()
exec {
commandLine = listOf("git", "rev-list", "$tag..HEAD", "--count")
standardOutput = stdout
isIgnoreExitValue = true
}
return stdout.toString().trim().toIntOrNull() ?: 0
}
val latestTag = findLatestTag()
val versionString = if (isSnapshotBuild()) {
val parts = latestTag.split(".")
val patch = parts.lastOrNull()?.toIntOrNull()?.plus(1) ?: 1
val base = if (parts.size >= 2) "${parts[0]}.${parts[1]}" else latestTag
val buildNumber = getCommitsSinceTag("v$latestTag")
"$base.$patch-SNAPSHOT-$buildNumber"
} else {
latestTag
}
version = versionString

View File

@ -0,0 +1,11 @@
package no.iktdev.eventi
abstract class ListenerRegistryImplementation<T> {
private val listeners = mutableListOf<T>()
fun registerListener(listener: T) {
listeners.add(listener)
}
fun getListeners(): List<T> = listeners.toList()
}

View File

@ -0,0 +1,18 @@
package no.iktdev.eventi
abstract class TypeRegistryImplementation<T> {
protected open val types = mutableMapOf<String, Class<out T>>()
open fun register(clazz: Class<out T>) {
types[clazz.simpleName] = clazz
}
open fun register(clazzes: List<Class<out T>>) {
clazzes.forEach { clazz ->
types[clazz.simpleName] = clazz
}
}
open fun resolve(name: String): Class<out T>? = types[name]
open fun all(): Map<String, Class<out T>> = types.toMap()
}

View File

@ -9,7 +9,11 @@ import com.google.gson.JsonSerializationContext
import com.google.gson.JsonSerializer
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskTypeRegistry
import java.lang.reflect.Type
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
@ -17,7 +21,7 @@ import java.time.format.DateTimeFormatter
object ZDS {
val gson = WGson.gson
fun Event.toPersisted(id: Long, persistedAt: LocalDateTime): PersistedEvent {
fun Event.toPersisted(id: Long, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedEvent {
val payloadJson = gson.toJson(this)
val eventName = this::class.simpleName ?: error("Missing class name")
return PersistedEvent(
@ -39,6 +43,34 @@ object ZDS {
return gson.fromJson(data, clazz)
}
fun Task.toPersisted(id: Long, status: TaskStatus = TaskStatus.Pending, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedTask {
val payloadJson = gson.toJson(this)
val taskName = this::class.simpleName ?: error("Missing class name")
return PersistedTask(
id = id,
referenceId = referenceId,
taskId = taskId,
task = taskName,
data = payloadJson,
status = status,
claimed = false,
consumed = false,
claimedBy = null,
lastCheckIn = null,
persistedAt = persistedAt
)
}
fun PersistedTask.toTask(): Task? {
val clazz = TaskTypeRegistry.resolve(task)
?: run {
//error("Unknown task type: $task")
return null
}
return gson.fromJson(data, clazz)
}
object WGson {
val gson = GsonBuilder()

View File

@ -19,7 +19,7 @@ class EventDispatcher(val eventStore: EventStore) {
val result = listener.onEvent(candidate, events)
if (result != null) {
eventStore.save(result)
eventStore.persist(result)
return
}
}

View File

@ -1,11 +1,6 @@
package no.iktdev.eventi.events
object EventListenerRegistry {
private val listeners = mutableListOf<EventListener>()
import no.iktdev.eventi.ListenerRegistryImplementation
fun registerListener(listener: EventListener) {
listeners.add(listener)
}
fun getListeners(): List<EventListener> = listeners.toList()
object EventListenerRegistry: ListenerRegistryImplementation<EventListener>() {
}

View File

@ -1,31 +1,8 @@
package no.iktdev.eventi.events
import no.iktdev.eventi.TypeRegistryImplementation
import no.iktdev.eventi.models.Event
object EventTypeRegistry {
private val types = mutableMapOf<String, Class<out Event>>()
fun register(clazz: Class<out Event>) {
types[clazz.simpleName] = clazz
}
fun register(clazzes: List<Class<out Event>>) {
clazzes.forEach { clazz ->
types[clazz.simpleName] = clazz
}
}
fun resolve(name: String): Class<out Event>? = types[name]
fun all(): Map<String, Class<out Event>> = types.toMap()
object EventTypeRegistry: TypeRegistryImplementation<Event>() {
}
abstract class EventTypeRegistration {
init {
definedTypes().forEach { clazz ->
EventTypeRegistry.register(clazz)
}
}
protected abstract fun definedTypes(): List<Class<out Event>>
}

View File

@ -17,6 +17,10 @@ class SequenceDispatchQueue(
private val semaphore = Semaphore(maxConcurrency)
private val active = ConcurrentHashMap.newKeySet<UUID>()
fun _scope(): CoroutineScope {
return scope
}
fun isProcessing(referenceId: UUID): Boolean = referenceId in active
fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher): Job? {

View File

@ -1,6 +1,5 @@
package no.iktdev.eventi.models
import java.time.LocalDateTime
import java.util.UUID
abstract class Event {
@ -19,6 +18,19 @@ abstract class Event {
this.metadata = Metadata(derivedFromId = event.eventId)
}
fun producedFrom(task: Task) = apply {
this.referenceId = task.referenceId
this.metadata = Metadata(derivedFromId = task.taskId)
}
fun newReferenceId() = apply {
this.referenceId = UUID.randomUUID()
}
fun usingReferenceId(refId: UUID) = apply {
this.referenceId = refId
}
}
abstract class DeleteEvent: Event() {
@ -26,8 +38,3 @@ abstract class DeleteEvent: Event() {
}
open class Metadata(
val created: LocalDateTime = LocalDateTime.now(), val derivedFromId: UUID? = null
) {}

View File

@ -0,0 +1,9 @@
package no.iktdev.eventi.models
import java.time.LocalDateTime
import java.util.UUID
open class Metadata(
val created: LocalDateTime = LocalDateTime.now(),
val derivedFromId: UUID? = null
) {}

View File

@ -1,3 +1,25 @@
package no.iktdev.eventi.models
class Task()
import java.time.LocalDateTime
import java.util.UUID
abstract class Task {
lateinit var referenceId: UUID
protected set
val taskId: UUID = UUID.randomUUID()
var metadata: Metadata = Metadata()
protected set
@Transient
open val data: Any? = null
fun newReferenceId() = apply {
this.referenceId = UUID.randomUUID()
}
fun derivedOf(event: Event) = apply {
this.referenceId = event.referenceId
this.metadata = Metadata(derivedFromId = event.eventId)
}
}

View File

@ -0,0 +1,25 @@
package no.iktdev.eventi.models.store
import java.time.LocalDateTime
import java.util.UUID
data class PersistedTask(
val id: Long,
val referenceId: UUID,
val status: TaskStatus,
val taskId: UUID,
val task: String,
val data: String,
val claimed: Boolean,
val claimedBy: String? = null,
val consumed: Boolean,
val lastCheckIn: LocalDateTime? = null,
val persistedAt: LocalDateTime
) {}
enum class TaskStatus {
Pending,
InProgress,
Completed,
Failed
}

View File

@ -8,6 +8,6 @@ import java.util.UUID
interface EventStore {
fun getPersistedEventsAfter(timestamp: LocalDateTime): List<PersistedEvent>
fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent>
fun save(event: Event)
fun persist(event: Event)
}

View File

@ -0,0 +1,21 @@
package no.iktdev.eventi.stores
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.PersistedTask
import java.time.Duration
import java.util.UUID
interface TaskStore {
fun persist(task: Task)
fun findByTaskId(taskId: UUID): PersistedTask?
fun findByEventId(eventId: UUID): List<PersistedTask>
fun findUnclaimed(referenceId: UUID): List<PersistedTask>
fun claim(taskId: UUID, workerId: String): Boolean
fun heartbeat(taskId: UUID)
fun markConsumed(taskId: UUID)
fun releaseExpiredTasks(timeout: Duration = Duration.ofMinutes(15))
fun getPendingTasks(): List<PersistedTask>
}

View File

@ -0,0 +1,51 @@
package no.iktdev.eventi.tasks
import kotlinx.coroutines.delay
import no.iktdev.eventi.ZDS.toTask
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.stores.TaskStore
import java.time.Duration
abstract class AbstractTaskPoller(
private val taskStore: TaskStore,
private val reporterFactory: (Task) -> TaskReporter
) {
var backoff = Duration.ofSeconds(2)
protected set
private val maxBackoff = Duration.ofMinutes(1)
suspend fun start() {
while (true) {
pollOnce()
}
}
suspend fun pollOnce() {
val newPersistedTasks = taskStore.getPendingTasks()
if (newPersistedTasks.isEmpty()) {
delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
return
}
val tasks = newPersistedTasks.mapNotNull { it.toTask() }
var acceptedAny = false
for (task in tasks) {
val listener = TaskListenerRegistry.getListeners().firstOrNull { it.supports(task) && !it.isBusy } ?: continue
val claimed = taskStore.claim(task.taskId, listener.getWorkerId())
if (!claimed) continue
val reporter = reporterFactory(task)
val accepted = listener.accept(task, reporter)
acceptedAny = acceptedAny || accepted
}
if (!acceptedAny) {
delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
} else {
backoff = Duration.ofSeconds(2)
}
}
}

View File

@ -0,0 +1,106 @@
package no.iktdev.eventi.tasks
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import java.util.UUID
import kotlin.coroutines.cancellation.CancellationException
/**
* Abstract base class for handling tasks with asynchronous processing and reporting.
*
* @param T The type of result produced by processing the task.
* @param reporter An instance of [TaskReporter] for reporting task status and events.
*/
abstract class TaskListener<T>(val taskType: TaskType): TaskListenerImplementation<T> {
init {
TaskListenerRegistry.registerListener(this)
}
var reporter: TaskReporter? = null
private set
abstract fun getWorkerId(): String
protected var currentJob: Job? = null
var currentTask: Task? = null
private set
open val isBusy: Boolean get() = currentJob?.isActive == true
val currentTaskId: UUID? get() = currentTask?.taskId
private fun getDispatcherForTask(task: Task): CoroutineScope {
return when (taskType) {
TaskType.CPU_INTENSIVE,
TaskType.MIXED -> CoroutineScope(Dispatchers.Default)
TaskType.IO_INTENSIVE -> CoroutineScope(Dispatchers.IO)
}
}
override fun accept(task: Task, reporter: TaskReporter): Boolean {
if (isBusy || !supports(task)) return false
this.reporter = reporter
currentTask = task
reporter.markClaimed(task.taskId, getWorkerId())
currentJob = getDispatcherForTask(task).launch {
try {
val result = onTask(task)
reporter.markConsumed(task.taskId)
onComplete(task, result)
} catch (e: CancellationException) {
onCancelled()
} catch (e: Exception) {
onError(task, e)
} finally {
currentJob = null
currentTask = null
this@TaskListener.reporter = null
}
}
return true
}
override fun onError(task: Task, exception: Exception) {
reporter?.log(task.taskId, "Error processing task: ${exception.message}")
exception.printStackTrace()
}
override fun onComplete(task: Task, result: T?) {
reporter?.markConsumed(task.taskId)
reporter?.log(task.taskId, "Task completed successfully.")
}
override fun onCancelled() {
currentJob?.cancel()
currentJob = null
currentTask = null
}
}
enum class TaskType {
CPU_INTENSIVE,
IO_INTENSIVE,
MIXED
}
interface TaskListenerImplementation<T> {
fun supports(task: Task): Boolean
fun accept(task: Task, reporter: TaskReporter): Boolean
fun onTask(task: Task): T
fun onComplete(task: Task, result: T?)
fun onError(task: Task, exception: Exception)
fun onCancelled()
}
interface TaskReporter {
fun markClaimed(taskId: UUID, workerId: String)
fun updateLastSeen(taskId: UUID)
fun markConsumed(taskId: UUID)
fun updateProgress(taskId: UUID, progress: Int)
fun log(taskId: UUID, message: String)
fun publishEvent(event: Event)
}

View File

@ -0,0 +1,7 @@
package no.iktdev.eventi.tasks
import no.iktdev.eventi.ListenerRegistryImplementation
object TaskListenerRegistry: ListenerRegistryImplementation<TaskListener<*>>() {
}

View File

@ -0,0 +1,7 @@
package no.iktdev.eventi.tasks
import no.iktdev.eventi.TypeRegistryImplementation
import no.iktdev.eventi.models.Task
object TaskTypeRegistry: TypeRegistryImplementation<Task>() {
}

View File

@ -19,11 +19,10 @@ import java.time.LocalDateTime
import java.util.UUID
class EventDispatcherTest: TestBase() {
val dispatcher = EventDispatcher(store)
val dispatcher = EventDispatcher(eventStore)
class DerivedEvent(): Event()
class TriggerEvent(): Event() {
fun usingReferenceId(id: UUID) = apply { referenceId = id }
}
class OtherEvent(): Event()
@ -49,7 +48,7 @@ class EventDispatcherTest: TestBase() {
val trigger = TriggerEvent()
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val produced = store.all().firstOrNull()
val produced = eventStore.all().firstOrNull()
assertNotNull(produced)
val event = produced!!.toEvent()
@ -63,11 +62,11 @@ class EventDispatcherTest: TestBase() {
val trigger = TriggerEvent()
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, LocalDateTime.now())
store.save(derived.toEvent()) // simulate prior production
eventStore.persist(derived.toEvent()) // simulate prior production
dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived.toEvent()))
assertEquals(1, store.all().size) // no new event produced
assertEquals(1, eventStore.all().size) // no new event produced
}
@Test
@ -87,16 +86,16 @@ class EventDispatcherTest: TestBase() {
val trigger = TriggerEvent()
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val replayContext = listOf(trigger) + store.all().map { it.toEvent() }
val replayContext = listOf(trigger) + eventStore.all().map { it.toEvent() }
dispatcher.dispatch(trigger.referenceId, replayContext)
assertEquals(1, store.all().size) // no duplicate
assertEquals(1, eventStore.all().size) // no duplicate
}
@Test
fun `should not deliver deleted events as candidates`() {
val dispatcher = EventDispatcher(store)
val dispatcher = EventDispatcher(eventStore)
val received = mutableListOf<Event>()
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {

View File

@ -23,7 +23,7 @@ class EventTypeRegistryTest: TestBase() {
@Test
@DisplayName("Test EventTypeRegistry registration")
fun scenario1() {
DefaultTestEvents()
registerEventTypes()
assertThat(EventTypeRegistry.resolve("EchoEvent")).isEqualTo(EchoEvent::class.java)
assertThat(EventTypeRegistry.resolve("StartEvent")).isEqualTo(StartEvent::class.java)
}

View File

@ -17,7 +17,7 @@ class InMemoryEventStore : EventStore {
override fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent> =
persisted.filter { it.referenceId == referenceId }
override fun save(event: Event) {
override fun persist(event: Event) {
val persistedEvent = event.toPersisted(nextId++, LocalDateTime.now())
persisted += persistedEvent
}

View File

@ -0,0 +1,67 @@
package no.iktdev.eventi
import no.iktdev.eventi.ZDS.toPersisted
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.stores.TaskStore
import java.time.Duration
import java.time.LocalDateTime
import java.util.UUID
open class InMemoryTaskStore : TaskStore {
private val tasks = mutableListOf<PersistedTask>()
private var nextId = 1L
override fun persist(task: Task) {
val persistedTask = task.toPersisted(nextId++)
tasks += persistedTask
}
override fun findByTaskId(taskId: UUID) = tasks.find { it.taskId == taskId }
override fun findByEventId(eventId: UUID) =
tasks.filter { it.data.contains(eventId.toString()) }
override fun findUnclaimed(referenceId: UUID) =
tasks.filter { it.referenceId == referenceId && !it.claimed && !it.consumed }
override fun claim(taskId: UUID, workerId: String): Boolean {
val task = findByTaskId(taskId) ?: return false
if (task.claimed && !isExpired(task)) return false
update(task.copy(claimed = true, claimedBy = workerId, lastCheckIn = LocalDateTime.now()))
return true
}
override fun heartbeat(taskId: UUID) {
val task = findByTaskId(taskId) ?: return
update(task.copy(lastCheckIn = LocalDateTime.now()))
}
override fun markConsumed(taskId: UUID) {
val task = findByTaskId(taskId) ?: return
update(task.copy(consumed = true, status = TaskStatus.Completed))
}
override fun releaseExpiredTasks(timeout: Duration) {
val now = LocalDateTime.now()
tasks.filter {
it.claimed && !it.consumed && it.lastCheckIn?.isBefore(now.minus(timeout)) == true
}.forEach {
update(it.copy(claimed = false, claimedBy = null, lastCheckIn = null))
}
}
override fun getPendingTasks() = tasks.filter { !it.consumed }
private fun update(updated: PersistedTask) {
tasks.replaceAll { if (it.taskId == updated.taskId) updated else it }
}
private fun isExpired(task: PersistedTask): Boolean {
val now = LocalDateTime.now()
return task.lastCheckIn?.isBefore(now.minusMinutes(15)) == true
}
private fun serialize(data: Any?): String = data?.toString() ?: "{}"
}

View File

@ -1,25 +1,20 @@
package no.iktdev.eventi
import no.iktdev.eventi.events.EchoEvent
import no.iktdev.eventi.events.EventTypeRegistration
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.events.StartEvent
import no.iktdev.eventi.models.Event
open class TestBase {
val store = InMemoryEventStore()
val eventStore = InMemoryEventStore()
val taskStore = InMemoryTaskStore()
class DefaultTestEvents() : EventTypeRegistration() {
override fun definedTypes(): List<Class<out Event>> {
return listOf(
EchoEvent::class.java,
StartEvent::class.java
)
}
fun registerEventTypes() {
EventTypeRegistry.register(listOf(StartEvent::class.java, EchoEvent::class.java))
}
init {
DefaultTestEvents()
registerEventTypes()
}
}

View File

@ -2,8 +2,11 @@ package no.iktdev.eventi
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.ZDS.toPersisted
import no.iktdev.eventi.ZDS.toTask
import no.iktdev.eventi.events.EchoEvent
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.tasks.TaskTypeRegistry
import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.BeforeEach
@ -16,17 +19,18 @@ class ZDSTest {
@BeforeEach
fun setup() {
EventTypeRegistry.wipe()
TaskTypeRegistry.wipe()
// Verifiser at det er tomt
assertNull(EventTypeRegistry.resolve("SomeEvent"))
}
@Test
@DisplayName("Test ZDS")
@DisplayName("Test ZDS with Event object")
fun scenario1() {
EventTypeRegistry.register(EchoEvent::class.java)
val echo = EchoEvent("hello")
val persisted = echo.toPersisted(id = 1L, persistedAt = LocalDateTime.now())
val persisted = echo.toPersisted(id = 1L)
val restored = persisted.toEvent()
assert(restored is EchoEvent)
@ -34,4 +38,27 @@ class ZDSTest {
}
data class TestTask(
override val data: String?
): Task()
@Test
@DisplayName("Test ZDS with Task object")
fun scenario2() {
TaskTypeRegistry.register(TestTask::class.java)
val task = TestTask("Potato")
.newReferenceId()
val persisted = task.toPersisted(id = 1L)
val restored = persisted.toTask()
assert(restored is TestTask)
assert((restored as TestTask).data == "Potato")
assert(restored.metadata.created == task.metadata.created)
assert(restored.metadata.derivedFromId == task.metadata.derivedFromId)
}
}

View File

@ -1,36 +1,49 @@
package no.iktdev.eventi.events
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import no.iktdev.eventi.EventDispatcherTest
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
import no.iktdev.eventi.TestBase
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.stores.EventStore
import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import sun.rmi.transport.DGCAckHandler.received
import java.time.Duration
import java.time.LocalDateTime
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
class AbstractEventPollerTest : TestBase() {
val dispatcher = EventDispatcher(store)
val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val poller = object : AbstractEventPoller(store, queue, dispatcher) {}
val poller = object : AbstractEventPoller(eventStore, queue, dispatcher) {}
@BeforeEach
fun setup() {
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
store.clear()
eventStore.clear()
// Verifiser at det er tomt
EventTypeRegistry.register(listOf(
@ -57,7 +70,7 @@ class AbstractEventPollerTest : TestBase() {
referenceIds.forEach { refId ->
val e = EventDispatcherTest.TriggerEvent().usingReferenceId(refId)
store.save(e) // persistedAt settes automatisk her
eventStore.persist(e) // persistedAt settes automatisk her
completionMap[refId] = CompletableDeferred()
}
@ -70,7 +83,7 @@ class AbstractEventPollerTest : TestBase() {
@Test
fun `pollOnce should increase backoff when no events and reset when events arrive`() = runTest {
val testPoller = object : AbstractEventPoller(store, queue, dispatcher) {
val testPoller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
fun currentBackoff(): Duration = backoff
}
@ -83,7 +96,7 @@ class AbstractEventPollerTest : TestBase() {
assertTrue(afterSecond > afterFirst)
val e = TriggerEvent().usingReferenceId(UUID.randomUUID())
store.save(e)
eventStore.persist(e)
testPoller.pollOnce()
val afterReset = testPoller.currentBackoff()
@ -100,7 +113,7 @@ class AbstractEventPollerTest : TestBase() {
// Wipe alt før test
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
store.clear() // sørg for at InMemoryEventStore støtter dette
eventStore.clear() // sørg for at InMemoryEventStore støtter dette
EventTypeRegistry.register(listOf(TriggerEvent::class.java))
@ -113,7 +126,7 @@ class AbstractEventPollerTest : TestBase() {
}
repeat(3) {
store.save(TriggerEvent().usingReferenceId(refId))
eventStore.persist(TriggerEvent().usingReferenceId(refId))
}
poller.pollOnce()
@ -125,27 +138,80 @@ class AbstractEventPollerTest : TestBase() {
}
@Test
fun `pollOnce should ignore events before lastSeenTime`() = runTest {
val refId = UUID.randomUUID()
val ignored = TriggerEvent().usingReferenceId(refId)
val testPoller = object : AbstractEventPoller(store, queue, dispatcher) {
val testPoller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
init {
lastSeenTime = LocalDateTime.now().plusSeconds(1)
}
}
store.save(ignored)
eventStore.persist(ignored)
testPoller.pollOnce()
assertFalse(queue.isProcessing(refId))
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `poller handles manually injected duplicate event`() = runTest {
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
val channel = Channel<Event>(Channel.UNLIMITED)
val handled = mutableListOf<Event>()
// Setup
object : EventListener() {
override fun onEvent(event: Event, context: List<Event>): Event? {
if (event !is EchoEvent)
return null
handled += event
channel.trySend(event)
return MarcoEvent(true).derivedOf(event)
}
}
val poller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
}
// Original event
val original = EchoEvent(data = "Hello")
eventStore.persist(original)
// Act
poller.pollOnce()
withContext(Dispatchers.Default.limitedParallelism(1)) {
withTimeout(Duration.ofMinutes(1).toMillis()) {
repeat(1) { channel.receive() }
}
}
// Manual replay with new eventId, same referenceId
val duplicateEvent = EchoEvent("Test me").usingReferenceId(original.referenceId)
eventStore.persist(duplicateEvent)
// Act
poller.pollOnce()
withContext(Dispatchers.Default.limitedParallelism(1)) {
withTimeout(Duration.ofMinutes(1).toMillis()) {
repeat(1) { channel.receive() }
}
}
// Assert
assertEquals(2, handled.size)
assertTrue(handled.any { it.eventId == original.eventId })
}

View File

@ -2,7 +2,6 @@ package no.iktdev.eventi.events
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.test.runTest
import no.iktdev.eventi.EventDispatcherTest
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
@ -14,8 +13,6 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
class SequenceDispatchQueueTest: TestBase() {
@ -35,7 +32,7 @@ class SequenceDispatchQueueTest: TestBase() {
@Test
fun `should dispatch all referenceIds with limited concurrency`() = runTest {
val dispatcher = EventDispatcher(store)
val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val dispatched = ConcurrentHashMap.newKeySet<UUID>()
@ -52,7 +49,7 @@ class SequenceDispatchQueueTest: TestBase() {
val jobs = referenceIds.mapNotNull { refId ->
val e = TriggerEvent().usingReferenceId(refId)
store.save(e)
eventStore.persist(e)
queue.dispatch(refId, listOf(e), dispatcher)
}

View File

@ -9,3 +9,6 @@ class StartEvent(): Event() {
class EchoEvent(override var data: String): Event() {
}
class MarcoEvent(override val data: Boolean): Event() {
}

View File

@ -0,0 +1,196 @@
package no.iktdev.eventi.tasks
import io.mockk.mockk
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import no.iktdev.eventi.InMemoryTaskStore
import no.iktdev.eventi.TestBase
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.stores.TaskStore
import no.iktdev.eventi.testUtil.multiply
import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.time.Duration
import java.util.UUID
class AbstractTaskPollerTest : TestBase() {
@BeforeEach
fun setup() {
TaskListenerRegistry.wipe()
TaskTypeRegistry.wipe()
eventDeferred = CompletableDeferred()
}
private lateinit var eventDeferred: CompletableDeferred<Event>
val reporterFactory = { _: Task ->
object : TaskReporter {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markConsumed(taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {
eventDeferred.complete(event)
}
}
}
data class EchoTask(override var data: String?) : Task() {
}
data class EchoEvent(override var data: String) : Event() {
}
class TaskPollerTest(taskStore: TaskStore, reporterFactory: (Task) -> TaskReporter): AbstractTaskPoller(taskStore, reporterFactory) {
fun overrideSetBackoff(duration: java.time.Duration) {
backoff = duration
}
}
open class EchoListener : TaskListener<String>(TaskType.MIXED) {
var result: String? = null
override fun getWorkerId() = this.javaClass.simpleName
override fun supports(task: Task): Boolean {
return task is EchoTask
}
override fun onTask(task: Task): String {
if (task is EchoTask) {
return task.data + " Potetmos"
}
throw IllegalArgumentException("Unsupported task type: ${task::class.java}")
}
override fun onComplete(task: Task, result: String?) {
super.onComplete(task, result)
this.result = result;
reporter?.publishEvent(EchoEvent(result!!).producedFrom(task))
}
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun scenario1() = runTest {
// Register Task and Event
TaskTypeRegistry.register(EchoTask::class.java)
EventTypeRegistry.register(EchoEvent::class.java)
val listener = EchoListener()
val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
val task = EchoTask("Hello").newReferenceId()
taskStore.persist(task)
poller.pollOnce()
advanceUntilIdle()
val producedEvent = eventDeferred.await()
assertThat(producedEvent).isNotNull
assertThat(producedEvent!!.metadata.derivedFromId).isEqualTo(task.taskId)
assertThat(listener.result).isEqualTo("Hello Potetmos")
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `poller resets backoff when task is accepted`() = runTest {
TaskTypeRegistry.register(EchoTask::class.java)
EventTypeRegistry.register(EchoEvent::class.java)
val listener = EchoListener()
val poller = TaskPollerTest(taskStore, reporterFactory)
val initialBackoff = poller.backoff
poller.overrideSetBackoff(Duration.ofSeconds(16))
val task = EchoTask("Hello").newReferenceId()
taskStore.persist(task)
poller.pollOnce()
advanceUntilIdle()
assertEquals(initialBackoff, poller.backoff)
assertEquals("Hello Potetmos", listener.result)
}
@Test
fun `poller increases backoff when no tasks`() = runTest {
val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
val initialBackoff = poller.backoff
val totalBackoff = initialBackoff.multiply(2)
poller.pollOnce()
assertEquals(totalBackoff, poller.backoff)
}
@Test
fun `poller increases backoff when no listener supports task`() = runTest {
val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
val initialBackoff = poller.backoff
// as long as the task is not added to registry this will be unsupported
val unsupportedTask = EchoTask("Hello").newReferenceId()
taskStore.persist(unsupportedTask)
poller.pollOnce()
assertEquals(initialBackoff.multiply(2), poller.backoff)
}
@Test
fun `poller increases backoff when listener is busy`() = runTest {
val busyListener = object : EchoListener() {
override val isBusy = true
}
val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
val intialBackoff = poller.backoff
val task = EchoTask("Busy").newReferenceId()
taskStore.persist(task)
poller.pollOnce()
assertEquals(intialBackoff.multiply(2), poller.backoff)
}
@Test
fun `poller increases backoff when task is not claimed`() = runTest {
val listener = EchoListener()
TaskTypeRegistry.register(EchoTask::class.java)
val task = EchoTask("Unclaimable").newReferenceId()
taskStore.persist(task)
// Simuler at claim alltid feiler
val failingStore = object : InMemoryTaskStore() {
override fun claim(taskId: UUID, workerId: String): Boolean = false
}
val pollerWithFailingClaim = object : AbstractTaskPoller(failingStore, reporterFactory) {}
val initialBackoff = pollerWithFailingClaim.backoff
failingStore.persist(task)
pollerWithFailingClaim.pollOnce()
assertEquals(initialBackoff.multiply(2), pollerWithFailingClaim.backoff)
}
}

View File

@ -0,0 +1,8 @@
package no.iktdev.eventi.testUtil
import java.time.Duration
fun Duration.multiply(factor: Int): Duration {
return Duration.ofNanos(this.toNanos() * factor)
}

View File

@ -6,7 +6,9 @@ import org.assertj.core.api.Assertions.assertThat
import java.lang.reflect.Field
fun EventListenerRegistry.wipe() {
val field: Field = EventListenerRegistry::class.java.getDeclaredField("listeners")
val field: Field = EventListenerRegistry::class.java
.superclass
.getDeclaredField("listeners")
field.isAccessible = true
// Tøm mapen

View File

@ -6,7 +6,9 @@ import org.junit.jupiter.api.Assertions.assertNull
import java.lang.reflect.Field
fun EventTypeRegistry.wipe() {
val field: Field = EventTypeRegistry::class.java.getDeclaredField("types")
val field: Field = EventTypeRegistry::class.java
.superclass
.getDeclaredField("types")
field.isAccessible = true
// Tøm mapen

View File

@ -0,0 +1,22 @@
package no.iktdev.eventi.testUtil
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.tasks.TaskListener
import no.iktdev.eventi.tasks.TaskListenerRegistry
import org.assertj.core.api.Assertions.assertThat
import java.lang.reflect.Field
fun TaskListenerRegistry.wipe() {
val field: Field = TaskListenerRegistry::class.java
.superclass
.getDeclaredField("listeners")
field.isAccessible = true
// Tøm mapen
val mutableList = field.get(TaskListenerRegistry) as MutableList<*>
(mutableList as MutableList<Class<out TaskListener<*>>>).clear()
// Verifiser at det er tomt
assertThat(TaskListenerRegistry.getListeners().isEmpty())
}

View File

@ -0,0 +1,22 @@
package no.iktdev.eventi.testUtil
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.tasks.TaskTypeRegistry
import org.junit.jupiter.api.Assertions.assertNull
import java.lang.reflect.Field
fun TaskTypeRegistry.wipe() {
val field: Field = TaskTypeRegistry::class.java
.superclass
.getDeclaredField("types")
field.isAccessible = true
// Tøm mapen
val typesMap = field.get(TaskTypeRegistry) as MutableMap<*, *>
(typesMap as MutableMap<String, Class<out Task>>).clear()
// Verifiser at det er tomt
assertNull(TaskTypeRegistry.resolve("ANnonExistingEvent"))
}

View File

@ -0,0 +1,23 @@
package no.iktdev.eventi.testUtil
import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.tasks.TaskListenerRegistry
import no.iktdev.eventi.tasks.TaskTypeRegistry
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
class UtilTest {
@Test
@DisplayName("Test wipe function")
fun scenario1() {
assertDoesNotThrow {
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
TaskListenerRegistry.wipe()
TaskTypeRegistry.wipe()
}
}
}