-
Notifications
You must be signed in to change notification settings - Fork 131
Open
Description
# -*- coding: utf-8 -*-
import asyncio
import ipaddress
import socket
from time import monotonic, time
from aiohttp import ClientSession, TCPConnector, AsyncResolver
from typing import List, Tuple, AsyncGenerator
# 全局配置
MAX_CONCURRENT = 200 # 并发连接数
TIMEOUT = 0.8 # 超时时间(秒)
RETRY = 2 # 失败重试次数
BATCH_SIZE = 1000 # 结果批量处理阈值
async def generate_ips(cidrs: List[str]) -> AsyncGenerator[str, None]:
"""高效生成IP地址"""
for cidr in cidrs:
network = ipaddress.ip_network(cidr)
for ip in network.hosts():
yield str(ip)
async def test_ip(ip: str, session: ClientSession) -> Tuple[str, float]:
"""异步测速核心函数"""
url = f"https://{ip}/"
start = monotonic()
try:
async with session.head( # 使用HEAD方法减少数据传输
url,
headers={"Host": "translate.googleapis.com"},
allow_redirects=False,
timeout=TIMEOUT,
ssl=False
) as resp:
latency = monotonic() - start
# 接受2xx/3xx/4xx状态码 (排除5xx错误)
if resp.status < 500:
print(f"{ip}: {latency:.2f}")
return ip, latency
except (Exception,):
...
return ip, float('inf')
async def worker(queue: asyncio.Queue, results: list, session: ClientSession):
"""工作协程"""
while True:
ip = await queue.get()
for _ in range(RETRY + 1):
ip, latency = await test_ip(ip, session)
if latency <= TIMEOUT:
results.append((ip, latency))
# 批量处理结果
if len(results) >= BATCH_SIZE:
print(f"找到有效IP: {len(results)} 个")
break
queue.task_done()
async def main(cidrs: List[str]):
"""主调度器"""
queue = asyncio.Queue(maxsize=MAX_CONCURRENT * 2)
results = []
# 初始化连接池
connector = TCPConnector(
# 连接池控制
limit=2000, # 最大并发连接数 (根据系统fd限制调整)
limit_per_host=50, # 单目标主机最大连接
# DNS优化
use_dns_cache=True, # 启用DNS缓存
ttl_dns_cache=300, # 缓存5分钟
resolver=AsyncResolver(), # 使用异步DNS解析器
# 协议栈优化
family=socket.AF_INET, # 强制IPv4避免双栈开销
happy_eyeballs_delay=0.1, # 缩短IPv6回退延迟
interleave=2, # 交替尝试地址顺序
# SSL配置
ssl=False, # 自定义SSL上下文
# 连接复用
keepalive_timeout=30, # 保持连接存活时间
force_close=False, # 允许连接复用
# 系统级优化
local_addr=('0.0.0.0', 0), # 绑定本地所有接口
enable_cleanup_closed=True # 自动清理关闭连接
)
async with ClientSession(connector=connector) as session:
# 启动工作协程集群
workers = [
asyncio.create_task(worker(queue, results, session))
for _ in range(MAX_CONCURRENT)
]
# 动态填充任务队列
ips = generate_ips(cidrs)
async for ip in ips:
await queue.put(ip)
# 流量控制:保持队列满载但不过载
while queue.qsize() > MAX_CONCURRENT * 1.5:
await asyncio.sleep(0.1)
await queue.join()
for task in workers:
task.cancel()
# 结果排序去重
results.sort(key=lambda x: x[1])
return [x for x in results if x[1] < float('inf')]
# 使用示例
if __name__ == "__main__":
_cidr_list = [
"142.250.0.0/15",
"108.177.0.0/17",
"172.217.0.0/16",
"172.253.0.0/16",
"216.58.192.0/19",
"72.14.192.0/18",
"74.125.0.0/16",
# "142.251.222.30"
]
# 随机打乱CIDR顺序避免热点
# shuffle(_cidr_list)
# 运行测试
_start_time = time()
# 运行异步引擎
found = asyncio.run(main(_cidr_list))
print(f"\n测试完成,耗时 {time() - _start_time:.2f}秒")
print(f"共找到 {len(found)} 个低延迟IP")
for _ip, _latency in found[:10]:
print(f"{_ip}: {_latency * 1000:.2f}ms")Metadata
Metadata
Assignees
Labels
No labels