if (bad) {
vca_close_session(sp,
bad == 1 ? "overflow" : "no request");
- vca_return_session(sp);
+ SES_Delete(sp);
return;
}
sp->step = STP_RECV;
vca_kq_sess(struct sess *sp, int arm)
{
struct kevent ke[2];
- int i;
+ int i, j;
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
memset(ke, 0, sizeof ke);
- EV_SET(&ke[0], sp->fd, EVFILT_READ, arm, 0, 0, sp);
- EV_SET(&ke[1], sp->fd, EVFILT_TIMER, arm , 0,
- params->sess_timeout * 1000, sp);
- i = kevent(kq, ke, 2, NULL, 0, NULL);
- if (arm == EV_ADD)
- assert(i == 0);
- else
- assert(i == 0 || errno == ENOENT);
+ j = 0;
+ /* close(2) automatically removes the EVFILT_READ event */
+ if (sp->fd >= 0)
+ EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
+ EV_SET(&ke[j++], sp->id, EVFILT_TIMER | EV_ONESHOT,
+ arm == EV_ADD || arm == EV_ENABLE ? EV_ADD : EV_DELETE,
+ 0, params->sess_timeout * 1000, sp);
+ if (arm == EV_ADD || arm == EV_ENABLE)
+ sp->kqa = 1;
+ else
+ sp->kqa = 0;
+ i = kevent(kq, ke, j, NULL, 0, NULL);
+ assert(i == 0);
}
-static void
-accept_f(int fd, int nq)
+static struct sess *
+vca_kev(struct kevent *kp)
{
+ int i;
struct sess *sp;
- while (nq-- > 0) {
- sp = vca_accept_sess(fd);
- if (sp == NULL)
- return;
- clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
- http_RecvPrep(sp->http);
- vca_kq_sess(sp, EV_ADD);
- sp->kqa = 1;
+ if (kp->udata == vca_accept_sess) {
+ assert(kp->data > 0);
+ while (kp->data-- > 0) {
+ sp = vca_accept_sess(kp->ident);
+ if (sp == NULL)
+ return (NULL);
+ clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+ http_RecvPrep(sp->http);
+ vca_kq_sess(sp, EV_ADD);
+ }
+ return (NULL);
}
+ if (kp->udata == NULL) {
+ VSL(SLT_Debug, 0,
+ "KQ RACE %s flags %x fflags %x data %x",
+ kp->filter == EVFILT_READ ? "R" : "T",
+ kp->flags, kp->fflags, kp->data);
+ return (NULL);
+ }
+ CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
+ if (sp->kqa == 0) {
+ VSL(SLT_Debug, sp->fd,
+ "KQ %s flags %x fflags %x data %x",
+ kp->filter == EVFILT_READ ? "R" : "T",
+ kp->flags, kp->fflags, kp->data);
+ return (NULL);
+ }
+ if (kp->filter == EVFILT_READ) {
+ if (kp->data > 0) {
+ i = http_RecvSome(sp->fd, sp->http);
+ switch (i) {
+ case -1:
+ return (NULL);
+ case 0:
+ vca_kq_sess(sp, EV_DISABLE);
+ vca_handover(sp, i);
+ return (NULL); /* ?? */
+ case 1:
+ vca_close_session(sp, "overflow");
+ break;
+ case 2:
+ vca_close_session(sp, "no request");
+ break;
+ default:
+ INCOMPL();
+ }
+ return (sp);
+ }
+ if (kp->flags == EV_EOF) {
+ vca_close_session(sp, "EOF");
+ return (sp);
+ }
+ INCOMPL();
+ }
+ if (kp->filter == EVFILT_TIMER) {
+ vca_close_session(sp, "timeout");
+ return (sp);
+ }
+ INCOMPL();
}
-#define NKEV 20
+
+#define NKEV 100
static void *
vca_main(void *arg)
{
- struct kevent ke[10], *kp;
+ struct kevent ke[NKEV], *kp;
int i, j, n;
struct sess *sp;
assert(kq >= 0);
- if (heritage.socket >= 0) {
- memset(&ke[0], 0, sizeof ke[0]);
- EV_SET(&ke[0], heritage.socket,
- EVFILT_READ, EV_ADD, 0, 0, accept_f);
- AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL));
- }
+ assert(heritage.socket >= 0);
+ EV_SET(&ke[0], heritage.socket,
+ EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess);
+ AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL));
while (1) {
n = kevent(kq, NULL, 0, ke, NKEV, NULL);
- assert(n >= 1);
- VSL(SLT_Debug, 0, "KQ %d", n);
+ assert(n >= 1 && n <= NKEV);
for (kp = ke, j = 0; j < n; j++, kp++) {
- if (kp->udata == accept_f) {
- VSL(SLT_Debug, kp->ident,
- "KQ accept data %d\n", kp->data);
- accept_f(kp->ident, kp->data);
- continue;
- }
- CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
- if (sp->kqa == 0 || 1) {
- VSL(SLT_Debug, sp->fd,
- "KQ %d/%d %s flags %x fflags %x data %x\n",
- j, n,
- kp->filter == EVFILT_READ ? "R" : "T",
- kp->flags, kp->fflags, kp->data);
- assert (sp->kqa);
- }
- if (kp->filter == EVFILT_READ) {
- if (kp->data > 0) {
- i = http_RecvSome(sp->fd, sp->http);
- if (i == -1)
- continue;
- vca_kq_sess(sp, EV_DISABLE);
- sp->kqa = 0;
- vca_handover(sp, i);
- } else if (kp->flags == EV_EOF) {
- sp->kqa = 0;
- vca_close_session(sp, "no request");
- SES_Delete(sp);
- } else
- INCOMPL();
- continue;
- }
- if (kp->filter == EVFILT_TIMER) {
- vca_kq_sess(sp, EV_DISABLE);
- sp->kqa = 0;
- vca_close_session(sp, "timeout");
+ sp = vca_kev(kp);
+ if (sp != NULL) {
+ vca_kq_sess(sp, EV_DELETE);
SES_Delete(sp);
- continue;
- }
- INCOMPL();
+ for (i = j; i < n; i++)
+ if (ke[i].udata == sp)
+ ke[i].udata = NULL;
+ }
}
}
-
INCOMPL();
}
VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
if (http_RecvPrepAgain(sp->http))
vca_handover(sp, 0);
- else {
- sp->kqa = 1;
+ else
vca_kq_sess(sp, EV_ENABLE);
- }
}
#endif /* ACCEPTOR_USE_KQUEUE */