|
8 | 8 | import zipfile |
9 | 9 | from pathlib import Path |
10 | 10 | import glob |
11 | | -from fastapi import FastAPI, UploadFile, File, Form |
| 11 | +from fastapi import Depends, FastAPI, HTTPException, UploadFile, File, Form |
12 | 12 | from fastapi.middleware.gzip import GZipMiddleware |
13 | 13 | from fastapi.responses import JSONResponse, FileResponse |
14 | 14 | from starlette.background import BackgroundTask |
|
21 | 21 | from mineru.utils.guess_suffix_or_lang import guess_suffix_by_path |
22 | 22 | from mineru.version import __version__ |
23 | 23 |
|
24 | | -app = FastAPI() |
| 24 | +# 并发控制器 |
| 25 | +_request_semaphore: Optional[asyncio.Semaphore] = None |
| 26 | + |
| 27 | + |
| 28 | +# 并发控制依赖函数 |
| 29 | +async def limit_concurrency(): |
| 30 | + if _request_semaphore is not None: |
| 31 | + if _request_semaphore.locked(): |
| 32 | + raise HTTPException( |
| 33 | + status_code=503, |
| 34 | + detail="Server is at maximum capacity. Please try again later." |
| 35 | + ) |
| 36 | + async with _request_semaphore: |
| 37 | + yield |
| 38 | + else: |
| 39 | + yield |
| 40 | + |
| 41 | + |
| 42 | +app = FastAPI(openapi_url=None, docs_url=None, redoc_url=None) |
25 | 43 | app.add_middleware(GZipMiddleware, minimum_size=1000) |
26 | 44 |
|
27 | 45 |
|
@@ -60,7 +78,7 @@ def get_infer_result(file_suffix_identifier: str, pdf_name: str, parse_dir: str) |
60 | 78 | return None |
61 | 79 |
|
62 | 80 |
|
63 | | -@app.post(path="/file_parse",) |
| 81 | +@app.post(path="/file_parse", dependencies=[Depends(limit_concurrency)]) |
64 | 82 | async def parse_pdf( |
65 | 83 | files: List[UploadFile] = File(...), |
66 | 84 | output_dir: str = Form("./output"), |
@@ -256,6 +274,14 @@ def main(ctx, host, port, reload, **kwargs): |
256 | 274 |
|
257 | 275 | kwargs.update(arg_parse(ctx)) |
258 | 276 |
|
| 277 | + # 初始化并发控制器 |
| 278 | + global _request_semaphore |
| 279 | + max_concurrent_requests = int(kwargs.get("max_concurrent_requests", 0)) |
| 280 | + if max_concurrent_requests > 0: |
| 281 | + _request_semaphore = asyncio.Semaphore(max_concurrent_requests) |
| 282 | + logger.info(f"Request concurrency limited to {max_concurrent_requests}") |
| 283 | + |
| 284 | + |
259 | 285 | # 将配置参数存储到应用状态中 |
260 | 286 | app.state.config = kwargs |
261 | 287 |
|
|
0 commit comments