In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, GPT2LMHeadModel, GPT2Model


torch.manual_seed(12046)



In [2]:
llm = GPT2LMHeadModel.from_pretrained('gpt2')
tokenizer = AutoTokenizer.from_pretrained('gpt2')

In [3]:
class RewardModel(nn.Module):

 def __init__(self, model):
 '''
 评分模型
 参数
 ----
 model :嵌入模型
 '''
 super().__init__()
 self.embedding = model
 # 评分建模头
 self.score = nn.Linear(model.embed_dim, 1, bias=False)

 def forward(self, x, seq_len=None):
 '''
 向前传播
 参数
 ----
 x :torch.LongTensor,文本,形状为(B, T)或者(B, T, vs),其中vs表示字典大小
 seq_len :torch.LongTensor,文本的实际长度,形状为(B)
 返回
 ----
 score :torch.FloatTensor,评分,形状为(B, 1)
 '''
 
 B = x.shape[0]
 T = x.shape[1]
 # 文本的嵌入向量
 emb = self.get_last_hidden_state(x) # (B, T, C)
 ind = torch.arange(B, device=x.device)
 # 如果没有传入seq_len,则所有文本的实际长度都等于T
 if seq_len == None:
 seq_len = torch.tensor([T] * B)
 # 获取最后一个词元的特征
 pooled_emb = emb[ind, seq_len - 1] # (B, C)
 score = self.score(pooled_emb) # (B, 1)
 return score
 
 def get_last_hidden_state(self, x):
 '''
 获取文本的嵌入向量
 '''
 # 普通情况下,x的形状为(B, T)
 if len(x.shape) == 2:
 emb = self.embedding(x).last_hidden_state # (B, T, C)
 # 如果使用了gumbel_softmax,则x的形状为(B, T, vs)
 # 这种情况下,需要直接与embedding的模型参数进行计算
 else:
 w = self.embedding.get_input_embeddings().weight # (vs, C)
 inputs_embeds = x @ w # (B, T, vs) @ (vs, C) --> (B, T, C)
 emb = self.embedding(inputs_embeds=inputs_embeds).last_hidden_state
 return emb

r_model = RewardModel(GPT2Model.from_pretrained('gpt2'))

In [4]:
# 验证评分模型计算正确
# x的形状是(B, T),x_hot的形状是(B, T, vs)
x = torch.randint(0, tokenizer.vocab_size, (3, 4))
x_hot = F.one_hot(x, num_classes=tokenizer.vocab_size).float()
(r_model(x) - r_model(x_hot)).abs().max()

tensor(0., grad_fn=)

In [5]:
class RLModel(nn.Module):
 
 def __init__(self, llm, r_model):
 '''
 大语言模型与评分模型的拼接(错误方式)
 参数
 ----
 llm :大语言模型
 r_model :评分模型
 '''
 super().__init__()
 self.llm = llm
 self.r_model = r_model
 # 冻结模型
 for param in r_model.parameters():
 param.requires_grad = False
 
 def generate(self, idx, max_new_tokens):
 '''
 利用大语言模型生成文本(反复使用模型进行预测)
 参数
 ----
 idx :torch.LongTensor,背景文本,形状为(1, T)
 max_new_tokens :int,生成文本的最大长度
 '''
 model = self.llm
 for _ in range(max_new_tokens):
 logits = model(input_ids=idx).logits
 logits = logits[:, -1, :]
 probs = F.softmax(logits, dim=-1)
 # 根据概率,随机生成下一个词元
 idx_next = torch.multinomial(probs, num_samples=1)
 idx = torch.cat((idx, idx_next), dim=1)
 return idx
 
 def forward(self, idx):
 '''
 利用大语言模型生成文本,再使用评分模型对生成文本进行评分
 参数
 ----
 idx :torch.LongTensor,背景文本,形状为(1, T)
 返回
 ----
 reward :torch.FloatTensor,评分,形状为(1, 1)
 '''
 # 为了代码简洁,我们设置产生文本的长度
 ans = self.generate(idx, 20)
 # 对文本进行评分
 reward = self.r_model(ans)
 return reward

In [6]:
inputs = '1 + 2 = 3, 2 + 1 = 3, 1 + 2 ='
ids = tokenizer(inputs, return_tensors='pt')
model = RLModel(llm, r_model)

In [7]:
# 验证generate是正确的
print(tokenizer.decode(model.generate(ids['input_ids'], 20)[0], skip_special_tokens=True))

1 + 2 = 3, 2 + 1 = 3, 1 + 2 = 4, 3 + 1 = 5, 1 + 2 = 6 — Ha ha ha! In us


In [8]:
# 使用第三方库封装好的函数生成文本
res = model.llm.generate(
 input_ids=ids['input_ids'], max_new_tokens=20,
 do_sample=True, top_k=0)[0]
print(tokenizer.decode(res, skip_special_tokens=True))

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


1 + 2 = 3, 2 + 1 = 3, 1 + 2 = 4 without action FARMADAM (same) Wooden child Servant use Intel SOCKS+


In [9]:
loss = -1 * model(ids['input_ids'])
# 将报错,因为torch.multinomial不可微
loss.backward()

RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn

In [10]:
# 验证gumbel_softmax可以近似torch.multinomial
logits = torch.randn(1, 5)
probs = F.softmax(logits, dim=-1)
# 使用torch.multinomial生成结果
y = torch.multinomial(probs, num_samples=10000, replacement=True)
print(torch.histogram(y.float(), bins=5).hist)
# 使用gumbel_softmax生成结果
gumbel_y = torch.argmax(F.gumbel_softmax(logits.repeat(10000, 1), tau=1, hard=True), dim=-1, keepdim=True)
print(torch.histogram(gumbel_y.float(), bins=5).hist)

tensor([ 928., 926., 1631., 340., 6175.])
tensor([ 996., 865., 1616., 314., 6209.])


In [11]:
class RLModelWithGumbel(nn.Module):
 
 def __init__(self, llm, r_model):
 '''
 大语言模型与评分模型的拼接(没有明显错误的方式,但也不是合适的方式)
 参数
 ----
 llm :大语言模型
 r_model :评分模型
 '''
 super().__init__()
 self.llm = llm
 self.r_model = r_model
 # 冻结模型
 for param in r_model.parameters():
 param.requires_grad = False
 
 def generate(self, idx, max_new_tokens):
 '''
 利用大语言模型生成文本(反复使用模型进行预测)
 参数
 ----
 idx :torch.LongTensor,背景文本,形状为(1, T)
 max_new_tokens :int,生成文本的最大长度
 返回
 ----
 idx :torch.LongTensor,背景文本 + 生成文本,形状为(1, T+L),其中L是生成文本的长度
 ans :torch.LongTensor,生成文本,形状为(1, L, vs),其中vs是字典的大小
 '''
 model = self.llm
 ans = None
 for _ in range(max_new_tokens):
 logits = model(input_ids=idx).logits
 logits = logits[:, -1, :]
 # 根据概率,随机生成下一个词元
 idx_next_hot = F.gumbel_softmax(logits, tau=1, hard=True) # (1, vs)
 # torch.argmax不可微,所以idx不可微
 idx_next = torch.argmax(idx_next_hot, dim=-1, keepdim=True)
 idx = torch.cat((idx, idx_next.long()), dim=1)
 idx_next_hot = idx_next_hot.unsqueeze(1) # (1, 1, vs)
 if ans == None:
 ans = idx_next_hot
 else:
 ans = torch.cat((ans, idx_next_hot), dim=1)
 return idx, ans
 
 def forward(self, idx):
 '''
 利用大语言模型生成文本,再使用评分模型对生成文本进行评分
 参数
 ----
 idx :torch.LongTensor,背景文本,形状为(1, T)
 返回
 ----
 reward :torch.FloatTensor,评分,形状为(1, 1)
 '''
 # 为了代码简洁,我们设置产生文本的长度
 _, ans = self.generate(idx, 20)
 # 对生成的文本进行评分
 reward = self.r_model(ans)
 return reward

In [12]:
model_gumbel = RLModelWithGumbel(llm, r_model)

In [13]:
# 验证generate函数是否正确
idx, ans = model_gumbel.generate(ids['input_ids'], 20)
# 验证idx和ans的重叠部分是否相同
print(idx[:, ids['input_ids'].shape[1]:] == torch.argmax(ans, dim=-1, keepdim=True).squeeze(-1))
print(tokenizer.decode(idx[0], skip_special_tokens=True))

tensor([[True, True, True, True, True, True, True, True, True, True, True, True,
 True, True, True, True, True, True, True, True]])
1 + 2 = 3, 2 + 1 = 3, 1 + 2 = 0, 1 + 1 = 0; extends laugh(cow, decision, discount) fifth person,


In [14]:
# 验证评分模型计算正确
model_gumbel.r_model(idx[:, ids['input_ids'].shape[1]:]), model_gumbel.r_model(ans)

(tensor([[-0.2085]]), tensor([[-0.2085]], grad_fn=))

In [15]:
loss = -1 * model_gumbel(ids['input_ids'])
# 成功运行反向传播算法
loss.backward()
list(model_gumbel.llm.parameters())[0].grad

tensor([[ 2.3994e-06, 4.8380e-06, 3.5403e-06, ..., 4.4225e-06,
 -1.5709e-06, 4.8997e-06],
 [ 4.4208e-05, 1.3246e-04, 1.4072e-05, ..., 7.9197e-05,
 -1.4321e-06, -6.9506e-06],
 [ 7.8832e-06, 5.7550e-06, -1.3545e-07, ..., 5.6032e-06,
 -5.2948e-06, 1.6141e-06],
 ...,
 [ 6.0610e-10, 9.2871e-10, 3.8407e-10, ..., 1.6127e-09,
 -1.6454e-09, -8.2414e-10],
 [-1.5970e-09, 4.7921e-09, 6.8945e-09, ..., 7.0852e-09,
 -7.1524e-09, -1.9468e-09],
 [ 3.6735e-04, 2.7833e-04, 3.1601e-05, ..., 1.5014e-05,
 3.1863e-04, -2.6312e-04]])