系统环境
- Ubuntu 18.04 LTS
- Python 3.7.6
- PyTorch 1.4.0
- CUDA 10.1
- cuDNN 7.6.5
Windows下有相应环境也可以,我电脑Linux下风扇驱动不理想,跑GPU太烫,训练的代码在Windows上也可以跑。
制作数据集
数据集在“素材”目录下,以下是num目录结构说明。
.
└── num
    ├── 0-original          负样本原图
    │   └── 0
    ├── 0-processed         负样本处理图
    │   └── 0
    ├── num1-original       数字1原图
    │   └── num1
    ├── num1-processed      数字1处理图
    │   └── num1
    ├── num2-original       数字2原图
    │   └── num2
    ├── num2-processed      数字2处理图
    │   └── num2
    ├── num3-original       数字3原图
    │   └── num3
    ├── num3-processed      数字3处理图
    │   └── num3
    ├── num4-original       数字4原图
    │   └── num4
    ├── num4-processed      数字4处理图
    │   └── num4
    ├── num5-original       数字5原图
    │   └── num5
    ├── num5-processed      数字5处理图
    │   └── num5
    ├── sentinel-original   哨兵标记原图
    │   └── sentinel
    └── sentinel-processed  哨兵标记处理图
        └── sentinel
根据目录结构,制作数据集,在CSV文件中记录图片路径和对应的分类标签。
"""@author starrysky@date 2020/08/16@details 制作数据集标签"""import pandas as pdimport osdf = pd.DataFrame({"image_path": [],"label": [],})# 素材文件目录路径src_dir = r"./素材/num/0-processed/0/"files = os.listdir(src_dir)for i in files:# print(src_dir + i)df.loc[df.shape[0] + 1] = {"image_path": src_dir + i,"label": 0,}dir_type_list = ["close", "closeh", "far", "farh"]label_type_list = ["num1", "num2", "num3", "num4", "num5", "sentinel"]for num in range(0, 6):for dir_name in dir_type_list:# 素材文件目录路径src_dir = r"./素材/num/" + label_type_list[num] + "-processed/" + label_type_list[num] + "/" + dir_name + "/"files = os.listdir(src_dir)for i in files:# print(src_dir + i)df.loc[df.shape[0] + 1] = {"image_path": src_dir + i,"label": num + 1,}# print(df)df.to_csv("./素材/num/label.csv")
在PyTorch中自定义数据集
在PyTorch中制作数据集主要是要将样本转换成Tensor格式,首先要继承torch.utils.data.Dataset这个父类,然后根据PyTorch的要求实现一些魔法方法,如__init__、__getitem__、__len__。
# Convert PIL images to float tensors with values scaled to [0, 1].
trans = transforms.ToTensor()


def default_loader(path):
    """Load an image as a 28x28 single-channel grayscale PIL image.

    :param path: path of the image file
    :return: the converted and resized PIL image
    """
    return Image.open(path).convert('L').resize((28, 28))


class MyDataset(Dataset):
    """Dataset backed by a CSV metadata file of (image_path, label) rows."""

    def __init__(self, csv_path, transform=None, loader=default_loader):
        """
        :param csv_path: path of the CSV metadata file
        :param transform: optional transform applied to each loaded image
        :param loader: callable that loads an image given its path
        """
        super(MyDataset, self).__init__()
        # Column 0 of the CSV is the saved index; columns 1 and 2 are
        # image_path and label respectively.
        df = pd.read_csv(csv_path, engine="python", encoding="utf-8")
        self.df = df
        self.transform = transform
        self.loader = loader

    def __getitem__(self, index):
        """Fetch the sample at *index*.

        Args:
            index: sample index

        Returns:
            (image, label) pair; the image is a Tensor when a transform
            was supplied, otherwise a PIL image.
        """
        # Scalar .iloc[row, col] access replaces the chained
        # .iloc[index][1] lookup, which builds an intermediate Series.
        fn = self.df.iloc[index, 1]
        label = self.df.iloc[index, 2]
        # Load the image from its path on disk.
        img = self.loader(fn)
        if self.transform is not None:
            # Convert the image to a Tensor.
            img = self.transform(img)
        return img, label

    def __len__(self):
        """Return the number of samples in the dataset.

        Returns:
            sample count
        """
        return len(self.df)
定义好之后就可以创建数据集对象,打印着看一下。
# Path of the dataset metadata (CSV) file.
metadata_path = r"./素材/num/label.csv"
# Mini-batch size.
batch_size = 64
# DataLoader worker processes: multiprocessing workers are unreliable on
# Windows, so fall back to loading in the main process there.
if sys.platform == "win32":
    num_workers = 0
else:
    num_workers = 12
# Fraction of samples used for training.
train_rate = 0.8

# Build the dataset and report its size.
src_data = MyDataset(csv_path=metadata_path, transform=trans)
print('num_of_trainData:', len(src_data))

# Random train/test split (re-drawn each run).
train_size = int(train_rate * len(src_data))
test_size = len(src_data) - train_size
train_set, test_set = torch.utils.data.random_split(src_data, [train_size, test_size])
train_iter = DataLoader(dataset=train_set, batch_size=batch_size,
                        shuffle=True, num_workers=num_workers)
test_iter = DataLoader(dataset=test_set, batch_size=batch_size,
                       shuffle=True, num_workers=num_workers)

# Peek at a single batch of training data.
for batch_x, batch_y in train_iter:
    print(batch_x, batch_y)
    break
运行结果如下,展示了一个批次共64个样本。
num_of_trainData: 8120tensor([[[[0.0039, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],[0.0000, 0.0157, 0.0000, ..., 0.0000, 0.0000, 0.0000],[0.0039, 0.0000, 0.0078, ..., 0.0000, 0.0000, 0.0000],...,[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000]]],[[[0.9922, 1.0000, 0.0000, ..., 0.0039, 0.0000, 0.0039],[1.0000, 0.9725, 0.0000, ..., 0.0078, 0.0000, 0.0235],[0.0000, 0.0000, 0.0039, ..., 0.0000, 0.0000, 0.0000],...,[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0196, 0.0078],[0.0000, 0.0000, 0.0000, ..., 0.0235, 0.0078, 0.0000],[0.0000, 0.0000, 0.0000, ..., 0.0039, 0.0000, 1.0000]]],[[[0.0000, 0.0000, 0.0000, ..., 0.0078, 0.0000, 0.0000],[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0039],[0.0000, 0.0000, 0.0000, ..., 0.0157, 0.0039, 0.0000],...,[0.0000, 0.0000, 0.0157, ..., 0.0000, 0.0000, 0.0078],[0.0039, 0.0118, 0.0000, ..., 0.0549, 0.0000, 0.0000],[0.0000, 0.0039, 0.0000, ..., 0.0000, 0.0118, 0.0235]]],...,[[[0.9373, 0.0118, 0.0000, ..., 0.0000, 0.0000, 0.0000],[0.1529, 0.1255, 0.1059, ..., 0.0000, 0.0000, 0.0000],[0.9961, 1.0000, 0.7412, ..., 0.0000, 0.0000, 0.0000],...,[0.0275, 0.5765, 0.5804, ..., 0.0000, 0.0000, 0.0000],[0.0078, 0.0000, 0.0039, ..., 0.0000, 0.0000, 0.0000],[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000]]],[[[1.0000, 0.9961, 1.0000, ..., 1.0000, 1.0000, 1.0000],[0.9961, 1.0000, 0.9804, ..., 1.0000, 1.0000, 1.0000],[0.9961, 1.0000, 0.9961, ..., 0.9961, 0.9961, 0.9961],...,[0.9961, 1.0000, 1.0000, ..., 1.0000, 1.0000, 1.0000],[1.0000, 0.9843, 1.0000, ..., 1.0000, 1.0000, 1.0000],[0.0000, 1.0000, 1.0000, ..., 0.0000, 0.0000, 0.0000]]],[[[0.0000, 0.0000, 0.0078, ..., 0.9922, 1.0000, 1.0000],[0.0000, 0.0039, 0.0078, ..., 1.0000, 0.9961, 1.0000],[0.0000, 0.0000, 0.2980, ..., 0.9961, 0.9843, 0.9922],...,[0.0039, 0.0039, 0.0078, ..., 1.0000, 1.0000, 1.0000],[0.0000, 0.0000, 0.0000, ..., 1.0000, 1.0000, 0.9647],[0.0078, 0.0039, 0.0157, ..., 
1.0000, 0.9843, 1.0000]]]]) tensor([4., 6., 0., 6., 6., 4., 6., 2., 4., 6., 4., 6., 2., 6., 2., 1., 5., 4.,2., 2., 4., 3., 2., 1., 3., 3., 2., 6., 4., 3., 5., 4., 6., 2., 2., 5.,1., 2., 1., 1., 6., 5., 6., 4., 3., 2., 2., 3., 4., 1., 5., 5., 2., 6.,2., 2., 5., 3., 3., 5., 2., 3., 6., 6.])
定义模型
之前做MNIST问题的时候使用LeNet也可以取得比较不错的效果,这里再考虑到小电脑的计算能力,因为实际应用对于实时性的要求也非常高,暂时没有使用一些深度卷积神经网络的经典模型。
整个模型结构就是基本使用了LeNet的结构,我只修改了输出层参数为7。
class LeNet(nn.Module):
    """LeNet-style CNN for 1x28x28 grayscale input, producing 7 class logits."""

    def __init__(self):
        super(LeNet, self).__init__()
        # Feature extractor: two conv -> sigmoid -> 2x2 max-pool stages.
        self.conv = nn.Sequential(
            nn.Conv2d(1, 6, 5),   # in_channels, out_channels, kernel_size
            nn.Sigmoid(),
            nn.MaxPool2d(2, 2),   # 2x2 max pooling
            nn.Conv2d(6, 16, 5),
            nn.Sigmoid(),
            nn.MaxPool2d(2, 2),
        )
        # Classifier head: 16 channels of 4x4 features -> 7 logits.
        self.fc = nn.Sequential(
            nn.Linear(16 * 4 * 4, 120),
            nn.Sigmoid(),
            nn.Linear(120, 84),
            nn.Sigmoid(),
            nn.Linear(84, 7),
        )

    def forward(self, img):
        """Run a batch of images through the network and return the logits."""
        feature = self.conv(img)
        flat = feature.view(img.shape[0], -1)
        return self.fc(flat)


net = LeNet()
print(net)
模型结构展示。
LeNet((conv): Sequential((0): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))(1): Sigmoid()(2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)(3): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))(4): Sigmoid()(5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False))(fc): Sequential((0): Linear(in_features=256, out_features=120, bias=True)(1): Sigmoid()(2): Linear(in_features=120, out_features=84, bias=True)(3): Sigmoid()(4): Linear(in_features=84, out_features=7, bias=True)))
评估模型
计算模型的分类正确率。
# %% Model evaluation
def evaluate_accuracy(data_iter, net, device=None):
    """Compute the classification accuracy of *net* over *data_iter*.

    :param data_iter: iterable of (features, labels) batches
    :param net: model to evaluate (an nn.Module or any callable)
    :param device: device to run on; defaults to the device of the model's
        first parameter when *net* is an nn.Module
    :return: fraction of correctly classified samples (0.0 for an empty iterator)
    """
    if device is None and isinstance(net, nn.Module):
        # Use whatever device the model's parameters live on.
        device = list(net.parameters())[0].device
    acc_sum, n = 0.0, 0
    with torch.no_grad():
        for x, y in data_iter:
            is_module = isinstance(net, nn.Module)
            if is_module:
                # eval() disables dropout / batch-norm updates.
                net.eval()
            # Fix: accumulate for plain callables too — previously a
            # non-Module net always scored 0 because this line sat inside
            # the isinstance branch.
            acc_sum += (net(x.to(device)).argmax(dim=1)
                        == y.to(device)).float().sum().cpu().item()
            if is_module:
                net.train()  # restore training mode
            n += y.shape[0]
    # Guard against an empty iterator (ZeroDivisionError otherwise).
    return acc_sum / n if n else 0.0
训练模型
参考之前跟着Dive-into-DL-PyTorch学习时候的一套训练流程。
def train_model(net, train_iter, test_iter, loss_func, optimizer, device, num_epochs):
    """Train *net* and print per-epoch loss, accuracy and timing.

    :param net: network to train
    :param train_iter: training-set DataLoader
    :param test_iter: test-set DataLoader
    :param loss_func: loss function
    :param optimizer: optimizer
    :param device: training device
    :param num_epochs: number of training epochs
    :return: None
    """
    net = net.to(device)
    print("训练设备={0}".format(device))
    for epoch in range(num_epochs):
        # Accumulated loss and number of correct predictions this epoch.
        train_loss_sum, train_acc_sum = 0.0, 0.0
        # Sample and batch counters for averaging.
        sample_count, batch_count = 0, 0
        # Epoch wall-clock start time.
        start = time.time()
        for x, y in train_iter:
            x = x.to(device)
            # CrossEntropyLoss expects integer class indices.
            y = y.long().to(device)
            y_output = net(x)
            loss = loss_func(y_output, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss_sum += loss.cpu().item()
            train_acc_sum += (y_output.argmax(dim=1) == y).sum().cpu().item()
            sample_count += y.shape[0]
            batch_count += 1
        test_acc = evaluate_accuracy(test_iter, net)
        # Report epochs 1-based; also fixes the "lose" typo in the log label.
        print("第{0}个周期, loss={1:.3f}, train_acc={2:.3f}, test_acc={3:.3f}, time={4:.1f}".format(
            epoch + 1, train_loss_sum / batch_count,
            train_acc_sum / sample_count, test_acc, time.time() - start))
训练过程
配置超参数,训练模型。由于数据集也不是很大,采用多轮随机重新划分训练集和测试集(类似交叉验证的思路,但并非严格的K折)来重用数据。
if __name__ == '__main__':
    # %% Working directory
    print(os.getcwd())

    # %% Hyper-parameter configuration
    # Path of the dataset metadata (CSV) file.
    metadata_path = r"./素材/num/label.csv"
    # Train on GPU when available, otherwise CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Mini-batch size.
    batch_size = 64
    # DataLoader workers: multiprocessing workers are unreliable on Windows.
    if sys.platform == "win32":
        num_workers = 0
    else:
        num_workers = 12
    # Fraction of samples used for training.
    train_rate = 0.8

    # Build the dataset once; it is randomly re-split each round below.
    src_data = MyDataset(csv_path=metadata_path, transform=trans)
    print('num_of_trainData:', len(src_data))
    # Split sizes are invariant, so compute them once (the original
    # recomputed them inside the loop as well).
    train_size = int(train_rate * len(src_data))
    test_size = len(src_data) - train_size
    train_set, test_set = torch.utils.data.random_split(src_data, [train_size, test_size])
    train_iter = DataLoader(dataset=train_set, batch_size=batch_size,
                            shuffle=True, num_workers=num_workers)
    test_iter = DataLoader(dataset=test_set, batch_size=batch_size,
                           shuffle=True, num_workers=num_workers)
    # Peek at one batch before training starts.
    for i, j in train_iter:
        print(i, j)
        break

    net = LeNet()
    print(net)
    # Number of epochs per round.
    num_epochs = 5
    # Optimization algorithm.
    optimizer = torch.optim.Adam(net.parameters(), lr=0.002)
    # Cross-entropy loss.
    loss_func = nn.CrossEntropyLoss()

    # NOTE: this is repeated random re-splitting, not strict K-fold
    # cross-validation — each round draws a fresh random train/test split.
    for i in range(10):
        train_set, test_set = torch.utils.data.random_split(src_data, [train_size, test_size])
        train_iter = DataLoader(dataset=train_set, batch_size=batch_size,
                                shuffle=True, num_workers=num_workers)
        test_iter = DataLoader(dataset=test_set, batch_size=batch_size,
                               shuffle=True, num_workers=num_workers)
        train_model(net, train_iter, test_iter, loss_func, optimizer, device, num_epochs)
保存模型
保存训练好的参数,以便后续使用。
# 保存训练后的模型数据torch.save(net.state_dict(), "./model_param/state_dict.pt")
测试模型
测试在整个数据集上的正确率,并导出供LibTorch使用的模型,整体速度还是非常快。
"""@author starrysky@date 2020/08/16@details 加载训练好的模型参数, 测试模型, 导出完整的模型供C++项目部署使用"""import syssys.path.append("./")import timeimport torchfrom learn import mymodelimport osimport pandas as pd# %%# print(os.getcwd())# os.chdir(os.getcwd() + r"\test")print(os.getcwd())device = torch.device("cuda" if torch.cuda.is_available() else "cpu")# %%model = mymodel.LeNet()model.load_state_dict(torch.load("./model_param/state_dict.pt"))model.eval()print(model)# 加载样本和标签df = pd.read_csv("./素材/num/label.csv", index_col=0)df["predict"] = Nonedf["is_correct"] = Noneprint(df.columns)# %%ans = 0start_time = time.time()for i in range(df.shape[0]):image_path = df.iloc[i, 0]image = mymodel.default_loader(image_path)label = df.iloc[i, 1]x = mymodel.trans(image)x_ = x.view(1, 1, 28, 28)y_predict = model(x_).argmax(dim=1).item()# print(i, y_predict)df.iloc[i, 2] = y_predictdf.iloc[i, 3] = y_predict == labelif y_predict == label:ans += 1print("正确样本数:{0}, 正确率={1:.4f}".format(ans, ans / df.shape[0]))print("测试时间:{0:.4f}".format(time.time() - start_time))# %%df.to_csv("result.csv", index=False)image_path = df.iloc[0, 0]image = mymodel.default_loader(image_path)x = mymodel.trans(image)x_ = x.view(1, 1, 28, 28)traced_script_module = torch.jit.trace(model, x_)traced_script_module.save("./libtorch_model/model.pt")
小结
后续在C++和OpenCV环境下的部署。在C++环境下和小电脑上的运行时间还是未知数,但是鉴于当前LeNet的运行速度还是比较快的,后续可以考虑使用深度卷积神经网络的模型。
