diff --git a/core/libs/commonwealth/src/commonwealth/settings/managers/pydantic_manager.py b/core/libs/commonwealth/src/commonwealth/settings/managers/pydantic_manager.py index 1a8be18ebc..7f98dc6edd 100644 --- a/core/libs/commonwealth/src/commonwealth/settings/managers/pydantic_manager.py +++ b/core/libs/commonwealth/src/commonwealth/settings/managers/pydantic_manager.py @@ -1,9 +1,10 @@ import pathlib import re -from typing import Any, Optional, Type +from typing import Any, Dict, Optional, Type import appdirs from commonwealth.settings.bases.pydantic_base import PydanticSettings +from commonwealth.utils.events import events from loguru import logger @@ -29,6 +30,7 @@ def __init__( self.config_folder.mkdir(parents=True, exist_ok=True) self.settings_type = settings_type self._settings: Optional[PydanticSettings] = None + self._initial_event_emitted = False logger.debug( f"Starting {project_name} settings with {settings_type.__name__}, configuration path: {config_folder}" ) @@ -97,6 +99,7 @@ def load_from_file(settings_type: Type[PydanticSettings], file_path: pathlib.Pat def save(self) -> None: """Save settings""" self.settings.save(self.settings_file_path()) + self._publish_settings_snapshot("save") def load(self) -> None: """Load settings""" @@ -125,6 +128,7 @@ def get_settings_version_from_filename(filename: pathlib.Path) -> int: try: self._settings = PydanticManager.load_from_file(self.settings_type, valid_file) logger.debug(f"Using {valid_file} as settings source") + self._emit_initial_settings_event() return except Exception as exception: logger.debug("Invalid settings, going to try another file:", exception) @@ -132,6 +136,7 @@ def get_settings_version_from_filename(filename: pathlib.Path) -> int: logger.debug("No valid settings found, using default settings") self._settings = self.settings_type() self.save() + self._emit_initial_settings_event() def _clear_temp_files(self) -> None: """Clear temporary files""" @@ -140,3 +145,30 @@ def _clear_temp_files(self) -> None: temp_file.unlink() except Exception as exception: logger.debug(f"Failed to clear temporary file {temp_file}: {exception}") + + def _serialize_settings(self) -> Optional[Dict[str, Any]]: + if not self._settings: + return None + try: + if hasattr(self._settings, "model_dump"): + return self._settings.model_dump() + return self._settings.dict() + except Exception as exc: # pragma: no cover - best effort + logger.debug(f"Failed to serialize settings for event publishing: {exc}") + return None + + def _publish_settings_snapshot(self, reason: str) -> None: + serialized = self._serialize_settings() + if not serialized: + return + metadata = {"project": self.project_name, "reason": reason} + try: + events.publish_settings(serialized, metadata) + except Exception as exc: # pragma: no cover - best effort + logger.debug(f"Unable to publish settings event: {exc}") + + def _emit_initial_settings_event(self) -> None: + if self._initial_event_emitted: + return + self._initial_event_emitted = True + self._publish_settings_snapshot("initial-load") diff --git a/core/libs/commonwealth/src/commonwealth/settings/managers/pykson_manager.py b/core/libs/commonwealth/src/commonwealth/settings/managers/pykson_manager.py index 402f30b913..f54953f66f 100644 --- a/core/libs/commonwealth/src/commonwealth/settings/managers/pykson_manager.py +++ b/core/libs/commonwealth/src/commonwealth/settings/managers/pykson_manager.py @@ -1,9 +1,11 @@ +import json import pathlib import re -from typing import Any, Optional, Type +from typing import Any, Dict, Optional, Type import appdirs from commonwealth.settings.bases.pykson_base import PyksonSettings +from commonwealth.utils.events import events from loguru import logger @@ -29,6 +31,7 @@ def __init__( self.config_folder.mkdir(parents=True, exist_ok=True) self.settings_type = settings_type self._settings: Optional[PyksonSettings] = None + self._initial_event_emitted = False logger.debug( f"Starting {project_name} settings with {settings_type.__name__}, configuration path: {config_folder}" ) @@ -93,6 +96,7 @@ def load_from_file(settings_type: Type[PyksonSettings], file_path: pathlib.Path) def save(self) -> None: """Save settings""" self.settings.save(self.settings_file_path()) + self._publish_settings_snapshot("save") def load(self) -> None: """Load settings""" @@ -121,6 +125,7 @@ def get_settings_version_from_filename(filename: pathlib.Path) -> int: try: self._settings = PyksonManager.load_from_file(self.settings_type, valid_file) logger.debug(f"Using {valid_file} as settings source") + self._emit_initial_settings_event() return except Exception as exception: logger.debug("Invalid settings, going to try another file:", exception) @@ -128,6 +133,7 @@ def get_settings_version_from_filename(filename: pathlib.Path) -> int: logger.debug("No valid settings found, using default settings") self._settings = self.settings_type() self.save() + self._emit_initial_settings_event() def _clear_temp_files(self) -> None: """Clear temporary files""" @@ -136,3 +142,29 @@ def _clear_temp_files(self) -> None: temp_file.unlink() except Exception as exception: logger.debug(f"Failed to clear temporary file {temp_file}: {exception}") + + def _serialize_settings(self) -> Optional[Dict[str, Any]]: + if not self._settings: + return None + try: + raw = Pykson().to_json(self._settings) + return json.loads(raw) + except Exception as exc: # pragma: no cover - best effort + logger.debug(f"Failed to serialize pykson settings: {exc}") + return None + + def _publish_settings_snapshot(self, reason: str) -> None: + serialized = self._serialize_settings() + if not serialized: + return + metadata = {"project": self.project_name, "reason": reason} + try: + events.publish_settings(serialized, metadata) + except Exception as exc: # pragma: no cover - best effort + logger.debug(f"Unable to publish settings event: {exc}") + + def _emit_initial_settings_event(self) -> None: + if self._initial_event_emitted: + return + self._initial_event_emitted = True + self._publish_settings_snapshot("initial-load") diff --git a/core/libs/commonwealth/src/commonwealth/utils/__init__.py b/core/libs/commonwealth/src/commonwealth/utils/__init__.py index e69de29bb2..5275e4cfec 100644 --- a/core/libs/commonwealth/src/commonwealth/utils/__init__.py +++ b/core/libs/commonwealth/src/commonwealth/utils/__init__.py @@ -0,0 +1,21 @@ +from commonwealth.utils.events import ( + events, + init_event_publisher, + publish_error_event, + publish_health_event, + publish_running_event, + publish_settings_event, + publish_start_event, + publish_stop_event, +) + +__all__ = [ + "events", + "init_event_publisher", + "publish_start_event", + "publish_settings_event", + "publish_running_event", + "publish_health_event", + "publish_error_event", + "publish_stop_event", +] diff --git a/core/libs/commonwealth/src/commonwealth/utils/events.py b/core/libs/commonwealth/src/commonwealth/utils/events.py new file mode 100644 index 0000000000..17c857d287 --- /dev/null +++ b/core/libs/commonwealth/src/commonwealth/utils/events.py @@ -0,0 +1,241 @@ +"""Publish structured Foxglove events for Python services.""" + +from __future__ import annotations + +import asyncio +import atexit +import json +import sys +import time +import traceback +from typing import Any, Dict, Optional + +import zenoh +from loguru import logger + +from commonwealth.utils.process import get_process_name +from commonwealth.utils.zenoh_helper import ZenohSession + +FOXGLOVE_ENCODING = zenoh.Encoding.APPLICATION_JSON.with_schema("foxglove.Log") +FOXGLOVE_INFO_LEVEL = 2 + + +class EventPublisher: + """Publisher for service lifecycle events following the Foxglove log schema.""" + + def __init__(self) -> None: + self._zenoh_session: Optional[ZenohSession] = None + self._service_name: Optional[str] = None + self._process_name: str = "" + self._topic: str = "" + self._initialized: bool = False + self._stop_emitted: bool = False + self._atexit_registered: bool = False + self._excepthook_installed: bool = False + self._previous_excepthook = None + self._asyncio_handler_installed: bool = False + self._previous_asyncio_handler = None + + def initialize(self, service_name: str) -> None: + """Initialize the event publisher with the current service name.""" + if self._initialized: + if service_name and service_name != self._service_name: + raise RuntimeError( + f"Event publisher already initialized for '{self._service_name}'," + f" cannot reinitialize with '{service_name}'." + ) + return + if not service_name: + raise ValueError("Service name cannot be empty when initializing event publisher.") + self._service_name = service_name + self._process_name = get_process_name() + self._zenoh_session = ZenohSession(service_name) + self._topic = f"services/{service_name}/event" + self._initialized = True + self._register_exit_handlers() + + @staticmethod + def _foxglove_timestamp() -> Dict[str, int]: + total_ns = time.time_ns() + return {"sec": total_ns // 1_000_000_000, "nsec": total_ns % 1_000_000_000} + + def _serialize_event_message(self, event_type: str, payload: Dict[str, Any]) -> str: + body = {"type": event_type, "payload": payload} + try: + return json.dumps(body, default=str) + except TypeError as exc: + logger.warning(f"Failed to serialize payload for {event_type} event: {exc}") + safe_payload = {key: str(value) for key, value in payload.items()} + return json.dumps({"type": event_type, "payload": safe_payload}) + + def _publish_event(self, event_type: str, payload: Dict[str, Any]) -> None: + if not self._initialized or self._zenoh_session is None: + raise RuntimeError("EventPublisher not initialized. Call events.initialize() before publishing.") + + session = self._zenoh_session.session + if session is None: + logger.debug("Zenoh session unavailable while publishing event, skipping.") + return + + foxglove_log = { + "timestamp": self._foxglove_timestamp(), + "level": FOXGLOVE_INFO_LEVEL, + "message": self._serialize_event_message(event_type, payload), + "name": self._zenoh_session.format_source_name(self._process_name), + "file": "", + "line": 0, + } + + try: + session.put( + self._topic, + json.dumps(foxglove_log), + encoding=FOXGLOVE_ENCODING, + ) + logger.debug(f"Published {event_type} event to {self._topic}") + except Exception as exc: + logger.error(f"Failed to publish {event_type} event: {exc}") + + def publish(self, event_type: str, payload: Optional[Dict[str, Any]] = None) -> None: + """Publish a generic event with a custom payload.""" + self._publish_event(event_type, payload or {}) + + def publish_start(self, additional_payload: Optional[Dict[str, Any]] = None) -> None: + payload = self._timestamp_payload(additional_payload) + self._publish_event("start", payload) + + def publish_settings(self, settings: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> None: + payload = self._timestamp_payload({"settings": settings}) + if metadata: + payload["metadata"] = metadata + self._publish_event("settings", payload) + + def publish_running(self, additional_payload: Optional[Dict[str, Any]] = None) -> None: + payload = self._timestamp_payload(additional_payload) + self._publish_event("running", payload) + # Emit an initial healthy status when the service is running. + self.publish_health("ready") + + def publish_health(self, status: str, details: Optional[Dict[str, Any]] = None) -> None: + payload = self._timestamp_payload({"status": status}) + if details: + payload["details"] = details + self._publish_event("health", payload) + + def publish_error(self, message: str, details: Optional[Dict[str, Any]] = None) -> None: + payload = self._timestamp_payload({"message": message}) + if details: + payload["details"] = details + self._publish_event("error", payload) + + def publish_stop(self, additional_payload: Optional[Dict[str, Any]] = None) -> None: + if self._stop_emitted: + return + payload = self._timestamp_payload(additional_payload) + self._stop_emitted = True + self._publish_event("stop", payload) + + def _timestamp_payload(self, base: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + payload: Dict[str, Any] = {"timestamp_ns": time.time_ns()} + if base: + payload.update(base) + return payload + + def _register_exit_handlers(self) -> None: + if not self._atexit_registered: + atexit.register(self._handle_process_exit) + self._atexit_registered = True + if not self._excepthook_installed: + self._previous_excepthook = sys.excepthook + sys.excepthook = self._handle_unhandled_exception + self._excepthook_installed = True + if not self._asyncio_handler_installed: + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = None + if loop is not None: + self._previous_asyncio_handler = loop.get_exception_handler() + loop.set_exception_handler(self._handle_async_exception) + self._asyncio_handler_installed = True + + def _handle_process_exit(self) -> None: + try: + self.publish_stop() + except Exception as exc: # pragma: no cover - best effort + logger.debug(f"Unable to publish stop event while shutting down: {exc}") + + def _handle_unhandled_exception(self, exc_type, exc_value, exc_traceback) -> None: # type: ignore[no-untyped-def] + try: + trace = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)) + self.publish_error( + "unhandled_exception", + { + "exception_type": exc_type.__name__, + "exception_message": str(exc_value), + "traceback": trace, + }, + ) + except Exception as exc: # pragma: no cover - best effort + logger.debug(f"Failed to publish error event: {exc}") + finally: + if self._previous_excepthook: + self._previous_excepthook(exc_type, exc_value, exc_traceback) + + def _handle_async_exception(self, loop: asyncio.AbstractEventLoop, context: Dict[str, Any]) -> None: + try: + message = context.get("message", "asyncio_exception") + exception = context.get("exception") + details: Dict[str, Any] = {} + if exception: + details["exception_type"] = type(exception).__name__ + details["exception_message"] = str(exception) + details["traceback"] = "".join(traceback.format_exception(type(exception), exception, exception.__traceback__)) + if "future" in context: + details["future"] = repr(context["future"]) + self.publish_error(message, details or None) + except Exception as exc: # pragma: no cover - best effort + logger.debug(f"Failed to publish asyncio error event: {exc}") + finally: + if self._previous_asyncio_handler: + self._previous_asyncio_handler(loop, context) + else: + loop.default_exception_handler(context) + + +events = EventPublisher() + + +def init_event_publisher(service_name: str) -> None: + """Backward compatible wrapper for initializing events.""" + events.initialize(service_name) + + +def publish_start_event(additional_payload: Optional[Dict[str, Any]] = None) -> None: + """Backward compatible wrapper to publish a service start event.""" + events.publish_start(additional_payload) + + +def publish_settings_event(settings: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> None: + """Backward compatible wrapper to publish a settings/configuration event.""" + events.publish_settings(settings, metadata) + + +def publish_running_event(additional_payload: Optional[Dict[str, Any]] = None) -> None: + """Backward compatible wrapper to publish a service running event.""" + events.publish_running(additional_payload) + + +def publish_health_event(status: str, details: Optional[Dict[str, Any]] = None) -> None: + """Backward compatible wrapper to publish a health event.""" + events.publish_health(status, details) + + +def publish_error_event(message: str, details: Optional[Dict[str, Any]] = None) -> None: + """Backward compatible wrapper to publish an error event.""" + events.publish_error(message, details) + + +def publish_stop_event(additional_payload: Optional[Dict[str, Any]] = None) -> None: + """Backward compatible wrapper to publish a stop event.""" + events.publish_stop(additional_payload) diff --git a/core/libs/commonwealth/src/commonwealth/utils/logs.py b/core/libs/commonwealth/src/commonwealth/utils/logs.py index 4d3bf7872b..e6588c6f01 100644 --- a/core/libs/commonwealth/src/commonwealth/utils/logs.py +++ b/core/libs/commonwealth/src/commonwealth/utils/logs.py @@ -5,6 +5,8 @@ from typing import TYPE_CHECKING, Callable, Optional, Union import zenoh +from commonwealth.utils.events import events +from commonwealth.utils.process import get_process_name from commonwealth.utils.zenoh_helper import ZenohSession from loguru import logger @@ -44,6 +46,7 @@ def validate_service_name(service_name: str) -> None: def init_logger(service_name: str) -> None: try: validate_service_name(service_name) + events.initialize(service_name) logger.add(create_log_sink(service_name), serialize=True) except Exception as e: print(f"Error: unable to set logging path: {e}") @@ -70,6 +73,7 @@ def create_log_sink(service_name: str) -> Callable[["Message"], None]: """ zenoh_session = ZenohSession(service_name) topic = f"services/{service_name}/log" + process_name = get_process_name() def sink(message: "Message") -> None: # Transform the message to the Foxglove log format @@ -98,7 +102,7 @@ def sink(message: "Message") -> None: }, "level": LEVEL_MAP.get(record["level"].name.upper(), LEVEL_MAP["UNKNOWN"]), "message": record["message"], - "name": record["name"], + "name": zenoh_session.format_source_name(process_name), "file": record["file"].name, "line": record["line"], } diff --git a/core/libs/commonwealth/src/commonwealth/utils/process.py b/core/libs/commonwealth/src/commonwealth/utils/process.py new file mode 100644 index 0000000000..6dc2e36b53 --- /dev/null +++ b/core/libs/commonwealth/src/commonwealth/utils/process.py @@ -0,0 +1,25 @@ +"""Helpers for retrieving process metadata used in telemetry publishing.""" + +from __future__ import annotations + +import os +import sys +from pathlib import Path + + +def get_process_name() -> str: + """Return a human-readable name for the current process.""" + argv0 = sys.argv[0] if sys.argv else "" + if argv0: + candidate = Path(argv0) + if candidate.name: + return candidate.name + + executable = getattr(sys, "executable", "") or "" + if executable: + name = Path(executable).name + if name: + return name + + return f"pid-{os.getpid()}" + diff --git a/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py b/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py index 71914cf469..977c4002b5 100644 --- a/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py +++ b/core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py @@ -2,7 +2,7 @@ import json import re from concurrent.futures import ThreadPoolExecutor -from typing import Any, Callable +from typing import Any, Callable, Optional import fastapi import zenoh @@ -18,6 +18,7 @@ class ZenohSession(metaclass=Singleton): session: zenoh.Session | None = None config: zenoh.Config _executor: ThreadPoolExecutor | None = None + _session_id: Optional[str] = None def __init__(self, service_name: str) -> None: if self.session is not None: @@ -48,6 +49,83 @@ def close(self) -> None: self._executor.shutdown(wait=False, cancel_futures=True) self._executor = None + def get_session_id(self) -> Optional[str]: + """Return the Zenoh session ID (zid) if available.""" + if self._session_id: + return self._session_id + if self.session is None: + return None + try: + info_attr = getattr(self.session, "info", None) + if info_attr is None: + logger.debug("Zenoh session does not expose info attribute.") + return None + info = info_attr() if callable(info_attr) else info_attr + except Exception as exc: # pragma: no cover - best effort logging + logger.debug(f"Could not fetch zenoh session info: {exc}") + return None + + session_id = self._extract_session_id(info) + if session_id: + self._session_id = session_id + return session_id + + def format_source_name(self, process_name: str) -> str: + """Compose the `/` identifier requested by the logging spec.""" + session_id = self.get_session_id() + return f"{session_id}/{process_name}" if session_id else process_name + + @staticmethod + def _extract_session_id(info: Any) -> Optional[str]: + if isinstance(info, dict): + for key in ("zid", "session_id", "id"): + value = info.get(key) + if value: + return str(value) + + candidate = getattr(info, "zid", None) + if candidate: + try: + candidate = candidate() if callable(candidate) else candidate + except Exception as exc: # pragma: no cover - best effort + logger.debug(f"Failed to call zid accessor: {exc}") + candidate = None + if candidate: + return str(candidate) + + if isinstance(info, str): + parsed = ZenohSession._parse_session_id_string(info) + if parsed: + return parsed + + try: + as_str = str(info) + except Exception: + as_str = None + if as_str: + parsed = ZenohSession._parse_session_id_string(as_str) + if parsed: + return parsed + + return None + + @staticmethod + def _parse_session_id_string(data: str) -> Optional[str]: + try: + decoded = json.loads(data) + if isinstance(decoded, dict): + for key in ("zid", "session_id", "id"): + value = decoded.get(key) + if value: + return str(value) + except (TypeError, ValueError, json.JSONDecodeError): + pass + + match = re.search(r"(?:zid|session[_ ]?id|id)\s*[:=]\s*\"?([0-9A-Fa-fx\.\-:]+)\"?", data) + if match: + return match.group(1) + return None + def zenoh_config(self, service_name: str) -> None: configuration = { "mode": "client", diff --git a/core/services/ardupilot_manager/main.py b/core/services/ardupilot_manager/main.py index acebc74343..094a0e9874 100755 --- a/core/services/ardupilot_manager/main.py +++ b/core/services/ardupilot_manager/main.py @@ -4,6 +4,7 @@ from args import CommandLineArgs from autopilot_manager import AutoPilotManager +from commonwealth.utils.events import events from commonwealth.utils.general import is_running_as_root from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async @@ -14,6 +15,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() logger.info("Starting AutoPilot Manager.") autopilot = AutoPilotManager() @@ -46,6 +48,16 @@ async def main() -> None: asyncio.create_task(autopilot.auto_restart_ardupilot()) asyncio.create_task(autopilot.start_mavlink_manager_watchdog()) + # Publish running event when service is ready + events.publish_running() + events.publish_health( + "ready", + { + "endpoint": f"{args.host}:{args.port}", + "sitl": args.sitl, + }, + ) + await server.serve() await autopilot.kill_ardupilot() diff --git a/core/services/ardupilot_manager/mavlink_proxy/MAVLinkServer.py b/core/services/ardupilot_manager/mavlink_proxy/MAVLinkServer.py index ac26e93452..80dab62875 100644 --- a/core/services/ardupilot_manager/mavlink_proxy/MAVLinkServer.py +++ b/core/services/ardupilot_manager/mavlink_proxy/MAVLinkServer.py @@ -44,14 +44,19 @@ def convert_endpoint(endpoint: Endpoint) -> str: filtered_endpoints = Endpoint.filter_enabled(self.endpoints()) endpoints = " ".join([convert_endpoint(endpoint) for endpoint in [master_endpoint, *filtered_endpoints]]) - if not self.log_path: - self.log_path = "/var/logs/blueos/services/mavlink-server/" if not self.mavlink_system_id: self.mavlink_system_id = int(os.environ.get("MAV_SYSTEM_ID", 1)) if not self.mavlink_component_id: self.mavlink_component_id = int(os.environ.get("MAV_COMPONENT_ID_ONBOARD_COMPUTER", 191)) - return f"{self.binary()} {endpoints} --mavlink-system-id={self.mavlink_system_id} --mavlink-component-id={self.mavlink_component_id} --log-path={self.log_path}" + command = ( + f"{self.binary()} {endpoints}" + f" --mavlink-system-id={self.mavlink_system_id}" + f" --mavlink-component-id={self.mavlink_component_id}" + ) + if self.log_path: + command += f" --log-path={self.log_path}" + return command @staticmethod def name() -> str: diff --git a/core/services/bag_of_holding/main.py b/core/services/bag_of_holding/main.py index 1a1d24a705..82960be794 100755 --- a/core/services/bag_of_holding/main.py +++ b/core/services/bag_of_holding/main.py @@ -8,6 +8,7 @@ import appdirs import dpath from commonwealth.utils.apis import GenericErrorHandlingRoute +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from fastapi import Body, FastAPI, HTTPException @@ -23,6 +24,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() app = FastAPI( title="Bag of Holding API", @@ -117,6 +119,10 @@ async def main() -> None: config = Config(app=app, host="0.0.0.0", port=9101, log_config=None) server = Server(config) + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": 9101}) + await server.serve() diff --git a/core/services/beacon/main.py b/core/services/beacon/main.py index db3ffbe7be..e7a0737ceb 100755 --- a/core/services/beacon/main.py +++ b/core/services/beacon/main.py @@ -10,6 +10,7 @@ import psutil from commonwealth.settings.manager import Manager from commonwealth.utils.apis import PrettyJSONResponse +from commonwealth.utils.events import events from commonwealth.utils.logs import init_logger from commonwealth.utils.sentry_config import init_sentry_async from fastapi import FastAPI, Request @@ -24,7 +25,6 @@ SERVICE_NAME = "beacon" - class AsyncRunner: def __init__(self, ip_version: IPVersion, interface: str, interface_name: str) -> None: self.ip_version = ip_version @@ -266,6 +266,7 @@ async def stop(self) -> None: logging.basicConfig(level=logging.DEBUG) init_logger(SERVICE_NAME) +events.publish_start() app = FastAPI( title="Beacon API", @@ -348,6 +349,10 @@ async def main() -> None: config = Config(app=app, host="0.0.0.0", port=9111, log_config=None) server = Server(config) + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": 9111}) + asyncio.create_task(beacon.run()) await server.serve() await beacon.stop() diff --git a/core/services/bridget/main.py b/core/services/bridget/main.py index 284d82ddde..9d0a49446e 100755 --- a/core/services/bridget/main.py +++ b/core/services/bridget/main.py @@ -5,6 +5,7 @@ from bridget import BridgeFrontendSpec, Bridget from commonwealth.utils.apis import GenericErrorHandlingRoute, PrettyJSONResponse +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from fastapi import FastAPI, status @@ -17,6 +18,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() app = FastAPI( title="Bridget API", @@ -83,6 +85,10 @@ async def main() -> None: config = Config(app=app, host="0.0.0.0", port=27353, log_config=None) server = Server(config) + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": 27353}) + await server.serve() diff --git a/core/services/cable_guy/main.py b/core/services/cable_guy/main.py index d1712077de..aa5e790873 100755 --- a/core/services/cable_guy/main.py +++ b/core/services/cable_guy/main.py @@ -10,6 +10,7 @@ from commonwealth.utils.apis import GenericErrorHandlingRoute, PrettyJSONResponse from commonwealth.utils.decorators import temporary_cache from commonwealth.utils.DHCPServerManager import DHCPServerDetails, DHCPServerLease +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from config import SERVICE_NAME @@ -22,6 +23,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() manager = EthernetManager() @@ -186,6 +188,10 @@ async def main() -> None: await manager.initialize() asyncio.create_task(manager.watchdog()) + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": 9090}) + await server.serve() diff --git a/core/services/commander/main.py b/core/services/commander/main.py index 88e8a5ae9e..516811b1c2 100755 --- a/core/services/commander/main.py +++ b/core/services/commander/main.py @@ -12,6 +12,7 @@ import appdirs from commonwealth.utils.apis import GenericErrorHandlingRoute from commonwealth.utils.commands import run_command +from commonwealth.utils.events import events from commonwealth.utils.general import delete_everything, delete_everything_stream from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async @@ -23,11 +24,12 @@ from uvicorn import Config, Server SERVICE_NAME = "commander" -LOG_FOLDER_PATH = os.environ.get("BLUEOS_LOG_FOLDER_PATH", "/var/logs/blueos") +LOG_FOLDER_PATH = os.environ.get("BLUEOS_LOG_FOLDER_PATH", "/usr/blueos/system_logs") MAVLINK_LOG_FOLDER_PATH = os.environ.get("BLUEOS_MAVLINK_LOG_FOLDER_PATH", "/shortcuts/ardupilot_logs/logs/") logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() app = FastAPI( title="Commander API", @@ -298,6 +300,11 @@ async def main() -> None: # Running uvicorn with log disabled so loguru can handle it config = Config(app=app, host="0.0.0.0", port=9100, log_config=None) server = Server(config) + + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": 9100}) + await server.serve() diff --git a/core/services/disk_usage/main.py b/core/services/disk_usage/main.py index 4c0cdb84b7..aec4bc7773 100755 --- a/core/services/disk_usage/main.py +++ b/core/services/disk_usage/main.py @@ -12,6 +12,7 @@ from typing import Any, Dict, List, Optional from commonwealth.utils.apis import GenericErrorHandlingRoute, PrettyJSONResponse +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from fastapi import APIRouter, FastAPI, HTTPException, Query, status @@ -28,6 +29,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=logging.DEBUG) init_logger(SERVICE_NAME) +events.publish_start() logger.info("Starting Disk Usage service") @@ -436,6 +438,8 @@ async def main() -> None: config = Config(app=app, host="0.0.0.0", port=PORT, log_config=None) server = Server(config) + events.publish_running() + events.publish_health("ready", {"port": PORT}) await server.serve() finally: logger.info("Disk Usage service stopped") diff --git a/core/services/helper/main.py b/core/services/helper/main.py index 11a4e193f6..b54a5b05be 100755 --- a/core/services/helper/main.py +++ b/core/services/helper/main.py @@ -30,6 +30,7 @@ local_hardware_identifier, local_unique_identifier, ) +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from fastapi import FastAPI, HTTPException @@ -45,6 +46,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=logging.DEBUG) try: init_logger(SERVICE_NAME) + events.publish_start() except Exception as logger_e: print(f"Error: unable to set logger path: {logger_e}") @@ -626,6 +628,10 @@ async def main() -> None: config = Config(app=app, host="0.0.0.0", port=Helper.PORT, log_config=None) server = Server(config) + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": Helper.PORT}) + asyncio.create_task(periodic()) await server.serve() diff --git a/core/services/kraken/main.py b/core/services/kraken/main.py index 9341384f14..a0414a4aef 100755 --- a/core/services/kraken/main.py +++ b/core/services/kraken/main.py @@ -3,14 +3,19 @@ import logging from args import CommandLineArgs +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from config import SERVICE_NAME from loguru import logger from uvicorn import Config, Server +from args import CommandLineArgs +from config import SERVICE_NAME + logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() from api import application from jobs import JobsManager @@ -35,6 +40,10 @@ async def main() -> None: jobs.set_base_host(f"http://{args.host}:{args.port}") + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"endpoint": f"{args.host}:{args.port}"}) + # Launch background tasks asyncio.create_task(kraken.start_cleaner_task()) asyncio.create_task(kraken.start_starter_task()) diff --git a/core/services/nmea_injector/main.py b/core/services/nmea_injector/main.py index 3bd1c3926c..21e1b9382b 100755 --- a/core/services/nmea_injector/main.py +++ b/core/services/nmea_injector/main.py @@ -5,6 +5,7 @@ from typing import Any, List from commonwealth.utils.apis import GenericErrorHandlingRoute, PrettyJSONResponse +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from fastapi import FastAPI, status @@ -24,6 +25,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() app = FastAPI( @@ -95,6 +97,10 @@ async def main() -> None: if args.tcp: asyncio.create_task(controller.add_sock(NMEASocket(kind=SocketKind.TCP, port=args.tcp, component_id=221))) + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": 2748, "udp_input": args.udp, "tcp_input": args.tcp}) + await server.serve() diff --git a/core/services/pardal/main.py b/core/services/pardal/main.py index b096e1f771..53e239e761 100755 --- a/core/services/pardal/main.py +++ b/core/services/pardal/main.py @@ -8,6 +8,7 @@ import aiohttp from aiohttp import web +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from loguru import logger @@ -23,6 +24,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() logger.info("Starting Pardal") @@ -155,6 +157,10 @@ async def main() -> None: site = web.TCPSite(runner, host="0.0.0.0", port=args.port) await site.start() + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": args.port}) + # Wait forever await asyncio.Event().wait() diff --git a/core/services/ping/main.py b/core/services/ping/main.py index 5a9ecf4df5..f88ab66ca6 100755 --- a/core/services/ping/main.py +++ b/core/services/ping/main.py @@ -5,6 +5,7 @@ from typing import Any, List from commonwealth.utils.apis import GenericErrorHandlingRoute, PrettyJSONResponse +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from fastapi import FastAPI, status @@ -21,6 +22,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() app = FastAPI( title="Ping Manager API", @@ -86,6 +88,10 @@ async def main() -> None: config = Config(app=app, host="0.0.0.0", port=9110, log_config=None) server = Server(config) + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": 9110}) + asyncio.create_task(sensor_manager()) await server.serve() diff --git a/core/services/recorder_extractor/main.py b/core/services/recorder_extractor/main.py index 041f514eee..e102338e2e 100755 --- a/core/services/recorder_extractor/main.py +++ b/core/services/recorder_extractor/main.py @@ -13,6 +13,7 @@ from aiocache import cached from commonwealth.utils.apis import GenericErrorHandlingRoute, PrettyJSONResponse +from commonwealth.utils.events import events from commonwealth.utils.general import file_is_open_async from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async @@ -35,6 +36,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=logging.DEBUG) init_logger(SERVICE_NAME) +events.publish_start() logger.info("Starting Recorder Extractor service") @@ -456,6 +458,8 @@ async def main() -> None: config = Config(app=app, host="0.0.0.0", port=PORT, log_config=None) server = Server(config) + events.publish_running() + events.publish_health("ready", {"port": PORT}) await server.serve() finally: extractor_task.cancel() diff --git a/core/services/versionchooser/main.py b/core/services/versionchooser/main.py index 29d348ee8e..f4030a45c2 100755 --- a/core/services/versionchooser/main.py +++ b/core/services/versionchooser/main.py @@ -4,6 +4,7 @@ from api import application from args import CommandLineArgs +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from loguru import logger @@ -13,6 +14,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() logger.info("Starting Version Chooser") @@ -29,6 +31,10 @@ async def main() -> None: config = Config(app=application, host=args.host, port=args.port) server = Server(config) + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"endpoint": f"{args.host}:{args.port}"}) + await server.serve() diff --git a/core/services/versionchooser/utils/chooser.py b/core/services/versionchooser/utils/chooser.py index 71197ac766..3eb589e39b 100644 --- a/core/services/versionchooser/utils/chooser.py +++ b/core/services/versionchooser/utils/chooser.py @@ -249,7 +249,6 @@ async def set_bootstrap_version(self, tag: str) -> JSONResponse: "Binds": [ f"{HOME}/.config/blueos/bootstrap:/root/.config/bootstrap", "/var/run/docker.sock:/var/run/docker.sock", - "/var/logs/blueos:/var/logs/blueos", ], "LogConfig": { "Type": "json-file", diff --git a/core/services/wifi/main.py b/core/services/wifi/main.py index 8e4838b771..0d282494c1 100755 --- a/core/services/wifi/main.py +++ b/core/services/wifi/main.py @@ -11,6 +11,7 @@ PrettyJSONResponse, StackedHTTPException, ) +from commonwealth.utils.events import events from commonwealth.utils.logs import InterceptHandler, init_logger from commonwealth.utils.sentry_config import init_sentry_async from exceptions import BusyError @@ -35,6 +36,7 @@ logging.basicConfig(handlers=[InterceptHandler()], level=0) init_logger(SERVICE_NAME) +events.publish_start() logger.info("Starting Wifi Manager.") wpa_manager = WifiManager() @@ -195,6 +197,10 @@ async def main() -> None: wifi_manager = implementation break + # Publish running event when service is ready + events.publish_running() + events.publish_health("ready", {"port": 9000}) + await server.serve()