]> err.no Git - varnish/commitdiff
Overhaul kqueue acceptor in light of todays learnings.
authorphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Mon, 21 Aug 2006 18:55:24 +0000 (18:55 +0000)
committerphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Mon, 21 Aug 2006 18:55:24 +0000 (18:55 +0000)
Use the pipe trick to inject sessions into the system, as far as I
can tell it is cheaper because of the low rate it happens and the
high rate of mutex operations avoided.

Ignore the timer event, but purge the list every time we wake up
to reduce lumpyness of timeout'ing.

Centralize the polling of a session so we don't have the same two
messages spread out all over the place.

Centralize the acceptor thread and send things directly to the worker
thread, leaving only the session-herder in the split out files.

poll & epoll not yet updated accordingly.

git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@875 d4fa192b-c00b-0410-8231-f00ffab90ce4

varnish-cache/bin/varnishd/cache.h
varnish-cache/bin/varnishd/cache_acceptor.c
varnish-cache/bin/varnishd/cache_acceptor.h
varnish-cache/bin/varnishd/cache_acceptor_kqueue.c

index 455c1f76a0922850fb36d6fb16ff305f0b1db70b..499132ba84a8f56d88b1de3a7fbf9a8466004dda 100644 (file)
@@ -272,7 +272,6 @@ struct sess {
 
        struct workreq          workreq;
        struct acct             acct;
-       unsigned                kqa;
 };
 
 struct backend {
index 4ff494fc9e0e5787c0280c2145cfb6c7a237d440..9f191975e65a14228845af4edbb83fb4bc9e66d0 100644 (file)
@@ -40,6 +40,7 @@ static struct acceptor *vca_acceptors[] = {
 };
 
 static unsigned                xids;
+static pthread_t       vca_thread_acct;
 
 struct sess *
 vca_accept_sess(int fd)
@@ -128,6 +129,23 @@ vca_handfirst(struct sess *sp)
 
 /*--------------------------------------------------------------------*/
 
+int
+vca_pollsession(struct sess *sp)
+{
+       int i;
+
+       i = http_RecvSome(sp->fd, sp->http);
+       if (i < 1)
+               return (i);
+       if (i == 1)
+               vca_close_session(sp, "overflow");
+       else if (i == 2)
+               vca_close_session(sp, "no request");
+       return (1);
+}
+
+/*--------------------------------------------------------------------*/
+
 void
 vca_close_session(struct sess *sp, const char *why)
 {
@@ -143,9 +161,33 @@ vca_return_session(struct sess *sp)
 {
 
        CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+       if (sp->fd >= 0) {
+               VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+               (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+               if (http_RecvPrepAgain(sp->http))
+                       vca_handover(sp, 0);
+       }
        vca_acceptors[0]->recycle(sp);
 }
 
+/*--------------------------------------------------------------------*/
+
+static void *
+vca_acct(void *arg)
+{
+       struct sess *sp;
+
+       (void)arg;
+       while (1) {
+               sp = vca_accept_sess(heritage.socket);
+               if (sp == NULL)
+                       continue;
+               http_RecvPrep(sp->http);
+               vca_handfirst(sp);
+       }
+}
+
+
 /*--------------------------------------------------------------------*/
 
 void
@@ -161,4 +203,5 @@ VCA_Init(void)
                exit (2);
        }
        vca_acceptors[0]->init();
+       AZ(pthread_create(&vca_thread_acct, NULL, vca_acct, NULL));
 }
index 5746f4f4927b30a219415377be031d97265b9e9d..9813674ccad351f0867e417666cff321e2d19eda 100644 (file)
@@ -29,4 +29,5 @@ extern struct acceptor acceptor_poll;
 struct sess *vca_accept_sess(int fd);
 void vca_handover(struct sess *sp, int bad);
 void vca_handfirst(struct sess *sp);
+int vca_pollsession(struct sess *sp);
 
index 488ec41632480e897f8fe918be03cd1d68bdf7aa..1735980c4b73c1c5165e86ac1361a5dfedd7bbf9 100644 (file)
 #include <sys/socket.h>
 #include <sys/event.h>
 
-#ifndef HAVE_SRANDOMDEV
-#include "compat/srandomdev.h"
-#endif
-
 #include "heritage.h"
 #include "shmlog.h"
 #include "cache.h"
 #include "cache_acceptor.h"
 
-static pthread_t vca_kqueue_thread1;
-static pthread_t vca_kqueue_thread2;
+static pthread_t vca_kqueue_thread;
 static int kq = -1;
 
+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+static int pipes[2];
+
 #define NKEV   100
 
 static void
 vca_kq_sess(struct sess *sp, int arm)
 {
-       struct kevent ke[2];
-       int i, j, arm2;
+       struct kevent ke;
+       int i;
 
        CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-       memset(ke, 0, sizeof ke);
-       if (arm == EV_ADD || arm == EV_ENABLE) {
-               assert(sp->kqa == 0);
-               sp->kqa = 1;
-               arm2 = EV_ADD;
-       } else  {
-               assert(sp->kqa == 1);
-               sp->kqa = 0;
-               arm2 = EV_DELETE;
-       }
-       j = 0;
-       EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
-           0, params->sess_timeout * 1000, sp);
-       if (sp->fd >= 0)
-               EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
-
-       i = kevent(kq, ke, j, NULL, 0, NULL);
+       if (sp->fd < 0)
+               return;
+       EV_SET(&ke, sp->fd, EVFILT_READ, arm, 0, 0, sp);
+       i = kevent(kq, &ke, 1, NULL, 0, NULL);
        assert(i == 0);
 }
 
-static struct sess *
+static void
 vca_kev(struct kevent *kp)
 {
        int i;
        struct sess *sp;
 
-       if (kp->udata == NULL) {
-               VSL(SLT_Debug, 0,
-                   "KQ RACE %s flags %x fflags %x data %x",
-                   kp->filter == EVFILT_READ ? "R" : "T",
-                   kp->flags, kp->fflags, kp->data);
-               return (NULL);
+       assert(kp->udata != NULL);
+       if (kp->udata == pipes) {
+               while (kp->data > 0) {
+                       i = read(pipes[0], &sp, sizeof sp);
+                       assert(i == sizeof sp);
+                       kp->data -= i;
+                       CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+                       TAILQ_INSERT_TAIL(&sesshead, sp, list);
+                       vca_kq_sess(sp, EV_ADD);
+               }
+               return;
        }
        CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
-       if (sp->kqa == 0) {
-               VSL(SLT_Debug, sp->id,
-                   "KQ %s flags %x fflags %x data %x",
-                   kp->filter == EVFILT_READ ? "R" : "T",
-                   kp->flags, kp->fflags, kp->data);
-               return (NULL);
-       }
-       if (kp->filter == EVFILT_READ) {
-               if (kp->data > 0) {
-                       i = http_RecvSome(sp->fd, sp->http);
-                       switch (i) {
-                       case -1:
-                               return (NULL);
-                       case 0:
-                               vca_kq_sess(sp, EV_DISABLE);
-                               vca_handover(sp, i);
-                               return (NULL);   /* ?? */
-                       case 1:
-                               vca_close_session(sp, "overflow");
-                               break;
-                       case 2:
-                               vca_close_session(sp, "no request");
-                               break;
-                       default:
-                               INCOMPL();
-                       }
-                       return (sp);
-               }
-               if (kp->flags == EV_EOF) {
-                       vca_close_session(sp, "EOF");
-                       return (sp);
+       if (kp->data > 0) {
+               i = vca_pollsession(sp);
+               if (i == -1)
+                       return;
+               TAILQ_REMOVE(&sesshead, sp, list);
+               if (i == 0) {
+                       vca_kq_sess(sp, EV_DELETE);
+                       vca_handover(sp, i);
+               } else {
+                       SES_Delete(sp);
                }
-               INCOMPL();
+               return;
        }
-       if (kp->filter == EVFILT_TIMER) {
-               vca_close_session(sp, "timeout");
-               return (sp);
+       if (kp->flags == EV_EOF) {
+               TAILQ_REMOVE(&sesshead, sp, list);
+               vca_close_session(sp, "EOF");
+               SES_Delete(sp);
+               return;
        }
        INCOMPL();
 }
 
+/*--------------------------------------------------------------------*/
 
 static void *
 vca_kqueue_main(void *arg)
 {
        struct kevent ke[NKEV], *kp;
        int i, j, n;
+       struct timespec ts;
        struct sess *sp;
 
        (void)arg;
@@ -129,36 +102,36 @@ vca_kqueue_main(void *arg)
        kq = kqueue();
        assert(kq >= 0);
 
+       j = 0;
+       EV_SET(&ke[j++], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
+       EV_SET(&ke[j++], pipes[0], EVFILT_READ, EV_ADD, 0, 0, pipes);
+       i = kevent(kq, ke, j, NULL, 0, NULL);
+       assert(i == 0);
+
        while (1) {
                n = kevent(kq, NULL, 0, ke, NKEV, NULL);
                assert(n >= 1 && n <= NKEV);
                for (kp = ke, j = 0; j < n; j++, kp++) {
-                       sp = vca_kev(kp);
-                       if (sp != NULL) {
-                               vca_kq_sess(sp, EV_DELETE);
-                               SES_Delete(sp);
-                               for (i = j; i < n; i++)
-                                       if (ke[i].udata == sp)
-                                               ke[i].udata = NULL;
-                       }
+                       if (kp->filter == EVFILT_TIMER)
+                               continue; 
+                       assert(kp->filter == EVFILT_READ);
+                       vca_kev(kp);
+               }
+               clock_gettime(CLOCK_REALTIME, &ts);
+               ts.tv_sec -= params->sess_timeout;
+               for (;;) {
+                       sp = TAILQ_FIRST(&sesshead);
+                       if (sp == NULL)
+                               break;
+                       if (sp->t_open.tv_sec > ts.tv_sec) 
+                               break;
+                       if (sp->t_open.tv_sec == ts.tv_sec &&
+                           sp->t_open.tv_nsec > ts.tv_nsec)
+                               break;
+                       TAILQ_REMOVE(&sesshead, sp, list);
+                       vca_close_session(sp, "timeout");
+                       SES_Delete(sp);
                }
-       }
-       INCOMPL();
-}
-
-static void *
-vca_kqueue_acct(void *arg)
-{
-       struct sess *sp;
-
-       (void)arg;
-       while (1) {
-               sp = vca_accept_sess(heritage.socket);
-               if (sp == NULL)
-                       continue;
-               clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-               http_RecvPrep(sp->http);
-               vca_kq_sess(sp, EV_ADD);
        }
 }
 
@@ -168,23 +141,18 @@ static void
 vca_kqueue_recycle(struct sess *sp)
 {
 
-       if (sp->fd < 0) {
+       if (sp->fd < 0)
                SES_Delete(sp);
-               return;
-       }
-       (void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
-       VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-       if (http_RecvPrepAgain(sp->http))
-               vca_handover(sp, 0);
-       else 
-               vca_kq_sess(sp, EV_ENABLE);
+       else
+               assert(write(pipes[1], &sp, sizeof sp) == sizeof sp);
 }
 
 static void
 vca_kqueue_init(void)
 {
-       AZ(pthread_create(&vca_kqueue_thread1, NULL, vca_kqueue_main, NULL));
-       AZ(pthread_create(&vca_kqueue_thread2, NULL, vca_kqueue_acct, NULL));
+
+       AZ(pipe(pipes));
+       AZ(pthread_create(&vca_kqueue_thread, NULL, vca_kqueue_main, NULL));
 }
 
 struct acceptor acceptor_kqueue = {