From 6c40351758834b2a3d25e5b7ecda0ce837ca5495 Mon Sep 17 00:00:00 2001 From: des Date: Mon, 30 Jun 2008 02:11:54 +0000 Subject: [PATCH] Old patch: greatly improve performance and memory usage by using per-thread pre-allocated buffers instead of malloc(). git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@2864 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- .../bin/varnishreplay/varnishreplay.c | 513 +++++++++--------- 1 file changed, 271 insertions(+), 242 deletions(-) diff --git a/varnish-cache/bin/varnishreplay/varnishreplay.c b/varnish-cache/bin/varnishreplay/varnishreplay.c index 73d3b29f..7b55c937 100644 --- a/varnish-cache/bin/varnishreplay/varnishreplay.c +++ b/varnish-cache/bin/varnishreplay/varnishreplay.c @@ -58,6 +58,37 @@ static struct vss_addr *addr_info; static int debug; +static int +isprefix(const char *str, const char *prefix, const char **next) +{ + + while (*str && *prefix && + tolower((int)*str) == tolower((int)*prefix)) + ++str, ++prefix; + if (*str && *str != ' ') + return (0); + if (next) { + while (*str && *str == ' ') + ++str; + *next = str; + } + return (1); +} + +#if 0 +static int +isequal(const char *str, const char *reference, const char *end) +{ + + while (str < end && *str && *reference && + tolower((int)*str) == tolower((int)*reference)) + ++str, ++reference; + if (str != end || *reference) + return (0); + return (1); +} +#endif + /* * mailbox toolkit */ @@ -69,6 +100,8 @@ struct message { VSTAILQ_ENTRY(message) list; }; +#define MAX_MAILBOX_SIZE 30 + struct mailbox { pthread_mutex_t lock; pthread_cond_t has_mail; @@ -136,11 +169,6 @@ mailbox_close(struct mailbox *mbox) * thread toolkit */ -struct thread { - pthread_t thread_id; - struct mailbox mbox; -}; - static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; static void @@ -161,9 +189,56 @@ thread_log(int lvl, int errcode, const char *fmt, ...) pthread_mutex_unlock(&log_mutex); } +struct thread { + pthread_t thread_id; + struct mailbox mbox; + + int sock; + + int fd; /* original fd from logs */ + + char *method; /* Request method*/ + char *proto; /* Protocol version */ + char *url; /* URL and query string */ + const char *conn; /* Connection info (keep-alive, close) */ + char *hdr[64]; /* Headers */ + int nhdr; /* Number of headers */ + int bogus; /* bogus request */ + + char arena[4096]; + int top; + char line[2048]; + char temp[2048]; +}; + static struct thread **threads; static size_t nthreads; +/* + * Clear thread state + */ +static void +thread_clear(struct thread *thr) +{ + + thr->method = thr->proto = thr->url = NULL; + thr->conn = NULL; + memset(&thr->hdr, 0, sizeof thr->hdr); + thr->nhdr = 0; + thr->bogus = 0; + memset(&thr->arena, 0, sizeof thr->arena); + thr->top = 0; + memset(&thr->line, 0, sizeof thr->line); + memset(&thr->temp, 0, sizeof thr->temp); + if (thr->sock != -1) + close(thr->sock); + thr->sock = -1; +} + +#define THREAD_FAIL ((struct thread *)-1) + +static pthread_attr_t thread_attr; + static struct thread * thread_get(int fd, void *(*thread_main)(void *)) { @@ -185,16 +260,23 @@ thread_get(int fd, void *(*thread_main)(void *)) if (threads[fd] == NULL) { threads[fd] = malloc(sizeof *threads[fd]); assert(threads[fd] != NULL); + threads[fd]->sock = -1; + thread_clear(threads[fd]); mailbox_create(&threads[fd]->mbox); - if (pthread_create(&threads[fd]->thread_id, NULL, + if (pthread_create(&threads[fd]->thread_id, &thread_attr, thread_main, threads[fd]) != 0) { thread_log(0, errno, "pthread_create()"); mailbox_destroy(&threads[fd]->mbox); freez(threads[fd]); + threads[fd] = THREAD_FAIL; + } else { + threads[fd]->fd = fd; + thread_log(0, 0, "thread %p:%d started", + (void *)threads[fd]->thread_id, fd); } - thread_log(0, 0, "thread %p started", - (void *)threads[fd]->thread_id); } + if (threads[fd] == THREAD_FAIL) + return (NULL); return (threads[fd]); } @@ -202,74 +284,56 @@ static void thread_close(int fd) { - assert(fd == 0 || fd < nthreads); - if (fd == 0) { - for (fd = 1; fd < nthreads; ++fd) + if (fd == -1) { + for (fd = 0; fd < nthreads; ++fd) thread_close(fd); return; } + assert(fd < nthreads); + if (threads[fd] == NULL) return; mailbox_close(&threads[fd]->mbox); pthread_join(threads[fd]->thread_id, NULL); thread_log(0, 0, "thread %p stopped", (void *)threads[fd]->thread_id); + thread_clear(threads[fd]); mailbox_destroy(&threads[fd]->mbox); freez(threads[fd]); } /* - * ... + * Allocate from thread arena */ - -static int -isprefix(const char *str, const char *prefix, const char *end, const char **next) -{ - - while (str < end && *str && *prefix && - tolower((int)*str) == tolower((int)*prefix)) - ++str, ++prefix; - if (*str && *str != ' ') - return (0); - if (next) { - while (str < end && *str && *str == ' ') - ++str; - *next = str; - } - return (1); -} - -#if 0 -static int -isequal(const char *str, const char *reference, const char *end) +static void * +thread_alloc(struct thread *thr, size_t len) { + void *ptr; - while (str < end && *str && *reference && - tolower((int)*str) == tolower((int)*reference)) - ++str, ++reference; - if (str != end || *reference) - return (0); - return (1); + if (sizeof thr->arena - thr->top < len) + return (NULL); + ptr = thr->arena + thr->top; + thr->top += len; + return (ptr); } -#endif /* * Returns a copy of the entire string with leading and trailing spaces * trimmed. */ static char * -trimline(const char *str, const char *end) +trimline(struct thread *thr, const char *str) { size_t len; char *p; /* skip leading space */ - while (str < end && *str && *str == ' ') + while (*str && *str == ' ') ++str; /* seek to end of string */ - for (len = 0; &str[len] < end && str[len]; ++len) + for (len = 0; str[len]; ++len) /* nothing */ ; /* trim trailing space */ @@ -277,87 +341,45 @@ trimline(const char *str, const char *end) --len; /* copy and return */ - p = malloc(len + 1); - assert(p != NULL); + if ((p = thread_alloc(thr, len + 1)) == NULL) + return (NULL); memcpy(p, str, len); p[len] = '\0'; return (p); } -/* Initiate a connection to
by resolving the - * hostname and returning a struct with necessary - * connection info. - */ -static struct vss_addr * -init_connection(const char *address) -{ - struct vss_addr **ta; - struct vss_addr *tap; - char *addr, *port; - int i, n; - - if (VSS_parse(address, &addr, &port) != 0) { - thread_log(0, 0, "Invalid address"); - exit(2); - } - n = VSS_resolve(addr, port, &ta); - free(addr); - free(port); - if (n == 0) { - thread_log(0, 0, "Could not connect to server"); - exit(2); - } - for (i = 1; i < n; ++i) { - free(ta[i]); - ta[i] = NULL; - } - tap = ta[0]; - free(ta); - - return (tap); -} - /* Read a line from the socket and return the number of bytes read. * After returning, line will point to the read bytes in memory. * A line is terminated by \r\n */ static int -read_line(char **line, int sock) +read_line(struct thread *thr) { - char *buf; - unsigned nbuf, lbuf; - int i; + int i, len; - lbuf = 4096; - buf = malloc(lbuf); - XXXAN(buf); - nbuf = 0; + len = 0; while (1) { - if (nbuf + 2 >= lbuf) { - lbuf += lbuf; - buf = realloc(buf, lbuf); - XXXAN(buf); + if (len + 2 > sizeof thr->line) { + thread_log(0, 0, "overflow"); + return (-1); } - i = read(sock, buf + nbuf, 1); + i = read(thr->sock, thr->line + len, 1); if (i < 0) { thread_log(0, errno, "read(%d, %p, %d)", - sock, buf + nbuf, 1); - free(buf); + thr->sock, thr->line + len, 1); return (-1); } - if (i == 0) { - buf[nbuf] = '\0'; + if (i == 0) break; - } - nbuf += i; - if (nbuf >= 2 && buf[nbuf-2] == '\r' && buf[nbuf-1] == '\n') { - buf[nbuf-2] = '\0'; + len += i; + if (len >= 2 && thr->line[len - 2] == '\r' && + thr->line[len - 1] == '\n') { + len -= 2; break; } - } - *line = buf; - return (nbuf - 2); + thr->line[len] = '\0'; + return (len); } /* Read a block of data from the socket, and do nothing with it. @@ -365,36 +387,31 @@ read_line(char **line, int sock) * the number of bytes read. */ static int -read_block(int length, int sock) +read_block(struct thread *thr, int len) { - char *buf; - int len, n, nbuf; - - buf = malloc(length); - nbuf = 0; - while (nbuf < length) { - len = 2048 < length - nbuf ? 2048 : length - nbuf; - n = read(sock, buf + nbuf, len); - if (n < 0) { + int n, r, tot; + + for (tot = 0; tot < len; tot += r) { + n = len - tot; + if (n > sizeof thr->temp) + n = sizeof thr->temp; + r = read(thr->sock, thr->temp, n); + if (r < 0) { thread_log(0, errno, "read(%d, %p, %d)", - sock, buf + nbuf, len); - nbuf = -1; - break; + thr->sock, thr->temp, n); + return (-1); } - if (n == 0) + if (r == 0) break; - nbuf += n; } - free(buf); - return (nbuf); + return (tot); } /* Receive the response after sending a request. */ static int -receive_response(int sock) +receive_response(struct thread *thr) { - char *line, *end; const char *next; int line_len; long chunk_length, content_length; @@ -406,26 +423,22 @@ receive_response(int sock) /* Read header */ for (;;) { - line_len = read_line(&line, sock); + line_len = read_line(thr); if (line_len < 0) return (-1); - thread_log(2, 0, "< %.*s", line_len, line); - end = line + line_len; - if (line_len == 0) { - freez(line); + thread_log(2, 0, "< %.*s", line_len, thr->line); + if (line_len == 0) break; - } - if (strncmp(line, "HTTP", 4) == 0) { - sscanf(line, "%*s %d %*s\r\n", &status); + if (strncmp(thr->line, "HTTP", 4) == 0) { + sscanf(thr->line, "%*s %d %*s\r\n", &status); failed = (status != 200); - } else if (isprefix(line, "content-length:", end, &next)) { + } else if (isprefix(thr->line, "content-length:", &next)) { content_length = strtol(next, NULL, 10); - } else if (isprefix(line, "transfer-encoding:", end, &next)) { + } else if (isprefix(thr->line, "transfer-encoding:", &next)) { chunked = (strcasecmp(next, "chunked") == 0); - } else if (isprefix(line, "connection:", end, &next)) { + } else if (isprefix(thr->line, "connection:", &next)) { connclose = (strcasecmp(next, "close") == 0); } - freez(line); } thread_log(1, 0, "status: %d", status); @@ -435,34 +448,33 @@ receive_response(int sock) /* Chunked encoding, read size and bytes until no more */ thread_log(1, 0, "chunked encoding"); for (;;) { - if ((line_len = read_line(&line, sock)) < 0) + line_len = read_line(thr); + if (line_len < 0) return (-1); - end = line + line_len; /* read_line() guarantees null-termination */ - chunk_length = strtol(line, NULL, 16); - freez(line); + chunk_length = strtol(thr->line, NULL, 16); if (chunk_length == 0) break; - if ((n = read_block(chunk_length, sock)) < 0) + if ((n = read_block(thr, chunk_length)) < 0) return (-1); if (n < chunk_length) thread_log(0, 0, "short read: %d/%ld", n, chunk_length); thread_log(1, 0, "chunk length: %ld", chunk_length); thread_log(1, 0, "bytes read: %d", n); - /* trainling CR LF */ - if ((n = read_line(&line, sock)) < 0) + /* trailing CR LF */ + if ((n = read_line(thr)) < 0) return (-1); - freez(line); } /* trailing CR LF */ - n = read_line(&line, sock); - freez(line); + n = read_line(thr); + if (n < 0) + return (-1); } else if (content_length > 0) { /* Fixed body size, read content_length bytes */ thread_log(1, 0, "fixed length"); thread_log(1, 0, "content length: %ld", content_length); - if ((n = read_block(content_length, sock)) < 0) + if ((n = read_block(thr, content_length)) < 0) return (1); if (n < content_length) thread_log(0, 0, "short read: %d/%ld", @@ -487,62 +499,49 @@ replay_thread(void *arg) enum shmlogtag tag; size_t len; char *ptr; - const char *end, *next; - - char *df_method = NULL; /* Request method*/ - char *df_proto = NULL; /* Protocol version */ - char *df_url = NULL; /* URL and query string */ - char *df_conn = NULL; /* Connection info (keep-alive, close) */ - char **df_hdr = NULL; /* Headers */ - size_t df_hdrsz = 0; /* Size of df_hdr */ - int df_nhdr = 0; /* Number of headers */ - int bogus = 0; /* bogus request */ - int i; + const char *next; - int sock = -1, reopen = 1; + int i; - df_hdrsz = 16; - df_hdr = malloc(df_hdrsz * sizeof *df_hdr); + int reopen = 1; while ((msg = mailbox_get(&thr->mbox)) != NULL) { tag = msg->tag; len = msg->len; ptr = msg->ptr; - end = ptr + len; thread_log(2, 0, "%s(%s)", VSL_tags[tag], msg->ptr); switch (tag) { case SLT_RxRequest: - if (df_method != NULL) - bogus = 1; + if (thr->method != NULL) + thr->bogus = 1; else - df_method = trimline(ptr, end); + thr->method = trimline(thr, ptr); break; case SLT_RxURL: - if (df_url != NULL) - bogus = 1; + if (thr->url != NULL) + thr->bogus = 1; else - df_url = trimline(ptr, end); + thr->url = trimline(thr, ptr); break; case SLT_RxProtocol: - if (df_proto != NULL) - bogus = 1; + if (thr->proto != NULL) + thr->bogus = 1; else - df_proto = trimline(ptr, end); + thr->proto = trimline(thr, ptr); break; case SLT_RxHeader: - while (df_hdrsz <= df_nhdr) { - df_hdrsz *= 2; - df_hdr = realloc(df_hdr, df_hdrsz * sizeof *df_hdr); - XXXAN(df_hdr); + if (thr->nhdr >= sizeof thr->hdr / sizeof *thr->hdr) { + thr->bogus = 1; + } else { + thr->hdr[thr->nhdr++] = trimline(thr, ptr); + if (isprefix(ptr, "connection:", &next)) + thr->conn = trimline(thr, next); } - df_hdr[df_nhdr++] = trimline(ptr, end); - if (isprefix(ptr, "connection:", end, &next)) - df_conn = trimline(next, end); break; default: @@ -555,89 +554,81 @@ replay_thread(void *arg) if (tag != SLT_ReqEnd) continue; - if (!df_method || !df_url || !df_proto) { - bogus = 1; - } else if (strcmp(df_method, "GET") != 0 && strcmp(df_method, "HEAD") != 0) { - bogus = 1; - } else if (strcmp(df_proto, "HTTP/1.0") == 0) { - reopen = !(df_conn && strcasecmp(df_conn, "keep-alive") == 0); - } else if (strcmp(df_proto, "HTTP/1.1") == 0) { - reopen = (df_conn && strcasecmp(df_conn, "close") == 0); + if (!thr->method || !thr->url || !thr->proto) { + thr->bogus = 1; + } else if (strcmp(thr->method, "GET") != 0 && strcmp(thr->method, "HEAD") != 0) { + thr->bogus = 1; + } else if (strcmp(thr->proto, "HTTP/1.0") == 0) { + reopen = !(thr->conn && strcasecmp(thr->conn, "keep-alive") == 0); + } else if (strcmp(thr->proto, "HTTP/1.1") == 0) { + reopen = (thr->conn && strcasecmp(thr->conn, "close") == 0); } else { - bogus = 1; + thr->bogus = 1; } - if (bogus) { + if (thr->bogus) { thread_log(1, 0, "bogus"); - } else { - if (sock == -1) { - thread_log(1, 0, "open"); - sock = VSS_connect(addr_info); - assert(sock != -1); - } + goto clear; + } - thread_log(1, 0, "%s %s %s", df_method, df_url, df_proto); - - iov[0].iov_base = df_method; - iov[0].iov_len = strlen(df_method); - iov[2].iov_base = df_url; - iov[2].iov_len = strlen(df_url); - iov[4].iov_base = df_proto; - iov[4].iov_len = strlen(df_proto); - iov[1].iov_base = iov[3].iov_base = space; - iov[1].iov_len = iov[3].iov_len = 1; - iov[5].iov_base = crlf; - iov[5].iov_len = 2; - if (writev(sock, iov, 6) == -1) { - thread_log(0, errno, "writev()"); - goto close; + if (thr->sock == -1) { + for (;;) { + thread_log(1, 0, "sleeping before connect..."); + usleep(1000 * (thr->fd % 3001)); + if ((thr->sock = VSS_connect(addr_info)) >= 0) + break; + thread_log(0, errno, "connect failed"); } + } - for (i = 0; i < df_nhdr; ++i) { - thread_log(2, 0, "%d %s", i, df_hdr[i]); - iov[0].iov_base = df_hdr[i]; - iov[0].iov_len = strlen(df_hdr[i]); - iov[1].iov_base = crlf; - iov[1].iov_len = 2; - if (writev(sock, iov, 2) == -1) { - thread_log(0, errno, "writev()"); - goto close; - } - } - if (write(sock, crlf, 2) == -1) { + thread_log(1, 0, "%s %s %s", thr->method, thr->url, thr->proto); + + iov[0].iov_base = thr->method; + iov[0].iov_len = strlen(thr->method); + iov[2].iov_base = thr->url; + iov[2].iov_len = strlen(thr->url); + iov[4].iov_base = thr->proto; + iov[4].iov_len = strlen(thr->proto); + iov[1].iov_base = iov[3].iov_base = space; + iov[1].iov_len = iov[3].iov_len = 1; + iov[5].iov_base = crlf; + iov[5].iov_len = 2; + if (writev(thr->sock, iov, 6) == -1) { + thread_log(0, errno, "writev()"); + goto close; + } + + for (i = 0; i < thr->nhdr; ++i) { + thread_log(2, 0, "%d %s", i, thr->hdr[i]); + iov[0].iov_base = thr->hdr[i]; + iov[0].iov_len = strlen(thr->hdr[i]); + iov[1].iov_base = crlf; + iov[1].iov_len = 2; + if (writev(thr->sock, iov, 2) == -1) { thread_log(0, errno, "writev()"); goto close; } - if (receive_response(sock) || reopen) { + } + if (write(thr->sock, crlf, 2) == -1) { + thread_log(0, errno, "writev()"); + goto close; + } + if (receive_response(thr) || reopen) { close: - thread_log(1, 0, "close"); - close(sock); - sock = -1; - } + thread_log(1, 0, "close"); + assert(thr->sock != -1); + close(thr->sock); + thr->sock = -1; } + sleep(1); +clear: /* clean up */ - freez(df_method); - freez(df_url); - freez(df_proto); - freez(df_conn); - while (df_nhdr) { - --df_nhdr; - freez(df_hdr[df_nhdr]); - } - bogus = 0; + thread_clear(thr); } /* leftovers */ - freez(df_method); - freez(df_url); - freez(df_proto); - freez(df_conn); - while (df_nhdr) { - --df_nhdr; - freez(df_hdr[df_nhdr]); - } - freez(df_hdr); + thread_clear(thr); return (0); } @@ -659,6 +650,8 @@ gen_traffic(void *priv, enum shmlogtag tag, unsigned fd, thread_log(3, 0, "%d %s", fd, VSL_tags[tag]); thr = thread_get(fd, replay_thread); + if (thr == NULL) + return (0); msg = malloc(sizeof (struct message)); msg->tag = tag; msg->len = len; @@ -668,6 +661,39 @@ gen_traffic(void *priv, enum shmlogtag tag, unsigned fd, return (0); } +/* Initiate a connection to
by resolving the + * hostname and returning a struct with necessary + * connection info. + */ +static struct vss_addr * +init_connection(const char *address) +{ + struct vss_addr **ta; + struct vss_addr *tap; + char *addr, *port; + int i, n; + + if (VSS_parse(address, &addr, &port) != 0) { + thread_log(0, 0, "Invalid address"); + exit(2); + } + n = VSS_resolve(addr, port, &ta); + free(addr); + free(port); + if (n == 0) { + thread_log(0, 0, "Could not connect to server"); + exit(2); + } + for (i = 1; i < n; ++i) { + free(ta[i]); + ta[i] = NULL; + } + tap = ta[0]; + free(ta); + + return (tap); +} + /*--------------------------------------------------------------------*/ static void @@ -715,8 +741,11 @@ main(int argc, char *argv[]) signal(SIGPIPE, SIG_IGN); + pthread_attr_init(&thread_attr); + pthread_attr_setstacksize(&thread_attr, 16384); + while (VSL_Dispatch(vd, gen_traffic, NULL) == 0) /* nothing */ ; - thread_close(0); + thread_close(-1); exit(0); } -- 2.39.5