]> err.no Git - varnish/commitdiff
Reorganize the code a little, and add code to wait for all threads to finish
authordes <des@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Tue, 3 Jul 2007 09:09:55 +0000 (09:09 +0000)
committerdes <des@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Tue, 3 Jul 2007 09:09:55 +0000 (09:09 +0000)
processing pending messages before we exit.

Note that VSL_Dispatch() will read in log data as fast as it can, so when
working from a log file, varnishreplay will usually read in the entire file
into memory within the first few seconds.

git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@1623 d4fa192b-c00b-0410-8231-f00ffab90ce4

varnish-cache/bin/varnishreplay/varnishreplay.c

index ccab7c6b64f8c9ff00b26b42493afe3e53ab151f..69b015f7d4634ef752cb2bb378ecfc59ac9ed0f9 100644 (file)
@@ -32,6 +32,7 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <pthread.h>
+#include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include "varnishapi.h"
 #include "vss.h"
 
-static struct thread {
-       pthread_t thread_id;
-       struct mailbox *mbox;
-} **threads;
-       
-struct mailbox {
-       pthread_mutex_t lock;
-       pthread_cond_t has_mail;
-       STAILQ_HEAD(msgq_head, message) messages;
-};
+#ifndef HAVE_STRNDUP
+#include "compat/strndup.h"
+#endif
+
+#define freez(x) do { if (x) free(x); x = NULL; } while (0);
+
+static struct vss_addr *addr_info;
+static int debug;
+
+/*
+ * mailbox toolkit
+ */
 
 struct message {
        enum shmlogtag tag;
+       size_t len;
        char *ptr;
-       unsigned len;
        STAILQ_ENTRY(message) list;
 };
 
-static size_t nthreads;
+struct mailbox {
+       pthread_mutex_t lock;
+       pthread_cond_t has_mail;
+       int open;
+       STAILQ_HEAD(msgq_head, message) messages;
+};
 
-static struct vss_addr *adr_info;
-static int debug;
+static void
+mailbox_create(struct mailbox *mbox)
+{
+
+       STAILQ_INIT(&mbox->messages);
+       pthread_mutex_init(&mbox->lock, NULL);
+       pthread_cond_init(&mbox->has_mail, NULL);
+       mbox->open = 1;
+}
+
+static void
+mailbox_destroy(struct mailbox *mbox)
+{
+       struct message *msg;
+
+       while ((msg = STAILQ_FIRST(&mbox->messages))) {
+               STAILQ_REMOVE_HEAD(&mbox->messages, list);
+               free(msg);
+       }
+       pthread_cond_destroy(&mbox->has_mail);
+       pthread_mutex_destroy(&mbox->lock);
+}
 
 static void
 mailbox_put(struct mailbox *mbox, struct message *msg)
 {
+
        pthread_mutex_lock(&mbox->lock);
        STAILQ_INSERT_TAIL(&mbox->messages, msg, list);
        pthread_cond_signal(&mbox->has_mail);
@@ -78,15 +107,113 @@ static struct message *
 mailbox_get(struct mailbox *mbox)
 {
        struct message *msg;
-       
+
        pthread_mutex_lock(&mbox->lock);
-       while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL)
+       while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL && mbox->open)
                pthread_cond_wait(&mbox->has_mail, &mbox->lock);
-       STAILQ_REMOVE_HEAD(&mbox->messages, list);
+       if (msg != NULL)
+               STAILQ_REMOVE_HEAD(&mbox->messages, list);
        pthread_mutex_unlock(&mbox->lock);
        return msg;
 }
 
+static void
+mailbox_close(struct mailbox *mbox)
+{
+       pthread_mutex_lock(&mbox->lock);
+       mbox->open = 0;
+       pthread_cond_signal(&mbox->has_mail);
+       pthread_mutex_unlock(&mbox->lock);
+}
+
+/*
+ * thread toolkit
+ */
+
+struct thread {
+       pthread_t thread_id;
+       struct mailbox mbox;
+};
+
+static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void
+thread_log(int lvl, const char *fmt, ...)
+{
+       va_list ap;
+
+       if (lvl > debug)
+               return;
+       pthread_mutex_lock(&log_mutex);
+       fprintf(stderr, "%08x ", (unsigned int)pthread_self());
+       va_start(ap, fmt);
+       vfprintf(stderr, fmt, ap);
+       va_end(ap);
+       pthread_mutex_unlock(&log_mutex);
+}
+
+static struct thread **threads;
+static size_t nthreads;
+
+static struct thread *
+thread_get(int fd, void *(*thread_main)(void *))
+{
+
+       assert(fd != 0);
+       if (fd >= nthreads) {
+               struct thread **newthreads = threads;
+               size_t newnthreads = nthreads;
+
+               while (fd >= newnthreads)
+                       newnthreads += newnthreads + 1;
+               newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
+               assert(newthreads != NULL);
+               memset(newthreads + nthreads, 0,
+                   (newnthreads - nthreads) * sizeof *newthreads);
+               threads = newthreads;
+               nthreads = newnthreads;
+       }
+       if (threads[fd] == NULL) {
+               threads[fd] = malloc(sizeof *threads[fd]);
+               assert(threads[fd] != NULL);
+               mailbox_create(&threads[fd]->mbox);
+               if (pthread_create(&threads[fd]->thread_id, NULL,
+                   thread_main, threads[fd]) != 0) {
+                       thread_log(0, "thread creation failed\n");
+                       mailbox_destroy(&threads[fd]->mbox);
+                       freez(threads[fd]);
+               }
+               thread_log(1, "thread %08x started\n",
+                   (unsigned int)threads[fd]->thread_id);
+       }
+       return (threads[fd]);
+}
+
+static void
+thread_close(int fd)
+{
+
+       assert(fd < nthreads);
+       if (fd == 0) {
+               for (fd = 1; fd < nthreads; ++fd)
+                       thread_close(fd);
+               return;
+       }
+
+       if (threads[fd] == NULL)
+               return;
+       mailbox_close(&threads[fd]->mbox);
+       pthread_join(threads[fd]->thread_id, NULL);
+       thread_log(1, "thread %08x stopped\n",
+           (unsigned int)threads[fd]->thread_id);
+       mailbox_destroy(&threads[fd]->mbox);
+       freez(threads[fd]);
+}
+
+/*
+ * ...
+ */
+
 static int
 isprefix(const char *str, const char *prefix, const char *end, const char **next)
 {
@@ -159,14 +286,14 @@ init_connection(const char *address)
        int i, n;
 
        if (VSS_parse(address, &addr, &port) != 0) {
-               fprintf(stderr, "Invalid address\n");
+               thread_log(0, "Invalid address\n");
                exit(2);
        }
        n = VSS_resolve(addr, port, &ta);
        free(addr);
        free(port);
        if (n == 0) {
-               fprintf(stderr, "Could not connect to server\n");
+               thread_log(0, "Could not connect to server\n");
                exit(2);
        }
        for (i = 1; i < n; ++i) {
@@ -200,12 +327,11 @@ read_line(char **line, int sock)
                        buf = realloc(buf, lbuf);
                        XXXAN(buf);
                }
-               //fprintf(stderr, "start reading\n");
                i = read(sock, buf + nbuf, 1);
                if (i <= 0) {
-                       perror("error in reading\n");
+                       thread_log(0, "read(): %s\n", strerror(errno));
                        free(buf);
-                       exit(1);
+                       return (-1);
                }
                nbuf += i;
                if (nbuf >= 2 && buf[nbuf-2] == '\r' && buf[nbuf-1] == '\n')
@@ -233,8 +359,8 @@ read_block(int length, int sock)
                n = read(sock, buf + nbuf,
                    (2048 < length - nbuf ? 2048 : length - nbuf));
                if (n <= 0) {
-                       perror("failed reading the block\n");
-                       break;
+                       thread_log(0, "failed reading the block\n");
+                       return (-1);
                }
                nbuf += n;
        }
@@ -282,24 +408,20 @@ receive_response(int sock)
                free(line);
        }
 
-       if (debug)
-               fprintf(stderr, "status: %d\n", status);
+       thread_log(1, "status: %d\n", status);
 
 
        /* Read body */
        if (content_length > 0 && !chunked) {
                /* Fixed body size, read content_length bytes */
-               if (debug)
-                       fprintf(stderr, "fixed length\n");
-               n = read_block(content_length, sock);
-               if (debug) {
-                       fprintf(stderr, "size of body: %d\n", (int)content_length);
-                       fprintf(stderr, "bytes read: %d\n", n);
-               }
+               thread_log(1, "fixed length\n");
+               thread_log(1, "size of body: %ld\n", content_length);
+               if ((n = read_block(content_length, sock)) < 0)
+                       return (1);
+               thread_log(1, "bytes read: %d\n", n);
        } else if (chunked) {
                /* Chunked encoding, read size and bytes until no more */
-               if (debug)
-                       fprintf(stderr, "chunked encoding\n");
+               thread_log(1, "chunked encoding\n");
                while (1) {
                        line_len = read_line(&line, sock);
                        end = line + line_len;
@@ -308,10 +430,8 @@ receive_response(int sock)
                                break;
                        }
                        n = read_block(block_len, sock);
-                       if (debug) {
-                               fprintf(stderr, "size of body: %d\n", (int)block_len);
-                               fprintf(stderr, "bytes read: %d\n", n);
-                       }
+                       thread_log(1, "size of body: %d\n", (int)block_len);
+                       thread_log(1, "bytes read: %d\n", n);
                        free(line);
                        n = read_line(&line, sock);
                        free(line);
@@ -320,29 +440,27 @@ receive_response(int sock)
                free(line);
        } else if ((content_length <= 0 && !chunked) || req_failed) {
                /* No body --> stop reading. */
-               if (debug)
-                       fprintf(stderr, "no body\n");
+               thread_log(1, "no body\n");
+               return (1);
        } else {
                /* Unhandled case. */
-               fprintf(stderr, "An error occured\n");
-               exit(1);
+               thread_log(0, "An error occured\n");
+               return (1);
        }
-       if (debug)
-               fprintf(stderr, "\n");
 
        return close_connection;
 }
 
 static void *
-pthread_main(void *arg)
+replay_thread(void *arg)
 {
+       struct thread *thr = arg;
        struct message *msg;
-       struct thread *th = (struct thread*)arg;
        enum shmlogtag tag;
-       int len;
+       size_t len;
        char *ptr;
        const char *end, *next;
-       
+
        char *df_H = NULL;                      /* %H, Protocol version */
        char *df_Host = NULL;                   /* %{Host}i */
        char *df_Uq = NULL;                     /* %U%q, URL path and query string */
@@ -351,18 +469,15 @@ pthread_main(void *arg)
        int bogus = 0;                          /* bogus request */
 
        int sock, reopen = 1;
-       
-       //fprintf(stderr, "thread started\n");
-       
-       do {
-               msg = mailbox_get(th->mbox);
+
+       while ((msg = mailbox_get(&thr->mbox)) != NULL) {
                tag = msg->tag;
                len = msg->len;
                ptr = msg->ptr;
                end = ptr + len;
-       
-               //fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr);
-               
+
+               thread_log(2, "%s(%s)\n", VSL_tags[tag], msg->ptr);
+
                switch (tag) {
                case SLT_RxRequest:
                        if (df_m != NULL)
@@ -398,10 +513,10 @@ pthread_main(void *arg)
 
                if (tag != SLT_ReqEnd)
                        continue;
-                       
-               //fprintf(stderr, "bogus: %d %s\n", bogus, df_m);
 
-               if (!bogus) {
+               if (bogus) {
+                       thread_log(1, "bogus\n");
+               } else {
                        /* If the method is supported (GET or HEAD), send the request out
                         * on the socket. If the socket needs reopening, reopen it first.
                         * When the request is sent, call the function for receiving
@@ -409,16 +524,11 @@ pthread_main(void *arg)
                         */
                        if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) {
                                if (reopen)
-                                       sock = VSS_connect(adr_info);
+                                       sock = VSS_connect(addr_info);
                                reopen = 0;
 
-                               if (debug) {
-                                       fprintf(stderr, "%s ", df_m);
-                                       fprintf(stderr, "%s ", df_Uq);
-                                       fprintf(stderr, "%s ", df_H);
-                                       fprintf(stderr, "\n");
-                                       fprintf(stderr, "Host: ");
-                               }
+                               thread_log(1, "%s %s %s\n", df_m, df_Uq, df_H);
+
                                write(sock, df_m, strlen(df_m));
                                write(sock, " ", 1);
                                write(sock, df_Uq, strlen(df_Uq));
@@ -432,16 +542,12 @@ pthread_main(void *arg)
 
                                write(sock, "Host: ", 6);
                                if (df_Host) {
-                                       if (debug)
-                                               fprintf(stderr, df_Host);
+                                       thread_log(1, "Host: %s\n", df_Host);
                                        write(sock, df_Host, strlen(df_Host));
                                }
-                               if (debug)
-                                       fprintf(stderr, "\n");
                                write(sock, "\r\n", 2);
                                if (df_c) {
-                                       if (debug)
-                                               fprintf(stderr, "Connection: %s\n", df_c);
+                                       thread_log(1, "Connection: %s\n", df_c);
                                        write(sock, "Connection: ", 12);
                                        write(sock, df_c, strlen(df_c));
                                        write(sock, "\r\n", 2);
@@ -449,7 +555,7 @@ pthread_main(void *arg)
                                                reopen = 0;
                                }
                                if (debug)
-                                       fprintf(stderr, "\n");
+                                       thread_log(0, "\n");
                                write(sock, "\r\n", 2);
                                if (!reopen)
                                        reopen = receive_response(sock);
@@ -459,16 +565,15 @@ pthread_main(void *arg)
                }
 
                /* clean up */
-#define freez(x) do { if (x) free(x); x = NULL; } while (0);
+               freez(msg->ptr);
+               freez(msg);
                freez(df_H);
                freez(df_Host);
                freez(df_Uq);
                freez(df_m);
                freez(df_c);
-#undef freez
                bogus = 0;
-       } while (1);            
-
+       }
        return (0);
 }
 
@@ -477,50 +582,25 @@ static int
 gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
     unsigned len, unsigned spec, const char *ptr)
 {
+       struct thread *thr;
        const char *end;
        struct message *msg;
-       int err;
-       
+
        (void)priv;
 
        end = ptr + len;
 
-       if (!(spec & VSL_S_CLIENT))
+       if (fd == 0 || !(spec & VSL_S_CLIENT))
                return (0);
 
-       //fprintf(stderr, "gen_traffic\n");
-
-       if (fd >= nthreads) {
-               struct thread **newthreads = threads;
-               size_t newnthreads = nthreads;
-
-               while (fd >= newnthreads)
-                       newnthreads += newnthreads + 1;
-               newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
-               assert(newthreads != NULL);
-               memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads);
-               threads = newthreads;
-               nthreads = newnthreads;
-       }
-       if (threads[fd] == NULL) {
-               threads[fd] = malloc(sizeof *threads[fd]);
-               assert(threads[fd] != NULL);
-               threads[fd]->mbox = malloc(sizeof (struct mailbox));
-               STAILQ_INIT(&threads[fd]->mbox->messages);
-               pthread_mutex_init(&threads[fd]->mbox->lock, NULL);
-               pthread_cond_init(&threads[fd]->mbox->has_mail, NULL);
-               err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]);
-               if (err)
-                       fprintf(stderr, "thread creation failed\n");
-               fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err);
-       }
+       thread_log(2, "%d %s\n", fd, VSL_tags[tag]);
+       thr = thread_get(fd, replay_thread);
        msg = malloc(sizeof (struct message));
        msg->tag = tag;
-       msg->ptr = strdup(ptr);
        msg->len = len;
-       mailbox_put(threads[fd]->mbox, msg);
-       //fprintf(stderr, "message put\n");
-       
+       msg->ptr = strndup(ptr, len);
+       mailbox_put(&thr->mbox, msg);
+
        return 0;
 }
 
@@ -536,14 +616,14 @@ send_test_request(char *file, const char *address)
        char buf[2];
        char last = ' ';
        int sock, reopen = 1;
-       
-       adr_info = init_connection(address);
-       sock = VSS_connect(adr_info);
+
+       addr_info = init_connection(address);
+       sock = VSS_connect(addr_info);
        while (read(fd, buf, 1)) {
                write(sock, buf, 1);
-               fprintf(stderr, "%s", buf);
+               thread_log(0, "%s", buf);
                if (*buf == '\n' && last == '\n'){
-                       fprintf(stderr, "receive\n");
+                       thread_log(0, "receive\n");
                        reopen = receive_response(sock);
                }
                last = *buf;
@@ -581,7 +661,7 @@ main(int argc, char *argv[])
                        address = optarg;
                        break;
                case 'D':
-                       debug = 1;
+                       ++debug;
                        break;
                case 't':
                        /* This option is for testing only. The test file must contain
@@ -609,10 +689,10 @@ main(int argc, char *argv[])
        if (VSL_OpenLog(vd, NULL))
                exit(1);
 
-       adr_info = init_connection(address);
+       addr_info = init_connection(address);
 
        while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
                /* nothing */ ;
-
+       thread_close(0);
        exit(0);
 }