In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
%matplotlib inline


# Seed PyTorch's global random number generator so runs are repeatable.
SEED = 12046
torch.manual_seed(SEED)

<torch._C.Generator at 0x7fdbc884f330>

In [2]:
# Prepare the data: every image is converted to a tensor on access.
mnist_root = './mnist'
to_tensor = transforms.ToTensor()
dataset = datasets.MNIST(root=mnist_root, train=True, download=True, transform=to_tensor)
# Split the official training images into a training set and a validation set;
# the official test images serve as the test set.
train_set, val_set = random_split(dataset, [50000, 10000])
test_set = datasets.MNIST(root=mnist_root, train=False, download=True, transform=to_tensor)
len(train_set), len(val_set), len(test_set)

(50000, 10000, 10000)

In [3]:
# Build the data loaders: all three splits share the same batch size and are
# reshuffled every time they are iterated.
loader_options = dict(batch_size=500, shuffle=True)
train_loader = DataLoader(train_set, **loader_options)
val_loader = DataLoader(val_set, **loader_options)
test_loader = DataLoader(test_set, **loader_options)

In [4]:
# Number of mini-batches `_loss` evaluates per data loader when estimating loss/accuracy.
eval_iters = 10


def estimate_loss(model):
    """Estimate loss and accuracy of `model` on the train, validation and test loaders.

    The model is put in evaluation mode while the metrics are computed and is
    afterwards returned to whichever mode (training or evaluation) it was in
    when this function was called, even if the evaluation raises.

    Args:
        model: an `nn.Module` mapping a batch of inputs to class logits.

    Returns:
        dict: `{'train': ..., 'val': ..., 'test': ...}`, where every value is
        the `{'loss': float, 'acc': float}` dict returned by `_loss`.
    """
    was_training = model.training
    # Switch the model to evaluation mode for the measurements
    model.eval()
    try:
        re = {
            'train': _loss(model, train_loader),
            'val': _loss(model, val_loader),
            'test': _loss(model, test_loader),
        }
    finally:
        # Restore the caller's mode rather than unconditionally forcing training mode
        model.train(was_training)
    return re

    
@torch.no_grad()
def _loss(model, dataloader):
    # 估算模型效果
    loss = []
    acc = []
    data_iter = iter(dataloader)
    for t in range(eval_iters):
        inputs, labels = next(data_iter)
        # inputs: (500, 1, 28, 28)
        # labels: (500)
        B, C, H, W = inputs.shape
        #logits = model(inputs.view(B, -1))
        logits = model(inputs)
        loss.append(F.cross_entropy(logits, labels))
        # preds = torch.argmax(F.softmax(logits, dim=-1), dim=-1)
        preds = torch.argmax(logits, dim=-1)
        acc.append((preds == labels).sum() / B)
    re = {
        'loss': torch.tensor(loss).mean().item(),
        'acc': torch.tensor(acc).mean().item()
    }
    return re

In [5]:
def train_model(model, optimizer, epochs=10, penalty=False):
    """Train `model` on the module-level `train_loader` with cross-entropy loss.

    After every epoch the train/val/test losses estimated by `estimate_loss`
    are printed.

    Args:
        model: the `nn.Module` to train; it maps a batch of inputs to logits.
        optimizer: optimizer constructed over `model.parameters()`.
        epochs: number of passes over `train_loader`.
        penalty: if True, add an L1 (coefficient 0.001) plus squared-L2
            (coefficient 0.002) penalty over all model parameters to the loss
            that is back-propagated.

    Returns:
        list[float]: the cross-entropy loss of every training step, recorded
        before any penalty is added.
    """
    # Make sure the model is in training mode from the very first batch,
    # whatever mode it was left in by earlier cells.
    model.train()
    lossi = []
    for e in range(epochs):
        for inputs, labels in train_loader:
            logits = model(inputs)
            loss = F.cross_entropy(logits, labels)
            lossi.append(loss.item())
            if penalty:
                # reshape (not view) also handles non-contiguous parameters
                w = torch.cat([p.reshape(-1) for p in model.parameters()])
                loss = loss + 0.001 * w.abs().sum() + 0.002 * w.square().sum()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        stats = estimate_loss(model)
        train_loss = f'{stats["train"]["loss"]:.3f}'
        val_loss = f'{stats["val"]["loss"]:.3f}'
        test_loss = f'{stats["test"]["loss"]:.3f}'
        print(f'epoch {e} train {train_loss} val {val_loss} test {test_loss}')
    return lossi

In [10]:
# A few handy facts about convolution layers:
# a 3x3 kernel with stride 1 and padding 1 leaves the spatial size unchanged.
# `.item()` converts the sampled 1-element tensor into the plain Python int
# that `nn.Conv2d` and `torch.randn` expect for channel counts and sizes.
channels = torch.randint(1, 10, (1,)).item()
conv1 = nn.Conv2d(channels, channels, (3, 3), stride=1, padding=1)
x = torch.randn(1, channels, 28, 28)
x.shape, conv1(x).shape

(torch.Size([1, 4, 28, 28]), torch.Size([1, 4, 28, 28]))

In [11]:
class ResidualBlockSimplified(nn.Module):
    """Minimal residual block: two 3x3 convolutions plus an identity shortcut.

    Both convolutions keep the channel count and, with stride 1 and padding 1,
    the spatial size, so the input can be added to their output directly.
    """

    def __init__(self, channels):
        super().__init__()
        conv_options = dict(kernel_size=3, stride=1, padding=1)
        self.conv1 = nn.Conv2d(channels, channels, **conv_options)
        self.conv2 = nn.Conv2d(channels, channels, **conv_options)

    def forward(self, x):
        """Return relu(conv2(relu(conv1(x))) + x)."""
        transformed = self.conv2(F.relu(self.conv1(x)))
        # Residual connection: add the untouched input back before the final ReLU
        return F.relu(transformed + x)

In [12]:
# Sanity check: the simplified block returns a tensor with the same shape as its input.
model = ResidualBlockSimplified(3)
x = torch.randn(1, 3, 28, 28)
model(x).shape

torch.Size([1, 3, 28, 28])

In [16]:
# These two convolutions produce outputs of the same shape: for a 28x28 input,
# a 3x3 kernel with padding 1 and a 1x1 kernel with padding 0 both give
# floor(27 / stride) + 1 positions per spatial dimension.
# `.item()` converts each sampled 1-element tensor into the plain Python int
# that `nn.Conv2d` and `torch.randn` expect.
stride = torch.randint(1, 10, (1,)).item()
in_channels = torch.randint(1, 10, (1,)).item()
out_channels = torch.randint(1, 10, (1,)).item()
conv1 = nn.Conv2d(in_channels, out_channels, (3, 3), stride=stride, padding=1)
conv2 = nn.Conv2d(in_channels, out_channels, (1, 1), stride=stride, padding=0)
x = torch.randn(1, in_channels, 28, 28)
conv1(x).shape, conv2(x).shape

(torch.Size([1, 2, 28, 28]), torch.Size([1, 2, 28, 28]))

In [21]:
class ResidualBlock(nn.Module):
    """Residual block: conv-BN-ReLU-conv-BN on the main path plus a shortcut.

    When the block changes the resolution (`stride != 1`) or the number of
    channels, the shortcut is a strided 1x1 convolution followed by batch norm
    so that it matches the main path's shape; otherwise the shortcut is the
    input itself and `downsample` is None.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # Main path: only the first convolution may change resolution / channels
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        # Shortcut path: project only when the shapes would otherwise differ
        shapes_match = stride == 1 and in_channels == out_channels
        if shapes_match:
            self.downsample = None
        else:
            self.downsample = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0)
            self.bn3 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        """Return relu(main_path(x) + shortcut(x))."""
        main = F.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))
        if self.downsample is None:
            shortcut = x
        else:
            shortcut = self.bn3(self.downsample(x))
        return F.relu(main + shortcut)

In [23]:
# Sanity check: stride 2 with a channel change (3 -> 5) goes through the
# downsample shortcut; a 7x7 input comes out as 4x4.
model = ResidualBlock(3, 5, 2)
x = torch.randn(1, 3, 7, 7)
model(x).shape

torch.Size([1, 5, 4, 4])

In [24]:
class ResNet(nn.Module):
    """Small residual network: six residual blocks followed by a linear classifier.

    The first block takes single-channel images; the remaining five each halve
    the resolution (stride 2) while widening the channels up to 120. The final
    feature map is globally average-pooled and mapped to 10 logits.
    """

    def __init__(self):
        super().__init__()
        self.block1 = ResidualBlock(1, 20)
        self.block2 = ResidualBlock(20, 40, stride=2)
        self.block3 = ResidualBlock(40, 60, stride=2)
        self.block4 = ResidualBlock(60, 80, stride=2)
        self.block5 = ResidualBlock(80, 100, stride=2)
        self.block6 = ResidualBlock(100, 120, stride=2)
        self.fc = nn.Linear(120, 10)

    def forward(self, x):
        """Map images of shape (B, 1, H, W) to logits of shape (B, 10).

        The shape comments below are for 28x28 inputs.
        """
        # x : (B, 1, 28, 28)
        x = self.block1(x)  # (B, 20, 28, 28)
        x = self.block2(x)  # (B, 40, 14, 14)
        x = self.block3(x)  # (B, 60,  7,  7)
        x = self.block4(x)  # (B, 80,  4,  4)
        x = self.block5(x)  # (B, 100, 2,  2)
        x = self.block6(x)  # (B, 120, 1,  1)
        # Global average pooling: a no-op on the 1x1 map produced by 28x28
        # inputs, and it keeps the classifier input at 120 features when a
        # larger image leaves a bigger final feature map.
        x = F.adaptive_avg_pool2d(x, 1)  # (B, 120, 1, 1)
        x = self.fc(torch.flatten(x, 1))
        return x

In [25]:
# Sanity check: a batch of 10 single-channel 28x28 inputs should give a (10, 10) logits tensor.
model = ResNet()
x = torch.randn(10, 1, 28, 28)
model(x).shape

torch.Size([10, 10])

In [26]:
# Train the network for 5 epochs with Adam; the per-step loss history is not needed here.
optimizer = optim.Adam(model.parameters(), lr=0.01)
_ = train_model(model, optimizer, epochs=5)

epoch 0 train 0.055 val 0.057 test 0.058
epoch 1 train 0.038 val 0.046 test 0.047
epoch 2 train 0.024 val 0.040 test 0.037
epoch 3 train 0.023 val 0.048 test 0.041
epoch 4 train 0.045 val 0.067 test 0.067


In [27]:
# Each split is estimated from `eval_iters` batches drawn from a shuffling loader,
# so these numbers vary between calls and need not match the last epoch's printout.
estimate_loss(model)

{'train': {'loss': 0.05183951184153557, 'acc': 0.9844000935554504},
 'val': {'loss': 0.07237933576107025, 'acc': 0.9765999913215637},
 'test': {'loss': 0.06279060989618301, 'acc': 0.9815999865531921}}