diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index cd7271fec..f4bbe2326 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -151,6 +151,7 @@ def _build_model(self) -> pd.DataFrame: horizon=self.spec.horizon, target_column=self.original_target_column, dt_column=self.spec.datetime_column.name, + postprocessing=self.spec.postprocessing, ) Parallel(n_jobs=-1, require="sharedmem")( diff --git a/ads/opctl/operator/lowcode/forecast/model/automlx.py b/ads/opctl/operator/lowcode/forecast/model/automlx.py index e1730ac0a..ae8e4812d 100644 --- a/ads/opctl/operator/lowcode/forecast/model/automlx.py +++ b/ads/opctl/operator/lowcode/forecast/model/automlx.py @@ -100,6 +100,7 @@ def _build_model(self) -> pd.DataFrame: horizon=self.spec.horizon, target_column=self.original_target_column, dt_column=self.spec.datetime_column.name, + postprocessing=self.spec.postprocessing, ) # Clean up kwargs for pass through diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py index 65af79cc5..879346c2e 100644 --- a/ads/opctl/operator/lowcode/forecast/model/autots.py +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -54,6 +54,7 @@ def _build_model(self) -> pd.DataFrame: horizon=self.spec.horizon, target_column=self.original_target_column, dt_column=self.spec.datetime_column.name, + postprocessing=self.spec.postprocessing, ) try: model = self.loaded_models if self.loaded_models is not None else None diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index 3019b6839..9829bec62 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -5,6 +5,7 @@ from typing import Dict, List +import numpy as np import pandas as pd from ads.opctl import logger @@ -18,13 +19,15 @@ get_frequency_of_datetime, ) -from ..const import ForecastOutputColumns, SupportedModels, TROUBLESHOOTING_GUIDE -from ..operator_config import ForecastOperatorConfig +from ..const import TROUBLESHOOTING_GUIDE, ForecastOutputColumns, SupportedModels +from ..operator_config import ForecastOperatorConfig, PostprocessingSteps class HistoricalData(AbstractData): def __init__(self, spec, historical_data=None, subset=None): - super().__init__(spec=spec, name="historical_data", data=historical_data, subset=subset) + super().__init__( + spec=spec, name="historical_data", data=historical_data, subset=subset + ) self.subset = subset def _ingest_data(self, spec): @@ -49,15 +52,19 @@ def _verify_dt_col(self, spec): f"{SupportedModels.AutoMLX} requires data with a frequency of at least one hour. Please try using a different model," " or select the 'auto' option." ) - raise InvalidParameterError(f"{message}" - f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps.") + raise InvalidParameterError( + f"{message}" + f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." + ) class AdditionalData(AbstractData): def __init__(self, spec, historical_data, additional_data=None, subset=None): self.subset = subset if additional_data is not None: - super().__init__(spec=spec, name="additional_data", data=additional_data, subset=subset) + super().__init__( + spec=spec, name="additional_data", data=additional_data, subset=subset + ) self.additional_regressors = list(self.data.columns) elif spec.additional_data is not None: super().__init__(spec=spec, name="additional_data", subset=subset) @@ -70,7 +77,7 @@ def __init__(self, spec, historical_data, additional_data=None, subset=None): ) elif historical_data.get_max_time() != add_dates[-(spec.horizon + 1)]: raise DataMismatchError( - f"The Additional Data must be present for all historical data and the entire horizon. The Historical Data ends on {historical_data.get_max_time()}. The additonal data horizon starts after {add_dates[-(spec.horizon+1)]}. These should be the same date." + f"The Additional Data must be present for all historical data and the entire horizon. The Historical Data ends on {historical_data.get_max_time()}. The additonal data horizon starts after {add_dates[-(spec.horizon + 1)]}. These should be the same date." f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) else: @@ -150,7 +157,9 @@ def __init__( self._datetime_column_name = config.spec.datetime_column.name self._target_col = config.spec.target_column if historical_data is not None: - self.historical_data = HistoricalData(config.spec, historical_data, subset=subset) + self.historical_data = HistoricalData( + config.spec, historical_data, subset=subset + ) self.additional_data = AdditionalData( config.spec, self.historical_data, additional_data, subset=subset ) @@ -276,6 +285,7 @@ def __init__( horizon: int, target_column: str, dt_column: str, + postprocessing: PostprocessingSteps, ): """Forecast Output contains all the details required to generate the forecast.csv output file. @@ -285,12 +295,14 @@ def __init__( horizon: int length of horizon target_column: str the name of the original target column dt_column: the name of the original datetime column + postprocessing: postprocessing steps to be executed """ self.series_id_map = {} self._set_ci_column_names(confidence_interval_width) self.horizon = horizon self.target_column_name = target_column self.dt_column_name = dt_column + self.postprocessing = postprocessing def add_series_id( self, @@ -337,6 +349,12 @@ def populate_series_output( -------- None """ + min_threshold, max_threshold = ( + self.postprocessing.set_min_forecast, + self.postprocessing.set_max_forecast, + ) + if min_threshold is not None or max_threshold is not None: + np.clip(forecast_val, min_threshold, max_threshold, out=forecast_val) try: output_i = self.series_id_map[series_id] except KeyError as e: @@ -422,9 +440,9 @@ def _set_ci_column_names(self, confidence_interval_width): def _check_forecast_format(self, forecast): assert isinstance(forecast, pd.DataFrame) - assert ( - len(forecast.columns) == 7 - ), f"Expected just 7 columns, but got: {forecast.columns}" + assert len(forecast.columns) == 7, ( + f"Expected just 7 columns, but got: {forecast.columns}" + ) assert ForecastOutputColumns.DATE in forecast.columns assert ForecastOutputColumns.SERIES in forecast.columns assert ForecastOutputColumns.INPUT_VALUE in forecast.columns @@ -506,16 +524,30 @@ def set_errors_dict(self, errors_dict: Dict): def get_errors_dict(self): return getattr(self, "errors_dict", None) - def merge(self, other: 'ForecastResults'): + def merge(self, other: "ForecastResults"): """Merge another ForecastResults object into this one.""" # Merge DataFrames if they exist, else just set for attr in [ - 'forecast', 'metrics', 'test_metrics', 'local_explanations', 'global_explanations', 'model_parameters', 'models', 'errors_dict']: + "forecast", + "metrics", + "test_metrics", + "local_explanations", + "global_explanations", + "model_parameters", + "models", + "errors_dict", + ]: val_self = getattr(self, attr, None) val_other = getattr(other, attr, None) if val_self is not None and val_other is not None: - if isinstance(val_self, pd.DataFrame) and isinstance(val_other, pd.DataFrame): - setattr(self, attr, pd.concat([val_self, val_other], ignore_index=True, axis=0)) + if isinstance(val_self, pd.DataFrame) and isinstance( + val_other, pd.DataFrame + ): + setattr( + self, + attr, + pd.concat([val_self, val_other], ignore_index=True, axis=0), + ) elif isinstance(val_self, dict) and isinstance(val_other, dict): val_self.update(val_other) setattr(self, attr, val_self) diff --git a/ads/opctl/operator/lowcode/forecast/model/ml_forecast.py b/ads/opctl/operator/lowcode/forecast/model/ml_forecast.py index 05c0dd606..8bfd0440b 100644 --- a/ads/opctl/operator/lowcode/forecast/model/ml_forecast.py +++ b/ads/opctl/operator/lowcode/forecast/model/ml_forecast.py @@ -182,6 +182,7 @@ def _build_model(self) -> pd.DataFrame: horizon=self.spec.horizon, target_column=self.original_target_column, dt_column=self.date_col, + postprocessing=self.spec.postprocessing, ) self._train_model(data_train, data_test, model_kwargs) return self.forecast_output.get_forecast_long() diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index f3a7a33f6..5ca4ac9bd 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -234,6 +234,7 @@ def _build_model(self) -> pd.DataFrame: horizon=self.spec.horizon, target_column=self.original_target_column, dt_column=self.spec.datetime_column.name, + postprocessing=self.spec.postprocessing, ) for i, (s_id, df) in enumerate(full_data_dict.items()): diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index c77d1e78f..29b4f72b1 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -198,6 +198,7 @@ def _build_model(self) -> pd.DataFrame: horizon=self.spec.horizon, target_column=self.original_target_column, dt_column=self.spec.datetime_column.name, + postprocessing=self.spec.postprocessing, ) Parallel(n_jobs=-1, require="sharedmem")( diff --git a/ads/opctl/operator/lowcode/forecast/model_evaluator.py b/ads/opctl/operator/lowcode/forecast/model_evaluator.py index d5986703e..42e33af12 100644 --- a/ads/opctl/operator/lowcode/forecast/model_evaluator.py +++ b/ads/opctl/operator/lowcode/forecast/model_evaluator.py @@ -149,6 +149,7 @@ def create_operator_config( backtest_spec["output_directory"] = {"url": output_file_path} backtest_spec["target_category_columns"] = [DataColumns.Series] backtest_spec["generate_explanations"] = False + backtest_spec.pop('postprocessing', None) cleaned_config = self.remove_none_values(backtest_op_config_draft) backtest_op_config = ForecastOperatorConfig.from_dict(obj_dict=cleaned_config) @@ -233,6 +234,7 @@ def find_best_model( nonempty_metrics = { model: metric for model, metric in metrics.items() if metric != {} } + avg_backtests_metric = { model: sum(value.values()) / len(value.values()) for model, value in nonempty_metrics.items() diff --git a/ads/opctl/operator/lowcode/forecast/operator_config.py b/ads/opctl/operator/lowcode/forecast/operator_config.py index 23ec5b959..0813fce7e 100644 --- a/ads/opctl/operator/lowcode/forecast/operator_config.py +++ b/ads/opctl/operator/lowcode/forecast/operator_config.py @@ -76,6 +76,14 @@ class PreprocessingSteps(DataClassSerializable): outlier_treatment: bool = True +@dataclass(repr=True) +class PostprocessingSteps(DataClassSerializable): + """Class representing postprocessing steps for operator.""" + + set_min_forecast: int = None + set_max_forecast: int = None + + @dataclass(repr=True) class DataPreprocessor(DataClassSerializable): """Class representing operator specification preprocessing details.""" @@ -110,6 +118,7 @@ class ForecastOperatorSpec(DataClassSerializable): local_explanation_filename: str = None target_column: str = None preprocessing: DataPreprocessor = field(default_factory=DataPreprocessor) + postprocessing: PostprocessingSteps = field(default_factory=PostprocessingSteps) datetime_column: DateTimeColumn = field(default_factory=DateTimeColumn) target_category_columns: List[str] = field(default_factory=list) generate_report: bool = None @@ -146,6 +155,11 @@ def __post_init__(self): if self.preprocessing is not None else DataPreprocessor(enabled=True) ) + self.postprocessing = ( + self.postprocessing + if self.postprocessing is not None + else PostprocessingSteps() + ) # For Report Generation. When user doesn't specify defaults to True self.generate_report = ( self.generate_report if self.generate_report is not None else True diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index fe7c90df5..2b2067790 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -329,6 +329,21 @@ spec: required: false default: false + postprocessing: + type: dict + required: false + schema: + set_min_forecast: + type: integer + required: false + meta: + description: "This can be used to define the minimum forecast in the output." + set_max_forecast: + type: integer + required: false + meta: + description: "This can be used to define the maximum forecast in the output." + generate_explanations: type: boolean required: false diff --git a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst index dc0ee92de..a2d8c0c06 100644 --- a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst +++ b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst @@ -137,7 +137,7 @@ Below is an example of a ``forecast.yaml`` file with every parameter specified: - string - No - prophet - - Model to use. Options: prophet, arima, neuralprophet, automlx, autots, auto-select. + - Model to use. Options: prophet, arima, neuralprophet, automlx, autots, auto-select, auto-select-series. * - model_kwargs - dict @@ -163,6 +163,18 @@ Below is an example of a ``forecast.yaml`` file with every parameter specified: - false - Handle outliers. + * - postprocessing.set_min_forecast + - integer + - No + - + - This can be used to define the minimum forecast in the output. + + * - postprocessing.set_max_forecast + - integer + - No + - + - This can be used to define the maximum forecast in the output. + * - generate_explanations - boolean - No @@ -266,7 +278,7 @@ Further Description * **format**: (Optional) Specify the format for output data (e.g., ``csv``, ``json``, ``excel``). * **options**: (Optional) Include any additional arguments, such as connection parameters for storage. - * **model**: (Optional) The name of the model framework to use. Defaults to ``auto-select``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``autots``, and ``auto-select``. + * **model**: (Optional) The name of the model framework to use. Defaults to ``prophet``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``automlx``, ``autots``, ``auto-select``, and ``auto-select-series``. * **model_kwargs**: (Optional) A dictionary of arguments to pass directly to the model framework, allowing for detailed control over modeling. @@ -282,6 +294,10 @@ Further Description * **preprocessing**: (Optional) Controls preprocessing and feature engineering steps. This can be enabled or disabled using the ``enabled`` flag. The default is ``true``. * **steps**: (Optional) Specific preprocessing steps, such as ``missing_value_imputation`` and ``outlier_treatment``, which are enabled by default. + * **postprocessing**: (Optional) Controls postprocessing steps. + * **set_min_forecast**: (Optional) This can be used to define the minimum forecast in the output. + * **set_max_forecast**: (Optional) This can be used to define the maximum forecast in the output. + * **metric**: (Optional) The metric to select during model evaluation. Options include ``MAPE``, ``RMSE``, ``MSE``, and ``SMAPE``. The default is ``MAPE``. * **confidence_interval_width**: (Optional) The width of the confidence interval to calculate in the forecast. The default is 0.80, indicating an 80% confidence interval. @@ -324,4 +340,4 @@ Further Description * **cool_down_in_seconds**: The cooldown period (in seconds) to wait before performing another scaling action. * **scaling_metric**: The metric used for scaling actions. e.g. ``CPU_UTILIZATION`` or ``MEMORY_UTILIZATION`` * **scale_in_threshold**: The utilization percentage below which the instances will scale in (reduce). - * **scale_out_threshold**: The utilization percentage above which the instances will scale out (increase). + * **scale_out_threshold**: The utilization percentage above which the instances will scale out (increase). \ No newline at end of file diff --git a/tests/operators/forecast/test_datasets.py b/tests/operators/forecast/test_datasets.py index 8460bbea7..379f77b37 100644 --- a/tests/operators/forecast/test_datasets.py +++ b/tests/operators/forecast/test_datasets.py @@ -413,5 +413,42 @@ def run_operator( # generate_train_metrics = True +def test_postprocessing_clipping(): + """Tests the postprocessing clipping of forecast values.""" + df = pd.DataFrame( + { + "Date": pd.to_datetime(pd.date_range("2023-01-01", periods=20, freq="D")), + "Y": range(0, 40, 2), + } + ) + + min_clip = 40 + + max_clip = 42 + + with tempfile.TemporaryDirectory() as tmpdirname: + output_data_path = f"{tmpdirname}/results" + yaml_i = deepcopy(TEMPLATE_YAML) + yaml_i["spec"]["model"] = "prophet" + yaml_i["spec"]["historical_data"].pop("url") + yaml_i["spec"]["historical_data"]["data"] = df + yaml_i["spec"]["target_column"] = "Y" + yaml_i["spec"]["datetime_column"]["name"] = DATETIME_COL + yaml_i["spec"]["horizon"] = 5 + yaml_i["spec"]["output_directory"]["url"] = output_data_path + yaml_i["spec"]["postprocessing"] = { + "set_min_forecast": min_clip, + "set_max_forecast": max_clip, + } + + operator_config = ForecastOperatorConfig.from_dict(yaml_i) + forecast_operate(operator_config) + + forecast_df = pd.read_csv(f"{output_data_path}/forecast.csv") + + assert forecast_df["forecast_value"].min() >= min_clip + assert forecast_df["forecast_value"].max() <= max_clip + + if __name__ == "__main__": pass