| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- import copy
- from typing import Optional
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- from torch import nn, Tensor
- # ------------------------------- Basic Modules -------------------------------
- def get_activation(act_type=None):
- if act_type == 'relu':
- return nn.ReLU(inplace=True)
- elif act_type == 'gelu':
- return nn.GELU()
- elif act_type == 'lrelu':
- return nn.LeakyReLU(0.1, inplace=True)
- elif act_type == 'mish':
- return nn.Mish(inplace=True)
- elif act_type == 'silu':
- return nn.SiLU(inplace=True)
- def get_norm(norm_type, dim):
- if norm_type == 'BN':
- return nn.BatchNorm2d(dim)
- elif norm_type == 'GN':
- return nn.GroupNorm(num_groups=32, num_channels=dim)
- elif norm_type == 'LN':
- return nn.LayerNorm(dim)
- def get_clones(module, N):
- return nn.ModuleList([copy.deepcopy(module) for i in range(N)])
-
- def build_multi_head_attention(d_model, num_heads, dropout, attn_type='mhsa'):
- if attn_type == 'mhsa':
- attn_layer = MultiHeadAttention(d_model, num_heads, dropout)
- elif attn_type == 's_mhsa':
- attn_layer = None
- return attn_layer
- # ------------------------------- MLP -------------------------------
- class MLP(nn.Module):
- """ Very simple multi-layer perceptron (also called FFN)"""
- def __init__(self, in_dim, hidden_dim, out_dim, num_layers):
- super().__init__()
- self.num_layers = num_layers
- h = [hidden_dim] * (num_layers - 1)
- self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([in_dim] + h, h + [out_dim]))
- def forward(self, x):
- for i, layer in enumerate(self.layers):
- x = F.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
- return x
- # ------------------------------- Transformer Modules -------------------------------
- ## Vanilla Multi-Head Attention
- class MultiHeadAttention(nn.Module):
- def __init__(self, d_model, num_heads, dropout=0.) -> None:
- super().__init__()
- # --------------- Basic parameters ---------------
- self.d_model = d_model
- self.num_heads = num_heads
- self.dropout = dropout
- self.scale = (d_model // num_heads) ** -0.5
- # --------------- Network parameters ---------------
- self.q_proj = nn.Linear(d_model, d_model, bias = False) # W_q, W_k, W_v
- self.k_proj = nn.Linear(d_model, d_model, bias = False) # W_q, W_k, W_v
- self.v_proj = nn.Linear(d_model, d_model, bias = False) # W_q, W_k, W_v
- self.out_proj = nn.Linear(d_model, d_model)
- self.dropout = nn.Dropout(dropout)
- def forward(self, query, key, value):
- """
- Inputs:
- query : (Tensor) -> [B, Nq, C]
- key : (Tensor) -> [B, Nk, C]
- value : (Tensor) -> [B, Nk, C]
- """
- bs = query.shape[0]
- Nq = query.shape[1]
- Nk = key.shape[1]
- # ----------------- Input proj -----------------
- query = self.q_proj(query)
- key = self.k_proj(key)
- value = self.v_proj(value)
- # ----------------- Multi-head Attn -----------------
- ## [B, N, C] -> [B, N, H, C_h] -> [B, H, N, C_h]
- query = query.view(bs, Nq, self.num_heads, self.d_model // self.num_heads)
- query = query.permute(0, 2, 1, 3).contiguous()
- key = key.view(bs, Nk, self.num_heads, self.d_model // self.num_heads)
- key = key.permute(0, 2, 1, 3).contiguous()
- value = value.view(bs, Nk, self.num_heads, self.d_model // self.num_heads)
- value = value.permute(0, 2, 1, 3).contiguous()
- # Attention
- ## [B, H, Nq, C_h] X [B, H, C_h, Nk] = [B, H, Nq, Nk]
- sim_matrix = torch.matmul(query, key.transpose(-1, -2)) * self.scale
- sim_matrix = torch.softmax(sim_matrix, dim=-1)
- # ----------------- Output -----------------
- out = torch.matmul(sim_matrix, value) # [B, H, Nq, C_h]
- out = out.permute(0, 2, 1, 3).contiguous().view(bs, Nq, -1)
- out = self.out_proj(out)
- return out
-
- ## Transformer Encoder layer
- class TREncoderLayer(nn.Module):
- def __init__(self,
- d_model,
- num_heads,
- dim_feedforward=2048,
- dropout=0.1,
- act_type="relu",
- attn_type='mhsa'
- ):
- super().__init__()
- # Multi-head Self-Attn
- self.self_attn = build_multi_head_attention(d_model, num_heads, dropout, attn_type)
- # Feedforwaed Network
- self.linear1 = nn.Linear(d_model, dim_feedforward)
- self.dropout = nn.Dropout(dropout)
- self.linear2 = nn.Linear(dim_feedforward, d_model)
- self.norm1 = nn.LayerNorm(d_model)
- self.norm2 = nn.LayerNorm(d_model)
- self.dropout1 = nn.Dropout(dropout)
- self.dropout2 = nn.Dropout(dropout)
- self.activation = get_activation(act_type)
- def with_pos_embed(self, tensor, pos: Optional[Tensor]):
- return tensor if pos is None else tensor + pos
- def forward(self, src, pos):
- """
- Input:
- src: [torch.Tensor] -> [B, N, C]
- pos: [torch.Tensor] -> [B, N, C]
- Output:
- src: [torch.Tensor] -> [B, N, C]
- """
- q = k = self.with_pos_embed(src, pos)
- # self-attn
- src2 = self.self_attn(q, k, value=src)
- # reshape: [B, N, C] -> [B, C, H, W]
- src = src + self.dropout1(src2)
- src = self.norm1(src)
- # ffpn
- src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
- src = src + self.dropout2(src2)
- src = self.norm2(src)
-
- return src
- ## Transformer Decoder layer
- class TRDecoderLayer(nn.Module):
- def __init__(self, d_model, num_heads, dim_feedforward=2048, dropout=0.1, act_type="relu", attn_type='mhsa'):
- super().__init__()
- # Multi-head Self-Attn
- self.self_attn = build_multi_head_attention(d_model, num_heads, dropout, attn_type)
- self.cross_attn = build_multi_head_attention(d_model, num_heads, dropout)
- # Feedforward Network
- self.linear1 = nn.Linear(d_model, dim_feedforward)
- self.dropout = nn.Dropout(dropout)
- self.linear2 = nn.Linear(dim_feedforward, d_model)
- self.norm1 = nn.LayerNorm(d_model)
- self.norm2 = nn.LayerNorm(d_model)
- self.norm3 = nn.LayerNorm(d_model)
- self.dropout1 = nn.Dropout(dropout)
- self.dropout2 = nn.Dropout(dropout)
- self.dropout3 = nn.Dropout(dropout)
- self.activation = get_activation(act_type)
- def with_pos_embed(self, tensor, pos: Optional[Tensor]):
- return tensor if pos is None else tensor + pos
- def forward(self, tgt, tgt_query_pos, memory, memory_pos):
- # self attention
- tgt2 = self.self_attn(
- query=self.with_pos_embed(tgt, tgt_query_pos),
- key=self.with_pos_embed(tgt, tgt_query_pos),
- value=tgt)[0]
- tgt = tgt + self.dropout1(tgt2)
- tgt = self.norm1(tgt)
- # cross attention
- tgt2 = self.cross_attn(
- query=self.with_pos_embed(tgt, tgt_query_pos),
- key=self.with_pos_embed(memory, memory_pos),
- value=memory)
- tgt = tgt + self.dropout2(tgt2)
- tgt = self.norm2(tgt)
- # ffn
- tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
- tgt = tgt + self.dropout3(tgt2)
- tgt = self.norm3(tgt)
-
- return tgt
|