Slight revision of the Threshold buffers
Experiments with heat-pde at scale brought to light the presence of a racing condition with the ThresholdQueue
.
This MR aims at revising the buffer
module in the hopes of solving this issue.
As indicated in the code, the ThresholdQueue
derives from the official Python Queue
which instantiates the following attributes:
-
self.mutex = threading.Lock()
, -
self.not_full = threading.Condition(self.mutex)
,self.not_empty = threading.Condition(self.mutex)
.
For the ThresholdQueue
, this justifies the introduction of self.enough_data = threading.Condition(self.mutex)
which replaces self.not_empty
. Since no put/get
method depends on the latter it was removed from both ThresholdQueue
and ThresholdReservoirQueue
for clarity.
Another specificity of the ThresholdQueue
lies in the need to set self.threshold
to 0 as soon as the reception is over. This must be done by first getting self.mutex
, the lock common to all threading conditions, and then by notifying self.enough_data
, the condition on which the get
method may be waiting. This MR adds this notification which was omitted in ThresholdQueue
only.
It is unclear if the racing condition is actually due to this omission or not since many experiments were successfully performed before without any problem.