From 7ce0091af51599b66bacd3046c4de78e2a7798a6 Mon Sep 17 00:00:00 2001 From: dheerajoruganty Date: Sun, 4 May 2025 07:44:08 +0000 Subject: [PATCH 1/2] feat: Implement file logging and fix dynamic Nginx config for Docker --- docker/nginx_rev_proxy.conf | 132 +++++----- registry/main.py | 487 ++++++++++++++++++++++-------------- 2 files changed, 370 insertions(+), 249 deletions(-) diff --git a/docker/nginx_rev_proxy.conf b/docker/nginx_rev_proxy.conf index f7e4200..fb165a8 100644 --- a/docker/nginx_rev_proxy.conf +++ b/docker/nginx_rev_proxy.conf @@ -12,38 +12,45 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } - location /mcpgw/ { - proxy_pass http://127.0.0.1:8003/; - proxy_http_version 1.1; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - } + # REMOVE HARDCODED /mcpgw + # location /mcpgw/ { + # proxy_pass http://127.0.0.1:8003/; + # proxy_http_version 1.1; + # proxy_set_header Host $host; + # proxy_set_header X-Real-IP $remote_addr; + # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # } - # Route for Current Time service - location /currenttime/ { - proxy_pass http://127.0.0.1:8001/; - proxy_http_version 1.1; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - } + # REMOVE HARDCODED /currenttime + # location /currenttime/ { + # proxy_pass http://127.0.0.1:8001/; + # proxy_http_version 1.1; + # proxy_set_header Host $host; + # proxy_set_header X-Real-IP $remote_addr; + # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # } - # Route for Financial Information service - location /fininfo/ { - proxy_pass http://127.0.0.1:8002/; - proxy_http_version 1.1; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - - # Additional settings for SSE support - proxy_set_header Connection ''; - chunked_transfer_encoding off; - proxy_buffering off; - proxy_cache off; - proxy_read_timeout 3600s; - } + # REMOVE HARDCODED /fininfo + # location /fininfo/ { + # proxy_pass http://127.0.0.1:8002/; + # proxy_http_version 1.1; + # proxy_set_header Host $host; + # proxy_set_header X-Real-IP $remote_addr; + # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # + # # Additional settings for SSE support + # proxy_set_header Connection ''; + # chunked_transfer_encoding off; + # proxy_buffering off; + # proxy_cache off; + # proxy_read_timeout 3600s; + # } + + # --- ADD DYNAMIC MARKERS --- START + # DYNAMIC_LOCATIONS_START + + # DYNAMIC_LOCATIONS_END + # --- ADD DYNAMIC MARKERS --- END location /tsbedrock/ { # Fix the path handling by adding trailing slash and using $request_uri @@ -91,36 +98,45 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } - location /mcpgw/ { - proxy_pass http://127.0.0.1:8003/; - proxy_http_version 1.1; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - } + # REMOVE HARDCODED /mcpgw + # location /mcpgw/ { + # proxy_pass http://127.0.0.1:8003/; + # proxy_http_version 1.1; + # proxy_set_header Host $host; + # proxy_set_header X-Real-IP $remote_addr; + # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # } - location /currenttime/ { - proxy_pass http://127.0.0.1:8001/; - proxy_http_version 1.1; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - } + # REMOVE HARDCODED /currenttime + # location /currenttime/ { + # proxy_pass http://127.0.0.1:8001/; + # proxy_http_version 1.1; + # proxy_set_header Host $host; + # proxy_set_header X-Real-IP $remote_addr; + # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # } - location /fininfo/ { - proxy_pass http://127.0.0.1:8002/; - proxy_http_version 1.1; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - - # Additional settings for SSE support - proxy_set_header Connection ''; - chunked_transfer_encoding off; - proxy_buffering off; - proxy_cache off; - proxy_read_timeout 3600s; - } + # REMOVE HARDCODED /fininfo + # location /fininfo/ { + # proxy_pass http://127.0.0.1:8002/; + # proxy_http_version 1.1; + # proxy_set_header Host $host; + # proxy_set_header X-Real-IP $remote_addr; + # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # + # # Additional settings for SSE support + # proxy_set_header Connection ''; + # chunked_transfer_encoding off; + # proxy_buffering off; + # proxy_cache off; + # proxy_read_timeout 3600s; + # } + + # --- ADD DYNAMIC MARKERS --- START + # DYNAMIC_LOCATIONS_START + + # DYNAMIC_LOCATIONS_END + # --- ADD DYNAMIC MARKERS --- END location /tsbedrock/ { proxy_pass https://hwfo2k8szg.execute-api.us-east-1.amazonaws.com/prod/; diff --git a/registry/main.py b/registry/main.py index ba308bb..a2f49c8 100644 --- a/registry/main.py +++ b/registry/main.py @@ -26,26 +26,71 @@ from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature from dotenv import load_dotenv import subprocess # Added for nginx reload +import logging # --- MCP Client Imports --- START from mcp import ClientSession from mcp.client.sse import sse_client # --- MCP Client Imports --- END -# Determine the base directory of this script (registry folder) -BASE_DIR = Path(__file__).resolve().parent - -load_dotenv(dotenv_path=BASE_DIR.parent / ".env") # Load .env from parent directory +# --- Define paths based on container structure --- START +CONTAINER_APP_DIR = Path("/app") +CONTAINER_REGISTRY_DIR = CONTAINER_APP_DIR / "registry" +CONTAINER_LOG_DIR = CONTAINER_APP_DIR / "logs" +# --- Define paths based on container structure --- END -# --- Configuration & State (Paths relative to this script) --- -NGINX_CONFIG_PATH = ( - BASE_DIR / "nginx_mcp_revproxy.conf" -) # In the same folder as main.py -SERVERS_DIR = BASE_DIR / "servers" # Directory to store individual server JSON files -STATIC_DIR = BASE_DIR / "static" -TEMPLATES_DIR = BASE_DIR / "templates" -NGINX_TEMPLATE_PATH = BASE_DIR / "nginx_template.conf" # Path to the template -STATE_FILE_PATH = BASE_DIR / "server_state.json" # Path to store enabled/disabled state +# Determine the base directory of this script (registry folder) +# BASE_DIR = Path(__file__).resolve().parent # Less relevant inside container + +# --- Load .env if it exists in the expected location relative to the app --- START +# Assumes .env might be mounted at /app/.env or similar +# DOTENV_PATH = BASE_DIR / ".env" +DOTENV_PATH = CONTAINER_REGISTRY_DIR / ".env" # Use container path +if DOTENV_PATH.exists(): + load_dotenv(dotenv_path=DOTENV_PATH) + print(f"Loaded environment variables from {DOTENV_PATH}") +else: + print(f"Warning: .env file not found at {DOTENV_PATH}") +# --- Load .env if it exists in the expected location relative to the app --- END + +# --- Configuration & State (Paths relative to container structure) --- +# Assumes nginx config might be placed alongside registry code +# NGINX_CONFIG_PATH = ( +# CONTAINER_REGISTRY_DIR / "nginx_mcp_revproxy.conf" +# ) +NGINX_CONFIG_PATH = Path("/etc/nginx/conf.d/nginx_rev_proxy.conf") # Target the actual Nginx config file +# Use the mounted volume path for server definitions +SERVERS_DIR = CONTAINER_REGISTRY_DIR / "servers" +STATIC_DIR = CONTAINER_REGISTRY_DIR / "static" +TEMPLATES_DIR = CONTAINER_REGISTRY_DIR / "templates" +# NGINX_TEMPLATE_PATH = CONTAINER_REGISTRY_DIR / "nginx_template.conf" +# Use the mounted volume path for state file, keep it with servers +STATE_FILE_PATH = SERVERS_DIR / "server_state.json" +# Define log file path +# LOG_FILE_PATH = BASE_DIR / "registry.log" +LOG_FILE_PATH = CONTAINER_LOG_DIR / "registry.log" + +# --- REMOVE Logging Setup from here --- START +# # Ensure log directory exists +# CONTAINER_LOG_DIR.mkdir(parents=True, exist_ok=True) +# +# # Configure logging +# logging.basicConfig( +# level=logging.INFO, +# format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +# handlers=[ +# logging.FileHandler(LOG_FILE_PATH), # Log to file in /app/logs +# logging.StreamHandler() # Log to console (stdout/stderr) +# ] +# ) +# +# logger = logging.getLogger(__name__) # Get a logger instance +# logger.info("Logging configured. Application starting...") +# --- REMOVE Logging Setup from here --- END + +# --- Define logger at module level (unconfigured initially) --- START +logger = logging.getLogger(__name__) +# --- Define logger at module level (unconfigured initially) --- END # In-memory state store REGISTERED_SERVERS = {} @@ -61,7 +106,7 @@ async def broadcast_health_status(): """Sends the current health status to all connected WebSocket clients.""" if active_connections: - print(f"Broadcasting health status to {len(active_connections)} clients...") + logger.info(f"Broadcasting health status to {len(active_connections)} clients...") # Construct data payload with status and ISO timestamp string data_to_send = {} @@ -100,12 +145,12 @@ async def broadcast_health_status(): if isinstance(result, Exception): # Check if it's a connection-related error (more specific checks possible) # For now, assume any exception during send means the client is gone - print(f"Error sending to WebSocket client {conn.client}: {result}. Marking for removal.") + logger.warning(f"Error sending to WebSocket client {conn.client}: {result}. Marking for removal.") disconnected_clients.add(conn) # Remove all disconnected clients identified during the broadcast if disconnected_clients: - print(f"Removing {len(disconnected_clients)} disconnected clients after broadcast.") + logger.info(f"Removing {len(disconnected_clients)} disconnected clients after broadcast.") for conn in disconnected_clients: if conn in active_connections: active_connections.remove(conn) @@ -113,9 +158,7 @@ async def broadcast_health_status(): # Session management configuration SECRET_KEY = os.environ.get("SECRET_KEY", "insecure-default-key-for-testing-only") if SECRET_KEY == "insecure-default-key-for-testing-only": - print( - "\nWARNING: Using insecure default SECRET_KEY. Set a strong SECRET_KEY environment variable for production.\n" - ) + logger.warning("Using insecure default SECRET_KEY. Set a strong SECRET_KEY environment variable for production.") SESSION_COOKIE_NAME = "mcp_gateway_session" signer = URLSafeTimedSerializer(SECRET_KEY) SESSION_MAX_AGE_SECONDS = 60 * 60 * 8 # 8 hours @@ -123,7 +166,7 @@ async def broadcast_health_status(): # --- Nginx Config Generation --- LOCATION_BLOCK_TEMPLATE = """ - location {path} {{ + location {path}/ {{ proxy_pass {proxy_pass_url}; proxy_http_version 1.1; proxy_set_header Host $host; @@ -133,7 +176,7 @@ async def broadcast_health_status(): """ COMMENTED_LOCATION_BLOCK_TEMPLATE = """ -# location {path} {{ +# location {path}/ {{ # proxy_pass {proxy_pass_url}; # proxy_http_version 1.1; # proxy_set_header Host $host; @@ -144,69 +187,121 @@ async def broadcast_health_status(): def regenerate_nginx_config(): """Generates the nginx config file based on registered servers and their state.""" - print(f"Regenerating Nginx config at {NGINX_CONFIG_PATH}...") + logger.info(f"Attempting to directly modify Nginx config at {NGINX_CONFIG_PATH}...") + + # Define markers + START_MARKER = "# DYNAMIC_LOCATIONS_START" + END_MARKER = "# DYNAMIC_LOCATIONS_END" + try: - with open(NGINX_TEMPLATE_PATH, 'r') as f_template: - template_content = f_template.read() + # Read the *target* Nginx config file + with open(NGINX_CONFIG_PATH, 'r') as f_target: + target_content = f_target.read() - location_blocks = [] + # Generate the location blocks section content (only needs to be done once) + location_blocks_content = [] sorted_paths = sorted(REGISTERED_SERVERS.keys()) for path in sorted_paths: server_info = REGISTERED_SERVERS[path] proxy_url = server_info.get("proxy_pass_url") - is_enabled = MOCK_SERVICE_STATE.get(path, False) # Default to disabled if state unknown - health_status = SERVER_HEALTH_STATUS.get(path) # Get current health status + is_enabled = MOCK_SERVICE_STATE.get(path, False) + health_status = SERVER_HEALTH_STATUS.get(path) if not proxy_url: - print(f"Warning: Skipping server '{server_info['server_name']}' ({path}) - missing proxy_pass_url.") + logger.warning(f"Skipping server '{server_info['server_name']}' ({path}) - missing proxy_pass_url.") continue - # Only create an active block if the service is enabled AND healthy if is_enabled and health_status == "healthy": - block = LOCATION_BLOCK_TEMPLATE.format( - path=path, - proxy_pass_url=proxy_url - ) + block = LOCATION_BLOCK_TEMPLATE.format(path=path, proxy_pass_url=proxy_url) else: - # Comment out the block if disabled OR not healthy - block = COMMENTED_LOCATION_BLOCK_TEMPLATE.format( - path=path, - proxy_pass_url=proxy_url - ) - location_blocks.append(block) - - final_config = template_content.replace("# {{LOCATION_BLOCKS}}", "\n".join(location_blocks)) + block = COMMENTED_LOCATION_BLOCK_TEMPLATE.format(path=path, proxy_pass_url=proxy_url) + location_blocks_content.append(block) + + generated_section = "\n".join(location_blocks_content).strip() + # --- Replace content between ALL marker pairs --- START + new_content = "" + current_pos = 0 + while True: + # Find the next start marker + start_index = target_content.find(START_MARKER, current_pos) + if start_index == -1: + # No more start markers found, append the rest of the file + new_content += target_content[current_pos:] + break + + # Find the corresponding end marker after the start marker + end_index = target_content.find(END_MARKER, start_index + len(START_MARKER)) + if end_index == -1: + # Found a start marker without a matching end marker, log error and stop + logger.error(f"Found '{START_MARKER}' at position {start_index} without a matching '{END_MARKER}' in {NGINX_CONFIG_PATH}. Aborting regeneration.") + # Append the rest of the file to avoid data loss, but don't reload + new_content += target_content[current_pos:] + # Write back the partially processed content? Or just return False? + # Let's return False to indicate failure without modifying the file potentially incorrectly. + return False # Indicate failure + + # Append the content before the current start marker + new_content += target_content[current_pos:start_index + len(START_MARKER)] + # Append the newly generated section (with appropriate newlines) + new_content += f"\n\n{generated_section}\n\n " + # Update current position to be after the end marker + current_pos = end_index + + # Check if any replacements were made (i.e., if current_pos moved beyond 0) + if current_pos == 0: + logger.error(f"No marker pairs '{START_MARKER}'...'{END_MARKER}' found in {NGINX_CONFIG_PATH}. Cannot regenerate.") + return False + + final_config = new_content # Use the iteratively built content + # --- Replace content between ALL marker pairs --- END + + # # Find the start and end markers in the target content + # start_index = target_content.find(START_MARKER) + # end_index = target_content.find(END_MARKER) + # + # if start_index == -1 or end_index == -1 or end_index <= start_index: + # logger.error(f"Markers '{START_MARKER}' and/or '{END_MARKER}' not found or in wrong order in {NGINX_CONFIG_PATH}. Cannot regenerate.") + # return False + # + # # Extract the parts before the start marker and after the end marker + # prefix = target_content[:start_index + len(START_MARKER)] + # suffix = target_content[end_index:] + # + # # Construct the new content + # # Add newlines around the generated section for readability + # final_config = f"{prefix}\n\n{generated_section}\n\n {suffix}" + + # Write the modified content back to the target file with open(NGINX_CONFIG_PATH, 'w') as f_out: f_out.write(final_config) - print("Nginx config regeneration successful.") + logger.info(f"Nginx config file {NGINX_CONFIG_PATH} modified successfully.") # --- Reload Nginx --- START try: - print("Attempting to reload Nginx configuration...") - # Ensure nginx command is available in PATH and process has permissions + logger.info("Attempting to reload Nginx configuration...") result = subprocess.run(['nginx', '-s', 'reload'], check=True, capture_output=True, text=True) - print(f"Nginx reload successful. Output:\n{result.stdout}") - # --- Reload Nginx --- END - return True # Return True only if write AND reload succeed + logger.info(f"Nginx reload successful. stdout: {result.stdout.strip()}") + return True except FileNotFoundError: - print("ERROR: 'nginx' command not found. Cannot reload configuration.") - return False # Indicate failure if nginx command isn't found + logger.error("'nginx' command not found. Cannot reload configuration.") + return False except subprocess.CalledProcessError as e: - print(f"ERROR: Failed to reload Nginx configuration. Return code: {e.returncode}") - print(f"Stderr: {e.stderr}") - print(f"Stdout: {e.stdout}") - return False # Indicate failure on reload error - except Exception as e: # Catch other potential exceptions like permission errors - print(f"ERROR: An unexpected error occurred during Nginx reload: {e}") - return False # Indicate failure + logger.error(f"Failed to reload Nginx configuration. Return code: {e.returncode}") + logger.error(f"Nginx reload stderr: {e.stderr.strip()}") + logger.error(f"Nginx reload stdout: {e.stdout.strip()}") + return False + except Exception as e: + logger.error(f"An unexpected error occurred during Nginx reload: {e}", exc_info=True) + return False + # --- Reload Nginx --- END except FileNotFoundError: - print(f"ERROR: Nginx template file not found at {NGINX_TEMPLATE_PATH}") + logger.error(f"Target Nginx config file not found at {NGINX_CONFIG_PATH}. Cannot regenerate.") return False except Exception as e: - print(f"ERROR: Failed to regenerate Nginx config: {e}") + logger.error(f"Failed to modify Nginx config at {NGINX_CONFIG_PATH}: {e}", exc_info=True) return False # --- Helper function to normalize a path to a filename --- @@ -222,20 +317,23 @@ def path_to_filename(path): # --- Data Loading --- def load_registered_servers_and_state(): global REGISTERED_SERVERS, MOCK_SERVICE_STATE - print(f"Loading server definitions from {SERVERS_DIR}...") + logger.info(f"Loading server definitions from {SERVERS_DIR}...") # Create servers directory if it doesn't exist - SERVERS_DIR.mkdir(exist_ok=True) + SERVERS_DIR.mkdir(parents=True, exist_ok=True) # Added parents=True temp_servers = {} server_files = list(SERVERS_DIR.glob("*.json")) if not server_files: - print(f"No server definition files found in {SERVERS_DIR}.") + logger.warning(f"No server definition files found in {SERVERS_DIR}. Initializing empty registry.") REGISTERED_SERVERS = {} - return + # Don't return yet, need to load state file + # return for server_file in server_files: + if server_file.name == STATE_FILE_PATH.name: # Skip the state file itself + continue try: with open(server_file, "r") as f: server_info = json.load(f) @@ -247,9 +345,7 @@ def load_registered_servers_and_state(): ): server_path = server_info["path"] if server_path in temp_servers: - print( - f"Warning: Duplicate server path found in {server_file}: {server_path}. Overwriting previous definition." - ) + logger.warning(f"Duplicate server path found in {server_file}: {server_path}. Overwriting previous definition.") # Add new fields with defaults server_info["description"] = server_info.get("description", "") @@ -263,41 +359,37 @@ def load_registered_servers_and_state(): temp_servers[server_path] = server_info else: - print( - f"Warning: Invalid server entry format found in {server_file}. Skipping." - ) + logger.warning(f"Invalid server entry format found in {server_file}. Skipping.") except FileNotFoundError: - print(f"ERROR: Server definition file not found at {server_file}.") + logger.error(f"Server definition file {server_file} reported by glob not found.") except json.JSONDecodeError as e: - print(f"ERROR: Could not parse JSON from {server_file}: {e}.") + logger.error(f"Could not parse JSON from {server_file}: {e}.") except Exception as e: - print(f"ERROR: An unexpected error occurred loading {server_file}: {e}.") + logger.error(f"An unexpected error occurred loading {server_file}: {e}", exc_info=True) REGISTERED_SERVERS = temp_servers - print( - f"Successfully loaded {len(REGISTERED_SERVERS)} servers from individual files." - ) + logger.info(f"Successfully loaded {len(REGISTERED_SERVERS)} server definitions.") # --- Load persisted mock service state --- START - print(f"Attempting to load persisted state from {STATE_FILE_PATH}...") + logger.info(f"Attempting to load persisted state from {STATE_FILE_PATH}...") loaded_state = {} try: if STATE_FILE_PATH.exists(): with open(STATE_FILE_PATH, "r") as f: loaded_state = json.load(f) if not isinstance(loaded_state, dict): - print(f"Warning: Invalid state format in {STATE_FILE_PATH}. Expected a dictionary. Ignoring.") + logger.warning(f"Invalid state format in {STATE_FILE_PATH}. Expected a dictionary. Resetting state.") loaded_state = {} # Reset if format is wrong else: - print("Successfully loaded persisted state.") + logger.info("Successfully loaded persisted state.") else: - print("No persisted state file found. Initializing state.") + logger.info(f"No persisted state file found at {STATE_FILE_PATH}. Initializing state.") except json.JSONDecodeError as e: - print(f"ERROR: Could not parse JSON from {STATE_FILE_PATH}: {e}. Initializing state.") + logger.error(f"Could not parse JSON from {STATE_FILE_PATH}: {e}. Initializing empty state.") loaded_state = {} except Exception as e: - print(f"ERROR: Failed to read state file {STATE_FILE_PATH}: {e}. Initializing state.") + logger.error(f"Failed to read state file {STATE_FILE_PATH}: {e}. Initializing empty state.", exc_info=True) loaded_state = {} # Initialize MOCK_SERVICE_STATE: Use loaded state if valid, otherwise default to False. @@ -306,7 +398,7 @@ def load_registered_servers_and_state(): for path in REGISTERED_SERVERS.keys(): MOCK_SERVICE_STATE[path] = loaded_state.get(path, False) # Default to False if not in loaded state or state was invalid - print(f"Final initial mock state: {MOCK_SERVICE_STATE}") + logger.info(f"Initial mock service state loaded: {MOCK_SERVICE_STATE}") # --- Load persisted mock service state --- END @@ -318,9 +410,9 @@ def load_registered_servers_and_state(): SERVER_HEALTH_STATUS[path] = "checking" if is_enabled else "disabled" else: # This case should ideally not happen if MOCK_SERVICE_STATE is built from REGISTERED_SERVERS - print(f"Warning: Path {path} found in loaded state but not in registered servers. Ignoring.") + logger.warning(f"Path {path} found in loaded state but not in registered servers. Ignoring.") - print(f"Initialized health status based on loaded state: {SERVER_HEALTH_STATUS}") + logger.info(f"Initialized health status based on loaded state: {SERVER_HEALTH_STATUS}") # We no longer need the explicit default initialization block below # print("Initializing mock service state (defaulting to disabled)...") @@ -333,7 +425,7 @@ def load_registered_servers_and_state(): def save_server_to_file(server_info): try: # Create servers directory if it doesn't exist - SERVERS_DIR.mkdir(exist_ok=True) + SERVERS_DIR.mkdir(parents=True, exist_ok=True) # Ensure it exists # Generate filename based on path path = server_info["path"] @@ -343,12 +435,10 @@ def save_server_to_file(server_info): with open(file_path, "w") as f: json.dump(server_info, f, indent=2) - print( - f"Successfully saved server '{server_info['server_name']}' to {file_path}" - ) + logger.info(f"Successfully saved server '{server_info['server_name']}' to {file_path}") return True except Exception as e: - print(f"ERROR: Failed to save server data to {filename}: {e}") + logger.error(f"Failed to save server '{server_info.get('server_name', 'UNKNOWN')}' data to {filename}: {e}", exc_info=True) return False @@ -367,7 +457,7 @@ async def get_tools_from_server(base_url: str) -> List[dict] | None: # Return li """ # Determine scheme and construct the full /sse URL if not base_url: - print("MCP Check Error: Base URL is empty.") + logger.error("MCP Check Error: Base URL is empty.") return None sse_url = base_url.rstrip('/') + "/sse" @@ -376,7 +466,7 @@ async def get_tools_from_server(base_url: str) -> List[dict] | None: # Return li mcp_server_url = f"http{secure_prefix}://{sse_url[len(f'http{secure_prefix}://'):]}" # Ensure correct format for sse_client - print(f"Attempting to connect to MCP server at {mcp_server_url} to get tool list...") + logger.info(f"Attempting to connect to MCP server at {mcp_server_url} to get tool list...") try: # Connect using the sse_client context manager directly async with sse_client(mcp_server_url) as (read, write): @@ -452,16 +542,16 @@ async def get_tools_from_server(base_url: str) -> List[dict] | None: # Return li "schema": tool_schema }) - print(f"Successfully retrieved details for {len(tool_details_list)} tools from {mcp_server_url}.") + logger.info(f"Successfully retrieved details for {len(tool_details_list)} tools from {mcp_server_url}.") return tool_details_list # Return the list of details except asyncio.TimeoutError: - print(f"MCP Check Error: Timeout during session operation with {mcp_server_url}.") + logger.error(f"MCP Check Error: Timeout during session operation with {mcp_server_url}.") return None except ConnectionRefusedError: - print(f"MCP Check Error: Connection refused by {mcp_server_url}.") + logger.error(f"MCP Check Error: Connection refused by {mcp_server_url}.") return None except Exception as e: - print(f"MCP Check Error: Failed to get tool list from {mcp_server_url}: {type(e).__name__} - {e}") + logger.error(f"MCP Check Error: Failed to get tool list from {mcp_server_url}: {type(e).__name__} - {e}") return None # --- MCP Client Function to Get Tool List --- END @@ -492,10 +582,10 @@ async def perform_single_health_check(path: str) -> tuple[str, datetime | None]: if not url: current_status = "error: missing URL" SERVER_HEALTH_STATUS[path] = current_status - print(f"Health check skipped for {path}: Missing URL.") + logger.info(f"Health check skipped for {path}: Missing URL.") # --- Regenerate Nginx if status affecting it changed --- START if is_enabled and previous_status == "healthy": # Was healthy, now isn't (due to missing URL) - print(f"Status changed from healthy for {path}, regenerating Nginx config...") + logger.info(f"Status changed from healthy for {path}, regenerating Nginx config...") regenerate_nginx_config() # --- Regenerate Nginx if status affecting it changed --- END return current_status, last_checked_time @@ -503,7 +593,7 @@ async def perform_single_health_check(path: str) -> tuple[str, datetime | None]: # Update status to 'checking' before performing the check # Only print if status actually changes to 'checking' if previous_status != "checking": - print(f"Setting status to 'checking' for {path} ({url})...") + logger.info(f"Setting status to 'checking' for {path} ({url})...") SERVER_HEALTH_STATUS[path] = "checking" # Optional: Consider a targeted broadcast here if immediate 'checking' feedback is desired # await broadcast_specific_update(path, "checking", last_checked_time) @@ -528,12 +618,12 @@ async def perform_single_health_check(path: str) -> tuple[str, datetime | None]: if proc.returncode == 0: current_status = "healthy" - print(f"Health check successful for {path} ({url}).") + logger.info(f"Health check successful for {path} ({url}).") # --- Check for transition to healthy state --- START # Note: Tool list fetching moved inside the status transition check if previous_status != "healthy": - print(f"Service {path} transitioned to healthy. Regenerating Nginx config and fetching tool list...") + logger.info(f"Service {path} transitioned to healthy. Regenerating Nginx config and fetching tool list...") # --- Regenerate Nginx on transition TO healthy --- START regenerate_nginx_config() # --- Regenerate Nginx on transition TO healthy --- END @@ -555,58 +645,58 @@ async def perform_single_health_check(path: str) -> tuple[str, datetime | None]: # if set(current_tool_list) != set(tool_list) or current_tool_count != new_tool_count: if current_tool_list_str != new_tool_list_str or current_tool_count != new_tool_count: - print(f"Updating tool list for {path}. New count: {new_tool_count}.") # Simplified log + logger.info(f"Updating tool list for {path}. New count: {new_tool_count}.") # Simplified log REGISTERED_SERVERS[path]["tool_list"] = tool_list # Store the new list of dicts REGISTERED_SERVERS[path]["num_tools"] = new_tool_count # Update the count # Save the updated server info to its file if not save_server_to_file(REGISTERED_SERVERS[path]): - print(f"ERROR: Failed to save updated tool list/count for {path} to file.") + logger.error(f"ERROR: Failed to save updated tool list/count for {path} to file.") else: - print(f"Tool list for {path} remains unchanged. No update needed.") + logger.info(f"Tool list for {path} remains unchanged. No update needed.") else: - print(f"Failed to retrieve tool list for healthy service {path}. List/Count remains unchanged.") + logger.info(f"Failed to retrieve tool list for healthy service {path}. List/Count remains unchanged.") else: # This case should technically not be reachable due to earlier url check - print(f"Cannot fetch tool list for {path}: proxy_pass_url is missing.") + logger.info(f"Cannot fetch tool list for {path}: proxy_pass_url is missing.") # --- Check for transition to healthy state --- END elif proc.returncode == 28: current_status = f"error: timeout ({HEALTH_CHECK_TIMEOUT_SECONDS}s)" - print(f"Health check timeout for {path} ({url})") + logger.info(f"Health check timeout for {path} ({url})") elif proc.returncode == 22: # HTTP error >= 400 current_status = "unhealthy (HTTP error)" - print(f"Health check unhealthy (HTTP >= 400) for {path} ({url}). Stderr: {stderr_str}") + logger.info(f"Health check unhealthy (HTTP >= 400) for {path} ({url}). Stderr: {stderr_str}") elif proc.returncode == 7: # Connection failed current_status = "error: connection failed" - print(f"Health check connection failed for {path} ({url}). Stderr: {stderr_str}") + logger.info(f"Health check connection failed for {path} ({url}). Stderr: {stderr_str}") else: # Other curl errors error_msg = f"error: check failed (code {proc.returncode})" if stderr_str: error_msg += f" - {stderr_str}" current_status = error_msg - print(f"Health check failed for {path} ({url}): {error_msg}") + logger.info(f"Health check failed for {path} ({url}): {error_msg}") except asyncio.TimeoutError: # This catches timeout on asyncio.wait_for, slightly different from curl's --max-time current_status = f"error: check process timeout" - print(f"Health check asyncio.wait_for timeout for {path} ({url})") + logger.info(f"Health check asyncio.wait_for timeout for {path} ({url})") except FileNotFoundError: current_status = "error: command not found" - print(f"ERROR: 'curl' command not found during health check for {path}. Cannot perform check.") + logger.error(f"ERROR: 'curl' command not found during health check for {path}. Cannot perform check.") # No need to stop all checks, just this one fails except Exception as e: current_status = f"error: {type(e).__name__}" - print(f"ERROR: Unexpected error during health check for {path} ({url}): {e}") + logger.error(f"ERROR: Unexpected error during health check for {path} ({url}): {e}") # Update the global status *after* the check completes SERVER_HEALTH_STATUS[path] = current_status - print(f"Final health status for {path}: {current_status}") + logger.info(f"Final health status for {path}: {current_status}") # --- Regenerate Nginx if status affecting it changed --- START # Check if the service is enabled AND its Nginx-relevant status changed if is_enabled: if previous_status == "healthy" and current_status != "healthy": - print(f"Status changed FROM healthy for enabled service {path}, regenerating Nginx config...") + logger.info(f"Status changed FROM healthy for enabled service {path}, regenerating Nginx config...") regenerate_nginx_config() # Regeneration on transition TO healthy is handled within the proc.returncode == 0 block above # elif previous_status != "healthy" and current_status == "healthy": @@ -622,7 +712,7 @@ async def perform_single_health_check(path: str) -> tuple[str, datetime | None]: async def run_health_checks(): """Periodically checks the health of registered *enabled* services.""" while True: - print(f"Running periodic health checks (Interval: {HEALTH_CHECK_INTERVAL_SECONDS}s)...") + logger.info(f"Running periodic health checks (Interval: {HEALTH_CHECK_INTERVAL_SECONDS}s)...") paths_to_check = list(REGISTERED_SERVERS.keys()) needs_broadcast = False # Flag to check if any status actually changed @@ -647,11 +737,11 @@ async def run_health_checks(): # Also clear last check time when disabling? Or keep it? Keep for now. # SERVER_LAST_CHECK_TIME[path] = None needs_broadcast = True - print(f"Service {path} is disabled. Setting status.") + logger.info(f"Service {path} is disabled. Setting status.") continue # Skip health check for disabled services # --- Service is enabled, perform check using the new function --- - print(f"Performing periodic check for enabled service: {path}") + logger.info(f"Performing periodic check for enabled service: {path}") try: # Call the refactored check function # We only care if the status *changed* from the beginning of the cycle for broadcast purposes @@ -660,7 +750,7 @@ async def run_health_checks(): needs_broadcast = True except Exception as e: # Log error if the check function itself fails unexpectedly - print(f"ERROR: Unexpected exception calling perform_single_health_check for {path}: {e}") + logger.error(f"ERROR: Unexpected exception calling perform_single_health_check for {path}: {e}") # Update status to reflect this error? error_status = f"error: check execution failed ({type(e).__name__})" if previous_status != error_status: @@ -669,13 +759,13 @@ async def run_health_checks(): needs_broadcast = True - print(f"Finished periodic health checks. Current status map: {SERVER_HEALTH_STATUS}") + logger.info(f"Finished periodic health checks. Current status map: {SERVER_HEALTH_STATUS}") # Broadcast status update only if something changed during this cycle if needs_broadcast: - print("Broadcasting updated health status after periodic check...") + logger.info("Broadcasting updated health status after periodic check...") await broadcast_health_status() else: - print("No status changes detected in periodic check, skipping broadcast.") + logger.info("No status changes detected in periodic check, skipping broadcast.") # Wait for the next interval await asyncio.sleep(HEALTH_CHECK_INTERVAL_SECONDS) @@ -684,12 +774,27 @@ async def run_health_checks(): # --- Lifespan for Startup Task --- @asynccontextmanager async def lifespan(app: FastAPI): - print("Running startup tasks...") + # --- Configure Logging INSIDE lifespan --- START + # Ensure log directory exists + CONTAINER_LOG_DIR.mkdir(parents=True, exist_ok=True) # Should be defined now + + # Configure logging + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler(LOG_FILE_PATH), # Use correct variable + logging.StreamHandler() # Log to console (stdout/stderr) + ] + ) + logger.info("Logging configured. Running startup tasks...") # Now logger is configured + # --- Configure Logging INSIDE lifespan --- END + # 1. Load server definitions and persisted enabled/disabled state load_registered_servers_and_state() # 2. Perform initial health checks concurrently for *enabled* services - print("Performing initial health checks for enabled services...") + logger.info("Performing initial health checks for enabled services...") initial_check_tasks = [] enabled_paths = [path for path, is_enabled in MOCK_SERVICE_STATE.items() if is_enabled] @@ -703,7 +808,7 @@ async def lifespan(app: FastAPI): # Will be set by the check task below (or remain unset if check fails badly) SERVER_HEALTH_STATUS[path] = "checking" # Tentative status before check runs - print(f"Initially enabled services to check: {enabled_paths}") + logger.info(f"Initially enabled services to check: {enabled_paths}") if enabled_paths: for path in enabled_paths: # Create a task for each enabled service check @@ -717,22 +822,22 @@ async def lifespan(app: FastAPI): for i, result in enumerate(results): path = enabled_paths[i] if isinstance(result, Exception): - print(f"ERROR during initial health check for {path}: {result}") + logger.error(f"ERROR during initial health check for {path}: {result}") # Status might have already been set to an error state within the check function else: status, _ = result # Unpack the result tuple - print(f"Initial health check completed for {path}: Status = {status}") + logger.info(f"Initial health check completed for {path}: Status = {status}") else: - print("No services are initially enabled.") + logger.info("No services are initially enabled.") - print(f"Initial health status after checks: {SERVER_HEALTH_STATUS}") + logger.info(f"Initial health status after checks: {SERVER_HEALTH_STATUS}") # 3. Generate Nginx config *after* initial checks are done - print("Generating initial Nginx configuration...") + logger.info("Generating initial Nginx configuration...") regenerate_nginx_config() # Generate config based on initial health status # 4. Start the background periodic health check task - print("Starting background health check task...") + logger.info("Starting background health check task...") health_check_task = asyncio.create_task(run_health_checks()) # --- Yield to let the application run --- START @@ -740,13 +845,13 @@ async def lifespan(app: FastAPI): # --- Yield to let the application run --- END # --- Shutdown tasks --- START - print("Running shutdown tasks...") - print("Cancelling background health check task...") + logger.info("Running shutdown tasks...") + logger.info("Cancelling background health check task...") health_check_task.cancel() try: await health_check_task except asyncio.CancelledError: - print("Health check task cancelled successfully.") + logger.info("Health check task cancelled successfully.") # --- Shutdown tasks --- END @@ -841,10 +946,10 @@ async def login_submit( httponly=True, samesite="lax", ) - print(f"User '{username}' logged in successfully.") + logger.info(f"User '{username}' logged in successfully.") return response else: - print(f"Login failed for user '{username}'.") + logger.info(f"Login failed for user '{username}'.") return RedirectResponse( url="/login?error=Invalid+username+or+password", status_code=status.HTTP_303_SEE_OTHER, @@ -853,7 +958,7 @@ async def login_submit( @app.post("/logout") async def logout(): - print("User logged out.") + logger.info("User logged out.") response = RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER) response.delete_cookie(SESSION_COOKIE_NAME) return response @@ -914,7 +1019,7 @@ async def toggle_service_route( new_state = enabled == "on" MOCK_SERVICE_STATE[service_path] = new_state server_name = REGISTERED_SERVERS[service_path]["server_name"] - print( + logger.info( f"Simulated toggle for '{server_name}' ({service_path}) to {new_state} by user '{username}'" ) @@ -925,14 +1030,14 @@ async def toggle_service_route( if new_state: # Perform immediate check when enabling - print(f"Performing immediate health check for {service_path} upon toggle ON...") + logger.info(f"Performing immediate health check for {service_path} upon toggle ON...") try: new_status, last_checked_dt = await perform_single_health_check(service_path) last_checked_iso = last_checked_dt.isoformat() if last_checked_dt else None - print(f"Immediate check for {service_path} completed. Status: {new_status}") + logger.info(f"Immediate check for {service_path} completed. Status: {new_status}") except Exception as e: # Handle potential errors during the immediate check itself - print(f"ERROR during immediate health check for {service_path}: {e}") + logger.error(f"ERROR during immediate health check for {service_path}: {e}") new_status = f"error: immediate check failed ({type(e).__name__})" # Update global state to reflect this error SERVER_HEALTH_STATUS[service_path] = new_status @@ -946,7 +1051,7 @@ async def toggle_service_route( last_checked_iso = last_checked_dt.isoformat() if last_checked_dt else None # Update global state directly when disabling SERVER_HEALTH_STATUS[service_path] = new_status - print(f"Service {service_path} toggled OFF. Status set to disabled.") + logger.info(f"Service {service_path} toggled OFF. Status set to disabled.") # --- Send *targeted* update via WebSocket --- START # Send immediate feedback for the toggled service only @@ -961,7 +1066,7 @@ async def toggle_service_route( } } message = json.dumps(update_data) - print(f"--- TOGGLE: Sending targeted update: {message}") + logger.info(f"--- TOGGLE: Sending targeted update: {message}") # Create task to send without blocking the request async def send_specific_update(): @@ -976,10 +1081,10 @@ async def send_specific_update(): for i, result in enumerate(results): conn, _ = send_tasks[i] if isinstance(result, Exception): - print(f"Error sending toggle update to WebSocket client {conn.client}: {result}. Marking for removal.") + logger.warning(f"Error sending toggle update to WebSocket client {conn.client}: {result}. Marking for removal.") disconnected_clients.add(conn) if disconnected_clients: - print(f"Removing {len(disconnected_clients)} disconnected clients after toggle update.") + logger.info(f"Removing {len(disconnected_clients)} disconnected clients after toggle update.") for conn in disconnected_clients: if conn in active_connections: active_connections.remove(conn) @@ -991,15 +1096,15 @@ async def send_specific_update(): try: with open(STATE_FILE_PATH, "w") as f: json.dump(MOCK_SERVICE_STATE, f, indent=2) - print(f"Persisted state to {STATE_FILE_PATH}") + logger.info(f"Persisted state to {STATE_FILE_PATH}") except Exception as e: - print(f"ERROR: Failed to persist state to {STATE_FILE_PATH}: {e}") + logger.error(f"ERROR: Failed to persist state to {STATE_FILE_PATH}: {e}") # Decide if we should raise an error or just log # --- Persist the updated state --- END # Regenerate Nginx config after toggling state if not regenerate_nginx_config(): - print("ERROR: Failed to update Nginx configuration after toggle.") + logger.error("ERROR: Failed to update Nginx configuration after toggle.") # --- Return JSON instead of Redirect --- START final_status = SERVER_HEALTH_STATUS.get(service_path, "unknown") @@ -1038,26 +1143,26 @@ async def register_service( license_str: Annotated[str, Form(alias="license")] = "N/A", username: Annotated[str, Depends(api_auth)] = None, ): - print(f"[DEBUG] register_service() called with parameters:") - print(f"[DEBUG] - name: {name}") - print(f"[DEBUG] - description: {description}") - print(f"[DEBUG] - path: {path}") - print(f"[DEBUG] - proxy_pass_url: {proxy_pass_url}") - print(f"[DEBUG] - tags: {tags}") - print(f"[DEBUG] - num_tools: {num_tools}") - print(f"[DEBUG] - num_stars: {num_stars}") - print(f"[DEBUG] - is_python: {is_python}") - print(f"[DEBUG] - license_str: {license_str}") - print(f"[DEBUG] - username: {username}") + logger.info(f"[DEBUG] register_service() called with parameters:") + logger.info(f"[DEBUG] - name: {name}") + logger.info(f"[DEBUG] - description: {description}") + logger.info(f"[DEBUG] - path: {path}") + logger.info(f"[DEBUG] - proxy_pass_url: {proxy_pass_url}") + logger.info(f"[DEBUG] - tags: {tags}") + logger.info(f"[DEBUG] - num_tools: {num_tools}") + logger.info(f"[DEBUG] - num_stars: {num_stars}") + logger.info(f"[DEBUG] - is_python: {is_python}") + logger.info(f"[DEBUG] - license_str: {license_str}") + logger.info(f"[DEBUG] - username: {username}") # Ensure path starts with a slash if not path.startswith("/"): path = "/" + path - print(f"[DEBUG] Path adjusted to start with slash: {path}") + logger.info(f"[DEBUG] Path adjusted to start with slash: {path}") # Check if path already exists if path in REGISTERED_SERVERS: - print(f"[ERROR] Service registration failed: path '{path}' already exists") + logger.error(f"[ERROR] Service registration failed: path '{path}' already exists") return JSONResponse( status_code=400, content={"error": f"Service with path '{path}' already exists"}, @@ -1065,7 +1170,7 @@ async def register_service( # Process tags: split string, strip whitespace, filter empty tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()] - print(f"[DEBUG] Processed tags: {tag_list}") + logger.info(f"[DEBUG] Processed tags: {tag_list}") # Create new server entry with all fields server_entry = { @@ -1080,56 +1185,56 @@ async def register_service( "license": license_str, "tool_list": [] # Initialize tool list } - print(f"[DEBUG] Created server entry: {json.dumps(server_entry, indent=2)}") + logger.info(f"[DEBUG] Created server entry: {json.dumps(server_entry, indent=2)}") # Save to individual file - print(f"[DEBUG] Attempting to save server data to file...") + logger.info(f"[DEBUG] Attempting to save server data to file...") success = save_server_to_file(server_entry) if not success: - print(f"[ERROR] Failed to save server data to file") + logger.error(f"[ERROR] Failed to save server data to file") return JSONResponse( status_code=500, content={"error": "Failed to save server data"} ) - print(f"[DEBUG] Successfully saved server data to file") + logger.info(f"[DEBUG] Successfully saved server data to file") # Add to in-memory registry and default to disabled - print(f"[DEBUG] Adding server to in-memory registry...") + logger.info(f"[DEBUG] Adding server to in-memory registry...") REGISTERED_SERVERS[path] = server_entry - print(f"[DEBUG] Setting initial service state to disabled") + logger.info(f"[DEBUG] Setting initial service state to disabled") MOCK_SERVICE_STATE[path] = False # Set initial health status for the new service (always start disabled) - print(f"[DEBUG] Setting initial health status to 'disabled'") + logger.info(f"[DEBUG] Setting initial health status to 'disabled'") SERVER_HEALTH_STATUS[path] = "disabled" # Start disabled SERVER_LAST_CHECK_TIME[path] = None # No check time yet # Ensure num_tools is present in the in-memory dict immediately if "num_tools" not in REGISTERED_SERVERS[path]: - print(f"[DEBUG] Adding missing num_tools field to in-memory registry") + logger.info(f"[DEBUG] Adding missing num_tools field to in-memory registry") REGISTERED_SERVERS[path]["num_tools"] = 0 # Regenerate Nginx config after successful registration - print(f"[DEBUG] Attempting to regenerate Nginx configuration...") + logger.info(f"[DEBUG] Attempting to regenerate Nginx configuration...") if not regenerate_nginx_config(): - print(f"[ERROR] Failed to update Nginx configuration after registration") + logger.error(f"[ERROR] Failed to update Nginx configuration after registration") else: - print(f"[DEBUG] Successfully regenerated Nginx configuration") + logger.info(f"[DEBUG] Successfully regenerated Nginx configuration") - print(f"[INFO] New service registered: '{name}' at path '{path}' by user '{username}'") + logger.info(f"[INFO] New service registered: '{name}' at path '{path}' by user '{username}'") # --- Persist the updated state after registration --- START try: - print(f"[DEBUG] Attempting to persist state to {STATE_FILE_PATH}...") + logger.info(f"[DEBUG] Attempting to persist state to {STATE_FILE_PATH}...") with open(STATE_FILE_PATH, "w") as f: json.dump(MOCK_SERVICE_STATE, f, indent=2) - print(f"[DEBUG] Successfully persisted state to {STATE_FILE_PATH}") + logger.info(f"[DEBUG] Successfully persisted state to {STATE_FILE_PATH}") except Exception as e: - print(f"[ERROR] Failed to persist state to {STATE_FILE_PATH}: {str(e)}") + logger.error(f"[ERROR] Failed to persist state to {STATE_FILE_PATH}: {str(e)}") # --- Persist the updated state after registration --- END # Broadcast the updated status after registration - print(f"[DEBUG] Creating task to broadcast health status...") + logger.info(f"[DEBUG] Creating task to broadcast health status...") asyncio.create_task(broadcast_health_status()) - print(f"[DEBUG] Registration complete, returning success response") + logger.info(f"[DEBUG] Registration complete, returning success response") return JSONResponse( status_code=201, content={ @@ -1209,7 +1314,7 @@ async def get_service_tools( raise HTTPException(status_code=404, detail="Tool list not available yet. Service may not be healthy or check is pending.") elif not isinstance(tool_list, list): # Data integrity check - print(f"Warning: tool_list for {service_path} is not a list: {type(tool_list)}") + logger.warning(f"Warning: tool_list for {service_path} is not a list: {type(tool_list)}") raise HTTPException(status_code=500, detail="Internal server error: Invalid tool list format.") return {"service_path": service_path, "tools": tool_list} @@ -1231,19 +1336,19 @@ async def refresh_service(service_path: str, username: Annotated[str, Depends(ap if not is_enabled: raise HTTPException(status_code=400, detail="Cannot refresh a disabled service") - print(f"Manual refresh requested for {service_path} by user '{username}'...") + logger.info(f"Manual refresh requested for {service_path} by user '{username}'...") try: # Trigger the health check (which also updates tools if healthy) await perform_single_health_check(service_path) # --- Regenerate Nginx config after manual refresh --- START # The health check itself might trigger regeneration, but do it explicitly # here too to ensure it happens after the refresh attempt completes. - print(f"Regenerating Nginx config after manual refresh for {service_path}...") + logger.info(f"Regenerating Nginx config after manual refresh for {service_path}...") regenerate_nginx_config() # --- Regenerate Nginx config after manual refresh --- END except Exception as e: # Catch potential errors during the check itself - print(f"ERROR during manual refresh check for {service_path}: {e}") + logger.error(f"ERROR during manual refresh check for {service_path}: {e}") # Update status to reflect the error error_status = f"error: refresh execution failed ({type(e).__name__})" SERVER_HEALTH_STATUS[service_path] = error_status @@ -1252,7 +1357,7 @@ async def refresh_service(service_path: str, username: Annotated[str, Depends(ap await broadcast_single_service_update(service_path) # --- Regenerate Nginx config even after refresh failure --- START # Ensure Nginx reflects the error state if it was previously healthy - print(f"Regenerating Nginx config after manual refresh failed for {service_path}...") + logger.info(f"Regenerating Nginx config after manual refresh failed for {service_path}...") regenerate_nginx_config() # --- Regenerate Nginx config even after refresh failure --- END # Return error response @@ -1346,10 +1451,10 @@ async def edit_server_submit( # Regenerate Nginx config as proxy_pass_url might have changed if not regenerate_nginx_config(): - print("ERROR: Failed to update Nginx configuration after edit.") + logger.error("ERROR: Failed to update Nginx configuration after edit.") # Consider how to notify user - maybe flash message system needed - print(f"Server '{name}' ({service_path}) updated by user '{username}'") + logger.info(f"Server '{name}' ({service_path}) updated by user '{username}'") # Redirect back to the main page return RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) @@ -1376,7 +1481,7 @@ async def broadcast_single_service_update(service_path: str): } } message = json.dumps(update_data) - print(f"--- BROADCAST SINGLE: Sending update for {service_path}: {message}") + logger.info(f"--- BROADCAST SINGLE: Sending update for {service_path}: {message}") # Use the same concurrent sending logic as in toggle disconnected_clients = set() @@ -1390,10 +1495,10 @@ async def broadcast_single_service_update(service_path: str): for i, result in enumerate(results): conn, _ = send_tasks[i] if isinstance(result, Exception): - print(f"Error sending single update to WebSocket client {conn.client}: {result}. Marking for removal.") + logger.warning(f"Error sending single update to WebSocket client {conn.client}: {result}. Marking for removal.") disconnected_clients.add(conn) if disconnected_clients: - print(f"Removing {len(disconnected_clients)} disconnected clients after single update broadcast.") + logger.info(f"Removing {len(disconnected_clients)} disconnected clients after single update broadcast.") for conn in disconnected_clients: if conn in active_connections: active_connections.remove(conn) @@ -1405,7 +1510,7 @@ async def broadcast_single_service_update(service_path: str): async def websocket_endpoint(websocket: WebSocket): await websocket.accept() active_connections.add(websocket) - print(f"WebSocket client connected: {websocket.client}") + logger.info(f"WebSocket client connected: {websocket.client}") try: # --- Send initial status upon connection (Formatted) --- START initial_data_to_send = {} @@ -1430,13 +1535,13 @@ async def websocket_endpoint(websocket: WebSocket): # We don't expect messages from client in this case, just keep alive await websocket.receive_text() # This will raise WebSocketDisconnect if client closes except WebSocketDisconnect: - print(f"WebSocket client disconnected: {websocket.client}") + logger.info(f"WebSocket client disconnected: {websocket.client}") except Exception as e: - print(f"WebSocket error for {websocket.client}: {e}") + logger.error(f"WebSocket error for {websocket.client}: {e}") finally: if websocket in active_connections: active_connections.remove(websocket) - print(f"WebSocket connection removed: {websocket.client}") + logger.info(f"WebSocket connection removed: {websocket.client}") # --- Run (for local testing) --- From fe1c70898c90bd61420130ef2644b6c9ae966f54 Mon Sep 17 00:00:00 2001 From: dheerajoruganty Date: Sun, 4 May 2025 07:46:58 +0000 Subject: [PATCH 2/2] chore: Fix linting issues with ruff --- registry/main.py | 44 +++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/registry/main.py b/registry/main.py index a2f49c8..3fc44a5 100644 --- a/registry/main.py +++ b/registry/main.py @@ -1,5 +1,4 @@ import os -import re import json import secrets import asyncio @@ -7,7 +6,7 @@ from contextlib import asynccontextmanager from pathlib import Path # Import Path from typing import Annotated, List, Set -from datetime import datetime, timezone, timedelta +from datetime import datetime, timezone from fastapi import ( FastAPI, @@ -25,7 +24,6 @@ from fastapi.templating import Jinja2Templates from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature from dotenv import load_dotenv -import subprocess # Added for nginx reload import logging # --- MCP Client Imports --- START @@ -506,13 +504,17 @@ async def get_tools_from_server(base_url: str) -> List[dict] | None: # Return li current_section = "args" section_content = [stripped_line[len("Args:"):].strip()] elif stripped_line.startswith("Returns:"): - if current_section != "main": parsed_desc[current_section] = "\n".join(section_content).strip() - else: parsed_desc["main"] = "\n".join(main_desc_lines).strip() + if current_section != "main": + parsed_desc[current_section] = "\n".join(section_content).strip() + else: + parsed_desc["main"] = "\n".join(main_desc_lines).strip() current_section = "returns" section_content = [stripped_line[len("Returns:"):].strip()] elif stripped_line.startswith("Raises:"): - if current_section != "main": parsed_desc[current_section] = "\n".join(section_content).strip() - else: parsed_desc["main"] = "\n".join(main_desc_lines).strip() + if current_section != "main": + parsed_desc[current_section] = "\n".join(section_content).strip() + else: + parsed_desc["main"] = "\n".join(main_desc_lines).strip() current_section = "raises" section_content = [stripped_line[len("Raises:"):].strip()] elif current_section == "main": @@ -678,7 +680,7 @@ async def perform_single_health_check(path: str) -> tuple[str, datetime | None]: except asyncio.TimeoutError: # This catches timeout on asyncio.wait_for, slightly different from curl's --max-time - current_status = f"error: check process timeout" + current_status = "error: check process timeout" logger.info(f"Health check asyncio.wait_for timeout for {path} ({url})") except FileNotFoundError: current_status = "error: command not found" @@ -1143,7 +1145,7 @@ async def register_service( license_str: Annotated[str, Form(alias="license")] = "N/A", username: Annotated[str, Depends(api_auth)] = None, ): - logger.info(f"[DEBUG] register_service() called with parameters:") + logger.info("[DEBUG] register_service() called with parameters:") logger.info(f"[DEBUG] - name: {name}") logger.info(f"[DEBUG] - description: {description}") logger.info(f"[DEBUG] - path: {path}") @@ -1188,35 +1190,35 @@ async def register_service( logger.info(f"[DEBUG] Created server entry: {json.dumps(server_entry, indent=2)}") # Save to individual file - logger.info(f"[DEBUG] Attempting to save server data to file...") + logger.info("[DEBUG] Attempting to save server data to file...") success = save_server_to_file(server_entry) if not success: - logger.error(f"[ERROR] Failed to save server data to file") + logger.error("[ERROR] Failed to save server data to file") return JSONResponse( status_code=500, content={"error": "Failed to save server data"} ) - logger.info(f"[DEBUG] Successfully saved server data to file") + logger.info("[DEBUG] Successfully saved server data to file") # Add to in-memory registry and default to disabled - logger.info(f"[DEBUG] Adding server to in-memory registry...") + logger.info("[DEBUG] Adding server to in-memory registry...") REGISTERED_SERVERS[path] = server_entry - logger.info(f"[DEBUG] Setting initial service state to disabled") + logger.info("[DEBUG] Setting initial service state to disabled") MOCK_SERVICE_STATE[path] = False # Set initial health status for the new service (always start disabled) - logger.info(f"[DEBUG] Setting initial health status to 'disabled'") + logger.info("[DEBUG] Setting initial health status to 'disabled'") SERVER_HEALTH_STATUS[path] = "disabled" # Start disabled SERVER_LAST_CHECK_TIME[path] = None # No check time yet # Ensure num_tools is present in the in-memory dict immediately if "num_tools" not in REGISTERED_SERVERS[path]: - logger.info(f"[DEBUG] Adding missing num_tools field to in-memory registry") + logger.info("[DEBUG] Adding missing num_tools field to in-memory registry") REGISTERED_SERVERS[path]["num_tools"] = 0 # Regenerate Nginx config after successful registration - logger.info(f"[DEBUG] Attempting to regenerate Nginx configuration...") + logger.info("[DEBUG] Attempting to regenerate Nginx configuration...") if not regenerate_nginx_config(): - logger.error(f"[ERROR] Failed to update Nginx configuration after registration") + logger.error("[ERROR] Failed to update Nginx configuration after registration") else: - logger.info(f"[DEBUG] Successfully regenerated Nginx configuration") + logger.info("[DEBUG] Successfully regenerated Nginx configuration") logger.info(f"[INFO] New service registered: '{name}' at path '{path}' by user '{username}'") @@ -1231,10 +1233,10 @@ async def register_service( # --- Persist the updated state after registration --- END # Broadcast the updated status after registration - logger.info(f"[DEBUG] Creating task to broadcast health status...") + logger.info("[DEBUG] Creating task to broadcast health status...") asyncio.create_task(broadcast_health_status()) - logger.info(f"[DEBUG] Registration complete, returning success response") + logger.info("[DEBUG] Registration complete, returning success response") return JSONResponse( status_code=201, content={