diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644
index 0000000..005e87a
--- /dev/null
+++ b/.github/workflows/build.yml
@@ -0,0 +1,60 @@
+# This workflow uses actions that are not certified by GitHub.
+# They are provided by a third-party and are governed by
+# separate terms of service, privacy policy, and support
+# documentation.
+# This workflow will build a Java project with Gradle and cache/restore any dependencies to improve the workflow execution time
+# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-gradle
+
+name: Build and Publish to reposilite
+
+on:
+ release:
+ types: [created]
+
+jobs:
+ build:
+
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up JDK 17
+ uses: actions/setup-java@v3
+ with:
+ java-version: '17'
+ distribution: 'zulu'
+
+ - name: Setup Gradle
+ uses: gradle/gradle-build-action@67421db6bd0bf253fb4bd25b31ebb98943c375e1
+
+ - name: Set executable permission on gradlew
+ run: chmod +x ./gradlew
+
+ - name: Test
+ working-directory: ${{ github.workspace }}
+ run: ls -la
+
+ - name: Set library version
+ working-directory: ${{ github.workspace }}
+ run: |
+ if [ -n "${{ github.event.release.tag_name }}" ]; then
+ version=$(echo ${{ github.event.release.tag_name }} | sed 's/^v//')
+ sed -i "s/version = \".*\"/version = \"$version\"/g" build.gradle.kts
+ grep -o "version = \"$version\"" build.gradle.kts
+ else
+ echo "No release tag found. Skipping library version update."
+ fi
+
+
+ - name: Initialize Gradle wrapper
+ run: ./gradlew wrapper
+
+ - name: Gradle Build
+ run: ./gradlew build
+
+
+ - name: Publish to Reposilite
+ run: ./gradlew publish
+ env:
+ reposiliteUsername: ${{ secrets.reposiliteUsername }}
+ reposilitePassword: ${{ secrets.reposilitePassword }}
diff --git a/.idea/copilot.data.migration.agent.xml b/.idea/copilot.data.migration.agent.xml
new file mode 100644
index 0000000..4ea72a9
--- /dev/null
+++ b/.idea/copilot.data.migration.agent.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/copilot.data.migration.ask.xml b/.idea/copilot.data.migration.ask.xml
new file mode 100644
index 0000000..7ef04e2
--- /dev/null
+++ b/.idea/copilot.data.migration.ask.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/copilot.data.migration.ask2agent.xml b/.idea/copilot.data.migration.ask2agent.xml
new file mode 100644
index 0000000..1f2ea11
--- /dev/null
+++ b/.idea/copilot.data.migration.ask2agent.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/copilot.data.migration.edit.xml b/.idea/copilot.data.migration.edit.xml
new file mode 100644
index 0000000..8648f94
--- /dev/null
+++ b/.idea/copilot.data.migration.edit.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/build.gradle.kts b/build.gradle.kts
index b4c65dc..e04e874 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -1,9 +1,13 @@
+import java.io.ByteArrayOutputStream
+
plugins {
kotlin("jvm") version "2.2.10"
+ id("maven-publish")
}
group = "no.iktdev"
-version = "1.0-SNAPSHOT"
+version = "1.0-rc1"
+val named = "eventi"
repositories {
mavenCentral()
@@ -33,7 +37,7 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
-
+ testImplementation("io.mockk:mockk:1.13.5")
testImplementation("com.h2database:h2:2.2.220")
}
@@ -42,4 +46,81 @@ tasks.test {
}
kotlin {
jvmToolchain(21)
-}
\ No newline at end of file
+}
+
+val reposiliteUrl = if (version.toString().endsWith("SNAPSHOT")) {
+ "https://reposilite.iktdev.no/snapshots"
+} else {
+ "https://reposilite.iktdev.no/releases"
+}
+
+publishing {
+ publications {
+ create("reposilite") {
+ versionMapping {
+ usage("java-api") {
+ fromResolutionOf("runtimeClasspath")
+ }
+ usage("java-runtime") {
+ fromResolutionResult()
+ }
+ }
+ pom {
+ name.set(named)
+ version = project.version.toString()
+ url.set(reposiliteUrl)
+ }
+ from(components["kotlin"])
+ }
+ }
+ repositories {
+ mavenLocal()
+ maven {
+ name = named
+ url = uri(reposiliteUrl)
+ credentials {
+ username = System.getenv("reposiliteUsername")
+ password = System.getenv("reposilitePassword")
+ }
+ }
+ }
+}
+
+fun findLatestTag(): String {
+ val stdout = ByteArrayOutputStream()
+ exec {
+ commandLine = listOf("git", "describe", "--tags", "--abbrev=0")
+ standardOutput = stdout
+ isIgnoreExitValue = true
+ }
+ return stdout.toString().trim().removePrefix("v")
+}
+
+fun isSnapshotBuild(): Boolean {
+ // Use environment variable or branch name to detect snapshot
+ val ref = System.getenv("GITHUB_REF") ?: ""
+ return ref.endsWith("/master") || ref.endsWith("/main")
+}
+
+fun getCommitsSinceTag(tag: String): Int {
+ val stdout = ByteArrayOutputStream()
+ exec {
+ commandLine = listOf("git", "rev-list", "$tag..HEAD", "--count")
+ standardOutput = stdout
+ isIgnoreExitValue = true
+ }
+ return stdout.toString().trim().toIntOrNull() ?: 0
+}
+
+val latestTag = findLatestTag()
+val versionString = if (isSnapshotBuild()) {
+ val parts = latestTag.split(".")
+ val patch = parts.lastOrNull()?.toIntOrNull()?.plus(1) ?: 1
+ val base = if (parts.size >= 2) "${parts[0]}.${parts[1]}" else latestTag
+ val buildNumber = getCommitsSinceTag("v$latestTag")
+ "$base.$patch-SNAPSHOT-$buildNumber"
+} else {
+ latestTag
+}
+
+version = versionString
\ No newline at end of file
diff --git a/src/main/kotlin/no/iktdev/eventi/ListenerRegistryImplementation.kt b/src/main/kotlin/no/iktdev/eventi/ListenerRegistryImplementation.kt
new file mode 100644
index 0000000..6d4099a
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/ListenerRegistryImplementation.kt
@@ -0,0 +1,11 @@
+package no.iktdev.eventi
+
+abstract class ListenerRegistryImplementation {
+ private val listeners = mutableListOf()
+
+ fun registerListener(listener: T) {
+ listeners.add(listener)
+ }
+
+ fun getListeners(): List = listeners.toList()
+}
\ No newline at end of file
diff --git a/src/main/kotlin/no/iktdev/eventi/TypeRegistryImplementation.kt b/src/main/kotlin/no/iktdev/eventi/TypeRegistryImplementation.kt
new file mode 100644
index 0000000..0437929
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/TypeRegistryImplementation.kt
@@ -0,0 +1,18 @@
+package no.iktdev.eventi
+
+abstract class TypeRegistryImplementation {
+ protected open val types = mutableMapOf>()
+
+ open fun register(clazz: Class) {
+ types[clazz.simpleName] = clazz
+ }
+ open fun register(clazzes: List>) {
+ clazzes.forEach { clazz ->
+ types[clazz.simpleName] = clazz
+ }
+ }
+
+ open fun resolve(name: String): Class? = types[name]
+
+ open fun all(): Map> = types.toMap()
+}
\ No newline at end of file
diff --git a/src/main/kotlin/no/iktdev/eventi/ZDS.kt b/src/main/kotlin/no/iktdev/eventi/ZDS.kt
index b75efe0..abd94b4 100644
--- a/src/main/kotlin/no/iktdev/eventi/ZDS.kt
+++ b/src/main/kotlin/no/iktdev/eventi/ZDS.kt
@@ -9,7 +9,11 @@ import com.google.gson.JsonSerializationContext
import com.google.gson.JsonSerializer
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event
+import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.PersistedEvent
+import no.iktdev.eventi.models.store.PersistedTask
+import no.iktdev.eventi.models.store.TaskStatus
+import no.iktdev.eventi.tasks.TaskTypeRegistry
import java.lang.reflect.Type
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
@@ -17,7 +21,7 @@ import java.time.format.DateTimeFormatter
object ZDS {
val gson = WGson.gson
- fun Event.toPersisted(id: Long, persistedAt: LocalDateTime): PersistedEvent {
+ fun Event.toPersisted(id: Long, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedEvent {
val payloadJson = gson.toJson(this)
val eventName = this::class.simpleName ?: error("Missing class name")
return PersistedEvent(
@@ -39,6 +43,34 @@ object ZDS {
return gson.fromJson(data, clazz)
}
+ fun Task.toPersisted(id: Long, status: TaskStatus = TaskStatus.Pending, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedTask {
+ val payloadJson = gson.toJson(this)
+ val taskName = this::class.simpleName ?: error("Missing class name")
+ return PersistedTask(
+ id = id,
+ referenceId = referenceId,
+ taskId = taskId,
+ task = taskName,
+ data = payloadJson,
+ status = status,
+ claimed = false,
+ consumed = false,
+ claimedBy = null,
+ lastCheckIn = null,
+ persistedAt = persistedAt
+ )
+ }
+
+ fun PersistedTask.toTask(): Task? {
+ val clazz = TaskTypeRegistry.resolve(task)
+ ?: run {
+ //error("Unknown task type: $task")
+ return null
+ }
+ return gson.fromJson(data, clazz)
+ }
+
+
object WGson {
val gson = GsonBuilder()
diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt b/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt
index ffa6d75..3f1dfdf 100644
--- a/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt
+++ b/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt
@@ -19,7 +19,7 @@ class EventDispatcher(val eventStore: EventStore) {
val result = listener.onEvent(candidate, events)
if (result != null) {
- eventStore.save(result)
+ eventStore.persist(result)
return
}
}
diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventListenerRegistry.kt b/src/main/kotlin/no/iktdev/eventi/events/EventListenerRegistry.kt
index 40bb8d0..042fb5b 100644
--- a/src/main/kotlin/no/iktdev/eventi/events/EventListenerRegistry.kt
+++ b/src/main/kotlin/no/iktdev/eventi/events/EventListenerRegistry.kt
@@ -1,11 +1,6 @@
package no.iktdev.eventi.events
-object EventListenerRegistry {
- private val listeners = mutableListOf()
+import no.iktdev.eventi.ListenerRegistryImplementation
- fun registerListener(listener: EventListener) {
- listeners.add(listener)
- }
-
- fun getListeners(): List = listeners.toList()
+object EventListenerRegistry: ListenerRegistryImplementation() {
}
\ No newline at end of file
diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventTypeRegistry.kt b/src/main/kotlin/no/iktdev/eventi/events/EventTypeRegistry.kt
index ed9a59a..5788fcf 100644
--- a/src/main/kotlin/no/iktdev/eventi/events/EventTypeRegistry.kt
+++ b/src/main/kotlin/no/iktdev/eventi/events/EventTypeRegistry.kt
@@ -1,31 +1,8 @@
package no.iktdev.eventi.events
+import no.iktdev.eventi.TypeRegistryImplementation
import no.iktdev.eventi.models.Event
-object EventTypeRegistry {
- private val types = mutableMapOf>()
-
- fun register(clazz: Class) {
- types[clazz.simpleName] = clazz
- }
- fun register(clazzes: List>) {
- clazzes.forEach { clazz ->
- types[clazz.simpleName] = clazz
- }
- }
-
- fun resolve(name: String): Class? = types[name]
-
- fun all(): Map> = types.toMap()
+object EventTypeRegistry: TypeRegistryImplementation() {
}
-
-abstract class EventTypeRegistration {
- init {
- definedTypes().forEach { clazz ->
- EventTypeRegistry.register(clazz)
- }
- }
-
- protected abstract fun definedTypes(): List>
-}
diff --git a/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt b/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt
index 64bb4bd..cc832d7 100644
--- a/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt
+++ b/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt
@@ -17,6 +17,10 @@ class SequenceDispatchQueue(
private val semaphore = Semaphore(maxConcurrency)
private val active = ConcurrentHashMap.newKeySet()
+ fun _scope(): CoroutineScope {
+ return scope
+ }
+
fun isProcessing(referenceId: UUID): Boolean = referenceId in active
fun dispatch(referenceId: UUID, events: List, dispatcher: EventDispatcher): Job? {
diff --git a/src/main/kotlin/no/iktdev/eventi/models/Event.kt b/src/main/kotlin/no/iktdev/eventi/models/Event.kt
index aa3f766..a52befc 100644
--- a/src/main/kotlin/no/iktdev/eventi/models/Event.kt
+++ b/src/main/kotlin/no/iktdev/eventi/models/Event.kt
@@ -1,6 +1,5 @@
package no.iktdev.eventi.models
-import java.time.LocalDateTime
import java.util.UUID
abstract class Event {
@@ -19,6 +18,19 @@ abstract class Event {
this.metadata = Metadata(derivedFromId = event.eventId)
}
+ fun producedFrom(task: Task) = apply {
+ this.referenceId = task.referenceId
+ this.metadata = Metadata(derivedFromId = task.taskId)
+ }
+
+ fun newReferenceId() = apply {
+ this.referenceId = UUID.randomUUID()
+ }
+
+ fun usingReferenceId(refId: UUID) = apply {
+ this.referenceId = refId
+ }
+
}
abstract class DeleteEvent: Event() {
@@ -26,8 +38,3 @@ abstract class DeleteEvent: Event() {
}
-open class Metadata(
- val created: LocalDateTime = LocalDateTime.now(), val derivedFromId: UUID? = null
-) {}
-
-
diff --git a/src/main/kotlin/no/iktdev/eventi/models/Metadata.kt b/src/main/kotlin/no/iktdev/eventi/models/Metadata.kt
new file mode 100644
index 0000000..641b096
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/models/Metadata.kt
@@ -0,0 +1,9 @@
+package no.iktdev.eventi.models
+
+import java.time.LocalDateTime
+import java.util.UUID
+
+open class Metadata(
+ val created: LocalDateTime = LocalDateTime.now(),
+ val derivedFromId: UUID? = null
+) {}
diff --git a/src/main/kotlin/no/iktdev/eventi/models/Task.kt b/src/main/kotlin/no/iktdev/eventi/models/Task.kt
index a9bcf04..856f98a 100644
--- a/src/main/kotlin/no/iktdev/eventi/models/Task.kt
+++ b/src/main/kotlin/no/iktdev/eventi/models/Task.kt
@@ -1,3 +1,25 @@
package no.iktdev.eventi.models
-class Task()
+import java.time.LocalDateTime
+import java.util.UUID
+
+
+abstract class Task {
+ lateinit var referenceId: UUID
+ protected set
+ val taskId: UUID = UUID.randomUUID()
+ var metadata: Metadata = Metadata()
+ protected set
+
+ @Transient
+ open val data: Any? = null
+
+ fun newReferenceId() = apply {
+ this.referenceId = UUID.randomUUID()
+ }
+
+ fun derivedOf(event: Event) = apply {
+ this.referenceId = event.referenceId
+ this.metadata = Metadata(derivedFromId = event.eventId)
+ }
+}
diff --git a/src/main/kotlin/no/iktdev/eventi/models/store/PersistedTask.kt b/src/main/kotlin/no/iktdev/eventi/models/store/PersistedTask.kt
new file mode 100644
index 0000000..2885741
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/models/store/PersistedTask.kt
@@ -0,0 +1,25 @@
+package no.iktdev.eventi.models.store
+
+import java.time.LocalDateTime
+import java.util.UUID
+
+data class PersistedTask(
+ val id: Long,
+ val referenceId: UUID,
+ val status: TaskStatus,
+ val taskId: UUID,
+ val task: String,
+ val data: String,
+ val claimed: Boolean,
+ val claimedBy: String? = null,
+ val consumed: Boolean,
+ val lastCheckIn: LocalDateTime? = null,
+ val persistedAt: LocalDateTime
+) {}
+
+enum class TaskStatus {
+ Pending,
+ InProgress,
+ Completed,
+ Failed
+}
\ No newline at end of file
diff --git a/src/main/kotlin/no/iktdev/eventi/stores/EventStore.kt b/src/main/kotlin/no/iktdev/eventi/stores/EventStore.kt
index 5bcc3f7..98dee99 100644
--- a/src/main/kotlin/no/iktdev/eventi/stores/EventStore.kt
+++ b/src/main/kotlin/no/iktdev/eventi/stores/EventStore.kt
@@ -8,6 +8,6 @@ import java.util.UUID
interface EventStore {
fun getPersistedEventsAfter(timestamp: LocalDateTime): List
fun getPersistedEventsFor(referenceId: UUID): List
- fun save(event: Event)
+ fun persist(event: Event)
}
diff --git a/src/main/kotlin/no/iktdev/eventi/stores/TaskStore.kt b/src/main/kotlin/no/iktdev/eventi/stores/TaskStore.kt
new file mode 100644
index 0000000..02988d7
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/stores/TaskStore.kt
@@ -0,0 +1,21 @@
+package no.iktdev.eventi.stores
+
+import no.iktdev.eventi.models.Task
+import no.iktdev.eventi.models.store.PersistedTask
+import java.time.Duration
+import java.util.UUID
+
+interface TaskStore {
+ fun persist(task: Task)
+
+ fun findByTaskId(taskId: UUID): PersistedTask?
+ fun findByEventId(eventId: UUID): List
+ fun findUnclaimed(referenceId: UUID): List
+
+ fun claim(taskId: UUID, workerId: String): Boolean
+ fun heartbeat(taskId: UUID)
+ fun markConsumed(taskId: UUID)
+ fun releaseExpiredTasks(timeout: Duration = Duration.ofMinutes(15))
+
+ fun getPendingTasks(): List
+}
\ No newline at end of file
diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/AbstractTaskPoller.kt b/src/main/kotlin/no/iktdev/eventi/tasks/AbstractTaskPoller.kt
new file mode 100644
index 0000000..fbb8dfe
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/tasks/AbstractTaskPoller.kt
@@ -0,0 +1,51 @@
+package no.iktdev.eventi.tasks
+
+import kotlinx.coroutines.delay
+import no.iktdev.eventi.ZDS.toTask
+import no.iktdev.eventi.models.Task
+import no.iktdev.eventi.stores.TaskStore
+import java.time.Duration
+
+abstract class AbstractTaskPoller(
+ private val taskStore: TaskStore,
+ private val reporterFactory: (Task) -> TaskReporter
+) {
+ var backoff = Duration.ofSeconds(2)
+ protected set
+ private val maxBackoff = Duration.ofMinutes(1)
+
+ suspend fun start() {
+ while (true) {
+ pollOnce()
+ }
+ }
+
+ suspend fun pollOnce() {
+ val newPersistedTasks = taskStore.getPendingTasks()
+
+ if (newPersistedTasks.isEmpty()) {
+ delay(backoff.toMillis())
+ backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
+ return
+ }
+ val tasks = newPersistedTasks.mapNotNull { it.toTask() }
+ var acceptedAny = false
+
+ for (task in tasks) {
+ val listener = TaskListenerRegistry.getListeners().firstOrNull { it.supports(task) && !it.isBusy } ?: continue
+ val claimed = taskStore.claim(task.taskId, listener.getWorkerId())
+ if (!claimed) continue
+
+ val reporter = reporterFactory(task)
+ val accepted = listener.accept(task, reporter)
+ acceptedAny = acceptedAny || accepted
+ }
+
+ if (!acceptedAny) {
+ delay(backoff.toMillis())
+ backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
+ } else {
+ backoff = Duration.ofSeconds(2)
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt
new file mode 100644
index 0000000..18d39f9
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt
@@ -0,0 +1,106 @@
+package no.iktdev.eventi.tasks
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.launch
+import no.iktdev.eventi.models.Event
+import no.iktdev.eventi.models.Task
+import java.util.UUID
+import kotlin.coroutines.cancellation.CancellationException
+
+/**
+ * Abstract base class for handling tasks with asynchronous processing and reporting.
+ *
+ * @param T The type of result produced by processing the task.
+ * @param reporter An instance of [TaskReporter] for reporting task status and events.
+ */
+abstract class TaskListener(val taskType: TaskType): TaskListenerImplementation {
+
+ init {
+ TaskListenerRegistry.registerListener(this)
+ }
+
+ var reporter: TaskReporter? = null
+ private set
+ abstract fun getWorkerId(): String
+ protected var currentJob: Job? = null
+ var currentTask: Task? = null
+ private set
+
+ open val isBusy: Boolean get() = currentJob?.isActive == true
+ val currentTaskId: UUID? get() = currentTask?.taskId
+
+ private fun getDispatcherForTask(task: Task): CoroutineScope {
+ return when (taskType) {
+ TaskType.CPU_INTENSIVE,
+ TaskType.MIXED -> CoroutineScope(Dispatchers.Default)
+ TaskType.IO_INTENSIVE -> CoroutineScope(Dispatchers.IO)
+ }
+ }
+
+ override fun accept(task: Task, reporter: TaskReporter): Boolean {
+ if (isBusy || !supports(task)) return false
+ this.reporter = reporter
+ currentTask = task
+ reporter.markClaimed(task.taskId, getWorkerId())
+
+ currentJob = getDispatcherForTask(task).launch {
+ try {
+ val result = onTask(task)
+ reporter.markConsumed(task.taskId)
+ onComplete(task, result)
+ } catch (e: CancellationException) {
+ onCancelled()
+ } catch (e: Exception) {
+ onError(task, e)
+ } finally {
+ currentJob = null
+ currentTask = null
+ this@TaskListener.reporter = null
+ }
+ }
+ return true
+ }
+
+ override fun onError(task: Task, exception: Exception) {
+ reporter?.log(task.taskId, "Error processing task: ${exception.message}")
+ exception.printStackTrace()
+ }
+
+ override fun onComplete(task: Task, result: T?) {
+ reporter?.markConsumed(task.taskId)
+ reporter?.log(task.taskId, "Task completed successfully.")
+ }
+
+ override fun onCancelled() {
+ currentJob?.cancel()
+ currentJob = null
+ currentTask = null
+ }
+}
+
+enum class TaskType {
+ CPU_INTENSIVE,
+ IO_INTENSIVE,
+ MIXED
+}
+
+
+interface TaskListenerImplementation {
+ fun supports(task: Task): Boolean
+ fun accept(task: Task, reporter: TaskReporter): Boolean
+ fun onTask(task: Task): T
+ fun onComplete(task: Task, result: T?)
+ fun onError(task: Task, exception: Exception)
+ fun onCancelled()
+}
+
+interface TaskReporter {
+ fun markClaimed(taskId: UUID, workerId: String)
+ fun updateLastSeen(taskId: UUID)
+ fun markConsumed(taskId: UUID)
+ fun updateProgress(taskId: UUID, progress: Int)
+ fun log(taskId: UUID, message: String)
+ fun publishEvent(event: Event)
+}
diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListenerRegistry.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListenerRegistry.kt
new file mode 100644
index 0000000..814da05
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListenerRegistry.kt
@@ -0,0 +1,7 @@
+package no.iktdev.eventi.tasks
+
+import no.iktdev.eventi.ListenerRegistryImplementation
+
+object TaskListenerRegistry: ListenerRegistryImplementation>() {
+
+}
\ No newline at end of file
diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskTypeRegistry.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskTypeRegistry.kt
new file mode 100644
index 0000000..e6d6bae
--- /dev/null
+++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskTypeRegistry.kt
@@ -0,0 +1,7 @@
+package no.iktdev.eventi.tasks
+
+import no.iktdev.eventi.TypeRegistryImplementation
+import no.iktdev.eventi.models.Task
+
+object TaskTypeRegistry: TypeRegistryImplementation() {
+}
\ No newline at end of file
diff --git a/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt b/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt
index 65f4db3..a790a13 100644
--- a/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt
+++ b/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt
@@ -19,11 +19,10 @@ import java.time.LocalDateTime
import java.util.UUID
class EventDispatcherTest: TestBase() {
- val dispatcher = EventDispatcher(store)
+ val dispatcher = EventDispatcher(eventStore)
class DerivedEvent(): Event()
class TriggerEvent(): Event() {
- fun usingReferenceId(id: UUID) = apply { referenceId = id }
}
class OtherEvent(): Event()
@@ -49,7 +48,7 @@ class EventDispatcherTest: TestBase() {
val trigger = TriggerEvent()
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
- val produced = store.all().firstOrNull()
+ val produced = eventStore.all().firstOrNull()
assertNotNull(produced)
val event = produced!!.toEvent()
@@ -63,11 +62,11 @@ class EventDispatcherTest: TestBase() {
val trigger = TriggerEvent()
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, LocalDateTime.now())
- store.save(derived.toEvent()) // simulate prior production
+ eventStore.persist(derived.toEvent()) // simulate prior production
dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived.toEvent()))
- assertEquals(1, store.all().size) // no new event produced
+ assertEquals(1, eventStore.all().size) // no new event produced
}
@Test
@@ -87,16 +86,16 @@ class EventDispatcherTest: TestBase() {
val trigger = TriggerEvent()
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
- val replayContext = listOf(trigger) + store.all().map { it.toEvent() }
+ val replayContext = listOf(trigger) + eventStore.all().map { it.toEvent() }
dispatcher.dispatch(trigger.referenceId, replayContext)
- assertEquals(1, store.all().size) // no duplicate
+ assertEquals(1, eventStore.all().size) // no duplicate
}
@Test
fun `should not deliver deleted events as candidates`() {
- val dispatcher = EventDispatcher(store)
+ val dispatcher = EventDispatcher(eventStore)
val received = mutableListOf()
object : EventListener() {
override fun onEvent(event: Event, history: List): Event? {
diff --git a/src/test/kotlin/no/iktdev/eventi/EventTypeRegistryTest.kt b/src/test/kotlin/no/iktdev/eventi/EventTypeRegistryTest.kt
index 0edd5c5..40b92ca 100644
--- a/src/test/kotlin/no/iktdev/eventi/EventTypeRegistryTest.kt
+++ b/src/test/kotlin/no/iktdev/eventi/EventTypeRegistryTest.kt
@@ -23,7 +23,7 @@ class EventTypeRegistryTest: TestBase() {
@Test
@DisplayName("Test EventTypeRegistry registration")
fun scenario1() {
- DefaultTestEvents()
+ registerEventTypes()
assertThat(EventTypeRegistry.resolve("EchoEvent")).isEqualTo(EchoEvent::class.java)
assertThat(EventTypeRegistry.resolve("StartEvent")).isEqualTo(StartEvent::class.java)
}
diff --git a/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt b/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt
index fc59046..1dbc5ea 100644
--- a/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt
+++ b/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt
@@ -17,7 +17,7 @@ class InMemoryEventStore : EventStore {
override fun getPersistedEventsFor(referenceId: UUID): List =
persisted.filter { it.referenceId == referenceId }
- override fun save(event: Event) {
+ override fun persist(event: Event) {
val persistedEvent = event.toPersisted(nextId++, LocalDateTime.now())
persisted += persistedEvent
}
diff --git a/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt b/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt
new file mode 100644
index 0000000..86eb8c9
--- /dev/null
+++ b/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt
@@ -0,0 +1,67 @@
+package no.iktdev.eventi
+
+import no.iktdev.eventi.ZDS.toPersisted
+import no.iktdev.eventi.models.Task
+import no.iktdev.eventi.models.store.PersistedTask
+import no.iktdev.eventi.models.store.TaskStatus
+import no.iktdev.eventi.stores.TaskStore
+import java.time.Duration
+import java.time.LocalDateTime
+import java.util.UUID
+
+open class InMemoryTaskStore : TaskStore {
+ private val tasks = mutableListOf()
+ private var nextId = 1L
+
+ override fun persist(task: Task) {
+ val persistedTask = task.toPersisted(nextId++)
+ tasks += persistedTask
+ }
+
+ override fun findByTaskId(taskId: UUID) = tasks.find { it.taskId == taskId }
+
+ override fun findByEventId(eventId: UUID) =
+ tasks.filter { it.data.contains(eventId.toString()) }
+
+ override fun findUnclaimed(referenceId: UUID) =
+ tasks.filter { it.referenceId == referenceId && !it.claimed && !it.consumed }
+
+ override fun claim(taskId: UUID, workerId: String): Boolean {
+ val task = findByTaskId(taskId) ?: return false
+ if (task.claimed && !isExpired(task)) return false
+ update(task.copy(claimed = true, claimedBy = workerId, lastCheckIn = LocalDateTime.now()))
+ return true
+ }
+
+ override fun heartbeat(taskId: UUID) {
+ val task = findByTaskId(taskId) ?: return
+ update(task.copy(lastCheckIn = LocalDateTime.now()))
+ }
+
+ override fun markConsumed(taskId: UUID) {
+ val task = findByTaskId(taskId) ?: return
+ update(task.copy(consumed = true, status = TaskStatus.Completed))
+ }
+
+ override fun releaseExpiredTasks(timeout: Duration) {
+ val now = LocalDateTime.now()
+ tasks.filter {
+ it.claimed && !it.consumed && it.lastCheckIn?.isBefore(now.minus(timeout)) == true
+ }.forEach {
+ update(it.copy(claimed = false, claimedBy = null, lastCheckIn = null))
+ }
+ }
+
+ override fun getPendingTasks() = tasks.filter { !it.consumed }
+
+ private fun update(updated: PersistedTask) {
+ tasks.replaceAll { if (it.taskId == updated.taskId) updated else it }
+ }
+
+ private fun isExpired(task: PersistedTask): Boolean {
+ val now = LocalDateTime.now()
+ return task.lastCheckIn?.isBefore(now.minusMinutes(15)) == true
+ }
+
+ private fun serialize(data: Any?): String = data?.toString() ?: "{}"
+}
diff --git a/src/test/kotlin/no/iktdev/eventi/TestBase.kt b/src/test/kotlin/no/iktdev/eventi/TestBase.kt
index c858bc2..10396ae 100644
--- a/src/test/kotlin/no/iktdev/eventi/TestBase.kt
+++ b/src/test/kotlin/no/iktdev/eventi/TestBase.kt
@@ -1,25 +1,20 @@
package no.iktdev.eventi
import no.iktdev.eventi.events.EchoEvent
-import no.iktdev.eventi.events.EventTypeRegistration
+import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.events.StartEvent
-import no.iktdev.eventi.models.Event
open class TestBase {
- val store = InMemoryEventStore()
+ val eventStore = InMemoryEventStore()
+ val taskStore = InMemoryTaskStore()
- class DefaultTestEvents() : EventTypeRegistration() {
- override fun definedTypes(): List> {
- return listOf(
- EchoEvent::class.java,
- StartEvent::class.java
- )
- }
+ fun registerEventTypes() {
+ EventTypeRegistry.register(listOf(StartEvent::class.java, EchoEvent::class.java))
}
init {
- DefaultTestEvents()
+ registerEventTypes()
}
}
\ No newline at end of file
diff --git a/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt b/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt
index 7c01a04..1292dda 100644
--- a/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt
+++ b/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt
@@ -2,8 +2,11 @@ package no.iktdev.eventi
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.ZDS.toPersisted
+import no.iktdev.eventi.ZDS.toTask
import no.iktdev.eventi.events.EchoEvent
import no.iktdev.eventi.events.EventTypeRegistry
+import no.iktdev.eventi.models.Task
+import no.iktdev.eventi.tasks.TaskTypeRegistry
import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.BeforeEach
@@ -16,17 +19,18 @@ class ZDSTest {
@BeforeEach
fun setup() {
EventTypeRegistry.wipe()
+ TaskTypeRegistry.wipe()
// Verifiser at det er tomt
assertNull(EventTypeRegistry.resolve("SomeEvent"))
}
@Test
- @DisplayName("Test ZDS")
+ @DisplayName("Test ZDS with Event object")
fun scenario1() {
EventTypeRegistry.register(EchoEvent::class.java)
val echo = EchoEvent("hello")
- val persisted = echo.toPersisted(id = 1L, persistedAt = LocalDateTime.now())
+ val persisted = echo.toPersisted(id = 1L)
val restored = persisted.toEvent()
assert(restored is EchoEvent)
@@ -34,4 +38,27 @@ class ZDSTest {
}
+ data class TestTask(
+ override val data: String?
+ ): Task()
+
+ @Test
+ @DisplayName("Test ZDS with Task object")
+ fun scenario2() {
+
+ TaskTypeRegistry.register(TestTask::class.java)
+
+ val task = TestTask("Potato")
+ .newReferenceId()
+
+ val persisted = task.toPersisted(id = 1L)
+
+ val restored = persisted.toTask()
+ assert(restored is TestTask)
+ assert((restored as TestTask).data == "Potato")
+ assert(restored.metadata.created == task.metadata.created)
+ assert(restored.metadata.derivedFromId == task.metadata.derivedFromId)
+
+ }
+
}
\ No newline at end of file
diff --git a/src/test/kotlin/no/iktdev/eventi/events/AbstractEventPollerTest.kt b/src/test/kotlin/no/iktdev/eventi/events/AbstractEventPollerTest.kt
index 8cab690..f29ae2a 100644
--- a/src/test/kotlin/no/iktdev/eventi/events/AbstractEventPollerTest.kt
+++ b/src/test/kotlin/no/iktdev/eventi/events/AbstractEventPollerTest.kt
@@ -1,36 +1,49 @@
package no.iktdev.eventi.events
import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitAll
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.job
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeout
import no.iktdev.eventi.EventDispatcherTest
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
import no.iktdev.eventi.TestBase
import no.iktdev.eventi.models.Event
+import no.iktdev.eventi.stores.EventStore
import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
+import sun.rmi.transport.DGCAckHandler.received
import java.time.Duration
import java.time.LocalDateTime
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
class AbstractEventPollerTest : TestBase() {
- val dispatcher = EventDispatcher(store)
+ val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
- val poller = object : AbstractEventPoller(store, queue, dispatcher) {}
+ val poller = object : AbstractEventPoller(eventStore, queue, dispatcher) {}
@BeforeEach
fun setup() {
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
- store.clear()
+ eventStore.clear()
// Verifiser at det er tomt
EventTypeRegistry.register(listOf(
@@ -57,7 +70,7 @@ class AbstractEventPollerTest : TestBase() {
referenceIds.forEach { refId ->
val e = EventDispatcherTest.TriggerEvent().usingReferenceId(refId)
- store.save(e) // persistedAt settes automatisk her
+ eventStore.persist(e) // persistedAt settes automatisk her
completionMap[refId] = CompletableDeferred()
}
@@ -70,7 +83,7 @@ class AbstractEventPollerTest : TestBase() {
@Test
fun `pollOnce should increase backoff when no events and reset when events arrive`() = runTest {
- val testPoller = object : AbstractEventPoller(store, queue, dispatcher) {
+ val testPoller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
fun currentBackoff(): Duration = backoff
}
@@ -83,7 +96,7 @@ class AbstractEventPollerTest : TestBase() {
assertTrue(afterSecond > afterFirst)
val e = TriggerEvent().usingReferenceId(UUID.randomUUID())
- store.save(e)
+ eventStore.persist(e)
testPoller.pollOnce()
val afterReset = testPoller.currentBackoff()
@@ -100,7 +113,7 @@ class AbstractEventPollerTest : TestBase() {
// Wipe alt før test
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
- store.clear() // sørg for at InMemoryEventStore støtter dette
+ eventStore.clear() // sørg for at InMemoryEventStore støtter dette
EventTypeRegistry.register(listOf(TriggerEvent::class.java))
@@ -113,7 +126,7 @@ class AbstractEventPollerTest : TestBase() {
}
repeat(3) {
- store.save(TriggerEvent().usingReferenceId(refId))
+ eventStore.persist(TriggerEvent().usingReferenceId(refId))
}
poller.pollOnce()
@@ -125,27 +138,80 @@ class AbstractEventPollerTest : TestBase() {
}
-
@Test
fun `pollOnce should ignore events before lastSeenTime`() = runTest {
val refId = UUID.randomUUID()
val ignored = TriggerEvent().usingReferenceId(refId)
- val testPoller = object : AbstractEventPoller(store, queue, dispatcher) {
+ val testPoller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
init {
lastSeenTime = LocalDateTime.now().plusSeconds(1)
}
}
- store.save(ignored)
+ eventStore.persist(ignored)
testPoller.pollOnce()
assertFalse(queue.isProcessing(refId))
}
+ @OptIn(ExperimentalCoroutinesApi::class)
+ @Test
+ fun `poller handles manually injected duplicate event`() = runTest {
+ EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
+ val channel = Channel(Channel.UNLIMITED)
+ val handled = mutableListOf()
+ // Setup
+ object : EventListener() {
+
+ override fun onEvent(event: Event, context: List): Event? {
+ if (event !is EchoEvent)
+ return null
+ handled += event
+ channel.trySend(event)
+ return MarcoEvent(true).derivedOf(event)
+ }
+ }
+
+ val poller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
+ }
+
+ // Original event
+ val original = EchoEvent(data = "Hello")
+ eventStore.persist(original)
+
+ // Act
+ poller.pollOnce()
+ withContext(Dispatchers.Default.limitedParallelism(1)) {
+ withTimeout(Duration.ofMinutes(1).toMillis()) {
+ repeat(1) { channel.receive() }
+ }
+ }
+
+ // Manual replay with new eventId, same referenceId
+ val duplicateEvent = EchoEvent("Test me").usingReferenceId(original.referenceId)
+
+ eventStore.persist(duplicateEvent)
+
+ // Act
+ poller.pollOnce()
+
+ withContext(Dispatchers.Default.limitedParallelism(1)) {
+ withTimeout(Duration.ofMinutes(1).toMillis()) {
+ repeat(1) { channel.receive() }
+ }
+ }
+
+
+
+ // Assert
+ assertEquals(2, handled.size)
+ assertTrue(handled.any { it.eventId == original.eventId })
+ }
+
diff --git a/src/test/kotlin/no/iktdev/eventi/events/SequenceDispatchQueueTest.kt b/src/test/kotlin/no/iktdev/eventi/events/SequenceDispatchQueueTest.kt
index 98a134b..55083e2 100644
--- a/src/test/kotlin/no/iktdev/eventi/events/SequenceDispatchQueueTest.kt
+++ b/src/test/kotlin/no/iktdev/eventi/events/SequenceDispatchQueueTest.kt
@@ -2,7 +2,6 @@ package no.iktdev.eventi.events
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.test.runTest
-import no.iktdev.eventi.EventDispatcherTest
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
@@ -14,8 +13,6 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
class SequenceDispatchQueueTest: TestBase() {
@@ -35,7 +32,7 @@ class SequenceDispatchQueueTest: TestBase() {
@Test
fun `should dispatch all referenceIds with limited concurrency`() = runTest {
- val dispatcher = EventDispatcher(store)
+ val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val dispatched = ConcurrentHashMap.newKeySet()
@@ -52,7 +49,7 @@ class SequenceDispatchQueueTest: TestBase() {
val jobs = referenceIds.mapNotNull { refId ->
val e = TriggerEvent().usingReferenceId(refId)
- store.save(e)
+ eventStore.persist(e)
queue.dispatch(refId, listOf(e), dispatcher)
}
diff --git a/src/test/kotlin/no/iktdev/eventi/events/TestEvents.kt b/src/test/kotlin/no/iktdev/eventi/events/TestEvents.kt
index 8bca478..75d14e8 100644
--- a/src/test/kotlin/no/iktdev/eventi/events/TestEvents.kt
+++ b/src/test/kotlin/no/iktdev/eventi/events/TestEvents.kt
@@ -8,4 +8,7 @@ class StartEvent(): Event() {
}
class EchoEvent(override var data: String): Event() {
+}
+
+class MarcoEvent(override val data: Boolean): Event() {
}
\ No newline at end of file
diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt
new file mode 100644
index 0000000..f7a8fcd
--- /dev/null
+++ b/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt
@@ -0,0 +1,196 @@
+package no.iktdev.eventi.tasks
+
+import io.mockk.mockk
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.test.advanceUntilIdle
+import kotlinx.coroutines.test.runTest
+import no.iktdev.eventi.InMemoryTaskStore
+import no.iktdev.eventi.TestBase
+import no.iktdev.eventi.events.EventListener
+import no.iktdev.eventi.events.EventTypeRegistry
+import no.iktdev.eventi.models.Event
+import no.iktdev.eventi.models.Task
+import no.iktdev.eventi.stores.TaskStore
+import no.iktdev.eventi.testUtil.multiply
+import no.iktdev.eventi.testUtil.wipe
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import java.time.Duration
+import java.util.UUID
+
+class AbstractTaskPollerTest : TestBase() {
+
+ @BeforeEach
+ fun setup() {
+ TaskListenerRegistry.wipe()
+ TaskTypeRegistry.wipe()
+ eventDeferred = CompletableDeferred()
+ }
+
+ private lateinit var eventDeferred: CompletableDeferred
+ val reporterFactory = { _: Task ->
+ object : TaskReporter {
+ override fun markClaimed(taskId: UUID, workerId: String) {}
+ override fun updateLastSeen(taskId: UUID) {}
+ override fun markConsumed(taskId: UUID) {}
+ override fun updateProgress(taskId: UUID, progress: Int) {}
+ override fun log(taskId: UUID, message: String) {}
+ override fun publishEvent(event: Event) {
+ eventDeferred.complete(event)
+ }
+ }
+ }
+
+ data class EchoTask(override var data: String?) : Task() {
+ }
+
+ data class EchoEvent(override var data: String) : Event() {
+ }
+
+ class TaskPollerTest(taskStore: TaskStore, reporterFactory: (Task) -> TaskReporter): AbstractTaskPoller(taskStore, reporterFactory) {
+ fun overrideSetBackoff(duration: java.time.Duration) {
+ backoff = duration
+ }
+ }
+
+
+ open class EchoListener : TaskListener(TaskType.MIXED) {
+ var result: String? = null
+
+ override fun getWorkerId() = this.javaClass.simpleName
+
+ override fun supports(task: Task): Boolean {
+ return task is EchoTask
+ }
+
+ override fun onTask(task: Task): String {
+ if (task is EchoTask) {
+ return task.data + " Potetmos"
+ }
+ throw IllegalArgumentException("Unsupported task type: ${task::class.java}")
+ }
+
+ override fun onComplete(task: Task, result: String?) {
+ super.onComplete(task, result)
+ this.result = result;
+ reporter?.publishEvent(EchoEvent(result!!).producedFrom(task))
+ }
+
+ }
+
+ @OptIn(ExperimentalCoroutinesApi::class)
+ @Test
+ fun scenario1() = runTest {
+ // Register Task and Event
+ TaskTypeRegistry.register(EchoTask::class.java)
+ EventTypeRegistry.register(EchoEvent::class.java)
+
+ val listener = EchoListener()
+
+ val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
+
+ val task = EchoTask("Hello").newReferenceId()
+ taskStore.persist(task)
+ poller.pollOnce()
+ advanceUntilIdle()
+ val producedEvent = eventDeferred.await()
+ assertThat(producedEvent).isNotNull
+ assertThat(producedEvent!!.metadata.derivedFromId).isEqualTo(task.taskId)
+ assertThat(listener.result).isEqualTo("Hello Potetmos")
+ }
+
+ @OptIn(ExperimentalCoroutinesApi::class)
+ @Test
+ fun `poller resets backoff when task is accepted`() = runTest {
+ TaskTypeRegistry.register(EchoTask::class.java)
+ EventTypeRegistry.register(EchoEvent::class.java)
+
+ val listener = EchoListener()
+ val poller = TaskPollerTest(taskStore, reporterFactory)
+ val initialBackoff = poller.backoff
+
+ poller.overrideSetBackoff(Duration.ofSeconds(16))
+ val task = EchoTask("Hello").newReferenceId()
+ taskStore.persist(task)
+
+ poller.pollOnce()
+ advanceUntilIdle()
+
+ assertEquals(initialBackoff, poller.backoff)
+ assertEquals("Hello Potetmos", listener.result)
+ }
+
+ @Test
+ fun `poller increases backoff when no tasks`() = runTest {
+ val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
+ val initialBackoff = poller.backoff
+ val totalBackoff = initialBackoff.multiply(2)
+
+ poller.pollOnce()
+
+ assertEquals(totalBackoff, poller.backoff)
+ }
+
+
+ @Test
+ fun `poller increases backoff when no listener supports task`() = runTest {
+ val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
+ val initialBackoff = poller.backoff
+
+ // as long as the task is not added to registry this will be unsupported
+ val unsupportedTask = EchoTask("Hello").newReferenceId()
+ taskStore.persist(unsupportedTask)
+
+ poller.pollOnce()
+
+ assertEquals(initialBackoff.multiply(2), poller.backoff)
+ }
+
+ @Test
+ fun `poller increases backoff when listener is busy`() = runTest {
+ val busyListener = object : EchoListener() {
+ override val isBusy = true
+ }
+
+ val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
+ val intialBackoff = poller.backoff
+
+ val task = EchoTask("Busy").newReferenceId()
+ taskStore.persist(task)
+
+ poller.pollOnce()
+
+ assertEquals(intialBackoff.multiply(2), poller.backoff)
+ }
+
+ @Test
+ fun `poller increases backoff when task is not claimed`() = runTest {
+ val listener = EchoListener()
+ TaskTypeRegistry.register(EchoTask::class.java)
+ val task = EchoTask("Unclaimable").newReferenceId()
+ taskStore.persist(task)
+
+ // Simuler at claim alltid feiler
+ val failingStore = object : InMemoryTaskStore() {
+ override fun claim(taskId: UUID, workerId: String): Boolean = false
+ }
+ val pollerWithFailingClaim = object : AbstractTaskPoller(failingStore, reporterFactory) {}
+ val initialBackoff = pollerWithFailingClaim.backoff
+
+ failingStore.persist(task)
+
+ pollerWithFailingClaim.pollOnce()
+
+ assertEquals(initialBackoff.multiply(2), pollerWithFailingClaim.backoff)
+ }
+
+
+
+
+
+
+
+}
diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/DurationUtil.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/DurationUtil.kt
new file mode 100644
index 0000000..6add020
--- /dev/null
+++ b/src/test/kotlin/no/iktdev/eventi/testUtil/DurationUtil.kt
@@ -0,0 +1,8 @@
+package no.iktdev.eventi.testUtil
+
+import java.time.Duration
+
+
+fun Duration.multiply(factor: Int): Duration {
+ return Duration.ofNanos(this.toNanos() * factor)
+}
\ No newline at end of file
diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/EventListenerRegistryUtil.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/EventListenerRegistryUtil.kt
index a14da3e..7f3cc70 100644
--- a/src/test/kotlin/no/iktdev/eventi/testUtil/EventListenerRegistryUtil.kt
+++ b/src/test/kotlin/no/iktdev/eventi/testUtil/EventListenerRegistryUtil.kt
@@ -6,7 +6,9 @@ import org.assertj.core.api.Assertions.assertThat
import java.lang.reflect.Field
fun EventListenerRegistry.wipe() {
- val field: Field = EventListenerRegistry::class.java.getDeclaredField("listeners")
+ val field: Field = EventListenerRegistry::class.java
+ .superclass
+ .getDeclaredField("listeners")
field.isAccessible = true
// Tøm map’en
diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/EventTypeRegistryUtil.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/EventTypeRegistryUtil.kt
index 3c9c1a8..14f018e 100644
--- a/src/test/kotlin/no/iktdev/eventi/testUtil/EventTypeRegistryUtil.kt
+++ b/src/test/kotlin/no/iktdev/eventi/testUtil/EventTypeRegistryUtil.kt
@@ -6,7 +6,9 @@ import org.junit.jupiter.api.Assertions.assertNull
import java.lang.reflect.Field
fun EventTypeRegistry.wipe() {
- val field: Field = EventTypeRegistry::class.java.getDeclaredField("types")
+ val field: Field = EventTypeRegistry::class.java
+ .superclass
+ .getDeclaredField("types")
field.isAccessible = true
// Tøm map’en
diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/TaskListenerRegistryUtil.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/TaskListenerRegistryUtil.kt
new file mode 100644
index 0000000..0aa6824
--- /dev/null
+++ b/src/test/kotlin/no/iktdev/eventi/testUtil/TaskListenerRegistryUtil.kt
@@ -0,0 +1,22 @@
+package no.iktdev.eventi.testUtil
+
+import no.iktdev.eventi.events.EventListener
+import no.iktdev.eventi.events.EventListenerRegistry
+import no.iktdev.eventi.tasks.TaskListener
+import no.iktdev.eventi.tasks.TaskListenerRegistry
+import org.assertj.core.api.Assertions.assertThat
+import java.lang.reflect.Field
+
+fun TaskListenerRegistry.wipe() {
+ val field: Field = TaskListenerRegistry::class.java
+ .superclass
+ .getDeclaredField("listeners")
+ field.isAccessible = true
+
+ // Tøm map’en
+ val mutableList = field.get(TaskListenerRegistry) as MutableList<*>
+ (mutableList as MutableList>>).clear()
+
+ // Verifiser at det er tomt
+ assertThat(TaskListenerRegistry.getListeners().isEmpty())
+}
\ No newline at end of file
diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/TaskTypeRegistryUtil.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/TaskTypeRegistryUtil.kt
new file mode 100644
index 0000000..9963ea2
--- /dev/null
+++ b/src/test/kotlin/no/iktdev/eventi/testUtil/TaskTypeRegistryUtil.kt
@@ -0,0 +1,22 @@
+package no.iktdev.eventi.testUtil
+
+import no.iktdev.eventi.events.EventTypeRegistry
+import no.iktdev.eventi.models.Event
+import no.iktdev.eventi.models.Task
+import no.iktdev.eventi.tasks.TaskTypeRegistry
+import org.junit.jupiter.api.Assertions.assertNull
+import java.lang.reflect.Field
+
+fun TaskTypeRegistry.wipe() {
+ val field: Field = TaskTypeRegistry::class.java
+ .superclass
+ .getDeclaredField("types")
+ field.isAccessible = true
+
+ // Tøm map’en
+ val typesMap = field.get(TaskTypeRegistry) as MutableMap<*, *>
+ (typesMap as MutableMap>).clear()
+
+ // Verifiser at det er tomt
+ assertNull(TaskTypeRegistry.resolve("ANnonExistingEvent"))
+}
\ No newline at end of file
diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/UtilTest.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/UtilTest.kt
new file mode 100644
index 0000000..606163d
--- /dev/null
+++ b/src/test/kotlin/no/iktdev/eventi/testUtil/UtilTest.kt
@@ -0,0 +1,23 @@
+package no.iktdev.eventi.testUtil
+
+import no.iktdev.eventi.events.EventListenerRegistry
+import no.iktdev.eventi.events.EventTypeRegistry
+import no.iktdev.eventi.tasks.TaskListenerRegistry
+import no.iktdev.eventi.tasks.TaskTypeRegistry
+import org.junit.jupiter.api.DisplayName
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+
+class UtilTest {
+
+ @Test
+ @DisplayName("Test wipe function")
+ fun scenario1() {
+ assertDoesNotThrow {
+ EventTypeRegistry.wipe()
+ EventListenerRegistry.wipe()
+ TaskListenerRegistry.wipe()
+ TaskTypeRegistry.wipe()
+ }
+ }
+}
\ No newline at end of file