"""Visualize the results of the experiments."""
from __future__ import annotations # for correct nested type hints e.g. list[str], tuple[dict, str]
import warnings
from collections import defaultdict
from math import ceil
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import LinearSegmentedColormap, rgb2hex, to_hex, to_rgb
from autotuning_methodology.baseline import (
Baseline,
ExecutedStrategyBaseline,
RandomSearchCalculatedBaseline,
RandomSearchSimulatedBaseline,
)
from autotuning_methodology.curves import Curve, CurveBasis
from autotuning_methodology.experiments import execute_experiment, get_args_from_cli
from autotuning_methodology.report_experiments import (
get_aggregation_data,
get_aggregation_data_key,
get_strategies_aggregated_performance,
)
from autotuning_methodology.searchspace_statistics import SearchspaceStatistics
# Visualization constants: marker styles and substrings to remove from labels
marker_variations = ["v", "s", "*", "1", "2", "d", "P", "X"]
remove_from_gpus_label = ""
remove_from_applications_label = " milo"
remove_from_searchspace_label = " milo"
# total set of objective time keys
objective_time_keys_values = ["compilation", "benchmark", "framework", "search_algorithm", "validation"]
def lighten_color(color, amount: float = 0.5):
"""Lightens the given color by interpolating it toward white."""
r, g, b = to_rgb(color)
return to_hex([(1 - amount) * c + amount for c in (r, g, b)])
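# A minimal usage sketch (hypothetical values): amount=0.5 moves pure blue
# halfway toward white.
# >>> lighten_color("#0000ff", amount=0.5)
# '#8080ff'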
def get_colors(strategies: list[dict]) -> list:
"""Assign colors using the tab10 colormap, with lighter shades for children."""
tab10 = plt.get_cmap("tab10").colors
tab10 = [c for i, c in enumerate(tab10) if i != 1] # remove the second color (orange) to avoid confusion with the fourth (red)
max_parents = len(tab10)
strategy_parents = defaultdict(list)
override_index = False
# Group children under their parents and check for overridden color indices
for i, strategy in enumerate(strategies):
if "color_parent" in strategy:
strategy_parents[strategy["color_parent"]].append(i)
if "color_index" in strategy:
override_index = True
if "color_parent" in strategy:
raise ValueError(
f"Strategy '{strategy['name']}' has both 'color_index' and 'color_parent' defined, which is not allowed."
)
if len(strategy_parents) > max_parents:
raise ValueError(f"Too many color parents: max supported is {max_parents} using tab10")
parent_colors = {}
colors = [None] * len(strategies)
color_index = 0
for i, strategy in enumerate(strategies):
name = strategy["name"]
if name in strategy_parents:
children_indices = strategy_parents[name]
if len(children_indices) > 2:
raise ValueError(f"Color parent '{name}' has more than two children")
if override_index:
assert "color_index" in strategy, f"All strategies, including '{name}', must have either 'color_index' or 'color_parent' if 'color_index' is used anywhere."
color_index = strategy["color_index"]
if color_index >= len(tab10):
raise ValueError(f"Color index {color_index} for strategy '{name}' is out of bounds for tab10 colormap (max {len(tab10) - 1})")
base_color = tab10[color_index]
parent_colors[name] = {
idx: lighten_color(base_color, amount=0.4 + 0.3 * j) for j, idx in enumerate(children_indices)
}
colors[i] = to_hex(base_color)
color_index += 1
elif "color_parent" in strategy:
parent = strategy["color_parent"]
assert parent in parent_colors, f"Parent '{parent}' for strategy '{name}' not found in parent colors - child strategies must be defined after their parents."
colors[i] = parent_colors[parent][i]
else:
if override_index:
assert "color_index" in strategy, f"All strategies, including '{name}', must have either 'color_index' or 'color_parent' if 'color_index' is used anywhere."
color_index = strategy["color_index"]
if color_index >= len(tab10):
raise ValueError(f"Color index {color_index} for strategy '{name}' is out of bounds for tab10 colormap (max {len(tab10) - 1})")
colors[i] = to_hex(tab10[color_index])
color_index += 1
return colors
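# A minimal usage sketch (hypothetical strategy definitions): the parent gets a
# tab10 base color, its child a lightened shade of it, and the unparented
# strategy the next free tab10 color.
# >>> get_colors([
# ...     {"name": "ga"},
# ...     {"name": "ga_variant", "color_parent": "ga"},
# ...     {"name": "random"},
# ... ])
# ['#1f77b4', '#79add2', '#2ca02c']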
def get_colors_old(strategies: list[dict], scale_margin_left=0.4, scale_margin_right=0.15) -> list:
"""Function to get the colors for each of the strategies."""
default_colors = plt.rcParams["axes.prop_cycle"].by_key()["color"]
main_colors = ["Blues", "Greens", "Reds", "Purples", "Greys", "Oranges"]
main_color_counter = 0
strategy_parents = defaultdict(list)
# TODO switch to qualitative colormaps, e.g. tab10 if no children, otherwise tab20 (https://matplotlib.org/stable/users/explain/colors/colormaps.html#qualitative)
# get the dictionary of parents with the index of their child strategies
for strategy_index, strategy in enumerate(strategies):
if "color_parent" in strategy:
parent = strategy["color_parent"]
strategy_parents[parent].append(strategy_index)
if len(strategy_parents) == 0:
return default_colors
if len(strategy_parents) > len(main_colors):
raise ValueError(f"Can't use parent colors with more than {len(main_colors)} strategies")
def get_next_single_color_list(main_color_counter: int, num_colors: int):
colorname = main_colors[main_color_counter]
cmap = plt.get_cmap(colorname)  # matplotlib.cm.get_cmap was removed in Matplotlib 3.9
spacing = np.linspace(scale_margin_left, 1 - scale_margin_right, num=num_colors) if num_colors > 1 else [0.5]
colormap = cmap(spacing)
color_list = [rgb2hex(c) for c in colormap]
return color_list
parented_colors = dict()
colors = list()
for strategy_index, strategy in enumerate(strategies):
name = strategy["name"]
if name in strategy_parents:
children_index = strategy_parents[name]
if len(children_index) == 3:
warnings.warn(f"Color parent '{name}' has three children, check if lines in plot are visually distinct")
if len(children_index) > 3:
raise ValueError(
f"Color parent '{name}' should not have more than three children to maintain visual distinction"
)
color_scale = get_next_single_color_list(main_color_counter, len(children_index) + 1)
main_color_counter += 1
parented_colors[name] = dict()
for index, child_index in enumerate(children_index):
parented_colors[name][child_index] = color_scale[(len(children_index) - 1) - index]
color = color_scale[len(children_index)]
else:
if "color_parent" in strategy:
parent = strategy["color_parent"]
color = parented_colors[parent][strategy_index]
else:
color = get_next_single_color_list(main_color_counter, 1)[0]
main_color_counter += 1
colors.append(color)
return colors
class Visualize:
"""Class for visualization of experiments."""
x_metric_displayname = {
"fevals": "Number of function evaluations used",
"time_total": "Total time in seconds",
"aggregate_time": "Relative time to cutoff point",
"time_partial_framework_time": "framework time",
"time_partial_framework": "framework time",
"time_partial_strategy_time": "strategy time",
"time_partial_search_algorithm": "strategy time",
"time_partial_compile_time": "compile time",
"time_partial_compilation": "compile time",
"time_partial_benchmark_time": "kernel runtime",
"time_partial_times": "kernel runtime",
"time_partial_runtimes": "kernel runtime",
"time_partial_verification_time": "verification time",
}
y_metric_displayname = {
"objective_absolute": "Best-found objective value",
"objective_scatter": "Best-found objective value",
"objective_relative_median": "Fraction of absolute optimum relative to median",
"objective_normalized": "Best-found objective value\n(normalized from median to optimum)",
"objective_baseline": "Best-found objective value\n(relative to baseline)",
"objective_baseline_max": "Improvement over random sampling",
"aggregate_objective": "Aggregate best-found objective value relative to baseline",
"aggregate_objective_max": "Aggregate improvement over random sampling",
"time": "Best-found kernel time in milliseconds",
"GFLOP/s": "GFLOP/s",
}
plot_x_value_types = ["fevals", "time", "aggregated"] # number of function evaluations, time, aggregation
plot_y_value_types = [
"absolute",
"scatter",
"normalized",
"baseline",
] # absolute values, scatterplot, median-absolute normalized, improvement over baseline
def __init__(
self,
experiment_filepath: str,
save_figs=True,
save_extra_figs=False,
continue_after_comparison=False,
compare_extra_baselines=False,
use_strategy_as_baseline=None,
) -> None:
"""Initialization method for the Visualize class.
Args:
experiment_filepath: the path to the experiment-filename.json to run.
save_figs: whether to save the figures to file, if not, displays in a window. Defaults to True.
save_extra_figs: whether to save split times and baseline comparisons figures to file. Defaults to False.
continue_after_comparison: whether to continue plotting after processing comparisons. Defaults to False.
compare_extra_baselines: whether to include additional baselines for comparison. Defaults to False.
use_strategy_as_baseline: name of an executed strategy to use as the baseline, if any. WARNING: likely destroys comparability. Defaults to None.
Raises:
ValueError: on various invalid inputs.
"""
# # silently execute the experiment
# with warnings.catch_warnings():
# warnings.simplefilter("ignore")
self.experiment, self.all_experimental_groups, self.searchspace_statistics, self.results_descriptions = (
execute_experiment(experiment_filepath, profiling=False)
)
experiment_folder: Path = self.experiment["parent_folder_absolute_path"]
assert isinstance(experiment_folder, Path)
self.plot_filename_prefix = experiment_folder.joinpath("run", "generated_graphs")
print("\n")
print("Visualizing")
# preparing filesystem
if save_figs or save_extra_figs:
Path(self.plot_filename_prefix).mkdir(exist_ok=True)
# search strategies are search methods defined in experiments setup file
# self.all_experimental_groups are all combinations of gpu+application+search method that got executed
self.strategies = self.experiment["search_strategies"]
# settings
cutoff_percentile: float = self.experiment["statistics_settings"]["cutoff_percentile"]
cutoff_percentile_start: float = self.experiment["statistics_settings"]["cutoff_percentile_start"]
cutoff_type: str = self.experiment["statistics_settings"]["cutoff_type"]
assert cutoff_type == "fevals" or cutoff_type == "time", f"cutoff_type != 'fevals' or 'time', is {cutoff_type}"
time_resolution: float = self.experiment["visualization_settings"]["resolution"]
if int(time_resolution) != time_resolution:
raise ValueError(f"The resolution must be an integer, yet is {time_resolution}.")
time_resolution = int(time_resolution)
objective_time_keys: list[str] = self.experiment["statistics_settings"]["objective_time_keys"]
# plot settings
plots: list[dict] = self.experiment["visualization_settings"]["plots"]
compare_baselines: bool = self.experiment["visualization_settings"]["compare_baselines"]
compare_split_times: bool = self.experiment["visualization_settings"]["compare_split_times"]
confidence_level: float = self.experiment["visualization_settings"]["confidence_level"]
self.colors = get_colors(self.strategies)
# self.colors = get_colors_old(
# self.strategies,
# scale_margin_left=self.experiment["visualization_settings"].get("color_parent_scale_margin_left", 0.4),
# scale_margin_right=self.experiment["visualization_settings"].get("color_parent_scale_margin_right", 0.1),
# )
self.plot_skip_strategies: list[str] = list()
if use_strategy_as_baseline is not None:
self.plot_skip_strategies.append(use_strategy_as_baseline)
# visualize
aggregation_data = get_aggregation_data(
experiment_folder,
self.experiment,
self.searchspace_statistics,
self.strategies,
self.results_descriptions,
cutoff_percentile,
cutoff_percentile_start,
confidence_level,
time_resolution,
use_strategy_as_baseline,
)
# plot per searchspace
for gpu_name in self.experiment["experimental_groups_defaults"]["gpus"]:
for application_name in self.experiment["experimental_groups_defaults"]["applications_names"]:
print(f" | visualizing optimization of {application_name} for {gpu_name}")
title = f"{application_name} on {gpu_name}"
title = title.replace("_", " ")
# unpack the aggregation data
random_baseline, strategies_curves, searchspace_stats, time_range, fevals_range = aggregation_data[
get_aggregation_data_key(gpu_name=gpu_name, application_name=application_name)
]
# baseline_time_interpolated = np.linspace(mean_feval_time, cutoff_point_time, time_resolution)
# baseline = get_random_curve(cutoff_point_fevals, sorted_times, time_resolution)
# compare baselines
if compare_baselines is True:
self.plot_baselines_comparison(
time_range,
searchspace_stats,
objective_time_keys,
strategies_curves=strategies_curves,
confidence_level=confidence_level,
title=title,
save_fig=save_extra_figs,
)
if compare_split_times is True:
self.plot_split_times_comparison(
"fevals",
fevals_range,
searchspace_stats,
objective_time_keys,
title=title,
strategies_curves=strategies_curves,
save_fig=save_extra_figs,
)
self.plot_split_times_comparison(
"time",
time_range,
searchspace_stats,
objective_time_keys,
title=title,
strategies_curves=strategies_curves,
save_fig=save_extra_figs,
)
self.plot_split_times_bar_comparison(
"time",
time_range,
searchspace_stats,
objective_time_keys,
title=title,
strategies_curves=strategies_curves,
save_fig=save_extra_figs,
)
if not continue_after_comparison and (compare_baselines is True or compare_split_times is True):
continue
# set additional baselines for comparison
baselines_extra: list[Baseline] = []
if compare_extra_baselines is True:
baselines_extra.append(RandomSearchSimulatedBaseline(searchspace_stats, repeats=1000))
baselines_extra.append(RandomSearchCalculatedBaseline(searchspace_stats, include_nan=True))
# baselines_extra.append(
# ExecutedStrategyBaseline(
# searchspace_stats,
# strategy=(
# baseline_executed_strategy
# if baseline_executed_strategy is not None
# else strategies_curves[0]
# ),
# confidence_level=confidence_level,
# )
# )
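# Example of a "plots" entry in the experiment file for this scope
# (hypothetical values):
# {
#     "scope": "searchspace",
#     "style": "line",
#     "x_axis_value_types": ["time"],
#     "y_axis_value_types": ["normalized", "baseline"],
# }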
for plot in plots:
# get settings
scope: str = plot["scope"]
if scope != "searchspace":
continue
style: str = plot["style"]
plot_x_value_types: list[str] = plot["x_axis_value_types"]
plot_y_value_types: list[str] = plot["y_axis_value_types"]
# visualize the results
for x_type in plot_x_value_types:
if x_type == "fevals":
x_axis_range = fevals_range
elif x_type == "time":
x_axis_range = time_range
else:
raise NotImplementedError(f"X-axis type '{x_type}' not supported for scope '{plot}'")
# create the figure and plots
fig, axs = plt.subplots(
nrows=len(plot_y_value_types),
ncols=1,
figsize=(8, 4.2 * len(plot_y_value_types)),
sharex=True,
dpi=300,
)
if not hasattr(
axs, "__len__"
): # if there is just one subplot, wrap it in a list so it can be passed to the plot functions
axs = [axs]
fig.canvas.manager.set_window_title(title)
if not save_figs:
fig.suptitle(title)
# plot the subplots of individual searchspaces
for index, y_type in enumerate(plot_y_value_types):
self.plot_strategies(
style,
x_type,
y_type,
axs[index],
searchspace_stats,
strategies_curves,
x_axis_range,
self.experiment["visualization_settings"],
random_baseline,
baselines_extra=baselines_extra,
)
if index == 0:
loc = "lower right" if y_type == "normalized" else "best"
axs[index].legend(loc=loc)
# finalize the figure and save or display it
fig.supxlabel(self.get_x_axis_label(x_type, objective_time_keys))
fig.tight_layout()
if save_figs:
filename_path = Path(self.plot_filename_prefix) / f"{title}_{x_type}".replace(" ", "_")
fig.savefig(filename_path, dpi=300, bbox_inches="tight", pad_inches=0.01)
print(f"Figure saved to {filename_path}")
else:
plt.show()
# plot per searchstrategy
for plot in plots:
# get settings
scope: str = plot["scope"]
style: str = plot["style"]
if scope != "search_strategy":
continue
if style != "heatmap" and style != "compare_heatmaps":
raise NotImplementedError(
f"Scope {scope} currently only supports 'heatmap' or 'compare_heatmaps' as a style, not {style}"
)
plot_x_value_types: list[str] = plot["x_axis_value_types"]
plot_y_value_types: list[str] = plot["y_axis_value_types"]
annotate: bool = plot.get("annotate", True)
print_mean_of_columns: bool = plot.get("print_mean_of_columns", False)
print_mean_of_rows: bool = plot.get("print_mean_of_rows", False)
assert len(plot_x_value_types) == 1
assert len(plot_y_value_types) == 1
x_type = plot_x_value_types[0]
y_type = plot_y_value_types[0]
bins = plot.get("bins", 10)
vmin = plot.get("vmin", -15.0) # color range lower limit
vmax = plot.get("vmax", 1.0) # color range upper limit
cmin = plot.get("cmin", vmin) # colorbar lower limit
cmax = plot.get("cmax", vmax) # colorbar upper limit
cnum = plot.get("cnum", 5) # number of ticks on the colorbar
cap_to_vmin = plot.get("cap_to_vmin", False) # whether to cap the values to vmin
divide_train_test_axis = plot.get(
"divide_train_test_axis", False
) # whether to add visual indication for train/test split
divide_train_test_after_num = plot.get(
"divide_train_test_after_num", False
) # where to add the visual indication for train/test split
include_y_labels = plot.get("include_y_labels", None)
include_colorbar = plot.get("include_colorbar", True)
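# Example of a heatmap "plots" entry (hypothetical values; the optional keys
# fall back to the defaults above):
# {
#     "scope": "search_strategy",
#     "style": "heatmap",
#     "x_axis_value_types": ["time"],
#     "y_axis_value_types": ["searchspaces"],
#     "bins": 10,
#     "cmin": -4.0,
#     "cmax": 1.0,
# }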
if vmin != -15.0:
warnings.warn(
f"Careful: VMin has been changed from -15.0 to {vmin}. This breaks visual comparison compatibility with plots that do not have the same VMin. Maybe use cmin instead?"
)
if vmax != 1.0:
warnings.warn(
f"Careful: VMax has been changed from 1.0 to {vmax}. This breaks visual comparison compatibility with plots that do not have the same VMax. Maybe use cmax instead?"
)
if cmin < vmin:
raise ValueError(
f"Colorbar minimum can't be lower than the minimum value of the heatmap: {cmin} < {vmin}"
)
if cmax > vmax:
raise ValueError(
f"Colorbar maximum can't be higher than the maximum value of the heatmap: {cmax} > {vmax}"
)
# set the colormap
def norm_color_val(v):
"""Normalize a color value to fit in the 0-1 range."""
return (v - vmin) / (vmax - vmin)
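# Worked example with the default range: for vmin=-15.0 and vmax=1.0,
# norm_color_val(0.0) = (0.0 - -15.0) / (1.0 - -15.0) = 0.9375, so the
# "yellow" anchor below sits at 93.75% of the colormap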
cmap = LinearSegmentedColormap.from_list(
"my_colormap",
[
(norm_color_val(-15.0), "black"),
(norm_color_val(-4.0), "red"),
(norm_color_val(-1.0), "orange"),
(norm_color_val(0.0), "yellow"),
(norm_color_val(1.0), "green"),
],
)
# collect and plot the data for each search strategy
data_collected: dict[str, list[tuple]] = defaultdict(list)
for strategy in self.strategies:
strategy_name = strategy["name"]
strategy_displayname = strategy["display_name"]
assert (
sum([1 for s in self.strategies if s["name"] == strategy_name]) == 1
), f"Strategy name '{strategy_name}' is not unqiue"
# get the data from the collected aggregated data
for gpu_name in self.experiment["experimental_groups_defaults"]["gpus"]:
for application_name in self.experiment["experimental_groups_defaults"]["applications_names"]:
# unpack the aggregation data
random_baseline, strategies_curves, searchspace_stats, time_range, fevals_range = (
aggregation_data[
get_aggregation_data_key(gpu_name=gpu_name, application_name=application_name)
]
)
# get the data
dist = searchspace_stats.objective_performances_total_sorted
for _, strategy_curve in enumerate(strategies_curves):
if strategy_name != strategy_curve.name:
continue
# get the real and fictional performance curves
(
_,
x_axis_range_real,
curve_real,
_,
_,
x_axis_range_fictional,
curve_fictional,
_,
_,
) = strategy_curve.get_curve_over_time(
time_range, dist=dist, confidence_level=confidence_level
)
# combine the real and fictional parts to get the full curve
combine = x_axis_range_fictional.ndim > 0
x_axis_range = (
np.concatenate([x_axis_range_real, x_axis_range_fictional])
if combine
else x_axis_range_real
)
assert np.array_equal(
time_range, x_axis_range, equal_nan=True
), "time_range != x_axis_range"
curve = np.concatenate([curve_real, curve_fictional]) if combine else curve_real
# get the standardised curves and write them to the collector
curve: np.ndarray = random_baseline.get_standardised_curves(
time_range, [curve], x_type="time"
)[0]
score = np.mean(curve, axis=0)
curve_binned = np.array_split(curve, bins)
score_binned = [np.mean(c, axis=0) for c in curve_binned]
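# e.g. with a time resolution of 1000 and bins=10 (hypothetical values),
# each heatmap cell is the mean over ~100 consecutive points of the
# standardised curve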
# set the data
gpu_display_name = str(gpu_name).replace("_", " ")
application_display_name = str(application_name).replace("_", " ").capitalize()
data_collected[strategy_name].append(
tuple([gpu_display_name, application_display_name, score, score_binned])
)
if style == "heatmap":
for strategy in self.strategies:
strategy_name = strategy["name"]
strategy_displayname = strategy["display_name"]
strategy_data = data_collected[strategy_name]
# get the performance per selected type in an array
plot_data = np.stack(np.array([t[2] for t in strategy_data]))
cutoff_percentile: float = self.experiment["statistics_settings"].get("cutoff_percentile", 1.0)
cutoff_percentile_start: float = self.experiment["statistics_settings"].get(
"cutoff_percentile_start", 0.01
)
label_data = {
"gpus": (
list(dict.fromkeys([t[0].replace(remove_from_gpus_label, "") for t in strategy_data])),
"GPUs",
),
"applications": (
list(
dict.fromkeys([t[1].replace(remove_from_applications_label, "") for t in strategy_data])
),
"Applications",
),
"searchspaces": (
list(
dict.fromkeys(
[
f"{t[1]} on\n{t[0]}".replace(remove_from_searchspace_label, "")
for t in strategy_data
]
)
),
"Searchspaces",
),
"time": (
np.round(np.linspace(0.0, 1.0, bins), 2),
f"Fraction of time between {cutoff_percentile_start * 100}% and {cutoff_percentile * 100}%",
),
}
x_ticks = label_data[x_type][0]
y_ticks = label_data[y_type][0]
figsize = None
if (x_type == "time" and y_type == "searchspaces") or (
x_type == "searchspaces" and y_type == "time"
):
plot_data: np.ndarray = np.stack(np.array([t[3] for t in strategy_data]))
if x_type == "searchspaces":
plot_data = plot_data.transpose()
figsize = (9, 5)
elif (x_type == "gpus" and y_type == "applications") or (
y_type == "gpus" and x_type == "applications"
):
plot_data = np.reshape(
plot_data, (len(label_data["gpus"][0]), len(label_data["applications"][0]))
)
if x_type == "gpus":
plot_data = np.transpose(plot_data)
figsize = (5, 3.5)
else:
raise NotImplementedError(
f"Heatmap has not yet been implemented for {x_type}, {y_type}. Submit an issue to request it."
)
# validate the data is within the vmin-vmax range and visible colorbar range
assert not (
plot_data > 1.0
).any(), (
"Plot data contains values greater than 1.0, which should not be possible. Please investigate."
)
if cap_to_vmin:
plot_data = np.clip(plot_data, vmin, 1.0)
outside_range = np.where(np.logical_or(plot_data < vmin, plot_data > vmax))
assert (
len(outside_range[0]) == 0 and len(outside_range[1]) == 0
), f"There are values outside of the range ({vmin}, {vmax}): {plot_data[outside_range]} ({outside_range} for strategy {strategy_displayname})"
outside_visible_range = np.where(np.logical_or(plot_data < cmin, plot_data > cmax))
if not (len(outside_visible_range[0]) == 0 and len(outside_visible_range[1]) == 0):
warnings.warn(
f"There are values outside of the visible colorbar range ({cmin}, {cmax}): {plot_data[outside_visible_range]} ({outside_visible_range})"
)
# set up the plot
fig, axs = plt.subplots(
ncols=1, figsize=figsize, dpi=300
) # if multiple subplots, pass the axis to the plot function with axs[0] etc.
if not hasattr(axs, "__len__"):
axs = [axs]
title = f"Performance of {strategy_displayname} over {'+'.join(plot_x_value_types)},{'+'.join(plot_y_value_types)}"
fig.canvas.manager.set_window_title(title)
if not save_figs:
fig.suptitle(title)
# plot the heatmap
axs[0].set_xlabel(plot.get("xlabel", label_data[x_type][1]))
axs[0].set_xticks(ticks=np.arange(len(x_ticks)), labels=x_ticks, rotation=0)
if include_y_labels is True or include_y_labels is None:
axs[0].set_ylabel(plot.get("ylabel", label_data[y_type][1]))
axs[0].set_yticks(ticks=np.arange(len(y_ticks)), labels=y_ticks)
if include_y_labels is True:
# axs[0].yaxis.set_label_position("right")
axs[0].yaxis.tick_right()
elif include_y_labels is False:
axs[0].set_yticks(ticks=np.arange(len(y_ticks)))
axs[0].tick_params(labelleft=False)
hm = axs[0].imshow(
plot_data,
vmin=vmin,
vmax=vmax,
cmap=cmap,
interpolation="nearest",
aspect="auto",
# extent=[-0.5, plot_data.shape[1] + 0.5, -0.5, plot_data.shape[0] + 0.5],
)
if divide_train_test_axis is not False:
# axs[0].set_ylim(plot_data.shape[0] - 0.5, -0.5) # Ensure correct y-axis limits
if x_type == divide_train_test_axis.lower():
# add the vertical line to the x-axis
axs[0].axvline(
x=divide_train_test_after_num - 0.5, color="black", linestyle="--", linewidth=0.8
)
# add train and test texts to either side of the x-label
axs[0].text(
x=divide_train_test_after_num - 0.5,
y=-0.5,
s="train",
ha="center",
va="top",
fontsize=10,
)
axs[0].text(
x=divide_train_test_after_num - 0.5,
y=plot_data.shape[0] - 0.5,
s="test",
ha="center",
va="bottom",
fontsize=10,
)
elif y_type == divide_train_test_axis.lower():
# add the horizontal line to the y-axis
axs[0].axhline(
y=divide_train_test_after_num - 0.5, color="black", linestyle="--", linewidth=0.8
)
if include_y_labels is not False:
# add train and test texts to either side of the y-label
x_loc = -0.02
y_center = 0.5
text = "train"
axs[0].text(
x=x_loc,
y=y_center + 0.25 + (len(text) * 0.01),
s=text,
color="grey",
fontsize=8.5,
ha="center",
va="center",
rotation=90,
transform=axs[0].transAxes,
)
text = "test"
axs[0].text(
x=x_loc,
y=y_center - 0.25 - (len(text) * 0.01),
s=text,
color="grey",
fontsize=8.5,
ha="center",
va="center",
rotation=90,
transform=axs[0].transAxes,
)
else:
raise ValueError(f"{divide_train_test_axis=} not in x ({x_type}) or y ({y_type}) axis")
# plot the colorbar
if include_colorbar is True:
cbar = fig.colorbar(hm)
if cmin != vmin or cmax != vmax:
cbar.set_ticks(np.linspace(cmin, cmax, num=cnum)) # set colorbar limits
cbar.ax.set_ylim(cmin, cmax) # adjust visible colorbar limits
# cbar.set_label("Performance relative to baseline (0.0) and optimum (1.0)")
cbar.ax.set_ylabel("Performance score", rotation=-90, va="bottom")
# keep only non-overlapping ticks
max_ticks = 15
if len(x_ticks) > max_ticks:
indices = np.linspace(0, len(x_ticks) - 1, max_ticks).round()
hide_tick = np.isin(np.arange(len(x_ticks)), indices, invert=True, assume_unique=True)
for i, t in enumerate(axs[0].xaxis.get_ticklabels()):
if hide_tick[i]:
t.set_visible(False)
if len(y_ticks) > max_ticks:
indices = np.linspace(0, len(y_ticks) - 1, max_ticks).round()
hide_tick = np.isin(np.arange(len(y_ticks)), indices, invert=True, assume_unique=True)
for i, t in enumerate(axs[0].yaxis.get_ticklabels()):
if hide_tick[i]:
t.set_visible(False)
# loop over data dimensions and create text annotations
if annotate:
# replace with looping over plot_data instead
for i, j in np.ndindex(plot_data.shape):
number = plot_data[i, j]
if np.isnan(number):
continue
text = axs[0].text(
j,
i,
f"{round(number, 2) if number < -10 else round(number, 3)}",
ha="center",
va="center",
color="white" if (number > 0.5 or number < -2) else "black",
fontsize="small",
)
# print extra information if requested
if print_mean_of_columns:
mean_of_columns = np.nanmean(plot_data, axis=0)
print(f"Mean of columns for {strategy_displayname} ({x_type}): {mean_of_columns}")
if print_mean_of_rows:
mean_of_rows = np.nanmean(plot_data, axis=1)
print(f"Mean of rows for {strategy_displayname} ({y_type}): {mean_of_rows}")
# finalize the figure and save or display it
fig.tight_layout()
if save_figs:
suffix = ""
if include_colorbar and not (x_type == "time" or y_type == "time"):
suffix += "_colorbar"
if include_y_labels and not (x_type == "time" or y_type == "time"):
suffix += "_ylabels"
filename_path = (
Path(self.plot_filename_prefix)
/ f"{strategy_name}_heatmap_{'_'.join(plot_x_value_types)}_{'_'.join(plot_y_value_types)}{suffix}"
)
fig.savefig(filename_path, dpi=300, bbox_inches="tight", pad_inches=0.01)
print(f"Figure saved to {filename_path}")
else:
plt.show()
elif style == "compare_heatmaps":
raise NotImplementedError("Still a work in progress")
# comparisons = plot["comparison"]
# # set up the plot
# fig, axs = plt.subplots(
# ncols=1, figsize=(9, 6), dpi=300
# ) # if multiple subplots, pass the axis to the plot function with axs[0] etc.
# if not hasattr(axs, "__len__"):
# axs = [axs]
# # title = f"Performance of {strategy_displayname} over {'+'.join(plot_x_value_types)},{'+'.join(plot_y_value_types)}"
# # fig.canvas.manager.set_window_title(title)
# # if not save_figs:
# # fig.suptitle(title)
# for comparison in comparisons:
# strategy_names = comparisons["strategies"]
# strategies = [s for s in self.strategies if s["name"]]
# # for strategy in strategies:
# strategy_displayname = strategy["display_name"]
# strategy_data = data_collected[strategy_name]
# # get the performance per selected type in an array
# plot_data = np.stack(np.array([t[2] for t in strategy_data]))
# cutoff_percentile: float = self.experiment["statistics_settings"].get("cutoff_percentile", 1)
# cutoff_percentile_start: float = self.experiment["statistics_settings"].get(
# "cutoff_percentile_start", 0.01
# )
# label_data = {
# "gpus": (
# list(dict.fromkeys([t[0].replace(remove_from_gpus_label, "") for t in strategy_data])),
# "GPUs",
# ),
# "applications": (
# list(
# dict.fromkeys([t[1].replace(remove_from_applications_label, "") for t in strategy_data])
# ),
# "Applications",
# ),
# "searchspaces": (
# list(
# dict.fromkeys(
# [
# f"{t[1]} on\n{t[0]}".replace(remove_from_searchspace_label, "")
# for t in strategy_data
# ]
# )
# ),
# "Searchspaces",
# ),
# "time": (
# np.round(np.linspace(0.0, 1.0, bins), 2),
# f"Fraction of time between {cutoff_percentile_start * 100}% and {cutoff_percentile * 100}%",
# ),
# }
# x_ticks = label_data[x_type][0]
# y_ticks = label_data[y_type][0]
# if (x_type == "time" and y_type == "searchspaces") or (
# x_type == "searchspaces" and y_type == "time"
# ):
# plot_data: np.ndarray = np.stack(np.array([t[3] for t in strategy_data]))
# if x_type == "searchspaces":
# plot_data = plot_data.transpose()
# elif (x_type == "gpus" and y_type == "applications") or (
# y_type == "gpus" and x_type == "applications"
# ):
# plot_data = np.reshape(
# plot_data, (len(label_data["gpus"][0]), len(label_data["applications"][0]))
# )
# if x_type == "gpus":
# plot_data = np.transpose(plot_data)
# else:
# raise NotImplementedError(
# f"Heatmap has not yet been implemented for {x_type}, {y_type}. Submit an issue to request it."
# )
# # validate the data
# outside_range = np.where(np.logical_or(plot_data < vmin, plot_data > vmax))
# assert len(outside_range[0]) == 0 and len(outside_range[1]) == 0, (
# f"There are values outside of the range ({vmin}, {vmax}): {plot_data[outside_range]} ({outside_range} for strategy {strategy_displayname})"
# )
else:
raise NotImplementedError(f"Invalid {style=}")
# plot the aggregated searchspaces
for plot in plots:
# get settings
scope: str = plot["scope"]
style: str = plot["style"]
vmin: float = plot.get("vmin", None) # visual range lower limit
if scope != "aggregate":
continue
if style != "line" and style != "head2head":
raise NotImplementedError(f"{scope} does currently not support {style}, create an issue to request it.")
if style == "head2head":
compare_at_relative_time = plot["comparison"]["relative_time"]
comparison_unit = plot["comparison"]["unit"]
annotate = plot.get("annotate", True)
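# Example of a head2head "plots" entry (hypothetical values):
# {
#     "scope": "aggregate",
#     "style": "head2head",
#     "comparison": {"relative_time": 0.5, "unit": "time"},
#     "annotate": True,
# }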
# the comparison data will be a double nested dictionary of the strategy indices
comparison_data_raw = self.get_head2head_comparison_data(
aggregation_data, compare_at_relative_time, comparison_unit
)
# if more than half of the comparisons between two strategies are NaN, set all to NaN
for strategy1 in comparison_data_raw.keys():
for strategy2 in comparison_data_raw[strategy1].keys():
comparison = comparison_data_raw[strategy1][strategy2]
if len([v for v in comparison if np.isnan(v)]) > ceil(0.5 * len(comparison)):
comparison_data_raw[strategy1][strategy2] = [np.nan] * len(comparison)
# convert the comparison data dictionary to a 2D numpy array of means
comparison_data = np.array(
[
[
np.nanmean(comparison_data_raw[strategy1][strategy2])
for strategy2 in comparison_data_raw[strategy1].keys()
]
for strategy1 in comparison_data_raw.keys()
]
).transpose()
# set up the plot
fig, axs = plt.subplots(ncols=1, figsize=(8, 6), dpi=300)
if not hasattr(axs, "__len__"):
axs = [axs]
ax = axs[0]
title = f"Head-to-head comparison of strategies at {compare_at_relative_time} relative time"
fig.canvas.manager.set_window_title(title)
if not save_figs:
fig.suptitle(title)
# # set the x and y labels
# if comparison_unit == "time":
# ax.set_xlabel("How much time do these strategies take...")
# elif comparison_unit == "objective":
# ax.set_xlabel("How much objective value do these strategies achieve...")
# ax.set_ylabel("...relative to these strategies?")
# ax.xaxis.set_label_position('top')
# set the x and y ticks
x_ticks = list(comparison_data_raw.keys())
y_ticks = list(comparison_data_raw.keys())
# Show all ticks and label them with the respective list entries
ax.set_xticks(range(len(x_ticks)), labels=x_ticks, rotation=-15, ha="right", rotation_mode="anchor")
ax.set_yticks(range(len(y_ticks)), labels=y_ticks, rotation=-30, ha="right", rotation_mode="anchor")
ax.xaxis.tick_top()
# set the color map
vmin = 0.0
vmax = 1000.0
def norm_color_val(v):
"""Normalize a color value to fit in the 0-1 range."""
return (v - vmin) / (vmax - vmin)
if comparison_unit == "time":
cmap = LinearSegmentedColormap.from_list(
"head2head_colormap",
[
(norm_color_val(vmin), "darkgreen"),
(norm_color_val(100.0), "greenyellow"),
(norm_color_val(200.0), "orange"),
(norm_color_val(500.0), "red"),
(norm_color_val(800.0), "darkred"),
(norm_color_val(vmax), "black"),
],
)
elif comparison_unit == "objective":
cmap = LinearSegmentedColormap.from_list(
"head2head_colormap",
[
(norm_color_val(vmin), "darkred"),
(norm_color_val(80.0), "yellow"),
(norm_color_val(100.0), "greenyellow"),
(norm_color_val(200.0), "green"),
(norm_color_val(vmax), "darkgreen"),
],
)
# if there are any values above the vmax, warn
if np.any(comparison_data > vmax):
warnings.warn(
f"There are values above the vmax ({vmax}) in the comparison data: {comparison_data[comparison_data > vmax]}, these are clipped"
)
# clip the comparison data to the vmin-vmax range
comparison_data_clipped = np.clip(comparison_data, vmin, vmax)
# plot the comparison data
im = ax.imshow(
comparison_data_clipped,
vmin=vmin,
vmax=vmax,
aspect="auto",
cmap=cmap,
)
# set the colorbar
# cmin = np.nanmin(comparison_data_clipped)
cmin = vmin # always show 0.0 as the start
max_val = np.nanmax(comparison_data_clipped)
if np.isnan(max_val):
max_val = vmax
# round to the nearest 100
cmax = round(ceil(max_val), -2)
if cmax < max_val:
cmax += 100 # ensure the colorbar max is above the max value
cnum = round(cmax / 100) + 1
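# Worked example (hypothetical values): max_val=437 gives
# round(ceil(437), -2) = 400 < 437, so cmax becomes 500 and
# cnum = round(500 / 100) + 1 = 6 evenly spaced colorbar ticks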
cbar = ax.figure.colorbar(im, ax=ax)
if cmin != vmin or cmax != vmax:
cbar.set_ticks(np.linspace(cmin, cmax, num=cnum)) # set colorbar limits
cbar.ax.set_ylim(cmin, cmax) # adjust visible colorbar limits
if comparison_unit == "time":
cbar.ax.set_ylabel(
"Time difference to same objective value (lower is better)", rotation=-90, va="bottom"
)
elif comparison_unit == "objective":
cbar.ax.set_ylabel(
"Objective value difference at same time (higher is better)", rotation=-90, va="bottom"
)
else:
raise NotImplementedError(f"Comparison unit '{comparison_unit}' not implemented")
# loop over data dimensions and create text annotations
if annotate:
for i in range(len(x_ticks)):
for j in range(len(y_ticks)):
number = comparison_data[i, j]
if np.isnan(number):
continue
text = ax.text(
j,
i,
f"{round(number, 1) if number < 100 else round(number)}%",
ha="center",
va="center",
color="white" if (number > 200 or number < 50) else "black",
fontsize="small",
)
# plot the averages per strategy as labels under the heatmap
averages = np.nanmean(comparison_data, axis=0)
# add "mean" before the averages
ax.text(-0.5, len(y_ticks) - 0.2, "Mean:", ha="right", va="center", color="black", fontsize=10)
for i, avg in enumerate(averages):
ax.text(
i,
len(y_ticks) - 0.2,
f"{'NaN' if np.isnan(avg) else round(avg, 1) if avg < 100 else round(avg)}%",
ha="center",
va="center",
color="black",
fontsize="small",
)
print(
f"Averages per strategy at {compare_at_relative_time} relative time: {[(s, a) for s, a in zip(x_ticks, averages)]}"
)
# finalize the figure and save or display it
fig.tight_layout()
if save_figs:
filename_path = Path(self.plot_filename_prefix) / f"head2head_comparison_{comparison_unit}"
fig.savefig(filename_path, dpi=300, bbox_inches="tight", pad_inches=0.01)
print(f"Figure saved to {filename_path}")
else:
plt.show()
# plot the aggregation
if style == "line" and (continue_after_comparison or not (compare_baselines or compare_split_times)):
# fig, axs = plt.subplots(
# ncols=1, figsize=(6.8, 4.0), dpi=300
# ) # if multiple subplots, pass the axis to the plot function with axs[0] etc.
fig, axs = plt.subplots(
ncols=1, figsize=(8.5, 5.0), dpi=300
) # if multiple subplots, pass the axis to the plot function with axs[0] etc.
if not hasattr(axs, "__len__"):
axs = [axs]
title = f"""Aggregated Data\napplications:
{", ".join(self.experiment["experimental_groups_defaults"]["applications_names"])}\nGPUs: {", ".join(self.experiment["experimental_groups_defaults"]["gpus"])}"""
fig.canvas.manager.set_window_title(title)
if not save_figs:
fig.suptitle(title)
# finalize the figure and save or display it
lowest_real_y_value = self.plot_strategies_aggregated(
axs[0],
aggregation_data,
visualization_settings=self.experiment["visualization_settings"],
plot_settings=plot,
)
if vmin is not None:
if isinstance(vmin, (int, float)):
axs[0].set_ylim(bottom=vmin)
elif vmin == "real":
axs[0].set_ylim(bottom=lowest_real_y_value - (abs(lowest_real_y_value) + 1.0) * 0.02)
else:
raise NotImplementedError(f"{vmin=} not implemented")
fig.tight_layout()
if save_figs:
filename_path = Path(self.plot_filename_prefix) / "aggregated"
fig.savefig(filename_path, dpi=300, bbox_inches="tight", pad_inches=0.01)
print(f"Figure saved to {filename_path}")
else:
plt.show()
def plot_baselines_comparison(
self,
time_range: np.ndarray,
searchspace_stats: SearchspaceStatistics,
objective_time_keys: list,
strategies_curves: list[Curve],
confidence_level: float,
title: str = None,
save_fig=False,
):
"""Plots a comparison of baselines on a time range.
Optionally also compares against strategies listed in strategies_curves.
Args:
time_range: range of time to plot on.
searchspace_stats: Searchspace statistics object.
objective_time_keys: objective time keys.
strategies_curves: the strategy curves to draw in the plot.
confidence_level: the confidence interval used for the confidence / prediction interval.
title: the title for this plot; if not given, a title is generated. Defaults to None.
save_fig: whether to save the resulting figure to file. Defaults to False.
"""
dist = searchspace_stats.objective_performances_total_sorted
plt.figure(figsize=(9, 7), dpi=300)
# list the baselines to test
baselines: list[Baseline] = list()
# baselines.append(
# RandomSearchCalculatedBaseline(searchspace_stats, include_nan=False, time_per_feval_operator="median")
# )
# baselines.append(RandomSearchCalculatedBaseline(searchspace_stats, time_per_feval_operator="mean"))
# baselines.append(
# RandomSearchCalculatedBaseline(
# searchspace_stats, include_nan=True, time_per_feval_operator="median_per_feval"
# )
# )
baselines.append(
ExecutedStrategyBaseline(
searchspace_stats, strategy=strategies_curves[0], confidence_level=confidence_level
)
)
# plot random baseline implementations
for baseline in baselines:
timecurve = baseline.get_curve_over_time(time_range)
print(f"{baseline.label}: {timecurve[-1]}")
plt.plot(time_range, timecurve, label=baseline.label)
# plot normal strategies
for strategy_curve in strategies_curves:
if strategy_curve.name in self.plot_skip_strategies:
continue
(
_,
x_axis_range_real,
curve_real,
curve_lower_err_real,
curve_upper_err_real,
x_axis_range_fictional,
curve_fictional,
curve_lower_err_fictional,
curve_upper_err_fictional,
) = strategy_curve.get_curve_over_time(time_range, dist=dist, confidence_level=confidence_level)
# when adding error shades to visualization, don't forget to pass confidence interval to get_curve_over_time
plt.plot(x_axis_range_real, curve_real, label=strategy_curve.display_name, linestyle="dashed")
if x_axis_range_fictional.ndim > 0:
plt.plot(x_axis_range_fictional, curve_fictional, linestyle="dotted")
# finalize the plot
if title is not None:
plt.title(title)
plt.xlabel(self.get_x_axis_label("time", objective_time_keys))
plt.ylabel(self.y_metric_displayname["objective_absolute"])
plt.xlim(time_range[0], time_range[-1])
plt.legend()
plt.tight_layout()
# write to file or show
if save_fig:
filename_path = Path(self.plot_filename_prefix) / f"{title}_baselines".replace(" ", "_")
plt.savefig(filename_path, dpi=300, bbox_inches="tight", pad_inches=0.01)
print(f"Figure saved to {filename_path}")
else:
plt.show()
def plot_split_times_comparison(
self,
x_type: str,
fevals_or_time_range: np.ndarray,
searchspace_stats: SearchspaceStatistics,
objective_time_keys: list,
title: str = None,
strategies_curves: list[Curve] = list(),
save_fig=False,
):
"""Plots a comparison of split times for strategies and baselines over the given range.
Args:
x_type: the type of ``fevals_or_time_range``.
fevals_or_time_range: the time or function evaluations range to plot on.
searchspace_stats: the Searchspace statistics object.
objective_time_keys: the objective time keys.
title: the title for this plot; if not given, a title is generated. Defaults to None.
strategies_curves: the strategy curves to draw in the plot. Defaults to list().
save_fig: whether to save the resulting figure to file. Defaults to False.
Raises:
ValueError: on unexpected strategies curve instance.
"""
# list the baselines to test
baselines: list[Baseline] = list()
# baselines.append(
# RandomSearchCalculatedBaseline(
# searchspace_stats, include_nan=True, time_per_feval_operator="median_per_feval"
# )
# )
# filter out skipped strategies without mutating the list while iterating over it
lines: list[CurveBasis] = [
line
for line in strategies_curves + baselines
if not (isinstance(line, Curve) and line.name in self.plot_skip_strategies)
]
# setup the subplots
num_rows = len(lines)
fig, axs = plt.subplots(nrows=num_rows, ncols=1, figsize=(9, 3 * num_rows), sharex=True)
if not hasattr(
axs, "__len__"
): # if there is just one subplot, wrap it in a list so it can be passed to the plot functions
axs = [axs]
if title is not None:
fig.canvas.manager.set_window_title(title)
fig.suptitle(title)
labels = list(objective_time_keys)
# plot the baselines and strategies
for ax_index, line in enumerate(lines):
ax = axs[ax_index]
if isinstance(line, Curve):
curvetitle = line.display_name
elif isinstance(line, Baseline):
curvetitle = line.label
else:
raise ValueError(f"Expected Curve or Baseline instance, but line is {type(line)}")
split_times = line.get_split_times(fevals_or_time_range, x_type, searchspace_stats)
ax.set_title(curvetitle)
ax.stackplot(fevals_or_time_range, split_times, labels=labels)
ax.set_ylabel(self.get_x_axis_label("time", objective_time_keys))
ax.set_xlim(fevals_or_time_range[0], fevals_or_time_range[-1])
# plot the mean
mean = np.mean(np.sum(split_times, axis=0))
ax.axhline(y=mean, label="Mean sum")
if isinstance(line, Baseline):
average_time_per_feval_used = searchspace_stats.get_time_per_feval(line.time_per_feval_operator)
ax.axhline(y=average_time_per_feval_used, label="Average used")
print(f"{curvetitle} mean: {round(mean, 3)}, average used: {round(average_time_per_feval_used, 3)}")
else:
print(f"{curvetitle} mean: {round(mean, 3)}")
# finalize the plot
handles, labels = ax.get_legend_handles_labels()
fig.legend(handles, labels)
fig.supxlabel(self.get_x_axis_label(x_type, objective_time_keys))
fig.tight_layout()
# write to file or show
if save_fig:
filename_path = Path(self.plot_filename_prefix) / f"{title}_split_times_{x_type}".replace(" ", "_")
plt.savefig(filename_path, dpi=300, bbox_inches="tight", pad_inches=0.01)
print(f"Figure saved to {filename_path}")
else:
plt.show()
def plot_split_times_bar_comparison(
self,
x_type: str,
fevals_or_time_range: np.ndarray,
searchspace_stats: SearchspaceStatistics,
objective_time_keys: list[str],
title: str = None,
strategies_curves: list[Curve] = list(),
print_table_format=True,
print_skip=["validation"],
save_fig=False,
):
"""Plots a bar chart comparison of the average split times for strategies over the given range.
Args:
x_type: the type of ``fevals_or_time_range``.
fevals_or_time_range: the time or function evaluations range to plot on.
searchspace_stats: the Searchspace statistics object.
objective_time_keys: the objective time keys.
title: the title for this plot; if not given, a title is generated. Defaults to None.
strategies_curves: the strategy curves to draw in the plot. Defaults to list().
print_table_format: print a LaTeX-formatted table. Defaults to True.
print_skip: list of ``time_keys`` to be skipped in the printed table. Defaults to ["validation"].
save_fig: whether to save the resulting figure to file. Defaults to False.
"""
fig, ax = plt.subplots(dpi=200)
width = 0.5
strategy_labels = list()
for print_skip_key in print_skip:
assert (
print_skip_key in objective_time_keys
), f"Each key in print_skip must be in objective_time_keys, {print_skip_key} is not ({objective_time_keys})"
# get a dictionary of {time_key: [array_average_time_per_strategy]}
data_dict = dict.fromkeys(objective_time_keys)
data_table = list(
list(list() for _ in range(len(objective_time_keys) - len(print_skip)))
for _ in range((len(strategies_curves) - len(self.plot_skip_strategies)) + 1)
)
for objective_time_key in objective_time_keys:
data_dict[objective_time_key] = np.full((len(strategies_curves)), np.nan)
for strategy_index, strategy_curve in enumerate(strategies_curves):
if strategy_curve.name in self.plot_skip_strategies:
continue
print_skip_counter = 0
strategy_labels.append(strategy_curve.display_name)
strategy_split_times = strategy_curve.get_split_times(fevals_or_time_range, x_type, searchspace_stats)
# print(f"{strategy_curve.display_name}: ({strategy_split_times.shape})")
for objective_time_key_index, objective_time_key in enumerate(objective_time_keys):
key_split_times = strategy_split_times[objective_time_key_index]
key_split_times = key_split_times[key_split_times > 0]
split_time = max(np.median(key_split_times), 0.0)
split_time = 0.0 if np.isnan(split_time) else split_time
data_dict[objective_time_key][strategy_index] = split_time
# print(
# f""" {objective_time_key}: {key_split_times[key_split_times > 0].shape},
# {np.mean(key_split_times)}, {np.median(key_split_times)}"""
# )
if objective_time_key not in print_skip:
if strategy_index == 0:
data_table[0][objective_time_key_index - print_skip_counter] = objective_time_key.replace(
"_", " "
)
data_table[strategy_index + 1][objective_time_key_index - print_skip_counter] = str(
"%.3g" % split_time
)
else:
print_skip_counter += 1
# print in table format
if print_table_format:
print("")
num_times = len(data_table[0])
print("\\begin{tabularx}{\linewidth}{l" + "|X" * num_times + "}")
print("\hline")
header = "} & \\textbf{".join(data_table[0])
print("\\textbf{Algorithm} & \\textbf{" + header + "} \\" + "\\")
print("\hline")
for strategy_index in range(len(strategy_labels)):
print(
f" {strategy_labels[strategy_index]} & {' & '.join(data_table[strategy_index + 1])} \\\\\hline"
)
print("\end{tabularx}")
# plot the bars
bottom = np.zeros(len(strategies_curves))
for objective_time_key, objective_times in data_dict.items():
objective_times = np.array(objective_times)
ax.bar(strategy_labels, objective_times, width, label=objective_time_key, bottom=bottom)
bottom += objective_times
# finalize the plot
ax.set_ylabel(self.get_x_axis_label(x_type, objective_time_keys))
ax.legend()
# ax.set_title(title)
# fig.supxlabel("Median split times per optimization algorithm")
fig.tight_layout()
# write to file or show
if save_fig:
filename_path = Path(self.plot_filename_prefix) / f"{title}_split_times_bar".replace(" ", "_")
plt.savefig(filename_path, dpi=300, bbox_inches="tight", pad_inches=0.01)
print(f"Figure saved to {filename_path}")
else:
plt.show()
def get_head2head_comparison_data(
self, aggregation_data: dict, compare_at_relative_time: float, comparison_unit: str
) -> dict:
"""Gets the data for a head-to-head comparison of strategies across all searchspaces."""
# the comparison data will be a double nested dictionary of the strategy indices
comparison_data = dict()
for strategy_alpha in self.strategies:
comparison_data[strategy_alpha["display_name"]] = dict()
for strategy_beta in self.strategies:
comparison_data[strategy_alpha["display_name"]][strategy_beta["display_name"]] = list()
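# The resulting structure looks like (hypothetical display names):
# {"GA": {"GA": [nan, nan], "SA": [95.2, 110.4]}, "SA": {...}},
# with one value appended per (GPU, application) combination below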
# iterate over the searchspaces and strategies to get head2head data
for gpu_name in self.experiment["experimental_groups_defaults"]["gpus"]:
for application_name in self.experiment["experimental_groups_defaults"]["applications_names"]:
print(f" | visualizing head2head of {application_name} for {gpu_name}")
# unpack the aggregation data
_, strategies_curves, searchspace_stats, time_range, _ = aggregation_data[
get_aggregation_data_key(gpu_name=gpu_name, application_name=application_name)
]
# get the head2head comparison data
comparison_data_ss = self.get_head2head_comparison_data_searchspace(
"time",
compare_at_relative_time,
comparison_unit,
searchspace_stats,
strategies_curves,
time_range,
)
# for this searchspace, append each strategy's data to the comparison data
for strategy_index_alpha, strategy_alpha in enumerate(self.strategies):
for strategy_index_beta, strategy_beta in enumerate(self.strategies):
comparison_data[strategy_alpha["display_name"]][strategy_beta["display_name"]].append(
comparison_data_ss[strategy_index_alpha][strategy_index_beta]
)
return comparison_data
def get_head2head_comparison_data_searchspace(
self,
x_type: str,
compare_at_relative_time: float,
comparison_unit: str,
searchspace_stats: SearchspaceStatistics,
strategies_curves: list[Curve],
x_axis_range: np.ndarray,
) -> dict:
"""Gets the data for a head-to-head comparison of strategies on a specific searchspace.
Args:
x_type: the type of ``x_axis_range``.
compare_at_relative_time: the relative point in time to compare at, between 0.0 and 1.0.
comparison_unit: the unit to compare with, 'time' or 'objective'.
searchspace_stats: the Searchspace statistics object.
strategies_curves: the strategy curves to draw in the plot.
x_axis_range: the time or function evaluations range to plot on.
Returns:
A doubly-nested dictionary keyed by strategy index, indicating how much better the outer strategy performs relative to the inner one.
"""
comparison_point = x_axis_range[-1] * compare_at_relative_time
comparison_data = dict()
confidence_level = 0.95 # irrelevant because the confidence intervals are not used
minimization = searchspace_stats.minimization
dist = searchspace_stats.objective_performances_total_sorted
for strategy_index_alpha, strategy_alpha in enumerate(self.strategies):
inner_comparison_data = dict()
strategy_curve_alpha = strategies_curves[strategy_index_alpha]
_, time_range_alpha, curve_alpha, _, _ = strategy_curve_alpha.get_curve(
x_axis_range, x_type, dist=dist, confidence_level=confidence_level, return_split=False
)
# find the index of the closest time and performance to the comparison point
closest_index_alpha = np.argmin(np.abs(time_range_alpha - comparison_point))
time_at_comparison_alpha = time_range_alpha[closest_index_alpha]
performance_at_comparison_alpha = curve_alpha[closest_index_alpha]
absolute_optimum = searchspace_stats.total_performance_absolute_optimum()
median = searchspace_stats.total_performance_median()
def normalize(val):
"""Min-max normalization of the performance value."""
if absolute_optimum == median:
return 0.0
return (val - median) / (absolute_optimum - median)
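# Worked example (hypothetical values, minimization): with median=10.0 and
# absolute_optimum=2.0, a performance value of 6.0 normalizes to
# (6.0 - 10.0) / (2.0 - 10.0) = 0.5, i.e. halfway between median and optimum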
performance_at_comparison_alpha_norm = normalize(performance_at_comparison_alpha)
# compare against all other strategies
for strategy_index_beta, strategy_beta in enumerate(self.strategies):
if strategy_index_alpha == strategy_index_beta:
inner_comparison_data[strategy_index_beta] = np.nan
continue
strategy_curve_beta = strategies_curves[strategy_index_beta]
_, time_range_beta, curve_beta, _, _ = strategy_curve_beta.get_curve(
x_axis_range, x_type, dist=dist, confidence_level=confidence_level, return_split=False
)
# calculate the relative difference between the two strategies at the comparison point
if comparison_unit == "time":
# given the performance at `compare_at_relative_time`, what is the index of the first time that strategy beta reaches at least the same performance?
index_matching = (
np.argwhere(curve_beta <= performance_at_comparison_alpha)
if minimization
else np.argwhere(curve_beta >= performance_at_comparison_alpha)
)
if index_matching.size == 0:
# if strategy beta never reaches the performance of strategy alpha, we cannot compare, instead we take the time at the end so we know what the minimal performance gain is
time_at_comparison_beta = time_range_beta[-1]
# another alternative: take the last time * fraction of inverse (e.g. if GA-nc doesn’t find the objective of GA, take end-of-time * 1/([GA-to-GAnc]/100))
# inner_comparison_data[strategy_index_beta] = np.nan
# continue
else:
# get the time at which strategy beta reaches the performance of strategy alpha
closest_index_beta = index_matching[0][0] # take the first match
time_at_comparison_beta = time_range_beta[closest_index_beta]
# given the performance at `compare_at_relative_time`, how much longer does strategy beta take to get to the same performance compared to strategy alpha? (lower is better)
# closest_index_beta = np.argmin(np.abs(curve_beta - performance_at_comparison_alpha))
# time_at_comparison_beta = time_range_beta[closest_index_beta]
# outer takes X% of the time inner takes to reach the same performance (100%+percentage change)
percentage_change = (
(time_at_comparison_alpha - time_at_comparison_beta) / abs(time_at_comparison_beta) * 100
)
inner_comparison_data[strategy_index_beta] = 100 + percentage_change
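# Worked example (hypothetical values): if alpha reaches the performance level
# at 40 seconds and beta only at 50 seconds, percentage_change
# = (40 - 50) / 50 * 100 = -20, so 80 is stored: alpha needs 80% of beta's time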
elif comparison_unit == "objective":
# given the time at `compare_at_relative_time`, how much worse is the objective value of strategy beta at that moment compared to strategy alpha? (higher is better)
closest_index_beta = np.argmin(np.abs(time_range_beta - time_at_comparison_alpha))
performance_at_comparison_beta = curve_beta[closest_index_beta]
performance_at_comparison_beta_norm = normalize(performance_at_comparison_beta)
# percentage_change = (performance_at_comparison_beta - performance_at_comparison_alpha) / abs(performance_at_comparison_beta) * 100
# if not minimization:
# percentage_change = -percentage_change
percentage_change_norm = (
(performance_at_comparison_beta_norm - performance_at_comparison_alpha_norm)
/ abs(performance_at_comparison_beta_norm)
* 100
)
inner_comparison_data[strategy_index_beta] = 100 + percentage_change_norm
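# Worked example (hypothetical values): if the normalized performances are
# 0.5 for alpha and 0.4 for beta, percentage_change_norm
# = (0.4 - 0.5) / 0.4 * 100 = -25, so 75 is stored; values below 100 mean
# beta is behind alpha at this point in time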
else:
raise ValueError(f"Invalid comparison unit: {comparison_unit}. Expected 'time' or 'objective'.")
comparison_data[strategy_index_alpha] = inner_comparison_data
return comparison_data
def plot_strategies(
self,
style: str,
x_type: str,
y_type: str,
ax: plt.Axes,
searchspace_stats: SearchspaceStatistics,
strategies_curves: list[Curve],
x_axis_range: np.ndarray,
plot_settings: dict,
baseline_curve: Baseline = None,
baselines_extra: list[Baseline] = list(),
plot_errors=True,
plot_cutoffs=False,
):
"""Plots all optimization strategies for individual search spaces.
Args:
style: the style of plot, either 'line' or 'scatter'.
x_type: the type of ``x_axis_range``.
y_type: the type of plot on the y-axis.
ax: the axis to plot on.
searchspace_stats: the Searchspace statistics object.
strategies_curves: the strategy curves to draw in the plot. Defaults to list().
x_axis_range: the time or function evaluations range to plot on.
plot_settings: dictionary of additional plot settings.
baseline_curve: the ``Baseline`` to be used as a baseline in the plot. Defaults to None.
baselines_extra: additional ``Baseline`` curves to compare against. Defaults to list().
plot_errors: whether errors (confidence / prediction intervals) are visualized. Defaults to True.
plot_cutoffs: whether the cutoff points for early stopping algorithms are visualized. Defaults to False.
"""
confidence_level: float = plot_settings.get("confidence_level", 0.95)
absolute_optimum = searchspace_stats.total_performance_absolute_optimum()
median = searchspace_stats.total_performance_median()
def normalize(curve):
"""Min-max normalization with median as min and absolute optimum as max."""
if curve is None:
return None
return (curve - median) / (absolute_optimum - median)
def normalize_multiple(curves: list) -> tuple:
"""Normalize multiple curves at once."""
return tuple(normalize(curve) for curve in curves)
# plot the absolute optimum
absolute_optimum_y_value = absolute_optimum if y_type == "absolute" or style == "scatter" else 1
absolute_optimum_label = (
"Absolute optimum ({})".format(round(absolute_optimum, 3)) if y_type == "absolute" else "Absolute optimum"
)
ax.axhline(absolute_optimum_y_value, c="black", ls="-.", label=absolute_optimum_label)
# plot baseline
if baseline_curve is not None:
if y_type == "baseline":
ax.axhline(0, label="baseline trajectory", color="black", ls="--")
elif y_type == "normalized" or y_type == "baseline" or y_type == "absolute":
baseline = baseline_curve.get_curve(x_axis_range, x_type)
if absolute_optimum in baseline:
raise ValueError(
f"The optimum {absolute_optimum} is in the baseline, this will cause zero division problems"
)
# cut_at_index = np.argmax(baseline == absolute_optimum)
# baseline = baseline[:cut_at_index]
# x_axis_range = x_axis_range[:cut_at_index]
if y_type == "normalized":
baseline = normalize(baseline)
ax.plot(x_axis_range, baseline, label="Calculated baseline", color="black", ls="--")
# plot additional baselines if provided
baselines_extra_curves = list()
for baseline_extra in baselines_extra:
curve = baseline_extra.get_curve(x_axis_range, x_type)
if y_type == "normalized":
curve = normalize(curve)
elif y_type == "baseline":
curve = baseline_curve.get_standardised_curve(x_axis_range, curve, x_type=x_type)
ax.plot(x_axis_range, curve, label=baseline_extra.label, ls=":")
baselines_extra_curves.append(curve)
if len(baselines_extra) >= 2:
ax.plot(x_axis_range, np.mean(baselines_extra_curves, axis=0), label="Mean of extra baselines", ls=":")
# plot each strategy
dist = searchspace_stats.objective_performances_total_sorted
ylim_min = 0
for strategy_index, strategy in enumerate(self.strategies):
if "hide" in strategy.keys() and strategy["hide"]:
if strategy["name"] not in self.plot_skip_strategies:
self.plot_skip_strategies.append(strategy["name"])
continue
# get the data
color = self.colors[strategy_index]
label = f"{strategy['display_name']}"
strategy_curve = strategies_curves[strategy_index]
if strategy_curve.name in self.plot_skip_strategies:
continue
# get the plot data
if style == "scatter":
x_axis, y_axis = strategy_curve.get_scatter_data(x_type)
ax.scatter(x_axis, y_axis, label=label, color=color)
continue
else:
(
real_stopping_point,
x_axis_range_real,
curve_real,
curve_lower_err_real,
curve_upper_err_real,
x_axis_range_fictional,
curve_fictional,
curve_lower_err_fictional,
curve_upper_err_fictional,
) = strategy_curve.get_curve(x_axis_range, x_type, dist=dist, confidence_level=confidence_level)
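# the curve is split into a "real" part (up to the point where the strategy actually
# stopped) and a "fictional" part (extrapolated beyond it); each part comes with its
# mean curve and lower/upper error bounds at the requested confidence level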
# transform the curves as necessary and set ylims
if y_type == "normalized":
curve_real, curve_lower_err_real, curve_upper_err_real = normalize_multiple(
[curve_real, curve_lower_err_real, curve_upper_err_real]
)
curve_fictional, curve_lower_err_fictional, curve_upper_err_fictional = normalize_multiple(
[curve_fictional, curve_lower_err_fictional, curve_upper_err_fictional]
)
elif y_type == "baseline":
curve_real, curve_lower_err_real, curve_upper_err_real = baseline_curve.get_standardised_curves(
x_axis_range_real, [curve_real, curve_lower_err_real, curve_upper_err_real], x_type
)
if x_axis_range_fictional.ndim > 0 and x_axis_range_fictional.shape[0] > 0:
(
curve_fictional,
curve_lower_err_fictional,
curve_upper_err_fictional,
) = baseline_curve.get_standardised_curves(
x_axis_range_fictional,
[curve_fictional, curve_lower_err_fictional, curve_upper_err_fictional],
x_type,
)
ylim_min = min(np.min(curve_real), ylim_min)
# visualize
if plot_errors:
ax.fill_between(
x_axis_range_real,
curve_lower_err_real,
curve_upper_err_real,
alpha=0.15,
antialiased=True,
color=color,
)
ax.plot(x_axis_range_real, curve_real, label=label, color=color)
# select the parts of the data that are fictional
if real_stopping_point < x_axis_range.shape[-1]:
# visualize fictional part
if plot_errors:
ax.fill_between(
x_axis_range_fictional,
curve_lower_err_fictional,
curve_upper_err_fictional,
alpha=0.15,
antialiased=True,
color=color,
ls="dashed",
)
ax.plot(x_axis_range_fictional, curve_fictional, color=color, ls="dashed")
# # plot cutoff point
# def plot_cutoff_point(cutoff_percentiles: np.ndarray, show_label=True):
# """ plot the cutoff point """
# cutoff_point_values = list()
# cutoff_point_fevals = list()
# for cutoff_percentile in cutoff_percentiles:
# cutoff_point_value, cutoff_point_feval = searchspace_stats.cutoff_point(cutoff_percentile)
# cutoff_point_values.append(cutoff_point_value)
# cutoff_point_fevals.append(cutoff_point_feval)
# # get the correct value depending on the plot type
# if y_type == 'absolute':
# y_values = cutoff_point_values
# elif y_type == 'normalized':
# y_values = normalize(cutoff_point_values)
# elif y_type == 'baseline':
# y_values = baseline_curve.get_standardised_curve_over_fevals(fevals_range, cutoff_percentiles)
# # plot
# label = f"cutoff point" if show_label else None
# ax.plot(cutoff_point_fevals, y_values, marker='o', color='red', label=label)
# # test a range of cutoff percentiles to see if they match with random search
# if y_type == 'absolute' or y_type == 'normalized':
# cutoff_percentile_start = self.experiment.get("cutoff_percentile_start", 0.01)
# cutoff_percentile_end = self.experiment.get("cutoff_percentile")
# cutoff_percentiles_low_precision = np.arange(cutoff_percentile_start, 0.925, step=0.05)
# cutoff_percentiles_high_precision = np.arange(0.925, cutoff_percentile_end, step=0.001)
# plot_cutoff_point(np.concatenate([cutoff_percentiles_low_precision, cutoff_percentiles_high_precision]))
# finalize the plot
ax.set_xlim((x_axis_range[0], x_axis_range[-1]))
ax.set_ylabel(self.y_metric_displayname[f"objective_{y_type}"], fontsize="large")
normalized_ylim_margin = 0.02
if y_type == "absolute":
# multiplier = 0.99 if self.minimization else 1.01
# ax.set_ylim(absolute_optimum * multiplier, median)
# ax.set_ylim(1.0)
pass
# elif y_type == 'normalized':
# ax.set_ylim((0.0, 1 + normalized_ylim_margin))
elif y_type == "baseline":
ax.set_ylim((min(-normalized_ylim_margin, ylim_min - normalized_ylim_margin), 1 + normalized_ylim_margin))
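# Usage sketch (hypothetical objects; assumes `visualize`, `stats`, `curves`, and
# `baseline` are set up elsewhere): plot normalized strategy curves over a one-minute
# time range with 95% confidence intervals:
#   fig, ax = plt.subplots()
#   visualize.plot_strategies("line", "time", "normalized", ax, stats, curves,
#       np.linspace(0, 60, 1000), {"confidence_level": 0.95}, baseline_curve=baseline)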
def plot_strategies_aggregated(
self,
ax: plt.Axes,
aggregation_data,
visualization_settings: dict = {},
plot_settings: dict = {},
) -> float:
"""Plots all optimization strategies combined accross search spaces.
Args:
ax: the axis to plot on.
aggregation_data: the aggregated data from the various searchspaces.
visualization_settings: dictionary of additional visualization settings.
plot_settings: dictionary of additional visualization settings related to this particular plot.
Returns:
The lowest performance value at the real stopping points across all strategies.
"""
# plot the random baseline and absolute optimum
ax.axhline(0, label="Calculated baseline", c="black", ls=":")
ax.axhline(1, label="Absolute optimum", c="black", ls="-.")
# get the relative aggregated performance for each strategy
confidence_level: float = visualization_settings.get("confidence_level", 0.95)
(
strategies_performance,
strategies_lower_err,
strategies_upper_err,
strategies_real_stopping_point_fraction,
) = get_strategies_aggregated_performance(list(aggregation_data.values()), confidence_level)
# get the relevant plot settings
cutoff_percentile: float = self.experiment["statistics_settings"].get("cutoff_percentile", 1)
cutoff_percentile_start: float = self.experiment["statistics_settings"].get("cutoff_percentile_start", 0.01)
xlabel = plot_settings.get(
"xlabel",
f"{self.x_metric_displayname['aggregate_time']} ({cutoff_percentile_start * 100}% to {cutoff_percentile * 100}%)",
) # noqa: E501
ylabel = plot_settings.get("ylabel", self.y_metric_displayname["aggregate_objective"])
tmin = plot_settings.get("tmin", 1.0)
# setup the plot
y_axis_size = strategies_performance[0].shape[0]
time_range = np.arange(y_axis_size)
plot_errors = True
lowest_real_y_value = 0.0
print("\n-------")
print("Quantification of aggregate performance across all search spaces:")
# get the highest real_stopping_point_index, adjust y_axis_size and time_range if necessary
real_stopping_point_indices = [
min(
round(strategies_real_stopping_point_fraction[strategy_index] * time_range.shape[0]) + 1,
time_range.shape[0],
)
for strategy_index in range(len(strategies_performance))
] # noqa: E501
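# Worked example (hypothetical numbers): with a time_range of 1000 steps and a real
# stopping point fraction of 0.25, the index is min(round(0.25 * 1000) + 1, 1000) = 251.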
real_stopping_point_index_max = max(real_stopping_point_indices)
if tmin == "real":
# stop the time at the largest real stopping point
if real_stopping_point_index_max < y_axis_size:
print(f" adjusted stopping point index: {real_stopping_point_index_max}/{y_axis_size}")
y_axis_size = real_stopping_point_index_max
time_range = np.arange(y_axis_size)
elif tmin < 1.0:
# stop the time at the given tmin
y_axis_size = y_axis_size * tmin
time_range = np.arange(y_axis_size)
elif tmin > 1.0:
raise ValueError(f"Invalid {tmin=}, must be between 0.0 and 1.0 or 'real'")
# adjust the xlabel if necessary
if tmin == "real" and "xlabel" not in plot_settings:
xlabel = "Relative time until the last strategy stopped"
# plot each strategy
for strategy_index, strategy_performance in enumerate(strategies_performance):
if self.strategies[strategy_index]["name"] in self.plot_skip_strategies:
continue
displayname = self.strategies[strategy_index]["display_name"]
color = self.colors[strategy_index]
real_stopping_point_index = real_stopping_point_indices[strategy_index]
if real_stopping_point_index <= 1:
warnings.warn(f"Stopping point index for {displayname} is at {real_stopping_point_index}")
continue
# calculate the lowest real_y_value
lowest_real_y_value = min(
lowest_real_y_value,
(
strategy_performance[real_stopping_point_index]
if real_stopping_point_index < time_range.shape[0]
else strategy_performance[time_range.shape[0] - 1]
),
)
assert isinstance(lowest_real_y_value, (int, float)), f"Invalid {lowest_real_y_value=}"
# plot the errors
if plot_errors:
strategy_lower_err = strategies_lower_err[strategy_index]
strategy_upper_err = strategies_upper_err[strategy_index]
ax.fill_between(
time_range[:real_stopping_point_index],
strategy_lower_err[:real_stopping_point_index],
strategy_upper_err[:real_stopping_point_index],
alpha=0.15,
antialiased=True,
color=color,
)
if (
real_stopping_point_index < time_range.shape[0]
and real_stopping_point_index < len(strategy_lower_err) - 1
):
ax.fill_between(
time_range[real_stopping_point_index - 1 : y_axis_size],
strategy_lower_err[real_stopping_point_index - 1 : y_axis_size],
strategy_upper_err[real_stopping_point_index - 1 : y_axis_size],
alpha=0.15,
antialiased=True,
color=color,
ls="dashed",
)
# plot the curve
ax.plot(
time_range[:real_stopping_point_index],
strategy_performance[:real_stopping_point_index],
color=color,
label=displayname,
)
if (
real_stopping_point_index < time_range.shape[0]
and real_stopping_point_index < len(strategy_performance) - 1
):
ax.plot(
time_range[real_stopping_point_index - 1 : y_axis_size],
strategy_performance[real_stopping_point_index - 1 : y_axis_size],
color=color,
ls="dashed",
)
performance_score = round(np.mean(strategy_performance), 3)
performance_score_std = round(np.std(strategy_performance), 3)
print(f" | performance of {displayname}: {performance_score} (±{performance_score_std})")
# set the axis labels
ax.set_xlabel(xlabel, fontsize="large")
ax.set_ylabel(ylabel, fontsize="large")
# set the ticks
if tmin == "real":
ax.set_xticks([])
else:
num_ticks = 11
ax.set_xticks(
np.linspace(0, y_axis_size, num_ticks),
np.round(np.linspace(0, tmin, num_ticks), 2),
)
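# Example: with y_axis_size=500 and tmin=1.0, the 11 ticks land at positions
# 0, 50, ..., 500 and are labeled 0.0, 0.1, ..., 1.0 (relative time fractions).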
# set the limits and legend
ax.set_ylim(top=1.02)
ax.set_xlim((0, y_axis_size - 1))
ax.legend()
return lowest_real_y_value
def get_x_axis_label(self, x_type: str, objective_time_keys: list):
"""Formatter to get the appropriate x-axis label depending on the x-axis type.
Args:
x_type: the type of a range, either time or function evaluations.
objective_time_keys: the objective time keys used.
Raises:
ValueError: when an invalid ``x_type`` is given.
Returns:
The formatted x-axis label.
"""
if x_type == "fevals":
x_label = self.x_metric_displayname[x_type]
elif x_type == "time" and len(objective_time_keys) == len(objective_time_keys_values):
x_label = self.x_metric_displayname["time_total"]
elif x_type == "time":
partials = [self.x_metric_displayname[f"time_partial_{key}"] for key in objective_time_keys]
concatenated = ", ".join(partials)
if len(objective_time_keys) > 2:
concatenated = f"\n{concatenated}"
x_label = f"Cumulative time in seconds of {concatenated}"
else:
raise ValueError(f"Invalid {x_type=}")
return x_label
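# Usage sketch (hypothetical display names): with x_type="time" and
# objective_time_keys=["compilation", "benchmark"], the label becomes
# "Cumulative time in seconds of <compilation display name>, <benchmark display name>";
# when all five objective time keys are present, the total-time display name is used instead.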
def is_ran_as_notebook() -> bool: # pragma: no cover
"""Function to determine if this file is ran from an interactive notebook."""
try:
from IPython import get_ipython
shell = get_ipython().__class__.__name__
if shell == "ZMQInteractiveShell":
return True # Jupyter notebook or qtconsole
elif shell == "TerminalInteractiveShell":
return False # Terminal running IPython
else:
return False # Other type (?)
except (ModuleNotFoundError, NameError):
return False # Probably standard Python interpreter
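# Usage note: inside a Jupyter notebook cell this returns True (the active shell is
# ZMQInteractiveShell); when run as a script or in a terminal IPython session it returns False.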
def entry_point(): # pragma: no cover
"""Entry point function for Visualization."""
is_notebook = is_ran_as_notebook()
if is_notebook:
# take the CWD one level up
import os
os.chdir("../")
print(os.getcwd())
experiment_filepath = "test_random_calculated"
# experiment_filepath = "methodology_paper_example"
# %matplotlib widget # IPython magic line that sets matplotlib to widget backend for interactive
else:
experiment_filepath = get_args_from_cli()
Visualize(experiment_filepath, save_figs=not is_notebook)
if __name__ == "__main__":
entry_point()