Commit a9c82d47 authored by EYRAUD-DUBOIS Lionel's avatar EYRAUD-DUBOIS Lionel

Add the resultStorage script to simplify large experiments with pmtool.

parent 5f5653e4
\ No newline at end of file
# resultStorage: storing results from pmtool runs
**Purpose:** Make it easier to save the results of running pmtool with
many scheduling algorithms on many instances.
**Usage:** This is a Python class that should be used in a Python
The main component is the `MakeStorage` function, which returns a
`Storage` class. `MakeStorage` has 4 parameters:
+ `instanceParameters` is a list of strings, which indicate the names
of the parameters which describe the instances. The `Instance` class
of the resulting `Storage` is a named tuple whose field names are
those provided in `instanceParameters` in addition to the `platform`
field which describes the platform.
+ `getInstanceFile` is a function which, given an `Instance` object,
shoud return the name of the `.rec` file which contains the instance
+ `getPlatformFile` is a function which, given a platform name as a
string, should return the name of the corresponding platform file.
+ `commandArgs` is a list of strings indicating how to invoke
`pmtool`. The first string should be a path to the `pmtool`
executable, and the next ones should be default arguments for all
Once we get a `Storage` class, we can create an instance by providing
it with a filename which indicates where to store the results.
The code uses several concepts, all described by named tuples:
+ An `Instance` specifies a pmtool problem instance, with an instance
and a platform file
+ An `Algorithm` specifies a pmtool algorithm, with three fields:
`alg` is the name of the algorithm, `isBound` is a boolean
indicating whether this is a bound or an algorithm, `params`
contains the param string of the algorithm, with the
`key1=value1:key2=value2` syntax of pmtool.
+ A `Question` is the concatenation of an `Instance` and an
+ A `Record` is a `Question` with an answer, i.e. with the following
additional fields: `mkspan` and `time` provide the result from
pmtool, `date` indicates the date at which the computation was
The code provides helper functions to generate `Algorithm`s and
+ `makeAlgorithm(name, isBound=False, **args)` combines all named
args as parameters for the algorithm (the values should be strings)
+ `makeAlgorithms(name, isBound=False, **args)` in which the values of
named args should be iterables containing string; this function
returns a list of `Algorithm` objects, representing all possible
combinations of parameters, just like the `key1=v1,v2:key2=v3,v4`
syntax from pmtool. Examples:
makeAlgorithms("indep", indep=["dualhp", "balest"], rank=["heft", "min"])
makeAlgorithms("heft", rank=["heft", "min", "none", "unit"])
+ `Storage.makeInstances` works the same as `makeAlgorithms`.
+ `Storage.allCombinations(instances, algs)` generates all possible
`Question`s from the given lists of instances and algorithms.
The most useful functions of the `Storage` class are:
+ `updateFile(questions)` updates the results file by running all
questions which have no valid answer. An answer is valid if there is
at least one, and the most recent answer is at least as recent as
the corresponding instance file. TODO: There is still no test for
the platform file. `updateFile` takes an optional `pred` argument
which can change this definition of "valid answer".
+ `cleanupFile()` keeps only the latest answer for each question in
the file. In addition, it accepts an optional `toBeRemoved`
predicate which given a record specifies if it should be removed or
+ `displayFileContents` provides a human (and R) readable output of
the contents of the storage. It accepts a `file` optional argument
to specify the output file (default is standard output), and a
`pred` argument which given a record specifies if it should be
## Usage example
The `cholesky` archive from the
[Graph Market]( contains a set
of `.rec` files from StarPU, all corresponding to a cholesky execution
for different number of tiles and for different platforms. The file
for size `N` is stored at `NxN/tasks.rec` file. The code to create the
corresponding storage is:
import resultStorage
Ns = [10, 20, 30, 40, 50, 60, 70, 90, 100, 110]
platforms = ["attila", "idgraf", "sirocco", "mirage"]
def getInstanceFile(i):
return "./" + i.N + "x" + i.N + "/tasks.rec"
def getPlatformFile(p):
return "./"+p+".platform"
DagStore = resultStorage.MakeStorage(["N"], getInstanceFile, getPlatformFile, ["/path/to/pmtool", "-t", "0.1"])
store = DagStore("./cholesky.raw")
Then, the following code can be used to populate the store with all
instances = store.makeInstances("cholesky", N = map(str, Ns), platform = platforms)
algorithms = resultStorage.makeAlgorithms("heft", ("rank", ["heft", "min", "none", "unit", "area"]))
algorithms += resultStorage.makeAlgorithms("dmdas", ("rank", ["heft", "min", "none", "unit", "area"]))
algorithms += resultStorage.makeAlgorithms("hetprio", ("rank",["heft", "min", "none", "unit", "area"]))
algorithms += resultStorage.makeAlgorithms("hetprio", ("rank",["heft", "min", "none", "unit", "area"]), ("ssbaf", ["yes"]), ("stealidle", ["yes"]))
algorithms.append(resultStorage.Algorithm(alg = "ect", isBound=False, params=""))
algorithms.append(resultStorage.Algorithm(alg = "area", isBound=True, params="hybrid=no"))
if __name__ == "__main__":
store.updateFile(store.allCombinations(instances, algorithms))
import csv
import sys
from collections import namedtuple, OrderedDict
import subprocess
import shutil
from datetime import datetime
import os
import pickle
dateformat = "%Y-%m-%dT%H:%M:%S"
algorithmFields = ["isBound", "alg", "params"]
resultFields = ["mkspan", "time", "date"]
Algorithm = namedtuple("Algorithm", algorithmFields)
Result = namedtuple("Result", resultFields)
def _makeAlgorithm(row):
return Algorithm(**{k:row[k] for k in algorithmFields})
def _makeResult(row):
return Result(**{k:row[k] for k in resultFields})
def makeAlgorithm(name, isBound=False, **args):
params = OrderedDict(**{k:v for (k, v) in sorted(args.items(), key=lambda c: c[0]) })
return Algorithm(alg = name, isBound = isBound, params = ":".join('='.join(t) for t in p.items()))
def makeAlgorithms(name, isBound=False, **args):
params = [{}]
for (k,v) in sorted(args.items(), key=lambda c: c[0]):
params = [OrderedDict(d, **{k:x}) for x in v for d in params]
return [ Algorithm(alg = name, isBound = isBound, params = ":".join('='.join(t) for t in p.items())) for p in params ]
def MakeStorage(instanceParameters, getInstanceFile, getPlatformFile, commandArgs):
class Storage:
instanceFields = ["platform"] + instanceParameters
questionFields = instanceFields + algorithmFields
recordFields = instanceFields + algorithmFields + resultFields
Instance = namedtuple("Instance", instanceFields)
Question = namedtuple("Question", questionFields)
Record = namedtuple("Record", recordFields)
def _makeInstance(cls, row):
return cls.Instance(**{k:row[k] for k in cls.instanceFields})
def _makeQuestion(cls, row):
return cls.Question(**{k:row[k] for k in cls.questionFields})
def _makeRecord(cls, row):
return cls.Record(**{k:row[k] for k in cls.recordFields})
def makeInstances(cls, **kwargs):
result = [{}]
for k,v in kwargs.items():
result = [dict(d, **{k:x}) for x in v for d in result]
return list(map(cls._makeInstance, result))
def allCombinations(cls, instances, algorithms):
return (cls.Question(**i._asdict(), **a._asdict()) for i in instances for a in algorithms)
def convertRow(cls, row):
row["date"] = datetime.strptime(row["date"], dateformat)
row["isBound"] = True if row["isBound"] == "True" else False
def parseRow(cls, row):
return (cls._makeQuestion(row), _makeResult(row))
def parseRowToRecord(cls, row):
return cls._makeRecord(row)
def __init__(self, filename):
self.filename = filename
def getFilename(cls, i):
return getInstanceFile(i)
def readFile(self):
allData = {}
with open(self.filename, 'r') as fd:
for row in csv.DictReader(fd, fieldnames = self.recordFields):
(question, result) = self.parseRow(row)
if question in allData:
allData[question] = [result]
except OSError:
return allData
def readRecords(self):
with open(self.filename, 'r') as fd:
for row in csv.DictReader(fd, fieldnames = self.recordFields):
record = self.parseRowToRecord(row)
yield record
def _writeRecord(self, csvWriter, record):
d = record._asdict()
d["date"] = d["date"].strftime(dateformat)
def writeRecords(self, results, overwrite=False):
with open(self.filename, 'w' if overwrite else 'a') as fd:
w = csv.DictWriter(fd, fieldnames = self.recordFields)
for r in results:
self._writeRecord(w, r)
def hasValidAnswer(cls, q, data):
if q in data:
lastAnswer = max(data[q], key=lambda
src = cls.getFilename(q)
srcTime = datetime.fromtimestamp(os.path.getmtime(src))
except OSError as e:
print("Instance source file %s is not readable: %s\n" % (src, e), file=sys.stderr)
return >= srcTime
return False
def updateFile(self, questions, pred = lambda q, data: not Storage.hasValidAnswer(q, data)):
data = self.readFile()
toRun = [ q for q in questions if pred(q, data)]
results = self.runMany(toRun)
# Keep only the last answer for each question
def cleanupFile(self, toBeRemoved = lambda x: False):
data = self.readFile()
toWrite = [self.Record(*q, *max(data[q], key=lambda for q in data ]
self.writeRecords((r for r in toWrite if not toBeRemoved(r)), overwrite=True)
def runMany(self, questions):
separated = [ (self._makeInstance(q._asdict()), _makeAlgorithm(q._asdict())) for q in questions ]
## Was a good idea, but pmtool has too many memory problems to open so many instances at once.
## Get list of instances
# allInstances = set(i for (i,a) in separated)
## Group instances which require the same algorithms
# instancesPerAlgList = {}
# for i in allInstances:
# algs = tuple(a for (j, a) in separated if i == j)
# if algs in instancesPerAlgList:
# instancesPerAlgList[algs].append(i)
# else:
# instancesPerAlgList[algs] = [i]
# results = results + sum((self.runOne(instances, algs) for (algs, instances) in instancesPerAlgList.items()), [])
groupedByInstances = {}
for (i, a) in separated:
if i in groupedByInstances:
groupedByInstances[i] = [a]
results = sum( (self.runOne(i, algs) for (i, algs) in groupedByInstances.items()), [])
return results
def runOne(self, instance, algs):
with open(self.filename, 'a') as output:
writer = csv.DictWriter(output, fieldnames = self.recordFields)
filename = self.getFilename(instance)
args = []
args.extend(["--output-raw", "--no-header"])
args.extend(['-p', getPlatformFile(instance.platform)])
args.extend(sum([[("-b" if a.isBound else "-a"),
a.alg+(":"+a.params if a.params else "")] for a in algs], []))
print("Running: " + " ".join(args))
with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) as child:
reader = csv.reader(child.stdout, delimiter=' ')
result = []
for row in reader:
algNameAndParams=row[3].split(":", maxsplit=1)
assert row[0] == filename
rec = self.Record(*list(instance), True if row[2] == 'True' else False, algNameAndParams[0],
algNameAndParams[1] if len(algNameAndParams) >= 2 else "",
row[4], row[5],
returncode = child.wait()
if returncode != 0:
print("Wrong return code %d for command:\n %s\n" % (returncode, " ".join(args)), file=sys.stderr)
print(child.stderr, file=sys.stderr)
raise Exception
return result
def printRecords(self, records, file=""):
if file:
fd = open(file, "w")
fd = sys.stdout
recordsWithParamDicts = [ r._replace(params=OrderedDict(eq.split('=') for eq in r.params.split(":")) if r.params else {}) for r in records ]
allParams = frozenset.union(*(frozenset(r.params.keys()) for r in recordsWithParamDicts))
print(" ".join(instanceParameters),
"platform isBound algorithm",
" ".join(allParams), "makespan time date", file=fd)
for r in recordsWithParamDicts:
print(" ".join(getattr(r, p) if getattr(r, p) else "NA" for p in instanceParameters),
"%s" % (r.platform),
"%s %s" % (r.isBound, r.alg),
" ".join(r.params[k] if k in r.params else "NA" for k in allParams),
"%s %s %s" % (r.mkspan, r.time,, file = fd)
def displayFileContents(self, pred = lambda x: True, **kwargs):
data = self.readRecords()
self.printRecords(filter(pred, data), **kwargs)
def getRecordsFromDatFile(self, file, questions):
knownFields = [("m", "m"), ("k", "k"), ("isBound", "isBound"),
("algorithm", "alg")] + [(p, p) for p in instanceParameters]
resultFields = [ ("makespan", "mkspan"), ("time", "time"), ("date", "date") ]
newQuestions = [(q._replace(params=dict(eq.split('=') for eq in q.params.split(":")) if q.params else {}), q) for q in questions]
result = []
with open(file, "r") as fd:
reader = csv.DictReader(fd, delimiter=' ')
for row in reader:
known = {p:row[p] for (p,z) in knownFields + resultFields}
for (p, z) in knownFields + resultFields:
del row[p]
row = {k:v for (k, v) in row.items() if v != "NA"}
for (q, origQ) in newQuestions:
if all(getattr(q, z) == known[p] for (p, z) in knownFields) and row == q.params:
result.append(self.Record(**origQ._asdict(), mkspan=known["makespan"], time=known["time"],
date = known["date"]))
return result
return Storage
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment