]> err.no Git - varnish/commitdiff
Break the acceptors out into their own files.
authorphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Mon, 21 Aug 2006 10:59:00 +0000 (10:59 +0000)
committerphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Mon, 21 Aug 2006 10:59:00 +0000 (10:59 +0000)
The intent here is to compile in all acceptors supported on the
operating system and allow the user to select one at startup time.

git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@862 d4fa192b-c00b-0410-8231-f00ffab90ce4

varnish-cache/bin/varnishd/Makefile.am
varnish-cache/bin/varnishd/cache_acceptor.c
varnish-cache/bin/varnishd/cache_acceptor_epoll.c [new file with mode: 0644]
varnish-cache/bin/varnishd/cache_acceptor_kqueue.c [new file with mode: 0644]
varnish-cache/bin/varnishd/cache_acceptor_poll.c [new file with mode: 0644]

index 41586d9fffcfe18f7d893913b509b8ee4c77f8b9..b16e2a2e42d79f76fc18b6eb2cd4d5fbb31e7915 100644 (file)
@@ -8,6 +8,9 @@ dist_man_MANS = varnishd.1
 
 varnishd_SOURCES = \
        cache_acceptor.c \
+       cache_acceptor_epoll.c \
+       cache_acceptor_poll.c \
+       cache_acceptor_kqueue.c \
        cache_backend.c \
        cache_ban.c \
        cache_center.c \
index e2a40c1f47eef675c668f186be0efd2e0e1e0c7a..3abc20ecb3eb656a9b22838be0ffd12fe1a62f03 100644 (file)
@@ -9,7 +9,7 @@
 #undef ACCEPTOR_USE_KQUEUE
 #undef ACCEPTOR_USE_EPOLL
 #undef ACCEPTOR_USE_POLL
-
+    
 #if defined(HAVE_KQUEUE)
 #define ACCEPTOR_USE_KQUEUE 1
 #elif defined(HAVE_EPOLL_CTL)
 #include "heritage.h"
 #include "shmlog.h"
 #include "cache.h"
+#include "cache_acceptor.h"
+
+
+static struct acceptor *vca_acceptors[] = {
+#if defined(HAVE_KQUEUE)
+       &acceptor_kqueue,
+#endif
+#if defined(HAVE_EPOLL_CTL)
+       &acceptor_epoll,
+#endif
+#if defined(HAVE_POLL_CTL)
+       &acceptor_poll,
+#endif
+       NULL,
+};
 
-static pthread_t vca_thread;
 static unsigned                xids;
 
-static struct sess *
+struct sess *
 vca_accept_sess(int fd)
 {
        socklen_t l;
@@ -90,7 +104,7 @@ vca_accept_sess(int fd)
        return (sp);
 }
 
-static void
+void
 vca_handover(struct sess *sp, int bad)
 {
 
@@ -107,451 +121,24 @@ vca_handover(struct sess *sp, int bad)
        WRK_QueueSession(sp);
 }
 
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_POLL
-
-#include <poll.h>
-
-static struct pollfd *pollfd;
-static unsigned npoll;
-
-static int pipes[2];
-
-static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
-
-/*--------------------------------------------------------------------*/
-
-static void
-vca_pollspace(int fd)
-{
-       struct pollfd *p;
-       unsigned u, v;
-
-       if (fd < npoll)
-               return;
-       if (npoll == 0)
-               npoll = 16;
-       for (u = npoll; fd >= u; )
-               u += u;
-       VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u);
-       p = realloc(pollfd, u * sizeof *p);
-       assert(p != NULL);
-       memset(p + npoll, 0, (u - npoll) * sizeof *p);
-       for (v = npoll ; v <= u; v++) 
-               p->fd = -1;
-       pollfd = p;
-       npoll = u;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vca_poll(int fd)
-{
-       vca_pollspace(fd);
-       pollfd[fd].fd = fd;
-       pollfd[fd].events = POLLIN;
-}
-
-static void
-vca_unpoll(int fd)
-{
-       vca_pollspace(fd);
-       pollfd[fd].fd = -1;
-       pollfd[fd].events = 0;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vca_rcvhdev(struct sess *sp)
-{
-
-       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-       TAILQ_INSERT_TAIL(&sesshead, sp, list);
-       vca_poll(sp->fd);
-}
-
-static void
-accept_f(int fd)
-{
-       struct sess *sp;
-
-       sp = vca_accept_sess(fd);
-       if (sp == NULL)
-               return;
-
-       http_RecvPrep(sp->http);
-       vca_rcvhdev(sp);
-}
-
-static void *
-vca_main(void *arg)
-{
-       unsigned u, v;
-       struct sess *sp, *sp2;
-       struct timespec t;
-       int i;
-
-       (void)arg;
-
-       AZ(pipe(pipes));
-       vca_poll(pipes[0]);
-
-       if (heritage.socket >= 0)
-               vca_poll(heritage.socket);
-
-       while (1) {
-               v = poll(pollfd, npoll, 5000);
-               if (v && pollfd[pipes[0]].revents) {
-                       v--;
-                       i = read(pipes[0], &sp, sizeof sp);
-                       assert(i == sizeof sp);
-                       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-                       if (http_RecvPrepAgain(sp->http))
-                               vca_handover(sp, 0);
-                       else
-                               vca_rcvhdev(sp);
-               }
-               if (heritage.socket >= 0 &&
-                   pollfd[heritage.socket].revents) {
-                       accept_f(heritage.socket);
-                       v--;
-               }
-               clock_gettime(CLOCK_MONOTONIC, &t);
-               TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
-                       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-                       if (pollfd[sp->fd].revents) {
-                               v--;
-                               i = http_RecvSome(sp->fd, sp->http);
-                               if (i < 0)
-                                       continue;
-
-                               vca_unpoll(sp->fd);
-                               TAILQ_REMOVE(&sesshead, sp, list);
-                               vca_handover(sp, i);
-                               continue;
-                       }
-                       if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
-                               TAILQ_REMOVE(&sesshead, sp, list);
-                               vca_unpoll(sp->fd);
-                               vca_close_session(sp, "timeout");
-                               vca_return_session(sp);
-                               continue;
-                       }
-                       if (v == 0)
-                               break;
-               }
-       }
-
-       INCOMPL();
-}
-
 /*--------------------------------------------------------------------*/
 
 void
-vca_return_session(struct sess *sp)
-{
-
-       if (sp->fd < 0) {
-               SES_Delete(sp);
-               return;
-       }
-       (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
-       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-       assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
-}
-
-#endif /* ACCEPTOR_USE_POLL */
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_EPOLL
-
-#include <sys/epoll.h>
-
-static int epfd = -1;
-static int pipes[2];
-
-static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
-
-static void
-vca_add(int fd, void *data)
-{
-       struct epoll_event ev = { EPOLLIN | EPOLLPRI, { data } };
-       AZ(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev));
-}
-
-static void
-vca_del(int fd)
-{
-       AZ(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL));
-}
-
-static void
-vca_rcvhdev(struct sess *sp)
-{
-
-       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-       TAILQ_INSERT_TAIL(&sesshead, sp, list);
-       vca_add(sp->fd, sp);
-}
-
-static void
-accept_f(int fd)
-{
-       struct sess *sp;
-
-       sp = vca_accept_sess(fd);
-       if (sp == NULL)
-               return;
-       http_RecvPrep(sp->http);
-       vca_rcvhdev(sp);
-}
-
-static void *
-vca_main(void *arg)
-{
-       struct epoll_event ev;
-       struct timespec t;
-       struct sess *sp, *sp2;
-       int i;
-
-       (void)arg;
-
-       epfd = epoll_create(16);
-       assert(epfd >= 0);
-
-       AZ(pipe(pipes));
-       vca_add(pipes[0], pipes);
-
-       if (heritage.socket >= 0)
-               vca_add(heritage.socket, accept_f);
-
-       while (1) {
-               if (epoll_wait(epfd, &ev, 1, 5000) > 0) {
-                       if (ev.data.ptr == pipes) {
-                               i = read(pipes[0], &sp, sizeof sp);
-                               assert(i == sizeof sp);
-                               CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-                               if (http_RecvPrepAgain(sp->http))
-                                       vca_handover(sp, 0);
-                               else
-                                       vca_rcvhdev(sp);
-                       } else if (ev.data.ptr == accept_f) {
-                               accept_f(heritage.socket);
-                       } else {
-                               CAST_OBJ_NOTNULL(sp, ev.data.ptr, SESS_MAGIC);
-                               i = http_RecvSome(sp->fd, sp->http);
-                               if (i != -1) {
-                                       TAILQ_REMOVE(&sesshead, sp, list);
-                                       vca_del(sp->fd);
-                                       vca_handover(sp, i);
-                               }
-                       }
-               }
-               /* check for timeouts */
-               clock_gettime(CLOCK_MONOTONIC, &t);
-               TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
-                       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-                       if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
-                               TAILQ_REMOVE(&sesshead, sp, list);
-                               vca_del(sp->fd);
-                               vca_close_session(sp, "timeout");
-                               vca_return_session(sp);
-                               continue;
-                       }
-               }
-       }
-
-       INCOMPL();
-}
-
-/*--------------------------------------------------------------------*/
-
-void
-vca_return_session(struct sess *sp)
-{
-
-       if (sp->fd < 0) {
-               SES_Delete(sp);
-               return;
-       }
-       (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
-       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-       assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
-}
-
-#endif /* ACCEPTOR_USE_EPOLL */
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_KQUEUE
-
-#include <sys/event.h>
-
-static int kq = -1;
-
-static void
-vca_kq_sess(struct sess *sp, int arm)
+vca_close_session(struct sess *sp, const char *why)
 {
-       struct kevent ke[2];
-       int i, j, arm2;
 
-       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-       memset(ke, 0, sizeof ke);
-       if (arm == EV_ADD || arm == EV_ENABLE) {
-               assert(sp->kqa == 0);
-               sp->kqa = 1;
-               arm2 = EV_ADD;
-       } else  {
-               assert(sp->kqa == 1);
-               sp->kqa = 0;
-               arm2 = EV_DELETE;
-       }
-       j = 0;
-       EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
-           0, params->sess_timeout * 1000, sp);
+       VSL(SLT_SessionClose, sp->fd, why);
        if (sp->fd >= 0)
-               EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
-
-       i = kevent(kq, ke, j, NULL, 0, NULL);
-       assert(i == 0);
-}
-
-static struct sess *
-vca_kev(struct kevent *kp)
-{
-       int i;
-       struct sess *sp;
-
-       if (kp->udata == vca_accept_sess) {
-               while (kp->data-- > 0) {
-                       sp = vca_accept_sess(kp->ident);
-                       if (sp == NULL)
-                               return (NULL);
-                       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-                       http_RecvPrep(sp->http);
-                       vca_kq_sess(sp, EV_ADD);
-               }
-               return (NULL);
-       }
-       if (kp->udata == NULL) {
-               VSL(SLT_Debug, 0,
-                   "KQ RACE %s flags %x fflags %x data %x",
-                   kp->filter == EVFILT_READ ? "R" : "T",
-                   kp->flags, kp->fflags, kp->data);
-               return (NULL);
-       }
-       CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
-       if (sp->kqa == 0) {
-               VSL(SLT_Debug, sp->id,
-                   "KQ %s flags %x fflags %x data %x",
-                   kp->filter == EVFILT_READ ? "R" : "T",
-                   kp->flags, kp->fflags, kp->data);
-               return (NULL);
-       }
-       if (kp->filter == EVFILT_READ) {
-               if (kp->data > 0) {
-                       i = http_RecvSome(sp->fd, sp->http);
-                       switch (i) {
-                       case -1:
-                               return (NULL);
-                       case 0:
-                               vca_kq_sess(sp, EV_DISABLE);
-                               vca_handover(sp, i);
-                               return (NULL);   /* ?? */
-                       case 1:
-                               vca_close_session(sp, "overflow");
-                               break;
-                       case 2:
-                               vca_close_session(sp, "no request");
-                               break;
-                       default:
-                               INCOMPL();
-                       }
-                       return (sp);
-               }
-               if (kp->flags == EV_EOF) {
-                       vca_close_session(sp, "EOF");
-                       return (sp);
-               }
-               INCOMPL();
-       }
-       if (kp->filter == EVFILT_TIMER) {
-               vca_close_session(sp, "timeout");
-               return (sp);
-       }
-       INCOMPL();
-}
-
-
-#define NKEV   100
-
-static void *
-vca_main(void *arg)
-{
-       struct kevent ke[NKEV], *kp;
-       int i, j, n;
-       struct sess *sp;
-
-       (void)arg;
-
-       kq = kqueue();
-       assert(kq >= 0);
-
-
-       assert(heritage.socket >= 0);
-       EV_SET(&ke[0], heritage.socket,
-           EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess);
-       AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL));
-
-       while (1) {
-               n = kevent(kq, NULL, 0, ke, NKEV, NULL);
-               assert(n >= 1 && n <= NKEV);
-               for (kp = ke, j = 0; j < n; j++, kp++) {
-                       sp = vca_kev(kp);
-                       if (sp != NULL) {
-                               vca_kq_sess(sp, EV_DELETE);
-                               SES_Delete(sp);
-                               for (i = j; i < n; i++)
-                                       if (ke[i].udata == sp)
-                                               ke[i].udata = NULL;
-                       }
-               }
-       }
-       INCOMPL();
+               AZ(close(sp->fd));
+       sp->fd = -1;
 }
 
-/*--------------------------------------------------------------------*/
-
 void
 vca_return_session(struct sess *sp)
 {
 
-       if (sp->fd < 0) {
-               SES_Delete(sp);
-               return;
-       }
-       (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
-       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-       if (http_RecvPrepAgain(sp->http))
-               vca_handover(sp, 0);
-       else 
-               vca_kq_sess(sp, EV_ENABLE);
-}
-
-#endif /* ACCEPTOR_USE_KQUEUE */
-/*====================================================================*/
-
-/*--------------------------------------------------------------------*/
-
-void
-vca_close_session(struct sess *sp, const char *why)
-{
-
-       VSL(SLT_SessionClose, sp->fd, why);
-       if (sp->fd >= 0)
-               AZ(close(sp->fd));
-       sp->fd = -1;
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       vca_acceptors[0]->recycle(sp);
 }
 
 /*--------------------------------------------------------------------*/
@@ -560,7 +147,9 @@ void
 VCA_Init(void)
 {
 
-       AZ(pthread_create(&vca_thread, NULL, vca_main, NULL));
        srandomdev();
        xids = random();
+
+       /* XXX: Add selector mechanism at some point */
+       vca_acceptors[0]->init();
 }
diff --git a/varnish-cache/bin/varnishd/cache_acceptor_epoll.c b/varnish-cache/bin/varnishd/cache_acceptor_epoll.c
new file mode 100644 (file)
index 0000000..befe654
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused.  Not sure what the most efficient way is for that.  For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_EPOLL_CTL)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_epoll_thread;
+static int epfd = -1;
+static int pipes[2];
+
+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+
+static void
+vca_add(int fd, void *data)
+{
+       struct epoll_event ev = { EPOLLIN | EPOLLPRI, { data } };
+       AZ(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev));
+}
+
+static void
+vca_del(int fd)
+{
+       AZ(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL));
+}
+
+static void
+vca_rcvhdev(struct sess *sp)
+{
+
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+       TAILQ_INSERT_TAIL(&sesshead, sp, list);
+       vca_add(sp->fd, sp);
+}
+
+static void
+accept_f(int fd)
+{
+       struct sess *sp;
+
+       sp = vca_accept_sess(fd);
+       if (sp == NULL)
+               return;
+       http_RecvPrep(sp->http);
+       vca_rcvhdev(sp);
+}
+
+static void *
+vca_main(void *arg)
+{
+       struct epoll_event ev;
+       struct timespec t;
+       struct sess *sp, *sp2;
+       int i;
+
+       (void)arg;
+
+       epfd = epoll_create(16);
+       assert(epfd >= 0);
+
+       AZ(pipe(pipes));
+       vca_add(pipes[0], pipes);
+
+       if (heritage.socket >= 0)
+               vca_add(heritage.socket, accept_f);
+
+       while (1) {
+               if (epoll_wait(epfd, &ev, 1, 5000) > 0) {
+                       if (ev.data.ptr == pipes) {
+                               i = read(pipes[0], &sp, sizeof sp);
+                               assert(i == sizeof sp);
+                               CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+                               if (http_RecvPrepAgain(sp->http))
+                                       vca_handover(sp, 0);
+                               else
+                                       vca_rcvhdev(sp);
+                       } else if (ev.data.ptr == accept_f) {
+                               accept_f(heritage.socket);
+                       } else {
+                               CAST_OBJ_NOTNULL(sp, ev.data.ptr, SESS_MAGIC);
+                               i = http_RecvSome(sp->fd, sp->http);
+                               if (i != -1) {
+                                       TAILQ_REMOVE(&sesshead, sp, list);
+                                       vca_del(sp->fd);
+                                       vca_handover(sp, i);
+                               }
+                       }
+               }
+               /* check for timeouts */
+               clock_gettime(CLOCK_MONOTONIC, &t);
+               TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
+                       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+                       if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
+                               TAILQ_REMOVE(&sesshead, sp, list);
+                               vca_del(sp->fd);
+                               vca_close_session(sp, "timeout");
+                               vca_return_session(sp);
+                               continue;
+                       }
+               }
+       }
+
+       INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_epoll_recycle(struct sess *sp)
+{
+
+       if (sp->fd < 0) {
+               SES_Delete(sp);
+               return;
+       }
+       (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+       assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
+}
+
+static void
+vca_epoll_init(void)
+{
+       AZ(pthread_create(&vca_epoll_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_epoll = {
+       .name =         "epoll",
+       .init =         vca_epoll_init,
+       .recycle =      vca_epoll_recycle,
+};
+
+#endif /* defined(HAVE_EPOLL_CTL) */
diff --git a/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c b/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c
new file mode 100644 (file)
index 0000000..4be6827
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused.  Not sure what the most efficient way is for that.  For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_KQUEUE)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/event.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_kqueue_thread;
+static int kq = -1;
+
+static void
+vca_kq_sess(struct sess *sp, int arm)
+{
+       struct kevent ke[2];
+       int i, j, arm2;
+
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       memset(ke, 0, sizeof ke);
+       if (arm == EV_ADD || arm == EV_ENABLE) {
+               assert(sp->kqa == 0);
+               sp->kqa = 1;
+               arm2 = EV_ADD;
+       } else  {
+               assert(sp->kqa == 1);
+               sp->kqa = 0;
+               arm2 = EV_DELETE;
+       }
+       j = 0;
+       EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
+           0, params->sess_timeout * 1000, sp);
+       if (sp->fd >= 0)
+               EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
+
+       i = kevent(kq, ke, j, NULL, 0, NULL);
+       assert(i == 0);
+}
+
+static struct sess *
+vca_kev(struct kevent *kp)
+{
+       int i;
+       struct sess *sp;
+
+       if (kp->udata == vca_accept_sess) {
+               while (kp->data-- > 0) {
+                       sp = vca_accept_sess(kp->ident);
+                       if (sp == NULL)
+                               return (NULL);
+                       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+                       http_RecvPrep(sp->http);
+                       vca_kq_sess(sp, EV_ADD);
+               }
+               return (NULL);
+       }
+       if (kp->udata == NULL) {
+               VSL(SLT_Debug, 0,
+                   "KQ RACE %s flags %x fflags %x data %x",
+                   kp->filter == EVFILT_READ ? "R" : "T",
+                   kp->flags, kp->fflags, kp->data);
+               return (NULL);
+       }
+       CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
+       if (sp->kqa == 0) {
+               VSL(SLT_Debug, sp->id,
+                   "KQ %s flags %x fflags %x data %x",
+                   kp->filter == EVFILT_READ ? "R" : "T",
+                   kp->flags, kp->fflags, kp->data);
+               return (NULL);
+       }
+       if (kp->filter == EVFILT_READ) {
+               if (kp->data > 0) {
+                       i = http_RecvSome(sp->fd, sp->http);
+                       switch (i) {
+                       case -1:
+                               return (NULL);
+                       case 0:
+                               vca_kq_sess(sp, EV_DISABLE);
+                               vca_handover(sp, i);
+                               return (NULL);   /* ?? */
+                       case 1:
+                               vca_close_session(sp, "overflow");
+                               break;
+                       case 2:
+                               vca_close_session(sp, "no request");
+                               break;
+                       default:
+                               INCOMPL();
+                       }
+                       return (sp);
+               }
+               if (kp->flags == EV_EOF) {
+                       vca_close_session(sp, "EOF");
+                       return (sp);
+               }
+               INCOMPL();
+       }
+       if (kp->filter == EVFILT_TIMER) {
+               vca_close_session(sp, "timeout");
+               return (sp);
+       }
+       INCOMPL();
+}
+
+
+#define NKEV   100
+
+static void *
+vca_main(void *arg)
+{
+       struct kevent ke[NKEV], *kp;
+       int i, j, n;
+       struct sess *sp;
+
+       (void)arg;
+
+       kq = kqueue();
+       assert(kq >= 0);
+
+
+       assert(heritage.socket >= 0);
+       EV_SET(&ke[0], heritage.socket,
+           EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess);
+       AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL));
+
+       while (1) {
+               n = kevent(kq, NULL, 0, ke, NKEV, NULL);
+               assert(n >= 1 && n <= NKEV);
+               for (kp = ke, j = 0; j < n; j++, kp++) {
+                       sp = vca_kev(kp);
+                       if (sp != NULL) {
+                               vca_kq_sess(sp, EV_DELETE);
+                               SES_Delete(sp);
+                               for (i = j; i < n; i++)
+                                       if (ke[i].udata == sp)
+                                               ke[i].udata = NULL;
+                       }
+               }
+       }
+       INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_kqueue_recycle(struct sess *sp)
+{
+
+       if (sp->fd < 0) {
+               SES_Delete(sp);
+               return;
+       }
+       (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+       if (http_RecvPrepAgain(sp->http))
+               vca_handover(sp, 0);
+       else 
+               vca_kq_sess(sp, EV_ENABLE);
+}
+
+static void
+vca_kqueue_init(void)
+{
+       AZ(pthread_create(&vca_kqueue_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_kqueue = {
+       .name =         "kqueue",
+       .init =         vca_kqueue_init,
+       .recycle =      vca_kqueue_recycle,
+};
+
+#endif /* defined(HAVE_KQUEUE) */
diff --git a/varnish-cache/bin/varnishd/cache_acceptor_poll.c b/varnish-cache/bin/varnishd/cache_acceptor_poll.c
new file mode 100644 (file)
index 0000000..3eebfe5
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused.  Not sure what the most efficient way is for that.  For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_POLL)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <poll.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_poll_thread;
+static struct pollfd *pollfd;
+static unsigned npoll;
+
+static int pipes[2];
+
+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_pollspace(int fd)
+{
+       struct pollfd *p;
+       unsigned u, v;
+
+       if (fd < npoll)
+               return;
+       if (npoll == 0)
+               npoll = 16;
+       for (u = npoll; fd >= u; )
+               u += u;
+       VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u);
+       p = realloc(pollfd, u * sizeof *p);
+       assert(p != NULL);
+       memset(p + npoll, 0, (u - npoll) * sizeof *p);
+       for (v = npoll ; v <= u; v++) 
+               p->fd = -1;
+       pollfd = p;
+       npoll = u;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_poll(int fd)
+{
+       vca_pollspace(fd);
+       pollfd[fd].fd = fd;
+       pollfd[fd].events = POLLIN;
+}
+
+static void
+vca_unpoll(int fd)
+{
+       vca_pollspace(fd);
+       pollfd[fd].fd = -1;
+       pollfd[fd].events = 0;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_rcvhdev(struct sess *sp)
+{
+
+       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+       TAILQ_INSERT_TAIL(&sesshead, sp, list);
+       vca_poll(sp->fd);
+}
+
+static void
+accept_f(int fd)
+{
+       struct sess *sp;
+
+       sp = vca_accept_sess(fd);
+       if (sp == NULL)
+               return;
+
+       http_RecvPrep(sp->http);
+       vca_rcvhdev(sp);
+}
+
+static void *
+vca_main(void *arg)
+{
+       unsigned u, v;
+       struct sess *sp, *sp2;
+       struct timespec t;
+       int i;
+
+       (void)arg;
+
+       AZ(pipe(pipes));
+       vca_poll(pipes[0]);
+
+       if (heritage.socket >= 0)
+               vca_poll(heritage.socket);
+
+       while (1) {
+               v = poll(pollfd, npoll, 5000);
+               if (v && pollfd[pipes[0]].revents) {
+                       v--;
+                       i = read(pipes[0], &sp, sizeof sp);
+                       assert(i == sizeof sp);
+                       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+                       if (http_RecvPrepAgain(sp->http))
+                               vca_handover(sp, 0);
+                       else
+                               vca_rcvhdev(sp);
+               }
+               if (heritage.socket >= 0 &&
+                   pollfd[heritage.socket].revents) {
+                       accept_f(heritage.socket);
+                       v--;
+               }
+               clock_gettime(CLOCK_MONOTONIC, &t);
+               TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
+                       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+                       if (pollfd[sp->fd].revents) {
+                               v--;
+                               i = http_RecvSome(sp->fd, sp->http);
+                               if (i < 0)
+                                       continue;
+
+                               vca_unpoll(sp->fd);
+                               TAILQ_REMOVE(&sesshead, sp, list);
+                               vca_handover(sp, i);
+                               continue;
+                       }
+                       if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
+                               TAILQ_REMOVE(&sesshead, sp, list);
+                               vca_unpoll(sp->fd);
+                               vca_close_session(sp, "timeout");
+                               vca_return_session(sp);
+                               continue;
+                       }
+                       if (v == 0)
+                               break;
+               }
+       }
+
+       INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_poll_recycle(struct sess *sp)
+{
+
+       if (sp->fd < 0) {
+               SES_Delete(sp);
+               return;
+       }
+       (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+       assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
+}
+
+static void
+vca_poll_init(void)
+{
+       AZ(pthread_create(&vca_poll_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_poll = {
+       .name =         "poll",
+       .init =         vca_poll_init,
+       .recycle =      vca_poll_recycle,
+};
+
+#endif /* defined(HAVE_POLL) */