From e2f62ab47ad935ccc8d45bdd7b231e4289548260 Mon Sep 17 00:00:00 2001 From: phk Date: Mon, 10 Jul 2006 14:52:04 +0000 Subject: [PATCH] Rewrite the worker thread pool code. Assign prefix WRK to the worker pool. Introduce a struct workreq since the prefetcher (when it happens) will not have a session to pass in. The worker threads get a cond_var each and are hung from a list in most recently used order. When a request is queued and the worker thread list is not empty, tickle the cond_var of the first one. If no threads were availble the max number of threads is not reached, try to start another worker thread. If the max was reached or the start filed (likely due to out of memory) indicate overflow and let the existing pool deal with it. Create only the minimum requested number of threads initially. Allow specification of the timeout before a dynamic worker thread commits suicide to be specified with -w. Default parameters are -w1,UINT_MAX,10 {min, max, timeout} git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@409 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- varnish-cache/bin/varnishd/cache.h | 15 +- varnish-cache/bin/varnishd/cache_acceptor.c | 2 +- varnish-cache/bin/varnishd/cache_main.c | 2 +- varnish-cache/bin/varnishd/cache_pool.c | 243 ++++++++++++++------ varnish-cache/bin/varnishd/heritage.h | 4 +- varnish-cache/bin/varnishd/varnishd.c | 43 ++-- varnish-cache/bin/varnishlog/varnishlog.c | 1 + varnish-cache/include/shmlog_tags.h | 1 + varnish-cache/include/stat_field.h | 2 + 9 files changed, 220 insertions(+), 93 deletions(-) diff --git a/varnish-cache/bin/varnishd/cache.h b/varnish-cache/bin/varnishd/cache.h index 8013c0d3..c7ffc8b5 100644 --- a/varnish-cache/bin/varnishd/cache.h +++ b/varnish-cache/bin/varnishd/cache.h @@ -52,6 +52,15 @@ struct worker { struct sbuf *sb; struct objhead *nobjhead; struct object *nobj; + + unsigned nbr; + pthread_cond_t cv; + TAILQ_ENTRY(worker) list; +}; + +struct workreq { + TAILQ_ENTRY(workreq) list; + struct sess *sess; }; #include "hash_slinger.h" @@ -150,6 +159,8 @@ struct sess { /* Various internal stuff */ struct sessmem *mem; time_t t0; + + struct workreq workreq; }; struct backend { @@ -238,8 +249,8 @@ void PassSession(struct worker *w, struct sess *sp); void PipeSession(struct worker *w, struct sess *sp); /* cache_pool.c */ -void CacheInitPool(void); -void DealWithSession(void *arg); +void WRK_Init(void); +void WRK_QueueSession(struct sess *sp); /* cache_shmlog.c */ diff --git a/varnish-cache/bin/varnishd/cache_acceptor.c b/varnish-cache/bin/varnishd/cache_acceptor.c index 329f10e2..1469588f 100644 --- a/varnish-cache/bin/varnishd/cache_acceptor.c +++ b/varnish-cache/bin/varnishd/cache_acceptor.c @@ -190,7 +190,7 @@ vca_callback(void *arg, int bad) vca_return_session(sp); return; } - DealWithSession(sp); + WRK_QueueSession(sp); } static void diff --git a/varnish-cache/bin/varnishd/cache_main.c b/varnish-cache/bin/varnishd/cache_main.c index bd73c86e..a756acc3 100644 --- a/varnish-cache/bin/varnishd/cache_main.c +++ b/varnish-cache/bin/varnishd/cache_main.c @@ -109,7 +109,7 @@ child_main(void) AZ(pthread_mutex_init(&sessmtx, NULL)); VBE_Init(); VSL_Init(); - CacheInitPool(); + WRK_Init(); VCA_Init(); EXP_Init(); diff --git a/varnish-cache/bin/varnishd/cache_pool.c b/varnish-cache/bin/varnishd/cache_pool.c index 0d145566..07a41a25 100644 --- a/varnish-cache/bin/varnishd/cache_pool.c +++ b/varnish-cache/bin/varnishd/cache_pool.c @@ -5,6 +5,7 @@ */ #include +#include #include #include #include @@ -16,11 +17,15 @@ #include "vcl.h" #include "cache.h" -static TAILQ_HEAD(, sess) shd = TAILQ_HEAD_INITIALIZER(shd); - -static pthread_cond_t shdcnd; +static pthread_mutex_t wrk_mtx; static unsigned xids; +/* Number of work requests queued in excess of worker threads available */ +static unsigned wrk_overflow; + +static TAILQ_HEAD(, worker) wrk_head = TAILQ_HEAD_INITIALIZER(wrk_head); +static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead); + /*--------------------------------------------------------------------*/ static int @@ -52,114 +57,202 @@ DeliverSession(struct worker *w, struct sess *sp) return (1); } -static void * -CacheWorker(void *priv) +static void +wrk_WorkSession(struct worker *w, struct sess *sp) { - struct sess *sp; - struct worker w; int done; char *b; - memset(&w, 0, sizeof w); - w.eb = event_init(); - assert(w.eb != NULL); - w.sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND); - assert(w.sb != NULL); - - (void)priv; + time(&sp->t0); AZ(pthread_mutex_lock(&sessmtx)); + sp->vcl = GetVCL(); + AZ(pthread_mutex_unlock(&sessmtx)); + + done = http_Dissect(sp->http, sp->fd, 1); + if (done != 0) { + RES_Error(w, sp, done, NULL); + goto out; + } + + sp->backend = sp->vcl->backend[0]; + + VCL_recv_method(sp); + + for (done = 0; !done; ) { + switch(sp->handling) { + case VCL_RET_LOOKUP: + done = LookupSession(w, sp); + break; + case VCL_RET_FETCH: + done = FetchSession(w, sp); + break; + case VCL_RET_DELIVER: + done = DeliverSession(w, sp); + break; + case VCL_RET_PIPE: + PipeSession(w, sp); + done = 1; + break; + case VCL_RET_PASS: + PassSession(w, sp); + done = 1; + break; + default: + INCOMPL(); + } + } + if (http_GetHdr(sp->http, "Connection", &b) && + !strcmp(b, "close")) { + vca_close_session(sp, "Connection header"); + } else if (http_GetProto(sp->http, &b) && + strcmp(b, "HTTP/1.1")) { + vca_close_session(sp, "not HTTP/1.1"); + } + +out: + AZ(pthread_mutex_lock(&sessmtx)); + RelVCL(sp->vcl); + AZ(pthread_mutex_unlock(&sessmtx)); + sp->vcl = NULL; + vca_return_session(sp); +} + +/*--------------------------------------------------------------------*/ + +static void * +wrk_thread(void *priv) +{ + struct worker *w, ww; + struct workreq *wrq; + struct timespec ts; + + w = &ww; + memset(w, 0, sizeof w); + + AZ(pthread_cond_init(&w->cv, NULL)); + + w->eb = event_init(); + assert(w->eb != NULL); + + w->sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND); + assert(w->sb != NULL); + + AZ(pthread_mutex_lock(&wrk_mtx)); + VSL_stats->n_wrk++; + w->nbr = VSL_stats->n_wrk; + if (priv == NULL) + VSL(SLT_WorkThread, 0, "%u born dynamic", w->nbr); + else + VSL(SLT_WorkThread, 0, "%u born permanent", w->nbr); + TAILQ_INSERT_HEAD(&wrk_head, w, list); while (1) { - while (1) { - sp = TAILQ_FIRST(&shd); - if (sp != NULL) - break; - AZ(pthread_cond_wait(&shdcnd, &sessmtx)); + wrq = TAILQ_FIRST(&wrk_reqhead); + if (wrq != NULL) { + VSL_stats->n_wrkbusy++; + TAILQ_REMOVE(&wrk_head, w, list); + TAILQ_REMOVE(&wrk_reqhead, wrq, list); + AZ(pthread_mutex_unlock(&wrk_mtx)); + assert(wrq->sess != NULL); + wrk_WorkSession(w, wrq->sess); + AZ(pthread_mutex_lock(&wrk_mtx)); + VSL_stats->n_wrkbusy--; + TAILQ_INSERT_HEAD(&wrk_head, w, list); } - TAILQ_REMOVE(&shd, sp, list); - time(&sp->t0); - sp->vcl = GetVCL(); - AZ(pthread_mutex_unlock(&sessmtx)); - - done = http_Dissect(sp->http, sp->fd, 1); - if (done != 0) { - RES_Error(&w, sp, done, NULL); - goto out; + if (wrk_overflow > 0) { + wrk_overflow--; + continue; } - sp->backend = sp->vcl->backend[0]; - - VCL_recv_method(sp); - - for (done = 0; !done; ) { - switch(sp->handling) { - case VCL_RET_LOOKUP: - done = LookupSession(&w, sp); - break; - case VCL_RET_FETCH: - done = FetchSession(&w, sp); - break; - case VCL_RET_DELIVER: - done = DeliverSession(&w, sp); - break; - case VCL_RET_PIPE: - PipeSession(&w, sp); - done = 1; - break; - case VCL_RET_PASS: - PassSession(&w, sp); - done = 1; - break; - default: - INCOMPL(); - } - } - if (http_GetHdr(sp->http, "Connection", &b) && - !strcmp(b, "close")) { - vca_close_session(sp, "Connection header"); - } else if (http_GetProto(sp->http, &b) && - strcmp(b, "HTTP/1.1")) { - vca_close_session(sp, "not HTTP/1.1"); + /* If we are a reserved thread we don't die */ + if (priv != NULL) { + AZ(pthread_cond_wait(&w->cv, &wrk_mtx)); + continue; } -out: - AZ(pthread_mutex_lock(&sessmtx)); - RelVCL(sp->vcl); - sp->vcl = NULL; - vca_return_session(sp); + /* If we are a dynamic thread, time out and die */ + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += heritage.wthread_timeout; + if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) { + VSL_stats->n_wrk--; + TAILQ_REMOVE(&wrk_head, w, list); + AZ(pthread_mutex_unlock(&wrk_mtx)); + VSL(SLT_WorkThread, 0, "%u suicide", w->nbr); + sbuf_delete(w->sb); + event_base_free(w->eb); + AZ(pthread_cond_destroy(&w->cv)); + return (NULL); + } } } +/*--------------------------------------------------------------------*/ + void -DealWithSession(void *arg) +WRK_QueueSession(struct sess *sp) { - struct sess *sp = arg; + struct worker *w; + pthread_t tp; time(&sp->t_req); /* * No locking necessary, we're serialized in the acceptor thread + * XXX: still ? */ sp->xid = xids++; VSL(SLT_XID, sp->fd, "%u", sp->xid); + sp->workreq.sess = sp; VSL_stats->client_req++; - AZ(pthread_mutex_lock(&sessmtx)); - TAILQ_INSERT_TAIL(&shd, sp, list); - AZ(pthread_mutex_unlock(&sessmtx)); - AZ(pthread_cond_signal(&shdcnd)); + + AZ(pthread_mutex_lock(&wrk_mtx)); + TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list); + + /* If there are idle threads, we tickle the first one into action */ + w = TAILQ_FIRST(&wrk_head); + if (w != NULL) { + AZ(pthread_cond_signal(&w->cv)); + AZ(pthread_mutex_unlock(&wrk_mtx)); + return; + } + + /* Register overflow if max threads reached */ + if (VSL_stats->n_wrk >= heritage.wthread_max) { + wrk_overflow++; + AZ(pthread_mutex_unlock(&wrk_mtx)); + return; + } + + /* Try to create a thread */ + AZ(pthread_mutex_unlock(&wrk_mtx)); + if (!pthread_create(&tp, NULL, wrk_thread, NULL)) { + AZ(pthread_detach(tp)); + return; + } + + VSL(SLT_Debug, 0, "Create worker thread failed %d %s", + errno, strerror(errno)); + + /* Register overflow */ + AZ(pthread_mutex_lock(&wrk_mtx)); + wrk_overflow++; + AZ(pthread_mutex_unlock(&wrk_mtx)); } + + +/*--------------------------------------------------------------------*/ void -CacheInitPool(void) +WRK_Init(void) { pthread_t tp; int i; - AZ(pthread_cond_init(&shdcnd, NULL)); + AZ(pthread_mutex_init(&wrk_mtx, NULL)); VSL(SLT_Debug, 0, "Starting %u worker threads", heritage.wthread_min); for (i = 0; i < heritage.wthread_min; i++) { - AZ(pthread_create(&tp, NULL, CacheWorker, NULL)); + AZ(pthread_create(&tp, NULL, wrk_thread, &i)); AZ(pthread_detach(tp)); } srandomdev(); diff --git a/varnish-cache/bin/varnishd/heritage.h b/varnish-cache/bin/varnishd/heritage.h index 54f7c816..c51fdc37 100644 --- a/varnish-cache/bin/varnishd/heritage.h +++ b/varnish-cache/bin/varnishd/heritage.h @@ -37,7 +37,9 @@ struct heritage { unsigned default_ttl; /* Worker threads */ - unsigned wthread_min, wthread_max; + unsigned wthread_min; + unsigned wthread_max; + unsigned wthread_timeout; /* Memory allocation hints */ unsigned mem_http_headerspace; diff --git a/varnish-cache/bin/varnishd/varnishd.c b/varnish-cache/bin/varnishd/varnishd.c index cc2b776f..e16a72d4 100644 --- a/varnish-cache/bin/varnishd/varnishd.c +++ b/varnish-cache/bin/varnishd/varnishd.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -419,8 +420,8 @@ usage(void) fprintf(stderr, " %-28s # %s\n", "-s kind[,storageoptions]", "Backend storage specification"); fprintf(stderr, " %-28s # %s\n", "-t", "Default TTL"); - fprintf(stderr, " %-28s # %s\n", "-w int[,int]", - "Number of worker threads (fixed/{min,max})"); + fprintf(stderr, " %-28s # %s\n", "-w int[,int[,int]]", + "Number of worker threads (fixed/{min,max}/{min/max/timeout})"); #if 0 -c clusterid@cluster_controller -m memory_limit @@ -433,6 +434,28 @@ usage(void) } +/*--------------------------------------------------------------------*/ + +static void +tackle_warg(const char *argv) +{ + int i; + unsigned ua, ub, uc; + + i = sscanf(argv, "%u,%u,%u", &ua, &ub, &uc); + if (i == 0) + usage(); + if (ua < 1) + usage(); + heritage.wthread_min = ua; + heritage.wthread_max = ua; + heritage.wthread_timeout = 10; + if (i >= 2) + heritage.wthread_max = ub; + if (i >= 3) + heritage.wthread_timeout = uc; +} + /*--------------------------------------------------------------------*/ /* for development purposes */ @@ -442,8 +465,7 @@ usage(void) int main(int argc, char *argv[]) { - int o, i; - unsigned ua, ub; + int o; const char *portnumber = "8080"; unsigned dflag = 1; /* XXX: debug=on for now */ const char *bflag = NULL; @@ -456,8 +478,9 @@ main(int argc, char *argv[]) VCC_InitCompile(); heritage.default_ttl = 120; - heritage.wthread_min = 5; - heritage.wthread_max = 5; + heritage.wthread_min = 1; + heritage.wthread_max = UINT_MAX; + heritage.wthread_timeout = 10; heritage.mem_http_headerspace= 4096; heritage.mem_http_headers= 32; heritage.mem_workspace = 0; @@ -486,13 +509,7 @@ main(int argc, char *argv[]) heritage.default_ttl = strtoul(optarg, NULL, 0); break; case 'w': - i = sscanf(optarg, "%u,%u", &ua, &ub); - if (i == 0) - usage(); - heritage.wthread_min = ua; - heritage.wthread_max = ua; - if (i == 2) - heritage.wthread_max = ub; + tackle_warg(optarg); break; default: usage(); diff --git a/varnish-cache/bin/varnishlog/varnishlog.c b/varnish-cache/bin/varnishlog/varnishlog.c index b46c1931..bdba6759 100644 --- a/varnish-cache/bin/varnishlog/varnishlog.c +++ b/varnish-cache/bin/varnishlog/varnishlog.c @@ -182,6 +182,7 @@ order(unsigned char *p, int h_opt) case SLT_SessionClose: case SLT_SessionReuse: case SLT_BackendClose: + case SLT_BackendReuse: sbuf_finish(ob[u]); if ((hc[u] != 4 || h_opt == 0) && sbuf_len(ob[u]) > 1) printf("%s\n", sbuf_data(ob[u])); diff --git a/varnish-cache/include/shmlog_tags.h b/varnish-cache/include/shmlog_tags.h index b031d927..fef0282f 100644 --- a/varnish-cache/include/shmlog_tags.h +++ b/varnish-cache/include/shmlog_tags.h @@ -36,3 +36,4 @@ SLTM(Hit) SLTM(ExpBan) SLTM(ExpPick) SLTM(ExpKill) +SLTM(WorkThread) diff --git a/varnish-cache/include/stat_field.h b/varnish-cache/include/stat_field.h index 2baf744f..f3a81ef9 100644 --- a/varnish-cache/include/stat_field.h +++ b/varnish-cache/include/stat_field.h @@ -16,5 +16,7 @@ MAC_STAT(n_header, uint64_t, "u", "N struct header"); MAC_STAT(n_smf, uint64_t, "u", "N struct smf"); MAC_STAT(n_vbe, uint64_t, "u", "N struct vbe"); MAC_STAT(n_vbe_conn, uint64_t, "u", "N struct vbe_conn"); +MAC_STAT(n_wrk, uint64_t, "u", "N worker threads"); +MAC_STAT(n_wrkbusy, uint64_t, "u", "N busy worker threads"); -- 2.39.5