Commit 87b57421 authored by Philippe SWARTVAGHER's avatar Philippe SWARTVAGHER
Browse files

plot: add class and script to model performances of comm/comp

parent 36a21d8d
import statistics
from comm_comp import FilesParser, CompMetric
class CommCompModel:
    """
    A class to build a model from output of bench_openmp and predict
    degradation of computations and communications when executed in parallel.
    """
    def __init__(self, files, predict_now=True):
        """Parse benchmark outputs and fit the model parameters.

        :param files: paths of bench_openmp output files, one per number of
            computing cores (handed to FilesParser).
        :param predict_now: if True (default), the predicted curves are
            computed immediately; otherwise they stay None until the private
            predict methods run.
        """
        # Model parameters:
        self.max_together_i = 0        # index (in self.xs) of the comp+comm total bandwidth peak
        self.max_together_value = 0    # value of that comp+comm peak
        self.max_comp_alone_i = 0      # index (in self.xs) of the computations-alone peak
        self.max_comp_alone_value = 0 # only for comp_alone_model
        self.together_when_max_alone = 0   # comp+comm total bandwidth at the comp-alone peak
        self.impacted_network_ratio = 0    # worst observed (comm bw with comp) / (comm bw alone)
        self.degradation_per_core_left = 0     # bandwidth lost per core between the two peaks
        self.degradation_per_core_right = 0    # bandwidth lost per core after the comp-alone peak
        self.comp_mem_req = 0          # memory throughput of a single computing core
        self.comm_alone_bw = 0         # network bandwidth without computations
        self.xs = []                   # numbers of computing cores actually measured
        # Other: predicted curves, filled by the __predict_* methods:
        self.total_model = None
        self.comm_with_comp_model = None
        self.comp_with_comm_model = None
        self.comm_alone_model = None
        self.comp_alone_model = None
        self.x_model = None
        parser = FilesParser(files)
        self.xs = parser.flatten_results["x"]
        comm_with_comp_bw = parser.flatten_results['comm']['with_comp']['bw']['med']
        # Prefer dedicated comm-alone measurements when available; otherwise
        # fall back to the bandwidth measured with a single computing core:
        self.comm_alone_bw = comm_with_comp_bw[0] if 'alone' not in parser.flatten_results['comm'] else statistics.mean(parser.flatten_results['comm']['alone']['bw']['med'])
        # Computation bandwidth summed over all threads, for each core count,
        # without and with concurrent communications:
        comp_bw_alone = [sum(parser.comp_alone_per_thread_results['memset'][x][CompMetric.BANDWIDTH]['avg']) for x in self.xs]
        comp_bw_with_comm = [sum(parser.comp_with_comm_per_thread_results['memset'][x][CompMetric.BANDWIDTH]['avg']) for x in self.xs]
        # Total memory bandwidth consumed when comp and comm run in parallel:
        total_parallel = [sum(v) for v in zip(comp_bw_with_comm, comm_with_comp_bw)]
        # 1/ Get by how much the network performance can be impacted in the worst case.
        # In some experiments, the network performances can be slightly better
        # with computations, so the ratio may exceed 1; taking the min keeps
        # the worst observed case:
        self.impacted_network_ratio = min([comm_with_comp_bw[i] / self.comm_alone_bw for i in range(len(self.xs))])
        # 2/ Find what is the maximum memory throughput when both computations and communications are performed:
        # We consider performances when comp and comm are executed in parallel, because on inter-processor buses,
        # it can lead to a bandwidth higher than computations alone.
        for i in range(len(self.xs)):
            if comp_bw_alone[i] > self.max_comp_alone_value:
                self.max_comp_alone_value = comp_bw_alone[i]
                self.max_comp_alone_i = i
        # Search the comp+comm peak only up to the comp-alone peak:
        for i in range(self.max_comp_alone_i+1):
            if total_parallel[i] > self.max_together_value:
                self.max_together_value = total_parallel[i]
                self.max_together_i = i
        self.print_params()
        assert(self.max_together_i <= self.max_comp_alone_i)
        self.together_when_max_alone = total_parallel[self.max_comp_alone_i]
        # 3/ Memory controllers (for the RAM or UPI) don't scale well,
        # and their performances decrease when there are too many
        # parallel requests (too many cores).
        # We linearly model this degradation, with curve change when
        # contention starts with even lonely computations. This allows
        # for a better model when comm+comp > comp_alone.
        if self.max_comp_alone_i > self.max_together_i:
            self.degradation_per_core_left = (self.max_together_value - total_parallel[self.max_comp_alone_i]) / (self.xs[self.max_comp_alone_i] - self.xs[self.max_together_i])
        # NOTE(review): the right-side slope is computed unconditionally here,
        # since __predict_total needs it even when the two peaks coincide —
        # confirm against the original file's indentation.
        self.degradation_per_core_right = (total_parallel[self.max_comp_alone_i] - total_parallel[-1]) / (self.xs[-1] + 1 - self.xs[self.max_comp_alone_i])
        # 4/ Characterize computations: what is the memory throughput of one core,
        # and what the maximum memory throughput the system can deliver:
        assert(self.xs[0] == 1)  # the single-core measurement is required
        self.comp_mem_req = comp_bw_alone[0]
        # Predictions are made for every core count from 1 to the measured maximum:
        self.x_model = list(range(1, self.xs[-1]+1))
        # Communications alone are not impacted by the number of computing cores:
        self.comm_alone_model = [self.comm_alone_bw] * len(self.x_model)
        if predict_now:
            self.__predict_total()
            self.__predict_comm_comp()

    def print_params(self):
        """Display the fitted model parameters, for inspection/debugging."""
        print(f"max_together_i: {self.max_together_i:8d} [x={self.xs[self.max_together_i]}]")
        print(f"max_together_value: {self.max_together_value:10.1f}")
        print(f"max_comp_alone_i: {self.max_comp_alone_i:8d} [x={self.xs[self.max_comp_alone_i]}]")
        print(f"max_comp_alone_value: {self.max_comp_alone_value:10.1f}")
        print(f"together_when_max_alone: {self.together_when_max_alone:10.1f}")
        print(f"impacted_network_ratio: {self.impacted_network_ratio:12.3f}")
        print(f"degradation_per_core_left: {self.degradation_per_core_left:10.1f}")
        print(f"degradation_per_core_right: {self.degradation_per_core_right:10.1f}")
        print(f"comp_mem_req: {self.comp_mem_req:10.1f}")
        print(f"comm_alone_bw: {self.comm_alone_bw:10.1f}")
        print(f"xs: {self.xs}")

    def __predict_total(self):
        """Fill self.total_model: predicted total (comp+comm) memory bandwidth
        for each core count in self.x_model. No-op if already computed."""
        if self.total_model is not None:
            return
        # Compute what is the total throughput the memory system can support:
        # This is just a trick to know later more easily if we reach the maximum
        # or not. The values can be different (higher) than lonely computations,
        # for inter-processors links, for instance.
        self.total_model = []
        for i in range(len(self.x_model)):
            if self.x_model[i] <= self.xs[self.max_together_i]:
                # Up to the comp+comm peak: flat at the peak value:
                self.total_model.append(self.max_together_value)
            elif self.x_model[i] <= self.xs[self.max_comp_alone_i]:
                # Between the two peaks: linear degradation (left slope):
                self.total_model.append(self.max_together_value - (self.degradation_per_core_left * (self.x_model[i]-self.xs[self.max_together_i])))
            else:
                # After the comp-alone peak: linear degradation (right slope):
                self.total_model.append(self.together_when_max_alone - (self.degradation_per_core_right * (self.x_model[i]-self.xs[self.max_comp_alone_i])))

    def __predict_comm_comp(self):
        """Fill self.comm_with_comp_model, self.comp_with_comm_model and
        self.comp_alone_model for each core count in self.x_model.
        No-op if already computed. Requires self.total_model."""
        if self.comm_with_comp_model is not None:
            # The three curves are always computed together:
            assert(self.comp_with_comm_model is not None)
            assert(self.comp_alone_model is not None)
            return
        self.comm_with_comp_model = []
        self.comp_with_comm_model = []
        self.comp_alone_model = []
        # Last point where the memory limit was not reached yet (used to
        # interpolate the communication degradation afterwards):
        last_comm_ratio_left_value = None
        last_comm_ratio_left_i = None
        diff = max(0, self.max_together_value - self.max_comp_alone_value)
        for i in range(len(self.x_model)):
            # Computations alone scale linearly until they hit the system limit:
            self.comp_alone_model.append(min(self.comp_mem_req*self.x_model[i], self.total_model[i]))
            if self.comp_mem_req*self.x_model[i] + self.impacted_network_ratio*self.comm_alone_bw < self.total_model[i]:
                # We didn't reach the memory system limit yet, the beginning of the plateau.
                # Computation performances scale perfectly, and are not disturbed:
                self.comp_with_comm_model.append(self.comp_mem_req*self.x_model[i])
                # Communications get the remaining of the system capabilities,
                # but are throttled by their performances obtained without computations:
                self.comm_with_comp_model.append(min(self.comm_alone_bw, self.total_model[i]-self.comp_with_comm_model[-1]))
                last_comm_ratio_left_value = self.comm_with_comp_model[-1] / self.comm_alone_bw
                assert(last_comm_ratio_left_value <= 1)
                last_comm_ratio_left_i = i
                assert(self.x_model[i] <= self.xs[self.max_together_i])
            else:
                # The communications can be affected, so apply the ratio:
                r = self.impacted_network_ratio
                if (self.max_comp_alone_i-self.max_together_i) > 1 and self.x_model[i] < self.xs[self.max_comp_alone_i]:
                    # The plateau of communication degradation isn't reached yet and
                    # the gap between max_together_i and max_comp_alone_i is large
                    # (case when sub-numa clustering is on):
                    # we linearly approximate the communication degradation between
                    # the last ratio when system memory limit wasn't reached and when
                    # it will be reached.
                    # This is important because we start by modeling bandwidth for
                    # communications and then we give the remaining to computations.
                    assert(self.xs[self.max_comp_alone_i] != self.x_model[last_comm_ratio_left_i])
                    comm_ratio_degradation_per_core = (last_comm_ratio_left_value - self.impacted_network_ratio) / (self.xs[self.max_comp_alone_i] - self.x_model[last_comm_ratio_left_i])
                    r = last_comm_ratio_left_value - (comm_ratio_degradation_per_core * (self.x_model[i] - self.x_model[last_comm_ratio_left_i]))
                self.comm_with_comp_model.append(r*self.comm_alone_bw)
                # The remaining of the bandwidth is for computations:
                self.comp_with_comm_model.append(self.total_model[i]-self.comm_with_comp_model[i])
import argparse
import glob
import sys

import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator

from comm_comp import *
from comm_comp_model import *
from hwloc import *
from merge_images import *
def get_model(comp_kernel, numa_node, nb_cores_to_consider):
    """Build a CommCompModel from the output files of the given kernel.

    Uses the runs where both computations and communications were placed
    on `numa_node`, for 1 to `nb_cores_to_consider` computing threads.
    """
    inputs = []
    for thread_count in range(1, nb_cores_to_consider + 1):
        inputs.append(f"{comp_kernel}/comp_{numa_node}_comm_{numa_node}_{thread_count}_threads.out")
    return CommCompModel(inputs)
# Command-line interface:
cli_parser = argparse.ArgumentParser()
cli_parser.add_argument("comp_kernel", help="computing kernel - folder where the input files are stored")
cli_parser.add_argument("hwloc_file", help="hwloc XML file")
# Fix: help texts previously read "Wether" instead of "Whether".
cli_parser.add_argument('--model', action=argparse.BooleanOptionalAction, help="Whether to generate or not the model", default=True)
cli_parser.add_argument('--only-first-socket', action=argparse.BooleanOptionalAction, help="Whether to plot or not only cores of first socket", default=True)
cli_args = cli_parser.parse_args()
# The model is built from first-socket runs only, so the two options
# cannot be combined:
if cli_args.model and not cli_args.only_first_socket:
    print("Can't combine options --model and --no-only-first-socket")
    sys.exit(1)
print("** Parsing Hwloc topology...")
topo = HwlocTopology(cli_args.hwloc_file, ["L3Cache"])
print(f"{topo.nb_sockets} sockets")
print(f"{topo.nb_numa_nodes_total} NUMA nodes")
print(f"{topo.nb_cores_total} cores")
# By default consider only the cores of the first socket:
nb_cores_to_consider = topo.nb_cores_per_numa_node * topo.nb_numa_nodes_per_socket
if not cli_args.only_first_socket:
    nb_cores_to_consider *= topo.nb_sockets
print(f"Will consider the performance with up to {nb_cores_to_consider} cores")
# Output files get a "_model" suffix when model curves are included:
picture_file_suffix = ".png"
if cli_args.model:
    picture_file_suffix = "_model.png"
    # Two models are fitted: one where communication and computation data
    # live on the same (local) NUMA node 0, one where computations access
    # a remote NUMA node (first node of the other socket):
    print("** Modeling local accesses...")
    model_local = get_model(cli_args.comp_kernel, 0, nb_cores_to_consider)
    model_local.print_params()
    print("** Modeling remote accesses...")
    model_remote = get_model(cli_args.comp_kernel, topo.nb_numa_nodes_per_socket, nb_cores_to_consider)
    model_remote.print_params()
else:
    print("** Skipping model building")
print("** Modeling all placement combinaisons...")
# Highest values seen across all graphs, to later apply common axis limits:
max_comp = 0
max_comm = 0
# graphs[numa_comp][numa_comm] holds the graph of each placement (or None):
graphs = []
# One graph per (computation NUMA node, communication NUMA node) combination:
for numa_comp in range(topo.nb_numa_nodes_total):
    graphs.append([])
    for numa_comm in range(topo.nb_numa_nodes_total):
        print(f"Data for computations on NUMA node #{numa_comp} and data for communications on NUMA node #{numa_comm}")
        all_files = [
            f"{cli_args.comp_kernel}/comp_{numa_comp}_comm_{numa_comm}_{i}_threads.out"
            for i in range(1, nb_cores_to_consider+1)
        ]
        try:
            parser = FilesParser(all_files)
        except Exception as e:
            print(f"Error in parsing files: {e}")
            if len(graphs) > 0 and len(graphs[0]) > 0:
                # Other configurations parsed fine: skip just this one,
                # keeping a placeholder so indices stay aligned:
                graphs[numa_comp].append(None)
                continue
            else:
                raise Exception("Can't find file for first configuration, can't continue.")
        real_x_values = parser.x_values[:nb_cores_to_consider]
        # Highlight NUMA-node boundaries when a socket holds several nodes:
        x_span_zone = topo.nb_cores_per_numa_node if topo.nb_numa_nodes_per_socket > 1 else None
        graph = CommCompGraph(
            real_x_values,
            parser.x_type,
            CommCompGraphCommType.BANDWIDTH,
            parser.compute_bench_type,
            parser.compute_bench_type.default_metric,
            title=f"Comp {numa_comp} Comm {numa_comm}",
            comp_legend_title=cli_args.comp_kernel,
            comm_legend_title="Network BW",
            x_span_zone=x_span_zone)
        # Measured communication bandwidth without computations, with d1/d9
        # deciles as error range when available:
        if len(parser.comm_bw_alone_results) > 0:
            values = [parser.comm_bw_alone_results[x]['d9'] for x in real_x_values]
            max_comm = max(max_comm, max(values))
            graph.add_comm_curve(
                [parser.comm_bw_alone_results[x]['med'] for x in real_x_values],
                "alone",
                CommCompGraphCurveType.ALONE,
                False,
                [parser.comm_bw_alone_results[x]['d1'] for x in real_x_values],
                values
            )
        else:
            # No comm-alone measurements: use as a flat reference the
            # bandwidth measured with a single computing core:
            graph.add_comm_curve(
                [parser.comm_bw_with_comp_results[1]['med']] * len(real_x_values),
                "alone",
                CommCompGraphCurveType.ALONE,
                False
            )
            max_comm = max(max_comm, parser.comm_bw_with_comp_results[1]['med'])
        # Measured communication bandwidth while computing:
        values = [parser.comm_bw_with_comp_results[x]['d9'] for x in real_x_values]
        max_comm = max(max_comm, max(values))
        graph.add_comm_curve(
            [parser.comm_bw_with_comp_results[x]['med'] for x in real_x_values],
            "while Computations",
            CommCompGraphCurveType.PARALLEL,
            True,
            [parser.comm_bw_with_comp_results[x]['d1'] for x in real_x_values],
            values
        )
        # Decision of which model to apply for communications:
        if cli_args.model:
            # Remote model only when comp and comm share a node of the
            # second socket; local model otherwise:
            if numa_comp == numa_comm and numa_comp >= topo.nb_numa_nodes_per_socket:
                comm_values_model = model_remote.comm_with_comp_model
            else:
                comm_values_model = model_local.comm_with_comp_model
            graph.add_comm_curve(
                comm_values_model,
                "while Computations - model",
                CommCompGraphCurveType.PARALLEL,
                True,
                display_line=False
            )
        op = "memset" # TODO: handle other cases
        metric = parser.compute_bench_type.default_metric
        # Measured computation performances, without then with communications:
        values = [parser.comp_alone_results[op][x][metric]['max'] for x in real_x_values]
        max_comp = max(max_comp, max(values))
        graph.add_comp_curve(
            [parser.comp_alone_results[op][x][metric]['avg'] for x in real_x_values],
            f"{cli_args.comp_kernel} alone",
            CommCompGraphCurveType.ALONE,
            False,
            [parser.comp_alone_results[op][x][metric]['min'] for x in real_x_values],
            values
        )
        graph.add_comp_curve(
            [parser.comp_with_comm_results[op][x][metric]['avg'] for x in real_x_values],
            f"{cli_args.comp_kernel} while Ping-Pongs",
            CommCompGraphCurveType.PARALLEL,
            True,
            [parser.comp_with_comm_results[op][x][metric]['min'] for x in real_x_values],
            [parser.comp_with_comm_results[op][x][metric]['max'] for x in real_x_values]
        )
        if cli_args.model:
            # Decision of which model to apply for computations:
            if numa_comp < topo.nb_numa_nodes_per_socket:
                # Computations do local accesses:
                if numa_comm == numa_comp:
                    comp_values_model = model_local.comp_with_comm_model
                else:
                    # Communications elsewhere: computations behave as if alone:
                    comp_values_model = model_local.comp_alone_model
                total_model = model_local.total_model
            else:
                # Computations do remote accesses:
                if numa_comm == numa_comp:
                    comp_values_model = model_remote.comp_with_comm_model
                else:
                    comp_values_model = model_remote.comp_alone_model
                total_model = model_remote.total_model
            graph.add_comp_curve(
                comp_values_model,
                f"{cli_args.comp_kernel} - model",
                CommCompGraphCurveType.PARALLEL,
                True,
                display_line=False
            )
        graphs[numa_comp].append(graph)
        # Stacked plot: measured comp and comm bandwidths stacked on top of
        # each other, with model predictions overlaid as '+' markers:
        fig = plt.figure(1024, clear=True)
        ax = fig.gca()
        plt.stackplot(
            real_x_values,
            [[parser.comp_with_comm_results[op][x][metric]['avg'] for x in real_x_values], [parser.comm_bw_with_comp_results[x]['med'] for x in real_x_values]],
            labels=["Comp with comm bw", "Comm with comp bw"],
        )
        plt.plot(real_x_values, [parser.comp_alone_results[op][x][metric]['avg'] for x in real_x_values], label="Comp alone bw")
        if cli_args.model:
            plt.plot(real_x_values, comp_values_model, "+", label="Comp with comm bw model")
            plt.plot(real_x_values, [sum(x) for x in zip(comp_values_model, comm_values_model)], "+", label="Comm with comp bw model")
            plt.plot(real_x_values, total_model, "+", label="Total model")
        ax.legend()
        ax.grid(axis='y')
        ax.xaxis.set_major_locator(MaxNLocator(integer=True))
        ax.set_axisbelow(True)
        ax.set(title=f"Comp {numa_comp} Comm {numa_comm}")
        plt.savefig(f"{cli_args.comp_kernel}/comp_{numa_comp}_comm_{numa_comm}_stacked{picture_file_suffix}")
print("** Saving all images...")
# Add 10% of headroom above the highest measured values so curves don't
# touch the top of the plots:
max_comp *= 1.1
max_comp *= 1.1 if False else 1  # (no-op guard removed)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment