Mentions légales du service

Skip to content
Snippets Groups Projects
read_matrix.py 5.41 KiB
#!/usr/bin/python3

import os
import sys
import inspect
import time
import random


import reads_consensus_class as rcc

currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
sys.path.insert(0, os.path.dirname(currentdir)+"/synthesis_modules")
import dna_file_reader as dfr
import synthesis_simulation as ss



def generate_references_sequences(seq_number, references_path) -> dict:
    """
    create random sequences with a checksum to use as references for the tests
    """

    h_max = 3 # maxi homopolymere size

    seq_size = 60

    generated_seq_dict = {}

    for seq_num in range(seq_number):
        sequence = ""
        while len(sequence) < seq_size :
            alphabet = ["A", "G", "C", "T"]
            letter = random.choice(alphabet)
            if sequence[-h_max:] == letter*h_max:  # if the end of the sequence is a homopolymer of this letter
                alphabet.remove(letter)
                letter = random.choice(alphabet)  # then pick another one
            sequence += letter
        generated_seq_dict["ref_"+str(seq_num)] = sequence

    dfr.save_dict_to_fasta(generated_seq_dict, references_path)


def generate_random_reads(references_file_path, coverage_list, reads_path):
    """
    generate a read file containing the simulated reads from {assembly_number} sequences, {reads_per_file} times each
    """

    #i_error, d_error, s_error = 0.005, 0.01, 0.005 # ~2% error rate
    i_error, d_error, s_error = 0.01, 0.025, 0.01 # ~4% error rate

    ref_dict = dfr.read_fasta(references_file_path)

    output_read_dict = {}

    for i, (seq_name, sequence) in enumerate(ref_dict.items()):

        for j in range(coverage_list[i]):
            simulated_read_seq = ss.add_errors(sequence, i_error, d_error, s_error)
            output_read_dict[seq_name+"_"+str(j)] = simulated_read_seq

    dfr.save_dict_to_fastq(output_read_dict, reads_path)


def get_minimisers(reads_path):

    kmer_size = 10
    minimiser_size = 6


    minimisers_dict = {}

    reads_dict = dfr.read_fastq(reads_path)

    for read_seq in reads_dict.values():
        sequence_size = len(read_seq)
        read_seq_rv = dfr.reverse_complement(read_seq)
        for i in range(sequence_size-kmer_size+1):
            sub_fw = read_seq[i:i+kmer_size]
            sub_rv = read_seq_rv[sequence_size-kmer_size-i:sequence_size-i]
            #print(sub_fw)
            #print(sub_rv)
            minimiser = "Z"*minimiser_size
            for j in range(kmer_size-minimiser_size+1):
                sub_minimiser = sub_fw[j:j+minimiser_size]
                if sub_minimiser < minimiser:
                    minimiser = sub_minimiser
                sub_minimiser = sub_rv[j:j+minimiser_size]
                if sub_minimiser < minimiser:
                    minimiser = sub_minimiser
                #print(minimiser)
            minimisers_dict[minimiser] = minimisers_dict.get(minimiser, 0) +1

    print(str(len(minimisers_dict))+" minimisers")
    return minimisers_dict


def generate_matrix(reads_path, minimisers_dict, solutions_path, matrix_path):

    reads_dict = dfr.read_fastq(reads_path)
    reads_name_list = list(reads_dict.keys())

    random.shuffle(reads_name_list)

    soluce_dict = {}
    for i, read_name in enumerate(reads_name_list):
        read_cluster = int(read_name.split("_")[1])
        soluce_dict[read_cluster] = soluce_dict.get(read_cluster, []) + ["r"+str(i)]

    soluce_dict = dict(sorted(soluce_dict.items()))
    print(soluce_dict)

    with open(solutions_path, 'w') as output_file:
        for cluster_name in soluce_dict.keys():
            output_file.write(str(cluster_name)+":"+",".join(soluce_dict[cluster_name])+"\n")

    hidden_reads_name = ["r"+str(k) for k in range(len(reads_dict))]

    ones_counter = 0

    with open(matrix_path, 'w') as output_file:
        output_file.write(","+",".join(hidden_reads_name)+"\n")
        for i, minimiser in enumerate(minimisers_dict.keys()):
            output_file.write("m"+str(i)+",")
            minimiser_line = []
            for read_name in reads_name_list:
                read_seq = reads_dict[read_name]
                read_seq_rv = dfr.reverse_complement(read_seq)
                if minimiser in read_seq or minimiser in read_seq_rv:
                    minimiser_line.append("1")
                    ones_counter += 1
                else:
                    minimiser_line.append("0")
            output_file.write(",".join(minimiser_line)+"\n")
    cases_counter = len(reads_name_list)*len(minimisers_dict)
    print(str(ones_counter)+ "/"+ str(cases_counter) +" = "+ str(round(100*ones_counter/cases_counter, 1))+"%")


def matrix_generation():

    coverage_list = [13,11,11,10,9,9,8,8,8,7,5,1]
    ref_path = "matrix_tests/references.fasta"
    reads_path = "matrix_tests/reads.fastq"
    solutions_path = "matrix_tests/soluce.txt"
    matrix_path = "matrix_tests/matrix.csv"

    generate_references_sequences(12, ref_path)
    generate_random_reads(ref_path, coverage_list, reads_path)

    minimisers_dict = get_minimisers(reads_path)

    generate_matrix(reads_path, minimisers_dict, solutions_path, matrix_path)



if __name__ == "__main__":


    print("generate matrix...")
    #for i in [1000*k for k in range(1,21)]:
    #    min_occ_testing(i)
    #for i in range(1):
    #    consensus_testing(3000)

    matrix_generation()
    #consensus_testing(50000)
    #s = verify_check_sum("test_consensus/consensus_test.fasta")
    #print(s)       
    print("\tcompleted !")