diff --git a/pyomo/contrib/parmest/examples/rooney_biegler/regularization_example.py b/pyomo/contrib/parmest/examples/rooney_biegler/regularization_example.py new file mode 100644 index 00000000000..8ddc3868997 --- /dev/null +++ b/pyomo/contrib/parmest/examples/rooney_biegler/regularization_example.py @@ -0,0 +1,77 @@ +# ____________________________________________________________________________________ +# +# Pyomo: Python Optimization Modeling Objects +# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC +# Under the terms of Contract DE-NA0003525 with National Technology and Engineering +# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this +# software. This software is distributed under the 3-clause BSD License. +# ____________________________________________________________________________________ + +from pyomo.common.dependencies import pandas as pd +import pyomo.contrib.parmest.parmest as parmest +from pyomo.contrib.parmest.examples.rooney_biegler.rooney_biegler import ( + RooneyBieglerExperiment, +) + + +def main(): + + # Rooney & Biegler Reference Values + # a = 19.14, b = 0.53 + theta_ref = pd.Series({'asymptote': 20.0, 'rate_constant': 0.8}) + + # Create a 'Stiff' Prior for 'asymptote' but leave 'rate_constant' flexible + prior_FIM = pd.DataFrame( + [[1000.0, 0.0], [0.0, 0.0]], + index=['asymptote', 'rate_constant'], + columns=['asymptote', 'rate_constant'], + ) + + # Data + data = pd.DataFrame( + data=[[1, 8.3], [2, 10.3], [3, 19.0], [4, 16.0], [5, 15.6], [7, 19.8]], + columns=['hour', 'y'], + ) + + # Create an experiment list + exp_list = [] + for i in range(data.shape[0]): + exp_list.append(RooneyBieglerExperiment(data.loc[i, :])) + + # Create an instance of the parmest estimator + pest = parmest.Estimator( + exp_list, + obj_function="SSE", + regularization='L2', + prior_FIM=prior_FIM, + theta_ref=theta_ref, + regularization_weight=None, + ) + + # Parameter estimation and covariance + obj, 
theta = pest.theta_est() + cov = pest.cov_est() + + if parmest.graphics.seaborn_available: + parmest.graphics.pairwise_plot( + (theta, cov, 100), + theta_star=theta, + alpha=0.8, + distributions=['MVN'], + title='Theta estimates within 80% confidence region', + ) + + # Assert statements compare parameter estimation (theta) to an expected value + # relative_error = abs(theta['asymptote'] - 19.1426) / 19.1426 + # assert relative_error < 0.01 + # relative_error = abs(theta['rate_constant'] - 0.5311) / 0.5311 + # assert relative_error < 0.01 + + return obj, theta, cov + + +if __name__ == "__main__": + obj, theta, cov = main() + print("Estimated parameters (theta):", theta) + print("Objective function value at theta:", obj) + print("Covariance of parameter estimates:", cov) diff --git a/pyomo/contrib/parmest/parmest.py b/pyomo/contrib/parmest/parmest.py index 21138d472da..aa85acc28a0 100644 --- a/pyomo/contrib/parmest/parmest.py +++ b/pyomo/contrib/parmest/parmest.py @@ -297,6 +297,140 @@ def SSE_weighted(model): ) +def _calculate_L2_penalty(model, prior_FIM, theta_ref=None): + """ + Calculates objective + (theta - theta_ref)^T * prior_FIM * (theta - theta_ref) + using label-based alignment for safety and subsets for efficiency. + + Parameters + ---------- + model : ConcreteModel + Annotated Pyomo model + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix from previous experimental design + theta_ref: pd.Series, optional + Reference parameter values used in regularization. If None, defaults to the current parameter values in the model. + regularization_weight: float, optional + Weighting factor for the regularization term. If None, it is automatically calculated as the ratio + of the objective value to the L2 term value at the current parameter values to balance their magnitudes. + obj_function: callable, optional + Built-in objective function selected by the user, e.g., `SSE`. Default is `SSE`. 
+
+    Returns
+    -------
+    expr : Pyomo expression
+        Expression representing the L2-regularized objective addition
+    """
+
+    # Get current model parameters
+    # We assume model.unknown_parameters is a list of Pyomo Var objects
+    current_param_names = [p.name for p in model.unknown_parameters]
+    param_map = {p.name: p for p in model.unknown_parameters}
+
+    # Confirm all the parameters in the prior_FIM columns are in the model parameters
+    common_params = [p for p in current_param_names if p in prior_FIM.columns]
+
+    if len(common_params) == 0:
+
+        logger.warning(
+            "Warning: No matching parameters found between Model and Prior FIM. Returning standard objective."
+        )
+
+        return 0.0  # No regularization if no common parameters
+
+    elif len(common_params) < len(prior_FIM.columns):
+
+        logger.warning(
+            "Warning: Only a subset of parameters in the Prior FIM match the Model parameters. "
+            "Regularization will be applied only to the matching subset."
+        )
+    else:
+
+        logger.info(
+            "All parameters in the Prior FIM match the Model parameters. Regularization will be applied to all parameters."
+        )
+
+    # Slice the dataframes to ONLY the common subset
+    sub_FIM = prior_FIM.loc[common_params, common_params]
+
+    # Reference theta, subset to the common parameters. If theta_ref is None,
+    # fall back to the initialized model parameter values as the reference.
+    if theta_ref is not None:
+        sub_theta = theta_ref.loc[common_params]
+    else:
+        sub_theta = pd.Series(0.0, index=common_params)
+        # Fill from the current model values so the penalty is zero at the start
+        for param in common_params:
+            sub_theta.loc[param] = pyo.value(param_map[param])
+        logger.info(
+            "theta_ref is None. Using initialized parameter values as reference."
+        )
+
+    # Construct the Quadratic Form (The L2 term)
+    # @Reviewers: This can be calculated in a loop or in a vector form.
+    # Are there any issues with the vectorized form that we should be aware of for Pyomo expressions?
+ # The loop form is more transparent but the vectorized form is more efficient and concise. + + # l2_expr = 0.0 + # for i in common_params: + # di = name_to_var[i] - float(theta0.loc[i]) + # for j in common_params: + # qij = float(sub_FIM.loc[i, j]) + # if qij != 0.0: + # dj = name_to_var[j] - float(theta0.loc[j]) + # l2_expr += qij * di * dj + + # Create the (theta - theta_ref) vector, ensuring alignment by parameter name + delta_params = np.array( + [param_map[p] - sub_theta.loc[p] for p in common_params] + ) # (theta - theta_ref) vector + + # Compute the quadratic form: delta^T * FIM * delta + l2_term = delta_params.T @ sub_FIM.values @ delta_params + l2_term *= 0.5 # apply the 0.5 convention for quadratic regularization + + return l2_term + + +def L2_regularized_objective( + model, prior_FIM, theta_ref=None, regularization_weight=1.0, obj_function=SSE +): + """ + Calculates objective + (theta - theta_ref)^T * prior_FIM * (theta - theta_ref) + using label-based alignment for safety and subsets for efficiency. + + Parameters + ---------- + model : ConcreteModel + Annotated Pyomo model + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix from previous experimental design + theta_ref: pd.Series, optional + Reference parameter values used in regularization. If None, defaults to the current parameter values in the model. + regularization_weight: float, optional + Weighting factor for the regularization term. If None, it is automatically calculated as the ratio + of the objective value to the L2 term value at the current parameter values to balance their magnitudes. + obj_function: callable, optional + Built-in objective function selected by the user, e.g., `SSE`. Default is `SSE`. 
+ + Returns + ------- + expr : Pyomo expression + Expression representing the L2-regularized objective + """ + + # Calculate the L2 penalty term + l2_term = _calculate_L2_penalty(model, prior_FIM, theta_ref) + + # Combine with objective + object_expr = obj_function(model) + + # Calculate the regularized objective + l2reg_objective = object_expr + (regularization_weight * l2_term) + + return l2reg_objective + + def _check_model_labels(model): """ Checks if the annotated Pyomo model contains the necessary suffixes @@ -375,6 +509,10 @@ class ObjectiveType(Enum): SSE_weighted = "SSE_weighted" +class RegularizationType(Enum): + L2 = "L2" + + # Compute the Jacobian matrix of measured variables with respect to the parameters def _compute_jacobian(experiment, theta_vals, step, solver, tee): """ @@ -474,6 +612,8 @@ def compute_covariance_matrix( solver, tee, estimated_var=None, + prior_FIM=None, + regularization_weight=None, ): """ Computes the covariance matrix of the estimated parameters using @@ -502,7 +642,13 @@ def compute_covariance_matrix( Value of the estimated variance of the measurement error in cases where the user does not supply the measurement error standard deviation - + prior_FIM: pd.DataFrame, optional + Prior Fisher Information Matrix from previous experimental design + to be added to the FIM of the current experiments for covariance estimation. + The prior_FIM should be a square matrix with parameter names as both + row and column labels. + regularization_weight: float, optional + Weighting factor for the regularization term. Default is 1.0. Returns ------- cov : pd.DataFrame @@ -540,6 +686,25 @@ def compute_covariance_matrix( FIM = np.sum(FIM_all_exp, axis=0) + # Add prior_FIM if including regularization. We expand the prior FIM to match the size of the + # FIM and align it based on parameter names to ensure correct addition. 
+ # Add the prior FIM to the FIM of the current experiments, weighted by the regularization weight + if prior_FIM is not None: + if regularization_weight is None: + raise ValueError( + "regularization_weight must be provided when prior_FIM is used." + ) + expanded_prior_FIM = _expand_prior_FIM( + experiment_list[0], prior_FIM # theta_vals + ) # sanity check and alignment + + # Check that the prior FIM shape is the same as the FIM shape + if expanded_prior_FIM.shape != FIM.shape: + raise ValueError( + "The shape of the prior FIM must be the same as the shape of the FIM." + ) + FIM += expanded_prior_FIM * regularization_weight + # calculate the covariance matrix try: cov = np.linalg.inv(FIM) @@ -745,6 +910,44 @@ def _kaug_FIM(experiment, obj_function, theta_vals, solver, tee, estimated_var=N return FIM +def _expand_prior_FIM(experiment, prior_FIM): + """ + Expands the prior FIM to match the size of the FIM of the current experiment + + Parameters + ---------- + experiment : Experiment class + Experiment class object that contains the Pyomo model + for a particular experimental condition + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix from previous experimental design + + Returns + ------- + expanded_prior_FIM : pd.DataFrame + Expanded prior FIM with the same size as the FIM of the current experiment + """ + model = _get_labeled_model(experiment) + + # Extract parameter names from the Pyomo model + param_names = [param.name for param in model.unknown_parameters] + + # 1. Expand Prior FIM + # We reindex to match param_names. Parameters not in prior_FIM + # will be filled with 0, which maintains the PSD property. + expanded_prior_FIM = prior_FIM.reindex( + index=param_names, columns=param_names, fill_value=0.0 + ) + + # 3. Sanity Check: Positive Semi-Definiteness + # An FIM must have non-negative eigenvalues to be valid for optimization. 
+ eigenvalues = np.linalg.eigvals(expanded_prior_FIM.values) + if np.any(eigenvalues < -1e-10): # Tolerance for numerical noise + raise ValueError("The expanded prior FIM is not positive semi-definite.") + + return expanded_prior_FIM + + class Estimator: """ Parameter estimation class @@ -768,6 +971,22 @@ class Estimator: solver_options: dict, optional Provides options to the solver (also the name of an attribute). Default is None. + + Added keyword arguments for L2 regularization: + regularization: string or function, optional + Built-in regularization type ("L2") or custom function used to + formulate the regularization term. If no regularization is specified, + no regularization term is added to the objective. Default is None. + prior_FIM: pd.DataFrame, optional + Prior Fisher Information Matrix from previous experimental design + to be added to the FIM of the current experiments for regularization. + The prior_FIM should be a square matrix with parameter names as both + row and column labels. + theta_ref: pd.Series, optional + Reference parameter values used in regularization. + If None, defaults to the current parameter values in the model. + regularization_weight: float, optional + Weighting factor for the regularization term. Default is 1.0. """ # The singledispatchmethod decorator is used here as a deprecation @@ -780,6 +999,10 @@ def __init__( self, experiment_list, obj_function=None, + regularization=None, + prior_FIM=None, + theta_ref=None, + regularization_weight=None, tee=False, diagnostic_mode=False, solver_options=None, @@ -810,10 +1033,26 @@ def __init__( else: self.obj_function = obj_function + if isinstance(regularization, str): + try: + self.regularization = RegularizationType(regularization) + except ValueError: + raise ValueError( + f"Invalid regularization type: '{regularization}'. " + f"Choose from: {[e.value for e in RegularizationType]}." 
+ ) + else: + self.regularization = regularization + self.tee = tee self.diagnostic_mode = diagnostic_mode self.solver_options = solver_options + # Added keyword arguments for L2 regularization + self.prior_FIM = prior_FIM + self.theta_ref = theta_ref + self.regularization_weight = regularization_weight + # TODO: delete this when the deprecated interface is removed self.pest_deprecated = None @@ -909,6 +1148,63 @@ def _expand_indexed_unknowns(self, model_temp): return model_theta_list + # Reviewers: Put in architecture to calculate a regularization weight based on the current parameter values and the prior FIM. + # However, if the prior_FIM is properly defined, this should not be necessary. Are there any use cases for this where we should give + # the user a scaling option, or remove and trust the prior_FIM to be properly scaled? + + # def _calc_regularization_weight(self, solver='ipopt'): + # """ + # Calculate regularization weight as the ratio of the objective value to the L2 term value at the current parameter values to balance their magnitudes. + # """ + # # Solve the model at the current parameter values to get the objective function value + # sse_vals = [] + # for experiment in self.exp_list: + # model = _get_labeled_model(experiment) + + # # fix the value of the unknown parameters to the estimated values + # for param in model.unknown_parameters: + # param.fix(pyo.value(param)) + + # # re-solve the model with the estimated parameters + # results = pyo.SolverFactory(solver).solve(model, tee=self.tee) + # assert_optimal_termination(results) + + # # choose and evaluate the sum of squared errors expression + # if self.obj_function == ObjectiveType.SSE: + # sse_expr = SSE(model) + # elif self.obj_function == ObjectiveType.SSE_weighted: + # sse_expr = SSE_weighted(model) + # else: + # raise ValueError( + # f"Invalid objective function for covariance calculation. 
" + # f"The covariance matrix can only be calculated using the built-in " + # f"objective functions: {[e.value for e in ObjectiveType]}. Supply " + # f"the Estimator object one of these built-in objectives and " + # f"re-run the code." + # ) + # l2_expr = _calculate_L2_penalty(model, self.prior_FIM, self.theta_ref) + + # # evaluate the numerical SSE and store it + # sse_val = pyo.value(sse_expr) + # sse_vals.append(sse_val) + + # sse = sum(sse_vals) + + # if l2_expr is None: + # l2_value = 0 + # else: + # l2_value = pyo.value(l2_expr) + + # if l2_value == 0: + # logger.warning( + # "L2 penalty is zero at the current parameter values. Regularization weight set to 1.0 by default." + # ) + # return 1.0 + + # reg_weight = float(pyo.value(sse) / (pyo.value(l2_value))) + # logger.info(f"Calculated regularization weight: {reg_weight}") + # return reg_weight + def _create_parmest_model(self, experiment_number): """ Modify the Pyomo model for parameter estimation @@ -939,12 +1235,34 @@ def _create_parmest_model(self, experiment_number): # TODO, this needs to be turned into an enum class of options that still support # custom functions - if self.obj_function is ObjectiveType.SSE: - second_stage_rule = SSE - self.covariance_objective = second_stage_rule - elif self.obj_function is ObjectiveType.SSE_weighted: - second_stage_rule = SSE_weighted - self.covariance_objective = second_stage_rule + + if self.obj_function in ObjectiveType: + + if self.obj_function == ObjectiveType.SSE: + self.covariance_objective = SSE + + elif self.obj_function == ObjectiveType.SSE_weighted: + self.covariance_objective = SSE_weighted + + if RegularizationType.L2 and self.prior_FIM is not None: + + if self.regularization_weight is None: + self.regularization_weight = 1.0 + + # self.regularization_weight = self._calc_regularization_weight( + # solver='ipopt' + # ) + + second_stage_rule = lambda m: L2_regularized_objective( + m, + prior_FIM=self.prior_FIM, + theta_ref=self.theta_ref, + 
regularization_weight=self.regularization_weight, + obj_function=self.covariance_objective, + ) + else: + second_stage_rule = self.covariance_objective + else: # A custom function uses model.experiment_outputs as data second_stage_rule = self.obj_function @@ -1298,6 +1616,8 @@ def _cov_at_theta(self, method, solver, step): method, obj_function=self.covariance_objective, theta_vals=self.estimated_theta, + prior_FIM=self.prior_FIM, + regularization_weight=self.regularization_weight, solver=solver, step=step, tee=self.tee, @@ -1328,7 +1648,10 @@ def _cov_at_theta(self, method, solver, step): solver=solver, step=step, tee=self.tee, + prior_FIM=self.prior_FIM, + regularization_weight=self.regularization_weight, ) + else: raise ValueError( "One or more values of the measurement errors have " @@ -1366,6 +1689,8 @@ def _cov_at_theta(self, method, solver, step): method, obj_function=self.covariance_objective, theta_vals=self.estimated_theta, + prior_FIM=self.prior_FIM, + regularization_weight=self.regularization_weight, step=step, solver=solver, tee=self.tee, @@ -1610,6 +1935,8 @@ def _get_sample_list(self, samplesize, num_samples, replacement=True): return samplelist + # @Reviewers: Currently regularization chosen with objective at Estimator initialization, + # Would it be preferable to have regularization choice as an argument in the theta_est function instead? def theta_est( self, solver="ef_ipopt", return_values=[], calc_cov=NOTSET, cov_n=NOTSET ):