diff --git a/framework/ServeTest/baseline_pic_mtp.txt b/framework/ServeTest/baseline_pic_mtp.txt
new file mode 100644
index 0000000000..6d25e42c7d
--- /dev/null
+++ b/framework/ServeTest/baseline_pic_mtp.txt
@@ -0,0 +1 @@
+The first image shows a bank branch with the signage "中国工商银行" (Industrial and Commercial Bank of China), and the second image also shows a bank branch with the signage "中国工商银行" (Industrial and Commercial Bank of China). Therefore, both images are related to Industrial and Commercial Bank of China.<|im_end|>
\ No newline at end of file
diff --git a/framework/ServeTest/baseline_text_mtp.txt b/framework/ServeTest/baseline_text_mtp.txt
new file mode 100644
index 0000000000..a2eb82d977
--- /dev/null
+++ b/framework/ServeTest/baseline_text_mtp.txt
@@ -0,0 +1 @@
+“温故而知新”出自《论语·为政》,意为复习旧知识从而获得新的理解和体会。这一思想强调了学习的循环性和持续性,认为通过对已学内容的回顾和反思,可以深化对知识的理解,并从中发现新的价值和应用场景。在学习过程中,温故而知新不仅有助于巩固基础知识,还能培养批判性思维和创新能力,使学习更加高效和有意义。<|im_end|>
\ No newline at end of file
diff --git a/framework/ServeTest/deploy.py b/framework/ServeTest/deploy.py
index aa305360b8..1f31fd37f8 100644
--- a/framework/ServeTest/deploy.py
+++ b/framework/ServeTest/deploy.py
@@ -7,6 +7,7 @@
 import subprocess
 import sys
 import time
+import traceback
 
 import requests
 import yaml
@@ -58,10 +59,12 @@ def get_available_port(env_key: str, default_start: int):
 FD_API_PORT = get_available_port("FD_API_PORT", FLASK_PORT + 1)
 FD_ENGINE_QUEUE_PORT = get_available_port("FD_ENGINE_QUEUE_PORT", FD_API_PORT + 1)
 FD_METRICS_PORT = get_available_port("FD_METRICS_PORT", FD_ENGINE_QUEUE_PORT + 1)
+FD_CACHE_QUEUE_PORT = get_available_port("FD_CACHE_QUEUE_PORT", FD_METRICS_PORT + 1)
 
 DEFAULT_PARAMS = {
     "--port": FD_API_PORT,
     "--engine-worker-queue-port": FD_ENGINE_QUEUE_PORT,
     "--metrics-port": FD_METRICS_PORT,
+    "--cache-queue-port": FD_CACHE_QUEUE_PORT,
     "--enable-logprob": True,
 }
@@ -78,6 +81,7 @@ def build_command(config):
     # 添加配置参数
     for key, value in config.items():
         if "--enable" in key:
+            value = bool(value if isinstance(value, bool) else eval(value))
             if value:
                 cmd.append(key)
             else:
@@ -175,19 +179,34 @@ def stop_server(signum=None, frame=None):
             # 终止进程组(包括所有子进程)
             os.killpg(os.getpgid(pid_port["PID"]), signal.SIGTERM)
         except Exception as e:
-            print(f"Failed to stop server: {e}")
+            print(f"Failed to stop server: {e}, {str(traceback.format_exc())}")
+    try:
+        result = subprocess.run(
+            f"ps -efww | grep '\-\-cache_queue_port {FD_CACHE_QUEUE_PORT}' | grep -v grep", shell=True, capture_output=True, text=True
+        )
+        for line in result.stdout.strip().split("\n"):
+            if not line:
+                continue
+            parts = line.split()
+            pid = int(parts[1])
+            print(f"Killing PID: {pid}")
+            os.kill(pid, signal.SIGKILL)
+    except Exception as e:
+        print(f"Failed to kill cache manager process: {e}, {str(traceback.format_exc())}")
 
-    for port in [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT]:
+    for port in [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]:
         try:
             output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
             for pid in output.splitlines():
                 os.kill(int(pid), signal.SIGKILL)
                 print(f"Killed process on port {port}, pid={pid}")
         except Exception as e:
-            print(f"Failed to killed process on port: {e}")
+            print(f"Failed to kill process on port: {e}, {str(traceback.format_exc())}")
 
     # 若log目录存在,则重命名为log_timestamp
     if os.path.isdir("./log"):
         os.rename("./log", "./log_{}".format(time.strftime("%Y%m%d%H%M%S")))
+    if os.path.exists("gemm_profiles.json"):
+        os.remove("gemm_profiles.json")
     if signum:
         sys.exit(0)
@@ -229,8 +248,10 @@ def start_service():
         # 构建命令
         cmd = build_command(final_config)
     except Exception as e:
+        error_msg = f"Failed to start service: {e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False),
+            json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )
@@ -264,8 +285,10 @@ def start_service():
 
         return Response(json.dumps(json_data, ensure_ascii=False), status=200, content_type="application/json")
     except Exception as e:
+        error_msg = f"Failed to start service: {e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False),
+            json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )
@@ -295,8 +318,10 @@ def switch_service():
         # 构建命令
         cmd = build_command(final_config)
     except Exception as e:
+        error_msg = f"Failed to switch service: {e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False),
+            json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )
@@ -330,8 +355,10 @@ def switch_service():
 
         return Response(json.dumps(json_data, ensure_ascii=False), status=200, content_type="application/json")
     except Exception as e:
+        error_msg = f"Failed to switch service: {e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False),
+            json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )
@@ -406,8 +433,10 @@ def get_config():
         )
 
     except Exception as e:
+        error_msg = f"{e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"message": "api_server.log解析失败,请检查log", "error": str(e)}, ensure_ascii=False),
+            json.dumps({"message": "api_server.log解析失败,请检查log", "error": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )
@@ -447,7 +476,7 @@ def tail_file(path, lines=50):
         with open(path, "r", encoding="utf-8", errors="ignore") as f:
             return "".join(f.readlines()[-lines:])
     except Exception as e:
-        return f"[无法读取 {path}]: {e}\n"
+        return f"[无法读取 {path}]: {e}, {str(traceback.format_exc())}\n"
 
     result = f"服务启动超时,耗时:[{timeout}s]\n\n"
     result += "==== server.log tail 50 ====\n"
diff --git a/framework/ServeTest/mtp.yaml b/framework/ServeTest/mtp.yaml
new file mode 100644
index 0000000000..3fcd4714c3
--- /dev/null
+++ b/framework/ServeTest/mtp.yaml
@@ -0,0 +1,4 @@
+speculative_config:
+  method: mtp
+  num_speculative_tokens: 1
+  model: /MODELDATA/safetensor_ckpt_step1600/mtp/
\ No newline at end of file
diff --git a/framework/ServeTest/test_ci_mtp.py b/framework/ServeTest/test_ci_mtp.py
new file mode 100644
index 0000000000..e276c5b8dd
--- /dev/null
+++ b/framework/ServeTest/test_ci_mtp.py
@@ -0,0 +1,281 @@
+import json
+import os
+import pytest
+
+import requests
+
+
+HOST = os.environ.get("HOST")
+if not HOST:
+    HOST = "127.0.0.1"
+PORT = os.environ.get("FD_API_PORT")
+if not PORT:
+    raise ValueError("Please set FD_API_PORT environment variable.")
+URL = f"http://{HOST}:{PORT}/v1/chat/completions"
+
+
+def send_request(url, payload, timeout=600):
+    """
+    Send the request payload to the given URL and return the response.
+    """
+    headers = {
+        "Content-Type": "application/json",
+    }
+
+    try:
+        res = requests.post(
+            url,
+            headers=headers,
+            json=payload,
+            timeout=timeout
+        )
+        print("🟢 接收响应中...\n")
+        return res
+    except requests.exceptions.Timeout:
+        print(f"❌ 请求超时(超过 {timeout} 秒)")
+        return None
+    except requests.exceptions.RequestException as e:
+        print(f"❌ 请求失败:{e}")
+        return None
+
+
+def get_stream_chunks(response):
+    """Parse a streaming response into a List[dict] of chunks."""
+    chunks = []
+
+    if response.status_code == 200:
+        for line in response.iter_lines(decode_unicode=True):
+            if line:
+                if line.startswith("data: "):
+                    line = line[len("data: "):]
+
+                if line.strip() == "[DONE]":
+                    break
+
+                try:
+                    chunk = json.loads(line)
+                    chunks.append(chunk)
+                except Exception as e:
+                    print(f"解析失败: {e}, 行内容: {line}")
+    else:
+        print(f"请求失败,状态码: {response.status_code}")
+        print("返回内容:", response.text)
+
+    return chunks
+
+
+def test_text_diff():
+    payload = {
+        "model": "null",
+        "messages": [
+            {
+                "role": "user",
+                "content": [
+                    {
+                        "type": "text",
+                        "text": "解释一下温故而知新",
+                    },
+                ],
+            },
+        ],
+        "stream": True,
+        "temperature": 1.0,
+        "seed": 21,
+        "top_p": 0,
+        "stop": ["", "", "<|endoftext|>", "<|im_end|>"],
+        "metadata": {
+            "chat_template_kwargs": {
+                "options": {
+                    "thinking_mode": "close",
+                },
+            },
+            "bad_words_token_ids": [101031, 101032, 101027, 101028, 101023, 101024],
+        }
+    }
+
+    print("fastdeploy answer is :")
+
+    try:
+        response = send_request(URL, payload)
+        chunks = get_stream_chunks(response)
+        # for idx, chunk in enumerate(chunks):
+        #     print(f"\nchunk[{idx}]:\n{json.dumps(chunk, indent=2, ensure_ascii=False)}")
+        result = "".join([x['choices'][0]['delta']['content'] for x in chunks])
+    except Exception as e:
+        print(f"解析失败: {e}")
+        # Dump log/workerlog.0 to aid debugging
+        if os.path.exists('log/workerlog.0'):
+            with open('log/workerlog.0', 'r') as file:
+                log_contents = file.read()
+                print("################# workerlog.0 ##################", log_contents)
+        pytest.fail(f"解析失败: {e}")
+    print("\nresult:\n", result)
+    # Compare against the baseline
+    with open("./baseline_text_mtp.txt", "r", encoding="utf-8") as f:
+        baseline = f.read()
+    # with open("./baseline_text_mtp.txt", "w", encoding="utf-8") as f:
+    #     f.writelines(result)
+    assert result == baseline, f"与baseline存在diff,result: {result}\n baseline: {baseline}"
+
+
+def test_picture_diff():
+    payload = {
+        "model": "null",
+        "messages": [
+            {
+                "role": "user",
+                "content": [
+                    {
+                        "type": "image_url",
+                        "image_url": {
+                            "url": "bos://nlp-sr-text2img/luobin06/dataset/doc_images/ChineseDocVQA/4e5278fdb82c881c69122c09f902e029.png",
+                        },
+                        "tokenizer_options": {
+                            "resolution": 4096,
+                            "version": "v1"
+                        }
+                    },
+                    {"type": "text", "text": "哪个银行?"},
+                    {
+                        "type": "image_url",
+                        "image_url": {
+                            "url": "bos://nlp-sr-text2img/luobin06/dataset/doc_images/ChineseDocVQA/4e5278fdb82c881c69122c09f902e029.png",
+                        },
+                        "tokenizer_options": {
+                            "resolution": 4096,
+                            "version": "v1"
+                        }
+                    },
+                    {"type": "text", "text": "哪个银行?"},
+                ],
+            },
+        ],
+        "stream": True,
+        "temperature": 1.0,
+        "seed": 21,
+        "top_p": 0,
+        "stop": ["", "", "<|endoftext|>", "<|im_end|>"],
+        "metadata": {
+            "chat_template_kwargs": {
+                "options": {
+                    "thinking_mode": "close",
+                },
+            },
+            "bad_words_token_ids": [101031, 101032, 101027, 101028, 101023, 101024],
+        }
+    }
+
+    print("fastdeploy answer is :")
+
+    try:
+        response = send_request(URL, payload)
+        chunks = get_stream_chunks(response)
+        # for idx, chunk in enumerate(chunks):
+        #     print(f"\nchunk[{idx}]:\n{json.dumps(chunk, indent=2, ensure_ascii=False)}")
+        result = "".join([x['choices'][0]['delta']['content'] for x in chunks])
+    except Exception as e:
+        print(f"解析失败: {e}")
+        # Dump log/workerlog.0 to aid debugging
+        if os.path.exists('log/workerlog.0'):
+            with open('log/workerlog.0', 'r') as file:
+                log_contents = file.read()
+                print("################# workerlog.0 ##################", log_contents)
+        pytest.fail(f"解析失败: {e}")
+    print("\nresult:\n", result)
+    # Compare against the baseline
+    with open("./baseline_pic_mtp.txt", "r", encoding="utf-8") as f:
+        baseline = f.read()
+    # with open("./baseline_pic_mtp.txt", "w", encoding="utf-8") as f:
+    #     f.writelines(result)
+    assert result == baseline, f"与baseline存在diff,result: {result}\n baseline: {baseline}"
+
+
+def test_chat_usage_stream():
+    """Test usage stats for streaming chat completions."""
+    payload = {
+        "model": "null",
+        "messages": [
+            {
+                "role": "user",
+                "content": [
+                    {
+                        "type": "text",
+                        "text": "解释一下温故而知新",
+                    },
+                ],
+            },
+        ],
+        "stream": True,
+        "stream_options": {"include_usage": True, "continuous_usage_stats": True},
+        "temperature": 1.0,
+        "seed": 21,
+        "top_p": 0,
+        "stop": ["", "", "<|endoftext|>", "<|im_end|>"],
+        "metadata": {
+            "min_tokens": 10,
+            "chat_template_kwargs": {
+                "options": {
+                    "thinking_mode": "close",
+                },
+            },
+            "bad_words_token_ids": [101031, 101032, 101027, 101028, 101023, 101024],
+        },
+        "max_tokens": 50,
+    }
+
+    response = send_request(url=URL, payload=payload)
+    chunks = get_stream_chunks(response)
+    result = "".join([x["choices"][0]["delta"]["content"] for x in chunks[:-1]])
+    print("Response:", result)
+    assert result != "", "结果为空"
+    usage = chunks[-1]["usage"]
+    total_tokens = usage["completion_tokens"] + usage["prompt_tokens"]
+    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
+    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
+    assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens"
+
+
+def test_chat_usage_non_stream():
+    """Test usage stats for non-streaming chat completions."""
+    payload = {
+        "model": "null",
+        "messages": [
+            {
+                "role": "user",
+                "content": [
+                    {
+                        "type": "text",
+                        "text": "解释一下温故而知新",
+                    },
+                ],
+            },
+        ],
+        "stream": False,
+        "temperature": 1.0,
+        "seed": 21,
+        "top_p": 0,
+        "stop": ["", "", "<|endoftext|>", "<|im_end|>"],
+        "metadata": {
+            "min_tokens": 50,
+            "chat_template_kwargs": {
+                "options": {
+                    "thinking_mode": "close",
+                },
+            },
+            "bad_words_token_ids": [101031, 101032, 101027, 101028, 101023, 101024],
+        },
+        "max_tokens": 50,
+    }
+
+    response = send_request(url=URL, payload=payload).json()
+    usage = response["usage"]
+    result = response["choices"][0]["message"]["content"]
+    assert result != "", "结果为空"
+    total_tokens = usage["completion_tokens"] + usage["prompt_tokens"]
+    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
+    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
+    assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens"
+
+
+if __name__ == '__main__':
+    test_text_diff()