web 项目–线程池的设计和实现
线程池原理
- 空间换时间,浪费服务器的硬件资源,换取运行效率.
- 池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源.
- 当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配.
- 当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源.
线程池分析
线程池的设计模式为半同步/半反应堆,其中反应堆具体为 Proactor 事件处理模式。
- 主线程为异步线程,==负责监听文件描述符,接收 socket 新连接==,若当前监听的 socket 发生了读写事件,然后将任务插入到请求队列。
- 工作线程==从请求队列中取出任务,完成读写数据的处理==。
线程池类定义
具体定义可以看代码。需要注意,线程处理函数和运行函数设置为私有属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| 1template<typename T> 2class threadpool{ 3 public: 4 5 6 7 threadpool(connection_pool *connPool, int thread_number = 8, int max_request = 10000); 8 ~threadpool(); 9 10 11 bool append(T* request); 12 13 private: 14 15 16 static void *worker(void *arg); 17 18 void run(); 19 20 private: 21 22 int m_thread_number; 23 24 25 int m_max_requests; 26 27 28 pthread_t *m_threads; 29 30 31 std::list<T *>m_workqueue; 32 33 34 locker m_queuelocker; 35 36 37 sem m_queuestat; 38 39 40 bool m_stop; 41 42 43 connection_pool *m_connPool; 44};
|
线程池创建与回收
具体的,类对象传递时用 this 指针,传递给静态函数后,将其转换为线程池类,并调用私有成员函数 run。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| 1template<typename T> 2threadpool<T>::threadpool( connection_pool *connPool, int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests), m_stop(false), m_threads(NULL),m_connPool(connPool){ 3 4 if(thread_number<=0||max_requests<=0) 5 throw std::exception(); 6 7 8 m_threads=new pthread_t[m_thread_number]; 9 if(!m_threads) 10 throw std::exception(); 11 for(int i=0;i<thread_number;++i) 12 { 13 14 if(pthread_create(m_threads+i,NULL,worker,this)!=0){ 15 delete [] m_threads; 16 throw std::exception(); 17 } 18 19 20 if(pthread_detach(m_threads[i])){ 21 delete[] m_threads; 22 throw std::exception(); 23 } 24 } 25}
|
向请求队列中添加任务
通过 list 容器创建请求队列,向队列中添加时,通过互斥锁保证线程安全,添加完成后通过信号量提醒有任务要处理,最后注意线程同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 1template<typename T> 2bool threadpool<T>::append(T* request) 3{ 4 m_queuelocker.lock(); 5 6 7 if(m_workqueue.size()>m_max_requests) 8 { 9 m_queuelocker.unlock(); 10 return false; 11 } 12 13 14 m_workqueue.push_back(request); 15 m_queuelocker.unlock(); 16 17 18 m_queuestat.post(); 19 return true; 20}
|
线程处理函数
内部访问私有成员函数 run,完成线程处理要求。
1 2 3 4 5 6 7 8
| 1template<typename T> 2void* threadpool<T>::worker(void* arg){ 3 4 5 threadpool* pool=(threadpool*)arg; 6 pool->run(); 7 return pool; 8}
|
run 执行任务
主要实现,工作线程从请求队列中取出某个任务进行处理,注意线程同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| 1template<typename T> 2void threadpool<T>::run() 3{ 4 while(!m_stop) 5 { 6 7 m_queuestat.wait(); 8 9 10 m_queuelocker.lock(); 11 if(m_workqueue.empty()) 12 { 13 m_queuelocker.unlock(); 14 continue; 15 } 16 17 18 19 T* request=m_workqueue.front(); 20 m_workqueue.pop_front(); 21 m_queuelocker.unlock(); 22 if(!request) 23 continue; 24 25 26 request->mysql = m_connPool->GetConnection(); 27 28 29 request->process(); 30 31 32 m_connPool->ReleaseConnection(request->mysql); 33 } 34}
|