diff --git a/README.md b/README.md index e35da06e0311893563f40edcbff5f2bf485787c5..226a25886a4f11af7eb7eeb96a178b54f75eddf0 100644 --- a/README.md +++ b/README.md @@ -381,6 +381,12 @@ SELECT AVG(value)*8/1024/1024 AS job_network_avginput_mbps FROM jobmetrics WHERE SELECT MAX(value)*8/1024/1024 AS jobnode_network_maxinput_mbps FROM jobmetrics WHERE metric_id = 'network_ifaceout_bytes_total' ``` +An experimental feature also allows metrics to be inserted from inside a job, without requiring authentication by performing `POST` request on `insert_user_metrics` function. For instance, +``` +curl https://kwollect.host:3000/rpc/insert_user_metrics -X POST -H 'content-type: application/json' -d '{"metric_id": "my_custom_metric", "value": 42}' +``` +executed from a node XXX belonging to a running job, will add metric `{"metric_id": "my_custom_metric", "device_id": "XXX", "value": 42}` to Kwollect metrics database. + ## Optional metrics diff --git a/kwollect/db/kwollect_setup_db_oar.py b/kwollect/db/kwollect_setup_db_oar.py index 0ec818775b480895679bf917b61b2e13d55dd307..fed2c50285b004fac74382e9a4c402dfcdb9a747 100644 --- a/kwollect/db/kwollect_setup_db_oar.py +++ b/kwollect/db/kwollect_setup_db_oar.py @@ -15,6 +15,7 @@ oardb_port: 5432 oardb_name: oar2 oardb_user: oarreader oardb_password: read +max_user_metrics_per_min: 1000 """ config = pyonf(config) globals().update(config) @@ -73,6 +74,23 @@ CREATE OR REPLACE VIEW nodetime_by_oarjob AS CREATE OR REPLACE VIEW nodetime_by_job AS SELECT * FROM nodetime_by_oarjob; +DROP MATERIALIZED VIEW IF EXISTS oar_runningjobs; +CREATE MATERIALIZED VIEW oar_runningjobs AS + SELECT DISTINCT jobs.job_id, + to_timestamp(jobs.start_time::double precision) AS start_time, + split_part(resources.network_address::text, '.'::text, 1) AS node, + job_user AS user, + NOW() AS last_refresh + FROM oar.jobs, + oar.assigned_resources, + oar.resources + WHERE jobs.stop_time = 0 + AND jobs.state = 'Running' + AND jobs.assigned_moldable_job = assigned_resources.moldable_job_id + AND assigned_resources.resource_id = resources.resource_id + AND resources.type::text = 'default'::text; + + CREATE OR REPLACE VIEW promoted_metrics_job AS SELECT DISTINCT jobs.job_id, @@ -99,6 +117,152 @@ BEGIN RETURNING *; END; $$ LANGUAGE 'plpgsql'; + + +DROP FUNCTION api.insert_user_metrics; +CREATE OR REPLACE FUNCTION api.insert_user_metrics(JSON) +RETURNS INT AS $$ + +import time +import json +import socket +from plpy import spiexceptions + + +def get_orig_hostname_from_request_header(): + + req = "SELECT current_setting('request.header.x-forwarded-for')" + host_ip = "" + try: + host_ip = plpy.execute(req)[0] + host_ip = dict(host_ip).get("current_setting", "").split(",")[0] + assert host_ip != "" + except (spiexceptions.UndefinedObject, AssertionError) as ex: + raise Exception(f"Missing x-forwarded-for header (content: '{{host_ip}}'): {{ex}}") + + hostname_full = socket.gethostbyaddr(host_ip)[0] + hostname = hostname_full.split(".")[0] + if "-kavlan-" in hostname: + hostname = "-".join(hostname.split("-")[0:2]) + return hostname + + +def get_authenticated_user_from_request_header(): + req = "SELECT current_setting('request.header.x-api-user-cn')" + try: + res = plpy.execute(req)[0] + except spiexceptions.UndefinedObject as ex: + # No authentication header + return None + + user = dict(res).get("current_setting", "") + if user == "unknown": + return None + + return user + + +def get_current_reservations(cache_timeout=5): + + r = plpy.execute("SELECT extract(epoch from NOW()-last_refresh) as last_refresh FROM oar_runningjobs LIMIT 1") + if len(r) == 0 or r[0]["last_refresh"] > cache_timeout: + plpy.execute("REFRESH MATERIALIZED VIEW oar_runningjobs") + return plpy.execute("SELECT node,STRING_AGG(oar_runningjobs.user, ',') AS user FROM oar_runningjobs GROUP BY node") + + +def check_max_insert_count(user, insert_metrics_count, insert_time): + + MAX_REQ_TIME = 60 + MAX_REQ_COUNT = {max_user_metrics_per_min} + + if not SD.get("reqs"): + SD["reqs"] = {{}} + if not SD.get("reqs").get(user): + SD["reqs"][user] = {{}} + + previous_reqs = SD["reqs"][user] + + # previous_reqs is a dict = {{time1: req_count, time2: req_count, ...}} + # containing number of previous insertion mades at each timeX for user + # and ensuring that no timeX is older than MAX_REQ_TIME seconds + + for t in list(previous_reqs.keys()): + if t < insert_time - MAX_REQ_TIME: + del previous_reqs[t] + + previous_reqs_count = sum(previous_reqs.values()) + + if previous_reqs_count + insert_metrics_count > MAX_REQ_COUNT: + raise Exception(f"More than {{MAX_REQ_COUNT}} allowed metrics inserted during last {{MAX_REQ_TIME}} seconds") + + if not previous_reqs.get(int(insert_time)): + previous_reqs[int(insert_time)] = 0 + previous_reqs[int(insert_time)] += insert_metrics_count + + +metrics = json.loads(args[0]) +if not isinstance(metrics, list): + metrics = [metrics] + +time_int = round(time.time(), 6) +time_sql = plpy.execute(f"SELECT to_timestamp({{time_int}})")[0]["to_timestamp"] + +src_node = get_orig_hostname_from_request_header() +user = get_authenticated_user_from_request_header() or None # "user1,user2" if several jobs on same node + +existing_reservation = get_current_reservations() # uses a cache (5sec) + +allowed_nodes = set() + +for r in existing_reservation: + if user: + # Authenticated mode: all reserved nodes are allowed + if user in r["user"].split(","): + allowed_nodes.add(r["node"]) + else: + # Unauthenticated mode: find user from current jobs and request source node + if src_node == r["node"]: + user = r["user"] + allowed_nodes.add(r["node"]) + break +if not user: + raise Exception(f"Cannot insert metrics from host '{{src_node}}' which does not belong to a running job for unauthenticated request") + +# raise error if more than "max_user_metrics_per_min" metrics inserted during last 60 seconds by this user +check_max_insert_count(user, len(metrics), time_int) + +for metric in metrics: + if not metric.get("device_id"): + metric["device_id"] = src_node + if metric["device_id"] not in allowed_nodes: + raise Exception(f"Cannot insert metrics for device {{metric['device_id']}} which does not belong to '{{user}}' running jobs") + + if not metric.get("metric_id") or not metric.get("value"): + raise Exception("Missing mandatory 'metric_id' or 'value' in entry") + + if not metric.get("timestamp"): + metric["timestamp"] = time_sql + else: + timestamp_diff = plpy.execute(f"SELECT extract(epoch FROM '{{time_sql}}'::timestamp - '{{metric['timestamp']}}'::timestamp) AS timestamp_diff")[0]["timestamp_diff"] + if timestamp_diff < -5 or timestamp_diff > 3600: + raise Exception("Cannot insert metrics with timestamp in the future or older than 1 hour") + + if not metric.get("labels"): + metric["labels"] = {{}} + metric["labels"]["_insert_time"] = time_int + metric["labels"]["_insert_user"] = user + +req = plpy.prepare( + "INSERT INTO metrics(timestamp, device_id, metric_id, value, labels) " + "SELECT timestamp, device_id, metric_id, value, labels " + "FROM json_to_recordset($1) " + "AS metrics(timestamp TIMESTAMPTZ, device_id TEXT, " + "metric_id TEXT, value DOUBLE PRECISION, labels JSONB)", + ["json"]) +r = plpy.execute(req, [json.dumps(metrics)]) +plpy.warning(r, dir(r)) +return int(r.nrows()) +$$ LANGUAGE 'plpython3u' SECURITY DEFINER; """ ) print("Database setup for OAR integration done.")