简述
linux内核的workqueue引入背景、使用方法网上很多,在内核源码的documentation中也有对workqueue的说明,路径为Documentation/core-api/workqueue.rst
。
下面两个图是创建workqueue的原理图,来自这里(如有侵权,请留言,多谢)


openharmony的HDF框架中也引入了workqueue的机制以及统一的接口供驱动层使用,但是对于底层系统liteos-a与linux做了适配,系统层实现还是有很多不一样的地方,本文详细说明一下liteos-a系统中workqueue的使用方法以及详细的流程分析
workqueue使用方法
- 使用方法
openharmony系统对workqueue提供了几个接口,以方便用户的使用
- 先调用
HdfWorkQueueInit
,传入queue名称,创建并初始化一个workqueue - 再调用
HdfWorkInit
,我理解应该是一个work的模板,里主要是记录这个queue里面的work需要调用的func以及para信息,类似于linux的work_struct - 通过调用
HdfAddWork
往workqueue中添加work,就会自动调用work_struct中记录的func
- 以Gryo为例,说明下这种用法
- 在
Gryo
驱动中可以看到在初始化时对workqueue的初始化,代码如下
static int32_t InitGyroData(void)
{
struct GyroDrvData *drvData = GyroGetDrvData();
int32_t ret;
CHECK_NULL_PTR_RETURN_VALUE(drvData, HDF_ERR_INVALID_PARAM);
if (drvData->initStatus) {
return HDF_SUCCESS;
}
if (HdfWorkQueueInit(&drvData->gyroWorkQueue, HDF_GYRO_WORK_QUEUE_NAME) != HDF_SUCCESS) {
HDF_LOGE("%s: gyro init work queue failed", __func__);
return HDF_FAILURE;
}
if (HdfWorkInit(&drvData->gyroWork, GyroDataWorkEntry, drvData) != HDF_SUCCESS) {
HDF_LOGE("%s: gyro create thread failed", __func__);
return HDF_FAILURE;
}
CHECK_NULL_PTR_RETURN_VALUE(drvData->ops.Init, HDF_ERR_INVALID_PARAM);
ret = drvData->ops.Init(drvData->gyroCfg);
if (ret != HDF_SUCCESS) {
HDF_LOGE("%s: gyro create thread failed", __func__);
return HDF_FAILURE;
}
drvData->interval = SENSOR_TIMER_MIN_TIME;
drvData->initStatus = true;
drvData->enable = false;
return HDF_SUCCESS;
}
- 从调用情况看work要执行的接口为
GyroDataWorkEntry
,看实现就是实现Gryo这个sensor的读数据操作
static void GyroDataWorkEntry(void *arg)
{
int32_t ret;
struct GyroDrvData *drvData = (struct GyroDrvData *)arg;
CHECK_NULL_PTR_RETURN(drvData);
CHECK_NULL_PTR_RETURN(drvData->ops.ReadData);
ret = drvData->ops.ReadData(drvData->gyroCfg);
if (ret != HDF_SUCCESS) {
HDF_LOGE("%s: gyro read data failed", __func__);
return;
}
}
- 在定时器处理函数中调用
HdfAddWork
,底层就会自动调用接口GyroDataWorkEntry
static void GyroTimerEntry(uintptr_t arg)
{
int64_t interval;
int32_t ret;
struct GyroDrvData *drvData = (struct GyroDrvData *)arg;
CHECK_NULL_PTR_RETURN(drvData);
if (!HdfAddWork(&drvData->gyroWorkQueue, &drvData->gyroWork)) {
HDF_LOGE("%s: gyro add work queue failed", __func__);
}
interval = OsalDivS64(drvData->interval, (SENSOR_CONVERT_UNIT * SENSOR_CONVERT_UNIT));
interval = (interval < SENSOR_TIMER_MIN_TIME) ? SENSOR_TIMER_MIN_TIME : interval;
ret = OsalTimerSetTimeout(&drvData->gyroTimer, interval);
if (ret != HDF_SUCCESS) {
HDF_LOGE("%s: gyro modify time failed", __func__);
}
}
liteos-a系统下workqueue的实现分析
HdfWorkQueueInit
接口
- 在源码中搜索此接口,这里分析liteos-a系统,所以找liteos相关的实现,此接口是在文件
drivers/adapter/khdf/liteos/osal/src/osal_workqueue.c
中,实现如下:
int32_t HdfWorkQueueInit(HdfWorkQueue *queue, char *name)
{
if (queue == NULL || name == NULL) {
HDF_LOGE("%s invalid para", __func__);
return HDF_ERR_INVALID_PARAM;
}
queue->realWorkQueue = create_singlethread_workqueue(name);
if (queue->realWorkQueue == NULL) {
HDF_LOGE("%s create queue fail", __func__);
return HDF_FAILURE;
}
return HDF_SUCCESS;
}
-
create_singlethread_workqueue
接口是在文件kernel/liteos_a/bsd/compat/linuxkpi/include/linux/workqueue.h
定义,就是一个宏
#define create_singlethread_workqueue(name) \
linux_create_singlethread_workqueue(name)
- 实现代码在文件
kernel/liteos_a/bsd/compat/linuxkpi/src/linux_workqueue.c
中定义,如下
struct workqueue_struct *linux_create_singlethread_workqueue(char *name)
{
return __create_workqueue_key(name, 1, 0, 0, NULL, NULL);
}
-
__create_workqueue_key
中初始化化了一个event,这个后面用到再讲;创建一个workqueueThread线程用来处理这个workqueue里的所有work,具体如下(删除了一些正确判断代码)
struct workqueue_struct *__create_workqueue_key(char *name,
int singleThread,
int freezeable,
int rt,
struct lock_class_key *key,
const char *lockName)
{
wq = (struct workqueue_struct *)LOS_MemAlloc(m_aucSysMem0, sizeof(struct workqueue_struct));
wq->cpu_wq = (cpu_workqueue_struct *)LOS_MemAlloc(m_aucSysMem0, sizeof(cpu_workqueue_struct));
wq->name = name;
wq->singlethread = singleThread;
wq->freezeable = freezeable;
wq->rt = rt;
wq->delayed_work_count = 0;
INIT_LIST_HEAD(&wq->list);
(VOID)LOS_EventInit(&wq->wq_event);
if (singleThread) {
cwq = InitCpuWorkqueue(wq, singleThread);
ret = CreateWorkqueueThread(cwq, singleThread);
} else {
LOS_MemFree(m_aucSysMem0, wq->cpu_wq);
LOS_MemFree(m_aucSysMem0, wq);
return NULL;
}
if (ret) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}
-
LOS_EventInit
就是liteos系统的两个task之间通信的事件机制实现 -
CreateWorkqueueThread
就是调用的liteos的LOS_TaskCreate
来创建一个Task(也即thread),task处理函数为WorkerThread
,如下
STATIC VOID WorkerThread(cpu_workqueue_struct *cwqParam)
{
cpu_workqueue_struct *cwq = cwqParam;
for (;;) {
if (WorkqueueIsEmpty(cwq)) {
(VOID)LOS_EventRead(&(cwq->wq->wq_event), 0x01, LOS_WAITMODE_OR | LOS_WAITMODE_CLR, LOS_WAIT_FOREVER);
}
RunWorkqueue(cwq);
}
}
线程处理函数里面就是一个死循环不停的处理,当workqueue中为空时,代码会停在LOS_EventRead处,这里就是事件机制了,在读一个还未发生的事件时,代码就会在此处一直阻塞,直到事件发生;
- 在事件发生或者有work可以处理时,就会调用真正的处理接口
RunWorkqueue
,里面最主要的就是对work调用上层通过HdfWorkInit
接口传入的处理函数(例如上面Gryo中的GyroDataWorkEntry
)
STATIC VOID RunWorkqueue(cpu_workqueue_struct *cwq)
{
if (!WorkqueueIsEmpty(cwq)) {
work = worklist_entry(cwq->worklist.next, struct work_struct, entry);
LOS_SpinUnlockRestore(&g_workqueueSpin, intSave);
func(work);
LOS_SpinLockSave(&g_workqueueSpin, &intSave);
}
LOS_SpinUnlockRestore(&g_workqueueSpin, intSave);
}
HdfWorkInit
接口
- 此接口也是在文件
drivers/adapter/khdf/liteos/osal/src/osal_workqueue.c
中定义,代码如下
int32_t HdfWorkInit(HdfWork *work, HdfWorkFunc func, void *para)
{
struct work_struct *realWork = NULL;
struct WorkWrapper *wrapper = NULL;
if (work == NULL || func == NULL) {
HDF_LOGE("%s invalid para", __func__);
return HDF_ERR_INVALID_PARAM;
}
work->realWork = NULL;
wrapper = (struct WorkWrapper *)OsalMemCalloc(sizeof(*wrapper));
if (wrapper == NULL) {
HDF_LOGE("%s malloc fail", __func__);
return HDF_ERR_MALLOC_FAIL;
}
realWork = &(wrapper->work.work);
wrapper->workFunc = func;
wrapper->para = para;
INIT_WORK(realWork, WorkEntry);
work->realWork = wrapper;
return HDF_SUCCESS;
}
- 从这里看这个处理函数func是赋值给了wrapper->workFunc,而在
INIT_WORK
中将WorkEntry
接口赋值给了work的func
了,
#ifdef WORKQUEUE_SUPPORT_PRIORITY
#define INIT_WORK(work, callbackFunc) do { \
INIT_LIST_HEAD(&((work)->entry)); \
(work)->func = (callbackFunc); \
(work)->data = (atomic_long_t)(0); \
(work)->work_status = 0; \
(work)->work_pri = OS_WORK_PRIORITY_DEFAULT; \
} while (0)
#else
#define INIT_WORK(work, callbackFunc) do { \
INIT_LIST_HEAD(&((work)->entry)); \
(work)->func = (callbackFunc); \
(work)->data = (atomic_long_t)(0); \
(work)->work_status = 0; \
} while (0)
#endif
从前面分析,有work需要处理时调用的就是这个(work)->func
;而这里看这个接口的值是WorkEntry
,怎么跟驱动侧输入的处理接口联系的呢?就是这个WorkEntry
实现的
static void WorkEntry(struct work_struct *work)
{
struct WorkWrapper *wrapper = NULL;
if (work != NULL) {
wrapper = (struct WorkWrapper *)work;
if (wrapper->workFunc != NULL) {
wrapper->workFunc(wrapper->para);
} else {
HDF_LOGE("%s routine null", __func__);
}
} else {
HDF_LOGE("%s work null", __func__);
}
}
从这里看就是调用的wrapper->workFunc
,也即HdfWorkInit
接口传入的处理接口。
HdfAddWork
接口
- 与前两个接口一样,此接口也在文件
drivers/adapter/khdf/liteos/osal/src/osal_workqueue.c
中实现
bool HdfAddWork(HdfWorkQueue *queue, HdfWork *work)
{
if (queue == NULL || queue->realWorkQueue == NULL || work == NULL || work->realWork == NULL) {
HDF_LOGE("%s invalid para", __func__);
return false;
}
return queue_work(queue->realWorkQueue, &((struct WorkWrapper *)work->realWork)->work.work);
}
- 与
create_singlethread_workqueue
类似,queue_work
接口也是一个宏,对应的值为linux_queue_work
#define queue_work(wq, work) \
linux_queue_work(wq, work)
-
linux_queue_work
最终调用的接口是InsertWork
,中间的调用如下图所示
linux_queue_work --(kernel/liteos_a/bsd/compat/linuxkpi/src/linux_workqueue.c)
QueueWorkOn
QueueWork
InsertWork
InsertWork
的实现如下:
STATIC VOID InsertWork(cpu_workqueue_struct *cwq, struct work_struct *work, struct list_head *head, UINT32 *intSave)
{
#ifdef WORKQUEUE_SUPPORT_PRIORITY
WorkListAdd(&work->entry, head, work->work_pri);
#else
WorkListAddTail(&work->entry, head);
#endif
LOS_SpinUnlockRestore(&g_workqueueSpin, *intSave);
(VOID)LOS_EventWrite(&(cwq->wq->wq_event), 0x01);
LOS_SpinLockSave(&g_workqueueSpin, intSave);
}
从实现上看这里就是写了一个Event,还记得前面在init里面一个阻塞在读Event的吗?这里的写就代表了事件发生,然后就可以正常处理work了
总结
从liteos-a系统workqueue的实现源码分析,大概的流程如下:
- 系统对于一个workqueue都会创建一个系统Task
- 然后通过evnt机制实现这个系统Task与用户task之间的的同步
- 系统对于这个workqueue中的每一个work都调用上层传入的处理函数
网友评论