Designing a custom neural network framework comes down to two basic phases: training and prediction. The overall process can be summarized in the following mind map:
Abstract design of the neural network framework

Prediction is essentially part of the training process, so we can abstract the basic components of a neural network into four parts: data input, compute layers, loss computation, and the optimizer.

Their roles are as follows:
- Data input: the basic data fed into the network, usually referred to as a tensor;
- Compute layer: receives the output of the previous layer, performs this layer's computation, and hands the result to the next layer. Since tensors flow in two directions, forward and backward, every type of layer has to implement both a forward and a backward operation;
- Activation layer: usually attached to a compute layer to apply a non-linearity to its output;
- Loss computation: given the model's predictions and the ground-truth values, this component computes the loss value as well as the gradient with respect to the last layer;
- Optimizer: updates the model parameters using the gradients.
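To make this division of labour concrete, here is a minimal, self-contained sketch of how the four components cooperate in a single training step. It is plain NumPy with made-up shapes and a simple squared-error loss, not part of the framework itself:

```python
import numpy as np

# data input: a batch of input tensors and the matching targets
x = np.random.random((4, 3))
y = np.random.random((4, 1))

# a single "compute layer" with one weight matrix as its parameter
w = np.random.random((3, 1))

# forward pass of the layer
pred = x @ w

# loss computation (mean squared error here, for brevity)
loss = np.mean((pred - y) ** 2)

# backward pass: gradient of the loss w.r.t. the output, then w.r.t. w
grad_pred = 2 * (pred - y) / pred.size
grad_w = x.T @ grad_pred

# "optimizer": plain gradient descent with a fixed learning rate
lr = 0.1
w -= lr * grad_w
```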
Concrete implementation of the custom neural network framework

Wrapping data as tensors

A tensor is the basic data unit of a neural network; it can be created directly with NumPy:
```python
import numpy as np

tensor = np.random.random(size=(10, 28, 28, 1))
```
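For an image dataset such as MNIST, the shape above reads as (batch, height, width, channels), i.e. ten 28x28 single-channel images; a quick check:

```python
print(tensor.shape)   # (10, 28, 28, 1)
print(tensor.dtype)   # float64
```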
The Layer base class and its implementations

A compute layer performs this layer's computation on its input; the forward computation on the input data happens in forward. Besides forward, every compute layer also has to implement a backward pass:
```python
"""Base class for Layer"""
class Layer:
    def __init__(self):
        # parameters, their gradients and shapes are tracked per name
        self.params = {p: None for p in self.param_names}
        self.nt_params = {p: None for p in self.nt_param_names}
        self.initializers = {}
        self.grads = {}
        self.shapes = {}

        self._is_training = True
        self._is_init = False
        self.ctx = {}

    def __repr__(self):
        shape = None if not self.shapes else self.shapes
        return f"layer: {self.name}\tshape: {shape}"

    def forward(self, inputs):
        raise NotImplementedError

    def backward(self, grad):
        raise NotImplementedError

    @property
    def is_init(self):
        return self._is_init

    @is_init.setter
    def is_init(self, is_init):
        self._is_init = is_init
        for name in self.param_names:
            self.shapes[name] = self.params[name].shape

    @property
    def is_training(self):
        return self._is_training

    @is_training.setter
    def is_training(self, is_train):
        self._is_training = is_train

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def param_names(self):
        return ()

    @property
    def nt_param_names(self):
        return ()

    def _init_params(self):
        # lazily initialize the parameters once their shapes are known
        for name in self.param_names:
            self.params[name] = self.initializers[name](self.shapes[name])
        self.is_init = True
```
Compute layer: the fully connected (Dense) layer

The fully connected layer plays the role of the classifier in the network. In its forward pass it takes the inputs coming from the previous layer and computes wx + b; backward does the opposite and receives the gradient flowing back from the layers behind it:
```python
import numpy as np

from initializer import XavierUniform
from initializer import Zeros


class Dense(Layer):
    def __init__(self, num_out, w_init=XavierUniform(), b_init=Zeros()):
        super().__init__()
        self.initializers = {"w": w_init, "b": b_init}
        # the input dimension of w is unknown until the first forward call
        self.shapes = {"w": [None, num_out], "b": [num_out]}

    def forward(self, inputs):
        if not self.is_init:
            self.shapes["w"][0] = inputs.shape[1]
            self._init_params()
        self.ctx = {"X": inputs}
        return inputs @ self.params["w"] + self.params["b"]

    def backward(self, grad):
        self.grads["w"] = self.ctx["X"].T @ grad
        self.grads["b"] = np.sum(grad, axis=0)
        return grad @ self.params["w"].T

    @property
    def param_names(self):
        return "w", "b"
```
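As a quick sanity check of the lazy shape initialization, the layer can be exercised on random data. This assumes the Layer base class and the initializer module shown above are importable in the same project:

```python
import numpy as np

dense = Dense(num_out=10)

x = np.random.random((32, 784))        # a batch of 32 flattened MNIST images
out = dense.forward(x)                 # first call triggers parameter init
print(out.shape)                       # (32, 10)

grad_in = dense.backward(np.ones_like(out))
print(grad_in.shape)                   # (32, 784), gradient w.r.t. the inputs
print(dense.grads["w"].shape)          # (784, 10), gradient w.r.t. w
```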
The Activation base class and its implementations

An activation function can itself be treated as a network layer, so activation classes are implemented by subclassing the Layer base class. Here the ReLU activation is used:
```python
import numpy as np


class Activation(Layer):
    def __init__(self):
        super().__init__()
        self.inputs = None

    def forward(self, inputs):
        # remember the inputs, they are needed in the backward pass
        self.inputs = inputs
        return self.forward_func(inputs)

    def backward(self, grad):
        return self.backward_func(self.inputs) * grad

    def forward_func(self, inputs):
        raise NotImplementedError

    def backward_func(self, inputs):
        raise NotImplementedError


class ReLU(Activation):
    def __init__(self):
        super().__init__()

    def forward_func(self, x):
        return np.maximum(x, 0.0)

    def backward_func(self, x):
        return x > 0.0
```
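A small example of the ReLU layer in isolation, using the classes defined above:

```python
import numpy as np

relu = ReLU()
x = np.array([[-1.0, 0.5, 2.0]])
print(relu.forward(x))                  # [[0.  0.5 2. ]]
print(relu.backward(np.ones_like(x)))   # [[0. 1. 1.]], the gradient only
                                        # flows where the input was positive
```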
Net: the base class that drives the network

In a neural network the error has to travel through the whole model, i.e. forward propagation and backward propagation. Forward propagation simply walks through the layers in order, feeding each layer's output to the next layer as its input; backward propagation walks through the layers in reverse order, feeding each layer's gradient to the next one.

So this part needs a network class that drives the updates: it runs the forward and backward computation over every layer and manages each layer's parameters:
```python
class Net(object):
    def __init__(self, layers):
        self.layers = layers

    def forward(self, inputs):
        for layer in self.layers:
            inputs = layer.forward(inputs)
        return inputs

    def backward(self, grad):
        # propagate the gradient backwards through the layers and collect
        # every layer's parameter gradients along the way
        all_grads = []
        for layer in reversed(self.layers):
            grad = layer.backward(grad)
            all_grads.append(layer.grads)
        return all_grads[::-1]

    def get_params_and_grads(self):
        for layer in self.layers:
            yield layer.params, layer.grads

    def get_parameters(self):
        return [layer.params for layer in self.layers]

    def set_parameters(self, params):
        for i, layer in enumerate(self.layers):
            for key in layer.params.keys():
                layer.params[key] = params[i][key]
```
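A tiny sanity check of the Net class, assuming the Dense and ReLU layers from the previous sections:

```python
import numpy as np

net = Net([Dense(16), ReLU(), Dense(4)])

x = np.random.random((8, 32))
out = net.forward(x)
print(out.shape)                          # (8, 4)

grads = net.backward(np.ones_like(out))
print([sorted(g.keys()) for g in grads])  # [['b', 'w'], [], ['b', 'w']]
```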
Loss functions and optimizers

As the previous two articles also showed, training a neural network requires computing a loss and choosing an optimizer to update the parameters. First, the loss function base class:
```python
class BaseLoss(object):
    def loss(self, predicted, actual):
        raise NotImplementedError

    def grad(self, predicted, actual):
        raise NotImplementedError
```
The optimizer base class has to compute, from the current gradients, the step by which each parameter is actually changed during the update:
```python
import numpy as np


class BaseOptimizer(object):
    def __init__(self, lr, weight_decay):
        self.lr = lr
        self.weight_decay = weight_decay

    def compute_step(self, grads, params):
        step = list()
        # flatten all gradients into one long vector
        flatten_grads = np.concatenate(
            [np.ravel(v) for grad in grads for v in grad.values()])
        # let the concrete optimizer compute the step for the whole vector
        flatten_step = self._compute_step(flatten_grads)

        # cut the flat step back into per-layer, per-parameter blocks
        p = 0
        for param in params:
            layer = dict()
            for k, v in param.items():
                block = np.prod(v.shape)
                _step = flatten_step[p:p + block].reshape(v.shape)
                _step -= self.weight_decay * v
                layer[k] = _step
                p += block
            step.append(layer)
        return step

    def _compute_step(self, grads):
        raise NotImplementedError
```
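The final script below uses an Adam optimizer, whose implementation is not shown here; any concrete optimizer only has to override _compute_step. As a minimal illustrative sketch (not the article's Adam implementation), a plain SGD optimizer could look like this; note that the returned step already carries the minus sign, so Model.apply_grad can simply add it to the parameters:

```python
class SGD(BaseOptimizer):
    # Minimal concrete optimizer: vanilla gradient descent.
    def __init__(self, lr=0.01, weight_decay=0.0):
        super().__init__(lr, weight_decay)

    def _compute_step(self, grads):
        # step = -lr * grad, applied to the whole flattened gradient vector
        return -self.lr * grads
```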
For the concrete loss we use the common multi-class loss, softmax cross-entropy, whose mathematical form is

$$ cross(y_{true}, y_{pred}) = -\sum_{i=1}^{N} y_{true}(i) \times \log\big(y_{pred}(i)\big) $$

Its implementation is:
```python
import numpy as np


class SoftmaxCrossEntropy(BaseLoss):
    def loss(self, predicted, actual):
        m = predicted.shape[0]
        # numerically stable softmax over the logits
        exps = np.exp(predicted - np.max(predicted, axis=1, keepdims=True))
        p = exps / np.sum(exps, axis=1, keepdims=True)
        # negative log-likelihood of the true classes
        nll = -np.log(np.sum(p * actual, axis=1))
        return np.sum(nll) / m

    def grad(self, predicted, actual):
        m = predicted.shape[0]
        # gradient w.r.t. the logits: softmax(predicted) - actual
        exps = np.exp(predicted - np.max(predicted, axis=1, keepdims=True))
        p = exps / np.sum(exps, axis=1, keepdims=True)
        return (p - actual) / m
```
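A quick numeric check on a single three-class example (values are approximate):

```python
import numpy as np

ce = SoftmaxCrossEntropy()
logits = np.array([[2.0, 1.0, 0.1]])
labels = np.array([[1.0, 0.0, 0.0]])   # one-hot target: class 0

print(ce.loss(logits, labels))         # ~0.417, i.e. -log(softmax(logits)[0])
print(ce.grad(logits, labels))         # ~[[-0.341  0.242  0.099]]
```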
The overall Model class

The Model class implements the three interfaces we designed at the beginning. The forward method simply calls the net's forward; the backward method wires net, loss and optimizer together: it first computes the loss, then back-propagates to obtain the gradients, lets the optimizer compute the step for each parameter, and finally updates the parameters through apply_grad:
```python
class Model(object):
    def __init__(self, net, loss, optimizer):
        self.net = net
        self.loss = loss
        self.optimizer = optimizer

    def forward(self, inputs):
        return self.net.forward(inputs)

    def backward(self, preds, targets):
        loss = self.loss.loss(preds, targets)
        grad = self.loss.grad(preds, targets)
        grads = self.net.backward(grad)
        params = self.net.get_parameters()
        step = self.optimizer.compute_step(grads, params)
        return loss, step

    def apply_grad(self, grads):
        # the steps returned by the optimizer already contain the sign,
        # so they are simply added to the parameters
        for grad, (param, _) in zip(grads, self.net.get_params_and_grads()):
            for k, v in param.items():
                param[k] += grad[k]
```
Final implementation

```python
import numpy as np

import layer, model, net, loss, optimizer


def get_one_hot(targets, nb_classes=10):
    return np.eye(nb_classes)[np.array(targets).reshape(-1)]


# load MNIST and flatten the images to 784-dimensional vectors
train_x = np.load("../MNIST/mnist/mnist/x_train.npy")
train_x = np.reshape(train_x, [60000, 784])
train_y = get_one_hot(np.load("../MNIST/mnist/mnist/y_train_label.npy"))

net = net.Net([
    layer.Dense(200),
    layer.ReLU(),
    layer.Dense(100),
    layer.ReLU(),
    layer.Dense(70),
    layer.ReLU(),
    layer.Dense(30),
    layer.ReLU(),
    layer.Dense(10),
])
model = model.Model(net=net, loss=loss.SoftmaxCrossEntropy(),
                    optimizer=optimizer.Adam(lr=2e-4))

train_num = 60000 // 128
for epoch in range(20):
    for i in range(train_num):
        # slice out the next mini-batch of 128 samples
        start = i * 128
        end = (i + 1) * 128
        inputs = train_x[start:end]
        targets = train_y[start:end]

        pred = model.forward(inputs)
        batch_loss, grads = model.backward(pred, targets)
        model.apply_grad(grads)

        if (i + 1) % 10 == 0:
            # accuracy on the current batch
            test_pred = model.forward(inputs)
            test_pred_idx = np.argmax(test_pred, axis=1)
            real_pred_idx = np.argmax(targets, axis=1)
            counter = 0
            for pre, rel in zip(test_pred_idx, real_pred_idx):
                if pre == rel:
                    counter += 1
            print("train_loss:", round(batch_loss, 2),
                  "accuracy:", round(counter / 128, 2))
```