/* Number of work requests queued in excess of worker threads available */
static unsigned wrk_overflow;
-static TAILQ_HEAD(, worker) wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle);
-static TAILQ_HEAD(, worker) wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy);
+TAILQ_HEAD(workerhead, worker);
+
+static struct workerhead wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle);
+static struct workerhead wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy);
static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
/*--------------------------------------------------------------------
if (w->nobjhead != NULL)
CHECK_OBJ(w->nobjhead, OBJHEAD_MAGIC);
w->wrq = NULL;
+ w->idle = time(NULL);
LOCK(&wrk_mtx);
VSL_stats->n_wrk_busy--;
}
wrk_thread(void *priv)
{
struct worker *w, ww;
- struct timespec ts;
(void)priv;
w = &ww;
memset(w, 0, sizeof *w);
w->magic = WORKER_MAGIC;
-
+ w->idle = time(NULL);
AZ(pthread_cond_init(&w->cv, NULL));
+ VSL(SLT_WorkThread, 0, "%p start", w);
LOCK(&wrk_mtx);
- w->nbr = VSL_stats->n_wrk;
VSL_stats->n_wrk_create++;
- VSL(SLT_WorkThread, 0, "%u born", w->nbr);
TAILQ_INSERT_HEAD(&wrk_busy, w, list);
while (1) {
CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
TAILQ_REMOVE(&wrk_busy, w, list);
TAILQ_INSERT_HEAD(&wrk_idle, w, list);
-
- /* If we are a reserved thread we don't die */
- if (w->nbr < params->wthread_min) {
- AZ(pthread_cond_wait(&w->cv, &wrk_mtx));
- } else {
- /* If we are a dynamic thread, time out and die */
- AZ(clock_gettime(CLOCK_REALTIME, &ts));
- ts.tv_sec += params->wthread_timeout;
- if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) {
- VSL_stats->n_wrk--;
- TAILQ_REMOVE(&wrk_idle, w, list);
- UNLOCK(&wrk_mtx);
- VSL(SLT_WorkThread, 0, "%u suicide", w->nbr);
- AZ(pthread_cond_destroy(&w->cv));
- return (NULL);
- }
- }
-
- /* we are already removed from wrk_idle */
+ assert(w->idle != 0);
+ AZ(pthread_cond_wait(&w->cv, &wrk_mtx));
+ if (w->idle == 0)
+ break;
wrk_do_one(w);
}
+ VSL_stats->n_wrk--;
+ UNLOCK(&wrk_mtx);
+ VSL(SLT_WorkThread, 0, "%p end", w);
+ AZ(pthread_cond_destroy(&w->cv));
+ return (NULL);
}
/*--------------------------------------------------------------------*/
/* If there are idle threads, we tickle the first one into action */
w = TAILQ_FIRST(&wrk_idle);
if (w != NULL) {
- AZ(pthread_cond_signal(&w->cv));
TAILQ_REMOVE(&wrk_idle, w, list);
TAILQ_INSERT_TAIL(&wrk_busy, w, list);
UNLOCK(&wrk_mtx);
+ AZ(pthread_cond_signal(&w->cv));
return;
}
VSL_stats->n_wrk_failed++;
UNLOCK(&wrk_mtx);
}
+
+/*--------------------------------------------------------------------*/
+static void *
+wrk_reaperthread(void *priv)
+{
+ time_t now;
+ struct worker *w;
+
+ (void)priv;
+ while (1) {
+ sleep(1);
+ if (VSL_stats->n_wrk <= params->wthread_min)
+ continue;
+ now = time(NULL);
+ LOCK(&wrk_mtx);
+ w = TAILQ_LAST(&wrk_idle, workerhead);
+ if (w != NULL && w->idle + params->wthread_timeout < now)
+ TAILQ_REMOVE(&wrk_idle, w, list);
+ else
+ w = NULL;
+ UNLOCK(&wrk_mtx);
+ if (w == NULL)
+ continue;
+ w->idle = 0;
+ AZ(pthread_cond_signal(&w->cv));
+ }
+ INCOMPL();
+}
/*--------------------------------------------------------------------*/
AZ(pthread_mutex_init(&wrk_mtx, NULL));
+ AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL));
+ AZ(pthread_detach(tp));
+
VSL(SLT_Debug, 0, "Starting %u worker threads", params->wthread_min);
for (i = 0; i < params->wthread_min; i++) {
VSL_stats->n_wrk++;
t = time(NULL);
TAILQ_FOREACH(w, &wrk_busy, list) {
cli_out(cli, "\n");
- cli_out(cli, "W %p nbr %d ", w, w->nbr);
+ cli_out(cli, "W %p", w);
if (w->wrq == NULL)
continue;
s = w->wrq->sess;