# --------------- Torch components --------------- import torch import torch.nn as nn # --------------- Model components --------------- from .rtcdet_backbone import build_backbone from .rtcdet_neck import build_neck from .rtcdet_pafpn import build_fpn from .rtcdet_head import build_head from .rtcdet_pred import build_pred # --------------- External components --------------- from utils.misc import multiclass_nms # Real-time Convolutional General Object Detector class RTCDet(nn.Module): def __init__(self, cfg, device, num_classes = 20, conf_thresh = 0.01, nms_thresh = 0.5, topk = 1000, trainable = False, deploy = False, no_multi_labels = False, nms_class_agnostic = False, ): super(RTCDet, self).__init__() # ---------------- Basic Parameters ---------------- self.cfg = cfg self.device = device self.strides = cfg['stride'] self.num_classes = num_classes self.trainable = trainable self.conf_thresh = conf_thresh self.nms_thresh = nms_thresh self.num_levels = len(self.strides) self.num_classes = num_classes self.topk_candidates = topk self.deploy = deploy self.no_multi_labels = no_multi_labels self.nms_class_agnostic = nms_class_agnostic self.head_dim = round(256 * cfg['width']) # ---------------- Network Parameters ---------------- ## ----------- Backbone ----------- self.backbone, feat_dims = build_backbone(cfg, pretrained=cfg['bk_pretrained']&trainable) ## ----------- Neck: SPP ----------- self.neck = build_neck(cfg, feat_dims[-1], feat_dims[-1]) feat_dims[-1] = self.neck.out_dim ## ----------- Neck: FPN ----------- self.fpn = build_fpn(cfg, feat_dims, out_dim=self.head_dim) self.fpn_dims = self.fpn.out_dim ## ----------- Head ----------- self.head = build_head(cfg, self.fpn_dims, self.head_dim, self.num_levels) ## ----------- Pred ----------- self.pred = build_pred(self.head_dim, self.head_dim, self.strides, num_classes, 4, self.num_levels) # Post process def post_process(self, cls_preds, box_preds): """ Input: cls_preds: List[np.array] -> [[M, C], ...] box_preds: List[np.array] -> [[M, 4], ...] Output: bboxes: np.array -> [N, 4] scores: np.array -> [N,] labels: np.array -> [N,] """ assert len(cls_preds) == self.num_levels all_scores = [] all_labels = [] all_bboxes = [] for cls_pred_i, box_pred_i in zip(cls_preds, box_preds): cls_pred_i = cls_pred_i[0] box_pred_i = box_pred_i[0] if self.no_multi_labels: # [M,] scores, labels = torch.max(cls_pred_i.sigmoid(), dim=1) # Keep top k top scoring indices only. num_topk = min(self.topk_candidates, box_pred_i.size(0)) # topk candidates predicted_prob, topk_idxs = scores.sort(descending=True) topk_scores = predicted_prob[:num_topk] topk_idxs = topk_idxs[:num_topk] # filter out the proposals with low confidence score keep_idxs = topk_scores > self.conf_thresh scores = topk_scores[keep_idxs] topk_idxs = topk_idxs[keep_idxs] labels = labels[topk_idxs] bboxes = box_pred_i[topk_idxs] else: # [M, C] -> [MC,] scores_i = cls_pred_i.sigmoid().flatten() # Keep top k top scoring indices only. num_topk = min(self.topk_candidates, box_pred_i.size(0)) # torch.sort is actually faster than .topk (at least on GPUs) predicted_prob, topk_idxs = scores_i.sort(descending=True) topk_scores = predicted_prob[:num_topk] topk_idxs = topk_idxs[:num_topk] # filter out the proposals with low confidence score keep_idxs = topk_scores > self.conf_thresh scores = topk_scores[keep_idxs] topk_idxs = topk_idxs[keep_idxs] anchor_idxs = torch.div(topk_idxs, self.num_classes, rounding_mode='floor') labels = topk_idxs % self.num_classes bboxes = box_pred_i[anchor_idxs] all_scores.append(scores) all_labels.append(labels) all_bboxes.append(bboxes) scores = torch.cat(all_scores, dim=0) labels = torch.cat(all_labels, dim=0) bboxes = torch.cat(all_bboxes, dim=0) # to cpu & numpy scores = scores.cpu().numpy() labels = labels.cpu().numpy() bboxes = bboxes.cpu().numpy() # nms scores, labels, bboxes = multiclass_nms( scores, labels, bboxes, self.nms_thresh, self.num_classes, self.nms_class_agnostic) return bboxes, scores, labels def forward_det_task(self, x): # ---------------- Heads ---------------- outputs = self.head['det'](x) # ---------------- Post-process ---------------- if self.trainable: return outputs else: all_cls_preds = outputs['pred_cls'] all_box_preds = outputs['pred_box'] if self.deploy: cls_preds = torch.cat(all_cls_preds, dim=1)[0] box_preds = torch.cat(all_box_preds, dim=1)[0] scores = cls_preds.sigmoid() bboxes = box_preds # [n_anchors_all, 4 + C] outputs = torch.cat([bboxes, scores], dim=-1) return outputs else: # post process bboxes, scores, labels = self.post_process(all_cls_preds, all_box_preds) return bboxes, scores, labels # Main process def forward(self, x): # ---------------- Backbone ---------------- pyramid_feats = self.backbone(x) # ---------------- Neck: SPP ---------------- pyramid_feats[-1] = self.neck(pyramid_feats[-1]) # ---------------- Neck: PaFPN ---------------- pyramid_feats = self.fpn(pyramid_feats) # ---------------- Head ---------------- pyramid_feats = self.head(pyramid_feats) # ---------------- Pred ---------------- outputs = self.pred(pyramid_feats) # ---------------- Post-process ---------------- if self.trainable: return outputs else: all_cls_preds = outputs['pred_cls'] all_box_preds = outputs['pred_box'] if self.deploy: cls_preds = torch.cat(all_cls_preds, dim=1)[0] box_preds = torch.cat(all_box_preds, dim=1)[0] scores = cls_preds.sigmoid() bboxes = box_preds # [n_anchors_all, 4 + C] outputs = torch.cat([bboxes, scores], dim=-1) return outputs else: # post process bboxes, scores, labels = self.post_process(all_cls_preds, all_box_preds) return bboxes, scores, labels