V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
monkeyNik
V2EX  ›  分享创造

开源 C 库实现 HTTP 服务器:多线程+事件模型+外挂式跟踪技术

  •  
  •   monkeyNik ·
    Water-Melon · 2023-01-08 00:52:21 +08:00 · 547 次点击
    这是一个创建于 490 天前的主题,其中的信息可能已经有所发展或是发生改变。

    本文主要介绍 Melon 库中的一种跟踪技术,并以一个 HTTP 服务器的实现和使用为例进行说明。

    关于 Melon 库,这是一个开源的 C 语言库,这个库不依赖其他开源第三方库,因此安装方便,开箱即用。并且中英文文档详细,便于作为工具书进行查阅。Github 仓库:传送门

    闲话少叙,我们直接上代码:

    #include <stdio.h>
    #include <stdlib.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <errno.h>
    #include <arpa/inet.h>
    #include <sys/time.h>
    #include "mln_core.h"
    #include "mln_log.h"
    #include "mln_http.h"
    #include "mln_iothread.h"
    #include "mln_trace.h"
    
    
    static void *mln_thread_entry(void *args);
    static void mln_accept(mln_event_t *ev, int fd, void *data);
    static int mln_http_recv_body_handler(mln_http_t *http, mln_chain_t **in, mln_chain_t **nil);
    static void mln_recv(mln_event_t *ev, int fd, void *data);
    static void mln_quit(mln_event_t *ev, int fd, void *data);
    static void mln_send(mln_event_t *ev, int fd, void *data);
    static int mln_http_send_body_handler(mln_http_t *http, mln_chain_t **body_head, mln_chain_t **body_tail);
    
    static void mln_accept(mln_event_t *ev, int fd, void *data)
    {
        mln_tcp_conn_t *connection;
        mln_http_t *http;
        int connfd;
        socklen_t len;
        struct sockaddr_in addr;
    
        while (1) {
            memset(&addr, 0, sizeof(addr));
            len = sizeof(addr);
            connfd = accept(fd, (struct sockaddr *)&addr, &len);
            if (connfd < 0) {
                if (errno == EAGAIN) break;
                if (errno == EINTR) continue;
                perror("accept");
                exit(1);
            }
    
            connection = (mln_tcp_conn_t *)malloc(sizeof(mln_tcp_conn_t));
            if (connection == NULL) {
                fprintf(stderr, "3No memory.\n");
                close(connfd);
                continue;
            }
            if (mln_tcp_conn_init(connection, connfd) < 0) {
                fprintf(stderr, "4No memory.\n");
                close(connfd);
                free(connection);
                continue;
            }
    
            http = mln_http_init(connection, NULL, mln_http_recv_body_handler);
            if ( http == NULL) {
                fprintf(stderr, "5No memory.\n");
                mln_tcp_conn_destroy(connection);
                free(connection);
                close(connfd);
                continue;
            }
    
            if (mln_event_fd_set(ev, \
                                 connfd, \
                                 M_EV_RECV|M_EV_NONBLOCK, \
                                 M_EV_UNLIMITED, \
                                 http, \
                                 mln_recv) < 0)
            {
                fprintf(stderr, "6No memory.\n");
                mln_http_destroy( http);
                mln_tcp_conn_destroy(connection);
                free(connection);
                close(connfd);
                continue;
            }
        }
    }
    
    static void mln_quit(mln_event_t *ev, int fd, void *data)
    {
        mln_http_t *http = (mln_http_t *)data;
        mln_tcp_conn_t *connection = mln_http_get_connection( http);
    
        mln_event_fd_set(ev, fd, M_EV_CLR, M_EV_UNLIMITED, NULL, NULL);
        mln_http_destroy( http);
        mln_tcp_conn_destroy(connection);
        free(connection);
        close(fd);
    }
    
    static void mln_recv(mln_event_t *ev, int fd, void *data)
    {
        mln_http_t *http = (mln_http_t *)data;
        mln_tcp_conn_t *connection = mln_http_get_connection( http);
        int ret, rc;
        mln_chain_t *c;
    
        while (1) {
            ret = mln_tcp_conn_recv(connection, M_C_TYPE_MEMORY);
            if (ret == M_C_FINISH) {
                continue;
            } else if (ret == M_C_NOTYET) {
                c = mln_tcp_conn_remove(connection, M_C_RECV);
                if (c != NULL) {
                    rc = mln_http_parse( http, &c);
                    if (c != NULL) {
                        mln_tcp_conn_append_chain(connection, c, NULL, M_C_RECV);
                    }
                    if (rc == M_HTTP_RET_OK) {
                        return;
                    } else if (rc == M_HTTP_RET_DONE) {
                        mln_send(ev, fd, data);
                    } else {
                        fprintf(stderr, "Http parse error. error_code:%u\n", mln_http_get_error( http));
                        mln_quit(ev, fd, data);
                        return;
                    }
                }
                break;
            } else if (ret == M_C_CLOSED) {
                c = mln_tcp_conn_remove(connection, M_C_RECV);
                if (c != NULL) {
                    rc = mln_http_parse( http, &c);
                    if (c != NULL) {
                        mln_tcp_conn_append_chain(connection, c, NULL, M_C_RECV);
                    }
                    if (rc == M_HTTP_RET_ERROR) {
                        fprintf(stderr, "Http parse error. error_code:%u\n", mln_http_get_error( http));
                    }
                }
                mln_quit(ev, fd, data);
                return;
            } else if (ret == M_C_ERROR) {
                mln_quit(ev, fd, data);
                return;
            }
        }
    }
    
    static int mln_http_recv_body_handler(mln_http_t *http, mln_chain_t **in, mln_chain_t **nil)
    {
        mln_u32_t method = mln_http_get_method( http);
        mln_trace("s", method == M_HTTP_GET? "GET": "OTHERS");
        if (method == M_HTTP_GET)
            return M_HTTP_RET_DONE;
        mln_http_set_error( http, M_HTTP_NOT_IMPLEMENTED);
        return M_HTTP_RET_ERROR;
    }
    
    static void mln_send(mln_event_t *ev, int fd, void *data)
    {
        mln_http_t *http = (mln_http_t *)data;
        mln_tcp_conn_t *connection = mln_http_get_connection( http);
        mln_chain_t *c = mln_tcp_conn_get_head(connection, M_C_SEND);
        int ret;
    
        if (c == NULL) {
            mln_http_reset( http);
            mln_http_set_status( http, M_HTTP_OK);
            mln_http_set_version( http, M_HTTP_VERSION_1_0);
            mln_http_set_type( http, M_HTTP_RESPONSE);
            mln_http_set_handler( http, mln_http_send_body_handler);
            mln_chain_t *body_head = NULL, *body_tail = NULL;
            if (mln_http_generate( http, &body_head, &body_tail) == M_HTTP_RET_ERROR) {
                fprintf(stderr, "mln_http_generate() failed. %u\n", mln_http_get_error( http));
                mln_quit(ev, fd, data);
                return;
            }
            mln_tcp_conn_append_chain(connection, body_head, body_tail, M_C_SEND);
        }
    
        while ((c = mln_tcp_conn_get_head(connection, M_C_SEND)) != NULL) {
            ret = mln_tcp_conn_send(connection);
            if (ret == M_C_FINISH) {
                mln_quit(ev, fd, data);
                break;
            } else if (ret == M_C_NOTYET) {
                mln_chain_pool_release_all(mln_tcp_conn_remove(connection, M_C_SENT));
                mln_event_fd_set(ev, fd, M_EV_SEND|M_EV_APPEND|M_EV_NONBLOCK, M_EV_UNLIMITED, data, mln_send);
                return;
            } else if (ret == M_C_ERROR) {
                mln_quit(ev, fd, data);
                return;
            } else {
                fprintf(stderr, "Shouldn't be here.\n");
                abort();
            }
        }
    }
    
    static int mln_http_send_body_handler(mln_http_t *http, mln_chain_t **body_head, mln_chain_t **body_tail)
    {
        mln_u8ptr_t buf;
        mln_alloc_t *pool = mln_http_get_pool( http);
        mln_string_t cttype_key = mln_string("Content-Type");
        mln_string_t cttype_val = mln_string("text/html");
    
        buf = mln_alloc_m(pool, 5);
        if (buf == NULL) {
            mln_http_set_error( http, M_HTTP_INTERNAL_SERVER_ERROR);
            return M_HTTP_RET_ERROR;
        }
        memcpy(buf, "hello", 5);
    
        if (mln_http_set_field( http, &cttype_key, &cttype_val) == M_HTTP_RET_ERROR) {
            mln_http_set_error( http, M_HTTP_INTERNAL_SERVER_ERROR);
            return M_HTTP_RET_ERROR;
        }
    
        mln_string_t ctlen_key = mln_string("Content-Length");
        mln_string_t ctlen_val = mln_string("5");
        if (mln_http_set_field( http, &ctlen_key, &ctlen_val) == M_HTTP_RET_ERROR) {
            mln_http_set_error( http, M_HTTP_INTERNAL_SERVER_ERROR);
            return M_HTTP_RET_ERROR;
        }
    
        mln_chain_t *c = mln_chain_new(pool);
        if (c == NULL) {
            mln_http_set_error( http, M_HTTP_INTERNAL_SERVER_ERROR);
            return M_HTTP_RET_ERROR;
        }
        mln_buf_t *b = mln_buf_new(pool);
        if (b == NULL) {
            mln_chain_pool_release(c);
            mln_http_set_error( http, M_HTTP_INTERNAL_SERVER_ERROR);
            return M_HTTP_RET_ERROR;
        }
        c->buf = b;
        b->left_pos = b->pos = b->start = buf;
        b->last = b->end = buf + 5;
        b->in_memory = 1;
        b->last_buf = 1;
        b->last_in_chain = 1;
    
        if (*body_head == NULL) {
            *body_head = *body_tail = c;
        } else {
            (*body_tail)->next = c;
            *body_tail = c;
        }
    
        return M_HTTP_RET_DONE;
    }
    
    static void *mln_thread_entry(void *args)
    {
        mln_event_dispatch((mln_event_t *)args);
        return NULL;
    }
    
    int main(int argc, char *argv[])
    {
        mln_event_t *ev;
        mln_iothread_t t;
        struct sockaddr_in addr;
        int val = 1, listenfd;
        int nthread = 1;
        mln_u16_t port = 1234;
        mln_s8_t ip[] = "0.0.0.0";
        struct mln_core_attr cattr;
        struct mln_iothread_attr tattr;
    
        /* init library */
        cattr.argc = 0;
        cattr.argv = NULL;
        cattr.global_init = NULL;
        cattr.master_process = NULL;
        cattr.worker_process = NULL;
        mln_core_init(&cattr);
    
        /* create event instance */
        if ((ev = mln_event_new()) == NULL) {
            mln_log(error, "event new error.\n");
            return -1;
        }
    
        /* set listen fd */
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
        if (listenfd < 0) {
            mln_log(error, "listen socket error\n");
            return -1;
        }
        if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
            mln_log(error, "setsockopt error\n");
            return -1;
        }
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip);
        if (bind(listenfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
            mln_log(error, "bind error\n");
            return -1;
        }
        if (listen(listenfd, 511) < 0) {
            mln_log(error, "listen error\n");
            return -1;
        }
        if (mln_event_fd_set(ev, \
                             listenfd, \
                             M_EV_RECV|M_EV_NONBLOCK, \
                             M_EV_UNLIMITED, \
                             NULL, \
                             mln_accept) < 0)
        {
            mln_log(error, "listen sock set event error\n");
            return -1;
        }
    
        /* init trace */
        if (mln_trace_init(ev, mln_trace_path()) < 0) {
            mln_log(error, "trace init failed.\n");
            return -1;
        }
    
        /* create threads */
        tattr.nthread = nthread;
        tattr.entry = (mln_iothread_entry_t)mln_thread_entry;
        tattr.args = ev;
        tattr.handler = NULL;
        if (mln_iothread_init(&t, &tattr) < 0) {
            mln_log(error, "iothread init failed\n");
            return -1;
        }
    
        mln_event_dispatch(ev);
        return 0;
    }
    

    在这 329 行代码中,我们使用 Melon 库实现了一个简单的 HTTP1.1 服务器,这个服务器的行为就是,当收到 GET 请求后会回复一个 200 Hello 。

    在这个例子中,我们起了两个线程(主线程+1 个工作线程),两个线程都会操作同一个事件对象(ev)。而我们的 HTTP 服务器的 socket 则是依赖于这个事件对象ev的。换言之,事件对象是可以跨线程调度的,且事件模型帮我们屏蔽了惊群的问题。

    这里其实可以只用单线程,只是为了给读者演示和证明事件模型是支持多线程的。

    接着,我们可以看到mln_trace_init这个函数在main中被调用,用于初始化跟踪( tracing )任务。

    标题中之所以称为外挂式,原因是因为,跟踪( tracing )数据是由一个额外的脚本任务进行处理的。虽然脚本任务也依赖于事件模型,但脚本任务不会阻止 socket 事件的触发和处理。简单来说,所有的句柄、定时器事件与脚本运行是分时处理的,且不会因为脚本中存在死循环而导致其余事件无法被处理。此外,如果我们不启用跟踪脚本,或者不配置跟踪配置项,那么程序是不会投递跟踪信息的(见mln_accept中的mln_trace调用处)。

    trace 脚本使用的在 Melon 库中内置的脚本语言Melang,我们看一下与本例配套的 Melang 脚本:

    sys = Import('sys');
    
    Pipe('subscribe');
    beg = sys.time();
    sum = [];
    while (1) {
        ret = Pipe('recv');
        if (!ret) {
           now = sys.time();
           if (now - beg >= 3) {
               sys.print(sum);
               beg = now;
           } fi
           sys.msleep(10);
        } else {
           n = sys.size(ret);
           for (i = 0; i < n; ++i) {
               if (!(sys.has(sum, ret[i][0])))
                   sum[ret[i][0]] = 0;
               fi
               sum[ret[i][0]]++;
           }
        }
    }
    Pipe('unsubscribe');
    

    简单描述一下:我们会对所有的请求按方法(method)的不同进行归类统计,统计不同方法的请求总次数,并每 3 秒输出一次统计结果。

    接着,我们要修改 Melon 的配置文件,将trace_mode配置项注释去掉。

    注意:如果你是在 Melon 源码目录中启动 http 服务器,那么应该修改的是Melon/trace/trace.m文件。否则,应该修改/usr/local/lib/melang/trace/trace.m文件,因为解释器会优先查看当前路径下是否有该文件。

    下面,我们来运行程序:

    #tty1
    $ ./a
    
    #tty2
    $ ab -c 100 -n 100 http://127.1:1234/
    

    此时,我们可以看到 tty1 中输出:

    ...
    []
    []
    [100, ]
    ...
    

    并且随着ab被多次调用,数组中的值也会随之增加。

    最后,总结一下这样做的好处。

    1. 采用这种跟踪模式可以按需开启跟踪和关闭跟踪,甚至可以在程序中调用 trace 模块接口实现动态开启和关闭跟踪模式。
    2. Melang 脚本本身也支持与 MySQL 通信,因此可以在脚本层进行跟踪信息汇总和入库,这样就将统计与功能相分离,完全不需要在 C 程序中加入大量统计变量。
    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3052 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 34ms · UTC 00:07 · PVG 08:07 · LAX 17:07 · JFK 20:07
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.