From: phk Date: Wed, 19 Jul 2006 21:16:03 +0000 (+0000) Subject: Rework the worker thread pool logic slightly, we were leaking X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b77aaf9ca809ef05af575a3354f88c137a163798;p=varnish Rework the worker thread pool logic slightly, we were leaking threads before. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@510 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- diff --git a/varnish-cache/bin/varnishd/cache_pool.c b/varnish-cache/bin/varnishd/cache_pool.c index 64ebc8bd..bdc3e28f 100644 --- a/varnish-cache/bin/varnishd/cache_pool.c +++ b/varnish-cache/bin/varnishd/cache_pool.c @@ -26,11 +26,37 @@ static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead); /*--------------------------------------------------------------------*/ +static void +wrk_do_one(struct worker *w) +{ + struct workreq *wrq; + + wrq = TAILQ_FIRST(&wrk_reqhead); + assert(wrq != NULL); + VSL_stats->n_wrk_busy++; + TAILQ_REMOVE(&wrk_reqhead, wrq, list); + VSL_stats->n_wrk_queue--; + AZ(pthread_mutex_unlock(&wrk_mtx)); + assert(wrq->sess != NULL); + wrq->sess->wrk = w; + CHECK_OBJ_NOTNULL(wrq->sess, SESS_MAGIC); + if (w->nobj != NULL) + CHECK_OBJ(w->nobj, OBJECT_MAGIC); + if (w->nobjhead != NULL) + CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC); + CNT_Session(wrq->sess); + if (w->nobj != NULL) + CHECK_OBJ(w->nobj, OBJECT_MAGIC); + if (w->nobjhead != NULL) + CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC); + AZ(pthread_mutex_lock(&wrk_mtx)); + VSL_stats->n_wrk_busy--; +} + static void * wrk_thread(void *priv) { struct worker *w, ww; - struct workreq *wrq; struct timespec ts; w = &ww; @@ -50,66 +76,40 @@ wrk_thread(void *priv) if (priv == NULL) { VSL_stats->n_wrk_create++; VSL(SLT_WorkThread, 0, "%u born dynamic", w->nbr); - } else { - VSL(SLT_WorkThread, 0, "%u born permanent", w->nbr); } - TAILQ_INSERT_HEAD(&wrk_head, w, list); while (1) { CHECK_OBJ_NOTNULL(w, WORKER_MAGIC); - if (w->nobj != NULL) - CHECK_OBJ(w->nobj, OBJECT_MAGIC); - if (w->nobjhead != NULL) - CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC); - wrq = TAILQ_FIRST(&wrk_reqhead); - if (wrq != NULL) { - VSL_stats->n_wrk_busy++; - TAILQ_REMOVE(&wrk_head, w, list); - TAILQ_REMOVE(&wrk_reqhead, wrq, list); - AZ(pthread_mutex_unlock(&wrk_mtx)); - assert(wrq->sess != NULL); - wrq->sess->wrk = w; - CHECK_OBJ_NOTNULL(wrq->sess, SESS_MAGIC); - CNT_Session(wrq->sess); - AZ(pthread_mutex_lock(&wrk_mtx)); - VSL_stats->n_wrk_busy--; - TAILQ_INSERT_HEAD(&wrk_head, w, list); - } + + /* Process overflow requests, if any */ if (wrk_overflow > 0) { wrk_overflow--; + wrk_do_one(w); continue; } - if (w->nobj != NULL) - CHECK_OBJ(w->nobj, OBJECT_MAGIC); - if (w->nobjhead != NULL) - CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC); + + TAILQ_INSERT_HEAD(&wrk_head, w, list); /* If we are a reserved thread we don't die */ if (priv != NULL) { AZ(pthread_cond_wait(&w->cv, &wrk_mtx)); - continue; + } else { + /* If we are a dynamic thread, time out and die */ + AZ(clock_gettime(CLOCK_REALTIME, &ts)); + ts.tv_sec += heritage.wthread_timeout; + if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) { + VSL_stats->n_wrk--; + TAILQ_REMOVE(&wrk_head, w, list); + AZ(pthread_mutex_unlock(&wrk_mtx)); + VSL(SLT_WorkThread, 0, "%u suicide", w->nbr); + sbuf_delete(w->sb); + event_base_free(w->eb); + AZ(pthread_cond_destroy(&w->cv)); + return (NULL); + } } - if (w->nobj != NULL) - CHECK_OBJ(w->nobj, OBJECT_MAGIC); - if (w->nobjhead != NULL) - CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC); - - /* If we are a dynamic thread, time out and die */ - AZ(clock_gettime(CLOCK_REALTIME, &ts)); - ts.tv_sec += heritage.wthread_timeout; - if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) { - VSL_stats->n_wrk--; - TAILQ_REMOVE(&wrk_head, w, list); - AZ(pthread_mutex_unlock(&wrk_mtx)); - VSL(SLT_WorkThread, 0, "%u suicide", w->nbr); - sbuf_delete(w->sb); - event_base_free(w->eb); - AZ(pthread_cond_destroy(&w->cv)); - return (NULL); - } - if (w->nobj != NULL) - CHECK_OBJ(w->nobj, OBJECT_MAGIC); - if (w->nobjhead != NULL) - CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC); + + /* we are already removed from wrk_head */ + wrk_do_one(w); } } @@ -127,19 +127,22 @@ WRK_QueueSession(struct sess *sp) AZ(pthread_mutex_lock(&wrk_mtx)); TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list); + VSL_stats->n_wrk_queue++; /* If there are idle threads, we tickle the first one into action */ w = TAILQ_FIRST(&wrk_head); if (w != NULL) { AZ(pthread_cond_signal(&w->cv)); + TAILQ_REMOVE(&wrk_head, w, list); AZ(pthread_mutex_unlock(&wrk_mtx)); return; } - /* Register overflow if max threads reached */ + wrk_overflow++; + + /* Can we create more threads ? */ if (VSL_stats->n_wrk >= heritage.wthread_max) { - wrk_overflow++; - VSL_stats->n_wrk_short++; + VSL_stats->n_wrk_max++; AZ(pthread_mutex_unlock(&wrk_mtx)); return; } @@ -147,6 +150,7 @@ WRK_QueueSession(struct sess *sp) /* Try to create a thread */ VSL_stats->n_wrk++; AZ(pthread_mutex_unlock(&wrk_mtx)); + if (!pthread_create(&tp, NULL, wrk_thread, NULL)) { AZ(pthread_detach(tp)); return; @@ -158,9 +162,7 @@ WRK_QueueSession(struct sess *sp) /* Register overflow */ AZ(pthread_mutex_lock(&wrk_mtx)); VSL_stats->n_wrk--; - wrk_overflow++; VSL_stats->n_wrk_failed++; - VSL_stats->n_wrk_short++; AZ(pthread_mutex_unlock(&wrk_mtx)); } diff --git a/varnish-cache/include/stat_field.h b/varnish-cache/include/stat_field.h index 77c7b2a8..655156e8 100644 --- a/varnish-cache/include/stat_field.h +++ b/varnish-cache/include/stat_field.h @@ -21,7 +21,8 @@ MAC_STAT(n_vbe_conn, uint64_t, "u", "N struct vbe_conn") MAC_STAT(n_wrk, uint64_t, "u", "N worker threads") MAC_STAT(n_wrk_create, uint64_t, "u", "N worker threads created") MAC_STAT(n_wrk_failed, uint64_t, "u", "N worker threads not created") -MAC_STAT(n_wrk_short, uint64_t, "u", "N worker threads shortages") +MAC_STAT(n_wrk_max, uint64_t, "u", "N worker threads limited") MAC_STAT(n_wrk_busy, uint64_t, "u", "N busy worker threads") +MAC_STAT(n_wrk_queue, uint64_t, "u", "N queued work requests") MAC_STAT(losthdr, uint64_t, "u", "HTTP header overflows")