Commit a209cf9e authored by FRIEDEMANN Sebastian's avatar FRIEDEMANN Sebastian
Browse files

Reactivate speculative scheduling

parent f6a6a4bd
Pipeline #407482 failed with stage
in 30 minutes and 1 second
......@@ -25,7 +25,7 @@ def select_parent(runner_id, alpha, len_alpha, state_cache, R):
split_factors_bad = {} # try to schedule from this with lowest prio, stuff you may not split but we split it anyway before doing nothing
split_factors = {} # next highest prio, stuff that may be used regarding the split factor
split_factors_new = {} # highest prio, stuff that is not cached on any runner so far to possibly worksteal less...
# very highest prio is stuff that is already in the cache
# very highest prio is stuff that is already in the cache. this is managed up there.
# REM: within all those groups we try to select the one with the highest split factor (except for the very highes prio group.)
......
......@@ -205,14 +205,11 @@ class DueDates:
# job was already scheduled or running.
# scheduled, running or finished jobs are not stealable. so now it is stealable again...
# REM: at the moment there is no *stealing* but the name is still kept
if jid.t == assimilation_cycle: # don't remove jobs from alpha that were only speculative!
if jid.t == assimilation_cycle: # don't remove jobs from alpha that were only speculative! (we calculate alpha newly all the time anyway...
stealable_jobs += 1
len_alpha += 1
# FIXME: we mess around here a lot if the job was not in alpha but in alpha_2!
# stealable jobs could also be calculated from all jobs that are in the alpha list ;)
if pid not in alpha:
alpha[pid] = 0
alpha[pid] += 1
dict_remove(DueDates.rpp, pid, rid) # remove from rpp
dict_remove(DueDates.all_per_runner, rid, (dd, jid, pid))
......@@ -428,7 +425,7 @@ def accept_prefetch(msg):
parent_id, cachehit = select_parent(runner_id, alpha, StateCache, len(runners_last))
if parent_id:
trigger_select(runner_id, parent_id.id, cachehit)
trigger_select(runner_id, parent_id, cachehit)
job_id = generate_job_id(parent_id.t + 1)
job_to_parent_map[job_id] = parent_id
scheduled_jobs[runner_id] = (job_id, parent_id)
......@@ -502,7 +499,7 @@ scheduled_jobs = {}
def trigger_select(runner_id, state_id, was_cached):
if trigger.enabled:
now = time.time() - trigger.null_time
trigger_select.evts.append((now, runner_id, assimilation_cycle-1, state_id, was_cached))
trigger_select.evts.append((now, runner_id, state_id.t, state_id.id, was_cached))
trigger_select.evts = []
......@@ -564,9 +561,18 @@ def select_parent(runner_id, alpha, StateCache, R):
state_loads_wo_cache += 1
else:
if IS_SPECULATIVE:
parent_id, cachehit = scheduling_policy.select_parent(runner_id, alpha, len_alpha, StateCache, R)
alpha_2, _ = resample(assimilation_cycle)
alpha_2, _ = remove_weighted_particles(alpha_2)
alpha_2, _ = remove_duedate_particles(alpha_2)
P = len(alpha_2)
if P == 0:
return None, None
len_alpha_2 = count_members(alpha_2)
parent_id, cachehit = scheduling_policy.select_parent(runner_id, alpha_2, len_alpha_2, StateCache, R)
if parent_id:
len_alpha2 -= 1
# Stat counting
if not cachehit:
speculative_state_loads += 1
......@@ -603,7 +609,7 @@ def remove_duedate_particles(alpha_):
# Remove already scheduled particles
for _, entries in DueDates.due_dates.items():
for _, _, parent_id in entries:
if parent_id.t == assimilation_cycle and parent_id in alpha_:
if parent_id.t == assimilation_cycle and parent_id in out:
number_removed += 1
out[parent_id] -= 1
if out[parent_id] == 0:
......@@ -656,7 +662,7 @@ def handle_job_requests(launcher, nsteps):
parent_id, cachehit = select_parent(runner_id, alpha, StateCache, len(runners_last))
if parent_id:
trigger_select(runner_id, parent_id.id, cachehit)
trigger_select(runner_id, parent_id, cachehit)
# bookkeeping
if remove_from_alpha and \
parent_id.t == assimilation_cycle-1: # filter out speculative particles
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment