Commit 52580155 authored by Andrei Paskevich's avatar Andrei Paskevich

Session_scheduler: rework the scheduler environment type

- drop the "running_check" queue, unused and unusable.

- drop the "running_proofs" list, maintained in the proof server now.

- send a proof task to the server as soon as it is added to
  "proof_attempts_queue" (still limited by 3 * maximum_running_proofs).

- process "proof_attempts_queue" in the timeout_handler.

- do not clear proof_attempts_queue on cancel_scheduled_proofs,
  since every task in it is already sent to the server, and we
  have no means at the moment to pass cancellation events to it.

  TODO: this probably should be supported: it is quite frustrating
  to have thirty proof tasks in proof_attempts_queue and not be able
  to stop those that have not started yet. To do this, we should
  decide whether we want cancel tasks one by one, or all at once,
  or both. A server should send a single answer for every task,
  which may be "ProverFinished" (if the cancellation request arrived
  too late), or "ProverInterrupted" (if we want to extend the
  "prover_update" type), or "ProverFinished (Unknown (Interrupted))"
  (if we want to extend the "reason_unknown" type).

IMPORTANT: schedule_any_timeout behaves differently now: all
such callbacks are immediately added to proof_attempts_queue,
and thus are executed on every timeout event, without being
limited by "maximum_running_proofs". This function is only
used in why3session_run, with a comment saying that it should
not be used there, so maybe we should drop this completely.
parent 8159c0d7
......@@ -85,7 +85,8 @@ type action =
| Action_delayed of (unit -> unit)
type timeout_action =
| Check_prover of (proof_attempt_status -> unit) * Call_provers.prover_call
| Check_prover of
(proof_attempt_status -> unit) * bool * Call_provers.prover_call
| Any_timeout of (unit -> bool)
type t =
......@@ -94,13 +95,9 @@ type t =
(** Quota of action slot *)
mutable maximum_running_proofs : int;
(** Running actions which take one action slot *)
mutable running_proofs : timeout_action list;
(** Running check which doesn't take a running slot.
Check the end of some computation *)
mutable running_check : (unit -> bool) list;
mutable running_proofs : int;
(** proof attempt that wait some available action slot *)
proof_attempts_queue :
((proof_attempt_status -> unit) * Call_provers.prover_call) Queue.t;
proof_attempts_queue : timeout_action Queue.t;
(** timeout handler state *)
mutable timeout_handler_activated : bool;
mutable timeout_handler_running : bool;
......@@ -118,8 +115,7 @@ let init max =
Prove_client.set_max_running_provers max;
{ actions_queue = Queue.create ();
maximum_running_proofs = max;
running_proofs = [];
running_check = [];
running_proofs = 0;
proof_attempts_queue = Queue.create ();
timeout_handler_activated = false;
timeout_handler_running = false;
......@@ -130,7 +126,7 @@ let notify_timer_state t continue =
O.notify_timer_state
(Queue.length t.actions_queue)
(Queue.length t.proof_attempts_queue)
(List.length t.running_proofs);
t.running_proofs;
continue
(* timeout handler *)
......@@ -140,44 +136,33 @@ let timeout_handler t =
assert (not t.timeout_handler_running);
t.timeout_handler_running <- true;
(* Check if some action ended *)
let l = List.fold_left
(fun acc c ->
match c with
| Check_prover(callback,call) ->
(match Call_provers.query_call call with
| Call_provers.NoUpdates
| Call_provers.ProverStarted -> c::acc
| Call_provers.ProverFinished res ->
callback (Done res);
acc)
| Any_timeout callback ->
let b = callback () in
if b then c::acc else acc)
[] t.running_proofs
in
(* Check if some new actions must be started *)
let l =
if List.length l < t.maximum_running_proofs then
begin try
let (callback,call) = Queue.pop t.proof_attempts_queue in
callback Running;
Debug.dprintf debug "[Sched] proof attempts started@.";
(Check_prover(callback,call))::l
with Queue.Empty -> l
end
else l
in
t.running_proofs <- l;
(* Call the running check *)
t.running_check <- List.fold_left
(fun acc check -> if check () then check::acc else acc)
[] t.running_check;
let q = Queue.create () in
while not (Queue.is_empty t.proof_attempts_queue) do
match Queue.pop t.proof_attempts_queue with
| Check_prover (callback,started,call) as c ->
begin match Call_provers.query_call call with
| Call_provers.NoUpdates ->
Queue.add c q
| Call_provers.ProverStarted when started ->
Queue.add c q (* should not happen *)
| Call_provers.ProverStarted ->
callback Running;
t.running_proofs <- t.running_proofs + 1;
Debug.dprintf debug "[Sched] proof attempts started@.";
Queue.add (Check_prover (callback,true,call)) q
| Call_provers.ProverFinished res ->
if started then t.running_proofs <- t.running_proofs - 1;
callback (Done res)
end
| Any_timeout callback as c ->
if callback () then Queue.add c q
done;
Queue.transfer q t.proof_attempts_queue;
let continue =
match l with
| [] ->
if Queue.is_empty t.proof_attempts_queue then begin
Debug.dprintf debug "[Sched] Timeout handler stopped@.";
false
| _ -> true
end else true
in
t.timeout_handler_activated <- continue;
t.timeout_handler_running <- false;
......@@ -193,23 +178,15 @@ let run_timeout_handler t =
let schedule_any_timeout t callback =
Debug.dprintf debug "[Sched] schedule a new timeout@.";
t.running_proofs <- (Any_timeout callback) :: t.running_proofs;
run_timeout_handler t
(* unused
let schedule_check t callback =
Debug.dprintf debug "[Sched] add a new check@.";
t.running_check <- callback :: t.running_check;
Queue.add (Any_timeout callback) t.proof_attempts_queue;
run_timeout_handler t
*)
(* idle handler *)
let idle_handler t =
try
if Queue.length t.proof_attempts_queue < 3 * t.maximum_running_proofs then
begin
match Queue.pop t.actions_queue with
begin match Queue.pop t.actions_queue with
| Action_proof_attempt(cntexample,limit,
old,inplace,command,driver,callback,goal) ->
begin
......@@ -218,7 +195,8 @@ let idle_handler t =
Driver.prove_task ?old ~cntexample ~inplace ~command
~limit driver goal
in
Queue.push (callback,call) t.proof_attempts_queue;
let pa = Check_prover (callback,false,call) in
Queue.push pa t.proof_attempts_queue;
run_timeout_handler t
with e when not (Debug.test_flag Debug.stack_trace) ->
Format.eprintf
......@@ -227,7 +205,7 @@ let idle_handler t =
callback (InternalFailure e)
end
| Action_delayed callback -> callback ()
end
end
else
usleep (float default_delay_ms /. 1000.);
notify_timer_state t true
......@@ -266,14 +244,17 @@ let cancel_scheduled_proofs t =
done
with Queue.Empty ->
Queue.transfer new_queue t.actions_queue;
(* NOTE: we cannot cancel proof attempts sent to the server *)
(*
try
while true do
let (callback,_) = Queue.pop t.proof_attempts_queue in
let (callback,_,_) = Queue.pop t.proof_attempts_queue in
callback Interrupted
done
with
| Queue.Empty ->
O.notify_timer_state 0 0 (List.length t.running_proofs)
*)
ignore (notify_timer_state t false)
let schedule_proof_attempt ~cntexample ~limit ?old ~inplace
......@@ -292,7 +273,7 @@ let schedule_edition t command filename callback =
Debug.dprintf debug "[Sched] Scheduling an edition@.";
let call = Call_provers.call_editor ~command filename in
callback Running;
t.running_proofs <- (Check_prover(callback, call)) :: t.running_proofs;
Queue.add (Check_prover(callback,false,call)) t.proof_attempts_queue;
run_timeout_handler t
let schedule_delayed_action t callback =
......
......@@ -94,12 +94,6 @@ module Make(O: OBSERVER) : sig
val init : int -> t
(** [init max] *)
(* not used
val schedule_check: t -> (unit -> bool) -> unit
(** test the check time to time, reschedule it if it returns true *)
*)
(* used by why3session_run, but it should not as it is a low-level scheduler
function *)
val schedule_any_timeout: t -> (unit -> bool) -> unit
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment