diff --git a/examples/asynchronous_api/run.py b/examples/asynchronous_api/run.py
index 8385b2a9..edd21be3 100644
--- a/examples/asynchronous_api/run.py
+++ b/examples/asynchronous_api/run.py
@@ -7,16 +7,19 @@
 import sys
 
 import cv2
+
 from model_api.models import DetectionModel
 
 
 def main():
     if len(sys.argv) != 2:
-        raise RuntimeError(f"Usage: {sys.argv[0]} <path_to_image>")
+        usage_message = f"Usage: {sys.argv[0]} <path_to_image>"
+        raise RuntimeError(usage_message)
 
     image = cv2.cvtColor(cv2.imread(sys.argv[1]), cv2.COLOR_BGR2RGB)
     if image is None:
-        raise RuntimeError("Failed to read the image")
+        error_message = f"Failed to read the image: {sys.argv[1]}"
+        raise RuntimeError(error_message)
 
     # Create Object Detection model using model name and download from Open Model Zoo
     # Replace numpy preprocessing and embed it directly into a model graph to speed up inference
diff --git a/examples/serving_api/run.py b/examples/serving_api/run.py
index 99299164..ce138981 100755
--- a/examples/serving_api/run.py
+++ b/examples/serving_api/run.py
@@ -7,20 +7,24 @@
 import sys
 
 import cv2
+
 from model_api.models import DetectionModel
 
 
 def main():
     if len(sys.argv) != 2:
-        raise RuntimeError(f"Usage: {sys.argv[0]} <path_to_image>")
+        usage_message = f"Usage: {sys.argv[0]} <path_to_image>"
+        raise RuntimeError(usage_message)
 
     image = cv2.cvtColor(cv2.imread(sys.argv[1]), cv2.COLOR_BGR2RGB)
     if image is None:
-        raise RuntimeError("Failed to read the image")
+        error_message = f"Failed to read the image: {sys.argv[1]}"
+        raise RuntimeError(error_message)
 
     # Create Object Detection model specifying the OVMS server URL
     model = DetectionModel.create_model(
-        "localhost:8000/v2/models/ssd_mobilenet_v1_fpn_coco", model_type="ssd"
+        "localhost:8000/v2/models/ssd_mobilenet_v1_fpn_coco",
+        model_type="ssd",
     )
     detections = model(image)
     print(f"Detection results: {detections}")
diff --git a/examples/synchronous_api/run.py b/examples/synchronous_api/run.py
index 3f3d6806..0fd80b88 100755
--- a/examples/synchronous_api/run.py
+++ b/examples/synchronous_api/run.py
@@ -7,17 +7,20 @@
 import sys
 
 import cv2
-from model_api.models import ClassificationModel, DetectionModel, SegmentationModel
 from PIL import Image
 
+from model_api.models import ClassificationModel, DetectionModel, SegmentationModel
+
 
 def main():
     if len(sys.argv) != 2:
-        raise RuntimeError(f"Usage: {sys.argv[0]} <path_to_image>")
+        usage_message = f"Usage: {sys.argv[0]} <path_to_image>"
+        raise RuntimeError(usage_message)
 
     image = cv2.cvtColor(cv2.imread(sys.argv[1]), cv2.COLOR_BGR2RGB)
     if image is None:
-        raise RuntimeError("Failed to read the image")
+        error_message = f"Failed to read the image: {sys.argv[1]}"
+        raise RuntimeError(error_message)
 
     # Create Image Classification model using model name and download from Open Model Zoo
     efficientnet_b0 = ClassificationModel.create_model("efficientnet-b0-pytorch")
@@ -37,7 +40,7 @@ def main():
 
     # Instantiate from a local model (downloaded previously)
     ssd_mobilenet_fpn_local = DetectionModel.create_model(
-        "tmp/public/ssd_mobilenet_v1_fpn_coco/FP16/ssd_mobilenet_v1_fpn_coco.xml"
+        "tmp/public/ssd_mobilenet_v1_fpn_coco/FP16/ssd_mobilenet_v1_fpn_coco.xml",
    )
     detections = ssd_mobilenet_fpn_local(image)
     print(f"Detection results for local: {detections}")
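
Note on the pattern the three examples above share: `cv2.imread` returns `None` on failure instead of raising, so the `image is None` check only runs after `cv2.cvtColor` has already been handed a `None` (which fails first with its own `cv2.error`). A minimal sketch of the stricter ordering — the `load_rgb` helper is illustrative, not part of this change:

```python
import sys

import cv2


def load_rgb(path: str):
    """Read an image and convert BGR -> RGB, failing with a clear error."""
    bgr = cv2.imread(path)
    # cv2.imread returns None rather than raising, so check before cvtColor;
    # cv2.cvtColor(None, ...) would fail first with an unrelated cv2.error.
    if bgr is None:
        error_message = f"Failed to read the image: {path}"
        raise RuntimeError(error_message)
    return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)


if __name__ == "__main__":
    image = load_rgb(sys.argv[1])
```
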
diff --git a/examples/visual_prompting/run.py b/examples/visual_prompting/run.py
index 793e1504..af0ee01f 100644
--- a/examples/visual_prompting/run.py
+++ b/examples/visual_prompting/run.py
@@ -6,15 +6,17 @@
 import argparse
 import colorsys
+from itertools import starmap
 
 import cv2
 import numpy as np
+
 from model_api.models import Model, Prompt, SAMVisualPrompter
 
 
 def get_colors(n: int):
     HSV_tuples = [(x / n, 0.5, 0.5) for x in range(n)]
-    RGB_tuples = map(lambda x: colorsys.hsv_to_rgb(*x), HSV_tuples)
+    RGB_tuples = starmap(colorsys.hsv_to_rgb, HSV_tuples)
     return (np.array(list(RGB_tuples)) * 255).astype(np.uint8)
 
 
@@ -28,7 +30,8 @@ def main():
 
     image = cv2.cvtColor(cv2.imread(args.image), cv2.COLOR_BGR2RGB)
     if image is None:
-        raise RuntimeError("Failed to read the image")
+        error_message = f"Failed to read the image: {args.image}"
+        raise RuntimeError(error_message)
 
     encoder = Model.create_model(args.encoder_path)
     decoder = Model.create_model(args.decoder_path)
diff --git a/examples/zsl_visual_prompting/run.py b/examples/zsl_visual_prompting/run.py
index 439c1765..db7319e8 100644
--- a/examples/zsl_visual_prompting/run.py
+++ b/examples/zsl_visual_prompting/run.py
@@ -6,15 +6,17 @@
 import argparse
 import colorsys
+from itertools import starmap
 
 import cv2
 import numpy as np
+
 from model_api.models import Model, Prompt, SAMLearnableVisualPrompter
 
 
 def get_colors(n: int):
     HSV_tuples = [(x / n, 0.5, 0.5) for x in range(n)]
-    RGB_tuples = map(lambda x: colorsys.hsv_to_rgb(*x), HSV_tuples)
+    RGB_tuples = starmap(colorsys.hsv_to_rgb, HSV_tuples)
     return (np.array(list(RGB_tuples)) * 255).astype(np.uint8)
 
 
@@ -30,16 +32,20 @@ def main():
 
     image = cv2.cvtColor(cv2.imread(args.image_source), cv2.COLOR_BGR2RGB)
     if image is None:
-        raise RuntimeError("Failed to read the source image")
+        error_message = f"Failed to read the source image: {args.image_source}"
+        raise RuntimeError(error_message)
     image_target = cv2.cvtColor(cv2.imread(args.image_target), cv2.COLOR_BGR2RGB)
     if image_target is None:
-        raise RuntimeError("Failed to read the target image")
+        error_message = f"Failed to read the target image: {args.image_target}"
+        raise RuntimeError(error_message)
 
     encoder = Model.create_model(args.encoder_path)
     decoder = Model.create_model(args.decoder_path)
     zsl_sam_prompter = SAMLearnableVisualPrompter(
-        encoder, decoder, threshold=args.threshold
+        encoder,
+        decoder,
+        threshold=args.threshold,
    )
 
     all_prompts = []
@@ -61,7 +67,11 @@ def main():
         image_target = cv2.addWeighted(image_target, 0.2, masked_img, 0.8, 0)
         print(f"Reference point: {prompt_point}, point score: {confidence:.3f}")
         cv2.circle(
-            image_target, prompt_point, radius=0, color=(0, 0, 255), thickness=5
+            image_target,
+            prompt_point,
+            radius=0,
+            color=(0, 0, 255),
+            thickness=5,
         )
 
     cv2.imwrite("zsl_sam_result.jpg", cv2.cvtColor(image_target, cv2.COLOR_RGB2BGR))
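
The `map`-with-`lambda` to `starmap` rewrite in both `get_colors` helpers is behavior-preserving: `starmap(f, it)` calls `f(*item)` for each item, which is exactly what the old lambda did by hand. A self-contained check:

```python
import colorsys
from itertools import starmap

hsv_tuples = [(x / 3, 0.5, 0.5) for x in range(3)]
# starmap unpacks each (h, s, v) tuple into hsv_to_rgb, replacing the
# explicit lambda x: colorsys.hsv_to_rgb(*x).
assert list(starmap(colorsys.hsv_to_rgb, hsv_tuples)) == [
    colorsys.hsv_to_rgb(*t) for t in hsv_tuples
]
```
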
diff --git a/pyproject.toml b/pyproject.toml
index 6ae3e443..f429177a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -217,9 +217,7 @@ exclude = [
     "buck-out",
     "build",
     "dist",
-    "examples",
     "node_modules",
-    "tests",
     "venv",
 ]
diff --git a/tests/accuracy/conftest.py b/tests/accuracy/conftest.py
index 1afca4bc..e694972e 100644
--- a/tests/accuracy/conftest.py
+++ b/tests/accuracy/conftest.py
@@ -3,6 +3,7 @@
 # SPDX-License-Identifier: Apache-2.0
 #
 import json
+from pathlib import Path
 
 import pytest
 
@@ -28,5 +29,5 @@ def pytest_runtest_makereport(item, call):
     if result.when == "call":
         test_results = item.config.test_results
 
-        with open("test_scope.json", "w") as outfile:
+        with Path("test_scope.json").open("w") as outfile:
             json.dump(test_results, outfile, indent=4)
diff --git a/tests/accuracy/prepare_data.py b/tests/accuracy/prepare_data.py
index 275361ca..6b4da535 100644
--- a/tests/accuracy/prepare_data.py
+++ b/tests/accuracy/prepare_data.py
@@ -18,15 +18,15 @@ async def download_images(data_dir):
         with ZipFile(BytesIO(archive.content)) as zfile:
             zfile.extractall(data_dir)
     image = await client.get(
-        "https://raw.githubusercontent.com/Shenggan/BCCD_Dataset/master/BCCD/JPEGImages/BloodImage_00007.jpg"
+        "https://raw.githubusercontent.com/Shenggan/BCCD_Dataset/master/BCCD/JPEGImages/BloodImage_00007.jpg",
     )
-    with open(data_dir / "BloodImage_00007.jpg", "wb") as im:
+    with Path(data_dir / "BloodImage_00007.jpg").open("wb") as im:
         im.write(image.content)
 
 
 async def stream_file(client, url, filename):
     async with client.stream("GET", url) as stream:
-        with open(filename, "wb") as file:
+        with Path(filename).open("wb") as file:
             async for data in stream.aiter_bytes():
                 file.write(data)
 
@@ -74,11 +74,17 @@ async def main():
             download_otx_model(client, otx_models_dir, "mlc_efficient_v2s_voc"),
             download_otx_model(client, otx_models_dir, "det_mobilenetv2_atss_bccd"),
             download_otx_model(
-                client, otx_models_dir, "det_mobilenetv2_atss_bccd_onnx", "onnx"
+                client,
+                otx_models_dir,
+                "det_mobilenetv2_atss_bccd_onnx",
+                "onnx",
             ),
             download_otx_model(client, otx_models_dir, "cls_mobilenetv3_large_cars"),
             download_otx_model(
-                client, otx_models_dir, "cls_mobilenetv3_large_cars", "onnx"
+                client,
+                otx_models_dir,
+                "cls_mobilenetv3_large_cars",
+                "onnx",
             ),
             download_otx_model(client, otx_models_dir, "cls_efficient_b0_cars"),
             download_otx_model(client, otx_models_dir, "cls_efficient_v2s_cars"),
@@ -88,7 +94,9 @@ async def main():
             download_otx_model(client, otx_models_dir, "Lite-hrnet-s_mod2", "onnx"),
             download_otx_model(client, otx_models_dir, "Lite-hrnet-x-mod3"),
             download_otx_model(
-                client, otx_models_dir, "is_efficientnetb2b_maskrcnn_coco_reduced"
+                client,
+                otx_models_dir,
+                "is_efficientnetb2b_maskrcnn_coco_reduced",
             ),
             download_otx_model(
                 client,
@@ -97,15 +105,21 @@
                 "onnx",
             ),
             download_otx_model(
-                client, otx_models_dir, "is_resnet50_maskrcnn_coco_reduced"
+                client,
+                otx_models_dir,
+                "is_resnet50_maskrcnn_coco_reduced",
             ),
             download_otx_model(client, otx_models_dir, "mobilenet_v3_large_hc_cf"),
             download_otx_model(
-                client, otx_models_dir, "classification_model_with_xai_head"
+                client,
+                otx_models_dir,
+                "classification_model_with_xai_head",
             ),
             download_otx_model(client, otx_models_dir, "detection_model_with_xai_head"),
             download_otx_model(
-                client, otx_models_dir, "segmentation_model_with_xai_head"
+                client,
+                otx_models_dir,
+                "segmentation_model_with_xai_head",
             ),
             download_otx_model(client, otx_models_dir, "maskrcnn_model_with_xai_head"),
             download_otx_model(client, otx_models_dir, "maskrcnn_xai_tiling"),
@@ -114,7 +128,9 @@ async def main():
             download_otx_model(client, otx_models_dir, "anomaly_stfpm_bottle_mvtec"),
             download_otx_model(client, otx_models_dir, "deit-tiny"),
             download_otx_model(
-                client, otx_models_dir, "cls_efficient_b0_shuffled_outputs"
+                client,
+                otx_models_dir,
+                "cls_efficient_b0_shuffled_outputs",
             ),
             download_otx_model(client, otx_models_dir, "action_cls_xd3_kinetic"),
             download_otx_model(client, otx_models_dir, "sam_vit_b_zsl_encoder"),
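
The `conftest.py` and `prepare_data.py` hunks above move file handling from the `open()` builtin and `os.path` onto `pathlib` — presumably to satisfy the pathlib lint rules that start applying once `tests` leaves the `exclude` list in `pyproject.toml` (an inference from the `# noqa` codes used elsewhere in this diff). The two I/O forms are equivalent:

```python
import json
from pathlib import Path

results = {"passed": 3}
out = Path("test_scope.json")
# Path.open delegates to the same io machinery as the builtin open(),
# so modes, encoding, and context-manager behavior are identical.
with out.open("w") as outfile:
    json.dump(results, outfile, indent=4)
assert json.loads(out.read_text()) == results
```
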
diff --git a/tests/accuracy/test_YOLOv8.py b/tests/accuracy/test_YOLOv8.py
index 2b1ae77e..70770e15 100644
--- a/tests/accuracy/test_YOLOv8.py
+++ b/tests/accuracy/test_YOLOv8.py
@@ -13,10 +13,11 @@
 import pytest
 import torch
 import ultralytics
-from model_api.models import YOLOv5
 from ultralytics.data import utils
 from ultralytics.models import yolo
 
+from model_api.models import YOLOv5
+
 
 def _init_predictor(yolo):
     yolo.predict(np.empty([1, 1, 3], np.uint8))
@@ -26,15 +27,17 @@ def _cached_alignment(pt):
     export_dir = Path(
         ultralytics.YOLO(
-            Path(os.environ["DATA"]) / "ultralytics" / pt, "detect"
-        ).export(format="openvino", half=True)
+            Path(os.environ["DATA"]) / "ultralytics" / pt,
+            "detect",
+        ).export(format="openvino", half=True),
     )
     impl_wrapper = YOLOv5.create_model(export_dir / (pt.stem + ".xml"), device="CPU")
     ref_wrapper = ultralytics.YOLO(export_dir, "detect")
     ref_wrapper.overrides["imgsz"] = (impl_wrapper.w, impl_wrapper.h)
     _init_predictor(ref_wrapper)
     ref_wrapper.predictor.model.ov_compiled_model = ov.Core().compile_model(
-        ref_wrapper.predictor.model.ov_model, "CPU"
+        ref_wrapper.predictor.model.ov_model,
+        "CPU",
     )
     ref_dir = export_dir / "ref"
     ref_dir.mkdir(exist_ok=True)
@@ -60,9 +63,8 @@ def _impaths():
         }
     )
     if not impaths:
-        raise RuntimeError(
-            f"{Path(os.environ['DATA']) / 'coco128/images/train2017/'} is empty"
-        )
+        error_message = f"{Path(os.environ['DATA']) / 'coco128/images/train2017/'} is empty"
+        raise RuntimeError(error_message)
     return impaths
 
 
@@ -77,7 +79,7 @@ def test_alignment(impath, pt):
     pred_scores = impl_preds.scores.astype(np.float32)
     pred_labels = impl_preds.labels
     ref_predictions = ref_wrapper.predict(im)
-    assert 1 == len(ref_predictions)
+    assert len(ref_predictions) == 1
     ref_boxes = ref_predictions[0].boxes.data.numpy()
     if 0 == len(pred_boxes) == len(ref_boxes):
         return  # np.isclose() doesn't work for empty arrays
@@ -86,7 +88,7 @@ def test_alignment(impath, pt):
     assert np.isclose(pred_boxes, ref_boxes[:, :4], 0, 1).all()
     assert np.isclose(pred_scores, ref_boxes[:, 4], 0.0, 0.02).all()
     assert (pred_labels == ref_boxes[:, 5]).all()
-    with open(ref_dir / impath.with_suffix(".txt").name, "w") as file:
+    with Path.open(ref_dir / impath.with_suffix(".txt").name, "w") as file:
         print(impl_preds, end="", file=file)
 
 
@@ -108,20 +110,22 @@ def evaluate(self, wrapper):
         )
         self.init_metrics(
             types.SimpleNamespace(
-                names={idx: label for idx, label in enumerate(wrapper.labels)}
-            )
+                names=dict(enumerate(wrapper.labels)),
+            ),
         )
         for batch in dataloader:
             im = cv2.imread(batch["im_file"][0])
             result = wrapper(im)
             bboxes = torch.from_numpy(result.bboxes) / torch.tile(
-                torch.tensor([im.shape[1], im.shape[0]], dtype=torch.float32), (1, 2)
+                torch.tensor([im.shape[1], im.shape[0]], dtype=torch.float32),
+                (1, 2),
             )
             scores = torch.from_numpy(result.scores)
             labels = torch.from_numpy(result.labels)
             pred = torch.cat(
-                [bboxes, scores[:, None], labels[:, None].float()], dim=1
+                [bboxes, scores[:, None], labels[:, None].float()],
+                dim=1,
             ).unsqueeze(0)
             if not pred.numel():
                 pred = pred.view(1, 0, 6)
@@ -130,7 +134,7 @@
 
 
 @pytest.mark.parametrize(
-    "pt,ref_mAP50_95",
+    ("pt", "ref_mAP50_95"),
     [
         (
             Path("yolov8n.pt"),
@@ -146,13 +150,12 @@ def test_metric(pt, ref_mAP50_95):
     mAP50_95 = Metrics().evaluate(
         YOLOv5.create_model(
             ultralytics.YOLO(
-                Path(os.environ["DATA"]) / "ultralytics" / pt, "detect"
+                Path(os.environ["DATA"]) / "ultralytics" / pt,
+                "detect",
             ).export(format="openvino", half=True)
             / pt.with_suffix(".xml"),
             device="CPU",
             configuration={"confidence_threshold": 0.001},
-        )
+        ),
     )["metrics/mAP50-95(B)"]
-    assert (
-        abs(mAP50_95 - ref_mAP50_95) <= 0.01 * ref_mAP50_95 or mAP50_95 >= ref_mAP50_95
-    )
+    assert abs(mAP50_95 - ref_mAP50_95) <= 0.01 * ref_mAP50_95 or mAP50_95 >= ref_mAP50_95
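
In `test_YOLOv8.py` the `parametrize` names change from one comma-separated string to a tuple of strings; pytest accepts both spellings, the tuple form just keeps each name a separate literal (the style the pytest lint rule PT006 prefers). Minimal illustration:

```python
import pytest


# Equivalent to @pytest.mark.parametrize("value,expected", ...).
@pytest.mark.parametrize(("value", "expected"), [(2, 4), (3, 9)])
def test_square(value, expected):
    assert value**2 == expected
```
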
diff --git a/tests/accuracy/test_accuracy.py b/tests/accuracy/test_accuracy.py
index 98af9f72..39b7e37e 100644
--- a/tests/accuracy/test_accuracy.py
+++ b/tests/accuracy/test_accuracy.py
@@ -2,6 +2,8 @@
 # Copyright (C) 2020-2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 #
+import ast
+import contextlib
 import json
 import os
 from pathlib import Path
@@ -49,34 +51,52 @@
     SemanticSegmentationTiler,
 )
 
+# Mapping of model type strings to actual classes for security
+MODEL_TYPE_MAPPING = {
+    "ActionClassificationModel": ActionClassificationModel,
+    "AnomalyDetection": AnomalyDetection,
+    "ClassificationModel": ClassificationModel,
+    "DetectionModel": DetectionModel,
+    "ImageModel": ImageModel,
+    "KeypointDetectionModel": KeypointDetectionModel,
+    "MaskRCNNModel": MaskRCNNModel,
+    "SAMDecoder": SAMDecoder,
+    "SAMImageEncoder": SAMImageEncoder,
+    "SAMLearnableVisualPrompter": SAMLearnableVisualPrompter,
+    "SAMVisualPrompter": SAMVisualPrompter,
+    "SegmentationModel": SegmentationModel,
+    # Tiler classes
+    "DetectionTiler": DetectionTiler,
+    "InstanceSegmentationTiler": InstanceSegmentationTiler,
+    "SemanticSegmentationTiler": SemanticSegmentationTiler,
+}
 
-def read_config(path: Path):
-    with open(path, "r") as f:
+
+def read_config(fname):
+    with fname.open("r") as f:
         return json.load(f)
 
 
 def create_models(model_type, model_path, download_dir, force_onnx_adapter=False):
     if model_path.endswith(".onnx") and force_onnx_adapter:
         wrapper_type = model_type.get_model_class(
-            load_parameters_from_onnx(onnx.load(model_path))["model_info"]["model_type"]
+            load_parameters_from_onnx(onnx.load(model_path))["model_info"]["model_type"],
         )
         model = wrapper_type(
             ONNXRuntimeAdapter(
-                model_path, ort_options={"providers": ["CPUExecutionProvider"]}
-            )
+                model_path,
+                ort_options={"providers": ["CPUExecutionProvider"]},
+            ),
         )
         model.load()
         return [model]
 
     models = [
-        model_type.create_model(model_path, device="CPU", download_dir=download_dir)
+        model_type.create_model(model_path, device="CPU", download_dir=download_dir),
     ]
     if model_path.endswith(".xml"):
         wrapper_type = model_type.get_model_class(
-            create_core()
-            .read_model(model_path)
-            .get_rt_info(["model_info", "model_type"])
-            .astype(str)
+            create_core().read_model(model_path).get_rt_info(["model_info", "model_type"]).astype(str),
        )
         model = wrapper_type(OpenvinoAdapter(create_core(), model_path, device="CPU"))
         model.load()
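
`MODEL_TYPE_MAPPING` exists so the type strings coming out of `public_scope.json` are resolved through an explicit allow-list instead of `eval()`: unknown names raise `KeyError`, and nothing in the config file can execute arbitrary expressions. The pattern in isolation, with stub classes standing in for the real wrappers:

```python
class DetectionModel: ...


class SegmentationModel: ...


MODEL_TYPE_MAPPING = {
    "DetectionModel": DetectionModel,
    "SegmentationModel": SegmentationModel,
}

# eval("DetectionModel") would resolve this name too -- but it would also
# happily run eval('__import__("os").system(...)'). The dict lookup cannot.
model_cls = MODEL_TYPE_MAPPING["DetectionModel"]
assert model_cls is DetectionModel
```
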
@@ -100,41 +120,53 @@ def result(pytestconfig):
 
 
 @pytest.mark.parametrize(
-    ("model_data"), read_config(Path(__file__).resolve().parent / "public_scope.json")
+    ("model_data"),
+    read_config(Path(__file__).resolve().parent / "public_scope.json"),
 )
-def test_image_models(data, dump, result, model_data):
+def test_image_models(data, dump, result, model_data):  # noqa: C901
     name = model_data["name"]
-    if name.endswith(".xml") or name.endswith(".onnx"):
+    if name.endswith((".xml", ".onnx")):
         name = f"{data}/{name}"
     for model in create_models(
-        eval(model_data["type"]), name, data, model_data.get("force_ort", False)
+        MODEL_TYPE_MAPPING[model_data["type"]],
+        name,
+        data,
+        model_data.get("force_ort", False),
     ):
         if "tiler" in model_data:
             if "extra_model" in model_data:
                 extra_adapter = OpenvinoAdapter(
-                    create_core(), f"{data}/{model_data['extra_model']}", device="CPU"
+                    create_core(),
+                    f"{data}/{model_data['extra_model']}",
+                    device="CPU",
                 )
-                extra_model = eval(model_data["extra_type"])(
-                    extra_adapter, configuration={}, preload=True
+                extra_model = MODEL_TYPE_MAPPING[model_data["extra_type"]](
+                    extra_adapter,
+                    configuration={},
+                    preload=True,
                 )
-                model = eval(model_data["tiler"])(
+                model = MODEL_TYPE_MAPPING[model_data["tiler"]](
                     model,
                     configuration={},
                     tile_classifier_model=extra_model,
                 )
             else:
-                model = eval(model_data["tiler"])(model, configuration={})
+                model = MODEL_TYPE_MAPPING[model_data["tiler"]](model, configuration={})
         elif "prompter" in model_data:
             encoder_adapter = OpenvinoAdapter(
-                create_core(), f"{data}/{model_data['encoder']}", device="CPU"
+                create_core(),
+                f"{data}/{model_data['encoder']}",
+                device="CPU",
             )
-            encoder_model = eval(model_data["encoder_type"])(
-                encoder_adapter, configuration={}, preload=True
+            encoder_model = MODEL_TYPE_MAPPING[model_data["encoder_type"]](
+                encoder_adapter,
+                configuration={},
+                preload=True,
             )
-            model = eval(model_data["prompter"])(encoder_model, model)
+            model = MODEL_TYPE_MAPPING[model_data["prompter"]](encoder_model, model)
 
         if dump:
             result.append(model_data)
@@ -144,9 +176,10 @@ def test_image_models(data, dump, result, model_data):
         image_path = Path(data) / test_data["image"]
         image = cv2.imread(str(image_path))
         if image is None:
-            raise RuntimeError("Failed to read the image")
+            error_message = f"Failed to read the image at {image_path}"
+            raise RuntimeError(error_message)
         if "input_res" in model_data:
-            image = cv2.resize(image, eval(model_data["input_res"]))
+            image = cv2.resize(image, ast.literal_eval(model_data["input_res"]))
         if isinstance(model, ActionClassificationModel):
             image = np.stack([image for _ in range(8)])
         if "prompter" in model_data:
@@ -157,7 +190,7 @@
                     Prompt(
                         np.array([image.shape[0] / 2, image.shape[1] / 2]),
                         0,
-                    )
+                    ),
                 ],
                 polygons=[
                     Prompt(
@@ -166,10 +199,10 @@
                         np.array(
                             [
                                 [image.shape[0] / 4, image.shape[1] / 4],
                                 [image.shape[0] / 4, image.shape[1] / 2],
                                 [image.shape[0] / 2, image.shape[1] / 2],
-                            ]
+                            ],
                         ),
                         1,
-                    )
+                    ),
                 ],
             )
             outputs = model(image)
@@ -180,23 +213,18 @@
                     Prompt(
                         np.array([image.shape[0] / 2, image.shape[1] / 2]),
                         0,
-                    )
+                    ),
                 ],
             )
         else:
             outputs = model(image)
-        if isinstance(outputs, ClassificationResult):
-            assert 1 == len(test_data["reference"])
-            output_str = str(outputs)
-            assert test_data["reference"][0] == output_str
-            image_result = [output_str]
-        elif type(outputs) is DetectionResult:
-            assert 1 == len(test_data["reference"])
+        if isinstance(outputs, ClassificationResult) or type(outputs) is DetectionResult:
+            assert len(test_data["reference"]) == 1
             output_str = str(outputs)
             assert test_data["reference"][0] == output_str
             image_result = [output_str]
         elif isinstance(outputs, ImageResultWithSoftPrediction):
-            assert 1 == len(test_data["reference"])
+            assert len(test_data["reference"]) == 1
             if hasattr(model, "get_contours"):
                 contours = model.get_contours(outputs)
             else:
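
`ast.literal_eval` in the hunks above gives the same safety property for the `input_res` value: it parses Python literals such as `"(416, 416)"` but raises `ValueError` for anything containing names or calls, unlike `eval`:

```python
import ast

assert ast.literal_eval("(416, 416)") == (416, 416)
try:
    ast.literal_eval("__import__('os').system('id')")
except ValueError:
    pass  # rejected: a call expression is not a literal
```
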
@@ -208,42 +236,30 @@
             assert test_data["reference"][0] == output_str
             image_result = [output_str]
         elif type(outputs) is InstanceSegmentationResult:
-            assert 1 == len(test_data["reference"])
+            assert len(test_data["reference"]) == 1
             output_str = str(add_rotated_rects(outputs)) + "; "
-            try:
+            with contextlib.suppress(RuntimeError):
                 # getContours() assumes each instance generates only one contour.
                 # That doesn't hold for some models
-                output_str += (
-                    "; ".join(str(contour) for contour in get_contours(outputs))
-                    + "; "
-                )
-            except RuntimeError:
-                pass
+                output_str += "; ".join(str(contour) for contour in get_contours(outputs)) + "; "
             assert test_data["reference"][0] == output_str
             image_result = [output_str]
         elif isinstance(outputs, AnomalyResult):
-            assert 1 == len(test_data["reference"])
-            output_str = str(outputs)
-            assert test_data["reference"][0] == output_str
-            image_result = [output_str]
-        elif isinstance(outputs, (ZSLVisualPromptingResult, VisualPromptingResult)):
+            assert len(test_data["reference"]) == 1
             output_str = str(outputs)
             assert test_data["reference"][0] == output_str
             image_result = [output_str]
-        elif isinstance(outputs, DetectedKeypoints):
+        elif isinstance(outputs, (ZSLVisualPromptingResult, VisualPromptingResult, DetectedKeypoints)):
             output_str = str(outputs)
             assert test_data["reference"][0] == output_str
             image_result = [output_str]
         else:
-            assert False
+            pytest.fail(f"Unexpected output type: {type(outputs)}")
 
         if dump:
             inference_results.append(
-                {"image": test_data["image"], "reference": image_result}
+                {"image": test_data["image"], "reference": image_result},
             )
 
-    if name.endswith(".xml"):
-        save_name = os.path.basename(name)
-    else:
-        save_name = name + ".xml"
+    save_name = Path(name).name if name.endswith(".xml") else name + ".xml"
 
     if not model_data.get("force_ort", False):
         if "tiler" in model_data:
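
Two smaller rewrites in the hunks above: `try/except RuntimeError: pass` becomes `contextlib.suppress`, and the final `assert False` becomes `pytest.fail(...)`, which still reports under `python -O` (where bare asserts are stripped) and names the offending type. Both patterns in a sketch:

```python
import contextlib

import pytest


def check(outputs) -> None:
    with contextlib.suppress(RuntimeError):
        # A RuntimeError raised here is swallowed, like the old try/except/pass.
        raise RuntimeError("getContours() returned multiple contours")
    if not isinstance(outputs, str):
        pytest.fail(f"Unexpected output type: {type(outputs)}")
```
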
"embedded_processing" - ] - == "True" - ) + assert load_parameters_from_onnx(onnx.load(onnx_path))["model_info"]["embedded_processing"] == "True" assert type(cls_model) is type(deserialized) for attr in cls_model.parameters(): assert getattr(cls_model, attr) == getattr(deserialized, attr) diff --git a/tests/precommit/prepare_data.py b/tests/precommit/prepare_data.py index bdb1a8dc..b0e78966 100644 --- a/tests/precommit/prepare_data.py +++ b/tests/precommit/prepare_data.py @@ -1,3 +1,7 @@ +# +# Copyright (C) 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +# import argparse import json import os @@ -6,17 +10,17 @@ def retrieve_otx_model(data_dir, model_name, format="xml"): - destination_folder = os.path.join(data_dir, "otx_models") - os.makedirs(destination_folder, exist_ok=True) + destination_folder = Path(data_dir) / "otx_models" + destination_folder.mkdir(parents=True, exist_ok=True) if format == "onnx": urlretrieve( f"https://storage.openvinotoolkit.org/repositories/model_api/test/otx_models/{model_name}/model.onnx", - f"{destination_folder}/{model_name}.onnx", + destination_folder / f"{model_name}.onnx", ) else: urlretrieve( f"https://storage.openvinotoolkit.org/repositories/model_api/test/otx_models/{model_name}/openvino.xml", - f"{destination_folder}/{model_name}.xml", + destination_folder / f"{model_name}.xml", ) urlretrieve( f"https://storage.openvinotoolkit.org/repositories/model_api/test/otx_models/{model_name}/openvino.bin", @@ -32,13 +36,20 @@ def prepare_model( # flake8: noqa: F401 from model_api.models import ClassificationModel, DetectionModel, SegmentationModel - with open(public_scope, "r") as f: + # Mapping of model type strings to actual classes for security + MODEL_TYPE_MAPPING = { + "ClassificationModel": ClassificationModel, + "DetectionModel": DetectionModel, + "SegmentationModel": SegmentationModel, + } + + with Path(public_scope).open("r") as f: public_scope = json.load(f) for model in public_scope: if model["name"].endswith(".xml") or model["name"].endswith(".onnx"): continue - model = eval(model["type"]).create_model(model["name"], download_dir=data_dir) + model = MODEL_TYPE_MAPPING[model["type"]].create_model(model["name"], download_dir=data_dir) def prepare_data(data_dir="./data"): @@ -47,13 +58,12 @@ def prepare_data(data_dir="./data"): COCO128_URL = "https://ultralytics.com/assets/coco128.zip" - with urlopen(COCO128_URL) as zipresp: - with ZipFile(BytesIO(zipresp.read())) as zfile: - zfile.extractall(data_dir) + with urlopen(COCO128_URL) as zipresp, ZipFile(BytesIO(zipresp.read())) as zfile: # noqa: S310 + zfile.extractall(data_dir) urlretrieve( "https://raw.githubusercontent.com/Shenggan/BCCD_Dataset/master/BCCD/JPEGImages/BloodImage_00007.jpg", - os.path.join(data_dir, "BloodImage_00007.jpg"), + Path(data_dir) / "BloodImage_00007.jpg", ) diff --git a/tests/unit/results/test_cls_result.py b/tests/unit/results/test_cls_result.py index 7110e4a1..f6b3120d 100644 --- a/tests/unit/results/test_cls_result.py +++ b/tests/unit/results/test_cls_result.py @@ -4,6 +4,7 @@ # import numpy as np + from model_api.models.result import ClassificationResult, Label diff --git a/tests/unit/results/test_det_result.py b/tests/unit/results/test_det_result.py index cec500e2..9ee7c236 100644 --- a/tests/unit/results/test_det_result.py +++ b/tests/unit/results/test_det_result.py @@ -4,13 +4,19 @@ # import numpy as np + from model_api.models.result import DetectionResult def test_cls_result(): tst_vector = np.array([1, 2, 3, 4], dtype=np.float32) det_result = 
diff --git a/tests/unit/results/test_sseg_result.py b/tests/unit/results/test_sseg_result.py
index 26d0f83d..73071243 100644
--- a/tests/unit/results/test_sseg_result.py
+++ b/tests/unit/results/test_sseg_result.py
@@ -4,6 +4,7 @@
 #
 
 import numpy as np
+
 from model_api.models.result import Contour
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index b0597a1e..195e9539 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -2,15 +2,16 @@
 # Copyright (C) 2024-2025 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 #
+import cv2 as cv
 import numpy as np
 import openvino as ov
+import pytest
+from openvino.preprocess import PrePostProcessor
+
 from model_api.adapters.utils import (
     resize_image_with_aspect,
     resize_image_with_aspect_ocv,
 )
-from openvino.preprocess import PrePostProcessor
-import cv2 as cv
-import pytest
 
 
 @pytest.mark.parametrize(
@@ -33,13 +34,14 @@ def test_resize_image_with_aspect_ocv(img_shape):
             (model_h, model_w),
             "linear",
             pad_value,
-        )
+        ),
     )
     ppp.input().preprocess().convert_element_type(ov.Type.f32)
     ov_resize_image_with_aspect = ov.Core().compile_model(ppp.build(), "CPU")
 
-    img = np.random.randint(0, 255, size=img_shape, dtype=np.uint8)
-    ov_results = list(ov_resize_image_with_aspect(img[None]).values())[0][0]
+    rng = np.random.default_rng()
+    img = rng.integers(0, 255, size=img_shape, dtype=np.uint8)
+    ov_results = next(iter(ov_resize_image_with_aspect(img[None]).values()))[0]
 
     np_results = resize_image_with_aspect_ocv(img, (model_w, model_h))
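
`test_utils.py` switches to the `numpy.random.Generator` API (the legacy global `np.random.*` calls are what NPY002 flags) and replaces `list(d.values())[0]` with `next(iter(d.values()))`, which takes the first value without building a throwaway list:

```python
import numpy as np

rng = np.random.default_rng(seed=0)  # local, seedable, no global state
img = rng.integers(0, 255, size=(128, 96, 3), dtype=np.uint8)

outputs = {"out0": img}
first = next(iter(outputs.values()))  # first value, no intermediate list
assert first is img
```
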
diff --git a/tests/unit/visualizer/test_primitive.py b/tests/unit/visualizer/test_primitive.py
index 0c2e028c..ef965658 100644
--- a/tests/unit/visualizer/test_primitive.py
+++ b/tests/unit/visualizer/test_primitive.py
@@ -59,8 +59,7 @@ def test_polygon(mock_image: PIL.Image):
     assert polygon.compute(mock_image) == expected_image
 
     with pytest.raises(ValueError, match="No contours found in the mask."):
-        polygon = Polygon(mask=np.zeros((100, 100), dtype=np.uint8))
-        polygon.compute(mock_image)
+        Polygon(mask=np.zeros((100, 100), dtype=np.uint8)).compute(mock_image)
 
 
 def test_label(mock_image: PIL.Image):
diff --git a/tests/unit/visualizer/test_scene.py b/tests/unit/visualizer/test_scene.py
index c15ea62a..891007c0 100644
--- a/tests/unit/visualizer/test_scene.py
+++ b/tests/unit/visualizer/test_scene.py
@@ -6,8 +6,8 @@
 from pathlib import Path
 
 import numpy as np
-from PIL import Image
 import pytest
+from PIL import Image
 
 from model_api.models.result import (
     AnomalyResult,
@@ -53,7 +53,9 @@ def test_classification_scene(mock_image: Image, tmpdir: Path):
     )
     visualizer = Visualizer()
     visualizer.save(
-        mock_image, classification_result, tmpdir / "classification_scene.jpg"
+        mock_image,
+        classification_result,
+        tmpdir / "classification_scene.jpg",
     )
     assert Path(tmpdir / "classification_scene.jpg").exists()
@@ -83,13 +85,11 @@ def test_segmentation_scene(mock_image: Image, tmpdir: Path, with_saliency_map:
         masks=np.array(
             [
                 np.ones((128, 128), dtype=np.uint8),
-            ]
+            ],
         ),
         scores=np.array([0.85, 0.75]),
         label_names=["person", "car"],
-        saliency_map=[np.ones((128, 128), dtype=np.uint8) * 255]
-        if with_saliency_map
-        else None,
+        saliency_map=[np.ones((128, 128), dtype=np.uint8) * 255] if with_saliency_map else None,
         feature_vector=np.array([1, 2, 3, 4]),
     )
@@ -103,18 +103,20 @@ def test_segmentation_scene(mock_image: Image, tmpdir: Path, with_saliency_map:
     # Test ImageResultWithSoftPrediction
     soft_prediction_result = ImageResultWithSoftPrediction(
         resultImage=np.array(
-            [[0, 1, 2], [1, 2, 0], [2, 0, 1]], dtype=np.uint8
+            [[0, 1, 2], [1, 2, 0], [2, 0, 1]],
+            dtype=np.uint8,
         ),  # 3x3 test image with 3 classes
         soft_prediction=np.ones(
-            (3, 3, 3), dtype=np.float32
+            (3, 3, 3),
+            dtype=np.float32,
         ),  # 3 classes, 3x3 prediction
-        saliency_map=np.ones((3, 3), dtype=np.uint8) * 255
-        if with_saliency_map
-        else None,
+        saliency_map=np.ones((3, 3), dtype=np.uint8) * 255 if with_saliency_map else None,
         feature_vector=np.array([1, 2, 3, 4]),
     )
     visualizer.save(
-        mock_image, soft_prediction_result, tmpdir / "soft_prediction_scene.jpg"
+        mock_image,
+        soft_prediction_result,
+        tmpdir / "soft_prediction_scene.jpg",
     )
     assert Path(tmpdir / "soft_prediction_scene.jpg").exists()
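
Finally, the `test_primitive.py` change folds the object construction into the `pytest.raises` block as one chained expression; `match=` is a regular-expression search against the exception message. The shape of that idiom, with a hypothetical stand-in for `Polygon`:

```python
import numpy as np
import pytest


def find_contours(mask: np.ndarray) -> None:
    if not mask.any():
        raise ValueError("No contours found in the mask.")


def test_empty_mask_raises() -> None:
    # match= is re.search'd against the message; pytest.raises fails the
    # test if no ValueError is raised at all.
    with pytest.raises(ValueError, match="No contours found"):
        find_contours(np.zeros((100, 100), dtype=np.uint8))
```
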