Parallel2.h
#pragma once
#include <memory>
#include <functional>
#include <vector>
#include <thread>
using namespace std;
/**
 * Pool of worker threads that applies a callback to every element of a
 * contiguous array. The implementation is hidden behind a pimpl (impl_t).
 *
 * The public functions are documented as "serial only": do not call them
 * concurrently from several threads on the same Parallel2 object.
 */
class Parallel2
{
    struct impl_t;
    std::unique_ptr<impl_t> pImpl;
public:
    /**
     * Creates the executor and starts its worker threads. Serial use only.
     *
     * @param _onCreate  Called on each worker thread when that thread starts;
     *                   the argument is the worker's zero-based index (not the
     *                   number of threads). Typically used to set up a
     *                   per-thread context so the processing callback needs
     *                   no locking.
     * @param _onDestroy Destroy callback, intended to run on each worker
     *                   thread as it shuts down; receives the worker index.
     * @param _fUsage    Fraction of the hardware threads to use, in [0,1].
     *                   The resulting worker count is kept within
     *                   [1, hardware thread count].
     *
     * This overload has no default for _fUsage: with defaults on both
     * overloads a two-argument call was ambiguous and could not compile.
     * A two-argument call now selects the int overload below.
     */
    Parallel2(
        std::function<void(size_t)> _onCreate,
        std::function<void(size_t)> _onDestroy,
        float _fUsage
    );
    /**
     * Same as above, but with an explicit worker count.
     *
     * @param _nThreadCount Number of worker threads; values below 1 select
     *                      the hardware thread count.
     */
    Parallel2(
        std::function<void(size_t)> _onCreate,
        std::function<void(size_t)> _onDestroy,
        int _nThreadCount = 0
    );
    /**
     * Runs _fnCallBack over an array of raw objects and blocks until every
     * worker has finished. Serial use only.
     *
     * @param _fnCallBack          Processing callback: (pointer to one object,
     *                             index of the executing worker - usable to
     *                             index a per-thread context).
     * @param _pOBjectArrayPointer Pointer to the first object.
     * @param _nDataSizeInByte     Size of one object in bytes (array stride).
     * @param _nCountOfObjects     Number of objects in the array.
     * @param _bRunTimeRace        true: workers grab the next object at run
     *                             time, which suits heavy work with an
     *                             unpredictable load. false: the array is
     *                             split up front and each worker gets one
     *                             contiguous slice, which suits large numbers
     *                             of trivial operations. In the worst case
     *                             (count = workers * 2 - 1) one worker ends up
     *                             with almost half of the objects.
     *
     * Nothing is executed when the callback is empty, the pointer is null,
     * or the size or count is zero.
     */
    void ForEachRawMemory(
        std::function<void(uint8_t*, size_t)> _fnCallBack,
        uint8_t* _pOBjectArrayPointer,
        size_t _nDataSizeInByte,
        size_t _nCountOfObjects,
        bool _bRunTimeRace
    ) const;
    /**
     * Typed wrapper around ForEachRawMemory for a pointer + count pair.
     */
    template<typename T> void ForEach(
        std::function<void(T&, size_t)> _fnCallBack,
        T* _pData,
        size_t _nCount,
        bool _bRunTimeRace = true
    ) const {
        // C-style casts are kept on purpose so that cv-qualified T keeps compiling.
        ForEachRawMemory(
            [_fnCallBack](uint8_t* p, size_t c) { _fnCallBack(*((T*)p), c); },
            (uint8_t*)_pData, sizeof(T), _nCount, _bRunTimeRace);
    }
    /**
     * Typed wrapper around ForEachRawMemory for a vector. An empty vector is
     * a no-op. Note that the vector is taken by const reference but the
     * callback still receives a mutable T& (constness is cast away).
     */
    template<typename T> void ForEach(
        std::function<void(T&, size_t)> _fnCallBack,
        const std::vector<T>& _DataVec,
        bool _bRunTimeRace = true
    ) const {
        if (_DataVec.empty()) {
            return;
        }
        ForEachRawMemory(
            [_fnCallBack](uint8_t* p, size_t c) { _fnCallBack(*((T*)p), c); },
            (uint8_t*)(_DataVec.data()), sizeof(T), _DataVec.size(), _bRunTimeRace);
    }
    /**
     * Destroys the executor and joins the worker threads. Serial use only.
     */
    ~Parallel2();
    /** Installs a one-shot hook for worker c; see the definition for details. */
    void SetBeforeProcess(size_t c, std::function<void(size_t)>&& func);
    /** Number of worker threads owned by this executor. */
    size_t GetThreadSize();
    /** std::thread id of worker i. */
    std::thread::id GetThreadId(size_t i);
};
Parallel2.cpp
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <assert.h>
using namespace std;
#include "Parallel2.h"
/**
 * Private implementation of Parallel2.
 *
 * Owns the worker threads and the state describing the job in flight.
 * Each worker sleeps on its evToDo event; DoOnMemory() publishes a job,
 * signals every worker and then waits on every worker's evDone event.
 */
struct Parallel2::impl_t
{
    struct worker_t;
    std::vector<std::unique_ptr<worker_t>> Workers;
    std::atomic_bool bExit;                        // set to request worker shutdown
    std::function<void(uint8_t*, size_t)> pFunc;   // callback of the current job
    std::function<void(size_t)> onDestroy;
    std::function<void(size_t)> onCreate;

    // Runtime-race mode: all workers pull the next object index from one
    // shared atomic counter.
    struct all_range_t {
        uint8_t* pBegin = nullptr;
        size_t nStrid = 0;
        size_t nCount = 0;
        std::atomic<size_t> nIndex{0};

        void Set(uint8_t* _pDataArray, size_t _nObjectSize, size_t _nCount) {
            pBegin = _pDataArray;
            nStrid = _nObjectSize;
            nCount = _nCount;
            nIndex.store(0);
        }
        // Claims the next unprocessed object. Returns false (and p == nullptr)
        // once every index has been handed out.
        bool Next(uint8_t*& p) {
            const size_t id = nIndex++;
            if (id < nCount) {
                p = pBegin + id * nStrid;
                return true;
            }
            p = nullptr;
            return false;
        }
    };
    all_range_t AllRange;

    // Pre-assigned mode: each worker owns one contiguous slice [pBegin, pEnd).
    struct sub_range_t {
        uint8_t* pBegin = nullptr;
        uint8_t* pEnd = nullptr;
        size_t nStrid = 0;

        // Computes the slice of worker _nIndex out of _nWorker workers.
        void Set(uint8_t* _pDataArray, size_t _nObjectSize, size_t _nCount, size_t _nWorker, size_t _nIndex) {
            assert(_nCount);
            nStrid = _nObjectSize;
            if (_nCount <= _nWorker) {
                // Not more objects than workers: one object each, the
                // remaining workers get an empty slice.
                if (_nIndex < _nCount) {
                    pBegin = _pDataArray + nStrid * _nIndex;
                    pEnd = pBegin + nStrid;
                }
                else {
                    pBegin = nullptr;
                    pEnd = nullptr;
                }
                return;
            }
            const size_t nPerWorker = _nCount / _nWorker;
            const size_t nRemain = _nCount % _nWorker;
            pBegin = _pDataArray + nStrid * nPerWorker * _nIndex;
            // The last worker additionally takes the remainder.
            const bool bLast = (_nIndex == _nWorker - 1);
            pEnd = pBegin + nStrid * (bLast ? nPerWorker + nRemain : nPerWorker);
        }
        // Advances p through the slice; pass p == nullptr to start.
        bool Next(uint8_t*& p) {
            if (pBegin == nullptr) {
                return false;
            }
            if (p == nullptr) {
                p = pBegin;
                return true;
            }
            p = p + nStrid;
            return p < pEnd;
        }
    };

    bool bRunTimeRace;   // mode of the current job

    struct worker_t {
        // Emulates the behaviour of a Win32 auto-reset Event.
        struct event_t {
            bool flag;
            std::condition_variable cv;
            std::mutex mtx;

            event_t(bool _init) : flag(_init) {}

            // Blocks until the event is set, then consumes the signal.
            void Wait() {
                std::unique_lock<std::mutex> lock(mtx);
                while (!flag)
                    cv.wait(lock);
                flag = false;
            }
            void Set() {
                std::unique_lock<std::mutex> lock(mtx);
                flag = true;
                cv.notify_all();
            }
            void Reset() {
                std::unique_lock<std::mutex> lock(mtx);
                flag = false;
            }
        };

        size_t nIndex;
        std::unique_ptr<std::thread> pThread;
        impl_t* pHost;
        sub_range_t Range;
        event_t evToDo;
        event_t evDone;
        std::function<void(size_t)> fnBeforeProcess;

        // Scope guard around one job: waits for work on construction and
        // always signals completion on destruction, so an exception escaping
        // the user callback cannot leave DoOnMemory() waiting forever.
        struct guard_t {
            worker_t* p;
            guard_t(worker_t* _p) : p(_p) {
                p->evToDo.Wait();
                p->evDone.Reset();
            }
            ~guard_t() {
                p->evToDo.Reset();
                p->evDone.Set();
            }
        };

        void Go() {
            evToDo.Set();
        }

        // The thread is started last, after every member it touches exists.
        worker_t(impl_t* _pHost, size_t _nIdx)
            : nIndex(_nIdx), pHost(_pHost), evToDo(false), evDone(false) {
            pThread.reset(new std::thread([this] { Loop(); }));
        }

        ~worker_t() {
            pHost->bExit = true;
            evToDo.Set();      // wake the worker so it can observe bExit
            pThread->join();
        }

        // Runs the user callback on one object. Outside _DEBUG builds any
        // exception is caught (and reported through assert) so that it does
        // not terminate the worker thread.
        void Invoke(uint8_t* p) {
#ifdef _DEBUG
            pHost->pFunc(p, nIndex);
#else
            try {
                pHost->pFunc(p, nIndex);
            }
            catch (...) {
                assert(false);
            }
#endif // DEBUG
        }

        // Thread entry point.
        void Loop() {
            if (pHost->onCreate) {
                pHost->onCreate(nIndex);
            }
            while (!pHost->bExit)
            {
                guard_t guard(this);
                if (pHost->bExit) {
                    // Woken for shutdown. Leave the loop with break (not
                    // return) so that onDestroy below still runs.
                    break;
                }
                if (fnBeforeProcess)
                {
                    // One-shot hook: run once, then clear.
                    fnBeforeProcess(nIndex);
                    fnBeforeProcess = nullptr;
                }
                uint8_t* p = nullptr;
                if (pHost->bRunTimeRace) {
                    while (pHost->AllRange.Next(p)) {
                        Invoke(p);
                    }
                }
                else {
                    while (Range.Next(p)) {
                        Invoke(p);
                    }
                }
            }
            if (pHost->onDestroy) {
                pHost->onDestroy(nIndex);
            }
        }

        void Wait() {
            evDone.Wait();
        }
    };

    // hardware_concurrency() may return 0 when the value is not computable;
    // treat that as a single hardware thread so at least one worker exists.
    static int HardwareThreads() {
        const int nCore = (int)std::thread::hardware_concurrency();
        return nCore < 1 ? 1 : nCore;
    }

    // Shared constructor tail: stores the callbacks and starts nWork workers.
    // The callbacks and flags are set before any worker thread is created
    // because the workers read them immediately.
    void Start(std::function<void(size_t)> _onCreate, std::function<void(size_t)> _onDestroy, int nWork) {
        bExit = false;
        bRunTimeRace = true;
        onDestroy = _onDestroy;
        onCreate = _onCreate;
        const size_t nCount = (size_t)nWork;
        Workers.resize(nCount);
        for (size_t i = 0; i < nCount; ++i) {
            Workers[i].reset(new worker_t(this, i));
        }
    }

    // Worker count = _fUsage * hardware threads, kept within [1, hardware threads].
    impl_t(std::function<void(size_t)> _onCreate, std::function<void(size_t)> _onDestroy, float _fUsage) {
        const int nCore = HardwareThreads();
        // Clamp to [0,1] first; the negated comparison also maps NaN to 0 so
        // the float -> int conversion below is always well defined.
        float fUsage = _fUsage;
        if (!(fUsage > 0.0f)) {
            fUsage = 0.0f;
        }
        else if (fUsage > 1.0f) {
            fUsage = 1.0f;
        }
        int nWork = int(fUsage * nCore);
        if (nWork < 1) {
            nWork = 1;
        }
        if (nWork > nCore) {
            nWork = nCore;
        }
        Start(_onCreate, _onDestroy, nWork);
    }

    // Explicit worker count; values below 1 select the hardware thread count.
    impl_t(std::function<void(size_t)> _onCreate, std::function<void(size_t)> _onDestroy, int _nThreadCount) {
        int nWork = _nThreadCount;
        if (nWork < 1) {
            nWork = HardwareThreads();
        }
        Start(_onCreate, _onDestroy, nWork);
    }

    ~impl_t()
    {
        bExit = true;
        Workers.clear();   // each worker_t destructor wakes and joins its thread
    }

    // Publishes one job to all workers and blocks until all of them are done.
    // Returns without doing anything for an empty callback, a null pointer,
    // or a zero size/count.
    void DoOnMemory(std::function<void(uint8_t*, size_t)> _fnCallBack,
        uint8_t* _pOBjectArrayPointer,
        size_t _nDataSizeInByte,
        size_t _nCountOfObjects,
        bool _bRunTimeAssign)
    {
        if (_fnCallBack == nullptr ||
            _pOBjectArrayPointer == nullptr ||
            _nDataSizeInByte == 0 ||
            _nCountOfObjects == 0) {
            return;
        }
        bRunTimeRace = _bRunTimeAssign;
        pFunc = _fnCallBack;
        const size_t nWorker = Workers.size();
        if (bRunTimeRace) {
            AllRange.Set(_pOBjectArrayPointer, _nDataSizeInByte, _nCountOfObjects);
        }
        else {
            for (size_t i = 0; i < nWorker; ++i) {
                Workers[i]->Range.Set(_pOBjectArrayPointer, _nDataSizeInByte, _nCountOfObjects, nWorker, i);
            }
        }
        for (size_t i = 0; i < nWorker; ++i) {
            Workers[i]->Go();
        }
        for (size_t i = 0; i < nWorker; ++i) {
            Workers[i]->Wait();
        }
    }
};
void Parallel2::ForEachRawMemory(
function<void(uint8_t*, size_t)> _fnCallBack,
uint8_t* _pOBjectArrayPointer,
size_t _nDataSizeInByte,
size_t _nCountOfObjects,
bool _bRunTimeAssign ) const {
pImpl->DoOnMemory(_fnCallBack, _pOBjectArrayPointer, _nDataSizeInByte, _nCountOfObjects, _bRunTimeAssign);
}
// Installs a hook for worker c. The worker calls it with its own index at the
// start of its next job and then clears it, so it runs at most once per call.
// Call this only while no ForEach is running: the worker reads and clears the
// hook without additional locking.
// Throws std::out_of_range when c is not a valid worker index (previously an
// invalid index was undefined behaviour).
void Parallel2::SetBeforeProcess(size_t c, std::function<void(size_t)>&& func)
{
    pImpl->Workers.at(c)->fnBeforeProcess = std::move(func);
}
// Reports how many worker threads this executor owns.
size_t Parallel2::GetThreadSize()
{
    const auto& workers = pImpl->Workers;
    return workers.size();
}
// Returns the std::thread id of worker i.
// Throws std::out_of_range when i is not a valid worker index (previously an
// invalid index was undefined behaviour).
std::thread::id Parallel2::GetThreadId(size_t i)
{
    return pImpl->Workers.at(i)->pThread->get_id();
}
// Usage-fraction constructor: builds the implementation object, which sizes
// the pool from _fUsage and starts the worker threads.
Parallel2::Parallel2(
    std::function<void(size_t)> _onCreate,
    std::function<void(size_t)> _onDestroy,
    float _fUsage)
    : pImpl(new impl_t(_onCreate, _onDestroy, _fUsage))
{
}
// Explicit-count constructor: builds the implementation object, which starts
// _nThreadCount worker threads.
Parallel2::Parallel2(
    std::function<void(size_t)> _onCreate,
    std::function<void(size_t)> _onDestroy,
    int _nThreadCount)
    : pImpl(new impl_t(_onCreate, _onDestroy, _nThreadCount))
{
}
// Defined in the .cpp, where impl_t is a complete type, so that the
// unique_ptr member can destroy the implementation object.
Parallel2::~Parallel2() = default;
Parallel2.testcase.h
//---------------------------------------------------------------------------------------
// 测试代码
// 做简单并发性能测试,这个例子中由于计算太简单,且多线程会引起缓存命中问题,会更慢
// 尤其运行时竞争策略时,还会有额外的原子操作开销
// 这里只做正确性测试
#pragma once
#include "Parallel2.h"
#include <vector>
#include <sstream>
#include <atomic>
#include <thread>
#include <windows.h>
// Correctness demo for Parallel2: ticks a vector of players once in each
// scheduling mode and checks that every player was processed exactly once.
// All functions are inline because this file is a header; non-inline
// definitions would clash as soon as two translation units include it.
namespace GxParallel_TestCase {
    // Worker start callback. _n is the value Parallel2 passes to its create
    // callback; the message text labels it "count".
    inline void onCreate(size_t _n) {
        printf("create threads (count = %d) context.\r\n", (int)_n);
    }

    // Worker shutdown callback; the argument is not used.
    inline void onDestroy(size_t /*_n*/) {
        printf("destroy threads context.\r\n");
    }

    struct player_t {
        std::atomic<int> n;   // number of Tick() calls since the last Test()
        int a, b, c;          // random inputs
        int64_t r;            // result written by Tick()

        player_t() {
            a = rand();
            b = rand();
            c = rand();
            n = 0;
        }

        // Reference value a + b*c. The product is computed in 64 bits so it
        // cannot overflow int on platforms with a large RAND_MAX.
        int64_t Expected() const {
            return a + static_cast<int64_t>(b) * c;
        }

        // The work item executed on a worker thread.
        void Tick() {
            r = Expected();
            ++n;
        }

        // Verifies the result and that Tick() ran exactly once, then re-arms
        // the counter for the next round.
        void Test() {
            if (r != Expected()) {
                printf("error!\r\n");
            }
            if (n != 1) {
                printf("error!\r\n");
            }
            n = 0;
        }
    };

    // ForEach callback: ticks the player and traces which worker index and
    // thread id handled it to the debugger output.
    inline void TickPlayer(player_t& p, size_t idx) {
        p.Tick();
        const std::thread::id this_id = std::this_thread::get_id();
        std::stringstream ss;
        ss << "idx:" << idx << " " << this_id << "\r\n";
        OutputDebugStringA(ss.str().c_str());
    }

    inline void test() {
        static const size_t N = 40;
        std::vector<player_t> vecPlayer(N);
        {
            // Scoped so the pool is destroyed before the final prompt.
            Parallel2 ThreadPool(onCreate, onDestroy, 4);

            OutputDebugStringA("---------------------------Runtime Race---------------------------\r\n");
            for (int k = 0; k < 1; ++k)
            {
                ThreadPool.ForEach<player_t>(TickPlayer, vecPlayer, true);
                for (auto&& p : vecPlayer) {
                    p.Test();
                }
            }

            OutputDebugStringA("---------------------------Pre Assign---------------------------\r\n");
            for (int k = 0; k < 1; ++k)
            {
                ThreadPool.ForEach<player_t>(TickPlayer, vecPlayer, false);
                for (auto&& p : vecPlayer) {
                    p.Test();
                }
            }
        }
        printf("any key to exit.\r\n");
        getchar();
    }
}
main.cpp
#include "stdafx.h"
#include "Parallel2.testcase.h"
int main()
{
GxParallel_TestCase::test();
return 0;
}
创建并行执行器的时候会根据参数/硬件配置,创建相应的线程数量,有2种模式,1:运行时抢夺(适合过程计算复杂) 2:预分配(适合过程计算简单)
运行时抢夺大致运行流程说明
每个线程负责一个任务,如果当前线程任务完成就继续从总任务中获取一个进行操作直到所有任务完成
预分配大致运行流程说明
预分配模式会根据线程数量,把所有任务按连续区间平均分配给每个线程(无法整除时,余下的任务由最后一个线程处理),每个线程操作的数据完全独立。
网友评论