Mentions légales du service

Skip to content
Snippets Groups Projects
Commit bbcf8bd4 authored by IMBERT Matthieu's avatar IMBERT Matthieu
Browse files

[execo_engine] ParamSweeper: add __filtered_<done|inprogress|skipped>

parent 7fcf0ac7
No related merge requests found
...@@ -263,6 +263,37 @@ class ParamSweeper(object): ...@@ -263,6 +263,37 @@ class ParamSweeper(object):
self.__done = set() self.__done = set()
self.__inprogress = set() self.__inprogress = set()
self.__skipped = set() self.__skipped = set()
self.__filtered_done = set()
self.__filtered_inprogress = set()
self.__filtered_skipped = set()
# __filtered_done, __filtered_inprogress, __filtered_skipped
# are the intersections of __sweeps and __done, __inprogress,
# __skipped. They exist because:
#
# - client may call set_sweeps with different sweeps, still we
# want to remember the complete list of __done,
# __inprogress, __skipped
#
# - different ParamSweeper instances may share the same
# storage though having different individual
# __sweeps. __inprogress and __done on storage will be the
# union of all inprogress and done, so the ParamSweeper must
# be prepared to deal correctly with __inprogress and __done
# containing elements not in *its* __sweeps (and it must not
# discard them)
#
# - on the other hand, when displaying with __str__(),
# stats(), or when retrieving with get_done(),
# get_inprogress(), get_skipped(), client expects to get
# only lists of done, inprogress, skipped relative to the
# current __sweeps.
#
# - we could filter (do the intersection with __sweeps) only
# when displaying or returning the lists, but it's a costly
# operation to do the full intersection, whereas doing it
# incrementaly is fast.
self.__remaining = set() self.__remaining = set()
self.__done_filepos = None self.__done_filepos = None
...@@ -306,6 +337,9 @@ class ParamSweeper(object): ...@@ -306,6 +337,9 @@ class ParamSweeper(object):
inprogress_file.truncate(0) inprogress_file.truncate(0)
self.__inprogress.clear() self.__inprogress.clear()
self.__remaining = set(self.__sweeps).difference(self.__done, self.__skipped, self.__inprogress) self.__remaining = set(self.__sweeps).difference(self.__done, self.__skipped, self.__inprogress)
self.__filtered_done = self.__done.intersection(self.__sweeps)
self.__filtered_inprogress = self.__inprogress.intersection(self.__sweeps)
self.__filtered_skipped = self.__skipped.intersection(self.__sweeps)
def full_update(self): def full_update(self):
"""Reload completely the ParamSweeper state from disk (may take some time).""" """Reload completely the ParamSweeper state from disk (may take some time)."""
...@@ -329,6 +363,7 @@ class ParamSweeper(object): ...@@ -329,6 +363,7 @@ class ParamSweeper(object):
break break
self.__done.update(new_done) self.__done.update(new_done)
self.__remaining.difference_update(new_done) self.__remaining.difference_update(new_done)
self.__filtered_done.update(set(new_done).intersection(self.__sweeps))
inprogress_file.seek(0, os.SEEK_SET) inprogress_file.seek(0, os.SEEK_SET)
try: try:
self.__inprogress = pickle.load(inprogress_file) self.__inprogress = pickle.load(inprogress_file)
...@@ -336,6 +371,7 @@ class ParamSweeper(object): ...@@ -336,6 +371,7 @@ class ParamSweeper(object):
inprogress_file.truncate(0) inprogress_file.truncate(0)
self.__inprogress.clear() self.__inprogress.clear()
self.__remaining.difference_update(self.__inprogress) self.__remaining.difference_update(self.__inprogress)
self.__filtered_inprogress.intersection_update(self.__sweeps)
elif new_done_filepos < self.__done_filepos: elif new_done_filepos < self.__done_filepos:
self.__nolock_full_update(done_file, inprogress_file) self.__nolock_full_update(done_file, inprogress_file)
...@@ -355,9 +391,9 @@ class ParamSweeper(object): ...@@ -355,9 +391,9 @@ class ParamSweeper(object):
return "%s <%i total, %i done, %i skipped, %i in progress, %i remaining>" % ( return "%s <%i total, %i done, %i skipped, %i in progress, %i remaining>" % (
self.__name, self.__name,
len(self.__sweeps), len(self.__sweeps),
len(self.__done), len(self.__filtered_done),
len(self.__skipped), len(self.__filtered_skipped),
len(self.__inprogress), len(self.__filtered_inprogress),
len(self.__remaining)) len(self.__remaining))
def get_sweeps(self): def get_sweeps(self):
...@@ -370,7 +406,7 @@ class ParamSweeper(object): ...@@ -370,7 +406,7 @@ class ParamSweeper(object):
The returned iterable is a copy (safe to use without fearing The returned iterable is a copy (safe to use without fearing
concurrent mutations by another thread). concurrent mutations by another thread).
""" """
return self.__skipped.copy() return self.__filtered_skipped.copy()
def get_remaining(self): def get_remaining(self):
"""returns an iterable of current remaining *todo* elements """returns an iterable of current remaining *todo* elements
...@@ -386,7 +422,7 @@ class ParamSweeper(object): ...@@ -386,7 +422,7 @@ class ParamSweeper(object):
The returned iterable is a copy (safe to use without fearing The returned iterable is a copy (safe to use without fearing
concurrent mutations by another thread). concurrent mutations by another thread).
""" """
return self.__inprogress.copy() return self.__filtered_inprogress.copy()
def get_done(self): def get_done(self):
"""returns an iterable of currently *done* elements """returns an iterable of currently *done* elements
...@@ -394,7 +430,7 @@ class ParamSweeper(object): ...@@ -394,7 +430,7 @@ class ParamSweeper(object):
The returned iterable is a copy (safe to use without fearing The returned iterable is a copy (safe to use without fearing
concurrent mutations by another thread). concurrent mutations by another thread).
""" """
return self.__done.copy() return self.__filtered_done.copy()
def get_next(self, filtr = None): def get_next(self, filtr = None):
"""Return the next element which is *todo*. Returns None if reached end. """Return the next element which is *todo*. Returns None if reached end.
...@@ -419,6 +455,8 @@ class ParamSweeper(object): ...@@ -419,6 +455,8 @@ class ParamSweeper(object):
return None return None
self.__remaining.discard(combination) self.__remaining.discard(combination)
self.__inprogress.add(combination) self.__inprogress.add(combination)
if combination in self.__sweeps:
self.__filtered_inprogress.add(combination)
inprogress_file.truncate(0) inprogress_file.truncate(0)
pickle.dump(self.__inprogress, inprogress_file) pickle.dump(self.__inprogress, inprogress_file)
logger.info("%s new combination: %s", self.__name, combination) logger.info("%s new combination: %s", self.__name, combination)
...@@ -433,7 +471,10 @@ class ParamSweeper(object): ...@@ -433,7 +471,10 @@ class ParamSweeper(object):
self.__nolock_update(done_file, inprogress_file) self.__nolock_update(done_file, inprogress_file)
self.__remaining.discard(combination) self.__remaining.discard(combination)
self.__inprogress.discard(combination) self.__inprogress.discard(combination)
self.__filtered_inprogress.discard(combination)
self.__done.add(combination) self.__done.add(combination)
if combination in self.__sweeps:
self.__filtered_done.add(combination)
done_file.seek(0, os.SEEK_END) done_file.seek(0, os.SEEK_END)
pickle.dump(combination, done_file) pickle.dump(combination, done_file)
inprogress_file.truncate(0) inprogress_file.truncate(0)
...@@ -448,7 +489,10 @@ class ParamSweeper(object): ...@@ -448,7 +489,10 @@ class ParamSweeper(object):
with _openlock(os.path.join(self.__persistence_dir, "inprogress")) as inprogress_file: with _openlock(os.path.join(self.__persistence_dir, "inprogress")) as inprogress_file:
self.__nolock_update(done_file, inprogress_file) self.__nolock_update(done_file, inprogress_file)
self.__skipped.add(combination) self.__skipped.add(combination)
if combination in self.__sweeps:
self.__filtered_skipped.add(combination)
self.__inprogress.discard(combination) self.__inprogress.discard(combination)
self.__filtered_inprogress.discard(combination)
inprogress_file.truncate(0) inprogress_file.truncate(0)
pickle.dump(self.__inprogress, inprogress_file) pickle.dump(self.__inprogress, inprogress_file)
logger.info("%s combination skipped: %s", self.__name, combination) logger.info("%s combination skipped: %s", self.__name, combination)
...@@ -460,8 +504,10 @@ class ParamSweeper(object): ...@@ -460,8 +504,10 @@ class ParamSweeper(object):
with _openlock(os.path.join(self.__persistence_dir, "done")) as done_file: with _openlock(os.path.join(self.__persistence_dir, "done")) as done_file:
with _openlock(os.path.join(self.__persistence_dir, "inprogress")) as inprogress_file: with _openlock(os.path.join(self.__persistence_dir, "inprogress")) as inprogress_file:
self.__nolock_update(done_file, inprogress_file) self.__nolock_update(done_file, inprogress_file)
self.__remaining.add(combination) if combination in self.__sweeps:
self.__remaining.add(combination)
self.__inprogress.discard(combination) self.__inprogress.discard(combination)
self.__filtered_inprogress.discard(combination)
inprogress_file.truncate(0) inprogress_file.truncate(0)
pickle.dump(self.__inprogress, inprogress_file) pickle.dump(self.__inprogress, inprogress_file)
logger.info("%s combination cancelled: %s", self.__name, combination) logger.info("%s combination cancelled: %s", self.__name, combination)
...@@ -477,10 +523,12 @@ class ParamSweeper(object): ...@@ -477,10 +523,12 @@ class ParamSweeper(object):
with _openlock(os.path.join(self.__persistence_dir, "done")) as done_file: with _openlock(os.path.join(self.__persistence_dir, "done")) as done_file:
with _openlock(os.path.join(self.__persistence_dir, "inprogress")) as inprogress_file: with _openlock(os.path.join(self.__persistence_dir, "inprogress")) as inprogress_file:
self.__nolock_update(done_file, inprogress_file) self.__nolock_update(done_file, inprogress_file)
self.__remaining.update(self.__skipped) self.__remaining.update(self.__filtered_skipped)
self.__skipped.clear() self.__skipped.clear()
self.__filtered_skipped.clear()
if reset_inprogress: if reset_inprogress:
self.__inprogress.clear() self.__inprogress.clear()
self.__filtered_inprogress.clear()
inprogress_file.truncate(0) inprogress_file.truncate(0)
pickle.dump(self.__inprogress, inprogress_file) pickle.dump(self.__inprogress, inprogress_file)
logger.info("%s reset", self.__name) logger.info("%s reset", self.__name)
...@@ -503,9 +551,9 @@ class ParamSweeper(object): ...@@ -503,9 +551,9 @@ class ParamSweeper(object):
with self.__lock: with self.__lock:
sweeps = self.get_sweeps() sweeps = self.get_sweeps()
remaining = self.get_remaining() remaining = self.get_remaining()
skipped = self.get_skipped().intersection(sweeps) skipped = self.get_skipped()
inprogress = self.get_inprogress().intersection(sweeps) inprogress = self.get_inprogress()
done = self.get_done().intersection(sweeps) done = self.get_done()
ctotal = count(sweeps) ctotal = count(sweeps)
cremaining = count(remaining) cremaining = count(remaining)
cskipped = count(skipped) cskipped = count(skipped)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment