| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- # Real-time Convolutional Object Detector
- # --------------- Torch components ---------------
- import torch
- import torch.nn as nn
- # --------------- Model components ---------------
- from .rtcdet_backbone import build_backbone
- from .rtcdet_neck import build_neck
- from .rtcdet_pafpn import build_fpn
- from .rtcdet_head import build_det_head, build_seg_head, build_pose_head
- from .rtcdet_pred import build_det_pred, build_seg_pred, build_pose_pred
- # --------------- External components ---------------
- from utils.misc import multiclass_nms
- # Real-time Convolutional General Object Detector
class RTCDet(nn.Module):
    """Real-time convolutional object detector.

    The network is assembled from a backbone, a neck applied to the last
    backbone feature map, a feature pyramid, and a detection head followed by
    a prediction layer. Segmentation and pose heads are built only when their
    config entry has a non-``None`` ``name``.

    Args:
        cfg: Config mapping. This class reads ``'stride'``, ``'width'``,
            ``'bk_pretrained'``, ``'det_head'``, ``'seg_head'`` and
            ``'pos_head'``; the mapping is also forwarded to the builders.
        device: Stored on the instance; not otherwise used by this class.
        num_classes: Number of object classes predicted per anchor.
        conf_thresh: Candidates are kept only if their score is strictly
            greater than this value.
        nms_thresh: Threshold forwarded to ``multiclass_nms``.
        topk: Upper bound on the number of candidates kept per pyramid level
            before score filtering.
        trainable: When True, ``forward`` returns the raw head outputs and no
            post-processing is applied.
        deploy: Inference only. When True, the detection branch returns one
            ``[n_anchors_all, 4 + C]`` tensor instead of running NMS.
        no_multi_labels: When True, each anchor contributes only its
            best-scoring class.
        nms_class_agnostic: Flag forwarded to ``multiclass_nms``.
    """

    def __init__(self,
                 cfg,
                 device,
                 num_classes=20,
                 conf_thresh=0.01,
                 nms_thresh=0.5,
                 topk=1000,
                 trainable=False,
                 deploy=False,
                 no_multi_labels=False,
                 nms_class_agnostic=False,
                 ):
        super(RTCDet, self).__init__()
        # ---------------- Basic Parameters ----------------
        self.cfg = cfg
        self.device = device
        self.strides = cfg['stride']
        self.num_levels = len(self.strides)
        self.num_classes = num_classes
        self.trainable = trainable
        self.deploy = deploy
        self.conf_thresh = conf_thresh
        self.nms_thresh = nms_thresh
        self.topk_candidates = topk
        self.no_multi_labels = no_multi_labels
        self.nms_class_agnostic = nms_class_agnostic
        self.head_dim = round(256 * cfg['width'])

        # ---------------- Network Parameters ----------------
        ## ----------- Backbone -----------
        # Pretrained weights are only requested when the model is going to be
        # trained. Logical `and` (rather than bitwise `&`) keeps this correct
        # for any truthy/falsy config value.
        load_pretrained = bool(cfg['bk_pretrained']) and bool(trainable)
        self.backbone, feat_dims = build_backbone(cfg, pretrained=load_pretrained)

        ## ----------- Neck: SPP -----------
        self.neck = build_neck(cfg, feat_dims[-1], feat_dims[-1])
        # The neck may change the channel count of the last level.
        feat_dims[-1] = self.neck.out_dim

        ## ----------- Neck: FPN -----------
        self.fpn = build_fpn(cfg, feat_dims, out_dim=self.head_dim)
        self.fpn_dims = self.fpn.out_dim

        ## ----------- Head -----------
        self.det_head = nn.Sequential(
            build_det_head(cfg['det_head'], self.fpn_dims, self.head_dim, self.num_levels),
            build_det_pred(self.head_dim, self.head_dim, self.strides, num_classes, 4, self.num_levels),
        )

        # Optional task heads: the builders are only invoked when configured.
        if cfg['seg_head']['name'] is not None:
            self.seg_head = nn.Sequential(
                build_seg_head(cfg['seg_head']),
                build_seg_pred(),
            )
        else:
            self.seg_head = None

        if cfg['pos_head']['name'] is not None:
            self.pos_head = nn.Sequential(
                build_pose_head(cfg['pos_head']),
                build_pose_pred(),
            )
        else:
            self.pos_head = None

    # Post process
    def post_process(self, cls_preds, box_preds):
        """Turn per-level raw predictions into final detections via NMS.

        Only the first element along the leading dimension of every level is
        processed (each entry is indexed with ``[0]``).

        Input:
            cls_preds: List[torch.Tensor], one per pyramid level; after
                taking index 0 each is treated as [M, C] class logits.
            box_preds: List[torch.Tensor], one per pyramid level; after
                taking index 0 each is indexed per anchor along dim 0
                (the original docstring gives [M, 4]).
        Output:
            bboxes, scores, labels: whatever ``multiclass_nms`` returns for
                the numpy inputs handed to it (the original docstring gives
                np.array of [N, 4], [N,] and [N,]).
        """
        assert len(cls_preds) == self.num_levels
        all_scores = []
        all_labels = []
        all_bboxes = []

        for cls_pred_i, box_pred_i in zip(cls_preds, box_preds):
            cls_pred_i = cls_pred_i[0]
            box_pred_i = box_pred_i[0]

            if self.no_multi_labels:
                # [M,] : best class per anchor
                level_scores, best_labels = torch.max(cls_pred_i.sigmoid(), dim=1)
            else:
                # [M, C] -> [MC,] : every (anchor, class) pair is a candidate
                level_scores = cls_pred_i.sigmoid().flatten()

            # Keep top k top scoring indices only.
            num_topk = min(self.topk_candidates, box_pred_i.size(0))
            # torch.sort is actually faster than .topk (at least on GPUs)
            sorted_scores, sorted_idxs = level_scores.sort(descending=True)
            topk_scores = sorted_scores[:num_topk]
            topk_idxs = sorted_idxs[:num_topk]

            # filter out the proposals with low confidence score
            keep_idxs = topk_scores > self.conf_thresh
            scores = topk_scores[keep_idxs]
            topk_idxs = topk_idxs[keep_idxs]

            if self.no_multi_labels:
                labels = best_labels[topk_idxs]
                bboxes = box_pred_i[topk_idxs]
            else:
                # Flattened index = anchor_index * num_classes + class_index.
                anchor_idxs = torch.div(topk_idxs, self.num_classes, rounding_mode='floor')
                labels = topk_idxs % self.num_classes
                bboxes = box_pred_i[anchor_idxs]

            all_scores.append(scores)
            all_labels.append(labels)
            all_bboxes.append(bboxes)

        scores = torch.cat(all_scores, dim=0)
        labels = torch.cat(all_labels, dim=0)
        bboxes = torch.cat(all_bboxes, dim=0)

        # to cpu & numpy
        # detach() first: Tensor.numpy() raises on tensors that require grad,
        # which happens when inference is run outside torch.no_grad().
        scores = scores.detach().cpu().numpy()
        labels = labels.detach().cpu().numpy()
        bboxes = bboxes.detach().cpu().numpy()

        # nms
        scores, labels, bboxes = multiclass_nms(
            scores, labels, bboxes, self.nms_thresh, self.num_classes, self.nms_class_agnostic)

        return bboxes, scores, labels

    # Main process
    def forward(self, x):
        """Run the full network on ``x``.

        Returns:
            Training mode (``self.trainable``): a dict with keys
            ``'det_outputs'``, ``'seg_outputs'`` and ``'pos_outputs'``.
            Inference mode: the detection output, updated in place with the
            segmentation / pose outputs whenever those are not ``None``.
        """
        # ---------------- Backbone ----------------
        pyramid_feats = self.backbone(x)

        # ---------------- Neck: SPP ----------------
        pyramid_feats[-1] = self.neck(pyramid_feats[-1])

        # ---------------- Neck: PaFPN ----------------
        pyramid_feats = self.fpn(pyramid_feats)

        # ---------------- Head ----------------
        det_outputs = self.forward_det_head(pyramid_feats)
        seg_outputs = self.forward_seg_head(pyramid_feats)
        pos_outputs = self.forward_pos_head(pyramid_feats)

        if self.trainable:
            return {
                'det_outputs': det_outputs,
                'seg_outputs': seg_outputs,
                'pos_outputs': pos_outputs
            }

        # Inference: fold the optional task outputs into the detection result.
        if seg_outputs is not None:
            det_outputs.update(seg_outputs)
        if pos_outputs is not None:
            det_outputs.update(pos_outputs)

        return det_outputs

    def forward_det_head(self, x):
        """Run the detection head and, at inference time, post-process it.

        Returns:
            Training mode: the raw output of ``self.det_head``.
            Inference + deploy: a ``[n_anchors_all, 4 + C]`` tensor of boxes
            concatenated with sigmoid class scores, first batch element only.
            Inference otherwise: a dict with ``"scores"``, ``"labels"`` and
            ``"bboxes"`` produced by ``post_process``.
        """
        # ---------------- Heads ----------------
        outputs = self.det_head(x)

        # ---------------- Post-process ----------------
        if self.trainable:
            return outputs

        all_cls_preds = outputs['pred_cls']
        all_box_preds = outputs['pred_box']

        if self.deploy:
            # Merge all levels along the anchor axis and drop the batch axis.
            cls_preds = torch.cat(all_cls_preds, dim=1)[0]
            box_preds = torch.cat(all_box_preds, dim=1)[0]
            # [n_anchors_all, 4 + C]
            return torch.cat([box_preds, cls_preds.sigmoid()], dim=-1)

        # post process
        bboxes, scores, labels = self.post_process(all_cls_preds, all_box_preds)
        return {
            "scores": scores,
            "labels": labels,
            "bboxes": bboxes
        }

    def forward_seg_head(self, x):
        """Segmentation branch; currently always returns ``None``."""
        if self.seg_head is None:
            return None
        # NOTE(review): the configured segmentation head is never invoked
        # here, so this still yields None -- looks like a stub; confirm.
        return None

    def forward_pos_head(self, x):
        """Pose branch; currently always returns ``None``."""
        if self.pos_head is None:
            return None
        # NOTE(review): the configured pose head is never invoked here, so
        # this still yields None -- looks like a stub; confirm.
        return None
|