# 加载和处理数据集
import torch
from torch_geometric.datasets import HGBDataset
from torch_geometric.transforms import RandomLinkSplit
# Load the ACM graph of the HGB collection (root directory: /tmp/HGB) and take
# the first graph object of the dataset.
dataset = HGBDataset(root='/tmp/HGB', name='ACM')
data = dataset[0]

# Hold out 10% of the target edges for validation and 20% for testing, and
# sample one negative edge per positive edge.
# split_labels is False so that every split carries the combined `edge_label`
# and `edge_label_index` attributes that train() and test() read; with
# split_labels=True the transform stores separate `pos_edge_label*` and
# `neg_edge_label*` attributes instead.
# NOTE(review): confirm the edge-type triple below against `data.edge_types`.
# NOTE(review): the RandomLinkSplit docs say is_undirected is ignored for
# bipartite edge types; a `rev_edge_types` entry is probably needed to keep
# reverse edges from leaking across splits - verify.
transform = RandomLinkSplit(
    num_val=0.1,
    num_test=0.2,
    is_undirected=True,
    split_labels=False,
    neg_sampling_ratio=1.0,
    edge_types=[('paper', 'has-subject', 'subject')],
)
train_data, val_data, test_data = transform(data)
# 定义 R-GCN 模型
from torch_geometric.nn import RGCNConv
import torch.nn.functional as F
class RGCN(torch.nn.Module):
    """Two stacked RGCNConv layers with a ReLU between them.

    Args:
        in_channels: Input size passed to the first RGCNConv layer.
        hidden_channels: Output size of the first layer and input size of
            the second.
        out_channels: Output size of the second RGCNConv layer.
        num_relations: Number of relation types, forwarded to both layers.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, num_relations):
        super().__init__()
        # Both layers share the same relation count.
        shared = {'num_relations': num_relations}
        self.conv1 = RGCNConv(in_channels, hidden_channels, **shared)
        self.conv2 = RGCNConv(hidden_channels, out_channels, **shared)

    def forward(self, x, edge_index, edge_type):
        """Return the second layer's output; no activation is applied to it."""
        hidden = self.conv1(x, edge_index, edge_type)
        hidden = F.relu(hidden)
        return self.conv2(hidden, edge_index, edge_type)
# RGCNConv needs the number of distinct relation types. `data` is the
# heterogeneous graph returned by HGBDataset: it lists its relations in
# `data.edge_types` and has no `edge_type` tensor, so count that list.
num_relations = len(data.edge_types)
# NOTE(review): on a heterogeneous graph `data.num_node_features` appears to be
# reported per node type rather than as a single int - confirm that the value
# passed as in_channels matches the feature matrix actually fed to the model.
model = RGCN(
    in_channels=data.num_node_features,
    hidden_channels=64,
    out_channels=32,
    num_relations=num_relations,
)
# 训练和测试函数
# Binary cross-entropy that takes raw (pre-sigmoid) scores as input.
criterion = torch.nn.BCEWithLogitsLoss()
# Adam over every model parameter with a learning rate of 0.01.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)
def train() -> float:
    """Run one full-batch optimisation step on the training split.

    Uses the module-level ``model``, ``optimizer``, ``criterion`` and
    ``train_data``.

    Returns:
        The training loss of this step as a Python float.
    """
    model.train()
    optimizer.zero_grad()
    z = model(train_data.x, train_data.edge_index, train_data.edge_type)
    # Score every candidate edge with the dot product of its two endpoint
    # embeddings. Indexing `z` directly with the two-row index tensor would
    # produce a [2, E, C] tensor that cannot be compared with `edge_label`.
    src, dst = train_data.edge_label_index
    logits = (z[src] * z[dst]).sum(dim=-1)
    loss = criterion(logits, train_data.edge_label.float())
    loss.backward()
    optimizer.step()
    return loss.item()
def test(data):
    """Evaluate the module-level ``model`` on one data split.

    Args:
        data: Split object providing ``x``, ``edge_index``, ``edge_type``,
            ``edge_label_index`` and ``edge_label``.

    Returns:
        A ``(loss, accuracy)`` tuple of Python floats. Accuracy is the
        fraction of labelled edges whose thresholded score matches the
        label; it is 0.0 when the split has no labelled edges.
    """
    model.eval()
    with torch.no_grad():
        z = model(data.x, data.edge_index, data.edge_type)
        # One score per labelled edge: dot product of the endpoint
        # embeddings. Loss and accuracy must both be computed on these edge
        # scores, not on the node embedding matrix itself.
        src, dst = data.edge_label_index
        logits = (z[src] * z[dst]).sum(dim=-1)
        labels = data.edge_label
        loss = criterion(logits, labels.float())
        pred = logits.sigmoid() > 0.5
        correct = pred == labels.bool()
        total = correct.numel()
        # Guard against an empty split to avoid a ZeroDivisionError.
        acc = int(correct.sum()) / total if total else 0.0
    return loss.item(), acc
# Train for 100 epochs (one optimisation step each) and report the validation
# metrics after every step.
for epoch in range(100):
    loss = train()
    val_loss, val_acc = test(val_data)
    print(f'Epoch: {epoch+1}, Loss: {loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

# Final evaluation on the held-out test split, once training has finished.
test_loss, test_acc = test(test_data)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')
# 完整代码
import torch
from torch_geometric.datasets import HGBDataset
from torch_geometric.transforms import RandomLinkSplit
# Load the ACM graph of the HGB collection (root directory: /tmp/HGB) and take
# the first graph object of the dataset.
dataset = HGBDataset(root='/tmp/HGB', name='ACM')
data = dataset[0]

# Hold out 10% of the target edges for validation and 20% for testing, and
# sample one negative edge per positive edge.
# split_labels is False so that every split carries the combined `edge_label`
# and `edge_label_index` attributes that train() and test() read; with
# split_labels=True the transform stores separate `pos_edge_label*` and
# `neg_edge_label*` attributes instead.
# NOTE(review): confirm the edge-type triple below against `data.edge_types`.
# NOTE(review): the RandomLinkSplit docs say is_undirected is ignored for
# bipartite edge types; a `rev_edge_types` entry is probably needed to keep
# reverse edges from leaking across splits - verify.
transform = RandomLinkSplit(
    num_val=0.1,
    num_test=0.2,
    is_undirected=True,
    split_labels=False,
    neg_sampling_ratio=1.0,
    edge_types=[('paper', 'has-subject', 'subject')],
)
train_data, val_data, test_data = transform(data)
from torch_geometric.nn import RGCNConv
import torch.nn.functional as F
class RGCN(torch.nn.Module):
    """Two stacked RGCNConv layers with a ReLU between them.

    Args:
        in_channels: Input size passed to the first RGCNConv layer.
        hidden_channels: Output size of the first layer and input size of
            the second.
        out_channels: Output size of the second RGCNConv layer.
        num_relations: Number of relation types, forwarded to both layers.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, num_relations):
        super().__init__()
        # Both layers share the same relation count.
        shared = {'num_relations': num_relations}
        self.conv1 = RGCNConv(in_channels, hidden_channels, **shared)
        self.conv2 = RGCNConv(hidden_channels, out_channels, **shared)

    def forward(self, x, edge_index, edge_type):
        """Return the second layer's output; no activation is applied to it."""
        hidden = self.conv1(x, edge_index, edge_type)
        hidden = F.relu(hidden)
        return self.conv2(hidden, edge_index, edge_type)
# RGCNConv needs the number of distinct relation types. `data` is the
# heterogeneous graph returned by HGBDataset: it lists its relations in
# `data.edge_types` and has no `edge_type` tensor, so count that list.
num_relations = len(data.edge_types)
# NOTE(review): on a heterogeneous graph `data.num_node_features` appears to be
# reported per node type rather than as a single int - confirm that the value
# passed as in_channels matches the feature matrix actually fed to the model.
model = RGCN(
    in_channels=data.num_node_features,
    hidden_channels=64,
    out_channels=32,
    num_relations=num_relations,
)
# Binary cross-entropy that takes raw (pre-sigmoid) scores as input.
criterion = torch.nn.BCEWithLogitsLoss()
# Adam over every model parameter with a learning rate of 0.01.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)
def train() -> float:
    """Run one full-batch optimisation step on the training split.

    Uses the module-level ``model``, ``optimizer``, ``criterion`` and
    ``train_data``.

    Returns:
        The training loss of this step as a Python float.
    """
    model.train()
    optimizer.zero_grad()
    z = model(train_data.x, train_data.edge_index, train_data.edge_type)
    # Score every candidate edge with the dot product of its two endpoint
    # embeddings. Indexing `z` directly with the two-row index tensor would
    # produce a [2, E, C] tensor that cannot be compared with `edge_label`.
    src, dst = train_data.edge_label_index
    logits = (z[src] * z[dst]).sum(dim=-1)
    loss = criterion(logits, train_data.edge_label.float())
    loss.backward()
    optimizer.step()
    return loss.item()
def test(data):
    """Evaluate the module-level ``model`` on one data split.

    Args:
        data: Split object providing ``x``, ``edge_index``, ``edge_type``,
            ``edge_label_index`` and ``edge_label``.

    Returns:
        A ``(loss, accuracy)`` tuple of Python floats. Accuracy is the
        fraction of labelled edges whose thresholded score matches the
        label; it is 0.0 when the split has no labelled edges.
    """
    model.eval()
    with torch.no_grad():
        z = model(data.x, data.edge_index, data.edge_type)
        # One score per labelled edge: dot product of the endpoint
        # embeddings. Loss and accuracy must both be computed on these edge
        # scores, not on the node embedding matrix itself.
        src, dst = data.edge_label_index
        logits = (z[src] * z[dst]).sum(dim=-1)
        labels = data.edge_label
        loss = criterion(logits, labels.float())
        pred = logits.sigmoid() > 0.5
        correct = pred == labels.bool()
        total = correct.numel()
        # Guard against an empty split to avoid a ZeroDivisionError.
        acc = int(correct.sum()) / total if total else 0.0
    return loss.item(), acc
# Train for 100 epochs (one optimisation step each) and report the validation
# metrics after every step.
for epoch in range(100):
    loss = train()
    val_loss, val_acc = test(val_data)
    print(f'Epoch: {epoch+1}, Loss: {loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

# Final evaluation on the held-out test split, once training has finished.
test_loss, test_acc = test(test_data)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')