Skip to content

Commit 25c2f5c

Browse files
authored
Merge pull request #1997 from AnonymDevOSS/feat/threads-queues-video-process
feat: add threaded I/O pipeline for video processing
2 parents 70e9af9 + 316cb62 commit 25c2f5c

File tree

3 files changed

+157
-39
lines changed

3 files changed

+157
-39
lines changed

supervision/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from supervision.detection.tools.polygon_zone import PolygonZone, PolygonZoneAnnotator
5353
from supervision.detection.tools.smoother import DetectionsSmoother
5454
from supervision.detection.utils.boxes import (
55+
box_aspect_ratio,
5556
clip_boxes,
5657
denormalize_boxes,
5758
move_boxes,
@@ -199,6 +200,7 @@
199200
"VideoInfo",
200201
"VideoSink",
201202
"approximate_polygon",
203+
"box_aspect_ratio",
202204
"box_iou",
203205
"box_iou_batch",
204206
"box_iou_batch_with_jaccard",

supervision/detection/utils/boxes.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,57 @@
66
from supervision.detection.utils.iou_and_nms import box_iou_batch
77

88

9+
def box_aspect_ratio(xyxy: np.ndarray) -> np.ndarray:
10+
"""
11+
Calculate aspect ratios of bounding boxes given in xyxy format.
12+
13+
Computes the width divided by height for each bounding box. Returns NaN
14+
for boxes with zero height to avoid division errors.
15+
16+
Args:
17+
xyxy (`numpy.ndarray`): Array of bounding boxes in
18+
`(x_min, y_min, x_max, y_max)` format with shape `(N, 4)`.
19+
20+
Returns:
21+
`numpy.ndarray`: Array of aspect ratios with shape `(N,)`, where each element is
22+
the width divided by height of a box. Elements are NaN if height is zero.
23+
24+
Examples:
25+
```python
26+
import numpy as np
27+
import supervision as sv
28+
29+
xyxy = np.array([
30+
[10, 20, 30, 50],
31+
[0, 0, 40, 10],
32+
])
33+
34+
sv.box_aspect_ratio(xyxy)
35+
# array([0.66666667, 4. ])
36+
37+
xyxy = np.array([
38+
[10, 10, 30, 10],
39+
[5, 5, 25, 25],
40+
])
41+
42+
sv.box_aspect_ratio(xyxy)
43+
# array([ nan, 1. ])
44+
```
45+
"""
46+
widths = xyxy[:, 2] - xyxy[:, 0]
47+
heights = xyxy[:, 3] - xyxy[:, 1]
48+
49+
aspect_ratios = np.full_like(widths, np.nan, dtype=np.float64)
50+
np.divide(
51+
widths,
52+
heights,
53+
out=aspect_ratios,
54+
where=heights != 0,
55+
)
56+
57+
return aspect_ratios
58+
59+
960
def clip_boxes(xyxy: np.ndarray, resolution_wh: tuple[int, int]) -> np.ndarray:
1061
"""
1162
Clips bounding boxes coordinates to fit within the frame resolution.

supervision/utils/video.py

Lines changed: 104 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from __future__ import annotations
22

3+
import threading
34
import time
45
from collections import deque
56
from collections.abc import Callable, Generator
67
from dataclasses import dataclass
8+
from queue import Queue
79

810
import cv2
911
import numpy as np
@@ -196,63 +198,126 @@ def process_video(
196198
source_path: str,
197199
target_path: str,
198200
callback: Callable[[np.ndarray, int], np.ndarray],
201+
*,
199202
max_frames: int | None = None,
203+
prefetch: int = 32,
204+
writer_buffer: int = 32,
200205
show_progress: bool = False,
201206
progress_message: str = "Processing video",
202207
) -> None:
203208
"""
204-
Process a video file by applying a callback function on each frame
205-
and saving the result to a target video file.
209+
Process video frames asynchronously using a threaded pipeline.
210+
211+
This function orchestrates a three-stage pipeline to optimize video processing
212+
throughput:
213+
214+
1. Reader thread: Continuously reads frames from the source video file and
215+
enqueues them into a bounded queue (`frame_read_queue`). The queue size is
216+
limited by the `prefetch` parameter to control memory usage.
217+
2. Main thread (Processor): Dequeues frames from `frame_read_queue`, applies the
218+
user-defined `callback` function to process each frame, then enqueues the
219+
processed frames into another bounded queue (`frame_write_queue`) for writing.
220+
The processing happens in the main thread, simplifying use of stateful objects
221+
without synchronization.
222+
3. Writer thread: Dequeues processed frames from `frame_write_queue` and writes
223+
them sequentially to the output video file.
206224
207225
Args:
208-
source_path (str): The path to the source video file.
209-
target_path (str): The path to the target video file.
210-
callback (Callable[[np.ndarray, int], np.ndarray]): A function that takes in
211-
a numpy ndarray representation of a video frame and an
212-
int index of the frame and returns a processed numpy ndarray
213-
representation of the frame.
214-
max_frames (Optional[int]): The maximum number of frames to process.
215-
show_progress (bool): Whether to show a progress bar.
216-
progress_message (str): The message to display in the progress bar.
226+
source_path (str): Path to the input video file.
227+
target_path (str): Path where the processed video will be saved.
228+
callback (Callable[[numpy.ndarray, int], numpy.ndarray]): Function called for
229+
each frame, accepting the frame as a numpy array and its zero-based index,
230+
returning the processed frame.
231+
max_frames (int | None): Optional maximum number of frames to process.
232+
If None, the entire video is processed (default).
233+
prefetch (int): Maximum number of frames buffered by the reader thread.
234+
Controls memory use; default is 32.
235+
writer_buffer (int): Maximum number of frames buffered before writing.
236+
Controls output buffer size; default is 32.
237+
show_progress (bool): Whether to display a tqdm progress bar during processing.
238+
Default is False.
239+
progress_message (str): Description shown in the progress bar.
217240
218-
Examples:
241+
Returns:
242+
None
243+
244+
Example:
219245
```python
246+
import cv2
220247
import supervision as sv
248+
from rfdetr import RFDETRMedium
221249
222-
def callback(scene: np.ndarray, index: int) -> np.ndarray:
223-
...
250+
model = RFDETRMedium()
251+
252+
def callback(frame, frame_index):
253+
return model.predict(frame)
224254
225255
process_video(
226-
source_path=<SOURCE_VIDEO_PATH>,
227-
target_path=<TARGET_VIDEO_PATH>,
228-
callback=callback
256+
source_path="source.mp4",
257+
target_path="target.mp4",
258+
callback=frame_callback,
229259
)
230260
```
231261
"""
232-
source_video_info = VideoInfo.from_video_path(video_path=source_path)
233-
video_frames_generator = get_video_frames_generator(
234-
source_path=source_path, end=max_frames
262+
video_info = VideoInfo.from_video_path(video_path=source_path)
263+
total_frames = (
264+
min(video_info.total_frames, max_frames)
265+
if max_frames is not None
266+
else video_info.total_frames
235267
)
236-
with VideoSink(target_path=target_path, video_info=source_video_info) as sink:
237-
total_frames = (
238-
min(source_video_info.total_frames, max_frames)
239-
if max_frames is not None
240-
else source_video_info.total_frames
268+
269+
frame_read_queue: Queue[tuple[int, np.ndarray] | None] = Queue(maxsize=prefetch)
270+
frame_write_queue: Queue[np.ndarray | None] = Queue(maxsize=writer_buffer)
271+
272+
def reader_thread() -> None:
273+
frame_generator = get_video_frames_generator(
274+
source_path=source_path,
275+
end=max_frames,
241276
)
242-
for index, frame in enumerate(
243-
tqdm(
244-
video_frames_generator,
245-
total=total_frames,
246-
disable=not show_progress,
247-
desc=progress_message,
248-
)
249-
):
250-
result_frame = callback(frame, index)
251-
sink.write_frame(frame=result_frame)
252-
else:
253-
for index, frame in enumerate(video_frames_generator):
254-
result_frame = callback(frame, index)
255-
sink.write_frame(frame=result_frame)
277+
for frame_index, frame in enumerate(frame_generator):
278+
frame_read_queue.put((frame_index, frame))
279+
frame_read_queue.put(None)
280+
281+
def writer_thread(video_sink: VideoSink) -> None:
282+
while True:
283+
frame = frame_write_queue.get()
284+
if frame is None:
285+
break
286+
video_sink.write_frame(frame=frame)
287+
288+
reader_worker = threading.Thread(target=reader_thread, daemon=True)
289+
with VideoSink(target_path=target_path, video_info=video_info) as video_sink:
290+
writer_worker = threading.Thread(
291+
target=writer_thread,
292+
args=(video_sink,),
293+
daemon=True,
294+
)
295+
296+
reader_worker.start()
297+
writer_worker.start()
298+
299+
progress_bar = tqdm(
300+
total=total_frames,
301+
disable=not show_progress,
302+
desc=progress_message,
303+
)
304+
305+
try:
306+
while True:
307+
read_item = frame_read_queue.get()
308+
if read_item is None:
309+
break
310+
311+
frame_index, frame = read_item
312+
processed_frame = callback(frame, frame_index)
313+
314+
frame_write_queue.put(processed_frame)
315+
progress_bar.update(1)
316+
finally:
317+
frame_write_queue.put(None)
318+
reader_worker.join()
319+
writer_worker.join()
320+
progress_bar.close()
256321

257322

258323
class FPSMonitor:

0 commit comments

Comments
 (0)