- 最初简单的做法是使用单线程while死循环接收从tcp或udp客户端发来的json数据(接收到数据——生产者)后,立即调用处理json数据的函数(消费者),直到这份数据被处理完毕,才会再一次循环接收socket数据。
int loop_flag = 1;
datatype json_data;
while(loop_flag)
{
json_data = recv_json(); //生产者,在别处定义声明
if(json_data)
{
parse_json( json_data ); //消费者,在别处定义声明
}
}
-
这样做带来的问题是,可能在处理json数据(消费数据)的过程中,又有新的socket数据发来了,但是由于程序还阻塞在同步的处理数据函数(消费者)中,还轮不到接收数据的生产者执行,因此可能会丢失数据。
-
这就是生产者和消费者耦合严重。消费者消费太慢会影响到生产。
解决办法——引入缓冲队列,合理使用生产消费模式
![](https://img.haomeiwen.com/i5822251/40e6f20beb5e9020.png)
- 可以将生产者生产的数据push推送到一个缓冲队列中,数据入队。
- 消费者使用一个单独的线程,从缓冲队列中取用数据,数据出队。
int loop_flag = 1; //全局变量
queue<datatype> q_json_datas; //生产者和消费者两线程共享的数据
mutex mtx; //互斥量,用来对多线程共享数据加锁
//生产者线程执行函数,内有while循环保持线程不结束
void Producer()
{
while(loop_flag)
{
datatype json_data;
json_data = recv_json(); //生产者,在别处定义声明
if(json_data && !队列满)
{
mtx.lock(); //线程访问共享数据要加锁
q_json_datas.push( json_data ); //生产的数据先保存到缓冲队列
mtx.unlock(); // 使用完解锁
}
}
}
//创建一个单独的消费者线程
void Consumer()
{
//也使用一个while循环检查队列中是否还有未处理的数据,阻塞线程不结束
while(loop_flag)
{
if( !q_json_datas.empty())
{
mtx.lock(); //线程访问共享数据要加锁
datatype json_data2 = q_json_datas.front(); //从队列中获取数据
q_json_datas.pop(); //将已消费的数据从队列中清除
mtx.unlock();
parse_json( json_data2 ); //消费者,在别处定义声明
}
}
}
int main()
{
thread t1( &Producer ); //创建一个子线程来作为生产者
thread t2( &Consumer ); //创建一个子线程来作为消费者
}
- 如此一来,生产者和消费者解耦了,他们不是直接联系在一起而是通过缓冲队列联系,当然在缓冲队列出队和入队时要注意对共享的缓冲队列加锁。
- 另一个问题:若是收到的某些json数据要求第一时间被处理,那就涉及到了优先级队列的问题,即这些数据收到后要放到待处理队列的头部。因此队列不能用普通的queue容器来实现,可以用priority_queue优先级队列来实现。
如此一来,就需要在json数据中有标明优先级的一个字段。但是为了是生产者线程能够立即再投入生产,应该考虑让新生产(接收到)的数据快速进入缓冲队列,即标明优先的就放到队列最前面,不考虑优先级的比较再排位置。 - 在多线程加锁处理过程中,还可以考虑结合使用信号量和互斥锁。特别是在缓冲队列只能存放一个元素这种极端的情况下,只能生产者先生产入队,然后消费者才能去消费,这种情况如果一直用while循环,会大大的浪费cpu。如果用信号量的PV操作来控制生产者和消费者的同步,那就更方便了,生产者生产后执行V操作,使信号量+1,然后消费者被唤醒去执行P操作使信号量-1。但是这种使用方式类似于条件变量了。
- 如果是C语言,将队列换成链表,能够更灵活地使用空间,每次加减任务只需要加减链表节点就行了。当然如果是C++,那还是用STL的queue比较方便。
网友评论