summaryrefslogtreecommitdiff
path: root/src/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/queue.c')
-rw-r--r--src/queue.c52
1 files changed, 48 insertions, 4 deletions
diff --git a/src/queue.c b/src/queue.c
index 6bf9092..746750e 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -8,6 +8,8 @@
#include "queue.h"
#include "util.h"
+static void queue_push(struct queue *queue, struct job *job);
+
void *queue_thread_entry(void *queue_thread)
{
struct queue_thread *qt = queue_thread;
@@ -17,23 +19,51 @@ void *queue_thread_entry(void *queue_thread)
while (true) {
ret = job_read(qt->stream, &job);
if (ret == -EOF) {
+ qt->queue->state = Q_ITS_OVER;
+ sem_post(&qt->queue->sem);
+
ret = 0;
break;
} else if (ret < 0) {
break;
}
- pthread_mutex_lock(&qt->queue->mutex);
- CIRCLEQ_INSERT_TAIL(&qt->queue->jobs, job, clist);
- pthread_mutex_unlock(&qt->queue->mutex);
+ queue_push(qt->queue, job);
}
pthread_exit(NULL);
}
+int queue_pop(struct queue *queue, struct job **job)
+{
+ struct job *j = CIRCLEQ_FIRST(&queue->jobs);
+
+ if (CIRCLEQ_EMPTY(&queue->jobs)) {
+ print_err("%s", "Empty queue");
+ return -EPERM;
+ }
+
+ pthread_mutex_lock(&queue->mutex);
+ CIRCLEQ_REMOVE(&queue->jobs, j, clist);
+ pthread_mutex_unlock(&queue->mutex);
+
+ *job = j;
+ return 0;
+}
+
+static void queue_push(struct queue *queue, struct job *job)
+{
+ pthread_mutex_lock(&queue->mutex);
+ CIRCLEQ_INSERT_TAIL(&queue->jobs, job, clist);
+ pthread_mutex_unlock(&queue->mutex);
+
+ sem_post(&queue->sem);
+}
+
void queue_thread_free(struct queue_thread *queue_thread)
{
struct job *job;
+ int ret;
if (queue_thread == NULL)
return;
@@ -43,6 +73,13 @@ void queue_thread_free(struct queue_thread *queue_thread)
free(job);
}
+ ret = sem_destroy(&queue_thread->queue->sem);
+ if (ret < 0)
+ print_err("%s", strerror(errno));
+ ret = pthread_mutex_destroy(&queue_thread->queue->mutex);
+ if (ret < 0)
+ print_err("%s", strerror(errno));
+
free(queue_thread->queue);
fclose(queue_thread->stream);
}
@@ -57,7 +94,6 @@ int queue_thread_new(struct queue_thread **queue_thread, FILE *stream)
print_err("%s", strerror(errno));
return -errno;
}
-
qt->stream = stream;
qt->queue = malloc(sizeof(*qt->queue));
@@ -66,6 +102,14 @@ int queue_thread_new(struct queue_thread **queue_thread, FILE *stream)
ret = -errno;
goto out_free;
}
+ qt->queue->state = Q_SEM_WAIT;
+ ret = sem_init(&qt->queue->sem, 0, 0);
+ if (ret < 0) {
+ print_err("%s", strerror(errno));
+ ret = -errno;
+ goto out_free;
+ }
+
CIRCLEQ_INIT(&qt->queue->jobs);
pthread_mutex_init(&qt->queue->mutex, NULL);