Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 3e048d9d authored by BOULLE Olivier's avatar BOULLE Olivier
Browse files

rename file

parent d1329e29
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python3
import os
import sys
import inspect
import time
import random
import reads_consensus_class as rcc
import extract_payload_from_assembly as epfa
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
sys.path.insert(0, os.path.dirname(currentdir)+"/synthesis_modules")
import dna_file_reader as dfr
sys.path.insert(0, os.path.dirname(currentdir)+"/source_encoding")
import binary_dna_conversion as bdc
import file_to_dna as ftd
def merge_random_sequencing_files(sequencing_dir, result_file, assembly_number, reads_per_file):
list_files = os.listdir(sequencing_dir)
selected_files = random.sample(list_files, assembly_number)
files_content_sum = ""
for file in selected_files:
with open(sequencing_dir+"/"+file) as input:
#with open("barcode06.fastq") as input: # TODO
content_list = input.read().split("---\n")[:-1] # get list of lines
files_content_sum += "---\n".join(random.sample(content_list, reads_per_file))+"---\n" # 4 lines per read in fastq
with open(result_file, 'w') as output:
output.write(files_content_sum)
def verify_consensus(input_path, result_dict):
sequences_dict = dfr.read_fasta(input_path) # get all the sequences from the fasta file
score = 0
bin_list = []
for seq_name, sequence in sequences_dict.items():
binary_string = decode_sequence(sequence)
# different consensus can be decoded to the same bin string (yes for real)
if binary_string not in bin_list:
bin_list.append(binary_string)
score += result_dict.get(binary_string, 0)
#print(score, binary_string)
return score
def decode_sequence(sequence):
# convert the dna sequence into a binary string
binary_from_dna_string = bdc.dna_to_binary_abaab(sequence)
if not binary_from_dna_string:
print("warning file conversion, decoding an empty sequence",seq_name,"in",input_path)
# test if the check_sum corresponds to the binary string
CHECK_SUM_SIZE = 20
binary_string = binary_from_dna_string[:-CHECK_SUM_SIZE]
binary_check_sum = binary_from_dna_string[-CHECK_SUM_SIZE:]
if ftd.compute_check_sum(binary_string) != binary_check_sum:
#print(ftd.compute_check_sum(binary_string),"!=",binary_check_sum)
return "invalid_sequence"
# apply the same filter used in the encoding to the binary string to remove it
binary_string = ftd.apply_binary_filter(binary_string)
binary_string = binary_string[::-1] # reverse the binary string to get the o
return binary_string
def consensus_testing(blocks_size):
ftd.set_block_size(blocks_size)
dir_path = "test_consensus/consensus_test_random_blocks/"
results_path = dir_path+"encoded_bits_"+str(blocks_size)+".txt"
sequencing_dir = dir_path+"sequencing_simu_"+str(blocks_size)
result_merged_file = dir_path+"merged_assembly_"+str(blocks_size)+".fastq"
arr = os.listdir(sequencing_dir) # get list of files
[start_primer, stop_primer, _] = arr[0].split("_")
result_dict = {}
with open(results_path) as input:
for line in input:
line = line.replace("\n", "")
result_dict[line] = 1
repetition_number = 1
assembly_number_list = [100]#[1,3,6,10,30,60,100,300,600]
reads_per_file_list = [400]#[10*k for k in range(1, 101)]
for assembly_number in assembly_number_list:
print("assembly number", assembly_number, "...")
perfect_score_streak = 0
for reads_per_file in reads_per_file_list:
total_score = 0
consensus_time = 0
for i in range(repetition_number): # repeat multiple times to reduce variance
#merge_random_sequencing_files(sequencing_dir, result_merged_file, assembly_number, reads_per_file) # create file with merged assemblies
consensus_file = dir_path+"temp_consensus_files_"+str(blocks_size)+"/consensus_"+str(i)+".fasta"
start_time = time.time()
rcc.kmer_consensus(result_merged_file, consensus_file, start_primer, stop_primer, 25, 10)
consensus_time += (1/repetition_number) * (time.time()-start_time)
epfa.extract_payload_container(dir_path+"temp_consensus_files_"+str(blocks_size), dir_path+"temp_payload_files_"+str(blocks_size))
for j in range(repetition_number):
payload_file = dir_path+"temp_payload_files_"+str(blocks_size)+"/consensus_"+str(j)+".fasta"
score_consensus = verify_consensus(payload_file, result_dict) # cannot be higher than assembly number
total_score += (1/repetition_number) * (score_consensus/assembly_number)
if score_consensus > assembly_number:
print("score error for consensus_"+str(j)+".fasta")
exit(0)
if total_score == 1: # if multiple perfect scores in a row, no need to increase further the number of reads
perfect_score_streak += 1
if perfect_score_streak == 5:
break
else:
perfect_score_streak = 0
with open(dir_path+"scores/score_result_"+str(blocks_size)+"_"+str(assembly_number)+".txt", 'a') as output:
output.write(str(reads_per_file)+" "+ str(round(100*total_score, 2)) + "% "+str(round(consensus_time, 3))+"s\n")
print("score : "+str(round(100*total_score, 2)) + "% "+str(round(consensus_time, 3))+"s")
if __name__ == "__main__":
print("testing consensus...")
consensus_testing(300)
#consensus_testing(700)
#s = verify_check_sum("test_consensus/consensus_test.fasta")
#print(s)
print("\tcompleted !")
#!/usr/bin/python3
import os
import sys
import inspect
import time
import random
import reads_consensus_class as rcc
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
sys.path.insert(0, os.path.dirname(currentdir)+"/synthesis_modules")
import dna_file_reader as dfr
import synthesis_simulation as ss
def merge_random_sequencing_files(sequencing_dir, result_file, assembly_number, reads_per_file):
list_files = os.listdir(sequencing_dir)
selected_files = random.sample(list_files, assembly_number)
files_content_sum = ""
for file in selected_files:
with open(sequencing_dir+"/"+file) as input:
#with open("barcode06.fastq") as input: # TODO
content_list = input.read().split("---\n")[:-1] # get list of lines
files_content_sum += "---\n".join(random.sample(content_list, reads_per_file))+"---\n" # 4 lines per read in fastq
with open(result_file, 'w') as output:
output.write(files_content_sum)
def verify_consensus(input_path, result_dict):
sequences_dict = dfr.read_fasta(input_path) # get all the sequences from the fasta file
score = 0
bin_list = []
for seq_name, sequence in sequences_dict.items():
binary_string = decode_sequence(sequence)
# different consensus can be decoded to the same bin string (yes for real)
if binary_string not in bin_list:
bin_list.append(binary_string)
score += result_dict.get(binary_string, 0)
#print(score, binary_string)
return score
def decode_sequence(sequence):
# convert the dna sequence into a binary string
binary_from_dna_string = bdc.dna_to_binary_abaab(sequence)
if not binary_from_dna_string:
print("warning file conversion, decoding an empty sequence",seq_name,"in",input_path)
# test if the check_sum corresponds to the binary string
CHECK_SUM_SIZE = 20
binary_string = binary_from_dna_string[:-CHECK_SUM_SIZE]
binary_check_sum = binary_from_dna_string[-CHECK_SUM_SIZE:]
if ftd.compute_check_sum(binary_string) != binary_check_sum:
#print(ftd.compute_check_sum(binary_string),"!=",binary_check_sum)
return "invalid_sequence"
# apply the same filter used in the encoding to the binary string to remove it
binary_string = ftd.apply_binary_filter(binary_string)
binary_string = binary_string[::-1] # reverse the binary string to get the o
return binary_string
def consensus_testing(blocks_size):
ftd.set_block_size(blocks_size)
dir_path = "test_consensus/consensus_test_random_blocks/"
results_path = dir_path+"encoded_bits_"+str(blocks_size)+".txt"
sequencing_dir = dir_path+"sequencing_simu_"+str(blocks_size)
result_merged_file = dir_path+"merged_assembly_"+str(blocks_size)+".fastq"
arr = os.listdir(sequencing_dir) # get list of files
[start_primer, stop_primer, _] = arr[0].split("_")
result_dict = {}
with open(results_path) as input:
for line in input:
line = line.replace("\n", "")
result_dict[line] = 1
repetition_number = 1
assembly_number_list = [100]#[1,3,6,10,30,60,100,300,600]
reads_per_file_list = [400]#[10*k for k in range(1, 101)]
for assembly_number in assembly_number_list:
print("assembly number", assembly_number, "...")
perfect_score_streak = 0
for reads_per_file in reads_per_file_list:
total_score = 0
consensus_time = 0
for i in range(repetition_number): # repeat multiple times to reduce variance
#merge_random_sequencing_files(sequencing_dir, result_merged_file, assembly_number, reads_per_file) # create file with merged assemblies
consensus_file = dir_path+"temp_consensus_files_"+str(blocks_size)+"/consensus_"+str(i)+".fasta"
start_time = time.time()
rcc.kmer_consensus(result_merged_file, consensus_file, start_primer, stop_primer, 25, 10)
consensus_time += (1/repetition_number) * (time.time()-start_time)
epfa.extract_payload_container(dir_path+"temp_consensus_files_"+str(blocks_size), dir_path+"temp_payload_files_"+str(blocks_size))
for j in range(repetition_number):
payload_file = dir_path+"temp_payload_files_"+str(blocks_size)+"/consensus_"+str(j)+".fasta"
score_consensus = verify_consensus(payload_file, result_dict) # cannot be higher than assembly number
total_score += (1/repetition_number) * (score_consensus/assembly_number)
if score_consensus > assembly_number:
print("score error for consensus_"+str(j)+".fasta")
exit(0)
if total_score == 1: # if multiple perfect scores in a row, no need to increase further the number of reads
perfect_score_streak += 1
if perfect_score_streak == 5:
break
else:
perfect_score_streak = 0
with open(dir_path+"scores/score_result_"+str(blocks_size)+"_"+str(assembly_number)+".txt", 'a') as output:
output.write(str(reads_per_file)+" "+ str(round(100*total_score, 2)) + "% "+str(round(consensus_time, 3))+"s\n")
print("score : "+str(round(100*total_score, 2)) + "% "+str(round(consensus_time, 3))+"s")
if __name__ == "__main__":
print("testing consensus...")
consensus_testing(3000)
#consensus_testing(700)
#s = verify_check_sum("test_consensus/consensus_test.fasta")
#print(s)
print("\tcompleted !")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment