* $Id$
*
* XXX: automatic thread-pool size adaptation.
+ * XXX: unlocked stats variables: consider summing over pools in timer thread.
*/
#include "config.h"
/* Number of work requests queued in excess of worker threads available */
struct wq {
+ unsigned magic;
+#define WQ_MAGIC 0x606658fa
MTX mtx;
struct workerhead idle;
- unsigned nwrk;
+ VTAILQ_HEAD(, workreq) overflow;	/* queued work awaiting a worker */
+ unsigned nthr;			/* worker threads in this pool */
+ unsigned nqueue;			/* current length of overflow queue */
+ uintmax_t drops;			/* work requests dropped, queue full */
};
-static MTX tmtx;
-static VTAILQ_HEAD(, workreq) overflow = VTAILQ_HEAD_INITIALIZER(overflow);
-
static struct wq **wq;
static unsigned nwq;
+static unsigned ovfl_max;
+static unsigned nthr_max;
+static unsigned nthr_min;
+
+static pthread_cond_t herder_cond;
+static MTX herder_mtx;
/*--------------------------------------------------------------------
* Write data to fd
/*--------------------------------------------------------------------*/
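+/* Workreq callback: attach the worker to the session and run the CNT state machine */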
static void
-wrk_do_one(struct worker *w)
+wrk_do_cnt_sess(struct worker *w, void *priv)
{
- struct workreq *wrq;
+ struct sess *sess;
- AN(w->wrq);
- wrq = w->wrq;
- CHECK_OBJ_NOTNULL(wrq->sess, SESS_MAGIC);
- wrq->sess->wrk = w;
+ CAST_OBJ_NOTNULL(sess, priv, SESS_MAGIC);
+ sess->wrk = w;
CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC);
CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC);
w->used = NAN;
- CNT_Session(wrq->sess);
+ CNT_Session(sess);
assert(!isnan(w->used));
CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC);
CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC);
- w->wrq = NULL;
}
+/*--------------------------------------------------------------------*/
+
static void *
wrk_thread(void *priv)
{
struct worker *w, ww;
struct wq *qp;
unsigned char wlog[8192]; /* XXX: size */
+ struct workreq *wrq;
THR_Name("cache-worker");
w = &ww;
- qp = priv;
+ CAST_OBJ_NOTNULL(qp, priv, WQ_MAGIC);
memset(w, 0, sizeof *w);
w->magic = WORKER_MAGIC;
w->used = TIM_real();
AZ(pthread_cond_init(&w->cond, NULL));
VSL(SLT_WorkThread, 0, "%p start", w);
- LOCK(&tmtx);
- VSL_stats->n_wrk_create++;
- UNLOCK(&tmtx);
while (1) {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
assert(!isnan(w->used));
- w->wrq = VTAILQ_FIRST(&overflow);
+ /* Process overflow requests, if any */
+ LOCK(&qp->mtx);
+ w->wrq = VTAILQ_FIRST(&qp->overflow);
if (w->wrq != NULL) {
- LOCK(&tmtx);
-
- /* Process overflow requests, if any */
- w->wrq = VTAILQ_FIRST(&overflow);
- if (w->wrq != NULL) {
- VSL_stats->n_wrk_queue--;
- VTAILQ_REMOVE(&overflow, w->wrq, list);
- }
- UNLOCK(&tmtx);
- }
- if (w->wrq == NULL) {
- LOCK(&qp->mtx);
+ VTAILQ_REMOVE(&qp->overflow, w->wrq, list);
+ qp->nqueue--;
+ VSL_stats->n_wrk_queue--; /* XXX: unlocked */
+ } else {
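+ /* No queued work: park on the idle list and wait to be handed a request */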
VTAILQ_INSERT_HEAD(&qp->idle, w, list);
AZ(pthread_cond_wait(&w->cond, &qp->mtx));
- UNLOCK(&qp->mtx);
}
+ UNLOCK(&qp->mtx);
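+ /* Woken with no work assigned: this thread is being retired (see wrk_decimate_flock) */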
if (w->wrq == NULL)
break;
- wrk_do_one(w);
+ AN(w->wrq);
+ wrq = w->wrq;
+ AN(wrq->func);
+ wrq->func(w, wrq->priv);
+ w->wrq = NULL;
}
- LOCK(&tmtx);
- VSL_stats->n_wrk--;
- qp->nwrk--;
- UNLOCK(&tmtx);
-
VSL(SLT_WorkThread, 0, "%p end", w);
if (w->vcl != NULL)
VCL_Rel(&w->vcl);
/*--------------------------------------------------------------------*/
-void
-WRK_QueueSession(struct sess *sp)
+int
+WRK_Queue(struct workreq *wrq)
{
struct worker *w;
- pthread_t tp;
struct wq *qp;
static unsigned nq = 0;
unsigned onq;
+ /*
+ * Select which pool we issue to
+ * XXX: better alg ?
+ * XXX: per CPU ?
+ */
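+ /* Plain round-robin; nq is updated without a lock, so the rotation is only approximate */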
onq = nq + 1;
if (onq >= nwq)
onq = 0;
- sp->workreq.sess = sp;
qp = wq[onq];
nq = onq;
if (w != NULL) {
VTAILQ_REMOVE(&qp->idle, w, list);
UNLOCK(&qp->mtx);
- w->wrq = &sp->workreq;
+ w->wrq = wrq;
AZ(pthread_cond_signal(&w->cond));
- return;
+ return (0);
}
- UNLOCK(&qp->mtx);
+ /* If we have too much in the overflow already, refuse */
- LOCK(&tmtx);
- /*
- * If we have too much in the overflow, and this is a new session
- * just drop it. We do not drop sessions which were waiting for
- * a busy object, they will be cheap to serve from here and the
- * cleanup would be more complex to carry out than delivering
- * the result will be
- */
- if (sp->obj == NULL &&
- (VSL_stats->n_wrk_queue >
- (params->wthread_max * params->overflow_max) / 100)) {
- VSL_stats->n_wrk_drop++;
- UNLOCK(&tmtx);
- sp->t_end = TIM_real();
- vca_close_session(sp, "dropped");
- if(sp->vcl != NULL) {
- /*
- * A session parked on a busy object can come here
- * after it wakes up. Loose the VCL reference.
- */
- VCL_Rel(&sp->vcl);
- }
- SES_Delete(sp);
- return;
- }
- /*
- * XXX: If there are too many requests in the overflow queue
- * XXX: we should kill the request right here.
- * XXX: Not sure how though. Simply closing may be the better
- * XXX: compromise.
- */
- VTAILQ_INSERT_TAIL(&overflow, &sp->workreq, list);
- VSL_stats->n_wrk_overflow++;
- VSL_stats->n_wrk_queue++;
- /* Can we create more threads ? */
- if (VSL_stats->n_wrk >= params->wthread_max ||
- qp->nwrk * nwq >= params->wthread_max) {
- VSL_stats->n_wrk_max++;
- UNLOCK(&tmtx);
- return;
+ if (qp->nqueue > ovfl_max) {
+ qp->drops++;
+ VSL_stats->n_wrk_drop++; /* XXX: unlocked */
+ UNLOCK(&qp->mtx);
+ return (-1);
}
- /* Try to create a thread */
- VSL_stats->n_wrk++;
- qp->nwrk++;
- UNLOCK(&tmtx);
+ VTAILQ_INSERT_TAIL(&qp->overflow, wrq, list);
+ VSL_stats->n_wrk_queue++; /* XXX: unlocked */
+ VSL_stats->n_wrk_overflow++; /* XXX: unlocked */
+ qp->nqueue++;
+ UNLOCK(&qp->mtx);
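+ /* Wake the herder so it can consider breeding another thread for this pool */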
+ AZ(pthread_cond_signal(&herder_cond));
+ return (0);
+}
- if (!pthread_create(&tp, NULL, wrk_thread, qp)) {
- AZ(pthread_detach(tp));
- return;
- }
+/*--------------------------------------------------------------------*/
- VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
- errno, strerror(errno));
+void
+WRK_QueueSession(struct sess *sp)
+{
+ sp->workreq.func = wrk_do_cnt_sess;
+ sp->workreq.priv = sp;
+ if (WRK_Queue(&sp->workreq) == 0)
+ return;
- LOCK(&tmtx);
- /* Register overflow */
- qp->nwrk--;
- VSL_stats->n_wrk--;
- VSL_stats->n_wrk_failed++;
- UNLOCK(&tmtx);
+ /*
+ * Couldn't queue it -- kill it.
+ *
+ * XXX: a notice might be polite, but would potentially
+ * XXX: sleep whichever thread got us here
+ */
+ sp->t_end = TIM_real();
+ vca_close_session(sp, "dropped");
if (sp->vcl != NULL) {
/*
* A session parked on a busy object can come here
* after it wakes up. Lose the VCL reference.
+ */
+ VCL_Rel(&sp->vcl);
+ }
+ SES_Delete(sp);
}
/*--------------------------------------------------------------------*/
static void
-wrk_addpools(unsigned t)
+wrk_addpools(const unsigned pools)
{
struct wq **pwq, **owq;
unsigned u;
- if (t <= nwq)
- return;
-
- pwq = calloc(sizeof *pwq, params->wthread_pools);
+ pwq = calloc(sizeof *pwq, pools);
if (pwq == NULL)
return;
if (wq != NULL)
memcpy(pwq, wq, sizeof *pwq * nwq);
owq = wq;
wq = pwq;
- for (u = nwq; u < t; u++) {
+ for (u = nwq; u < pools; u++) {
wq[u] = calloc(sizeof *wq[u], 1);
XXXAN(wq[u]);
+ wq[u]->magic = WQ_MAGIC;
MTX_INIT(&wq[u]->mtx);
+ VTAILQ_INIT(&wq[u]->overflow);
VTAILQ_INIT(&wq[u]->idle);
}
- free(owq);
- nwq = t;
+ (void)owq; /* XXX: avoid race, leak it. */
+ nwq = pools;
}
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * If a thread is idle or excess, pick it out of the pool...
+ */
-static void *
-wrk_reaperthread(void *priv)
+static void
+wrk_decimate_flock(struct wq *qp, double t_idle)
{
- double now;
struct worker *w;
- struct wq *qp;
+
+ if (qp->nthr <= nthr_min)
+ return;
+
+ LOCK(&qp->mtx);
+ w = VTAILQ_LAST(&qp->idle, workerhead);
+ if (w != NULL && (w->used < t_idle || qp->nthr > nthr_max))
+ VTAILQ_REMOVE(&qp->idle, w, list);
+ else
+ w = NULL;
+ UNLOCK(&qp->mtx);
+
+ /* And give it a kiss on the cheek... */
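+ /* Signalling a worker whose wrq is NULL makes wrk_thread() break out of its loop and exit */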
+ if (w != NULL) {
+ AZ(w->wrq);
+ AZ(pthread_cond_signal(&w->cond));
+ qp->nthr--;
+ VSL_stats->n_wrk--; /* XXX: unlocked */
+ (void)usleep(params->wthread_purge_delay * 1000);
+ }
+}
+
+/*--------------------------------------------------------------------
+ * Periodic pool herding thread
+ *
+ * Do things which we can do at our leisure:
+ * Add pools
+ * Scale constants
+ * Get rid of excess threads
+ */
+
+static void *
+wrk_herdtimer_thread(void *priv)
+{
+ volatile unsigned u;
+ double t_idle;
+
+ THR_Name("wrk_herdtimer");
+
+ (void)priv;
+ while (1) {
+ /* Add Pools */
+ u = params->wthread_pools;
+ if (u > nwq)
+ wrk_addpools(u);
+
+ /* Scale parameters */
+ u = params->wthread_min / nwq;
+ if (u < 1)
+ u = 1;
+ nthr_min = u;
+
+ u = params->wthread_max / nwq;
+ if (u < nthr_min)
+ u = nthr_min;
+ nthr_max = u;
+
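+ /* ovfl_max is the per-pool queue length at which WRK_Queue() starts dropping requests */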
+ ovfl_max = (nthr_max * params->overflow_max) / 100;
+
+ t_idle = TIM_real() - params->wthread_timeout;
+ for (u = 0; u < nwq; u++)
+ wrk_decimate_flock(wq[u], t_idle);
+
+ (void)usleep(params->wthread_purge_delay * 1000);
+ }
+}
+
+/*--------------------------------------------------------------------
+ * Create more threads, if necessary & possible
+ */
+
+static void
+wrk_breed_flock(struct wq *qp)
+{
+ pthread_t tp;
+
+ /*
+ * If we need more threads, and have space, create
+ * one more thread.
+ */
+ if (qp->nqueue > params->wthread_add_threshold ||
+ qp->nthr < nthr_min) {
+ if (qp->nthr >= nthr_max) {
+ VSL_stats->n_wrk_max++;
+ } else if (pthread_create(&tp, NULL, wrk_thread, qp)) {
+ VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
+ errno, strerror(errno));
+ VSL_stats->n_wrk_failed++;
+ (void)usleep(params->wthread_fail_delay * 1000);
+ } else {
+ qp->nthr++;
+ AZ(pthread_detach(tp));
+ VSL_stats->n_wrk++; /* XXX: unlocked */
+ VSL_stats->n_wrk_create++;
+ (void)usleep(params->wthread_add_delay * 1000);
+ }
+ }
+}
+
+/*--------------------------------------------------------------------
+ * This thread wakes up whenever a pool overflows.
+ *
+ * The trick here is to not be too aggressive about creating threads.
+ * We do this by only examining one pool at a time, and by sleeping
+ * a short while whenever we create a thread and a little while longer
+ * whenever we fail to, hopefully missing a lot of cond_signals in
+ * the meantime.
+ *
+ * XXX: probably need a lot more work.
+ *
+ */
+
+static void *
+wrk_herder_thread(void *priv)
+{
unsigned u;
+ THR_Name("wrk_herder");
(void)priv;
while (1) {
- wrk_addpools(params->wthread_pools);
- AZ(sleep(1));
- if (VSL_stats->n_wrk <= params->wthread_min)
- continue;
- now = TIM_real();
- for (u = 0; u < nwq; u++) {
- qp = wq[u];
- LOCK(&qp->mtx);
- w = VTAILQ_LAST(&qp->idle, workerhead);
- if (w != NULL &&
- (w->used + params->wthread_timeout < now ||
- VSL_stats->n_wrk > params->wthread_max))
- VTAILQ_REMOVE(&qp->idle, w, list);
- else
- w = NULL;
- UNLOCK(&qp->mtx);
- if (w == NULL)
- continue;
- AZ(w->wrq);
- AZ(pthread_cond_signal(&w->cond));
+ for (u = 0; u < nwq; u++) {
+ /*
+ * We cannot avoid getting a mutex, so we have a
+ * bogo mutex just for POSIX_STUPIDITY
+ */
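+ /* A cond_signal sent while we are not waiting is simply lost; that only delays thread creation, which is the intent */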
+ AZ(pthread_mutex_lock(&herder_mtx));
+ AZ(pthread_cond_wait(&herder_cond, &herder_mtx));
+ AZ(pthread_mutex_unlock(&herder_mtx));
+ wrk_breed_flock(wq[u]);
}
}
}
{
pthread_t tp;
- wrk_addpools(params->wthread_pools);
- MTX_INIT(&tmtx);
- AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL));
+ AZ(pthread_cond_init(&herder_cond, NULL));
+ AZ(pthread_mutex_init(&herder_mtx, NULL));
+
+ AZ(pthread_create(&tp, NULL, wrk_herdtimer_thread, NULL));
+ AZ(pthread_create(&tp, NULL, wrk_herder_thread, NULL));
AZ(pthread_detach(tp));
}
0,
"120", "seconds" },
{ "thread_pools", tweak_uint, &master.wthread_pools, 1, UINT_MAX,
- "Number of worker pools. "
+ "Number of worker thread pools.\n"
+ "\n"
"Increasing number of worker pools decreases lock "
- "contention but increases the number of threads as well. "
+ "contention.\n"
+ "\n"
+ "Too many pools waste CPU and RAM resources, and more than "
+ "one pool for each CPU is probably detrimental to performance.\n"
+ "\n"
"Can be increased on the fly, but decreases require a "
"restart to take effect.",
- EXPERIMENTAL,
+ EXPERIMENTAL | DELAYED_EFFECT,
"1", "pools" },
- { "thread_pool_max", tweak_thread_pool_max, NULL, 0, 0,
- "The maximum number of threads in the total worker pool.\n"
- "-1 is unlimited.",
+ { "thread_pool_max", tweak_thread_pool_max, NULL, 1, 0,
+ "The maximum number of worker threads in all pools combined.\n"
+ "\n"
+ "Do not set this higher than you have to, since excess "
+ "worker threads soak up RAM and CPU and generally just get "
+ "in the way of getting work done.\n",
EXPERIMENTAL | DELAYED_EFFECT,
- "1000", "threads" },
- { "thread_pool_min", tweak_thread_pool_min, NULL, 0, 0,
- "The minimum number of threads in the worker pool.\n"
+ "100", "threads" },
+ { "thread_pool_min", tweak_thread_pool_min, NULL, 1, 0,
+ "The minimum number of threads in all worker pools combined.\n"
+ "\n"
+ "Increasing this may help the server ramp up faster from "
+ "low-load situations where threads have expired.\n"
+ "\n"
"Minimum is 1 thread.",
EXPERIMENTAL | DELAYED_EFFECT,
"1", "threads" },
- { "thread_pool_timeout", tweak_timeout, &master.wthread_timeout, 0, 0,
- "Thread dies after this many seconds of inactivity.\n"
+ { "thread_pool_timeout", tweak_timeout, &master.wthread_timeout, 1, 0,
+ "Thread idle threshold.\n"
+ "\n"
+ "Threads in excess of thread_pool_min that have been idle "
+ "for at least this long are candidates for purging.\n"
+ "\n"
"Minimum is 1 second.",
EXPERIMENTAL | DELAYED_EFFECT,
"120", "seconds" },
+ { "thread_pool_purge_delay",
+ tweak_timeout, &master.wthread_purge_delay, 100, 0,
+ "Wait this long between purging threads.\n"
+ "\n"
+ "This controls the decay of thread pools when idle(-ish).\n"
+ "\n"
+ "Minimum is 100 milliseconds.",
+ EXPERIMENTAL | DELAYED_EFFECT,
+ "1000", "milliseconds" },
+ { "thread_pool_add_threshold",
+ tweak_uint, &master.wthread_add_threshold, 0, UINT_MAX,
+ "Overflow threshold for worker thread creation.\n"
+ "\n"
+ "Setting this too low will result in excess worker threads, "
+ "which is generally a bad idea.\n"
+ "\n"
+ "Setting it too high results in insufficient worker threads.\n",
+ EXPERIMENTAL,
+ "2", "requests" },
+ { "thread_pool_add_delay",
+ tweak_timeout, &master.wthread_add_delay, 0, UINT_MAX,
+ "Wait at least this long between creating threads.\n"
+ "\n"
+ "Setting this too long results in insufficient worker threads.\n"
+ "\n"
+ "Setting this too short increases the risk of worker "
+ "thread pile-up.\n",
+ EXPERIMENTAL,
+ "10", "milliseconds" },
+ { "thread_pool_fail_delay",
+ tweak_timeout, &master.wthread_fail_delay, 100, UINT_MAX,
+ "Wait at least this long after a failed thread creation "
+ "before trying to create another thread.\n"
+ "\n"
+ "Failure to create a worker thread is often a sign that "
+ "the end is near, because the process is running out of "
+ "RAM resources for thread stacks.\n"
+ "This delay tries not to rush things along needlessly.\n"
+ "\n"
+ "If thread creation failures are a problem, check that "
+ "thread_pool_max is not too high.\n"
+ "\n"
+ "It may also help to increase thread_pool_timeout and "
+ "thread_pool_min, to reduce the rate at which threads are "
+ "destroyed and later recreated.\n",
+ EXPERIMENTAL,
+ "200", "milliseconds" },
{ "overflow_max", tweak_uint, &master.overflow_max, 0, UINT_MAX,
- "Limit on overflow queue length in percent of "
- "thread_pool_max parameter.",
+ "Permitted overflow queue length, as a percentage of the "
+ "maximum number of worker threads.\n"
+ "\n"
+ "This sets the ratio of queued requests to worker threads, "
+ "above which sessions will be dropped instead of queued.\n",
EXPERIMENTAL,
"100", "%" },
{ "rush_exponent", tweak_uint, &master.rush_exponent, 2, UINT_MAX,