C++

C++ thread pool

jerichou
2022-12-02 / 0 评论 / 17 阅读 / 正在检测是否收录...

学习teamtalk服务端源码,记录下线程池的实现。

主要是实现的类分为任务类(抽象类)、线程池类、工作线程类、线程同步类。

lb5xtgkh.png

Task抽象任务类 虚基类为了能够处理不同任务

#ifndef __TASK_H__
#define __TASK_H__
 
class CTask {
public:
    CTask(){}
    virtual ~CTask(){}
    
    virtual void run() = 0;
private:
};
 
#endif /*defined(__TASK_H__) */

线程池类threadPool

init初始化并启动工作线程

int CThreadPool::Init(uint32_t worker_size)
{
    m_worker_size = worker_size;
    m_worker_list = new CWorkerThread [m_worker_size];
    if (!m_worker_list) {
        return 1;
    }
 
    for (uint32_t i = 0; i < m_worker_size; i++) {
        m_worker_list[i].SetThreadIdx(i);
        m_worker_list[i].Start();
    }
 
    return 0;
}

工作线程创建线程,遍历任务队列并处理。这里遍历任务队列的方式和遍历redis空闲连接队列一样,用一个锁和信号量优雅地等待任务。

void* CWorkerThread::StartRoutine(void* arg)
{
    CWorkerThread* pThread = (CWorkerThread*)arg;
 
    pThread->Execute();
 
    return NULL;
}
 
void CWorkerThread::Start()
{
    (void)pthread_create(&m_thread_id, NULL, StartRoutine, this);
}
 
void CWorkerThread::Execute()
{
    while (true) {
        m_thread_notify.Lock();
 
        // put wait in while cause there can be spurious wake up (due to signal/ENITR)
        while (m_task_list.empty()) {
            m_thread_notify.Wait();
        }
 
        CTask* pTask = m_task_list.front();
        m_task_list.pop_front();
        m_thread_notify.Unlock();
 
        pTask->run();
 
        delete pTask;
 
        m_task_cnt++;
        //log("%d have the execute %d task\n", m_thread_idx, m_task_cnt);
    }
}

初始化工作做完说一下add task,通过线程池对象添加任务,根据线程在线程数组中的索引随机添加任务到一个随机线程中,调用工作线程的pushtask function,加锁并唤醒等待任务而阻塞的线程解锁。

void CThreadPool::AddTask(CTask* pTask)
{
    /*
     * select a random thread to push task
     * we can also select a thread that has less task to do
     * but that will scan the whole thread list and use thread lock to get each task size
     */
    uint32_t thread_idx = random() % m_worker_size;
    m_worker_list[thread_idx].PushTask(pTask);
}

void CWorkerThread::PushTask(CTask* pTask)
{
    m_thread_notify.Lock();
    m_task_list.push_back(pTask);
    m_thread_notify.Signal();
    m_thread_notify.Unlock();
}

线程同步类通过封装pthread的互斥量和信号量实现
构造函数负责初始化互斥量和信号量,析构函数负责destroy操作

CThreadNotify::CThreadNotify()
{
    pthread_mutexattr_init(&m_mutexattr);
    pthread_mutexattr_settype(&m_mutexattr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&m_mutex, &m_mutexattr);
    
    pthread_cond_init(&m_cond, NULL);
}
 
CThreadNotify::~CThreadNotify()
{
    pthread_mutexattr_destroy(&m_mutexattr);
    pthread_mutex_destroy(&m_mutex);
    
    pthread_cond_destroy(&m_cond);
}

实现线程同步lock、unlock、wait、signal

void Lock() { pthread_mutex_lock(&m_mutex); }
void Unlock() { pthread_mutex_unlock(&m_mutex); }
void Wait() { pthread_cond_wait(&m_cond, &m_mutex); }
void Signal() { pthread_cond_signal(&m_cond); }

在dbproxy服务中用到了线程池,proxy连接类处理proxy任务时如下:

static CThreadPool g_thread_pool;
int init_proxy_conn(uint32_t thread_num)
{
    //省略其余代码
    g_thread_pool.Init(thread_num);
}
 
void CProxyConn::HandlePduBuf(uchar_t* pdu_buf, uint32_t pdu_len)
{
    //省略其余代码
    CTask* pTask = new CProxyTask(m_uuid, handler, pPdu);
    g_thread_pool.AddTask(pTask);
 
}

proxytask继承了task抽象类,并重写了run方法,使用task抽象类可以处理任何任务,代码质量很高。

0

评论 (0)

取消