Mentions légales du service

Skip to content
Snippets Groups Projects
Commit e82af8b5 authored by BOULLE Olivier's avatar BOULLE Olivier
Browse files

init

parent ba1cca2f
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python3
import os
import sys
import inspect
import time
import random
import reads_consensus_class as rcc
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
sys.path.insert(0, os.path.dirname(currentdir)+"/synthesis_modules")
import dna_file_reader as dfr
import synthesis_simulation as ss
def generate_references_sequences(seq_number, references_path) -> dict:
    """
    Create random ACGT reference sequences for the clustering tests.

    Homopolymers are capped at h_max consecutive identical letters.

    :param seq_number: number of sequences to generate
    :param references_path: output fasta path (written with dfr.save_dict_to_fasta)
    :return: the {name: sequence} dict that was saved
    """
    h_max = 3   # maximum allowed homopolymer length
    seq_size = 60
    generated_seq_dict = {}
    for seq_num in range(seq_number):
        sequence = ""
        while len(sequence) < seq_size:
            alphabet = ["A", "G", "C", "T"]
            letter = random.choice(alphabet)
            # if appending this letter would extend a homopolymer past h_max,
            # pick a different letter instead
            if sequence[-h_max:] == letter * h_max:
                alphabet.remove(letter)
                letter = random.choice(alphabet)
            sequence += letter
        generated_seq_dict["ref_" + str(seq_num)] = sequence
    dfr.save_dict_to_fasta(generated_seq_dict, references_path)
    # fix: the signature promises a dict but the original returned None
    return generated_seq_dict
def generate_random_reads(references_file_path, coverage_list, reads_path):
    """
    Simulate sequencing reads from reference sequences and save them as fastq.

    Reference number i from the input fasta is duplicated coverage_list[i]
    times, each copy distorted by ss.add_errors.
    """
    # insertion / deletion / substitution rates, ~4% combined error rate
    # (0.005, 0.01, 0.005 would give ~2%)
    i_error, d_error, s_error = 0.01, 0.025, 0.01
    references = dfr.read_fasta(references_file_path)
    simulated_reads = {}
    for index, (ref_name, ref_seq) in enumerate(references.items()):
        for copy_num in range(coverage_list[index]):
            noisy_seq = ss.add_errors(ref_seq, i_error, d_error, s_error)
            simulated_reads[ref_name + "_" + str(copy_num)] = noisy_seq
    dfr.save_dict_to_fastq(simulated_reads, reads_path)
def get_minimisers(reads_path):
    """
    Count canonical minimisers over all reads of a fastq file.

    For every k-mer (k=10) of each read, the minimiser is the lexicographically
    smallest 6-mer occurring in the k-mer or in its reverse complement.

    :param reads_path: fastq file of reads
    :return: dict mapping each minimiser to its occurrence count
    """
    kmer_size = 10
    minimiser_size = 6
    minimiser_counts = {}
    for sequence in dfr.read_fastq(reads_path).values():
        seq_len = len(sequence)
        sequence_rc = dfr.reverse_complement(sequence)
        for start in range(seq_len - kmer_size + 1):
            kmer_fw = sequence[start:start + kmer_size]
            # the k-mer covering the same positions on the reverse strand
            kmer_rv = sequence_rc[seq_len - kmer_size - start:seq_len - start]
            windows = range(kmer_size - minimiser_size + 1)
            minimiser = min(
                min(kmer_fw[j:j + minimiser_size] for j in windows),
                min(kmer_rv[j:j + minimiser_size] for j in windows),
            )
            minimiser_counts[minimiser] = minimiser_counts.get(minimiser, 0) + 1
    print(str(len(minimiser_counts))+" minimisers")
    return minimiser_counts
def generate_matrix(reads_path, minimisers_dict, solutions_path, matrix_path):
    """
    Write the read/minimiser presence matrix and the expected clustering.

    Reads are shuffled and anonymised as r0..rN-1. solutions_path receives one
    "cluster:read,read,..." line per cluster; matrix_path receives a csv whose
    cell (mi, rj) is 1 when minimiser i occurs in read j (either strand).
    """
    reads_dict = dfr.read_fastq(reads_path)
    shuffled_names = list(reads_dict.keys())
    random.shuffle(shuffled_names)
    # map each original cluster number to the anonymised labels of its reads
    soluce_dict = {}
    for position, read_name in enumerate(shuffled_names):
        cluster_id = int(read_name.split("_")[1])
        soluce_dict[cluster_id] = soluce_dict.get(cluster_id, []) + ["r" + str(position)]
    soluce_dict = dict(sorted(soluce_dict.items()))
    print(soluce_dict)
    with open(solutions_path, 'w') as output_file:
        for cluster_id in soluce_dict.keys():
            output_file.write(str(cluster_id)+":"+",".join(soluce_dict[cluster_id])+"\n")
    anonymised_names = ["r" + str(k) for k in range(len(reads_dict))]
    ones_counter = 0
    with open(matrix_path, 'w') as output_file:
        output_file.write(","+",".join(anonymised_names)+"\n")
        for row_num, minimiser in enumerate(minimisers_dict.keys()):
            output_file.write("m"+str(row_num)+",")
            row_cells = []
            for read_name in shuffled_names:
                read_seq = reads_dict[read_name]
                read_seq_rc = dfr.reverse_complement(read_seq)
                # presence on either strand counts as a hit
                if minimiser in read_seq or minimiser in read_seq_rc:
                    row_cells.append("1")
                    ones_counter += 1
                else:
                    row_cells.append("0")
            output_file.write(",".join(row_cells)+"\n")
    # report matrix density
    total_cells = len(shuffled_names) * len(minimisers_dict)
    print(str(ones_counter)+ "/"+ str(total_cells) +" = "+ str(round(100*ones_counter/total_cells, 1))+"%")
def matrix_generation(coverage_list=None, output_dir="matrix_tests"):
    """
    End-to-end test-data generation: references -> simulated reads ->
    minimiser counting -> presence matrix + expected clustering solution.

    :param coverage_list: number of reads per reference (defaults to the
        original hard-coded mix of 12 coverages)
    :param output_dir: directory receiving the four generated files
    """
    if coverage_list is None:
        coverage_list = [13, 11, 11, 10, 9, 9, 8, 8, 8, 7, 5, 1]
    ref_path = output_dir + "/references.fasta"
    reads_path = output_dir + "/reads.fastq"
    solutions_path = output_dir + "/soluce.txt"
    matrix_path = output_dir + "/matrix.csv"
    # fix: reference count follows len(coverage_list) instead of a separately
    # hard-coded 12, so the two can no longer drift out of sync
    generate_references_sequences(len(coverage_list), ref_path)
    generate_random_reads(ref_path, coverage_list, reads_path)
    minimisers_dict = get_minimisers(reads_path)
    generate_matrix(reads_path, minimisers_dict, solutions_path, matrix_path)
if __name__ == "__main__":
    # script entry point: build the full test matrix in matrix_tests/
    print("generate matrix...")
    matrix_generation()
    print("\tcompleted !")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment