Controllers added + pipeline

This commit is contained in:
Brage Skjønborg 2026-01-04 01:58:22 +01:00
parent 82b48b8c13
commit cc6656ed5d
13 changed files with 437 additions and 7 deletions

85
.github/workflows/build-java-app.yml vendored Normal file
View File

@ -0,0 +1,85 @@
name: Build Java App
on:
workflow_call:
inputs:
app:
required: true
type: string
dockerTag:
required: true
type: string
jobs:
build-java:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Detect frontend
id: detect_web
run: |
if [ -d "apps/${{ inputs.app }}/web" ]; then
echo "has_web=true" >> $GITHUB_OUTPUT
else
echo "has_web=false" >> $GITHUB_OUTPUT
fi
- name: Build React frontend
if: ${{ steps.detect_web.outputs.has_web == 'true' }}
run: |
cd apps/${{ inputs.app }}/web
npm install
export CI=false
npm run build
- name: Copy React build into Spring Boot resources
if: ${{ steps.detect_web.outputs.has_web == 'true' }}
run: |
rm -rf apps/${{ inputs.app }}/src/main/resources/static
mkdir -p apps/${{ inputs.app }}/src/main/resources/static
cp -r apps/${{ inputs.app }}/web/build/* apps/${{ inputs.app }}/src/main/resources/static
- name: Extract version
run: |
VERSION=$(grep '^version' apps/${{ inputs.app }}/build.gradle.kts | sed 's/.*"\(.*\)".*/\1/')
echo "VERSION=$VERSION" >> $GITHUB_ENV
- name: Build Java module
run: |
chmod +x ./gradlew
./gradlew :apps:${{ inputs.app }}:bootJar --info --stacktrace
- name: Build Docker image locally
run: |
docker build \
-f ./dockerfiles/DebianJava \
-t local-${{ inputs.app }}:${{ inputs.dockerTag }} \
--build-arg MODULE_NAME=${{ inputs.app }} \
--build-arg PASS_APP_VERSION=${{ env.VERSION }} \
.
- name: Test Docker container
run: |
docker run --rm local-${{ inputs.app }}:${{ inputs.dockerTag }} /bin/sh -c "echo 'Smoke test OK'"
- name: Docker login
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_HUB_NAME }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
- name: Push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: ./dockerfiles/DebianJava
build-args: |
MODULE_NAME=${{ inputs.app }}
PASS_APP_VERSION=${{ env.VERSION }}
push: true
tags: |
bskjon/mediaprocessing-${{ inputs.app }}:v5
bskjon/mediaprocessing-${{ inputs.app }}:v5-${{ inputs.dockerTag }}
bskjon/mediaprocessing-${{ inputs.app }}:v5-${{ github.sha }}

73
.github/workflows/build-python-app.yml vendored Normal file
View File

@ -0,0 +1,73 @@
name: Build Python App
on:
workflow_call:
inputs:
app:
required: true
type: string
dockerTag:
required: true
type: string
jobs:
build-python:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# Optional: install runtime dependencies if requirements.txt exists
- name: Install runtime dependencies
if: ${{ hashFiles(format('apps/{0}/requirements.txt', inputs.app)) != '' }}
run: |
cd apps/${{ inputs.app }}
pip install -r requirements.txt
# Install test dependencies (pytest, asyncio test libs, etc.)
- name: Install test dependencies
if: ${{ hashFiles(format('apps/{0}/requirements-test.txt', inputs.app)) != '' }}
run: |
cd apps/${{ inputs.app }}
pip install -r requirements-test.txt
# Run Python tests
- name: Run Python tests
run: |
cd apps/${{ inputs.app }}
pytest -q
# Build Docker image locally
- name: Build Docker image locally
run: |
docker build \
-f ./dockerfiles/Python \
-t local-${{ inputs.app }}:${{ inputs.dockerTag }} \
--build-arg MODULE_NAME=${{ inputs.app }} \
.
# Smoke-test the container
- name: Test Docker container
run: |
docker run --rm local-${{ inputs.app }}:${{ inputs.dockerTag }} /bin/sh -c "echo 'Smoke test OK'"
# Login to Docker Hub
- name: Docker login
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_HUB_NAME }}
password: ${{ secrets.DOCKER_HUB_TOKEN }}
# Push final image
- name: Push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: ./dockerfiles/Python
build-args: |
MODULE_NAME=${{ inputs.app }}
push: true
tags: |
bskjon/mediaprocessing-${{ inputs.app }}:v5
bskjon/mediaprocessing-${{ inputs.app }}:v5-${{ inputs.dockerTag }}
bskjon/mediaprocessing-${{ inputs.app }}:v5-${{ github.sha }}

26
.github/workflows/build-shared.yml vendored Normal file
View File

@ -0,0 +1,26 @@
name: Build Shared
on:
workflow_call:
inputs:
dockerTag:
required: true
type: string
jobs:
build-shared:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Cache Gradle
uses: actions/cache@v4
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
- name: Build Shared module
run: |
chmod +x ./gradlew
./gradlew :shared:build --info --stacktrace

103
.github/workflows/build-v5.yml vendored Normal file
View File

@ -0,0 +1,103 @@
name: Build v5
on:
push:
branches: [ v5 ]
pull_request:
branches: [ v5 ]
workflow_dispatch:
jobs:
pre-check:
runs-on: ubuntu-latest
outputs:
shared: ${{ steps.filter.outputs.shared }}
processer: ${{ steps.filter.outputs.processer }}
converter: ${{ steps.filter.outputs.converter }}
coordinator: ${{ steps.filter.outputs.coordinator }}
ui: ${{ steps.filter.outputs.ui }}
pyMetadata: ${{ steps.filter.outputs.metadata }}
pyWatcher: ${{ steps.filter.outputs.watcher }}
dockerTag: ${{ steps.tag.outputs.tag }}
steps:
- uses: actions/checkout@v4
- name: Detect changes
id: filter
uses: dorny/paths-filter@v3
with:
filters: |
shared:
- 'shared/**'
processer:
- 'apps/processer/**'
converter:
- 'apps/converter/**'
coordinator:
- 'apps/coordinator/**'
ui:
- 'apps/ui/**'
metadata:
- 'apps/pyMetadata/**'
watcher:
- 'apps/pyWatcher/**'
- name: Generate docker tag
id: tag
run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> $GITHUB_OUTPUT
build-shared:
needs: pre-check
if: ${{ needs.pre-check.outputs.shared == 'true' || github.event_name == 'workflow_dispatch' }}
uses: ./.github/workflows/build-shared.yml
with:
dockerTag: ${{ needs.pre-check.outputs.dockerTag }}
build-apps:
needs: [pre-check, build-shared]
runs-on: ubuntu-latest
strategy:
matrix:
include:
- app: processer
type: java
enabled: true
- app: converter
type: java
enabled: true
- app: coordinator
type: java
enabled: true
- app: ui
type: java
enabled: false
- app: pyMetadata
type: python
enabled: true
- app: pyWatcher
type: python
enabled: true
if: ${{ matrix.enabled == true &&
(needs.pre-check.outputs[matrix.app] == 'true'
|| needs.pre-check.outputs.shared == 'true'
|| github.event_name == 'workflow_dispatch') }}
steps:
- name: Call Java workflow
if: ${{ matrix.type == 'java' }}
uses: ./.github/workflows/build-java-app.yml
with:
app: ${{ matrix.app }}
dockerTag: ${{ needs.pre-check.outputs.dockerTag }}
- name: Call Python workflow
if: ${{ matrix.type == 'python' }}
uses: ./.github/workflows/build-python-app.yml
with:
app: ${{ matrix.app }}
dockerTag: ${{ needs.pre-check.outputs.dockerTag }}

View File

@ -0,0 +1,18 @@
package no.iktdev.mediaprocessing.coordinator
import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate
import org.springframework.stereotype.Service
import java.util.concurrent.ConcurrentHashMap
@Service
class CoordinatorService {
private val progressMap = ConcurrentHashMap<String, ProgressUpdate>()
fun updateProgress(update: ProgressUpdate) {
progressMap[update.taskId] = update
}
fun getProgress(taskId: String): ProgressUpdate? =
progressMap[taskId]
}

View File

@ -0,0 +1,22 @@
package no.iktdev.mediaprocessing.coordinator.controller
import no.iktdev.mediaprocessing.coordinator.CoordinatorService
import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/internal")
class InternalProcesserController(
private val coordinator: CoordinatorService
) {
@PostMapping("/progress")
fun receiveProgress(@RequestBody update: ProgressUpdate): ResponseEntity<Void> {
coordinator.updateProgress(update)
return ResponseEntity.ok().build()
}
}

View File

@ -24,10 +24,11 @@ repositories {
dependencies { dependencies {
/*Spring boot*/ /*Spring boot*/
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0") implementation("org.springframework.boot:spring-boot-starter-webflux")
// implementation("org.springframework.kafka:spring-kafka:3.0.1") // implementation("org.springframework.kafka:spring-kafka:3.0.1")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")

View File

@ -0,0 +1,24 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClient
@Component
class CoordinatorClient(
private val webClient: WebClient
) {
fun reportProgress(referenceId: String, taskId: String, percent: FfmpegDecodedProgress, message: String?) =
webClient.post()
.uri("/internal/progress")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(
ProgressUpdate(referenceId, taskId, percent, message)
)
.retrieve()
.toBodilessEntity()
}

View File

@ -0,0 +1,19 @@
package no.iktdev.mediaprocessing.processer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient
@Configuration
class CoordinatorClientConfig {
@Bean
fun coordinatorWebClient(builder: WebClient.Builder): WebClient {
val baseUrl = ProcesserEnv.coordinatorUrl
?: error("COORDINATOR_URL must be set")
return builder
.baseUrl(baseUrl)
.build()
}
}

View File

@ -5,7 +5,7 @@ import java.io.File
class ProcesserEnv { class ProcesserEnv {
companion object { companion object {
val wsAllowedOrigins: String = System.getenv("AllowedOriginsWebsocket")?.takeIf { it.isNotBlank() } ?: "" val coordinatorUrl = System.getenv("COORDINATOR_URL") ?: "http://coordinator:8080"
val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg" val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false

View File

@ -0,0 +1,35 @@
package no.iktdev.mediaprocessing.processer.controller
import no.iktdev.mediaprocessing.processer.listeners.SubtitleTaskListener
import no.iktdev.mediaprocessing.processer.listeners.VideoTaskListener
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
@Controller
class CancelController {
@Autowired
lateinit var videoTaskListener: VideoTaskListener
@Autowired
lateinit var subtitleTaskListener: SubtitleTaskListener
@RequestMapping("/cancel/single")
fun cancelTask(@RequestBody eventId: String? = null): ResponseEntity<String> {
if (eventId.isNullOrBlank()) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("No eventId provided!")
}
var canceled: Boolean = false
if (videoTaskListener.currentTaskId?.toString() == eventId) {
videoTaskListener.currentJob?.cancel()
canceled = true
}
if (subtitleTaskListener.currentTaskId?.toString() == eventId) {
subtitleTaskListener.currentJob?.cancel()
canceled = true
}
return if (canceled) ResponseEntity.status(HttpStatus.FOUND).body("Canceled") else ResponseEntity.status(HttpStatus.NOT_FOUND).body("Not found!")
}
}

View File

@ -7,6 +7,7 @@ import no.iktdev.eventi.tasks.TaskType
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument
import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.CoordinatorClient
import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.Util import no.iktdev.mediaprocessing.processer.Util
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent
@ -15,7 +16,8 @@ import org.springframework.stereotype.Service
import java.util.* import java.util.*
@Service @Service
class VideoTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}" override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
override fun supports(task: Task) = task is EncodeTask override fun supports(task: Task) = task is EncodeTask
@ -59,16 +61,35 @@ class VideoTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
override fun getFfmpeg(): FFmpeg { override fun getFfmpeg(): FFmpeg {
return VideoFFmpeg(object : FFmpeg.Listener { return VideoFFmpeg(object : FFmpeg.Listener {
var lastProgress: FfmpegDecodedProgress? = null
override fun onStarted(inputFile: String) { override fun onStarted(inputFile: String) {
} }
override fun onCompleted(inputFile: String, outputFile: String) { override fun onCompleted(inputFile: String, outputFile: String) {
currentTask?.let {
coordinatorWebClient.reportProgress(
referenceId = it.referenceId.toString(),
taskId = it.taskId.toString(),
percent = FfmpegDecodedProgress(100, "", lastProgress?.duration ?: "", "0", estimatedCompletion = "", estimatedCompletionSeconds = 0),
""
)
}
} }
override fun onProgressChanged( override fun onProgressChanged(
inputFile: String, inputFile: String,
progress: FfmpegDecodedProgress progress: FfmpegDecodedProgress
) { ) {
lastProgress = progress
currentTask?.let {
coordinatorWebClient.reportProgress(
referenceId = it.referenceId.toString(),
taskId = it.taskId.toString(),
percent = progress,
""
)
}
} }
}) })
} }

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.processer.listeners package no.iktdev.mediaprocessing.processer.listeners
import io.mockk.mockk
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
@ -7,6 +8,7 @@ import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.eventi.tasks.TaskTypeRegistry
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
import no.iktdev.mediaprocessing.processer.CoordinatorClient
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeData import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeData
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask
@ -19,7 +21,7 @@ import kotlin.system.measureTimeMillis
class VideoTaskListenerTest { class VideoTaskListenerTest {
class TestListener(val delay: Long): VideoTaskListener() { class TestListener(val delay: Long, coordinatorClient: CoordinatorClient): VideoTaskListener(coordinatorClient) {
fun getJob() = currentJob fun getJob() = currentJob
private var _result: Event? = null private var _result: Event? = null
@ -50,6 +52,7 @@ class VideoTaskListenerTest {
fun setup() { fun setup() {
TaskTypeRegistry.register(EncodeTask::class.java) TaskTypeRegistry.register(EncodeTask::class.java)
} }
private val coordinatorClient = mockk<CoordinatorClient>(relaxed = true)
@Test @Test
fun `onTask waits for runner to complete`() = runTest { fun `onTask waits for runner to complete`() = runTest {
@ -62,7 +65,7 @@ class VideoTaskListenerTest {
) )
).newReferenceId() ).newReferenceId()
val listener = TestListener(delay) val listener = TestListener(delay, coordinatorClient)
val time = measureTimeMillis { val time = measureTimeMillis {
listener.accept(testTask, overrideReporter) listener.accept(testTask, overrideReporter)