From 8a20a7434fa4444c16cebb1c09b421a6b6bb73f9 Mon Sep 17 00:00:00 2001 From: phk Date: Wed, 2 Aug 2006 09:34:40 +0000 Subject: [PATCH] Bite the bullet and write an alternate acceptor which uses kqueue directly instead of libevent. Degeneralize the header reading code in cache_http.c which seems to be cleaner anyway. An #ifdef at the top of cache_acceptor.c selects which implementation you want: libevent or kqueue. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@597 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- varnish-cache/bin/varnishd/cache.h | 10 +- varnish-cache/bin/varnishd/cache_acceptor.c | 291 +++++++++++++++----- varnish-cache/bin/varnishd/cache_http.c | 62 ++--- 3 files changed, 246 insertions(+), 117 deletions(-) diff --git a/varnish-cache/bin/varnishd/cache.h b/varnish-cache/bin/varnishd/cache.h index 39011176..ad3acac6 100644 --- a/varnish-cache/bin/varnishd/cache.h +++ b/varnish-cache/bin/varnishd/cache.h @@ -46,8 +46,6 @@ enum step { * RSN: struct worker and struct session will have one of these embedded. */ -typedef void http_callback_f(void *, int bad); - struct http_hdr { char *b; char *e; @@ -56,9 +54,6 @@ struct http_hdr { struct http { unsigned magic; #define HTTP_MAGIC 0x6428b5c9 - struct event ev; - http_callback_f *callback; - void *arg; char *s; /* (S)tart of buffer */ char *t; /* start of (T)railing data */ @@ -230,6 +225,7 @@ struct sess { int id; unsigned xid; + struct event ev; struct worker *wrk; unsigned sockaddrlen; @@ -346,7 +342,9 @@ int http_GetStatus(struct http *hp); int http_HdrIs(struct http *hp, const char *hdr, const char *val); int http_GetTail(struct http *hp, unsigned len, char **b, char **e); int http_Read(struct http *hp, int fd, void *b, unsigned len); -void http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg); +void http_RecvPrep(struct http *hp); +int http_RecvPrepAgain(struct http *hp); +int http_RecvSome(int fd, struct http *hp); int http_RecvHead(struct http *hp, int fd); int http_DissectRequest(struct http *sp, int fd); int http_DissectResponse(struct http *sp, int fd); diff --git a/varnish-cache/bin/varnishd/cache_acceptor.c b/varnish-cache/bin/varnishd/cache_acceptor.c index 6a3a121a..1e0ac873 100644 --- a/varnish-cache/bin/varnishd/cache_acceptor.c +++ b/varnish-cache/bin/varnishd/cache_acceptor.c @@ -6,6 +6,9 @@ * write the session pointer to a pipe which the event engine monitors. */ +#define ACCEPTOR_USE_KQUEUE +#undef ACCEPTOR_USE_LIBEVENT + #include #include #include @@ -16,14 +19,73 @@ #include #include -#include - #include "config.h" #include "libvarnish.h" #include "heritage.h" #include "shmlog.h" #include "cache.h" +static pthread_t vca_thread; +static unsigned xids; + +static struct sess * +vca_accept_sess(int fd) +{ + socklen_t l; + struct sockaddr addr[2]; /* XXX: IPv6 hack */ + struct sess *sp; + int i; + struct linger linger; + + VSL_stats->client_conn++; + + l = sizeof addr; + i = accept(fd, addr, &l); + if (i < 0) { + VSL(SLT_Debug, fd, "Accept failed errno=%d", errno); + /* XXX: stats ? */ + return (NULL); + } + sp = SES_New(addr, l); + assert(sp != NULL); /* XXX handle */ + + sp->fd = i; + sp->id = i; + +#ifdef SO_NOSIGPIPE /* XXX Linux */ + i = 1; + AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i)); +#endif +#ifdef SO_LINGER /* XXX Linux*/ + linger.l_onoff = 0; + linger.l_linger = 0; + AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger)); +#endif + + TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port); + VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port); + return (sp); +} + +static void +vca_handover(struct sess *sp, int bad) +{ + + if (bad) { + vca_close_session(sp, + bad == 1 ? "overflow" : "no request"); + vca_return_session(sp); + return; + } + sp->step = STP_RECV; + VSL_stats->client_req++; + sp->xid = xids++; + VSL(SLT_XID, sp->fd, "%u", sp->xid); + WRK_QueueSession(sp); +} + +#ifdef ACCEPTOR_USE_LIBEVENT + static struct event_base *evb; static struct event pipe_e; static int pipes[2]; @@ -31,12 +93,11 @@ static int pipes[2]; static struct event tick_e; static struct timeval tick_rate; -static pthread_t vca_thread; -static unsigned xids; - static struct event accept_e[2 * HERITAGE_NSOCKS]; static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead); +/*--------------------------------------------------------------------*/ + static void vca_tick(int a, short b, void *c) { @@ -59,25 +120,33 @@ vca_tick(int a, short b, void *c) } static void -vca_callback(void *arg, int bad) +vca_rcvhd_f(int fd, short event, void *arg) { struct sess *sp; + int i; + + (void)event; CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC); - TAILQ_REMOVE(&sesshead, sp, list); - if (bad) { - if (bad == 1) - vca_close_session(sp, "overflow"); - else - vca_close_session(sp, "no request"); - vca_return_session(sp); + i = http_RecvSome(fd, sp->http); + if (i < 0) return; - } - sp->step = STP_RECV; - VSL_stats->client_req++; - sp->xid = xids++; - VSL(SLT_XID, sp->fd, "%u", sp->xid); - WRK_QueueSession(sp); + + event_del(&sp->ev); + TAILQ_REMOVE(&sesshead, sp, list); + vca_handover(sp, i); +} + +static void +vca_rcvhdev(struct sess *sp) +{ + + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); + TAILQ_INSERT_TAIL(&sesshead, sp, list); + event_set(&sp->ev, sp->fd, EV_READ | EV_PERSIST, vca_rcvhd_f, sp); + AZ(event_base_set(evb, &sp->ev)); + AZ(event_add(&sp->ev, NULL)); /* XXX: timeout */ } static void @@ -90,53 +159,27 @@ pipe_f(int fd, short event, void *arg) (void)arg; i = read(fd, &sp, sizeof sp); assert(i == sizeof sp); - clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); - TAILQ_INSERT_TAIL(&sesshead, sp, list); - http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp); + if (http_RecvPrepAgain(sp->http)) { + vca_handover(sp, 0); + return; + } + vca_rcvhdev(sp); } static void accept_f(int fd, short event, void *arg) { - socklen_t l; - struct sockaddr addr[2]; /* XXX: IPv6 hack */ struct sess *sp; - int i; - struct linger linger; (void)event; (void)arg; - VSL_stats->client_conn++; - - l = sizeof addr; - i = accept(fd, addr, &l); - if (i < 0) { - VSL(SLT_Debug, fd, "Accept failed errno=%d", errno); - /* XXX: stats ? */ + sp = vca_accept_sess(fd); + if (sp == NULL) return; - } - sp = SES_New(addr, l); - assert(sp != NULL); /* XXX handle */ - - sp->fd = i; - sp->id = i; - -#ifdef SO_NOSIGPIPE /* XXX Linux */ - i = 1; - AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i)); -#endif -#ifdef SO_LINGER /* XXX Linux*/ - linger.l_onoff = 0; - linger.l_linger = 0; - AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger)); -#endif - TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port); - VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port); - clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); - TAILQ_INSERT_TAIL(&sesshead, sp, list); - http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp); + http_RecvPrep(sp->http); + vca_rcvhdev(sp); } static void * @@ -147,6 +190,8 @@ vca_main(void *arg) (void)arg; + tick_rate.tv_sec = 1; + tick_rate.tv_usec = 0; AZ(pipe(pipes)); evb = event_init(); assert(evb != NULL); @@ -187,13 +232,114 @@ vca_main(void *arg) /*--------------------------------------------------------------------*/ void -vca_close_session(struct sess *sp, const char *why) +vca_return_session(struct sess *sp) { - VSL(SLT_SessionClose, sp->fd, why); - if (sp->fd >= 0) - AZ(close(sp->fd)); - sp->fd = -1; + if (sp->fd < 0) { + SES_Delete(sp); + return; + } + VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); + assert(sizeof sp == write(pipes[1], &sp, sizeof sp)); +} + +#endif /* ACCEPTOR_USE_LIBEVENT */ + +#ifdef ACCEPTOR_USE_KQUEUE +#include + +static int kq = -1; + +static void +vca_kq_sess(struct sess *sp, int arm) +{ + struct kevent ke[2]; + + assert(arm == EV_ADD || arm == EV_DELETE); + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + memset(ke, 0, sizeof ke); + EV_SET(&ke[0], sp->fd, EVFILT_READ, arm, 0, 0, sp); + EV_SET(&ke[1], sp->fd, EVFILT_TIMER, arm , 0, 5000, sp); + AZ(kevent(kq, ke, 2, NULL, 0, NULL)); +} + +static void +accept_f(int fd) +{ + struct sess *sp; + + sp = vca_accept_sess(fd); + if (sp == NULL) + return; + clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); + http_RecvPrep(sp->http); + vca_kq_sess(sp, EV_ADD); +} + +static void * +vca_main(void *arg) +{ + unsigned u; + struct kevent ke; + int i; + struct sess *sp; + + (void)arg; + + kq = kqueue(); + assert(kq >= 0); + + + for (u = 0; u < HERITAGE_NSOCKS; u++) { + if (heritage.sock_local[u] >= 0) { + memset(&ke, 0, sizeof ke); + EV_SET(&ke, heritage.sock_local[u], + EVFILT_READ, EV_ADD, 0, 0, accept_f); + AZ(kevent(kq, &ke, 1, NULL, 0, NULL)); + } + if (heritage.sock_remote[u] >= 0) { + memset(&ke, 0, sizeof ke); + EV_SET(&ke, heritage.sock_remote[u], + EVFILT_READ, EV_ADD, 0, 0, accept_f); + AZ(kevent(kq, &ke, 1, NULL, 0, NULL)); + } + } + + while (1) { + i = kevent(kq, NULL, 0, &ke, 1, NULL); + assert(i == 1); +#if 0 + printf("i = %d\n", i); + printf("ke.ident = %ju\n", (uintmax_t)ke.ident); + printf("ke.filter = %u\n", ke.filter); + printf("ke.flags = %u\n", ke.flags); + printf("ke.fflags = %u\n", ke.fflags); + printf("ke.data = %jd\n", (intmax_t)ke.data); + printf("ke.udata = %p\n", ke.udata); +#endif + if (ke.udata == accept_f) { + accept_f(ke.ident); + continue; + } + CAST_OBJ_NOTNULL(sp, ke.udata, SESS_MAGIC); + if (ke.filter == EVFILT_READ) { + i = http_RecvSome(sp->fd, sp->http); + if (i == -1) + continue; + vca_kq_sess(sp, EV_DELETE); + vca_handover(sp, i); + continue; + } + if (ke.filter == EVFILT_TIMER) { + vca_kq_sess(sp, EV_DELETE); + vca_close_session(sp, "timeout"); + vca_return_session(sp); + continue; + } + INCOMPL(); + } + + INCOMPL(); } /*--------------------------------------------------------------------*/ @@ -202,12 +348,29 @@ void vca_return_session(struct sess *sp) { - if (sp->fd >= 0) { - VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); - assert(sizeof sp == write(pipes[1], &sp, sizeof sp)); - } else { + if (sp->fd < 0) { SES_Delete(sp); + return; } + VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); + if (http_RecvPrepAgain(sp->http)) + vca_handover(sp, 0); + else + vca_kq_sess(sp, EV_ADD); +} + +#endif /* ACCEPTOR_USE_KQUEUE */ + +/*--------------------------------------------------------------------*/ + +void +vca_close_session(struct sess *sp, const char *why) +{ + + VSL(SLT_SessionClose, sp->fd, why); + if (sp->fd >= 0) + AZ(close(sp->fd)); + sp->fd = -1; } /*--------------------------------------------------------------------*/ @@ -216,8 +379,6 @@ void VCA_Init(void) { - tick_rate.tv_sec = 1; - tick_rate.tv_usec = 0; AZ(pthread_create(&vca_thread, NULL, vca_main, NULL)); srandomdev(); xids = random(); diff --git a/varnish-cache/bin/varnishd/cache_http.c b/varnish-cache/bin/varnishd/cache_http.c index 99de6a1d..7a6bd5b9 100644 --- a/varnish-cache/bin/varnishd/cache_http.c +++ b/varnish-cache/bin/varnishd/cache_http.c @@ -436,14 +436,15 @@ http_header_complete(struct http *hp) if (++p > hp->v) return (0); hp->t = p; + assert(hp->t > hp->s); assert(hp->t <= hp->v); return (1); } /*--------------------------------------------------------------------*/ -static void -http_preprecv(struct http *hp) +void +http_RecvPrep(struct http *hp) { unsigned l; @@ -462,10 +463,19 @@ http_preprecv(struct http *hp) } } +int +http_RecvPrepAgain(struct http *hp) +{ + http_RecvPrep(hp); + if (hp->v == hp->s) + return (0); + return (http_header_complete(hp)); +} + /*--------------------------------------------------------------------*/ -static int -http_read_hdr(int fd, struct http *hp) +int +http_RecvSome(int fd, struct http *hp) { unsigned l; int i; @@ -507,55 +517,15 @@ http_read_hdr(int fd, struct http *hp) /*--------------------------------------------------------------------*/ -static void -http_read_f(int fd, short event, void *arg) -{ - struct http *hp; - int i; - - (void)event; - - CAST_OBJ_NOTNULL(hp, arg, HTTP_MAGIC); - i = http_read_hdr(fd, hp); - if (i < 0) - return; - - event_del(&hp->ev); - if (hp->callback != NULL) - hp->callback(hp->arg, i); -} - - -void -http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg) -{ - - CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC); - assert(func != NULL); - http_preprecv(hp); - if (hp->v != hp->s && http_header_complete(hp)) { - func(arg, 0); - return; - } - hp->callback = func; - hp->arg = arg; - event_set(&hp->ev, fd, EV_READ | EV_PERSIST, http_read_f, hp); - AZ(event_base_set(eb, &hp->ev)); - AZ(event_add(&hp->ev, NULL)); /* XXX: timeout */ - return; -} - -/*--------------------------------------------------------------------*/ - int http_RecvHead(struct http *hp, int fd) { int i; CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC); - http_preprecv(hp); + http_RecvPrep(hp); do - i = http_read_hdr(fd, hp); + i = http_RecvSome(fd, hp); while (i == -1); return (i); } -- 2.39.5