A simple neural network hand-written in 150 lines of numpy

2019-07-12 14:22:54 +08:00
 hplar
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from numpy import linalg as LA

class Activation:
    # Convention: f(x) computes the activation; grad(y, dy) takes the
    # activation output y = f(x) and the upstream gradient dy, and
    # returns the gradient with respect to the pre-activation input.
    def f(self, x, **kwargs):
        raise NotImplementedError("Subclasses must implement f")

    def grad(self, y, dy):
        raise NotImplementedError("Subclasses must implement grad")

class Linear(Activation):
    def f(self, x):
        return x

    def grad(self, y, dy):
        return dy

class Sigmoid(Activation):
    def f(self, x):
        return 1/(1+np.exp(-x))

    def grad(self, y, dy):
        return y*(1-y)*dy

class Relu(Activation):
    def f(self, x):
        return x*(x>0)

    def grad(self, y, dy):
        return dy*(y>0)

class Softmax(Activation):
    def f(self, x, axis=1):
        # subtract the row-wise max for numerical stability
        x = x-np.max(x, axis=axis, keepdims=True)
        return np.exp(x)/np.sum(np.exp(x), axis=axis, keepdims=True)

    def grad(self, y, dy):
        # full softmax Jacobian-vector product; with the cross-entropy
        # gradient dy = -(1/N)*(t/y) and one-hot targets t, this
        # reduces to the familiar (y-t)/N
        return y*(dy-np.sum(y*dy, axis=1, keepdims=True))

class Dense:
    activation_map = {
        'relu': Relu,
        'softmax': Softmax,
        'sigmoid': Sigmoid,
        'linear': Linear,
    }
    def __init__(self, output_dim, input_dim=0, activation='relu'):
        self.output_dim = output_dim
        self.input_dim = input_dim
        if activation in self.activation_map:
            self.activation = self.activation_map[activation]()
        else:
            raise Exception('activation %s not implemented' % activation)

    def initialize_parameter(self):
        # Glorot-style scaling: normal draws scaled by sqrt(6/(fan_in+fan_out))
        self.w = np.random.randn(self.input_dim, self.output_dim)*np.sqrt(6/(self.input_dim+self.output_dim))
        self.b = np.zeros((1, self.output_dim))

    def initialize_optimizer(self, optimizer, l_rate):
        self.optimizer = optimizer
        self.l_rate = l_rate
        if self.optimizer == 'adam':
            # Adam state: step counter t, first/second moment estimates
            # (s, r) for w and b; rho1/rho2 are the moment decay rates,
            # delta the numerical-stability constant
            self.t, self.s_w, self.r_w, self.s_b, self.r_b = 0, 0, 0, 0, 0
            self.rho1, self.rho2, self.delta = 0.9, 0.999, 1e-8
        else:
            raise Exception('optimizer %s not implemented' % self.optimizer)

    def forward(self, x):
        self.x = x
        self.h = np.dot(self.x, self.w)+self.b
        self.a = self.activation.f(self.h)
        return self.a

    def backward(self, da):
        self.da = da
        self.dh = self.activation.grad(self.a, self.da)
        self.dw = np.dot(self.x.T, self.dh)
        # da already carries the 1/N factor from the loss gradient, so
        # db is a plain column sum, consistent with dw (the original
        # divided by N a second time here)
        self.db = np.sum(self.dh, axis=0, keepdims=True)
        self.dx = np.dot(self.dh, self.w.T)
        return self.dx

    def update_parameter(self):
        if self.optimizer == 'adam':
            self.t = self.t+1
            # biased first/second moment estimates for w ...
            self.s_w = self.rho1*self.s_w+(1-self.rho1)*self.dw
            self.r_w = self.rho2*self.r_w+(1-self.rho2)*(self.dw**2)
            # ... with bias correction, then the Adam step
            s_w_ = self.s_w/(1-self.rho1**self.t)
            r_w_ = self.r_w/(1-self.rho2**self.t)
            self.w = self.w-self.l_rate*s_w_/(np.sqrt(r_w_)+self.delta)
            # identical update for b
            self.s_b = self.rho1*self.s_b+(1-self.rho1)*self.db
            self.r_b = self.rho2*self.r_b+(1-self.rho2)*(self.db**2)
            s_b_ = self.s_b/(1-self.rho1**self.t)
            r_b_ = self.r_b/(1-self.rho2**self.t)
            self.b = self.b-self.l_rate*s_b_/(np.sqrt(r_b_)+self.delta)
        else:
            raise Exception('optimizer %s not implemented' % self.optimizer)
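
# Not part of the original post: a quick numerical gradient check
# against Dense.backward, useful to convince yourself the layer math
# above is right. It assumes only the Dense class defined above.
def gradient_check(eps=1e-5):
    layer = Dense(4, input_dim=3, activation='sigmoid')
    layer.initialize_parameter()
    x = np.random.randn(5, 3)
    # scalar loss L = sum of outputs, so the upstream gradient is all ones
    layer.forward(x)
    layer.backward(np.ones((5, 4)))
    num_dw = np.zeros_like(layer.w)
    for i in range(layer.w.shape[0]):
        for j in range(layer.w.shape[1]):
            # centered difference: perturb w[i, j] up and down by eps
            layer.w[i, j] += eps
            up = np.sum(layer.forward(x))
            layer.w[i, j] -= 2*eps
            down = np.sum(layer.forward(x))
            layer.w[i, j] += eps
            num_dw[i, j] = (up-down)/(2*eps)
    print('max |analytic - numeric| for dw:', np.abs(layer.dw-num_dw).max())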

class Sequential:
    def __init__(self):
        self.layers = []
        self.loss = 'categorical_crossentropy'
        self.optimizer = 'adam'

    def add(self, layer):
        self.layers.append(layer)

    def compile(self, loss='categorical_crossentropy', optimizer='adam', l_rate=0.001):
        self.loss = loss
        self.optimizer = optimizer
        for idx in range(len(self.layers)-1):
            self.layers[idx+1].input_dim = self.layers[idx].output_dim
        for layer in self.layers:
            layer.initialize_optimizer(optimizer, l_rate)

    def forward_propagation(self, x, y):
        for layer in self.layers:
            a = layer.forward(x)
            x = a
        if self.loss == 'categorical_crossentropy':
            # small constant guards against log(0)
            loss = -(1/y.shape[0])*np.sum(np.log(a+1e-12)*y)
        elif self.loss == 'mse':
            loss = 0.5*(1/y.shape[0])*np.square(LA.norm(a-y))
        else:
            raise Exception('loss %s not implemented' % self.loss)
        return a, loss

    def backward_propagation(self, a, y):
        if self.loss == 'categorical_crossentropy':
            da = -(1/y.shape[0])*(y/a)
        elif self.loss == 'mse':
            da = (1/y.shape[0])*(a-y)
        else:
            raise Exception('loss %s not implemented' % self.loss)
        for layer in self.layers[::-1]:
            da = layer.backward(da)
            layer.update_parameter()

    def fit(self, x, y, epochs=10, batch_size=200):
        for layer in self.layers:
            layer.initialize_parameter()
        # samples beyond the last full batch are dropped; no shuffling is done
        batch_count = x.shape[0]//batch_size
        for i in range(epochs):
            for j in range(batch_count):
                start, end = j*batch_size, (j+1)*batch_size
                a, _ = self.forward_propagation(x[start:end], y[start:end])
                self.backward_propagation(a, y[start:end])
            _, loss = self.forward_propagation(x, y)
            print("epoch %d/%d: loss %f" % (i+1, epochs, loss))

    def print_parameters(self):
        for idx, layer in enumerate(self.layers):
            print('layer %d parameters:' % (idx+1))
            print(layer.w, layer.b)

# A simple linear regression demo
if __name__ == '__main__':
    w, b = np.array([[1.0], [2.0], [3.0]]), 5
    x = np.random.randn(300, 3)*100
    noise = np.random.randn(300, 1)*0.1
    y = np.dot(x, w)+noise+b
    model = Sequential()
    model.add(Dense(1, input_dim=3, activation='linear'))
    model.compile(loss='mse', optimizer='adam')
    model.fit(x, y, epochs=5000, batch_size=100)
    model.print_parameters()
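
    # The demo above only exercises the linear/mse path. As my own
    # hedged sketch (not in the original post), the softmax plus
    # categorical_crossentropy path could be driven like this, with a
    # hypothetical classification_demo on synthetic three-blob data.
    def classification_demo():
        n, k = 300, 3
        centers = np.array([[0.0, 0.0], [4.0, 4.0], [0.0, 4.0]])
        x = np.vstack([np.random.randn(n, 2)+c for c in centers])
        labels = np.repeat(np.arange(k), n)
        perm = np.random.permutation(x.shape[0])  # shuffle so batches mix classes
        x, labels = x[perm], labels[perm]
        y = np.eye(k)[labels]                     # one-hot targets
        model = Sequential()
        model.add(Dense(16, input_dim=2, activation='relu'))
        model.add(Dense(k, activation='softmax'))
        model.compile(loss='categorical_crossentropy', optimizer='adam')
        model.fit(x, y, epochs=50, batch_size=100)
        pred = np.argmax(model.forward_propagation(x, y)[0], axis=1)
        print('train accuracy:', np.mean(pred == labels))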
4 replies
2pen
2019-07-12 14:29:41 +08:00
Can't understand it. Bye.
shyrock
2019-07-12 15:35:32 +08:00
Nice. Well suited for teaching.
owenliang
2019-07-12 16:06:41 +08:00
It's alright. There's a thin book that explains this very clearly; after reading it you can implement one yourself. See my blog:

yuerblog.cc/2019/04/14/%e6%89%8b%e5%86%99python%e7%a5%9e%e7%bb%8f%e7%bd%91%e7%bb%9c/
bwael
2019-07-12 21:51:53 +08:00
@owenliang Great book, seconded.
