
程序员咋不秃头 2024-08-21 01:08:52

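This article is a brief introduction to convolutional neural networks (CNNs). It first presents the advantages of PyTorch Lightning, then briefly reviews the theory behind the components of a CNN, and finally describes the implementation of a training loop for a simple CNN architecture written from scratch with the PyTorch Lightning library.

Why PyTorch Lightning?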





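In short, I find that code built with PyTorch Lightning is easy to write, read, and debug. These activities take up most of my time as a machine learning engineer. The documentation is also well written and includes many tutorials, so the library is easy to learn.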


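LeNet is a good starting point for learning or reviewing deep learning architectures for computer vision. Designed by Yann LeCun et al. in 1998 for handwritten digit recognition, LeNet was one of the first successful convolutional neural network (CNN) architectures.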






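Figure: the rectified linear unit (ReLU), f(x) = max(0, x), and its variant GELU. Image under a Creative Commons license.

Each convolutional layer is usually followed by an activation function that adds non-linearity. Without it, the model would behave like an ordinary linear model, regardless of its depth.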
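To make the point about non-linearity concrete, here is a minimal sketch using scalar "layers". The weights and the helper names (`linear`, `collapsed`, and so on) are illustrative assumptions, not part of the article's code. It shows that two stacked affine layers without an activation are equivalent to a single affine layer, while inserting a ReLU breaks that equivalence.

```python
# Illustrative sketch: stacking linear layers without an activation
# collapses into one linear layer. All weights are arbitrary example values.

def linear(w: float, b: float, x: float) -> float:
    """Scalar affine layer: y = w * x + b."""
    return w * x + b


def relu(x: float) -> float:
    """Rectified linear unit: f(x) = max(0, x)."""
    return max(0.0, x)


W1, B1 = 2.0, -1.0  # hypothetical first layer
W2, B2 = -3.0, 0.5  # hypothetical second layer


def stacked_without_activation(x: float) -> float:
    return linear(W2, B2, linear(W1, B1, x))


def collapsed(x: float) -> float:
    # W2 * (W1 * x + B1) + B2 == (W2 * W1) * x + (W2 * B1 + B2)
    return linear(W2 * W1, W2 * B1 + B2, x)


def stacked_with_relu(x: float) -> float:
    return linear(W2, B2, relu(linear(W1, B1, x)))


for x in (-2.0, 0.0, 0.5, 3.0):
    print(x, stacked_without_activation(x), collapsed(x), stacked_with_relu(x))
```

The first two printed columns always agree, whereas the ReLU version differs whenever the first layer produces a negative value.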



















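Other classic architectures such as AlexNet, VGG, ResNet, and Inception also end with an FC layer. However, several more recent architectures, such as MobileNet, YOLO, EfficientNet, and Vision Transformers, move away from the large stack of FC layers, in some cases keeping only a single linear classification head.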







Input -> [[[Conv-> ReLU] * N ] -> Pool?] * M -> [[FC+ReLU] * K] -> FC -> Scores


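Here 0 ≤ N ≤ 3 is the number of stacked convolutions, M ≥ 0 is the number of pooling blocks, and 0 ≤ K < 3 is the number of stacked FC layers.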
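The pattern above can be expanded mechanically. The following sketch uses a hypothetical helper, `build_layer_pattern` (a name chosen for illustration), to turn N, M, and K into a flat list of layer names. It assumes every block ends with a pooling layer unless `pool=False` is passed.

```python
def build_layer_pattern(n: int, m: int, k: int, pool: bool = True) -> list[str]:
    """Expand Input -> [[Conv -> ReLU] * N -> Pool?] * M -> [FC + ReLU] * K -> FC -> Scores."""
    if not (0 <= n <= 3 and m >= 0 and 0 <= k < 3):
        raise ValueError("expected 0 <= N <= 3, M >= 0 and 0 <= K < 3")
    layers = ["Input"]
    for _ in range(m):
        for _ in range(n):
            layers += ["Conv", "ReLU"]
        if pool:  # the "Pool?" in the pattern: pooling is optional
            layers.append("Pool")
    for _ in range(k):
        layers += ["FC", "ReLU"]
    layers += ["FC", "Scores"]
    return layers


# LeNet-style layout: one convolution per block, two blocks, two hidden FC layers.
print(" -> ".join(build_layer_pattern(n=1, m=2, k=2)))
```

With N=1, M=2, K=2 this yields the LeNet-like layout of two {Conv, ReLU, Pool} blocks followed by three FC layers.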


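Machine learning practitioners are generally advised to use existing state-of-the-art architectures rather than design their own. When working with convolutional networks, however, it is important to understand the spatial constraints. For example, after applying a convolutional layer to an input of size W (width or height) with filter size F, padding P, and stride S, the output feature map has size Output size = ((W - F + 2P) / S) + 1. For every convolution, the parameters W, F, P, and S must be chosen so that the output size is an integer. Adding padding usually solves most problems.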
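As a quick check of the output-size formula, here is a small sketch. The function name `conv_output_size` is an assumption made for this example. It raises an error when W, F, P, and S do not produce an integer output size.

```python
def conv_output_size(w: int, f: int, p: int = 0, s: int = 1) -> int:
    """Output size = ((W - F + 2P) / S) + 1 for one spatial dimension."""
    span = w - f + 2 * p
    if span < 0:
        raise ValueError("filter is larger than the padded input")
    if span % s != 0:
        raise ValueError(f"(W - F + 2P) = {span} is not divisible by the stride S = {s}")
    return span // s + 1


print(conv_output_size(32, 5))       # 5x5 filter, no padding, on a 32-pixel side
print(conv_output_size(28, 5, p=2))  # padding of 2 keeps a 28-pixel side unchanged
print(conv_output_size(28, 2, s=2))  # a 2x2 window with stride 2 halves the size
```

The same formula applies to pooling layers, which is why the last call halves the size.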



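Enough theory: we will now implement the LeNet CNN with PyTorch Lightning. LeNet was chosen as the example because it is simple and small.


In PyTorch, a new module inherits from torch.nn.Module. In PyTorch Lightning, the model class inherits from lightning.pytorch.LightningModule.

You can use lightning.pytorch.LightningModule exactly like the nn.Module class; it simply comes with more functionality.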


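The model takes two parameters: the number of input channels (1 for grayscale images, 3 for RGB images) and the number of classes of the classifier (10 for the MNIST dataset).

In PyTorch, a model is split into two parts, __init__() and forward(). As the initialization method, __init__() declares every component that has learnable parameters. It can also contain other declarations, such as activation functions. The forward() method then applies all the layers and functions to the input image in sequence.

The LeNet architecture consists of two stacked convolutional blocks, each followed by a pooling layer. The result is then passed to successive fully connected (FC) layers, which output a tensor of size (batch_size, out_channels), where out_channels is the number of classes.


The example_input_array tensor is used to display the simulated tensor sizes between layers when running print(model).

Figure: the model is logged automatically when running print(model). Image by the author.

From the table above, we can confirm that the output tensor has size (batch_size=16, num_classes=10).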

The Accuracy() metric is used during training and validation. The layers with learnable parameters are initialized as well: first the two {convolution + max pooling} blocks, then the fully connected layers.

# models/detection/lenet.py
"""PyTorch reference: https://pytorch.org/tutorials/beginner/blitz/neural_networks_tutorial.html"""
from __future__ import annotations

import lightning.pytorch as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics


class LeNet(pl.LightningModule):
    def __init__(self, in_channels: int, out_channels: int, lr: float = 2e-4):
        """
        Args:
        - in_channels: 1 for a grayscale input image (which is the case for MNIST), 3 for an RGB input image.
        - out_channels: Number of classes of the classifier. 10 for MNIST.
        """
        super().__init__()
        # Debugging tool to display the intermediate input/output size of all your layers (called before fit)
        self.example_input_array = torch.Tensor(16, in_channels, 32, 32)
        self.learning_rate = lr
        self.train_accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=out_channels)
        self.val_accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=out_channels)
        self.test_accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=out_channels)

        # [img_size] 32 -> (conv) -> 28 -> (max pool) -> 14
        # with 6 output activation maps
        self.conv_layer1 = nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=6,
                kernel_size=5,
                stride=1,
                # Either resize (28x28) MNIST images to (32x32) or pad the input to be 32x32
                # padding=2,
            ),
            nn.MaxPool2d(kernel_size=2),
        )
        # [img_size] 14 -> (conv) -> 10 -> (max pool) -> 5
        self.conv_layer2 = nn.Sequential(
            nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1, padding=0),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # The activation size (number of values after passing through one layer) gets gradually smaller.
        # The output is flattened and then used as one long input to the next dense layers.
        self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)  # 5 from the image dimension
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        # "Softmax" layer = Linear + Softmax.
        self.fc3 = nn.Linear(in_features=84, out_features=out_channels)



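To simplify the forward call, stacked layers are usually expressed as nn.Sequential() submodules.

The first convolutional block receives a (32x32) image. Its 5x5 convolution without padding produces (28x28) feature maps, and the pooling layer then halves them to (14x14).

LeNet expects a (32x32) input image, but the images in the MNIST dataset are (28x28). You can either resize the images or increase the padding of the first convolutional layer (as noted in the code comment). Otherwise, after the two downsampling steps, the activations output by the last convolutional layer do not match the size expected by the matrix multiplication of the first fully connected (FC) layer, whose dimensions are shown below: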

self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)  # 5 from the image dimension
self.fc2 = nn.Linear(in_features=120, out_features=84)
# "Softmax" layer = Linear + Softmax.
self.fc3 = nn.Linear(in_features=84, out_features=out_channels)
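The size mismatch can be traced by hand. The sketch below applies the output-size formula to the layer settings listed in the model (5x5 convolutions with stride 1, 2x2 max pooling with stride 2, 16 output filters). The helper names `out_size` and `lenet_flatten_features` are assumptions made for this illustration.

```python
def out_size(w: int, f: int, p: int = 0, s: int = 1) -> int:
    """Spatial output size of a convolution or pooling layer."""
    return (w - f + 2 * p) // s + 1


def lenet_flatten_features(img_size: int, conv1_padding: int = 0) -> int:
    """Number of values fed to fc1 for a square input of side img_size."""
    x = out_size(img_size, f=5, p=conv1_padding)  # conv_layer1: 5x5 convolution
    x = out_size(x, f=2, s=2)                     # conv_layer1: 2x2 max pooling
    x = out_size(x, f=5)                          # conv_layer2: 5x5 convolution
    x = out_size(x, f=2, s=2)                     # conv_layer2: 2x2 max pooling
    return 16 * x * x                             # 16 filters after conv_layer2


print(lenet_flatten_features(32))                   # resized MNIST image
print(lenet_flatten_features(28))                   # raw MNIST image, no padding
print(lenet_flatten_features(28, conv1_padding=2))  # raw MNIST image, padding=2
```

Only the 32x32 input and the padded 28x28 input produce the 16 * 5 * 5 = 400 features that fc1 expects; the raw 28x28 input yields 16 * 4 * 4 = 256.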

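The number of input channels of the second convolutional layer equals the number of output filters of the first convolutional layer (6).

The order of ReLU and MaxPool does not matter here.

Unlike what was described in the previous section, the ReLU activation is not called before pooling. In this implementation, ReLU is only called in the forward() method.

A convolutional layer should always be followed by an activation function to add non-linearity. However, if the convolutional layer is also followed by a pooling layer, the order does not matter. The two operations commute: MaxPool(ReLU(x)) = ReLU(MaxPool(x)). Indeed, we can take the maximum of each local region and then set negative values to 0, or set all negative values to 0 and then take the maximum of each local region.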
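The commutativity of ReLU and max pooling can be checked numerically. This is a minimal pure-Python sketch on a 1D signal with non-overlapping windows; `relu` and `max_pool_1d` are simplified stand-ins written for this example, not the PyTorch functions.

```python
import random


def relu(values: list[float]) -> list[float]:
    """Set every negative value to 0."""
    return [max(0.0, v) for v in values]


def max_pool_1d(values: list[float], kernel_size: int = 2) -> list[float]:
    """Maximum over non-overlapping windows of length kernel_size."""
    last_start = len(values) - kernel_size + 1
    return [max(values[i:i + kernel_size]) for i in range(0, last_start, kernel_size)]


random.seed(0)
signal = [random.uniform(-1.0, 1.0) for _ in range(16)]

pool_then_relu = relu(max_pool_1d(signal))
relu_then_pool = max_pool_1d(relu(signal))
print(pool_then_relu == relu_then_pool)
```

The equality holds because ReLU is non-decreasing, so it preserves which element of each window is the largest.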


The first FC layer receives a tensor of size (number_output_filter_from_conv2 * previous_activation_width * previous_activation_height). The size of the output activations then decreases gradually through the three FC layers.


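# Method of LeNet in models/detection/lenet.py
def forward(self, x: torch.Tensor) -> torch.Tensor:
    x = F.relu(self.conv_layer1(x))
    x = F.relu(self.conv_layer2(x))
    x = torch.flatten(x, 1)  # flatten all dimensions except the batch dimension
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = self.fc3(x)
    return x

The backward function, which computes the gradients, is defined automatically by autograd.

In most PyTorch implementations, the last layer (sometimes called the softmax layer) outputs raw activations, where each number corresponds to a score. Here the softmax function is not called in the forward pass; it is built into the cross-entropy loss function instead.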
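To illustrate why the forward pass can return raw scores, here is a small pure-Python sketch of the underlying arithmetic (it is not PyTorch's implementation). Computing the cross-entropy directly from logits via log-sum-exp gives the same value as applying softmax first and then taking the negative log-probability of the target class. The logits and the function names are example assumptions.

```python
import math


def softmax(logits: list[float]) -> list[float]:
    """Turn raw scores into probabilities (shifted by the max for stability)."""
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy_from_logits(logits: list[float], target: int) -> float:
    """Cross-entropy for one sample: log(sum(exp(logits))) - logits[target]."""
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]


logits = [2.0, -1.0, 0.5]  # arbitrary example scores for 3 classes
target = 0

loss_fused = cross_entropy_from_logits(logits, target)
loss_two_steps = -math.log(softmax(logits)[target])
print(loss_fused, loss_two_steps)
```

Softmax also preserves the ranking of the scores, so taking the argmax of the raw logits gives the same predicted class as taking the argmax of the probabilities.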


In the same file as before, all the core functions are overridden in the class LeNet(pl.LightningModule).

Optimizer and scheduler: configure_optimizers()

def configure_optimizers(self) -> torch.optim.Adam:
    return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

Training loop: training_step()
Validation loop: validation_step()

# Methods of LeNet in models/detection/lenet.py
###############################
# --- For Pytorch Lightning ---
###############################
def validation_step(
    self,
    batch: list[torch.Tensor, torch.Tensor],
    batch_idx: int,
    verbose: bool = True,
) -> torch.Tensor:
    """Function called when using `trainer.validate()` with trainer a lightning `Trainer` instance."""
    x, y = batch
    logit_preds = self(x)
    loss = F.cross_entropy(logit_preds, y)
    self.val_accuracy.update(torch.argmax(logit_preds, dim=1), y)
    self.log("val_loss", loss)
    self.log("val_acc_step", self.val_accuracy, on_epoch=True)
    return loss

def training_step(
    self,
    batch: list[torch.Tensor, torch.Tensor],
    batch_idx: int,
) -> torch.Tensor:
    """Function called when using `trainer.fit()` with trainer a lightning `Trainer` instance."""
    x, y = batch
    logit_preds = self(x)
    loss = F.cross_entropy(logit_preds, y)
    self.train_accuracy.update(torch.argmax(logit_preds, dim=1), y)
    self.log("train_acc_step", self.train_accuracy, on_step=True, on_epoch=True, logger=True)
    # logs metrics for each training_step,
    # and the average across the epoch, to the progress bar and logger
    self.log("train_loss", loss, on_step=True, on_epoch=True, logger=True)
    return loss

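As you may have noticed, the functions above are very short. There is no need to move variables with to(device), to clear gradients with optimizer.zero_grad(), or to compute new gradients with loss.backward(). Switching the model mode with model.eval() and model.train() is also handled by the PyTorch Lightning library itself.

You can see that the log() method is called here. This method saves and displays the results when appropriate.


The log() method takes several options, such as the on_step, on_epoch, and logger flags used above.


Depending on where log() is called, Lightning automatically determines the correct mode. Of course, you can override the default behavior by setting the flags manually.

Another nice feature of PyTorch Lightning is the validation sanity check:

You may have noticed the words "validation sanity check" in the logs. This is because Lightning runs 2 batches of validation before training starts. It is a kind of unit test that ensures that if there is a bug in the validation loop, you do not have to wait for a full epoch to find it.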




def test_step(
    self,
    batch: list[torch.Tensor, torch.Tensor],
    batch_idx: int,
):
    """Function called when using `trainer.test()` with trainer a lightning `Trainer` instance."""
    x, y = batch
    logit_preds = self(x)
    loss = F.cross_entropy(logit_preds, y)
    self.test_accuracy.update(torch.argmax(logit_preds, dim=1), y)
    self.log_dict({"test_loss": loss, "test_acc": self.test_accuracy})

def predict_step(
    self, batch: list[torch.Tensor, torch.Tensor], batch_idx: int
) -> tuple[torch.Tensor, torch.Tensor]:
    """Function called when using `trainer.predict()` with trainer a lightning `Trainer` instance."""
    x, _ = batch
    logit_preds = self(x)
    softmax_preds = F.softmax(logit_preds, dim=1)
    return x, softmax_preds

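Managing the MNIST dataset

You can use either the regular PyTorch DataLoader class or a PyTorch Lightning DataModule. In this article, I implemented the dataset and data loading with a PyTorch Lightning DataModule. Its purpose is to gather all the information related to one dataset in a single file, including data download, data splitting, and data loading.

In this tutorial, we use MNIST, which consists of images of size 28x28.

Here is the implementation of the data module that manages the MNIST dataset. It sets the standard parameters (data directory, batch size, and transform) and implements the download and processing functions in prepare_data() and setup().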

# datasets/mnist.py
"""More at https://lightning.ai/docs/pytorch/stable/data/datamodule.html"""
import logging
from pathlib import Path

import lightning.pytorch as pl
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from torchvision.datasets import MNIST

# Create a logger
logger = logging.getLogger(Path(__file__).stem)
logger.setLevel(logging.INFO)

_DEFAULT_MNIST_BATCH_SIZE = 32
_DEFAULT_RESIZE_SIZE = 32


class MNISTDataModule(pl.LightningDataModule):
    def __init__(self, data_dir: str, batch_size: int = _DEFAULT_MNIST_BATCH_SIZE):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Resize((_DEFAULT_RESIZE_SIZE, _DEFAULT_RESIZE_SIZE)),
                transforms.Normalize((0.1307,), (0.3081,)),
            ]
        )

    def prepare_data(self):
        """Ensure we download using one process only on CPU and avoid data corruption when downloading the data.
        It's recommended to avoid creating attributes `self.*` because the state won't be available for
        other processes.
        """
        MNIST(self.data_dir, train=True, download=True, transform=self.transform)
        MNIST(self.data_dir, train=False, download=True, transform=self.transform)

    def setup(self, stage: str):
        """Is called from every process across all nodes.
        It also uses every GPU to perform data processing and state assignment.
        `teardown` is its counterpart used to clean the states.
        """
        logger.info(f"Stage: {stage}")
        if stage == "test" or stage == "predict":
            self.mnist_test = MNIST(self.data_dir, train=False, download=True, transform=self.transform)
        elif stage == "fit" or stage == "validate":
            mnist_full = MNIST(self.data_dir, train=True, transform=self.transform)
            self.mnist_train, self.mnist_val = random_split(mnist_full, [55000, 5000])


The prepare_data() and setup() methods of the DataModule: prepare_data() runs on a single CPU process and downloads the data locally, while setup() runs in parallel in every process and can perform data processing jobs. These methods are called each time a method of the trainer is called, such as trainer.fit() or trainer.validate(). The configure_optimizers() method of the pl.LightningModule then initializes the optimizer.

Then, in the same MNISTDataModule class, we implement the different data loaders:

def train_dataloader(self) -> DataLoader:
    """Called by Trainer `fit()` method."""
    return DataLoader(self.mnist_train, batch_size=self.batch_size)

def val_dataloader(self) -> DataLoader:
    """Called by Trainer `fit()` and `validate()` methods."""
    return DataLoader(self.mnist_val, batch_size=self.batch_size)

def test_dataloader(self) -> DataLoader:
    """Called by Trainer `test()` method."""
    return DataLoader(self.mnist_test, batch_size=self.batch_size)

def predict_dataloader(self) -> DataLoader:
    """Called by Trainer `predict()` method. Use the same data as the test_dataloader."""
    return DataLoader(self.mnist_test, batch_size=self.batch_size, num_workers=3)

The train_dataloader() method of the DataModule retrieves the training DataLoader. The training_step() method of the pl.LightningModule runs the forward and backward passes on a mini-batch obtained from the training DataLoader. It is called repeatedly until every sample in the training DataLoader has been seen once. The validation_step() method of the pl.LightningModule computes the loss and metrics on the validation dataset. Training stops when the maximum number of epochs is reached or when the validation loss stops decreasing (early stopping).
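The stopping rule described above can be sketched in a few lines of plain Python. This is a simplified illustration of patience-based early stopping, not Lightning's actual EarlyStopping implementation; the class `EarlyStoppingTracker`, the function `epochs_run`, and the loss values are all assumptions made for the example.

```python
class EarlyStoppingTracker:
    """Stop once the validation loss has not improved for `patience` epochs in a row."""

    def __init__(self, patience: int, min_delta: float = 0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best = float("inf")
        self.bad_epochs = 0

    def should_stop(self, val_loss: float) -> bool:
        if val_loss < self.best - self.min_delta:
            self.best = val_loss  # improvement: remember it and reset the counter
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1  # no improvement during this epoch
        return self.bad_epochs >= self.patience


def epochs_run(val_losses: list[float], patience: int, max_epochs: int) -> int:
    """Number of epochs executed before max_epochs or early stopping ends training."""
    tracker = EarlyStoppingTracker(patience)
    planned = val_losses[:max_epochs]
    for epoch, loss in enumerate(planned, start=1):
        if tracker.should_stop(loss):
            return epoch
    return len(planned)


# Made-up validation losses: the best value is reached at epoch 3.
losses = [1.0, 0.8, 0.7, 0.71, 0.72, 0.70, 0.73, 0.6]
print(epochs_run(losses, patience=4, max_epochs=10))
```

With a patience of 4, training in this example ends four epochs after the last improvement, so the lower loss at the final position is never reached.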




Parsing the CLI arguments and calling the main function:

if __name__ == "__main__":
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--model", default="lenet", type=str, help="Provide an implemented model.")
    parser.add_argument("--device", default=0, type=int, help="Select a CUDA device.")
    parser.add_argument("--max-epoch", default=10, type=int, help="Max number of epochs.")
    parser.add_argument("--out-dir", type=Path, help="Path to output directory")
    parser.add_argument(
        "--early-stopping", action="store_true", help="If True, stops the training if validation loss stops decreasing."
    )
    args = parser.parse_args()
    main(
        model_choice=args.model,
        device=args.device,
        max_epoch=args.max_epoch,
        out_dir=args.out_dir,
        early_stopping=args.early_stopping,
    )

The main function covers the model selection, the creation of the early-stopping callback, and the calls to the trainer: training with trainer.fit(model, datamodule=data_module), validation with trainer.validate(datamodule=data_module), testing with trainer.test(datamodule=data_module), and prediction with output_preds = trainer.predict(datamodule=data_module, ckpt_path="best").

def main(
    model_choice: str,
    device: int,
    max_epoch: int,
    out_dir: Path | None,
    early_stopping: bool | None,
):
    accelerator = "gpu" if torch.cuda.is_available() else "cpu"
    if out_dir is None:
        out_dir = Path(__file__).parent / "output"
    out_dir.mkdir(parents=True, exist_ok=True)

    # Select architecture
    if model_choice == "lenet":
        model = LeNet(in_channels=1, out_channels=10)
        data_module = MNISTDataModule(data_dir=_PATH_DATASETS, batch_size=_BATCH_SIZE)
    else:
        raise NotImplementedError(f"{model_choice} is not implemented!")

    callbacks = (
        [
            EarlyStopping(
                monitor="val_loss",
                min_delta=0.00,
                patience=_EARLY_STOPPING_PATIENCE,
                verbose=True,
                mode="min",
            )
        ]
        if early_stopping
        else []
    )

    # If your machine has GPUs, it will use the GPU Accelerator for training.
    trainer = L.Trainer(
        accelerator=accelerator,
        devices=[device],
        strategy="auto",
        max_epochs=max_epoch,
        callbacks=callbacks,
        default_root_dir=out_dir,
    )

    # Train the model ⚡
    # data_module.setup(stage="fit")  # Is called by trainer.fit().
    # Call training_step + validation_step for all the epochs.
    trainer.fit(model, datamodule=data_module)
    # Validate
    trainer.validate(datamodule=data_module)

    # Automatically loads the best weights from the previous run.
    # data_module.setup(stage="test")  # Is called by trainer.test().
    # The checkpoint path is logged on the terminal.
    trainer.test(datamodule=data_module)

    # Run the prediction on the test set and save a subset of the resulting predictions along with the
    # original images.
    output_preds = trainer.predict(datamodule=data_module, ckpt_path="best")
    img_tensors, softmax_preds = zip(*output_preds)
    out_dir_imgs = out_dir / "test_images"
    out_dir_imgs.mkdir(exist_ok=True, parents=True)
    save_results(
        img_tensors=img_tensors,
        output_tensors=softmax_preds,
        out_dir=out_dir_imgs,
    )

A function that saves the predicted images (mainly for debugging):

def save_results(
    img_tensors: list[torch.Tensor],
    output_tensors: list[torch.Tensor],
    out_dir: Path,
    max_number_of_imgs: int = 10,
):
    """Save test results as images in the provided output directory.

    Args:
        img_tensors: List of the tensors containing the input images.
        output_tensors: List of softmax activations from the trained model.
        out_dir: Path to output directory.
        max_number_of_imgs: Maximum number of images to output from the provided images.
            The images will be selected randomly.
    """
    selected_img_indices = random.sample(range(len(img_tensors)), min(max_number_of_imgs, len(img_tensors)))
    for img_indice in selected_img_indices:
        # Take the first instance of the batch (index 0)
        img_filepath = out_dir / f"{img_indice}_predicted_{torch.argmax(output_tensors[img_indice], dim=1)[0]}.png"
        torchvision.utils.save_image(img_tensors[img_indice][0], fp=img_filepath)


The complete train.py script starts with the following imports and constants, followed by the save_results() function, the main() function, and the argument-parsing block, exactly as listed above:

#!/usr/bin/python3
"""Example training script to fit a model on MNIST dataset."""
from __future__ import annotations  # Enable PEP 563 for Python 3.7

from argparse import ArgumentParser
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
from pathlib import Path
import lightning as L
import os
import random
import torch
import torchvision

from datasets.mnist import MNISTDataModule
from models import AlexNet, LeNet

_PATH_DATASETS = os.environ.get("PATH_DATASETS", ".")
_BATCH_SIZE = 64 if torch.cuda.is_available() else 32
_EARLY_STOPPING_PATIENCE = 4  # epochs


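On my computer, equipped with an NVIDIA GeForce RTX 3070 GPU, running python -m train --early-stopping for 10 epochs (batch size 64) takes less than two minutes.

When training reaches the default maximum number of epochs (10), PyTorch Lightning outputs the loss and accuracy on the validation set and on the test set:

Figure: results of the model on unseen data after 10 epochs of training.

The trained model reaches nearly 99% accuracy on unseen data.

The script also saves 10 images from the test set along with their predicted classes.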


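In this article, we discovered the magic of PyTorch Lightning, reviewed the key technical concepts of CNNs, and walked through a complete implementation, from scratch, of the training loop for a simple CNN architecture.

I hope this introductory article helps you implement basic architectures quickly and reliably, and helps you build a more solid foundation as you learn. You can find more in my public deep learning repository: https://github.com/bledem/deep-learning.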


Stanford computer vision course, Andrew Ng: https://www.youtube.com/watch?v=c1RBQzKsDCk&list=PLpFsSf5Dm-pd5d3rjNtIXUHT-v7bdaEIe&index=115&ab_channel=DeepLearningAI
PyTorch Lightning documentation: https://www.pytorchlightning.ai/tutorials