本文总阅读量 本站访客数人次 本站总访问量
gongluck's blog
C/C++ Golang 音视频流媒体
源码分析专题

九、源码分析专题

0.项目仓库

1.Nginx架构和模块

1.1 Nginx架构

  • nginx在启动后,在unix系统中会以daemon的方式在后台运行,后台进程包含一个master进程和多个worker进程。

  • master进程主要用来管理worker进程,包含:

    • 接收来自外界的信号,
    • 向各worker进程发送信号,
    • 监控worker进程的运行状态,
    • worker进程退出后(异常情况下),会自动重新启动新的worker进程。
  • 而基本的网络事件,则是放在worker进程中来处理了。多个worker进程之间是对等的,他们同等竞争来自客户端的请求,各进程互相之间是独立的。

    nginx进程模型

  • 每个worker进程都是从master进程fork过来,在master进程里面,先建立好需要listensocketlistenfd)之后,然后再fork出多个worker进程。所有worker进程的listenfd会在新连接到来时变得可读,为保证只有一个进程处理该连接,所有worker 进程在注册listenfd读事件前抢accept_mutex,抢到互斥锁的那个进程注册listenfd读事件,在读事件里调用accept接受该连接。当一个worker进程在accept这个连接之后,就开始读取请求,解析请求,处理请求,产生数据后,再返回给客户端,最后才断开连接,这样一个完整的请求就是这样的了。

  • 对于每个worker进程来说,独立的进程,不需要加锁,所以省掉了锁带来的开销,同时在编程以及问题查找时,也会方便很多。其次,采用独立的进程,可以让互相之间不会影响,一个进程退出后,其它进程还在工作,服务不会中断,master进程则很快启动新的worker进程。

  • nginx采用了异步非阻塞的方式来处理请求。非阻塞就是,事件没有准备好,马上返回EAGAIN

  • nginx里面的定时器事件是放在一颗维 护定时器的红黑树里面,每次在进入epoll_wait前,先从该红黑树里面拿到所有定时器事件的最小时间,在计算出epoll_wait的超时时间后进入epoll_wait。所以,当没有事件产生,也没有中断信号时,epoll_wait会超时,也就是说,定时器事件到了。这时,nginx会检查所有的超时事件,将他们的状态设置为超时,然后再去处理网络事件。

1.2 Nginx基础概念

  • connection

    struct ngx_connection_s {
        void               *data;
        ngx_event_t        *read;
        ngx_event_t        *write;
      
        ngx_socket_t        fd;
      
        ngx_recv_pt         recv;
        ngx_send_pt         send;
        ngx_recv_chain_pt   recv_chain;
        ngx_send_chain_pt   send_chain;
      
        ngx_listening_t    *listening;
      
        off_t               sent;
      
        ngx_log_t          *log;
      
        ngx_pool_t         *pool;
      
        int                 type;
      
        struct sockaddr    *sockaddr;
        socklen_t           socklen;
        ngx_str_t           addr_text;
      
        ngx_proxy_protocol_t  *proxy_protocol;
      
    #if (NGX_SSL || NGX_COMPAT)
        ngx_ssl_connection_t  *ssl;
    #endif
      
        ngx_udp_connection_t  *udp;
      
        struct sockaddr    *local_sockaddr;
        socklen_t           local_socklen;
      
        ngx_buf_t          *buffer;
      
        ngx_queue_t         queue;
      
        ngx_atomic_uint_t   number;
      
        ngx_uint_t          requests;
      
        unsigned            buffered:8;
      
        unsigned            log_error:3;     /* ngx_connection_log_error_e */
      
        unsigned            timedout:1;
        unsigned            error:1;
        unsigned            destroyed:1;
      
        unsigned            idle:1;
        unsigned            reusable:1;
        unsigned            close:1;
        unsigned            shared:1;
      
        unsigned            sendfile:1;
        unsigned            sndlowat:1;
        unsigned            tcp_nodelay:2;   /* ngx_connection_tcp_nodelay_e */
        unsigned            tcp_nopush:2;    /* ngx_connection_tcp_nopush_e */
      
        unsigned            need_last_buf:1;
      
    #if (NGX_HAVE_AIO_SENDFILE || NGX_COMPAT)
        unsigned            busy_count:2;
    #endif
      
    #if (NGX_THREADS || NGX_COMPAT)
        ngx_thread_task_t  *sendfile_task;
    #endif
    };
    
    • nginx在实现时,是通过一个连接池来管理的,每个worker进程都有一个独立的连接池,连接池的大小是worker_connections。这里的连接池里面保存的其实不是真实的连接,它只是一个worker_connections大小的一个ngx_connection_t结构的数组。并且,nginx会通过一个链表free_connections来保存所有的空闲ngx_connection_t,每次获取一个连接时,就从空闲连接链表中获取一个,用完后,再放回空闲连接链表里面。
    • 一个nginx能建立的最大连接数,应该是worker_connections * worker_processes。当然,这里说的是最大连接数,对于HTTP请求本地资源来说,能够支持的最大并发数量是 worker_connections * worker_processes,而如果是 HTTP 作为反向代理来说,最大并发数量应该是worker_connections * worker_processes / 2。因为作为反向代理服务器,每个并发会建立与客户端的连接和与后端服务的连接,会占用两个连接。
    • nginx使用一个叫ngx_accept_disabled的变量来控制是否去竞争accept_mutex锁。计算 ngx_accept_disabled的值,这个值是nginx单进程的所有连接总数的八分之一,减去剩下的空闲连接数量,得到的这个ngx_accept_disabled有一个规律,当剩余连接数小于总连接数的八分之一时,其值才大于 0,而且剩余的连接数越小,这个值越大。当ngx_accept_disabled大于0时,不会去尝试获取accept_mutex锁,并且将ngx_accept_disabled减1,于是,每次执行到此处时,都会去减1,直到小于0。
  • request

    struct ngx_http_request_s {
        uint32_t                          signature;         /* "HTTP" */
      
        ngx_connection_t                 *connection;
      
        void                            **ctx;
        void                            **main_conf;
        void                            **srv_conf;
        void                            **loc_conf;
      
        ngx_http_event_handler_pt         read_event_handler;
        ngx_http_event_handler_pt         write_event_handler;
      
    #if (NGX_HTTP_CACHE)
        ngx_http_cache_t                 *cache;
    #endif
      
        ngx_http_upstream_t              *upstream;
        ngx_array_t                      *upstream_states;
                                             /* of ngx_http_upstream_state_t */
      
        ngx_pool_t                       *pool;
        ngx_buf_t                        *header_in;
      
        ngx_http_headers_in_t             headers_in;
        ngx_http_headers_out_t            headers_out;
      
        ngx_http_request_body_t          *request_body;
      
        time_t                            lingering_time;
        time_t                            start_sec;
        ngx_msec_t                        start_msec;
      
        ngx_uint_t                        method;
        ngx_uint_t                        http_version;
      
        ngx_str_t                         request_line;
        ngx_str_t                         uri;
        ngx_str_t                         args;
        ngx_str_t                         exten;
        ngx_str_t                         unparsed_uri;
      
        ngx_str_t                         method_name;
        ngx_str_t                         http_protocol;
        ngx_str_t                         schema;
      
        ngx_chain_t                      *out;
        ngx_http_request_t               *main;
        ngx_http_request_t               *parent;
        ngx_http_postponed_request_t     *postponed;
        ngx_http_post_subrequest_t       *post_subrequest;
        ngx_http_posted_request_t        *posted_requests;
      
        ngx_int_t                         phase_handler;
        ngx_http_handler_pt               content_handler;
        ngx_uint_t                        access_code;
      
        ngx_http_variable_value_t        *variables;
      
    #if (NGX_PCRE)
        ngx_uint_t                        ncaptures;
        int                              *captures;
        u_char                           *captures_data;
    #endif
      
        size_t                            limit_rate;
        size_t                            limit_rate_after;
      
        /* used to learn the Apache compatible response length without a header */
        size_t                            header_size;
      
        off_t                             request_length;
      
        ngx_uint_t                        err_status;
      
        ngx_http_connection_t            *http_connection;
        ngx_http_v2_stream_t             *stream;
      
        ngx_http_log_handler_pt           log_handler;
      
        ngx_http_cleanup_t               *cleanup;
      
        unsigned                          count:16;
        unsigned                          subrequests:8;
        unsigned                          blocked:8;
      
        unsigned                          aio:1;
      
        unsigned                          http_state:4;
      
        /* URI with "/." and on Win32 with "//" */
        unsigned                          complex_uri:1;
      
        /* URI with "%" */
        unsigned                          quoted_uri:1;
      
        /* URI with "+" */
        unsigned                          plus_in_uri:1;
      
        /* URI with " " */
        unsigned                          space_in_uri:1;
      
        unsigned                          invalid_header:1;
      
        unsigned                          add_uri_to_alias:1;
        unsigned                          valid_location:1;
        unsigned                          valid_unparsed_uri:1;
        unsigned                          uri_changed:1;
        unsigned                          uri_changes:4;
      
        unsigned                          request_body_in_single_buf:1;
        unsigned                          request_body_in_file_only:1;
        unsigned                          request_body_in_persistent_file:1;
        unsigned                          request_body_in_clean_file:1;
        unsigned                          request_body_file_group_access:1;
        unsigned                          request_body_file_log_level:3;
        unsigned                          request_body_no_buffering:1;
      
        unsigned                          subrequest_in_memory:1;
        unsigned                          waited:1;
      
    #if (NGX_HTTP_CACHE)
        unsigned                          cached:1;
    #endif
      
    #if (NGX_HTTP_GZIP)
        unsigned                          gzip_tested:1;
        unsigned                          gzip_ok:1;
        unsigned                          gzip_vary:1;
    #endif
      
    #if (NGX_PCRE)
        unsigned                          realloc_captures:1;
    #endif
      
        unsigned                          proxy:1;
        unsigned                          bypass_cache:1;
        unsigned                          no_cache:1;
      
        /*
         * instead of using the request context data in
         * ngx_http_limit_conn_module and ngx_http_limit_req_module
         * we use the bit fields in the request structure
         */
        unsigned                          limit_conn_status:2;
        unsigned                          limit_req_status:3;
      
        unsigned                          limit_rate_set:1;
        unsigned                          limit_rate_after_set:1;
      
    #if 0
        unsigned                          cacheable:1;
    #endif
      
        unsigned                          pipeline:1;
        unsigned                          chunked:1;
        unsigned                          header_only:1;
        unsigned                          expect_trailers:1;
        unsigned                          keepalive:1;
        unsigned                          lingering_close:1;
        unsigned                          discard_body:1;
        unsigned                          reading_body:1;
        unsigned                          internal:1;
        unsigned                          error_page:1;
        unsigned                          filter_finalize:1;
        unsigned                          post_action:1;
        unsigned                          request_complete:1;
        unsigned                          request_output:1;
        unsigned                          header_sent:1;
        unsigned                          expect_tested:1;
        unsigned                          root_tested:1;
        unsigned                          done:1;
        unsigned                          logged:1;
      
        unsigned                          buffered:4;
      
        unsigned                          main_filter_need_in_memory:1;
        unsigned                          filter_need_in_memory:1;
        unsigned                          filter_need_temporary:1;
        unsigned                          preserve_body:1;
        unsigned                          allow_ranges:1;
        unsigned                          subrequest_ranges:1;
        unsigned                          single_range:1;
        unsigned                          disable_not_modified:1;
        unsigned                          stat_reading:1;
        unsigned                          stat_writing:1;
        unsigned                          stat_processing:1;
      
        unsigned                          background:1;
        unsigned                          health_check:1;
      
        /* used to parse HTTP headers */
      
        ngx_uint_t                        state;
      
        ngx_uint_t                        header_hash;
        ngx_uint_t                        lowcase_index;
        u_char                            lowcase_header[NGX_HTTP_LC_HEADER_LEN];
      
        u_char                           *header_name_start;
        u_char                           *header_name_end;
        u_char                           *header_start;
        u_char                           *header_end;
      
        /*
         * a memory that can be reused after parsing a request line
         * via ngx_http_ephemeral_t
         */
      
        u_char                           *uri_start;
        u_char                           *uri_end;
        u_char                           *uri_ext;
        u_char                           *args_start;
        u_char                           *request_start;
        u_char                           *request_end;
        u_char                           *method_end;
        u_char                           *schema_start;
        u_char                           *schema_end;
        u_char                           *host_start;
        u_char                           *host_end;
        u_char                           *port_start;
        u_char                           *port_end;
      
        unsigned                          http_minor:16;
        unsigned                          http_major:16;
    };
    
    • 对于nginx来说,一个请求是从ngx_http_init_request开始的,在这个函数中,会设置读事件为ngx_http_process_request_line,也就是说,接下来的网络事件,会由ngx_http_process_request_line来执行,处理请求行。通过ngx_http_read_request_header来读取请求数据。然后调用ngx_http_parse_request_line函数来解析请求行。

    • nginx为提高效率,采用状态机来解析请求行,而且在进行method的比较时,没有直接使用字符串比较,而是将四个字符转换成一个整型,然后一次比较以减少cpu的指令数。

    • 整个请求行解析到的参数,会保存到ngx_http_request_t结构当中。

    • 在解析完请求行后,nginx会设置读事件的handlerngx_http_process_request_headers,然后后续的请求就在ngx_http_process_request_headers中进行读取与解析。

    • ngx_http_process_request_headers函数用来读取请求头,跟请求行一样,还是调用ngx_http_read_request_header来读取请求头,调用ngx_http_parse_header_line来解析一行请求头,解析到的请求头会保存到ngx_http_request_t的域headers_in中,headers_in是一个链表结构,保存所有的请求头。

    • HTTP中有些请求是需要特别处理的,这些请求头与请求处理函数存放在一个映射表里面,即ngx_http_headers_in,在初始化时,会生成一个hash表,当每解析到一个请求头后,就会先在这个hash表中查找,如果有找到,则调用相应的处理函数来处理这个请求头。

    • nginx解析到两个回车换行符时,就表示请求头的结束,此时就会调用ngx_http_process_request来处理请求了。ngx_http_process_request会设置当前的连接的读写事件处理函数为ngx_http_request_handler,然后再调用ngx_http_handler来真正开始处理一个完整的http请求。读写事件处理函数都是ngx_http_request_handler,其实在这个函数中,会根据当前事件是读事件还是写事件,分别调用ngx_http_request_t中的read_event_handler或者是write_event_handler。由于此时,我们的请求头已经读取完成了,nginx先不读取请求body,设置read_event_handlerngx_http_block_reading,即不读取数据了。

    • 真正开始处理数据,是在ngx_http_handler这个函数里面,这个函数会设置 write_event_handlerngx_http_core_run_phases,并执行ngx_http_core_run_phases函数。ngx_http_core_run_phases这个函数将执行多阶段请求处理,nginx将一个http请求的处理分为多个阶段,那么这个函数就是执行这些阶段来产生数据。最终是调用ngx_http_core_run_phases来处理请求,产生的响应头会放在ngx_http_request_theaders_out中。

    • nginx的各种阶段会对请求进行处理,最后会调用filter来过滤数据,对数据进行加工。这里的filter包括header filterbody filter,即对响应头或响应体进行处理。filter是一个链表结构,分别有header filterbody filter,先执行header filter中的所有filter,然后再执行body filter中的所有filter。在header filter中的最后一个filter,即ngx_http_header_filter, 这个filter将会遍历所有的响应头,最后需要输出的响应头在一个连续的内存,然后调用ngx_http_write_filter进行输出。ngx_http_write_filterbody filter中的最后一个,所以nginx首先的body信息,在经过一系列的body filter之后,最后也会调用ngx_http_write_filter来进行输出。nginx会将整个请求头都放在一个buffer里面,这个buffer的大小通过配置项client_header_buffer_size来设置,如果用户的请求头太大,这个 buffer装不下,那nginx就会重新分配一个新的更大的buffer来装请求头,这个大buffer可以通过large_client_header_buffers来设置,这个large_buffer这一组buffer

    • 为了保存请求行或请求头的完整性,一个完整的请求行或请求头,需要放在一个连续的内存里面,所以,一个完整的请求行或请求头,只会保存在一个buffer里面。这样,如果请求行大于一个buffer的大小,就会返回414错误,如果一个请求头大小大于一个buffer大小,就会返回400错误。

      nginx处理http流程

1.3 Handler模块

  • Handler模块就是接受来自客户端的请求并产生输出的模块。

  • 自定义模块

    /*
     * @Author: gongluck 
     * @Date: 2020-12-08 19:10:29 
     * @Last Modified by: gongluck
     * @Last Modified time: 2020-12-08 19:11:06
     */
      
    // ./configure --add-module=/mnt/e/Code/CVIP/code/nginx/module
    // sudo ./objs/nginx -c /mnt/e/Code/CVIP/code/nginx/module/hello.conf
      
    #include <ngx_http.h>
    #include <ngx_config.h>
    #include <ngx_core.h>
      
    static char *ngx_http_hello_module_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
    static ngx_int_t ngx_http_hello_module_handler(ngx_http_request_t *r);
      
    // 模块配置结构
    typedef struct
    {
        ngx_str_t hello_string;
    } ngx_http_hello_loc_conf_t;
      
    // 模块配置指令
    static ngx_command_t hello_commands[] = {
        {
            ngx_string("hello"),                 //配置指令的名称
            NGX_HTTP_LOC_CONF | NGX_CONF_NOARGS, //该配置指令属性的集合
            ngx_http_hello_module_set,           //当nginx在解析配置的时候,如果遇到这个配置指令,将会把读取到的值传递给这个函数进行分解处理
            NGX_HTTP_LOC_CONF_OFFSET,            //指定当前配置项存储的内存位置,实际上是使用哪个内存池的问题。
            0,                                   //指定该配置项值的精确存放位置,一般指定为某一个结构体变量的字段偏移。
            NULL                                 //可以指向任何一个在读取配置过程中需要的数据,以便于进行配置读取的处理。
        },
        ngx_null_command};
      
    // 模块上下文结构
    static ngx_http_module_t hello_ctx = {
        NULL, //在创建和读取该模块的配置信息之前被调用
        NULL, //在创建和读取该模块的配置信息之后被调用
      
        NULL, //调用该函数创建本模块位于http block的配置信息存储结构。该函数成功的时候,返回创建的配置对象。失败的话,返回NULL。
        NULL, //调用该函数初始化本模块位于http block 的配置信息存储结构。该函数成功的时候,返回NGX_CONF_OK。失败的话,返回NGX_CONF_ERROR或错误字符串。
      
        NULL, //调用该函数创建本模块位于http server block的配置信息存储结构,每个server block会创建一个。该函数成功的时候,返回创建的配置对象。失败的话,返回NULL。
        NULL, //因为有些配置指令既可以出现在http block,也可以出现在http server block中。那么遇到这种情况,每个server都会有自己存储结构来存储该server的配置,但是在这种情况下http block中的配置与server block中的配置信息发生冲突的时候,就需要调用此函数进行合并,该函数并非必须提供,当预计到绝对不会发生需要合并的情况的时候,就无需提供。当然为了安全起见还是建议提供。该函数执行成功的时候,返回NGX_CONF_OK。失败的话,返回NGX_CONF_ERROR或错误字符串。
      
        NULL, //调用该函数创建本模块位于location block的配置信息存储结构。每个在配置中指明的location创建一个。该函数执行成功,返回创建的配置对象。失败的话,返回NULL。
        NULL, //与merge_srv_conf类似,这个也是进行配置值合并的地方。该函数成功的时候,返回NGX_CONF_OK。失败的话,返回NGX_CONF_ERROR或错误字符串。
    };
      
    // ngx_http_hello_module
    ngx_module_t ngx_http_hello_module = {
        NGX_MODULE_V1,
        &hello_ctx,      /* module context */
        hello_commands,  /* module directives */
        NGX_HTTP_MODULE, /* module type */
        NULL,            /* init master */
        NULL,            /* init module */
        NULL,            /* init process */
        NULL,            /* init thread */
        NULL,            /* exit thread */
        NULL,            /* exit process */
        NULL,            /* exit master */
        NGX_MODULE_V1_PADDING};
      
    static char *ngx_http_hello_module_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
    {
        ngx_http_core_loc_conf_t *corecf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
        corecf->handler = ngx_http_hello_module_handler;
      
        return NGX_CONF_OK;
    }
      
    static ngx_int_t ngx_http_hello_module_handler(ngx_http_request_t *r)
    {
        ngx_str_t str = ngx_string("Hello, Nginx!");
        ngx_buf_t *b = ngx_pcalloc(r->pool, sizeof(ngx_buf_t));
        if (b == NULL)
        {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
      
        ngx_chain_t out;
        out.buf = b;
        out.next = NULL;
        b->pos = str.data;
        b->last = b->pos + str.len;
        b->memory = 1;   /* this buffer is in memory */
        b->last_buf = 1; /* this is the last buffer in the buffer chain */
      
        /* set the status line */
        r->headers_out.status = NGX_HTTP_OK;
        r->headers_out.content_length_n = str.len;
      
        /* send the headers of your response */
        ngx_int_t rc = ngx_http_send_header(r);
        if (rc == NGX_ERROR || rc > NGX_OK || r->header_only)
        {
            return rc;
        }
      
        /* send the buffer chain of your response */
        return ngx_http_output_filter(r, &out);
    }
    

1.4 Filter模块

  • 过滤(filter)模块是过滤响应头和内容的模块,可以对回复的头和内容进行处理。它的处理时间在获取回复内容之后,向用户发送响应之前。它的处理过程分为两个阶段,过滤HTTP回复的头部和主体,在这两个阶段可以分别对头部和主体进行修改。

  • 所有模块的响应内容要返回给客户端,都必须调用这两个接口

    //分别对头部和主体进行过滤的函数
    ngx_http_top_header_filter(r); 
    ngx_http_top_body_filter(r, in); 
    
  • 过滤模块的调用是有顺序的,它的顺序在编译的时候就决定了。控制编译的脚本位于auto/modules中,当编译完Nginx以后,可以在objs目录下面看到一个ngx_modules.c的文件。

    ngx_module_t *ngx_modules[] = {
        &ngx_core_module,
        &ngx_errlog_module,
        &ngx_conf_module,
        &ngx_regex_module,
        &ngx_events_module,
        &ngx_event_core_module,
        &ngx_epoll_module,
        &ngx_http_module,
        &ngx_http_core_module,
        &ngx_http_log_module,
        &ngx_http_upstream_module,
        &ngx_http_static_module,
        &ngx_http_autoindex_module,
        &ngx_http_index_module,
        &ngx_http_mirror_module,
        &ngx_http_try_files_module,
        &ngx_http_auth_basic_module,
        &ngx_http_access_module,
        &ngx_http_limit_conn_module,
        &ngx_http_limit_req_module,
        &ngx_http_geo_module,
        &ngx_http_map_module,
        &ngx_http_split_clients_module,
        &ngx_http_referer_module,
        &ngx_http_rewrite_module,
        &ngx_http_proxy_module,
        &ngx_http_fastcgi_module,
        &ngx_http_uwsgi_module,
        &ngx_http_scgi_module,
        &ngx_http_memcached_module,
        &ngx_http_empty_gif_module,
        &ngx_http_browser_module,
        &ngx_http_upstream_hash_module,
        &ngx_http_upstream_ip_hash_module,
        &ngx_http_upstream_least_conn_module,
        &ngx_http_upstream_random_module,
        &ngx_http_upstream_keepalive_module,
        &ngx_http_upstream_zone_module,
        &ngx_http_hello_module,
        &ngx_http_write_filter_module,
        &ngx_http_header_filter_module,
        &ngx_http_chunked_filter_module,
        &ngx_http_range_header_filter_module,
        &ngx_http_gzip_filter_module,
        &ngx_http_postpone_filter_module,
        &ngx_http_ssi_filter_module,
        &ngx_http_charset_filter_module,
        &ngx_http_userid_filter_module,
        &ngx_http_headers_filter_module,//
        &ngx_http_copy_filter_module,//
        &ngx_http_range_body_filter_module,
        &ngx_http_not_modified_filter_module,
        NULL
    };
    
  • write_filternot_modified_filter,模块的执行顺序是反向的。所有第三方的模块只能加入到copy_filterheaders_filter模块之间执行。

  • 在过滤模块中,所有输出的内容都是通过一条单向链表所组成。这种单向链表的设计,正好应和了Nginx流式的输出模式。每次Nginx都是读到一部分的内容,就放到链表,然后输出出去。这种设计的好处是简单,非阻塞,但是相应的问题就是跨链表的内容操作非常麻烦, 如果需要跨链表,很多时候都只能缓存链表的内容。单链表负载的就是ngx_buf_t,这个结构体使用非常广泛。

  • 响应头过滤函数主要的用处就是处理HTTP响应的头,可以根据实际情况对于响应头进行修改或者添加删除。响应头过滤函数先于响应体过滤函数,而且只调用一次,所以一般可作过滤模块的初始化工作。

    ngx_int_t
    ngx_http_send_header(ngx_http_request_t *r)
    {
        if (r->post_action) {
            return NGX_OK;
        }
      
        if (r->header_sent) {
            ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                          "header already sent");
            return NGX_ERROR;
        }
      
        if (r->err_status) {
            r->headers_out.status = r->err_status;
            r->headers_out.status_line.len = 0;
        }
      
        return ngx_http_top_header_filter(r);//
    }
    
  • 可以把HTTP响应头的存储方式想象成一个hash表,在Nginx内部可以很方便地查找和修改各个响应头 部,ngx_http_header_filter_module过滤模块把所有的HTTP头组合成一个完整的buffer,最终ngx_http_write_filter_module过滤模块把buffer输出。

  • 响应体过滤函数是过滤响应主体的函数。ngx_http_top_body_filter这个函数每个请求可能会被执行多次,它的入口函数是ngx_http_output_filter

    ngx_int_t
    ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
    {
        ngx_int_t          rc;
        ngx_connection_t  *c;
      
        c = r->connection;
      
        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http output filter \"%V?%V\"", &r->uri, &r->args);
      
        rc = ngx_http_top_body_filter(r, in);
      
        if (rc == NGX_ERROR) {
            /* NGX_ERROR may be returned by any filter */
            c->error = 1;
        }
      
        return rc;
    }
    

1.5 Upstream模块

  • 从本质上说,upstream属于handler,只是他不产生自己的内容,而是通过请求后端服务器得到内容,所以才称为upstream(上游)。
  • 请求并取得响应内容的整个过程已经被封装到nginx内部,所以upstream模块只需要开发若干回调函数,完成构造请求和解析响应等具体的工作。
  • upstream模块使用的就是handler模块的接入方式。同时,upstream模块的指令系统的设计也是遵循handler模块的基本规则:配置该模块才会执行该模块。

1.6 负载均衡模块

  • 负载均衡模块用于从upstream指令定义的后端主机列表中选取一台主机。nginx先使用负载均衡模块找到一台主机,再使用upstream模块实现与这台主机的交互。
  • 核心指令ip_hash只能在upstream {}中使用。这条指令用于通知nginx使用ip hash负载均衡算法。如果没加这条指令,nginx会使用默认的round robin负载均衡模块。

2.Skynet

2.1 环境安装

git clone https://github.com/cloudwu/skynet.git
cd skynet
make linux -j 8

2.2 Skynet工作模型

Skynet工作模型

  • skynet中的actor模型

    • 结构组成

      • 隔离的环境(内存块或lua虚拟机)
      • 消息队列
      • 回调函数
    • 实现

      • logger服务service-src/service_logger.c
      • lua服务启动器service-src/service_snlua.c

2.3 Skynet Lua例子

-- ./skynet /mnt/e/Code/CVIP/code/skynet/test.conf

local skynet = require "skynet"
local socket = require "skynet.socket"

local function event_loop(clientfd)
    while true do
        local data = socket.readline(clientfd)--从网络获取 以\n为分隔符的数据包
        if not data then
            return
        end
        print(clientfd, "recv:", data)
        socket.write(clientfd, data.."\n")
    end
end

local function accept(clientfd, addr)-- 回调函数的作用 就是可以将 fd绑定到其他actor
    print("accept a connect:", clientfd, addr)
    socket.start(clientfd) -- 将clientfd注册到epoll
    skynet.fork(event_loop, clientfd) -- 实现一个简单的echo服务,可以通过 telnet 127.0.0.1 8001来连接skynet
end

skynet.start(function ()
    local listenfd = socket.listen("0.0.0.0", 8001) -- socket bind listen 
    socket.start(listenfd, accept) -- 将listenfd注册到epoll,收到连接会回调accept函数
end)

3.ZeroMQ

3.1 环境安装

git clone https://github.com/zeromq/libzmq.git
cd libzmq/
./autogen.sh
./configure --enable-debug
make -j 8
sudo make install
sudo ldconfig

3.2 例子代码

/*
 * @Author: gongluck 
 * @Date: 2020-12-10 02:27:44 
 * @Last Modified by: gongluck
 * @Last Modified time: 2020-12-10 02:37:16
 */

// gcc -o server server.c -lzmq -g -O0

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

int main()
{
    void *context = zmq_ctx_new();
    void *responder = zmq_socket(context, ZMQ_REP);
    int rc = zmq_bind(responder, "tcp://*:5555");

    while (1)
    {
        char buffer[10];
        int ret = zmq_recv(responder, buffer, 10, 0);
        printf("收到%.*s\n", ret, buffer);
        zmq_send(responder, "server recved.", strlen("server recved."), 0);
    }
    
    return 0;
}
/*
 * @Author: gongluck 
 * @Date: 2020-12-10 02:33:38 
 * @Last Modified by: gongluck
 * @Last Modified time: 2020-12-10 02:37:00
 */

// gcc -o client client.c -lzmq -g -O0

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main()
{
    void *context = zmq_ctx_new();
    void *requester = zmq_socket(context, ZMQ_REQ);
    zmq_connect(requester, "tcp://localhost:5555");

    for (int request_nbr = 0; request_nbr != 10; request_nbr++)
    {
        char buffer[20] = {0};
        zmq_send(requester, "Hello", 5, 0);
        zmq_recv(requester, buffer, 20, 0);
        printf("接收到 %.20s\n", buffer);
    }
    zmq_close(requester);
    zmq_ctx_destroy(context);
    return 0;
}

4.Redis

4.1 Redis的元素结构

  • hash值取余的table数组+hash

  • hash

    /* This is our hash table structure. Every dictionary has two of this as we
     * implement incremental rehashing, for the old to the new table. */
    typedef struct dictht {
        dictEntry **table;//table属性是⼀个数组,数组中的每个元素都是⼀个指向dict.h/dictEntry 结构的指针,每个dictEntry结构保存着⼀个键值对
        unsigned long size;//size属性记录了哈希表的⼤⼩,也即是table数组的⼤⼩
        unsigned long sizemask;//sizemask属性的值总是等于size - 1, 这个属性和哈希值⼀起决定⼀个键应该被放到table数组的哪个索引上⾯
        unsigned long used;//used属性则记录了哈希表⽬前已有节点(键值对)的数量
    } dictht;
    
  • hash表节点

    typedef struct dictEntry {
        void *key;
        union {
            void *val;
            uint64_t u64;
            int64_t s64;
            double d;
        } v;
        struct dictEntry *next;//next属性是指向另⼀个哈希表节点的指针,这个指针可以将多个哈希值相同的键值对连接在⼀次,以此来解决键冲突(collision)的问题。
    } dictEntry;
    

redis哈希表结构

  • 字典

    typedef struct dict {
        dictType *type;//type属性是⼀个指向dictType结构的指针,每个dictType结构保存了⼀簇⽤于操作特定类型键值对的函数,Redis会为⽤途不同的字典设置不同的类型特定函数。
        void *privdata;//privdata属性保存了需要传给dictType类型特定函数的可选参数。
        dictht ht[2];//ht属性是⼀个包含两个项的数组,数组中的每个项都是⼀个dictht哈希表,⼀般情况下, 字典只使⽤ht[0]哈希表,ht[1]哈希表只会在对ht[0]哈希表进⾏rehash时使⽤。
        long rehashidx;//记录了rehash⽬前的进度,如果⽬前没有在进⾏rehash,那么它的值为-1。
        unsigned long iterators; /* number of iterators currently running */
    } dict;
    typedef struct dictType {
        uint64_t (*hashFunction)(const void *key);
        void *(*keyDup)(void *privdata, const void *key);
        void *(*valDup)(void *privdata, const void *obj);
        int (*keyCompare)(void *privdata, const void *key1, const void *key2);
        void (*keyDestructor)(void *privdata, void *key);
        void (*valDestructor)(void *privdata, void *obj);
    } dictType;
    

    redis字典结构

4.2 Rehash

  • 随着操作的不断执⾏,哈希表保存的键值对会逐渐地增多或者减少,为了让哈希表的负载因⼦(ratio)维持在⼀个合理的范围之内,当哈希表保存的键值对数量太多或者太少时,程序需要对哈希表的⼤⼩进⾏相应的扩展或者收缩。

    ratio = ht[0].used / ht[0].size
    
  • 扩展和收缩哈希表的⼯作可以通过执⾏rehash(重新散列)操作来完成,Redis对字典的哈希表执⾏rehash的策略如下:

    • 如果ratio⼩于0.1,则会对hash表进⾏收缩操作
    • 服务器⽬前没有在执⾏BGSAVE命令或者BGREWRITEAOF命令,并且哈希表的负载因⼦⼤于/等于1,则扩容hash表,扩容⼤⼩为当前ht[0].used*2
    • 服务器⽬前正在执⾏BGSAVE命令或者BGREWRITEAOF命令,并且哈希表的负载因⼦⼤于/等于5,则扩容hash表,并且扩容⼤⼩为当前ht[0].used*2
  • 扩容的步骤如下

    • 为字典**ht[1]**哈希表分配合适的空间
    • ht[0]中所有的键值对rehashht[1]rehash指的是重新计算键的哈希值和索引值,然后将键值对放置到**ht[1]**哈希表的指定位置上)
    • ht[0包含的所有键值对都迁移到了ht[1]之后(ht[0]变为空表),释放ht[0],将ht[1]设置为ht[0],并在ht[1]新创建⼀个空⽩哈希表,为下⼀次rehash做准备
  • 为了避免rehash对服务器性能造成影响,服务器不是⼀次性将ht[0]⾥⾯的所有键值对全部rehashht[1],⽽是分多次、渐进式地将ht[0]⾥⾯的键值对慢慢地rehashht[1]

    • 为**ht[1]分配空间,让字典同时持有ht[0]ht[1]**两个哈希表
    • 在字典中维持⼀个索引计数器变量rehashidx,并将它的值设置为0,表示rehash⼯作正式开始
    • rehash进⾏期间,每次对字典执⾏添加、删除、查找或者更新操作(甚至后台启动定时器)时,程序除了执⾏指定的操作以外,还会顺带将ht[0]哈希表在rehashidx索引上的所有键值对rehashht[1],当rehash⼯作完成之后,程序将rehashidx属性的值增⼀
    • 随着字典操作的不断执⾏,最终在某个时间点上,ht[0]的所有键值对都会被rehashht[1],这时程序将 rehashidx属性的值设为**-1**,表示rehash操作已完成

4.3 主从复制

  • redis为了实现⾼可⽤(⽐如解决单点故障的问题),会把数据复制多个副本部署到其他节点上,通过复制,实现Redis的⾼可⽤性,实现对数据的冗余备份,保证数据和服务的可靠性。

  • 如何配置:

    • 配置⽂件:在从服务器的配置⽂件中加⼊:slaveof
    • 启动命令:redis-server启动命令后加⼊**–slaveof**
    • 客户端命令:Redis服务器启动后,直接通过客户端执⾏命令:slaveof,则该Redis实例成为从节点。
      • PS:通过info replication命令可以看到复制的⼀些信息
      • ⽆论是通过哪⼀种⽅式来建⽴主从复制,都是从节点来执⾏slaveof命令
  • 主从复制的作⽤

    • 数据冗余:主从复制实现了数据的热备份,是持久化之外的⼀种数据冗余⽅式。
    • 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是⼀种服务的 冗余。
    • 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即 写Redis数据时应⽤连接主节点,读Redis数据时应⽤连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以⼤⼤提⾼Redis服务器的并发量。
    • 读写分离:可以⽤于实现读写分离,主库写、从库读,读写分离不仅可以提⾼服务器的负载能⼒,同时 可根据需求的变化,改变从库的数量。
    • ⾼可⽤基⽯:除了上述作⽤以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis ⾼可⽤的基础。
  • Redis的主从复制功能除了⽀持⼀个Master节点对应多个Slave节点的同时进⾏复制外,还⽀持Slave节点向其它多个Slave节点进⾏复制。这样就使得架构师能够灵活组织业务缓存数据的传播,例如使⽤多个Slave作为数据读取服务的同时,专⻔使⽤⼀个Slave节点为流式分析⼯具服务。Redis的主从复制功能分为两种数据同步模式进⾏:全量数据同步和增量数据同步。

    • 先执⾏⼀次全同步

      • 请求master BgSave出⾃⼰的⼀个RDB Snapshot⽂件发给slaveslave接收完毕后,清除掉⾃⼰的旧数据,然后将RDB载⼊内存。

      • Slave节点给定的replication idMaster的replication id不⼀致时,或者Slave给定的上⼀次增量同步的offset的位置在Master的环形内存中(replication backlog)⽆法定位时,Master就会对Slave发起全量同步操作。这时⽆论是否在Master打开了RDB快照功能,它和Slave节点的每⼀次全量同步操作过程都会更新/创建Master上的RDB⽂件。在Slave连接到Master,并完成第⼀次全量数据同步后,接下来MasterSlave的数据同步过程⼀般就是增量同步形式了(也称为部分同步)。增量同步过程不再主要依赖RDB⽂件,Master会将新产⽣的数据变化操作存放在replication backlog这个内存缓存区,这个内存区域是⼀个环形缓冲区,也就是说是⼀个FIFO的队列。

        redis完全同步

    • 之后执行增量同步

      • master作为⼀个普通的client连⼊slave,将所有写操作转发给slave,没有特殊的同步协议。

        redis增量同步


Last modified on 2020-12-28