Commit fcb2fc68 authored by Martin Khannouz's avatar Martin Khannouz Committed by Berenger Bramas

Change post-processing.

Add Gantt charts and communication volumes: R scripting and Python parsing.
Add start_profiling and stop_profiling in python script.
Add calls to gen_comm and gen_gantt in generate_graph.R.
parent 2cc828aa
......@@ -15,6 +15,7 @@ get_data_subset <- function(f, n, h, p)
"numeric", # idle_time
"numeric", # scheduling_time
"numeric", # communuication_time
"numeric", # communuication_vol
"numeric" # rmen
))
......@@ -29,50 +30,6 @@ get_data_subset <- function(f, n, h, p)
return (d)
}
# Load the timing database stored in CSV file `f` and filter its rows.
#
# f : path of the CSV file (lines starting with '#' are ignored).
# s : scheme value(s) to keep.
# n : number of particles to keep; a falsy value (e.g. 0) disables the filter.
# h : tree height to keep; a falsy value (e.g. 0) disables the filter.
# m : model to keep.
#
# Returns the filtered data frame with an extra "compiler_runtime" column
# ("<compiler>-<runtime>"), restricted to a fixed list of known names.
get_db_with_time <- function(f, s, n, h, m) {
    column_types <- c(
        "integer", # nthreads
        "factor",  # name
        "factor",  # scheme
        "factor",  # compiler
        "factor",  # runtime
        "factor",  # model
        "integer", # npart
        "integer", # height
        "integer", # bsize
        "integer", # niter
        "numeric", # time
        "integer"  # rmem
    )
    d <- read.csv(file = f, comment.char = "#", sep = ",", quote = "\"",
                  header = TRUE, dec = ".", colClasses = column_types)

    # These three columns are handled as ordered factors.
    for (column in c("npart", "height", "bsize")) {
        d[[column]] <- ordered(d[[column]])
    }

    # Row filters; n and h are only applied when truthy.
    d <- subset(d, scheme %in% s)
    if (n) {
        d <- subset(d, npart == n)
    }
    if (h) {
        d <- subset(d, height == h)
    }
    d <- subset(d, model == m)

    d$compiler_runtime <- paste(d$compiler, d$runtime, sep = '-')

    # Only these names are kept.
    kept_names <- c('GCC-OMP4',
                    'Klang', 'Klang-C', 'Klang-CP',
                    'StarPU', 'StarPU-C', 'StarPU-CP')
    d <- subset(d, name %in% kept_names)
    d
}
# OMP Compiler/runtime breaks, colors...
get_breaks_runtime <- function()
......
library(plyr)
library(ggplot2)
# Plot the communication volume against the number of nodes for one model
# and save it as "<output directory>/<model_wanted>-comm.pdf".
#
# db           : data frame; the columns model, nnode, communication_vol,
#                algo, npart and height are used.
# d_breaks     : not used by this function (kept for the callers).
# model_wanted : model whose rows are plotted; also prefixes the file name.
gen_comm_plot <- function(db, d_breaks, model_wanted) {
    model_rows <- subset(db, model == model_wanted)

    # One panel per (npart, height) pair.
    panel_labeller <- labeller(npart = as_labeller(npart_labeller),
                               height = as_labeller(height_labeller),
                               .default=label_both,
                               .multi_line=FALSE)

    plot <- ggplot(data=model_rows,
                   aes_string(x="nnode", y="communication_vol", color="algo")) +
        geom_line() +
        facet_wrap(npart ~ height, scales="free", labeller = panel_labeller) +
        # Set our own colors and legend labels.
        scale_color_manual(name="Algorithm",
                           breaks=get_breaks_runtime(),
                           labels=get_labels_runtime(),
                           values=get_colors_runtime()) +
        # Axis labels and X ticks.
        xlab("Number of nodes") +
        ylab("Volume of Communication (MB)") +
        scale_x_continuous(breaks=c(1, 2, 3, 4, 5, 6, 9, 12, 16, 20, 24))

    # Save generated plot.
    pdf_path <- paste0(get_output_directory(), "/", model_wanted, "-comm.pdf")
    ggsave(pdf_path, plot, width=29.7, height=21, units=c("cm"), device=cairo_pdf)
}
# Generate one communication-volume plot per model found in `dbfile`.
#
# dbfile : path of the database file, read through get_data_subset().
#
# Rows belonging to the one-node reference algorithm are excluded.
gen_comm <- function(dbfile)
{
    data <- get_data_subset(dbfile, 0L, 0L, "False")
    data <- subset(data, algo != get_one_node_reference_algorithm())
    all_model <- unique(data$model)
    # seq_along() iterates zero times when no model is left, whereas
    # 1:length(x) would visit indices 1 and 0 and plot an NA model.
    for (i in seq_along(all_model))
    {
        gen_comm_plot(data, unique(data$algo), all_model[i])
    }
}
library(plyr)
library(ggplot2)
# Tell, element-wise, whether the strings in `str` begin with `pattern`.
#
# str     : character vector to test.
# pattern : prefix to look for (single string).
#
# An empty pattern never matches: a single FALSE is returned in that case.
starts_with <- function(str, pattern) {
    prefix_len <- nchar(pattern)
    if (prefix_len == 0) {
        return (FALSE)
    }
    # Compare the leading characters of each element with the prefix.
    substr(str, 1, prefix_len) == pattern
}
# Build the ordered list of CPU resource names.
#
# df       : not used by this function (kept for the callers).
# nnode    : number of nodes.
# nthreads : number of threads (CPUs) per node.
#
# Returns a character vector: "CPU<t>" when there is a single node,
# "<n>_CPU<t>" otherwise, node by node and thread by thread, both numbered
# from 0. An empty vector is returned when either count is below 1
# (0:(n-1) would otherwise count backwards and yield names like "CPU-1").
create_ressource_order <- function(df, nnode, nthreads)
{
    if (nnode < 1 || nthreads < 1)
    {
        return (character(0))
    }
    thread_ids <- seq_len(nthreads) - 1L
    if (nnode == 1)
    {
        return (paste0("CPU", thread_ids))
    }
    node_ids <- seq_len(nnode) - 1L
    # Node index varies slowest, thread index fastest.
    paste0(rep(node_ids, each = length(thread_ids)),
           "_CPU",
           rep(thread_ids, times = length(node_ids)))
}
# Set the Origin column to "Node <n>" from resource ids of the form
# "<n>_<rest>".
#
# df    : data frame with a ResourceId column (and optionally Origin).
# nnode : not used by this function (kept for the callers).
#
# Returns the updated data frame. Rows whose ResourceId contains no "_"
# keep their current Origin ("Node 0" when the column does not exist).
#
# The previous version used grep(), which returns the index of the matching
# element (always 1 here) instead of the position of "_" in the string, and
# it never returned the modified data frame.
set_node_name <- function(df, nnode)
{
    resource_id <- as.character(df$ResourceId)
    has_prefix <- grepl("_", resource_id, fixed = TRUE)
    if (is.null(df$Origin))
    {
        origin <- rep("Node 0", nrow(df))
    }
    else
    {
        origin <- as.character(df$Origin)
    }
    # Everything before the first "_" is the node number.
    node_number <- sub("_.*$", "", resource_id[has_prefix])
    origin[has_prefix] <- paste0("Node ", node_number)
    df$Origin <- origin
    df
}
# Read a CSV trace and prepare it for the Gantt plots.
#
# file            : path of the comma-separated trace (with a header line).
# nnode           : number of nodes used by the run.
# nthreads        : number of threads (CPUs) per node.
# start_profiling : only events starting strictly after this time are kept.
# stop_profiling  : only events starting strictly before this time are kept.
#
# Returns a data frame restricted to the expected CPU resources and to the
# profiling window, with:
#   - Origin      : "Node <n>" label of the resource,
#   - Value       : event name, L2L/M2L/M2M/P2P variants collapsed,
#   - SimpleValue : "Far Field" / "Near Field" grouping of Value,
#   - Start / End : times shifted so that the first kept event starts at 0.
read_trace <- function(file, nnode, nthreads, start_profiling, stop_profiling)
{
    #Read file
    df <- read.table(file, header=TRUE, sep=",", strip.white=TRUE, fill=TRUE)
    #Remove the columns that are not needed
    df <- df[!(names(df) %in% c("Nature","Type", "Depth", "Footprint", "JobId", "Tag", "Params"))]
    #Init origin
    df$Origin <- as.factor("Node 0")
    #Remove the states that are not interesting for these plots
    def_states<-c("Initializing","Deinitializing", "Idle", "Overhead","Nothing","Freeing","Allocating","WritingBack","FetchingInput","PushingOutput","Callback","Progressing","Unpartitioning","AllocatingReuse","Reclaiming","DriverCopy","DriverCopyAsync","Scheduling","Executing", "dplgsy", "execute_on_all_wrapper", "Su")
    df <- df[!(df$Value %in% def_states),]
    #Get only the CPU events: resources outside the expected list become NA
    df$ResourceId <- as.factor(df$ResourceId)
    resource_order <- create_ressource_order(df, nnode, nthreads)
    df$ResourceId <- factor(df$ResourceId, levels = rev(resource_order))
    #Remove invalid resources if there are some
    df <- subset(df, !is.na(ResourceId))
    #Get only the events between start_profiling and stop_profiling
    df <- subset(df, df$Start > start_profiling & df$Start < stop_profiling)
    #Set value as a string
    df$Value <- as.character(df$Value)
    #When there is more than one node, put the right node in Origin.
    #Not done on one node: CPU names are then "CPU1" rather than "0_CPU1",
    #so every resource stays on "Node 0".
    if(nnode > 1)
    {
        #Reset Origin with ResourceId so that the prefix replacement works
        df$Origin <- as.character(df$ResourceId)
        for(i in 0:(nnode-1))
        {
            name_origin <- paste("Node ", i, sep="")
            cpu_origin <- paste(i, "_C", sep="")
            df$Origin[starts_with(df$Origin, cpu_origin)] <- name_origin
        }
    }
    #Collapse the kernel variants; this may change given what you want to see
    for(kernel in c("L2L", "M2L", "M2M", "P2P"))
    {
        df$Value[starts_with(df$Value, kernel)] <- kernel
    }
    df$Value[df$Value == ""] <- NA
    #Coarse grouping of the kernels
    df$SimpleValue <- df$Value
    df$SimpleValue[df$SimpleValue %in% c("P2M", "M2M", "M2L", "L2L", "L2P")] <- "Far Field"
    df$SimpleValue[df$SimpleValue == "P2P"] <- "Near Field"
    #Shift the start, since some events may have been removed.
    #min() of an empty vector warns and returns Inf, so skip the shift
    #when no event is left.
    if(nrow(df) > 0)
    {
        df$Start <- df$Start - min(df$Start)
    }
    df$End <- df$Start+df$Duration
    #Return
    df
}
# Draw the simplified Gantt chart (Sleeping / Far Field / Near Field) and
# save it as
# "<output directory>/<model>-<algo>-<nnode>N-<npart in millions>M-simple-gantt.pdf".
#
# data  : data frame; the columns Start, End, ResourceId, SimpleValue and
#         Origin are used.
# model : model name (title and file name).
# algo  : algorithm name (title and file name).
# nnode : number of nodes (title and file name).
# npart : number of particles, displayed in millions.
gen_simple_gantt_plot <- function(data, model, algo, nnode, npart) {
    pdf_path <- paste0(get_output_directory(), "/", model, "-", algo, "-", nnode, "N-", npart/1000000, "M-simple-gantt.pdf")
    plot_title <- paste0(model, " ", algo, " ", nnode, "N ", npart/1000000, "M")

    # Legend entries (breaks and labels are the same) and their colors.
    categories <- c('Sleeping', 'Far Field', 'Near Field')
    palette <- c(
        'Sleeping' = "#f9766e",
        'Far Field' = "#61f1ff",
        'Near Field' = "#619dff"
    )

    # One horizontal segment per event, one panel per node.
    chart <- ggplot(data, aes(x=Start, xend=End, y=factor(ResourceId), yend=factor(ResourceId), color=SimpleValue)) +
        theme_bw() +
        geom_segment(size=8) +
        ggtitle(plot_title) +
        ylab("Resource") +
        xlab("Time [ms]") +
        scale_color_manual(name="Action", breaks=categories, labels=categories, values=palette) +
        facet_wrap(~Origin, ncol=1, scales="free_y") +
        scale_y_discrete(breaks=NULL)

    ggsave(pdf_path, chart, width=29.7, height=21, units=c("cm"), device=cairo_pdf)
}
# Draw the detailed Gantt chart (one color per event value) and save it as
# "<output directory>/<model>-<algo>-<nnode>N-<npart in millions>M-gantt.pdf".
#
# data  : data frame; the columns Start, End, ResourceId, Value and Origin
#         are used.
# model : model name (title and file name).
# algo  : algorithm name (title and file name).
# nnode : number of nodes (title and file name).
# npart : number of particles, displayed in millions.
gen_gantt_plot <- function(data, model, algo, nnode, npart) {
    pdf_path <- paste0(get_output_directory(), "/", model, "-", algo, "-", nnode, "N-", npart/1000000, "M-gantt.pdf")
    plot_title <- paste0(model, " ", algo, " ", nnode, "N ", npart/1000000, "M")

    # One horizontal segment per event, one panel per node.
    chart <- ggplot(data, aes(x=Start, xend=End, y=factor(ResourceId), yend=factor(ResourceId), color=Value)) +
        theme_bw() +
        geom_segment(size=8) +
        ggtitle(plot_title) +
        ylab("Resource") +
        xlab("Time [ms]") +
        scale_color_brewer(palette="Set1") +
        facet_wrap(~Origin, ncol=1, scales="free_y") +
        scale_y_discrete(breaks=NULL)

    ggsave(pdf_path, chart, width=29.7, height=21, units=c("cm"), device=cairo_pdf)
}
# Load the trace described by the first row of `df`.
#
# df : data frame with the columns filename, nnode, nthreads,
#      start_profiling and stop_profiling; only its first row is read.
#
# Returns the data frame produced by read_trace().
gen_gantt_grab_data <- function(df) {
    # paste0() turns the (possibly factor) file name into a plain string.
    trace_path <- paste0(df$filename[1])
    read_trace(trace_path,
               df$nnode[1],
               df$nthreads[1],
               df$start_profiling[1],
               df$stop_profiling[1])
}
# Generate the detailed and simplified Gantt charts for every
# (model, algorithm, number of nodes, number of particles) combination
# present in the Gantt database.
#
# dbfile : path of the comma-separated Gantt database (with a header line).
#
# When several rows match a combination, only the first one is used
# (see gen_gantt_grab_data()).
gen_gantt <- function(dbfile)
{
    df <- read.table(dbfile, header=TRUE, sep=",", strip.white=TRUE, fill=TRUE,
                     dec=".", colClasses=
                         c("factor",  # model
                           "factor",  # algorithm
                           "integer", # nnode
                           "integer", # nthread
                           "integer", # npart
                           "integer", # height
                           "integer", # bsize
                           "factor",  # filename
                           "numeric", # start_profiling
                           "numeric"  # stop_profiling
                           ))
    all_algorithm <- unique(df$algo)
    all_model <- unique(df$model)
    all_nnode <- unique(df$nnode)
    all_npart <- unique(df$npart)
    # seq_along() keeps every loop empty when the database has no row;
    # 1:length(x) would iterate over indices 1 and 0 instead.
    for (mod in seq_along(all_model))
    {
        for (alg in seq_along(all_algorithm))
        {
            for (nno in seq_along(all_nnode))
            {
                for (npa in seq_along(all_npart))
                {
                    #Get the corresponding line
                    tmp_df <- subset(df, model == all_model[mod] & algo == all_algorithm[alg] & nnode == all_nnode[nno] & npart == all_npart[npa])
                    if(nrow(tmp_df) > 0)
                    {
                        look <- paste("Grab data ", all_model[mod],all_algorithm[alg], all_nnode[nno], "N", all_npart[npa], sep="-")
                        #Print something to show where the script is
                        print(look)
                        data <- gen_gantt_grab_data(tmp_df)
                        gen_gantt_plot(data, all_model[mod], all_algorithm[alg], all_nnode[nno], all_npart[npa])
                        gen_simple_gantt_plot(data, all_model[mod], all_algorithm[alg], all_nnode[nno], all_npart[npa])
                    }
                }
            }
        }
    }
}
......@@ -8,6 +8,7 @@ gen_times_taskdep_plot <- function(data, algo_wanted, model_wanted)
# Sort data to have task, runtime and idle.
subdata <- subset(data, model == model_wanted & algo == algo_wanted)
subdata$rmem <- NULL
subdata$communication_vol <- NULL
subdata$global_time <- NULL
subdata <- melt(subdata, id=c("model", "algo", "nnode", "nthreads", "npart","height","bsize"))
subdata <- rename(subdata, c("variable"="event", "value"="duration"))
......
......@@ -4,12 +4,16 @@ source("gen_efficiencies_taskdep.R")
source("gen_speedup_plots.R")
source("gen_parallel_efficiency_plots.R")
source("gen_normalized_time_plots.R")
source("gen_comm.R")
source("gen_gantt.R")
###
# Generate display of bars with the time spent in Task, Runtime and Idle.
###
gen_times_taskdep("loutre.db")
gen_efficiencies("loutre.db")
gen_comm("loutre.db")
gen_speedup("loutre.db")
gen_pareff("loutre.db")
gen_normalized_time("loutre.db")
gen_gantt("canard.db")
......@@ -8,6 +8,7 @@ import socket
import subprocess
import re
import types
import glob
class ScalFMMConfig(object):
num_threads = 1
......@@ -44,6 +45,7 @@ class ScalFMMConfig(object):
"idle_time",
"scheduling_time",
"communication_time",
"communication_vol",
"rmem",
]
header = ""
......@@ -54,8 +56,28 @@ class ScalFMMConfig(object):
header += "\n"
return header
def gen_header_gantt(self):
    """Return the CSV header line of the Gantt database.

    Every column name is wrapped in double quotes, names are separated by
    commas and the line ends with a newline.
    """
    column_names = (
        "model",
        "algo",
        "nnode",
        "nthreads",
        "npart",
        "height",
        "bsize",
        "filename",
        "start_profiling",
        "stop_profiling",
    )
    quoted_names = ["\"" + name + "\"" for name in column_names]
    return ",".join(quoted_names) + "\n"
def gen_record(self, global_time, runtime_time, task_time, idle_time, scheduling_time, communication_time, rmem):
def gen_record(self, global_time, runtime_time, task_time, idle_time, scheduling_time, communication_time, communication_vol, rmem):
columns = [
self.model,
self.algorithm,
......@@ -70,6 +92,7 @@ class ScalFMMConfig(object):
idle_time,
scheduling_time,
communication_time,
communication_vol,
rmem,
]
record = ""
......@@ -86,6 +109,33 @@ class ScalFMMConfig(object):
record += "\n"
return record
def gen_record_gantt(self, filename, start_profiling, stop_profiling):
    """Return one CSV line of the Gantt database for this configuration.

    The line holds, in order: model, algorithm, num_nodes, num_threads,
    num_particules, height and bloc_size (read from ``self``), followed by
    ``filename``, ``start_profiling`` and ``stop_profiling``. Values whose
    type is exactly ``bool`` or ``str`` are wrapped in double quotes; the
    others are written with ``str()``. The line ends with a newline.
    """
    values = (
        self.model,
        self.algorithm,
        self.num_nodes,
        self.num_threads,
        self.num_particules,
        self.height,
        self.bloc_size,
        filename,
        start_profiling,
        stop_profiling,
    )
    fields = []
    for value in values:
        text = str(value)
        # Exact type match on purpose: subclasses are not quoted.
        if type(value) in (bool, str):
            text = "\"" + text + "\""
        fields.append(text)
    return ",".join(fields) + "\n"
def get_times_from_trace_file(filename):
cmd = "starpu_trace_state_stats.py " + filename
proc = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
......@@ -105,27 +155,81 @@ def get_times_from_trace_file(filename):
if len(arr) >= 4:
if arr[2] == "Runtime":
if arr[0] == "Scheduling":
scheduling_time = float(arr[3])
scheduling_time += float(arr[3])
else:
runtime_time = float(arr[3])
runtime_time += float(arr[3])
elif arr[2] == "Task":
task_time += float(arr[3])
elif arr[2] == "Other":
idle_time = float(arr[3])
idle_time += float(arr[3])
elif arr[2] == "MPI":
communication_time += float(arr[3])
elif arr[2] == "User":
if arr[0] == "Decoding task for MPI":
communication_time += float(arr[3])
elif arr[0] == "Preparing task for MPI":
communication_time += float(arr[3])
elif arr[0] == "Post-processing task for MPI":
communication_time += float(arr[3])
else:
runtime_time += float(arr[3])
else:
print("Error type " + arr[2])
# sys.exit("Invalid time!")
return runtime_time, task_time, idle_time, scheduling_time, communication_time
def _grep_marker_times(trace_path, marker):
    """Return the timestamps of the lines of ``trace_path`` containing ``marker``.

    The timestamp is the second whitespace-separated field of each matching
    line. The process exits with a fatal message when ``grep`` fails (which
    includes the case where no line matches).
    """
    # Arguments are passed as a list so that paths containing spaces work.
    proc = subprocess.Popen(["grep", marker, trace_path], stdout=subprocess.PIPE)
    stdout, _ = proc.communicate()
    if proc.returncode != 0:
        sys.exit("FATAL: Failed to parse " + trace_path + "!")
    return [float(line.split()[1]) for line in stdout.decode().splitlines()]


def generate_gantt(config, gantt_filename, initial_dir):
    """Append one record describing a run to the Gantt database.

    config         -- object providing gen_header_gantt() and
                      gen_record_gantt(filename, start, stop).
    gantt_filename -- database file; it is created, and the header written,
                      when it does not exist yet.
    initial_dir    -- directory holding "paje.trace" (searched for the
                      start_profiling / stop_profiling markers) and
                      "paje.csv" (the path recorded in the database). An
                      empty string means the current directory.

    The recorded start time is the smallest start_profiling timestamp and
    the recorded stop time is the largest stop_profiling timestamp (never
    below 0.0). Exits the process when the trace cannot be parsed.
    """
    # os.path.join keeps the paths relative when initial_dir is empty;
    # plain concatenation would point at the filesystem root.
    csv_paje_filename = os.path.join(initial_dir, "paje.csv")
    simple_paje_filename = os.path.join(initial_dir, "paje.trace")

    write_header = not os.path.isfile(gantt_filename)
    # The context manager closes the database even if parsing exits.
    with open(gantt_filename, "a") as gantt_file:
        if write_header:
            gantt_file.write(config.gen_header_gantt())

        # Earliest start marker.
        start_times = _grep_marker_times(simple_paje_filename, "start_profiling")
        start_profiling = min([float("inf")] + start_times)

        # Latest stop marker.
        stop_times = _grep_marker_times(simple_paje_filename, "stop_profiling")
        stop_profiling = max([0.0] + stop_times)

        gantt_file.write(config.gen_record_gantt(csv_paje_filename, start_profiling, stop_profiling))
def main():
output_trace_file=""
trace_filename="trace.rec"
output_filename="loutre.db"
gantt_database=""
long_opts = ["help",
"trace-file=",
"output-trace-file=",
"gantt-database=",
"output-file="]
opts, args = getopt.getopt(sys.argv[1:], "ht:i:o:", long_opts)
opts, args = getopt.getopt(sys.argv[1:], "ht:i:o:g:", long_opts)
for o, a in opts:
if o in ("-h", "--help"):
# usage()
......@@ -137,6 +241,8 @@ def main():
output_trace_file = str(a)
elif o in ("-o", "--output-file"):
output_filename = str(a)
elif o in ("-g", "--gantt-database"):
gantt_database = str(a)
else:
assert False, "unhandled option"
......@@ -147,6 +253,8 @@ def main():
task_time = 0.0
idle_time = 0.0
scheduling_time = 0.0
communication_time = 0.0
communication_vol = 0.0
if (os.path.isfile(output_filename)): #Time in milli
output_file = open(output_filename, "a")
......@@ -184,16 +292,26 @@ def main():
config.model = line[line.index(":")+1:].strip()
elif re.search("Algorithm", line):
config.algorithm = line[line.index(":")+1:].strip()
elif re.search("TOTAL", line) and re.search("starpu_comm_stats", line):
a = re.findall("(\d*\.\d+|\d+).MB", line)
if len(a) == 1:
communication_vol += float(a[0])
if(gantt_database != ""):
generate_gantt(config, gantt_database, os.path.dirname(trace_filename))
print("Generating time ...")
if (os.path.isfile(trace_filename)): #Time in milli
runtime_time, task_time, idle_time, scheduling_time, communication_time = get_times_from_trace_file(trace_filename)
else:
print("File doesn't exist " + trace_filename)
sum_time = (runtime_time + task_time + scheduling_time + communication_time)/(config.num_nodes*config.num_threads)
diff_time = float('%.2f'%(abs(global_time-sum_time)/global_time))
if diff_time > 0.01:
print('\033[31m/!\\Timing Error of ' + str(diff_time) + '\033[39m')
print('\033[31m Global ' + str(global_time) + ' Sum ' + str(sum_time) + '\033[39m')
# Write a record to the output file.
output_file.write(config.gen_record(global_time,
......@@ -202,6 +320,7 @@ def main():
float(idle_time),
float(scheduling_time),
float(communication_time),
float(communication_vol),
int(rmem)))
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment