*/
#include <stdio.h>
+#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "vcl.h"
#include "cache.h"
-static TAILQ_HEAD(, sess) shd = TAILQ_HEAD_INITIALIZER(shd);
-
-static pthread_cond_t shdcnd;
+static pthread_mutex_t wrk_mtx;
static unsigned xids;
+/* Number of work requests queued in excess of worker threads available */
+static unsigned wrk_overflow;
+
+static TAILQ_HEAD(, worker) wrk_head = TAILQ_HEAD_INITIALIZER(wrk_head);
+static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
+
/*--------------------------------------------------------------------*/
static int
return (1);
}
-static void *
-CacheWorker(void *priv)
+static void
+wrk_WorkSession(struct worker *w, struct sess *sp)
{
- struct sess *sp;
- struct worker w;
int done;
char *b;
- memset(&w, 0, sizeof w);
- w.eb = event_init();
- assert(w.eb != NULL);
- w.sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND);
- assert(w.sb != NULL);
-
- (void)priv;
+ time(&sp->t0);
AZ(pthread_mutex_lock(&sessmtx));
+ sp->vcl = GetVCL();
+ AZ(pthread_mutex_unlock(&sessmtx));
+
+ done = http_Dissect(sp->http, sp->fd, 1);
+ if (done != 0) {
+ RES_Error(w, sp, done, NULL);
+ goto out;
+ }
+
+ sp->backend = sp->vcl->backend[0];
+
+ VCL_recv_method(sp);
+
+ for (done = 0; !done; ) {
+ switch(sp->handling) {
+ case VCL_RET_LOOKUP:
+ done = LookupSession(w, sp);
+ break;
+ case VCL_RET_FETCH:
+ done = FetchSession(w, sp);
+ break;
+ case VCL_RET_DELIVER:
+ done = DeliverSession(w, sp);
+ break;
+ case VCL_RET_PIPE:
+ PipeSession(w, sp);
+ done = 1;
+ break;
+ case VCL_RET_PASS:
+ PassSession(w, sp);
+ done = 1;
+ break;
+ default:
+ INCOMPL();
+ }
+ }
+ if (http_GetHdr(sp->http, "Connection", &b) &&
+ !strcmp(b, "close")) {
+ vca_close_session(sp, "Connection header");
+ } else if (http_GetProto(sp->http, &b) &&
+ strcmp(b, "HTTP/1.1")) {
+ vca_close_session(sp, "not HTTP/1.1");
+ }
+
+out:
+ AZ(pthread_mutex_lock(&sessmtx));
+ RelVCL(sp->vcl);
+ AZ(pthread_mutex_unlock(&sessmtx));
+ sp->vcl = NULL;
+ vca_return_session(sp);
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+wrk_thread(void *priv)
+{
+ struct worker *w, ww;
+ struct workreq *wrq;
+ struct timespec ts;
+
+ w = &ww;
+ memset(w, 0, sizeof w);
+
+ AZ(pthread_cond_init(&w->cv, NULL));
+
+ w->eb = event_init();
+ assert(w->eb != NULL);
+
+ w->sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND);
+ assert(w->sb != NULL);
+
+ AZ(pthread_mutex_lock(&wrk_mtx));
+ VSL_stats->n_wrk++;
+ w->nbr = VSL_stats->n_wrk;
+ if (priv == NULL)
+ VSL(SLT_WorkThread, 0, "%u born dynamic", w->nbr);
+ else
+ VSL(SLT_WorkThread, 0, "%u born permanent", w->nbr);
+ TAILQ_INSERT_HEAD(&wrk_head, w, list);
while (1) {
- while (1) {
- sp = TAILQ_FIRST(&shd);
- if (sp != NULL)
- break;
- AZ(pthread_cond_wait(&shdcnd, &sessmtx));
+ wrq = TAILQ_FIRST(&wrk_reqhead);
+ if (wrq != NULL) {
+ VSL_stats->n_wrkbusy++;
+ TAILQ_REMOVE(&wrk_head, w, list);
+ TAILQ_REMOVE(&wrk_reqhead, wrq, list);
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ assert(wrq->sess != NULL);
+ wrk_WorkSession(w, wrq->sess);
+ AZ(pthread_mutex_lock(&wrk_mtx));
+ VSL_stats->n_wrkbusy--;
+ TAILQ_INSERT_HEAD(&wrk_head, w, list);
}
- TAILQ_REMOVE(&shd, sp, list);
- time(&sp->t0);
- sp->vcl = GetVCL();
- AZ(pthread_mutex_unlock(&sessmtx));
-
- done = http_Dissect(sp->http, sp->fd, 1);
- if (done != 0) {
- RES_Error(&w, sp, done, NULL);
- goto out;
+ if (wrk_overflow > 0) {
+ wrk_overflow--;
+ continue;
}
- sp->backend = sp->vcl->backend[0];
-
- VCL_recv_method(sp);
-
- for (done = 0; !done; ) {
- switch(sp->handling) {
- case VCL_RET_LOOKUP:
- done = LookupSession(&w, sp);
- break;
- case VCL_RET_FETCH:
- done = FetchSession(&w, sp);
- break;
- case VCL_RET_DELIVER:
- done = DeliverSession(&w, sp);
- break;
- case VCL_RET_PIPE:
- PipeSession(&w, sp);
- done = 1;
- break;
- case VCL_RET_PASS:
- PassSession(&w, sp);
- done = 1;
- break;
- default:
- INCOMPL();
- }
- }
- if (http_GetHdr(sp->http, "Connection", &b) &&
- !strcmp(b, "close")) {
- vca_close_session(sp, "Connection header");
- } else if (http_GetProto(sp->http, &b) &&
- strcmp(b, "HTTP/1.1")) {
- vca_close_session(sp, "not HTTP/1.1");
+ /* If we are a reserved thread we don't die */
+ if (priv != NULL) {
+ AZ(pthread_cond_wait(&w->cv, &wrk_mtx));
+ continue;
}
-out:
- AZ(pthread_mutex_lock(&sessmtx));
- RelVCL(sp->vcl);
- sp->vcl = NULL;
- vca_return_session(sp);
+ /* If we are a dynamic thread, time out and die */
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts.tv_sec += heritage.wthread_timeout;
+ if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) {
+ VSL_stats->n_wrk--;
+ TAILQ_REMOVE(&wrk_head, w, list);
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ VSL(SLT_WorkThread, 0, "%u suicide", w->nbr);
+ sbuf_delete(w->sb);
+ event_base_free(w->eb);
+ AZ(pthread_cond_destroy(&w->cv));
+ return (NULL);
+ }
}
}
+/*--------------------------------------------------------------------*/
+
void
-DealWithSession(void *arg)
+WRK_QueueSession(struct sess *sp)
{
- struct sess *sp = arg;
+ struct worker *w;
+ pthread_t tp;
time(&sp->t_req);
/*
* No locking necessary, we're serialized in the acceptor thread
+ * XXX: still ?
*/
sp->xid = xids++;
VSL(SLT_XID, sp->fd, "%u", sp->xid);
+ sp->workreq.sess = sp;
VSL_stats->client_req++;
- AZ(pthread_mutex_lock(&sessmtx));
- TAILQ_INSERT_TAIL(&shd, sp, list);
- AZ(pthread_mutex_unlock(&sessmtx));
- AZ(pthread_cond_signal(&shdcnd));
+
+ AZ(pthread_mutex_lock(&wrk_mtx));
+ TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
+
+ /* If there are idle threads, we tickle the first one into action */
+ w = TAILQ_FIRST(&wrk_head);
+ if (w != NULL) {
+ AZ(pthread_cond_signal(&w->cv));
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ return;
+ }
+
+ /* Register overflow if max threads reached */
+ if (VSL_stats->n_wrk >= heritage.wthread_max) {
+ wrk_overflow++;
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ return;
+ }
+
+ /* Try to create a thread */
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
+ AZ(pthread_detach(tp));
+ return;
+ }
+
+ VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
+ errno, strerror(errno));
+
+ /* Register overflow */
+ AZ(pthread_mutex_lock(&wrk_mtx));
+ wrk_overflow++;
+ AZ(pthread_mutex_unlock(&wrk_mtx));
}
+
+
+/*--------------------------------------------------------------------*/
void
-CacheInitPool(void)
+WRK_Init(void)
{
pthread_t tp;
int i;
- AZ(pthread_cond_init(&shdcnd, NULL));
+ AZ(pthread_mutex_init(&wrk_mtx, NULL));
VSL(SLT_Debug, 0, "Starting %u worker threads", heritage.wthread_min);
for (i = 0; i < heritage.wthread_min; i++) {
- AZ(pthread_create(&tp, NULL, CacheWorker, NULL));
+ AZ(pthread_create(&tp, NULL, wrk_thread, &i));
AZ(pthread_detach(tp));
}
srandomdev();
#include <string.h>
#include <errno.h>
#include <fcntl.h>
+#include <limits.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
fprintf(stderr, " %-28s # %s\n",
"-s kind[,storageoptions]", "Backend storage specification");
fprintf(stderr, " %-28s # %s\n", "-t", "Default TTL");
- fprintf(stderr, " %-28s # %s\n", "-w int[,int]",
- "Number of worker threads (fixed/{min,max})");
+ fprintf(stderr, " %-28s # %s\n", "-w int[,int[,int]]",
+ "Number of worker threads (fixed/{min,max}/{min/max/timeout})");
#if 0
-c clusterid@cluster_controller
-m memory_limit
}
+/*--------------------------------------------------------------------*/
+
+static void
+tackle_warg(const char *argv)
+{
+ int i;
+ unsigned ua, ub, uc;
+
+ i = sscanf(argv, "%u,%u,%u", &ua, &ub, &uc);
+ if (i == 0)
+ usage();
+ if (ua < 1)
+ usage();
+ heritage.wthread_min = ua;
+ heritage.wthread_max = ua;
+ heritage.wthread_timeout = 10;
+ if (i >= 2)
+ heritage.wthread_max = ub;
+ if (i >= 3)
+ heritage.wthread_timeout = uc;
+}
+
/*--------------------------------------------------------------------*/
/* for development purposes */
int
main(int argc, char *argv[])
{
- int o, i;
- unsigned ua, ub;
+ int o;
const char *portnumber = "8080";
unsigned dflag = 1; /* XXX: debug=on for now */
const char *bflag = NULL;
VCC_InitCompile();
heritage.default_ttl = 120;
- heritage.wthread_min = 5;
- heritage.wthread_max = 5;
+ heritage.wthread_min = 1;
+ heritage.wthread_max = UINT_MAX;
+ heritage.wthread_timeout = 10;
heritage.mem_http_headerspace= 4096;
heritage.mem_http_headers= 32;
heritage.mem_workspace = 0;
heritage.default_ttl = strtoul(optarg, NULL, 0);
break;
case 'w':
- i = sscanf(optarg, "%u,%u", &ua, &ub);
- if (i == 0)
- usage();
- heritage.wthread_min = ua;
- heritage.wthread_max = ua;
- if (i == 2)
- heritage.wthread_max = ub;
+ tackle_warg(optarg);
break;
default:
usage();