diff options
Diffstat (limited to 'src/queue.c')
-rw-r--r-- | src/queue.c | 52 |
1 files changed, 48 insertions, 4 deletions
diff --git a/src/queue.c b/src/queue.c index 6bf9092..746750e 100644 --- a/src/queue.c +++ b/src/queue.c @@ -8,6 +8,8 @@ #include "queue.h" #include "util.h" +static void queue_push(struct queue *queue, struct job *job); + void *queue_thread_entry(void *queue_thread) { struct queue_thread *qt = queue_thread; @@ -17,23 +19,51 @@ void *queue_thread_entry(void *queue_thread) while (true) { ret = job_read(qt->stream, &job); if (ret == -EOF) { + qt->queue->state = Q_ITS_OVER; + sem_post(&qt->queue->sem); + ret = 0; break; } else if (ret < 0) { break; } - pthread_mutex_lock(&qt->queue->mutex); - CIRCLEQ_INSERT_TAIL(&qt->queue->jobs, job, clist); - pthread_mutex_unlock(&qt->queue->mutex); + queue_push(qt->queue, job); } pthread_exit(NULL); } +int queue_pop(struct queue *queue, struct job **job) +{ + struct job *j = CIRCLEQ_FIRST(&queue->jobs); + + if (CIRCLEQ_EMPTY(&queue->jobs)) { + print_err("%s", "Empty queue"); + return -EPERM; + } + + pthread_mutex_lock(&queue->mutex); + CIRCLEQ_REMOVE(&queue->jobs, j, clist); + pthread_mutex_unlock(&queue->mutex); + + *job = j; + return 0; +} + +static void queue_push(struct queue *queue, struct job *job) +{ + pthread_mutex_lock(&queue->mutex); + CIRCLEQ_INSERT_TAIL(&queue->jobs, job, clist); + pthread_mutex_unlock(&queue->mutex); + + sem_post(&queue->sem); +} + void queue_thread_free(struct queue_thread *queue_thread) { struct job *job; + int ret; if (queue_thread == NULL) return; @@ -43,6 +73,13 @@ void queue_thread_free(struct queue_thread *queue_thread) free(job); } + ret = sem_destroy(&queue_thread->queue->sem); + if (ret < 0) + print_err("%s", strerror(errno)); + ret = pthread_mutex_destroy(&queue_thread->queue->mutex); + if (ret < 0) + print_err("%s", strerror(errno)); + free(queue_thread->queue); fclose(queue_thread->stream); } @@ -57,7 +94,6 @@ int queue_thread_new(struct queue_thread **queue_thread, FILE *stream) print_err("%s", strerror(errno)); return -errno; } - qt->stream = stream; qt->queue = malloc(sizeof(*qt->queue)); @@ -66,6 +102,14 @@ int queue_thread_new(struct queue_thread **queue_thread, FILE *stream) ret = -errno; goto out_free; } + qt->queue->state = Q_SEM_WAIT; + ret = sem_init(&qt->queue->sem, 0, 0); + if (ret < 0) { + print_err("%s", strerror(errno)); + ret = -errno; + goto out_free; + } + CIRCLEQ_INIT(&qt->queue->jobs); pthread_mutex_init(&qt->queue->mutex, NULL); |