From f93bd4d9a8a21836d200d0136777044c6cfca09b Mon Sep 17 00:00:00 2001
From: tbadts <thomas.badts@inria.fr>
Date: Wed, 20 Nov 2024 17:17:52 +0100
Subject: [PATCH] Init kwollect manager

---
 e2clab/app.py                            |   5 +-
 e2clab/cli.py                            |   2 +-
 e2clab/constants/layers_services.py      |   1 +
 e2clab/errors.py                         |   5 +
 e2clab/experiment.py                     |  19 ++-
 e2clab/managers/__init__.py              |   2 +
 e2clab/managers/manager.py               |   9 ++
 e2clab/managers/monitoring_kwollect.py   | 145 +++++++++++++++++++++++
 e2clab/probe.py                          |  77 ++++++++++++
 e2clab/providers/plugins/G5k.py          |  13 +-
 e2clab/schemas/layers_services_schema.py |   5 +
 e2clab/tests/unit/test_managers.py       |  42 +++++++
 e2clab/tests/unit/test_probe.py          |  31 +++++
 13 files changed, 347 insertions(+), 9 deletions(-)
 create mode 100644 e2clab/managers/monitoring_kwollect.py
 create mode 100644 e2clab/probe.py
 create mode 100644 e2clab/tests/unit/test_probe.py

diff --git a/e2clab/app.py b/e2clab/app.py
index 78c995c8..79c3c302 100644
--- a/e2clab/app.py
+++ b/e2clab/app.py
@@ -26,6 +26,7 @@ from e2clab.constants.workflow import (
     SELF_PREFIX,
     SERV_SELECT,
     TARGET,
+    TASK_FINALIZE,
 )
 from e2clab.errors import E2clabFileError
 from e2clab.grouping import get_grouping
@@ -148,7 +149,7 @@ class App:
         self.logger.debug(f"[TASK CONFIG] {filtered_config}")
 
         # Iterate on each hosts defnition
-        # TODO: check performance impact
+        # TODO: check performance impact and maybe run a single ansible play
         for host_task_conf in filtered_config:
 
             self.logger.debug(f"[HOST TASK CONF] {host_task_conf}")
@@ -172,7 +173,7 @@ class App:
         Returns:
             working_dir (Path): Path to the current working directory
         """
-        if task != "finalize":
+        if task != TASK_FINALIZE:
             working_dir = self.artifacts_dir
         else:
             working_dir = self.app_dir
diff --git a/e2clab/cli.py b/e2clab/cli.py
index 3b72aae3..1c7bbaea 100644
--- a/e2clab/cli.py
+++ b/e2clab/cli.py
@@ -98,7 +98,7 @@ def cli(debug: bool, mute_enoslib: bool, mute_ansible: bool):
         level=logging.DEBUG if debug else logging.INFO,
         enable_enoslib=not mute_enoslib,
         mute_ansible=mute_ansible,
-        file_handler=False,
+        file_handler=False,  # File handler set later in exp
         markup=True,
     )
     logger.debug(f"Loaded information from '{ENV_FILE}': {loaded_dotenv}")
diff --git a/e2clab/constants/layers_services.py b/e2clab/constants/layers_services.py
index 7db57357..11899ec8 100644
--- a/e2clab/constants/layers_services.py
+++ b/e2clab/constants/layers_services.py
@@ -34,6 +34,7 @@ WALLTIME = "walltime"
 RESERVATION = "reservation"
 ENV_NAME = "env_name"
 QUANTITY = "quantity"
+MONITOR = "monitor"
 SERVICES = "services"
 NAME = "name"
 IMAGE = "image"
diff --git a/e2clab/errors.py b/e2clab/errors.py
index 178ab07e..e333d839 100644
--- a/e2clab/errors.py
+++ b/e2clab/errors.py
@@ -1,3 +1,8 @@
+"""
+E2Clab error module
+"""
+
+
 class E2clabError(Exception):
     pass
 
diff --git a/e2clab/experiment.py b/e2clab/experiment.py
index a2051acc..04c3e8cc 100644
--- a/e2clab/experiment.py
+++ b/e2clab/experiment.py
@@ -35,6 +35,7 @@ from e2clab.errors import E2clabError
 from e2clab.infra import Infrastructure
 from e2clab.log import config_file_logger, get_logger
 from e2clab.network import Network
+from e2clab.probe import TaskProbe
 
 
 class Experiment:
@@ -68,9 +69,12 @@ class Experiment:
         self.net = None
         self.app = None
 
+        self.probe = TaskProbe()
+
         self.experiment_dir = None
 
     def __setstate__(self, state):
+        """Ran when unpickling"""
         self.__dict__.update(state)
         # re-configure loggers
         config_file_logger(self.experiment_dir)
@@ -176,10 +180,16 @@ class Experiment:
 
         # Enforce task
         self.logger.info(f"Enforcing workflow:{task}")
-
-        self.app.run_task(task=task, current_repeat=self.repeat)
+        # self.app.run_task(task=task, current_repeat=self.repeat)
+        self.run_task(task=task, current_repeat=self.repeat)
         self.logger.info(f"Done enforcing workflow:{task}")
 
+    def run_task(self, task: str, current_repeat: int):
+        """Run an application task, timestamping its start and end in the probe"""
+        self.probe.set_start(task_name=task)
+        self.app.run_task(task=task, current_repeat=current_repeat)
+        self.probe.set_end(task_name=task)
+
     def finalize(self, app_conf: str = None, destroy: bool = False) -> None:
         """
         Finalize experiment
@@ -197,7 +207,8 @@ class Experiment:
 
         self.logger.info("Finalizing experiment")
         self.logger.info("Running workflow 'finalize'")
-        self.app.run_task(workflow_const.TASK_FINALIZE, current_repeat=self.repeat)
+        # self.app.run_task(workflow_const.TASK_FINALIZE, current_repeat=self.repeat)
+        self.run_task(workflow_const.TASK_FINALIZE, current_repeat=self.repeat)
         self.logger.info("Finalizing layers and services")
         self.infra.finalize(output_dir=output_dir)
         self.logger.info("Done finalizing experiment")
@@ -296,7 +307,7 @@ class Experiment:
     def get_exp_dir(self) -> Path:
         return self.experiment_dir
 
-    # TODO: refactor
+    # TODO: refactor using managers to return information
     def _dump_application_parameters(self) -> None:
         """
         Generates a file with a list of User-Defined Services to be used by the user in
diff --git a/e2clab/managers/__init__.py b/e2clab/managers/__init__.py
index 1df54b31..3707a81b 100644
--- a/e2clab/managers/__init__.py
+++ b/e2clab/managers/__init__.py
@@ -4,6 +4,7 @@ from enum import Enum
 from .manager import Manager
 from .monitoring import MonitoringManager
 from .monitoring_iot import MonitoringIoTManager
+from .monitoring_kwollect import MonitoringKwollectManager
 from .provenance import ProvenanceManager
 
 
@@ -11,3 +12,4 @@ class Managers(Enum):
     PROVENANCE = ProvenanceManager
     MONITORING = MonitoringManager
     MONITORING_IOT = MonitoringIoTManager
+    KWOLLECT = MonitoringKwollectManager
diff --git a/e2clab/managers/manager.py b/e2clab/managers/manager.py
index 9b137369..27785899 100644
--- a/e2clab/managers/manager.py
+++ b/e2clab/managers/manager.py
@@ -38,6 +38,15 @@ class Manager(ABC):
         if not self._validate_config(self.config):
             raise E2clabConfigError
 
+        self.host: Optional[Host] = None
+        # Iterable[Host] ?
+        self.agent: Optional[Roles] = None
+        self.networks: Optional[Networks] = None
+        self.artifacts_dir: Optional[Path] = None
+        self.provider = None
+        self.meta: dict = {}
+        self.service = None
+
     def init(
         self,
         roles: Optional[Roles] = None,
diff --git a/e2clab/managers/monitoring_kwollect.py b/e2clab/managers/monitoring_kwollect.py
new file mode 100644
index 00000000..1d3d2f1a
--- /dev/null
+++ b/e2clab/managers/monitoring_kwollect.py
@@ -0,0 +1,145 @@
+"""
+Kwollect monitoring manager
+"""
+
+import csv
+from typing import Optional
+
+import pytz
+from enoslib.infra.enos_g5k.g5k_api_utils import get_api_client
+
+from e2clab.constants import WORKFLOW_TASKS, Environment, WorkflowTasks
+from e2clab.log import get_logger
+from e2clab.managers.manager import Manager
+from e2clab.probe import TaskProbe
+
+METRICS = "metrics"
+STEP = "step"
+
+API_TZ = pytz.timezone("Europe/Paris")
+
+
+class MonitoringKwollectManager(Manager):
+    """
+    Kwollect monitoring manager class
+    """
+
+    logger = get_logger(__name__, ["KWOLLECT"])
+
+    SCHEMA = {
+        "type": "object",
+        "title": "Grid5000 kwollect monitoring Schema",
+        "properties": {
+            METRICS: {
+                "description": "Metrics to pull from job, '[all]' to pull all metrics",
+                "default": ["all"],
+                "type": "array",
+                "items": {"type": "string"},
+            },
+            STEP: {
+                "description": "Workflow step to monitor",
+                "type": "string",
+                "enum": WORKFLOW_TASKS,
+                "default": WorkflowTasks.LAUNCH.value,
+            },
+        },
+        "required": [METRICS],
+    }
+
+    CONFIG_KEY = "kwollect"
+    SERVICE_ROLE = None  # Not useful
+    # Not needed ? Do we take the whole job ?
+    ROLE = "k_monitor"
+
+    def create_service(self):
+        """No service to create: kwollect is queried through the G5k API"""
+
+    def _deploy(self):
+        """Fetch the G5k API client and jobs, then log each job's dashboard"""
+        self.g5k_client = get_api_client()
+        self.jobs = self.provider.get_jobs()
+        for job in self.jobs:
+            job_id = job.uid  # not `id`: that would shadow the builtin
+            dash_addr = self._get_viz_address(job.site)
+            self.logger.info(
+                f"Access kwollect metric dashboard for job {job_id}: {dash_addr}"
+            )
+
+    def _backup(self, output_dir):
+        """Pull the monitored step's metrics for every job into `output_dir`"""
+        task_probe = TaskProbe.get_probe()
+        records = task_probe.get_records()
+        metrics = self._get_metrics_str()
+        step = self.config.get(STEP, WorkflowTasks.LAUNCH.value)
+
+        rec = records.get(step)
+        # A record may hold only one timestamp if the step did not complete
+        if not rec or rec.start is None or rec.end is None:
+            self.logger.warning(f"No timestamps found for {step}. Did you run it ?")
+            return
+
+        start_str = rec.start.astimezone(API_TZ).isoformat()
+        end_str = rec.end.astimezone(API_TZ).isoformat()
+
+        for job in self.jobs:
+            kwargs = {
+                "metrics": metrics,
+                "start_time": start_str,
+                "end_time": end_str,
+            }
+            # Check the list: a substring test would match any name containing "all"
+            if "all" in self.config[METRICS]:
+                kwargs.pop("metrics")
+            site = job.site
+            nodes_str = self._filter_site_nodes(site)
+            if nodes_str:
+                kwargs["nodes"] = nodes_str
+            else:
+                kwargs["job_id"] = job.uid
+            # Distinct name: rebinding `metrics` would corrupt the next job's query
+            job_metrics = self.g5k_client.sites[site].metrics.list(**kwargs)
+            self._dump_metrics(job_metrics, output_dir=output_dir)
+
+    def _destroy(self):
+        """Nothing to do"""
+
+    def get_environment(self) -> Environment:
+        """This manager only works for Grid5000"""
+        return Environment.G5K
+
+    def _get_metrics_str(self):
+        """Return the configured metrics as a comma-separated string"""
+        return self.get_metrics_str(self.config)
+
+    def _filter_site_nodes(self, site: str) -> Optional[str]:
+        """
+        Return the comma-separated short names of the agents hosted on `site`
+
+        Returns None when no agent address matches
+        """
+        nodes = [
+            agent.address.split(".")[0]
+            for agent in self.agent or ()  # agent defaults to None in Manager
+            if site in agent.address and "grid5000" in agent.address
+        ]
+        return ",".join(nodes) if nodes else None
+
+    @classmethod
+    def get_metrics_str(cls, config):
+        """Return the metrics of `config` as a comma-separated string"""
+        return ",".join(config[METRICS])
+
+    def _get_viz_address(self, site: str) -> str:
+        """Returns address to dashboard"""
+        addr = f"https://api.grid5000.fr/stable/sites/{site}/metrics/dashboard"
+        return addr
+
+    @staticmethod
+    def _dump_metrics(metrics, output_dir):
+        """Write `metrics` to `test.csv` in `output_dir`; skip empty results"""
+        rows = [m.to_dict() for m in metrics]
+        if rows:  # rows[0] below would raise IndexError on an empty result
+            with open(output_dir / "test.csv", "w", newline="") as out:
+                writer = csv.DictWriter(out, rows[0].keys())
+                writer.writeheader()
+                writer.writerows(rows)
diff --git a/e2clab/probe.py b/e2clab/probe.py
new file mode 100644
index 00000000..d2740aa5
--- /dev/null
+++ b/e2clab/probe.py
@@ -0,0 +1,77 @@
+"""
+Probe singletons module
+"""
+
+from dataclasses import dataclass
+from datetime import datetime
+
+from e2clab.log import get_logger
+
+
+@dataclass
+class Record:
+    start: datetime  # None until TaskProbe.set_start() stamps the task
+    end: datetime  # None until TaskProbe.set_end() stamps the task
+
+
+class TaskProbe:
+    """
+    Pickle-resistant implementation of a singleton
+
+    Not an issue as un-pickling is done at the beginning of an e2clab process
+    """
+
+    _instance = None
+
+    def __init__(self):
+        self.logger = get_logger(__name__, ["PROBE"])
+        # __init__ reruns on each TaskProbe() call: keep the shared records
+        self._records: dict[str, Record] = getattr(self, "_records", {})
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __setstate__(self, state):
+        """
+        Ran when unpickling an instance
+        """
+        self.__dict__.update(state)
+        # restore unpickled probe as singleton
+        type(self)._instance = self
+
+    def set_start(self, task_name: str) -> None:
+        """Record the current time as the start of `task_name`"""
+        self._check_record(task_name)
+        self._records[task_name].start = self._get_now()
+        self.logger.debug(f"Set start timestamp for {task_name}")
+
+    def set_end(self, task_name: str) -> None:
+        """Record the current time as the end of `task_name`"""
+        self._check_record(task_name)
+        self._records[task_name].end = self._get_now()
+        self.logger.debug(f"Set end timestamp for {task_name}")
+
+    @staticmethod
+    def _get_now() -> datetime:
+        """Return the current time as an aware datetime in the local timezone"""
+        return datetime.now().astimezone()
+
+    def _check_record(self, task_name: str):
+        """Init record if doesn't exist"""
+        self._records.setdefault(task_name, Record(start=None, end=None))
+
+    def get_task_record(self, task_name):
+        """Return the record of `task_name`, or None if it was never probed"""
+        return self._records.get(task_name)
+
+    def get_records(self) -> dict[str, Record]:
+        return self._records
+
+    @classmethod
+    def get_probe(cls):
+        """Return the shared probe, creating it on first use"""
+        if cls._instance is None:
+            cls._instance = cls()
+        return cls._instance
diff --git a/e2clab/providers/plugins/G5k.py b/e2clab/providers/plugins/G5k.py
index 4b745cfc..07c5fa82 100644
--- a/e2clab/providers/plugins/G5k.py
+++ b/e2clab/providers/plugins/G5k.py
@@ -17,6 +17,7 @@ from e2clab.constants.layers_services import (
     JOB_NAME,
     JOB_TYPE,
     KEY_NAME,
+    MONITOR,
     MONITORING_NETWORK_ROLE,
     MONITORING_SERVICE_ROLE,
     MONITORING_SVC,
@@ -56,6 +57,7 @@ class G5kConfig(ProviderConfig):
         self.env_name = self.env.get(ENV_NAME, None)
         self.cluster = self.env.get(CLUSTER, None)
         self.keyfile = self.env.get(KEY_NAME, default.SSH_KEYFILE)
+        self.monitor = self.env.get(MONITOR, None)
 
     def init(self, optimization_id: Optional[int] = None) -> None:
         """Initialize G5k configuration
@@ -75,6 +77,7 @@ class G5kConfig(ProviderConfig):
             reservation=self.reservation,
             walltime=self.walltime,
             key=self.keyfile,
+            monitor=self.monitor,
         )
         self.cluster_list = self._search_clusters_at_service_level()
         self.prod_network = self._create_production_network_for_clusters(
@@ -386,7 +389,7 @@ class G5k(Provider):
         with config_context(g5k_cache=False):
             provider = self._provider_g5k(self.optimization_id)
 
-        self.provider = provider
+        self.en_provider = provider
 
         roles, networks = provider.init()
         en.wait_for(roles)
@@ -422,7 +425,13 @@ class G5k(Provider):
         return roles, networks
 
     def destroy(self):
-        self.provider.destroy()
+        self.en_provider.destroy()
+
+    def get_sites(self) -> list[str]:
+        return self.en_provider.provider_conf.sites
+
+    def get_jobs(self):
+        return self.en_provider.jobs
 
     def _provider_g5k(self, optimization_id: Optional[int] = None) -> en.G5k:
         self.config.init(optimization_id)
diff --git a/e2clab/schemas/layers_services_schema.py b/e2clab/schemas/layers_services_schema.py
index 1d2d8bc0..485f2a69 100644
--- a/e2clab/schemas/layers_services_schema.py
+++ b/e2clab/schemas/layers_services_schema.py
@@ -17,6 +17,7 @@ from e2clab.constants.layers_services import (
     JOB_TYPE,
     KEY_NAME,
     LAYERS,
+    MONITOR,
     NAME,
     QUANTITY,
     RC_FILE,
@@ -128,6 +129,10 @@ g5k_schema: dict = {
             "description": "G5k firewall rules",
             "type": "array",
         },
+        MONITOR: {
+            "description": "Activate on demand metrics (e.g.`prom_.*`)",
+            "type": "string",
+        },
     },
 }
 
diff --git a/e2clab/tests/unit/test_managers.py b/e2clab/tests/unit/test_managers.py
index 3973f0ef..0f6fc000 100644
--- a/e2clab/tests/unit/test_managers.py
+++ b/e2clab/tests/unit/test_managers.py
@@ -39,6 +39,7 @@ from e2clab.errors import E2clabConfigError
 from e2clab.managers.manager import Manager
 from e2clab.managers.monitoring import MonitoringManager, MonitoringType
 from e2clab.managers.monitoring_iot import MonitoringIoTManager
+from e2clab.managers.monitoring_kwollect import METRICS, STEP, MonitoringKwollectManager
 from e2clab.managers.provenance import ProvenanceManager
 from e2clab.providers.plugins.Iotlab import Iotlab
 from e2clab.services import Provenance
@@ -354,3 +355,44 @@ class TestMonitoringIOTManager(TestE2cLab):
 
     def test_get_environment(self):
         self.assertEqual(self.monit_iot.get_environment(), Environment.IOT_LAB)
+
+
+class TestKwollectManager(TestE2cLab):
+
+    def setUp(self):
+        config = {
+            METRICS: [
+                "wattmetre_power_watt",
+                "prom_default_metrics",
+                "pdu_outlet_power_watt",
+            ],
+            STEP: "launch",
+        }
+        self.kwo_manager = MonitoringKwollectManager(config)
+
+    def test_schema(self):
+        badconf = {METRICS: ["all"], STEP: "notastep"}  # only STEP is invalid
+        with self.assertRaises(E2clabConfigError):
+            MonitoringKwollectManager(badconf)
+
+    def test_get_environment(self):
+        self.assertEqual(self.kwo_manager.get_environment(), Environment.G5K)
+
+    def test_get_metrics_str(self):
+        metrics_str = self.kwo_manager._get_metrics_str()
+        expected = "wattmetre_power_watt,prom_default_metrics,pdu_outlet_power_watt"
+        self.assertEqual(metrics_str, expected)
+
+    def test_filter_site_nodes(self):
+        # Only grid5000 hosts whose address contains the site must be kept
+        self.kwo_manager.agent = [
+            Host("paradoxe-1.rennes.grid5000.fr"),
+            Host("paradoxe-2.rennes.grid5000.fr"),
+            Host("ecotype-1.nantes.grid5000.fr"),
+            Host("anothernode.test.com"),
+        ]
+        self.assertEqual(
+            "paradoxe-1,paradoxe-2", self.kwo_manager._filter_site_nodes("rennes")
+        )
+        self.assertEqual("ecotype-1", self.kwo_manager._filter_site_nodes("nantes"))
+        self.assertIsNone(self.kwo_manager._filter_site_nodes("nancy"))
diff --git a/e2clab/tests/unit/test_probe.py b/e2clab/tests/unit/test_probe.py
new file mode 100644
index 00000000..84e22aab
--- /dev/null
+++ b/e2clab/tests/unit/test_probe.py
@@ -0,0 +1,31 @@
+"""
+Testing the probe.py module
+"""
+
+import shutil
+
+from e2clab.probe import TaskProbe
+
+from . import TestE2cLab
+
+
+class TestTaskProbe(TestE2cLab):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.env = cls.test_folder / "test_pickle"
+        cls.env_object = cls.env / "test"
+        cls.env.mkdir(exist_ok=True)  # tolerate leftovers of an aborted run
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.env)
+
+    def test_singleton(self):
+        probe = TaskProbe()
+        probe2 = TaskProbe()
+        self.assertIs(probe2, probe)
+
+    # TODO: test __setstate__
+    def test_setstate(self):
+        pass
-- 
GitLab