diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegRunner.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegRunner.kt index c6c9b9db..c5528149 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegRunner.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegRunner.kt @@ -20,8 +20,10 @@ class FfmpegRunner( val arguments: List, private val listener: FfmpegListener, val logDir: File - ) { + val workOutputFile = "$outputFile.work" + + val currentDateTime = LocalDateTime.now() val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd.HH.mm") val formattedDateTime = currentDateTime.format(formatter) @@ -39,9 +41,10 @@ class FfmpegRunner( } fun run(progress: Boolean = false) { + log.info { "Work file can be found at $workOutputFile" } val args = FfmpegArgumentsBuilder() .inputFile(inputFile) - .outputFile(outputFile) + .outputFile(workOutputFile) .args(arguments) .allowOverwrite(ProcesserEnv.allowOverwrite) .withProgress(progress) @@ -76,9 +79,19 @@ class FfmpegRunner( val result = processOp onOutputChanged("Received exit code: ${result.resultCode}") if (result.resultCode != 0) { + log.warn { "Work outputfile is orphaned and could be found using this path:\n$workOutputFile" } listener.onError(inputFile, result.output.joinToString("\n")) } else { - listener.onCompleted(inputFile, outputFile) + log.info { "Converting work file to output file: $workOutputFile -> $outputFile" } + val success = File(workOutputFile).renameTo(File(outputFile)) + if (!success) { + val outMessage = "Could not convert file $workOutputFile -> $outputFile" + log.error { outMessage } + listener.onError(inputFile, outMessage) + } else { + listener.onCompleted(inputFile, outputFile) + } + } } diff --git a/apps/pyMetadata/app.py b/apps/pyMetadata/app.py index 5abac058..a2f2b54e 100644 --- a/apps/pyMetadata/app.py +++ b/apps/pyMetadata/app.py @@ -10,6 +10,7 @@ import time from fuzzywuzzy import fuzz import mysql.connector from datetime import datetime +import asyncio import mysql.connector.cursor @@ -64,8 +65,8 @@ class EventsPullerThread(threading.Thread): GROUP BY referenceId HAVING SUM(event = 'event:media-read-base-info:performed') > 0 - AND SUM(event = 'event:media-metadata-search:performed') = 0 - AND SUM(event = 'event:media-process:completed') = 0 + AND SUM(event = 'event:media-metadata-search:performed') != 0 + AND SUM(event = 'event:media-process:completed') != 0 ) AND event = 'event:media-read-base-info:performed'; """) @@ -122,7 +123,7 @@ Found message logger.info(logMessage) event: MediaEvent = json_to_media_event(row["data"]) - producedEvent = MetadataEventHandler(event).run() + producedEvent = asyncio.run(MetadataEventHandler(event).run()) producedMessage = f""" ============================================================================ @@ -171,20 +172,19 @@ Producing message global should_stop should_stop = True -class MetadataEventHandler(): +class MetadataEventHandler: mediaEvent: MediaEvent | None = None + def __init__(self, data: MediaEvent): super().__init__() - self.mediaEvent = None - self.mediaEvent = data logger.info(self.mediaEvent) - def run(self) -> MediaEvent: + async def run(self) -> MediaEvent | None: logger.info("Starting search") - if (self.mediaEvent is None): + if self.mediaEvent is None: logger.error("Event does not contain anything...") - return + return None event: MediaEvent = self.mediaEvent @@ -194,48 +194,49 @@ class MetadataEventHandler(): event.data.sanitizedName ]) - joinedTitles = "\n".join(searchableTitles) logger.info("Searching for: %s", joinedTitles) - result: Metadata | None = self.__getMetadata(searchableTitles) + + # Kjør den asynkrone søkemetoden + result: Metadata | None = await self.__getMetadata(searchableTitles) result_message: str | None = None - if (result is None): + if result is None: result_message = f"No result for {joinedTitles}" logger.info(result_message) - producedEvent = MediaEvent( - metadata = EventMetadata( + metadata=EventMetadata( referenceId=event.metadata.referenceId, eventId=str(uuid.uuid4()), derivedFromEventId=event.metadata.eventId, - status= "Failed" if result is None else "Success", - created= datetime.now().isoformat() + status="Failed" if result is None else "Success", + created=datetime.now().isoformat() ), data=result, eventType="EventMediaMetadataSearchPerformed" ) return producedEvent - - def __getMetadata(self, titles: List[str]) -> Metadata | None: + async def __getMetadata(self, titles: List[str]) -> Metadata | None: mal = Mal(titles=titles) anii = Anii(titles=titles) imdb = Imdb(titles=titles) - results: List[Metadata] = [ + results: List[Metadata | None] = await asyncio.gather( mal.search(), anii.search(), imdb.search() - ] + ) + filtered_results = [result for result in results if result is not None] logger.info("\nSimple matcher") simpleSelector = SimpleMatcher(titles=titles, metadata=filtered_results).getBestMatch() logger.info("\nAdvanced matcher") advancedSelector = AdvancedMatcher(titles=titles, metadata=filtered_results).getBestMatch() - logger.info("\nPrefrix matcher") + logger.info("\nPrefix matcher") prefixSelector = PrefixMatcher(titles=titles, metadata=filtered_results).getBestMatch() + if simpleSelector is not None: return simpleSelector if advancedSelector is not None: @@ -244,7 +245,6 @@ class MetadataEventHandler(): return prefixSelector return None - # Global variabel for å indikere om applikasjonen skal avsluttes should_stop = False diff --git a/apps/pyMetadata/sources/anii.py b/apps/pyMetadata/sources/anii.py index bc825bb0..32a2ed13 100644 --- a/apps/pyMetadata/sources/anii.py +++ b/apps/pyMetadata/sources/anii.py @@ -1,32 +1,35 @@ import logging, sys import hashlib -from typing import List +from typing import List, Dict, Optional from clazz.Metadata import Metadata, Summary from .source import SourceBase from AnilistPython import Anilist +import asyncio log = logging.getLogger(__name__) class Anii(SourceBase): - def __init__(self, titles: List[str]) -> None: super().__init__(titles) - def search(self) -> Metadata | None: - idToTitle: dict[str, str] = {} - results: dict[str, str] = {} + async def search(self) -> Optional[Metadata]: + idToTitle: Dict[str, str] = {} + results: Dict[str, Dict] = {} try: for title in self.titles: try: - result = Anilist().get_anime(title) + result = await asyncio.to_thread(Anilist().get_anime, title) if result: _title = result.get("name_english", None) - givenId = self.generate_id(_title) - if givenId: - idToTitle[givenId] = _title - results[givenId] = result + if _title is None: + _title = result.get("name_romaji", None) + if _title is not None: + givenId = await asyncio.to_thread(self.generate_id, _title) + if givenId: + idToTitle[givenId] = _title + results[givenId] = result except IndexError as notFound: pass except Exception as e: @@ -37,50 +40,44 @@ class Anii(SourceBase): except Exception as e: log.exception(e) - if not idToTitle or not results: self.logNoMatch("Anii", titles=self.titles) return None - best_match_id, best_match_title = self.findBestMatchAcrossTitles(idToTitle, self.titles) + best_match_id, best_match_title = await asyncio.to_thread(self.findBestMatchAcrossTitles, idToTitle, self.titles) - return self.__getMetadata(results[best_match_id]) - - def queryIds(self, title: str) -> dict[str, str]: - return super().queryIds(title) + return await self.__getMetadata(results[best_match_id]) + async def queryIds(self, title: str) -> Dict[str, str]: + return await asyncio.to_thread(super().queryIds, title) - def __getMetadata(self, result: dict) -> Metadata: + async def __getMetadata(self, result: Dict) -> Optional[Metadata]: try: summary = result.get("desc", None) return Metadata( - title = result.get("name_english", None), - altTitle = [result.get("name_romaji", [])], - cover = result.get("cover_image", None), - banner = None, - summary = [] if summary is None else [ + title=result.get("name_english", None), + altTitle=[result.get("name_romaji", [])], + cover=result.get("cover_image", None), + banner=None, + summary=[] if summary is None else [ Summary( - language = "eng", - summary = summary + language="eng", + summary=summary ) ], - type = self.getMediaType(result.get('airing_format', '')), - genres = result.get('genres', []), + type=self.getMediaType(result.get('airing_format', '')), + genres=result.get('genres', []), source="anii", ) except Exception as e: log.exception(e) return None - - def generate_id(self, text: str) -> str | None: + async def generate_id(self, text: str) -> Optional[str]: if text: - return hashlib.md5(text.encode()).hexdigest() + return await asyncio.to_thread(hashlib.md5, text.encode()).hexdigest() return None - + def getMediaType(self, type: str) -> str: return 'movie' if type.lower() == 'movie' else 'serie' - - - diff --git a/apps/pyMetadata/sources/imdb.py b/apps/pyMetadata/sources/imdb.py index 2627b612..3ae0e9f7 100644 --- a/apps/pyMetadata/sources/imdb.py +++ b/apps/pyMetadata/sources/imdb.py @@ -2,23 +2,23 @@ import logging from imdb import Cinemagoer from imdb.Movie import Movie -from typing import List +from typing import List, Dict, Optional from clazz.Metadata import Metadata, Summary from .source import SourceBase +import asyncio log = logging.getLogger(__name__) class Imdb(SourceBase): - def __init__(self, titles: List[str]) -> None: super().__init__(titles) - def search(self) -> Metadata | None: - idToTitle: dict[str, str] = {} + async def search(self) -> Optional[Metadata]: + idToTitle: Dict[str, str] = {} for title in self.titles: - receivedIds = self.queryIds(title) + receivedIds = await self.queryIds(title) for id, title in receivedIds.items(): idToTitle[id] = title @@ -28,38 +28,40 @@ class Imdb(SourceBase): best_match_id, best_match_title = self.findBestMatchAcrossTitles(idToTitle, self.titles) - return self.__getMetadata(best_match_id) + return await self.__getMetadata(best_match_id) - def queryIds(self, title: str) -> dict[str, str]: - idToTitle: dict[str, str] = {} + async def queryIds(self, title: str) -> Dict[str, str]: + idToTitle: Dict[str, str] = {} try: - search = Cinemagoer().search_movie(title) + search = await asyncio.to_thread(Cinemagoer().search_movie, title) cappedResult: List[Movie] = search[:5] - usable: List[Movie] = [found for found in cappedResult if self.isMatchOrPartial("Imdb", title, found._getitem("title"))] + usable = [ + found for found in cappedResult if await asyncio.to_thread(self.isMatchOrPartial, "Imdb", title, found.get("title")) + ] for item in usable: - idToTitle[item.movieID] = item._getitem("title") + idToTitle[item.movieID] = item.get("title") except Exception as e: log.exception(e) return idToTitle - def __getMetadata(self, id: str) -> Metadata | None: + async def __getMetadata(self, id: str) -> Optional[Metadata]: try: - result = Cinemagoer().get_movie(id) + result = await asyncio.to_thread(Cinemagoer().get_movie, id) summary = result.get("plot outline", None) return Metadata( - title = result.get("title", None), - altTitle = [result.get("localized title", [])], - cover = result.get("cover url", None), - banner = None, - summary = [] if summary is None else [ + title=result.get("title", None), + altTitle=[result.get("localized title", [])], + cover=result.get("cover url", None), + banner=None, + summary=[] if summary is None else [ Summary( - language = "eng", - summary = summary + language="eng", + summary=summary ) ], - type = self.getMediaType(result.get('kind', '')), - genres = result.get('genres', []), + type=self.getMediaType(result.get('kind', '')), + genres=result.get('genres', []), source="imdb", ) except Exception as e: diff --git a/apps/pyMetadata/sources/mal.py b/apps/pyMetadata/sources/mal.py index f7b7d53d..931529bb 100644 --- a/apps/pyMetadata/sources/mal.py +++ b/apps/pyMetadata/sources/mal.py @@ -1,71 +1,73 @@ import logging, sys -from typing import List +from typing import Dict, List, Optional from clazz.Metadata import Metadata, Summary from .source import SourceBase from mal import Anime, AnimeSearch, AnimeSearchResult +import asyncio log = logging.getLogger(__name__) class Mal(SourceBase): - """""" def __init__(self, titles: List[str]) -> None: super().__init__(titles) - def search(self) -> Metadata | None: - idToTitle: dict[str, str] = {} + async def search(self) -> Optional[Metadata]: + idToTitle: Dict[str, str] = {} for title in self.titles: - receivedIds = self.queryIds(title) + receivedIds = await self.queryIds(title) for id, title in receivedIds.items(): idToTitle[id] = title - + if not idToTitle: self.logNoMatch("MAL", titles=self.titles) return None best_match_id, best_match_title = self.findBestMatchAcrossTitles(idToTitle, self.titles) - return self.__getMetadata(best_match_id) - - def queryIds(self, title: str) -> dict[str, str]: - idToTitle: dict[str, str] = {} + return await self.__getMetadata(best_match_id) + + async def queryIds(self, title: str) -> Dict[str, str]: + idToTitle: Dict[str, str] = {} try: - search = AnimeSearch(title) + search = await asyncio.to_thread(AnimeSearch, title) cappedResult: List[AnimeSearchResult] = search.results[:5] - usable: List[AnimeSearchResult] = [found for found in cappedResult if self.isMatchOrPartial("MAL", title, found.title)] + usable = [ + found for found in cappedResult if await asyncio.to_thread(self.isMatchOrPartial, "MAL", title, found.title) + ] for item in usable: log.info(f"malId: {item.mal_id} to {item.title}") idToTitle[item.mal_id] = item.title except Exception as e: log.exception(e) return idToTitle - - def __getMetadata(self, id: str): + + async def __getMetadata(self, id: str) -> Optional[Metadata]: try: - anime = Anime(id) + anime = await asyncio.to_thread(Anime, id) return Metadata( - title = anime.title, - altTitle = [altName for altName in [anime.title_english, *anime.title_synonyms] if altName], - cover = anime.image_url, - banner = None, - summary = [] if anime.synopsis is None else [ - Summary( - language = "eng", - summary = anime.synopsis - ) - ], - type = self.getMediaType(anime.type), - genres = anime.genres, - source="mal", - ) + title=anime.title, + altTitle=[altName for altName in [anime.title_english, *anime.title_synonyms] if altName], + cover=anime.image_url, + banner=None, + summary=[] if anime.synopsis is None else [ + Summary( + language="eng", + summary=anime.synopsis + ) + ], + type=self.getMediaType(anime.type), + genres=anime.genres, + source="mal", + ) except Exception as e: log.exception(e) return None - + def getMediaType(self, type: str) -> str: return 'movie' if type.lower() == 'movie' else 'serie' \ No newline at end of file diff --git a/apps/pyMetadata/sources/source.py b/apps/pyMetadata/sources/source.py index aaa09637..8af2355e 100644 --- a/apps/pyMetadata/sources/source.py +++ b/apps/pyMetadata/sources/source.py @@ -6,6 +6,8 @@ from typing import List, Tuple from fuzzywuzzy import fuzz from clazz.Metadata import Metadata +import asyncio + log = logging.getLogger(__name__) @@ -17,11 +19,11 @@ class SourceBase(ABC): self.titles = titles @abstractmethod - def search(self, ) -> Metadata | None: + async def search(self, ) -> Metadata | None: pass @abstractmethod - def queryIds(self, title: str) -> dict[str, str]: + async def queryIds(self, title: str) -> dict[str, str]: pass def isMatchOrPartial(self, source: str | None, title, foundTitle) -> bool: diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index 2fe50f41..667be2e6 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -112,7 +112,6 @@ abstract class EventCoordinator> { } pullDelay.set(slowPullDelay.get()) } - referencePool.values.awaitAll() } waitForConditionOrTimeout(pullDelay.get()) { newEventProduced