import numpy as np, pandas as pd
from ijazz.RegionalFitter import RegionalFitter
import tensorflow as tf
from typing import Union, List, Dict
from pathlib import Path
import yaml, json
import argparse, sys
from ijazz.dtypes import floatzz, uintzz
from copy import deepcopy
from .sas_utils import (
to_correction_lib, json_safe_load, parameters_to_json, parameters_from_json,
create_pt_corrector, apply_corrlib_df_mmg, compute_syst_from_jsons
)
from .categorize import categorize_mmg
[docs]
def ijazz_sas_cmd():
"""
entry point for the ijazz_sas command
"""
parser = argparse.ArgumentParser(description=f'IJazZ Scale and Smearing fit')
parser.add_argument('config', type=str, help='yaml config file')
args = parser.parse_args(sys.argv[1:])
with open(args.config, 'r') as fcfg:
config = yaml.safe_load(fcfg)
ijazz_sas(config)
[docs]
def ijazz_sas(config:dict=None, do_fit=True, save_corrlib=True, correct_files=True):
"""
Perform the scale and smearing fit, plot the results and save the results.
The configuration file should contain the following sections:
- file_dt: data file (parquet format)
- file_mc: mc file (parquet format)
- dir_results: directory where to save the results
- fitter: configuration for the fitter
- sas: configuration for the scale and smearing fit
- minimizer: configuration for the minimizer
- syst: configuration for the systematic computation
- corrlib: add description and version number to the correctionlib file
- cat_latex: dictionary to convert the category name to latex
- scale_flat_syst: flat systematic for scale
- smear_flat_syst: flat systematic for smearing
- dset_name: name of the dataset eg ``2023``
- cset_name: name of the correction set eg ``EtaR9``
See an example `sas_config.yaml <https://gitlab.cern.ch/fcouderc/ijazz_2p0/-/blob/master/config/sas_config.yaml?ref_type=heads>`_
Args:
config (dict): configuration dictionary containing the parameters for the fit
do_fit: if True, perform the fit
save_corrlib: if True, save the correctionlib file
"""
files_dt = [config['file_dt']] if np.isscalar(config['file_dt']) else config['file_dt']
files_mc = [config['file_mc']] if np.isscalar(config['file_mc']) else config['file_mc']
dt = pd.concat([pd.read_parquet(afile, engine='pyarrow', dtype_backend='pyarrow').assign(ifile=ifile) for ifile, afile in enumerate(files_dt)]).reset_index(drop=True)
mc = pd.concat([pd.read_parquet(afile, engine='pyarrow', dtype_backend='pyarrow').assign(ifile=ifile) for ifile, afile in enumerate(files_mc)]).reset_index(drop=True)
dir_results = Path(config['dir_results'])
dir_results.mkdir(parents=True, exist_ok=True)
dset_name = config.get('dset_name', 'DSET')
cset_name = config.get('cset_name', 'CSET')
config['sas']['dump_json'] = dir_results / f'SAS{dset_name}_syst-Nominal.json'
if do_fit:
compute_sas(dt, mc, config)
json_file_out = dir_results / f'SAS{dset_name}_syst-Nominal.json'
nominal_json = str(json_file_out)
import ijazz.plotting as ijp
resp_range = config.get('resp_range', (0.98, 1.03))
reso_range = config.get('reso_range', (0., 0.05))
cat_latex = config.get('cat_latex', None)
figs, _ = ijp.plot_results_from_json(json_file_out, resp_range=resp_range, reso_range=reso_range,
cat_latex=cat_latex, leg_fontsize='small')
for ifig, fig in enumerate(figs):
print(f" --> Plot nominal : {json_file_out.with_suffix(f'.fig{ifig}.jpg')}")
fig.savefig(json_file_out.with_suffix(f'.fig{ifig}.jpg'))
if config.get('syst', False):
syst_jsons = []
scale_flat_syst = config.get('scale_flat_syst', 0)
smear_flat_syst = config.get('smear_flat_syst', 0)
ratio_resp_range = config.get('ratio_resp_range', (0.99, 1.01))
ratio_reso_range = config.get('ratio_reso_range', (0.5, 1.5))
config_ref = deepcopy(config)
for syst_name, syst_cfg in config['syst'].items():
print(f'---------------------------------------')
print(f'----- syst {syst_name}')
print(f'---------------------------------------')
print(syst_cfg)
config = deepcopy(config_ref)
for sub_sec, sub_cfg in syst_cfg.items():
for k, v in sub_cfg.items():
config[sub_sec][k] = v
config['sas']['correct_data'] = False
config['sas']['correct_mc'] = False
config['sas']['dump_json'] = dir_results / f'SAS{dset_name}_syst-{syst_name}.json'
if do_fit:
compute_sas(dt, mc, config)
sas_syst = json_safe_load(config['sas']['dump_json'])
syst_jsons.append(str(dir_results / f'SAS{dset_name}_syst-{syst_name}.json'))
#-- plot ratio of syst/nominal
figs, _ = ijp.plot_results_from_json({'nominal': nominal_json, 'syst': str(dir_results / f'SAS{dset_name}_syst-{syst_name}.json')},
resp_range=ratio_resp_range, reso_range=ratio_reso_range, cat_latex=cat_latex, jsons_mode='ratio', leg_fontsize='small')
for ifig, fig in enumerate(figs):
print(f" --> Plot {syst_name} : {dir_results}/SAS{dset_name}_syst-{syst_name}.fig{ifig}.jpg")
fig.savefig(dir_results / f'SAS{dset_name}_syst-{syst_name}.fig{ifig}.jpg')
# -- re-init config to its original value
config = deepcopy(config_ref)
# -- compute the systematic errors from the jsons
nominalWithSyst = compute_syst_from_jsons(nominal_json, syst_jsons, scale_flat_syst=scale_flat_syst, smear_flat_syst=smear_flat_syst)
json_file_out = dir_results / f'SAS{dset_name}_syst-NominalWithSyst.json'
parameters_to_json(nominalWithSyst, json_file_out)
#-- plot nominal result with adjusted syst
figs, _ = ijp.plot_results_from_json({'nominal':parameters_from_json(json_file_out), 'syst':parameters_from_json(json_file_out)}, jsons_mode='ratio',
resp_range=ratio_resp_range, reso_range=ratio_reso_range, cat_latex=cat_latex, leg_fontsize='small')
for ifig, fig in enumerate(figs):
print(f" --> Plot Nominal with Syst : {json_file_out.with_suffix(f'.fig{ifig}.jpg')}")
fig.savefig(json_file_out.with_suffix(f'.fig{ifig}.jpg'))
if config.get('corrlib', False) and save_corrlib:
vllg_name = config['sas'].get('name_vllg', 'v_mmg')
mll_name = config['sas'].get('name_mll', 'mass_mm')
mllg_name = config['sas'].get('name_mllg', 'mass')
pt_name = config['sas'].get('name_ptg', 'ptg')
corrlib_file, cset_vars = to_correction_lib(json_file_out, dir_results=dir_results, dset_name=dset_name, cset_name=cset_name,
cset_description=config['corrlib'].get('cset_description', None),
cset_version=config['corrlib'].get('cset_version', 1),
change_abs_var=config['corrlib'].get('change_abs_var', {}))
if config['corrlib'].get('smoothing', False):
create_pt_corrector(json_file_out, dir_results=dir_results, dset_name=dset_name, cset_name=cset_name,
cset_description=config['corrlib'].get('cset_description', None),
cset_version=config['corrlib'].get('cset_version', 1))
def save_to_parquet(is_mc):
correct = config['sas'].get('correct_mc', False) if is_mc else config['sas'].get('correct_data', False)
if not correct:
return
df = mc if is_mc else dt
syst_name = 'smear' if is_mc else 'scale'
corr_name = f'EGMSmearAndSyst_{cset_name}_{dset_name}' if is_mc else f'EGMScale_{cset_name}_{dset_name}'
print(f'Apply SAS correction (is_mc = {is_mc})')
apply_corrlib_df_mmg(df, cset_vars, corr_name, corrlib_file, syst_name,
vllg_name=vllg_name, mllg_name=mllg_name, mll_name=mll_name, ptg_name=pt_name)
df[f'{pt_name}_corr'] = df[pt_name]
vllg_syst_names = config['sas'].get('vllg_syst_names', [])
mllg_syst_names = config['sas'].get('mllg_syst_names', [])
mll_syst_names = config['sas'].get('mll_syst_names', [])
for vllg_syst, mllg_syst, mll_syst in zip(vllg_syst_names, mllg_syst_names, mll_syst_names):
print(f'Apply SAS systematic variation {vllg_syst}, {mllg_syst}, {mll_syst} (is_mc = {is_mc})')
try:
apply_corrlib_df_mmg(df, cset_vars, corr_name, corrlib_file, syst_name,
vllg_name=vllg_syst, mllg_name=mllg_syst,
mll_name=mll_syst, ptg_name=pt_name)
except KeyError:
pass
# apply_corrlib_df_mmg(df, cset_vars, corr_name, corrlib_file, syst_name,
# vllg_name=f'{vllg_name}_musystup',
# mllg_name=f'{mllg_name}_musystup', mll_name=f'{mll_name}_musystup', ptg_name=pt_name)
# apply_corrlib_df_mmg(df, cset_vars, corr_name, corrlib_file, syst_name,
# vllg_name=f'{vllg_name}_musystdown',
# mllg_name=f'{mllg_name}_musystdown', mll_name=f'{mll_name}_musystdown', ptg_name=pt_name)
df[pt_name] = df[f'{pt_name}_corr']
df.drop(columns=[f'{pt_name}_corr'], inplace=True)
cat_name = config['fitter'].get('name_cat', 'cat')
col_to_drop = [col for col in df.columns if col[:len(cat_name)]==cat_name] + ['ifile']
for ifile, afile in enumerate(files_mc if is_mc else files_dt):
print(afile)
df_fileout = Path(afile).with_suffix(f".{cset_name}Corr.parquet")
print(f'Saving corrected dataset to {df_fileout}')
df.query("ifile == @ifile").drop(columns=col_to_drop).to_parquet(df_fileout, engine='auto')
if correct_files:
save_to_parquet(is_mc=True)
save_to_parquet(is_mc=False)
[docs]
class IJazZMMG:
"""Scale-and-smearing fitter specialized for MMG inputs."""
def __init__(self, dt: pd.DataFrame, mc: pd.DataFrame, config: dict, categories: dict=None) -> None:
"""This class performs the scale and smearing fit for leptons in different categories.
The input dataframe should have as columns contain the categorisation variables.
The naming convention is "var1" (resp. "var2") for the value of the variable var for lepton 1 (resp. 2)
WARNING: If pt is used for categorisation the naming convention is "pt", so please rename if necessary
For categorisation with "pt" bining it is more reliable to do the fit based on a categorisation on the relative pt.
This will be automatically done unless turned off (use_r_pt=False).
Args:
dt (pd.DataFrame): dataframe containing the data events
mc (pd.DataFrame): dataframe containing the mc events
config (dict): dictionary containing the config of the RegionalFitter and its minimize function. An example yaml is given in the repertory configs/
categories (dict): dictionary for categorisation, e.g. {'pt': [25, 50, 100], 'abs_eta': [0, 1, 2]}. Defaults to None (can be specified in the config dict)
NB: the config dictionnary can be saved under a yaml file (prefered), opened and passed to the fitter.
It **MUST** have three section: "sas", "fitter", "minimizer"
"""
try:
config["sas"]
except KeyError:
raise KeyError('The sas section is not present in the configuration dict')
try:
config["fitter"]
except KeyError:
raise KeyError('The fitter section is not present in the configuration dict')
try:
config["minimizer"]
except KeyError:
raise KeyError('The minimizer section is not present in the configuration dict')
if categories is None:
# - this will crask if no categories section but this is expected.
categories = config['sas']['categories']
[docs]
self.categories_def = categories
[docs]
self.vllg_name = config['sas'].get('name_vllg', 'v_mmg')
[docs]
self.mll_name = config['sas'].get('name_mll', 'mass_mm')
[docs]
self.mllg_name = config['sas'].get('name_mllg', 'mass')
[docs]
self.pt_name = config['sas'].get('name_ptg', 'ptg')
# -- dirty trick: n the fitter config change the variable name to fit
self.cfg['fitter']['name_mll'] = self.vllg_name
[docs]
self.cat_name = config['fitter'].get('name_cat', 'cat')
cut = config['sas'].get('cut', None)
cut = '' if cut is None else cut
if categories.get(self.pt_name, None):
pt_bins = categories[self.pt_name]
cut_pt = f"{self.pt_name} >= {pt_bins[0]}" + " and " + f"{self.pt_name} < {pt_bins[-1]}"
cut = cut_pt if cut == "" else f'{cut} and {cut_pt}'
print(f"Applying selection: {cut}")
# -- if one of the categories is pt or et, categorize first vs pT/mee
[docs]
self.categories = categories
# can have different MC/data categories when recasting w/r to pT
[docs]
self.categories_mc = self.categories.copy()
[docs]
self.categories_dt = self.categories.copy()
for df in [dt, mc]:
self.lepton_reg = categorize_mmg(df, self.categories, cut=self.cut, prefix=self.cat_name)
[docs]
self.resp_corr = None # -- response after data scale correction
[docs]
self.reso_uncorr = None # -- resolution before data scale correction
[docs]
def fit(self, optimizer=tf.keras.optimizers.Adam) -> np.ndarray:
"""Actually performs the fit
Args:
optimizer (class): tensorflow compatible optimizer for instance tf.keras.optimizers.Adam (this is not the optimizer the the class)
learning_rate (float, optional): learning rate to pass to the optimizer. Defaults to 0.01.
dnll_tol (float, optional): delta likelihood tolerance to stop the fit. Defaults to 0.01.
max_epochs (int, optional): maximum number of iteration. Defaults to 1000.
init_rand (bool, optional): random starting poin. Defaults to False.
batch_size (int, optional): can perform the fit in batch size (usefull for large number of parameters). Defaults to -1.
device (str, optional): _description_. Defaults to 'CPU'.
batch_training (bool, optional): use the batch training mode. Defaults to False.
hess (str, optional): computation of the hession (None, 'numerical', or 'analytical'), it can take a while. Defaults to 'numerical'.
cats (_type_, optional): subset of categories to perform the fit. Defaults to slice(None).
Returns:
np.ndarray: array of the value of the -2 * log likelihood during the fit
"""
self.dt[f'{self.mllg_name}_raw'] = self.dt[self.mllg_name]
self.dt[f'{self.vllg_name}_raw'] = self.dt[self.vllg_name]
self.dt[f'{self.pt_name}_raw'] = self.dt[self.pt_name]
learning_rate = self.cfg['sas'].get('learning_rate', 1e-3)
n_fit = 1
stop_min = 999.
while stop_min > 0.001 and n_fit < 10:
if n_fit > 1:
categorize_mmg(self.dt, self.categories, cut='', prefix="cat_nocut")
self.correct_data(fitter.resp.numpy(), prefix='cat_nocut')
categorize_mmg(self.dt, self.categories, cut=self.cut)
print(f"======== FIT {n_fit}")
fitter = RegionalFitter(self.dt, self.mc, n_par=self.lepton_reg.size, **self.cfg['fitter'])
nlls = fitter.minimize(optimizer(learning_rate=learning_rate), **self.cfg['minimizer'])
stop_min = np.max(np.abs(fitter.resp.numpy() - 1))
print(fitter.resp.numpy())
if n_fit == 1:
self.resp = fitter.resp.numpy().astype(np.float32).copy()
else:
self.resp *= fitter.resp.numpy().astype(np.float32).copy()
n_fit += 1
self.reso = np.abs(fitter.reso.numpy().astype(np.float32).copy())
self.eresp = np.zeros(self.resp.shape, dtype=np.float32)
self.ereso = np.zeros(self.reso.shape, dtype=np.float32)
self.eresp_mc = np.ones(self.resp.shape, dtype=np.float32)*np.nan
self.ereso_mc = np.ones(self.reso.shape, dtype=np.float32)*np.nan
self.fitter = fitter # -- keep the fitter
self.dt[self.vllg_name] = self.dt[f'{self.vllg_name}_raw']
self.dt[self.mllg_name] = self.dt[f'{self.mllg_name}_raw']
self.dt[self.pt_name] = self.dt[f'{self.pt_name}_raw']
# remove temporary columns
# if not config['sas'].get('debug', False)
# col_to_remove = [col for col in self.dt.columns \
# if 'cat_nocut' in col or 'raw' in col or self.cat_name in col]
# self.dt.drop(columns=col_to_remove, inplace=True)
return nlls
[docs]
def calc_err_stat(self, force=False):
hess = self.cfg['sas'].get('hess', False)
if hess:
numerical = True if hess == 'numerical' else False
print(f"======== HESSIAN COMPUTATION: numerical {numerical}")
self.fitter.covariance(numerical=numerical, force=force, batch_size=-1)
self.eresp = self.fitter.eresp
self.ereso = self.fitter.ereso
else:
print("NB: HESSIAN computation was not requested")
[docs]
def calc_err_mc(self):
"""compute the error due to MC statistical fluctuations, this may take a while
"""
if self.cfg['sas'].get('err_mc', True):
print(f"======== MC UNCERTAINTY COMPUTATION")
self.fitter.calc_err_mc()
self.eresp_mc = self.fitter.eresp_mc.copy()
self.ereso_mc = self.fitter.ereso_mc.copy()
self.eresp = np.sqrt(self.eresp**2 + self.eresp_mc**2)
self.ereso = np.sqrt(self.ereso**2 + self.ereso_mc**2)
else:
print("NB: no MC uncertainties were required")
[docs]
def correct_data(self, response: np.ndarray, prefix:str=None):
"""correct the data dataframe after the fit
Args:
response (np.ndarray): array with the responses to apply for correction
prefix (str, optional): prefix used for naming the categorisation. Defaults to None.
"""
if prefix is None:
prefix = self.cat_name
idx = self.dt[f'{prefix}1'] >= 0
resp = response[self.dt.loc[idx, f'{prefix}1']]
self.dt.loc[idx, f'{self.pt_name}'] /= resp
self.dt.loc[idx, self.mllg_name] = self.dt.loc[idx].eval(f'sqrt(({self.mllg_name}**2-{self.mll_name}**2) / @resp + {self.mll_name}**2)')
self.dt.loc[idx, self.vllg_name] = self.dt.loc[idx].eval(f'2*({self.mllg_name}/91.2-1)/(1-{self.mll_name}**2/{self.mllg_name}**2)')
[docs]
def dump_json(self, outfile: Union[Path, str]=None):
"""Dump the resulst to a json file
Args:
outfile (Union[Path, str], optional): output file name. Defaults to None.
"""
if outfile is None:
return
parameters_to_json({"categories": self.categories, "resp": self.resp, "reso": self.reso,
"eresp": self.eresp, "ereso": self.ereso,
"eresp_mc": self.eresp_mc, "ereso_mc": self.ereso_mc, "pt_mean": self.pt_mean}, outfile)
[docs]
def compute_sas(dt, mc, cfg, categories=None) -> IJazZMMG:
"""Wrapper to compute the scale and smearing parameters
Args:
dt (pd.DataFrame): input dataframe for data
mc (pd.DataFrame): input dataframe for mc
cfg (Union[str, Path]): config file to control the sas fit
categories (dict, optional): categories to do the fit (can be specified in config). Defaults to None.
Returns:
IJazZSAS: return the ScaleAndSmearing fitter
"""
sas = IJazZMMG(dt, mc, config=cfg, categories=categories)
sas.fit()
sas.calc_err_stat()
sas.calc_err_mc()
sas.dump_json(cfg['sas'].get('dump_json', None))
return sas