@@ -3,6 +3,8 @@ This is the Pytorch implementation of CKN-seq for reproducing the results of the
>Dexiong Chen, Laurent Jacob, Julien Mairal.
[Biological Sequence Modeling with Convolutional Kernel Networks][4]. bioRxiv preprint, 2018.
#### If you want to reproduce the results in the paper, please use the branch `recomb2019`.
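For example, assuming the repository has already been cloned:
```
git checkout recomb2019
```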
CKN-seq is free for academic use only.
© 2018 All rights reserved - Inria, CNRS, Univ. Grenoble Alpes
@@ -18,7 +20,7 @@ We strongly recommend users to use [anaconda][1] to install the following packag
numpy
scipy
scikit-learn=0.19.0
pytorch=0.3.0
pytorch=0.4.1
biopython=1.69
pandas
matplotlib
@@ -31,41 +33,41 @@ Then we install CKN-seq
pip install .
```
## Reproduction of results for ENCODE ChIP-seq datasets.
or simply run
```
export PYTHONPATH=$PWD:$PYTHONPATH
```
## Training models on ENCODE ChIP-seq and SCOP 1.67 datasets.
Download data from the [DeepBind website][3] and put the encode folder in data/.
Download data from the [DeepBind website][3] and put the encode folder in data/. Download data from the [jlstm website][6] and put it in data/SCOP167-superfamily/.
#### Supervised training without perturbation
* One-layer model
If you want to reproduce the results for a one-layer CKN-seq
If you want to train a one-layer CKN-seq
```
cd experiments
python train_encode.py --rc --num-layers 1 --num-motifs 16 --len-motifs 12 --subsamplings 1 --kernel-params 0.3 --epochs 100 --batch-size 128 --ntrials 1 --method sup --pooling mean --outdir ../out
python train_encode.py --num-motifs 128 --method sup --outdir ../out
```
You can add `--use-cuda` if you have CUDA installed.
If you want to use max pooling, just replace `--pooling mean` with `--pooling max` and set a larger sigma, such as `--kernel-params 0.6`. This setting was used for training models on SCOP 1.67 datasets.
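For example, a sketch reusing the hyperparameters from the one-layer command above with these two options swapped in:
```
python train_encode.py --rc --num-layers 1 --num-motifs 16 --len-motifs 12 --subsamplings 1 --kernel-params 0.6 --epochs 100 --batch-size 128 --ntrials 1 --method sup --pooling max --outdir ../out
```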
* Two-layer model
To train a model on SCOP 1.67
```
cd experiments
python train_encode.py --num-layers 2 --num-motifs 64 16 --len-motifs 12 3 --subsamplings 2 1 --kernel-params 0.3 0.6 --epochs 120 --batch-size 128 --ntrials 1 --method sup --pooling mean --outdir ../out --rc
python train_scop.py --num-motifs 128 --method sup --outdir ../out
```
#### Generation of logos
You can add `--use-cuda` if you have CUDA installed. Specify `--tfid INDEX_OF_DATASET` to train a model on a given dataset. Add `--logo` when training and evaluating your model to generate sequence logos.
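For example, a sketch combining these flags (the dataset index 0 is arbitrary):
```
python train_encode.py --num-motifs 128 --method sup --outdir ../out --tfid 0 --logo --use-cuda
```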
There are two ways of generating logos:
* Two-layer model
1. Add `--logo` when training and evaluating your model.
2. After training a model and saving it to `../out`, you can run
```
python report_encode.py --rc --num-layers 1 --num-motifs 16 --len-motifs 12 --subsamplings 1 --kernel-params 0.3 --epochs 120 --batch-size 128 --ntrials 1 --method sup --pooling mean --outdir ../out
cd experiments
python train_encode.py --num-layers 2 --num-motifs 64 16 --len-motifs 12 3 --subsamplings 2 1 --kernel-params 0.3 0.6 --method sup --outdir ../out
```
#### Unsupervised training
@@ -73,37 +75,52 @@ There are two ways of generating logos
CKN-seq can also be trained in an unsupervised fashion; however, the number of filters must be much larger to achieve performance similar to supervised training. Here is an example of training a one-layer model:
```
python train_encode.py --rc --num-layers 1 --num-motifs 512 --len-motifs 12 --subsamplings 1 --kernel-params 0.3 --batch-size 200 --ntrials 1 --method unsup --pooling mean --outdir ../out
python train_encode.py --num-motifs 4096 --batch-size 200 --method unsup --outdir ../out
```
#### Augmented training and hybrid training with perturbation
supervised CKN-seq can achieve better performance on _small-scale_ datasets by augmenting training samples with mismatch noises. An example for training with adding 20% mismatch noise to training samples
Supervised CKN-seq can achieve better performance on _small-scale_ datasets by augmenting training samples with mismatch noise. Here is an example of training with 20% mismatch noise added to the training samples:
```
python train_encode.py --rc --num-layers 1 --num-motifs 16 --len-motifs 12 --subsamplings 1 --kernel-params 0.3 --epochs 100 --batch-size 128 --ntrials 1 --method augmentation --pooling mean --outdir ../out --perturbation 0.2
python train_encode.py --epochs 150 --method sup --outdir ../out --noise 0.2
```
It can be further improved using the hybrid model described in the paper
Performance can be further improved using the hybrid model described in the paper, once you have trained an unsupervised model and saved it to `$PATH_TO_A_TRAINED_UNSUP_MODEL`:
```
python train_encode.py --rc --num-layers 1 --num-motifs 16 --len-motifs 12 --subsamplings 1 --kernel-params 0.3 --epochs 100 --batch-size 128 --ntrials 1 --method semisup --pooling mean --outdir ../out --perturbation 0.2 --unsup-dir $PATH_TO_A_TRAINED_UNSUP_MODEL
python train_encode.py --epochs 150 --method sup --outdir ../out --noise 0.2 --unsup-dir $PATH_TO_A_TRAINED_UNSUP_MODEL
```
## Reproduction of results for Zeng's motif occupancy datasets
## Further use of CKN-seq
Download data from [website][5] and put it to data/encode_occupancy/. Then check
If you want to create a CKN-seq network, run
```
python train_encode_occupancy --help
from ckn.models import supCKN, unsupCKN

if method == "sup":
    M = supCKN
elif method == "unsup":
    M = unsupCKN

n_alphabet = 4  # DNA
model = M(n_alphabet, [16], [12], [1], alpha=1e-6, reverse_complement=True,
          fit_bias=True, global_pool='mean')
# consult ckn/models.py for more options
```
## Reproduction of results for SCOP 1.67 datasets
Download data from [jlstm website][6] and put it to data/SCOP167-superfamily/.
Then you can train the network with a given data loader
```
python train_scop.py --help
# sup train
model.sup_train(data_loader, criterion, optimizer, lr_scheduler, epochs=100,
                val_loader=val_loader, use_cuda=True)
# unsup train with cross validation for regularization parameter
model.unsup_cross_val(train_loader,
                      n_sampling_patches=args.n_sampling_patches,
                      alpha_grid=search_grid,
                      use_cuda=True)
```
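For instance, a minimal sketch of building these loaders with the `EncodeLoader` class shown later in this diff (the module path `ckn.data_loader` and the batch size are assumptions):
```
from torch.utils.data import DataLoader
from ckn.data_loader import EncodeLoader  # assumed module path

loader = EncodeLoader('data/encode')
tfid = loader.get_ids()[0]
# get_tensor with split='train' returns (train, validation) datasets
train_dset, val_dset = loader.get_tensor(tfid, split='train')
data_loader = DataLoader(train_dset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dset, batch_size=128)
```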
[1]: https://anaconda.org
......
from .ops import *
from .kernels import *
@@ -5,12 +5,12 @@ from __future__ import division
import sys
from Bio import SeqIO
from Bio.Seq import Seq
import numpy as np
import Bio
import csv
import pandas as pd
from .dishuffle import doublet_shuffle
# from Bio.Seq import Seq
import numpy as np
# import Bio
# import csv
import pandas as pd
# from .dishuffle import doublet_shuffle
import torch
from torch.utils.data import Dataset
@@ -20,99 +20,10 @@ if sys.version_info < (3,):
else:
    maketrans = str.maketrans

DNA_TRANS = maketrans('NACGT', '\x00\x01\x02\x03\x04')
DNA_COMP_TRANS = maketrans('ACGT', '\x04\x03\x02\x01')
# def load_fasta(filename, alphabet=None):
# return SeqIO.parse(filename, 'fasta', alphabet=alphabet)
def load_fasta(filename, alphabet=None):
    return SeqIO.parse(filename, 'fasta', alphabet=alphabet)
# def load_labels(filename):
# labels_handler = open(filename, 'rU')
# labels = csv.DictReader(labels_handler, delimiter='\t')
# strength = labels.fieldnames[-2]
# return np.array([float(label[strength]) for label in labels], dtype='float32')
# def seq_order(s):
# l = s.split(':')
# chrome = l[0][3:]
# start = int(l[1].split('-')[0])
# return (chrome, start)
def preprocess(filename, outpath, labelname=None, sort=True, remove_missing=False):
    records = load_fasta(filename)
    labels = None
    if labelname is not None:
        labels_handler = open(labelname, 'rU')
        labels = csv.DictReader(labels_handler, delimiter='\t')
        fieldnames = labels.fieldnames
        ori_id = labels.fieldnames[-1]
    if sort:
        records = sorted(records, key=lambda x: seq_order(x.id))
        labels = sorted(labels, key=lambda x: seq_order(x[ori_id]))
    if remove_missing:
        records = [record.upper() for record in records if not 'n' in record and not 'N' in record]
        i = 0
        new_labels = []
        for label in labels:
            if i >= len(records):
                break
            if label[ori_id] == records[i].id:
                new_labels.append(label)
                i += 1
        missing_seq = len(labels) - len(new_labels)
        print('removed %d missing sequences' % missing_seq)
        labels = new_labels
    if labelname is not None:
        labels_handler.close()
    SeqIO.write(records, outpath+'cleaned_seq.fa', 'fasta')
    with open(outpath+'cleaned_label.beb', 'w+') as handler:
        writer = csv.DictWriter(handler, fieldnames=fieldnames, delimiter='\t')
        writer.writeheader()
        writer.writerows(labels)
def seq_to_index(records, alphabet):
    """
    transform biological sequences to list of indices
    """
    if alphabet == 'DNA':
        translator = DNA_TRANS
    else:
        raise ValueError('not implemented!')
    sequences = []
    for record in records:
        # ord() recovers the integer codes produced by the translation table
        seq = list(map(ord, str(record.seq).translate(translator)))
        sequences.append(seq)
    return sequences
def seq2index(seq, alphabet='DNA'):
    """
    transform biological sequence {A,C,G,T} to {1,2,3,4}
    """
    if alphabet == 'DNA':
        translator = DNA_TRANS
    else:
        raise ValueError('not implemented!')
    seq = seq.translate(translator)
    seq = np.fromstring(seq, dtype='uint8')
    return seq.astype('int32')
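# Example (illustrative): seq2index('ACGTN') -> array([1, 2, 3, 4, 0], dtype=int32),
# since DNA_TRANS maps N->0, A->1, C->2, G->3, T->4.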
# def revcomp(seq, alphabet='DNA'):
# """
# reverse complement of the given sequence
# """
# if alphabet == 'DNA':
# translator = DNA_COMP_TRANS
# else:
# raise ValueError('not implemented!')
# seq = seq.translate(translator)
# return seq[::-1]
def pad_sequences(sequences, pre_padding=0, maxlen=None, dtype='int32',
                  padding='pre', truncating='pre', value=0.):
@@ -146,7 +57,8 @@ def pad_sequences(sequences, pre_padding=0, maxlen=None, dtype='int32',
        elif truncating == 'post':
            trunc = s[:maxlen]
        else:
            raise ValueError('Truncating type "%s" not understood' % truncating)
            raise ValueError(
                'Truncating type "%s" not understood' % truncating)
        # check `trunc` has expected shape
        trunc = np.asarray(trunc, dtype=dtype)
@@ -162,36 +74,11 @@ def pad_sequences(sequences, pre_padding=0, maxlen=None, dtype='int32',
            raise ValueError('Padding type "%s" not understood' % padding)
    return x
def load_data(filename, compression='gzip'):
    return pd.read_csv(filename, compression=compression, delimiter='\t')

def df_seq2index(df):
    """convert letters to numbers:
    A=1, C=2, G=3, T=4, N=0
    """
    df['seq_index'] = df['seq'].apply(seq2index)
    return df

def load_seq(filename, compression='gzip', mode='top'):  # data_augment=False, reverse_complement=False
    df = load_data(filename, compression)
    if mode == 'top':
        df = df.iloc[:500]
    df = df_seq2index(df)
    return df

def augment_data(df):
    df2 = df.copy()
    df2['seq'] = df['seq'].apply(doublet_shuffle)
    df2['Bound'] = 0
    df2['EventID'] += '_background'
    df2.index += df.shape[0]
    df2 = df_seq2index(df2)
    return df2

def augment_noise(df, perturbation=0.1, quantity=10, max_index=4):
def augment(df, noise=0.1, quantity=10, max_index=4):
    def add_noise(seq):
        new_seq = seq.copy()
        mask = np.random.rand(len(seq)) < perturbation
        mask = np.random.rand(len(seq)) < noise
        new_seq[mask] = np.random.randint(1, max_index+1, size=np.count_nonzero(mask), dtype='int32')
        return new_seq
    df_list = [df]
@@ -201,76 +88,39 @@ def augment_noise(df, perturbation=0.1, quantity=10, max_index=4):
        df_list.append(new_df)
    return pd.concat(df_list)
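# Note (added comment): augment() keeps the original rows and appends noisy
# copies of the data frame; each position is replaced by a random letter in
# {1, ..., max_index} with probability `noise`.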
class TensorDataset(Dataset):
    def __init__(self, data_tensor, target_tensor, label_mask=None, count_tensor=None, max_index=4):
    def __init__(self, data_tensor, target_tensor, noise=0.0, max_index=4):
        assert data_tensor.size(0) == target_tensor.size(0)
        self.data_tensor = data_tensor
        self.target_tensor = target_tensor
        self.label_mask = label_mask
        self.count_tensor = count_tensor
        self.noise = noise
        self.max_index = max_index + 1

    def set_mask(self, mask):
        self.label_mask = mask

    def __getitem__(self, index):
        # if self.label_mask is not None:
        #     return {'data': self.data_tensor[index], 'label': self.target_tensor[index], 'mask': self.label_mask[index]}
        if self.count_tensor is not None:
            return {'data': self.data_tensor[index], 'label': self.target_tensor[index], 'count': self.count_tensor[index]}
        return {'data': self.data_tensor[index], 'label': self.target_tensor[index]}
        if self.noise == 0.:
            return self.data_tensor[index], self.target_tensor[index]
        data_tensor = self.data_tensor[index].clone()
        noise_mask = torch.ByteTensor([0])
        if np.random.rand() < 0.5:
            noise_mask = torch.ByteTensor([1])
            # mask = torch.Tensor(data_tensor.size(0)).uniform_() < self.noise
            mask = torch.rand_like(data_tensor, dtype=torch.float) < self.noise
            data_tensor[mask] = torch.LongTensor(
                mask.sum().item()).random_(1, self.max_index)
        return data_tensor, self.target_tensor[index], noise_mask

    def __len__(self):
        return self.data_tensor.size(0)

    def augment(self, perturbation=0.1, quantity=10):
    def augment(self, noise=0.1, quantity=10):
        if noise <= 0.:
            return
        new_tensor = [self.data_tensor]
        for i in range(quantity - 1):
            t = self.data_tensor.clone()
            mask = torch.Tensor(t.size()).uniform_() < perturbation
            t[mask] = torch.LongTensor(mask.sum()).random_(1, self.max_index)
            mask = torch.rand_like(t, dtype=torch.float) < noise
            t[mask] = torch.LongTensor(mask.sum().item()).random_(1, self.max_index)
            new_tensor.append(t)
        self.data_tensor = torch.cat(new_tensor)
        self.target_tensor = self.target_tensor.repeat(quantity)
class NoisyDataset(Dataset):
    def __init__(self, data_tensor, target_tensor, amount=0.1, max_index=4):
        assert data_tensor.size(0) == target_tensor.size(0)
        self.data_tensor = data_tensor
        self.target_tensor = target_tensor
        self.amount = amount
        self.max_index = max_index + 1

    def __getitem__(self, index):
        data_tensor = self.data_tensor[index].clone()
        noise_mask = torch.ByteTensor([0])
        if np.random.rand() < 0.5:
            noise_mask = torch.ByteTensor([1])
            mask = torch.Tensor(data_tensor.size(0)).uniform_() < self.amount
            data_tensor[mask] = torch.LongTensor(mask.sum()).random_(1, self.max_index)
        return {'data': data_tensor, 'label': self.target_tensor[index], 'mask': noise_mask}

    def __len__(self):
        return self.data_tensor.size(0)
# def rolling_window(a, window):
# shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
# strides = a.strides + (a.strides[-1],)
# return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
# def np_obtain_kmer_weights(seq, k, n_char=20):
# """n_char=4 for DNA 20 for protein
# return len=len(seq) - k + 1
# """
# kmers = rolling_window(seq, k)
# bases = np.logspace(k - 1, 0, num=k, base=n_char)
# kmers_idx = np.sum(kmers * bases, axis=1)
# _, indices, counts = np.unique(kmers_idx, return_inverse=True, return_counts=True)
# counts = 1./counts
# return counts[indices]
if __name__ == "__main__":
import sys
import numpy as np
from collections import defaultdict
import numpy as np
from collections import defaultdict
import sys
if sys.version_info < (3,):
import string
@@ -9,22 +9,25 @@ else:
_RNA = maketrans('U', 'T')
def seq_to_graph(seq):
    graph = defaultdict(list)
    for i in range(len(seq) - 1):
        graph[seq[i]].append(seq[i+1])
    return graph

def graph_to_seq(graph, first_vertex, len_seq):
    is_done = False
    new_seq = first_vertex
    while not is_done:
        last_vertex = new_seq[-1]
        new_seq += graph[last_vertex][0]
        graph[last_vertex].pop(0)
        if len(new_seq) >= len_seq:
            is_done = True
    return new_seq
# function from: http://www.python.org/doc/essays/graphs/
def find_path(graph, start, end, path=[]):
@@ -36,8 +39,10 @@ def find_path(graph, start, end, path=[]):
    for node in graph[start][-1]:
        if node not in path:
            newpath = find_path(graph, node, end, path)
            if newpath:
                return newpath
    return None
def sample_new_graph(graph, last_vertex):
    new_graph = defaultdict(list)
......
import os
import sys
import numpy as np
import pandas as pd
import torch
from Bio import SeqIO
from collections import defaultdict
from sklearn.model_selection import train_test_split
from .data_helper import pad_sequences, TensorDataset, augment
from .dishuffle import doublet_shuffle
if sys.version_info < (3,):
    import string
    maketrans = string.maketrans
else:
    maketrans = str.maketrans
ALPHABETS = {
    'DNA': (
        'ACGT',
        '\x01\x02\x03\x04'
    ),
    'PROTEIN': (
        'ARNDCQEGHILKMFPSTWYV',
        '\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14'
    ),
}

AMBIGUOUS = {
    'DNA': ('N', '\x00'),
    'PROTEIN': ('XBZJUO', '\x00' * 6),
}
class SeqLoader(object):
    def __init__(self, alphabet='DNA'):
        self.alphabet, self.code = ALPHABETS[alphabet]
        alpha_ambi, code_ambi = AMBIGUOUS[alphabet]
        self.translator = maketrans(
            alpha_ambi + self.alphabet, code_ambi + self.code)
        self.alpha_nb = len(self.alphabet)

    def pad_seq(self, seq):
        seq = pad_sequences(seq, pre_padding=self.pre_padding,
                            maxlen=self.maxlen, padding='post',
                            truncating='post', dtype='int64')
        self.maxlen = seq.shape[1]
        return seq

    def seq2index(self, seq):
        seq = seq.translate(self.translator)
        seq = np.fromstring(seq, dtype='uint8')
        return seq.astype('int64')

    def get_ids(self, ids=None):
        pass

    def get_tensor(self, dataid, split='train', noise=0.0, fixed_noise=0.0,
                   val_split=0.25, top=False, generate_neg=True):
        df = self.load_data(dataid, split)
        if fixed_noise > 0.:
            df = augment(df, noise=fixed_noise, max_index=self.alpha_nb)
        if split == 'train' and generate_neg and hasattr(self, "aug_neg"):
            df = self.aug_neg(df)
        X, y = df['seq_index'], df['y'].values
        X = self.pad_seq(X)
        if top:
            X, _, y, _ = train_test_split(
                X, y, stratify=y, train_size=500)
        if split == 'train' and val_split > 0:
            X, X_val, y, y_val = train_test_split(
                X, y, test_size=val_split, stratify=y)
            X, y = torch.from_numpy(X), torch.from_numpy(y)
            X_val, y_val = torch.from_numpy(X_val), torch.from_numpy(y_val)
            train_dset = TensorDataset(
                X, y, noise=noise, max_index=self.alpha_nb)
            val_dset = TensorDataset(X_val, y_val, max_index=self.alpha_nb)
            return train_dset, val_dset
        X, y = torch.from_numpy(X), torch.from_numpy(y)
        return TensorDataset(X, y, noise=noise, max_index=self.alpha_nb)
class EncodeLoader(SeqLoader):
    def __init__(self, datadir, ext='.seq.gz', maxlen=None,
                 pre_padding=0):
        super(EncodeLoader, self).__init__()
        self.datadir = datadir
        self.ext = ext
        self.pre_padding = pre_padding
        self.maxlen = maxlen

    def get_ids(self, ids=None):
        targetnames = sorted(
            [filename.replace("_AC"+self.ext, "") for filename in os.listdir(
                self.datadir) if filename.endswith("_AC"+self.ext)])
        if ids is not None and ids != []:
            targetnames = [
                targetnames[tfid] for tfid in ids if tfid < len(targetnames)
            ]
        return targetnames

    def load_data(self, tfid, split='train'):
        if split == 'train':
            filename = "%s/%s_AC%s" % (self.datadir, tfid, self.ext)
        else:
            filename = "%s/%s_B%s" % (self.datadir, tfid, self.ext)
        df = pd.read_csv(filename, compression='gzip', delimiter='\t')
        df.rename(columns={'Bound': 'y', 'seq': 'Seq'}, inplace=True)
        df['seq_index'] = df['Seq'].apply(self.seq2index)
        return df

    def aug_neg(self, df):
        df2 = df.copy()
        df2['Seq'] = df['Seq'].apply(doublet_shuffle)
        df2['y'] = 0
        df2['EventID'] += '_neg'
        df2.index += df2.shape[0]
        df2['seq_index'] = df2['Seq'].apply(self.seq2index)
        df2 = pd.concat([df, df2])
        return df2
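# Note (added comment): doublet_shuffle in aug_neg above generates a background
# (y=0) copy of each sequence that preserves its dinucleotide composition, a
# standard way to build negative examples for TF-binding data.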
class SCOPLoader(SeqLoader):
    def __init__(self, datadir='data/SCOP', ext='fasta', maxlen=400,
                 pre_padding=0):
        super(SCOPLoader, self).__init__('PROTEIN')
        self.datadir = datadir
        self.ext = ext
        self.pre_padding = pre_padding
        self.maxlen = maxlen
        self.filename_tp = datadir + '/{}-{}.{}.' + ext

    def get_ids(self, ids=None):
        names = sorted([".".join(filename.split('.')[1:-1])
                        for filename in os.listdir(self.datadir)
                        if filename.startswith('pos-train')])
        if ids is not None and ids != []:
            names = [names[index] for index in ids if index < len(names)]
        return names

    def get_nb(self, dataid, split='train'):
        pos = self.filename_tp.format('pos', split, dataid)
        neg = self.filename_tp.format('neg', split, dataid)
        return self.get_nb_fasta(pos) + self.get_nb_fasta(neg)

    def load_data(self, dataid, split='train'):
        pos = self.filename_tp.format('pos', split, dataid)
        neg = self.filename_tp.format('neg', split, dataid)
        table = defaultdict(list)
        for y, filename in enumerate([neg, pos]):
            for record in SeqIO.parse(filename, self.ext):
                seq = str(record.seq).upper()
                name = record.id
                table['Seq'].append(seq)
                table['seq_index'].append(self.seq2index(seq))
                table['name'].append(name)
                table['y'].append(y)
        df = pd.DataFrame(table)
        return df
if __name__ == '__main__':
    load = EncodeLoader('../../data/encode')
......@@ -4,26 +4,27 @@ from matplotlib.text import TextPath
from matplotlib.patches import PathPatch
from matplotlib.font_manager import FontProperties
import seaborn as sns
sns.set_style("white")
import numpy as np
sns.set_style("white")
import numpy as np
DNA_ALPHABET = ['A', 'C', 'G', 'T']

DEEPBIND_PL = {'G': 'orange',
               'A': 'lime',
               'C': 'blue',
               'T': 'crimson'}

MEME_PL = {'G': 'orange',
           'A': 'red',
           'C': 'blue',
           'T': 'darkgreen'}

PALLETE = {'meme': MEME_PL, 'deepbind': DEEPBIND_PL}
def trim_pwm(info):
    max_ic = np.max(info)
    ic_threshold = np.max([0.1*max_ic, 0.1])
    w = info.shape[0]
    pfm_st = 0
    pfm_sp = w
@@ -32,18 +33,20 @@ def trim_pwm(info):
            pfm_st += 1
        else:
            break
    for inf in reversed(info):
        if inf < ic_threshold:
            pfm_sp -= 1
        else:
            break
    return pfm_st, pfm_sp
def pwm_to_bits(pwm, trim=False):
    """pwm: m x 4 dataframe
    """
    pwm = np.asarray(pwm)
    info = np.asarray(np.log2(
        pwm.shape[1]) + np.sum(pwm * np.ma.log2(pwm), axis=1, keepdims=True))
    print("IC: {}".format(np.sum(info)))
    bits = pwm * info
    if trim:
@@ -51,14 +54,13 @@ def pwm_to_bits(pwm, trim=False):
        return bits[bits_st:bits_sp]
    return bits
def draw_logo(bits, width_scale=0.8, font_family="Arial", palette="meme"):
    # bits = pwm_to_bits(pwm)
    # print(np.sum(bits))
    try:
        alphabet_inverse = pwm.columns[::-1]
        alphabet_inverse = bits.columns[::-1]
    except:
        alphabet_inverse = DNA_ALPHABET[::-1]
    fp = FontProperties(family=font_family, weight="bold")
    COLOR_SCHEME = PALLETE[palette]
@@ -72,9 +74,12 @@ def draw_logo(bits, width_scale=0.8, font_family="Arial", palette="meme"):
        w_scale = 1./w
        h_scale = yscale * 0.95 / h
        transform = mpl.transforms.Affine2D().scale(w_scale, h_scale) + \
            mpl.transforms.Affine2D().translate(x - 0.5 - w_scale*anchor, y) + \
            ax.transData
        txt = PathPatch(
            text, lw=0, fc=COLOR_SCHEME[letter], transform=transform)
        if ax is not None:
            ax.add_artist(txt)
        return txt
@@ -84,7 +89,8 @@ def draw_logo(bits, width_scale=0.8, font_family="Arial", palette="meme"):
    for i in range(len(bits)):
        scores = bits[i]
        y = 0
        for score, base in sorted(
                zip(scores[::-1], alphabet_inverse), key=lambda x: x[0]):
            draw_base(base, x, y, score, ax)
            y += score
        x += 1
@@ -92,7 +98,7 @@ def draw_logo(bits, width_scale=0.8, font_family="Arial", palette="meme"):
    ax.set_yticks(range(0, 3))
    ax.set_xticks(range(1, x))
    sns.despine(ax=ax, trim=True)
    ax.set_xlim((0, x))
    ax.set_ylabel("bits", fontsize=15)
    try:
        fig.tight_layout()
@@ -101,13 +107,11 @@ def draw_logo(bits, width_scale=0.8, font_family="Arial", palette="meme"):
if __name__ == '__main__':
    import matplotlib.pyplot as plt
    from Bio import motifs
    from Bio.Seq import Seq
    import pandas as pd

    instances = [Seq('AACTCACGAAT'), Seq('ACTAAACGAAA'), Seq('ACGACAATAAC')]
    m = motifs.create(instances)
    df = pd.DataFrame.from_dict(m.pwm)
    draw_logo(df, width_scale=0.8, palette='meme')
    plt.savefig("test.png", dpi=200)
# -*- coding: utf-8 -*-
import torch

def exp(x, alpha):
    """Element wise non-linearity
@@ -9,5 +10,12 @@ def exp(x, alpha):
    """
    return torch.exp(alpha*(x - 1.))

def res_exp(x, alpha):
    return 0.5*exp(x, alpha) + 0.5*x

def add_exp(x, alpha):
    return 0.5 * (exp(x, alpha) + x)

kernels = {
    "exp": exp,
    "add_exp": add_exp
}
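# Example (illustrative): kernels["exp"](x, alpha) computes exp(alpha*(x - 1)),
# which for unit-norm inputs (x a cosine similarity in [-1, 1]) is a Gaussian
# kernel on the sphere; kernels["exp"](torch.ones(3), 0.3) returns tensor([1., 1., 1.]).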
import torch
from torch.autograd import Variable
import numpy as np
from scipy import optimize
from scipy.stats import chi2
from ckn.utils import proj_on_simplex
def sequences_to_pwm(X):
    """Compute the PFM for the given sequences
    X: m x n ndarray, m sequences of length n, the value is
@@ -19,6 +20,7 @@ def sequences_to_pwm(X):
    pwm = pfm/np.sum(pfm, axis=0)
    return pwm.T
def compute_pwm(model, max_iter=2000):
    seq_model = model[0]
    embed_layer = seq_model.embed_layer
@@ -45,7 +47,6 @@ def compute_pwm(model, max_iter=2000):
    threshold = 1. - chi2.ppf(0.9, n - 1) * sigma2 / 2
    print("threshold: {}".format(threshold))
    for i, motif_idx in enumerate(filter_indices):
        if abs_weights[motif_idx] <= max_weight * 0.05:
            continue
@@ -55,48 +56,52 @@ def compute_pwm(model, max_iter=2000):
        # print(loss)
        # print("threshold: {}".format(threshold))
        if loss < threshold:
            print("ok")
            print("{} is ok".format(i))
            pwm_all.append(motif)
    pwm_all = np.asarray(pwm_all)
    return pwm_all
def optimize_motif(motif_idx, ckn_model, max_iter):
    n_layers = ckn_model.n_layers
    if n_layers == 1:
        motif = ckn_model[-1].filters.data[motif_idx].clone()
        motif = ckn_model[-1].weight.data[motif_idx].clone()
        proj_on_simplex(motif)
    else:
        motif = torch.ones(4, ckn_model.len_motif)/4.
        motif = Variable(motif, requires_grad=True)
    filter_c = ckn_model[-1].filters.data[motif_idx:motif_idx+1].clone()
    filter_c = ckn_model[-1].weight.data[motif_idx:motif_idx+1].clone()
    # filter_c = Variable(filter_c, volatile=True)
    with torch.no_grad():
        filter_c = ckn_model[-1]._conv_layer(filter_c)
        lintrans = ckn_model[-1]._compute_lintrans()
        filter_c = ckn_model[-1]._mult_layer(filter_c, lintrans)
    filter_c = Variable(filter_c.data, requires_grad=False)

    def eval_loss(w):
        w = w.reshape(4, -1)
        if motif.grad is not None:
            motif.grad = None
        motif.data.copy_(torch.from_numpy(w))
        proj_on_simplex(motif.data)
        out1 = ckn_model(motif.unsqueeze(0), n_layers-1)
        out1 = ckn_model.representation(motif.unsqueeze(0), n_layers-1)
        out1 = ckn_model[-1]._conv_layer(out1)
        lintrans = ckn_model[-1]._compute_lintrans()
        out2 = ckn_model[-1]._mult_layer(out1, lintrans)
        loss = torch.norm(out2 - filter_c)**2
        loss.backward()
        return loss.data[0]
        return loss.item()

    def eval_grad(w):
        dw = motif.grad.data
        return dw.cpu().numpy().ravel().astype('float64')

    w_init = motif.data.cpu().numpy().ravel().astype("float64")
    w = optimize.fmin_l_bfgs_b(
        eval_loss, w_init, fprime=eval_grad,
        maxiter=max_iter, pgtol=1e-8, m=10, disp=0)
    if isinstance(w, tuple):
        w = w[0]
    loss = eval_loss(w)
    return motif.data.numpy().T, loss
# -*- coding: utf-8 -*-
import torch
from torch import nn
from torch.autograd import Variable

class MatrixInverseSqrt(torch.autograd.Function):
    """Matrix inverse square root for a symmetric definite positive matrix
    """
    def forward(self, input, eps=1e-6):
    @staticmethod
    def forward(ctx, input, eps=1e-2):
        use_cuda = input.is_cuda
        input = input.cpu()
        e, v = torch.symeig(input, eigenvectors=True)
        e_sqrt = torch.sqrt(e) + eps
        self.save_for_backward(e_sqrt, v)
        e_rsqrt = torch.reciprocal(e_sqrt)
        if use_cuda:
            e = e.cuda()
            v = v.cuda()
        e = e.clamp(min=0)
        e_sqrt = e.sqrt_().add_(eps)
        ctx.e_sqrt = e_sqrt
        ctx.v = v
        e_rsqrt = e_sqrt.reciprocal()
        output = v.mm(torch.diag(e_rsqrt).mm(v.t()))
        return output, e_sqrt, v
        return output

    def backward(self, grad_output, grad_e_sqrt, grad_v):
        e_sqrt, v = self.saved_tensors
        grad_input = None
    @staticmethod
    def backward(ctx, grad_output):
        e_sqrt, v = Variable(ctx.e_sqrt), Variable(ctx.v)
        ei = e_sqrt.expand_as(v)
        ej = e_sqrt.view([-1, 1]).expand_as(v)
        f = torch.reciprocal((ei + ej) * ei * ej)
        grad_input = -v.mm((f*(v.t().mm(grad_output.mm(v)))).mm(v.t()))
        return grad_input, None

def matrix_inverse_sqrt(input, eps=1e-10):
def matrix_inverse_sqrt(input, eps=1e-2):
    """Wrapper for MatrixInverseSqrt"""
    eps = Variable(torch.Tensor([eps]).type_as(input.data), requires_grad=False)
    return MatrixInverseSqrt()(input, eps)[0]
    return MatrixInverseSqrt.apply(input, eps)
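# Usage sketch (illustrative, not part of the diff):
#   a = torch.randn(5, 5)
#   k = a.mm(a.t()) + 1e-3 * torch.eye(5)   # symmetric positive definite matrix
#   k_isqrt = matrix_inverse_sqrt(k)        # differentiable K^{-1/2}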
import os
from Bio import SeqIO
import numpy as np
import torch
# from torch.utils.data import Dataset
from .data_helper import TensorDataset, NoisyDataset
from sklearn.model_selection import train_test_split
import sys
if sys.version_info < (3,):
    import string
    maketrans = string.maketrans
else:
    maketrans = str.maketrans
def pad_sequences(sequences, pre_padding=0, maxlen=None, dtype='int32',
                  padding='post', truncating='pre', value=0.):
    if not hasattr(sequences, '__len__'):
        raise ValueError('`sequences` must be iterable.')
    lengths = []
    for x in sequences:
        if not hasattr(x, '__len__'):
            raise ValueError('`sequences` must be a list of iterables. '
                             'Found non-iterable: ' + str(x))
        lengths.append(len(x))
    num_samples = len(sequences)
    if maxlen is None:
        maxlen = np.max(lengths) + 2*pre_padding
    # take the sample shape from the first non empty sequence
    # checking for consistency in the main loop below.
    sample_shape = tuple()
    for s in sequences:
        if len(s) > 0:
            sample_shape = np.asarray(s).shape[1:]
            break
    x = (np.ones((num_samples, maxlen) + sample_shape) * value).astype(dtype)
    for idx, s in enumerate(sequences):
        if not len(s):
            continue  # empty list/array was found
        pre_pad = [value]*pre_padding
        s = np.hstack([pre_pad, s, pre_pad])
        if truncating == 'pre':
            trunc = s[-maxlen:]
        elif truncating == 'post':
            trunc = s[:maxlen]
        else:
            raise ValueError('Truncating type "%s" not understood' % truncating)
        # check `trunc` has expected shape
        trunc = np.asarray(trunc, dtype=dtype)
        if trunc.shape[1:] != sample_shape:
            raise ValueError('Shape of sample %s of sequence at position %s is different from expected shape %s' %
                             (trunc.shape[1:], idx, sample_shape))
        if padding == 'post':
            x[idx, :len(trunc)] = trunc
        elif padding == 'pre':
            x[idx, -len(trunc):] = trunc
        else:
            raise ValueError('Padding type "%s" not understood' % padding)
    return x
class SCOPLoader(object):
    def __init__(self, datadir='data/SCOP', ext='fasta', maxlen=None, len_motif=None,
                 pre_padding=0, swissdir='../data/SCOP/uniprot_sprot.fasta'):
        self.datadir = datadir
        self.ext = ext
        self.filename_tp = datadir + '/{}-{}.{}.' + ext
        self.dict = ['A', 'R', 'N', 'D', 'C', 'Q', 'E', 'G', 'H', 'I', 'L', 'K', 'M', 'F',
                     'P', 'S', 'T', 'W', 'Y', 'V']
        self.ambiguous = 'XBZJUO'
        self.protein_trans = maketrans(
            self.ambiguous + ''.join(self.dict),
            '\x00'*len(self.ambiguous) + '\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14')
        self.maxlen = maxlen
        self.pre_padding = pre_padding
        self.swissdir = swissdir
        if len_motif is None:
            self.len_motif = pre_padding + 1
        else:
            self.len_motif = len_motif

    def get_ids(self, indices=None):
        names = sorted([".".join(filename.split('.')[1:-1])
                        for filename in os.listdir(self.datadir)
                        if filename.startswith('pos-train')])
        if indices is not None and indices != []:
            names = [names[index] for index in indices if index < len(names)]
        return names

    def get_nb_fasta(self, filename):
        records = SeqIO.parse(filename, self.ext)
        return sum([1 for r in records])

    def get_nb(self, dataid, split='train'):
        pos = self.filename_tp.format('pos', split, dataid)
        neg = self.filename_tp.format('neg', split, dataid)
        return self.get_nb_fasta(pos), self.get_nb_fasta(neg)

    def seq2index(self, seq):
        seq = seq.translate(self.protein_trans)
        seq = np.fromstring(seq, dtype='uint8').astype('int64')
        return seq

    def read_fasta(self, filename):
        records = SeqIO.parse(filename, self.ext)
        sequences = []
        names = []
        for record in records:
            seq = str(record.seq).upper()
            name = record.id
            sequences.append(self.seq2index(seq))
            names.append(name)
        # sequences = pad_sequences(sequences)
        return sequences, names

    def get_swissprot(self, torch_tensor=True, max_size=100000):
        if self.swissdir is None:
            return None
        seq, name = self.read_fasta(self.swissdir)
        seq, name = seq[:max_size], name[:max_size]
        seq = pad_sequences(seq, pre_padding=self.pre_padding,
                            maxlen=self.maxlen, truncating='post', dtype='int64')
        if torch_tensor:
            seq = torch.from_numpy(seq)
            return TensorDataset(seq, torch.zeros(len(seq))), name
        return seq, name
    def get_dataset(self, dataid, split='train', torch_tensor=True, val_split=0,
                    semisup=False, perturbation=0.1, top=False):
        pos = self.filename_tp.format('pos', split, dataid)
        neg = self.filename_tp.format('neg', split, dataid)
        pos_seq, pos_name = self.read_fasta(pos)
        neg_seq, neg_name = self.read_fasta(neg)
        train_name = pos_name + neg_name
        train_seq = pos_seq + neg_seq
        train_seq = pad_sequences(train_seq, pre_padding=self.pre_padding,
                                  maxlen=self.maxlen, truncating='post', dtype='int64')
        self.maxlen = train_seq.shape[1]
        train_label = np.hstack((np.ones(len(pos_seq)), np.zeros(len(neg_seq))))
        train_label = train_label.astype('float32')
        if top:
            train_seq, _, train_label, _ = train_test_split(
                train_seq, train_label, stratify=train_label, train_size=500)
        if torch_tensor:
            if split == 'train' and val_split > 0:
                train_seq, val_seq, train_label, val_label = train_test_split(
                    train_seq, train_label, test_size=val_split, stratify=train_label)
                train_seq = torch.from_numpy(train_seq)
                train_label = torch.from_numpy(train_label)
                val_seq = torch.from_numpy(val_seq)
                val_label = torch.from_numpy(val_label)
                if semisup:
                    train_dataset = NoisyDataset(train_seq, train_label, amount=perturbation, max_index=len(self.dict))
                else:
                    train_dataset = TensorDataset(train_seq, train_label, max_index=len(self.dict))
                return train_dataset, TensorDataset(val_seq, val_label, max_index=len(self.dict)), train_name
            train_seq = torch.from_numpy(train_seq)
            train_label = torch.from_numpy(train_label)
            return TensorDataset(train_seq, train_label, max_index=len(self.dict)), train_name
        return train_seq, train_label, train_name
if __name__ == '__main__':
    data_loader = SCOPLoader('SCOP167-superfamily', maxlen=400)
    seq, name = data_loader.get_swissprot()
    print(len(seq))
    print(seq[:10])
    # print(data_loader.get_ids())
    # print(len(data_loader.get_ids()))
    # train_mat, train_label, train_name = data_loader.get_data('a.1.1.2')
    # print(train_mat)
    # print(train_mat.shape)
    # print(train_label.shape)
    # print(train_label.sum())
    # print(train_name)
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import (
    roc_auc_score, log_loss, accuracy_score,
    precision_recall_curve, average_precision_score
)
def bootstrap_auc(y_true, y_pred, ntrial=10):
    n = len(y_true)
    aucs = []
    for t in range(ntrial):
        sample = np.random.randint(0, n, n)
        try:
            auc = roc_auc_score(y_true[sample], y_pred[sample])
        except:
            # If any of the samples returned NaN, the whole bootstrap result is NaN
            return np.nan, np.nan
        aucs.append(auc)
    return np.mean(aucs), np.std(aucs)
def recall_at_fdr(y_true, y_score, fdr_cutoff=0.05):
    # print y_true, y_score
    precision, recall, thresholds = precision_recall_curve(y_true, y_score)
    fdr = 1 - precision
    cutoff_index = next(i for i, x in enumerate(fdr) if x <= fdr_cutoff)
    return recall[cutoff_index]
def xroc(res, cutoff):
    """
    :type res: List[List[label, score]]
@@ -49,13 +55,8 @@ def xroc(res, cutoff):
        lroc = 0
    return lroc
def get_roc(y_true, y_pred, cutoff):
    '''
    :param y_true:
    :param y_pred:
    :param cutoff:
    :return:
    '''
    score = []
    label = []
@@ -74,7 +75,9 @@ def get_roc(y_true, y_pred, cutoff):
    score = xroc(t_label, cutoff)
    return score
def compute_metrics(y_true, y_pred):
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    metric = {}
    metric['log.loss'] = log_loss(y_true, y_pred)
    metric['accuracy'] = accuracy_score(y_true, y_pred > 0.5)
@@ -87,4 +90,5 @@ def compute_metrics(y_true, y_pred):
    metric["spearman.r"], metric["spearman.p"] = stats.spearmanr(y_true, y_pred)
    df = pd.DataFrame.from_dict(metric, orient='index')
    df.columns = ['value']
    df.sort_index(inplace=True)
    return df
# -*- coding: utf-8 -*-
import math
import random

import numpy as np
import torch

EPS = 1e-4
def gaussian_filter_1d(size, sigma=None):
    """Create 1D Gaussian filter
@@ -10,87 +14,124 @@ def gaussian_filter_1d(size, sigma=None):
        return torch.ones(1)
    if sigma is None:
        sigma = (size - 1.)/(2.*math.sqrt(2))
    m = (size - 1.)/2.
    m = float((size - 1) // 2)
    filt = torch.arange(-m, m+1)
    filt = torch.exp(-filt.pow(2)/(2.*sigma*sigma))
    return filt/torch.sum(filt)
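# Example (illustrative): gaussian_filter_1d(3) uses the default
# sigma = (size - 1) / (2 * sqrt(2)) and returns a normalized kernel of
# approximately tensor([0.2119, 0.5761, 0.2119]).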
def spherical_kmeans(x, n_clusters, max_iters=1000, verbose=True, init=None):
def init_kmeans(x, n_clusters, n_local_trials=None, use_cuda=False):
    n_samples, n_features = x.size()
    clusters = torch.Tensor(n_clusters, n_features)
    if use_cuda:
        clusters = clusters.cuda()
    if n_local_trials is None:
        n_local_trials = 2 + int(np.log(n_clusters))
    clusters[0] = x[np.random.randint(n_samples)]
    closest_dist_sq = 1 - clusters[[0]].mm(x.t())
    closest_dist_sq = closest_dist_sq.view(-1)
    current_pot = closest_dist_sq.sum()
    for c in range(1, n_clusters):
        rand_vals = np.random.random_sample(n_local_trials) * current_pot
        candidate_ids = np.searchsorted(closest_dist_sq.cumsum(-1), rand_vals)
        distance_to_candidates = 1 - x[candidate_ids].mm(x.t())
        best_candidate = None
        best_pot = None
        best_dist_sq = None
        for trial in range(n_local_trials):
            # Compute potential when including center candidate
            new_dist_sq = torch.min(closest_dist_sq,
                                    distance_to_candidates[trial])
            new_pot = new_dist_sq.sum()
            # Store result if it is the best local trial so far
            if (best_candidate is None) or (new_pot < best_pot):
                best_candidate = candidate_ids[trial]
                best_pot = new_pot
                best_dist_sq = new_dist_sq
        clusters[c] = x[best_candidate]
        current_pot = best_pot
        closest_dist_sq = best_dist_sq
    return clusters
def spherical_kmeans(x, n_clusters, max_iters=100, verbose=True,
                     init=None, eps=1e-4):
    """Spherical kmeans
    Args:
        x (Tensor n_samples x n_features): data points
        n_clusters (int): number of clusters
    """
    use_cuda = x.is_cuda
    n_samples, n_features = x.size()
    if init is None:
    if init == "kmeans++":
        print(init)
        clusters = init_kmeans(x, n_clusters, use_cuda=use_cuda)
    else:
        indices = torch.randperm(n_samples)[:n_clusters]
        if use_cuda:
            indices = indices.cuda()
        clusters = x[indices]
    indices = torch.arange(0, n_samples).long()
    prev_sim = np.inf
    for n_iter in range(max_iters):
        # assign data points to clusters
        cos_sim = x.mm(clusters.t())
        sim, assign = cos_sim.max(dim=-1)
        sim = sim.mean()
        tmp, assign = cos_sim.max(dim=-1)
        sim = tmp.mean()
        if (n_iter + 1) % 10 == 0 and verbose:
            print("Spherical kmeans iter {}, objective value {}".format(
                n_iter + 1, sim))
        # update clusters
        for j in range(n_clusters):
            index = indices[assign==j]
            if index.dim() == 0:
                clusters[j] = x[np.random.randint(n_samples)]
            index = assign == j
            if index.sum() == 0:
                # clusters[j] = x[random.randrange(n_samples)]
                idx = tmp.argmin()
                clusters[j] = x[idx]
                tmp[idx] = 1
            else:
                xj = x.index_select(0, indices[assign==j])
                xj = x[index]
                c = xj.mean(0)
                clusters[j] = c / c.norm()
        if np.abs(prev_sim - sim)/(np.abs(sim)+1e-20) < 1e-6:
            break
        prev_sim = sim
    return clusters
def normalize(x, p=2, dim=-1, return_norm=False, eps=1e-7):
    """Normalize the given Tensor along the given dim
    Args:
        x (Tensor): matrix to be normalized
        p: p-norm for the matrix
        dim: along which to normalize
    """
    norm = torch.norm(x, p, dim, keepdim=True)
    out = x/(norm + eps)
    if return_norm:
        return out, norm
    return out
def filter_normalize_(x, p=2, eps=1e-6):
    """Normalize the given Tensor along the given dim
    Args:
        x (Tensor out_channels x in_channels x filter_size): filter to be normalized
        p: p-norm for the matrix
    """
    norm = x.view(x.size(0), -1).norm(p=p, dim=-1).view(-1, 1, 1)
    x.div_(norm + eps)
    return norm

def normalize_(x, p=2, dim=-1):
    norm = x.norm(p=p, dim=dim, keepdim=True)
    x.div_(norm.clamp(min=EPS))
    return x

def flip(x, dim=-1):
    """Reverse a tensor along given axis
    can be removed later when Pytorch updated
    """
    reverse_indices = torch.arange(x.size(dim) - 1, -1, -1)
    reverse_indices = reverse_indices.type_as(x.data).long()
    return x.index_select(dim=dim, index=reverse_indices)
def proj_on_simplex(x, axis=0, r=1., inplace=True):
    d = x.size(axis)
    mu, indices = torch.sort(x, dim=axis, descending=True)
    diag = torch.cumsum(mu, dim=axis) - r
    theta = diag/torch.arange(1, d+1).view(-1, 1).expand_as(diag)
    theta = diag / torch.arange(1., d+1).view(-1, 1).expand_as(diag)
    indices = torch.sum((mu > theta).long(), dim=axis, keepdim=True) - 1
    theta = torch.gather(theta, dim=axis, index=indices)
    if inplace:
        x.add_(-theta).clamp_(min=0)
        return x
    return torch.clamp(x - theta, min=0)
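# Example (illustrative): proj_on_simplex projects each column (axis=0) onto
# the simplex {v : v >= 0, sum(v) = r}; e.g. the column (0.5, 0.5, 1.0) with
# r=1 maps to (1/6, 1/6, 2/3).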
if __name__ == "__main__":
x = torch.rand(10000,50)
x = normalize(x, dim=-1)
print(x.norm(2, dim=-1))
z = spherical_kmeans(x, 32)
print(z)
print(z.norm(2, dim=-1))
import os
import numpy as np
import ckn.motifs as motifs
from ckn.data_helper import pad_sequences
from train_encode import load_tfids, load_data, load_args, process_data
from torch.utils.data import DataLoader
from utils import load_model
import matplotlib
matplotlib.use('Agg')
from ckn.pltlogo import draw_logo, pwm_to_bits
import matplotlib.pyplot as plt
def generate_logo(args):
    tfids = load_tfids(args)
    for tfid in tfids:
        print("tfid: {}".format(tfid))
        tfid_outdir = args.outdir + '/{}'.format(tfid)
        path = tfid_outdir + '/{}'
        model = load_model(path.format("params.txt"), path.format("model.pkl"))
        pwm_all = motifs.compute_pwm(model)
        # return
        logo_outdir = tfid_outdir + "/logo"
        if not os.path.exists(logo_outdir):
            os.makedirs(logo_outdir)
        np.savez_compressed(logo_outdir+'/pwm', pwm=pwm_all)
        for i in range(pwm_all.shape[0]):
            pwm = pwm_all[i]
            bits = pwm_to_bits(pwm, trim=True)
            # print(pwm)
            draw_logo(bits, width_scale=0.8, palette='deepbind')
            plt.savefig(logo_outdir + "/logo_{}.png".format(i))
            # plt.savefig(logo_outdir + "/logo_{}.eps".format(i))
            bits_rc = np.flip(np.flip(bits, axis=0), axis=1)
            draw_logo(bits_rc, width_scale=0.8, palette='deepbind')
            plt.savefig(logo_outdir + "/logo_{}_rc.png".format(i))
            # plt.savefig(logo_outdir + "/logo_{}_rc.eps".format(i))
            plt.close("all")

if __name__ == "__main__":
    args = load_args()
    print(args)
    generate_logo(args)
import torch
from torch import nn
from ckn.models import CKNModel, SeqCKNModel, LinearMax
from ckn.layers import BioEmbedding
import json
import numpy as np
from ckn import kernels
class Struct:
    def __init__(self, **entries):
        self.__dict__.update(entries)

def load_args(params_path):
    with open(params_path, 'r') as f:
        params = json.loads(f.read())
    args = Struct(**params)
    return args

def load_model(args, weights_path):
    if isinstance(args, str):
        args = load_args(args)
    DNA = 4
    embed_layer = BioEmbedding(num_embeddings=DNA, reverse_complement=args.rc, mask_zeros=True)
    ckn_model = CKNModel(args.n_layers, DNA, args.n_motifs, args.len_motifs, args.subsamplings,
                         None, args.new_kernel_params, pooling=args.pooling)
    ckn_model.train(False)
    model = SeqCKNModel(embed_layer, ckn_model)
    out_layer = LinearMax(model.out_features, 1, alpha=args