Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 86 additions & 81 deletions crewai_tools/adapters/mcp_adapter.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Optional, Type

from crewai.tools import BaseTool
from crewai_tools.adapters.tool_collection import ToolCollection
"""
MCPServer for CrewAI.


"""
logger = logging.getLogger(__name__)

if TYPE_CHECKING:
Expand All @@ -29,107 +25,98 @@


class MCPServerAdapter:
"""Manages the lifecycle of an MCP server and make its tools available to CrewAI.
"""Manages the lifecycle of an MCP server and makes its tools available to CrewAI.

Note: tools can only be accessed after the server has been started with the
`start()` method.
This adapter handles starting and stopping the MCP server, converting its
capabilities into CrewAI tools. It is best used as a context manager (`with`
statement) to ensure resources are properly cleaned up.

Attributes:
tools: The CrewAI tools available from the MCP server.

Usage:
# context manager + stdio
with MCPServerAdapter(...) as tools:
# tools is now available

# context manager + sse
with MCPServerAdapter({"url": "http://localhost:8000/sse"}) as tools:
# tools is now available

# context manager with filtered tools
with MCPServerAdapter(..., "tool1", "tool2") as filtered_tools:
# only tool1 and tool2 are available

# context manager with custom connect timeout (60 seconds)
with MCPServerAdapter(..., connect_timeout=60) as tools:
# tools is now available with longer timeout

# manually stop mcp server
try:
mcp_server = MCPServerAdapter(...)
tools = mcp_server.tools # all tools

# or with filtered tools and custom timeout
mcp_server = MCPServerAdapter(..., "tool1", "tool2", connect_timeout=45)
filtered_tools = mcp_server.tools # only tool1 and tool2
...
finally:
mcp_server.stop()

# Best practice is ensure cleanup is done after use.
mcp_server.stop() # run after crew().kickoff()
tools: A ToolCollection of the available CrewAI tools. Accessing this
before the server is ready will raise a ValueError.
"""

def __init__(
self,
serverparams: StdioServerParameters | dict[str, Any],
*tool_names: str,
connect_timeout: int = 30,
):
"""Initialize the MCP Server
) -> None:
"""Initialize and start the MCP Server.

Example:
.. code-block:: python

# For a server communicating over standard I/O
from mcp import StdioServerParameters
stdio_params = StdioServerParameters(command="python", args=["-c", "..."])
with MCPServerAdapter(stdio_params) as tools:
# use tools

# For a server communicating over SSE (Server-Sent Events)
sse_params = {"url": "http://localhost:8000/sse"}
with MCPServerAdapter(sse_params, connect_timeout=60) as tools:
# use tools

Args:
serverparams: The parameters for the MCP server it supports either a
`StdioServerParameters` or a `dict` respectively for STDIO and SSE.
serverparams: The parameters for the MCP server. This supports either a
`StdioServerParameters` object for STDIO or a `dict` for SSE connections.
*tool_names: Optional names of tools to filter. If provided, only tools with
matching names will be available.
connect_timeout: Connection timeout in seconds to the MCP server (default is 30s).

connect_timeout: Connection timeout in seconds to the MCP server.
Defaults to 30.
"""

super().__init__()
self._adapter = None
self._tools = None
self._tool_names = list(tool_names) if tool_names else None

if not MCP_AVAILABLE:
import click

if click.confirm(
"You are missing the 'mcp' package. Would you like to install it?"
):
import subprocess

try:
subprocess.run(["uv", "add", "mcp crewai-tools[mcp]"], check=True)

except subprocess.CalledProcessError:
raise ImportError("Failed to install mcp package")
else:
raise ImportError(
"`mcp` package not found, please run `uv add crewai-tools[mcp]`"
)
msg = (
"MCP is not available. The 'mcp' package, a required dependency, "
"must be installed for MCPServerAdapter to work."
)
logger.critical(msg)
raise ImportError(
"`mcp` package not found. Please install it with:\n"
" pip install mcp crewai-tools[mcp]"
)

try:
self._serverparams = serverparams
self._adapter = MCPAdapt(self._serverparams, CrewAIAdapter(), connect_timeout)
self.start()

except Exception as e:
logger.exception("Failed to initialize MCP Adapter during __init__.")
if self._adapter is not None:
try:
self.stop()
except Exception as stop_e:
logger.error(f"Error during stop cleanup: {stop_e}")
logger.error(f"Error during post-failure cleanup: {stop_e}")
raise RuntimeError(f"Failed to initialize MCP Adapter: {e}") from e

def start(self):
def start(self) -> None:
"""Start the MCP server and initialize the tools."""
if not self._adapter:
raise RuntimeError("Cannot start MCP server: Adapter is not initialized.")
if self._tools:
logger.debug("MCP server already started.")
return
self._tools = self._adapter.__enter__()

def stop(self):
"""Stop the MCP server"""
self._adapter.__exit__(None, None, None)
def stop(self) -> None:
"""Stop the MCP server and release all associated resources.

This method is idempotent; calling it multiple times has no effect.
"""
if not self._adapter:
logger.debug("stop() called but adapter is already stopped.")
return

try:
self._adapter.__exit__(None, None, None)
finally:
self._tools = None
self._adapter = None

@property
def tools(self) -> ToolCollection[BaseTool]:
Expand All @@ -139,25 +126,43 @@ def tools(self) -> ToolCollection[BaseTool]:
ValueError: If the MCP server is not started.

Returns:
The CrewAI tools available from the MCP server.
A ToolCollection of the available CrewAI tools.
"""
if self._tools is None:
raise ValueError(
"MCP server not started, run `mcp_server.start()` first before accessing `tools`"
"MCP tools are not available. The server may be stopped or initialization failed."
)

tools_collection = ToolCollection(self._tools)
if self._tool_names:
return tools_collection.filter_by_names(self._tool_names)
return tools_collection

def __enter__(self):
"""
Enter the context manager. Note that `__init__()` already starts the MCP server.
So tools should already be available.
"""
def __enter__(self) -> ToolCollection[BaseTool]:
"""Enter the context manager, returning the initialized tools."""
return self.tools

def __exit__(self, exc_type, exc_value, traceback):
"""Exit the context manager."""
return self._adapter.__exit__(exc_type, exc_value, traceback)
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[Any],
) -> bool:
"""Exit the context manager, stop the server, and do not suppress exceptions."""
self.stop()
return False # Ensures any exceptions that occurred are re-raised.

def __del__(self) -> None:
"""
Finalizer to attempt cleanup if the user forgets to call stop() or use a
context manager.

Note: This is a fallback and should not be relied upon, as Python does
not guarantee __del__ will always be called on object destruction.
"""
if self._adapter:
logger.warning(
"MCPServerAdapter was not cleanly shut down. Please use a "
"context manager (`with` statement) or call .stop() explicitly."
)
self.stop()
Loading