Commit 648f4bf0 authored by Maverick Chardet's avatar Maverick Chardet
Browse files

Modified parallel_test and sequential_test to be what we want in the

article.
Added indentation to the Concerto config file generated by ConcertoG5k.
parent d18c11b8
......@@ -112,7 +112,7 @@ class ConcertoG5k:
with self.concerto_host as concerto_host:
with open(self.local_wd + "/concerto_config.json", "w") as concerto_config_file:
from json import dump
dump(self.concerto_config, concerto_config_file)
dump(self.concerto_config, concerto_config_file, indent='\t')
concerto_host.send_files(
local_files=[self.local_wd + "/concerto_config.json"],
remote_location=self.full_remote_exp_dir
......
import time
from typing import List, Optional
from concerto.all import Component, Assembly, DepType
from concerto.utility import empty_transition
class ParallelTransitionsComponent(Component):
def __init__(self, nb_parallel_transitions: int,
sleep_time: float = 0, remote_address: Optional[str] = None):
self._nb_parallel_transitions = nb_parallel_transitions
assert(nb_parallel_transitions >= 1)
self._sleep_time = sleep_time
self._remote_host = None
if remote_address:
from experiment_utilities.remote_host import RemoteHost
self._remote_host = RemoteHost(remote_address)
super().__init__()
def create(self):
self.places = [
'beginning',
'ready',
'end'
]
self.initial_place = "beginning"
self.transitions = {
'wait_dep': ('beginning', 'ready', 'run', 0, empty_transition),
'reset': ('end', 'beginning', 'reset', 0, empty_transition)
}
for i in range(self._nb_parallel_transitions):
self.transitions["trans%d" % i] = 'ready', 'end', 'run', 0, self.run_function,
self.dependencies = {
"service": (DepType.PROVIDE, ['end']),
"use_service": (DepType.USE, ['wait_dep'])
}
def run_function(self):
if self._remote_host:
self._remote_host.run("sleep %f" % self._sleep_time)
elif self._sleep_time > 0:
time.sleep(self._sleep_time)
class ProviderComponent(Component):
def create(self):
self.places = [
'beginning',
'end'
]
self.initial_place = "beginning"
self.transitions = {
'provide': ('beginning', 'end', 'provide', 0, empty_transition),
'reset': ('end', 'beginning', 'reset', 0, empty_transition),
}
self.dependencies = {
"service": (DepType.PROVIDE, ['end'])
}
class ParallelAssembly(Assembly):
def __init__(self):
super().__init__()
self._current_nb_active_components = 0
self._last_nb_active_components = 0
def prepare(self, nb_components: int, nb_parallel_transitions: int,
remote_addresses: Optional[List[str]] = None, sleep_time: float = 0):
if remote_addresses:
assert(len(remote_addresses) == nb_components)
self._current_nb_active_components = nb_components
self.add_component("provider", ProviderComponent())
for i in range(nb_components):
remote_address = remote_addresses[i] if remote_addresses else None
self.add_component("user%d" % i, ParallelTransitionsComponent(nb_parallel_transitions,
sleep_time,
remote_address))
self.connect("provider", "service", "user%d" % i, "use_service")
self.synchronize()
def run(self, nb_active_components):
self._last_nb_active_components = nb_active_components
if nb_active_components > self._current_nb_active_components:
raise Exception("Number of calls larger than the number of components!")
beginning_time = time.perf_counter()
for i in range(nb_active_components):
self.push_b("user%d" % i, "run")
self.push_b("provider", "provide")
self.wait_all()
self.synchronize()
end_time = time.perf_counter()
return end_time-beginning_time
def reset(self):
for i in range(self._last_nb_active_components):
self.push_b("user%d" % i, "reset")
self.push_b("provider", "reset")
self.wait_all()
self.synchronize()
def run_experiments(list_nb_components: List[int], list_nb_parallel_transitions: List[int], nb_repeats: int,
remote_hosts: List[str] = (), sleep_time: float = 0,
verbosity: int = -1, printing: bool = False, print_time: bool = False, dryrun: bool = False):
import json
from statistics import mean, stdev
from typing import Dict, Any
from concerto.utility import Printer
running_times: Dict[int, Dict[int, Dict[str, Any]]] = dict()
for nb_trans in list_nb_parallel_transitions:
assembly = ParallelAssembly()
assembly.set_verbosity(verbosity)
assembly.set_print_time(print_time)
assembly.set_record_gantt(True)
assembly.set_dryrun(dryrun)
running_times[nb_trans] = dict()
if printing:
Printer.st_tprint("Preparing the assembly with %d parallel transitions per component" % nb_trans)
assembly.prepare(max(list_nb_components), nb_trans, remote_hosts, sleep_time)
time.sleep(1)
for nb_components in list_nb_components:
if printing:
Printer.st_tprint("Testing for %d components..." % nb_components)
running_times[nb_trans][nb_components] = {
"runs": []
}
for i in range(nb_repeats):
running_time = assembly.run(nb_components)
time.sleep(2)
assembly.reset()
running_times[nb_trans][nb_components]["runs"].append(running_time)
if printing:
Printer.st_tprint("- attempt %d: %f" % (i, running_time))
running_times[nb_trans][nb_components]["average"] = mean(running_times[nb_trans][nb_components]["runs"])
if printing:
Printer.st_tprint("- average: %f" % running_times[nb_trans][nb_components]["average"])
if nb_repeats >= 2:
running_times[nb_trans][nb_components]["std"] = stdev(running_times[nb_trans][nb_components]["runs"])
if printing:
Printer.st_tprint("- std: %f" % running_times[nb_trans][nb_components]["std"])
if printing:
Printer.st_tprint("Terminating assembly")
assembly.terminate()
gc = assembly.get_gantt_record()
gc.export_gnuplot("results_%d_transitions.gpl" % nb_trans)
gc.get_gantt_chart().export_json("results_%d_transitions.json" % nb_trans)
with open("times.json", "w") as f:
json.dump(running_times, f, indent='\t')
def load_config(conf_file_location):
from json import load
with open(conf_file_location, "r") as file:
conf = load(file)
return conf
def main():
config = load_config("concerto_config.json")
list_nb_components = config['list_nb_components']
list_nb_parallel_transitions = config['list_nb_parallel_transitions']
nb_repeats = config['nb_repeats']
remote_hosts = config['remote_hosts']
run_experiments(
list_nb_components,
list_nb_parallel_transitions,
nb_repeats,
remote_hosts,
verbosity=-1,
printing=True,
print_time=True,
)
if __name__ == '__main__':
main()
#!/usr/bin/env python
import logging
from typing import List
from experiment_utilities.remote_host import RemoteHost
from experiment_utilities.reserve_g5k import G5kReservation
from experiment_utilities.concerto_g5k import ConcertoG5k
CONCERTO_GIT = 'https://gitlab.inria.fr/mchardet/madpp.git'
CONCERTO_DIR_IN_GIT = 'madpp'
EXP_DIR_IN_GIT = CONCERTO_DIR_IN_GIT + '/tests/ssh_scalability'
ROOT_DIR = '~/concertonode'
PYTHON_FILE = 'assembly.py'
CONCERTO_DIR = '%s/%s' % (ROOT_DIR, CONCERTO_DIR_IN_GIT)
EXP_DIR = '%s/%s' % (ROOT_DIR, EXP_DIR_IN_GIT)
DEFAULT_WORKING_DIRECTORY = '.'
def run_experiment(list_nb_remote_ssh, nb_repeats, conf, working_directory=DEFAULT_WORKING_DIRECTORY,
def run_experiment(list_nb_components, list_nb_parallel_transitions, nb_repeats, conf, working_directory,
force_deployment=True, destroy=False):
from json import dump
with G5kReservation(conf, force_deployment, destroy) as g5k:
remote_machines = g5k.get_hosts_info(role='remote')
concerto_machine = g5k.get_hosts_info(role='concerto')[0]
......@@ -28,8 +16,9 @@ def run_experiment(list_nb_remote_ssh, nb_repeats, conf, working_directory=DEFAU
print("Concerto: %s" % str(concerto_machine))
concerto_config = {
"remote_hosts": remote_machines,
"concerto_host": concerto_machine,
"list_nb_remote_ssh": list_nb_remote_ssh,
"concerto_host": concerto_machine if concerto_machine else None,
"list_nb_components": list_nb_components,
"list_nb_parallel_transitions": list_nb_parallel_transitions,
"nb_repeats": nb_repeats
}
......@@ -40,7 +29,7 @@ def run_experiment(list_nb_remote_ssh, nb_repeats, conf, working_directory=DEFAU
with RemoteHost(concerto_machine["address"], remote_user="root") as concerto_host:
with ConcertoG5k(
remote_host=concerto_host,
remote_exp_dir=ConcertoG5k.DEFAULT_CONCERTO_DIR_IN_GIT + '/tests/ssh_scalability',
remote_exp_dir=ConcertoG5k.DEFAULT_CONCERTO_DIR_IN_GIT + '/tests/parallel_test',
python_file='assembly.py',
concerto_config=concerto_config,
local_wd=working_directory,
......@@ -50,23 +39,36 @@ def run_experiment(list_nb_remote_ssh, nb_repeats, conf, working_directory=DEFAU
concerto_g5k.get_files(['stdout', 'stderr', 'results.gpl', 'results.json', 'times.json'])
def perform_experiment(list_nb_remote_ssh, nb_repeats):
def perform_experiment(list_nb_components: List[int], list_nb_parallel_transitions: List[int], nb_repeats: int,
working_directory: str = 'exp', ssh_test=True):
import yaml
from os import makedirs
with open("conf.yaml") as f:
conf = yaml.load(f)
conf['g5k']['resources']['machines'][0]['nodes'] = max(list_nb_remote_ssh)
conf['g5k']['resources']['machines'][0]['nodes'] = max(list_nb_components) if ssh_test else 0
wd = "exp"
makedirs(wd, exist_ok=True)
with open(wd + '/g5k_config.yaml', 'w') as g5k_config_file:
makedirs(working_directory, exist_ok=True)
with open(working_directory + '/g5k_config.yaml', 'w') as g5k_config_file:
yaml.dump(conf, g5k_config_file)
run_experiment(list_nb_remote_ssh, nb_repeats, conf, wd)
run_experiment(list_nb_components, list_nb_parallel_transitions, nb_repeats, conf, working_directory)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
perform_experiment([1, 5, 10, 15, 20], 5)
perform_experiment(
list_nb_components=[1, 5, 10, 15, 20],
list_nb_parallel_transitions=[1, 5, 10, 20],
nb_repeats=5,
working_directory="exp_ssh",
ssh_test=True
)
perform_experiment(
list_nb_components=[1, 5, 10, 15, 20, 50],
list_nb_parallel_transitions=[1, 5, 10, 20],
nb_repeats=5,
working_directory="exp_no_ssh",
ssh_test=False
)
......@@ -4,10 +4,8 @@ from concerto.all import Component, Assembly, DepType
from concerto.utility import empty_transition
class ParallelTransitionsComponent(Component):
def __init__(self, nb_parallel_transitions: int, sleep_time: float, first: bool = False):
self._nb_parallel_transitions = nb_parallel_transitions
assert(nb_parallel_transitions >= 1)
class SingleTransitionsComponent(Component):
def __init__(self, sleep_time: float = 0, first: bool = False):
self._sleep_time = sleep_time
self._first = first
super().__init__()
......@@ -15,28 +13,25 @@ class ParallelTransitionsComponent(Component):
def create(self):
self.places = [
'beginning',
'ready',
'end'
]
self.initial_place = "beginning"
self.transitions = {
'wait_dep': ('beginning', 'ready', 'run', 0, empty_transition),
'run': ('beginning', 'end', 'run', 0, self.run_function),
'reset': ('end', 'beginning', 'reset', 0, empty_transition)
}
for i in range(self._nb_parallel_transitions):
self.transitions["trans%d" % i] = ('ready', 'end', 'run', 0, self.run_function),
self.dependencies = {
"finished": (DepType.PROVIDE, ['end'])
}
if not self._first:
self.dependencies["previous"] = (DepType.USE, ['wait_dep'])
self.dependencies["previous"] = (DepType.USE, ['run'])
def run_function(self):
self.print_color("Waiting for %f seconds" % self._sleep_time)
time.sleep(self._sleep_time)
if self._sleep_time > 0:
time.sleep(self._sleep_time)
class SequenceAssembly(Assembly):
......@@ -45,11 +40,13 @@ class SequenceAssembly(Assembly):
self._current_nb = 0
self._last_chain_length = 0
def prepare(self, max_chain_length: int, nb_parallel_transitions: int, sleep_time: float):
def prepare(self, max_chain_length: int, sleep_time: float):
self._current_nb = max_chain_length
for i in range(max_chain_length):
self.add_component("comp%d" % i, ParallelTransitionsComponent(nb_parallel_transitions, sleep_time,
first=(i == 0)))
self.add_component("comp%d" % i, SingleTransitionsComponent(sleep_time=sleep_time,
first=(i == 0)))
if i > 0:
self.connect("comp%d" % (i-1), "finished", "comp%d" % i, "previous")
self.synchronize()
def run(self, chain_length: int):
......@@ -67,53 +64,49 @@ class SequenceAssembly(Assembly):
def reset(self):
for i in range(self._last_chain_length):
self.push_b("comp%d" % i, "reset")
self.wait_all()
self.synchronize()
def run_experiments(list_chain_length: List[int], list_nb_parallel_transitions: List[int], nb_repeats: int,
def run_experiments(list_chain_length: List[int], nb_repeats: int,
sleep_time: float = 0,
verbosity: int = -1, printing: bool = False, print_time: bool = False, dryrun: bool = False):
import json
from statistics import mean, stdev
from typing import Dict, Any
from concerto.utility import Printer
running_times: Dict[int, Dict[str, Any]] = dict()
assembly = SequenceAssembly()
assembly.set_verbosity(verbosity)
assembly.set_print_time(print_time)
assembly.set_record_gantt(True)
assembly.set_dryrun(dryrun)
running_times: Dict[int, Dict[int, Dict[str, Any]]] = dict()
assembly.prepare(max(list_chain_length), sleep_time)
time.sleep(1)
for nb_trans in list_nb_parallel_transitions:
running_times[nb_trans] = dict()
for chain_length in list_chain_length:
if printing:
Printer.st_tprint("Preparing the assembly with %d parallel transitions per component" % nb_trans)
assembly.prepare(max(list_chain_length), nb_trans, sleep_time)
time.sleep(5)
for chain_length in list_chain_length:
Printer.st_tprint("Testing for a chain of length %d..." % chain_length)
running_times[chain_length] = {
"runs": []
}
for i in range(nb_repeats):
running_time = assembly.run(chain_length)
time.sleep(2)
assembly.reset()
running_times[chain_length]["runs"].append(running_time)
if printing:
Printer.st_tprint("Testing for a chain of length %d..." % chain_length)
running_times[nb_trans][chain_length] = {
"runs": []
}
for i in range(nb_repeats):
running_time = assembly.run(chain_length)
time.sleep(5)
assembly.reset()
running_times[nb_trans][chain_length]["runs"].append(running_time)
if printing:
Printer.st_tprint("- attempt %d: %f" % (i, running_time))
running_times[nb_trans][chain_length]["average"] = mean(running_times[nb_trans][chain_length]["runs"])
Printer.st_tprint("- attempt %d: %f" % (i, running_time))
running_times[chain_length]["average"] = mean(running_times[chain_length]["runs"])
if printing:
Printer.st_tprint("- average: %f" % running_times[chain_length]["average"])
if nb_repeats >= 2:
running_times[chain_length]["std"] = stdev(running_times[chain_length]["runs"])
if printing:
Printer.st_tprint("- average: %f" % running_times[nb_trans][chain_length]["average"])
if nb_repeats >= 2:
running_times[nb_trans][chain_length]["std"] = stdev(running_times[nb_trans][chain_length]["runs"])
if printing:
Printer.st_tprint("- std: %f" % running_times[nb_trans][chain_length]["std"])
with open("times.json", "w") as f:
json.dump(running_times, f, indent='\t')
Printer.st_tprint("- std: %f" % running_times[chain_length]["std"])
if printing:
Printer.st_tprint("Terminating assembly")
......@@ -123,6 +116,9 @@ def run_experiments(list_chain_length: List[int], list_nb_parallel_transitions:
gc.export_gnuplot("results.gpl")
gc.get_gantt_chart().export_json("results.json")
with open("times.json", "w") as f:
json.dump(running_times, f, indent='\t')
def load_config(conf_file_location):
from json import load
......@@ -134,11 +130,10 @@ def load_config(conf_file_location):
def main():
config = load_config("concerto_config.json")
list_chain_length = config['list_chain_length']
list_nb_parallel_transitions = config['list_nb_parallel_transitions']
nb_repeats = config['nb_repeats']
run_experiments(
list_chain_length, list_nb_parallel_transitions, nb_repeats,
list_chain_length, nb_repeats,
sleep_time=0,
verbosity=-1,
printing=True,
......
#!/usr/bin/env python
import logging
from typing import List
from experiment_utilities.remote_host import RemoteHost
from experiment_utilities.reserve_g5k import G5kReservation
from experiment_utilities.concerto_g5k import ConcertoG5k
DEFAULT_WORKING_DIRECTORY = '.'
def run_experiment(list_chain_length, list_nb_parallel_transitions, nb_repeats, conf,
working_directory=DEFAULT_WORKING_DIRECTORY, force_deployment=True, destroy=False):
def run_experiment(list_chain_length, nb_repeats, conf, working_directory,
force_deployment=True, destroy=False):
with G5kReservation(conf, force_deployment, destroy) as g5k:
concerto_machine = g5k.get_hosts_info(role='concerto')[0]
print("Concerto: %s" % str(concerto_machine))
concerto_config = {
"concerto_host": concerto_machine,
"list_chain_length": list_chain_length,
"list_nb_parallel_transitions": list_nb_parallel_transitions,
"nb_repeats": nb_repeats
}
......@@ -27,7 +24,7 @@ def run_experiment(list_chain_length, list_nb_parallel_transitions, nb_repeats,
with RemoteHost(concerto_machine["address"], remote_user="root") as concerto_host:
with ConcertoG5k(
remote_host=concerto_host,
remote_exp_dir=ConcertoG5k.DEFAULT_CONCERTO_DIR_IN_GIT + '/tests/component_sequence',
remote_exp_dir=ConcertoG5k.DEFAULT_CONCERTO_DIR_IN_GIT + '/tests/sequential_test',
python_file='assembly.py',
concerto_config=concerto_config,
local_wd=working_directory
......@@ -36,21 +33,24 @@ def run_experiment(list_chain_length, list_nb_parallel_transitions, nb_repeats,
concerto_g5k.get_files(['stdout', 'stderr', 'results.gpl', 'results.json', 'times.json'])
def perform_experiment(list_chain_length, list_nb_parallel_transitions, nb_repeats):
def perform_experiment(list_chain_length: List[int], nb_repeats: int,
working_directory: str = 'exp'):
import yaml
from os import makedirs
with open("conf.yaml") as f:
conf = yaml.load(f)
wd = "exp"
makedirs(wd, exist_ok=True)
with open(wd + '/g5k_config.yaml', 'w') as g5k_config_file:
makedirs(working_directory, exist_ok=True)
with open(working_directory + '/g5k_config.yaml', 'w') as g5k_config_file:
yaml.dump(conf, g5k_config_file)
run_experiment(list_chain_length, list_nb_parallel_transitions, nb_repeats, conf, wd)
run_experiment(list_chain_length, nb_repeats, conf, working_directory)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
perform_experiment([1, 5, 25, 100], [1, 5, 25, 100], 5)
perform_experiment(
list_chain_length=[1, 5, 25, 100],
nb_repeats=5
)
import time
from typing import List
from concerto.all import Component, Assembly
from concerto.utility import empty_transition
class SSHCaller(Component):
def __init__(self, remote_address, sleep_time):
from experiment_utilities.remote_host import RemoteHost
self._remote_host = RemoteHost(remote_address)
self._sleep_time = sleep_time
super().__init__()
def create(self):
self.places = [
'beginning',
'end'
]
self.initial_place = "beginning"
self.transitions = {
'call': ('beginning', 'end', 'call', 0, self.call_function),
'reset': ('end', 'beginning', 'reset', 0, empty_transition)
}
def call_function(self):
self.print_color("Waiting for %f seconds" % self._sleep_time)
self._remote_host.run("sleep %f" % self._sleep_time)
class SSHAssembly(Assembly):
def __init__(self):
super().__init__()
self._current_nb = 0
self._last_nb_calls = 0
def prepare(self, remote_addresses, sleep_time):
self._current_nb = len(remote_addresses)
for i, address in enumerate(remote_addresses):
self.add_component("caller%d" % i, SSHCaller(address["ip"], sleep_time))
self.synchronize()
def call(self, nb_calls):
self._last_nb_calls = nb_calls
if nb_calls > self._current_nb:
raise Exception("Number of calls larger than the number of components!")
beginning_time = time.perf_counter()
for i in range(nb_calls):
self.push_b("caller%d" % i, "call")
self.wait_all()
self.synchronize()
end_time = time.perf_counter()
return end_time-beginning_time
def reset(self):
for i in range(self._last_nb_calls):