流水线设计实践
理论: 将一个产品的加个过程分解为n 个步骤, 如果产品够多, 且机器支持n个并行,则理论上可以达到原来n 倍效率
设 将一个单位A 加个成B 的time 为 t , 则单并行加工m个 所需时间为 m * t,
而n并行的所需时间为 : t + (m - 1) * (t/n) , 单个产品的所需时间为 : (t + (m - 1) * (t/n))/ m;
注: 第一个t 表示 生产第一个产品所需的时间 , 之后每隔 t/n 时间产生一个新产品。 因此可得出结论 ,当m 足够大时, 所需时间为原来的1/n , 当m很小时 则t不可忽略。 因此该设计方法适合大批量的加工 ,不适合很少产品的加工。
开发过程
待加工产品: [Product_A] map [Product_B]
Product_A = Int
Product_B = Int
串行 map
map = Product_A.forEach.add {1...10000} // 累加 从 +1 加到 + 10000
并发 map
// 即每个线程计算量为单线程的 1/ n
map = Product_A.forEach.add {10000 / n * i + 1... 10000 / n * (i + 1) }
数据结构
采用链表构建队列 (先进先出, 链表add last,remove first 的算法复杂度为O(1)有效避免子线程操作的时候造成额外的性能损害)
锁
- 每个thread拥有两把锁, waitProcessLock and processedLock
- currentThread.processedArray = nextThreadWaitProcessArray
流程大致如下:
while(1) {
waitProcessLock lock;
while(waitProcessArray empty) {
waitProcessLock wait;
}
waitProcessElement = waitProcessArray.pop
waitProcessLock unlock;
waitProcessElement processed;
processedLock lock;
ProcessedArray add processedElement;
processedLock signal;
processedLock unlock;
}
测试程序
assert([Product_B].forEach == 1...10000 + i)
i为 [Product_A]的初始值, 从 1 递增。
这样能保证最后的结果 不仅正确而且有序。
经过一番调试程序能够运行了,但是效果并不理想:
- 多线程所占cpu 大约为 350% , 单线程 100%
- 多线程性能 只为 单线程2倍。 (我开了8个子线程)
尝试了很多种方法都无法优化性能:
-
做无锁操作
(1) 因为 currentThread.processedArray = nextThreadWaitProcessArray 的缘故 , 事实上下两个thread share a public lock, 这样造成 链表要么做add opreate, 要么做 remove opreate. 而事实上一个在队列头一个在队列尾,只要二者不是操作同一个对象,就可以允许删除的时候,做添加操作
-
做缓存操作
(1) 只有在处理的数据大于1000的时候,才发出signal 信号
(2) 启动一个定时器 每隔一段时间发出 signal 信号, 因为如果只有第一步的话, 永远不会处理数据量小于1000的情况
上述两种操作仍然无法优化性能,直到后来使用instrument 查看CPU情况,才发现只有2个核,我一直以为我的电脑是4核,拿着同事的4核电脑测试了一下,性能果然是4倍.也就是说性能无法提升了,达到理论值了。
然后我又想为什么 在不允许同时进行 add 和 remove的情况下,为什么没有影响性能呢?
想了下得到了答案。 每一把锁住的区域就做临界区。非临界区允许并发,临界区只允许串行。 但是只要非临界区所需耗时够大,发生临界区的竞争几率就变小了。
简单的说 如果map 操作的耗时,远大于临界区的耗时,那么就不会发生锁竞争。
为了验证这个,我调整了map操作的耗时,从+{1...10000} 变成 +{1...100} 结果符合预期,由于map 耗时减少, 造成锁竞争几率变大,结果导致多线程性能大大降低。
第二个问题,为什么减少 signal 信号,仍无法提高性能呢? 那是因为发出 signal 信号的时候, wait thread 并不会马上被唤醒启动,这样的话 并不会因为多次发出 signal 造成性能损耗,同时 发出signal 同样不会阻塞current thread.因此上述操作纯属多余。
![](https://img.haomeiwen.com/i2031664/445074a85ab37222.png)
好了 ,我现在来解决 +{1...100}, 锁竞争的问题, 首先我去掉了 所有的 condition lock , 现在只发condition sign,但是并不锁住condition mutex.
这时一个队列就可以同时进行增加和删除操作 现在有个问题需要解决,在队列只有一个元素的时候,同时进行增加和删除操作是会出问题的,所以我尝试了一下形式加锁:
add and remove operate share a public lock
// add operate method body
if (_head == NULL ) {
...
} else if(_head == _tail) {
[_lock lock];
_tail->next = element;
_tail = element;
[_lock unlock];
}
代码很快就crash掉了, 我分析了一下 在判断_head == _tail
为真的时候,如果此时线程进入阻塞状态, 而另外一个线程进行删除操作,等切换回该线程的时候_tail已经是空了,这时在对其进行操作引发crash,那么如何改呢? 比较简单的是扩大临界区,扩大到整个函数体,但是这并非我想要的,我想要的是仅当队列只有一个元素才加锁,将整个函数体全锁起来意味着add和remove无法并发了,那么还是无法解决锁竞争的问题。
于是我决定使用另外一个方案:
使用2把锁
一把锁住add 操作,一把锁住remove 操作,这样 add 和remove互不影响可以并发。
但是怎么知道当队列只有一个元素的时候,有两个线程对其同时操作呢? 我采用try lock, 并且试图使用锁会退避免思索代码如下:
// add operate method body
// func name addElementToLast:
if (_head == _tail) {
if ([_popElementLock tryLock]) {
_tail->next = element;
_tail = element;
[_popElementLock unlock];
} else {
[_addElementLock unlock];
[self addElementToLast:element];
return;
}
}
很快我的代码又crash掉了,这次是栈溢出,或许我还不太理解真正的锁回退。由于递归调用addElementToLast,我的栈空间被爆了。于是我又换了种写法
if (_head == _tail) {
if ([_popElementLock tryLock]) {
_tail->next = element;
_tail = element;
[_popElementLock unlock];
} else {
[_addElementLock unlock];
// 由于此时 _popElementLock 已经被其他线程持有
// 此时执行lock 一定会被堵塞
[_popElementLock lock];
[_popElementLock unlock];
[self addElementToLast:element];
return;
}
}
这次我学乖了, 我在 tryLock 失败后, 立马释放掉自己持有的锁, 然后再去锁另外一个线程持有的锁。 这个时候该线程就被阻塞掉了。好的这下就解决了锁的问题。
最后一个问题,我的电脑是双核四线程的,在占有350%cpu 的情况下,为何只提高了2倍性能呢,按我的理解如果因为锁的问题导致线程竞争的话,那么线程在阻塞的时候不会消耗CPU, 也就是说锁不会导致cpu空转效率低下这种状况的出现, 多余的性能都消耗在哪里呢, 于是我开始测试单线程的耗时 ,比较发现标准线程的耗时比单线程的耗时要少很多, 于是我开始寻找为什么二者耗时不相等的原因。
![](https://img.haomeiwen.com/i2031664/2d666575e21a2156.png)
红框是循环体耗时函数,两者差距非常大, 于是我仔细了阅读两个循环函数体,发现其中一个使用了常量宏
, 通过调整 最终使两者的耗时充分接近,但是单线程的耗时 还是比标准线程耗时要多一些, 最终得到了接近3倍的提升,效果比较满意了。
网友评论