...
 
Commits (2)
......@@ -2,4 +2,4 @@
__version__ = "0.1"
from paf2gfa.parser import Parser
from paf2gfa.parser import Parser, ParsingWarning
import os
import sys
import time
import logging
import argparse
from paf2gfa import Parser
def main(args=None):
......@@ -23,18 +20,15 @@ def main(args=None):
help="Remove all internal match")
arg = vars(parser.parse_args(args))
paf = arg["paf"].name
gfa = arg["gfa"].name
rm_all_contain = arg["remove_all_containment"]
rm_all_internal = arg["remove_all_internal"]
parser = Parser(not rm_all_contain, not rm_all_internal)
parser = Parser(not arg["remove_all_containment"],
not arg["remove_all_internal"])
for line in arg["paf"]:
result = parser.parse_line(line)
if result is not None:
logging.warning(str(res))
logging.warning(str(result))
for line in parser.generate_gfa():
arg["gfa"].write(line+"\n")
......
......@@ -3,14 +3,14 @@
import re
import networkx as nx
from enum import Enum
from types import SimpleNamespace
from collections import defaultdict
import networkx as nx
class ParsingWarning(Enum):
NOTHING = ""
LINK_REPLACE_LENGTH = "Replace previous overlap between same read with less overlap length"
LINK_REPLACE_MATCH = "Replace previous overlap between same read with less kmer match"
......@@ -35,7 +35,7 @@ class Parser:
self.__graph = nx.DiGraph()
self.__containments = defaultdict(list)
self.__overhang_maplen_ratio_limit = 0.8
@property
......@@ -72,9 +72,9 @@ class Parser:
Return None, or a ParsingWarning object
"""
l = SimpleNamespace(
**{self.__cores[i]: v for i, v in enumerate(re.split("\s+", line)[:12])})
**{self.__cores[i]: v for i, v in enumerate(re.split(r"\s+", line)[:12])})
l = Parser.__line_type_correction(l)
......@@ -94,65 +94,55 @@ class Parser:
return self._add_internal(l.read_a, "+", l.read_b, "+",
l.nb_base, l.nb_match,
overhang/maplen)
else:
return self._add_internal(l.read_a, "+", l.read_b, "-",
l.nb_base, l.nb_match,
overhang/maplen)
return self._add_internal(l.read_a, "+", l.read_b, "-", l.nb_base,
l.nb_match, overhang/maplen)
elif l.strand == "+" and l.beg_a <= l.beg_b and l.len_a - l.end_a < l.len_b - l.end_b:
# B contains A
return self._add_containment(l.read_b, "+", l.read_a, "+", l.beg_b,
l.nb_base, l.len_b, l.len_a)
l.nb_base, l.len_b, l.len_a)
elif l.strand == "-" and l.beg_a <= l.len_b - l.end_b and l.len_a - l.end_a < l.beg_b:
# B contains A
return self._add_containment(l.read_b, "+", l.read_a, "-", l.beg_b,
l.nb_base, l.len_b, l.len_a)
l.nb_base, l.len_b, l.len_a)
elif l.strand == "+" and l.beg_a >= l.beg_b and l.len_a - l.end_a > l.len_b - l.end_b:
# A contains B
return self._add_containment(l.read_a, "+", l.read_b, "+", l.beg_a,
l.nb_base, l.len_a, l.len_b)
l.nb_base, l.len_a, l.len_b)
elif l.strand == "-" and l.beg_a >= l.len_b - l.end_b and l.len_a - l.end_a > l.beg_b:
# A contains B
return self._add_containment(l.read_a, "+", l.read_b, "-", l.beg_a,
l.nb_base, l.len_a, l.len_b)
l.nb_base, l.len_a, l.len_b)
elif l.strand == "+":
if l.beg_a > l.beg_b:
# A overlap B
return self._add_link(l.read_a, "+", l.read_b, "+", l.nb_base,
l.nb_match, overhang/maplen)
else:
# B overlap A
return self._add_link(l.read_b, "+", l.read_a, "+", l.nb_base,
l.nb_match, overhang/maplen)
# B overlap A
return self._add_link(l.read_b, "+", l.read_a, "+", l.nb_base,
l.nb_match, overhang/maplen)
else:
if l.beg_a > l.len_a - l.end_a:
if l.beg_a > l.len_b - l.end_b:
return self._add_link(l.read_a, "+", l.read_b, "-",
l.nb_base, l.nb_match,
overhang/maplen)
else:
return self._add_link(l.read_b, "+", l.read_a, "-",
l.nb_base, l.nb_match,
overhang/maplen)
return self._add_link(l.read_b, "+", l.read_a, "-", l.nb_base,
l.nb_match, overhang/maplen)
else:
if l.len_a - l.beg_a > l.end_b:
return self._add_link(l.read_a, "-", l.read_b, "+",
l.nb_base, l.nb_match,
overhang/maplen)
else:
return self._add_link(l.read_b, "-", l.read_a, "+",
l.nb_base, l.nb_match,
overhang/maplen)
return self._add_link(l.read_b, "-", l.read_a, "+", l.nb_base,
l.nb_match, overhang/maplen)
def parse_lines(self, lines):
    """Feed every non-blank entry of ``lines`` to ``parse_line``.

    Entries that are empty or contain only whitespace are skipped.
    Whatever ``parse_line`` returns is discarded.
    """
    non_blank = (entry for entry in lines if entry.strip() != "")
    for entry in non_blank:
        self.parse_line(entry)
def parse_file(self, file):
    """Parse every record yielded by iterating ``file``.

    ``file`` is any iterable of text lines (for example an open file
    handle). Empty and whitespace-only lines are skipped, the same way
    ``parse_lines`` skips them, so a trailing blank line is not handed to
    ``parse_line``. Whatever ``parse_line`` returns is discarded.
    """
    for line in file:
        # A blank line carries no PAF fields; skip it rather than parse it.
        if line.strip() != "":
            self.parse_line(line)
def generate_gfa(self):
yield "H\tVN:Z:1.0"
remove_node = set()
......@@ -160,7 +150,7 @@ class Parser:
if not self.__containment:
for contained in self.__containments:
remove_node.add(contained)
self.__containments = defaultdict(list)
for c in remove_node:
self.__graph.remove_node(c)
......@@ -192,7 +182,7 @@ class Parser:
for conted, list_conter in self.__containments.items():
for (conter, straner, straned, pos, ov, *_) in list_conter:
yield "\t".join(["C", conter, straner, conted, straned,
str(pos), str(ov)]) + "M"
str(pos), str(ov)]) + "M"
def get_gfa(self):
    """Return everything ``generate_gfa`` yields as one string.

    The yielded lines are joined with a newline; no trailing newline is
    appended.
    """
    gfa_lines = self.generate_gfa()
    return "\n".join(gfa_lines)
......@@ -222,7 +212,7 @@ class Parser:
strand_b=strand_b, ov_len=ov_len,
nb_match=nb_match,
overhang_maplen=overhang_maplen)
return ParsingWarning.LINK_REPLACE_LENGTH
return ParsingWarning.LINK_REPLACE_LENGTH
elif edge["nb_match"] < nb_match:
self.__graph.remove_edge(name_a, name_b)
......@@ -230,7 +220,7 @@ class Parser:
strand_b=strand_b, ov_len=ov_len,
nb_match=nb_match,
overhang_maplen=overhang_maplen)
return ParsingWarning.LINK_REPLACE_MATCH
return ParsingWarning.LINK_REPLACE_MATCH
elif edge["overhang_maplen"] > overhang_maplen:
self.__graph.remove_edge(name_a, name_b)
......@@ -238,11 +228,11 @@ class Parser:
strand_b=strand_b, ov_len=ov_len,
nb_match=nb_match,
overhang_maplen=overhang_maplen)
return ParsingWarning.LINK_REPLACE_OVMAPLEN
return ParsingWarning.LINK_REPLACE_OVMAPLEN
else:
self.__graph.add_edge(name_a, name_b, strand_a=strand_a, strand_b=strand_b,
ov_len=ov_len, nb_match=nb_match,
overhang_maplen=overhang_maplen)
ov_len=ov_len, nb_match=nb_match,
overhang_maplen=overhang_maplen)
return None
def _add_internal(self, name_a, strand_a, name_b, strand_b, ov_len,
......@@ -280,16 +270,15 @@ class Parser:
Return None if everything is OK, or a ParsingWarning describing the issue
"""
self.__containments[contained].append((container, strand_ner,
strand_ned, pos, length,
len_ner, len_ned))
if container in self.__containments and \
any([contained == c[0] for c in self.__containments[container]]):
return ParsingWarning.CONTAINMENT_PREVIOUS
else:
return None
return None
@staticmethod
def __line_type_correction(l):
......@@ -300,13 +289,12 @@ class Parser:
l.beg_b = int(l.beg_b)
l.end_b = int(l.end_b)
l.len_b = int(l.len_b)
return l
@staticmethod
def __compute_overhang(l):
if l.strand == "+":
return min(l.beg_a, l.beg_b) + min(l.len_a - l.end_a, l.len_b - l.end_b)
else:
return min(l.beg_a, l.len_b - l.end_b) + min(l.beg_b, l.len_a - l.end_a)
return min(l.beg_a, l.len_b - l.end_b) + min(l.beg_b, l.len_a - l.end_a)