From: des Date: Tue, 3 Jul 2007 09:09:55 +0000 (+0000) Subject: Reorganize the code a little, and add code to wait for all threads to finish X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3637301f7a6de50b85fa7a89df0772182dc81beb;p=varnish Reorganize the code a little, and add code to wait for all threads to finish processing pending messages before we exit. Note that VSL_Dispatch() will read in log data as fast as it can, so when working from a log file, varnishreplay will usually read in the entire file into memory within the first few seconds. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@1623 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- diff --git a/varnish-cache/bin/varnishreplay/varnishreplay.c b/varnish-cache/bin/varnishreplay/varnishreplay.c index ccab7c6b..69b015f7 100644 --- a/varnish-cache/bin/varnishreplay/varnishreplay.c +++ b/varnish-cache/bin/varnishreplay/varnishreplay.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -42,32 +43,60 @@ #include "varnishapi.h" #include "vss.h" -static struct thread { - pthread_t thread_id; - struct mailbox *mbox; -} **threads; - -struct mailbox { - pthread_mutex_t lock; - pthread_cond_t has_mail; - STAILQ_HEAD(msgq_head, message) messages; -}; +#ifndef HAVE_STRNDUP +#include "compat/strndup.h" +#endif + +#define freez(x) do { if (x) free(x); x = NULL; } while (0); + +static struct vss_addr *addr_info; +static int debug; + +/* + * mailbox toolkit + */ struct message { enum shmlogtag tag; + size_t len; char *ptr; - unsigned len; STAILQ_ENTRY(message) list; }; -static size_t nthreads; +struct mailbox { + pthread_mutex_t lock; + pthread_cond_t has_mail; + int open; + STAILQ_HEAD(msgq_head, message) messages; +}; -static struct vss_addr *adr_info; -static int debug; +static void +mailbox_create(struct mailbox *mbox) +{ + + STAILQ_INIT(&mbox->messages); + pthread_mutex_init(&mbox->lock, NULL); + pthread_cond_init(&mbox->has_mail, NULL); + mbox->open = 1; +} + +static void +mailbox_destroy(struct mailbox *mbox) +{ + struct message *msg; + + while ((msg = STAILQ_FIRST(&mbox->messages))) { + STAILQ_REMOVE_HEAD(&mbox->messages, list); + free(msg); + } + pthread_cond_destroy(&mbox->has_mail); + pthread_mutex_destroy(&mbox->lock); +} static void mailbox_put(struct mailbox *mbox, struct message *msg) { + pthread_mutex_lock(&mbox->lock); STAILQ_INSERT_TAIL(&mbox->messages, msg, list); pthread_cond_signal(&mbox->has_mail); @@ -78,15 +107,113 @@ static struct message * mailbox_get(struct mailbox *mbox) { struct message *msg; - + pthread_mutex_lock(&mbox->lock); - while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL) + while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL && mbox->open) pthread_cond_wait(&mbox->has_mail, &mbox->lock); - STAILQ_REMOVE_HEAD(&mbox->messages, list); + if (msg != NULL) + STAILQ_REMOVE_HEAD(&mbox->messages, list); pthread_mutex_unlock(&mbox->lock); return msg; } +static void +mailbox_close(struct mailbox *mbox) +{ + pthread_mutex_lock(&mbox->lock); + mbox->open = 0; + pthread_cond_signal(&mbox->has_mail); + pthread_mutex_unlock(&mbox->lock); +} + +/* + * thread toolkit + */ + +struct thread { + pthread_t thread_id; + struct mailbox mbox; +}; + +static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; + +static void +thread_log(int lvl, const char *fmt, ...) +{ + va_list ap; + + if (lvl > debug) + return; + pthread_mutex_lock(&log_mutex); + fprintf(stderr, "%08x ", (unsigned int)pthread_self()); + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + pthread_mutex_unlock(&log_mutex); +} + +static struct thread **threads; +static size_t nthreads; + +static struct thread * +thread_get(int fd, void *(*thread_main)(void *)) +{ + + assert(fd != 0); + if (fd >= nthreads) { + struct thread **newthreads = threads; + size_t newnthreads = nthreads; + + while (fd >= newnthreads) + newnthreads += newnthreads + 1; + newthreads = realloc(newthreads, newnthreads * sizeof *newthreads); + assert(newthreads != NULL); + memset(newthreads + nthreads, 0, + (newnthreads - nthreads) * sizeof *newthreads); + threads = newthreads; + nthreads = newnthreads; + } + if (threads[fd] == NULL) { + threads[fd] = malloc(sizeof *threads[fd]); + assert(threads[fd] != NULL); + mailbox_create(&threads[fd]->mbox); + if (pthread_create(&threads[fd]->thread_id, NULL, + thread_main, threads[fd]) != 0) { + thread_log(0, "thread creation failed\n"); + mailbox_destroy(&threads[fd]->mbox); + freez(threads[fd]); + } + thread_log(1, "thread %08x started\n", + (unsigned int)threads[fd]->thread_id); + } + return (threads[fd]); +} + +static void +thread_close(int fd) +{ + + assert(fd < nthreads); + if (fd == 0) { + for (fd = 1; fd < nthreads; ++fd) + thread_close(fd); + return; + } + + if (threads[fd] == NULL) + return; + mailbox_close(&threads[fd]->mbox); + pthread_join(threads[fd]->thread_id, NULL); + thread_log(1, "thread %08x stopped\n", + (unsigned int)threads[fd]->thread_id); + mailbox_destroy(&threads[fd]->mbox); + freez(threads[fd]); +} + +/* + * ... + */ + static int isprefix(const char *str, const char *prefix, const char *end, const char **next) { @@ -159,14 +286,14 @@ init_connection(const char *address) int i, n; if (VSS_parse(address, &addr, &port) != 0) { - fprintf(stderr, "Invalid address\n"); + thread_log(0, "Invalid address\n"); exit(2); } n = VSS_resolve(addr, port, &ta); free(addr); free(port); if (n == 0) { - fprintf(stderr, "Could not connect to server\n"); + thread_log(0, "Could not connect to server\n"); exit(2); } for (i = 1; i < n; ++i) { @@ -200,12 +327,11 @@ read_line(char **line, int sock) buf = realloc(buf, lbuf); XXXAN(buf); } - //fprintf(stderr, "start reading\n"); i = read(sock, buf + nbuf, 1); if (i <= 0) { - perror("error in reading\n"); + thread_log(0, "read(): %s\n", strerror(errno)); free(buf); - exit(1); + return (-1); } nbuf += i; if (nbuf >= 2 && buf[nbuf-2] == '\r' && buf[nbuf-1] == '\n') @@ -233,8 +359,8 @@ read_block(int length, int sock) n = read(sock, buf + nbuf, (2048 < length - nbuf ? 2048 : length - nbuf)); if (n <= 0) { - perror("failed reading the block\n"); - break; + thread_log(0, "failed reading the block\n"); + return (-1); } nbuf += n; } @@ -282,24 +408,20 @@ receive_response(int sock) free(line); } - if (debug) - fprintf(stderr, "status: %d\n", status); + thread_log(1, "status: %d\n", status); /* Read body */ if (content_length > 0 && !chunked) { /* Fixed body size, read content_length bytes */ - if (debug) - fprintf(stderr, "fixed length\n"); - n = read_block(content_length, sock); - if (debug) { - fprintf(stderr, "size of body: %d\n", (int)content_length); - fprintf(stderr, "bytes read: %d\n", n); - } + thread_log(1, "fixed length\n"); + thread_log(1, "size of body: %ld\n", content_length); + if ((n = read_block(content_length, sock)) < 0) + return (1); + thread_log(1, "bytes read: %d\n", n); } else if (chunked) { /* Chunked encoding, read size and bytes until no more */ - if (debug) - fprintf(stderr, "chunked encoding\n"); + thread_log(1, "chunked encoding\n"); while (1) { line_len = read_line(&line, sock); end = line + line_len; @@ -308,10 +430,8 @@ receive_response(int sock) break; } n = read_block(block_len, sock); - if (debug) { - fprintf(stderr, "size of body: %d\n", (int)block_len); - fprintf(stderr, "bytes read: %d\n", n); - } + thread_log(1, "size of body: %d\n", (int)block_len); + thread_log(1, "bytes read: %d\n", n); free(line); n = read_line(&line, sock); free(line); @@ -320,29 +440,27 @@ receive_response(int sock) free(line); } else if ((content_length <= 0 && !chunked) || req_failed) { /* No body --> stop reading. */ - if (debug) - fprintf(stderr, "no body\n"); + thread_log(1, "no body\n"); + return (1); } else { /* Unhandled case. */ - fprintf(stderr, "An error occured\n"); - exit(1); + thread_log(0, "An error occured\n"); + return (1); } - if (debug) - fprintf(stderr, "\n"); return close_connection; } static void * -pthread_main(void *arg) +replay_thread(void *arg) { + struct thread *thr = arg; struct message *msg; - struct thread *th = (struct thread*)arg; enum shmlogtag tag; - int len; + size_t len; char *ptr; const char *end, *next; - + char *df_H = NULL; /* %H, Protocol version */ char *df_Host = NULL; /* %{Host}i */ char *df_Uq = NULL; /* %U%q, URL path and query string */ @@ -351,18 +469,15 @@ pthread_main(void *arg) int bogus = 0; /* bogus request */ int sock, reopen = 1; - - //fprintf(stderr, "thread started\n"); - - do { - msg = mailbox_get(th->mbox); + + while ((msg = mailbox_get(&thr->mbox)) != NULL) { tag = msg->tag; len = msg->len; ptr = msg->ptr; end = ptr + len; - - //fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr); - + + thread_log(2, "%s(%s)\n", VSL_tags[tag], msg->ptr); + switch (tag) { case SLT_RxRequest: if (df_m != NULL) @@ -398,10 +513,10 @@ pthread_main(void *arg) if (tag != SLT_ReqEnd) continue; - - //fprintf(stderr, "bogus: %d %s\n", bogus, df_m); - if (!bogus) { + if (bogus) { + thread_log(1, "bogus\n"); + } else { /* If the method is supported (GET or HEAD), send the request out * on the socket. If the socket needs reopening, reopen it first. * When the request is sent, call the function for receiving @@ -409,16 +524,11 @@ pthread_main(void *arg) */ if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) { if (reopen) - sock = VSS_connect(adr_info); + sock = VSS_connect(addr_info); reopen = 0; - if (debug) { - fprintf(stderr, "%s ", df_m); - fprintf(stderr, "%s ", df_Uq); - fprintf(stderr, "%s ", df_H); - fprintf(stderr, "\n"); - fprintf(stderr, "Host: "); - } + thread_log(1, "%s %s %s\n", df_m, df_Uq, df_H); + write(sock, df_m, strlen(df_m)); write(sock, " ", 1); write(sock, df_Uq, strlen(df_Uq)); @@ -432,16 +542,12 @@ pthread_main(void *arg) write(sock, "Host: ", 6); if (df_Host) { - if (debug) - fprintf(stderr, df_Host); + thread_log(1, "Host: %s\n", df_Host); write(sock, df_Host, strlen(df_Host)); } - if (debug) - fprintf(stderr, "\n"); write(sock, "\r\n", 2); if (df_c) { - if (debug) - fprintf(stderr, "Connection: %s\n", df_c); + thread_log(1, "Connection: %s\n", df_c); write(sock, "Connection: ", 12); write(sock, df_c, strlen(df_c)); write(sock, "\r\n", 2); @@ -449,7 +555,7 @@ pthread_main(void *arg) reopen = 0; } if (debug) - fprintf(stderr, "\n"); + thread_log(0, "\n"); write(sock, "\r\n", 2); if (!reopen) reopen = receive_response(sock); @@ -459,16 +565,15 @@ pthread_main(void *arg) } /* clean up */ -#define freez(x) do { if (x) free(x); x = NULL; } while (0); + freez(msg->ptr); + freez(msg); freez(df_H); freez(df_Host); freez(df_Uq); freez(df_m); freez(df_c); -#undef freez bogus = 0; - } while (1); - + } return (0); } @@ -477,50 +582,25 @@ static int gen_traffic(void *priv, enum shmlogtag tag, unsigned fd, unsigned len, unsigned spec, const char *ptr) { + struct thread *thr; const char *end; struct message *msg; - int err; - + (void)priv; end = ptr + len; - if (!(spec & VSL_S_CLIENT)) + if (fd == 0 || !(spec & VSL_S_CLIENT)) return (0); - //fprintf(stderr, "gen_traffic\n"); - - if (fd >= nthreads) { - struct thread **newthreads = threads; - size_t newnthreads = nthreads; - - while (fd >= newnthreads) - newnthreads += newnthreads + 1; - newthreads = realloc(newthreads, newnthreads * sizeof *newthreads); - assert(newthreads != NULL); - memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads); - threads = newthreads; - nthreads = newnthreads; - } - if (threads[fd] == NULL) { - threads[fd] = malloc(sizeof *threads[fd]); - assert(threads[fd] != NULL); - threads[fd]->mbox = malloc(sizeof (struct mailbox)); - STAILQ_INIT(&threads[fd]->mbox->messages); - pthread_mutex_init(&threads[fd]->mbox->lock, NULL); - pthread_cond_init(&threads[fd]->mbox->has_mail, NULL); - err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]); - if (err) - fprintf(stderr, "thread creation failed\n"); - fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err); - } + thread_log(2, "%d %s\n", fd, VSL_tags[tag]); + thr = thread_get(fd, replay_thread); msg = malloc(sizeof (struct message)); msg->tag = tag; - msg->ptr = strdup(ptr); msg->len = len; - mailbox_put(threads[fd]->mbox, msg); - //fprintf(stderr, "message put\n"); - + msg->ptr = strndup(ptr, len); + mailbox_put(&thr->mbox, msg); + return 0; } @@ -536,14 +616,14 @@ send_test_request(char *file, const char *address) char buf[2]; char last = ' '; int sock, reopen = 1; - - adr_info = init_connection(address); - sock = VSS_connect(adr_info); + + addr_info = init_connection(address); + sock = VSS_connect(addr_info); while (read(fd, buf, 1)) { write(sock, buf, 1); - fprintf(stderr, "%s", buf); + thread_log(0, "%s", buf); if (*buf == '\n' && last == '\n'){ - fprintf(stderr, "receive\n"); + thread_log(0, "receive\n"); reopen = receive_response(sock); } last = *buf; @@ -581,7 +661,7 @@ main(int argc, char *argv[]) address = optarg; break; case 'D': - debug = 1; + ++debug; break; case 't': /* This option is for testing only. The test file must contain @@ -609,10 +689,10 @@ main(int argc, char *argv[]) if (VSL_OpenLog(vd, NULL)) exit(1); - adr_info = init_connection(address); + addr_info = init_connection(address); while (VSL_Dispatch(vd, gen_traffic, NULL) == 0) /* nothing */ ; - + thread_close(0); exit(0); }