the object's waitinglist and disembark the worker thread so it can do
something sensible in the mean time.
This feature is unimportant in normal operation, but crucial to
resource management if a popular URL suddenly takes a long time to
reply from the backend.
Without this bit of semi-nasty code, we would tie up one worker
thread per client while waiting for the backend to come to its
senses.
git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@478 d4fa192b-c00b-0410-8231-f00ffab90ce4
unsigned refcnt;
unsigned xid;
struct objhead *objhead;
- pthread_cond_t cv;
unsigned heap_idx;
unsigned ban_seq;
TAILQ_ENTRY(object) deathrow;
TAILQ_HEAD(, storage) store;
+
+ TAILQ_HEAD(, sess) waitinglist;
};
struct objhead {
int FetchHeaders(struct worker *w, struct sess *sp);
/* cache_hash.c */
-struct object *HSH_Lookup(struct worker *w, struct http *h);
+struct object *HSH_Lookup(struct sess *sp);
void HSH_Unbusy(struct object *o);
void HSH_Deref(struct object *o);
void HSH_Init(void);
static struct timeval tick_rate;
static pthread_t vca_thread;
+static unsigned xids;
static struct event accept_e[2 * HERITAGE_NSOCKS];
static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
return;
}
sp->step = STP_RECV;
+ VSL_stats->client_req++;
+ sp->xid = xids++;
+ VSL(SLT_XID, sp->fd, "%u", sp->xid);
WRK_QueueSession(sp);
}
tick_rate.tv_sec = 1;
tick_rate.tv_usec = 0;
AZ(pthread_create(&vca_thread, NULL, vca_main, NULL));
+ srandomdev();
+ xids = random();
}
/*--------------------------------------------------------------------
* Look up request in hash table
*
+ * LOOKUP consists of two substates so that we can reenter if we
+ * encounter a busy object.
+ *
DOT subgraph cluster_lookup {
DOT lookup [
DOT shape=ellipse
DOT label="find obj in cache"
DOT ]
DOT LOOKUP -> lookup [style=bold]
-DOT lookup2 [
+DOT lookup3 [
DOT shape=ellipse
DOT label="Insert new busy object"
DOT ]
-DOT lookup -> lookup2 [style=bold]
+DOT lookup -> lookup3 [style=bold]
DOT }
DOT lookup -> HIT [label="hit", style=bold]
DOT lookup3 -> MISS [label="miss", style=bold]
static int
cnt_lookup(struct sess *sp)
{
+ sp->obj = NULL;
+ sp->step = STP_LOOKUP2;
+ return (0);
+}
- sp->obj = HSH_Lookup(sp->wrk, sp->http);
+static int
+cnt_lookup2(struct sess *sp)
+{
+ struct object *o;
+
+ /*
+ * We don't assign to sp->obj directly because it is used
+ * to store state when we encounter a busy object.
+ */
+ o = HSH_Lookup(sp);
+
+ /* If we encountered busy-object, disembark worker thread */
+ if (o == NULL) {
+ VSL(SLT_Debug, sp->fd,
+ "on waiting list on obj %u", sp->obj->xid);
+ return (1);
+ }
+
+ sp->obj = o;
+
+ /* If we inserted a new object it's a miss */
if (sp->obj->busy) {
VSL_stats->cache_miss++;
sp->step = STP_MISS;
return (0);
}
+
+ /* Account separately for pass and cache objects */
if (sp->obj->pass) {
VSL_stats->cache_hitpass++;
VSL(SLT_HitPass, sp->fd, "%u", sp->obj->xid);
- sp->step = STP_HIT;
- return (0);
- }
- VSL_stats->cache_hit++;
- VSL(SLT_Hit, sp->fd, "%u", sp->obj->xid);
+ } else {
+ VSL_stats->cache_hit++;
+ VSL(SLT_Hit, sp->fd, "%u", sp->obj->xid);
+ }
sp->step = STP_HIT;
+HERE();
return (0);
}
static struct hash_slinger *hash;
struct object *
-HSH_Lookup(struct worker *w, struct http *h)
+HSH_Lookup(struct sess *sp)
{
+ struct worker *w;
+ struct http *h;
struct objhead *oh;
struct object *o;
char *b, *c;
assert(hash != NULL);
+ w = sp->wrk;
+ h = sp->http;
+
/* Precreate an objhead and object in case we need them */
if (w->nobjhead == NULL) {
w->nobjhead = calloc(sizeof *w->nobjhead, 1);
w->nobj->busy = 1;
w->nobj->refcnt = 1;
TAILQ_INIT(&w->nobj->store);
- AZ(pthread_cond_init(&w->nobj->cv, NULL));
+ TAILQ_INIT(&w->nobj->waitinglist);
VSL_stats->n_object++;
}
assert(http_GetURL(h, &b));
if (!http_GetHdr(h, "Host", &c))
c = b;
+ if (sp->obj != NULL) {
+ o = sp->obj;
+ oh = o->objhead;
+ AZ(pthread_mutex_lock(&oh->mtx));
+ goto were_back;
+ }
oh = hash->lookup(b, c, w->nobjhead);
if (oh == w->nobjhead)
w->nobjhead = NULL;
AZ(pthread_mutex_lock(&oh->mtx));
TAILQ_FOREACH(o, &oh->objects, list) {
o->refcnt++;
- if (o->busy)
- AZ(pthread_cond_wait(&o->cv, &oh->mtx));
+ if (o->busy) {
+ TAILQ_INSERT_TAIL(&o->waitinglist, sp, list);
+ sp->obj = o;
+ AZ(pthread_mutex_unlock(&oh->mtx));
+ return (NULL);
+ }
+ were_back:
/* XXX: check ttl */
/* XXX: check Vary: */
if (!o->cacheable) {
void
HSH_Unbusy(struct object *o)
{
+ struct sess *sp;
+ assert(o != NULL);
+ assert(o->refcnt > 0);
if (o->cacheable)
EXP_Insert(o);
AZ(pthread_mutex_lock(&o->objhead->mtx));
o->busy = 0;
AZ(pthread_mutex_unlock(&o->objhead->mtx));
- AZ(pthread_cond_broadcast(&o->cv));
+ while (1) {
+ sp = TAILQ_FIRST(&o->waitinglist);
+ if (sp == NULL)
+ break;
+ TAILQ_REMOVE(&o->waitinglist, sp, list);
+ WRK_QueueSession(sp);
+ }
}
void
free(o->header);
VSL_stats->n_header--;
}
- AZ(pthread_cond_destroy(&o->cv));
TAILQ_FOREACH_SAFE(st, &o->store, list, stn) {
TAILQ_REMOVE(&o->store, st, list);
#include "cache.h"
static pthread_mutex_t wrk_mtx;
-static unsigned xids;
/* Number of work requests queued in excess of worker threads available */
static unsigned wrk_overflow;
sp->t_req = time(NULL);
- /*
- * No locking necessary, we're serialized in the acceptor thread
- * XXX: still ?
- */
- sp->xid = xids++;
- VSL(SLT_XID, sp->fd, "%u", sp->xid);
-
sp->workreq.sess = sp;
- VSL_stats->client_req++;
AZ(pthread_mutex_lock(&wrk_mtx));
TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
AZ(pthread_create(&tp, NULL, wrk_thread, &i));
AZ(pthread_detach(tp));
}
- srandomdev();
- xids = random();
}
STEP(pass, PASS)
STEP(passbody, PASSBODY)
STEP(lookup, LOOKUP)
+STEP(lookup2, LOOKUP2)
STEP(miss, MISS)
STEP(hit, HIT)
STEP(fetch, FETCH)