From fcb2fc68c433b97c857a1e8c3800b6ff352139ba Mon Sep 17 00:00:00 2001
From: Martin Khannouz <martin.khannouz@inria.fr>
Date: Fri, 29 Apr 2016 10:40:35 +0200
Subject: [PATCH] Change post traitements.

Add gantt and communications volumes : R scripting and python parsing.
Add start_profiling and stop_profiling in python script.
Add calls to gen_comm and gen_gantt in generate_graph.R.
---
 Utils/benchmark/common.R            |  45 +------
 Utils/benchmark/gen_comm.R          |  43 ++++++
 Utils/benchmark/gen_gantt.R         | 194 ++++++++++++++++++++++++++++
 Utils/benchmark/gen_times_taskdep.R |   1 +
 Utils/benchmark/generate_graph.R    |   4 +
 Utils/benchmark/loutre.py           | 129 +++++++++++++++++-
 6 files changed, 367 insertions(+), 49 deletions(-)
 create mode 100644 Utils/benchmark/gen_comm.R
 create mode 100644 Utils/benchmark/gen_gantt.R

diff --git a/Utils/benchmark/common.R b/Utils/benchmark/common.R
index cd6e8aa18..62d0c5ff6 100644
--- a/Utils/benchmark/common.R
+++ b/Utils/benchmark/common.R
@@ -15,6 +15,7 @@ get_data_subset <- function(f, n, h, p)
 		"numeric",	# idle_time
 		"numeric",	# scheduling_time
 		"numeric",	# communuication_time
+		"numeric",	# communuication_vol
 		"numeric"	# rmen
 		))
 
@@ -29,50 +30,6 @@ get_data_subset <- function(f, n, h, p)
 
     return (d)
 }
-get_db_with_time <- function(f, s, n, h, m)
-{
-    d <- read.csv(file=f,comment.char = "#", sep=",", quote = "\"", head=TRUE,
-                  dec=".", colClasses=
-	    c("integer",	# nthreads
-		"factor",	# name
-		"factor",	# scheme
-                "factor",	# compiler
-		"factor",	# runtime
-		"factor",	# model
-		"integer",	# npart
-		"integer",	# height
-		"integer",	# bsize
-                "integer",      # niter
-		"numeric",	# time
-                "integer"       # rmem
-		))
-
-    d$npart     <- ordered(d$npart)
-    d$height    <- ordered(d$height)
-    d$bsize     <- ordered(d$bsize)
-
-    d <- subset(d, scheme %in% s)
-    if (n)
-        d <- subset(d, npart == n)
-    if (h)
-        d <- subset(d, height == h)
-    d <- subset(d, model == m)
-
-    d <- within(d, "compiler_runtime" <- paste(compiler, runtime, sep='-'))
-
-    names <- c(
-        'GCC-OMP4',
-        'Klang',
-        'Klang-C',
-        'Klang-CP',
-        'StarPU',
-        'StarPU-C',
-        'StarPU-CP'
-    )
-    d <- subset(d, name %in% names)
-
-    return (d)
-}
 
 # OMP Compiler/runtime breaks, colors...
 get_breaks_runtime <- function()
diff --git a/Utils/benchmark/gen_comm.R b/Utils/benchmark/gen_comm.R
new file mode 100644
index 000000000..09e461e9d
--- /dev/null
+++ b/Utils/benchmark/gen_comm.R
@@ -0,0 +1,43 @@
+library(plyr)
+library(ggplot2)
+
+gen_comm_plot <- function(db, d_breaks, model_wanted)
+{
+	db <- subset(db, model == model_wanted)
+
+    g <- ggplot(data=db,aes_string(x="nnode", y="communication_vol", color="algo"))
+    g <- g + geom_line()
+    g <- g + facet_wrap(npart ~ height, scales="free",
+                        labeller = labeller(npart = as_labeller(npart_labeller),
+                                            height = as_labeller(height_labeller),
+                                            .default=label_both,
+                                            .multi_line=FALSE))
+
+    # Set our own colors, linetypes and point shapes.
+    g <- g + scale_color_manual(name="Algorithm",
+                                breaks=get_breaks_runtime(),
+                                labels=get_labels_runtime(),
+                                values=get_colors_runtime())
+
+    # Set X/Y labels.
+    g <- g + xlab("Number of nodes")
+    g <- g + ylab("Volume of Communication (MB)")
+
+    g <- g + scale_x_continuous(breaks=c(1, 2, 3, 4, 5, 6, 9, 12, 16, 20, 24))
+
+    # Save generated plot.
+	output <- paste(get_output_directory(), "/", model_wanted, "-comm.pdf", sep="")
+    ggsave(output, g, width=29.7, height=21, units=c("cm"), device=cairo_pdf)
+}
+
+gen_comm <- function(dbfile)
+{
+    data <- get_data_subset(dbfile, 0L, 0L, "False")
+	data <- subset(data, algo != get_one_node_reference_algorithm())
+	all_model <- unique(data$model)
+	for (i in 1:length(all_model))
+	{
+		gen_comm_plot(data, unique(data$algo), all_model[i])
+	}
+}
+
diff --git a/Utils/benchmark/gen_gantt.R b/Utils/benchmark/gen_gantt.R
new file mode 100644
index 000000000..95c94d6c1
--- /dev/null
+++ b/Utils/benchmark/gen_gantt.R
@@ -0,0 +1,194 @@
+library(plyr)
+library(ggplot2)
+
+starts_with <- function(str, pattern)
+{
+	if(nchar(pattern) == 0)
+	{
+		return (FALSE)
+	}
+	return (pattern == substr(str, 1, nchar(pattern)))
+}
+create_ressource_order <- function(df, nnode, nthreads)
+{
+	ret <- c()
+	if(nnode == 1)
+	{
+		for(nth in 0:(nthreads-1))
+		{
+			ret <- c(ret, paste("CPU", nth, sep=""))
+		}
+	}
+	else
+	{
+		for(nno in 0:(nnode-1))
+		{
+			for(nth in 0:(nthreads-1))
+			{
+				ret <- c(ret, paste(nno, "_CPU", nth, sep=""))
+			}
+		}
+	}
+	return (ret)
+}
+set_node_name <- function(df, nnode)
+{
+	for(i in 1:nrow(df))
+	{
+		node_number <- substr(df$ResourceId[i], 1, grep("_", df$ResourceId[i]))
+		name_origin <- paste("Node ", node_number, sep="")
+		df$Origin[i] <- name_origin
+	}
+}
+read_trace <- function(file, nnode, nthreads, start_profiling, stop_profiling)
+{
+	#Read file
+	df<- read.table(file, header=TRUE, sep=",", strip.white=TRUE, fill=TRUE)
+	#Remove column I don't need
+	df = df[!(names(df) %in% c("Nature","Type", "Depth", "Footprint", "JobId", "Tag", "Params"))]
+	#Init origin
+	df$Origin <- as.factor("Node 0")
+
+	#Remove uninteresting state (at least, uninteresting for me)
+	def_states<-c("Initializing","Deinitializing", "Idle", "Overhead","Nothing","Freeing","Allocating","WritingBack","FetchingInput","PushingOutput","Callback","Progressing","Unpartitioning","AllocatingReuse","Reclaiming","DriverCopy","DriverCopyAsync","Scheduling","Executing", "dplgsy", "execute_on_all_wrapper", "Su")
+	df<-df[!(df$Value %in% def_states),]
+
+	#Get only the CPUs event
+	df$ResourceId <- as.factor(df$ResourceId)
+	resource_order <- create_ressource_order(df, nnode, nthreads)
+	df$ResourceId <- factor(df$ResourceId, levels = rev(resource_order))
+	#Remove invalide ressources if there is some
+	df <- subset(df, !is.na(ResourceId))
+
+	#Get only event between start_profiling and stop_profiling
+	df <- subset(df, df$Start > start_profiling)
+	df <- subset(df, df$Start < stop_profiling)
+
+	#Set value as a string
+	df$Value <- as.character(df$Value)
+
+	#When there is more than one node put the good node on origin
+	#Dont do it on one node, because CPU name doesn't start with 0_CPU1, but with CPU1, so all node will be on Node 0
+	if(nnode > 1)
+	{
+		#Reset origin with ResourceId, so the replace will work
+		#Other way to do it may exist, but I didn't manage to do it in a less weird way, sorry for that
+		df$Origin <- df$ResourceId
+		df$Origin <- as.character(df$Origin)
+		#For the number of node, replace origin
+		for(i in 0:(nnode-1))
+		{
+			name_origin <- paste("Node ", i, sep="")
+			cpu_origin <- paste(i, "_C", sep="")
+			df$Origin[starts_with(df$Origin, cpu_origin)] <- name_origin
+		}
+	}
+
+	#This may change given what you wanna see
+	df$Value[starts_with(df$Value, "L2L")] <- "L2L"
+	df$Value[starts_with(df$Value, "M2L")] <- "M2L"
+	df$Value[starts_with(df$Value, "M2M")] <- "M2M"
+	df$Value[starts_with(df$Value, "P2P")] <- "P2P"
+	df$Value[df$Value == ""] <- NA
+
+	df$SimpleValue <- df$Value
+	df$SimpleValue[df$SimpleValue == "P2M"] <- "Far Field"
+	df$SimpleValue[df$SimpleValue == "M2M"] <- "Far Field"
+	df$SimpleValue[df$SimpleValue == "M2L"] <- "Far Field"
+	df$SimpleValue[df$SimpleValue == "L2L"] <- "Far Field"
+	df$SimpleValue[df$SimpleValue == "L2P"] <- "Far Field"
+	df$SimpleValue[df$SimpleValue == "P2P"] <- "Near Field"
+
+	#Shift the start, if some event has been removed
+	m <- min(df$Start)
+	df$Start <- df$Start - m
+	df$End <- df$Start+df$Duration
+
+	#Return
+	df
+} 
+gen_simple_gantt_plot <- function(data, model, algo, nnode, npart)
+{
+	output <- paste(get_output_directory(), "/", model, "-", algo, "-", nnode, "N-", npart/1000000, "M-simple-gantt.pdf", sep="")
+	title <- paste(model, " ", algo, " ", nnode, "N ", npart/1000000, "M", sep="")
+	breaks <- c('Sleeping', 'Far Field', 'Near Field')
+	labels <- c('Sleeping', 'Far Field', 'Near Field')
+	colors <- c(
+		'Sleeping'      = "#f9766e",
+		'Far Field'   = "#61f1ff",
+		'Near Field'   = "#619dff"
+	)
+	g <- ggplot(data,aes(x=Start,xend=End, y=factor(ResourceId), yend=factor(ResourceId),color=SimpleValue)) 
+	g <- g + theme_bw()
+	g <- g + geom_segment(size=8)
+	g <- g + ggtitle(title)
+	g <- g + ylab("Resource") + xlab("Time [ms]")
+	g <- g + scale_color_manual(name="Action", breaks=breaks, labels=labels, values=colors)
+	g <- g + facet_wrap(~Origin, ncol=1, scales="free_y") 
+	g <- g + scale_y_discrete(breaks=NULL)
+    ggsave(output, g, width=29.7, height=21, units=c("cm"), device=cairo_pdf)
+}
+gen_gantt_plot <- function(data, model, algo, nnode, npart)
+{
+	output <- paste(get_output_directory(), "/", model, "-", algo, "-", nnode, "N-", npart/1000000, "M-gantt.pdf", sep="")
+	title <- paste(model, " ", algo, " ", nnode, "N ", npart/1000000, "M", sep="")
+	g <- ggplot(data,aes(x=Start,xend=End, y=factor(ResourceId), yend=factor(ResourceId),color=Value)) 
+	g <- g + theme_bw()
+	g <- g + geom_segment(size=8)
+	g <- g + ggtitle(title)
+	g <- g + ylab("Resource") + xlab("Time [ms]")
+	g <- g + scale_color_brewer(palette="Set1")
+	g <- g + facet_wrap(~Origin, ncol=1, scales="free_y") 
+	g <- g + scale_y_discrete(breaks=NULL)
+    ggsave(output, g, width=29.7, height=21, units=c("cm"), device=cairo_pdf)
+}
+gen_gantt_grab_data <- function(df)
+{
+	file <- paste(df$filename[1], sep="")
+	data <- read_trace(file, df$nnode[1], df$nthreads[1], df$start_profiling[1], df$stop_profiling[1])
+	return (data)
+}
+gen_gantt <- function(dbfile)
+{
+	df<- read.table(dbfile, header=TRUE, sep=",", strip.white=TRUE, fill=TRUE,
+				  dec=".", colClasses=
+		c("factor",	# model
+		"factor",	# algorithm
+		"integer",	# nnode
+		"integer",	# nthread
+		"integer",	# npart
+		"integer",	# height
+		"integer",	# bsize
+		"factor",	# filename
+		"numeric",	# start_profiling
+		"numeric"	# stop_profiling
+		))
+	all_algorithm <- unique(df$algo)
+	all_model <- unique(df$model)
+	all_nnode <- unique(df$nnode)
+	all_npart <- unique(df$npart)
+	for (mod in 1:length(all_model))
+	{
+		for (alg in 1:length(all_algorithm))
+		{
+			for (nno in 1:length(all_nnode))
+			{
+				for (npa in 1:length(all_npart))
+				{
+					#Get the corresponding line
+					tmp_df <- subset(df, model == all_model[mod] & algo == all_algorithm[alg] & nnode == all_nnode[nno] & npart == all_npart[npa])
+					if(nrow(tmp_df) > 0)
+					{
+						look <- paste("Grab data ", all_model[mod],all_algorithm[alg], all_nnode[nno], "N", all_npart[npa], sep="-")
+						#Print something so I know were is the script
+						print(look) 
+						data <- gen_gantt_grab_data(tmp_df)
+						gen_gantt_plot(data, all_model[mod], all_algorithm[alg], all_nnode[nno], all_npart[npa])
+						gen_simple_gantt_plot(data, all_model[mod], all_algorithm[alg], all_nnode[nno], all_npart[npa])
+					}
+				}
+			}
+
+		}
+	}
+}
diff --git a/Utils/benchmark/gen_times_taskdep.R b/Utils/benchmark/gen_times_taskdep.R
index 71b3dbbd7..0cd15d92c 100644
--- a/Utils/benchmark/gen_times_taskdep.R
+++ b/Utils/benchmark/gen_times_taskdep.R
@@ -8,6 +8,7 @@ gen_times_taskdep_plot <- function(data, algo_wanted, model_wanted)
     # Sort data to have task, runtime and idle.
 	subdata <- subset(data, model == model_wanted & algo == algo_wanted)
 	subdata$rmem <- NULL
+	subdata$communication_vol <- NULL
 	subdata$global_time <- NULL
 	subdata <- melt(subdata, id=c("model", "algo", "nnode", "nthreads", "npart","height","bsize"))
 	subdata <- rename(subdata, c("variable"="event", "value"="duration"))
diff --git a/Utils/benchmark/generate_graph.R b/Utils/benchmark/generate_graph.R
index 9ceabca8d..31c86d996 100644
--- a/Utils/benchmark/generate_graph.R
+++ b/Utils/benchmark/generate_graph.R
@@ -4,12 +4,16 @@ source("gen_efficiencies_taskdep.R")
 source("gen_speedup_plots.R")
 source("gen_parallel_efficiency_plots.R")
 source("gen_normalized_time_plots.R")
+source("gen_comm.R")
+source("gen_gantt.R")
 
 ###
 # Generate display of bars with the time spent in Task, Runtime and Idle.
 ###
 gen_times_taskdep("loutre.db")
 gen_efficiencies("loutre.db")
+gen_comm("loutre.db")
 gen_speedup("loutre.db")
 gen_pareff("loutre.db")
 gen_normalized_time("loutre.db")
+gen_gantt("canard.db")
diff --git a/Utils/benchmark/loutre.py b/Utils/benchmark/loutre.py
index 516043cf9..852792530 100755
--- a/Utils/benchmark/loutre.py
+++ b/Utils/benchmark/loutre.py
@@ -8,6 +8,7 @@ import socket
 import subprocess
 import re
 import types
+import glob
 
 class ScalFMMConfig(object):
     num_threads    = 1
@@ -44,6 +45,7 @@ class ScalFMMConfig(object):
             "idle_time",
             "scheduling_time",
             "communication_time",
+            "communication_vol",
             "rmem",
         ]
         header = ""
@@ -54,8 +56,28 @@ class ScalFMMConfig(object):
         header += "\n"
         return header
 
+    def gen_header_gantt(self):
+        columns = [
+            "model",
+            "algo",
+            "nnode",
+            "nthreads",
+            "npart",
+            "height",
+            "bsize",
+            "filename",
+            "start_profiling",
+            "stop_profiling",
+        ]
+        header = ""
+        for i in range(len(columns)):
+            if not i == 0:
+                header += ","
+            header += "\"" + columns[i] + "\""
+        header += "\n"
+        return header
 
-    def gen_record(self, global_time, runtime_time, task_time, idle_time, scheduling_time, communication_time, rmem):
+    def gen_record(self, global_time, runtime_time, task_time, idle_time, scheduling_time, communication_time, communication_vol, rmem):
         columns = [
             self.model,
             self.algorithm,
@@ -70,6 +92,7 @@ class ScalFMMConfig(object):
             idle_time,
             scheduling_time,
             communication_time,
+            communication_vol,
             rmem,
         ]
         record = ""
@@ -86,6 +109,33 @@ class ScalFMMConfig(object):
         record += "\n"
         return record
 
+    def gen_record_gantt(self, filename, start_profiling, stop_profiling):
+        columns = [
+            self.model,
+            self.algorithm,
+            self.num_nodes,
+            self.num_threads,
+            self.num_particules,
+            self.height,
+            self.bloc_size,
+            filename,
+            start_profiling,
+            stop_profiling,
+        ]
+        record = ""
+        for i in range(len(columns)):
+            if not i == 0:
+                record += ","
+            if (type(columns[i]) is bool or
+                type(columns[i]) == str):
+                record += "\""
+            record += str(columns[i])
+            if (type(columns[i]) == bool or
+                type(columns[i]) == str):
+                record += "\""
+        record += "\n"
+        return record
+
 def get_times_from_trace_file(filename):
     cmd = "starpu_trace_state_stats.py " + filename
     proc = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
@@ -105,27 +155,81 @@ def get_times_from_trace_file(filename):
         if len(arr) >= 4:
             if arr[2] == "Runtime":
                 if arr[0] == "Scheduling":
-                    scheduling_time = float(arr[3])
+                    scheduling_time += float(arr[3])
                 else:
-                    runtime_time = float(arr[3])
+                    runtime_time += float(arr[3])
             elif arr[2] == "Task":
                 task_time += float(arr[3])
             elif arr[2] == "Other":
-                idle_time = float(arr[3])
+                idle_time += float(arr[3])
+            elif arr[2] == "MPI":
+                communication_time += float(arr[3])
+            elif arr[2] == "User":
+                if arr[0] == "Decoding task for MPI":
+                    communication_time += float(arr[3])
+                elif arr[0] == "Preparing task for MPI":
+                    communication_time += float(arr[3])
+                elif arr[0] == "Post-processing task for MPI":
+                    communication_time += float(arr[3])
+                else:
+                    runtime_time += float(arr[3])
+            else:
+                print("Error type " + arr[2])
             # sys.exit("Invalid time!")
     return runtime_time, task_time, idle_time, scheduling_time, communication_time
 
+def generate_gantt(config, gantt_filename, initial_dir):
+    if (os.path.isfile(gantt_filename)):
+        gantt_file = open(gantt_filename, "a")
+    else:
+        gantt_file = open(gantt_filename, "w")
+        gantt_file.write(config.gen_header_gantt())
+    csv_paje_filename = initial_dir + "/paje.csv"
+    simple_paje_filename = initial_dir + "/paje.trace"
+
+    #Get start profiling time
+    cmd = "grep start_profiling " + simple_paje_filename
+    proc = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+    stdout, stderr = proc.communicate()
+    if not proc.returncode == 0:
+        sys.exit("FATAL: Failed to parse " + simple_paje_filename + "!")
+        return proc.returncode
+    start_profiling = float("inf")
+    for line in stdout.decode().splitlines():
+        arr = line.split()
+        start_marker = float(arr[1])
+        if(start_marker < start_profiling):
+            start_profiling = start_marker
+    
+    #Get stop profiling time
+    cmd = "grep stop_profiling " + simple_paje_filename
+    proc = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
+    stdout, stderr = proc.communicate()
+    if not proc.returncode == 0:
+        sys.exit("FATAL: Failed to parse " + simple_paje_filename + "!")
+        return proc.returncode
+    stop_profiling = 0.0
+    for line in stdout.decode().splitlines():
+        arr = line.split()
+        stop_marker = float(arr[1])
+        if(stop_marker > stop_profiling):
+            stop_profiling = stop_marker
+
+    gantt_file.write(config.gen_record_gantt(csv_paje_filename, start_profiling, stop_profiling))
+
 def main():
     output_trace_file=""
     trace_filename="trace.rec"
     output_filename="loutre.db"
+    gantt_database=""
 
     long_opts = ["help",
                  "trace-file=",
                  "output-trace-file=",
+                 "gantt-database=",
                  "output-file="]
 
-    opts, args = getopt.getopt(sys.argv[1:], "ht:i:o:", long_opts)
+    opts, args = getopt.getopt(sys.argv[1:], "ht:i:o:g:", long_opts)
     for o, a in opts:
         if o in ("-h", "--help"):
             # usage()
@@ -137,6 +241,8 @@ def main():
             output_trace_file = str(a)
         elif o in ("-o", "--output-file"):
             output_filename = str(a)
+        elif o in ("-g", "--gantt-database"):
+            gantt_database = str(a)
         else:
             assert False, "unhandled option"
 
@@ -147,6 +253,8 @@ def main():
     task_time = 0.0
     idle_time = 0.0
     scheduling_time = 0.0
+    communication_time = 0.0
+    communication_vol = 0.0
 
     if (os.path.isfile(output_filename)): #Time in milli
         output_file = open(output_filename, "a")
@@ -184,16 +292,26 @@ def main():
                 config.model = line[line.index(":")+1:].strip()
             elif re.search("Algorithm", line):
                 config.algorithm = line[line.index(":")+1:].strip()
+            elif re.search("TOTAL", line) and re.search("starpu_comm_stats", line):
+                a = re.findall("(\d*\.\d+|\d+).MB", line)
+                if len(a) == 1:
+                    communication_vol += float(a[0])
+                
+    if(gantt_database != ""):
+        generate_gantt(config, gantt_database, os.path.dirname(trace_filename))
 
+    print("Generating time ...")
     if (os.path.isfile(trace_filename)): #Time in milli
         runtime_time, task_time, idle_time, scheduling_time, communication_time = get_times_from_trace_file(trace_filename)
     else:
         print("File doesn't exist " + trace_filename)
+
     sum_time = (runtime_time + task_time + scheduling_time + communication_time)/(config.num_nodes*config.num_threads)
     diff_time = float('%.2f'%(abs(global_time-sum_time)/global_time))
 
     if diff_time > 0.01:   
         print('\033[31m/!\\Timing Error of ' + str(diff_time) + '\033[39m')
+        print('\033[31m Global ' + str(global_time) + ' Sum ' + str(sum_time) + '\033[39m')
 
     # Write a record to the output file.
     output_file.write(config.gen_record(global_time,
@@ -202,6 +320,7 @@ def main():
                       float(idle_time),
                       float(scheduling_time),
                       float(communication_time),
+                      float(communication_vol),
                       int(rmem)))
 
 main()
-- 
GitLab