# read_matrix.py
# Author: Olivier Boulle
#!/usr/bin/python3
import os
import sys
import inspect
import time
import random
import reads_consensus_class as rcc
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
sys.path.insert(0, os.path.dirname(currentdir)+"/synthesis_modules")
import dna_file_reader as dfr
import synthesis_simulation as ss
def generate_references_sequences(seq_number: int, references_path: str) -> dict:
    """
    Create random reference sequences for the tests and save them as fasta.

    Homopolymers longer than h_max are avoided.
    NOTE(review): the original docstring mentioned a checksum, but none is
    computed here — the sequences are purely random.

    :param seq_number: number of sequences to generate
    :param references_path: output fasta file path
    :return: dict mapping "ref_<n>" names to the generated sequences
    """
    h_max = 3  # maximum allowed homopolymer length
    seq_size = 60
    bases = ["A", "G", "C", "T"]
    generated_seq_dict = {}
    for seq_num in range(seq_number):
        sequence = ""
        while len(sequence) < seq_size:
            alphabet = list(bases)  # fresh copy: may be mutated below
            letter = random.choice(alphabet)
            # if appending this base would extend a homopolymer past h_max,
            # pick a different base instead
            if sequence[-h_max:] == letter * h_max:
                alphabet.remove(letter)
                letter = random.choice(alphabet)
            sequence += letter
        generated_seq_dict["ref_" + str(seq_num)] = sequence
    dfr.save_dict_to_fasta(generated_seq_dict, references_path)
    # bug fix: the signature promises a dict but the original returned None
    return generated_seq_dict
def generate_random_reads(references_file_path, coverage_list, reads_path):
    """
    Simulate sequencing reads from the reference sequences and save them as fastq.

    For the i-th reference sequence, coverage_list[i] reads are generated, each
    with simulated insertion/deletion/substitution errors.

    :param references_file_path: input fasta of reference sequences
    :param coverage_list: number of reads to generate per reference (one entry per reference)
    :param reads_path: output fastq file path
    :raises ValueError: if coverage_list has fewer entries than there are references
    """
    #i_error, d_error, s_error = 0.005, 0.01, 0.005 # ~2% error rate
    i_error, d_error, s_error = 0.01, 0.025, 0.01  # ~4% error rate
    ref_dict = dfr.read_fasta(references_file_path)
    # fail fast with a clear message instead of an IndexError mid-loop
    if len(coverage_list) < len(ref_dict):
        raise ValueError("coverage_list must provide one coverage value per reference")
    output_read_dict = {}
    for i, (seq_name, sequence) in enumerate(ref_dict.items()):
        for j in range(coverage_list[i]):
            simulated_read_seq = ss.add_errors(sequence, i_error, d_error, s_error)
            output_read_dict[seq_name + "_" + str(j)] = simulated_read_seq
    dfr.save_dict_to_fastq(output_read_dict, reads_path)
def get_minimisers(reads_path):
    """
    Count minimiser occurrences over all reads of a fastq file.

    For every k-mer of every read, the minimiser is the lexicographically
    smallest substring of size minimiser_size taken either from the k-mer
    itself or from its reverse complement.

    :param reads_path: input fastq file path
    :return: dict mapping each minimiser to its number of occurrences
    """
    kmer_size = 10
    minimiser_size = 6
    window_count = kmer_size - minimiser_size + 1
    minimisers_dict = {}
    for read_seq in dfr.read_fastq(reads_path).values():
        seq_len = len(read_seq)
        read_seq_rv = dfr.reverse_complement(read_seq)
        for start in range(seq_len - kmer_size + 1):
            kmer_fw = read_seq[start:start + kmer_size]
            # reverse complement of the same k-mer, read from the mirrored position
            kmer_rv = read_seq_rv[seq_len - kmer_size - start:seq_len - start]
            minimiser = min(
                kmer[offset:offset + minimiser_size]
                for kmer in (kmer_fw, kmer_rv)
                for offset in range(window_count)
            )
            minimisers_dict[minimiser] = minimisers_dict.get(minimiser, 0) + 1
    print(str(len(minimisers_dict))+" minimisers")
    return minimisers_dict
def generate_matrix(reads_path, minimisers_dict, solutions_path, matrix_path):
    """
    Build the minimiser/read presence matrix and its ground-truth clustering.

    Reads are shuffled and anonymized as "r<j>"; the expected clusters
    (recovered from the read names, formatted "ref_<cluster>_<copy>") are
    written to solutions_path, and a csv matrix with one row per minimiser
    and one column per read ("1" if the minimiser occurs in the read or in
    its reverse complement) is written to matrix_path.

    :param reads_path: input fastq file path
    :param minimisers_dict: dict of minimisers (its keys become the matrix rows)
    :param solutions_path: output text file with the expected clusters
    :param matrix_path: output csv matrix path
    """
    reads_dict = dfr.read_fastq(reads_path)
    reads_name_list = list(reads_dict.keys())
    random.shuffle(reads_name_list)
    # ground truth: column j of the matrix is the shuffled read renamed "r<j>"
    soluce_dict = {}
    for i, read_name in enumerate(reads_name_list):
        read_cluster = int(read_name.split("_")[1])
        soluce_dict.setdefault(read_cluster, []).append("r"+str(i))
    soluce_dict = dict(sorted(soluce_dict.items()))
    print(soluce_dict)
    with open(solutions_path, 'w') as output_file:
        for cluster_name in soluce_dict.keys():
            output_file.write(str(cluster_name)+":"+",".join(soluce_dict[cluster_name])+"\n")
    # perf fix: compute each read's reverse complement once, instead of once
    # per (minimiser, read) pair inside the loop below
    rv_seq_dict = {name: dfr.reverse_complement(seq) for name, seq in reads_dict.items()}
    hidden_reads_name = ["r"+str(k) for k in range(len(reads_dict))]
    ones_counter = 0
    with open(matrix_path, 'w') as output_file:
        output_file.write(","+",".join(hidden_reads_name)+"\n")
        for i, minimiser in enumerate(minimisers_dict.keys()):
            output_file.write("m"+str(i)+",")
            minimiser_line = []
            for read_name in reads_name_list:
                if minimiser in reads_dict[read_name] or minimiser in rv_seq_dict[read_name]:
                    minimiser_line.append("1")
                    ones_counter += 1
                else:
                    minimiser_line.append("0")
            output_file.write(",".join(minimiser_line)+"\n")
    cases_counter = len(reads_name_list)*len(minimisers_dict)
    # report the matrix density (% of cells set to 1)
    print(str(ones_counter)+ "/"+ str(cases_counter) +" = "+ str(round(100*ones_counter/cases_counter, 1))+"%")
def matrix_generation(output_dir="matrix_tests"):
    """
    Full pipeline: generate references, simulate reads, then build the
    minimiser matrix and its expected clustering solution.

    :param output_dir: directory where all output files are written
                       (created if it does not exist); defaults to the
                       original hard-coded "matrix_tests"
    """
    coverage_list = [13,11,11,10,9,9,8,8,8,7,5,1]  # one coverage value per reference
    # robustness fix: the script crashed if the output directory was missing
    os.makedirs(output_dir, exist_ok=True)
    ref_path = os.path.join(output_dir, "references.fasta")
    reads_path = os.path.join(output_dir, "reads.fastq")
    solutions_path = os.path.join(output_dir, "soluce.txt")
    matrix_path = os.path.join(output_dir, "matrix.csv")
    # len(coverage_list) replaces the magic constant 12: the two must agree
    generate_references_sequences(len(coverage_list), ref_path)
    generate_random_reads(ref_path, coverage_list, reads_path)
    minimisers_dict = get_minimisers(reads_path)
    generate_matrix(reads_path, minimisers_dict, solutions_path, matrix_path)
if __name__ == "__main__":
    # script entry point: build the test matrix and its solution file
    print("generate matrix...")
    matrix_generation()
    print("\tcompleted !")