diff --git a/.remi/config.yaml b/.remi/config.yaml deleted file mode 100644 index e0326ea431874362df91e3bdb588b5ba732b6cd6..0000000000000000000000000000000000000000 --- a/.remi/config.yaml +++ /dev/null @@ -1,139 +0,0 @@ -# Name for your project -project_name: rl_hm - - -# Inria username -username: galepage - - -# Name of your Inria workstation -pc_name: alya - - -# Location of the project on the remote computer -project_remote_path: /scratch/alya/galepage/.remi_projects/rl_hm - - -# Bastion used to ssh into Inria resources -bastion: - hostname: bastion.inrialpes.fr - username: galepage - - -# Desktop background jobs -background: - # Which backend to use (`screen` or `tmux`) - backend: screen - - # Whether to keep the session alive after the job has ended. - # It lets you attach to the session to see the program output. - # If 'false', the session will be closed when the job is over and stdout/stderr will be lost. - # CAUTION: If true, you will have to manually re-attach and close the session. - keep_session_alive: false - - -# Virtual environment -virtual_env: - # Enable the virtual environment - enabled: false - - # Which virtual environment backend to use (`conda` or `virtualenv`) - type: virtualenv - - # For `virtualenv` or `conda` virtual environments, you can specify a custom path. - path: venv/ - - # The name of your virtual environment (for `conda` environments) - name: my_conda_env - - # For `conda` environments, path to a `yaml` configuration path - conda_env_file: environment.yaml - - # For `conda` environments, you may specify a python version - python_version: 3.9 - - -# Singularity container options -singularity: - # The name of the 'recipe' file (`.def`) to build the singularity container. - def_file_name: container.def - - # The name of the singularity image. - output_sif_name: container.sif - - # A dictionnary of binds for the singularity container. - # If the value is empty (''), the mount point is the same as the path on the host. - # By default, the project folder is bound within the singularity container: This configuration - # then allows you to add extra locations. - # Example: - # /path_on_host/my_data: /path_in_container/my_data - bindings: - - -# Oarsub options (for more details on `oarsub`, please refer to -# https://oar.imag.fr/docs/latest/user/commands/oarsub.html). -oarsub: - - # Job name - job_name: rl_hm - - # Number of cpus requested. - num_cpus: 1 - - # Number of cpu cores requested. - # If the value is 0, all the cores for the requested cpus will be used. - num_cpu_cores: 0 - - # Number of GPUs requested. - # If the value is 0, no GPU will be requested (CPU only). - num_gpus: 1 - - # The maximum allowed duration for your job. - walltime: '72:00:00' - - # The name of the requested cluster (perception, mistis, thoth...) - cluster_name: perception - - # Optionnaly specify the id of a specific node (gpu3, node2...) - host_id: - - # If the options above are too restricive for your use-case, you may - # directly provide a property list that will be provided to `oarsub` with the - # `-p` flag. - custom_property_query: - - # Whether to schedule the job in the besteffort queue. - besteffort: true - - # Whether to set the job as idempotent (see oarsub documentation for more details). - idempotent: false - - -# Remote servers -# Remote servers are applications that run on a remote computer and can be accessed from your local -# browser thanks to remi. 
-# Two such servers are supported right now: -# - Jupyter notebook -# - TensorBoard -remote_servers: - # The command to run for opening the local browser (`<browser_cmd> <url>`) - browser_cmd: firefox - - # Jupyter notebook - jupyter: - # The port (local and remote) for the server - port: 8080 - - # If true, automatically open the jupyter notebook in the local browser. - open_browser: true - - # TensorBoard - tensorboard: - # The port (local and remote) for TensorBoard - port: 9090 - - # Directory from where to run tensorboard. - logdir: 'output/' - - # If true, automatically open TensorBoard in the local browser. - open_browser: true diff --git a/.remi/exclude.txt b/.remi/exclude.txt deleted file mode 100644 index 0c7f755ebd394b8aad6456c7833ebe97a1308cc1..0000000000000000000000000000000000000000 --- a/.remi/exclude.txt +++ /dev/null @@ -1,17 +0,0 @@ -.remi -output/ -notebooks/ -.git -__pycache__ -.ipynb_checkpoints -logs -.envrc -.DS_Store -.*.swp -*.egg-info/ -**/__pycache__/.idea -.mypy_cache/ -venv/ -*.sif -build-temp-* -venv/ diff --git a/README.md b/README.md index f1ecb99d1db8c1d022019d580824c5b6eb90c08a..d8d800df096c1cd4e0991ef3b7f0e1a6cad1d517 100644 --- a/README.md +++ b/README.md @@ -6,23 +6,21 @@ Link: [robotlearn.gitlabpages.inria.fr/cluster-monitor](https://robotlearn.gitla ## Implementation overview -The cluster monitor counts three entities: -- The **data fetcher** is running directly on an inria workstation (currently `alya`). It is - performing `ssh` commands to other nodes (especially `access1-cp`) to gather the cluster state.\ - **Code:** `rl_hm/data_fetcher/` -- The **backend server** is running on a Linux server external to Inria and is receiving the **data - fetcher** updates through a TCP socket connection running over an SSH tunnel. - It exposes a [Socket.IO](https://socket.io/) server to the web clients that connect to it. - As soon as it receives an update from the **data fetcher**, it pushes it to all of the connected - clients through the _socket-io_ connection.\ - **Code:** `rl_hm/backend/` -- Finally, the web clients run a javascript application that connects to the _socket-io_ server - (**backend**) and updates the html page with the reveived data.\ +The cluster monitor consists of two components: +- The **backend** server runs on an Inria machine (`perception.inrialpes.fr`). + It runs `ssh` commands on other nodes (especially `access1-cp`) to gather the cluster + state.\ + It also exposes a [Socket.IO](https://socket.io/) server to the web clients that connect to it. + It pushes the cluster information to all connected clients through the _socket-io_ + connection.\ + **Code:** `backend/` +- The **frontend** is a JavaScript application that connects to the _socket-io_ server + (**backend**) and updates the HTML page with the received data.\ **Code:** `public/` -## Acknowlegment +## Acknowledgment - **David Emukpere:** for his extensive advice and help with web development and infrastructure. -- [**Tanguy Lepage:**](https://tanguylepage.com/) for the front-end HTML/CSS design. +- [**Tanguy Lepage:**](https://tanguylepage.com/) for the frontend HTML/CSS design. - [**Anand Ballou:**](https://team.inria.fr/robotlearn/team-members/anand-ballou/) for his advice and his help on the cluster data fetching.
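The README hunk above summarises the data flow: the backend emits an `update` event over Socket.IO and the frontend renders it. As a minimal sketch of a consumer of that feed, assuming the `python-socketio` package already listed in `requirements.txt`, the `update` event name and port 8888 taken from `backend/main.py`, and an illustrative hostname:

```python
import socketio

# Plain Socket.IO client; mirrors what the JavaScript frontend does.
sio = socketio.Client()


@sio.on('update')
def on_update(data: dict) -> None:
    # `data` is the cluster-state dict built by backend/cluster.py.
    print(f"received a cluster update covering {len(data)} entry(ies)")


# The hostname is an assumption (the README says the backend runs on
# perception.inrialpes.fr); 8888 is SOCKET_IO_PORT in backend/main.py.
sio.connect('http://perception.inrialpes.fr:8888')
sio.wait()  # block and keep receiving updates
```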
diff --git a/rl_hm/__init__.py b/backend/__init__.py similarity index 100% rename from rl_hm/__init__.py rename to backend/__init__.py diff --git a/rl_hm/_logging.py b/backend/_logging.py similarity index 100% rename from rl_hm/_logging.py rename to backend/_logging.py diff --git a/rl_hm/data_fetcher/cluster.py b/backend/cluster.py similarity index 88% rename from rl_hm/data_fetcher/cluster.py rename to backend/cluster.py index bed1503a2aa7704728e603532dc454f2279c3b8e..f265565909e8e30f1fd30c75c5b81acd4c9c604d 100644 --- a/rl_hm/data_fetcher/cluster.py +++ b/backend/cluster.py @@ -31,7 +31,7 @@ USERNAMES: list[str] = [ 'dmeng', 'wguo', 'xbie', - 'lgomezca', + 'adupless', 'bbasavas', ] @@ -58,11 +58,13 @@ def _run_remote_command_and_fetch_dict(update_cmd: list[str]) -> dict: update_cmd = ['ssh', 'access1-cp'] + update_cmd try: - cmd_output: subprocess.CompletedProcess = subprocess.run(args=update_cmd, - check=False, - capture_output=True, - text=True, - timeout=TIMEOUT) + cmd_output: subprocess.CompletedProcess = subprocess.run( + args=update_cmd, + check=False, + capture_output=True, + text=True, + timeout=TIMEOUT + ) return_code: int = cmd_output.returncode stdout: str = cmd_output.stdout @@ -128,9 +130,10 @@ def _fetch_and_parse_oarnodes_dict() -> tuple[dict, list[str]]: if gpu_device not in job['gpu_ids']: job['gpu_ids'].append(gpu_device) else: - node_dict['running_jobs'][job_id] = {'nb_cores': 1, - 'gpu_ids': [gpu_device] - } + node_dict['running_jobs'][job_id] = { + 'nb_cores': 1, + 'gpu_ids': [gpu_device] + } jobs_list.append(job_id) @@ -169,9 +172,11 @@ def _fetch_and_parse_oarstat_dict(jobs_list: list[str]) -> dict: start_time: datetime = datetime.fromtimestamp(job_dict['startTime']) hours, mins, secs = _extract_walltime(job_dict['message']).split(':') - walltime: timedelta = timedelta(hours=int(hours), - minutes=int(mins), - seconds=int(secs)) + walltime: timedelta = timedelta( + hours=int(hours), + minutes=int(mins), + seconds=int(secs) + ) max_time: datetime = start_time + walltime @@ -225,7 +230,9 @@ def update() -> dict: jobs_dict: dict = _fetch_and_parse_oarstat_dict(jobs_list=jobs_list) - cluster_dict: dict = _merge(nodes_dict=nodes_dict, - jobs_dict=jobs_dict) + cluster_dict: dict = _merge( + nodes_dict=nodes_dict, + jobs_dict=jobs_dict + ) return cluster_dict diff --git a/backend/main.py b/backend/main.py new file mode 100755 index 0000000000000000000000000000000000000000..a0fffeb32211eabf2f5d1cca79ec8a7c9e41fed3 --- /dev/null +++ b/backend/main.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 + +from logging import Logger, getLogger +import time +from datetime import datetime + +from backend._logging import init_logger +from backend import cluster + +import socketio # type: ignore +import eventlet # type: ignore +eventlet.monkey_patch() + +# Init logger +init_logger() +LOGGER: Logger = getLogger('rl_hm.web_app') + +REFRESH_TIME: int = 1 +DATETIME_FORMAT: str = '%m/%d/%Y-%H:%M:%S' + +SOCKET_IO_PORT: int = 8888 +# 16_384 +MAX_MESSAGE_LENGTH: int = 2 ** 14 + +socket_io: socketio.Server = socketio.Server( + async_mode='eventlet', + cors_allowed_origins='*' +) + +num_connected_clients: int = 0 + + +def _update_loop(socket_io: socketio.Server) -> None: + + # Tracking + init_time: datetime = datetime.now() + step_counter: int = 1 + + while True: + + LOGGER.info("%i clients are currently connected", num_connected_clients) + + if num_connected_clients > 0: + + LOGGER.info("--> updating") + LOGGER.info("step n°%i", step_counter) + LOGGER.info("started: %s", 
init_time.strftime(DATETIME_FORMAT)) + + # If the update fails, just ignore it and try again next time. + try: + payload_dict: dict = cluster.update() + # payload_dict: dict = {} + + socket_io.emit( + event='update', + data=payload_dict + ) + + except Exception as exception: + LOGGER.error("Exception:", exc_info=exception) + + # Optionally wait before updating again + if REFRESH_TIME > 0: + LOGGER.info("waiting %is before updating again", REFRESH_TIME) + time.sleep(REFRESH_TIME) + + step_counter += 1 + + +@socket_io.on('connect') +def callback_connect(*args) -> None: + global num_connected_clients + num_connected_clients += 1 + + LOGGER.info("A client is connected (total: %i)", num_connected_clients) + + if num_connected_clients == 1: + LOGGER.info("This is the first client: starting to fetch cluster updates") + + +@socket_io.on('disconnect') +def callback_disconnect(*args) -> None: + global num_connected_clients + num_connected_clients = max(0, num_connected_clients - 1) + + LOGGER.info("A client has disconnected (%i remaining)", num_connected_clients) + + if num_connected_clients == 0: + LOGGER.info("This was the last client: interrupting cluster updates fetching") + + +def main() -> None: + + # Run the background loop that periodically fetches the cluster state. + socket_io.start_background_task( + target=_update_loop, + socket_io=socket_io + ) + + LOGGER.info("Starting WSGIApp") + app: socketio.WSGIApp = socketio.WSGIApp(socket_io) + eventlet.wsgi.server( + eventlet.listen( + ('0.0.0.0', SOCKET_IO_PORT) + ), + app + ) + + +if __name__ == '__main__': + main() diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000000000000000000000000000000000000..090a1c0c60f0ec94c5c0bdbbc95db03251ff0288 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +rsync -rav \ + --delete \ + --exclude "*.mypy_cache/" \ + --exclude "venv/" \ + --exclude ".git/" \ + --exclude "*/__pycache__/" \ + . alya:~/rl_hm_2/ + # . server:~/rl_hm_2/
diff --git a/requirements.txt b/requirements.txt index f08cca1411952dda51821e14d0501138b29aa8f2..033ebd0cf8f550a15684c6655b4e01cfe6f3c326 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ +eventlet +python-socketio termcolor +numpy diff --git a/resources.md b/resources.md new file mode 100644 index 0000000000000000000000000000000000000000..ec23728a55c84a4133ec0903ba83d895f39cc289 --- /dev/null +++ b/resources.md @@ -0,0 +1,25 @@ +# Resources + +## Deployment + +- [GitLab Pages](https://docs.gitlab.com/ee/user/project/pages/) + + +## Flask and websockets + +- [Doc Flask](https://flask.palletsprojects.com/en/2.0.x/) +- [Doc Flask-SocketIO](https://flask-socketio.readthedocs.io/en/latest/index.html) +- [WebSockets in Python](https://www.fullstackpython.com/websockets.html) +- [Building apps using Flask-SocketIO and JavaScript Socket.IO](https://medium.com/@abhishekchaudhary_28536/building-apps-using-flask-socketio-and-javascript-socket-io-part-1-ae448768643) +- [Implementation of WebSocket using Flask Socket IO in Python](https://www.includehelp.com/python/implementation-of-websocket-using-flask-socket-io-in-python.aspx) +- [Implement a WebSocket Using Flask and Socket-IO(Python)](https://medium.com/swlh/implement-a-websocket-using-flask-and-socket-io-python-76afa5bbeae1) +- [Easy WebSockets with Flask and Gevent](https://blog.miguelgrinberg.com/post/easy-websockets-with-flask-and-gevent) +- [Flask-SocketIO, Background Threads, jQuery, Python Demo](https://timmyreilly.azurewebsites.net/flask-socketio-and-more/) +- [Flask-socketio: Emitting from background thread to the second room blocks the first room](https://bleepcoder.com/flask-socketio/383418577/emitting-from-background-thread-to-the-second-room-blocks) + + +## General Web development + + - [JavaScript basics](https://developer.mozilla.org/en-US/docs/Learn/Getting_started_with_the_web/JavaScript_basics) + - [OpenClassrooms HTML/CSS](https://openclassrooms.com/fr/courses/1603881-apprenez-a-creer-votre-site-web-avec-html5-et-css3/1604361-creez-votre-premiere-page-web-en-html) + - [OpenClassrooms JS](https://openclassrooms.com/fr/courses/5664271-learn-programming-with-javascript) diff --git a/rl_hm/backend/__init__.py b/rl_hm/backend/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/rl_hm/backend/main.py b/rl_hm/backend/main.py deleted file mode 100644 index a6da8b1067f1434e87cb3023f3108db9f593ca9e..0000000000000000000000000000000000000000 --- a/rl_hm/backend/main.py +++ /dev/null @@ -1,95 +0,0 @@ -import socket -import json -from logging import Logger, getLogger - -from rl_hm._logging import init_logger - -import socketio # type: ignore -import eventlet # type: ignore -eventlet.monkey_patch() - -# Init logger -init_logger() -LOGGER: Logger = getLogger('rl_hm.web_app') - -DATA_FETCHER_PORT: int = 9999 -SOCKET_IO_PORT: int = 8888 -# 16_384 -MAX_MESSAGE_LENGTH: int = 2 ** 14 - -socket_io: socketio.Server = socketio.Server(async_mode='eventlet', - cors_allowed_origins='*') - -num_connected_clients: int = 0 - -data_fetcher_socket: socket.socket = socket.socket(family=socket.AF_INET, - type=socket.SOCK_STREAM) - - -def _update_loop(socket_io: socketio.Server, - data_fetcher_socket: socket.socket) -> None: - - LOGGER.info("connecting to data fetcher socket on port %i", DATA_FETCHER_PORT) - data_fetcher_socket.connect(('localhost', DATA_FETCHER_PORT)) - LOGGER.info("succesfully connected to data fetcher") - - while True: -
received_message_bytes: bytes = data_fetcher_socket.recv(MAX_MESSAGE_LENGTH) - decoded_string: str = received_message_bytes.decode() - - try: - payload_dict = json.loads(decoded_string) - except json.decoder.JSONDecodeError as json_exception: - # if the received data is corrupted, just ignore this packet - LOGGER.error("JSONDecodeError while parsing received payload: %s", str(json_exception)) - LOGGER.error("received payload: %s", decoded_string) - - continue - - source: str = payload_dict.pop('source') - LOGGER.info("Received update for '%s'", source) - - # TODO use Socket.IO rooms to emit update - socket_io.emit(event='update', - data=payload_dict) - - -@socket_io.on('connect') -def callback_connect(*args) -> None: - global num_connected_clients - num_connected_clients += 1 - - LOGGER.info("A client is connected (total: %i)", num_connected_clients) - - if num_connected_clients == 1: - LOGGER.info("This is the first client: starting to fetch cluster updates") - data_fetcher_socket.sendall('cluster_start'.encode()) - - -@socket_io.on('disconnect') -def callback_disconnect(*args) -> None: - global num_connected_clients - num_connected_clients = max(0, num_connected_clients - 1) - - LOGGER.info("A client has disconnected (%i remaining)", num_connected_clients) - - if num_connected_clients == 0: - LOGGER.info("This was the last client: interrupting cluster updates fetching") - data_fetcher_socket.sendall('cluster_stop'.encode()) - - -def main() -> None: - - # Run the update loop that gets updates from the data fetcher. - socket_io.start_background_task(target=_update_loop, - socket_io=socket_io, - data_fetcher_socket=data_fetcher_socket) - - LOGGER.info("Starting WSGIApp") - app: socketio.ASGIApp = socketio.WSGIApp(socket_io) - eventlet.wsgi.server(eventlet.listen(('0.0.0.0', SOCKET_IO_PORT)), - app) - - -if __name__ == '__main__': - main() diff --git a/rl_hm/backend/requirements.txt b/rl_hm/backend/requirements.txt deleted file mode 100644 index bf40bd19859353db2325ff1e5c9974994505c2c2..0000000000000000000000000000000000000000 --- a/rl_hm/backend/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -eventlet -python-socketio -termcolor diff --git a/rl_hm/data_fetcher/__init__.py b/rl_hm/data_fetcher/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/rl_hm/data_fetcher/main.py b/rl_hm/data_fetcher/main.py deleted file mode 100755 index bf17773962ad693c6cf6b196591a7eddeb69297b..0000000000000000000000000000000000000000 --- a/rl_hm/data_fetcher/main.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 - -from multiprocessing import Process, Queue -import time -from datetime import datetime -import logging -import socket -import json -from typing import Callable - -from rl_hm._logging import init_logger -from rl_hm.data_fetcher import workstations, cluster - -init_logger() - -WS_REFRESH_TIME: int = 5 -CLUSTER_REFRESH_TIME: int = 0 - -DATETIME_FORMAT: str = '%m/%d/%Y-%H:%M:%S' -HOST: str = 'localhost' -PORT: int = 9999 - - -def _update_loop(update_fn: Callable, - queue: Queue, - socket_connection: socket.socket, - task_name: str, - refresh_time: int = 0) -> None: - - # Wait starting instruction before running the loop - while queue.get() != 'start': - pass - - # Tracking - init_time: datetime = datetime.now() - step_counter: int = 1 - - logger: logging.Logger = logging.getLogger('rl_hm.data_fetcher - ' + task_name) - - while True: - - # If no client is connected, stop the loop - if not queue.empty(): - message: 
str = queue.get() - if message == 'stop': - - # Wait for the 'start' instruction to resume - while queue.get() != 'start': - pass - - logger.info("--> updating") - logger.info("step n°%i", step_counter) - logger.info("started: %s", init_time.strftime(DATETIME_FORMAT)) - - # If the update fails, just ignore it and try another time. - try: - update_dict: dict = update_fn() - - update_dict['source'] = task_name - - data: str = json.dumps(update_dict) - logger.debug('len(data) = %i', len(data)) - data_bytes: bytes = data.encode() - logger.debug('len(data_bytes) = %i', len(data_bytes)) - - socket_connection.sendall(data_bytes) - - except Exception as exception: - logger.error("Exception:", exc_info=exception) - continue - - # Eventually wait before updating again - if refresh_time > 0: - logger.info("waiting %is before updating again", refresh_time) - time.sleep(refresh_time) - - step_counter += 1 - - -def main() -> None: - - logger: logging.Logger = logging.getLogger('rl_hm.data_fetcher') - - # Create a socket (SOCK_STREAM means a TCP socket) - sock: socket.socket = socket.socket(family=socket.AF_INET, - type=socket.SOCK_STREAM) - logger.info("Binding TCP socket on port %i", PORT) - not_bound: bool = True - while not_bound: - try: - sock.bind((HOST, PORT)) - not_bound = False - except OSError: - # Wait before retrying - logger.warning("Port seems to be busy. Retrying...") - time.sleep(1) - - # Listen for client - logger.info("Listening for connections") - sock.listen() - - socket_connection: socket.socket - addr: tuple[str, int] - logger.info("Waiting for the web app to connect to socket...") - socket_connection, addr = sock.accept() - logger.info("Got connection. Addr: %s, Port: %i", addr[0], addr[1]) - - ws_queue: Queue = Queue() - cluster_queue: Queue = Queue() - - with socket_connection: - - # Create two processes: workstations and cluster - ws_process: Process = Process(target=_update_loop, - args=(workstations.update, - ws_queue, - socket_connection, - 'workstations', - WS_REFRESH_TIME)) - cluster_process: Process = Process(target=_update_loop, - args=(cluster.update, - cluster_queue, - socket_connection, - 'cluster', - CLUSTER_REFRESH_TIME)) - - ws_process.start() - cluster_process.start() - - while True: - received_message_bytes: bytes = socket_connection.recv(10000) - decoded_string: str = received_message_bytes.decode() - - if decoded_string == 'cluster_start': - logger.info('Starting cluster updates') - cluster_queue.put('start') - - elif decoded_string == 'cluster_stop': - logger.info('Pausing cluster updates') - cluster_queue.put('stop') - - elif decoded_string == 'ws_start': - logger.info('Starting workstations updates') - ws_queue.put('start') - - elif decoded_string == 'ws_stop': - logger.info('Pausing workstations updates') - ws_queue.put('stop') - - else: - logger.warning("Unhandled message: %s", decoded_string) - - -if __name__ == '__main__': - main() diff --git a/rl_hm/data_fetcher/parsing.py b/rl_hm/data_fetcher/parsing.py deleted file mode 100644 index 49867fa23a8bb4811c45043833e09b6edbb7fa12..0000000000000000000000000000000000000000 --- a/rl_hm/data_fetcher/parsing.py +++ /dev/null @@ -1,106 +0,0 @@ -""" -TODO -""" -# type: ignore - -import json -import xml.etree.ElementTree as ET # noqa: N817 -from typing import Any - - -def parse_cpu_output(lscpu_output: str, - mpstat_output: str) -> dict[str, Any]: - cpu_dict: dict[str, Any] = {} - - lscpu_fields_list: list[dict] = json.loads(lscpu_output)['lscpu'] - - for item in lscpu_fields_list: - if item['field'] == 'CPU(s):': - 
cpu_dict['num_threads'] = int(item['data']) - elif item['field'] == 'Core(s) per socket:': - cores_per_socket: int = int(item['data']) - elif item['field'] == 'Socket(s):': - cpu_dict['num_sockets'] = int(item['data']) - elif item['field'] == 'Model name:': - cpu_dict['model_name'] = item['data'] - elif item['field'] == 'CPU MHz:': - cpu_dict['current_freq'] = float(item['data']) - elif item['field'] == 'CPU max MHz:': - cpu_dict['min_freq'] = float(item['data']) - elif item['field'] == 'CPU min MHz:': - cpu_dict['max_freq'] = float(item['data']) - - cpu_dict['num_cores'] = cpu_dict['num_sockets'] * cores_per_socket - - # print(cpu_dict) - mpstat_list: list[dict] = \ - json.loads(mpstat_output)['sysstat']['hosts'][0]['statistics'][0]['cpu-load'] - - cpu_dict['cores'] = {} - for cpu_stat in mpstat_list: - usage: float = round(cpu_stat['usr'] + cpu_stat['sys'], 2) - if cpu_stat['cpu'] == 'all': - cpu_dict['global_usage'] = usage - else: - cpu_dict['cores'][cpu_stat['cpu']] = usage - - return cpu_dict - - -def parse_free_output(free_output: str) -> dict[str, Any]: - ram_dict: dict[str, Any] = {} - - # print(free_output) - output_lines: list[str] = free_output.splitlines() - - mem_list: list[str] = output_lines[1].split() - ram_dict['memory'] = { - 'total': mem_list[1], - 'used': mem_list[2], - 'free': mem_list[3] - } - - swap_list: list[str] = output_lines[2].split() - ram_dict['swap'] = { - 'total': swap_list[1], - 'used': swap_list[2], - 'free': swap_list[3] - } - - return ram_dict - - -def parse_nvidia_output(nvidia_output: str) -> dict[str, Any]: - gpu_dict: dict = {} - - # print(nvidia_output) - tree_root: ET.Element = ET.fromstring(nvidia_output) - - gpu_dict['driver_version'] = tree_root.find('driver_version').text # type: ignore - gpu_dict['cuda_version'] = tree_root.find('cuda_version').text # type: ignore - gpu_dict['num_gpus'] = int(tree_root.find('attached_gpus').text) # type: ignore - gpu_dict['gpus'] = {} - - for gpu_id, gpu in enumerate(tree_root.findall('gpu')): - - utilization_node: ET.Element = gpu.find('utilization') # type: ignore - memory_node: ET.Element = gpu.find('fb_memory_usage') # type: ignore - - temperature_node: ET.Element = gpu.find('temperature') # type: ignore - - gpu_dict['gpus'][gpu_id] = { - 'model_name': gpu.find('product_name').text, # type: ignore - 'memory': { - 'total': memory_node.find('total').text, # type: ignore - 'used': memory_node.find('used').text, # type: ignore - 'free': memory_node.find('free').text, # type: ignore - 'util': utilization_node.find('memory_util').text.replace(' %', '') # type: ignore - }, - 'fan': gpu.find('fan_speed').text, # type: ignore - 'usage': utilization_node.find('gpu_util').text.replace(' %', ''), # type: ignore - 'temp': int(temperature_node.find('gpu_temp').text.replace(' C', '')) # type: ignore - } - - assert len(gpu_dict['gpus']) == gpu_dict['num_gpus'] - - return gpu_dict diff --git a/rl_hm/data_fetcher/requirements.txt b/rl_hm/data_fetcher/requirements.txt deleted file mode 100644 index db5d81e01ea66c581a5c41c45df1bfe61111e41e..0000000000000000000000000000000000000000 --- a/rl_hm/data_fetcher/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -matplotlib -numpy diff --git a/rl_hm/data_fetcher/workstations.py b/rl_hm/data_fetcher/workstations.py deleted file mode 100644 index 85bb0995edf274bf7353963f49e3ab7be0793de0..0000000000000000000000000000000000000000 --- a/rl_hm/data_fetcher/workstations.py +++ /dev/null @@ -1,150 +0,0 @@ -import csv -import subprocess -from datetime import datetime -from multiprocessing 
import Pool -from collections import namedtuple -from socket import gethostname -import logging - -from . import parsing - -USERNAME: str = 'aballou' -TIMEOUT: int = 20 - - -COMMANDS: list[str] = [ - 'lscpu -J', - 'mpstat -o JSON -P ALL', - 'free -h', - 'nvidia-smi -q -x' -] - -Workstation: namedtuple = namedtuple('Workstation', - ['hostname', 'office', 'user']) - - -def _register_workstations() -> list[Workstation]: - - ws_list: list[Workstation] = [] - - with open('workstations.csv', newline='') as csv_workstation_list: - csv_reader = csv.reader(csv_workstation_list, delimiter=',', quotechar='|') - - for row in csv_reader: - - hostname, office, user = row - - ws_list.append(Workstation(hostname=hostname, - office=office, - user=user)) - - return ws_list - - -def _get_update_cmd(hostname: str) -> list[str]: - - update_cmd: list[str] = [] - - # No need to ssh on the current workstation. - if hostname != gethostname(): - update_cmd = ['ssh', hostname, - 'export', 'PATH=$HOME/.local/bin:$PATH;'] - - else: - update_cmd = ['bash', '-c'] - - update_cmd.append('') - for cmd in COMMANDS: - update_cmd[-1] += f"echo '>>>{cmd.split()[0]}' && {cmd} && " - - # Remove the unecessary trailing ' && ' - update_cmd[-1] = update_cmd[-1][:-4] - - return update_cmd - - -def _fetch_state(workstation: Workstation) -> dict: - - state_dict: dict = { - 'office': workstation.office, - 'user': workstation.user, - } - - timestamp = datetime.fromtimestamp(0) - - logger: logging.Logger = logging.getLogger(__name__ + f".{workstation.hostname}") - - # Run the command - try: - cmd_output: subprocess.CompletedProcess = subprocess.run( - args=_get_update_cmd(hostname=workstation.hostname), - check=False, - capture_output=True, - text=True, - timeout=TIMEOUT) - - return_code: int = cmd_output.returncode - stdout: str = cmd_output.stdout - # stderr: str = cmd_output.stderr - - if return_code > 0: - logger.error("Update command failed with return code %i", return_code) - # print("\nstdout:\n", stdout) - # print("\nstderr:\n", stderr) - - elif return_code == 0: - logger.info("Update was succesfull") - - # Split the output - output_list: list[str] = stdout.splitlines() - output_dict: dict[str, str] = {} - command: str = '' - for line in output_list: - if line.startswith('>>>'): - command = line[3:] - output_dict[command] = '' - else: - output_dict[command] += line + '\n' - - # Parse each section's output - state_dict['cpu'] = parsing.parse_cpu_output(lscpu_output=output_dict['lscpu'], - mpstat_output=output_dict['mpstat']) - state_dict['ram'] = parsing.parse_free_output(free_output=output_dict['free']) - - # TODO this code is unreachable (we are in the case where return_code=0) - # `nvidia-smi` can eventually fail. - # This should not prevent other informations to being parsed. 
- if return_code in [9, 255]: - state_dict['gpu'] = { - 'error_code': return_code, - 'stderr': output_dict['nvidia-smi'] - } - else: - state_dict['gpu'] = \ - parsing.parse_nvidia_output(nvidia_output=output_dict['nvidia-smi']) - - timestamp = datetime.now() - - except subprocess.TimeoutExpired: - logger.warning("Update command for has timed out") - - state_dict['last_updated'] = timestamp.strftime('%m/%d/%Y-%H:%M:%S') - - state_dict = {workstation.hostname: state_dict} - - return state_dict - - -def update() -> dict: - - ws_list: list[Workstation] = _register_workstations() - - with Pool(len(ws_list)) as pool: - result_list: list[dict] = pool.map(_fetch_state, ws_list) - - state_dict: dict = {} - - for result in result_list: - state_dict.update(result) - - return state_dict diff --git a/start.sh b/start.sh new file mode 100755 index 0000000000000000000000000000000000000000..4d455d80d78f672652c14dae64ce2afc7c29334d --- /dev/null +++ b/start.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +export PYTHONPATH=. + +python3 backend/main.py diff --git a/start_backend.sh b/start_backend.sh deleted file mode 100755 index c8b3489c351fff8a045d7348d642df681bbaa095..0000000000000000000000000000000000000000 --- a/start_backend.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/sh - -tmux -python rl_hm/web_app/app.py & - -python -m http.server 8000 --directory rl_hm/web_app/static & - - -trap 'echo signal received!; kill $(jobs -p); wait;' SIGINT SIGTERM - -wait diff --git a/todo.md b/todo.md deleted file mode 100644 index 809bf26fc80c5e1e3ef7b03bf7337293a4d0e267..0000000000000000000000000000000000000000 --- a/todo.md +++ /dev/null @@ -1,33 +0,0 @@ -# TODO - - -- [x] Make cluster and workstations updates mutually asynchronous. - - [x] Have two independent concurrent loops for updates - - [x] Send two distinct packages - -## Web app - -- [ ] CAS Inria --> Actually, [GitLab pages](https://docs.gitlab.com/ee/user/project/pages/) - should be perfect to host the frontend. 
-- [ ] Display the packet timestamp in the web app -- [ ] Webapp startup script\ - Tunnel command: `ssh -N -L 9999:localhost:9999 alya` -- [ ] Favicon - -### Ressources -- Deployment - - [GitLab Pages](https://docs.gitlab.com/ee/user/project/pages/) -- Flask and websockets - - [Doc Flask](https://flask.palletsprojects.com/en/2.0.x/) - - [Doc Flask-SocketIO](https://flask-socketio.readthedocs.io/en/latest/index.html) - - [WebSockets in Python](https://www.fullstackpython.com/websockets.html) - - [Building apps using Flask-SocketIO and JavaScript Socket.IO](https://medium.com/@abhishekchaudhary_28536/building-apps-using-flask-socketio-and-javascript-socket-io-part-1-ae448768643) - - [Implementation of WebSocket using Flask Socket IO in Python](https://www.includehelp.com/python/implementation-of-websocket-using-flask-socket-io-in-python.aspx) - - [Implement a WebSocket Using Flask and Socket-IO(Python)](https://medium.com/swlh/implement-a-websocket-using-flask-and-socket-io-python-76afa5bbeae1) - - [Easy WebSockets with Flask and Gevent](https://blog.miguelgrinberg.com/post/easy-websockets-with-flask-and-gevent) - - [Flask-SocketIO, Background Threads , Jquery, Python Demo](https://timmyreilly.azurewebsites.net/flask-socketio-and-more/) - - [Flask-socketio: Emitting from background thread to the second room blocks the first room](https://bleepcoder.com/flask-socketio/383418577/emitting-from-background-thread-to-the-second-room-blocks) -- General Web development - - [JavaScript basics](https://developer.mozilla.org/en-US/docs/Learn/Getting_started_with_the_web/JavaScript_basics) - - [OpenClassrooms HTML/CSS](https://openclassrooms.com/fr/courses/5664271-learn-programming-with-javascript) - - [OpenClassrooms JS](https://openclassrooms.com/fr/courses/1603881-apprenez-a-creer-votre-site-web-avec-html5-et-css3/1604361-creez-votre-premiere-page-web-en-html) diff --git a/workstations.csv b/workstations.csv deleted file mode 100644 index aa6fc8017c70e557c34e473b20ea647dbfd370a1..0000000000000000000000000000000000000000 --- a/workstations.csv +++ /dev/null @@ -1,15 +0,0 @@ -auriga,none,none -alya,H213,galepage -bacchus,H213,xilin -mensa,H213,demukper -andromeda,H214,aballou -scorpio,H214,yixu -virgo,H214,xbie -bootes,H215,zkang -chamaeleon,H215,lgomezca -hydra,H216,creinke -ursa,H216,aauterna -kapelos,H217,nduboisq -octans,H217,wguo -pictor,H217,lairale -carina,H220,twintz
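For context on the `backend/cluster.py` hunk: every piece of cluster state is gathered through one pattern, running an OAR command on `access1-cp` over `ssh` and parsing its JSON output (see `_run_remote_command_and_fetch_dict`). A minimal self-contained sketch of that pattern, with a hypothetical `fetch_oar_json` helper and a timeout assumed to match the module's `TIMEOUT` constant:

```python
import json
import subprocess

TIMEOUT: int = 20  # assumption: mirrors the TIMEOUT constant in backend/cluster.py


def fetch_oar_json(command: list[str]) -> dict:
    # Run the command on the OAR frontend over ssh and decode its JSON output.
    result = subprocess.run(
        args=['ssh', 'access1-cp'] + command,
        check=True,  # raise CalledProcessError instead of inspecting returncode
        capture_output=True,
        text=True,
        timeout=TIMEOUT,
    )
    return json.loads(result.stdout)


# e.g. fetch_oar_json(['oarstat', '-J']) -- `-J` is oarstat's JSON output flag
```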