0.为什么需要线程池?
当我们需要完成一些持续时间短、发生频率高的工作时,每次为他们开启一个线程既显得繁琐又会造成不必要的开销,所以为这一类工作写一个简单的线程池就很有必要了。
1.如何设计我们的线程池?
考虑这样一个应用场景,我们有一个设计软件,当我们需要下载一些模板用作接下来的设计工作。我们显然不能将下载工作放在主线程去完成,如果你的用户网络速度飞快还好说,一旦他的网络稍有波动,你的设计软件将会停止GUI界面的渲染,卡在那边不懂,这显然不是好的体验。正确的做法是另外开启一个线程去下载,一旦这个下载工作完成后,我们可以通知主线程去做一个提示,例如弹出一个对话框告诉用户模板已经可以用了。
如果这个设计软件需要频繁的做下载工作,那么每次单开一个线程就比较麻烦。我们可以将下载任务打包好,一个一个的推送到队列中,由线程池自动从队列中获取任务,完成下载工作。当队列为空时,线程池里面的线程就进入等待状态,这也不会造成资源的占用。
2.先完成队列的设计
STL中提供了一个关于队列的实现std::queue
,但是std::queue
并不是线程安全的,我们需要在此基础上进行一些包装,让它在多线程的情况下也可以使用。
我们的队列需要拥有这几个功能:
1.从队列里弹出一个任务
2.向队列里推送任务
3.了解队列里的任务还有多少个
4.清除任务队列
下面是一个简单的线程安全队列:
namespace multi_thread {
template<typename Task>
class thread_safe_queue {
public:
thread_safe_queue() :done_(false) {
}
void push(const Task& t) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(t);
ready_.notify_one();
}
void wait_and_pop(Task& t) {
std::unique_lock<std::mutex> lock(mtx_);
if (queue_.empty() && !done_)
ready_.wait(lock);
if (done_)
return;
t = queue_.front();
queue_.pop();
}
bool empty()const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.empty();
}
void clear() {
std::lock_guard<std::mutex> lock(mtx_);
for (int i = 0; i < queue_.size(); ++i)
queue_.pop();
}
void done() {
done_ = true;
ready_.notify_all();
}
private:
std::queue<Task> queue_;
mutable std::mutex mtx_;
std::condition_variable ready_;
std::atomic_bool done_;
};
}
简单讲解一下:
我们的线程安全队列thread_safe_queue
遵循了STL的命名规范,对于任意容器,他们的判断为空的函数签名都是bool empty()const;
,清除容器中的内容的都为void clear()
。对于推入任务的函数,我们跟std::queue
保持了一致,而弹出函数的命名我们做了一些改变,与函数的操作更加切合,表示对std::queue::pop()
的一种扩展。
这里我们采用了std::mutex
作为我们的互斥量,每次访问队列中的内容时,都用std::lock_guard
或者std::unique_lock
加锁,保证在同一时段只有一个线程可以读写队列。std::lock_guard
是一个方便的RAII的体现,他会在析构时自动调用unlock()
,std::unique_lock
与之的区别就是,你可以显式调用unlock()
函数完成解锁,它更加的灵活,当然你可以用{}
和std::lock_guard
完成相同的工作。我们将std::mutex_
设计成mutable
,是因为我们需要在const member functionempty()
中使用它。
在push
函数中,每次完成push后就调用std::conditional_variable::notify_one()
来唤醒一个处于等待状态的线程,通知他队列中已经有任务了,可以开始弹出了。在对应了wait_and_pop
函数中,同样有一个std::conditional_variable::wait()
,这个函数可以传入一个或者两个参数,第一个参数表示要释放的锁,第二个函数是一个predicate,不接受参数,返回值为bool
。他会在调用wait
时首先调用这个predicate,如果值是true
,那么就会重新获得锁并继续向下执行,如果值是false
,他会释放锁并且进入等待状态,直到接收到前面所说的唤醒通知,他才会重新检查predicate并做前述操作。这里我们没有使用predicate,仅使用了简单的版本,他会在接收到唤醒通知时直接向下执行。
再来看最后一个变量std::atomic_bool
,它是标准库提供的一个原子类型,在访问时标准保证在同一时刻只有一个线程在读写他。原子类型是不可拷贝且不可移动的,所以我们不能对他进行类内初始化,这会调用copy constructor,正确的方法是使用constructor initializer list进行初始化,当然你也可以在构造函数体内使用赋值运算符给其赋上一个值,但是请务必在这两个中间选择一个。回到代码,这个变量的作用很简单,就是让弹出的函数wait_and_pop
直接结束,我们只有在需要析构这个线程池时才会这么做,具体做法在线程池中细说。
3.完成线程池的设计
在启用线程池时,我们需要指定在线程池中存放的线程的数量,当然我们也可以在后续为线程池添加新的线程。
我们将使用STL提供的容器std::vector
来存放我们的线程,这些线程做的工作都是一致的,他们将在队列中不断地弹出任务并执行它,如果队列中没有任务了,他们就将进入等待状态。
结合上面的队列实现,给出一个简单的线程池实现:
namespace multi_thread {
template<typename Task>
class thread_safe_queue {
public:
thread_safe_queue() :done_(false) {
}
void push(const Task& t) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(t);
ready_.notify_one();
}
void wait_and_pop(Task& t) {
std::unique_lock<std::mutex> lock(mtx_);
if (queue_.empty() && !done_)
ready_.wait(lock);
if (done_)
return;
t = queue_.front();
queue_.pop();
}
bool empty()const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.empty();
}
void clear() {
std::lock_guard<std::mutex> lock(mtx_);
for (int i = 0; i < queue_.size(); ++i)
queue_.pop();
}
void done() {
done_ = true;
ready_.notify_all();
}
private:
std::queue<Task> queue_;
mutable std::mutex mtx_;
std::condition_variable ready_;
std::atomic_bool done_;
};
class thread_pool {
public:
thread_pool(const int threadNum) :done_(false) {
for (int i = 0; i < threadNum; ++i) {
threads_.emplace_back(&thread_pool::workerThread, this);
}
}
~thread_pool() {
done_.store(true);
queue_.done();
for (auto& thread : threads_) {
if (thread.joinable())
thread.join();
}
}
void submit(const std::function<void(void)>& t) {
queue_.push(t);
}
bool isEmpty()const {
return queue_.empty();
}
void clearTask() {
queue_.clear();
}
private:
void workerThread() {
while (true) {
std::function<void(void)> t;
queue_.wait_and_pop(t);
if (done_)
break;
if (t)
t();
}
}
private:
std::vector<std::thread> threads_;
thread_safe_queue<std::function<void(void)>> queue_;
std::atomic_bool done_;
};
}
队列里存放的是std::function<void(void)>
,是一个没有参数和返回值的Callable
。在C++中,常用的Callable包括lambda表达式、function object(也就是operator()
)、std::bind
和std::function
。我个人比较喜欢使用lambda和std::bind
,他们都很简单易用,在使用std::bind
时,如果你的函数是一个member function,别忘了把this
作为第一个参数。
这个线程池非常简单,当你调用构造函数时,传入一个你想要的线程数量,然后构造函数体里就会开启对应数量的线程。std::thread
接受一个Callable和它的参数作为构造函数的参数,构造函数完成后线程立即启动,这里的workerThread
是一个member function,所以,我们还得加上this
作为参数,跟std::bind
一样。
workerThread
函数里面循环地从队列中获取任务并执行,如果没有任务,他就会处于等待状态。可以看到这里同样有一个std::atomic_bool
的变量done_
,它用于使线程结束。我们会在线程池的析构函数里将这个变量设为true
,然后我们调用队列的done
函数,让队列退出。最后我们逐个检查线程池中的线程,如果他们还在运行,等待他们结束。一个std::thread
在析构前必须指定等待该线程结束还是分离该线程,否则会发生错误。如果我们不做这些工作,那么线程池在析构时要么是其中的线程一直处于等待状态,要么是一直处于循环状态,这会导致析构函数停滞不前,也就是程序的卡死。
4.总结
这次实现的线程池只能说是实现了最基础的功能,还缺少一些功能,例如任务返回值的缺失等。本文实现的线程安全队列也十分简单,还有很多可以优化的地方。
网友评论