__main__.py 10.4 KB
Newer Older
MARIJON Pierre's avatar
MARIJON Pierre committed
1
import os
MARIJON Pierre's avatar
MARIJON Pierre committed
2
import sys
3
import json
MARIJON Pierre's avatar
MARIJON Pierre committed
4 5 6
import logging
import argparse
import requests
7
import itertools
MARIJON Pierre's avatar
MARIJON Pierre committed
8 9
import subprocess
import statistics
MARIJON Pierre's avatar
MARIJON Pierre committed
10
import multiprocessing
MARIJON Pierre's avatar
MARIJON Pierre committed
11 12 13 14

from pathlib import Path
from bs4 import BeautifulSoup

MARIJON Pierre's avatar
MARIJON Pierre committed
15
canu_call_template = "canu -p canu -pacbio-raw {} -d {} genomeSize={} correctedErrorRate={} corOutCoverage={} corMinCoverage={} maxThreads=15"
16
canu_call_template_unitig = canu_call_template + " stopAfter=unitig"
MARIJON Pierre's avatar
MARIJON Pierre committed
17

18 19
minimap_call_template = "minimap -x ava10k -t 10 {} {}"
miniasm_call_template = "miniasm -f {} {}"
MARIJON Pierre's avatar
MARIJON Pierre committed
20

21 22 23
base_dir = os.path.dirname(__file__) + os.sep
bact_dict = json.load(open(base_dir+'data'+ os.sep + 'NCTC.json'))

MARIJON Pierre's avatar
MARIJON Pierre committed
24 25 26 27 28 29 30 31 32 33 34 35
def nb_base_in_file(path: Path):
    nb_base = 0

    path = os.path.abspath(str(path))
    with open(path, "r") as sequences:
        line_count = 0
        for line in sequences:
            line_count += 1
            if line_count % 4 == 2: # take second line each 4 line
                nb_base += len(line)

    return nb_base
MARIJON Pierre's avatar
MARIJON Pierre committed
36 37 38 39 40 41

def main(args = None):
    
    if args is None:
        args = sys.argv[1:]

MARIJON Pierre's avatar
MARIJON Pierre committed
42 43
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
44
    formatter = logging.Formatter('%(asctime)s :: %(message)s')
MARIJON Pierre's avatar
MARIJON Pierre committed
45 46 47 48 49 50 51
    steam_handler = logging.StreamHandler()
    steam_handler.setFormatter(formatter)
    logger.addHandler(steam_handler)

    parser = argparse.ArgumentParser(prog="assembly_report",
                                     formatter_class=argparse.
                                     ArgumentDefaultsHelpFormatter)
52 53
    subparser = parser.add_subparsers()

MARIJON Pierre's avatar
MARIJON Pierre committed
54 55 56 57 58 59 60 61 62
    parser.add_argument("-i", "--input",
                        help="File contain on SRA accession sample per line",
                        type=argparse.FileType("r"), required=True)
    parser.add_argument("-d", "--data",
                        help="directory where sequence are download",
                        type=Path, required=True)
    parser.add_argument("-a", "--assembly",
                        help="directory where assembly are save",
                        type=Path, required=True)
63 64 65 66 67 68
    
    subparser_select = subparser.add_parser('select', help="Partial assembly first run")
    subparser_select.set_defaults(func=select)

    subparser_finish = subparser.add_parser('finish', help="Finish canu assembly and run mini{map|asm}")
    subparser_finish.set_defaults(func=finish)
MARIJON Pierre's avatar
MARIJON Pierre committed
69 70

    args = vars(parser.parse_args(args))
71 72 73 74
    if "func" not in args:
        print("You forget task specify select or finish")
        parser.print_help()
        return 1
MARIJON Pierre's avatar
MARIJON Pierre committed
75

76 77
    args["func"](args)

78 79
    return 0

80 81
def finish(args):
    for accession in args["input"]:
82 83
        print(accession)
        accession, ERS_accession = accession.strip().split()
84 85
        
        logging.info("begin of get information for "+accession)
86
        run_id, assembly_len = get_info_about_ERS(ERS_accession)
87
        logging.info("end of get information for "+accession)
88 89 90 91 92
       
        if not assembly_len:
            logging.error("We didn't find any assembly len, we skip this !")
            continue

93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
        data_path = args["data"] / accession
        data_path.mkdir(parents=True, exist_ok=True)
        
        logging.info("begin of compute canu parametre for "+accession)
        fastq_files, *canu_parameters = compute_canu_parameter(data_path, assembly_len)
        logging.info("end of compute canu parametre for "+accession)

        if not fastq_files:
            logging.error("We didn't find any file in data directory, we skip this !")
            continue

        assembly_path = args["assembly"] / accession / "canu"
        assembly_path.mkdir(parents=True, exist_ok=True)

        logging.info("begin finish canu assembly")
MARIJON Pierre's avatar
MARIJON Pierre committed
108
        canu_call = canu_call_template.format(fastq_files,
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
                                              os.path.abspath(assembly_path),
                                              assembly_len, *canu_parameters)
        logging.info(canu_call)
        logname = os.path.abspath(str(assembly_path / "canu_log_finish"))
        with open(logname+".out", "w") as outfile:
            pass
            out = subprocess.call(canu_call.split(" "), stdout=outfile,
                                  stderr=subprocess.STDOUT,
                                  universal_newlines=True)
        logging.info("end finish canu assembly")

        minimap_path = args["assembly"] / accession / "minimap"
        minimap_path.mkdir(parents=True, exist_ok=True)

        logging.info("begin minimap")
124
        minimap_call = minimap_call_template.format(fastq_files, fastq_files)
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
        minimap_out = str(minimap_path / "minimap.paf") 
        minimap_log = str(minimap_path / "minimap.log")
       
        logging.info(minimap_call)
        
        with open(minimap_log, "w") as errfile:
            with open(minimap_out, "w") as outfile:
                pass
                out = subprocess.call(minimap_call.split(" "),
                                      stdout=outfile,
                                      stderr=errfile,
                                      universal_newlines=True)
        logging.info("end minimap")

        miniasm_path = args["assembly"] / accession / "miniasm"
        miniasm_path.mkdir(parents=True, exist_ok=True)
        
        logging.info("begin miniasm")
MARIJON Pierre's avatar
MARIJON Pierre committed
143
        miniasm_call = miniasm_call_template.format(fastq_files, minimap_out)
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
        miniasm_out = str(miniasm_path / "miniasm.gfa") 
        miniasm_log = str(miniasm_path / "miniasm.log")
       
        logging.info(miniasm_call)
        
        with open(miniasm_log, "w") as errfile:
            with open(miniasm_out, "w") as outfile:
                pass
                out = subprocess.call(miniasm_call.split(" "),
                                      stdout=outfile,
                                      stderr=errfile,
                                      universal_newlines=True)
        logging.info("end miniasm")

def select(args):
MARIJON Pierre's avatar
MARIJON Pierre committed
159
    for accession in args["input"]:
160
        accession, ERS_accession = accession.strip().split()
MARIJON Pierre's avatar
MARIJON Pierre committed
161 162 163

        data_path = args["data"] / accession
        
164
        logging.info("begin of download data for "+accession)
MARIJON Pierre's avatar
MARIJON Pierre committed
165
        run_id, assembly_len = get_info_about_ERS(ERS_accession)
166 167
        download_extract_merge(data_path, accession)
        logging.info("end of download data for "+accession)
168 169 170 171
        
        if not assembly_len:
            logging.error("We didn't find any assembly len, we skip this !")
            continue
172

MARIJON Pierre's avatar
MARIJON Pierre committed
173 174
        # compute coverage
        logging.info("begin of compute canu parametre for "+accession)
175
        fastq_files, *canu_parameters = compute_canu_parameter(data_path, assembly_len)
MARIJON Pierre's avatar
MARIJON Pierre committed
176 177 178
        logging.info("end of compute canu parametre for "+accession)

        logging.info("begin of canu assembly "+accession)
179 180 181 182

        assembly_path = args["assembly"] / accession / "canu"
        assembly_path.mkdir(parents=True, exist_ok=True)
        
MARIJON Pierre's avatar
MARIJON Pierre committed
183
        canu_call = canu_call_template_unitig.format(fastq_files,
184 185
                                                     os.path.abspath(assembly_path),
                                                     assembly_len, *canu_parameters)
186
        logging.info(canu_call)
187
        logname = os.path.abspath(str(assembly_path / "canu_log_unitig"))
188 189 190 191
        with open(logname+".out", "w") as outfile:
            out = subprocess.call(canu_call.split(" "), stdout=outfile,
                                  stderr=subprocess.STDOUT,
                                  universal_newlines=True)
192

MARIJON Pierre's avatar
MARIJON Pierre committed
193
        logging.info("end of canu assembly "+accession)
MARIJON Pierre's avatar
MARIJON Pierre committed
194

195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
base_request = "https://www.ebi.ac.uk/ena/data/view/"
info_request = base_request + "{}&display=xml"
assembly_request = base_request + "Taxon:{}&display=xml&portal=assembly"

def get_info_about_ERS(accession):
    # find taxon id and run id
    r = requests.get(info_request.format(accession))
    soup = BeautifulSoup(r.text, "xml")

    for db in soup.find_all():
        if db.get_text() == "ENA-RUN":
            run_id = db.parent.ID.get_text().strip().replace(",", " ")
    
    taxon_id = soup.find("TAXON_ID").get_text()
    r = requests.get(assembly_request.format(taxon_id))
    soup = BeautifulSoup(r.text, "xml")
    list_of_size = [int(attr.parent.VALUE.get_text())
                    for attr in soup.find_all("TAG")
                    if attr.get_text() == "total-length"]
    if len(list_of_size) == 0:
        return None, None
    
    assembly_len = statistics.mean(list_of_size)

    return run_id, assembly_len

def compute_canu_parameter(data_path, assembly_len):
    nb_base = 0
223 224
    fastq_file = data_path / "merged.fasta"
    nb_base = nb_base_in_file(str(fastq_file))
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244

    coverage = nb_base / assembly_len 

    # run assembly
    correctedErrorRate = 0.045
    if coverage >= 60:
        correctedErrorRate = 0.105
    if coverage <= 30:
        correctedErrorRate = 0.040

    corOutCoverage = 40 
    if coverage < 40 :
        corOutCoverage = int(coverage)
    if corOutCoverage < 1.0:
        corOutCoverage = 1.0

    corMinCoverage = 4
    if coverage < 30 :
        corMinCoverage = 0

245 246 247 248 249 250 251 252 253 254
    return fastq_file, correctedErrorRate, corOutCoverage, corMinCoverage 

ftp_url = "ftp://ftp.sra.ebi.ac.uk//vol1/"

def download_extract_merge(data_path, NCTC_id):
    
    data_path.mkdir(parents=True, exist_ok=True)
    
    logname = os.path.abspath(str(data_path / "wget_log"))
    bax_files = list()
MARIJON Pierre's avatar
MARIJON Pierre committed
255
    job_list = list()
256 257 258
    with open(logname+".out", "w") as outfile:
        for file_path in itertools.chain.from_iterable(bact_dict[NCTC_id]["file_paths"].values()):
            if file_path.endswith(".bax.h5"):
MARIJON Pierre's avatar
MARIJON Pierre committed
259
                bax_files.append(str(data_path / os.path.basename(file_path)))
260

MARIJON Pierre's avatar
MARIJON Pierre committed
261 262
            cmd = ["wget", "-N", ftp_url + file_path, "-P", str(data_path)]
            job_list.append(cmd)
263
    
MARIJON Pierre's avatar
MARIJON Pierre committed
264 265 266 267
    p = multiprocessing.Pool()
    p.map(run_paralelle, job_list)

    dextract_cmd = ["dextract", "-o"] + bax_files
268
    logname = os.path.abspath(str(data_path / "dextract_log"))
MARIJON Pierre's avatar
MARIJON Pierre committed
269 270
    with open(logname+".err", "w") as errfile, open(str(data_path / "merged.fasta"), "w") as outfile:
            subprocess.call(dextract_cmd, stdout=outfile, stderr=errfile,
271 272
                            universal_newlines=True)
        
MARIJON Pierre's avatar
MARIJON Pierre committed
273 274 275 276 277
def run_paralelle(cmd):
    logging.info("run " + " ".join(cmd))
    subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                    universal_newlines=True)
    logging.info("end " + " ".join(cmd))
278 279 280 281 282

def fastq_file(data_path):
    for child in data_path.iterdir():
        if child.suffix == ".fasta":
            yield str(child)
283

MARIJON Pierre's avatar
MARIJON Pierre committed
284
if __name__ == "__main__":
MARIJON Pierre's avatar
MARIJON Pierre committed
285

286
    sys.exit(main(sys.argv[1:]))