In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
%matplotlib inline


# Seed PyTorch's global random generator so that the random operations in the
# following cells (random_split, torch.randn, weight initialisation, shuffling)
# are repeatable when the notebook is re-run from a fresh kernel.
torch.manual_seed(12046)

<torch._C.Generator at 0x1301103b0>

In [2]:
# Prepare the data: every MNIST image is converted to a tensor by ToTensor().
to_tensor = transforms.ToTensor()
dataset = datasets.MNIST(root='./data', train=True, download=True, transform=to_tensor)
test_set = datasets.MNIST(root='./data', train=False, download=True, transform=to_tensor)
# Split the official training data into a training set and a validation set;
# the official test split is used as the test set.
train_set, val_set = random_split(dataset, [50000, 10000])
len(train_set), len(val_set), len(test_set)

(50000, 10000, 10000)

In [3]:
# Build the data loaders: all three splits are read in shuffled batches of 500.
loader_options = dict(batch_size=500, shuffle=True)
train_loader = DataLoader(train_set, **loader_options)
val_loader = DataLoader(val_set, **loader_options)
test_loader = DataLoader(test_set, **loader_options)

In [4]:
# A few handy facts about convolution layers

# A 3x3 convolution with stride=1 and padding=1 keeps the shape of its input
conv3 = nn.Conv2d(3, 3, (3, 3), stride=1, padding=1)
x = torch.randn(1, 3, 28, 28)
print(x.size(), conv3(x).size())

# For the same stride, a 3x3 convolution with padding=1 and a 1x1 convolution
# with padding=0 produce outputs of the same shape.
# The stride is drawn from [1, 9] and converted to a plain int: a stride of 0
# is not a valid convolution stride, so it must never be sampled.
stride = int(torch.randint(1, 10, (1,)))
conv1 = nn.Conv2d(3, 4, (3, 3), stride=stride, padding=1)
conv2 = nn.Conv2d(3, 4, (1, 1), stride=stride, padding=0)
x = torch.randn(1, 3, 28, 28)
print(stride, conv1(x).size(), conv2(x).size())

torch.Size([1, 3, 28, 28]) torch.Size([1, 3, 28, 28])
tensor([7]) torch.Size([1, 4, 4, 4]) torch.Size([1, 4, 4, 4])


In [5]:
# Residual connection with a deliberate flaw (kept for demonstration)
class ResidualBlockBugVersion(nn.Module):
    """
    Naive residual block: two 3x3 convolution + batch-norm stages whose result
    is added to the unmodified input.

    The shortcut adds the raw input, so the block only works when the main
    path keeps the input shape, i.e. stride == 1 and in_channel == out_channel.
    """

    def __init__(self, in_channel, out_channel, stride=1):
        super().__init__()
        kernel = (3, 3)
        self.conv1 = nn.Conv2d(
            in_channel, out_channel, kernel,
            stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.conv2 = nn.Conv2d(
            out_channel, out_channel, kernel,
            stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)

    def forward(self, x):
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = self.bn2(self.conv2(hidden))
        # Residual connection.
        ## When stride != 1 or in_channel != out_channel the addition below
        ## fails, because `hidden` and `x` no longer have the same shape.
        hidden += x
        return F.relu(hidden)
    
# With the default stride and equal channel counts the shapes match, so this works
m = ResidualBlockBugVersion(3, 3)
print(m(torch.randn(1, 3, 28, 28)).size())

# With stride=2 the main path halves the spatial size, so adding the raw input fails.
# The error is the point of this demonstration: catch and print it so that the
# notebook still runs from top to bottom.
m = ResidualBlockBugVersion(3, 3, 2)
try:
    print(m(torch.randn(1, 3, 28, 28)).size())
except RuntimeError as err:
    print(f'RuntimeError: {err}')

torch.Size([1, 3, 28, 28])


RuntimeError: The size of tensor a (14) must match the size of tensor b (28) at non-singleton dimension 3

In [6]:
class ResidualBlock(nn.Module):
    '''
    Residual block: two 3x3 convolution + batch-norm stages plus a shortcut
    that is projected whenever the main path changes the tensor shape.
    '''

    def __init__(self, in_channel, out_channel, stride=1):
        '''
        Build the residual block.

        Parameters
        ----------
        in_channel : int, number of input channels
        out_channel : int, number of output channels
        stride : int, stride of the first convolution
        '''
        super().__init__()
        kernel = (3, 3)
        self.conv1 = nn.Conv2d(
            in_channel, out_channel, kernel,
            stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.conv2 = nn.Conv2d(
            out_channel, out_channel, kernel,
            stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        # When stride != 1 or in_channel != out_channel the main path changes
        # the shape, so the shortcut has to be projected to the same shape.
        # These two convolutions produce outputs of identical shape:
        #   Conv2d(in_channel, out_channel, (3, 3), stride, padding=1)
        #   Conv2d(in_channel, out_channel, (1, 1), stride, padding=0)
        shape_changes = stride != 1 or in_channel != out_channel
        self.downsample = nn.Sequential(
            nn.Conv2d(in_channel, out_channel, (1, 1), stride=stride, bias=False),
            nn.BatchNorm2d(out_channel)) if shape_changes else None

    def forward(self, x):
        '''
        Forward pass.

        Parameters
        ----------
        x : torch.FloatTensor of shape (B, I, H, W)
        '''
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = self.bn2(self.conv2(hidden))
        # Bring the shortcut to the shape of the main path when necessary
        shortcut = x if self.downsample is None else self.downsample(x)
        hidden += shortcut
        return F.relu(hidden)

# Shape check: a block that changes both the channel count and the stride
m = ResidualBlock(in_channel=3, out_channel=4, stride=2)
print(m(torch.randn(1, 3, 28, 28)).shape)

torch.Size([1, 4, 14, 14])


In [7]:
class ResNet(nn.Module):
    '''
    Small residual network for single-channel images with 10 output classes.

    Six residual blocks widen the channels from 1 to 120; every block after
    the first uses stride 2. The final feature map is globally average-pooled
    and passed to a linear layer that produces 10 logits.
    '''

    def __init__(self):
        super().__init__()
        self.block1 = ResidualBlock(1, 20)
        self.block2 = ResidualBlock(20, 40, stride=2)
        self.block3 = ResidualBlock(40, 60, stride=2)
        self.block4 = ResidualBlock(60, 80, stride=2)
        self.block5 = ResidualBlock(80, 100, stride=2)
        self.block6 = ResidualBlock(100, 120, stride=2)
        self.lm = nn.Linear(120, 10)

    def forward(self, x):
        '''
        Forward pass.

        Parameters
        ----------
        x : torch.FloatTensor of shape (B, 1, H, W); the shape comments below
            are for the MNIST case H = W = 28

        Returns
        -------
        torch.FloatTensor of shape (B, 10), the class logits
        '''
        x = self.block1(x) # (B,  20, 28, 28)
        x = self.block2(x) # (B,  40, 14, 14)
        x = self.block3(x) # (B,  60,  7,  7)
        x = self.block4(x) # (B,  80,  4,  4)
        x = self.block5(x) # (B, 100,  2,  2)
        x = self.block6(x) # (B, 120,  1,  1)
        # Global average pooling: a no-op on a 1x1 feature map (the 28x28 case),
        # but it keeps the linear layer usable when the input is larger and the
        # final feature map is not 1x1.
        x = F.adaptive_avg_pool2d(x, 1)       # (B, 120, 1, 1)
        out = self.lm(torch.flatten(x, 1))    # (B, 10)
        return out

# Instantiate the network; this is the model passed to train_resnet further below.
model = ResNet()

In [8]:
# Number of batches that _loss reads from a data loader for one loss/accuracy estimate
eval_iters = 10

def estimate_loss(model):
    """
    Estimate loss and accuracy of ``model`` on the train, val and test loaders.

    The model is switched to evaluation mode for the measurement and is put
    back into whatever mode it was in before the call, even if the evaluation
    raises.

    Parameters
    ----------
    model : torch.nn.Module, the model to evaluate

    Returns
    -------
    dict mapping 'train', 'val' and 'test' to the dict returned by ``_loss``
    for the module-level ``train_loader``, ``val_loader`` and ``test_loader``
    """
    # Remember the caller's mode so that evaluating never silently changes it
    was_training = model.training
    model.eval()
    try:
        return {
            'train': _loss(model, train_loader),
            'val': _loss(model, val_loader),
            'test': _loss(model, test_loader),
        }
    finally:
        model.train(was_training)

@torch.no_grad()
def _loss(model, data_loader):
    """
    计算模型在不同数据集下面的评估指标
    """
    loss = []
    accuracy = []
    data_iter = iter(data_loader)
    for k in range(eval_iters):
        inputs, labels = next(data_iter)
        B = inputs.shape[0]
        logits = model(inputs)
        # 计算模型损失
        loss.append(F.cross_entropy(logits, labels))
        # 计算预测的准确率
        _, predicted = torch.max(logits, 1)
        accuracy.append((predicted == labels).sum() / B)
    re = {
        'loss': torch.tensor(loss).mean().item(),
        'accuracy': torch.tensor(accuracy).mean().item()
    }
    return re

In [9]:
def train_resnet(model, optimizer, data_loader, epochs=10, penalty=()):
    """
    Train ``model`` with cross-entropy loss and report metrics after every epoch.

    Parameters
    ----------
    model : torch.nn.Module, the model to train
    optimizer : torch.optim.Optimizer built over ``model``'s parameters
    data_loader : iterable yielding ``(inputs, labels)`` training batches
    epochs : int, number of passes over ``data_loader``
    penalty : iterable of callables; each is called with ``model`` and its
        return value is added to the batch loss before back-propagation

    Returns
    -------
    list of float, the cross-entropy loss of every training step
    (recorded before any penalty term is added)
    """
    lossi = []
    splits = ('train', 'val', 'test')
    for epoch in range(epochs):
        # Make sure layers such as batch norm are in training mode, whatever
        # mode the model was handed over in
        model.train()
        for inputs, labels in data_loader:
            optimizer.zero_grad()
            logits = model(inputs)
            loss = F.cross_entropy(logits, labels)
            lossi.append(loss.item())
            # Add the penalty terms
            for p in penalty:
                loss = loss + p(model)
            loss.backward()
            optimizer.step()
        # Evaluate the model and print the results
        stats = estimate_loss(model)
        loss_txt = ', '.join(f'{s} loss {stats[s]["loss"]:.4f}' for s in splits)
        acc_txt = ', '.join(f'{s} acc {stats[s]["accuracy"]:.4f}' for s in splits)
        print(f'epoch {epoch:>2}: {loss_txt}')
        print(f'{"":>10}{acc_txt}')
    return lossi

In [10]:
# Training-loss histories returned by train_resnet, keyed by experiment name
stats = {}

In [11]:
# Train the residual network for 5 epochs with Adam and keep its loss history
optimizer = optim.Adam(model.parameters(), lr=0.01)
stats['resnet'] = train_resnet(model, optimizer, train_loader, epochs=5)

epoch  0: train loss 0.0820, val loss 0.1000, test loss 0.0955
          train acc 0.9728, val acc 0.9646, test acc 0.9686
epoch  1: train loss 0.0826, val loss 0.0873, test loss 0.0864
          train acc 0.9746, val acc 0.9744, test acc 0.9734
epoch  2: train loss 0.0523, val loss 0.0689, test loss 0.0634
          train acc 0.9860, val acc 0.9798, test acc 0.9804
epoch  3: train loss 0.0781, val loss 0.1083, test loss 0.0830
          train acc 0.9762, val acc 0.9700, test acc 0.9754
epoch  4: train loss 0.0157, val loss 0.0412, test loss 0.0365
          train acc 0.9930, val acc 0.9870, test acc 0.9894
