diff --git a/devel/dumprestore_metrics.py b/devel/dumprestore_metrics.py
new file mode 100644
index 0000000000000000000000000000000000000000..ddc130b6848550ed32c9995ace22a41c2dc984cf
--- /dev/null
+++ b/devel/dumprestore_metrics.py
@@ -0,0 +1,202 @@
+#!/usr/bin/python3
+
+# Dump the "metrics" hypertable to per-day zstd-compressed CSV files, or
+# restore those files back into the database, working one day at a time.
+
+import datetime
+import os
+import subprocess
+
+from pyonf import pyonf
+
+config = """
+debug: False
+dry_run: False
+dbhost: kwollect
+db: kwdb
+user: kwuser
+password: changeme
+upto_date: "2020-06-30"
+from_date: ""
+mode: dump
+jobs: 1
+"""
+config = pyonf(config)
+
+db_uri = f"postgresql://{config['user']}:{config['password']}@{config['dbhost']}/{config['db']}"
+
+# Name of the archive holding one day of metrics
+fname = lambda date: f"{date}.csv.zstd"
+
+# The number of jobs must divide 24
+while 24 % config["jobs"] != 0:
+    config["jobs"] -= 1
+
+
+def sh(cmd, debug=config["debug"], dry_run=config["dry_run"]):
+    """Run a shell command and return its combined stdout/stderr, stripped."""
+    if debug or dry_run:
+        print(f"+{datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}: {cmd}")
+    r = None
+    if not dry_run:
+        try:
+            r = (
+                subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+                .stdout.decode()
+                .strip()
+            )
+        except subprocess.CalledProcessError as e:
+            if e.stdout:
+                print("> " + "> ".join(e.stdout.decode().splitlines()))
+            raise e
+    if debug and r:
+        print("> " + "> ".join(r.splitlines()))
+    return r
+
+
+def dump(date):
+    """Dump one day of metrics using `jobs` parallel COPY queries, then compress."""
+
+    def pg_cmd(begin, end):
+        return (
+            f"psql {db_uri} "
+            '-c "SET statement_timeout=0; '
+            "COPY (SELECT * FROM metrics "
+            f"WHERE timestamp >= '{begin}' AND timestamp < '{end}' "
+            'ORDER BY timestamp ASC) TO STDOUT csv" '
+        )
+
+    # Split the day into `jobs` windows of 24/jobs hours and dump them in parallel
+    cmds = []
+    for i in range(config["jobs"]):
+        start_date = datetime.datetime.combine(date, datetime.datetime.min.time()) + datetime.timedelta(hours=i * 24 / config["jobs"])
+        end_date = start_date + datetime.timedelta(hours=24 / config["jobs"])
+        cmds.append(f"( {pg_cmd(start_date, end_date)} > {date}_{i}.csv )")
+    sh(" & ".join(cmds))
+
+    # Compress the partial CSV files into a single archive in the background
+    files = " ".join(f"{date}_{i}.csv" for i in range(config["jobs"]))
+    sh(f"nohup sh -c \"(zstd -T0 -10 -f {files} -o {date}.csv.zstd.tmp && mv {date}.csv.zstd.tmp {date}.csv.zstd && rm {files})\" >/dev/null 2>&1 &")
+
+
+def check_dumped_old(date):
+    """Old check: only verify that the archive's first line starts at midnight."""
+    if not os.path.isfile(fname(date)):
+        return "todo"
+    else:
+        out = sh(f"zstdcat {fname(date)} | head -n1")
+        if not out or not out.split(",")[0].split()[1].startswith("00:00:"):
+            print(f"First line of {fname(date)} does not start with 00:00")
+            return "clean"
+        else:
+            return "done"
+
+
+def check_dumped(date):
+    """Compare the DB row count with the archive line count; mark success with a .dumped file."""
+    if not os.path.isfile(fname(date)):
+        return "todo"
+    elif not os.path.isfile(fname(date) + ".dumped"):
+        pg_cmd = (
+            f"psql -t {db_uri} "
+            '-c "SET statement_timeout=0; '
+            "SELECT COUNT(*) FROM metrics "
+            f"WHERE timestamp >= '{date}' AND timestamp < '{date + datetime.timedelta(days=1)}'\""
+        )
+        db_c = sh(pg_cmd)
+        f_c = sh(f"zstdcat -T0 {fname(date)} | wc -l")
+        if db_c != f_c:
+            print(f"Number of metrics from DB != from file ({db_c} vs {f_c}), dumping for {date}")
+            return "clean"
+        else:
+            sh(f"touch {fname(date)}.dumped")
+            return "done"
+    else:
+        return "done"
+
+
+def check_restored(date):
+    """Compare the DB row count with the archive line count; mark success with a .restored file."""
+    if sh(f"ls {fname(date)}.restored || true") == f"{fname(date)}.restored":
+        return "done"
+    pg_cmd = (
+        f"psql -t {db_uri} "
+        '-c "SET statement_timeout=0; '
+        "SELECT COUNT(*) FROM metrics "
+        f"WHERE timestamp >= '{date}' AND timestamp < '{date + datetime.timedelta(days=1)}'\""
+    )
+    db_c = sh(pg_cmd)
+    f_c = sh(f"zstdcat -T0 {fname(date)} | wc -l")
+    if db_c != f_c:
+        if int(db_c) != 0:
+            print(f"Number of metrics from DB != from file ({db_c} vs {f_c}), restoring for {date}")
+            return "clean"
+        else:
+            print(f"Restoring {f_c} metrics for {date}")
+            return "todo"
+    else:
+        sh(f"touch {fname(date)}.restored")
+        return "done"
+
+
+def clean_restored(date):
+    """Delete the day's rows, decompressing TimescaleDB chunks when DELETE refuses compressed ones."""
+    tries = 0
+    while tries < 50:
+        tries += 1
+        c = sh(
+            f"psql {db_uri} "
+            '-c "SET statement_timeout=0; '
+            "DELETE FROM metrics "
+            f"WHERE timestamp >= '{date}' AND timestamp < '{date + datetime.timedelta(days=1)}'\" || true"
+        )
+        print(c)
+        if c.endswith("as it is compressed"):
+            chunk = c.split('"')[1]
+            sh(
+                f"psql -t {db_uri} "
+                '-c "SET statement_timeout=0; '
+                f"SELECT decompress_chunk('_timescaledb_internal.{chunk}')\""
+            )
+        elif c.startswith("DELETE "):
+            break
+
+
+def restore(date):
+    """Reload the day's archive with timescaledb-parallel-copy, then recompress old chunks."""
+    if os.path.isfile(fname(date)):
+        sh(
+            f"(zstdcat -T0 {fname(date)} | timescaledb-parallel-copy --table metrics --workers {config['jobs']} --reporting-period 30s --connection {db_uri}) && touch {fname(date)}.restored"
+        )
+        # Compress uncompressed chunks older than from_date, skipping the two with the earliest range_start
+        sh(
+            "for c in "
+            f"$(psql -t {db_uri} -c "
+            "\"select chunk_schema || '.' || chunk_name from timescaledb_information.chunks where hypertable_name = 'metrics' and is_compressed = 'f' "
+            f"and range_start < '{datetime.date.fromisoformat(config['from_date'])}' order by range_start asc offset 2\"); "
+            f"do psql -t {db_uri} -c "
+            "\"set statement_timeout = 0; select compress_chunk('$c', true)\" || true; done"
+        )
+    else:
+        raise Exception(f"No file found for {date}")
+
+
+def main(mode):
+    """Walk backwards one day at a time, from from_date (or yesterday) down to upto_date."""
+    upto_date = datetime.date.fromisoformat(config["upto_date"])
+    if config["from_date"]:
+        cur_date = datetime.date.fromisoformat(config["from_date"])
+    else:
+        cur_date = datetime.date.today() - datetime.timedelta(days=1)
+
+    if mode == "dump":
+        check, clean, do = check_dumped, lambda x: None, dump
+    elif mode == "restore":
+        check, clean, do = check_restored, clean_restored, restore
+
+    print(f"{mode}ing data from {cur_date} to {upto_date}")
+
+    while cur_date != upto_date:
+        print(f"Processing metrics for {cur_date}")
+        state = check(cur_date)
+        if state == "done":
+            print(f"Metrics already {mode}ed for {cur_date}")
+        else:
+            if state == "clean":
+                print(f"Data exists for {cur_date}, but check failed")
+                clean(cur_date)
+            print(f"{mode}ing metrics for {cur_date}")
+            do(cur_date)
+        cur_date -= datetime.timedelta(days=1)
+
+
+if __name__ == "__main__":
+    mode = config["mode"]
+    if mode not in ("dump", "restore"):
+        raise Exception("Mode must be 'dump' or 'restore'")
+    main(mode)
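
For reference, a minimal sketch (not part of the patch) of the time-window arithmetic that dump() relies on: the day is split into config["jobs"] back-to-back windows of whole hours (hence the requirement that jobs divides 24), and one COPY query is issued per window. The day_windows helper below is hypothetical and only illustrates that computation:

    import datetime

    def day_windows(date, jobs):
        # Midnight at the start of `date`, then `jobs` consecutive windows of 24/jobs hours.
        midnight = datetime.datetime.combine(date, datetime.time.min)
        step = datetime.timedelta(hours=24 / jobs)
        return [(midnight + i * step, midnight + (i + 1) * step) for i in range(jobs)]

    # With jobs=4, 2020-06-29 yields four 6-hour windows:
    # 00:00-06:00, 06:00-12:00, 12:00-18:00 and 18:00-24:00.
    for begin, end in day_windows(datetime.date(2020, 6, 29), 4):
        print(begin, "->", end)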