44 commits
7305d05
Initial version of ETL job
gleonard-m Jul 9, 2025
b69a042
Removed deleted project
gleonard-m Jul 9, 2025
3e051a7
Add Glenda's utils, a dev runbook doc, and rename some args and env vars
mashalifshin Aug 6, 2025
7d21e54
WIP: First pass refactor almost complete
mashalifshin Aug 12, 2025
89a4cb5
Working first-pass refactor
mashalifshin Aug 12, 2025
3ad955b
Add helper functions for the main for loops, add config objects, add …
mashalifshin Aug 13, 2025
caee055
Read job config from GCS bucket
mashalifshin Aug 16, 2025
4fd78a5
Tweak error handling, whitespace, and add more comments
mashalifshin Aug 16, 2025
c7f3bfd
Whoops, remove manual testing of Nimbus fetching error handling
mashalifshin Aug 18, 2025
cfbe5fe
Write job output to file in gcs bucket
mashalifshin Aug 18, 2025
f8e1e7b
Give log files an extension
mashalifshin Aug 18, 2025
ee4e345
Tweak log message
mashalifshin Aug 18, 2025
6133976
WIP tests and add some hopefully helpful type hints
mashalifshin Aug 29, 2025
e9350e0
Add dap collection helper tests and fix bug with bucket indexing
mashalifshin Aug 30, 2025
6ab2eae
Remove copypasta job clause in circleci config
mashalifshin Aug 30, 2025
a0bb7f7
Bump up the process timeout threshold
mashalifshin Aug 30, 2025
7146c86
Make batch_start a param again as that will come from the airflow job
mashalifshin Aug 30, 2025
210a696
Configure batch duration per-experiment
mashalifshin Aug 30, 2025
5074a9b
Remove unused test
mashalifshin Aug 30, 2025
258fe4b
Clean up the offset for bucket keys handling by pushing it down into …
mashalifshin Sep 2, 2025
c3856ef
Fix CI job for running tests, and a bit of cleanup
mashalifshin Sep 3, 2025
55d9003
Reduce mock responses to mostly just necessary fields
mashalifshin Sep 4, 2025
4630ae6
Rough first pass for populating collect_start and collect_end dynamic…
mashalifshin Sep 10, 2025
77b2fcc
Add tests for top-level writing to BQ helper
mashalifshin Sep 12, 2025
e2d5b7b
Redact the task veclen and value count when printing IncrementalityBr…
mashalifshin Sep 12, 2025
6f5aed7
Lint
mashalifshin Sep 12, 2025
5f197ec
Fix bigquery client mocking to fix tests in CI
mashalifshin Sep 20, 2025
05cd838
Rename hpke_token to auth_token
mashalifshin Sep 24, 2025
12cfa71
Fix batch interval logic to make batch_end_date inclusive, and WIP un…
mashalifshin Sep 25, 2025
8d531f0
Give batch duration a default of 7 days and make it optional in config
mashalifshin Sep 25, 2025
8cb20c8
Add README and tests for default duration
mashalifshin Sep 25, 2025
bcbd562
Lint and format (and re-enable logging to bucket that was accidentall…
mashalifshin Sep 25, 2025
58be045
Fix bug where created_at was being written to be BQ as a datetime and…
mashalifshin Sep 26, 2025
81670c3
Fix up exception handling in main loop for throwing to airflow and up…
mashalifshin Sep 26, 2025
14942a9
Remove some unnecessary models
mashalifshin Sep 26, 2025
681c121
Expect created_at to be a string in insert_rows_json calls
mashalifshin Oct 1, 2025
a9ca652
Date mocking attempt 1: from unittest.mock docs, partial mocking section
mashalifshin Oct 1, 2025
f87c915
Date mocking attempt 2: define a wrapper function on the class and mo…
mashalifshin Oct 1, 2025
1eb5271
Date mocking attempt 3: classmethod on MockDate wrapper class
mashalifshin Oct 1, 2025
2a42873
Date mocking attempt 4: Frozen mock class
mashalifshin Oct 2, 2025
ccacd16
Date mocking success: Patch a wrapper function on experiment class in…
mashalifshin Oct 2, 2025
252fa3a
Remove unnecessary DAP docs and generalize examples
mashalifshin Oct 3, 2025
22cf4fa
Clean up done todo and use example value in test mocks
mashalifshin Oct 3, 2025
e006488
Separate params for config bucket gcp project and BQ table gcp project
mashalifshin Oct 4, 2025
32 changes: 31 additions & 1 deletion .circleci/config.yml
@@ -65,6 +65,22 @@ jobs:
          name: Verify CI config is up-to-date
          command: docker run docker-etl:build python3 -m docker_etl.ci_config --dry-run | diff -B .circleci/config.yml -

  build-job-ads-incrementality-dap-collector:
    docker:
      - image: << pipeline.parameters.git-image >>
    steps:
      - checkout
      - compare-branch:
          pattern: ^jobs/ads-incrementality-dap-collector/
      - setup_remote_docker:
          version: << pipeline.parameters.docker-version >>
      - run:
          name: Build Docker image
          command: docker build -t app:build jobs/ads-incrementality-dap-collector/
      - run:
          name: Test Code
          command: docker run app:build python3 -m pytest

  build-job-bq2sftp:
    docker:
      - image: << pipeline.parameters.git-image >>
@@ -213,6 +229,7 @@ jobs:
          name: Build Docker image
          command: docker build -t app:build jobs/experiments-monitoring-data-export/


  build-job-extensions:
    docker:
      - image: << pipeline.parameters.git-image >>
@@ -402,6 +419,20 @@ workflows:
    jobs:
      - build-docker-etl

  job-ads-incrementality-dap-collector:
    jobs:
      - build-job-ads-incrementality-dap-collector
      - gcp-gcr/build-and-push-image:
          context: data-eng-airflow-gcr
          docker-context: jobs/ads-incrementality-dap-collector/
          path: jobs/ads-incrementality-dap-collector/
          image: ads-incrementality-dap-collector_docker_etl
          requires:
            - build-job-ads-incrementality-dap-collector
          filters:
            branches:
              only: main

  job-bq2sftp:
    jobs:
      - build-job-bq2sftp
@@ -533,7 +564,6 @@ workflows:
            branches:
              only: main


  job-extensions:
    jobs:
      - build-job-extensions
13 changes: 13 additions & 0 deletions jobs/ads-incrementality-dap-collector/.dockerignore
@@ -0,0 +1,13 @@
.cache/
ci_job.yaml
ci_workflow.yaml
public_key_to_hpke_config.py
dev_run_docker.sh
dev_runbook.md
.DS_Store
example_config.json
*.pyc
.pytest_cache/
.python-version
__pycache__/
venv/
2 changes: 2 additions & 0 deletions jobs/ads-incrementality-dap-collector/.flake8
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 120
5 changes: 5 additions & 0 deletions jobs/ads-incrementality-dap-collector/.gitignore
@@ -0,0 +1,5 @@
.DS_Store
*.pyc
__pycache__/
venv/
.python-version
38 changes: 38 additions & 0 deletions jobs/ads-incrementality-dap-collector/Dockerfile
@@ -0,0 +1,38 @@
FROM python:3.12
LABEL maintainer="Glenda Leonard <[email protected]>"
ARG HOME="/janus_build"
WORKDIR ${HOME}

RUN apt update && apt --yes install curl

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
ENV PATH=$HOME/.cargo/bin:$PATH

# build the CLI tool
RUN git clone --depth 1 https://github.com/divviup/janus.git --branch '0.7.69'
RUN cd janus && cargo build -r -p janus_tools --bin collect

######### next stage

FROM python:3.12
LABEL maintainer="Glenda Leonard <[email protected]>"
# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md
ARG USER_ID="10001"
ARG GROUP_ID="app"
ARG HOME="/app"
WORKDIR ${HOME}

RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
    useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}
# Copy the janus `collect` CLI binary from the build stage
COPY --from=0 /janus_build/janus/target/release/collect ./

# Drop root and change ownership of the application folder to the user
RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
USER ${USER_ID}
ADD ./requirements.txt .
RUN pip install --upgrade pip
RUN pip install -r requirements.txt

ADD . .
131 changes: 131 additions & 0 deletions jobs/ads-incrementality-dap-collector/README.md
@@ -0,0 +1,131 @@
# Ads Incrementality DAP collector

## Background

Incrementality is a way to measure the effectiveness of our ads in a general, aggregated, privacy-preserving way --
without knowing anything about specific users.

Incrementality works by dividing clients into various Nimbus experiment branches that vary how/whether an ad is shown.
Separately, a [DAP](https://docs.divviup.org/) task is configured to store the metrics for each experiment branch in a
different DAP bucket.

Firefox is instrumented with [DAP telemetry functionality](https://github.com/mozilla-firefox/firefox/tree/main/toolkit/components/telemetry/dap), which allows it to send metrics and reports into the correct DAP buckets as configured in the experiment.

This job then collects the per-branch metrics from DAP (using bucket info from the experiment's data) and writes them
to BQ.

## Overview

This job is driven by a config file stored in a GCS bucket. Tell the job where to find the config file by passing the
`gcp_project` and `gcs_config_bucket` parameters. See `example_config.json` for how to structure this file.

The config file specifies the incrementality experiments that are currently running, some config and credentials from DAP,
and where in BQ to write the incrementality results.

The job reads each experiment's data from Nimbus, then fetches per-branch results from DAP, and finally assembles the
results into rows and writes them to BQ, roughly as sketched below.
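
At a high level, the control flow looks roughly like this sketch. All helper names here are hypothetical, for
illustration only -- they are not the job's actual API:

```python
from typing import Any


def fetch_nimbus_experiment(api_url: str, slug: str) -> dict[str, Any]:
    """Fetch experiment metadata (branches, DAP task info) from Nimbus."""
    raise NotImplementedError


def collect_dap_results(dap_config: dict[str, Any], branch: dict[str, Any]) -> dict[str, Any]:
    """Run a DAP collection for one experiment branch's bucket."""
    raise NotImplementedError


def write_rows_to_bq(bq_config: dict[str, Any], rows: list[dict[str, Any]]) -> None:
    """Insert the assembled result rows into the configured BigQuery table."""
    raise NotImplementedError


def run(config: dict[str, Any]) -> None:
    rows = []
    for experiment in config["nimbus"]["experiments"]:
        info = fetch_nimbus_experiment(config["nimbus"]["api_url"], experiment["slug"])
        for branch in info["branches"]:
            result = collect_dap_results(config["dap"], branch)
            rows.append({"experiment_slug": experiment["slug"], **result})
    write_rows_to_bq(config["bq"], rows)
```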

## Configuration

The three recognized top-level keys are `bq`, `dap`, and `nimbus`.

#### bq

Everything the job needs to connect to BigQuery.

- `project`: GCP project
- `namespace`: BQ namespace for ads incrementality
- `table`: BQ table where incrementality results go

#### dap

Everything the job needs to connect to DAP.

- `auth_token`: Token defined in the collector credentials, used to authenticate to the leader
- `hpke_private_key`: Private key defined in the collector credentials, used to decrypt shares from the leader
and helper
- `hpke_config`: base64url-encoded version of the public key defined in the collector credentials
- `batch_start`: Start of the collection interval, as the number of seconds since the Unix epoch


#### nimbus

Everything the job needs to connect to Nimbus.

- `api_url`: API URL for fetching the Nimbus experiment info
- `experiments`: List of incrementality experiment configs

##### experiment config list

The experiments that the job should collect results for.

- `slug`: Experiment slug
- `batch_duration`: Optional. Duration of the collection batch interval, in seconds.
  Defaults to 7 days (604800 seconds) if not specified.
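
Putting these together, a config file following this structure might look like the sketch below. All values are
illustrative placeholders (the URL, project, and table names are invented, and the DAP credentials are redacted);
see `example_config.json` for the canonical structure:

```json
{
  "bq": {
    "project": "my-gcp-project",
    "namespace": "ads_incrementality",
    "table": "incrementality_results"
  },
  "dap": {
    "auth_token": "REDACTED",
    "hpke_private_key": "REDACTED",
    "hpke_config": "base64url-encoded-public-key",
    "batch_start": 1735689600
  },
  "nimbus": {
    "api_url": "https://experimenter.example.com/api/v6/experiments",
    "experiments": [
      {"slug": "my-incrementality-experiment", "batch_duration": 604800},
      {"slug": "another-experiment"}
    ]
  }
}
```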

## Usage

This script is intended to be run in a Docker container.

It requires some environment variables that hold DAP credentials, which the job looks for when it starts up. A dev
script, `dev_run_docker.sh`, is included for convenience to build and run the job locally; it also documents those
variables.

Once the environment variables are set up, run the job with:

```sh
./dev_run_docker.sh
```

To just build the Docker image, use:

```sh
docker build -t ads_incrementality_dap_collector .
```

To run outside of Docker, install dependencies with:

```sh
pip install -r requirements.txt
```

Run the script with:

```sh
python3 -m python_template_job.main
```

## Testing

Run tests with:

```sh
python3 -m pytest
```

## Linting and formatting

`flake8` and `black` are included for code linting and formatting:

```sh
pytest --black --flake8
```

or

```sh
flake8 .
```

or

```sh
black .
```

or

```sh
black --diff .
```
@@ -0,0 +1,72 @@
from datetime import datetime

from google.cloud import bigquery

DAP_LEADER = "https://dap-09-3.api.divviup.org"
VDAF = "histogram"
PROCESS_TIMEOUT = 1200 # 20 mins

CONFIG_FILE_NAME = "config.json" # See example_config.json for the contents and structure of the job config file.
LOG_FILE_NAME = f"{datetime.now()}-ads-incrementality-dap-collector.log"

DEFAULT_BATCH_DURATION = 604800  # 7 days, in seconds
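
# Worked example (illustrative, not part of the original file): with the default
# batch duration, a batch starting at Unix time 1735689600 (2025-01-01 00:00:00 UTC)
# covers 604800 seconds, i.e. the seven days 2025-01-01 through 2025-01-07, with
# both the collection_start and collection_end dates inclusive.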

COLLECTOR_RESULTS_SCHEMA = [
    bigquery.SchemaField(
        "collection_start",
        "DATE",
        mode="REQUIRED",
        description="Start date of the collected time window, inclusive.",
    ),
    bigquery.SchemaField(
        "collection_end",
        "DATE",
        mode="REQUIRED",
        description="End date of the collected time window, inclusive.",
    ),
    bigquery.SchemaField(
        "country_codes",
        "JSON",
        mode="NULLABLE",
        description="List of 2-char country codes for the experiment",
    ),
    bigquery.SchemaField(
        "experiment_slug",
        "STRING",
        mode="REQUIRED",
        description="Slug indicating the experiment.",
    ),
    bigquery.SchemaField(
        "experiment_branch",
        "STRING",
        mode="REQUIRED",
        description="The experiment branch this data is associated with.",
    ),
    bigquery.SchemaField(
        "advertiser",
        "STRING",
        mode="REQUIRED",
        description="Advertiser associated with this experiment.",
    ),
    bigquery.SchemaField(
        "metric",
        "STRING",
        mode="REQUIRED",
        description="Metric collected for this experiment.",
    ),
    bigquery.SchemaField(
        name="value",
        field_type="RECORD",
        mode="REQUIRED",
        fields=[
            bigquery.SchemaField("count", "INT64", mode="NULLABLE"),
            bigquery.SchemaField("histogram", "JSON", mode="NULLABLE"),
        ],
    ),
    bigquery.SchemaField(
        "created_at",
        "TIMESTAMP",
        mode="REQUIRED",
        description="Timestamp for when this row was written.",
    ),
]

Review comment on `created_at`: Update to created_timestamp ?
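
For reference, a row matching this schema, as it might be passed to BigQuery's `insert_rows_json`, could look like
the sketch below. All values are invented for illustration; `created_at` is serialized as a string (per the commit
history above), and the JSON-typed columns are shown as JSON-encoded strings, one common convention for streaming
inserts:

```python
example_row = {
    "collection_start": "2025-01-01",
    "collection_end": "2025-01-07",
    "country_codes": '["US", "CA"]',  # JSON column, passed as a JSON-encoded string
    "experiment_slug": "my-incrementality-experiment",
    "experiment_branch": "treatment",
    "advertiser": "example-advertiser",
    "metric": "ad_click",
    "value": {"count": 12345, "histogram": None},  # RECORD: one of count/histogram is set
    "created_at": "2025-01-08T00:00:00Z",  # TIMESTAMP, serialized as a string
}
```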