From: phk Date: Mon, 21 Aug 2006 18:55:24 +0000 (+0000) Subject: Overhaul kqueue acceptor in light of todays learnings. X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e6a1848e45395160bc1a74ef6607a2f4dad076a9;p=varnish Overhaul kqueue acceptor in light of todays learnings. Use the pipe trick to inject sessions into the system, as far as I can tell it is cheaper because of the low rate it happens and the high rate of mutex operations avoided. Ignore the timer event, but purge the list every time we wake up to reduce lumpyness of timeout'ing. Centralize the polling of a session so we don't have the same two messages spread out all over the place. Centralize the acceptor thread and send things directly to the worker thread, leaving only the session-herder in the split out files. poll & epoll not yet updated accordingly. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@875 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- diff --git a/varnish-cache/bin/varnishd/cache.h b/varnish-cache/bin/varnishd/cache.h index 455c1f76..499132ba 100644 --- a/varnish-cache/bin/varnishd/cache.h +++ b/varnish-cache/bin/varnishd/cache.h @@ -272,7 +272,6 @@ struct sess { struct workreq workreq; struct acct acct; - unsigned kqa; }; struct backend { diff --git a/varnish-cache/bin/varnishd/cache_acceptor.c b/varnish-cache/bin/varnishd/cache_acceptor.c index 4ff494fc..9f191975 100644 --- a/varnish-cache/bin/varnishd/cache_acceptor.c +++ b/varnish-cache/bin/varnishd/cache_acceptor.c @@ -40,6 +40,7 @@ static struct acceptor *vca_acceptors[] = { }; static unsigned xids; +static pthread_t vca_thread_acct; struct sess * vca_accept_sess(int fd) @@ -128,6 +129,23 @@ vca_handfirst(struct sess *sp) /*--------------------------------------------------------------------*/ +int +vca_pollsession(struct sess *sp) +{ + int i; + + i = http_RecvSome(sp->fd, sp->http); + if (i < 1) + return (i); + if (i == 1) + vca_close_session(sp, "overflow"); + else if (i == 2) + vca_close_session(sp, "no request"); + return (1); +} + +/*--------------------------------------------------------------------*/ + void vca_close_session(struct sess *sp, const char *why) { @@ -143,9 +161,33 @@ vca_return_session(struct sess *sp) { CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + if (sp->fd >= 0) { + VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); + (void)clock_gettime(CLOCK_REALTIME, &sp->t_open); + if (http_RecvPrepAgain(sp->http)) + vca_handover(sp, 0); + } vca_acceptors[0]->recycle(sp); } +/*--------------------------------------------------------------------*/ + +static void * +vca_acct(void *arg) +{ + struct sess *sp; + + (void)arg; + while (1) { + sp = vca_accept_sess(heritage.socket); + if (sp == NULL) + continue; + http_RecvPrep(sp->http); + vca_handfirst(sp); + } +} + + /*--------------------------------------------------------------------*/ void @@ -161,4 +203,5 @@ VCA_Init(void) exit (2); } vca_acceptors[0]->init(); + AZ(pthread_create(&vca_thread_acct, NULL, vca_acct, NULL)); } diff --git a/varnish-cache/bin/varnishd/cache_acceptor.h b/varnish-cache/bin/varnishd/cache_acceptor.h index 5746f4f4..9813674c 100644 --- a/varnish-cache/bin/varnishd/cache_acceptor.h +++ b/varnish-cache/bin/varnishd/cache_acceptor.h @@ -29,4 +29,5 @@ extern struct acceptor acceptor_poll; struct sess *vca_accept_sess(int fd); void vca_handover(struct sess *sp, int bad); void vca_handfirst(struct sess *sp); +int vca_pollsession(struct sess *sp); diff --git a/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c b/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c index 488ec416..1735980c 100644 --- a/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c +++ b/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c @@ -19,109 +19,82 @@ #include #include -#ifndef HAVE_SRANDOMDEV -#include "compat/srandomdev.h" -#endif - #include "heritage.h" #include "shmlog.h" #include "cache.h" #include "cache_acceptor.h" -static pthread_t vca_kqueue_thread1; -static pthread_t vca_kqueue_thread2; +static pthread_t vca_kqueue_thread; static int kq = -1; +static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead); +static int pipes[2]; + #define NKEV 100 static void vca_kq_sess(struct sess *sp, int arm) { - struct kevent ke[2]; - int i, j, arm2; + struct kevent ke; + int i; CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); - memset(ke, 0, sizeof ke); - if (arm == EV_ADD || arm == EV_ENABLE) { - assert(sp->kqa == 0); - sp->kqa = 1; - arm2 = EV_ADD; - } else { - assert(sp->kqa == 1); - sp->kqa = 0; - arm2 = EV_DELETE; - } - j = 0; - EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2, - 0, params->sess_timeout * 1000, sp); - if (sp->fd >= 0) - EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp); - - i = kevent(kq, ke, j, NULL, 0, NULL); + if (sp->fd < 0) + return; + EV_SET(&ke, sp->fd, EVFILT_READ, arm, 0, 0, sp); + i = kevent(kq, &ke, 1, NULL, 0, NULL); assert(i == 0); } -static struct sess * +static void vca_kev(struct kevent *kp) { int i; struct sess *sp; - if (kp->udata == NULL) { - VSL(SLT_Debug, 0, - "KQ RACE %s flags %x fflags %x data %x", - kp->filter == EVFILT_READ ? "R" : "T", - kp->flags, kp->fflags, kp->data); - return (NULL); + assert(kp->udata != NULL); + if (kp->udata == pipes) { + while (kp->data > 0) { + i = read(pipes[0], &sp, sizeof sp); + assert(i == sizeof sp); + kp->data -= i; + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + TAILQ_INSERT_TAIL(&sesshead, sp, list); + vca_kq_sess(sp, EV_ADD); + } + return; } CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC); - if (sp->kqa == 0) { - VSL(SLT_Debug, sp->id, - "KQ %s flags %x fflags %x data %x", - kp->filter == EVFILT_READ ? "R" : "T", - kp->flags, kp->fflags, kp->data); - return (NULL); - } - if (kp->filter == EVFILT_READ) { - if (kp->data > 0) { - i = http_RecvSome(sp->fd, sp->http); - switch (i) { - case -1: - return (NULL); - case 0: - vca_kq_sess(sp, EV_DISABLE); - vca_handover(sp, i); - return (NULL); /* ?? */ - case 1: - vca_close_session(sp, "overflow"); - break; - case 2: - vca_close_session(sp, "no request"); - break; - default: - INCOMPL(); - } - return (sp); - } - if (kp->flags == EV_EOF) { - vca_close_session(sp, "EOF"); - return (sp); + if (kp->data > 0) { + i = vca_pollsession(sp); + if (i == -1) + return; + TAILQ_REMOVE(&sesshead, sp, list); + if (i == 0) { + vca_kq_sess(sp, EV_DELETE); + vca_handover(sp, i); + } else { + SES_Delete(sp); } - INCOMPL(); + return; } - if (kp->filter == EVFILT_TIMER) { - vca_close_session(sp, "timeout"); - return (sp); + if (kp->flags == EV_EOF) { + TAILQ_REMOVE(&sesshead, sp, list); + vca_close_session(sp, "EOF"); + SES_Delete(sp); + return; } INCOMPL(); } +/*--------------------------------------------------------------------*/ static void * vca_kqueue_main(void *arg) { struct kevent ke[NKEV], *kp; int i, j, n; + struct timespec ts; struct sess *sp; (void)arg; @@ -129,36 +102,36 @@ vca_kqueue_main(void *arg) kq = kqueue(); assert(kq >= 0); + j = 0; + EV_SET(&ke[j++], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL); + EV_SET(&ke[j++], pipes[0], EVFILT_READ, EV_ADD, 0, 0, pipes); + i = kevent(kq, ke, j, NULL, 0, NULL); + assert(i == 0); + while (1) { n = kevent(kq, NULL, 0, ke, NKEV, NULL); assert(n >= 1 && n <= NKEV); for (kp = ke, j = 0; j < n; j++, kp++) { - sp = vca_kev(kp); - if (sp != NULL) { - vca_kq_sess(sp, EV_DELETE); - SES_Delete(sp); - for (i = j; i < n; i++) - if (ke[i].udata == sp) - ke[i].udata = NULL; - } + if (kp->filter == EVFILT_TIMER) + continue; + assert(kp->filter == EVFILT_READ); + vca_kev(kp); + } + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec -= params->sess_timeout; + for (;;) { + sp = TAILQ_FIRST(&sesshead); + if (sp == NULL) + break; + if (sp->t_open.tv_sec > ts.tv_sec) + break; + if (sp->t_open.tv_sec == ts.tv_sec && + sp->t_open.tv_nsec > ts.tv_nsec) + break; + TAILQ_REMOVE(&sesshead, sp, list); + vca_close_session(sp, "timeout"); + SES_Delete(sp); } - } - INCOMPL(); -} - -static void * -vca_kqueue_acct(void *arg) -{ - struct sess *sp; - - (void)arg; - while (1) { - sp = vca_accept_sess(heritage.socket); - if (sp == NULL) - continue; - clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); - http_RecvPrep(sp->http); - vca_kq_sess(sp, EV_ADD); } } @@ -168,23 +141,18 @@ static void vca_kqueue_recycle(struct sess *sp) { - if (sp->fd < 0) { + if (sp->fd < 0) SES_Delete(sp); - return; - } - (void)clock_gettime(CLOCK_REALTIME, &sp->t_open); - VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); - if (http_RecvPrepAgain(sp->http)) - vca_handover(sp, 0); - else - vca_kq_sess(sp, EV_ENABLE); + else + assert(write(pipes[1], &sp, sizeof sp) == sizeof sp); } static void vca_kqueue_init(void) { - AZ(pthread_create(&vca_kqueue_thread1, NULL, vca_kqueue_main, NULL)); - AZ(pthread_create(&vca_kqueue_thread2, NULL, vca_kqueue_acct, NULL)); + + AZ(pipe(pipes)); + AZ(pthread_create(&vca_kqueue_thread, NULL, vca_kqueue_main, NULL)); } struct acceptor acceptor_kqueue = {