目录
前言
1.实现效果
2.关键点分类网络
3.KPNet训练测试数据准备
4.训练结果
4.1训练过程可视化
4.2验证集上的混淆矩阵
4.测试结果
4.1不同规模模型的测试结果对比
4.2分类结果投影到第一象限
4.3测试集上的混淆矩阵
4.4 二义性手势结果
4.5视频实测
5.零样本的任意手势识别
5.1任意手势关键点获取
5.2任意手势特征编码
6.训练和测试关键代码
6.1dataset.py
6.2dataloader.py
6.3engine.py
6.4train.py
6.5test.py
先使用YOLOv8检测手部区域,然后使用YOLOv8-pose对放大的手部区域检测关键点,最后使用PointNet分类关键点,可以实现对任意手势的高精度实时识别。
对于非遮挡手势,仅需1W个参数,即可实现98%的准确率,极限情况下,仅需400个参数,可以达到80%的准确率。
手部关键点数据集准备:基于YOLOv8-pose的手部关键点检测(1)- 手部关键点数据集获取(数据集下载、数据清洗、处理与增强)
手部关键点检测模型训练:基于YOLOv8-pose的手部关键点检测(2)- 模型训练、结果分析和超参数优化
实现手部关键点实时检测:
基于YOLOv8-pose的手部关键点检测(3)- 实现实时手部关键点检测
hand使用yolov8-m检测得到,resnt表示ResNet18的分类结果,shfnt表示用shufflenet_v2的分类结果,kpnet表示使用关键点分类网络的分类结果,conf是置信度。
类别效果如下,将原始的18个类别映射为以下的14个类别:
mapping_dict = {'call': 0, 'dislike': 1, 'fist': 2, 'four': 3, 'like': 4, 'mute': 5, 'ok': 6,
'one': 5, 'palm': 7, 'peace': 8, 'peace_inverted': 8, 'rock': 9, 'stop': 10,
'stop_inverted': 10, 'three': 11, 'three2': 12, 'two_up': 13, 'two_up_inverted': 13}
论文地址:PointNet: Deep Learning on Point Sets for 3D Classification and Segmentation
项目地址:(tensorflow) https://github.com/charlesq34/pointnet
(pytorch) https://github.com/yanx27/Pointnet_Pointnet2_pytorch
PointNet主要用于3D点云分类,这里将手部关键点看做3D点到2D平面的投影。如果有深度估计(例如mediapipe)可以取得更好的效果,可以更准确识别正反面手、左右手。
mlp1_layers和mlp2_layers分别表示编码层和解码层全连接的节点数,调整节点数,可以获得不同规模大小的模型。最小的模型nn仅需[8, 8, 8]和[8, 8, 8],48个神经元,即可实现80%的分类准确率。将PointNet简化为关键点(KeyPoint)分类网络(Net)KPNet:
import torch
import torch.nn as nn
class KPNet(nn.Module):
def __init__(self, num_classes, dropout_rate=0.3):
super(KPNet, self).__init__()
# shared-MLP1 in encode layers
# mlp1_layers = [2, 64, 128, 1024] # X
# mlp2_layers = [1024, 512, 256, 128, num_classes] # X
# mlp1_layers = [2, 64, 128, 512] # L
# mlp2_layers = [512, 256, 128, num_classes] # L
mlp1_layers = [2, 64, 128, 256] # M
mlp2_layers = [256, 128, 64, num_classes] # M
# mlp1_layers = [2, 32, 64, 128] # S
# mlp2_layers = [128, 64, 32, num_classes] # S
# mlp1_layers = [2, 32, 32, 64] # n
# mlp2_layers = [64, 32, 32, num_classes] # n
# mlp1_layers = [2, 8, 8, 8] # nn
# mlp2_layers = [8, 8, 8, num_classes] # nn
# mlp1_layers = [2, 64, 128, 512] # visual
# mlp2_layers = [512, 256, 128, 2, num_classes] # visual
self.mlp1 = nn.ModuleList()
self.mlp2 = nn.ModuleList()
# MLP1 layers (Conv1d + BatchNorm1d + ReLU)
for i in range(len(mlp1_layers) - 1):
self.mlp1.append(nn.Conv1d(mlp1_layers[i], mlp1_layers[i + 1], 1))
self.mlp1.append(nn.BatchNorm1d(mlp1_layers[i + 1]))
self.mlp1.append(nn.ReLU())
# MLP2 layers (Linear + BatchNorm1d + ReLU)
for i in range(len(mlp2_layers) - 2): # Exclude last layer for linear
self.mlp2.append(nn.Linear(mlp2_layers[i], mlp2_layers[i + 1]))
self.mlp2.append(nn.BatchNorm1d(mlp2_layers[i + 1]))
self.mlp2.append(nn.ReLU())
if i >= 1: # Apply dropout after the third linear layer
self.mlp2.append(nn.Dropout(p=dropout_rate))
# Final layer without ReLU, dropout, or batch normalization
self.mlp2.append(nn.Linear(mlp2_layers[-2], mlp2_layers[-1]))
def forward(self, x):
# MLP1
x = x.transpose(2, 1) # (B, 2, N)
for layer in self.mlp1:
x = layer(x)
x = torch.max(x, 2)[0] # (B, 1024) global feature
# MLP2
# feat = None
for i, layer in enumerate(self.mlp2):
x = layer(x)
# if x.shape[1] == 2:
# feat = x
return x #, feat
# 测试 KPNet
if __name__ == "__main__":
B, N, C = 32, 100, 2 # Batch size = 32, 100 points, each with 2 dimensions
num_classes = 10
model = KPNet(num_classes)
x = torch.randn(B, N, C) # Random input
output = model(x)
print("Output shape:", output.shape) # Expected output shape: (32, 10)
将手部patch的关键点坐标归一化得到。patch如下图:
将每个类别坐标统一保存为txt文件:
每行保存一个patch的关键点信息:
对一行关键点进行可视化,外观特征较为明显:
关键点相对于点云更容易训练:点云需要随机采样(满足平移、旋转和置换不变性),而关键点的输入顺序是固定的,方向也是可以固定的。(根据需要,训练时可以加入旋转)。
可以看到约40轮就收敛了,每轮训练约13秒(2W条关键点),大概10分钟就能训练完。
主要错误:将three2错误预测为two(16个),将palm错误预测为stop(8个),将two错误预测为two_up。这也是符合预期的,这几类手势本身相似,会很容易受视角影响。其余每类准确率都在99%以上。
X号对应于原PointNet的网络设计,nn号为每层最小神经元尝试,总共参数仅400多个,就可以达到80+%的分类准确率:
模型 型号 | size (KB) | param | instance (test) | P (test) | R (test) | mAP (50:95) | Loss (test) | dropout |
nn | 14 | 438 | 122,720 | 0.8614 | 0.8258 | 0.8474 | 0.4693 | 0 |
n | 43 | 7,862 | 122,720 | 0.9804 | 0.9761 | 0.9784 | 0.0997 | 0.3 |
S | 103 | 18,722 | 122,720 | 0.9842 | 0.9803 | 0.9826 | 0.0912 | 0.3 |
M | 357 | 40,834 | 122,720 | 0.9865 | 0.9840 | 0.9860 | 0.0762 | 0.3 |
L | 992 | 224,218 | 122,720 | 0.9868 | 0.9836 | 0.9863 | 0.0731 | 0.3 |
X | 3,400 | 1,701,514 | 122,720 | 0.9848 | 0.9848 | 0.9863 | 0.0737 | 0.3 |
将特征映射为2为特征,在第一象限进行投影,可以看到14个类别被有效分开。不过,由于将负值强行映射到第一象限,导致原点处存在聚集(这也是为什么,分类网络的全连接层最后一层,不要加Relu的原因):
测试集上的效果与验证集上类似:
归一化的混淆矩阵如下图所示,绝大部分手势准确率都在99%以上,fist只用200个训练,导致准确率最低:
如下图,存在二义性的手势,由于光线等问题,分类网络预测为stop和three,但是利用关键点可以预测为four:
使用分类网络可以区分正反面,可以学习到旋转等特征,比如call都是写着的,横着时候resnet依然可以识别出,但关键点分类无法识别。(因为训练时,没有加入旋转,这样关键点分类可以识别更多的手势语义。)
方式1:
由于我们已经知道了标准手势,我们不需要在获取图片后,再提取关键点。我们可以自己在白板上直接画几个点表示关键点,然后加入随机抖动(限制一定范围内的)产生大量的手势关键点。
方式2:
我们已经训练好了YOLOv8-pose的手部关键点检测网络,我们只需自己用电脑摄像头,调整远近、角度、视角等,即可自动标注获取大量的标准手势。(如果是分类网络,需要不同背景、手部样式,关键点则不需要考虑这些)。
在训练完网络后,我们在前向推理中,获取分类结果前一层的特征,用于特征编码。给定一种标准手势,获取其特征编码值(该类手势可以获取几百上千个,然后进行特征聚类,获取更一般的特征);然后对于要判定的手势,计算其特征向量和标准手势编码特征向量的预先相似度。
class KPNet(nn.Module):
def forward(self, x):
# MLP1
x = x.transpose(2, 1) # (B, 2, N)
for layer in self.mlp1:
x = layer(x)
x = torch.max(x, 2)[0] # (B, 1024) global feature
# MLP2
feat = None
for i, layer in enumerate(self.mlp2):
x = layer(x)
if i == len(layer) - 2:
feat = x
return x, feat
import os
import numpy as np
from torch.utils.data import Dataset
class KPNetDataset(Dataset):
def __init__(self, data_dir, mapping_dict, reshape_dim=2, transform=None):
"""
:param data_dir: 数据文件夹的根目录
:param mapping_dict: 类别映射字典,key 为文件名,value 为类别值
:param reshape_dim: 重新调整数据形状的维度,默认是 2,即将一行的数据 reshape 为 [-1, 2]
:param transform: 数据增强的百分比,0~10%之间,None表示不做增强
"""
self.data_dir = data_dir
self.mapping_dict = mapping_dict
self.reshape_dim = reshape_dim
self.transform = transform
self.file_list = []
self.labels = []
# 构建 file_list 和 labels 列表
self._prepare_file_index()
def _prepare_file_index(self):
"""
构建 file_list 和 labels 列表,存储每一行数据的文件路径及其标签。
"""
for file_name, label in self.mapping_dict.items():
file_path = os.path.join(self.data_dir, file_name + '.txt')
if not os.path.exists(file_path):
raise FileNotFoundError(f"File {file_path} does not exist.")
with open(file_path, 'r') as file:
lines = file.readlines()
for _ in lines:
self.file_list.append(file_path)
self.labels.append(label)
def __len__(self):
return len(self.file_list)
def __getitem__(self, idx):
file_path = self.file_list[idx]
label = self.labels[idx]
# 读取指定文件的对应行数据
with open(file_path, 'r') as file:
lines = file.readlines()
line = lines[idx - self.file_list.index(file_path)].strip()
points = np.array(list(map(float, line.split()))).reshape(-1, self.reshape_dim)
# 数据增强
if self.transform:
points = self._apply_transform(points)
return points, label
def _apply_transform(self, points):
"""
应用随机抖动数据增强,并将超出范围的点置为(0, 0)。
:param points: 关键点数组,形状为 [n, 2]
:return: 增强后的关键点数组
"""
jitter = np.random.uniform(-self.transform, self.transform, points.shape)
points += jitter
# 找到超出 [0, 1] 范围的点,并将其置为 (0, 0)
mask = (points < 0) | (points > 1)
points[np.any(mask, axis=1)] = [0, 0]
return points
# 主程序
if __name__ == "__main__":
import cv2
data_dir = r'./datasets/hagrid/yolo_pose_point/val'
mapping_dict = {
'call': 0,
}
# 初始化数据集
dataset = KPNetDataset(data_dir, mapping_dict, reshape_dim=2, transform=0.02)
# 打印数据集的大小
print(f'Total items in dataset: {len(dataset)}')
# 测试读取前 5 条数据
for i in range(min(5, len(dataset))):
data, label = dataset[i]
print(f'Item {i}: Data shape: {data.shape}, Label: {label}')
# print(data.tolist())
# 创建白色背景的图像
canvas_size = 224
img = np.ones((canvas_size, canvas_size, 3), dtype=np.uint8) * 255
# 将归一化坐标转换为画布上的坐标,并绘制蓝色点
for point in data:
x, y = point
x = int(x * canvas_size)
y = int(y * canvas_size)
cv2.circle(img, (x, y), radius=3, color=(255, 0, 0), thickness=-1) # 蓝色点
# 显示图像
cv2.imshow(f'Item {i}', img)
cv2.waitKey(0) # 按任意键继续
cv2.destroyAllWindows()
import torch
from torch.utils.data import DataLoader
from dataset import KPNetDataset
def create_dataloader(data_dir, mapping_dict, phase='train', reshape_dim=2, batch_size=32, num_workers=4,
transform=None):
"""
创建并返回一个 DataLoader。
:param data_dir: 数据根目录。
:param mapping_dict: 类别映射字典,key 为文件名,value 为类别值。
:param phase: 当前数据集的阶段(train, val, test)。
:param reshape_dim: 将数据 reshape 为 [n, reshape_dim] 的维度。
:param batch_size: 批大小。
:param num_workers: 用于数据加载的子进程数。
:param transform: 数据增强的百分比,0~10%之间,None表示不做增强。
:return: 返回 DataLoader。
"""
shuffle = True if phase == 'train' else False
transform = transform if phase == 'train' else None
dataset = KPNetDataset(data_dir, mapping_dict, reshape_dim, transform)
# 使用默认的collate_fn
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)
return dataloader
# 主程序(用于测试)
if __name__ == "__main__":
data_dir = r'./datasets/hagrid/yolo_pose_point/val'
mapping_dict = {
'call': 0,
# 可以添加更多映射
}
# 创建训练集 DataLoader(应用 2% 的数据增强)
train_dataloader = create_dataloader(data_dir, mapping_dict, phase='train', transform=0.02, batch_size=4)
# 测试读取训练集数据
for i, (data, label) in enumerate(train_dataloader):
print(f'Train Batch {i}:')
print(f'Data shape: {data.shape}, Labels: {label.shape}')
if i == 2: # 仅测试前三个批次
break
# 创建验证集 DataLoader(不做数据增强)
val_dataloader = create_dataloader(data_dir, mapping_dict, phase='val', batch_size=4)
# 测试读取验证集数据
for i, (data, label) in enumerate(val_dataloader):
print(f'Validation Batch {i}:')
print(f'Data shape: {data.shape}, Labels: {label.shape}')
if i == 2: # 仅测试前三个批次
break
import torch
import torch.nn as nn
from sklearn.metrics import classification_report
from tqdm import tqdm
def train_one_epoch(model, dataloader, optimizer, criterion, device, label_dict):
model.train()
running_loss = 0.0
correct_predictions = 0
total_samples = 0
all_labels = []
all_predictions = []
for data_batches, label_batches in tqdm(dataloader, desc="Training", unit="batch"):
data_batches = data_batches.to(device).float()
label_batches = label_batches.to(device)
optimizer.zero_grad()
outputs = model(data_batches)
loss = criterion(outputs, label_batches)
loss.backward()
optimizer.step()
running_loss += loss.item() * data_batches.size(0)
_, predicted = torch.max(outputs, 1)
correct_predictions += (predicted == label_batches).sum().item()
total_samples += label_batches.size(0)
all_labels.extend(label_batches.cpu().numpy())
all_predictions.extend(predicted.cpu().numpy())
epoch_loss = running_loss / total_samples
epoch_accuracy = correct_predictions / total_samples
# 将标签索引映射回 label_dict 中的标签名称
target_names = [label_dict[i] for i in sorted(label_dict.keys())]
classification_metrics = classification_report(all_labels, all_predictions, target_names=target_names,
output_dict=True, zero_division=0, digits=3)
overall_recall = classification_metrics["macro avg"]["recall"]
return epoch_loss, epoch_accuracy, overall_recall, classification_metrics
def test_one_epoch(model, dataloader, criterion, device, label_dict):
model.eval()
running_loss = 0.0
correct_predictions = 0
total_samples = 0
all_labels = []
all_predictions = []
with torch.no_grad():
for data_batches, label_batches in tqdm(dataloader, desc="Validation", unit="batch"):
data_batches = data_batches.to(device).float()
label_batches = label_batches.to(device)
outputs = model(data_batches)
loss = criterion(outputs, label_batches)
running_loss += loss.item() * data_batches.size(0)
_, predicted = torch.max(outputs, 1)
correct_predictions += (predicted == label_batches).sum().item()
total_samples += label_batches.size(0)
all_labels.extend(label_batches.cpu().numpy())
all_predictions.extend(predicted.cpu().numpy())
epoch_loss = running_loss / total_samples
epoch_accuracy = correct_predictions / total_samples
# 将标签索引映射回 label_dict 中的标签名称
target_names = [label_dict[i] for i in sorted(label_dict.keys())]
classification_metrics = classification_report(all_labels, all_predictions, target_names=target_names,
output_dict=True, zero_division=0, digits=3)
overall_recall = classification_metrics["macro avg"]["recall"]
return epoch_loss, epoch_accuracy, overall_recall, classification_metrics
if __name__ == "__main__":
from KPNet import KPNet
from dataloader import create_dataloader
train_data_dir = r'./datasets/hagrid/yolo_pose_point/test'
val_data_dir = r'./datasets/hagrid/yolo_pose_point/val'
mapping_dict = {'call': 0, 'three': 1, 'palm': 2}
num_classes = 3
batch_size = 1024
num_workers = 1
learning_rate = 0.001
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 创建模型、损失函数、优化器
model = KPNet(num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# 创建数据加载器
train_dataloader = create_dataloader(train_data_dir, mapping_dict, phase='train', batch_size=batch_size,
num_workers=num_workers, transform=0.02)
val_dataloader = create_dataloader(val_data_dir, mapping_dict, phase='val', batch_size=batch_size,
num_workers=num_workers)
# 训练一轮
train_loss, train_accuracy, train_recall, train_metrics = train_one_epoch(model, train_dataloader, optimizer,
criterion, device, mapping_dict)
# 测试一轮
val_loss, val_accuracy, val_recall, val_metrics = test_one_epoch(model, val_dataloader, criterion, device,
mapping_dict)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ExponentialLR
import os
import logging
import time
from KPNet import KPNet
from dataloader import create_dataloader
from engine import train_one_epoch, test_one_epoch
from torch.utils.tensorboard import SummaryWriter
from tabulate import tabulate
import matplotlib.pyplot as plt
def train_pipline(train_data_dir, val_data_dir, mapping_dict, label_dict, num_classes, batch_size=256, num_workers=4,
initial_lr=0.01, num_epochs=100, min_lr=0.00005, optimizer_choice='adam'):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 创建模型、损失函数、优化器
model = KPNet(num_classes).to(device)
criterion = nn.CrossEntropyLoss()
if optimizer_choice.lower() == 'adam':
optimizer = optim.Adam(model.parameters(), lr=initial_lr)
else:
optimizer = optim.SGD(model.parameters(), lr=initial_lr, momentum=0.9)
# 指数衰减的学习率调度器
scheduler = ExponentialLR(optimizer, gamma=0.95)
# 创建数据加载器
train_dataloader = create_dataloader(train_data_dir, mapping_dict, phase='train', batch_size=batch_size,
num_workers=num_workers, transform=0.01)
val_dataloader = create_dataloader(val_data_dir, mapping_dict, phase='val', batch_size=batch_size // 4,
num_workers=num_workers)
# 日志和模型保存配置
timestamp = time.strftime("%Y%m%d-%H%M%S")
model_save_dir = os.path.join('model_save', timestamp)
os.makedirs(model_save_dir, exist_ok=True)
log_dir = os.path.join('run_log', timestamp)
os.makedirs(log_dir, exist_ok=True)
log_filename = os.path.join(log_dir, 'training.log')
logging.basicConfig(filename=log_filename, level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# 创建参数日志
args_log_filename = os.path.join(log_dir, 'args.log')
with open(args_log_filename, 'w') as f:
f.write(f"train_data_dir: {train_data_dir}\n")
f.write(f"val_data_dir: {val_data_dir}\n")
f.write(f"mapping_dict: {mapping_dict}\n")
f.write(f"label_dict: {label_dict}\n")
f.write(f"num_classes: {num_classes}\n")
f.write(f"batch_size: {batch_size}\n")
f.write(f"num_workers: {num_workers}\n")
f.write(f"initial_lr: {initial_lr}\n")
f.write(f"num_epochs: {num_epochs}\n")
f.write(f"min_lr: {min_lr}\n")
f.write(f"optimizer_choice: {optimizer_choice}\n")
# 创建 TensorBoard writer
writer = SummaryWriter(log_dir=log_dir)
best_accuracy = 0.0
best_recall = 0.0
for epoch in range(num_epochs):
train_loss, train_accuracy, train_recall, train_metrics = train_one_epoch(model, train_dataloader, optimizer,
criterion, device, label_dict)
val_loss, val_accuracy, val_recall, val_metrics = test_one_epoch(model, val_dataloader, criterion, device,
label_dict)
# 学习率调度
scheduler.step()
current_lr = scheduler.get_last_lr()[0]
if current_lr < min_lr:
for param_group in optimizer.param_groups:
param_group['lr'] = min_lr
# TensorBoard记录
writer.add_scalar('Loss/train', train_loss, epoch)
writer.add_scalar('Loss/val', val_loss, epoch)
writer.add_scalar('Accuracy/train', train_accuracy, epoch)
writer.add_scalar('Accuracy/val', val_accuracy, epoch)
writer.add_scalar('Recall/train', train_recall, epoch)
writer.add_scalar('Recall/val', val_recall, epoch)
# 每个类别的准确率和召回率记录在 TensorBoard 中
for category_index, category_name in label_dict.items():
writer.add_scalar(f'Accuracy/train_{category_name}', train_metrics[category_name]['precision'], epoch)
writer.add_scalar(f'Recall/train_{category_name}', train_metrics[category_name]['recall'], epoch)
writer.add_scalar(f'Accuracy/val_{category_name}', val_metrics[category_name]['precision'], epoch)
writer.add_scalar(f'Recall/val_{category_name}', val_metrics[category_name]['recall'], epoch)
# 日志记录
logging.info(f'Epoch {epoch + 1}/{num_epochs}, '
f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train Recall: {train_recall:.4f}, '
f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}, Validation Recall: {val_recall:.4f}, '
f'Learning Rate: {current_lr:.6f}')
print(f'Epoch {epoch + 1}/{num_epochs}, '
f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train Recall: {train_recall:.4f}, '
f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}, Validation Recall: {val_recall:.4f}, '
f'Learning Rate: {current_lr:.6f}')
# 保存最佳模型:先比准确率,再比召回率
if val_accuracy > best_accuracy or (val_accuracy == best_accuracy and val_recall > best_recall):
best_accuracy = val_accuracy
best_recall = val_recall
best_model_wts = model.state_dict()
best_model_path = os.path.join(model_save_dir, 'best_model.pth')
torch.save(best_model_wts, best_model_path)
logging.info(f'Best model saved with accuracy: {best_accuracy:.4f} and recall: {best_recall:.4f}')
print(f'Best model saved with accuracy: {best_accuracy:.4f} and recall: {best_recall:.4f}')
time.sleep(0.3) # 防止 tqdm 输出错位
# 保存最后一轮模型
last_model_path = os.path.join(model_save_dir, 'last_model.pth')
torch.save(model.state_dict(), last_model_path)
logging.info('Last model saved.')
print('Last model saved.')
# 关闭 TensorBoard writer
writer.close()
# 加载并验证最佳模型
model.load_state_dict(torch.load(best_model_path))
val_loss, val_accuracy, val_recall, val_metrics = test_one_epoch(model, val_dataloader, criterion, device,
label_dict)
print(
f'Best Model Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}, Validation Recall: {val_recall:.4f}')
print(f"Best Model Validation Metrics:\n")
# 在命令行中以表格形式显示验证指标
print_metrics_table(val_metrics, label_dict.values())
# 将验证指标保存到日志文件
val_best_model_log_filename = os.path.join(log_dir, 'val_best_model.log')
with open(val_best_model_log_filename, 'w') as f:
f.write(f'Best Model Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}, Validation Recall: {val_recall:.4f}\n')
f.write(f"Best Model Validation Metrics:\n")
log_content = tabulate([
[category,
f"{metrics['precision']:.3f}" if isinstance(metrics, dict) else 'N/A',
f"{metrics['recall']:.3f}" if isinstance(metrics, dict) else 'N/A',
f"{metrics['f1-score']:.3f}" if isinstance(metrics, dict) else 'N/A']
for category, metrics in val_metrics.items()
], headers=['Category', 'Precision', 'Recall', 'F1-Score'], tablefmt='grid')
f.write(log_content) # 写入到日志文件
logging.info("\n" + log_content) # 记录到日志
# 绘制并保存训练过程的图像
plot_and_save_separate_graphs(log_dir, num_epochs)
def print_metrics_table(metrics, class_names):
"""
在命令行中以表格形式打印验证集的分类指标
"""
table = []
for category in class_names:
precision = metrics[category]['precision']
recall = metrics[category]['recall']
f1_score = metrics[category]['f1-score']
table.append([category, f"{precision:.3f}", f"{recall:.3f}", f"{f1_score:.3f}"])
# 打印表格
print(tabulate(table, headers=['Category', 'Precision', 'Recall', 'F1-Score'], tablefmt='grid'))
def plot_and_save_separate_graphs(log_dir, num_epochs):
"""
分别绘制并保存总损失和其他指标(准确率、召回率)的图像
"""
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
# 加载 TensorBoard 日志
event_acc = EventAccumulator(log_dir)
event_acc.Reload()
steps = range(num_epochs)
# 总损失曲线
train_loss = [scalar_event.value for scalar_event in event_acc.Scalars('Loss/train')]
val_loss = [scalar_event.value for scalar_event in event_acc.Scalars('Loss/val')]
plt.figure(figsize=(10, 6))
plt.plot(steps, train_loss, label='Train Loss')
plt.plot(steps, val_loss, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Train and Validation Loss Over Epochs')
plt.legend()
plt.savefig(os.path.join(log_dir, 'loss_curve.png'))
# 总准确率和召回率曲线
train_accuracy = [scalar_event.value for scalar_event in event_acc.Scalars('Accuracy/train')]
val_accuracy = [scalar_event.value for scalar_event in event_acc.Scalars('Accuracy/val')]
train_recall = [scalar_event.value for scalar_event in event_acc.Scalars('Recall/train')]
val_recall = [scalar_event.value for scalar_event in event_acc.Scalars('Recall/val')]
plt.figure(figsize=(10, 6))
plt.plot(steps, train_accuracy, label='Train Accuracy')
plt.plot(steps, val_accuracy, label='Validation Accuracy')
plt.plot(steps, train_recall, label='Train Recall')
plt.plot(steps, val_recall, label='Validation Recall')
plt.xlabel('Epoch')
plt.ylabel('Accuracy/Recall')
plt.title('Accuracy and Recall Over Epochs')
plt.legend()
plt.savefig(os.path.join(log_dir, 'accuracy_recall_curve.png'))
if __name__ == "__main__":
train_data_dir = r'./datasets/hagrid/yolo_pose_point/test'
val_data_dir = r'./datasets/hagrid/yolo_pose_point/val'
# mapping_dict = {'call': 0, 'dislike': 1, 'fist': 2, 'four': 3, 'like': 4, 'mute': 5, 'ok': 6, 'one': 5, 'palm': 7,
# 'peace': 8, 'peace_inverted': 8, 'rock': 9, 'stop': 10, 'stop_inverted': 10, 'three': 11,
# 'three2': 12, 'two_up': 13, 'two_up_inverted': 13, 'no_gesture': 14}
# label_dict = {0: 'six', 1: 'dislike', 2: 'fist', 3: 'four', 4: 'like', 5: 'one', 6: 'ok', 7: 'palm', 8: 'two',
# 9: 'rock', 10: 'stop', 11: 'three', 12: 'three2', 13: 'two_up', 14: 'no_gesture'}
mapping_dict = {'call': 0, 'dislike': 1, 'fist': 2, 'four': 3, 'like': 4, 'mute': 5, 'ok': 6,
'one': 5, 'palm': 7, 'peace': 8, 'peace_inverted': 8, 'rock': 9, 'stop': 10,
'stop_inverted': 10, 'three': 11, 'three2': 12, 'two_up': 13, 'two_up_inverted': 13}
label_dict = {0: 'six', 1: 'dislike', 2: 'fist', 3: 'four', 4: 'like', 5: 'one', 6: 'ok', 7: 'palm', 8: 'two',
9: 'rock', 10: 'stop', 11: 'three', 12: 'three2', 13: 'two_up'}
num_classes = len(label_dict)
batch_size = 512
num_workers = 4
initial_lr = 0.01
num_epochs = 100
min_lr = 0.0001
optimizer_choice = 'adam' # or 'sgd'
train_pipline(train_data_dir, val_data_dir, mapping_dict, label_dict, num_classes, batch_size, num_workers, initial_lr,
num_epochs, min_lr, optimizer_choice)
import torch
import torch.nn as nn
import os
import numpy as np
import time
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
from KPNet import KPNet
from dataloader import create_dataloader
from engine import test_one_epoch
from tabulate import tabulate
def test_pipeline(test_data_dir, model_path, mapping_dict, label_dict, num_classes, batch_size=256, num_workers=4):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 加载模型
model = KPNet(num_classes).to(device)
model.load_state_dict(torch.load(model_path))
model.eval()
# 创建数据加载器
test_dataloader = create_dataloader(test_data_dir, mapping_dict, phase='val', batch_size=batch_size // 4,
num_workers=num_workers)
# 获取模型时间戳目录
model_timestamp = os.path.basename(os.path.dirname(model_path))
output_dir = os.path.join('output', model_timestamp)
# 确保 output 目录存在
os.makedirs(output_dir, exist_ok=True)
# 查找下一个测试文件夹编号
test_folders = [f for f in os.listdir(output_dir) if f.startswith('test_')]
test_numbers = [int(f.split('_')[1]) for f in test_folders if f.split('_')[1].isdigit()]
next_test_number = max(test_numbers) + 1 if test_numbers else 1
test_output_dir = os.path.join(output_dir, f'test_{next_test_number:02d}')
os.makedirs(test_output_dir, exist_ok=True)
# 运行测试
criterion = nn.CrossEntropyLoss()
test_loss, test_accuracy, test_recall, test_metrics = test_one_epoch(model, test_dataloader, criterion, device,
label_dict)
# 打印并保存测试结果
test_results_log_filename = os.path.join(test_output_dir, 'test_results.log')
with open(test_results_log_filename, 'w') as f:
f.write(f'Test Loss: {test_loss:.4f}\n')
f.write(f'Test Accuracy: {test_accuracy:.4f}\n')
f.write(f'Test Recall: {test_recall:.4f}\n')
f.write(f"Test Metrics:\n")
log_content = tabulate([
[category,
f"{metrics['precision']:.3f}" if isinstance(metrics, dict) else 'N/A',
f"{metrics['recall']:.3f}" if isinstance(metrics, dict) else 'N/A',
f"{metrics['f1-score']:.3f}" if isinstance(metrics, dict) else 'N/A',
metrics['support']]
for category, metrics in test_metrics.items() if category != 'accuracy'
], headers=['Category', 'Precision', 'Recall', 'F1-Score', 'Instance'], tablefmt='grid')
f.write(log_content)
# 计算并保存混淆矩阵
all_labels = []
all_predictions = []
for data_batches, label_batches in test_dataloader:
data_batches = data_batches.to(device).float()
label_batches = label_batches.to(device)
with torch.no_grad():
outputs = model(data_batches)
_, predicted = torch.max(outputs, 1)
all_labels.extend(label_batches.cpu().numpy())
all_predictions.extend(predicted.cpu().numpy())
cm = confusion_matrix(all_labels, all_predictions, labels=list(label_dict.keys()))
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=label_dict.values(), yticklabels=label_dict.values())
plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.savefig(os.path.join(test_output_dir, 'confusion_matrix.png'))
plt.figure(figsize=(10, 8))
sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues', xticklabels=label_dict.values(),
yticklabels=label_dict.values())
plt.title('Normalized Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.savefig(os.path.join(test_output_dir, 'normalized_confusion_matrix.png'))
# 计算 mAP
precision_values = []
recall_values = []
for category in label_dict.values():
if category in test_metrics:
precision_values.append(test_metrics[category]['precision'])
recall_values.append(test_metrics[category]['recall'])
mAP50 = np.mean([p >= 0.5 for p in precision_values])
mAP75 = np.mean([p >= 0.75 for p in precision_values])
mAP50_95 = np.mean([p for p in precision_values])
with open(test_results_log_filename, 'a') as f:
f.write(f"\nmAP50: {mAP50:.4f}\n")
f.write(f"mAP75: {mAP75:.4f}\n")
f.write(f"mAP50:95: {mAP50_95:.4f}\n")
if __name__ == "__main__":
test_data_dir = r'./datasets/hagrid/yolo_pose_point/train'
model_path = r'./KPNet/model_save/20240816-172047/best_model.pth'
mapping_dict = {'call': 0, 'dislike': 1, 'fist': 2, 'four': 3, 'like': 4, 'mute': 5, 'ok': 6, 'one': 5, 'palm': 7,
'peace': 8, 'peace_inverted': 8, 'rock': 9, 'stop': 10, 'stop_inverted': 10, 'three': 11,
'three2': 12, 'two_up': 13, 'two_up_inverted': 13}
label_dict = {0: 'six', 1: 'dislike', 2: 'fist', 3: 'four', 4: 'like', 5: 'one', 6: 'ok', 7: 'palm', 8: 'two',
9: 'rock', 10: 'stop', 11: 'three', 12: 'three2', 13: 'two_up'}
num_classes = len(label_dict)
batch_size = 512
num_workers = 4
test_pipeline(test_data_dir, model_path, mapping_dict, label_dict, num_classes, batch_size, num_workers)