Commit 821e02bb authored by Maverick Chardet's avatar Maverick Chardet
Browse files

Improved component utilities:

- added a ConcertoG5k class to simplify experimenting with Concerto on G5k
- added typing information to RemoteHost
Added component sequence test
Improved SSH scalability test
- makes use of the new ConcertoG5k class
- added typing information
parent 8ce3b8b8
from typing import List, Optional
from experiment_utilities.remote_host import RemoteHost
class ConcertoG5k:
DEFAULT_LOCAL_WD = '.'
DEFAULT_REMOTE_WD = '~/concertonode'
DEFAULT_CONCERTO_GIT = 'https://gitlab.inria.fr/mchardet/madpp.git'
DEFAULT_CONCERTO_DIR_IN_GIT = 'madpp'
def __init__(self, remote_host: RemoteHost, remote_exp_dir: str, python_file: str, concerto_config,
local_wd: str = DEFAULT_LOCAL_WD,
remote_wd: str = DEFAULT_REMOTE_WD,
concerto_git: str = DEFAULT_CONCERTO_GIT,
concerto_dir_in_git: str = DEFAULT_CONCERTO_DIR_IN_GIT,
additional_git_clone: List[str] = (),
send_ssh_keys: bool = False):
"""
:param remote_host: RemoteHost object for the remote machine which is going to run Concerto
:param remote_exp_dir: Directory in which to perform the experiment on the remote machine, relative to the
remote working directory
:param python_file: Python file to run on the remote server inside the experiment directory
:param concerto_config: object which will be serialized to JSON and sent to the experiment directory as
"concerto_config.json"
:param local_wd: Local working directory
:param remote_wd: Remote working directory
:param concerto_git: Concerto git repository to clone
:param concerto_dir_in_git: Location of Concerto inside the git repository
:param additional_git_clone: Additional git repositories to clone inside the remote working directory
:param send_ssh_keys: Whether or not to send the local public and private SSH keys to the remote machine
"""
self.full_concerto_dir = '%s/%s' % (remote_wd, concerto_dir_in_git)
self.full_remote_exp_dir = '%s/%s' % (remote_wd, remote_exp_dir)
self.concerto_host: RemoteHost = remote_host
self.local_wd = local_wd
self.concerto_config = concerto_config
self.send_ssh_keys = send_ssh_keys
self.python_file = python_file
self.remote_exp_dir_created = False
self.git_clone_cmd = "cd %s;" % remote_wd +\
"git clone %s;" % concerto_git
for git_repo in additional_git_clone:
self.git_clone_cmd += "git clone %s;" % git_repo
def _ensure_exp_dir_exists(self):
if not self.remote_exp_dir_created:
command = "mkdir -p %s" % self.full_remote_exp_dir
with self.concerto_host as concerto_host:
concerto_host.run(command, wait=True)
self.remote_exp_dir_created = True
def send_files(self, files_list: List[str]):
"""
Sends a list of file to the remote experiment directory
:param files_list: List of files to send, relative to the local working directory
"""
self._ensure_exp_dir_exists()
with self.concerto_host as concerto_host:
concerto_host.send_files(
local_files=[self.local_wd + "/" + file for file in files_list],
remote_location=self.full_remote_exp_dir
)
def get_files(self, files_list: List[str]):
"""
Gets a list of file to the local working directory
:param files_list: List of files to get, relative to the remote experiment directory
"""
self._ensure_exp_dir_exists()
with self.concerto_host as concerto_host:
concerto_host.get_files(
remote_files=["%s/%s" % (self.full_remote_exp_dir, fn) for fn in files_list],
local_location=self.local_wd
)
def execute(self, timeout: Optional[str] = None, timeout_graceful_exit_time: str = "10s"):
"""
Executes Concerto on the remote machine after sending the necessary files and cloning the Concerto git
repository
:param timeout: If not None, string in the timeout shell command format giving the maximum execution time.
:param timeout_graceful_exit_time: String in the timeout shell command format giving the time given by timeout
to the Concerto process to exit gracefully before being killed (only relevant if timeout is not None)
"""
self._ensure_exp_dir_exists()
with self.concerto_host as concerto_host:
print("Executing commands: %s" % self.git_clone_cmd)
concerto_host.run(self.git_clone_cmd)
with open(self.local_wd + "/concerto_config.json", "w") as concerto_config_file:
from json import dump
dump(self.concerto_config, concerto_config_file)
concerto_host.send_files(
local_files=[self.local_wd + "/concerto_config.json"],
remote_location=self.full_remote_exp_dir
)
if self.send_ssh_keys:
concerto_host.send_files(
local_files=["~/.ssh/id_rsa", "~/.ssh/id_rsa.pub"],
remote_location="~/.ssh"
)
run_cmd = "cd %s;" % self.full_concerto_dir +\
"source source_dir.sh;" +\
"cd %s;" % self.full_remote_exp_dir
if timeout:
run_cmd += "timeout -k %s %s python3 %s >stdout 2>stderr" % (timeout_graceful_exit_time,
timeout,
self.python_file)
else:
run_cmd += "python3 %s >stdout 2>stderr" % self.python_file
print("Executing commands: %s" % run_cmd)
concerto_host.run(run_cmd, wait=True)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
from enum import Enum
import subprocess
from typing import List
from execo.action import Put, Get, Remote
from execo.host import Host
......@@ -84,16 +85,16 @@ class RemoteHost:
return "RemoteHost(address=%s, user=%s, backend=%s)" % (self._remote_address, self._remote_user,
str(self._backend))
def run(self, command, **kwargs):
def run(self, command: str, **kwargs):
self._backend.run(command, **kwargs)
def send_files(self, local_files, remote_location='.'):
def send_files(self, local_files: List[str], remote_location: str = '.'):
self._backend.send_files(local_files, remote_location)
def get_files(self, remote_files, local_location='.'):
def get_files(self, remote_files: List[str], local_location: str = '.'):
self._backend.get_files(remote_files, local_location)
def write_file(self, text, remote_file_location):
def write_file(self, text: str, remote_file_location: str):
import tempfile
import os
(fd, fpath) = tempfile.mkstemp(text=True)
......
import time
from typing import List
from concerto.all import Component, Assembly, DepType
from concerto.utility import empty_transition
class ParallelTransitionsComponent(Component):
def __init__(self, nb_parallel_transitions: int, sleep_time: float, first: bool = False):
self._nb_parallel_transitions = nb_parallel_transitions
assert(nb_parallel_transitions >= 1)
self._sleep_time = sleep_time
self._first = first
super().__init__()
def create(self):
self.places = [
'beginning',
'ready',
'end'
]
self.initial_place = "beginning"
self.transitions = {
'wait_dep': ('beginning', 'ready', 'run', 0, empty_transition),
'reset': ('end', 'beginning', 'reset', 0, empty_transition)
}
for i in range(self._nb_parallel_transitions):
self.transitions["trans%d" % i] = ('ready', 'end', 'run', 0, self.run_function),
self.dependencies = {
"finished": (DepType.PROVIDE, ['end'])
}
if not self._first:
self.dependencies["previous"] = (DepType.USE, ['wait_dep'])
def run_function(self):
self.print_color("Waiting for %f seconds" % self._sleep_time)
time.sleep(self._sleep_time)
class SequenceAssembly(Assembly):
def __init__(self):
super().__init__()
self._current_nb = 0
self._last_chain_length = 0
def prepare(self, max_chain_length: int, nb_parallel_transitions: int, sleep_time: float):
self._current_nb = max_chain_length
for i in range(max_chain_length):
self.add_component("comp%d" % i, ParallelTransitionsComponent(nb_parallel_transitions, sleep_time,
first=(i == 0)))
self.synchronize()
def run(self, chain_length: int):
self._last_chain_length = chain_length
if chain_length > self._current_nb:
raise Exception("Chain length larger than the number of components!")
beginning_time = time.perf_counter()
for i in range(chain_length):
self.push_b("comp%d" % i, "run")
self.wait("comp%d" % (chain_length-1))
self.synchronize()
end_time = time.perf_counter()
return end_time-beginning_time
def reset(self):
for i in range(self._last_chain_length):
self.push_b("comp%d" % i, "reset")
self.synchronize()
def run_experiments(list_chain_length: List[int], list_nb_parallel_transitions: List[int], nb_repeats: int,
sleep_time: float = 0,
verbosity: int = -1, printing: bool = False, print_time: bool = False, dryrun: bool = False):
import json
from statistics import mean, stdev
from typing import Dict, Any
from concerto.utility import Printer
assembly = SequenceAssembly()
assembly.set_verbosity(verbosity)
assembly.set_print_time(print_time)
assembly.set_record_gantt(True)
assembly.set_dryrun(dryrun)
running_times: Dict[int, Dict[int, Dict[str, Any]]] = dict()
for nb_trans in list_nb_parallel_transitions:
running_times[nb_trans] = dict()
if printing:
Printer.st_tprint("Preparing the assembly with %d parallel transitions per component" % nb_trans)
assembly.prepare(max(list_chain_length), nb_trans, sleep_time)
time.sleep(5)
for chain_length in list_chain_length:
if printing:
Printer.st_tprint("Testing for a chain of length %d..." % chain_length)
running_times[nb_trans][chain_length] = {
"runs": []
}
for i in range(nb_repeats):
running_time = assembly.run(chain_length)
time.sleep(5)
assembly.reset()
running_times[nb_trans][chain_length]["runs"].append(running_time)
if printing:
Printer.st_tprint("- attempt %d: %f" % (i, running_time))
running_times[nb_trans][chain_length]["average"] = mean(running_times[nb_trans][chain_length]["runs"])
if printing:
Printer.st_tprint("- average: %f" % running_times[nb_trans][chain_length]["average"])
if nb_repeats >= 2:
running_times[nb_trans][chain_length]["std"] = stdev(running_times[nb_trans][chain_length]["runs"])
if printing:
Printer.st_tprint("- std: %f" % running_times[nb_trans][chain_length]["std"])
with open("times.json", "w") as f:
json.dump(running_times, f, indent='\t')
if printing:
Printer.st_tprint("Terminating assembly")
assembly.terminate()
gc = assembly.get_gantt_record()
gc.export_gnuplot("results.gpl")
gc.get_gantt_chart().export_json("results.json")
def load_config(conf_file_location):
from json import load
with open(conf_file_location, "r") as file:
conf = load(file)
return conf
def main():
config = load_config("concerto_config.json")
list_chain_length = config['list_chain_length']
list_nb_parallel_transitions = config['list_nb_parallel_transitions']
nb_repeats = config['nb_repeats']
run_experiments(
list_chain_length, list_nb_parallel_transitions, nb_repeats,
sleep_time=0,
verbosity=-1,
printing=True,
print_time=True,
)
if __name__ == '__main__':
main()
g5k:
# reservation: "2018-03-12 19:00:01"
walltime: "01:00:00"
dhcp: true
job_name: concerto_ssh_scalability
env_name: debian10-x64-base
resources:
machines:
- roles:
- concerto
cluster: ecotype
nodes: 1
primary_network: n1
secondary_networks: []
networks:
- id: n1
roles:
- control_network
- database_network
type: prod
site: nantes
#!/usr/bin/env python
import logging
from experiment_utilities.remote_host import RemoteHost
from experiment_utilities.reserve_g5k import G5kReservation
from experiment_utilities.concerto_g5k import ConcertoG5k
DEFAULT_WORKING_DIRECTORY = '.'
def run_experiment(list_chain_length, list_nb_parallel_transitions, nb_repeats, conf,
working_directory=DEFAULT_WORKING_DIRECTORY, force_deployment=True, destroy=False):
with G5kReservation(conf, force_deployment, destroy) as g5k:
concerto_machine = g5k.get_hosts_info(role='concerto')[0]
print("Concerto: %s" % str(concerto_machine))
concerto_config = {
"concerto_host": concerto_machine,
"list_chain_length": list_chain_length,
"list_nb_parallel_transitions": list_nb_parallel_transitions,
"nb_repeats": nb_repeats
}
with g5k.ansible_to("concerto") as ansible_to_concerto:
ansible_to_concerto.apt(name=["python3", "git"], state="present")
with RemoteHost(concerto_machine["address"], remote_user="root") as concerto_host:
with ConcertoG5k(
remote_host=concerto_host,
remote_exp_dir=ConcertoG5k.DEFAULT_CONCERTO_DIR_IN_GIT + '/tests/component_sequence',
python_file='assembly.py',
concerto_config=concerto_config,
local_wd=working_directory
) as concerto_g5k:
concerto_g5k.execute(timeout="45m")
concerto_g5k.get_files(['stdout', 'stderr', 'results.gpl', 'results.json', 'times.json'])
def perform_experiment(list_chain_length, list_nb_parallel_transitions, nb_repeats):
import yaml
from os import makedirs
with open("conf.yaml") as f:
conf = yaml.load(f)
wd = "exp"
makedirs(wd, exist_ok=True)
with open(wd + '/g5k_config.yaml', 'w') as g5k_config_file:
yaml.dump(conf, g5k_config_file)
run_experiment(list_chain_length, list_nb_parallel_transitions, nb_repeats, conf, wd)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
perform_experiment([1, 5, 25, 100], [1, 5, 25, 100], 5)
import time
from typing import List
from concerto.all import Component, Assembly
from concerto.utility import empty_transition
......@@ -16,16 +17,15 @@ class SSHCaller(Component):
'end'
]
self.initial_place = "beginning"
self.transitions = {
'call': ('beginning', 'end', 'call', 0, self.call_function),
'reset': ('end', 'beginning', 'reset', 0, empty_transition)
}
self.initial_place = "beginning"
def call_function(self):
self.print_color("Waiting for %f seconds" % self._sleep_time)
self.print_color(str(self._remote_host))
self._remote_host.run("sleep %f" % self._sleep_time)
......@@ -59,8 +59,8 @@ class SSHAssembly(Assembly):
self.synchronize()
def run_experiments(remote_hosts, list_nb_remote_ssh, nb_repeats,
verbosity: int = 0, printing: bool = False, print_time: bool = False, dryrun: bool = False):
def run_experiments(remote_hosts: List, list_nb_remote_ssh: List[int], nb_repeats: int,
verbosity: int = -1, printing: bool = False, print_time: bool = False, dryrun: bool = False):
import json
from statistics import mean, stdev
from typing import Dict, Any
......@@ -99,7 +99,7 @@ def run_experiments(remote_hosts, list_nb_remote_ssh, nb_repeats,
if printing:
Printer.st_tprint("- std: %f" % running_times[nb_remote_ssh]["std"])
with open("times.json", "w") as f:
json.dump(running_times, f)
json.dump(running_times, f, indent='\t')
if printing:
Printer.st_tprint("Terminating assembly")
......@@ -125,7 +125,7 @@ def main():
run_experiments(
remote_hosts, list_nb_remote_ssh, nb_repeats,
verbosity=1,
verbosity=-1,
printing=True,
print_time=True,
)
......
g5k:
# reservation: "2018-03-12 19:00:01"
oargrid_jobids: [['nantes', 175864]] # overriding the rest
walltime: "01:00:00"
dhcp: true
job_name: concerto_ssh_scalability
env_name: debian10-x64-base
# env_name: debian9-x64-nfs-madpp
resources:
machines:
- roles:
- remote
cluster: ecotype
nodes: 10
nodes: 10 # will be changed by reserve_and_test
primary_network: n1
secondary_networks: []
- roles:
......
......@@ -3,6 +3,7 @@ import logging
from experiment_utilities.remote_host import RemoteHost
from experiment_utilities.reserve_g5k import G5kReservation
from experiment_utilities.concerto_g5k import ConcertoG5k
CONCERTO_GIT = 'https://gitlab.inria.fr/mchardet/madpp.git'
CONCERTO_DIR_IN_GIT = 'madpp'
......@@ -31,39 +32,22 @@ def run_experiment(list_nb_remote_ssh, nb_repeats, conf, working_directory=DEFAU
"list_nb_remote_ssh": list_nb_remote_ssh,
"nb_repeats": nb_repeats
}
with open(working_directory + "/concerto_config.json", "w") as concerto_config_file:
dump(concerto_config, concerto_config_file)
with g5k.ansible_to("concerto") as ansible_to_concerto:
ansible_to_concerto.apt(name=["python3", "python3-pip", "git"], state="present")
ansible_to_concerto.pip(name=["enoslib"], executable="pip3")
with RemoteHost(concerto_machine["address"], remote_user="root") as concerto_host:
run_cmd = "mkdir -p %s;" % ROOT_DIR + \
"cd %s;" % ROOT_DIR + \
"git clone %s;" % CONCERTO_GIT
print("Executing commands: %s" % run_cmd)
concerto_host.run(run_cmd)
concerto_host.send_files(
local_files=[working_directory + "/concerto_config.json"],
remote_location=EXP_DIR
)
concerto_host.send_files(
local_files=["~/.ssh/id_rsa", "~/.ssh/id_rsa.pub"],
remote_location="~/.ssh"
)
run_cmd = "cd %s;" % CONCERTO_DIR + \
"source source_dir.sh;" + \
"cd %s;" % EXP_DIR + \
"timeout -k 10s 30m python3 %s >stdout 2>stderr" % PYTHON_FILE
print("Executing commands: %s" % run_cmd)
concerto_host.run(run_cmd)
files_to_get_names = ['stdout', 'stderr', 'results.gpl', 'results.json', 'times.json']
files_to_get = ["%s/%s" % (EXP_DIR, fn) for fn in files_to_get_names]
concerto_host.get_files(
remote_files=files_to_get,
local_location=working_directory
)
with ConcertoG5k(
remote_host=concerto_host,
remote_exp_dir=ConcertoG5k.DEFAULT_CONCERTO_DIR_IN_GIT + '/tests/ssh_scalability',
python_file='assembly.py',
concerto_config=concerto_config,
local_wd=working_directory,
send_ssh_keys=True
) as concerto_g5k:
concerto_g5k.execute(timeout="45m")
concerto_g5k.get_files(['stdout', 'stderr', 'results.gpl', 'results.json', 'times.json'])
def perform_experiment(list_nb_remote_ssh, nb_repeats):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment