任务队列 有一个隐藏的问题
#include "./workqueue/uplat_zynq7000/cache2data/list.h"
#include <malloc.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
struct task_queue {
// TODO: Mutex from other thread
char taskName[32];
pthread_mutex_t mutex;
pthread_cond_t cond;
bool empty;
void *task_args;
void *deal_callback_args;
int32_t (*deal_callback)(void *, size_t);
int32_t (*task)(void *, int32_t (*deal_callback)(void *, size_t), void *);
struct LIST_NODE node;
};
static inline int32_t setupTaskQueue(struct task_queue *task) {
if (task == 0) {
return -2;
}
task->empty = true;
task->node.next = &task->node;
task->node.prev = &task->node;
pthread_mutex_init(&task->mutex, 0);
pthread_cond_init(&task->cond, 0);
return 0;
}
// NOTE: add mutex lock
static inline int32_t taskQueuePush(
struct task_queue *task, const char *taskName, void *task_args,
void *deal_callback_args,
int32_t (*task_cb)(void *, int32_t (*deal_callback)(void *, size_t),
void *),
int32_t (*deal_callback)(void *, size_t)) {
if (task == 0) {
return -1;
}
struct task_queue *new_task = calloc(sizeof(struct task_queue), 1);
if (new_task == 0) {
return -3;
}
task->empty = false;
bzero(new_task->taskName, sizeof(new_task->taskName));
strcpy(new_task->taskName, taskName);
new_task->task_args = task_args;
new_task->deal_callback_args = deal_callback_args;
new_task->task = task_cb;
list_add(&new_task->node, &task->node);
pthread_mutex_lock(&task->mutex);
pthread_cond_broadcast(&task->cond);
pthread_mutex_unlock(&task->mutex);
return 0;
}
static inline int32_t taskQueueDeal(struct task_queue *task) {
if (task == 0) {
return -1;
}
struct task_queue *cur;
struct LIST_NODE *node;
struct LIST_NODE *safe_node;
while (true) {
while (task->empty) {
pthread_mutex_lock(&task->mutex);
pthread_cond_wait(&task->cond, &task->mutex);
pthread_mutex_unlock(&task->mutex);
}
node = task->node.next;
while (node != &task->node) {
cur = container_of(node, struct task_queue, node);
if (cur != 0) {
if (cur->task != 0) {
cur->task(cur->task_args, cur->deal_callback,
cur->deal_callback_args);
}
}
list_del(node);
node = node->next;
free(cur);
cur = 0;
}
task->empty = true;
}
return 0;
}
static inline int32_t isTaskQueueEmpty(struct task_queue *task) {
if (task == 0) {
return -1;
}
return task->empty;
}
int32_t task_1(void *args, int32_t (*deal_callback)(void *, size_t),
void *_args) {
printf("%sn", __func__);
return 0;
}
int32_t task_2(void *args, int32_t (*deal_callback)(void *, size_t),
void *_args) {
printf("%sn", __func__);
return 0;
}
void *all(void *p) {
struct task_queue *q = p;
while (1) {
usleep(500);
taskQueuePush(q, "task 111", 0, 0, task_1, 0);
taskQueuePush(q, "task 112", 0, 0, task_2, 0);
}
}
int32_t threading(struct task_queue *q) {
pthread_t tid;
return pthread_create(&tid, 0, all, q);
}
int main(void) {
struct task_queue queue;
setupTaskQueue(&queue);
threading(&queue);
taskQueueDeal(&queue);
return 0;
}
生产的速度要小于,消化的速度
消化不良就溢出了
我链表的释放时没有问题的,忽略了生产消化的问题
增加处理上限,超过门限,等待处理完毕,才能继续加入队列
或者自动添加一个延时
#include "./workqueue/uplat_zynq7000/cache2data/list.h"
#include <malloc.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
struct task_queue {
// TODO: Mutex from other thread
clock_t start;
char taskName[32];
pthread_mutex_t mutex;
pthread_cond_t cond;
bool empty;
bool busy;
void *task_args;
void *deal_callback_args;
int32_t (*deal_callback)(void *, size_t);
int32_t (*task)(void *, int32_t (*deal_callback)(void *, size_t), void *);
clock_t end;
struct LIST_NODE node;
};
static inline int32_t setupTaskQueue(struct task_queue *task) {
if (task == 0) {
return -2;
}
task->empty = true;
task->start = 0;
task->end = 1;
task->node.next = &task->node;
task->node.prev = &task->node;
pthread_mutex_init(&task->mutex, 0);
pthread_cond_init(&task->cond, 0);
return 0;
}
// NOTE: add mutex lock
static inline int32_t taskQueuePush(
struct task_queue *task, const char *taskName, void *task_args,
void *deal_callback_args,
int32_t (*task_cb)(void *, int32_t (*deal_callback)(void *, size_t),
void *),
int32_t (*deal_callback)(void *, size_t)) {
if (task == 0) {
return -1;
}
usleep(abs((int)(task->end - task->start)) * 100);
struct task_queue *new_task = calloc(sizeof(struct task_queue), 1);
if (new_task == 0) {
return -3;
}
task->empty = false;
bzero(new_task->taskName, sizeof(new_task->taskName));
strcpy(new_task->taskName, taskName);
new_task->task_args = task_args;
new_task->deal_callback_args = deal_callback_args;
new_task->task = task_cb;
list_add(&new_task->node, &task->node);
pthread_mutex_lock(&task->mutex);
pthread_cond_broadcast(&task->cond);
pthread_mutex_unlock(&task->mutex);
return 0;
}
static inline int32_t taskQueueDeal(struct task_queue *task) {
if (task == 0) {
return -1;
}
struct task_queue *cur;
struct LIST_NODE *node;
struct LIST_NODE *safe_node;
while (true) {
while (task->empty) {
pthread_mutex_lock(&task->mutex);
pthread_cond_wait(&task->cond, &task->mutex);
pthread_mutex_unlock(&task->mutex);
}
node = task->node.next;
while (node != &task->node) {
cur = container_of(node, struct task_queue, node);
if (cur != 0) {
task->start = clock();
if (cur->task != 0) {
cur->task(cur->task_args, cur->deal_callback,
cur->deal_callback_args);
}
task->end = clock();
}
list_del(node);
node = node->next;
free(cur);
cur = 0;
}
task->empty = true;
}
return 0;
}
static inline int32_t isTaskQueueEmpty(struct task_queue *task) {
if (task == 0) {
return -1;
}
return task->empty;
}
int32_t task_1(void *args, int32_t (*deal_callback)(void *, size_t),
void *_args) {
printf("%sn", __func__);
return 0;
}
int32_t task_2(void *args, int32_t (*deal_callback)(void *, size_t),
void *_args) {
printf("%sn", __func__);
return 0;
}
void *all(void *p) {
struct task_queue *q = p;
while (1) {
taskQueuePush(q, "task 111", 0, 0, task_1, 0);
taskQueuePush(q, "task 112", 0, 0, task_2, 0);
}
}
int32_t threading(struct task_queue *q) {
pthread_t tid;
return pthread_create(&tid, 0, all, q);
}
int main(void) {
struct task_queue queue;
setupTaskQueue(&queue);
threading(&queue);
taskQueueDeal(&queue);
return 0;
}