commit 9de1600771ee1e0a96824bb33150253ea9fcc7d4 Author: Brage Skjønborg Date: Tue Oct 7 21:07:44 2025 +0200 Init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b1dff0d --- /dev/null +++ b/.gitignore @@ -0,0 +1,45 @@ +.gradle +build/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Kotlin ### +.kotlin + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/gradle.xml b/.idea/gradle.xml new file mode 100644 index 0000000..2a65317 --- /dev/null +++ b/.idea/gradle.xml @@ -0,0 +1,17 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml new file mode 100644 index 0000000..254a1fc --- /dev/null +++ b/.idea/kotlinc.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..0cbadba --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts new file mode 100644 index 0000000..b4c65dc --- /dev/null +++ b/build.gradle.kts @@ -0,0 +1,45 @@ +plugins { + kotlin("jvm") version "2.2.10" +} + +group = "no.iktdev" +version = "1.0-SNAPSHOT" + +repositories { + mavenCentral() +} + +val exposedVersion = "0.61.0" + +dependencies { + + implementation ("mysql:mysql-connector-java:8.0.29") + implementation("org.jetbrains.exposed:exposed-core:${exposedVersion}") + implementation("org.jetbrains.exposed:exposed-dao:${exposedVersion}") + implementation("org.jetbrains.exposed:exposed-jdbc:${exposedVersion}") + implementation("org.jetbrains.exposed:exposed-java-time:${exposedVersion}") + + implementation("com.google.code.gson:gson:2.8.9") + + + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2") + + //testImplementation(kotlin("test")) + testImplementation("org.assertj:assertj-core:3.4.1") + + testImplementation("org.junit.jupiter:junit-jupiter-api:5.7.2") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.7.2") + testImplementation(platform("org.junit:junit-bom:5.9.1")) + testImplementation("org.junit.jupiter:junit-jupiter") + testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2") + + + testImplementation("com.h2database:h2:2.2.220") +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(21) +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..7fc6f1f --- /dev/null +++ b/gradle.properties @@ -0,0 +1 @@ +kotlin.code.style=official diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..249e583 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..851a0d5 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Sat Oct 04 23:50:48 CEST 2025 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..1b6c787 --- /dev/null +++ b/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..107acd3 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle.kts b/settings.gradle.kts new file mode 100644 index 0000000..02e7d3d --- /dev/null +++ b/settings.gradle.kts @@ -0,0 +1,4 @@ +plugins { + id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0" +} +rootProject.name = "Eventi" \ No newline at end of file diff --git a/src/main/kotlin/no/iktdev/eventi/ZDS.kt b/src/main/kotlin/no/iktdev/eventi/ZDS.kt new file mode 100644 index 0000000..b75efe0 --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/ZDS.kt @@ -0,0 +1,67 @@ +package no.iktdev.eventi + +import com.google.gson.GsonBuilder +import com.google.gson.JsonDeserializationContext +import com.google.gson.JsonDeserializer +import com.google.gson.JsonElement +import com.google.gson.JsonPrimitive +import com.google.gson.JsonSerializationContext +import com.google.gson.JsonSerializer +import no.iktdev.eventi.events.EventTypeRegistry +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.store.PersistedEvent +import java.lang.reflect.Type +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter + +object ZDS { + val gson = WGson.gson + + fun Event.toPersisted(id: Long, persistedAt: LocalDateTime): PersistedEvent { + val payloadJson = gson.toJson(this) + val eventName = this::class.simpleName ?: error("Missing class name") + return PersistedEvent( + id = id, + referenceId = referenceId, + eventId = eventId, + event = eventName, + data = payloadJson, + persistedAt = persistedAt + ) + } + + /** + * Convert a PersistedEvent back to its original Event type using the event type registry and Gson for deserialization. + */ + fun PersistedEvent.toEvent(): Event { + val clazz = EventTypeRegistry.resolve(event) + ?: error("Unknown event type: $event") + return gson.fromJson(data, clazz) + } + + + object WGson { + val gson = GsonBuilder() + .registerTypeAdapter(LocalDateTime::class.java, LocalDateTimeAdapter()) + .create() + fun toJson(data: Any?): String { + return gson.toJson(data) + } + + class LocalDateTimeAdapter : JsonSerializer, JsonDeserializer { + private val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME + + override fun serialize( + src: LocalDateTime, typeOfSrc: Type, context: JsonSerializationContext + ): JsonElement { + return JsonPrimitive(src.format(formatter)) + } + + override fun deserialize( + json: JsonElement, typeOfT: Type, context: JsonDeserializationContext + ): LocalDateTime { + return LocalDateTime.parse(json.asString, formatter) + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/no/iktdev/eventi/events/AbstractEventPoller.kt b/src/main/kotlin/no/iktdev/eventi/events/AbstractEventPoller.kt new file mode 100644 index 0000000..faf4d2b --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/events/AbstractEventPoller.kt @@ -0,0 +1,52 @@ +package no.iktdev.eventi.events + +import kotlinx.coroutines.delay +import no.iktdev.eventi.ZDS.toEvent +import no.iktdev.eventi.stores.EventStore +import java.time.Duration +import java.time.LocalDateTime +import java.util.UUID +import kotlin.collections.iterator + +abstract class AbstractEventPoller( + private val eventStore: EventStore, + private val dispatchQueue: SequenceDispatchQueue, + private val dispatcher: EventDispatcher +) { + var lastSeenTime: LocalDateTime = LocalDateTime.MIN + var backoff = Duration.ofSeconds(2) + private set + private val maxBackoff = Duration.ofMinutes(1) + + + suspend fun start() { + while (true) { + pollOnce() + } + } + + suspend fun pollOnce() { + val newPersisted = eventStore.getPersistedEventsAfter(lastSeenTime) + + if (newPersisted.isEmpty()) { + delay(backoff.toMillis()) + backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) + return + } + + backoff = Duration.ofSeconds(2) + + val grouped = newPersisted.groupBy { it.referenceId } + + for ((referenceId, _) in grouped) { + if (dispatchQueue.isProcessing(referenceId)) continue + + val fullLog = eventStore.getPersistedEventsFor(referenceId) + val events = fullLog.map { it.toEvent() } + + dispatchQueue.dispatch(referenceId, events, dispatcher) + lastSeenTime = fullLog.maxOf { it.persistedAt } + } + } + +} diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt b/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt new file mode 100644 index 0000000..ffa6d75 --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt @@ -0,0 +1,28 @@ +package no.iktdev.eventi.events + +import no.iktdev.eventi.models.DeleteEvent +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.stores.EventStore +import java.util.UUID + +class EventDispatcher(val eventStore: EventStore) { + + fun dispatch(referenceId: UUID, events: List) { + val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.toSet() + val deletedEventIds = events.filterIsInstance().map { it.deletedEventId } + val candidates = events + .filter { it.eventId !in derivedFromIds } + .filter { it.eventId !in deletedEventIds } + + EventListenerRegistry.getListeners().forEach { listener -> + for (candidate in candidates) { + val result = listener.onEvent(candidate, events) + if (result != null) { + + eventStore.save(result) + return + } + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventListener.kt b/src/main/kotlin/no/iktdev/eventi/events/EventListener.kt new file mode 100644 index 0000000..c01283c --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/events/EventListener.kt @@ -0,0 +1,14 @@ +package no.iktdev.eventi.events + +import no.iktdev.eventi.models.Event + +abstract class EventListener: EventListenerImplementation { + init { + EventListenerRegistry.registerListener(this) + } + +} + +interface EventListenerImplementation { + fun onEvent(event: Event, history: List): Event? +} \ No newline at end of file diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventListenerRegistry.kt b/src/main/kotlin/no/iktdev/eventi/events/EventListenerRegistry.kt new file mode 100644 index 0000000..40bb8d0 --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/events/EventListenerRegistry.kt @@ -0,0 +1,11 @@ +package no.iktdev.eventi.events + +object EventListenerRegistry { + private val listeners = mutableListOf() + + fun registerListener(listener: EventListener) { + listeners.add(listener) + } + + fun getListeners(): List = listeners.toList() +} \ No newline at end of file diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventTypeRegistry.kt b/src/main/kotlin/no/iktdev/eventi/events/EventTypeRegistry.kt new file mode 100644 index 0000000..ed9a59a --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/events/EventTypeRegistry.kt @@ -0,0 +1,31 @@ +package no.iktdev.eventi.events + +import no.iktdev.eventi.models.Event + +object EventTypeRegistry { + private val types = mutableMapOf>() + + fun register(clazz: Class) { + types[clazz.simpleName] = clazz + } + fun register(clazzes: List>) { + clazzes.forEach { clazz -> + types[clazz.simpleName] = clazz + } + } + + fun resolve(name: String): Class? = types[name] + + fun all(): Map> = types.toMap() +} + + +abstract class EventTypeRegistration { + init { + definedTypes().forEach { clazz -> + EventTypeRegistry.register(clazz) + } + } + + protected abstract fun definedTypes(): List> +} diff --git a/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt b/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt new file mode 100644 index 0000000..64bb4bd --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt @@ -0,0 +1,40 @@ +package no.iktdev.eventi.events + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore +import no.iktdev.eventi.models.Event +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap + +class SequenceDispatchQueue( + private val maxConcurrency: Int = 8, + private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) +) { + private val semaphore = Semaphore(maxConcurrency) + private val active = ConcurrentHashMap.newKeySet() + + fun isProcessing(referenceId: UUID): Boolean = referenceId in active + + fun dispatch(referenceId: UUID, events: List, dispatcher: EventDispatcher): Job? { + if (!active.add(referenceId)) return null // already processing + + return scope.launch { + try { + semaphore.acquire() + try { + dispatcher.dispatch(referenceId, events) + } catch (e: Exception) { + println("Dispatch failed for $referenceId: ${e.message}") + } finally { + semaphore.release() + } + } finally { + active.remove(referenceId) + } + } + } +} diff --git a/src/main/kotlin/no/iktdev/eventi/models/Event.kt b/src/main/kotlin/no/iktdev/eventi/models/Event.kt new file mode 100644 index 0000000..aa3f766 --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/models/Event.kt @@ -0,0 +1,33 @@ +package no.iktdev.eventi.models + +import java.time.LocalDateTime +import java.util.UUID + +abstract class Event { + var referenceId: UUID = UUID.randomUUID() + protected set + var eventId: UUID = UUID.randomUUID() + private set + var metadata: Metadata = Metadata() + protected set + + @Transient + open val data: Any? = null + + fun derivedOf(event: Event) = apply { + this.referenceId = event.referenceId + this.metadata = Metadata(derivedFromId = event.eventId) + } + +} + +abstract class DeleteEvent: Event() { + open lateinit var deletedEventId: UUID +} + + +open class Metadata( + val created: LocalDateTime = LocalDateTime.now(), val derivedFromId: UUID? = null +) {} + + diff --git a/src/main/kotlin/no/iktdev/eventi/models/Task.kt b/src/main/kotlin/no/iktdev/eventi/models/Task.kt new file mode 100644 index 0000000..a9bcf04 --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/models/Task.kt @@ -0,0 +1,3 @@ +package no.iktdev.eventi.models + +class Task() diff --git a/src/main/kotlin/no/iktdev/eventi/models/store/PersistedEvent.kt b/src/main/kotlin/no/iktdev/eventi/models/store/PersistedEvent.kt new file mode 100644 index 0000000..43b75f6 --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/models/store/PersistedEvent.kt @@ -0,0 +1,13 @@ +package no.iktdev.eventi.models.store + +import java.time.LocalDateTime +import java.util.UUID + +data class PersistedEvent( + val id: Long, + val referenceId: UUID, + val eventId: UUID, + val event: String, + val data: String, + val persistedAt: LocalDateTime +) \ No newline at end of file diff --git a/src/main/kotlin/no/iktdev/eventi/stores/EventStore.kt b/src/main/kotlin/no/iktdev/eventi/stores/EventStore.kt new file mode 100644 index 0000000..5bcc3f7 --- /dev/null +++ b/src/main/kotlin/no/iktdev/eventi/stores/EventStore.kt @@ -0,0 +1,13 @@ +package no.iktdev.eventi.stores + +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.store.PersistedEvent +import java.time.LocalDateTime +import java.util.UUID + +interface EventStore { + fun getPersistedEventsAfter(timestamp: LocalDateTime): List + fun getPersistedEventsFor(referenceId: UUID): List + fun save(event: Event) +} + diff --git a/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt b/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt new file mode 100644 index 0000000..65f4db3 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt @@ -0,0 +1,165 @@ +package no.iktdev.eventi + +import no.iktdev.eventi.ZDS.toEvent +import no.iktdev.eventi.ZDS.toPersisted +import no.iktdev.eventi.events.EventDispatcher +import no.iktdev.eventi.events.EventListener +import no.iktdev.eventi.events.EventListenerRegistry +import no.iktdev.eventi.events.EventTypeRegistry +import no.iktdev.eventi.models.DeleteEvent +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.testUtil.wipe +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.time.LocalDateTime +import java.util.UUID + +class EventDispatcherTest: TestBase() { + val dispatcher = EventDispatcher(store) + + class DerivedEvent(): Event() + class TriggerEvent(): Event() { + fun usingReferenceId(id: UUID) = apply { referenceId = id } + } + class OtherEvent(): Event() + + + @BeforeEach + fun setup() { + EventTypeRegistry.wipe() + EventListenerRegistry.wipe() + // Verifiser at det er tomt + + EventTypeRegistry.register(listOf( + DerivedEvent::class.java, + TriggerEvent::class.java, + OtherEvent::class.java + )) + } + + + @Test + fun `should produce one event and stop`() { + val listener = ProducingListener() + + val trigger = TriggerEvent() + dispatcher.dispatch(trigger.referenceId, listOf(trigger)) + + val produced = store.all().firstOrNull() + assertNotNull(produced) + + val event = produced!!.toEvent() + assertEquals(trigger.eventId, event.metadata.derivedFromId) + assertTrue(event is DerivedEvent) + } + + @Test + fun `should skip already derived events`() { + val listener = ProducingListener() + + val trigger = TriggerEvent() + val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, LocalDateTime.now()) + store.save(derived.toEvent()) // simulate prior production + + dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived.toEvent())) + + assertEquals(1, store.all().size) // no new event produced + } + + @Test + fun `should pass full context to listener`() { + val listener = ContextCapturingListener() + + val e1 = TriggerEvent() + val e2 = OtherEvent() + dispatcher.dispatch(e1.referenceId, listOf(e1, e2)) + + assertEquals(2, listener.context.size) + } + + @Test + fun `should behave deterministically across replays`() { + val listener = ProducingListener() + + val trigger = TriggerEvent() + dispatcher.dispatch(trigger.referenceId, listOf(trigger)) + val replayContext = listOf(trigger) + store.all().map { it.toEvent() } + + dispatcher.dispatch(trigger.referenceId, replayContext) + + assertEquals(1, store.all().size) // no duplicate + } + + @Test + fun `should not deliver deleted events as candidates`() { + val dispatcher = EventDispatcher(store) + val received = mutableListOf() + object : EventListener() { + override fun onEvent(event: Event, history: List): Event? { + received += event + return null + } + } + // Original hendelse + val original = TriggerEvent() + + // Slettehendelse som peker på original + val deleted = object : DeleteEvent() { + override var deletedEventId = original.eventId + } + + // Dispatch med begge hendelser + dispatcher.dispatch(original.referenceId, listOf(original, deleted)) + + // Verifiser at original ikke ble levert som kandidat + assertFalse(received.contains(original)) { + "Original hendelse ble levert til lytteren selv om den var slettet" + } + + // Verifiser at slett-hendelsen finnes i konteksten + assertTrue(received.any() { it is DeleteEvent }) { + "DeleteEvent skal leveres som kandidat" + } + } + + @Test + fun `should deliver DeleteEvent to listeners that react to it`() { + val received = mutableListOf() + val listener = object : EventListener() { + override fun onEvent(event: Event, context: List): Event? { + if (event is DeleteEvent) received += event + return null + } + } + + val deleted = object : DeleteEvent() { + override var deletedEventId = UUID.randomUUID() + } + dispatcher.dispatch(deleted.referenceId, listOf(deleted)) + + assertTrue(received.contains(deleted)) + } + + + + // --- Test helpers --- + + class ProducingListener : EventListener() { + override fun onEvent(event: Event, context: List): Event? { + return if (event is TriggerEvent) DerivedEvent().derivedOf(event) else null + } + } + + class ContextCapturingListener : EventListener() { + var context: List = emptyList() + override fun onEvent(event: Event, context: List): Event? { + this.context = context + return null + } + } + +} \ No newline at end of file diff --git a/src/test/kotlin/no/iktdev/eventi/EventTypeRegistryTest.kt b/src/test/kotlin/no/iktdev/eventi/EventTypeRegistryTest.kt new file mode 100644 index 0000000..0edd5c5 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/EventTypeRegistryTest.kt @@ -0,0 +1,31 @@ +package no.iktdev.eventi + +import no.iktdev.eventi.events.EchoEvent +import no.iktdev.eventi.events.EventTypeRegistry +import no.iktdev.eventi.events.StartEvent +import no.iktdev.eventi.testUtil.wipe +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +class EventTypeRegistryTest: TestBase() { + + @BeforeEach + fun setup() { + EventTypeRegistry.wipe() + // Verifiser at det er tomt + assertNull(EventTypeRegistry.resolve("SomeEvent")) + } + + + @Test + @DisplayName("Test EventTypeRegistry registration") + fun scenario1() { + DefaultTestEvents() + assertThat(EventTypeRegistry.resolve("EchoEvent")).isEqualTo(EchoEvent::class.java) + assertThat(EventTypeRegistry.resolve("StartEvent")).isEqualTo(StartEvent::class.java) + } + +} \ No newline at end of file diff --git a/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt b/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt new file mode 100644 index 0000000..fc59046 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt @@ -0,0 +1,27 @@ +package no.iktdev.eventi + +import no.iktdev.eventi.ZDS.toPersisted +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.store.PersistedEvent +import no.iktdev.eventi.stores.EventStore +import java.time.LocalDateTime +import java.util.UUID + +class InMemoryEventStore : EventStore { + private val persisted = mutableListOf() + private var nextId = 1L + + override fun getPersistedEventsAfter(timestamp: LocalDateTime): List = + persisted.filter { it.persistedAt > timestamp } + + override fun getPersistedEventsFor(referenceId: UUID): List = + persisted.filter { it.referenceId == referenceId } + + override fun save(event: Event) { + val persistedEvent = event.toPersisted(nextId++, LocalDateTime.now()) + persisted += persistedEvent + } + + fun all(): List = persisted + fun clear() { persisted.clear(); nextId = 1L } +} diff --git a/src/test/kotlin/no/iktdev/eventi/TestBase.kt b/src/test/kotlin/no/iktdev/eventi/TestBase.kt new file mode 100644 index 0000000..c858bc2 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/TestBase.kt @@ -0,0 +1,25 @@ +package no.iktdev.eventi + +import no.iktdev.eventi.events.EchoEvent +import no.iktdev.eventi.events.EventTypeRegistration +import no.iktdev.eventi.events.StartEvent +import no.iktdev.eventi.models.Event + +open class TestBase { + + val store = InMemoryEventStore() + + class DefaultTestEvents() : EventTypeRegistration() { + override fun definedTypes(): List> { + return listOf( + EchoEvent::class.java, + StartEvent::class.java + ) + } + } + + init { + DefaultTestEvents() + } + +} \ No newline at end of file diff --git a/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt b/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt new file mode 100644 index 0000000..7c01a04 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt @@ -0,0 +1,37 @@ +package no.iktdev.eventi + +import no.iktdev.eventi.ZDS.toEvent +import no.iktdev.eventi.ZDS.toPersisted +import no.iktdev.eventi.events.EchoEvent +import no.iktdev.eventi.events.EventTypeRegistry +import no.iktdev.eventi.testUtil.wipe +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import java.time.LocalDateTime + +class ZDSTest { + + @BeforeEach + fun setup() { + EventTypeRegistry.wipe() + // Verifiser at det er tomt + assertNull(EventTypeRegistry.resolve("SomeEvent")) + } + + @Test + @DisplayName("Test ZDS") + fun scenario1() { + EventTypeRegistry.register(EchoEvent::class.java) + + val echo = EchoEvent("hello") + val persisted = echo.toPersisted(id = 1L, persistedAt = LocalDateTime.now()) + + val restored = persisted.toEvent() + assert(restored is EchoEvent) + assert((restored as EchoEvent).data == "hello") + + } + +} \ No newline at end of file diff --git a/src/test/kotlin/no/iktdev/eventi/events/AbstractEventPollerTest.kt b/src/test/kotlin/no/iktdev/eventi/events/AbstractEventPollerTest.kt new file mode 100644 index 0000000..8cab690 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/events/AbstractEventPollerTest.kt @@ -0,0 +1,152 @@ +package no.iktdev.eventi.events + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.test.runTest +import no.iktdev.eventi.EventDispatcherTest +import no.iktdev.eventi.EventDispatcherTest.DerivedEvent +import no.iktdev.eventi.EventDispatcherTest.OtherEvent +import no.iktdev.eventi.EventDispatcherTest.TriggerEvent +import no.iktdev.eventi.TestBase +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.testUtil.wipe +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.time.Duration +import java.time.LocalDateTime +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap + +class AbstractEventPollerTest : TestBase() { + val dispatcher = EventDispatcher(store) + val queue = SequenceDispatchQueue(maxConcurrency = 8) + + val poller = object : AbstractEventPoller(store, queue, dispatcher) {} + + @BeforeEach + fun setup() { + EventTypeRegistry.wipe() + EventListenerRegistry.wipe() + store.clear() + // Verifiser at det er tomt + + EventTypeRegistry.register(listOf( + DerivedEvent::class.java, + TriggerEvent::class.java, + OtherEvent::class.java + )) + } + + @Test + fun `pollOnce should dispatch all new referenceIds and update lastSeenTime`() = runTest { + val dispatched = ConcurrentHashMap.newKeySet() + val completionMap = mutableMapOf>() + + EventListenerRegistry.registerListener(object : EventListener() { + override fun onEvent(event: Event, context: List): Event? { + dispatched += event.referenceId + completionMap[event.referenceId]?.complete(Unit) + return null + } + }) + + val referenceIds = (1..10).map { UUID.randomUUID() } + + referenceIds.forEach { refId -> + val e = EventDispatcherTest.TriggerEvent().usingReferenceId(refId) + store.save(e) // persistedAt settes automatisk her + completionMap[refId] = CompletableDeferred() + } + + poller.pollOnce() + + completionMap.values.awaitAll() + + assertEquals(referenceIds.toSet(), dispatched) + } + + @Test + fun `pollOnce should increase backoff when no events and reset when events arrive`() = runTest { + val testPoller = object : AbstractEventPoller(store, queue, dispatcher) { + fun currentBackoff(): Duration = backoff + } + + testPoller.pollOnce() + val afterFirst = testPoller.currentBackoff() + + testPoller.pollOnce() + val afterSecond = testPoller.currentBackoff() + + assertTrue(afterSecond > afterFirst) + + val e = TriggerEvent().usingReferenceId(UUID.randomUUID()) + store.save(e) + + testPoller.pollOnce() + val afterReset = testPoller.currentBackoff() + + assertEquals(Duration.ofSeconds(2), afterReset) + } + + @Test + fun `pollOnce should group and dispatch exactly 3 events for one referenceId`() = runTest { + val refId = UUID.randomUUID() + val received = mutableListOf() + val done = CompletableDeferred() + + // Wipe alt før test + EventTypeRegistry.wipe() + EventListenerRegistry.wipe() + store.clear() // sørg for at InMemoryEventStore støtter dette + + EventTypeRegistry.register(listOf(TriggerEvent::class.java)) + + object : EventListener() { + override fun onEvent(event: Event, context: List): Event? { + received += event + if (received.size == 3) done.complete(Unit) + return null + } + } + + repeat(3) { + store.save(TriggerEvent().usingReferenceId(refId)) + } + + poller.pollOnce() + + done.await() + + assertEquals(3, received.size) + assertTrue(received.all { it.referenceId == refId }) + } + + + + @Test + fun `pollOnce should ignore events before lastSeenTime`() = runTest { + val refId = UUID.randomUUID() + val ignored = TriggerEvent().usingReferenceId(refId) + + val testPoller = object : AbstractEventPoller(store, queue, dispatcher) { + init { + lastSeenTime = LocalDateTime.now().plusSeconds(1) + } + } + + store.save(ignored) + + testPoller.pollOnce() + + assertFalse(queue.isProcessing(refId)) + } + + + + + + +} \ No newline at end of file diff --git a/src/test/kotlin/no/iktdev/eventi/events/SequenceDispatchQueueTest.kt b/src/test/kotlin/no/iktdev/eventi/events/SequenceDispatchQueueTest.kt new file mode 100644 index 0000000..98a134b --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/events/SequenceDispatchQueueTest.kt @@ -0,0 +1,65 @@ +package no.iktdev.eventi.events + +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.test.runTest +import no.iktdev.eventi.EventDispatcherTest +import no.iktdev.eventi.EventDispatcherTest.DerivedEvent +import no.iktdev.eventi.EventDispatcherTest.OtherEvent +import no.iktdev.eventi.EventDispatcherTest.TriggerEvent +import no.iktdev.eventi.TestBase +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.testUtil.wipe +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +class SequenceDispatchQueueTest: TestBase() { + + @BeforeEach + fun setup() { + EventTypeRegistry.wipe() + EventListenerRegistry.wipe() + // Verifiser at det er tomt + + EventTypeRegistry.register(listOf( + DerivedEvent::class.java, + TriggerEvent::class.java, + OtherEvent::class.java + )) + } + + + @Test + fun `should dispatch all referenceIds with limited concurrency`() = runTest { + val dispatcher = EventDispatcher(store) + val queue = SequenceDispatchQueue(maxConcurrency = 8) + + val dispatched = ConcurrentHashMap.newKeySet() + + EventListenerRegistry.registerListener(object : EventListener() { + override fun onEvent(event: Event, context: List): Event? { + dispatched += event.referenceId + Thread.sleep(50) // simuler tung prosessering + return null + } + }) + + val referenceIds = (1..100).map { UUID.randomUUID() } + + val jobs = referenceIds.mapNotNull { refId -> + val e = TriggerEvent().usingReferenceId(refId) + store.save(e) + queue.dispatch(refId, listOf(e), dispatcher) + } + + jobs.joinAll() + + assertEquals(100, dispatched.size) + } + + +} \ No newline at end of file diff --git a/src/test/kotlin/no/iktdev/eventi/events/TestEvents.kt b/src/test/kotlin/no/iktdev/eventi/events/TestEvents.kt new file mode 100644 index 0000000..8bca478 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/events/TestEvents.kt @@ -0,0 +1,11 @@ +package no.iktdev.eventi.events + +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.Metadata +import java.util.UUID + +class StartEvent(): Event() { +} + +class EchoEvent(override var data: String): Event() { +} \ No newline at end of file diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/EventListenerRegistryUtil.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/EventListenerRegistryUtil.kt new file mode 100644 index 0000000..a14da3e --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/testUtil/EventListenerRegistryUtil.kt @@ -0,0 +1,18 @@ +package no.iktdev.eventi.testUtil + +import no.iktdev.eventi.events.EventListener +import no.iktdev.eventi.events.EventListenerRegistry +import org.assertj.core.api.Assertions.assertThat +import java.lang.reflect.Field + +fun EventListenerRegistry.wipe() { + val field: Field = EventListenerRegistry::class.java.getDeclaredField("listeners") + field.isAccessible = true + + // Tøm map’en + val mutableList = field.get(EventListenerRegistry) as MutableList<*> + (mutableList as MutableList>).clear() + + // Verifiser at det er tomt + assertThat(EventListenerRegistry.getListeners().isEmpty()) +} \ No newline at end of file diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/EventTypeRegistryUtil.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/EventTypeRegistryUtil.kt new file mode 100644 index 0000000..3c9c1a8 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/testUtil/EventTypeRegistryUtil.kt @@ -0,0 +1,18 @@ +package no.iktdev.eventi.testUtil + +import no.iktdev.eventi.events.EventTypeRegistry +import no.iktdev.eventi.models.Event +import org.junit.jupiter.api.Assertions.assertNull +import java.lang.reflect.Field + +fun EventTypeRegistry.wipe() { + val field: Field = EventTypeRegistry::class.java.getDeclaredField("types") + field.isAccessible = true + + // Tøm map’en + val typesMap = field.get(EventTypeRegistry) as MutableMap<*, *> + (typesMap as MutableMap>).clear() + + // Verifiser at det er tomt + assertNull(EventTypeRegistry.resolve("ANnonExistingEvent")) +} \ No newline at end of file