Changes to tasks
This commit is contained in:
parent
479d4cc25e
commit
597adb36bc
@ -15,7 +15,7 @@ import kotlin.coroutines.cancellation.CancellationException
|
||||
* @param T The type of result produced by processing the task.
|
||||
* @param reporter An instance of [TaskReporter] for reporting task status and events.
|
||||
*/
|
||||
abstract class TaskListener<T>(val taskType: TaskType = TaskType.CPU_INTENSIVE): TaskListenerImplementation<T> {
|
||||
abstract class TaskListener<T>(val taskType: TaskType = TaskType.CPU_INTENSIVE): TaskListenerImplementation {
|
||||
|
||||
init {
|
||||
TaskListenerRegistry.registerListener(this)
|
||||
@ -69,9 +69,12 @@ abstract class TaskListener<T>(val taskType: TaskType = TaskType.CPU_INTENSIVE):
|
||||
reporter?.markConsumed(task.taskId)
|
||||
}
|
||||
|
||||
override fun onComplete(task: Task, result: T?) {
|
||||
reporter?.markConsumed(task.taskId)
|
||||
reporter?.log(task.taskId, "Task completed successfully.")
|
||||
override fun onComplete(task: Task, result: Event?) {
|
||||
reporter!!.markConsumed(task.taskId)
|
||||
reporter!!.log(task.taskId, "Task completed successfully.")
|
||||
result?.let {
|
||||
reporter!!.publishEvent(result)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onCancelled() {
|
||||
@ -88,11 +91,11 @@ enum class TaskType {
|
||||
}
|
||||
|
||||
|
||||
interface TaskListenerImplementation<T> {
|
||||
interface TaskListenerImplementation {
|
||||
fun supports(task: Task): Boolean
|
||||
fun accept(task: Task, reporter: TaskReporter): Boolean
|
||||
suspend fun onTask(task: Task): T
|
||||
fun onComplete(task: Task, result: T?)
|
||||
suspend fun onTask(task: Task): Event?
|
||||
fun onComplete(task: Task, result: Event?)
|
||||
fun onError(task: Task, exception: Exception)
|
||||
fun onCancelled()
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ class AbstractTaskPollerTest : TestBase() {
|
||||
|
||||
|
||||
open class EchoListener : TaskListener<String>(TaskType.MIXED) {
|
||||
var result: String? = null
|
||||
var result: Event? = null
|
||||
|
||||
override fun getWorkerId() = this.javaClass.simpleName
|
||||
|
||||
@ -66,17 +66,17 @@ class AbstractTaskPollerTest : TestBase() {
|
||||
return task is EchoTask
|
||||
}
|
||||
|
||||
override suspend fun onTask(task: Task): String {
|
||||
override suspend fun onTask(task: Task): Event {
|
||||
if (task is EchoTask) {
|
||||
return task.data + " Potetmos"
|
||||
return EchoEvent(task.data + " Potetmos").producedFrom(task)
|
||||
}
|
||||
throw IllegalArgumentException("Unsupported task type: ${task::class.java}")
|
||||
}
|
||||
|
||||
override fun onComplete(task: Task, result: String?) {
|
||||
override fun onComplete(task: Task, result: Event?) {
|
||||
super.onComplete(task, result)
|
||||
this.result = result;
|
||||
reporter?.publishEvent(EchoEvent(result!!).producedFrom(task))
|
||||
reporter?.publishEvent(result!!)
|
||||
}
|
||||
|
||||
}
|
||||
@ -99,7 +99,7 @@ class AbstractTaskPollerTest : TestBase() {
|
||||
val producedEvent = eventDeferred.await()
|
||||
assertThat(producedEvent).isNotNull
|
||||
assertThat(producedEvent!!.metadata.derivedFromId).isEqualTo(task.taskId)
|
||||
assertThat(listener.result).isEqualTo("Hello Potetmos")
|
||||
assertThat((listener.result as EchoEvent).data).isEqualTo("Hello Potetmos")
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
@ -120,7 +120,7 @@ class AbstractTaskPollerTest : TestBase() {
|
||||
advanceUntilIdle()
|
||||
|
||||
assertEquals(initialBackoff, poller.backoff)
|
||||
assertEquals("Hello Potetmos", listener.result)
|
||||
assertEquals("Hello Potetmos", (listener.result as EchoEvent).data)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Loading…
Reference in New Issue
Block a user