-
BOULLE Olivier authoredBOULLE Olivier authored
source_decoding.py 3.73 KiB
import sys
import os.path
import hashing
import image_conversion as img_conv
import file_conversion as file_conv
import dna_file_reader as dfr
import dna_numbering as dnbr
"""
Decode the DNA fragments produced by the workflow to recreate the original document.
"""
# Number of leading bases of each fragment that encode the fragment number;
# the remaining bases carry the filtered payload. Constant in the workflow.
DNA_NUMBER_SIZE = 11
def get_original_sequence(frag_path, frag_length, n_frag):
    """
    remove the filter applied to each fragment (the filter is derived from a
    hash of the fragment number) and rebuild the original sequence

    every fragment is first normalised to exactly frag_length bases, its
    leading DNA_NUMBER_SIZE bases are decoded into the fragment number, and the
    rest of the fragment is unfiltered; fragments whose number is not
    recognised are skipped, and when a number is seen several times only the
    first occurrence is kept

    :param frag_path: path to a fasta file containing the fragments
    :param frag_length: length of a fragment (number part included)
    :param n_frag: number of fragments in the original document
    :return: the original sequence as a single string, made of the payloads of
             fragments 0 .. n_frag-1 in order; a missing fragment is replaced
             by a run of "_" of the payload length
    """
    effective_fragment_size = frag_length - DNA_NUMBER_SIZE
    reconstituted_fragments = {}

    for fragment in dfr.read_fasta(frag_path).values():
        # force the fragment to the expected length:
        # pad with "A" if too short, cut the tail if too long
        fragment = fragment.ljust(frag_length, "A")[:frag_length]

        num_frag = dnbr.dna_number_to_int(fragment[:DNA_NUMBER_SIZE])
        if num_frag == -1:
            # -1 is treated as an unrecognised fragment number: skip the fragment
            continue

        frag_filter = hashing.hash_string_to_formated_number(str(num_frag), effective_fragment_size, 4)
        original_fragment = remove_filter(fragment[DNA_NUMBER_SIZE:], frag_filter)

        if num_frag in reconstituted_fragments:
            print("fragment of number", num_frag, "already read")
        else:
            reconstituted_fragments[num_frag] = original_fragment

    # placeholder for a missing fragment; it is sized from the frag_length
    # parameter so the function does not depend on a global set by the script
    missing_fragment = "_" * effective_fragment_size
    return "".join(reconstituted_fragments.get(i, missing_fragment) for i in range(n_frag))
def remove_filter(fragment, filter):
    """
    undo the filter that was applied to a fragment

    each base is read as a base-4 digit (A=0, C=1, G=2, T=3); the filter digit
    found at the same position is subtracted modulo 4 and the result is turned
    back into a base

    :param fragment: filtered dna string, made of A, C, G and T only
    :param filter: indexable sequence of digits, at least as long as the fragment
    :return: the unfiltered dna string, same length as the fragment
    """
    bases = "ACGT"
    digit_of_base = {base: digit for digit, base in enumerate(bases)}
    return "".join(
        bases[(digit_of_base[base] - int(filter[position])) % 4]
        for position, base in enumerate(fragment)
    )
def decode_document(sequence, output_path, doc_type, metadata):
    """
    turn the dna sequence back into a document and save it at output_path

    the "png" doc type is handled by the image decoder, which also receives the
    metadata; every other doc type goes through the generic file decoder
    """
    if doc_type != "png":
        file_conv.decode_file(sequence, output_path)
        return
    img_conv.decode_png(sequence, output_path, metadata)
# =================== main ======================= #
if __name__ == '__main__':
    # script entry point: 7 positional arguments are expected after the script name
    if len(sys.argv) != 8:
        print("usage : source_decoding.py fragments_path reconstructed_source_path output_path doc_type fragment_length n_frag metadata")
        sys.exit(1)

    print("source decoding...")

    frag_path = sys.argv[1]
    reconstructed_source_path = sys.argv[2]
    output_path = sys.argv[3]
    doc_type = sys.argv[4]
    fragment_length = int(sys.argv[5])
    n_frag = int(sys.argv[6])
    metadata = sys.argv[7]

    original_sequence = get_original_sequence(frag_path, fragment_length, n_frag)

    # save the original sequence to the container as a single fasta record;
    # the context manager closes the file even if a write fails
    with open(reconstructed_source_path, "w") as output:
        output.write(">reconstructed_source\n")
        output.write(original_sequence+"\n")

    decode_document(original_sequence, output_path, doc_type, metadata)

    print("\tcompleted !")