Mentions légales du service

Skip to content
Snippets Groups Projects
Commit a0b9857b authored by BOULLE Olivier's avatar BOULLE Olivier
Browse files

turned matrix functions into a class object, added comments & refactoring

parent 2827c253
No related branches found
No related tags found
No related merge requests found
......@@ -13,17 +13,129 @@ import dna_file_reader as dfr
import synthesis_simulation as ss
def generate_references_sequences(seq_number, references_path) -> dict:
class Minimizer_matrix:
"""
create random sequences with a checksum to use as references for the tests
class to handle matrix stuff
"""
h_max = 3 # maxi homopolymere size
def __init__(self, reads_path: str, minimizers_dict: dict) -> None:
    """
    Build a minimizer/read presence matrix from a reads file and a dict
    of minimizers extracted from those reads.

    :param reads_path: path to the .fastq file of reads
    :param minimizers_dict: key: minimizer kmer, value: occurrence count
    """
    # bookkeeping attributes, all filled by generate_matrix below
    self.line_number = 0      # number of rows (minimizers)
    self.column_number = 0    # number of columns (reads)
    self.ones_counter = 0     # total number of "1" cells in the matrix
    self.soluce_dict = {}     # cluster_name -> list of renamed reads of that cluster
    self.lines = []           # matrix rows, each a list of "0"/"1" strings
    self.generate_matrix(reads_path, minimizers_dict)
def generate_matrix(self, reads_path: str, minimizers_dict: dict) -> None:
    """
    Fill the matrix from a .fastq file of reads and a dict of minimizers
    found in these reads.

    :param reads_path: path to the .fastq reads file
    :param minimizers_dict: key: minimizer kmer, value: occurrence count
    """
    # read the .fastq file and get basic info
    reads_dict = dfr.read_fastq(reads_path)
    reads_name_list = list(reads_dict.keys())
    self.line_number = len(minimizers_dict)
    self.column_number = len(reads_dict)

    # shuffle the reads and save the original clusters in a dict
    random.shuffle(reads_name_list)
    for i, read_name in enumerate(reads_name_list):
        # read names are expected to look like "<prefix>_<cluster>_..." — TODO confirm with the read generator
        read_cluster = int(read_name.split("_")[1])
        # setdefault+append instead of rebuilding the list with get()+concat (was quadratic per cluster)
        self.soluce_dict.setdefault(read_cluster, []).append("r"+str(i))
    self.soluce_dict = dict(sorted(self.soluce_dict.items()))

    # construction of the matrix: one row per minimizer, one column per read
    for minimizer in minimizers_dict.keys():
        minimizer_line = []
        for read_name in reads_name_list:
            read_seq = reads_dict[read_name]
            read_seq_rv = dfr.reverse_complement(read_seq)
            # a cell is "1" if the minimizer occurs in the read on either strand
            if minimizer in read_seq or minimizer in read_seq_rv:
                minimizer_line.append("1")
                self.ones_counter += 1
            else:
                minimizer_line.append("0")
        self.lines.append(minimizer_line)

    # show matrix density; skip the print for an empty matrix to avoid ZeroDivisionError
    cases_counter = self.line_number*self.column_number
    if cases_counter:
        print(str(self.ones_counter)+ "/"+ str(cases_counter) +" = "+ str(round(100*self.ones_counter/cases_counter, 1))+"%")
def print_soluce(self, solutions_path: str) -> None:
"""
save the solution clusters in a file
"""
with open(solutions_path, 'w') as output_file:
for cluster_name in self.soluce_dict.keys():
output_file.write(str(cluster_name)+":"+",".join(self.soluce_dict[cluster_name])+"\n")
def print_matrix_to_csv(self, matrix_csv_path: str) -> None:
"""
save the matrix to .csv format
"""
with open(matrix_csv_path, 'w') as output_file:
output_file.write(","+",".join("r"+str(k) for k in range(self.column_number))+"\n")
for i, line in enumerate(self.lines):
output_file.write("m"+str(i)+",")
output_file.write(",".join(line)+"\n")
def print_matrix_to_coo(self, matrix_coo_path: str) -> None:
"""
save the matrix to COO format
first line : line_number column_numer
coord x y of a 1 each line
"""
with open(matrix_coo_path, 'w') as output_matrix:
output_matrix.write(str(self.line_number)+" "+str(self.column_number)+"\n")
for i, line in enumerate(self.lines):
for j, value in enumerate(line):
if value == "1":
output_matrix.write(str(i)+" "+str(j)+"\n")
def print_matrix_to_csvbm(self, matrix_csvbm_path: str) -> None:
"""
save the matrix to .csvbm format
row_br column_nbr ones_nbr\n
distance_to_first_one\n
...
distance_to_next_one_from_last_one\n
...
"""
with open(matrix_csvbm_path, 'w', encoding='utf-8') as csvbm_file:
csvbm_file.write(
f'{self.line_number} {self.column_number} {self.ones_counter}\n',
)
distance = 0
for line in self.lines:
for column in line:
if column == "1":
csvbm_file.write(f'{distance}\n')
distance = 1
else:
distance += 1
def generate_references_sequences(seq_number: int, references_path: str) -> dict:
"""
create random sequences to use as references for the tests
"""
h_max = 3 # maximum homopolymere size
seq_size = 60 # length of sequences
generated_seq_dict = {} # key:sequence_name, value:sequence
for seq_num in range(seq_number):
sequence = ""
......@@ -39,9 +151,10 @@ def generate_references_sequences(seq_number, references_path) -> dict:
dfr.save_dict_to_fasta(generated_seq_dict, references_path)
def generate_random_reads(references_file_path, coverage_list, reads_path):
def generate_random_reads(references_file_path: str, coverage_list: list, reads_path: str) -> None:
"""
generate a read file containing the simulated reads from {assembly_number} sequences, {reads_per_file} times each
generate a read file containing simulated reads from reference sequences
with a coverage from the list
"""
#i_error, d_error, s_error = 0.005, 0.01, 0.005 # ~2% error rate
......@@ -60,95 +173,63 @@ def generate_random_reads(references_file_path, coverage_list, reads_path):
dfr.save_dict_to_fastq(output_read_dict, reads_path)
def get_minimisers(reads_path):
def get_minimizers(reads_path: str) -> dict:
"""
get a dict of minimizers from the read file
"""
#TODO minimizers that can also be found in primers should not be used
#TODO optimise the search (lot of time to gain)
kmer_size = 10
minimiser_size = 6
window_size = 10 # window to look for the minimizer
minimizer_size = 6 # length of minimizer kmer
minimisers_dict = {}
minimizer_dict = {} # key:minimizer_kmer, value:number of occurrence in reads
reads_dict = dfr.read_fastq(reads_path)
for read_seq in reads_dict.values():
sequence_size = len(read_seq)
read_seq_rv = dfr.reverse_complement(read_seq)
for i in range(sequence_size-kmer_size+1):
sub_fw = read_seq[i:i+kmer_size]
sub_rv = read_seq_rv[sequence_size-kmer_size-i:sequence_size-i]
#print(sub_fw)
#print(sub_rv)
minimiser = "Z"*minimiser_size
for j in range(kmer_size-minimiser_size+1):
sub_minimiser = sub_fw[j:j+minimiser_size]
if sub_minimiser < minimiser:
minimiser = sub_minimiser
sub_minimiser = sub_rv[j:j+minimiser_size]
if sub_minimiser < minimiser:
minimiser = sub_minimiser
#print(minimiser)
minimisers_dict[minimiser] = minimisers_dict.get(minimiser, 0) +1
print(str(len(minimisers_dict))+" minimisers")
return minimisers_dict
def generate_matrix(reads_path, minimisers_dict, solutions_path, matrix_path):
reads_dict = dfr.read_fastq(reads_path)
reads_name_list = list(reads_dict.keys())
for i in range(sequence_size-window_size+1):
sub_fw = read_seq[i:i+window_size]
sub_rv = read_seq_rv[sequence_size-window_size-i:sequence_size-i]
random.shuffle(reads_name_list)
minimizer = "Z"*minimizer_size # default to erase with first real minimizer found
soluce_dict = {}
for i, read_name in enumerate(reads_name_list):
read_cluster = int(read_name.split("_")[1])
soluce_dict[read_cluster] = soluce_dict.get(read_cluster, []) + ["r"+str(i)]
for j in range(window_size-minimizer_size+1): # this is far from an optimal search
sub_minimizer = sub_fw[j:j+minimizer_size]
if sub_minimizer < minimizer:
minimizer = sub_minimizer
soluce_dict = dict(sorted(soluce_dict.items()))
#print(soluce_dict)
sub_minimizer = sub_rv[j:j+minimizer_size]
if sub_minimizer < minimizer:
minimizer = sub_minimizer
with open(solutions_path, 'w') as output_file:
for cluster_name in soluce_dict.keys():
output_file.write(str(cluster_name)+":"+",".join(soluce_dict[cluster_name])+"\n")
minimizer_dict[minimizer] = minimizer_dict.get(minimizer, 0) +1
hidden_reads_name = ["r"+str(k) for k in range(len(reads_dict))]
print(str(len(minimizer_dict))+" minimizers found in reads")
ones_counter = 0
return minimizer_dict
with open(matrix_path, 'w') as output_file:
output_file.write(","+",".join(hidden_reads_name)+"\n")
for i, minimiser in enumerate(minimisers_dict.keys()):
output_file.write("m"+str(i)+",")
minimiser_line = []
for read_name in reads_name_list:
read_seq = reads_dict[read_name]
read_seq_rv = dfr.reverse_complement(read_seq)
if minimiser in read_seq or minimiser in read_seq_rv:
minimiser_line.append("1")
ones_counter += 1
else:
minimiser_line.append("0")
output_file.write(",".join(minimiser_line)+"\n")
cases_counter = len(reads_name_list)*len(minimisers_dict)
print(str(ones_counter)+ "/"+ str(cases_counter) +" = "+ str(round(100*ones_counter/cases_counter, 1))+"%")
def get_coverage_list(total_sum, min_coverage=30) -> list:
def get_coverage_list(total_sum: int, min_coverage=30) -> list:
"""
return a list of number for the coverage fo the generated reads
return a list of number for a coverage of each generated read
total_sum is the required sum of all coverage in the list
"""
coverage_list = []
current_sum = 0
while current_sum <= total_sum:
# random coverage between min and 2*min
coverage_i = random.randint(min_coverage, 2*min_coverage)
current_sum += coverage_i
coverage_list.append(coverage_i)
# adjust the last cluster to remove the excedent
coverage_list[-1] = coverage_list[-1]-(current_sum-total_sum)
# sorted by descending
# sorted by highest->lowest
coverage_list = sorted(coverage_list, reverse=True)
print(coverage_list)
......@@ -157,76 +238,31 @@ def get_coverage_list(total_sum, min_coverage=30) -> list:
return coverage_list
def matrix_generation(column_number, matrix_name, dir_path):
def matrix_generation(column_number: int, dir_path: str) -> None:
"""
generate a matrix of simulated reads
column corresponds to a read
line to a minimizer
cell x y = 1 if the read y contains the minimizer x, else 0
"""
coverage_list = get_coverage_list(column_number) # list of coverage for each reference
ref_path = dir_path +"references.fasta"
reads_path = dir_path +"reads.fastq"
solutions_path = dir_path +"soluce.txt"
matrix_path = dir_path + matrix_name+".csv"
matrix_path = dir_path + "matrix.csv"
generate_references_sequences(len(coverage_list), ref_path)
generate_random_reads(ref_path, coverage_list, reads_path)
minimisers_dict = get_minimisers(reads_path)
generate_matrix(reads_path, minimisers_dict, solutions_path, matrix_path)
def matrix_to_COO(input_matrix_csv, output_COO):
matrix_lines = []
with open(input_matrix_csv, 'r') as input_matrix:
input_matrix.readline() # skip columns names
line = input_matrix.readline()
while line != "":
content = line.split(",")
matrix_lines.append(content[1:])
line = input_matrix.readline()
with open(output_COO, 'w') as output_matrix:
output_matrix.write(str(len(matrix_lines))+" "+str(len(matrix_lines[0]))+"\n")
for i, line in enumerate(matrix_lines):
for j, value in enumerate(line):
if value == "1":
output_matrix.write(str(i)+" "+str(j)+"\n")
def matrix_to_CSVBM(input_matrix_csv, output_CSVBM):
"""convert a csv matrix to the csvbm format
row_br column_nbr ones_nbr\n
distance_to_first_one\n
...
distance_to_next_one_from_last_one\n
...
"""
matrix_rows = []
number_of_ones = 0
with open(input_matrix_csv, 'r') as input_matrix:
input_matrix.readline() # skip columns names
line = input_matrix.readline()
while line != "":
content = line.replace("\n","").split(",")[1:] # first one is row name
matrix_rows.append(content)
number_of_ones += sum(int(k) for k in content)
line = input_matrix.readline()
minimizer_dict = get_minimizers(reads_path)
number_rows = len(matrix_rows)
number_columns = len(matrix_rows[0])
with open(output_CSVBM, 'w', encoding='utf-8') as csvbm_file:
csvbm_file.write(
f'{number_rows} {number_columns} {number_of_ones}\n',
)
distance = 0
for row in matrix_rows:
for column in row:
if column == "1":
csvbm_file.write(f'{distance}\n')
distance = 1
else:
distance += 1
matrix_test = Minimizer_matrix(reads_path, minimizer_dict) # init a matrix class object
# save the matrix and solution
matrix_test.print_soluce(solutions_path)
matrix_test.print_matrix_to_csv(matrix_path)
matrix_test.print_matrix_to_csvbm(matrix_path+"bm")
def display_results(input_matrix_csv, results_dir, result_output):
......@@ -272,12 +308,11 @@ if __name__ == "__main__":
print("generate matrix...")
column_number = 10000
matrix_name = "matrix_10k_bis"
column_number = 1000
dir_path = "matrix_tests/matrix_100_test/"
matrix_generation(column_number, dir_path)
dir_path = "matrix_tests/"+matrix_name+"/"
matrix_generation(column_number, matrix_name, dir_path)
matrix_to_CSVBM(dir_path+ matrix_name+".csv", dir_path + matrix_name+".csvbm")
#display_results(dir_path+"/matrix_100/matrix.csv", dir_path+"/matrix_100_results", dir_path+"/matrix_100/matrix_100_ordered_results.csv")
print("\tcompleted !")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment