Commit 0ebcfb6e authored by Philippe SWARTVAGHER

Add option to measure memset throughput instead of bandwidth

Also add a basic program to compare the behaviours observed when
measuring bandwidth versus throughput, and a Python script to generate
a Paje trace file from its output.
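For context, here is a minimal sketch of the intended workflow; the binary location, the output file names and the per-NUMA core count below are assumptions, not part of this commit:

import subprocess

# Sketch only: run the new benchmark in throughput mode and keep its
# tab-separated output.
with open("memset_throughput.out", "w") as out:
    subprocess.run(["./basic_openmp_memset", "--throughput"], stdout=out, check=True)

# Convert the output to a Paje trace (16 cores per NUMA node is an assumption);
# this writes memset_throughput.trace, which can then be opened with ViTE.
subprocess.run(["python3", "plot/basic_openmp_memset2paje.py",
                "memset_throughput.out", "16"], check=True)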
parent 18177375
@@ -88,7 +88,8 @@ if test "$available_avx" != "none"; then
fi
# for non-temporal memset:
AC_CHECK_DECLS([_mm_stream_si32], [], [], [[#include <x86intrin.h>]])
AC_CHECK_DECLS([_mm_stream_si32], [have_mm_stream_si32=yes], [have_mm_stream_si32=no], [[#include <x86intrin.h>]])
AM_CONDITIONAL(HAVE_MM_STREAM_SI32, test x$have_mm_stream_si32 = xyes)
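# the HAVE_MM_STREAM_SI32 conditional lets Makefile.am build the new basic_openmp_memset program only when non-temporal stores are available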
# Compiler
......
import argparse
import math
import pandas as pd
cli_parser = argparse.ArgumentParser()
cli_parser.add_argument("bench_output", help="File containing output of the basic_openmp_memset benchmark")
cli_parser.add_argument("nb_cores_per_numa", help="Number of cores per NUMA node", type=int)
args = cli_parser.parse_args()
df = pd.read_csv(args.bench_output, sep='\t', comment="#")
df_starts = df[['Thread', 'Core', 'Time start']].copy()
df_starts['Type'] = "Start"
df_starts.rename(columns={"Time start": "Time"}, inplace=True)
df_ends = df[['Thread', 'Core', 'Time end']].copy()
df_ends['Type'] = "End"
df_ends.rename(columns={"Time end": "Time"}, inplace=True)
final_df = pd.concat([df_starts, df_ends], ignore_index=True).sort_values(by=["Time"])
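# recover each thread's core binding from the (Thread, Core) pairs appearing in the benchmark output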
threads = final_df[["Thread", "Core"]].value_counts().reset_index(name='count')
threads.drop(columns=["count"], inplace=True)
threads.set_index("Thread", inplace=True)
binding_thread_to_core = threads.to_dict()['Core']
nb_numa_nodes = math.ceil(len(binding_thread_to_core) / args.nb_cores_per_numa)
print(f"Guessed {nb_numa_nodes} NUMA nodes, with {args.nb_cores_per_numa} cores each")
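# build the Paje trace: type/value definitions first, then one container per NUMA node, core and thread, then the state-change events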
s = """
%EventDef PajeDefineContainerType 1
% Alias string
% ContainerType string
% Name string
%EndEventDef
%EventDef PajeDefineStateType 2
% Alias string
% ContainerType string
% Name string
%EndEventDef
%EventDef PajeDefineEntityValue 3
% Alias string
% EntityType string
% Name string
% Color color
%EndEventDef
%EventDef PajeCreateContainer 4
% Time date
% Alias string
% Type string
% Container string
% Name string
%EndEventDef
%EventDef PajeDestroyContainer 5
% Time date
% Name string
% Type string
%EndEventDef
%EventDef PajeSetState 6
% Time date
% Type string
% Container string
% Value string
%EndEventDef
1 CT_Node 0 "Node"
1 CT_NumaNode CT_Node "Numa Node"
1 CT_Core CT_NumaNode "Core"
1 CT_Thread CT_Core "Thread"
2 ST_ThreadState CT_Thread "Thread state"
3 S_1 ST_ThreadState "Idle" "0.500000 0.500000 0.500000"
3 S_2 ST_ThreadState "Memset" "1.000000 0.000000 0.000000"
4 0.000000 C_Node CT_Node 0 "Machine"
"""
for n in range(nb_numa_nodes):
    s += '4 0.000000 C_Numa{} CT_NumaNode C_Node "Numa Node {}"\n'.format(n, n)
for th in binding_thread_to_core:
    core = binding_thread_to_core[th]
    numa_node = core // args.nb_cores_per_numa
    s += '4 0.000000 C_Core{} CT_Core C_Numa{} "Core{}"\n'.format(core, numa_node, core)
    s += '4 0.000000 C_Thread{} CT_Thread C_Core{} "Thread{}"\n'.format(th, core, th)
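# one Paje state change per logged event: S_2 (Memset) when an iteration starts, S_1 (Idle) when it ends;
# times are divided by 1000, assuming the benchmark prints microseconds while the trace uses milliseconds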
for i, e in final_df.iterrows():
    if e['Type'] == "Start":
        s += '6 {} ST_ThreadState C_Thread{} S_2\n'.format(e["Time"] / 1000, e["Thread"])
    elif e['Type'] == "End":
        s += '6 {} ST_ThreadState C_Thread{} S_1\n'.format(e["Time"] / 1000, e["Thread"])
last_point = args.bench_output.rfind(".")
output_filename = args.bench_output[:last_point] + ".trace"
with open(output_filename, "w") as f:
    f.write(s)
@@ -299,13 +299,16 @@ class FilesParser:
    @classmethod
    def _get_bw_time_results(cls, lines, op, nb_threads=None):
        def apply_scale(value):
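            # in throughput mode the memset kernel prints "-" instead of
            # bandwidth/time values; treat those as missing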
            if value == "-":
                return None
            value = float(value)
            if nb_threads is not None:
                return value / nb_threads
            else:
                return value

        def line_to_dict(l):
            values = list(map(float, l.strip().split()[-6:]))
            values = l.strip().split()[-6:]
            return {
                CompMetric.BANDWIDTH: {
                    'min': apply_scale(values[0]),
......
@@ -21,6 +21,21 @@ uncore_get_LDADD = $(LIKWID_LIBS)
endif
if HAVE_MM_STREAM_SI32
bin_PROGRAMS += basic_openmp_memset
basic_openmp_memset_SOURCES = \
basic_openmp_memset.c \
helper.c \
cli.c \
malloc.c \
timing.c \
openmp.c
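# CLI_DISABLE_KERNELS strips the computing-kernel handling from cli.c: this standalone benchmark only needs the memory-binding options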
basic_openmp_memset_CFLAGS = $(OPENMP_CFLAGS) $(HWLOC_CFLAGS) -DCLI_DISABLE_KERNELS=1
basic_openmp_memset_LDADD = $(HWLOC_LIBS)
endif
if HAVE_MPI
bin_PROGRAMS += bench_openmp bench_openmp_freq overlap_openmp
......
/* This program is a minimal version of
* bench_openmp --compute_bench=memset --nt
 * to see the difference between measuring the bandwidth and measuring the throughput.
*
* The output has to be converted with plot/basic_openmp_memset2paje.py and can
* then be visualized with ViTE.
**/
#include <stdlib.h>
#include <stdio.h>
#include <string.h> /* strcmp(), memset() */
#include <omp.h>
#include <x86intrin.h>
#include "malloc.h"
#include "cli.h"
#define MEMSET_TYPE int
#define NB_ITER 3
#define THROUGHPUT_DURATION 1500000 // 1.5 sec
#define NB_MAX_EVENTS 100
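/* Maximum number of logged (start, end) iteration timestamps per thread. */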
#define COMM_BANDWIDTH_BUFFER_SIZE 16777216 // 64 MB is enough to see impact on network bandwidth
struct event_s {
puk_tick_t time_start;
puk_tick_t time_end;
};
int main(int argc, char* argv[])
{
int measure_throughput = 0;
int nb_threads = get_nb_openmp_threads();
const int array_size = COMM_BANDWIDTH_BUFFER_SIZE;
struct machine_s machine;
struct params_s params;
MEMSET_TYPE scalar = 3;
MEMSET_TYPE **a;
puk_tick_t time_origin;
init_params();
disable_all_params();
enable_param(BIND_MEMORY_COMP);
parse_args(argc, argv, &params);
hwloc_topology_init(&machine.topology);
hwloc_topology_load(machine.topology);
fill_machine(&machine);
init_malloc(&machine, params.memory_comp_numa_nodes, params.memory_comp_numa_nodes_nb, -1);
for (int i = 1; i < argc; i++)
{
if (!strcmp(argv[i], "--throughput"))
{
measure_throughput = 1;
break;
}
}
printf("# Nb workers: %d\n", nb_threads);
printf("# Will measure %s\n", measure_throughput ? "throughput" : "bandwidth");
int* nb_events_per_thread = malloc(nb_threads * sizeof(int));
struct event_s** events_per_thread = malloc(nb_threads * sizeof(struct event_s*));
a = malloc(nb_threads*sizeof(MEMSET_TYPE*));
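/* Allocate each thread's event log and buffer; buffers are placed on NUMA nodes according to the parsed memory-binding parameters. */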
#pragma omp parallel for
for (int i = 0; i < nb_threads; i++)
{
nb_events_per_thread[i] = 0;
events_per_thread[i] = malloc(NB_MAX_EVENTS * sizeof(struct event_s));
a[i] = data_malloc(
array_size*sizeof(MEMSET_TYPE),
params.memory_comp_numa_nodes_nb == 0 ? -1 : get_numa_node_comp_for_id(i % params.memory_comp_numa_nodes_nb));
for (int j = 0; j < array_size; j++)
{
a[i][j] = 1;
}
}
unsigned* nb_iter_per_thread = malloc(nb_threads * sizeof(unsigned));
PUK_GET_TICK(time_origin);
if (measure_throughput)
{
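/* Throughput mode: each thread keeps re-filling its buffer with non-temporal stores until THROUGHPUT_DURATION has elapsed since time_origin, logging every iteration. */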
#pragma omp parallel for
for (int i = 0; i < nb_threads; i++)
{
nb_iter_per_thread[i] = 0;
puk_tick_t start_iter_time, end_iter_time;
do
{
PUK_GET_TICK(start_iter_time);
int value = scalar * (nb_iter_per_thread[i]+1);
for (int k = 0; k < array_size; k++)
{
_mm_stream_si32(a[i]+k, value);
}
PUK_GET_TICK(end_iter_time);
nb_iter_per_thread[i]++;
events_per_thread[i][nb_events_per_thread[i]].time_start = start_iter_time;
events_per_thread[i][nb_events_per_thread[i]].time_end = end_iter_time;
nb_events_per_thread[i]++;
} while (nb_events_per_thread[i] < NB_MAX_EVENTS && PUK_TIMING_DELAY(time_origin, end_iter_time) < THROUGHPUT_DURATION); /* also stop once the event log is full */
}
}
else
{
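/* Bandwidth mode: each thread performs exactly NB_ITER iterations and logs each of them. */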
#pragma omp parallel for
for (int i = 0; i < nb_threads; i++)
{
for (int j = 1; j <= NB_ITER; j++)
{
puk_tick_t start_iter_time, end_iter_time;
int value = scalar * j;
PUK_GET_TICK(start_iter_time);
for (int k = 0; k < array_size; k++)
{
_mm_stream_si32(a[i]+k, value);
}
PUK_GET_TICK(end_iter_time);
events_per_thread[i][nb_events_per_thread[i]].time_start = start_iter_time;
events_per_thread[i][nb_events_per_thread[i]].time_end = end_iter_time;
nb_events_per_thread[i]++;
}
}
}
/* Print trace: */
int* thread_binding = malloc(nb_threads * sizeof(int));
memset(thread_binding, 0, nb_threads * sizeof(int));
get_worker_binding_ids(machine.topology, nb_threads, thread_binding);
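/* One line per logged iteration: thread id, core id, and start/end times relative to time_origin (assumed to be in microseconds, as returned by PUK_TIMING_DELAY()). */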
printf("Thread\tCore\tTime start\tTime end\n");
for (int i = 0; i < nb_threads; i++)
{
for (int j = 0; j < nb_events_per_thread[i]; j++)
{
printf("%2d\t%2d\t% 10.1f\t% 10.1f\n",
i, thread_binding[i],
PUK_TIMING_DELAY(time_origin, events_per_thread[i][j].time_start),
PUK_TIMING_DELAY(time_origin, events_per_thread[i][j].time_end));
}
}
free(thread_binding);
/* Release everything: */
hwloc_topology_destroy(machine.topology);
for (int i = 0; i < nb_threads; i++)
{
free(events_per_thread[i]);
data_free(
a[i],
array_size*sizeof(MEMSET_TYPE),
params.memory_comp_numa_nodes_nb == 0 ? -1 : get_numa_node_comp_for_id(i % params.memory_comp_numa_nodes_nb));
}
free(a);
free(nb_events_per_thread);
free(events_per_thread);
return EXIT_SUCCESS;
}
@@ -3,11 +3,15 @@
#include <stdlib.h>
#include <assert.h>
#include "config.h"
#include "cli.h"
#ifdef CLI_DISABLE_KERNELS
#define NB_KERNELS 0
#else
#include "stream.h"
#include "prime.h"
#include "cursor.h"
#include "config.h"
#if WITH_STARPU == 1
#if HAVE_MKL
@@ -27,6 +31,7 @@
#define NB_KERNELS 7
#endif
#endif
#endif
#define CHECK_PARAM(param) if (!enabled_params[param]) { printf("Parameter '%s' not available\n", argv[i]); continue; }
@@ -45,6 +50,7 @@ void init_params()
enabled_params[BIND_MEMORY_COMP] = 0;
#endif
#ifndef CLI_DISABLE_KERNELS
computing_kernels[0] = prime_get_functions();
computing_kernels[1] = stream_get_functions();
computing_kernels[2] = cursor_get_functions();
@@ -61,6 +67,7 @@ void init_params()
computing_kernels[7] = scalar_avx_get_functions();
#endif
#endif
#endif
}
void disable_param(enum param_e param)
@@ -68,6 +75,19 @@ void disable_param(enum param_e param)
enabled_params[param] = 0;
}
void enable_param(enum param_e param)
{
enabled_params[param] = 1;
}
void disable_all_params()
{
for (int i = 0; i < _LAST_PARAM; i++)
{
enabled_params[i] = 0;
}
}
void parse_args(int argc, char* argv[], struct params_s* params)
@@ -81,7 +101,9 @@ void parse_args(int argc, char* argv[], struct params_s* params)
params->do_warmup = 1;
params->do_alone = DO_COMP_ALONE | DO_COMM_ALONE;
params->comm_bench_type = LATENCY;
#ifndef CLI_DISABLE_KERNELS
params->computing = stream_get_functions();
#endif
params->ping_thread_location = LAST;
params->memory_comp_numa_nodes_nb = 0;
params->memory_comm_numa_node = -1;
......
@@ -67,6 +67,8 @@ void parse_args(int argc, char* argv[], struct params_s* params);
void print_params(struct params_s params);
void print_help(struct params_s params);
void disable_param(enum param_e param);
void enable_param(enum param_e param);
void disable_all_params();
void print_machine(struct machine_s* machine, struct params_s* params);
void ping_thread_location_to_pu(struct params_s* params, struct machine_s* machine);
......
@@ -15,11 +15,13 @@
#define MEMSET_TYPE int
#define NB_ITER 3
#define THROUGHPUT_DURATION 1500000 // 1.5 sec
static MEMSET_TYPE **a;
static int array_size;
static int nb_threads;
static int use_non_temporal = 0;
static int measure_throughput = 0;
extern struct machine_s machine;
extern struct params_s params;
@@ -47,12 +49,12 @@ static int memset_get_nb_runs(enum comm_bench_type comm_bench_type)
static inline double time_to_bw(double t) // microsec
{
return (sizeof(MEMSET_TYPE) * array_size * nb_threads) / (t / NB_ITER); // MB/s
return (sizeof(MEMSET_TYPE) * array_size * nb_threads) / t; // MB/s
}
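/* t is a per-iteration duration (memset_run_kernel() already divides by NB_ITER, or averages iteration times in throughput mode), hence no division by NB_ITER in these helpers. */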
static inline double time_to_bw_per_thread(double t) // microsec
{
return (sizeof(MEMSET_TYPE) * array_size) / (t / NB_ITER); // MB/s
return (sizeof(MEMSET_TYPE) * array_size) / t; // MB/s
}
static void memset_print_results()
@@ -92,21 +94,42 @@ static void memset_print_results()
printf("# memset results: Bandwidth MB/s (max, avg, min) Time ms (min, avg, max)\n");
if (memset_bench_done[WARMUP])
{
printf("# warmup "COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\n",
time_to_bw(memset_perfs_warmup[0]), time_to_bw(memset_perfs_warmup[1]), time_to_bw(memset_perfs_warmup[2]),
memset_perfs_warmup[0] / 1000.0f, memset_perfs_warmup[1] / 1000.0f, memset_perfs_warmup[2] / 1000.0f);
if (measure_throughput)
{
printf("# warmup -\t-\t-\t-\t-\t-\n");
}
else
{
printf("# warmup "COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\n",
time_to_bw(memset_perfs_warmup[0]), time_to_bw(memset_perfs_warmup[1]), time_to_bw(memset_perfs_warmup[2]),
memset_perfs_warmup[0] / 1000.0f, memset_perfs_warmup[1] / 1000.0f, memset_perfs_warmup[2] / 1000.0f);
}
}
if (memset_bench_done[WITH_COMM])
{
printf("# with communications "COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\n",
time_to_bw(memset_perfs_comm[0]), time_to_bw(memset_perfs_comm[1]), time_to_bw(memset_perfs_comm[2]),
memset_perfs_comm[0] / 1000.0f, memset_perfs_comm[1] / 1000.0f, memset_perfs_comm[2] / 1000.0f);
if (measure_throughput)
{
printf("# with communications -\t-\t-\t-\t-\t-\n");
}
else
{
printf("# with communications "COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\n",
time_to_bw(memset_perfs_comm[0]), time_to_bw(memset_perfs_comm[1]), time_to_bw(memset_perfs_comm[2]),
memset_perfs_comm[0] / 1000.0f, memset_perfs_comm[1] / 1000.0f, memset_perfs_comm[2] / 1000.0f);
}
}
if (memset_bench_done[WITHOUT_COMM])
{
printf("# without communications "COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\n",
time_to_bw(memset_perfs_no_comm[0]), time_to_bw(memset_perfs_no_comm[1]), time_to_bw(memset_perfs_no_comm[2]),
memset_perfs_no_comm[0] / 1000.0f, memset_perfs_no_comm[1] / 1000.0f, memset_perfs_no_comm[2] / 1000.0f);
if (measure_throughput)
{
printf("# without communications -\t-\t-\t-\t-\t-\n");
}
else
{
printf("# without communications "COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_BW_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\t"COMP_TIME_FORMAT"\n",
time_to_bw(memset_perfs_no_comm[0]), time_to_bw(memset_perfs_no_comm[1]), time_to_bw(memset_perfs_no_comm[2]),
memset_perfs_no_comm[0] / 1000.0f, memset_perfs_no_comm[1] / 1000.0f, memset_perfs_no_comm[2] / 1000.0f);
}
}
}
@@ -162,35 +185,81 @@ static double memset_run_kernel(enum bench_type bench_type)
puk_tick_t* thread_start_times = malloc(nb_threads * sizeof(puk_tick_t));
puk_tick_t* thread_end_times = malloc(nb_threads * sizeof(puk_tick_t));
double* thread_durations = malloc(nb_threads * sizeof(double));
unsigned* nb_iter_per_thread = malloc(nb_threads * sizeof(unsigned));
PUK_GET_TICK(start_time);
#pragma omp parallel for
for (int i = 0; i < nb_threads; i++)
if (measure_throughput)
{
#pragma omp parallel for
for (int i = 0; i < nb_threads; i++)
{
nb_iter_per_thread[i] = 0;
thread_durations[i] = 0;
puk_tick_t start_iter_time, end_iter_time;
double last_iter_duration = 0;
do
{
#if defined(HAVE_DECL__MM_STREAM_SI32)
if (use_non_temporal)
if (use_non_temporal)
{
PUK_GET_TICK(start_iter_time);
int value = scalar * (nb_iter_per_thread[i]+1);
for (int k = 0; k < array_size; k++)
{
_mm_stream_si32(a[i]+k, value);
}
PUK_GET_TICK(end_iter_time);
}
else
#endif
{
PUK_GET_TICK(start_iter_time);
memset(a[i], scalar * (nb_iter_per_thread[i]+1), array_size*sizeof(MEMSET_TYPE));
PUK_GET_TICK(end_iter_time);
}
if (nb_iter_per_thread[i] > 0)
{
last_iter_duration = PUK_TIMING_DELAY(start_iter_time, end_iter_time);
thread_durations[i] += last_iter_duration;
}
nb_iter_per_thread[i]++;
} while (PUK_TIMING_DELAY(start_time, end_iter_time) < THROUGHPUT_DURATION);
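/* Discard the first iteration (used as warm-up and never accumulated above) and the last one (which may overrun the time budget) before computing the per-iteration average. */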
nb_iter_per_thread[i] -= 2;
thread_durations[i] -= last_iter_duration;
thread_durations[i] /= nb_iter_per_thread[i];
}
}
else
{
#pragma omp parallel for
for (int i = 0; i < nb_threads; i++)
{
PUK_GET_TICK(thread_start_times[i]);
for (int j = 1; j <= NB_ITER; j++)
#if defined(HAVE_DECL__MM_STREAM_SI32)
if (use_non_temporal)
{
int value = scalar * j;
for (int k = 0; k < array_size; k++)
PUK_GET_TICK(thread_start_times[i]);
for (int j = 1; j <= NB_ITER; j++)
{
_mm_stream_si32(a[i]+k, value);
int value = scalar * j;
for (int k = 0; k < array_size; k++)
{
_mm_stream_si32(a[i]+k, value);
}
}
PUK_GET_TICK(thread_end_times[i]);
}
PUK_GET_TICK(thread_end_times[i]);
}
else
else
#endif
{
PUK_GET_TICK(thread_start_times[i]);
for (int j = 1; j <= NB_ITER; j++)
{
memset(a[i], scalar*j, array_size*sizeof(MEMSET_TYPE));
PUK_GET_TICK(thread_start_times[i]);
for (int j = 1; j <= NB_ITER; j++)
{
memset(a[i], scalar*j, array_size*sizeof(MEMSET_TYPE));
}
PUK_GET_TICK(thread_end_times[i]);
}
PUK_GET_TICK(thread_end_times[i]);
}
}
PUK_GET_TICK(end_time);
@@ -199,17 +268,37 @@ static double memset_run_kernel(enum bench_type bench_type)
{
for (int i = 0; i < nb_threads; i++)
{
double thread_duration = PUK_TIMING_DELAY(thread_start_times[i], thread_end_times[i]);
double thread_duration;
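/* Both modes yield a per-iteration duration: the averaged iteration time in throughput mode, or the total time of the NB_ITER iterations divided by NB_ITER otherwise. */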
if (measure_throughput)
{
thread_duration = thread_durations[i];
}
else
{
thread_duration = PUK_TIMING_DELAY(thread_start_times[i], thread_end_times[i]) / NB_ITER;
}
per_thread_perfs[i*3] = MIN(per_thread_perfs[i*3], thread_duration);
per_thread_perfs[i*3+1] += thread_duration;
per_thread_perfs[i*3+2] = MAX(per_thread_perfs[i*3+2], thread_duration);
}
}
if (measure_throughput)
{
for (int i = 0; i < nb_threads; i++)
{
if (nb_iter_per_thread[i] <= NB_ITER)
{
fprintf(stderr, "# Warning: thread %d did less than %d iterations (%u).\n", i, NB_ITER, nb_iter_per_thread[i]);
}
}
}
free(thread_start_times);
free(thread_end_times);
free(nb_iter_per_thread);
return PUK_TIMING_DELAY(start_time, end_time);
return PUK_TIMING_DELAY(start_time, end_time) / NB_ITER;
}
static int memset_run(int nb_runs, enum bench_type bench_type)
@@ -313,15 +402,17 @@ static void memset_release()
static void memset_man()
{
#if defined(HAVE_DECL__MM_STREAM_SI32)
printf("Memset-related options:\n");