
enhanced ctrnet

yjh0410 1 year ago
parent commit
c17bcbd6f9

+ 56 - 0
config/model_config/ctrnet_config.py

@@ -0,0 +1,56 @@
+# Enhanced CenterNet
+
+
+ctrnet_cfg = {
+    'ctrnet_n':{
+        # ---------------- Model config ----------------
+        ## Backbone
+        'bk_pretrained': True,
+        'bk_act': 'silu',
+        'bk_norm': 'BN',
+        'bk_depthwise': False,
+        'width': 0.25,
+        'depth': 0.34,
+        'ratio': 2.0,
+        'max_stride': 32,
+        'out_stride': 4,
+        ## Neck
+        'neck': 'sppf',
+        'neck_expand_ratio': 0.5,
+        'pooling_size': 5,
+        'neck_act': 'silu',
+        'neck_norm': 'BN',
+        'neck_depthwise': False,
+        ## Decoder
+        'dec_act': 'silu',
+        'dec_norm': 'BN',
+        'dec_depthwise': False,
+        ## Head
+        'head': 'decoupled_head',
+        'num_cls_head': 2,
+        'num_reg_head': 2,
+        'head_act': 'silu',
+        'head_norm': 'BN',
+        'head_depthwise': False,  
+        # ---------------- Data config ----------------
+        ## input
+        'multi_scale': [0.5, 1.25],   # 320 -> 800
+        'trans_type': 'yolox_n',
+        # ---------------- Assignment config ----------------
+        ## Matcher
+        'matcher': "aligned_simota",
+        'matcher_hpy': {'main': {'soft_center_radius': 3.0,
+                                 'topk_candidates': 1},    # one-to-one assignment
+                        'aux':  {'soft_center_radius': 3.0,
+                                 'topk_candidates': 13},   # one-to-many assignment
+                        },
+        # ---------------- Loss config ----------------
+        ## loss weight
+        'loss_cls_weight': 1.0,
+        'loss_box_weight': 2.0,
+        # ---------------- Train config ----------------
+        'trainer_type': 'rtcdet',
+    },
+
+}
+
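
As a quick usage sketch (not part of the commit), the config entry above can be read like this; the derived dimensions follow the round(128 * width) convention that CenterNet's head_dim uses in ctrnet.py below:

    # Sketch: reading 'ctrnet_n' and deriving the scaled channel dims.
    # Assumes the repo root is on PYTHONPATH.
    from config.model_config.ctrnet_config import ctrnet_cfg

    cfg = ctrnet_cfg['ctrnet_n']
    head_dim = round(128 * cfg['width'])               # 32 for the 'n' variant
    c5_dim = round(512 * cfg['width'] * cfg['ratio'])  # deepest backbone stage: 256
    print(head_dim, c5_dim)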

+ 0 - 5
models/detectors/centernet/centernet.py

@@ -1,5 +0,0 @@
-# Objects as Points
-
-
-class CenterNet():
-    pass

+ 1 - 0
models/detectors/ctrnet/README.md

@@ -0,0 +1 @@
+# Enhanced CenterNet

+ 0 - 0
models/detectors/ctrnet/build.py


+ 164 - 0
models/detectors/ctrnet/ctrnet.py

@@ -0,0 +1,164 @@
+# Objects as Points
+
+# --------------- Torch components ---------------
+import torch
+import torch.nn as nn
+
+# --------------- Model components ---------------
+from .ctrnet_encoder import build_encoder
+from .ctrnet_decoder import build_decoder
+from .ctrnet_neck    import build_neck
+from .ctrnet_head    import build_det_head
+from .ctrnet_pred    import build_det_pred
+
+
+# CenterNet
+class CenterNet(nn.Module):
+    def __init__(self,
+                 cfg,
+                 device,
+                 num_classes = 20,
+                 conf_thresh = 0.01,
+                 topk        = 1000,
+                 trainable   = False,
+                 deploy      = False,
+                 no_multi_labels = False,
+                 nms_class_agnostic = False,
+                 ):
+        super(CenterNet, self).__init__()
+        # ---------------- Basic Parameters ----------------
+        self.cfg = cfg
+        self.device = device
+        self.stride = cfg['out_stride']
+        self.num_classes = num_classes
+        self.trainable = trainable
+        self.conf_thresh = conf_thresh
+        self.topk_candidates = topk
+        self.deploy = deploy
+        self.no_multi_labels = no_multi_labels
+        self.nms_class_agnostic = nms_class_agnostic
+        self.head_dim = round(128 * cfg['width'])
+        
+        # ---------------- Network Parameters ----------------
+        ## Encoder
+        self.encoder, feat_dims = build_encoder(cfg, pretrained=cfg['bk_pretrained'] and trainable)
+
+        ## Neck
+        self.neck = build_neck(cfg, feat_dims[-1], feat_dims[-1])
+        self.feat_dim = self.neck.out_dim
+        
+        ## Decoder
+        self.decoder = build_decoder(cfg, self.feat_dim, self.head_dim)
+
+        ## Head
+        ## SDetHead returns a dict of features and the prediction layer takes
+        ## them separately, so the two stages are kept as separate modules
+        ## rather than chained with nn.Sequential.
+        self.det_head = build_det_head(cfg, self.head_dim, self.head_dim)
+        self.det_pred = build_det_pred(self.head_dim, self.head_dim, self.stride, num_classes, 4)
+        ## Aux Head
+        self.aux_det_head = build_det_head(cfg, self.head_dim, self.head_dim)
+        self.aux_det_pred = build_det_pred(self.head_dim, self.head_dim, self.stride, num_classes, 4)
+
+    # Post process
+    def post_process(self, cls_pred, box_pred):
+        """
+        Input:
+            cls_pred: torch.Tensor -> [B=1, M, C]
+            box_pred: torch.Tensor -> [B=1, M, 4]
+        Output:
+            bboxes: np.array -> [N, 4]
+            scores: np.array -> [N,]
+            labels: np.array -> [N,]
+        """
+        cls_pred = cls_pred[0]
+        box_pred = box_pred[0]
+        if self.no_multi_labels:
+            # [M,]
+            scores, labels = torch.max(cls_pred.sigmoid(), dim=1)
+
+            # Keep top k top scoring indices only.
+            num_topk = min(self.topk_candidates, box_pred.size(0))
+
+            # topk candidates
+            predicted_prob, topk_idxs = scores.sort(descending=True)
+            topk_scores = predicted_prob[:num_topk]
+            topk_idxs = topk_idxs[:num_topk]
+
+            # filter out the proposals with low confidence score
+            keep_idxs = topk_scores > self.conf_thresh
+            scores = topk_scores[keep_idxs]
+            topk_idxs = topk_idxs[keep_idxs]
+
+            labels = labels[topk_idxs]
+            bboxes = box_pred[topk_idxs]
+        else:
+            # [M, C] -> [MC,]
+            scores = cls_pred.sigmoid().flatten()
+
+            # Keep top k top scoring indices only.
+            num_topk = min(self.topk_candidates, box_pred.size(0))
+
+            # torch.sort is actually faster than .topk (at least on GPUs)
+            predicted_prob, topk_idxs = scores.sort(descending=True)
+            topk_scores = predicted_prob[:num_topk]
+            topk_idxs = topk_idxs[:num_topk]
+
+            # filter out the proposals with low confidence score
+            keep_idxs = topk_scores > self.conf_thresh
+            scores = topk_scores[keep_idxs]
+            topk_idxs = topk_idxs[keep_idxs]
+
+            anchor_idxs = torch.div(topk_idxs, self.num_classes, rounding_mode='floor')
+            labels = topk_idxs % self.num_classes
+
+            bboxes = box_pred[anchor_idxs]
+
+        # to cpu & numpy
+        scores = scores.cpu().numpy()
+        labels = labels.cpu().numpy()
+        bboxes = bboxes.cpu().numpy()
+
+        return bboxes, scores, labels
+    
+    # Main process
+    def forward(self, x):
+        # ---------------- Backbone ----------------
+        pyramid_feats = self.encoder(x)
+
+        # ---------------- Neck ----------------
+        feat = self.neck(pyramid_feats[-1])
+
+        # ---------------- Decoder ----------------
+        feat = self.decoder(feat)
+
+        # ---------------- Head ----------------
+        head_feats = self.det_head(feat)
+        outputs = self.det_pred(head_feats['cls_feat'], head_feats['reg_feat'])
+        if self.trainable:
+            aux_feats = self.aux_det_head(feat)
+            outputs['aux_outputs'] = self.aux_det_pred(aux_feats['cls_feat'], aux_feats['reg_feat'])
+
+        # ---------------- Post-process ----------------
+        if not self.trainable:
+            cls_preds = outputs['pred_cls']
+            box_preds = outputs['pred_box']
+
+            if self.deploy:
+                scores = cls_preds[0].sigmoid()
+                bboxes = box_preds[0]
+                # [n_anchors_all, 4 + C]
+                outputs = torch.cat([bboxes, scores], dim=-1)
+
+            else:
+                # post process
+                bboxes, scores, labels = self.post_process(cls_preds, box_preds)
+
+                outputs = {
+                    "scores": scores,
+                    "labels": labels,
+                    "bboxes": bboxes
+                }
+            
+        return outputs
+    
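
A minimal CPU inference sketch for the detector above (not part of the commit, assuming the repo root is on PYTHONPATH); with trainable=False the post-process branch returns numpy arrays:

    import torch
    from config.model_config.ctrnet_config import ctrnet_cfg
    from models.detectors.ctrnet.ctrnet import CenterNet

    cfg = ctrnet_cfg['ctrnet_n']
    model = CenterNet(cfg, device='cpu', num_classes=20, trainable=False).eval()
    x = torch.randn(1, 3, 640, 640)
    with torch.no_grad():
        outputs = model(x)
    # numpy arrays: bboxes [N, 4], scores [N,], labels [N,]
    print(outputs['bboxes'].shape, outputs['scores'].shape, outputs['labels'].shape)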

+ 221 - 0
models/detectors/ctrnet/ctrnet_basic.py

@@ -0,0 +1,221 @@
+import math
+import torch
+import torch.nn as nn
+import torchvision.ops
+
+
+# --------------------- Basic modules ---------------------
+def get_conv2d(c1, c2, k, p, s, d, g, bias=False):
+    conv = nn.Conv2d(c1, c2, k, stride=s, padding=p, dilation=d, groups=g, bias=bias)
+
+    return conv
+
+def get_activation(act_type=None):
+    if act_type == 'relu':
+        return nn.ReLU(inplace=True)
+    elif act_type == 'lrelu':
+        return nn.LeakyReLU(0.1, inplace=True)
+    elif act_type == 'mish':
+        return nn.Mish(inplace=True)
+    elif act_type == 'silu':
+        return nn.SiLU(inplace=True)
+    elif act_type is None:
+        return nn.Identity()
+    else:
+        raise NotImplementedError
+        
+def get_norm(norm_type, dim):
+    if norm_type == 'BN':
+        return nn.BatchNorm2d(dim)
+    elif norm_type == 'GN':
+        return nn.GroupNorm(num_groups=32, num_channels=dim)
+    elif norm_type is None:
+        return nn.Identity()
+    else:
+        raise NotImplementedError
+
+class Conv(nn.Module):
+    def __init__(self, 
+                 c1,                   # in channels
+                 c2,                   # out channels 
+                 k=1,                  # kernel size 
+                 p=0,                  # padding
+                 s=1,                  # stride
+                 d=1,                  # dilation
+                 act_type='lrelu',     # activation
+                 norm_type='BN',       # normalization
+                 depthwise=False):
+        super(Conv, self).__init__()
+        convs = []
+        add_bias = not norm_type
+        if depthwise:
+            # depthwise conv
+            convs.append(get_conv2d(c1, c1, k=k, p=p, s=s, d=d, g=c1, bias=add_bias))
+            if norm_type:
+                convs.append(get_norm(norm_type, c1))
+            if act_type:
+                convs.append(get_activation(act_type))
+            # pointwise conv
+            convs.append(get_conv2d(c1, c2, k=1, p=0, s=1, d=d, g=1, bias=add_bias))
+            if norm_type:
+                convs.append(get_norm(norm_type, c2))
+            if act_type:
+                convs.append(get_activation(act_type))
+
+        else:
+            convs.append(get_conv2d(c1, c2, k=k, p=p, s=s, d=d, g=1, bias=add_bias))
+            if norm_type:
+                convs.append(get_norm(norm_type, c2))
+            if act_type:
+                convs.append(get_activation(act_type))
+            
+        self.convs = nn.Sequential(*convs)
+
+
+    def forward(self, x):
+        return self.convs(x)
+
+class DeConv(nn.Module):
+    def __init__(self,
+                 in_dim      :int,
+                 out_dim     :int,
+                 kernel_size :int  = 4,
+                 stride      :int  = 2,
+                 act_type    :str  = 'silu',
+                 norm_type   :str  = 'BN'
+                 ):
+        super(DeConv, self).__init__()
+        # ----------- Basic parameters -----------
+        if kernel_size == 4:
+            padding = 1
+            output_padding = 0
+        elif kernel_size == 3:
+            padding = 1
+            output_padding = 1
+        elif kernel_size == 2:
+            padding = 0
+            output_padding = 0
+        else:
+            raise NotImplementedError('DeConv only supports kernel_size in {2, 3, 4}.')
+
+        # ----------- Network parameters -----------
+        self.convs = nn.Sequential(
+            nn.ConvTranspose2d(in_dim, out_dim, kernel_size, stride=stride, padding=padding, output_padding=output_padding),
+            get_norm(norm_type, out_dim),
+            get_activation(act_type)
+        )
+
+    def forward(self, x):
+        return self.convs(x)
+    
+class DeformableConv(nn.Module):
+    def __init__(self,
+                 in_dim  :int,
+                 out_dim :int,
+                 kernel_size  :int = 3,
+                 stride       :int = 1,
+                 padding      :int = 1):
+        super(DeformableConv, self).__init__()
+        self.in_dim = in_dim
+        self.out_dim = out_dim
+        self.kernel_size = kernel_size
+        self.stride = stride if isinstance(stride, tuple) else (stride, stride)
+        self.padding = padding
+        
+        # init weight and bias
+        self.weight = nn.Parameter(torch.Tensor(out_dim, in_dim, kernel_size, kernel_size))
+        self.bias = nn.Parameter(torch.Tensor(out_dim))
+
+        # offset conv
+        self.conv_offset_mask = nn.Conv2d(in_dim, 
+                                          3 * kernel_size * kernel_size,
+                                          kernel_size=kernel_size, 
+                                          stride=stride,
+                                          padding=self.padding, 
+                                          bias=True)
+        
+        # init        
+        self.reset_parameters()
+        self._init_weight()
+
+
+    def reset_parameters(self):
+        n = self.in_dim * (self.kernel_size**2)
+        stdv = 1. / math.sqrt(n)
+        self.weight.data.uniform_(-stdv, stdv)
+        self.bias.data.zero_()
+
+
+    def _init_weight(self):
+        # init offset_mask conv
+        nn.init.constant_(self.conv_offset_mask.weight, 0.)
+        nn.init.constant_(self.conv_offset_mask.bias, 0.)
+
+
+    def forward(self, x):
+        out = self.conv_offset_mask(x)
+        o1, o2, mask = torch.chunk(out, 3, dim=1)
+        offset = torch.cat((o1, o2), dim=1)
+        mask = torch.sigmoid(mask)
+
+        x = torchvision.ops.deform_conv2d(input=x, 
+                                          offset=offset, 
+                                          weight=self.weight, 
+                                          bias=self.bias, 
+                                          padding=self.padding,
+                                          mask=mask,
+                                          stride=self.stride)
+        return x
+    
+
+# --------------------- Yolov8 modules ---------------------
+## Yolov8-style BottleNeck
+class Bottleneck(nn.Module):
+    def __init__(self,
+                 in_dim,
+                 out_dim,
+                 expand_ratio = 0.5,
+                 kernel_sizes = [3, 3],
+                 shortcut     = True,
+                 act_type     = 'silu',
+                 norm_type    = 'BN',
+                 depthwise    = False,):
+        super(Bottleneck, self).__init__()
+        inter_dim = int(out_dim * expand_ratio)  # hidden channels            
+        self.cv1 = Conv(in_dim, inter_dim, k=kernel_sizes[0], p=kernel_sizes[0]//2, norm_type=norm_type, act_type=act_type, depthwise=depthwise)
+        self.cv2 = Conv(inter_dim, out_dim, k=kernel_sizes[1], p=kernel_sizes[1]//2, norm_type=norm_type, act_type=act_type, depthwise=depthwise)
+        self.shortcut = shortcut and in_dim == out_dim
+
+    def forward(self, x):
+        h = self.cv2(self.cv1(x))
+
+        return x + h if self.shortcut else h
+
+# Yolov8-style StageBlock
+class RTCBlock(nn.Module):
+    def __init__(self,
+                 in_dim,
+                 out_dim,
+                 num_blocks = 1,
+                 shortcut   = False,
+                 act_type   = 'silu',
+                 norm_type  = 'BN',
+                 depthwise  = False,):
+        super(RTCBlock, self).__init__()
+        self.inter_dim = out_dim // 2
+        self.input_proj = Conv(in_dim, out_dim, k=1, act_type=act_type, norm_type=norm_type)
+        self.m = nn.Sequential(*(
+            Bottleneck(self.inter_dim, self.inter_dim, 1.0, [3, 3], shortcut, act_type, norm_type, depthwise)
+            for _ in range(num_blocks)))
+        self.output_proj = Conv((2 + num_blocks) * self.inter_dim, out_dim, k=1, act_type=act_type, norm_type=norm_type)
+
+    def forward(self, x):
+        # Input proj
+        x1, x2 = torch.chunk(self.input_proj(x), 2, dim=1)
+        out = [x1, x2]
+
+        # Bottleneck blocks
+        out.extend(m(out[-1]) for m in self.m)
+
+        # Output proj
+        out = self.output_proj(torch.cat(out, dim=1))
+
+        return out
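
A shape sanity check for RTCBlock (not part of the commit): the input projection is chunked in half, each Bottleneck output is appended, and the (2 + num_blocks) branches are re-projected to out_dim:

    import torch
    from models.detectors.ctrnet.ctrnet_basic import RTCBlock

    block = RTCBlock(in_dim=64, out_dim=128, num_blocks=2, shortcut=True)
    x = torch.randn(1, 64, 40, 40)
    # output_proj maps (2 + 2) * 64 = 256 channels back to 128
    print(block(x).shape)  # torch.Size([1, 128, 40, 40])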

+ 47 - 0
models/detectors/ctrnet/ctrnet_decoder.py

@@ -0,0 +1,47 @@
+import math
+import torch.nn as nn
+
+from .ctrnet_basic import DeConv, RTCBlock
+
+
+def build_decoder(cfg, in_dim, out_dim):
+    return CTRDecoder(in_dim     = in_dim,
+                      out_dim    = out_dim,
+                      max_stride = cfg['max_stride'],
+                      out_stride = cfg['out_stride'],
+                      act_type   = cfg['dec_act'],
+                      norm_type  = cfg['dec_norm'],
+                      depthwise  = cfg['dec_depthwise']
+                      )
+
+
+class CTRDecoder(nn.Module):
+    def __init__(self,
+                 in_dim     :int,
+                 out_dim    :int,
+                 max_stride :int,
+                 out_stride :int,
+                 act_type   :str,
+                 norm_type  :str,
+                 depthwise  :bool
+                 ):
+        super().__init__()
+        # ---------- Basic parameters ----------
+        self.in_dim = in_dim
+        self.out_dim = out_dim
+        self.out_stride = out_stride
+        self.num_layers = round(math.log2(max_stride // out_stride))
+
+        # ---------- Network parameters ----------
+        layers = []
+        for _ in range(self.num_layers):
+            layer = nn.Sequential(
+                RTCBlock(in_dim, out_dim, 1, False, act_type, norm_type, depthwise),
+                DeConv(out_dim, out_dim, kernel_size=4, stride=2, act_type=act_type, norm_type=norm_type)
+            )
+            layers.append(layer)
+            in_dim = out_dim
+        self.layers = nn.Sequential(*layers)
+
+    def forward(self, x):
+        return self.layers(x)
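
A quick check of the decoder's upsampling schedule (not part of the commit): with max_stride=32 and out_stride=4 it stacks log2(32 / 4) = 3 deconv stages, each doubling the resolution:

    import torch
    from models.detectors.ctrnet.ctrnet_decoder import build_decoder

    cfg = {'max_stride': 32, 'out_stride': 4,
           'dec_act': 'silu', 'dec_norm': 'BN', 'dec_depthwise': False}
    decoder = build_decoder(cfg, in_dim=256, out_dim=32)
    feat = torch.randn(1, 256, 20, 20)  # C5 of a 640x640 input (640 / 32)
    print(decoder(feat).shape)          # torch.Size([1, 32, 160, 160]) -> stride 4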

+ 180 - 0
models/detectors/ctrnet/ctrnet_encoder.py

@@ -0,0 +1,180 @@
+import torch
+import torch.nn as nn
+
+try:
+    from .ctrnet_basic import Conv, RTCBlock
+except ImportError:
+    from ctrnet_basic import Conv, RTCBlock
+
+
+# MIM-pretrained weights
+model_urls = {
+    "rtcnet_n": None,
+    "rtcnet_t": None,
+    "rtcnet_s": None,
+    "rtcnet_m": None,
+    "rtcnet_l": None,
+    "rtcnet_x": None,
+}
+
+
+# ---------------------------- Basic functions ----------------------------
+## Real-time Convolutional Backbone
+class CTREncoder(nn.Module):
+    def __init__(self, width=1.0, depth=1.0, ratio=1.0, act_type='silu', norm_type='BN', depthwise=False):
+        super(CTREncoder, self).__init__()
+        # ---------------- Basic parameters ----------------
+        self.width_factor = width
+        self.depth_factor = depth
+        self.last_stage_factor = ratio
+        self.feat_dims = [round(64 * width), round(128 * width), round(256 * width), round(512 * width), round(512 * width * ratio)]
+        # ---------------- Network parameters ----------------
+        ## P1/2
+        self.layer_1 = Conv(3, self.feat_dims[0], k=3, p=1, s=2, act_type=act_type, norm_type=norm_type)
+        ## P2/4
+        self.layer_2 = nn.Sequential(
+            Conv(self.feat_dims[0], self.feat_dims[1], k=3, p=1, s=2, act_type=act_type, norm_type=norm_type),
+            RTCBlock(in_dim     = self.feat_dims[1],
+                     out_dim    = self.feat_dims[1],
+                     num_blocks = round(3*depth),
+                     shortcut   = True,
+                     act_type   = act_type,
+                     norm_type  = norm_type,
+                     depthwise  = depthwise)
+        )
+        ## P3/8
+        self.layer_3 = nn.Sequential(
+            Conv(self.feat_dims[1], self.feat_dims[2], k=3, p=1, s=2, act_type=act_type, norm_type=norm_type),
+            RTCBlock(in_dim     = self.feat_dims[2],
+                     out_dim    = self.feat_dims[2],
+                     num_blocks = round(6*depth),
+                     shortcut   = True,
+                     act_type   = act_type,
+                     norm_type  = norm_type,
+                     depthwise  = depthwise)
+        )
+        ## P4/16
+        self.layer_4 = nn.Sequential(
+            Conv(self.feat_dims[2], self.feat_dims[3], k=3, p=1, s=2, act_type=act_type, norm_type=norm_type),
+            RTCBlock(in_dim     = self.feat_dims[3],
+                     out_dim    = self.feat_dims[3],
+                     num_blocks = round(6*depth),
+                     shortcut   = True,
+                     act_type   = act_type,
+                     norm_type  = norm_type,
+                     depthwise  = depthwise)
+        )
+        ## P5/32
+        self.layer_5 = nn.Sequential(
+            Conv(self.feat_dims[3], self.feat_dims[4], k=3, p=1, s=2, act_type=act_type, norm_type=norm_type),
+            RTCBlock(in_dim     = self.feat_dims[4],
+                     out_dim    = self.feat_dims[4],
+                     num_blocks = round(3*depth),
+                     shortcut   = True,
+                     act_type   = act_type,
+                     norm_type  = norm_type,
+                     depthwise  = depthwise)
+        )
+
+    def forward(self, x):
+        c1 = self.layer_1(x)
+        c2 = self.layer_2(c1)
+        c3 = self.layer_3(c2)
+        c4 = self.layer_4(c3)
+        c5 = self.layer_5(c4)
+
+        outputs = [c3, c4, c5]
+
+        return outputs
+
+
+# ---------------------------- Functions ----------------------------
+## build Backbone
+def build_encoder(cfg, pretrained=False): 
+    # build backbone model
+    backbone = CTREncoder(width=cfg['width'],
+                          depth=cfg['depth'],
+                          ratio=cfg['ratio'],
+                          act_type=cfg['bk_act'],
+                          norm_type=cfg['bk_norm'],
+                          depthwise=cfg['bk_depthwise']
+                          )
+    feat_dims = backbone.feat_dims[-3:]
+
+    # load pretrained weight
+    if pretrained:
+        backbone = load_pretrained_weight(backbone)
+        
+    return backbone, feat_dims
+
+## load pretrained weight
+def load_pretrained_weight(model):
+    # Model name
+    width, depth, ratio = model.width_factor, model.depth_factor, model.last_stage_factor
+    if width == 0.25 and depth == 0.34 and ratio == 2.0:
+        model_name = "rtcnet_n"
+    elif width == 0.375 and depth == 0.34 and ratio == 2.0:
+        model_name = "rtcnet_t"
+    elif width == 0.50 and depth == 0.34 and ratio == 2.0:
+        model_name = "rtcnet_s"
+    elif width == 0.75 and depth == 0.67 and ratio == 1.5:
+        model_name = "rtcnet_m"
+    elif width == 1.0 and depth == 1.0 and ratio == 1.0:
+        model_name = "rtcnet_l"
+    elif width == 1.25 and depth == 1.34 and ratio == 1.0:
+        model_name = "rtcnet_x"
+    else:
+        raise ValueError('No pretrained weight for (width={}, depth={}, ratio={}).'.format(width, depth, ratio))
+
+    # Load pretrained weight
+    url = model_urls[model_name]
+    if url is not None:
+        print('Loading pretrained weight ...')
+        checkpoint = torch.hub.load_state_dict_from_url(
+            url=url, map_location="cpu", check_hash=True)
+        # checkpoint state dict
+        checkpoint_state_dict = checkpoint.pop("model")
+        # model state dict
+        model_state_dict = model.state_dict()
+        # check
+        for k in list(checkpoint_state_dict.keys()):
+            if k in model_state_dict:
+                shape_model = tuple(model_state_dict[k].shape)
+                shape_checkpoint = tuple(checkpoint_state_dict[k].shape)
+                if shape_model != shape_checkpoint:
+                    checkpoint_state_dict.pop(k)
+            else:
+                checkpoint_state_dict.pop(k)
+                print('Dropping unmatched key: {}'.format(k))
+        # load the weight (strict=False: mismatched keys were popped above)
+        model.load_state_dict(checkpoint_state_dict, strict=False)
+    else:
+        print('No backbone pretrained for {}.'.format(model_name))
+
+    return model
+
+
+if __name__ == '__main__':
+    import time
+    from thop import profile
+    cfg = {
+        'bk_act': 'silu',
+        'bk_norm': 'BN',
+        'bk_depthwise': False,
+        'width': 1.0,
+        'depth': 1.0,
+        'ratio': 1.0,
+    }
+    model, feats = build_encoder(cfg)
+    x = torch.randn(1, 3, 640, 640)
+    t0 = time.time()
+    outputs = model(x)
+    t1 = time.time()
+    print('Time: ', t1 - t0)
+    for out in outputs:
+        print(out.shape)
+
+    x = torch.randn(1, 3, 640, 640)
+    print('==============================')
+    flops, params = profile(model, inputs=(x, ), verbose=False)
+    print('==============================')
+    print('GFLOPs : {:.2f}'.format(flops / 1e9 * 2))
+    print('Params : {:.2f} M'.format(params / 1e6))

+ 139 - 0
models/detectors/ctrnet/ctrnet_head.py

@@ -0,0 +1,139 @@
+import torch
+import torch.nn as nn
+
+try:
+    from .ctrnet_basic import Conv
+except ImportError:
+    from ctrnet_basic import Conv
+
+
+def build_det_head(cfg, in_dim, out_dim):
+    head = SDetHead(in_dim       = in_dim,
+                    cls_head_dim = out_dim,
+                    reg_head_dim = out_dim,
+                    num_cls_head = cfg['num_cls_head'],
+                    num_reg_head = cfg['num_reg_head'],
+                    act_type     = cfg['head_act'],
+                    norm_type    = cfg['head_norm'],
+                    depthwise    = cfg['head_depthwise']
+                    )
+
+    return head
+
+
+# ---------------------------- Detection Head ----------------------------
+## Single-level Detection Head
+class SDetHead(nn.Module):
+    def __init__(self,
+                 in_dim       :int  = 256,
+                 cls_head_dim :int  = 256,
+                 reg_head_dim :int  = 256,
+                 num_cls_head :int  = 2,
+                 num_reg_head :int  = 2,
+                 act_type     :str  = "silu",
+                 norm_type    :str  = "BN",
+                 depthwise    :bool = False):
+        super().__init__()
+        # --------- Basic Parameters ----------
+        self.in_dim = in_dim
+        self.num_cls_head = num_cls_head
+        self.num_reg_head = num_reg_head
+        self.act_type = act_type
+        self.norm_type = norm_type
+        self.depthwise = depthwise
+        
+        # --------- Network Parameters ----------
+        ## cls head
+        cls_feats = []
+        self.cls_head_dim = cls_head_dim
+        for i in range(num_cls_head):
+            if i == 0:
+                cls_feats.append(
+                    Conv(in_dim, self.cls_head_dim, k=3, p=1, s=1, 
+                         act_type=act_type,
+                         norm_type=norm_type,
+                         depthwise=depthwise)
+                        )
+            else:
+                cls_feats.append(
+                    Conv(self.cls_head_dim, self.cls_head_dim, k=3, p=1, s=1, 
+                        act_type=act_type,
+                        norm_type=norm_type,
+                        depthwise=depthwise)
+                        )      
+        ## reg head
+        reg_feats = []
+        self.reg_head_dim = reg_head_dim
+        for i in range(num_reg_head):
+            if i == 0:
+                reg_feats.append(
+                    Conv(in_dim, self.reg_head_dim, k=3, p=1, s=1, 
+                         act_type=act_type,
+                         norm_type=norm_type,
+                         depthwise=depthwise)
+                        )
+            else:
+                reg_feats.append(
+                    Conv(self.reg_head_dim, self.reg_head_dim, k=3, p=1, s=1, 
+                         act_type=act_type,
+                         norm_type=norm_type,
+                         depthwise=depthwise)
+                        )
+        self.cls_feats = nn.Sequential(*cls_feats)
+        self.reg_feats = nn.Sequential(*reg_feats)
+
+        self.init_weights()
+        
+    def init_weights(self):
+        """Initialize the parameters."""
+        for m in self.modules():
+            if isinstance(m, torch.nn.Conv2d):
+                # In order to be consistent with the source code,
+                # reset the Conv2d initialization parameters
+                m.reset_parameters()
+
+    def forward(self, x):
+        """
+            in_feats: (Tensor) [B, C, H, W]
+        """
+        cls_feats = self.cls_feats(x)
+        reg_feats = self.reg_feats(x)
+
+        outputs = {
+            "cls_feat": cls_feats,
+            "reg_feat": reg_feats
+        }
+
+        return outputs
+    
+
+if __name__ == '__main__':
+    import time
+    from thop import profile
+    cfg = {
+        'head': 'decoupled_head',
+        'num_cls_head': 2,
+        'num_reg_head': 2,
+        'head_act': 'silu',
+        'head_norm': 'BN',
+        'head_depthwise': False,
+        'reg_max': 16,
+    }
+    in_dim = 256
+    out_dim = 256
+    # Single-level head
+    model = build_det_head(cfg, in_dim, out_dim)
+    print(model)
+    feat = torch.randn(1, in_dim, 80, 80)
+    t0 = time.time()
+    outputs = model(feat)
+    t1 = time.time()
+    print('Time: ', t1 - t0)
+    for k, v in outputs.items():
+        print(k, v.shape)
+
+    print('==============================')
+    flops, params = profile(model, inputs=(feat, ), verbose=False)
+    print('==============================')
+    print('Head: GFLOPs : {:.2f}'.format(flops / 1e9 * 2))
+    print('Head: Params : {:.2f} M'.format(params / 1e6))

+ 108 - 0
models/detectors/ctrnet/ctrnet_neck.py

@@ -0,0 +1,108 @@
+import torch
+import torch.nn as nn
+
+try:
+    from .ctrnet_basic import Conv
+except ImportError:
+    from ctrnet_basic import Conv
+
+
+# Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher
+class SPPF(nn.Module):
+    """
+        This code referenced to https://github.com/ultralytics/yolov5
+    """
+    def __init__(self, cfg, in_dim, out_dim, expand_ratio=0.5):
+        super().__init__()
+        # ---------------- Basic Parameters ----------------
+        inter_dim = int(in_dim * expand_ratio)
+        self.out_dim = out_dim
+        # ---------------- Network Parameters ----------------
+        self.cv1 = Conv(in_dim, inter_dim, k=1, act_type=cfg['neck_act'], norm_type=cfg['neck_norm'])
+        self.cv2 = Conv(inter_dim * 4, out_dim, k=1, act_type=cfg['neck_act'], norm_type=cfg['neck_norm'])
+        self.m = nn.MaxPool2d(kernel_size=cfg['pooling_size'], stride=1, padding=cfg['pooling_size'] // 2)
+
+    def forward(self, x):
+        x = self.cv1(x)
+        y1 = self.m(x)
+        y2 = self.m(y1)
+
+        return self.cv2(torch.cat((x, y1, y2, self.m(y2)), 1))
+
+
+# SPPF block with CSP module
+class SPPFBlockCSP(nn.Module):
+    """
+        CSP Spatial Pyramid Pooling Block
+    """
+    def __init__(self, cfg, in_dim, out_dim, expand_ratio):
+        super(SPPFBlockCSP, self).__init__()
+        # ---------------- Basic Parameters ----------------
+        inter_dim = int(in_dim * expand_ratio)
+        self.out_dim = out_dim
+        # ---------------- Network Parameters ----------------
+        self.cv1 = Conv(in_dim, inter_dim, k=1, act_type=cfg['neck_act'], norm_type=cfg['neck_norm'])
+        self.cv2 = Conv(in_dim, inter_dim, k=1, act_type=cfg['neck_act'], norm_type=cfg['neck_norm'])
+        self.m = nn.Sequential(
+            Conv(inter_dim, inter_dim, k=3, p=1, 
+                 act_type=cfg['neck_act'], norm_type=cfg['neck_norm'], 
+                 depthwise=cfg['neck_depthwise']),
+            SPPF(cfg, inter_dim, inter_dim, expand_ratio=1.0),
+            Conv(inter_dim, inter_dim, k=3, p=1, 
+                 act_type=cfg['neck_act'], norm_type=cfg['neck_norm'], 
+                 depthwise=cfg['neck_depthwise'])
+        )
+        self.cv3 = Conv(inter_dim * 2, self.out_dim, k=1, act_type=cfg['neck_act'], norm_type=cfg['neck_norm'])
+
+        
+    def forward(self, x):
+        x1 = self.cv1(x)
+        x2 = self.cv2(x)
+        x3 = self.m(x2)
+        y = self.cv3(torch.cat([x1, x3], dim=1))
+
+        return y
+
+
+def build_neck(cfg, in_dim, out_dim):
+    model = cfg['neck']
+    print('==============================')
+    print('Neck: {}'.format(model))
+    # build neck
+    if model == 'sppf':
+        neck = SPPF(cfg, in_dim, out_dim, cfg['neck_expand_ratio'])
+    elif model == 'csp_sppf':
+        neck = SPPFBlockCSP(cfg, in_dim, out_dim, cfg['neck_expand_ratio'])
+    else:
+        raise NotImplementedError('Unknown neck: {}'.format(model))
+
+    return neck
+
+
+if __name__ == '__main__':
+    import time
+    from thop import profile
+    cfg = {
+        ## Neck: SPP
+        'neck': 'sppf',
+        'neck_expand_ratio': 0.5,
+        'pooling_size': 5,
+        'neck_act': 'silu',
+        'neck_norm': 'BN',
+        'neck_depthwise': False,
+    }
+    in_dim = 512
+    out_dim = 512
+    # Neck
+    model = build_neck(cfg, in_dim, out_dim)
+    feat = torch.randn(1, in_dim, 20, 20)
+    t0 = time.time()
+    outputs = model(feat)
+    t1 = time.time()
+    print('Time: ', t1 - t0)
+    # for out in outputs:
+    #     print(out.shape)
+
+    print('==============================')
+    flops, params = profile(model, inputs=(feat, ), verbose=False)
+    print('==============================')
+    print('FPN: GFLOPs : {:.2f}'.format(flops / 1e9 * 2))
+    print('FPN: Params : {:.2f} M'.format(params / 1e6))
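
A side note on the SPPF design above: chaining the single 5x5 max-pool reproduces the 5/9/13 receptive fields of the original SPP at lower cost. A standalone check (exact, since PyTorch max-pooling pads with -inf):

    import torch
    import torch.nn as nn

    x = torch.randn(1, 8, 20, 20)
    m5 = nn.MaxPool2d(kernel_size=5, stride=1, padding=2)
    y1 = m5(x)
    y2 = m5(y1)  # equivalent to a 9x9 pool
    y3 = m5(y2)  # equivalent to a 13x13 pool
    assert torch.equal(y2, nn.MaxPool2d(9, stride=1, padding=4)(x))
    assert torch.equal(y3, nn.MaxPool2d(13, stride=1, padding=6)(x))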

+ 96 - 0
models/detectors/ctrnet/ctrnet_pred.py

@@ -0,0 +1,96 @@
+import math
+import torch
+import torch.nn as nn
+
+
+def build_det_pred(cls_dim, reg_dim, stride, num_classes, num_coords=4):
+    pred_layers = SDetPDLayer(cls_dim     = cls_dim,
+                              reg_dim     = reg_dim,
+                              stride      = stride,
+                              num_classes = num_classes,
+                              num_coords  = num_coords) 
+
+    return pred_layers
+
+
+# ---------------------------- Detection predictor ----------------------------
+## Single-level Detection Prediction Layer
+class SDetPDLayer(nn.Module):
+    def __init__(self,
+                 cls_dim     :int = 256,
+                 reg_dim     :int = 256,
+                 stride      :int = 32,
+                 num_classes :int = 80,
+                 num_coords  :int = 4):
+        super().__init__()
+        # --------- Basic Parameters ----------
+        self.stride = stride
+        self.cls_dim = cls_dim
+        self.reg_dim = reg_dim
+        self.num_classes = num_classes
+        self.num_coords = num_coords
+
+        # --------- Network Parameters ----------
+        self.cls_pred = nn.Conv2d(cls_dim, num_classes, kernel_size=1)
+        self.reg_pred = nn.Conv2d(reg_dim, num_coords, kernel_size=1)                
+
+        self.init_bias()
+        
+    def init_bias(self):
+        # cls pred bias
+        b = self.cls_pred.bias.view(1, -1)
+        b.data.fill_(math.log(5 / self.num_classes / (640. / self.stride) ** 2))
+        self.cls_pred.bias = torch.nn.Parameter(b.view(-1), requires_grad=True)
+        # reg pred bias
+        b = self.reg_pred.bias.view(-1, )
+        b.data.fill_(1.0)
+        self.reg_pred.bias = torch.nn.Parameter(b.view(-1), requires_grad=True)
+
+    def generate_anchors(self, fmp_size):
+        """
+            fmp_size: (List) [H, W]
+        """
+        # generate grid cells
+        fmp_h, fmp_w = fmp_size
+        anchor_y, anchor_x = torch.meshgrid([torch.arange(fmp_h), torch.arange(fmp_w)], indexing='ij')
+        # [H, W, 2] -> [HW, 2]
+        anchors = torch.stack([anchor_x, anchor_y], dim=-1).float().view(-1, 2)
+        anchors += 0.5  # add center offset
+        anchors *= self.stride
+
+        return anchors
+        
+    def forward(self, cls_feat, reg_feat):
+        # pred
+        cls_pred = self.cls_pred(cls_feat)
+        reg_pred = self.reg_pred(reg_feat)
+
+        # generate anchor boxes: [M, 4]
+        B, _, H, W = cls_pred.size()
+        fmp_size = [H, W]
+        anchors = self.generate_anchors(fmp_size)
+        anchors = anchors.to(cls_pred.device)
+        # stride tensor: [M, 1]
+        stride_tensor = torch.ones_like(anchors[..., :1]) * self.stride
+        
+        # [B, C, H, W] -> [B, H, W, C] -> [B, M, C]
+        cls_pred = cls_pred.permute(0, 2, 3, 1).contiguous().view(B, -1, self.num_classes)
+        reg_pred = reg_pred.permute(0, 2, 3, 1).contiguous().view(B, -1, 4)
+
+        # ---------------- Decode bbox ----------------
+        ctr_pred = reg_pred[..., :2] * self.stride + anchors[..., :2]
+        wh_pred = torch.exp(reg_pred[..., 2:]) * self.stride
+        pred_x1y1 = ctr_pred - wh_pred * 0.5
+        pred_x2y2 = ctr_pred + wh_pred * 0.5
+        box_pred = torch.cat([pred_x1y1, pred_x2y2], dim=-1)
+
+        # output dict
+        outputs = {"pred_cls": cls_pred,             # (Tensor) [B, M, C]
+                   "pred_reg": reg_pred,             # (Tensor) [B, M, 4]
+                   "pred_box": box_pred,             # (Tensor) [B, M, 4] 
+                   "anchors": anchors,               # (Tensor) [M, 2]
+                   "stride": self.stride,            # (Int)
+                   "stride_tensors": stride_tensor   # List(Tensor) [M, 1]
+                   }
+
+        return outputs
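
Since SDetHead returns a feature dict and SDetPDLayer.forward takes the two maps separately, the two stages are chained explicitly (as in CenterNet.forward) rather than via nn.Sequential. A shape sketch (not part of the commit, assuming the repo root is on PYTHONPATH):

    import torch
    from models.detectors.ctrnet.ctrnet_head import SDetHead
    from models.detectors.ctrnet.ctrnet_pred import SDetPDLayer

    head = SDetHead(in_dim=32, cls_head_dim=32, reg_head_dim=32)
    pred = SDetPDLayer(cls_dim=32, reg_dim=32, stride=4, num_classes=20)

    feat = torch.randn(2, 32, 160, 160)  # stride-4 map of a 640x640 input
    feats = head(feat)
    out = pred(feats['cls_feat'], feats['reg_feat'])
    print(out['pred_cls'].shape, out['pred_box'].shape)  # [2, 25600, 20] [2, 25600, 4]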

+ 231 - 0
models/detectors/ctrnet/loss.py

@@ -0,0 +1,231 @@
+import torch
+import torch.nn.functional as F
+
+from utils.box_ops import get_ious
+from utils.distributed_utils import get_world_size, is_dist_avail_and_initialized
+
+from .matcher import AlignedSimOTA
+
+
+class Criterion(object):
+    def __init__(self, args, cfg, device, num_classes=80):
+        self.args = args
+        self.cfg = cfg
+        self.device = device
+        self.num_classes = num_classes
+        self.max_epoch = args.max_epoch
+        self.no_aug_epoch = args.no_aug_epoch
+        self.aux_bbox_loss = False
+        # --------------- Loss config ---------------
+        self.loss_cls_weight = cfg['loss_cls_weight']
+        self.loss_box_weight = cfg['loss_box_weight']
+        # --------------- Matcher config ---------------
+        self.matcher_hpy = cfg['matcher_hpy']['main']
+        self.matcher = AlignedSimOTA(soft_center_radius = self.matcher_hpy['soft_center_radius'],
+                                     topk_candidates    = self.matcher_hpy['topk_candidates'],
+                                     num_classes        = num_classes,
+                                     )
+        # --------------- Aux Matcher config ---------------
+        self.aux_matcher_hpy = cfg['matcher_hpy']['aux']
+        self.aux_matcher = AlignedSimOTA(soft_center_radius = self.aux_matcher_hpy['soft_center_radius'],
+                                         topk_candidates    = self.aux_matcher_hpy['topk_candidates'],
+                                         num_classes        = num_classes,
+                                         )
+
+    # -------------------- Basic loss functions --------------------
+    def loss_classes(self, pred_cls, target, beta=2.0):
+        """
+            Quality Focal Loss.
+            pred_cls: (torch.Tensor) [N, C]
+            target:   (tuple([torch.Tensor], [torch.Tensor])): label -> [N,], score -> [N,]
+        """
+        label, score = target
+        pred_sigmoid = pred_cls.sigmoid()
+        scale_factor = pred_sigmoid
+        zerolabel = scale_factor.new_zeros(pred_cls.shape)
+
+        ce_loss = F.binary_cross_entropy_with_logits(
+            pred_cls, zerolabel, reduction='none') * scale_factor.pow(beta)
+        
+        bg_class_ind = pred_cls.shape[-1]
+        pos = ((label >= 0) & (label < bg_class_ind)).nonzero().squeeze(1)
+        pos_label = label[pos].long()
+
+        scale_factor = score[pos] - pred_sigmoid[pos, pos_label]
+
+        ce_loss[pos, pos_label] = F.binary_cross_entropy_with_logits(
+            pred_cls[pos, pos_label], score[pos],
+            reduction='none') * scale_factor.abs().pow(beta)
+
+        return ce_loss
+    
+    def loss_bboxes(self, pred_box, gt_box):
+        ious = get_ious(pred_box, gt_box, box_mode="xyxy", iou_type='giou')
+        loss_box = 1.0 - ious
+
+        return loss_box
+    
+    def loss_bboxes_aux(self, pred_reg, gt_box, anchors, stride_tensors):
+        # xyxy -> cxcy&bwbh
+        gt_cxcy = (gt_box[..., :2] + gt_box[..., 2:]) * 0.5
+        gt_bwbh = gt_box[..., 2:] - gt_box[..., :2]
+        # encode gt box
+        gt_cxcy_encode = (gt_cxcy - anchors) / stride_tensors
+        gt_bwbh_encode = torch.log(gt_bwbh / stride_tensors)
+        gt_box_encode = torch.cat([gt_cxcy_encode, gt_bwbh_encode], dim=-1)
+        # l1 loss
+        loss_box_aux = F.l1_loss(pred_reg, gt_box_encode, reduction='none')
+
+        return loss_box_aux
+
+
+    # -------------------- Task loss functions --------------------
+    def compute_loss(self, outputs, targets, aux_loss=False, epoch=0):
+        """
+            Input:
+                outputs: (Dict) -> {
+                    'pred_cls': (torch.Tensor) [B, M, Nc],
+                    'pred_reg': (torch.Tensor) [B, M, 4],
+                    'pred_box': (torch.Tensor) [B, M, 4],
+                    'stride':   (Int)
+                }
+                target: (List[Dict]) [
+                    {'boxes':  (torch.Tensor) -> [N, 4], 
+                     'labels': (torch.Tensor) -> [N,],
+                     ...}, ...
+                     ]
+            Output:
+                loss_dict: (Dict) -> {
+                    'loss_cls': (torch.Tensor) scalar,
+                    'loss_box': (torch.Tensor) scalar,
+                    'loss_box_aux': (torch.Tensor) scalar,
+                    'losses':  (torch.Tensor) scalar,
+                }
+        """
+        bs = outputs['pred_cls'].shape[0]
+        device = outputs['pred_cls'].device
+        # single detection level: wrap stride/anchors as 1-element lists for the matcher
+        fpn_strides = [outputs['stride']]
+        anchors = [outputs['anchors']]
+        # preds: [B, M, C]
+        cls_preds = outputs['pred_cls']
+        box_preds = outputs['pred_box']
+        
+        # --------------- label assignment ---------------
+        cls_targets = []
+        box_targets = []
+        assign_metrics = []
+        for batch_idx in range(bs):
+            tgt_labels = targets[batch_idx]["labels"].to(device)  # [N,]
+            tgt_bboxes = targets[batch_idx]["boxes"].to(device)   # [N, 4]
+            if not aux_loss:
+                assigned_result = self.matcher(fpn_strides=fpn_strides,
+                                               anchors=anchors,
+                                               pred_cls=cls_preds[batch_idx].detach(),
+                                               pred_box=box_preds[batch_idx].detach(),
+                                               gt_labels=tgt_labels,
+                                               gt_bboxes=tgt_bboxes
+                                               )
+            else:
+                assigned_result = self.aux_matcher(fpn_strides=fpn_strides,
+                                                   anchors=anchors,
+                                                   pred_cls=cls_preds[batch_idx].detach(),
+                                                   pred_box=box_preds[batch_idx].detach(),
+                                                   gt_labels=tgt_labels,
+                                                   gt_bboxes=tgt_bboxes
+                                                   )
+            cls_targets.append(assigned_result['assigned_labels'])
+            box_targets.append(assigned_result['assigned_bboxes'])
+            assign_metrics.append(assigned_result['assign_metrics'])
+
+        # List[B, M, C] -> Tensor[BM, C]
+        cls_targets = torch.cat(cls_targets, dim=0)
+        box_targets = torch.cat(box_targets, dim=0)
+        assign_metrics = torch.cat(assign_metrics, dim=0)
+
+        # FG cat_id: [0, num_classes -1], BG cat_id: num_classes
+        bg_class_ind = self.num_classes
+        pos_inds = ((cls_targets >= 0) & (cls_targets < bg_class_ind)).nonzero().squeeze(1)
+        num_fgs = assign_metrics.sum()
+
+        if is_dist_avail_and_initialized():
+            torch.distributed.all_reduce(num_fgs)
+        num_fgs = (num_fgs / get_world_size()).clamp(1.0).item()
+
+        # ------------------ Classification loss ------------------
+        cls_preds = cls_preds.view(-1, self.num_classes)
+        loss_cls = self.loss_classes(cls_preds, (cls_targets, assign_metrics))
+        loss_cls = loss_cls.sum() / num_fgs
+
+        # ------------------ Regression loss ------------------
+        box_preds_pos = box_preds.view(-1, 4)[pos_inds]
+        box_targets_pos = box_targets[pos_inds]
+        loss_box = self.loss_bboxes(box_preds_pos, box_targets_pos)
+        loss_box = loss_box.sum() / num_fgs
+
+        # total loss
+        losses = self.loss_cls_weight * loss_cls + \
+                 self.loss_box_weight * loss_box
+
+        # ------------------ Aux regression loss ------------------
+        loss_box_aux = None
+        if epoch >= (self.max_epoch - self.no_aug_epoch - 1):
+            ## reg_preds
+            reg_preds = outputs['pred_reg']
+            reg_preds_pos = reg_preds.view(-1, 4)[pos_inds]
+            ## anchor tensors
+            anchors_tensors = outputs['anchors'][None].repeat(bs, 1, 1)
+            anchors_tensors_pos = anchors_tensors.view(-1, 2)[pos_inds]
+            ## stride tensors
+            stride_tensors = outputs['stride_tensors'][None].repeat(bs, 1, 1)
+            stride_tensors_pos = stride_tensors.view(-1, 1)[pos_inds]
+            ## aux loss
+            loss_box_aux = self.loss_bboxes_aux(reg_preds_pos, box_targets_pos, anchors_tensors_pos, stride_tensors_pos)
+            loss_box_aux = loss_box_aux.sum() / num_fgs
+
+            losses += loss_box_aux
+
+        # Loss dict
+        if loss_box_aux is None:
+            loss_dict = dict(
+                    loss_cls = loss_cls,
+                    loss_box = loss_box,
+                    losses = losses
+            )
+        else:
+            loss_dict = dict(
+                    loss_cls = loss_cls,
+                    loss_box = loss_box,
+                    loss_box_aux = loss_box_aux,
+                    losses = losses
+                    )
+
+        return loss_dict
+
+    def __call__(self, outputs, targets, epoch=0):
+        # -------------- Main loss (one-to-one assignment) --------------
+        main_loss_dict = self.compute_loss(outputs, targets, aux_loss=False, epoch=epoch)
+        
+        # -------------- Aux loss (one-to-many assignment) --------------
+        aux_loss_dict = self.compute_loss(outputs['aux_outputs'], targets, aux_loss=True, epoch=epoch)
+
+        # Reformat loss dict
+        loss_dict = dict()
+        loss_dict['losses'] = main_loss_dict['losses'] + aux_loss_dict['losses']
+        for k in main_loss_dict:
+            if k != 'losses':
+                loss_dict[k] = main_loss_dict[k]
+        for k in aux_loss_dict:
+            if k != 'losses':
+                loss_dict['aux_' + k] = aux_loss_dict[k]
+        
+        return loss_dict
+
+
+def build_criterion(args, cfg, device, num_classes):
+    criterion = Criterion(args, cfg, device, num_classes)
+
+    return criterion
+
+
+if __name__ == "__main__":
+    pass
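
A toy walkthrough of the quality focal loss in Criterion.loss_classes (standalone, mirroring the code above with beta=2.0): every entry starts as a negative weighted by sigmoid(p)^beta against a zero target, then positive entries are re-weighted by the quality gap |score - p|^beta against their assigned IoU score:

    import torch
    import torch.nn.functional as F

    torch.manual_seed(0)
    num_anchors, num_classes, beta = 6, 4, 2.0
    pred_cls = torch.randn(num_anchors, num_classes)
    labels = torch.tensor([0, 2, 4, 4, 1, 4])   # 4 == num_classes marks background
    scores = torch.tensor([0.9, 0.7, 0.0, 0.0, 0.5, 0.0])

    p = pred_cls.sigmoid()
    loss = F.binary_cross_entropy_with_logits(
        pred_cls, torch.zeros_like(pred_cls), reduction='none') * p.pow(beta)
    pos = (labels < num_classes).nonzero().squeeze(1)
    gap = (scores[pos] - p[pos, labels[pos]]).abs().pow(beta)
    loss[pos, labels[pos]] = F.binary_cross_entropy_with_logits(
        pred_cls[pos, labels[pos]], scores[pos], reduction='none') * gap
    print(loss.sum())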

+ 162 - 0
models/detectors/ctrnet/matcher.py

@@ -0,0 +1,162 @@
+# ------------------------------------------------------------------------------------------
+# This code referenced to https://github.com/open-mmlab/mmyolo/models/task_modules/assigners/batch_dsl_assigner.py
+# ------------------------------------------------------------------------------------------
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from utils.box_ops import box_iou
+
+
+# -------------------------- Aligned SimOTA assigner --------------------------
+class AlignedSimOTA(object):
+    def __init__(self, num_classes, soft_center_radius=3.0, topk_candidates=13):
+        self.num_classes = num_classes
+        self.soft_center_radius = soft_center_radius
+        self.topk_candidates = topk_candidates
+
+    @torch.no_grad()
+    def __call__(self, 
+                 fpn_strides, 
+                 anchors, 
+                 pred_cls, 
+                 pred_box, 
+                 gt_labels,
+                 gt_bboxes):
+        # [M,]
+        strides = torch.cat([torch.ones_like(anchor_i[:, 0]) * stride_i
+                                for stride_i, anchor_i in zip(fpn_strides, anchors)], dim=-1)
+        # List[F, M, 2] -> [M, 2]
+        num_gt = len(gt_labels)
+        anchors = torch.cat(anchors, dim=0)
+
+        # check gt
+        if num_gt == 0 or gt_bboxes.max().item() == 0.:
+            return {
+                'assigned_labels': gt_labels.new_full(pred_cls[..., 0].shape,
+                                                      self.num_classes,
+                                                      dtype=torch.long),
+                'assigned_bboxes': gt_bboxes.new_full(pred_box.shape, 0),
+                'assign_metrics': gt_bboxes.new_full(pred_cls[..., 0].shape, 0)
+            }
+        
+        # get inside points: [N, M]
+        is_in_gt = self.find_inside_points(gt_bboxes, anchors)
+        valid_mask = is_in_gt.sum(dim=0) > 0  # [M,]
+
+        # ----------------------------------- soft center prior -----------------------------------
+        gt_center = (gt_bboxes[..., :2] + gt_bboxes[..., 2:]) / 2.0
+        distance = (anchors.unsqueeze(0) - gt_center.unsqueeze(1)
+                    ).pow(2).sum(-1).sqrt() / strides.unsqueeze(0)  # [N, M]
+        distance = distance * valid_mask.unsqueeze(0)
+        soft_center_prior = torch.pow(10, distance - self.soft_center_radius)
+
+        # ----------------------------------- regression cost -----------------------------------
+        pair_wise_ious, _ = box_iou(gt_bboxes, pred_box)  # [N, M]
+        pair_wise_ious_loss = -torch.log(pair_wise_ious + 1e-8) * 3.0
+
+        # ----------------------------------- classification cost -----------------------------------
+        ## select the predicted scores corresponded to the gt_labels
+        pairwise_pred_scores = pred_cls.permute(1, 0)  # [M, C] -> [C, M]
+        pairwise_pred_scores = pairwise_pred_scores[gt_labels.long(), :].float()   # [N, M]
+        ## scale factor
+        scale_factor = (pair_wise_ious - pairwise_pred_scores.sigmoid()).abs().pow(2.0)
+        ## cls cost
+        pair_wise_cls_loss = F.binary_cross_entropy_with_logits(
+            pairwise_pred_scores, pair_wise_ious,
+            reduction="none") * scale_factor # [N, M]
+            
+        del pairwise_pred_scores
+
+        ## foreground cost matrix
+        cost_matrix = pair_wise_cls_loss + pair_wise_ious_loss + soft_center_prior
+        max_pad_value = torch.ones_like(cost_matrix) * 1e9
+        cost_matrix = torch.where(valid_mask[None].repeat(num_gt, 1),   # [N, M]
+                                  cost_matrix, max_pad_value)
+
+        # ----------------------------------- dynamic label assignment -----------------------------------
+        matched_pred_ious, matched_gt_inds, fg_mask_inboxes = self.dynamic_k_matching(
+            cost_matrix, pair_wise_ious, num_gt)
+        del pair_wise_cls_loss, cost_matrix, pair_wise_ious, pair_wise_ious_loss
+
+        # ----------------------------------- process assigned labels -----------------------------------
+        assigned_labels = gt_labels.new_full(pred_cls[..., 0].shape,
+                                             self.num_classes)  # [M,]
+        assigned_labels[fg_mask_inboxes] = gt_labels[matched_gt_inds].squeeze(-1)
+        assigned_labels = assigned_labels.long()  # [M,]
+
+        assigned_bboxes = gt_bboxes.new_full(pred_box.shape, 0)        # [M, 4]
+        assigned_bboxes[fg_mask_inboxes] = gt_bboxes[matched_gt_inds]  # [M, 4]
+
+        assign_metrics = gt_bboxes.new_full(pred_cls[..., 0].shape, 0) # [M,]
+        assign_metrics[fg_mask_inboxes] = matched_pred_ious            # [M,]
+
+        assigned_dict = dict(
+            assigned_labels=assigned_labels,
+            assigned_bboxes=assigned_bboxes,
+            assign_metrics=assign_metrics
+            )
+        
+        return assigned_dict
+
+    def find_inside_points(self, gt_bboxes, anchors):
+        """
+            gt_bboxes: Tensor -> [N, 4]
+            anchors:   Tensor -> [M, 2]
+        """
+        num_anchors = anchors.shape[0]
+        num_gt = gt_bboxes.shape[0]
+
+        anchors_expand = anchors.unsqueeze(0).repeat(num_gt, 1, 1)           # [N, M, 2]
+        gt_bboxes_expand = gt_bboxes.unsqueeze(1).repeat(1, num_anchors, 1)  # [N, M, 4]
+
+        # offset
+        lt = anchors_expand - gt_bboxes_expand[..., :2]
+        rb = gt_bboxes_expand[..., 2:] - anchors_expand
+        bbox_deltas = torch.cat([lt, rb], dim=-1)
+
+        is_in_gts = bbox_deltas.min(dim=-1).values > 0
+
+        return is_in_gts
+    
+    def dynamic_k_matching(self, cost_matrix, pairwise_ious, num_gt):
+        """Use IoU and matching cost to calculate the dynamic top-k positive
+        targets.
+
+        Args:
+            cost_matrix (Tensor): Cost matrix.
+            pairwise_ious (Tensor): Pairwise iou matrix.
+            num_gt (int): Number of gt.
+        Returns:
+            tuple: matched ious and gt indexes.
+        """
+        matching_matrix = torch.zeros_like(cost_matrix, dtype=torch.uint8)
+        # select candidate topk ious for dynamic-k calculation
+        candidate_topk = min(self.topk_candidates, pairwise_ious.size(1))
+        topk_ious, _ = torch.topk(pairwise_ious, candidate_topk, dim=1)
+        # calculate dynamic k for each gt
+        dynamic_ks = torch.clamp(topk_ious.sum(1).int(), min=1)
+
+        # sorting the batch cost matrix is faster than topk
+        _, sorted_indices = torch.sort(cost_matrix, dim=1)
+        for gt_idx in range(num_gt):
+            topk_ids = sorted_indices[gt_idx, :dynamic_ks[gt_idx]]
+            matching_matrix[gt_idx, :][topk_ids] = 1
+
+        del topk_ious, dynamic_ks, topk_ids
+
+        prior_match_gt_mask = matching_matrix.sum(0) > 1
+        if prior_match_gt_mask.sum() > 0:
+            cost_min, cost_argmin = torch.min(
+                cost_matrix[:, prior_match_gt_mask], dim=0)
+            matching_matrix[:, prior_match_gt_mask] *= 0
+            matching_matrix[cost_argmin, prior_match_gt_mask] = 1
+
+        # get foreground mask inside box and center prior
+        fg_mask_inboxes = matching_matrix.sum(0) > 0
+        matched_pred_ious = (matching_matrix *
+                             pairwise_ious).sum(0)[fg_mask_inboxes]
+        matched_gt_inds = matching_matrix[:, fg_mask_inboxes].argmax(0)
+
+        return matched_pred_ious, matched_gt_inds, fg_mask_inboxes
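
A toy invocation of AlignedSimOTA on a single stride-4 level (not part of the commit; anchors are built the same way as SDetPDLayer.generate_anchors, and the gt box is made up for illustration):

    import torch
    from models.detectors.ctrnet.matcher import AlignedSimOTA

    stride = 4
    ys, xs = torch.meshgrid([torch.arange(4), torch.arange(4)], indexing='ij')
    anchors = (torch.stack([xs, ys], dim=-1).float().view(-1, 2) + 0.5) * stride  # [16, 2]

    xy = torch.rand(16, 2) * 8.0
    wh = torch.rand(16, 2) * 8.0 + 1.0
    pred_box = torch.cat([xy, xy + wh], dim=1)  # well-formed x1y1x2y2 boxes

    matcher = AlignedSimOTA(num_classes=20, topk_candidates=13)
    result = matcher(fpn_strides=[stride],
                     anchors=[anchors],
                     pred_cls=torch.randn(16, 20),
                     pred_box=pred_box,
                     gt_labels=torch.tensor([3]),
                     gt_bboxes=torch.tensor([[2., 2., 10., 10.]]))
    print(result['assigned_labels'].shape, result['assign_metrics'].shape)  # [16] [16]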
+