should-vdj-to-tap.py 9.28 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
'''
Tests one ore several .should-vdj.fa files.
A .should-vdj.fa file is a fasta file whose identifiers are in the form:

>should_pattern comments

The 'should_pattern' describes the expected V(D)J segmentation in a .vdj format.
All spaces is the pattern have to be replaced by '_'.
The script launches Vidjil (or any other PROGRAM) on the .shoud-vdj.fasta file,
expecting that it returns another fasta file whose identifiers are in the form:

>should_pattern result

The 'should_pattern' is then checked against the 'result' part, and a .tap file is produced.
'''

import sys
18
import re
19
import ansi
20 21 22 23 24 25

PY_REQUIRED = (2, 7)
if sys.version_info < PY_REQUIRED:
    print("This script requires Python >= %d.%d." % (PY_REQUIRED))
    sys.exit(1)

26
from subprocess import Popen, PIPE, STDOUT
27
from collections import defaultdict
28 29

import os
30
import argparse
31

32
VIDJIL_FINE = '{directory}/vidjil -X 100 -# "#" -c segment -i -3 -d -g {directory}/germline %s >> %s'
33
VIDJIL_KMER = '{directory}/vidjil -# "#" -b out -c windows -uU -2 -i -g {directory}/germline %s > /dev/null ; cat out/out.segmented.vdj.fa out/out.unsegmented.vdj.fa >> %s'
34 35 36 37

parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--program', '-p', default=VIDJIL_FINE, help='program to launch on each file (%(default)s)')
parser.add_argument('-q', dest='program', action='store_const', const=VIDJIL_KMER, help='shortcut for -p (VIDJIL_KMER), to be used with -2')
38
parser.add_argument('--ignore_N', '-N', action='store_true', help='ignore N patterns, checking only gene and allele names')
39
parser.add_argument('--ignore_allele', '-A', action='store_true', help='ignore allele, checking only gene names')
40
parser.add_argument('--ignore_D', '-D', action='store_true', help='ignore D gene names and alleles')
41
parser.add_argument('--after-two', '-2', action='store_true', help='compare only the right part of the pattern after two underscores (locus code)')
42
parser.add_argument('--revcomp', '-r', action='store_true', help='duplicate the tests on reverse-complemented files')
43
parser.add_argument('--directory', '-d', default='../..', help='base directory where Vidjil is. This value is used by the default -p and -q values (%(default)s)')
44 45
parser.add_argument('--verbose', '-v', action='store_true')

46 47 48
parser.add_argument('file', nargs='+', help='''.should-vdj.fa files''')

args = parser.parse_args()
49 50

SHOULD_SUFFIX = '.should-vdj.fa'
51

52 53 54
TAP_SUFFIX = '.tap'
LOG_SUFFIX = '.log'

55 56 57 58 59 60
PROG_TAG = '.1'

if args.after_two:
    PROG_TAG = '.2'


61
global_failed = False
62 63
global_stats = defaultdict(int)
global_stats_failed = defaultdict(int)
64
global_stats_todo = defaultdict(int)
65 66

def fasta_id_lines_from_program(f_should):
67
    f_log = f_should + PROG_TAG + LOG_SUFFIX
68 69
    f_log = f_log.replace(SHOULD_SUFFIX, '')

70 71
    program = args.program.replace('{directory}',args.directory)
    cmd = program % (f_should, f_log)
72 73 74

    with open(f_log, 'w') as f:
        f.write('# %s\n\n' % cmd)
75 76 77 78 79

    exit_code = os.system(cmd)
    if exit_code:
        print "Error. The program halted with exit code %s." % exit_code
        sys.exit(3)
80 81 82 83 84 85 86

    for l in open(f_log):
        if not l:
            continue
        if l[0] == '>':
            yield l

87 88 89 90 91 92 93 94
def should_pattern_to_regex(p):
    '''
    Converts patterns such as the following ones into a Python regex.

    TRBV6-1 7/0/12 TRBD2 1/8/2 TRBJ2-7
    TRDV2*02 14/TCCCGGCCT/0 TRDD3*01_3/CCACGGC/4_TRAJ29*01
    TRDD2 18//4 TRAJ29 TRA+D # comments comments ...
    TRGV11 2/5/0 TRGJ1
95
    IGHV3-48 0/AA/6 IGHD5-12 (2//7, 3//6, 4//5) IGHJ4*02
96 97 98
    '''


99
    def process_term(term):
100 101 102

        # Comment, stop parsing here
        if term.startswith('#'):
103
            return []
104

105 106 107 108 109 110 111 112
        # Ambiguous/alternate pattern
        if term.startswith('('):
            choices = []
            term_without_par = term[1:-1]
            for t in term_without_par.split(','):
                choices += process_term(t)
            return ['(%s)' % '|'.join(choices)]

113 114 115 116
        # (such as CDR3 / junction)
        if term.startswith('{'):
            term = term.replace('*','[*]').replace('!','#')
            term = term.replace('{', '.*[{].*').replace('}', '.*[}]')
117
            return [term]
118

119 120 121 122 123 124 125 126 127 128
        # deletion/insertion/deletion
        # Note that '/' may be also in gene name, such as in IGKV1/OR-3*01
        if term.count('/') == 2:
            trim_left, n_region, trim_right = term.split('/')
            try:
                n_insert = int(n_region)
                n_region = '[ACGT]{%d}' % n_insert
            except ValueError: # already /ACGTG/
                pass

129 130 131 132 133
            if args.ignore_N:
                trim_left = '\d+'
                n_region = '[ACGT]*'
                trim_right = '\d+'

134
            return ['/'.join((trim_left, n_region, trim_right))]
135 136 137 138 139 140

        # Gene name, possibly without allele information
        if not '*' in term:
            # Some 'genes', such as KDE, do not have allele information
            term += '([*]\d*)?'
        else:
141 142
            gene, allele = term.split('*')

143 144 145 146
            if args.ignore_D and ('IGHD' in gene or 'TRBD' in gene or 'TRDD' in gene):
                gene = '\S*'
                allele = '\d*'

147 148 149 150
            if args.ignore_allele:
                allele = '\d*'

            term = gene + '[*]' + allele
151

152
        return [term]
153

154 155

    r = []
156
    p = p.replace(', ', ',')
157 158 159 160

    for term in p.split():
        r += process_term(term)
        
161
    regex_pattern = ' '.join(r)
162 163 164 165

    try:
        regex = re.compile(regex_pattern)
    except:
166
        sys.stderr.write("Error. Invalid regex_pattern: " + regex_pattern)
167
        sys.exit(4)
168

169 170 171 172
    if args.verbose:
        print
        print '      ', p, '->', regex_pattern

173 174 175
    return regex


176 177
r_locus = re.compile('\[\S+\]')

178 179 180 181
def id_line_to_tap(l, tap_id):
    '''
    Parses lines such as:
    >TRDD2*01_1/AGG/1_TRDD3*01__TRD+ + VJ 	0 84 88 187	TRDD2*01 1/AGG/1 TRDD3*01  TRD+
182 183
    or
    >TRDV3*01_0//0_TRDJ4*01 ! + VJ	0 49 50 97       TRD UNSEG noisy
184 185 186 187
    and return a .tap line
    '''

    l = l.strip()
188 189
    pos = l.find('\t')
    should = l[1:pos].replace(' + VJ','').replace(' + VDJ','').replace(' - VJ','').replace(' - VDJ','')
190
    result = l[pos+1:] + ' '
191 192

    should_pattern = should.replace('_', ' ')
193
    m_locus = r_locus.search(should_pattern)
194

195 196 197 198 199
    if m_locus:
        locus = m_locus.group(0)
        should_pattern = should_pattern.replace(locus, '')
        locus = locus.replace('[','').replace(']','')
    elif '  ' in should_pattern: # deprecated 'two spaces', will be removed
200
        locus = should_pattern.split('  ')[1].strip().split(' ')[0]
201
        should_pattern = should_pattern.split('  ')[0]
202 203 204
    else:
        locus = None

205 206
    if args.after_two:
        # Testing only the locus code
207
        if not locus:
208
            return '# %d - not tested (no locus)' % tap_id
209

210 211 212 213 214 215 216 217
        found = (locus in result) and not ('%s UNSEG' % locus in result)

    else:
        # Testing the should pattern
        should_regex = should_pattern_to_regex(should_pattern)
        match = should_regex.search(result)
        found = (match is not None)

218 219
    globals()['global_stats'][locus] += 1

220 221
    tap = ''

222
    if not found:
223
        globals()['global_stats_failed'][locus] += 1
224 225 226 227 228
        if 'TODO' in should:
            globals()['global_stats_todo'][locus] += 1
        else:
            globals()['global_failed'] = True

229 230
        tap += 'not '

231 232 233 234
    tap += 'ok %d ' % tap_id

    if 'TODO' in should:
        tap += '# TODO '
235 236 237
    else:
        if not found:
            tap += ansi.Style.BRIGHT + ansi.Fore.RED + '# not ok ' + ansi.Style.RESET_ALL
238 239

    tap += '- ' + should_pattern
240

241
    if not found:
242 243 244 245 246 247 248
        tap += ' - found instead ' + result

    return tap


def should_to_tap_one_file(f_should):

249
    f_tap = f_should + PROG_TAG + TAP_SUFFIX
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
    f_tap = f_tap.replace(SHOULD_SUFFIX, '')

    print "<== %s" % f_should
    id_lines = list(fasta_id_lines_from_program(f_should))

    if not(id_lines):
        print "Error. There is no '>' line in this file."
        sys.exit(2)

    print "==> %s" % f_tap

    with open(f_tap, 'w') as ff:
        ff.write("1..%d\n" % len(id_lines))

        for tap_id, l in enumerate(id_lines):
            tap_line = id_line_to_tap(l, tap_id+1)
266
            if args.verbose or 'not ok' in tap_line:
267 268 269 270 271 272 273
                print tap_line
            ff.write(tap_line + '\n')



if __name__ == '__main__':

274
    for f_should in args.file:
275
        should_to_tap_one_file(f_should)
276 277 278 279 280

        if args.revcomp:
            f_should_rc = f_should + '.rc'
            os.system('python ../../germline/revcomp-fasta.py < %s > %s' % (f_should, f_should_rc))
            should_to_tap_one_file(f_should_rc)
281 282
        print

283
    print "=== Summary, should-vdj tests ===" + (' (only locus)' if args.after_two else '')
284
    print "            tested     passed     failed (todo)"
285
    for locus in sorted(global_stats):
286 287 288 289
        print "    %-5s     %4d       %4d       %4d   %4s" % (locus, global_stats[locus], global_stats[locus] - global_stats_failed[locus], global_stats_failed[locus],
                                                              ("(%d)" % global_stats_todo[locus] if global_stats_todo[locus] else ''))
    print "    =====     %4d       %4d       %4d   %4s" % (sum(global_stats.values()), sum(global_stats.values()) - sum(global_stats_failed.values()), sum(global_stats_failed.values()),
                                                           "(%d)" % sum(global_stats_todo.values()))
290 291
    print

292 293
    if global_failed:
        sys.exit(1)