diff --git a/docs/apidoc/examples/conda.py b/docs/apidoc/examples/conda.py deleted file mode 100644 index 772107a34ea0fd5a7ff0a936b9cc4c9c9ce446fe..0000000000000000000000000000000000000000 --- a/docs/apidoc/examples/conda.py +++ /dev/null @@ -1,51 +0,0 @@ -from enoslib.infra.enos_g5k.provider import G5k -from enoslib.infra.enos_g5k.configuration import (Configuration, - NetworkConfiguration) -from enoslib.service.conda import Conda, conda_run_command, conda_play_on -import logging -import os -import time - -logging.basicConfig(level=logging.DEBUG) - -# claim the resources -conf = Configuration.from_settings(job_type="allow_classic_ssh", - job_name="conda") -network = NetworkConfiguration(id="n1", - type="prod", - roles=["my_network"], - site="rennes") -conf.add_network_conf(network)\ - .add_machine(roles=["control"], - cluster="parapluie", - nodes=2, - primary_network=network)\ - .finalize() - -provider = G5k(conf) -roles, networks = provider.init() - -# let's provision a new env -m = Conda(nodes=roles["control"]) -m.deploy(env_name="plop", packages=["dask"]) - -# make use of this environment new environment -r = conda_run_command("conda env export", "plop", roles=roles) -print(r) - -# make use of an existing environment (somewhere in ~/miniconda3 most probably) -# this is practical because the env can be created on a shared filesystem -# ans use on all the nodes -r = conda_run_command("conda env export", "spark", roles=roles, run_as="msimonin") -print(r) - -# run this in the new (local to the node) environment -with conda_play_on("plop", roles=roles) as p: - p.shell("conda env export > /tmp/plop.env") - p.fetch(src="/tmp/plop.env", dest="/tmp/plop.env") - -# run this in a shared environment -with conda_play_on("spark", roles=roles, run_as="msimonin") as p: - # launch a script that requires spark n'co - p.shell("conda env export > /tmp/spark.env") - p.fetch(src="/tmp/plop.env", dest="/tmp/spark.env") \ No newline at end of file diff --git a/docs/apidoc/examples/dask.ipynb b/docs/apidoc/examples/dask.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..ae95b54043ec8edecbddd22e6199c989b6efb86b --- /dev/null +++ b/docs/apidoc/examples/dask.ipynb @@ -0,0 +1,351 @@ +{ + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.5-final" + }, + "orig_nbformat": 2, + "kernelspec": { + "name": "python3", + "display_name": "Python 3.8.5 64-bit ('andromak': conda)", + "metadata": { + "interpreter": { + "hash": "044ddde3072526b14262047f16da4fe10c08ccab664ff325d5dd19288f92d50e" + } + } + } + }, + "nbformat": 4, + "nbformat_minor": 2, + "cells": [ + { + "source": [ + "# Dask deployment on Grid'5000\n", + "\n", + "This notebook will deploy a Dask cluster on Grid'5000 and launch a simpe computation.\n", + "\n", + "Requirements: \n", + " - A conda[[1]] environment setup on the Grid'5000 frontend with dask installed and EnOSlib.\n", + " - The same environment can be use to run this notebook from your local machine. \n", + "\n", + "[1]: https://docs.conda.io/en/latest/miniconda.html#linux-installers\n", + "\n", + "\n", + "## Initial impors\n", + " " + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "from enoslib import *\n", + "import logging\n", + "\n", + "# get some logs\n", + "logging.basicConfig(level=logging.INFO)" + ] + }, + { + "source": [ + "## Get some resources on Grid'5000\n", + "\n", + "This will reserve two nodes, where the Dask cluster will be deployed later." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from grenoble\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from lille\n", + "{'roles': ['scheduler'], 'primary_network': 'prod', 'secondary_networks': [], 'cluster': 'parapide', 'nodes': 1}\n", + "{'roles': ['worker'], 'primary_network': 'prod', 'secondary_networks': [], 'cluster': 'parapide', 'nodes': 1}\n", + "{\n", + " \"dhcp\": true,\n", + " \"force_deploy\": false,\n", + " \"env_name\": \"debian10-x64-nfs\",\n", + " \"job_name\": \"dask\",\n", + " \"job_type\": \"allow_classic_ssh\",\n", + " \"key\": \"/home/msimonin/.ssh/id_rsa.pub\",\n", + " \"queue\": \"default\",\n", + " \"walltime\": \"02:00:00\",\n", + " \"resources\": {\n", + " \"machines\": [\n", + " {\n", + " \"roles\": [\n", + " \"scheduler\"\n", + " ],\n", + " \"primary_network\": \"prod\",\n", + " \"secondary_networks\": [],\n", + " \"cluster\": \"parapide\",\n", + " \"nodes\": 1\n", + " },\n", + " {\n", + " \"roles\": [\n", + " \"worker\"\n", + " ],\n", + " \"primary_network\": \"prod\",\n", + " \"secondary_networks\": [],\n", + " \"cluster\": \"parapide\",\n", + " \"nodes\": 1\n", + " }\n", + " ],\n", + " \"networks\": [\n", + " {\n", + " \"id\": \"prod\",\n", + " \"type\": \"prod\",\n", + " \"roles\": [\n", + " \"network\"\n", + " ],\n", + " \"site\": \"rennes\"\n", + " }\n", + " ]\n", + " }\n", + "}\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from luxembourg\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from lyon\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from nancy\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from nantes\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from rennes\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading 1425746 from rennes\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from sophia\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:Waiting for 1425746 on rennes [2021-01-12 13:37:51]\n", + "INFO:enoslib.infra.enos_g5k.g5k_api_utils:All jobs are Running !\n" + ] + } + ], + "source": [ + "prod = G5kNetworkConf(id=\"prod\", roles=[\"network\"], type=\"prod\", site=\"rennes\")\n", + "conf = (\n", + " G5kConf.from_settings(job_name=\"dask\", job_type=\"allow_classic_ssh\")\n", + " .add_machine(roles=[\"scheduler\"], cluster=\"parapide\", nodes=1, primary_network=prod)\n", + " .add_machine(roles=[\"worker\"], cluster=\"parapide\", nodes=1, primary_network=prod)\n", + " .add_network_conf(prod)\n", + ").finalize()\n", + "provider = G5k(conf)\n", + "roles, _ = provider.init()" + ] + }, + { + "source": [ + "# Deploy Dask on the nodes\n", + "This assumes that the conda environment (dask-base) is configured in your home directory in `/home/<user>/miniconda3`.\n", + "\n", + "If the installation path differs, you can specify it using the `conda_prefix` parameter. " + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpc775senv with vars:\n", + "{}\n", + "source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-scheduler\n", + "\n", + "PLAY [scheduler] ***************************************************************\n", + "\n", + "TASK [(tmux ls | grep dask-scheduler )|| tmux new-session -s dask-scheduler -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-scheduler'] ***\n", + "Tuesday 12 January 2021 14:14:40 +0100 (0:13:33.402) 0:28:23.917 ******* \n", + "changed: [parapide-12.rennes.grid5000.fr]\n", + "\n", + "TASK [__calling__ wait_for] ****************************************************\n", + "Tuesday 12 January 2021 14:14:42 +0100 (0:00:01.219) 0:28:25.136 ******* \n", + "ok: [parapide-12.rennes.grid5000.fr]\n", + "\n", + "PLAY RECAP *********************************************************************\n", + "parapide-12.rennes.grid5000.fr : ok=2 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 \n", + "\n", + "Tuesday 12 January 2021 14:14:42 +0100 (0:00:00.436) 0:28:25.573 ******* \n", + "=============================================================================== \n", + "(tmux ls | grep dask-scheduler )|| tmux new-session -s dask-scheduler -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-scheduler' --- 1.22s\n", + "__calling__ wait_for ---------------------------------------------------- 0.44s\n", + "INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmp8e30zh9l with vars:\n", + "{}\n", + "{'code': 0, 'result': [{'parapide-12.rennes.grid5000.fr': {'ok': 2, 'failures': 0, 'unreachable': 0, 'changed': 1, 'skipped': 0, 'rescued': 0, 'ignored': 0}}], 'playbook': '/home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpc775senv'}\n", + "\n", + "PLAY [worker] ******************************************************************\n", + "\n", + "TASK [(tmux ls | grep dask-worker )|| tmux new-session -s dask-worker -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-worker tcp://parapide-12.rennes.grid5000.fr:8786 '] ***\n", + "Tuesday 12 January 2021 14:14:42 +0100 (0:00:00.082) 0:28:25.656 ******* \n", + "changed: [parapide-16.rennes.grid5000.fr]\n", + "\n", + "PLAY RECAP *********************************************************************\n", + "parapide-16.rennes.grid5000.fr : ok=1 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 \n", + "\n", + "Tuesday 12 January 2021 14:14:43 +0100 (0:00:01.167) 0:28:26.823 ******* \n", + "=============================================================================== \n", + "(tmux ls | grep dask-worker )|| tmux new-session -s dask-worker -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-worker tcp://parapide-12.rennes.grid5000.fr:8786 ' --- 1.17s\n", + "{'code': 0, 'result': [{'parapide-16.rennes.grid5000.fr': {'ok': 1, 'failures': 0, 'unreachable': 0, 'changed': 1, 'skipped': 0, 'rescued': 0, 'ignored': 0}}], 'playbook': '/home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmp8e30zh9l'}\n" + ] + } + ], + "source": [ + "username = g5k_api_utils.get_api_username()\n", + "dask = Dask(\"dask-base\", scheduler=roles[\"scheduler\"][0], workers=roles[\"worker\"], run_as=username)\n", + "dask.deploy()" + ] + }, + { + "source": [ + "## Using Dask\n", + "\n", + "Here we go with a simple computation (3 tasks, 2 dependent-ones). \n", + "The below code will create all the tunnels needed to access the Dask dashboard and the scheduler." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "from dask import delayed\n", + "import time\n", + "\n", + "def inc(x):\n", + " time.sleep(5)\n", + " return x + 1\n", + "\n", + "def dec(x):\n", + " time.sleep(3)\n", + " return x - 1\n", + "\n", + "def add(x, y):\n", + " time.sleep(7)\n", + " return x + y\n", + "\n", + "x = delayed(inc)(1)\n", + "y = delayed(dec)(2)\n", + "total = delayed(add)(x, y)" + ] + }, + { + "source": [ + "## Launch the computation\n", + "\n", + "In the mean time you can check the web dashboard. The connection URL will be displayed." + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_7.4p1)\n", + "INFO:paramiko.transport:Authentication (publickey) successful!\n", + "INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_7.4p1)\n", + "dashboard: http://0.0.0.0:38383\n", + "INFO:paramiko.transport:Authentication (publickey) successful!\n", + "Scheduler address: 0.0.0.0:35945\n", + "result=3\n" + ] + } + ], + "source": [ + "from dask.distributed import Client\n", + "# Tunnel to the dashboard\n", + "addr, port, tunnel = G5kTunnel(dask.scheduler.address, 8787).start()\n", + "print(f\"dashboard: http://{addr}:{port}\")\n", + "with G5kTunnel(dask.scheduler.address, 8786) as (addr, port, _):\n", + " print(f\"Scheduler address: {addr}:{port}\")\n", + " client = Client(f\"tcp://{addr}:{port}\")\n", + " # launch a computation\n", + " print(f\"result={total.compute()}\")\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stderr", + "text": [ + "INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpt2mbeu_y with vars:\n", + "{}\n", + "\n", + "PLAY [scheduler] ***************************************************************\n", + "\n", + "TASK [Killing the dask scheduler] **********************************************\n", + "Tuesday 12 January 2021 14:14:57 +0100 (0:00:13.913) 0:28:40.736 ******* \n", + "changed: [parapide-12.rennes.grid5000.fr]\n", + "\n", + "PLAY RECAP *********************************************************************\n", + "parapide-12.rennes.grid5000.fr : ok=1 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 \n", + "\n", + "Tuesday 12 January 2021 14:14:57 +0100 (0:00:00.203) 0:28:40.940 ******* \n", + "=============================================================================== \n", + "Killing the dask scheduler ---------------------------------------------- 0.21s\n", + "INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpr0joxpnq with vars:\n", + "{}\n", + "{'code': 0, 'result': [{'parapide-12.rennes.grid5000.fr': {'ok': 1, 'failures': 0, 'unreachable': 0, 'changed': 1, 'skipped': 0, 'rescued': 0, 'ignored': 0}}], 'playbook': '/home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpt2mbeu_y'}\n", + "\n", + "PLAY [worker] ******************************************************************\n", + "\n", + "TASK [Killing the dask worker] *************************************************\n", + "Tuesday 12 January 2021 14:14:57 +0100 (0:00:00.074) 0:28:41.014 ******* \n", + "changed: [parapide-16.rennes.grid5000.fr]\n", + "\n", + "PLAY RECAP *********************************************************************\n", + "parapide-16.rennes.grid5000.fr : ok=1 changed=1 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0 \n", + "\n", + "Tuesday 12 January 2021 14:14:58 +0100 (0:00:00.202) 0:28:41.217 ******* \n", + "=============================================================================== \n", + "Killing the dask worker ------------------------------------------------ 0.21s\n", + "{'code': 0, 'result': [{'parapide-16.rennes.grid5000.fr': {'ok': 1, 'failures': 0, 'unreachable': 0, 'changed': 1, 'skipped': 0, 'rescued': 0, 'ignored': 0}}], 'playbook': '/home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpr0joxpnq'}\n" + ] + } + ], + "source": [ + "# will stop the tunnel to the dashboard and the Dask cluster.\n", + "if tunnel is not None:\n", + " tunnel.stop(force=True)\n", + "dask.destroy()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ] +} \ No newline at end of file diff --git a/docs/apidoc/examples/dask.py b/docs/apidoc/examples/dask.py deleted file mode 100644 index 5ddb01e3e1a8812a9f53740001b3cabf9ca81885..0000000000000000000000000000000000000000 --- a/docs/apidoc/examples/dask.py +++ /dev/null @@ -1,41 +0,0 @@ -from enoslib.infra.enos_g5k.provider import G5k -from enoslib.infra.enos_g5k.configuration import (Configuration, - NetworkConfiguration) -from enoslib.service import Dask - -import logging -import os -import time - -logging.basicConfig(level=logging.INFO) - -# path to the inventory -inventory = os.path.join(os.getcwd(), "hosts") - -# claim the resources -conf = Configuration.from_settings(job_type="allow_classic_ssh", - job_name="dask") -network = NetworkConfiguration(id="n1", - type="prod", - roles=["my_network"], - site="rennes") -conf.add_network_conf(network)\ - .add_machine(roles=["control"], - cluster="paravance", - nodes=2, - primary_network=network)\ - .finalize() - -provider = G5k(conf) -roles, networks = provider.init() - -m = Dask(scheduler=roles["control"][0], - worker=roles["control"], - env_file="./environment.yml") -m.deploy() - -time.sleep(10) -m.destroy() - -# destroy the boxes -provider.destroy() diff --git a/enoslib/__init__.py b/enoslib/__init__.py index 12d2f85d7ca34ff55d87d2a6c75b9023acd90fae..8b9e5d525ba53bfb63dff189c9c2827d24092694 100644 --- a/enoslib/__init__.py +++ b/enoslib/__init__.py @@ -14,7 +14,8 @@ from enoslib.api import ( ) # Services -from enoslib.service.conda.conda import Conda, conda_play_on, conda_run_command, Dask +from enoslib.service.conda.conda import Dask, in_conda_cmd + from enoslib.service.docker.docker import Docker from enoslib.service.dstat.dstat import Dstat from enoslib.service.locust.locust import Locust @@ -24,7 +25,8 @@ from enoslib.service.netem.netem import Netem, SimpleNetem from enoslib.service.skydive.skydive import Skydive # Providers -from enoslib.infra.enos_g5k.provider import G5k +from enoslib.infra.enos_g5k.provider import G5k, G5kTunnel +import enoslib.infra.enos_g5k.g5k_api_utils as g5k_api_utils from enoslib.infra.enos_g5k.configuration import Configuration as G5kConf from enoslib.infra.enos_g5k.configuration import NetworkConfiguration as G5kNetworkConf from enoslib.infra.enos_g5k.configuration import ServersConfiguration as G5kServersConf @@ -64,16 +66,14 @@ from enoslib.infra.enos_vmong5k.provider import start_virtualmachines from enoslib.infra.enos_iotlab.provider import Iotlab from enoslib.infra.enos_iotlab.configuration import Configuration as IotlabConf -from enoslib.infra.enos_iotlab.objects import ( - IotlabSensor, - IotlabSniffer, - IotlabSerial, -) +from enoslib.infra.enos_iotlab.objects import IotlabSensor, IotlabSniffer, IotlabSerial try: from enoslib.infra.enos_chameleonbaremetal.provider import Chameleonbaremetal as CBM - from enoslib.infra.enos_chameleonbaremetal.configuration import Configuration as CBMConf + from enoslib.infra.enos_chameleonbaremetal.configuration import ( + Configuration as CBMConf, + ) from enoslib.infra.enos_chameleonbaremetal.configuration import ( MachineConfiguration as CBMMachineConf, ) diff --git a/enoslib/infra/enos_g5k/provider.py b/enoslib/infra/enos_g5k/provider.py index 4045c56778a91176fca2440313ca1221644e219f..989886e6eb541cb314676b5850cee346f366e003 100644 --- a/enoslib/infra/enos_g5k/provider.py +++ b/enoslib/infra/enos_g5k/provider.py @@ -3,8 +3,11 @@ import copy import logging import operator from itertools import groupby +import os from typing import List, Optional, Tuple, cast +from sshtunnel import SSHTunnelForwarder + from enoslib.host import Host from enoslib.infra.enos_g5k.concrete import ( ConcreteClusterConf, @@ -27,7 +30,7 @@ from enoslib.infra.enos_g5k.constants import ( ) from enoslib.infra.enos_g5k.driver import get_driver from enoslib.infra.enos_g5k.error import MissingNetworkError -from enoslib.infra.enos_g5k.g5k_api_utils import OarNetwork +from enoslib.infra.enos_g5k.g5k_api_utils import OarNetwork, get_api_username from enoslib.infra.enos_g5k.objects import ( G5kHost, G5kNetwork, @@ -207,6 +210,48 @@ def _join(machines: List[ConcreteGroup], networks: List[G5kNetwork]) -> List[G5k return hosts +class G5kTunnel(object): + """A context Manager that initiate a tunnel to a grid node if needed.""" + + def __init__(self, address: str, port: int): + """ + Args: + address: The ip address/fqdn of the targetted service + port: The port of the targetted service + """ + self.address = address + self.port = port + + # computed + self.tunnel = None + + def start(self): + import socket + + if "grid5000.fr" not in socket.getfqdn(): + logging.debug(f"Creating a tunnel to {self.address}:{self.port}") + self.tunnel = SSHTunnelForwarder( + "access.grid5000.fr", + ssh_username=get_api_username(), + remote_bind_address=(self.address, self.port), + ) + self.tunnel.start() + local_address, local_port = self.tunnel.local_bind_address + return local_address, local_port, self.tunnel + return self.address, self.port, None + + def close(self): + if self.tunnel is not None: + logging.debug(f"Closing the tunnel to {self.address}:{self.port}") + self.tunnel.stop(force=True) + + def __enter__(self): + return self.start() + + def __exit__(self, *args): + self.close() + + class G5k(Provider): """The provider to use when deploying on Grid'5000.""" @@ -408,6 +453,19 @@ class G5k(Provider): networks.extend(network.to_enos()) return roles, networks + @staticmethod + def tunnel(address: str, port: int): + """Create a tunnel if necessary between here and there (in G5k). + + Args: + address: The remote address to reach (assuming inside g5k) + port: The remote port to reach + + Returns + The context manager + """ + return G5kTunnel(address, port).start() + def __str__(self): return "G5k" diff --git a/enoslib/infra/enos_g5k/utils.py b/enoslib/infra/enos_g5k/utils.py index a7d11ecac80f2f9e768c6da6bfb2317baad3ae2b..d3a0a2574ef79fe8e59c5216070c48110ecd5501 100644 --- a/enoslib/infra/enos_g5k/utils.py +++ b/enoslib/infra/enos_g5k/utils.py @@ -5,7 +5,7 @@ from operator import itemgetter from typing import Dict, List, Tuple import logging -from enoslib.infra.enos_g5k import remote +from . import remote logger = logging.getLogger(__name__) diff --git a/enoslib/service/__init__.py b/enoslib/service/__init__.py index 638fed4a98ffd820a180564ed1cbb523c1b588bc..b0dcbbdfaa111ccbdedc256ef0aa58448f620d98 100644 --- a/enoslib/service/__init__.py +++ b/enoslib/service/__init__.py @@ -4,4 +4,4 @@ from .netem.netem import Netem, SimpleNetem # noqa from .skydive.skydive import Skydive # noqa from .locust.locust import Locust # noqa from .dstat.dstat import Dstat # noqa -from .conda.conda import Conda, Dask # noqa +from .conda.conda import _Conda, Dask # noqa diff --git a/enoslib/service/conda/__init__.py b/enoslib/service/conda/__init__.py index b5babcba718d366cfdbf2b88acc5191b0b667bda..4aa693b481ca887cb094ca3d756b88b69dadb882 100644 --- a/enoslib/service/conda/__init__.py +++ b/enoslib/service/conda/__init__.py @@ -1 +1 @@ -from .conda import Conda, Dask, conda_run_command, conda_play_on # noqa +from .conda import _Conda, Dask, _conda_run_command, _conda_play_on # noqa diff --git a/enoslib/service/conda/conda.py b/enoslib/service/conda/conda.py index ba015fd7ad0ff4ded5e49ee29d08042cc5f52491..ea2b20d74b36ab0557c93d4b64b2cc133a3161c3 100644 --- a/enoslib/service/conda/conda.py +++ b/enoslib/service/conda/conda.py @@ -68,7 +68,7 @@ def _inject_wrapper_script(env_name, **kwargs): _create_wrapper_script(p, env_name) -def conda_run_command(command: str, env_name: str, **kwargs: Any): +def _conda_run_command(command: str, env_name: str, **kwargs: Any): """Run a single shell command in the context of a Conda environment. Wrapper around :py:func:`enoslib.api.run_command` that is conda aware. @@ -85,7 +85,7 @@ def conda_run_command(command: str, env_name: str, **kwargs: Any): return run_command(command, extra_vars=extra_vars, **kwargs) -class conda_play_on(play_on): +class _conda_play_on(play_on): """Run Ansible modules in the context of a Conda environment.""" def __init__(self, env_name: str, **kwargs: Any): @@ -95,7 +95,7 @@ class conda_play_on(play_on): _inject_wrapper_script(env_name, **kwargs) -class Conda(Service): +class _Conda(Service): def __init__(self, *, nodes: List[Host]): """Manage Conda on your nodes. @@ -184,18 +184,28 @@ class Conda(Service): pass -class Dask(Service): +class _Dask(Service): def __init__( - self, scheduler: Host, worker: List[Host], worker_args: str = "", env_file: Optional[str] = None, + self, + scheduler: Host, + worker: List[Host], + worker_args: str = "", + env_file: Optional[str] = None, ): """Initialize a Dask cluster on the nodes. + This installs a dask cluster from scratch by installing the dependency from the conda env_file. + As a consequence bootstraping the dask cluster is easy but not fast + (conda might be slow to do his job). Also everything run as root which might not be ideal. + Instead you can have a look to the Dask Service (:py:class:`enoslib.service.conda.conda.Dask`) + that will be faster at bootstraping Dask and run as a regular user. + Args: scheduler: the scheduler host worker: the workers Hosts worker_args: args to be passed when starting the worker (see dask-worker --help) env_file: conda environment with you specific dependencies. - Dask should be present in this environment. + Dask must be present in this environment. Examples: @@ -210,7 +220,7 @@ class Dask(Service): self.worker_args = worker_args self.env_file = env_file self.env_name = _get_env_name(env_file) if env_file is not None else "__dask__" - self.conda = Conda(nodes=worker + [scheduler]) + self.conda = _Conda(nodes=worker + [scheduler]) self.roles = {"scheduler": [self.scheduler], "worker": self.worker} def deploy(self): @@ -263,3 +273,149 @@ class Dask(Service): executable="/bin/bash", display_name="Killing the dask worker ", ) + + +def in_conda_cmd(cmd: str, env: str, prefix: str): + """Build a command line that will run inside a conda env. + + Make sure conda env is sourced correctly. + + Args: + cmd: The command to run + env: The conda environment to activate + prefix: The conda prefix, where the conda installation is + + Return: + The command string prefixed by the right command to jump into the + conda env. + """ + return f"source {prefix}/etc/profile.d/conda.sh && conda activate andromak && {cmd}" + + +class Dask(Service): + def __init__( + self, + conda_env: str, + conda_prefix: str = None, + scheduler: Host = None, + workers: List[Host] = None, + worker_args: str = "", + run_as: str = "root", + ): + """Deploy a Dask Cluster. + + It bootstraps a Dask scheduler and workers by activating the passed conda environment. + User must have an environment ready with, at least, dask installed inside. + The agents will be started as the passed user. + + It can be used as a context manager. + Note that the exit method isn't optimal though (see + :py:method:`enoslib.service.conda.conda.AutoDask.destroy`) + + Args: + conda_env: name of the conda environment (on the remote system) + conda_prefix: prefix of the conda installation (will be used to bring conda in the env) + Default to /home/<run_as>/miniconda3 + scheduler: Host that will serve as the dask scheduler + workers: List of Host that will serve as workers + worker_args: specific worker args to pass (e.g "--nthreads 1 --nprocs 8") + run_as: remote user to use. Conda must be available to this user. + + Examples: + + `Notebook <examples/dask.ipynb>`_ + """ + self.conda_env = conda_env + if conda_prefix is None: + self.conda_prefix = f"/home/{run_as}/miniconda3" + else: + self.conda_prefix = conda_prefix + self.scheduler = scheduler + self.workers = workers + self.worker_args = worker_args + self.run_as = run_as + + # computed + self.roles = dict(scheduler=[self.scheduler], worker=self.workers) + # can be set to optimize destroy + self.client = None + + def in_conda_cmd(self, cmd: str): + """Transforms a command to be executed in the context of the current conda env. + + Args: + cmd: the command string + + Returns: + The transformed command string prefixed by some other to activate the conda env. + """ + return in_conda_cmd(cmd, self.conda_env, self.conda_prefix) + + def deploy(self): + cmd = self.in_conda_cmd("dask-scheduler") + print(cmd) + with play_on( + pattern_hosts="scheduler", + roles=self.roles, + run_as=self.run_as, + gather_facts=False, + ) as p: + p.raw( + f"(tmux ls | grep dask-scheduler )|| tmux new-session -s dask-scheduler -d '{cmd}'", + executable="/bin/bash", + ) + p.wait_for(host="{{ inventory_hostname }}", port="8786") + + scheduler_addr = self.roles["scheduler"][0].address + cmd = self.in_conda_cmd( + f"dask-worker tcp://{scheduler_addr}:8786 {self.worker_args}" + ) + with play_on( + pattern_hosts="worker", + roles=self.roles, + run_as=self.run_as, + gather_facts=False, + ) as p: + p.raw( + f"(tmux ls | grep dask-worker )|| tmux new-session -s dask-worker -d '{cmd}'", + executable="/bin/bash", + ) + + def destroy(self): + """Destroy the dask cluster. + + Note that client.shutdown() is much more efficient. + """ + if self.client is not None: + self.client.shutdown() + else: + # wipe all tmux created + with play_on( + pattern_hosts="scheduler", + roles=self.roles, + gather_facts=False, + run_as=self.run_as, + ) as p: + p.raw( + "tmux kill-session -t dask-scheduler || true", + executable="/bin/bash", + display_name="Killing the dask scheduler", + ) + with play_on( + pattern_hosts="worker", + roles=self.roles, + gather_facts=False, + run_as=self.run_as, + ) as p: + p.raw( + "tmux kill-session -t dask-worker || true", + executable="/bin/bash", + display_name="Killing the dask worker ", + ) + + def __enter__(self): + self.deploy() + return self + + def __exit__(self, *args): + self.destroy()