basic.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. import numpy as np
  2. import torch
  3. import torch.nn as nn
  4. import torch.nn.functional as F
  5. # ---------------------------- NMS ----------------------------
  6. ## basic NMS
  7. def nms(bboxes, scores, nms_thresh):
  8. """"Pure Python NMS."""
  9. x1 = bboxes[:, 0] #xmin
  10. y1 = bboxes[:, 1] #ymin
  11. x2 = bboxes[:, 2] #xmax
  12. y2 = bboxes[:, 3] #ymax
  13. areas = (x2 - x1) * (y2 - y1)
  14. order = scores.argsort()[::-1]
  15. keep = []
  16. while order.size > 0:
  17. i = order[0]
  18. keep.append(i)
  19. # compute iou
  20. xx1 = np.maximum(x1[i], x1[order[1:]])
  21. yy1 = np.maximum(y1[i], y1[order[1:]])
  22. xx2 = np.minimum(x2[i], x2[order[1:]])
  23. yy2 = np.minimum(y2[i], y2[order[1:]])
  24. w = np.maximum(1e-10, xx2 - xx1)
  25. h = np.maximum(1e-10, yy2 - yy1)
  26. inter = w * h
  27. iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-14)
  28. #reserve all the boundingbox whose ovr less than thresh
  29. inds = np.where(iou <= nms_thresh)[0]
  30. order = order[inds + 1]
  31. return keep
  32. ## class-agnostic NMS
  33. def multiclass_nms_class_agnostic(scores, labels, bboxes, nms_thresh):
  34. # nms
  35. keep = nms(bboxes, scores, nms_thresh)
  36. scores = scores[keep]
  37. labels = labels[keep]
  38. bboxes = bboxes[keep]
  39. return scores, labels, bboxes
  40. ## class-aware NMS
  41. def multiclass_nms_class_aware(scores, labels, bboxes, nms_thresh, num_classes):
  42. # nms
  43. keep = np.zeros(len(bboxes), dtype=np.int32)
  44. for i in range(num_classes):
  45. inds = np.where(labels == i)[0]
  46. if len(inds) == 0:
  47. continue
  48. c_bboxes = bboxes[inds]
  49. c_scores = scores[inds]
  50. c_keep = nms(c_bboxes, c_scores, nms_thresh)
  51. keep[inds[c_keep]] = 1
  52. keep = np.where(keep > 0)
  53. scores = scores[keep]
  54. labels = labels[keep]
  55. bboxes = bboxes[keep]
  56. return scores, labels, bboxes
  57. ## multi-class NMS
  58. def multiclass_nms(scores, labels, bboxes, nms_thresh, num_classes, class_agnostic=False):
  59. if class_agnostic:
  60. return multiclass_nms_class_agnostic(scores, labels, bboxes, nms_thresh)
  61. else:
  62. return multiclass_nms_class_aware(scores, labels, bboxes, nms_thresh, num_classes)
  63. # ----------------- MLP modules -----------------
  64. class MLP(nn.Module):
  65. def __init__(self, in_dim, hidden_dim, out_dim, num_layers):
  66. super().__init__()
  67. self.num_layers = num_layers
  68. h = [hidden_dim] * (num_layers - 1)
  69. self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([in_dim] + h, h + [out_dim]))
  70. def forward(self, x):
  71. for i, layer in enumerate(self.layers):
  72. x = nn.functional.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
  73. return x
  74. class FFN(nn.Module):
  75. def __init__(self, d_model=256, ffn_dim=1024, dropout=0., act_type='relu'):
  76. super().__init__()
  77. self.ffn_dim = ffn_dim
  78. self.linear1 = nn.Linear(d_model, self.ffn_dim)
  79. self.activation = get_activation(act_type)
  80. self.dropout2 = nn.Dropout(dropout)
  81. self.linear2 = nn.Linear(self.ffn_dim, d_model)
  82. self.dropout3 = nn.Dropout(dropout)
  83. self.norm = nn.LayerNorm(d_model)
  84. def forward(self, src):
  85. src2 = self.linear2(self.dropout2(self.activation(self.linear1(src))))
  86. src = src + self.dropout3(src2)
  87. src = self.norm(src)
  88. return src
  89. # ----------------- Basic CNN Ops -----------------
  90. def get_conv2d(c1, c2, k, p, s, g, bias=False):
  91. conv = nn.Conv2d(c1, c2, k, stride=s, padding=p, groups=g, bias=bias)
  92. return conv
  93. def get_activation(act_type=None):
  94. if act_type == 'relu':
  95. return nn.ReLU(inplace=True)
  96. elif act_type == 'lrelu':
  97. return nn.LeakyReLU(0.1, inplace=True)
  98. elif act_type == 'mish':
  99. return nn.Mish(inplace=True)
  100. elif act_type == 'silu':
  101. return nn.SiLU(inplace=True)
  102. elif act_type == 'gelu':
  103. return nn.GELU()
  104. elif act_type is None:
  105. return nn.Identity()
  106. else:
  107. raise NotImplementedError
  108. def get_norm(norm_type, dim):
  109. if norm_type == 'BN':
  110. return nn.BatchNorm2d(dim)
  111. elif norm_type == 'GN':
  112. return nn.GroupNorm(num_groups=32, num_channels=dim)
  113. elif norm_type is None:
  114. return nn.Identity()
  115. else:
  116. raise NotImplementedError
  117. def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
  118. """3x3 convolution with padding"""
  119. return nn.Conv2d(
  120. in_planes,
  121. out_planes,
  122. kernel_size=3,
  123. stride=stride,
  124. padding=dilation,
  125. groups=groups,
  126. bias=False,
  127. dilation=dilation,
  128. )
  129. def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
  130. """1x1 convolution"""
  131. return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
  132. class FrozenBatchNorm2d(torch.nn.Module):
  133. def __init__(self, n):
  134. super(FrozenBatchNorm2d, self).__init__()
  135. self.register_buffer("weight", torch.ones(n))
  136. self.register_buffer("bias", torch.zeros(n))
  137. self.register_buffer("running_mean", torch.zeros(n))
  138. self.register_buffer("running_var", torch.ones(n))
  139. def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
  140. missing_keys, unexpected_keys, error_msgs):
  141. num_batches_tracked_key = prefix + 'num_batches_tracked'
  142. if num_batches_tracked_key in state_dict:
  143. del state_dict[num_batches_tracked_key]
  144. super(FrozenBatchNorm2d, self)._load_from_state_dict(
  145. state_dict, prefix, local_metadata, strict,
  146. missing_keys, unexpected_keys, error_msgs)
  147. def forward(self, x):
  148. # move reshapes to the beginning
  149. # to make it fuser-friendly
  150. w = self.weight.reshape(1, -1, 1, 1)
  151. b = self.bias.reshape(1, -1, 1, 1)
  152. rv = self.running_var.reshape(1, -1, 1, 1)
  153. rm = self.running_mean.reshape(1, -1, 1, 1)
  154. eps = 1e-5
  155. scale = w * (rv + eps).rsqrt()
  156. bias = b - rm * scale
  157. return x * scale + bias
  158. class BasicConv(nn.Module):
  159. def __init__(self,
  160. in_dim, # in channels
  161. out_dim, # out channels
  162. kernel_size=1, # kernel size
  163. padding=0, # padding
  164. stride=1, # padding
  165. act_type :str = 'lrelu', # activation
  166. norm_type :str = 'BN', # normalization
  167. ):
  168. super(BasicConv, self).__init__()
  169. add_bias = False if norm_type else True
  170. self.conv = get_conv2d(in_dim, out_dim, k=kernel_size, p=padding, s=stride, g=1, bias=add_bias)
  171. self.norm = get_norm(norm_type, out_dim)
  172. self.act = get_activation(act_type)
  173. def forward(self, x):
  174. return self.act(self.norm(self.conv(x)))
  175. class DepthwiseConv(nn.Module):
  176. def __init__(self,
  177. in_dim, # in channels
  178. out_dim, # out channels
  179. kernel_size=1, # kernel size
  180. padding=0, # padding
  181. stride=1, # padding
  182. act_type :str = None, # activation
  183. norm_type :str = 'BN', # normalization
  184. ):
  185. super(DepthwiseConv, self).__init__()
  186. assert in_dim == out_dim
  187. add_bias = False if norm_type else True
  188. self.conv = get_conv2d(in_dim, out_dim, k=kernel_size, p=padding, s=stride, g=out_dim, bias=add_bias)
  189. self.norm = get_norm(norm_type, out_dim)
  190. self.act = get_activation(act_type)
  191. def forward(self, x):
  192. return self.act(self.norm(self.conv(x)))
  193. class PointwiseConv(nn.Module):
  194. def __init__(self,
  195. in_dim, # in channels
  196. out_dim, # out channels
  197. act_type :str = 'lrelu', # activation
  198. norm_type :str = 'BN', # normalization
  199. ):
  200. super(DepthwiseConv, self).__init__()
  201. assert in_dim == out_dim
  202. add_bias = False if norm_type else True
  203. self.conv = get_conv2d(in_dim, out_dim, k=1, p=0, s=1, g=1, bias=add_bias)
  204. self.norm = get_norm(norm_type, out_dim)
  205. self.act = get_activation(act_type)
  206. def forward(self, x):
  207. return self.act(self.norm(self.conv(x)))
  208. # ----------------- CNN Modules -----------------
  209. class RepVggBlock(nn.Module):
  210. def __init__(self, in_dim, out_dim, act_type='relu', norm_type='BN'):
  211. super().__init__()
  212. self.in_dim = in_dim
  213. self.out_dim = out_dim
  214. self.conv1 = BasicConv(in_dim, out_dim, kernel_size=3, padding=1, act_type=None, norm_type=norm_type)
  215. self.conv2 = BasicConv(in_dim, out_dim, kernel_size=1, padding=0, act_type=None, norm_type=norm_type)
  216. self.act = get_activation(act_type)
  217. def forward(self, x):
  218. if hasattr(self, 'conv'):
  219. y = self.conv(x)
  220. else:
  221. y = self.conv1(x) + self.conv2(x)
  222. return self.act(y)
  223. def convert_to_deploy(self):
  224. if not hasattr(self, 'conv'):
  225. self.conv = nn.Conv2d(self.in_dim, self.out_dim, 3, 1, padding=1)
  226. kernel, bias = self.get_equivalent_kernel_bias()
  227. self.conv.weight.data = kernel
  228. self.conv.bias.data = bias
  229. def get_equivalent_kernel_bias(self):
  230. kernel3x3, bias3x3 = self._fuse_bn_tensor(self.conv1)
  231. kernel1x1, bias1x1 = self._fuse_bn_tensor(self.conv2)
  232. return kernel3x3 + self._pad_1x1_to_3x3_tensor(kernel1x1), bias3x3 + bias1x1
  233. def _pad_1x1_to_3x3_tensor(self, kernel1x1):
  234. if kernel1x1 is None:
  235. return 0
  236. else:
  237. return F.pad(kernel1x1, [1, 1, 1, 1])
  238. def _fuse_bn_tensor(self, branch: BasicConv):
  239. if branch is None:
  240. return 0, 0
  241. kernel = branch.conv.weight
  242. running_mean = branch.norm.running_mean
  243. running_var = branch.norm.running_var
  244. gamma = branch.norm.weight
  245. beta = branch.norm.bias
  246. eps = branch.norm.eps
  247. std = (running_var + eps).sqrt()
  248. t = (gamma / std).reshape(-1, 1, 1, 1)
  249. return kernel * t, beta - running_mean * gamma / std
  250. class RepRTCBlock(nn.Module):
  251. def __init__(self,
  252. in_dim,
  253. out_dim,
  254. num_blocks = 3,
  255. expansion = 1.0,
  256. act_type = 'silu',
  257. norm_type = 'BN',
  258. ) -> None:
  259. super(RepRTCBlock, self).__init__()
  260. self.inter_dim = round(out_dim * expansion)
  261. self.conv1 = BasicConv(in_dim, self.inter_dim, kernel_size=1, act_type=act_type, norm_type=norm_type)
  262. self.conv2 = BasicConv(in_dim, self.inter_dim, kernel_size=1, act_type=act_type, norm_type=norm_type)
  263. self.module = nn.ModuleList([RepVggBlock(self.inter_dim, self.inter_dim, act_type, norm_type)
  264. for _ in range(num_blocks)])
  265. self.conv3 = BasicConv(self.inter_dim, out_dim, kernel_size=3, padding=1, act_type=act_type, norm_type=norm_type)
  266. def forward(self, x):
  267. # Input proj
  268. x1 = self.conv1(x)
  269. x2 = self.conv2(x)
  270. # Core module
  271. out = [x1]
  272. for m in self.module:
  273. x2 = m(x2)
  274. out.append(x2)
  275. # Output proj
  276. out = self.conv3(sum(out))
  277. return out