Source code for diffnets.nnutils

# supervised autoencoders
import mdtraj as md
import multiprocessing as mp
import numpy as np
import os
import itertools
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.autograd import Variable

[docs]class split_ae(nn.Module): """Unsupervised autoencoder with a split input (i.e. 2 encoders) Parameters ---------- layer_sizes : list List of integers indicating the size of each layer in the encoder including the latent layer. First two must be identical. inds1 : np.ndarray Indices in the training input array that go into encoder1. inds2 : np.ndarray Indices in the training input array that go into encoder2. wm : np.ndarray, shape=(n_inputs,n_inputs) Whitening matrix -- is applied to input data uwm : np.ndarray, shape=(n_inputs,n_inputs) unwhitening matrix """ def __init__(self,layer_sizes,inds1,inds2,wm,uwm): super(split_ae, self).__init__() self.sizes = layer_sizes self.n = len(self.sizes) self.inds1 = inds1 self.inds2 = inds2 self.n_features = len(self.inds1)+len(self.inds2) self.wm1 = wm[self.inds1[:,None],self.inds1] self.wm2 = wm[self.inds2[:,None],self.inds2] self.uwm = uwm self.ratio = len(inds1)/(len(inds1)+len(inds2)) self.encoder1 = nn.ModuleList() self.encoder2 = nn.ModuleList() self.encoder1.append(nn.Linear(len(inds1),len(inds1))) self.encoder2.append(nn.Linear(len(inds2),len(inds2))) for i in range(1,self.n-1): small_layer_in = int(np.ceil(self.sizes[i]*self.ratio)) print(small_layer_in) small_layer_out = int(np.ceil(self.sizes[i+1]*self.ratio)) print(small_layer_out) big_layer_in = int(np.floor(self.sizes[i] * (1-self.ratio))) print(big_layer_in) big_layer_out = int(np.floor(self.sizes[i+1] * (1-self.ratio))) print(big_layer_out) #if small_layer_in < 3: # small_layer_in = 3 # big_layer_in = self.sizes[i]-3 #if small_layer_out < 3: # small_layer_out = 3 # big_layer_out = self.sizes[i]-3 self.encoder1.append(nn.Linear(small_layer_in, small_layer_out)) self.encoder2.append(nn.Linear(big_layer_in,big_layer_out)) self.decoder = nn.ModuleList() for i in range(self.n-1,0,-1): self.decoder.append(nn.Linear(self.sizes[i], self.sizes[i-1])) @property def split_inds(self): return True
[docs] def freeze_weights(self,old_net=None): """Procedure to make the whitening matrix and unwhitening matrix as untrainable layers. Additionally, freezes weights associated with a previously learned encoder layer. Parameters ---------- old_net : split_ae object Previously trained network with overlapping architecture. Weights learned in this previous networks encoder will be frozen in the new network. """ vwm = Variable(torch.from_numpy(self.wm1).type(torch.FloatTensor)) self.encoder1[0].weight.data = vwm vz = Variable(torch.from_numpy(np.zeros(len(self.inds1))).type(torch.FloatTensor)) self.encoder1[0].bias.data = vz vwm2 = Variable(torch.from_numpy(self.wm2).type(torch.FloatTensor)) self.encoder2[0].weight.data = vwm2 vz2 = Variable(torch.from_numpy(np.zeros(len(self.inds2))).type(torch.FloatTensor)) self.encoder2[0].bias.data = vz2 for p in self.encoder1[0].parameters(): p.requires_grad = False for p in self.encoder2[0].parameters(): p.requires_grad = False self.decoder[-1].weight.data = Variable(torch.from_numpy(self.uwm).type(torch.FloatTensor)) self.decoder[-1].bias.data = Variable(torch.from_numpy(np.zeros(self.n_features)).type(torch.FloatTensor)) for p in self.decoder[-1].parameters(): p.requires_grad = False if old_net: n_old = len(old_net.encoder1) for i in range(1,n_old): self.encoder1[i].weight.data = old_net.encoder1[i].weight.data self.encoder1[i].bias.data = old_net.encoder1[i].bias.data for p in self.encoder1[i].parameters(): p.requires_grad = False self.encoder2[i].weight.data = old_net.encoder2[i].weight.data self.encoder2[i].bias.data = old_net.encoder2[i].bias.data for p in self.encoder2[i].parameters(): p.requires_grad = False
[docs] def unfreeze_weights(self): """Makes all encoders weights trainable. """ n_old = len(self.encoder1) for i in range(1,n_old): for p in self.encoder1[i].parameters(): p.requires_grad = True for p in self.encoder2[i].parameters(): p.requires_grad = True
[docs] def encode(self,x): """Pass the data through the encoder to the latent layer. Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Input data for a given sample Returns ------- lat1 : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector associated with encoder1 lat2 : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector associated with encoder2 """ x1 = x[:,self.inds1] x2 = x[:,self.inds2] lat1 = self.encoder1[0](x1) lat2 = self.encoder2[0](x2) for i in range(1, self.n-1): lat1 = F.leaky_relu(self.encoder1[i](lat1)) lat2 = F.leaky_relu(self.encoder2[i](lat2)) return lat1, lat2
[docs] def decode(self,latent): """Pass the latent space vector through the decoder Parameters ---------- latent : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector Returns ------- recon : torch.cuda.FloatTensor or torch.FloatTensor Reconstruction of the original input data """ recon = latent for i in range(self.n-2): recon = F.leaky_relu(self.decoder[i](recon)) recon = self.decoder[-1](recon) return recon
[docs] def forward(self,x): """Pass data through the entire network Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Input data for a given sample Returns ------- recon : torch.cuda.FloatTensor or torch.FloatTensor Reconstruction of the original input data latent : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector """ lat1, lat2 = self.encode(x) latent = torch.cat((lat1,lat2),1) recon = self.decode(latent) return recon, latent, None
[docs]class split_sae(split_ae): """Supervised autoencoder with split architecture""" def __init__(self, layer_sizes,inds1,inds2,wm,uwm): super(split_sae, self).__init__(layer_sizes,inds1,inds2,wm,uwm) self.classifier = nn.Linear(self.encoder1[-1].weight.data.shape[0], 1)
[docs] def classify(self, latent): """Perfom classification task using latent space representation Parameters ---------- latent : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector Returns ------- Value between 0 and 1 """ return torch.sigmoid(self.classifier(latent))
[docs] def forward(self, x): """Pass data through the entire network Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Input data for a given sample Returns ------- recon : torch.cuda.FloatTensor or torch.FloatTensor Reconstruction of the original input data latent : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector label : Value between 0 and 1 """ lat1, lat2 = self.encode(x) label = self.classify(lat1) latent = torch.cat((lat1,lat2),1) recon = self.decode(latent) return recon, latent, label
[docs]class ae(nn.Module): """Unsupervised autoencoder Parameters ---------- layer_sizes : list List of integers indicating the size of each layer in the encoder including the latent layer. First two must be identical. wm : np.ndarray, shape=(n_inputs,n_inputs) Whitening matrix -- is applied to input data uwm : np.ndarray, shape=(n_inputs,n_inputs) unwhitening matrix """ def __init__(self, layer_sizes,wm,uwm): super(ae, self).__init__() self.sizes = layer_sizes self.n = len(self.sizes) self.wm = wm self.uwm = uwm self.encoder = nn.ModuleList() for i in range(self.n-1): self.encoder.append(nn.Linear(self.sizes[i], self.sizes[i+1])) self.decoder = nn.ModuleList() for i in range(self.n-1, 0, -1): self.decoder.append(nn.Linear(self.sizes[i], self.sizes[i-1]))
[docs] def freeze_weights(self,old_net=None): """Procedure to make the whitening matrix and unwhitening matrix as untrainable layers. Additionally, freezes weights associated with a previously learned encoder layer. Parameters ---------- old_net : ae object Previously trained network with overlapping architecture. Weights learned in this previous networks encoder will be frozen in the new network. """ self.encoder[0].weight.data = Variable(torch.from_numpy(self.wm).type(torch.FloatTensor)) self.encoder[0].bias.data = Variable(torch.from_numpy(np.zeros(len(self.wm))).type(torch.FloatTensor)) for p in self.encoder[0].parameters(): p.requires_grad = False self.decoder[-1].weight.data = Variable(torch.from_numpy(self.uwm).type(torch.FloatTensor)) self.decoder[-1].bias.data = Variable(torch.from_numpy(np.zeros(len(self.uwm))).type(torch.FloatTensor)) for p in self.decoder[-1].parameters(): p.requires_grad = False if old_net: n_old = len(old_net.encoder) for i in range(1,n_old): self.encoder[i].weight.data = old_net.encoder[i].weight.data self.encoder[i].bias.data = old_net.encoder[i].bias.data for p in self.encoder[i].parameters(): p.requires_grad = False
[docs] def unfreeze_weights(self): """Makes all encoders weights trainable. """ n_old = len(self.encoder) for i in range(1,n_old): for p in self.encoder[i].parameters(): p.requires_grad = True
[docs] def encode(self, x): """Pass the data through the encoder to the latent layer. Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Input data for a given sample Returns ------- latent : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector associated with encoder1 """ # whiten, without applying non-linearity latent = self.encoder[0](x) # do non-linear layers for i in range(1, self.n-1): latent = F.leaky_relu(self.encoder[i](latent)) return latent
[docs] def decode(self, x): """Pass the latent space vector through the decoder Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector Returns ------- recon : torch.cuda.FloatTensor or torch.FloatTensor Reconstruction of the original input data """ # do non-linear layers recon = x for i in range(self.n-2): recon = F.leaky_relu(self.decoder[i](recon)) # unwhiten, without applying non-linearity recon = self.decoder[-1](recon) return recon
[docs] def forward(self, x): """Pass data through the entire network Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Input data for a given sample Returns ------- recon : torch.cuda.FloatTensor or torch.FloatTensor Reconstruction of the original input data latent : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector """ latent = self.encode(x) recon = self.decode(latent) # None for labels, so number returns same as sae class return recon, latent, None
# build classifier based on latent representation from an AE
[docs]class classify_ae(nn.Module): """Logistic Regression model Parameters ---------- n_latent : int Number of latent variables """ def __init__(self, n_latent): super(classify_ae, self).__init__() self.n_latent = n_latent self.fc1 = nn.Linear(self.n_latent, 1)
[docs] def classify(self, x): """Perfom classification task using latent space representation Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector Returns ------- Value between 0 and 1 """ return torch.sigmoid(self.fc1(x))
[docs] def forward(self, x): """Perfom classification task using latent space representation Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector Returns ------- Value between 0 and 1 """ return self.classify(x)
[docs]class sae(ae): """Supervised autoencoder Parameters ---------- layer_sizes : list List of integers indicating the size of each layer in the encoder including the latent layer. First two must be identical. wm : np.ndarray, shape=(n_inputs,n_inputs) Whitening matrix -- is applied to input data uwm : np.ndarray, shape=(n_inputs,n_inputs) unwhitening matrix """ def __init__(self, layer_sizes, wm, uwm): super(sae, self).__init__(layer_sizes,wm,uwm) self.classifier = nn.Linear(self.sizes[-1], 1)
[docs] def classify(self, latent): """Perfom classification task using latent space representation Parameters ---------- latent : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector Returns ------- Value between 0 and 1 """ return torch.sigmoid(self.classifier(latent))
[docs] def forward(self, x): """Pass through the entire network Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector Returns ------- recon : torch.cuda.FloatTensor or torch.FloatTensor Reconstruction of the original input data latent : torch.cuda.FloatTensor or torch.FloatTensor Latent space vector label : Value between 0 and 1 """ latent = self.encode(x) label = self.classify(latent) recon = self.decode(latent) return recon, latent, label
[docs]class vae(ae): def __init__(self, layer_sizes): super(vae, self).__init__(layer_sizes) # last layer of encoder is mu, also need logvar of equal size self.logvar = nn.Linear(self.sizes[-2], self.sizes[-1])
[docs] def encode(self, x): # whiten, without applying non-linearity latent = self.encoder[0](x) # do non-linear layers for i in range(1, self.n-2): latent = F.leaky_relu(self.encoder[i](latent)) mu = F.leaky_relu(self.encoder[-1](latent)) logvar = F.leaky_relu(self.logvar(latent)) return mu, logvar
[docs] def reparameterize(self, mu, logvar): std = torch.exp(0.5*logvar) eps = torch.randn_like(std) return eps.mul(std).add_(mu)
[docs] def forward(self, x): mu, logvar = self.encode(x) z = self.reparameterize(mu, logvar) recon = self.decode(z) # None for labels, so number returns same as sae class return recon, mu, logvar, None
[docs]class svae(vae): def __init__(self, layer_sizes): super(svae, self).__init__(layer_sizes) self.classifier = nn.Linear(self.sizes[-1], 1)
[docs] def classify(self, latent): return torch.sigmoid(self.classifier(latent))
[docs] def forward(self, x): mu, logvar = self.encode(x) z = self.reparameterize(mu, logvar) label = self.classify(z) recon = self.decode(z) # None for labels, so number returns same as sae class return recon, mu, logvar, label
[docs]def my_mse(x, x_recon): """Calculate mean squared error loss Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Input data x_recon : torch.cuda.FloatTensor or torch.FloatTensor Reconstructed input Returns ------- torch.cuda.FloatTensor or torch.FloatTensor """ return torch.mean(torch.pow(x-x_recon, 2))
[docs]def my_l1(x, x_recon): """Calculate l1 loss Parameters ---------- x : torch.cuda.FloatTensor or torch.FloatTensor Input data x_recon : torch.cuda.FloatTensor or torch.FloatTensor Reconstructed input Returns ------- torch.cuda.FloatTensor or torch.FloatTensor """ return torch.mean(torch.abs(x-x_recon))
[docs]def chunks(arr, chunk_size): """Yield successive chunk_size chunks from arr.""" for i in range(0, len(arr), chunk_size): yield arr[i:i + chunk_size]
[docs]def split_inds(pdb,resnum,focus_dist): """Identify indices close and far from a residue of interest. Each index corresponds to an X,Y, or Z coordinate of an atom in the pdb. Parameters ---------- pdb : md.Trajectory object Structure used to find close/far indices. resnum : integer The residue number of interest. focus_dist : float (nannmeters) All indices within this distance of resnum will be selected as close indices. Returns ------- close_xyz_inds : np.ndarray Indices of x,y,z positions of atoms in pdb that are close to resnum. non_close_xyz_inds : np.ndarray Indices of x,y,z positions of atoms in pdb that are not close to resnum. """ res_atoms = pdb.topology.select("resSeq %s" % resnum) dist_combos = [res_atoms,np.arange(pdb.n_atoms)] dist_combos = np.array(list(itertools.product(*dist_combos))) dpdb = md.compute_distances(pdb,dist_combos) ind_loc = np.where(dpdb.flatten()<focus_dist)[0] inds = np.unique(dist_combos[ind_loc].flatten()) close_xyz_inds = [] for i in inds: close_xyz_inds.append(i*3) close_xyz_inds.append((i*3)+1) close_xyz_inds.append((i*3)+2) all_inds = np.arange((pdb.n_atoms*3)) non_close_xyz_inds = np.setdiff1d(all_inds,close_xyz_inds) return np.array(close_xyz_inds), non_close_xyz_inds