# script.py — authored by Quentin Guilloteau
from nixos_compose.nxc_execo import get_oar_job_nodes_nxc, build_nxc_execo
from execo import Process, SshProcess, Remote
from execo_g5k import oardel, oarsub, OarSubmission, get_oar_job_nodes
from execo_engine import Engine, logger, ParamSweeper, sweep
import sys
import os
import shutil
class MyEngine(Engine):
    """Execo engine that reserves Grid'5000 nodes, deploys an NXC
    composition (one NFS server + N client nodes) and runs the IOR
    benchmark with a decreasing number of active nodes, archiving the
    per-round result files into a single zip.
    """

    def __init__(self):
        super(MyEngine, self).__init__()
        parser = self.args_parser
        parser.add_argument('--nxc_build_file', help='Path to the NXC deploy file')
        parser.add_argument('--nb_nodes', help='Number of nodes')
        parser.add_argument('--block_size', help='Size of the file to write')
        parser.add_argument('--walltime', help='walltime in hours')
        parser.add_argument('--result_dir', help='where to store results')
        parser.add_argument('--flavour', help='Flavour')
        parser.add_argument('--outfile', help='outfile')
        self.nodes = {}          # role name -> list of deployed hosts, set in init()
        self.oar_job_id = -1     # OAR reservation id, set in init()
        self.nb_nodes = -1
        self.flavour = None
        self.site = None         # Grid'5000 site of the reservation, set in init()

    def init(self):
        """Reserve the nodes with OAR and deploy the NXC composition on them."""
        self.nb_nodes = int(self.args.nb_nodes) if self.args.nb_nodes else 2
        walltime_hours = float(self.args.walltime) if self.args.walltime else 1
        nxc_build_file = self.args.nxc_build_file
        self.flavour = self.args.flavour if self.args.flavour else "g5k-image"
        site = "nancy"
        cluster = "gros"
        # site = "grenoble"
        # cluster = "dahu"
        # A full system image needs a "deploy" job; other flavours only need ssh.
        job_type = "deploy" if self.flavour == "g5k-image" else "allow_classic_ssh"
        oar_job = reserve_nodes(self.nb_nodes, site, cluster, job_type,
                                walltime=walltime_hours * 60 * 60)
        # Remember the site so run() can release the job on the right frontend
        # (previously "nancy" was hard-coded in the oardel call).
        self.oar_job_id, self.site = oar_job[0]
        # One NFS server, the remaining nodes are IOR clients.
        roles_quantities = {"server": 1, "node": self.nb_nodes - 1}
        self.nodes = get_oar_job_nodes_nxc(
            self.oar_job_id,
            self.site,
            flavour_name=self.flavour,
            compose_info_file=nxc_build_file,
            roles_quantities=roles_quantities)
        print(self.nodes)

    def run(self):
        """Run IOR rounds with nb_nodes-1 down to 1 active clients, fetch the
        result file of each round from the server, zip everything and release
        the OAR reservation."""
        result_dir = self.args.result_dir if self.args.result_dir else os.getcwd()
        block_size = self.args.block_size if self.args.block_size else "1G"
        zip_archive_name = f"{result_dir}/results_ior_{self.nb_nodes}_nodes_{block_size}_block_size_{self.flavour}"
        # --outfile is expected to end in ".zip"; strip the extension because
        # zip_files (shutil.make_archive) appends it again.
        outfile = self.args.outfile[:-4] if self.args.outfile else zip_archive_name
        folder_name = f"{result_dir}/expe_nfs_{self.flavour}_{self.nb_nodes}_{block_size}"
        create_folder(folder_name)
        logger.info("Generating IOR config")
        run_ior_config_remote = Remote(f"generate_ior_config {self.nb_nodes - 1} {block_size}", self.nodes["node"][0], connection_params={'user': 'root'})
        run_ior_config_remote.run()
        for nb_node in range(self.nb_nodes - 1, 0, -1):
            is_ok = False
            while not is_ok:
                # Run IOR; retry until the remote command succeeds.
                logger.info(f"Starting IOR with {nb_node} nodes")
                run_ior_remote = Remote(f"start_ior_nodes {nb_node} {self.nb_nodes - 1}", self.nodes["node"][0], connection_params={'user': 'root'})
                run_ior_remote.run()
                is_ok = run_ior_remote.ok
            logger.info(f"IOR with {nb_node} nodes succeeded")
            is_ok = False
            while not is_ok:
                # Copy the result file out of the shared NFS export; retry on failure.
                logger.info(f"Retrieving the result file for IOR with {nb_node}")
                get_file_command = f"cp /srv/shared/results_ior.json {folder_name}/results_ior_total_{self.nb_nodes}_active_{nb_node}_{self.flavour}.json"
                get_file_remote = Remote(get_file_command, self.nodes["server"], connection_params={'user': 'root'})
                get_file_remote.run()
                is_ok = get_file_remote.ok
            logger.info(f"Result file for IOR with {nb_node} retrieved")
        logger.info(f"Zipping the data -> {outfile}.zip")
        zip_files(outfile, folder_name)
        remove_folder(folder_name)
        logger.info("Giving back the resources")
        # Fall back to "nancy" (the old hard-coded value) if init() did not run.
        oardel([(self.oar_job_id, self.site if self.site else "nancy")])
def reserve_nodes(nb_nodes, site, cluster, job_type, walltime=3600):
    """Submit an OAR reservation for *nb_nodes* nodes of *cluster* at *site*.

    *walltime* is in seconds. Returns the list of (job_id, site) pairs
    produced by oarsub.
    """
    resources = f"{{cluster='{cluster}'}}/nodes={nb_nodes}"
    submission = OarSubmission(resources, walltime, job_type=[job_type])
    return oarsub([(submission, site)])
def zip_files(name, folder):
    """Archive the contents of *folder* into ``<name>.zip``."""
    archive_format = "zip"
    shutil.make_archive(name, archive_format, folder)
def create_folder(folder):
    """Create *folder* (and any missing parents); no-op if it already exists.

    Uses ``exist_ok=True`` instead of the previous check-then-create
    (``os.path.exists`` followed by ``os.makedirs``), which was racy: the
    folder could appear between the test and the call and raise FileExistsError.
    """
    os.makedirs(folder, exist_ok=True)
def remove_folder(folder):
    """Recursively delete *folder* and everything under it.

    Raises if the folder does not exist (same as ``shutil.rmtree``).
    """
    shutil.rmtree(folder)
if __name__ == "__main__":
    ENGINE = MyEngine()
    try:
        ENGINE.start()
    except Exception as ex:
        # Top-level boundary: report the failure and exit non-zero so callers
        # (shell scripts, job schedulers) can detect it. Previously the error
        # was printed to stdout and the script still exited with status 0.
        print(f"Failing with error {ex}", file=sys.stderr)
        sys.exit(1)