From 49b1faa9c128b9beb8f517bed5cccfc5cb3b0ef0 Mon Sep 17 00:00:00 2001 From: sinanmohd Date: Fri, 21 Jun 2024 09:46:41 +0530 Subject: queue: merge matching derivations in queue --- include/jobs.h | 3 ++ include/queue.h | 4 +- meson.build | 1 + src/build.c | 2 +- src/jobs.c | 4 +- src/queue.c | 160 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- src/util.c | 1 + 7 files changed, 163 insertions(+), 12 deletions(-) diff --git a/include/jobs.h b/include/jobs.h index 85d606b..623eaea 100644 --- a/include/jobs.h +++ b/include/jobs.h @@ -1,4 +1,5 @@ #include +#include #include #ifndef JOBS_H @@ -9,6 +10,7 @@ struct output { struct job { char *name, *drv_path; + bool transitive; size_t outputs_size, outputs_filled; struct output **outputs; @@ -33,6 +35,7 @@ int job_read(FILE *stream, struct job **jobs); int jobs_init(FILE **stream); void job_free(struct job *j); +int job_parents_list_insert(struct job *job, struct job *parent); #define JOBS_H #endif diff --git a/include/queue.h b/include/queue.h index 6608ec4..c454339 100644 --- a/include/queue.h +++ b/include/queue.h @@ -1,4 +1,5 @@ #include +#include #include #include @@ -13,6 +14,7 @@ typedef enum { struct queue { struct job_clist jobs; + struct hsearch_data *htab; sem_t sem; queue_state_t state; pthread_mutex_t mutex; @@ -27,7 +29,7 @@ struct queue_thread { int queue_thread_new(struct queue_thread **queue_thread, FILE *stream); void queue_thread_free(struct queue_thread *queue_thread); void *queue_thread_entry(void *queue_thread); -int queue_pop(struct queue *queue, struct job **job); +int queue_pop(struct queue *queue, struct job **job, struct hsearch_data *htab); #define QUEUE_H #endif diff --git a/meson.build b/meson.build index cac287a..38f78d7 100644 --- a/meson.build +++ b/meson.build @@ -12,6 +12,7 @@ project( add_project_arguments( [ '-D_POSIX_C_SOURCE=200809L', + '-D_GNU_SOURCE', '-Wvla', ], language: 'c', diff --git a/src/build.c b/src/build.c index 98fc9d7..fa76c13 100644 --- a/src/build.c +++ b/src/build.c @@ -44,7 +44,7 @@ static int build(struct queue *queue) struct job *job; int ret = 0; - ret = queue_pop(queue, &job); + ret = queue_pop(queue, &job, queue->htab); if (ret < 0) return ret; diff --git a/src/jobs.c b/src/jobs.c index 2bf871a..28273dd 100644 --- a/src/jobs.c +++ b/src/jobs.c @@ -16,7 +16,6 @@ static int job_read_inputdrvs(struct job *job, cJSON *input_drvs); static int job_read_outputs(struct job *job, cJSON *outputs); static int job_deps_list_insert(struct job *job, struct job *dep); static int job_output_list_insert(struct job *job, struct output *output); -static int job_parents_list_insert(struct job *job, struct job *parent); static void job_deps_list_rm(struct job *job, struct job *dep); static void output_free(struct output *output) @@ -90,7 +89,7 @@ static int job_deps_list_insert(struct job *job, struct job *dep) return 0; } -static int job_parents_list_insert(struct job *job, struct job *parent) +int job_parents_list_insert(struct job *job, struct job *parent) { size_t newsize; void *ret; @@ -310,6 +309,7 @@ static int job_new(struct job **j, char *name, char *drv_path, print_err("%s", strerror(errno)); return -errno; } + job->transitive = true; job->outputs_size = 0; job->outputs_filled = 0; diff --git a/src/queue.c b/src/queue.c index 191c3f8..1fb5ae7 100644 --- a/src/queue.c +++ b/src/queue.c @@ -8,7 +8,11 @@ #include "queue.h" #include "util.h" -static void queue_push(struct queue *queue, struct job *job); +#define MAX_NIX_PKG_COUNT 200000 + +static int queue_push(struct queue *queue, struct job *job); +static int queue_htab_job_merge(struct job **job, struct hsearch_data *htab); +static int queue_htab_parent_merge(struct job *to, struct job *from); void *queue_thread_entry(void *queue_thread) { @@ -37,8 +41,11 @@ void *queue_thread_entry(void *queue_thread) pthread_exit(NULL); } -int queue_pop(struct queue *queue, struct job **job) +int queue_pop(struct queue *queue, struct job **job, struct hsearch_data *htab) { + ENTRY e, *ep; + int ret; + struct job *j = CIRCLEQ_FIRST(&queue->jobs); if (CIRCLEQ_EMPTY(&queue->jobs)) { @@ -47,20 +54,133 @@ int queue_pop(struct queue *queue, struct job **job) } pthread_mutex_lock(&queue->mutex); + CIRCLEQ_REMOVE(&queue->jobs, j, clist); + if (j->parents_filled <= 0) { + e.key = j->drv_path; + ret = hsearch_r(e, FIND, &ep, htab); + if (ret == 0) { + print_err("%s", strerror(errno)); + } + else + ep->data = NULL; + } + pthread_mutex_unlock(&queue->mutex); *job = j; return 0; } -static void queue_push(struct queue *queue, struct job *job) +static int queue_htab_parent_merge(struct job *to, struct job *from) +{ + int ret; + + /* the output from nix-eval-jobs("from") can only have maximum one + * parent */ + for (size_t i = 0; i < to->parents_filled; i++) { + if (strcmp(to->parents[i]->drv_path, + from->parents[0]->drv_path)) + continue; + + /* steal name from "from" */ + if (to->parents[i]->name == NULL) { + to->parents[i]->name = + from->parents[0]->name; + from->parents[0]->name = NULL; + } + + return 0; + } + + ret = job_parents_list_insert(to, from->parents[0]); + if (ret < 0) + return ret; + from->parents_filled = 0; + + return 0; +} + +/* this merge functions are closely tied to the output characteristics of + * nix-eval-jobs, that is + * - only two level of nodes (root and childrens or dependencies) + * - only childrens or dependencies have parent node + * - only root node have dependencies + */ +static int queue_htab_job_merge(struct job **job, struct hsearch_data *htab) +{ + struct job *jtab; + ENTRY e, *ep; + int ret; + + e.key = (*job)->drv_path; + ret = hsearch_r(e, FIND, &ep, htab); + if (ret == 0) { + if (errno != ESRCH) { + print_err("%s", strerror(errno)); + return -errno; + } + + e.data = *job; + ret = hsearch_r(e, ENTER, &ep, htab); + if (ret == 0) { + print_err("%s", strerror(errno)); + return -errno; + } + + for (size_t i = 0; i < (*job)->deps_filled; i++) { + ret = queue_htab_job_merge(&(*job)->deps[i], htab); + if (ret < 0) + return ret; + } + + return 0; + } + + /* if it's already inside htab, it's deps should also be in htab, hence + * not merging deps */ + jtab = ep->data; + if (jtab->name == NULL) { + /* steal name from new job struct */ + jtab->name = (*job)->name; + (*job)->name = NULL; + } + + /* only recursive calls with childrens or dependencies can enter this + * for a recursive call to happen the parent was just entered into htab + * so, update the parent's deps reference to point to the node in htab + */ + if ((*job)->parents_filled > 0) { + ret = queue_htab_parent_merge(jtab, *job); + if (ret < 0) + return ret; + } + + job_free(*job); + *job = jtab; + return 0; +} + +static int queue_push(struct queue *queue, struct job *job) { + int ret; + pthread_mutex_lock(&queue->mutex); - CIRCLEQ_INSERT_TAIL(&queue->jobs, job, clist); - pthread_mutex_unlock(&queue->mutex); + ret = queue_htab_job_merge(&job, queue->htab); + if (ret < 0) { + pthread_mutex_unlock(&queue->mutex); + return ret; + } + /* no duplicate entries in queue */ + if (job->transitive) { + job->transitive = false; + CIRCLEQ_INSERT_TAIL(&queue->jobs, job, clist); + } + pthread_mutex_unlock(&queue->mutex); sem_post(&queue->sem); + + return 0; } void queue_thread_free(struct queue_thread *queue_thread) @@ -74,6 +194,8 @@ void queue_thread_free(struct queue_thread *queue_thread) CIRCLEQ_FOREACH_FREE(cur, next, &queue_thread->queue->jobs, clist, job_free); + hdestroy_r(queue_thread->queue->htab); + free(queue_thread->queue->htab); ret = sem_destroy(&queue_thread->queue->sem); if (ret < 0) print_err("%s", strerror(errno)); @@ -102,20 +224,42 @@ int queue_thread_new(struct queue_thread **queue_thread, FILE *stream) if (qt->queue == NULL) { print_err("%s", strerror(errno)); ret = -errno; - goto out_free; + goto out_free_qt; } qt->queue->state = Q_SEM_WAIT; ret = sem_init(&qt->queue->sem, 0, 0); if (ret < 0) { print_err("%s", strerror(errno)); ret = -errno; - goto out_free; + goto out_free_queue; + } + + qt->queue->htab = malloc(sizeof(*qt->queue->htab)); + if (qt->queue->htab == NULL) { + print_err("%s", strerror(errno)); + ret = -errno; + goto out_free_sem; + } + ret = hcreate_r(MAX_NIX_PKG_COUNT, qt->queue->htab); + if (ret == 0) { + print_err("%s", strerror(errno)); + ret = -errno; + goto out_free_htab; } CIRCLEQ_INIT(&qt->queue->jobs); pthread_mutex_init(&qt->queue->mutex, NULL); -out_free: +out_free_htab: + if (ret < 0) + free(qt->queue->htab); +out_free_sem: + if (ret < 0) + sem_destroy(&qt->queue->sem); +out_free_queue: + if (ret < 0) + free(qt->queue); +out_free_qt: if (ret < 0) free(qt); else diff --git a/src/util.c b/src/util.c index e84a72a..a2ae788 100644 --- a/src/util.c +++ b/src/util.c @@ -14,6 +14,7 @@ int json_streaming_read(FILE *stream, cJSON **json) int ret; char *line = NULL; + errno = 0; ret = getline(&line, &n, stream); if (ret < 0) { if (errno != 0) { -- cgit v1.2.3