From ebb0deb1b5c063b9a6cdf63765b4ee4376a64ef7 Mon Sep 17 00:00:00 2001 From: phk Date: Mon, 21 Aug 2006 10:59:00 +0000 Subject: [PATCH] Break the acceptors out into their own files. The intent here is to compile in all acceptors supported on the operating system and allow the user to select one at startup time. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@862 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- varnish-cache/bin/varnishd/Makefile.am | 3 + varnish-cache/bin/varnishd/cache_acceptor.c | 465 +----------------- .../bin/varnishd/cache_acceptor_epoll.c | 157 ++++++ .../bin/varnishd/cache_acceptor_kqueue.c | 195 ++++++++ .../bin/varnishd/cache_acceptor_poll.c | 195 ++++++++ 5 files changed, 577 insertions(+), 438 deletions(-) create mode 100644 varnish-cache/bin/varnishd/cache_acceptor_epoll.c create mode 100644 varnish-cache/bin/varnishd/cache_acceptor_kqueue.c create mode 100644 varnish-cache/bin/varnishd/cache_acceptor_poll.c diff --git a/varnish-cache/bin/varnishd/Makefile.am b/varnish-cache/bin/varnishd/Makefile.am index 41586d9f..b16e2a2e 100644 --- a/varnish-cache/bin/varnishd/Makefile.am +++ b/varnish-cache/bin/varnishd/Makefile.am @@ -8,6 +8,9 @@ dist_man_MANS = varnishd.1 varnishd_SOURCES = \ cache_acceptor.c \ + cache_acceptor_epoll.c \ + cache_acceptor_poll.c \ + cache_acceptor_kqueue.c \ cache_backend.c \ cache_ban.c \ cache_center.c \ diff --git a/varnish-cache/bin/varnishd/cache_acceptor.c b/varnish-cache/bin/varnishd/cache_acceptor.c index e2a40c1f..3abc20ec 100644 --- a/varnish-cache/bin/varnishd/cache_acceptor.c +++ b/varnish-cache/bin/varnishd/cache_acceptor.c @@ -9,7 +9,7 @@ #undef ACCEPTOR_USE_KQUEUE #undef ACCEPTOR_USE_EPOLL #undef ACCEPTOR_USE_POLL - + #if defined(HAVE_KQUEUE) #define ACCEPTOR_USE_KQUEUE 1 #elif defined(HAVE_EPOLL_CTL) @@ -37,11 +37,25 @@ #include "heritage.h" #include "shmlog.h" #include "cache.h" +#include "cache_acceptor.h" + + +static struct acceptor *vca_acceptors[] = { +#if defined(HAVE_KQUEUE) + &acceptor_kqueue, +#endif +#if defined(HAVE_EPOLL_CTL) + &acceptor_epoll, +#endif +#if defined(HAVE_POLL_CTL) + &acceptor_poll, +#endif + NULL, +}; -static pthread_t vca_thread; static unsigned xids; -static struct sess * +struct sess * vca_accept_sess(int fd) { socklen_t l; @@ -90,7 +104,7 @@ vca_accept_sess(int fd) return (sp); } -static void +void vca_handover(struct sess *sp, int bad) { @@ -107,451 +121,24 @@ vca_handover(struct sess *sp, int bad) WRK_QueueSession(sp); } -/*====================================================================*/ -#ifdef ACCEPTOR_USE_POLL - -#include - -static struct pollfd *pollfd; -static unsigned npoll; - -static int pipes[2]; - -static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead); - -/*--------------------------------------------------------------------*/ - -static void -vca_pollspace(int fd) -{ - struct pollfd *p; - unsigned u, v; - - if (fd < npoll) - return; - if (npoll == 0) - npoll = 16; - for (u = npoll; fd >= u; ) - u += u; - VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u); - p = realloc(pollfd, u * sizeof *p); - assert(p != NULL); - memset(p + npoll, 0, (u - npoll) * sizeof *p); - for (v = npoll ; v <= u; v++) - p->fd = -1; - pollfd = p; - npoll = u; -} - -/*--------------------------------------------------------------------*/ - -static void -vca_poll(int fd) -{ - vca_pollspace(fd); - pollfd[fd].fd = fd; - pollfd[fd].events = POLLIN; -} - -static void -vca_unpoll(int fd) -{ - vca_pollspace(fd); - pollfd[fd].fd = -1; - pollfd[fd].events = 0; -} - -/*--------------------------------------------------------------------*/ - -static void -vca_rcvhdev(struct sess *sp) -{ - - CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); - clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); - TAILQ_INSERT_TAIL(&sesshead, sp, list); - vca_poll(sp->fd); -} - -static void -accept_f(int fd) -{ - struct sess *sp; - - sp = vca_accept_sess(fd); - if (sp == NULL) - return; - - http_RecvPrep(sp->http); - vca_rcvhdev(sp); -} - -static void * -vca_main(void *arg) -{ - unsigned u, v; - struct sess *sp, *sp2; - struct timespec t; - int i; - - (void)arg; - - AZ(pipe(pipes)); - vca_poll(pipes[0]); - - if (heritage.socket >= 0) - vca_poll(heritage.socket); - - while (1) { - v = poll(pollfd, npoll, 5000); - if (v && pollfd[pipes[0]].revents) { - v--; - i = read(pipes[0], &sp, sizeof sp); - assert(i == sizeof sp); - CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); - if (http_RecvPrepAgain(sp->http)) - vca_handover(sp, 0); - else - vca_rcvhdev(sp); - } - if (heritage.socket >= 0 && - pollfd[heritage.socket].revents) { - accept_f(heritage.socket); - v--; - } - clock_gettime(CLOCK_MONOTONIC, &t); - TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) { - CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); - if (pollfd[sp->fd].revents) { - v--; - i = http_RecvSome(sp->fd, sp->http); - if (i < 0) - continue; - - vca_unpoll(sp->fd); - TAILQ_REMOVE(&sesshead, sp, list); - vca_handover(sp, i); - continue; - } - if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) { - TAILQ_REMOVE(&sesshead, sp, list); - vca_unpoll(sp->fd); - vca_close_session(sp, "timeout"); - vca_return_session(sp); - continue; - } - if (v == 0) - break; - } - } - - INCOMPL(); -} - /*--------------------------------------------------------------------*/ void -vca_return_session(struct sess *sp) -{ - - if (sp->fd < 0) { - SES_Delete(sp); - return; - } - (void)clock_gettime(CLOCK_REALTIME, &sp->t_open); - VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); - assert(sizeof sp == write(pipes[1], &sp, sizeof sp)); -} - -#endif /* ACCEPTOR_USE_POLL */ -/*====================================================================*/ -#ifdef ACCEPTOR_USE_EPOLL - -#include - -static int epfd = -1; -static int pipes[2]; - -static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead); - -static void -vca_add(int fd, void *data) -{ - struct epoll_event ev = { EPOLLIN | EPOLLPRI, { data } }; - AZ(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev)); -} - -static void -vca_del(int fd) -{ - AZ(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL)); -} - -static void -vca_rcvhdev(struct sess *sp) -{ - - CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); - clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); - TAILQ_INSERT_TAIL(&sesshead, sp, list); - vca_add(sp->fd, sp); -} - -static void -accept_f(int fd) -{ - struct sess *sp; - - sp = vca_accept_sess(fd); - if (sp == NULL) - return; - http_RecvPrep(sp->http); - vca_rcvhdev(sp); -} - -static void * -vca_main(void *arg) -{ - struct epoll_event ev; - struct timespec t; - struct sess *sp, *sp2; - int i; - - (void)arg; - - epfd = epoll_create(16); - assert(epfd >= 0); - - AZ(pipe(pipes)); - vca_add(pipes[0], pipes); - - if (heritage.socket >= 0) - vca_add(heritage.socket, accept_f); - - while (1) { - if (epoll_wait(epfd, &ev, 1, 5000) > 0) { - if (ev.data.ptr == pipes) { - i = read(pipes[0], &sp, sizeof sp); - assert(i == sizeof sp); - CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); - if (http_RecvPrepAgain(sp->http)) - vca_handover(sp, 0); - else - vca_rcvhdev(sp); - } else if (ev.data.ptr == accept_f) { - accept_f(heritage.socket); - } else { - CAST_OBJ_NOTNULL(sp, ev.data.ptr, SESS_MAGIC); - i = http_RecvSome(sp->fd, sp->http); - if (i != -1) { - TAILQ_REMOVE(&sesshead, sp, list); - vca_del(sp->fd); - vca_handover(sp, i); - } - } - } - /* check for timeouts */ - clock_gettime(CLOCK_MONOTONIC, &t); - TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) { - CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); - if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) { - TAILQ_REMOVE(&sesshead, sp, list); - vca_del(sp->fd); - vca_close_session(sp, "timeout"); - vca_return_session(sp); - continue; - } - } - } - - INCOMPL(); -} - -/*--------------------------------------------------------------------*/ - -void -vca_return_session(struct sess *sp) -{ - - if (sp->fd < 0) { - SES_Delete(sp); - return; - } - (void)clock_gettime(CLOCK_REALTIME, &sp->t_open); - VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); - assert(sizeof sp == write(pipes[1], &sp, sizeof sp)); -} - -#endif /* ACCEPTOR_USE_EPOLL */ -/*====================================================================*/ -#ifdef ACCEPTOR_USE_KQUEUE - -#include - -static int kq = -1; - -static void -vca_kq_sess(struct sess *sp, int arm) +vca_close_session(struct sess *sp, const char *why) { - struct kevent ke[2]; - int i, j, arm2; - CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); - memset(ke, 0, sizeof ke); - if (arm == EV_ADD || arm == EV_ENABLE) { - assert(sp->kqa == 0); - sp->kqa = 1; - arm2 = EV_ADD; - } else { - assert(sp->kqa == 1); - sp->kqa = 0; - arm2 = EV_DELETE; - } - j = 0; - EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2, - 0, params->sess_timeout * 1000, sp); + VSL(SLT_SessionClose, sp->fd, why); if (sp->fd >= 0) - EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp); - - i = kevent(kq, ke, j, NULL, 0, NULL); - assert(i == 0); -} - -static struct sess * -vca_kev(struct kevent *kp) -{ - int i; - struct sess *sp; - - if (kp->udata == vca_accept_sess) { - while (kp->data-- > 0) { - sp = vca_accept_sess(kp->ident); - if (sp == NULL) - return (NULL); - clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); - http_RecvPrep(sp->http); - vca_kq_sess(sp, EV_ADD); - } - return (NULL); - } - if (kp->udata == NULL) { - VSL(SLT_Debug, 0, - "KQ RACE %s flags %x fflags %x data %x", - kp->filter == EVFILT_READ ? "R" : "T", - kp->flags, kp->fflags, kp->data); - return (NULL); - } - CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC); - if (sp->kqa == 0) { - VSL(SLT_Debug, sp->id, - "KQ %s flags %x fflags %x data %x", - kp->filter == EVFILT_READ ? "R" : "T", - kp->flags, kp->fflags, kp->data); - return (NULL); - } - if (kp->filter == EVFILT_READ) { - if (kp->data > 0) { - i = http_RecvSome(sp->fd, sp->http); - switch (i) { - case -1: - return (NULL); - case 0: - vca_kq_sess(sp, EV_DISABLE); - vca_handover(sp, i); - return (NULL); /* ?? */ - case 1: - vca_close_session(sp, "overflow"); - break; - case 2: - vca_close_session(sp, "no request"); - break; - default: - INCOMPL(); - } - return (sp); - } - if (kp->flags == EV_EOF) { - vca_close_session(sp, "EOF"); - return (sp); - } - INCOMPL(); - } - if (kp->filter == EVFILT_TIMER) { - vca_close_session(sp, "timeout"); - return (sp); - } - INCOMPL(); -} - - -#define NKEV 100 - -static void * -vca_main(void *arg) -{ - struct kevent ke[NKEV], *kp; - int i, j, n; - struct sess *sp; - - (void)arg; - - kq = kqueue(); - assert(kq >= 0); - - - assert(heritage.socket >= 0); - EV_SET(&ke[0], heritage.socket, - EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess); - AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL)); - - while (1) { - n = kevent(kq, NULL, 0, ke, NKEV, NULL); - assert(n >= 1 && n <= NKEV); - for (kp = ke, j = 0; j < n; j++, kp++) { - sp = vca_kev(kp); - if (sp != NULL) { - vca_kq_sess(sp, EV_DELETE); - SES_Delete(sp); - for (i = j; i < n; i++) - if (ke[i].udata == sp) - ke[i].udata = NULL; - } - } - } - INCOMPL(); + AZ(close(sp->fd)); + sp->fd = -1; } -/*--------------------------------------------------------------------*/ - void vca_return_session(struct sess *sp) { - if (sp->fd < 0) { - SES_Delete(sp); - return; - } - (void)clock_gettime(CLOCK_REALTIME, &sp->t_open); - VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); - if (http_RecvPrepAgain(sp->http)) - vca_handover(sp, 0); - else - vca_kq_sess(sp, EV_ENABLE); -} - -#endif /* ACCEPTOR_USE_KQUEUE */ -/*====================================================================*/ - -/*--------------------------------------------------------------------*/ - -void -vca_close_session(struct sess *sp, const char *why) -{ - - VSL(SLT_SessionClose, sp->fd, why); - if (sp->fd >= 0) - AZ(close(sp->fd)); - sp->fd = -1; + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + vca_acceptors[0]->recycle(sp); } /*--------------------------------------------------------------------*/ @@ -560,7 +147,9 @@ void VCA_Init(void) { - AZ(pthread_create(&vca_thread, NULL, vca_main, NULL)); srandomdev(); xids = random(); + + /* XXX: Add selector mechanism at some point */ + vca_acceptors[0]->init(); } diff --git a/varnish-cache/bin/varnishd/cache_acceptor_epoll.c b/varnish-cache/bin/varnishd/cache_acceptor_epoll.c new file mode 100644 index 00000000..befe654f --- /dev/null +++ b/varnish-cache/bin/varnishd/cache_acceptor_epoll.c @@ -0,0 +1,157 @@ +/* + * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $ + * + * XXX: We need to pass sessions back into the event engine when they are + * reused. Not sure what the most efficient way is for that. For now + * write the session pointer to a pipe which the event engine monitors. + */ + +#if defined(HAVE_EPOLL_CTL) + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#ifndef HAVE_SRANDOMDEV +#include "compat/srandomdev.h" +#endif + +#include "heritage.h" +#include "shmlog.h" +#include "cache.h" +#include "cache_acceptor.h" + +static pthread_t vca_epoll_thread; +static int epfd = -1; +static int pipes[2]; + +static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead); + +static void +vca_add(int fd, void *data) +{ + struct epoll_event ev = { EPOLLIN | EPOLLPRI, { data } }; + AZ(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev)); +} + +static void +vca_del(int fd) +{ + AZ(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL)); +} + +static void +vca_rcvhdev(struct sess *sp) +{ + + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); + TAILQ_INSERT_TAIL(&sesshead, sp, list); + vca_add(sp->fd, sp); +} + +static void +accept_f(int fd) +{ + struct sess *sp; + + sp = vca_accept_sess(fd); + if (sp == NULL) + return; + http_RecvPrep(sp->http); + vca_rcvhdev(sp); +} + +static void * +vca_main(void *arg) +{ + struct epoll_event ev; + struct timespec t; + struct sess *sp, *sp2; + int i; + + (void)arg; + + epfd = epoll_create(16); + assert(epfd >= 0); + + AZ(pipe(pipes)); + vca_add(pipes[0], pipes); + + if (heritage.socket >= 0) + vca_add(heritage.socket, accept_f); + + while (1) { + if (epoll_wait(epfd, &ev, 1, 5000) > 0) { + if (ev.data.ptr == pipes) { + i = read(pipes[0], &sp, sizeof sp); + assert(i == sizeof sp); + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + if (http_RecvPrepAgain(sp->http)) + vca_handover(sp, 0); + else + vca_rcvhdev(sp); + } else if (ev.data.ptr == accept_f) { + accept_f(heritage.socket); + } else { + CAST_OBJ_NOTNULL(sp, ev.data.ptr, SESS_MAGIC); + i = http_RecvSome(sp->fd, sp->http); + if (i != -1) { + TAILQ_REMOVE(&sesshead, sp, list); + vca_del(sp->fd); + vca_handover(sp, i); + } + } + } + /* check for timeouts */ + clock_gettime(CLOCK_MONOTONIC, &t); + TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) { + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) { + TAILQ_REMOVE(&sesshead, sp, list); + vca_del(sp->fd); + vca_close_session(sp, "timeout"); + vca_return_session(sp); + continue; + } + } + } + + INCOMPL(); +} + +/*--------------------------------------------------------------------*/ + +static void +vca_epoll_recycle(struct sess *sp) +{ + + if (sp->fd < 0) { + SES_Delete(sp); + return; + } + (void)clock_gettime(CLOCK_REALTIME, &sp->t_open); + VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); + assert(sizeof sp == write(pipes[1], &sp, sizeof sp)); +} + +static void +vca_epoll_init(void) +{ + AZ(pthread_create(&vca_epoll_thread, NULL, vca_main, NULL)); +} + +struct acceptor acceptor_epoll = { + .name = "epoll", + .init = vca_epoll_init, + .recycle = vca_epoll_recycle, +}; + +#endif /* defined(HAVE_EPOLL_CTL) */ diff --git a/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c b/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c new file mode 100644 index 00000000..4be68271 --- /dev/null +++ b/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c @@ -0,0 +1,195 @@ +/* + * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $ + * + * XXX: We need to pass sessions back into the event engine when they are + * reused. Not sure what the most efficient way is for that. For now + * write the session pointer to a pipe which the event engine monitors. + */ + +#if defined(HAVE_KQUEUE) + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#ifndef HAVE_SRANDOMDEV +#include "compat/srandomdev.h" +#endif + +#include "heritage.h" +#include "shmlog.h" +#include "cache.h" +#include "cache_acceptor.h" + +static pthread_t vca_kqueue_thread; +static int kq = -1; + +static void +vca_kq_sess(struct sess *sp, int arm) +{ + struct kevent ke[2]; + int i, j, arm2; + + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + memset(ke, 0, sizeof ke); + if (arm == EV_ADD || arm == EV_ENABLE) { + assert(sp->kqa == 0); + sp->kqa = 1; + arm2 = EV_ADD; + } else { + assert(sp->kqa == 1); + sp->kqa = 0; + arm2 = EV_DELETE; + } + j = 0; + EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2, + 0, params->sess_timeout * 1000, sp); + if (sp->fd >= 0) + EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp); + + i = kevent(kq, ke, j, NULL, 0, NULL); + assert(i == 0); +} + +static struct sess * +vca_kev(struct kevent *kp) +{ + int i; + struct sess *sp; + + if (kp->udata == vca_accept_sess) { + while (kp->data-- > 0) { + sp = vca_accept_sess(kp->ident); + if (sp == NULL) + return (NULL); + clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); + http_RecvPrep(sp->http); + vca_kq_sess(sp, EV_ADD); + } + return (NULL); + } + if (kp->udata == NULL) { + VSL(SLT_Debug, 0, + "KQ RACE %s flags %x fflags %x data %x", + kp->filter == EVFILT_READ ? "R" : "T", + kp->flags, kp->fflags, kp->data); + return (NULL); + } + CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC); + if (sp->kqa == 0) { + VSL(SLT_Debug, sp->id, + "KQ %s flags %x fflags %x data %x", + kp->filter == EVFILT_READ ? "R" : "T", + kp->flags, kp->fflags, kp->data); + return (NULL); + } + if (kp->filter == EVFILT_READ) { + if (kp->data > 0) { + i = http_RecvSome(sp->fd, sp->http); + switch (i) { + case -1: + return (NULL); + case 0: + vca_kq_sess(sp, EV_DISABLE); + vca_handover(sp, i); + return (NULL); /* ?? */ + case 1: + vca_close_session(sp, "overflow"); + break; + case 2: + vca_close_session(sp, "no request"); + break; + default: + INCOMPL(); + } + return (sp); + } + if (kp->flags == EV_EOF) { + vca_close_session(sp, "EOF"); + return (sp); + } + INCOMPL(); + } + if (kp->filter == EVFILT_TIMER) { + vca_close_session(sp, "timeout"); + return (sp); + } + INCOMPL(); +} + + +#define NKEV 100 + +static void * +vca_main(void *arg) +{ + struct kevent ke[NKEV], *kp; + int i, j, n; + struct sess *sp; + + (void)arg; + + kq = kqueue(); + assert(kq >= 0); + + + assert(heritage.socket >= 0); + EV_SET(&ke[0], heritage.socket, + EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess); + AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL)); + + while (1) { + n = kevent(kq, NULL, 0, ke, NKEV, NULL); + assert(n >= 1 && n <= NKEV); + for (kp = ke, j = 0; j < n; j++, kp++) { + sp = vca_kev(kp); + if (sp != NULL) { + vca_kq_sess(sp, EV_DELETE); + SES_Delete(sp); + for (i = j; i < n; i++) + if (ke[i].udata == sp) + ke[i].udata = NULL; + } + } + } + INCOMPL(); +} + +/*--------------------------------------------------------------------*/ + +static void +vca_kqueue_recycle(struct sess *sp) +{ + + if (sp->fd < 0) { + SES_Delete(sp); + return; + } + (void)clock_gettime(CLOCK_REALTIME, &sp->t_open); + VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); + if (http_RecvPrepAgain(sp->http)) + vca_handover(sp, 0); + else + vca_kq_sess(sp, EV_ENABLE); +} + +static void +vca_kqueue_init(void) +{ + AZ(pthread_create(&vca_kqueue_thread, NULL, vca_main, NULL)); +} + +struct acceptor acceptor_kqueue = { + .name = "kqueue", + .init = vca_kqueue_init, + .recycle = vca_kqueue_recycle, +}; + +#endif /* defined(HAVE_KQUEUE) */ diff --git a/varnish-cache/bin/varnishd/cache_acceptor_poll.c b/varnish-cache/bin/varnishd/cache_acceptor_poll.c new file mode 100644 index 00000000..3eebfe5c --- /dev/null +++ b/varnish-cache/bin/varnishd/cache_acceptor_poll.c @@ -0,0 +1,195 @@ +/* + * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $ + * + * XXX: We need to pass sessions back into the event engine when they are + * reused. Not sure what the most efficient way is for that. For now + * write the session pointer to a pipe which the event engine monitors. + */ + +#if defined(HAVE_POLL) + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#ifndef HAVE_SRANDOMDEV +#include "compat/srandomdev.h" +#endif + +#include "heritage.h" +#include "shmlog.h" +#include "cache.h" +#include "cache_acceptor.h" + +static pthread_t vca_poll_thread; +static struct pollfd *pollfd; +static unsigned npoll; + +static int pipes[2]; + +static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead); + +/*--------------------------------------------------------------------*/ + +static void +vca_pollspace(int fd) +{ + struct pollfd *p; + unsigned u, v; + + if (fd < npoll) + return; + if (npoll == 0) + npoll = 16; + for (u = npoll; fd >= u; ) + u += u; + VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u); + p = realloc(pollfd, u * sizeof *p); + assert(p != NULL); + memset(p + npoll, 0, (u - npoll) * sizeof *p); + for (v = npoll ; v <= u; v++) + p->fd = -1; + pollfd = p; + npoll = u; +} + +/*--------------------------------------------------------------------*/ + +static void +vca_poll(int fd) +{ + vca_pollspace(fd); + pollfd[fd].fd = fd; + pollfd[fd].events = POLLIN; +} + +static void +vca_unpoll(int fd) +{ + vca_pollspace(fd); + pollfd[fd].fd = -1; + pollfd[fd].events = 0; +} + +/*--------------------------------------------------------------------*/ + +static void +vca_rcvhdev(struct sess *sp) +{ + + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + clock_gettime(CLOCK_MONOTONIC, &sp->t_idle); + TAILQ_INSERT_TAIL(&sesshead, sp, list); + vca_poll(sp->fd); +} + +static void +accept_f(int fd) +{ + struct sess *sp; + + sp = vca_accept_sess(fd); + if (sp == NULL) + return; + + http_RecvPrep(sp->http); + vca_rcvhdev(sp); +} + +static void * +vca_main(void *arg) +{ + unsigned u, v; + struct sess *sp, *sp2; + struct timespec t; + int i; + + (void)arg; + + AZ(pipe(pipes)); + vca_poll(pipes[0]); + + if (heritage.socket >= 0) + vca_poll(heritage.socket); + + while (1) { + v = poll(pollfd, npoll, 5000); + if (v && pollfd[pipes[0]].revents) { + v--; + i = read(pipes[0], &sp, sizeof sp); + assert(i == sizeof sp); + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + if (http_RecvPrepAgain(sp->http)) + vca_handover(sp, 0); + else + vca_rcvhdev(sp); + } + if (heritage.socket >= 0 && + pollfd[heritage.socket].revents) { + accept_f(heritage.socket); + v--; + } + clock_gettime(CLOCK_MONOTONIC, &t); + TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) { + CHECK_OBJ_NOTNULL(sp, SESS_MAGIC); + if (pollfd[sp->fd].revents) { + v--; + i = http_RecvSome(sp->fd, sp->http); + if (i < 0) + continue; + + vca_unpoll(sp->fd); + TAILQ_REMOVE(&sesshead, sp, list); + vca_handover(sp, i); + continue; + } + if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) { + TAILQ_REMOVE(&sesshead, sp, list); + vca_unpoll(sp->fd); + vca_close_session(sp, "timeout"); + vca_return_session(sp); + continue; + } + if (v == 0) + break; + } + } + + INCOMPL(); +} + +/*--------------------------------------------------------------------*/ + +static void +vca_poll_recycle(struct sess *sp) +{ + + if (sp->fd < 0) { + SES_Delete(sp); + return; + } + (void)clock_gettime(CLOCK_REALTIME, &sp->t_open); + VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port); + assert(sizeof sp == write(pipes[1], &sp, sizeof sp)); +} + +static void +vca_poll_init(void) +{ + AZ(pthread_create(&vca_poll_thread, NULL, vca_main, NULL)); +} + +struct acceptor acceptor_poll = { + .name = "poll", + .init = vca_poll_init, + .recycle = vca_poll_recycle, +}; + +#endif /* defined(HAVE_POLL) */ -- 2.39.5