From b4570736d9f6c17ab9fb7aa7b0e77662e8fb6265 Mon Sep 17 00:00:00 2001 From: msimonin <matthieu.simonin@inria.fr> Date: Mon, 11 Jan 2021 22:24:31 +0100 Subject: [PATCH] service/conda: introduce new Dask deployment. This will manage the dask agents life cycle. But this assumes conda to be set up in the resources prior to the deployment. This obviously speed up the deployment. In G5k this can be achieved by setting up your conda environment in a frontend (shared space) and running the Dask service on the production nodes (allow_classic_ssh job type). The service follows the standard (deploy, destroy) service definition, but it can also be used as a context manager. Also: - G5k: introduce G5kTunnel - make g5k_api_utils available in __all__ --- docs/apidoc/examples/conda.py | 51 ----- docs/apidoc/examples/dask.ipynb | 351 +++++++++++++++++++++++++++++ docs/apidoc/examples/dask.py | 41 ---- enoslib/__init__.py | 16 +- enoslib/infra/enos_g5k/provider.py | 60 ++++- enoslib/infra/enos_g5k/utils.py | 2 +- enoslib/service/__init__.py | 2 +- enoslib/service/conda/__init__.py | 2 +- enoslib/service/conda/conda.py | 170 +++++++++++++- 9 files changed, 584 insertions(+), 111 deletions(-) delete mode 100644 docs/apidoc/examples/conda.py create mode 100644 docs/apidoc/examples/dask.ipynb delete mode 100644 docs/apidoc/examples/dask.py diff --git a/docs/apidoc/examples/conda.py b/docs/apidoc/examples/conda.py deleted file mode 100644 index 772107a3..00000000 --- a/docs/apidoc/examples/conda.py +++ /dev/null @@ -1,51 +0,0 @@ -from enoslib.infra.enos_g5k.provider import G5k -from enoslib.infra.enos_g5k.configuration import (Configuration, - NetworkConfiguration) -from enoslib.service.conda import Conda, conda_run_command, conda_play_on -import logging -import os -import time - -logging.basicConfig(level=logging.DEBUG) - -# claim the resources -conf = Configuration.from_settings(job_type="allow_classic_ssh", - job_name="conda") -network = NetworkConfiguration(id="n1", - type="prod", - roles=["my_network"], - site="rennes") -conf.add_network_conf(network)\ - .add_machine(roles=["control"], - cluster="parapluie", - nodes=2, - primary_network=network)\ - .finalize() - -provider = G5k(conf) -roles, networks = provider.init() - -# let's provision a new env -m = Conda(nodes=roles["control"]) -m.deploy(env_name="plop", packages=["dask"]) - -# make use of this environment new environment -r = conda_run_command("conda env export", "plop", roles=roles) -print(r) - -# make use of an existing environment (somewhere in ~/miniconda3 most probably) -# this is practical because the env can be created on a shared filesystem -# ans use on all the nodes -r = conda_run_command("conda env export", "spark", roles=roles, run_as="msimonin") -print(r) - -# run this in the new (local to the node) environment -with conda_play_on("plop", roles=roles) as p: - p.shell("conda env export > /tmp/plop.env") - p.fetch(src="/tmp/plop.env", dest="/tmp/plop.env") - -# run this in a shared environment -with conda_play_on("spark", roles=roles, run_as="msimonin") as p: - # launch a script that requires spark n'co - p.shell("conda env export > /tmp/spark.env") - p.fetch(src="/tmp/plop.env", dest="/tmp/spark.env") \ No newline at end of file diff --git a/docs/apidoc/examples/dask.ipynb b/docs/apidoc/examples/dask.ipynb new file mode 100644 index 00000000..ae95b540 --- /dev/null +++ b/docs/apidoc/examples/dask.ipynb @@ -0,0 +1,351 @@ +{ + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.5-final" + }, + "orig_nbformat": 2, + "kernelspec": { + "name": "python3", + "display_name": "Python 3.8.5 64-bit ('andromak': conda)", + "metadata": { + "interpreter": { + "hash": "044ddde3072526b14262047f16da4fe10c08ccab664ff325d5dd19288f92d50e" + } + } + } + }, + "nbformat": 4, + "nbformat_minor": 2, + "cells": [ + { + "source": [ + "# Dask deployment on Grid'5000\n", + "\n", + "This notebook will deploy a Dask cluster on Grid'5000 and launch a simpe computation.\n", + "\n", + "Requirements: \n", + " - A conda[[1]] environment setup on the Grid'5000 frontend with dask installed and EnOSlib.\n", + " - The same environment can be use to run this notebook from your local machine. \n", + "\n", + "[1]: https://docs.conda.io/en/latest/miniconda.html#linux-installers\n", + "\n", + "\n", + "## Initial impors\n", + " " + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "from enoslib import *\n", + "import logging\n", + "\n", + "# get some logs\n", + "logging.basicConfig(level=logging.INFO)" + ] + }, + { + "source": [ + "## Get some resources on Grid'5000\n", + "\n", + "This will reserve two nodes, where the Dask cluster will be deployed later." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from grenoble\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from lille\n", + "{'roles': ['scheduler'], 'primary_network': 'prod', 'secondary_networks': [], 'cluster': 'parapide', 'nodes': 1}\n", + "{'roles': ['worker'], 'primary_network': 'prod', 'secondary_networks': [], 'cluster': 'parapide', 'nodes': 1}\n", + "{\n", + " \"dhcp\": true,\n", + " \"force_deploy\": false,\n", + " \"env_name\": \"debian10-x64-nfs\",\n", + " \"job_name\": \"dask\",\n", + " \"job_type\": \"allow_classic_ssh\",\n", + " \"key\": \"/home/msimonin/.ssh/id_rsa.pub\",\n", + " \"queue\": \"default\",\n", + " \"walltime\": \"02:00:00\",\n", + " \"resources\": {\n", + " \"machines\": [\n", + " {\n", + " \"roles\": [\n", + " \"scheduler\"\n", + " ],\n", + " \"primary_network\": \"prod\",\n", + " \"secondary_networks\": [],\n", + " \"cluster\": \"parapide\",\n", + " \"nodes\": 1\n", + " },\n", + " {\n", + " \"roles\": [\n", + " \"worker\"\n", + " ],\n", + " \"primary_network\": \"prod\",\n", + " \"secondary_networks\": [],\n", + " \"cluster\": \"parapide\",\n", + " \"nodes\": 1\n", + " }\n", + " ],\n", + " \"networks\": [\n", + " {\n", + " \"id\": \"prod\",\n", + " \"type\": \"prod\",\n", + " \"roles\": [\n", + " \"network\"\n", + " ],\n", + " \"site\": \"rennes\"\n", + " }\n", + " ]\n", + " }\n", + "}\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from luxembourg\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from lyon\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from nancy\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from nantes\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from rennes\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading 1425746 from rennes\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from sophia\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Waiting for 1425746 on rennes [2021-01-12 13:37:51]\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:All jobs are Running !\n" + ] + } + ], + "source": [ + "prod = G5kNetworkConf(id=\"prod\", roles=[\"network\"], type=\"prod\", site=\"rennes\")\n", + "conf = (\n", + " G5kConf.from_settings(job_name=\"dask\", job_type=\"allow_classic_ssh\")\n", + " .add_machine(roles=[\"scheduler\"], cluster=\"parapide\", nodes=1, primary_network=prod)\n", + " .add_machine(roles=[\"worker\"], cluster=\"parapide\", nodes=1, primary_network=prod)\n", + " .add_network_conf(prod)\n", + ").finalize()\n", + "provider = G5k(conf)\n", + "roles, _ = provider.init()" + ] + }, + { + "source": [ + "# Deploy Dask on the nodes\n", + "This assumes that the conda environment (dask-base) is configured in your home directory in `/home/<user>/miniconda3`.\n", + "\n", + "If the installation path differs, you can specify it using the `conda_prefix` parameter. " + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpc775senv with vars:\n", + "{}\n", + "source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-scheduler\n", + "\n", + "PLAY [scheduler] ***************************************************************\n", + "\n", + "TASK [(tmux ls | grep dask-scheduler )|| tmux new-session -s dask-scheduler -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-scheduler'] ***\n", + "Tuesday 12 January 2021 14:14:40 +0100 (0:13:33.402) 0:28:23.917 ******* \n", + "changed: [parapide-12.rennes.grid5000.fr]\n", + "\n", + "TASK [__calling__ wait_for] ****************************************************\n", + "Tuesday 12 January 2021 14:14:42 +0100 (0:00:01.219) 0:28:25.136 ******* \n", + "ok: [parapide-12.rennes.grid5000.fr]\n", + "\n", + "PLAY RECAP *********************************************************************\n", + "parapide-12.rennes.grid5000.fr : ok=2 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 \n", + "\n", + "Tuesday 12 January 2021 14:14:42 +0100 (0:00:00.436) 0:28:25.573 ******* \n", + "=============================================================================== \n", + "(tmux ls | grep dask-scheduler )|| tmux new-session -s dask-scheduler -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-scheduler' --- 1.22s\n", + "__calling__ wait_for ---------------------------------------------------- 0.44s\n", + "INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmp8e30zh9l with vars:\n", + "{}\n", + "{'code': 0, 'result': [{'parapide-12.rennes.grid5000.fr': {'ok': 2, 'failures': 0, 'unreachable': 0, 'changed': 1, 'skipped': 0, 'rescued': 0, 'ignored': 0}}], 'playbook': '/home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpc775senv'}\n", + "\n", + "PLAY [worker] ******************************************************************\n", + "\n", + "TASK [(tmux ls | grep dask-worker )|| tmux new-session -s dask-worker -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-worker tcp://parapide-12.rennes.grid5000.fr:8786 '] ***\n", + "Tuesday 12 January 2021 14:14:42 +0100 (0:00:00.082) 0:28:25.656 ******* \n", + "changed: [parapide-16.rennes.grid5000.fr]\n", + "\n", + "PLAY RECAP *********************************************************************\n", + "parapide-16.rennes.grid5000.fr : ok=1 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 \n", + "\n", + "Tuesday 12 January 2021 14:14:43 +0100 (0:00:01.167) 0:28:26.823 ******* \n", + "=============================================================================== \n", + "(tmux ls | grep dask-worker )|| tmux new-session -s dask-worker -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-worker tcp://parapide-12.rennes.grid5000.fr:8786 ' --- 1.17s\n", + "{'code': 0, 'result': [{'parapide-16.rennes.grid5000.fr': {'ok': 1, 'failures': 0, 'unreachable': 0, 'changed': 1, 'skipped': 0, 'rescued': 0, 'ignored': 0}}], 'playbook': '/home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmp8e30zh9l'}\n" + ] + } + ], + "source": [ + "username = g5k_api_utils.get_api_username()\n", + "dask = Dask(\"dask-base\", scheduler=roles[\"scheduler\"][0], workers=roles[\"worker\"], run_as=username)\n", + "dask.deploy()" + ] + }, + { + "source": [ + "## Using Dask\n", + "\n", + "Here we go with a simple computation (3 tasks, 2 dependent-ones). \n", + "The below code will create all the tunnels needed to access the Dask dashboard and the scheduler." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "from dask import delayed\n", + "import time\n", + "\n", + "def inc(x):\n", + " time.sleep(5)\n", + " return x + 1\n", + "\n", + "def dec(x):\n", + " time.sleep(3)\n", + " return x - 1\n", + "\n", + "def add(x, y):\n", + " time.sleep(7)\n", + " return x + y\n", + "\n", + "x = delayed(inc)(1)\n", + "y = delayed(dec)(2)\n", + "total = delayed(add)(x, y)" + ] + }, + { + "source": [ + "## Launch the computation\n", + "\n", + "In the mean time you can check the web dashboard. The connection URL will be displayed." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_7.4p1)\n", + "INFO:paramiko.transport:Authentication (publickey) successful!\n", + "INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_7.4p1)\n", + "dashboard: http://0.0.0.0:38383\n", + "INFO:paramiko.transport:Authentication (publickey) successful!\n", + "Scheduler address: 0.0.0.0:35945\n", + "result=3\n" + ] + } + ], + "source": [ + "from dask.distributed import Client\n", + "# Tunnel to the dashboard\n", + "addr, port, tunnel = G5kTunnel(dask.scheduler.address, 8787).start()\n", + "print(f\"dashboard: http://{addr}:{port}\")\n", + "with G5kTunnel(dask.scheduler.address, 8786) as (addr, port, _):\n", + " print(f\"Scheduler address: {addr}:{port}\")\n", + " client = Client(f\"tcp://{addr}:{port}\")\n", + " # launch a computation\n", + " print(f\"result={total.compute()}\")\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpt2mbeu_y with vars:\n", + "{}\n", + "\n", + "PLAY [scheduler] ***************************************************************\n", + "\n", + "TASK [Killing the dask scheduler] **********************************************\n", + "Tuesday 12 January 2021 14:14:57 +0100 (0:00:13.913) 0:28:40.736 ******* \n", + "changed: [parapide-12.rennes.grid5000.fr]\n", + "\n", + "PLAY RECAP *********************************************************************\n", + "parapide-12.rennes.grid5000.fr : ok=1 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 \n", + "\n", + "Tuesday 12 January 2021 14:14:57 +0100 (0:00:00.203) 0:28:40.940 ******* \n", + "=============================================================================== \n", + "Killing the dask scheduler ---------------------------------------------- 0.21s\n", + "INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpr0joxpnq with vars:\n", + "{}\n", + "{'code': 0, 'result': [{'parapide-12.rennes.grid5000.fr': {'ok': 1, 'failures': 0, 'unreachable': 0, 'changed': 1, 'skipped': 0, 'rescued': 0, 'ignored': 0}}], 'playbook': '/home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpt2mbeu_y'}\n", + "\n", + "PLAY [worker] ******************************************************************\n", + "\n", + "TASK [Killing the dask worker] *************************************************\n", + "Tuesday 12 January 2021 14:14:57 +0100 (0:00:00.074) 0:28:41.014 ******* \n", + "changed: [parapide-16.rennes.grid5000.fr]\n", + "\n", + "PLAY RECAP *********************************************************************\n", + "parapide-16.rennes.grid5000.fr : ok=1 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 \n", + "\n", + "Tuesday 12 January 2021 14:14:58 +0100 (0:00:00.202) 0:28:41.217 ******* \n", + "=============================================================================== \n", + "Killing the dask worker ------------------------------------------------ 0.21s\n", + "{'code': 0, 'result': [{'parapide-16.rennes.grid5000.fr': {'ok': 1, 'failures': 0, 'unreachable': 0, 'changed': 1, 'skipped': 0, 'rescued': 0, 'ignored': 0}}], 'playbook': '/home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpr0joxpnq'}\n" + ] + } + ], + "source": [ + "# will stop the tunnel to the dashboard and the Dask cluster.\n", + "if tunnel is not None:\n", + " tunnel.stop(force=True)\n", + "dask.destroy()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ] +} \ No newline at end of file diff --git a/docs/apidoc/examples/dask.py b/docs/apidoc/examples/dask.py deleted file mode 100644 index 5ddb01e3..00000000 --- a/docs/apidoc/examples/dask.py +++ /dev/null @@ -1,41 +0,0 @@ -from enoslib.infra.enos_g5k.provider import G5k -from enoslib.infra.enos_g5k.configuration import (Configuration, - NetworkConfiguration) -from enoslib.service import Dask - -import logging -import os -import time - -logging.basicConfig(level=logging.INFO) - -# path to the inventory -inventory = os.path.join(os.getcwd(), "hosts") - -# claim the resources -conf = Configuration.from_settings(job_type="allow_classic_ssh", - job_name="dask") -network = NetworkConfiguration(id="n1", - type="prod", - roles=["my_network"], - site="rennes") -conf.add_network_conf(network)\ - .add_machine(roles=["control"], - cluster="paravance", - nodes=2, - primary_network=network)\ - .finalize() - -provider = G5k(conf) -roles, networks = provider.init() - -m = Dask(scheduler=roles["control"][0], - worker=roles["control"], - env_file="./environment.yml") -m.deploy() - -time.sleep(10) -m.destroy() - -# destroy the boxes -provider.destroy() diff --git a/enoslib/__init__.py b/enoslib/__init__.py index 12d2f85d..8b9e5d52 100644 --- a/enoslib/__init__.py +++ b/enoslib/__init__.py @@ -14,7 +14,8 @@ from enoslib.api import ( ) # Services -from enoslib.service.conda.conda import Conda, conda_play_on, conda_run_command, Dask +from enoslib.service.conda.conda import Dask, in_conda_cmd + from enoslib.service.docker.docker import Docker from enoslib.service.dstat.dstat import Dstat from enoslib.service.locust.locust import Locust @@ -24,7 +25,8 @@ from enoslib.service.netem.netem import Netem, SimpleNetem from enoslib.service.skydive.skydive import Skydive # Providers -from enoslib.infra.enos_g5k.provider import G5k +from enoslib.infra.enos_g5k.provider import G5k, G5kTunnel +import enoslib.infra.enos_g5k.g5k_api_utils as g5k_api_utils from enoslib.infra.enos_g5k.configuration import Configuration as G5kConf from enoslib.infra.enos_g5k.configuration import NetworkConfiguration as G5kNetworkConf from enoslib.infra.enos_g5k.configuration import ServersConfiguration as G5kServersConf @@ -64,16 +66,14 @@ from enoslib.infra.enos_vmong5k.provider import start_virtualmachines from enoslib.infra.enos_iotlab.provider import Iotlab from enoslib.infra.enos_iotlab.configuration import Configuration as IotlabConf -from enoslib.infra.enos_iotlab.objects import ( - IotlabSensor, - IotlabSniffer, - IotlabSerial, -) +from enoslib.infra.enos_iotlab.objects import IotlabSensor, IotlabSniffer, IotlabSerial try: from enoslib.infra.enos_chameleonbaremetal.provider import Chameleonbaremetal as CBM - from enoslib.infra.enos_chameleonbaremetal.configuration import Configuration as CBMConf + from enoslib.infra.enos_chameleonbaremetal.configuration import ( + Configuration as CBMConf, + ) from enoslib.infra.enos_chameleonbaremetal.configuration import ( MachineConfiguration as CBMMachineConf, ) diff --git a/enoslib/infra/enos_g5k/provider.py b/enoslib/infra/enos_g5k/provider.py index 4045c567..989886e6 100644 --- a/enoslib/infra/enos_g5k/provider.py +++ b/enoslib/infra/enos_g5k/provider.py @@ -3,8 +3,11 @@ import copy import logging import operator from itertools import groupby +import os from typing import List, Optional, Tuple, cast +from sshtunnel import SSHTunnelForwarder + from enoslib.host import Host from enoslib.infra.enos_g5k.concrete import ( ConcreteClusterConf, @@ -27,7 +30,7 @@ from enoslib.infra.enos_g5k.constants import ( ) from enoslib.infra.enos_g5k.driver import get_driver from enoslib.infra.enos_g5k.error import MissingNetworkError -from enoslib.infra.enos_g5k.g5k_api_utils import OarNetwork +from enoslib.infra.enos_g5k.g5k_api_utils import OarNetwork, get_api_username from enoslib.infra.enos_g5k.objects import ( G5kHost, G5kNetwork, @@ -207,6 +210,48 @@ def _join(machines: List[ConcreteGroup], networks: List[G5kNetwork]) -> List[G5k return hosts +class G5kTunnel(object): + """A context Manager that initiate a tunnel to a grid node if needed.""" + + def __init__(self, address: str, port: int): + """ + Args: + address: The ip address/fqdn of the targetted service + port: The port of the targetted service + """ + self.address = address + self.port = port + + # computed + self.tunnel = None + + def start(self): + import socket + + if "grid5000.fr" not in socket.getfqdn(): + logging.debug(f"Creating a tunnel to {self.address}:{self.port}") + self.tunnel = SSHTunnelForwarder( + "access.grid5000.fr", + ssh_username=get_api_username(), + remote_bind_address=(self.address, self.port), + ) + self.tunnel.start() + local_address, local_port = self.tunnel.local_bind_address + return local_address, local_port, self.tunnel + return self.address, self.port, None + + def close(self): + if self.tunnel is not None: + logging.debug(f"Closing the tunnel to {self.address}:{self.port}") + self.tunnel.stop(force=True) + + def __enter__(self): + return self.start() + + def __exit__(self, *args): + self.close() + + class G5k(Provider): """The provider to use when deploying on Grid'5000.""" @@ -408,6 +453,19 @@ class G5k(Provider): networks.extend(network.to_enos()) return roles, networks + @staticmethod + def tunnel(address: str, port: int): + """Create a tunnel if necessary between here and there (in G5k). + + Args: + address: The remote address to reach (assuming inside g5k) + port: The remote port to reach + + Returns + The context manager + """ + return G5kTunnel(address, port).start() + def __str__(self): return "G5k" diff --git a/enoslib/infra/enos_g5k/utils.py b/enoslib/infra/enos_g5k/utils.py index a7d11eca..d3a0a257 100644 --- a/enoslib/infra/enos_g5k/utils.py +++ b/enoslib/infra/enos_g5k/utils.py @@ -5,7 +5,7 @@ from operator import itemgetter from typing import Dict, List, Tuple import logging -from enoslib.infra.enos_g5k import remote +from . import remote logger = logging.getLogger(__name__) diff --git a/enoslib/service/__init__.py b/enoslib/service/__init__.py index 638fed4a..b0dcbbdf 100644 --- a/enoslib/service/__init__.py +++ b/enoslib/service/__init__.py @@ -4,4 +4,4 @@ from .netem.netem import Netem, SimpleNetem # noqa from .skydive.skydive import Skydive # noqa from .locust.locust import Locust # noqa from .dstat.dstat import Dstat # noqa -from .conda.conda import Conda, Dask # noqa +from .conda.conda import _Conda, Dask # noqa diff --git a/enoslib/service/conda/__init__.py b/enoslib/service/conda/__init__.py index b5babcba..4aa693b4 100644 --- a/enoslib/service/conda/__init__.py +++ b/enoslib/service/conda/__init__.py @@ -1 +1 @@ -from .conda import Conda, Dask, conda_run_command, conda_play_on # noqa +from .conda import _Conda, Dask, _conda_run_command, _conda_play_on # noqa diff --git a/enoslib/service/conda/conda.py b/enoslib/service/conda/conda.py index ba015fd7..ea2b20d7 100644 --- a/enoslib/service/conda/conda.py +++ b/enoslib/service/conda/conda.py @@ -68,7 +68,7 @@ def _inject_wrapper_script(env_name, **kwargs): _create_wrapper_script(p, env_name) -def conda_run_command(command: str, env_name: str, **kwargs: Any): +def _conda_run_command(command: str, env_name: str, **kwargs: Any): """Run a single shell command in the context of a Conda environment. Wrapper around :py:func:`enoslib.api.run_command` that is conda aware. @@ -85,7 +85,7 @@ def conda_run_command(command: str, env_name: str, **kwargs: Any): return run_command(command, extra_vars=extra_vars, **kwargs) -class conda_play_on(play_on): +class _conda_play_on(play_on): """Run Ansible modules in the context of a Conda environment.""" def __init__(self, env_name: str, **kwargs: Any): @@ -95,7 +95,7 @@ class conda_play_on(play_on): _inject_wrapper_script(env_name, **kwargs) -class Conda(Service): +class _Conda(Service): def __init__(self, *, nodes: List[Host]): """Manage Conda on your nodes. @@ -184,18 +184,28 @@ class Conda(Service): pass -class Dask(Service): +class _Dask(Service): def __init__( - self, scheduler: Host, worker: List[Host], worker_args: str = "", env_file: Optional[str] = None, + self, + scheduler: Host, + worker: List[Host], + worker_args: str = "", + env_file: Optional[str] = None, ): """Initialize a Dask cluster on the nodes. + This installs a dask cluster from scratch by installing the dependency from the conda env_file. + As a consequence bootstraping the dask cluster is easy but not fast + (conda might be slow to do his job). Also everything run as root which might not be ideal. + Instead you can have a look to the Dask Service (:py:class:`enoslib.service.conda.conda.Dask`) + that will be faster at bootstraping Dask and run as a regular user. + Args: scheduler: the scheduler host worker: the workers Hosts worker_args: args to be passed when starting the worker (see dask-worker --help) env_file: conda environment with you specific dependencies. - Dask should be present in this environment. + Dask must be present in this environment. Examples: @@ -210,7 +220,7 @@ class Dask(Service): self.worker_args = worker_args self.env_file = env_file self.env_name = _get_env_name(env_file) if env_file is not None else "__dask__" - self.conda = Conda(nodes=worker + [scheduler]) + self.conda = _Conda(nodes=worker + [scheduler]) self.roles = {"scheduler": [self.scheduler], "worker": self.worker} def deploy(self): @@ -263,3 +273,149 @@ class Dask(Service): executable="/bin/bash", display_name="Killing the dask worker ", ) + + +def in_conda_cmd(cmd: str, env: str, prefix: str): + """Build a command line that will run inside a conda env. + + Make sure conda env is sourced correctly. + + Args: + cmd: The command to run + env: The conda environment to activate + prefix: The conda prefix, where the conda installation is + + Return: + The command string prefixed by the right command to jump into the + conda env. + """ + return f"source {prefix}/etc/profile.d/conda.sh && conda activate andromak && {cmd}" + + +class Dask(Service): + def __init__( + self, + conda_env: str, + conda_prefix: str = None, + scheduler: Host = None, + workers: List[Host] = None, + worker_args: str = "", + run_as: str = "root", + ): + """Deploy a Dask Cluster. + + It bootstraps a Dask scheduler and workers by activating the passed conda environment. + User must have an environment ready with, at least, dask installed inside. + The agents will be started as the passed user. + + It can be used as a context manager. + Note that the exit method isn't optimal though (see + :py:method:`enoslib.service.conda.conda.AutoDask.destroy`) + + Args: + conda_env: name of the conda environment (on the remote system) + conda_prefix: prefix of the conda installation (will be used to bring conda in the env) + Default to /home/<run_as>/miniconda3 + scheduler: Host that will serve as the dask scheduler + workers: List of Host that will serve as workers + worker_args: specific worker args to pass (e.g "--nthreads 1 --nprocs 8") + run_as: remote user to use. Conda must be available to this user. + + Examples: + + `Notebook <examples/dask.ipynb>`_ + """ + self.conda_env = conda_env + if conda_prefix is None: + self.conda_prefix = f"/home/{run_as}/miniconda3" + else: + self.conda_prefix = conda_prefix + self.scheduler = scheduler + self.workers = workers + self.worker_args = worker_args + self.run_as = run_as + + # computed + self.roles = dict(scheduler=[self.scheduler], worker=self.workers) + # can be set to optimize destroy + self.client = None + + def in_conda_cmd(self, cmd: str): + """Transforms a command to be executed in the context of the current conda env. + + Args: + cmd: the command string + + Returns: + The transformed command string prefixed by some other to activate the conda env. + """ + return in_conda_cmd(cmd, self.conda_env, self.conda_prefix) + + def deploy(self): + cmd = self.in_conda_cmd("dask-scheduler") + print(cmd) + with play_on( + pattern_hosts="scheduler", + roles=self.roles, + run_as=self.run_as, + gather_facts=False, + ) as p: + p.raw( + f"(tmux ls | grep dask-scheduler )|| tmux new-session -s dask-scheduler -d '{cmd}'", + executable="/bin/bash", + ) + p.wait_for(host="{{ inventory_hostname }}", port="8786") + + scheduler_addr = self.roles["scheduler"][0].address + cmd = self.in_conda_cmd( + f"dask-worker tcp://{scheduler_addr}:8786 {self.worker_args}" + ) + with play_on( + pattern_hosts="worker", + roles=self.roles, + run_as=self.run_as, + gather_facts=False, + ) as p: + p.raw( + f"(tmux ls | grep dask-worker )|| tmux new-session -s dask-worker -d '{cmd}'", + executable="/bin/bash", + ) + + def destroy(self): + """Destroy the dask cluster. + + Note that client.shutdown() is much more efficient. + """ + if self.client is not None: + self.client.shutdown() + else: + # wipe all tmux created + with play_on( + pattern_hosts="scheduler", + roles=self.roles, + gather_facts=False, + run_as=self.run_as, + ) as p: + p.raw( + "tmux kill-session -t dask-scheduler || true", + executable="/bin/bash", + display_name="Killing the dask scheduler", + ) + with play_on( + pattern_hosts="worker", + roles=self.roles, + gather_facts=False, + run_as=self.run_as, + ) as p: + p.raw( + "tmux kill-session -t dask-worker || true", + executable="/bin/bash", + display_name="Killing the dask worker ", + ) + + def __enter__(self): + self.deploy() + return self + + def __exit__(self, *args): + self.destroy() -- GitLab