Adjustments
This commit is contained in:
parent
fa8be8f08a
commit
93de9b4497
@ -1,60 +1,78 @@
|
|||||||
import signal
|
import signal
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
import uvicorn
|
||||||
|
|
||||||
from api.health_api import app as health_app, init_health_api
|
from api.health_api import app as health_app, init_health_api
|
||||||
from config.database_config import DatabaseConfig
|
from config.database_config import DatabaseConfig
|
||||||
from db.database import Database
|
from db.database import Database
|
||||||
from utils.logger import logger
|
from utils.logger import logger
|
||||||
from worker.poller import run_worker
|
from worker.poller import run_worker
|
||||||
import uvicorn
|
|
||||||
|
|
||||||
# global flag for shutdown
|
|
||||||
shutdown_flag = False
|
shutdown_flag = False
|
||||||
worker_heartbeat = 0
|
worker_heartbeat = time.time()
|
||||||
|
|
||||||
|
|
||||||
def handle_shutdown(signum, frame):
|
def handle_shutdown(signum, frame):
|
||||||
global shutdown_flag
|
global shutdown_flag
|
||||||
logger.info("🛑 Shutdown signal mottatt, avslutter worker...")
|
logger.info("🛑 Shutdown signal mottatt, avslutter worker...")
|
||||||
shutdown_flag = True
|
shutdown_flag = True
|
||||||
|
|
||||||
def set_heartbeat(ts):
|
|
||||||
global worker_heartbeat
|
|
||||||
worker_heartbeat = ts
|
|
||||||
|
|
||||||
def get_heartbeat():
|
def set_heartbeat(ts):
|
||||||
return worker_heartbeat
|
global worker_heartbeat
|
||||||
|
worker_heartbeat = ts
|
||||||
|
|
||||||
def start_health_server():
|
|
||||||
""" Starter FastAPI health-server i egen tråd. """
|
def get_heartbeat():
|
||||||
|
return worker_heartbeat
|
||||||
|
|
||||||
|
|
||||||
|
def start_health_server():
|
||||||
uvicorn.run(health_app, host="0.0.0.0", port=8080, log_level="warning")
|
uvicorn.run(health_app, host="0.0.0.0", port=8080, log_level="warning")
|
||||||
|
|
||||||
|
|
||||||
|
def start_worker(db):
|
||||||
|
run_worker(
|
||||||
|
db=db,
|
||||||
|
shutdown_flag_ref=lambda: shutdown_flag,
|
||||||
|
heartbeat_ref=lambda ts: set_heartbeat(ts)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
# registrer signal handlers for graceful shutdown
|
|
||||||
signal.signal(signal.SIGINT, handle_shutdown)
|
signal.signal(signal.SIGINT, handle_shutdown)
|
||||||
signal.signal(signal.SIGTERM, handle_shutdown)
|
signal.signal(signal.SIGTERM, handle_shutdown)
|
||||||
|
|
||||||
logger.info("🚀 Starter worker-applikasjon")
|
logger.info("🚀 Starter worker-applikasjon")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
config: DatabaseConfig = DatabaseConfig.from_env()
|
config = DatabaseConfig.from_env()
|
||||||
db: Database = Database(config)
|
db = Database(config)
|
||||||
db.connect()
|
db.connect()
|
||||||
|
|
||||||
# Init health-API med DB og heartbeat-ref
|
# Init health API
|
||||||
init_health_api(db, get_heartbeat)
|
init_health_api(db, get_heartbeat)
|
||||||
|
|
||||||
# Start health-server i egen tråd
|
# Start health server i egen tråd
|
||||||
Thread(target=start_health_server, daemon=True).start()
|
Thread(target=start_health_server, daemon=True).start()
|
||||||
logger.info("🌡️ Health API startet på port 8080")
|
logger.info("🌡️ Health API startet på port 8080")
|
||||||
|
|
||||||
|
|
||||||
run_worker(db=db, shutdown_flag_ref=lambda: shutdown_flag, heartbeat_ref=lambda ts: set_heartbeat(ts))
|
# Start worker i egen tråd
|
||||||
|
Thread(target=start_worker, args=(db,), daemon=True).start()
|
||||||
|
logger.info("🔁 Worker startet i egen tråd")
|
||||||
|
|
||||||
|
# Hold main-tråden i live
|
||||||
|
while not shutdown_flag:
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"❌ Kritisk feil i app: {e}")
|
logger.error(f"❌ Kritisk feil i app: {e}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
logger.info("👋 Worker avsluttet gracefully")
|
logger.info("👋 Worker avsluttet gracefully")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
@ -1,8 +1,8 @@
|
|||||||
# app.py
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import signal
|
import signal
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from threading import Thread
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
from api.health_api import create_health_app
|
from api.health_api import create_health_app
|
||||||
@ -12,7 +12,6 @@ from db.repository import insert_event
|
|||||||
from worker.file_watcher import start_observer
|
from worker.file_watcher import start_observer
|
||||||
from utils.logger import logger
|
from utils.logger import logger
|
||||||
|
|
||||||
# global flag for shutdown
|
|
||||||
shutdown_flag = False
|
shutdown_flag = False
|
||||||
observers = []
|
observers = []
|
||||||
worker_heartbeat = time.time()
|
worker_heartbeat = time.time()
|
||||||
@ -40,6 +39,7 @@ async def run_worker(db: Database, paths, extensions, shutdown_flag_ref):
|
|||||||
try:
|
try:
|
||||||
while not shutdown_flag_ref():
|
while not shutdown_flag_ref():
|
||||||
set_heartbeat(time.time())
|
set_heartbeat(time.time())
|
||||||
|
logger.debug("Heartbeat oppdatert")
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
finally:
|
finally:
|
||||||
logger.info("🛑 Stopper observer...")
|
logger.info("🛑 Stopper observer...")
|
||||||
@ -48,31 +48,26 @@ async def run_worker(db: Database, paths, extensions, shutdown_flag_ref):
|
|||||||
obs.join()
|
obs.join()
|
||||||
logger.info("👋 Alle observers stoppet")
|
logger.info("👋 Alle observers stoppet")
|
||||||
|
|
||||||
return observers
|
|
||||||
|
def start_health_server(app):
|
||||||
|
uvicorn.run(app, host="0.0.0.0", port=8080, log_level="warning")
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
# registrer signal handlers for graceful shutdown
|
|
||||||
signal.signal(signal.SIGINT, handle_shutdown)
|
signal.signal(signal.SIGINT, handle_shutdown)
|
||||||
signal.signal(signal.SIGTERM, handle_shutdown)
|
signal.signal(signal.SIGTERM, handle_shutdown)
|
||||||
|
|
||||||
logger.info("🚀 Starter worker-applikasjon")
|
logger.info("🚀 Starter watcher-applikasjon")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# DB
|
config = DatabaseConfig.from_env()
|
||||||
config: DatabaseConfig = DatabaseConfig.from_env()
|
db = Database(config)
|
||||||
db: Database = Database(config)
|
|
||||||
db.connect()
|
db.connect()
|
||||||
|
|
||||||
# paths og extensions
|
|
||||||
from config.paths_config import PathsConfig
|
from config.paths_config import PathsConfig
|
||||||
paths_config = PathsConfig.from_env()
|
paths_config = PathsConfig.from_env()
|
||||||
paths_config.validate()
|
paths_config.validate()
|
||||||
|
|
||||||
# start worker
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.create_task(run_worker(db, paths_config.watch_paths, paths_config.extensions, lambda: shutdown_flag))
|
|
||||||
|
|
||||||
# health API
|
# health API
|
||||||
app = create_health_app(
|
app = create_health_app(
|
||||||
observers_ref=lambda: observers,
|
observers_ref=lambda: observers,
|
||||||
@ -80,7 +75,18 @@ def main():
|
|||||||
heartbeat_ref=get_heartbeat
|
heartbeat_ref=get_heartbeat
|
||||||
)
|
)
|
||||||
|
|
||||||
uvicorn.run(app, host="0.0.0.0", port=8080)
|
# start health server i egen tråd
|
||||||
|
Thread(target=start_health_server, args=(app,), daemon=True).start()
|
||||||
|
logger.info("🌡️ Health API startet på port 8080")
|
||||||
|
|
||||||
|
# start worker i event loop
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
loop.run_until_complete(run_worker(
|
||||||
|
db,
|
||||||
|
paths_config.watch_paths,
|
||||||
|
paths_config.extensions,
|
||||||
|
lambda: shutdown_flag
|
||||||
|
))
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"❌ Kritisk feil i app: {e}")
|
logger.error(f"❌ Kritisk feil i app: {e}")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user