# Reduce the size of COCO experiment data
# The snippet can be accessed without any authentication.
# Authored by Nikolaus Hansen
# See docstring below.
# reduce_size.py 5.57 KiB
"""reduce the data size of a COCO experiment folder from the default logger.
Data are copied into a new folder and then reduced in place in the copy.

Usage::

    python reduce_size.py folder_name

Further arguments name the functions applied to the files, by default
``reduce_dat reduce_tdat remove_x``. Under *nix, the result can be checked
with ``du -sh *``.
"""
import math
import os
import shutil
import sys
import time
# Number of f-targets per decade (log10 scale) used to thin out the lines of ``.dat`` files.
number_of_ftargets = 20
# f-values below this value hit the final target; lines thereafter are no target hits anymore.
final_ftarget = 1e-8
# `remove_x` is not applied to files whose name contains one of these strings.
allowed_x_names = ['DIM2.', 'DIM3.', 'DIM5.']
def main(folder_name, apply=('reduce_dat', 'reduce_tdat', 'remove_x')):
    """Copy a COCO experiment folder and reduce the data files of the copy.

    `folder_name` contains output of a single COCO experiment, usually a
    folder in ``exdata``. The folder itself is left untouched: it is copied
    to ``<folder_name>-<mmddHHhMMSS>`` and each file of the copy is
    rewritten by those transformations whose condition accepts the file
    name.

    `apply` is an iterable with the names of the module-level
    transformation functions to use, they are applied in the given order.

    Return the name of the new folder.

    Raise `KeyError` if a name in `apply` is not defined in this module and
    `FileExistsError` if the new folder already exists.
    """
    # Resolve the names once and before anything is copied, such that a
    # misspelled name does not leave an unprocessed copy behind.
    transforms = [globals()[name] for name in apply]
    # A trailing path separator (e.g. from shell tab completion) would
    # otherwise put the copy *inside* the source folder.
    separators = os.sep + (os.altsep or '')
    source = folder_name.rstrip(separators) or folder_name
    new_name = '{0}-{1}'.format(source, time.strftime("%m%d%Hh%M%S"))
    shutil.copytree(source, new_name)  # raise `FileExistsError` when new_name exists
    for folder, _dirs, files in os.walk(new_name):
        # `files` are the files in the current `folder`
        for filename in files:
            for transform in transforms:
                if _condition(filename, transform):  # transformation applies to this file type?
                    _rewrite(os.path.join(folder, filename), transform)
    return new_name
def is_comment(s):
    """Return `True` if the first non-blank character of `s` is ``%`` or ``#``."""
    first = s.lstrip()[:1]
    return first == '%' or first == '#'
def is_empty(s):
    """Return `True` if `s` contains nothing but whitespace (or nothing at all)."""
    return not s.strip()
def _condition(filename, transform):
    """Tell whether `transform` is meant to be applied to `filename`.

    The condition function that belongs to `transform` is looked up and
    called with `filename`. A `transform` without a registered condition
    function raises a `KeyError`.
    """
    condition_of = {
        reduce_dat: reduce_dat_condition,
        reduce_tdat: reduce_tdat_condition,
        remove_x: remove_x_condition,
    }
    applies_to = condition_of[transform]
    return applies_to(filename)
class TargetHit:
    """quick and dirty class to indicate target hits, reset if evals (first column) decrease.

    Targets are spaced uniformly on the log10 scale with
    `number_of_targets` targets per decade. An f-value below
    `final_target` (or a nonpositive f-value) hits the final target, after
    which no further hit is indicated until `reset` was called.
    """

    def __init__(self, number_of_targets, final_target):
        # Use the arguments rather than the module-level settings, such that
        # the passed values are actually honoured.
        self.number_of_targets = number_of_targets
        self.final_target = final_target
        self.reset()

    def reset(self):
        """forget all previous lines, any finite f-value hits a target next"""
        self.current_target = math.inf
        self.current_eval = -1

    def __call__(self, line):
        """return whether the data `line` hits a new target.

        `line` is a whitespace-separated data line with the evaluation
        count in the first and the f-value in the third column. When the
        evaluation count is smaller than in the previous call (presumably a
        new run has started), the state is reset and `True` is returned
        without looking at the f-value.
        """
        fields = line.split()
        current_eval = int(fields[0])
        if current_eval < self.current_eval:
            self.reset()
            return True
        self.current_eval = current_eval
        return self.update_target(float(fields[2]))

    def update_target(self, new_f):
        """return whether target was hit and move the current target below `new_f`"""
        if new_f > self.current_target:
            return False
        if self.current_target == 0:  # final target was hit before
            return False
        if new_f < self.final_target or new_f <= 0:
            self.current_target = 0  # final target hit, no more hits from now on
            return True
        logf = math.log10(new_f)
        # find the smallest target exponent on the grid which is not below logf
        t = math.floor(logf)
        while t < logf:
            t += 1. / self.number_of_targets
        # the next target to be hit is one grid step further down
        self.current_target = 10**(t - 1. / self.number_of_targets)
        return True
def reduce_dat_condition(filename):
    """Select the files `reduce_dat` applies to: names with suffix ``.dat``."""
    suffix = '.dat'
    return filename.endswith(suffix)
def reduce_dat(lines):
    """Return a new list with fewer data lines.

    Comment and empty lines are always kept and restart the target
    tracking. Of the data lines in between, a line is kept when it hits a
    new target, when it directly follows a kept line without a later line
    replacing it, or when it is the last data line of its section.
    """
    # % f evaluations | g evaluations | best noise-free fitness - Fopt (7.948000000000e+01) + sum g_i+ | measured fitness | best measured fitness or single-digit g-values | x1 | x2...
    kept = []
    replace_last = False  # the most recent line is never dropped, only replaced
    hit = TargetHit(number_of_ftargets, final_ftarget)
    for row in lines:
        if is_empty(row) or is_comment(row):
            kept.append(row)
            hit.reset()
            replace_last = False
            continue
        if not replace_last:
            kept.append(row)
        else:
            kept[-1] = row
        # a line without target hit is replaced by its successor
        replace_last = not hit(row)
    return kept
def reduce_tdat_condition(filename):
    """Select the files `reduce_tdat` applies to: names with suffix ``.tdat``."""
    suffix = '.tdat'
    return filename.endswith(suffix)
def reduce_tdat(lines):
    """return a new list with first and last data lines only.

    Comment and blank lines are always kept and start a new section. Of
    the data lines of each section only the first and the last one are
    kept. The input list is not modified.
    """
    # % f evaluations | g evaluations | best noise-free fitness - Fopt (7.948000000000e+01) + sum g_i+ | measured fitness | best measured fitness or single-digit g-values | x1 | x2...
    new_lines = []
    n_data = 0  # number of data lines seen since the last comment or blank line
    for line in lines:
        # Test for a blank line first: it has no first column that could be
        # looked at and must not raise an `IndexError`.
        if not line.strip() or is_comment(line):
            new_lines.append(line)
            n_data = 0  # reset and keep this and next line
            continue
        if n_data >= 2:  # previous line was not the first one: overwrite it
            new_lines[-1] = line
        else:
            new_lines.append(line)
        n_data += 1
    return new_lines
def remove_x_condition(filename):
    """Tell whether `remove_x` is to be applied to the file named `filename`.

    This is the case for ``.dat``, ``.mdat`` and ``.tdat`` files whose name
    contains none of the strings in `allowed_x_names`.
    """
    is_data_file = filename.endswith(('.dat', '.mdat', '.tdat'))
    if is_data_file:
        for allowed in allowed_x_names:
            if allowed in filename:
                return False
        return True
    return False
def remove_x(lines):
    """Cut each data line in `lines` down to its first 5 entries, in place.

    Comment lines are left as they are. Every other line is replaced by
    its first (at most) five whitespace-separated entries, joined by
    single spaces and terminated by a newline. The modified `lines` is
    returned.
    """
    for idx, row in enumerate(lines):
        if not is_comment(row):
            head = row.split()[:5]
            lines[idx] = ' '.join(head) + '\n'
    return lines
def _rewrite(file_path, transform):
    """Replace the content of `file_path` by its transformed lines.

    All lines are read, passed as a list to `transform`, and whatever
    `transform` returns is written back to the same file. `transform` is
    called before the file is opened for writing.
    """
    with open(file_path, 'r') as source:
        content = list(source)
    result = transform(content)
    with open(file_path, 'w') as target:
        target.write(''.join(result))
if __name__ == "__main__":
    # first argument: the experiment folder, all further arguments: names
    # of the transformations to apply instead of the default ones
    arguments = sys.argv[1:]
    if not arguments:
        raise ValueError("need a folder name as argument")
    if len(arguments) == 1:
        main(arguments[0])
    else:
        main(arguments[0], apply=arguments[1:])
# Please register or sign in to comment