Mentions légales du service

Skip to content
Snippets Groups Projects
Commit db683424 authored by TRABELSI Weissem's avatar TRABELSI Weissem
Browse files

planning tutorial

parent 123bd524
No related branches found
No related tags found
No related merge requests found
%% Cell type:markdown id: tags:
# Fault-injection on processes
---
- Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html
- Instant chat: https://framateam.org/enoslib
- Source code: https://gitlab.inria.fr/discovery/enoslib
---
## Prerequisites
<div class="alert alert-block alert-warning">
Make sure you've run the one time setup for your environment
</div>
%% Cell type:markdown id: tags:
## Rabbitmq / Cron
[RabbitMQ](https://www.rabbitmq.com/) is an open-source message broker that enables different software applications to communicate and exchange data in a reliable and scalable manner. It follows the Advanced Message Queuing Protocol (AMQP) and provides a flexible messaging model based on the concept of queues.
For our experiment, we will deploy a publish / suscribe environment to demonstrate the impact of our api.
[Cron](https://man7.org/linux/man-pages/man8/cron.8.html) is a time-based job scheduler in Unix-like operating systems. It allows you to schedule and automate the execution of commands or scripts at specified intervals or specific times. Cron is commonly used for repetitive tasks, system maintenance, and scheduling periodic jobs.
All asynchronous tools shown here are based on cron. Because of that, we can't schedule an event that will happen before the next minute, after this delay, we can be precise to the second (with a final accuracy of 1/2 seconds).
%% Cell type:markdown id: tags:
## Setup
%% Cell type:code id: tags:
``` python
import signal
import os
from datetime import datetime, timedelta
import enoslib as en
en.init_logging()
en.check()
```
%% Cell type:markdown id: tags:
### Reservation
%% Cell type:code id: tags:
``` python
CLUSTER = "nova"
HERE = os.getcwd()
# claim the resources
conf = (
en.G5kConf.from_settings(
job_name="fault-injection tutorial",
job_type=[],
)
.add_machine(roles=["server"], cluster=CLUSTER, nodes=1)
.add_machine(
roles=["producer"], cluster=CLUSTER, nodes=1
) # all the producers are running on the same machine
.add_machine(
roles=["consumer"], cluster=CLUSTER, nodes=1
) # all the consumers are running on the same machine
)
provider = en.G5k(conf)
roles, networks = provider.init()
# Fill in network information from nodes
roles = en.sync_info(roles, networks)
```
%% Cell type:code id: tags:
``` python
roles
```
%% Cell type:markdown id: tags:
### Rabbitmq configuration
%% Cell type:markdown id: tags:
#### Common node's configuration
%% Cell type:markdown id: tags:
Each node must have this minimal configuration :
- having python and pip
- having procps (to use pgrep)
- having procps (to use kill)
- having pika (for the rabbitmq connection)
%% Cell type:code id: tags:
``` python
# Common configuration
with en.actions(roles=roles) as p:
p.apt(task_name="Installing python", name="python3")
p.apt(task_name="Installing procps", name="procps")
p.command("apt update")
p.apt(task_name="Installing pip", name="python3-pip")
p.pip(task_name="Installing pika", name="pika")
p.file(path="/tmp/rabbitmq", state="absent")
p.file(path="/tmp/rabbitmq", state="directory")
```
%% Cell type:markdown id: tags:
#### Server configuration
%% Cell type:markdown id: tags:
Here, we does not launch anything yet, we just setup the server node to accept all our producer(s) and consumer(s). We also add a new administrator in order to have access to the management interface, the default one being blocked by the remote configuration.
%% Cell type:code id: tags:
``` python
username_monitoring = "user"
password_monitoring = "password"
# SETUP
## Server configuration
with en.actions(roles=roles["server"]) as p:
# Setting the rabbimq server
p.apt(task_name="Installing rabbitmq-server", name="rabbitmq-server")
p.command("rabbitmq-plugins enable rabbitmq_management")
p.command("systemctl start rabbitmq-server")
p.command("systemctl enable rabbitmq-server")
# For the management interface, adding a new admin
p.command(f"rabbitmqctl add_user {username_monitoring} {password_monitoring}")
p.command(f"rabbitmqctl set_user_tags {username_monitoring} administrator")
p.command(f"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'")
# Allow all connections (no credentials needed for consumers and producers)
p.shell('echo "loopback_users.guest = false" | sudo tee -a /etc/rabbitmq/rabbitmq.conf')
p.command("systemctl restart rabbitmq-server")
```
%% Cell type:markdown id: tags:
#### Producers' node configuration
%% Cell type:markdown id: tags:
The producers' node has to be configured such that it contains its specific script.
%% Cell type:code id: tags:
``` python
with en.actions(roles=roles["producer"]) as p:
p.copy(
src=HERE + "/producer.py",
dest="/tmp/rabbitmq/producer.py",
task_name="copying producer file",
)
```
%% Cell type:markdown id: tags:
#### Consumers' node configuration
%% Cell type:markdown id: tags:
The consumers' node has to be configured such that it contains its specific script.
%% Cell type:code id: tags:
``` python
with en.actions(roles=roles["consumer"]) as p:
p.copy(
src=HERE + "/consumer.py",
dest="/tmp/rabbitmq/consumer.py",
task_name="copying consumer file",
)
```
%% Cell type:markdown id: tags:
#### Utility functions
%% Cell type:markdown id: tags:
The only purpose of these functions is to facilitate and to make this experiment more readable. Their objectives are :
- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s))
- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s))
- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purges the rabbitmq queues)
- to launch all producer(s) and consumer(s) by specifying the number of each
- to reset the experiment by going back to its initial state (clean + launch)
%% Cell type:code id: tags:
``` python
## Get server's IP address on the private network
from ast import List
from typing import List
import pandas as pd
import logging
from enoslib.log import DisableLogging
from enoslib.config import config_context
from IPython.display import clear_output
server = roles["server"][0]
ip_address_obj = server.filter_addresses(networks=networks["prod"])[0]
## This may seem weird: ip_address_obj.ip is a `netaddr.IPv4Interface`
## which itself has an `ip` attribute.
server_ip = ip_address_obj.ip.ip
def _get_queues_info() -> List:
with DisableLogging(level=logging.ERROR):
with config_context(ansible_stdout="noop"):
results = en.run_command(
"rabbitmqctl list_queues -p '/' messages consumers | "
"awk 'NR>3 {printf \"%-15s %-15s\\n\", $1, $2}' ; "
"rabbitmqctl list_connections | grep guest | wc -l",
task_name="Gathering statistics from the queues",
roles=roles["server"],
gather_facts=False,
on_error_continue=True,
)
queues_info : List = []
for r in results:
if r.status == "FAILED" or r.rc != 0:
continue
lines = r.stdout.strip().split("\n")
for l in lines:
info = l.strip().split(" ")
queues_info.append([v for v in info if v!= ""])
return queues_info
def get_queues_info_for(duration: int) -> pd.DataFrame:
results = {}
results["Time"] = []
results["nb_received_messages"] = []
results["queue_depth"] = []
results["nb_consumers"] = []
results["nb_producers"] = []
for _ in range(duration):
results["Time"].append(str(datetime.now().strftime("%H:%M:%S")))
time = str(datetime.now().strftime("%H:%M:%S"))
results["Time"].append(time)
queues_info = _get_queues_info()
try:
queue_depth = queues_info[0][0]
except IndexError as e:
queue_depth = 0
try:
nb_consumers = queues_info[0][1]
except IndexError as e:
nb_consumers = 0
try:
nb_recv_msg = queues_info[1][0]
except IndexError as e:
nb_recv_msg = 0
try:
nb_producers = int(queues_info[2][0]) - int(nb_consumers)
except IndexError as e:
nb_producers = 0
results["queue_depth"].append(int(queue_depth))
results["nb_consumers"].append(int(nb_consumers))
results["nb_producers"].append(int(nb_producers))
results["nb_received_messages"].append(int(nb_recv_msg))
clear_output(wait=False)
print(
f"Time : {time}\n"
f"nb_received_messages : {nb_recv_msg}\n"
f"queue_depth : {queue_depth}\n"
f"nb_consumers: {nb_consumers}\n"
f"nb_producers: {nb_producers}\n"
)
df = pd.DataFrame(data=results)
return df
def clean():
"""
Kill all previouses launched processes,
removes all previouses results,
purges the queue.
"""
cleaning_registry = en.ProcessRegistry()
cleaning_registry.build(
"tuto_",
roles["consumer"] + roles["producer"],
)
cleaning_registry.kill(signal.SIGKILL)
with DisableLogging(level=logging.ERROR):
with config_context(ansible_stdout="noop"):
en.run_command(
"rabbitmqctl purge_queue fault_injection & "\
"rabbitmqctl purge_queue received_messages & ",
task_name="purging the queue",
roles=roles["server"],
on_error_continue=True,
gather_facts=False,
)
en.run_command(
"crontab -r",
task_name="purging crontab file",
roles=roles["consumer"] + roles["producer"],
on_error_continue=True,
gather_facts=False,
)
def launch(nconsumer, nproducer):
"""
Launch all consumers and producers.
"""
for idx in range(nconsumer):
en.run_command(
f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
task_name=f"run consumer script number {idx}",
roles=roles["consumer"],
background=True,
gather_facts=False,
ns=f"tuto_consumer_{idx}",
)
for idx in range(nproducer):
en.run_command(
f"python3 /tmp/rabbitmq/producer.py {server_ip}",
task_name=f"run producer script number {idx}",
roles=roles["producer"],
background=True,
gather_facts=False,
ns=f"tuto_producer_{idx}",
)
def reset(nconsumer, nproducer):
"""
Return to the initial state of the experiment.
Can throw error (not to take into account !) if :
-> no previous launched processes
-> no previous output files
"""
print(
"\n ------------------------------------ ",
"\n| RESETING THE EXPERIMENT PARAMETERS |",
"\n ------------------------------------ ",
)
clean()
launch(nconsumer, nproducer)
print(
"\n ------------------------------ ",
"\n| DONE - INITIAL STATE REACHED |",
"\n ------------------------------ ",
)
```
%% Cell type:markdown id: tags:
## General knowledge
A ```ProcessRegistry``` records all processes that follows a given regexp on specific roles.
%% Cell type:code id: tags:
``` python
# does not work, it is just an example on how to build one
registry = en.ProcessRegistry()
registry.build("regexp", roles)
registry
```
%% Cell type:markdown id: tags:
One ```Process``` is defined by its pid, its host and its command, a state (ALIVE or DEAD) is also attributed to each instance.
%% Cell type:code id: tags:
``` python
host = en.Host("192.168.0.3", alias="one_alias", user="foo")
process = en.Process("process_name", 1234, host, "cmd")
process
```
%% Cell type:markdown id: tags:
For each case below, we will act on either the consumers or the producers, never both. It can be the entire group or only a subset.
When acting on consumers, we will observe an increase of the queue depth, meaning that the messages are not processed as fast as they are produced.
When acting on producers, we will observe no evolution regarding the number of processed messages.
%% Cell type:markdown id: tags:
## First example : Synchronous case
%% Cell type:markdown id: tags:
Using this type of kill, the user must wait for the end before doing anything else.
We can specify the sended signal.
%% Cell type:markdown id: tags:
### Killing all consumers
%% Cell type:code id: tags:
``` python
reset(nconsumer = 3, nproducer = 3)
```
%% Cell type:code id: tags:
``` python
registry = en.ProcessRegistry()
registry.build(
"tuto_",
roles
)
registry_on_consumers = registry.lookup(roles["consumer"])
registry_on_consumers
```
%% Cell type:code id: tags:
``` python
results_before_kill = get_queues_info_for(5)
registry_on_consumers.kill(signal.SIGKILL)
registry_on_consumers.kill(signum=signal.SIGKILL)
results_after_kill = get_queues_info_for(5)
```
%% Cell type:code id: tags:
``` python
results_before_kill
```
%% Cell type:code id: tags:
``` python
results_after_kill
```
%% Cell type:markdown id: tags:
### Killing all producers
%% Cell type:code id: tags:
``` python
reset(nconsumer = 3, nproducer = 3)
```
%% Cell type:code id: tags:
``` python
registry_on_producers = en.ProcessRegistry()
registry_on_producers.build(
"tuto_",
roles["producer"]
)
registry_on_producers
```
%% Cell type:code id: tags:
``` python
results_before_kill = get_queues_info_for(5)
registry_on_producers.kill(signal.SIGKILL)
registry_on_producers.kill(signum=signal.SIGKILL)
results_after_kill = get_queues_info_for(5)
```
%% Cell type:code id: tags:
``` python
results_before_kill
```
%% Cell type:code id: tags:
``` python
results_after_kill
```
%% Cell type:markdown id: tags:
## Second example : asynchronous case
%% Cell type:markdown id: tags:
Here, each kill is scheduled using the ```time_span``` parameter, which can be expressed either as a delay using a ```datetime.timedelta()``` object, or as a date using a ```datetime.datetime()``` object.
Here, each kill is scheduled using the ```start_in``` or ```start_at``` parameters, which can be expressed either as a delay using a ```datetime.timedelta()``` object, or as a date using a ```datetime.datetime()``` object.
%% Cell type:markdown id: tags:
### Killing all consumers
%% Cell type:code id: tags:
``` python
reset(nconsumer = 3, nproducer = 3)
```
%% Cell type:code id: tags:
``` python
registry_on_consumers = en.ProcessRegistry()
registry_on_consumers.build(
"tuto_",
roles["consumer"],
)
registry_on_consumers
```
%% Cell type:code id: tags:
``` python
registry_on_consumers.kill_async(
signum = signal.SIGKILL,
time_span = timedelta(minutes=1, seconds=20),
start_in = timedelta(minutes=1, seconds=20),
)
# each iteration last for ~4 sec (2 requests + 1sec sleep)
# each iteration last for ~2 sec (2 requests + 1sec sleep)
results = get_queues_info_for(40)
results
```
%% Cell type:markdown id: tags:
### Killing all producers
%% Cell type:code id: tags:
``` python
reset(nconsumer = 3, nproducer = 3)
```
%% Cell type:code id: tags:
``` python
registry_on_producers = en.ProcessRegistry()
registry_on_producers.build(
"tuto_",
roles["producer"],
)
registry_on_producers
```
%% Cell type:code id: tags:
``` python
registry_on_producers.kill_async(
signum = signal.SIGKILL,
time_span = datetime.now() + timedelta(minutes=1, seconds=20),
start_at = datetime.now() + timedelta(minutes=1, seconds=20),
)
# each iteration last for ~4 sec (2 requests + 1sec sleep)
# each iteration last for ~2 sec (2 requests + 1sec sleep)
results = get_queues_info_for(40)
results
```
%% Cell type:markdown id: tags:
## Third example : Incremental case
%% Cell type:markdown id: tags:
Here, each kill is scheduled and the choice of the killed process(es) is done totally randomly among those registered, we have to specify how many process(es) we want to kill, the beginning (date of the first kill) and the interval between each of them.
The beginning can be expressed either as a delay using a ```datetime.timedelta()``` object, or as a date using a ```datetime.datetime()``` object.
The interval is a ```datetime.timedelta``` object.
%% Cell type:code id: tags:
``` python
reset(nconsumer = 3, nproducer = 3)
```
%% Cell type:code id: tags:
``` python
registry_on_consumers = en.ProcessRegistry()
registry_on_consumers.build(
"tuto_",
roles["consumer"],
)
registry_on_consumers
```
%% Cell type:code id: tags:
``` python
registry_on_consumers.kill_async_incr(
signum = signal.SIGKILL,
number = 2,
beginning = datetime.now() + timedelta(minutes=1, seconds=10),
start_at = datetime.now() + timedelta(minutes=1, seconds=10),
interval = timedelta(seconds=20),
)
# each iteration last for ~4 sec (2 requests + 1sec sleep)
# each iteration last for ~2 sec (2 requests + 1sec sleep)
results = get_queues_info_for(45)
results
```
%% Cell type:markdown id: tags:
We can have an updated version of the registry, with both dead and alive processes.
%% Cell type:code id: tags:
``` python
after_refresh = registry_on_consumers.refresh()
after_refresh
```
%% Cell type:markdown id: tags:
We can also restart all killed processes without acting on the others. In a way we go back to the registry's initial state. This ```registry.reset(nconsumer = 3, nproducer = 3)``` has nothing to do with the ```reset(nconsumer = 3, nproducer = 3)``` specificly implemented for this experiment !
We can also restart all killed processes without acting on the others. In a way we go back to the registry's initial state. This ```registry.reset()``` has nothing to do with the ```reset(nconsumer, nproducer)``` specificly implemented for this experiment !
%% Cell type:code id: tags:
``` python
registry_on_consumers.reset()
registry_on_consumers
```
%% Cell type:code id: tags:
``` python
results = get_queues_info_for(2)
results
```
%% Cell type:markdown id: tags:
## Fourth example : Restart a registry
%% Cell type:markdown id: tags:
We can restart an entire registry, this means killing all of them (if alive) and starting them again.
It can be done either synchronously with ```registry.restart()```, either asynchronously in the same way as the kills previously shown. If asynchronous, the date of the kill(s) (or the time delta before they happen) and its interval with the start(s) of the docker container(s) can be specified.
It can be done either synchronously with ```registry.restart()```, either asynchronously in the same way as the kills previously shown. If asynchronous, the date of the kill(s) (or the delay before they happen) and its interval with the start(s) of the process(es) can be specified.
%% Cell type:markdown id: tags:
### Restarting all consumers
%% Cell type:code id: tags:
``` python
reset(nconsumer = 3, nproducer = 3)
```
%% Cell type:code id: tags:
``` python
registry_on_consumers = en.ProcessRegistry()
registry_on_consumers.build(
"tuto_",
roles["consumer"],
)
registry_on_consumers
```
%% Cell type:code id: tags:
``` python
registry_on_consumers.restart_async(
signum = signal.SIGKILL,
time_span = timedelta(minutes=1, seconds=10),
start_in = timedelta(minutes=1, seconds=10),
interval = timedelta(seconds=20),
)
# each iteration last for ~4 sec (2 requests + 1sec sleep)
# each iteration last for ~2 sec (2 requests + 1sec sleep)
results = get_queues_info_for(45)
results
```
%% Cell type:markdown id: tags:
### Restarting all producers
%% Cell type:code id: tags:
``` python
reset(nconsumer = 3, nproducer = 3)
```
%% Cell type:code id: tags:
``` python
registry_on_producers = en.ProcessRegistry()
registry_on_producers.build(
"tuto_",
roles["producer"],
)
registry_on_producers
```
%% Cell type:code id: tags:
``` python
registry_on_producers.restart_async(
signum = signal.SIGKILL,
time_span = datetime.now() + timedelta(minutes=1, seconds=20),
start_at = datetime.now() + timedelta(minutes=1, seconds=10),
interval = timedelta(seconds=20),
)
# each iteration last for ~4 sec (2 requests + 1sec sleep)
# each iteration last for ~2 sec (2 requests + 1sec sleep)
results = get_queues_info_for(45)
results
```
%% Cell type:markdown id: tags:
## Fifth example : Planning
%% Cell type:markdown id: tags:
We can schedule every event of the experiment in one shot using a ```Planning```. Set all the cron may take time depending on the amount of events scheduled, because of that, the first event must happen at least 5 min after the ```PlanningService.deploy()```
%% Cell type:code id: tags:
``` python
reset(nconsumer = 0, nproducer = 0)
```
%% Cell type:code id: tags:
``` python
def launch_n_actors_cmd(actor:str, n:int):
cmd = []
for _ in range(n):
cmd.append(f"python3 /tmp/rabbitmq/{actor}.py {server_ip}")
string = " & ".join(cmd)
return string
```
%% Cell type:markdown id: tags:
After 6 minutes, we will launch 20 producers (5 each 10 seconds).
After 8 minutes, we will launch 100 consumers (20 each 10 seconds).
After 10 minutes, we will kill all the consumers consumers (20 each 10 seconds).
%% Cell type:code id: tags:
``` python
now = datetime.now()
planning = en.Planning()
cmd = launch_n_actors_cmd(actor = "producer", n = 5)
for m in range(5):
planning.add_event(
en.StartEvent(
date=now+timedelta(minutes=6, seconds=10*m),
host=roles["producer"][0],
cmd=cmd,
ns=f"tuto_producer_{m}",
)
)
cmd = launch_n_actors_cmd(actor = "consumer", n = 5)
for m in range(5):
(
planning.add_event(
en.StartEvent(
date=now+timedelta(minutes=8, seconds=10*m),
host=roles["consumer"][0],
cmd=cmd,
ns=f"tuto_consumer_1_{m}",
)
).add_event(
en.StartEvent(
date=now+timedelta(minutes=8, seconds=10*m),
host=roles["consumer"][0],
cmd=cmd,
ns=f"tuto_consumer_2_{m}",
)
).add_event(
en.StartEvent(
date=now+timedelta(minutes=8, seconds=10*m),
host=roles["consumer"][0],
cmd=cmd,
ns=f"tuto_consumer_3_{m}",
)
).add_event(
en.StartEvent(
date=now+timedelta(minutes=8, seconds=10*m),
host=roles["consumer"][0],
cmd=cmd,
ns=f"tuto_consumer_4_{m}",
)
)
)
```
%% Cell type:code id: tags:
``` python
for m in range(5):
(
planning.add_event(
en.KillEvent(
date=now+timedelta(minutes=10, seconds=10*m),
host=roles["consumer"][0],
ns=f"tuto_consumer_1_{m}",
)
).add_event(
en.KillEvent(
date=now+timedelta(minutes=10, seconds=10*m),
host=roles["consumer"][0],
ns=f"tuto_consumer_2_{m}",
)
).add_event(
en.KillEvent(
date=now+timedelta(minutes=10, seconds=10*m),
host=roles["consumer"][0],
ns=f"tuto_consumer_3_{m}",
)
).add_event(
en.KillEvent(
date=now+timedelta(minutes=10, seconds=10*m),
host=roles["consumer"][0],
ns=f"tuto_consumer_4_{m}",
)
)
)
```
%% Cell type:code id: tags:
``` python
planning_service = en.PlanningService(planning)
planning_service.deploy()
```
%% Cell type:markdown id: tags:
We can have an up-to-date version of the planning, with both current alive and dead processes. The following can be used multiple times instead of ```get_queues_info()``` to see the evolution of the rabbitmq actors.
%% Cell type:code id: tags:
``` python
planning_service.status()
```
%% Cell type:code id: tags:
``` python
results = get_queues_info_for(300)
```
%% Cell type:code id: tags:
``` python
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)
```
%% Cell type:markdown id: tags:
## Cleaning
%% Cell type:code id: tags:
``` python
provider.destroy()
```
......
%% Cell type:markdown id: tags:
# Schedule events using a Planning
---
- Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html
- Instant chat: https://framateam.org/enoslib
- Source code: https://gitlab.inria.fr/discovery/enoslib
---
## Prerequisites
<div class="alert alert-block alert-warning">
Make sure you've run the one time setup for your environment
</div>
<div class="alert alert-block alert-warning">
Make sure you've done the tutorial : 07_fault_injection_on_processes
</div>
%% Cell type:markdown id: tags:
## Setup
%% Cell type:code id: tags:
``` python
import signal
import os
from datetime import datetime, timedelta
import enoslib as en
en.init_logging()
en.check()
```
%% Cell type:markdown id: tags:
### Reservation
%% Cell type:code id: tags:
``` python
CLUSTER = "nova"
HERE = os.getcwd()
# claim the resources
conf = (
en.G5kConf.from_settings(
job_name="fault-injection tutorial",
job_type=[],
)
.add_machine(roles=["server"], cluster=CLUSTER, nodes=1)
.add_machine(
roles=["producer"], cluster=CLUSTER, nodes=1
) # all the producers are running on the same machine
.add_machine(
roles=["consumer"], cluster=CLUSTER, nodes=1
) # all the consumers are running on the same machine
)
provider = en.G5k(conf)
roles, networks = provider.init()
# Fill in network information from nodes
roles = en.sync_info(roles, networks)
```
%% Cell type:code id: tags:
``` python
roles
```
%% Cell type:markdown id: tags:
### Rabbitmq configuration
%% Cell type:markdown id: tags:
#### Common node's configuration
%% Cell type:markdown id: tags:
Each node must have this minimal configuration :
- having python and pip
- having procps (to use kill)
- having pika (for the rabbitmq connection)
%% Cell type:code id: tags:
``` python
# Common configuration
with en.actions(roles=roles) as p:
p.apt(task_name="Installing python", name="python3")
p.apt(task_name="Installing procps", name="procps")
p.command("apt update")
p.apt(task_name="Installing pip", name="python3-pip")
p.pip(task_name="Installing pika", name="pika")
p.file(path="/tmp/rabbitmq", state="absent")
p.file(path="/tmp/rabbitmq", state="directory")
```
%% Cell type:markdown id: tags:
#### Server configuration
%% Cell type:markdown id: tags:
Here, we does not launch anything yet, we just setup the server node to accept all our producer(s) and consumer(s). We also add a new administrator in order to have access to the management interface, the default one being blocked by the remote configuration.
%% Cell type:code id: tags:
``` python
username_monitoring = "user"
password_monitoring = "password"
# SETUP
## Server configuration
with en.actions(roles=roles["server"]) as p:
# Setting the rabbimq server
p.apt(task_name="Installing rabbitmq-server", name="rabbitmq-server")
p.command("rabbitmq-plugins enable rabbitmq_management")
p.command("systemctl start rabbitmq-server")
p.command("systemctl enable rabbitmq-server")
# For the management interface, adding a new admin
p.command(f"rabbitmqctl add_user {username_monitoring} {password_monitoring}")
p.command(f"rabbitmqctl set_user_tags {username_monitoring} administrator")
p.command(f"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'")
# Allow all connections (no credentials needed for consumers and producers)
p.shell('echo "loopback_users.guest = false" | sudo tee -a /etc/rabbitmq/rabbitmq.conf')
p.command("systemctl restart rabbitmq-server")
```
%% Cell type:markdown id: tags:
#### Producers' node configuration
%% Cell type:markdown id: tags:
The producers' node has to be configured such that it contains its specific script.
%% Cell type:code id: tags:
``` python
with en.actions(roles=roles["producer"]) as p:
p.copy(
src=HERE + "/producer.py",
dest="/tmp/rabbitmq/producer.py",
task_name="copying producer file",
)
```
%% Cell type:markdown id: tags:
#### Consumers' node configuration
%% Cell type:markdown id: tags:
The consumers' node has to be configured such that it contains its specific script.
%% Cell type:code id: tags:
``` python
with en.actions(roles=roles["consumer"]) as p:
p.copy(
src=HERE + "/consumer.py",
dest="/tmp/rabbitmq/consumer.py",
task_name="copying consumer file",
)
```
%% Cell type:markdown id: tags:
#### Utility functions
%% Cell type:markdown id: tags:
The only purpose of these functions is to facilitate and to make this experiment more readable. Their objectives are :
- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s))
- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purges the rabbitmq queues)
- to launch all producer(s) and consumer(s) by specifying the number of each
- to reset the experiment by going back to its initial state (clean + launch)
%% Cell type:code id: tags:
``` python
## Get server's IP address on the private network
from typing import List
import pandas as pd
import logging
from enoslib.log import DisableLogging
from enoslib.config import config_context
from IPython.display import clear_output
server = roles["server"][0]
ip_address_obj = server.filter_addresses(networks=networks["prod"])[0]
## This may seem weird: ip_address_obj.ip is a `netaddr.IPv4Interface`
## which itself has an `ip` attribute.
server_ip = ip_address_obj.ip.ip
def _get_queues_info() -> List:
with DisableLogging(level=logging.ERROR):
with config_context(ansible_stdout="noop"):
results = en.run_command(
"rabbitmqctl list_queues -p '/' messages consumers | "
"awk 'NR>3 {printf \"%-15s %-15s\\n\", $1, $2}' ; "
"rabbitmqctl list_connections | grep guest | wc -l",
task_name="Gathering statistics from the queues",
roles=roles["server"],
gather_facts=False,
on_error_continue=True,
)
queues_info : List = []
for r in results:
if r.status == "FAILED" or r.rc != 0:
continue
lines = r.stdout.strip().split("\n")
for l in lines:
info = l.strip().split(" ")
queues_info.append([v for v in info if v!= ""])
return queues_info
def get_queues_info_for(duration: int) -> pd.DataFrame:
results = {}
results["Time"] = []
results["nb_received_messages"] = []
results["queue_depth"] = []
results["nb_consumers"] = []
results["nb_producers"] = []
for _ in range(duration):
time = str(datetime.now().strftime("%H:%M:%S"))
results["Time"].append(time)
queues_info = _get_queues_info()
try:
queue_depth = queues_info[0][0]
except IndexError as e:
queue_depth = 0
try:
nb_consumers = queues_info[0][1]
except IndexError as e:
nb_consumers = 0
try:
nb_recv_msg = queues_info[1][0]
except IndexError as e:
nb_recv_msg = 0
try:
nb_producers = int(queues_info[2][0]) - int(nb_consumers)
except IndexError as e:
nb_producers = 0
results["queue_depth"].append(int(queue_depth))
results["nb_consumers"].append(int(nb_consumers))
results["nb_producers"].append(int(nb_producers))
results["nb_received_messages"].append(int(nb_recv_msg))
clear_output(wait=False)
print(
f"Time : {time}\n"
f"nb_received_messages : {nb_recv_msg}\n"
f"queue_depth : {queue_depth}\n"
f"nb_consumers: {nb_consumers}\n"
f"nb_producers: {nb_producers}\n"
)
df = pd.DataFrame(data=results)
return df
def clean():
"""
Kill all previouses launched processes,
removes all previouses results,
purges the queue.
"""
cleaning_registry = en.ProcessRegistry()
cleaning_registry.build(
"producer|consumer",
roles["consumer"] + roles["producer"],
)
cleaning_registry.kill(signal.SIGKILL)
with DisableLogging(level=logging.ERROR):
with config_context(ansible_stdout="noop"):
en.run_command(
"rabbitmqctl purge_queue fault_injection & "\
"rabbitmqctl purge_queue received_messages & ",
task_name="purging the queue",
roles=roles["server"],
on_error_continue=True,
gather_facts=False,
)
en.run_command(
"crontab -r",
task_name="purging crontab file",
roles=roles["consumer"] + roles["producer"],
on_error_continue=True,
gather_facts=False,
)
```
%% Cell type:markdown id: tags:
## General knowledge
%% Cell type:markdown id: tags:
### Events
An ```Event``` define a command that will run at a specific date (```datetime.datetime```) on a specific host (```Host```)
%% Cell type:markdown id: tags:
#### StartEvent
A ```StartEvent```, as its name suggests, will start a named process.
In addtion of the base class ```Event``` attributes, the ```command``` to launch the process and its ```name``` must be specified and are of type ```str```.
%% Cell type:code id: tags:
``` python
# Not supposed to return anything ! The only purpose of this code is to show how a StartEvent is created.
event_date = datetime.now()
event_host = en.Host("address")
event_cmd = "sleep infinity"
process_name = "name"
start_event = en.StartEvent(
date = event_date,
host = event_host,
cmd = event_cmd,
ns = process_name,
)
```
%% Cell type:markdown id: tags:
#### KillEvent
A ```KillEvent```, as its name suggests, will kill a named process.
In addtion of the base class ```Event``` attributes, only ```name``` of the process to kill must be specified, which is of type ```str```.
%% Cell type:code id: tags:
``` python
# Not supposed to return anything ! The only purpose of this code is to show how a KillEvent is created.
event_date = datetime.now()
event_host = en.Host("address")
process_name = "name"
kill_event = en.KillEvent(
date = event_date,
host = event_host,
ns = process_name,
)
```
%% Cell type:markdown id: tags:
### Planning
A ```Planning``` is a container for ```Event```s.
A ```PlanningService``` deploys and keeps track of all the events recorded in a ```Planning```; it can be seen as an event registry.
Creating a ```PlanningService``` requires a ```Planning```.
%% Cell type:code id: tags:
``` python
planning = en.Planning()
planning.add_event(start_event)
planning.add_event(kill_event)
planning_service = en.PlanningService(planning)
```
%% Cell type:markdown id: tags:
## Examples
%% Cell type:markdown id: tags:
### A first deployment
We will schedule:
- the start of 5 RabbitMQ producers, one every 10 seconds, beginning 2 minutes from now
- the start of 5 RabbitMQ consumers, one every 10 seconds, beginning 3 minutes from now (1 minute after the corresponding producer)
- the kill of 3 RabbitMQ producers 4 minutes from now
- the kill of all 5 RabbitMQ consumers 30 seconds after the producers are killed
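%% Cell type:markdown id: tags:
Before wiring these dates into events, the offsets can be sanity-checked in plain Python (no enoslib needed); here ```t0``` is a hypothetical fixed start time standing in for ```datetime.now()```.
%% Cell type:code id: tags:
``` python
from datetime import datetime, timedelta

t0 = datetime(2024, 1, 1, 12, 0, 0)  # hypothetical experiment start
timeline = []
for idx in range(5):
    timeline.append((f"start producer_{idx}", t0 + timedelta(minutes=2, seconds=idx * 10)))
    timeline.append((f"start consumer_{idx}", t0 + timedelta(minutes=3, seconds=idx * 10)))
for idx in range(3):
    timeline.append((f"kill producer_{idx}", t0 + timedelta(minutes=4)))
for idx in range(5):
    timeline.append((f"kill consumer_{idx}", t0 + timedelta(minutes=4, seconds=30)))
timeline.sort(key=lambda event: event[1])

# The first action is the first producer start; the last ones are the consumer kills.
first_name, first_date = timeline[0]
last_name, last_date = timeline[-1]
```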
%% Cell type:code id: tags:
``` python
clean()
```
%% Cell type:code id: tags:
``` python
planning = en.Planning()
n = 5
time_now = datetime.now()

for idx in range(n):
    planning.add_event(
        en.StartEvent(
            date=time_now + timedelta(minutes=2, seconds=idx * 10),
            host=roles["producer"][0],
            cmd=f"python3 /tmp/rabbitmq/producer.py {server_ip}",
            ns=f"producer_{idx}",
        )
    )
    planning.add_event(
        en.StartEvent(
            date=time_now + timedelta(minutes=3, seconds=idx * 10),
            host=roles["consumer"][0],
            cmd=f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
            ns=f"consumer_{idx}",
        )
    )

for idx in range(3):
    planning.add_event(
        en.KillEvent(
            date=time_now + timedelta(minutes=4),
            host=roles["producer"][0],
            ns=f"producer_{idx}",
        )
    )

for idx in range(n):
    planning.add_event(
        en.KillEvent(
            date=time_now + timedelta(minutes=4, seconds=30),
            host=roles["consumer"][0],
            ns=f"consumer_{idx}",
        )
    )

planning_service = en.PlanningService(planning=planning, delay=timedelta(minutes=1))
planning_service.deploy()
```
%% Cell type:markdown id: tags:
We can get a first check of the planning's consistency.
%% Cell type:code id: tags:
``` python
# Raises an exception if anything looks wrong; otherwise returns None.
planning.check()
```
%% Cell type:markdown id: tags:
We can get an up-to-date status of all the processes.
%% Cell type:code id: tags:
``` python
planning_service.status()
```
%% Cell type:markdown id: tags:
Let's observe the evolution of our events.
%% Cell type:code id: tags:
``` python
results = get_queues_info_for(150)
results
```
%% Cell type:code id: tags:
``` python
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)
```
%% Cell type:markdown id: tags:
We can also deactivate the planning: all upcoming events (relative to the current time) will no longer be executed.
%% Cell type:code id: tags:
``` python
planning_service.destroy()
```
%% Cell type:markdown id: tags:
### Overload example
%% Cell type:markdown id: tags:
Setting up an overload is easy with a service such as ```PlanningService```.
Let's walk through an example. We will schedule:
- the start of 20 RabbitMQ producers 4 minutes from now
- the start of 100 RabbitMQ consumers 5 minutes from now
- the kill of 70 RabbitMQ consumers 6 minutes from now
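%% Cell type:markdown id: tags:
The expected population at each phase can be sanity-checked with a small plain-Python model of the scenario (```t0``` is a hypothetical start time; no enoslib needed): after the kills, 30 consumers should remain.
%% Cell type:code id: tags:
``` python
from datetime import datetime, timedelta

# Plain-Python model of the overload scenario: each entry is
# (process kind, population delta, instant at which it applies).
t0 = datetime(2024, 1, 1, 12, 0)
events = (
    [("producer", +1, t0 + timedelta(minutes=4))] * 20
    + [("consumer", +1, t0 + timedelta(minutes=5))] * 100
    + [("consumer", -1, t0 + timedelta(minutes=6))] * 70
)

def alive(kind, at):
    """Number of processes of `kind` alive at instant `at`."""
    return sum(delta for k, delta, when in events if k == kind and when <= at)

# 100 consumers started, 70 killed -> 30 remain after minute 6.
remaining = alive("consumer", t0 + timedelta(minutes=7))
```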
%% Cell type:code id: tags:
``` python
clean()
```
%% Cell type:code id: tags:
``` python
planning = en.Planning()
time_now = datetime.now()

for idx in range(20):
    planning.add_event(
        en.StartEvent(
            date=time_now + timedelta(minutes=4),
            host=roles["producer"][0],
            cmd=f"python3 /tmp/rabbitmq/producer.py {server_ip}",
            ns=f"producer_{idx}",
        )
    )

for idx in range(100):
    planning.add_event(
        en.StartEvent(
            date=time_now + timedelta(minutes=5),
            host=roles["consumer"][0],
            cmd=f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
            ns=f"consumer_{idx}",
        )
    )

for idx in range(70):
    planning.add_event(
        en.KillEvent(
            date=time_now + timedelta(minutes=6),
            host=roles["consumer"][0],
            ns=f"consumer_{idx}",
        )
    )

planning_service = en.PlanningService(planning=planning)
planning.check()
planning_service.deploy()
```
%% Cell type:code id: tags:
``` python
results = get_queues_info_for(150)
```
%% Cell type:code id: tags:
``` python
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)
```
%% Cell type:markdown id: tags:
## Cleaning
%% Cell type:code id: tags:
``` python
provider.destroy()
```