diff --git a/devel/dumprestore_metrics.py b/devel/dumprestore_metrics.py index ddc130b6848550ed32c9995ace22a41c2dc984cf..e1b837305b53b33599daeb49ab96a7d58fe93dd6 100644 --- a/devel/dumprestore_metrics.py +++ b/devel/dumprestore_metrics.py @@ -36,7 +36,13 @@ def sh(cmd, debug=config["debug"], dry_run=config["dry_run"]): if not dry_run: try: r = ( - subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #capture_output=True, + subprocess.run( + cmd, + shell=True, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) # capture_output=True, .stdout.decode() .strip() ) @@ -61,32 +67,23 @@ def dump(date): cmds = [] for i in range(config["jobs"]): - start_date = datetime.datetime.combine(date, datetime.datetime.min.time()) + datetime.timedelta(hours=i * 24 / config["jobs"]) + start_date = datetime.datetime.combine( + date, datetime.datetime.min.time() + ) + datetime.timedelta(hours=i * 24 / config["jobs"]) end_date = start_date + datetime.timedelta(hours=24 / config["jobs"]) cmds.append(f"( {pg_cmd(start_date, end_date)} > {date}_{i}.csv )") sh(" & ".join(cmds)) files = " ".join(f"{date}_{i}.csv" for i in range(config["jobs"])) - sh(f"nohup sh -c \"(zstd -T0 -10 -f {files} -o {date}.csv.zstd.tmp && mv {date}.csv.zstd.tmp {date}.csv.zstd && rm {files})\" >/dev/null 2>&1 &") # && rm {files}") - - -def check_dumped_old(date): - if not os.path.isfile(fname(date)): - return "todo" - else: - out = sh(f"zstdcat {fname(date)} | head -n1") - if not out or not out.split(",")[0].split()[1].startswith("00:00:"): - print(f"First line of {fname(date)} does not start with 00:00") - return "clean" - else: - return "done" - + sh( + f'nohup sh -c "(zstd -T0 -10 -f {files} -o {date}.csv.zstd.tmp && mv {date}.csv.zstd.tmp {date}.csv.zstd && rm {files})" >/dev/null 2>&1 &' + ) # && rm {files}") def check_dumped(date): if not os.path.isfile(fname(date)): return "todo" - elif not os.path.isfile(fname(date)+".dumped"): + elif not 
os.path.isfile(fname(date) + ".dumped"): pg_cmd = ( f"psql -t {db_uri} " '-c "SET statement_timeout=0; ' @@ -96,7 +93,9 @@ def check_dumped(date): db_c = sh(pg_cmd) f_c = sh(f"zstdcat -T0 {fname(date)} | wc -l") if db_c != f_c: - print(f"Number of metrics from DB != from file ({db_c} vs {f_c}, dumping for {date}") + print( + f"Number of metrics from DB != from file ({db_c} vs {f_c}), dumping for {date}" + ) return "clean" else: sh(f"touch {fname(date)}.dumped") @@ -118,7 +117,9 @@ def check_restored(date): f_c = sh(f"zstdcat -T0 {fname(date)} | wc -l") if db_c != f_c: if int(db_c) != 0: - print(f"Number of metrics from DB != from file ({db_c} vs {f_c}, restoring for {date}") + print( + f"Number of metrics from DB != from file ({db_c} vs {f_c}), restoring for {date}" + ) return "clean" else: print(f"Restoring {f_c} metrics for {date}") @@ -155,7 +156,8 @@ def restore(date): sh( f"(zstdcat -T0 {fname(date)} | timescaledb-parallel-copy --table metrics --workers {config['jobs']} --reporting-period 30s --connection {db_uri}) && touch {fname(date)}.restored" ) - sh("for c in " + sh( + "for c in " f"$(psql -t {db_uri} -c " "\"select chunk_schema || '.' 
|| chunk_name from timescaledb_information.chunks where hypertable_name = 'metrics' and is_compressed = 'f' " f"and range_start < '{datetime.date.fromisoformat(config['from_date'])}' order by range_start asc offset 2\"); " @@ -166,6 +168,50 @@ def restore(date): raise Exception(f"No file found for {date}") +def check_summarize(date): + pg_cmd = ( + f"psql -t {db_uri} " + '-c "SET statement_timeout=0; ' + "SELECT COUNT(*) FROM " + "(SELECT * FROM timescaledb_information.chunks " + f"WHERE hypertable_name = (SELECT materialization_hypertable_name FROM timescaledb_information.continuous_aggregates WHERE view_name = 'metrics_summary_new') " + f"AND range_start < '{date}' AND '{date}' < range_end " + "UNION ALL " + "SELECT * FROM timescaledb_information.chunks " + f"WHERE hypertable_name = (SELECT materialization_hypertable_name FROM timescaledb_information.continuous_aggregates WHERE view_name = 'metrics_summary_new') " + f"AND range_start < '{date + datetime.timedelta(days=1)}' AND '{date + datetime.timedelta(days=1)}' < range_end " + "UNION ALL " + "(SELECT * FROM timescaledb_information.chunks " + f"WHERE hypertable_name = (SELECT materialization_hypertable_name FROM timescaledb_information.continuous_aggregates WHERE view_name = 'metrics_summary_new') " + f"AND range_start > '{date + datetime.timedelta(days=1)}' " + " ORDER BY range_end ASC LIMIT 1)" + ") s\"" + ) + db_c = int(sh(pg_cmd, dry_run=False)) + if db_c != 3: + print(f"No metrics summary for {date} ({db_c} chunks only)") + return "todo" + else: + print(f"Found metrics summary chunk for {date}") + return "done" + + +def summarize(date): + sh( + f"echo \"SET statement_timeout TO 0; CALL refresh_continuous_aggregate('metrics_summary_new', '{date}', '{date + datetime.timedelta(days=1)}');\"" + f"| psql -t {db_uri}" + ) + sh( + "for c in " + f"$(psql -t {db_uri} -c " + "\"select chunk_schema || '.' 
|| chunk_name from timescaledb_information.chunks " + f"WHERE hypertable_name = (SELECT materialization_hypertable_name FROM timescaledb_information.continuous_aggregates WHERE view_name = 'metrics_summary_new') " + "AND is_compressed = 'f' " + f"AND range_end < '{date}' ORDER BY range_start ASC\"); " + f"do psql -t {db_uri} -c " + "\"set statement_timeout = 0; select compress_chunk('$c', true)\" || true; done", + ) + def main(mode): upto_date = datetime.date.fromisoformat(config["upto_date"]) @@ -174,14 +220,16 @@ def main(mode): else: cur_date = datetime.date.today() - datetime.timedelta(days=1) - if mode=="dump": + if mode == "summarize": + check, clean, do = check_summarize, lambda x: None, summarize + elif mode == "dump": check, clean, do = check_dumped, lambda x: None, dump - elif mode=="restore": + elif mode == "restore": check, clean, do = check_restored, clean_restored, restore print(f"{mode}ing data from {cur_date} to {upto_date}") - while cur_date != upto_date: + while cur_date < upto_date: print(f"Processing metrics for {cur_date}") state = check(cur_date) if state == "done": @@ -192,11 +240,11 @@ def main(mode): clean(cur_date) print(f"{mode}ing metrics for {cur_date}") do(cur_date) - cur_date -= datetime.timedelta(days=1) + cur_date += datetime.timedelta(days=1) if __name__ == "__main__": - mode=config['mode'] - if mode not in ("dump", "restore"): - raise Exception("Mode must be 'dump' or 'restore'") + mode = config["mode"] + if mode not in ("dump", "restore", "summarize"): + raise Exception("Mode must be 'dump' or 'restore' or 'summarize'") main(mode)