Commit b5ccd0bb authored by Simon Delamare

Add dumprestore_metrics tool

parent 112f2620
#!/usr/bin/python3
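# Dump the Kwollect "metrics" table to per-day zstd-compressed CSV files, or
# restore those files back into the database, one day at a time.
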
import datetime
import os
import subprocess
from pyonf import pyonf
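
# Default configuration, as a YAML string. pyonf parses it and, per its usual
# usage, lets each key be overridden from the command line (e.g.
# "--mode restore --from_date 2020-12-31"; an assumed invocation, check the
# pyonf documentation for the exact syntax).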
config = """
debug: False
dry_run: False
dbhost: kwollect
db: kwdb
user: kwuser
password: changeme
upto_date: "2020-06-30"
from_date: ""
mode: dump
jobs: 1
"""
config = pyonf(config)
db_uri = f"postgresql://{config['user']}:{config['password']}@{config['dbhost']}/{config['db']}"
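
# Per-day dump file name.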
fname = lambda date: f"{date}.csv.zstd"
# The number of jobs must divide 24
while 24 % config["jobs"] != 0:
    config["jobs"] -= 1
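

# Run a shell command. Echo the command in debug or dry-run mode and its
# output in debug mode; on failure, print the failing command's output and
# re-raise. Returns the stripped stdout (None in dry-run mode).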
def sh(cmd, debug=config["debug"], dry_run=config["dry_run"]):
    if debug or dry_run:
        print(f"+{datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}: {cmd}")
    r = None
    if not dry_run:
        try:
            r = (
                subprocess.run(
                    cmd,
                    shell=True,
                    check=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                )
                .stdout.decode()
                .strip()
            )
        except subprocess.CalledProcessError as e:
            if e.stdout:
                print("> " + "\n> ".join(e.stdout.decode().splitlines()))
            raise e
        if debug and r:
            print("> " + "\n> ".join(r.splitlines()))
    return r
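

# Dump one day of metrics: split the day into `jobs` equal time slices
# exported by parallel psql COPY commands, then compress the slice CSVs into a
# single <date>.csv.zstd in a detached background shell (writing to a .tmp
# file first so the final file appears only once complete).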
def dump(date):
    def pg_cmd(begin, end):
        return (
            f"psql {db_uri} "
            '-c "SET statement_timeout=0; '
            "COPY (SELECT * FROM metrics "
            f"WHERE timestamp >= '{begin}' AND timestamp < '{end}' "
            'ORDER BY timestamp ASC) TO STDOUT csv" '
        )

    cmds = []
    for i in range(config["jobs"]):
        start_date = datetime.datetime.combine(
            date, datetime.datetime.min.time()
        ) + datetime.timedelta(hours=i * 24 / config["jobs"])
        end_date = start_date + datetime.timedelta(hours=24 / config["jobs"])
        cmds.append(f"( {pg_cmd(start_date, end_date)} > {date}_{i}.csv )")
    sh(" & ".join(cmds))
    files = " ".join(f"{date}_{i}.csv" for i in range(config["jobs"]))
    sh(f"nohup sh -c \"(zstd -T0 -10 -f {files} -o {date}.csv.zstd.tmp && mv {date}.csv.zstd.tmp {date}.csv.zstd && rm {files})\" >/dev/null 2>&1 &")
def check_dumped_old(date):
    if not os.path.isfile(fname(date)):
        return "todo"
    else:
        out = sh(f"zstdcat {fname(date)} | head -n1")
        if not out or not out.split(",")[0].split()[1].startswith("00:00:"):
            print(f"First line of {fname(date)} does not start with 00:00")
            return "clean"
        else:
            return "done"
def check_dumped(date):
    if not os.path.isfile(fname(date)):
        return "todo"
    elif not os.path.isfile(fname(date) + ".dumped"):
        pg_cmd = (
            f"psql -t {db_uri} "
            '-c "SET statement_timeout=0; '
            "SELECT COUNT(*) FROM metrics "
            f"WHERE timestamp >= '{date}' AND timestamp < '{date + datetime.timedelta(days=1)}'\""
        )
        db_c = sh(pg_cmd)
        f_c = sh(f"zstdcat -T0 {fname(date)} | wc -l")
        if db_c != f_c:
            print(f"Number of metrics from DB != from file ({db_c} vs {f_c}), dumping for {date}")
            return "clean"
        else:
            sh(f"touch {fname(date)}.dumped")
            return "done"
    else:
        return "done"
def check_restored(date):
    if sh(f"ls {fname(date)}.restored || true") == f"{fname(date)}.restored":
        return "done"
    pg_cmd = (
        f"psql -t {db_uri} "
        '-c "SET statement_timeout=0; '
        "SELECT COUNT(*) FROM metrics "
        f"WHERE timestamp >= '{date}' AND timestamp < '{date + datetime.timedelta(days=1)}'\""
    )
    db_c = sh(pg_cmd)
    f_c = sh(f"zstdcat -T0 {fname(date)} | wc -l")
    if db_c != f_c:
        if int(db_c) != 0:
            print(f"Number of metrics from DB != from file ({db_c} vs {f_c}), restoring for {date}")
            return "clean"
        else:
            print(f"Restoring {f_c} metrics for {date}")
            return "todo"
    else:
        sh(f"touch {fname(date)}.restored")
        return "done"
def clean_restored(date):
    tries = 0
    while tries < 50:
        tries += 1
        c = sh(
            f"psql {db_uri} "
            '-c "SET statement_timeout=0; '
            "DELETE FROM metrics "
            f"WHERE timestamp >= '{date}' AND timestamp < '{date + datetime.timedelta(days=1)}'\" || true"
        )
        print(c)
        if c.endswith("as it is compressed"):
            chunk = c.split('"')[1]
            sh(
                f"psql -t {db_uri} "
                '-c "SET statement_timeout=0; '
                f"SELECT decompress_chunk('_timescaledb_internal.{chunk}')\""
            )
        elif c.startswith("DELETE "):
            break
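

# Restore a day's dump through timescaledb-parallel-copy, then re-compress the
# uncompressed 'metrics' chunks starting before from_date (all but the two
# oldest, per the "offset 2" in the chunk query).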
def restore(date):
    if os.path.isfile(fname(date)):
        sh(
            f"(zstdcat -T0 {fname(date)} | timescaledb-parallel-copy --table metrics "
            f"--workers {config['jobs']} --reporting-period 30s --connection {db_uri}) "
            f"&& touch {fname(date)}.restored"
        )
        sh(
            "for c in "
            f"$(psql -t {db_uri} -c "
            "\"select chunk_schema || '.' || chunk_name from timescaledb_information.chunks "
            "where hypertable_name = 'metrics' and is_compressed = 'f' "
            f"and range_start < '{datetime.date.fromisoformat(config['from_date'])}' order by range_start asc offset 2\"); "
            f"do psql -t {db_uri} -c "
            "\"set statement_timeout = 0; select compress_chunk('$c', true)\" || true; done"
        )
    else:
        raise Exception(f"No file found for {date}")
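

# Walk backwards one day at a time, from from_date (default: yesterday) down
# to upto_date (exclusive), checking each day and cleaning/redoing it as
# needed.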
def main(mode):
    upto_date = datetime.date.fromisoformat(config["upto_date"])
    if config["from_date"]:
        cur_date = datetime.date.fromisoformat(config["from_date"])
    else:
        cur_date = datetime.date.today() - datetime.timedelta(days=1)
    if mode == "dump":
        check, clean, do = check_dumped, lambda x: None, dump
    elif mode == "restore":
        check, clean, do = check_restored, clean_restored, restore
    print(f"{mode}ing data from {cur_date} to {upto_date}")
    while cur_date != upto_date:
        print(f"Processing metrics for {cur_date}")
        state = check(cur_date)
        if state == "done":
            print(f"Metrics already {mode}ed for {cur_date}")
        else:
            if state == "clean":
                print(f"Data exists for {cur_date}, but check failed")
                clean(cur_date)
            print(f"{mode}ing metrics for {cur_date}")
            do(cur_date)
        cur_date -= datetime.timedelta(days=1)


if __name__ == "__main__":
    mode = config["mode"]
    if mode not in ("dump", "restore"):
        raise Exception("Mode must be 'dump' or 'restore'")
    main(mode)