【模型】Encoder-Decoder

时间:2024-10-15 18:40:06

Encoder-Decoder模型是一种神经网络架构,广泛应用于需要将一个序列映射到另一个序列的任务中,最典型的应用是机器翻译。它的核心思想是通过两个独立但相互关联的神经网络来处理输入序列和生成输出序列。

1. 模型结构

Encoder-Decoder模型主要由两个部分组成:

1.1 Encoder(编码器)

编码器的任务是将输入序列编码成一个固定长度的向量表示(即上下文向量)。该部分通常由递归神经网络(RNN)、长短时记忆网络(LSTM)或门控循环单元(GRU)组成,负责逐步读取输入序列。

  • 编码器逐个处理输入序列中的元素(如文本中的单词),并将每一步的信息编码进其隐藏状态。
  • 最后一步的隐藏状态将作为编码器的输出,即上下文向量,它总结了整个输入序列的信息。
1.2 Decoder(解码器)

解码器接收从编码器传递过来的上下文向量,并根据该向量生成输出序列。解码器也是由RNN、LSTM或GRU等神经网络结构组成,它使用上下文向量以及之前生成的输出序列来预测接下来的输出元素。

  • 解码器逐步生成输出序列,在每个时间步生成一个输出。
  • 每个时间步解码器的输入是前一步生成的输出以及隐藏状态。

2. 工作流程

  1. 编码阶段:编码器逐步处理输入序列,将其转换为隐藏状态,最后将整个输入序列的信息编码为固定长度的上下文向量。
  2. 解码阶段:解码器接收上下文向量,并逐步生成目标序列。初始输入通常是特殊的开始标记 <SOS>,然后解码器在每个时间步生成一个新的输出元素。
  3. 预测下一个输出:每一个新的输出都是基于上下文向量和先前生成的输出来预测的。
机器翻译代码示例:
import torch
import torch.nn as nn
import torch.optim as optim
import random

# Encoder(编码器)
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers)

    def forward(self, input_seq, hidden_state):
        # input_seq shape: (seq_len, batch_size)
        embedded = self.embedding(input_seq)
        # embedded shape: (seq_len, batch_size, hidden_size)
        output, (hidden, cell) = self.lstm(embedded, hidden_state)
        return output, (hidden, cell)

    def init_hidden(self, batch_size, device):
        return (torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device),
                torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device))


# Decoder(解码器)
class Decoder(nn.Module):
    def __init__(self, output_size, hidden_size, num_layers=1):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers)
        self.fc = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input_step, hidden_state):
        # input_step shape: (1, batch_size) [decoding one time step at a time]
        embedded = self.embedding(input_step)
        # embedded shape: (1, batch_size, hidden_size)
        output, (hidden, cell) = self.lstm(embedded, hidden_state)
        # output shape: (1, batch_size, hidden_size)
        output = self.fc(output.squeeze(0))
        # output shape: (batch_size, output_size)
        output = self.softmax(output)
        return output, (hidden, cell)


# Seq2Seq 模型
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, input_seq, target_seq, teacher_forcing_ratio=0.5):
        batch_size = input_seq.size(1)
        target_len = target_seq.size(0)
        target_vocab_size = self.decoder.fc.out_features
        
        # 初始化解码器的输出张量
        outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(self.device)

        # 编码器的隐藏状态初始化
        hidden = self.encoder.init_hidden(batch_size, self.device)
        
        # 输入序列通过编码器
        encoder_output, hidden = self.encoder(input_seq, hidden)

        # 解码器的第一个输入是特殊的 <SOS> 标记
        decoder_input = target_seq[0, :]

        # 解码每个时间步
        for t in range(1, target_len):
            decoder_output, hidden = self.decoder(decoder_input.unsqueeze(0), hidden)
            outputs[t] = decoder_output

            # Teacher forcing: 有一定概率使用目标序列作为下一步的输入
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = decoder_output.argmax(1)
            decoder_input = target_seq[t] if teacher_force else top1
        
        return outputs

# 数据预处理

# 简单的英语到法语的句子对示例
pairs = [
    ["i am a student", "je suis un étudiant"],
    ["he is a teacher", "il est un professeur"],
    ["she loves apples", "elle aime les pommes"],
    ["we are friends", "nous sommes amis"]
]

# 构建词汇表
def build_vocab(sentences):
    vocab = set()
    for sentence in sentences:
        for word in sentence.split(' '):
            vocab.add(word)
    word2idx = {word: idx for idx, word in enumerate(vocab, start=2)}
    word2idx['<PAD>'] = 0  # 填充符
    word2idx['<SOS>'] = 1  # 开始符
    word2idx['<EOS>'] = 2  # 结束符
    idx2word = {idx: word for word, idx in word2idx.items()}
    return word2idx, idx2word

# 为英语和法语句子构建词汇表
eng_sentences = [pair[0] for pair in pairs]
fra_sentences = [pair[1] for pair in pairs]
eng_word2idx, eng_idx2word = build_vocab(eng_sentences)
fra_word2idx, fra_idx2word = build_vocab(fra_sentences)

# 将句子转换为索引序列
def sentence_to_indices(sentence, word2idx):
    return [word2idx[word] for word in sentence.split(' ')] + [word2idx['<EOS>']]  # 句尾添加结束符

input_seqs = [sentence_to_indices(pair[0], eng_word2idx) for pair in pairs]
target_seqs = [sentence_to_indices(pair[1], fra_word2idx) for pair in pairs]

# 填充序列到相同长度
def pad_sequences(sequences, max_len, padding_value=0):
    padded_sequences = torch.zeros((len(sequences), max_len), dtype=torch.long)
    for i, seq in enumerate(sequences):
        padded_sequences[i, :len(seq)] = torch.tensor(seq, dtype=torch.long)
    return padded_sequences

# 找到最长句子的长度用于填充
input_max_len = max([len(seq) for seq in input_seqs])
target_max_len = max([len(seq) for seq in target_seqs])

input_seqs_padded = pad_sequences(input_seqs, input_max_len)
target_seqs_padded = pad_sequences(target_seqs, target_max_len)

from torch.utils.data import DataLoader, TensorDataset

# 创建数据加载器
batch_size = 2
dataset = TensorDataset(input_seqs_padded, target_seqs_padded)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 模型训练

# 假设 input_size 和 output_size 是词汇表的大小
input_size = len(eng_word2idx)
output_size = len(fra_word2idx)
hidden_size = 256
num_layers = 1
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 初始化编码器、解码器和 Seq2Seq 模型
encoder = Encoder(input_size, hidden_size, num_layers).to(device)
decoder = Decoder(output_size, hidden_size, num_layers).to(device)
seq2seq = Seq2Seq(encoder, decoder, device).to(device)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss(ignore_index=fra_word2idx['<PAD>'])
optimizer = optim.Adam(seq2seq.parameters(), lr=0.001)

# 训练模型
num_epochs = 100
for epoch in range(num_epochs):
    for input_batch, target_batch in data_loader:
        input_batch, target_batch = input_batch.transpose(0, 1).to(device), target_batch.transpose(0, 1).to(device)
        
        optimizer.zero_grad()
        
        hidden = seq2seq.encoder.init_hidden(batch_size=input_batch.size(1), device=device)

        # 前向传播
        output = seq2seq(input_batch, target_batch)
        
        # 计算损失
        output_dim = output.shape[-1]
        loss = criterion(output[1:].reshape(-1, output_dim), target_batch[1:].reshape(-1))
        
        # 反向传播和优化
        loss.backward()
        optimizer.step()

    if epoch % 10 == 0:
        print(f'Epoch: {epoch}, Loss: {loss.item()}')


# 模型测试

def translate(sentence, seq2seq, max_length=10):
    # 将输入句子转换为索引
    input_seq = sentence_to_indices(sentence, eng_word2idx)
    input_seq = torch.tensor(input_seq, dtype=torch.long).unsqueeze(1).to(device)

    # 编码器的隐藏状态初始化
    hidden = seq2seq.encoder.init_hidden(1, device)
    encoder_output, hidden = seq2seq.encoder(input_seq, hidden)

    # 解码器的第一个输入是 <SOS>
    decoder_input = torch.tensor([fra_word2idx['<SOS>']], dtype=torch.long).to(device)
    translated_sentence = []

    for _ in range(max_length):
        decoder_output, hidden = seq2seq.decoder(decoder_input.unsqueeze(0), hidden)
        top1 = decoder_output.argmax(1)
        translated_word = fra_idx2word[top1.item()]

        if translated_word == '<EOS>':
            break
        translated_sentence.append(translated_word)
        decoder_input = top1  # 下一步的输入是当前步的输出

    return ' '.join(translated_sentence)

# 示例翻译
if __name__ == "__main__":
    while True:
        # 等待用户输入
        user_input = input("请输入要翻译的句子 (输入 'q' 退出): ")

        # 退出程序的条件
        if user_input.lower() == 'q':
            print("退出翻译程序")
            break
        
        # 调用翻译函数进行翻译
        translated_sentence = translate(user_input, seq2seq)
        print("翻译结果:", translated_sentence)

  • 训练时:Seq2Seq的forward() 函数用于批量处理源和目标序列,它依赖于源序列(经过Encoder)以及目标序列(逐步传递给Decoder),并且通常会使用教师强制(Teacher Forcing),即在训练时,我们将目标序列的前一个真实值作为Decoder的输入。
  • 推理时(预测时):在推理时,我们不再有目标序列的所有真实值。我们只能使用Decoder自己生成的每个词作为下一步的输入,这种方式被称为自回归生成。因此,推理时需要逐步使用Decoder生成每个单词,而不是一次性传入目标序列。

3. Attention机制的引入

传统的Encoder-Decoder模型有一个问题:当输入序列较长时,将整个输入信息压缩成一个固定长度的上下文向量可能导致信息丢失。这时,Attention机制被引入,用来在解码过程中动态地关注输入序列的不同部分,而不是仅依赖固定的上下文向量。

  • Attention机制允许解码器在生成输出时对输入序列的不同位置给予不同的权重。
  • 它能够提升长序列任务(如长文本翻译)的性能。

4. 典型应用

  • 机器翻译:如将英文句子翻译为中文句子。
  • 图像描述生成:将输入图像编码为向量表示,然后解码为一段文字描述。
  • 文本摘要:输入长文本,输出其简短摘要。
  • 对话生成:输入对话上下文,生成合适的回复。

5. Transformer架构的发展

尽管传统的Encoder-Decoder模型基于RNN、LSTM等架构,但在近几年,Transformer模型成为了主流。Transformer使用完全的Attention机制,不依赖递归结构,可以并行处理数据,因此效率更高,性能更好。

  • Transformer本质上也是一种Encoder-Decoder架构,但它用多头自注意力机制代替了RNN等递归结构。
  • Transformer模型的代表性应用就是BERTGPT等预训练语言模型。