Commit f86d6ba8 authored by TRABELSI Weissem
docker tutorial

parent 448adde7
%% Cell type:markdown id: tags:
# Fault-injection on processes
---
- Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html
- Instant chat: https://framateam.org/enoslib
- Source code: https://gitlab.inria.fr/discovery/enoslib
---
## Prerequisites
<div class="alert alert-block alert-warning">
Make sure you've run the one-time setup for your environment.
</div>
%% Cell type:markdown id: tags:
## Rabbitmq / Cron / Pgrep
[RabbitMQ](https://www.rabbitmq.com/) is an open-source message broker that enables different software applications to communicate and exchange data in a reliable and scalable manner. It follows the Advanced Message Queuing Protocol (AMQP) and provides a flexible messaging model based on the concept of queues.
For our experiment, we will deploy a publish/subscribe environment to demonstrate the impact of the fault-injection API.
[Cron](https://man7.org/linux/man-pages/man8/cron.8.html) is a time-based job scheduler in Unix-like operating systems. It allows you to schedule and automate the execution of commands or scripts at specified intervals or specific times. Cron is commonly used for repetitive tasks, system maintenance, and scheduling periodic jobs.
All asynchronous tools shown here are based on cron. Because of that, every scheduled event has a time granularity of about one minute.
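To make the one-minute granularity concrete, here is a minimal sketch. It assumes that an event requested in the middle of a minute fires at the next whole minute; the helper `next_cron_slot` is hypothetical and does not reflect how EnOSlib actually computes its cron entries.

```python
from datetime import datetime, timedelta


def next_cron_slot(requested: datetime) -> datetime:
    """Round a requested date up to the next whole minute (hypothetical helper)."""
    floored = requested.replace(second=0, microsecond=0)
    if floored == requested:
        # Already on a minute boundary: nothing to round
        return requested
    return floored + timedelta(minutes=1)


requested = datetime(2023, 7, 13, 16, 36, 14)
print(next_cron_slot(requested))  # 2023-07-13 16:37:00
```

Under this assumption, an event can fire up to one minute later than the requested date, which is worth keeping in mind when reading the timestamps of the experiments.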
[pgrep](https://man7.org/linux/man-pages/man1/pgrep.1.html) is a command-line utility in Unix-like operating systems that is used to search for and list the process IDs (PIDs) of running processes based on certain criteria. It allows you to find processes by their names, command lines, or other attributes.
Every process lookup relies on pgrep, which treats the string it receives as a regular expression.
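The consequence of regexp matching can be illustrated offline. The sketch below uses Python's `re` module as a stand-in for pgrep (the two regexp dialects are close but not identical), and the command lines are made up for the example.

```python
import re

# Made-up command lines, as pgrep would see them when matching full commands
command_lines = [
    "python3 /tmp/rabbitmq/consumer.py 0",
    "python3 /tmp/rabbitmq/producer.py 0",
    "python3 /tmp/rabbitmq/consumer_py_backup 1",
]


def matching(pattern: str, lines: list) -> list:
    """Return the command lines in which the pattern is found."""
    return [line for line in lines if re.search(pattern, line)]


# An unescaped "." matches any character, so the backup script matches too
loose = matching("consumer.py", command_lines)
# Escaping the pattern restricts the match to the literal file name
strict = matching(re.escape("consumer.py"), command_lines)

print(len(loose), len(strict))  # 2 1
```

When a pattern contains characters such as `.` or `*`, escaping them avoids selecting more processes than intended.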
%% Cell type:markdown id: tags:
## Setup
%% Cell type:code id: tags:
``` python
import signal
import os
from datetime import datetime, timedelta
from pathlib import Path
import enoslib as en
en.init_logging()
en.check()
```
%% Cell type:markdown id: tags:
### Reservation
%% Cell type:code id: tags:
``` python
CLUSTER = "nova"
HERE = os.getcwd()
# claim the resources
conf = (
en.G5kConf.from_settings(
job_name="fault-injection tutorial",
job_type=[],
)
.add_machine(roles=["server"], cluster=CLUSTER, nodes=1)
.add_machine(
roles=["producer"], cluster=CLUSTER, nodes=1
) # all the producers are running on the same machine
.add_machine(
roles=["consumer"], cluster=CLUSTER, nodes=1
) # all the consumers are running on the same machine
)
provider = en.G5k(conf)
roles, networks = provider.init()
# Fill in network information from nodes
roles = en.sync_info(roles, networks)
```
%% Cell type:code id: tags:
``` python
roles
```
%% Cell type:markdown id: tags:
### Rabbitmq configuration
%% Cell type:markdown id: tags:
#### Common node's configuration
%% Cell type:markdown id: tags:
Each node needs this minimal configuration:
- having python and pip
- having procps (to use pgrep)
- having pika (for the rabbitmq connection)
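As a rough sanity check, the presence of these prerequisites can be tested with the standard library alone. This sketch inspects the machine it runs on (not the remote nodes), and `check_prerequisites` is a hypothetical helper, not part of EnOSlib.

```python
import importlib.util
import shutil


def check_prerequisites() -> dict:
    """Report which prerequisites are visible on the local machine."""
    return {
        "python3": shutil.which("python3") is not None,
        "pip": importlib.util.find_spec("pip") is not None,
        "pgrep": shutil.which("pgrep") is not None,
        "pika": importlib.util.find_spec("pika") is not None,
    }


status = check_prerequisites()
for name, present in status.items():
    print(f"{name}: {'ok' if present else 'missing'}")
```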
%% Cell type:code id: tags:
``` python
# Common configuration
with en.actions(roles=roles) as p:
    # Refresh the package index before installing anything
    p.command("apt update")
    p.apt(task_name="Installing python", name="python3")
    p.apt(task_name="Installing procps", name="procps")
    p.apt(task_name="Installing pip", name="python3-pip")
    p.pip(task_name="Installing pika", name="pika")
    # Start from an empty working directory
    p.file(path="/tmp/rabbitmq", state="absent")
    p.file(path="/tmp/rabbitmq", state="directory")
```
%% Cell type:markdown id: tags:
#### Server configuration
%% Cell type:markdown id: tags:
Here, we do not launch anything yet: we only set up the server node so that it accepts all our producers and consumers. We also add a new administrator to get access to the management interface, because the default one is blocked for remote connections.
%% Cell type:code id: tags:
``` python
nproducer = 3
nconsumer = 3
username_monitoring = "user"
password_monitoring = "password"
username_prod = "prod"
username_cons = "cons"
password_prod = "pwd_prod"
password_cons = "pwd_cons"
# SETUP
## Server configuration
with en.actions(roles=roles["server"]) as p:
    # Setting up the rabbitmq server
    p.apt(task_name="Installing rabbitmq-server", name="rabbitmq-server")
    p.command("rabbitmq-plugins enable rabbitmq_management")
    p.command("systemctl start rabbitmq-server")
    p.command("systemctl enable rabbitmq-server")
    # For the management interface, adding a new admin
    p.command(f"rabbitmqctl add_user {username_monitoring} {password_monitoring}")
    p.command(f"rabbitmqctl set_user_tags {username_monitoring} administrator")
    p.command(f"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'")
    # For producers
    for idx in range(nproducer):
        # Add user's specifications (username + password)
        p.command(f"rabbitmqctl add_user {username_prod}_{idx} {password_prod}")
        # Allow users to connect to the default vhost ('/')
        p.command(f"rabbitmqctl set_permissions {username_prod}_{idx} .* .* .* -p '/'")
    # For consumers
    for idx in range(nconsumer):
        # Add user's specifications (username + password)
        p.command(f"rabbitmqctl add_user {username_cons}_{idx} {password_cons}")
        # Allow users to connect to the default vhost ('/')
        p.command(f"rabbitmqctl set_permissions {username_cons}_{idx} .* .* .* -p '/'")
    p.command("systemctl restart rabbitmq-server")
```
%% Cell type:markdown id: tags:
#### Producers' node configuration
%% Cell type:markdown id: tags:
The producers' node must hold a copy of the script that the producers will run.
%% Cell type:code id: tags:
``` python
with en.actions(roles=roles["producer"]) as p:
    p.copy(
        src=HERE + "/producer.py",
        dest="/tmp/rabbitmq/producer.py",
        task_name="copying producer file",
    )
```
%% Cell type:markdown id: tags:
#### Consumers' node configuration
%% Cell type:markdown id: tags:
The consumers' node must hold a copy of the script that the consumers will run.
%% Cell type:code id: tags:
``` python
with en.actions(roles=roles["consumer"]) as p:
    p.copy(
        src=HERE + "/consumer.py",
        dest="/tmp/rabbitmq/consumer.py",
        task_name="copying consumer file",
    )
```
%% Cell type:markdown id: tags:
#### Utility functions
%% Cell type:markdown id: tags:
These functions only exist to make the experiment easier to run and to read. They are used:
- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s))
- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purge the rabbitmq queue)
- to launch all producer(s) and consumer(s)
- to reset the experiment by going back to its initial state (clean + launch)
%% Cell type:code id: tags:
``` python
## Get server's IP address on the private network
from typing import List

import pandas as pd

server = roles["server"][0]
ip_address_obj = server.filter_addresses(networks=networks["prod"])[0]
## This may seem weird: ip_address_obj.ip is an `ipaddress.IPv4Interface`
## which itself has an `ip` attribute.
server_ip = ip_address_obj.ip.ip


def get_recv_msg(file: str) -> int:
    """
    Return the total number of processed messages.
    """
    results = en.run_command(
        f"wc -l {file}",
        task_name="getting total number of received messages",
        roles=roles["consumer"],
        gather_facts=False,
        on_error_continue=True,
    )
    totalnbmsg = 0
    for r in results:
        if r.status == "FAILED" or r.rc != 0:
            print("Actual number of received message : 0")
            continue
        _lines = r.stdout.split("\n")
        # With several files, the last line of `wc -l` holds the total;
        # with a single file, it is the only line.
        _total = _lines[-1].strip().split(" ")
        totalnbmsg += int(_total[0])
    return totalnbmsg


def get_queue_size() -> List:
    """
    Retrieve the current rabbitmq queue size and number of consumers.
    """
    results = en.run_command(
        "rabbitmqctl list_queues -p '/' messages consumers | "
        "awk 'NR>3 {printf \"%-15s %-15s\\n\", $1, $2}'",
        task_name="getting number of messages waiting for processing",
        roles=roles["server"],
        gather_facts=False,
        on_error_continue=True,
    )
    for r in results:
        if r.status == "FAILED" or r.rc != 0:
            print("Queue is empty")
            continue
        lines = r.stdout.strip().split("\n")
        line = lines[0].strip().split(" ")
        values = [v for v in line if v != ""]
        if len(values) == 2:
            return values
    # Fallback so that callers can always unpack two values
    return ["0", "0"]


def get_stats(duration: int) -> pd.DataFrame:
    """
    Retrieve general statistics (`duration` is the number of samples).
    """
    results = {}
    results["Time"] = []
    results["nb_received_messages"] = []
    results["queue_depth"] = []
    results["nb_consumer"] = []
    for _ in range(duration):
        results["Time"].append(str(datetime.now()))
        results["nb_received_messages"].append(get_recv_msg("/tmp/rabbitmq/*_output.txt"))
        queue_depth, nb_consumer = get_queue_size()
        results["queue_depth"].append(int(queue_depth))
        results["nb_consumer"].append(int(nb_consumer))
    df = pd.DataFrame(data=results)
    return df


def clean():
    """
    Kill all previously launched processes,
    remove all previous results,
    purge the queue.
    """
    cleaning_registry = en.ProcessRegistry()
    cleaning_registry.build(
        "'python3 /tmp/rabbitmq/'",
        roles["consumer"] + roles["producer"],
    )
    cleaning_registry.kill(signal.SIGKILL)
    en.run_command(
        "rm /tmp/rabbitmq/*_output.txt",
        task_name="cleaning output files",
        roles=roles["consumer"] + roles["producer"],
        on_error_continue=True,
        gather_facts=False,
    )
    en.run_command(
        "rabbitmqctl purge_queue fault_injection",
        task_name="purging the queue",
        roles=roles["server"],
        on_error_continue=True,
        gather_facts=False,
    )


def launch():
    """
    Launch all consumers and producers.
    """
    for idx in range(nconsumer):
        en.run_command(
            f"python3 /tmp/rabbitmq/consumer.py {idx} {server_ip}"
            f" {username_cons} {password_cons}",
            task_name=f"run consumer script number {idx}",
            roles=roles["consumer"],
            background=True,
            gather_facts=False,
        )
    for idx in range(nproducer):
        en.run_command(
            f"python3 /tmp/rabbitmq/producer.py {idx} {server_ip}"
            f" {username_prod} {password_prod}",
            task_name=f"run producer script number {idx}",
            roles=roles["producer"],
            background=True,
            gather_facts=False,
        )


def reset():
    """
    Return to the initial state of the experiment.
    Errors can be reported (and safely ignored) if:
    -> there are no previously launched processes
    -> there are no previous output files
    """
    print(
        "\n ------------------------------------- ",
        "\n| RESETTING THE EXPERIMENT PARAMETERS |",
        "\n ------------------------------------- ",
    )
    clean()
    launch()
    print(
        "\n ------------------------------ ",
        "\n| DONE - INITIAL STATE REACHED |",
        "\n ------------------------------ ",
    )
```
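The parsing step used to count received messages can be checked offline. The sketch below feeds hand-written `wc -l` outputs to a small standalone function; the file names `0_output.txt` and `1_output.txt` and the helper `total_from_wc` are assumptions made for the illustration.

```python
def total_from_wc(stdout: str) -> int:
    """Extract the line count from the last line of a `wc -l` output."""
    last_line = stdout.strip().split("\n")[-1]
    return int(last_line.split()[0])


# One file: wc prints a single line
single = "12 /tmp/rabbitmq/0_output.txt"
# Several files: wc appends a "total" line
several = (
    "  12 /tmp/rabbitmq/0_output.txt\n"
    "  30 /tmp/rabbitmq/1_output.txt\n"
    "  42 total"
)

print(total_from_wc(single), total_from_wc(several))  # 12 42
```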
%% Cell type:markdown id: tags:
## General knowledge
A ```ProcessRegistry``` records all the processes that match a regexp on specific roles.
%% Cell type:code id: tags:
``` python
# illustrative only: "regexp" is a placeholder showing how a registry is built
registry = en.ProcessRegistry()
registry.build("regexp", roles)
```
%% Cell type:markdown id: tags:
A ```Process``` is defined by its pid, its host and its command. Each instance also carries a state (ALIVE or DEAD).
%% Cell type:code id: tags:
``` python
# A process is attached to a single host, not to a set of roles
host = en.Host("192.168.0.3", alias="one_alias", user="foo")
process = en.Process(1234, host, "cmd")
print(process)
```
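As a mental model only, the information carried by a process can be pictured with a small dataclass. This is not EnOSlib's implementation: `ProcessSketch`, `State` and `mark_dead` are hypothetical names used for the illustration.

```python
from dataclasses import dataclass
from enum import Enum


class State(Enum):
    ALIVE = "ALIVE"
    DEAD = "DEAD"


@dataclass
class ProcessSketch:
    """Hypothetical stand-in: a pid, a host, a command and a state."""

    pid: int
    host: str
    cmd: str
    state: State = State.ALIVE

    def mark_dead(self) -> None:
        # Only updates the record; no signal is sent in this sketch
        self.state = State.DEAD


sketch = ProcessSketch(1234, "192.168.0.3", "cmd")
sketch.mark_dead()
print(sketch.state.value)  # DEAD
```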
%% Cell type:markdown id: tags:
For each case below, we will act on either the consumers or the producers, never both. The target can be the entire group or only a subset.
When acting on consumers, we will observe an increase of the queue depth, meaning that the messages are not processed as fast as they are produced, and the number of processed messages will stop growing.
When acting on producers, we will also observe that the number of processed messages stops growing.
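These expectations can be reproduced with a toy model of the queue. The rates below (10 messages produced and 10 consumed between two samples) are made-up values, and `simulate_queue` is a hypothetical helper that ignores all RabbitMQ details.

```python
def simulate_queue(steps, produced_per_step, consumed_per_step, depth=0):
    """Return the queue depth after each step of a simple rate model."""
    depths = []
    for _ in range(steps):
        # The depth can never become negative
        depth = max(0, depth + produced_per_step - consumed_per_step)
        depths.append(depth)
    return depths


healthy = simulate_queue(5, 10, 10, depth=3)
consumers_killed = simulate_queue(5, 10, 0, depth=3)
producers_killed = simulate_queue(5, 0, 10, depth=3)

print(healthy)           # [3, 3, 3, 3, 3]
print(consumers_killed)  # [13, 23, 33, 43, 53]
print(producers_killed)  # [0, 0, 0, 0, 0]
```

With the consumers gone the depth grows linearly, while with the producers gone the queue drains and nothing is left to process.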
%% Cell type:markdown id: tags:
## First example : Synchronous case
%% Cell type:markdown id: tags:
With this type of kill, the call blocks: the user must wait for it to complete before doing anything else.
The signal to send can be specified.
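Signals are plain members of Python's standard `signal` module. The short sketch below only lists two common ones with their numbers, as found on Linux; it does not send anything.

```python
import signal

# SIGTERM can be caught by the target process, SIGKILL cannot
for signum in (signal.SIGTERM, signal.SIGKILL):
    print(f"{signum.name} = {int(signum)}")
```

The examples in this tutorial use `signal.SIGKILL`, which terminates the process without giving it a chance to clean up.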
%% Cell type:markdown id: tags:
### Killing all consumers
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_consumers = en.ProcessRegistry()
registry_on_consumers.build(
"'python3 /tmp/rabbitmq/consumer.py' | grep -v bin",
roles["consumer"]
)
print(registry_on_consumers)
```
%% Cell type:code id: tags:
``` python
results_before_kill = get_stats(5)
registry_on_consumers.kill(signal.SIGKILL)
results_after_kill = get_stats(5)
```
%% Cell type:code id: tags:
``` python
results_before_kill
```
%% Output
Time nb_received_messages queue_depth nb_consumer
0 2023-07-13 16:36:14.166267 15 3 3
1 2023-07-13 16:36:17.374099 25 3 3
2 2023-07-13 16:36:20.649940 35 3 3
3 2023-07-13 16:36:23.882714 44 3 3
4 2023-07-13 16:36:27.090849 54 3 3
%% Cell type:code id: tags:
``` python
results_after_kill
```
%% Output
Time nb_received_messages queue_depth nb_consumer
0 2023-07-13 16:36:31.781609 64 13 0
1 2023-07-13 16:36:35.061763 64 22 0
2 2023-07-13 16:36:38.278555 64 32 0
3 2023-07-13 16:36:41.566426 64 42 0
4 2023-07-13 16:36:44.790659 64 52 0
%% Cell type:markdown id: tags:
### Killing all producers
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_producers = en.ProcessRegistry()
registry_on_producers.build(
"'python3 /tmp/rabbitmq/producer.py' | grep -v bin",
roles["producer"]
)
print(registry_on_producers)
```
%% Cell type:code id: tags:
``` python
results_before_kill = get_stats(5)
registry_on_producers.kill(signal.SIGKILL)
results_after_kill = get_stats(5)
```
%% Cell type:code id: tags:
``` python
results_before_kill
```
%% Cell type:code id: tags:
``` python
results_after_kill
```
%% Cell type:markdown id: tags:
## Second example : asynchronous case with delta
%% Cell type:markdown id: tags:
Here, each kill is scheduled: we have to specify the delay before it happens.
The delay is a ```datetime.timedelta()``` object.
%% Cell type:markdown id: tags:
### Killing all consumers
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_consumers = en.ProcessRegistry()
registry_on_consumers.build(
"'python3 /tmp/rabbitmq/consumer.py' | grep -v bin",
roles["consumer"],
)
print(registry_on_consumers)
```
%% Cell type:code id: tags:
``` python
registry_on_consumers.kill_async_after(
signum = signal.SIGKILL,
delta = timedelta(minutes=1),
)
# each iteration lasts about 4 s (2 requests + 1 s sleep)
results = get_stats(20)
results
```
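The number of samples has to be large enough to observe the system after the scheduled kill. A back-of-the-envelope computation, taking the roughly 4 s per iteration quoted in the comment above and a margin chosen arbitrarily, could look like this (`iterations_needed` is a hypothetical helper):

```python
import math
from datetime import timedelta


def iterations_needed(delay, per_iteration, margin=timedelta(0)):
    """Number of sampling iterations needed to cover delay + margin."""
    return math.ceil((delay + margin) / per_iteration)


n = iterations_needed(
    delay=timedelta(minutes=1),
    per_iteration=timedelta(seconds=4),
    margin=timedelta(seconds=20),
)
print(n)  # 20
```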
%% Cell type:markdown id: tags:
### Killing all producers
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_producers = en.ProcessRegistry()
registry_on_producers.build(
"'python3 /tmp/rabbitmq/producer.py' | grep -v bin",
roles["producer"],
)
print(registry_on_producers)
```
%% Cell type:code id: tags:
``` python
registry_on_producers.kill_async_after(
signum = signal.SIGKILL,
delta = timedelta(minutes=1),
)
# each iteration lasts about 4 s (2 requests + 1 s sleep)
results = get_stats(20)
results
```
%% Cell type:markdown id: tags:
## Third example : Asynchronous case specifying a date
%% Cell type:markdown id: tags:
Here, each kill is scheduled: we have to specify the exact date at which it happens.
The date is a ```datetime.datetime``` object.
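A date and a delay are two views of the same information. The sketch below converts a delay into a date with the standard library; `date_from_delta` is a hypothetical helper, and rejecting non-positive delays is a choice made for this sketch, not a documented EnOSlib behaviour.

```python
from datetime import datetime, timedelta
from typing import Optional


def date_from_delta(delta: timedelta, now: Optional[datetime] = None) -> datetime:
    """Turn a delay into an absolute date, refusing non-positive delays."""
    if delta <= timedelta(0):
        raise ValueError("the delay must be positive")
    return (now or datetime.now()) + delta


fixed_now = datetime(2023, 7, 13, 16, 36, 14)
print(date_from_delta(timedelta(minutes=1), now=fixed_now))  # 2023-07-13 16:37:14
```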
%% Cell type:markdown id: tags:
### Killing all consumers
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_consumers = en.ProcessRegistry()
registry_on_consumers.build(
"'python3 /tmp/rabbitmq/consumer.py' | grep -v bin",
roles["consumer"],
)
print(registry_on_consumers)
```
%% Cell type:code id: tags:
``` python
registry_on_consumers.kill_async_at(
signum = signal.SIGKILL,
date = datetime.now() + timedelta(minutes=1),
)
# each iteration lasts about 4 s (2 requests + 1 s sleep)
results = get_stats(20)
results
```
%% Cell type:markdown id: tags:
### Killing all producers
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_producers = en.ProcessRegistry()
registry_on_producers.build(
"'python3 /tmp/rabbitmq/producer.py' | grep -v bin",
roles["producer"],
)
print(registry_on_producers)
```
%% Cell type:code id: tags:
``` python
registry_on_producers.kill_async_at(
signum = signal.SIGKILL,
date = datetime.now() + timedelta(minutes=1),
)
# each iteration lasts about 4 s (2 requests + 1 s sleep)
results = get_stats(20)
results
```
%% Cell type:markdown id: tags:
## Fourth example : Incremental case
%% Cell type:markdown id: tags:
Here, each kill is scheduled and the processes to kill are picked at random. We have to specify how many processes we want to kill, the beginning (date of the first kill) and the interval between two consecutive kills.
The beginning is a ```datetime.datetime``` object.
The interval is a ```datetime.timedelta``` object.
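The resulting timeline can be sketched with the standard library. This is only an illustration of the idea (one randomly chosen process per interval); the helper `incremental_schedule` and the pids are hypothetical, and the way EnOSlib picks its targets may differ.

```python
import random
from datetime import datetime, timedelta


def incremental_schedule(pids, number, beginning, interval, seed=None):
    """Pick `number` distinct pids at random and space the kills by `interval`."""
    rng = random.Random(seed)
    victims = rng.sample(pids, number)
    return [(beginning + i * interval, pid) for i, pid in enumerate(victims)]


beginning = datetime(2023, 7, 13, 16, 37)
schedule = incremental_schedule(
    pids=[101, 102, 103],
    number=2,
    beginning=beginning,
    interval=timedelta(minutes=1),
    seed=0,
)
for date, pid in schedule:
    print(date, pid)
```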
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_consumers = en.ProcessRegistry()
registry_on_consumers.build(
"'python3 /tmp/rabbitmq/consumer.py' | grep -v bin",
roles["consumer"],
)
print(registry_on_consumers)
```
%% Cell type:code id: tags:
``` python
registry_on_consumers.kill_async_incr(
signum = signal.SIGKILL,
number = 2,
beginning = datetime.now() + timedelta(minutes=1),
interval = timedelta(minutes=1),
)
# each iteration lasts about 4 s (2 requests + 1 s sleep)
results = get_stats(50)
results
```
%% Cell type:markdown id: tags:
We can have an updated version of the registry, with both dead and alive processes.
%% Cell type:code id: tags:
``` python
after_refresh = registry_on_consumers.refresh()
print(after_refresh)
```
%% Cell type:markdown id: tags:
We can also restart all the killed processes without acting on the others. In a way, we go back to the registry's initial state. This ```registry.reset()``` has nothing to do with the ```reset()``` function specifically implemented for this experiment!
%% Cell type:code id: tags:
``` python
registry_on_consumers.reset()
print(registry_on_consumers)
```
%% Cell type:markdown id: tags:
## Fifth example : Restart a registry
%% Cell type:markdown id: tags:
We can restart an entire registry: this means killing all of its processes (if alive) and starting them again.
It can be done either synchronously with ```registry.restart()``` or asynchronously, in the same way as the kills previously shown. In the asynchronous case, we can specify the date of the kill(s) (or the time delta before they happen) and the interval between the kill(s) and the moment the process(es) are started again.
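The two parameters of an asynchronous restart define a simple timeline, sketched below with the standard library. `restart_timeline` is a hypothetical helper; it only computes dates and does not schedule anything.

```python
from datetime import datetime, timedelta


def restart_timeline(now: datetime, delta: timedelta, interval: timedelta):
    """Return (kill date, start date) for a restart scheduled after `delta`."""
    kill_at = now + delta
    start_at = kill_at + interval
    return kill_at, start_at


fixed_now = datetime(2023, 7, 13, 16, 36, 14)
kill_at, start_at = restart_timeline(
    fixed_now, delta=timedelta(minutes=1), interval=timedelta(minutes=1)
)
print(kill_at)   # 2023-07-13 16:37:14
print(start_at)  # 2023-07-13 16:38:14
```

With a one-minute delay and a one-minute interval, the processes come back about two minutes after the call, so the sampling has to run longer than in the kill-only examples.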
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_consumers = en.ProcessRegistry()
registry_on_consumers.build(
"'python3 /tmp/rabbitmq/consumer.py' | grep -v bin",
roles["consumer"],
)
print(registry_on_consumers)
```
%% Cell type:code id: tags:
``` python
registry_on_consumers.restart_async_after(
signum = signal.SIGKILL,
delta = timedelta(minutes=1),
interval = timedelta(minutes=1),
)
# each iteration lasts about 4 s (2 requests + 1 s sleep)
results = get_stats(40)
results
```
%% Cell type:code id: tags:
``` python
reset()
```
%% Cell type:code id: tags:
``` python
registry_on_producers = en.ProcessRegistry()
registry_on_producers.build(
"'python3 /tmp/rabbitmq/producer.py' | grep -v bin",
roles["producer"],
)
print(registry_on_producers)
```
%% Cell type:code id: tags:
``` python
registry_on_producers.restart_async_at(
signum = signal.SIGKILL,
date = datetime.now() + timedelta(minutes=1),
interval = timedelta(minutes=1),
)
# each iteration lasts about 4 s (2 requests + 1 s sleep)
results = get_stats(40)
results
```
%% Cell type:markdown id: tags:
## Cleaning
%% Cell type:code id: tags:
``` python
clean()
provider.destroy()
```
FROM python:3.11
WORKDIR /tmp/rabbitmq
COPY consumer.py .
RUN pip install pika
ENTRYPOINT ["python", "consumer.py"]
\ No newline at end of file
FROM python:3.11
WORKDIR /tmp/rabbitmq
COPY producer.py .
RUN pip install pika
ENTRYPOINT ["python", "producer.py"]
\ No newline at end of file