44import os
55import subprocess
66from typing import Dict , Optional
7- from cachetools import TTLCache
87from loguru import logger
9- from ack_cluster_handler import parse_master_url
108from models import KubectlOutput
119
1210
13- class KubectlContextManager (TTLCache ):
14- """基于 TTL+LRU 缓存的 kubeconfig 文件管理器"""
15-
16- def __init__ (self , ttl_minutes : int = 60 ):
17- """初始化上下文管理器
18-
19- Args:
20- ttl_minutes: kubeconfig有效期(分钟),默认60分钟
21- """
22- # 初始化 TTL+LRU 缓存
23- super ().__init__ (maxsize = 50 , ttl = ttl_minutes * 60 ) # TTL 以秒为单位,提前5min
24-
25- self ._cs_client = None # CS客户端实例
26-
27- # 使用 .kube 目录存储 kubeconfig 文件
28- self ._kube_dir = os .path .expanduser ("~/.kube" )
29- os .makedirs (self ._kube_dir , exist_ok = True )
30-
31- self ._setup_cleanup_handlers ()
32-
33- def _setup_cleanup_handlers (self ):
34- """设置清理处理器"""
35- import atexit
36- import signal
37-
38- def cleanup_contexts ():
39- """清理所有上下文"""
40- try :
41- context_manager = get_context_manager ()
42- if context_manager :
43- context_manager .cleanup ()
44- else :
45- self .cleanup_all_mcp_files ()
46- except Exception as e :
47- logger .error (f"Cleanup failed: { e } " )
48- raise e
49-
50- def signal_handler (signum , frame ):
51- """信号处理器"""
52- cleanup_contexts ()
53- exit (0 )
54-
55- atexit .register (cleanup_contexts )
56- signal .signal (signal .SIGINT , signal_handler )
57- signal .signal (signal .SIGTERM , signal_handler )
58-
59- def cleanup_all_mcp_files (self ):
60- """类方法:清理所有MCP创建的kubeconfig文件(安全清理)"""
61- try :
62- kube_dir = os .path .expanduser ("~/.kube" )
63- if not os .path .exists (kube_dir ):
64- return
65-
66- removed_count = 0
67- for filename in os .listdir (kube_dir ):
68- if filename .startswith ("mcp-kubeconfig-" ) and filename .endswith (".yaml" ):
69- file_path = os .path .join (kube_dir , filename )
70- try :
71- os .remove (file_path )
72- removed_count += 1
73- except Exception :
74- pass
75-
76- if removed_count > 0 :
77- print (f"Cleaned up { removed_count } MCP kubeconfig files" )
78- except Exception :
79- pass
80-
81- def _get_or_create_kubeconfig_file (self , cluster_id : str ) -> str :
82- """获取或创建集群的 kubeconfig 文件
83-
84- Args:
85- cluster_id: 集群ID
86-
87- Returns:
88- kubeconfig 文件路径
89- """
90- # 检查缓存中是否已存在
91- if cluster_id in self :
92- logger .debug (f"Found cached kubeconfig for cluster { cluster_id } " )
93- return self [cluster_id ]
94-
95- # 创建新的 kubeconfig 文件
96- kubeconfig_content = self ._get_kubeconfig_from_ack (cluster_id , int (self .ttl / 60 )) # 转换为分钟
97- if not kubeconfig_content :
98- raise ValueError (f"Failed to get kubeconfig for cluster { cluster_id } " )
99-
100- # 创建 kubeconfig 文件
101- kubeconfig_path = os .path .join (self ._kube_dir , f"mcp-kubeconfig-{ cluster_id } .yaml" )
102-
103- # 确保目录存在
104- os .makedirs (os .path .dirname (kubeconfig_path ), exist_ok = True )
105-
106- with open (kubeconfig_path , 'w' ) as f :
107- f .write (kubeconfig_content )
108-
109- # 添加到缓存
110- self [cluster_id ] = kubeconfig_path
111- return kubeconfig_path
112-
113- def popitem (self ):
114- """重写 popitem 方法,在驱逐缓存项时清理 kubeconfig 文件"""
115- key , path = super ().popitem ()
116- # 删除 kubeconfig 文件
117- if path and os .path .exists (path ):
118- try :
119- os .remove (path )
120- logger .debug (f"Removed cached kubeconfig file: { path } " )
121- except Exception as e :
122- logger .warning (f"Failed to remove cached kubeconfig file { path } : { e } " )
123-
124- return key , path
125-
126- def cleanup (self ):
127- """清理资源,删除所有 MCP 创建的 kubeconfig 文件和缓存"""
128- removed_count = 0
129- for key , path in list (self .items ()):
130- if path and os .path .exists (path ):
131- try :
132- os .remove (path )
133- removed_count += 1
134- except Exception :
135- pass
136- self .clear ()
137- print (f"Cleaned up { removed_count } kubeconfig files" )
138-
139- def set_cs_client (self , cs_client ):
140- """设置CS客户端
141-
142- Args:
143- cs_client: CS客户端实例
144- """
145- self ._cs_client = cs_client
146-
147- def _get_cs_client (self ):
148- """获取CS客户端"""
149- if not self ._cs_client :
150- raise ValueError ("CS client not set" )
151- return self ._cs_client
152-
153- def _get_kubeconfig_from_ack (self , cluster_id : str , ttl_minutes : int = 60 ) -> Optional [str ]:
154- """通过ACK API获取kubeconfig配置
155-
156- Args:
157- cluster_id: 集群ID
158- ttl_minutes: kubeconfig有效期(分钟),默认60分钟
159- """
160- try :
161- # 获取CS客户端
162- cs_client = self ._get_cs_client ()
163- from alibabacloud_cs20151215 import models as cs_models
164-
165- # 先检查集群详情,确认是否有公网端点
166- detail_response = cs_client .describe_cluster_detail (cluster_id )
167-
168- if not detail_response or not detail_response .body :
169- raise ValueError (f"Failed to get cluster details for { cluster_id } " )
170-
171- cluster_info = detail_response .body
172- # 检查是否有公网API Server端点
173- master_url_str = getattr (cluster_info , 'master_url' , '' )
174- master_url = parse_master_url (master_url_str )
175- if not master_url ["api_server_endpoint" ]:
176- raise ValueError (f"Cluster { cluster_id } does not have public endpoint access, "
177- f"Please enable public endpoint access setting first." )
178-
179- # 调用DescribeClusterUserKubeconfig API
180- request = cs_models .DescribeClusterUserKubeconfigRequest (
181- private_ip_address = False , # 获取公网连接配置
182- temporary_duration_minutes = ttl_minutes , # 使用传入的TTL
183- )
184-
185- response = cs_client .describe_cluster_user_kubeconfig (cluster_id , request )
186-
187- if response and response .body and response .body .config :
188- logger .info (f"Successfully fetched kubeconfig for cluster { cluster_id } (TTL: { ttl_minutes } minutes)" )
189- return response .body .config
190- else :
191- logger .warning (f"No kubeconfig found for cluster { cluster_id } " )
192- return None
193-
194- except Exception as e :
195- logger .error (f"Failed to fetch kubeconfig for cluster { cluster_id } : { e } " )
196- raise e
197-
198- def get_kubeconfig_path (self , cluster_id : str ) -> str :
199- """获取集群的 kubeconfig 文件路径
200-
201- Args:
202- cluster_id: 集群ID
203-
204- Returns:
205- kubeconfig 文件路径
206- """
207- return self ._get_or_create_kubeconfig_file (cluster_id )
208-
209-
210- # 全局上下文管理器实例
211- _context_manager : Optional [KubectlContextManager ] = None
212-
213-
214- def get_context_manager (ttl_minutes : int = 60 ) -> KubectlContextManager :
215- """获取全局上下文管理器实例
216-
217- Args:
218- ttl_minutes: kubeconfig有效期(分钟),默认60分钟
219- """
220- global _context_manager
221- if _context_manager is None :
222- _context_manager = KubectlContextManager (ttl_minutes = ttl_minutes )
223- return _context_manager
224-
225-
22611class KubectlHandler :
22712 """Handler for running kubectl commands via a FastMCP tool."""
22813
@@ -246,29 +31,20 @@ def __init__(self, server: FastMCP, settings: Optional[Dict[str, Any]] = None):
24631
24732 self ._register_tools ()
24833
249- def _setup_cs_client (self , ctx : Context ):
250- """设置CS客户端(仅在需要时)"""
251- try :
252- # 检查是否已经设置过
253- if hasattr (get_context_manager (), '_cs_client' ) and get_context_manager ()._cs_client :
254- return
255-
256- lifespan_context = ctx .request_context .lifespan_context
257- if isinstance (lifespan_context , dict ):
258- providers = lifespan_context .get ("providers" , {})
259- else :
260- providers = getattr (lifespan_context , "providers" , {})
261-
262- cs_client_factory = providers .get ("cs_client_factory" )
263- if cs_client_factory :
264- # 传入统一签名所需的 config
265- config = lifespan_context .get ("config" , {}) if isinstance (lifespan_context , dict ) else {}
266- get_context_manager ().set_cs_client (cs_client_factory ("CENTER" , config ))
267- logger .debug ("CS client factory set successfully" )
268- else :
269- logger .warning ("cs_client not available in lifespan context" )
270- except Exception as e :
271- logger .error (f"Failed to setup CS client: { e } " )
34+ def _get_kubeconfig_path (self ) -> str :
35+ """获取本机kubeconfig文件路径"""
36+ # 优先使用KUBECONFIG环境变量
37+ kubeconfig_path = os .environ .get ('KUBECONFIG' )
38+ if kubeconfig_path and os .path .exists (kubeconfig_path ):
39+ return kubeconfig_path
40+
41+ # 使用默认路径
42+ default_path = os .path .expanduser ("~/.kube/config" )
43+ if os .path .exists (default_path ):
44+ return default_path
45+
46+ # 如果都不存在,返回默认路径让kubectl处理
47+ return default_path
27248
27349 def is_write_command (self , command : str ) -> tuple [bool , Optional [str ]]:
27450 """检查是否为可写命令
@@ -465,8 +241,7 @@ def _register_tools(self):
465241
466242 @self .server .tool (
467243 name = "ack_kubectl" ,
468- description = "Execute kubectl command with intelligent context management. Supports cluster_id for "
469- "automatic context switching and creation."
244+ description = "Execute kubectl command using local kubeconfig file."
470245 )
471246 async def ack_kubectl (
472247 ctx : Context ,
@@ -503,17 +278,9 @@ async def ack_kubectl(
503278user: I need to execute a command in the pod
504279assistant: exec my-pod -- /bin/sh -c "your command here"""
505280 ),
506- cluster_id : str = Field (
507- ..., description = "The ID of the Kubernetes cluster to query. If specified, will auto find/create "
508- "and switch to appropriate context. If you are not sure of cluster id, "
509- "please use the list_clusters tool to get it first."
510- ),
511281 ) -> KubectlOutput :
512282
513283 try :
514- # 设置CS客户端
515- self ._setup_cs_client (ctx )
516-
517284 # 检查是否为只读模式
518285 if not self .allow_write :
519286 is_write_command , not_allow_write_error = self .is_write_command (command )
@@ -535,9 +302,8 @@ async def ack_kubectl(
535302 exit_code = 1
536303 )
537304
538- # 获取 kubeconfig 文件路径
539- context_manager = get_context_manager ()
540- kubeconfig_path = context_manager .get_kubeconfig_path (cluster_id )
305+ # 获取本机 kubeconfig 文件路径
306+ kubeconfig_path = self ._get_kubeconfig_path ()
541307
542308 # 检查是否为流式命令
543309 is_streaming , stream_type = self .is_streaming_command (command )
0 commit comments