Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 369a9d3d authored by TRABELSI Weissem's avatar TRABELSI Weissem
Browse files

small changes

parent 78dee0a3
No related branches found
No related tags found
No related merge requests found
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
# Schedule events using a Planning # Schedule events using a Planning
--- ---
- Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html - Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html
- Instant chat: https://framateam.org/enoslib - Instant chat: https://framateam.org/enoslib
- Source code: https://gitlab.inria.fr/discovery/enoslib - Source code: https://gitlab.inria.fr/discovery/enoslib
--- ---
## Prerequisites ## Prerequisites
<div class="alert alert-block alert-warning"> <div class="alert alert-block alert-warning">
Make sure you've run the one time setup for your environment Make sure you've run the one time setup for your environment
</div> </div>
<div class="alert alert-block alert-warning"> <div class="alert alert-block alert-warning">
Make sure you've done the tutorial : 07_fault_injection_on_processes Make sure you've done the tutorial : 07_fault_injection_on_processes
</div> </div>
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Setup ## Setup
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
import signal import signal
import os import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
import enoslib as en import enoslib as en
en.init_logging() en.init_logging()
en.check() en.check()
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
### Reservation ### Reservation
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
CLUSTER = "nova" CLUSTER = "nova"
HERE = os.getcwd() HERE = os.getcwd()
# claim the resources # claim the resources
conf = ( conf = (
en.G5kConf.from_settings( en.G5kConf.from_settings(
job_name="fault-injection tutorial", job_name="fault-injection tutorial",
job_type=[], job_type=[],
) )
.add_machine(roles=["server"], cluster=CLUSTER, nodes=1) .add_machine(roles=["server"], cluster=CLUSTER, nodes=1)
.add_machine( .add_machine(
roles=["producer"], cluster=CLUSTER, nodes=1 roles=["producer"], cluster=CLUSTER, nodes=1
) # all the producers are running on the same machine ) # all the producers are running on the same machine
.add_machine( .add_machine(
roles=["consumer"], cluster=CLUSTER, nodes=1 roles=["consumer"], cluster=CLUSTER, nodes=1
) # all the consumers are running on the same machine ) # all the consumers are running on the same machine
) )
provider = en.G5k(conf) provider = en.G5k(conf)
roles, networks = provider.init() roles, networks = provider.init()
# Fill in network information from nodes # Fill in network information from nodes
roles = en.sync_info(roles, networks) roles = en.sync_info(roles, networks)
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
roles roles
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
### Rabbitmq configuration ### Rabbitmq configuration
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
#### Common node's configuration #### Common node's configuration
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
Each node must have this minimal configuration : Each node must have this minimal configuration :
- having python and pip - having python and pip
- having procps (to use kill) - having procps (to use kill)
- having pika (for the rabbitmq connection) - having pika (for the rabbitmq connection)
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Common configuration # Common configuration
with en.actions(roles=roles) as p: with en.actions(roles=roles) as p:
p.apt(task_name="Installing python", name="python3") p.apt(task_name="Installing python", name="python3")
p.apt(task_name="Installing procps", name="procps") p.apt(task_name="Installing procps", name="procps")
p.command("apt update") p.command("apt update")
p.apt(task_name="Installing pip", name="python3-pip") p.apt(task_name="Installing pip", name="python3-pip")
p.pip(task_name="Installing pika", name="pika") p.pip(task_name="Installing pika", name="pika")
p.file(path="/tmp/rabbitmq", state="absent") p.file(path="/tmp/rabbitmq", state="absent")
p.file(path="/tmp/rabbitmq", state="directory") p.file(path="/tmp/rabbitmq", state="directory")
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
#### Server configuration #### Server configuration
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
Here, we does not launch anything yet, we just setup the server node to accept all our producer(s) and consumer(s). We also add a new administrator in order to have access to the management interface, the default one being blocked by the remote configuration. Here, we does not launch anything yet, we just setup the server node to accept all our producer(s) and consumer(s). We also add a new administrator in order to have access to the management interface, the default one being blocked by the remote configuration.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
username_monitoring = "user" username_monitoring = "user"
password_monitoring = "password" password_monitoring = "password"
# SETUP # SETUP
## Server configuration ## Server configuration
with en.actions(roles=roles["server"]) as p: with en.actions(roles=roles["server"]) as p:
# Setting the rabbimq server # Setting the rabbimq server
p.apt(task_name="Installing rabbitmq-server", name="rabbitmq-server") p.apt(task_name="Installing rabbitmq-server", name="rabbitmq-server")
p.command("rabbitmq-plugins enable rabbitmq_management") p.command("rabbitmq-plugins enable rabbitmq_management")
p.command("systemctl start rabbitmq-server") p.command("systemctl start rabbitmq-server")
p.command("systemctl enable rabbitmq-server") p.command("systemctl enable rabbitmq-server")
# For the management interface, adding a new admin # For the management interface, adding a new admin
p.command(f"rabbitmqctl add_user {username_monitoring} {password_monitoring}") p.command(f"rabbitmqctl add_user {username_monitoring} {password_monitoring}")
p.command(f"rabbitmqctl set_user_tags {username_monitoring} administrator") p.command(f"rabbitmqctl set_user_tags {username_monitoring} administrator")
p.command(f"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'") p.command(f"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'")
# Allow all connections (no credentials needed for consumers and producers) # Allow all connections (no credentials needed for consumers and producers)
p.shell('echo "loopback_users.guest = false" | sudo tee -a /etc/rabbitmq/rabbitmq.conf') p.shell('echo "loopback_users.guest = false" | sudo tee -a /etc/rabbitmq/rabbitmq.conf')
p.command("systemctl restart rabbitmq-server") p.command("systemctl restart rabbitmq-server")
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
#### Producers' node configuration #### Producers' node configuration
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
The producers' node has to be configured such that it contains its specific script. The producers' node has to be configured such that it contains its specific script.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
with en.actions(roles=roles["producer"]) as p: with en.actions(roles=roles["producer"]) as p:
p.copy( p.copy(
src=HERE + "/producer.py", src=HERE + "/producer.py",
dest="/tmp/rabbitmq/producer.py", dest="/tmp/rabbitmq/producer.py",
task_name="copying producer file", task_name="copying producer file",
) )
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
#### Consumers' node configuration #### Consumers' node configuration
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
The consumers' node has to be configured such that it contains its specific script. The consumers' node has to be configured such that it contains its specific script.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
with en.actions(roles=roles["consumer"]) as p: with en.actions(roles=roles["consumer"]) as p:
p.copy( p.copy(
src=HERE + "/consumer.py", src=HERE + "/consumer.py",
dest="/tmp/rabbitmq/consumer.py", dest="/tmp/rabbitmq/consumer.py",
task_name="copying consumer file", task_name="copying consumer file",
) )
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
#### Utility functions #### Utility functions
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
The only purpose of these functions is to facilitate and to make this experiment more readable. Their objectives are : The only purpose of these functions is to facilitate and to make this experiment more readable. Their objectives are :
- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s)) - to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s))
- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purges the rabbitmq queues) - to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purges the rabbitmq queues)
- to launch all producer(s) and consumer(s) by specifying the number of each - to launch all producer(s) and consumer(s) by specifying the number of each
- to reset the experiment by going back to its initial state (clean + launch) - to reset the experiment by going back to its initial state (clean + launch)
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
## Get server's IP address on the private network ## Get server's IP address on the private network
from typing import List from typing import List
import pandas as pd import pandas as pd
import logging import logging
from enoslib.log import DisableLogging from enoslib.log import DisableLogging
from enoslib.config import config_context from enoslib.config import config_context
from IPython.display import clear_output from IPython.display import clear_output
server = roles["server"][0] server = roles["server"][0]
ip_address_obj = server.filter_addresses(networks=networks["prod"])[0] ip_address_obj = server.filter_addresses(networks=networks["prod"])[0]
## This may seem weird: ip_address_obj.ip is a `netaddr.IPv4Interface` ## This may seem weird: ip_address_obj.ip is a `netaddr.IPv4Interface`
## which itself has an `ip` attribute. ## which itself has an `ip` attribute.
server_ip = ip_address_obj.ip.ip server_ip = ip_address_obj.ip.ip
def _get_queues_info() -> List: def _get_queues_info() -> List:
with DisableLogging(level=logging.ERROR): with DisableLogging(level=logging.ERROR):
with config_context(ansible_stdout="noop"): with config_context(ansible_stdout="noop"):
results = en.run_command( results = en.run_command(
"rabbitmqctl list_queues -p '/' messages consumers | " "rabbitmqctl list_queues -p '/' messages consumers | "
"awk 'NR>3 {printf \"%-15s %-15s\\n\", $1, $2}' ; " "awk 'NR>3 {printf \"%-15s %-15s\\n\", $1, $2}' ; "
"rabbitmqctl list_connections | grep guest | wc -l", "rabbitmqctl list_connections | grep guest | wc -l",
task_name="Gathering statistics from the queues", task_name="Gathering statistics from the queues",
roles=roles["server"], roles=roles["server"],
gather_facts=False, gather_facts=False,
on_error_continue=True, on_error_continue=True,
) )
queues_info : List = [] queues_info : List = []
for r in results: for r in results:
if r.status == "FAILED" or r.rc != 0: if r.status == "FAILED" or r.rc != 0:
continue continue
lines = r.stdout.strip().split("\n") lines = r.stdout.strip().split("\n")
for l in lines: for l in lines:
info = l.strip().split(" ") info = l.strip().split(" ")
queues_info.append([v for v in info if v!= ""]) queues_info.append([v for v in info if v!= ""])
return queues_info return queues_info
def get_queues_info_for(duration: int) -> pd.DataFrame: def get_queues_info_for(duration: int) -> pd.DataFrame:
results = {} results = {}
results["Time"] = [] results["Time"] = []
results["nb_received_messages"] = [] results["nb_received_messages"] = []
results["queue_depth"] = [] results["queue_depth"] = []
results["nb_consumers"] = [] results["nb_consumers"] = []
results["nb_producers"] = [] results["nb_producers"] = []
for _ in range(duration): for _ in range(duration):
time = str(datetime.now().strftime("%H:%M:%S")) time = str(datetime.now().strftime("%H:%M:%S"))
results["Time"].append(time) results["Time"].append(time)
queues_info = _get_queues_info() queues_info = _get_queues_info()
try: try:
queue_depth = queues_info[0][0] queue_depth = queues_info[0][0]
except IndexError as e: except IndexError as e:
queue_depth = 0 queue_depth = 0
try: try:
nb_consumers = queues_info[0][1] nb_consumers = queues_info[0][1]
except IndexError as e: except IndexError as e:
nb_consumers = 0 nb_consumers = 0
try: try:
nb_recv_msg = queues_info[1][0] nb_recv_msg = queues_info[1][0]
except IndexError as e: except IndexError as e:
nb_recv_msg = 0 nb_recv_msg = 0
try: try:
nb_producers = int(queues_info[2][0]) - int(nb_consumers) nb_producers = int(queues_info[2][0]) - int(nb_consumers)
except IndexError as e: except IndexError as e:
nb_producers = 0 nb_producers = 0
results["queue_depth"].append(int(queue_depth)) results["queue_depth"].append(int(queue_depth))
results["nb_consumers"].append(int(nb_consumers)) results["nb_consumers"].append(int(nb_consumers))
results["nb_producers"].append(int(nb_producers)) results["nb_producers"].append(int(nb_producers))
results["nb_received_messages"].append(int(nb_recv_msg)) results["nb_received_messages"].append(int(nb_recv_msg))
clear_output(wait=False) clear_output(wait=False)
print( print(
f"Time : {time}\n" f"Time : {time}\n"
f"nb_received_messages : {nb_recv_msg}\n" f"nb_received_messages : {nb_recv_msg}\n"
f"queue_depth : {queue_depth}\n" f"queue_depth : {queue_depth}\n"
f"nb_consumers: {nb_consumers}\n" f"nb_consumers: {nb_consumers}\n"
f"nb_producers: {nb_producers}\n" f"nb_producers: {nb_producers}\n"
) )
df = pd.DataFrame(data=results) df = pd.DataFrame(data=results)
return df return df
def clean(): def clean():
""" """
Kill all previouses launched processes, Kill all previouses launched processes,
removes all previouses results, removes all previouses results,
purges the queue. purges the queue.
""" """
cleaning_registry = en.ProcessRegistry() cleaning_registry = en.ProcessRegistry()
cleaning_registry.build( cleaning_registry.build(
"producer|consumer", "producer|consumer",
roles["consumer"] + roles["producer"], roles["consumer"] + roles["producer"],
) )
cleaning_registry.kill(signal.SIGKILL) cleaning_registry.kill(signal.SIGKILL)
with DisableLogging(level=logging.ERROR): with DisableLogging(level=logging.ERROR):
with config_context(ansible_stdout="noop"): with config_context(ansible_stdout="noop"):
en.run_command( en.run_command(
"rabbitmqctl purge_queue fault_injection & "\ "rabbitmqctl purge_queue fault_injection & "\
"rabbitmqctl purge_queue received_messages & ", "rabbitmqctl purge_queue received_messages & ",
task_name="purging the queue", task_name="purging the queue",
roles=roles["server"], roles=roles["server"],
on_error_continue=True, on_error_continue=True,
gather_facts=False, gather_facts=False,
) )
en.run_command( en.run_command(
"crontab -r", "crontab -r",
task_name="purging crontab file", task_name="purging crontab file",
roles=roles["consumer"] + roles["producer"], roles=roles["consumer"] + roles["producer"],
on_error_continue=True, on_error_continue=True,
gather_facts=False, gather_facts=False,
) )
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## General knowledge ## General knowledge
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
### Events ### Events
An ```Event``` define a command that will run at a specific date (```datetime.datetime```) on a specific host (```Host```) An ```Event``` define a command that will run at a specific date (```datetime.datetime```) on a specific host (```Host```).
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
#### StartEvent #### StartEvent
A ```StartEvent```, as its name suggests, will start a named process. A ```StartEvent```, as its name suggests, will start a named process.
In addtion of the base class ```Event``` attributes, the ```command``` to launch the process and its ```name``` must be specified and are of type ```str```. In addtion of the base class ```Event``` attributes, the ```command``` to launch the process and its ```name``` must be specified and are of type ```str```.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Not supposed to return anything ! The only purpose of this code is to show how a StartEvent is created. # Not supposed to return anything ! The only purpose of this code is to show how a StartEvent is created.
event_date = datetime.now() event_date = datetime.now()
event_host = en.Host("address") event_host = en.Host("address")
event_cmd = "sleep infinity" event_cmd = "sleep infinity"
process_name = "name" process_name = "name"
start_event = en.StartEvent( start_event = en.StartEvent(
date = event_date, date = event_date,
host = event_host, host = event_host,
cmd = event_cmd, cmd = event_cmd,
ns = process_name, ns = process_name,
) )
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
#### KillEvent #### KillEvent
A ```KillEvent```, as its name suggests, will kill a named process. A ```KillEvent```, as its name suggests, will kill a previously started named process.
In addtion of the base class ```Event``` attributes, only ```name``` of the process to kill must be specified, which is of type ```str```. In addition of the base class ```Event``` attributes, only the ```name``` of the process to kill must be specified, which is of type ```str```.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Not supposed to return anything ! The only purpose of this code is to show how a KillEvent is created. # Not supposed to return anything ! The only purpose of this code is to show how a KillEvent is created.
event_date = datetime.now() event_date = datetime.now()
event_host = en.Host("address") event_host = en.Host("address")
process_name = "name" process_name = "name"
kill_event = en.KillEvent( kill_event = en.KillEvent(
date = event_date, date = event_date,
host = event_host, host = event_host,
ns = process_name, ns = process_name,
) )
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
### Planning ### Planning
A ```Planning``` can be considered as an Event storage. A ```Planning``` can be considered as an Event storage.
A ```PlanningService``` records all processes specified in a ```Planning```. It can be seen as an Event registry. A ```PlanningService``` records all processes specified in a ```Planning```. It can be seen as an Event registry.
To define a ```PlanningService```, specifying a ```Planning``` is necessary. To define a ```PlanningService```, specifying a ```Planning``` is necessary.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
planning = en.Planning() planning = en.Planning()
planning.add_event(start_event) planning.add_event(start_event)
planning.add_event(kill_event) planning.add_event(kill_event)
planning_service = en.PlanningService(planning) planning_service = en.PlanningService(planning)
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Examples ## Examples
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
### A first deployment ### A first deployment
We are here gonna schedule : We are here gonna schedule :
- the start of 5 rabbitmq producers every 10 seconds after 2 minute 20 seconds - the start of 5 rabbitmq producers every 10 seconds after 2 minute 20 seconds
- the start of 5 rabbitmq consumers every 10 seconds, 1 minute after the last producer is launched - the start of 5 rabbitmq consumers every 10 seconds, 1 minute after the last producer is launched
- the kill of 3 rabbitmq producers 4 minutes after the start of the experiment - the kill of 3 rabbitmq producers 4 minutes after the start of the experiment
- the kill of all the rabbitmq consumers 30 seconds after the producers are killed - the kill of all the rabbitmq consumers 30 seconds after the producers are killed
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
clean() clean()
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
planning = en.Planning() planning = en.Planning()
n = 5 n = 5
time_now = datetime.now() time_now = datetime.now()
for idx in range(n): for idx in range(n):
planning.add_event( planning.add_event(
en.StartEvent( en.StartEvent(
date = time_now + timedelta(minutes = 2, seconds = (idx * 10)), date = time_now + timedelta(minutes = 2, seconds = (idx * 10)),
host = roles["producer"][0], host = roles["producer"][0],
cmd = f"python3 /tmp/rabbitmq/producer.py {server_ip}", cmd = f"python3 /tmp/rabbitmq/producer.py {server_ip}",
ns = f"producer_{idx}", ns = f"producer_{idx}",
) )
) )
planning.add_event( planning.add_event(
en.StartEvent( en.StartEvent(
date = time_now + timedelta(minutes = 3, seconds = (idx * 10)), date = time_now + timedelta(minutes = 3, seconds = (idx * 10)),
host = roles["consumer"][0], host = roles["consumer"][0],
cmd = f"python3 /tmp/rabbitmq/consumer.py {server_ip}", cmd = f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
ns = f"consumer_{idx}", ns = f"consumer_{idx}",
) )
) )
for idx in range(3): for idx in range(3):
planning.add_event( planning.add_event(
en.KillEvent( en.KillEvent(
date = time_now + timedelta(minutes = 4), date = time_now + timedelta(minutes = 4),
host = roles["producer"][0], host = roles["producer"][0],
ns = f"producer_{idx}", ns = f"producer_{idx}",
) )
) )
for idx in range(n): for idx in range(n):
planning.add_event( planning.add_event(
en.KillEvent( en.KillEvent(
date = time_now + timedelta(minutes = 4, seconds = 30), date = time_now + timedelta(minutes = 4, seconds = 30),
host = roles["consumer"][0], host = roles["consumer"][0],
ns = f"consumer_{idx}", ns = f"consumer_{idx}",
) )
) )
planning_service = en.PlanningService(planning=planning, delay = timedelta(minutes=1)) planning_service = en.PlanningService(planning=planning, delay = timedelta(minutes=1))
planning_service.deploy() planning_service.deploy()
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
We can have a first advice regarding the consistency of the planning. We can have a first advice regarding the consistency of the planning.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Will return an exception if anything seems to be wrong, else None. # Will return an exception if anything seems to be wrong, else None.
planning.check() planning.check()
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
We can have an up-to-date state of all the processes. We can have an up-to-date state of all the processes.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
planning_service.status() planning_service.status()
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
Let's observe the evolution of our events. Let's observe the evolution of our events.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
results = get_queues_info_for(150) results = get_queues_info_for(150)
results results
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45) results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45) results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
We can also make the planning unactive, we mean here that all next event(s) (depending on the current time) will not be executed. We can also make the planning unactive, we mean here that all next event(s) (depending on the current time) will not be executed.
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
planning_service.destroy() planning_service.destroy()
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
### Overload example ### Overload example
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
Set-up an overload can be easily made using a service such as ```PlanningService```. Set-up an overload can be easily made using a service such as ```PlanningService```.
Let's see an example of it. Let's see an example of it.
What we are gonna schedule is : What we are gonna schedule is :
- The launch of 20 rabbitmq producers after 4 minutes - The launch of 20 rabbitmq producers after 4 minutes
- The launch of 100 rabbitmq consumers after 5 minutes - The launch of 100 rabbitmq consumers after 5 minutes
- The kill of 70 rabbitmq consumers after 6 minutes - The kill of 70 rabbitmq consumers after 6 minutes
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
clean() clean()
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
planning = en.Planning() planning = en.Planning()
time_now = datetime.now() time_now = datetime.now()
for idx in range(20): for idx in range(20):
planning.add_event( planning.add_event(
en.StartEvent( en.StartEvent(
date = time_now + timedelta(minutes = 4), date = time_now + timedelta(minutes = 4),
host = roles["producer"][0], host = roles["producer"][0],
cmd = f"python3 /tmp/rabbitmq/producer.py {server_ip}", cmd = f"python3 /tmp/rabbitmq/producer.py {server_ip}",
ns = f"producer_{idx}", ns = f"producer_{idx}",
) )
) )
for idx in range(100): for idx in range(100):
planning.add_event( planning.add_event(
en.StartEvent( en.StartEvent(
date = time_now + timedelta(minutes = 5), date = time_now + timedelta(minutes = 5),
host = roles["consumer"][0], host = roles["consumer"][0],
cmd = f"python3 /tmp/rabbitmq/consumer.py {server_ip}", cmd = f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
ns = f"consumer_{idx}", ns = f"consumer_{idx}",
) )
) )
for idx in range(70): for idx in range(70):
planning.add_event( planning.add_event(
en.KillEvent( en.KillEvent(
date = time_now + timedelta(minutes = 6), date = time_now + timedelta(minutes = 6),
host = roles["consumer"][0], host = roles["consumer"][0],
ns = f"consumer_{idx}", ns = f"consumer_{idx}",
) )
) )
planning_service = en.PlanningService(planning=planning) planning_service = en.PlanningService(planning=planning)
planning.check() planning.check()
planning_service.deploy() planning_service.deploy()
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
results = get_queues_info_for(150) results = get_queues_info_for(150)
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45) results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45) results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Cleaning ## Cleaning
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
provider.destroy() provider.destroy()
``` ```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment