From 66f21cbea7177ca0d3e169a553b370eccbdfdc82 Mon Sep 17 00:00:00 2001 From: phk Date: Sat, 7 Jun 2008 21:19:58 +0000 Subject: [PATCH] Redo the way we manage the thread pool(s). This is necessary to generalize the thread pools to do other tasks for us in the future. Please read the descriptions of the new and changed thread_pool* parameters carefully before you tweak them; some of them have slightly different meanings now. The high-level view of this is that we have now dedicated a thread to adding threads to the pools, in addition to the thread we already had for killing idle threads in the pools. The difference is that this new thread is quite a bit more reluctant to add threads than the previous code, which would add a thread any time it could get away with it. Hopefully that reduces the tendency for thread pile-ups. This commit also reduces the cross-pool locking contention by making the overflow queue a per-pool item. The downside of that is that more of the stats counters have become unlocked and thus can get out of sync if two threads race when updating them. This is an XXX item. Thanks to Anders Nordby for testing this patch. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@2653 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- varnish-cache/bin/varnishd/cache.h | 13 +- varnish-cache/bin/varnishd/cache_pool.c | 357 +++++++++++++++--------- varnish-cache/bin/varnishd/heritage.h | 6 +- varnish-cache/bin/varnishd/mgt_param.c | 91 +++++- 4 files changed, 317 insertions(+), 150 deletions(-) diff --git a/varnish-cache/bin/varnishd/cache.h b/varnish-cache/bin/varnishd/cache.h index e52b0b69..82ee2645 100644 --- a/varnish-cache/bin/varnishd/cache.h +++ b/varnish-cache/bin/varnishd/cache.h @@ -197,9 +197,19 @@ struct worker { unsigned wlr; }; +/* Work Request for worker thread ------------------------------------*/ + +/* + * This is a worker-function. 
+ * XXX: typesafety is probably not worth fighting for + */ + +typedef void workfunc(struct worker *, void *priv); + struct workreq { VTAILQ_ENTRY(workreq) list; - struct sess *sess; + workfunc *func; + void *priv; }; #include "hash_slinger.h" @@ -504,6 +514,7 @@ void PipeSession(struct sess *sp); /* cache_pool.c */ void WRK_Init(void); +int WRK_Queue(struct workreq *wrq); void WRK_QueueSession(struct sess *sp); void WRK_Reset(struct worker *w, int *fd); unsigned WRK_Flush(struct worker *w); diff --git a/varnish-cache/bin/varnishd/cache_pool.c b/varnish-cache/bin/varnishd/cache_pool.c index 4ea04c06..06344c31 100644 --- a/varnish-cache/bin/varnishd/cache_pool.c +++ b/varnish-cache/bin/varnishd/cache_pool.c @@ -29,6 +29,7 @@ * $Id$ * * XXX: automatic thread-pool size adaptation. + * XXX: unlocked stats variables: consider summing over pools in timer thread. */ #include "config.h" @@ -64,16 +65,24 @@ VTAILQ_HEAD(workerhead, worker); /* Number of work requests queued in excess of worker threads available */ struct wq { + unsigned magic; +#define WQ_MAGIC 0x606658fa MTX mtx; struct workerhead idle; - unsigned nwrk; + VTAILQ_HEAD(, workreq) overflow; + unsigned nthr; + unsigned nqueue; + uintmax_t drops; }; -static MTX tmtx; -static VTAILQ_HEAD(, workreq) overflow = VTAILQ_HEAD_INITIALIZER(overflow); - static struct wq **wq; static unsigned nwq; +static unsigned ovfl_max; +static unsigned nthr_max; +static unsigned nthr_min; + +static pthread_cond_t herder_cond; +static MTX herder_mtx; /*-------------------------------------------------------------------- * Write data to fd @@ -180,34 +189,34 @@ WRK_Sendfile(struct worker *w, int fd, off_t off, unsigned len) /*--------------------------------------------------------------------*/ static void -wrk_do_one(struct worker *w) +wrk_do_cnt_sess(struct worker *w, void *priv) { - struct workreq *wrq; + struct sess *sess; - AN(w->wrq); - wrq = w->wrq; - CHECK_OBJ_NOTNULL(wrq->sess, SESS_MAGIC); - wrq->sess->wrk = w; + 
CAST_OBJ_NOTNULL(sess, priv, SESS_MAGIC); + sess->wrk = w; CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC); CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC); w->used = NAN; - CNT_Session(wrq->sess); + CNT_Session(sess); assert(!isnan(w->used)); CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC); CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC); - w->wrq = NULL; } +/*--------------------------------------------------------------------*/ + static void * wrk_thread(void *priv) { struct worker *w, ww; struct wq *qp; unsigned char wlog[8192]; /* XXX: size */ + struct workreq *wrq; THR_Name("cache-worker"); w = &ww; - qp = priv; + CAST_OBJ_NOTNULL(qp, priv, WQ_MAGIC); memset(w, 0, sizeof *w); w->magic = WORKER_MAGIC; w->used = TIM_real(); @@ -216,42 +225,32 @@ wrk_thread(void *priv) AZ(pthread_cond_init(&w->cond, NULL)); VSL(SLT_WorkThread, 0, "%p start", w); - LOCK(&tmtx); - VSL_stats->n_wrk_create++; - UNLOCK(&tmtx); while (1) { CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); assert(!isnan(w->used)); - w->wrq = VTAILQ_FIRST(&overflow); + /* Process overflow requests, if any */ + LOCK(&qp->mtx); + w->wrq = VTAILQ_FIRST(&qp->overflow); if (w->wrq != NULL) { - LOCK(&tmtx); - - /* Process overflow requests, if any */ - w->wrq = VTAILQ_FIRST(&overflow); - if (w->wrq != NULL) { - VSL_stats->n_wrk_queue--; - VTAILQ_REMOVE(&overflow, w->wrq, list); - } - UNLOCK(&tmtx); - } - if (w->wrq == NULL) { - LOCK(&qp->mtx); + VTAILQ_REMOVE(&qp->overflow, w->wrq, list); + qp->nqueue--; + VSL_stats->n_wrk_queue--; /* XXX: unlocked */ + } else { VTAILQ_INSERT_HEAD(&qp->idle, w, list); AZ(pthread_cond_wait(&w->cond, &qp->mtx)); - UNLOCK(&qp->mtx); } + UNLOCK(&qp->mtx); if (w->wrq == NULL) break; - wrk_do_one(w); + AN(w->wrq); + wrq = w->wrq; + AN(wrq->func); + wrq->func(w, wrq->priv); + w->wrq = NULL; } - LOCK(&tmtx); - VSL_stats->n_wrk--; - qp->nwrk--; - UNLOCK(&tmtx); - VSL(SLT_WorkThread, 0, "%p end", w); if (w->vcl != NULL) VCL_Rel(&w->vcl); @@ -269,19 +268,22 @@ wrk_thread(void *priv) 
/*--------------------------------------------------------------------*/ -void -WRK_QueueSession(struct sess *sp) +int +WRK_Queue(struct workreq *wrq) { struct worker *w; - pthread_t tp; struct wq *qp; static unsigned nq = 0; unsigned onq; + /* + * Select which pool we issue to + * XXX: better alg ? + * XXX: per CPU ? + */ onq = nq + 1; if (onq >= nwq) onq = 0; - sp->workreq.sess = sp; qp = wq[onq]; nq = onq; @@ -292,136 +294,219 @@ WRK_QueueSession(struct sess *sp) if (w != NULL) { VTAILQ_REMOVE(&qp->idle, w, list); UNLOCK(&qp->mtx); - w->wrq = &sp->workreq; + w->wrq = wrq; AZ(pthread_cond_signal(&w->cond)); - return; + return (0); } - UNLOCK(&qp->mtx); + /* If we have too much in the overflow already, refuse */ - LOCK(&tmtx); - /* - * If we have too much in the overflow, and this is a new session - * just drop it. We do not drop sessions which were waiting for - * a busy object, they will be cheap to serve from here and the - * cleanup would be more complex to carry out than delivering - * the result will be - */ - if (sp->obj == NULL && - (VSL_stats->n_wrk_queue > - (params->wthread_max * params->overflow_max) / 100)) { - VSL_stats->n_wrk_drop++; - UNLOCK(&tmtx); - sp->t_end = TIM_real(); - vca_close_session(sp, "dropped"); - if(sp->vcl != NULL) { - /* - * A session parked on a busy object can come here - * after it wakes up. Loose the VCL reference. - */ - VCL_Rel(&sp->vcl); - } - SES_Delete(sp); - return; - } - /* - * XXX: If there are too many requests in the overflow queue - * XXX: we should kill the request right here. - * XXX: Not sure how though. Simply closing may be the better - * XXX: compromise. - */ - VTAILQ_INSERT_TAIL(&overflow, &sp->workreq, list); - VSL_stats->n_wrk_overflow++; - VSL_stats->n_wrk_queue++; - /* Can we create more threads ? 
*/ - if (VSL_stats->n_wrk >= params->wthread_max || - qp->nwrk * nwq >= params->wthread_max) { - VSL_stats->n_wrk_max++; - UNLOCK(&tmtx); - return; + if (qp->nqueue > ovfl_max) { + qp->drops++; + VSL_stats->n_wrk_drop++; /* XXX: unlocked */ + UNLOCK(&qp->mtx); + return (-1); } - /* Try to create a thread */ - VSL_stats->n_wrk++; - qp->nwrk++; - UNLOCK(&tmtx); + VTAILQ_INSERT_TAIL(&qp->overflow, wrq, list); + VSL_stats->n_wrk_queue++; /* XXX: unlocked */ + VSL_stats->n_wrk_overflow++; /* XXX: unlocked */ + qp->nqueue++; + UNLOCK(&qp->mtx); + AZ(pthread_cond_signal(&herder_cond)); + return (0); +} - if (!pthread_create(&tp, NULL, wrk_thread, qp)) { - AZ(pthread_detach(tp)); - return; - } +/*--------------------------------------------------------------------*/ - VSL(SLT_Debug, 0, "Create worker thread failed %d %s", - errno, strerror(errno)); +void +WRK_QueueSession(struct sess *sp) +{ + sp->workreq.func = wrk_do_cnt_sess; + sp->workreq.priv = sp; + if (WRK_Queue(&sp->workreq) == 0) + return; - LOCK(&tmtx); - /* Register overflow */ - qp->nwrk--; - VSL_stats->n_wrk--; - VSL_stats->n_wrk_failed++; - UNLOCK(&tmtx); + /* + * Couldn't queue it -- kill it. + * + * XXX: a notice might be polite, but would potentially + * XXX: sleep whichever thread got us here + */ + sp->t_end = TIM_real(); + vca_close_session(sp, "dropped"); + if(sp->vcl != NULL) { + /* + * A session parked on a busy object can come here + * after it wakes up. Loose the VCL reference. 
+ */ + VCL_Rel(&sp->vcl); + } + SES_Delete(sp); } /*--------------------------------------------------------------------*/ static void -wrk_addpools(unsigned t) +wrk_addpools(const unsigned pools) { struct wq **pwq, **owq; unsigned u; - if (t <= nwq) - return; - - pwq = calloc(sizeof *pwq, params->wthread_pools); + pwq = calloc(sizeof *pwq, pools); if (pwq == NULL) return; if (wq != NULL) memcpy(pwq, wq, sizeof *pwq * nwq); owq = wq; wq = pwq; - for (u = nwq; u < t; u++) { + for (u = nwq; u < pools; u++) { wq[u] = calloc(sizeof *wq[u], 1); XXXAN(wq[u]); + wq[u]->magic = WQ_MAGIC; MTX_INIT(&wq[u]->mtx); + VTAILQ_INIT(&wq[u]->overflow); VTAILQ_INIT(&wq[u]->idle); } - free(owq); - nwq = t; + (void)owq; /* XXX: avoid race, leak it. */ + nwq = pools; } -/*--------------------------------------------------------------------*/ +/*-------------------------------------------------------------------- + * If a thread is idle or excess, pick it out of the pool... + */ -static void * -wrk_reaperthread(void *priv) +static void +wrk_decimate_flock(struct wq *qp, double t_idle) { - double now; struct worker *w; - struct wq *qp; + + if (qp->nthr <= nthr_min) + return; + + LOCK(&qp->mtx); + w = VTAILQ_LAST(&qp->idle, workerhead); + if (w != NULL && (w->used < t_idle || qp->nthr > nthr_max)) + VTAILQ_REMOVE(&qp->idle, w, list); + UNLOCK(&qp->mtx); + + /* And give it a kiss on the cheek... 
*/ + if (w != NULL) { + AZ(w->wrq); + AZ(pthread_cond_signal(&w->cond)); + qp->nthr--; + VSL_stats->n_wrk--; /* XXX: unlocked */ + (void)usleep(params->wthread_purge_delay * 1000); + } +} + +/*-------------------------------------------------------------------- + * Periodic pool herding thread + * + * Do things which we can do at our leisure: + * Add pools + * Scale constants + * Get rid of excess threads + */ + +static void * +wrk_herdtimer_thread(void *priv) +{ + volatile unsigned u; + double t_idle; + + THR_Name("wrk_herdtimer"); + + (void)priv; + while (1) { + /* Add Pools */ + u = params->wthread_pools; + if (u > nwq) + wrk_addpools(u); + + /* Scale parameters */ + u = params->wthread_min / nwq; + if (u < 1) + u = 1; + nthr_min = u; + + u = params->wthread_max / nwq; + if (u < nthr_min) + u = nthr_min; + nthr_max = u; + + ovfl_max = (nthr_max * params->overflow_max) / 100; + + t_idle = TIM_real() - params->wthread_timeout; + for (u = 0; u < nwq; u++) + wrk_decimate_flock(wq[u], t_idle); + + (void)usleep(params->wthread_purge_delay * 1000); + } +} + +/*-------------------------------------------------------------------- + * Create more threads, if necessay & possible + */ + +static void +wrk_breed_flock(struct wq *qp) +{ + pthread_t tp; + + /* + * If we need more threads, and have space, create + * one more thread. 
+ */ + if (qp->nqueue > params->wthread_add_threshold || + qp->nthr < nthr_min) { + if (qp->nthr >= nthr_max) { + VSL_stats->n_wrk_max++; + } else if (pthread_create(&tp, NULL, wrk_thread, qp)) { + VSL(SLT_Debug, 0, "Create worker thread failed %d %s", + errno, strerror(errno)); + VSL_stats->n_wrk_failed++; + (void)usleep(params->wthread_fail_delay * 1000); + } else { + qp->nthr++; + AZ(pthread_detach(tp)); + VSL_stats->n_wrk++; /* XXX: unlocked */ + VSL_stats->n_wrk_create++; + (void)usleep(params->wthread_add_delay * 1000); + } + } +} + +/*-------------------------------------------------------------------- + * This thread wakes up whenever a pool overflows. + * + * The trick here is to not be too aggressive about creating threads. + * We do this by only examining one pool at a time, and by sleeping + * a short while whenever we create a thread and a little while longer + * whenever we fail to, hopefully missing a lot of cond_signals in + * the meantime. + * + * XXX: probably need a lot more work. 
+ * + */ + +static void * +wrk_herder_thread(void *priv) +{ unsigned u; + THR_Name("wrk_herder"); (void)priv; while (1) { - wrk_addpools(params->wthread_pools); - AZ(sleep(1)); - if (VSL_stats->n_wrk <= params->wthread_min) - continue; - now = TIM_real(); - for (u = 0; u < nwq; u++) { - qp = wq[u]; - LOCK(&qp->mtx); - w = VTAILQ_LAST(&qp->idle, workerhead); - if (w != NULL && - (w->used + params->wthread_timeout < now || - VSL_stats->n_wrk > params->wthread_max)) - VTAILQ_REMOVE(&qp->idle, w, list); - else - w = NULL; - UNLOCK(&qp->mtx); - if (w == NULL) - continue; - AZ(w->wrq); - AZ(pthread_cond_signal(&w->cond)); + for (u = 0 ; u < nwq; u++) { + /* + * We cannot avoid getting a mutex, so we have a + * bogo mutex just for POSIX_STUPIDITY + */ + AZ(pthread_mutex_lock(&herder_mtx)); + AZ(pthread_cond_wait(&herder_cond, &herder_mtx)); + AZ(pthread_mutex_unlock(&herder_mtx)); + wrk_breed_flock(wq[u]); } } } @@ -433,8 +518,10 @@ WRK_Init(void) { pthread_t tp; - wrk_addpools(params->wthread_pools); - MTX_INIT(&tmtx); - AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL)); + AZ(pthread_cond_init(&herder_cond, NULL)); + AZ(pthread_mutex_init(&herder_mtx, NULL)); + + AZ(pthread_create(&tp, NULL, wrk_herdtimer_thread, NULL)); + AZ(pthread_create(&tp, NULL, wrk_herder_thread, NULL)); AZ(pthread_detach(tp)); } diff --git a/varnish-cache/bin/varnishd/heritage.h b/varnish-cache/bin/varnishd/heritage.h index e464fb75..374e2f88 100644 --- a/varnish-cache/bin/varnishd/heritage.h +++ b/varnish-cache/bin/varnishd/heritage.h @@ -76,11 +76,15 @@ struct params { /* TTL used for lack of anything better */ unsigned default_ttl; - /* Worker threads */ + /* Worker threads and pool */ unsigned wthread_min; unsigned wthread_max; unsigned wthread_timeout; unsigned wthread_pools; + unsigned wthread_add_threshold; + unsigned wthread_add_delay; + unsigned wthread_fail_delay; + unsigned wthread_purge_delay; unsigned overflow_max; diff --git a/varnish-cache/bin/varnishd/mgt_param.c 
b/varnish-cache/bin/varnishd/mgt_param.c index 3bcd8609..a0b3613e 100644 --- a/varnish-cache/bin/varnishd/mgt_param.c +++ b/varnish-cache/bin/varnishd/mgt_param.c @@ -463,31 +463,96 @@ static const struct parspec parspec[] = { 0, "120", "seconds" }, { "thread_pools", tweak_uint, &master.wthread_pools, 1, UINT_MAX, - "Number of worker pools. " + "Number of worker thread pools.\n" + "\n" "Increasing number of worker pools decreases lock " - "contention but increases the number of threads as well. " + "contention.\n" + "\n" + "Too many pools waste CPU and RAM resources, and more than " + "one pool for each CPU is probably detrimal to performance.\n" + "\n" "Can be increased on the fly, but decreases require a " "restart to take effect.", - EXPERIMENTAL, + EXPERIMENTAL | DELAYED_EFFECT, "1", "pools" }, - { "thread_pool_max", tweak_thread_pool_max, NULL, 0, 0, - "The maximum number of threads in the total worker pool.\n" - "-1 is unlimited.", + { "thread_pool_max", tweak_thread_pool_max, NULL, 1, 0, + "The maximum number of worker threads in all pools combined.\n" + "\n" + "Do not set this higher than you have to, since excess " + "worker threads soak up RAM and CPU and generally just get " + "in the way of getting work done.\n", EXPERIMENTAL | DELAYED_EFFECT, - "1000", "threads" }, - { "thread_pool_min", tweak_thread_pool_min, NULL, 0, 0, - "The minimum number of threads in the worker pool.\n" + "100", "threads" }, + { "thread_pool_min", tweak_thread_pool_min, NULL, 1, 0, + "The minimum number of threads in all worker pools combined.\n" + "\n" + "Increasing this may help ramp up faster from low load " + "situations where threads have expired.\n" + "\n" "Minimum is 1 thread.", EXPERIMENTAL | DELAYED_EFFECT, "1", "threads" }, - { "thread_pool_timeout", tweak_timeout, &master.wthread_timeout, 0, 0, - "Thread dies after this many seconds of inactivity.\n" + { "thread_pool_timeout", tweak_timeout, &master.wthread_timeout, 1, 0, + "Thread idle threshold.\n" + "\n" + "Threads 
in excess of thread_pool_min, which have been idle " + "for at least this long are candidates for purging.\n" + "\n" "Minimum is 1 second.", EXPERIMENTAL | DELAYED_EFFECT, "120", "seconds" }, + { "thread_pool_purge_delay", + tweak_timeout, &master.wthread_purge_delay, 100, 0, + "Wait this long between purging threads.\n" + "\n" + "This controls the decay of thread pools when idle(-ish).\n" + "\n" + "Minimum is 100 milliseconds.", + EXPERIMENTAL | DELAYED_EFFECT, + "1000", "milliseconds" }, + { "thread_pool_add_threshold", + tweak_uint, &master.wthread_add_threshold, 0, UINT_MAX, + "Overflow threshold for worker thread creation.\n" + "\n" + "Setting this too low, will result in excess worker threads, " + "which is generally a bad idea.\n" + "\n" + "Setting it too high results in insuffient worker threads.\n", + EXPERIMENTAL, + "2", "requests" }, + { "thread_pool_add_delay", + tweak_timeout, &master.wthread_add_delay, 0, UINT_MAX, + "Wait at least this long between creating threads.\n" + "\n" + "Setting this too long results in insuffient worker threads.\n" + "\n" + "Setting this too short increases the risk of worker " + "thread pile-up.\n", + EXPERIMENTAL, + "10", "milliseconds" }, + { "thread_pool_fail_delay", + tweak_timeout, &master.wthread_fail_delay, 100, UINT_MAX, + "Wait at least this long after a failed thread creation " + "before trying to create another thread.\n" + "\n" + "Failure to create a worker thread is often a sign that " + " the end is near, because the process is running out of " + "RAM resources for thread stacks.\n" + "This delay tries to not rush it on needlessly.\n" + "\n" + "If thread creation failures are a problem, check that " + "thread_pool_max is not too high.\n" + "\n" + "It may also help to increase thread_pool_timeout and " + "thread_pool_min, to reduce the rate at which treads are " + "destroyed and later recreated.\n", + EXPERIMENTAL, + "200", "milliseconds" }, { "overflow_max", tweak_uint, &master.overflow_max, 0, UINT_MAX, - 
"Limit on overflow queue length in percent of " - "thread_pool_max parameter.", + "Percentage permitted overflow queue length.\n" + "\n" + "This sets the ratio of queued requests to worker threads, " + "above which sessions will be dropped instead of queued.\n", EXPERIMENTAL, "100", "%" }, { "rush_exponent", tweak_uint, &master.rush_exponent, 2, UINT_MAX, -- 2.39.5