本文总阅读量 本站访客数人次 本站总访问量
gongluck's blog
C/C++ Golang 音视频流媒体
网络服务专题

五、网络服务专题

0.项目仓库

1.Reactor反应堆

1.1 Reactor模型

Reactor模型

  • Reactor释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰 相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。

  • Reactor模式是处理并发I/O比较常见的一种模式,用于同步I/O,中心思想是将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。

  • Reactor模型有三个重要的组件:

    Reactor组件

    • 多路复用器:由操作系统提供,在linux上一般是select、poll、epoll等系统调用。
    • 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。
    • 事件处理器:负责处理特定事件的处理函数。

1.2 Reactor实现

/*
 * @Author: gongluck 
 * @Date: 2020-11-26 09:41:40 
 * @Last Modified by: gongluck
 * @Last Modified time: 2020-11-26 16:58:43
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>

#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

#define BUFFER_LENGTH 4096
#define MAX_EPOLL_EVENTS 1024
#define SERVER_PORT 8888

typedef int NCALLBACK(int, int, void *);

struct ntyevent
{
    int fd;
    int events;
    void *arg;
    int (*callback)(int fd, int events, void *arg);

    int status;//是否已经添加到epoll中
    char buffer[BUFFER_LENGTH];
    int length;
    long last_active;
};

struct ntyreactor
{
    int epfd;
    struct ntyevent events[MAX_EPOLL_EVENTS];
};

void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg)
{
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
    ev->last_active = time(NULL);

    return;
}

int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;

    int op;
    if (ev->status == 1)
    {
        op = EPOLL_CTL_MOD;
    }
    else
    {
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    }

    if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0)
    {
        printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
        return -1;
    }

    return 0;
}

int nty_event_del(int epfd, struct ntyevent *ev)
{
    struct epoll_event ep_ev = {0, {0}};

    if (ev->status != 1)
    {
        return -1;
    }

    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);

    return 0;
}

int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    struct ntyevent *ev = reactor->events + fd;

    int len = send(fd, ev->buffer, ev->length, 0);
    if (len > 0)
    {
        printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);

        nty_event_del(reactor->epfd, ev);
        nty_event_set(ev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ev);
    }
    else
    {
        close(ev->fd);

        nty_event_del(reactor->epfd, ev);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }

    return len;
}

int recv_cb(int fd, int events, void *arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    struct ntyevent *ev = reactor->events + fd;

    int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);
    nty_event_del(reactor->epfd, ev);
    if (len > 0)
    {
        ev->length = len;
        ev->buffer[len] = '\0';

        printf("C[%d]:%s\n", fd, ev->buffer);

        nty_event_set(ev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ev);
    }
    else if (len == 0)
    {
        close(ev->fd);
        printf("[fd=%d] pos[%ld], closed\n", fd, ev - reactor->events);
    }
    else
    {
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }

    return len;
}

int accept_cb(int fd, int events, void *arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    if (reactor == NULL)
        return -1;

    struct sockaddr_in client_addr = {0};
    socklen_t len = sizeof(client_addr);

    int clientfd;
    if ((clientfd = accept(fd, (struct sockaddr *)&client_addr, &len)) == -1)
    {
        if (errno != EAGAIN && errno != EINTR)
        {
        }
        printf("accept: %s\n", strerror(errno));
        return -1;
    }

    int i = 0;
    do
    {
        for (i = 0; i < MAX_EPOLL_EVENTS; i++)
        {
            if (reactor->events[i].status == 0)
            {
                break;
            }
        }
        if (i == MAX_EPOLL_EVENTS)
        {
            printf("%s: max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);
            break;
        }

        if (fcntl(clientfd, F_SETFL, O_NONBLOCK) < 0)
        {
            printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
            break;
        }

        nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
    } while (0);

    printf("new connect [%s:%d][time:%ld], pos[%d]\n",
           inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);

    return 0;
}

int init_sock(short port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    fcntl(fd, F_SETFL, O_NONBLOCK);

    struct sockaddr_in server_addr = {0};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);

    bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
    if (listen(fd, 20) < 0)
    {
        printf("listen failed : %s\n", strerror(errno));
    }

    return fd;
}

int ntyreactor_init(struct ntyreactor *reactor)
{
    if (reactor == NULL)
        return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));

    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0)
    {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }
}

int ntyreactor_destory(struct ntyreactor *reactor)
{
    close(reactor->epfd);
    
    return 0;
}

int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor)
{
    if (reactor == NULL)
        return -1;

    nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);

    return 0;
}

int ntyreactor_run(struct ntyreactor *reactor)
{
    if (reactor == NULL)
        return -1;
    if (reactor->epfd < 0)
        return -1;

    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    int checkpos = 0, i;
    while (1)
    {
        long now = time(NULL);
        for (i = 0; i < MAX_EPOLL_EVENTS/10; i++, checkpos++)
        {
            if (checkpos == MAX_EPOLL_EVENTS)
            {
                checkpos = 0;
            }

            if (reactor->events[checkpos].status != 1)
            {
                continue;
            }

            long duration = now - reactor->events[checkpos].last_active;
            if (duration >= 60)
            {
                close(reactor->events[checkpos].fd);
                printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
                nty_event_del(reactor->epfd, &reactor->events[checkpos]);
            }
        }

        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0)
        {
            printf("epoll_wait error, exit\n");
            continue;
        }

        for (i = 0; i < nready; i++)
        {
            struct ntyevent *ev = (struct ntyevent *)events[i].data.ptr;
            if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
            {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
            if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
            {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
        }
    }
}

int main()
{
    unsigned short port = SERVER_PORT;
    int sockfd = init_sock(port);

    struct ntyreactor reactor;
    ntyreactor_init(&reactor);

    ntyreactor_addlistener(&reactor, sockfd, accept_cb);
    ntyreactor_run(&reactor);

    ntyreactor_destory(&reactor);
    close(sockfd);

    return 0;
}

2.日志模块

2.1 log4cpp模块

log4cpp模块图

  • log4cpp有且只⼀个根Category,可以有多个⼦Category组成树型结构。

    log4cpp的appender种类

  • Appender负责将⽇志写⼊相应的设备,⽐如控制台、⽂件、调试器、Windows⽇志、syslog等。

  • Layout控制输出⽇志的显示样式。Log4cpp内置了4种Layout:

    • PassThroughLayout:直通布局。
    • SimpleLayout:简单布局。它只会为你添加“优先级”的输出。
    • BasicLayout:基本布局。它会为你添加“时间”、“优先级”、“种类”、“NDC”。
    • PatternLayout:格式化布局。它的使⽤⽅式类似C语⾔中的printf,使⽤格式化它符串来描述输出格式。

2.2 log4cpp编译安装

wget https://sourceforge.net/projects/log4cpp/files/log4cpp-1.1.x%20%28new%29/log4cpp-1.1/log4cpp-1.1.3.tar.gz
tar zxf log4cpp-1.1.3.tar.gz
cd log4cpp
./configure
make -j 8
sudo make install
sudo ldconfig

2.3 log4cpp例子

  • 例子1

    /*
     * @Author: gongluck 
     * @Date: 2020-11-27 21:12:58 
     * @Last Modified by: gongluck
     * @Last Modified time: 2020-11-27 21:30:54
     */
    
    // g++ log1.cpp -llog4cpp -lpthread
    
    #include "log4cpp/Category.hh"
    #include "log4cpp/FileAppender.hh"
    #include "log4cpp/OstreamAppender.hh"
    #include "log4cpp/BasicLayout.hh"
    
    int main()
    {
        // 实例化一个layout对象
        log4cpp::Layout *layout = new log4cpp::BasicLayout();
        // 初始化一个appender对象
        log4cpp::Appender *appender = new log4cpp::FileAppender("FileAppender","./log1.log");
        log4cpp::Appender *osappender = new log4cpp::OstreamAppender("OstreamAppender",&std::cout);
        // 把layout对象附着在appender对象上,一个layout格式样式对应一个appender
        appender->setLayout(layout);
        // 实例化一个category对象
        log4cpp::Category &warn_log = log4cpp::Category::getInstance("gongluck"); // 单例工厂
        // 设置additivity为false,替换已有的appender,不继承父类appender
        warn_log.setAdditivity(false);
        // 把appender对象附到category上
        warn_log.setAppender(appender);
        warn_log.addAppender(osappender);
        // 设置category的优先级,低于此优先级的日志不被记录
        warn_log.setPriority(log4cpp::Priority::INFO);
        // 记录一些日志
        warn_log.info("Program info which cannot be wirten");
        warn_log.debug("This debug message will fail to write");
        warn_log.alert("Alert info");
        // 其他记录日志方式
        warn_log.log(log4cpp::Priority::WARN, "This will be a logged warning");
    
        log4cpp::Priority::PriorityLevel priority = log4cpp::Priority::DEBUG;
        warn_log.log(priority, "Importance depends on context");
        warn_log.critStream() << "This will show up << as " << 1 << " critical message";
        // clean up and flush all appenders
        log4cpp::Category::shutdown();
        return 0;
    }
    
  • 例子2

    /*
     * @Author: gongluck 
     * @Date: 2020-11-27 21:31:03 
     * @Last Modified by: gongluck
     * @Last Modified time: 2020-11-27 21:44:29
     */
      
    // g++ log2.cpp -llog4cpp -lpthread
      
    #include "log4cpp/Category.hh"
    #include "log4cpp/PropertyConfigurator.hh"
      
    int main()
    {
        try
        {
            log4cpp::PropertyConfigurator::configure("./log2.conf");
        }
        catch (log4cpp::ConfigureFailure &f)
        {
            std::cout << "Configure Problem " << f.what() << std::endl;
            return -1;
        }
      
        // 实例化category对象
        log4cpp::Category &root = log4cpp::Category::getRoot();
      
        log4cpp::Category &category1 = log4cpp::Category::getInstance(std::string("category1"));
        log4cpp::Category &category3 = log4cpp::Category::getInstance(std::string("category1.category2"));
         
        category1.info("This is some info");
        category1.alert("A warning");
      
        category3.debug("This debug message will fail to write");
        category3.alert("All hands abandon ship");
        category3.critStream() << "This will show up << as " << 1 << " critical message";
        category3 << log4cpp::Priority::ERROR<< "And this will be an error";
        category3.log(log4cpp::Priority::WARN, "This will be a logged warning");
          
        log4cpp::Category::shutdown();
        return 0;
    }
    

3.百万并发

3.1 C10K

  • 从资源上来说,对2GB内存和千兆网卡的服务器来说,同时处理10000个请求,只要每个请求处理占用不到200KB(2GB/10000)的内存和100Kbit(1000Mbit/10000)的网络带宽就可以。所以,物理资源是足够的,接下来自然是软件的问题,特别是网络的 I/O 模型问题。

  • 在C10K以前,Linux中网络处理都用同步阻塞的方式,也就是每个请求都分配一个进程或者线程。请求数只有100个时,这种方式自然没问题,但增加到10000个请求时,10000个进程或线程的调度、上下文切换乃至它们占用的内存,都会成为瓶颈。

  • 使用非阻塞I/O和水平触发通知,比如使用 select 或者 poll

    • 应用软件使用select和poll时,需要对这些文件描述符列表进行轮询,这样,请求数多的时候就会比较耗时。并且,select和poll还有一些其他的限制。
    • select使用固定长度的位相量,表示文件描述符的集合,因此会有最大描述符数量的限制。比如,在32位系统中,默认限制是1024。并且,在select内部,检查套接字状态是用轮询的方法,再加上应用软件使用时的轮询,就变成了一个O(n^2)的关系。
    • poll改进了select的表示方法,换成了一个没有固定长度的数组,这样就没有了最大描述符数量的限制(当然还会受到系统文件描述符限制)。但应用程序在使用poll时,同样需要对文件描述符列表进行轮询,这样,处理耗时跟描述符数量就是O(N)的关系。
    • 应用程序每次调用select和poll时,还需要把文件描述符的集合,从用户空间传入内核空间,由内核修改后,再传出到用户空间中。这一来一回的内核空间与用户空间切换,也增加了处理成本。
  • 使用非阻塞I/O和边缘触发通知,比如epoll

    • epoll使用红黑树,在内核中管理文件描述符的集合,这样,就不需要应用程序在每次操作时都传入、传出这个集合。
    • epoll使用事件驱动的机制,只关注有I/O事件发生的文件描述符,不需要轮询扫描整个集合。
    • 由于边缘触发只在文件描述符可读或可写事件发生时才通知,那么应用程序就需要尽可能多地执行I/O,并要处理更多的异常事件。
  • 使用异步I/O(Asynchronous I/O,简称为 AIO)

    • 异步I/O允许应用程序同时发起很多I/O操作,而不用等待这些操作完成。而在I/O完成后,系统会用事件通知(比如信号或者回调函数)的方式,告诉应用程序。这时,应用程序才会去查询I/O操作的结果。

3.2 C1000K

  • 基于C10K的这些理论,epoll配合线程池,再加上CPU、内存和网络接口的性能和容量提升。大部分情况下,C100K很自然就可以达到。
  • 从软件资源上来说,大量的连接也会占用大量的软件资源,比如文件描述符的数量、连接状态的跟踪(CONNTRACK)、网络协议栈的缓存大小(比如套接字读写缓存、TCP 读写缓存)等等。最后,大量请求带来的中断处理,也会带来非常高的处理成本。这样,就需要多队列网卡、中断负载均衡、CPU 绑定、RPS/RFS(软中断负载均衡到多个 CPU 核上),以及将网络包的处理卸载(Offload)到网络设备(如 TSO/GSO、LRO/GRO、VXLAN OFFLOAD)等各种硬件和软件的优化。
  • C1000K 的解决方法,本质上还是构建在epoll的非阻塞I/O模型上。只不过,除了I/O模型之外,还需要从应用程序到Linux内核、再到CPU、内存和网络等各个层次的深度优化,特别是需要借助硬件,来卸载那些原来通过软件处理的大量功能。

3.3 C10M

  • Linux内核协议栈做了太多太繁重的工作。从网卡中断带来的硬中断处理程序开始,到软中断中的各层网络协议处理,最后再到应用程序,这个路径实在是太长了,就会导致网络包的处理优化,到了一定程度后,就无法更进一步了。
  • 要解决这个问题,最重要就是跳过内核协议栈的冗长路径,把网络包直接送到要处理的应用程序那里去。这里有两种常见的机制,DPDK和XDP。

Last modified on 2020-11-30