diff --git a/g5k/07_fault_injection_on_processes.ipynb b/g5k/07_fault_injection_on_processes.ipynb deleted file mode 100644 index df074c53d425285dd8791f251022562749cb0806..0000000000000000000000000000000000000000 --- a/g5k/07_fault_injection_on_processes.ipynb +++ /dev/null @@ -1,1030 +0,0 @@ -{ - "cells": [ - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Fault-injection on processes\n", - "\n", - "---\n", - "\n", - "- Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html\n", - "- Instant chat: https://framateam.org/enoslib\n", - "- Source code: https://gitlab.inria.fr/discovery/enoslib\n", - "\n", - "---\n", - "\n", - "## Prerequisites\n", - "\n", - "<div class=\"alert alert-block alert-warning\">\n", - "    Make sure you've run the one-time setup for your environment\n", - "</div>" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## RabbitMQ / Cron\n", - "\n", - "[RabbitMQ](https://www.rabbitmq.com/) is an open-source message broker that enables different software applications to communicate and exchange data in a reliable and scalable manner. It follows the Advanced Message Queuing Protocol (AMQP) and provides a flexible messaging model based on the concept of queues.\n", - "\n", - "For our experiment, we will deploy a publish / subscribe environment to demonstrate the impact of our API.\n", - "\n", - "\n", - "[Cron](https://man7.org/linux/man-pages/man8/cron.8.html) is a time-based job scheduler in Unix-like operating systems. It allows you to schedule and automate the execution of commands or scripts at specified intervals or specific times. Cron is commonly used for repetitive tasks, system maintenance, and scheduling periodic jobs.\n", - "\n", - "All asynchronous tools shown here are based on cron. Because of that, we cannot schedule an event less than one minute ahead; beyond that delay, events can be scheduled to the second (with a final accuracy of about half a second)."
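The `producer.py` and `consumer.py` scripts are copied from the notebook's directory and are not reproduced in this diff. As a rough, hypothetical sketch of the kind of pika-based pair the experiment relies on (the queue names `fault_injection` and `received_messages` match the ones queried later in the notebook; everything else is illustrative and may differ from the real scripts):

```python
# Hypothetical sketch of a pika producer/consumer pair -- NOT the actual
# producer.py / consumer.py shipped with the tutorial.
import sys
import time

import pika


def produce(server_ip: str) -> None:
    """Publish one message per second to the fault_injection queue."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=server_ip))
    channel = connection.channel()
    channel.queue_declare(queue="fault_injection")
    while True:
        channel.basic_publish(exchange="", routing_key="fault_injection", body=b"ping")
        time.sleep(1)


def consume(server_ip: str) -> None:
    """Drain fault_injection and count processed messages into received_messages."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=server_ip))
    channel = connection.channel()
    channel.queue_declare(queue="fault_injection")
    channel.queue_declare(queue="received_messages")

    def on_message(ch, method, properties, body):
        # Record that one more message has been processed.
        ch.basic_publish(exchange="", routing_key="received_messages", body=body)

    channel.basic_consume(queue="fault_injection", on_message_callback=on_message, auto_ack=True)
    channel.start_consuming()


if __name__ == "__main__":
    consume(sys.argv[1])  # or produce(sys.argv[1]) on the producer side
```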
- ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Setup" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import signal\n", - "import os\n", - "from datetime import datetime, timedelta\n", - "\n", - "import enoslib as en\n", - "\n", - "en.init_logging()\n", - "en.check()\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Reservation" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "CLUSTER = \"nova\"\n", - "HERE = os.getcwd()\n", - "# claim the resources\n", - "conf = (\n", - " en.G5kConf.from_settings(\n", - " job_name=\"fault-injection tutorial\",\n", - " job_type=[],\n", - " )\n", - " .add_machine(roles=[\"server\"], cluster=CLUSTER, nodes=1)\n", - " .add_machine(\n", - " roles=[\"producer\"], cluster=CLUSTER, nodes=1\n", - " ) # all the producers are running on the same machine\n", - " .add_machine(\n", - " roles=[\"consumer\"], cluster=CLUSTER, nodes=1\n", - " ) # all the consumers are running on the same machine\n", - ")\n", - "\n", - "provider = en.G5k(conf)\n", - "\n", - "roles, networks = provider.init()\n", - "\n", - "# Fill in network information from nodes\n", - "roles = en.sync_info(roles, networks)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "roles" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Rabbitmq configuration" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Common node's configuration" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Each node must have this minimal configuration :\n", - "- having python and pip\n", - "- having procps (to use kill)\n", - "- having pika (for the rabbitmq connection)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Common configuration\n", - "with en.actions(roles=roles) as p:\n", - " p.apt(task_name=\"Installing python\", name=\"python3\")\n", - " p.apt(task_name=\"Installing procps\", name=\"procps\")\n", - "\n", - " p.command(\"apt update\")\n", - "\n", - " p.apt(task_name=\"Installing pip\", name=\"python3-pip\")\n", - " p.pip(task_name=\"Installing pika\", name=\"pika\")\n", - "\n", - " p.file(path=\"/tmp/rabbitmq\", state=\"absent\")\n", - " p.file(path=\"/tmp/rabbitmq\", state=\"directory\")\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Server configuration" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Here, we does not launch anything yet, we just setup the server node to accept all our producer(s) and consumer(s). We also add a new administrator in order to have access to the management interface, the default one being blocked by the remote configuration." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "username_monitoring = \"user\"\n", - "password_monitoring = \"password\"\n", - "\n", - "# SETUP\n", - "## Server configuration\n", - "with en.actions(roles=roles[\"server\"]) as p:\n", - " # Setting the rabbimq server\n", - " p.apt(task_name=\"Installing rabbitmq-server\", name=\"rabbitmq-server\")\n", - " p.command(\"rabbitmq-plugins enable rabbitmq_management\")\n", - " p.command(\"systemctl start rabbitmq-server\")\n", - " p.command(\"systemctl enable rabbitmq-server\")\n", - " \n", - " # For the management interface, adding a new admin\n", - " p.command(f\"rabbitmqctl add_user {username_monitoring} {password_monitoring}\")\n", - " p.command(f\"rabbitmqctl set_user_tags {username_monitoring} administrator\")\n", - " p.command(f\"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'\")\n", - "\n", - " # Allow all connections (no credentials needed for consumers and producers)\n", - " p.shell('echo \"loopback_users.guest = false\" | sudo tee -a /etc/rabbitmq/rabbitmq.conf')\n", - "\n", - " p.command(\"systemctl restart rabbitmq-server\")\n", - " " - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Producers' node configuration" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The producers' node has to be configured such that it contains its specific script." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "with en.actions(roles=roles[\"producer\"]) as p:\n", - " p.copy(\n", - " src=HERE + \"/producer.py\",\n", - " dest=\"/tmp/rabbitmq/producer.py\",\n", - " task_name=\"copying producer file\",\n", - " )" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Consumers' node configuration" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The consumers' node has to be configured such that it contains its specific script." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "with en.actions(roles=roles[\"consumer\"]) as p:\n", - " p.copy(\n", - " src=HERE + \"/consumer.py\",\n", - " dest=\"/tmp/rabbitmq/consumer.py\",\n", - " task_name=\"copying consumer file\",\n", - " )" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Utility functions" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The only purpose of these functions is to facilitate and to make this experiment more readable. 
Their objectives are :\n", - "- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s))\n", - "- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all cronjobs if any, purges the rabbitmq queues)\n", - "- to launch all producer(s) and consumer(s) by specifying the number of each\n", - "- to reset the experiment by going back to its initial state (clean + launch) " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "import logging\n", - "import json\n", - "\n", - "from typing import List\n", - "\n", - "from enoslib.log import DisableLogging\n", - "from enoslib.config import config_context\n", - "from IPython.display import clear_output\n", - "\n", - "\n", - "server = roles[\"server\"][0]\n", - "ip_address_obj = server.filter_addresses(networks=networks[\"prod\"])[0]\n", - "server_ip = ip_address_obj.ip.ip\n", - "\n", - "# list all producer(s) ip(s)\n", - "producer_ips : List[str] = []\n", - "for p in roles[\"producer\"]:\n", - " ip_address_obj = p.filter_addresses(networks=networks[\"prod\"])[0]\n", - " producer_ips.append(str(ip_address_obj.ip.ip))\n", - "\n", - "# list all consumer(s) ip(s)\n", - "consumer_ips : List[str] = []\n", - "for c in roles[\"consumer\"]:\n", - " ip_address_obj = c.filter_addresses(networks=networks[\"prod\"])[0]\n", - " consumer_ips.append(str(ip_address_obj.ip.ip))\n", - "\n", - "def _get_queues_info() -> List:\n", - " with DisableLogging(level=logging.ERROR):\n", - " with config_context(ansible_stdout=\"noop\"):\n", - " result = en.run_command(\n", - " \"rabbitmqctl list_queues name messages --formatter json; echo '---'; \"\n", - " \"rabbitmqctl list_connections name --formatter json\",\n", - " task_name=\"Gathering statistics from the queues\",\n", - " roles=roles[\"server\"],\n", - " gather_facts=False,\n", - " on_error_continue=True,\n", - " )\n", - "\n", - " queues_info : List = []\n", - " r = result[0]\n", - " if r.status == \"FAILED\" or r.rc != 0:\n", - " return\n", - " \n", - " queues_info, connections = r.stdout.split('---')\n", - "\n", - " return (json.loads(queues_info), json.loads(connections))\n", - "\n", - "def get_queues_info_for(duration: int) -> pd.DataFrame:\n", - " results = {}\n", - " results[\"Time\"] = []\n", - " results[\"nb_received_messages\"] = []\n", - " results[\"queue_depth\"] = []\n", - " results[\"nb_consumers\"] = []\n", - " results[\"nb_producers\"] = []\n", - " for _ in range(duration):\n", - " time = str(datetime.now().strftime(\"%H:%M:%S\"))\n", - " results[\"Time\"].append(time)\n", - "\n", - " queues_info, connections = _get_queues_info()\n", - "\n", - " queue_depth = 0\n", - " nb_consumers = 0\n", - " nb_recv_msg = 0\n", - " nb_producers = 0\n", - "\n", - " for d in queues_info:\n", - " if d[\"name\"] == \"fault_injection\":\n", - " queue_depth = int(d[\"messages\"])\n", - " elif d[\"name\"] == \"received_messages\":\n", - " nb_recv_msg = int(d[\"messages\"])\n", - "\n", - " for actor in connections:\n", - " actor_ip = actor[\"name\"].split(\":\")[0]\n", - "\n", - " if actor_ip in producer_ips:\n", - " nb_producers +=1\n", - " elif actor_ip in consumer_ips:\n", - " nb_consumers +=1\n", - "\n", - " results[\"queue_depth\"].append(queue_depth)\n", - " results[\"nb_consumers\"].append(nb_consumers)\n", - " results[\"nb_producers\"].append(nb_producers)\n", - " 
results[\"nb_received_messages\"].append(nb_recv_msg)\n", - "\n", - " clear_output(wait=False)\n", - " print(\n", - " f\"Time : {time}\\n\"\n", - " f\"nb_received_messages : {nb_recv_msg}\\n\"\n", - " f\"queue_depth : {queue_depth}\\n\"\n", - " f\"nb_consumers: {nb_consumers}\\n\"\n", - " f\"nb_producers: {nb_producers}\\n\"\n", - " )\n", - "\n", - " df = pd.DataFrame(data=results)\n", - "\n", - " return df\n", - "\n", - "def clean():\n", - " \"\"\"\n", - " Kill all previouses launched processes, \n", - " removes all cronjobs,\n", - " purges the queues.\n", - " \"\"\"\n", - " cleaning_registry = en.ProcessRegistry()\n", - " cleaning_registry.build(\n", - " \"tuto_\",\n", - " roles[\"consumer\"] + roles[\"producer\"],\n", - " )\n", - " cleaning_registry.kill(signal.SIGKILL)\n", - "\n", - " with DisableLogging(level=logging.ERROR):\n", - " with config_context(ansible_stdout=\"noop\"):\n", - " en.run_command(\n", - " \"rabbitmqctl purge_queue fault_injection & \"\\\n", - " \"rabbitmqctl purge_queue received_messages & \",\n", - " task_name=\"purging the queue\",\n", - " roles=roles[\"server\"],\n", - " on_error_continue=True,\n", - " gather_facts=False,\n", - " )\n", - "\n", - " en.run_command(\n", - " \"crontab -r\",\n", - " task_name=\"purging crontab file\",\n", - " roles=roles[\"consumer\"] + roles[\"producer\"],\n", - " on_error_continue=True,\n", - " gather_facts=False,\n", - " )\n", - " \n", - "def launch(nconsumer, nproducer):\n", - " \"\"\"\n", - " Launch specified number of consumers and producers.\n", - " \"\"\"\n", - "\n", - " for idx in range(nconsumer):\n", - " en.run_command(\n", - " f\"python3 /tmp/rabbitmq/consumer.py {server_ip}\",\n", - " task_name=f\"run consumer script number {idx}\",\n", - " roles=roles[\"consumer\"],\n", - " background=True,\n", - " gather_facts=False,\n", - " ns=f\"tuto_consumer_{idx}\",\n", - " )\n", - "\n", - " for idx in range(nproducer):\n", - " en.run_command(\n", - " f\"python3 /tmp/rabbitmq/producer.py {server_ip}\",\n", - " task_name=f\"run producer script number {idx}\",\n", - " roles=roles[\"producer\"],\n", - " background=True,\n", - " gather_facts=False,\n", - " ns=f\"tuto_producer_{idx}\",\n", - " )\n", - "\n", - "def reset(nconsumer, nproducer):\n", - " \"\"\"\n", - " Return to the initial state of the experiment.\n", - "\n", - " Can throw error (not to take into account !) if :\n", - " -> no previous launched processes\n", - " \n", - " \"\"\"\n", - " print(\n", - " \"\\n ------------------------------------ \",\n", - " \"\\n| RESETING THE EXPERIMENT PARAMETERS |\",\n", - " \"\\n ------------------------------------ \",\n", - " )\n", - "\n", - " clean()\n", - " launch(nconsumer, nproducer)\n", - " \n", - " print(\n", - " \"\\n ------------------------------ \",\n", - " \"\\n| DONE - INITIAL STATE REACHED |\",\n", - " \"\\n ------------------------------ \",\n", - " )" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## General knowledge\n", - "\n", - "A ```ProcessRegistry``` records all processes that follows a given regexp on specific roles. 
" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# does not work, it is just an example on how to build one \n", - "registry = en.ProcessRegistry()\n", - "registry.build(\"regexp\", roles)\n", - "registry" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "One ```Process``` is defined by its pid, its host and its command, a state (ALIVE or DEAD) is also attributed to each instance." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "host = en.Host(\"192.168.0.3\", alias=\"one_alias\", user=\"foo\")\n", - "process = en.Process(\"process_name\", 1234, host, \"cmd\")\n", - "process" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "For each case below, we will act on either the consumers or the producers, never both. It can be the entire group or only a subset.\n", - "\n", - "When acting on consumers, we will observe an increase of the queue depth, meaning that the messages are not processed as fast as they are produced.\n", - "\n", - "When acting on producers, we will observe no evolution regarding the number of processed messages." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## First example : Synchronous case" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Using this type of kill, the user must wait for the end before doing anything else.\n", - "\n", - "We can specify the sended signal." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Killing all consumers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "reset(nconsumer = 3, nproducer = 3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry = en.ProcessRegistry()\n", - "registry.build(\n", - " \"tuto_\",\n", - " roles\n", - ")\n", - "registry_on_consumers = registry.lookup(roles[\"consumer\"])\n", - "registry_on_consumers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "results_before_kill = get_queues_info_for(5)\n", - "registry_on_consumers.kill(signum=signal.SIGKILL)\n", - "results_after_kill = get_queues_info_for(5)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "results_before_kill" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "results_after_kill" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Killing all producers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "reset(nconsumer = 3, nproducer = 3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_producers = en.ProcessRegistry()\n", - "registry_on_producers.build(\n", - " \"tuto_\",\n", - " roles[\"producer\"]\n", - ")\n", - "registry_on_producers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "results_before_kill = get_queues_info_for(5)\n", - "registry_on_producers.kill(signum=signal.SIGKILL)\n", - 
"results_after_kill = get_queues_info_for(5)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "results_before_kill" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "results_after_kill" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Second example : asynchronous case" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Here, each kill is scheduled using the ```start_in``` or ```start_at``` parameters, which can be expressed either as a delay using a ```datetime.timedelta()``` object, or as a date using a ```datetime.datetime()``` object." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Killing all consumers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "reset(nconsumer = 3, nproducer = 3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_consumers = en.ProcessRegistry()\n", - "registry_on_consumers.build(\n", - " \"tuto_\",\n", - " roles[\"consumer\"],\n", - ")\n", - "registry_on_consumers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_consumers.kill_async(\n", - " signum = signal.SIGKILL,\n", - " start_in = timedelta(minutes=1, seconds=20),\n", - ")\n", - "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", - "results = get_queues_info_for(40)\n", - "results" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Killing all producers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "reset(nconsumer = 3, nproducer = 3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_producers = en.ProcessRegistry()\n", - "registry_on_producers.build(\n", - " \"tuto_\",\n", - " roles[\"producer\"],\n", - ")\n", - "registry_on_producers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_producers.kill_async(\n", - " signum = signal.SIGKILL,\n", - " start_at = datetime.now() + timedelta(minutes=1, seconds=20),\n", - ")\n", - "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", - "results = get_queues_info_for(40)\n", - "results" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Third example : Incremental case" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Here, each kill is scheduled and the choice of the killed process(es) is done totally randomly among those registered, we have to specify how many process(es) we want to kill, the beginning (date of the first kill) and the interval between each of them.\n", - "\n", - "The beginning can be expressed either as a delay using a ```datetime.timedelta()``` object, or as a date using a ```datetime.datetime()``` object.\n", - "\n", - "The interval is a ```datetime.timedelta``` object." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "reset(nconsumer = 3, nproducer = 3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_consumers = en.ProcessRegistry()\n", - "registry_on_consumers.build(\n", - "    \"tuto_\",\n", - "    roles[\"consumer\"],\n", - ")\n", - "registry_on_consumers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_consumers.kill_async_incr(\n", - "    signum = signal.SIGKILL,\n", - "    number = 2,\n", - "    start_at = datetime.now() + timedelta(minutes=1, seconds=10),\n", - "    interval = timedelta(seconds=20),\n", - ")\n", - "# each iteration lasts ~2 sec (2 requests + 1 sec sleep)\n", - "results = get_queues_info_for(45)\n", - "results" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can get an updated version of the registry, with both dead and alive processes." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "after_refresh = registry_on_consumers.refresh()\n", - "after_refresh" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can also restart all killed processes without acting on the others; in a way, we go back to the registry's initial state. This ```registry.reset()``` has nothing to do with the ```reset(nconsumer, nproducer)``` specifically implemented for this experiment!" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_consumers.reset()\n", - "registry_on_consumers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "results = get_queues_info_for(2)\n", - "results" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Fourth example : Restart a registry" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can restart an entire registry, which means killing all of its processes (if alive) and starting them again.\n", - "\n", - "It can be done either synchronously with ```registry.restart()```, or asynchronously in the same way as the kills previously shown. If asynchronous, the date of the kill(s) (or the delay before they happen) and the interval between the kill(s) and the start(s) of the process(es) can be specified."
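The synchronous variant is not demonstrated in the cells below, so here is a minimal sketch based only on the method name mentioned above (the exact signature is an assumption):

```python
# Kill every process of the registry (if alive) and start it again,
# blocking until the restart is done.
registry_on_consumers.restart()

# The registry should now report all processes as ALIVE again.
registry_on_consumers.refresh()
```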
- ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Restarting all consumers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "reset(nconsumer = 3, nproducer = 3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_consumers = en.ProcessRegistry()\n", - "registry_on_consumers.build(\n", - " \"tuto_\",\n", - " roles[\"consumer\"],\n", - ")\n", - "registry_on_consumers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_consumers.restart_async(\n", - " signum = signal.SIGKILL,\n", - " start_in = timedelta(minutes=1, seconds=10),\n", - " interval = timedelta(seconds=20),\n", - ")\n", - "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", - "results = get_queues_info_for(45)\n", - "results" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Restarting all producers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "reset(nconsumer = 3, nproducer = 3)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_producers = en.ProcessRegistry()\n", - "registry_on_producers.build(\n", - " \"tuto_\",\n", - " roles[\"producer\"],\n", - ")\n", - "registry_on_producers" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "registry_on_producers.restart_async(\n", - " signum = signal.SIGKILL,\n", - " start_at = datetime.now() + timedelta(minutes=1, seconds=10),\n", - " interval = timedelta(seconds=20),\n", - ")\n", - "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", - "results = get_queues_info_for(45)\n", - "results" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Cleaning" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "provider.destroy()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "enoslib", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.4" - }, - "orig_nbformat": 4 - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/g5k/07_planning_service.ipynb b/g5k/07_planning_service.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..ad72cef09beb5121ee221ed25ac17c5f86bf47e5 --- /dev/null +++ b/g5k/07_planning_service.ipynb @@ -0,0 +1,230 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "3647b70c-d999-4b29-bb46-99fb3acca648", + "metadata": {}, + "source": [ + "# Schedule events using a Planning\n", + "\n", + "---\n", + "\n", + "- Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html\n", + "- Instant chat: https://framateam.org/enoslib\n", + "- Source code: https://gitlab.inria.fr/discovery/enoslib\n", + "\n", + "---\n", + "\n", + "## Prerequisites\n", + "\n", + "<div class=\"alert alert-block alert-warning\">\n", + " Make sure you've run the one time setup for your environment\n", + "</div>" + ] + }, + { + "cell_type": "markdown", + "id": 
"420ab62c-2c71-4005-9f78-1860eb33bdfe", + "metadata": {}, + "source": [ + "## Setup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c47e663e-4591-418f-9522-f384571aa3c0", + "metadata": {}, + "outputs": [], + "source": [ + "import logging\n", + "\n", + "import enoslib as en\n", + "import time\n", + "\n", + "en.init_logging(level=logging.INFO)\n", + "en.check()\n", + "\n", + "job_name = \"stress-planning\"\n", + "\n", + "conf = (\n", + " en.G5kConf.from_settings(job_name=job_name, walltime=\"3:00:00\", job_type=\"deploy\", env_name=\"debian11-nfs\")\n", + " .add_machine(roles=[\"groupA\", \"xp\"], cluster=\"paravance\", nodes=1)\n", + " .add_machine(roles=[\"groupB\", \"xp\"], cluster=\"paravance\", nodes=1)\n", + ")\n", + "\n", + "# This will validate the configuration, but not reserve resources yet\n", + "provider = en.G5k(conf)\n", + "\n", + "# Get actual resources\n", + "roles, networks = provider.init()\n", + "\n", + "\n", + "from datetime import datetime, timedelta\n", + "\n", + "with en.actions(roles=roles) as p:\n", + " p.apt(name=\"stress\", state=\"present\")\n", + "\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "1497e51a-b684-4132-be9d-dc5ed4c7d4ca", + "metadata": {}, + "source": [ + "## Build the planning" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "349f36a5-988e-40c1-b020-afedfa172c13", + "metadata": {}, + "outputs": [], + "source": [ + "ps = en.PlanningService()\n", + "\n", + "N = 30\n", + "delay = 70\n", + "# incremental start\n", + "for i in range(N):\n", + " (\n", + " # start every 3 second a stress process\n", + " ps.add_event(\n", + " en.StartEvent(\n", + " date=datetime.now() + timedelta(seconds=delay + i * 3), \n", + " host=roles[\"groupA\"][0],\n", + " cmd=\"stress -c 1\",\n", + " name=f\"mysleep-{i}\"\n", + " )\n", + " )\n", + " .add_event(\n", + " en.StartEvent(\n", + " date=datetime.now() + timedelta(seconds=delay + i * 3), \n", + " host=roles[\"groupB\"][0],\n", + " cmd=\"stress -c 1\",\n", + " name=f\"mysleep-{i}\"\n", + " )\n", + " )\n", + " )\n", + "\n", + "for i in range(N):\n", + " start = delay + N * 3 + 3\n", + " (\n", + " ps.add_event(\n", + " en.KillEvent(\n", + " date=datetime.now() + timedelta(seconds=start + i * 3), \n", + " host=roles[\"groupA\"][0],\n", + " name=f\"mysleep-{i}\"\n", + " )\n", + " )\n", + " .add_event(\n", + " en.KillEvent(\n", + " date=datetime.now() + timedelta(seconds=start + i * 3), \n", + " host=roles[\"groupB\"][0],\n", + " name=f\"mysleep-{i}\"\n", + " )\n", + " )\n", + " )\n", + " \n", + "\n", + "ps" + ] + }, + { + "cell_type": "markdown", + "id": "09c4dba8-8a0c-4f4d-83b1-b0ea29983034", + "metadata": {}, + "source": [ + "## Execute the planning" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dcb50003-46b5-4cb9-a92b-734d43e81bf3", + "metadata": {}, + "outputs": [], + "source": [ + "ps.until_end.total_seconds()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "129479b8-3d5d-4280-a6ef-43c517ee6a27", + "metadata": {}, + "outputs": [], + "source": [ + "# start monitoring\n", + "dstat = en.Dstat(nodes=roles[\"xp\"])\n", + "dstat.destroy()\n", + "dstat.deploy()\n", + "\n", + "# deploy the planning\n", + "ps.deploy()\n", + "# waiting a bit\n", + "time.sleep(ps.until_end.total_seconds() + 60)\n", + "\n", + "# backup the data\n", + "dstat.destroy()\n", + "dstat.backup()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df7e173e-058d-4ceb-a15d-f0b9c02482f5", + "metadata": {}, + "outputs": [], + "source": 
[ + "from pathlib import Path" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ba577023-7738-4b1e-84e1-0676184519ff", + "metadata": {}, + "outputs": [], + "source": [ + "import seaborn as sns\n", + "df = en.Dstat.to_pandas(Path(\"./__enoslib_dstat__/\"))\n", + "\n", + "sns.lineplot(df, x=\"epoch\", y=\"usr\", hue=\"csv\", legend=\"full\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30a06fb9-2ea2-412d-87b5-e6be4b4386ff", + "metadata": {}, + "outputs": [], + "source": [ + "ps.destroy()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/g5k/08_schedule_events_using_a_planning.ipynb b/g5k/08_planning_service_revisited.ipynb similarity index 83% rename from g5k/08_schedule_events_using_a_planning.ipynb rename to g5k/08_planning_service_revisited.ipynb index ac0608af4c76bdf4103e8b6886d458d6d607cbbf..870aa17d5d9accabcebdcb2d3d37d57254ef62c9 100644 --- a/g5k/08_schedule_events_using_a_planning.ipynb +++ b/g5k/08_planning_service_revisited.ipynb @@ -5,7 +5,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Schedule events using a Planning\n", + "# Planning service revisited\n", "\n", "---\n", "\n", @@ -70,7 +70,8 @@ "conf = (\n", " en.G5kConf.from_settings(\n", " job_name=\"fault-injection tutorial\",\n", - " job_type=[],\n", + " job_type=[\"deploy\"],\n", + " env_name=\"debian11-nfs\"\n", " )\n", " .add_machine(roles=[\"server\"], cluster=CLUSTER, nodes=1)\n", " .add_machine(\n", @@ -134,7 +135,6 @@ "# Common configuration\n", "with en.actions(roles=roles) as p:\n", " p.apt(task_name=\"Installing python\", name=\"python3\")\n", - " p.apt(task_name=\"Installing procps\", name=\"procps\")\n", "\n", " p.command(\"apt update\")\n", "\n", @@ -383,7 +383,7 @@ " \"\"\"\n", " cleaning_registry = en.ProcessRegistry()\n", " cleaning_registry.build(\n", - " \"producer|consumer\",\n", + " \"{producer,consumer}\",\n", " roles[\"consumer\"] + roles[\"producer\"],\n", " )\n", " cleaning_registry.kill(signal.SIGKILL)\n", @@ -408,114 +408,6 @@ " )" ] }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## General knowledge" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Events\n", - "\n", - "An ```Event``` define a command that will run at a specific date (```datetime.datetime```) on a specific host (```Host```)." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### StartEvent\n", - "\n", - "A ```StartEvent```, as its name suggests, will start a named process.\n", - "\n", - "In addtion of the base class ```Event``` attributes, the ```command``` to launch the process and its ```name``` must be specified and are of type ```str```.\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Not supposed to return anything ! 
The only purpose of this code is to show how a StartEvent is created.\n", - "event_date = datetime.now()\n", - "event_host = en.Host(\"address\")\n", - "event_cmd = \"sleep infinity\"\n", - "process_name = \"name\"\n", - "\n", - "start_event = en.StartEvent(\n", - " date = event_date,\n", - " host = event_host,\n", - " cmd = event_cmd,\n", - " ns = process_name, \n", - ")" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### KillEvent\n", - "\n", - "A ```KillEvent```, as its name suggests, will kill a previously started named process.\n", - "\n", - "In addition of the base class ```Event``` attributes, only the ```name``` of the process to kill must be specified, which is of type ```str```." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Not supposed to return anything ! The only purpose of this code is to show how a KillEvent is created.\n", - "event_date = datetime.now()\n", - "event_host = en.Host(\"address\")\n", - "process_name = \"name\"\n", - "\n", - "kill_event = en.KillEvent(\n", - " date = event_date,\n", - " host = event_host,\n", - " ns = process_name, \n", - ")" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Planning\n", - "\n", - "A ```Planning``` can be considered as an Event storage.\n", - "\n", - "A ```PlanningService``` records all processes specified in a ```Planning```. It can be seen as an Event registry.\n", - "\n", - "To define a ```PlanningService```, specifying a ```Planning``` is necessary." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "planning = en.Planning()\n", - "planning.add_event(start_event)\n", - "planning.add_event(kill_event)\n", - "\n", - "planning_service = en.PlanningService(planning)" - ] - }, { "attachments": {}, "cell_type": "markdown", @@ -553,7 +445,7 @@ "metadata": {}, "outputs": [], "source": [ - "planning = en.Planning()\n", + "planning = en.PlanningService()\n", "\n", "n = 5\n", "\n", @@ -564,7 +456,7 @@ " date = time_now + timedelta(minutes = 2, seconds = (idx * 10)),\n", " host = roles[\"producer\"][0],\n", " cmd = f\"python3 /tmp/rabbitmq/producer.py {server_ip}\",\n", - " ns = f\"producer_{idx}\", \n", + " name = f\"producer_{idx}\", \n", " )\n", " )\n", " planning.add_event(\n", @@ -572,7 +464,7 @@ " date = time_now + timedelta(minutes = 3, seconds = (idx * 10)),\n", " host = roles[\"consumer\"][0],\n", " cmd = f\"python3 /tmp/rabbitmq/consumer.py {server_ip}\",\n", - " ns = f\"consumer_{idx}\", \n", + " name = f\"consumer_{idx}\", \n", " )\n", " )\n", "\n", @@ -581,7 +473,7 @@ " en.KillEvent(\n", " date = time_now + timedelta(minutes = 4),\n", " host = roles[\"producer\"][0],\n", - " ns = f\"producer_{idx}\", \n", + " name = f\"producer_{idx}\", \n", " )\n", " )\n", "\n", @@ -590,12 +482,10 @@ " en.KillEvent(\n", " date = time_now + timedelta(minutes = 4, seconds = 30),\n", " host = roles[\"consumer\"][0],\n", - " ns = f\"consumer_{idx}\", \n", + " name = f\"consumer_{idx}\", \n", " )\n", " )\n", - "\n", - "planning_service = en.PlanningService(planning=planning, delay = timedelta(minutes=1))\n", - "planning_service.deploy()" + "planning.deploy()" ] }, { @@ -606,16 +496,6 @@ "We can have a first advice regarding the consistency of the planning." 
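The original `planning.check()` cell is removed just below; with the `PlanningService`-based API used in this revision, the same sanity check can presumably be issued on the service object itself, as the second example further down still does:

```python
# Raises an exception if the planning looks inconsistent, otherwise returns None.
planning.check()
```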
] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Will return an exception if anything seems to be wrong, else None.\n", - "planning.check()" - ] - }, { "attachments": {}, "cell_type": "markdown", @@ -624,15 +504,6 @@ "We can have an up-to-date state of all the processes." ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "planning_service.status()" - ] - }, { "attachments": {}, "cell_type": "markdown", @@ -675,7 +546,7 @@ "metadata": {}, "outputs": [], "source": [ - "planning_service.destroy()" + "planning.destroy()" ] }, { @@ -716,7 +587,7 @@ "metadata": {}, "outputs": [], "source": [ - "planning = en.Planning()\n", + "planning = en.PlanningService()\n", "\n", "time_now = datetime.now()\n", "for idx in range(20):\n", @@ -725,7 +596,7 @@ " date = time_now + timedelta(minutes = 4),\n", " host = roles[\"producer\"][0],\n", " cmd = f\"python3 /tmp/rabbitmq/producer.py {server_ip}\",\n", - " ns = f\"producer_{idx}\", \n", + " name = f\"producer_{idx}\", \n", " )\n", " )\n", "for idx in range(100):\n", @@ -734,7 +605,7 @@ " date = time_now + timedelta(minutes = 5),\n", " host = roles[\"consumer\"][0],\n", " cmd = f\"python3 /tmp/rabbitmq/consumer.py {server_ip}\",\n", - " ns = f\"consumer_{idx}\", \n", + " name = f\"consumer_{idx}\", \n", " )\n", " )\n", "\n", @@ -743,13 +614,12 @@ " en.KillEvent(\n", " date = time_now + timedelta(minutes = 6),\n", " host = roles[\"consumer\"][0],\n", - " ns = f\"consumer_{idx}\", \n", + " name = f\"consumer_{idx}\", \n", " )\n", " )\n", "\n", - "planning_service = en.PlanningService(planning=planning)\n", "planning.check()\n", - "planning_service.deploy()" + "planning.deploy()" ] }, { @@ -771,6 +641,33 @@ "results.plot(x=\"Time\", y=[\"nb_consumers\", \"nb_producers\"], rot=45)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "planning.status()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "r = planning.status()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "planning.destroy()" + ] + }, { "attachments": {}, "cell_type": "markdown", @@ -787,11 +684,18 @@ "source": [ "provider.destroy()" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "enoslib", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -805,10 +709,9 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.4" - }, - "orig_nbformat": 4 + "version": "3.10.5" + } }, "nbformat": 4, - "nbformat_minor": 2 + "nbformat_minor": 4 } diff --git a/g5k/index.rst b/g5k/index.rst index 9953925fed5fe76b8c5aa440aef60f355462b37e..69124faecf703882ce1fc60e5dcd0825b47814b4 100644 --- a/g5k/index.rst +++ b/g5k/index.rst @@ -12,4 +12,6 @@ Jupyter + EnOSlib + Grid'5000 = 💖 03_using_several_networks.ipynb 04_working_with_virtualized_resources.ipynb 05_network_emulation.ipynb - 06_orchestrators.ipynb \ No newline at end of file + 06_orchestrators.ipynb + 07_planning_service.ipynb + 08_planning_service_revisited.ipynb \ No newline at end of file