...
 
Commits (57)
# -*- coding: utf-8 -*-
from typing import Tuple, List, Iterable
import argparse
import sys
import numpy
import torch
import torch
import torch.utils.data
import torch.nn
from .definitions import Alphabet, ALPHABETS, LabelType, LABEL_TYPES, TestCase, uses_gpu
from . import dataset
class SupervisedTrainingConfiguration(object):
    """Holds the validated configuration for supervised training.

    Built from the namespace returned by argparse; the companion
    setup_argument_parser() declares the matching command-line options.
    """
    @staticmethod
    def setup_argument_parser(parser):
        """Add argument definitions to an existing argparse.ArgumentParser."""
        # Definition of dataset input
        dataset_definition = parser.add_argument_group("Dataset definition")
        dataset_definition.add_argument("--alphabet", dest="alphabet",
            type=str, choices=ALPHABETS.keys(), default="dna",
            help="Alphabet of dataset")
        dataset_definition.add_argument("--label-type", dest="label_type",
            type=str, choices=LABEL_TYPES.keys(), default="bool",
            help="How to interpret the label value")
        # Computation
        performance = parser.add_argument_group("Computation performance")
        performance.add_argument("--platform", dest="platform",
            type=str, choices=["cpu", "cuda"], default="cpu",
            help="Computing platform, pytorch syntax")
        performance.add_argument("--batch-size", dest="batch_size",
            type=int, metavar="N", default=128,
            help="Batch size")
        # Misc TODO
        misc = parser.add_argument_group("Misc")
        misc.add_argument("--validation-dataset-ratio", dest="validation_dataset_ratio",
            type=float, default=0.25, metavar="R",
            help="Ratio of dataset to keep for validation")
    def __init__(self, parser_args):
        """Extract configuration from the result of ArgumentParser.parse_args"""
        # Name -> instance lookups; argparse 'choices' above guarantees the keys exist.
        self.alphabet: Alphabet = ALPHABETS[parser_args.alphabet]
        self.label_type: LabelType = LABEL_TYPES[parser_args.label_type]
        self.validation_dataset_ratio: float = parser_args.validation_dataset_ratio # [0,1] TODO validate
        self.device: torch.device = torch.device(parser_args.platform)
        self.batch_size: int = parser_args.batch_size # > 0 TODO validate
def do_supervised_training(config: SupervisedTrainingConfiguration): # FIXME
    """Load a labelled fasta dataset from stdin and split it for supervised training.

    The actual training loop is not implemented yet (see TODOs below).
    """
    # Parse labelled sequences from stdin as fasta, using the configured alphabet / label type.
    all_dataset = dataset.FastaDataset.parse_from(
        sys.stdin,
        alphabet = config.alphabet,
        label_type = config.label_type
    )
    # Split into train / validation; stratify only when the label type supports it.
    train_dataset, test_dataset = dataset.train_test_split(
        dataset = all_dataset,
        test_ratio = config.validation_dataset_ratio,
        stratified = config.label_type.stratified_split
    )
    # TODO option to use explicit train/test datasets. use only train for "all" ?
    # TODO dataloader, optim loop
    # optimizer = torch.optim.Adam() good default
def get_arguments():
    """Define the ckn command-line interface and return the parsed arguments."""
    def formatter(prog):
        # Show default values in help, and widen the layout to limit line-breaking.
        return argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=50, width=120)
    parser = argparse.ArgumentParser(prog="ckn", description="CKN implementation", formatter_class=formatter)
    sub_parsers = parser.add_subparsers(dest="subcommand")
    sub_parsers.required = True # Can be specified in add_subparsers since python 3.7
    train_parser = sub_parsers.add_parser(
        "train_supervised", help="Train the CKN in supervised mode", formatter_class=formatter)
    SupervisedTrainingConfiguration.setup_argument_parser(train_parser)
    sub_parsers.add_parser("pattern", help="Extract patterns from a trained CKN")
    sub_parsers.add_parser("predict", help="Use a trained CKN to predict labels")
    return parser.parse_args()
### Entry point ###
if __name__ == "__main__":
    args = get_arguments()
    print(args) # Echo raw arguments for reproducibility / debugging
    if args.subcommand == "train_supervised":
        config = SupervisedTrainingConfiguration(args)
        print(config.__dict__)
        do_supervised_training(config)
    else:
        # Subcommands are required by argparse, so this only triggers if a new
        # subcommand is declared without a handler branch here.
        assert False, "unknown subcommand: {}".format(args.subcommand)
\ No newline at end of file
import typing
from typing import Generic, List, Tuple
T = typing.TypeVar("T")
import Bio.SeqIO.FastaIO
import sklearn.model_selection
import torch
import torch.utils.data
from .definitions import Alphabet, LabelType, TestCase
class FastaDataset(torch.utils.data.Dataset, Generic[T]):
    """Dataset for a fasta file with labels.
    Contains for each sequence :
    - title: str
    - sequence content: 1D array of indexes as returned by Alphabet.bytes_to_index_array.
    - label: type T, determined by LabelType.type
    """
    @classmethod
    def parse_from(cls, file_handle: typing.TextIO, alphabet: Alphabet, label_type: LabelType):
        """Create dataset by parsing the content of file_handle as Fasta.
        On error, raise exceptions: invalid fasta, wrong alphabet / letters, bad label parsing, ...
        The label is extracted from the title, with format: "<title> <label> [...]".
        The label is converted from text to the type given by LabelType.
        TODO parsing details.
        TODO nice parsing errors ?
        """
        def entry(title: str, sequence_text: str) -> Tuple[str, torch.Tensor, T]:
            # Title fields are whitespace-separated: name, label, then optional free comments.
            title_parts = title.split()
            return (
                title_parts[0],
                alphabet.bytes_to_index_array(bytes(sequence_text, "utf-8")),
                label_type.type(float(title_parts[1])) # Parse as float (numeric, permissive) then convert to actual type
            )
        return cls([
            entry(title, sequence) for title, sequence in Bio.SeqIO.FastaIO.SimpleFastaParser(file_handle)
        ]) # cls is FastaDataset.
    def __init__(self, entries: List[Tuple[str, torch.Tensor, T]]):
        """Build dataset from list of entries."""
        super().__init__()
        # Entries are stored as-is; split datasets share these tuples (no copy).
        self.entries: List[Tuple[str, torch.Tensor, T]] = entries
    def __getitem__(self, index: int) -> Tuple[str, torch.Tensor, T]:
        """Return sequence data for index: (title, sequence as indexes, label)."""
        return self.entries[index]
    def __len__(self) -> int:
        """Return the number of sequences"""
        return len(self.entries)
    def sort_by_length(self):
        """Sort entries by sequence length (in place).
        Can be used to group sequences by similar lengths for better batching.
        """
        # entry[1] is the 1D index tensor, so size(0) is the sequence length.
        self.entries.sort(key = lambda entry: entry[1].size(0))
    # TODO way to print back sequences to have way to check parsing step
    # TODO add warnings for degenerate labels (all 0, all 1), may indicate parsing error.
def train_test_split(dataset: FastaDataset, test_ratio: float, stratified: bool) -> Tuple[FastaDataset, FastaDataset]:
    """Randomly split a dataset into (train_dataset, test_dataset) with no overlap.

    The returned datasets reference the same entry tuples as the input dataset
    (no deep copy), which keeps memory usage low.
    - test_ratio : float in [0, 1), fraction of the entries placed in test dataset.
    - stratified : if true, try to preserve the ratio of entries for each label value.
    """
    assert 0. <= test_ratio < 1.
    # Pick the scikit-learn splitter matching the requested strategy.
    splitter_class = (
        sklearn.model_selection.StratifiedShuffleSplit if stratified
        else sklearn.model_selection.ShuffleSplit
    )
    # Neither splitter needs the sequence tensors: only a sized X, and labels (y) for
    # stratification. Passing the label list as both X and y is therefore sufficient,
    # since all we consume are the permutation indexes.
    labels = [entry[2] for entry in dataset.entries]
    splitter = splitter_class(test_size = test_ratio)
    # split() yields pairs of index arrays; a single split is requested.
    train_indexes, test_indexes = next(splitter.split(X = labels, y = labels))
    train_part = FastaDataset([dataset[i] for i in train_indexes])
    test_part = FastaDataset([dataset[i] for i in test_indexes])
    return (train_part, test_part)
def collate_to_padded_sequences_and_labels(fasta_entries: List[Tuple[str, torch.Tensor, T]], padding_value: int) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """Collate function suitable for use in a Dataloader.

    Takes a batch of FastaDataset entries and returns tensors ready for batched
    CKN computations:
    - padded_sequence_indexes: int(batch_size, max_sequence_len)
    - sequence_lengths: int(batch_size), with all values in [0, max_sequence_len].
    - labels: float(batch_size). float because loss_fn(label, predicted: float) requires matching types.
    padding_value must be the Alphabet.ambiguous_index of the alphabet in use.
    Padding is always appended on the right of each sequence.
    """
    assert 0 <= padding_value < 0xff # Must fit in uint8
    # Transpose the batch: one tuple per field.
    _, sequences, labels = zip(*fasta_entries)
    label_tensor = torch.tensor(labels, dtype = torch.float) # labels as floats
    length_tensor = torch.tensor([sequence.size(0) for sequence in sequences]) # int(batch_size)
    # Allocate the padded matrix filled with the padding value, then overwrite each row prefix.
    longest = int(length_tensor.max().item())
    padded = torch.full((len(sequences), longest), padding_value, dtype = torch.uint8)
    for row, sequence in enumerate(sequences):
        padded[row, :sequence.size(0)] = sequence
    return (padded, length_tensor, label_tensor)
class TestDatasets(TestCase):
    """Unit tests for fasta parsing, dataset split/sort, and the collate function."""
    def test_fasta_parsing(self):
        import io
        from .definitions import ALPHABETS, LABEL_TYPES
        input = io.StringIO(
            ">first 1\n"
            "ACGT\n"
            ">second 0 more_comments\n"
            "AAAA\n"
        )
        dna = ALPHABETS["dna"]
        dataset = FastaDataset.parse_from(input, dna, LABEL_TYPES["bool"])
        self.assertEqual(len(dataset), 2)
        # Unittest does not like custom types in tuples, extract manually
        title_0, seq_0, label_0 = dataset[0]
        self.assertEqual((title_0, label_0), ("first", True))
        self.assertEqual(seq_0, dna.bytes_to_index_array(b"ACGT"))
        title_1, seq_1, label_1 = dataset[1]
        self.assertEqual((title_1, label_1), ("second", False))
        self.assertEqual(seq_1, dna.bytes_to_index_array(b"AAAA"))
    def test_dataset_manipulation(self):
        dataset = FastaDataset([
            ("a", torch.tensor([0, 1, 2, 3]), True),
            ("b", torch.tensor([0, 1, 2]), True),
            ("c", torch.tensor([0, 1]), False),
            ("d", torch.tensor([4]), False)
        ])
        # split: both halves have 2 entries for a 0.5 ratio
        non_stratified = train_test_split(dataset, 0.5, False)
        self.assertEqual(len(non_stratified[0]), len(non_stratified[1]))
        stratified = train_test_split(dataset, 0.5, True)
        self.assertEqual(len(stratified[0]), len(stratified[1]))
        # A stratified split must keep both label values on each side.
        self.assertEqual(
            set([True, False]),
            set([label for _, _, label in stratified[0]])
        )
        self.assertEqual(
            set([True, False]),
            set([label for _, _, label in stratified[1]])
        )
        # sort: shortest sequence first, longest last
        dataset.sort_by_length()
        self.assertEqual("d", dataset[0][0])
        self.assertEqual("a", dataset[3][0])
    def test_collate_fn_fasta(self):
        entry_batch = [
            ("first", torch.tensor([0, 1, 2, 3]), True),
            ("second", torch.tensor([4, 1]), False)
        ]
        packed, lengths, labels = collate_to_padded_sequences_and_labels(entry_batch, padding_value = 4)
        self.assertEqual(lengths, torch.tensor([4, 2]))
        self.assertEqual(labels, torch.tensor([1., 0.]))
        # The shorter sequence is right-padded with the padding value (4).
        self.assertEqual(
            packed,
            torch.tensor([
                [0, 1, 2, 3],
                [4, 1, 4, 4]
            ], dtype = torch.uint8)
        )
\ No newline at end of file
from typing import Dict, List, Optional, Type
import numpy
import torch
import torch.nn
import unittest
# This value is mostly used to force non-zero divisions during computation.
# Shared across modules so all numeric guards use the same tolerance.
EPSILON: float = 1e-4
def uses_gpu(device: torch.device) -> bool:
    """Return True when the given pytorch device is a CUDA (GPU) device."""
    return "cuda" == device.type
class Alphabet:
    """Mapping between a byte alphabet and compact integer indexes.

    Valid characters map to [0, len(valid_chars)); ambiguous characters (and
    padding) map to len(valid_chars). An optional complement table enables
    reverse-complement support (dna).
    """
    def __init__(self, name: str, valid_chars: bytes, ambiguous_chars: bytes, complement_chars: Optional[bytes] = None):
        """Create new alphabet.
        name: name of the alphabet.
        valid_chars: characters of the alphabet.
        ambiguous_chars: characters representing unknown / ignored values.
        complement_chars: complement characters to 'valid_chars' if applicable (reverse complement in ADN)
        """
        assert len(valid_chars) < 0xff # Maximum number of chars that can be supported
        self.name: str = name
        # NOTE: annotation fixed (was 'str'): valid_chars is passed and stored as bytes.
        self.valid_chars: bytes = valid_chars
        self.ambiguous_index: int = len(valid_chars) # Index used for ambiguous chars and padding
        # Prepare translation table for bytes_to_index_array (uses bytes.translate)
        # Translate character code c to:
        # - i in [0, len(valid_chars) - 1] for valid characters, with valid_chars[i] == c.
        # - len(valid_chars) if c in ambiguous.
        # - 0xff as marker for unrecognized characters, used to trigger errors.
        byte_to_index = bytearray([0xff] * 256)
        for (i, c) in enumerate(valid_chars):
            assert byte_to_index[c] == 0xff, "duplicated character"
            byte_to_index[c] = i
        for c in ambiguous_chars:
            assert byte_to_index[c] == 0xff, "duplicated character"
            byte_to_index[c] = self.ambiguous_index
        self._byte_to_index: bytes = bytes(byte_to_index)
        # Precompute permutation table, so that for c a valid character:
        # byte_to_index(complement of c) = permutation[byte_to_index(c)]
        # Does not check that this is valid permutation and involution. Not critical as only dna uses it.
        if complement_chars is not None:
            assert len(valid_chars) == len(complement_chars)
            permutation = [0xff] * len(valid_chars)
            for (i, c) in enumerate(valid_chars):
                index_of_c_complement = byte_to_index[complement_chars[i]]
                # 0xff here means complement_chars[i] is not a valid character.
                assert 0 <= index_of_c_complement < len(valid_chars), "invalid complement character"
                permutation[i] = index_of_c_complement
            self._complement_permutation: List[int] = permutation
        else:
            self._complement_permutation = None
    def __len__(self) -> int:
        """Returns the alphabet size (valid characters only, ambiguous excluded)."""
        return len(self.valid_chars)
    def __str__(self) -> str:
        return self.name
    def __repr__(self) -> str:
        return 'Alphabet({})'.format(self.name)
    def __reduce__(self):
        """Pickle serialization override.
        Serialize Alphabets by their name.
        Deserialize by using the global instance in ALPHABETS using the name only.
        """
        return (_alphabet_from_name, (self.name,))
    def bytes_to_index_array(self, sequence: bytes) -> torch.Tensor:
        """Convert a character sequence to a sequence of indexes.
        Translated indexes are:
        - i if the character is the i-th in the list of valid characters.
        - self.ambiguous_index == len(valid_chars) if the character is ambiguous.
        An exception is raised in case of unrecognized characters.
        """
        # bytes.translate performs the whole mapping in one C-level pass.
        index_bytes = sequence.translate(self._byte_to_index)
        if 0xff in index_bytes:
            raise ValueError("Unrecognized character code in sequence {}".format (index_bytes))
        return torch.from_numpy(numpy.frombuffer(index_bytes, dtype = numpy.uint8))
    def complement_index_permutation(self) -> Optional[List[int]]:
        """Return the permutation implementing reverse complement in index space if supported, or None.
        Returns a list permutation of size len(alphabet), with the given property for c a valid character:
        byte_to_index(complement of c) = permutation[byte_to_index(c)].
        """
        return self._complement_permutation
# Registry of the predefined alphabets, keyed by their name (str(alphabet)).
ALPHABETS: Dict[str, Alphabet] = {str(a): a for a in [
    Alphabet(name = "dna", valid_chars = b"ACGT", ambiguous_chars = b"N", complement_chars = b"TGCA"),
    Alphabet(name = "protein", valid_chars = b"ARNDCQEGHILKMFPSTWYV", ambiguous_chars = b"XBZJUO")
]}
def _alphabet_from_name(name: str) -> Alphabet:
    """Resolve a name to its global Alphabet instance (pickle deserialization helper)."""
    alphabet = ALPHABETS[name]
    return alphabet
class LabelType:
    """Bundle of configuration describing one kind of label.

    Attributes:
    - name: display name of this label type.
    - type: Python type used to store label values.
    - loss_class: pytorch loss module constructor suited to this label type.
    - stratified_split: whether a train/test split can be stratified on this label.
    """
    def __init__(self, name: str, type: Type, loss_class: Type[torch.nn.Module], stratified_split: bool):
        self.name: str = name
        self.stratified_split: bool = stratified_split
        self.loss_class: Type[torch.nn.Module] = loss_class
        self.type: Type = type
    def __str__(self) -> str:
        return self.name
    def __repr__(self) -> str:
        return 'LabelType({})'.format(str(self))
# Registry of the supported label types, keyed by their name (str(label_type)).
LABEL_TYPES: Dict[str, LabelType] = {str(l): l for l in [
    LabelType(
        name = "bool", type = bool,
        loss_class = torch.nn.BCEWithLogitsLoss,
        stratified_split = True
    ),
    LabelType(
        name = "float", type = float,
        loss_class = torch.nn.MSELoss,
        stratified_split = False
    )
]}
class TestCase(unittest.TestCase):
    """ Override unittest.TestCase to support more types of comparison (numpy, torch.Tensor) """
    def setUp(self):
        super().setUp()
        # Register per-type equality hooks used by assertEqual.
        self.addTypeEqualityFunc(numpy.ndarray, self.assert_numpy_equal)
        self.addTypeEqualityFunc(torch.Tensor, self.assert_torch_equal)
    def assert_numpy_equal(self, lhs, rhs, msg = None):
        """Element-wise equality for numpy arrays, with a readable failure message."""
        if not numpy.array_equal(lhs, rhs):
            if msg is None:
                msg = "\n{}\n\t!=\n{}".format(lhs, rhs)
            raise self.failureException(msg)
    def assert_torch_equal(self, lhs, rhs, msg = None):
        """Element-wise equality for torch tensors, with a readable failure message."""
        if not torch.all(torch.eq(lhs, rhs)):
            if msg is None:
                msg = "\n{}\n\t!=\n{}".format(lhs, rhs)
            raise self.failureException(msg)
class TestAlphabet(TestCase):
    """Unit tests for Alphabet conversion, registry, validation, and pickling."""
    def test_conversion(self):
        alphabet = Alphabet("name", b"ABCD", b"E", b"DCBA")
        self.assertEqual(len(alphabet), 4)
        # Valid chars map to their position; the ambiguous char maps to len(alphabet).
        self.assertEqual(
            alphabet.bytes_to_index_array(b"ABCDE"),
            torch.tensor([0, 1, 2, 3, 4], dtype = torch.uint8)
        )
        with self.assertRaises(ValueError):
            alphabet.bytes_to_index_array(b"F")
        self.assertEqual(alphabet.complement_index_permutation(), [3, 2, 1, 0])
    def test_predefined(self):
        self.assertEqual(len(ALPHABETS["dna"]), 4)
        self.assertEqual(len(ALPHABETS["protein"]), 20)
    def test_failures(self):
        with self.assertRaises(AssertionError):
            Alphabet("name", b"AA", b"F") # Repetition
        with self.assertRaises(AssertionError):
            Alphabet("name", b"A", b"A") # Repetition
        with self.assertRaises(AssertionError):
            Alphabet("name", b"A" * 1000, b"F") # Alphabet size
        with self.assertRaises(AssertionError):
            Alphabet("name", b"AB", b"", b"AC") # Invalid char in complement
    def test_serialization(self):
        """Check that serialization with pickle works correctly, by using global instances"""
        import pickle
        dna = ALPHABETS["dna"]
        serialized = pickle.dumps(dna)
        deserialized_dna = pickle.loads(serialized)
        # __reduce__ resolves by name, so the SAME global instance must come back.
        self.assertIs(dna, deserialized_dna)
\ No newline at end of file
This diff is collapsed.
from typing import Dict, Iterable, List, Optional, Tuple, Type
import enum
import numpy
import scipy.optimize
import torch
import torch.nn
import torch.optim
from .definitions import Alphabet, ALPHABETS, TestCase
from . import layers
class GlobalPoolingType(enum.Enum):
    """Strategy used to pool CKN features over sequence positions."""
    AVERAGE = 1
    MAX = 2
# All pooling module classes should be buildable with no arguments.
GLOBAL_POOLING_CLASS: Dict[GlobalPoolingType, Type[torch.nn.Module]] = {
    GlobalPoolingType.AVERAGE: layers.MaskedAverage1d,
    GlobalPoolingType.MAX: layers.MaskedMax1d,
}
class SupervisedParameters:
    """Hyper-parameters of the supervised CKN model.

    This class is designed to be serializable with pickle.
    """
    def __init__(self, alphabet: Alphabet):
        self.alphabet: Alphabet = alphabet
        # Number of motifs (CKN filters).
        self.nb_motifs: int = 128
        # Length of each motif (convolution window).
        self.motif_size: int = 12
        self.kernel_sigma: float = 0.3 #TODO default from alphabet ?
        self.global_pooling_type: GlobalPoolingType = GlobalPoolingType.AVERAGE
    def create_model(self):
        """Build a SupervisedNetwork configured from these parameters."""
        return SupervisedNetwork(self)
class SupervisedNetwork(torch.nn.Module):
    """Supervised CKN: one-hot encoding -> CKN layer -> global pooling -> linear classifier.

    When the alphabet defines a complement permutation (e.g. dna), the pooled
    motif weights are also computed on the reverse-complement encoding and both
    are given to the classifier.
    """
    def __init__(self, p: SupervisedParameters):
        super().__init__()
        # FIX: previously stored as 'self.parameters', which shadowed
        # torch.nn.Module.parameters() and broke optimizer construction
        # (e.g. torch.optim.Adam(model.parameters())).
        self.hyperparameters = p # Saved for later serialization ?
        self.one_hot_conversion = layers.OneHotConversion(len(p.alphabet))
        rc_permutation = p.alphabet.complement_index_permutation()
        # Reverse-complement path only exists when the alphabet supports it.
        self.one_hot_conversion_rc = layers.OneHotConversionWithPermutation(rc_permutation) if rc_permutation is not None else None
        self.ckn_layer = layers.CknLayer(len(p.alphabet), p.nb_motifs, p.motif_size, layers.ExpActivationFunction(p.kernel_sigma))
        self.global_pooling = GLOBAL_POOLING_CLASS[p.global_pooling_type]()
        self.classifier = layers.Classifier(in_features = p.nb_motifs, bias = True)
        #FIXME classifier / max mode in a better interface...
    def classifier_inputs(self, input_batch: torch.Tensor, input_batch_lengths: torch.Tensor) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Compute inputs of the classifier step : pooled weights for each motif.
        input_batch: int(batch_size, length)
        input_batch_lengths: int(batch_size)
        output: (normal = float(batch_size), rc = float(batch_size) or None).
        """
        def pooled_motif_weights(one_hot_input_batch: torch.Tensor) -> torch.Tensor:
            """Performs computation from a one-hot encoded batch to pooled motifs weights.
            one_hot_input_batch: float(batch_size, alphabet_size, length)
            output: float(batch_size)
            """
            feature_batch, feature_lengths = self.ckn_layer(one_hot_input_batch, input_batch_lengths)
            return self.global_pooling(feature_batch, feature_lengths)
        return (
            pooled_motif_weights(self.one_hot_conversion(input_batch)),
            pooled_motif_weights(self.one_hot_conversion_rc(input_batch)) if self.one_hot_conversion_rc is not None else None
        )
    def forward(self, input_batch: torch.Tensor, input_batch_lengths: torch.Tensor) -> torch.Tensor:
        """
        input_batch: int(batch_size, length)
        input_batch_lengths: int(batch_size)
        output: float(batch_size)
        """
        # With reverse complement, run computation for both and return max after classifier.
        classifier_inputs, classifier_inputs_rc = self.classifier_inputs(input_batch, input_batch_lengths)
        return self.classifier(classifier_inputs, classifier_inputs_rc)
# TODO move away
import torch.utils.data
def predicted_output(network: torch.nn.Module, dataset: torch.utils.data.DataLoader, device: torch.device) -> torch.Tensor:
    """Compute predicted outputs for dataset in data loader. Not implemented yet.

    network: (int(batch_size, length), int(batch_size)) -> float(batch_size)
    dataset:
    - sequences: int(nb_sequences, max_length)
    - sequences_len: int(nb_sequences)
    device: device used for computation
    Raises NotImplementedError (placeholder).
    """
    # FIX: 'raise NotImplemented' raises a TypeError (NotImplemented is a value,
    # not an exception class); NotImplementedError is the intended exception.
    raise NotImplementedError # Use forward which is a public api.
    # And similar for classifier input, find a correct api...
def optimize_classifier_lbfgs(
        classifier: layers.Classifier, loss_class: Type[torch.nn.Module],
        input_batch: torch.Tensor, input_batch_rc: Optional[torch.Tensor],
        target_batch: torch.Tensor):
    """Optimize the classifier parameters for the given (input, target) values, and loss function.
    Dirties the gradients for the classifier.
    input_batch: float(batch_size, in_features)
    input_batch_rc: float(batch_size, in_features) reverse-complement inputs, or None.
    target_batch: float(batch_size)
    loss_class: loss function module name, which supports reduction (ex: MSELoss, BCEWithLogitsLoss)
    """
    # Pack tensors in a 1d array for numpy.
    def numpy_1d_from(tensors: Iterable[torch.Tensor]) -> numpy.ndarray:
        # scipy L-BFGS-B works in float64; tensors are converted with astype.
        with torch.no_grad():
            return torch.cat([
                tensor.flatten() for tensor in tensors
            ]).numpy().astype(numpy.float64)
    # Unpack 1d array into tensors.
    def set_parameters_from_numpy_1d(module: torch.nn.Module, array: numpy.ndarray):
        packed_1d_tensor = torch.from_numpy(array)
        (nb_elements,) = packed_1d_tensor.size()
        offset: int = 0
        with torch.no_grad():
            for param in module.parameters():
                flattened_size: int = param.numel()
                # copy_ converts float64 back to each parameter's dtype.
                param.copy_(packed_1d_tensor[offset:offset + flattened_size].reshape_as(param))
                offset += flattened_size
        assert offset == nb_elements
    # Return (loss_val: float, grad: array) by evaluting the classifier + loss_fn.
    loss_fn = loss_class(reduction = "sum")
    def eval_classifier_and_grad(packed_parameters: numpy.ndarray) -> Tuple[float, numpy.ndarray]:
        set_parameters_from_numpy_1d(classifier, packed_parameters)
        classifier.zero_grad()
        loss = loss_fn(classifier(input_batch, input_batch_rc), target_batch)
        loss.backward()
        # NOTE(review): assumes every parameter got a gradient (.grad not None) — confirm.
        packed_gradients = numpy_1d_from(
            parameter.grad for parameter in classifier.parameters()
        )
        return loss.item(), packed_gradients
    # Optimize
    final_parameters, _, _ = scipy.optimize.fmin_l_bfgs_b(
        eval_classifier_and_grad,
        x0 = numpy_1d_from(classifier.parameters()),
        maxiter = 500,
        disp = 0 # No info text
    )
    # Write the optimum back into the classifier.
    set_parameters_from_numpy_1d(classifier, final_parameters)
class Tests(TestCase):
    """Smoke tests for the classifier optimization helpers."""
    def test_optimizers(self):
        """Dimensions only. Just use it"""
        # Only checks that the optimization runs end-to-end on matching shapes.
        classifier = layers.Classifier(in_features = 10, bias = True)
        optimize_classifier_lbfgs(
            classifier, torch.nn.MSELoss,
            input_batch = torch.rand(20, 10),
            input_batch_rc = torch.rand(20, 10),
            target_batch = torch.rand(20)
        )
\ No newline at end of file
......@@ -3,7 +3,8 @@ import pandas as pd
from scipy import stats
from sklearn.metrics import (
roc_auc_score, log_loss, accuracy_score,
precision_recall_curve, average_precision_score
precision_recall_curve, average_precision_score,
mean_squared_error
)
......@@ -78,16 +79,25 @@ def get_roc(y_true, y_pred, cutoff):
def compute_metrics(y_true, y_pred):
y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
class TryOrNan:
def __init__(self, f):
self.f = f
def __call__(self, *args):
try:
return self.f(*args)
except:
return float('nan')
metric = {}
metric['log.loss'] = log_loss(y_true, y_pred)
metric['accuracy'] = accuracy_score(y_true, y_pred > 0.5)
metric['auROC'] = roc_auc_score(y_true, y_pred)
metric['auROC50'] = get_roc(y_true, y_pred, 50)
metric['auPRC'] = average_precision_score(y_true, y_pred)
metric['recall_at_10_fdr'] = recall_at_fdr(y_true, y_pred, 0.10)
metric['recall_at_5_fdr'] = recall_at_fdr(y_true, y_pred, 0.05)
metric["pearson.r"], metric["pearson.p"] = stats.pearsonr(y_true, y_pred)
metric["spearman.r"], metric["spearman.p"] = stats.spearmanr(y_true, y_pred)
metric['log.loss'] = TryOrNan(log_loss)(y_true, y_pred)
metric['mse'] = TryOrNan(mean_squared_error)(y_true, y_pred)
metric['accuracy'] = TryOrNan(accuracy_score)(y_true, y_pred > 0.5)
metric['auROC'] = TryOrNan(roc_auc_score)(y_true, y_pred)
metric['auROC50'] = TryOrNan(get_roc)(y_true, y_pred, 50)
metric['auPRC'] = TryOrNan(average_precision_score)(y_true, y_pred)
metric['recall_at_10_fdr'] = TryOrNan(recall_at_fdr)(y_true, y_pred, 0.10)
metric['recall_at_5_fdr'] = TryOrNan(recall_at_fdr)(y_true, y_pred, 0.05)
metric["pearson.r"], metric["pearson.p"] = TryOrNan(stats.pearsonr)(y_true, y_pred)
metric["spearman.r"], metric["spearman.p"] = TryOrNan(stats.spearmanr)(y_true, y_pred)
df = pd.DataFrame.from_dict(metric, orient='index')
df.columns = ['value']
df.sort_index(inplace=True)
......
# Test and compare LBFGS optimizer implementations.
# Results:
# Pytorch is slower, and finds a worse minimum loss.
# -> keep scipy implementation.
from typing import Iterable, Optional, Tuple, Type
import torch
import torch.nn
import torch.optim
import scipy
import scipy.optimize
class Classifier(torch.nn.Module):
    """Linear classification head producing one scalar score per sample.

    With a reverse-complement input, the score is the element-wise maximum of
    the linear outputs computed on both inputs.
    """
    def __init__(self, in_features: int, bias: bool):
        super().__init__()
        self.in_features = in_features
        self.linear = torch.nn.Linear(in_features = in_features, out_features = 1, bias = bias)
    def forward(self, input: torch.Tensor, input_rc: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        input: float(batch_size, in_features)
        input_rc: float(batch_size, in_features), or None if not using reverse complement.
        output: float(batch_size)
        """
        scores = self.linear(input).squeeze(dim = 1)
        if input_rc is None:
            return scores
        scores_rc = self.linear(input_rc).squeeze(dim = 1)
        return torch.max(scores, scores_rc)
def optimize_pytorch(
        classifier: Classifier, loss_class: Type[torch.nn.Module],
        input_batch: torch.Tensor, input_batch_rc: Optional[torch.Tensor],
        target_batch: torch.Tensor):
    """Optimize the classifier parameters for the given (input, target) values, and loss function.
    Dirties the gradients for the classifier.
    input_batch: float(batch_size, in_features)
    input_batch_rc: float(batch_size, in_features) reverse-complement inputs, or None.
    target_batch: float(batch_size)
    loss_class: loss function module name, which supports reduction (ex: MSELoss, BCEWithLogitsLoss)
    """
    batch_size, in_features = input_batch.size()
    assert target_batch.size() == (batch_size,)
    assert in_features == classifier.in_features
    loss_fn = loss_class(reduction = "sum") # Original code always uses sum reduction.
    optimizer = torch.optim.LBFGS(classifier.parameters(), max_iter = 500)
    def compute_loss():
        # LBGFS requires a closure doing the computation
        optimizer.zero_grad()
        # FIX: forward the reverse-complement batch too; it was previously ignored,
        # so this optimized a different objective than optimize_scipy in rc mode.
        loss = loss_fn(classifier(input_batch, input_batch_rc), target_batch) # float(1), due to reduction
        loss.backward() # compute gradients for parameters
        return loss
    optimizer.step(compute_loss)
def optimize_scipy(
        classifier: Classifier, loss_class: Type[torch.nn.Module],
        input_batch: torch.Tensor, input_batch_rc: Optional[torch.Tensor],
        target_batch: torch.Tensor):
    """Version of optimize_classifier_lbfgs using scipy, for testing and reference."""
    import scipy.optimize
    import numpy
    # Pack tensors in a 1d array for numpy.
    def numpy_1d_from(tensors: Iterable[torch.Tensor]) -> numpy.ndarray:
        # scipy L-BFGS-B works in float64; tensors are converted with astype.
        with torch.no_grad():
            return torch.cat([
                tensor.flatten() for tensor in tensors
            ]).numpy().astype(numpy.float64)
    # Unpack 1d array into tensors.
    def set_parameters_from_numpy_1d(module: torch.nn.Module, array: numpy.ndarray):
        packed_1d_tensor = torch.from_numpy(array)
        (nb_elements,) = packed_1d_tensor.size()
        offset: int = 0
        with torch.no_grad():
            for param in module.parameters():
                flattened_size: int = param.numel()
                # copy_ converts float64 back to each parameter's dtype.
                param.copy_(packed_1d_tensor[offset:offset + flattened_size].reshape_as(param))
                offset += flattened_size
        assert offset == nb_elements
    # Return (loss_val: float, grad: array) by evaluting the classifier + loss_fn.
    loss_fn = loss_class(reduction = "sum")
    def eval_classifier_and_grad(packed_parameters: numpy.ndarray) -> Tuple[float, numpy.ndarray]:
        set_parameters_from_numpy_1d(classifier, packed_parameters)
        classifier.zero_grad()
        loss = loss_fn(classifier(input_batch, input_batch_rc), target_batch)
        loss.backward()
        packed_gradients = numpy_1d_from(
            parameter.grad for parameter in classifier.parameters()
        )
        return loss.item(), packed_gradients
    # Optimize
    final_parameters, _, _ = scipy.optimize.fmin_l_bfgs_b(
        eval_classifier_and_grad,
        x0 = numpy_1d_from(classifier.parameters()),
        maxiter = 500,
        disp = 0 # No info text
    )
    # Write the optimum back into the classifier.
    set_parameters_from_numpy_1d(classifier, final_parameters)
# Benchmark configuration for the comparison runs below.
in_features = 10
nb_sample = 100
loss_class = torch.nn.MSELoss
def compare_optimizers(input, input_rc, target):
    """Train two identically-initialized classifiers with each optimizer and print final losses/parameters."""
    classifier_pt = Classifier(in_features = in_features, bias = True)
    classifier_sp = Classifier(in_features = in_features, bias = True)
    classifier_sp.load_state_dict(classifier_pt.state_dict()) # Start with same param values
    # Optimize
    optimize_pytorch(classifier_pt, loss_class, input, input_rc, target)
    optimize_scipy(classifier_sp, loss_class, input, input_rc, target)
    # Compare
    with torch.no_grad():
        loss_fn = loss_class(reduction = "sum")
        print("Loss pytorch", loss_fn(classifier_pt(input, input_rc), target).item())
        print("Loss scipy", loss_fn(classifier_sp(input, input_rc), target).item())
        for param_pt, param_sp in zip(classifier_pt.parameters(), classifier_sp.parameters()):
            print("Param pytorch:", param_pt)
            print("Param scipy:", param_sp)
# Case 1: plain linear classifier (no reverse complement input).
print("##### Linear #####")
compare_optimizers(
    input = torch.rand(nb_sample, in_features),
    input_rc = None,
    target = torch.rand(nb_sample)
)
# Case 2: max of linear outputs over both inputs (reverse complement mode).
print("##### Linmax #####")
compare_optimizers(
    input = torch.rand(nb_sample, in_features),
    input_rc = torch.rand(nb_sample, in_features),
    target = torch.rand(nb_sample)
)
\ No newline at end of file
import os
import argparse
import itertools
import ckn.main
import torch.utils.data
import sklearn.model_selection
import sklearn.metrics
from ckn.models import supCKN
import torch
from torch import nn
from torch.optim.lr_scheduler import StepLR, MultiStepLR, ReduceLROnPlateau
import torch.optim as optim
import numpy
np = numpy
from timeit import default_timer as timer
name = 'ori'
def load_args():
    """Parse command-line arguments, seed the RNGs, and return the namespace."""
    parser = argparse.ArgumentParser(
        description="CKN-seq for Encode experiments")
    parser.add_argument("-f", type=str, dest="filename", required=True, help="Filename")
    parser.add_argument(
        '--seed', type=int, default=1, metavar='S',
        help='random seed (default: 1)')
    parser.add_argument(
        '--batch-size', type=int, default=128, metavar='M',
        help='input batch size for training (default: 128)')
    parser.add_argument(
        '--epochs', type=int, default=100, metavar='N',
        help='number of epochs to train (default: 100)')
    parser.add_argument(
        "--sigma", dest="kernel_params", default=[0.3],
        nargs='+', type=float, help="sigma for each layer (default: [0.3])")
    # NOTE(review): store_false means passing --rc DISABLES reverse complement
    # (args.rc defaults to True); the flag name reads as the opposite.
    parser.add_argument(
        "--rc", action='store_false',
        help="use reverse complement mode or not (default: true)")
    parser.add_argument(
        "--sampling-patches", dest="n_sampling_patches", default=250000,
        type=int, help="number of sampled patches (default: 250000)")
    parser.add_argument(
        "--kfold", dest="kfold", default=5, type=int,
        help="k-fold cross validation (default: 5)")
    parser.add_argument(
        "--ntrials", dest="ntrials", default=1, type=int,
        help="number of trials for training (default: 1)")
    parser.add_argument(
        "--penalty", metavar="penal", dest="penalty", default='l2',
        type=str, choices=['l2', 'l1'],
        help="regularization used in the last layer (default: l2)")
    parser.add_argument( "--preprocessor", type=str, default='standard_row', choices=['standard_row', 'standard_col'], help="preprocessor for unsup CKN (default: standard_row)")
    parser.add_argument( "--use-cuda", action='store_true', default=False, help="use gpu (default: False)")
    parser.add_argument( "--pooling", default='mean', choices=['mean', 'max'], type=str, help='mean or max global pooling (default: mean)')
    parser.add_argument( "--logo", type=str, dest="outdir", default=None, help="generate logos in dir")
    args = parser.parse_args()
    # Only use cuda when requested AND actually available.
    args.use_cuda = args.use_cuda and torch.cuda.is_available()
    # check shape
    args.n_layers = 1
    # NOTE(review): seeding is repeated in main(); redundant but harmless.
    torch.manual_seed(args.seed)
    if args.use_cuda:
        torch.cuda.manual_seed(args.seed)
    np.random.seed(args.seed)
    return args
def main():
    """Run the Encode experiment: load data, grid-search CKN hyper-parameters,
    keep the best (lowest MSE) model, and optionally generate sequence logos."""
    args = load_args()
    print(args)
    # Re-seed (already done in load_args); kept for reproducibility safety.
    torch.manual_seed(args.seed)
    if args.use_cuda:
        torch.cuda.manual_seed(args.seed)
    np.random.seed(args.seed)
    loader_args = {}
    if args.use_cuda:
        loader_args = {'num_workers': 1, 'pin_memory': True}
    args.fit_bias = True
    # Load data
    alphabet = ckn.main.ALPHABETS["dna"]
    # FIX: close the fasta file when done (was left open).
    with open(args.filename) as fasta_file:
        (sequences, label_array) = ckn.main.read_dataset_from_fasta(fasta_file, alphabet)
    sequence_array = ckn.main.to_uniform_length_sequence_array(sequences).astype(dtype=numpy.int64)
    def to_loader(sequences, labels, shuffle):
        # Wrap numpy arrays into a tensor-based DataLoader.
        dataset = torch.utils.data.TensorDataset(torch.from_numpy(sequences), torch.from_numpy(labels))
        return torch.utils.data.DataLoader(dataset, batch_size=args.batch_size, shuffle=shuffle, **loader_args)
    test_loader = to_loader(sequence_array, label_array, shuffle=False)
    split = sklearn.model_selection.train_test_split(sequence_array, label_array, test_size=0.25)
    train_loader = to_loader(split[0], split[2], shuffle=True)
    val_loader = to_loader(split[1], split[3], shuffle=False)
    def do_training(regularization, pattern_size, pattern_number):
        """Train one model for the given hyper-parameters; return (model, mse_score, y_pred)."""
        print("##### Training for ({}, {}, {})\n".format(regularization, pattern_size, pattern_number))
        model = supCKN(
            len(alphabet), [pattern_number], [pattern_size], [1],
            kernel_funcs=None,
            kernel_args_list=args.kernel_params, alpha=regularization,
            reverse_complement=args.rc, fit_bias=args.fit_bias,
            global_pool=args.pooling, penalty=args.penalty, scaler=args.preprocessor)
        #print(model)
        tic = timer()
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.ckn_model.parameters(), lr=0.1)
        patience = 4
        lr_scheduler = ReduceLROnPlateau(
            optimizer, factor=0.5, patience=patience, min_lr=1e-4)
        model.sup_train(
            train_loader, criterion, optimizer, lr_scheduler,
            init_train_loader=test_loader, epochs=args.epochs,
            val_loader=val_loader,
            n_sampling_patches=args.n_sampling_patches,
            use_cuda=args.use_cuda)
        toc = timer()
        print("Training time: {:.2f} min".format((toc - tic) / 60))
        print("Testing...")
        y_pred, y_true = model.predict(test_loader, proba=True, use_cuda=args.use_cuda)
        score = sklearn.metrics.mean_squared_error(y_true, y_pred)
        print(score)
        print("##### Trained for ({}, {}, {})".format(regularization, pattern_size, pattern_number))
        return (model, score, y_pred)
    # Find best config
    regularization_values = [1e-3, 1e-4, 1e-5, 1e-6][:1]
    pattern_size_values = [4, 6, 8][:1]
    pattern_number_values = [50, 100]
    best = None
    for config in itertools.product(regularization_values, pattern_size_values, pattern_number_values):
        model, score, y_pred = do_training(*config)
        # FIX: score is a mean squared error, so LOWER is better. The previous
        # condition (best["score"] < score) kept the config with the highest MSE.
        if best is None or score < best["score"]:
            best = {
                "model": model,
                "score": score,
                "y_pred": y_pred.numpy(),
                "config": config,
            }
    print("##### Best config: {}".format(best["config"]))
    model = best["model"]
    y_pred = best["y_pred"]
    if args.outdir is not None:
        if not os.path.exists(args.outdir):
            os.makedirs(args.outdir)
        np.save(args.outdir + "/predict", y_pred)
        # Import matplotlib lazily with a non-interactive backend (headless safe).
        import matplotlib
        matplotlib.use('Agg')
        from ckn.data.pltlogo import draw_logo, pwm_to_bits
        import matplotlib.pyplot as plt
        model.cpu()
        print("Generating logo...")
        pwm_all = model.compute_motif()
        np.savez_compressed(args.outdir+'/pwm', pwm=pwm_all)
        for i in range(pwm_all.shape[0]):
            pwm = pwm_all[i]
            bits = pwm_to_bits(pwm, trim=True)
            draw_logo(bits, width_scale=0.8, palette='deepbind')
            plt.savefig(args.outdir + "/logo_{}.png".format(i))
            # Reverse complement logo: flip both the base axis and the position axis.
            bits_rc = np.flip(np.flip(bits, axis=0), axis=1)
            draw_logo(bits_rc, width_scale=0.8, palette='deepbind')
            plt.savefig(args.outdir + "/logo_{}_rc.png".format(i))
            plt.close("all")
# Script entry point.
if __name__ == "__main__":
    main()