Mentions légales du service

Skip to content
Snippets Groups Projects
source_decoding.py 3.73 KiB
import sys
import os.path
import hashing
import image_conversion as img_conv
import file_conversion as file_conv
import dna_file_reader as dfr
import dna_numbering as dnbr

"""
decode the resulting dna fragments of the workflow to recreate the original document
"""


DNA_NUMBER_SIZE = 11 # constant in the workflow

def get_original_sequence(frag_path, frag_length, n_frag):
    """
    remove the filter to the fragments defined by a hash of the fragment number
    :param frag_path: path to a list of the fragments
    :param frag_length: length of a fragment
    :param n_frag: number of fragments in the original document
    :return: a list of the original fragments
    """
    reconstitued_fragment_dict = {}
    
    fragment_list = list(dfr.read_fasta(frag_path).values())
    
    effective_fragment_size = frag_length-DNA_NUMBER_SIZE

    for fragment in fragment_list:
        
        while len(fragment) < frag_length: #extend fragment if too short
            fragment += "A"
        if len(fragment) > frag_length:
            fragment = fragment[:frag_length] #cut fragment if too long
        
        dna_index = fragment[:DNA_NUMBER_SIZE]
        num_frag = dnbr.dna_number_to_int(dna_index)
        if num_frag == -1:
            #print("unrecognized number :",dna_index)
            continue
        

        filter = hashing.hash_string_to_formated_number(str(num_frag), effective_fragment_size, 4)
        original_fragment = remove_filter(fragment[DNA_NUMBER_SIZE:], filter)
        
        if num_frag in reconstitued_fragment_dict:
            print("fragment of number",num_frag,"already read")
        else:
            reconstitued_fragment_dict[num_frag] = original_fragment        
    original_sequence = ""

    for i in range(n_frag):
        if i in reconstitued_fragment_dict:
            original_sequence += reconstitued_fragment_dict[i]
        else:
            #missing fragment, fill with _ instead
            original_sequence += "_"*(fragment_length-DNA_NUMBER_SIZE)
                
    return original_sequence


def remove_filter(fragment, filter):
    """
    remove the applyed filter to a fragment
    """
    base_4_to_dna_dict = {0: "A", 1: "C", 2: "G", 3: "T"}
    dna_to_base_4_dict = {"A": 0, "C": 1, "G": 2, "T": 3}
    
    unfiltered_fragment = ""
    for i in range(len(fragment)):
        fragment_number = dna_to_base_4_dict[fragment[i]]     
        filter_number = int(filter[i])
        sub = (fragment_number-filter_number) % 4
         
        unfiltered_fragment += base_4_to_dna_dict[sub]
    return unfiltered_fragment


def decode_document(sequence, output_path, doc_type, metadata):
    """
    decode the sequence and save the decoded document
    """
    if doc_type == "png":
        img_conv.decode_png(sequence, output_path, metadata)
    else:
        file_conv.decode_file(sequence, output_path)


# =================== main ======================= #
if __name__ == '__main__':
     
    if len(sys.argv) != 8:
        print("usage : source_decoding.py fragments_path reconstructed_source_path output_path doc_type fragment_length n_frag metadata")
        sys.exit(1)

    print("source decoding...")
    frag_path = sys.argv[1]
    reconstructed_source_path = sys.argv[2]
    output_path = sys.argv[3]
    doc_type = sys.argv[4]
    fragment_length = int(sys.argv[5])
    n_frag = int(sys.argv[6])
    metadata = sys.argv[7]
    
    original_sequence = get_original_sequence(frag_path, fragment_length, n_frag)
  
    #save the original sequence to the container
    output = open(reconstructed_source_path, "w")
    output.write(">reconstructed_source\n")
    output.write(original_sequence+"\n")
    output.close()
        
    decode_document(original_sequence, output_path, doc_type, metadata)
    
    print("\tcompleted !")