]> err.no Git - varnish/commitdiff
Bite the bullet and write an alternate acceptor which uses kqueue
authorphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Wed, 2 Aug 2006 09:34:40 +0000 (09:34 +0000)
committerphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Wed, 2 Aug 2006 09:34:40 +0000 (09:34 +0000)
directly instead of libevent.

Degeneralize the header reading code in cache_http.c which seems to
be cleaner anyway.

An #ifdef at the top of cache_acceptor.c selects which implementation
you want: libevent or kqueue.

git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@597 d4fa192b-c00b-0410-8231-f00ffab90ce4

varnish-cache/bin/varnishd/cache.h
varnish-cache/bin/varnishd/cache_acceptor.c
varnish-cache/bin/varnishd/cache_http.c

index 39011176050858bbeef53c0e2c910668f9988674..ad3acac60c08a4ab8380083d8ba22b1483319c5d 100644 (file)
@@ -46,8 +46,6 @@ enum step {
  * RSN: struct worker and struct session will have one of these embedded.
  */
 
-typedef void http_callback_f(void *, int bad);
-
 struct http_hdr {
        char                    *b;
        char                    *e;
@@ -56,9 +54,6 @@ struct http_hdr {
 struct http {
        unsigned                magic;
 #define HTTP_MAGIC             0x6428b5c9
-       struct event            ev;
-       http_callback_f         *callback;
-       void                    *arg;
 
        char                    *s;             /* (S)tart of buffer */
        char                    *t;             /* start of (T)railing data */
@@ -230,6 +225,7 @@ struct sess {
        int                     id;
        unsigned                xid;
 
+       struct event            ev;
        struct worker           *wrk;
 
        unsigned                sockaddrlen;
@@ -346,7 +342,9 @@ int http_GetStatus(struct http *hp);
 int http_HdrIs(struct http *hp, const char *hdr, const char *val);
 int http_GetTail(struct http *hp, unsigned len, char **b, char **e);
 int http_Read(struct http *hp, int fd, void *b, unsigned len);
-void http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg);
+void http_RecvPrep(struct http *hp);
+int http_RecvPrepAgain(struct http *hp);
+int http_RecvSome(int fd, struct http *hp);
 int http_RecvHead(struct http *hp, int fd);
 int http_DissectRequest(struct http *sp, int fd);
 int http_DissectResponse(struct http *sp, int fd);
index 6a3a121a6be7e980a1df9b6cd658d856fd6fc698..1e0ac873bbbc54e757706785068ced8309972ce4 100644 (file)
@@ -6,6 +6,9 @@
  * write the session pointer to a pipe which the event engine monitors.
  */
 
+#define ACCEPTOR_USE_KQUEUE
+#undef ACCEPTOR_USE_LIBEVENT
+
 #include <stdio.h>
 #include <errno.h>
 #include <string.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 
-#include <netdb.h>
-
 #include "config.h"
 #include "libvarnish.h"
 #include "heritage.h"
 #include "shmlog.h"
 #include "cache.h"
 
+static pthread_t vca_thread;
+static unsigned                xids;
+
+static struct sess *
+vca_accept_sess(int fd)
+{
+       socklen_t l;
+       struct sockaddr addr[2];        /* XXX: IPv6 hack */
+       struct sess *sp;
+       int i;
+       struct linger linger;
+
+       VSL_stats->client_conn++;
+
+       l = sizeof addr;
+       i = accept(fd, addr, &l);
+       if (i < 0) {
+               VSL(SLT_Debug, fd, "Accept failed errno=%d", errno);
+               /* XXX: stats ? */
+               return (NULL);
+       }
+       sp = SES_New(addr, l);
+       assert(sp != NULL);     /* XXX handle */
+
+       sp->fd = i;
+       sp->id = i;
+
+#ifdef SO_NOSIGPIPE /* XXX Linux */
+       i = 1;
+       AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i));
+#endif
+#ifdef SO_LINGER /* XXX Linux*/
+       linger.l_onoff = 0;
+       linger.l_linger = 0;
+       AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger));
+#endif
+
+       TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port);
+       VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port);
+       return (sp);
+}
+
+static void
+vca_handover(struct sess *sp, int bad)
+{
+
+       if (bad) {
+               vca_close_session(sp,
+                   bad == 1 ? "overflow" : "no request");
+               vca_return_session(sp);
+               return;
+       }
+       sp->step = STP_RECV;
+       VSL_stats->client_req++;
+       sp->xid = xids++;
+       VSL(SLT_XID, sp->fd, "%u", sp->xid);
+       WRK_QueueSession(sp);
+}
+
+#ifdef ACCEPTOR_USE_LIBEVENT
+
 static struct event_base *evb;
 static struct event pipe_e;
 static int pipes[2];
@@ -31,12 +93,11 @@ static int pipes[2];
 static struct event tick_e;
 static struct timeval tick_rate;
 
-static pthread_t vca_thread;
-static unsigned                xids;
-
 static struct event accept_e[2 * HERITAGE_NSOCKS];
 static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
 
+/*--------------------------------------------------------------------*/
+
 static void
 vca_tick(int a, short b, void *c)
 {
@@ -59,25 +120,33 @@ vca_tick(int a, short b, void *c)
 }
 
 static void
-vca_callback(void *arg, int bad)
+vca_rcvhd_f(int fd, short event, void *arg)
 {
        struct sess *sp;
+       int i;
+
+       (void)event;
 
        CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC);
-       TAILQ_REMOVE(&sesshead, sp, list);
-       if (bad) {
-               if (bad == 1)
-                       vca_close_session(sp, "overflow");
-               else
-                       vca_close_session(sp, "no request");
-               vca_return_session(sp);
+       i = http_RecvSome(fd, sp->http);
+       if (i < 0)
                return;
-       }
-       sp->step = STP_RECV;
-       VSL_stats->client_req++;
-       sp->xid = xids++;
-       VSL(SLT_XID, sp->fd, "%u", sp->xid);
-       WRK_QueueSession(sp);
+
+       event_del(&sp->ev);
+       TAILQ_REMOVE(&sesshead, sp, list);
+       vca_handover(sp, i);
+}
+
+static void
+vca_rcvhdev(struct sess *sp)
+{
+
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+       TAILQ_INSERT_TAIL(&sesshead, sp, list);
+       event_set(&sp->ev, sp->fd, EV_READ | EV_PERSIST, vca_rcvhd_f, sp);
+       AZ(event_base_set(evb, &sp->ev));
+       AZ(event_add(&sp->ev, NULL));      /* XXX: timeout */
 }
 
 static void
@@ -90,53 +159,27 @@ pipe_f(int fd, short event, void *arg)
        (void)arg;
        i = read(fd, &sp, sizeof sp);
        assert(i == sizeof sp);
-       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-       TAILQ_INSERT_TAIL(&sesshead, sp, list);
-       http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp);
+       if (http_RecvPrepAgain(sp->http)) {
+               vca_handover(sp, 0);
+               return;
+       }
+       vca_rcvhdev(sp);
 }
 
 static void
 accept_f(int fd, short event, void *arg)
 {
-       socklen_t l;
-       struct sockaddr addr[2];        /* XXX: IPv6 hack */
        struct sess *sp;
-       int i;
-       struct linger linger;
 
        (void)event;
        (void)arg;
-       VSL_stats->client_conn++;
-
 
-       l = sizeof addr;
-       i = accept(fd, addr, &l);
-       if (i < 0) {
-               VSL(SLT_Debug, fd, "Accept failed errno=%d", errno);
-               /* XXX: stats ? */
+       sp = vca_accept_sess(fd);
+       if (sp == NULL)
                return;
-       }
-       sp = SES_New(addr, l);
-       assert(sp != NULL);     /* XXX handle */
-
-       sp->fd = i;
-       sp->id = i;
-
-#ifdef SO_NOSIGPIPE /* XXX Linux */
-       i = 1;
-       AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i));
-#endif
-#ifdef SO_LINGER /* XXX Linux*/
-       linger.l_onoff = 0;
-       linger.l_linger = 0;
-       AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger));
-#endif
 
-       TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port);
-       VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port);
-       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-       TAILQ_INSERT_TAIL(&sesshead, sp, list);
-       http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp);
+       http_RecvPrep(sp->http);
+       vca_rcvhdev(sp);
 }
 
 static void *
@@ -147,6 +190,8 @@ vca_main(void *arg)
 
        (void)arg;
 
+       tick_rate.tv_sec = 1;
+       tick_rate.tv_usec = 0;
        AZ(pipe(pipes));
        evb = event_init();
        assert(evb != NULL);
@@ -187,13 +232,114 @@ vca_main(void *arg)
 /*--------------------------------------------------------------------*/
 
 void
-vca_close_session(struct sess *sp, const char *why)
+vca_return_session(struct sess *sp)
 {
 
-       VSL(SLT_SessionClose, sp->fd, why);
-       if (sp->fd >= 0)
-               AZ(close(sp->fd));
-       sp->fd = -1;
+       if (sp->fd < 0) {
+               SES_Delete(sp);
+               return;
+       }
+       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+       assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
+}
+
+#endif /* ACCEPTOR_USE_LIBEVENT */
+
+#ifdef ACCEPTOR_USE_KQUEUE
+#include <sys/event.h>
+
+static int kq = -1;
+
+static void
+vca_kq_sess(struct sess *sp, int arm)
+{
+       struct kevent ke[2];
+
+       assert(arm == EV_ADD || arm == EV_DELETE);
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       memset(ke, 0, sizeof ke);
+       EV_SET(&ke[0], sp->fd, EVFILT_READ, arm, 0, 0, sp);
+       EV_SET(&ke[1], sp->fd, EVFILT_TIMER, arm , 0, 5000, sp);
+       AZ(kevent(kq, ke, 2, NULL, 0, NULL));
+}
+
+static void
+accept_f(int fd)
+{
+       struct sess *sp;
+
+       sp = vca_accept_sess(fd);
+       if (sp == NULL)
+               return;
+       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+       http_RecvPrep(sp->http);
+       vca_kq_sess(sp, EV_ADD);
+}
+
+static void *
+vca_main(void *arg)
+{
+       unsigned u;
+       struct kevent ke;
+       int i;
+       struct sess *sp;
+
+       (void)arg;
+
+       kq = kqueue();
+       assert(kq >= 0);
+
+
+       for (u = 0; u < HERITAGE_NSOCKS; u++) {
+               if (heritage.sock_local[u] >= 0) {
+                       memset(&ke, 0, sizeof ke);
+                       EV_SET(&ke, heritage.sock_local[u],
+                           EVFILT_READ, EV_ADD, 0, 0, accept_f);
+                       AZ(kevent(kq, &ke, 1, NULL, 0, NULL));
+               }
+               if (heritage.sock_remote[u] >= 0) {
+                       memset(&ke, 0, sizeof ke);
+                       EV_SET(&ke, heritage.sock_remote[u],
+                           EVFILT_READ, EV_ADD, 0, 0, accept_f);
+                       AZ(kevent(kq, &ke, 1, NULL, 0, NULL));
+               }
+       }
+
+       while (1) {
+               i = kevent(kq, NULL, 0, &ke, 1, NULL);
+               assert(i == 1);
+#if 0
+               printf("i = %d\n", i);
+               printf("ke.ident = %ju\n", (uintmax_t)ke.ident);
+               printf("ke.filter = %u\n", ke.filter);
+               printf("ke.flags = %u\n", ke.flags);
+               printf("ke.fflags = %u\n", ke.fflags);
+               printf("ke.data = %jd\n", (intmax_t)ke.data);
+               printf("ke.udata = %p\n", ke.udata);
+#endif
+               if (ke.udata == accept_f) {
+                       accept_f(ke.ident);
+                       continue;
+               }
+               CAST_OBJ_NOTNULL(sp, ke.udata, SESS_MAGIC);
+               if (ke.filter == EVFILT_READ) {
+                       i = http_RecvSome(sp->fd, sp->http);
+                       if (i == -1)
+                               continue;
+                       vca_kq_sess(sp, EV_DELETE);
+                       vca_handover(sp, i);
+                       continue;
+               }
+               if (ke.filter == EVFILT_TIMER) {
+                       vca_kq_sess(sp, EV_DELETE);
+                       vca_close_session(sp, "timeout");
+                       vca_return_session(sp);
+                       continue;
+               } 
+               INCOMPL();
+       }
+
+       INCOMPL();
 }
 
 /*--------------------------------------------------------------------*/
@@ -202,12 +348,29 @@ void
 vca_return_session(struct sess *sp)
 {
 
-       if (sp->fd >= 0) {
-               VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-               assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
-       } else {
+       if (sp->fd < 0) {
                SES_Delete(sp);
+               return;
        }
+       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+       if (http_RecvPrepAgain(sp->http))
+               vca_handover(sp, 0);
+       else
+               vca_kq_sess(sp, EV_ADD);
+}
+
+#endif /* ACCEPTOR_USE_KQUEUE */
+
+/*--------------------------------------------------------------------*/
+
+void
+vca_close_session(struct sess *sp, const char *why)
+{
+
+       VSL(SLT_SessionClose, sp->fd, why);
+       if (sp->fd >= 0)
+               AZ(close(sp->fd));
+       sp->fd = -1;
 }
 
 /*--------------------------------------------------------------------*/
@@ -216,8 +379,6 @@ void
 VCA_Init(void)
 {
 
-       tick_rate.tv_sec = 1;
-       tick_rate.tv_usec = 0;
        AZ(pthread_create(&vca_thread, NULL, vca_main, NULL));
        srandomdev();
        xids = random();
index 99de6a1d84fe303d377783eee418fb5843280a38..7a6bd5b9ed6d0f0a5cd6e28e1f2fb3f341628127 100644 (file)
@@ -436,14 +436,15 @@ http_header_complete(struct http *hp)
        if (++p > hp->v)
                return (0);
        hp->t = p;
+       assert(hp->t > hp->s);
        assert(hp->t <= hp->v);
        return (1);
 }
 
 /*--------------------------------------------------------------------*/
 
-static void
-http_preprecv(struct http *hp)
+void
+http_RecvPrep(struct http *hp)
 {
        unsigned l;
 
@@ -462,10 +463,19 @@ http_preprecv(struct http *hp)
        }
 }
 
+int
+http_RecvPrepAgain(struct http *hp)
+{
+       http_RecvPrep(hp);
+       if (hp->v == hp->s)
+               return (0);
+       return (http_header_complete(hp));
+}
+
 /*--------------------------------------------------------------------*/
 
-static int
-http_read_hdr(int fd, struct http *hp)
+int
+http_RecvSome(int fd, struct http *hp)
 {
        unsigned l;
        int i;
@@ -507,55 +517,15 @@ http_read_hdr(int fd, struct http *hp)
 
 /*--------------------------------------------------------------------*/
 
-static void
-http_read_f(int fd, short event, void *arg)
-{
-       struct http *hp;
-       int i;
-
-       (void)event;
-
-       CAST_OBJ_NOTNULL(hp, arg, HTTP_MAGIC);
-       i = http_read_hdr(fd, hp);
-       if (i < 0)
-               return;
-
-       event_del(&hp->ev);
-       if (hp->callback != NULL)
-               hp->callback(hp->arg, i);
-}
-
-
-void
-http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg)
-{
-
-       CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC);
-       assert(func != NULL);
-       http_preprecv(hp);
-       if (hp->v != hp->s && http_header_complete(hp)) {
-               func(arg, 0);
-               return;
-       }
-       hp->callback = func;
-       hp->arg = arg;
-       event_set(&hp->ev, fd, EV_READ | EV_PERSIST, http_read_f, hp);
-       AZ(event_base_set(eb, &hp->ev));
-       AZ(event_add(&hp->ev, NULL));      /* XXX: timeout */
-       return;
-}
-
-/*--------------------------------------------------------------------*/
-
 int
 http_RecvHead(struct http *hp, int fd)
 {
        int i;
 
        CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC);
-       http_preprecv(hp);
+       http_RecvPrep(hp);
        do 
-               i = http_read_hdr(fd, hp);
+               i = http_RecvSome(fd, hp);
        while (i == -1);
        return (i);
 }