* write the session pointer to a pipe which the event engine monitors.
*/
-#define ACCEPTOR_USE_KQUEUE
-#undef ACCEPTOR_USE_LIBEVENT
+#define ACCEPTOR_USE_POLL
+#undef ACCEPTOR_USE_KQUEUE
#include <stdio.h>
#include <errno.h>
WRK_QueueSession(sp);
}
-#ifdef ACCEPTOR_USE_LIBEVENT
+/*====================================================================*/
+#ifdef ACCEPTOR_USE_POLL
-static struct event_base *evb;
-static struct event pipe_e;
-static int pipes[2];
+#include <poll.h>
+
+static struct pollfd *pollfd;
+static unsigned npoll;
-static struct event tick_e;
-static struct timeval tick_rate;
+static int pipes[2];
-static struct event accept_e[2 * HERITAGE_NSOCKS];
static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
/*--------------------------------------------------------------------*/
static void
-vca_tick(int a, short b, void *c)
+vca_pollspace(int fd)
{
- struct sess *sp, *sp2;
- struct timespec t;
+ struct pollfd *p;
+ unsigned u, v;
- (void)a;
- (void)b;
- (void)c;
- AZ(evtimer_add(&tick_e, &tick_rate));
- clock_gettime(CLOCK_MONOTONIC, &t);
- TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- if (sp->t_idle.tv_sec + 30 < t.tv_sec) {
- TAILQ_REMOVE(&sesshead, sp, list);
- vca_close_session(sp, "timeout");
- vca_return_session(sp);
- }
- }
+ if (fd < npoll)
+ return;
+ if (npoll == 0)
+ npoll = 16;
+ for (u = npoll; fd >= u; )
+ u += u;
+ VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u);
+ p = realloc(pollfd, u * sizeof *p);
+ assert(p != NULL);
+ memset(p + npoll, 0, (u - npoll) * sizeof *p);
+ for (v = npoll ; v <= u; v++)
+ p->fd = -1;
+ pollfd = p;
+ npoll = u;
}
+/*--------------------------------------------------------------------*/
+
static void
-vca_rcvhd_f(int fd, short event, void *arg)
+vca_poll(int fd)
{
- struct sess *sp;
- int i;
-
- (void)event;
-
- CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC);
- i = http_RecvSome(fd, sp->http);
- if (i < 0)
- return;
+ vca_pollspace(fd);
+ pollfd[fd].fd = fd;
+ pollfd[fd].events = POLLIN;
+}
- event_del(&sp->ev);
- TAILQ_REMOVE(&sesshead, sp, list);
- vca_handover(sp, i);
+static void
+vca_unpoll(int fd)
+{
+ vca_pollspace(fd);
+ pollfd[fd].fd = -1;
+ pollfd[fd].events = 0;
}
+/*--------------------------------------------------------------------*/
+
static void
vca_rcvhdev(struct sess *sp)
{
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
TAILQ_INSERT_TAIL(&sesshead, sp, list);
- event_set(&sp->ev, sp->fd, EV_READ | EV_PERSIST, vca_rcvhd_f, sp);
- AZ(event_base_set(evb, &sp->ev));
- AZ(event_add(&sp->ev, NULL)); /* XXX: timeout */
-}
-
-static void
-pipe_f(int fd, short event, void *arg)
-{
- struct sess *sp;
- int i;
-
- (void)event;
- (void)arg;
- i = read(fd, &sp, sizeof sp);
- assert(i == sizeof sp);
- if (http_RecvPrepAgain(sp->http)) {
- vca_handover(sp, 0);
- return;
- }
- vca_rcvhdev(sp);
+ vca_poll(sp->fd);
}
static void
-accept_f(int fd, short event, void *arg)
+accept_f(int fd)
{
struct sess *sp;
- (void)event;
- (void)arg;
-
sp = vca_accept_sess(fd);
if (sp == NULL)
return;
static void *
vca_main(void *arg)
{
- unsigned u;
- struct event *ep;
+ unsigned u, v;
+ struct sess *sp, *sp2;
+ struct timespec t;
+ int i;
(void)arg;
- tick_rate.tv_sec = 1;
- tick_rate.tv_usec = 0;
AZ(pipe(pipes));
- evb = event_init();
- assert(evb != NULL);
-
- event_set(&pipe_e, pipes[0], EV_READ | EV_PERSIST, pipe_f, NULL);
- AZ(event_base_set(evb, &pipe_e));
- AZ(event_add(&pipe_e, NULL));
-
- evtimer_set(&tick_e, vca_tick, NULL);
- AZ(event_base_set(evb, &tick_e));
-
- AZ(evtimer_add(&tick_e, &tick_rate));
+ vca_poll(pipes[0]);
- ep = accept_e;
for (u = 0; u < HERITAGE_NSOCKS; u++) {
- if (heritage.sock_local[u] >= 0) {
- event_set(ep, heritage.sock_local[u],
- EV_READ | EV_PERSIST,
- accept_f, NULL);
- AZ(event_base_set(evb, ep));
- AZ(event_add(ep, NULL));
- ep++;
+ if (heritage.sock_local[u] >= 0)
+ vca_poll(heritage.sock_local[u]);
+ if (heritage.sock_remote[u] >= 0)
+ vca_poll(heritage.sock_local[u]);
+ }
+
+ while (1) {
+ v = poll(pollfd, npoll, 5000);
+ if (v && pollfd[pipes[0]].revents) {
+ v--;
+ i = read(pipes[0], &sp, sizeof sp);
+ assert(i == sizeof sp);
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ if (http_RecvPrepAgain(sp->http))
+ vca_handover(sp, 0);
+ else
+ vca_rcvhdev(sp);
}
- if (heritage.sock_remote[u] >= 0) {
- event_set(ep, heritage.sock_remote[u],
- EV_READ | EV_PERSIST,
- accept_f, NULL);
- AZ(event_base_set(evb, ep));
- AZ(event_add(ep, NULL));
- ep++;
+ for (u = 0; v && u < HERITAGE_NSOCKS; u++) {
+ if (heritage.sock_local[u] >= 0 &&
+ pollfd[heritage.sock_local[u]].revents) {
+ accept_f(heritage.sock_local[u]);
+ v--;
+ }
+ if (heritage.sock_remote[u] >= 0 &&
+ pollfd[heritage.sock_remote[u]].revents) {
+ accept_f(heritage.sock_local[u]);
+ v--;
+ }
+ }
+ clock_gettime(CLOCK_MONOTONIC, &t);
+ TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ if (pollfd[sp->fd].revents) {
+ v--;
+ i = http_RecvSome(sp->fd, sp->http);
+ if (i < 0)
+ continue;
+
+ vca_unpoll(sp->fd);
+ TAILQ_REMOVE(&sesshead, sp, list);
+ vca_handover(sp, i);
+ continue;
+ }
+ if (sp->t_idle.tv_sec + 5 < t.tv_sec) {
+ TAILQ_REMOVE(&sesshead, sp, list);
+ vca_unpoll(sp->fd);
+ vca_close_session(sp, "timeout");
+ vca_return_session(sp);
+ continue;
+ }
+ if (v == 0)
+ break;
}
}
- AZ(event_base_loop(evb, 0));
INCOMPL();
}
assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
}
-#endif /* ACCEPTOR_USE_LIBEVENT */
-
+#endif /* ACCEPTOR_USE_POLL */
+/*====================================================================*/
#ifdef ACCEPTOR_USE_KQUEUE
#include <sys/event.h>
}
#endif /* ACCEPTOR_USE_KQUEUE */
+/*====================================================================*/
/*--------------------------------------------------------------------*/