# 大创项目推荐 深度学习的口罩佩戴检测 - opencv 卷积神经网络 机器视觉 深度学习
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import math
import pandas
import json
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
from torch import nn
from matplotlib import pyplot
from collections import defaultdict
from collections import deque
import xml.etree.cElementTree as ET
# Size every image is scaled to, as (width, height).
IMAGE_SIZE = (256, 192)

# Locations of the datasets used for training.
DATASET_1_IMAGE_DIR = "./archive/images"
DATASET_1_ANNOTATION_DIR = "./archive/annotations"
DATASET_2_IMAGE_DIR = "./784145_1347673_bundle_archive/train/image_data"
DATASET_2_BOX_CSV_PATH = "./784145_1347673_bundle_archive/train/bbox_train.csv"

# Class names; the position in the list is the class index.
# YOLO does not strictly need an "other" class, but in practice adding it
# improved the accuracy of the label classification.
CLASSES = ["other", "with_mask", "without_mask"]
CLASSES_MAPPING = dict(zip(CLASSES, range(len(CLASSES))))

# Overlap (IOU) thresholds used to decide whether a region contains an object
# (the object centre is additionally required to lie inside the region).
IOU_POSITIVE_THRESHOLD = 0.30
IOU_NEGATIVE_THRESHOLD = 0.30

# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
class BasicBlock(nn.Module):
    """Basic residual block used by the ResNet feature extractor.

    The input passes through two 3x3 convolution + batch-norm stages and is
    then added to a shortcut of the input before the final ReLU.
    """

    # How many times wider the real output is than ``channels_out``;
    # this implementation always uses a factor of one.
    expansion = 1

    def __init__(self, channels_in, channels_out, stride):
        """Create the two convolution stages and the shortcut branch.

        Args:
            channels_in: number of input channels.
            channels_out: number of channels produced by both 3x3 convolutions.
            stride: stride of the first convolution and of the shortcut.
        """
        super().__init__()
        shortcut_channels = channels_out * self.expansion

        # First 3x3 convolution (padding 1, no bias as usual for ResNet).
        # With stride 1 the spatial size is kept, e.g. (32-3+2)//1+1 == 32;
        # with stride 2 it is halved, e.g. (32-3+2)//2+1 == 16.
        self.conv1 = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(channels_out),
        )

        # Second 3x3 convolution; it keeps both the spatial size and the channel count.
        self.conv2 = nn.Sequential(
            nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels_out),
        )

        # The shortcut must have the same shape as the conv2 output before the two
        # can be added. An empty Sequential passes the input through untouched;
        # otherwise a 1x1 convolution adapts stride and channel count.
        shapes_match = stride == 1 and channels_in == shortcut_channels
        if shapes_match:
            self.identity = nn.Sequential()
        else:
            self.identity = nn.Sequential(
                nn.Conv2d(channels_in, shortcut_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(shortcut_channels),
            )

    def forward(self, x):
        """Apply the block: x -> conv1 -> relu -> conv2 -> (+ shortcut of x) -> relu."""
        relu = nn.functional.relu
        out = self.conv2(relu(self.conv1(x), inplace=True))
        out += self.identity(x)
        return relu(out, inplace=True)
class MyModel(nn.Module):
    """YOLO detector (a ResNet-based variant)."""

    # List of anchor boxes (anchor points x shapes); None until it is filled in.
    Anchors = None
    # Anchor scales: each value is the distance between neighbouring anchor points.
    AnchorSpans = (16, 32, 64)
    # (width, height) multipliers applied to the span for every anchor shape.
    AnchorAspects = ((1, 1), (1.5, 1.5))
    # Values predicted per anchor box:
    # objectness (1) + box offsets (4) + one score per class.
    AnchorOutputs = 5 + len(CLASSES)
    # Values predicted per anchor point, i.e. for all of its shapes together.
    AnchorTotalOutputs = len(AnchorAspects) * AnchorOutputs
    # Minimum objectness score needed to treat a box as an object centre.
    ObjScoreThreshold = 0.9
    # Overlap ratio threshold used to decide whether overlapping regions are merged.
    IOUMergeThreshold = 0.3
def __init__(self):
    """Build the ResNet feature extractors and one detector head per anchor span."""
    super().__init__()

    # Running channel count: read and updated by every _make_layer call.
    # The stem convolution below produces this many channels.
    self.previous_channels_out = 4

    # --- Feature extraction -------------------------------------------------
    # There are three anchor spans, so three chained sub-models are kept and the
    # output of each one is the feature map for one span.
    stem = [
        nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
        nn.BatchNorm2d(self.previous_channels_out),
        nn.ReLU(inplace=True),
    ]
    first_stages = [
        self._make_layer(BasicBlock, channels_out=width, num_blocks=2, stride=step)
        for width, step in ((16, 1), (32, 2), (64, 2), (128, 2), (256, 2))
    ]
    first_extractor = nn.Sequential(*stem, *first_stages)
    second_extractor = self._make_layer(BasicBlock, channels_out=256, num_blocks=2, stride=2)
    third_extractor = self._make_layer(BasicBlock, channels_out=256, num_blocks=2, stride=2)
    self.resnet_models = nn.ModuleList([first_extractor, second_extractor, third_extractor])

    # --- Detection heads ----------------------------------------------------
    # Features of a larger span are merged into the features of the next smaller
    # span. Every head holds three sub-models:
    #   [0] computes the features used for merging,
    #   [1] enlarges those features,
    #   [2] computes the final prediction.
    def conv_relu(c_in, c_out, kernel):
        # "Same" padding for the 1x1 and 3x3 kernels used here.
        return [
            nn.Conv2d(c_in, c_out, kernel_size=kernel, stride=1, padding=kernel // 2, bias=True),
            nn.ReLU(inplace=True),
        ]

    heads = []
    for index in range(len(self.resnet_models)):
        # Every head but the first receives 256 extra channels from the merge.
        channels_in = 256 if index == 0 else 512
        merge_features = nn.Sequential(
            *conv_relu(channels_in, 256, 1),
            *conv_relu(256, 512, 3),
            *conv_relu(512, 256, 1),
        )
        enlarge = nn.Upsample(scale_factor=2, mode="nearest")
        predict = nn.Sequential(
            *conv_relu(256, 512, 3),
            *conv_relu(512, 256, 3),
            nn.Conv2d(256, MyModel.AnchorTotalOutputs, kernel_size=1, stride=1, padding=0, bias=True),
        )
        heads.append(nn.ModuleList([merge_features, enlarge, predict]))
    self.yolo_detectors = nn.ModuleList(heads)

    # Squashes objectness and class scores into the 0 ~ 1 range.
    self.sigmoid = nn.Sigmoid()
def _make_layer(self, block_type, channels_out, num_blocks, stride):
    """Create one ResNet stage made of consecutive blocks.

    Args:
        block_type: block class called as ``block_type(channels_in, channels_out,
            stride)``; its ``expansion`` attribute tells how many times wider the
            real output is than ``channels_out``.
        channels_out: base channel count requested for every block of the stage.
        num_blocks: number of blocks in the stage (at least one block is always
            created).
        stride: stride of the first block; the remaining blocks use stride 1 and
            therefore keep the spatial size.

    Returns:
        ``nn.Sequential`` holding the blocks. ``self.previous_channels_out`` is
        updated to the real output width ``channels_out * block_type.expansion``.
    """
    stage_width = channels_out * block_type.expansion
    # Only the first block adapts the incoming width and applies the stride.
    blocks = [block_type(self.previous_channels_out, channels_out, stride)]
    # Every following block receives the expanded width and is asked for the same
    # base width again, so the stage stays at ``stage_width``. (Passing the
    # expanded width as ``channels_out`` would multiply the channel count by
    # ``expansion`` on every block whenever ``expansion`` is not 1.)
    blocks.extend(block_type(stage_width, channels_out, 1) for _ in range(num_blocks - 1))
    self.previous_channels_out = stage_width
    return nn.Sequential(*blocks)
@staticmethod
def _generate_anchors():
    """Build the anchor box list from the anchor spans and anchor shapes.

    Returns:
        List of ``(x, y, w, h)`` integer tuples ordered by span, then by x, then
        by y, then by shape.
    """
    image_w, image_h = IMAGE_SIZE
    boxes = []
    for span in MyModel.AnchorSpans:
        half_span = span / 2
        cells = itertools.product(
            range(0, image_w, span), range(0, image_h, span), MyModel.AnchorAspects)
        for cell_x, cell_y, ratio in cells:
            box_w = span * ratio[0]
            box_h = span * ratio[1]
            # Centre the box on the cell centre, truncate to integers and keep the
            # top-left corner inside the image.
            left = max(int(cell_x + half_span - box_w / 2), 0)
            top = max(int(cell_y + half_span - box_h / 2), 0)
            # Limit the size so the box does not reach past the right/bottom edge.
            boxes.append((
                left,
                top,
                min(int(box_w), image_w - left),
                min(int(box_h), image_h - top),
            ))
    return boxes
def forward(self, x):
    """Predict objectness, box offsets and class scores for every anchor box.

    Args:
        x: input batch passed to the first model in ``self.resnet_models``.

    Returns:
        Tensor reshaped to ``(batch, boxes, MyModel.AnchorOutputs)``. Value 0
        (objectness) and values 5 onwards (class scores) have gone through
        sigmoid; values 1 to 4 (box offsets) are returned as produced.
    """
    # Extract the features for every anchor span, smallest span first.
    # Shapes noted by the original author for a batch of 16:
    #   [16, 256, 16, 12], [16, 256, 8, 6], [16, 256, 4, 3]
    features_list = []
    current = x
    for extractor in self.resnet_models:
        current = extractor(current)
        features_list.append(current)

    # Predict from the features, largest span first. Shapes noted by the
    # original author: [16, 16, 4, 3], [16, 16, 8, 6], [16, 16, 16, 12],
    # where 16 = (5 + 3 classes) * 2 shapes.
    last_index = len(features_list) - 1
    previous_upsampled_feature = None
    outputs = []
    for index, feature in enumerate(reversed(features_list)):
        detector = self.yolo_detectors[index]
        if previous_upsampled_feature is not None:
            # Merge the features of the larger span into those of the smaller span.
            feature = torch.cat((feature, previous_upsampled_feature), dim=1)
        # Features used for merging and for the prediction.
        hidden = detector[0](feature)
        # Final prediction of this span.
        outputs.append(detector[2](hidden))
        # Enlarge the features for the next (smaller) span only. Nothing consumes
        # the enlarged features of the last level, so that work is skipped.
        previous_upsampled_feature = detector[1](hidden) if index < last_index else None

    # Join all predictions; the order has to match the order of Anchors
    # (smallest span first).
    outputs_flatten = [
        output.permute(0, 2, 3, 1).reshape(output.shape[0], -1, MyModel.AnchorOutputs)
        for output in reversed(outputs)
    ]
    outputs_all = torch.cat(outputs_flatten, dim=1)
    # Objectness has to be within 0 ~ 1.
    outputs_all[:, :, :1] = self.sigmoid(outputs_all[:, :, :1])
    # Class scores have to be within 0 ~ 1.
    outputs_all[:, :, 5:] = self.sigmoid(outputs_all[:, :, 5:])
    return outputs_all
@staticmethod
def loss_function(predicted, actual):
    """Multi-task loss used by YOLO: objectness + box offsets + class labels.

    Args:
        predicted: model output indexed as ``[sample, box, value]`` where value 0
            is the objectness, 1:5 the box offsets and 5: the class scores
            (class scores must already be within 0 ~ 1).
        actual: tuple ``(result_tensor, result_isobject_masks,
            result_nonobject_masks)``; the tensor has the layout of ``predicted``
            and the masks hold, per sample, the indices of the positive and
            negative boxes.

    Returns:
        Scalar tensor: mean objectness loss + mean offsets loss + mean labels loss.
        A sample contributes nothing for a mask that is empty, and a loss term
        without any contribution counts as zero (the mean over an empty selection
        would be NaN and poison the whole loss).
    """
    result_tensor, result_isobject_masks, result_nonobject_masks = actual
    mse = nn.functional.mse_loss
    bce = nn.functional.binary_cross_entropy
    objectness_losses = []
    offsets_losses = []
    labels_losses = []
    for x in range(result_tensor.shape[0]):
        mask_positive = result_isobject_masks[x]
        mask_negative = result_nonobject_masks[x]
        if len(mask_positive) > 0:
            # Objectness and labels of the positive samples.
            objectness_losses.append(
                mse(predicted[x, mask_positive, 0], result_tensor[x, mask_positive, 0]))
            labels_losses.append(
                bce(predicted[x, mask_positive, 5:], result_tensor[x, mask_positive, 5:]))
            # Box offsets are only meaningful for positive samples.
            offsets_losses.append(
                mse(predicted[x, mask_positive, 1:5], result_tensor[x, mask_positive, 1:5]))
        if len(mask_negative) > 0:
            # Most boxes contain no object centre, so the negative samples are
            # weighted down to reduce their influence on the parameters.
            objectness_losses.append(
                mse(predicted[x, mask_negative, 0], result_tensor[x, mask_negative, 0]) * 0.5)
            labels_losses.append(
                bce(predicted[x, mask_negative, 5:], result_tensor[x, mask_negative, 5:]) * 0.5)

    def mean_or_zero(losses):
        # torch.stack rejects an empty list, so an empty term becomes a zero scalar.
        if not losses:
            return predicted.new_zeros(())
        return torch.mean(torch.stack(losses))

    loss = (
        mean_or_zero(objectness_losses) +
        mean_or_zero(offsets_losses) +
        mean_or_zero(labels_losses))
    return loss
@staticmethod
def calc_accuracy(actual, predicted):
    """Accuracy of the objectness and of the label classification.

    The box offsets are not evaluated.

    Args:
        actual: tuple ``(result_tensor, result_isobject_masks,
            result_nonobject_masks)``; the masks hold, per sample, the lists of
            positive and negative box indices.
        predicted: model output with the same layout as ``result_tensor``.

    Returns:
        ``(obj_acc, cls_acc)``. ``cls_acc`` is 0.0 when no box is selected by any
        mask (instead of raising ZeroDivisionError).
    """
    result_tensor, result_isobject_masks, result_nonobject_masks = actual
    # Objectness accuracy: positive and negative samples are scored separately
    # and then averaged; the small constant avoids a division by zero.
    a = result_tensor[:, :, 0]
    p = predicted[:, :, 0] > MyModel.ObjScoreThreshold
    is_object = a == 1
    non_object = a == 0
    obj_acc_positive = (is_object & p).sum().item() / (is_object.sum().item() + 0.00001)
    obj_acc_negative = (non_object & ~p).sum().item() / (non_object.sum().item() + 0.00001)
    obj_acc = (obj_acc_positive + obj_acc_negative) / 2
    # Label accuracy over every box selected by either mask.
    cls_total = 0
    cls_correct = 0
    for x in range(result_tensor.shape[0]):
        mask = sorted(result_isobject_masks[x] + result_nonobject_masks[x])
        if not mask:
            # Nothing selected for this sample.
            continue
        actual_classes = result_tensor[x, mask, 5:].max(dim=1).indices
        predicted_classes = predicted[x, mask, 5:].max(dim=1).indices
        cls_total += len(mask)
        cls_correct += (actual_classes == predicted_classes).sum().item()
    cls_acc = cls_correct / cls_total if cls_total else 0.0
    return obj_acc, cls_acc
@staticmethod
def convert_predicted_result(predicted):
"""转换预测结果到 (标签, 区域, 对象中心分数, 标签识别分数) 的列表,重叠区域使用 NMS 算法合并"""
# 记录重叠的结果区域, 结果是 [ [(标签, 区域, RPN 分数, 标签识别分数)], ... ]
final_result = []
for anchor, tensor in zip(MyModel.Anchors, predicted):
obj_score = tensor[0].item()
if obj_score <= MyModel.ObjScoreThreshold:
# 要求对象中心分数超过一定值
continue
offset = tensor[1:5].tolist()
offset[0] = max(