From 0787e814d07166ade9913061a5de698b87a89a4e Mon Sep 17 00:00:00 2001 From: cecilihf Date: Tue, 3 Jul 2007 08:07:09 +0000 Subject: [PATCH] parellising varnishreplay. Work in progress. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@1620 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- varnish-cache/bin/varnishreplay/Makefile.am | 4 +- .../bin/varnishreplay/varnishreplay.c | 385 ++++++++++-------- 2 files changed, 229 insertions(+), 160 deletions(-) diff --git a/varnish-cache/bin/varnishreplay/Makefile.am b/varnish-cache/bin/varnishreplay/Makefile.am index 676bba71..c00169cf 100644 --- a/varnish-cache/bin/varnishreplay/Makefile.am +++ b/varnish-cache/bin/varnishreplay/Makefile.am @@ -14,4 +14,6 @@ varnishreplay_CFLAGS = -include config.h varnishreplay_LDADD = \ $(top_builddir)/lib/libvarnish/libvarnish.la \ $(top_builddir)/lib/libcompat/libcompat.a \ - $(top_builddir)/lib/libvarnishapi/libvarnishapi.la + $(top_builddir)/lib/libvarnishapi/libvarnishapi.la \ + ${PTHREAD_LIBS} + diff --git a/varnish-cache/bin/varnishreplay/varnishreplay.c b/varnish-cache/bin/varnishreplay/varnishreplay.c index 7deae15e..ccab7c6b 100644 --- a/varnish-cache/bin/varnishreplay/varnishreplay.c +++ b/varnish-cache/bin/varnishreplay/varnishreplay.c @@ -30,32 +30,63 @@ #include #include +#include +#include #include #include #include #include -#include #include "libvarnish.h" +#include "queue.h" #include "varnishapi.h" #include "vss.h" -static struct request { - char *df_H; /* %H, Protocol version */ - char *df_Host; /* %{Host}i */ - char *df_Uq; /* %U%q, URL path and query string */ - char *df_m; /* %m, Request method*/ - char *df_c; /* Connection info (keep-alive, close) */ - int bogus; /* bogus request */ -} **req; - -static size_t nreq; +static struct thread { + pthread_t thread_id; + struct mailbox *mbox; +} **threads; + +struct mailbox { + pthread_mutex_t lock; + pthread_cond_t has_mail; + STAILQ_HEAD(msgq_head, message) messages; +}; + +struct message { + enum shmlogtag tag; + char *ptr; + unsigned len; + STAILQ_ENTRY(message) list; +}; + +static size_t nthreads; static struct vss_addr *adr_info; -static int sock; -static int reopen; static int debug; +static void +mailbox_put(struct mailbox *mbox, struct message *msg) +{ + pthread_mutex_lock(&mbox->lock); + STAILQ_INSERT_TAIL(&mbox->messages, msg, list); + pthread_cond_signal(&mbox->has_mail); + pthread_mutex_unlock(&mbox->lock); +} + +static struct message * +mailbox_get(struct mailbox *mbox) +{ + struct message *msg; + + pthread_mutex_lock(&mbox->lock); + while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL) + pthread_cond_wait(&mbox->has_mail, &mbox->lock); + STAILQ_REMOVE_HEAD(&mbox->messages, list); + pthread_mutex_unlock(&mbox->lock); + return msg; +} + static int isprefix(const char *str, const char *prefix, const char *end, const char **next) { @@ -153,7 +184,7 @@ init_connection(const char *address) * A line is terminated by \r\n */ static int -read_line(char **line) +read_line(char **line, int sock) { char *buf; unsigned nbuf, lbuf; @@ -191,7 +222,7 @@ read_line(char **line) * the number of bytes read. */ static int -read_block(int length) +read_block(int length, int sock) { char *buf; int n, nbuf; @@ -214,7 +245,7 @@ read_block(int length) /* Receive the response after sending a request. */ static int -receive_response(void) +receive_response(int sock) { char *line, *end; const char *next; @@ -229,7 +260,7 @@ receive_response(void) /* Read header */ while (1) { - line_len = read_line(&line); + line_len = read_line(&line, sock); end = line + line_len; if (*line == '\r' && *(line + 1) == '\n') { @@ -260,7 +291,7 @@ receive_response(void) /* Fixed body size, read content_length bytes */ if (debug) fprintf(stderr, "fixed length\n"); - n = read_block(content_length); + n = read_block(content_length, sock); if (debug) { fprintf(stderr, "size of body: %d\n", (int)content_length); fprintf(stderr, "bytes read: %d\n", n); @@ -270,22 +301,22 @@ receive_response(void) if (debug) fprintf(stderr, "chunked encoding\n"); while (1) { - line_len = read_line(&line); + line_len = read_line(&line, sock); end = line + line_len; block_len = strtol(line, &end, 16); if (block_len == 0) { break; } - n = read_block(block_len); + n = read_block(block_len, sock); if (debug) { fprintf(stderr, "size of body: %d\n", (int)block_len); fprintf(stderr, "bytes read: %d\n", n); } free(line); - n = read_line(&line); + n = read_line(&line, sock); free(line); } - n = read_line(&line); + n = read_line(&line, sock); free(line); } else if ((content_length <= 0 && !chunked) || req_failed) { /* No body --> stop reading. */ @@ -302,155 +333,198 @@ receive_response(void) return close_connection; } - -static int -gen_traffic(void *priv, enum shmlogtag tag, unsigned fd, - unsigned len, unsigned spec, const char *ptr) +static void * +pthread_main(void *arg) { + struct message *msg; + struct thread *th = (struct thread*)arg; + enum shmlogtag tag; + int len; + char *ptr; const char *end, *next; - FILE *fo; - struct request *rp; - - end = ptr + len; - - if (!(spec & VSL_S_CLIENT)) - return (0); - - if (fd >= nreq) { - struct request **newreq = req; - size_t newnreq = nreq; - - while (fd >= newnreq) - newnreq += newnreq + 1; - newreq = realloc(newreq, newnreq * sizeof *newreq); - assert(newreq != NULL); - memset(newreq + nreq, 0, (newnreq - nreq) * sizeof *newreq); - req = newreq; - nreq = newnreq; - } - if (req[fd] == NULL) { - req[fd] = calloc(sizeof *req[fd], 1); - assert(req[fd] != NULL); - } - rp = req[fd]; - - switch (tag) { - case SLT_RxRequest: - if (tag == SLT_RxRequest && (spec & VSL_S_BACKEND)) + + char *df_H = NULL; /* %H, Protocol version */ + char *df_Host = NULL; /* %{Host}i */ + char *df_Uq = NULL; /* %U%q, URL path and query string */ + char *df_m = NULL; /* %m, Request method*/ + char *df_c = NULL; /* Connection info (keep-alive, close) */ + int bogus = 0; /* bogus request */ + + int sock, reopen = 1; + + //fprintf(stderr, "thread started\n"); + + do { + msg = mailbox_get(th->mbox); + tag = msg->tag; + len = msg->len; + ptr = msg->ptr; + end = ptr + len; + + //fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr); + + switch (tag) { + case SLT_RxRequest: + if (df_m != NULL) + bogus = 1; + else + df_m = trimline(ptr, end); break; - if (rp->df_m != NULL) - rp->bogus = 1; - else - rp->df_m = trimline(ptr, end); - break; - - case SLT_RxURL: - if (tag == SLT_RxURL && (spec & VSL_S_BACKEND)) + case SLT_RxURL: + if (df_Uq != NULL) + bogus = 1; + else + df_Uq = trimline(ptr, end); break; - if (rp->df_Uq != NULL) - rp->bogus = 1; - else - rp->df_Uq = trimline(ptr, end); - break; + case SLT_RxProtocol: + if (df_H != NULL) + bogus = 1; + else + df_H = trimline(ptr, end); + break; - case SLT_RxProtocol: - if (tag == SLT_RxProtocol && (spec & VSL_S_BACKEND)) + case SLT_RxHeader: + if (isprefix(ptr, "host:", end, &next)) + df_Host = trimline(next, end); + if (isprefix(ptr, "connection:", end, &next)) + df_c = trimline(next, end); break; - if (rp->df_H != NULL) - rp->bogus = 1; - else - rp->df_H = trimline(ptr, end); - break; - - case SLT_RxHeader: - if (isprefix(ptr, "host:", end, &next)) - rp->df_Host = trimline(next, end); - if (isprefix(ptr, "connection:", end, &next)) - rp->df_c = trimline(next, end); - break; - - default: - break; - } + default: + break; + } - if ((spec & VSL_S_CLIENT) && tag != SLT_ReqEnd) - return (0); + if (tag != SLT_ReqEnd) + continue; + + //fprintf(stderr, "bogus: %d %s\n", bogus, df_m); - if (!rp->bogus) { - fo = priv; + if (!bogus) { + /* If the method is supported (GET or HEAD), send the request out + * on the socket. If the socket needs reopening, reopen it first. + * When the request is sent, call the function for receiving + * the answer. + */ + if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) { + if (reopen) + sock = VSS_connect(adr_info); + reopen = 0; + + if (debug) { + fprintf(stderr, "%s ", df_m); + fprintf(stderr, "%s ", df_Uq); + fprintf(stderr, "%s ", df_H); + fprintf(stderr, "\n"); + fprintf(stderr, "Host: "); + } + write(sock, df_m, strlen(df_m)); + write(sock, " ", 1); + write(sock, df_Uq, strlen(df_Uq)); + write(sock, " ", 1); + write(sock, df_H, strlen(df_H)); + write(sock, " ", 1); + write(sock, "\r\n", 2); - /* If the method is supported (GET or HEAD), send the request out - * on the socket. If the socket needs reopening, reopen it first. - * When the request is sent, call the function for receiving - * the answer. - */ - if (!(strncmp(rp->df_m, "GET", 3) && strncmp(rp->df_m, "HEAD", 4))) { - if (reopen) - sock = VSS_connect(adr_info); - reopen = 0; + if (strncmp(df_H, "HTTP/1.0", 8)) + reopen = 1; - if (debug) { - fprintf(fo, "%s ", rp->df_m); - fprintf(fo, "%s ", rp->df_Uq); - fprintf(fo, "%s ", rp->df_H); - fprintf(fo, "\n"); - fprintf(fo, "Host: "); - } - write(sock, rp->df_m, strlen(rp->df_m)); - write(sock, " ", 1); - write(sock, rp->df_Uq, strlen(rp->df_Uq)); - write(sock, " ", 1); - write(sock, rp->df_H, strlen(rp->df_H)); - write(sock, " ", 1); - write(sock, "\r\n", 2); - - if (strncmp(rp->df_H, "HTTP/1.0", 8)) - reopen = 1; - - write(sock, "Host: ", 6); - if (rp->df_Host) { + write(sock, "Host: ", 6); + if (df_Host) { + if (debug) + fprintf(stderr, df_Host); + write(sock, df_Host, strlen(df_Host)); + } if (debug) - fprintf(fo, rp->df_Host); - write(sock, rp->df_Host, strlen(rp->df_Host)); - } - if (debug) - fprintf(fo, "\n"); - write(sock, "\r\n", 2); - if (rp->df_c) { + fprintf(stderr, "\n"); + write(sock, "\r\n", 2); + if (df_c) { + if (debug) + fprintf(stderr, "Connection: %s\n", df_c); + write(sock, "Connection: ", 12); + write(sock, df_c, strlen(df_c)); + write(sock, "\r\n", 2); + if (isequal(df_c, "keep-alive", df_c + strlen(df_c))) + reopen = 0; + } if (debug) - fprintf(fo, "Connection: %s\n", rp->df_c); - write(sock, "Connection: ", 12); - write(sock, rp->df_c, strlen(rp->df_c)); + fprintf(stderr, "\n"); write(sock, "\r\n", 2); - if (isequal(rp->df_c, "keep-alive", rp->df_c + strlen(rp->df_c))) - reopen = 0; + if (!reopen) + reopen = receive_response(sock); + if (reopen) + close(sock); } - if (debug) - fprintf(fo, "\n"); - write(sock, "\r\n", 2); - if (!reopen) - reopen = receive_response(); - if (reopen) - close(sock); } - } - /* clean up */ + /* clean up */ #define freez(x) do { if (x) free(x); x = NULL; } while (0); - freez(rp->df_H); - freez(rp->df_Host); - freez(rp->df_Uq); - freez(rp->df_m); - freez(rp->df_c); + freez(df_H); + freez(df_Host); + freez(df_Uq); + freez(df_m); + freez(df_c); #undef freez - rp->bogus = 0; + bogus = 0; + } while (1); return (0); } + +static int +gen_traffic(void *priv, enum shmlogtag tag, unsigned fd, + unsigned len, unsigned spec, const char *ptr) +{ + const char *end; + struct message *msg; + int err; + + (void)priv; + + end = ptr + len; + + if (!(spec & VSL_S_CLIENT)) + return (0); + + //fprintf(stderr, "gen_traffic\n"); + + if (fd >= nthreads) { + struct thread **newthreads = threads; + size_t newnthreads = nthreads; + + while (fd >= newnthreads) + newnthreads += newnthreads + 1; + newthreads = realloc(newthreads, newnthreads * sizeof *newthreads); + assert(newthreads != NULL); + memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads); + threads = newthreads; + nthreads = newnthreads; + } + if (threads[fd] == NULL) { + threads[fd] = malloc(sizeof *threads[fd]); + assert(threads[fd] != NULL); + threads[fd]->mbox = malloc(sizeof (struct mailbox)); + STAILQ_INIT(&threads[fd]->mbox->messages); + pthread_mutex_init(&threads[fd]->mbox->lock, NULL); + pthread_cond_init(&threads[fd]->mbox->has_mail, NULL); + err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]); + if (err) + fprintf(stderr, "thread creation failed\n"); + fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err); + } + msg = malloc(sizeof (struct message)); + msg->tag = tag; + msg->ptr = strdup(ptr); + msg->len = len; + mailbox_put(threads[fd]->mbox, msg); + //fprintf(stderr, "message put\n"); + + return 0; +} + + /* This function is for testing only, and only sends * the raw data from the file to the address. * The receive function is called for each blank line. @@ -461,6 +535,8 @@ send_test_request(char *file, const char *address) int fd = open(file, O_RDONLY); char buf[2]; char last = ' '; + int sock, reopen = 1; + adr_info = init_connection(address); sock = VSS_connect(adr_info); while (read(fd, buf, 1)) { @@ -468,7 +544,7 @@ send_test_request(char *file, const char *address) fprintf(stderr, "%s", buf); if (*buf == '\n' && last == '\n'){ fprintf(stderr, "receive\n"); - reopen = receive_response(); + reopen = receive_response(sock); } last = *buf; } @@ -491,15 +567,14 @@ main(int argc, char *argv[]) { int c; struct VSL_data *vd; - const char *ofn = NULL; const char *address = NULL; - FILE *of; char *test_file = NULL; vd = VSL_New(); debug = 0; + VSL_Arg(vd, 'c', NULL); while ((c = getopt(argc, argv, "a:Dr:t:")) != -1) { switch (c) { case 'a': @@ -534,18 +609,10 @@ main(int argc, char *argv[]) if (VSL_OpenLog(vd, NULL)) exit(1); - ofn = "stdout"; - of = stdout; - adr_info = init_connection(address); - reopen = 1; - while (VSL_Dispatch(vd, gen_traffic, of) == 0) { - if (fflush(of) != 0) { - perror(ofn); - exit(1); - } - } + while (VSL_Dispatch(vd, gen_traffic, NULL) == 0) + /* nothing */ ; exit(0); } -- 2.39.5