191 changes: 127 additions & 64 deletions atlas/analysis.ipynb
@@ -18,14 +18,19 @@
"metadata": {},
"outputs": [],
"source": [
"import collections\n",
"import dataclasses\n",
"import gzip\n",
"import json\n",
"import math\n",
"import pathlib\n",
"import re\n",
"import time\n",
"\n",
"import awkward as ak\n",
"import cloudpickle\n",
"import dask\n",
"import dask.bag\n",
"import hist\n",
"import matplotlib.pyplot as plt\n",
"import mplhep\n",
@@ -75,7 +80,7 @@
"# construct fileset\n",
"fileset = {}\n",
"input_size_GB = 0\n",
"for containers_for_category in dataset_info.values():\n",
"for category, containers_for_category in dataset_info.items():\n",
" for container, metadata in containers_for_category.items():\n",
" if metadata[\"files_output\"] is None:\n",
" # print(f\"skipping missing {container}\")\n",
@@ -84,12 +89,13 @@
" dsid, _, campaign = utils.dsid_rtag_campaign(container)\n",
"\n",
" # debugging shortcuts, use one or both of the following to reduce workload\n",
" if campaign not in [\"mc23a\", \"data22\"]: continue\n",
" # if \"601229\" not in dsid: continue\n",
" if campaign not in [\"mc23a\", \"mc23d\", \"data22\", \"data23\"]: continue\n",
" if campaign not in [\"mc23a\"]: continue\n",
" if \"601229\" not in dsid: continue\n",
"\n",
" weight_xs = utils.sample_xs(campaign, dsid)\n",
" lumi = utils.integrated_luminosity(campaign)\n",
" fileset[container] = {\"files\": dict((path, \"reco\") for path in metadata[\"files_output\"]), \"metadata\": {\"dsid\": dsid, \"campaign\": campaign, \"weight_xs\": weight_xs, \"lumi\": lumi}}\n",
" fileset[container] = {\"files\": dict((path, \"reco\") for path in metadata[\"files_output\"]), \"metadata\": {\"dsid\": dsid, \"campaign\": campaign, \"category\": category, \"weight_xs\": weight_xs, \"lumi\": lumi}}\n",
" input_size_GB += metadata[\"size_output_GB\"]\n",
"\n",
"print(f\"fileset has {len(fileset)} categories with {sum([len(f[\"files\"]) for f in fileset.values()])} files total, size is {input_size_GB:.2f} GB\")\n",
@@ -114,11 +120,13 @@
"metadata": {},
"outputs": [],
"source": [
"access_log = []\n",
"events = NanoEventsFactory.from_root(\n",
" {list(fileset[list(fileset.keys())[0]][\"files\"])[0]: \"reco\"},\n",
" mode=\"virtual\",\n",
" schemaclass=NtupleSchema,\n",
" entry_stop=1000\n",
" entry_stop=1000,\n",
" access_log=access_log\n",
").events()\n",
"\n",
"h = hist.new.Regular(30, 0, 300, label=\"leading electron $p_T$\").StrCat([], name=\"variation\", growth=True).Weight()\n",
@@ -156,14 +164,39 @@
" # executor = processor.IterativeExecutor(), # to run locally\n",
" schema=NtupleSchema,\n",
" savemetrics=True,\n",
" chunksize=50_000,\n",
" chunksize=100_000,\n",
" skipbadfiles=True,\n",
" align_clusters=False,\n",
" # maxchunks=1 # for debugging only\n",
")\n",
"\n",
"\n",
"def extract_sumw(f):\n",
" \"\"\"read initial sum of weights, custom function to be injected during pre-processing\"\"\"\n",
" matching_histograms = f.keys(filter_name=\"CutBookkeeper*NOSYS\")\n",
" if len(matching_histograms):\n",
" sumw = float(f[matching_histograms[0]].values()[1])\n",
" else:\n",
" sumw = 0 # for data\n",
" return {\"sumw\": sumw}\n",
"\n",
"\n",
"with performance_report(filename=\"preprocess.html\"):\n",
" preprocess_output = run.preprocess(fileset)\n",
" # custom pre-processing: like coffea, but with more metadata\n",
" t0 = time.perf_counter()\n",
" preprocess_output = utils.custom_preprocess(fileset, client=client, chunksize=run.chunksize, custom_func=extract_sumw)\n",
" t1 = time.perf_counter()\n",
"\n",
"\n",
"# calculate dataset-aggregated sumw (without chunk duplication) and update preprocessing metadata accordingly\n",
"sumw_dict = collections.defaultdict(dict)\n",
"for chunk in preprocess_output:\n",
" sumw_dict[chunk.dataset][chunk.filename] = chunk.usermeta[\"sumw\"]\n",
"\n",
"for chunk in preprocess_output:\n",
" # for data all sumw entries are 0, set the total to 1 manually\n",
" chunk.usermeta.update({\"sumw_dataset\": sum(sumw_dict[chunk.dataset].values()) or 1})\n",
"\n",
"\n",
"# write to disk\n",
"with open(\"preprocess_output.json\", \"w\") as f:\n",
@@ -173,7 +206,11 @@
"with open(\"preprocess_output.json\") as f:\n",
" preprocess_output = utils.json_to_preprocess(json.load(f))\n",
"\n",
"len(preprocess_output), preprocess_output[:3]"
"# visualize task stream\n",
"ts = client.get_task_stream(start=f\"{math.ceil(time.perf_counter()-t0)}s\")\n",
"_ = utils.plot_taskstream(ts)\n",
"\n",
"print(f\"generated list of {len(preprocess_output)} work items in {t1-t0:.1f} sec:\\n{preprocess_output[:3]}\")"
]
},
{
@@ -187,66 +224,93 @@
{
"cell_type": "code",
"execution_count": null,
"id": "dc78d57d-4fe3-4e11-ab4c-0f0afe60ca32",
"id": "a325d073-d017-4443-a539-b64d9b8feeb4",
"metadata": {},
"outputs": [],
"source": [
"class Analysis(processor.ProcessorABC):\n",
" def __init__(self):\n",
" self.h = hist.new.Regular(20, 0, 1_000, label=\"$m_{jj}$ [GeV]\").\\\n",
" StrCat([], name=\"dsid_and_campaign\", growth=True).\\\n",
" StrCat([], name=\"category\", growth=True).\\\n",
" StrCat([], name=\"variation\", growth=True).\\\n",
" Weight()\n",
"\n",
" def process(self, events):\n",
" f = uproot.open(events.metadata[\"filename\"])\n",
"\n",
" # this should match existing pre-determined metadata\n",
" # sim_type, mc_campaign, dsid, etag = f[\"metadata\"].axes[0]\n",
" # assert mc_campaign == events.metadata[\"campaign\"]\n",
" # assert dsid == events.metadata[\"dsid\"]\n",
"\n",
" # ensure systematics in schema and in histogram match\n",
" # systematics_from_hist = list(f[\"listOfSystematics\"].axes[0])\n",
" # assert sorted(systematics_from_hist) == sorted(events.systematic_names)\n",
"\n",
" # categorize events by DSID and campaign with a single histogram axis\n",
" dsid_and_campaign = f\"{events.metadata[\"dsid\"]}_{events.metadata[\"campaign\"]}\"\n",
"\n",
" if events.metadata[\"dsid\"] != \"data\":\n",
" sumw = float(f[f.keys(filter_name=\"CutBookkeeper*NOSYS\")[0]].values()[1]) # initial sum of weights\n",
" else:\n",
" sumw = None # no normalization for data\n",
"\n",
" sumw = events.metadata[\"sumw_dataset\"]\n",
" for variation in events.systematic_names:\n",
" if variation not in [\"NOSYS\"] + [name for name in events.systematic_names if \"JET_JER_Effective\" in name]:\n",
" continue\n",
"\n",
" cut = events[variation][\"pass\"][\"ejets\"] == 1\n",
" # TODO: remaining weights\n",
" weight = (events[variation][cut==1].weight.mc if events.metadata[\"dsid\"] != \"data\" else 1.0) * events.metadata[\"weight_xs\"] * events.metadata[\"lumi\"]\n",
" weight = (events[variation][cut==1].weight.mc if events.metadata[\"dsid\"] != \"data\" else 1.0) * events.metadata[\"weight_xs\"] * events.metadata[\"lumi\"] / sumw\n",
" mjj = (events[variation][cut==1].jet[:, 0] + events[variation][cut==1].jet[:, 1]).mass\n",
" self.h.fill(mjj / 1_000, dsid_and_campaign=dsid_and_campaign, variation=variation, weight=weight)\n",
" self.h.fill(mjj / 1_000, category=events.metadata[\"category\"], variation=variation, weight=weight)\n",
"\n",
" return {\n",
" \"hist\": self.h,\n",
" \"meta\": {\n",
" \"sumw\": {(events.metadata[\"dsid\"], events.metadata[\"campaign\"]): {(events.metadata[\"fileuuid\"], sumw)}}} # sumw in a set to avoid summing multiple times per file\n",
" }\n",
" return {\"hist\": self.h, \"meta\": {}}\n",
"\n",
" def postprocess(self, accumulator):\n",
" # normalize histograms\n",
" # https://topcptoolkit.docs.cern.ch/latest/starting/running_local/#sum-of-weights\n",
" for dsid_and_campaign in accumulator[\"hist\"].axes[1]:\n",
" dsid, campaign = dsid_and_campaign.split(\"_\")\n",
" if dsid == \"data\":\n",
" continue # no normalization for data by total number of weighted events\n",
" norm = 1 / sum([sumw for uuid, sumw in accumulator[\"meta\"][\"sumw\"][(dsid, campaign)]])\n",
" count_normalized = accumulator[\"hist\"][:, dsid_and_campaign, :].values()*norm\n",
" variance_normalized = accumulator[\"hist\"][:, dsid_and_campaign, :].variances()*norm**2\n",
" accumulator[\"hist\"][:, dsid_and_campaign, :] = np.stack([count_normalized, variance_normalized], axis=-1)\n",
"\n",
" pass\n",
"\n",
"\n",
"def custom_process(workitems, processor_class, preload: list=[]):\n",
"\n",
" def run_analysis(wi: processor.executor.WorkItem):\n",
" analysis_instance = processor_class()\n",
" f = uproot.open(wi.filename)\n",
" events = NanoEventsFactory.from_root(\n",
" f,\n",
" treepath=wi.treename,\n",
" mode=\"virtual\",\n",
" access_log=(access_log := []),\n",
" preload=lambda b: b.name in preload,\n",
" schemaclass=NtupleSchema,\n",
" entry_start=wi.entrystart,\n",
" entry_stop=wi.entrystop,\n",
" ).events()\n",
" events.metadata.update(wi.usermeta)\n",
" out = analysis_instance.process(events)\n",
" bytesread = f.file.source.num_requested_bytes\n",
" report = {\"bytesread\": bytesread, \"columns\": access_log, \"bytesread_per_chunk\": {(wi.filename, wi.entrystart, wi.entrystop): bytesread}}\n",
" return out, report\n",
"\n",
" def sum_output(a, b):\n",
" return (\n",
" {\"hist\": a[0][\"hist\"] + b[0][\"hist\"]},\n",
" {\n",
" \"bytesread\": a[1][\"bytesread\"] + b[1][\"bytesread\"],\n",
" \"columns\": list(set(a[1][\"columns\"]) | set(b[1][\"columns\"])),\n",
" \"bytesread_per_chunk\": a[1][\"bytesread_per_chunk\"] | b[1][\"bytesread_per_chunk\"],\n",
" }\n",
" )\n",
"\n",
" workitems_bag = dask.bag.from_sequence(workitems, partition_size=1)\n",
" futures = workitems_bag.map(run_analysis).fold(sum_output)\n",
" return client.compute(futures).result()\n",
"\n",
"columns_to_preload = json.load(pathlib.Path(\"columns_to_preload.json\").open())[\"JET_JER_Effective\"]\n",
"columns_to_preload = []\n",
"\n",
"t0 = time.perf_counter()\n",
"with performance_report(filename=\"process_custom.html\"):\n",
" out, report = custom_process(preprocess_output[:1], processor_class=Analysis, preload=columns_to_preload)\n",
"t1 = time.perf_counter()\n",
"\n",
"print(f\"walltime: {t1 - t0:.2f} sec ({(t1 - t0) / 60:.2f} min)\")\n",
"print(f\"{report[\"bytesread\"] / 1_000**3:.2f} GB read\")\n",
"print(f\"preloaded columns: {len(columns_to_preload)}, {columns_to_preload} {\"etc.\" if len(columns_to_preload) > 4 else \"\"}\")\n",
"print(f\"preloaded but unused columns: {len([c for c in columns_to_preload if c not in report[\"columns\"]])}\")\n",
"print(f\"used but not preloaded columns: {len([c for c in report[\"columns\"] if c not in columns_to_preload])}\")\n",
"# out"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc78d57d-4fe3-4e11-ab4c-0f0afe60ca32",
"metadata": {},
"outputs": [],
"source": [
"client.run_on_scheduler(utils.start_tracking) # track worker count on scheduler\n",
"t0 = time.perf_counter() # track walltime\n",
"\n",
@@ -257,7 +321,7 @@
"worker_count_dict = client.run_on_scheduler(utils.stop_tracking) # stop tracking, read out data, get average\n",
"nworker_avg = utils.get_avg_num_workers(worker_count_dict)\n",
"\n",
"print(f\"histogram size: {out[\"hist\"].view(True).nbytes / 1_000 / 1_000:.2f} GB\\n\")\n",
"print(f\"histogram size: {out[\"hist\"].view(True).nbytes / 1_000 / 1_000:.2f} MB\\n\")\n",
"\n",
"# shortened version of report, dropping extra columns\n",
"dict((k, v) for k, v in report.items() if k != \"columns\") | ({\"columns\": report[\"columns\"][0:10] + [\"...\"]})"
@@ -299,7 +363,11 @@
"print(f\"fraction of time spent in processing: {report[\"processtime\"] / ((t1 - t0) * nworker_avg):.1%}\")\n",
"print(f\"average process task length: {report[\"processtime\"] / report[\"chunks\"]:.1f} sec\")\n",
"\n",
"_ = utils.plot_worker_count(worker_count_dict)"
"_ = utils.plot_worker_count(worker_count_dict)\n",
"\n",
"# visualize task stream\n",
"ts = client.get_task_stream(start=f\"{math.ceil(time.perf_counter()-t0)}s\")\n",
"_ = utils.plot_taskstream(ts)"
]
},
{
@@ -311,24 +379,19 @@
"source": [
"mc_stack = []\n",
"labels = []\n",
"for key in dataset_info:\n",
" dsids = []\n",
" for container in dataset_info[key]:\n",
" dsids.append(container.split(\".\")[1])\n",
"\n",
" dsids = sorted(set(dsids))\n",
" dsids_in_hist = [dc for dc in out[\"hist\"].axes[1] if dc.split(\"_\")[0] in dsids]\n",
" # print(f\"{key}:\\n - expect {dsids}\\n - have {dsids_in_hist}\")\n",
"\n",
" if key in [\"data\", \"ttbar_H7\", \"ttbar_hdamp\", \"ttbar_pthard\", \"Wt_DS\", \"Wt_H7\", \"Wt_pthard\"] or len(dsids_in_hist) == 0:\n",
" continue # data drawn separately, skip MC modeling variations and skip empty categories\n",
"for key in dataset_info.keys():\n",
" if key in [\"data\", \"ttbar_H7\", \"ttbar_hdamp\", \"ttbar_pthard\", \"Wt_DS\", \"Wt_H7\", \"Wt_pthard\"]:\n",
" continue # data drawn separately, skip MC modeling variations\n",
"\n",
" mc_stack.append(out[\"hist\"][:, :, \"NOSYS\"].integrate(\"dsid_and_campaign\", dsids_in_hist))\n",
" labels.append(key)\n",
" if key in out[\"hist\"].axes[1]:\n",
" mc_stack.append(out[\"hist\"][:, key, \"NOSYS\"])\n",
" labels.append(key)\n",
" else:\n",
" print(f\"missing {key}\")\n",
"\n",
"try:\n",
" data_hist = out[\"hist\"].integrate(\"dsid_and_campaign\", [dc for dc in out[\"hist\"].axes[1] if \"data\" in dc])[:, \"NOSYS\"]\n",
"except ValueError:\n",
" data_hist = out[\"hist\"][:, \"data\", \"NOSYS\"]\n",
"except KeyError:\n",
" print(\"falling back to plotting first entry of categorical axes as \\\"data\\\"\")\n",
" data_hist = out[\"hist\"][:, 0, 0]\n",
"\n",