diff --git a/g5k/07_fault_injection_on_processes.ipynb b/g5k/07_fault_injection_on_processes.ipynb index f0e38156f0ffb8224f473bdec0a7e5c33695782b..334dea8c48d70c0c9e436858a946767034690fd8 100644 --- a/g5k/07_fault_injection_on_processes.ipynb +++ b/g5k/07_fault_injection_on_processes.ipynb @@ -134,7 +134,7 @@ "source": [ "Each node must have this minimal configuration :\n", "- having python and pip\n", - "- having procps (to use pgrep)\n", + "- having procps (to use kill)\n", "- having pika (for the rabbitmq connection)" ] }, @@ -278,7 +278,7 @@ "metadata": {}, "source": [ "The only purpose of these functions is to facilitate and to make this experiment more readable. Their objectives are :\n", - "- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s))\n", + "- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s))\n", "- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purges the rabbitmq queues)\n", "- to launch all producer(s) and consumer(s) by specifying the number of each\n", "- to reset the experiment by going back to its initial state (clean + launch) " @@ -291,11 +291,12 @@ "outputs": [], "source": [ "## Get server's IP address on the private network\n", - "from ast import List\n", + "from typing import List\n", "import pandas as pd\n", "import logging\n", "from enoslib.log import DisableLogging\n", "from enoslib.config import config_context\n", + "from IPython.display import clear_output\n", "\n", "\n", "server = roles[\"server\"][0]\n", @@ -335,7 +336,8 @@ " results[\"nb_consumers\"] = []\n", " results[\"nb_producers\"] = []\n", " for _ in range(duration):\n", - " results[\"Time\"].append(str(datetime.now().strftime(\"%H:%M:%S\")))\n", + " time = 
str(datetime.now().strftime(\"%H:%M:%S\"))\n", + " results[\"Time\"].append(time)\n", "\n", " queues_info = _get_queues_info()\n", "\n", @@ -364,6 +366,15 @@ " results[\"nb_producers\"].append(int(nb_producers))\n", " results[\"nb_received_messages\"].append(int(nb_recv_msg))\n", "\n", + " clear_output(wait=False)\n", + " print(\n", + " f\"Time : {time}\\n\"\n", + " f\"nb_received_messages : {nb_recv_msg}\\n\"\n", + " f\"queue_depth : {queue_depth}\\n\"\n", + " f\"nb_consumers: {nb_consumers}\\n\"\n", + " f\"nb_producers: {nb_producers}\\n\"\n", + " )\n", + "\n", "\n", " df = pd.DataFrame(data=results)\n", "\n", @@ -561,7 +572,7 @@ "outputs": [], "source": [ "results_before_kill = get_queues_info_for(5)\n", - "registry_on_consumers.kill(signal.SIGKILL)\n", + "registry_on_consumers.kill(signum=signal.SIGKILL)\n", "results_after_kill = get_queues_info_for(5)" ] }, @@ -621,7 +632,7 @@ "outputs": [], "source": [ "results_before_kill = get_queues_info_for(5)\n", - "registry_on_producers.kill(signal.SIGKILL)\n", + "registry_on_producers.kill(signum=signal.SIGKILL)\n", "results_after_kill = get_queues_info_for(5)" ] }, @@ -656,7 +667,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Here, each kill is scheduled using the ```time_span``` parameter, which can be expressed either as a delay using a ```datetime.timedelta()``` object, or as a date using a ```datetime.datetime()``` object." + "Here, each kill is scheduled using the ```start_in``` or ```start_at``` parameters, which can be expressed either as a delay using a ```datetime.timedelta()``` object, or as a date using a ```datetime.datetime()``` object." 
] }, { @@ -698,9 +709,9 @@ "source": [ "registry_on_consumers.kill_async(\n", " signum = signal.SIGKILL,\n", - " time_span = timedelta(minutes=1, seconds=20),\n", + " start_in = timedelta(minutes=1, seconds=20),\n", ")\n", - "# each iteration last for ~4 sec (2 requests + 1sec sleep)\n", + "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", "results = get_queues_info_for(40)\n", "results" ] @@ -744,9 +755,9 @@ "source": [ "registry_on_producers.kill_async(\n", " signum = signal.SIGKILL,\n", - " time_span = datetime.now() + timedelta(minutes=1, seconds=20),\n", + " start_at = datetime.now() + timedelta(minutes=1, seconds=20),\n", ")\n", - "# each iteration last for ~4 sec (2 requests + 1sec sleep)\n", + "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", "results = get_queues_info_for(40)\n", "results" ] @@ -803,10 +814,10 @@ "registry_on_consumers.kill_async_incr(\n", " signum = signal.SIGKILL,\n", " number = 2,\n", - " beginning = datetime.now() + timedelta(minutes=1, seconds=10),\n", + " start_at = datetime.now() + timedelta(minutes=1, seconds=10),\n", " interval = timedelta(seconds=20),\n", ")\n", - "# each iteration last for ~4 sec (2 requests + 1sec sleep)\n", + "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", "results = get_queues_info_for(45)\n", "results" ] @@ -834,7 +845,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "We can also restart all killed processes without acting on the others. In a way we go back to the registry's initial state. This ```registry.reset(nconsumer = 3, nproducer = 3)``` has nothing to do with the ```reset(nconsumer = 3, nproducer = 3)``` specificly implemented for this experiment ! " + "We can also restart all killed processes without acting on the others. In a way we go back to the registry's initial state. This ```registry.reset()``` has nothing to do with the ```reset(nconsumer, nproducer)``` specificly implemented for this experiment ! 
" ] }, { @@ -847,6 +858,16 @@ "registry_on_consumers" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results = get_queues_info_for(2)\n", + "results" + ] + }, { "attachments": {}, "cell_type": "markdown", @@ -862,7 +883,7 @@ "source": [ "We can restart an entire registry, this means killing all of them (if alive) and starting them again.\n", "\n", - "It can be done either synchronously with ```registry.restart()```, either asynchronously in the same way as the kills previously shown. If asynchronous, the date of the kill(s) (or the time delta before they happen) and its interval with the start(s) of the docker container(s) can be specified." + "It can be done either synchronously with ```registry.restart()```, either asynchronously in the same way as the kills previously shown. If asynchronous, the date of the kill(s) (or the delay before they happen) and its interval with the start(s) of the process(es) can be specified." ] }, { @@ -904,10 +925,10 @@ "source": [ "registry_on_consumers.restart_async(\n", " signum = signal.SIGKILL,\n", - " time_span = timedelta(minutes=1, seconds=10),\n", + " start_in = timedelta(minutes=1, seconds=10),\n", " interval = timedelta(seconds=20),\n", ")\n", - "# each iteration last for ~4 sec (2 requests + 1sec sleep)\n", + "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", "results = get_queues_info_for(45)\n", "results" ] @@ -951,10 +972,10 @@ "source": [ "registry_on_producers.restart_async(\n", " signum = signal.SIGKILL,\n", - " time_span = datetime.now() + timedelta(minutes=1, seconds=20),\n", + " start_at = datetime.now() + timedelta(minutes=1, seconds=10),\n", " interval = timedelta(seconds=20),\n", ")\n", - "# each iteration last for ~4 sec (2 requests + 1sec sleep)\n", + "# each iteration last for ~2 sec (2 requests + 1sec sleep)\n", "results = get_queues_info_for(45)\n", "results" ] diff --git a/g5k/10_schedule_events_using_a_planning.ipynb 
b/g5k/10_schedule_events_using_a_planning.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..a1f0230b149a7d1deb16cf3291b80dfff453a829 --- /dev/null +++ b/g5k/10_schedule_events_using_a_planning.ipynb @@ -0,0 +1,805 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Schedule events using a Planning\n", + "\n", + "---\n", + "\n", + "- Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html\n", + "- Instant chat: https://framateam.org/enoslib\n", + "- Source code: https://gitlab.inria.fr/discovery/enoslib\n", + "\n", + "---\n", + "\n", + "## Prerequisites\n", + "\n", + "<div class=\"alert alert-block alert-warning\">\n", + " Make sure you've run the one time setup for your environment\n", + "</div>\n", + "\n", + "<div class=\"alert alert-block alert-warning\">\n", + " Make sure you've done the tutorial : 07_fault_injection_on_processes\n", + "</div>" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import signal\n", + "import os\n", + "from datetime import datetime, timedelta\n", + "\n", + "import enoslib as en\n", + "\n", + "en.init_logging()\n", + "en.check()\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Reservation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "CLUSTER = \"nova\"\n", + "HERE = os.getcwd()\n", + "# claim the resources\n", + "conf = (\n", + " en.G5kConf.from_settings(\n", + " job_name=\"fault-injection tutorial\",\n", + " job_type=[],\n", + " )\n", + " .add_machine(roles=[\"server\"], cluster=CLUSTER, nodes=1)\n", + " .add_machine(\n", + " roles=[\"producer\"], cluster=CLUSTER, nodes=1\n", + " ) # all the producers are running on the same machine\n", + " 
.add_machine(\n", + " roles=[\"consumer\"], cluster=CLUSTER, nodes=1\n", + " ) # all the consumers are running on the same machine\n", + ")\n", + "\n", + "provider = en.G5k(conf)\n", + "\n", + "roles, networks = provider.init()\n", + "\n", + "# Fill in network information from nodes\n", + "roles = en.sync_info(roles, networks)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "roles" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Rabbitmq configuration" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Common node's configuration" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Each node must have this minimal configuration :\n", + "- having python and pip\n", + "- having procps (to use kill)\n", + "- having pika (for the rabbitmq connection)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Common configuration\n", + "with en.actions(roles=roles) as p:\n", + " p.apt(task_name=\"Installing python\", name=\"python3\")\n", + " p.apt(task_name=\"Installing procps\", name=\"procps\")\n", + "\n", + " p.command(\"apt update\")\n", + "\n", + " p.apt(task_name=\"Installing pip\", name=\"python3-pip\")\n", + " p.pip(task_name=\"Installing pika\", name=\"pika\")\n", + "\n", + " p.file(path=\"/tmp/rabbitmq\", state=\"absent\")\n", + " p.file(path=\"/tmp/rabbitmq\", state=\"directory\")\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Server configuration" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here, we does not launch anything yet, we just setup the server node to accept all our producer(s) and consumer(s). 
We also add a new administrator in order to have access to the management interface, the default one being blocked by the remote configuration." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "username_monitoring = \"user\"\n", + "password_monitoring = \"password\"\n", + "\n", + "# SETUP\n", + "## Server configuration\n", + "with en.actions(roles=roles[\"server\"]) as p:\n", + " # Setting the rabbimq server\n", + " p.apt(task_name=\"Installing rabbitmq-server\", name=\"rabbitmq-server\")\n", + " p.command(\"rabbitmq-plugins enable rabbitmq_management\")\n", + " p.command(\"systemctl start rabbitmq-server\")\n", + " p.command(\"systemctl enable rabbitmq-server\")\n", + " \n", + " # For the management interface, adding a new admin\n", + " p.command(f\"rabbitmqctl add_user {username_monitoring} {password_monitoring}\")\n", + " p.command(f\"rabbitmqctl set_user_tags {username_monitoring} administrator\")\n", + " p.command(f\"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'\")\n", + "\n", + " # Allow all connections (no credentials needed for consumers and producers)\n", + " p.shell('echo \"loopback_users.guest = false\" | sudo tee -a /etc/rabbitmq/rabbitmq.conf')\n", + "\n", + " p.command(\"systemctl restart rabbitmq-server\")\n", + " " + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Producers' node configuration" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The producers' node has to be configured such that it contains its specific script." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "with en.actions(roles=roles[\"producer\"]) as p:\n", + " p.copy(\n", + " src=HERE + \"/producer.py\",\n", + " dest=\"/tmp/rabbitmq/producer.py\",\n", + " task_name=\"copying producer file\",\n", + " )" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Consumers' node configuration" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The consumers' node has to be configured such that it contains its specific script." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "with en.actions(roles=roles[\"consumer\"]) as p:\n", + " p.copy(\n", + " src=HERE + \"/consumer.py\",\n", + " dest=\"/tmp/rabbitmq/consumer.py\",\n", + " task_name=\"copying consumer file\",\n", + " )" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Utility functions" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The only purpose of these functions is to facilitate and to make this experiment more readable. 
Their objectives are :\n", + "- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s))\n", + "- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purges the rabbitmq queues)\n", + "- to launch all producer(s) and consumer(s) by specifying the number of each\n", + "- to reset the experiment by going back to its initial state (clean + launch) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "## Get server's IP address on the private network\n", + "from typing import List\n", + "import pandas as pd\n", + "import logging\n", + "from enoslib.log import DisableLogging\n", + "from enoslib.config import config_context\n", + "from IPython.display import clear_output\n", + "\n", + "\n", + "server = roles[\"server\"][0]\n", + "ip_address_obj = server.filter_addresses(networks=networks[\"prod\"])[0]\n", + "## This may seem weird: ip_address_obj.ip is a `netaddr.IPv4Interface`\n", + "## which itself has an `ip` attribute.\n", + "server_ip = ip_address_obj.ip.ip\n", + "\n", + "def _get_queues_info() -> List:\n", + " with DisableLogging(level=logging.ERROR):\n", + " with config_context(ansible_stdout=\"noop\"):\n", + " results = en.run_command(\n", + " \"rabbitmqctl list_queues -p '/' messages consumers | \"\n", + " \"awk 'NR>3 {printf \\\"%-15s %-15s\\\\n\\\", $1, $2}' ; \"\n", + " \"rabbitmqctl list_connections | grep guest | wc -l\",\n", + " task_name=\"Gathering statistics from the queues\",\n", + " roles=roles[\"server\"],\n", + " gather_facts=False,\n", + " on_error_continue=True,\n", + " )\n", + " queues_info : List = []\n", + " for r in results:\n", + " if r.status == \"FAILED\" or r.rc != 0:\n", + " continue\n", + " lines = r.stdout.strip().split(\"\\n\")\n", + " for l in lines:\n", + " info = l.strip().split(\" 
\")\n", + " queues_info.append([v for v in info if v!= \"\"])\n", + "\n", + " return queues_info\n", + "\n", + "def get_queues_info_for(duration: int) -> pd.DataFrame:\n", + " results = {}\n", + " results[\"Time\"] = []\n", + " results[\"nb_received_messages\"] = []\n", + " results[\"queue_depth\"] = []\n", + " results[\"nb_consumers\"] = []\n", + " results[\"nb_producers\"] = []\n", + " for _ in range(duration):\n", + " time = str(datetime.now().strftime(\"%H:%M:%S\"))\n", + " results[\"Time\"].append(time)\n", + "\n", + " queues_info = _get_queues_info()\n", + "\n", + " try:\n", + " queue_depth = queues_info[0][0]\n", + " except IndexError as e:\n", + " queue_depth = 0\n", + "\n", + " try:\n", + " nb_consumers = queues_info[0][1]\n", + " except IndexError as e:\n", + " nb_consumers = 0\n", + "\n", + " try:\n", + " nb_recv_msg = queues_info[1][0]\n", + " except IndexError as e:\n", + " nb_recv_msg = 0\n", + "\n", + " try:\n", + " nb_producers = int(queues_info[2][0]) - int(nb_consumers)\n", + " except IndexError as e:\n", + " nb_producers = 0\n", + "\n", + " results[\"queue_depth\"].append(int(queue_depth))\n", + " results[\"nb_consumers\"].append(int(nb_consumers))\n", + " results[\"nb_producers\"].append(int(nb_producers))\n", + " results[\"nb_received_messages\"].append(int(nb_recv_msg))\n", + "\n", + " clear_output(wait=False)\n", + " print(\n", + " f\"Time : {time}\\n\"\n", + " f\"nb_received_messages : {nb_recv_msg}\\n\"\n", + " f\"queue_depth : {queue_depth}\\n\"\n", + " f\"nb_consumers: {nb_consumers}\\n\"\n", + " f\"nb_producers: {nb_producers}\\n\"\n", + " )\n", + "\n", + " df = pd.DataFrame(data=results)\n", + "\n", + " return df\n", + "\n", + "def clean():\n", + " \"\"\"\n", + " Kill all previouses launched processes, \n", + " removes all previouses results,\n", + " purges the queue.\n", + " \"\"\"\n", + " cleaning_registry = en.ProcessRegistry()\n", + " cleaning_registry.build(\n", + " \"producer|consumer\",\n", + " roles[\"consumer\"] + 
roles[\"producer\"],\n", + " )\n", + " cleaning_registry.kill(signal.SIGKILL)\n", + "\n", + " with DisableLogging(level=logging.ERROR):\n", + " with config_context(ansible_stdout=\"noop\"):\n", + " en.run_command(\n", + " \"rabbitmqctl purge_queue fault_injection & \"\\\n", + " \"rabbitmqctl purge_queue received_messages & \",\n", + " task_name=\"purging the queue\",\n", + " roles=roles[\"server\"],\n", + " on_error_continue=True,\n", + " gather_facts=False,\n", + " )\n", + "\n", + " en.run_command(\n", + " \"crontab -r\",\n", + " task_name=\"purging crontab file\",\n", + " roles=roles[\"consumer\"] + roles[\"producer\"],\n", + " on_error_continue=True,\n", + " gather_facts=False,\n", + " )" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## General knowledge" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Events\n", + "\n", + "An ```Event``` define a command that will run at a specific date (```datetime.datetime```) on a specific host (```Host```)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### StartEvent\n", + "\n", + "A ```StartEvent```, as its name suggests, will start a named process.\n", + "\n", + "In addtion of the base class ```Event``` attributes, the ```command``` to launch the process and its ```name``` must be specified and are of type ```str```.\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Not supposed to return anything ! 
The only purpose of this code is to show how a StartEvent is created.\n", + "event_date = datetime.now()\n", + "event_host = en.Host(\"address\")\n", + "event_cmd = \"sleep infinity\"\n", + "process_name = \"name\"\n", + "\n", + "start_event = en.StartEvent(\n", + "    date = event_date,\n", + "    host = event_host,\n", + "    cmd = event_cmd,\n", + "    ns = process_name, \n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### KillEvent\n", + "\n", + "A ```KillEvent```, as its name suggests, will kill a named process.\n", + "\n", + "In addition to the base class ```Event``` attributes, only ```name``` of the process to kill must be specified, which is of type ```str```." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Not supposed to return anything ! The only purpose of this code is to show how a KillEvent is created.\n", + "event_date = datetime.now()\n", + "event_host = en.Host(\"address\")\n", + "process_name = \"name\"\n", + "\n", + "kill_event = en.KillEvent(\n", + "    date = event_date,\n", + "    host = event_host,\n", + "    ns = process_name, \n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Planning\n", + "\n", + "A ```Planning``` can be considered as an Event storage.\n", + "\n", + "A ```PlanningService``` records all processes specified in a ```Planning```. It can be seen as an Event registry.\n", + "\n", + "To define a ```PlanningService```, specifying a ```Planning``` is necessary."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "planning = en.Planning()\n", + "planning.add_event(start_event)\n", + "planning.add_event(kill_event)\n", + "\n", + "planning_service = en.PlanningService(planning)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Examples" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### A first deployment\n", + "\n", + "We are here gonna schedule :\n", + "- the start of 5 rabbitmq producers every 10 seconds after 2 minute 20 seconds\n", + "- the start of 5 rabbitmq consumers every 10 seconds, 1 minute after the last producer is launched\n", + "- the kill of 3 rabbitmq producers 4 minutes after the start of the experiment\n", + "- the kill of all the rabbitmq consumers 30 seconds after the producers are killed" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "clean()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "planning = en.Planning()\n", + "\n", + "n = 5\n", + "\n", + "time_now = datetime.now()\n", + "for idx in range(n):\n", + " planning.add_event(\n", + " en.StartEvent(\n", + " date = time_now + timedelta(minutes = 2, seconds = (idx * 10)),\n", + " host = roles[\"producer\"][0],\n", + " cmd = f\"python3 /tmp/rabbitmq/producer.py {server_ip}\",\n", + " ns = f\"producer_{idx}\", \n", + " )\n", + " )\n", + " planning.add_event(\n", + " en.StartEvent(\n", + " date = time_now + timedelta(minutes = 3, seconds = (idx * 10)),\n", + " host = roles[\"consumer\"][0],\n", + " cmd = f\"python3 /tmp/rabbitmq/consumer.py {server_ip}\",\n", + " ns = f\"consumer_{idx}\", \n", + " )\n", + " )\n", + "\n", + "for idx in range(3):\n", + " planning.add_event(\n", + " en.KillEvent(\n", + " date = time_now + timedelta(minutes = 4),\n", + " host = 
roles[\"producer\"][0],\n", + " ns = f\"producer_{idx}\", \n", + " )\n", + " )\n", + "\n", + "for idx in range(n):\n", + " planning.add_event(\n", + " en.KillEvent(\n", + " date = time_now + timedelta(minutes = 4, seconds = 30),\n", + " host = roles[\"consumer\"][0],\n", + " ns = f\"consumer_{idx}\", \n", + " )\n", + " )\n", + "\n", + "planning_service = en.PlanningService(planning=planning, delay = timedelta(minutes=1))\n", + "planning_service.deploy()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can have a first advice regarding the consistency of the planning." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Will return an exception if anything seems to be wrong, else None.\n", + "planning.check()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can have an up-to-date state of all the processes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "planning_service.status()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's observe the evolution of our events." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results = get_queues_info_for(150)\n", + "results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results.plot(x=\"Time\", y=[\"nb_received_messages\", \"queue_depth\"], rot=45)\n", + "results.plot(x=\"Time\", y=[\"nb_consumers\", \"nb_producers\"], rot=45)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can also make the planning unactive, we mean here that all next event(s) (depending on the current time) will not be executed. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "planning_service.destroy()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Overload example" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Set-up an overload can be easily made using a service such as ```PlanningService```.\n", + "\n", + "Let's see an example of it.\n", + "\n", + "What we are gonna schedule is : \n", + "- The launch of 20 rabbitmq producers after 2 minutes\n", + "- The launch of 100 rabbitmq consumers after 3 minutes\n", + "- The kill of 75 rabbitmq consumers after 4 minutes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "clean()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "planning = en.Planning()\n", + "\n", + "time_now = datetime.now()\n", + "for idx in range(20):\n", + " planning.add_event(\n", + " en.StartEvent(\n", + " date = time_now + timedelta(minutes = 4),\n", + " host = roles[\"producer\"][0],\n", + " cmd = f\"python3 /tmp/rabbitmq/producer.py {server_ip}\",\n", + " ns = f\"producer_{idx}\", \n", + " )\n", + " )\n", + "for idx in range(100):\n", + " planning.add_event(\n", + " en.StartEvent(\n", + " date = time_now + timedelta(minutes = 5),\n", + " host = roles[\"consumer\"][0],\n", + " cmd = f\"python3 /tmp/rabbitmq/consumer.py {server_ip}\",\n", + " ns = f\"consumer_{idx}\", \n", + " )\n", + " )\n", + "\n", + "for idx in range(70):\n", + " planning.add_event(\n", + " en.KillEvent(\n", + " date = time_now + timedelta(minutes = 6),\n", + " host = roles[\"consumer\"][0],\n", + " ns = f\"consumer_{idx}\", \n", + " )\n", + " )\n", + "\n", + "planning_service = en.PlanningService(planning=planning)\n", + "planning.check()\n", + "planning_service.deploy()" + ] + }, + { + "cell_type": 
"code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results = get_queues_info_for(150)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results.plot(x=\"Time\", y=[\"nb_received_messages\", \"queue_depth\"], rot=45)\n", + "results.plot(x=\"Time\", y=[\"nb_consumers\", \"nb_producers\"], rot=45)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleaning" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "provider.destroy()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "enoslib", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.4" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +}