Source code for aido.plotting

import glob
import os
import re
from typing import Annotated, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from aido.logger import logger
from aido.simulation_helpers import SimulationParameterDictionary


def percentage_type(value: float) -> float:
    """ Check that a float lies in the closed interval [0, 1].

    Args
    ----
        value (float): The value to validate.

    Returns
    -------
        float: ``value`` unchanged, if it is valid.

    Raises
    ------
        ValueError: If ``value`` is smaller than 0 or larger than 1.
    """
    # Both bounds are inclusive: 1.0 (i.e. 100 %) is a valid percentage.
    if not (0.0 <= value <= 1.0):
        raise ValueError(f"Value {value} must be in [0, 1]")
    return value


# Type alias for annotations. 'Annotated' only attaches 'percentage_type' as
# metadata; it does not call the validator by itself.
Percentage = Annotated[float, percentage_type]
[docs] class Plotting: """Container for all plotting functions """
[docs] @classmethod def plot(cls, plot_types: str | List[str] = "all", results_dir: str | os.PathLike = "./results/"): """ Plot the evolution of variables of interest over the Optimization process. Args ---- plot_types (str | List[str], optional): The types of plots to be generated. It can be a string or a list of strings. If "all" is specified, it will generate all available plots. Available methods: ["parameter_evolution", "optimizer_loss", "simulation_samples"] Returns ------- None TODO Clean up this class and do not repeat the reading of files all the time """ if plot_types == "all": plot_types = ["optimizer_loss", "probability_evolution", "parameter_evolution", "simulation_samples"] if isinstance(plot_types, str): plot_types = [plot_types] for plot_type in plot_types: getattr(cls, plot_type)(results_dir=results_dir) logger.info(f"Saved all figures to {results_dir}")
[docs] def parameter_evolution( fig_savepath: str | os.PathLike | None = "/plots/parameter_evolution", results_dir: str = "./results/", parameter_dir: str | os.PathLike = "/parameters/" ) -> Tuple[pd.DataFrame, np.ndarray]: """ Plots the evolution of all simulation parameters along with their respective "sigma". Args ---- fig_savepath (str | os.PathLike, optional): The file path to save the figure. Defaults to "<results_dir>/plots/parameter_evolution". If None, the figure will not be saved. results_dir (str | os.PathLike, optional): Results directory. Defaults to "./results/" parameter_dir (str | os.PathLike, optional): The directory path where the SimulationParameterDictionaries are stored (.json files). Defaults to "<results_dir>/parameters". Returns ------- Tuple(pd.DataFrame, np.ndarray): A Tuple containing the DataFrame with all parameters provided by the optimizer after each iteration, and the simulation sampling standard deviation (2D array). """ fig_savepath = f"{results_dir}/{fig_savepath}" parameter_dir = f"{results_dir}/{parameter_dir}" df_list = [] sigma_df_list = [] for file_name in os.listdir(parameter_dir): param_dict = SimulationParameterDictionary.from_json(parameter_dir + file_name) df_list.append(pd.DataFrame( param_dict.get_current_values(format="dict", types="continuous"), index=[param_dict.iteration], )) sigma_df_list.append(np.diag(param_dict.covariance)) df: pd.DataFrame = pd.concat(df_list, axis=0).sort_index() sigma = np.concatenate(sigma_df_list, axis=0) if fig_savepath is not None: plt.figure(figsize=(8, 6), dpi=400) cmap = plt.get_cmap("Set2") for i, col in enumerate(df.columns): plt.plot(df[col], label=col, color=cmap(i)) if np.any(sigma[i]): plt.fill_between( df[col].index, df[col] - sigma[i], df[col] + sigma[i], alpha=0.5, color=cmap(i) ) plt.legend() plt.xlabel("Iteration", loc="right") plt.ylabel("Parameter Value", loc="top") plt.savefig(fig_savepath) plt.close() return df, sigma
[docs] def optimizer_loss( fig_savepath: str | os.PathLike | None = "/plots/optimizer_loss", results_dir: str = "./results/", optimizer_loss_dir: str | os.PathLike = "/loss/optimizer" ) -> pd.DataFrame: """ Plot the optimizer loss over epochs and save the figure if `fig_savepath` is provided. Args ---- fig_savepath (str | os.PathLike | None): Path to save the figure. If None, the figure will not be saved. results_dir (str | os.PathLike, optional): Results directory. Defaults to "./results/" optimizer_loss_dir (str | os.PathLike): Directory containing the optimizer loss files. Returns ------- df_loss (pd.DataFrame): DataFrame with the optimizer loss at each iteration """ fig_savepath = f"{results_dir}/{fig_savepath}" optimizer_loss_dir = f"{results_dir}/{optimizer_loss_dir}" df_loss_list = [] files = glob.glob(f"{optimizer_loss_dir}/*") files.sort(key=lambda x: int(re.search(r"optimizer_loss_(\d+)", x).group(1))) for i, file_name in enumerate(files): df_i = pd.read_csv(file_name, names=["Epoch", "Loss"], dtype="float32", header=1) df_i["Iteration"] = i df_i["Scaled Epoch"] = np.linspace(i, i + 1, len(df_i)) df_loss_list.append(df_i) df_loss: pd.DataFrame = pd.concat(df_loss_list) if fig_savepath is not None: plt.figure(figsize=(8, 6), dpi=400) plt.plot(df_loss["Scaled Epoch"], df_loss["Loss"], c="k", label="optimizer_loss") plt.xlabel("Iteration", loc="right") plt.xlim(0, df_loss["Iteration"].to_numpy()[-1]) plt.xlabel("Epoch", loc="right") plt.ylabel("Loss", loc="top") plt.legend() plt.savefig(fig_savepath) plt.close() return df_loss
[docs] def simulation_samples( fig_savepath: str | os.PathLike | None = "/plots/simulation_samples", results_dir: str = "./results/", parameter_dir: str = "/parameters/", sampled_param_dict_filepath: str | os.PathLike = "/task_outputs/iteration=*/validation=False" ) -> Tuple[pd.DataFrame, np.ndarray]: """Generate a DataFrame of simulation parameters and their values. This method collects simulation parameters and their values for each iteration and task, organizing them into a DataFrame. Args ---- fig_savepath : str or os.PathLike or None, optional Path to save the generated plot. Defaults to "./results/plots/simulation_samples". sampled_param_dict_filepath : str or os.PathLike, optional Path to the sampled parameter dictionary files. Defaults to "./results/task_outputs/simulation_task*". parameter_dir : str, optional Where the parameters are stored in the results folder. Defaults to 'parameters'. Returns ------- pd.DataFrame DataFrame containing the simulation parameters. np.ndarray Array of sigma values. Notes ----- TODO: Check for files dynamically in case b2luigi changes directory names due to changes in the b2luigi.Parameters of the SimulationTasks. 
""" fig_savepath = f"{results_dir}/{fig_savepath}" sampled_param_dict_filepath = f"{results_dir}/{sampled_param_dict_filepath}" df_list: List[pd.DataFrame] = [] for iteration_dir in glob.glob(sampled_param_dict_filepath): for file_order, simulation_dir in enumerate(glob.glob(iteration_dir + "/simulation_task_id=*")): df = SimulationParameterDictionary.from_json( simulation_dir + "/param_dict.json" ).to_df(types="continuous") df["Iteration"] = int(re.search(r"/iteration=(\d+)/", iteration_dir).group(1)) df["Task_ID"] = int(re.search(r"task_id=(\d+)", simulation_dir).group(1)) df_list.append(df) if len(df_list) <= 1: return df_list df_params = pd.concat(df_list) df_params = df_params.sort_values(["Iteration", "Task_ID"]).reset_index(drop=True) if fig_savepath is not None: cmap = plt.get_cmap("Set2") df_optim, sigma = Plotting.parameter_evolution(None, results_dir=results_dir) plt.figure(figsize=(8, 6), dpi=400) for i, col in enumerate(df_optim.columns): plt.plot(df_optim[col], label=col, color=cmap(i)) if np.any(sigma[i]): plt.fill_between( df_optim[col].index, df_optim[col] - sigma[i], df_optim[col] + sigma[i], alpha=0.5, color=cmap(i) ) plt.gca().set_prop_cycle(None) for i, col in enumerate(df_params.columns.drop(["Iteration", "Task_ID"])): plt.scatter(df_params["Iteration"], df_params[col].values, marker="+", s=100, color=cmap(i)) plt.xlabel("Iteration", loc="right") plt.ylabel("Parameter Value", loc="top") plt.legend() plt.savefig(fig_savepath) plt.close() return df_params, sigma
def probability_evolution( fig_savepath: str | os.PathLike | None = "/plots/probability_evolution", results_dir: str = "./results/", parameter_dir: str | os.PathLike = "/parameters" ): def plot_probabilities( name: str, param_dicts_list: List[SimulationParameterDictionary], fig_savepath_absolute: str | os.PathLike, ): probabilities_over_iterations = [] iterations = [] for param_dict in param_dicts_list: discrete_values = param_dict[name].discrete_values iterations.append(param_dict.iteration) probabilities_over_iterations.append(param_dict[name].probabilities) probabilities_over_iterations = np.array(probabilities_over_iterations)[np.argsort(iterations)] iterations = np.array(iterations)[np.argsort(iterations)] fig, ax = plt.subplots(figsize=(8, 6)) for i, discrete_value in enumerate(discrete_values): ax.bar( iterations, probabilities_over_iterations[:, i], bottom=probabilities_over_iterations[:, :i].sum(axis=1), label=discrete_value, width=1, align="edge" ) ax.set_xlabel("Iteration") ax.set_ylabel("Probabilities") plt.legend() plt.xlim(iterations[0], iterations[-1]) plt.ylim(0, 1) plt.tight_layout() plt.savefig(f"{fig_savepath_absolute}_{name}") plt.close() return None fig_savepath_absolute = f"{results_dir}/{fig_savepath}" parameter_dir_absolute = f"{results_dir}/{parameter_dir}/*" param_dicts_list: List[SimulationParameterDictionary] = [] for param_dict_dir in glob.glob(parameter_dir_absolute): param_dicts_list.append(SimulationParameterDictionary.from_json(param_dict_dir)) if not param_dicts_list: raise FileNotFoundError(f"No parameter dicts files could be found in {parameter_dir_absolute}") for parameter in param_dicts_list[0]: if parameter.discrete_values: plot_probabilities(parameter.name, param_dicts_list, fig_savepath_absolute) return None
class FWHM:
    """Class for computing Full Width at Half Maximum (or other height) for a given (x, y) curve.
    """

    def __init__(
        self,
        x: np.ndarray,
        y: np.ndarray,
        height: "Percentage" = 0.5,
    ) -> None:
        """Compute the Full Width Half Maximum for a given mapping of (x, y) values.

        The crossing points are found by linear interpolation on each side of the
        (first) maximum of 'y'. 'np.interp' expects increasing sample points, so
        the result is only meaningful when 'y' rises up to the maximum and falls
        after it.

        Args:
            x (np.ndarray): X-axis values of the curve. May hold one element more
                than 'y' (e.g. bin edges), in which case the first one is dropped.
            y (np.ndarray): Y-axis values of the curve (must be non-negative)
            height (Percentage, optional): Height at which to compute the width,
                as a fraction of the maximum height. Defaults to 0.5.

        Raises:
            ValueError: If 'y' is empty or contains entries that are not >= 0.
        """
        # Accept any array-like (lists, tuples, ...) and not only ndarrays
        x = np.asarray(x)
        y = np.asarray(y)

        # Explicit raises instead of 'assert', which is stripped when running with -O
        if y.size == 0:
            raise ValueError("y must not be empty")
        if not np.all(y >= 0.0):
            raise ValueError("y must be an Array with only non-negative entries")

        if len(x) == len(y) + 1:
            x = x[1:]  # Account for mismatched array length (e.g from matplotlib bins)

        self.height_absolute = float(np.max(y) * height)
        index_max = np.argmax(y)
        # Left flank: interpolate x as a function of y up to and including the peak
        self.x_left = float(np.interp(self.height_absolute, y[:index_max + 1], x[:index_max + 1]))
        # Right flank: flipped so that the y samples handed to np.interp run from the tail to the peak
        self.x_right = float(np.interp(self.height_absolute, np.flip(y[index_max:]), np.flip(x[index_max:])))
        self.width = self.x_right - self.x_left

    @property
    def values(self):
        """
        Returns
        -------
            tuple
                - width : float
                    Width of the distribution (FWHM) at the specified height (x_right - x_left)
                - x_left : float
                    x-intersection at the left edge
                - x_right : float
                    x-intersection at the right edge
                - height : float
                    Absolute height used for computing the peak
        """
        return (
            self.width,
            self.x_left,
            self.x_right,
            self.height_absolute,
        )

    def add_to_axis(
        self,
        ax: "plt.Axes",
        color: str = "k",
        linestyles: str = "--",
        **kwargs,
    ) -> "plt.Axes":
        """ Add two vertical lines at the x-intersection to represent the FWHM.

        Both lines run from 0 up to the absolute height used for the width.

        Args
        ----
            ax: matplotlib.pyplot.Axes
                Axes on which to add the vertical lines
            color: str
                Color passed to 'ax.vlines'
            linestyles: str
                Line style passed to 'ax.vlines'
            **kwargs:
                Further keyword arguments forwarded to both 'ax.vlines' calls

        Returns
        -------
            ax: matplotlib.pyplot.Axes
                The same axes object that was passed in
        """
        for x_edge in (self.x_left, self.x_right):
            ax.vlines(x_edge, 0.0, self.height_absolute, color=color, linestyles=linestyles, **kwargs)
        return ax