继续阅读《基于物理的渲染:从理论到实践》一书,遇到了多线程的代码,发现自己看不懂它的代码,于是找了本书补充了一下多线程的知识,终于理解了pbrt中的代码,在此把学到的东西整理一下,希望能对读者有所帮助。
提起多线程,总有一种熟悉的陌生感,为什么呢?因为我们可以从很多地方听到线程的概念,比如4核8线程之类的,但是真正要用的话,却总感觉无处下手,我们需要一种可以把概念转换成代码的方法。
多线程的应用非常普遍,现在你几乎找不到单线程的应用了,举个简单的例子,下图是QQ拥有的线程:
Excuse me? 为啥你要这么多线程?Anyway,从这个常用的软件就能看出来多线程地运用有多么普遍。
要使用多线程,我们必须解决下面四个问题:
- 为什么要用多线程?
- 如何进行任务分割?
- 如何共享数据?
- 如何进行线程同步?
解决这些问题的过程,也是我们使用多线程的过程。话不多说,我们开始吧。
为什么要用多线程?
通常,我们用多线程有两个原因:
- 提高性能,加快运行速度(99%的情况是如此)
- 分离关注点
第一个原因不用多说,很容易理解。如果我们有一个计算量很大的任务,我们自然希望把它拆成几个子任务,然后同时进行,充分利用计算机资源,减少总体的运行时间。
那么,什么是分离关注点呢?虽然不太容易理解,但是用的也很普遍。举个例子,一个普通的应用,需要一个UI线程和一个业务线程。UI线程将用户的动作捕捉,然后发给业务线程执行,获得业务线程的执行结果后,再反馈给用户。这里的UI线程起到的就是分离关注点的作用,它负责且仅负责与用户的交互,不负责任何具体的计算工作。
如何进行任务分割?
简单分割
最简单的情况就是没有共享数据,大家各干各的事,干完之后,整个任务也就完成了。由于不涉及数据共享,实现简单分割的方式可以是直接开n个线程,然后把它们各自需要的数据传过去,等着线程都执行完毕,收集结果。其执行的方式如下图所示:
快速排序非常适合多线程模式,其原理如下:
单线程百万个整数排序执行了3秒多的时间,不知道多线程的话能有多快。
任务管线
如果我们的任务是对不同的数据进行相同的一系列操作,那么我们就可以使用任务管线来提高执行效率。
任务管线的方法正如它的名字所“明示”的那样,将任务分成多个阶段,每个阶段使用一个线程去执行,这样任务的一个阶段执行完后,就到下一个阶段继续执行,像是流水线一样,所有的线程都有任务做,直到所有的数据都操作完毕。
管线这种设计并不只有多线程会用,很多地方都用到了这个方法,最常见的就是CPU。CPU会将一个指令分解成多个阶段,最经典的是5个阶段:
获取指令(IF)
解码指令并从寄存器获取操作数(ID/RF)
执行(EX)
读取内存(MEM)
写回寄存器(WB)
执行指令也是每一个阶段都有执行的元件,所有元件可以同时运行,提高了指令执行的效率。事实证明,这是一个非常好的策略,CPU的运行速度也大大提高,甚至有些CPU把指令分成几十个阶段以提高效率。
如何共享数据?
多线程需要考虑如何共享数据是因为线程的调度精度实在是太高了,在一个指令到另一个指令的间隔,有可能就切换成另一个线程运行了,而我们写的每一行代码都会被分解成多个指令执行,举个简单的粒子:
假如说有一个变量i,要把它自增1,我们使用代码++i;就行了。这就够了吗?远远不够。++i在执行过程中会分解成多个指令,在这些指令的间隔,另外一个线程可能就执行了,然后也是获取i的数据,对其进行修改,然后再切换回来,对i进行修改,这样另一个线程的操作完全就被覆盖了。这种情况称为竞争条件(race condition)。参考如下的代码:
#include <iostream>
#include <thread>
int32_t i = 0;
void Add100Times1()
{
for (int32_t j = 0; j < 100000; ++j)
++i;
}
void Add100Times2()
{
for (int32_t j = 0; j < 100000; ++j)
++i;
}
int main()
{
std::thread t1(Add100Times1);
std::thread t2(Add100Times2);
std::cout << "The final i is " << std::endl;
t1.join();
t2.join();
std::cout << i << std::endl;
}
上面的代码的输出结果可能是185524,166968,200000,186661等等。
这问题就非常严重了,如果我在写代码的时候都无法控制我的数据,那运行后的结果怎么可能对?好在,我们有方法可以把数据保护起来,使得当一个线程使用数据的时候,不允许其他的线程使用,这就要用到互斥体(mutex)。
互斥体的使用方式如下所示:
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex); // 1
some_list.push_back(new_value); // 2
}
第1行代码是获得一个互斥体,std::lock_guard会在初始化的时候获得互斥体,在离开作用于的时候自动释放互斥体。这样我们就不用担心会忘了释放而卡死其他线程了。
C++17中可以使用std::scoped_guard来代替std::lock_guard。并且这是其推荐的做法,而std::scope_guard会逐渐废弃。
还有一种使用互斥的方式是用std::unique_lock
。它提供了它提供了lock和unlock操作,也就是说这可以循环利用。在初始化的时候它也会获得互斥体,离开作用于的时候也会自动释放。也就是说,它比std::scoped_guard要灵活很多。
对上面的代码使用互斥体后的效果就不贴出来了,用脚指头想想也能知道结果是200000。
如何进行线程同步?
同步的意思是管理和调度线程。像是管理一个团队一样,我们必须要知道团队中的每个人在做哪些事,做到什么阶段了,需要什么资源等等。当我们有很多线程的时候,我们就必须采用某些方法来知道线程的状态,从而可以控制线程的执行。比如某一个线程需要在另一个线程执行到一定阶段之后才能开始执行,或者某一个线程执行得到某一个结果,然后另一个线程获得这个结果然后继续执行等等。
最简单也是使用地最广泛的方式是条件变量(condition variable),C++标准库(C++ 11)就有提供,std::condition_variable。它的使用方法是:
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;}); // If the condition is not satified, the mutex will be unlocked.
// after the wait, we own the lock.
std::cout << "Worker thread is processing data\n";
data += " after processing";
// Send data back to main()
processed = true;
std::cout << "Worker thread signals data processing completed \n";
// Manual unlocking is done before notifying, to avoid waking up
// the waiting thread only to block again.(see notify_one for details)
lk.unlock();
cv.notify_one();
}
int main()
{
using namespace std::chrono_literals;
std::thread worker (worker_thread);
data = "Example data";
// send data to the worker thread
std::this_thread::sleep_for(1s);
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
cv.notify_one(); // notify a thread to check its condition
// wait for the worker
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return processed;});
}
std::cout << "Back in main(), data = " << data << '\n';
worker.join();
}
condition_variable.wait()用来等待直到条件满足,返回true。然后继续执行下去。condition_variable.notify_one()表示激活一个在此条件变量上等待的线程(如果有多个线程,那么无法确定哪一个被激活),激活之后,condition_variable会执行其关联的检测函数,如果检测函数返回true,则获得锁,然后继续往下执行。除此之外,condition_variable还提供一个notify_all函数,表示激活所有在此条件变量上等待的线程,在pbrt的代码中就用到了这个函数。
线程池
这算是对多线程的一个高级应用,严格来说,可以不出现在“基础”之中。不过,pbrt中使用了线程池,所以,把线程池的概念也放到文章中来,以便对代码有更好的理解。
说起来,线程池的概念也非常容易理解。在应用启动的时候,创建n个线程,所有的线程初始化完成后就让它进入等待状态,直到有任务唤醒它为止。
当有任务时,等待的线程被唤醒,执行任务,完成之后继续进入等待状态,直到再次被唤醒。
让线程进入等待状态非常容易做到,一个条件变量就可以了,pbrt中就是这样做的。
pbrt中的并发代码
ParallelInit函数
void ParallelInit() {
CHECK_EQ(threads.size(), 0);
int nThreads = MaxThreadIndex();
ThreadIndex = 0;
// Create a barrier so that we can be sure all worker threads get past
// their call to ProfilerWorkerThreadInit() before we return from this
// function. In turn, we can be sure that the profiling system isn't
// started until after all worker threads have done that.
std::shared_ptr<Barrier> barrier = std::make_shared<Barrier>(nThreads);
// Launch one fewer worker thread than the total number we want doing
// work, since the main thread helps out, too.
for (int i = 0; i < nThreads - 1; ++i)
threads.push_back(std::thread(workerThreadFunc, i + 1, barrier));
barrier->Wait();
}
代码很简单,就是创建了多个线程,把这些线程放容器中保存。比较难理解的是barrier对象,它也是一种同步机制,是pbrt中自定义的一个结构,作用是让所有的线程都执行到一定程度后,ParallelInit函数才继续执行下去,这就是barrier->Wait()的作用。
std::thread(workerThreadFunc, i + 1, barrier)
表示创建一个新的线程,线程的入口函数是workerThreadFunc,参数是i+1和barrier。每一个线程都需要一个入口函数,主线程也一样,所以我们才有int main()。注意,线程创建后会立刻执行入口函数,不会等到所有线程创建好了,运行到barrier->Wait()才开始执行。
接着来看看workerThreadFunc函数:
static void workerThreadFunc(int tIndex, std::shared_ptr<Barrier> barrier) {
LOG(INFO) << "Started execution in worker thread " << tIndex;
ThreadIndex = tIndex;
// Give the profiler a chance to do per-thread initialization for
// the worker thread before the profiling system actually stops running.
ProfilerWorkerThreadInit();
// The main thread sets up a barrier so that it can be sure that all
// workers have called ProfilerWorkerThreadInit() before it continues
// (and actually starts the profiling system).
barrier->Wait();
// Release our reference to the Barrier so that it's freed once all of
// the threads have cleared it.
barrier.reset();
std::unique_lock<std::mutex> lock(workListMutex);
while (!shutdownThreads) {
if (reportWorkerStats) {
ReportThreadStats();
if (--reporterCount == 0)
// Once all worker threads have merged their stats, wake up
// the main thread.
reportDoneCondition.notify_one();
// Now sleep again.
workListCondition.wait(lock);
} else if (!workList) {
// Sleep until there are more tasks to run
workListCondition.wait(lock);
} else {
// Get work from _workList_ and run loop iterations
ParallelForLoop &loop = *workList;
// Run a chunk of loop iterations for _loop_
// Find the set of loop iterations to run next
int64_t indexStart = loop.nextIndex;
int64_t indexEnd =
std::min(indexStart + loop.chunkSize, loop.maxIndex);
// Update _loop_ to reflect iterations this thread will run
loop.nextIndex = indexEnd;
if (loop.nextIndex == loop.maxIndex) workList = loop.next;
loop.activeWorkers++;
// Run loop indices in _[indexStart, indexEnd)_
lock.unlock();
for (int64_t index = indexStart; index < indexEnd; ++index) {
uint64_t oldState = ProfilerState;
ProfilerState = loop.profilerState;
if (loop.func1D) {
loop.func1D(index);
}
// Handle other types of loops
else {
CHECK(loop.func2D);
loop.func2D(Point2i(index % loop.nX, index / loop.nX));
}
ProfilerState = oldState;
}
lock.lock();
// Update _loop_ to reflect completion of iterations
loop.activeWorkers--;
if (loop.Finished()) workListCondition.notify_all();
}
}
LOG(INFO) << "Exiting worker thread " << tIndex;
}
我们最关心的是std::unique_lock<std::mutex> lock(workListMutex);
这一行之后的代码。进入循环中,如果工作列表中没有任务了,那么就在条件变量上等待,这个功能是由这两行代码实现的:
} else if (!workList) {
// Sleep until there are more tasks to run
workListCondition.wait(lock);
如果还有任务,就从工作列表中取一个任务出来。要注意的是,取任务这个操作是被互斥体包围的,这点在上面的代码中就可以看到。取完之后,真正执行任务的时候,互斥体就被释放了(lock.unlock();),然后执行任务。在任务执行的过程中,其他线程可以从工作列表中获取任务执行,这是我们使用多线程的目的。完成任务后,继续获得互斥体(lock.lock();)继续循环看看是否还有任务。
这些操作与我们之前学到的线程池是一致的,说明pbrt中实现了线程池。
ParallelFor函数
void ParallelFor(std::function<void(int64_t)> func, int64_t count,
int chunkSize) {
CHECK(threads.size() > 0 || MaxThreadIndex() == 1);
// Run iterations immediately if not using threads or if _count_ is small
if (threads.empty() || count < chunkSize) {
for (int64_t i = 0; i < count; ++i) func(i);
return;
}
// Create and enqueue _ParallelForLoop_ for this loop
ParallelForLoop loop(std::move(func), count, chunkSize,
CurrentProfilerState());
workListMutex.lock();
loop.next = workList;
workList = &loop;
workListMutex.unlock();
// Notify worker threads of work to be done
std::unique_lock<std::mutex> lock(workListMutex);
workListCondition.notify_all();
// Help out with parallel loop iterations in the current thread
while (!loop.Finished()) {
// Run a chunk of loop iterations for _loop_
// Find the set of loop iterations to run next
int64_t indexStart = loop.nextIndex;
int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex);
// Update _loop_ to reflect iterations this thread will run
loop.nextIndex = indexEnd;
if (loop.nextIndex == loop.maxIndex) workList = loop.next;
loop.activeWorkers++;
// Run loop indices in _[indexStart, indexEnd)_
lock.unlock();
for (int64_t index = indexStart; index < indexEnd; ++index) {
uint64_t oldState = ProfilerState;
ProfilerState = loop.profilerState;
if (loop.func1D) {
loop.func1D(index);
}
// Handle other types of loops
else {
CHECK(loop.func2D);
loop.func2D(Point2i(index % loop.nX, index / loop.nX));
}
ProfilerState = oldState;
}
lock.lock();
// Update _loop_ to reflect completion of iterations
loop.activeWorkers--;
}
}
ParallelFor函数主要做两件事情:1、把任务放到工作列表中去。2、和线程池中的线程一起完成任务。第1件事容易理解,第2件事为啥要做呢?
因为调用ParallelFor的线程也是资源啊,不能让他闲着,和线程池中的线程一起工作,这样也能加快速度。
而且,执行代码与线程池中的线程有区别,就是它不需要去等待条件变量。它是被主线程调用的,如果任务完成,它还需要继续往下执行,所以直接检测任务是否执行完毕就行了。当然,获取任务的时候也需要互斥体保护。
下面来看使用ParallelFor的代码:
// Compute Morton indices of primitives
std::vector<MortonPrimitive> mortonPrims(primitiveInfo.size());
ParallelFor([&](int i) {
// Initialize _mortonPrims[i]_ for _i_th primitive
PBRT_CONSTEXPR int mortonBits = 10;
PBRT_CONSTEXPR int mortonScale = 1 << mortonBits;
mortonPrims[i].primitiveIndex = primitiveInfo[i].primitiveNumber;
Vector3f centroidOffset = bounds.Offset(primitiveInfo[i].centroid);
mortonPrims[i].mortonCode = EncodeMorton3(centroidOffset * mortonScale);
}, primitiveInfo.size(), 512);
这段代码的作用是将所有的primitive转换成mortonPrims。就是将场景中的所有物体的包围盒的中心坐标,用Morton Code表示。这任务非常简单,不涉及到数据共享,所以可以同时执行转换操作,这也就是为什么我们在线程中执行这个函数的时候,不用获得互斥体的原因。
好了,就到这里,洗洗睡了:)
参考资料
C++ Concurrency in Action 2nd edition
C++并发编程第2版中文版:我同学翻译的,质量不错
pbrt源码第3版
网友评论