should-vdj-to-tap.py 10.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
'''
Tests one ore several .should-vdj.fa files.
A .should-vdj.fa file is a fasta file whose identifiers are in the form:

>should_pattern comments

The 'should_pattern' describes the expected V(D)J segmentation in a .vdj format.
All spaces is the pattern have to be replaced by '_'.
The script launches Vidjil (or any other PROGRAM) on the .shoud-vdj.fasta file,
expecting that it returns another fasta file whose identifiers are in the form:

>should_pattern result

The 'should_pattern' is then checked against the 'result' part, and a .tap file is produced.
'''

import sys
18
import re
19
import ansi
20 21 22 23 24 25

PY_REQUIRED = (2, 7)
if sys.version_info < PY_REQUIRED:
    print("This script requires Python >= %d.%d." % (PY_REQUIRED))
    sys.exit(1)

26
from subprocess import Popen, PIPE, STDOUT
27
from collections import defaultdict
28 29

import os
30
import argparse
31

32
VIDJIL_FINE = '{directory}/vidjil -X 100 -# "#" -c segment -i -3 -d -g {directory}/germline %s >> %s'
33
VIDJIL_KMER = '{directory}/vidjil -w 20 -# "#" -b out -c windows -uuuU -2 -i -g {directory}/germline %s > /dev/null ; cat out/out.segmented.vdj.fa out/out.unsegmented.vdj.fa >> %s'
34 35 36 37

parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--program', '-p', default=VIDJIL_FINE, help='program to launch on each file (%(default)s)')
parser.add_argument('-q', dest='program', action='store_const', const=VIDJIL_KMER, help='shortcut for -p (VIDJIL_KMER), to be used with -2')
38
parser.add_argument('--ignore_N', '-N', action='store_true', help='ignore N patterns, checking only gene and allele names')
39
parser.add_argument('--ignore_allele', '-A', action='store_true', help='ignore allele, checking only gene names')
40
parser.add_argument('--ignore_D', '-D', action='store_true', help='ignore D gene names and alleles')
41
parser.add_argument('--after-two', '-2', action='store_true', help='compare only the right part of the pattern after two underscores (locus code)')
42
parser.add_argument('--revcomp', '-r', action='store_true', help='duplicate the tests on reverse-complemented files')
43
parser.add_argument('--directory', '-d', default='../..', help='base directory where Vidjil is. This value is used by the default -p and -q values (%(default)s)')
44 45
parser.add_argument('--expected-fails', '-e', type=int, default=0, help='number of expected non-TODO failed tests -- do not exit with error for this number')

46 47
parser.add_argument('--verbose', '-v', action='store_true')

48 49 50
parser.add_argument('file', nargs='+', help='''.should-vdj.fa files''')

args = parser.parse_args()
51 52

SHOULD_SUFFIX = '.should-vdj.fa'
53

54 55 56
TAP_SUFFIX = '.tap'
LOG_SUFFIX = '.log'

57 58 59 60 61
PROG_TAG = '.1'

if args.after_two:
    PROG_TAG = '.2'

62
SPECIAL_KEYWORDS = ['TODO', 'BUG']
63

64
global_failed = 0
65 66
global_stats = defaultdict(int)
global_stats_failed = defaultdict(int)
67
global_stats_todo = defaultdict(int)
68

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95

def should_result_from_vidjil(l):
    '''
    Return a (should, result) couple from a Vidjil output line in the form of:
    >TRDD2*01_1/AGG/1_TRDD3*01__TRD+ + VJ 	0 84 88 187	TRDD2*01 1/AGG/1 TRDD3*01  TRD+
    or
    >TRDV3*01_0//0_TRDJ4*01 ! + VJ	0 49 50 97       TRD UNSEG noisy
    '''

    l = l.strip()
    pos = l.find(' + ') if ' + ' in l else l.find(' - ')
    should = l[1:pos].replace('_', ' ')

    pos = l.find('\t')
    result = l[pos+1:] + ' '

    return (should, result)


def should_results_from_program(f_should):
    '''
    Launch the program on f_should
    Yields (#, >) couples of V(D)J designations, such as in:
    #TRDD2*01 1/AGG/1 TRDD3*01  TRD+
    >TRDD2*01  TRDD3*01
    '''

96
    f_log = f_should + PROG_TAG + LOG_SUFFIX
97 98
    f_log = f_log.replace(SHOULD_SUFFIX, '')

99 100
    program = args.program.replace('{directory}',args.directory)
    cmd = program % (f_should, f_log)
101 102 103

    with open(f_log, 'w') as f:
        f.write('# %s\n\n' % cmd)
104 105 106 107 108

    exit_code = os.system(cmd)
    if exit_code:
        print "Error. The program halted with exit code %s." % exit_code
        sys.exit(3)
109 110 111 112 113

    for l in open(f_log):
        if not l:
            continue
        if l[0] == '>':
114
            yield should_result_from_vidjil(l)
115

116 117 118 119 120 121 122 123
def should_pattern_to_regex(p):
    '''
    Converts patterns such as the following ones into a Python regex.

    TRBV6-1 7/0/12 TRBD2 1/8/2 TRBJ2-7
    TRDV2*02 14/TCCCGGCCT/0 TRDD3*01_3/CCACGGC/4_TRAJ29*01
    TRDD2 18//4 TRAJ29 TRA+D # comments comments ...
    TRGV11 2/5/0 TRGJ1
124
    IGHV3-48 0/AA/6 IGHD5-12 (2//7, 3//6, 4//5) IGHJ4*02
125 126 127
    '''


128
    def process_term(term):
129 130 131

        # Comment, stop parsing here
        if term.startswith('#'):
132
            return []
133

134 135 136
        if term in SPECIAL_KEYWORDS:
            return []

137
        # Ambiguous/alternate pattern
138
        if term.startswith(','):
139
            choices = []
140 141
            term_without_comma = term[1:]
            return ['|' + ''.join(process_term(term_without_comma))]
142

143 144 145 146
        # (such as CDR3 / junction)
        if term.startswith('{'):
            term = term.replace('*','[*]').replace('!','#')
            term = term.replace('{', '.*[{].*').replace('}', '.*[}]')
147
            return [term]
148

149 150 151 152 153 154 155 156 157 158
        # deletion/insertion/deletion
        # Note that '/' may be also in gene name, such as in IGKV1/OR-3*01
        if term.count('/') == 2:
            trim_left, n_region, trim_right = term.split('/')
            try:
                n_insert = int(n_region)
                n_region = '[ACGT]{%d}' % n_insert
            except ValueError: # already /ACGTG/
                pass

159 160 161 162 163
            if args.ignore_N:
                trim_left = '\d+'
                n_region = '[ACGT]*'
                trim_right = '\d+'

164
            return ['/'.join((trim_left, n_region, trim_right))]
165 166 167 168 169 170

        # Gene name, possibly without allele information
        if not '*' in term:
            # Some 'genes', such as KDE, do not have allele information
            term += '([*]\d*)?'
        else:
171 172
            gene, allele = term.split('*')

173 174 175 176
            if args.ignore_D and ('IGHD' in gene or 'TRBD' in gene or 'TRDD' in gene):
                gene = '\S*'
                allele = '\d*'

177 178 179 180
            if args.ignore_allele:
                allele = '\d*'

            term = gene + '[*]' + allele
181

182
        return [term]
183

184 185

    r = []
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
    p = re.sub('\s*,\s*', ' ,', p)

    m = re.search('^(.*)\s*\((.+)\)\s*(.*)$', p)
    if m:
        # We have parentheses
        re1 = should_pattern_to_regex(m.group(1)).pattern
        re2 = '('+should_pattern_to_regex(m.group(2)).pattern+')'
        re3 = should_pattern_to_regex(m.group(3)).pattern
        regex_pattern = '\s*'.join(x for x in [re1, re2, re3])
    else:
        # We have a parenthesis free expression
        for term in p.split():
            r += process_term(term)

        regex_pattern = ' .*'.join(r)
201 202 203 204

    try:
        regex = re.compile(regex_pattern)
    except:
205
        sys.stderr.write("Error. Invalid regex_pattern: " + regex_pattern)
206
        sys.exit(4)
207

208 209 210 211
    if args.verbose:
        print
        print '      ', p, '->', regex_pattern

212 213 214
    return regex


215 216
r_locus = re.compile('\[\S+\]')

217
def should_result_to_tap(should_pattern, result, tap_id):
218
    '''
219 220
    Parses (should, result) couples such as:
    'TRDD2*01 1/AGG/1 TRDD3*01  TRD+', 'TRDD2*01 1/AGG/1 TRDD3*01  TRD+'
221
    or
222
    'TRDV3*01 0//0 TRDJ4*01, 'TRD UNSEG noisy'
223 224 225
    and return a .tap line
    '''

226
    m_locus = r_locus.search(should_pattern)
227

228 229 230 231 232
    if m_locus:
        locus = m_locus.group(0)
        should_pattern = should_pattern.replace(locus, '')
        locus = locus.replace('[','').replace(']','')
    elif '  ' in should_pattern: # deprecated 'two spaces', will be removed
233
        locus = should_pattern.split('  ')[1].strip().split(' ')[0]
234
        should_pattern = should_pattern.split('  ')[0]
235 236 237
    else:
        locus = None

238 239
    if args.after_two:
        # Testing only the locus code
240
        if not locus:
241
            return '# %d - not tested (no locus)' % tap_id
242

243 244 245 246 247 248 249 250
        found = (locus in result) and not ('%s UNSEG' % locus in result)

    else:
        # Testing the should pattern
        should_regex = should_pattern_to_regex(should_pattern)
        match = should_regex.search(result)
        found = (match is not None)

251 252
    globals()['global_stats'][locus] += 1

253 254
    ### Main output, .tap compliant, and main count

255 256
    tap = ''

257
    if not found:
258
        globals()['global_stats_failed'][locus] += 1
259
        if 'TODO' in should_pattern:
260 261
            globals()['global_stats_todo'][locus] += 1
        else:
262
            globals()['global_failed'] += 1
263

264 265
        tap += 'not '

266 267
    tap += 'ok %d ' % tap_id

268
    ### Additional output/warnings
269

270 271 272 273
    special = False
    warn = False

    for kw in SPECIAL_KEYWORDS:
274
        if kw in should_pattern:
275 276 277 278 279 280 281 282 283 284 285 286
            tap += '# %s ' % kw
            special = True

    warn = not found ^ special

    if warn:
        tap += ansi.Style.BRIGHT \
               + [ansi.Fore.RED, ansi.Fore.GREEN][found] \
               + '#! %s ' % ['not ok', 'ok'][found] \
               + ansi.Style.RESET_ALL

    ### Pattern
287 288

    tap += '- ' + should_pattern
289

290
    if not found:
291 292 293 294 295 296 297
        tap += ' - found instead ' + result

    return tap


def should_to_tap_one_file(f_should):

298
    f_tap = f_should + PROG_TAG + TAP_SUFFIX
299 300 301
    f_tap = f_tap.replace(SHOULD_SUFFIX, '')

    print "<== %s" % f_should
302
    id_lines = list(should_results_from_program(f_should))
303 304 305 306 307 308 309 310 311 312

    if not(id_lines):
        print "Error. There is no '>' line in this file."
        sys.exit(2)

    print "==> %s" % f_tap

    with open(f_tap, 'w') as ff:
        ff.write("1..%d\n" % len(id_lines))

313 314
        for tap_id, (should, result) in enumerate(id_lines):
            tap_line = should_result_to_tap(should, result, tap_id+1)
315
            if args.verbose or '#!' in tap_line:
316 317 318 319 320 321 322
                print tap_line
            ff.write(tap_line + '\n')



if __name__ == '__main__':

323
    for f_should in args.file:
324
        should_to_tap_one_file(f_should)
325 326 327 328 329

        if args.revcomp:
            f_should_rc = f_should + '.rc'
            os.system('python ../../germline/revcomp-fasta.py < %s > %s' % (f_should, f_should_rc))
            should_to_tap_one_file(f_should_rc)
330 331
        print

332
    print "=== Summary, should-vdj tests ===" + (' (only locus)' if args.after_two else '')
333
    print "            tested     passed     failed (todo)"
334
    for locus in sorted(global_stats):
335 336 337 338
        print "    %-5s     %4d       %4d       %4d   %4s" % (locus, global_stats[locus], global_stats[locus] - global_stats_failed[locus], global_stats_failed[locus],
                                                              ("(%d)" % global_stats_todo[locus] if global_stats_todo[locus] else ''))
    print "    =====     %4d       %4d       %4d   %4s" % (sum(global_stats.values()), sum(global_stats.values()) - sum(global_stats_failed.values()), sum(global_stats_failed.values()),
                                                           "(%d)" % sum(global_stats_todo.values()))
339 340
    print

341 342 343
    if args.revcomp:
        args.expected_fails *= 2

344 345
    if not global_failed == args.expected_fails:
        print "! We were expecting %s non-TODO failed tests, but there are %s such failures." % (args.expected_fails, global_failed)
346
        sys.exit(1)