44import os
55import subprocess
66from typing import Dict , Optional
7+ from cachetools import TTLCache
78from loguru import logger
9+ from ack_cluster_handler import parse_master_url
810from models import KubectlOutput
911
1012
13+ class KubectlContextManager (TTLCache ):
14+ """基于 TTL+LRU 缓存的 kubeconfig 文件管理器"""
15+
16+ def __init__ (self , ttl_minutes : int = 60 ):
17+ """初始化上下文管理器
18+
19+ Args:
20+ ttl_minutes: kubeconfig有效期(分钟),默认60分钟
21+ """
22+ # 初始化 TTL+LRU 缓存
23+ super ().__init__ (maxsize = 50 , ttl = ttl_minutes * 60 ) # TTL 以秒为单位,提前5min
24+
25+ self ._cs_client = None # CS客户端实例
26+
27+ # 使用 .kube 目录存储 kubeconfig 文件
28+ self ._kube_dir = os .path .expanduser ("~/.kube" )
29+ os .makedirs (self ._kube_dir , exist_ok = True )
30+
31+ self ._setup_cleanup_handlers ()
32+
33+ def _setup_cleanup_handlers (self ):
34+ """设置清理处理器"""
35+ import atexit
36+ import signal
37+
38+ def cleanup_contexts ():
39+ """清理所有上下文"""
40+ try :
41+ context_manager = get_context_manager ()
42+ if context_manager :
43+ context_manager .cleanup ()
44+ else :
45+ self .cleanup_all_mcp_files ()
46+ except Exception as e :
47+ logger .error (f"Cleanup failed: { e } " )
48+ raise e
49+
50+ def signal_handler (signum , frame ):
51+ """信号处理器"""
52+ cleanup_contexts ()
53+ exit (0 )
54+
55+ atexit .register (cleanup_contexts )
56+ signal .signal (signal .SIGINT , signal_handler )
57+ signal .signal (signal .SIGTERM , signal_handler )
58+
59+ def cleanup_all_mcp_files (self ):
60+ """类方法:清理所有MCP创建的kubeconfig文件(安全清理)"""
61+ try :
62+ kube_dir = os .path .expanduser ("~/.kube" )
63+ if not os .path .exists (kube_dir ):
64+ return
65+
66+ removed_count = 0
67+ for filename in os .listdir (kube_dir ):
68+ if filename .startswith ("mcp-kubeconfig-" ) and filename .endswith (".yaml" ):
69+ file_path = os .path .join (kube_dir , filename )
70+ try :
71+ os .remove (file_path )
72+ removed_count += 1
73+ except Exception :
74+ pass
75+
76+ if removed_count > 0 :
77+ print (f"Cleaned up { removed_count } MCP kubeconfig files" )
78+ except Exception :
79+ pass
80+
81+ def _get_or_create_kubeconfig_file (self , cluster_id : str ) -> str :
82+ """获取或创建集群的 kubeconfig 文件
83+
84+ Args:
85+ cluster_id: 集群ID
86+
87+ Returns:
88+ kubeconfig 文件路径
89+ """
90+ # 检查缓存中是否已存在
91+ if cluster_id in self :
92+ logger .debug (f"Found cached kubeconfig for cluster { cluster_id } " )
93+ return self [cluster_id ]
94+
95+ # 创建新的 kubeconfig 文件
96+ kubeconfig_content = self ._get_kubeconfig_from_ack (cluster_id , int (self .ttl / 60 )) # 转换为分钟
97+ if not kubeconfig_content :
98+ raise ValueError (f"Failed to get kubeconfig for cluster { cluster_id } " )
99+
100+ # 创建 kubeconfig 文件
101+ kubeconfig_path = os .path .join (self ._kube_dir , f"mcp-kubeconfig-{ cluster_id } .yaml" )
102+
103+ # 确保目录存在
104+ os .makedirs (os .path .dirname (kubeconfig_path ), exist_ok = True )
105+
106+ with open (kubeconfig_path , 'w' ) as f :
107+ f .write (kubeconfig_content )
108+
109+ # 添加到缓存
110+ self [cluster_id ] = kubeconfig_path
111+ return kubeconfig_path
112+
113+ def popitem (self ):
114+ """重写 popitem 方法,在驱逐缓存项时清理 kubeconfig 文件"""
115+ key , path = super ().popitem ()
116+ # 删除 kubeconfig 文件
117+ if path and os .path .exists (path ):
118+ try :
119+ os .remove (path )
120+ logger .debug (f"Removed cached kubeconfig file: { path } " )
121+ except Exception as e :
122+ logger .warning (f"Failed to remove cached kubeconfig file { path } : { e } " )
123+
124+ return key , path
125+
126+ def cleanup (self ):
127+ """清理资源,删除所有 MCP 创建的 kubeconfig 文件和缓存"""
128+ removed_count = 0
129+ for key , path in list (self .items ()):
130+ if path and os .path .exists (path ):
131+ try :
132+ os .remove (path )
133+ removed_count += 1
134+ except Exception :
135+ pass
136+ self .clear ()
137+ print (f"Cleaned up { removed_count } kubeconfig files" )
138+
139+ def set_cs_client (self , cs_client ):
140+ """设置CS客户端
141+
142+ Args:
143+ cs_client: CS客户端实例
144+ """
145+ self ._cs_client = cs_client
146+
147+ def _get_cs_client (self ):
148+ """获取CS客户端"""
149+ if not self ._cs_client :
150+ raise ValueError ("CS client not set" )
151+ return self ._cs_client
152+
153+ def _get_kubeconfig_from_ack (self , cluster_id : str , ttl_minutes : int = 60 ) -> Optional [str ]:
154+ """通过ACK API获取kubeconfig配置
155+
156+ Args:
157+ cluster_id: 集群ID
158+ ttl_minutes: kubeconfig有效期(分钟),默认60分钟
159+ """
160+ try :
161+ # 获取CS客户端
162+ cs_client = self ._get_cs_client ()
163+ from alibabacloud_cs20151215 import models as cs_models
164+
165+ # 先检查集群详情,确认是否有公网端点
166+ detail_response = cs_client .describe_cluster_detail (cluster_id )
167+
168+ if not detail_response or not detail_response .body :
169+ raise ValueError (f"Failed to get cluster details for { cluster_id } " )
170+
171+ cluster_info = detail_response .body
172+ # 检查是否有公网API Server端点
173+ master_url_str = getattr (cluster_info , 'master_url' , '' )
174+ master_url = parse_master_url (master_url_str )
175+ if not master_url ["api_server_endpoint" ]:
176+ raise ValueError (f"Cluster { cluster_id } does not have public endpoint access, "
177+ f"Please enable public endpoint access setting first." )
178+
179+ # 调用DescribeClusterUserKubeconfig API
180+ request = cs_models .DescribeClusterUserKubeconfigRequest (
181+ private_ip_address = False , # 获取公网连接配置
182+ temporary_duration_minutes = ttl_minutes , # 使用传入的TTL
183+ )
184+
185+ response = cs_client .describe_cluster_user_kubeconfig (cluster_id , request )
186+
187+ if response and response .body and response .body .config :
188+ logger .info (f"Successfully fetched kubeconfig for cluster { cluster_id } (TTL: { ttl_minutes } minutes)" )
189+ return response .body .config
190+ else :
191+ logger .warning (f"No kubeconfig found for cluster { cluster_id } " )
192+ return None
193+
194+ except Exception as e :
195+ logger .error (f"Failed to fetch kubeconfig for cluster { cluster_id } : { e } " )
196+ raise e
197+
198+ def get_kubeconfig_path (self , cluster_id : str ) -> str :
199+ """获取集群的 kubeconfig 文件路径
200+
201+ Args:
202+ cluster_id: 集群ID
203+
204+ Returns:
205+ kubeconfig 文件路径
206+ """
207+ return self ._get_or_create_kubeconfig_file (cluster_id )
208+
209+
210+ # 全局上下文管理器实例
211+ _context_manager : Optional [KubectlContextManager ] = None
212+
213+
214+ def get_context_manager (ttl_minutes : int = 60 ) -> KubectlContextManager :
215+ """获取全局上下文管理器实例
216+
217+ Args:
218+ ttl_minutes: kubeconfig有效期(分钟),默认60分钟
219+ """
220+ global _context_manager
221+ if _context_manager is None :
222+ _context_manager = KubectlContextManager (ttl_minutes = ttl_minutes )
223+ return _context_manager
224+
225+
11226class KubectlHandler :
12227 """Handler for running kubectl commands via a FastMCP tool."""
13228
@@ -31,20 +246,29 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None):
31246
32247 self ._register_tools ()
33248
34- def _get_kubeconfig_path (self ) -> str :
35- """获取本机kubeconfig文件路径"""
36- # 优先使用KUBECONFIG环境变量
37- kubeconfig_path = os .environ .get ('KUBECONFIG' )
38- if kubeconfig_path and os .path .exists (kubeconfig_path ):
39- return kubeconfig_path
40-
41- # 使用默认路径
42- default_path = os .path .expanduser ("~/.kube/config" )
43- if os .path .exists (default_path ):
44- return default_path
45-
46- # 如果都不存在,返回默认路径让kubectl处理
47- return default_path
249+ def _setup_cs_client (self , ctx : Context ):
250+ """设置CS客户端(仅在需要时)"""
251+ try :
252+ # 检查是否已经设置过
253+ if hasattr (get_context_manager (), '_cs_client' ) and get_context_manager ()._cs_client :
254+ return
255+
256+ lifespan_context = ctx .request_context .lifespan_context
257+ if isinstance (lifespan_context , dict ):
258+ providers = lifespan_context .get ("providers" , {})
259+ else :
260+ providers = getattr (lifespan_context , "providers" , {})
261+
262+ cs_client_factory = providers .get ("cs_client_factory" )
263+ if cs_client_factory :
264+ # 传入统一签名所需的 config
265+ config = lifespan_context .get ("config" , {}) if isinstance (lifespan_context , dict ) else {}
266+ get_context_manager ().set_cs_client (cs_client_factory ("CENTER" , config ))
267+ logger .debug ("CS client factory set successfully" )
268+ else :
269+ logger .warning ("cs_client not available in lifespan context" )
270+ except Exception as e :
271+ logger .error (f"Failed to setup CS client: { e } " )
48272
49273 def is_write_command (self , command : str ) -> tuple [bool , Optional [str ]]:
50274 """检查是否为可写命令
@@ -241,12 +465,14 @@ def _register_tools(self):
241465
242466 @self .server .tool (
243467 name = "ack_kubectl" ,
244- description = "Execute kubectl command using local kubeconfig file ."
468+ description = "Execute kubectl command with intelligent context management for ACK clusters. You MUST use this tool when accessing ACK clusters. Using external shells or any other kubectl entrypoints is NOT allowed. Supports cluster_id for automatic context switching and creation ."
245469 )
246470 async def ack_kubectl (
247471 ctx : Context ,
248472 command : str = Field (
249- ..., description = """Arguments after 'kubectl', e.g. 'get pods -A', 'config get-contexts', 'config use-context <name>'. Don't include the kubectl prefix.
473+ ..., description = """IMPORTANT POLICY: When accessing ACK clusters, you MUST use this tool. Do NOT invoke kubectl via shell or any external mechanism.
474+
475+ Arguments after 'kubectl', e.g. 'get pods -A', 'config get-contexts', 'config use-context <name>'. Don't include the kubectl prefix.
250476
251477IMPORTANT: Do not use interactive commands. Instead:
252478- Use 'kubectl get -o yaml', 'kubectl patch', or 'kubectl apply' instead of 'kubectl edit'
@@ -278,9 +504,17 @@ async def ack_kubectl(
278504user: I need to execute a command in the pod
279505assistant: exec my-pod -- /bin/sh -c "your command here"""
280506 ),
507+ cluster_id : str = Field (
508+ ..., description = "The ID of the Kubernetes cluster to query. If specified, will auto find/create "
509+ "and switch to appropriate context. If you are not sure of cluster id, "
510+ "please use the list_clusters tool to get it first."
511+ ),
281512 ) -> KubectlOutput :
282513
283514 try :
515+ # 设置CS客户端
516+ self ._setup_cs_client (ctx )
517+
284518 # 检查是否为只读模式
285519 if not self .allow_write :
286520 is_write_command , not_allow_write_error = self .is_write_command (command )
@@ -302,8 +536,9 @@ async def ack_kubectl(
302536 exit_code = 1
303537 )
304538
305- # 获取本机 kubeconfig 文件路径
306- kubeconfig_path = self ._get_kubeconfig_path ()
539+ # 获取 kubeconfig 文件路径
540+ context_manager = get_context_manager ()
541+ kubeconfig_path = context_manager .get_kubeconfig_path (cluster_id )
307542
308543 # 检查是否为流式命令
309544 is_streaming , stream_type = self .is_streaming_command (command )
0 commit comments