Source code for polarimetry.lhcb

# cspell:ignore modelparameters modelstudies
# pyright: reportConstantRedefinition=false
"""Import functions that are specifically for this LHCb analysis.

.. seealso:: :doc:`/cross-check`
"""
from __future__ import annotations

import itertools
import re
import sys
from copy import deepcopy
from math import sqrt
from typing import TYPE_CHECKING, Generic, Iterable, Pattern, TypeVar

import attrs
import numpy as np
import sympy as sp
import yaml
from attrs import frozen
from sympy.core.symbol import Str

from polarimetry.amplitude import (
    AmplitudeModel,
    DalitzPlotDecompositionBuilder,
    DynamicsBuilder,
    get_indexed_base,
)
from polarimetry.decay import IsobarNode, Particle, ThreeBodyDecay, ThreeBodyDecayChain
from polarimetry.spin import filter_parity_violating_ls, generate_ls_couplings

from .dynamics import (
    formulate_breit_wigner,
    formulate_bugg_breit_wigner,
    formulate_exponential_bugg_breit_wigner,
    formulate_flatte_1405,
)
from .particle import PARTICLE_TO_ID, Σ, K, Λc, p, π

if TYPE_CHECKING:
    from pathlib import Path

if sys.version_info < (3, 8):
    from typing_extensions import Literal
else:
    from typing import Literal


def load_model(
    model_file: Path | str,
    particle_definitions: dict[str, Particle],
    model_id: int | str = 0,
) -> AmplitudeModel:
    builder = load_model_builder(model_file, particle_definitions, model_id)
    model = builder.formulate()
    imported_parameter_values = load_model_parameters(
        model_file, builder.decay, model_id, particle_definitions
    )
    model.parameter_defaults.update(imported_parameter_values)
    return model

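# Example (sketch): formulate the default model (model_id=0) and inspect the
# imported parameter values. The file path and the ``particle_definitions``
# mapping (any ``dict[str, Particle]`` with the resonance definitions) are
# assumptions about how the surrounding analysis code sets things up:
#
#     >>> model = load_model("data/model-definitions.yaml", particle_definitions)
#     >>> model.parameter_defaults  # mapping of symbols to imported values
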
def load_model_builder(
    model_file: Path | str,
    particle_definitions: dict[str, Particle],
    model_id: int | str = 0,
) -> DalitzPlotDecompositionBuilder:
    with open(model_file) as f:
        model_definitions = yaml.load(f, Loader=yaml.SafeLoader)
    model_title = _find_model_title(model_definitions, model_id)
    model_def = model_definitions[model_title]
    lineshapes: dict[str, str] = model_def["lineshapes"]
    min_ls = "LS couplings" not in model_title
    decay = load_three_body_decay(lineshapes, particle_definitions, min_ls)
    amplitude_builder = DalitzPlotDecompositionBuilder(decay, min_ls)
    for chain in decay.chains:
        lineshape_choice = lineshapes[chain.resonance.name]
        dynamics_builder = _get_resonance_builder(lineshape_choice)
        amplitude_builder.dynamics_choices.register_builder(chain, dynamics_builder)
    return amplitude_builder

def _find_model_title(model_definitions: dict, model_id: int | str) -> str:
    if isinstance(model_id, int):
        if model_id >= len(model_definitions):
            msg = (
                f"Model definition file contains {len(model_definitions)} models, but"
                f" trying to get number {model_id}."
            )
            raise KeyError(msg)
        for i, title in enumerate(model_definitions):
            if i == model_id:
                return title
    if model_id not in model_definitions:
        msg = f'Could not find model with title "{model_id}"'
        raise KeyError(msg)
    return model_id


def _get_resonance_builder(lineshape: str) -> DynamicsBuilder:
    if lineshape in {"BreitWignerMinL", "BreitWignerMinL_LS"}:
        return formulate_breit_wigner
    if lineshape == "BuggBreitWignerExpFF":
        return formulate_exponential_bugg_breit_wigner
    if lineshape in {"BuggBreitWignerMinL", "BuggBreitWignerMinL_LS"}:
        return formulate_bugg_breit_wigner
    if lineshape in {"Flatte1405", "Flatte1405_LS"}:
        return formulate_flatte_1405
    msg = f'No dynamics implemented for lineshape "{lineshape}"'
    raise NotImplementedError(msg)

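# Example (sketch): the lineshape names in model-definitions.yaml select the
# dynamics builders registered above, e.g.
#
#     >>> _get_resonance_builder("Flatte1405") is formulate_flatte_1405
#     True
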
def load_three_body_decay(
    resonance_names: Iterable[str],
    particle_definitions: dict[str, Particle],
    min_ls: bool = True,
) -> ThreeBodyDecay:
    def create_isobar(resonance: Particle) -> list[ThreeBodyDecayChain]:
        if resonance.name.startswith("K"):
            child1, child2, spectator = π, K, p
        elif resonance.name.startswith("L"):
            child1, child2, spectator = K, p, π
        elif resonance.name.startswith("D"):
            child1, child2, spectator = p, π, K
        else:
            raise NotImplementedError
        prod_ls_couplings = generate_ls(Λc, resonance, spectator, conserve_parity=False)
        dec_ls_couplings = generate_ls(resonance, child1, child2, conserve_parity=True)
        if min_ls:
            decay = IsobarNode(
                parent=Λc,
                child1=IsobarNode(
                    parent=resonance,
                    child1=child1,
                    child2=child2,
                    interaction=min(dec_ls_couplings),
                ),
                child2=spectator,
                interaction=min(prod_ls_couplings),
            )
            return [ThreeBodyDecayChain(decay)]
        chains = []
        for dec_ls, prod_ls in itertools.product(dec_ls_couplings, prod_ls_couplings):
            decay = IsobarNode(
                parent=Λc,
                child1=IsobarNode(
                    parent=resonance,
                    child1=child1,
                    child2=child2,
                    interaction=dec_ls,
                ),
                child2=spectator,
                interaction=prod_ls,
            )
            chains.append(ThreeBodyDecayChain(decay))
        return chains

    def generate_ls(
        parent: Particle, child1: Particle, child2: Particle, conserve_parity: bool
    ) -> list[tuple[int, sp.Rational]]:
        ls = generate_ls_couplings(parent.spin, child1.spin, child2.spin)
        if conserve_parity:
            return filter_parity_violating_ls(
                ls, parent.parity, child1.parity, child2.parity
            )
        return ls

    resonances = [particle_definitions[name] for name in resonance_names]
    chains: list[ThreeBodyDecayChain] = []
    for res in resonances:
        chains.extend(create_isobar(res))
    return ThreeBodyDecay(
        states={state_id: particle for particle, state_id in PARTICLE_TO_ID.items()},
        chains=tuple(chains),
    )

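# Example (sketch): build a decay containing a single K* isobar with minimal LS
# couplings. The resonance name "K(892)" is assumed to be a key of
# ``particle_definitions``:
#
#     >>> decay = load_three_body_decay(["K(892)"], particle_definitions, min_ls=True)
#     >>> decay.chains[0].resonance.name
#     'K(892)'
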
class ParameterBootstrap:
    """A wrapper for loading parameters from
    :download:`model-definitions.yaml </../data/model-definitions.yaml>`."""

    def __init__(
        self,
        filename: Path | str,
        decay: ThreeBodyDecay,
        model_id: int | str = 0,
    ) -> None:
        particle_definitions = extract_particle_definitions(decay)
        symbolic_parameters = load_model_parameters_with_uncertainties(
            filename, decay, model_id, particle_definitions
        )
        self._parameters = {str(k): v for k, v in symbolic_parameters.items()}

    @property
    def values(self) -> dict[str, complex | float | int]:
        return {k: v.value for k, v in self._parameters.items()}

    @property
    def uncertainties(self) -> dict[str, complex | float | int]:
        return {k: v.uncertainty for k, v in self._parameters.items()}
    def create_distribution(
        self, sample_size: int, seed: int | None = None
    ) -> dict[str, complex | float | int]:
        return _smear_gaussian(
            parameter_values=self.values,
            parameter_uncertainties=self.uncertainties,
            size=sample_size,
            seed=seed,
        )

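# Example (sketch): draw a Gaussian bootstrap sample of all imported parameters.
# The file path is an assumption; ``decay`` can be taken from
# ``load_model_builder(...).decay``:
#
#     >>> bootstrap = ParameterBootstrap("data/model-definitions.yaml", decay)
#     >>> distributions = bootstrap.create_distribution(sample_size=1_000, seed=0)
#     >>> next(iter(distributions.values())).shape
#     (1000,)
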
def load_model_parameters(
    filename: Path | str,
    decay: ThreeBodyDecay,
    model_id: int | str = 0,
    particle_definitions: dict[str, Particle] | None = None,
) -> dict[sp.Indexed | sp.Symbol, complex | float]:
    parameters = load_model_parameters_with_uncertainties(
        filename, decay, model_id, particle_definitions
    )
    return {k: v.value for k, v in parameters.items()}

def load_model_parameters_with_uncertainties(
    filename: Path | str,
    decay: ThreeBodyDecay,
    model_id: int | str = 0,
    particle_definitions: dict[str, Particle] | None = None,
) -> dict[sp.Indexed | sp.Symbol, MeasuredParameter]:
    with open(filename) as f:
        model_definitions = yaml.load(f, Loader=yaml.SafeLoader)
    model_title = _find_model_title(model_definitions, model_id)
    min_ls = "LS couplings" not in model_title
    parameter_definitions = model_definitions[model_title]["parameters"]
    parameters = _to_symbol_value_mapping(
        parameter_definitions, min_ls, particle_definitions
    )
    decay_couplings = compute_decay_couplings(decay)
    parameters.update(decay_couplings)
    return parameters

def _smear_gaussian(
    parameter_values: dict[str, complex | float],
    parameter_uncertainties: dict[str, complex | float],
    size: int,
    seed: int | None = None,
) -> dict[str, np.ndarray]:
    value_distributions = {}
    for k, mean in parameter_values.items():
        std = parameter_uncertainties[k]
        distribution = _create_gaussian_distribution(mean, std, size, seed)
        value_distributions[k] = distribution
    return value_distributions


def _create_gaussian_distribution(
    mean: complex | float,
    std: complex | float,
    size: int,
    seed: int | None = None,
):
    rng = np.random.default_rng(seed)
    if isinstance(mean, complex) and isinstance(std, complex):
        return (
            rng.normal(mean.real, std.real, size)
            + rng.normal(mean.imag, std.imag, size) * 1j
        )
    if isinstance(mean, (float, int)) and isinstance(std, (float, int)):
        return rng.normal(mean, std, size)
    raise NotImplementedError


_T = TypeVar("_T")
_K = TypeVar("_K")
_V = TypeVar("_V")

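# Example (sketch): _create_gaussian_distribution() smears complex parameters
# independently in their real and imaginary parts, e.g.
#
#     >>> _create_gaussian_distribution(1.0 + 0.5j, 0.1 + 0.2j, size=3, seed=42)
#     # three complex samples scattered around 1.0+0.5j
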
def flip_production_coupling_signs(obj: _T, subsystem_names: Iterable[Pattern]) -> _T:
    if isinstance(obj, AmplitudeModel):
        return attrs.evolve(
            obj,
            parameter_defaults=_flip_signs(obj.parameter_defaults, subsystem_names),
        )
    if isinstance(obj, ParameterBootstrap):
        bootstrap = deepcopy(obj)
        bootstrap._parameters = _flip_signs(bootstrap._parameters, subsystem_names)  # type: ignore[reportPrivateUsage]
        return bootstrap
    if isinstance(obj, dict):
        return _flip_signs(obj, subsystem_names)
    raise NotImplementedError

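# Example (sketch): flip the production couplings of the K and L subsystems in
# an AmplitudeModel. The subsystem identifiers are assumptions about how the
# production coupling symbols are labeled:
#
#     >>> flipped = flip_production_coupling_signs(model, subsystem_names=["K", "L"])
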
def _flip_signs(
    parameters: dict[_K, _V], subsystem_names: Iterable[Pattern]
) -> dict[_K, _V]:
    pattern = rf".*\\mathrm{{production}}\[[{''.join(subsystem_names)}].*"
    return {
        key: _flip(value) if re.match(pattern, str(key)) else value
        for key, value in parameters.items()
    }


def _flip(obj: _V) -> _V:
    if isinstance(obj, MeasuredParameter):
        return attrs.evolve(obj, value=_flip(obj.value))
    return -obj

def compute_decay_couplings(
    decay: ThreeBodyDecay,
) -> dict[sp.Indexed, MeasuredParameter[int]]:
    H_dec = get_indexed_base("decay")
    half = sp.Rational(1, 2)
    decay_couplings = {}
    for chain in decay.chains:
        R = Str(chain.resonance.name)
        if chain.resonance.name.startswith("K"):
            decay_couplings[H_dec[R, 0, 0]] = 1
        if chain.resonance.name[0] in {"D", "L"}:
            child1, child2 = chain.decay_products
            if chain.resonance.name.startswith("D"):
                coupling_pos = H_dec[R, +half, 0]
                coupling_neg = H_dec[R, -half, 0]
            else:
                coupling_pos = H_dec[R, 0, +half]
                coupling_neg = H_dec[R, 0, -half]
            decay_couplings[coupling_pos] = 1
            decay_couplings[coupling_neg] = int(
                chain.resonance.parity
                * child1.parity
                * child2.parity
                * (-1) ** (chain.resonance.spin - child1.spin - child2.spin)
            )
    return {
        symbol: MeasuredParameter(value, hesse=0)
        for symbol, value in decay_couplings.items()
    }

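# Example (sketch): the helicity-flipped decay coupling gets the sign
# P(R) * P(1) * P(2) * (-1)**(j(R) - j(1) - j(2)). For a Λ(1405) chain
# (JP = 1/2⁻) decaying to K (0⁻) and p (1/2⁺), this gives
# (-1) * (-1) * (+1) * (-1)**(1/2 - 0 - 1/2) = +1, so both decay couplings are +1.
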
def _to_symbol_value_mapping(
    parameter_dict: dict[str, str],
    min_ls: bool,
    particle_definitions: dict[str, Particle],
) -> dict[sp.Basic, MeasuredParameter]:
    key_to_value: dict[str, MeasuredParameter] = {}
    for key, str_value in parameter_dict.items():
        if key.startswith("Ar"):
            identifier = key[2:]
            str_imag = parameter_dict[f"Ai{identifier}"]
            key = f"A{identifier}"
            indexed_symbol: sp.Indexed = parameter_key_to_symbol(
                key, min_ls, particle_definitions
            )
            resonance_name = str(indexed_symbol.indices[0])
            resonance = particle_definitions[resonance_name]
            if min_ls:
                conversion_factor = get_conversion_factor(resonance)
            else:
                _, L, S = indexed_symbol.indices
                conversion_factor = get_conversion_factor_ls(resonance, L, S)
            real = _to_value_with_uncertainty(str_value)
            imag = _to_value_with_uncertainty(str_imag)
            parameter = _form_complex_parameter(real, imag)
            key_to_value[key] = attrs.evolve(
                parameter,
                value=conversion_factor * parameter.value,
            )
        elif key.startswith("Ai"):
            continue
        else:
            key_to_value[key] = _to_value_with_uncertainty(str_value)
    return {
        parameter_key_to_symbol(key, min_ls, particle_definitions): value
        for key, value in key_to_value.items()
    }


def _to_value_with_uncertainty(str_value: str) -> MeasuredParameter[float]:
    """
    >>> _to_value_with_uncertainty('1.5 ± 0.2')
    MeasuredParameter(value=1.5, hesse=0.2, model=None, systematic=None)
    >>> par = _to_value_with_uncertainty('0.94 ± 0.042 ± 0.35 ± 0.04')
    >>> par
    MeasuredParameter(value=0.94, hesse=0.042, model=0.35, systematic=0.04)
    >>> par.uncertainty
    0.058
    """
    float_values = tuple(float(s) for s in str_value.split(" ± "))
    if len(float_values) == 2:
        return MeasuredParameter(
            value=float_values[0],
            hesse=float_values[1],
        )
    if len(float_values) == 4:
        return MeasuredParameter(
            value=float_values[0],
            hesse=float_values[1],
            model=float_values[2],
            systematic=float_values[3],
        )
    msg = f"Cannot convert '{str_value}' to {MeasuredParameter.__name__}"
    raise ValueError(msg)


def _form_complex_parameter(
    real: MeasuredParameter[float],
    imag: MeasuredParameter[float],
) -> MeasuredParameter[complex]:
    def convert_optional(real: float | None, imag: float | None) -> complex | None:
        if real is None or imag is None:
            return None
        return complex(real, imag)

    return MeasuredParameter(
        value=complex(real.value, imag.value),
        hesse=complex(real.hesse, imag.hesse),
        model=convert_optional(real.model, imag.model),
        systematic=convert_optional(real.systematic, imag.systematic),
    )


ParameterType = TypeVar("ParameterType", complex, float)
"""Template for the parameter type of a `MeasuredParameter`."""

@frozen
class MeasuredParameter(Generic[ParameterType]):
    """Data structure for imported parameter values.

    `MeasuredParameter.value` and `~.MeasuredParameter.hesse` are taken from the
    `supplemental material <https://cds.cern.ch/record/2824328/files>`_, whereas
    `~.MeasuredParameter.model` and `~.MeasuredParameter.systematic` are taken from
    `Tables 8 and 9 <https://arxiv.org/pdf/2208.03262.pdf#page=21>`_ of the original
    LHCb paper :cite:`LHCb-PAPER-2022-002`.
    """

    value: ParameterType
    """Central value of the parameter as determined by a fit with Minuit."""
    hesse: ParameterType
    """Parameter uncertainty as determined by a fit with Minuit."""
    model: ParameterType | None = None
    """Systematic uncertainties from fit bootstrapping."""
    systematic: ParameterType | None = None
    """Systematic uncertainties from detector effects etc."""

    @property
    def uncertainty(self) -> ParameterType:
        if self.systematic is None:
            return self.hesse
        if isinstance(self.value, float):
            return sqrt(self.hesse**2 + self.systematic**2)
        return complex(
            sqrt(self.hesse.real**2 + self.systematic.real**2),
            sqrt(self.hesse.imag**2 + self.systematic.imag**2),
        )

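# Example (sketch): the hesse and systematic uncertainties are combined in
# quadrature, mirroring the doctest of _to_value_with_uncertainty() above:
#
#     >>> MeasuredParameter(value=0.94, hesse=0.042, model=0.35, systematic=0.04).uncertainty
#     0.058
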
def get_conversion_factor(resonance: Particle) -> Literal[-1, 1]:
    # https://github.com/ComPWA/polarimetry/issues/5#issue-1220525993
    half = sp.Rational(1, 2)
    if resonance.name.startswith("D"):
        return int(-resonance.parity * (-1) ** (resonance.spin - half))
    if resonance.name.startswith("K"):
        return 1
    if resonance.name.startswith("L"):
        return int(-resonance.parity)
    msg = f"No conversion factor implemented for {resonance.name}"
    raise NotImplementedError(msg)

def get_conversion_factor_ls(
    resonance: Particle, L: sp.Rational, S: sp.Rational
) -> Literal[-1, 1]:
    # https://github.com/ComPWA/polarimetry/issues/192#issuecomment-1321892494
    if resonance.name.startswith("K") and resonance.spin == 0:
        return 1
    half = sp.Rational(1, 2)
    cg_flip_factor = int((-1) ** (L + S - half))
    return get_conversion_factor(resonance) * cg_flip_factor

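# Example (sketch): for a JP = 3/2⁻ Λ resonance with (L, S) = (1, 3/2), the
# Clebsch-Gordan flip factor is (-1)**(1 + 3/2 - 1/2) = +1 and
# get_conversion_factor() yields -(-1) = +1, so the combined LS factor is +1.
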
def parameter_key_to_symbol(  # noqa: C901, PLR0911, PLR0912, PLR0915
    key: str,
    min_ls: bool = True,
    particle_definitions: dict[str, Particle] | None = None,
) -> sp.Indexed | sp.Symbol:
    H_prod = get_indexed_base("production", min_ls)
    half = sp.Rational(1, 2)
    if key.startswith("A"):
        # https://github.com/ComPWA/polarimetry/issues/5#issue-1220525993
        R = _stringify(key[1:-1])
        subsystem_identifier = str(R)[0]
        coupling_number = int(key[-1])
        if min_ls:  # Helicity couplings
            if subsystem_identifier in {"D", "L"}:
                if coupling_number == 1:
                    return H_prod[R, -half, 0]
                if coupling_number == 2:
                    return H_prod[R, +half, 0]
            if subsystem_identifier == "K":
                if str(R) in {"K(700)", "K(1430)"}:
                    if coupling_number == 1:
                        return H_prod[R, 0, +half]
                    if coupling_number == 2:
                        return H_prod[R, 0, -half]
                else:
                    if coupling_number == 1:
                        return H_prod[R, 0, -half]
                    if coupling_number == 2:
                        return H_prod[R, -1, -half]
                    if coupling_number == 3:
                        return H_prod[R, +1, +half]
                    if coupling_number == 4:
                        return H_prod[R, 0, +half]
        else:
            # LS-couplings: supplemental material p.1 (https://cds.cern.ch/record/2824328/files)
            if particle_definitions is None:
                msg = (
                    "You need to provide particle definitions in order to map the"
                    " coupling IDs to coupling symbols"
                )
                raise ValueError(msg)
            resonance = particle_definitions[str(R)]
            if subsystem_identifier in {"D", "L"}:
                if coupling_number == 1:
                    return H_prod[R, resonance.spin - half, resonance.spin]
                if coupling_number == 2:
                    return H_prod[R, resonance.spin + half, resonance.spin]
            if subsystem_identifier == "K":
                if resonance.spin == 0:  # "K(700)", "K(1430)"
                    if coupling_number == 1:
                        return H_prod[R, 0, half]
                    if coupling_number == 2:
                        return H_prod[R, 1, half]
                else:
                    if coupling_number == 1:
                        return H_prod[R, 0, half]
                    if coupling_number == 2:
                        return H_prod[R, 1, half]
                    if coupling_number == 3:
                        return H_prod[R, 1, 3 * half]
                    if coupling_number == 4:
                        return H_prod[R, 2, 3 * half]
    if key.startswith("alpha"):
        R = _stringify(key[5:])
        return sp.Symbol(Rf"\alpha_{{{R}}}")
    if key.startswith("gamma"):
        R = _stringify(key[5:])
        return sp.Symbol(Rf"\gamma_{{{R}}}")
    if key.startswith("M"):
        R = _stringify(key[1:])
        return sp.Symbol(Rf"m_{{{R}}}")
    if key.startswith("G1"):
        R = _stringify(key[2:])
        return sp.Symbol(Rf"\Gamma_{{{R} \to {p.latex} {K.latex}}}")
    if key.startswith("G2"):
        R = _stringify(key[2:])
        return sp.Symbol(Rf"\Gamma_{{{R} \to {Σ.latex} {π.latex}}}")
    if key.startswith("G"):
        R = _stringify(key[1:])
        return sp.Symbol(Rf"\Gamma_{{{R}}}")
    if key == "dLc":
        return sp.Symbol(R"R_{\Lambda_c}")
    msg = f'Cannot convert key "{key}" in the model parameter file to a SymPy symbol'
    raise NotImplementedError(msg)

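# Example (sketch): keys from the "parameters" section of the model file map to
# SymPy symbols, e.g.
#
#     >>> parameter_key_to_symbol("MK(892)")
#     m_{K(892)}
#     >>> parameter_key_to_symbol("gammaK(700)")
#     \gamma_{K(700)}
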
def _stringify(obj) -> Str:
    if isinstance(obj, Particle):
        return Str(obj.name)
    return Str(f"{obj}")

def extract_particle_definitions(decay: ThreeBodyDecay) -> dict[str, Particle]:
    particles = {}

    def update_definitions(particle: Particle) -> None:
        particles[particle.name] = particle

    for chain in decay.chains:
        update_definitions(chain.parent)
        update_definitions(chain.resonance)
        update_definitions(chain.spectator)
        update_definitions(chain.decay_products[0])
        update_definitions(chain.decay_products[1])
    return particles