diff --git a/.circleci/config.yml b/.circleci/config.yml index c41e9e14..4e14f25c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -65,6 +65,22 @@ jobs: name: Verify CI config is up-to-date command: docker run docker-etl:build python3 -m docker_etl.ci_config --dry-run | diff -B .circleci/config.yml - + build-job-ads-incrementality-dap-collector: + docker: + - image: << pipeline.parameters.git-image >> + steps: + - checkout + - compare-branch: + pattern: ^jobs/ads-incrementality-dap-collector/ + - setup_remote_docker: + version: << pipeline.parameters.docker-version >> + - run: + name: Build Docker image + command: docker build -t app:build jobs/ads-incrementality-dap-collector/ + - run: + name: Test Code + command: docker run app:build python3 -m pytest + build-job-bq2sftp: docker: - image: << pipeline.parameters.git-image >> @@ -213,6 +229,7 @@ jobs: name: Build Docker image command: docker build -t app:build jobs/experiments-monitoring-data-export/ + build-job-extensions: docker: - image: << pipeline.parameters.git-image >> @@ -402,6 +419,20 @@ workflows: jobs: - build-docker-etl + job-ads-incrementality-dap-collector: + jobs: + - build-job-ads-incrementality-dap-collector + - gcp-gcr/build-and-push-image: + context: data-eng-airflow-gcr + docker-context: jobs/ads-incrementality-dap-collector/ + path: jobs/ads-incrementality-dap-collector/ + image: ads-incrementality-dap-collector_docker_etl + requires: + - build-job-ads-incrementality-dap-collector + filters: + branches: + only: main + job-bq2sftp: jobs: - build-job-bq2sftp @@ -533,7 +564,6 @@ workflows: branches: only: main - job-extensions: jobs: - build-job-extensions diff --git a/jobs/ads-incrementality-dap-collector/.dockerignore b/jobs/ads-incrementality-dap-collector/.dockerignore new file mode 100644 index 00000000..e436bc5d --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/.dockerignore @@ -0,0 +1,13 @@ +.cache/ +ci_job.yaml +ci_workflow.yaml +public_key_to_hpke_config.py +dev_run_docker.sh +dev_runbook.md +.DS_Store +example_config.json +*.pyc +.pytest_cache/ +.python-version +__pycache__/ +venv/ diff --git a/jobs/ads-incrementality-dap-collector/.flake8 b/jobs/ads-incrementality-dap-collector/.flake8 new file mode 100644 index 00000000..6deafc26 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length = 120 diff --git a/jobs/ads-incrementality-dap-collector/.gitignore b/jobs/ads-incrementality-dap-collector/.gitignore new file mode 100644 index 00000000..ee9df185 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/.gitignore @@ -0,0 +1,5 @@ +.DS_Store +*.pyc +__pycache__/ +venv/ +.python-version diff --git a/jobs/ads-incrementality-dap-collector/Dockerfile b/jobs/ads-incrementality-dap-collector/Dockerfile new file mode 100644 index 00000000..3fc66397 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.12 +LABEL maintainer="Glenda Leonard " +ARG HOME="/janus_build" +WORKDIR ${HOME} + +RUN apt update && apt --yes install curl + +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y +ENV PATH=$HOME/.cargo/bin:$PATH + +# build the CLI tool +RUN git clone --depth 1 https://github.com/divviup/janus.git --branch '0.7.69' +RUN cd janus && cargo build -r -p janus_tools --bin collect + +######### next stage + +FROM python:3.12 +LABEL maintainer="Glenda Leonard " +# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md +ARG USER_ID="10001" +ARG 
GROUP_ID="app"
+ARG HOME="/app"
+WORKDIR ${HOME}
+
+RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
+    useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}
+##################### from other Dockerfile
+COPY --from=0 /janus_build/janus/target/release/collect ./
+###################
+
+# Drop root and change ownership of the application folder to the user
+RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
+USER ${USER_ID}
+ADD ./requirements.txt .
+RUN pip install --upgrade pip
+RUN pip install -r requirements.txt
+
+ADD . .
diff --git a/jobs/ads-incrementality-dap-collector/README.md b/jobs/ads-incrementality-dap-collector/README.md
new file mode 100644
index 00000000..d521e2a9
--- /dev/null
+++ b/jobs/ads-incrementality-dap-collector/README.md
@@ -0,0 +1,131 @@
+# Ads Incrementality DAP collector
+
+## Background
+
+Incrementality is a way to measure the effectiveness of our ads in a general, aggregated, privacy-preserving way --
+without knowing anything about specific users.
+
+Incrementality works by dividing clients into various Nimbus experiment branches that vary how/whether an ad is shown.
+Separately, a [DAP](https://docs.divviup.org/) task is configured to store the metrics for each experiment branch in a
+different DAP bucket.
+
+Firefox is instrumented with [DAP telemetry functionality](https://github.com/mozilla-firefox/firefox/tree/main/toolkit/components/telemetry/dap), which allows it to send metrics and reports into the correct DAP buckets as configured in the experiment.
+
+This job then collects metrics from DAP (using the bucket info from the experiment's data) and writes them
+to BQ.
+
+## Overview
+
+This job is driven by a config file from a GCS bucket. Inform the job of the config file location by passing the
+`--job_config_gcp_project` and `--job_config_bucket` parameters. See `example_config.json` for how to structure this file.
+
+The config file specifies the incrementality experiments that are currently running, some config and credentials for DAP,
+and where in BQ to write the incrementality results.
+
+The job reads data for each of the experiments from Nimbus, reads the experiment branch results from DAP,
+then puts it all together into results rows and writes the metrics to BQ.
+
+## Configuration
+
+The three recognized top-level keys are `bq`, `dap`, and `nimbus`.
+
+#### bq
+
+Everything the job needs to connect to BigQuery.
+
+- `project`: GCP project
+- `namespace`: BQ namespace for ads incrementality
+- `table`: BQ table where incrementality results go
+
+#### dap
+
+Everything the job needs to connect to DAP.
+
+- `auth_token`: Token defined in the collector credentials, used to authenticate to the leader
+- `hpke_private_key`: Private key defined in the collector credentials, used to decrypt shares from the leader
+  and helper
+- `hpke_config`: Base64 URL-encoded version of the public key defined in the collector credentials
+- `batch_start`: Start of the collection interval, as the number of seconds since the Unix epoch
+
+
+#### nimbus
+
+Everything the job needs to connect to Nimbus.
+
+- `api_url`: API URL for fetching the Nimbus experiment info
+- `experiments`: List of incrementality experiment configs
+
+##### experiment config list
+
+The experiments that the job should collect results for.
+
+- `slug`: Experiment slug
+- `batch_duration`: Optional. Duration of the collection batch interval, in seconds.
+  This will default to 7 days if not specified.
+
+## Usage
+
+This script is intended to be run in a Docker container.
+
+It requires some environment variables that hold DAP credentials, which the job looks for when it
+starts up. A dev script, `dev_run_docker.sh`, is included for convenience to build and run the job locally, and it
+also documents those variables.
+
+Once the environment variables are set up, run the job with:
+
+
+```sh
+./dev_run_docker.sh
+```
+
+To build just the Docker image, use:
+
+```sh
+docker build -t ads_incrementality_dap_collector .
+```
+
+To run outside of Docker, install dependencies with:
+
+```sh
+pip install -r requirements.txt
+```
+
+With `DAP_AUTH_TOKEN` and `DAP_PRIVATE_KEY` set in the environment, run the script with:
+
+```sh
+python3 ads_incrementality_dap_collector/main.py --job_config_gcp_project <gcp-project> --job_config_bucket <config-bucket>
+```
+
+## Testing
+
+Run tests with:
+
+```sh
+python3 -m pytest
+```
+
+## Linting and format
+
+`flake8` and `black` are included for code linting and formatting:
+
+```sh
+pytest --black --flake8
+```
+
+or
+
+```sh
+flake8 .
+```
+
+or
+
+```sh
+black .
+```
+
+or
+
+```sh
+black --diff .
+```
diff --git a/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/constants.py b/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/constants.py
new file mode 100644
index 00000000..cad9ca4b
--- /dev/null
+++ b/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/constants.py
@@ -0,0 +1,72 @@
+from datetime import datetime
+
+from google.cloud import bigquery
+
+DAP_LEADER = "https://dap-09-3.api.divviup.org"
+VDAF = "histogram"
+PROCESS_TIMEOUT = 1200  # 20 mins
+
+CONFIG_FILE_NAME = "config.json"  # See example_config.json for the contents and structure of the job config file.
+LOG_FILE_NAME = f"{datetime.now()}-ads-incrementality-dap-collector.log"
+
+DEFAULT_BATCH_DURATION = 604800  # 7 days, in seconds
+
+COLLECTOR_RESULTS_SCHEMA = [
+    bigquery.SchemaField(
+        "collection_start",
+        "DATE",
+        mode="REQUIRED",
+        description="Start date of the collected time window, inclusive.",
+    ),
+    bigquery.SchemaField(
+        "collection_end",
+        "DATE",
+        mode="REQUIRED",
+        description="End date of the collected time window, inclusive.",
+    ),
+    bigquery.SchemaField(
+        "country_codes",
+        "JSON",
+        mode="NULLABLE",
+        description="List of 2-char country codes for the experiment",
+    ),
+    bigquery.SchemaField(
+        "experiment_slug",
+        "STRING",
+        mode="REQUIRED",
+        description="Slug indicating the experiment.",
+    ),
+    bigquery.SchemaField(
+        "experiment_branch",
+        "STRING",
+        mode="REQUIRED",
+        description="The experiment branch this data is associated with.",
+    ),
+    bigquery.SchemaField(
+        "advertiser",
+        "STRING",
+        mode="REQUIRED",
+        description="Advertiser associated with this experiment.",
+    ),
+    bigquery.SchemaField(
+        "metric",
+        "STRING",
+        mode="REQUIRED",
+        description="Metric collected for this experiment.",
+    ),
+    bigquery.SchemaField(
+        name="value",
+        field_type="RECORD",
+        mode="REQUIRED",
+        fields=[
+            bigquery.SchemaField("count", "INT64", mode="NULLABLE"),
+            bigquery.SchemaField("histogram", "JSON", mode="NULLABLE"),
+        ],
+    ),
+    bigquery.SchemaField(
+        "created_at",
+        "TIMESTAMP",
+        mode="REQUIRED",
+        description="Timestamp for when this row was written.",
+    ),
+]
diff --git a/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/helpers.py b/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/helpers.py
new file mode 100644
index 00000000..1dc38000
--- /dev/null
+++ b/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/helpers.py
@@ -0,0 +1,328 @@
+import ast
+from datetime import datetime +import json +import logging +import requests +import subprocess +import time + +from google.cloud import bigquery +from google.cloud import storage +from types import SimpleNamespace +from typing import Optional + +from constants import ( + COLLECTOR_RESULTS_SCHEMA, + CONFIG_FILE_NAME, + DAP_LEADER, + DEFAULT_BATCH_DURATION, + LOG_FILE_NAME, + PROCESS_TIMEOUT, + VDAF, +) +from models import ( + IncrementalityBranchResultsRow, + NimbusExperiment, +) + + +# Nimbus Experimenter helper functions +def get_experiment( + experiment_config: SimpleNamespace, api_url: str +) -> Optional[NimbusExperiment]: + """Fetch the experiment from Experimenter API and return the configuration.""" + logging.info(f"Fetching experiment: {experiment_config.slug}") + try: + nimbus_experiments_json = fetch(f"{api_url}/{experiment_config.slug}/") + if not hasattr(experiment_config, "batch_duration"): + experiment_config.batch_duration = DEFAULT_BATCH_DURATION + nimbus_experiments_json["batchDuration"] = experiment_config.batch_duration + nimbus_experiment = NimbusExperiment.from_dict(nimbus_experiments_json) + logging.info(f"Fetched experiment json: {experiment_config.slug}") + return nimbus_experiment + except Exception as e: + raise Exception( + f"Failed getting experiment: {experiment_config.slug} from: {api_url}" + ) from e + + +def prepare_results_rows( + experiment: NimbusExperiment, +) -> dict[str, dict[int, IncrementalityBranchResultsRow]]: + """Pull info out of the experiment metadata to set up experiment branch results rows. The info + here will be used to call DAP and get results data for each branch, and ultimately written + to BQ.""" + tasks_to_process: dict[str, dict[int, IncrementalityBranchResultsRow]] = {} + if not experiment.collect_today(): + logging.info( + f"Skipping collection for {experiment.slug} today. \ + Next collection date will be {experiment.next_collect_date()}" + ) + return tasks_to_process + + for branch in experiment.branches: + logging.info(f"Processing experiment branch: {branch.slug}") + dap_telemetry_features = [ + f for f in branch.features if f.get("featureId") == "dapTelemetry" + ] + + for feature in dap_telemetry_features: + logging.info(f"Processing dapTelemetry experiment feature: {feature}") + visit_counting_experiment_list = feature.get("value").get( + "visitCountingExperimentList" + ) + + for visit_counting_list_item in visit_counting_experiment_list: + incrementality = IncrementalityBranchResultsRow( + experiment, branch.slug, visit_counting_list_item + ) + task_id = incrementality.task_id + + if task_id not in tasks_to_process: + tasks_to_process[task_id] = {} + tasks_to_process[task_id][incrementality.bucket] = incrementality + logging.info( + f"Prepared intermediate result rows: {tasks_to_process[task_id]}" + ) + + logging.info(f"Finished processing experiment branch: {branch.slug}.") + return tasks_to_process + + +# DAP helper functions +def collect_dap_result( + task_id: str, + vdaf_length: int, + batch_start: int, + duration: int, + auth_token: str, + hpke_config: str, + hpke_private_key: str, +) -> dict: + # Beware! This command string reveals secrets. Uncomment logging below only for debugging in local dev. 
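+    # Note: the ./collect binary invoked below is the Janus collector CLI built in the Dockerfile's first
+    # stage; on success it prints a line like "Aggregation result: [53, 48, 56]", which parse_histogram
+    # turns into per-bucket counts.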
+ # + # command_str = (f"./collect --task-id {task_id} --leader {DAP_LEADER} --vdaf {VDAF} --length {vdaf_length} " + # f"--authorization-bearer-token {auth_token} --batch-interval-start {batch_start} " + # f"--batch-interval-duration {duration} --hpke-config {hpke_config} " + # f"--hpke-private-key {hpke_private_key}") + # logging.debug(f"command_str: {command_str}") + logging.info(f"Processing batch_start: {batch_start} for duration: {duration}") + try: + result = subprocess.run( + [ + "./collect", + "--task-id", + task_id, + "--leader", + DAP_LEADER, + "--vdaf", + VDAF, + "--length", + f"{vdaf_length}", + "--authorization-bearer-token", + auth_token, + "--batch-interval-start", + f"{batch_start}", + "--batch-interval-duration", + f"{duration}", + "--hpke-config", + hpke_config, + "--hpke-private-key", + hpke_private_key, + ], + capture_output=True, + text=True, + check=True, + timeout=PROCESS_TIMEOUT, + ) + for line in result.stdout.splitlines(): + if line.startswith("Aggregation result:"): + entries = parse_histogram(line[21:-1]) + return entries + # Beware! Exceptions thrown by the subprocess reveal secrets. + # Log them and include traceback only for debugging in local dev. + except subprocess.CalledProcessError as e: + raise Exception( + f"Collection failed for {task_id}, {e.returncode}, stderr: {e.stderr}" + ) from None + except subprocess.TimeoutExpired as e: + raise Exception( + f"Collection timed out for {task_id}, {e.timeout}, stderr: {e.stderr}" + ) from None + + +def collect_dap_results( + tasks_to_collect: dict[str, dict[int, IncrementalityBranchResultsRow]], + config: SimpleNamespace, + experiment_config: SimpleNamespace, +): + tasks = list(dict.fromkeys(tasks_to_collect)) + logging.info(f"Starting DAP collection for tasks: {tasks}.") + for task_id in tasks: + logging.info(f"Collecting DAP task: {task_id}") + results = tasks_to_collect[task_id] + # The task vector length and batch duration are specified per-experiment and + # stored with each branch. So it's okay to just use the first branch + # to populate these values for all the tasks here. 
+ firstBranch = list(results.values())[0] + task_veclen = firstBranch.task_veclen + batch_start_epoch = int( + datetime.combine(firstBranch.batch_start, datetime.min.time()).timestamp() + ) + batch_duration = firstBranch.batch_duration + collected = collect_dap_result( + task_id, + task_veclen, + batch_start_epoch, + batch_duration, + config.auth_token, + config.hpke_config, + config.hpke_private_key, + ) + try: + for bucket in results.keys(): + tasks_to_collect[task_id][bucket].value_count = collected[bucket] + except Exception as e: + raise Exception( + f"Failed to parse collected DAP results: {collected}" + ) from e + logging.info(f"Prepared final result rows: {tasks_to_collect[task_id]}") + logging.info(f"Finished collecting DAP task: {task_id}") + logging.info("Finished DAP collection for all tasks.") + return tasks_to_collect + + +def correct_wraparound(num: int) -> int: + field_prime = 340282366920938462946865773367900766209 + field_size = 128 + cutoff = 2 ** (field_size - 1) + if num > cutoff: + return num - field_prime + return num + + +def parse_histogram(histogram_str: str) -> dict: + parsed_list = ast.literal_eval(histogram_str) + # Experiment branches are indexed starting from 1, DAP bucket results from 0, + # so use i + 1 as the key here when parsing the histogram + return {i + 1: correct_wraparound(val) for i, val in enumerate(parsed_list)} + + +# BigQuery helper functions +def create_bq_table_if_not_exists( + project: str, namespace: str, table: str, bq_client: bigquery.Client +): + data_set = f"{project}.{namespace}" + bq_client.create_dataset(data_set, exists_ok=True) + full_table_id = f"{data_set}.{table}" + table = bigquery.Table(full_table_id, schema=COLLECTOR_RESULTS_SCHEMA) + + try: + bq_client.create_table(table, exists_ok=True) + return full_table_id + except Exception as e: + raise Exception(f"Failed to create BQ table: {full_table_id}") from e + + +def create_bq_row( + collection_start: str, + collection_end: str, + country_codes: str, + experiment_slug: str, + experiment_branch: str, + advertiser: str, + metric: str, + value_histogram: str = None, + value_count: int = None, +) -> dict: + row = { + "collection_start": collection_start, + "collection_end": collection_end, + "country_codes": country_codes, + "experiment_slug": experiment_slug, + "experiment_branch": experiment_branch, + "advertiser": advertiser, + "metric": metric, + "value": {"count": value_count, "histogram": value_histogram}, + "created_at": datetime.now().isoformat(), + } + return row + + +def insert_into_bq(row, bqclient, table_id: str): + """Inserts the results into BQ. 
Assumes that they are already in the right format""" + if row: + insert_res = bqclient.insert_rows_json(table=table_id, json_rows=[row]) + if len(insert_res) != 0: + raise Exception(f"Error inserting rows into {table_id}: {insert_res}") + + +def write_results_to_bq(collected_tasks: dict, config: SimpleNamespace): + """Takes the collected results for each experiment branch and writes out rows to BQ.""" + records = [v for inner in collected_tasks.values() for v in inner.values()] + logging.info(f"Inserting results rows into BQ: {records}") + bq_client = bigquery.Client(project=config.project) + full_table_id = create_bq_table_if_not_exists( + config.project, config.namespace, config.table, bq_client + ) + for record in records: + row = create_bq_row( + collection_start=record.batch_start.isoformat(), + collection_end=record.batch_end.isoformat(), + country_codes=record.country_codes, + advertiser=record.advertiser, + experiment_slug=record.experiment_slug, + experiment_branch=record.branch, + metric=record.metric, + value_count=record.value_count, + ) + insert_into_bq(row, bq_client, full_table_id) + logging.info("Finished inserting results rows into BQ.") + + +# GCS helper functions +def get_config( + gcp_project: str, config_bucket: str, auth_token: str, hpke_private_key: str +) -> SimpleNamespace: + """Gets the incrementality job's config from a file in a GCS bucket. See example_config.json for the structure.""" + client = storage.Client(project=gcp_project) + try: + bucket = client.get_bucket(config_bucket) + blob = bucket.blob(CONFIG_FILE_NAME) + reader = blob.open("rt") + config = json.load(reader, object_hook=lambda d: SimpleNamespace(**d)) + config.dap.auth_token = auth_token + config.dap.hpke_private_key = hpke_private_key + return config + except Exception as e: + raise Exception( + f"Failed to get job config file: {CONFIG_FILE_NAME} from GCS bucket: \ + {config_bucket} in project: {gcp_project}." + ) from e + + +def write_job_logs_to_bucket(gcp_project: str, config_bucket: str): + client = storage.Client(project=gcp_project) + try: + bucket = client.get_bucket(config_bucket) + blob = bucket.blob(f"logs/{LOG_FILE_NAME}") + blob.upload_from_filename(LOG_FILE_NAME) + except Exception as e: + raise Exception( + f"Failed to upload job log file: {LOG_FILE_NAME} to GCS bucket: {config_bucket} in project: {gcp_project}." + ) from e + + +# General helper functions +def fetch(url: str): + for _ in range(2): + try: + return requests.get( + url, + timeout=30, + headers={"user-agent": "https://github.com/mozilla/docker-etl"}, + ).json() + except Exception as e: + last_exception = e + time.sleep(1) + raise last_exception diff --git a/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/main.py b/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/main.py new file mode 100644 index 00000000..7a5561aa --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/main.py @@ -0,0 +1,74 @@ +import click +import logging +import traceback + +from constants import LOG_FILE_NAME +from helpers import ( + get_config, + get_experiment, + prepare_results_rows, + collect_dap_results, + write_job_logs_to_bucket, + write_results_to_bq, +) + + +@click.command() +@click.option("--job_config_gcp_project", + help="GCP project id for the GCS bucket where this job will look for a configuration file. ", + required=True) +@click.option( + "--job_config_bucket", + help="GCS bucket where the configuration for this job can be found. 
See example_config.json for format details.", + required=True, +) +@click.option( + "--auth_token", + envvar="DAP_AUTH_TOKEN", + help="The 'token' defined in the collector credentials, used to authenticate to the leader", + required=True, +) +@click.option( + "--hpke_private_key", + envvar="DAP_PRIVATE_KEY", + help="The 'private_key' defined in the collector credentials, used to decrypt shares from the leader and helper", + required=True, +) +def main(job_config_gcp_project, job_config_bucket, auth_token, hpke_private_key): + try: + logging.info( + f"Starting collector job with configuration from gcp project: {job_config_gcp_project} and gcs bucket: {job_config_bucket}" + ) + config = get_config( + job_config_gcp_project, job_config_bucket, auth_token, hpke_private_key + ) + logging.info( + f"Starting collector job for experiments: {config.nimbus.experiments}." + ) + + for experiment_config in config.nimbus.experiments: + experiment = get_experiment(experiment_config, config.nimbus.api_url) + + tasks_to_collect = prepare_results_rows(experiment) + collected_tasks = collect_dap_results( + tasks_to_collect, config.dap, experiment_config + ) + + write_results_to_bq(collected_tasks, config.bq) + except Exception as e: + logging.error(f"Collector job failed. Error: {e}\n{traceback.format_exc()}") + raise e + finally: + write_job_logs_to_bucket(job_config_gcp_project, job_config_bucket) + + +if __name__ == "__main__": + logging.basicConfig( + filename=LOG_FILE_NAME, + filemode="a", + format="%(asctime)s,%(msecs)03d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, + ) + logging.getLogger().setLevel(logging.INFO) + main() diff --git a/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/models.py b/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/models.py new file mode 100644 index 00000000..59444ecb --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/models.py @@ -0,0 +1,231 @@ +import attr +import cattrs +from datetime import date, datetime, timedelta +import json +import pytz +import re +import tldextract + +from typing import List, Optional + + +@attr.s(auto_attribs=True) +class Branch: + """Defines a branch of a Nimbus experiement from Experimenter.""" + + slug: str + ratio: int + features: Optional[dict] + + +@attr.s(auto_attribs=True) +class NimbusExperiment: + """Represents a v8 Nimbus experiment from Experimenter. Most of these values get read from + Nimbus's GET experiment endpoint's json response. The notable exception is batch_duration, + which currently comes from the experiment's configuration in config.json, but we add it + to this model so we can conveniently figure out when the data should be collected. + + Attributes: + appId: Id of the app we're experimenting on, something like 'firefox-desktop'. + appName: Name of the app we're experimenting on, something like 'firefox_desktop'. + batchDuration: The DAP agreggation time interval. + branches: A list of Branch objects for the experiment's branch data. + bucketConfig: + channel: The release channel for this experiment, something like 'nightly'. + featureIds: A list of all the features used in this experiment. + proposedEnrollment: + referenceBranch: The slug of the control branch. + slug: Normandy slug that uniquely identifies the experiment + in Nimbus. + targeting: A string of js that evaluates to a boolean value indicating + targeting based on region, channel, and user prefs. 
+ + startDate: The day the experiment will begin enrolling users. + endDate: The day the experiment will be turned off. + enrollmentEndDate: The day the experiment's enrollment phase ends. + + """ + + appId: str + appName: str + batchDuration: int + branches: List[Branch] + bucketConfig: dict + channel: str + featureIds: list[str] + proposedEnrollment: int + referenceBranch: Optional[str] + slug: str + targeting: str + + startDate: date + endDate: Optional[date] + enrollmentEndDate: Optional[date] + + # Experiment results are removed from DAP after 2 weeks, so our window to collect results for + # an experiment is up to 2 weeks after the experiment's latest_collectible_batch_end. + # However, we don't need to collect every day of those two weeks (this job runs daily). So this + # constant defines how many days after latest_collectible_batch_end to go out and collect and write to BQ. + COLLECT_RETRY_DAYS = 7 + + @classmethod + def from_dict(cls, d) -> "NimbusExperiment": + """Load an experiment from dict.""" + converter = cattrs.BaseConverter() + converter.register_structure_hook( + date, + lambda num, _: datetime.fromisoformat(num.replace("Z", "+00:00")) + .astimezone(pytz.utc) + .date(), + ) + converter.register_structure_hook( + Branch, + lambda b, _: Branch( + slug=b["slug"], ratio=b["ratio"], features=b["features"] + ), + ) + return converter.structure(d, cls) + + def latest_collectible_batch_start(self) -> date: + latest_collectible_batch_start = self.startDate + # If the experiment's start date is today or in the future, return it + if latest_collectible_batch_start >= self.todays_date(): + return latest_collectible_batch_start + + # While the latest_collectible_batch_start variable is before the batch that includes today... + while latest_collectible_batch_start < ( + self.todays_date() - timedelta(seconds=self.batchDuration) + ): + # Increment the latest_collectible_batch_start by the batch interval + latest_collectible_batch_start = latest_collectible_batch_start + timedelta( + seconds=self.batchDuration + ) + # After the loop, we have the batch start date for the batch that includes today. + # We need to return the previous batch, which is now complete and ready for collection. 
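+        # Example, assuming a 7-day batchDuration: with startDate 2025-08-18 and today 2025-09-19, the
+        # loop stops at 2025-09-15, so we return 2025-09-08 (the batch covering 2025-09-08 to 2025-09-14).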
+        return latest_collectible_batch_start - timedelta(seconds=self.batchDuration)
+
+    def latest_collectible_batch_end(self) -> date:
+        return self.latest_collectible_batch_start() + timedelta(
+            seconds=self.batchDuration, days=-1
+        )
+
+    def next_collect_date(self) -> date:
+        return self.latest_collectible_batch_end() + timedelta(days=1)
+
+    def collect_today(self) -> bool:
+        return (
+            self.latest_collectible_batch_end()
+            < self.todays_date()
+            < (
+                self.latest_collectible_batch_end()
+                + timedelta(days=self.COLLECT_RETRY_DAYS)
+            )
+        )
+
+    def todays_date(self) -> date:
+        return date.today()
+
+
+def get_country_from_targeting(targeting: str) -> Optional[str]:
+    """Parses the region/country from the targeting string and
+    returns a JSON formatted list of country codes."""
+    # match = re.findall(r"region\s+in\s+(^]+)", targeting)
+    match = re.search(r"region\s+in\s+\[([^]]+)]", targeting)
+
+    if match:
+        inner = match.group(1)
+        regions = [r.strip().strip("'\"") for r in inner.split(",")]
+        # logging.info("regions: %s", regions)
+        return json.dumps(regions)
+    return None
+
+
+def normalize_url(url: str) -> str:
+    # Replace the wildcard with a dummy protocol and subdomain so the URL can be parsed
+    normalized = re.sub(r"^\*://\*\.?", "https://", url)
+    return normalized
+
+
+def get_advertiser_from_url(url: str) -> Optional[str]:
+    """Parses the advertiser name (domain) from the url"""
+    # tldextract cannot handle wildcards, replace with standard values.
+    normalized = normalize_url(url)
+    ext = tldextract.extract(normalized)
+    return ext.domain
+
+
+@attr.s(auto_attribs=True, auto_detect=True, eq=True)
+class IncrementalityBranchResultsRow:
+    """This object encapsulates all the data for an incrementality experiment branch that uses the
+    Nimbus dapTelemetry feature. It is used as an intermediate data structure, first to hold the
+    info from the experiment metadata which is later used in the DAP collection, then to store
+    the actual count values fetched from DAP, and finally to write most of these attributes to
+    a BQ results row.
+
+    Attributes:
+        advertiser: Derived from the urls stored in the visitCountingExperimentList
+            key within Nimbus's dapTelemetry feature.
+        batch_start: The start date of the collection period that we're getting counts for from DAP, inclusive.
+        batch_end: The end date of the collection period that we're getting counts for from DAP, inclusive.
+        batch_duration: The duration of the collection period that we're requesting counts for from DAP.
+        branch: A Nimbus experiment branch. Each experiment may have multiple
+            branches (e.g. control, treatment-a).
+        bucket: Stored in Nimbus experiment metadata. Each experiment branch specifies
+            the corresponding DAP bucket where the visit counts for that branch
+            can be collected.
+        country_codes: The countries where the experiment is active, as an array of ISO country code strings.
+        experiment_slug: The Nimbus experiment's URL slug.
+        metric: Currently hardcoded to "unique_client_organic_visits" for incrementality.
+        task_id: Stored in Nimbus experiment metadata. The task id is returned when setting
+            up DAP counting, and is used to collect the experiment result counts.
+        task_veclen: Stored in Nimbus experiment metadata. The task_veclen is configured when
+            setting up DAP counting, and is needed to collect the experiment results.
+        value_count: The URL visit count value collected from DAP for this experiment branch.
+ """ + + advertiser: str + batch_start: date + batch_end: date + batch_duration: date + branch: str + bucket: int + country_codes: Optional[str] + experiment_slug: str + metric: str + task_id: str + task_veclen: int + value_count: int + + def __init__( + self, + experiment: NimbusExperiment, + branch_slug: str, + visitCountingExperimentListItem: dict, + ): + self.advertiser = "not_set" + urls = visitCountingExperimentListItem.get("urls") + # Default to the first url in the list to determine the advertiser. + if len(urls) > 0: + self.advertiser = get_advertiser_from_url(urls[0]) + self.branch = branch_slug + self.bucket = visitCountingExperimentListItem.get("bucket") + self.batch_start = experiment.latest_collectible_batch_start() + self.batch_end = experiment.latest_collectible_batch_end() + self.batch_duration = experiment.batchDuration + self.country_codes = get_country_from_targeting(experiment.targeting) + self.experiment_slug = experiment.slug + self.metric = "unique_client_organic_visits" + self.task_id = visitCountingExperimentListItem.get("task_id") + self.task_veclen = visitCountingExperimentListItem.get("task_veclen") + # This will be populated when we successfully fetch the count from DAP + self.value_count = None + + def __str__(self): + return str( + f"IncrementalityBranchResultsRow(advertiser='{self.advertiser}', branch='{self.branch}', " + f"bucket='{self.bucket}', batch_start='{self.batch_start}', batch_end='{self.batch_end}', " + f"country_codes='{self.country_codes}', experiment_slug='{self.experiment_slug}', metric='{self.metric}', " + f"task_id='{self.task_id}', task_veclen='{self.task_veclen}', value_count='redacted')" + ) + + __repr__ = __str__ diff --git a/jobs/ads-incrementality-dap-collector/ci_job.yaml b/jobs/ads-incrementality-dap-collector/ci_job.yaml new file mode 100644 index 00000000..ccb615e4 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/ci_job.yaml @@ -0,0 +1,15 @@ +build-job-ads-incrementality-dap-collector: + docker: + - image: << pipeline.parameters.git-image >> + steps: + - checkout + - compare-branch: + pattern: ^jobs/ads-incrementality-dap-collector/ + - setup_remote_docker: + version: << pipeline.parameters.docker-version >> + - run: + name: Build Docker image + command: docker build -t app:build jobs/ads-incrementality-dap-collector/ + - run: + name: Test Code + command: docker run app:build python3 -m pytest diff --git a/jobs/ads-incrementality-dap-collector/ci_workflow.yaml b/jobs/ads-incrementality-dap-collector/ci_workflow.yaml new file mode 100644 index 00000000..4cbd5603 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/ci_workflow.yaml @@ -0,0 +1,13 @@ +job-ads-incrementality-dap-collector: + jobs: + - build-job-ads-incrementality-dap-collector + - gcp-gcr/build-and-push-image: + context: data-eng-airflow-gcr + docker-context: jobs/ads-incrementality-dap-collector/ + path: jobs/ads-incrementality-dap-collector/ + image: ads-incrementality-dap-collector_docker_etl + requires: + - build-job-ads-incrementality-dap-collector + filters: + branches: + only: main \ No newline at end of file diff --git a/jobs/ads-incrementality-dap-collector/dev_run_docker.sh b/jobs/ads-incrementality-dap-collector/dev_run_docker.sh new file mode 100755 index 00000000..bfc76bea --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/dev_run_docker.sh @@ -0,0 +1,10 @@ +docker build -t ads_incrementality_dap_collector . 
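+# Assumes DAP_AUTH_TOKEN and DAP_PRIVATE_KEY are already exported in the calling shell, and that gcloud
+# application default credentials exist under $HOME/.config/gcloud (mounted into the container below).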
+ +docker run -it --rm \ + -v $HOME/.config/gcloud:/app/.config/gcloud \ + -e GOOGLE_APPLICATION_CREDENTIALS=/app/.config/gcloud/application_default_credentials.json \ + ads_incrementality_dap_collector python ./ads_incrementality_dap_collector/main.py \ + --job_config_gcp_project moz-fx-dev-mlifshin-sandbox \ + --job_config_bucket ads-nonprod-stage-incrementality-dap-collector-config \ + --auth_token $DAP_AUTH_TOKEN \ + --hpke_private_key $DAP_PRIVATE_KEY \ diff --git a/jobs/ads-incrementality-dap-collector/example_config.json b/jobs/ads-incrementality-dap-collector/example_config.json new file mode 100644 index 00000000..02e76e7a --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/example_config.json @@ -0,0 +1,32 @@ +{ + "bq": { + "project": "example-gcp-project", + "namespace": "ads_dap", + "table": "incrementality" + }, + "dap": { + "hpke_config": "some-base64-encoded-stuff" + }, + "nimbus": { + "api_url": "https://some-experimenter-api-url", + "experiments": [{ + "slug": "some-experiment-1" + }, + { + "slug": "some-experiment-2" + }, + { + "slug": "some-experiment-3" + }, + { + "slug": "some-experiment-4" + }, + { + "slug": "some-experiment-5" + }, + { + "slug": "example-incrementality-study", + "batch_duration" : 432000 + }] + } +} diff --git a/jobs/ads-incrementality-dap-collector/pytest.ini b/jobs/ads-incrementality-dap-collector/pytest.ini new file mode 100644 index 00000000..e618d7a5 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +testpaths = + tests diff --git a/jobs/ads-incrementality-dap-collector/requirements.txt b/jobs/ads-incrementality-dap-collector/requirements.txt new file mode 100644 index 00000000..0ba261c8 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/requirements.txt @@ -0,0 +1,9 @@ +cattrs==25.1.1 +click==8.0.4 +pytest==6.2.5 +pytest-black==0.3.11 +pytest-flake8==1.0.6 +pytz==2025.2 +google-cloud-bigquery==3.34.0 +google-cloud-storage==3.3.0 +tldextract==5.3.0 diff --git a/jobs/ads-incrementality-dap-collector/setup.py b/jobs/ads-incrementality-dap-collector/setup.py new file mode 100644 index 00000000..6fa1a2f7 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/setup.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python + +from setuptools import setup, find_packages + +readme = open("README.md").read() + +setup( + name="ads-incrementality-dap-collector", + version="0.1.0", + author="gleonard@mozilla.com", + packages=find_packages(include=["ads_incrementality_dap_collector"]), + long_description=readme, + include_package_data=True, + license="MPL 2.0", +) diff --git a/jobs/ads-incrementality-dap-collector/tests/__init__.py b/jobs/ads-incrementality-dap-collector/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/jobs/ads-incrementality-dap-collector/tests/test_helpers.py b/jobs/ads-incrementality-dap-collector/tests/test_helpers.py new file mode 100644 index 00000000..1ada6475 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/tests/test_helpers.py @@ -0,0 +1,340 @@ +from datetime import date, datetime +import os +import pytest +import re +import sys +from unittest import TestCase +from unittest.mock import call, patch + + +# Append the source code directory to the path +sys.path.append( + os.path.abspath( + os.path.join(os.path.dirname(__file__), "../ads_incrementality_dap_collector") + ) +) + +from tests.test_mocks import ( # noqa: E402 + mock_nimbus_success, + mock_nimbus_fail, + mock_nimbus_experiment, + mock_control_row, + mock_treatment_a_row, + 
mock_treatment_b_row, + mock_task_id, + mock_nimbus_unparseable_experiment, + mock_tasks_to_collect, + mock_dap_config, + mock_experiment_config, + mock_experiment_config_with_default_duration, + mock_dap_subprocess_success, + mock_dap_subprocess_fail, + mock_dap_subprocess_raise, + mock_collected_tasks, + mock_bq_config, + mock_bq_table, + mock_create_dataset, + mock_create_dataset_fail, + mock_create_table, + mock_create_table_fail, + mock_insert_rows_json, + mock_insert_rows_json_fail, +) +from ads_incrementality_dap_collector.helpers import ( # noqa: E402 + get_experiment, + prepare_results_rows, + collect_dap_results, + write_results_to_bq, +) +from ads_incrementality_dap_collector.constants import ( # noqa: E402 + COLLECTOR_RESULTS_SCHEMA, + DEFAULT_BATCH_DURATION, +) + + +class TestHelpers(TestCase): + @patch("requests.get", side_effect=mock_nimbus_success) + def test_get_experiment_success(self, mock_fetch): + experiment = get_experiment(mock_experiment_config(), "nimbus_api_url") + self.assertEqual("traffic-impact-study-5", experiment.slug) + self.assertEqual( + mock_experiment_config().batch_duration, experiment.batchDuration + ) + self.assertEqual(1, mock_fetch.call_count) + + @patch("requests.get", side_effect=mock_nimbus_success) + def test_get_experiment_with_default_duration_success(self, mock_fetch): + experiment = get_experiment( + mock_experiment_config_with_default_duration(), "nimbus_api_url" + ) + self.assertEqual("traffic-impact-study-5", experiment.slug) + self.assertEqual(DEFAULT_BATCH_DURATION, experiment.batchDuration) + self.assertEqual(1, mock_fetch.call_count) + + @patch("requests.get", side_effect=mock_nimbus_fail) + def test_get_experiment_fail(self, mock_fetch): + with pytest.raises( + Exception, + match="Failed getting experiment: traffic-impact-study-5 from: nimbus_api_url", + ): + _ = get_experiment(mock_experiment_config(), "nimbus_api_url") + self.assertEqual(1, mock_fetch.call_count) + + def test_prepare_results_rows_success(self): + experiment = mock_nimbus_experiment() + results_rows = prepare_results_rows(experiment) + task_id = mock_task_id() + self.assertEqual([task_id], list(results_rows.keys())) + self.assertEqual(mock_control_row(experiment), results_rows[task_id][1]) + self.assertEqual(mock_treatment_a_row(experiment), results_rows[task_id][2]) + self.assertEqual(mock_treatment_b_row(experiment), results_rows[task_id][3]) + + def test_prepare_results_row_unparseable_experiment(self): + experiment = mock_nimbus_unparseable_experiment() + results_rows = prepare_results_rows(experiment) + self.assertEqual({}, results_rows) + self.assertEqual([], list(results_rows.keys())) + + @patch("subprocess.run", side_effect=mock_dap_subprocess_success) + def test_collect_dap_results_success(self, mock_dap_subprocess_success): + tasks_to_collect = mock_tasks_to_collect() + task_id = list(tasks_to_collect.keys())[0] + collect_dap_results( + tasks_to_collect, mock_dap_config(), mock_experiment_config() + ) + self.assertEqual(1, mock_dap_subprocess_success.call_count) + self.assertEqual(tasks_to_collect[task_id][1].value_count, 53) + self.assertEqual(tasks_to_collect[task_id][2].value_count, 48) + self.assertEqual(tasks_to_collect[task_id][3].value_count, 56) + + @patch("subprocess.run", side_effect=mock_dap_subprocess_fail) + def test_collect_dap_results_fail(self, mock_dap_subprocess_fail): + tasks_to_collect = mock_tasks_to_collect() + with pytest.raises( + Exception, match="Failed to parse collected DAP results: None" + ): + collect_dap_results( + 
tasks_to_collect, mock_dap_config(), mock_experiment_config() + ) + self.assertEqual(1, mock_dap_subprocess_success.call_count) + + @patch("subprocess.run", side_effect=mock_dap_subprocess_raise) + def test_collect_dap_results_raise(self, mock_dap_subprocess_raise): + tasks_to_collect = mock_tasks_to_collect() + task_id = list(tasks_to_collect.keys())[0] + with pytest.raises( + Exception, match=f"Collection failed for {task_id}, 1, stderr: Uh-oh" + ): + collect_dap_results( + tasks_to_collect, mock_dap_config(), mock_experiment_config() + ) + self.assertEqual(1, mock_dap_subprocess_success.call_count) + + @patch("google.cloud.bigquery.Table") + @patch("google.cloud.bigquery.Client") + @patch("ads_incrementality_dap_collector.helpers.datetime") + @patch("tests.test_mocks.NimbusExperiment.todays_date") + def test_write_results_to_bq_success( + self, + todays_date, + datetime_in_helpers, + bq_client, + bq_table, + ): + bq_client.return_value.create_dataset.side_effect = mock_create_dataset + bq_client.return_value.create_table.side_effect = mock_create_table + bq_client.return_value.insert_rows_json.side_effect = mock_insert_rows_json + + mock_datetime = datetime(2025, 9, 19, 16, 54, 34, 366228) + datetime_in_helpers.now.return_value = mock_datetime + datetime_in_helpers.side_effect = lambda *args, **kw: datetime(*args, **kw) + + mock_date = date(2025, 9, 19) + todays_date.return_value = mock_date + + bq_config = mock_bq_config() + collected_tasks = mock_collected_tasks() + write_results_to_bq(collected_tasks, bq_config) + + bq_client.assert_called_once_with(project=bq_config.project) + bq_table.assert_called_once_with( + f"{bq_config.project}.{bq_config.namespace}.{bq_config.table}", + schema=COLLECTOR_RESULTS_SCHEMA, + ) + bq_client.return_value.create_dataset.assert_called_once_with( + f"{bq_config.project}.{bq_config.namespace}", exists_ok=True + ) + bq_client.return_value.create_table.assert_called_once_with( + mock_bq_table(), exists_ok=True + ) + + calls = [ + call( + table=f"{bq_config.project}.{bq_config.namespace}.{bq_config.table}", + json_rows=[ + { + "collection_start": "2025-09-08", + "collection_end": "2025-09-14", + "country_codes": '["US"]', + "experiment_slug": "traffic-impact-study-5", + "experiment_branch": "control", + "advertiser": "glamazon", + "metric": "unique_client_organic_visits", + "value": {"count": 13645, "histogram": None}, + "created_at": mock_datetime.isoformat(), + } + ], + ), + call( + table=f"{bq_config.project}.{bq_config.namespace}.{bq_config.table}", + json_rows=[ + { + "collection_start": "2025-09-08", + "collection_end": "2025-09-14", + "country_codes": '["US"]', + "experiment_slug": "traffic-impact-study-5", + "experiment_branch": "treatment-b", + "advertiser": "glamazon", + "metric": "unique_client_organic_visits", + "value": {"count": 18645, "histogram": None}, + "created_at": mock_datetime.isoformat(), + } + ], + ), + call( + table=f"{bq_config.project}.{bq_config.namespace}.{bq_config.table}", + json_rows=[ + { + "collection_start": "2025-09-08", + "collection_end": "2025-09-14", + "country_codes": '["US"]', + "experiment_slug": "traffic-impact-study-5", + "experiment_branch": "treatment-a", + "advertiser": "glamazon", + "metric": "unique_client_organic_visits", + "value": {"count": 9645, "histogram": None}, + "created_at": mock_datetime.isoformat(), + } + ], + ), + ] + bq_client.return_value.insert_rows_json.assert_has_calls(calls) + + @patch("google.cloud.bigquery.Client") + def test_write_results_to_bq_create_dataset_fail( + self, + bq_client, 
+ ): + bq_client.return_value.create_dataset.side_effect = mock_create_dataset_fail + bq_client.return_value.create_table.side_effect = mock_create_table + bq_client.return_value.insert_rows_json.side_effect = mock_insert_rows_json + bq_config = mock_bq_config() + + with pytest.raises(Exception, match="BQ create dataset Uh-oh"): + write_results_to_bq(mock_collected_tasks(), bq_config) + + bq_client.return_value.create_dataset.assert_called_once_with( + f"{bq_config.project}.{bq_config.namespace}", exists_ok=True + ) + bq_client.return_value.create_table.assert_not_called() + bq_client.return_value.insert_rows_json.assert_not_called() + + @patch("google.cloud.bigquery.Client") + def test_write_results_to_bq_create_table_fail( + self, + bq_client, + ): + bq_client.return_value.create_dataset.side_effect = mock_create_dataset + bq_client.return_value.create_table.side_effect = mock_create_table_fail + bq_client.return_value.insert_rows_json.side_effect = mock_insert_rows_json + bq_config = mock_bq_config() + + with pytest.raises( + Exception, + match=f"Failed to create BQ table: {bq_config.project}.{bq_config.namespace}.{bq_config.table}", + ): + + write_results_to_bq(mock_collected_tasks(), bq_config) + bq_client.return_value.create_dataset.assert_called_once_with( + f"{bq_config.project}.{bq_config.namespace}", exists_ok=True + ) + bq_client.return_value.create_table.assert_called_once_with( + mock_bq_table(), exists_ok=True + ) + bq_client.return_value.insert_rows_json.assert_not_called() + + @patch("google.cloud.bigquery.Client") + def test_write_results_to_bq_insert_rows_fail( + self, + bq_client, + ): + bq_client.return_value.create_dataset.side_effect = mock_create_dataset + bq_client.return_value.create_table.side_effect = mock_create_table + bq_client.return_value.insert_rows_json.side_effect = mock_insert_rows_json_fail + bq_config = mock_bq_config() + mock_datetime = datetime(2025, 9, 19, 16, 54, 34, 366228) + + with pytest.raises( + Exception, + match=re.escape( + "Error inserting rows into some-gcp-project-id.ads_dap.incrementality: [{'key': 0, 'errors': 'Problem writing bucket 1 results'}, {'key': 1, 'errors': 'Problem writing bucket 2 results'}, {'key': 2, 'errors': 'Problem writing bucket 3 results'}]" # noqa: E501 + ), + ): + write_results_to_bq(mock_collected_tasks(), bq_config) + bq_client.return_value.create_dataset.assert_called_once_with( + f"{bq_config.project}.{bq_config.namespace}", exists_ok=True + ) + bq_client.return_value.create_table.assert_called_once_with( + mock_bq_table(), exists_ok=True + ) + calls = [ + call( + table="some-gcp-project-id.ads_dap.incrementality", + json_rows=[ + { + "collection_start": "2025-09-08", + "collection_end": "2025-09-15", + "country_codes": '["US"]', + "experiment_slug": "traffic-impact-study-5", + "experiment_branch": "control", + "advertiser": "glamazon", + "metric": "unique_client_organic_visits", + "value": {"count": 13645, "histogram": None}, + "created_at": mock_datetime.isoformat(), + } + ], + ), + call( + table="some-gcp-project-id.ads_dap.incrementality", + json_rows=[ + { + "collection_start": "2025-09-08", + "collection_end": "2025-09-15", + "country_codes": '["US"]', + "experiment_slug": "traffic-impact-study-5", + "experiment_branch": "treatment-b", + "advertiser": "glamazon", + "metric": "unique_client_organic_visits", + "value": {"count": 18645, "histogram": None}, + "created_at": mock_datetime.isoformat(), + } + ], + ), + call( + table="some-gcp-project-id.ads_dap.incrementality", + json_rows=[ + { + 
"collection_start": "2025-09-08", + "collection_end": "2025-09-15", + "country_codes": '["US"]', + "experiment_slug": "traffic-impact-study-5", + "experiment_branch": "treatment-a", + "advertiser": "glamazon", + "metric": "unique_client_organic_visits", + "value": {"count": 9645, "histogram": None}, + "created_at": mock_datetime.isoformat(), + } + ], + ), + ] + bq_client.return_value.insert_rows_json.assert_has_calls(calls) diff --git a/jobs/ads-incrementality-dap-collector/tests/test_mock_responses.py b/jobs/ads-incrementality-dap-collector/tests/test_mock_responses.py new file mode 100644 index 00000000..101d75b4 --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/tests/test_mock_responses.py @@ -0,0 +1,226 @@ +NIMBUS_SUCCESS = { + "slug": "traffic-impact-study-5", + "appName": "firefox_desktop", + "appId": "firefox-desktop", + "channel": "nightly", + "bucketConfig": { + "randomizationUnit": "group_id", + "namespace": "firefox-desktop-dapTelemetry-newtabSponsoredContent-nightly-group_id-2", + "start": 0, + "count": 10000, + "total": 10000, + }, + "featureIds": ["dapTelemetry", "newtabSponsoredContent"], + "branches": [ + { + "slug": "control", + "ratio": 1, + "features": [ + { + "featureId": "dapTelemetry", + "enabled": True, + "value": { + "enabled": True, + "visitCountingEnabled": True, + "visitCountingExperimentList": [ + { + "name": "1841986", + "bucket": 1, + "task_id": "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o", + "task_veclen": 4, + "urls": ["*://*.glamazon.com/"], + } + ], + }, + }, + { + "featureId": "newtabSponsoredContent", + "enabled": True, + "value": { + "tilesPlacements": "newtab_tile_exp_1a, newtab_tile_exp_2a, newtab_tile_exp_3a" + }, + }, + ], + "firefoxLabsTitle": None, + }, + { + "slug": "treatment-a", + "ratio": 1, + "features": [ + { + "featureId": "dapTelemetry", + "enabled": True, + "value": { + "enabled": True, + "visitCountingEnabled": True, + "visitCountingExperimentList": [ + { + "name": "1841986", + "bucket": 2, + "task_id": "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o", + "task_veclen": 4, + "urls": ["*://*.glamazon.com/"], + } + ], + }, + }, + { + "featureId": "newtabSponsoredContent", + "enabled": True, + "value": { + "tilesPlacements": "newtab_tile_exp_1b, newtab_tile_exp_2a, newtab_tile_exp_3a" + }, + }, + ], + "firefoxLabsTitle": None, + }, + { + "slug": "treatment-b", + "ratio": 1, + "features": [ + { + "featureId": "dapTelemetry", + "enabled": True, + "value": { + "enabled": True, + "visitCountingEnabled": True, + "visitCountingExperimentList": [ + { + "name": "1841986", + "bucket": 3, + "task_id": "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o", + "task_veclen": 4, + "urls": [ + "*://*.glamazon.com/", + "*://*.glamazon.com/*tag=admarketus*ref=*mfadid=adm", + ], + } + ], + }, + }, + { + "featureId": "newtabSponsoredContent", + "enabled": True, + "value": { + "tilesPlacements": "newtab_tile_exp_1b, newtab_tile_exp_2a, newtab_tile_exp_3a" + }, + }, + ], + "firefoxLabsTitle": None, + }, + ], + # noqa: E501 + "targeting": "(browserSettings.update.channel == \"nightly\") && ((experiment.slug in activeExperiments) || ((\n 'browser.newtabpage.activity-stream.showSponsoredTopSites'|preferenceValue\n ) && (version|versionCompare('140.!') >= 0) && (region in ['US'])))", # noqa: E501 + "startDate": "2025-08-18", + "enrollmentEndDate": None, + "endDate": None, + "proposedEnrollment": 7, + "referenceBranch": "control", +} + +NIMBUS_NOT_AN_INCREMENTALITY_EXPERIMENT = { + "slug": "something-experiment", + "appName": "firefox_desktop", + "appId": 
"firefox-desktop", + "channel": "nightly", + "bucketConfig": { + "randomizationUnit": "group_id", + "namespace": "firefox-desktop-dapTelemetry-newtabSponsoredContent-nightly-group_id-2", + "start": 0, + "count": 10000, + "total": 10000, + }, + "featureIds": [ + "dapTelemetryIsNotPartOfThisExperiment", + "newtabSponsoredContentIsNotPartOfThisExperiment", + ], + "branches": [ + { + "slug": "control", + "ratio": 1, + "features": [ + { + "featureId": "dapTelemetryIsNotPartOfThisExperiment", + "enabled": True, + "value": { + "enabled": True, + "somethingEnabled": True, + "someOtherExperimentList": [ + { + "name": "1841986890", + } + ], + }, + }, + { + "featureId": "newtabSponsoredContentIsNotPartofThisExperiment", + "enabled": True, + "value": { + "somePlacements": "something_1a, something_2a, something_3a" + }, + }, + ], + "firefoxLabsTitle": None, + }, + { + "slug": "treatment-a", + "ratio": 1, + "features": [ + { + "featureId": "dapTelemetryIsNotPartOfThisExperiment", + "enabled": True, + "value": { + "enabled": True, + "somethingEnabled": True, + "someOtherExperimentList": [ + { + "name": "1841986625495", + } + ], + }, + }, + { + "featureId": "newtabSponsoredContentIsNotPartofThisExperiment", + "enabled": True, + "value": { + "somePlacements": "something_1a, something_2a, something_3a" + }, + }, + ], + "firefoxLabsTitle": None, + }, + { + "slug": "treatment-b", + "ratio": 1, + "features": [ + { + "featureId": "dapTelemetryIsNotPartOfThisExperiment", + "enabled": True, + "value": { + "enabled": True, + "somethingEnabled": True, + "someOtherExperimentList": [ + { + "name": "18419866254958234765", + } + ], + }, + }, + { + "featureId": "newtabSponsoredContentIsNotPartofThisExperiment", + "enabled": True, + "value": { + "somePlacements": "something_1a, something_2a, something_3a" + }, + }, + ], + "firefoxLabsTitle": None, + }, + ], + "targeting": "(browserSettings.update.channel == \"nightly\") && ((experiment.slug in activeExperiments) || ((\n 'browser.newtabpage.activity-stream.showSponsoredTopSites'|preferenceValue\n ) && (version|versionCompare('140.!') >= 0) && (region in ['US'])))", # noqa: E501 + "startDate": "2025-08-18", + "enrollmentEndDate": None, + "endDate": "2025-09-15", + "proposedEnrollment": 7, + "referenceBranch": "control", +} diff --git a/jobs/ads-incrementality-dap-collector/tests/test_mocks.py b/jobs/ads-incrementality-dap-collector/tests/test_mocks.py new file mode 100644 index 00000000..bcd6914b --- /dev/null +++ b/jobs/ads-incrementality-dap-collector/tests/test_mocks.py @@ -0,0 +1,351 @@ +from google.cloud import bigquery +from collections.abc import Mapping, Sequence +from subprocess import CompletedProcess +from types import SimpleNamespace + +from models import ( + IncrementalityBranchResultsRow, + NimbusExperiment, +) +from tests.test_mock_responses import ( + NIMBUS_SUCCESS, + NIMBUS_NOT_AN_INCREMENTALITY_EXPERIMENT, +) + + +class MockResponse: + """Mock for returning a response, used in functions that mock requests.""" + + def __init__( + self, json_data: object, status_code: int, headers_data: object = None + ): + self.json_data = json_data + self.status_code = status_code + self.headers = headers_data or {} + self.ok = 200 <= self.status_code < 400 + + def json(self): + """Mock json data.""" + return self.json_data + + +def mock_nimbus_success(*args, **kwargs) -> MockResponse: + """Mock successful POST requests to Nimbus.""" + + return MockResponse(NIMBUS_SUCCESS, 200) + + +def mock_nimbus_fail(*args, **kwargs) -> MockResponse: + """Mock failing POST requests to 
Nimbus.""" + + return MockResponse({}, 404) + + +def mock_nimbus_experiment() -> NimbusExperiment: + nimbus_success_json = NIMBUS_SUCCESS + nimbus_success_json["batchDuration"] = mock_experiment_config().batch_duration + return NimbusExperiment.from_dict(nimbus_success_json) + + +def mock_task_id() -> str: + return "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o" + + +def mock_control_row(experiment) -> IncrementalityBranchResultsRow: + return IncrementalityBranchResultsRow( + experiment, + "control", + { + "name": "1841986", + "bucket": 1, + "task_id": "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o", + "task_veclen": 4, + "urls": ["*://*.glamazon.com/"], + }, + ) + + +def mock_treatment_a_row(experiment) -> IncrementalityBranchResultsRow: + return IncrementalityBranchResultsRow( + experiment, + "treatment-a", + { + "name": "1841986", + "bucket": 2, + "task_id": "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o", + "task_veclen": 4, + "urls": ["*://*.glamazon.com/"], + }, + ) + + +def mock_treatment_b_row(experiment) -> IncrementalityBranchResultsRow: + return IncrementalityBranchResultsRow( + experiment, + "treatment-b", + { + "name": "1841986", + "bucket": 3, + "task_id": "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o", + "task_veclen": 4, + "urls": [ + "*://*.glamazon.com/", + "*://*.glamazon.com/*tag=admarketus*ref=*mfadid=adm", + ], + }, + ) + + +def mock_nimbus_unparseable_experiment() -> NimbusExperiment: + nimbus_unparseable_json = NIMBUS_NOT_AN_INCREMENTALITY_EXPERIMENT + nimbus_unparseable_json["batchDuration"] = mock_experiment_config().batch_duration + return NimbusExperiment.from_dict(nimbus_unparseable_json) + + +def mock_tasks_to_collect() -> dict[str, dict[int, IncrementalityBranchResultsRow]]: + experiment = mock_nimbus_experiment() + return { + "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o": { + 1: mock_control_row(experiment), + 2: mock_treatment_b_row(experiment), + 3: mock_treatment_a_row(experiment), + } + } + + +def mock_collected_tasks() -> dict[str, dict[int, IncrementalityBranchResultsRow]]: + experiment = mock_nimbus_experiment() + tasks_to_collect = { + "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o": { + 1: mock_control_row(experiment), + 2: mock_treatment_b_row(experiment), + 3: mock_treatment_a_row(experiment), + } + } + tasks_to_collect["mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o"][ + 1 + ].value_count = 13645 + tasks_to_collect["mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o"][ + 2 + ].value_count = 18645 + tasks_to_collect["mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o"][ + 3 + ].value_count = 9645 + return tasks_to_collect + + +def mock_dap_config() -> SimpleNamespace: + return SimpleNamespace( + hpke_config="AQAgAAEAAQAgpdceoGiuWvIiogA8SPCdprkhWMNtLq_y0GSePI7EhXE", + auth_token="shh-secret-token", + hpke_private_key="ssh-private-key", + batch_start="1755291600", + ) + + +def mock_experiment_config() -> SimpleNamespace: + return SimpleNamespace(slug="traffic-impact-study-5", batch_duration=604800) + + +def mock_experiment_config_with_default_duration() -> SimpleNamespace: + return SimpleNamespace(slug="traffic-impact-study-5") + + +def mock_bq_config() -> SimpleNamespace: + return SimpleNamespace( + project="some-gcp-project-id", namespace="ads_dap", table="incrementality" + ) + + +def mock_dap_subprocess_success( + args: list[str], capture_output: bool, text: bool, check: bool, timeout: int +) -> CompletedProcess: + return CompletedProcess( + args=[ + "./collect", + "--task-id", + "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o", + "--leader", + "https://dap-leader-url", + 
"--vdaf", + "histogram", + "--length", + "3", + "--authorization-bearer-token", + "ssh_secret_token", + "--batch-interval-start", + "1756335600", + "--batch-interval-duration", + "3600", + "--hpke-config", + "AQAgAAEAAQAgpdceoGiuWvIiogA8SPCdprkhWMNtLq_y0GSePI7EhXE", + "--hpke-private-key", + "ssh-secret-private-key", + ], + returncode=0, + stdout="Number of reports: 150\nInterval start: 2025-08-27 23:32:00 UTC\nInterval end: 2025-08-27 23:34:00 UTC\nInterval length: 120s\nAggregation result: [53, 48, 56]\n", # noqa: E501 + stderr="", + ) + + +def mock_dap_subprocess_fail( + args: list[str], capture_output: bool, text: bool, check: bool, timeout: int +) -> CompletedProcess: + return CompletedProcess( + args=[ + "./collect", + "--task-id", + "mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o", + "--leader", + "https://dap-09-3.api.divviup.org", + "--vdaf", + "histogram", + "--length", + "3", + "--authorization-bearer-token", + "ssh_secret_token", + "--batch-interval-start", + "1756335600", + "--batch-interval-duration", + "3600", + "--hpke-config", + "AQAgAAEAAQAgpdceoGiuWvIiogA8SPCdprkhWMNtLq_y0GSePI7EhXE", + "--hpke-private-key", + "ssh-secret-private-key", + ], + returncode=0, + stdout="Derp", + stderr="Uh-oh stuff went wrong actually", + ) + + +def mock_dap_subprocess_raise( + args: list[str], capture_output: bool, text: bool, check: bool, timeout: int +) -> CompletedProcess: + raise Exception( + "Collection failed for mubArkO3So8Co1X98CBo62-lSCM4tB-NZPOUGJ83N1o, 1, stderr: Uh-oh" + ) from None + + +def mock_create_dataset(data_set: str, exists_ok: bool): + pass + + +def mock_create_dataset_fail(data_set: str, exists_ok: bool): + raise Exception("BQ create dataset Uh-oh") + + +def mock_create_table(table: bigquery.Table, exists_ok: bool): + pass + + +def mock_create_table_fail(table: bigquery.Table, exists_ok: bool): + raise Exception("BQ create table Uh-oh") + + +def mock_insert_rows_json(table: str, json_rows: dict) -> Sequence[Mapping]: + return [] + + +def mock_insert_rows_json_fail(table: str, json_rows: dict) -> Sequence[Mapping]: + return [ + {"key": 0, "errors": "Problem writing bucket 1 results"}, + {"key": 1, "errors": "Problem writing bucket 2 results"}, + {"key": 2, "errors": "Problem writing bucket 3 results"}, + ] + + +def mock_bq_table() -> bigquery.Table: + return bigquery.Table( + "some-gcp-project-id.ads_dap.incrementality", + schema=[ + bigquery.SchemaField( + "collection_start", + "DATE", + "REQUIRED", + None, + "Start date of the collected time window, inclusive.", + (), + None, + ), + bigquery.SchemaField( + "collection_end", + "DATE", + "REQUIRED", + None, + "End date of the collected time window, inclusive.", + (), + None, + ), + bigquery.SchemaField( + "country_codes", + "JSON", + "NULLABLE", + None, + "List of 2-char country codes for the experiment", + (), + None, + ), + bigquery.SchemaField( + "experiment_slug", + "STRING", + "REQUIRED", + None, + "Slug indicating the experiment.", + (), + None, + ), + bigquery.SchemaField( + "experiment_branch", + "STRING", + "REQUIRED", + None, + "The experiment branch this data is associated with.", + (), + None, + ), + bigquery.SchemaField( + "advertiser", + "STRING", + "REQUIRED", + None, + "Advertiser associated with this experiment.", + (), + None, + ), + bigquery.SchemaField( + "metric", + "STRING", + "REQUIRED", + None, + "Metric collected for this experiment.", + (), + None, + ), + bigquery.SchemaField( + "value", + "RECORD", + "REQUIRED", + None, + None, + ( + bigquery.SchemaField( + "count", "INT64", "NULLABLE", None, 
None, (), None + ), + bigquery.SchemaField( + "histogram", "JSON", "NULLABLE", None, None, (), None + ), + ), + None, + ), + bigquery.SchemaField( + "created_at", + "TIMESTAMP", + "REQUIRED", + None, + "Timestamp for when this row was written.", + (), + None, + ), + ], + )