diff --git a/g5k/07_fault_injection_on_processes.ipynb b/g5k/07_fault_injection_on_processes.ipynb
index 9b182c30333c15853ded772fb42d2b47d97c9c1b..b72cd67158f6001e5084ed781c912cf145a071af 100644
--- a/g5k/07_fault_injection_on_processes.ipynb
+++ b/g5k/07_fault_injection_on_processes.ipynb
@@ -36,7 +36,7 @@
"\n",
"[Cron](https://man7.org/linux/man-pages/man8/cron.8.html) is a time-based job scheduler in Unix-like operating systems. It allows you to schedule and automate the execution of commands or scripts at specified intervals or specific times. Cron is commonly used for repetitive tasks, system maintenance, and scheduling periodic jobs.\n",
"\n",
- "All asynchronous tools shown here are based on cron. Because of that, for each event, the date is of the order of a minute and please take not that if a cron job is setup at HH:MM:50 for the minute after, it will be executed at HH:MM+1:00 (only 10 seconds after).\n",
+ "All asynchronous tools shown here are based on cron. Because of that, each event date has a granularity of one minute; note that if a cron job is set up at HH:MM:50 for the minute after, it will be executed at HH:MM+1:00 (only 10 seconds later).\n",
"\n",
"\n",
"[pgrep](https://man7.org/linux/man-pages/man1/pgrep.1.html) is a command-line utility in Unix-like operating systems that is used to search for and list the process IDs (PIDs) of running processes based on certain criteria. It allows you to find processes by their names, command lines, or other attributes.\n",
@@ -240,7 +240,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "The producers' node has to be configured such that it contains the script that is gonna be used."
+ "The producers' node has to be configured so that it contains its specific script."
]
},
{
@@ -270,7 +270,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "The consumers' node has to be configured such that it contains the script that is gonna be used."
+ "The consumers' node has to be configured so that it contains its specific script."
]
},
{
@@ -300,9 +300,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "The only purpose of these functions is to facilitate and to make more readable this experiment. Their objectives are :\n",
+ "The only purpose of these functions is to simplify this experiment and to make it more readable. Their objectives are:\n",
"- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s))\n",
- "- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purge the rabbitmq queue)\n",
+ "- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, purge the rabbitmq queues)\n",
"- to launch all producer(s) and consumer(s)\n",
"- to reset the experiment by going back to its initial state (clean + launch) "
]
@@ -324,52 +324,28 @@
"## which itself has an `ip` attribute.\n",
"server_ip = ip_address_obj.ip.ip\n",
"\n",
- "def get_recv_msg(file: str) -> int:\n",
- "    \"\"\"\n",
- "    Shows the total number of processed messages.\n",
- "    \"\"\"\n",
- "    results = en.run_command(\n",
- "        f\"wc -l {file}\",\n",
- "        task_name=\"getting total number of received messages\",\n",
- "        roles=roles[\"consumer\"],\n",
- "        gather_facts=False,\n",
- "        on_error_continue=True,\n",
- "    )\n",
- "    totalnbmsg = 0\n",
- "    for r in results:\n",
- "        if r.status == \"FAILED\" or r.rc != 0:\n",
- "            print(f\"Actual number of received message : 0\")\n",
- "            continue\n",
- "        _lines = r.stdout.split(\"\\n\")\n",
- "        _total = _lines[-1].strip().split(\" \") # last line contain the total number of line if multiple files, else\n",
- "        totalnbmsg += int(_total[0])\n",
- "\n",
- "    return totalnbmsg\n",
- "\n",
- "def get_queue_size() -> List:\n",
- "    \"\"\"\n",
- "    Retreive the current rabbitmq queue size.\n",
- "    \"\"\"\n",
+ "def _get_queues_info() -> List:  # one [messages, consumers] row per queue\n",
"    results = en.run_command(\n",
"        \"rabbitmqctl list_queues -p '/' messages consumers | \"\n",
"        \"awk 'NR>3 {printf \\\"%-15s %-15s\\\\n\\\", $1, $2}'\",\n",
- "        task_name=\"getting number of messages waiting for processing\",\n",
+ "        task_name=\"Gathering statistics from the queues\",\n",
"        roles=roles[\"server\"],\n",
"        gather_facts=False,\n",
"        on_error_continue=True,\n",
"    )\n",
+ "    queues_info: List = []\n",
"    for r in results:\n",
"        if r.status == \"FAILED\" or r.rc != 0:\n",
- "            print(\"Queue is empty\")\n",
+ "            queues_info.extend([[-1, -1], [-1, -1]])  # one placeholder row per queue so callers can still index both\n",
+ "            continue\n",
"        lines = r.stdout.strip().split(\"\\n\")\n",
- "        line = lines[0].strip().split(\" \")\n",
- "        return [v for v in line if v!= \"\"]\n",
+ "        for line in lines:\n",
+ "            info = line.strip().split(\" \")\n",
+ "            queues_info.append([v for v in info if v != \"\"])\n",
"\n",
- "def get_stats(duration: int) -> pd.DataFrame:\n",
- "    \"\"\"\n",
- "    Retreive general statistics using the rabbitmq management tool.\n",
- "    \"\"\"\n",
+ "    return queues_info\n",
+ "\n",
+ "def get_queues_info_for(duration: int) -> pd.DataFrame:  # one rabbitmqctl poll per iteration\n",
"    results = {}\n",
"    results[\"Time\"] = []\n",
"    results[\"nb_received_messages\"] = []\n",
@@ -377,12 +353,14 @@
"    results[\"nb_consumer\"] = []\n",
"    for _ in range(duration):\n",
"        results[\"Time\"].append(str(datetime.now()))\n",
- "        \n",
- "        results[\"nb_received_messages\"].append(get_recv_msg(\"/tmp/rabbitmq/*_output.txt\"))\n",
- "\n",
- "        queue_depth, nb_consumer = get_queue_size()\n",
+ "\n",
+ "        queues_info = _get_queues_info()\n",
+ "        queue_depth = queues_info[0][0]  # first row: the fault_injection queue\n",
+ "        nb_consumer = queues_info[0][1]\n",
+ "        nb_recv_msg = queues_info[1][0]  # second row: received_messages, i.e. the processed count\n",
"        results[\"queue_depth\"].append(int(queue_depth))\n",
"        results[\"nb_consumer\"].append(int(nb_consumer))\n",
+ "        results[\"nb_received_messages\"].append(int(nb_recv_msg))\n",
"\n",
"\n",
"    df = pd.DataFrame(data=results)\n",
@@ -403,15 +381,8 @@
"    cleaning_registry.kill(signal.SIGKILL)\n",
"\n",
"    en.run_command(\n",
- "        \"rm /tmp/rabbitmq/*_output.txt\",\n",
- "        task_name=\"cleaning output files\",\n",
- "        roles=roles[\"consumer\"] + roles[\"producer\"],\n",
- "        on_error_continue=True,\n",
- "        gather_facts=False,\n",
- "    )\n",
- "\n",
- "    en.run_command(\n",
- "        \"rabbitmqctl purge_queue fault_injection\",\n",
+ "        \"rabbitmqctl purge_queue fault_injection & \"\\\n",
+ "        \"rabbitmqctl purge_queue received_messages\",\n",
"        task_name=\"purging the queue\",\n",
"        roles=roles[\"server\"],\n",
"        on_error_continue=True,\n",
@@ -475,7 +446,7 @@
"source": [
"## General knowledge\n",
"\n",
- "A ```ProcessRegistry``` records all processes that follows a regexp on specific roles. "
+ "A ```ProcessRegistry``` records all processes that match a given regexp on specific roles. "
]
},
{
@@ -513,7 +484,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "For each case below, we will act on either the consumers, either the producers, never both. It can be the entire group or only a subset.\n",
+ "For each case below, we will act on either the consumers or the producers, never both. It can be the entire group or only a subset.\n",
"\n",
"When acting on consumers, we will observe an increase of the queue depth, meaning that the messages are not processed as fast as they are produced.\n",
"\n",
@@ -576,9 +547,9 @@
"metadata": {},
"outputs": [],
"source": [
- "results_before_kill = get_stats(5)\n",
+ "results_before_kill = get_queues_info_for(5)\n",
"registry_on_consumers.kill(signal.SIGKILL)\n",
- "results_after_kill = get_stats(5)"
+ "results_after_kill = get_queues_info_for(5)"
]
},
{
@@ -636,9 +607,9 @@
"metadata": {},
"outputs": [],
"source": [
- "results_before_kill = get_stats(5)\n",
+ "results_before_kill = get_queues_info_for(5)\n",
"registry_on_producers.kill(signal.SIGKILL)\n",
- "results_after_kill = get_stats(5)"
+ "results_after_kill = get_queues_info_for(5)"
]
},
{
@@ -720,7 +691,7 @@
"    delta = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(20)\n",
+ "results = get_queues_info_for(25)\n",
"results"
]
},
@@ -766,7 +737,7 @@
"    delta = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(20)\n",
+ "results = get_queues_info_for(25)\n",
"results"
]
},
@@ -831,7 +802,7 @@
"    date = datetime.now() + timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(20)\n",
+ "results = get_queues_info_for(25)\n",
"results"
]
},
@@ -877,7 +848,7 @@
"    date = datetime.now() + timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(20)\n",
+ "results = get_queues_info_for(25)\n",
"results"
]
},
@@ -894,7 +865,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "Here, each kill is scheduled and totally random, we have to specify how many processes we want to kill, the beginning (date of the first kill) and the interval between each of them.\n",
+ "Here, each kill is scheduled and the killed processes are chosen at random among those registered. We have to specify how many processes we want to kill, the beginning (date of the first kill) and the interval between consecutive kills.\n",
"\n",
"The beginning is a ```datetime.datetime``` object.\n",
"\n",
@@ -937,7 +908,7 @@
"    interval = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(50)\n",
+ "results = get_queues_info_for(50)\n",
"results"
]
},
@@ -995,6 +966,14 @@
"It can be done either synchronously with ```registry.restart()```, either asynchronously in the same way as the kills previously shown. If asynchronous, the date of the kill(s) (or the time delta before they happen) and its interval with the start(s) of the docker container(s) can be specified."
]
},
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Restarting all consumers"
+ ]
+ },
{
"cell_type": "code",
"execution_count": null,
@@ -1030,10 +1009,18 @@
"    interval = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(40)\n",
+ "results = get_queues_info_for(45)\n",
"results"
]
},
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Restarting all producers"
+ ]
+ },
{
"cell_type": "code",
"execution_count": null,
@@ -1069,7 +1056,7 @@
"    interval = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(40)\n",
+ "results = get_queues_info_for(45)\n",
"results"
]
},
diff --git a/g5k/08_fault_injection_on_docker_containers.ipynb b/g5k/08_fault_injection_on_docker_containers.ipynb
index df48298b8bf7fcbdac9907acca5833b0f61f9fc4..5e98ea7f230e0844f5fd7d1d9a55c4c8ef79d6a8 100644
--- a/g5k/08_fault_injection_on_docker_containers.ipynb
+++ b/g5k/08_fault_injection_on_docker_containers.ipynb
@@ -33,12 +33,12 @@
"\n",
"[RabbitMQ](https://www.rabbitmq.com/) is an open-source message broker that enables different software applications to communicate and exchange data in a reliable and scalable manner. It follows the Advanced Message Queuing Protocol (AMQP) and provides a flexible messaging model based on the concept of queues.\n",
"\n",
- "For our experiment, we will deploy a publish / suscribe environment to demonstrate the impact of our api.\n",
+ "For our experiment, we will deploy a publish/subscribe environment to demonstrate the impact of our API. Each Docker container will host one producer or one consumer.\n",
"\n",
"\n",
"[Cron](https://man7.org/linux/man-pages/man8/cron.8.html) is a time-based job scheduler in Unix-like operating systems. It allows you to schedule and automate the execution of commands or scripts at specified intervals or specific times. Cron is commonly used for repetitive tasks, system maintenance, and scheduling periodic jobs.\n",
"\n",
- "All asynchronous tools shown here are based on cron. Because of that, for each event, the date is of the order of a minute and please take not that if a cron job is setup at HH:MM:50 for the minute after, it will be executed at HH:MM+1:00 (only 10 seconds after)."
+ "All asynchronous tools shown here are based on cron. Because of that, each event date has a granularity of one minute; note that if a cron job is set up at HH:MM:50 for the minute after, it will be executed at HH:MM+1:00 (only 10 seconds later)."
]
},
{
@@ -237,7 +237,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "The producers' node has to be configured such that it contains the script and the Dockerfile (used to build the Docker image)."
+ "The producers' node has to be configured so that it contains its specific script and Dockerfile (used to build the Docker image)."
]
},
{
@@ -273,7 +273,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "The consumers' node has to be configured such that it contains the script and the Dockerfile (used to build the Docker image)."
+ "The consumers' node has to be configured so that it contains its specific script and Dockerfile (used to build the Docker image)."
]
},
{
@@ -310,9 +310,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "The only purpose of these functions is to facilitate and to make more readable this experiment. Their objectives are :\n",
+ "The only purpose of these functions is to simplify this experiment and to make it more readable. Their objectives are:\n",
"- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s))\n",
- "- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purge the rabbitmq queue)\n",
+ "- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, remove all Docker containers if any, purge the rabbitmq queues)\n",
"- to launch all producer(s) and consumer(s)\n",
"- to reset the experiment by going back to its initial state (clean + launch) "
]
@@ -334,49 +334,28 @@
"## which itself has an `ip` attribute.\n",
"server_ip = ip_address_obj.ip.ip\n",
"\n",
- "def get_recv_msg(file: str) -> int:\n",
- "    \"\"\"\n",
- "    Shows the total number of processed messages.\n",
- "    \"\"\"\n",
- "    results = en.run_command(\n",
- "        f\"wc -l {file}\",\n",
- "        task_name=\"getting total number of received messages\",\n",
- "        roles=roles[\"consumer\"],\n",
- "        gather_facts=False,\n",
- "        on_error_continue=True,\n",
- "    )\n",
- "    totalnbmsg = 0\n",
- "    for r in results:\n",
- "        if r.status == \"FAILED\" or r.rc != 0:\n",
- "            print(f\"Actual number of received message : 0\")\n",
- "            continue\n",
- "        _lines = r.stdout.split(\"\\n\")\n",
- "        _total = _lines[-1].strip().split(\" \") # last line contain the total number of line if multiple files, else\n",
- "        totalnbmsg += int(_total[0])\n",
- "\n",
- "    return totalnbmsg\n",
- "\n",
- "def get_queue_size() -> List:\n",
+ "def _get_queues_info() -> List:  # one [messages, consumers] row per queue\n",
"    results = en.run_command(\n",
"        \"rabbitmqctl list_queues -p '/' messages consumers | \"\n",
"        \"awk 'NR>3 {printf \\\"%-15s %-15s\\\\n\\\", $1, $2}'\",\n",
- "        task_name=\"getting number of messages waiting for processing\",\n",
+ "        task_name=\"Gathering statistics from the queues\",\n",
"        roles=roles[\"server\"],\n",
"        gather_facts=False,\n",
"        on_error_continue=True,\n",
"    )\n",
+ "    queues_info: List = []\n",
"    for r in results:\n",
"        if r.status == \"FAILED\" or r.rc != 0:\n",
- "            print(\"Queue is empty\")\n",
+ "            queues_info.extend([[-1, -1], [-1, -1]])  # one placeholder row per queue so callers can still index both\n",
+ "            continue\n",
"        lines = r.stdout.strip().split(\"\\n\")\n",
- "        line = lines[0].strip().split(\" \")\n",
- "        return [v for v in line if v!= \"\"]\n",
+ "        for line in lines:\n",
+ "            info = line.strip().split(\" \")\n",
+ "            queues_info.append([v for v in info if v != \"\"])\n",
"\n",
- "def get_stats(duration: int) -> pd.DataFrame:\n",
- "    \"\"\"\n",
- "    Retreive general statistics using the rabbitmq management tool.\n",
- "    \"\"\"\n",
+ "    return queues_info\n",
+ "\n",
+ "def get_queues_info_for(duration: int) -> pd.DataFrame:  # one rabbitmqctl poll per iteration\n",
"    results = {}\n",
"    results[\"Time\"] = []\n",
"    results[\"nb_received_messages\"] = []\n",
@@ -384,12 +363,14 @@
"    results[\"nb_consumer\"] = []\n",
"    for _ in range(duration):\n",
"        results[\"Time\"].append(str(datetime.now()))\n",
- "        \n",
- "        results[\"nb_received_messages\"].append(get_recv_msg(\"/tmp/rabbitmq/*_output.txt\"))\n",
- "\n",
- "        queue_depth, nb_consumer = get_queue_size()\n",
+ "\n",
+ "        queues_info = _get_queues_info()\n",
+ "        queue_depth = queues_info[0][0]  # first row: the fault_injection queue\n",
+ "        nb_consumer = queues_info[0][1]\n",
+ "        nb_recv_msg = queues_info[1][0]  # second row: received_messages, i.e. the processed count\n",
"        results[\"queue_depth\"].append(int(queue_depth))\n",
"        results[\"nb_consumer\"].append(int(nb_consumer))\n",
+ "        results[\"nb_received_messages\"].append(int(nb_recv_msg))\n",
"\n",
"\n",
"    df = pd.DataFrame(data=results)\n",
@@ -398,7 +379,7 @@
"\n",
"def clean():\n",
"    \"\"\"\n",
- "    Kill all previouses launched processes, \n",
+ "    Kill all previously launched docker containers, \n",
"    removes all previouses results,\n",
"    purges the queue.\n",
"    \"\"\"\n",
@@ -410,15 +391,16 @@
"    cleaning_registry.kill(signal.SIGKILL)\n",
"\n",
"    en.run_command(\n",
- "        \"rm /tmp/rabbitmq/*_output.txt & docker rm -f $(docker ps -aq)\",\n",
- "        task_name=\"cleaning output files and build containers\",\n",
+ "        \"docker rm -f $(docker ps -aq)\",\n",
+ "        task_name=\"cleaning previously built containers\",\n",
"        roles=roles[\"consumer\"] + roles[\"producer\"],\n",
"        on_error_continue=True,\n",
"        gather_facts=False,\n",
"    )\n",
"\n",
"    en.run_command(\n",
- "        \"rabbitmqctl purge_queue fault_injection\",\n",
+ "        \"rabbitmqctl purge_queue fault_injection & \"\\\n",
+ "        \"rabbitmqctl purge_queue received_messages\",\n",
"        task_name=\"purging the queue\",\n",
"        roles=roles[\"server\"],\n",
"        on_error_continue=True,\n",
@@ -429,7 +411,7 @@
"\n",
"    for idx in range(nconsumer):\n",
"        en.run_command(\n",
- "            f\"docker run -v /tmp/rabbitmq:/tmp/rabbitmq --name tuto_fault_cons_{idx} consumer_image {idx} {server_ip}\"\n",
+ "            f\"docker run --name tuto_fault_cons_{idx} consumer_image {idx} {server_ip}\"\n",
"            f\" {username_cons} {password_cons}\",\n",
"            task_name=f\"run consumer script number {idx}\",\n",
"            roles=roles[\"consumer\"],\n",
@@ -439,7 +421,7 @@
"\n",
"    for idx in range(nproducer):\n",
"        en.run_command(\n",
- "            f\"docker run -v /tmp/rabbitmq:/tmp/rabbitmq --name tuto_fault_prod_{idx} producer_image {idx} {server_ip}\"\n",
+ "            f\"docker run --name tuto_fault_prod_{idx} producer_image {idx} {server_ip}\"\n",
"            f\" {username_prod} {password_prod}\",\n",
"            task_name=f\"run producer script number {idx}\",\n",
"            roles=roles[\"producer\"],\n",
@@ -474,7 +456,7 @@
"source": [
"## General knowledge\n",
"\n",
- "A ```ContainerDockerRegistry``` is a a kind of directory that records all Docker containers that follows a regexp on specific roles. "
+ "A ```ContainerDockerRegistry``` is a kind of directory that records all Docker containers that match a given regexp on specific roles. "
]
},
{
@@ -512,7 +494,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "For each case below, we will act on either the consumers, either the producers, never both. It can be the entire group or only a subset.\n",
+ "For each case below, we will act on either the consumers or the producers, never both. It can be the entire group or only a subset.\n",
"\n",
"When acting on consumers, we will observe an increase of the queue depth, meaning that the messages are not processed as fast as they are produced.\n",
"\n",
@@ -575,9 +557,9 @@
"metadata": {},
"outputs": [],
"source": [
- "results_before_kill = get_stats(5)\n",
+ "results_before_kill = get_queues_info_for(5)\n",
"registry_on_consumers.kill(signal.SIGKILL)\n",
- "results_after_kill = get_stats(5)"
+ "results_after_kill = get_queues_info_for(5)"
]
},
{
@@ -635,9 +617,9 @@
"metadata": {},
"outputs": [],
"source": [
- "results_before_kill = get_stats(5)\n",
+ "results_before_kill = get_queues_info_for(5)\n",
"registry_on_producers.kill(signal.SIGKILL)\n",
- "results_after_kill = get_stats(5)"
+ "results_after_kill = get_queues_info_for(5)"
]
},
{
@@ -719,7 +701,7 @@
"    delta = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(20)\n",
+ "results = get_queues_info_for(25)\n",
"results"
]
},
{
"cell_type": "code",
- "execution_count": 86,
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -765,7 +747,7 @@
"    delta = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(20)\n",
+ "results = get_queues_info_for(25)\n",
"results"
]
},
@@ -830,7 +812,7 @@
"    date = datetime.now() + timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(20)\n",
+ "results = get_queues_info_for(25)\n",
"results"
]
},
@@ -876,7 +858,7 @@
"    date = datetime.now() + timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(20)\n",
+ "results = get_queues_info_for(25)\n",
"results"
]
},
@@ -893,7 +875,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "Here, each kill is scheduled and totally random, we have to specify how many docker containers we want to kill, the beginning (date of the first kill) and the interval between each of them.\n",
+ "Here, each kill is scheduled and the killed docker containers are chosen at random among those registered. We have to specify how many docker containers we want to kill, the beginning (date of the first kill) and the interval between consecutive kills.\n",
"\n",
"The beginning is a ```datetime.datetime``` object.\n",
"\n",
@@ -936,7 +918,7 @@
"    interval = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(50)\n",
+ "results = get_queues_info_for(50)\n",
"results"
]
},
@@ -963,7 +945,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "We can also restart all killed processes without acting on the others. In a way we go back to the registry's initial state. This ```registry.reset()``` has nothing to do with the ```reset()``` specificly implemented for this experiment ! "
+ "We can also restart all killed docker containers without acting on the others. In a way we go back to the registry's initial state. This ```registry.reset()``` has nothing to do with the ```reset()``` specifically implemented for this experiment! "
]
},
{
@@ -994,6 +976,14 @@
"It can be done either synchronously with ```registry.restart()```, either asynchronously in the same way as the kills previously shown. If asynchronous, the date of the kill(s) (or the time delta before they happen) and its interval with the start(s) of the docker container(s) can be specified."
]
},
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Restarting all consumers"
+ ]
+ },
{
"cell_type": "code",
"execution_count": null,
@@ -1029,10 +1019,18 @@
"    interval = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(40)\n",
+ "results = get_queues_info_for(45)\n",
"results"
]
},
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Restarting all producers"
+ ]
+ },
{
"cell_type": "code",
"execution_count": null,
@@ -1068,7 +1066,7 @@
"    interval = timedelta(minutes=1),\n",
")\n",
"# each iteration last for ~4 sec (2 requests + 1sec sleep)\n",
- "results = get_stats(40)\n",
+ "results = get_queues_info_for(45)\n",
"results"
]
},
diff --git a/g5k/consumer.py b/g5k/consumer.py
index 2b3ce3669ac0b645a6b8da8465b0bd65f5ff3deb..6ebffc00ec90441aaf197e7bbb3ceb20e2795827 100644
--- a/g5k/consumer.py
+++ b/g5k/consumer.py
@@ -10,16 +10,9 @@ def callback(ch, method, properties, body):
     body_str = body.decode("utf8").replace("'", '"')
     id_sender, number = body_str.split(";")
-    with open(f"/tmp/rabbitmq/consumer_{idx}_output.txt", "a") as f:  # append mode
-        f.write(
-            " [x] Consumer "
-            + str(idx)
-            + " has received "
-            + str(number)
-            + " from "
-            + str(id_sender)
-            + "\n"
-        )
+    ch.basic_publish(
+        exchange="", routing_key="received_messages", body=str(idx) + ";" + str(number) + ";" + str(id_sender)
+    )
 
     time.sleep(1.0)
 
@@ -45,9 +38,13 @@ if __name__ == "__main__":
     # declaring the queue again (to be sure)
     channel.queue_declare(queue="fault_injection")
 
-    # auto_ack: as soon as collected, a message is considered as acked
     channel.basic_consume(queue="fault_injection", auto_ack=False, on_message_callback=callback)
 
+    # declaring the queue for received messages
+    # no consumer ever reads from it!
+    # its depth gives the total number of messages processed from the first queue
+    channel.queue_declare(queue="received_messages")
+
     # wait for messages
     channel.start_consuming()
 
diff --git a/g5k/producer.py b/g5k/producer.py
index d44ca60cd07bc792999960355e7117313b82e8d7..6567c21c0fb3b8a879cb1f5f503af868d67e729a 100644
--- a/g5k/producer.py
+++ b/g5k/producer.py
@@ -28,10 +28,7 @@
         channel.basic_publish(
             exchange="", routing_key="fault_injection", body=str(idx) + ";" + str(number)
         )
-
-        with open(f"/tmp/rabbitmq/producer_{idx}_output.txt", "a") as f:
-            f.write(" [x] Producer " + str(idx) + " has sent " + str(number) + "\n")
-
+
         time.sleep(1.0)
 
     # gently close (flush)
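
Note on the new counting scheme: the consumers no longer write per-message output files; each processed message is instead mirrored into the auxiliary received_messages queue, so that queue's depth is the number of processed messages. Below is a minimal sketch of how that count can be read from any client without shelling out to rabbitmqctl on the server node. It assumes pika (which the scripts already use) and the experiment's host/credentials; the helper name queue_depths is hypothetical and not part of the patch. A passive queue_declare reports a queue's current depth without creating or modifying it.

    # Sketch only: assumes pika and the experiment's credentials;
    # queue_depths is a hypothetical helper, not part of the patch.
    import pika

    def queue_depths(host: str, user: str, password: str) -> dict:
        """Return the current depth of both experiment queues."""
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=host, credentials=pika.PlainCredentials(user, password)
            )
        )
        channel = connection.channel()
        depths = {}
        for queue in ("fault_injection", "received_messages"):
            # passive=True only inspects the queue; it raises if the queue is missing
            frame = channel.queue_declare(queue=queue, passive=True)
            depths[queue] = frame.method.message_count
        connection.close()
        return depths

Comparing depths["received_messages"] before and after a scheduled kill gives the same signal as get_queues_info_for, which is convenient when checking the experiment from the operator's machine rather than the server node.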