
add: async_main.py #47

@iicey

Description

# -*- coding: utf-8 -*-
import asyncio
import ipaddress
import socket
from time import monotonic, time
from aiohttp import ClientSession, ClientTimeout, TCPConnector, AsyncResolver
from typing import List, Tuple, AsyncGenerator

# Global configuration
MAX_CONCURRENT = 200  # number of concurrent worker coroutines
TIMEOUT = 0.8  # per-request timeout in seconds
RETRY = 2  # number of retries after a failed attempt
BATCH_SIZE = 1000  # report progress every BATCH_SIZE results


async def generate_ips(cidrs: List[str]) -> AsyncGenerator[str, None]:
    """高效生成IP地址"""
    for cidr in cidrs:
        network = ipaddress.ip_network(cidr)
        for ip in network.hosts():
            yield str(ip)


async def test_ip(ip: str, session: ClientSession) -> Tuple[str, float]:
    """Probe a single IP over HTTPS and return (ip, latency in seconds)."""
    url = f"https://{ip}/"
    start = monotonic()
    try:
        async with session.head(  # HEAD keeps the response body empty
                url,
                headers={"Host": "translate.googleapis.com"},
                allow_redirects=False,
                timeout=ClientTimeout(total=TIMEOUT),
                ssl=False
        ) as resp:
            latency = monotonic() - start
            # Accept 2xx/3xx/4xx status codes (reject 5xx errors)
            if resp.status < 500:
                print(f"{ip}: {latency:.2f}s")
                return ip, latency
    except Exception:
        pass  # any network/TLS error means the IP is unusable
    return ip, float('inf')


async def worker(queue: asyncio.Queue, results: list, session: ClientSession):
    """Worker coroutine: pull IPs off the queue and test them with retries."""
    while True:
        ip = await queue.get()
        for _ in range(RETRY + 1):
            ip, latency = await test_ip(ip, session)
            if latency <= TIMEOUT:
                results.append((ip, latency))
                # Report progress once per BATCH_SIZE hits
                if len(results) % BATCH_SIZE == 0:
                    print(f"Valid IPs found so far: {len(results)}")
                break
        queue.task_done()


async def main(cidrs: List[str]):
    """主调度器"""
    queue = asyncio.Queue(maxsize=MAX_CONCURRENT * 2)
    results = []

    # Initialize the connection pool
    connector = TCPConnector(
        # Connection pool limits
        limit=2000,  # max concurrent connections (adjust to the system fd limit)
        limit_per_host=50,  # max connections per target host

        # DNS optimisation
        use_dns_cache=True,  # enable DNS caching
        ttl_dns_cache=300,  # cache entries for 5 minutes
        resolver=AsyncResolver(),  # asynchronous DNS resolver (needs aiodns)

        # Protocol stack tuning
        family=socket.AF_INET,  # force IPv4 to avoid dual-stack overhead
        happy_eyeballs_delay=0.1,  # shorten the Happy Eyeballs fallback delay
        interleave=2,  # interleave address families when connecting

        # TLS configuration
        ssl=False,  # skip certificate verification (targets are raw IPs)

        # Connection reuse
        keepalive_timeout=30,  # keep idle connections alive for 30 s
        force_close=False,  # allow connection reuse

        # System-level tuning
        local_addr=('0.0.0.0', 0),  # bind to all local interfaces
        enable_cleanup_closed=True  # clean up closed SSL transports automatically
    )

    async with ClientSession(connector=connector) as session:
        # Start the worker pool
        workers = [
            asyncio.create_task(worker(queue, results, session))
            for _ in range(MAX_CONCURRENT)
        ]

        # Feed the task queue on the fly
        ips = generate_ips(cidrs)
        async for ip in ips:
            await queue.put(ip)
            # Backpressure: keep the queue full but not overloaded
            while queue.qsize() > MAX_CONCURRENT * 1.5:
                await asyncio.sleep(0.1)

        await queue.join()
        for task in workers:
            task.cancel()
        # Wait for cancellation to finish before the session closes
        await asyncio.gather(*workers, return_exceptions=True)

    # Sort by latency and drop unreachable IPs
    results.sort(key=lambda x: x[1])
    return [x for x in results if x[1] < float('inf')]


# Usage example
if __name__ == "__main__":
    _cidr_list = [
        "142.250.0.0/15",
        "108.177.0.0/17",
        "172.217.0.0/16",
        "172.253.0.0/16",
        "216.58.192.0/19",
        "72.14.192.0/18",
        "74.125.0.0/16",

        # "142.251.222.30"
    ]

    # Shuffle the CIDR order to avoid hammering one range first
    # from random import shuffle; shuffle(_cidr_list)

    # Run the test
    _start_time = time()

    # Run the async engine
    found = asyncio.run(main(_cidr_list))
    print(f"\nTest finished in {time() - _start_time:.2f} s")
    print(f"Found {len(found)} low-latency IPs")
    for _ip, _latency in found[:10]:
        print(f"{_ip}: {_latency * 1000:.2f}ms")
