Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 382e55bc authored by BOULLE Olivier's avatar BOULLE Olivier
Browse files

refactoring, added print for tests assembly analysis

parent 5bc5b5e3
No related branches found
No related tags found
No related merge requests found
......@@ -18,115 +18,135 @@ overhang_size = 4
bsaI_size = 7
def remove_buffer(eBlocks_dict: dict) -> dict:
def remove_buffer(eBlocks_list: list) -> list:
"""
buffer at the beginning and end of each blocks
"""
result_dict = {}
for key, sequence in eBlocks_dict.items():
result_dict[key] = sequence[buffer_size:-buffer_size]
result_list = []
for sequence in eBlocks_list:
result_list.append(sequence[buffer_size:-buffer_size])
return result_dict
return result_list
def digest_bsaI(eBlocks_dict: dict) -> dict:
def digest_bsaI(eBlocks_list: list) -> list:
"""
bsaI next to any overhangs, beginning and end of each blocks
"""
result_dict = {}
bsaI_block_list = list(eBlocks_dict.values())
result_list = []
for i in range(len(bsaI_block_list)):
bsaI_block_list[i] = bsaI_block_list[i][bsaI_size:-bsaI_size]
for sequence in eBlocks_list:
result_list.append(sequence[bsaI_size:-bsaI_size])
for i, key in enumerate(eBlocks_dict.keys()):
result_dict[key] = bsaI_block_list[i]
return result_list
return result_dict
def assemble_blocks(assembly_name: str, eBlocks_dict: dict) -> dict:
def assemble_blocks(eBlocks_list: list) -> dict:
"""
just a single perfect assembly of blocks
"""
block_sequences = list(eBlocks_dict.values())
assembly_dict = {assembly_name : block_sequences[0]}
assembly_sequence = eBlocks_list[0]
for sequence in block_sequences[1:]:
if sequence[:overhang_size] == assembly_dict[assembly_name][-overhang_size:]:
assembly_dict[assembly_name] += sequence[overhang_size:]
for sequence in eBlocks_list[1:]:
if sequence[:overhang_size] == assembly_sequence[-overhang_size:]:
assembly_sequence += sequence[overhang_size:]
else:
print("error assembly simulation, overhangs not compatibles",assembly_dict[assembly_name][-overhang_size:],sequence[:overhang_size])
print("error assembly simulation, overhangs not compatibles",assembly_sequence[-overhang_size:],sequence[:overhang_size])
exit(1)
# remove overhangs before start primer and after stop primers in extremities blocks
assembly_dict[assembly_name] = assembly_dict[assembly_name][overhang_size:-overhang_size]
assembly_sequence = assembly_sequence[overhang_size:-overhang_size]
return assembly_dict
# return a dict with 1 sequence
return {"molecule_0" : assembly_sequence}
def assemble_blocks_simulation(eBlocks_dict: dict, nbr_mol: int) -> dict:
def assemble_blocks_simulation(eBlocks_list: list, nbr_mol: int) -> dict:
"""
use the statistics of overhangs assemblies to simulation the assembly of blocks
"""
overhang_assembly_stats_dict = dfr.read_fasta("file_test_assembly/overhangs_stats.txt")
blocks_list = list(eBlocks_dict.values())
overhang_assembly_stats_dict = dfr.read_fasta("file_test_assembly/overhangs_stats.txt")
assembly_dict = {}
i = 0
mol_index = 0
for assembly_try in range(nbr_mol): # an assembly try can result in multiple molecules
current_assembly = eBlocks_list[0] # start an assembly from the first block
current_assembly = blocks_list[0] # start an assembly from the first block
for block in blocks_list[1:]:
for block in eBlocks_list[1:]:
last_overhang = current_assembly[-overhang_size:]
overhang_link_proba = float(overhang_assembly_stats_dict[last_overhang])
if random.random() <= overhang_link_proba: # chance to succeed overhang linking
print("link success")
current_assembly += block[overhang_size:] # link the block to the current assembly
else:
print("link fail")
assembly_dict[i] = current_assembly # the current assembly is not linked to the block, so its is completed
i += 1
assembly_dict["molecule_"+str(mol_index)] = current_assembly # the current assembly is not linked to the block, so its is completed
current_assembly = block[overhang_size:] # begin a new assembly starting from this block
mol_index += 1
# add the assembly to the dict
assembly_dict[i] = current_assembly
i += 1
for value in assembly_dict.values():
print(value)
return eBlocks_dict
assembly_dict["molecule_"+str(mol_index)] = current_assembly
return assembly_dict
def eBlocks_assembly(input_path: str, assembly_name, output_path: str, nbr_mol: int) -> None:
def test_assembly(input_path, output_path: str, nbr_mol: int) -> None:
"""
print the assembly directly as a txt file of extracted blocks format
for result analysis
used to test simulation results
"""
eBlocks_dict = dfr.read_fasta(input_path)
blocks_list = list(eBlocks_dict.values())[::2] # get all odd elements of eBlocks dict (= forward fragments)
# take a dict of only eBlocks, not tests simu for bsaI and buffer removal
assembly_dict = assemble_blocks_simulation(blocks_list, nbr_mol)
output = open(output_path, "w")
for name, assembly in assembly_dict.items():
list_frag = []
assembly = assembly.replace("primer_____________", "") # remove the primer
blocks_indexes = [i for i in range(len(assembly)) if assembly.startswith('block_', i)] # get all index of blocks in the assembly # supposedly inefficient in time but elegant
for block_index in blocks_indexes:
list_frag += [(assembly[block_index:block_index+8]+"_Fw", [block_index, block_index+8], '5p', '3p', 10)]
framed_list_frag = [("First", [0, 0], "_", "_", 10)] + list_frag + [("Last", [len(assembly), len(assembly)], "_", "_", 10)]
output.write(name+"\n")
output.write(assembly+"\n")
output.write(str(framed_list_frag)+"\n")
output.write("\n")
output.close()
def eBlocks_assembly(input_path: str, output_path: str, nbr_mol: int) -> None:
"""
simulate an ordered assembly of eBlocks surrounded by the primers
the resulting molecules are saved to a .fasta file to simulate the storing
"""
eBlocks_dict = dfr.read_fasta(input_path)
if len(eBlocks_dict) == 0:
print("error : not sequences found at :",input_path)
exit(1)
blocks_list = list(eBlocks_dict.values())
# simulate removal of buffers
eBlocks_dict = remove_buffer(eBlocks_dict)
blocks_list = remove_buffer(blocks_list)
# simulate bsaI digestion
eBlocks_dict = digest_bsaI(eBlocks_dict)
blocks_list = digest_bsaI(blocks_list)
# simulation assembly by overhangs, fatal error if overhangs are not compatibles for successive blocks
#assembly_dict = assemble_blocks(assembly_name, eBlocks_dict)
assembly_dict = assemble_blocks_simulation(eBlocks_dict, nbr_mol)
assembly_dict = assemble_blocks(blocks_list)
# save the assemblies into file
dfr.save_dict_to_fasta(assembly_dict, output_path)
......@@ -136,12 +156,12 @@ def container_assembly(input_dir_path, output_dir_path, nbr_mol):
for filename in os.listdir(input_dir_path):
file_path = os.path.join(input_dir_path, filename)
if os.path.isdir(file_path):
input_file = file_path+"/5_blocks_buffer.fasta"
output_file = os.path.join(output_dir_path, filename+".fasta")
eBlocks_assembly(input_file, filename, output_file, nbr_mol)
eBlocks_assembly(input_file, output_file, nbr_mol)
# =================== main =======================#
if __name__ == '__main__':
......@@ -160,6 +180,7 @@ if __name__ == '__main__':
print("fragment assembly simulation...")
container_assembly(arg.input_dir_path, arg.output_dir_path, arg.nbr_mol)
#test_assembly(arg.input_dir_path, arg.output_dir_path, arg.nbr_mol)
print("\tcompleted !")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment