Commit a46c8cd8 authored by Maverick Chardet

Added experiment utilities files (G5K reservation with enoslib, SSH abstraction, ...)

Added SSH scalability experiment (TODO: debug)
parent d7adeb8c
# exepriment_utilities/remote_host.py
from enum import Enum
import subprocess

from execo.action import Put, Get, Remote
from execo.host import Host


class RemoteHost:
    """Abstraction over a remote host, with an execo backend and a plain ssh/scp backend."""

    class Backend(Enum):
        SSH = 0
        EXECO = 1

    class _ExecoBackend:
        def __init__(self, remote_address, remote_user):
            self._remote_address = remote_address
            self._remote_user = remote_user
            self._host = Host(remote_address, user=remote_user)

        def run(self, command):
            Remote(
                cmd=command,
                hosts=[self._host]
            ).run()

        def send_files(self, local_files, remote_location):
            Put(
                hosts=[self._host],
                local_files=local_files,
                remote_location=remote_location
            ).run()

        def get_files(self, remote_files, local_location):
            Get(
                hosts=[self._host],
                remote_files=remote_files,
                local_location=local_location
            ).run()

    class _SSHBackend:
        def __init__(self, remote_address, remote_user):
            self._remote_address = remote_address
            self._remote_user = remote_user
            self._ssh_adr = '%s@%s' % (remote_user, remote_address)

        def run(self, command):
            subprocess.run(['ssh', self._ssh_adr, command], check=True)

        def send_files(self, local_files, remote_location):
            args = ['scp']
            for lf in local_files:
                args.append(lf)
            args.append("%s:%s" % (self._ssh_adr, remote_location))
            subprocess.run(args, check=True)

        def get_files(self, remote_files, local_location):
            args = ['scp']
            for rf in remote_files:
                args.append("%s:%s" % (self._ssh_adr, rf))
            args.append(local_location)
            subprocess.run(args, check=True)

    def __init__(self, remote_address, remote_user='root', backend=Backend.EXECO):
        self._remote_address = remote_address
        if backend is RemoteHost.Backend.EXECO:
            self._backend = self._ExecoBackend(remote_address, remote_user)
        else:
            self._backend = self._SSHBackend(remote_address, remote_user)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def run(self, command):
        self._backend.run(command)

    def send_files(self, local_files, remote_location='.'):
        self._backend.send_files(local_files, remote_location)

    def get_files(self, remote_files, local_location='.'):
        self._backend.get_files(remote_files, local_location)

    def write_file(self, text, remote_file_location):
        # Write the text to a local temporary file, push it, then clean up.
        import tempfile, os
        (fd, fpath) = tempfile.mkstemp(text=True)
        os.write(fd, str.encode(text))
        os.close(fd)
        self.send_files([fpath], remote_file_location)
        os.remove(fpath)

    def write_jinja2(self, template_text, parameters, remote_file_location):
        from jinja2 import Template
        template = Template(template_text)
        final_text = template.render(parameters)
        self.write_file(final_text, remote_file_location)

    def write_jinja2_file(self, local_jinja2_file, parameters, remote_file_location):
        with open(local_jinja2_file) as f:
            self.write_jinja2(f.read(), parameters, remote_file_location)

    def wait_for_port(self, port, sleep=1., timeout=300):
        from exepriment_utilities.wait_for import wait_for_port
        wait_for_port(port, self._remote_address, sleep=sleep, timeout=timeout)
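
# Illustrative usage sketch (not part of the committed experiment): the hostname is a
# placeholder. RemoteHost defaults to the execo backend and user 'root'; Backend.SSH
# falls back to plain 'ssh'/'scp' subprocesses.
#
#     with RemoteHost("ecotype-1.nantes.grid5000.fr", backend=RemoteHost.Backend.SSH) as host:
#         host.run("hostname")
#         host.send_files(["./payload.tar.gz"], remote_location="/tmp")
#         host.write_jinja2("Hello {{ name }}!", {"name": "world"}, "/tmp/hello.txt")
#         host.get_files(["/tmp/hello.txt"], local_location=".")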
#!/usr/bin/env python
# exepriment_utilities/reserve_g5k.py
import logging
import time

import yaml


class G5kReservation:
    @staticmethod
    def _g5k_deploy(g5k_config, force_deploy=False, **kwargs):
        from enoslib.infra.enos_g5k.provider import G5k
        provider = G5k(g5k_config)
        roles, networks = provider.init(force_deploy=force_deploy)
        env = {'roles': roles, 'networks': networks}
        logging.info('Wait 30 seconds for iface to be ready...')
        time.sleep(30)
        return env, provider

    @staticmethod
    def _allocate(conf, provider='g5k', force_deployment=False):
        env = {}
        if isinstance(conf, str):
            # Get the config object from a yaml file
            with open(conf) as f:
                config = yaml.safe_load(f)
        elif isinstance(conf, dict):
            # Get the config object from a dict
            config = conf
        else:
            # Data format error
            raise Exception(
                'conf is of type {!r} while it should be a path to a yaml file or a dict'.format(type(conf)))

        env['db'] = config.get('database', 'mariadb')
        env['monitoring'] = config.get('monitoring', False)
        env['config'] = config

        # Claim resources on Grid'5000
        if not (provider == 'g5k' and 'g5k' in config):
            raise Exception(
                'The provider {!r} is not supported or it lacks a configuration'.format(provider))

        env['provider'] = 'g5k'
        updated_env, g5k_job = G5kReservation._g5k_deploy(config['g5k'], force_deploy=force_deployment)
        env.update(updated_env)
        return env, g5k_job

    @staticmethod
    def _get_ip(g5k_address):
        from subprocess import run, PIPE
        ip = run("dig +short %s" % g5k_address, shell=True, stdout=PIPE).stdout.decode('utf-8').strip(' \r\n')
        return ip

    @staticmethod
    def _get_host_dict(g5k_address):
        return {"address": g5k_address, "ip": G5kReservation._get_ip(g5k_address)}

    def __init__(self, conf, force_deployment=True, destroy=False):
        self._env, self._g5k_job = G5kReservation._allocate(conf, 'g5k', force_deployment)
        self._destroy = destroy
        self._alive = True

    def get_roles(self):
        if not self._alive:
            raise Exception("G5k reservation not alive!")
        return self._env['roles'].keys()

    def get_hosts_info(self, role):
        if not self._alive:
            raise Exception("G5k reservation not alive!")
        return [G5kReservation._get_host_dict(host.address) for host in self._env['roles'][role]]

    def terminate(self):
        if not self._alive:
            raise Exception("G5k reservation not alive!")
        if self._destroy:
            self._g5k_job.destroy()
        self._alive = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._alive:
            self.terminate()

    def __del__(self):
        if self._alive:
            self.terminate()
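
# Illustrative usage sketch (not part of the committed experiment): any configuration
# with a 'g5k' section works here, e.g. the conf.yaml further below. The reservation
# is only destroyed on exit if destroy=True.
#
#     with G5kReservation("conf.yaml", force_deployment=True, destroy=False) as g5k:
#         for role in g5k.get_roles():
#             print(role, g5k.get_hosts_info(role))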
# exepriment_utilities/wait_for.py
import time
import socket


def wait_for_port(port, host='localhost', sleep=1., timeout=300):
    start_time = time.perf_counter()
    while True:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                break
        except OSError as e:
            if time.perf_counter() - start_time >= timeout:
                raise TimeoutError('Timeout (%f s) waiting for port %d on host %s.' % (timeout, port, host)) from e
            time.sleep(sleep)
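
# Illustrative usage sketch (hostname and values are placeholders): waits until the
# SSH port of a freshly deployed node accepts connections, or raises TimeoutError.
#
#     wait_for_port(22, host="ecotype-1.nantes.grid5000.fr", sleep=2., timeout=60)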
# tests/ssh_scalability/assembly.py
import time

from concerto.all import Component, Assembly
from concerto.utility import empty_transition


class SSHCaller(Component):
    """Concerto component that runs a 'sleep' command on a remote host over SSH."""

    def __init__(self, remote_address, sleep_time):
        from exepriment_utilities.remote_host import RemoteHost
        self._remote_host = RemoteHost(remote_address)
        self._sleep_time = sleep_time
        super().__init__()

    def create(self):
        self.places = [
            'beginning',
            'end'
        ]

        self.transitions = {
            'call': ('beginning', 'end', 'call', 0, self.call_function),
            'reset': ('end', 'beginning', 'reset', 0, empty_transition)
        }

        self.initial_place = "beginning"

    def call_function(self):
        self._remote_host.run("sleep %f" % self._sleep_time)


class SSHAssembly(Assembly):
    def __init__(self):
        super().__init__()
        self._current_nb = 0
        self._last_nb_calls = 0

    def prepare(self, remote_addresses, sleep_time):
        self._current_nb = len(remote_addresses)
        # One SSHCaller component per remote host
        for i, address in enumerate(remote_addresses):
            self.add_component("caller%d" % i, SSHCaller(address, sleep_time))
        self.synchronize()

    def call(self, nb_calls):
        self._last_nb_calls = nb_calls
        if nb_calls > self._current_nb:
            raise Exception("Number of calls larger than the number of components!")
        beginning_time = time.perf_counter()
        for i in range(nb_calls):
            self.push_b("caller%d" % i, "call")
        self.wait_all()
        self.synchronize()
        end_time = time.perf_counter()
        return end_time - beginning_time

    def reset(self):
        for i in range(self._last_nb_calls):
            self.push_b("caller%d" % i, "reset")
        self.synchronize()
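
# Intended lifecycle of SSHAssembly, as driven by run_experiments below
# (host names are placeholders):
#
#     assembly = SSHAssembly()
#     assembly.prepare(["node-0", "node-1"], sleep_time=10)  # one SSHCaller per host
#     duration = assembly.call(2)  # fire 'call' on the first two callers and time it
#     assembly.reset()             # send every used caller back to its 'beginning' place
#     assembly.terminate()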
def run_experiments(remote_hosts, list_nb_remote_ssh, nb_repeats,
                    verbosity: int = 0, printing: bool = False, print_time: bool = False,
                    dryrun: bool = False):
    import json
    from statistics import mean, stdev
    from typing import Dict, Any
    from concerto.utility import Printer

    assembly = SSHAssembly()
    assembly.set_verbosity(verbosity)
    assembly.set_print_time(print_time)
    assembly.set_use_gantt_chart(True)
    assembly.set_dryrun(dryrun)

    if printing:
        Printer.st_tprint("Preparing the assembly")
    assembly.prepare(remote_hosts, 10)
    time.sleep(5)

    running_times: Dict[int, Dict[str, Any]] = dict()
    for nb_remote_ssh in list_nb_remote_ssh:
        if printing:
            Printer.st_tprint("Testing for %d remote SSH connections..." % nb_remote_ssh)
        running_times[nb_remote_ssh] = {
            "runs": []
        }
        for i in range(nb_repeats):
            running_time = assembly.call(nb_remote_ssh)
            time.sleep(5)
            assembly.reset()
            running_times[nb_remote_ssh]["runs"].append(running_time)
            if printing:
                Printer.st_tprint("- attempt %d: %f" % (i, running_time))
        running_times[nb_remote_ssh]["average"] = mean(running_times[nb_remote_ssh]["runs"])
        if printing:
            Printer.st_tprint("- average: %f" % running_times[nb_remote_ssh]["average"])
        if nb_repeats >= 2:
            running_times[nb_remote_ssh]["std"] = stdev(running_times[nb_remote_ssh]["runs"])
            if printing:
                Printer.st_tprint("- std: %f" % running_times[nb_remote_ssh]["std"])

    with open("times.json", "w") as f:
        json.dump(running_times, f)

    if printing:
        Printer.st_tprint("Terminating assembly")
    assembly.terminate()
    gc = assembly.get_gantt_chart()
    gc.export_gnuplot("results.gpl")
    gc.export_json("results.json")


def load_config(conf_file_location):
    from json import load
    with open(conf_file_location, "r") as file:
        conf = load(file)
    return conf


def main():
    config = load_config("concerto_config.json")
    # The configuration stores host dicts with "address" and "ip" keys (as written by
    # the experiment driver); SSHCaller only needs the addresses.
    remote_hosts = [host["address"] for host in config['remote_hosts']]
    list_nb_remote_ssh = config['list_nb_remote_ssh']
    nb_repeats = config['nb_repeat']
    run_experiments(
        remote_hosts, list_nb_remote_ssh, nb_repeats,
        verbosity=1,
        printing=True,
        print_time=True,
    )


if __name__ == '__main__':
    main()
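
# Example of the concerto_config.json that main() expects (written by the experiment
# driver script below; addresses and IPs are placeholders):
#
#     {
#       "remote_hosts": [{"address": "ecotype-1.nantes.grid5000.fr", "ip": "10.0.0.1"}],
#       "concerto_host": {"address": "ecotype-2.nantes.grid5000.fr", "ip": "10.0.0.2"},
#       "list_nb_remote_ssh": [1, 5, 10, 15, 20],
#       "nb_repeat": 5
#     }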
# conf.yaml
g5k:
  # reservation: "2018-03-12 19:00:01"
  oargrid_jobid: 65542 # overriding the rest
  walltime: "02:00:00"
  dhcp: true
  job_name: concerto_ssh_scalability
  env_name: debian9-x64-nfs-madpp
  resources:
    machines:
      - roles:
          - remote
        cluster: ecotype
        nodes: 20
        primary_network: n1
        secondary_networks: []
      - roles:
          - concerto
        cluster: ecotype
        nodes: 1
        primary_network: n1
        secondary_networks: []
    networks:
      - id: n1
        roles:
          - control_network
          - database_network
        type: prod
        site: nantes
#!/usr/bin/env python
# Experiment driver: reserves Grid'5000 nodes, deploys Concerto and launches assembly.py
import logging

from exepriment_utilities.remote_host import RemoteHost
from exepriment_utilities.reserve_g5k import G5kReservation

CONCERTO_GIT = 'https://gitlab.inria.fr/mchardet/madpp.git'
CONCERTO_DIR_IN_GIT = 'madpp'
EXP_DIR_IN_GIT = CONCERTO_DIR_IN_GIT + '/tests/ssh_scalability'
ROOT_DIR = '~/concertonode'
PYTHON_FILE = 'assembly.py'
CONCERTO_DIR = '%s/%s' % (ROOT_DIR, CONCERTO_DIR_IN_GIT)
EXP_DIR = '%s/%s' % (ROOT_DIR, EXP_DIR_IN_GIT)
DEFAULT_WORKING_DIRECTORY = '.'


def run_experiment(list_nb_remote_ssh, nb_repeat, conf, working_directory=DEFAULT_WORKING_DIRECTORY,
                   force_deployment=True, destroy=False):
    from json import dump

    with G5kReservation(conf, force_deployment, destroy) as g5k:
        remote_machines = g5k.get_hosts_info(role='remote')
        concerto_machine = g5k.get_hosts_info(role='concerto')[0]
        print("Remote: %s" % str(remote_machines))
        print("Concerto: %s" % str(concerto_machine))
        concerto_config = {
            "remote_hosts": remote_machines,
            "concerto_host": concerto_machine,
            "list_nb_remote_ssh": list_nb_remote_ssh,
            "nb_repeat": nb_repeat
        }
        with open(working_directory + "/concerto_config.json", "w") as concerto_config_file:
            dump(concerto_config, concerto_config_file)

        with RemoteHost(concerto_machine["address"], remote_user="root") as concerto_host:
            # Clone the Concerto repository on the Concerto node
            run_cmd = "mkdir -p %s;" % ROOT_DIR + \
                      "cd %s;" % ROOT_DIR + \
                      "git clone %s;" % CONCERTO_GIT
            print("Executing commands: %s" % run_cmd)
            concerto_host.run(run_cmd)

            # Push the generated configuration and the SSH keys needed to reach the remote nodes
            concerto_host.send_files(
                local_files=[working_directory + "/concerto_config.json"],
                remote_location=EXP_DIR
            )
            concerto_host.send_files(
                local_files=["~/.ssh/id_rsa", "~/.ssh/id_rsa.pub"],
                remote_location="~/.ssh"
            )

            # Run the experiment on the Concerto node
            run_cmd = "cd %s;" % CONCERTO_DIR + \
                      "source source_dir.sh;" + \
                      "cd %s;" % EXP_DIR + \
                      "timeout -k 10s 30m python3 %s >stdout 2>stderr" % PYTHON_FILE
            print("Executing commands: %s" % run_cmd)
            concerto_host.run(run_cmd)

            # Fetch the logs and results
            files_to_get_names = ['stdout', 'stderr', 'results.gpl', 'results.json', 'times.json']
            files_to_get = ["%s/%s" % (EXP_DIR, fn) for fn in files_to_get_names]
            concerto_host.get_files(
                remote_files=files_to_get,
                local_location=working_directory
            )


def perform_experiment(list_nb_remote_ssh, nb_repeat):
    import yaml
    from os import makedirs

    with open("conf.yaml") as f:
        conf = yaml.safe_load(f)
    # Reserve only as many remote nodes as the largest tested number of SSH connections
    conf['g5k']['resources']['machines'][0]['nodes'] = max(list_nb_remote_ssh)
    wd = "exp"
    makedirs(wd, exist_ok=True)
    with open(wd + '/g5k_config.yaml', 'w') as g5k_config_file:
        yaml.dump(conf, g5k_config_file)
    run_experiment(list_nb_remote_ssh, nb_repeat, conf, wd)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    perform_experiment([1, 5, 10, 15, 20], 5)