diff options
author | sinanmohd <sinan@sinanmohd.com> | 2024-06-21 09:46:41 +0530 |
---|---|---|
committer | sinanmohd <sinan@sinanmohd.com> | 2024-06-21 18:09:42 +0530 |
commit | 49b1faa9c128b9beb8f517bed5cccfc5cb3b0ef0 (patch) | |
tree | a7692f9ce4834172aa4c5262712841cd8aaa80a0 /src | |
parent | 36be4b06236ef60234ac8d6fe3962208c880c163 (diff) |
queue: merge matching derivations in queue
Diffstat (limited to 'src')
-rw-r--r-- | src/build.c | 2 | ||||
-rw-r--r-- | src/jobs.c | 4 | ||||
-rw-r--r-- | src/queue.c | 160 | ||||
-rw-r--r-- | src/util.c | 1 |
4 files changed, 156 insertions, 11 deletions
diff --git a/src/build.c b/src/build.c index 98fc9d7..fa76c13 100644 --- a/src/build.c +++ b/src/build.c @@ -44,7 +44,7 @@ static int build(struct queue *queue) struct job *job; int ret = 0; - ret = queue_pop(queue, &job); + ret = queue_pop(queue, &job, queue->htab); if (ret < 0) return ret; @@ -16,7 +16,6 @@ static int job_read_inputdrvs(struct job *job, cJSON *input_drvs); static int job_read_outputs(struct job *job, cJSON *outputs); static int job_deps_list_insert(struct job *job, struct job *dep); static int job_output_list_insert(struct job *job, struct output *output); -static int job_parents_list_insert(struct job *job, struct job *parent); static void job_deps_list_rm(struct job *job, struct job *dep); static void output_free(struct output *output) @@ -90,7 +89,7 @@ static int job_deps_list_insert(struct job *job, struct job *dep) return 0; } -static int job_parents_list_insert(struct job *job, struct job *parent) +int job_parents_list_insert(struct job *job, struct job *parent) { size_t newsize; void *ret; @@ -310,6 +309,7 @@ static int job_new(struct job **j, char *name, char *drv_path, print_err("%s", strerror(errno)); return -errno; } + job->transitive = true; job->outputs_size = 0; job->outputs_filled = 0; diff --git a/src/queue.c b/src/queue.c index 191c3f8..1fb5ae7 100644 --- a/src/queue.c +++ b/src/queue.c @@ -8,7 +8,11 @@ #include "queue.h" #include "util.h" -static void queue_push(struct queue *queue, struct job *job); +#define MAX_NIX_PKG_COUNT 200000 + +static int queue_push(struct queue *queue, struct job *job); +static int queue_htab_job_merge(struct job **job, struct hsearch_data *htab); +static int queue_htab_parent_merge(struct job *to, struct job *from); void *queue_thread_entry(void *queue_thread) { @@ -37,8 +41,11 @@ void *queue_thread_entry(void *queue_thread) pthread_exit(NULL); } -int queue_pop(struct queue *queue, struct job **job) +int queue_pop(struct queue *queue, struct job **job, struct hsearch_data *htab) { + ENTRY e, *ep; + int ret; + struct job *j = CIRCLEQ_FIRST(&queue->jobs); if (CIRCLEQ_EMPTY(&queue->jobs)) { @@ -47,20 +54,133 @@ int queue_pop(struct queue *queue, struct job **job) } pthread_mutex_lock(&queue->mutex); + CIRCLEQ_REMOVE(&queue->jobs, j, clist); + if (j->parents_filled <= 0) { + e.key = j->drv_path; + ret = hsearch_r(e, FIND, &ep, htab); + if (ret == 0) { + print_err("%s", strerror(errno)); + } + else + ep->data = NULL; + } + pthread_mutex_unlock(&queue->mutex); *job = j; return 0; } -static void queue_push(struct queue *queue, struct job *job) +static int queue_htab_parent_merge(struct job *to, struct job *from) +{ + int ret; + + /* the output from nix-eval-jobs("from") can only have maximum one + * parent */ + for (size_t i = 0; i < to->parents_filled; i++) { + if (strcmp(to->parents[i]->drv_path, + from->parents[0]->drv_path)) + continue; + + /* steal name from "from" */ + if (to->parents[i]->name == NULL) { + to->parents[i]->name = + from->parents[0]->name; + from->parents[0]->name = NULL; + } + + return 0; + } + + ret = job_parents_list_insert(to, from->parents[0]); + if (ret < 0) + return ret; + from->parents_filled = 0; + + return 0; +} + +/* this merge functions are closely tied to the output characteristics of + * nix-eval-jobs, that is + * - only two level of nodes (root and childrens or dependencies) + * - only childrens or dependencies have parent node + * - only root node have dependencies + */ +static int queue_htab_job_merge(struct job **job, struct hsearch_data *htab) +{ + struct job *jtab; + ENTRY e, *ep; + int ret; + + e.key = (*job)->drv_path; + ret = hsearch_r(e, FIND, &ep, htab); + if (ret == 0) { + if (errno != ESRCH) { + print_err("%s", strerror(errno)); + return -errno; + } + + e.data = *job; + ret = hsearch_r(e, ENTER, &ep, htab); + if (ret == 0) { + print_err("%s", strerror(errno)); + return -errno; + } + + for (size_t i = 0; i < (*job)->deps_filled; i++) { + ret = queue_htab_job_merge(&(*job)->deps[i], htab); + if (ret < 0) + return ret; + } + + return 0; + } + + /* if it's already inside htab, it's deps should also be in htab, hence + * not merging deps */ + jtab = ep->data; + if (jtab->name == NULL) { + /* steal name from new job struct */ + jtab->name = (*job)->name; + (*job)->name = NULL; + } + + /* only recursive calls with childrens or dependencies can enter this + * for a recursive call to happen the parent was just entered into htab + * so, update the parent's deps reference to point to the node in htab + */ + if ((*job)->parents_filled > 0) { + ret = queue_htab_parent_merge(jtab, *job); + if (ret < 0) + return ret; + } + + job_free(*job); + *job = jtab; + return 0; +} + +static int queue_push(struct queue *queue, struct job *job) { + int ret; + pthread_mutex_lock(&queue->mutex); - CIRCLEQ_INSERT_TAIL(&queue->jobs, job, clist); - pthread_mutex_unlock(&queue->mutex); + ret = queue_htab_job_merge(&job, queue->htab); + if (ret < 0) { + pthread_mutex_unlock(&queue->mutex); + return ret; + } + /* no duplicate entries in queue */ + if (job->transitive) { + job->transitive = false; + CIRCLEQ_INSERT_TAIL(&queue->jobs, job, clist); + } + pthread_mutex_unlock(&queue->mutex); sem_post(&queue->sem); + + return 0; } void queue_thread_free(struct queue_thread *queue_thread) @@ -74,6 +194,8 @@ void queue_thread_free(struct queue_thread *queue_thread) CIRCLEQ_FOREACH_FREE(cur, next, &queue_thread->queue->jobs, clist, job_free); + hdestroy_r(queue_thread->queue->htab); + free(queue_thread->queue->htab); ret = sem_destroy(&queue_thread->queue->sem); if (ret < 0) print_err("%s", strerror(errno)); @@ -102,20 +224,42 @@ int queue_thread_new(struct queue_thread **queue_thread, FILE *stream) if (qt->queue == NULL) { print_err("%s", strerror(errno)); ret = -errno; - goto out_free; + goto out_free_qt; } qt->queue->state = Q_SEM_WAIT; ret = sem_init(&qt->queue->sem, 0, 0); if (ret < 0) { print_err("%s", strerror(errno)); ret = -errno; - goto out_free; + goto out_free_queue; + } + + qt->queue->htab = malloc(sizeof(*qt->queue->htab)); + if (qt->queue->htab == NULL) { + print_err("%s", strerror(errno)); + ret = -errno; + goto out_free_sem; + } + ret = hcreate_r(MAX_NIX_PKG_COUNT, qt->queue->htab); + if (ret == 0) { + print_err("%s", strerror(errno)); + ret = -errno; + goto out_free_htab; } CIRCLEQ_INIT(&qt->queue->jobs); pthread_mutex_init(&qt->queue->mutex, NULL); -out_free: +out_free_htab: + if (ret < 0) + free(qt->queue->htab); +out_free_sem: + if (ret < 0) + sem_destroy(&qt->queue->sem); +out_free_queue: + if (ret < 0) + free(qt->queue); +out_free_qt: if (ret < 0) free(qt); else @@ -14,6 +14,7 @@ int json_streaming_read(FILE *stream, cJSON **json) int ret; char *line = NULL; + errno = 0; ret = getline(&line, &n, stream); if (ret < 0) { if (errno != 0) { |