should.py 34.9 KB
Newer Older
1 2
#!/usr/bin/env python3

3
# should -- Test command-line applications through .should files
4 5 6 7 8 9 10 11 12 13 14
#
# Copyright (C) 2018 by CRIStAL (UMR CNRS 9189, Université Lille) and Inria Lille
# Contributors:
#     Mathieu Giraud <mathieu.giraud@vidjil.org>
#     Mikaël Salson <mikael.salson@vidjil.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
15
# "should" is distributed in the hope that it will be useful,
16 17 18 19 20
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
21
# along with "should". If not, see <http://www.gnu.org/licenses/>
22 23 24

import sys

25 26
if not (sys.version_info >= (3, 4)):
    print("Python >= 3.4 required")
27 28
    sys.exit(1)

29
__version_info__ = ('2','0','0')
30 31
__version__ = '.'.join(__version_info__)

32 33 34 35
import re
import argparse
import subprocess
import time
36
import random
37
import os.path
38 39 40
from collections import defaultdict, OrderedDict
import xml.etree.ElementTree as ET
import datetime
41
import tempfile
42 43 44 45 46 47

# Make sure the output is in utf8: stdout and stderr are re-opened on their
# own file descriptors as utf8 text streams (buffering=1 selects line buffering)
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)
sys.stderr = open(sys.stderr.fileno(), mode='w', encoding='utf8', buffering=1)

DEFAULT_CFG = 'should.cfg'
RETRY = 'should.retry'
RETRY_FLAG = '--retry'

# Tokens recognized at the start of the lines of a .should file
TOKEN_COMMENT = '#'
TOKEN_DIRECTIVE = '!'
TOKEN_NAME = '$'
TOKEN_TEST = ':'
# A test line: optional non-blank modifier characters, then ':'
# (raw string: '\S' is a regex class, not a string escape)
RE_TEST = re.compile(r'^(\S)*[:]')

# Directives
DIRECTIVE_REQUIRES = '!REQUIRES:'
DIRECTIVE_NO_LAUNCHER = '!NO_LAUNCHER:'
DIRECTIVE_SCRIPT = '!LAUNCH:'
DIRECTIVE_NO_EXTRA = '!NO_EXTRA:'
DIRECTIVE_OPTIONS = '!OPTIONS:'
DIRECTIVE_SOURCE = '!OUTPUT_FILE:'
DIRECTIVE_EXIT_CODE = '!EXIT_CODE:'

# Variables inserted into every command by pre_process()
VAR_LAUNCHER = '$LAUNCHER'
VAR_EXTRA = '$EXTRA'

# One-character test modifiers (see their descriptions in MODIFIERS)
MOD_TODO = 'f'
MOD_ALLOW = 'a'
MOD_REGEX = 'r'
MOD_COUNT_ALL = 'w'
MOD_IGNORE_CASE = 'i'
MOD_BLANKS = 'b'
MOD_MULTI_LINES = 'l'
MOD_KEEP_LEADING_TRAILING_SPACES = 'z'

MOD_MORE_THAN = '>'
MOD_LESS_THAN = '<'

# Default timeout of a command, in seconds
TIMEOUT = 120
SHOW_ELAPSED_TIME_ABOVE = 1.0

# Modifiers of a test: (non-digits)(optional count)(non-digits)
RE_MODIFIERS = re.compile(r'^(\D*)(\d*)(\D*)$')

# Output selectors (.log and .tap are file extensions, should.xml is a file name)
OUT_LOG = '.log'
OUT_TAP = '.tap'
OUT_XML = 'should.xml'

LINE = '-' * 40
ENDLINE_CHARS = '\r\n'
CONTINUATION_CHAR = '\\'

# When dumping more than MAX_DUMP_LINES lines, only the first and the last
# MAX_HALF_DUMP_LINES lines are shown
MAX_HALF_DUMP_LINES = 45
MAX_DUMP_LINES = 2*MAX_HALF_DUMP_LINES + 10
94 95 96 97 98 99

# Statuses of a test, besides None (not run), False (failed) and True (passed)
SKIP = 'SKIP'
TODO = 'TODO'
TODO_PASSED = 'TODO_PASSED'
ALLOW_FAILED = 'ALLOW_FAILED'

# Label displayed for each status
STATUS = {
    None: 'not run',
    False: 'failed',
    True: 'ok',
    SKIP: 'skip',
    TODO: 'TODO',
    TODO_PASSED: 'TODO-but-ok',
    ALLOW_FAILED: 'failed-but-ALLOW',
}

# When two statuses are combined, the one listed first here wins
STATUS_ORDER = [
    False, TODO_PASSED,         # Failed
    TODO, ALLOW_FAILED, SKIP,   # Warnings
    True,                       # Passed
    None,                       # 'Forgotten' status when mixed to other tests
]

FAIL_STATUS = [False, TODO_PASSED]
WARN_STATUS = FAIL_STATUS + [ALLOW_FAILED, TODO, SKIP]

# Label of each status in .tap outputs
STATUS_TAP = {
    None: 'not run',
    False: 'not ok',
    True: 'ok',
    SKIP: 'ok # SKIP',
    TODO: 'not ok # TODO',
    TODO_PASSED: 'ok # TODO',
    ALLOW_FAILED: 'not ok # SKIP',
}

# Label of each status in the XML output: as STATUS, except for two entries
STATUS_XML = dict(STATUS)
STATUS_XML.update({False: 'failure', SKIP: 'skipped'})

140

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
def combine_status(s1, s2):
    '''
    Return, among two statuses, the one coming first in STATUS_ORDER.

    >>> combine_status(TODO, False)
    False

    >>> combine_status(True, SKIP) == SKIP
    True

    >>> combine_status(True, TODO_PASSED) == TODO_PASSED
    True
    '''

    first, second = (STATUS_ORDER.index(s) for s in (s1, s2))
    winner = first if first <= second else second
    return STATUS_ORDER[winner]

157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
# Simple colored output

# Escape sequence template, to be filled with one of the ANSI codes below
CSIm = '\033[%sm'

class ANSI:
    '''Numeric codes used in the CSIm escape sequences'''
    RESET, BRIGHT = 0, 1
    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38)

def color(col, text, colorize = True):
    '''
    Surround `text` by the escape sequences selecting the color `col`
    and then resetting it. With colorize=False, `text` is returned unchanged.
    '''
    if colorize:
        opening = CSIm % col
        closing = CSIm % ANSI.RESET
        return opening + text + closing
    return text
177 178

# Color used to display each status
STATUS_COLORS = {
    None: ANSI.CYAN,
    False: ANSI.RED,
    True: ANSI.GREEN,
    SKIP: ANSI.CYAN,
    TODO: ANSI.CYAN,
    TODO_PASSED: ANSI.RED,
    ALLOW_FAILED: ANSI.CYAN,
}

# Modifier parser

# (character, long option name, help text) of each modifier
MODIFIERS = [
    (MOD_TODO, 'todo',
     'consider that the test should fail'),
    (MOD_ALLOW, 'allow',
     'consider that the test is allowed to fail'),
    (MOD_REGEX, 'regex',
     'consider as a regular expression'),
    (MOD_COUNT_ALL, 'count-all',
     'count all occurrences, even on a same line'),
    (MOD_IGNORE_CASE, 'ignore-case',
     'ignore case changes'),
    (MOD_BLANKS, 'blanks',
     "ignore whitespace differences as soon as there is at least one space. Implies 'r'"),
    (MOD_MULTI_LINES, 'multi-lines',
     'search on all the output rather than on every line'),
    (MOD_KEEP_LEADING_TRAILING_SPACES, 'ltspaces',
     'keep leading and trailing spaces'),

    (MOD_MORE_THAN, 'more-than',
     'requires that the expression occurs strictly more than the given number'),
    (MOD_LESS_THAN, 'less-than',
     'requires that the expression occurs strictly less than the given number'),
]



class ArgParser(argparse.ArgumentParser):
    '''ArgumentParser with a more lenient reading of the argument files'''

    def convert_arg_line_to_args(self, l):
        '''
        Turn one line of an argument file into a list of zero or one argument:
          - leading and trailing spaces are ignored
          - an empty (or blank) line gives no argument
        '''
        stripped = l.strip()
        if not stripped:
            return [ ]
        return [ stripped ]


class ModifierParser(ArgParser):
    '''Parser of the one-character modifiers of a test'''

    def parse_modifiers(self, modifiers):
        '''
        Parse each character of `modifiers` as a short option ('-' + character).
        Return the resulting namespace. Unknown modifiers are reported on stderr.
        '''
        flags = ['-' + mod for mod in modifiers]
        mods, unknown = self.parse_known_args(flags)
        for flag in unknown:
            # flag[1] is the character following the '-'
            sys.stderr.write("! Unknown modifier '%s'\n" % flag[1])
        return mods

# add_help=False: otherwise argparse registers '-h', and a test written
# with a modifier 'h' would print the usage and exit the whole program
# instead of being reported as an unknown modifier.
parser_mod = ModifierParser(add_help=False)
parser_mod.help = 'modifiers (uppercase letters cancel previous modifiers)\n'

# Each modifier gives a '-c/--long' flag. When the character has an uppercase
# variant, '-C' resets the same destination to False. The help text
# of all modifiers is gathered into parser_mod.help.
for (mod_char, mod_long, mod_help) in MODIFIERS:
    parser_mod.add_argument('-' + mod_char, '--' + mod_long, action='store_true', help=mod_help)

    help_upper = '  '
    if mod_char.upper() != mod_char:
        parser_mod.add_argument('-' + mod_char.upper(), dest=mod_long.replace('-', '_'),
                                action='store_const', const=False, default=False,
                                help='back to default, overriding any previous -%s' % mod_char)
        help_upper = '/%s' % mod_char.upper()

    parser_mod.help += '  %s%s %s\n' % (mod_char, help_upper, mod_help)
240 241 242 243 244

# Main argument parser

parser = ArgParser(
    description='Test command-line applications through .should files',
    fromfile_prefix_chars='@',
    epilog='''Example (see also README.md and demo/*.should):
  %(prog)s demo/hello.should''',
    add_help=False,
    formatter_class=argparse.RawTextHelpFormatter)

options = ArgParser(fromfile_prefix_chars='@') # Can be used in !OPTIONS: directive

# Options on how the tests are run: available both on the command line and in !OPTIONS:
group = parser.add_argument_group('running tests (can also be set per test in !OPTIONS)')

for p in (group, options):
    p.add_argument('--cd', metavar='PATH',
                   help='directory from which to run the test commands')
    p.add_argument('--cd-same', action='store_true',
                   help='run the test commands from the same directory as the .should files')
    p.add_argument('--launcher', metavar='CMD', default='',
                   help='launcher preceding each command (or replacing %s)' % VAR_LAUNCHER)
    p.add_argument('--extra', metavar='ARG', default='',
                   help='extra argument after the first word of each command (or replacing %s)' % VAR_EXTRA)
    p.add_argument('--mod', metavar='MODIFIERS', action='append',
                   help='global ' + parser_mod.help)
    p.add_argument('--var', metavar='NAME=value', action='append',
                   help='variable definition (then use $NAME in .should files)')
    p.add_argument('--timeout', type=int, default = TIMEOUT,
                   help = 'Delay (in seconds) after which the task is stopped (default: %(default)d)')

# Options selecting the tests
group = parser.add_argument_group('selecting tests to be run')

group.add_argument('--shuffle', action='store_true', help='shuffle the tests')
group.add_argument('--no-a', action='store_true', help="do not launch 'a' tests")
group.add_argument('--no-f', action='store_true', help="do not launch 'f' tests")
group.add_argument('--only-a', action='store_true', help="launches only 'a' tests")
group.add_argument('--only-f', action='store_true', help="launches only 'f' tests")

group.add_argument(RETRY_FLAG, action='store_true',
                   help='launches only the last failed or warned tests')

# Options on the output
output = parser.add_argument_group('controlling output')

output.add_argument('--log', action='append_const', dest='output', const=OUT_LOG,
                    help='stores the output into .log files')
output.add_argument('--tap', action='append_const', dest='output', const=OUT_TAP,
                    help='outputs .tap files')
output.add_argument('--xml', action='append_const', dest='output', const=OUT_XML,
                    help='outputs JUnit-like XML into %s' % OUT_XML)
output.add_argument('-v', '--verbose', action='count',
                    help='increase verbosity', default=1)
output.add_argument('-q', '--quiet', action='store_const', dest='verbose', const=0,
                    help='verbosity to zero')

output.add_argument('--fail-a', action='store_true', help="fail on passing 'a' tests")
output.add_argument("-h", "--help", action="help", help="show this help message and exit")
output.add_argument('--version', action='version',
                    version='%(prog)s {version}'.format(version=__version__))


parser.add_argument('file', metavar='should-file', nargs='+', help='''input files (.should)''')

class ShouldException(BaseException):
    '''
    Error raised by should on invalid modifiers or variable definitions.

    It derives from BaseException: `except Exception` handlers do not catch it.
    '''


def write_to_file(f, what):
    '''Print the name of the file `f`, then (over)write it with the text `what`, in utf-8'''
    print('==> %s' % f)
    with open(f, mode='w', encoding='utf-8') as out:
        out.write(what)

299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315


# Command pre-processing

def pre_process(cmd):
    '''
    Insert the variables VAR_LAUNCHER and VAR_EXTRA into the command `cmd`,
    each of them only when it does not already occur somewhere in `cmd`:
      - VAR_EXTRA is put just after the first (space-separated) word
      - VAR_LAUNCHER is put before the whole command
    '''

    words = cmd.split(' ')

    if VAR_EXTRA not in cmd:
        words.insert(1, VAR_EXTRA)

    if VAR_LAUNCHER not in cmd:
        words.insert(0, VAR_LAUNCHER)

    return ' '.join(words)


316 317 318 319 320
# Variables definition and expansion

def populate_variables(var):
    '''
    Build the list of ('$NAME', value) pairs from 'NAME=value' definitions.

    `var` is a list of definitions, or None. The pairs are returned
    in the reverse order of the definitions (the last definition comes first).
    Only the first '=' separates the name from the value: values may contain '='.

    Raises ShouldException on a definition without any '='.

    >>> populate_variables(['ab=cd', 'ef=xyz'])
    [('$ef', 'xyz'), ('$ab', 'cd')]

    >>> populate_variables(['opt=--key=val'])
    [('$opt', '--key=val')]

    >>> populate_variables(None)
    []
    '''

    variables = []

    for definition in var or []:
        key, separator, value = definition.partition('=')
        if not separator:
            raise ShouldException('Error in parsing variable definition: ' + definition)
        variables.insert(0, ('$' + key, value))

    return variables


def print_variables(variables):
    '''Print each (name, value) pair as a 'name=value' line, then an empty line'''
    for name, value in variables:
        print(name, value, sep='=')
    print()

def replace_variables(s, variables):
    '''
    Replace in `s`, one (key, value) pair after the other, every occurrence
    of the key by the value. `variables` may be None or empty.

    >>> replace_variables('hello', None)
    'hello'

    >>> replace_variables('hello', [('hell', 'w'), ('o', 'orld')])
    'world'

    >>> replace_variables('xyz xyz', [('y', 'abc')])
    'xabcz xabcz'
    '''

    if not variables:
        return s

    replaced = s
    for (name, value) in variables:
        replaced = replaced.replace(name, value)
    return replaced


360 361 362 363
class OrderedDefaultListDict(OrderedDict):
    '''OrderedDict in which reading a missing key stores, and returns, a new empty list'''

    def __missing__(self, key):
        fresh = []
        self[key] = fresh
        return fresh
364 365 366 367

class Stats():
    '''
    Lists of data, grouped by key (such as a status).
    `item` names what is counted, and is used in str_status().

    >>> s = Stats('foo')
    >>> s.up(2)
    >>> list(s.keys())
    [2]
    >>> s[2]
    [1]

    >>> t = Stats()
    >>> t.up(2, 'hello')
    >>> t.up(3)

    >>> u = s + t
    >>> sorted(u.keys())
    [2, 3]
    >>> list(s.keys())
    [2]

    >>> sorted(u.items())
    [(2, [1, 'hello']), (3, [1])]
    '''

    def __init__(self, item=''):
        self.item = item
        self.stats = OrderedDefaultListDict()

    # Dictionary-like access, delegated to self.stats

    def __getitem__(self, key):
        return self.stats[key]

    def __setitem__(self, key, value):
        self.stats[key] = value

    def keys(self):
        return self.stats.keys()

    def items(self):
        return self.stats.items()

    def values(self):
        return self.stats.values()

    def up(self, key, data=1):
        '''Append `data` to the list stored under `key`'''
        self.stats[key].append(data)

    def __iter__(self):
        '''Yield the (key, list) pairs of the statuses present, in the reverse order of STATUS_ORDER'''
        for key in reversed(STATUS_ORDER):
            if key in self.keys():
                yield (key, self[key])

    def total(self):
        '''Number of data, over all the keys'''
        return sum(len(data) for data in self.stats.values())

    def __add__(self, other):
        '''New Stats, named as self, gathering the data of self and then of other'''
        merged = Stats(self.item)
        for source in (self, other):
            for key in source.keys():
                merged.stats[key].extend(source.stats[key])
        return merged

    def str_status(self, status, colorize=True):
        '''
        One-line summary: the label of `status`, the number of data
        for each status present, and the total number of items.
        '''
        head = color(STATUS_COLORS[status], '==> ' + STATUS[status] + ' - ', colorize)
        counts = ' '.join(color(STATUS_COLORS[key], '%s:%d', colorize) % (STATUS[key], len(val))
                          for (key, val) in self)

        total = self.total()
        nb_items = '- total:%s' % total
        if self.item:
            plural = 's' if total > 1 else ''
            nb_items += ' ' + self.item + plural

        return head + counts + ' ' + color(STATUS_COLORS[status], nb_items, colorize)
439 440 441 442




443
class TestCaseAbstract:
    '''
    Common outputs (text, .tap, xml) of a test case.

    The methods below read self.name and self.status, and the xml() method
    uses repr(self): the subclasses have to define them.
    '''

    def __init__(self):
        # NotImplementedError is the exception. NotImplemented is a constant
        # for comparison methods, and raising it gives a TypeError.
        raise NotImplementedError

    def str_additional_status(self, verbose=False):
        '''Details appended to the status. None here, to be overridden.'''
        return ''

    def str_status(self, verbose=False, names=STATUS, colorize=True):
        '''Label of the status, taken in `names`, followed by the additional status'''
        s = color(STATUS_COLORS[self.status], names[self.status], colorize)
        s += self.str_additional_status(verbose)
        return s

    def xml(self):
        '''
        'testcase' XML element. When the status is in WARN_STATUS,
        it gets a child element, named after the status, with a message.
        '''
        x = ET.Element('testcase', {'name': self.name, 'status': STATUS_XML[self.status]})
        if self.status in WARN_STATUS:
            x.append(ET.Element(STATUS_XML[self.status],
                                {'message': repr(self) + '\n' + self.str_status(names=STATUS_XML, colorize = False)}))
        return x

    def tap(self, names=STATUS_TAP, colorize=False):
        '''Status (unless the test was not run) and name (unless empty), joined by ' - ' '''
        s = []

        if self.status is not None:
            s.append(self.str_status(names=names, colorize=colorize))

        if self.name:
            s.append(self.name)

        return ' - '.join(s)

    def str(self, colorize):
        '''As tap(), but with the labels of STATUS'''
        return self.tap(names=STATUS, colorize=colorize)

    def __str__(self):
        return self.str(colorize=True)

    def __repr__(self):
        raise NotImplementedError
483

484
class ExternalTestCase(TestCaseAbstract):
    '''
    Test case whose status is given at its creation: its test() method does nothing.
    `info` is a text shown next to the status, and returned by repr().
    '''

    def __init__(self, name, status, info=''):
        self.name, self.status, self.info = name, status, info
        self.modifiers = ''

    def str_additional_status(self, verbose = False):
        '''The info, between parentheses, on a status in WARN_STATUS or when verbose'''
        if self.status in WARN_STATUS or verbose:
            return ' (%s)' % self.info
        return ''

    def test(self, *args, **kwargs):
        '''Nothing to test: the status is already known'''
        return None

    def __repr__(self):
        return self.info


class TestCase(TestCaseAbstract):
    '''
    Test on a list of lines: an expression is searched in the lines, and
    the number of occurrences is compared to the expected count, if any.

    >>> test = TestCase('', 'hello')
    >>> repr(test)
    ':hello'

    >>> test.str_status(colorize=False)
    'not run'

    >>> test.test(['world'])
    False

    >>> test.status
    False

    >>> test.test(['hello'])
    True


    >>> test = TestCase('3', 'hello')
    >>> repr(test)
    '3:hello'

    >>> test.test(['hello'])
    False
    >>> test.count
    1
    >>> print(test.str(colorize=False))
    failed (1/3)
    >>> test.tap()
    'not ok (1/3)'

    >>> test.test(['hello'] * 3)
    True


    >>> TestCase('r2', ' e.*o ').test(['hello', 'ello', 'world'])
    True

    >>> TestCase('z1', ' e').test(['hello', 'h ello'])
    True

    >>> TestCase('rl', 'e.*o').test(['hel', 'lo'])
    True

    >>> TestCase('', 'e o').test(['e  o'])
    False

    >>> TestCase('f', 'e o').test(['e  o'])
    'TODO'

    >>> TestCase('b', 'e o').test(['e  o'])
    True

    >>> TestCase('b', 'e    o').test(['e  o'])
    True

    >>> TestCase('w2', 'o').test(['hello world'])
    True

    >>> TestCase('wW2', 'o').test(['hello world'])
    False

    >>> TestCase('wr2', 'a.c').test(['bli abc axc bla'])
    True


    >>> repr(TestCase('x3y', 'hello'))
    'xy3:hello'

    >>> print(TestCase('1x2', 'hello')) # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
     ...
    ShouldException: Error in parsing modifiers: 1x2
    '''

    def __init__(self, modifiers, expression, name=''):
        '''
        modifiers: modifier characters, around an optional expected count
        expression: what is searched in the lines
        name: name of the test

        Raises ShouldException when `modifiers` does not match RE_MODIFIERS.
        '''
        self.name = name
        self.status = None
        self.count = '?'

        # Extract self.expected_count from modifiers
        m = RE_MODIFIERS.match(modifiers)
        if not m:
            raise ShouldException('Error in parsing modifiers: ' + modifiers)
        self.modifiers = m.group(1) + m.group(3)
        self.expected_count = int(m.group(2)) if m.group(2) else None

        # Parse modifiers
        self.mods = parser_mod.parse_modifiers(self.modifiers)

        self.expression = expression if self.mods.ltspaces else expression.strip()
        if self.mods.blanks:
            # Runs of spaces are first squeezed into one space. Each space then
            # matches any non-empty run of whitespace. The expression is from
            # now on a regular expression.
            while '  ' in self.expression:
                self.expression = self.expression.replace('  ', ' ')
            # raw string: '\s' is a regex class, not a string escape
            self.expression = self.expression.replace(' ', r'\s+')
            self.mods.regex = True

        self.regex = None
        if self.mods.regex:
            flags = re.IGNORECASE if self.mods.ignore_case else 0
            self.regex = re.compile(self.expression, flags)

    def test(self, lines, variables=None, verbose=0):
        '''
        Count the occurrences of the expression in `lines`, then set,
        and return, the status according to the expected count and to the modifiers.

        The variables are replaced in the expression only when it is not a regular expression.
        '''
        if self.mods.multi_lines:
            # Search on one single line, gathering all the lines
            lines = [' '.join([l.rstrip(ENDLINE_CHARS) for l in lines])]

        expression_var = replace_variables(self.expression, variables)

        if not self.regex and self.mods.ignore_case:
            expression_var = expression_var.upper()

        self.count = 0
        for l in lines:
            if self.regex:
                if self.mods.count_all:
                    self.count += len(self.regex.findall(l))
                elif self.regex.search(l):
                    self.count += 1
            else:
                if self.mods.ignore_case:
                    l = l.upper()
                if expression_var in l:
                    self.count += l.count(expression_var) if self.mods.count_all else 1

        # Without expected count, at least one occurrence is required
        if self.expected_count is None:
            self.status = (self.count > 0)
        elif self.mods.less_than:
            self.status = (self.count < self.expected_count)
        elif self.mods.more_than:
            self.status = (self.count > self.expected_count)
        else:
            self.status = (self.count == self.expected_count)

        # The boolean status selects the final status (False: first item, True: second item)
        if self.mods.todo:
            self.status = [TODO, TODO_PASSED][self.status]
        if self.mods.allow:
            self.status = [ALLOW_FAILED, True][self.status]

        if verbose > 0:
            print('')
            print(self.str_status(True) + ' ' + repr(self))

        return self.status

    def str_additional_status(self, verbose=False):
        '''' (count/expected)', on a status in WARN_STATUS or when verbose. '+' stands for no expected count.'''
        s = ''

        if self.status in WARN_STATUS or verbose:
            s += ' (%s/%s%s)' % (self.count,
                                 MOD_LESS_THAN if self.mods.less_than else MOD_MORE_THAN if self.mods.more_than else '',
                                 self.expected_count if self.expected_count is not None else '+')

        return s

    def __repr__(self):
        return '%s%s:%s' % (self.modifiers, self.expected_count if self.expected_count is not None else '', self.expression)


665
class TestSuite():
    '''
    Commands and tests of one .should file: the commands are launched
    through a shell, and the tests are run on their output.

    >>> s = TestSuite()
    >>> s.test(['echo "hello"', '$My test', ':hello'], colorize = False)
    True
    >>> s.tests[0].status
    True

    >>> s2 = TestSuite('r')
    >>> s2.variables.append(("$LAUNCHER", ""))
    >>> s2.variables.append(("$EXTRA", ""))
    >>> s2.test(['echo "hello"', '$ A nice test', ':e.*o'], verbose = 1, colorize = False)   # doctest: +NORMALIZE_WHITESPACE
    echo "hello"
      stdout --> 1 lines
      stderr --> 0 lines
    ok - A nice test
    ok - Exit code is 0
    True

    >>> s2.str_status(colorize = False)
    '==> ok - ok:2 - total:2 tests'

    >>> print(s2.tap())   # doctest: +NORMALIZE_WHITESPACE
    1..2
    ok - A nice test
    ok - Exit code is 0
    '''

    def __init__(self, modifiers = '', cd = None, name = '', timeout = TIMEOUT):
        '''
        modifiers: modifiers appended to the modifiers of every test
        cd: directory from which the commands are run, if any
        name: name of the suite, used in the XML output
        timeout: delay, in seconds, given to each launch of commands
        '''
        self.name = name
        self.requires = True
        self.requires_cmd = None
        self.requires_stderr = []
        self.cmds = []
        self.tests = []
        self.stdin = []
        self.stdout = []
        self.stderr = []
        self.test_lines = []
        self.skip = False
        self.status = None
        self.modifiers = modifiers
        self.opt_modifiers = ''
        self.variables = []
        # Set again by test(). Defined here so that xml() and
        # tests_on_lines() can also be called before test().
        self.variables_all = []
        self.only = None
        self.stats = Stats('test')
        self.source = None
        self.cd = cd
        self.use_launcher = True
        self.expected_exit_code = 0
        self.exit_code = None
        self.elapsed_time = None
        self.timeout = timeout

    def cmd_variables_cd(self, cmd, verbose, colorize):
        '''Replace the variables in `cmd`, prepend the change of directory (if any), and return the command'''
        cmd = replace_variables(cmd, self.variables_all)
        if self.cd:
            cmd = 'cd %s ; ' % self.cd + cmd
        if verbose > 0:
            print(color(ANSI.MAGENTA, cmd, colorize))
        return cmd

    def test(self, should_lines, variables=None, verbose=0, colorize=True, only=None):
        '''
        Parse the lines of a .should file, launch the commands and run the tests.

        should_lines: iterable on the lines
        variables: list of (key, value) pairs, used after the variables of the suite (default: none)
        only: optional function on a test. The tests for which it is false are skipped.

        Returns the combined status of the suite.
        '''
        if variables is None:
            variables = []

        name = ''
        current_cmd = ''   # multi-line command
        current_cmds = []  # commands since the last command run
        current_tests = [] # tests since the last command run
        self.only = only
        self.variables_all = self.variables + variables

        # Iterate over should_lines
        # then use once DIRECTIVE_SCRIPT to flush the last tests
        for l in list(should_lines) + [DIRECTIVE_SCRIPT]:

            l = l.lstrip().rstrip(ENDLINE_CHARS)
            if not l:
                continue

            # Comment
            if l.startswith(TOKEN_COMMENT):
                continue

            # Directive -- Requires
            if l.startswith(DIRECTIVE_REQUIRES):
                self.requires_cmd = l[len(DIRECTIVE_REQUIRES):].strip()

                self.variables_all = self.variables + variables
                requires_cmd = self.cmd_variables_cd(self.requires_cmd, verbose, colorize)
                p = subprocess.Popen(requires_cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
                # communicate() reads stderr while waiting: wait() followed by a
                # read may block forever when the command fills the pipe
                _, requires_err = p.communicate()
                self.requires = (p.returncode == 0)
                self.requires_stderr = [err.decode(errors='replace') for err in requires_err.splitlines(True)]
                if not self.requires:
                    self.skip_set('Condition is not met: %s' % self.requires_cmd, verbose)
                if verbose > 0:
                    print(color(ANSI.CYAN, ''.join(self.requires_stderr), colorize))
                continue

            # Directive -- No launcher
            if l.startswith(DIRECTIVE_NO_LAUNCHER):
                self.use_launcher = False
                if replace_variables(VAR_LAUNCHER, self.variables_all):
                    self.skip_set('%s while %s is given' % (DIRECTIVE_NO_LAUNCHER, VAR_LAUNCHER), verbose)
                continue

            # Directive -- No extra options
            if l.startswith(DIRECTIVE_NO_EXTRA):
                self.variables = [(VAR_EXTRA, '')] + self.variables
                self.variables_all = self.variables + variables
                continue

            # Directive -- Source
            if l.startswith(DIRECTIVE_SOURCE):
                self.source = os.path.join(self.cd if self.cd else '', l[len(DIRECTIVE_SOURCE):].strip())
                continue

            # Directive -- Options
            if l.startswith(DIRECTIVE_OPTIONS):
                opts, unknown = options.parse_known_args(l[len(DIRECTIVE_OPTIONS):].split())
                self.variables = populate_variables(opts.var) + self.variables
                self.variables_all = self.variables + variables
                self.opt_modifiers = ''.join(opts.mod) if opts.mod else ''
                continue

            # Directive -- Exit code
            if l.startswith(DIRECTIVE_EXIT_CODE):
                self.expected_exit_code = int(l[len(DIRECTIVE_EXIT_CODE):].strip())
                continue

            # Name
            if l.startswith(TOKEN_NAME):
                name = l[1:].strip()
                continue

            # Directive -- Command
            if l.startswith(DIRECTIVE_SCRIPT):
                l = l[len(DIRECTIVE_SCRIPT):]

            # Directive -- Others
            if l.startswith(TOKEN_DIRECTIVE):
                sys.stderr.write('! Unknown directive: %s\n' % l)
                continue

            # Test
            if RE_TEST.search(l):
                pos = l.find(TOKEN_TEST)
                modifiers, expression = l[:pos], l[pos+1:]
                test = TestCase(modifiers + self.opt_modifiers + self.modifiers, expression, name)
                current_tests.append(test)
                self.tests.append(test)
                continue


            # Command : flush and test the previous tests
            # If the command is empty (for example at the end), launch previous commands even when there are no tests
            l = l.strip()

            if current_tests or not l:

                # Test current_cmds with current_tests
                if not self.skip:
                    test_lines, exit_test = self.launch(current_cmds, verbose, colorize)
                    current_tests.append(exit_test)
                    self.test_lines += test_lines
                    self.tests_on_lines(current_tests, test_lines, verbose, colorize)
                    self.debug(self.status, "\n".join(current_cmds), test_lines, verbose, colorize)
                else:
                    self.skip_tests(current_tests)

                current_cmds = []
                current_tests = []

            # Command
            if not l:
                continue

            next_cmd_continues = l.endswith(CONTINUATION_CHAR)
            if next_cmd_continues:
                l = l[:-1]

            current_cmd += l

            if not next_cmd_continues:
                current_cmds.append(current_cmd)
                self.cmds.append(current_cmd)
                current_cmd = ''

        # end of loop on should_lines

        if verbose > 1:
            print_variables(self.variables_all)

        return self.status

    def launch(self, cmds, verbose, colorize):
        '''
        Launch the commands `cmds`, as one shell command, with a timeout.

        Sets self.exit_code (None on timeout), self.stdout and self.stderr,
        and adds a test on the exit code to self.tests.

        Returns (lines, exit_test), where the lines are those of the file
        self.source when it is set, and otherwise those of the standard output.
        '''
        start_time = time.time()

        cmd = ' ; '.join(map(pre_process, cmds))
        cmd = self.cmd_variables_cd(cmd, verbose, colorize)

        # The temporary files are closed even when something fails
        with tempfile.TemporaryFile() as f_stdout, tempfile.TemporaryFile() as f_stderr:
            p = subprocess.Popen([cmd], shell=True,
                                 stdout=f_stdout, stderr=f_stderr,
                                 close_fds=True)

            try:
                self.exit_code = p.wait(self.timeout)
                exit_test = ExternalTestCase('Exit code is %d' % self.expected_exit_code, self.exit_code == self.expected_exit_code, str(self.exit_code))
            except subprocess.TimeoutExpired:
                self.exit_code = None
                exit_test = ExternalTestCase('Exit code is %d' % self.expected_exit_code, SKIP, 'timeout after %s seconds' % self.timeout)
                p.kill()
                p.wait()  # reap the killed process

            self.tests.append(exit_test)
            self.status = combine_status(self.status, exit_test.status)

            if self.elapsed_time is None:
                self.elapsed_time = 0
            self.elapsed_time += time.time() - start_time

            f_stdout.seek(0)
            f_stderr.seek(0)
            self.stdout = [l.decode(errors='replace') for l in f_stdout.readlines()]
            self.stderr = [l.decode(errors='replace') for l in f_stderr.readlines()]

        if verbose > 0:
            self.print_stderr(colorize)

        if self.source:
            with open(self.source) as f_source:
                test_lines = f_source.readlines()
        else:
            test_lines = self.stdout

        return test_lines, exit_test

    def tests_on_lines(self, tests, test_lines, verbose, colorize):
        '''
        Test all tests in 'tests' on 'test_lines',
        taking into account self modifiers
        and gathering statuses in self.status
        '''
        for test in tests:
            # Filter
            if self.only:
                if not self.only(test):
                    test.status = SKIP
                    continue

            # Test the test
            test.test(test_lines, variables=self.variables_all, verbose=verbose-1)
            self.stats.up(test.status)
            self.status = combine_status(self.status, test.status)

            if verbose > 0 or test.status in WARN_STATUS:
                print(test.str(colorize))

    def print_stderr(self, colorize=True):
        '''Print the number of lines of stdout and of stderr, then the content of stderr'''
        print('  stdout --> %s lines' % len(self.stdout))
        print('  stderr --> %s lines' % len(self.stderr))
        print(color(ANSI.CYAN, ''.join(self.stderr), colorize))

    def skip_set(self, reason, verbose=1):
        '''From now on, skip the tests of the suite'''
        if verbose > 0:
            print('Skipping tests: %s' % reason)
        self.skip = True
        self.status = combine_status(self.status, SKIP)

    def skip_tests(self, tests):
        '''Mark the tests as skipped, and count them'''
        for test in tests:
            test.status = SKIP
            self.stats.up(test.status)

    def debug(self, status, cmd, test_lines, verbose, colorize):
        '''
        On a failing status, show the command and stderr (unless already
        shown by the verbose mode). On a failing status or with verbose > 1,
        dump the tested lines, eliding the middle of long outputs.
        '''
        if status in FAIL_STATUS and verbose <= 0:
            print(color(ANSI.MAGENTA, cmd, colorize))
            self.print_stderr(colorize)

        if status in FAIL_STATUS or verbose > 1:
            print(LINE)
            if len(test_lines) <= MAX_DUMP_LINES:
                print(''.join(test_lines), end='')
            else:
                print(''.join(test_lines[:MAX_HALF_DUMP_LINES]), end='')
                print(color(ANSI.MAGENTA, '... %d other lines ...' % (len(test_lines) - 2*MAX_HALF_DUMP_LINES), colorize))
                print(''.join(test_lines[-MAX_HALF_DUMP_LINES:]), end='')

            print(LINE)

    def str_status(self, colorize=True):
        return self.stats.str_status(self.status, colorize)

    def xml(self):
        '''
        'testsuite' XML element, with one child for each test,
        and a 'properties' child listing the variables.
        '''
        x = ET.Element('testsuite',
                       {'id': self.name,
                        'name': self.name,
                        'cmd': str(self.cmds),
                        'tests': str(self.stats.total()),
                        'failures': str(len(self.stats[False])),
                        'skipped': str(len(self.stats[SKIP])),
                        'time': self.str_elapsed_time(tag=''),
                        'timestamp': datetime.datetime.now().isoformat()
                       })
        for test in self.tests:
            x.append(test.xml())

        v = ET.Element('properties')
        for (key, val) in self.variables_all:
            v.append(ET.Element('property', {'name': key, 'value': val}))
        x.append(v)

        return x

    def tap(self):
        '''Text in the .tap format: the plan, then one line for each test'''
        s = ''
        s += '1..%d' % len(self.tests) + '\n'
        # Each test renders itself, whatever its class
        s += '\n'.join(test.tap() for test in self.tests)
        s += '\n'
        return s

    def str_elapsed_time(self, tag='s'):
        '''Elapsed time with two decimals, followed by `tag`. Empty when no command was launched.'''
        return ('%.2f%s' % (self.elapsed_time, tag)) if self.elapsed_time is not None else ''

    def __str__(self):
        s = ''
        s += '\n'.join(self.cmds)
        s += '\n'
        s += self.str_elapsed_time()
        s += '\n'.join(map(str,self.tests))
        if self.status is not None:
            s += '\n'
            s += self.str_status()
        return s


class FileSet():
    '''
    A list of test files, run one after the other through TestSuite.

    Keeps the suite built for each file (self.sets), the statistics
    on files (self.stats) and on tests (self.stats_tests),
    and the combined status of all files (self.status).
    '''

    def __init__(self, files, modifiers = '', timeout = TIMEOUT):
        '''
        files: paths of the test files
        modifiers, timeout: given to every TestSuite created by test()
        '''
        self.files = files
        self.sets = []
        self.modifiers = modifiers
        self.status = None
        self.stats = Stats('file')
        self.stats_tests = Stats('test')
        self.timeout = timeout

    def __len__(self):
        '''Number of files in the set.'''
        return len(self.files)

    def test(self, variables=None, cd=None, cd_same=False, output=None, verbose=0, only=None):
        '''
        Run a TestSuite on each file, print the summaries,
        and return the combined status.

        variables, only: passed as-is to TestSuite.test()
        cd: directory passed to each TestSuite...
        cd_same: ...unless true, then the directory of each file is passed instead
        output: output kinds to produce. OUT_LOG and OUT_TAP write, for each
                tested file, a file named after it (extension replaced);
                OUT_XML writes one global report into the OUT_XML file
        verbose: verbosity level, each TestSuite gets `verbose - 1`

        A KeyboardInterrupt stops the loop on the files, but the XML report
        and the summaries are still produced for what was run so far.
        '''
        self.status = None

        try:
            for f in self.files:
                if verbose > 0:
                    print(f)
                cd_f = os.path.dirname(f) if cd_same else cd
                s = TestSuite(self.modifiers, cd_f, name = f, timeout = self.timeout)
                self.sets.append(s)

                # Do not leak one file handle per tested file.
                # s.test() is assumed to have read what it needs when it returns.
                with open(f) as should_file:
                    s.test(should_file, variables, verbose - 1, only=only)

                self.stats.up(s.status, f)
                self.status = combine_status(self.status, s.status)
                self.stats_tests += s.stats

                filename_without_ext = os.path.splitext(f)[0]

                if output and OUT_LOG in output:
                    write_to_file(filename_without_ext + OUT_LOG, ''.join(s.test_lines))

                if output and OUT_TAP in output:
                    write_to_file(filename_without_ext + OUT_TAP, s.tap())

                # Failed files are always reported, even when not verbose
                if verbose > 0 or s.status is False:
                    if not verbose:
                        print(f, end=' ')
                    if s.elapsed_time:
                        if s.elapsed_time >= SHOW_ELAPSED_TIME_ABOVE:
                            print(s.str_elapsed_time())
                    print(s.str_status())
                    print('')
        except KeyboardInterrupt:
            print('==== interrupted ====\n')

        if output and OUT_XML in output:
            self.xml().write(OUT_XML)

        # Two summaries: on files, then on tests
        print()
        print('Summary', end=' ')
        print(self.stats.str_status(self.status))
        print('Summary', end=' ')
        print(self.stats_tests.str_status(self.status))
        print()

        # List the files for every status but the successful one
        for sta in self.stats.keys():
            if sta == True:
                continue
            print('files with %s:' % color(STATUS_COLORS[sta], STATUS[sta]))
            for f in self.stats[sta]:
                print('  ' + f)

        return self.status

    def xml(self):
        '''
        Return an ElementTree whose root 'testsuites' element
        has one 'testsuite' child per suite run so far.
        '''
        x = ET.Element('testsuites',
                       {'id': 'Test at %s' % datetime.datetime.now().isoformat(),
                        'name': 'tested by should',
                        'tests': str(self.stats_tests.total()),
                        'failures': str(len(self.stats_tests[False]) if False in self.stats_tests else 0),
                       })

        for s in self.sets:
            x.append(s.xml())

        return ET.ElementTree(x)

    def write_retry(self, argv, argv_remove, verbose=1):
        '''
        If some files ended with a status in WARN_STATUS, write the RETRY file
        with the non-file arguments followed by these files, one per line.

        argv: command-line arguments
        argv_remove: arguments to drop from argv (the caller gives the tested files)
        verbose: when positive, print how to relaunch these tests

        Nothing is written when no file has such a status.
        '''
        files = []

        for sta in WARN_STATUS:
            files += self.stats[sta]

        if not files:
            return

        args = [arg for arg in argv if arg not in argv_remove]

        with open(RETRY, 'w') as f:
            f.write('\n'.join(args + files) + '\n')

        if verbose > 0:
            print('%s %s will relaunch these tests.' % (sys.argv[0], RETRY_FLAG))

def read_retry():
    '''
    Return the lines of the RETRY file, without their trailing whitespace.

    Best effort: return an empty list when the file is missing,
    cannot be read, or cannot be decoded.
    '''
    try:
        # 'with' closes the file even when reading fails
        with open(RETRY) as f:
            return [l.rstrip() for l in f]
    except (OSError, UnicodeDecodeError):
        return []


if __name__ == '__main__':
    # Command-line arguments, preceded by '@' + DEFAULT_CFG when that file exists
    # (presumably read by the parser as a file of arguments -- to be confirmed)
    argv = sys.argv[1:]
    if os.path.exists(DEFAULT_CFG):
        argv = ['@' + DEFAULT_CFG] + argv

    # With RETRY_FLAG, append the content of the RETRY file, or stop when it has none
    if RETRY_FLAG in argv:
        retry = read_retry()
        if not retry:
            print(color(ANSI.RED, "Nothing to retry"))
            sys.exit(2)
        argv += retry
        print(color(ANSI.BLUE, "Retrying previous failed or warned tests"))

    args = parser.parse_args(argv)

    variables = populate_variables(args.var)
    variables.append((VAR_LAUNCHER, args.launcher))
    variables.append((VAR_EXTRA, args.extra))

    if args.verbose > 0:
        print_variables(variables)

    if args.shuffle:
        print("Shuffling test files")
        random.shuffle(args.file)

    # Filters
    def only(test):
        '''
        Tell whether a test passes the filters of the command line.
        On booleans, 'a <= b' reads 'a implies b'.
        '''
        return (
            ((MOD_TODO in test.modifiers) <= (not args.no_f)) and
            ((MOD_TODO in test.modifiers) >= args.only_f) and
            ((MOD_ALLOW in test.modifiers) <= (not args.no_a)) and
            ((MOD_ALLOW in test.modifiers) >= args.only_a)
        )

    if args.fail_a:
        args.mod = (args.mod or []) + ['A']

    # Launch tests
    fs = FileSet(args.file,
                 timeout = args.timeout,
                 modifiers = ''.join(args.mod or []))
    status = fs.test(variables = variables,
                     cd = args.cd,
                     cd_same = args.cd_same,
                     output = args.output,
                     verbose = args.verbose,
                     only = only)

    # The RETRY file is only written when several files were given
    if len(fs) > 1:
        retry = fs.write_retry(sys.argv[1:], args.file, verbose = args.verbose)

    if status in FAIL_STATUS:
        sys.exit(1)
    sys.exit(0)