Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
depthai==3.2.1
depthai-nodes @ git+https://github.com/luxonis/depthai-nodes.git@f40211e5665473b5db48457640bed18fd1f2cc8d  # pinned commit that provides InstanceToSemanticMask
opencv-python-headless~=4.10.0
numpy>=1.22
tokenizers~=0.21.0
onnxruntime
# onnxruntime-gpu # if you want to use CUDAExecutionProvider
requests
python-box
pydantic
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Callable

from nn import NNState


class GetCurrentParamsService:
    """
    Read-only service that reports the neural network's current parameters
    to the frontend.

    The response dict uses the keys the frontend expects:
    ``class_names``, ``image_prompt_labels`` and ``confidence_threshold``.
    """

    def __init__(
        self,
        get_nn_state: Callable[[], "NNState"],
    ):
        # The provider is invoked on every request so each response reflects
        # the latest NN state rather than a cached snapshot.
        self._get_nn_state = get_nn_state

    def handle(self, _req=None) -> dict:
        """Return the current NN parameters; the request payload is ignored."""
        state = self._get_nn_state()
        return {
            "class_names": state.current_classes,
            "image_prompt_labels": state.image_prompt_labels,
            "confidence_threshold": state.confidence_threshold,
        }
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Public API of the nodes package: re-export the camera source node so
# callers can write ``from nodes import CameraSourceNode``.
from .camera_source_node import CameraSourceNode

__all__ = ["CameraSourceNode"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from pathlib import Path
from typing import Optional

import depthai as dai

from config.config_data_classes import VideoConfig


class CameraSourceNode(dai.node.ThreadedHostNode):
    """
    High-level node for the camera part of the pipeline.

    Internal block:
    Camera
    -> BGR output
    -> NV12 stream -> VideoEncoder -> H.264

    Handles both live camera and file replay sources, exposing:
    - bgr: BGR888i stream.
    - encoded: H.264 encoded stream.

    """

    def __init__(self) -> None:
        super().__init__()

        # The encoder subnode always exists; the frame source (live camera
        # vs. file replay) is only decided later, in build().
        self._encoder: dai.node.VideoEncoder = self.createSubnode(dai.node.VideoEncoder)

        # Exactly one of _camera / _replay is created by build().
        self._camera: Optional[dai.node.Camera] = None
        self._replay: Optional[dai.node.ReplayVideo] = None
        self._manip: Optional[dai.node.ImageManip] = None

        # Internal NV12 stream that feeds the encoder.
        self._nv12_out: Optional[dai.Node.Output] = None
        # Public outputs; they are None until build() has run.
        self.bgr: Optional[dai.Node.Output] = None
        self.encoded: Optional[dai.Node.Output] = None

    def build(self, cfg: VideoConfig) -> "CameraSourceNode":
        """
        Wire up the source and encoder according to *cfg*.

        @param cfg: Video configuration with resolution, fps, and optional media_path.
        @return: self, so the call can be chained at pipeline-construction time.
        """
        # A non-empty media_path selects file replay; otherwise a live camera.
        if cfg.media_path:
            self._setup_replay(cfg)
        else:
            self._setup_camera(cfg)

        self._setup_encoder(cfg)
        return self

    def _setup_camera(self, cfg: VideoConfig) -> None:
        """Configure live camera source."""
        self._camera = self.createSubnode(dai.node.Camera)
        self._camera.build()

        # Two parallel outputs at the same size/fps: BGR for host-side
        # processing, NV12 for the hardware video encoder.
        self.bgr = self._camera.requestOutput(
            size=(cfg.width, cfg.height),
            type=dai.ImgFrame.Type.BGR888i,
            fps=cfg.fps,
        )

        self._nv12_out = self._camera.requestOutput(
            size=(cfg.width, cfg.height),
            type=dai.ImgFrame.Type.NV12,
            fps=cfg.fps,
        )

    def _setup_replay(self, cfg: VideoConfig) -> None:
        """Configure file replay source."""
        self._replay = self.createSubnode(dai.node.ReplayVideo)
        self._replay.setReplayVideoFile(Path(cfg.media_path))
        self._replay.setOutFrameType(dai.ImgFrame.Type.NV12)
        self._replay.setLoop(True)  # loop the clip endlessly
        self._replay.setFps(cfg.fps)
        self._replay.setSize((cfg.width, cfg.height))

        # ImageManip to convert NV12 to BGR
        self._manip = self.createSubnode(dai.node.ImageManip)
        # 3 bytes per pixel for the BGR888i output frame.
        self._manip.setMaxOutputFrameSize(cfg.width * cfg.height * 3)
        self._manip.initialConfig.setOutputSize(cfg.width, cfg.height)
        self._manip.initialConfig.setFrameType(dai.ImgFrame.Type.BGR888i)
        self._replay.out.link(self._manip.inputImage)

        self.bgr = self._manip.out
        self._nv12_out = self._replay.out

    def _setup_encoder(self, cfg: VideoConfig) -> None:
        """Configure H.264 encoder for visualization."""
        self._encoder.setDefaultProfilePreset(
            fps=cfg.fps,
            profile=dai.VideoEncoderProperties.Profile.H264_MAIN,
        )
        # NOTE(review): assumes a _setup_* method already set _nv12_out —
        # build() guarantees this ordering.
        self._nv12_out.link(self._encoder.input)
        self.encoded = self._encoder.out

    def run(self) -> None:
        # High-level node: subnodes handle all processing.
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Public API of the config package: CLI parsing, system configuration
# assembly, and the configuration data classes.
from .arguments import parse_args
from .system_configuration import build_configuration
from .config_data_classes import (
    ModelInfo,
    VideoConfig,
    NeuralNetworkConfig,
)

__all__ = [
    "parse_args",
    "build_configuration",
    "ModelInfo",
    "VideoConfig",
    "NeuralNetworkConfig",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, Namespace


def parse_args(argv: list[str] | None = None) -> Namespace:
    """
    Define and parse CLI arguments for the application.

    @param argv: Argument list to parse. ``None`` (the default) parses
        ``sys.argv[1:]``, preserving the previous zero-argument behavior;
        passing an explicit list makes the parser reusable from tests and
        other entry points.
    @return: Parsed arguments namespace.
    """
    parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)

    parser.add_argument(
        "-fps",
        "--fps_limit",
        help="FPS limit for the pipeline runtime.",
        required=False,
        default=None,
        type=int,
    )

    parser.add_argument(
        "-media",
        "--media_path",
        help=(
            "Path to the media file to run the model on. "
            "If not set, the model runs on the live camera input."
        ),
        required=False,
        default=None,
        type=str,
    )

    parser.add_argument(
        "-m",
        "--model",
        help="Model to use: 'yoloe' or 'yolo-world'",
        choices=["yoloe", "yolo-world"],
        default="yoloe",
        type=str,
    )

    parser.add_argument(
        "--semantic_seg",
        help="Display output as semantic segmentation otherwise use instance segmentation (only applicable for YOLOE).",
        action="store_true",
    )

    return parser.parse_args(argv)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from box import Box
import depthai as dai


@dataclass
class ModelInfo:
    """Model metadata from DepthAI zoo."""

    name: str  # detector family key, e.g. "yoloe" or "yolo-world"
    precision: str  # e.g. "fp16" or "int8"
    yaml_path: Path  # per-platform model description YAML that was loaded
    width: int  # model input width in pixels
    height: int  # model input height in pixels
    description: dai.NNModelDescription  # zoo model description (platform set)
    archive: dai.NNArchive  # archive fetched from the model zoo


@dataclass
class VideoConfig:
    """Config for CameraSourceNode."""

    fps: int  # target pipeline frame rate
    # Path to a replay media file; None selects the live camera input
    # (build_configuration passes None when no --media_path is given).
    media_path: Optional[str]
    width: int  # output frame width in pixels
    height: int  # output frame height in pixels


@dataclass
class NeuralNetworkConfig:
    """Config for NNDetectionNode."""

    model: ModelInfo  # resolved model metadata (see ModelInfo)
    backend_type: str  # inference backend, e.g. "snpe" (yaml_configs/config.yaml)
    runtime: str  # backend runtime, e.g. "dsp"
    performance_profile: str  # backend performance profile name
    num_inference_threads: int  # number of inference threads
    confidence_thr: float  # detection confidence threshold
    prompts: Box  # prompt definitions loaded from the per-model prompts YAML
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from dataclasses import dataclass
from pathlib import Path
from argparse import Namespace
import logging

from box import Box
import depthai as dai

from .config_data_classes import ModelInfo, VideoConfig, NeuralNetworkConfig

log = logging.getLogger(__name__)

# Maps detector family -> supported precision -> model YAML basename;
# _load_model resolves the basename against the depthai_models directory.
DETECTOR_YAMLS = {
    "yolo-world": {"fp16": "yolo_world_l_fp16", "int8": "yolo_world_l"},
    "yoloe": {"fp16": "yoloe_v8_l_fp16"},
}


@dataclass
class SystemConfig:
    """Aggregated runtime configuration: video source plus neural network."""

    video: VideoConfig
    nn: NeuralNetworkConfig


def build_configuration(platform: str, args: Namespace) -> SystemConfig:
    """From CLI args if provided, if not use config.yaml defaults."""
    yamls_dir = Path(__file__).parent / "yaml_configs"
    root_cfg = _load_yaml_config(yamls_dir / "config.yaml")

    # CLI values win over YAML defaults.
    model_name = (getattr(args, "model", None) or root_cfg.model.name).lower()
    precision = root_cfg.model.precision

    # Resolve and load the per-model prompts file.
    prompts_file = root_cfg.model.prompts_files.get(model_name)
    if not prompts_file:
        raise ValueError(f"No prompts config registered for model '{model_name}'.")
    prompts_path = yamls_dir / prompts_file
    if not prompts_path.exists():
        raise FileNotFoundError(f"Prompts config not found: {prompts_path}")
    prompts = Box.from_yaml(filename=prompts_path)

    model_info = _load_model(platform, precision, model_name)

    # Video config
    video_cfg = VideoConfig(
        fps=getattr(args, "fps_limit", None) or root_cfg.video.fps,
        media_path=getattr(args, "media_path", None),
        width=int(root_cfg.video.width),
        height=int(root_cfg.video.height),
    )

    # NN config
    backend = root_cfg.nn.backend
    nn_cfg = NeuralNetworkConfig(
        model=model_info,
        backend_type=str(backend.type),
        runtime=str(backend.runtime),
        performance_profile=str(backend.performance_profile),
        num_inference_threads=int(backend.inference_threads),
        confidence_thr=float(root_cfg.nn.confidence_thr),
        prompts=prompts,
    )

    return SystemConfig(video=video_cfg, nn=nn_cfg)


def _load_yaml_config(path: Path) -> Box:
    """Parse *path* as YAML into a Box; raise if the file does not exist."""
    if path.exists():
        return Box.from_yaml(filename=path)
    raise FileNotFoundError(f"Missing root config: {path}")


def _load_model(platform: str, precision: str, model_name: str) -> ModelInfo:
    """
    Resolve the per-platform model YAML for (model_name, precision) and
    fetch the corresponding archive from the DepthAI model zoo.

    @param platform: Target device platform; selects the YAML variant and is
        stamped onto the model description.
    @param precision: Precision key, e.g. "fp16" or "int8".
    @param model_name: Detector family key; must be in DETECTOR_YAMLS.
    @raise ValueError: Unknown model name, or precision not supported by it.
    @raise FileNotFoundError: The per-platform model YAML is missing.
    """
    models_dir = Path(__file__).parent.parent / "depthai_models"

    if model_name not in DETECTOR_YAMLS:
        raise ValueError(
            f"Unknown model: {model_name}. Choose from: {list(DETECTOR_YAMLS)}"
        )

    yaml_base = DETECTOR_YAMLS[model_name].get(precision)
    if not yaml_base:
        supported = list(DETECTOR_YAMLS[model_name].keys())
        raise ValueError(
            f"Model '{model_name}' does not support precision '{precision}'. "
            f"Supported: {supported}."
        )

    # YAML files are suffixed per platform: "<base>.<platform>.yaml".
    yaml_path = models_dir / f"{yaml_base}.{platform}.yaml"
    if not yaml_path.exists():
        raise FileNotFoundError(f"Model YAML not found: {yaml_path}")

    desc = dai.NNModelDescription.fromYamlFile(str(yaml_path))
    desc.platform = platform
    # NOTE(review): getModelFromZoo may hit the network to download the
    # archive — confirm zoo caching makes this acceptable at startup.
    archive = dai.NNArchive(dai.getModelFromZoo(desc))
    w, h = archive.getInputSize()
    log.info(f"Loaded model '{model_name}' ({precision}) from {yaml_path.name}")

    return ModelInfo(
        name=model_name,
        precision=precision,
        yaml_path=yaml_path,
        width=w,
        height=h,
        description=desc,
        archive=archive,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
model:
name: "yoloe" # "yoloe" | "yolo-world"
precision: "fp16" # "fp16" | "int8"

prompts_files:
yoloe: "prompts_yoloe.yaml"
yolo-world: "prompts_yolo_world.yaml"

nn:
backend:
type: "snpe"
runtime: "dsp"
performance_profile: "default"
inference_threads: 1
confidence_thr: 0.1

video:
width: 1280
height: 960
fps: 10
Loading