Commit e2771a83 authored by Pierre Marijon

Clean up helped by pylint

parent bdb9b9ee
import os
import sys
import time
import logging
import argparse
from paf2gfa import Parser
def main(args=None):
......@@ -23,18 +20,15 @@ def main(args=None):
help="Remove all internal match")
arg = vars(parser.parse_args(args))
paf = arg["paf"].name
gfa = arg["gfa"].name
rm_all_contain = arg["remove_all_containment"]
rm_all_internal = arg["remove_all_internal"]
parser = Parser(not rm_all_contain, not rm_all_internal)
parser = Parser(not arg["remove_all_containment"],
not arg["remove_all_internal"])
for line in arg["paf"]:
result = parser.parse_line(line)
if result is not None:
logging.warning(str(res))
logging.warning(str(result))
for line in parser.generate_gfa():
arg["gfa"].write(line+"\n")
......
......@@ -3,14 +3,14 @@
import re
import networkx as nx
from enum import Enum
from types import SimpleNamespace
from collections import defaultdict
import networkx as nx
class ParsingWarning(Enum):
NOTHING = ""
LINK_REPLACE_LENGTH = "Replace previous overlap between same read with less overlap length"
LINK_REPLACE_MATCH = "Replace previous overlap between same read with less kmer match"
......@@ -35,7 +35,7 @@ class Parser:
self.__graph = nx.DiGraph()
self.__containments = defaultdict(list)
self.__overhang_maplen_ratio_limit = 0.8
@property
......@@ -72,9 +72,9 @@ class Parser:
Return None, or a ParsingWarning object
"""
l = SimpleNamespace(
**{self.__cores[i]: v for i, v in enumerate(re.split("\s+", line)[:12])})
**{self.__cores[i]: v for i, v in enumerate(re.split(r"\s+", line)[:12])})
l = Parser.__line_type_correction(l)
......@@ -94,55 +94,49 @@ class Parser:
return self._add_internal(l.read_a, "+", l.read_b, "+",
l.nb_base, l.nb_match,
overhang/maplen)
else:
return self._add_internal(l.read_a, "+", l.read_b, "-",
l.nb_base, l.nb_match,
overhang/maplen)
return self._add_internal(l.read_a, "+", l.read_b, "-", l.nb_base,
l.nb_match, overhang/maplen)
elif l.strand == "+" and l.beg_a <= l.beg_b and l.len_a - l.end_a < l.len_b - l.end_b:
# B containe A
return self._add_containment(l.read_b, "+", l.read_a, "+", l.beg_b,
l.nb_base, l.len_b, l.len_a)
l.nb_base, l.len_b, l.len_a)
elif l.strand == "-" and l.beg_a <= l.len_b - l.end_b and l.len_a - l.end_a < l.beg_b:
# B containe A
return self._add_containment(l.read_b, "+", l.read_a, "-", l.beg_b,
l.nb_base, l.len_b, l.len_a)
l.nb_base, l.len_b, l.len_a)
elif l.strand == "+" and l.beg_a >= l.beg_b and l.len_a - l.end_a > l.len_b - l.end_b:
# A containe B
return self._add_containment(l.read_a, "+", l.read_b, "+", l.beg_a,
l.nb_base, l.len_a, l.len_b)
l.nb_base, l.len_a, l.len_b)
elif l.strand == "-" and l.beg_a >= l.len_b - l.end_b and l.len_a - l.end_a > l.beg_b:
# A containe B
return self._add_containment(l.read_a, "+", l.read_b, "-", l.beg_a,
l.nb_base, l.len_a, l.len_b)
l.nb_base, l.len_a, l.len_b)
elif l.strand == "+":
if l.beg_a > l.beg_b:
# A overlap B
return self._add_link(l.read_a, "+", l.read_b, "+", l.nb_base,
l.nb_match, overhang/maplen)
else:
# B overlap A
return self._add_link(l.read_b, "+", l.read_a, "+", l.nb_base,
l.nb_match, overhang/maplen)
# B overlap A
return self._add_link(l.read_b, "+", l.read_a, "+", l.nb_base,
l.nb_match, overhang/maplen)
else:
if l.beg_a > l.len_a - l.end_a:
if l.beg_a > l.len_b - l.end_b:
return self._add_link(l.read_a, "+", l.read_b, "-",
l.nb_base, l.nb_match,
overhang/maplen)
else:
return self._add_link(l.read_b, "+", l.read_a, "-",
l.nb_base, l.nb_match,
overhang/maplen)
return self._add_link(l.read_b, "+", l.read_a, "-", l.nb_base,
l.nb_match, overhang/maplen)
else:
if l.len_a - l.beg_a > l.end_b:
return self._add_link(l.read_a, "-", l.read_b, "+",
l.nb_base, l.nb_match,
overhang/maplen)
else:
return self._add_link(l.read_b, "-", l.read_a, "+",
l.nb_base, l.nb_match,
overhang/maplen)
return self._add_link(l.read_b, "-", l.read_a, "+", l.nb_base,
l.nb_match, overhang/maplen)
def parse_lines(self, lines):
for line in lines:
......@@ -156,7 +150,7 @@ class Parser:
if not self.__containment:
for contained in self.__containments:
remove_node.add(contained)
self.__containments = defaultdict(list)
for c in remove_node:
self.__graph.remove_node(c)
......@@ -188,7 +182,7 @@ class Parser:
for conted, list_conter in self.__containments.items():
for (conter, straner, straned, pos, ov, *_) in list_conter:
yield "\t".join(["C", conter, straner, conted, straned,
str(pos), str(ov)]) + "M"
str(pos), str(ov)]) + "M"
def get_gfa(self):
return "\n".join(list(self.generate_gfa()))
......@@ -218,7 +212,7 @@ class Parser:
strand_b=strand_b, ov_len=ov_len,
nb_match=nb_match,
overhang_maplen=overhang_maplen)
return ParsingWarning.LINK_REPLACE_LENGTH
return ParsingWarning.LINK_REPLACE_LENGTH
elif edge["nb_match"] < nb_match:
self.__graph.remove_edge(name_a, name_b)
......@@ -226,7 +220,7 @@ class Parser:
strand_b=strand_b, ov_len=ov_len,
nb_match=nb_match,
overhang_maplen=overhang_maplen)
return ParsingWarning.LINK_REPLACE_MATCH
return ParsingWarning.LINK_REPLACE_MATCH
elif edge["overhang_maplen"] > overhang_maplen:
self.__graph.remove_edge(name_a, name_b)
......@@ -234,11 +228,11 @@ class Parser:
strand_b=strand_b, ov_len=ov_len,
nb_match=nb_match,
overhang_maplen=overhang_maplen)
return ParsingWarning.LINK_REPLACE_OVMAPLEN
return ParsingWarning.LINK_REPLACE_OVMAPLEN
else:
self.__graph.add_edge(name_a, name_b, strand_a=strand_a, strand_b=strand_b,
ov_len=ov_len, nb_match=nb_match,
overhang_maplen=overhang_maplen)
ov_len=ov_len, nb_match=nb_match,
overhang_maplen=overhang_maplen)
return None
def _add_internal(self, name_a, strand_a, name_b, strand_b, ov_len,
......@@ -276,16 +270,15 @@ class Parser:
Return None if all its ok or a string with warning message
"""
self.__containments[contained].append((container, strand_ner,
strand_ned, pos, length,
len_ner, len_ned))
if container in self.__containments and \
any([contained == c[0] for c in self.__containments[container]]):
return ParsingWarning.CONTAINMENT_PREVIOUS
else:
return None
return None
@staticmethod
def __line_type_correction(l):
......@@ -296,13 +289,12 @@ class Parser:
l.beg_b = int(l.beg_b)
l.end_b = int(l.end_b)
l.len_b = int(l.len_b)
return l
@staticmethod
def __compute_overhang(l):
if l.strand == "+":
return min(l.beg_a, l.beg_b) + min(l.len_a - l.end_a, l.len_b - l.end_b)
else:
return min(l.beg_a, l.len_b - l.end_b) + min(l.beg_b, l.len_a - l.end_a)
return min(l.beg_a, l.len_b - l.end_b) + min(l.beg_b, l.len_a - l.end_a)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment