A demo of using R-GCN on the heterogeneous ACM graph

Date: 2024-10-19 19:04:01

Loading and processing the dataset

import torch
from torch_geometric.datasets import HGBDataset
from torch_geometric.transforms import RandomLinkSplit

# Load the ACM dataset, a heterogeneous graph containing papers, subjects,
# and the relations between them.
dataset = HGBDataset(root='/tmp/HGB', name='ACM')
data = dataset[0]  # use the first graph in the dataset

# Randomly split the target edges into training, validation, and test sets.
# split_labels=False keeps a single edge_label / edge_label_index pair per
# split (positives and sampled negatives together), which is what the
# training code below expects.
transform = RandomLinkSplit(
    num_val=0.1, num_test=0.2,
    is_undirected=False,   # paper -> subject edges form a directed bipartite relation
    split_labels=False,
    neg_sampling_ratio=1.0,
    # The edge type whose links we want to predict; the relation names actually
    # present in the loaded graph can be inspected via data.edge_types.
    edge_types=[('paper', 'has-subject', 'subject')]
)
train_data, val_data, test_data = transform(data)
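To make the output of RandomLinkSplit concrete, here is a minimal, self-contained sketch on a small synthetic HeteroData graph (the node counts, feature sizes, and edges below are made up purely for illustration). Each split keeps the message-passing edges in edge_index, while the supervision edges and their positive/negative labels live in edge_label_index and edge_label:

import torch
from torch_geometric.data import HeteroData
from torch_geometric.transforms import RandomLinkSplit

# Synthetic heterogeneous graph: 6 "paper" nodes, 3 "subject" nodes.
toy = HeteroData()
toy['paper'].x = torch.randn(6, 8)
toy['subject'].x = torch.randn(3, 8)
toy['paper', 'has-subject', 'subject'].edge_index = torch.tensor(
    [[0, 1, 2, 3, 4, 5, 0, 1, 2, 3],
     [0, 1, 2, 0, 1, 2, 1, 2, 0, 2]])

split = RandomLinkSplit(
    num_val=0.1, num_test=0.2, is_undirected=False,
    split_labels=False, neg_sampling_ratio=1.0,
    edge_types=[('paper', 'has-subject', 'subject')])
toy_train, toy_val, toy_test = split(toy)

store = toy_train['paper', 'has-subject', 'subject']
print(store.edge_index.shape)        # edges used for message passing
print(store.edge_label_index.shape)  # candidate edges used as supervision
print(store.edge_label)              # 1 = real edge, 0 = sampled negative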

Defining the R-GCN model

from torch_geometric.nn import RGCNConv
import torch.nn.functional as F

class RGCN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, num_relations):
        super().__init__()
        # Two RGCNConv layers; each layer learns a separate weight matrix per
        # relation type (optionally factorized via num_bases / num_blocks).
        self.conv1 = RGCNConv(in_channels, hidden_channels, num_relations=num_relations)
        self.conv2 = RGCNConv(hidden_channels, out_channels, num_relations=num_relations)

    def forward(self, x, edge_index, edge_type):
        # First layer followed by a ReLU non-linearity.
        x = F.relu(self.conv1(x, edge_index, edge_type))
        # Second layer outputs the final node embeddings.
        x = self.conv2(x, edge_index, edge_type)
        return x

# Number of relation types. RGCNConv indexes its per-relation weights by the ids in
# edge_type, so max()+1 is safer than counting unique values. Note: this assumes data
# carries a global edge_type vector (a homogeneous view, cf. HeteroData.to_homogeneous()).
num_relations = int(data.edge_type.max()) + 1
model = RGCN(in_channels=data.num_node_features, hidden_channels=64, out_channels=32, num_relations=num_relations)
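RGCNConv consumes a single node-feature matrix together with a per-edge integer relation id, so a HeteroData object is usually flattened into a homogeneous view first, for example with to_homogeneous(). The sketch below demonstrates this on a small synthetic graph (node counts, feature sizes, and relation names are invented for illustration) and reuses the RGCN class defined above:

import torch
from torch_geometric.data import HeteroData

# Synthetic heterogeneous graph with two relation types.
toy = HeteroData()
toy['paper'].x = torch.randn(4, 16)
toy['subject'].x = torch.randn(2, 16)
toy['paper', 'has-subject', 'subject'].edge_index = torch.tensor(
    [[0, 1, 2, 3],
     [0, 1, 0, 1]])
toy['paper', 'cites', 'paper'].edge_index = torch.tensor(
    [[0, 1, 2],
     [1, 2, 3]])

# to_homogeneous() stacks node features and adds node_type / edge_type vectors;
# node indices in edge_index are re-mapped to the global numbering.
homo = toy.to_homogeneous()
toy_num_relations = int(homo.edge_type.max()) + 1

toy_model = RGCN(in_channels=homo.num_node_features, hidden_channels=32,
                 out_channels=8, num_relations=toy_num_relations)
z = toy_model(homo.x, homo.edge_index, homo.edge_type)
print(z.shape)  # one embedding per node, across both node types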



Training and test functions

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.BCEWithLogitsLoss()

def decode(z, edge_label_index):
    # Score a candidate edge as the dot product of its two node embeddings.
    src, dst = edge_label_index
    return (z[src] * z[dst]).sum(dim=-1)

def train():
    model.train()
    optimizer.zero_grad()
    # Forward pass: compute node embeddings over the training message-passing edges.
    z = model(train_data.x, train_data.edge_index, train_data.edge_type)
    # Binary cross-entropy loss over positive and sampled negative label edges.
    out = decode(z, train_data.edge_label_index)
    loss = criterion(out, train_data.edge_label.float())
    loss.backward()
    optimizer.step()
    return loss.item()

def test(data):
    model.eval()
    with torch.no_grad():
        z = model(data.x, data.edge_index, data.edge_type)
        out = decode(z, data.edge_label_index)
        loss = criterion(out, data.edge_label.float())
        # Threshold the sigmoid output at 0.5 to get hard 0/1 predictions.
        pred = out.sigmoid() > 0.5
        # Accuracy over positive and negative label edges.
        correct = pred == data.edge_label.bool()
        acc = int(correct.sum()) / int(correct.size(0))
    return loss.item(), acc

for epoch in range(100):
    loss = train()
    val_loss, val_acc = test(val_data)
    print(f'Epoch: {epoch+1}, Loss: {loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

test_loss, test_acc = test(test_data)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')
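Accuracy at a fixed 0.5 threshold is only one way to evaluate the link predictor; a threshold-free ranking metric such as ROC-AUC is also commonly reported for link prediction. A small self-contained sketch (with made-up scores and labels) using scikit-learn:

import torch
from sklearn.metrics import roc_auc_score

# Made-up logits for 8 candidate edges and their ground-truth labels.
logits = torch.tensor([2.1, -0.3, 1.5, 0.2, -1.7, 0.9, -0.8, 1.1])
labels = torch.tensor([1, 0, 1, 0, 0, 1, 0, 1])

# AUC is threshold-free: it only depends on the ranking of the scores.
auc = roc_auc_score(labels.numpy(), logits.sigmoid().numpy())
print(f'AUC: {auc:.4f}')

In the test function above, the same call could be applied to the decoded scores and data.edge_label.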

Complete code

import torch
from torch_geometric.datasets import HGBDataset
from torch_geometric.transforms import RandomLinkSplit

# Load the ACM dataset, a heterogeneous graph containing papers, subjects,
# and the relations between them.
dataset = HGBDataset(root='/tmp/HGB', name='ACM')
data = dataset[0]  # use the first graph in the dataset

# Randomly split the target edges into training, validation, and test sets.
# split_labels=False keeps a single edge_label / edge_label_index pair per
# split (positives and sampled negatives together), which is what the
# training code below expects.
transform = RandomLinkSplit(
    num_val=0.1, num_test=0.2,
    is_undirected=False,   # paper -> subject edges form a directed bipartite relation
    split_labels=False,
    neg_sampling_ratio=1.0,
    # The edge type whose links we want to predict; the relation names actually
    # present in the loaded graph can be inspected via data.edge_types.
    edge_types=[('paper', 'has-subject', 'subject')]
)
train_data, val_data, test_data = transform(data)



from torch_geometric.nn import RGCNConv
import torch.nn.functional as F

class RGCN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, num_relations):
        super().__init__()
        # Two RGCNConv layers; each layer learns a separate weight matrix per
        # relation type (optionally factorized via num_bases / num_blocks).
        self.conv1 = RGCNConv(in_channels, hidden_channels, num_relations=num_relations)
        self.conv2 = RGCNConv(hidden_channels, out_channels, num_relations=num_relations)

    def forward(self, x, edge_index, edge_type):
        # First layer followed by a ReLU non-linearity.
        x = F.relu(self.conv1(x, edge_index, edge_type))
        # Second layer outputs the final node embeddings.
        x = self.conv2(x, edge_index, edge_type)
        return x

# Number of relation types. RGCNConv indexes its per-relation weights by the ids in
# edge_type, so max()+1 is safer than counting unique values. Note: this assumes data
# carries a global edge_type vector (a homogeneous view, cf. HeteroData.to_homogeneous()).
num_relations = int(data.edge_type.max()) + 1
model = RGCN(in_channels=data.num_node_features, hidden_channels=64, out_channels=32, num_relations=num_relations)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.BCEWithLogitsLoss()

def decode(z, edge_label_index):
    # Score a candidate edge as the dot product of its two node embeddings.
    src, dst = edge_label_index
    return (z[src] * z[dst]).sum(dim=-1)

def train():
    model.train()
    optimizer.zero_grad()
    # Forward pass: compute node embeddings over the training message-passing edges.
    z = model(train_data.x, train_data.edge_index, train_data.edge_type)
    # Binary cross-entropy loss over positive and sampled negative label edges.
    out = decode(z, train_data.edge_label_index)
    loss = criterion(out, train_data.edge_label.float())
    loss.backward()
    optimizer.step()
    return loss.item()

def test(data):
    model.eval()
    with torch.no_grad():
        z = model(data.x, data.edge_index, data.edge_type)
        out = decode(z, data.edge_label_index)
        loss = criterion(out, data.edge_label.float())
        # Threshold the sigmoid output at 0.5 to get hard 0/1 predictions.
        pred = out.sigmoid() > 0.5
        # Accuracy over positive and negative label edges.
        correct = pred == data.edge_label.bool()
        acc = int(correct.sum()) / int(correct.size(0))
    return loss.item(), acc

for epoch in range(100):
    loss = train()
    val_loss, val_acc = test(val_data)
    print(f'Epoch: {epoch+1}, Loss: {loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

test_loss, test_acc = test(test_data)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')