From fe51caffb1d643771ec38ca9e107d83ebc715c7b Mon Sep 17 00:00:00 2001 From: phk Date: Sat, 16 Sep 2006 19:54:34 +0000 Subject: [PATCH] Make it possible to have multiple worker pools. The acceptor selects the pool based on filedescriptor modulus number of pools. This is an attempt to reduce lock contention. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@1031 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- varnish-cache/bin/varnishd/cache_pool.c | 173 +++++++++++++----------- varnish-cache/bin/varnishd/heritage.h | 1 + varnish-cache/bin/varnishd/mgt_param.c | 15 ++ 3 files changed, 107 insertions(+), 82 deletions(-) diff --git a/varnish-cache/bin/varnishd/cache_pool.c b/varnish-cache/bin/varnishd/cache_pool.c index eb1b200a..658c7d03 100644 --- a/varnish-cache/bin/varnishd/cache_pool.c +++ b/varnish-cache/bin/varnishd/cache_pool.c @@ -29,16 +29,21 @@ #include "cli_priv.h" #include "cache.h" -static MTX wrk_mtx; +TAILQ_HEAD(workerhead, worker); /* Number of work requests queued in excess of worker threads available */ -static unsigned wrk_overflow; -TAILQ_HEAD(workerhead, worker); +struct wq { + MTX mtx; + struct workerhead idle; + TAILQ_HEAD(, workreq) req; + unsigned overflow; +}; -static struct workerhead wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle); -static struct workerhead wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy); -static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead); +static MTX tmtx; + +static struct wq **wq; +static unsigned nwq; /*-------------------------------------------------------------------- * Write data to fd @@ -169,9 +174,10 @@ static void * wrk_thread(void *priv) { struct worker *w, ww; + struct wq *qp; char c; - (void)priv; + qp = priv; w = &ww; memset(w, 0, sizeof *w); w->magic = WORKER_MAGIC; @@ -179,40 +185,38 @@ wrk_thread(void *priv) AZ(pipe(w->pipe)); VSL(SLT_WorkThread, 0, "%p start", w); - LOCK(&wrk_mtx); + LOCK(&qp->mtx); VSL_stats->n_wrk_create++; - TAILQ_INSERT_HEAD(&wrk_busy, w, list); VSL_stats->n_wrk_busy++; while (1) { CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); /* Process overflow requests, if any */ - if (wrk_overflow > 0) { - wrk_overflow--; - w->wrq = TAILQ_FIRST(&wrk_reqhead); + if (qp->overflow > 0) { + qp->overflow--; + w->wrq = TAILQ_FIRST(&qp->req); AN(w->wrq); - TAILQ_REMOVE(&wrk_reqhead, w->wrq, list); + TAILQ_REMOVE(&qp->req, w->wrq, list); VSL_stats->n_wrk_queue--; - UNLOCK(&wrk_mtx); + UNLOCK(&qp->mtx); wrk_do_one(w); - LOCK(&wrk_mtx); + LOCK(&qp->mtx); continue; } - TAILQ_REMOVE(&wrk_busy, w, list); - TAILQ_INSERT_HEAD(&wrk_idle, w, list); + TAILQ_INSERT_HEAD(&qp->idle, w, list); assert(w->idle != 0); VSL_stats->n_wrk_busy--; - UNLOCK(&wrk_mtx); + UNLOCK(&qp->mtx); assert(1 == read(w->pipe[0], &c, 1)); if (w->idle == 0) break; wrk_do_one(w); - LOCK(&wrk_mtx); + LOCK(&qp->mtx); } - LOCK(&wrk_mtx); + LOCK(&tmtx); VSL_stats->n_wrk--; - UNLOCK(&wrk_mtx); + UNLOCK(&tmtx); VSL(SLT_WorkThread, 0, "%p end", w); close(w->pipe[0]); close(w->pipe[1]); @@ -226,39 +230,42 @@ WRK_QueueSession(struct sess *sp) { struct worker *w; pthread_t tp; + struct wq *qp; sp->workreq.sess = sp; + qp = wq[sp->fd % nwq]; - LOCK(&wrk_mtx); + LOCK(&qp->mtx); /* If there are idle threads, we tickle the first one into action */ - w = TAILQ_FIRST(&wrk_idle); + w = TAILQ_FIRST(&qp->idle); if (w != NULL) { - TAILQ_REMOVE(&wrk_idle, w, list); - TAILQ_INSERT_TAIL(&wrk_busy, w, list); + TAILQ_REMOVE(&qp->idle, w, list); VSL_stats->n_wrk_busy++; - UNLOCK(&wrk_mtx); + UNLOCK(&qp->mtx); w->wrq = &sp->workreq; assert(1 == write(w->pipe[1], w, 1)); return; } - TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list); + TAILQ_INSERT_TAIL(&qp->req, &sp->workreq, list); VSL_stats->n_wrk_queue++; - wrk_overflow++; + qp->overflow++; + UNLOCK(&qp->mtx); + LOCK(&tmtx); /* Can we create more threads ? */ if (VSL_stats->n_wrk >= params->wthread_max) { VSL_stats->n_wrk_max++; - UNLOCK(&wrk_mtx); + UNLOCK(&tmtx); return; } /* Try to create a thread */ VSL_stats->n_wrk++; - UNLOCK(&wrk_mtx); + UNLOCK(&tmtx); - if (!pthread_create(&tp, NULL, wrk_thread, NULL)) { + if (!pthread_create(&tp, NULL, wrk_thread, qp)) { AZ(pthread_detach(tp)); return; } @@ -266,11 +273,40 @@ WRK_QueueSession(struct sess *sp) VSL(SLT_Debug, 0, "Create worker thread failed %d %s", errno, strerror(errno)); + LOCK(&tmtx); /* Register overflow */ - LOCK(&wrk_mtx); VSL_stats->n_wrk--; VSL_stats->n_wrk_failed++; - UNLOCK(&wrk_mtx); + UNLOCK(&tmtx); +} + +/*--------------------------------------------------------------------*/ + +static void +wrk_addpools(unsigned t) +{ + struct wq **pwq, **owq; + unsigned u; + + if (t <= nwq) + return; + + pwq = calloc(sizeof *pwq, params->wthread_pools); + if (pwq == NULL) + return; + if (wq != NULL) + memcpy(pwq, wq, sizeof *pwq * nwq); + owq = wq; + wq = pwq; + for (u = nwq; u < t; u++) { + wq[u] = calloc(sizeof *wq[u], 1); + XXXAN(wq[u]); + MTX_INIT(&wq[u]->mtx); + TAILQ_INIT(&wq[u]->idle); + TAILQ_INIT(&wq[u]->req); + } + free(owq); + nwq = t; } /*--------------------------------------------------------------------*/ @@ -280,26 +316,32 @@ wrk_reaperthread(void *priv) { time_t now; struct worker *w; + struct wq *qp; + unsigned u; (void)priv; while (1) { + wrk_addpools(params->wthread_pools); sleep(1); if (VSL_stats->n_wrk <= params->wthread_min) continue; now = time(NULL); - LOCK(&wrk_mtx); - w = TAILQ_LAST(&wrk_idle, workerhead); - if (w != NULL && - (w->idle + params->wthread_timeout < now || - VSL_stats->n_wrk <= params->wthread_max)) - TAILQ_REMOVE(&wrk_idle, w, list); - else - w = NULL; - UNLOCK(&wrk_mtx); - if (w == NULL) - continue; - w->idle = 0; - assert(1 == write(w->pipe[1], w, 1)); + for (u = 0; u < nwq; u++) { + qp = wq[u]; + LOCK(&qp->mtx); + w = TAILQ_LAST(&qp->idle, workerhead); + if (w != NULL && + (w->idle + params->wthread_timeout < now || + VSL_stats->n_wrk <= params->wthread_max)) + TAILQ_REMOVE(&qp->idle, w, list); + else + w = NULL; + UNLOCK(&qp->mtx); + if (w == NULL) + continue; + w->idle = 0; + assert(1 == write(w->pipe[1], w, 1)); + } } INCOMPL(); } @@ -310,53 +352,20 @@ void WRK_Init(void) { pthread_t tp; - int i; - - MTX_INIT(&wrk_mtx); + wrk_addpools(params->wthread_pools); + MTX_INIT(&tmtx); AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL)); AZ(pthread_detach(tp)); - - VSL(SLT_Debug, 0, "Starting %u worker threads", params->wthread_min); - for (i = 0; i < params->wthread_min; i++) { - VSL_stats->n_wrk++; - AZ(pthread_create(&tp, NULL, wrk_thread, NULL)); - AZ(pthread_detach(tp)); - } } - /*--------------------------------------------------------------------*/ void cli_func_dump_pool(struct cli *cli, char **av, void *priv) { - unsigned u; - struct sess *s; - time_t t; + (void)cli; (void)av; (void)priv; - struct worker *w; - LOCK(&wrk_mtx); - t = time(NULL); - TAILQ_FOREACH(w, &wrk_busy, list) { - cli_out(cli, "\n"); - cli_out(cli, "W %p", w); - if (w->wrq == NULL) - continue; - s = w->wrq->sess; - if (s == NULL) - continue; - cli_out(cli, "sess %p fd %d xid %u step %d handling %d age %d", - s, s->fd, s->xid, s->step, s->handling, - t - s->t_req.tv_sec); - } - cli_out(cli, "\n"); - - u = 0; - TAILQ_FOREACH(w, &wrk_idle, list) - u++; - cli_out(cli, "%u idle workers\n", u); - UNLOCK(&wrk_mtx); } diff --git a/varnish-cache/bin/varnishd/heritage.h b/varnish-cache/bin/varnishd/heritage.h index 1089b14f..a3094713 100644 --- a/varnish-cache/bin/varnishd/heritage.h +++ b/varnish-cache/bin/varnishd/heritage.h @@ -36,6 +36,7 @@ struct params { unsigned wthread_min; unsigned wthread_max; unsigned wthread_timeout; + unsigned wthread_pools; /* Memory allocation hints */ unsigned mem_workspace; diff --git a/varnish-cache/bin/varnishd/mgt_param.c b/varnish-cache/bin/varnishd/mgt_param.c index 1fb1004d..d81e33a5 100644 --- a/varnish-cache/bin/varnishd/mgt_param.c +++ b/varnish-cache/bin/varnishd/mgt_param.c @@ -115,6 +115,18 @@ tweak_default_ttl(struct cli *cli, struct parspec *par, const char *arg) tweak_generic_uint(cli, ¶ms->default_ttl, arg, 0, UINT_MAX); } +/*--------------------------------------------------------------------*/ + +static void +tweak_thread_pools(struct cli *cli, struct parspec *par, const char *arg) +{ + + (void)par; + tweak_generic_uint(cli, ¶ms->wthread_pools, arg, + 1, UINT_MAX); +} + + /*--------------------------------------------------------------------*/ static void @@ -296,6 +308,9 @@ static struct parspec parspec[] = { "To force an immediate effect at the expense of a total " "flush of the cache use \"url.purge .\"", "120", "seconds" }, + { "thread_pools", tweak_thread_pools, + "Number of thread pools.\n", + "1", "pools" }, { "thread_pool_max", tweak_thread_pool_max, "The maximum number of threads in the worker pool.\n" "-1 is unlimited.\n" -- 2.39.5