diff --git a/kwollect/kwollector.py b/kwollect/kwollector.py index 6ddf17df44a173c365439ca103e8c86d5f2647be..3fafcd2197289fb699b9a50234228079ae22cb8d 100755 --- a/kwollect/kwollector.py +++ b/kwollect/kwollector.py @@ -16,19 +16,23 @@ from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Optional from urllib.parse import urlparse, unquote +from jsonpath_ng.ext import parse as jsonpath_parse import yaml import asyncpg import aiosnmp import aiohttp + from pyonf import pyonf try: from orjson import dumps + from orjson import loads as json_loads json_dumps = lambda x: dumps(x).decode() except ImportError: from json import dumps as json_dumps + from json import loads as json_loads config = """ @@ -39,6 +43,7 @@ db_user: kwuser db_password: changeme log_level: warning worker: 8 +ssl_verify: False """ config = pyonf(config) @@ -134,7 +139,7 @@ class MetricDescription: path_template: Optional[str] = None def __post_init__(self): - _url = urlparse(self.url) + _url = urlparse(self.url, allow_fragments=False) self.device = MetricDevice( hostname=_url.hostname, protocol=_url.scheme, @@ -288,6 +293,8 @@ async def process_host_metrics(device, metrics): process_method = process_ipmisensor_host elif device.protocol == "prometheus": process_method = process_prometheus_host + elif device.protocol in ("http", "https"): + process_method = process_http_host else: log.error("Unsupported protocol for device %s", device) return @@ -387,6 +394,62 @@ async def process_ipmisensor_host(device, metrics): return metrics, results +async def process_http_host(device, metrics): + """Process one query for metrics exported on HTTP / JSON""" + + resp_by_path = {} + values = [None] * len(metrics) + timestamp = time.time() + + for idx, metric in enumerate(metrics): + url = "#".join(metric.url.split("#")[:-1]) if "#" in metric.url else metric.url + jsonpath_expr = metric.url.split("#")[-1] if "#" in metric.url else None + if jsonpath_expr: + try: + jsonpath_expr = jsonpath_parse(jsonpath_expr) + except Exception: + log.warning( + "Cannot parse JSONPath %s from %s, skipping metrics %s", + jsonpath_expr, + metric.url, + metric.name, + ) + log.warning(traceback.format_exc()) + continue + + # We store query results for each url we need to query + if url not in resp_by_path: + resp = await make_http_request(url) + if resp: + resp_by_path[url] = resp + else: + log.warning("Cannot fetch HTTP metric at %s, skipping", metric.url) + continue + + if not jsonpath_expr: + values[idx] = resp_by_path[url] + else: + matches = jsonpath_expr.find(json_loads(resp_by_path[url])) + if not matches: + log.warning( + "Cannot find values for JSONPATH at %s, skipping metrics %s", + metric.url, + metric.name, + ) + val = None + else: + if len(matches) > 1: + log.warning( + "Several values found from JSONPATH at %s for metric %s, only keeping the first one", + metric.url, + metric.name, + ) + val = matches[0].value + values[idx] = val + + return metrics, [(timestamp, value) for value in values] + + prom_re = re.compile(r"(\w+)({?.*}?) (.*)") promlabel_re = re.compile(r"(\w+)=\"(.*?)\"") @@ -712,6 +775,26 @@ def init_ipmi(): ipmi_executor = concurrent.futures.ProcessPoolExecutor(max_workers=4*config["worker"]) +async def make_http_request(url): + try: + async with aiohttp.ClientSession() as http_session: + async with http_session.get( + url, ssl=config.get("ssl_verify", True) + ) as resp: + assert resp.status == 200 + return await resp.text() + except aiohttp.ClientConnectorError as ex: + log.warning("Cannot connect to HTTP server at %s (%s)", url, ex) + log.debug(traceback.format_exc()) + except aiohttp.ServerDisconnectedError as ex: + log.warning("Disconnected from HTTP server at %s (%s)", url, ex) + log.debug(traceback.format_exc()) + except AssertionError: + log.warning("Wrong HTTP return code at %s: %s", url, resp.status) + log.debug(traceback.format_exc()) + return None + + async def make_snmp_request( host, snmp_command, oids, community="public", timeout=30, retries=1 ): diff --git a/setup.py b/setup.py index 7afa52c61b171b39212574e8a22c50e51f8f9115..947c0d46f14f96cfd3a42c055d5916d7a800bd18 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ setuptools.setup( "asyncpg", "aiohttp", "aiosnmp", + "jsonpath-ng", ] + (["orjson"] if get_pip_version() >= (19, 3) else []), include_package_data=True, package_data={"kwollect": ["db/*", "grafana/*"]},