Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions jobs/kpi-forecasting/kpi_forecasting/configs/dau_desktop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,5 @@ summarize:

write_results:
project: "moz-fx-data-shared-prod"
project_legacy: "moz-fx-data-shared-prod"
dataset: "telemetry_derived"
dataset_legacy: "telemetry_derived"
table: "kpi_forecasts_v0"
2 changes: 0 additions & 2 deletions jobs/kpi-forecasting/kpi_forecasting/configs/dau_mobile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,5 @@ summarize:

write_results:
project: "moz-fx-data-shared-prod"
project_legacy: "moz-fx-data-shared-prod"
dataset: "telemetry_derived"
dataset_legacy: "telemetry_derived"
table: "kpi_forecasts_v0"
224 changes: 0 additions & 224 deletions jobs/kpi-forecasting/kpi_forecasting/models/prophet_forecast.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import json
import pandas as pd
from pandas.api import types as pd_types
import prophet
import numpy as np
from typing import Dict, List


from datetime import datetime, timezone
from dataclasses import dataclass
from kpi_forecasting.models.base_forecast import BaseForecast
from kpi_forecasting import pandas_extras as pdx
Expand Down Expand Up @@ -69,87 +67,6 @@ def _validate_forecast_df(self, df) -> None:
f" but column {i} has type {df[i].dtypes}."
)

def _predict_legacy(self) -> pd.DataFrame:
    """
    Recreate the legacy format used in
    `moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_v1`.

    The Metric Hub alias is validated before the model is asked to predict,
    so a misconfigured alias fails immediately instead of after the forecast
    has been computed.

    Returns:
        A dataframe containing exactly the legacy columns, in legacy order.
        Any legacy column that the model's prediction does not produce is
        filled with `0.0`.

    Raises:
        ValueError: if the lower-cased Metric Hub alias contains both
            'desktop' and 'mobile', or neither of them.
    """
    # TODO: This method should be removed once the forecasting data model is updated:
    # https://mozilla-hub.atlassian.net/browse/DS-2676

    legacy_columns = [
        "ds",
        "trend",
        "yhat_lower",
        "yhat_upper",
        "trend_lower",
        "trend_upper",
        "additive_terms",
        "additive_terms_lower",
        "additive_terms_upper",
        "extra_regressors_additive",
        "extra_regressors_additive_lower",
        "extra_regressors_additive_upper",
        "holidays",
        "holidays_lower",
        "holidays_upper",
        "regressor_00",
        "regressor_00_lower",
        "regressor_00_upper",
        "weekly",
        "weekly_lower",
        "weekly_upper",
        "yearly",
        "yearly_lower",
        "yearly_upper",
        "multiplicative_terms",
        "multiplicative_terms_lower",
        "multiplicative_terms_upper",
        "yhat",
        "target",
        "forecast_date",
        "forecast_parameters",
        "metric",
    ]

    # Validate the alias up front: it is cheap, and both failure modes make
    # the prediction below pointless.
    alias = self.metric_hub.alias.lower()
    is_desktop = "desktop" in alias
    is_mobile = "mobile" in alias

    if is_desktop and is_mobile:
        raise ValueError(
            "Metric Hub alias must include either 'desktop' or 'mobile', not both."
        )
    if not (is_desktop or is_mobile):
        raise ValueError(
            "Metric Hub alias must include either 'desktop' or 'mobile'."
        )

    df = self.model.predict(
        self.dates_to_predict.rename(columns=self.column_names_map)
    )

    # set legacy column values
    df["metric"] = "DAU" if "dau" in alias else self.metric_hub.alias
    df["target"] = "desktop" if is_desktop else "mobile"
    # The legacy table stores the (UTC) forecast date and the parameters as strings.
    df["forecast_date"] = str(datetime.now(timezone.utc).date())
    df["forecast_parameters"] = json.dumps(
        {**self.parameters, "holidays": self.use_holidays}
    )

    # Select the legacy columns in order; columns absent from the prediction
    # are created and filled with 0.0.
    return df.reindex(columns=legacy_columns, fill_value=0.0)

def _combine_forecast_observed(
self,
forecast_df,
Expand Down Expand Up @@ -238,118 +155,12 @@ def _summarize(

return df

def _summarize_legacy(self) -> pd.DataFrame:
    """
    Converts a `self.summary_df` to the legacy format used in
    `moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_confidences_v1`

    `self.summary_df` itself is not modified. The long-format summary is
    renamed to the legacy column names, pivoted to one column per measure,
    and the forecasted "mean" is folded into the observed "value" column.

    Returns:
        A wide dataframe with the legacy key columns (cast to strings), a
        `value` column, and one column per remaining measure.
    """
    # TODO: This method should be removed once the forecasting data model is updated:
    # https://mozilla-hub.atlassian.net/browse/DS-2676

    # These columns identify a row in the legacy table. They are non-numeric
    # and are represented in the legacy bq schema as strings.
    key_columns = [
        "asofdate",
        "date",
        "target",
        "unit",
        "forecast_parameters",
        "forecast_date",
    ]

    # rename columns to legacy values (returns a new frame, leaving
    # `self.summary_df` untouched)
    df = self.summary_df.rename(
        columns={
            "forecast_end_date": "asofdate",
            "submission_date": "date",
            "metric_alias": "target",
            "aggregation_period": "unit",
        }
    )
    df["forecast_date"] = df["forecast_predicted_at"].dt.date
    df = df.replace(
        {
            "measure": {
                "observed": "value",
                "p05": "yhat_p5",
                "p10": "yhat_p10",
                "p20": "yhat_p20",
                "p30": "yhat_p30",
                "p40": "yhat_p40",
                "p50": "yhat_p50",
                "p60": "yhat_p60",
                "p70": "yhat_p70",
                "p80": "yhat_p80",
                "p90": "yhat_p90",
                "p95": "yhat_p95",
            },
            "target": {
                "desktop_dau": "desktop",
                "mobile_dau": "mobile",
            },
        }
    )

    # pivot the df from "long" to "wide" format
    df = (
        df[key_columns + ["measure", "value"]]
        .pivot(
            index=key_columns,
            columns="measure",
            values="value",
        )
        .reset_index()
    )

    # pivot sets the "name" attribute of the columns for some reason. It's
    # None by default, so we just reset that here.
    df.columns.name = None

    # When there's an overlap in the observed and forecasted period -- for
    # example, when a monthly forecast is generated mid-month -- the legacy
    # format only records the forecasted value, not the observed value. To
    # account for this, we'll just find the max of the "mean" (forecasted) and
    # "value" (observed) data. In all non-overlapping observed periods, the
    # forecasted value will be NULL. In all non-overlapping forecasted periods,
    # the observed value will be NULL. In overlapping periods, the forecasted
    # value will always be larger because it is the sum of the observed and forecasted
    # values. Below is a query that demonstrates the legacy behavior:
    #
    # SELECT *
    #   FROM `moz-fx-data-shared-prod.telemetry_derived.kpi_automated_forecast_confidences_v1`
    #  WHERE asofdate = "2023-12-31"
    #    AND target = "mobile"
    #    AND unit = "month"
    #    AND forecast_date = "2022-06-04"
    #    AND date BETWEEN "2022-05-01" AND "2022-06-01"
    #  ORDER BY date
    #
    # `reindex` supplies an all-NULL column when the summary contained no
    # "mean" rows or no "observed" rows, so the pivoted frame lacking one of
    # the two columns no longer raises a KeyError.
    df["value"] = df.reindex(columns=["mean", "value"]).max(axis=1)
    df = df.drop(columns=["mean"], errors="ignore")

    df[key_columns] = df[key_columns].astype(str)

    return df

def write_results(
self,
project: str,
dataset: str,
table: str,
project_legacy: str,
dataset_legacy: str,
write_disposition: str = "WRITE_APPEND",
forecast_table_legacy: str = "kpi_automated_forecast_v1",
confidences_table_legacy: str = "kpi_automated_forecast_confidences_v1",
) -> None:
"""
Write `self.summary_df` to Big Query.
Expand All @@ -362,11 +173,6 @@ def write_results(
should the table be overwritten ("WRITE_TRUNCATE") or appended to
("WRITE_APPEND")?
"""
# get legacy tables
# TODO: remove this once the forecasting data model is updated:
# https://mozilla-hub.atlassian.net/browse/DS-2676
self.forecast_df_legacy = self._predict_legacy()
self.summary_df_legacy = self._summarize_legacy()

print(f"Writing results to `{project}.{dataset}.{table}`.", flush=True)
client = bigquery.Client(project=project)
Expand Down Expand Up @@ -399,33 +205,3 @@ def write_results(
)
# Wait for the job to complete.
job.result()

# TODO: remove the below jobs once the forecasting data model is updated:
# https://mozilla-hub.atlassian.net/browse/DS-2676

job = client.load_table_from_dataframe(
dataframe=self.forecast_df_legacy,
destination=f"{project_legacy}.{dataset_legacy}.{forecast_table_legacy}",
job_config=bigquery.LoadJobConfig(
write_disposition=write_disposition,
schema=[
bigquery.SchemaField("ds", bq_types.TIMESTAMP),
bigquery.SchemaField("forecast_date", bq_types.STRING),
bigquery.SchemaField("forecast_parameters", bq_types.STRING),
],
),
)
job.result()

job = client.load_table_from_dataframe(
dataframe=self.summary_df_legacy,
destination=f"{project_legacy}.{dataset_legacy}.{confidences_table_legacy}",
job_config=bigquery.LoadJobConfig(
write_disposition=write_disposition,
schema=[
bigquery.SchemaField("asofdate", bq_types.STRING),
bigquery.SchemaField("date", bq_types.STRING),
],
),
)
job.result()