使用Pytorch構建圖卷積網絡預測化學分子性質
在本文中,我們將通過化學的視角探索圖卷積網絡,我們將嘗試將網絡的特征與自然科學中的傳統模型進行比較,并思考為什么它的工作效果要比傳統的方法好。
圖和圖神經網絡
化學或物理中的模型通常是一個連續函數,例如y=f(x?,x?,x?,…,x),其中x?,x?,x?,…,x是輸入,y是輸出。這種模型的一個例子是確定兩個點電荷q 1和q 2之間的靜電相互作用(或力)的方程,它們之間的距離r存在于具有相對介電常數ε?的介質中,通常稱為庫侖定律。
如果我們不知道這種關系,我們只有多個數據點,每個數據點都包括點電荷(輸出)和相應的輸入之間的相互作用,那么可以擬合人工神經網絡來預測在具有指定介電常數的介質中任何給定分離的任何給定點電荷的相互作用,因為為物理問題創建數據驅動模型相對簡單。
但是考慮從分子結構預測某一特定性質的問題,比如在水中的溶解度。沒有一組明顯的輸入來描述一個分子,雖然可以使用各種特性,例如鍵長、鍵角、不同類型元素的數量、環的數量等等。但是并不能保證任何這樣的任意集合一定能很好地適用于所有分子。
另外與點電荷的例子不同,輸入不一定駐留在連續空間中。例如,我們可以把甲醇、乙醇和丙醇想象成一組鏈長不斷增加的分子;鏈長是一個離散的參數,沒有辦法在甲醇和乙醇之間插入來得到其他分子。具有連續的輸入空間對于計算模型的導數是必不可少的,然后可以用于所選屬性的優化。
為了克服這些問題人們提出了各種分子編碼方法。其中一種方法是使用SMILES和SELFIES等進行文本表示。關于這種表示有大量的文獻,有興趣的話可以搜索閱讀。第二種方法是用圖形表示分子。雖然每種方法都有其優點和缺點,但圖形表示對化學來說更直觀。
圖是一種數學結構,由表示節點之間關系的邊連接的節點組成。分子自然適應這種結構——原子成為節點,化學鍵成為邊緣。圖中的每個節點都由一個向量表示,該向量編碼相應原子的屬性。通常,獨熱編碼模式就足夠了(下一節將對此進行詳細介紹)。這些向量可以堆疊以創建節點矩陣。節點之間的關系——用邊表示——可以通過一個方形鄰接矩陣來描述,其中每個元素a′′?要么是1,要么是0,這取決于兩個節點i和j是否分別由邊連接。對角線元素設置為1,表示自連接,這使得矩陣可以進行卷積。這些節點和鄰接矩陣將作為我們模型的輸入。
神經網絡模型接受一維輸入向量。對于多維輸入,例如圖像則使用一類稱為卷積神經網絡的模型。在我們的例子中,也是二維矩陣作為輸入。圖神經網絡被開發用于操作這樣的節點和鄰接矩陣,將它們轉換成合適的一維向量,然后可以通過普通人工神經網絡的隱藏層來生成輸出。有許多類型的圖神經網絡,如圖卷積網絡、消息傳遞網絡、圖注意網絡等,它們的主要區別在于在圖中的節點和邊之間交換信息的方法不同。由于圖卷積網絡相對簡單,我們將仔細研究它。
圖卷積和池化層
輸入的初始狀態,節點矩陣表示每行中每個原子的獨熱編碼,其中原子序數為n的原子在第n個索引處為1,在其他地方為0。鄰接矩陣表示節點之間的連接。在當前狀態下,節點矩陣不能作為人工神經網絡的輸入,因為:(1)它是二維的,(2)它不是置換不變的,(3)它不是唯一的。在這種情況下,排列不變性意味著無論你如何排列節點,輸入都應該保持不變;同一分子可以由同一節點矩陣的多個排列表示(假設鄰接矩陣中也有適當的排列)。
對于前兩個問題,有一個簡單的解決方案——池化。如果節點矩陣沿著列維度池化,那么它將被簡化為一個排列不變的一維向量。通常,這種池化是一個簡單的均值池化,這意味著最終的池化向量包含節點矩陣中每列的均值。但是池化兩個異構體的節點矩陣,如正戊烷和新戊烷,將產生相同的池化向量,也就是說解決第三個問題還需要其他的方法。
為了使最終的池化向量唯一,需要在節點矩陣中合并一些鄰居信息。對于同分異構體,雖然它們的化學式相同,但它們的結構卻不同。合并鄰居信息的一種簡單方法是對每個節點及其鄰居執行一些操作,例如求和。這可以表示為節點和鄰接矩陣的乘法:鄰接矩陣乘以節點矩陣產生一個更新的節點矩陣,每個節點向量等于它的鄰居節點向量與它自己的和,這個和通過預乘以對角度矩陣的逆,通過每個節點的度(或鄰居的數量)進行歸一化,使其成為鄰居的平均值。最后這個乘積后乘一個權重矩陣,整個操作稱為圖卷積。下圖顯示了一個直觀而簡單的圖卷積形式
Thomas Kipf和Max Welling的工作提供了一個數學上更嚴格和數值上更穩定的形式,使用鄰接矩陣的修改歸一化。卷積和池化操作的組合也可以被解釋為經驗群體貢獻方法的非線性形式。
圖卷積網絡的最終結構如下:首先計算給定分子的節點矩陣和鄰接矩陣,然后池化以產生包含有關分子的所有信息的單個向量。通過標準人工神經網絡的隱藏層來產生輸出。隱藏層、池化層和卷積層的權重通過應用于基于回歸的損失函數(如均方損失)的反向傳播同時確定。
Pytorch代碼實現
上面我們討論了圖卷積網絡相關的所有關鍵思想,下面開始使用PyTorch進行簡單的實現。雖然有一個靈活的、高性能的gnn框架PyTorch Geometric,但我們不會使用它,因為我們的目標是深入我們的理解。
1、使用RDKit創建圖
RDKit是一個化學信息學庫,允許高通量訪問小分子的特性。我們將需要它完成兩個任務——將分子中每個原子的原子序數變為1——對節點矩陣進行編碼并獲得鄰接矩陣。我們假設分子是根據它們的SMILES字符串提供的(這對于大多數化學信息學數據都是正確的)。為了確保節點矩陣和鄰接矩陣的大小在所有分子中都是一致的(這在默認情況下是不可能的,因為兩者的大小都取決于分子中的原子數量)我們用0填充矩陣。
除此以外我們將對上面提出的卷積進行一個小的修改——將鄰接矩陣中的“1”替換為相應鍵長的倒數。通過這種方式,網絡將獲得更多關于分子幾何形狀的信息,并且它還將根據相鄰鍵的長度對每個節點周圍的卷積進行加權。
class Graph:
def __init__(
self, molecule_smiles: str,
node_vec_len: int,
max_atoms: int = None
):
# Store properties
self.smiles = molecule_smiles
self.node_vec_len = node_vec_len
self.max_atoms = max_atoms
# Call helper function to convert SMILES to RDKit mol
self.smiles_to_mol()
# If valid mol is created, generate a graph of the mol
if self.mol is not None:
self.smiles_to_graph()
def smiles_to_mol(self):
# Use MolFromSmiles from RDKit to get molecule object
mol = Chem.MolFromSmiles(self.smiles)
# If a valid mol is not returned, set mol as None and exit
if mol is None:
self.mol = None
return
# Add hydrogens to molecule
self.mol = Chem.AddHs(mol)
def smiles_to_graph(self):
# Get list of atoms in molecule
atoms = self.mol.GetAtoms()
# If max_atoms is not provided, max_atoms is equal to maximum number
# of atoms in this molecule.
if self.max_atoms is None:
n_atoms = len(list(atoms))
else:
n_atoms = self.max_atoms
# Create empty node matrix
node_mat = np.zeros((n_atoms, self.node_vec_len))
# Iterate over atoms and add to node matrix
for atom in atoms:
# Get atom index and atomic number
atom_index = atom.GetIdx()
atom_no = atom.GetAtomicNum()
# Assign to node matrix
node_mat[atom_index, atom_no] = 1
# Get adjacency matrix using RDKit
adj_mat = rdmolops.GetAdjacencyMatrix(self.mol)
self.std_adj_mat = np.copy(adj_mat)
# Get distance matrix using RDKit
dist_mat = molDG.GetMoleculeBoundsMatrix(self.mol)
dist_mat[dist_mat == 0.] = 1
# Get modified adjacency matrix with inverse bond lengths
adj_mat = adj_mat * (1 / dist_mat)
# Pad the adjacency matrix with 0s
dim_add = n_atoms - adj_mat.shape[0]
adj_mat = np.pad(
adj_mat, pad_width=((0, dim_add), (0, dim_add)), mode="constant"
)
# Add an identity matrix to adjacency matrix
# This will make an atom its own neighbor
adj_mat = adj_mat + np.eye(n_atoms)
# Save both matrices
self.node_mat = node_mat
self.adj_mat = adj_mat
2、在生成Dataset
PyTorch提供了一個方便的Dataset類來存儲和訪問各種數據。我們將用它來獲取節點和鄰接矩陣以及每個分子的輸出。
class GraphData(Dataset):
def __init__(self, dataset_path: str, node_vec_len: int, max_atoms: int):
# Save attributes
self.node_vec_len = node_vec_len
self.max_atoms = max_atoms
# Open dataset file
df = pd.read_csv(dataset_path)
# Create lists
self.indices = df.index.to_list()
self.smiles = df["smiles"].to_list()
self.outputs = df["measured log solubility in mols per litre"].to_list()
def __len__(self):
return len(self.indices)
def __getitem__(self, i: int):
# Get smile
smile = self.smiles[i]
# Create MolGraph object using the Graph abstraction
mol = Graph(smile, self.node_vec_len, self.max_atoms)
# Get node and adjacency matrices
node_mat = torch.Tensor(mol.node_mat)
adj_mat = torch.Tensor(mol.adj_mat)
# Get output
output = torch.Tensor([self.outputs[i]])
return (node_mat, adj_mat), output, smile
除此以外,我們還需要自定義collate函數,來使得dataset可以進行批處理,也就是說把單一的dataset合并成一個batch
def collate_graph_dataset(dataset: Dataset):
# Create empty lists of node and adjacency matrices, outputs, and smiles
node_mats = []
adj_mats = []
outputs = []
smiles = []
# Iterate over list and assign each component to the correct list
for i in range(len(dataset)):
(node_mat,adj_mat), output, smile = dataset[i]
node_mats.append(node_mat)
adj_mats.append(adj_mat)
outputs.append(output)
smiles.append(smile)
# Create tensors
node_mats_tensor = torch.cat(node_mats, dim=0)
adj_mats_tensor = torch.cat(adj_mats, dim=0)
outputs_tensor = torch.stack(outputs, dim=0)
# Return tensors
return (node_mats_tensor, adj_mats_tensor), outputs_tensor, smiles
3、圖卷積網絡
在完成了數據處理之后現在可以構建模型了,這里為了學習將構建自己的卷積層和池化層,但如果在實際使用時可以直接使用PyTorch Geometric模塊。卷積層基本上做三件事-(1)計算鄰接矩陣的反對角度矩陣,(2)四個矩陣的乘法(D?1ANW),(3)對層輸出應用非線性激活函數。與其他PyTorch類一樣,我們將繼承Module基類。
class ConvolutionLayer(nn.Module):
def __init__(self, node_in_len: int, node_out_len: int):
# Call constructor of base class
super().__init__()
# Create linear layer for node matrix
self.conv_linear = nn.Linear(node_in_len, node_out_len)
# Create activation function
self.conv_activation = nn.LeakyReLU()
def forward(self, node_mat, adj_mat):
# Calculate number of neighbors
n_neighbors = adj_mat.sum(dim=-1, keepdims=True)
# Create identity tensor
self.idx_mat = torch.eye(
adj_mat.shape[-2], adj_mat.shape[-1], device=n_neighbors.device
)
# Add new (batch) dimension and expand
idx_mat = self.idx_mat.unsqueeze(0).expand(*adj_mat.shape)
# Get inverse degree matrix
inv_degree_mat = torch.mul(idx_mat, 1 / n_neighbors)
# Perform matrix multiplication: D^(-1)AN
node_fea = torch.bmm(inv_degree_mat, adj_mat)
node_fea = torch.bmm(node_fea, node_mat)
# Perform linear transformation to node features
# (multiplication with W)
node_fea = self.conv_linear(node_fea)
# Apply activation
node_fea = self.conv_activation(node_fea)
return node_fea
接下來,讓我們構造只執行一個操作,即沿第二維(節點數)取平均值的PoolingLayer層。
class PoolingLayer(nn.Module):
def __init__(self):
# Call constructor of base class
super().__init__()
def forward(self, node_fea):
# Pool the node matrix
pooled_node_fea = node_fea.mean(dim=1)
return pooled_node_fea
最后就是創建包含卷積層、池化層和隱藏層的ChemGCN類。我們將對所有的層輸出應用LeakyReLU激活函數,還將使用dropout來最小化過擬合。
class ChemGCN(nn.Module):
def __init__(
self,
node_vec_len: int,
node_fea_len: int,
hidden_fea_len: int,
n_conv: int,
n_hidden: int,
n_outputs: int,
p_dropout: float = 0.0,
):
# Call constructor of base class
super().__init__()
# Define layers
# Initial transformation from node matrix to node features
self.init_transform = nn.Linear(node_vec_len, node_fea_len)
# Convolution layers
self.conv_layers = nn.ModuleList(
[
ConvolutionLayer(
node_in_len=node_fea_len,
node_out_len=node_fea_len,
)
for i in range(n_conv)
]
)
# Pool convolution outputs
self.pooling = PoolingLayer()
pooled_node_fea_len = node_fea_len
# Pooling activation
self.pooling_activation = nn.LeakyReLU()
# From pooled vector to hidden layers
self.pooled_to_hidden = nn.Linear(pooled_node_fea_len, hidden_fea_len)
# Hidden layer
self.hidden_layer = nn.Linear(hidden_fea_len, hidden_fea_len)
# Hidden layer activation function
self.hidden_activation = nn.LeakyReLU()
# Hidden layer dropout
self.dropout = nn.Dropout(p=p_dropout)
# If hidden layers more than 1, add more hidden layers
self.n_hidden = n_hidden
if self.n_hidden > 1:
self.hidden_layers = nn.ModuleList(
[self.hidden_layer for _ in range(n_hidden - 1)]
)
self.hidden_activation_layers = nn.ModuleList(
[self.hidden_activation for _ in range(n_hidden - 1)]
)
self.hidden_dropout_layers = nn.ModuleList(
[self.dropout for _ in range(n_hidden - 1)]
)
# Final layer going to the output
self.hidden_to_output = nn.Linear(hidden_fea_len, n_outputs)
def forward(self, node_mat, adj_mat):
# Perform initial transform on node_mat
node_fea = self.init_transform(node_mat)
# Perform convolutions
for conv in self.conv_layers:
node_fea = conv(node_fea, adj_mat)
# Perform pooling
pooled_node_fea = self.pooling(node_fea)
pooled_node_fea = self.pooling_activation(pooled_node_fea)
# First hidden layer
hidden_node_fea = self.pooled_to_hidden(pooled_node_fea)
hidden_node_fea = self.hidden_activation(hidden_node_fea)
hidden_node_fea = self.dropout(hidden_node_fea)
# Subsequent hidden layers
if self.n_hidden > 1:
for i in range(self.n_hidden - 1):
hidden_node_fea = self.hidden_layers[i](hidden_node_fea)
hidden_node_fea = self.hidden_activation_layers[i](hidden_node_fea)
hidden_node_fea = self.hidden_dropout_layers[i](hidden_node_fea)
# Output
out = self.hidden_to_output(hidden_node_fea)
return out
4、訓練
最后就是編寫輔助函數來訓練和測試我們的模型,并編寫腳本來運行一個制作圖形、構建網絡和訓練模型的工作流。
首先,我們將定義一個標準化類來標準化輸出,這有助于快速收斂
class Standardizer:
def __init__(self, X):
self.mean = torch.mean(X)
self.std = torch.std(X)
def standardize(self, X):
Z = (X - self.mean) / (self.std)
return Z
def restore(self, Z):
X = self.mean + Z * self.std
return X
def state(self):
return {"mean": self.mean, "std": self.std}
def load(self, state):
self.mean = state["mean"]
self.std = state["std"]
然后就是我們的訓練過程:
def train_model(
epoch,
model,
training_dataloader,
optimizer,
loss_fn,
standardizer,
use_GPU,
max_atoms,
node_vec_len,
):
# Create variables to store losses and error
avg_loss = 0
avg_mae = 0
count = 0
# Switch model to train mode
model.train()
# Go over each batch in the dataloader
for i, dataset in enumerate(training_dataloader):
# Unpack data
node_mat = dataset[0][0]
adj_mat = dataset[0][1]
output = dataset[1]
# Reshape inputs
first_dim = int((torch.numel(node_mat)) / (max_atoms * node_vec_len))
node_mat = node_mat.reshape(first_dim, max_atoms, node_vec_len)
adj_mat = adj_mat.reshape(first_dim, max_atoms, max_atoms)
# Standardize output
output_std = standardizer.standardize(output)
# Package inputs and outputs; check if GPU is enabled
if use_GPU:
nn_input = (node_mat.cuda(), adj_mat.cuda())
nn_output = output_std.cuda()
else:
nn_input = (node_mat, adj_mat)
nn_output = output_std
# Compute output from network
nn_prediction = model(*nn_input)
# Calculate loss
loss = loss_fn(nn_output, nn_prediction)
avg_loss += loss
# Calculate MAE
prediction = standardizer.restore(nn_prediction.detach().cpu())
mae = mean_absolute_error(output, prediction)
avg_mae += mae
# Set zero gradients for all tensors
optimizer.zero_grad()
# Do backward prop
loss.backward()
# Update optimizer parameters
optimizer.step()
# Increase count
count += 1
# Calculate avg loss and MAE
avg_loss = avg_loss / count
avg_mae = avg_mae / count
# Print stats
print(
"Epoch: [{0}]\tTraining Loss: [{1:.2f}]\tTraining MAE: [{2:.2f}]"\
.format(
epoch, avg_loss, avg_mae
)
)
# Return loss and MAE
return avg_loss, avg_mae
最后,這個腳本將調用上面定義的所有內容。
#### Fix seeds
np.random.seed(0)
torch.manual_seed(0)
use_GPU = torch.cuda.is_available()
#### Inputs
max_atoms = 200
node_vec_len = 60
train_size = 0.7
batch_size = 32
hidden_nodes = 60
n_conv_layers = 4
n_hidden_layers = 2
learning_rate = 0.01
n_epochs = 50
#### Start by creating dataset
main_path = Path(__file__).resolve().parent
data_path = main_path / "data" / "solubility_data.csv"
dataset = GraphData(dataset_path=data_path, max_atoms=max_atoms,
node_vec_len=node_vec_len)
#### Split data into training and test sets
# Get train and test sizes
dataset_indices = np.arange(0, len(dataset), 1)
train_size = int(np.round(train_size * len(dataset)))
test_size = len(dataset) - train_size
# Randomly sample train and test indices
train_indices = np.random.choice(dataset_indices, size=train_size,
replace=False)
test_indices = np.array(list(set(dataset_indices) - set(train_indices)))
# Create dataoaders
train_sampler = SubsetRandomSampler(train_indices)
test_sampler = SubsetRandomSampler(test_indices)
train_loader = DataLoader(dataset, batch_size=batch_size,
sampler=train_sampler,
collate_fn=collate_graph_dataset)
test_loader = DataLoader(dataset, batch_size=batch_size,
sampler=test_sampler,
collate_fn=collate_graph_dataset)
#### Initialize model, standardizer, optimizer, and loss function
# Model
model = ChemGCN(node_vec_len=node_vec_len, node_fea_len=hidden_nodes,
hidden_fea_len=hidden_nodes, n_cnotallow=n_conv_layers,
n_hidden=n_hidden_layers, n_outputs=1, p_dropout=0.1)
# Transfer to GPU if needed
if use_GPU:
model.cuda()
# Standardizer
outputs = [dataset[i][1] for i in range(len(dataset))]
standardizer = Standardizer(torch.Tensor(outputs))
# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# Loss function
loss_fn = torch.nn.MSELoss()
#### Train the model
loss = []
mae = []
epoch = []
for i in range(n_epochs):
epoch_loss, epoch_mae = train_model(
i,
model,
train_loader,
optimizer,
loss_fn,
standardizer,
use_GPU,
max_atoms,
node_vec_len,
)
loss.append(epoch_loss)
mae.append(epoch_mae)
epoch.append(i)
#### Test the model
# Call test model function
test_loss, test_mae = test_model(model, test_loader, loss_fn, standardizer,
use_GPU, max_atoms, node_vec_len)
#### Print final results
print(f"Training Loss: {loss[-1]:.2f}")
print(f"Training MAE: {mae[-1]:.2f}")
print(f"Test Loss: {test_loss:.2f}")
print(f"Test MAE: {test_mae:.2f}")
結果
我們在開源DeepChem存儲庫的溶解度數據集上進行訓練,該存儲庫包含約1000個小分子的水溶性。下圖顯示了一個特定訓練-測試分層的測試集的訓練損失曲線圖。訓練集和測試集的平均絕對誤差分別為0.59和0.58 (log mol/l),低于線性模型的0.69 log mol/l(基于數據集中的預測)。可以看到神經網絡比線性回歸模型表現得更好;這種比較雖然粗略,但是我們的模型所做的預測是合理的。
總結
我們通過一個簡單的化學問題介紹了圖卷積網絡的基本原理,實現了自己的網絡,并且對比基類模型,證明了我們模型的有效性,這是我們學習GCN的第一步。
本文完整的代碼在GitHub地址如下:
https://github.com/gauravsdeshmukh/ChemGCN
數據集:
https://github.com/deepchem/deepchem