一个逗逗 2020-01-01
在新的 C++11 标准中,引入并发编程的一些基础组件:线程(thread)、互斥锁(mutex)、条件变量(condition_variable) 等,凭借这些,就足够我设计一个平台无关的 线程池 组件了。下面就详细介绍一下这个线程池组件。
需要特别说明的是,这个线程池组件,在增加了“存在关联性的任务对象顺序执行”的功能后,原本的任务队列就分成了两级任务队列,目的是为了降低 “任务提交” 与 “任务提取” 之间(属于一种生产/消费的关系)的锁竞争。
源码有点多,这里就不贴出来了,直接给下载地址:https://github.com/Gaaagaa/xthreadpool 。主要的线程池类 x_threadpool_t 在 xthreadpool.h 中已完整实现,在实际项目应用中,只需要 xthreadpool.h 这一个文件就足够了。
测试程序的编译命令:
- MSVC++2017:cl /EHsc main.cpp
- gcc :g++ -Wall -std=c++11 -lpthread -o main main.cpp
技术特点:
- 使用 C++11 的 thread 实现,可跨平台,亲测的编译器有 MSVC++2017、gcc 4.8.5、gcc 8.2.0;
- 支持传统的面向对象编程的任务对象类接口:继承抽象任务对象接口类,实现多态(可结合对象池的模式进行资源复用);
- 支持泛型接口的任务对象,如:C 函数接口、lambda 表达式、仿函数对象、类对象的成员函数调用;
- 支持动态调整工作线程的数量:通过 resize() 接口实现,该功能当前的实现方式还不够好,仍有优化的方式;
- 支持运行时的线程状态检测(判断当前工作线程是否需要退出);
- 存在关联性的任务对象可顺序执行(这一特点只针对一些特别的应用场景,后续示例中会展示)。
#include "xthreadpool.h" #include <chrono> #include <stdio.h> int main(int argc, char * argv[]) { // 工作线程数量 // 若为 0,将取 hardware_concurrency() 返回值的 2倍 + 1 int nthreads = 4; // 线程池对象 x_threadpool_t xht_pool; // 启动线程池 if (!xht_pool.startup(nthreads)) { printf("startup return false!\n"); return -1; } //====================================== // 提交任务对象 // ...... //====================================== // 等待所有任务执行完成 while (xht_pool.task_count() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 关闭线程池 xht_pool.shutdown(); return 0; }
#include "xthreadpool.h" #include <chrono> #include <stdio.h> void func_task(int task_id, int task_iter) { int count = 1; do { printf("func_task[%d, %d] => count: %d\n", task_id, task_iter, count); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } while (count++ < 10); } int main(int argc, char * argv[]) { // 线程池对象 x_threadpool_t xht_pool; // 启动线程池 if (!xht_pool.startup(0)) { printf("startup return false!\n"); return -1; } //====================================== // 提交任务对象 : C 函数接口的任务 for (int iter = 0; iter < 100; iter += 10) { xht_pool.submit_task_ex(func_task, iter, iter * iter); } //====================================== // 等待所有任务执行完成 while (xht_pool.task_count() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 关闭线程池 xht_pool.shutdown(); return 0; }
#include "xthreadpool.h" #include <chrono> #include <stdio.h> /** * @struct functor_task_A * @brief 仿函数模式的任务对象类。 */ struct functor_task_A { // constructor/destructor public: functor_task_A(int xtask_id = 0) : m_xtask_id(xtask_id) { } public: void operator()() const { int count = 1; do { printf("functor_task_A[%d] => count: %d\n", m_xtask_id, count); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } while (count++ < 10); } // data members private: int m_xtask_id; }; /** * @struct functor_task_B * @brief 仿函数模式的任务对象类。 */ struct functor_task_B { // constructor/destructor public: functor_task_B(int xtask_id = 0) : m_xtask_id(xtask_id) { } public: void operator()(int flag) const { int count = 1; do { printf("functor_task_B[%d, %d] => count: %d\n", m_xtask_id, flag, count); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } while (count++ < 10); } // data members private: int m_xtask_id; }; int main(int argc, char * argv[]) { // 线程池对象 x_threadpool_t xht_pool; // 启动线程池 if (!xht_pool.startup(0)) { printf("startup return false!\n"); return -1; } //====================================== // 提交任务对象 : 仿函数对象 for (int iter = 0; iter < 100; iter += 10) { xht_pool.submit_task_ex((functor_task_A(iter))); } for (int iter = 0; iter < 100; iter += 10) { xht_pool.submit_task_ex((functor_task_B(iter)), iter * iter / 2); } //====================================== // 等待所有任务执行完成 while (xht_pool.task_count() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 关闭线程池 xht_pool.shutdown(); return 0; }
#include "xthreadpool.h" #include <chrono> #include <stdio.h> /** * @class memfunc_task * @brief 调用成员函数的任务对象类。 */ class memfunc_task { // constructor/destructor public: memfunc_task(int xtask_id = 0) : m_xtask_id(xtask_id) { } // overrides public: /**********************************************************/ /** * @brief 任务对象执行流程的操作接口。 */ void memfunc(int task_iter) { int count = 1; do { printf("memfunc_task[%d, %d] => count: %d\n", m_xtask_id, task_iter, count); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } while (count++ < 10); } // data members private: int m_xtask_id; }; int main(int argc, char * argv[]) { // 线程池对象 x_threadpool_t xht_pool; // 启动线程池 if (!xht_pool.startup(0)) { printf("startup return false!\n"); return -1; } //====================================== // 提交任务对象 // 注意,这个栈区对象的生命期需要在线程池关闭前存活, // 至少要保证所提交的任务都执行完成时,才可结束该对象的生命期 memfunc_task mftask(0); for (int iter = 0; iter < 100; iter += 10) { xht_pool.submit_task_ex(&memfunc_task::memfunc, &mftask, iter); } //====================================== // 等待所有任务执行完成 while (xht_pool.task_count() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 关闭线程池 xht_pool.shutdown(); return 0; }
#include "xthreadpool.h" #include <chrono> #include <stdio.h> /** * @class user_task * @brief 用户自定义的任务对象类。 */ class user_task : public x_task_t { // constructor/destructor public: user_task(int xtask_id = 0) : m_xtask_id(xtask_id) { } // overrides public: /**********************************************************/ /** * @brief 任务对象执行流程的操作接口。 */ virtual void run(x_running_checker_t * xchecker_ptr) override { int count = 1; do { printf("[%d]user_task[%d] => count: %d\n", (int)xchecker_ptr->thread_index(), m_xtask_id, count); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } while ((count++ < 10) && xchecker_ptr->is_enable_running()); } // data members private: int m_xtask_id; }; int main(int argc, char * argv[]) { // 线程池对象 x_threadpool_t xht_pool; // 启动线程池 if (!xht_pool.startup(0)) { printf("startup return false!\n"); return -1; } //====================================== // 提交任务对象 : 重载的任务对象 for (int iter = 0; iter < 100; iter += 10) { xht_pool.submit_task((x_task_ptr_t)(new user_task(iter))); } //====================================== // 等待所有任务执行完成 while (xht_pool.task_count() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 关闭线程池 xht_pool.shutdown(); return 0; }
#include "xthreadpool.h" #include <chrono> #include <stdio.h> int main(int argc, char * argv[]) { // 线程池对象 x_threadpool_t xht_pool; // 启动线程池 if (!xht_pool.startup(0)) { printf("startup return false!\n"); return -1; } //====================================== // 提交任务对象 : lambda 表达式 for (int iter = 0; iter < 100; iter += 10) { xht_pool.submit_task_ex( [iter]() -> void { int task_id = iter; for (int jter = 0; jter < 10; ++jter) { printf("lambda A task id : %d -> %d\n", task_id, task_id + jter); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }); } for (int iter = 0; iter < 100; iter += 10) { xht_pool.submit_task_ex( [iter](const std::string & str_name) -> void { int task_id = iter; for (int jter = 0; jter < 10; ++jter) { printf("lambda B task id : %d -> %d name : %s\n", task_id, task_id + jter, str_name.c_str()); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }, std::string("Lambda B")); } //====================================== // 等待所有任务执行完成 while (xht_pool.task_count() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 关闭线程池 xht_pool.shutdown(); return 0; }
x_threadpool_t 内部提供了一个类 x_running_checker_t 可检测当前工作线程是否可继续执行下去(或者说,是否要立即终止执行的任务流程)。提供此功能,主要目的是针对那些耗时长的任务对象(处于运行状态),在线程池关闭或者需要动态调整(减少)工作线程数量时,能够优雅地终止任务流程。参看如下代码是如何使用的:
#include "xthreadpool.h" #include <chrono> #include <stdio.h> int main(int argc, char * argv[]) { // 线程池对象 x_threadpool_t xht_pool; // 启动线程池 if (!xht_pool.startup(0)) { printf("startup return false!\n"); return -1; } //====================================== // 提交任务对象 : lambda 表达式(检测工作线程的退出标识) for (int iter = 0; iter < 100; iter += 10) { xht_pool.submit_task_ex( [iter](x_running_checker_t * xchecker_ptr) -> void { int task_id = iter; for (int jter = 0; (jter < 10) && xchecker_ptr->is_enable_running(); ++jter) { printf("rchecker[%d] lambda A task id : %d -> %d\n", (int)xchecker_ptr->thread_index(), task_id, task_id + jter); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }, x_running_checker_t::xholder()); } //====================================== // 等待所有任务执行完成 while (xht_pool.task_count() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 关闭线程池 xht_pool.shutdown(); return 0; }
单看 “按提交次序而顺序执行” 这句话,可能不好理解,先看后面提到的这样应用场景。
A 对象产生(需要执行)的任务对象序列为[ A1, A2, ..., Am ],并按此顺序依次提交到线程池中;与此同时,B 对象产生(需要执行)的任务对象序列为[ B1, B2, ..., Bn ],也按此顺序依次提交到线程池中。而不管是 A 对象还是 B 对象,都期望按所提交顺序执行任务对象。这种情况下,在线程池中的任务队列可能有如下图所示的排列:
从本质上看,我们只需要保证 A(或 B) 对象产生的任务在同一时刻只能有一个工作线程执行,这就能保证其顺序性。
面对这种应用场景,我在设计任务对象的抽象基类 x_task_t 时,就为此做好了相关的扩展接口。先看下 x_task_t 代码中的两个接口:
struct x_task_t { ...... // extensible interfaces public: ...... /**********************************************************/ /** * @brief 判断 任务对象 是否挂起。 * @note 若任务对象处于挂起状态,工作线程提取任务时,则跳过该对象。 */ virtual bool is_suspend(void) const { return false; } /**********************************************************/ /** * @brief 设置任务对象的运行标识。 * * @note * <pre> * 工作线程在提取到任务对象后,则立即调用 set_running_flag(true) 操作; * 执行 run() 操作返回后,又调用 set_running_flag(false) 操作。 * </pre> */ virtual void set_running_flag(bool xrunning_flag) { } ...... };
再看下 x_threadpool_t 工作线程 “提取任务对象” 以及 “执行任务对象” 的部分实现流程(注意任务对象 is_suspend() 和 set_running_flag() 两个接口被调用的地方):
class x_threadpool_t { ...... x_task_ptr_t get_task(void) { ...... for (std::list< x_task_ptr_t >::iterator itlst = m_lst_run_tasks.begin(); (itlst != m_lst_run_tasks.end()) && is_enable_get_task(); ++itlst) { if ((nullptr == *itlst) || !(*itlst)->is_suspend()) { xtask_ptr = *itlst; m_lst_run_tasks.erase(itlst); m_xst_lst_tasks.fetch_sub(1); break; } } if (nullptr != xtask_ptr) { xtask_ptr->set_running_flag(true); } return xtask_ptr; } ...... void thread_run(size_t xthread_index) { ...... xtask_ptr = get_task(); if (nullptr == xtask_ptr) { if (get_lst_task_size() > 0) thread_yield(xcounter); continue; } if (xht_checker.is_enable_running()) { xtask_ptr->run(&xht_checker); } // 执行完任务对象后,将任务对象转换为 非挂起状态, // 加锁进行操作,是为了与 get_task() 内的操作保持队列的同步 { // 标识当前不可提取待执行的任务对象,迫使 get_task() 内部迅速解锁 m_xst_get_task.fetch_add(1); m_lock_run_task.lock(); xtask_ptr->set_running_flag(false); m_lock_run_task.unlock(); m_xst_get_task.fetch_sub(1); } ...... } ...... };
我们可通过重载 x_task_t 所提供的两个扩展接口:is_suspend() 和 set_running_flag() 进行状态切换(即在调用 set_running_flag() 时让相关任务切换 is_suspend() 挂起/非挂起 两种状态),就可实现顺序执行。
示例代码如下所示:
#include "xthreadpool.h" #include <chrono> #include <string> #include <stdio.h> #include <stdlib.h> #include <time.h> //////////////////////////////////////////////////////////////////////////////// class x_order_task_t; /** * @class objectT * @brief 产生 x_order_task_t 任务对象序列的测试类。 */ class objectT { // constructor/destructor public: objectT(const std::string & xstr_name) : m_xrunning_flag(false) , m_seqno_taskid(0) , m_xstr_name(xstr_name) { } // public interfaces public: inline void set_running(bool xrunning_flag) { m_xrunning_flag = xrunning_flag; } inline bool is_running(void) const { return m_xrunning_flag; } inline const std::string & name(void) const { return m_xstr_name; } x_order_task_t * new_task(void); // data members private: bool m_xrunning_flag; int m_seqno_taskid; std::string m_xstr_name; }; /** * @class x_order_task_t * @brief 保证顺序执行的测试任务对象。 */ class x_order_task_t : public x_threadpool_t::x_task_t { // constructor/destructor public: x_order_task_t(objectT * obj_owner, int taskid) : m_obj_owner(obj_owner) , m_taskid(taskid) { } virtual ~x_order_task_t(void) { } // overrides public: virtual void run(x_running_checker_t * xchecker_ptr) override { printf("%s:%02d\n", m_obj_owner->name().c_str(), m_taskid); } virtual bool is_suspend(void) const override { return m_obj_owner->is_running(); } virtual void set_running_flag(bool xrunning_flag) override { m_obj_owner->set_running(xrunning_flag); } // data members private: objectT * m_obj_owner; int m_taskid; }; x_order_task_t * objectT::new_task(void) { return (new x_order_task_t(this, m_seqno_taskid++)); } //==================================================================== int main(int argc, char * argv[]) { // 线程池对象 x_threadpool_t xht_pool; // 启动线程池 if (!xht_pool.startup(0, true)) { printf("startup return false!\n"); return -1; } //====================================== // 便于测试而增加的代码 // 提交任务对象 : 使线程池的所有工作线程处于暂停状态 bool is_pause_pool = true; for (int iter = 0, nthds = (int)xht_pool.size(); iter < nthds + 1; ++iter) { xht_pool.submit_task_ex( [&is_pause_pool]() -> void { while (is_pause_pool) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } }); } //====================================== // 提交任务对象 : 按提交顺序执行任务对象 srand(time(NULL)); objectT objA("objA"); objectT objB("objB"); for (int iter = 0; iter < 100; ++iter) { if (0 == (rand() % 2)) { xht_pool.submit_task(objA.new_task()); } else { xht_pool.submit_task(objB.new_task()); } } // 使线程池开始工作(便于测试而增加的代码) is_pause_pool = false; //====================================== // 等待所有任务执行完成 while (xht_pool.task_count() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 关闭线程池 xht_pool.shutdown(); return 0; }
看一下测试的输出结果:
objA:00 objB:00 objB:01 objB:02 objB:03 objB:04 objA:01 objA:02 objA:03 objB:05 objA:04 objB:06 objB:07 objB:08 objB:09 objA:05 objA:06 objB:10 objB:11 objB:12 objA:07 objA:08 objA:09 objA:10 objB:13 objA:11 objA:12 objB:14 objB:15 objB:16 objB:17 objB:18 objB:19 objA:13 objA:14 objA:15 objA:16 objA:17 objA:18 objA:19 objB:20 objA:20 objA:21 objA:22 objB:21 objB:22 objB:23 objA:23 objB:24 objA:24 objB:25 objB:26 objB:27 objB:28 objB:29 objA:25 objB:30 objA:26 objA:27 objB:31 objB:32 objB:33 objA:28 objA:29 objA:30 objB:34 objA:31 objA:32 objB:35 objA:33 objA:34 objB:36 objA:35 objA:36 objB:37 objA:37 objA:38 objA:39 objA:40 objB:38 objB:39 objB:40 objA:41 objA:42 objA:43 objA:44 objA:45 objB:41 objA:46 objA:47 objA:48 objB:42 objA:49 objB:43 objA:50 objA:51 objA:52 objA:53 objA:54 objB:44
经检测,这已经达到我们所期望的结果。