Mentions légales du service

Skip to content
Snippets Groups Projects
sequence_control.py 5.76 KiB
import sys
import math
import dna_file_reader as dfr
import hashing
import source_encoding
import source_decoding

"""
methods to control if a sequence verify some dna constraints
"""


def check_homopolymere(sequence, max_h):
    """
    count the number of homopolymeres larger than h_max in the sequence
    """
    h_nbr = 0 #number of homopolymere larger than h_max found
    row_size = 0 #size of the current row of consecutive nucleotides
    last_nucleotide = "" #previous nucleotide in the sequence
    for nucleotide in sequence:
        if nucleotide == last_nucleotide:
            row_size += 1
        else:
            if row_size > max_h:
                h_nbr += 1
            row_size = 1
        last_nucleotide = nucleotide
    if row_size > max_h:
        h_nbr += 1
        
    return h_nbr


def check_GC(sequence, window_size):
    """
    returns the minimum and maximum GC percentage for all the windows of the sequence
    """
    
    def check_GC_Window(window):
        A_content = window.count('A')
        C_content = window.count('C')
        T_content = window.count('T')
        G_content = window.count('G')
        
        GC_percent = round(100*(G_content + C_content) / (A_content + C_content + T_content + G_content), 1)
        return GC_percent
    
    if len(sequence) <= window_size:
        GC_percent = check_GC_Window(sequence)
        return GC_percent, GC_percent
        
    max_GC_percent = 0
    min_GC_percent = 100
    
    for index in range(len(sequence)-window_size):
        window = sequence[index:index+window_size]
        GC_percent = check_GC_Window(window)
        max_GC_percent = max(GC_percent, max_GC_percent)
        min_GC_percent = min(GC_percent, min_GC_percent)
        
    return min_GC_percent, max_GC_percent


def check_loop(sequence, loop_size, window_size):
    """
    count the number of potential loops (reverse complement of a sub sequence in a local window) in the sequence
    """
    loop_nbr = 0 #number of loop
    if len(sequence) < 2*loop_size: #sequence too short for any loop
        return 0
    
    for index in range(len(sequence)-loop_size):
        sub_sequence = sequence[index:index+loop_size]
        rev_compl_sub_sequence = dfr.reverse_complement(sub_sequence)
        if rev_compl_sub_sequence in sequence[index+loop_size:index+loop_size+window_size]:
            loop_nbr += 1
        
    return loop_nbr


def sequence_check(sequence, window_size=60, verbose=False):
    """
    test if a the conditions for a correct sequence are met, return True if all 3 constraints are valid
    """
    h_nbr = check_homopolymere(sequence, 3)
    if verbose: print("number of homopolymere larger than",3,":",h_nbr)
    min_GC_percent, max_GC_percent = check_GC(sequence, window_size)
    if verbose: print("GC percentage :",min_GC_percent,"% to",max_GC_percent,"%")
    loop_nbr = check_loop(sequence, 6, window_size)
    if verbose: print("number of potential loop :",loop_nbr)
    
    if h_nbr == 0 and min_GC_percent >= 45 and max_GC_percent <= 55 and loop_nbr == 0:
        if verbose: print("sequence is correct")
        return True
    else:
        if verbose: print("sequence is not correct")
        return False
    
    
def hash_until_correct(sequence, start_key=0):
    """
    use a hash on the sequence until it meets the conditions, return the hash key and the hashed sequence
    """
    hash_key = start_key
    hash_filter = hashing.hash_string_to_formated_number(str(hash_key), len(sequence), 4)
    hashed_sequence = source_encoding.apply_filter(sequence, hash_filter)
    while not sequence_check(hashed_sequence):
        hash_key += 1
        hash_filter = hashing.hash_string_to_formated_number(str(hash_key), len(sequence), 4)
        hashed_sequence = source_encoding.apply_filter(sequence, hash_filter)
    
    return hash_key, hashed_sequence
    
    
def find_hash_keys(sequence):
    """
    find the keys to hash the sequence to pass the conditions
    the sequence is divided in sub sequences and each one is hashed until it passes the conditions
    
    """
    tot_hash = ""
    hash_keys = []
    sub_seq_size = 180 #higher -> less keys, but more time consuming 
    sub_seq_nbr = int(math.ceil(len(sequence)/sub_seq_size))
    for i in range(sub_seq_nbr):
        #display progress bar
        k=int(20*len(hash_keys)/sub_seq_nbr)
        sys.stdout.write('\r')
        sys.stdout.write("[%-20s] %d%%" % ('='*k, 5*k))
        sys.stdout.flush()
        
        sub_seq = sequence[sub_seq_size*i:sub_seq_size*(i+1)]
        
        hash_key, hashed_sub_seq = hash_until_correct(sub_seq)
        
        #if the total hashed sequence do not pass the conditions, the sub_seq is re hashed to find an other key
        while not sequence_check(tot_hash+hashed_sub_seq):
            hash_key, hashed_sub_seq = hash_until_correct(sub_seq, hash_key+1)
        
        
        hash_keys.append(hash_key)
        tot_hash += hashed_sub_seq
    
    print("\n"+tot_hash)
    print(hash_keys)
    sequence_check(tot_hash, 60, True) #will be valid


def decode_sequence(sequence):
    hash_keys = [252, 8427, 26249, 7580, 5552, 6460]
    sub_seq_size = 180
    decoded_seq = ""
    for i in range(6):
        sub_seq = sequence[sub_seq_size*i:sub_seq_size*(i+1)]
        hash_filter = hashing.hash_string_to_formated_number(str(hash_keys[i]), sub_seq_size, 4)
        decoded_sub_seq = source_decoding.remove_filter(sub_seq, hash_filter)
        decoded_seq += decoded_sub_seq
    print(decoded_seq)
   

# =================== main ======================= #
if __name__ == '__main__':
     
    if len(sys.argv) != 2:
        print("usage : sequence_control.py sequence_path")
        sys.exit(1)

    sequence_path = sys.argv[1]

    _, sequence = dfr.read_single_sequence_fasta(sequence_path)
    
    #find_hash_keys(sequence)
    decode_sequence(sequence)