should-vdj-to-tap.py 11.7 KB
Newer Older
1 2 3 4 5 6 7 8
'''
Tests one ore several .should-vdj.fa files.
A .should-vdj.fa file is a fasta file whose identifiers are in the form:

>should_pattern comments

The 'should_pattern' describes the expected V(D)J segmentation in a .vdj format.
All spaces is the pattern have to be replaced by '_'.
9
The script launches vidjil-algo (or any other PROGRAM) on the .shoud-vdj.fasta file,
10 11 12 13 14 15 16 17
expecting that it returns another fasta file whose identifiers are in the form:

>should_pattern result

The 'should_pattern' is then checked against the 'result' part, and a .tap file is produced.
'''

import sys
18
import re
19
import ansi
20 21 22 23 24 25

PY_REQUIRED = (2, 7)
if sys.version_info < PY_REQUIRED:
    print("This script requires Python >= %d.%d." % (PY_REQUIRED))
    sys.exit(1)

26
from subprocess import Popen, PIPE, STDOUT
27
from collections import defaultdict
28 29

import os
30
import argparse
31
import repseq_vdj
32 33

parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
34 35
parser.add_argument('--program', '-p', default=repseq_vdj.VIDJIL_FINE, help='program to launch on each file (%(default)s)')
parser.add_argument('-q', dest='program', action='store_const', const=repseq_vdj.VIDJIL_KMER, help='shortcut for -p (VIDJIL_KMER), to be used with -2')
36
parser.add_argument('--ignore_N', '-N', action='store_true', help='ignore N patterns, checking only gene and allele names')
37
parser.add_argument('--ignore_del', '-s', action='store_true', help='ignore number of deletions at breakpoints')
38
parser.add_argument('--ignore_allele', '-A', action='store_true', help='ignore allele, checking only gene names')
39
parser.add_argument('--ignore_D', '-D', action='store_true', help='ignore D gene names and alleles')
40
parser.add_argument('--ignore_incomplete', '-+', action='store_true', help='ignore incomplete/unusual germlines')
41
parser.add_argument('--ignore_cdr3', '-3', action='store_true', help='ignore CDR3')
42
parser.add_argument('--after-two', '-2', action='store_true', help='compare only the right part of the pattern after two underscores (locus code)')
43
parser.add_argument('--revcomp', '-r', action='store_true', help='duplicate the tests on reverse-complemented files')
44
parser.add_argument('--directory', '-d', default='../..', help='base directory where Vidjil is. This value is used by the default -p and -q values (%(default)s)')
45

46 47
parser.add_argument('--verbose', '-v', action='store_true')

48
parser.add_argument('file', nargs='+', help='''.should-vdj.fa or .vdj files''')
49 50

args = parser.parse_args()
51 52

SHOULD_SUFFIX = '.should-vdj.fa'
53

54 55 56
TAP_SUFFIX = '.tap'
LOG_SUFFIX = '.log'

57 58 59 60 61
PROG_TAG = '.1'

if args.after_two:
    PROG_TAG = '.2'

62 63 64 65
SPECIAL_KEYWORDS = ['TODO']

def special_keywords(after_two):
    return SPECIAL_KEYWORDS + ['BUG' + ('-LOCUS' if after_two else '')]
66

67
global_failed = 0
68
global_stats = defaultdict(int)
69
global_stats_bug = defaultdict(int)
70
global_stats_failed = defaultdict(int)
71
global_stats_todo = defaultdict(int)
72

73 74 75 76 77 78 79 80
def should_pattern_to_regex(p):
    '''
    Converts patterns such as the following ones into a Python regex.

    TRBV6-1 7/0/12 TRBD2 1/8/2 TRBJ2-7
    TRDV2*02 14/TCCCGGCCT/0 TRDD3*01_3/CCACGGC/4_TRAJ29*01
    TRDD2 18//4 TRAJ29 TRA+D # comments comments ...
    TRGV11 2/5/0 TRGJ1
81
    IGHV3-48 0/AA/6 IGHD5-12 (2//7, 3//6, 4//5) IGHJ4*02
82 83 84
    '''


85
    def process_term(term):
86 87 88

        # Comment, stop parsing here
        if term.startswith('#'):
89
            return []
90

91
        if term in special_keywords(args.after_two):
92 93
            return []

94
        # Ambiguous/alternate pattern
95
        if term.startswith(','):
96
            choices = []
97 98
            term_without_comma = term[1:]
            return ['|' + ''.join(process_term(term_without_comma))]
99

100 101 102 103
        # (such as CDR3 / junction)
        if term.startswith('{'):
            term = term.replace('*','[*]').replace('!','#')
            term = term.replace('{', '.*[{].*').replace('}', '.*[}]')
104 105
            if args.ignore_cdr3:
                term = '('+term+')?'
106
            return [term]
107

108 109 110 111 112 113 114 115 116 117
        # deletion/insertion/deletion
        # Note that '/' may be also in gene name, such as in IGKV1/OR-3*01
        if term.count('/') == 2:
            trim_left, n_region, trim_right = term.split('/')
            try:
                n_insert = int(n_region)
                n_region = '[ACGT]{%d}' % n_insert
            except ValueError: # already /ACGTG/
                pass

118
            if args.ignore_N or args.ignore_del:
119 120
                trim_left = '[[:digit:]]*'
                trim_right = '[[:digit:]]*'
121 122
            if args.ignore_N:
                n_region = '[ACGT]*'
123

124 125 126 127
            separator = '/'
            if args.ignore_N:
                separator='/?'
            return [separator.join((trim_left, n_region, trim_right))]
128 129 130 131

        # Gene name, possibly without allele information
        if not '*' in term:
            # Some 'genes', such as KDE, do not have allele information
132
            term += '([*][[:digit:]]*)?'
133
        else:
134 135
            gene, allele = term.split('*')

136 137 138
            if '/' in gene:
                gene = gene.replace('/', '/?')

139
            if args.ignore_D and ('IGHD' in gene or 'TRBD' in gene or 'TRDD' in gene):
140
                gene = '[^[:space]]*'
141
                allele = '[[:digit:]]*'
142

143
            if args.ignore_allele:
144
                allele = '[[:digit:]]*'
145

146 147 148 149
            allele_separator = '[*]'
            if args.ignore_D or args.ignore_allele:
                allele_separator += '?'
            term = gene + allele_separator + allele
150

151
        return [term]
152

153 154

    r = []
155 156 157 158 159 160 161 162
    p = re.sub('\s*,\s*', ' ,', p)

    m = re.search('^(.*)\s*\((.+)\)\s*(.*)$', p)
    if m:
        # We have parentheses
        re1 = should_pattern_to_regex(m.group(1)).pattern
        re2 = '('+should_pattern_to_regex(m.group(2)).pattern+')'
        re3 = should_pattern_to_regex(m.group(3)).pattern
163
        regex_pattern = '[[:space:]]*'.join(x for x in [re1, re2, re3])
164 165 166 167 168
    else:
        # We have a parenthesis free expression
        for term in p.split():
            r += process_term(term)

169
        regex_pattern = '.*'.join(r)
170 171 172 173

    try:
        regex = re.compile(regex_pattern)
    except:
174
        sys.stderr.write("Error. Invalid regex_pattern: " + regex_pattern)
175
        sys.exit(4)
176

177 178 179 180
    if args.verbose:
        print
        print '      ', p, '->', regex_pattern

181 182 183
    return regex


184 185
r_locus = re.compile('\[\S+\]')

186
def should_result_to_tap(should_pattern, result, tap_id):
187
    '''
188 189
    Parses (should, result) couples such as:
    'TRDD2*01 1/AGG/1 TRDD3*01  TRD+', 'TRDD2*01 1/AGG/1 TRDD3*01  TRD+'
190
    or
191
    'TRDV3*01 0//0 TRDJ4*01, 'TRD UNSEG noisy'
192 193 194
    and return a .tap line
    '''

195
    m_locus = r_locus.search(should_pattern)
196

197 198 199 200 201
    if m_locus:
        locus = m_locus.group(0)
        should_pattern = should_pattern.replace(locus, '')
        locus = locus.replace('[','').replace(']','')
    elif '  ' in should_pattern: # deprecated 'two spaces', will be removed
202
        locus = should_pattern.split('  ')[1].strip().split(' ')[0]
203
        should_pattern = should_pattern.split('  ')[0]
204 205 206
    else:
        locus = None

207 208 209
    if locus is not None and '+' in locus and args.ignore_incomplete:
        return None

210 211
    if args.after_two:
        # Testing only the locus code
212
        if not locus:
213
            return '# %d - not tested (no locus)' % tap_id
214

215 216 217 218 219
        found = (locus in result) and not ('%s UNSEG' % locus in result)

    else:
        # Testing the should pattern
        should_regex = should_pattern_to_regex(should_pattern)
220 221
        # match = should_regex.search(result)
        match = os.system("echo '%s' | grep -E '%s' > /dev/null 2>&1" \
222 223
                          % (result.replace("'", "'\\''"),
                             should_regex.pattern.replace("'", "'\\''")))
224
        found = (match == 0) and not ('UNSEG' in result)
225

226 227
    globals()['global_stats'][locus] += 1

228 229
    ### Main output, .tap compliant, and main count

230 231
    tap = ''

232
    if not found:
233
        globals()['global_stats_failed'][locus] += 1
234
        if 'TODO' in should_pattern:
235 236
            globals()['global_stats_todo'][locus] += 1
        else:
237
            globals()['global_failed'] += 1
238 239 240
    if (args.after_two and 'BUG-LOCUS' in should_pattern)\
       or (not args.after_two and 'BUG' in should_pattern):
        globals()['global_stats_bug'][locus] += 1
241

242 243
        tap += 'not '

244 245
    tap += 'ok %d ' % tap_id

246
    ### Additional output/warnings
247

248 249 250
    special = False
    warn = False

251
    for kw in special_keywords(args.after_two):
252
        if kw in should_pattern:
253 254 255 256 257 258 259 260 261 262 263 264
            tap += '# %s ' % kw
            special = True

    warn = not found ^ special

    if warn:
        tap += ansi.Style.BRIGHT \
               + [ansi.Fore.RED, ansi.Fore.GREEN][found] \
               + '#! %s ' % ['not ok', 'ok'][found] \
               + ansi.Style.RESET_ALL

    ### Pattern
265 266

    tap += '- ' + should_pattern
267

268
    if not found:
269 270 271 272 273 274 275
        tap += ' - found instead ' + result

    return tap


def should_to_tap_one_file(f_should):

276
    f_tap = f_should + PROG_TAG + TAP_SUFFIX
277 278
    f_tap = f_tap.replace(SHOULD_SUFFIX, '')

279 280 281
    f_log = f_should + PROG_TAG + LOG_SUFFIX
    f_log = f_log.replace(SHOULD_SUFFIX, '')

282 283 284
    f_vdj = f_should + PROG_TAG + '.vdj'
    f_vdj = f_vdj.replace(SHOULD_SUFFIX, '')

285
    print "<== %s" % f_should
286

287 288 289 290 291 292 293
    vdj = repseq_vdj.VDJ_File()
    vdj.parse_from_gen(repseq_vdj.should_results_from_vidjil(args.program.replace('{directory}',args.directory), f_should, f_log))

    print "==> %s" % f_vdj
    vdj.write(open(f_vdj, 'w'))

    write_should_results_to_tap(vdj, f_tap)
294 295


296
def write_should_results_to_tap(should_results, f_tap):
297 298
    print "==> %s" % f_tap

299 300 301 302
    if not(should_results):
        print "Error. There is no results in this file."
        sys.exit(2)

303
    with open(f_tap, 'w') as ff:
304
        ff.write("1..%d\n" % len(should_results))
305

306
        for tap_id, (should, result) in enumerate(should_results):
307
            tap_line = should_result_to_tap(should, result, tap_id+1)
308 309 310 311
            if tap_line is not None:
                if args.verbose or '#!' in tap_line:
                    print tap_line
                ff.write(tap_line + '\n')
312 313 314 315 316



if __name__ == '__main__':

317
    for f_should in args.file:
318 319 320 321

        if '.vdj' in f_should:
            f_vdj = f_should
            f_tap = f_vdj + TAP_SUFFIX
322 323 324 325

            vdj = repseq_vdj.VDJ_File()
            vdj.parse_from_file(open(f_vdj))
            write_should_results_to_tap(vdj, f_tap)
326 327
            continue

328
        should_to_tap_one_file(f_should)
329 330 331 332 333

        if args.revcomp:
            f_should_rc = f_should + '.rc'
            os.system('python ../../germline/revcomp-fasta.py < %s > %s' % (f_should, f_should_rc))
            should_to_tap_one_file(f_should_rc)
334 335
        print

336
    print "=== Summary, should-vdj tests ===" + (' (only locus)' if args.after_two else '')
337
    print "            tested     passed     bug     failed (todo)"
338
    for locus in sorted(global_stats):
339
        print "    %-5s     %4d       %4d     %4d       %4d   %4s" % (locus, global_stats[locus], global_stats[locus] - global_stats_failed[locus], global_stats_bug[locus], global_stats_failed[locus],
340
                                                              ("(%d)" % global_stats_todo[locus] if global_stats_todo[locus] else ''))
341
    print "    =====     %4d       %4d     %4d       %4d   %4s" % (sum(global_stats.values()), sum(global_stats.values()) - sum(global_stats_failed.values()), sum(global_stats_bug.values()), sum(global_stats_failed.values()),
342
                                                           "(%d)" % sum(global_stats_todo.values()))
343 344
    print

345 346 347
    global_bug = sum(global_stats_bug.values())
    if global_bug < global_failed:
        print "! We were expecting %s failed tests, but there are %s such failures." % (global_bug, global_failed)
348
        sys.exit(1)
349 350 351
    if global_bug > global_failed:
        print "! There were less failed sequences that expected. Please update the files accordingly!"
        sys.exit(2)