]> err.no Git - varnish/commitdiff
Make it possible to have multiple worker pools.
authorphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Sat, 16 Sep 2006 19:54:34 +0000 (19:54 +0000)
committerphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Sat, 16 Sep 2006 19:54:34 +0000 (19:54 +0000)
The acceptor selects the pool based on filedescriptor modulus
number of pools.

This is an attempt to reduce lock contention.

git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@1031 d4fa192b-c00b-0410-8231-f00ffab90ce4

varnish-cache/bin/varnishd/cache_pool.c
varnish-cache/bin/varnishd/heritage.h
varnish-cache/bin/varnishd/mgt_param.c

index eb1b200a7cbd4213c8834165dc94082808f5306a..658c7d03e6c48b830bb696e98b87e67047611a82 100644 (file)
 #include "cli_priv.h"
 #include "cache.h"
 
-static MTX wrk_mtx;
+TAILQ_HEAD(workerhead, worker);
 
 /* Number of work requests queued in excess of worker threads available */
-static unsigned                wrk_overflow;
 
-TAILQ_HEAD(workerhead, worker);
+struct wq {
+       MTX                     mtx;
+       struct workerhead       idle;
+       TAILQ_HEAD(, workreq)   req;
+       unsigned                overflow;
+};
 
-static struct workerhead wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle);
-static struct workerhead wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy);
-static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
+static MTX                     tmtx;
+
+static struct wq               **wq;
+static unsigned                        nwq;
 
 /*--------------------------------------------------------------------
  * Write data to fd
@@ -169,9 +174,10 @@ static void *
 wrk_thread(void *priv)
 {
        struct worker *w, ww;
+       struct wq *qp;
        char c;
 
-       (void)priv;
+       qp = priv;
        w = &ww;
        memset(w, 0, sizeof *w);
        w->magic = WORKER_MAGIC;
@@ -179,40 +185,38 @@ wrk_thread(void *priv)
        AZ(pipe(w->pipe));
 
        VSL(SLT_WorkThread, 0, "%p start", w);
-       LOCK(&wrk_mtx);
+       LOCK(&qp->mtx);
        VSL_stats->n_wrk_create++;
-       TAILQ_INSERT_HEAD(&wrk_busy, w, list);
        VSL_stats->n_wrk_busy++;
        while (1) {
                CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
 
                /* Process overflow requests, if any */
-               if (wrk_overflow > 0) {
-                       wrk_overflow--;
-                       w->wrq = TAILQ_FIRST(&wrk_reqhead);
+               if (qp->overflow > 0) {
+                       qp->overflow--;
+                       w->wrq = TAILQ_FIRST(&qp->req);
                        AN(w->wrq);
-                       TAILQ_REMOVE(&wrk_reqhead, w->wrq, list);
+                       TAILQ_REMOVE(&qp->req, w->wrq, list);
                        VSL_stats->n_wrk_queue--;
-                       UNLOCK(&wrk_mtx);
+                       UNLOCK(&qp->mtx);
                        wrk_do_one(w);
-                       LOCK(&wrk_mtx);
+                       LOCK(&qp->mtx);
                        continue;
                }
                
-               TAILQ_REMOVE(&wrk_busy, w, list);
-               TAILQ_INSERT_HEAD(&wrk_idle, w, list);
+               TAILQ_INSERT_HEAD(&qp->idle, w, list);
                assert(w->idle != 0);
                VSL_stats->n_wrk_busy--;
-               UNLOCK(&wrk_mtx);
+               UNLOCK(&qp->mtx);
                assert(1 == read(w->pipe[0], &c, 1));
                if (w->idle == 0)
                        break;
                wrk_do_one(w);
-               LOCK(&wrk_mtx);
+               LOCK(&qp->mtx);
        }
-       LOCK(&wrk_mtx);
+       LOCK(&tmtx);
        VSL_stats->n_wrk--;
-       UNLOCK(&wrk_mtx);
+       UNLOCK(&tmtx);
        VSL(SLT_WorkThread, 0, "%p end", w);
        close(w->pipe[0]);
        close(w->pipe[1]);
@@ -226,39 +230,42 @@ WRK_QueueSession(struct sess *sp)
 {
        struct worker *w;
        pthread_t tp;
+       struct wq *qp;
 
        sp->workreq.sess = sp;
+       qp = wq[sp->fd % nwq];
 
-       LOCK(&wrk_mtx);
+       LOCK(&qp->mtx);
 
        /* If there are idle threads, we tickle the first one into action */
-       w = TAILQ_FIRST(&wrk_idle);
+       w = TAILQ_FIRST(&qp->idle);
        if (w != NULL) {
-               TAILQ_REMOVE(&wrk_idle, w, list);
-               TAILQ_INSERT_TAIL(&wrk_busy, w, list);
+               TAILQ_REMOVE(&qp->idle, w, list);
                VSL_stats->n_wrk_busy++;
-               UNLOCK(&wrk_mtx);
+               UNLOCK(&qp->mtx);
                w->wrq = &sp->workreq;
                assert(1 == write(w->pipe[1], w, 1));
                return;
        }
        
-       TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
+       TAILQ_INSERT_TAIL(&qp->req, &sp->workreq, list);
        VSL_stats->n_wrk_queue++;
-       wrk_overflow++;
+       qp->overflow++;
+       UNLOCK(&qp->mtx);
 
+       LOCK(&tmtx);
        /* Can we create more threads ? */
        if (VSL_stats->n_wrk >= params->wthread_max) {
                VSL_stats->n_wrk_max++;
-               UNLOCK(&wrk_mtx);
+               UNLOCK(&tmtx);
                return;
        }
 
        /* Try to create a thread */
        VSL_stats->n_wrk++;
-       UNLOCK(&wrk_mtx);
+       UNLOCK(&tmtx);
 
-       if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
+       if (!pthread_create(&tp, NULL, wrk_thread, qp)) {
                AZ(pthread_detach(tp));
                return;
        }
@@ -266,11 +273,40 @@ WRK_QueueSession(struct sess *sp)
        VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
            errno, strerror(errno));
 
+       LOCK(&tmtx);
        /* Register overflow */
-       LOCK(&wrk_mtx);
        VSL_stats->n_wrk--;
        VSL_stats->n_wrk_failed++;
-       UNLOCK(&wrk_mtx);
+       UNLOCK(&tmtx);
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+wrk_addpools(unsigned t)
+{
+       struct wq **pwq, **owq;
+       unsigned u;
+
+       if (t <= nwq)
+               return;
+
+       pwq = calloc(sizeof *pwq, params->wthread_pools);
+       if (pwq == NULL)
+               return;
+       if (wq != NULL)
+               memcpy(pwq, wq, sizeof *pwq * nwq);
+       owq = wq;
+       wq = pwq;
+       for (u = nwq; u < t; u++) {
+               wq[u] = calloc(sizeof *wq[u], 1);
+               XXXAN(wq[u]);
+               MTX_INIT(&wq[u]->mtx);
+               TAILQ_INIT(&wq[u]->idle);
+               TAILQ_INIT(&wq[u]->req);
+       }
+       free(owq);
+       nwq = t;
 }
 
 /*--------------------------------------------------------------------*/
@@ -280,26 +316,32 @@ wrk_reaperthread(void *priv)
 {
        time_t  now;
        struct worker *w;
+       struct wq *qp;
+       unsigned u;
 
        (void)priv;
        while (1) {
+               wrk_addpools(params->wthread_pools);
                sleep(1);
                if (VSL_stats->n_wrk <= params->wthread_min)
                        continue; 
                now = time(NULL);
-               LOCK(&wrk_mtx);
-               w = TAILQ_LAST(&wrk_idle, workerhead);
-               if (w != NULL &&
-                  (w->idle + params->wthread_timeout < now ||
-                   VSL_stats->n_wrk <= params->wthread_max))
-                       TAILQ_REMOVE(&wrk_idle, w, list);
-               else 
-                       w = NULL;
-               UNLOCK(&wrk_mtx);
-               if (w == NULL)
-                       continue;
-               w->idle = 0;
-               assert(1 == write(w->pipe[1], w, 1));
+               for (u = 0; u < nwq; u++) {
+                       qp = wq[u];
+                       LOCK(&qp->mtx);
+                       w = TAILQ_LAST(&qp->idle, workerhead);
+                       if (w != NULL &&
+                          (w->idle + params->wthread_timeout < now ||
+                           VSL_stats->n_wrk <= params->wthread_max))
+                               TAILQ_REMOVE(&qp->idle, w, list);
+                       else 
+                               w = NULL;
+                       UNLOCK(&qp->mtx);
+                       if (w == NULL)
+                               continue;
+                       w->idle = 0;
+                       assert(1 == write(w->pipe[1], w, 1));
+               }
        }
        INCOMPL();
 }
@@ -310,53 +352,20 @@ void
 WRK_Init(void)
 {
        pthread_t tp;
-       int i;
-
-       MTX_INIT(&wrk_mtx);
 
+       wrk_addpools(params->wthread_pools);
+       MTX_INIT(&tmtx);
        AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL));
        AZ(pthread_detach(tp));
-
-       VSL(SLT_Debug, 0, "Starting %u worker threads", params->wthread_min);
-       for (i = 0; i < params->wthread_min; i++) {
-               VSL_stats->n_wrk++;
-               AZ(pthread_create(&tp, NULL, wrk_thread, NULL));
-               AZ(pthread_detach(tp));
-       }
 }
 
-
 /*--------------------------------------------------------------------*/
 
 void
 cli_func_dump_pool(struct cli *cli, char **av, void *priv)
 {
-       unsigned u;
-       struct sess *s;
-       time_t t;
 
+       (void)cli;
        (void)av;
        (void)priv;
-       struct worker *w;
-       LOCK(&wrk_mtx);
-       t = time(NULL);
-       TAILQ_FOREACH(w, &wrk_busy, list) {
-               cli_out(cli, "\n");
-               cli_out(cli, "W %p", w);
-               if (w->wrq == NULL)
-                       continue;
-               s = w->wrq->sess;
-               if (s == NULL)
-                       continue;
-               cli_out(cli, "sess %p fd %d xid %u step %d handling %d age %d",
-                   s, s->fd, s->xid, s->step, s->handling,
-                   t - s->t_req.tv_sec);
-       }
-       cli_out(cli, "\n");
-
-       u = 0;
-       TAILQ_FOREACH(w, &wrk_idle, list)
-               u++;
-       cli_out(cli, "%u idle workers\n", u);
-       UNLOCK(&wrk_mtx);
 }
index 1089b14f5e310b6a42086fe5b987810a9f86e351..a3094713e2c113a71161ca90a4efe4e744dbc0e3 100644 (file)
@@ -36,6 +36,7 @@ struct params {
        unsigned                wthread_min;
        unsigned                wthread_max;
        unsigned                wthread_timeout;
+       unsigned                wthread_pools;
 
        /* Memory allocation hints */
        unsigned                mem_workspace;
index 1fb1004d0850a825b536fe5c5659dc3aa505cad2..d81e33a515e04e4f5137ae977fbb2fa91677c6a4 100644 (file)
@@ -115,6 +115,18 @@ tweak_default_ttl(struct cli *cli, struct parspec *par, const char *arg)
        tweak_generic_uint(cli, &params->default_ttl, arg, 0, UINT_MAX);
 }
 
+/*--------------------------------------------------------------------*/
+
+static void
+tweak_thread_pools(struct cli *cli, struct parspec *par, const char *arg)
+{
+
+       (void)par;
+       tweak_generic_uint(cli, &params->wthread_pools, arg,
+           1, UINT_MAX);
+}
+
+
 /*--------------------------------------------------------------------*/
 
 static void
@@ -296,6 +308,9 @@ static struct parspec parspec[] = {
                "To force an immediate effect at the expense of a total "
                "flush of the cache use \"url.purge .\"",
                "120", "seconds" },
+       { "thread_pools", tweak_thread_pools,
+               "Number of thread pools.\n",
+               "1", "pools" },
        { "thread_pool_max", tweak_thread_pool_max,
                "The maximum number of threads in the worker pool.\n"
                "-1 is unlimited.\n"