本文总阅读量 本站访客数人次 本站总访问量
gongluck's blog
C/C++ Golang 音视频流媒体
基础组件开发专栏

三、基础组件开发专栏

0.项目仓库

1.线程池原理与实现

1.1 线程池工作流程

线程池工作流程

1.2 线程池实现

  • 线程池实现代码

    #include <pthread.h>
      
    // 添加队列节点
    #define LL_ADD(item, list) \
        do                     \
        {                      \
            item->prev = NULL; \
            item->next = list; \
            list = item;       \
        } while (0)
      
    // 移除队列节点
    #define LL_REMOVE(item, list)              \
        do                                     \
        {                                      \
            if (item->prev != NULL)            \
                item->prev->next = item->next; \
            if (item->next != NULL)            \
                item->next->prev = item->prev; \
            if (list == item)                  \
                list = item->next;             \
            item->prev = item->next = NULL;    \
        } while (0)
      
    // 工作线程
    typedef struct WORKER
    {
        pthread_t thread;
        int terminate;
        struct WORKQUEUE *workqueue;
        struct WORKER *prev;
        struct WORKER *next;
    } Worker;
      
    // 工作任务
    typedef struct JOB
    {
        void (*job_function)(struct JOB *job);
        void *user_data;
        struct JOB *prev;
        struct JOB *next;
    } Job;
      
    // 工作调度
    typedef struct WORKQUEUE
    {
        struct WORKER *workers;
        struct JOB *waiting_jobs;
        pthread_mutex_t jobs_mtx;
        pthread_cond_t jobs_cond;
    } WorkQueue;
      
    typedef WorkQueue ThreadPool;
      
    // 工作线程回调函数
    static void *WorkerThread(void *ptr)
    {
        Worker *worker = (Worker *)ptr;
      
        while (1)
        {
            pthread_mutex_lock(&worker->workqueue->jobs_mtx);
      
            while (worker->workqueue->waiting_jobs == NULL)
            {
                if (worker->terminate)
                    break;
                pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
            }
      
            if (worker->terminate)
            {
                pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
                break;
            }
      
            Job *job = worker->workqueue->waiting_jobs;
            if (job != NULL)
            {
                LL_REMOVE(job, worker->workqueue->waiting_jobs);
            }
      
            pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
      
            if (job == NULL)
                continue;
      
            job->job_function(job);
        }
      
        free(worker);
        pthread_exit(NULL);
    }
      
    // 创建线程池
    int ThreadPoolCreate(ThreadPool *workqueue, int numWorkers)
    {
        if (numWorkers < 1)
            numWorkers = 1;
        memset(workqueue, 0, sizeof(ThreadPool));
      
        pthread_mutex_init(&workqueue->jobs_mtx, NULL);
        pthread_cond_init(&workqueue->jobs_cond, NULL);
      
        for (int i = 0; i < numWorkers; i++)
        {
            Worker *worker = (Worker *)malloc(sizeof(Worker));
            if (worker == NULL)
            {
                perror("malloc");
                return 1;
            }
      
            memset(worker, 0, sizeof(Worker));
            worker->workqueue = workqueue;
      
            int ret = pthread_create(&worker->thread, NULL, WorkerThread, (void *)worker);
            if (ret)
            {
                perror("pthread_create");
                free(worker);
                return 1;
            }
      
            LL_ADD(worker, worker->workqueue->workers);
        }
      
        return 0;
    }
      
    // 终止线程池
    void ThreadPoolShutdown(ThreadPool *workqueue)
    {
        for (Worker *worker = workqueue->workers; worker != NULL; worker = worker->next)
        {
            worker->terminate = 1;
        }
      
        pthread_mutex_lock(&workqueue->jobs_mtx);
      
        workqueue->workers = NULL;
        workqueue->waiting_jobs = NULL;
      
        pthread_cond_broadcast(&workqueue->jobs_cond);
      
        pthread_mutex_unlock(&workqueue->jobs_mtx);
    }
      
    // 添加任务
    void ThreadPoolQueue(ThreadPool *workqueue, Job *job)
    {
        pthread_mutex_lock(&workqueue->jobs_mtx);
      
        LL_ADD(job, workqueue->waiting_jobs);
      
        pthread_cond_signal(&workqueue->jobs_cond);
        pthread_mutex_unlock(&workqueue->jobs_mtx);
    }
    

2.CAS和无锁队列

2.1 CAS

  • 比较并交换(compare and swap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某⼀数据时由于执行顺序不确定性以及中断的不可预知性产⽣的数据不一致问题有了CAS,我们就可以用它来实现各种无锁(lock free)的数据结构。

  • 该操作通过将内存中的值与指定数据进行比较,当数值⼀样时将内存中的数据替换为新的值。

    int compare_and_swap(int *reg, int oldval, int newval)
    {
        int old_ref_val = *reg;
        if(old_reg_val == oldval)//compare
            *reg = newval;//swap
        return old_reg_val;
    }
    
  • gcc/g++中的CAS

    bool __sync_bool_compare_and_swap(type *ptr, type oldval type newval, ...);
    type __sync_val_compare_and_swap(type *ptr, type oldval type newval, ...);
    
  • Windows的CAS

    InterlockedCompareExchange(__inout LONG volatile  *Target, __in LONG Exchange, __in LONG Comperand);
    
  • C++11标准库的CAS

    template< class T >
    bool atomic_compare_exchange_weak(std::atomic<T>* obj, T* expected, T desired);
    template< class T >
    bool atomic_compare_exchange_weak(volatile std::atomic<T>* obj, T* expected, T desired );
    

2.2 无锁队列

  • 无锁队列代码

    /*
     * @Author: gongluck 
     * @Date: 2020-11-16 16:02:56 
     * @Last Modified by:   gongluck 
     * @Last Modified time: 2020-11-16 16:02:56 
     */
      
    template <typename ElemType>
    class Queue
    {
    public:
        Queue();
        ~Queue();
      
    public:
        void push(ElemType elem);
        bool pop();
        void show();
      
    private:
        struct _qNode
        {
            _qNode() : _next(nullptr) {}
            _qNode(ElemType elem) : _elem(elem), _next(nullptr) {}
            ElemType _elem;
            struct _qNode *_next;
        };
      
    private:
        struct _qNode *_head;
        struct _qNode *_tail;
    };
      
    template <typename ElemType>
    Queue<ElemType>::Queue()
    {
        _head = _tail = new _qNode();
    }
      
    template <typename ElemType>
    Queue<ElemType>::~Queue()
    {
        while (_head != nullptr)
        {
            struct _qNode *tempNode = _head;
            _head = _head->_next;
            delete tempNode;
        }
    }
      
    template <typename ElemType>
    void Queue<ElemType>::push(ElemType elem)
    {
        struct _qNode *newNode = new struct _qNode(elem);
        struct _qNode *oldp = _tail;
        while (!__sync_bool_compare_and_swap(&_tail->_next, nullptr, newNode))
            ;
        __sync_bool_compare_and_swap(&_tail, oldp, newNode);
    }
      
    template <typename ElemType>
    bool Queue<ElemType>::pop()
    {
        struct _qNode *p;
        do
        {
            p = _head;
            if (p->_next == nullptr)
                return false;
        } while (!__sync_bool_compare_and_swap(&_head, p, p->_next));
        delete p;
        return true;
    }
    

3.内存池

3.1 Nginx内存池结构

  • Nginx内存池结构

    Nginx内存池结构

  • nginx对内存的管理分为大内存与小内存,当某一个申请的内存大于某一个值时,就需要从大内存中分配空间,否则从小内存中分配空间。

  • nginx中的内存池是在创建的时候就设定好了大小,在以后分配小块内存的时候,如果内存不够,则是重新创建一块内存串到内存池中,而不是将原有的内存池进行扩张。当要分配大块内存是,则是在内存池外面再分配空间进行管理的,称为大块内存池。

  • Nginx内存池中大内存块和小内存块的分配与释放是不一样的。使用内存池时,可以使用ngx_palloc进行分配,使用ngx_pfree释放。对于大内存,这样做是没有问题的,而对于小内存就不一样了,分配的小内存,不会进行释放。

3.2 Nginx内存池的实现

ngx_pool_t *
ngx_create_pool(size_t size, ngx_log_t *log)
{
    ngx_pool_t  *p;

    p = ngx_memalign(NGX_POOL_ALIGNMENT, size, log);
    if (p == NULL) {
        return NULL;
    }

    p->d.last = (u_char *) p + sizeof(ngx_pool_t);//初始状态:last指向ngx_pool_t结构体之后数据取起始位置
    p->d.end = (u_char *) p + size;//end指向分配的整个size大小的内存的末尾
    p->d.next = NULL;
    p->d.failed = 0;

    size = size - sizeof(ngx_pool_t);
    p->max = (size < NGX_MAX_ALLOC_FROM_POOL) ? size : NGX_MAX_ALLOC_FROM_POOL;

    p->current = p;
    p->chain = NULL;
    p->large = NULL;
    p->cleanup = NULL;
    p->log = log;

    return p;
}


void
ngx_destroy_pool(ngx_pool_t *pool)
{
    ngx_pool_t          *p, *n;
    ngx_pool_large_t    *l;
    ngx_pool_cleanup_t  *c;

    //首先调用所有的数据清理函数
    for (c = pool->cleanup; c; c = c->next) {
        if (c->handler) {
            ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0,
                           "run cleanup: %p", c);
            c->handler(c->data);
        }
    }

#if (NGX_DEBUG)

    /*
     * we could allocate the pool->log from this pool
     * so we cannot use this log while free()ing the pool
     */

    for (l = pool->large; l; l = l->next) {
        ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0, "free: %p", l->alloc);
    }

    for (p = pool, n = pool->d.next; /* void */; p = n, n = n->d.next) {
        ngx_log_debug2(NGX_LOG_DEBUG_ALLOC, pool->log, 0,
                       "free: %p, unused: %uz", p, p->d.end - p->d.last);

        if (n == NULL) {
            break;
        }
    }

#endif

    //释放所有的大块内存
    for (l = pool->large; l; l = l->next) {
        if (l->alloc) {
            ngx_free(l->alloc);
        }
    }

    //最后释放所有内存池中的内存块
    for (p = pool, n = pool->d.next; /* void */; p = n, n = n->d.next) {
        ngx_free(p);

        if (n == NULL) {
            break;
        }
    }
}


void
ngx_reset_pool(ngx_pool_t *pool)
{
    ngx_pool_t        *p;
    ngx_pool_large_t  *l;

    //释放所有大块内存
    for (l = pool->large; l; l = l->next) {
        if (l->alloc) {
            ngx_free(l->alloc);
        }
    }

    //重置所有小块内存区
    for (p = pool; p; p = p->d.next) {
        p->d.last = (u_char *) p + sizeof(ngx_pool_t);
        p->d.failed = 0;
    }

    pool->current = pool;
    pool->chain = NULL;
    pool->large = NULL;
}


void *
ngx_palloc(ngx_pool_t *pool, size_t size)
{
#if !(NGX_DEBUG_PALLOC)
    if (size <= pool->max) {
        return ngx_palloc_small(pool, size, 1);
    }
#endif

    return ngx_palloc_large(pool, size);
}


void *
ngx_pnalloc(ngx_pool_t *pool, size_t size)
{
#if !(NGX_DEBUG_PALLOC)
    if (size <= pool->max) {
        return ngx_palloc_small(pool, size, 0);
    }
#endif

    return ngx_palloc_large(pool, size);
}


static ngx_inline void *
ngx_palloc_small(ngx_pool_t *pool, size_t size, ngx_uint_t align)
{
    u_char      *m;
    ngx_pool_t  *p;

    p = pool->current;

    do {
        m = p->d.last;

        if (align) {
            m = ngx_align_ptr(m, NGX_ALIGNMENT);
        }

        if ((size_t) (p->d.end - m) >= size) {//如果在当前内存块有效范围内,进行内存指针的移动
            p->d.last = m + size;

            return m;
        }

        p = p->d.next;//如果当前内存块有效容量不够分配,则移动到下一个内存块进行分配

    } while (p);

    return ngx_palloc_block(pool, size);
}


static void *
ngx_palloc_block(ngx_pool_t *pool, size_t size)
{
    u_char      *m;
    size_t       psize;
    ngx_pool_t  *p, *new;

    psize = (size_t) (pool->d.end - (u_char *) pool);//计算内存池第一个内存块的大小

    m = ngx_memalign(NGX_POOL_ALIGNMENT, psize, pool->log);//分配和第一个内存块同样大小的内存块
    if (m == NULL) {
        return NULL;
    }

    new = (ngx_pool_t *) m;

    new->d.end = m + psize;//设置新内存块的end
    new->d.next = NULL;
    new->d.failed = 0;

    m += sizeof(ngx_pool_data_t);//将指针m移动到d后面的一个位置,作为起始位置
    m = ngx_align_ptr(m, NGX_ALIGNMENT);
    new->d.last = m + size;//设置新内存块的last,即申请使用size大小的内存

    //这里的循环用来找最后一个链表节点,这里failed用来控制循环的长度,如果分配失败次数达到5次,就忽略,不需要每次都从头找起
    for (p = pool->current; p->d.next; p = p->d.next) {
        if (p->d.failed++ > 4) {
            pool->current = p->d.next;
        }
    }

    p->d.next = new;

    return m;
}


static void *
ngx_palloc_large(ngx_pool_t *pool, size_t size)
{
    void              *p;
    ngx_uint_t         n;
    ngx_pool_large_t  *large;

    //直接在系统堆中分配一块空间
    p = ngx_alloc(size, pool->log);
    if (p == NULL) {
        return NULL;
    }

    n = 0;
    //查找到一个空的large区,如果有,则将刚才分配的空间交由它管理
    for (large = pool->large; large; large = large->next) {
        if (large->alloc == NULL) {
            large->alloc = p;
            return p;
        }

        if (n++ > 3) {
            break;
        }
    }

    //为了提高效率, 如果在三次内没有找到空的large结构体,则创建一个
    large = ngx_palloc_small(pool, sizeof(ngx_pool_large_t), 1);
    if (large == NULL) {
        ngx_free(p);
        return NULL;
    }

    large->alloc = p;
    large->next = pool->large;
    pool->large = large;

    return p;
}


void *
ngx_pmemalign(ngx_pool_t *pool, size_t size, size_t alignment)
{
    void              *p;
    ngx_pool_large_t  *large;

    p = ngx_memalign(alignment, size, pool->log);
    if (p == NULL) {
        return NULL;
    }

    large = ngx_palloc_small(pool, sizeof(ngx_pool_large_t), 1);
    if (large == NULL) {
        ngx_free(p);
        return NULL;
    }

    large->alloc = p;
    large->next = pool->large;
    pool->large = large;

    return p;
}


ngx_int_t
ngx_pfree(ngx_pool_t *pool, void *p)
{
    ngx_pool_large_t  *l;

    //只检查是否是大内存块,如果是大内存块则释放
    for (l = pool->large; l; l = l->next) {
        if (p == l->alloc) {
            ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0,
                           "free: %p", l->alloc);
            ngx_free(l->alloc);
            l->alloc = NULL;

            return NGX_OK;
        }
    }

    return NGX_DECLINED;
}

Last modified on 2020-11-30