Commit b9151364 authored by Mikaël Salson's avatar Mikaël Salson

generate-recombinations.py: update script

Generate all possible recombinations and add the possibility to have
random number of insertions and deletions at the junctions (following a
Gaussian law) and add the possiblity to have mutation in the sequences.
parent e9befd96
......@@ -4,32 +4,200 @@ from __future__ import print_function
import json
import fasta
import random
import argparse
random.seed(33328778554)
def recombine_VJ(seq5, remove5, N, remove3, seq3):
name = "%s %d/%s/%d %s" % (seq5.name, remove5, N, remove3, seq3.name)
seq = seq5.seq[:len(seq5)-remove5] + '\n' + N + '\n' + seq3.seq[remove3:]
class vdj_repertoire:
'''
The class generates recombinations among a set of sequences
'''
labels = []
sequences = []
def __init__(self, labels = None, repertoire = None):
'''
repertoire should be a list of dictionary of a list of Fasta sequences
'''
if repertoire is not None:
self.sequences = repertoire
self.labels = labels
@classmethod
def files(self, labels, repertoire):
'''
Provide a list of list of sequences
'''
sequences = []
for filenames in repertoire:
current_sequences = []
for filename in filenames:
current_sequences += list(fasta.parse_as_Fasta(open(filename)))
sequences.append(current_sequences)
return self(labels, sequences)
def germlines(self):
return self.labels
def nb_sequences(self, label):
'''
>>> rep = vdj_repertoire(['a', 'b'], [['aat', 'taa'], ['gcc']])
>>> rep.nb_sequences('a')
2
>>> rep.nb_sequences('b')
1
>>> rep.nb_sequences('c')
Traceback (most recent call last):
...
ValueError: 'c' is not in list
'''
index = self.labels.index(label)
return len(self.sequences[index])
def recombinations(self, at_most = None):
'''
Returns a list of recombinations.
The recombinations are given under the form of a list.
>>> [v for v in vdj_repertoire(['a', 'b'], [['aat', 'taa'],\
['gcc']]).recombinations()]
[['aat', 'gcc'], ['taa', 'gcc']]
>>> len([v for v in vdj_repertoire(['a', 'b'], [['aat', 'taa'],\
['gcc']]).recombinations(1)])
1
'''
if at_most is not None:
return self._at_most_recombinations_(at_most)
else:
return self._all_recombinations_()
def _at_most_recombinations_(self, at_most):
nb = 0
while nb < at_most:
recombination = []
for current_rep in self.sequences:
recombination.append(random.choice(current_rep))
yield recombination
nb += 1
return fasta.Fasta(name, seq)
def _all_recombinations_(self):
return list_recombinations(self.sequences)
def list_recombinations(l):
'''
>>> [i for i in list_recombinations([[1], [10, 11], [100, 102]])]
[[1, 10, 100], [1, 11, 100], [1, 10, 102], [1, 11, 102]]
>>> [i for i in list_recombinations([[1, 2], [3]])]
[[1, 3], [2, 3]]
'''
if len(l) <= 0:
yield []
else:
for item in l[len(l)-1]:
for recomb in list_recombinations(l[:-1]):
recomb.append(item)
yield recomb
class vdj_recombination:
deletions = None
insertions = None
processing = []
def __init__(self, insertions = None, deletions = None, processing = None):
'''insertions and deletions are lists of length 1 or whose length are the
number of locations where insertions and deletions take place. They
contain a function with no parameter returning a natural integer
corresponding to the number of expected insertions/deletions.
processing is a list of the same size which contains a function which
is applied to a string and which returns a string. It is used to alter
the input sequence.
'''
if insertions is not None:
self.insertions = insertions
else:
self.insertions = [(lambda: 0)]
if deletions is not None:
self.deletions = deletions
else:
self.deletions = [(lambda: 0)]
if processing is not None:
self.processing = processing
else:
self.processing = [(lambda s: s)]
def recombine(self, sequences):
'''
Recombine the sequences with the provided recombinations
>>> str(vdj_recombination().recombine([fasta.Fasta('a', 'AATTAT'),\
fasta.Fasta('b', 'GGGACACAT'),\
fasta.Fasta('c', 'ATAGATATGA')]))
'>a 0//0 b 0//0 c\\nAATTAT\\nGGGACACAT\\nATAGATATGA\\n\\n'
>>> str(vdj_recombination(deletions=[(lambda: 2)]).recombine([fasta.Fasta('a', 'AATTAT'),\
fasta.Fasta('b', 'GGGACACAT'),\
fasta.Fasta('c', 'ATAGATATGA')]))
'>a 2//2 b 2//2 c\\nAATT\\nGACAC\\nAGATATGA\\n\\n'
'''
name = ''
seq = ''
insertions = self.insertions * (len(sequences)-1)
deletions = self.deletions * (len(sequences)*2-2)
process = self.processing * (len(sequences))
for i, sequence in enumerate(sequences):
nb_deletions_start = 0
nb_deletions_end = 0
N_insertions = ''
if i > 0:
# Start deletion
nb_deletions_start = deletions[2*i-1]()
name += '/%d ' % nb_deletions_start
name += sequence.name
if i < len(sequences) - 1:
# End deletion
nb_deletions_end = deletions[2*i]()
N_insertions = random_sequence(['A', 'C', 'G', 'T'],\
insertions[i]())
name += ' %d/%s' % (nb_deletions_end, N_insertions)
nb_deletions_end = -nb_deletions_end if nb_deletions_end > 0 else None
seq += process[i](sequence.seq[nb_deletions_start:nb_deletions_end])+"\n"+N_insertions+"\n"
return fasta.Fasta(name, seq)
def random_sequence(characters, length):
return ''.join([random.choice(characters) for x in range(length)])
def recombine_VJ_with_removes(seq5, remove5, Nlength, remove3, seq3):
'''Recombine V and J with a random N, ensuring that the bases in N are not the same that what was ultimately removed from V or J'''
assert(Nlength >= 1)
available = ['A', 'C', 'G', 'T']
if remove5:
available.remove(seq5.seq[len(seq5)-remove5].upper())
if remove3:
c = seq3.seq[remove3-1].upper()
if c in available:
available.remove(c)
def mutate_sequence(sequence, probability):
'''
Mutate the original DNA sequence given in parameter.
The probability is a per nucleotide probability.
return recombine_VJ(seq5, remove5, random_sequence(available, Nlength), remove3, seq3)
This solution is inspired from Blckknght's: http://stackoverflow.com/a/24063748/1192742
'''
mutated = []
nucleotides = ['A', 'C', 'G', 'T']
for nt in sequence:
if random.random() < probability:
if nt.upper() in nucleotides:
nt = nucleotides[nucleotides.index(nt.upper()) - random.randint(1, 3)]
else:
nt = random.choice(nucleotides)
mutated.append(nt)
return ''.join(mutated)
def random_pos_int(mean, stddev):
'''
Returns a random number whose distribution
has the mean provided has a parameter and the standard deviation
is stddev
'''
result = random.gauss(mean, stddev)
if result < 0:
return 0
return int(result)
def get_gene_name(allele):
'''
......@@ -37,93 +205,94 @@ def get_gene_name(allele):
'''
return allele.name[:allele.name.find('*')]
def select_genes(rep5, rep3, at_most=0):
if at_most > 0 and len(rep5) * len(rep3) > at_most:
return select_genes_randomly(rep5, rep3, at_most)
return select_all_genes(rep5, rep3)
def select_all_genes(rep5, rep3):
genes5 = {}
genes3 = {}
for seq5 in rep5:
gene_name_5 = get_gene_name(seq5)
if not gene_name_5 in genes5:
genes5[gene_name_5] = True
for seq3 in rep3:
gene_name_3 = get_gene_name(seq3)
if not gene_name_5+gene_name_3 in genes3:
genes3[gene_name_5+gene_name_3] = True
yield (seq5, seq3)
def select_genes_randomly(rep5, rep3, at_most):
nb = 0
while nb < at_most:
yield (random.choice(rep5), random.choice(rep3))
nb += 1
def write_seq_to_file(seq, code, file):
seq.header = seq.header.replace(' ', '_')+"__"+code
file.write(str(seq))
def generate_to_file_rec(rep5, rep4, rep3, code, output, recomb_function):
if rep4 == []:
recomb1_left = rep5
else:
recomb1_left = rep4
recomb1_right = rep3
def generate_to_file(repertoire, recombination, code, f, nb_recomb):
print(" ==>", f)
output = open(f, 'w')
nb = 0
for seq5, seq3 in select_genes(recomb1_left, recomb1_right):
seq = recomb_function(seq5, seq3)
if rep4 != []:
nb += generate_to_file_rec(rep5, [], [seq], code, output, recomb_function)
else:
seq.header = seq.header.replace(' ', '_')+"__"+code
output.write(str(seq))
for recomb in repertoire.recombinations():
for i in range(nb_recomb):
write_seq_to_file(recombination.recombine(recomb), code, output)
nb += 1
return nb
def generate_to_file(rep5, rep4, rep3, code, f, recomb_function):
print(" ==>", f)
print(" ==> %d recombinations" % generate_to_file_rec(rep5, rep4, rep3, code, open(f, 'w'), recomb_function))
germlines_json = open('germlines.data').read().replace('germline_data = ', '')
germlines = json.loads(germlines_json)
for code in germlines:
g = germlines[code]
print("--- %s - %-4s - %s" % (g['shortcut'], code, g['description']))
# Read germlines
rep5 = []
for r5 in g['5']:
rep5 += list(fasta.parse_as_Fasta(open(r5)))
rep4 = []
if '4' in g:
for r4 in g['4']:
rep4 += list(fasta.parse_as_Fasta(open(r4)))
rep3 = []
for r3 in g['3']:
rep3 += list(fasta.parse_as_Fasta(open(r3)))
print(" 5: %3d sequences" % len(rep5),
" 4: %3d sequences" % len(rep4),
" 3: %3d sequences" % len(rep3))
# Generate recombinations
generate_to_file(rep5, rep4, rep3, code, '../data/gen/0-removes-%s.should-vdj.fa' % code,
(lambda seq5, seq3: recombine_VJ(seq5, 0, 'ATCG', 0, seq3)))
generate_to_file(rep5, rep4, rep3, code, '../data/gen/5-removes-%s.should-vdj.fa' % code,
(lambda seq5, seq3: recombine_VJ_with_removes(seq5, 5, 4, 5, seq3)))
print()
print(" ==> %d recombinations" % nb)
def list_random_tuple(s):
try:
list = s.split(':')
result_list = []
for item in list:
one, two = map(float, item.split(','))
result_list.append((lambda: random_pos_int(one, two)))
return result_list
except Exception, e:
raise argparse.ArgumentTypeError('A list separated by colons, of couples separated by commas must be provided (ex: 1,2:2,1) '+str(e))
def list_int(s):
try:
result_list = []
for item in s.split(':'):
result_list.append((lambda: int(item)))
return result_list
except Exception, e:
raise argparse.ArgumentTypeError('A list of integers separated by colons must be provided (ex: 1:2) '+str(e))
if __name__ == '__main__':
DESCRIPTION='Script generating fake V(D)J recombinations'
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument('-g', '--germlines', type=file, default='germlines.data', help='path to the germlines.data file')
parser.add_argument('--deletions', '-d', type=list_int, default = [(lambda: 5)], help='List -- separated by colons -- of the number of deletions at junctions (or single value, if the number is the same everywhere).')
parser.add_argument('--insertions', '-i', type=list_int, default = [(lambda: 3)], help='List -- separated by colons -- of the number of insertions at junctions (or single value, if the number is the same everywhere')
parser.add_argument('--random-deletions', '-D', type=list_random_tuple, help='List of random deletions at junctions under the format mean,standard_deviation (or single value, if the number is the same everywhere')
parser.add_argument('--random-insertions', '-I', type=list_random_tuple, help='List of the number of insertions at junctions under the format mean,standard_deviation (or single value, if the number is the same everywhere')
parser.add_argument('-n', '--nb-recombinations', type=int, default=5, help='Number of times each recombination (with insertions/deletions) is generated')
parser.add_argument('-e', '--error', type=float, default = 0., help='Probability of error at the nucleotide level')
args = parser.parse_args()
germlines_json = args.germlines.read().replace('germline_data = ', '')
germlines = json.loads(germlines_json)
for code in germlines:
g = germlines[code]
print("--- %s - %-4s - %s" % (g['shortcut'], code, g['description']))
# Read germlines
nb_recomb = 0
for recomb in g['recombinations']:
labels = ['V']
files = [recomb['5']]
if '4' in recomb:
labels.append('D')
files.append(recomb['4'])
labels.append('J')
files.append(recomb['3'])
repertoire = vdj_repertoire.files(labels, files)
print(" 5: %3d sequences\n" % repertoire.nb_sequences('V'),
" 4: %3d sequences\n" % (repertoire.nb_sequences('D') if 'D' in labels else 0),
" 3: %3d sequences\n" % repertoire.nb_sequences('J'))
code_in_filename = code
if nb_recomb > 0:
code_in_filename = code_in_filename + '-%d' % (nb_recomb+1)
# Generate recombinations
recombination0 = vdj_recombination()
generate_to_file(repertoire, recombination0, code, '../data/gen/0-removes-%s.should-vdj.fa' % code_in_filename, 1)
deletions = args.deletions if args.random_deletions is None else args.random_deletions
insertions = args.insertions if args.random_insertions is None else args.random_insertions
recombination5 = vdj_recombination(deletions=deletions, insertions=insertions, processing = [(lambda s: mutate_sequence(s, args.error))])
generate_to_file(repertoire, recombination5, code, '../data/gen/5-removes-%s.should-vdj.fa' % code_in_filename, args.nb_recombinations)
print()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment