13 changes: 6 additions & 7 deletions src/arduino/app_bricks/audio_classification/__init__.py
@@ -66,7 +66,8 @@ def stop(self):
"""
super().stop()

def classify_from_file(self, audio_path: str, confidence: int = None) -> dict | None:
@staticmethod
def classify_from_file(audio_path: str, confidence: float = 0.8) -> dict | None:
"""Classify audio content from a WAV file.

Supported sample widths:
@@ -77,9 +78,8 @@ def classify_from_file(self, audio_path: str, confidence: int = None) -> dict |

Args:
audio_path (str): Path to the `.wav` audio file to classify.
confidence (int, optional): Confidence threshold (0–1). If None,
the default confidence level specified during initialization
will be applied.
confidence (float, optional): Minimum confidence threshold (0.0–1.0) required
for a detection to be considered valid. Defaults to 0.8 (80%).

Returns:
dict | None: A dictionary with keys:
@@ -121,9 +121,8 @@ def classify_from_file(self, audio_path: str, confidence: int = None) -> dict |
features = list(struct.unpack(fmt, frames))
else:
raise ValueError(f"Unsupported sample width: {samp_width} bytes. Cannot process this WAV file.")

classification = super().infer_from_features(features[: int(self.model_info.input_features_count)])
best_match = super().get_best_match(classification, confidence)
classification = AudioClassification.infer_from_features(features)
best_match = AudioDetector.get_best_match(classification, confidence)
if not best_match:
return None
keyword, confidence = best_match
@@ -6,7 +6,5 @@
# EXAMPLE_REQUIRES = "Requires an audio file with the glass breaking sound."
from arduino.app_bricks.audio_classification import AudioClassification

classifier = AudioClassification()

classification = classifier.classify_from_file("glass_breaking.wav")
classification = AudioClassification.classify_from_file("glass_breaking.wav")
print("Result:", classification)
2 changes: 1 addition & 1 deletion src/arduino/app_bricks/motion_detection/__init__.py
@@ -133,7 +133,7 @@ def _detection_loop(self):
return

try:
ret = super().infer_from_features(features[: int(self._model_info.input_features_count)].flatten().tolist())
ret = self.infer_from_features(features.flatten().tolist())
spotted_movement = self._movement_spotted(ret)
if spotted_movement is not None:
keyword, confidence, complete_detection = spotted_movement
@@ -133,7 +133,7 @@ def loop(self):
if features is None or len(features) == 0:
return

ret = super().infer_from_features(features[: int(self._model_info.input_features_count)].flatten().tolist())
ret = self.infer_from_features(features.flatten().tolist())
logger.debug(f"Inference result: {ret}")
spotted_anomaly = self._extract_anomaly_score(ret)
if spotted_anomaly is not None:
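
In both loops above, the slicing to the model's input_features_count has moved out of the caller and into infer_from_features itself (see the ei.py change further down), so the loops only flatten their feature window. A rough sketch of the resulting call pattern, with the detector instance and window shape as placeholders:

import numpy as np

def run_window(detector, window: np.ndarray):
    # infer_from_features now trims the list to model_info.input_features_count
    # internally, so the loop no longer needs to know the model's input size.
    return detector.infer_from_features(window.flatten().tolist())
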
17 changes: 12 additions & 5 deletions src/arduino/app_internal/core/audio.py
@@ -85,17 +85,24 @@ def stop(self):
self._mic.stop()
self._buffer.flush()

def get_best_match(self, item: dict, confidence: int = None) -> tuple[str, float] | None:
@staticmethod
def get_best_match(item: dict, confidence: float) -> tuple[str, float] | None:
"""Extract the best matched keyword from the classification results.

Args:
item (dict): The classification result from the inference.
confidence (int): The confidence threshold for classification. If None, uses the instance's confidence level.
confidence (float): The confidence threshold for classification.

Returns:
tuple[str, float] | None: The best matched keyword and its confidence, or None if no match is found.

Raises:
ValueError: If confidence level is not provided.
"""
classification = _extract_classification(item, confidence or self.confidence)
if confidence is None:
raise ValueError("Confidence level must be provided.")

classification = _extract_classification(item, confidence)
if not classification:
return None

@@ -141,8 +148,8 @@ def _inference_loop(self):

logger.debug(f"Processing sensor data with {len(features)} features.")
try:
ret = super().infer_from_features(features[: int(self.model_info.input_features_count)].tolist())
spotted_keyword = self.get_best_match(ret)
ret = self.infer_from_features(features.tolist())
spotted_keyword = self.get_best_match(ret, self.confidence)
if spotted_keyword:
keyword, confidence = spotted_keyword
keyword = keyword.lower()
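
get_best_match is now a staticmethod that requires an explicit threshold (passing None raises ValueError), so the inference loop hands over the instance's configured confidence instead of relying on a stored default. A minimal sketch of the same pattern, assuming a running detector with a confidence attribute:

def handle_window(detector, features):
    # Mirror _inference_loop: run inference, then keep only a sufficiently
    # confident keyword, or None when nothing matches.
    ret = detector.infer_from_features(features.tolist())
    spotted = detector.get_best_match(ret, detector.confidence)
    if spotted is None:
        return None
    keyword, score = spotted
    return keyword.lower(), score
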
70 changes: 46 additions & 24 deletions src/arduino/app_internal/core/ei.py
@@ -42,18 +42,13 @@ class EdgeImpulseRunnerFacade:
"""Facade for Edge Impulse Object Detection and Classification."""

def __init__(self):
"""Initialize the EdgeImpulseRunnerFacade with the API path."""
infra = load_brick_compose_file(self.__class__)
for k, v in infra["services"].items():
self.host = k
self.infra = v
break # Only one service is expected

self.host = resolve_address(self.host)
"""Initialize the EdgeImpulseRunnerFacade with the API path.

self.port = 1337 # Default EI HTTP port
self.url = f"http://{self.host}:{self.port}"
logger.warning(f"[{self.__class__.__name__}] Host: {self.host} - Ports: {self.port} - URL: {self.url}")
Raises:
RuntimeError: If the Edge Impulse runner address cannot be resolved.
"""
self.url = self._get_ei_url()
logger.info(f"[{self.__class__.__name__}] URL: {self.url}")

def infer_from_file(self, image_path: str) -> dict | None:
if not image_path or image_path == "":
@@ -124,47 +119,58 @@ def process(self, item):
logger.error(f"[{self.__class__}] Error processing file {item}: {e}")
return None

def infer_from_features(self, features: list) -> dict | None:
"""Infer from features using the Edge Impulse API.
@classmethod
def infer_from_features(cls, features: list) -> dict | None:
"""
Infer from features using the Edge Impulse API.

Args:
cls: The class method caller.
features (list): A list of features to send to the Edge Impulse API.

Returns:
dict | None: The response from the Edge Impulse API as a dictionary, or None if an error occurs.
"""
try:
response = requests.post(f"{self.url}/api/features", json={"features": features})
url = cls._get_ei_url()
model_info = cls.get_model_info(url)
features = features[: int(model_info.input_features_count)]

response = requests.post(f"{url}/api/features", json={"features": features})
if response.status_code == 200:
return response.json()
else:
logger.warning(f"[{self.__class__}] error: {response.status_code}. Message: {response.text}")
logger.warning(f"[{cls.__name__}] error: {response.status_code}. Message: {response.text}")
return None
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Error: {e}")
logger.error(f"[{cls.__name__}] Error: {e}")
return None

def get_model_info(self) -> EdgeImpulseModelInfo | None:
@classmethod
def get_model_info(cls, url: str = None) -> EdgeImpulseModelInfo | None:
"""Get model information from the Edge Impulse API.

Args:
cls: The class method caller.
url (str): The base URL of the Edge Impulse API. If None, it will be determined automatically.

Returns:
model_info (EdgeImpulseModelInfo | None): An instance of EdgeImpulseModelInfo containing model details, None if an error occurs.
"""
if not self.host or not self.port:
logger.error(f"[{self.__class__}] Host or port not set. Cannot fetch model info.")
return None
if not url:
url = cls._get_ei_url()

http_client = HttpClient(total_retries=6) # Initialize the HTTP client with retry logic
try:
response = http_client.request_with_retry(f"{self.url}/api/info")
response = http_client.request_with_retry(f"{url}/api/info")
if response.status_code == 200:
logger.debug(f"[{self.__class__.__name__}] Fetching model info from {self.url}/api/info -> {response.status_code} {response.json}")
logger.debug(f"[{cls.__name__}] Fetching model info from {url}/api/info -> {response.status_code} {response.json}")
return EdgeImpulseModelInfo(response.json())
else:
logger.warning(f"[{self.__class__}] Error fetching model info: {response.status_code}. Message: {response.text}")
logger.warning(f"[{cls}] Error fetching model info: {response.status_code}. Message: {response.text}")
return None
except Exception as e:
logger.error(f"[{self.__class__}] Error fetching model info: {e}")
logger.error(f"[{cls}] Error fetching model info: {e}")
return None
finally:
http_client.close() # Close the HTTP client session
@@ -237,3 +243,19 @@ def _extract_anomaly_score(self, item: dict):
return class_results["anomaly"]

return None

@classmethod
def _get_ei_url(cls):
infra = load_brick_compose_file(cls)
if not infra or "services" not in infra:
raise RuntimeError("Cannot load Brick Compose file to resolve Edge Impulse runner address.")
host = None
for k, v in infra["services"].items():
host = k
break
if not host:
raise RuntimeError("Cannot resolve Edge Impulse runner address from Brick Compose file.")
addr = resolve_address(host)
if not addr:
raise RuntimeError("Host address resolution failed for Edge Impulse runner.")
return f"http://{addr}:1337"
@@ -24,10 +24,10 @@ def app_instance(monkeypatch):
@pytest.fixture(autouse=True)
def mock_dependencies(monkeypatch: pytest.MonkeyPatch):
"""Mock out docker-compose lookups and image helpers."""
fake_compose = {"services": {"models-runner": {"ports": ["${BIND_ADDRESS:-127.0.0.1}:${BIND_PORT:-8100}:8100"]}}}
monkeypatch.setattr("arduino.app_internal.core.load_brick_compose_file", lambda cls: fake_compose)
fake_compose = {"services": {"ei-inference": {"ports": ["${BIND_ADDRESS:-127.0.0.1}:${BIND_PORT:-1337}:1337"]}}}
monkeypatch.setattr("arduino.app_internal.core.ei.load_brick_compose_file", lambda cls: fake_compose)
monkeypatch.setattr("arduino.app_internal.core.resolve_address", lambda host: "127.0.0.1")
monkeypatch.setattr("arduino.app_internal.core.parse_docker_compose_variable", lambda x: [(None, None), (None, "8200")])
monkeypatch.setattr("arduino.app_internal.core.parse_docker_compose_variable", lambda x: [(None, None), (None, "1337")])

class FakeResp:
status_code = 200
60 changes: 57 additions & 3 deletions tests/arduino/app_bricks/objectdetection/test_objectdetection.py
@@ -7,6 +7,7 @@
import io
from PIL import Image
from arduino.app_bricks.object_detection import ObjectDetection
from arduino.app_utils import HttpClient


class ModelInfo:
@@ -20,12 +21,65 @@ def mock_dependencies(monkeypatch: pytest.MonkeyPatch):

This is needed to avoid network calls and other side effects.
"""
fake_compose = {"services": {"models-runner": {"ports": ["${BIND_ADDRESS:-127.0.0.1}:${BIND_PORT:-8100}:8100"]}}}
monkeypatch.setattr("arduino.app_internal.core.load_brick_compose_file", lambda cls: fake_compose)
fake_compose = {"services": {"ei-inference": {"ports": ["${BIND_ADDRESS:-127.0.0.1}:${BIND_PORT:-1337}:1337"]}}}
monkeypatch.setattr("arduino.app_internal.core.ei.load_brick_compose_file", lambda cls: fake_compose)
monkeypatch.setattr("arduino.app_internal.core.resolve_address", lambda host: "127.0.0.1")
monkeypatch.setattr("arduino.app_internal.core.parse_docker_compose_variable", lambda x: [(None, None), (None, "8100")])
monkeypatch.setattr("arduino.app_internal.core.parse_docker_compose_variable", lambda x: [(None, None), (None, "1337")])
monkeypatch.setattr("arduino.app_bricks.object_detection.ObjectDetection.get_model_info", lambda self: ModelInfo("object-detection"))

class FakeResp:
status_code = 200

def json(self):
return {
"project": {
"deploy_version": 11,
"id": 774707,
"impulse_id": 1,
"impulse_name": "Time series data, Spectral Analysis, Classification (Keras), Anomaly Detection (K-means)",
"name": "Fan Monitoring - Advanced Anomaly Detection",
"owner": "Arduino",
},
"modelParameters": {
"has_visual_anomaly_detection": False,
"axis_count": 3,
"frequency": 100,
"has_anomaly": 1,
"has_object_tracking": False,
"has_performance_calibration": False,
"image_channel_count": 0,
"image_input_frames": 0,
"image_input_height": 0,
"image_input_width": 0,
"image_resize_mode": "none",
"inferencing_engine": 4,
"input_features_count": 600,
"interval_ms": 10,
"label_count": 2,
"labels": ["nominal", "off"],
"model_type": "classification",
"sensor": 2,
"slice_size": 50,
"thresholds": [],
"use_continuous_mode": False,
"sensorType": "accelerometer",
},
}

def fake_get(
self,
url: str,
method: str = "GET",
data: dict | str = None,
json: dict = None,
headers: dict = None,
timeout: int = 5,
):
return FakeResp()

# Mock HttpClient.request_with_retry to return a fake response
monkeypatch.setattr(HttpClient, "request_with_retry", fake_get)


@pytest.fixture
def detector():
@@ -24,10 +24,10 @@ def app_instance(monkeypatch):
@pytest.fixture(autouse=True)
def mock_dependencies(monkeypatch: pytest.MonkeyPatch):
"""Mock out docker-compose lookups and image helpers."""
fake_compose = {"services": {"models-runner": {"ports": ["${BIND_ADDRESS:-127.0.0.1}:${BIND_PORT:-8100}:8100"]}}}
monkeypatch.setattr("arduino.app_internal.core.load_brick_compose_file", lambda cls: fake_compose)
fake_compose = {"services": {"ei-inference": {"ports": ["${BIND_ADDRESS:-127.0.0.1}:${BIND_PORT:-1337}:1337"]}}}
monkeypatch.setattr("arduino.app_internal.core.ei.load_brick_compose_file", lambda cls: fake_compose)
monkeypatch.setattr("arduino.app_internal.core.resolve_address", lambda host: "127.0.0.1")
monkeypatch.setattr("arduino.app_internal.core.parse_docker_compose_variable", lambda x: [(None, None), (None, "8200")])
monkeypatch.setattr("arduino.app_internal.core.parse_docker_compose_variable", lambda x: [(None, None), (None, "1337")])

class FakeResp:
status_code = 200
58 changes: 58 additions & 0 deletions tests/arduino/app_core/test_edge_impulse.py
@@ -236,6 +236,64 @@ def fake_post(url: str, json: dict):
# Mock the requests.post method to return a fake response
monkeypatch.setattr("arduino.app_internal.core.ei.requests.post", fake_post)

class FakeResp2:
status_code = 200

def json(self):
return {
"project": {
"deploy_version": 163,
"id": 412593,
"impulse_id": 1,
"impulse_name": "Impulse #1",
"name": "Tutorial: Continuous motion recognition",
"owner": "Edge Impulse Inc.",
},
"modelParameters": {
"has_visual_anomaly_detection": False,
"axis_count": 3,
"frequency": 62.5,
"has_anomaly": 1,
"has_object_tracking": False,
"image_channel_count": 0,
"image_input_frames": 0,
"image_input_height": 0,
"image_input_width": 0,
"image_resize_mode": "none",
"inferencing_engine": 4,
"input_features_count": 375,
"interval_ms": 16,
"label_count": 4,
"labels": ["idle", "snake", "updown", "wave"],
"model_type": "classification",
"sensor": 2,
"slice_size": 31,
"thresholds": [],
"use_continuous_mode": False,
"sensorType": "accelerometer",
},
}

def fake_get(
self,
url: str,
method: str = "GET",
data: dict | str = None,
json: dict = None,
headers: dict = None,
timeout: int = 5,
):
return FakeResp2()

# Mock HttpClient.request_with_retry to return a fake response
monkeypatch.setattr(HttpClient, "request_with_retry", fake_get)

# Mock docker-compose related functions
fake_compose = {"services": {"ei-inference": {"ports": ["${BIND_ADDRESS:-127.0.0.1}:${BIND_PORT:-1337}:1337"]}}}
monkeypatch.setattr("arduino.app_internal.core.ei.load_brick_compose_file", lambda cls: fake_compose)
monkeypatch.setattr("arduino.app_internal.core.resolve_address", lambda h: "127.0.0.1")
monkeypatch.setattr("arduino.app_internal.core.parse_docker_compose_variable", lambda s: [(None, None), (None, "1337")])

result = facade.infer_from_features(features)
assert captured["url"].endswith("/api/features")
assert captured["json"] == {"features": features}