{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "07W9tE81x6Ky", "outputId": "db9e9a68-2624-49e3-f419-688bd18e4a85" }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "from datasets import load_dataset\n", "import matplotlib.pyplot as plt\n", "%matplotlib inline\n", "\n", "\n", "torch.manual_seed(12046)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "id": "ChYX0H9jx6K2" }, "outputs": [], "source": [ "# 一些超参数\n", "learning_rate = 1e-3\n", "# 如果有GPU,该脚本将使用GPU进行计算\n", "device = 'cuda' if torch.cuda.is_available() else 'cpu'" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "id": "lrW4ceQUx6K3" }, "outputs": [], "source": [ "raw_datasets = load_dataset('code_search_net', 'python')\n", "datasets = raw_datasets['train'].filter(lambda x: 'apache/spark' in x['repository_name'])" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "id": "B1diphqUx6K3" }, "outputs": [], "source": [ "class char_tokenizer:\n", "\n", " def __init__(self, data):\n", " # 数据中出现的所有字符构成字典\n", " chars = sorted(list(set(''.join(data))))\n", " # 预留一个位置给结尾的特殊字符\n", " self.char2ind = {s : i + 1 for i, s in enumerate(chars)}\n", " self.char2ind['<|e|>'] = 0\n", " self.ind2char = {i : s for s, i in self.char2ind.items()}\n", "\n", " def encode(self, text):\n", " return [self.char2ind[c] for c in text]\n", "\n", " def decode(self, enc):\n", " if isinstance(enc, int):\n", " return self.ind2char[enc]\n", " return [self.ind2char[i] for i in enc]" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "YhaYe9rtx6K4", "outputId": "b6bea146-53d5-483c-b32e-a07d553e233e" }, "outputs": [ { "data": { "text/plain": [ "('def post(self):', 98)" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 举例验证分词器\n", "tok = char_tokenizer(datasets['whole_func_string'])\n", "example_text = 'def post(self):'\n", "''.join(tok.decode(tok.encode(example_text))), len(tok.char2ind)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "id": "9zIh_XMzx6K4" }, "outputs": [], "source": [ "class RNNCell(nn.Module):\n", "\n", " def __init__(self, input_size, hidden_size):\n", " '''\n", " 循环神经网络的神经元\n", " 参数\n", " ----\n", " input_size :int,输入数据的特征长度\n", " hidden_size :int,隐藏状态的特征长度\n", " '''\n", " super().__init__()\n", " self.input_size = input_size\n", " self.hidden_size = hidden_size\n", " combined_size = self.input_size + self.hidden_size\n", " # 使用线性回归模型,由输入数据和上一个隐藏状态得到当前的隐藏状态\n", " self.i2h = nn.Linear(combined_size, self.hidden_size)\n", "\n", " def forward(self, inputs, hidden=None):\n", " '''\n", " 向前传播\n", " 参数\n", " ----\n", " inputs :torch.FloatTensor,输入数据,形状为(1, I),I等于input_size\n", " hidden :torch.FloatTensor,上一个隐藏状态,形状为(1, H),H等于hidden_size\n", " 返回\n", " ----\n", " hidden :torch.FloatTensor,当前隐藏状态\n", " '''\n", " # 若无上一个隐藏状态,比如循环神经网络的开端,则生成默认的隐藏状态\n", " if hidden is None:\n", " hidden = self.init_hidden(inputs.device) # (1, H)\n", " combined = torch.cat((inputs, hidden), dim=1) # (1, I + H)\n", " hidden = F.relu(self.i2h(combined)) # (1, H)\n", " return hidden\n", "\n", " def init_hidden(self, device):\n", " # 默认的隐藏状态全部等于0\n", " # device用于控制生成张量的设备(cpu或gpu)\n", " return torch.zeros((1, self.hidden_size), device=device)\n", "\n", "\n", "class CharRNN(nn.Module):\n", "\n", " def __init__(self, vs):\n", " '''\n", " 单层的循环神经网络\n", " 参数\n", " ----\n", " vs :int,字典大小\n", " '''\n", " super().__init__()\n", " # 定义文字嵌入的特征长度\n", " self.emb_size = 30\n", " # 定义隐藏状态的特征长度\n", " self.hidden_size = 50\n", " # 文字嵌入层\n", " self.embedding = nn.Embedding(vs, self.emb_size)\n", " # 循环神经网络的神经元\n", " self.rnn = RNNCell(self.emb_size, self.hidden_size)\n", " # 语言建模头,根据隐藏状态预测下一个字母是什么\n", " self.h2o = nn.Linear(self.hidden_size, vs)\n", "\n", " def forward(self, x, hidden=None):\n", " '''\n", " 向前传播\n", " 参数\n", " ----\n", " x :torch.LongTensor,当前字母在字典中的位置,形状为(1)\n", " hidden :torch.FloatTensor,上一个隐藏状态,形状为(1, 50)\n", " 返回\n", " ----\n", " output :torch.FloatTensor,预测结果的logits,形状为(1, vs)\n", " hidden :torch.FloatTensor,当前隐藏状态\n", " '''\n", " emb = self.embedding(x) # (1, 30)\n", " # 得到当前的隐藏状态\n", " hidden = self.rnn(emb, hidden) # (1, 50)\n", " # 预测下一个字母\n", " output = self.h2o(hidden) # (1, vs)\n", " return output, hidden\n", "\n", "c_model = CharRNN(len(tok.char2ind)).to(device)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "frHZYPQSx6K6", "outputId": "a40e747e-8053-4f36-c5e8-02ec382d8fb4" }, "outputs": [ { "data": { "text/plain": [ "(torch.Size([1, 98]), torch.Size([1, 50]))" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 通过一个简单的例子验证模型搭建是否正确\n", "inputs = torch.tensor(tok.encode('d'), device=device)\n", "hidden = None\n", "logits, hidden = c_model(inputs, hidden)\n", "logits.shape, hidden.shape" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "id": "3uo7Pzlqx6K6" }, "outputs": [], "source": [ "@torch.no_grad()\n", "def generate(model, idx, max_new_tokens=300):\n", " '''\n", " 利用模型生成文本(反复使用模型进行预测)\n", " 参数\n", " ----\n", " model :CharRNN,生成文本的模型\n", " idx :torch.LongTensor,当前字母在字典中的位置,形状为(1)\n", " max_new_tokens :int,生成文本的最大长度\n", " 返回\n", " ----\n", " out :list[int],生成的文本\n", " '''\n", " out = idx.tolist()\n", " hidden = None\n", " # 将模型切换至评估模式\n", " model.eval()\n", " for _ in range(max_new_tokens):\n", " logits, hidden = model(idx, hidden)\n", " probs = F.softmax(logits, dim=-1)\n", " # 根据模型预测的概率,得到最终的预测结果(下一个字母)\n", " # 这一步运算有一定随机性\n", " ix = torch.multinomial(probs, num_samples=1)\n", " out.append(ix.item())\n", " # 注意调整模型输入的形状\n", " idx = ix.squeeze(0)\n", " if ix.item() == 0:\n", " break\n", " # 将模型切换至训练模式\n", " model.train()\n", " return out" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "QQ2MFkLWx6K6", "outputId": "ad58eda7-3141-427d-cbc9-ecf782f9ce04" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "d*ZO(F/of(\"YP{BZE G|uw=R:1'_$?Q9)N[{KQ=CKfAM:iKca\"|+Q3j\n" ] } ], "source": [ "# 使用模型来生成文本\n", "begin_text = torch.tensor(tok.encode('d'), device=device)\n", "print(''.join(tok.decode(generate(c_model, begin_text))))" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "F-OErxYDx6K6", "outputId": "ed49a77a-26c1-4e40-ac70-3a22c3a9a9cb" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "tensor([70, 71, 72, 2, 82, 81, 85, 86, 10, 85, 71, 78, 72, 11, 28],\n", " device='cuda:0')\n", "tensor([71, 72, 2, 82, 81, 85, 86, 10, 85, 71, 78, 72, 11, 28, 0],\n", " device='cuda:0')\n", "The input is tensor([70], device='cuda:0'), the implied input is tensor([70], device='cuda:0'), the target is 71\n", "The input is tensor([71], device='cuda:0'), the implied input is tensor([70, 71], device='cuda:0'), the target is 72\n", "The input is tensor([72], device='cuda:0'), the implied input is tensor([70, 71, 72], device='cuda:0'), the target is 2\n", "The input is tensor([2], device='cuda:0'), the implied input is tensor([70, 71, 72, 2], device='cuda:0'), the target is 82\n", "The input is tensor([82], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82], device='cuda:0'), the target is 81\n", "The input is tensor([81], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81], device='cuda:0'), the target is 85\n", "The input is tensor([85], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85], device='cuda:0'), the target is 86\n", "The input is tensor([86], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85, 86], device='cuda:0'), the target is 10\n", "The input is tensor([10], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85, 86, 10], device='cuda:0'), the target is 85\n", "The input is tensor([85], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85, 86, 10, 85], device='cuda:0'), the target is 71\n", "The input is tensor([71], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85, 86, 10, 85, 71], device='cuda:0'), the target is 78\n", "The input is tensor([78], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85, 86, 10, 85, 71, 78], device='cuda:0'), the target is 72\n", "The input is tensor([72], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85, 86, 10, 85, 71, 78, 72], device='cuda:0'), the target is 11\n", "The input is tensor([11], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85, 86, 10, 85, 71, 78, 72, 11],\n", " device='cuda:0'), the target is 28\n", "The input is tensor([28], device='cuda:0'), the implied input is tensor([70, 71, 72, 2, 82, 81, 85, 86, 10, 85, 71, 78, 72, 11, 28],\n", " device='cuda:0'), the target is 0\n" ] } ], "source": [ "def encoding(text):\n", " '''\n", " 根据文本生成训练数据\n", " '''\n", " enc = tok.encode(text)\n", " # 0表示文本的结束\n", " return (torch.tensor(enc, device=device), torch.tensor(enc[1:] + [0], device=device))\n", "\n", "# 用一个简单的例子展示模型的训练数据\n", "inputs, labels = encoding(example_text)\n", "print(inputs)\n", "print(labels)\n", "for i in range(len(inputs)):\n", " context = inputs[:i + 1]\n", " target = labels[i]\n", " print(f'The input is {inputs[i].unsqueeze(0)}, ' +\n", " f'the implied input is {context}, the target is {target}')" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "id": "JDtx4jTix6K7" }, "outputs": [], "source": [ "# 记录模型的损失\n", "lossi = []\n", "epochs = 1\n", "optimizer = optim.Adam(c_model.parameters(), lr=learning_rate)\n", "\n", "for epoch in range(epochs):\n", " for data in datasets:\n", " inputs, labels = encoding(data['whole_func_string'])\n", " # 初始化\n", " hidden = None\n", " loss = torch.tensor([0.], device=device)\n", " optimizer.zero_grad()\n", " lens = inputs.shape[0]\n", " # 遍历文本,累加模型损失\n", " for i in range(lens):\n", " # 使用unsqueeze调整训练数据的形状\n", " logits, hidden = c_model(inputs[i].unsqueeze(0), hidden)\n", " loss += F.cross_entropy(logits, labels[i].unsqueeze(0)) / lens\n", " lossi.append(loss.item())\n", " loss.backward()\n", " optimizer.step()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 448 }, "id": "XHceUlaBx6K7", "outputId": "36e4c82d-4214-4d74-eac7-28489e595a7e" }, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "plt.plot(lossi)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "cPmymAa3x6K7", "outputId": "9e5348b0-c1a6-4b75-9f93-95b556b69711" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "def.areartT civeif the _are _sext(am pestranatucbyessedevcet \"cot che = srd Not Fit the = Notintorn aresTiveb), essingparte = pe \"ome retiothe ted th ches %\"ist(mogthar:c_Sy\"\n", " or on Jngien pyrame dor it = veri/g():\n", " siltonf_ante \"\"\"\n", " sedenv)<|e|>\n" ] } ], "source": [ "# 使用模型来生成文本\n", "begin_text = torch.tensor(tok.encode('d'), device=device)\n", "print(''.join(tok.decode(generate(c_model, begin_text))))" ] } ], "metadata": { "accelerator": "GPU", "colab": { "gpuType": "V100", "provenance": [] }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 1 }