Linux下PCI设备驱动开发详解(七)
本章及其以后的几章,我们将通过PCI Express总线实现CPU和FPGA数据通信的简单框架。
这个框架就是开源界非常有名的RIFFA(reuseable integration framework for FPGA accelerators),它是一个FPGA加速器的一种可重用性集成框架,是一个第三方开源PCIe框架。
该框架要求具备一个支持PCIe的工作站和一个带有PCIe连接器的FPGA板卡。RIFFA支持windows、linux,altera和xilinx,可以通过c/c++、python、matlab、java驱动来实现数据的发送和接收。驱动程序可以在linux或windows上运行,每一个系统最多支持5个FPGA device。
在用户端有独立的发送和接收端口,用户只需要编写几行简单代码即可实现和FPGA IP内核通信。
riffa使用直接存储器访问(DMA)传输和中断信号传输数据。这实现了PCIe链路上的高带宽,运行速率可以达到PCIe链路饱和点。
本文主要介绍消息队列,即riffa.c和riffa.h文件。
riffa是为了在内核中使用而编写的消息队列,用于同步中断和进程。
开源地址:https://github.com/KastnerRG/riffa
一、消息队列是什么?
在学习消息队列之前,我们先说一下什么是队列(queue)。队列queue可以说是一个数据结构,可以存储数据,如下图,我们从左侧插入元素(入队),从队头获取元素(出队)。
![](https://img.haomeiwen.com/i29469226/acc715c2b449eac5.png)
对于这样一个数据结构大家并不陌生,java/c++也是实现好多队列。例如,创建线程池我们需要一个阻塞队列,JDK的lock也需要队列。
了解到队列后,我们看一下什么是消息队列,消息队列就是我们常说的MQ,message queue,是作为一个单独的中间件产品存在的,独立部署。
![](https://img.haomeiwen.com/i29469226/0e949fd2a8e4dca3.png)
二、为什么要用消息队列呢?
消息队列在某些场景下发挥奇效,例如:解耦、异步、削峰。
1. 解耦
解耦都不陌生吧,就是降低耦合度,我们都知道Spring的主要目的是降低耦合,那MQ又是如何解耦的呢?
如下图所示,系统A是一个关键性的系统,产生数据后需要通知到系统B和系统C做响应的反应,三个系统都写好了,稳定运行;某一天,系统D也需要在系统A产生数据后作出反应,那就得系统A改代码,去调系统D的接口,好,改完了,上线了。假设过了某段时间,系统C因为某些原因,不需要作出反应了,不要系统A调它接口了,就让系统A把调接口的代码删了,系统A的负责人可定会很烦,改来改去,不停的改,不同的测,还得看会不会影响系统B,系统D。没办法,这种架构下,就是这样麻烦。
![](https://img.haomeiwen.com/i29469226/7f9aa9ff765d5313.png)
而且这样还没考虑异常情况,假如系统A产生了数据,本来需要实时调系统B的,结果系统B宕机了或重启了,没调成功咋办,或者调用返回失败怎么办,系统A是不是要考虑要不要重试?还要开发一套重试机制,系统A要考虑的东西也太多了吧。
那如果使用MQ会是什么样的效果呢?如下图所示,系统A产生数据之后,将该数据写到MQ中,系统A就不管了,不用关心谁消费,谁不消费,即使是再来一个系统E,或者是系统D不需要数据了,系统A也不需要做任何改变,而系统B、C、D是否消费成功,也不用系统A去关心,通过这样一种机制,系统A和其他各系统之间的强耦合是不是一下子就解除了,系统A是不是一下子清爽了很多?
![](https://img.haomeiwen.com/i29469226/aaaf4d404d5d062d.png)
2. 异步
同步/异步大家都知道吧,举个例子,你早上起床后边吃早饭边看电视(异步),而不是吃完饭再看电视(同步)。在上述例子中,没有使用MQ时,系统A要调系统B、C、D的接口,我们看一下下面的伪代码想一下是不是这样:
Data newData = produceData(); // 系统A经过一系列逻辑处理产生了数据,耗时200ms
Response respB = callsysB(newData); //系统A调用系统B接口发送数据,耗时300ms
Response respC = callsysyC(newData); //系统A调用系统C发送接口,耗时300ms
Response respD = callsysyD(newData); //系统A调用系统D发送接口,耗时300ms
这样系统A的用户做完这个操作需要等待:
200ms+300ms+300ms+300ms=1100ms=1.1s
假设使用了MQ呢?系统A就只需要把产生的数据放到MQ里就行了,就可以立马返回用户响应。伪代码如下:
//系统A的代码
Data newData = produceData(); //系统A经过一些逻辑后产生了数据,耗时200ms
writeDataToMQ(newData); //往MQ写消息,耗时50ms
这样系统A的用户做完这个操作就只需要等待200ms+50ms=250ms,给用户的感觉就是一瞬间的事儿,点一下就好了,用户体验提升了很多。
系统A把数据写到MQ里,系统B、C、D就自己去拿,慢慢消费就行了。一般就是一些时效性要求不高的操作,比如下单成功系统A调系统B发下单成功的短信,短信晚几秒发都是OK的。
三、RIFFA驱动中的消息队列
消息队列是一种线程间同步的方法,在Linux应用编程中又称为无名管道,其主体就是一个FIFO,读写跨接在两个线程间。
消息队列主要是用来同步并且传递数据,所以都有阻塞功能,但因为RIFFA中用的是自己编写的消息队列不自带阻塞,故需要配合阻塞I/O实现完整的消息队列功能。
消息队列的操作函数和数据结构在circ_queue.c与circ_queue.h中,下面看一下这两个文件。
circ_queue.h中定义了消息队列的数据结构circ_queue,里面的成员中,writeIndex和readIndex是写指针和读指针,类型是原子变量,为了解决多线程操作时的竞争问题;**vals是指针数组,指向消息队列的存储体;len是消息队列长度。
/* struct for the circular queue */
struct circ_queue {
atomic_t writeIndex;
atomic_t readIndex;
unsigned int ** vals;
unsigned int len;
};
typedef struct circ_queue circ_queue;
circ_queue.c中定义了消息队列的初始化函数init_circ_queue,如下所示:
/**
* initializes a circ_queue with depth/length len. returns non-NULL or success,
* NULL if there was a problem creating the queue.
*/
circ_queue * init_circ_queue(int len)
{
int i;
circ_queue *q;
q = kzalloc(sizeof(circ_queue), GFP_KERNEL);
if (q == NULL) {
printk(KERN_ERR "not enough memory to allocate circ_queue.");
return NULL;
}
atomic_set(&q->writeIndex, 0);
atomic_set(&q->readIndex, 0);
q->len = len;
q->vals = (unsigned int **) kzalloc(len*sizeof(unsigned int *), GFP_KERNEL);
if (q->vals == NULL) {
printk(KERN_ERR "not enough memory to allocate circ_queue array.");
return NULL;
}
for (i = 0; i < len; i++) {
q->vals[i] = (unsigned int *)kzalloc(2*sizeof(unsigned int), GFP_KERNEL);
if (q->vals[i] == NULL) {
printk(KERN_ERR "not enough memory to allocate circ_queue position.");
return NULL;
}
}
return q;
}
首先调用kzalloc创建了circ_queue结构体指针q,这个q就是描述消息队列的对象,然后把读写指针复位,接着又调用kzalloc分配了消息队列存储体的数组整体,每个元素都是一个空指针(此时消息队列还没有生成),最后为每个空指针都分配两个数据空间,此时消息队列建立完毕,q中所描述的是一个二维数组,行数2,列数len,也就是一个存储单元能存2个数。
再看一个函数queue_count_to_index,这个函数用来将读写指针变成数组的索引号:
/**
* internal function to help count. returns the queue size normalized position.
*/
unsigned int queue_count_to_index(unsigned int count, unsigned int len)
{
return (count % len);
}
可以看到这个函数对count参数做了模运算,说明是一个环形队列,消息队列的全貌如下:
![](https://img.haomeiwen.com/i29469226/717a41a2b6196ee1.png)
读写消息队列函数就没什么好说的了,就是对这个环形缓冲区进行一次读写,每个单元存两个值,成功返回0,失败返回1。值得注意的是,和FIFO一样,读数据不会让数据消失,只是读写指针自增了而已。
写消息队列函数为push_circ_queue;读消息队列函数为pop_circ_queue;
这里直接给出源代码:
int push_circ_queue(circ_queue * q, unsigned int val1, unsigned int val2)
{
unsigned int curReadIndex;
unsigned int curWriteIndex;
curWriteIndex = atomic_read(&q->writeIndex);
currReadIndex = atomic_read(&q->readIndex);
// The queue is full
if (queue_count_to_index(currWriteIndex+1, q->len) == queue_count_to_index(currReadIndex, q->len)) {
return 1;
}
// Save the data into the queue
q->vals[queue_count_to_index(currWriteIndex, q->len)][0] = val1;
q->vals[queue_count_to_index(currWriteIndex, q->len)][1] = val2;
// Increment atomically write index. Now a consumer thread can read
// the piece of data that was just stored.
atomic_inc(&q->writeIndex);
return 0;
}
int pop_circ_queue(circ_queue * q, unsigned int * val1, unsigned int * val2)
{
unsigned int currReadIndex;
unsigned int currMaxReadIndex;
do
{
currReadIndex = atomic_read(&q->readIndex);
currMaxReadIndex = atomic_read(&q->writeIndex);
// The queue is empty or a producer thread has allocate space in the queue
// but is waiting to commit the data into it
if (queue_count_to_index(currReadIndex, q->len) == queue_count_to_index(currMaxReadIndex, q->len)) {
return 1;
}
// Retrieve the data from the queue
*val1 = q->vals[queue_count_to_index(currReadIndex, q->len)][0];
*val2 = q->vals[queue_count_to_index(currReadIndex, q->len)][1];
// Try to perfrom now the CAS operation on the read index. If we succeed
// label & val already contain what q->readIndex pointed to before we
// increased it.
if (atomic_cmpxchg(&q->readIndex, currReadIndex, currReadIndex+1) == currReadIndex) {
// The lable & val were retrieved from the queue. Note that the
// data inside the label or value arrays are not deleted.
return 0;
}
// Failed to retrieve the elements off the queue. Someone else must
// have read the element stored at countToIndex(currReadIndex)
// before we could perform the CAS operation.
} while(1); // keep looping to try again!
return 1;
}
四、未完待续
Linux下PCI设备驱动开发详解(八),将详细分析一下RIFFA的用户态如何使用RIFFA框架。
网友评论