import numpy as np
import tensorflow as tf
from pathlib import Path
import json, yaml
import pandas as pd
from typing import Dict, Union, List, Tuple
import matplotlib.pyplot as plt
import cms_fstyle as cms
from cms_fstyle.fitter import FFitter
from copy import deepcopy
import gzip
import correctionlib.schemav2 as cs
import rich
[docs]
def ijazz_shape(categories: dict) -> List:
"""Returns the shape to be used for reshaping the tensor of parameters of the ijazz fit
Args:
categories (dict): categories used in the ijazz sas fit
Returns:
list: shape
"""
return [len(bins)-1 for bins in categories.values()]
[docs]
def parameters_from_json(json_file: Union[str, Path, Dict]) -> Dict:
"""Extract and re-shape the IJazZ sas results saved in a json file
Args:
json_file (Union[str, Path, Dict]): name of the json file where the fit results are stored
Returns:
dict: dictionnary with the categories used in the fit and the results of the fit
"""
if isinstance(json_file, Dict):
ijazz_json = json_file.copy()
else:
with open(json_file) as f_json:
ijazz_json = json.load(f_json)
categories = ijazz_json['categories']
shape = ijazz_shape(ijazz_json['categories'])
par_names = ['resp', 'reso', 'eresp', 'ereso', 'eresp_mc', 'ereso_mc'] \
+(['reso_scale', 'ereso_scale','ereso_scale_mc', 'mu', 'emu', 'emu_mc', 'frac', 'efrac', 'efrac_mc'] if 'reso_scale' in ijazz_json.keys() else []) \
+(['mean2G', 'std2G'] if 'mean2G' in ijazz_json.keys() else [])
parameters = {par_name: np.array(ijazz_json.get(par_name, np.ones(shape)*np.nan)).reshape(shape) for par_name in par_names}
parameters['eresp'] = np.where(np.isnan(parameters['eresp']), np.inf, parameters['eresp'])
parameters['ereso'] = np.where(np.isnan(parameters['ereso']), np.inf, parameters['ereso'])
parameters = {k: tf.convert_to_tensor(v, dtype=tf.float32) for k, v in parameters.items()}
parameters['bins'] = [np.array(bin) for bin in categories.values()]
parameters['bins'] = [np.where(bin <= -9999, -np.inf, bin) for bin in parameters['bins']]
parameters['bins'] = [np.where(bin >= +9999, +np.inf, bin) for bin in parameters['bins']]
parameters['bins'] = [tf.convert_to_tensor(bin, dtype=tf.float32) for bin in parameters['bins'] ]
parameters['dims'] = np.array(list(categories.keys()), dtype=str)
parameters['pt'] = np.array(ijazz_json.get('pt_mean', None))
return parameters
[docs]
def parameters_to_json(params:Dict, outfile: Union[str, Path]) -> None:
"""Dump the parameters dictionnary to a json file
Args:
params (Dict): input dictionnary
outfile (Union[str, Path]): output file
"""
print(f'Dump results to json file: {outfile}')
categories = params['categories']
categories_serialized = {}
for k, v in categories.items():
v = np.where(np.array(v) == +np.inf, +9999, np.array(v))
v = np.where(np.array(v) == -np.inf, -9999, np.array(v))
categories_serialized[k] = v.tolist()
categories_serialized
with open(outfile, "w") as fjson:
eresp = np.where(np.isnan(np.array(params['eresp'])), 0, np.array(params['eresp']))
ereso = np.where(np.isnan(np.array(params['ereso'])), 0, np.array(params['ereso']))
eresp_mc = np.where(np.isnan(np.array(params['eresp_mc'])), 0, np.array(params['eresp_mc']))
ereso_mc = np.where(np.isnan(np.array(params['ereso_mc'])), 0, np.array(params['ereso_mc']))
pt_mean = None if params.get('pt_mean', None) is None else params['pt_mean'].tolist()
if "reso_scale" in params.keys():
emu = np.where(np.isnan(np.array(params['emu'])), 0, np.array(params['emu']))
ereso_scale = np.where(np.isnan(np.array(params['ereso_scale'])), 0, np.array(params['ereso_scale']))
efrac = np.where(np.isnan(np.array(params['efrac'])), 0, np.array(params['efrac']))
emu_mc = np.where(np.isnan(np.array(params['emu_mc'])), 0, np.array(params['emu_mc']))
ereso_scale_mc = np.where(np.isnan(np.array(params['ereso_scale_mc'])), 0, np.array(params['ereso_scale_mc']))
efrac_mc = np.where(np.isnan(np.array(params['efrac_mc'])), 0, np.array(params['efrac_mc']))
json.dump({"categories": categories_serialized,
"resp": np.array(params['resp']).tolist(), "reso": np.array(params['reso']).tolist(),
"reso_scale": np.array(params['reso_scale']).tolist(), "mu": np.array(params['mu']).tolist(), "frac" : np.array(params['frac']).tolist(),
"eresp": eresp.tolist(), "ereso": ereso.tolist(),
"ereso_scale": ereso_scale.tolist(), "emu": emu.tolist(), "efrac" : efrac.tolist(),
"eresp_mc": eresp_mc.tolist(), "ereso_mc": ereso_mc.tolist(),
"ereso_scale_mc": ereso_scale_mc.tolist(), "emu_mc": emu_mc.tolist(), "efrac_mc" : efrac_mc.tolist(),
"pt_mean": pt_mean} | ({"mean2G": np.array(params['mean2G']).tolist(), "std2G": np.array(params['std2G']).tolist()} if "mean2G" in params.keys() else {}),
fjson, indent=4)
else:
json.dump({"categories": categories_serialized,
"resp": np.array(params['resp']).tolist(), "reso": np.array(params['reso']).tolist(),
"eresp": eresp.tolist(), "ereso": ereso.tolist(),
"eresp_mc": eresp_mc.tolist(), "ereso_mc": ereso_mc.tolist(),
"pt_mean": pt_mean},
fjson, indent=4)
[docs]
def json_safe_load(json_name, mc_errors=False):
"""Load an IJazZ JSON file and convert arrays to numpy.
Args:
json_name (str | Path): path to the JSON file.
mc_errors (bool): if True, also load MC error arrays.
Returns:
dict: parsed JSON with numpy arrays for numeric fields.
"""
with open(json_name, 'r') as fjson:
sas = json.load(fjson)
for k in ['resp', 'reso', 'eresp', 'ereso', 'pt_mean'] + \
(['eresp_mc', 'ereso_mc'] if mc_errors else []) + \
(['reso_scale', 'ereso_scale', 'mu', 'emu', 'frac', 'efrac'] if 'reso_scale' in sas.keys() else []) + \
(['ereso_scale_mc', 'emu_mc', 'efrac_mc'] if ('reso_scale' in sas.keys()) and mc_errors else []):
sas[k] = np.array(sas[k])
return sas
[docs]
def to_correction_lib(sas: Union[Dict, str, Path], dir_results:Union[Path, str]='./tmp/', dset_name='DSET',
cset_name='CSET', cset_description:str=None, cset_version=1, change_abs_var:Dict[str, str]={},
cat_latex=None, dry_run=False) -> Tuple[str, List[str]]:
"""Create a correction lib files based
Args:
sas (Dict, ): dictionnary with scales and smearings
dir_results (Path, optional): directory. Defaults to Path('./tmp/').
dset_name (str, optional): dataset nickname. Defaults to 'DSET'.
cset_name (str, optional): correction saet name. Defaults to 'CSET'.
cset_description (str, optional): description of the correction set. Defaults to None.
cset_version (int, optional): correction set version. Defaults to 1.
cat_latex (Dict, optional): dictionnary with a description of variables in categories. Defaults to None.
"""
if not isinstance(sas, Dict):
sas = json_safe_load(sas)
dir_results = Path(dir_results)
print(f'---------------------------------------')
print(f'---- Create correction lib')
print(f'---------------------------------------')
if cat_latex is None:
cat_latex = {key: key for key in sas['categories']}
def create_correction(cset_name, out_name, measures, change_abs_var={}):
def create_bin_corr(s_name):
# flow = cs.Binning(['error'] + ['clamp'] * len(sas['categories']))
# flow = {'syst': 'error'} | {k: 'clamp' for k in sas['categories'].keys()}
flow = 'clamp'
# change abs variables into non absolute variables to avoid having AbsScEta and ScEta as final inputs
if any(k in sas['categories'].keys() for k in change_abs_var):
inputs = list(sas['categories'].keys())
edges = list(sas['categories'].values())
content = measures[s_name]
n_cat_bins = [len(bin)-1 for bin in edges]
content = content.reshape(*n_cat_bins)
for i, var in enumerate(inputs):
if var in change_abs_var:
inputs[i] = change_abs_var[var]
e = np.asarray(edges[i])
# Mirror positive abs-binning into signed binning, without duplicating 0
if np.isclose(e[0], 0):
neg = -e[:0:-1] # skip the first 0 edge
edges[i] = list(neg) + list(e)
else:
neg = -e[::-1]
edges[i] = list(neg) + list(e)
flipped = np.flip(content, axis=i)
content = np.concatenate((flipped, content), axis=i)
return cs.MultiBinning( nodetype='multibinning',
inputs=inputs,
edges=edges,
content=content.flatten(),
flow=flow)
else:
return cs.MultiBinning( nodetype='multibinning',
inputs=list(sas['categories'].keys()),
edges=list(sas['categories'].values()),
content=measures[s_name],
flow=flow)
return cs.Correction(name = cset_name,
version = cset_version,
description=cset_description,
inputs = [cs.Variable(name="syst", type="string", description="Systematic type")] +
[cs.Variable(name=change_abs_var[dim] if dim in change_abs_var else dim, type='real', description=change_abs_var[dim] if dim in change_abs_var else dim) for dim in sas['categories']],
output = cs.Variable(name=out_name, type="real", description=f"EM {out_name} value"),
data = cs.Category(nodetype='category',
input='syst',
content=[cs.CategoryItem(key=k,value=create_bin_corr(k)) for k in measures.keys()]
)
)
esr_rel = sas['eresp']/sas['resp']
scales = {'scale': 1./sas['resp'], 'escale': esr_rel/sas['resp'], 'scale_up': 1./sas['resp']*(1 + esr_rel), 'scale_down': 1./sas['resp']*(1-esr_rel)}
smears = {'smear': sas['reso'], 'esmear': sas['ereso'], 'smear_up': sas['reso']+sas['ereso'], 'smear_down': np.maximum(0, sas['reso']-sas['ereso']),
'escale': esr_rel, 'scale_up': 1 + esr_rel, 'scale_down': 1 - esr_rel }
if 'reso_scale' in sas.keys():
smears |= {'reso_scale': sas['reso_scale'], 'ereso_scale': sas['ereso_scale'], 'reso_scale_up': sas['reso_scale']+sas['ereso_scale'], 'reso_scale_down': sas['reso_scale']-sas['ereso_scale'],
'mu': sas['mu'], 'emu': sas['emu'], "mu_up": sas['mu']+sas['emu'], "mu_down": sas['mu']-sas['emu'],
'frac': sas['frac'],'efrac': sas['efrac'], 'frac_up': sas['frac']+sas['efrac'], 'frac_down': sas['frac']-sas['efrac']}
c_scale = create_correction(f'EGMScale_{cset_name}_{dset_name}' , 'scale', scales, change_abs_var=change_abs_var) # only scales to apply to data
c_smear = create_correction(f'EGMSmearAndSyst_{cset_name}_{dset_name}', 'smear', smears, change_abs_var=change_abs_var) # MC parameters
rich.print(c_scale)
rich.print(c_smear)
cset = cs.CorrectionSet(schema_version=2,
description=cset_description,
corrections=[c_scale, c_smear]
)
dir_results.mkdir(exist_ok=True)
file_out = dir_results / f"EGMScalesSmearing_{dset_name}.v{cset_version}.json"
if not dry_run:
print(f"Writing out correction lib file: {file_out}.gz")
with gzip.open(f"{file_out}.gz", "wt") as fout:
fout.write(cset.model_dump_json(indent=1,exclude_unset=True))
return f"{file_out}.gz", list(sas['categories'].keys()),
[docs]
def compute_syst_from_jsons(nominal:Union[Dict, str, Path], syst_jsons:List[Union[Dict, str, Path]], fit_2g:Union[Dict, str, Path]=None,
scale_flat_syst:float=0.0, smear_flat_syst:float=0.0) -> Tuple[str, List[str]]:
"""Compute the systematic uncertainties from the json files
Args:
nominal (Union[Dict, str, Path]): nominal json file
syst_jsons (List[Union[Dict, str, Path]]): list of json files with systematic variations
fit_2g (Union[Dict, str, Path], optional): json file with the parameters of the double gaussian fit. Defaults to None.
scale_flat_syst (float, optional): flat scale systematic uncertainty. Defaults to 0.0.
smear_flat_syst (float, optional): flat smear systematic uncertainty. Defaults to 0.0.
Returns:
"""
if not isinstance(nominal, Dict):
nominal = json_safe_load(nominal, mc_errors=True)
else:
raise ValueError('Missing nominal json file')
eresp = nominal['eresp']**2
ereso = nominal['ereso']**2
eresp += scale_flat_syst**2
ereso += smear_flat_syst**2
for syst in syst_jsons:
if not isinstance(syst, Dict):
syst = json_safe_load(syst)
eresp += (syst['resp']-nominal['resp'])**2
ereso += (syst['reso']-nominal['reso'])**2
if fit_2g:
if not isinstance(fit_2g, Dict):
fit_2g = json_safe_load(fit_2g)
import copy
nominal_eresp = copy.deepcopy(eresp)
nominal_ereso = copy.deepcopy(ereso)
r = fit_2g['resp']
s1 = fit_2g['reso']
rs = fit_2g['reso_scale']
s2 = s1 * rs
mu = fit_2g['mu']
f = fit_2g['frac']
fit_2g['mean2G'] = (f + (1-f)*mu)*r
fit_2g['std2G'] = np.sqrt(f*s1**2 + (1-f)*(mu**2)*(s2**2) + f*(1-f)*((mu-1)**2)) * r
eresp += (nominal['resp'] - fit_2g['mean2G'])**2
ereso += (nominal['reso'] - fit_2g['std2G'])**2
fit_2g['eresp'] = np.sqrt(nominal_eresp)
fit_2g['ereso'] = np.sqrt(nominal_ereso)
fit_2g['eresp_mc'] = nominal['eresp_mc']
fit_2g['ereso_mc'] = nominal['ereso_mc']
nominal['eresp'] = np.sqrt(eresp)
nominal['ereso'] = np.sqrt(ereso)
if fit_2g:
return nominal, fit_2g
else:
return nominal
[docs]
def apply_corrlib_df(df: pd.DataFrame, cset_vars:List[str], cset_name:str, cset_file:str, syst_name:str, mll_name='mass', ptl_name='pt'):
"""Apply the correction lib to a pandas dataframe
Args:
df (pd.DataFrame): dataframe with the variables to correct
cset_vars (List[str]): list of the variables used in the correction
cset_name (str): name of the correction set
cset_file (str): name of the correction file
syst_name (str): type of correction to apply: scale or smear
mll_name (str, optional): dilepton mass variable name. Defaults to 'mass'.
ptl_name (str, optional): lepton tranverse momentum variable name. Defaults to 'pt'.
"""
import correctionlib
cset = correctionlib.CorrectionSet.from_file(cset_file)
try:
corr = cset[cset_name]
except IndexError:
corr = cset.compound[cset_name]
lep_names = [f'{name}1' for name in cset_vars]
lep_names = [name.split('1')[0] for name in df.columns.intersection(lep_names)]
for i_ele in [1, 2]:
lep_vars = [df[f'{var}{i_ele}' if var in lep_names else var] for var in cset_vars]
scales = corr.evaluate(*([syst_name] + lep_vars))
if syst_name == 'smear':
try:
scales *= corr.evaluate(*(['stdnormal'] + lep_vars))
print('Smearing using the included random number generator')
except:
scales = np.random.normal(1, scales)
print('Smearing using the numpy random')
df[mll_name] *= np.sqrt(scales)
df[f'{ptl_name}{i_ele}'] *= scales
[docs]
def apply_corrlib_df_mmg(df: pd.DataFrame, cset_vars:List[str], cset_name:str, cset_file:str, syst_name:str,
vllg_name='v_mmg', mllg_name='mass', mll_name='mass_mm', ptg_name='ptg'):
"""Apply the correction lib to a pandas dataframe
Args:
df (pd.DataFrame): dataframe with the variables to correct
cset_vars (List[str]): list of the variables used in the correction
cset_name (str): name of the correction set
cset_file (str): name of the correction file
syst_name (str): type of correction to apply: scale or smear
vllg_name (str, optional): Zmmg visible mass variable name. Defaults to 'v_mmg'.
mllg_name (str, optional): Zmmg mass variable name. Defaults to 'mass'.
mll_name (str, optional): dilepton mass variable name. Defaults to 'mass_mm'.
ptg_name (str, optional): lepton tranverse momentum variable name. Defaults to 'ptg'.
"""
import correctionlib
cset = correctionlib.CorrectionSet.from_file(cset_file)
try:
corr = cset[cset_name]
except IndexError:
corr = cset.compound[cset_name]
lep_names = [f'{name}g' for name in cset_vars]
lep_names = [name.split('g')[0] for name in df.columns.intersection(lep_names)]
lep_vars = [df[f'{var}g' if var in lep_names else var] for var in cset_vars]
scales = corr.evaluate(*([syst_name] + lep_vars))
if syst_name == 'smear':
try:
scales *= corr.evaluate(*(['stdnormal'] + lep_vars))
print('Smearing using the included random number generator')
except:
scales = np.random.normal(1, scales)
print('Smearing using the numpy random')
df[mllg_name] *= df.eval(f'sqrt({mll_name}**2 + ({mllg_name}**2 - {mll_name}**2) * @scales)') / df[mllg_name]
df[vllg_name] = df.eval(f'2*({mllg_name}/91.2-1)/(1-{mll_name}**2/{mllg_name}**2)')
df[ptg_name] *= scales
# ------------------------------------------------------------------------------------------
# -- Potentially fit the results of the SAS when they are computed vs pT
# -- This is done to avoid spikes in the pT spectrum (+ smoothen the result)
# ------------------------------------------------------------------------------------------
[docs]
class FuncStr:
"""Vectorized function wrapper built from a string expression."""
def __init__(self, expression: str):
"""This class generates a numpy verctorized function based on a string expression
Parameters for fits are possible with [], e.g. par0 == [0]
Args:
expression (str): analytical expression of the form "np.sqrt([1]*x + [0])
Raises:
SyntaxError: checkout that the syntax is correct (only [] for now)
"""
if expression.count("[") != expression.count("]"):
raise SyntaxError("Missing a stating or closing braket")
[docs]
self.n_par = 1 + expression.count("[")
[docs]
self.expression = expression
[docs]
def replace_par(self, *par):
expression = self.expression
for p_name, p_val in zip([f"[{ip}]" for ip in range(self.n_par-1)], par):
expression = expression.replace(p_name, f'{p_val:.10f}')
return expression
[docs]
def __call__(self, x, *par):
return np.frompyfunc(lambda x, *par: eval(self.replace_par(*par)), self.n_par, 1)(x, *par)
[docs]
resp_exp = "[0] * np.power(min(x, 160)/45., [1]) + [2]" # fix corr for pT > 160 GeV
[docs]
reso_exp = "np.sqrt(np.abs(np.power([0], 2) + [1]/min(x, 160)))" # fix corr for pT > 160 GeV
[docs]
pt_fit_resp = FuncStr(resp_exp)
[docs]
pt_fit_reso = FuncStr(reso_exp)
[docs]
p0_resp = [-0.02, -1., 1.0]
[docs]
pr_resp = [[-0.9, 0.5], [-5, 5], [0.5, 1.5]]
[docs]
pr_reso = [[1e-3, 0.1], [-0.05, 0.05]]
[docs]
def do_pt_fit(axval, avar, eavar, func:FuncStr, p0, p_range,
do_plot=False, min_pt=25, max_pt = 160, width_pt=0.5):
"""Fit a smooth pT-dependent function to binned inputs.
Args:
axval (np.ndarray): x-bin centers.
avar (np.ndarray): values to fit (shape: nbin x ncat).
eavar (np.ndarray): uncertainties for avar.
func (FuncStr): function form to fit.
p0 (list): initial parameter values.
p_range (list): parameter ranges.
do_plot (bool): if True, plot fit and points.
min_pt (float): minimum pT for fine grid.
max_pt (float): maximum pT for fine grid.
width_pt (float): step size for fine grid.
Returns:
list: list of fitted TFormula strings per category.
"""
minimizer="BFGS"
fit_result = [None] * avar.shape[-1]
if do_plot:
n_row = int(np.ceil(avar.shape[-1]/3.))
fig, ax = plt.subplots(3, n_row, figsize=(9, 3*n_row) )
ax = ax.flatten()
fine_pt_bining = np.linspace(min_pt, max_pt, int((max_pt-min_pt)/width_pt) + 1 )
xxval = (fine_pt_bining[1:] + fine_pt_bining[:-1])*0.5
y_fine = []
for i_dim in np.arange(avar.shape[-1]):
fitter = FFitter(p0=p0, p_range=p_range, minimizer=minimizer)
par, epar = fitter.fit(axval, avar[:, i_dim], eavar[:, i_dim], func)
if par is None:
fitter = FFitter(p0=p0, p_range=p_range, minimizer='Nelder-Mead')
par1, epar1 = fitter.fit(axval, avar[:, i_dim], eavar[:, i_dim], func)
print(" * refit1", par1)
fitter = FFitter(p0=par1, p_range=p_range, minimizer=minimizer)
par, epar = fitter.fit(axval, avar[:, i_dim], eavar[:, i_dim], func)
print(" * refit2", par)
if par is None:
par, epar = (par1, epar1)
y_fine.append(func(xxval, *par))
if do_plot:
plt.sca(ax[i_dim])
cms.draw(axval, avar[:, i_dim], yerr=eavar[:, i_dim], option ='E')
ax[i_dim].plot(xxval, y_fine[-1], color=ax[i_dim].lines[-1].get_color(), ls='--')
fit_result[i_dim] = func.tformula(*par)
return fit_result
[docs]
def pt_smoothing(json_file:Union[str, Path, Dict], dim_to_fit='pt', do_plot=False) -> Tuple[Dict, Dict]:
"""Function to fit the pT-dependent sas to make them smooth
Args:
json_file (Union[str, Path]): name of the json file containing the result of the sas fit
dim_to_fit (str, optional): name of the pt dimension. Defaults to 'pt'.
Returns:
Dict: new sas dictionnary
"""
params = parameters_from_json(json_file)
if params.get('pt', None) is not None:
xval = params['pt']
else:
return {}
fit_vres = {}
fit_eres = {}
for sas_type in ['resp', 'reso']:
var, evar, evar_mc = params[sas_type], params[f'e{sas_type}'], params[f'e{sas_type}_mc']
# -- find the dimension with pt
idim_pt = np.where(params['dims'] == dim_to_fit)[0][-1]
d_perm = [idim_pt] + [idim for idim, d in enumerate(params['dims']) if d != dim_to_fit]
var = tf.transpose(var, perm=d_perm).reshape(var.shape[idim_pt], -1)
evar = tf.transpose(evar, perm=d_perm).reshape(evar.shape[idim_pt], -1)
evar_mc = tf.transpose(evar_mc, perm=d_perm).reshape(evar.shape[idim_pt], -1)
p0 = p0_resp if sas_type == 'resp' else p0_reso
pr = pr_resp if sas_type == 'resp' else pr_reso
pt_fit = pt_fit_resp if sas_type == 'resp' else pt_fit_reso
var = var.numpy()
evar = evar.numpy()
var_up = var + evar
if sas_type == 'resp':
var = 1 / var
var_up = 1 / (var - evar)
res = do_pt_fit(xval, var, evar, pt_fit, p0, pr, do_plot=do_plot)
res_up = do_pt_fit(xval, var_up, evar, pt_fit, p0, pr)
fit_vres[sas_type] = res
fit_eres[sas_type] = res_up
eresp_abs = f'abs({fit_vres["resp"]} - {fit_vres["resp"]})'
ereso_abs = f'abs({fit_vres["reso"]} - {fit_vres["reso"]})'
eresp_rel = f'abs({fit_vres["resp"]}/{fit_vres["resp"]} - 1)'
fit_res_dt = {'scale': fit_vres['resp'], 'escale': eresp_abs}
fit_res_mc = {'smear': fit_vres['reso'], 'escale': eresp_rel, 'esmear': ereso_abs}
return fit_res_dt, fit_res_mc
[docs]
def create_pt_corrector(sas: Union[Dict, str, Path], dir_results:Union[Path, str]='./tmp/', dset_name='DSET', cset_name='CSET',
cset_description:str=None, cset_version=1, cat_latex=None, pt_name='pt'):
"""Create a correction lib files based
Args:
sas (Dict, ): dictionnary with scales and smearings
dir_results (Path, optional): directory. Defaults to Path('./tmp/').
dset_name (str, optional): dataset nickname. Defaults to 'DSET'.
cset_name (str, optional): correction saet name. Defaults to 'CSET'.
cset_description (str, optional): description of the correction set. Defaults to None.
cset_version (int, optional): correction set version. Defaults to 1.
cat_latex (Dict, optional): dictionnary with a description of variables in categories. Defaults to None.
"""
import correctionlib.schemav2 as cs
import rich
print(f'--------------------------------------------')
print(f'---- Create correction lib with PT SMOOTHING')
print(f'--------------------------------------------')
# -- do fits
fit_dt, fit_mc = pt_smoothing(sas)
if not isinstance(sas, Dict):
sas = json_safe_load(sas)
if cat_latex is None:
cat_latex = {key: key for key in sas['categories']}
dir_results = Path(dir_results)
categories = sas['categories'].copy()
categories.pop(pt_name)
edges = list(categories.values())
n_val = np.prod([len(e)-1 for e in edges])
def get_correction(sas_type, fit_results):
input_cats = [cs.Variable(name=dim, type='real', description=cat_latex[dim]) for dim in categories.keys()]
def get_pt_corr(expression, flow='clamp'):
content=[cs.Formula(nodetype="formula", variables=[pt_name], parser="TFormula", expression=expression[ibin]) for ibin in range(n_val)]
return cs.MultiBinning(nodetype='multibinning',
inputs=list(categories.keys()),
edges=edges,
content=content,
flow=flow)
cset = cs.Correction(name=f"EGMScale_{cset_name}_{dset_name}" if sas_type == 'scale' else f"EGMSmearAndSyst_{cset_name}_{dset_name}" ,
version = cset_version,
description=cset_description,
inputs = [cs.Variable(name="syst", type="string", description="Systematic type"),
cs.Variable(name=pt_name, type="real", description="p_T in GeV")] + input_cats ,
output = cs.Variable(name=sas_type, type="real", description=f"EM {sas_type} value"),
data = cs.Category(nodetype='category',
input='syst',
content=[cs.CategoryItem(key=k,value=get_pt_corr(v)) for k, v in fit_results.items()],
default=get_pt_corr(fit_results[sas_type]))
)
return cset
c_scale = get_correction('scale', fit_dt)
c_smear = get_correction('smear', fit_mc)
rich.print(c_scale)
rich.print(c_smear)
cset = cs.CorrectionSet(schema_version=2,
description=cset_description,
corrections=[c_scale, c_smear]
)
dir_results.mkdir(exist_ok=True)
file_out = dir_results / f"EGMScalesSmearing_{dset_name}_SMOOTH.v{cset_version}.json"
print(f"Writing out correction lib file: {file_out}.gz")
import gzip
with gzip.open(f"{file_out}.gz", "wt") as fout:
fout.write(cset.model_dump_json(indent=1,exclude_unset=True))
return c_scale
[docs]
def ijazz_sas_smoothing():
"""entry point for the ijazz_sas_smoothing command
"""
import argparse
import sys
parser = argparse.ArgumentParser(description=f'IJazZ Scale and Smearing fit')
parser.add_argument('json_file', type=str, help='name of json file')
parser.add_argument('--dset', type=str, default='DSET', help="name of the dataset")
parser.add_argument('--cset', type=str, default='CSET', help="name of the correction set")
parser.add_argument('--description', type=str, default=None, help="cset description")
parser.add_argument('-v', '--vers', type=int, default=1, help="cset version")
parser.add_argument('-o', '--outdir', type=str, default="./", help="output directory")
args = parser.parse_args(sys.argv[1:])
create_pt_corrector(args.json_file, dir_results=args.outdir, dset_name=args.dset, cset_name=args.cset,
cset_description=args.description, cset_version=args.vers)