basic.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. import math
  2. import warnings
  3. import torch
  4. import torch.nn as nn
  5. def _trunc_normal_(tensor, mean, std, a, b):
  6. """Copy from timm"""
  7. def norm_cdf(x):
  8. return (1. + math.erf(x / math.sqrt(2.))) / 2.
  9. if (mean < a - 2 * std) or (mean > b + 2 * std):
  10. warnings.warn("mean is more than 2 std from [a, b] in nn.init.trunc_normal_. "
  11. "The distribution of values may be incorrect.",
  12. stacklevel=2)
  13. l = norm_cdf((a - mean) / std)
  14. u = norm_cdf((b - mean) / std)
  15. tensor.uniform_(2 * l - 1, 2 * u - 1)
  16. tensor.erfinv_()
  17. tensor.mul_(std * math.sqrt(2.))
  18. tensor.add_(mean)
  19. tensor.clamp_(min=a, max=b)
  20. return tensor
  21. def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.):
  22. """Copy from timm"""
  23. with torch.no_grad():
  24. return _trunc_normal_(tensor, mean, std, a, b)
  25. def box_xyxy_to_cxcywh(x):
  26. x0, y0, x1, y1 = x.unbind(-1)
  27. b = [(x0 + x1) / 2, (y0 + y1) / 2, (x1 - x0), (y1 - y0)]
  28. return torch.stack(b, dim=-1)
  29. def delta2bbox(proposals,
  30. deltas,
  31. max_shape=None,
  32. wh_ratio_clip=16 / 1000,
  33. clip_border=True,
  34. add_ctr_clamp=False,
  35. ctr_clamp=32):
  36. dxy = deltas[..., :2]
  37. dwh = deltas[..., 2:]
  38. # Compute width/height of each roi
  39. pxy = proposals[..., :2]
  40. pwh = proposals[..., 2:]
  41. dxy_wh = pwh * dxy
  42. wh_ratio_clip = torch.as_tensor(wh_ratio_clip)
  43. max_ratio = torch.abs(torch.log(wh_ratio_clip)).item()
  44. if add_ctr_clamp:
  45. dxy_wh = torch.clamp(dxy_wh, max=ctr_clamp, min=-ctr_clamp)
  46. dwh = torch.clamp(dwh, max=max_ratio)
  47. else:
  48. dwh = dwh.clamp(min=-max_ratio, max=max_ratio)
  49. gxy = pxy + dxy_wh
  50. gwh = pwh * dwh.exp()
  51. x1y1 = gxy - (gwh * 0.5)
  52. x2y2 = gxy + (gwh * 0.5)
  53. bboxes = torch.cat([x1y1, x2y2], dim=-1)
  54. if clip_border and max_shape is not None:
  55. bboxes[..., 0::2].clamp_(min=0).clamp_(max=max_shape[1])
  56. bboxes[..., 1::2].clamp_(min=0).clamp_(max=max_shape[0])
  57. return bboxes
# ----------------- Custom NormLayer Ops -----------------
  59. class FrozenBatchNorm2d(torch.nn.Module):
  60. def __init__(self, n):
  61. super(FrozenBatchNorm2d, self).__init__()
  62. self.register_buffer("weight", torch.ones(n))
  63. self.register_buffer("bias", torch.zeros(n))
  64. self.register_buffer("running_mean", torch.zeros(n))
  65. self.register_buffer("running_var", torch.ones(n))
  66. def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
  67. missing_keys, unexpected_keys, error_msgs):
  68. num_batches_tracked_key = prefix + 'num_batches_tracked'
  69. if num_batches_tracked_key in state_dict:
  70. del state_dict[num_batches_tracked_key]
  71. super(FrozenBatchNorm2d, self)._load_from_state_dict(
  72. state_dict, prefix, local_metadata, strict,
  73. missing_keys, unexpected_keys, error_msgs)
  74. def forward(self, x):
  75. # move reshapes to the beginning
  76. # to make it fuser-friendly
  77. w = self.weight.reshape(1, -1, 1, 1)
  78. b = self.bias.reshape(1, -1, 1, 1)
  79. rv = self.running_var.reshape(1, -1, 1, 1)
  80. rm = self.running_mean.reshape(1, -1, 1, 1)
  81. eps = 1e-5
  82. scale = w * (rv + eps).rsqrt()
  83. bias = b - rm * scale
  84. return x * scale + bias
  85. class LayerNorm2D(nn.Module):
  86. def __init__(self, normalized_shape, norm_layer=nn.LayerNorm):
  87. super().__init__()
  88. self.ln = norm_layer(normalized_shape) if norm_layer is not None else nn.Identity()
  89. def forward(self, x):
  90. """
  91. x: N C H W
  92. """
  93. x = x.permute(0, 2, 3, 1)
  94. x = self.ln(x)
  95. x = x.permute(0, 3, 1, 2)
  96. return x
  97. # ----------------- Basic CNN Ops -----------------
  98. def get_conv2d(c1, c2, k, p, s, g, bias=False):
  99. conv = nn.Conv2d(c1, c2, k, stride=s, padding=p, groups=g, bias=bias)
  100. return conv
  101. def get_activation(act_type=None):
  102. if act_type == 'relu':
  103. return nn.ReLU(inplace=True)
  104. elif act_type == 'lrelu':
  105. return nn.LeakyReLU(0.1, inplace=True)
  106. elif act_type == 'mish':
  107. return nn.Mish(inplace=True)
  108. elif act_type == 'silu':
  109. return nn.SiLU(inplace=True)
  110. elif act_type == 'gelu':
  111. return nn.GELU()
  112. elif act_type is None:
  113. return nn.Identity()
  114. else:
  115. raise NotImplementedError
  116. def get_norm(norm_type, dim):
  117. if norm_type == 'BN':
  118. return nn.BatchNorm2d(dim)
  119. elif norm_type == 'GN':
  120. return nn.GroupNorm(num_groups=32, num_channels=dim)
  121. elif norm_type is None:
  122. return nn.Identity()
  123. else:
  124. raise NotImplementedError
  125. class BasicConv(nn.Module):
  126. def __init__(self,
  127. in_dim, # in channels
  128. out_dim, # out channels
  129. kernel_size=1, # kernel size
  130. padding=0, # padding
  131. stride=1, # padding
  132. act_type :str = 'lrelu', # activation
  133. norm_type :str = 'BN', # normalization
  134. ):
  135. super(BasicConv, self).__init__()
  136. add_bias = False if norm_type else True
  137. self.conv = get_conv2d(in_dim, out_dim, k=kernel_size, p=padding, s=stride, g=1, bias=add_bias)
  138. self.norm = get_norm(norm_type, out_dim)
  139. self.act = get_activation(act_type)
  140. def forward(self, x):
  141. return self.act(self.norm(self.conv(x)))
  142. class UpSampleWrapper(nn.Module):
  143. """Upsample last feat map to specific stride."""
  144. def __init__(self, in_dim, upsample_factor):
  145. super(UpSampleWrapper, self).__init__()
  146. # ---------- Basic parameters ----------
  147. self.upsample_factor = upsample_factor
  148. # ---------- Network parameters ----------
  149. if upsample_factor == 1:
  150. self.upsample = nn.Identity()
  151. else:
  152. scale = int(math.log2(upsample_factor))
  153. dim = in_dim
  154. layers = []
  155. for _ in range(scale-1):
  156. layers += [
  157. nn.ConvTranspose2d(dim, dim // 2, kernel_size=2, stride=2),
  158. LayerNorm2D(dim // 2),
  159. nn.GELU()
  160. ]
  161. dim = dim // 2
  162. layers += [nn.ConvTranspose2d(dim, dim // 2, kernel_size=2, stride=2)]
  163. dim = dim // 2
  164. self.upsample = nn.Sequential(*layers)
  165. self.out_dim = dim
  166. def forward(self, x):
  167. x = self.upsample(x)
  168. return x
  169. # ----------------- MLP modules -----------------
  170. class MLP(nn.Module):
  171. def __init__(self, in_dim, hidden_dim, out_dim, num_layers):
  172. super().__init__()
  173. self.num_layers = num_layers
  174. h = [hidden_dim] * (num_layers - 1)
  175. self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([in_dim] + h, h + [out_dim]))
  176. def forward(self, x):
  177. for i, layer in enumerate(self.layers):
  178. x = nn.functional.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
  179. return x
  180. class FFN(nn.Module):
  181. def __init__(self, d_model=256, mlp_ratio=4.0, dropout=0., act_type='relu', pre_norm=False):
  182. super().__init__()
  183. # ----------- Basic parameters -----------
  184. self.pre_norm = pre_norm
  185. self.fpn_dim = round(d_model * mlp_ratio)
  186. # ----------- Network parameters -----------
  187. self.linear1 = nn.Linear(d_model, self.fpn_dim)
  188. self.activation = get_activation(act_type)
  189. self.dropout2 = nn.Dropout(dropout)
  190. self.linear2 = nn.Linear(self.fpn_dim, d_model)
  191. self.dropout3 = nn.Dropout(dropout)
  192. self.norm = nn.LayerNorm(d_model)
  193. def forward(self, src):
  194. if self.pre_norm:
  195. src = self.norm(src)
  196. src2 = self.linear2(self.dropout2(self.activation(self.linear1(src))))
  197. src = src + self.dropout3(src2)
  198. else:
  199. src2 = self.linear2(self.dropout2(self.activation(self.linear1(src))))
  200. src = src + self.dropout3(src2)
  201. src = self.norm(src)
  202. return src
  203. # ----------------- Attention Ops -----------------
  204. class GlobalCrossAttention(nn.Module):
  205. def __init__(
  206. self,
  207. dim :int = 256,
  208. num_heads :int = 8,
  209. qkv_bias :bool = True,
  210. qk_scale :float = None,
  211. attn_drop :float = 0.0,
  212. proj_drop :float = 0.0,
  213. rpe_hidden_dim :int = 512,
  214. feature_stride :int = 16,
  215. ):
  216. super().__init__()
  217. # --------- Basic parameters ---------
  218. self.dim = dim
  219. self.num_heads = num_heads
  220. head_dim = dim // num_heads
  221. self.scale = qk_scale or head_dim ** -0.5
  222. self.feature_stride = feature_stride
  223. # --------- Network parameters ---------
  224. self.cpb_mlp1 = self.build_cpb_mlp(2, rpe_hidden_dim, num_heads)
  225. self.cpb_mlp2 = self.build_cpb_mlp(2, rpe_hidden_dim, num_heads)
  226. self.q = nn.Linear(dim, dim, bias=qkv_bias)
  227. self.k = nn.Linear(dim, dim, bias=qkv_bias)
  228. self.v = nn.Linear(dim, dim, bias=qkv_bias)
  229. self.attn_drop = nn.Dropout(attn_drop)
  230. self.proj = nn.Linear(dim, dim)
  231. self.proj_drop = nn.Dropout(proj_drop)
  232. self.softmax = nn.Softmax(dim=-1)
  233. def build_cpb_mlp(self, in_dim, hidden_dim, out_dim):
  234. cpb_mlp = nn.Sequential(nn.Linear(in_dim, hidden_dim, bias=True),
  235. nn.ReLU(inplace=True),
  236. nn.Linear(hidden_dim, out_dim, bias=False))
  237. return cpb_mlp
  238. def forward(
  239. self,
  240. query,
  241. reference_points,
  242. k_input_flatten,
  243. v_input_flatten,
  244. input_spatial_shapes,
  245. input_padding_mask=None,
  246. ):
  247. assert input_spatial_shapes.size(0) == 1, 'This is designed for single-scale decoder.'
  248. h, w = input_spatial_shapes[0]
  249. stride = self.feature_stride
  250. ref_pts = torch.cat([
  251. reference_points[:, :, :, :2] - reference_points[:, :, :, 2:] / 2,
  252. reference_points[:, :, :, :2] + reference_points[:, :, :, 2:] / 2,
  253. ], dim=-1) # B, nQ, 1, 4
  254. pos_x = torch.linspace(0.5, w - 0.5, w, dtype=torch.float32, device=w.device)[None, None, :, None] * stride # 1, 1, w, 1
  255. pos_y = torch.linspace(0.5, h - 0.5, h, dtype=torch.float32, device=h.device)[None, None, :, None] * stride # 1, 1, h, 1
  256. delta_x = ref_pts[..., 0::2] - pos_x # B, nQ, w, 2
  257. delta_y = ref_pts[..., 1::2] - pos_y # B, nQ, h, 2
  258. rpe_x, rpe_y = self.cpb_mlp1(delta_x), self.cpb_mlp2(delta_y) # B, nQ, w/h, nheads
  259. rpe = (rpe_x[:, :, None] + rpe_y[:, :, :, None]).flatten(2, 3) # B, nQ, h, w, nheads -> B, nQ, h*w, nheads
  260. rpe = rpe.permute(0, 3, 1, 2)
  261. B_, N, C = k_input_flatten.shape
  262. k = self.k(k_input_flatten).reshape(B_, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
  263. v = self.v(v_input_flatten).reshape(B_, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
  264. B_, N, C = query.shape
  265. q = self.q(query).reshape(B_, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
  266. q = q * self.scale
  267. attn = q @ k.transpose(-2, -1)
  268. attn += rpe
  269. if input_padding_mask is not None:
  270. attn += input_padding_mask[:, None, None] * -100
  271. fmin, fmax = torch.finfo(attn.dtype).min, torch.finfo(attn.dtype).max
  272. torch.clip_(attn, min=fmin, max=fmax)
  273. attn = self.softmax(attn)
  274. attn = self.attn_drop(attn)
  275. x = attn @ v
  276. x = x.transpose(1, 2).reshape(B_, N, C)
  277. x = self.proj(x)
  278. x = self.proj_drop(x)
  279. return x