]> err.no Git - varnish/commitdiff
parellising varnishreplay. Work in progress.
authorcecilihf <cecilihf@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Tue, 3 Jul 2007 08:07:09 +0000 (08:07 +0000)
committercecilihf <cecilihf@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Tue, 3 Jul 2007 08:07:09 +0000 (08:07 +0000)
git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@1620 d4fa192b-c00b-0410-8231-f00ffab90ce4

varnish-cache/bin/varnishreplay/Makefile.am
varnish-cache/bin/varnishreplay/varnishreplay.c

index 676bba71be12458fe7c88b71d7632b34af87b368..c00169cf5e60424e94fe27768bc3e828dcec7b42 100644 (file)
@@ -14,4 +14,6 @@ varnishreplay_CFLAGS = -include config.h
 varnishreplay_LDADD = \
        $(top_builddir)/lib/libvarnish/libvarnish.la \
        $(top_builddir)/lib/libcompat/libcompat.a \
-       $(top_builddir)/lib/libvarnishapi/libvarnishapi.la
+       $(top_builddir)/lib/libvarnishapi/libvarnishapi.la \
+       ${PTHREAD_LIBS}
+
index 7deae15e45a7ff2450340c05d93d71dc5241bf11..ccab7c6b64f8c9ff00b26b42493afe3e53ab151f 100644 (file)
 
 #include <ctype.h>
 #include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <fcntl.h>
 
 #include "libvarnish.h"
+#include "queue.h"
 #include "varnishapi.h"
 #include "vss.h"
 
-static struct request {
-       char *df_H;                     /* %H, Protocol version */
-       char *df_Host;                  /* %{Host}i */
-       char *df_Uq;                    /* %U%q, URL path and query string */
-       char *df_m;                     /* %m, Request method*/
-       char *df_c;                     /* Connection info (keep-alive, close) */
-       int bogus;                      /* bogus request */
-} **req;
-
-static size_t nreq;
+static struct thread {
+       pthread_t thread_id;
+       struct mailbox *mbox;
+} **threads;
+       
+struct mailbox {
+       pthread_mutex_t lock;
+       pthread_cond_t has_mail;
+       STAILQ_HEAD(msgq_head, message) messages;
+};
+
+struct message {
+       enum shmlogtag tag;
+       char *ptr;
+       unsigned len;
+       STAILQ_ENTRY(message) list;
+};
+
+static size_t nthreads;
 
 static struct vss_addr *adr_info;
-static int sock;
-static int reopen;
 static int debug;
 
+static void
+mailbox_put(struct mailbox *mbox, struct message *msg)
+{
+       pthread_mutex_lock(&mbox->lock);
+       STAILQ_INSERT_TAIL(&mbox->messages, msg, list);
+       pthread_cond_signal(&mbox->has_mail);
+       pthread_mutex_unlock(&mbox->lock);
+}
+
+static struct message *
+mailbox_get(struct mailbox *mbox)
+{
+       struct message *msg;
+       
+       pthread_mutex_lock(&mbox->lock);
+       while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL)
+               pthread_cond_wait(&mbox->has_mail, &mbox->lock);
+       STAILQ_REMOVE_HEAD(&mbox->messages, list);
+       pthread_mutex_unlock(&mbox->lock);
+       return msg;
+}
+
 static int
 isprefix(const char *str, const char *prefix, const char *end, const char **next)
 {
@@ -153,7 +184,7 @@ init_connection(const char *address)
  * A line is terminated by \r\n
  */
 static int
-read_line(char **line)
+read_line(char **line, int sock)
 {
        char *buf;
        unsigned nbuf, lbuf;
@@ -191,7 +222,7 @@ read_line(char **line)
  * the number of bytes read.
  */
 static int
-read_block(int length)
+read_block(int length, int sock)
 {
        char *buf;
        int n, nbuf;
@@ -214,7 +245,7 @@ read_block(int length)
 /* Receive the response after sending a request.
  */
 static int
-receive_response(void)
+receive_response(int sock)
 {
        char *line, *end;
        const char *next;
@@ -229,7 +260,7 @@ receive_response(void)
 
        /* Read header */
        while (1) {
-               line_len = read_line(&line);
+               line_len = read_line(&line, sock);
                end = line + line_len;
 
                if (*line == '\r' && *(line + 1) == '\n') {
@@ -260,7 +291,7 @@ receive_response(void)
                /* Fixed body size, read content_length bytes */
                if (debug)
                        fprintf(stderr, "fixed length\n");
-               n = read_block(content_length);
+               n = read_block(content_length, sock);
                if (debug) {
                        fprintf(stderr, "size of body: %d\n", (int)content_length);
                        fprintf(stderr, "bytes read: %d\n", n);
@@ -270,22 +301,22 @@ receive_response(void)
                if (debug)
                        fprintf(stderr, "chunked encoding\n");
                while (1) {
-                       line_len = read_line(&line);
+                       line_len = read_line(&line, sock);
                        end = line + line_len;
                        block_len = strtol(line, &end, 16);
                        if (block_len == 0) {
                                break;
                        }
-                       n = read_block(block_len);
+                       n = read_block(block_len, sock);
                        if (debug) {
                                fprintf(stderr, "size of body: %d\n", (int)block_len);
                                fprintf(stderr, "bytes read: %d\n", n);
                        }
                        free(line);
-                       n = read_line(&line);
+                       n = read_line(&line, sock);
                        free(line);
                }
-               n = read_line(&line);
+               n = read_line(&line, sock);
                free(line);
        } else if ((content_length <= 0 && !chunked) || req_failed) {
                /* No body --> stop reading. */
@@ -302,155 +333,198 @@ receive_response(void)
        return close_connection;
 }
 
-
-static int
-gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
-    unsigned len, unsigned spec, const char *ptr)
+static void *
+pthread_main(void *arg)
 {
+       struct message *msg;
+       struct thread *th = (struct thread*)arg;
+       enum shmlogtag tag;
+       int len;
+       char *ptr;
        const char *end, *next;
-       FILE *fo;
-       struct request *rp;
-
-       end = ptr + len;
-
-       if (!(spec & VSL_S_CLIENT))
-               return (0);
-
-       if (fd >= nreq) {
-               struct request **newreq = req;
-               size_t newnreq = nreq;
-
-               while (fd >= newnreq)
-                       newnreq += newnreq + 1;
-               newreq = realloc(newreq, newnreq * sizeof *newreq);
-               assert(newreq != NULL);
-               memset(newreq + nreq, 0, (newnreq - nreq) * sizeof *newreq);
-               req = newreq;
-               nreq = newnreq;
-       }
-       if (req[fd] == NULL) {
-               req[fd] = calloc(sizeof *req[fd], 1);
-               assert(req[fd] != NULL);
-       }
-       rp = req[fd];
-
-       switch (tag) {
-       case SLT_RxRequest:
-               if (tag == SLT_RxRequest && (spec & VSL_S_BACKEND))
+       
+       char *df_H = NULL;                      /* %H, Protocol version */
+       char *df_Host = NULL;                   /* %{Host}i */
+       char *df_Uq = NULL;                     /* %U%q, URL path and query string */
+       char *df_m = NULL;                      /* %m, Request method*/
+       char *df_c = NULL;                      /* Connection info (keep-alive, close) */
+       int bogus = 0;                          /* bogus request */
+
+       int sock, reopen = 1;
+       
+       //fprintf(stderr, "thread started\n");
+       
+       do {
+               msg = mailbox_get(th->mbox);
+               tag = msg->tag;
+               len = msg->len;
+               ptr = msg->ptr;
+               end = ptr + len;
+       
+               //fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr);
+               
+               switch (tag) {
+               case SLT_RxRequest:
+                       if (df_m != NULL)
+                               bogus = 1;
+                       else
+                               df_m = trimline(ptr, end);
                        break;
 
-               if (rp->df_m != NULL)
-                       rp->bogus = 1;
-               else
-                       rp->df_m = trimline(ptr, end);
-               break;
-
-       case SLT_RxURL:
-               if (tag == SLT_RxURL && (spec & VSL_S_BACKEND))
+               case SLT_RxURL:
+                       if (df_Uq != NULL)
+                               bogus = 1;
+                       else
+                               df_Uq = trimline(ptr, end);
                        break;
 
-               if (rp->df_Uq != NULL)
-                       rp->bogus = 1;
-               else
-                       rp->df_Uq = trimline(ptr, end);
-               break;
+               case SLT_RxProtocol:
+                       if (df_H != NULL)
+                               bogus = 1;
+                       else
+                               df_H = trimline(ptr, end);
+                       break;
 
-       case SLT_RxProtocol:
-               if (tag == SLT_RxProtocol && (spec & VSL_S_BACKEND))
+               case SLT_RxHeader:
+                       if (isprefix(ptr, "host:", end, &next))
+                               df_Host = trimline(next, end);
+                       if (isprefix(ptr, "connection:", end, &next))
+                               df_c = trimline(next, end);
                        break;
 
-               if (rp->df_H != NULL)
-                       rp->bogus = 1;
-               else
-                       rp->df_H = trimline(ptr, end);
-               break;
-
-       case SLT_RxHeader:
-               if (isprefix(ptr, "host:", end, &next))
-                       rp->df_Host = trimline(next, end);
-               if (isprefix(ptr, "connection:", end, &next))
-                       rp->df_c = trimline(next, end);
-               break;
-
-       default:
-               break;
-       }
+               default:
+                       break;
+               }
 
-       if ((spec & VSL_S_CLIENT) && tag != SLT_ReqEnd)
-               return (0);
+               if (tag != SLT_ReqEnd)
+                       continue;
+                       
+               //fprintf(stderr, "bogus: %d %s\n", bogus, df_m);
 
-       if (!rp->bogus) {
-               fo = priv;
+               if (!bogus) {
+                       /* If the method is supported (GET or HEAD), send the request out
+                        * on the socket. If the socket needs reopening, reopen it first.
+                        * When the request is sent, call the function for receiving
+                        * the answer.
+                        */
+                       if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) {
+                               if (reopen)
+                                       sock = VSS_connect(adr_info);
+                               reopen = 0;
+
+                               if (debug) {
+                                       fprintf(stderr, "%s ", df_m);
+                                       fprintf(stderr, "%s ", df_Uq);
+                                       fprintf(stderr, "%s ", df_H);
+                                       fprintf(stderr, "\n");
+                                       fprintf(stderr, "Host: ");
+                               }
+                               write(sock, df_m, strlen(df_m));
+                               write(sock, " ", 1);
+                               write(sock, df_Uq, strlen(df_Uq));
+                               write(sock, " ", 1);
+                               write(sock, df_H, strlen(df_H));
+                               write(sock, " ", 1);
+                               write(sock, "\r\n", 2);
 
-               /* If the method is supported (GET or HEAD), send the request out
-                * on the socket. If the socket needs reopening, reopen it first.
-                * When the request is sent, call the function for receiving
-                * the answer.
-                */
-               if (!(strncmp(rp->df_m, "GET", 3) && strncmp(rp->df_m, "HEAD", 4))) {
-                       if (reopen)
-                               sock = VSS_connect(adr_info);
-                       reopen = 0;
+                               if (strncmp(df_H, "HTTP/1.0", 8))
+                                       reopen = 1;
 
-                       if (debug) {
-                               fprintf(fo, "%s ", rp->df_m);
-                               fprintf(fo, "%s ", rp->df_Uq);
-                               fprintf(fo, "%s ", rp->df_H);
-                               fprintf(fo, "\n");
-                               fprintf(fo, "Host: ");
-                       }
-                       write(sock, rp->df_m, strlen(rp->df_m));
-                       write(sock, " ", 1);
-                       write(sock, rp->df_Uq, strlen(rp->df_Uq));
-                       write(sock, " ", 1);
-                       write(sock, rp->df_H, strlen(rp->df_H));
-                       write(sock, " ", 1);
-                       write(sock, "\r\n", 2);
-
-                       if (strncmp(rp->df_H, "HTTP/1.0", 8))
-                               reopen = 1;
-
-                       write(sock, "Host: ", 6);
-                       if (rp->df_Host) {
+                               write(sock, "Host: ", 6);
+                               if (df_Host) {
+                                       if (debug)
+                                               fprintf(stderr, df_Host);
+                                       write(sock, df_Host, strlen(df_Host));
+                               }
                                if (debug)
-                                       fprintf(fo, rp->df_Host);
-                               write(sock, rp->df_Host, strlen(rp->df_Host));
-                       }
-                       if (debug)
-                               fprintf(fo, "\n");
-                       write(sock, "\r\n", 2);
-                       if (rp->df_c) {
+                                       fprintf(stderr, "\n");
+                               write(sock, "\r\n", 2);
+                               if (df_c) {
+                                       if (debug)
+                                               fprintf(stderr, "Connection: %s\n", df_c);
+                                       write(sock, "Connection: ", 12);
+                                       write(sock, df_c, strlen(df_c));
+                                       write(sock, "\r\n", 2);
+                                       if (isequal(df_c, "keep-alive", df_c + strlen(df_c)))
+                                               reopen = 0;
+                               }
                                if (debug)
-                                       fprintf(fo, "Connection: %s\n", rp->df_c);
-                               write(sock, "Connection: ", 12);
-                               write(sock, rp->df_c, strlen(rp->df_c));
+                                       fprintf(stderr, "\n");
                                write(sock, "\r\n", 2);
-                               if (isequal(rp->df_c, "keep-alive", rp->df_c + strlen(rp->df_c)))
-                                       reopen = 0;
+                               if (!reopen)
+                                       reopen = receive_response(sock);
+                               if (reopen)
+                                       close(sock);
                        }
-                       if (debug)
-                               fprintf(fo, "\n");
-                       write(sock, "\r\n", 2);
-                       if (!reopen)
-                               reopen = receive_response();
-                       if (reopen)
-                               close(sock);
                }
-       }
 
-       /* clean up */
+               /* clean up */
 #define freez(x) do { if (x) free(x); x = NULL; } while (0);
-       freez(rp->df_H);
-       freez(rp->df_Host);
-       freez(rp->df_Uq);
-       freez(rp->df_m);
-       freez(rp->df_c);
+               freez(df_H);
+               freez(df_Host);
+               freez(df_Uq);
+               freez(df_m);
+               freez(df_c);
 #undef freez
-       rp->bogus = 0;
+               bogus = 0;
+       } while (1);            
 
        return (0);
 }
 
+
+static int
+gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
+    unsigned len, unsigned spec, const char *ptr)
+{
+       const char *end;
+       struct message *msg;
+       int err;
+       
+       (void)priv;
+
+       end = ptr + len;
+
+       if (!(spec & VSL_S_CLIENT))
+               return (0);
+
+       //fprintf(stderr, "gen_traffic\n");
+
+       if (fd >= nthreads) {
+               struct thread **newthreads = threads;
+               size_t newnthreads = nthreads;
+
+               while (fd >= newnthreads)
+                       newnthreads += newnthreads + 1;
+               newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
+               assert(newthreads != NULL);
+               memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads);
+               threads = newthreads;
+               nthreads = newnthreads;
+       }
+       if (threads[fd] == NULL) {
+               threads[fd] = malloc(sizeof *threads[fd]);
+               assert(threads[fd] != NULL);
+               threads[fd]->mbox = malloc(sizeof (struct mailbox));
+               STAILQ_INIT(&threads[fd]->mbox->messages);
+               pthread_mutex_init(&threads[fd]->mbox->lock, NULL);
+               pthread_cond_init(&threads[fd]->mbox->has_mail, NULL);
+               err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]);
+               if (err)
+                       fprintf(stderr, "thread creation failed\n");
+               fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err);
+       }
+       msg = malloc(sizeof (struct message));
+       msg->tag = tag;
+       msg->ptr = strdup(ptr);
+       msg->len = len;
+       mailbox_put(threads[fd]->mbox, msg);
+       //fprintf(stderr, "message put\n");
+       
+       return 0;
+}
+
+
 /* This function is for testing only, and only sends
  * the raw data from the file to the address.
  * The receive function is called for each blank line.
@@ -461,6 +535,8 @@ send_test_request(char *file, const char *address)
        int fd = open(file, O_RDONLY);
        char buf[2];
        char last = ' ';
+       int sock, reopen = 1;
+       
        adr_info = init_connection(address);
        sock = VSS_connect(adr_info);
        while (read(fd, buf, 1)) {
@@ -468,7 +544,7 @@ send_test_request(char *file, const char *address)
                fprintf(stderr, "%s", buf);
                if (*buf == '\n' && last == '\n'){
                        fprintf(stderr, "receive\n");
-                       reopen = receive_response();
+                       reopen = receive_response(sock);
                }
                last = *buf;
        }
@@ -491,15 +567,14 @@ main(int argc, char *argv[])
 {
        int c;
        struct VSL_data *vd;
-       const char *ofn = NULL;
        const char *address = NULL;
-       FILE *of;
 
        char *test_file = NULL;
 
        vd = VSL_New();
        debug = 0;
 
+       VSL_Arg(vd, 'c', NULL);
        while ((c = getopt(argc, argv, "a:Dr:t:")) != -1) {
                switch (c) {
                case 'a':
@@ -534,18 +609,10 @@ main(int argc, char *argv[])
        if (VSL_OpenLog(vd, NULL))
                exit(1);
 
-       ofn = "stdout";
-       of = stdout;
-
        adr_info = init_connection(address);
-       reopen = 1;
 
-       while (VSL_Dispatch(vd, gen_traffic, of) == 0) {
-               if (fflush(of) != 0) {
-                       perror(ofn);
-                       exit(1);
-               }
-       }
+       while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
+               /* nothing */ ;
 
        exit(0);
 }