diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/dau_desktop.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/dau_desktop.yaml
index 5ba432ea..28d423f7 100644
--- a/jobs/kpi-forecasting/kpi_forecasting/configs/dau_desktop.yaml
+++ b/jobs/kpi-forecasting/kpi_forecasting/configs/dau_desktop.yaml
@@ -24,7 +24,5 @@ summarize:
 
 write_results:
   project: "moz-fx-data-shared-prod"
-  project_legacy: "moz-fx-data-shared-prod"
   dataset: "telemetry_derived"
-  dataset_legacy: "telemetry_derived"
   table: "kpi_forecasts_v0"
diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/dau_mobile.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/dau_mobile.yaml
index 74889971..89885c15 100644
--- a/jobs/kpi-forecasting/kpi_forecasting/configs/dau_mobile.yaml
+++ b/jobs/kpi-forecasting/kpi_forecasting/configs/dau_mobile.yaml
@@ -24,7 +24,5 @@ summarize:
 
 write_results:
   project: "moz-fx-data-shared-prod"
-  project_legacy: "moz-fx-data-shared-prod"
   dataset: "telemetry_derived"
-  dataset_legacy: "telemetry_derived"
   table: "kpi_forecasts_v0"
diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/prophet_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/prophet_forecast.py
index b8539dab..32b8b687 100644
--- a/jobs/kpi-forecasting/kpi_forecasting/models/prophet_forecast.py
+++ b/jobs/kpi-forecasting/kpi_forecasting/models/prophet_forecast.py
@@ -1,4 +1,3 @@
-import json
 import pandas as pd
 from pandas.api import types as pd_types
 import prophet
@@ -6,7 +5,6 @@
 
 from typing import Dict, List
-from datetime import datetime, timezone
 from dataclasses import dataclass
 from kpi_forecasting.models.base_forecast import BaseForecast
 from kpi_forecasting import pandas_extras as pdx
 
@@ -69,87 +67,6 @@ def _validate_forecast_df(self, df) -> None:
                     f" but column {i} has type {df[i].dtypes}."
                 )
 
-    def _predict_legacy(self) -> pd.DataFrame:
-        """
-        Recreate the legacy format used in
-        `moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_v1`.
-        """
-        # TODO: This method should be removed once the forecasting data model is updated:
-        # https://mozilla-hub.atlassian.net/browse/DS-2676
-
-        df = self.model.predict(
-            self.dates_to_predict.rename(columns=self.column_names_map)
-        )
-
-        # set legacy column values
-        if "dau" in self.metric_hub.alias.lower():
-            df["metric"] = "DAU"
-        else:
-            df["metric"] = self.metric_hub.alias
-
-        df["forecast_date"] = str(
-            datetime.now(timezone.utc).replace(tzinfo=None).date()
-        )
-        df["forecast_parameters"] = str(
-            json.dumps({**self.parameters, "holidays": self.use_holidays})
-        )
-
-        alias = self.metric_hub.alias.lower()
-
-        if ("desktop" in alias) and ("mobile" in alias):
-            raise ValueError(
-                "Metric Hub alias must include either 'desktop' or 'mobile', not both."
-            )
-        elif "desktop" in alias:
-            df["target"] = "desktop"
-        elif "mobile" in alias:
-            df["target"] = "mobile"
-        else:
-            raise ValueError(
-                "Metric Hub alias must include either 'desktop' or 'mobile'."
-            )
-
-        columns = [
-            "ds",
-            "trend",
-            "yhat_lower",
-            "yhat_upper",
-            "trend_lower",
-            "trend_upper",
-            "additive_terms",
-            "additive_terms_lower",
-            "additive_terms_upper",
-            "extra_regressors_additive",
-            "extra_regressors_additive_lower",
-            "extra_regressors_additive_upper",
-            "holidays",
-            "holidays_lower",
-            "holidays_upper",
-            "regressor_00",
-            "regressor_00_lower",
-            "regressor_00_upper",
-            "weekly",
-            "weekly_lower",
-            "weekly_upper",
-            "yearly",
-            "yearly_lower",
-            "yearly_upper",
-            "multiplicative_terms",
-            "multiplicative_terms_lower",
-            "multiplicative_terms_upper",
-            "yhat",
-            "target",
-            "forecast_date",
-            "forecast_parameters",
-            "metric",
-        ]
-
-        for column in columns:
-            if column not in df.columns:
-                df[column] = 0.0
-
-        return df[columns]
-
     def _combine_forecast_observed(
         self,
         forecast_df,
@@ -238,118 +155,12 @@ def _summarize(
 
         return df
 
-    def _summarize_legacy(self) -> pd.DataFrame:
-        """
-        Converts a `self.summary_df` to the legacy format used in
-        `moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_confidences_v1`
-        """
-        # TODO: This method should be removed once the forecasting data model is updated:
-        # https://mozilla-hub.atlassian.net/browse/DS-2676
-
-        df = self.summary_df.copy(deep=True)
-
-        # rename columns to legacy values
-        df.rename(
-            columns={
-                "forecast_end_date": "asofdate",
-                "submission_date": "date",
-                "metric_alias": "target",
-                "aggregation_period": "unit",
-            },
-            inplace=True,
-        )
-        df["forecast_date"] = df["forecast_predicted_at"].dt.date
-        df["type"] = df["source"].replace("historical", "actual")
-        df = df.replace(
-            {
-                "measure": {
-                    "observed": "value",
-                    "p05": "yhat_p5",
-                    "p10": "yhat_p10",
-                    "p20": "yhat_p20",
-                    "p30": "yhat_p30",
-                    "p40": "yhat_p40",
-                    "p50": "yhat_p50",
-                    "p60": "yhat_p60",
-                    "p70": "yhat_p70",
-                    "p80": "yhat_p80",
-                    "p90": "yhat_p90",
-                    "p95": "yhat_p95",
-                },
-                "target": {
-                    "desktop_dau": "desktop",
-                    "mobile_dau": "mobile",
-                },
-            }
-        )
-
-        # pivot the df from "long" to "wide" format
-        index_columns = [
-            "asofdate",
-            "date",
-            "target",
-            "unit",
-            "forecast_parameters",
-            "forecast_date",
-        ]
-        df = (
-            df[index_columns + ["measure", "value"]]
-            .pivot(
-                index=index_columns,
-                columns="measure",
-                values="value",
-            )
-            .reset_index()
-        )
-
-        # pivot sets the "name" attribute of the columns for some reason. It's
-        # None by default, so we just reset that here.
-        df.columns.name = None
-
-        # When there's an overlap in the observed and forecasted period -- for
-        # example, when a monthly forecast is generated mid-month -- the legacy
-        # format only records the forecasted value, not the observed value. To
-        # account for this, we'll just find the max of the "mean" (forecasted) and
-        # "value" (observed) data. In all non-overlapping observed periods, the
-        # forecasted value will be NULL. In all non-overlapping forecasted periods,
-        # the observed value will be NULL. In overlapping periods, the forecasted
-        # value will always be larger because it is the sum of the observed and forecasted
-        # values. Below is a query that demonstrates the legacy behavior:
-        #
-        # SELECT *
-        # FROM `moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_confidences_v1`
-        # WHERE asofdate = "2023-12-31"
-        #   AND target = "mobile"
-        #   AND unit = "month"
-        #   AND forecast_date = "2022-06-04"
-        #   AND date BETWEEN "2022-05-01" AND "2022-06-01"
-        # ORDER BY date
-        df["value"] = df[["mean", "value"]].max(axis=1)
-        df.drop(columns=["mean"], inplace=True)
-
-        # non-numeric columns are represented in the legacy bq schema as strings
-        string_cols = [
-            "asofdate",
-            "date",
-            "target",
-            "unit",
-            "forecast_parameters",
-            "forecast_date",
-        ]
-        df[string_cols] = df[string_cols].astype(str)
-
-        return df
-
     def write_results(
         self,
         project: str,
         dataset: str,
         table: str,
-        project_legacy: str,
-        dataset_legacy: str,
         write_disposition: str = "WRITE_APPEND",
-        forecast_table_legacy: str = "kpi_automated_forecast_v1",
-        confidences_table_legacy: str = "kpi_automated_forecast_confidences_v1",
     ) -> None:
         """
         Write `self.summary_df` to Big Query.
@@ -362,11 +173,6 @@ def write_results(
             should the table be overwritten ("WRITE_TRUNCATE") or appended to
             ("WRITE_APPEND")?
         """
-        # get legacy tables
-        # TODO: remove this once the forecasting data model is updated:
-        # https://mozilla-hub.atlassian.net/browse/DS-2676
-        self.forecast_df_legacy = self._predict_legacy()
-        self.summary_df_legacy = self._summarize_legacy()
 
         print(f"Writing results to `{project}.{dataset}.{table}`.", flush=True)
         client = bigquery.Client(project=project)
@@ -399,33 +205,3 @@ def write_results(
         )
         # Wait for the job to complete.
         job.result()
-
-        # TODO: remove the below jobs once the forecasting data model is updated:
-        # https://mozilla-hub.atlassian.net/browse/DS-2676
-
-        job = client.load_table_from_dataframe(
-            dataframe=self.forecast_df_legacy,
-            destination=f"{project_legacy}.{dataset_legacy}.{forecast_table_legacy}",
-            job_config=bigquery.LoadJobConfig(
-                write_disposition=write_disposition,
-                schema=[
-                    bigquery.SchemaField("ds", bq_types.TIMESTAMP),
-                    bigquery.SchemaField("forecast_date", bq_types.STRING),
-                    bigquery.SchemaField("forecast_parameters", bq_types.STRING),
-                ],
-            ),
-        )
-        job.result()
-
-        job = client.load_table_from_dataframe(
-            dataframe=self.summary_df_legacy,
-            destination=f"{project_legacy}.{dataset_legacy}.{confidences_table_legacy}",
-            job_config=bigquery.LoadJobConfig(
-                write_disposition=write_disposition,
-                schema=[
-                    bigquery.SchemaField("asofdate", bq_types.STRING),
-                    bigquery.SchemaField("date", bq_types.STRING),
-                ],
-            ),
-        )
-        job.result()
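
Note for reviewers: with the legacy dual-writes removed, the `write_results:` block in
each config now maps one-to-one onto the simplified `write_results` signature. Below is
a minimal sketch of that mapping under the assumption that the job runner splats the
config block directly into the call; the loader shown here is illustrative and is not
code from this patch.

    import yaml

    # The trimmed config block, exactly as it reads after this change.
    config_text = """
    write_results:
      project: "moz-fx-data-shared-prod"
      dataset: "telemetry_derived"
      table: "kpi_forecasts_v0"
    """

    # The remaining keys are exactly the required parameters of the
    # simplified write_results (write_disposition keeps its default).
    kwargs = yaml.safe_load(config_text)["write_results"]
    assert set(kwargs) == {"project", "dataset", "table"}

    # A fitted ProphetForecast instance would then be invoked as:
    #   forecast.write_results(**kwargs)
    # (constructing `forecast` needs Metric Hub data and is out of scope here).
    print(f"Would write to `{kwargs['project']}.{kwargs['dataset']}.{kwargs['table']}`")

Because `project_legacy`/`dataset_legacy` defaulted to the same values as `project`/
`dataset` in both configs, dropping them changes only where results are written
(the two `kpi_automated_forecast*_v1` legacy tables), not the primary output table.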