Gateway error handling #360
Conversation
94a6f76 to af8b5b6
- Add MCPToolsCache class with persistent caching
- Implement live tools → saved cache → hardcoded fallback
- Include hardcoded ask_pieces_ltm and create_pieces_memory tools
- Auto-save live tools to cache for offline use
- Provide get_available_tools() for fallback system
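The commit above only lists the pieces of this fallback system; below is a minimal sketch of how such a cache could fit together, assuming file-based JSON storage. Only the class name, the two tool names, the fallback order, and get_available_tools() come from the commit message; everything else is illustrative.

import json
from pathlib import Path
from typing import List, Optional

# Hypothetical trimmed-down fallback definitions; the real module ships full
# schemas for ask_pieces_ltm and create_pieces_memory.
HARDCODED_TOOLS = [
    {"name": "ask_pieces_ltm", "description": "Query Pieces Long-Term Memory"},
    {"name": "create_pieces_memory", "description": "Save a new memory to Pieces"},
]

class MCPToolsCache:
    """Persist the last known live tool list so it can be reused offline."""

    def __init__(self, cache_path: Path):
        self.cache_path = cache_path

    def save_tools(self, tools: List[dict]) -> None:
        # Auto-save live tools to disk for later offline use
        self.cache_path.parent.mkdir(parents=True, exist_ok=True)
        self.cache_path.write_text(json.dumps(tools))

    def load_cached_tools(self) -> Optional[List[dict]]:
        # Return the previously saved list, or None if nothing usable is cached
        try:
            return json.loads(self.cache_path.read_text())
        except (FileNotFoundError, json.JSONDecodeError):
            return None

    def get_available_tools(self, live_tools: Optional[List[dict]] = None) -> List[dict]:
        # Fallback order from the commit message: live tools -> saved cache -> hardcoded
        if live_tools:
            self.save_tools(live_tools)
            return live_tools
        return self.load_cached_tools() or HARDCODED_TOOLS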
…dation
- Add 3-step validation system (health → compatibility → LTM)
- Implement contextual error messages with actionable instructions
- Add smart caching integration for offline scenarios
- Remove automatic PiecesOS startup for user control
- Add health WebSocket integration for better status checking
- Improve error messages: 'pieces open', 'pieces update', 'pieces open --ltm'
- Add version compatibility caching to reduce API calls
- Handle upstream URL failures gracefully with lazy loading
- Update server version to 0.2.0
- Add 19 unit tests with mocking for all validation flows
- Test PiecesOS not running → 'pieces open' message
- Test CLI version incompatible → 'pieces manage update' message
- Test PiecesOS version incompatible → 'pieces update' message
- Test LTM disabled → 'pieces open --ltm' message
- Test connection failures → 'pieces restart' message
- Mock all validation functions (_check_pieces_os_status, _check_ltm_status, etc.)
- Verify error message content and user guidance
- Test caching behavior and multiple validation calls
- Ensure all integration tests continue to pass
- Add 'pieces open --ltm' command support for LTM activation
- Enhance WebSocket health checking and error handling
- Add restart command for MCP troubleshooting
- Update LTM status management and caching
- Improve E2E tests for better MCP gateway coverage
- Add URL handling for MCP server endpoints
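The open command changes themselves aren't shown in this thread; here is a minimal, hypothetical sketch of what 'pieces open --ltm' handling could look like (every name except the flag is a placeholder, not the PR's actual code):

import argparse

def start_pieces_os() -> None:
    # Placeholder: the real command opens/attaches to PiecesOS
    print("starting PiecesOS ...")

def enable_ltm() -> None:
    # Placeholder: the real command turns on Long-Term Memory via the client
    print("enabling LTM ...")

def run_open(argv: list) -> None:
    parser = argparse.ArgumentParser(prog="pieces open")
    parser.add_argument(
        "--ltm",
        action="store_true",
        help="Enable Long-Term Memory after PiecesOS is up",
    )
    args = parser.parse_args(argv)
    start_pieces_os()
    if args.ltm:
        enable_ltm()

# Example: what `pieces open --ltm` would map to
run_open(["--ltm"])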
- Update poetry.lock with latest dependency versions
- Update pyproject.toml configuration for testing improvements
- Replace try/except with direct PiecesOS running check
- Streamline conditional flow for tools discovery
- Use hardcoded cache as final fallback instead of empty list
- Improve code readability and error handling
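Read together with the cache sketch above, the streamlined discovery flow could look roughly like this; discover_tools and client.fetch_live_tools are placeholders, not the PR's actual API:

def discover_tools(client, cache) -> list:
    # Direct "is PiecesOS running" check instead of wrapping discovery in a broad try/except
    live = None
    if client.is_pieces_running():
        try:
            live = client.fetch_live_tools()  # placeholder for the real upstream listing call
        except ConnectionError:
            live = None
    # Saved cache, then hardcoded definitions, instead of returning an empty list
    return cache.get_available_tools(live)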
af8b5b6 to 3142f80
Question: couldn't it just launch Pieces?
Yeah, we can do that, but the old method was to launch it on startup; if you closed Pieces in the middle of the session, the CLI would crash and you would have to restart it. Now it will continue normally.
ed5e69d to 31073ed
Pull Request Overview
This PR implements comprehensive gateway error handling for the MCP (Model Context Protocol) server with enhanced validation and user guidance. The changes include a 3-step validation system to check PiecesOS status, version compatibility, and LTM availability before executing tool calls, providing specific command instructions to users when issues are detected.
- Adds comprehensive validation system with specific error messages and recovery commands
- Implements tools caching and fallback mechanisms when PiecesOS is unavailable
- Introduces new CLI commands for LTM management and PiecesOS restart functionality
Reviewed Changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/pieces/mcp/gateway.py | Core gateway implementation with validation system and error handling |
| src/pieces/mcp/tools_cache.py | New hardcoded fallback tools for offline scenarios |
| tests/mcps/mcp_gateway_test.py | Comprehensive unit tests for validation flows |
| src/pieces/command_interface/simple_commands.py | New restart command implementation |
| src/pieces/command_interface/open_command.py | Added LTM flag support |
| src/pieces/core/open_command.py | Enhanced open command with LTM auto-enable |
| src/pieces/copilot/ltm.py | LTM functions with auto-enable capability |
| pyproject.toml | MCP version update |
31073ed to ed0469a
🔍 Comprehensive Code Review for PR #360: Gateway Error Handling

I've started a deep review of this PR implementing enhanced error handling for the MCP Gateway. Here's my initial assessment:

🎯 Overview

This PR significantly improves error handling in the MCP Gateway by implementing a 3-step validation system that provides clear, actionable error messages to LLM agents when various failure conditions occur. The changes include:

📊 Technical Scope

I'll be posting detailed code-level comments for critical issues found. Stay tuned for the comprehensive analysis.
tsavo-at-pieces left a comment
Critical Issue: Incorrect Update Command in Error Message
File: src/pieces/mcp/gateway.py:96
"Please update the CLI version to be able to run the tool call, run 'pieces manage update' to get the latest version. then retry your request again after updating.",Problem: The command pieces manage update appears to be incorrect. Based on the codebase structure, there's no manage command registered. This will confuse users and LLM agents.
Solution: Verify the correct update command and fix the message:
# If the update command is 'pieces update':
"Please update the CLI version to be able to run the tool call, run 'pieces update' to get the latest version. Then retry your request again after updating.",
# Or if it requires a different command:
"Please update the CLI version to be able to run the tool call. Visit https://docs.pieces.app/cli/update for instructions. Then retry your request again after updating.",Also note:
- Capitalize "Then" after the period
- Consider adding a link to documentation for clarity
- The same issue appears in tests at line 93 of mcp_gateway_test.py
tsavo-at-pieces left a comment
Architecture Issue: Race Condition in Health WebSocket Check
File: src/pieces/mcp/gateway.py:104-128
def _check_pieces_os_status(self):
"""Check if PiecesOS is running using health WebSocket"""
# First check if health_ws is already running and connected
if HealthWS.is_running() and getattr(
Settings.pieces_client, "is_pos_stream_running", False
):
return True
# If health_ws is not running, check if PiecesOS is available
if Settings.pieces_client.is_pieces_running():
try:
# Try to start the health WebSocket
if health_ws := Settings.pieces_client.health_ws:
health_ws.start()

Problems:
- Race condition between checking HealthWS.is_running() and starting it
- Using getattr with a hardcoded string suggests is_pos_stream_running might not always exist
- No synchronization mechanism for concurrent calls
- The health_ws reference at line 116 is not defined in scope
Solution:
def _check_pieces_os_status(self):
"""Check if PiecesOS is running using health WebSocket"""
# Use a lock to prevent race conditions
with self._health_check_lock: # Add this lock to __init__
# First check if already connected
if (HealthWS.is_running() and
hasattr(Settings.pieces_client, 'is_pos_stream_running') and
Settings.pieces_client.is_pos_stream_running):
return True
# Check if PiecesOS is available
if not Settings.pieces_client.is_pieces_running():
return False
try:
# Get the health WebSocket instance
health_ws = HealthWS.get_instance()
if health_ws and not health_ws.is_running():
health_ws.start()
# Update LTM status cache
Settings.pieces_client.copilot.context.ltm.ltm_status = (
Settings.pieces_client.work_stream_pattern_engine_api
.workstream_pattern_engine_processors_vision_status()
)
return True
except Exception as e:
Settings.logger.debug(f"Failed to start health WebSocket: {e}")
return False

Also add to __init__:
self._health_check_lock = threading.Lock()  # threading.Lock rather than asyncio.Lock, since _check_pieces_os_status is a synchronous method using a plain `with` block
tsavo-at-pieces left a comment
Security Issue: Potential Command Injection in Error Messages
File: src/pieces/mcp/gateway.py:145-150
if not self._check_pieces_os_status():
return False, (
f"PiecesOS is not running. To use the '{tool_name}' tool, please run:\n\n"
"`pieces open`\n\n"
"This will start PiecesOS, then you can retry your request."
)

Problem: The tool_name parameter is directly interpolated into error messages without validation. A malicious tool name could contain special characters or control sequences that could be interpreted by terminal emulators or logging systems.
Solution: Sanitize the tool name before using it in messages:
def _sanitize_tool_name(self, tool_name: str) -> str:
"""Sanitize tool name for safe inclusion in messages."""
# Remove control characters and limit length
import re
sanitized = re.sub(r'[^\w\s\-_.]', '', tool_name)
return sanitized[:100] # Limit to reasonable length
def _validate_system_status(self, tool_name: str) -> tuple[bool, str]:
"""
Perform 3-step validation before executing any command:
1. Check health WebSocket
2. Check compatibility
3. Check LTM (for LTM tools)
Returns:
tuple[bool, str]: (is_valid, error_message)
"""
# Sanitize tool name for safe display
safe_tool_name = self._sanitize_tool_name(tool_name)
# Step 1: Check health WebSocket / PiecesOS status
if not self._check_pieces_os_status():
return False, (
f"PiecesOS is not running. To use the '{safe_tool_name}' tool, please run:\n\n"
"`pieces open`\n\n"
"This will start PiecesOS, then you can retry your request."
)

Apply the same sanitization in all error messages that include the tool name (lines 163, 172, 181).
Additional Line-Specific Issues Found

1. Missing Retry Logic (src/pieces/mcp/gateway.py:360-389)
2. Missing Error Handling in URL Fetch (src/pieces/mcp/gateway.py:44-51)
3. Capitalization Issue (src/pieces/mcp/gateway.py:104)
   Error message says 'run' but should be 'Run' for consistency with other messages.
4. Grammar Error (src/pieces/command_interface/simple_commands.py:236)
   'Please running' should be 'Please run'
5. Empty URL Constant (src/pieces/urls.py:80)
6. Test Coverage Gaps (tests/mcps/mcp_gateway_test.py)
   Missing tests for:
7. Method Call Error (src/pieces/mcp/gateway.py:430)
8. Potential Memory Issue (src/pieces/mcp/gateway.py:203)
   While using SHA256 is good, consider only hashing essential parts of tool descriptions to avoid memory issues with very large descriptions.

All of these issues should be addressed before merging to ensure robustness and reliability of the gateway implementation.
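On item 8, a minimal sketch of hashing only the essential parts of each tool (the helper name and the inputSchema field are assumptions; the PR's actual hashing code at gateway.py:203 is not shown in this thread):

import hashlib
import json

def tools_fingerprint(tools: list) -> str:
    # Hash only the stable, essential fields (name + input schema), not full
    # descriptions, so very large descriptions don't inflate memory use.
    essential = [
        {"name": t.get("name"), "schema": t.get("inputSchema")}
        for t in tools
    ]
    payload = json.dumps(essential, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()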
Follow-up on Author's Comments

Thank you for the quick responses! Let me verify the updates:

✅ Verified Updates:
🔍 Remaining Issues:
📋 Recommendations:
The race condition fix looks good! Just need to resolve these remaining items before merge.
@bishoy-at-pieces it seems like some of these are easy fixes and good to double-check before merge
0bafec6 to 00e212a
🔍 Updated Review Status for PR #360: Gateway Error Handling

🚨 Critical Issues That Block Merge

1. Incorrect Update Command (HIGH PRIORITY)
…p/cli-agent into gateway-error-handling
253e377 to 84dde17
✅ All Fixes Verified

I've reviewed the updated code in the

Fixed Issues ✓

Integration with PR #351 ✓

The error messages now correctly reference

This PR is ready for final review and merge once PR #351 lands. Excellent work addressing all the security and reliability concerns! 🎯
📋 Outstanding Issues Summary for PR #360

Based on comprehensive review of all historical comments and current code state:

✅ Fixed Issues (CONFIRMED)

🚧 Must-Have Issues (BLOCKERS)
🔧 Complete Implementation Guide for Outstanding Issues

Based on extensive research and best practices analysis, here are the complete code solutions for all outstanding issues:

1. Retry Logic with Exponential Backoff (src/pieces/mcp/gateway.py)

Add this retry handler class:

import asyncio
import random
from typing import Optional, Callable, TypeVar, Awaitable, Tuple
from functools import wraps
T = TypeVar('T')
class ExponentialBackoffRetry:
"""Exponential backoff retry handler for async operations"""
def __init__(self,
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
exponential_base: float = 2.0,
jitter_range: Tuple[float, float] = (0.0, 0.3)):
self.max_attempts = max_attempts
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter_range = jitter_range
def calculate_delay(self, attempt: int) -> float:
"""Calculate delay with exponential backoff and jitter"""
# Exponential backoff: 1s, 2s, 4s, 8s... capped at max_delay
delay = min(
self.base_delay * (self.exponential_base ** attempt),
self.max_delay
)
# Add jitter to prevent thundering herd
jitter = random.uniform(*self.jitter_range) * delay
return delay + jitter
async def __call__(self,
func: Callable[..., Awaitable[T]],
*args,
on_retry: Optional[Callable[[Exception, int], Awaitable[None]]] = None,
**kwargs) -> T:
"""Execute function with retry logic"""
last_exception = None
for attempt in range(self.max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
# Check if we should retry this exception
if not self.should_retry(e):
raise
if attempt < self.max_attempts - 1:
delay = self.calculate_delay(attempt)
# Call retry callback if provided
if on_retry:
await on_retry(e, attempt)
await asyncio.sleep(delay)
# All attempts exhausted
raise last_exception
def should_retry(self, exception: Exception) -> bool:
"""Determine if we should retry based on exception type"""
# Only retry transient network errors
retryable_exceptions = (
asyncio.TimeoutError,
ConnectionError,
OSError,
# Add specific MCP/SSE client exceptions if available
)
# Don't retry on these
non_retryable = (
ValueError, # Configuration errors
TypeError, # Programming errors
KeyboardInterrupt, # User interruption
SystemExit, # System shutdown
)
if isinstance(exception, non_retryable):
return False
return isinstance(exception, retryable_exceptions)

Then modify the call_tool method to use the retry handler:

async def call_tool(self, name, arguments):
"""Calls a tool on the POS MCP server with retry logic."""
Settings.logger.debug(f"Calling tool: {name}")
# Perform 3-step validation before attempting to call tool
is_valid, error_message = self._validate_system_status(name)
if not is_valid:
Settings.logger.debug(f"Tool validation failed for {name}: {error_message}")
return types.CallToolResult(
content=[types.TextContent(type="text", text=error_message)]
)
# Initialize retry handler
retry_handler = ExponentialBackoffRetry(
max_attempts=3,
base_delay=1.0,
max_delay=10.0,
jitter_range=(0.0, 0.3)
)
async def on_retry(exception, attempt):
Settings.logger.warning(
f"Tool call '{name}' attempt {attempt + 1} failed: {exception}. "
f"Retrying in {retry_handler.calculate_delay(attempt):.2f} seconds..."
)
# All validations passed, try to call the upstream tool with retry
try:
async def call_upstream():
Settings.logger.debug(f"Calling upstream tool: {name}")
session = await self.connect()
result = await session.call_tool(name, arguments)
Settings.logger.debug(f"Successfully called tool: {name}")
Settings.logger.debug(f"with results: {result}")
return result
return await retry_handler(call_upstream, on_retry=on_retry)
except Exception as e:
Settings.logger.error(f"Error calling POS MCP {name} after retries: {e}", exc_info=True)
# Return a helpful error message based on the tool and system status
error_message = self._get_error_message_for_tool(name)
return types.CallToolResult(
content=[types.TextContent(type="text", text=error_message)]
)

2. Fix Bare Except Clause (src/pieces/mcp/gateway.py, line ~51)

Replace the bare except with specific exception handling:

def _try_get_upstream_url(self):
"""Try to get the upstream URL if we don't have it yet."""
if self.upstream_url is None:
if Settings.pieces_client.is_pieces_running():
try:
self.upstream_url = get_mcp_latest_url()
return True
except (ConnectionError, OSError, ValueError, AttributeError) as e:
# ConnectionError: Network issues
# OSError: System-level errors
# ValueError: Invalid response/parsing errors
# AttributeError: Missing expected attributes
Settings.logger.debug(f"Failed to get MCP upstream URL: {type(e).__name__}: {e}")
return False
except Exception as e:
# Log unexpected exceptions but don't crash
Settings.logger.error(f"Unexpected error getting MCP URL: {type(e).__name__}: {e}", exc_info=True)
return False
return False
return True

3. Add Timeout to URL Fetch (src/pieces/mcp/utils.py)

Add a timeout-protected wrapper around get_mcp_latest_url:

import asyncio
from typing import Optional
async def get_mcp_latest_url_with_timeout(timeout: float = 5.0) -> Optional[str]:
"""Get MCP URL with timeout protection"""
try:
# If the original function is sync, run it in executor
loop = asyncio.get_event_loop()
url = await asyncio.wait_for(
loop.run_in_executor(None, get_mcp_latest_url),
timeout=timeout
)
return url
except asyncio.TimeoutError:
Settings.logger.error(f"Timeout getting MCP URL after {timeout} seconds")
raise
except Exception as e:
Settings.logger.error(f"Error getting MCP URL: {e}", exc_info=True)
raise

Then update the gateway to use an async, timeout-protected version of the URL lookup:

async def _try_get_upstream_url_async(self):
"""Try to get the upstream URL with timeout protection."""
if self.upstream_url is None:
if Settings.pieces_client.is_pieces_running():
try:
# Use asyncio.to_thread for Python 3.9+ or run_in_executor for older versions
if hasattr(asyncio, 'to_thread'):
self.upstream_url = await asyncio.wait_for(
asyncio.to_thread(get_mcp_latest_url),
timeout=5.0
)
else:
loop = asyncio.get_event_loop()
self.upstream_url = await asyncio.wait_for(
loop.run_in_executor(None, get_mcp_latest_url),
timeout=5.0
)
return True
except asyncio.TimeoutError:
Settings.logger.warning("Timeout getting MCP upstream URL after 5 seconds")
return False
except (ConnectionError, OSError, ValueError, AttributeError) as e:
Settings.logger.debug(f"Failed to get MCP upstream URL: {type(e).__name__}: {e}")
return False
except Exception as e:
Settings.logger.error(f"Unexpected error getting MCP URL: {type(e).__name__}: {e}", exc_info=True)
return False
return False
return True

4. Update TODO Comment (src/pieces/mcp/gateway.py, line ~283)

Remove the outdated TODO:

# Line 280-284, change from:
try:
await (
self.session.send_ping()
) # TODO: Uncomment when ping is implemented
Settings.logger.debug("Using existing upstream connection")
# To:
try:
await self.session.send_ping()
Settings.logger.debug("Using existing upstream connection")5. Add Comprehensive Error Handling to Connect Method (line ~275)Update the connect method with better error handling: async def connect(self, send_notification: bool = True):
"""Ensures a connection to the POS server exists and returns it."""
async with self.connection_lock:
if self.session is not None:
# Validate the existing session is still alive
try:
await self.session.send_ping()
Settings.logger.debug("Using existing upstream connection")
return self.session
except Exception as e:
Settings.logger.debug(
f"Existing connection is stale: {e}, creating new connection"
)
# Clean up the stale connection
await self._cleanup_stale_session()
self.session = None
self.sse_client = None
# Try to get upstream URL if we don't have it (with timeout)
if not await self._try_get_upstream_url_async():
raise ConnectionError(
"Cannot get MCP upstream URL - PiecesOS may not be running. "
"Please run 'pieces open' to start PiecesOS."
)
try:
Settings.logger.info(
f"Connecting to upstream MCP server at {self.upstream_url}"
)
# Add timeout to SSE client connection
self.sse_client = sse_client(self.upstream_url)
read_stream, write_stream = await asyncio.wait_for(
self.sse_client.__aenter__(),
timeout=30.0 # 30 second connection timeout
)
session = ClientSession(read_stream, write_stream)
await session.__aenter__()
self.session = session
await self.update_tools(session, send_notification)
await self.setup_notification_handler(session)
return session
except asyncio.TimeoutError:
self.session = None
Settings.logger.error(
f"Timeout connecting to upstream server at {self.upstream_url} after 30 seconds"
)
raise ConnectionError(
"Connection to MCP server timed out. "
"Please check if PiecesOS is running and try again."
)
except Exception as e:
self.session = None
Settings.logger.error(
f"Error connecting to upstream server: {e}", exc_info=True
)
raise ConnectionError(
f"Failed to connect to MCP server: {type(e).__name__}: {e}"
)

6. Create Custom Exception Classes (Add at top of gateway.py)

class MCPError(Exception):
"""Base exception for MCP gateway errors"""
pass
class MCPConnectionError(MCPError):
"""Raised when MCP gateway cannot connect to upstream"""
pass
class MCPTimeoutError(MCPError):
"""Raised when MCP operations timeout"""
pass
class MCPValidationError(MCPError):
"""Raised when system validation fails"""
pass

7. Add Configuration Constants (At top of file after imports)

# Timeout configurations
DEFAULT_CONNECTION_TIMEOUT = 30.0 # Connection establishment
DEFAULT_OPERATION_TIMEOUT = 60.0 # Individual operations
DEFAULT_PING_TIMEOUT = 5.0 # Health check pings
DEFAULT_URL_FETCH_TIMEOUT = 5.0 # URL fetching
# Retry configurations
DEFAULT_MAX_RETRIES = 3
DEFAULT_RETRY_BASE_DELAY = 1.0
DEFAULT_RETRY_MAX_DELAY = 10.0
DEFAULT_RETRY_JITTER = (0.0, 0.3)
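A small usage sketch, assuming the constants and the ExponentialBackoffRetry class above are in scope, showing how they could be wired together:

# Sketch: build the retry handler from the configuration constants above
retry_handler = ExponentialBackoffRetry(
    max_attempts=DEFAULT_MAX_RETRIES,
    base_delay=DEFAULT_RETRY_BASE_DELAY,
    max_delay=DEFAULT_RETRY_MAX_DELAY,
    jitter_range=DEFAULT_RETRY_JITTER,
)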
Summary of Changes

These changes follow industry best practices and will make the gateway more resilient to transient failures while providing better debugging information when issues occur.
…p/cli-agent into gateway-error-handling
1b99da2 to 920dfc6
tsavo-at-pieces left a comment
Looking solid. Definitely have @kirollos-at-pieces do some extensive testing!
Also, do we have integration/unit tests for this?
Yes, we do have tests here @tsavo-at-pieces
Yeah, I am testing now!
The whole flow:
Cursor makes an MCP call
The CLI does some checks (is PiecesOS running, LTM status, and version compatibility) and may respond with, for example:
"Please update PiecesOS to a compatible version to be able to run the tool call. run 'pieces update' to get the latest version. then retry your request again after updating.",
or
"Please update the CLI version to be able to run the tool call, run 'pieces manage update' to get the latest version. then retry your request again after updating.",
This helps the agents in general know which command to run, with less friction (the agent will know which command to run to proceed).
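As an illustration of that idea (a sketch only, not the PR's actual code; the check keys and helper are hypothetical, while the commands come from the commit messages above):

# Hypothetical mapping from a failed validation step to the recovery command
# the gateway tells the agent to run; the real messages live in gateway.py.
RECOVERY_COMMANDS = {
    "pieces_os_not_running": "pieces open",
    "pieces_os_outdated": "pieces update",
    "cli_outdated": "pieces manage update",  # flagged in review; may change to the correct update command
    "ltm_disabled": "pieces open --ltm",
    "connection_failed": "pieces restart",
}

def error_message_for(check: str, tool_name: str) -> str:
    command = RECOVERY_COMMANDS.get(check, "pieces open")
    return (
        f"Cannot run '{tool_name}' yet. Please run `{command}` and then retry your request."
    )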
Won't merge until this is fixed: https://github.com/pieces-app/os_server/issues/2043