最近自己写了一个线程池。
总的来说,线程池就是有一个任务队列,一个线程队列,线程队列不断地去取任务队列中的任务来执行,当任务队列中为空时,线程阻塞等待新的任务添加过来。
我是用queue来存放任务,vector存放thread*,然后用condition_variable 来设置线程阻塞和唤醒。
下面直接上代码吧。
线程池类头文件Thread_Pool.h
/******************************************** 线程池头文件 Author:十面埋伏但莫慌 Time:2020/05/03 *********************************************/ #pragma once #ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #include<thread> #include<queue> #include<mutex> #include<atomic> #include<vector> #include<condition_variable> typedef std::function<void()> Func;//定义线程执行函数类型,方便后面编码使用。 //任务类 class Task { public: Task() {} ~Task() {} int push(Func func);//添加任务; int getTaskNum();//获得当前队列中的任务数; Func pop();//取出待执行的任务; public: std::mutex mx;//锁; private: std::queue<Func> tasks;//任务队列 }; //线程池类 class Thread_Pool { public: Thread_Pool() :IsStart(false) {} ~Thread_Pool(); int addTasks(Func tasks);//添加任务; void start();//开启线程池; void stop();//关闭线程池; void run();//线程工作函数; int getTaskNum();//获得当前队列中的任务数; private: static const int maxThreadNum = 3;//最大线程数为3; std::condition_variable cond;//条件量; std::vector<std::thread*> threads;//线程向量; std::atomic<bool> IsStart;//原子变量,判断线程池是否运行; Task tasks;//任务变量; }; #endif
然后是线程池类成员函数定义文件Thread_Pool.cpp
/******************************************** 线程池CPP文件 Author:十面埋伏但莫慌 Time:2020/05/03 *********************************************/ #include"Thread_Pool.h" #include<iostream> int Task::push(Func func) { std::unique_lock<std::mutex> lock(mx); try { tasks.emplace(func); } catch (std::exception e) { throw e; return -1; } return 0; } int Task::getTaskNum() { return tasks.size(); } Func Task::pop() { std::unique_lock<std::mutex> lock(mx); Func temp; if (tasks.empty()) return temp; else { temp = tasks.front(); tasks.pop(); return temp; } } int Thread_Pool::addTasks(Func func) { int ret = tasks.push(func); cond.notify_one(); return ret; } void Thread_Pool::start() { if (!IsStart) { IsStart = true; for (int i = 0; i < maxThreadNum; i++) { threads.emplace_back(new std::thread(std::bind(&Thread_Pool::run,this))); } } } void Thread_Pool::run() { while (IsStart) { Func f; if (tasks.getTaskNum() == 0 && IsStart) { std::unique_lock<std::mutex> lock(tasks.mx); cond.wait(lock); } if (tasks.getTaskNum() != 0 && IsStart) { f = tasks.pop(); if(f) f(); } } } int Thread_Pool::getTaskNum() { return tasks.getTaskNum(); } void Thread_Pool::stop() { IsStart = false; cond.notify_all(); for (auto T : threads) { std::cout << "线程 " << T->get_id() << " 已停止。" << std::endl; T->join(); if (T != nullptr) { delete T; T = nullptr; } } std::cout << "所有线程已停止。" << std::endl; } Thread_Pool::~Thread_Pool() { if (IsStart) { stop(); } }
最后是测试用的main.cpp
#include<iostream> #include"Thread_Pool.h" using namespace std; void string_out_one() { cout << "One!" << endl; } void string_out_two() { cout << "Two!" << endl; } void string_out_three() { cout << "Three!" << endl; } int main() { { Thread_Pool Pool; try { Pool.start(); } catch (std::exception e) { throw e; cout << "线程池创建失败。" << endl; } for (int i = 0; i < 50000 ;) { if (Pool.getTaskNum() < 1000) { Pool.addTasks(string_out_one); Pool.addTasks(string_out_two); Pool.addTasks(string_out_three); std::cout << i++ << std::endl; } } getchar(); } getchar(); return 0; }
执行的效果如下:
线程唤醒和阻塞的逻辑就是在线程工作函数run函数中,判断队列是否为空,若为空则设置锁并调用condition变量的wait函数,释放这个线程中的锁并阻塞线程,等待任务队列中新的任务添加进来后,
condition变量通过notify_one()随机唤醒一个在wait的线程,取出队列中的任务执行。
写这个线程池的过程中碰到的最主要需要注意的就是锁的使用,在对队列的写和释放时要注意加锁,在需要阻塞线程时,要注意通过{}设置锁的范围。
IsStart是原子的,所以在写这个变量的时候没有另外加锁。
目前我觉得这个线程池的缺陷就是可执行函数的类型被写死了,有尝试对Task类使用模板类,但是在Thread_Pool中还是要指明Task模板类的类型参数,要是有大神指点下就好了- -。
就先记录这么多,感觉这个线程池的还是有很多可以改进的地方的,也欢迎大家指出不足。