九、源码分析专题
0.项目仓库
1.Nginx架构和模块
1.1 Nginx架构
-
nginx在启动后,在unix系统中会以daemon的方式在后台运行,后台进程包含一个master进程和多个worker进程。
-
master进程主要用来管理worker进程,包含:
- 接收来自外界的信号,
- 向各worker进程发送信号,
- 监控worker进程的运行状态,
- 当worker进程退出后(异常情况下),会自动重新启动新的worker进程。
-
而基本的网络事件,则是放在worker进程中来处理了。多个worker进程之间是对等的,他们同等竞争来自客户端的请求,各进程互相之间是独立的。
-
每个worker进程都是从master进程fork过来,在master进程里面,先建立好需要listen的socket(listenfd)之后,然后再fork出多个worker进程。所有worker进程的listenfd会在新连接到来时变得可读,为保证只有一个进程处理该连接,所有worker 进程在注册listenfd读事件前抢accept_mutex,抢到互斥锁的那个进程注册listenfd读事件,在读事件里调用accept接受该连接。当一个worker进程在accept这个连接之后,就开始读取请求,解析请求,处理请求,产生数据后,再返回给客户端,最后才断开连接,这样一个完整的请求就是这样的了。
-
对于每个worker进程来说,独立的进程,不需要加锁,所以省掉了锁带来的开销,同时在编程以及问题查找时,也会方便很多。其次,采用独立的进程,可以让互相之间不会影响,一个进程退出后,其它进程还在工作,服务不会中断,master进程则很快启动新的worker进程。
-
nginx采用了异步非阻塞的方式来处理请求。非阻塞就是,事件没有准备好,马上返回EAGAIN。
-
nginx里面的定时器事件是放在一颗维 护定时器的红黑树里面,每次在进入epoll_wait前,先从该红黑树里面拿到所有定时器事件的最小时间,在计算出epoll_wait的超时时间后进入epoll_wait。所以,当没有事件产生,也没有中断信号时,epoll_wait会超时,也就是说,定时器事件到了。这时,nginx会检查所有的超时事件,将他们的状态设置为超时,然后再去处理网络事件。
1.2 Nginx基础概念
-
connection
struct ngx_connection_s { void *data; ngx_event_t *read; ngx_event_t *write; ngx_socket_t fd; ngx_recv_pt recv; ngx_send_pt send; ngx_recv_chain_pt recv_chain; ngx_send_chain_pt send_chain; ngx_listening_t *listening; off_t sent; ngx_log_t *log; ngx_pool_t *pool; int type; struct sockaddr *sockaddr; socklen_t socklen; ngx_str_t addr_text; ngx_proxy_protocol_t *proxy_protocol; #if (NGX_SSL || NGX_COMPAT) ngx_ssl_connection_t *ssl; #endif ngx_udp_connection_t *udp; struct sockaddr *local_sockaddr; socklen_t local_socklen; ngx_buf_t *buffer; ngx_queue_t queue; ngx_atomic_uint_t number; ngx_uint_t requests; unsigned buffered:8; unsigned log_error:3; /* ngx_connection_log_error_e */ unsigned timedout:1; unsigned error:1; unsigned destroyed:1; unsigned idle:1; unsigned reusable:1; unsigned close:1; unsigned shared:1; unsigned sendfile:1; unsigned sndlowat:1; unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */ unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */ unsigned need_last_buf:1; #if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT) unsigned busy_count:2; #endif #if (NGX_THREADS || NGX_COMPAT) ngx_thread_task_t *sendfile_task; #endif };
- nginx在实现时,是通过一个连接池来管理的,每个worker进程都有一个独立的连接池,连接池的大小是worker_connections。这里的连接池里面保存的其实不是真实的连接,它只是一个worker_connections大小的一个ngx_connection_t结构的数组。并且,nginx会通过一个链表free_connections来保存所有的空闲ngx_connection_t,每次获取一个连接时,就从空闲连接链表中获取一个,用完后,再放回空闲连接链表里面。
- 一个nginx能建立的最大连接数,应该是worker_connections * worker_processes。当然,这里说的是最大连接数,对于HTTP请求本地资源来说,能够支持的最大并发数量是 worker_connections * worker_processes,而如果是 HTTP 作为反向代理来说,最大并发数量应该是worker_connections * worker_processes / 2。因为作为反向代理服务器,每个并发会建立与客户端的连接和与后端服务的连接,会占用两个连接。
- nginx使用一个叫ngx_accept_disabled的变量来控制是否去竞争accept_mutex锁。计算 ngx_accept_disabled的值,这个值是nginx单进程的所有连接总数的八分之一,减去剩下的空闲连接数量,得到的这个ngx_accept_disabled有一个规律,当剩余连接数小于总连接数的八分之一时,其值才大于 0,而且剩余的连接数越小,这个值越大。当ngx_accept_disabled大于0时,不会去尝试获取accept_mutex锁,并且将ngx_accept_disabled减1,于是,每次执行到此处时,都会去减1,直到小于0。
-
request
struct ngx_http_request_s { uint32_t signature; /* "HTTP" */ ngx_connection_t *connection; void **ctx; void **main_conf; void **srv_conf; void **loc_conf; ngx_http_event_handler_pt read_event_handler; ngx_http_event_handler_pt write_event_handler; #if (NGX_HTTP_CACHE) ngx_http_cache_t *cache; #endif ngx_http_upstream_t *upstream; ngx_array_t *upstream_states; /* of ngx_http_upstream_state_t */ ngx_pool_t *pool; ngx_buf_t *header_in; ngx_http_headers_in_t headers_in; ngx_http_headers_out_t headers_out; ngx_http_request_body_t *request_body; time_t lingering_time; time_t start_sec; ngx_msec_t start_msec; ngx_uint_t method; ngx_uint_t http_version; ngx_str_t request_line; ngx_str_t uri; ngx_str_t args; ngx_str_t exten; ngx_str_t unparsed_uri; ngx_str_t method_name; ngx_str_t http_protocol; ngx_str_t schema; ngx_chain_t *out; ngx_http_request_t *main; ngx_http_request_t *parent; ngx_http_postponed_request_t *postponed; ngx_http_post_subrequest_t *post_subrequest; ngx_http_posted_request_t *posted_requests; ngx_int_t phase_handler; ngx_http_handler_pt content_handler; ngx_uint_t access_code; ngx_http_variable_value_t *variables; #if (NGX_PCRE) ngx_uint_t ncaptures; int *captures; u_char *captures_data; #endif size_t limit_rate; size_t limit_rate_after; /* used to learn the Apache compatible response length without a header */ size_t header_size; off_t request_length; ngx_uint_t err_status; ngx_http_connection_t *http_connection; ngx_http_v2_stream_t *stream; ngx_http_log_handler_pt log_handler; ngx_http_cleanup_t *cleanup; unsigned count:16; unsigned subrequests:8; unsigned blocked:8; unsigned aio:1; unsigned http_state:4; /* URI with "/." and on Win32 with "//" */ unsigned complex_uri:1; /* URI with "%" */ unsigned quoted_uri:1; /* URI with "+" */ unsigned plus_in_uri:1; /* URI with " " */ unsigned space_in_uri:1; unsigned invalid_header:1; unsigned add_uri_to_alias:1; unsigned valid_location:1; unsigned valid_unparsed_uri:1; unsigned uri_changed:1; unsigned uri_changes:4; unsigned request_body_in_single_buf:1; unsigned request_body_in_file_only:1; unsigned request_body_in_persistent_file:1; unsigned request_body_in_clean_file:1; unsigned request_body_file_group_access:1; unsigned request_body_file_log_level:3; unsigned request_body_no_buffering:1; unsigned subrequest_in_memory:1; unsigned waited:1; #if (NGX_HTTP_CACHE) unsigned cached:1; #endif #if (NGX_HTTP_GZIP) unsigned gzip_tested:1; unsigned gzip_ok:1; unsigned gzip_vary:1; #endif #if (NGX_PCRE) unsigned realloc_captures:1; #endif unsigned proxy:1; unsigned bypass_cache:1; unsigned no_cache:1; /* * instead of using the request context data in * ngx_http_limit_conn_module and ngx_http_limit_req_module * we use the bit fields in the request structure */ unsigned limit_conn_status:2; unsigned limit_req_status:3; unsigned limit_rate_set:1; unsigned limit_rate_after_set:1; #if 0 unsigned cacheable:1; #endif unsigned pipeline:1; unsigned chunked:1; unsigned header_only:1; unsigned expect_trailers:1; unsigned keepalive:1; unsigned lingering_close:1; unsigned discard_body:1; unsigned reading_body:1; unsigned internal:1; unsigned error_page:1; unsigned filter_finalize:1; unsigned post_action:1; unsigned request_complete:1; unsigned request_output:1; unsigned header_sent:1; unsigned expect_tested:1; unsigned root_tested:1; unsigned done:1; unsigned logged:1; unsigned buffered:4; unsigned main_filter_need_in_memory:1; unsigned filter_need_in_memory:1; unsigned filter_need_temporary:1; unsigned preserve_body:1; unsigned allow_ranges:1; unsigned subrequest_ranges:1; unsigned single_range:1; unsigned disable_not_modified:1; unsigned stat_reading:1; unsigned stat_writing:1; unsigned stat_processing:1; unsigned background:1; unsigned health_check:1; /* used to parse HTTP headers */ ngx_uint_t state; ngx_uint_t header_hash; ngx_uint_t lowcase_index; u_char lowcase_header[NGX_HTTP_LC_HEADER_LEN]; u_char *header_name_start; u_char *header_name_end; u_char *header_start; u_char *header_end; /* * a memory that can be reused after parsing a request line * via ngx_http_ephemeral_t */ u_char *uri_start; u_char *uri_end; u_char *uri_ext; u_char *args_start; u_char *request_start; u_char *request_end; u_char *method_end; u_char *schema_start; u_char *schema_end; u_char *host_start; u_char *host_end; u_char *port_start; u_char *port_end; unsigned http_minor:16; unsigned http_major:16; };
-
对于nginx来说,一个请求是从ngx_http_init_request开始的,在这个函数中,会设置读事件为ngx_http_process_request_line,也就是说,接下来的网络事件,会由ngx_http_process_request_line来执行,处理请求行。通过ngx_http_read_request_header来读取请求数据。然后调用ngx_http_parse_request_line函数来解析请求行。
-
nginx为提高效率,采用状态机来解析请求行,而且在进行method的比较时,没有直接使用字符串比较,而是将四个字符转换成一个整型,然后一次比较以减少cpu的指令数。
-
整个请求行解析到的参数,会保存到ngx_http_request_t结构当中。
-
在解析完请求行后,nginx会设置读事件的handler为ngx_http_process_request_headers,然后后续的请求就在ngx_http_process_request_headers中进行读取与解析。
-
ngx_http_process_request_headers函数用来读取请求头,跟请求行一样,还是调用ngx_http_read_request_header来读取请求头,调用ngx_http_parse_header_line来解析一行请求头,解析到的请求头会保存到ngx_http_request_t的域headers_in中,headers_in是一个链表结构,保存所有的请求头。
-
而HTTP中有些请求是需要特别处理的,这些请求头与请求处理函数存放在一个映射表里面,即ngx_http_headers_in,在初始化时,会生成一个hash表,当每解析到一个请求头后,就会先在这个hash表中查找,如果有找到,则调用相应的处理函数来处理这个请求头。
-
当nginx解析到两个回车换行符时,就表示请求头的结束,此时就会调用ngx_http_process_request来处理请求了。ngx_http_process_request会设置当前的连接的读写事件处理函数为ngx_http_request_handler,然后再调用ngx_http_handler来真正开始处理一个完整的http请求。读写事件处理函数都是ngx_http_request_handler,其实在这个函数中,会根据当前事件是读事件还是写事件,分别调用ngx_http_request_t中的read_event_handler或者是write_event_handler。由于此时,我们的请求头已经读取完成了,nginx先不读取请求body,设置read_event_handler为ngx_http_block_reading,即不读取数据了。
-
真正开始处理数据,是在ngx_http_handler这个函数里面,这个函数会设置 write_event_handler为ngx_http_core_run_phases,并执行ngx_http_core_run_phases函数。ngx_http_core_run_phases这个函数将执行多阶段请求处理,nginx将一个http请求的处理分为多个阶段,那么这个函数就是执行这些阶段来产生数据。最终是调用ngx_http_core_run_phases来处理请求,产生的响应头会放在ngx_http_request_t的headers_out中。
-
nginx的各种阶段会对请求进行处理,最后会调用filter来过滤数据,对数据进行加工。这里的filter包括header filter与body filter,即对响应头或响应体进行处理。filter是一个链表结构,分别有header filter与body filter,先执行header filter中的所有filter,然后再执行body filter中的所有filter。在header filter中的最后一个filter,即ngx_http_header_filter, 这个filter将会遍历所有的响应头,最后需要输出的响应头在一个连续的内存,然后调用ngx_http_write_filter进行输出。ngx_http_write_filter是body filter中的最后一个,所以nginx首先的body信息,在经过一系列的body filter之后,最后也会调用ngx_http_write_filter来进行输出。nginx会将整个请求头都放在一个buffer里面,这个buffer的大小通过配置项client_header_buffer_size来设置,如果用户的请求头太大,这个 buffer装不下,那nginx就会重新分配一个新的更大的buffer来装请求头,这个大buffer可以通过large_client_header_buffers来设置,这个large_buffer这一组buffer。
-
为了保存请求行或请求头的完整性,一个完整的请求行或请求头,需要放在一个连续的内存里面,所以,一个完整的请求行或请求头,只会保存在一个buffer里面。这样,如果请求行大于一个buffer的大小,就会返回414错误,如果一个请求头大小大于一个buffer大小,就会返回400错误。
-
1.3 Handler模块
-
Handler模块就是接受来自客户端的请求并产生输出的模块。
-
/* * @Author: gongluck * @Date: 2020-12-08 19:10:29 * @Last Modified by: gongluck * @Last Modified time: 2020-12-08 19:11:06 */ // ./configure --add-module=/mnt/e/Code/CVIP/code/nginx/module // sudo ./objs/nginx -c /mnt/e/Code/CVIP/code/nginx/module/hello.conf #include <ngx_http.h> #include <ngx_config.h> #include <ngx_core.h> static char *ngx_http_hello_module_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); static ngx_int_t ngx_http_hello_module_handler(ngx_http_request_t *r); // 模块配置结构 typedef struct { ngx_str_t hello_string; } ngx_http_hello_loc_conf_t; // 模块配置指令 static ngx_command_t hello_commands[] = { { ngx_string("hello"), //配置指令的名称 NGX_HTTP_LOC_CONF | NGX_CONF_NOARGS, //该配置指令属性的集合 ngx_http_hello_module_set, //当nginx在解析配置的时候,如果遇到这个配置指令,将会把读取到的值传递给这个函数进行分解处理 NGX_HTTP_LOC_CONF_OFFSET, //指定当前配置项存储的内存位置,实际上是使用哪个内存池的问题。 0, //指定该配置项值的精确存放位置,一般指定为某一个结构体变量的字段偏移。 NULL //可以指向任何一个在读取配置过程中需要的数据,以便于进行配置读取的处理。 }, ngx_null_command}; // 模块上下文结构 static ngx_http_module_t hello_ctx = { NULL, //在创建和读取该模块的配置信息之前被调用 NULL, //在创建和读取该模块的配置信息之后被调用 NULL, //调用该函数创建本模块位于http block的配置信息存储结构。该函数成功的时候,返回创建的配置对象。失败的话,返回NULL。 NULL, //调用该函数初始化本模块位于http block 的配置信息存储结构。该函数成功的时候,返回NGX_CONF_OK。失败的话,返回NGX_CONF_ERROR或错误字符串。 NULL, //调用该函数创建本模块位于http server block的配置信息存储结构,每个server block会创建一个。该函数成功的时候,返回创建的配置对象。失败的话,返回NULL。 NULL, //因为有些配置指令既可以出现在http block,也可以出现在http server block中。那么遇到这种情况,每个server都会有自己存储结构来存储该server的配置,但是在这种情况下http block中的配置与server block中的配置信息发生冲突的时候,就需要调用此函数进行合并,该函数并非必须提供,当预计到绝对不会发生需要合并的情况的时候,就无需提供。当然为了安全起见还是建议提供。该函数执行成功的时候,返回NGX_CONF_OK。失败的话,返回NGX_CONF_ERROR或错误字符串。 NULL, //调用该函数创建本模块位于location block的配置信息存储结构。每个在配置中指明的location创建一个。该函数执行成功,返回创建的配置对象。失败的话,返回NULL。 NULL, //与merge_srv_conf类似,这个也是进行配置值合并的地方。该函数成功的时候,返回NGX_CONF_OK。失败的话,返回NGX_CONF_ERROR或错误字符串。 }; // ngx_http_hello_module ngx_module_t ngx_http_hello_module = { NGX_MODULE_V1, &hello_ctx, /* module context */ hello_commands, /* module directives */ NGX_HTTP_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ NULL, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING}; static char *ngx_http_hello_module_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_http_core_loc_conf_t *corecf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module); corecf->handler = ngx_http_hello_module_handler; return NGX_CONF_OK; } static ngx_int_t ngx_http_hello_module_handler(ngx_http_request_t *r) { ngx_str_t str = ngx_string("Hello, Nginx!"); ngx_buf_t *b = ngx_pcalloc(r->pool, sizeof(ngx_buf_t)); if (b == NULL) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } ngx_chain_t out; out.buf = b; out.next = NULL; b->pos = str.data; b->last = b->pos + str.len; b->memory = 1; /* this buffer is in memory */ b->last_buf = 1; /* this is the last buffer in the buffer chain */ /* set the status line */ r->headers_out.status = NGX_HTTP_OK; r->headers_out.content_length_n = str.len; /* send the headers of your response */ ngx_int_t rc = ngx_http_send_header(r); if (rc == NGX_ERROR || rc > NGX_OK || r->header_only) { return rc; } /* send the buffer chain of your response */ return ngx_http_output_filter(r, &out); }
1.4 Filter模块
-
过滤(filter)模块是过滤响应头和内容的模块,可以对回复的头和内容进行处理。它的处理时间在获取回复内容之后,向用户发送响应之前。它的处理过程分为两个阶段,过滤HTTP回复的头部和主体,在这两个阶段可以分别对头部和主体进行修改。
-
所有模块的响应内容要返回给客户端,都必须调用这两个接口
//分别对头部和主体进行过滤的函数 ngx_http_top_header_filter(r); ngx_http_top_body_filter(r, in);
-
过滤模块的调用是有顺序的,它的顺序在编译的时候就决定了。控制编译的脚本位于auto/modules中,当编译完Nginx以后,可以在objs目录下面看到一个ngx_modules.c的文件。
ngx_module_t *ngx_modules[] = { &ngx_core_module, &ngx_errlog_module, &ngx_conf_module, &ngx_regex_module, &ngx_events_module, &ngx_event_core_module, &ngx_epoll_module, &ngx_http_module, &ngx_http_core_module, &ngx_http_log_module, &ngx_http_upstream_module, &ngx_http_static_module, &ngx_http_autoindex_module, &ngx_http_index_module, &ngx_http_mirror_module, &ngx_http_try_files_module, &ngx_http_auth_basic_module, &ngx_http_access_module, &ngx_http_limit_conn_module, &ngx_http_limit_req_module, &ngx_http_geo_module, &ngx_http_map_module, &ngx_http_split_clients_module, &ngx_http_referer_module, &ngx_http_rewrite_module, &ngx_http_proxy_module, &ngx_http_fastcgi_module, &ngx_http_uwsgi_module, &ngx_http_scgi_module, &ngx_http_memcached_module, &ngx_http_empty_gif_module, &ngx_http_browser_module, &ngx_http_upstream_hash_module, &ngx_http_upstream_ip_hash_module, &ngx_http_upstream_least_conn_module, &ngx_http_upstream_random_module, &ngx_http_upstream_keepalive_module, &ngx_http_upstream_zone_module, &ngx_http_hello_module, &ngx_http_write_filter_module, &ngx_http_header_filter_module, &ngx_http_chunked_filter_module, &ngx_http_range_header_filter_module, &ngx_http_gzip_filter_module, &ngx_http_postpone_filter_module, &ngx_http_ssi_filter_module, &ngx_http_charset_filter_module, &ngx_http_userid_filter_module, &ngx_http_headers_filter_module,// &ngx_http_copy_filter_module,// &ngx_http_range_body_filter_module, &ngx_http_not_modified_filter_module, NULL };
-
从write_filter到not_modified_filter,模块的执行顺序是反向的。所有第三方的模块只能加入到copy_filter和headers_filter模块之间执行。
-
在过滤模块中,所有输出的内容都是通过一条单向链表所组成。这种单向链表的设计,正好应和了Nginx流式的输出模式。每次Nginx都是读到一部分的内容,就放到链表,然后输出出去。这种设计的好处是简单,非阻塞,但是相应的问题就是跨链表的内容操作非常麻烦, 如果需要跨链表,很多时候都只能缓存链表的内容。单链表负载的就是ngx_buf_t,这个结构体使用非常广泛。
-
响应头过滤函数主要的用处就是处理HTTP响应的头,可以根据实际情况对于响应头进行修改或者添加删除。响应头过滤函数先于响应体过滤函数,而且只调用一次,所以一般可作过滤模块的初始化工作。
ngx_int_t ngx_http_send_header(ngx_http_request_t *r) { if (r->post_action) { return NGX_OK; } if (r->header_sent) { ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "header already sent"); return NGX_ERROR; } if (r->err_status) { r->headers_out.status = r->err_status; r->headers_out.status_line.len = 0; } return ngx_http_top_header_filter(r);// }
-
可以把HTTP响应头的存储方式想象成一个hash表,在Nginx内部可以很方便地查找和修改各个响应头 部,ngx_http_header_filter_module过滤模块把所有的HTTP头组合成一个完整的buffer,最终ngx_http_write_filter_module过滤模块把buffer输出。
-
响应体过滤函数是过滤响应主体的函数。ngx_http_top_body_filter这个函数每个请求可能会被执行多次,它的入口函数是ngx_http_output_filter。
ngx_int_t ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in) { ngx_int_t rc; ngx_connection_t *c; c = r->connection; ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "http output filter \"%V?%V\"", &r->uri, &r->args); rc = ngx_http_top_body_filter(r, in); if (rc == NGX_ERROR) { /* NGX_ERROR may be returned by any filter */ c->error = 1; } return rc; }
1.5 Upstream模块
- 从本质上说,upstream属于handler,只是他不产生自己的内容,而是通过请求后端服务器得到内容,所以才称为upstream(上游)。
- 请求并取得响应内容的整个过程已经被封装到nginx内部,所以upstream模块只需要开发若干回调函数,完成构造请求和解析响应等具体的工作。
- upstream模块使用的就是handler模块的接入方式。同时,upstream模块的指令系统的设计也是遵循handler模块的基本规则:配置该模块才会执行该模块。
1.6 负载均衡模块
- 负载均衡模块用于从upstream指令定义的后端主机列表中选取一台主机。nginx先使用负载均衡模块找到一台主机,再使用upstream模块实现与这台主机的交互。
- 核心指令ip_hash只能在upstream {}中使用。这条指令用于通知nginx使用ip hash负载均衡算法。如果没加这条指令,nginx会使用默认的round robin负载均衡模块。
2.Skynet
2.1 环境安装
git clone https://github.com/cloudwu/skynet.git
cd skynet
make linux -j 8
2.2 Skynet工作模型
-
skynet中的actor模型
-
结构组成
- 隔离的环境(内存块或lua虚拟机)
- 消息队列
- 回调函数
-
实现
- logger服务service-src/service_logger.c
- lua服务启动器service-src/service_snlua.c
-
2.3 Skynet Lua例子
-- ./skynet /mnt/e/Code/CVIP/code/skynet/test.conf
local skynet = require "skynet"
local socket = require "skynet.socket"
local function event_loop(clientfd)
while true do
local data = socket.readline(clientfd)--从网络获取 以\n为分隔符的数据包
if not data then
return
end
print(clientfd, "recv:", data)
socket.write(clientfd, data.."\n")
end
end
local function accept(clientfd, addr)-- 回调函数的作用 就是可以将 fd绑定到其他actor
print("accept a connect:", clientfd, addr)
socket.start(clientfd) -- 将clientfd注册到epoll
skynet.fork(event_loop, clientfd) -- 实现一个简单的echo服务,可以通过 telnet 127.0.0.1 8001来连接skynet
end
skynet.start(function ()
local listenfd = socket.listen("0.0.0.0", 8001) -- socket bind listen
socket.start(listenfd, accept) -- 将listenfd注册到epoll,收到连接会回调accept函数
end)
3.ZeroMQ
3.1 环境安装
git clone https://github.com/zeromq/libzmq.git
cd libzmq/
./autogen.sh
./configure --enable-debug
make -j 8
sudo make install
sudo ldconfig
3.2 例子代码
/*
* @Author: gongluck
* @Date: 2020-12-10 02:27:44
* @Last Modified by: gongluck
* @Last Modified time: 2020-12-10 02:37:16
*/
// gcc -o server server.c -lzmq -g -O0
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
int main()
{
void *context = zmq_ctx_new();
void *responder = zmq_socket(context, ZMQ_REP);
int rc = zmq_bind(responder, "tcp://*:5555");
while (1)
{
char buffer[10];
int ret = zmq_recv(responder, buffer, 10, 0);
printf("收到%.*s\n", ret, buffer);
zmq_send(responder, "server recved.", strlen("server recved."), 0);
}
return 0;
}
/*
* @Author: gongluck
* @Date: 2020-12-10 02:33:38
* @Last Modified by: gongluck
* @Last Modified time: 2020-12-10 02:37:00
*/
// gcc -o client client.c -lzmq -g -O0
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main()
{
void *context = zmq_ctx_new();
void *requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:5555");
for (int request_nbr = 0; request_nbr != 10; request_nbr++)
{
char buffer[20] = {0};
zmq_send(requester, "Hello", 5, 0);
zmq_recv(requester, buffer, 20, 0);
printf("接收到 %.20s\n", buffer);
}
zmq_close(requester);
zmq_ctx_destroy(context);
return 0;
}
4.Redis
4.1 Redis的元素结构
-
hash值取余的table数组+hash表
-
hash表
/* This is our hash table structure. Every dictionary has two of this as we * implement incremental rehashing, for the old to the new table. */ typedef struct dictht { dictEntry **table;//table属性是⼀个数组,数组中的每个元素都是⼀个指向dict.h/dictEntry 结构的指针,每个dictEntry结构保存着⼀个键值对 unsigned long size;//size属性记录了哈希表的⼤⼩,也即是table数组的⼤⼩ unsigned long sizemask;//sizemask属性的值总是等于size - 1, 这个属性和哈希值⼀起决定⼀个键应该被放到table数组的哪个索引上⾯ unsigned long used;//used属性则记录了哈希表⽬前已有节点(键值对)的数量 } dictht;
-
hash表节点
typedef struct dictEntry { void *key; union { void *val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next;//next属性是指向另⼀个哈希表节点的指针,这个指针可以将多个哈希值相同的键值对连接在⼀次,以此来解决键冲突(collision)的问题。 } dictEntry;
-
字典
typedef struct dict { dictType *type;//type属性是⼀个指向dictType结构的指针,每个dictType结构保存了⼀簇⽤于操作特定类型键值对的函数,Redis会为⽤途不同的字典设置不同的类型特定函数。 void *privdata;//privdata属性保存了需要传给dictType类型特定函数的可选参数。 dictht ht[2];//ht属性是⼀个包含两个项的数组,数组中的每个项都是⼀个dictht哈希表,⼀般情况下, 字典只使⽤ht[0]哈希表,ht[1]哈希表只会在对ht[0]哈希表进⾏rehash时使⽤。 long rehashidx;//记录了rehash⽬前的进度,如果⽬前没有在进⾏rehash,那么它的值为-1。 unsigned long iterators; /* number of iterators currently running */ } dict; typedef struct dictType { uint64_t (*hashFunction)(const void *key); void *(*keyDup)(void *privdata, const void *key); void *(*valDup)(void *privdata, const void *obj); int (*keyCompare)(void *privdata, const void *key1, const void *key2); void (*keyDestructor)(void *privdata, void *key); void (*valDestructor)(void *privdata, void *obj); } dictType;
4.2 Rehash
-
随着操作的不断执⾏,哈希表保存的键值对会逐渐地增多或者减少,为了让哈希表的负载因⼦(ratio)维持在⼀个合理的范围之内,当哈希表保存的键值对数量太多或者太少时,程序需要对哈希表的⼤⼩进⾏相应的扩展或者收缩。
ratio = ht[0].used / ht[0].size
-
扩展和收缩哈希表的⼯作可以通过执⾏rehash(重新散列)操作来完成,Redis对字典的哈希表执⾏rehash的策略如下:
- 如果ratio⼩于0.1,则会对hash表进⾏收缩操作
- 服务器⽬前没有在执⾏BGSAVE命令或者BGREWRITEAOF命令,并且哈希表的负载因⼦⼤于/等于1,则扩容hash表,扩容⼤⼩为当前ht[0].used*2
- 服务器⽬前正在执⾏BGSAVE命令或者BGREWRITEAOF命令,并且哈希表的负载因⼦⼤于/等于5,则扩容hash表,并且扩容⼤⼩为当前ht[0].used*2
-
扩容的步骤如下
- 为字典**ht[1]**哈希表分配合适的空间
- 将ht[0]中所有的键值对rehash到ht[1](rehash指的是重新计算键的哈希值和索引值,然后将键值对放置到**ht[1]**哈希表的指定位置上)
- 当ht[0包含的所有键值对都迁移到了ht[1]之后(ht[0]变为空表),释放ht[0],将ht[1]设置为ht[0],并在ht[1]新创建⼀个空⽩哈希表,为下⼀次rehash做准备
-
为了避免rehash对服务器性能造成影响,服务器不是⼀次性将ht[0]⾥⾯的所有键值对全部rehash到ht[1],⽽是分多次、渐进式地将ht[0]⾥⾯的键值对慢慢地rehash到ht[1]
- 为**ht[1]分配空间,让字典同时持有ht[0]和ht[1]**两个哈希表
- 在字典中维持⼀个索引计数器变量rehashidx,并将它的值设置为0,表示rehash⼯作正式开始
- 在rehash进⾏期间,每次对字典执⾏添加、删除、查找或者更新操作(甚至后台启动定时器)时,程序除了执⾏指定的操作以外,还会顺带将ht[0]哈希表在rehashidx索引上的所有键值对rehash到ht[1],当rehash⼯作完成之后,程序将rehashidx属性的值增⼀
- 随着字典操作的不断执⾏,最终在某个时间点上,ht[0]的所有键值对都会被rehash⾄ht[1],这时程序将 rehashidx属性的值设为**-1**,表示rehash操作已完成
4.3 主从复制
-
redis为了实现⾼可⽤(⽐如解决单点故障的问题),会把数据复制多个副本部署到其他节点上,通过复制,实现Redis的⾼可⽤性,实现对数据的冗余备份,保证数据和服务的可靠性。
-
如何配置:
- 配置⽂件:在从服务器的配置⽂件中加⼊:slaveof
- 启动命令:redis-server启动命令后加⼊**–slaveof**
- 客户端命令:Redis服务器启动后,直接通过客户端执⾏命令:slaveof,则该Redis实例成为从节点。
- PS:通过info replication命令可以看到复制的⼀些信息
- ⽆论是通过哪⼀种⽅式来建⽴主从复制,都是从节点来执⾏slaveof命令
-
主从复制的作⽤
- 数据冗余:主从复制实现了数据的热备份,是持久化之外的⼀种数据冗余⽅式。
- 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是⼀种服务的 冗余。
- 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即 写Redis数据时应⽤连接主节点,读Redis数据时应⽤连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以⼤⼤提⾼Redis服务器的并发量。
- 读写分离:可以⽤于实现读写分离,主库写、从库读,读写分离不仅可以提⾼服务器的负载能⼒,同时 可根据需求的变化,改变从库的数量。
- ⾼可⽤基⽯:除了上述作⽤以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis ⾼可⽤的基础。
-
Redis的主从复制功能除了⽀持⼀个Master节点对应多个Slave节点的同时进⾏复制外,还⽀持Slave节点向其它多个Slave节点进⾏复制。这样就使得架构师能够灵活组织业务缓存数据的传播,例如使⽤多个Slave作为数据读取服务的同时,专⻔使⽤⼀个Slave节点为流式分析⼯具服务。Redis的主从复制功能分为两种数据同步模式进⾏:全量数据同步和增量数据同步。
-
先执⾏⼀次全同步
-
请求master BgSave出⾃⼰的⼀个RDB Snapshot⽂件发给slave,slave接收完毕后,清除掉⾃⼰的旧数据,然后将RDB载⼊内存。
-
当Slave节点给定的replication id和Master的replication id不⼀致时,或者Slave给定的上⼀次增量同步的offset的位置在Master的环形内存中(replication backlog)⽆法定位时,Master就会对Slave发起全量同步操作。这时⽆论是否在Master打开了RDB快照功能,它和Slave节点的每⼀次全量同步操作过程都会更新/创建Master上的RDB⽂件。在Slave连接到Master,并完成第⼀次全量数据同步后,接下来Master到Slave的数据同步过程⼀般就是增量同步形式了(也称为部分同步)。增量同步过程不再主要依赖RDB⽂件,Master会将新产⽣的数据变化操作存放在replication backlog这个内存缓存区,这个内存区域是⼀个环形缓冲区,也就是说是⼀个FIFO的队列。
-
-
之后执行增量同步
-
master作为⼀个普通的client连⼊slave,将所有写操作转发给slave,没有特殊的同步协议。
-
-
Last modified on 2020-12-28