Mentions légales du service

Skip to content
Snippets Groups Projects
script.py 4.61 KiB
from nixos_compose.nxc_execo import get_oar_job_nodes_nxc, build_nxc_execo

from execo import Process, SshProcess, Remote
from execo_g5k import oardel, oarsub, OarSubmission, get_oar_job_nodes
from execo_engine import Engine, logger, ParamSweeper, sweep

import sys
import os
import shutil


class MyEngine(Engine):
    """execo experiment engine running IOR against an NFS server on Grid'5000.

    ``init`` reserves nodes through OAR and deploys a NixOS Compose
    composition on them: one machine gets the ``server`` role and every
    other machine the ``node`` role. ``run`` then launches IOR with a
    decreasing number of active client nodes. It copies each JSON result
    into a working folder, zips that folder and releases the OAR job.
    """

    def __init__(self):
        super(MyEngine, self).__init__()
        parser = self.args_parser
        parser.add_argument('--nxc_build_file', help='Path to the NXC deploy file')
        parser.add_argument('--nb_nodes', help='Number of nodes')
        parser.add_argument('--block_size', help='Size of the file to write')
        parser.add_argument('--walltime', help='walltime in hours')
        parser.add_argument('--result_dir', help='where to store results')
        parser.add_argument('--flavour', help='Flavour')
        parser.add_argument('--outfile', help='outfile')
        # Site and cluster used to be hard-coded; the defaults keep the old values.
        parser.add_argument('--site', default='nancy', help='site to reserve on (default: nancy)')
        parser.add_argument('--cluster', default='gros', help='cluster to reserve on (default: gros)')
        # Mapping role name -> hosts, filled by get_oar_job_nodes_nxc in init().
        self.nodes = {}
        # -1 means "nothing reserved yet"; None means oarsub reported a failure.
        self.oar_job_id = -1
        self.nb_nodes = -1
        self.flavour = None
        # Site of the OAR job as returned by oarsub; needed to release it.
        self.site = None

    def init(self):
        """Reserve the nodes and deploy the NixOS Compose composition on them.

        Reads ``--nb_nodes`` (default 2), ``--walltime`` in hours (default 1),
        ``--flavour`` (default ``g5k-image``), ``--site``, ``--cluster`` and
        ``--nxc_build_file``. Sets ``self.oar_job_id``, ``self.site`` and
        ``self.nodes``.

        Raises:
            RuntimeError: if the OAR submission did not yield a job id.
        """
        self.nb_nodes = int(self.args.nb_nodes) if self.args.nb_nodes else 2
        walltime_hours = float(self.args.walltime) if self.args.walltime else 1
        nxc_build_file = self.args.nxc_build_file
        self.flavour = self.args.flavour if self.args.flavour else "g5k-image"
        site = self.args.site
        cluster = self.args.cluster

        # The "g5k-image" flavour is reserved with job type "deploy";
        # every other flavour with "allow_classic_ssh".
        job_type = "deploy" if self.flavour == "g5k-image" else "allow_classic_ssh"
        oar_job = reserve_nodes(self.nb_nodes, site, cluster, job_type, walltime=walltime_hours * 60 * 60)
        self.oar_job_id, self.site = oar_job[0]
        if self.oar_job_id is None:
            raise RuntimeError(
                f"OAR submission of {self.nb_nodes} nodes on cluster {cluster} at {site} failed")

        # One NFS server; all remaining machines act as IOR clients.
        roles_quantities = {"server": 1, "node": self.nb_nodes - 1}

        try:
            self.nodes = get_oar_job_nodes_nxc(
                self.oar_job_id,
                self.site,
                flavour_name=self.flavour,
                compose_info_file=nxc_build_file,
                roles_quantities=roles_quantities)
        except BaseException:
            # The job is already reserved: give it back instead of letting it
            # hold the machines until its walltime expires.
            self._release_resources()
            raise
        print(self.nodes)

    def run(self):
        """Run IOR for every client count from ``nb_nodes - 1`` down to 1.

        Each run's ``/srv/shared/results_ior.json`` is copied (on the server,
        as root) into a per-experiment folder under ``--result_dir`` (default:
        the current directory). That folder is then zipped into
        ``<outfile>.zip`` and deleted. The OAR job is released even if a step
        fails.
        """
        result_dir = self.args.result_dir if self.args.result_dir else os.getcwd()
        block_size = self.args.block_size if self.args.block_size else "1G"

        zip_archive_name = f"{result_dir}/results_ior_{self.nb_nodes}_nodes_{block_size}_block_size_{self.flavour}"
        if self.args.outfile:
            outfile = self.args.outfile
            # make_archive appends ".zip" itself; only strip an extension that is really there.
            if outfile.lower().endswith(".zip"):
                outfile = outfile[:-len(".zip")]
        else:
            outfile = zip_archive_name

        try:
            folder_name = f"{result_dir}/expe_nfs_{self.flavour}_{self.nb_nodes}_{block_size}"
            create_folder(folder_name)

            logger.info("Generating IOR config")
            run_ior_config_remote = Remote(f"generate_ior_config {self.nb_nodes - 1} {block_size}", self.nodes["node"][0], connection_params={'user': 'root'})
            run_ior_config_remote.run()
            if not run_ior_config_remote.ok:
                # Without a config every IOR run below would fail and be retried forever.
                raise RuntimeError("generate_ior_config failed")

            for nb_node in range(self.nb_nodes - 1, 0, -1):
                self._run_until_ok(
                    f"start_ior_nodes {nb_node} {self.nb_nodes - 1}",
                    self.nodes["node"][0],
                    f"Starting IOR with {nb_node} nodes")
                logger.info(f"IOR with {nb_node} nodes succeed")

                get_file_command = f"cp /srv/shared/results_ior.json {folder_name}/results_ior_total_{self.nb_nodes}_active_{nb_node}_{self.flavour}.json"
                self._run_until_ok(
                    get_file_command,
                    self.nodes["server"],
                    f"Retreving the result file for IOR with {nb_node}")
                logger.info(f"Result file for IOR with {nb_node} retrieved")

            logger.info(f"Zipping the data -> {outfile}.zip")
            zip_files(outfile, folder_name)
            # Only delete the raw results once the archive has been written.
            remove_folder(folder_name)
        finally:
            self._release_resources()

    def _run_until_ok(self, command, hosts, message):
        """Run *command* as root on *hosts* via ``Remote`` until it reports ok.

        *message* is logged before every attempt. There is no retry limit.
        NOTE(review): a permanently failing command loops indefinitely.
        """
        while True:
            logger.info(message)
            remote = Remote(command, hosts, connection_params={'user': 'root'})
            remote.run()
            if remote.ok:
                return

    def _release_resources(self):
        """Delete the OAR job obtained in ``init``; no-op if none is held."""
        if self.oar_job_id is None or self.oar_job_id == -1:
            return
        logger.info("Giving back the resources")
        oardel([(self.oar_job_id, self.site)])
        # Mark as released so a second call does not delete it again.
        self.oar_job_id = -1

def reserve_nodes(nb_nodes, site, cluster, job_type, walltime=3600):
    """
    Submit a single OAR job for *nb_nodes* nodes of *cluster* on *site*.

    *walltime* is passed unchanged to ``OarSubmission``; the engine passes
    hours * 60 * 60. *job_type* is wrapped in a one-element job-type list.
    Returns the result of ``oarsub``; the engine unpacks its first element
    as ``(job_id, site)``.
    """
    resources = f"{{cluster='{cluster}'}}/nodes={nb_nodes}"
    submission = OarSubmission(resources, walltime, job_type=[job_type])
    return oarsub([(submission, site)])

def zip_files(name, folder):
    """
    Pack the contents of *folder* into a ZIP archive named ``<name>.zip``.

    *name* is the archive path without its extension, because
    ``shutil.make_archive`` appends ``.zip`` itself. Archive entries are
    relative to *folder*. Returns None.
    """
    archive_format = "zip"
    shutil.make_archive(base_name=name, format=archive_format, root_dir=folder)

def create_folder(folder):
    """
    Create *folder*, including any missing parent directories.

    Does nothing if the directory already exists. A single ``os.makedirs``
    call with ``exist_ok=True`` avoids the race of checking for existence
    first and creating afterwards.

    Raises:
        FileExistsError: if *folder* exists but is not a directory.
        OSError: if the directory cannot be created.
    """
    os.makedirs(folder, exist_ok=True)

def remove_folder(folder):
    """
    Recursively delete *folder* together with everything inside it.

    Errors are not suppressed: for example, a missing folder raises
    ``FileNotFoundError``.
    """
    shutil.rmtree(path=folder, ignore_errors=False)


if __name__ == "__main__":
    ENGINE = MyEngine()
    try:
        ENGINE.start()
    except Exception as ex:
        # Report the failure, then exit non-zero so that wrapping scripts or
        # schedulers can tell the experiment did not complete (previously the
        # error was swallowed and the script exited with status 0).
        print(f"Failing with error {ex}", file=sys.stderr)
        sys.exit(1)