Cancellation handling
This commit is contained in:
parent
b62079f65f
commit
ce75726824
@ -69,10 +69,16 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
|||||||
val result = onTask(task)
|
val result = onTask(task)
|
||||||
reporter.markCompleted(task.taskId)
|
reporter.markCompleted(task.taskId)
|
||||||
onComplete(task, result)
|
onComplete(task, result)
|
||||||
|
|
||||||
} catch (e: CancellationException) {
|
} catch (e: CancellationException) {
|
||||||
onCancelled()
|
// Dette er en ekte kansellering
|
||||||
|
onCancelled(task)
|
||||||
|
throw e // viktig: ikke svelg cancellation
|
||||||
|
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
// Dette er en faktisk feil
|
||||||
onError(task, e)
|
onError(task, e)
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
heartbeatRunner?.cancel()
|
heartbeatRunner?.cancel()
|
||||||
currentJob?.cancel()
|
currentJob?.cancel()
|
||||||
@ -82,6 +88,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
|||||||
this@TaskListener.reporter = null
|
this@TaskListener.reporter = null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,7 +106,8 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onCancelled() {
|
override fun onCancelled(task: Task) {
|
||||||
|
reporter!!.markCancelled(task.taskId)
|
||||||
currentJob?.cancel()
|
currentJob?.cancel()
|
||||||
heartbeatRunner?.cancel()
|
heartbeatRunner?.cancel()
|
||||||
currentTask = null
|
currentTask = null
|
||||||
@ -119,7 +127,7 @@ interface TaskListenerImplementation {
|
|||||||
suspend fun onTask(task: Task): Event?
|
suspend fun onTask(task: Task): Event?
|
||||||
fun onComplete(task: Task, result: Event?)
|
fun onComplete(task: Task, result: Event?)
|
||||||
fun onError(task: Task, exception: Exception)
|
fun onError(task: Task, exception: Exception)
|
||||||
fun onCancelled()
|
fun onCancelled(task: Task)
|
||||||
}
|
}
|
||||||
|
|
||||||
interface TaskReporter {
|
interface TaskReporter {
|
||||||
@ -127,6 +135,7 @@ interface TaskReporter {
|
|||||||
fun updateLastSeen(taskId: UUID)
|
fun updateLastSeen(taskId: UUID)
|
||||||
fun markCompleted(taskId: UUID)
|
fun markCompleted(taskId: UUID)
|
||||||
fun markFailed(taskId: UUID)
|
fun markFailed(taskId: UUID)
|
||||||
|
fun markCancelled(taskId: UUID)
|
||||||
fun updateProgress(taskId: UUID, progress: Int)
|
fun updateProgress(taskId: UUID, progress: Int)
|
||||||
fun log(taskId: UUID, message: String)
|
fun log(taskId: UUID, message: String)
|
||||||
fun publishEvent(event: Event)
|
fun publishEvent(event: Event)
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import kotlinx.coroutines.test.runTest
|
|||||||
import kotlinx.coroutines.yield
|
import kotlinx.coroutines.yield
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.Task
|
import no.iktdev.eventi.models.Task
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
|
||||||
import org.junit.jupiter.api.Assertions.*
|
import org.junit.jupiter.api.Assertions.*
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
@ -42,6 +41,9 @@ class TaskListenerTest {
|
|||||||
consumed = true
|
consumed = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun markCancelled(taskId: UUID) {
|
||||||
|
}
|
||||||
|
|
||||||
override fun updateProgress(taskId: UUID, progress: Int) {
|
override fun updateProgress(taskId: UUID, progress: Int) {
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -428,8 +430,8 @@ class TaskListenerTest {
|
|||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onCancelled() {
|
override fun onCancelled(task: Task) {
|
||||||
super.onCancelled()
|
super.onCancelled(task)
|
||||||
cancelledCalled.complete(Unit)
|
cancelledCalled.complete(Unit)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -38,7 +38,7 @@ class TaskPollerImplementationTest : TestBase() {
|
|||||||
override fun updateLastSeen(taskId: UUID) {}
|
override fun updateLastSeen(taskId: UUID) {}
|
||||||
override fun markCompleted(taskId: UUID) {}
|
override fun markCompleted(taskId: UUID) {}
|
||||||
override fun markFailed(taskId: UUID) {}
|
override fun markFailed(taskId: UUID) {}
|
||||||
|
override fun markCancelled(taskId: UUID) {}
|
||||||
override fun updateProgress(taskId: UUID, progress: Int) {}
|
override fun updateProgress(taskId: UUID, progress: Int) {}
|
||||||
override fun log(taskId: UUID, message: String) {}
|
override fun log(taskId: UUID, message: String) {}
|
||||||
override fun publishEvent(event: Event) {
|
override fun publishEvent(event: Event) {
|
||||||
@ -92,8 +92,8 @@ class TaskPollerImplementationTest : TestBase() {
|
|||||||
super.onError(task, exception)
|
super.onError(task, exception)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onCancelled() {
|
override fun onCancelled(task: Task) {
|
||||||
super.onCancelled()
|
super.onCancelled(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user