*
* $Id$
*
- * XXX: automatic thread-pool size adaptation.
+ * We maintain a number of worker thread pools, to spread lock contention.
+ *
+ * Pools can be added on the fly, as a means to mitigate lock contention,
+ * but can only be removed again by a restart. (XXX: we could fix that)
+ *
+ * Two threads herd the pools, one eliminates idle threads and aggregates
+ * statistics for all the pools, the other thread creates new threads
+ * on demand, subject to various numerical constraints.
+ *
+ * The algorithm for when to create threads needs to be reactive enough
+ * to handle startup spikes, but sufficiently attenuated to not cause
+ * thread pileups. This remains subject for improvement.
*/
#include "config.h"
/*--------------------------------------------------------------------*/
-static void
-wrk_do_cnt_sess(struct worker *w, void *priv)
-{
- struct sess *sess;
-
- CAST_OBJ_NOTNULL(sess, priv, SESS_MAGIC);
- sess->wrk = w;
- CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC);
- CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC);
- w->used = NAN;
- CNT_Session(sess);
- assert(!isnan(w->used));
- CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC);
- CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC);
-}
-
-/*--------------------------------------------------------------------*/
-
static void *
wrk_thread(void *priv)
{
return (NULL);
}
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * Queue a workrequest if possible.
+ *
+ * Return zero if the request was queued, negative if it wasn't.
+ */
int
WRK_Queue(struct workreq *wrq)
return (0);
}
- /* If we have too much in the overflow already, refuse */
-
+ /* If we have too much in the overflow already, refuse. */
if (qp->nqueue > ovfl_max) {
qp->ndrop++;
UNLOCK(&qp->mtx);
/*--------------------------------------------------------------------*/
+static void
+wrk_do_cnt_sess(struct worker *w, void *priv)
+{
+ struct sess *sess;
+
+ CAST_OBJ_NOTNULL(sess, priv, SESS_MAGIC);
+ sess->wrk = w;
+ CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC);
+ CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC);
+ w->used = NAN;
+ CNT_Session(sess);
+ assert(!isnan(w->used));
+ CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC);
+ CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC);
+}
+
+/*--------------------------------------------------------------------*/
+
void
WRK_QueueSession(struct sess *sp)
{
SES_Delete(sp);
}
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * Add (more) thread pools
+ */
static void
wrk_addpools(const unsigned pools)
}
/*--------------------------------------------------------------------
- * Create more threads, if necessay & possible
+ * Create another thread, if necessary & possible
*/
static void
AZ(pthread_mutex_init(&herder_mtx, NULL));
AZ(pthread_create(&tp, NULL, wrk_herdtimer_thread, NULL));
+ AZ(pthread_detach(tp));
AZ(pthread_create(&tp, NULL, wrk_herder_thread, NULL));
AZ(pthread_detach(tp));
}