queue.h:
#ifndef _QUEUE_H_
#define _QUEUE_H_ 1
#ifdef __cplusplus
extern "C" {
#endif
// Create a circular queue with room for `length` messages and return an
// opaque queue handle. Returns NULL if `length` is not positive or the
// queue memory cannot be allocated.
void* queue_create(int length);
// Put one message pointer into the circular queue.
// Returns 0 on success, -1 if the queue is full.
// Only the pointer is stored; the queue does not copy what it points to.
int queue_putmsg(void *handle, void *msg);
// Take the oldest message out of the circular queue.
// Waits on the queue's semaphore, so the call blocks while the queue is empty.
void* queue_getmsg(void *handle);
// Destroy the circular queue. Messages still held in the queue are handed
// back one by one through `callback`; if `callback` is NULL they are dropped.
// NOTE: before destroying, make sure no call to queue_putmsg() or
// queue_getmsg() is still in progress on this handle.
void queue_destroy(void *handle, void(*callback)(void*msg));
#ifdef __cplusplus
}
#endif
#endif
queue.c:
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdlib.h>
#include "queue.h"
/*
 * State of one circular message queue.
 *
 * The structure is allocated together with its slot array in a single block:
 * sizeof(QUEUE_T) covers the fixed fields and the slots follow immediately
 * in `array`.
 */
typedef struct {
    uint32_t head;          /* read position; slot index is head % length */
    uint32_t tail;          /* write position; slot index is tail % length */
    uint32_t count;         /* number of messages currently stored */
    uint32_t length;        /* capacity of the queue, in messages */
    sem_t wait;             /* posted once per stored message; readers wait on it */
    pthread_mutex_t lock;   /* guards head, tail, count and array */
    void *array[];          /* message slots (C99 flexible array member) */
} QUEUE_T;
void* queue_create(int length){
if(length <= 0) return NULL;
QUEUE_T *queueCtx = calloc(1, sizeof(QUEUE_T)+sizeof(void*)*(length+1));
if(queueCtx == NULL) return NULL;
queueCtx->head = 0;
queueCtx->tail = 0;
queueCtx->count = 0;
queueCtx->length = length;
sem_init(&(queueCtx->wait), 0, 0);
pthread_mutex_init(&(queueCtx->lock), NULL);
return queueCtx;
}
/*
 * Append one message pointer to the queue without blocking.
 *
 * handle: queue returned by queue_create().
 * msg:    pointer to store; the queue keeps the pointer only, not a copy.
 *
 * Returns 0 when the message was stored, -1 when the queue is full or
 * `handle` is NULL.
 */
int queue_putmsg(void *handle, void *msg){
    QUEUE_T *queueCtx = handle;
    if(queueCtx == NULL) return -1;

    pthread_mutex_lock(&(queueCtx->lock));
    if(queueCtx->count >= queueCtx->length){
        pthread_mutex_unlock(&(queueCtx->lock));
        return -1;
    }
    uint32_t slot = queueCtx->tail % queueCtx->length;
    queueCtx->array[slot] = msg;
    /* Keep tail inside [0, length). An ever-growing counter would make
     * `tail % length` jump when the uint32_t wraps (for any length that is
     * not a power of two) and overwrite a slot that has not been read yet. */
    queueCtx->tail = (slot + 1) % queueCtx->length;
    queueCtx->count += 1;
    pthread_mutex_unlock(&(queueCtx->lock));

    /* Wake one reader; posted after unlocking so the reader does not
     * immediately block on the mutex. */
    sem_post(&(queueCtx->wait));
    return 0;
}
/*
 * Remove and return the oldest message, blocking while the queue is empty.
 *
 * handle: queue returned by queue_create().
 *
 * Returns the stored message pointer. Returns NULL when `handle` is NULL,
 * when waiting on the semaphore fails for a reason other than a signal
 * interruption, or when the stored message itself was NULL.
 */
void* queue_getmsg(void *handle){
    QUEUE_T *queueCtx = handle;
    if(queueCtx == NULL) return NULL;

    /* sem_wait() returns -1 with errno == EINTR when a signal handler
     * interrupts it; the semaphore has NOT been decremented in that case.
     * Retry, otherwise a message would be consumed without its matching
     * semaphore count and later calls would return spurious NULLs. */
    while(sem_wait(&(queueCtx->wait)) != 0){
        if(errno != EINTR) return NULL;
    }

    void *msg = NULL;
    pthread_mutex_lock(&(queueCtx->lock));
    if(queueCtx->count > 0){
        uint32_t slot = queueCtx->head % queueCtx->length;
        msg = queueCtx->array[slot];
        queueCtx->array[slot] = NULL;
        queueCtx->count -= 1;
        /* Keep head inside [0, length) so that uint32_t wrap-around can
         * never disturb the `% length` slot sequence. */
        queueCtx->head = (slot + 1) % queueCtx->length;
    }
    if(queueCtx->count == 0){
        /* Empty queue: restart both positions from slot 0. */
        queueCtx->head = 0;
        queueCtx->tail = 0;
    }
    pthread_mutex_unlock(&(queueCtx->lock));
    return msg;
}
/*
 * Destroy a queue and release its memory.
 *
 * handle:   queue returned by queue_create(); NULL is accepted and ignored,
 *           like free(NULL).
 * callback: if not NULL, called once for every non-NULL message still in the
 *           queue, oldest first, so the caller can release it. If NULL, the
 *           remaining messages are simply dropped.
 *
 * The caller must make sure no queue_putmsg() / queue_getmsg() call is still
 * running on this handle; the handle is invalid after this function returns.
 */
void queue_destroy(void *handle, void(*callback)(void*msg)){
    QUEUE_T *queueCtx = handle;
    if(queueCtx == NULL) return;

    if(callback){
        for(;;){
            void *msg = NULL;
            int drained = 0;

            pthread_mutex_lock(&(queueCtx->lock));
            if(queueCtx->count > 0){
                msg = queueCtx->array[queueCtx->head % queueCtx->length];
                queueCtx->count -= 1;
                queueCtx->head += 1;
            }else{
                drained = 1;
            }
            pthread_mutex_unlock(&(queueCtx->lock));

            if(drained) break;
            /* The callback runs outside the lock; NULL messages are skipped. */
            if(msg) callback(msg);
        }
    }

    sem_destroy(&(queueCtx->wait));
    pthread_mutex_destroy(&(queueCtx->lock));
    free(queueCtx);
}
测试程序test.c:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "queue.h"
/*
 * Demo: fill a 32-slot queue with string messages, then read them back and
 * print each one until the "exit" message is seen.
 *
 * Returns EXIT_SUCCESS when "exit" was read back, EXIT_FAILURE when the
 * queue could not be created, a message could not be queued, or
 * queue_getmsg() returned NULL.
 */
int main(int argc, char* argv[]){
    (void)argc;
    (void)argv;

    void *handle = queue_create(32);
    if(handle == NULL){
        fprintf(stderr, "queue_create() failed\n");
        return EXIT_FAILURE;
    }

    /* Messages in the order they are queued; "exit" ends the read loop. */
    char *messages[] = {
        "0",
        "11",
        "222",
        "333",
        "4444",
        "55555",
        "666666",
        "7777777",
        "88888888",
        "999999999",
        "aaaaaaaaaa",
        "bbbbbbbbbbb",
        "cccccccccccc",
        "ddddddddddddd",
        "eeeeeeeeeeeeee",
        "exit",
    };
    for(size_t i = 0; i < sizeof messages / sizeof messages[0]; i++){
        if(queue_putmsg(handle, messages[i]) != 0){
            fprintf(stderr, "queue_putmsg() failed\n");
            queue_destroy(handle, NULL);
            return EXIT_FAILURE;
        }
    }

    int status = EXIT_FAILURE;
    for(;;){
        char *str = queue_getmsg(handle);
        if(str == NULL){
            /* Passing NULL to printf("%s") or strcmp() is undefined behaviour. */
            fprintf(stderr, "queue_getmsg() returned NULL\n");
            break;
        }
        printf("queue_getmsg() = \"%s\"\n", str);
        if(strcmp(str, "exit") == 0){
            status = EXIT_SUCCESS;
            break;
        }
    }

    queue_destroy(handle, NULL);
    return status;
}
网友评论