44import os
55import subprocess
66from typing import Dict , Optional
7+ from cachetools import TTLCache
78from loguru import logger
9+ from ack_cluster_handler import parse_master_url
810from models import KubectlOutput
911
1012
class KubectlContextManager(TTLCache):
    """Kubeconfig file manager backed by a TTL + LRU cache.

    The cache maps ``cluster_id`` to the path of a kubeconfig file written
    under ``~/.kube``. Entries expire slightly before the temporary
    credentials they hold, and every file this manager created is removed
    again on eviction and at process exit.
    """

    # Directory (before ``~`` expansion) holding the generated kubeconfig files.
    _KUBE_DIR = "~/.kube"
    # File name pattern of generated kubeconfig files: <prefix><cluster_id><suffix>.
    _FILE_PREFIX = "mcp-kubeconfig-"
    _FILE_SUFFIX = ".yaml"
    # Maximum number of clusters kept in the cache at once.
    _MAX_CACHED_CLUSTERS = 50
    # Cache entries expire this many seconds before the credential itself so
    # a kubeconfig that is about to become invalid is never handed out.
    _REFRESH_MARGIN_SECONDS = 300

    def __init__(self, ttl_minutes: int = 60):
        """Initialise the manager.

        Args:
            ttl_minutes: Validity of each requested kubeconfig, in minutes
                (default 60). Cache entries expire up to five minutes earlier
                (never more than half of the validity).
        """
        ttl_seconds = ttl_minutes * 60
        margin = max(0, min(self._REFRESH_MARGIN_SECONDS, ttl_seconds // 2))
        super().__init__(maxsize=self._MAX_CACHED_CLUSTERS, ttl=ttl_seconds - margin)

        # Validity requested from the ACK API; intentionally longer than the
        # cache TTL (see _REFRESH_MARGIN_SECONDS).
        self._credential_ttl_minutes = ttl_minutes

        self._cs_client = None  # CS client instance, injected via set_cs_client()

        # Every kubeconfig path written by this instance. TTL expiry drops
        # cache entries without going through popitem(), so the cache alone
        # cannot tell which files still have to be deleted.
        self._created_paths = set()

        # kubeconfig files are stored in the user's .kube directory
        self._kube_dir = os.path.expanduser(self._KUBE_DIR)
        os.makedirs(self._kube_dir, exist_ok=True)

        self._setup_cleanup_handlers()

    def _setup_cleanup_handlers(self):
        """Register exit and signal hooks that delete the generated files."""
        import atexit
        import signal
        import sys

        def cleanup_contexts():
            """Best-effort cleanup; must never raise during shutdown."""
            try:
                # Clean up this instance directly. Testing the manager for
                # truthiness would be wrong: an empty cache is falsy.
                self.cleanup()
            except Exception as e:
                logger.error(f"Cleanup failed: {e}")

        def signal_handler(signum, frame):
            """Clean up, then terminate the process."""
            cleanup_contexts()
            sys.exit(0)

        atexit.register(cleanup_contexts)
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                signal.signal(sig, signal_handler)
            except ValueError as e:
                # signal.signal() only works in the main thread; the atexit
                # hook above still covers a normal interpreter shutdown.
                logger.warning(f"Could not install handler for signal {sig}: {e}")

    def _remove_file(self, path) -> bool:
        """Delete one kubeconfig file.

        Args:
            path: File path to delete; falsy values are ignored.

        Returns:
            True if a file was actually removed, False otherwise.
        """
        if not path:
            return False
        self._created_paths.discard(path)
        try:
            os.remove(path)
        except FileNotFoundError:
            return False
        except OSError as e:
            logger.warning(f"Failed to remove cached kubeconfig file {path}: {e}")
            return False
        return True

    def cleanup_all_mcp_files(self):
        """Delete every MCP-generated kubeconfig file in ``~/.kube``.

        This sweeps by file name, so it also removes files written by other
        processes or earlier runs. Failures are logged, never raised.
        """
        kube_dir = os.path.expanduser(self._KUBE_DIR)
        try:
            filenames = os.listdir(kube_dir)
        except OSError:
            # Directory missing or unreadable: nothing to clean up.
            return

        removed_count = 0
        for filename in filenames:
            if not (filename.startswith(self._FILE_PREFIX) and filename.endswith(self._FILE_SUFFIX)):
                continue
            file_path = os.path.join(kube_dir, filename)
            try:
                os.remove(file_path)
            except OSError as e:
                logger.debug(f"Could not remove {file_path}: {e}")
            else:
                removed_count += 1

        if removed_count > 0:
            # Logged rather than printed: stdout may carry protocol traffic.
            logger.info(f"Cleaned up {removed_count} MCP kubeconfig files")

    def _get_or_create_kubeconfig_file(self, cluster_id: str) -> str:
        """Return the kubeconfig file for a cluster, creating it if needed.

        Args:
            cluster_id: Cluster ID.

        Returns:
            Path of the kubeconfig file.

        Raises:
            ValueError: If ``cluster_id`` is empty or not usable as a file
                name component, or if no kubeconfig could be obtained.
        """
        filename = f"{self._FILE_PREFIX}{cluster_id}{self._FILE_SUFFIX}"
        # The ID becomes part of a file name; refuse anything that could
        # point outside the kubeconfig directory.
        if not cluster_id or os.path.basename(filename) != filename:
            raise ValueError(f"Invalid cluster id: {cluster_id!r}")

        # Single lookup: a membership test followed by indexing can race
        # with TTL expiry between the two operations.
        try:
            cached_path = self[cluster_id]
        except KeyError:
            cached_path = None
        # Only trust the cache if the file is still present on disk.
        if cached_path and os.path.exists(cached_path):
            logger.debug(f"Found cached kubeconfig for cluster {cluster_id}")
            return cached_path

        kubeconfig_content = self._get_kubeconfig_from_ack(
            cluster_id, int(self._credential_ttl_minutes)
        )
        if not kubeconfig_content:
            raise ValueError(f"Failed to get kubeconfig for cluster {cluster_id}")

        kubeconfig_path = os.path.join(self._kube_dir, filename)
        os.makedirs(os.path.dirname(kubeconfig_path), exist_ok=True)

        # The file contains credentials: create it owner-only, and tighten
        # the mode of a pre-existing file before any secret is written.
        fd = os.open(kubeconfig_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            os.chmod(kubeconfig_path, 0o600)
            f.write(kubeconfig_content)
        self._created_paths.add(kubeconfig_path)

        self[cluster_id] = kubeconfig_path
        return kubeconfig_path

    def popitem(self):
        """Evict one cache entry and delete its kubeconfig file.

        Returns:
            The evicted ``(cluster_id, path)`` pair.
        """
        key, path = super().popitem()
        if self._remove_file(path):
            logger.debug(f"Removed cached kubeconfig file: {path}")
        return key, path

    def cleanup(self):
        """Delete every kubeconfig file this manager created and empty the cache."""
        tracked = set(self._created_paths)
        tracked.update(path for path in self.values() if path)
        removed_count = sum(1 for path in tracked if self._remove_file(path))
        self.clear()
        # Logged rather than printed: stdout may carry protocol traffic.
        if removed_count > 0:
            logger.info(f"Cleaned up {removed_count} kubeconfig files")
        else:
            logger.debug("No kubeconfig files to clean up")

    def set_cs_client(self, cs_client):
        """Set the CS client.

        Args:
            cs_client: CS client instance.
        """
        self._cs_client = cs_client

    def _get_cs_client(self):
        """Return the CS client.

        Raises:
            ValueError: If no client has been set.
        """
        if not self._cs_client:
            raise ValueError("CS client not set")
        return self._cs_client

    def _get_kubeconfig_from_ack(self, cluster_id: str, ttl_minutes: int = 60) -> Optional[str]:
        """Fetch a temporary kubeconfig through the ACK API.

        Args:
            cluster_id: Cluster ID.
            ttl_minutes: Validity of the kubeconfig in minutes (default 60).

        Returns:
            The kubeconfig content, or None if the response carried none.

        Raises:
            ValueError: If no CS client is set, the cluster details cannot
                be read, or the cluster has no public API server endpoint.
            Exception: Any error raised by the CS client is logged and
                re-raised unchanged.
        """
        try:
            cs_client = self._get_cs_client()
            from alibabacloud_cs20151215 import models as cs_models

            # Read the cluster details first to confirm a public endpoint exists.
            detail_response = cs_client.describe_cluster_detail(cluster_id)
            if not detail_response or not detail_response.body:
                raise ValueError(f"Failed to get cluster details for {cluster_id}")

            master_url_str = getattr(detail_response.body, 'master_url', '')
            master_url = parse_master_url(master_url_str)
            if not master_url["api_server_endpoint"]:
                raise ValueError(f"Cluster {cluster_id} does not have public endpoint access, "
                                 f"Please enable public endpoint access setting first.")

            request = cs_models.DescribeClusterUserKubeconfigRequest(
                private_ip_address=False,  # request the public-endpoint config
                temporary_duration_minutes=ttl_minutes,
            )
            response = cs_client.describe_cluster_user_kubeconfig(cluster_id, request)

            if response and response.body and response.body.config:
                logger.info(f"Successfully fetched kubeconfig for cluster {cluster_id} (TTL: {ttl_minutes} minutes)")
                return response.body.config

            logger.warning(f"No kubeconfig found for cluster {cluster_id}")
            return None

        except Exception as e:
            logger.error(f"Failed to fetch kubeconfig for cluster {cluster_id}: {e}")
            raise

    def get_kubeconfig_path(self, cluster_id: str) -> str:
        """Return the kubeconfig file path for a cluster.

        Args:
            cluster_id: Cluster ID.

        Returns:
            Path of the kubeconfig file.
        """
        return self._get_or_create_kubeconfig_file(cluster_id)

209+
# Process-wide context manager instance, created lazily on first use.
_context_manager: Optional[KubectlContextManager] = None


def get_context_manager(ttl_minutes: int = 60) -> KubectlContextManager:
    """Return the process-wide context manager, creating it on first use.

    Args:
        ttl_minutes: Kubeconfig validity in minutes (default 60). Only used
            by the call that actually creates the instance.

    Returns:
        The shared KubectlContextManager instance.
    """
    global _context_manager
    manager = _context_manager
    if manager is not None:
        return manager

    manager = KubectlContextManager(ttl_minutes=ttl_minutes)
    _context_manager = manager
    return manager
224+
225+
11226class KubectlHandler :
12227 """Handler for running kubectl commands via a FastMCP tool."""
13228
@@ -31,20 +246,29 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None):
31246
32247 self ._register_tools ()
33248
34- def _get_kubeconfig_path (self ) -> str :
35- """获取本机kubeconfig文件路径"""
36- # 优先使用KUBECONFIG环境变量
37- kubeconfig_path = os .environ .get ('KUBECONFIG' )
38- if kubeconfig_path and os .path .exists (kubeconfig_path ):
39- return kubeconfig_path
40-
41- # 使用默认路径
42- default_path = os .path .expanduser ("~/.kube/config" )
43- if os .path .exists (default_path ):
44- return default_path
45-
46- # 如果都不存在,返回默认路径让kubectl处理
47- return default_path
def _setup_cs_client(self, ctx: Context):
    """Attach a CS client to the shared context manager if it has none yet.

    The client is built by the ``cs_client_factory`` provider found in the
    request's lifespan context, which may be a dict or an object exposing
    ``providers`` / ``config`` attributes. Failures are logged and never
    raised.

    Args:
        ctx: Request context giving access to the lifespan context.
    """
    try:
        manager = get_context_manager()
        # Nothing to do when a client has already been attached.
        if getattr(manager, '_cs_client', None):
            return

        lifespan_context = ctx.request_context.lifespan_context
        # Read providers and config the same way for both context shapes.
        if isinstance(lifespan_context, dict):
            providers = lifespan_context.get("providers", {})
            config = lifespan_context.get("config", {})
        else:
            providers = getattr(lifespan_context, "providers", {})
            config = getattr(lifespan_context, "config", {})

        # ``providers`` may be present but None.
        cs_client_factory = (providers or {}).get("cs_client_factory")
        if cs_client_factory:
            # The factory takes a region key and the config.
            manager.set_cs_client(cs_client_factory("CENTER", config))
            logger.debug("CS client factory set successfully")
        else:
            logger.warning("cs_client not available in lifespan context")
    except Exception as e:
        logger.error(f"Failed to setup CS client: {e}")
48272
49273 def is_write_command (self , command : str ) -> tuple [bool , Optional [str ]]:
50274 """检查是否为可写命令
@@ -280,9 +504,17 @@ async def ack_kubectl(
280504user: I need to execute a command in the pod
281505assistant: exec my-pod -- /bin/sh -c "your command here"""
282506 ),
507+ cluster_id : str = Field (
508+ ..., description = "The ID of the Kubernetes cluster to query. If specified, will auto find/create "
509+ "and switch to appropriate context. If you are not sure of cluster id, "
510+ "please use the list_clusters tool to get it first."
511+ ),
283512 ) -> KubectlOutput :
284513
285514 try :
515+ # 设置CS客户端
516+ self ._setup_cs_client (ctx )
517+
286518 # 检查是否为只读模式
287519 if not self .allow_write :
288520 is_write_command , not_allow_write_error = self .is_write_command (command )
@@ -304,8 +536,9 @@ async def ack_kubectl(
304536 exit_code = 1
305537 )
306538
307- # 获取本机 kubeconfig 文件路径
308- kubeconfig_path = self ._get_kubeconfig_path ()
539+ # 获取 kubeconfig 文件路径
540+ context_manager = get_context_manager ()
541+ kubeconfig_path = context_manager .get_kubeconfig_path (cluster_id )
309542
310543 # 检查是否为流式命令
311544 is_streaming , stream_type = self .is_streaming_command (command )
0 commit comments