From 24d44957d1e55005125550d37dc5dbb18cdec4e8 Mon Sep 17 00:00:00 2001 From: phk Date: Wed, 6 Sep 2006 19:35:33 +0000 Subject: [PATCH] Reduce traffic on the wrk_mtx, saving some syscalls along the way: Make a thread-reaper-thread which examines the tail end of the queue and kicks threads which are too old so the wake up and die. This allows us to assume that a thread on the free queue is always waiting on its condvar, so we don't need to hold the mutex when we signal the condvar. As a result, the woken thread stands a chance to grab the mutex and a little song and dance between the two threads is avoided. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@929 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- varnish-cache/bin/varnishd/cache.h | 3 +- varnish-cache/bin/varnishd/cache_pool.c | 76 ++++++++++++++++--------- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/varnish-cache/bin/varnishd/cache.h b/varnish-cache/bin/varnishd/cache.h index f05bc779..753967ad 100644 --- a/varnish-cache/bin/varnishd/cache.h +++ b/varnish-cache/bin/varnishd/cache.h @@ -99,7 +99,8 @@ struct worker { struct objhead *nobjhead; struct object *nobj; - unsigned nbr; + time_t idle; + pthread_cond_t cv; TAILQ_ENTRY(worker) list; struct workreq *wrq; diff --git a/varnish-cache/bin/varnishd/cache_pool.c b/varnish-cache/bin/varnishd/cache_pool.c index 26a336aa..59102cc2 100644 --- a/varnish-cache/bin/varnishd/cache_pool.c +++ b/varnish-cache/bin/varnishd/cache_pool.c @@ -25,8 +25,10 @@ static pthread_mutex_t wrk_mtx; /* Number of work requests queued in excess of worker threads available */ static unsigned wrk_overflow; -static TAILQ_HEAD(, worker) wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle); -static TAILQ_HEAD(, worker) wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy); +TAILQ_HEAD(workerhead, worker); + +static struct workerhead wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle); +static struct workerhead wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy); static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead); /*-------------------------------------------------------------------- @@ -150,6 +152,7 @@ wrk_do_one(struct worker *w) if (w->nobjhead != NULL) CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC); w->wrq = NULL; + w->idle = time(NULL); LOCK(&wrk_mtx); VSL_stats->n_wrk_busy--; } @@ -158,19 +161,17 @@ static void * wrk_thread(void *priv) { struct worker *w, ww; - struct timespec ts; (void)priv; w = &ww; memset(w, 0, sizeof *w); w->magic = WORKER_MAGIC; - + w->idle = time(NULL); AZ(pthread_cond_init(&w->cv, NULL)); + VSL(SLT_WorkThread, 0, "%p start", w); LOCK(&wrk_mtx); - w->nbr = VSL_stats->n_wrk; VSL_stats->n_wrk_create++; - VSL(SLT_WorkThread, 0, "%u born", w->nbr); TAILQ_INSERT_HEAD(&wrk_busy, w, list); while (1) { CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); @@ -184,27 +185,17 @@ wrk_thread(void *priv) TAILQ_REMOVE(&wrk_busy, w, list); TAILQ_INSERT_HEAD(&wrk_idle, w, list); - - /* If we are a reserved thread we don't die */ - if (w->nbr < params->wthread_min) { - AZ(pthread_cond_wait(&w->cv, &wrk_mtx)); - } else { - /* If we are a dynamic thread, time out and die */ - AZ(clock_gettime(CLOCK_REALTIME, &ts)); - ts.tv_sec += params->wthread_timeout; - if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) { - VSL_stats->n_wrk--; - TAILQ_REMOVE(&wrk_idle, w, list); - UNLOCK(&wrk_mtx); - VSL(SLT_WorkThread, 0, "%u suicide", w->nbr); - AZ(pthread_cond_destroy(&w->cv)); - return (NULL); - } - } - - /* we are already removed from wrk_idle */ + assert(w->idle != 0); + AZ(pthread_cond_wait(&w->cv, &wrk_mtx)); + if (w->idle == 0) + break; wrk_do_one(w); } + VSL_stats->n_wrk--; + UNLOCK(&wrk_mtx); + VSL(SLT_WorkThread, 0, "%p end", w); + AZ(pthread_cond_destroy(&w->cv)); + return (NULL); } /*--------------------------------------------------------------------*/ @@ -224,10 +215,10 @@ WRK_QueueSession(struct sess *sp) /* If there are idle threads, we tickle the first one into action */ w = TAILQ_FIRST(&wrk_idle); if (w != NULL) { - AZ(pthread_cond_signal(&w->cv)); TAILQ_REMOVE(&wrk_idle, w, list); TAILQ_INSERT_TAIL(&wrk_busy, w, list); UNLOCK(&wrk_mtx); + AZ(pthread_cond_signal(&w->cv)); return; } @@ -258,7 +249,35 @@ WRK_QueueSession(struct sess *sp) VSL_stats->n_wrk_failed++; UNLOCK(&wrk_mtx); } + +/*--------------------------------------------------------------------*/ +static void * +wrk_reaperthread(void *priv) +{ + time_t now; + struct worker *w; + + (void)priv; + while (1) { + sleep(1); + if (VSL_stats->n_wrk <= params->wthread_min) + continue; + now = time(NULL); + LOCK(&wrk_mtx); + w = TAILQ_LAST(&wrk_idle, workerhead); + if (w != NULL && w->idle + params->wthread_timeout < now) + TAILQ_REMOVE(&wrk_idle, w, list); + else + w = NULL; + UNLOCK(&wrk_mtx); + if (w == NULL) + continue; + w->idle = 0; + AZ(pthread_cond_signal(&w->cv)); + } + INCOMPL(); +} /*--------------------------------------------------------------------*/ @@ -270,6 +289,9 @@ WRK_Init(void) AZ(pthread_mutex_init(&wrk_mtx, NULL)); + AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL)); + AZ(pthread_detach(tp)); + VSL(SLT_Debug, 0, "Starting %u worker threads", params->wthread_min); for (i = 0; i < params->wthread_min; i++) { VSL_stats->n_wrk++; @@ -295,7 +317,7 @@ cli_func_dump_pool(struct cli *cli, char **av, void *priv) t = time(NULL); TAILQ_FOREACH(w, &wrk_busy, list) { cli_out(cli, "\n"); - cli_out(cli, "W %p nbr %d ", w, w->nbr); + cli_out(cli, "W %p", w); if (w->wrq == NULL) continue; s = w->wrq->sess; -- 2.39.5