五、网络服务专题
0.项目仓库
1.Reactor反应堆
1.1 Reactor模型
-
Reactor释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰 相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。
-
Reactor模式是处理并发I/O比较常见的一种模式,用于同步I/O,中心思想是将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。
-
Reactor模型有三个重要的组件:
- 多路复用器:由操作系统提供,在linux上一般是select、poll、epoll等系统调用。
- 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。
- 事件处理器:负责处理特定事件的处理函数。
1.2 Reactor实现
/*
* @Author: gongluck
* @Date: 2020-11-26 09:41:40
* @Last Modified by: gongluck
* @Last Modified time: 2020-11-26 16:58:43
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#define BUFFER_LENGTH 4096
#define MAX_EPOLL_EVENTS 1024
#define SERVER_PORT 8888
typedef int NCALLBACK(int, int, void *);
struct ntyevent
{
int fd;
int events;
void *arg;
int (*callback)(int fd, int events, void *arg);
int status;//是否已经添加到epoll中
char buffer[BUFFER_LENGTH];
int length;
long last_active;
};
struct ntyreactor
{
int epfd;
struct ntyevent events[MAX_EPOLL_EVENTS];
};
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg)
{
ev->fd = fd;
ev->callback = callback;
ev->events = 0;
ev->arg = arg;
ev->last_active = time(NULL);
return;
}
int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
struct epoll_event ep_ev = {0, {0}};
ep_ev.data.ptr = ev;
ep_ev.events = ev->events = events;
int op;
if (ev->status == 1)
{
op = EPOLL_CTL_MOD;
}
else
{
op = EPOLL_CTL_ADD;
ev->status = 1;
}
if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0)
{
printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
return -1;
}
return 0;
}
int nty_event_del(int epfd, struct ntyevent *ev)
{
struct epoll_event ep_ev = {0, {0}};
if (ev->status != 1)
{
return -1;
}
ep_ev.data.ptr = ev;
ev->status = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
return 0;
}
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg)
{
struct ntyreactor *reactor = (struct ntyreactor *)arg;
struct ntyevent *ev = reactor->events + fd;
int len = send(fd, ev->buffer, ev->length, 0);
if (len > 0)
{
printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
nty_event_del(reactor->epfd, ev);
nty_event_set(ev, fd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, ev);
}
else
{
close(ev->fd);
nty_event_del(reactor->epfd, ev);
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}
return len;
}
int recv_cb(int fd, int events, void *arg)
{
struct ntyreactor *reactor = (struct ntyreactor *)arg;
struct ntyevent *ev = reactor->events + fd;
int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);
nty_event_del(reactor->epfd, ev);
if (len > 0)
{
ev->length = len;
ev->buffer[len] = '\0';
printf("C[%d]:%s\n", fd, ev->buffer);
nty_event_set(ev, fd, send_cb, reactor);
nty_event_add(reactor->epfd, EPOLLOUT, ev);
}
else if (len == 0)
{
close(ev->fd);
printf("[fd=%d] pos[%ld], closed\n", fd, ev - reactor->events);
}
else
{
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
return len;
}
int accept_cb(int fd, int events, void *arg)
{
struct ntyreactor *reactor = (struct ntyreactor *)arg;
if (reactor == NULL)
return -1;
struct sockaddr_in client_addr = {0};
socklen_t len = sizeof(client_addr);
int clientfd;
if ((clientfd = accept(fd, (struct sockaddr *)&client_addr, &len)) == -1)
{
if (errno != EAGAIN && errno != EINTR)
{
}
printf("accept: %s\n", strerror(errno));
return -1;
}
int i = 0;
do
{
for (i = 0; i < MAX_EPOLL_EVENTS; i++)
{
if (reactor->events[i].status == 0)
{
break;
}
}
if (i == MAX_EPOLL_EVENTS)
{
printf("%s: max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);
break;
}
if (fcntl(clientfd, F_SETFL, O_NONBLOCK) < 0)
{
printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
break;
}
nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
} while (0);
printf("new connect [%s:%d][time:%ld], pos[%d]\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);
return 0;
}
int init_sock(short port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_in server_addr = {0};
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);
bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (listen(fd, 20) < 0)
{
printf("listen failed : %s\n", strerror(errno));
}
return fd;
}
int ntyreactor_init(struct ntyreactor *reactor)
{
if (reactor == NULL)
return -1;
memset(reactor, 0, sizeof(struct ntyreactor));
reactor->epfd = epoll_create(1);
if (reactor->epfd <= 0)
{
printf("create epfd in %s err %s\n", __func__, strerror(errno));
return -2;
}
}
int ntyreactor_destory(struct ntyreactor *reactor)
{
close(reactor->epfd);
return 0;
}
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor)
{
if (reactor == NULL)
return -1;
nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);
return 0;
}
int ntyreactor_run(struct ntyreactor *reactor)
{
if (reactor == NULL)
return -1;
if (reactor->epfd < 0)
return -1;
struct epoll_event events[MAX_EPOLL_EVENTS + 1];
int checkpos = 0, i;
while (1)
{
long now = time(NULL);
for (i = 0; i < MAX_EPOLL_EVENTS/10; i++, checkpos++)
{
if (checkpos == MAX_EPOLL_EVENTS)
{
checkpos = 0;
}
if (reactor->events[checkpos].status != 1)
{
continue;
}
long duration = now - reactor->events[checkpos].last_active;
if (duration >= 60)
{
close(reactor->events[checkpos].fd);
printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
nty_event_del(reactor->epfd, &reactor->events[checkpos]);
}
}
int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
if (nready < 0)
{
printf("epoll_wait error, exit\n");
continue;
}
for (i = 0; i < nready; i++)
{
struct ntyevent *ev = (struct ntyevent *)events[i].data.ptr;
if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
{
ev->callback(ev->fd, events[i].events, ev->arg);
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
{
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}
}
int main()
{
unsigned short port = SERVER_PORT;
int sockfd = init_sock(port);
struct ntyreactor reactor;
ntyreactor_init(&reactor);
ntyreactor_addlistener(&reactor, sockfd, accept_cb);
ntyreactor_run(&reactor);
ntyreactor_destory(&reactor);
close(sockfd);
return 0;
}
2.日志模块
2.1 log4cpp模块
-
log4cpp有且只⼀个根Category,可以有多个⼦Category组成树型结构。
-
Appender负责将⽇志写⼊相应的设备,⽐如控制台、⽂件、调试器、Windows⽇志、syslog等。
-
Layout控制输出⽇志的显示样式。Log4cpp内置了4种Layout:
- PassThroughLayout:直通布局。
- SimpleLayout:简单布局。它只会为你添加“优先级”的输出。
- BasicLayout:基本布局。它会为你添加“时间”、“优先级”、“种类”、“NDC”。
- PatternLayout:格式化布局。它的使⽤⽅式类似C语⾔中的printf,使⽤格式化它符串来描述输出格式。
2.2 log4cpp编译安装
wget https://sourceforge.net/projects/log4cpp/files/log4cpp-1.1.x%20%28new%29/log4cpp-1.1/log4cpp-1.1.3.tar.gz
tar zxf log4cpp-1.1.3.tar.gz
cd log4cpp
./configure
make -j 8
sudo make install
sudo ldconfig
2.3 log4cpp例子
-
/* * @Author: gongluck * @Date: 2020-11-27 21:12:58 * @Last Modified by: gongluck * @Last Modified time: 2020-11-27 21:30:54 */ // g++ log1.cpp -llog4cpp -lpthread #include "log4cpp/Category.hh" #include "log4cpp/FileAppender.hh" #include "log4cpp/OstreamAppender.hh" #include "log4cpp/BasicLayout.hh" int main() { // 实例化一个layout对象 log4cpp::Layout *layout = new log4cpp::BasicLayout(); // 初始化一个appender对象 log4cpp::Appender *appender = new log4cpp::FileAppender("FileAppender","./log1.log"); log4cpp::Appender *osappender = new log4cpp::OstreamAppender("OstreamAppender",&std::cout); // 把layout对象附着在appender对象上,一个layout格式样式对应一个appender appender->setLayout(layout); // 实例化一个category对象 log4cpp::Category &warn_log = log4cpp::Category::getInstance("gongluck"); // 单例工厂 // 设置additivity为false,替换已有的appender,不继承父类appender warn_log.setAdditivity(false); // 把appender对象附到category上 warn_log.setAppender(appender); warn_log.addAppender(osappender); // 设置category的优先级,低于此优先级的日志不被记录 warn_log.setPriority(log4cpp::Priority::INFO); // 记录一些日志 warn_log.info("Program info which cannot be wirten"); warn_log.debug("This debug message will fail to write"); warn_log.alert("Alert info"); // 其他记录日志方式 warn_log.log(log4cpp::Priority::WARN, "This will be a logged warning"); log4cpp::Priority::PriorityLevel priority = log4cpp::Priority::DEBUG; warn_log.log(priority, "Importance depends on context"); warn_log.critStream() << "This will show up << as " << 1 << " critical message"; // clean up and flush all appenders log4cpp::Category::shutdown(); return 0; }
-
/* * @Author: gongluck * @Date: 2020-11-27 21:31:03 * @Last Modified by: gongluck * @Last Modified time: 2020-11-27 21:44:29 */ // g++ log2.cpp -llog4cpp -lpthread #include "log4cpp/Category.hh" #include "log4cpp/PropertyConfigurator.hh" int main() { try { log4cpp::PropertyConfigurator::configure("./log2.conf"); } catch (log4cpp::ConfigureFailure &f) { std::cout << "Configure Problem " << f.what() << std::endl; return -1; } // 实例化category对象 log4cpp::Category &root = log4cpp::Category::getRoot(); log4cpp::Category &category1 = log4cpp::Category::getInstance(std::string("category1")); log4cpp::Category &category3 = log4cpp::Category::getInstance(std::string("category1.category2")); category1.info("This is some info"); category1.alert("A warning"); category3.debug("This debug message will fail to write"); category3.alert("All hands abandon ship"); category3.critStream() << "This will show up << as " << 1 << " critical message"; category3 << log4cpp::Priority::ERROR<< "And this will be an error"; category3.log(log4cpp::Priority::WARN, "This will be a logged warning"); log4cpp::Category::shutdown(); return 0; }
3.百万并发
3.1 C10K
-
从资源上来说,对2GB内存和千兆网卡的服务器来说,同时处理10000个请求,只要每个请求处理占用不到200KB(2GB/10000)的内存和100Kbit(1000Mbit/10000)的网络带宽就可以。所以,物理资源是足够的,接下来自然是软件的问题,特别是网络的 I/O 模型问题。
-
在C10K以前,Linux中网络处理都用同步阻塞的方式,也就是每个请求都分配一个进程或者线程。请求数只有100个时,这种方式自然没问题,但增加到10000个请求时,10000个进程或线程的调度、上下文切换乃至它们占用的内存,都会成为瓶颈。
-
使用非阻塞I/O和水平触发通知,比如使用 select 或者 poll
- 应用软件使用select和poll时,需要对这些文件描述符列表进行轮询,这样,请求数多的时候就会比较耗时。并且,select和poll还有一些其他的限制。
- select使用固定长度的位相量,表示文件描述符的集合,因此会有最大描述符数量的限制。比如,在32位系统中,默认限制是1024。并且,在select内部,检查套接字状态是用轮询的方法,再加上应用软件使用时的轮询,就变成了一个O(n^2)的关系。
- poll改进了select的表示方法,换成了一个没有固定长度的数组,这样就没有了最大描述符数量的限制(当然还会受到系统文件描述符限制)。但应用程序在使用poll时,同样需要对文件描述符列表进行轮询,这样,处理耗时跟描述符数量就是O(N)的关系。
- 应用程序每次调用select和poll时,还需要把文件描述符的集合,从用户空间传入内核空间,由内核修改后,再传出到用户空间中。这一来一回的内核空间与用户空间切换,也增加了处理成本。
-
使用非阻塞I/O和边缘触发通知,比如epoll
- epoll使用红黑树,在内核中管理文件描述符的集合,这样,就不需要应用程序在每次操作时都传入、传出这个集合。
- epoll使用事件驱动的机制,只关注有I/O事件发生的文件描述符,不需要轮询扫描整个集合。
- 由于边缘触发只在文件描述符可读或可写事件发生时才通知,那么应用程序就需要尽可能多地执行I/O,并要处理更多的异常事件。
-
使用异步I/O(Asynchronous I/O,简称为 AIO)
- 异步I/O允许应用程序同时发起很多I/O操作,而不用等待这些操作完成。而在I/O完成后,系统会用事件通知(比如信号或者回调函数)的方式,告诉应用程序。这时,应用程序才会去查询I/O操作的结果。
3.2 C1000K
- 基于C10K的这些理论,epoll配合线程池,再加上CPU、内存和网络接口的性能和容量提升。大部分情况下,C100K很自然就可以达到。
- 从软件资源上来说,大量的连接也会占用大量的软件资源,比如文件描述符的数量、连接状态的跟踪(CONNTRACK)、网络协议栈的缓存大小(比如套接字读写缓存、TCP 读写缓存)等等。最后,大量请求带来的中断处理,也会带来非常高的处理成本。这样,就需要多队列网卡、中断负载均衡、CPU 绑定、RPS/RFS(软中断负载均衡到多个 CPU 核上),以及将网络包的处理卸载(Offload)到网络设备(如 TSO/GSO、LRO/GRO、VXLAN OFFLOAD)等各种硬件和软件的优化。
- C1000K 的解决方法,本质上还是构建在epoll的非阻塞I/O模型上。只不过,除了I/O模型之外,还需要从应用程序到Linux内核、再到CPU、内存和网络等各个层次的深度优化,特别是需要借助硬件,来卸载那些原来通过软件处理的大量功能。
3.3 C10M
- Linux内核协议栈做了太多太繁重的工作。从网卡中断带来的硬中断处理程序开始,到软中断中的各层网络协议处理,最后再到应用程序,这个路径实在是太长了,就会导致网络包的处理优化,到了一定程度后,就无法更进一步了。
- 要解决这个问题,最重要就是跳过内核协议栈的冗长路径,把网络包直接送到要处理的应用程序那里去。这里有两种常见的机制,DPDK和XDP。
Last modified on 2020-11-30