[AE-782] Build dap collector job for incrementality experiments #387
Open

mashalifshin wants to merge 44 commits into main from ae-782-build-dap-collector-job
+1,939 −1
Changes from all commits (44 commits):
7305d05  Initial version of ETL job (gleonard-m)
b69a042  Removed deleted project (gleonard-m)
3e051a7  Add Glenda's utils, a dev runbook doc, and rename some args and env vars (mashalifshin)
7d21e54  WIP: First pass refactor almost complete (mashalifshin)
89a4cb5  Working first-pass refactor (mashalifshin)
3ad955b  Add helper functions for the main for loops, add config objects, add … (mashalifshin)
caee055  Read job config from GCS bucket (mashalifshin)
4fd78a5  Tweak error handling, whitespace, and add more comments (mashalifshin)
c7f3bfd  Whoops, remove manual testing of Nimbus fetching error handling (mashalifshin)
cfbe5fe  Write job output to file in gcs bucket (mashalifshin)
f8e1e7b  Give log files an extension (mashalifshin)
ee4e345  Tweak log message (mashalifshin)
6133976  WIP tests and add some hopefully helpful type hints (mashalifshin)
e9350e0  Add dap collection helper tests and fix bug with bucket indexing (mashalifshin)
6ab2eae  Remove copypasta job clause in circleci config (mashalifshin)
a0bb7f7  Bump up the process timeout threshold (mashalifshin)
7146c86  Make batch_start a param again as that will come from the airflow job (mashalifshin)
210a696  Configure batch duration per-experiment (mashalifshin)
5074a9b  Remove unused test (mashalifshin)
258fe4b  Clean up the offset for bucket keys handling by pushing it down into … (mashalifshin)
c3856ef  Fix CI job for running tests, and a bit of cleanup (mashalifshin)
55d9003  Reduce mock responses to mostly just necessary fields (mashalifshin)
4630ae6  Rough first pass for populating collect_start and collect_end dynamic… (mashalifshin)
77b2fcc  Add tests for top-level writing to BQ helper (mashalifshin)
e2d5b7b  Redact the task veclen and value count when printing IncrementalityBr… (mashalifshin)
6f5aed7  Lint (mashalifshin)
5f197ec  Fix bigquery client mocking to fix tests in CI (mashalifshin)
05cd838  Rename hpke_token to auth_token (mashalifshin)
12cfa71  Fix batch interval logic to make batch_end_date inclusive, and WIP un… (mashalifshin)
8d531f0  Give batch duration a default of 7 days and make it optional in config (mashalifshin)
8cb20c8  Add README and tests for default duration (mashalifshin)
bcbd562  Lint and format (and re-enable logging to bucket that was accidentall… (mashalifshin)
58be045  Fix bug where created_at was being written to be BQ as a datetime and… (mashalifshin)
81670c3  Fix up exception handling in main loop for throwing to airflow and up… (mashalifshin)
14942a9  Remove some unnecessary models (mashalifshin)
681c121  Expect created_at to be a string in insert_rows_json calls (mashalifshin)
a9ca652  Date mocking attempt 1: from unittest.mock docs, partial mocking section (mashalifshin)
f87c915  Date mocking attempt 2: define a wrapper function on the class and mo… (mashalifshin)
1eb5271  Date mocking attempt 3: classmethod on MockDate wrapper class (mashalifshin)
2a42873  Date mocking attempt 4: Frozen mock class (mashalifshin)
ccacd16  Date mocking success: Patch a wrapper function on experiment class in… (mashalifshin)
252fa3a  Remove unnecessary DAP docs and generalize examples (mashalifshin)
22cf4fa  Clean up done todo and use example value in test mocks (mashalifshin)
e006488  Separate params for config bucket gcp project and BQ table gcp project (mashalifshin)
@@ -0,0 +1,13 @@
.cache/
ci_job.yaml
ci_workflow.yaml
public_key_to_hpke_config.py
dev_run_docker.sh
dev_runbook.md
.DS_Store
example_config.json
*.pyc
.pytest_cache/
.python-version
__pycache__/
venv/
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 120
@@ -0,0 +1,5 @@
.DS_Store
*.pyc
__pycache__/
venv/
.python-version
@@ -0,0 +1,38 @@
FROM python:3.12
LABEL maintainer="Glenda Leonard <[email protected]>"
ARG HOME="/janus_build"
WORKDIR ${HOME}

RUN apt update && apt --yes install curl

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
ENV PATH=$HOME/.cargo/bin:$PATH

# build the CLI tool
RUN git clone --depth 1 https://github.com/divviup/janus.git --branch '0.7.69'
RUN cd janus && cargo build -r -p janus_tools --bin collect

######### next stage

FROM python:3.12
LABEL maintainer="Glenda Leonard <[email protected]>"
# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md
ARG USER_ID="10001"
ARG GROUP_ID="app"
ARG HOME="/app"
WORKDIR ${HOME}

RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
    useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}
##################### from other Dockerfile
COPY --from=0 /janus_build/janus/target/release/collect ./
###################

# Drop root and change ownership of the application folder to the user
RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
USER ${USER_ID}
ADD ./requirements.txt .
RUN pip install --upgrade pip
RUN pip install -r requirements.txt

ADD . .
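
The first stage above builds the janus `collect` CLI and the second stage copies the binary into the app image. As context, here is a minimal, hedged sketch of how a Python job might shell out to that binary with a timeout; the `--help` flag is used only to avoid guessing at the real collector flags, and `PROCESS_TIMEOUT` mirrors the constant defined later in `constants.py`. This is not the job's actual code.

```python
# Illustrative sketch: invoke the janus `collect` binary via subprocess with a timeout.
import subprocess

PROCESS_TIMEOUT = 1200  # 20 minutes, matching constants.py

result = subprocess.run(
    ["./collect", "--help"],  # real collection flags are not shown here
    capture_output=True,
    text=True,
    timeout=PROCESS_TIMEOUT,
    check=True,
)
print(result.stdout)
```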
@@ -0,0 +1,131 @@
# Ads Incrementality DAP collector

## Background

Incrementality is a way to measure the effectiveness of our ads in a general, aggregated, privacy-preserving way --
without knowing anything about specific users.

Incrementality works by dividing clients into various Nimbus experiment branches that vary how/whether an ad is shown.
Separately, a [DAP](https://docs.divviup.org/) task is configured to store the metrics for each experiment branch in a
different DAP bucket.

Firefox is instrumented with [DAP telemetry functionality](https://github.com/mozilla-firefox/firefox/tree/main/toolkit/components/telemetry/dap), which allows it to send metrics and reports into the correct DAP buckets as configured in the experiment.

This job then collects those metrics from DAP (using bucket info from the experiment's data) and writes them to BQ.

## Overview

This job is driven by a config file from a GCS bucket. Inform the job of the config file location by passing the
`gcp_project` and `gcs_config_bucket` parameters. See `example_config.json` for how to structure this file.
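
The sketch below shows one way such a config could be fetched at startup. It is illustrative only and not the job's actual code; the project and bucket names are placeholders, while the `config.json` file name matches `CONFIG_FILE_NAME` in `constants.py`.

```python
# Illustrative sketch: load the job config from a GCS bucket.
import json

from google.cloud import storage


def load_job_config(gcp_project: str, gcs_config_bucket: str) -> dict:
    """Download and parse config.json from the given GCS bucket."""
    client = storage.Client(project=gcp_project)
    blob = client.bucket(gcs_config_bucket).blob("config.json")
    return json.loads(blob.download_as_text())


# Example usage (placeholder names):
# config = load_job_config("my-gcp-project", "my-config-bucket")
```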

The config file specifies the incrementality experiments that are currently running, some config and credentials from DAP,
and where in BQ to write the incrementality results.

The job reads data for each of these experiments from Nimbus, reads the experiment branch results from DAP, then combines
it all into results rows and writes the metrics to BQ.

## Configuration

The three recognized top-level keys here are `bq`, `dap`, and `nimbus`.

#### bq

Everything the job needs to connect to BigQuery.

- `project`: GCP project
- `namespace`: BQ namespace for ads incrementality
- `table`: BQ table where incrementality results go

#### dap

Everything the job needs to connect to DAP.

- `auth_token`: Token defined in the collector credentials, used to authenticate to the leader
- `hpke_private_key`: Private key defined in the collector credentials, used to decrypt shares from the leader
  and helper
- `hpke_config`: base64 url-encoded version of the public key defined in the collector credentials
- `batch_start`: Start of the collection interval, as the number of seconds since the Unix epoch (see the snippet below)
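
A quick way to compute a `batch_start` value for a given UTC date (purely illustrative; the date is a placeholder):

```python
# Compute batch_start as seconds since the Unix epoch for a UTC date.
from datetime import datetime, timezone

batch_start = int(datetime(2025, 1, 1, tzinfo=timezone.utc).timestamp())
print(batch_start)  # 1735689600
```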

#### nimbus

Everything the job needs to connect to Nimbus.

- `api_url`: API URL for fetching the Nimbus experiment info
- `experiments`: List of incrementality experiment configs

##### experiment config list

The experiments that the job should collect results for.

- `slug`: Experiment slug
- `batch_duration`: Optional. Duration of the collection batch interval, in seconds.
  This will default to 7 days if not specified.
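
Putting the sections above together, the config might look roughly like the following, shown here as a Python dict. Every value is a placeholder assumption; the authoritative structure is `example_config.json` in the repo.

```python
# Hypothetical config shape; all values below are placeholders, not real
# credentials or endpoints.
EXAMPLE_CONFIG = {
    "bq": {
        "project": "my-gcp-project",
        "namespace": "ads_incrementality",
        "table": "incrementality_results",
    },
    "dap": {
        "auth_token": "<collector auth token>",
        "hpke_private_key": "<collector HPKE private key>",
        "hpke_config": "<base64url-encoded public key>",
        "batch_start": 1735689600,  # seconds since the Unix epoch
    },
    "nimbus": {
        "api_url": "<Nimbus API URL>",
        "experiments": [
            {"slug": "my-incrementality-experiment", "batch_duration": 604800},  # 7 days
        ],
    },
}
```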

## Usage

This script is intended to be run in a Docker container.

It requires some environment variables that hold DAP credentials, and the job looks for those when it starts up.
A dev script, `dev_run_docker.sh`, is included for convenience to build and run the job locally, and it
also documents those variables.

Once the environment variables are set up, run the job with:

```sh
./dev_run_docker.sh
```

To just build the docker image, use:

```sh
docker build -t ads_incrementality_dap_collector .
```

To run outside of docker, install dependencies with:

```sh
pip install -r requirements.txt
```

Run the script with:

```sh
python3 -m python_template_job.main
```

## Testing

Run tests with:

```sh
python3 -m pytest
```

## Linting and format

`flake8` and `black` are included for code linting and formatting:

```sh
pytest --black --flake8
```

or

```sh
flake8 .
```

or

```sh
black .
```

or

```sh
black --diff .
```
jobs/ads-incrementality-dap-collector/ads_incrementality_dap_collector/constants.py (72 additions, 0 deletions)
@@ -0,0 +1,72 @@
from datetime import datetime

from google.cloud import bigquery

DAP_LEADER = "https://dap-09-3.api.divviup.org"
VDAF = "histogram"
PROCESS_TIMEOUT = 1200  # 20 mins

CONFIG_FILE_NAME = "config.json"  # See example_config.json for the contents and structure of the job config file.
LOG_FILE_NAME = f"{datetime.now()}-ads-incrementality-dap-collector.log"

DEFAULT_BATCH_DURATION = 604800  # 7 days, in seconds

COLLECTOR_RESULTS_SCHEMA = [
    bigquery.SchemaField(
        "collection_start",
        "DATE",
        mode="REQUIRED",
        description="Start date of the collected time window, inclusive.",
    ),
    bigquery.SchemaField(
        "collection_end",
        "DATE",
        mode="REQUIRED",
        description="End date of the collected time window, inclusive.",
    ),
    bigquery.SchemaField(
        "country_codes",
        "JSON",
        mode="NULLABLE",
        description="List of 2-char country codes for the experiment",
    ),
    bigquery.SchemaField(
        "experiment_slug",
        "STRING",
        mode="REQUIRED",
        description="Slug indicating the experiment.",
    ),
    bigquery.SchemaField(
        "experiment_branch",
        "STRING",
        mode="REQUIRED",
        description="The experiment branch this data is associated with.",
    ),
    bigquery.SchemaField(
        "advertiser",
        "STRING",
        mode="REQUIRED",
        description="Advertiser associated with this experiment.",
    ),
    bigquery.SchemaField(
        "metric",
        "STRING",
        mode="REQUIRED",
        description="Metric collected for this experiment.",
    ),
    bigquery.SchemaField(
        name="value",
        field_type="RECORD",
        mode="REQUIRED",
        fields=[
            bigquery.SchemaField("count", "INT64", mode="NULLABLE"),
            bigquery.SchemaField("histogram", "JSON", mode="NULLABLE"),
        ],
    ),
    bigquery.SchemaField(
        "created_at",
        "TIMESTAMP",
        mode="REQUIRED",
        description="Timestamp for when this row was written.",
    ),
]
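
For context, here is a hedged sketch of how a row matching this schema might be written with the BigQuery client. The table path and all values are placeholders, and this is not the job's actual code; the string `created_at` follows the PR's change to expect strings in `insert_rows_json` calls.

```python
# Illustrative only: insert one results row matching COLLECTOR_RESULTS_SCHEMA.
from google.cloud import bigquery

client = bigquery.Client(project="my-gcp-project")
row = {
    "collection_start": "2025-01-01",
    "collection_end": "2025-01-07",
    "country_codes": '["US", "CA"]',
    "experiment_slug": "my-incrementality-experiment",
    "experiment_branch": "treatment-a",
    "advertiser": "example-advertiser",
    "metric": "example_metric",
    "value": {"count": 42, "histogram": None},
    "created_at": "2025-01-08T00:00:00Z",  # written as a string
}
errors = client.insert_rows_json("my-gcp-project.my_namespace.my_table", [row])
if errors:
    raise RuntimeError(f"BigQuery insert errors: {errors}")
```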
Review comment: Update to `created_timestamp`?