| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402 |
- import math
- import warnings
- import numpy as np
- import torch
- import torch.nn as nn
- def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.):
- """Copy from timm"""
- with torch.no_grad():
- """Copy from timm"""
- def norm_cdf(x):
- return (1. + math.erf(x / math.sqrt(2.))) / 2.
- if (mean < a - 2 * std) or (mean > b + 2 * std):
- warnings.warn("mean is more than 2 std from [a, b] in nn.init.trunc_normal_. "
- "The distribution of values may be incorrect.",
- stacklevel=2)
- l = norm_cdf((a - mean) / std)
- u = norm_cdf((b - mean) / std)
- tensor.uniform_(2 * l - 1, 2 * u - 1)
- tensor.erfinv_()
- tensor.mul_(std * math.sqrt(2.))
- tensor.add_(mean)
- tensor.clamp_(min=a, max=b)
- return tensor
- def box_xyxy_to_cxcywh(x):
- x0, y0, x1, y1 = x.unbind(-1)
- b = [(x0 + x1) / 2, (y0 + y1) / 2, (x1 - x0), (y1 - y0)]
-
- return torch.stack(b, dim=-1)
- def delta2bbox(proposals,
- deltas,
- max_shape=None,
- wh_ratio_clip=16 / 1000,
- clip_border=True,
- add_ctr_clamp=False,
- ctr_clamp=32):
- dxy = deltas[..., :2]
- dwh = deltas[..., 2:]
- # Compute width/height of each roi
- pxy = proposals[..., :2]
- pwh = proposals[..., 2:]
- dxy_wh = pwh * dxy
- wh_ratio_clip = torch.as_tensor(wh_ratio_clip)
- max_ratio = torch.abs(torch.log(wh_ratio_clip)).item()
-
- if add_ctr_clamp:
- dxy_wh = torch.clamp(dxy_wh, max=ctr_clamp, min=-ctr_clamp)
- dwh = torch.clamp(dwh, max=max_ratio)
- else:
- dwh = dwh.clamp(min=-max_ratio, max=max_ratio)
- gxy = pxy + dxy_wh
- gwh = pwh * dwh.exp()
- x1y1 = gxy - (gwh * 0.5)
- x2y2 = gxy + (gwh * 0.5)
- bboxes = torch.cat([x1y1, x2y2], dim=-1)
- if clip_border and max_shape is not None:
- bboxes[..., 0::2].clamp_(min=0).clamp_(max=max_shape[1])
- bboxes[..., 1::2].clamp_(min=0).clamp_(max=max_shape[0])
- return bboxes
- # ---------------------------- NMS ----------------------------
- ## basic NMS
- def nms(bboxes, scores, nms_thresh):
- """"Pure Python NMS."""
- x1 = bboxes[:, 0] #xmin
- y1 = bboxes[:, 1] #ymin
- x2 = bboxes[:, 2] #xmax
- y2 = bboxes[:, 3] #ymax
- areas = (x2 - x1) * (y2 - y1)
- order = scores.argsort()[::-1]
- keep = []
- while order.size > 0:
- i = order[0]
- keep.append(i)
- # compute iou
- xx1 = np.maximum(x1[i], x1[order[1:]])
- yy1 = np.maximum(y1[i], y1[order[1:]])
- xx2 = np.minimum(x2[i], x2[order[1:]])
- yy2 = np.minimum(y2[i], y2[order[1:]])
- w = np.maximum(1e-10, xx2 - xx1)
- h = np.maximum(1e-10, yy2 - yy1)
- inter = w * h
- iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-14)
- #reserve all the boundingbox whose ovr less than thresh
- inds = np.where(iou <= nms_thresh)[0]
- order = order[inds + 1]
- return keep
- ## class-agnostic NMS
- def multiclass_nms_class_agnostic(scores, labels, bboxes, nms_thresh):
- # nms
- keep = nms(bboxes, scores, nms_thresh)
- scores = scores[keep]
- labels = labels[keep]
- bboxes = bboxes[keep]
- return scores, labels, bboxes
- ## class-aware NMS
- def multiclass_nms_class_aware(scores, labels, bboxes, nms_thresh, num_classes):
- # nms
- keep = np.zeros(len(bboxes), dtype=np.int32)
- for i in range(num_classes):
- inds = np.where(labels == i)[0]
- if len(inds) == 0:
- continue
- c_bboxes = bboxes[inds]
- c_scores = scores[inds]
- c_keep = nms(c_bboxes, c_scores, nms_thresh)
- keep[inds[c_keep]] = 1
- keep = np.where(keep > 0)
- scores = scores[keep]
- labels = labels[keep]
- bboxes = bboxes[keep]
- return scores, labels, bboxes
- ## multi-class NMS
- def multiclass_nms(scores, labels, bboxes, nms_thresh, num_classes, class_agnostic=False):
- if class_agnostic:
- return multiclass_nms_class_agnostic(scores, labels, bboxes, nms_thresh)
- else:
- return multiclass_nms_class_aware(scores, labels, bboxes, nms_thresh, num_classes)
- # ----------------- Customed NormLayer Ops -----------------
- class FrozenBatchNorm2d(torch.nn.Module):
- def __init__(self, n):
- super(FrozenBatchNorm2d, self).__init__()
- self.register_buffer("weight", torch.ones(n))
- self.register_buffer("bias", torch.zeros(n))
- self.register_buffer("running_mean", torch.zeros(n))
- self.register_buffer("running_var", torch.ones(n))
- def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
- missing_keys, unexpected_keys, error_msgs):
- num_batches_tracked_key = prefix + 'num_batches_tracked'
- if num_batches_tracked_key in state_dict:
- del state_dict[num_batches_tracked_key]
- super(FrozenBatchNorm2d, self)._load_from_state_dict(
- state_dict, prefix, local_metadata, strict,
- missing_keys, unexpected_keys, error_msgs)
- def forward(self, x):
- # move reshapes to the beginning
- # to make it fuser-friendly
- w = self.weight.reshape(1, -1, 1, 1)
- b = self.bias.reshape(1, -1, 1, 1)
- rv = self.running_var.reshape(1, -1, 1, 1)
- rm = self.running_mean.reshape(1, -1, 1, 1)
- eps = 1e-5
- scale = w * (rv + eps).rsqrt()
- bias = b - rm * scale
- return x * scale + bias
- class LayerNorm2D(nn.Module):
- def __init__(self, normalized_shape, norm_layer=nn.LayerNorm):
- super().__init__()
- self.ln = norm_layer(normalized_shape) if norm_layer is not None else nn.Identity()
- def forward(self, x):
- """
- x: N C H W
- """
- x = x.permute(0, 2, 3, 1)
- x = self.ln(x)
- x = x.permute(0, 3, 1, 2)
- return x
- # ----------------- Basic CNN Ops -----------------
- def get_conv2d(c1, c2, k, p, s, g, bias=False):
- conv = nn.Conv2d(c1, c2, k, stride=s, padding=p, groups=g, bias=bias)
- return conv
- def get_activation(act_type=None):
- if act_type == 'relu':
- return nn.ReLU(inplace=True)
- elif act_type == 'lrelu':
- return nn.LeakyReLU(0.1, inplace=True)
- elif act_type == 'mish':
- return nn.Mish(inplace=True)
- elif act_type == 'silu':
- return nn.SiLU(inplace=True)
- elif act_type == 'gelu':
- return nn.GELU()
- elif act_type is None:
- return nn.Identity()
- else:
- raise NotImplementedError
-
- def get_norm(norm_type, dim):
- if norm_type == 'BN':
- return nn.BatchNorm2d(dim)
- elif norm_type == 'GN':
- return nn.GroupNorm(num_groups=32, num_channels=dim)
- elif norm_type is None:
- return nn.Identity()
- else:
- raise NotImplementedError
- class BasicConv(nn.Module):
- def __init__(self,
- in_dim, # in channels
- out_dim, # out channels
- kernel_size=1, # kernel size
- padding=0, # padding
- stride=1, # padding
- act_type :str = 'lrelu', # activation
- norm_type :str = 'BN', # normalization
- ):
- super(BasicConv, self).__init__()
- add_bias = False if norm_type else True
- self.conv = get_conv2d(in_dim, out_dim, k=kernel_size, p=padding, s=stride, g=1, bias=add_bias)
- self.norm = get_norm(norm_type, out_dim)
- self.act = get_activation(act_type)
- def forward(self, x):
- return self.act(self.norm(self.conv(x)))
- class UpSampleWrapper(nn.Module):
- """Upsample last feat map to specific stride."""
- def __init__(self, in_dim, upsample_factor):
- super(UpSampleWrapper, self).__init__()
- # ---------- Basic parameters ----------
- self.upsample_factor = upsample_factor
- # ---------- Network parameters ----------
- if upsample_factor == 1:
- self.upsample = nn.Identity()
- else:
- scale = int(math.log2(upsample_factor))
- dim = in_dim
- layers = []
- for _ in range(scale-1):
- layers += [
- nn.ConvTranspose2d(dim, dim, kernel_size=2, stride=2),
- LayerNorm2D(dim),
- nn.GELU()
- ]
- layers += [nn.ConvTranspose2d(dim, dim, kernel_size=2, stride=2)]
- self.upsample = nn.Sequential(*layers)
- self.out_dim = dim
- def forward(self, x):
- x = self.upsample(x)
- return x
- # ----------------- MLP modules -----------------
- class MLP(nn.Module):
- def __init__(self, in_dim, hidden_dim, out_dim, num_layers):
- super().__init__()
- self.num_layers = num_layers
- h = [hidden_dim] * (num_layers - 1)
- self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([in_dim] + h, h + [out_dim]))
- def forward(self, x):
- for i, layer in enumerate(self.layers):
- x = nn.functional.relu(layer(x)) if i < self.num_layers - 1 else layer(x)
- return x
- class FFN(nn.Module):
- def __init__(self, d_model=256, mlp_ratio=4.0, dropout=0., act_type='relu', pre_norm=False):
- super().__init__()
- # ----------- Basic parameters -----------
- self.pre_norm = pre_norm
- self.fpn_dim = round(d_model * mlp_ratio)
- # ----------- Network parameters -----------
- self.linear1 = nn.Linear(d_model, self.fpn_dim)
- self.activation = get_activation(act_type)
- self.dropout2 = nn.Dropout(dropout)
- self.linear2 = nn.Linear(self.fpn_dim, d_model)
- self.dropout3 = nn.Dropout(dropout)
- self.norm = nn.LayerNorm(d_model)
- def forward(self, src):
- if self.pre_norm:
- src = self.norm(src)
- src2 = self.linear2(self.dropout2(self.activation(self.linear1(src))))
- src = src + self.dropout3(src2)
- else:
- src2 = self.linear2(self.dropout2(self.activation(self.linear1(src))))
- src = src + self.dropout3(src2)
- src = self.norm(src)
-
- return src
-
- # ----------------- Attention Ops -----------------
- class GlobalCrossAttention(nn.Module):
- def __init__(
- self,
- dim :int = 256,
- num_heads :int = 8,
- qkv_bias :bool = True,
- qk_scale :float = None,
- attn_drop :float = 0.0,
- proj_drop :float = 0.0,
- rpe_hidden_dim :int = 512,
- feature_stride :int = 16,
- ):
- super().__init__()
- # --------- Basic parameters ---------
- self.dim = dim
- self.num_heads = num_heads
- head_dim = dim // num_heads
- self.scale = qk_scale or head_dim ** -0.5
- self.feature_stride = feature_stride
- # --------- Network parameters ---------
- self.cpb_mlp1 = self.build_cpb_mlp(2, rpe_hidden_dim, num_heads)
- self.cpb_mlp2 = self.build_cpb_mlp(2, rpe_hidden_dim, num_heads)
- self.q = nn.Linear(dim, dim, bias=qkv_bias)
- self.k = nn.Linear(dim, dim, bias=qkv_bias)
- self.v = nn.Linear(dim, dim, bias=qkv_bias)
- self.attn_drop = nn.Dropout(attn_drop)
- self.proj = nn.Linear(dim, dim)
- self.proj_drop = nn.Dropout(proj_drop)
- self.softmax = nn.Softmax(dim=-1)
- def build_cpb_mlp(self, in_dim, hidden_dim, out_dim):
- cpb_mlp = nn.Sequential(nn.Linear(in_dim, hidden_dim, bias=True),
- nn.ReLU(inplace=True),
- nn.Linear(hidden_dim, out_dim, bias=False))
- return cpb_mlp
- def forward(
- self,
- query,
- reference_points,
- k_input_flatten,
- v_input_flatten,
- input_spatial_shapes,
- input_padding_mask=None,
- ):
- assert input_spatial_shapes.size(0) == 1, 'This is designed for single-scale decoder.'
- h, w = input_spatial_shapes[0]
- stride = self.feature_stride
- ref_pts = torch.cat([
- reference_points[:, :, :, :2] - reference_points[:, :, :, 2:] / 2,
- reference_points[:, :, :, :2] + reference_points[:, :, :, 2:] / 2,
- ], dim=-1) # B, nQ, 1, 4
- pos_x = torch.linspace(0.5, w - 0.5, w, dtype=torch.float32, device=w.device)[None, None, :, None] * stride # 1, 1, w, 1
- pos_y = torch.linspace(0.5, h - 0.5, h, dtype=torch.float32, device=h.device)[None, None, :, None] * stride # 1, 1, h, 1
- delta_x = ref_pts[..., 0::2] - pos_x # B, nQ, w, 2
- delta_y = ref_pts[..., 1::2] - pos_y # B, nQ, h, 2
- rpe_x, rpe_y = self.cpb_mlp1(delta_x), self.cpb_mlp2(delta_y) # B, nQ, w/h, nheads
- rpe = (rpe_x[:, :, None] + rpe_y[:, :, :, None]).flatten(2, 3) # B, nQ, h, w, nheads -> B, nQ, h*w, nheads
- rpe = rpe.permute(0, 3, 1, 2)
- B_, N, C = k_input_flatten.shape
- k = self.k(k_input_flatten).reshape(B_, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
- v = self.v(v_input_flatten).reshape(B_, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
- B_, N, C = query.shape
- q = self.q(query).reshape(B_, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
- q = q * self.scale
- attn = q @ k.transpose(-2, -1)
- attn += rpe
- if input_padding_mask is not None:
- attn += input_padding_mask[:, None, None] * -100
- fmin, fmax = torch.finfo(attn.dtype).min, torch.finfo(attn.dtype).max
- torch.clip_(attn, min=fmin, max=fmax)
- attn = self.softmax(attn)
- attn = self.attn_drop(attn)
- x = attn @ v
- x = x.transpose(1, 2).reshape(B_, N, C)
- x = self.proj(x)
- x = self.proj_drop(x)
- return x
|