diff --git a/apps/py-metadata/api/health_api.py b/apps/py-metadata/api/health_api.py new file mode 100644 index 00000000..ac58dd6b --- /dev/null +++ b/apps/py-metadata/api/health_api.py @@ -0,0 +1,50 @@ +# health_api.py +import time +from fastapi import FastAPI +from fastapi.responses import JSONResponse + +app = FastAPI() + +# Disse settes av app.py +db = None +get_worker_heartbeat = None + + +def init_health_api(database, heartbeat_ref): + """ + Kalles fra app.py for å gi health-API tilgang til DB og worker-heartbeat. + """ + global db, get_worker_heartbeat + db = database + get_worker_heartbeat = heartbeat_ref + + +@app.get("/health") +async def health(): + db_ok = False + worker_ok = False + + # Sjekk database + try: + db.ping() + db_ok = True + except Exception: + db_ok = False + + # Sjekk worker heartbeat + try: + last = get_worker_heartbeat() + worker_ok = (time.time() - last) < 10 # 10 sekunder uten heartbeat = død + except Exception: + worker_ok = False + + status = db_ok and worker_ok + + return JSONResponse( + status_code=200 if status else 500, + content={ + "status": "ok" if status else "error", + "database": db_ok, + "worker": worker_ok + } + ) diff --git a/apps/py-metadata/app.py b/apps/py-metadata/app.py index b12a2e00..5715125f 100644 --- a/apps/py-metadata/app.py +++ b/apps/py-metadata/app.py @@ -1,18 +1,35 @@ import signal import sys +from threading import Thread +from api.health_api import init_health_api from config.database_config import DatabaseConfig from db.database import Database from utils.logger import logger from worker.poller import run_worker +import uvicorn # global flag for shutdown shutdown_flag = False +worker_heartbeat = 0 def handle_shutdown(signum, frame): global shutdown_flag logger.info("🛑 Shutdown signal mottatt, avslutter worker...") shutdown_flag = True +def set_heartbeat(ts): + global worker_heartbeat + worker_heartbeat = ts + +def get_heartbeat(): + return worker_heartbeat + +def start_health_server(): + """ Starter FastAPI health-server i egen tråd. """ + uvicorn.run(health_app, host="0.0.0.0", port=8080, log_level="warning") + + + def main(): # registrer signal handlers for graceful shutdown signal.signal(signal.SIGINT, handle_shutdown) @@ -23,7 +40,16 @@ def main(): config: DatabaseConfig = DatabaseConfig.from_env() db: Database = Database(config) db.connect() - run_worker(db=db, shutdown_flag_ref=lambda: shutdown_flag) + + # Init health-API med DB og heartbeat-ref + init_health_api(db, get_heartbeat) + + # Start health-server i egen tråd + Thread(target=start_health_server, daemon=True).start() + logger.info("🌡️ Health API startet på port 8080") + + + run_worker(db=db, shutdown_flag_ref=lambda: shutdown_flag, heartbeat_ref=lambda ts: set_heartbeat(ts)) except Exception as e: logger.error(f"❌ Kritisk feil i app: {e}") sys.exit(1) diff --git a/apps/py-metadata/db/database.py b/apps/py-metadata/db/database.py index 89340260..c26ed07c 100644 --- a/apps/py-metadata/db/database.py +++ b/apps/py-metadata/db/database.py @@ -1,3 +1,4 @@ +from ctypes import Union from config.database_config import DatabaseConfig from utils.logger import logger import mysql.connector @@ -51,3 +52,15 @@ class Database: cursor = self.conn.cursor(dictionary=True) cursor.execute(sql, params or ()) return cursor.fetchall() + + def ping(self): + try: + self.validate() + cursor = self.conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + return True + except Exception as e: + logger.error(f"Ping failed: {e}") + return False + diff --git a/apps/py-metadata/requirements.txt b/apps/py-metadata/requirements.txt index 2cf8ef7c..9f3f963b 100644 --- a/apps/py-metadata/requirements.txt +++ b/apps/py-metadata/requirements.txt @@ -7,4 +7,6 @@ mal-api>=0.5.3 Unidecode>=1.3.8 tabulate>=0.9.0 mysql-connector-python>=9.0.0 -pydantic>=2.12.5 \ No newline at end of file +pydantic>=2.12.5 +fastapi==0.124.4 +uvicorn==0.28.0 \ No newline at end of file diff --git a/apps/py-metadata/worker/poller.py b/apps/py-metadata/worker/poller.py index c743eea0..b6880c8c 100644 --- a/apps/py-metadata/worker/poller.py +++ b/apps/py-metadata/worker/poller.py @@ -47,11 +47,14 @@ def run_iteration(db: Database, worker_id: str, poll_interval: int) -> tuple[int db.connect() return poll_interval, 5 -def run_worker(db: Database, shutdown_flag_ref=lambda: False) -> None: +def run_worker(db: Database, shutdown_flag_ref=lambda: False, heartbeat_ref=None) -> None: poll_interval: int = 5 worker_id = f"worker-{uuid.uuid4()}" while not shutdown_flag_ref(): + if heartbeat_ref: + heartbeat_ref(time.time()) + sleep_interval, poll_interval = run_iteration(db, worker_id, poll_interval) time.sleep(sleep_interval) diff --git a/apps/py-watcher/api/health_api.py b/apps/py-watcher/api/health_api.py index 8e70b572..784adc4b 100644 --- a/apps/py-watcher/api/health_api.py +++ b/apps/py-watcher/api/health_api.py @@ -1,19 +1,49 @@ +# api/health_api.py +import time from fastapi import FastAPI from fastapi.responses import JSONResponse -def create_health_app(observers_ref): +def create_health_app(observers_ref, db_ref, heartbeat_ref): """ Returnerer en FastAPI-app med /health endpoint. - observers_ref: en funksjon eller lambda som gir listen av observers. + observers_ref: lambda -> liste av observer-tråder + db_ref: lambda -> Database-objekt + heartbeat_ref: lambda -> siste worker heartbeat timestamp """ app = FastAPI() @app.get("/health") def health(): + # Sjekk observers observers = observers_ref() - healthy = all(obs.is_alive() for obs in observers) - status = "healthy" if healthy else "unhealthy" - code = 200 if healthy else 500 - return JSONResponse({"status": status}, status_code=code) + observers_ok = all(obs.is_alive() for obs in observers) + + # Sjekk database + db_ok = False + try: + db_ref().ping() + db_ok = True + except Exception: + db_ok = False + + # Sjekk worker heartbeat + worker_ok = False + try: + last = heartbeat_ref() + worker_ok = (time.time() - last) < 10 + except Exception: + worker_ok = False + + healthy = observers_ok and db_ok and worker_ok + + return JSONResponse( + status_code=200 if healthy else 500, + content={ + "status": "healthy" if healthy else "unhealthy", + "observers": observers_ok, + "database": db_ok, + "worker": worker_ok + } + ) return app diff --git a/apps/py-watcher/app.py b/apps/py-watcher/app.py index e23a92d7..31a56d15 100644 --- a/apps/py-watcher/app.py +++ b/apps/py-watcher/app.py @@ -1,7 +1,10 @@ +# app.py import asyncio import signal import sys +import time import uvicorn + from api.health_api import create_health_app from config.database_config import DatabaseConfig from db.database import Database @@ -12,17 +15,31 @@ from utils.logger import logger # global flag for shutdown shutdown_flag = False observers = [] +worker_heartbeat = time.time() + def handle_shutdown(signum, frame): global shutdown_flag logger.info("🛑 Shutdown signal mottatt, avslutter worker...") shutdown_flag = True + +def set_heartbeat(ts): + global worker_heartbeat + worker_heartbeat = ts + + +def get_heartbeat(): + return worker_heartbeat + + async def run_worker(db: Database, paths, extensions, shutdown_flag_ref): global observers - observers = observers = [start_observer(db, [p], extensions, insert_event) for p in paths] + observers = [start_observer(db, [p], extensions, insert_event) for p in paths] + try: while not shutdown_flag_ref(): + set_heartbeat(time.time()) await asyncio.sleep(5) finally: logger.info("🛑 Stopper observer...") @@ -33,33 +50,44 @@ async def run_worker(db: Database, paths, extensions, shutdown_flag_ref): return observers + def main(): # registrer signal handlers for graceful shutdown signal.signal(signal.SIGINT, handle_shutdown) signal.signal(signal.SIGTERM, handle_shutdown) logger.info("🚀 Starter worker-applikasjon") + try: + # DB config: DatabaseConfig = DatabaseConfig.from_env() db: Database = Database(config) db.connect() - # paths og extensions fra PathsConfig + # paths og extensions from config.paths_config import PathsConfig paths_config = PathsConfig.from_env() paths_config.validate() + # start worker loop = asyncio.get_event_loop() loop.create_task(run_worker(db, paths_config.watch_paths, paths_config.extensions, lambda: shutdown_flag)) - # bruk health_api - app = create_health_app(lambda: observers) + # health API + app = create_health_app( + observers_ref=lambda: observers, + db_ref=lambda: db, + heartbeat_ref=get_heartbeat + ) + uvicorn.run(app, host="0.0.0.0", port=8000) + except Exception as e: logger.error(f"❌ Kritisk feil i app: {e}") sys.exit(1) logger.info("👋 Worker avsluttet gracefully") + if __name__ == "__main__": main() diff --git a/apps/py-watcher/db/database.py b/apps/py-watcher/db/database.py index 89340260..3ef2db53 100644 --- a/apps/py-watcher/db/database.py +++ b/apps/py-watcher/db/database.py @@ -51,3 +51,15 @@ class Database: cursor = self.conn.cursor(dictionary=True) cursor.execute(sql, params or ()) return cursor.fetchall() + + def ping(self): + try: + self.validate() + cursor = self.conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + return True + except Exception as e: + logger.error(f"Ping failed: {e}") + return False +