"""
The two classes Simulation Parameters and Dictionary are a key component of the interface between
AIDO and user-defined programs. There are two ingredients:
1. **SimulationParameter**: Represents a single parameter with all its features and settings. It has
the following properties
- Name: For unique identification
- Values: Both a starting value (set by the user) and the current value (as adjusted by the optimizer)
- Units (optional)
- Optimizable (optional): Whether this parameter is constant or not
- Min / Max values (optional but recommended): Constrains the possible values that this parameter
can adopt. For example thicknesses cannot be smaller than zero.
- Discrete: Some parameters are categorical and cannot be easily represented by a floating point
number. For these cases, the possible values have to be listed in an Iterable from which to
choose the current value.
- Cost (optional): Use for computing additional penalties
2. **SimulationParameterDictionary**: The container that stores multiple SimulationParameters in a
dict-style object
- Can be indexed as a list or a dict
- Converted to list, dict and :class:`pandas.DataFrame`
- Written and read from and to json files. This feature is important to pass it to non-python
programs, e.g. Geant4.
- Additional metadata about a given training step.
"""
import copy
import datetime
import json
from typing import Any, Dict, Iterable, Iterator, List, Literal, Self
import numpy as np
import pandas as pd
from aido.config import AIDOConfig
config = AIDOConfig.from_json("config.json")
[docs]
class SimulationParameter:
"""Base class for all parameters used in the simulation.
A simulation parameter represents a single variable that can be optimized
during the simulation process.
"""
[docs]
def __init__(
self,
name: str,
starting_value: Any,
current_value: Any | None = None,
units: str | None = None,
optimizable: bool = True,
min_value: float | None = None,
max_value: float | None = None,
sigma: float | None = None,
sigma_mode: Literal["flat", "scale"] | None = None,
discrete_values: Iterable | None = None,
probabilities: Iterable[float] | None = None,
cost: float | Iterable | None = None
):
"""Create a new Simulation Parameter.
Parameters
----------
name : str
The name of the parameter.
starting_value : Any
The starting value of the parameter.
current_value : Any, optional
The current value of the parameter. Defaults to None, in which case
it set to the starting value instead.
units : str, optional
The units of the parameter. Defaults to None.
optimizable : bool, optional
Whether the parameter is optimizable. Defaults to True.
min_value : float, optional
The minimum value of the parameter. Defaults to None.
max_value : float, optional
The maximum value of the parameter. Defaults to None.
sigma : float, optional
The standard deviation of the parameter. Defaults to 0.5 for continuous
(float) parameters and remains None otherwise.
sigma_mode : {"flat", "scale"}, optional
Whether to set the sampling distribution standard deviation to sigma ("flat")
or scale sigma with the current value ("scale"). Defaults to "flat". If "sigma"
is not defined, this parameter has no action. Can be changed class-wide using
the cls.set_sigma_mode() classmethod.
discrete_values : Iterable, optional
The allowed discrete values of the parameter. Defaults to None.
probabilities : Iterable[float], optional
A list of the same length as 'discrete_values' used to sample from
'discrete_values'. If set to None, an equally-distributed array is created.
Only for discrete parameters. Defaults to None.
cost : float or Iterable, optional
A float that quantifies the cost per unit of this Parameter.
Defaults to None. For discrete parameters, this parameter must be an
Iterable (e.g. list) of the same length as 'discrete_values'.
"""
def check_boundaries() -> None:
assert (
isinstance(min_value, type(starting_value)) or min_value is None
), "Only float parameters are allowed to have lower bounds"
assert (
isinstance(max_value, type(starting_value)) or max_value is None
), "Only float parameters are allowed to have upper bounds"
def check_discrete_parameters() -> None:
assert (
isinstance(self._current_value, float) is True or (discrete_values is not None or optimizable is False)
), "Specify discrete values when the parameter is not a float, but is optimizable"
if discrete_values is not None:
if optimizable is False:
raise AssertionError(
"Non-optimizable parameters are excluded from requiring allowed discrete values"
)
assert (
self._current_value in discrete_values
), "Current value must be included in the list of allowed discrete values"
assert (
starting_value in discrete_values
), "Starting value must be included in the list of allowed discrete values"
assert (
min_value is None and max_value is None
), "Not allowed to specify min and max value for parameter with discrete values"
def check_sigma() -> None:
if discrete_values is None and optimizable:
if sigma is not None:
assert sigma > 0.0, "Sigma parameter must be a positive float"
else:
assert (
sigma is None
), "Unable to assign standard deviation to discrete or non-optimizable parameter."
def check_cost() -> None:
if cost is None:
pass
elif self.discrete_values:
assert (
isinstance(cost, Iterable)
), "Parameter 'cost' must be an iterable of the same length as 'discrete_values'"
assert (
len(list(cost)) == len(list(self.discrete_values))
), f"Length of 'cost' ({len(list(cost))}) is different"
f"from that of 'discrete_values' ({len(list(self.discrete_values))})"
assert (
(isinstance(cost_item, float) and cost_item > 0.0 for cost_item in cost)
), "Entries of argument 'cost' must be positive floats"
elif self.discrete_values is None:
assert (
isinstance(cost, float) and cost > 0.0
), "Cost argument must be a positive float for continuous parameters."
assert isinstance(name, str), "Name must be a string"
self.name = name
check_boundaries()
self._starting_value = starting_value
self._optimizable = optimizable
self.units = units
self.min_value = min_value
self.max_value = max_value
self._current_value = current_value if current_value is not None else starting_value
check_discrete_parameters()
self.discrete_values = discrete_values
self.probabilities = probabilities if discrete_values is not None else None
check_sigma()
self.sigma = sigma
self.sigma_mode = sigma_mode or config.simulation.sigma_mode
check_cost()
self.cost = cost
def __str__(self) -> str:
return json.dumps(self.to_dict(), indent=4)
[docs]
def to_dict(self) -> Dict:
"""Convert to dictionary
Returns:
Dict: A dict with all the attributes of this class
Note:
Protected attributes are written to file as public attributes.
"""
return {key.removeprefix("_"): value for key, value in self.__dict__.items()}
[docs]
@classmethod
def from_dict(cls, attribute_dict: Dict) -> Self:
"""Create from dictionary
Args:
attribute_dict (Dict): A dict whose keys match the names of the parameters found in :meth:`SimulationParameter.__init__`.
Returns:
SimulationParameter: Instance of this class
"""
return cls(**attribute_dict)
@property
def current_value(self) -> Any:
"""Get the value stored in this instance
Example:
>>> sim_param = SimulationParameter("foo", 1.0)
>>> sim_param.current_value
1.0
"""
return self._current_value
@current_value.setter
def current_value(self, value: Any) -> None:
if isinstance(self._starting_value, int) and isinstance(value, float) and abs(value - round(value)) < 10E-15:
value = round(value)
assert (
isinstance(value, type(self._starting_value))
), f"The updated value is of another type ({type(value)}) than before ({type(self._starting_value)})"
if self.optimizable is False and value != self._starting_value:
raise AttributeError("Do not change the current value of non-optimizable parameter")
assert (
self.discrete_values is None or value in self.discrete_values
), "Updated discrete parameter value is not in the list of allowed discrete values."
self._current_value = value
@property
def optimizable(self) -> bool:
"""Whether this parameter is constant or optimizable.
Tip:
Non-optimizable parameters can be used to store constants, for example the number of events to simulate.
"""
return self._optimizable
@property
def sigma(self) -> float | None:
"""The standard deviation of the Gaussian distribution used to generate new values for this parameter.
Note:
The starting value must be provided by the user (or left to be the default from the :class:`AIDOConfig`).
Afterwards, the optimization loop might adjust the :attr:`sigma` to explore some regions quicker.
"""
if self.discrete_values is not None or not self.optimizable:
return None
if self.sigma_mode == "scale":
return self._sigma * self.current_value
elif self.sigma_mode == "flat":
return self._sigma
@sigma.setter
def sigma(self, value: float | None):
if self.discrete_values is None and self.optimizable:
if value is None:
value = config.simulation.sigma
assert value > 0.0
else:
assert value is None
self._sigma = value
@property
def probabilities(self) -> List[float]:
"""List of normalized probabilities that convey the confidence of the Optimizer in the individual choice.
Note:
Used for generating new values by sampling from this distribution.
See :meth:`SimulationParameterDictionary.generate_new`.
"""
return self._probabilities
@probabilities.setter
def probabilities(self, value: Iterable[float] | None):
if self.discrete_values is None:
return None
if value is None:
value = np.full(len(list(self.discrete_values)), 1.0 / len(list(self.discrete_values)))
assert (
len(list(value)) == len(list(self.discrete_values))
), f"Length of 'probabilities' ({len(list(value))}) must match length"
f"of 'discrete values' ({len(list(self.discrete_values))})"
prob_array = np.array(value, dtype=float)
assert (
np.all(prob_array >= 0)
), "All entries must be non-negative numerical values"
assert (
np.isclose(np.sum(prob_array), 1.0, atol=10E-8)
), f"Probabilities are not normed, their sum is {np.sum(prob_array)} but must equal 1"
prob_array: np.ndarray = prob_array / np.sum(prob_array)
self._probabilities: List[float] = prob_array.tolist()
@property
def weighted_cost(self) -> None | float:
"""Helper property that returns the expected cost for a given parameter.
Returns:
- None if :attr:`SimulationParameter.cost` was not defined
- float otherwise (without any unit)
Note:
This weighted cost is either `current_value * cost` if the parameter is continuous,
or the scalar product of :attr:`SimulationParameter.probabilities` and
:attr:`SimulationParameter.cost`.
Tip:
This value can be used to calculate penalty costs to apply to the Optimizer loss
function.
"""
if self.cost is None:
return None
if self.discrete_values is None:
return self.current_value * self.cost
else:
return float(np.dot(np.array(self.probabilities), np.array(self.cost)))
[docs]
class SimulationParameterDictionary:
"""Container for storing :class:`SimulationParameters`"""
[docs]
def __init__(
self,
parameter_list: List[SimulationParameter] = [],
):
""" Initialize with a list of parameters 'SimulationParameter'
Args:
parameter_list: List[SimulationParameter]
"""
self.iteration: int = 0
self.creation_time = str(datetime.datetime.now())
self.rng_seed: int | None = None
self.description = ""
self._covariance: np.ndarray | None = None
self.parameter_list = parameter_list
self.parameter_dict = self.to_dict(serialized=False)
def __str__(self) -> str:
return json.dumps(self.to_dict(), indent=4)
def __setitem__(self, name: str, simulation_parameter: SimulationParameter):
assert (
name == simulation_parameter.name
), "Key does not match name assigned in 'SimulationParameter'"
self.parameter_list.append(simulation_parameter)
self.parameter_dict[name] = simulation_parameter
[docs]
def __getitem__(self, key: str | int) -> SimulationParameter | Dict:
"""Access a SimulationParameter either by name (dict-style) or by index (list-style)"""
if isinstance(key, str):
return self.parameter_dict[key]
if isinstance(key, int):
return self.parameter_list[key]
def __len__(self) -> int:
return len(self.parameter_list)
[docs]
def to_dict(self, serialized: bool = True) -> Dict[str, SimulationParameter | Any]:
"""Convert the parameter list to a dictionary.
Args:
serialized (bool, default=True):
If True, returns a Dict of Dicts by serializing each SimulationParameter too.
If False, returns a Dict of SimulationParameters, which is used by this class
to allow dictionary-style access to the individual parameters.
Returns:
Dict: A dictionary where the keys are parameter names and the values are either
serialized dictionaries or SimulationParameter objects.
"""
parameter_dict: Dict[str, SimulationParameter | Any]
if serialized is False:
parameter_dict = {parameter.name: parameter for parameter in self.parameter_list}
else:
parameter_dict = {parameter.name: parameter.to_dict() for parameter in self.parameter_list}
parameter_dict["metadata"] = self.metadata
return parameter_dict
[docs]
def to_json(self, file_path: str):
"""Write the parameter list to a .json file
TODO Check for the existence of the file path or otherwise set as default to ../
"""
with open(file_path, "w") as file:
json.dump(self.to_dict(), file)
[docs]
def to_df(
self,
df_length: int | None = 1,
include_non_optimizables: bool = False,
display_discrete: Literal["default", "as_probabilities", "as_one_hot"] = "default",
types: Literal["all", "continuous", "discrete"] = "all",
**kwargs
) -> pd.DataFrame:
""" Convert parameter dictionary to a pd.DataFrame
Args:
df_length (int): The length of the DataFrame to be created. Default is None.
include_non_optimizables (bool): Whether to include non-optimizable parameters in the
df. Defaults to False.
display_discrete (Literal):
- 'default': Simply write the current value of the Parameter (default)
- 'as_probabilities': Write the probability of each category (from the list of available
discrete parameter).
- 'as_one_hot': Write the current value as a one-hot encoded array. All categories are set
to zero except for the 'current_value' which is set to one.
Ref. https://pytorch.org/docs/stable/generated/torch.nn.functional.one_hot.html
types (str): Choose what kind of parameters will be added to the pd.DataFrame. All includes
all parameters, continuous only those with no 'discrete_values' and discrete only
those with 'discrete_values'.
kwargs: Additional keyword arguments to be passed to the pd.DataFrame constructor.
Returns:
pd.DataFrame: A pandas DataFrame containing the current parameter values.
"""
if df_length is not None:
kwargs["index"] = range(df_length)
return pd.DataFrame(
self.get_current_values("dict", include_non_optimizables, display_discrete=display_discrete, types=types),
**kwargs,
)
[docs]
def get_current_values(
self,
format: Literal["list", "dict"] = "dict",
include_non_optimizables: bool = False,
display_discrete: Literal["default", "as_probabilities", "as_one_hot"] = "default",
types: Literal["all", "continuous", "discrete"] = "all",
) -> List | Dict:
"""Obtain all the current values of each parameter in a list or dict format.
Args:
format (str):
- list: Returns a list of the current values
- dict: Returns a dict of name, current value pairs
include_non_optimizables (bool): Whether to include constant parameters in the returned object
display_discrete (str): How to display the categorical parameters
- default: Current value of the discrete parameter. Compatible with `format="list"`
- as_probabilities: Injects a dict of with pairs of the sort
{`<name>_<value>`: <probability>}
Where name is the name of parameter, value is the corresponding value from the list
of all possible values listed in :attr:`SimulationParameter.probabilities`.
Not compatible with `format="list"`.
- as_one_hot: Same as `as_probabilities` but replaces the probabilities with a one-hot-encoding scheme
where the likeliest categorical value is one and all other entries are zero.
Not compatible with `format="list"`.
types (str): Which types of parameters to include
Example:
>>> sim_param = SimulationParameterDictionary([
... SimulationParameter("foo", 1, discrete_parameters=[0, 1, 2, 3])],
... SimulationParameter("bar", 10.0)
... )
>>> sim_param.get_current_values(display_discrete="default", format="list")
[1, 10.0]
>>> sim_param.get_current_values(display_discrete="default", format="dict")
{"foo": 1, "bar": 10.0}
>>> sim_param.get_current_values(display_discrete="as_probabilities", format="dict")
{"foo_0": 0.25, "foo_1": 0.25, "foo_2": 0.25, "foo_3": 0.25, "bar": 10.0}
"""
if format == "list" and display_discrete != "default":
raise NotImplementedError("One-Hot Encoding is only available with the 'list' format")
if types == "continuous" and display_discrete != "default":
raise ValueError(
"Continuous parameters can not be displayed as one-hot encoded, only discrete parameters."
"Setting types == 'continuous' and 'display_discrete' other than 'default' are incompatible"
)
def get_parameters() -> Iterator[SimulationParameter]:
for parameter in self.parameter_list:
if (
not parameter.optimizable and not include_non_optimizables
or parameter.discrete_values is not None and types == "continuous"
or parameter.discrete_values is None and types == "discrete"
):
continue
yield parameter
if format == "list":
current_values = []
for parameter in get_parameters():
current_values.append(parameter.current_value)
elif format == "dict":
current_values = {}
for parameter in get_parameters():
if parameter.discrete_values and not display_discrete == "default":
for index, val in enumerate(parameter.discrete_values):
key = f"{parameter.name}_{val}"
if display_discrete == "as_probabilities":
current_values[key] = parameter.probabilities[index]
elif display_discrete == "as_one_hot":
current_values[key] = 1 if val == parameter.current_value else 0
else:
current_values[parameter.name] = parameter.current_value
return current_values
[docs]
def get_probabilities(self) -> dict[str, List[float]]:
"""Returns a dict whose values are the probabilities of each discrete parameter
Note: More information about the usage of probabilities for one-hot
encoded parameters is listed in the docstring of :class:`SimulationParameter`.
"""
probabilities = {}
for parameter in self.parameter_list:
if parameter.discrete_values:
probabilities[parameter.name] = parameter.probabilities
return probabilities
[docs]
def update_current_values(self, current_values_parameter_dict: dict):
"""Updates the current values of parameters in the dictionary.
Args:
current_values_parameter_dict (dict): Dictionary with parameter names as keys and their new values.
Returns:
SimulationParameterDictionary
Raises:
AssertionError: If a key in current_values_parameter_dict doesn't exist in the parameter dictionary.
"""
for key, value in current_values_parameter_dict.items():
assert (
key in self.parameter_dict.keys()
), f"Key {key} was not in previous Parameter Dictionary"
self.parameter_dict[key].current_value = value
return self
[docs]
def update_probabilities(self, probabilities_dict: dict[str, List | np.ndarray]):
"""Updates the probabilities of discrete parameters.
Args:
probabilities_dict (dict[str, List | np.ndarray]): Dictionary containing parameter names as keys
and their new probability distributions as values.
Returns:
SimulationParameterDictionary
Raises:
AssertionError: If a key in probabilities_dict doesn't exist in the parameter dictionary.
"""
for key, value in probabilities_dict.items():
assert (
key in self.parameter_dict.keys()
), f"Key {key} was not in previous Parameter Dictionary"
if self.parameter_dict[key].discrete_values is not None:
self.parameter_dict[key].probabilities = value
return self
@property
def sigma_array(self) -> np.ndarray:
""" Diagonal matrix with the standard deviation of each continuous parameter
"""
sigma_array = []
for parameter in self.parameter_list:
if (
(parameter.optimizable is True)
and (parameter.discrete_values is None)
and (parameter.sigma is not None)
):
sigma_array.append(parameter.sigma**2)
return np.diag(np.array(sigma_array))
@property
def covariance(self) -> np.ndarray:
""" Get the current covariance matrix. If no covariance was set, it defaults
to the 'sigma_array' squared (covariance matrix with no correlations).
"""
if self._covariance is not None:
return self._covariance
else:
return self.sigma_array
@covariance.setter
def covariance(self, new_covariance: np.ndarray) -> None:
""" Set the input matrix as the new covariance matrix
"""
assert (
isinstance(new_covariance, np.ndarray)
), f"Covariance must be a numpy array, but is type {type(new_covariance)}"
assert (
np.allclose(new_covariance, new_covariance.T)
), "Covariance matrix is not symmetric"
assert (
np.all(np.linalg.eigvals(new_covariance) >= 0)
), "Covariance matrix is not positive semi-definite (negative eigenvalues detected)"
assert (
np.all(np.diag(new_covariance) >= 0)
), "Covariance matrix has negative variances on the diagonal"
self._covariance = new_covariance
@property
def metadata(self) -> Dict[str, Any]:
"""Gets the metadata dictionary containing simulation state information.
Returns:
Dict: Dictionary containing
- iteration (int): Current iteration number
- creation_time (str): Timestamp of creation
- rng_seed (int | None): Random number generator seed
- description (str): Optional description
- covariance (List[List[float]]): Covariance matrix as nested list
"""
return {
"iteration": self.iteration,
"creation_time": self.creation_time,
"rng_seed": self.rng_seed,
"description": self.description,
"covariance": self.covariance.tolist(),
}
@metadata.setter
def metadata(self, new_metadata_dict: Dict[str, int | str]):
"""Updates the simulation metadata.
Args:
new_metadata_dict (Dict[str, int | str]): Dictionary containing metadata to update.
Special handling for 'covariance' key which is converted to numpy array.
Other keys must match existing metadata attributes.
Note:
Only updates metadata attributes that already exist. Ignores unknown keys.
"""
if "covariance" in new_metadata_dict.keys():
self.covariance = np.array(new_metadata_dict.pop("covariance"))
for name, value in new_metadata_dict.items():
if name in self.metadata:
self.__setattr__(name, value)
[docs]
@classmethod
def from_dict(cls, parameter_dict: Dict) -> 'SimulationParameterDictionary':
"""Creates a :class:`SimulationParameterDictionary` instance from a dictionary.
Args:
parameter_dict (Dict): A dictionary containing serialized SimulationParameters and metadata.
Must contain a 'metadata' key with simulation metadata.
Returns:
SimulationParameterDictionary: A new instance initialized with the parameters and metadata
from the dictionary.
Note:
The input dictionary must contain serialized SimulationParameter data, not
SimulationParameter instances.
"""
metadata: Dict = parameter_dict.pop("metadata", None)
instance = cls([SimulationParameter.from_dict(parameter) for parameter in parameter_dict.values()])
instance.metadata = metadata
return instance
[docs]
@classmethod
def from_json(cls, file_path: str) -> 'SimulationParameterDictionary':
"""Creates a SimulationParameterDictionary instance from a JSON file.
Args:
file_path (str): Path to the JSON file containing serialized parameters and metadata.
Returns:
SimulationParameterDictionary: A new instance initialized with the parameters and metadata
from the JSON file.
"""
with open(file_path, "r") as file:
parameter_dicts: Dict = json.load(file)
return cls.from_dict(parameter_dicts)
[docs]
def generate_new(
self,
rng_seed: int | None = None,
discrete_index: int | None = None,
scaling_factor: float = 1.0
) -> 'SimulationParameterDictionary':
"""Generates a new instance with newly sampled parameter values.
Generates new values bounded by specified minimum and maximum values for float
parameters. For discrete parameters, the new value is randomly chosen from the
list of allowed values.
Args:
rng_seed (int | None, optional): Seed for the random number generator.
If provided, ensures reproducible results. If None, a random seed is generated.
Defaults to None.
discrete_index (int | None, optional): Force specific index for discrete parameters.
- If > len(parameter.discrete_values), falls back to random sampling.
- If valid index, uses parameter.discrete_values[discrete_index].
Defaults to None.
scaling_factor (float, optional): Scale factor applied to covariance matrix
when generating new values. Defaults to 1.0.
Returns:
SimulationParameterDictionary: A new instance with sampled parameter values.
"""
def generate_discrete(parameter: SimulationParameter) -> list[Any]:
if parameter.discrete_values is None:
raise ValueError("Parameter is not discrete")
if discrete_index is not None and discrete_index < len(list(parameter.discrete_values)):
return list(parameter.discrete_values)[discrete_index]
else:
return list(parameter.discrete_values)[
rng.choice(len(list(parameter.discrete_values)), p=parameter.probabilities)
]
rng = np.random.default_rng(rng_seed)
rng_seed = rng_seed or int(rng.integers(9_999_999))
new_instance: Self = copy.deepcopy(self) # Prevents the modification of this instance
for parameter in self.parameter_list:
if parameter.discrete_values is not None:
new_instance[parameter.name].current_value = generate_discrete(parameter) # type: ignore
new_continuous_parameter_list = rng.multivariate_normal(
mean=np.array(self.get_current_values(format="list", types="continuous")),
cov=self.covariance * scaling_factor
)
old_continuous_parameter_dict: Dict = self.get_current_values(types="continuous", format="dict") # type: ignore
for index, key in enumerate(old_continuous_parameter_dict.keys()):
new_instance[key].current_value = float(new_continuous_parameter_list[index]) # type: ignore
new_instance.metadata = self.metadata
new_instance.rng_seed = rng_seed
return new_instance