From e46b4caf7fab85b05aebc38bc34cfbafedec5c60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A1vel=20A=2E=20Campos-Pe=C3=B1a?= Date: Sat, 31 May 2025 12:17:43 -0500 Subject: [PATCH 1/6] Add unit and integration tests for all modules --- tests/__init__.py | 0 .../controllers/test_lti_dd_mpc_params.yaml | 21 + .../test_nonlinear_dd_mpc_params.yaml | 23 + tests/config/models/test_lti_model.yaml | 19 + tests/conftest.py | 129 +++++ tests/mocks.py | 96 ++++ tests/test_controllers/conftest.py | 92 ++++ .../test_lti_dd_mpc_controller_unit.py | 275 +++++++++++ .../test_nonlinear_dd_mpc_controller_unit.py | 234 +++++++++ tests/test_lti_dd_mpc_integration.py | 200 ++++++++ tests/test_nonlinear_dd_mpc_integration.py | 202 ++++++++ tests/test_placeholder.py | 2 - tests/test_utilities/conftest.py | 54 ++ tests/test_utilities/controller/conftest.py | 70 +++ .../controller/test_controller_creation.py | 123 +++++ .../controller/test_controller_params.py | 267 ++++++++++ .../controller/test_data_driven_mpc_sim.py | 117 +++++ .../test_initial_data_generation.py | 66 +++ tests/test_utilities/models/test_lti_model.py | 151 ++++++ .../models/test_nonlinear_model.py | 55 +++ .../test_utilities/test_data_visualization.py | 463 ++++++++++++++++++ tests/test_utilities/test_hankel_matrix.py | 55 +++ .../test_initial_state_estimation.py | 114 +++++ .../test_yaml_config_loading.py | 39 ++ 24 files changed, 2865 insertions(+), 2 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/config/controllers/test_lti_dd_mpc_params.yaml create mode 100644 tests/config/controllers/test_nonlinear_dd_mpc_params.yaml create mode 100644 tests/config/models/test_lti_model.yaml create mode 100644 tests/conftest.py create mode 100644 tests/mocks.py create mode 100644 tests/test_controllers/conftest.py create mode 100644 tests/test_controllers/test_lti_dd_mpc_controller_unit.py create mode 100644 tests/test_controllers/test_nonlinear_dd_mpc_controller_unit.py create mode 100644 tests/test_lti_dd_mpc_integration.py create mode 100644 tests/test_nonlinear_dd_mpc_integration.py delete mode 100644 tests/test_placeholder.py create mode 100644 tests/test_utilities/conftest.py create mode 100644 tests/test_utilities/controller/conftest.py create mode 100644 tests/test_utilities/controller/test_controller_creation.py create mode 100644 tests/test_utilities/controller/test_controller_params.py create mode 100644 tests/test_utilities/controller/test_data_driven_mpc_sim.py create mode 100644 tests/test_utilities/controller/test_initial_data_generation.py create mode 100644 tests/test_utilities/models/test_lti_model.py create mode 100644 tests/test_utilities/models/test_nonlinear_model.py create mode 100644 tests/test_utilities/test_data_visualization.py create mode 100644 tests/test_utilities/test_hankel_matrix.py create mode 100644 tests/test_utilities/test_initial_state_estimation.py create mode 100644 tests/test_utilities/test_yaml_config_loading.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/config/controllers/test_lti_dd_mpc_params.yaml b/tests/config/controllers/test_lti_dd_mpc_params.yaml new file mode 100644 index 0000000..b632908 --- /dev/null +++ b/tests/config/controllers/test_lti_dd_mpc_params.yaml @@ -0,0 +1,21 @@ +--- +test_lti_dd_mpc_params: + n: 4 + N: 400 + L: 30 + Q_weights: 3 + R_weights: 1.0e-4 + epsilon_bar: 0.002 + lambda_sigma: 1000 + lambda_alpha_epsilon_bar: 0.1 + U: + - [-5, 5] + - [-5, 5] + u_d_range: + - [-1, 1] + - [-1, 1] + slack_var_constraint_type: 0 + controller_type: 1 + u_s: [1, 1] + y_s: [0.65, 0.77] + n_n_mpc_step: true diff --git a/tests/config/controllers/test_nonlinear_dd_mpc_params.yaml b/tests/config/controllers/test_nonlinear_dd_mpc_params.yaml new file mode 100644 index 0000000..fd5731e --- /dev/null +++ b/tests/config/controllers/test_nonlinear_dd_mpc_params.yaml @@ -0,0 +1,23 @@ +--- +test_nonlinear_dd_mpc_params: + n: 4 + N: 50 + L: 10 + Q_weights: 1 + R_weights: 1 + S_weights: 10 + lambda_alpha: 10.0 + lambda_sigma: 1.0e7 + U: + - [-10.0, 10.0] + Us: + - [-9.9, 9.9] + u_range: + - [-1.0, 1.0] + alpha_reg_type: 0 + lamb_alpha_s: 1e-2 + lamb_sigma_s: 1.0e7 + y_r: [3.0] + ext_out_incr_in: true + update_cost_threshold: null + n_n_mpc_step: true diff --git a/tests/config/models/test_lti_model.yaml b/tests/config/models/test_lti_model.yaml new file mode 100644 index 0000000..cfa88b2 --- /dev/null +++ b/tests/config/models/test_lti_model.yaml @@ -0,0 +1,19 @@ +--- +test_lti_model: + A: + - [0.921, 0, 0.041, 0] + - [0, 0.918, 0, 0.033] + - [0, 0, 0.924, 0] + - [0, 0, 0, 0.937] + B: + - [0.017, 0.001] + - [0.001, 0.023] + - [0, 0.061] + - [0.072, 0] + C: + - [1, 0, 0, 0] + - [0, 1, 0, 0] + D: + - [0, 0] + - [0, 0] + eps_max: 0.002 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..3f93787 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,129 @@ +import numpy as np +import pytest + +from direct_data_driven_mpc.lti_data_driven_mpc_controller import ( + LTIDataDrivenMPCType, + SlackVarConstraintType, +) +from direct_data_driven_mpc.nonlinear_data_driven_mpc_controller import ( + AlphaRegType, +) +from direct_data_driven_mpc.utilities.controller.controller_creation import ( + LTIDataDrivenMPCParams, + NonlinearDataDrivenMPCParams, +) + +from .mocks import ( + MockLTIDDMPCController, + MockLTIModel, + MockNonlinearDDMPCController, + MockNonlinearModel, +) + + +@pytest.fixture +def mock_lti_model() -> MockLTIModel: + return MockLTIModel() + + +@pytest.fixture +def mock_nonlinear_model() -> MockNonlinearModel: + return MockNonlinearModel() + + +@pytest.fixture +def mock_lti_controller() -> MockLTIDDMPCController: + return MockLTIDDMPCController() + + +@pytest.fixture +def mock_nonlinear_controller() -> MockNonlinearDDMPCController: + return MockNonlinearDDMPCController() + + +@pytest.fixture +def dummy_lti_controller_data() -> tuple[ + LTIDataDrivenMPCParams, np.ndarray, np.ndarray +]: + n = 2 + m = 2 + p = 3 + L = 4 + + # Compute minimum required N for persistent excitation + N_min = m * (L + 2 * n) + L + 2 * n - 1 + u_range = np.array([[-1, 1]] * m) + + lti_dd_mpc_params: LTIDataDrivenMPCParams = { + "n": n, + "N": N_min, + "L": L, + "Q": np.eye(p * L), + "R": np.eye(m * L), + "eps_max": 0.01, + "lamb_alpha": 0.01, + "lamb_sigma": 0.01, + "c": 1.0, + "U": np.array([[-1.0, 1.0]] * m), + "u_range": u_range, + "slack_var_constraint_type": SlackVarConstraintType.CONVEX, + "controller_type": LTIDataDrivenMPCType.NOMINAL, + "n_mpc_step": 1, + "u_s": np.zeros((m, 1)), + "y_s": np.ones((p, 1)), + } + + # Generate randomized initial input-output data + np_random = np.random.default_rng(0) + u_d = np_random.uniform(u_range[:, 0], u_range[:, 1], (N_min, m)) + y_d = np_random.uniform(-1.0, 1.0, (N_min, p)) + # Note: + # For simplicity, dummy `u_range` defines the same input bounds for every + # input. This is not always the case. + + return (lti_dd_mpc_params, u_d, y_d) + + +@pytest.fixture +def dummy_nonlinear_controller_data() -> tuple[ + NonlinearDataDrivenMPCParams, np.ndarray, np.ndarray +]: + n = 2 + m = 2 + p = 3 + L = 4 + + # Compute minimum required N for persistent excitation + N = 50 + u_range = np.array([[-1, 1]] * m) + + nonlinear_dd_mpc_params: NonlinearDataDrivenMPCParams = { + "n": n, + "N": N, + "L": L, + "Q": np.eye(p * (L + n + 1)), + "R": np.eye(m * (L + n + 1)), + "S": np.eye(p), + "lamb_alpha": 0.01, + "lamb_sigma": 0.01, + "U": np.array([[-1.0, 1.0]] * m), + "Us": np.array([[-0.9, 0.9]] * m), + "u_range": u_range, + "alpha_reg_type": AlphaRegType.APPROXIMATED, + "lamb_alpha_s": 0.01, + "lamb_sigma_s": 0.01, + "y_r": np.ones((p, 1)), + "ext_out_incr_in": False, + "update_cost_threshold": None, + "n_mpc_step": 1, + } + + # Generate randomized initial input-output data + np_random = np.random.default_rng(0) + u = np_random.uniform(u_range[:, 0], u_range[:, 1], (N, m)) + y = np_random.uniform(-1.0, 1.0, (N, p)) + # Note: + # For simplicity, dummy `u_range` defines the same input bounds for every + # input. This is not always the case. + + return (nonlinear_dd_mpc_params, u, y) diff --git a/tests/mocks.py b/tests/mocks.py new file mode 100644 index 0000000..85bda49 --- /dev/null +++ b/tests/mocks.py @@ -0,0 +1,96 @@ +"""Define mock classes and objects.""" + +import numpy as np + + +class MockLTIModel: + def __init__(self) -> None: + self.n = 3 + self.m = 2 + self.p = 3 + self.eps_max = 0.0 + self.x = np.zeros((self.n)) + + def simulate_step(self, u: np.ndarray, w: np.ndarray) -> np.ndarray: + return np.ones((self.p,)) + + def simulate(self, U: np.ndarray, W: np.ndarray, steps: int) -> np.ndarray: + return np.ones((steps, self.p)) + + def get_initial_state_from_trajectory( + self, U: np.ndarray, Y: np.ndarray + ) -> np.ndarray: + return np.zeros((self.n,)) + + def set_state(self, state: np.ndarray) -> None: + self.x = state + + +class MockNonlinearModel: + def __init__(self) -> None: + self.n = 3 + self.m = 2 + self.p = 3 + self.eps_max = 0.0 + self.x = np.zeros((self.n)) + + def simulate_step(self, u: np.ndarray, w: np.ndarray) -> np.ndarray: + return np.ones((self.p,)) + + def simulate(self, U: np.ndarray, W: np.ndarray, steps: int) -> np.ndarray: + return np.ones((steps, self.p)) + + +class MockLTIDDMPCController: + def __init__(self) -> None: + self.m = 2 + self.p = 3 + self.eps_max = 0.01 + self.u_s = np.zeros((self.m, 1)) + self.y_s = np.zeros((self.p, 1)) + self.n_mpc_step = 1 + + def update_and_solve_data_driven_mpc(self) -> None: + return + + def get_optimal_control_input_at_step(self, n_step: int) -> np.ndarray: + return np.ones((self.m,)) + + def store_input_output_measurement( + self, + u_current: np.ndarray, + y_current: np.ndarray, + ) -> None: + return + + def get_optimal_cost_value(self) -> float: + return 1.0 + + +class MockNonlinearDDMPCController: + def __init__(self) -> None: + self.m = 2 + self.p = 3 + self.eps_max = 0.01 + self.y_r = np.zeros((self.p, 1)) + self.n_mpc_step = 1 + + def update_and_solve_data_driven_mpc(self) -> None: + return + + def get_optimal_control_input_at_step(self, n_step: int) -> np.ndarray: + return np.ones((self.m,)) + + def get_du_value_at_step(self, n_step: int) -> np.ndarray: + return np.zeros((self.m,)) + + def store_input_output_measurement( + self, + u_current: np.ndarray, + y_current: np.ndarray, + du_current: np.ndarray, + ) -> None: + return + + def get_optimal_cost_value(self) -> float: + return 1.0 diff --git a/tests/test_controllers/conftest.py b/tests/test_controllers/conftest.py new file mode 100644 index 0000000..a104231 --- /dev/null +++ b/tests/test_controllers/conftest.py @@ -0,0 +1,92 @@ +from unittest.mock import Mock, patch + +import numpy as np +import pytest + +from direct_data_driven_mpc.lti_data_driven_mpc_controller import ( + LTIDataDrivenMPCController, +) +from direct_data_driven_mpc.nonlinear_data_driven_mpc_controller import ( + NonlinearDataDrivenMPCController, +) +from direct_data_driven_mpc.utilities.controller.controller_creation import ( + LTIDataDrivenMPCParams, + NonlinearDataDrivenMPCParams, +) + + +@pytest.fixture +@patch.object(LTIDataDrivenMPCController, "initialize_data_driven_mpc") +def dummy_lti_controller( + _: Mock, + dummy_lti_controller_data: tuple[ + LTIDataDrivenMPCParams, np.ndarray, np.ndarray + ], +) -> LTIDataDrivenMPCController: + controller_params, u_d, y_d = dummy_lti_controller_data + m = u_d.shape[1] + p = y_d.shape[1] + + dummy_lti_controller = LTIDataDrivenMPCController( + n=controller_params["n"], + m=m, + p=p, + u_d=u_d, + y_d=y_d, + L=controller_params["L"], + Q=controller_params["Q"], + R=controller_params["R"], + u_s=controller_params["u_s"], + y_s=controller_params["y_s"], + eps_max=controller_params["eps_max"], + lamb_alpha=controller_params["lamb_alpha"], + lamb_sigma=controller_params["lamb_sigma"], + U=controller_params["U"], + c=controller_params["c"], + slack_var_constraint_type=( + controller_params["slack_var_constraint_type"] + ), + controller_type=controller_params["controller_type"], + n_mpc_step=controller_params["n_mpc_step"], + use_terminal_constraints=True, + ) + + return dummy_lti_controller + + +@pytest.fixture +@patch.object(LTIDataDrivenMPCController, "initialize_data_driven_mpc") +def dummy_nonlinear_controller( + _: Mock, + dummy_nonlinear_controller_data: tuple[ + NonlinearDataDrivenMPCParams, np.ndarray, np.ndarray + ], +) -> NonlinearDataDrivenMPCController: + controller_params, u, y = dummy_nonlinear_controller_data + m = u.shape[1] + p = y.shape[1] + + dummy_nonlinear_controller = NonlinearDataDrivenMPCController( + n=controller_params["n"], + m=m, + p=p, + u=u, + y=y, + L=controller_params["L"], + Q=controller_params["Q"], + R=controller_params["R"], + S=controller_params["S"], + y_r=controller_params["y_r"], + lamb_alpha=controller_params["lamb_alpha"], + lamb_sigma=controller_params["lamb_sigma"], + U=controller_params["U"], + Us=controller_params["Us"], + alpha_reg_type=controller_params["alpha_reg_type"], + lamb_alpha_s=controller_params["lamb_alpha_s"], + lamb_sigma_s=controller_params["lamb_sigma_s"], + ext_out_incr_in=controller_params["ext_out_incr_in"], + update_cost_threshold=controller_params["update_cost_threshold"], + n_mpc_step=controller_params["n_mpc_step"], + ) + + return dummy_nonlinear_controller diff --git a/tests/test_controllers/test_lti_dd_mpc_controller_unit.py b/tests/test_controllers/test_lti_dd_mpc_controller_unit.py new file mode 100644 index 0000000..3e66791 --- /dev/null +++ b/tests/test_controllers/test_lti_dd_mpc_controller_unit.py @@ -0,0 +1,275 @@ +from unittest.mock import Mock, patch + +import numpy as np +import pytest + +from direct_data_driven_mpc.lti_data_driven_mpc_controller import ( + LTIDataDrivenMPCController, + LTIDataDrivenMPCType, + SlackVarConstraintType, +) +from direct_data_driven_mpc.utilities.controller.controller_creation import ( + LTIDataDrivenMPCParams, +) + + +@pytest.mark.parametrize("use_terminal_constraints", [True, False]) +@pytest.mark.parametrize( + "slack_var_constraint_type", + [SlackVarConstraintType.NONE, SlackVarConstraintType.CONVEX], +) +@pytest.mark.parametrize( + "controller_type", + [LTIDataDrivenMPCType.NOMINAL, LTIDataDrivenMPCType.ROBUST], +) +@patch.object(LTIDataDrivenMPCController, "get_optimal_control_input") +def test_lti_dd_mpc_controller_init( + mock_controller_get_optimal_input: Mock, + controller_type: LTIDataDrivenMPCType, + slack_var_constraint_type: SlackVarConstraintType, + use_terminal_constraints: bool, + dummy_lti_controller_data: tuple[ + LTIDataDrivenMPCParams, np.ndarray, np.ndarray + ], +) -> None: + # Define test parameters + controller_params, u_d, y_d = dummy_lti_controller_data + m = u_d.shape[1] + p = y_d.shape[1] + + # Patch optimal control input retrieval to bypass solver status checks + mock_controller_get_optimal_input.return_value = np.ones((1,)) + + # Test expected LTI data-driven MPC controller initialization + controller = LTIDataDrivenMPCController( + n=controller_params["n"], + m=m, + p=p, + u_d=u_d, + y_d=y_d, + L=controller_params["L"], + Q=controller_params["Q"], + R=controller_params["R"], + u_s=controller_params["u_s"], + y_s=controller_params["y_s"], + eps_max=controller_params["eps_max"], + lamb_alpha=controller_params["lamb_alpha"], + lamb_sigma=controller_params["lamb_sigma"], + U=controller_params["U"], + c=controller_params["c"], + slack_var_constraint_type=slack_var_constraint_type, + controller_type=controller_type, + n_mpc_step=controller_params["n_mpc_step"], + use_terminal_constraints=use_terminal_constraints, + ) + + # Verify controller instantiation with the MPC solution cost value + assert controller.get_optimal_cost_value() is not None + + +def test_lti_dd_mpc_controller_robust_non_convex( + dummy_lti_controller_data: tuple[ + LTIDataDrivenMPCParams, np.ndarray, np.ndarray + ], +) -> None: + # Define test parameters + controller_params, u_d, y_d = dummy_lti_controller_data + m = u_d.shape[1] + p = y_d.shape[1] + + # Verify that a `NotImplementedError` is raised for robust + # controllers with a non-convex slack variable constraint + with pytest.raises(NotImplementedError): + LTIDataDrivenMPCController( + n=controller_params["n"], + m=m, + p=p, + u_d=u_d, + y_d=y_d, + L=controller_params["L"], + Q=controller_params["Q"], + R=controller_params["R"], + u_s=controller_params["u_s"], + y_s=controller_params["y_s"], + eps_max=controller_params["eps_max"], + lamb_alpha=controller_params["lamb_alpha"], + lamb_sigma=controller_params["lamb_sigma"], + U=controller_params["U"], + c=controller_params["c"], + slack_var_constraint_type=SlackVarConstraintType.NON_CONVEX, + controller_type=LTIDataDrivenMPCType.ROBUST, + n_mpc_step=controller_params["n_mpc_step"], + use_terminal_constraints=True, + ) + + +@pytest.mark.parametrize( + "case_value, expected_error_match", + [ + # Case 1: Mismatched number of inputs + ("invalid_m", "should match the number of inputs"), + # Case 2: Insufficient input data length + ("short_input", "The required minimum N"), + # Case 3: Input data not persistently exciting + ("non_pers_exc_input", "rank of its induced Hankel"), + # Case 4a: Prediction horizon too short for NOMINAL controller + ("short_horizon_nominal", "prediction horizon"), + # Case 4b: Prediction horizon too short for ROBUST controller + ("short_horizon_robust", "prediction horizon"), + # Case 5a: Invalid Q matrix + ("invalid_Q", "Output weighting square matrix Q"), + # Case 5b: Invalid R matrix + ("invalid_R", "Input weighting square matrix R"), + ], +) +def test_lti_dd_mpc_controller_invalid_params( + case_value: str, + expected_error_match: str, + dummy_lti_controller_data: tuple[ + LTIDataDrivenMPCParams, np.ndarray, np.ndarray + ], +) -> None: + # Define test parameters + controller_params, u_d, y_d = dummy_lti_controller_data + m = u_d.shape[1] + p = y_d.shape[1] + L = controller_params["L"] + n = controller_params["n"] + + # Compute minimum required N for persistent excitation + N_min = m * (L + 2 * n) + L + 2 * n - 1 + + base_controller_kwargs = { + "n": n, + "m": m, + "p": p, + "u_d": u_d, + "y_d": y_d, + "L": L, + "Q": controller_params["Q"], + "R": controller_params["R"], + "u_s": controller_params["u_s"], + "y_s": controller_params["y_s"], + "eps_max": controller_params["eps_max"], + "lamb_alpha": controller_params["lamb_alpha"], + "lamb_sigma": controller_params["lamb_sigma"], + "U": controller_params["U"], + "c": controller_params["c"], + "slack_var_constraint_type": SlackVarConstraintType.CONVEX, + "controller_type": LTIDataDrivenMPCType.ROBUST, + "n_mpc_step": controller_params["n_mpc_step"], + "use_terminal_constraints": True, + } + + controller_kwargs = base_controller_kwargs.copy() + + # Override controller parameters based on the test case + if case_value == "invalid_m": + controller_kwargs["m"] = m + 1 + elif case_value == "short_input": + controller_kwargs["u_d"] = np.random.uniform(-1.0, 1.0, (N_min - 1, m)) + elif case_value == "non_pers_exc_input": + controller_kwargs["u_d"] = np.zeros((N_min, m)) + elif case_value == "short_horizon_nominal": + controller_kwargs["controller_type"] = LTIDataDrivenMPCType.NOMINAL + controller_kwargs["L"] = n - 1 + elif case_value == "short_horizon_robust": + controller_kwargs["controller_type"] = LTIDataDrivenMPCType.ROBUST + controller_kwargs["L"] = 2 * n - 1 + elif case_value == "invalid_Q": + controller_kwargs["Q"] = np.eye(p * L - 1) + elif case_value == "invalid_R": + controller_kwargs["R"] = np.eye(m * L - 1) + + # Run test + with pytest.raises(ValueError, match=expected_error_match): + LTIDataDrivenMPCController(**controller_kwargs) + + +@pytest.mark.parametrize("valid_dimensions", [True, False]) +@patch.object(LTIDataDrivenMPCController, "get_optimal_control_input") +def test_lti_store_input_output_measurement( + mock_controller_get_optimal_input: Mock, + valid_dimensions: bool, + dummy_lti_controller: LTIDataDrivenMPCController, +) -> None: + # Patch optimal control input retrieval to bypass solver status checks + mock_controller_get_optimal_input.return_value = np.ones((1,)) + + # Get dummy LTI data-driven MPC controller + controller = dummy_lti_controller + + # Test input-output measurement storage + if valid_dimensions: + u_current = np.ones((controller.m, 1)) + y_current = np.ones((controller.p, 1)) + + controller.store_input_output_measurement(u_current, y_current) + + assert np.allclose(controller.u_past[-controller.m :], u_current) + assert np.allclose(controller.y_past[-controller.p :], y_current) + else: + u_current = np.ones((controller.m + 1, 1)) + y_current = np.ones((controller.p + 1, 1)) + + with pytest.raises(ValueError, match="Incorrect dimensions"): + controller.store_input_output_measurement(u_current, y_current) + + +@pytest.mark.parametrize("valid_dimensions", [True, False]) +@patch.object(LTIDataDrivenMPCController, "get_optimal_control_input") +def test_lti_set_past_input_output_data( + mock_controller_get_optimal_input: Mock, + valid_dimensions: bool, + dummy_lti_controller: LTIDataDrivenMPCController, +) -> None: + # Patch optimal control input retrieval to bypass solver status checks + mock_controller_get_optimal_input.return_value = np.ones((1,)) + + # Get dummy LTI data-driven MPC controller + controller = dummy_lti_controller + + # Test past input-output data setting + if valid_dimensions: + u_past = np.zeros((controller.n * controller.m, 1)) + y_past = np.ones((controller.n * controller.p, 1)) + + controller.set_past_input_output_data(u_past, y_past) + + assert np.allclose(controller.u_past, u_past) + assert np.allclose(controller.y_past, y_past) + else: + u_past = np.zeros((controller.n * controller.m + 1, 1)) + y_past = np.ones((controller.n * controller.p + 1, 1)) + + with pytest.raises(ValueError, match="Incorrect dimensions"): + controller.set_past_input_output_data(u_past, y_past) + + +@pytest.mark.parametrize("valid_dimensions", [True, False]) +@patch.object(LTIDataDrivenMPCController, "initialize_data_driven_mpc") +def test_lti_set_input_output_setpoints( + mock_controller_init: Mock, + valid_dimensions: bool, + dummy_lti_controller: LTIDataDrivenMPCController, +) -> None: + # Get dummy LTI data-driven MPC controller + controller = dummy_lti_controller + + # Test input-output setpoint setting + if valid_dimensions: + u_s = np.ones_like(controller.u_s) + y_s = np.ones_like(controller.y_s) + + controller.set_input_output_setpoints(u_s, y_s) + + assert np.allclose(controller.u_s, u_s) + assert np.allclose(controller.y_s, y_s) + + mock_controller_init.assert_called() + else: + u_s = np.ones((controller.u_s.shape[0] + 1, 1)) + y_s = np.ones((controller.y_s.shape[0] + 1, 1)) + + with pytest.raises(ValueError, match="Incorrect dimensions"): + controller.set_input_output_setpoints(u_s, y_s) diff --git a/tests/test_controllers/test_nonlinear_dd_mpc_controller_unit.py b/tests/test_controllers/test_nonlinear_dd_mpc_controller_unit.py new file mode 100644 index 0000000..273fed9 --- /dev/null +++ b/tests/test_controllers/test_nonlinear_dd_mpc_controller_unit.py @@ -0,0 +1,234 @@ +from unittest.mock import Mock, patch + +import numpy as np +import pytest + +from direct_data_driven_mpc.nonlinear_data_driven_mpc_controller import ( + AlphaRegType, + NonlinearDataDrivenMPCController, +) +from direct_data_driven_mpc.utilities.controller.controller_creation import ( + NonlinearDataDrivenMPCParams, +) + + +@pytest.mark.parametrize("ext_out_incr_in", [True, False]) +@pytest.mark.parametrize( + "alpha_reg_type", + [ + AlphaRegType.APPROXIMATED, + AlphaRegType.PREVIOUS, + AlphaRegType.ZERO, + ], +) +@patch.object(NonlinearDataDrivenMPCController, "get_optimal_control_input") +def test_nonlinear_dd_mpc_controller_init( + mock_controller_get_optimal_input: Mock, + alpha_reg_type: AlphaRegType, + ext_out_incr_in: bool, + dummy_nonlinear_controller_data: tuple[ + NonlinearDataDrivenMPCParams, np.ndarray, np.ndarray + ], +) -> None: + # Define test parameters + controller_params, u, y = dummy_nonlinear_controller_data + m = u.shape[1] + p = y.shape[1] + + # Override MPC matrices for `ext_out_incr_in = True` + if ext_out_incr_in: + L = controller_params["L"] + n = controller_params["n"] + controller_params["Q"] = np.eye((m + p) * (L + n + 1)) + + # Patch optimal control input retrieval to bypass solver status checks + mock_controller_get_optimal_input.return_value = np.ones((1,)) + + # Test expected LTI data-driven MPC controller initialization + controller = NonlinearDataDrivenMPCController( + n=controller_params["n"], + m=m, + p=p, + u=u, + y=y, + L=controller_params["L"], + Q=controller_params["Q"], + R=controller_params["R"], + S=controller_params["S"], + y_r=controller_params["y_r"], + lamb_alpha=controller_params["lamb_alpha"], + lamb_sigma=controller_params["lamb_sigma"], + U=controller_params["U"], + Us=controller_params["Us"], + alpha_reg_type=alpha_reg_type, + lamb_alpha_s=controller_params["lamb_alpha_s"], + lamb_sigma_s=controller_params["lamb_sigma_s"], + ext_out_incr_in=ext_out_incr_in, + update_cost_threshold=controller_params["update_cost_threshold"], + n_mpc_step=controller_params["n_mpc_step"], + ) + + # Verify controller instantiation with the MPC solution cost value + assert controller.get_optimal_cost_value() is not None + + +@pytest.mark.parametrize( + "case_value, expected_error_match", + [ + # Case 1: Mismatched number of inputs + ("invalid_m", "should match the number of inputs"), + # Case 2: Input data not persistently exciting + ("non_pers_exc_input", "rank of its induced Hankel"), + # Case 3: Prediction horizon too short + ("short_horizon", "prediction horizon"), + # Case 4a: Invalid Q matrix + ("invalid_Q", "Output weighting square matrix Q"), + # Case 4b: Invalid R matrix + ("invalid_R", "Input weighting square matrix R"), + # Case 4c: Invalid S matrix + ("invalid_S", "Output setpoint weighting square matrix S"), + ], +) +def test_nonlinear_dd_mpc_controller_invalid_params( + case_value: str, + expected_error_match: str, + dummy_nonlinear_controller_data: tuple[ + NonlinearDataDrivenMPCParams, np.ndarray, np.ndarray + ], +) -> None: + # Define test parameters + controller_params, u, y = dummy_nonlinear_controller_data + m = u.shape[1] + p = y.shape[1] + N = controller_params["N"] + L = controller_params["L"] + n = controller_params["n"] + + base_controller_kwargs = { + "n": n, + "m": m, + "p": p, + "u": u, + "y": y, + "L": L, + "Q": controller_params["Q"], + "R": controller_params["R"], + "S": controller_params["S"], + "y_r": controller_params["y_r"], + "lamb_alpha": controller_params["lamb_alpha"], + "lamb_sigma": controller_params["lamb_sigma"], + "U": controller_params["U"], + "Us": controller_params["Us"], + "alpha_reg_type": controller_params["alpha_reg_type"], + "lamb_alpha_s": controller_params["lamb_alpha_s"], + "lamb_sigma_s": controller_params["lamb_sigma_s"], + "ext_out_incr_in": controller_params["ext_out_incr_in"], + "update_cost_threshold": controller_params["update_cost_threshold"], + "n_mpc_step": controller_params["n_mpc_step"], + } + + controller_kwargs = base_controller_kwargs.copy() + + # Override controller parameters based on the test case + if case_value == "invalid_m": + controller_kwargs["m"] = m + 1 + elif case_value == "non_pers_exc_input": + controller_kwargs["u"] = np.zeros((N, m)) + elif case_value == "short_horizon": + controller_kwargs["L"] = n - 1 + elif case_value == "invalid_Q": + controller_kwargs["Q"] = np.eye(p * (L + n + 1) - 1) + elif case_value == "invalid_R": + controller_kwargs["R"] = np.eye(m * (L + n + 1) - 1) + elif case_value == "invalid_S": + controller_kwargs["S"] = np.eye(p + 1) + + # Run test + with pytest.raises(ValueError, match=expected_error_match): + NonlinearDataDrivenMPCController(**controller_kwargs) + + +@pytest.mark.parametrize("valid_dimensions", [True, False]) +@patch.object(NonlinearDataDrivenMPCController, "get_optimal_control_input") +def test_nonlinear_store_input_output_measurement( + mock_controller_get_optimal_input: Mock, + valid_dimensions: bool, + dummy_nonlinear_controller: NonlinearDataDrivenMPCController, +) -> None: + # Patch optimal control input retrieval to bypass solver status checks + mock_controller_get_optimal_input.return_value = np.ones((1,)) + + # Get dummy nonlinear data-driven MPC controller + controller = dummy_nonlinear_controller + + # Test input-output measurement storage + if valid_dimensions: + u_current = np.ones((controller.m,)) + y_current = np.ones((controller.p,)) + + controller.store_input_output_measurement(u_current, y_current) + + assert np.allclose(controller.u[-1:], u_current) + assert np.allclose(controller.y[-1:], y_current) + else: + u_current = np.ones((controller.m + 1,)) + y_current = np.ones((controller.p + 1,)) + + with pytest.raises(ValueError, match="Incorrect dimensions"): + controller.store_input_output_measurement(u_current, y_current) + + +@pytest.mark.parametrize("valid_dimensions", [True, False]) +@patch.object(NonlinearDataDrivenMPCController, "get_optimal_control_input") +def test_nonlinear_set_input_output_data( + mock_controller_get_optimal_input: Mock, + valid_dimensions: bool, + dummy_nonlinear_controller: NonlinearDataDrivenMPCController, +) -> None: + # Patch optimal control input retrieval to bypass solver status checks + mock_controller_get_optimal_input.return_value = np.ones((1,)) + + # Get dummy nonlinear data-driven MPC controller + controller = dummy_nonlinear_controller + + # Test input-output data setting + if valid_dimensions: + u = np.zeros(controller.u.shape) + y = np.ones(controller.y.shape) + + controller.set_input_output_data(u, y) + + assert np.allclose(controller.u, u) + assert np.allclose(controller.y, y) + else: + u = np.zeros((controller.u.shape[0] + 1, controller.u.shape[1])) + y = np.ones((controller.y.shape[0] + 1, controller.y.shape[1])) + + with pytest.raises(ValueError, match="Incorrect dimensions"): + controller.set_input_output_data(u, y) + + +@pytest.mark.parametrize("valid_dimensions", [True, False]) +@patch.object(NonlinearDataDrivenMPCController, "initialize_data_driven_mpc") +def test_nonlinear_set_output_setpoint( + mock_controller_init: Mock, + valid_dimensions: bool, + dummy_nonlinear_controller: NonlinearDataDrivenMPCController, +) -> None: + # Get dummy LTI data-driven MPC controller + controller = dummy_nonlinear_controller + + # Test input-output setpoint setting + if valid_dimensions: + y_r = np.ones_like(controller.y_r) + + controller.set_output_setpoint(y_r) + + assert np.allclose(controller.y_r, y_r) + + mock_controller_init.assert_called() + else: + y_r = np.ones((controller.y_r.shape[0] + 1, 1)) + + with pytest.raises(ValueError, match="Incorrect dimensions"): + controller.set_output_setpoint(y_r) diff --git a/tests/test_lti_dd_mpc_integration.py b/tests/test_lti_dd_mpc_integration.py new file mode 100644 index 0000000..9fbd72c --- /dev/null +++ b/tests/test_lti_dd_mpc_integration.py @@ -0,0 +1,200 @@ +import math +import os +from pathlib import Path + +import matplotlib +import numpy as np +import pytest + +from direct_data_driven_mpc.lti_data_driven_mpc_controller import ( + LTIDataDrivenMPCType, + SlackVarConstraintType, +) +from direct_data_driven_mpc.utilities.controller.controller_creation import ( + create_lti_data_driven_mpc_controller, +) +from direct_data_driven_mpc.utilities.controller.controller_params import ( + get_lti_data_driven_mpc_controller_params, +) +from direct_data_driven_mpc.utilities.controller.data_driven_mpc_sim import ( + simulate_lti_data_driven_mpc_control_loop, +) +from direct_data_driven_mpc.utilities.controller.initial_data_generation import ( # noqa: E501 + generate_initial_input_output_data, + randomize_initial_system_state, +) +from direct_data_driven_mpc.utilities.data_visualization import ( + plot_input_output, + plot_input_output_animation, + save_animation, +) +from direct_data_driven_mpc.utilities.models.lti_model import LTISystemModel + +matplotlib.use("Agg") # Prevent GUI backend + +# Define test configuration file paths +TEST_LTI_MODEL_PATH = os.path.join( + os.path.dirname(__file__), "config/models/test_lti_model.yaml" +) +TEST_MODEL_KEY = "test_lti_model" + +TEST_LTI_DD_MPC_CONFIG_PATH = os.path.join( + os.path.dirname(__file__), + "config/controllers/test_lti_dd_mpc_params.yaml", +) +TEST_LTI_DD_MPC_PARAMS_KEY = "test_lti_dd_mpc_params" + + +@pytest.mark.parametrize("n_n_mpc_step", [True, False]) +@pytest.mark.parametrize( + "controller_type, slack_var_constraint_type", + [ + (LTIDataDrivenMPCType.NOMINAL, SlackVarConstraintType.NONE), + (LTIDataDrivenMPCType.ROBUST, SlackVarConstraintType.NONE), + (LTIDataDrivenMPCType.ROBUST, SlackVarConstraintType.CONVEX), + ], +) +@pytest.mark.parametrize("bound_input", [True, False]) +def test_lti_dd_mpc_integration( + bound_input: bool, + controller_type: LTIDataDrivenMPCType, + slack_var_constraint_type: SlackVarConstraintType, + n_n_mpc_step: bool, + tmp_path: Path, +) -> None: + """ + Integration test for LTI data-driven MPC controllers across multiple + configurations. + """ + # Define test parameters + np_random = np.random.default_rng(0) + n_steps = 50 + verbose = 0 + + # Define system model + system_model = LTISystemModel( + config_file=TEST_LTI_MODEL_PATH, + model_key_value=TEST_MODEL_KEY, + verbose=verbose, + ) + + # Load Data-Driven MPC controller parameters from configuration file + m = system_model.m # Number of inputs + p = system_model.p # Number of outputs + dd_mpc_config = get_lti_data_driven_mpc_controller_params( + config_file=TEST_LTI_DD_MPC_CONFIG_PATH, + controller_key_value=TEST_LTI_DD_MPC_PARAMS_KEY, + m=m, + p=p, + verbose=verbose, + ) + + # Override controller parameters based on parameterized params + if bound_input: + dd_mpc_config["U"] = np.array([[-5, 5]] * m) + else: + dd_mpc_config["U"] = None + + dd_mpc_config["controller_type"] = controller_type + dd_mpc_config["slack_var_constraint_type"] = slack_var_constraint_type + dd_mpc_config["n_mpc_step"] = dd_mpc_config["n"] if n_n_mpc_step else 1 + + # Randomize the initial state of the system + x_0 = randomize_initial_system_state( + system_model=system_model, + controller_config=dd_mpc_config, + np_random=np_random, + ) + + # Set system state to the randomized initial state + system_model.set_state(state=x_0) + + # Generate initial input-output data + u_d, y_d = generate_initial_input_output_data( + system_model=system_model, + controller_config=dd_mpc_config, + np_random=np_random, + ) + + # Create LTI data-driven MPC controller + dd_mpc_controller = create_lti_data_driven_mpc_controller( + controller_config=dd_mpc_config, u_d=u_d, y_d=y_d + ) + + # Verify controller MPC solution on initialization + assert dd_mpc_controller.get_problem_solve_status() == "optimal" + assert dd_mpc_controller.get_optimal_cost_value() is not None + + # Simulate data-driven MPC control system + u_sys, y_sys = simulate_lti_data_driven_mpc_control_loop( + system_model=system_model, + data_driven_mpc_controller=dd_mpc_controller, + n_steps=n_steps, + np_random=np_random, + verbose=verbose, + ) + + # Verify input constraint satisfaction if enabled + if bound_input: + U = dd_mpc_config["U"] + + assert U is not None + assert np.all(u_sys >= U[:, 0]) + assert np.all(u_sys <= U[:, 1]) + + # Verify system reached stabilization + # (input and outputs are close to their setpoints) + u_s = dd_mpc_config["u_s"].flatten() + y_s = dd_mpc_config["y_s"].flatten() + + np.testing.assert_allclose(u_sys[-1], u_s, rtol=2e-1) + np.testing.assert_allclose(y_sys[-1], y_s, rtol=1e-1) + + # Test control data plotting + u_s = dd_mpc_config["u_s"] + y_s = dd_mpc_config["y_s"] + U = dd_mpc_config["U"] + u_bounds_list = U.tolist() if U is not None else None + + plot_input_output( + u_k=u_sys, + y_k=y_sys, + u_s=u_s, + y_s=y_s, + u_bounds_list=u_bounds_list, + dpi=100, + ) + + # Plot and save control data animation + N = dd_mpc_config["N"] + anim_fps = 50 + anim_points_per_frame = 5 + + anim = plot_input_output_animation( + u_k=u_sys, + y_k=y_sys, + u_s=u_s, + y_s=y_s, + u_bounds_list=u_bounds_list, + initial_steps=N, + interval=1000.0 / anim_fps, + points_per_frame=anim_points_per_frame, + dpi=100, + ) + + # Save input-output animation as a GIF + data_length = N + n_steps + anim_frames = math.ceil((data_length - 1) / anim_points_per_frame) + 1 + anim_bitrate = 2000 + anim_path = os.path.join(tmp_path, "anim.gif") + + save_animation( + animation=anim, + total_frames=anim_frames, + fps=anim_fps, + bitrate=anim_bitrate, + file_path=anim_path, + ) + + # Assert animation file exists (animation was created) + assert os.path.isfile(anim_path) diff --git a/tests/test_nonlinear_dd_mpc_integration.py b/tests/test_nonlinear_dd_mpc_integration.py new file mode 100644 index 0000000..61c0c0e --- /dev/null +++ b/tests/test_nonlinear_dd_mpc_integration.py @@ -0,0 +1,202 @@ +import math +import os +from pathlib import Path + +import matplotlib +import numpy as np +import pytest + +from direct_data_driven_mpc.nonlinear_data_driven_mpc_controller import ( + AlphaRegType, +) +from direct_data_driven_mpc.utilities.controller.controller_creation import ( + create_nonlinear_data_driven_mpc_controller, +) +from direct_data_driven_mpc.utilities.controller.controller_params import ( + get_nonlinear_data_driven_mpc_controller_params, +) +from direct_data_driven_mpc.utilities.controller.data_driven_mpc_sim import ( + simulate_nonlinear_data_driven_mpc_control_loop, +) +from direct_data_driven_mpc.utilities.controller.initial_data_generation import ( # noqa: E501 + generate_initial_input_output_data, +) +from direct_data_driven_mpc.utilities.data_visualization import ( + plot_input_output, + plot_input_output_animation, + save_animation, +) +from direct_data_driven_mpc.utilities.models.nonlinear_model import ( + NonlinearSystem, +) + +matplotlib.use("Agg") # Prevent GUI backend + +# Define test configuration file paths +TEST_NONLINEAR_DD_MPC_CONFIG_PATH = os.path.join( + os.path.dirname(__file__), + "config/controllers/test_nonlinear_dd_mpc_params.yaml", +) +TEST_NONLINEAR_DD_MPC_PARAMS_KEY = "test_nonlinear_dd_mpc_params" + + +def test_nonlinear_system_model() -> NonlinearSystem: + """Simple nonlinear system for testing.""" + eps_max = 0.0 + + # Define Dynamics function + def cstr_dynamics(x: np.ndarray, u: np.ndarray) -> np.ndarray: + x1, x2 = x + u1 = u[0] + + x1_new = x1 + 0.1 * (-0.5 * x1 + u1**2) + x2_new = x2 + 0.1 * (-0.3 * x2 + u1) + + return np.array([x1_new, x2_new]) + + # Define Output function + def cstr_output(x: np.ndarray, u: np.ndarray) -> np.ndarray: + return x[[1]] + + # Create nonlinear system model + return NonlinearSystem( + f=cstr_dynamics, + h=cstr_output, + n=2, + m=1, + p=1, + eps_max=eps_max, + ) + + +@pytest.mark.parametrize("n_n_mpc_step", [True, False]) +@pytest.mark.parametrize("ext_out_incr_in", [True, False]) +@pytest.mark.parametrize( + "alpha_reg_type", + [AlphaRegType.APPROXIMATED, AlphaRegType.PREVIOUS, AlphaRegType.ZERO], +) +def test_nonlinear_dd_mpc_integration( + alpha_reg_type: AlphaRegType, + ext_out_incr_in: bool, + n_n_mpc_step: bool, + tmp_path: Path, +) -> None: + """ + Integration test for nonlinear data-driven MPC controllers across multiple + configurations. + """ + # Define test parameters + np_random = np.random.default_rng(0) + n_steps = 300 + verbose = 0 + + # Define system model + system_model = test_nonlinear_system_model() + + # Load Data-Driven MPC controller parameters from configuration file + m = system_model.m # Number of inputs + p = system_model.p # Number of outputs + dd_mpc_config = get_nonlinear_data_driven_mpc_controller_params( + config_file=TEST_NONLINEAR_DD_MPC_CONFIG_PATH, + controller_key_value=TEST_NONLINEAR_DD_MPC_PARAMS_KEY, + m=m, + p=p, + verbose=verbose, + ) + + # Override controller parameters based on parameterized params + dd_mpc_config["alpha_reg_type"] = alpha_reg_type + + if ext_out_incr_in: + dd_mpc_config["ext_out_incr_in"] = ext_out_incr_in + L = dd_mpc_config["L"] + n = dd_mpc_config["n"] + dd_mpc_config["Q"] = np.eye((m + p) * (L + n + 1)) + + dd_mpc_config["n_mpc_step"] = dd_mpc_config["n"] if n_n_mpc_step else 1 + + # Generate initial input-output data + u, y = generate_initial_input_output_data( + system_model=system_model, + controller_config=dd_mpc_config, + np_random=np_random, + ) + + # Create nonlinear data-driven MPC controller + dd_mpc_controller = create_nonlinear_data_driven_mpc_controller( + controller_config=dd_mpc_config, u=u, y=y + ) + + # Verify controller MPC solution on initialization + assert dd_mpc_controller.get_problem_solve_status() == "optimal" + assert dd_mpc_controller.get_optimal_cost_value() is not None + + # Simulate data-driven MPC control system + u_sys, y_sys = simulate_nonlinear_data_driven_mpc_control_loop( + system_model=system_model, + data_driven_mpc_controller=dd_mpc_controller, + n_steps=n_steps, + np_random=np_random, + verbose=verbose, + ) + + # Verify input constraint satisfaction + U = dd_mpc_config["U"] + + assert np.all(u_sys >= U[:, 0]) + assert np.all(u_sys <= U[:, 1]) + + # Verify system reached stabilization + # (input and outputs are close to their setpoints) + y_r = dd_mpc_config["y_r"].flatten() + + # Only assert convergence for alpha regularization types + # APPROXIMATED and PREVIOUS, since ZERO tends to underperform + if alpha_reg_type != AlphaRegType.ZERO: + np.testing.assert_allclose(y_sys[-1], y_r, rtol=1e-1) + + # Test control data plotting + y_r = dd_mpc_config["y_r"] + U = dd_mpc_config["U"] + u_bounds_list = U.tolist() if U is not None else None + + plot_input_output( + u_k=u_sys, + y_k=y_sys, + y_s=y_r, + u_bounds_list=u_bounds_list, + dpi=100, + ) + + # Plot and save control data animation + N = dd_mpc_config["N"] + anim_fps = 50 + anim_points_per_frame = 5 + + anim = plot_input_output_animation( + u_k=u_sys, + y_k=y_sys, + y_s=y_r, + u_bounds_list=u_bounds_list, + initial_steps=N, + interval=1000.0 / anim_fps, + points_per_frame=anim_points_per_frame, + dpi=100, + ) + + # Save input-output animation as a GIF + data_length = N + n_steps + anim_frames = math.ceil((data_length - 1) / anim_points_per_frame) + 1 + anim_bitrate = 2000 + anim_path = os.path.join(tmp_path, "anim.gif") + + save_animation( + animation=anim, + total_frames=anim_frames, + fps=anim_fps, + bitrate=anim_bitrate, + file_path=anim_path, + ) + + # Assert animation file exists (animation was created) + assert os.path.isfile(anim_path) diff --git a/tests/test_placeholder.py b/tests/test_placeholder.py deleted file mode 100644 index bf518b3..0000000 --- a/tests/test_placeholder.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_placeholder() -> None: - assert True diff --git a/tests/test_utilities/conftest.py b/tests/test_utilities/conftest.py new file mode 100644 index 0000000..d7bda58 --- /dev/null +++ b/tests/test_utilities/conftest.py @@ -0,0 +1,54 @@ +from typing import Any + +import numpy as np +import pytest + + +@pytest.fixture +def test_linear_system() -> Any: + A = np.array([[0.2, 0.5, 0.2], [0, -0.2, 0.2], [0.1, -0.5, 0.1]]) + B = np.array([0, 1, 2]).reshape(-1, 1) + C = np.array([[1, 2, 3], [4, 5, 6]]) + D = np.array([0, 1]).reshape(-1, 1) + t = 3 + + # Precomputed observability matrix + # Note: The test system is observable to ensure correct + # initial state estimation testing + Ot = np.array( + [ + [1, 2, 3], + [4, 5, 6], + [0.5, -1.4, 0.9], + [1.4, -2, 2.4], + [0.19, 0.08, -0.09], + [0.52, -0.1, 0.12], + ] + ) + + # Precomputed Toeplitz matrix + Tt = np.array( + [ + [0, 0, 0], + [1, 0, 0], + [8, 0, 0], + [17, 1, 0], + [0.4, 8, 0], + [2.8, 17, 1], + ] + ) + + return A, B, C, D, t, Ot, Tt + + +@pytest.fixture +def dummy_plot_data() -> tuple[np.ndarray, ...]: + T = 50 + m = 2 + p = 2 + u_k = np.zeros((T, m)) + y_k = np.ones((T, p)) + u_s = np.ones((m, 1)) + y_s = np.ones((p, 1)) + + return u_k, y_k, u_s, y_s diff --git a/tests/test_utilities/controller/conftest.py b/tests/test_utilities/controller/conftest.py new file mode 100644 index 0000000..f357938 --- /dev/null +++ b/tests/test_utilities/controller/conftest.py @@ -0,0 +1,70 @@ +from typing import Any + +import numpy as np +import pytest + +from direct_data_driven_mpc.lti_data_driven_mpc_controller import ( + LTIDataDrivenMPCType, + SlackVarConstraintType, +) +from direct_data_driven_mpc.nonlinear_data_driven_mpc_controller import ( + AlphaRegType, +) + + +@pytest.fixture +def test_dd_mpc_controller_yaml_config() -> dict[str, Any]: + return { + "n": 2, + "N": 50, + "L": 4, + "Q_weights": 1.0, + "R_weights": 0.1, + "epsilon_bar": 0.01, + "lambda_alpha": 0.1, + "lambda_sigma": 0.1, + "lambda_alpha_epsilon_bar": 0.001, + "U": [[-1, 1]], + "Us": [[-1, 1]], + "u_d_range": [[-1, 1]], + "u_range": [[-1, 1]], + "slack_var_constraint_type": 1, + "controller_type": 0, + "u_s": [0.0], + "y_s": [0.0], + "y_r": [0.0], + "n_n_mpc_step": True, + "alpha_reg_type": 0, + "lamb_alpha_s": 0.01, + "lamb_sigma_s": 0.01, + "ext_out_incr_in": False, + "update_cost_threshold": 0.0, + } + + +@pytest.fixture +def test_dd_mpc_controller_config() -> dict[str, Any]: + return { + "n": 2, + "L": 4, + "Q": np.eye(1), + "R": np.eye(1), + "S": np.eye(1), + "u_s": np.zeros((1,)), + "y_s": np.zeros((1,)), + "y_r": np.zeros((1,)), + "eps_max": 0.1, + "lamb_alpha": 0.01, + "lamb_sigma": 0.01, + "U": np.zeros((2, 2)), + "Us": np.zeros((2, 2)), + "c": 1.0, + "slack_var_constraint_type": SlackVarConstraintType.CONVEX, + "controller_type": LTIDataDrivenMPCType.ROBUST, + "n_mpc_step": 1, + "alpha_reg_type": AlphaRegType.APPROXIMATED, + "lamb_alpha_s": 0.01, + "lamb_sigma_s": 0.01, + "ext_out_incr_in": False, + "update_cost_threshold": 0.0, + } diff --git a/tests/test_utilities/controller/test_controller_creation.py b/tests/test_utilities/controller/test_controller_creation.py new file mode 100644 index 0000000..854d6ff --- /dev/null +++ b/tests/test_utilities/controller/test_controller_creation.py @@ -0,0 +1,123 @@ +from typing import Any +from unittest.mock import Mock, patch + +import numpy as np +import pytest + +from direct_data_driven_mpc.lti_data_driven_mpc_controller import ( + LTIDataDrivenMPCController, +) +from direct_data_driven_mpc.nonlinear_data_driven_mpc_controller import ( + NonlinearDataDrivenMPCController, +) +from direct_data_driven_mpc.utilities.controller.controller_creation import ( + create_lti_data_driven_mpc_controller, + create_nonlinear_data_driven_mpc_controller, +) + + +@pytest.mark.parametrize( + "u_shape, y_shape, expected_exception", + [ + ((100, 2), (100, 2), None), # Valid: Equal number of inputs-outputs + ((100, 2), (90, 1), ValueError), # Invalid: Mismatched sample count + ((100, 1), (100, 2), None), # Valid: Unequal number of inputs-outputs + ], +) +def test_create_lti_data_driven_mpc_controller( + u_shape: tuple[int, int], + y_shape: tuple[int, int], + expected_exception: type[Exception] | None, + test_dd_mpc_controller_config: dict[str, Any], +) -> None: + # Define test parameters + np_random = np.random.default_rng(0) + u_d = np_random.random(u_shape) + y_d = np_random.random(y_shape) + m = u_d.shape[1] # Number of inputs + p = y_d.shape[1] # Number of outputs + + config: Any = test_dd_mpc_controller_config + config["Q"] = np.eye(p * config["L"]) + config["R"] = np.eye(m * config["L"]) + config["u_s"] = np.zeros((m, 1)) + config["y_s"] = np.zeros((p, 1)) + config["U"] = np.array([[0, 1]] * m) + + # Check for exception or validate correct controller instantiation + if expected_exception: + with pytest.raises(expected_exception): + create_lti_data_driven_mpc_controller(config, u_d, y_d) + else: + controller = create_lti_data_driven_mpc_controller(config, u_d, y_d) + assert isinstance(controller, LTIDataDrivenMPCController) + assert controller.m == u_shape[1] + assert controller.p == y_shape[1] + assert controller.get_optimal_cost_value() is not None + + +@pytest.mark.parametrize("ext_out_incr_in_status", [True, False]) +@pytest.mark.parametrize( + "u_shape, y_shape, expected_exception", + [ + ((100, 2), (100, 2), None), # Valid: Equal number of inputs-outputs + ((100, 2), (90, 1), ValueError), # Invalid: Mismatched sample count + ((100, 1), (100, 2), None), # Valid: Unequal number of inputs-outputs + ], +) +@patch.object(NonlinearDataDrivenMPCController, "get_optimal_control_input") +@patch.object(NonlinearDataDrivenMPCController, "solve_alpha_sr_Lin_Dt") +def test_create_nonlinear_data_driven_mpc_controller( + mock_controller_alpha_solve: Mock, + mock_controller_get_optimal_input: Mock, + u_shape: tuple[int, int], + y_shape: tuple[int, int], + expected_exception: type[Exception] | None, + ext_out_incr_in_status: bool, + test_dd_mpc_controller_config: dict[str, Any], +) -> None: + # Define test parameters + np_random = np.random.default_rng(0) + u_d = np_random.random(u_shape) + y_d = np_random.random(y_shape) + m = u_d.shape[1] # Number of inputs + p = y_d.shape[1] # Number of outputs + + config: Any = test_dd_mpc_controller_config + matrix_order = config["L"] + config["n"] + 1 + + if ext_out_incr_in_status: + config["Q"] = np.eye((m + p) * (matrix_order)) + else: + config["Q"] = np.eye(p * (matrix_order)) + + config["R"] = np.eye(m * matrix_order) + config["S"] = np.eye(p) + config["U"] = np.array([[0, 1]] * m) + config["Us"] = np.array([[0.1, 0.9]] * m) + config["y_r"] = np.zeros((p, 1)) + config["ext_out_incr_in"] = ext_out_incr_in_status + + # Patch optimal control input retrieval to bypass solver status checks + mock_controller_get_optimal_input.return_value = np.zeros((1,)) + + # Patch `alpha` approximation to return a value with the correct shape + N = u_shape[0] + L = config["L"] + n = config["n"] + alpha_dim = (N - L - n, 1) + mock_controller_alpha_solve.return_value = np.zeros(alpha_dim) + + # Check for exception or validate correct controller instantiation + if expected_exception: + with pytest.raises(expected_exception): + create_nonlinear_data_driven_mpc_controller(config, u_d, y_d) + else: + controller = create_nonlinear_data_driven_mpc_controller( + config, u_d, y_d + ) + + assert isinstance(controller, NonlinearDataDrivenMPCController) + assert controller.m == u_shape[1] + assert controller.p == y_shape[1] + assert controller.get_optimal_cost_value() is not None diff --git a/tests/test_utilities/controller/test_controller_params.py b/tests/test_utilities/controller/test_controller_params.py new file mode 100644 index 0000000..57c2326 --- /dev/null +++ b/tests/test_utilities/controller/test_controller_params.py @@ -0,0 +1,267 @@ +import copy +from typing import Any +from unittest.mock import Mock, patch + +import numpy as np +import pytest + +from direct_data_driven_mpc.lti_data_driven_mpc_controller import ( + LTIDataDrivenMPCType, + SlackVarConstraintType, +) +from direct_data_driven_mpc.nonlinear_data_driven_mpc_controller import ( + AlphaRegType, +) +from direct_data_driven_mpc.utilities.controller.controller_params import ( + LTIDataDrivenMPCParams, + NonlinearDataDrivenMPCParams, + construct_weighting_matrix, + get_lti_data_driven_mpc_controller_params, + get_nonlinear_data_driven_mpc_controller_params, + get_weights_list_from_param, + print_parameter_loading_details, +) + +LOAD_YAML_PATCH_PATH = ( + "direct_data_driven_mpc.utilities.controller.controller_params." + "load_yaml_config_params" +) + + +@pytest.mark.parametrize("scalar_weights", [True, False]) +@patch(LOAD_YAML_PATCH_PATH) +def test_get_lti_data_driven_mpc_controller_params( + mock_load_yaml: Mock, + scalar_weights: bool, + test_dd_mpc_controller_yaml_config: dict[str, Any], +) -> None: + # Define test parameters + m = 2 # Number of inputs + p = 1 # Number of outputs + loaded_params = copy.deepcopy(test_dd_mpc_controller_yaml_config) + + if scalar_weights: + loaded_params["Q_weights"] = 1.0 + loaded_params["R_weights"] = 0.1 + else: + loaded_params["Q_weights"] = [1.0] * p + loaded_params["R_weights"] = [0.1] * m + + loaded_params["U"] = [[0, 1]] * m + loaded_params["u_s"] = [0.1] * m + loaded_params["y_s"] = [1.0] * p + + # Mock return value of `load_yaml_config_params` + mock_load_yaml.return_value = loaded_params + + # Load controller parameters + params = get_lti_data_driven_mpc_controller_params( + config_file="dummy.yaml", + controller_key_value="controller_key", + m=m, + p=p, + ) + + # Validate correct parameter initialization + L = loaded_params["L"] + assert isinstance(params["Q"], np.ndarray) + assert isinstance(params["R"], np.ndarray) + assert params["Q"].shape[0] == p * L + assert params["R"].shape[0] == m * L + assert params["controller_type"] == LTIDataDrivenMPCType.NOMINAL + assert params["slack_var_constraint_type"] == SlackVarConstraintType.CONVEX + + +@pytest.mark.parametrize("ext_out_incr_in", [True, False]) +@pytest.mark.parametrize("scalar_weights", [True, False]) +@patch(LOAD_YAML_PATCH_PATH) +def test_get_nonlinear_data_driven_mpc_controller_params( + mock_load_yaml: Mock, + scalar_weights: bool, + ext_out_incr_in: bool, + test_dd_mpc_controller_yaml_config: dict[str, Any], +) -> None: + # Define test parameters + m = 2 # Number of inputs + p = 1 # Number of outputs + loaded_params = copy.deepcopy(test_dd_mpc_controller_yaml_config) + + if scalar_weights: + loaded_params["Q_weights"] = 1.0 + loaded_params["R_weights"] = 0.1 + loaded_params["S_weights"] = 0.01 + else: + loaded_params["Q_weights"] = [1.0] * p + loaded_params["R_weights"] = [0.1] * m + loaded_params["S_weights"] = [0.01] * p + + loaded_params["U"] = [[0, 1]] * m + loaded_params["y_r"] = [1.0] * p + loaded_params["ext_out_incr_in"] = ext_out_incr_in + + # Mock return value of `load_yaml_config_params` + mock_load_yaml.return_value = loaded_params + + # Load controller parameters + params = get_nonlinear_data_driven_mpc_controller_params( + config_file="dummy.yaml", + controller_key_value="controller_key", + m=m, + p=p, + ) + + # Validate correct parameter initialization + n = loaded_params["n"] + L = loaded_params["L"] + assert isinstance(params["Q"], np.ndarray) + assert isinstance(params["R"], np.ndarray) + assert isinstance(params["S"], np.ndarray) + + if ext_out_incr_in: + assert params["Q"].shape[0] == (m + p) * (L + n + 1) + else: + assert params["Q"].shape[0] == p * (L + n + 1) + + assert params["R"].shape[0] == m * (L + n + 1) + assert params["S"].shape[0] == p + + assert params["alpha_reg_type"] == AlphaRegType.APPROXIMATED + + +@patch(LOAD_YAML_PATCH_PATH) +def test_lti_param_missing_key_raises(mock_load_yaml: Mock) -> None: + # Missing required keys in config should raise `ValueError` + mock_load_yaml.return_value = {"N": 100} + + with pytest.raises(ValueError, match="Missing required parameter key"): + get_lti_data_driven_mpc_controller_params( + "dummy.yaml", "controller_key", m=1, p=1 + ) + + +@patch(LOAD_YAML_PATCH_PATH) +def test_nonlinear_param_missing_key_raises(mock_load_yaml: Mock) -> None: + # Missing required keys in config should raise `ValueError` + mock_load_yaml.return_value = {"N": 100} + + with pytest.raises(ValueError, match="Missing required parameter key"): + get_nonlinear_data_driven_mpc_controller_params( + "dummy.yaml", "controller_key", m=1, p=1 + ) + + +@pytest.mark.parametrize( + "weights_param, n_vars, horizon, expected_matrix", + [ + (2.0, 3, 2, np.kron(np.eye(2), np.diag([2.0, 2.0, 2.0]))), + ([1.0, 2.0], 2, 3, np.kron(np.eye(3), np.diag([1.0, 2.0]))), + ], +) +def test_construct_weighting_matrix_valid( + weights_param: float | list[float], + n_vars: int, + horizon: int, + expected_matrix: np.ndarray, +) -> None: + matrix = construct_weighting_matrix(weights_param, n_vars, horizon) + + np.testing.assert_array_equal(matrix, expected_matrix) + + +@pytest.mark.parametrize( + "weights_param, n_vars, expected_message", + [ + ([1.0, 2.0], 3, "Expected a list of length 3"), + ("invalid_weight", 2, "Expected a scalar or a list of length 2"), + ], +) +def test_construct_weighting_matrix_invalid( + weights_param: Any, n_vars: int, expected_message: str +) -> None: + # Verify a `ValueError` is raised with invalid parameters + with pytest.raises(ValueError, match=expected_message): + construct_weighting_matrix(weights_param, n_vars, 2) + + +@pytest.mark.parametrize( + "weights_param, size, expected", + [ + (1.0, 3, [1.0, 1.0, 1.0]), + ([0.5, 1.5], 2, [0.5, 1.5]), + ], +) +def test_get_weights_list_from_param_valid( + weights_param: float | list[float], size: int, expected: list[float] +) -> None: + result = get_weights_list_from_param(weights_param, size) + + assert result == expected + + +@pytest.mark.parametrize( + "weights_param, size, expected_message", + [ + ([1.0], 2, "Expected a scalar or a list of length 2"), + ("invalid_weight", 2, "Expected a scalar or a list of length 2"), + ], +) +def test_get_weights_list_from_param_invalid( + weights_param: Any, size: int, expected_message: str +) -> None: + # Verify a `ValueError` is raised with invalid parameters + with pytest.raises(ValueError, match=expected_message): + get_weights_list_from_param(weights_param, size) + + +@pytest.mark.parametrize("verbosity_level", [0, 1, 2]) +@pytest.mark.parametrize("dd_mpc_type", ["lti", "nonlinear"]) +def test_print_parameter_loading_details( + dd_mpc_type: str, + verbosity_level: int, + dummy_lti_controller_data: tuple[ + LTIDataDrivenMPCParams, np.ndarray, np.ndarray + ], + dummy_nonlinear_controller_data: tuple[ + NonlinearDataDrivenMPCParams, np.ndarray, np.ndarray + ], + capsys: pytest.CaptureFixture, +) -> None: + # Define test parameters + controller_params: LTIDataDrivenMPCParams | NonlinearDataDrivenMPCParams + if dd_mpc_type == "lti": + controller_params, _, _ = dummy_lti_controller_data + cost_horizon = controller_params["L"] + controller_type = controller_params["controller_type"].name + controller_label = "LTI" + else: + controller_params, _, _ = dummy_nonlinear_controller_data + cost_horizon = controller_params["L"] + controller_params["n"] + 1 + controller_label = "Nonlinear" + alpha_reg_type = controller_params["alpha_reg_type"].name + + print_parameter_loading_details( + controller_params, cost_horizon, verbosity_level, controller_label + ) + + # Capture printed output + out, _ = capsys.readouterr() + + # Verify print output based on the verbosity level + if verbosity_level == 0: + assert out == "" + elif verbosity_level == 1: + assert ( + f"Loaded {controller_label} Data-Driven MPC controller " + "parameters\n" in out + ) + else: + assert ( + f"Loaded {controller_label} Data-Driven MPC controller parameters:" + in out + ) + assert "Q weights:" in out + + if dd_mpc_type == "lti": + assert f"controller_type: {controller_type}" in out + else: + assert f"alpha_reg_type: {alpha_reg_type}" in out diff --git a/tests/test_utilities/controller/test_data_driven_mpc_sim.py b/tests/test_utilities/controller/test_data_driven_mpc_sim.py new file mode 100644 index 0000000..ffadc9f --- /dev/null +++ b/tests/test_utilities/controller/test_data_driven_mpc_sim.py @@ -0,0 +1,117 @@ +from unittest.mock import Mock + +import numpy as np +import pytest + +from direct_data_driven_mpc.lti_data_driven_mpc_controller import ( + LTIDataDrivenMPCController, +) +from direct_data_driven_mpc.nonlinear_data_driven_mpc_controller import ( + NonlinearDataDrivenMPCController, +) +from direct_data_driven_mpc.utilities.controller.data_driven_mpc_sim import ( + print_mpc_step_info, + simulate_lti_data_driven_mpc_control_loop, + simulate_nonlinear_data_driven_mpc_control_loop, +) +from direct_data_driven_mpc.utilities.models.lti_model import LTIModel +from direct_data_driven_mpc.utilities.models.nonlinear_model import ( + NonlinearSystem, +) + + +def test_sim_lti_dd_mpc_control_loop( + mock_lti_model: LTIModel, + mock_lti_controller: LTIDataDrivenMPCController, +) -> None: + # Define test parameters + m = mock_lti_model.m + p = mock_lti_model.p + n_steps = 10 + np_random = np.random.default_rng(0) + + # Test LTI data-driven MPC closed-loop simulation + u, y = simulate_lti_data_driven_mpc_control_loop( + mock_lti_model, mock_lti_controller, n_steps, np_random, verbose=0 + ) + + # Verify correct shapes of output + assert u.shape == (n_steps, m) + assert y.shape == (n_steps, p) + + +def test_sim_nonlinear_dd_mpc_control_loop( + mock_nonlinear_model: NonlinearSystem, + mock_nonlinear_controller: NonlinearDataDrivenMPCController, +) -> None: + # Define test parameters + m = mock_nonlinear_model.m + p = mock_nonlinear_model.p + n_steps = 10 + np_random = np.random.default_rng(0) + + # Test LTI data-driven MPC closed-loop simulation + u, y = simulate_nonlinear_data_driven_mpc_control_loop( + mock_nonlinear_model, + mock_nonlinear_controller, + n_steps, + np_random, + verbose=0, + ) + + # Verify correct shapes of output + assert u.shape == (n_steps, m) + assert y.shape == (n_steps, p) + + +@pytest.mark.parametrize("verbose", [0, 1, 2]) +@pytest.mark.parametrize("include_inputs", [True, False]) +def test_print_mpc_step_info( + verbose: int, include_inputs: bool, capsys: pytest.CaptureFixture +) -> None: + # Define test parameters + step = 5 + mpc_cost_val = 1.0 + y_s = np.array([1.0, 2.0]) + y_sys_k = np.array([0.5, 1.5]) + + if include_inputs: + u_s = np.array([0.0, 1.0]) + u_sys_k = np.array([-0.5, 0.5]) + else: + u_s = None + u_sys_k = None + + # Create a mock progress bar + mock_progress_bar = Mock() + + # Test print function + print_mpc_step_info( + verbose=verbose, + step=step, + mpc_cost_val=mpc_cost_val, + u_s=u_s, + u_sys_k=u_sys_k, + y_s=y_s, + y_sys_k=y_sys_k, + progress_bar=mock_progress_bar, + ) + + # Capture printed output + out, _ = capsys.readouterr() + + # Verify behavior based on the verbosity level + if verbose == 0: + assert out == "" + mock_progress_bar.set_description.assert_not_called() + elif verbose == 1: + mock_progress_bar.set_description.assert_called_once() + mock_progress_bar.update.assert_called_once() + assert out == "" + elif verbose == 2: + assert f"MPC cost value: {mpc_cost_val:>8.4f}" in out + assert "y_1e" in out + if include_inputs: + assert "u_1e" in out + else: + assert "u_1e" not in out diff --git a/tests/test_utilities/controller/test_initial_data_generation.py b/tests/test_utilities/controller/test_initial_data_generation.py new file mode 100644 index 0000000..4096e08 --- /dev/null +++ b/tests/test_utilities/controller/test_initial_data_generation.py @@ -0,0 +1,66 @@ +from typing import Any + +import numpy as np + +from direct_data_driven_mpc.utilities.controller.initial_data_generation import ( # noqa: E501 + generate_initial_input_output_data, + randomize_initial_system_state, + simulate_n_input_output_measurements, +) +from direct_data_driven_mpc.utilities.models.lti_model import ( + LTIModel, +) + + +def test_randomize_initial_system_state(mock_lti_model: LTIModel) -> None: + # Define test parameters + n = mock_lti_model.n # System order + m = mock_lti_model.m # Number of inputs + controller_config: Any = {"u_range": np.array([[-1, 1]] * m)} + np_random = np.random.default_rng(0) + + x0 = randomize_initial_system_state( + mock_lti_model, controller_config, np_random + ) + + # Verify system state has correct dimensions + assert x0.shape == (n,) + + +def test_generate_initial_input_output_data(mock_lti_model: LTIModel) -> None: + # Define test parameters + m = mock_lti_model.m # Number of inputs + p = mock_lti_model.p # Number of outputs + N = 5 # Initial data trajectory length + controller_config: Any = {"N": N, "u_range": np.array([[-1, 1]] * m)} + np_random = np.random.default_rng(0) + + u_d, y_d = generate_initial_input_output_data( + mock_lti_model, controller_config, np_random + ) + + # Verify correct input-output data dimensions + assert u_d.shape == (N, m) + assert y_d.shape == (N, p) + + # Verify input is correctly bounded + assert np.all(u_d >= -1.0) and np.all(u_d <= 1.0) + + +def test_simulate_n_input_output_measurements( + mock_lti_model: LTIModel, +) -> None: + # Define test parameters + m = mock_lti_model.m # Number of inputs + p = mock_lti_model.p # Number of outputs + n = 4 # Estimated system order + controller_config: Any = {"n": n, "u_s": np.zeros((m, 1))} + np_random = np.random.default_rng(0) + + U_n, Y_n = simulate_n_input_output_measurements( + mock_lti_model, controller_config, np_random + ) + + # Verify correct input-output data dimensions + assert U_n.shape == (n, m) + assert Y_n.shape == (n, p) diff --git a/tests/test_utilities/models/test_lti_model.py b/tests/test_utilities/models/test_lti_model.py new file mode 100644 index 0000000..ed3876d --- /dev/null +++ b/tests/test_utilities/models/test_lti_model.py @@ -0,0 +1,151 @@ +from typing import Any +from unittest.mock import Mock, patch + +import numpy as np +import pytest + +from direct_data_driven_mpc.utilities.models.lti_model import ( + LTIModel, + LTISystemModel, +) + +LOAD_YAML_PATCH_PATH = ( + "direct_data_driven_mpc.utilities.models.lti_model.load_yaml_config_params" +) + + +def test_lti_model_simulation(test_linear_system: Any) -> None: + # Define test parameters + A, B, C, D, _, _, _ = test_linear_system + + # Instantiate LTI model + model = LTIModel(A, B, C, D, eps_max=0.0) + + m = model.m # Number of inputs + p = model.p # Number of outputs + + # Test single step simulation + u = np.ones((m,)) + w = np.zeros((p,)) + expected_x = np.array([0, 1, 2]) + expected_y = np.array([0, 1]) + + y = model.simulate_step(u, w) + + np.testing.assert_allclose(model.x, expected_x) + np.testing.assert_allclose(y, expected_y) + + # Test multiple steps simulation + n_steps = 5 + U = np.ones((n_steps, m)) + W = np.zeros((n_steps, p)) + expected_x = np.array([1.13164, 1.12084, 1.72342]) + expected_Y = np.array( + [ + [8, 18], + [8.4, 20.8], + [8.3, 20.94], + [8.514, 21.352], + [8.5514, 21.4716], + ] + ) + + Y = model.simulate(U, W, n_steps) + + np.testing.assert_allclose(model.x, expected_x) + np.testing.assert_allclose(Y, expected_Y) + + +@pytest.mark.parametrize("valid_state", [True, False]) +def test_lti_model_setters(valid_state: bool, test_linear_system: Any) -> None: + # Define test parameters + A, B, C, D, _, _, _ = test_linear_system + + # Instantiate LTI model + model = LTIModel(A, B, C, D, eps_max=0.0) + + # Test model internal state setter + if valid_state: + x_set = np.array([0, 0, 0]) + + model.set_state(x_set) + + np.testing.assert_allclose(model.x, x_set) + + else: + x_set = np.array([0]) + + # Verify `set_state` raises a `ValueError` when + # given a state with incorrect dimensions + with pytest.raises(ValueError): + model.set_state(x_set) + + # Test system measurement noise setter + eps_max = 0.01 + model.set_eps_max(eps_max) + + assert model.eps_max == eps_max + + +def test_lti_model_utility_methods(test_linear_system: Any) -> None: + # Define test parameters + A, B, C, D, _, _, _ = test_linear_system + + # Instantiate LTI model + model = LTIModel(A, B, C, D, eps_max=0.0) + + n = model.n # System order + m = model.m # Number of inputs + + # Test initial state estimation + expected_estimated_x = np.zeros((n, 1)) + U_n = np.ones((n, m)).reshape(-1, 1) + Y = np.array([[0, 1], [8, 18], [8.4, 20.8]]).reshape(-1, 1) + + estimated_x = model.get_initial_state_from_trajectory(U_n, Y) + + np.testing.assert_allclose(estimated_x, expected_estimated_x) + + # Test calculation of equilibrium output from input + test_u_eq = np.ones((m,)) + expected_y_eq = np.array([8.54945055, 21.48351648]) + + y_eq = model.get_equilibrium_output_from_input(test_u_eq) + + np.testing.assert_allclose(y_eq, expected_y_eq) + + # Test calculation of equilibrium input from output + # (Round-trip test using calculated `y_eq`) + u_eq = model.get_equilibrium_input_from_output(y_eq) + + np.testing.assert_allclose(u_eq, test_u_eq) + + +@pytest.mark.parametrize("valid_inputs", [True, False]) +@patch(LOAD_YAML_PATCH_PATH) +def test_lti_system_model( + mock_load_yaml: Mock, valid_inputs: bool, test_linear_system: Any +) -> None: + # Define test parameters + A, B, C, D, t, _, _ = test_linear_system + + loaded_model_params = {"A": A, "B": B, "C": C, "D": D, "eps_max": 0.01} + + if not valid_inputs: + loaded_model_params["A"] = np.eye(1) + loaded_model_params["B"] = np.eye(5) + + # Mock return value of `load_yaml_config_params` + mock_load_yaml.return_value = loaded_model_params + + # Check for exception or validate correct controller instantiation + if valid_inputs: + # Instantiate LTI model + system_model = LTISystemModel("dummy_path.yaml", "model_key") + + assert system_model.A.shape == A.shape + assert system_model.eps_max == 0.01 + assert isinstance(system_model, LTIModel) + else: + with pytest.raises(ValueError): + system_model = LTISystemModel("dummy_path.yaml", "model_key") diff --git a/tests/test_utilities/models/test_nonlinear_model.py b/tests/test_utilities/models/test_nonlinear_model.py new file mode 100644 index 0000000..c638c4c --- /dev/null +++ b/tests/test_utilities/models/test_nonlinear_model.py @@ -0,0 +1,55 @@ +import numpy as np + +from direct_data_driven_mpc.utilities.models.nonlinear_model import ( + NonlinearSystem, +) + + +def test_nonlinear_model_simulation() -> None: + # Define dynamics function + def f(x: np.ndarray, u: np.ndarray) -> np.ndarray: + return np.sin(x) + u + + # Define output function + def h(x: np.ndarray, u: np.ndarray) -> np.ndarray: + return x**2 + u + + # Define test parameters + n = 2 + m = 2 + p = 2 + eps_max = 0.0 + + # Instantiate nonlinear model + model = NonlinearSystem(f, h, n, m, p, eps_max) + + # Test single step simulation + u = np.ones((m,)) + w = np.zeros((p,)) + expected_x = np.array([1, 1]) + expected_y = np.array([1, 1]) + + y = model.simulate_step(u, w) + + np.testing.assert_allclose(model.x, expected_x) + np.testing.assert_allclose(y, expected_y) + + # Test multiple steps simulation + n_steps = 5 + U = np.ones((n_steps, m)) + W = np.zeros((n_steps, p)) + expected_x = np.array([1.93321866, 1.93321866]) + expected_Y = np.array( + [ + [2, 2], + [4.39101539, 4.39101539], + [4.85568853, 4.85568853], + [4.70117209, 4.70117209], + [4.75709853, 4.75709853], + ] + ) + + Y = model.simulate(U, W, n_steps) + + np.testing.assert_allclose(model.x, expected_x) + np.testing.assert_allclose(Y, expected_Y) diff --git a/tests/test_utilities/test_data_visualization.py b/tests/test_utilities/test_data_visualization.py new file mode 100644 index 0000000..38f6c75 --- /dev/null +++ b/tests/test_utilities/test_data_visualization.py @@ -0,0 +1,463 @@ +import os +from pathlib import Path +from typing import Any + +import matplotlib +import matplotlib.pyplot as plt +import numpy as np +import pytest +from matplotlib.animation import FuncAnimation +from matplotlib.figure import Figure +from matplotlib.legend import Legend +from matplotlib.lines import Line2D +from matplotlib.patches import Rectangle +from matplotlib.text import Text + +from direct_data_driven_mpc.utilities.data_visualization import ( + HandlerInitMeasurementRect, + create_input_output_figure, + initialize_data_animation, + plot_data, + plot_input_output, + plot_input_output_animation, + save_animation, + update_data_animation, +) + +matplotlib.use("Agg") # Prevent GUI backend + + +@pytest.mark.parametrize("include_u_s", [True, False]) +def test_plot_input_output( + include_u_s: bool, dummy_plot_data: tuple[np.ndarray, ...] +) -> None: + # Define test parameters + u_k, y_k, u_s, y_s = dummy_plot_data + T, m = u_k.shape + _, p = y_k.shape + + # Test input-output plot + try: + if include_u_s: + plot_input_output(u_k=u_k, y_k=y_k, u_s=u_s, y_s=y_s) + else: + plot_input_output(u_k=u_k, y_k=y_k, y_s=y_s) + + except Exception as e: + pytest.fail( + f"`plot_input_output` (`include_u_s` = {include_u_s}) raised an " + f"exception: {e}" + ) + + # Get current figure and axes + fig = plt.gcf() + axs = fig.axes + + # Verify correct number of input-output subplots + assert len(axs) == m + p + + # Verify that input data is correctly plotted + for j, ax in enumerate(axs[:m]): + # Check input line data + lines = ax.get_lines() + np.testing.assert_equal(np.asarray(lines[0].get_ydata()), u_k[:, j]) + + # Check input setpoint line data if included + if include_u_s: + expected_u_s = np.full_like(u_k[:, j], u_s[j, 0]) + np.testing.assert_equal( + np.asarray(lines[1].get_ydata()), expected_u_s + ) + + # Verify that output data is correctly plotted + for j, ax in enumerate(axs[m:]): + # Check input line data + lines = ax.get_lines() + np.testing.assert_equal(np.asarray(lines[0].get_ydata()), y_k[:, j]) + + # Check input setpoint line data if included + expected_y_s = np.full_like(y_k[:, j], y_s[j, 0]) + np.testing.assert_equal(np.asarray(lines[1].get_ydata()), expected_y_s) + + plt.close("all") + + +def test_plot_input_output_mismatched_dim_len( + dummy_plot_data: tuple[np.ndarray, ...], +) -> None: + u_k, y_k, u_s, y_s = dummy_plot_data + m = u_k.shape[1] + p = y_k.shape[1] + + # Verify `ValueError` is raised on dimension + # mismatch between input-output arrays + with pytest.raises(ValueError, match="Dimension mismatch"): + plot_input_output(u_k=u_k[:-1], y_k=y_k, y_s=y_s) + + # Verify `ValueError` is raised on dimension + # mismatch between setpoint arrays + with pytest.raises(ValueError, match="Dimension mismatch"): + plot_input_output(u_k=u_k, y_k=y_k, y_s=y_s[:1]) + + with pytest.raises(ValueError, match="Dimension mismatch"): + plot_input_output(u_k=u_k, y_k=y_k, u_s=u_s[:1], y_s=y_s) + + # Verify `ValueError` is raised when bounds or + # y-limits list lengths mismatch + mismatch_cases = [ + ("u_bounds_list", {"u_bounds_list": [(0.0, 1.0)] * (m + 1)}), + ("y_bounds_list", {"y_bounds_list": [(0.0, 1.0)] * (p + 1)}), + ("u_ylimits_list", {"u_ylimits_list": [(0.0, 1.0)] * (m + 1)}), + ("y_ylimits_list", {"y_ylimits_list": [(0.0, 1.0)] * (p + 1)}), + ] + + for param_name, kwargs in mismatch_cases: + with pytest.raises(ValueError, match=f"{param_name}.*does not match"): + # Prevent mypy [arg-type] error + safe_kwargs: dict[str, Any] = kwargs + + plot_input_output( + u_k=u_k, + y_k=y_k, + u_s=u_s, + y_s=y_s, + **safe_kwargs, + ) + + plt.close("all") + + +@pytest.mark.parametrize("highlight_initial_steps", [True, False]) +@pytest.mark.parametrize("plot_bounds", [True, False]) +def test_plot_data(plot_bounds: bool, highlight_initial_steps: bool) -> None: + # Define test parameters + fig, ax = plt.subplots() + T = 50 + data = np.linspace(0, 1, T) + setpoint = np.array([0.7]) + var_symbol = "u" + setpoint_var_symbol = "u^s" + data_label = "_test" + bounds = (0.2, 0.8) if plot_bounds else None + initial_steps = 10 if highlight_initial_steps else None + + plot_data( + axis=ax, + data=data, + setpoint=setpoint, + index=0, + data_line_params={"color": "blue"}, + setpoint_line_params={"color": "green", "linestyle": "--"}, + bounds_line_params={"color": "red", "linestyle": ":"}, + var_symbol=var_symbol, + setpoint_var_symbol=setpoint_var_symbol, + var_label="Input", + data_label=data_label, + initial_text="Init", + control_text="Control", + display_initial_text=True, + display_control_text=True, + fontsize=10, + legend_params={"fontsize": 8, "loc": "upper right"}, + fig=fig, + bounds=bounds, + initial_steps=initial_steps, + plot_ylimits=(0.0, 1.0), + ) + + # Verify plot lines are correctly plotted + lines = ax.get_lines() + + # Expected lines: data, setpoint, 2 x bounds, initial steps line + expected_lines = 2 + 2 * int(plot_bounds) + int(highlight_initial_steps) + assert len(lines) == expected_lines + + # Verify data is correctly plotted + line_data_label_tuples: list[tuple[np.ndarray, str]] = [ + (data, f"${var_symbol}_1${data_label}"), + (np.full(T, setpoint), f"${setpoint_var_symbol}_1$"), + ] + + for line_data, line_label in line_data_label_tuples: + data_line = next( + (line for line in lines if line.get_label() == line_label), None + ) + + assert data_line is not None + + x_data, y_data = data_line.get_data() + np.testing.assert_equal(x_data, np.arange(T)) + np.testing.assert_equal(y_data, line_data) + + # Check legend + legend = ax.get_legend() + labels = [text.get_text() for text in legend.get_texts()] + assert f"${var_symbol}_1${data_label}" in labels + assert f"${setpoint_var_symbol}_1$" in labels + + if plot_bounds: + assert "Constraints" in labels + + # Check initial steps highlighting elements if enabled + if highlight_initial_steps: + # Check rectangle (axvspan) for initial steps + rects = ax.patches + assert len(rects) > 0 + + assert isinstance(rects[0], Rectangle) # Ensure mypy infers rect type + rect = rects[0] + assert rect.get_x() == 0 + assert rect.get_width() == initial_steps + + # Check vertical dashed line (axvline) + for line in lines: + pattern = getattr(line, "_dash_pattern", None) + if pattern is not None: + if pattern == (0.0, [5.0, 5.0]): + assert np.asarray(line.get_xdata())[0] == initial_steps + + plt.close(fig) + + +@pytest.mark.parametrize("continuous_updates", [True, False]) +@pytest.mark.parametrize("highlight_initial_steps", [True, False]) +def test_plot_input_output_animation_return( + highlight_initial_steps: bool, + continuous_updates: bool, + dummy_plot_data: tuple[np.ndarray, ...], +) -> None: + # Define test parameters + u_k, y_k, u_s, y_s = dummy_plot_data + initial_steps = 10 if highlight_initial_steps else None + + anim = plot_input_output_animation( + u_k=u_k, + y_k=y_k, + u_s=u_s, + y_s=y_s, + initial_steps=initial_steps, + continuous_updates=continuous_updates, + ) + + assert isinstance(anim, FuncAnimation) + + plt.close("all") + + +@pytest.mark.parametrize("continuous_updates", [True, False]) +@pytest.mark.parametrize("highlight_initial_steps", [True, False]) +@pytest.mark.parametrize("plot_bounds", [True, False]) +def test_initialize_data_animation( + plot_bounds: bool, + highlight_initial_steps: bool, + continuous_updates: bool, +) -> None: + # Define test parameters + fig, ax = plt.subplots() + T = 50 + data = np.sin(np.linspace(0, 2 * np.pi, T)) + setpoint = np.array([0.5]) + bounds = (0.2, 0.8) if plot_bounds else None + initial_steps = 10 if highlight_initial_steps else None + + # Initialize plot element storage lists + lines: list[Line2D] = [] + rects: list[Rectangle] = [] + right_lines: list[Line2D] = [] + left_lines: list[Line2D] = [] + init_texts: list[Text] = [] + control_texts: list[Text] = [] + y_centers: list[float] = [] + + # Test animation plot element initialization + initialize_data_animation( + axis=ax, + data=data, + setpoint=setpoint, + index=0, + data_line_params={"color": "blue"}, + setpoint_line_params={"color": "green"}, + bounds_line_params={"color": "red"}, + var_symbol="u", + setpoint_var_symbol="u^s", + var_label="Input", + initial_text="Init", + control_text="Control", + fontsize=10, + legend_params={}, + lines=lines, + rects=rects, + right_rect_lines=right_lines, + left_rect_lines=left_lines, + init_texts=init_texts, + control_texts=control_texts, + y_axis_centers=y_centers, + bounds=bounds, + initial_steps=initial_steps, + initial_steps_label="Init period", + continuous_updates=continuous_updates, + ) + + # Verify plot objects are correctly created and stored in lists + assert len(lines) == 1 + assert len(y_centers) == 1 + + if initial_steps: + assert len(rects) == 1 + assert len(right_lines) == 1 + assert len(init_texts) == 1 + assert len(control_texts) == 1 + + if continuous_updates: + assert len(left_lines) == 1 + else: + assert len(left_lines) == 0 + else: + assert len(rects) == 0 + assert len(right_lines) == 0 + assert len(init_texts) == 0 + assert len(control_texts) == 0 + assert len(left_lines) == 0 + + plt.close(fig) + + +@pytest.mark.parametrize("continuous_updates", [True, False]) +@pytest.mark.parametrize("highlight_initial_steps", [True, False]) +def test_update_data_animation( + highlight_initial_steps: bool, + continuous_updates: bool, +) -> None: + # Define test parameters + fig, ax = plt.subplots() + T = 25 + index = 20 + data = np.sin(np.linspace(0, 1, T)) + initial_steps = 10 if highlight_initial_steps else None + + # Create dummy plot elements + line = Line2D([], []) + rect = Rectangle((0, 0), 0, 1) + right_line = Line2D([0], [0]) + left_line = Line2D([0], [0]) + init_text = ax.text(0, 0, "Init") + control_text = ax.text(0, 0, "Control") + + ax.add_line(line) + ax.add_patch(rect) + ax.add_line(right_line) + ax.add_line(left_line) + + # Test data animation update + update_data_animation( + index=index, + data=data, + data_length=T, + points_per_frame=1, + initial_steps=initial_steps, + continuous_updates=continuous_updates, + line=line, + rect=rect, + y_axis_center=0.5, + right_rect_line=right_line, + left_rect_line=left_line, + init_text_obj=init_text, + control_text_obj=control_text, + display_initial_text=True, + display_control_text=True, + init_text_width=5, + control_text_width=5, + ) + + # Verify updated line data + x_line, y_line = line.get_data() + assert len(np.asarray(x_line)) == index + 1 + np.testing.assert_equal(np.asarray(y_line), data[: index + 1]) + + if initial_steps: + # Check rectangle updated position and width + if continuous_updates: + assert rect.get_width() == initial_steps + assert rect.get_x() == index - initial_steps + else: + assert rect.get_width() == 0 + assert rect.get_x() == 0 + + # Check if boundary lines are at their expected position + if continuous_updates: + assert np.asarray(right_line.get_xdata())[0] == index + assert ( + np.asarray(left_line.get_xdata())[0] == index - initial_steps + ) + else: + assert np.asarray(left_line.get_xdata())[0] == 0 + assert np.asarray(right_line.get_xdata())[0] == 0 + + # Check label visibility + assert init_text.get_visible() + assert control_text.get_visible() + + plt.close(fig) + + +def test_save_animation(tmp_path: Path) -> None: + # Define test parameters + fig, _ = plt.subplots() + + def dummy_update(frame: int) -> list: + return [] + + anim = FuncAnimation(fig, dummy_update, frames=10) + file_path = os.path.join(tmp_path, "anim.gif") + + # Test save animation + save_animation( + anim, total_frames=10, fps=30, bitrate=2000, file_path=file_path + ) + + # Assert file exists (animation file was created) + assert os.path.isfile(file_path) + + plt.close(fig) + + +def test_create_input_output_figure() -> None: + # Define test parameters + m = 2 + p = 3 + + fig, axs_u, axs_y = create_input_output_figure( + m, p, figsize=(10, 6), dpi=100, fontsize=12, title="Test Fig" + ) + + # Ensure figure is created with correct number of subplots + assert isinstance(fig, Figure) + assert len(axs_u) == m + assert len(axs_y) == p + for ax in list(axs_u) + list(axs_y): + assert hasattr(ax, "plot") + + plt.close(fig) + + +def test_custom_legend_handler() -> None: + # Define test parameters + handler = HandlerInitMeasurementRect() + fig, ax = plt.subplots() + dummy_legend = Legend(ax, handles=[], labels=[]) + dummy_rect = Rectangle((0, 0), 1, 1, facecolor="gray", alpha=0.2) + + legend_artists = handler.create_artists( + legend=dummy_legend, + orig_handle=dummy_rect, + xdescent=0, + ydescent=0, + width=1, + height=1, + fontsize=10, + trans=plt.gca().transData, + ) + + assert any(isinstance(artist, Rectangle) for artist in legend_artists) + + plt.close(fig) diff --git a/tests/test_utilities/test_hankel_matrix.py b/tests/test_utilities/test_hankel_matrix.py new file mode 100644 index 0000000..21b4a76 --- /dev/null +++ b/tests/test_utilities/test_hankel_matrix.py @@ -0,0 +1,55 @@ +import numpy as np +import pytest + +from direct_data_driven_mpc.utilities.hankel_matrix import ( + evaluate_persistent_excitation, + hankel_matrix, +) + + +def test_hankel_matrix() -> None: + # Define test parameters + X = np.array([[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]) + L = 2 + expected_hankel = np.array( + [ + [0, 2, 4, 6], + [1, 3, 5, 7], + [2, 4, 6, 8], + [3, 5, 7, 9], + ] + ) + + # Construct hankel matrix + hankel = hankel_matrix(X, L) + + # Verify the resulting hankel matrix has correct shape and values + assert hankel.shape == expected_hankel.shape + np.testing.assert_array_equal(hankel, expected_hankel) + + +@pytest.mark.parametrize("pers_exc", [True, False]) +def test_evaluate_persistent_excitation(pers_exc: bool) -> None: + # Define test parameters + N = 5 + n = 2 + expected_order = 2 + + # Generate input based on the desired persistent excitation condition + if pers_exc: + X = np.random.uniform(-1, 1, (N, n)) + else: + X = np.ones((N, n)) + + # Evaluate persistent excitation + hankel_rank, is_pers_exc = evaluate_persistent_excitation( + X, expected_order + ) + + # Verify Hankel matrix rank and persistent excitation status + if pers_exc: + assert hankel_rank >= expected_order + assert is_pers_exc + else: + assert hankel_rank < expected_order + assert not is_pers_exc diff --git a/tests/test_utilities/test_initial_state_estimation.py b/tests/test_utilities/test_initial_state_estimation.py new file mode 100644 index 0000000..764a025 --- /dev/null +++ b/tests/test_utilities/test_initial_state_estimation.py @@ -0,0 +1,114 @@ +from typing import Any + +import numpy as np + +from direct_data_driven_mpc.utilities.initial_state_estimation import ( + calculate_equilibrium_input_from_output, + calculate_equilibrium_output_from_input, + estimate_initial_state, + observability_matrix, + toeplitz_input_output_matrix, +) + + +def test_observability_matrix(test_linear_system: Any) -> None: + # Define test parameters + A, _, C, _, _, Ot, _ = test_linear_system + expected_observability = Ot + + # Construct observability matrix + obs_matrix = observability_matrix(A, C) + + # Verify the resulting observability matrix has correct shape and values + assert obs_matrix.shape == expected_observability.shape + np.testing.assert_allclose(obs_matrix, expected_observability) + + +def test_toeplitz_input_output_matrix(test_linear_system: Any) -> None: + # Define test parameters + A, B, C, D, t, _, Tt = test_linear_system + expected_toeplitz = Tt + + # Construct Toeplitz matrix + toeplitz_matrix = toeplitz_input_output_matrix(A, B, C, D, t) + + # Verify the resulting Toeplitz matrix has correct shape and values + assert toeplitz_matrix.shape == expected_toeplitz.shape + np.testing.assert_allclose(toeplitz_matrix, expected_toeplitz) + + +def test_estimate_initial_state(test_linear_system: Any) -> None: + # Define test parameters + A, B, C, D, _, Ot, Tt = test_linear_system + x0_true = np.array([0.1, 1.5, -0.5]) # Known initial state + np_random = np.random.default_rng(0) + + # Simulate linear system + n_steps = A.shape[0] + u_k = np_random.uniform(-1.0, 1.0, (n_steps, B.shape[1])) + y_k = np.zeros((n_steps, C.shape[0])) + x = x0_true + + for k in range(n_steps): + y_k[k, :] = C @ x + D @ u_k[k, :] + x = A @ x + B @ u_k[k, :] + + # Calculate estimated initial state + x0_estimate = estimate_initial_state( + Ot=Ot, Tt=Tt, U=u_k.flatten(), Y=y_k.flatten() + ) + + # Verify that the estimated value is close to the simulated one + np.testing.assert_allclose(x0_estimate, x0_true) + + +def test_calculate_equilibrium_output_from_input( + test_linear_system: Any, +) -> None: + # Define test parameters + A, B, C, D, _, _, _ = test_linear_system + u_eq = np.array([0.5]) # Equilibrium input + + # Simulate linear system for a large number of steps to reach equilibrium + n_steps = 15 + u_k = np.tile(u_eq, n_steps).reshape(-1, 1) + y_k = np.zeros((n_steps, C.shape[0])) + x = np.zeros(A.shape[0]) + + for k in range(n_steps): + y_k[k, :] = C @ x + D @ u_k[k, :] + x = A @ x + B @ u_k[k, :] + + y_eq_sim = y_k[-1, :] + + # Calculate equilibrium output + y_eq = calculate_equilibrium_output_from_input(A, B, C, D, u_eq) + + # Verify that the estimated value is close to the simulated one + np.testing.assert_allclose(y_eq, y_eq_sim) + + +def test_calculate_equilibrium_input_from_output( + test_linear_system: Any, +) -> None: + # Define test parameters + A, B, C, D, _, _, _ = test_linear_system + expected_u_eq = np.array([0.5]) # Expected equilibrium input + + # Simulate linear system for a large number of steps to reach equilibrium + n_steps = 15 + u_k = np.tile(expected_u_eq, n_steps).reshape(-1, 1) + y_k = np.zeros((n_steps, C.shape[0])) + x = np.zeros(A.shape[0]) + + for k in range(n_steps): + y_k[k, :] = C @ x + D @ u_k[k, :] + x = A @ x + B @ u_k[k, :] + + y_eq_sim = y_k[-1, :] + + # Calculate equilibrium input + u_eq = calculate_equilibrium_input_from_output(A, B, C, D, y_eq_sim) + + # Verify that the estimated value is close to the simulated one + np.testing.assert_allclose(u_eq, expected_u_eq) diff --git a/tests/test_utilities/test_yaml_config_loading.py b/tests/test_utilities/test_yaml_config_loading.py new file mode 100644 index 0000000..081cf65 --- /dev/null +++ b/tests/test_utilities/test_yaml_config_loading.py @@ -0,0 +1,39 @@ +from pathlib import Path + +import pytest +import yaml + +from direct_data_driven_mpc.utilities.yaml_config_loading import ( + load_yaml_config_params, +) + + +def test_load_yaml_valid_key(tmp_path: Path) -> None: + # Create test YAML file + config_data = {"controller_key": {"param_1": 1, "param_2": 2}} + config_file = tmp_path / "test_config.yaml" + config_file.write_text(yaml.dump(config_data)) + + # Load and retrieve parameters from config file + result = load_yaml_config_params(str(config_file), "controller_key") + + # Verify parameters are correctly loaded + assert result == {"param_1": 1, "param_2": 2} + + +def test_load_yaml_missing_file() -> None: + # Verify that `FileNotFoundError` is raised when given a non-existent file + with pytest.raises(FileNotFoundError): + load_yaml_config_params("nonexistent.yaml", "key") + + +def test_load_yaml_missing_key(tmp_path: Path) -> None: + # Create test YAML file + config_data = {"different_key": 123} + config_file = tmp_path / "config.yaml" + config_file.write_text(yaml.dump(config_data)) + + # Verify that `ValueError` is raised when the + # specified controller key is missing + with pytest.raises(ValueError, match="Missing `controller` value"): + load_yaml_config_params(str(config_file), "controller") From e8f59b35eeecf4183708d8489eb23e13d98b3c96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A1vel=20A=2E=20Campos-Pe=C3=B1a?= Date: Sun, 1 Jun 2025 00:18:19 -0500 Subject: [PATCH 2/6] Fix restrictive Hankel matrix persistent excitation check Modify the persistent excitation condition evaluation in `evaluate_persistent_excitation` to check for `rank >= n * order` insted of requiring an exact match. This aligns with the theoretical definition, allowing signals that are more "exciting" than necessary to pass this check. --- direct_data_driven_mpc/utilities/hankel_matrix.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/direct_data_driven_mpc/utilities/hankel_matrix.py b/direct_data_driven_mpc/utilities/hankel_matrix.py index 13e3174..86ecbcf 100644 --- a/direct_data_driven_mpc/utilities/hankel_matrix.py +++ b/direct_data_driven_mpc/utilities/hankel_matrix.py @@ -62,7 +62,8 @@ def evaluate_persistent_excitation( sequence of `N` elements, each of length `n`. This is determined by checking if the rank of the Hankel matrix - constructed from `X` is equal to the expected rank `n * order`. + constructed from `X` is greater than or equal to the expected rank + `n * order`. Args: X (np.ndarray): Input data matrix of shape (N, n), where N is the @@ -81,7 +82,7 @@ def evaluate_persistent_excitation( # Calculate the Hankel matrix order rank_H_order = np.linalg.matrix_rank(H_order) - # Evaluate the persistently exiting nature of X - pers_exciting = rank_H_order == n * (order) + # Check if X is persistently exciting of the given order + pers_exciting = rank_H_order >= n * order return rank_H_order, pers_exciting From 5e606c90214d11d033a1c7a0f34f62eaa9a491dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A1vel=20A=2E=20Campos-Pe=C3=B1a?= Date: Mon, 2 Jun 2025 01:09:45 -0500 Subject: [PATCH 3/6] Fix initial steps highlighting in animation plots Modify plot functions to only select initial step plot elements (`rect`, `right_rect_line`, `left_rect_line`, `init_text`, `control_text`) when initial steps highlighting is enabled. Previously, these elements were always selected from their storage lists, causing errors when they were not initialized (e.g., when initial steps highlighting was disabled). --- .../utilities/data_visualization.py | 125 ++++++++++++------ 1 file changed, 83 insertions(+), 42 deletions(-) diff --git a/direct_data_driven_mpc/utilities/data_visualization.py b/direct_data_driven_mpc/utilities/data_visualization.py index 0bce44f..cf54cbc 100644 --- a/direct_data_driven_mpc/utilities/data_visualization.py +++ b/direct_data_driven_mpc/utilities/data_visualization.py @@ -856,22 +856,27 @@ def plot_input_output_animation( legend_loc="lower right", ) - # Get initial text bounding box width - init_text_width_input = get_text_width_in_data( - text_object=u_init_texts[0], axis=axs_u[0], fig=fig - ) - init_text_width_output = get_text_width_in_data( - text_object=y_init_texts[0], axis=axs_y[0], fig=fig - ) + # Calculate text bounding box width for initial and + # control data if initial steps highlighting is enabled + init_text_width = 0.0 + control_text_width = 0.0 + if initial_steps: + # Get initial text bounding box width + init_text_width_input = get_text_width_in_data( + text_object=u_init_texts[0], axis=axs_u[0], fig=fig + ) + init_text_width_output = get_text_width_in_data( + text_object=y_init_texts[0], axis=axs_y[0], fig=fig + ) - # Calculate maximum text width between input and - # output labels to show them at the same time - init_text_width = max(init_text_width_input, init_text_width_output) + # Calculate maximum text width between input and + # output labels to show them at the same time + init_text_width = max(init_text_width_input, init_text_width_output) - # Get control text bounding box width - control_text_width = get_text_width_in_data( - text_object=u_control_texts[0], axis=axs_u[0], fig=fig - ) + # Get control text bounding box width + control_text_width = get_text_width_in_data( + text_object=u_control_texts[0], axis=axs_u[0], fig=fig + ) # Animation update function def update(frame: int) -> list[Any]: @@ -881,11 +886,26 @@ def update(frame: int) -> list[Any]: # Update input plot data for i in range(m): - # Get lower boundary line of the initial measurement region - # if continuous updates are enabled - u_left_rect_line = ( - u_left_rect_lines[i] if continuous_updates else None - ) + # Get initial step plot elements if + # initial steps highlighting is enabled + if initial_steps: + u_rect = u_rects[i] + u_right_rect_line = u_right_rect_lines[i] + u_init_text = u_init_texts[i] + u_control_text = u_control_texts[i] + + # Get lower boundary line of the initial measurement + # region if continuous updates are enabled + u_left_rect_line = ( + u_left_rect_lines[i] if continuous_updates else None + ) + else: + u_rect = None + u_right_rect_line = None + u_init_text = None + u_control_text = None + u_left_rect_line = None + update_data_animation( index=current_index, data=u_k[: current_index + 1, i], @@ -894,12 +914,12 @@ def update(frame: int) -> list[Any]: initial_steps=initial_steps, continuous_updates=continuous_updates, line=u_lines[i], - rect=u_rects[i], + rect=u_rect, y_axis_center=u_y_axis_centers[i], - right_rect_line=u_right_rect_lines[i], + right_rect_line=u_right_rect_line, left_rect_line=u_left_rect_line, - init_text_obj=u_init_texts[i], - control_text_obj=u_control_texts[i], + init_text_obj=u_init_text, + control_text_obj=u_control_text, display_initial_text=display_initial_text, display_control_text=display_control_text, init_text_width=init_text_width, @@ -908,11 +928,26 @@ def update(frame: int) -> list[Any]: # Update output plot data for j in range(p): - # Get lower boundary line of the initial measurement region - # if continuous updates are enabled - y_left_rect_line = ( - y_left_rect_lines[j] if continuous_updates else None - ) + # Get initial step plot elements if + # initial steps highlighting is enabled + if initial_steps: + y_rect = y_rects[j] + y_right_rect_line = y_right_rect_lines[j] + y_init_text = y_init_texts[j] + y_control_text = y_control_texts[j] + + # Get lower boundary line of the initial measurement + # region if continuous updates are enabled + y_left_rect_line = ( + y_left_rect_lines[j] if continuous_updates else None + ) + else: + y_rect = None + y_right_rect_line = None + y_init_text = None + y_control_text = None + y_left_rect_line = None + update_data_animation( index=current_index, data=y_k[: current_index + 1, j], @@ -921,12 +956,12 @@ def update(frame: int) -> list[Any]: initial_steps=initial_steps, continuous_updates=continuous_updates, line=y_lines[j], - rect=y_rects[j], + rect=y_rect, y_axis_center=y_y_axis_centers[j], - right_rect_line=y_right_rect_lines[j], + right_rect_line=y_right_rect_line, left_rect_line=y_left_rect_line, - init_text_obj=y_init_texts[j], - control_text_obj=y_control_texts[j], + init_text_obj=y_init_text, + control_text_obj=y_control_text, display_initial_text=display_initial_text, display_control_text=display_control_text, init_text_width=init_text_width, @@ -1228,12 +1263,12 @@ def update_data_animation( initial_steps: int | None, continuous_updates: bool, line: Line2D, - rect: Rectangle, + rect: Rectangle | None, y_axis_center: float, - right_rect_line: Line2D, + right_rect_line: Line2D | None, left_rect_line: Line2D | None, - init_text_obj: Text, - control_text_obj: Text, + init_text_obj: Text | None, + control_text_obj: Text | None, display_initial_text: bool, display_control_text: bool, init_text_width: float, @@ -1263,17 +1298,17 @@ def update_data_animation( highlight should move with the latest data to represent continuous input-output measurement updates. line (Line2D): The plot line corresponding to the data series plot. - rect (Rectangle): The rectangle representing the initial measurement - region. + rect (Rectangle | None): The rectangle representing the initial + measurement region. y_axis_center (float): The y-axis center of the plot axis. - right_rect_line (Line2D): The line object representing the upper + right_rect_line (Line2D | None): The line object representing the upper boundary of the initial measurement region. left_rect_line (Line2D | None]): The line object representing the lower boundary of the initial measurement region. - init_text_obj (Text): The text object containing the initial + init_text_obj (Text | None): The text object containing the initial measurement period label. - control_text_obj (Text): The text object containing the control period - label. + control_text_obj (Text | None): The text object containing the control + period label. display_initial_text (bool): Whether to display the `initial_text` label on the plot. display_control_text (bool): Whether to display the `control_text` @@ -1301,6 +1336,12 @@ def update_data_animation( min(index, initial_steps) if not continuous_updates else index ) + # Ensure type safety for static checking + assert rect is not None + assert right_rect_line is not None + assert init_text_obj is not None + assert control_text_obj is not None + # Update rectangle width rect_width = min(lim_index, initial_steps) rect.set_width(rect_width) From 7896c7a0ecb251ff2416a3a6f53e65e5da01c459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A1vel=20A=2E=20Campos-Pe=C3=B1a?= Date: Wed, 4 Jun 2025 03:43:08 -0500 Subject: [PATCH 4/6] Remove `c` parameter from nonlinear controller TypedDict Remove the `c` parameter from `NonlinearDataDrivenMPCParams`. This parameter was mistakenly included and is not used by the nonlinear data-driven MPC controller. --- direct_data_driven_mpc/utilities/controller/controller_params.py | 1 - 1 file changed, 1 deletion(-) diff --git a/direct_data_driven_mpc/utilities/controller/controller_params.py b/direct_data_driven_mpc/utilities/controller/controller_params.py index 69cadb4..cca3410 100644 --- a/direct_data_driven_mpc/utilities/controller/controller_params.py +++ b/direct_data_driven_mpc/utilities/controller/controller_params.py @@ -75,7 +75,6 @@ class NonlinearDataDrivenMPCParams(TypedDict, total=False): lamb_alpha: float # Regularization parameter for alpha lamb_sigma: float # Regularization parameter for sigma - c: float # Convex slack variable constraint constant U: np.ndarray # Bounds for the predicted input Us: np.ndarray # Bounds for the predicted input setpoint From a4b16728374173aa9e4cd64a008f2d80d085af33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A1vel=20A=2E=20Campos-Pe=C3=B1a?= Date: Wed, 4 Jun 2025 04:44:19 -0500 Subject: [PATCH 5/6] Use `python:3.12-slim-bullseye` container in CI workflow Set the CI workflow container to `python:3.12-slim-bullseye`, which includes `gcc` and other essential build tools required by libraries used during test execution. --- .github/workflows/ci_workflow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci_workflow.yml b/.github/workflows/ci_workflow.yml index 9a63f46..b4a9f36 100644 --- a/.github/workflows/ci_workflow.yml +++ b/.github/workflows/ci_workflow.yml @@ -10,7 +10,7 @@ on: jobs: ci_workflow: runs-on: ubuntu-latest - container: python:3.12-slim + container: python:3.12-slim-bullseye steps: - name: Checkout uses: actions/checkout@v4 From f6bb0cdbcf3d49e10c6e14d24c3e64251dc47412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A1vel=20A=2E=20Campos-Pe=C3=B1a?= Date: Wed, 4 Jun 2025 04:48:23 -0500 Subject: [PATCH 6/6] Add ffmpeg installation step in CI workflow Install ffmpeg in the CI workflow runner to support saving animation files during tests. This prevents the following error during CI workflow tests: "FileNotFoundError: [Errno 2] No such file or directory: 'ffmpeg'" --- .github/workflows/ci_workflow.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/ci_workflow.yml b/.github/workflows/ci_workflow.yml index b4a9f36..7327873 100644 --- a/.github/workflows/ci_workflow.yml +++ b/.github/workflows/ci_workflow.yml @@ -15,6 +15,11 @@ jobs: - name: Checkout uses: actions/checkout@v4 + - name: Install ffmpeg + run: | + apt-get update && \ + apt-get install -y ffmpeg + - name: Install dependencies run: | python -m pip install --upgrade pip