Skip to content

Commit e071bf5

Browse files
committed
feat: Add rate limit to network requests
1 parent 2c0b6bf commit e071bf5

File tree

6 files changed

+162
-75
lines changed

6 files changed

+162
-75
lines changed

mcim_sync/config.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import json
22
import os
3-
from typing import Optional, Union
4-
from pydantic import BaseModel, ValidationError, field_validator
5-
from enum import Enum
3+
from typing import Optional, Union, Dict
4+
from pydantic import BaseModel, field_validator
65

76
# config path
87
CONFIG_PATH = os.path.join("config.json")
@@ -54,6 +53,12 @@ class JobInterval(BaseModel):
5453
global_statistics: int = 60 * 60 * 24 # 24 hours
5554

5655

56+
class DomainRateLimitModel(BaseModel):
57+
"""域名限速配置"""
58+
max_requests: int = 10 # 最大请求数
59+
time_window: int = 60 # 时间窗口(秒)
60+
61+
5762
class ConfigModel(BaseModel):
5863
debug: bool = False
5964
mongodb: MongodbConfigModel = MongodbConfigModel()
@@ -78,6 +83,11 @@ class ConfigModel(BaseModel):
7883
max_file_size: int = 1024 * 1024 * 20 # 20MB
7984
log_to_file: bool = False
8085
log_path: str = "./logs"
86+
# 域名限速配置
87+
domain_rate_limits: Dict[str, DomainRateLimitModel] = {
88+
"api.curseforge.com": DomainRateLimitModel(max_requests=100, time_window=60),
89+
"api.modrinth.com": DomainRateLimitModel(max_requests=300, time_window=60),
90+
}
8191

8292

8393
class Config:

mcim_sync/tasks/__init__.py

Lines changed: 2 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,11 @@
1-
import threading
2-
import time
3-
from concurrent.futures import ThreadPoolExecutor, as_completed
4-
5-
from mcim_sync.utils.loger import log
6-
from mcim_sync.config import Config
7-
from mcim_sync.exceptions import ResponseCodeException, TooManyRequestsException
8-
9-
10-
config = Config.load()
11-
12-
# 429 全局暂停
13-
curseforge_pause_event = threading.Event()
14-
modrinth_pause_event = threading.Event()
15-
16-
def sync_with_pause(sync_function, *args):
17-
times = 0
18-
if "curseforge" in threading.current_thread().name:
19-
pause_event = curseforge_pause_event
20-
thread_type = "CurseForge"
21-
elif "modrinth" in threading.current_thread().name:
22-
pause_event = modrinth_pause_event
23-
thread_type = "Modrinth"
24-
else:
25-
log.error(
26-
f"Unknown thread name {threading.current_thread().name}, can't determine pause event."
27-
)
28-
return
29-
while times < 3:
30-
# 检查是否需要暂停
31-
pause_event.wait()
32-
try:
33-
return sync_function(*args)
34-
35-
except (ResponseCodeException, TooManyRequestsException) as e:
36-
if e.status_code in [429, 403]:
37-
log.warning(
38-
f"Received HTTP {e.status_code}, pausing all {thread_type} threads for 30 seconds..."
39-
)
40-
pause_event.clear()
41-
time.sleep(30)
42-
pause_event.set()
43-
log.info("Resuming all threads.")
44-
else:
45-
break
46-
else:
47-
log.error(
48-
f"Failed to sync data after 3 retries, func: {sync_function}, args: {args}"
49-
)
1+
from concurrent.futures import ThreadPoolExecutor
502

513

524
def create_tasks_pool(sync_function, data, max_workers, thread_name_prefix):
535
thread_pool = ThreadPoolExecutor(
546
max_workers=max_workers, thread_name_prefix=thread_name_prefix
557
)
568
futures = [
57-
thread_pool.submit(sync_with_pause, sync_function, item) for item in data
9+
thread_pool.submit(sync_function, item) for item in data
5810
]
5911
return thread_pool, futures

mcim_sync/tasks/curseforge.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
fetch_all_curseforge_data,
2323
)
2424
from mcim_sync.queues.curseforge import clear_curseforge_all_queues
25-
from mcim_sync.tasks import create_tasks_pool, curseforge_pause_event
25+
from mcim_sync.tasks import create_tasks_pool
2626

2727
config = Config.load()
2828

@@ -43,7 +43,6 @@ def refresh_curseforge_with_modify_date() -> bool:
4343
log.info(f"Curseforge expired data fetched: {len(curseforge_expired_modids)}")
4444
log.info("Start syncing CurseForge expired data...")
4545

46-
curseforge_pause_event.set()
4746
curseforge_pool, curseforge_futures = create_tasks_pool(
4847
sync_mod, # 需要 ProjectDetail 返回值
4948
curseforge_expired_modids,
@@ -108,7 +107,6 @@ def sync_curseforge_queue() -> bool:
108107
log.info(f"New modids: {new_modids}, count: {len(new_modids)}")
109108

110109
if modids:
111-
curseforge_pause_event.set()
112110
# pool, futures = create_tasks_pool(sync_mod, modids, MAX_WORKERS, "curseforge")
113111
pool, futures = create_tasks_pool(
114112
sync_mod, new_modids, MAX_WORKERS, "sync_curseforge"
@@ -196,7 +194,6 @@ def sync_curseforge_by_search():
196194

197195
log.info(f"CurseForge new modids fetched: {len(new_modids)}")
198196
if new_modids:
199-
curseforge_pause_event.set()
200197
pool, futures = create_tasks_pool(
201198
sync_mod, new_modids, MAX_WORKERS, "sync_curseforge_by_search"
202199
)
@@ -229,8 +226,6 @@ def sync_curseforge_full():
229226
curseforge_data = fetch_all_curseforge_data()
230227
log.info(f"Curseforge data totally fetched: {len(curseforge_data)}")
231228

232-
curseforge_pause_event.set()
233-
234229
curseforge_pool, curseforge_futures = create_tasks_pool(
235230
sync_mod, curseforge_data, MAX_WORKERS, "curseforge"
236231
)

mcim_sync/tasks/modrinth.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from mcim_sync.cleaner.modrinth import remove_projects
2626
from mcim_sync.fetcher.modrinth import fetch_expired_and_removed_modrinth_data
2727
from mcim_sync.queues.modrinth import clear_modrinth_all_queues
28-
from mcim_sync.tasks import create_tasks_pool, modrinth_pause_event
28+
from mcim_sync.tasks import create_tasks_pool
2929

3030
config = Config.load()
3131

@@ -52,7 +52,6 @@ def refresh_modrinth_with_modify_date() -> bool:
5252

5353
# 刷新过期的 modrinth 数据
5454
log.info("Start syncing Modrinth expired data...")
55-
modrinth_pause_event.set()
5655
modrinth_pool, modrinth_futures = create_tasks_pool(
5756
sync_project, # 需要 ProjectDetail 返回值
5857
modrinth_expired_data,
@@ -108,7 +107,6 @@ def sync_modrinth_queue() -> bool:
108107
log.info(f"New project ids: {new_project_ids}, count: {len(new_project_ids)}")
109108

110109
if project_ids:
111-
modrinth_pause_event.set()
112110
pool, futures = create_tasks_pool(
113111
# sync_project, project_ids, MAX_WORKERS, "modrinth"
114112
sync_project,
@@ -152,7 +150,6 @@ def sync_modrinth_by_search():
152150
new_project_ids = check_newest_search_result()
153151
log.info(f"Modrinth project ids fetched: {len(new_project_ids)}")
154152
if new_project_ids:
155-
modrinth_pause_event.set()
156153
pool, futures = create_tasks_pool(
157154
sync_project,
158155
new_project_ids,

mcim_sync/utils/network/__init__.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@
33
"""
44

55
import httpx
6+
import time
7+
from typing import Optional, Union
68

79
from tenacity import retry, stop_after_attempt, retry_if_not_exception_type
8-
from typing import Optional, Union
9-
from mcim_sync.exceptions import ApiException, ResponseCodeException, TooManyRequestsException
10+
from mcim_sync.exceptions import ResponseCodeException, TooManyRequestsException
1011
from mcim_sync.config import Config
11-
from mcim_sync.utils.loger import log
12+
from mcim_sync.utils.rate_limit import domain_rate_limiter
1213

1314
config = Config.load()
1415

15-
1616
PROXY: Optional[str] = config.proxies
1717

1818
HEADERS = {
@@ -21,8 +21,6 @@
2121

2222
TIMEOUT = 5
2323
RETRY_TIMES = 3
24-
REQUEST_LOG = True
25-
2624

2725
httpx_client: httpx.Client = httpx.Client(proxy=PROXY)
2826

@@ -52,21 +50,27 @@ def request(
5250
**kwargs,
5351
) -> httpx.Response:
5452
"""
55-
HTTPX 请求函数
53+
HTTPX 请求函数,集成域名限速
5654
5755
Args:
5856
url (str): 请求 URL
59-
6057
method (str, optional): 请求方法 默认 GET
61-
6258
timeout (Optional[Union[int, float]], optional): 超时时间,默认为 5 秒
63-
6459
**kwargs: 其他参数
6560
6661
Returns:
67-
Any: 请求结果
62+
httpx.Response: 请求结果
6863
"""
69-
# delete null query
64+
# 应用域名限速
65+
while not domain_rate_limiter.can_make_request(url):
66+
wait_time = domain_rate_limiter.wait_time(url)
67+
if wait_time > 0:
68+
time.sleep(wait_time)
69+
70+
# 记录请求
71+
domain_rate_limiter.record_request(url)
72+
73+
# 执行实际请求
7074
if params is not None:
7175
params = {k: v for k, v in params.items() if v is not None}
7276

@@ -80,6 +84,7 @@ def request(
8084
res: httpx.Response = session.request(
8185
method, url, data=data, params=params, timeout=timeout, **kwargs
8286
)
87+
8388
if not ignore_status_code:
8489
if res.status_code != 200:
8590
if res.status_code == 429:
@@ -99,3 +104,16 @@ def request(
99104
msg=res.text,
100105
)
101106
return res
107+
108+
109+
def get_domain_status(domain: str) -> dict:
110+
"""
111+
获取域名的限速状态
112+
113+
Args:
114+
domain (str): 域名
115+
116+
Returns:
117+
dict: 限速状态信息
118+
"""
119+
return domain_rate_limiter.get_domain_status(domain)

mcim_sync/utils/rate_limit.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
"""
2+
域名限速器模块
3+
"""
4+
5+
import time
6+
import threading
7+
from collections import deque, defaultdict
8+
from urllib.parse import urlparse
9+
from typing import Dict
10+
11+
from mcim_sync.config import Config
12+
13+
14+
class DomainRateLimiter:
15+
"""简单的域名限速器"""
16+
17+
def __init__(self):
18+
self.config = Config.load()
19+
self.domain_requests: Dict[str, deque] = defaultdict(deque)
20+
self.locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
21+
22+
def get_domain_from_url(self, url: str) -> str:
23+
"""从URL中提取域名"""
24+
try:
25+
parsed = urlparse(url)
26+
return parsed.netloc.lower()
27+
except Exception:
28+
return "unknown"
29+
30+
def can_make_request(self, url: str) -> bool:
31+
"""检查是否可以向指定URL发起请求"""
32+
domain = self.get_domain_from_url(url)
33+
34+
# 如果域名没有配置限速,则允许请求
35+
if domain not in self.config.domain_rate_limits:
36+
return True
37+
38+
domain_config = self.config.domain_rate_limits[domain]
39+
current_time = time.time()
40+
41+
with self.locks[domain]:
42+
requests = self.domain_requests[domain]
43+
44+
# 清理超出时间窗口的请求记录
45+
while requests and current_time - requests[0] > domain_config.time_window:
46+
requests.popleft()
47+
48+
# 检查是否超过最大请求数
49+
return len(requests) < domain_config.max_requests
50+
51+
def record_request(self, url: str):
52+
"""记录请求"""
53+
domain = self.get_domain_from_url(url)
54+
55+
# 如果域名没有配置限速,则不记录
56+
if domain not in self.config.domain_rate_limits:
57+
return
58+
59+
current_time = time.time()
60+
61+
with self.locks[domain]:
62+
self.domain_requests[domain].append(current_time)
63+
64+
def wait_time(self, url: str) -> float:
65+
"""计算需要等待的时间"""
66+
domain = self.get_domain_from_url(url)
67+
68+
# 如果域名没有配置限速,则不需要等待
69+
if domain not in self.config.domain_rate_limits:
70+
return 0.0
71+
72+
domain_config = self.config.domain_rate_limits[domain]
73+
current_time = time.time()
74+
75+
with self.locks[domain]:
76+
requests = self.domain_requests[domain]
77+
78+
# 清理超出时间窗口的请求记录
79+
while requests and current_time - requests[0] > domain_config.time_window:
80+
requests.popleft()
81+
82+
# 如果请求数已满,计算等待时间
83+
if len(requests) >= domain_config.max_requests:
84+
oldest_request = requests[0]
85+
return domain_config.time_window - (current_time - oldest_request)
86+
87+
return 0.0
88+
89+
def get_domain_status(self, domain: str) -> Dict:
90+
"""获取域名的限速状态"""
91+
if domain not in self.config.domain_rate_limits:
92+
return {"configured": False}
93+
94+
domain_config = self.config.domain_rate_limits[domain]
95+
current_time = time.time()
96+
97+
with self.locks[domain]:
98+
requests = self.domain_requests[domain]
99+
100+
# 清理超出时间窗口的请求记录
101+
while requests and current_time - requests[0] > domain_config.time_window:
102+
requests.popleft()
103+
104+
return {
105+
"configured": True,
106+
"max_requests": domain_config.max_requests,
107+
"time_window": domain_config.time_window,
108+
"current_requests": len(requests),
109+
"remaining_requests": domain_config.max_requests - len(requests),
110+
"next_reset_time": requests[0] + domain_config.time_window if requests else current_time
111+
}
112+
113+
114+
# 全局限速器实例
115+
domain_rate_limiter = DomainRateLimiter()

0 commit comments

Comments
 (0)