Skip to content

Commit 35c2579

Browse files
committed
fix: 避免异常崩溃产生多个线程池
1 parent d724f42 commit 35c2579

File tree

3 files changed

+113
-112
lines changed

3 files changed

+113
-112
lines changed

mcim_sync/tasks/__init__.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
from concurrent.futures import ThreadPoolExecutor
2+
from contextlib import contextmanager
23

3-
4+
@contextmanager
45
def create_tasks_pool(sync_function, data, max_workers, thread_name_prefix):
6+
# 创建线程池
57
thread_pool = ThreadPoolExecutor(
6-
max_workers=max_workers, thread_name_prefix=thread_name_prefix
8+
max_workers=max_workers,
9+
thread_name_prefix=thread_name_prefix
710
)
8-
futures = [
9-
thread_pool.submit(sync_function, item) for item in data
10-
]
11-
return thread_pool, futures
11+
12+
try:
13+
# 提交所有任务
14+
futures = [
15+
thread_pool.submit(sync_function, item) for item in data
16+
]
17+
yield futures
18+
finally:
19+
thread_pool.shutdown(wait=True)

mcim_sync/tasks/curseforge.py

Lines changed: 37 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -45,27 +45,23 @@ def refresh_curseforge_with_modify_date() -> bool:
4545
log.info(f"Curseforge expired data fetched: {len(curseforge_expired_modids)}")
4646
log.info("Start syncing CurseForge expired data...")
4747

48-
curseforge_pool, curseforge_futures = create_tasks_pool(
49-
sync_mod, # 需要 ProjectDetail 返回值
48+
with create_tasks_pool(
49+
sync_mod,
5050
curseforge_expired_modids,
5151
MAX_WORKERS,
5252
"refresh_curseforge",
53-
)
54-
projects_detail_info: List[ProjectDetail] = []
55-
for future in as_completed(curseforge_futures):
56-
result = future.result()
57-
if result:
58-
projects_detail_info.append(result)
59-
else:
60-
curseforge_pool.shutdown()
53+
) as curseforge_futures:
54+
projects_detail_info: List[ProjectDetail] = []
55+
for future in as_completed(curseforge_futures):
56+
result = future.result()
57+
if result:
58+
projects_detail_info.append(result)
6159

6260
success_modids = [project.id for project in projects_detail_info if project]
63-
61+
6462
failed_count = len(curseforge_expired_modids) - len(success_modids)
6563
failed_modids = [
66-
modid
67-
for modid in curseforge_expired_modids
68-
if modid not in success_modids
64+
modid for modid in curseforge_expired_modids if modid not in success_modids
6965
]
7066

7167
log.info(
@@ -112,18 +108,15 @@ def sync_curseforge_queue() -> bool:
112108
log.info(f"New modids: {new_modids}, count: {len(new_modids)}")
113109

114110
if new_modids:
115-
# pool, futures = create_tasks_pool(sync_mod, modids, MAX_WORKERS, "curseforge")
116-
pool, futures = create_tasks_pool(
111+
with create_tasks_pool(
117112
sync_mod, new_modids, MAX_WORKERS, "sync_curseforge_queue"
118-
) # https://github.com/mcmod-info-mirror/mcim-sync/issues/2
119-
120-
projects_detail_info = []
121-
for future in as_completed(futures):
122-
result = future.result()
123-
if result:
124-
projects_detail_info.append(result)
113+
) as futures:
114+
projects_detail_info = []
115+
for future in as_completed(futures):
116+
result = future.result()
117+
if result:
118+
projects_detail_info.append(result)
125119

126-
pool.shutdown()
127120
log.info(f"CurseForge queue sync finished, total: {len(modids)}")
128121

129122
# clear queue
@@ -199,17 +192,15 @@ def sync_curseforge_by_search(class_ids: Optional[List[int]] = None) -> bool:
199192

200193
log.info(f"CurseForge new modids fetched: {len(new_modids)}")
201194
if new_modids:
202-
pool, futures = create_tasks_pool(
195+
with create_tasks_pool(
203196
sync_mod, new_modids, MAX_WORKERS, "sync_curseforge_by_search"
204-
)
205-
206-
projects_detail_info = []
207-
for future in as_completed(futures):
208-
result = future.result()
209-
if result:
210-
projects_detail_info.append(result)
197+
) as futures:
198+
projects_detail_info = []
199+
for future in as_completed(futures):
200+
result = future.result()
201+
if result:
202+
projects_detail_info.append(result)
211203

212-
pool.shutdown()
213204
log.info(f"CurseForge search sync finished, total: {len(new_modids)}")
214205

215206
if config.telegram_bot:
@@ -231,28 +222,23 @@ def sync_curseforge_full():
231222
curseforge_data = fetch_all_curseforge_data()
232223
log.info(f"Curseforge data totally fetched: {len(curseforge_data)}")
233224

234-
curseforge_pool, curseforge_futures = create_tasks_pool(
225+
with create_tasks_pool(
235226
sync_mod, curseforge_data, MAX_WORKERS, "curseforge_refresh_full"
236-
)
237-
238-
log.info(
239-
f"All {len(curseforge_futures)} tasks submitted, waiting for completion..."
240-
)
227+
) as curseforge_futures:
228+
log.info(
229+
f"All {len(curseforge_futures)} tasks submitted, waiting for completion..."
230+
)
241231

242-
projects_detail_info: List[ProjectDetail] = []
243-
for future in as_completed(curseforge_futures):
244-
result = future.result()
245-
if result:
246-
projects_detail_info.append(result)
247-
else:
248-
curseforge_pool.shutdown()
232+
projects_detail_info: List[ProjectDetail] = []
233+
for future in as_completed(curseforge_futures):
234+
result = future.result()
235+
if result:
236+
projects_detail_info.append(result)
249237

250238
success_modids = [project.id for project in projects_detail_info if project]
251239

252240
failed_count = len(curseforge_data) - len(success_modids)
253-
failed_modids = [
254-
modid for modid in curseforge_data if modid not in success_modids
255-
]
241+
failed_modids = [modid for modid in curseforge_data if modid not in success_modids]
256242

257243
log.info(
258244
f"CurseForge full sync finished, total: {len(curseforge_data)}, "
@@ -268,5 +254,5 @@ def sync_curseforge_full():
268254
)
269255
notification.send_to_telegram()
270256
log.info("CurseForge refresh message sent to telegram.")
271-
272-
return True
257+
258+
return True

mcim_sync/tasks/modrinth.py

Lines changed: 62 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
check_newest_search_result,
2424
)
2525
from mcim_sync.cleaner.modrinth import remove_projects
26-
from mcim_sync.fetcher.modrinth import fetch_expired_and_removed_modrinth_data, fetch_all_modrinth_data
26+
from mcim_sync.fetcher.modrinth import (
27+
fetch_expired_and_removed_modrinth_data,
28+
fetch_all_modrinth_data,
29+
)
2730
from mcim_sync.queues.modrinth import clear_modrinth_all_queues
2831
from mcim_sync.tasks import create_tasks_pool
2932

@@ -52,20 +55,20 @@ def refresh_modrinth_with_modify_date() -> bool:
5255

5356
# 刷新过期的 modrinth 数据
5457
log.info("Start syncing Modrinth expired data...")
55-
modrinth_pool, modrinth_futures = create_tasks_pool(
58+
with create_tasks_pool(
5659
sync_project, # 需要 ProjectDetail 返回值
5760
modrinth_expired_data,
5861
MAX_WORKERS,
5962
"refresh_modrinth",
60-
)
61-
62-
projects_detail_info = []
63-
for future in as_completed(modrinth_futures):
64-
result = future.result()
65-
if result:
66-
projects_detail_info.append(result)
67-
else:
68-
modrinth_pool.shutdown()
63+
) as modrinth_futures:
64+
log.info(
65+
f"All {len(modrinth_futures)} tasks submitted, waiting for completion..."
66+
)
67+
projects_detail_info = []
68+
for future in as_completed(modrinth_futures):
69+
result = future.result()
70+
if result:
71+
projects_detail_info.append(result)
6972

7073
if config.telegram_bot:
7174
notification = RefreshNotification(
@@ -85,13 +88,19 @@ def fetch_modrinth_not_found_ids_from_queue():
8588
project_ids = []
8689
avaliable_project_ids = check_modrinth_project_ids_available()
8790
project_ids.extend(avaliable_project_ids)
88-
log.info(f"Modrinth project ids queue available project_ids: {len(avaliable_project_ids)}")
91+
log.info(
92+
f"Modrinth project ids queue available project_ids: {len(avaliable_project_ids)}"
93+
)
8994
avaliable_project_ids = check_modrinth_version_ids_available()
9095
project_ids.extend(avaliable_project_ids)
91-
log.info(f"Modrinth version ids queue available project_ids: {len(avaliable_project_ids)}")
96+
log.info(
97+
f"Modrinth version ids queue available project_ids: {len(avaliable_project_ids)}"
98+
)
9299
avaliable_project_ids = check_modrinth_hashes_available()
93100
project_ids.extend(avaliable_project_ids)
94-
log.info(f"Modrinth hashes queue available project_ids: {len(avaliable_project_ids)}")
101+
log.info(
102+
f"Modrinth hashes queue available project_ids: {len(avaliable_project_ids)}"
103+
)
95104
return project_ids
96105

97106

@@ -107,22 +116,22 @@ def sync_modrinth_queue() -> bool:
107116
log.info(f"New project ids: {new_project_ids}, count: {len(new_project_ids)}")
108117

109118
if new_project_ids:
110-
pool, futures = create_tasks_pool(
111-
# sync_project, project_ids, MAX_WORKERS, "modrinth"
112-
sync_project,
113-
new_project_ids,
114-
MAX_WORKERS,
115-
"sync_modrinth_by_queue", # https://github.com/mcmod-info-mirror/mcim-sync/issues/2
116-
)
117-
118-
projects_detail_info = []
119+
with (
120+
create_tasks_pool(
121+
# sync_project, project_ids, MAX_WORKERS, "modrinth"
122+
sync_project,
123+
new_project_ids,
124+
MAX_WORKERS,
125+
"sync_modrinth_by_queue", # https://github.com/mcmod-info-mirror/mcim-sync/issues/2
126+
) as futures
127+
):
128+
projects_detail_info = []
129+
130+
for future in as_completed(futures):
131+
result = future.result()
132+
if result:
133+
projects_detail_info.append(result)
119134

120-
for future in as_completed(futures):
121-
result = future.result()
122-
if result:
123-
projects_detail_info.append(result)
124-
125-
pool.shutdown()
126135
log.info(f"Modrinth queue sync finished, total: {len(project_ids)}")
127136

128137
# clear queue
@@ -150,20 +159,18 @@ def sync_modrinth_by_search():
150159
new_project_ids = check_newest_search_result()
151160
log.info(f"Modrinth project ids fetched: {len(new_project_ids)}")
152161
if new_project_ids:
153-
pool, futures = create_tasks_pool(
162+
with create_tasks_pool(
154163
sync_project,
155164
new_project_ids,
156165
MAX_WORKERS,
157166
"sync_modrinth_by_search",
158-
)
167+
) as futures:
168+
projects_detail_info = []
169+
for future in as_completed(futures):
170+
result = future.result()
171+
if result:
172+
projects_detail_info.append(result)
159173

160-
projects_detail_info = []
161-
for future in as_completed(futures):
162-
result = future.result()
163-
if result:
164-
projects_detail_info.append(result)
165-
166-
pool.shutdown()
167174
log.info(
168175
f"Modrinth sync new project by search finished, total: {len(new_project_ids)}"
169176
)
@@ -200,6 +207,7 @@ def refresh_modrinth_tags():
200207
log.info("All Message sent to telegram.")
201208
return True
202209

210+
203211
def refresh_modrinth_full():
204212
"""
205213
刷新 modrinth 所有数据
@@ -209,26 +217,25 @@ def refresh_modrinth_full():
209217
modrinth_data = fetch_all_modrinth_data()
210218
log.info(f"Modrinth data totally fetched: {len(modrinth_data)}")
211219

212-
modrinth_pool, modrinth_futures = create_tasks_pool(
220+
with create_tasks_pool(
213221
sync_project, modrinth_data, MAX_WORKERS, "modrinth_refresh_full"
214-
)
215-
216-
log.info(
217-
f"All {len(modrinth_futures)} tasks submitted, waiting for completion..."
218-
)
222+
) as modrinth_futures:
223+
log.info(
224+
f"All {len(modrinth_futures)} tasks submitted, waiting for completion..."
225+
)
219226

220-
projects_detail_info = []
221-
for future in as_completed(modrinth_futures):
222-
result = future.result()
223-
if result:
224-
projects_detail_info.append(result)
225-
else:
226-
modrinth_pool.shutdown()
227+
projects_detail_info = []
228+
for future in as_completed(modrinth_futures):
229+
result = future.result()
230+
if result:
231+
projects_detail_info.append(result)
227232

228233
success_project_ids = [project.id for project in projects_detail_info if project]
229-
234+
230235
failed_count = len(modrinth_data) - len(success_project_ids)
231-
failed_project_ids = [project.id for project in modrinth_data if project.id not in success_project_ids]
236+
failed_project_ids = [
237+
project.id for project in modrinth_data if project.id not in success_project_ids
238+
]
232239

233240
log.info(
234241
f"Modrinth full sync finished, total: {len(modrinth_data)}, "
@@ -244,5 +251,5 @@ def refresh_modrinth_full():
244251
)
245252
notice.send_to_telegram()
246253
log.info("Modrinth full refresh message sent to telegram.")
247-
248-
return True
254+
255+
return True

0 commit comments

Comments
 (0)