]> err.no Git - varnish/commitdiff
Rewrite the worker thread pool code.
authorphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Mon, 10 Jul 2006 14:52:04 +0000 (14:52 +0000)
committerphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Mon, 10 Jul 2006 14:52:04 +0000 (14:52 +0000)
Assign prefix WRK to the worker pool.

Introduce a struct workreq since the prefetcher (when it happens) will
not have a session to pass in.

The worker threads get a cond_var each and are hung from a list in
most recently used order.

When a request is queued and the worker thread list is not empty,
tickle the cond_var of the first one.

If no threads were availble the max number of threads is not reached,
try to start another worker thread.

If the max was reached or the start filed (likely due to out of memory)
indicate overflow and let the existing pool deal with it.

Create only the minimum requested number of threads initially.

Allow specification of the timeout before a dynamic worker thread commits
suicide to be specified with -w.

Default parameters are -w1,UINT_MAX,10 {min, max, timeout}

git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@409 d4fa192b-c00b-0410-8231-f00ffab90ce4

varnish-cache/bin/varnishd/cache.h
varnish-cache/bin/varnishd/cache_acceptor.c
varnish-cache/bin/varnishd/cache_main.c
varnish-cache/bin/varnishd/cache_pool.c
varnish-cache/bin/varnishd/heritage.h
varnish-cache/bin/varnishd/varnishd.c
varnish-cache/bin/varnishlog/varnishlog.c
varnish-cache/include/shmlog_tags.h
varnish-cache/include/stat_field.h

index 8013c0d3cafbc8d851687aaf0ff69dc7dfb5595b..c7ffc8b596dc39c6296499df6156fb76a8e205ec 100644 (file)
@@ -52,6 +52,15 @@ struct worker {
        struct sbuf             *sb;
        struct objhead          *nobjhead;
        struct object           *nobj;
+
+       unsigned                nbr;
+       pthread_cond_t          cv;
+       TAILQ_ENTRY(worker)     list;
+};
+
+struct workreq {
+       TAILQ_ENTRY(workreq)    list;
+       struct sess             *sess;
 };
 
 #include "hash_slinger.h"
@@ -150,6 +159,8 @@ struct sess {
        /* Various internal stuff */
        struct sessmem          *mem;
        time_t                  t0;
+
+       struct workreq          workreq;
 };
 
 struct backend {
@@ -238,8 +249,8 @@ void PassSession(struct worker *w, struct sess *sp);
 void PipeSession(struct worker *w, struct sess *sp);
 
 /* cache_pool.c */
-void CacheInitPool(void);
-void DealWithSession(void *arg);
+void WRK_Init(void);
+void WRK_QueueSession(struct sess *sp);
 
 /* cache_shmlog.c */
 
index 329f10e228aa95597b4b716b326c86c936ab9111..1469588f78c9e981feb97e30e7101fc45f6b4600 100644 (file)
@@ -190,7 +190,7 @@ vca_callback(void *arg, int bad)
                vca_return_session(sp);
                return;
        }
-       DealWithSession(sp);
+       WRK_QueueSession(sp);
 }
 
 static void
index bd73c86ed4a855d44d0361afc944397bf455f67e..a756acc34075036e4be8b21eb3e7899f420a73c4 100644 (file)
@@ -109,7 +109,7 @@ child_main(void)
        AZ(pthread_mutex_init(&sessmtx, NULL));
        VBE_Init();
        VSL_Init();
-       CacheInitPool();
+       WRK_Init();
 
        VCA_Init();
        EXP_Init();
index 0d1455668c668789bed8abc850d44fcda7228bdc..07a41a25c6e25444154e1878002c700d6e1b6f2f 100644 (file)
@@ -5,6 +5,7 @@
  */
 
 #include <stdio.h>
+#include <errno.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 #include "vcl.h"
 #include "cache.h"
 
-static TAILQ_HEAD(, sess) shd = TAILQ_HEAD_INITIALIZER(shd);
-
-static pthread_cond_t  shdcnd;
+static pthread_mutex_t wrk_mtx;
 static unsigned                xids;
 
+/* Number of work requests queued in excess of worker threads available */
+static unsigned                wrk_overflow;
+
+static TAILQ_HEAD(, worker) wrk_head = TAILQ_HEAD_INITIALIZER(wrk_head);
+static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
+
 /*--------------------------------------------------------------------*/
 
 static int
@@ -52,114 +57,202 @@ DeliverSession(struct worker *w, struct sess *sp)
        return (1);
 }
 
-static void *
-CacheWorker(void *priv)
+static void
+wrk_WorkSession(struct worker *w, struct sess *sp)
 {
-       struct sess *sp;
-       struct worker w;
        int done;
        char *b;
 
-       memset(&w, 0, sizeof w);
-       w.eb = event_init();
-       assert(w.eb != NULL);
-       w.sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND);
-       assert(w.sb != NULL);
-       
-       (void)priv;
+       time(&sp->t0);
        AZ(pthread_mutex_lock(&sessmtx));
+       sp->vcl = GetVCL();
+       AZ(pthread_mutex_unlock(&sessmtx));
+
+       done = http_Dissect(sp->http, sp->fd, 1);
+       if (done != 0) {
+               RES_Error(w, sp, done, NULL);
+               goto out;
+       }
+
+       sp->backend = sp->vcl->backend[0];
+
+       VCL_recv_method(sp);
+
+       for (done = 0; !done; ) {
+               switch(sp->handling) {
+               case VCL_RET_LOOKUP:
+                       done = LookupSession(w, sp);
+                       break;
+               case VCL_RET_FETCH:
+                       done = FetchSession(w, sp);
+                       break;
+               case VCL_RET_DELIVER:
+                       done = DeliverSession(w, sp);
+                       break;
+               case VCL_RET_PIPE:
+                       PipeSession(w, sp);
+                       done = 1;
+                       break;
+               case VCL_RET_PASS:
+                       PassSession(w, sp);
+                       done = 1;
+                       break;
+               default:
+                       INCOMPL();
+               }
+       }
+       if (http_GetHdr(sp->http, "Connection", &b) &&
+           !strcmp(b, "close")) {
+               vca_close_session(sp, "Connection header");
+       } else if (http_GetProto(sp->http, &b) &&
+           strcmp(b, "HTTP/1.1")) {
+               vca_close_session(sp, "not HTTP/1.1");
+       }
+
+out:
+       AZ(pthread_mutex_lock(&sessmtx));
+       RelVCL(sp->vcl);
+       AZ(pthread_mutex_unlock(&sessmtx));
+       sp->vcl = NULL;
+       vca_return_session(sp);
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+wrk_thread(void *priv)
+{
+       struct worker *w, ww;
+       struct workreq *wrq;
+       struct timespec ts;
+
+       w = &ww;
+       memset(w, 0, sizeof w);
+
+       AZ(pthread_cond_init(&w->cv, NULL));
+
+       w->eb = event_init();
+       assert(w->eb != NULL);
+
+       w->sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND);
+       assert(w->sb != NULL);
+       
+       AZ(pthread_mutex_lock(&wrk_mtx));
+       VSL_stats->n_wrk++;
+       w->nbr = VSL_stats->n_wrk;
+       if (priv == NULL)
+               VSL(SLT_WorkThread, 0, "%u born dynamic", w->nbr);
+       else
+               VSL(SLT_WorkThread, 0, "%u born permanent", w->nbr);
+       TAILQ_INSERT_HEAD(&wrk_head, w, list);
        while (1) {
-               while (1) {
-                       sp = TAILQ_FIRST(&shd);
-                       if (sp != NULL)
-                               break;
-                       AZ(pthread_cond_wait(&shdcnd, &sessmtx));
+               wrq = TAILQ_FIRST(&wrk_reqhead);
+               if (wrq != NULL) {
+                       VSL_stats->n_wrkbusy++;
+                       TAILQ_REMOVE(&wrk_head, w, list);
+                       TAILQ_REMOVE(&wrk_reqhead, wrq, list);
+                       AZ(pthread_mutex_unlock(&wrk_mtx));
+                       assert(wrq->sess != NULL);
+                       wrk_WorkSession(w, wrq->sess);
+                       AZ(pthread_mutex_lock(&wrk_mtx));
+                       VSL_stats->n_wrkbusy--;
+                       TAILQ_INSERT_HEAD(&wrk_head, w, list);
                }
-               TAILQ_REMOVE(&shd, sp, list);
-               time(&sp->t0);
-               sp->vcl = GetVCL();
-               AZ(pthread_mutex_unlock(&sessmtx));
-
-               done = http_Dissect(sp->http, sp->fd, 1);
-               if (done != 0) {
-                       RES_Error(&w, sp, done, NULL);
-                       goto out;
+               if (wrk_overflow > 0) {
+                       wrk_overflow--;
+                       continue;
                }
 
-               sp->backend = sp->vcl->backend[0];
-
-               VCL_recv_method(sp);
-
-               for (done = 0; !done; ) {
-                       switch(sp->handling) {
-                       case VCL_RET_LOOKUP:
-                               done = LookupSession(&w, sp);
-                               break;
-                       case VCL_RET_FETCH:
-                               done = FetchSession(&w, sp);
-                               break;
-                       case VCL_RET_DELIVER:
-                               done = DeliverSession(&w, sp);
-                               break;
-                       case VCL_RET_PIPE:
-                               PipeSession(&w, sp);
-                               done = 1;
-                               break;
-                       case VCL_RET_PASS:
-                               PassSession(&w, sp);
-                               done = 1;
-                               break;
-                       default:
-                               INCOMPL();
-                       }
-               }
-               if (http_GetHdr(sp->http, "Connection", &b) &&
-                   !strcmp(b, "close")) {
-                       vca_close_session(sp, "Connection header");
-               } else if (http_GetProto(sp->http, &b) &&
-                   strcmp(b, "HTTP/1.1")) {
-                       vca_close_session(sp, "not HTTP/1.1");
+               /* If we are a reserved thread we don't die */
+               if (priv != NULL) {
+                       AZ(pthread_cond_wait(&w->cv, &wrk_mtx));
+                       continue;
                }
 
-out:
-               AZ(pthread_mutex_lock(&sessmtx));
-               RelVCL(sp->vcl);
-               sp->vcl = NULL;
-               vca_return_session(sp);
+               /* If we are a dynamic thread, time out and die */
+               clock_gettime(CLOCK_REALTIME, &ts);
+               ts.tv_sec += heritage.wthread_timeout;
+               if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) {
+                       VSL_stats->n_wrk--;
+                       TAILQ_REMOVE(&wrk_head, w, list);
+                       AZ(pthread_mutex_unlock(&wrk_mtx));
+                       VSL(SLT_WorkThread, 0, "%u suicide", w->nbr);
+                       sbuf_delete(w->sb);
+                       event_base_free(w->eb);
+                       AZ(pthread_cond_destroy(&w->cv));
+                       return (NULL);
+               }
        }
 }
 
+/*--------------------------------------------------------------------*/
+
 void
-DealWithSession(void *arg)
+WRK_QueueSession(struct sess *sp)
 {
-       struct sess *sp = arg;
+       struct worker *w;
+       pthread_t tp;
 
        time(&sp->t_req);
 
        /*
         * No locking necessary, we're serialized in the acceptor thread
+        * XXX: still ?
         */
        sp->xid = xids++;
        VSL(SLT_XID, sp->fd, "%u", sp->xid);
 
+       sp->workreq.sess = sp;
        VSL_stats->client_req++;
-       AZ(pthread_mutex_lock(&sessmtx));
-       TAILQ_INSERT_TAIL(&shd, sp, list);
-       AZ(pthread_mutex_unlock(&sessmtx));
-       AZ(pthread_cond_signal(&shdcnd));
+
+       AZ(pthread_mutex_lock(&wrk_mtx));
+       TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
+
+       /* If there are idle threads, we tickle the first one into action */
+       w = TAILQ_FIRST(&wrk_head);
+       if (w != NULL) {
+               AZ(pthread_cond_signal(&w->cv));
+               AZ(pthread_mutex_unlock(&wrk_mtx));
+               return;
+       }
+       
+       /* Register overflow if max threads reached */
+       if (VSL_stats->n_wrk >= heritage.wthread_max) {
+               wrk_overflow++;
+               AZ(pthread_mutex_unlock(&wrk_mtx));
+               return;
+       }
+
+       /* Try to create a thread */
+       AZ(pthread_mutex_unlock(&wrk_mtx));
+       if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
+               AZ(pthread_detach(tp));
+               return;
+       }
+
+       VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
+           errno, strerror(errno));
+
+       /* Register overflow */
+       AZ(pthread_mutex_lock(&wrk_mtx));
+       wrk_overflow++;
+       AZ(pthread_mutex_unlock(&wrk_mtx));
 }
+       
+
+/*--------------------------------------------------------------------*/
 
 void
-CacheInitPool(void)
+WRK_Init(void)
 {
        pthread_t tp;
        int i;
 
-       AZ(pthread_cond_init(&shdcnd, NULL));
+       AZ(pthread_mutex_init(&wrk_mtx, NULL));
 
        VSL(SLT_Debug, 0, "Starting %u worker threads", heritage.wthread_min);
        for (i = 0; i < heritage.wthread_min; i++) {
-               AZ(pthread_create(&tp, NULL, CacheWorker, NULL));
+               AZ(pthread_create(&tp, NULL, wrk_thread, &i));
                AZ(pthread_detach(tp));
        }
        srandomdev();
index 54f7c816d7d13349dcdbe681a21b62e2ed920233..c51fdc373ac73576aa310b1995b0f3763eadae09 100644 (file)
@@ -37,7 +37,9 @@ struct heritage {
        unsigned                default_ttl;
 
        /* Worker threads */
-       unsigned                wthread_min, wthread_max;
+       unsigned                wthread_min;
+       unsigned                wthread_max;
+       unsigned                wthread_timeout;
 
        /* Memory allocation hints */
        unsigned                mem_http_headerspace;
index cc2b776f6a17f4d0981d0a88ef022d547e65985d..e16a72d4abf8751418794f6050c6c75c843835e3 100644 (file)
@@ -8,6 +8,7 @@
 #include <string.h>
 #include <errno.h>
 #include <fcntl.h>
+#include <limits.h>
 #include <signal.h>
 #include <stdarg.h>
 #include <stdio.h>
@@ -419,8 +420,8 @@ usage(void)
        fprintf(stderr, "    %-28s # %s\n",
            "-s kind[,storageoptions]", "Backend storage specification");
        fprintf(stderr, "    %-28s # %s\n", "-t", "Default TTL");
-       fprintf(stderr, "    %-28s # %s\n", "-w int[,int]",
-           "Number of worker threads (fixed/{min,max})");
+       fprintf(stderr, "    %-28s # %s\n", "-w int[,int[,int]]",
+           "Number of worker threads (fixed/{min,max}/{min/max/timeout})");
 #if 0
        -c clusterid@cluster_controller
        -m memory_limit
@@ -433,6 +434,28 @@ usage(void)
 }
 
 
+/*--------------------------------------------------------------------*/
+
+static void
+tackle_warg(const char *argv)
+{
+       int i;
+       unsigned ua, ub, uc;
+
+       i = sscanf(argv, "%u,%u,%u", &ua, &ub, &uc);
+       if (i == 0)
+               usage();
+       if (ua < 1)
+               usage();
+       heritage.wthread_min = ua;
+       heritage.wthread_max = ua;
+       heritage.wthread_timeout = 10;
+       if (i >= 2)
+               heritage.wthread_max = ub;
+       if (i >= 3)
+               heritage.wthread_timeout = uc;
+}
+
 /*--------------------------------------------------------------------*/
 
 /* for development purposes */
@@ -442,8 +465,7 @@ usage(void)
 int
 main(int argc, char *argv[])
 {
-       int o, i;
-       unsigned ua, ub;
+       int o;
        const char *portnumber = "8080";
        unsigned dflag = 1;     /* XXX: debug=on for now */
        const char *bflag = NULL;
@@ -456,8 +478,9 @@ main(int argc, char *argv[])
        VCC_InitCompile();
 
        heritage.default_ttl = 120;
-       heritage.wthread_min = 5;
-       heritage.wthread_max = 5;
+       heritage.wthread_min = 1;
+       heritage.wthread_max = UINT_MAX;
+       heritage.wthread_timeout = 10;
        heritage.mem_http_headerspace= 4096;
        heritage.mem_http_headers= 32;
        heritage.mem_workspace = 0;
@@ -486,13 +509,7 @@ main(int argc, char *argv[])
                        heritage.default_ttl = strtoul(optarg, NULL, 0);
                        break;
                case 'w':
-                       i = sscanf(optarg, "%u,%u", &ua, &ub);
-                       if (i == 0)
-                               usage();
-                       heritage.wthread_min = ua;
-                       heritage.wthread_max = ua;
-                       if (i == 2)
-                               heritage.wthread_max = ub;
+                       tackle_warg(optarg);
                        break;
                default:
                        usage();
index b46c193114ba10ba850a534d36bd62c489287426..bdba675996c6c17c8828bca3e9180f7b90e23474 100644 (file)
@@ -182,6 +182,7 @@ order(unsigned char *p, int h_opt)
        case SLT_SessionClose:
        case SLT_SessionReuse:
        case SLT_BackendClose:
+       case SLT_BackendReuse:
                sbuf_finish(ob[u]);
                if ((hc[u] != 4 || h_opt == 0) && sbuf_len(ob[u]) > 1)
                        printf("%s\n", sbuf_data(ob[u]));
index b031d927f42719a13d96a233ac4682ba2b30f7bb..fef0282fc34df5ad0771e134ffa76fcce3317c14 100644 (file)
@@ -36,3 +36,4 @@ SLTM(Hit)
 SLTM(ExpBan)
 SLTM(ExpPick)
 SLTM(ExpKill)
+SLTM(WorkThread)
index 2baf744f1fad2e1b305c1fd1832eaee87a9f4a9b..f3a81ef97281ef1f29dbe5d71fd375235036d4a7 100644 (file)
@@ -16,5 +16,7 @@ MAC_STAT(n_header,            uint64_t, "u", "N struct header");
 MAC_STAT(n_smf,                        uint64_t, "u", "N struct smf");
 MAC_STAT(n_vbe,                        uint64_t, "u", "N struct vbe");
 MAC_STAT(n_vbe_conn,           uint64_t, "u", "N struct vbe_conn");
+MAC_STAT(n_wrk,                        uint64_t, "u", "N worker threads");
+MAC_STAT(n_wrkbusy,            uint64_t, "u", "N busy worker threads");