def expect(self, regexes, timeout=False, stream=STDOUT, backtrack=True, expect_output_handler=None):
    """Search the output stream(s) of all processes of this action for some regex.

    It mimics/takes ideas from Don Libes expect, or python-pexpect,
    and does so on multiple processes running in parallel.

    It waits for a regex to match on all processes, then returns a
    list of tuples (process, regex index, match object). If a timeout
    was reached with no match for a given process, its tuple is
    (process, None, None). The returned list has the same process sort
    order as self.processes.

    It uses `execo.process.ProcessBase.expect`, see its documentation
    for more details. It uses thread local storage such that
    concurrent expects in parallel threads do not interfere with each
    other.

    :param regexes: a regex or list of regexes. May be given as
      string or as compiled regexes (if given as compiled regexes, do
      not forget flags, most likely re.MULTILINE. Regexes passed as
      strings are compiled with re.MULTILINE).

    :param timeout: wait timeout after which (None, None) is reported
      for each process with no match. If False (the default): use the
      default expect timeout. If None: no timeout. If 0: return
      immediately, no blocking if no match found.

    :param stream: stream to monitor for the processes, STDOUT or
      STDERR.

    :param backtrack: If True (the default), the first expect is done
      from the beginning of each process output. If False, the first
      expect is done from the next received process output.

    :param expect_output_handler: If not None, a specific
      ExpectOutputHandler instance to use. Otherwise, if None (the
      default), a thread local ExpectOutputHandler is instantiated.
    """
    if timeout == False:
        timeout = self.default_expect_timeout
    retval = []
    # a single shared countdown: the timeout bounds the whole call,
    # not each individual per-process expect
    countdown = Timer(timeout)
    for p in self.processes:
        if timeout == None:
            p_timeout = None
        else:
            # explicit None test (fix): with the previous truthiness
            # test, timeout == 0 fell through to p_timeout = None and
            # blocked forever instead of returning immediately as
            # documented. Here timeout == 0 yields remaining() <= 0,
            # hence a non-blocking per-process expect.
            p_timeout = max(0, countdown.remaining())
        # fix: forward expect_output_handler, which was previously
        # accepted and documented but silently ignored
        f = p.expect(regexes, timeout=p_timeout, stream=stream,
                     backtrack=backtrack,
                     expect_output_handler=expect_output_handler)
        retval.append((p, f[0], f[1]))
    return retval
def stream_name(stream):
    """Return the human-readable name of a stream identifier.

    :param stream: STDOUT or STDERR

    :raise ValueError: for any other value
    """
    if stream == STDOUT:
        return 'STDOUT'
    elif stream == STDERR:
        return 'STDERR'
    else:
        raise ValueError("Invalid stream %s" % (stream,))


class ExpectOutputHandler(ProcessOutputHandler):

    """Handler for monitoring stdout / stderr of a Process and being
    notified when some regex matches. It mimics/takes ideas from Don
    Libes expect, or python-pexpect.

    To use this `execo.process.ProcessOutputHandler`, instantiate one,
    add it to the monitored process(es) stdout / stderr handlers. As
    soon as added, it records the stream output. Then call the expect
    method to expect some regexes in the stream.

    One instance of ExpectOutputHandler can handle stdout/stderr of
    several processes. It tracks each process's stream search position
    independently. For subsequent matches, the search start position
    in the stream is the end of the previous match.
    """

    def __init__(self):
        super(ExpectOutputHandler, self).__init__()
        self._lock = threading.RLock()
        # condition variable signalled by read() whenever new data
        # arrives; expect() blocks on it
        self._read_cv = threading.Condition(self._lock)
        # (process, stream) keys whose buffer received data not yet
        # scanned by expect()
        self._dirty_streams = set()
        # fix: nothing visible guarantees the superclass initializes
        # _buffers, yet read() and ProcessBase.expect() both index it
        # directly; initializing it here is harmless if the superclass
        # does it too
        self._buffers = {}

    def read(self, process, stream, string, eof, error):
        """Record incoming stream data and wake up blocked expect() callers."""
        logger.debug("ExpectOutputHandler.read: read %r on stream %s of %s" % (string, stream_name(stream), process))
        k = (process, stream)
        with self._read_cv:
            if not k in self._buffers:
                self._buffers[k] = ""
                logger.debug("ExpectOutputHandler.read: %s of %s not yet in buffers" % (stream_name(stream), process))
            self._buffers[k] += string
            self._dirty_streams.add(k)
            logger.debug("ExpectOutputHandler.read: notifying condition to all")
            self._read_cv.notify_all()
            logger.debug("ExpectOutputHandler.read: notification sent to all")

    def expect(self, regexes, timeout=None):
        """Expect a regex or a list of regexes.

        :param regexes: a regex or list of regexes. May be given as
          string or as compiled regexes (if given as compiled regexes,
          do not forget flags, most likely re.MULTILINE. Regexes
          passed as strings are compiled with re.MULTILINE)

        :param timeout: wait timeout after which it returns {} if no
          match was found. If None (the default): no timeout. If 0:
          return immediately, no blocking if no match found.

        Returns a dict whose keys are tuples (process, stream) and
        whose values are tuples (regex_index, match_object). The
        processes in the keys are the processes where a match was
        found. The streams in the keys are the streams (STDOUT,
        STDERR) where the match was found. The regex_index in the
        values is the index of the regex which matched, when given
        several regexes to search, or always 0 if only one regex was
        searched. The match_object in the values is the regex match
        object. The (process, stream) tuples that are monitored are
        the ones to which this ExpectOutputHandler has been added as
        ProcessOutputHandler. For each (process, stream) monitored,
        only the first match is returned by a call to expect. To get
        more matches, call expect again. When timeout == None or a
        positive timeout is given, if no match is found, it will block
        until at least one match is found for a (process, stream), or
        until reaching the timeout (in this case, it will return an
        empty dict). If timeout == 0 and no match was immediately
        found, it will return immediately with an empty dict.
        """
        regexes = singleton_to_collection(regexes)
        for i, r in enumerate(regexes):
            if not isinstance(r, type(re.compile(''))):
                regexes[i] = re.compile(r, re.MULTILINE)
        matches = {}
        with self._read_cv:
            logger.debug("ExpectOutputHandler.expect: searching for %r in %i streams" % (regexes, len(self._buffers)))
            for (process, stream) in self._buffers:
                k = (process, stream)
                logger.debug("ExpectOutputHandler.expect: searching in %s of %s" % (stream_name(stream), process))
                for re_index, r in enumerate(regexes):
                    mo = r.search(self._buffers[k])
                    if mo != None:
                        matches[k] = (re_index, mo)
                        logger.debug("ExpectOutputHandler.expect: match found for %r in stream %s at position %s: %r in %s" % (
                            r.pattern,
                            stream_name(stream),
                            mo.span(),
                            self._buffers[k][mo.span()[0]:mo.span()[1]],
                            process))
                        self._buffers[k] = self._buffers[k][mo.end():]
                        self._dirty_streams.discard(k)
                        # fix: stop at the first matching regex;
                        # without this break a later regex could
                        # overwrite the recorded match and trim the
                        # buffer a second time
                        break
            # if there are some matches, we return them immediately
            # otherwise, we block until there is at least one match
            if len(matches) > 0 or timeout == 0:
                logger.debug("ExpectOutputHandler.expect: there are %i matches: %r, return immediately" % (len(matches), matches))
                return matches
            countdown = Timer(timeout)
            while len(matches) == 0 and (timeout == None or countdown.remaining() > 0):
                if timeout:
                    cv_timeout = max(0, countdown.remaining())
                else:
                    cv_timeout = None
                logger.debug("ExpectOutputHandler.expect: blocking wait for incoming data, timeout = %s" % (cv_timeout,))
                self._read_cv.wait(cv_timeout)
                # maybe we should not wait for finished processes (or even
                # wait when there are no processes at all), but not
                # practical, as the ExpectOutputHandler has no knowledge
                # of what processes/streams it has been added to
                logger.debug("ExpectOutputHandler.expect: Unblocked due to notification of data received on stream or timeout. Searching for %r in %i dirty streams" % (regexes, len(self._dirty_streams), ))
                for (process, stream) in self._dirty_streams:
                    k = (process, stream)
                    logger.debug("ExpectOutputHandler.expect: searching in %s of %s" % (stream_name(stream), process))
                    for re_index, r in enumerate(regexes):
                        mo = r.search(self._buffers[k])
                        if mo != None:
                            matches[k] = (re_index, mo)
                            logger.debug("ExpectOutputHandler: while blocking, match found for %r in stream %s at position %s: %r in %s" % (
                                r.pattern,
                                stream_name(stream),
                                mo.span(),
                                self._buffers[k][mo.span()[0]:mo.span()[1]],
                                process))
                            self._buffers[k] = self._buffers[k][mo.end():]
                            # fix: same first-match-wins break as in
                            # the non-blocking pass above
                            break
                self._dirty_streams.clear()
            return matches
def expect(self, regexes, timeout=False, stream=STDOUT, backtrack=True, expect_output_handler=None):
    """Search the process output stream(s) for some regex.

    It mimics/takes ideas from Don Libes expect, or python-pexpect.
    It is a more user-friendly interface than
    `execo.process.ExpectOutputHandler` (in particular, it manages
    backtracking).

    It waits for a regex to match, then returns the tuple
    (regex_index, match_object), or (None, None) if timeout reached,
    or if eof reached, or stream in error, without any match.

    It uses thread local storage such that concurrent expects in
    parallel threads do not interfere with each other.

    :param regexes: a regex or list of regexes. May be given as
      string or as compiled regexes (if given as compiled regexes, do
      not forget flags, most likely re.MULTILINE. Regexes passed as
      strings are compiled with re.MULTILINE)

    :param timeout: wait timeout after which it returns (None, None)
      if no match was found. If False (the default): use the default
      expect timeout. If None: no timeout. If 0: return immediately,
      no blocking if no match found.

    :param stream: stream to monitor for this process, STDOUT or
      STDERR.

    :param backtrack: If True (the default), the first expect is done
      from the beginning of the process output. If False, the first
      expect is done from the next received process output.

    :param expect_output_handler: If not None, a specific
      ExpectOutputHandler instance to use. Otherwise, if None (the
      default), a thread local ExpectOutputHandler is instantiated.
    """
    if timeout == False:
        timeout = self.default_expect_timeout
    if expect_output_handler == None:
        # lazily instantiate one handler per thread
        expect_output_handler = self._thread_local_storage.__dict__.get('expect_handler')
        if expect_output_handler == None:
            self._thread_local_storage.expect_handler = ExpectOutputHandler()
            expect_output_handler = self._thread_local_storage.expect_handler
    if expect_output_handler not in (self.stdout_handlers, self.stderr_handlers)[stream - 1]:
        with expect_output_handler._lock:
            # we prevent the handler from being able to receive data
            # while we initialize its buffer with stdout/stderr
            if backtrack:
                expect_output_handler._buffers[(self, stream)] = (self.stdout, self.stderr)[stream - 1]
                logger.debug("Process.expect: backtracking, so initialize buffer with history of stream %s: %r" % (stream_name(stream), expect_output_handler._buffers[(self, stream)],))
            (self.stdout_handlers, self.stderr_handlers)[stream - 1].append(expect_output_handler)
    matches = expect_output_handler.expect(regexes, timeout)
    # note: if eof or error, ExpectOutputHandler.read() will be
    # called, ExpectOutputHandler._dirty_streams will be updated
    # with this stream, the condition variable will be notified,
    # and the blocking pass of ExpectOutputHandler.expect() will
    # return with an empty dict
    # NOTE(review): when the handler is shared across several
    # processes/streams, matches found here for other (process,
    # stream) keys are discarded although their buffers were already
    # trimmed — confirm this loss is intended
    if (self, stream) in matches:
        return matches[(self, stream)]
    else:
        return (None, None)