diff --git a/.github/workflows/build-java-app.yml b/.github/workflows/build-java-app.yml new file mode 100644 index 00000000..8d76c8ce --- /dev/null +++ b/.github/workflows/build-java-app.yml @@ -0,0 +1,85 @@ +name: Build Java App + +on: + workflow_call: + inputs: + app: + required: true + type: string + dockerTag: + required: true + type: string + +jobs: + build-java: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Detect frontend + id: detect_web + run: | + if [ -d "apps/${{ inputs.app }}/web" ]; then + echo "has_web=true" >> $GITHUB_OUTPUT + else + echo "has_web=false" >> $GITHUB_OUTPUT + fi + + - name: Build React frontend + if: ${{ steps.detect_web.outputs.has_web == 'true' }} + run: | + cd apps/${{ inputs.app }}/web + npm install + export CI=false + npm run build + + - name: Copy React build into Spring Boot resources + if: ${{ steps.detect_web.outputs.has_web == 'true' }} + run: | + rm -rf apps/${{ inputs.app }}/src/main/resources/static + mkdir -p apps/${{ inputs.app }}/src/main/resources/static + cp -r apps/${{ inputs.app }}/web/build/* apps/${{ inputs.app }}/src/main/resources/static + + - name: Extract version + run: | + VERSION=$(grep '^version' apps/${{ inputs.app }}/build.gradle.kts | sed 's/.*"\(.*\)".*/\1/') + echo "VERSION=$VERSION" >> $GITHUB_ENV + + - name: Build Java module + run: | + chmod +x ./gradlew + ./gradlew :apps:${{ inputs.app }}:bootJar --info --stacktrace + + - name: Build Docker image locally + run: | + docker build \ + -f ./dockerfiles/DebianJava \ + -t local-${{ inputs.app }}:${{ inputs.dockerTag }} \ + --build-arg MODULE_NAME=${{ inputs.app }} \ + --build-arg PASS_APP_VERSION=${{ env.VERSION }} \ + . + + - name: Test Docker container + run: | + docker run --rm local-${{ inputs.app }}:${{ inputs.dockerTag }} /bin/sh -c "echo 'Smoke test OK'" + + - name: Docker login + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_HUB_NAME }} + password: ${{ secrets.DOCKER_HUB_TOKEN }} + + - name: Push Docker image + uses: docker/build-push-action@v5 + with: + context: . + file: ./dockerfiles/DebianJava + build-args: | + MODULE_NAME=${{ inputs.app }} + PASS_APP_VERSION=${{ env.VERSION }} + push: true + tags: | + bskjon/mediaprocessing-${{ inputs.app }}:v5 + bskjon/mediaprocessing-${{ inputs.app }}:v5-${{ inputs.dockerTag }} + bskjon/mediaprocessing-${{ inputs.app }}:v5-${{ github.sha }} diff --git a/.github/workflows/build-python-app.yml b/.github/workflows/build-python-app.yml new file mode 100644 index 00000000..06d7c2af --- /dev/null +++ b/.github/workflows/build-python-app.yml @@ -0,0 +1,73 @@ +name: Build Python App + +on: + workflow_call: + inputs: + app: + required: true + type: string + dockerTag: + required: true + type: string + +jobs: + build-python: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + # Optional: install runtime dependencies if requirements.txt exists + - name: Install runtime dependencies + if: ${{ hashFiles(format('apps/{0}/requirements.txt', inputs.app)) != '' }} + run: | + cd apps/${{ inputs.app }} + pip install -r requirements.txt + + # Install test dependencies (pytest, asyncio test libs, etc.) + - name: Install test dependencies + if: ${{ hashFiles(format('apps/{0}/requirements-test.txt', inputs.app)) != '' }} + run: | + cd apps/${{ inputs.app }} + pip install -r requirements-test.txt + + # Run Python tests + - name: Run Python tests + run: | + cd apps/${{ inputs.app }} + pytest -q + + # Build Docker image locally + - name: Build Docker image locally + run: | + docker build \ + -f ./dockerfiles/Python \ + -t local-${{ inputs.app }}:${{ inputs.dockerTag }} \ + --build-arg MODULE_NAME=${{ inputs.app }} \ + . + + # Smoke-test the container + - name: Test Docker container + run: | + docker run --rm local-${{ inputs.app }}:${{ inputs.dockerTag }} /bin/sh -c "echo 'Smoke test OK'" + + # Login to Docker Hub + - name: Docker login + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_HUB_NAME }} + password: ${{ secrets.DOCKER_HUB_TOKEN }} + + # Push final image + - name: Push Docker image + uses: docker/build-push-action@v5 + with: + context: . + file: ./dockerfiles/Python + build-args: | + MODULE_NAME=${{ inputs.app }} + push: true + tags: | + bskjon/mediaprocessing-${{ inputs.app }}:v5 + bskjon/mediaprocessing-${{ inputs.app }}:v5-${{ inputs.dockerTag }} + bskjon/mediaprocessing-${{ inputs.app }}:v5-${{ github.sha }} diff --git a/.github/workflows/build-shared.yml b/.github/workflows/build-shared.yml new file mode 100644 index 00000000..676d7f01 --- /dev/null +++ b/.github/workflows/build-shared.yml @@ -0,0 +1,26 @@ +name: Build Shared + +on: + workflow_call: + inputs: + dockerTag: + required: true + type: string + +jobs: + build-shared: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Cache Gradle + uses: actions/cache@v4 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }} + + - name: Build Shared module + run: | + chmod +x ./gradlew + ./gradlew :shared:build --info --stacktrace diff --git a/.github/workflows/build-v5.yml b/.github/workflows/build-v5.yml new file mode 100644 index 00000000..66e52e84 --- /dev/null +++ b/.github/workflows/build-v5.yml @@ -0,0 +1,103 @@ +name: Build v5 + +on: + push: + branches: [ v5 ] + pull_request: + branches: [ v5 ] + workflow_dispatch: + +jobs: + pre-check: + runs-on: ubuntu-latest + outputs: + shared: ${{ steps.filter.outputs.shared }} + processer: ${{ steps.filter.outputs.processer }} + converter: ${{ steps.filter.outputs.converter }} + coordinator: ${{ steps.filter.outputs.coordinator }} + ui: ${{ steps.filter.outputs.ui }} + pyMetadata: ${{ steps.filter.outputs.metadata }} + pyWatcher: ${{ steps.filter.outputs.watcher }} + dockerTag: ${{ steps.tag.outputs.tag }} + + steps: + - uses: actions/checkout@v4 + + - name: Detect changes + id: filter + uses: dorny/paths-filter@v3 + with: + filters: | + shared: + - 'shared/**' + processer: + - 'apps/processer/**' + converter: + - 'apps/converter/**' + coordinator: + - 'apps/coordinator/**' + ui: + - 'apps/ui/**' + metadata: + - 'apps/pyMetadata/**' + watcher: + - 'apps/pyWatcher/**' + + - name: Generate docker tag + id: tag + run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> $GITHUB_OUTPUT + + + build-shared: + needs: pre-check + if: ${{ needs.pre-check.outputs.shared == 'true' || github.event_name == 'workflow_dispatch' }} + uses: ./.github/workflows/build-shared.yml + with: + dockerTag: ${{ needs.pre-check.outputs.dockerTag }} + + + build-apps: + needs: [pre-check, build-shared] + runs-on: ubuntu-latest + + strategy: + matrix: + include: + - app: processer + type: java + enabled: true + - app: converter + type: java + enabled: true + - app: coordinator + type: java + enabled: true + - app: ui + type: java + enabled: false + - app: pyMetadata + type: python + enabled: true + - app: pyWatcher + type: python + enabled: true + + if: ${{ matrix.enabled == true && + (needs.pre-check.outputs[matrix.app] == 'true' + || needs.pre-check.outputs.shared == 'true' + || github.event_name == 'workflow_dispatch') }} + + steps: + - name: Call Java workflow + if: ${{ matrix.type == 'java' }} + uses: ./.github/workflows/build-java-app.yml + with: + app: ${{ matrix.app }} + dockerTag: ${{ needs.pre-check.outputs.dockerTag }} + + - name: Call Python workflow + if: ${{ matrix.type == 'python' }} + uses: ./.github/workflows/build-python-app.yml + with: + app: ${{ matrix.app }} + dockerTag: ${{ needs.pre-check.outputs.dockerTag }} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorService.kt new file mode 100644 index 00000000..c5fb5a64 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorService.kt @@ -0,0 +1,18 @@ +package no.iktdev.mediaprocessing.coordinator + +import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate +import org.springframework.stereotype.Service +import java.util.concurrent.ConcurrentHashMap + +@Service +class CoordinatorService { + + private val progressMap = ConcurrentHashMap() + + fun updateProgress(update: ProgressUpdate) { + progressMap[update.taskId] = update + } + + fun getProgress(taskId: String): ProgressUpdate? = + progressMap[taskId] +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/InternalProcesserController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/InternalProcesserController.kt new file mode 100644 index 00000000..12a6de90 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/InternalProcesserController.kt @@ -0,0 +1,22 @@ +package no.iktdev.mediaprocessing.coordinator.controller + +import no.iktdev.mediaprocessing.coordinator.CoordinatorService +import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RestController + +@RestController +@RequestMapping("/internal") +class InternalProcesserController( + private val coordinator: CoordinatorService +) { + + @PostMapping("/progress") + fun receiveProgress(@RequestBody update: ProgressUpdate): ResponseEntity { + coordinator.updateProgress(update) + return ResponseEntity.ok().build() + } +} diff --git a/apps/processer/build.gradle.kts b/apps/processer/build.gradle.kts index 1e1fcacb..d5032369 100644 --- a/apps/processer/build.gradle.kts +++ b/apps/processer/build.gradle.kts @@ -24,10 +24,11 @@ repositories { dependencies { /*Spring boot*/ + implementation("org.springframework.boot:spring-boot-starter") implementation("org.springframework.boot:spring-boot-starter-web") - implementation("org.springframework.boot:spring-boot-starter:2.7.0") - // implementation("org.springframework.kafka:spring-kafka:3.0.1") - implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") + implementation("org.springframework.boot:spring-boot-starter-webflux") + + // implementation("org.springframework.kafka:spring-kafka:3.0.1") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClient.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClient.kt new file mode 100644 index 00000000..af5a267e --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClient.kt @@ -0,0 +1,24 @@ +package no.iktdev.mediaprocessing.processer + +import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress +import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate +import org.springframework.http.MediaType +import org.springframework.stereotype.Component +import org.springframework.web.reactive.function.client.WebClient + +@Component +class CoordinatorClient( + private val webClient: WebClient +) { + + fun reportProgress(referenceId: String, taskId: String, percent: FfmpegDecodedProgress, message: String?) = + webClient.post() + .uri("/internal/progress") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue( + ProgressUpdate(referenceId, taskId, percent, message) + ) + .retrieve() + .toBodilessEntity() + +} diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClientConfig.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClientConfig.kt new file mode 100644 index 00000000..63350e2d --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClientConfig.kt @@ -0,0 +1,19 @@ +package no.iktdev.mediaprocessing.processer + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.web.reactive.function.client.WebClient + +@Configuration +class CoordinatorClientConfig { + + @Bean + fun coordinatorWebClient(builder: WebClient.Builder): WebClient { + val baseUrl = ProcesserEnv.coordinatorUrl + ?: error("COORDINATOR_URL must be set") + + return builder + .baseUrl(baseUrl) + .build() + } +} diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt index 33b741f1..d96cca75 100755 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt @@ -5,7 +5,7 @@ import java.io.File class ProcesserEnv { companion object { - val wsAllowedOrigins: String = System.getenv("AllowedOriginsWebsocket")?.takeIf { it.isNotBlank() } ?: "" + val coordinatorUrl = System.getenv("COORDINATOR_URL") ?: "http://coordinator:8080" val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg" val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/CancelController.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/CancelController.kt new file mode 100644 index 00000000..ebb6a46c --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/CancelController.kt @@ -0,0 +1,35 @@ +package no.iktdev.mediaprocessing.processer.controller + +import no.iktdev.mediaprocessing.processer.listeners.SubtitleTaskListener +import no.iktdev.mediaprocessing.processer.listeners.VideoTaskListener +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.stereotype.Controller +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping + +@Controller +class CancelController { + @Autowired + lateinit var videoTaskListener: VideoTaskListener + @Autowired + lateinit var subtitleTaskListener: SubtitleTaskListener + + @RequestMapping("/cancel/single") + fun cancelTask(@RequestBody eventId: String? = null): ResponseEntity { + if (eventId.isNullOrBlank()) { + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("No eventId provided!") + } + var canceled: Boolean = false + if (videoTaskListener.currentTaskId?.toString() == eventId) { + videoTaskListener.currentJob?.cancel() + canceled = true + } + if (subtitleTaskListener.currentTaskId?.toString() == eventId) { + subtitleTaskListener.currentJob?.cancel() + canceled = true + } + return if (canceled) ResponseEntity.status(HttpStatus.FOUND).body("Canceled") else ResponseEntity.status(HttpStatus.NOT_FOUND).body("Not found!") + } +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt index b49726ed..f9ba8bdd 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt @@ -7,6 +7,7 @@ import no.iktdev.eventi.tasks.TaskType import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress +import no.iktdev.mediaprocessing.processer.CoordinatorClient import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.processer.Util import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent @@ -15,7 +16,8 @@ import org.springframework.stereotype.Service import java.util.* @Service -class VideoTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { +class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): FfmpegTaskListener(TaskType.CPU_INTENSIVE) { + override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}" override fun supports(task: Task) = task is EncodeTask @@ -59,16 +61,35 @@ class VideoTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { override fun getFfmpeg(): FFmpeg { return VideoFFmpeg(object : FFmpeg.Listener { + var lastProgress: FfmpegDecodedProgress? = null override fun onStarted(inputFile: String) { } override fun onCompleted(inputFile: String, outputFile: String) { + currentTask?.let { + coordinatorWebClient.reportProgress( + referenceId = it.referenceId.toString(), + taskId = it.taskId.toString(), + percent = FfmpegDecodedProgress(100, "", lastProgress?.duration ?: "", "0", estimatedCompletion = "", estimatedCompletionSeconds = 0), + "" + ) + } } override fun onProgressChanged( inputFile: String, progress: FfmpegDecodedProgress ) { + lastProgress = progress + currentTask?.let { + coordinatorWebClient.reportProgress( + referenceId = it.referenceId.toString(), + taskId = it.taskId.toString(), + percent = progress, + "" + ) + } + } }) } diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt index fd402198..37274057 100644 --- a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.processer.listeners +import io.mockk.mockk import kotlinx.coroutines.test.runTest import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task @@ -7,6 +8,7 @@ import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.mediaprocessing.ffmpeg.FFmpeg +import no.iktdev.mediaprocessing.processer.CoordinatorClient import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeData import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask @@ -19,7 +21,7 @@ import kotlin.system.measureTimeMillis class VideoTaskListenerTest { - class TestListener(val delay: Long): VideoTaskListener() { + class TestListener(val delay: Long, coordinatorClient: CoordinatorClient): VideoTaskListener(coordinatorClient) { fun getJob() = currentJob private var _result: Event? = null @@ -50,6 +52,7 @@ class VideoTaskListenerTest { fun setup() { TaskTypeRegistry.register(EncodeTask::class.java) } + private val coordinatorClient = mockk(relaxed = true) @Test fun `onTask waits for runner to complete`() = runTest { @@ -62,7 +65,7 @@ class VideoTaskListenerTest { ) ).newReferenceId() - val listener = TestListener(delay) + val listener = TestListener(delay, coordinatorClient) val time = measureTimeMillis { listener.accept(testTask, overrideReporter)