diff --git a/e2clab/app.py b/e2clab/app.py index 78c995c8bf6a231392b593f823302c14f4903cbf..79c3c302ef308e6e8199bb984e1b6680d49de3d2 100644 --- a/e2clab/app.py +++ b/e2clab/app.py @@ -26,6 +26,7 @@ from e2clab.constants.workflow import ( SELF_PREFIX, SERV_SELECT, TARGET, + TASK_FINALIZE, ) from e2clab.errors import E2clabFileError from e2clab.grouping import get_grouping @@ -148,7 +149,7 @@ class App: self.logger.debug(f"[TASK CONFIG] {filtered_config}") # Iterate on each hosts defnition - # TODO: check performance impact + # TODO: check performance impact and maybe run a single ansible play for host_task_conf in filtered_config: self.logger.debug(f"[HOST TASK CONF] {host_task_conf}") @@ -172,7 +173,7 @@ class App: Returns: working_dir (Path): Path to the current working directory """ - if task != "finalize": + if task != TASK_FINALIZE: working_dir = self.artifacts_dir else: working_dir = self.app_dir diff --git a/e2clab/cli.py b/e2clab/cli.py index 3b72aae3847004160a6d128b2a64f4b080f90fd4..1c7bbaea9f41151edade6f8ba85e8ba3d01322cb 100644 --- a/e2clab/cli.py +++ b/e2clab/cli.py @@ -98,7 +98,7 @@ def cli(debug: bool, mute_enoslib: bool, mute_ansible: bool): level=logging.DEBUG if debug else logging.INFO, enable_enoslib=not mute_enoslib, mute_ansible=mute_ansible, - file_handler=False, + file_handler=False, # File handler set later in exp markup=True, ) logger.debug(f"Loaded information from '{ENV_FILE}': {loaded_dotenv}") diff --git a/e2clab/constants/layers_services.py b/e2clab/constants/layers_services.py index 7db57357b964c8e1ce71ea661a6315b012852f66..11899ec899369013e6c9c47fd19d398687acf385 100644 --- a/e2clab/constants/layers_services.py +++ b/e2clab/constants/layers_services.py @@ -34,6 +34,7 @@ WALLTIME = "walltime" RESERVATION = "reservation" ENV_NAME = "env_name" QUANTITY = "quantity" +MONITOR = "monitor" SERVICES = "services" NAME = "name" IMAGE = "image" diff --git a/e2clab/errors.py b/e2clab/errors.py index 
def run_task(self, task: str, current_repeat: int) -> None:
    """Run a workflow task through the application and time it with the probe.

    Records a start timestamp for ``task`` on ``self.probe``, delegates the
    actual work to ``self.app.run_task`` and records the end timestamp once
    the call is over.

    Args:
        task: Name of the workflow task to run; also used as the probe key.
        current_repeat: Forwarded unchanged to ``self.app.run_task``.

    Raises:
        Exception: Anything raised by ``self.app.run_task`` is propagated
            unchanged, after the end timestamp has been recorded.
    """
    self.probe.set_start(task_name=task)
    try:
        self.app.run_task(task=task, current_repeat=current_repeat)
    finally:
        # Close the record even when the task fails: a record left with a
        # start but no end cannot be turned into a time window afterwards.
        self.probe.set_end(task_name=task)
layers and services") self.infra.finalize(output_dir=output_dir) self.logger.info("Done finalizing experiment") @@ -296,7 +307,7 @@ class Experiment: def get_exp_dir(self) -> Path: return self.experiment_dir - # TODO: refactor + # TODO: refactor using managers to return information def _dump_application_parameters(self) -> None: """ Generates a file with a list of User-Defined Services to be used by the user in diff --git a/e2clab/managers/__init__.py b/e2clab/managers/__init__.py index 1df54b3160c2bcff5191b1ec79c548f6875a4079..3707a81b343bf5e2955cd802d547fef3c14a1cc1 100644 --- a/e2clab/managers/__init__.py +++ b/e2clab/managers/__init__.py @@ -4,6 +4,7 @@ from enum import Enum from .manager import Manager from .monitoring import MonitoringManager from .monitoring_iot import MonitoringIoTManager +from .monitoring_kwollect import MonitoringKwollectManager from .provenance import ProvenanceManager @@ -11,3 +12,4 @@ class Managers(Enum): PROVENANCE = ProvenanceManager MONITORING = MonitoringManager MONITORING_IOT = MonitoringIoTManager + KWOLLECT = MonitoringKwollectManager diff --git a/e2clab/managers/manager.py b/e2clab/managers/manager.py index 9b1373691bb832ff5333470383c3b793443470fc..27785899393ed4d9642f6e1caa44a43d785ef247 100644 --- a/e2clab/managers/manager.py +++ b/e2clab/managers/manager.py @@ -38,6 +38,15 @@ class Manager(ABC): if not self._validate_config(self.config): raise E2clabConfigError + self.host: Optional[Host] = None + # Iterable[Host] ? 
+ self.agent: Optional[Roles] = None + self.networks: Optional[Networks] = None + self.artifacts_dir: Optional[Path] = None + self.provider = None + self.meta: dict = {} + self.service = None + def init( self, roles: Optional[Roles] = None, diff --git a/e2clab/managers/monitoring_kwollect.py b/e2clab/managers/monitoring_kwollect.py new file mode 100644 index 0000000000000000000000000000000000000000..1d3d2f1a66f93407a0606b8b55d387e7283e1945 --- /dev/null +++ b/e2clab/managers/monitoring_kwollect.py @@ -0,0 +1,145 @@ +""" +Kwollect monitoring manager +""" + +import csv +from typing import Optional + +import pytz +from enoslib.infra.enos_g5k.g5k_api_utils import get_api_client + +from e2clab.constants import WORKFLOW_TASKS, Environment, WorkflowTasks +from e2clab.log import get_logger +from e2clab.managers.manager import Manager +from e2clab.probe import TaskProbe + +METRICS = "metrics" +STEP = "step" + +API_TZ = pytz.timezone("Europe/Paris") + + +class MonitoringKwollectManager(Manager): + """ + Kwollect monitoring manager class + """ + + logger = get_logger(__name__, ["KWOLLECT"]) + + SCHEMA = { + "type": "object", + "title": "Grid5000 kwollect monitoring Schema", + "properties": { + METRICS: { + "description": "Metrics to pull from job, '[all]' to pull all metrics", + "default": ["all"], + "type": "array", + "items": {"type": "string"}, + }, + STEP: { + "description": "Workflow step to monitor", + "type": "string", + "enum": WORKFLOW_TASKS, + "default": WorkflowTasks.LAUNCH.value, + }, + }, + "required": [METRICS], + } + + CONFIG_KEY = "kwollect" + SERVICE_ROLE = None # Not useful + # Not needed ? Do we take the whole job ? 
def _backup(self, output_dir):
    """Pull the metrics recorded during the monitored workflow step.

    Reads the start/end timestamps that the task probe holds for the
    configured step (``launch`` when no step is configured), then queries
    the metrics API once per job in ``self.jobs`` and hands every non-empty
    result to ``_dump_metrics``.

    Args:
        output_dir: Directory forwarded to ``_dump_metrics``.
    """
    step = self.config.get(STEP, WorkflowTasks.LAUNCH.value)

    rec = TaskProbe.get_probe().get_records().get(step)
    if not rec:
        self.logger.warning(f"No timestamps found for {step}. Did you run it ?")
        return
    if rec.start is None or rec.end is None:
        # Records are created with both fields unset; a step that never
        # started or never ended gives no usable time window.
        self.logger.warning(
            f"Incomplete timestamps found for {step}. Did it run to completion ?"
        )
        return

    start_str = rec.start.astimezone(API_TZ).isoformat()
    end_str = rec.end.astimezone(API_TZ).isoformat()

    # Test membership on the configured list, not on the comma-joined string:
    # a substring test would also match any metric whose name contains "all".
    pull_all = "all" in self.config[METRICS]
    metrics_str = self._get_metrics_str()

    for job in self.jobs:
        site = job.site
        kwargs = {"start_time": start_str, "end_time": end_str}
        if not pull_all:
            kwargs["metrics"] = metrics_str

        nodes_str = self._filter_site_nodes(site)
        if nodes_str:
            kwargs["nodes"] = nodes_str
        else:
            # No agent node matched this site: query by job id instead
            kwargs["job_id"] = job.uid

        # API call. The result gets its own name so the metric filter built
        # above is not overwritten for the next job of the loop.
        job_metrics = list(self.g5k_client.sites[site].metrics.list(**kwargs))
        if not job_metrics:
            # _dump_metrics reads the first entry to build the CSV header
            self.logger.warning(
                f"No metrics returned for job {job.uid} on {site}, nothing to dump"
            )
            continue
        # NOTE(review): _dump_metrics writes a fixed file name, so with
        # several jobs only the last dump is kept - confirm this is intended.
        self._dump_metrics(job_metrics, output_dir=output_dir)
class TaskProbe:
    """
    Pickle-resistant implementation of a singleton

    Not an issue as un-pickling is done at the beginning of an e2clab process

    Stores one ``Record`` (start and end timestamps) per workflow task name.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        self.logger = get_logger(__name__, ["PROBE"])

        # __init__ runs on every ``TaskProbe()`` call, even though __new__
        # hands back the same object: only create the record store once so
        # timestamps already recorded are not wiped.
        if not hasattr(self, "_records"):
            self._records: dict[str, Record] = {}

    def __setstate__(self, state):
        """
        Ran when unpickling an instance
        """
        self.__dict__.update(state)
        # restore unpickled probe as singleton
        type(self)._instance = self

    def set_start(self, task_name: str) -> None:
        """Record the current time as the start timestamp of ``task_name``"""
        self._check_record(task_name)
        self._records[task_name].start = self._get_now()

        self.logger.debug(f"Set start timestamp for {task_name}")

    def set_end(self, task_name: str) -> None:
        """Record the current time as the end timestamp of ``task_name``"""
        self._check_record(task_name)
        self._records[task_name].end = self._get_now()

        self.logger.debug(f"Set end timestamp for {task_name}")

    @staticmethod
    def _get_now() -> datetime:
        """Return the current time as a timezone-aware datetime"""
        return datetime.now().astimezone()

    def _check_record(self, task_name: str):
        """Init record if doesn't exist"""
        if task_name not in self._records:
            self._records[task_name] = Record(start=None, end=None)

    def get_task_record(self, task_name):
        """Return the record of ``task_name``, or None if there is none"""
        return self._records.get(task_name, None)

    def get_records(self) -> dict[str, Record]:
        """Return the mapping of task name to record (not a copy)"""
        return self._records

    @classmethod
    def get_probe(cls):
        """Return the singleton instance, creating it on first use"""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance
en.G5k: self.config.init(optimization_id) diff --git a/e2clab/schemas/layers_services_schema.py b/e2clab/schemas/layers_services_schema.py index 1d2d8bc0ebd00e3b0036c77f8d16b87d4fcbd31e..485f2a69dbe76b3f4190c37cca05f1eb4edcded2 100644 --- a/e2clab/schemas/layers_services_schema.py +++ b/e2clab/schemas/layers_services_schema.py @@ -17,6 +17,7 @@ from e2clab.constants.layers_services import ( JOB_TYPE, KEY_NAME, LAYERS, + MONITOR, NAME, QUANTITY, RC_FILE, @@ -128,6 +129,10 @@ g5k_schema: dict = { "description": "G5k firewall rules", "type": "array", }, + MONITOR: { + "description": "Activate on demand metrics (e.g.`prom_.*`)", + "type": "string", + }, }, } diff --git a/e2clab/tests/unit/test_managers.py b/e2clab/tests/unit/test_managers.py index 3973f0efa2f4562fe9f1c0514e7c4f8ae3ce5796..0f6fc000f61d8902855283143c594d2521b7bfda 100644 --- a/e2clab/tests/unit/test_managers.py +++ b/e2clab/tests/unit/test_managers.py @@ -39,6 +39,7 @@ from e2clab.errors import E2clabConfigError from e2clab.managers.manager import Manager from e2clab.managers.monitoring import MonitoringManager, MonitoringType from e2clab.managers.monitoring_iot import MonitoringIoTManager +from e2clab.managers.monitoring_kwollect import METRICS, STEP, MonitoringKwollectManager from e2clab.managers.provenance import ProvenanceManager from e2clab.providers.plugins.Iotlab import Iotlab from e2clab.services import Provenance @@ -354,3 +355,44 @@ class TestMonitoringIOTManager(TestE2cLab): def test_get_environment(self): self.assertEqual(self.monit_iot.get_environment(), Environment.IOT_LAB) + + +class TestKwollectManager(TestE2cLab): + + def setUp(self): + config = { + METRICS: [ + "wattmetre_power_watt", + "prom_default_metrics", + "pdu_outlet_power_watt", + ], + STEP: "launch", + } + self.kwo_manager = MonitoringKwollectManager(config) + + def test_schema(self): + badconf = {STEP: "notastep"} + with self.assertRaises(E2clabConfigError): + MonitoringKwollectManager(badconf) + + def 
class TestTaskProbe(TestE2cLab):
    """Unit tests for the TaskProbe singleton"""

    @classmethod
    def setUpClass(cls):
        cls.env = cls.test_folder / "test_pickle"
        cls.env_object = cls.env / "test"
        # Tolerate a directory left behind by a previously aborted run
        cls.env.mkdir(parents=True, exist_ok=True)

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.env, ignore_errors=True)

    def test_singleton(self):
        probe = TaskProbe()
        probe2 = TaskProbe()
        self.assertIs(probe2, probe)

    def test_setstate(self):
        probe = TaskProbe()
        probe.set_start("test_setstate_task")
        state = dict(probe.__dict__)

        # Put the current singleton back once the test is over
        self.addCleanup(setattr, TaskProbe, "_instance", TaskProbe._instance)

        # Mimic un-pickling in a fresh process: no singleton registered yet,
        # instance built without __init__, then fed its saved state
        TaskProbe._instance = None
        restored = TaskProbe.__new__(TaskProbe)
        restored.__setstate__(state)

        self.assertIsNot(restored, probe)
        self.assertIs(TaskProbe.get_probe(), restored)
        self.assertIn("test_setstate_task", restored.get_records())