{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "ATlvvGTUto8T", "outputId": "48d62cee-4f18-4e75-913a-fcbc97366c04" }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "import torch.optim as optim\n", "from torch.utils.data import DataLoader\n", "from datasets import load_dataset\n", "import matplotlib.pyplot as plt\n", "%matplotlib inline\n", "\n", "\n", "torch.manual_seed(12046)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "id": "UV7wHrEFto8V" }, "outputs": [], "source": [ "class LSTMCell(nn.Module):\n", "\n", " def __init__(self, input_size, hidden_size):\n", " '''\n", " 长短期记忆网络的神经元\n", " 参数\n", " ----\n", " input_size :int,输入数据的特征长度\n", " hidden_size :int,隐藏状态的特征长度\n", " '''\n", " super().__init__()\n", " self.input_size = input_size\n", " self.hidden_size = hidden_size\n", " combined_size = self.input_size + self.hidden_size\n", " # 定义输入门的线性部分\n", " self.in_gate = nn.Linear(combined_size, self.hidden_size)\n", " # 定义遗忘门的线性部分\n", " self.forget_gate = nn.Linear(combined_size, self.hidden_size)\n", " # 定义备选细胞状态的线性部分\n", " self.new_cell_state = nn.Linear(combined_size, self.hidden_size)\n", " # 定义输出门的线性部分\n", " self.out_gate = nn.Linear(combined_size, self.hidden_size)\n", "\n", " def forward(self, inputs, state=None):\n", " '''\n", " 向前传播\n", " 参数\n", " ----\n", " inputs :torch.FloatTensor\n", " 输入数据,形状为(B, I),其中B表示批量大小,I表示文字特征的长度(input_size)\n", " state :tuple(torch.FloatTensor, torch.FloatTensor)\n", " (隐藏状态,细胞状态),两个状态的形状都为(B, H),其中H表示隐藏状态的长度(hidden_size)\n", " 返回\n", " ----\n", " hs :torch.FloatTensor,隐藏状态,形状为(B, H)\n", " cs :torch.FloatTensor,细胞状态,形状为(B, H)\n", " '''\n", " B, _ = inputs.shape\n", " if state is None:\n", " state = self.init_state(B, inputs.device)\n", " hs, cs = state\n", " combined = torch.cat((inputs, hs), dim=1) # (B, I + H)\n", " # 输入门\n", " ingate = F.sigmoid(self.in_gate(combined)) # (B, H)\n", " # 遗忘门\n", " forgetgate = F.sigmoid(self.forget_gate(combined)) # (B, H)\n", " # 输出门\n", " outgate = F.sigmoid(self.out_gate(combined)) # (B, H)\n", " # 更新细胞状态\n", " ncs = F.tanh(self.new_cell_state(combined)) # (B, H)\n", " cs = (forgetgate * cs) + (ingate * ncs) # (B, H)\n", " # 更新隐藏状态\n", " hs = outgate * F.tanh(cs) # (B, H)\n", " return hs, cs\n", "\n", " def init_state(self, B, device):\n", " # 默认的隐藏状态和细胞状态全部都等于0\n", " cs = torch.zeros((B, self.hidden_size), device=device)\n", " hs = torch.zeros((B, self.hidden_size), device=device)\n", " return hs, cs\n", "\n", "class LSTM(nn.Module):\n", "\n", " def __init__(self, input_size, hidden_size):\n", " '''\n", " 单层的长短期记忆网络(支持批量计算)\n", " 参数\n", " ----\n", " input_size :int,输入数据的特征长度\n", " hidden_size :int,隐藏状态的特征长度\n", " '''\n", " super().__init__()\n", " self.input_size = input_size\n", " self.hidden_size = hidden_size\n", " self.lstm = LSTMCell(self.input_size, self.hidden_size)\n", "\n", " def forward(self, inputs, state=None):\n", " '''\n", " 向前传播\n", " 参数\n", " ----\n", " inputs :torch.FloatTensor\n", " 输入数据的集合,形状为(B, T, C),其中B表示批量大小,T表示文本长度,C表示文字特征的长度(input_size)\n", " state :tuple(torch.FloatTensor, torch.FloatTensor)\n", " (初始的隐藏状态,初始的细胞状态),两个状态的形状都为(B, H),其中H表示隐藏状态的长度(hidden_size)\n", " 返回\n", " ----\n", " hidden :torch.FloatTensor,所有隐藏状态的集合,形状为(B, T, H)\n", " '''\n", " re = []\n", " B, T, C = inputs.shape\n", " inputs = inputs.transpose(0, 1) # (T, B, C)\n", " for i in range(T):\n", " state = self.lstm(inputs[i], state)\n", " # 只记录隐藏状态,state[0]的形状为(B, H)\n", " re.append(state[0])\n", " result_tensor = torch.stack(re, dim=0) # (T, B, H)\n", " return result_tensor.transpose(0, 1) # (B, T, H)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "aGr01cqBto8W", "outputId": "ce3ac010-0ac5-4dde-fb31-4543e7f6c500" }, "outputs": [ { "data": { "text/plain": [ "(tensor(True), (2, 17, 15, 16, 15))" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def test_lstm():\n", " '''\n", " 测试LSTM实现的准确性\n", " '''\n", " # 随机生成模型结构\n", " B, T, input_size, hidden_size, num_layers = torch.randint(1, 20, (5,)).tolist()\n", " ref_model = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)\n", " # 随机生成输入\n", " inputs = torch.randn(B, T, input_size)\n", " hs, cs = torch.randn((2 * num_layers, B, hidden_size)).chunk(2, 0)\n", " _hs = list((i.squeeze(0) for i in hs))\n", " _cs = list((i.squeeze(0) for i in cs))\n", " re = inputs\n", " # 取出模型参数\n", " for layer_index in range(num_layers):\n", " l = ref_model.all_weights[layer_index]\n", " if layer_index == 0:\n", " model = LSTM(input_size, hidden_size)\n", " else:\n", " model = LSTM(hidden_size, hidden_size)\n", " i, f, c, o = torch.cat((l[0], l[1]), dim=1).chunk(4, 0)\n", " ib, fb, cb, ob = (l[2] + l[3]).chunk(4, 0)\n", " # 设置模型参数\n", " model.lstm.in_gate.weight = nn.Parameter(i)\n", " model.lstm.in_gate.bias = nn.Parameter(ib)\n", " model.lstm.forget_gate.weight = nn.Parameter(f)\n", " model.lstm.forget_gate.bias = nn.Parameter(fb)\n", " model.lstm.new_cell_state.weight = nn.Parameter(c)\n", " model.lstm.new_cell_state.bias = nn.Parameter(cb)\n", " model.lstm.out_gate.weight = nn.Parameter(o)\n", " model.lstm.out_gate.bias = nn.Parameter(ob)\n", " # 计算隐藏状态\n", " re = model(re, (_hs[layer_index], _cs[layer_index]))\n", " ref_re, _ = ref_model(inputs, (hs, cs))\n", " # 验证计算结果(最后一层的隐藏状态是否一致)\n", " out = torch.all(torch.abs(re - ref_re) < 1e-4)\n", " return out, (B, T, input_size, hidden_size, num_layers)\n", "\n", "test_lstm()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "id": "FDmaQtfbto8Z" }, "outputs": [], "source": [ "# 一些超参数\n", "learning_rate = 1e-3\n", "eval_iters = 10\n", "batch_size=1000\n", "sequence_len=64\n", "# 如果有GPU,该脚本将使用GPU进行计算\n", "device = 'cuda' if torch.cuda.is_available() else 'cpu'" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "iKwo5iGnto8Z", "outputId": "33b6bdaa-b2b1-43eb-f318-c40ff76be96d" }, "outputs": [ { "data": { "text/plain": [ "98" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "raw_datasets = load_dataset(\"code_search_net\", \"python\")\n", "datasets = raw_datasets['train'].filter(lambda x: 'apache/spark' in x['repository_name'])\n", "\n", "class char_tokenizer:\n", "\n", " def __init__(self, data):\n", " # 数据中出现的所有字符构成字典\n", " chars = sorted(list(set(''.join(data))))\n", " # 预留一个位置给结尾的特殊字符\n", " self.char2ind = {s : i + 1 for i, s in enumerate(chars)}\n", " self.char2ind['<|e|>'] = 0\n", " self.ind2char = {i : s for s, i in self.char2ind.items()}\n", "\n", " def encode(self, text):\n", " return [self.char2ind[c] for c in text]\n", "\n", " def decode(self, enc):\n", " if isinstance(enc, int):\n", " return self.ind2char[enc]\n", " return [self.ind2char[i] for i in enc]\n", "\n", "tok = char_tokenizer(datasets['whole_func_string'])\n", "len(tok.char2ind)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "id": "GS3Cfz2wto8a" }, "outputs": [], "source": [ "class CharLSTM(nn.Module):\n", "\n", " def __init__(self, vs):\n", " '''\n", " 三层的长短期记忆网络\n", " 参数\n", " ----\n", " vs :int,字典大小\n", " '''\n", " super().__init__()\n", " # 定义文字嵌入的特征长度\n", " self.emb_size = 256\n", " # 定义隐藏状态的特征长度\n", " self.hidden_size = 128\n", " # 文字嵌入层\n", " self.embedding = nn.Embedding(vs, self.emb_size)\n", " # 随机失活\n", " self.dp = nn.Dropout(0.4)\n", " # 第一层长短期记忆网络\n", " self.lstm1 = LSTM(self.emb_size, self.hidden_size)\n", " # 层归一化\n", " self.norm1 = nn.LayerNorm(self.hidden_size)\n", " self.lstm2 = LSTM(self.hidden_size, self.hidden_size)\n", " self.norm2 = nn.LayerNorm(self.hidden_size)\n", " self.lstm3 = LSTM(self.hidden_size, self.hidden_size)\n", " self.norm3 = nn.LayerNorm(self.hidden_size)\n", " # 语言建模头,根据最后一层的隐藏状态预测下一个字母是什么\n", " self.h2o = nn.Linear(self.hidden_size, vs)\n", "\n", " def forward(self, x):\n", " '''\n", " 向前传播\n", " 参数\n", " ----\n", " x :torch.LongTensor,当前字母在字典中的位置,形状为(B, T)\n", " 返回\n", " ----\n", " output :torch.FloatTensor,预测结果的logits,形状为(B, T, vs)\n", " '''\n", " emb = self.embedding(x) # (B, T, C)\n", " h = self.norm1(self.dp(self.lstm1(emb))) # (B, T, H)\n", " # 第一层的隐藏状态是第二层的输入\n", " h = self.norm2(self.dp(self.lstm2(h))) # (B, T, H)\n", " # 第二层的隐藏状态是第三层的输入\n", " h = self.norm3(self.dp(self.lstm3(h))) # (B, T, H)\n", " # 使用第三层的隐藏状态预测下一个字母是什么\n", " output = self.h2o(h) # (B, T, vs)\n", " return output\n", "\n", "model = CharLSTM(len(tok.char2ind)).to(device)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "zc5jC4dxto8a", "outputId": "44079c5c-396e-490f-fe69-47d8cffafdd9" }, "outputs": [ { "data": { "text/plain": [ "CharLSTM(\n", " (embedding): Embedding(98, 256)\n", " (dp): Dropout(p=0.4, inplace=False)\n", " (lstm1): LSTM(\n", " (lstm): LSTMCell(\n", " (in_gate): Linear(in_features=384, out_features=128, bias=True)\n", " (forget_gate): Linear(in_features=384, out_features=128, bias=True)\n", " (new_cell_state): Linear(in_features=384, out_features=128, bias=True)\n", " (out_gate): Linear(in_features=384, out_features=128, bias=True)\n", " )\n", " )\n", " (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)\n", " (lstm2): LSTM(\n", " (lstm): LSTMCell(\n", " (in_gate): Linear(in_features=256, out_features=128, bias=True)\n", " (forget_gate): Linear(in_features=256, out_features=128, bias=True)\n", " (new_cell_state): Linear(in_features=256, out_features=128, bias=True)\n", " (out_gate): Linear(in_features=256, out_features=128, bias=True)\n", " )\n", " )\n", " (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)\n", " (lstm3): LSTM(\n", " (lstm): LSTMCell(\n", " (in_gate): Linear(in_features=256, out_features=128, bias=True)\n", " (forget_gate): Linear(in_features=256, out_features=128, bias=True)\n", " (new_cell_state): Linear(in_features=256, out_features=128, bias=True)\n", " (out_gate): Linear(in_features=256, out_features=128, bias=True)\n", " )\n", " )\n", " (norm3): LayerNorm((128,), eps=1e-05, elementwise_affine=True)\n", " (h2o): Linear(in_features=128, out_features=98, bias=True)\n", ")" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 展示模型结构\n", "model" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "id": "SfRdGvxoto8a" }, "outputs": [], "source": [ "@torch.no_grad()\n", "def generate_batch(model, idx, max_new_tokens=300):\n", " '''\n", " 利用模型生成文本(反复使用模型进行预测)\n", " 参数\n", " ----\n", " model :CharLSTM,生成文本的模型\n", " idx :torch.LongTensor,当前字母在字典中的位置,形状为(1, T)\n", " max_new_tokens :int,生成文本的最大长度\n", " 返回\n", " ----\n", " out :list[int],生成的文本\n", " '''\n", " # 将模型切换至评估模式\n", " model.eval()\n", " for _ in range(max_new_tokens):\n", " # 限制背景长度,使之与模型训练时的状况更相符\n", " # 当然也可以不限制\n", " context = idx[:, -sequence_len:]\n", " # 在文本生成时,模型的计算效率很低,因为有很多重复计算\n", " logits = model(context)\n", " # 只使用最后一个预测结果\n", " logits = logits[:, -1, :]\n", " probs = F.softmax(logits, dim=-1)\n", " # 根据模型预测的概率,得到最终的预测结果(下一个字母)\n", " # 这一步运算有一定随机性\n", " ix = torch.multinomial(probs, num_samples=1)\n", " idx = torch.cat((idx, ix), dim=1)\n", " if ix.item() == 0:\n", " break\n", " # 将模型切换至训练模式\n", " model.train()\n", " return idx.tolist()[0]" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "DfHreqdJto8a", "outputId": "b05db6d9-a1e0-4695-e814-702448958986" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "def*$O(h/of(\"YP{so.8G|1w=3:1'ZS?z9)N[{3Q=CKfAM:iEca\";+Q31SDax1zk<|e|>\n" ] } ], "source": [ "# 使用模型来生成文本\n", "begin_text = torch.tensor(tok.encode('def'), device=device).unsqueeze(0)\n", "print(''.join(tok.decode(generate_batch(model, begin_text))))" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "ngIplGT3to8b", "outputId": "50fa082b-6a34-4a73-ab87-43d7bada48ee" }, "outputs": [ { "data": { "text/plain": [ "(torch.Size([605913, 64]), torch.Size([605913, 64]))" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def process(data, sequence_len=sequence_len):\n", " '''\n", " 根据文本生成训练数据\n", " '''\n", " # text是字符串列表\n", " text = data['whole_func_string']\n", " inputs, labels = [], []\n", " for i in text:\n", " enc = tok.encode(i)\n", " # 0对应着文本结束\n", " enc += [0]\n", " # 将文本转换为多个训练数据\n", " for i in range(len(enc) - sequence_len):\n", " inputs.append(enc[i: i + sequence_len])\n", " # 预测标签是下一个字母,因此只需要挪动一个位置即可\n", " labels.append(enc[i + 1: i + 1 + sequence_len])\n", " return {'inputs': inputs, 'labels': labels}\n", "\n", "# 将数据分为训练集和测试集\n", "tokenized = datasets.train_test_split(test_size=0.1, seed=1024, shuffle=True)\n", "# 将文本转换为训练数据,里面包含inputs和labels\n", "tokenized = tokenized.map(process, batched=True, remove_columns=datasets.column_names)\n", "tokenized.set_format(type='torch', device=device)\n", "\n", "tokenized['train']['inputs'].shape, tokenized['train']['labels'].shape" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "cFPy9_AWto8b", "outputId": "91eed388-c6a1-4872-c993-514d055da2a1" }, "outputs": [ { "data": { "text/plain": [ "{'inputs': tensor([[71, 80, 88, ..., 43, 48, 40],\n", " [82, 57, 75, ..., 71, 78, 71],\n", " [91, 2, 85, ..., 85, 71, 86],\n", " ...,\n", " [71, 2, 54, ..., 79, 71, 65],\n", " [ 2, 2, 2, ..., 1, 1, 2],\n", " [84, 75, 80, ..., 85, 86, 84]], device='cuda:0'),\n", " 'labels': tensor([[80, 88, 2, ..., 48, 40, 49],\n", " [57, 75, 86, ..., 78, 71, 79],\n", " [ 2, 85, 71, ..., 71, 86, 10],\n", " ...,\n", " [ 2, 54, 91, ..., 71, 65, 65],\n", " [ 2, 2, 4, ..., 1, 2, 2],\n", " [75, 80, 73, ..., 86, 84, 75]], device='cuda:0')}" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 构建数据读取器\n", "train_loader = DataLoader(tokenized['train'], batch_size=batch_size, shuffle=True)\n", "test_loader = DataLoader(tokenized['test'], batch_size=batch_size, shuffle=True)\n", "# 获取一个批量的数据\n", "next(iter(test_loader))" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "oXR279ncto8b", "outputId": "bb83d85b-a4e9-44ee-b11d-1db0c0a2d471" }, "outputs": [ { "data": { "text/plain": [ "{'train': 4.7519965171813965, 'test': 4.765100002288818}" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def estimate_loss(model):\n", " re = {}\n", " # 将模型切换至评估模式\n", " model.eval()\n", " re['train'] = _loss(model, train_loader)\n", " re['test'] = _loss(model, test_loader)\n", " # 将模型切换至训练模式\n", " model.train()\n", " return re\n", "\n", "@torch.no_grad()\n", "def _loss(model, data_loader):\n", " \"\"\"\n", " 计算模型在不同数据集下面的评估指标\n", " \"\"\"\n", " loss = []\n", " data_iter= iter(data_loader)\n", " # 随机使用多个批量数据来预估模型效果\n", " for k in range(eval_iters):\n", " data = next(data_iter, None)\n", " if data is None:\n", " data_iter = iter(data_loader)\n", " data = next(data_iter, None)\n", " inputs, labels = data['inputs'], data['labels']\n", " logits = model(inputs)\n", " # 根据cross_entropy的定义,需要对logits进行转置运算\n", " # 具体细节请参考cross_entropy的官方文档\n", " logits = logits.transpose(-2, -1)\n", " loss.append(F.cross_entropy(logits, labels).item())\n", " return torch.tensor(loss).mean().item()\n", "\n", "estimate_loss(model)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "id": "5PpRhC4Oto8c" }, "outputs": [], "source": [ "def train_lstm(model, optimizer, data_loader, epochs=10):\n", " lossi = []\n", " for epoch in range(epochs):\n", " for i, data in enumerate(data_loader, 0):\n", " inputs, labels = data['inputs'], data['labels']\n", " optimizer.zero_grad()\n", " logits = model(inputs)\n", " # 根据cross_entropy的定义,需要对logits进行转置运算\n", " # 具体细节请参考cross_entropy的官方文档\n", " logits = logits.transpose(-2, -1)\n", " loss = F.cross_entropy(logits, labels)\n", " lossi.append(loss.item())\n", " loss.backward()\n", " optimizer.step()\n", " # 评估模型,并输出结果\n", " stats = estimate_loss(model)\n", " train_loss = f'train loss {stats[\"train\"]:.4f}'\n", " test_loss = f'test loss {stats[\"test\"]:.4f}'\n", " print(f'epoch {epoch:>2}: {train_loss}, {test_loss}')\n", " return lossi" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "WUVdQEcAto8c", "outputId": "63073b27-622e-45e0-bfaf-7a2c3827b5fc" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "epoch 0: train loss 1.2563, test loss 1.4122\n", "epoch 1: train loss 1.1342, test loss 1.3129\n", "epoch 2: train loss 1.0388, test loss 1.2483\n", "epoch 3: train loss 0.9971, test loss 1.2172\n", "epoch 4: train loss 0.9649, test loss 1.2048\n", "epoch 5: train loss 0.9491, test loss 1.1944\n", "epoch 6: train loss 0.9319, test loss 1.1899\n", "epoch 7: train loss 0.9200, test loss 1.1925\n", "epoch 8: train loss 0.9045, test loss 1.1841\n", "epoch 9: train loss 0.8960, test loss 1.1883\n" ] } ], "source": [ "l = train_lstm(model, optim.Adam(model.parameters(), lr=learning_rate), train_loader)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 448 }, "id": "SQ7oEi-Pto8c", "outputId": "2f987df6-3036-47c0-f8ed-139be38bf2aa" }, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "plt.plot(torch.tensor(l).view(-1, 10).mean(1).numpy())" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "353K8yRito8c", "outputId": "128ac4eb-84da-4f6f-b171-2d5960769630" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "def partition(uvalue, sockColumn):\n", " raise ValueError(\"No :class:`Row``, over a a transform when condition and Convert a SQL types at :func:`DataFrame.ifSchema.toAurecordType` to the specified reverse to the given formSt (b, len(goneuter)).map(self._java_matrix_wrapper._jdf.groupBy(lambda v: \n" ] } ], "source": [ "# 使用模型来生成文本\n", "begin_text = torch.tensor(tok.encode('def'), device=device).unsqueeze(0)\n", "print(''.join(tok.decode(generate_batch(model, begin_text))))" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "id": "ojDIISnWto8c" }, "outputs": [], "source": [ "# 将层归一化放到在LSTM神经元里面\n", "class LSTMLayerNormCell(nn.Module):\n", "\n", " def __init__(self, input_size, hidden_size):\n", " '''\n", " 长短期记忆网络的神经元(内含层归一化)\n", " 参数\n", " ----\n", " input_size :int,输入数据的特征长度\n", " hidden_size :int,隐藏状态的特征长度\n", " '''\n", " super().__init__()\n", " self.input_size = input_size\n", " self.hidden_size = hidden_size\n", " combined_size = self.input_size + self.hidden_size\n", " # 将四个线性模块放在一起定义,使得代码更加简洁和高效\n", " self.gates = nn.Linear(\n", " combined_size, 4 * self.hidden_size, bias=False)\n", " # 用于门的层归一化\n", " self.ln_gates = nn.LayerNorm(4 * self.hidden_size)\n", " # 用于细胞状态的层归一化\n", " self.ln_c = nn.LayerNorm(self.hidden_size)\n", "\n", " def forward(self, inputs, state=None):\n", " '''\n", " 向前传播\n", " 参数\n", " ----\n", " inputs :torch.FloatTensor\n", " 输入数据,形状为(B, I),其中B表示批量大小,I表示文字特征的长度(input_size)\n", " state :tuple(torch.FloatTensor, torch.FloatTensor)\n", " (隐藏状态,细胞状态),两个状态的形状都为(B, H),其中H表示隐藏状态的长度(hidden_size)\n", " 返回\n", " ----\n", " hs :torch.FloatTensor,隐藏状态,形状为(B, H)\n", " cs :torch.FloatTensor,细胞状态,形状为(B, H)\n", " '''\n", " B, _ = inputs.shape\n", " if state is None:\n", " state = self.init_state(B, inputs.device)\n", " hs, cs = state\n", " combined = torch.cat((inputs, hs), dim=1) # (B, I + H)\n", " # 将四个线性模块分开\n", " i, f, c, o = self.ln_gates(self.gates(combined)).chunk(4, 1)\n", " # 输入门\n", " ingate = F.sigmoid(i) # (B, H)\n", " # 遗忘门\n", " forgetgate = F.sigmoid(f) # (B, H)\n", " # 输出门\n", " outgate = F.sigmoid(o) # (B, H)\n", " # 更新细胞状态\n", " ncs = F.tanh(c) # (B, H)\n", " cs = self.ln_c((forgetgate * cs) + (ingate * ncs)) # (B, H)\n", " # 更新隐藏状态\n", " hs = outgate * F.tanh(cs) # (B, H)\n", " return hs, cs\n", "\n", " def init_state(self, B, device):\n", " cs = torch.zeros((B, self.hidden_size), device=device)\n", " hs = torch.zeros((B, self.hidden_size), device=device)\n", " return hs, cs\n", "\n", "class LSTMLayerNorm(nn.Module):\n", "\n", " def __init__(self, input_size, hidden_size):\n", " '''\n", " 单层的长短期记忆网络(支持批量计算且内含层归一化)\n", " 参数\n", " ----\n", " input_size :int,输入数据的特征长度\n", " hidden_size :int,隐藏状态的特征长度\n", " '''\n", " super().__init__()\n", " self.input_size = input_size\n", " self.hidden_size = hidden_size\n", " self.lstm = LSTMLayerNormCell(self.input_size, self.hidden_size)\n", "\n", " def forward(self, inputs, state=None):\n", " '''\n", " 向前传播\n", " 参数\n", " ----\n", " inputs :torch.FloatTensor\n", " 输入数据的集合,形状为(B, T, C),其中B表示批量大小,T表示文本长度,C表示文字特征的长度(input_size)\n", " state :tuple(torch.FloatTensor, torch.FloatTensor)\n", " (初始的隐藏状态,初始的细胞状态),两个状态的形状都为(B, H),其中H表示隐藏状态的长度(hidden_size)\n", " 返回\n", " ----\n", " hidden :torch.FloatTensor,所有隐藏状态的集合,形状为(B, T, H)\n", " '''\n", " re = []\n", " B, T, C = inputs.shape\n", " inputs = inputs.transpose(0, 1) # (T, B, C)\n", " for i in range(T):\n", " state = self.lstm(inputs[i], state)\n", " # 只记录隐藏状态,state[0]的形状为(B, H)\n", " re.append(state[0])\n", " result_tensor = torch.stack(re, dim=0) # (T, B, H)\n", " return result_tensor.transpose(0, 1) # (B, T, H)" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "id": "wJ2ypYx6to8d" }, "outputs": [], "source": [ "class CharLSTMLayerNorm(nn.Module):\n", "\n", " def __init__(self, vs):\n", " '''\n", " 三层的长短期记忆网络(内嵌层归一化)\n", " 参数\n", " ----\n", " vs :int,字典大小\n", " '''\n", " super().__init__()\n", " self.emb_size = 256\n", " self.hidden_size = 128\n", " self.embedding = nn.Embedding(vs, self.emb_size)\n", " self.dp = nn.Dropout(0.4)\n", " self.lstm1 = LSTMLayerNorm(self.emb_size, self.hidden_size)\n", " self.lstm2 = LSTMLayerNorm(self.hidden_size, self.hidden_size)\n", " self.lstm3 = LSTMLayerNorm(self.hidden_size, self.hidden_size)\n", " self.h2o = nn.Linear(self.hidden_size, vs)\n", "\n", " def forward(self, x):\n", " '''\n", " 向前传播\n", " 参数\n", " ----\n", " x :torch.LongTensor,当前字母在字典中的位置,形状为(B, T)\n", " 返回\n", " ----\n", " output :torch.FloatTensor,预测结果的logits,形状为(B, T, vs)\n", " '''\n", " emb = self.embedding(x) # (B, T, C)\n", " h = self.dp(self.lstm1(emb)) # (B, T, H)\n", " h = self.dp(self.lstm2(h)) # (B, T, H)\n", " h = self.dp(self.lstm3(h)) # (B, T, H)\n", " output = self.h2o(h) # (B, T, vs)\n", " return output\n", "\n", "model_norm = CharLSTMLayerNorm(len(tok.char2ind)).to(device)" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "khqYG4hYto8d", "outputId": "154fe844-b093-41b7-b31f-c26bf703a460" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "epoch 0: train loss 1.1342, test loss 1.2981\n", "epoch 1: train loss 0.9863, test loss 1.1823\n", "epoch 2: train loss 0.9283, test loss 1.1456\n", "epoch 3: train loss 0.8908, test loss 1.1117\n", "epoch 4: train loss 0.8742, test loss 1.1144\n", "epoch 5: train loss 0.8481, test loss 1.0984\n", "epoch 6: train loss 0.8359, test loss 1.0962\n", "epoch 7: train loss 0.8202, test loss 1.0890\n", "epoch 8: train loss 0.8229, test loss 1.0829\n", "epoch 9: train loss 0.8128, test loss 1.0881\n" ] } ], "source": [ "l_norm = train_lstm(model_norm, optim.Adam(model_norm.parameters(), lr=learning_rate),\n", " train_loader)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 448 }, "id": "GngdCJHkto8d", "outputId": "ab705301-e8fb-4ba4-a6f8-c9075dce6c5d" }, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "plt.plot(torch.tensor(l_norm).view(-1, 10).mean(1).numpy())" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "w87iEsiato8d", "outputId": "49c389d9-c457-4d3e-fb17-d8e67341b6ab" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "def numValues(tempfile.mkdtemp(), df.j1).collect()\n", " [Row(age2=5, name=u'Alice')]\n", " \"\"\"\n", " if len(other) >= 3:\n", " raise ValueError(\"Correlation in a sustance with the bases to and batching with Value thes no data,\n", " while heap is expected.\n", " \"\"\"\n", " def returnTy\n" ] } ], "source": [ "# 使用模型来生成文本\n", "begin_text = torch.tensor(tok.encode('def '), device=device).unsqueeze(0)\n", "print(''.join(tok.decode(generate_batch(model_norm, begin_text))))" ] } ], "metadata": { "accelerator": "GPU", "colab": { "gpuType": "V100", "provenance": [] }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 1 }