Mask R-CNN Code Walkthrough (Facebook's Official PyTorch Version)

Author: TiTiWung | Published 2019-05-30 08:45

    Mask R-CNN (Facebook's official PyTorch implementation)

    The ResNet part

    First, let's look at how the ResNet with FPN is built. We assume the model in use is the one specified by ResNet50FPNStagesTo5.
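
    For reference, each entry of stage_specs below is a small namedtuple. A minimal sketch of how ResNet50FPNStagesTo5 can be written down (matching the tuple printed in the comment inside __init__; the names follow maskrcnn-benchmark, but treat the exact definition as an illustration):

    from collections import namedtuple

    # index: stage number (1..4); block_count: bottleneck blocks in the stage;
    # return_features: whether the stage's output is returned (needed for FPN)
    StageSpec = namedtuple("StageSpec", ["index", "block_count", "return_features"])

    ResNet50FPNStagesTo5 = tuple(
        StageSpec(index=i, block_count=c, return_features=r)
        for (i, c, r) in ((1, 3, True), (2, 4, True), (3, 6, True), (4, 3, True))
    )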

    class ResNet(nn.Module):
        def __init__(self, cfg):
            super(ResNet, self).__init__()
    
            # If we want to use the cfg in forward(), then we should make a copy
            # of it and store it for later use:
            # self.cfg = cfg.clone()
    
            # Translate string names to implementations
            # stem_module = StemWithFixedBatchNorm; as explained below, this
            # means the batch norm in the stem (ResNet's first conv layer)
            # keeps all four of its parameters fixed as constants
            # stage_specs = ResNet50FPNStagesTo5
            # Concretely, stage_specs is the following tuple:
            '''
            (StageSpec(index=1, block_count=3, return_features=True),
            StageSpec(index=2, block_count=4, return_features=True),
            StageSpec(index=3, block_count=6, return_features=True),
            StageSpec(index=4, block_count=3, return_features=True))
            '''
            stem_module = _STEM_MODULES[cfg.MODEL.RESNETS.STEM_FUNC]
            stage_specs = _STAGE_SPECS[cfg.MODEL.BACKBONE.CONV_BODY]
            # transformation_module = BottleneckWithFixedBatchNorm
            transformation_module = _TRANSFORMATION_MODULES[cfg.MODEL.RESNETS.TRANS_FUNC]
    
            # Construct the stem module
            # Here the batch norm used by ResNet's first conv layer has all four
            # of its parameters fixed as constants: it never updates batch
            # statistics and reduces to a fixed per-channel scale and shift
            self.stem = stem_module(cfg)
    
            # Construct the specified ResNet stages
            num_groups = cfg.MODEL.RESNETS.NUM_GROUPS
            width_per_group = cfg.MODEL.RESNETS.WIDTH_PER_GROUP
            in_channels = cfg.MODEL.RESNETS.STEM_OUT_CHANNELS
            stage2_bottleneck_channels = num_groups * width_per_group
            stage2_out_channels = cfg.MODEL.RESNETS.RES2_OUT_CHANNELS
            self.stages = []
            self.return_features = {}
            for stage_spec in stage_specs:
                name = "layer" + str(stage_spec.index)
                # stage2_relative_factor = 1,2,4,8
                stage2_relative_factor = 2 ** (stage_spec.index - 1)
                # bottleneck_channels = 64, 128, 256, 512
                bottleneck_channels = stage2_bottleneck_channels * stage2_relative_factor
                # out_channels = 256, 512, 1024, 2048
                out_channels = stage2_out_channels * stage2_relative_factor
                # stage_with_dcn = (False, False, False, False)
                stage_with_dcn = cfg.MODEL.RESNETS.STAGE_WITH_DCN[stage_spec.index - 1]
                module = _make_stage(
                    transformation_module,
                    in_channels,
                    bottleneck_channels,
                    out_channels,
                    stage_spec.block_count,
                    num_groups,
                    cfg.MODEL.RESNETS.STRIDE_IN_1X1,
                    first_stride=int(stage_spec.index > 1) + 1,
                    dcn_config={
                        "stage_with_dcn": stage_with_dcn,
                        "with_modulated_dcn": cfg.MODEL.RESNETS.WITH_MODULATED_DCN,
                        "deformable_groups": cfg.MODEL.RESNETS.DEFORMABLE_GROUPS,
                    }
                )
                in_channels = out_channels
                self.add_module(name, module)
                self.stages.append(name)
                self.return_features[name] = stage_spec.return_features
    
            # Optionally freeze (requires_grad=False) parts of the backbone
            self._freeze_backbone(cfg.MODEL.BACKBONE.FREEZE_CONV_BODY_AT)
    
        # 相当于将Resnet的第一个卷积层,第二个卷积层及其配套的batchnorm等层的参数freeze
        def _freeze_backbone(self, freeze_at):
            if freeze_at < 0:
                return
            for stage_index in range(freeze_at):
                if stage_index == 0:
                    m = self.stem  # stage 0 is the stem
                else:
                    m = getattr(self, "layer" + str(stage_index))
                for p in m.parameters():
                    p.requires_grad = False
    
        def forward(self, x):
            outputs = []
            x = self.stem(x)
            for stage_name in self.stages:
                x = getattr(self, stage_name)(x)
                if self.return_features[stage_name]:
                    outputs.append(x)
            # the returned feature maps are ordered from largest to smallest
            # spatial size (C2..C5); the FPN is built on top of this list
            return outputs
    

    The _make_stage function used above builds each ResNet stage; for ResNet-50 the four stages contain 3, 4, 6, and 3 blocks respectively.

    def _make_stage(
        transformation_module,
        in_channels,
        bottleneck_channels,
        out_channels,
        block_count,
        num_groups,
        stride_in_1x1,
        first_stride,
        dilation=1,
        dcn_config={}
    ):
        blocks = []
        stride = first_stride
        for _ in range(block_count):
            blocks.append(
                transformation_module(
                    in_channels,
                    bottleneck_channels,
                    out_channels,
                    num_groups,
                    stride_in_1x1,
                    stride,
                    dilation=dilation,
                    dcn_config=dcn_config
                )
            )
            # resetting stride to 1 means that, among the block_count blocks,
            # only the first block in the stage downsamples
            stride = 1
            in_channels = out_channels
        return nn.Sequential(*blocks)
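
    As a hypothetical usage, here is how layer1 of R-50 would come out of the loop in ResNet.__init__ (the values follow the comments above; BottleneckWithFixedBatchNorm stands in for transformation_module, and first_stride = int(stage_spec.index > 1) + 1 evaluates to 1 for stage 1):

    # layer1: 3 bottleneck blocks, 64 -> 256 channels, no downsampling
    layer1 = _make_stage(
        BottleneckWithFixedBatchNorm,   # transformation_module
        in_channels=64,                 # cfg.MODEL.RESNETS.STEM_OUT_CHANNELS
        bottleneck_channels=64,
        out_channels=256,
        block_count=3,
        num_groups=1,
        stride_in_1x1=True,
        first_stride=1,
    )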
    

    Below is the concrete implementation of ResNet's Bottleneck. One question here is why all the batch norms use FrozenBatchNorm2d; doesn't that mean batch norm contributes essentially nothing? Someone raised exactly this question on Facebook's GitHub, and the authors answered:
    "The reason why we use FrozenBatchNorm2d instead of BatchNorm2d is that the sizes of the batches are very small, which makes the batch statistics very poor and degrades performance. Plus, when using multiple GPUs, the batch statistics are not accumulated from multiple devices, so that only a single GPU computes the statistics."
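
    For reference, the core of FrozenBatchNorm2d is tiny. A minimal sketch of the idea (the real layer lives in maskrcnn_benchmark/layers; treat this as an illustration, not the verbatim implementation):

    import torch
    from torch import nn

    class FrozenBatchNorm2d(nn.Module):
        """BatchNorm2d whose statistics and affine parameters are fixed buffers."""
        def __init__(self, n):
            super(FrozenBatchNorm2d, self).__init__()
            # buffers, not nn.Parameters: they are loaded from the pretrained
            # checkpoint but receive no gradient updates
            self.register_buffer("weight", torch.ones(n))
            self.register_buffer("bias", torch.zeros(n))
            self.register_buffer("running_mean", torch.zeros(n))
            self.register_buffer("running_var", torch.ones(n))

        def forward(self, x):
            # y = (x - mean) / sqrt(var) * weight + bias, folded into scale/shift
            scale = self.weight * self.running_var.rsqrt()
            bias = self.bias - self.running_mean * scale
            return x * scale.reshape(1, -1, 1, 1) + bias.reshape(1, -1, 1, 1)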

    class Bottleneck(nn.Module):
        def __init__(
            self,
            in_channels,
            bottleneck_channels,
            out_channels,
            num_groups,
            stride_in_1x1,
            stride,
            dilation,
            norm_func,
            dcn_config
        ):
            super(Bottleneck, self).__init__()
    
            self.downsample = None
            if in_channels != out_channels:
                down_stride = stride if dilation == 1 else 1
                self.downsample = nn.Sequential(
                    Conv2d(
                        in_channels, out_channels,
                        kernel_size=1, stride=down_stride, bias=False
                    ),
                    norm_func(out_channels),
                )
                for modules in [self.downsample,]:
                    for l in modules.modules():
                        if isinstance(l, Conv2d):
                            nn.init.kaiming_uniform_(l.weight, a=1)
    
            if dilation > 1:
                stride = 1 # reset to be 1
    
            # The original MSRA ResNet models have stride in the first 1x1 conv
            # The subsequent fb.torch.resnet and Caffe2 ResNe[X]t implementations have
            # stride in the 3x3 conv
            # In other words, the originally proposed ResNet downsamples by giving
            # the first 1x1 conv a stride of 2, whereas Facebook's implementation
            # downsamples by giving the second (3x3) conv a stride of 2
            stride_1x1, stride_3x3 = (stride, 1) if stride_in_1x1 else (1, stride)
    
            self.conv1 = Conv2d(
                in_channels,
                bottleneck_channels,
                kernel_size=1,
                stride=stride_1x1,
                bias=False,
            )
            self.bn1 = norm_func(bottleneck_channels)
            # TODO: specify init for the above
            # In the ResNet configuration used here, neither dilated (atrous)
            # convolution nor deformable convolution (DCN) is enabled
            with_dcn = dcn_config.get("stage_with_dcn", False)
            if with_dcn:
                deformable_groups = dcn_config.get("deformable_groups", 1)
                with_modulated_dcn = dcn_config.get("with_modulated_dcn", False)
                self.conv2 = DFConv2d(
                    bottleneck_channels, 
                    bottleneck_channels, 
                    with_modulated_dcn=with_modulated_dcn, 
                    kernel_size=3, 
                    stride=stride_3x3, 
                    groups=num_groups,
                    dilation=dilation,
                    deformable_groups=deformable_groups,
                    bias=False
                )
            else:
                self.conv2 = Conv2d(
                    bottleneck_channels,
                    bottleneck_channels,
                    kernel_size=3,
                    stride=stride_3x3,
                    padding=dilation,
                    bias=False,
                    groups=num_groups,
                    dilation=dilation
                )
                nn.init.kaiming_uniform_(self.conv2.weight, a=1)
    
            self.bn2 = norm_func(bottleneck_channels)
    
            self.conv3 = Conv2d(
                bottleneck_channels, out_channels, kernel_size=1, bias=False
            )
            self.bn3 = norm_func(out_channels)
    
            for l in [self.conv1, self.conv3,]:
                nn.init.kaiming_uniform_(l.weight, a=1)
    
        def forward(self, x):
            identity = x
    
            out = self.conv1(x)
            out = self.bn1(out)
            out = F.relu_(out)
    
            out = self.conv2(out)
            out = self.bn2(out)
            out = F.relu_(out)
    
            out = self.conv3(out)
            out = self.bn3(out)
    
            if self.downsample is not None:
                identity = self.downsample(x)
    
            out += identity
            out = F.relu_(out)
    
            return out
    

    The FPN part

    def build_resnet_fpn_backbone(cfg):
        body = resnet.ResNet(cfg)
        # in_channels_stage2 = 256; in the FPN configs out_channels is 256
        # (the 1024 default belongs to the C4 backbone), so all four FPN
        # levels share the same number of channels
        in_channels_stage2 = cfg.MODEL.RESNETS.RES2_OUT_CHANNELS
        out_channels = cfg.MODEL.RESNETS.BACKBONE_OUT_CHANNELS
        # in_channels_list = [256, 512, 1024, 2048]
        fpn = fpn_module.FPN(
            in_channels_list=[
                in_channels_stage2,
                in_channels_stage2 * 2,
                in_channels_stage2 * 4,
                in_channels_stage2 * 8,
            ],
            out_channels=out_channels,
            conv_block=conv_with_kaiming_uniform(
                cfg.MODEL.FPN.USE_GN, cfg.MODEL.FPN.USE_RELU
            ),
            top_blocks=fpn_module.LastLevelMaxPool(),
        )
        model = nn.Sequential(OrderedDict([("body", body), ("fpn", fpn)]))
        model.out_channels = out_channels
        return model
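
    A hypothetical shape walk-through of the assembled backbone, assuming BACKBONE_OUT_CHANNELS = 256 and an 800x800 input (the spatial sizes follow from strides 4, 8, 16, 32, plus the extra max-pooled level):

    import torch

    backbone = build_resnet_fpn_backbone(cfg)  # cfg configured as above
    feats = backbone(torch.randn(1, 3, 800, 800))
    print([tuple(f.shape) for f in feats])
    # [(1, 256, 200, 200), (1, 256, 100, 100), (1, 256, 50, 50),
    #  (1, 256, 25, 25), (1, 256, 13, 13)]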
    
    class FPN(nn.Module):
        """
        Module that adds FPN on top of a list of feature maps.
        The feature maps are currently supposed to be in increasing depth
        order, and must be consecutive
        """
    
        def __init__(
            self, in_channels_list, out_channels, conv_block, top_blocks=None
        ):
            """
            Arguments:
                in_channels_list (list[int]): number of channels for each feature map that
                    will be fed
                out_channels (int): number of channels of the FPN representation
                top_blocks (nn.Module or None): if provided, an extra operation will
                    be performed on the output of the last (smallest resolution)
                    FPN output, and the result will extend the result list
            """
            super(FPN, self).__init__()
            self.inner_blocks = []
            self.layer_blocks = []
            # in_channels_list = [256, 512, 1024, 2048]
            for idx, in_channels in enumerate(in_channels_list, 1):
                inner_block = "fpn_inner{}".format(idx)
                layer_block = "fpn_layer{}".format(idx)
    
                if in_channels == 0:
                    continue
                inner_block_module = conv_block(in_channels, out_channels, 1)
                layer_block_module = conv_block(out_channels, out_channels, 3, 1)
                self.add_module(inner_block, inner_block_module)
                self.add_module(layer_block, layer_block_module)
                self.inner_blocks.append(inner_block)
                self.layer_blocks.append(layer_block)
            self.top_blocks = top_blocks
    
        def forward(self, x):
            """
            Arguments:
                x (list[Tensor]): feature maps for each feature level.
            Returns:
                results (tuple[Tensor]): feature maps after FPN layers.
                    They are ordered from highest resolution first.
            """
            last_inner = getattr(self, self.inner_blocks[-1])(x[-1])
            results = []
            results.append(getattr(self, self.layer_blocks[-1])(last_inner))
            for feature, inner_block, layer_block in zip(
                x[:-1][::-1], self.inner_blocks[:-1][::-1], self.layer_blocks[:-1][::-1]
            ):
                if not inner_block:
                    continue
                inner_top_down = F.interpolate(last_inner, scale_factor=2, mode="nearest")
                inner_lateral = getattr(self, inner_block)(feature)
                # TODO use size instead of scale to make it robust to different sizes
                # inner_top_down = F.upsample(last_inner, size=inner_lateral.shape[-2:],
                # mode='bilinear', align_corners=False)
                last_inner = inner_lateral + inner_top_down
                # insert() puts each newly produced map at the front, so the
                # results stay ordered from largest to smallest feature map
                results.insert(0, getattr(self, layer_block)(last_inner))
    
            if isinstance(self.top_blocks, LastLevelP6P7):
                last_results = self.top_blocks(x[-1], results[-1])
                results.extend(last_results)
            # here the last (smallest) feature map is max-pooled to append one
            # extra, coarser level
            elif isinstance(self.top_blocks, LastLevelMaxPool):
                last_results = self.top_blocks(results[-1])
                results.extend(last_results)
    
            return tuple(results)
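
    For completeness, the LastLevelMaxPool used above is tiny: it max-pools the last FPN output with stride 2 to create the extra level. A sketch matching that behavior (assumed faithful to the library's layer):

    import torch.nn.functional as F
    from torch import nn

    class LastLevelMaxPool(nn.Module):
        def forward(self, x):
            # kernel_size=1, stride=2, padding=0: pure 2x subsampling of the map
            return [F.max_pool2d(x, 1, 2, 0)]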
    

    The RPN part

    The RPN part ultimately comes down to the RPNModule class below.

    class RPNModule(torch.nn.Module):
        """
        Module for RPN computation. Takes feature maps from the backbone and RPN
        proposals and losses. Works for both FPN and non-FPN.
        """
    
        def __init__(self, cfg, in_channels):
            super(RPNModule, self).__init__()
    
            self.cfg = cfg.clone()
    
            anchor_generator = make_anchor_generator(cfg)
            # cfg.MODEL.RPN.RPN_HEAD = 'SingleConvRPNHead'
            rpn_head = registry.RPN_HEADS[cfg.MODEL.RPN.RPN_HEAD]
            # This instantiates the RPNHead class below;
            # anchor_generator.num_anchors_per_location()[0] is fixed at 3, whose
            # meaning is explained at num_anchors inside RPNHead
            head = rpn_head(
                cfg, in_channels, anchor_generator.num_anchors_per_location()[0]
            )
    
            rpn_box_coder = BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))
    
            box_selector_train = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=True)
            box_selector_test = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=False)
    
            loss_evaluator = make_rpn_loss_evaluator(cfg, rpn_box_coder)
    
            self.anchor_generator = anchor_generator
            self.head = head
            self.box_selector_train = box_selector_train
            self.box_selector_test = box_selector_test
            self.loss_evaluator = loss_evaluator
    
        def forward(self, images, features, targets=None):
            """
            Arguments:
                images (ImageList): images for which we want to compute the predictions
                features (list[Tensor]): features computed from the images that are
                    used for computing the predictions. Each tensor in the list
                    correspond to different feature levels
                targets (list[BoxList]): ground-truth boxes present in the image (optional)
    
            Returns:
                boxes (list[BoxList]): the predicted boxes from the RPN, one BoxList per
                    image.
                losses (dict[Tensor]): the losses for the model during training. During
                    testing, it is an empty dict.
            """
            # objectness holds the classification predictions: a list of length 5,
            # one tensor per FPN feature map
            # rpn_box_regression holds the predicted box offsets; its shapes match
            # objectness except the channel dimension is multiplied by 4
            objectness, rpn_box_regression = self.head(features)
            # the generated anchors correspond to rpn_box_regression, except that
            # anchors merge the H and W dimensions, so they have two fewer dims
            anchors = self.anchor_generator(images, features)
    
            if self.training:
                return self._forward_train(anchors, objectness, rpn_box_regression, targets)
            else:
                return self._forward_test(anchors, objectness, rpn_box_regression)
    
        def _forward_train(self, anchors, objectness, rpn_box_regression, targets):
            # cfg.MODEL.RPN_ONLY = False
            if self.cfg.MODEL.RPN_ONLY:
                # When training an RPN-only model, the loss is determined by the
                # predicted objectness and rpn_box_regression values and there is
                # no need to transform the anchors into predicted boxes; this is an
                # optimization that avoids the unnecessary transformation.
                boxes = anchors
            else:
                # For end-to-end models, anchors must be transformed into boxes and
                # sampled into a training batch.
                with torch.no_grad():
                    # Producing the boxes only involves filtering, NMS and the
                    # like, none of which needs gradients, so torch.no_grad()
                    # keeps these steps out of autograd
                    # box_selector_train ultimately calls the RPNPostProcessor
                    # class below; it does two things: it drops boxes with low
                    # predicted objectness scores, and it applies NMS
                    boxes = self.box_selector_train(
                        anchors, objectness, rpn_box_regression, targets
                    )
            # the RPN-stage loss is computed by the RPNLossComputation class below
            loss_objectness, loss_rpn_box_reg = self.loss_evaluator(
                anchors, objectness, rpn_box_regression, targets
            )
            losses = {
                "loss_objectness": loss_objectness,
                "loss_rpn_box_reg": loss_rpn_box_reg,
            }
            return boxes, losses
    
        def _forward_test(self, anchors, objectness, rpn_box_regression):
            boxes = self.box_selector_test(anchors, objectness, rpn_box_regression)
            if self.cfg.MODEL.RPN_ONLY:
                # For end-to-end models, the RPN proposals are an intermediate state
                # and don't bother to sort them in decreasing score order. For RPN-only
                # models, the proposals are the final output and we return them in
                # high-to-low confidence order.
                inds = [
                    box.get_field("objectness").sort(descending=True)[1] for box in boxes
                ]
                boxes = [box[ind] for box, ind in zip(boxes, inds)]
            return boxes, {}
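
    For context, these are the anchor settings the R-50-FPN configs use (values assumed from configs/e2e_mask_rcnn_R_50_FPN_1x.yaml plus the defaults): one anchor size per FPN level and three aspect ratios, which is why num_anchors_per_location() is 3 everywhere.

    # MODEL.RPN.ANCHOR_SIZES  = (32, 64, 128, 256, 512)  # one size per FPN level
    # MODEL.RPN.ANCHOR_STRIDE = (4, 8, 16, 32, 64)       # one stride per level
    # MODEL.RPN.ASPECT_RATIOS = (0.5, 1.0, 2.0)          # -> 3 anchors per location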
    
    
    class RPNHead(nn.Module):
        """
        Adds a simple RPN Head with classification and regression heads
        """
    
        def __init__(self, cfg, in_channels, num_anchors):
            """
            Arguments:
                cfg              : config
                in_channels (int): number of channels of the input feature
                num_anchors (int): number of anchors to be predicted
            """
            super(RPNHead, self).__init__()
            self.conv = nn.Conv2d(
                in_channels, in_channels, kernel_size=3, stride=1, padding=1
            )
            # num_anchors is 3 here, meaning there are three differently shaped
            # anchors at every spatial location, and this holds for the feature
            # maps of every FPN level
            self.cls_logits = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1)
            self.bbox_pred = nn.Conv2d(
                in_channels, num_anchors * 4, kernel_size=1, stride=1
            )
    
            for l in [self.conv, self.cls_logits, self.bbox_pred]:
                torch.nn.init.normal_(l.weight, std=0.01)
                torch.nn.init.constant_(l.bias, 0)
    
        def forward(self, x):
            logits = []
            bbox_reg = []
            # before producing the cls_logits and bbox_pred values, a shared 3x3
            # conv plus ReLU is applied; 1x1 convs then yield cls_logits and
            # bbox_pred respectively
            for feature in x:
                t = F.relu(self.conv(feature))
                logits.append(self.cls_logits(t))
                bbox_reg.append(self.bbox_pred(t))
            return logits, bbox_reg
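
    A hypothetical shape check of RPNHead (assuming in_channels=256 and batch size 2; cfg is not used by this head, so None suffices for the sketch):

    import torch

    head = RPNHead(cfg=None, in_channels=256, num_anchors=3)
    features = [torch.randn(2, 256, s, s) for s in (200, 100, 50, 25, 13)]
    logits, bbox_reg = head(features)
    print([tuple(t.shape) for t in logits])    # (2, 3, s, s) at every level
    print([tuple(t.shape) for t in bbox_reg])  # (2, 12, s, s) at every level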
    
    def make_rpn_postprocessor(config, rpn_box_coder, is_train):
        # fpn_post_nms_top_n = 2000
        fpn_post_nms_top_n = config.MODEL.RPN.FPN_POST_NMS_TOP_N_TRAIN
        if not is_train:
            # fpn_post_nms_top_n = 2000
            fpn_post_nms_top_n = config.MODEL.RPN.FPN_POST_NMS_TOP_N_TEST
        # pre_nms_top_n = 12000, post_nms_top_n = 2000
        pre_nms_top_n = config.MODEL.RPN.PRE_NMS_TOP_N_TRAIN
        post_nms_top_n = config.MODEL.RPN.POST_NMS_TOP_N_TRAIN
        if not is_train:
            # pre_nms_top_n = 6000, post_nms_top_n = 1000
            pre_nms_top_n = config.MODEL.RPN.PRE_NMS_TOP_N_TEST
            post_nms_top_n = config.MODEL.RPN.POST_NMS_TOP_N_TEST
        # fpn_post_nms_per_batch = True, nms_thresh = 0.7, min_size = 0
        fpn_post_nms_per_batch = config.MODEL.RPN.FPN_POST_NMS_PER_BATCH
        nms_thresh = config.MODEL.RPN.NMS_THRESH
        min_size = config.MODEL.RPN.MIN_SIZE
        box_selector = RPNPostProcessor(
            pre_nms_top_n=pre_nms_top_n,
            post_nms_top_n=post_nms_top_n,
            nms_thresh=nms_thresh,
            min_size=min_size,
            box_coder=rpn_box_coder,
            fpn_post_nms_top_n=fpn_post_nms_top_n,
            fpn_post_nms_per_batch=fpn_post_nms_per_batch,
        )
        return box_selector
    
    class RPNPostProcessor(torch.nn.Module):
        """
        Performs post-processing on the outputs of the RPN boxes, before feeding the
        proposals to the heads
        """
    
        def __init__(
            self,
            pre_nms_top_n,
            post_nms_top_n,
            nms_thresh,
            min_size,
            box_coder=None,
            fpn_post_nms_top_n=None,
            fpn_post_nms_per_batch=True,
        ):
            """
            Arguments:
                pre_nms_top_n (int)
                post_nms_top_n (int)
                nms_thresh (float)
                min_size (int)
                box_coder (BoxCoder)
                fpn_post_nms_top_n (int)
            """
            super(RPNPostProcessor, self).__init__()
            self.pre_nms_top_n = pre_nms_top_n
            self.post_nms_top_n = post_nms_top_n
            self.nms_thresh = nms_thresh
            self.min_size = min_size
    
            if box_coder is None:
                box_coder = BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))
            self.box_coder = box_coder
    
            if fpn_post_nms_top_n is None:
                fpn_post_nms_top_n = post_nms_top_n
            self.fpn_post_nms_top_n = fpn_post_nms_top_n
            self.fpn_post_nms_per_batch = fpn_post_nms_per_batch
    
        def add_gt_proposals(self, proposals, targets):
            """
            Arguments:
                proposals: list[BoxList]
                targets: list[BoxList]
            """
            # Get the device we're operating on
            device = proposals[0].bbox.device
    
            gt_boxes = [target.copy_with_fields([]) for target in targets]
    
            # later cat of bbox requires all fields to be present for all bbox
            # so we need to add a dummy for objectness that's missing
            for gt_box in gt_boxes:
                gt_box.add_field("objectness", torch.ones(len(gt_box), device=device))
    
            proposals = [
                cat_boxlist((proposal, gt_box))
                for proposal, gt_box in zip(proposals, gt_boxes)
            ]
    
            return proposals
    
        def forward_for_single_feature_map(self, anchors, objectness, box_regression):
            """
            Arguments:
                anchors: list[BoxList]
                objectness: tensor of size N, A, H, W
                box_regression: tensor of size N, A * 4, H, W
            """
            device = objectness.device
            N, A, H, W = objectness.shape
    
            # put in the same format as anchors
            objectness = permute_and_flatten(objectness, N, A, 1, H, W).view(N, -1)
            objectness = objectness.sigmoid()
    
            box_regression = permute_and_flatten(box_regression, N, A, 4, H, W)
    
            num_anchors = A * H * W
    
            pre_nms_top_n = min(self.pre_nms_top_n, num_anchors)
            # low-scoring boxes are filtered out here via top-k selection
            objectness, topk_idx = objectness.topk(pre_nms_top_n, dim=1, sorted=True)
    
            batch_idx = torch.arange(N, device=device)[:, None]
            box_regression = box_regression[batch_idx, topk_idx]
    
            image_shapes = [box.size for box in anchors]
            concat_anchors = torch.cat([a.bbox for a in anchors], dim=0)
            concat_anchors = concat_anchors.reshape(N, -1, 4)[batch_idx, topk_idx]
            # here the predicted offsets are applied to the anchors to obtain
            # the refined proposal boxes
            proposals = self.box_coder.decode(
                box_regression.view(-1, 4), concat_anchors.view(-1, 4)
            )
    
            proposals = proposals.view(N, -1, 4)
    
            result = []
            for proposal, score, im_shape in zip(proposals, objectness, image_shapes):
                boxlist = BoxList(proposal, im_shape, mode="xyxy")
                boxlist.add_field("objectness", score)
                boxlist = boxlist.clip_to_image(remove_empty=False)
                boxlist = remove_small_boxes(boxlist, self.min_size)
                # the NMS step happens here; boxlist_nms is backed by a compiled
                # C++/CUDA extension, which is why no Python NMS code is to be
                # found. Each of the 5 feature maps keeps at most post_nms_top_n
                # boxes: 2000 during training, 1000 during testing
                boxlist = boxlist_nms(
                    boxlist,
                    self.nms_thresh,
                    max_proposals=self.post_nms_top_n,
                    score_field="objectness",
                )
                result.append(boxlist)
            return result
    
        def forward(self, anchors, objectness, box_regression, targets=None):
            """
            Arguments:
                anchors: list[list[BoxList]]
                objectness: list[tensor]
                box_regression: list[tensor]
    
            Returns:
                boxlists (list[BoxList]): the post-processed anchors, after
                    applying box decoding and NMS
            """
            sampled_boxes = []
            num_levels = len(objectness)
            anchors = list(zip(*anchors))
            for a, o, b in zip(anchors, objectness, box_regression):
                sampled_boxes.append(self.forward_for_single_feature_map(a, o, b))
    
            boxlists = list(zip(*sampled_boxes))
            boxlists = [cat_boxlist(boxlist) for boxlist in boxlists]
            
            # this step further filters out low-scoring boxes, keeping at most
            # fpn_post_nms_top_n of them (2000 by default, for both training
            # and testing)
            if num_levels > 1:
                boxlists = self.select_over_all_levels(boxlists)
    
            # append ground-truth bboxes to proposals
            if self.training and targets is not None:
                boxlists = self.add_gt_proposals(boxlists, targets)
    
            return boxlists
    
        def select_over_all_levels(self, boxlists):
            num_images = len(boxlists)
            # different behavior during training and during testing:
            # during training, post_nms_top_n is over *all* the proposals combined, while
            # during testing, it is over the proposals for each image
            # NOTE: it should be per image, and not per batch. However, to be consistent 
            # with Detectron, the default is per batch (see Issue #672)
            if self.training and self.fpn_post_nms_per_batch:
                objectness = torch.cat(
                    [boxlist.get_field("objectness") for boxlist in boxlists], dim=0
                )
                box_sizes = [len(boxlist) for boxlist in boxlists]
                post_nms_top_n = min(self.fpn_post_nms_top_n, len(objectness))
                _, inds_sorted = torch.topk(objectness, post_nms_top_n, dim=0, sorted=True)
                inds_mask = torch.zeros_like(objectness, dtype=torch.uint8)
                inds_mask[inds_sorted] = 1
                inds_mask = inds_mask.split(box_sizes)
                for i in range(num_images):
                    boxlists[i] = boxlists[i][inds_mask[i]]
            else:
                for i in range(num_images):
                    objectness = boxlists[i].get_field("objectness")
                    post_nms_top_n = min(self.fpn_post_nms_top_n, len(objectness))
                    _, inds_sorted = torch.topk(
                        objectness, post_nms_top_n, dim=0, sorted=True
                    )
                    boxlists[i] = boxlists[i][inds_sorted]
            return boxlists
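
    Both the decode call above and the encode call in the loss below follow the standard R-CNN box parameterization. A minimal sketch with unit weights (not the library's exact BoxCoder, which also weights each delta and clamps dw/dh before exponentiation):

    import torch

    def encode(gt, anchors):
        # boxes are (x1, y1, x2, y2); the +1 matches the inclusive pixel convention
        aw, ah = anchors[:, 2] - anchors[:, 0] + 1, anchors[:, 3] - anchors[:, 1] + 1
        ax, ay = anchors[:, 0] + 0.5 * aw, anchors[:, 1] + 0.5 * ah
        gw, gh = gt[:, 2] - gt[:, 0] + 1, gt[:, 3] - gt[:, 1] + 1
        gx, gy = gt[:, 0] + 0.5 * gw, gt[:, 1] + 0.5 * gh
        # regression targets: where the gt box sits relative to its anchor
        return torch.stack([(gx - ax) / aw, (gy - ay) / ah,
                            torch.log(gw / aw), torch.log(gh / ah)], dim=1)

    def decode(deltas, anchors):
        # the inverse mapping: shift and scale each anchor by its predicted offsets
        aw, ah = anchors[:, 2] - anchors[:, 0] + 1, anchors[:, 3] - anchors[:, 1] + 1
        ax, ay = anchors[:, 0] + 0.5 * aw, anchors[:, 1] + 0.5 * ah
        cx, cy = deltas[:, 0] * aw + ax, deltas[:, 1] * ah + ay
        w, h = torch.exp(deltas[:, 2]) * aw, torch.exp(deltas[:, 3]) * ah
        return torch.stack([cx - 0.5 * w, cy - 0.5 * h,
                            cx + 0.5 * w, cy + 0.5 * h], dim=1)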
    

    The RPN loss part

    class RPNLossComputation(object):
        """
        This class computes the RPN loss.
        """
    
        def __init__(self, proposal_matcher, fg_bg_sampler, box_coder,
                     generate_labels_func):
            """
            Arguments:
                proposal_matcher (Matcher)
                fg_bg_sampler (BalancedPositiveNegativeSampler)
                box_coder (BoxCoder)
            """
            # self.target_preparator = target_preparator
            self.proposal_matcher = proposal_matcher
            self.fg_bg_sampler = fg_bg_sampler
            self.box_coder = box_coder
            self.copied_fields = []
            self.generate_labels_func = generate_labels_func
            self.discard_cases = ['not_visibility', 'between_thresholds']
    
        def match_targets_to_anchors(self, anchor, target, copied_fields=[]):
            match_quality_matrix = boxlist_iou(target, anchor)
            matched_idxs = self.proposal_matcher(match_quality_matrix)
            # RPN doesn't need any fields from target
            # for creating the labels, so clear them all
            target = target.copy_with_fields(copied_fields)
            # get the targets corresponding GT for each anchor
            # NB: need to clamp the indices because we can have a single
            # GT in the image, and matched_idxs can be -2, which goes
            # out of bounds
            matched_targets = target[matched_idxs.clamp(min=0)]
            matched_targets.add_field("matched_idxs", matched_idxs)
            return matched_targets
    
        def prepare_targets(self, anchors, targets):
            labels = []
            regression_targets = []
            for anchors_per_image, targets_per_image in zip(anchors, targets):
                matched_targets = self.match_targets_to_anchors(
                    anchors_per_image, targets_per_image, self.copied_fields
                )
    
                matched_idxs = matched_targets.get_field("matched_idxs")
                labels_per_image = self.generate_labels_func(matched_targets)
                labels_per_image = labels_per_image.to(dtype=torch.float32)
    
                # Background (negative examples)
                # Matcher.BELOW_LOW_THRESHOLD = -1
                bg_indices = matched_idxs == Matcher.BELOW_LOW_THRESHOLD
                labels_per_image[bg_indices] = 0
    
                # discard anchors that go out of the boundaries of the image
                if "not_visibility" in self.discard_cases:
                    labels_per_image[~anchors_per_image.get_field("visibility")] = -1
    
                # discard indices that are between thresholds
                # Matcher.BETWEEN_THRESHOLDS = -2
                if "between_thresholds" in self.discard_cases:
                    inds_to_discard = matched_idxs == Matcher.BETWEEN_THRESHOLDS
                    labels_per_image[inds_to_discard] = -1
    
                # compute regression targets
                regression_targets_per_image = self.box_coder.encode(
                    matched_targets.bbox, anchors_per_image.bbox
                )
    
                labels.append(labels_per_image)
                regression_targets.append(regression_targets_per_image)
    
            return labels, regression_targets
    
    
        def __call__(self, anchors, objectness, box_regression, targets):
            """
            Arguments:
                anchors (list[BoxList])
                objectness (list[Tensor])
                box_regression (list[Tensor])
                targets (list[BoxList])
    
            Returns:
                objectness_loss (Tensor)
                box_loss (Tensor)
            """
            anchors = [cat_boxlist(anchors_per_image) for anchors_per_image in anchors]
            # match every generated anchor against the ground-truth boxes to
            # decide which anchors become targets and what their labels are
            labels, regression_targets = self.prepare_targets(anchors, targets)
            sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels)
            sampled_pos_inds = torch.nonzero(torch.cat(sampled_pos_inds, dim=0)).squeeze(1)
            sampled_neg_inds = torch.nonzero(torch.cat(sampled_neg_inds, dim=0)).squeeze(1)
    
            sampled_inds = torch.cat([sampled_pos_inds, sampled_neg_inds], dim=0)
    
            objectness, box_regression = \
                    concat_box_prediction_layers(objectness, box_regression)
    
            objectness = objectness.squeeze()
    
            labels = torch.cat(labels, dim=0)
            regression_targets = torch.cat(regression_targets, dim=0)
    
            box_loss = smooth_l1_loss(
                box_regression[sampled_pos_inds],
                regression_targets[sampled_pos_inds],
                beta=1.0 / 9,
                size_average=False,
            ) / (sampled_inds.numel())
    
            objectness_loss = F.binary_cross_entropy_with_logits(
                objectness[sampled_inds], labels[sampled_inds]
            )
    
            return objectness_loss, box_loss
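
    For reference, the smooth_l1_loss used above is the Huber-style loss; a sketch matching maskrcnn-benchmark's form (quadratic below beta, linear above, continuous at the joint; the RPN uses beta = 1/9):

    import torch

    def smooth_l1_loss(input, target, beta=1.0 / 9, size_average=True):
        # quadratic when |x| < beta, linear otherwise
        n = torch.abs(input - target)
        loss = torch.where(n < beta, 0.5 * n ** 2 / beta, n - 0.5 * beta)
        return loss.mean() if size_average else loss.sum()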
    
    

    The box_head part

    class CombinedROIHeads(torch.nn.ModuleDict):
        """
        Combines a set of individual heads (for box prediction or masks) into a single
        head.
        """
    
        def __init__(self, cfg, heads):
            super(CombinedROIHeads, self).__init__(heads)
            self.cfg = cfg.clone()
            if cfg.MODEL.MASK_ON and cfg.MODEL.ROI_MASK_HEAD.SHARE_BOX_FEATURE_EXTRACTOR:
                self.mask.feature_extractor = self.box.feature_extractor
            if cfg.MODEL.KEYPOINT_ON and cfg.MODEL.ROI_KEYPOINT_HEAD.SHARE_BOX_FEATURE_EXTRACTOR:
                self.keypoint.feature_extractor = self.box.feature_extractor
    
        def forward(self, features, proposals, targets=None):
            losses = {}
            # TODO rename x to roi_box_features, if it doesn't increase memory consumption
            # box here is the ROIBoxHead class below; its features input is the
            # FPN feature maps, and proposals is the RPN output (boxes that have
            # been decoded, NMS-ed, and stripped of low-scoring entries)
            x, detections, loss_box = self.box(features, proposals, targets)
            losses.update(loss_box)
            if self.cfg.MODEL.MASK_ON:
                mask_features = features
                # optimization: during training, if we share the feature extractor between
                # the box and the mask heads, then we can reuse the features already computed
                if (
                    self.training
                    and self.cfg.MODEL.ROI_MASK_HEAD.SHARE_BOX_FEATURE_EXTRACTOR
                ):
                    mask_features = x
                # During training, self.box() will return the unaltered proposals as "detections"
                # this makes the API consistent during training and testing
                x, detections, loss_mask = self.mask(mask_features, detections, targets)
                losses.update(loss_mask)
    
            if self.cfg.MODEL.KEYPOINT_ON:
                keypoint_features = features
                # optimization: during training, if we share the feature extractor between
                # the box and the mask heads, then we can reuse the features already computed
                if (
                    self.training
                    and self.cfg.MODEL.ROI_KEYPOINT_HEAD.SHARE_BOX_FEATURE_EXTRACTOR
                ):
                    keypoint_features = x
                # During training, self.box() will return the unaltered proposals as "detections"
                # this makes the API consistent during training and testing
                x, detections, loss_keypoint = self.keypoint(keypoint_features, detections, targets)
                losses.update(loss_keypoint)
            return x, detections, losses
    
    class ROIBoxHead(torch.nn.Module):
        """
        Generic Box Head class.
        """
    
        def __init__(self, cfg, in_channels):
            super(ROIBoxHead, self).__init__()
            self.feature_extractor = make_roi_box_feature_extractor(cfg, in_channels)
            self.predictor = make_roi_box_predictor(
                cfg, self.feature_extractor.out_channels)
            self.post_processor = make_roi_box_post_processor(cfg)
            self.loss_evaluator = make_roi_box_loss_evaluator(cfg)
    
        def forward(self, features, proposals, targets=None):
            """
            Arguments:
                features (list[Tensor]): feature-maps from possibly several levels
                proposals (list[BoxList]): proposal boxes
                targets (list[BoxList], optional): the ground-truth targets.
    
            Returns:
                x (Tensor): the result of the feature extractor
                proposals (list[BoxList]): during training, the subsampled proposals
                    are returned. During testing, the predicted boxlists are returned
                losses (dict[Tensor]): During training, returns the losses for the
                    head. During testing, returns an empty dict.
            """
    
            if self.training:
                # Faster R-CNN subsamples during training the proposals with a fixed
                # positive / negative ratio
                with torch.no_grad():
                    # subsample the proposals with a fixed positive/negative ratio
                    proposals = self.loss_evaluator.subsample(proposals, targets)
    
            # extract features that will be fed to the final classifier. The
            # feature_extractor generally corresponds to the pooler + heads
            x = self.feature_extractor(features, proposals)
            # final classifier that converts the features into predictions
            class_logits, box_regression = self.predictor(x)
    
            if not self.training:
                result = self.post_processor((class_logits, box_regression), proposals)
                return x, result, {}
    
            loss_classifier, loss_box_reg = self.loss_evaluator(
                [class_logits], [box_regression]
            )
            return (
                x,
                proposals,
                dict(loss_classifier=loss_classifier, loss_box_reg=loss_box_reg),
            )
    
    class FastRCNNLossComputation(object):
        """
        Computes the loss for Faster R-CNN.
        Also supports FPN
        """
    
        def __init__(
            self, 
            proposal_matcher, 
            fg_bg_sampler, 
            box_coder, 
            cls_agnostic_bbox_reg=False
        ):
            """
            Arguments:
                proposal_matcher (Matcher)
                fg_bg_sampler (BalancedPositiveNegativeSampler)
                box_coder (BoxCoder)
            """
            self.proposal_matcher = proposal_matcher
            self.fg_bg_sampler = fg_bg_sampler
            self.box_coder = box_coder
            self.cls_agnostic_bbox_reg = cls_agnostic_bbox_reg
    
        def match_targets_to_proposals(self, proposal, target):
            match_quality_matrix = boxlist_iou(target, proposal)
            # the foreground/background assignment here is done by the Matcher
            # class below
            matched_idxs = self.proposal_matcher(match_quality_matrix)
            # Fast RCNN only need "labels" field for selecting the targets
            target = target.copy_with_fields("labels")
            # get the targets corresponding GT for each proposal
            # NB: need to clamp the indices because we can have a single
            # GT in the image, and matched_idxs can be -2, which goes
            # out of bounds
            matched_targets = target[matched_idxs.clamp(min=0)]
            matched_targets.add_field("matched_idxs", matched_idxs)
            return matched_targets
    
        def prepare_targets(self, proposals, targets):
            labels = []
            regression_targets = []
            for proposals_per_image, targets_per_image in zip(proposals, targets):
                matched_targets = self.match_targets_to_proposals(
                    proposals_per_image, targets_per_image
                )
                matched_idxs = matched_targets.get_field("matched_idxs")
    
                labels_per_image = matched_targets.get_field("labels")
                labels_per_image = labels_per_image.to(dtype=torch.int64)
    
                # Label background (below the low threshold)
                # BELOW_LOW_THRESHOLD = -1, BETWEEN_THRESHOLDS = -2
                bg_inds = matched_idxs == Matcher.BELOW_LOW_THRESHOLD
                labels_per_image[bg_inds] = 0
    
                # Label ignore proposals (between low and high thresholds)
                ignore_inds = matched_idxs == Matcher.BETWEEN_THRESHOLDS
                labels_per_image[ignore_inds] = -1  # -1 is ignored by sampler
    
                # compute regression targets
                regression_targets_per_image = self.box_coder.encode(
                    matched_targets.bbox, proposals_per_image.bbox
                )
    
                labels.append(labels_per_image)
                regression_targets.append(regression_targets_per_image)
    
            return labels, regression_targets
    
        def subsample(self, proposals, targets):
            """
            This method performs the positive/negative sampling, and return
            the sampled proposals.
            Note: this function keeps a state.
    
            Arguments:
                proposals (list[BoxList])
                targets (list[BoxList])
            """
    
            # This step computes the IoU between the RPN proposals and the gt
            # boxes, splits the proposals into foreground and background, and
            # discards those that are neither (though the default fg and bg
            # thresholds are both 0.5, so in practice nothing is discarded)
            # The regression_targets obtained here have already been encoded,
            # and the positive samples' labels are taken straight from the gt;
            # see target = target.copy_with_fields("labels") in
            # match_targets_to_proposals
            labels, regression_targets = self.prepare_targets(proposals, targets)
            # fg_bg_sampler is the BalancedPositiveNegativeSampler class below;
            # it balances positives and negatives at a 1:3 ratio
            sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels)
    
            proposals = list(proposals)
            # add corresponding label and regression_targets information to the bounding boxes
            for labels_per_image, regression_targets_per_image, proposals_per_image in zip(
                labels, regression_targets, proposals
            ):
                proposals_per_image.add_field("labels", labels_per_image)
                proposals_per_image.add_field(
                    "regression_targets", regression_targets_per_image
                )
    
            # distributed sampled proposals, that were obtained on all feature maps
            # concatenated via the fg_bg_sampler, into individual feature map levels
            for img_idx, (pos_inds_img, neg_inds_img) in enumerate(
                zip(sampled_pos_inds, sampled_neg_inds)
            ):
                img_sampled_inds = torch.nonzero(pos_inds_img | neg_inds_img).squeeze(1)
                proposals_per_image = proposals[img_idx][img_sampled_inds]
                proposals[img_idx] = proposals_per_image
    
            self._proposals = proposals
            return proposals
    
        def __call__(self, class_logits, box_regression):
            """
            Computes the loss for Faster R-CNN.
            This requires that the subsample method has been called beforehand.
    
            Arguments:
                class_logits (list[Tensor])
                box_regression (list[Tensor])
    
            Returns:
                classification_loss (Tensor)
                box_loss (Tensor)
            """
    
            class_logits = cat(class_logits, dim=0)
            box_regression = cat(box_regression, dim=0)
            device = class_logits.device
    
            if not hasattr(self, "_proposals"):
                raise RuntimeError("subsample needs to be called before")
    
            proposals = self._proposals
    
            labels = cat([proposal.get_field("labels") for proposal in proposals], dim=0)
            regression_targets = cat(
                [proposal.get_field("regression_targets") for proposal in proposals], dim=0
            )
    
            classification_loss = F.cross_entropy(class_logits, labels)
    
            # get indices that correspond to the regression targets for
            # the corresponding ground truth labels, to be used with
            # advanced indexing
            sampled_pos_inds_subset = torch.nonzero(labels > 0).squeeze(1)
            labels_pos = labels[sampled_pos_inds_subset]
            if self.cls_agnostic_bbox_reg:
                map_inds = torch.tensor([4, 5, 6, 7], device=device)
            else:
                map_inds = 4 * labels_pos[:, None] + torch.tensor(
                    [0, 1, 2, 3], device=device)
    
            box_loss = smooth_l1_loss(
                box_regression[sampled_pos_inds_subset[:, None], map_inds],
                regression_targets[sampled_pos_inds_subset],
                size_average=False,
                beta=1,
            )
            box_loss = box_loss / labels.numel()
    
            return classification_loss, box_loss
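
    A worked example of the class-specific indexing above: box_regression has shape (N, num_classes * 4), and map_inds picks out the 4 regression columns belonging to each positive proposal's class.

    import torch

    labels_pos = torch.tensor([3, 1])  # two positives, of classes 3 and 1
    map_inds = 4 * labels_pos[:, None] + torch.tensor([0, 1, 2, 3])
    print(map_inds)
    # tensor([[12, 13, 14, 15],
    #         [ 4,  5,  6,  7]])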
    
    class Matcher(object):
        """
        This class assigns to each predicted "element" (e.g., a box) a ground-truth
        element. Each predicted element will have exactly zero or one matches; each
        ground-truth element may be assigned to zero or more predicted elements.
    
        Matching is based on the MxN match_quality_matrix, that characterizes how well
        each (ground-truth, predicted)-pair match. For example, if the elements are
        boxes, the matrix may contain box IoU overlap values.
    
        The matcher returns a tensor of size N containing the index of the ground-truth
        element m that matches to prediction n. If there is no match, a negative value
        is returned.
        """
    
        BELOW_LOW_THRESHOLD = -1
        BETWEEN_THRESHOLDS = -2
    
        def __init__(self, high_threshold, low_threshold, allow_low_quality_matches=False):
            """
            Args:
                high_threshold (float): quality values greater than or equal to
                    this value are candidate matches.
                low_threshold (float): a lower quality threshold used to stratify
                    matches into three levels:
                    1) matches >= high_threshold
                    2) BETWEEN_THRESHOLDS matches in [low_threshold, high_threshold)
                    3) BELOW_LOW_THRESHOLD matches in [0, low_threshold)
                allow_low_quality_matches (bool): if True, produce additional matches
                    for predictions that have only low-quality match candidates. See
                    set_low_quality_matches_ for more details.
            """
            assert low_threshold <= high_threshold
            self.high_threshold = high_threshold
            self.low_threshold = low_threshold
            self.allow_low_quality_matches = allow_low_quality_matches
    
        def __call__(self, match_quality_matrix):
            """
            Args:
                match_quality_matrix (Tensor[float]): an MxN tensor, containing the
                pairwise quality between M ground-truth elements and N predicted elements.
    
            Returns:
                matches (Tensor[int64]): an N tensor where N[i] is a matched gt in
                [0, M - 1] or a negative value indicating that prediction i could not
                be matched.
            """
            if match_quality_matrix.numel() == 0:
                # empty targets or proposals not supported during training
                if match_quality_matrix.shape[0] == 0:
                    raise ValueError(
                        "No ground-truth boxes available for one of the images "
                        "during training")
                else:
                    raise ValueError(
                        "No proposal boxes available for one of the images "
                        "during training")
    
            # match_quality_matrix is M (gt) x N (predicted)
            # Max over gt elements (dim 0) to find best gt candidate for each prediction
            matched_vals, matches = match_quality_matrix.max(dim=0)
            if self.allow_low_quality_matches:
                all_matches = matches.clone()
            # no extra filtering is applied to the positive matches here
            # Assign candidate matches with low quality to negative (unassigned) values
            below_low_threshold = matched_vals < self.low_threshold
            between_thresholds = (matched_vals >= self.low_threshold) & (
                matched_vals < self.high_threshold
            )
            matches[below_low_threshold] = Matcher.BELOW_LOW_THRESHOLD
            matches[between_thresholds] = Matcher.BETWEEN_THRESHOLDS
    
            if self.allow_low_quality_matches:
                self.set_low_quality_matches_(matches, all_matches, match_quality_matrix)
    
            return matches
    
        def set_low_quality_matches_(self, matches, all_matches, match_quality_matrix):
            """
            Produce additional matches for predictions that have only low-quality matches.
            Specifically, for each ground-truth find the set of predictions that have
            maximum overlap with it (including ties); for each prediction in that set, if
            it is unmatched, then match it to the ground-truth with which it has the highest
            quality value.
            """
            # For each gt, find the prediction with which it has highest quality
            highest_quality_foreach_gt, _ = match_quality_matrix.max(dim=1)
            # Find highest quality match available, even if it is low, including ties
            gt_pred_pairs_of_highest_quality = torch.nonzero(
                match_quality_matrix == highest_quality_foreach_gt[:, None]
            )
            # Example gt_pred_pairs_of_highest_quality:
            #   tensor([[    0, 39796],
            #           [    1, 32055],
            #           [    1, 32070],
            #           [    2, 39190],
            #           [    2, 40255],
            #           [    3, 40390],
            #           [    3, 41455],
            #           [    4, 45470],
            #           [    5, 45325],
            #           [    5, 46390]])
            # Each row is a (gt index, prediction index)
            # Note how gt items 1, 2, 3, and 5 each have two ties
    
            pred_inds_to_update = gt_pred_pairs_of_highest_quality[:, 1]
            matches[pred_inds_to_update] = all_matches[pred_inds_to_update]
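
    A hypothetical run of the Matcher with the RPN defaults (FG_IOU_THRESHOLD=0.7, BG_IOU_THRESHOLD=0.3, allow_low_quality_matches=True):

    import torch

    matcher = Matcher(0.7, 0.3, allow_low_quality_matches=True)
    iou = torch.tensor([[0.75, 0.20, 0.60],   # gt 0 vs predictions 0..2
                        [0.10, 0.40, 0.65]])  # gt 1 vs predictions 0..2
    print(matcher(iou))
    # tensor([ 0, -2,  1]): prediction 0 matches gt 0 (0.75 >= 0.7); prediction 1
    # falls between the thresholds; prediction 2 is restored to gt 1 because it
    # is gt 1's best match, even though 0.65 < 0.7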
    
    class BalancedPositiveNegativeSampler(object):
        """
        This class samples batches, ensuring that they contain a fixed proportion of positives
        """
        # positive_fraction = cfg.MODEL.ROI_HEADS.POSITIVE_FRACTION = 0.25,
        # which effectively sets the positive:negative ratio to 1:3
        def __init__(self, batch_size_per_image, positive_fraction):
            """
            Arguments:
                batch_size_per_image (int): number of elements to be selected per image
                positive_fraction (float): percentage of positive elements per batch
            """
            self.batch_size_per_image = batch_size_per_image
            self.positive_fraction = positive_fraction
    
        def __call__(self, matched_idxs):
            """
            Arguments:
                matched idxs: list of tensors containing -1, 0 or positive values.
                    Each tensor corresponds to a specific image.
                    -1 values are ignored, 0 are considered as negatives and > 0 as
                    positives.
    
            Returns:
                pos_idx (list[tensor])
                neg_idx (list[tensor])
    
            Returns two lists of binary masks for each image.
            The first list contains the positive elements that were selected,
            and the second list the negative examples that were selected.
            """
            pos_idx = []
            neg_idx = []
            for matched_idxs_per_image in matched_idxs:
                positive = torch.nonzero(matched_idxs_per_image >= 1).squeeze(1)
                negative = torch.nonzero(matched_idxs_per_image == 0).squeeze(1)
    
                num_pos = int(self.batch_size_per_image * self.positive_fraction)
                # protect against not enough positive examples
                num_pos = min(positive.numel(), num_pos)
                num_neg = self.batch_size_per_image - num_pos
                # protect against not enough negative examples
                num_neg = min(negative.numel(), num_neg)
    
                # randomly select positive and negative examples
                perm1 = torch.randperm(positive.numel(), device=positive.device)[:num_pos]
                perm2 = torch.randperm(negative.numel(), device=negative.device)[:num_neg]
    
                pos_idx_per_image = positive[perm1]
                neg_idx_per_image = negative[perm2]
    
                # create binary mask from indices
                pos_idx_per_image_mask = torch.zeros_like(
                    matched_idxs_per_image, dtype=torch.uint8
                )
                neg_idx_per_image_mask = torch.zeros_like(
                    matched_idxs_per_image, dtype=torch.uint8
                )
                pos_idx_per_image_mask[pos_idx_per_image] = 1
                neg_idx_per_image_mask[neg_idx_per_image] = 1
    
                pos_idx.append(pos_idx_per_image_mask)
                neg_idx.append(neg_idx_per_image_mask)
    
            return pos_idx, neg_idx
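
    A hypothetical usage with the ROI_HEADS defaults (BATCH_SIZE_PER_IMAGE=512, POSITIVE_FRACTION=0.25), so at most 128 positives are kept and negatives fill the remaining budget:

    import torch

    sampler = BalancedPositiveNegativeSampler(batch_size_per_image=512,
                                              positive_fraction=0.25)
    labels = [torch.tensor([2., 0., 0., 1., -1., 0.])]  # >0 fg, 0 bg, -1 ignored
    pos_masks, neg_masks = sampler(labels)
    print(pos_masks[0])  # tensor([1, 0, 0, 1, 0, 0], dtype=torch.uint8)
    print(neg_masks[0])  # tensor([0, 1, 1, 0, 0, 1], dtype=torch.uint8)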
    
