varnishd_SOURCES = \
cache_acceptor.c \
+ cache_acceptor_epoll.c \
+ cache_acceptor_poll.c \
+ cache_acceptor_kqueue.c \
cache_backend.c \
cache_ban.c \
cache_center.c \
#undef ACCEPTOR_USE_KQUEUE
#undef ACCEPTOR_USE_EPOLL
#undef ACCEPTOR_USE_POLL
-
+
#if defined(HAVE_KQUEUE)
#define ACCEPTOR_USE_KQUEUE 1
#elif defined(HAVE_EPOLL_CTL)
#include "heritage.h"
#include "shmlog.h"
#include "cache.h"
+#include "cache_acceptor.h"
+
+
+static struct acceptor *vca_acceptors[] = {
+#if defined(HAVE_KQUEUE)
+ &acceptor_kqueue,
+#endif
+#if defined(HAVE_EPOLL_CTL)
+ &acceptor_epoll,
+#endif
+#if defined(HAVE_POLL_CTL)
+ &acceptor_poll,
+#endif
+ NULL,
+};
-static pthread_t vca_thread;
static unsigned xids;
-static struct sess *
+struct sess *
vca_accept_sess(int fd)
{
socklen_t l;
return (sp);
}
-static void
+void
vca_handover(struct sess *sp, int bad)
{
WRK_QueueSession(sp);
}
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_POLL
-
-#include <poll.h>
-
-static struct pollfd *pollfd;
-static unsigned npoll;
-
-static int pipes[2];
-
-static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
-
-/*--------------------------------------------------------------------*/
-
-static void
-vca_pollspace(int fd)
-{
- struct pollfd *p;
- unsigned u, v;
-
- if (fd < npoll)
- return;
- if (npoll == 0)
- npoll = 16;
- for (u = npoll; fd >= u; )
- u += u;
- VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u);
- p = realloc(pollfd, u * sizeof *p);
- assert(p != NULL);
- memset(p + npoll, 0, (u - npoll) * sizeof *p);
- for (v = npoll ; v <= u; v++)
- p->fd = -1;
- pollfd = p;
- npoll = u;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vca_poll(int fd)
-{
- vca_pollspace(fd);
- pollfd[fd].fd = fd;
- pollfd[fd].events = POLLIN;
-}
-
-static void
-vca_unpoll(int fd)
-{
- vca_pollspace(fd);
- pollfd[fd].fd = -1;
- pollfd[fd].events = 0;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vca_rcvhdev(struct sess *sp)
-{
-
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
- TAILQ_INSERT_TAIL(&sesshead, sp, list);
- vca_poll(sp->fd);
-}
-
-static void
-accept_f(int fd)
-{
- struct sess *sp;
-
- sp = vca_accept_sess(fd);
- if (sp == NULL)
- return;
-
- http_RecvPrep(sp->http);
- vca_rcvhdev(sp);
-}
-
-static void *
-vca_main(void *arg)
-{
- unsigned u, v;
- struct sess *sp, *sp2;
- struct timespec t;
- int i;
-
- (void)arg;
-
- AZ(pipe(pipes));
- vca_poll(pipes[0]);
-
- if (heritage.socket >= 0)
- vca_poll(heritage.socket);
-
- while (1) {
- v = poll(pollfd, npoll, 5000);
- if (v && pollfd[pipes[0]].revents) {
- v--;
- i = read(pipes[0], &sp, sizeof sp);
- assert(i == sizeof sp);
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- if (http_RecvPrepAgain(sp->http))
- vca_handover(sp, 0);
- else
- vca_rcvhdev(sp);
- }
- if (heritage.socket >= 0 &&
- pollfd[heritage.socket].revents) {
- accept_f(heritage.socket);
- v--;
- }
- clock_gettime(CLOCK_MONOTONIC, &t);
- TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- if (pollfd[sp->fd].revents) {
- v--;
- i = http_RecvSome(sp->fd, sp->http);
- if (i < 0)
- continue;
-
- vca_unpoll(sp->fd);
- TAILQ_REMOVE(&sesshead, sp, list);
- vca_handover(sp, i);
- continue;
- }
- if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
- TAILQ_REMOVE(&sesshead, sp, list);
- vca_unpoll(sp->fd);
- vca_close_session(sp, "timeout");
- vca_return_session(sp);
- continue;
- }
- if (v == 0)
- break;
- }
- }
-
- INCOMPL();
-}
-
/*--------------------------------------------------------------------*/
void
-vca_return_session(struct sess *sp)
-{
-
- if (sp->fd < 0) {
- SES_Delete(sp);
- return;
- }
- (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
- VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
- assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
-}
-
-#endif /* ACCEPTOR_USE_POLL */
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_EPOLL
-
-#include <sys/epoll.h>
-
-static int epfd = -1;
-static int pipes[2];
-
-static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
-
-static void
-vca_add(int fd, void *data)
-{
- struct epoll_event ev = { EPOLLIN | EPOLLPRI, { data } };
- AZ(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev));
-}
-
-static void
-vca_del(int fd)
-{
- AZ(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL));
-}
-
-static void
-vca_rcvhdev(struct sess *sp)
-{
-
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
- TAILQ_INSERT_TAIL(&sesshead, sp, list);
- vca_add(sp->fd, sp);
-}
-
-static void
-accept_f(int fd)
-{
- struct sess *sp;
-
- sp = vca_accept_sess(fd);
- if (sp == NULL)
- return;
- http_RecvPrep(sp->http);
- vca_rcvhdev(sp);
-}
-
-static void *
-vca_main(void *arg)
-{
- struct epoll_event ev;
- struct timespec t;
- struct sess *sp, *sp2;
- int i;
-
- (void)arg;
-
- epfd = epoll_create(16);
- assert(epfd >= 0);
-
- AZ(pipe(pipes));
- vca_add(pipes[0], pipes);
-
- if (heritage.socket >= 0)
- vca_add(heritage.socket, accept_f);
-
- while (1) {
- if (epoll_wait(epfd, &ev, 1, 5000) > 0) {
- if (ev.data.ptr == pipes) {
- i = read(pipes[0], &sp, sizeof sp);
- assert(i == sizeof sp);
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- if (http_RecvPrepAgain(sp->http))
- vca_handover(sp, 0);
- else
- vca_rcvhdev(sp);
- } else if (ev.data.ptr == accept_f) {
- accept_f(heritage.socket);
- } else {
- CAST_OBJ_NOTNULL(sp, ev.data.ptr, SESS_MAGIC);
- i = http_RecvSome(sp->fd, sp->http);
- if (i != -1) {
- TAILQ_REMOVE(&sesshead, sp, list);
- vca_del(sp->fd);
- vca_handover(sp, i);
- }
- }
- }
- /* check for timeouts */
- clock_gettime(CLOCK_MONOTONIC, &t);
- TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
- TAILQ_REMOVE(&sesshead, sp, list);
- vca_del(sp->fd);
- vca_close_session(sp, "timeout");
- vca_return_session(sp);
- continue;
- }
- }
- }
-
- INCOMPL();
-}
-
-/*--------------------------------------------------------------------*/
-
-void
-vca_return_session(struct sess *sp)
-{
-
- if (sp->fd < 0) {
- SES_Delete(sp);
- return;
- }
- (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
- VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
- assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
-}
-
-#endif /* ACCEPTOR_USE_EPOLL */
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_KQUEUE
-
-#include <sys/event.h>
-
-static int kq = -1;
-
-static void
-vca_kq_sess(struct sess *sp, int arm)
+vca_close_session(struct sess *sp, const char *why)
{
- struct kevent ke[2];
- int i, j, arm2;
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- memset(ke, 0, sizeof ke);
- if (arm == EV_ADD || arm == EV_ENABLE) {
- assert(sp->kqa == 0);
- sp->kqa = 1;
- arm2 = EV_ADD;
- } else {
- assert(sp->kqa == 1);
- sp->kqa = 0;
- arm2 = EV_DELETE;
- }
- j = 0;
- EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
- 0, params->sess_timeout * 1000, sp);
+ VSL(SLT_SessionClose, sp->fd, why);
if (sp->fd >= 0)
- EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
-
- i = kevent(kq, ke, j, NULL, 0, NULL);
- assert(i == 0);
-}
-
-static struct sess *
-vca_kev(struct kevent *kp)
-{
- int i;
- struct sess *sp;
-
- if (kp->udata == vca_accept_sess) {
- while (kp->data-- > 0) {
- sp = vca_accept_sess(kp->ident);
- if (sp == NULL)
- return (NULL);
- clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
- http_RecvPrep(sp->http);
- vca_kq_sess(sp, EV_ADD);
- }
- return (NULL);
- }
- if (kp->udata == NULL) {
- VSL(SLT_Debug, 0,
- "KQ RACE %s flags %x fflags %x data %x",
- kp->filter == EVFILT_READ ? "R" : "T",
- kp->flags, kp->fflags, kp->data);
- return (NULL);
- }
- CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
- if (sp->kqa == 0) {
- VSL(SLT_Debug, sp->id,
- "KQ %s flags %x fflags %x data %x",
- kp->filter == EVFILT_READ ? "R" : "T",
- kp->flags, kp->fflags, kp->data);
- return (NULL);
- }
- if (kp->filter == EVFILT_READ) {
- if (kp->data > 0) {
- i = http_RecvSome(sp->fd, sp->http);
- switch (i) {
- case -1:
- return (NULL);
- case 0:
- vca_kq_sess(sp, EV_DISABLE);
- vca_handover(sp, i);
- return (NULL); /* ?? */
- case 1:
- vca_close_session(sp, "overflow");
- break;
- case 2:
- vca_close_session(sp, "no request");
- break;
- default:
- INCOMPL();
- }
- return (sp);
- }
- if (kp->flags == EV_EOF) {
- vca_close_session(sp, "EOF");
- return (sp);
- }
- INCOMPL();
- }
- if (kp->filter == EVFILT_TIMER) {
- vca_close_session(sp, "timeout");
- return (sp);
- }
- INCOMPL();
-}
-
-
-#define NKEV 100
-
-static void *
-vca_main(void *arg)
-{
- struct kevent ke[NKEV], *kp;
- int i, j, n;
- struct sess *sp;
-
- (void)arg;
-
- kq = kqueue();
- assert(kq >= 0);
-
-
- assert(heritage.socket >= 0);
- EV_SET(&ke[0], heritage.socket,
- EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess);
- AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL));
-
- while (1) {
- n = kevent(kq, NULL, 0, ke, NKEV, NULL);
- assert(n >= 1 && n <= NKEV);
- for (kp = ke, j = 0; j < n; j++, kp++) {
- sp = vca_kev(kp);
- if (sp != NULL) {
- vca_kq_sess(sp, EV_DELETE);
- SES_Delete(sp);
- for (i = j; i < n; i++)
- if (ke[i].udata == sp)
- ke[i].udata = NULL;
- }
- }
- }
- INCOMPL();
+ AZ(close(sp->fd));
+ sp->fd = -1;
}
-/*--------------------------------------------------------------------*/
-
void
vca_return_session(struct sess *sp)
{
- if (sp->fd < 0) {
- SES_Delete(sp);
- return;
- }
- (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
- VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
- if (http_RecvPrepAgain(sp->http))
- vca_handover(sp, 0);
- else
- vca_kq_sess(sp, EV_ENABLE);
-}
-
-#endif /* ACCEPTOR_USE_KQUEUE */
-/*====================================================================*/
-
-/*--------------------------------------------------------------------*/
-
-void
-vca_close_session(struct sess *sp, const char *why)
-{
-
- VSL(SLT_SessionClose, sp->fd, why);
- if (sp->fd >= 0)
- AZ(close(sp->fd));
- sp->fd = -1;
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ vca_acceptors[0]->recycle(sp);
}
/*--------------------------------------------------------------------*/
VCA_Init(void)
{
- AZ(pthread_create(&vca_thread, NULL, vca_main, NULL));
srandomdev();
xids = random();
+
+ /* XXX: Add selector mechanism at some point */
+ vca_acceptors[0]->init();
}
--- /dev/null
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused. Not sure what the most efficient way is for that. For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_EPOLL_CTL)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_epoll_thread;
+static int epfd = -1;
+static int pipes[2];
+
+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+
+static void
+vca_add(int fd, void *data)
+{
+ struct epoll_event ev = { EPOLLIN | EPOLLPRI, { data } };
+ AZ(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev));
+}
+
+static void
+vca_del(int fd)
+{
+ AZ(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL));
+}
+
+static void
+vca_rcvhdev(struct sess *sp)
+{
+
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+ TAILQ_INSERT_TAIL(&sesshead, sp, list);
+ vca_add(sp->fd, sp);
+}
+
+static void
+accept_f(int fd)
+{
+ struct sess *sp;
+
+ sp = vca_accept_sess(fd);
+ if (sp == NULL)
+ return;
+ http_RecvPrep(sp->http);
+ vca_rcvhdev(sp);
+}
+
+static void *
+vca_main(void *arg)
+{
+ struct epoll_event ev;
+ struct timespec t;
+ struct sess *sp, *sp2;
+ int i;
+
+ (void)arg;
+
+ epfd = epoll_create(16);
+ assert(epfd >= 0);
+
+ AZ(pipe(pipes));
+ vca_add(pipes[0], pipes);
+
+ if (heritage.socket >= 0)
+ vca_add(heritage.socket, accept_f);
+
+ while (1) {
+ if (epoll_wait(epfd, &ev, 1, 5000) > 0) {
+ if (ev.data.ptr == pipes) {
+ i = read(pipes[0], &sp, sizeof sp);
+ assert(i == sizeof sp);
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ if (http_RecvPrepAgain(sp->http))
+ vca_handover(sp, 0);
+ else
+ vca_rcvhdev(sp);
+ } else if (ev.data.ptr == accept_f) {
+ accept_f(heritage.socket);
+ } else {
+ CAST_OBJ_NOTNULL(sp, ev.data.ptr, SESS_MAGIC);
+ i = http_RecvSome(sp->fd, sp->http);
+ if (i != -1) {
+ TAILQ_REMOVE(&sesshead, sp, list);
+ vca_del(sp->fd);
+ vca_handover(sp, i);
+ }
+ }
+ }
+ /* check for timeouts */
+ clock_gettime(CLOCK_MONOTONIC, &t);
+ TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
+ TAILQ_REMOVE(&sesshead, sp, list);
+ vca_del(sp->fd);
+ vca_close_session(sp, "timeout");
+ vca_return_session(sp);
+ continue;
+ }
+ }
+ }
+
+ INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_epoll_recycle(struct sess *sp)
+{
+
+ if (sp->fd < 0) {
+ SES_Delete(sp);
+ return;
+ }
+ (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+ VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+ assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
+}
+
+static void
+vca_epoll_init(void)
+{
+ AZ(pthread_create(&vca_epoll_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_epoll = {
+ .name = "epoll",
+ .init = vca_epoll_init,
+ .recycle = vca_epoll_recycle,
+};
+
+#endif /* defined(HAVE_EPOLL_CTL) */
--- /dev/null
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused. Not sure what the most efficient way is for that. For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_KQUEUE)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/event.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_kqueue_thread;
+static int kq = -1;
+
+static void
+vca_kq_sess(struct sess *sp, int arm)
+{
+ struct kevent ke[2];
+ int i, j, arm2;
+
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ memset(ke, 0, sizeof ke);
+ if (arm == EV_ADD || arm == EV_ENABLE) {
+ assert(sp->kqa == 0);
+ sp->kqa = 1;
+ arm2 = EV_ADD;
+ } else {
+ assert(sp->kqa == 1);
+ sp->kqa = 0;
+ arm2 = EV_DELETE;
+ }
+ j = 0;
+ EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
+ 0, params->sess_timeout * 1000, sp);
+ if (sp->fd >= 0)
+ EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
+
+ i = kevent(kq, ke, j, NULL, 0, NULL);
+ assert(i == 0);
+}
+
+static struct sess *
+vca_kev(struct kevent *kp)
+{
+ int i;
+ struct sess *sp;
+
+ if (kp->udata == vca_accept_sess) {
+ while (kp->data-- > 0) {
+ sp = vca_accept_sess(kp->ident);
+ if (sp == NULL)
+ return (NULL);
+ clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+ http_RecvPrep(sp->http);
+ vca_kq_sess(sp, EV_ADD);
+ }
+ return (NULL);
+ }
+ if (kp->udata == NULL) {
+ VSL(SLT_Debug, 0,
+ "KQ RACE %s flags %x fflags %x data %x",
+ kp->filter == EVFILT_READ ? "R" : "T",
+ kp->flags, kp->fflags, kp->data);
+ return (NULL);
+ }
+ CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
+ if (sp->kqa == 0) {
+ VSL(SLT_Debug, sp->id,
+ "KQ %s flags %x fflags %x data %x",
+ kp->filter == EVFILT_READ ? "R" : "T",
+ kp->flags, kp->fflags, kp->data);
+ return (NULL);
+ }
+ if (kp->filter == EVFILT_READ) {
+ if (kp->data > 0) {
+ i = http_RecvSome(sp->fd, sp->http);
+ switch (i) {
+ case -1:
+ return (NULL);
+ case 0:
+ vca_kq_sess(sp, EV_DISABLE);
+ vca_handover(sp, i);
+ return (NULL); /* ?? */
+ case 1:
+ vca_close_session(sp, "overflow");
+ break;
+ case 2:
+ vca_close_session(sp, "no request");
+ break;
+ default:
+ INCOMPL();
+ }
+ return (sp);
+ }
+ if (kp->flags == EV_EOF) {
+ vca_close_session(sp, "EOF");
+ return (sp);
+ }
+ INCOMPL();
+ }
+ if (kp->filter == EVFILT_TIMER) {
+ vca_close_session(sp, "timeout");
+ return (sp);
+ }
+ INCOMPL();
+}
+
+
+#define NKEV 100
+
+static void *
+vca_main(void *arg)
+{
+ struct kevent ke[NKEV], *kp;
+ int i, j, n;
+ struct sess *sp;
+
+ (void)arg;
+
+ kq = kqueue();
+ assert(kq >= 0);
+
+
+ assert(heritage.socket >= 0);
+ EV_SET(&ke[0], heritage.socket,
+ EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess);
+ AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL));
+
+ while (1) {
+ n = kevent(kq, NULL, 0, ke, NKEV, NULL);
+ assert(n >= 1 && n <= NKEV);
+ for (kp = ke, j = 0; j < n; j++, kp++) {
+ sp = vca_kev(kp);
+ if (sp != NULL) {
+ vca_kq_sess(sp, EV_DELETE);
+ SES_Delete(sp);
+ for (i = j; i < n; i++)
+ if (ke[i].udata == sp)
+ ke[i].udata = NULL;
+ }
+ }
+ }
+ INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_kqueue_recycle(struct sess *sp)
+{
+
+ if (sp->fd < 0) {
+ SES_Delete(sp);
+ return;
+ }
+ (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+ VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+ if (http_RecvPrepAgain(sp->http))
+ vca_handover(sp, 0);
+ else
+ vca_kq_sess(sp, EV_ENABLE);
+}
+
+static void
+vca_kqueue_init(void)
+{
+ AZ(pthread_create(&vca_kqueue_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_kqueue = {
+ .name = "kqueue",
+ .init = vca_kqueue_init,
+ .recycle = vca_kqueue_recycle,
+};
+
+#endif /* defined(HAVE_KQUEUE) */
--- /dev/null
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused. Not sure what the most efficient way is for that. For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_POLL)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <poll.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_poll_thread;
+static struct pollfd *pollfd;
+static unsigned npoll;
+
+static int pipes[2];
+
+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_pollspace(int fd)
+{
+ struct pollfd *p;
+ unsigned u, v;
+
+ if (fd < npoll)
+ return;
+ if (npoll == 0)
+ npoll = 16;
+ for (u = npoll; fd >= u; )
+ u += u;
+ VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u);
+ p = realloc(pollfd, u * sizeof *p);
+ assert(p != NULL);
+ memset(p + npoll, 0, (u - npoll) * sizeof *p);
+ for (v = npoll ; v <= u; v++)
+ p->fd = -1;
+ pollfd = p;
+ npoll = u;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_poll(int fd)
+{
+ vca_pollspace(fd);
+ pollfd[fd].fd = fd;
+ pollfd[fd].events = POLLIN;
+}
+
+static void
+vca_unpoll(int fd)
+{
+ vca_pollspace(fd);
+ pollfd[fd].fd = -1;
+ pollfd[fd].events = 0;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_rcvhdev(struct sess *sp)
+{
+
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+ TAILQ_INSERT_TAIL(&sesshead, sp, list);
+ vca_poll(sp->fd);
+}
+
+static void
+accept_f(int fd)
+{
+ struct sess *sp;
+
+ sp = vca_accept_sess(fd);
+ if (sp == NULL)
+ return;
+
+ http_RecvPrep(sp->http);
+ vca_rcvhdev(sp);
+}
+
+static void *
+vca_main(void *arg)
+{
+ unsigned u, v;
+ struct sess *sp, *sp2;
+ struct timespec t;
+ int i;
+
+ (void)arg;
+
+ AZ(pipe(pipes));
+ vca_poll(pipes[0]);
+
+ if (heritage.socket >= 0)
+ vca_poll(heritage.socket);
+
+ while (1) {
+ v = poll(pollfd, npoll, 5000);
+ if (v && pollfd[pipes[0]].revents) {
+ v--;
+ i = read(pipes[0], &sp, sizeof sp);
+ assert(i == sizeof sp);
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ if (http_RecvPrepAgain(sp->http))
+ vca_handover(sp, 0);
+ else
+ vca_rcvhdev(sp);
+ }
+ if (heritage.socket >= 0 &&
+ pollfd[heritage.socket].revents) {
+ accept_f(heritage.socket);
+ v--;
+ }
+ clock_gettime(CLOCK_MONOTONIC, &t);
+ TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ if (pollfd[sp->fd].revents) {
+ v--;
+ i = http_RecvSome(sp->fd, sp->http);
+ if (i < 0)
+ continue;
+
+ vca_unpoll(sp->fd);
+ TAILQ_REMOVE(&sesshead, sp, list);
+ vca_handover(sp, i);
+ continue;
+ }
+ if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
+ TAILQ_REMOVE(&sesshead, sp, list);
+ vca_unpoll(sp->fd);
+ vca_close_session(sp, "timeout");
+ vca_return_session(sp);
+ continue;
+ }
+ if (v == 0)
+ break;
+ }
+ }
+
+ INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_poll_recycle(struct sess *sp)
+{
+
+ if (sp->fd < 0) {
+ SES_Delete(sp);
+ return;
+ }
+ (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+ VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+ assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
+}
+
+static void
+vca_poll_init(void)
+{
+ AZ(pthread_create(&vca_poll_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_poll = {
+ .name = "poll",
+ .init = vca_poll_init,
+ .recycle = vca_poll_recycle,
+};
+
+#endif /* defined(HAVE_POLL) */