static struct vss_addr *addr_info;
static int debug;
+static int
+isprefix(const char *str, const char *prefix, const char **next)
+{
+
+ while (*str && *prefix &&
+ tolower((int)*str) == tolower((int)*prefix))
+ ++str, ++prefix;
+ if (*str && *str != ' ')
+ return (0);
+ if (next) {
+ while (*str && *str == ' ')
+ ++str;
+ *next = str;
+ }
+ return (1);
+}
+
+#if 0
+static int
+isequal(const char *str, const char *reference, const char *end)
+{
+
+ while (str < end && *str && *reference &&
+ tolower((int)*str) == tolower((int)*reference))
+ ++str, ++reference;
+ if (str != end || *reference)
+ return (0);
+ return (1);
+}
+#endif
+
/*
* mailbox toolkit
*/
VSTAILQ_ENTRY(message) list;
};
+#define MAX_MAILBOX_SIZE 30
+
struct mailbox {
pthread_mutex_t lock;
pthread_cond_t has_mail;
* thread toolkit
*/
-struct thread {
- pthread_t thread_id;
- struct mailbox mbox;
-};
-
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
static void
pthread_mutex_unlock(&log_mutex);
}
+struct thread {
+ pthread_t thread_id;
+ struct mailbox mbox;
+
+ int sock;
+
+ int fd; /* original fd from logs */
+
+ char *method; /* Request method*/
+ char *proto; /* Protocol version */
+ char *url; /* URL and query string */
+ const char *conn; /* Connection info (keep-alive, close) */
+ char *hdr[64]; /* Headers */
+ int nhdr; /* Number of headers */
+ int bogus; /* bogus request */
+
+ char arena[4096];
+ int top;
+ char line[2048];
+ char temp[2048];
+};
+
static struct thread **threads;
static size_t nthreads;
+/*
+ * Clear thread state
+ */
+static void
+thread_clear(struct thread *thr)
+{
+
+ thr->method = thr->proto = thr->url = NULL;
+ thr->conn = NULL;
+ memset(&thr->hdr, 0, sizeof thr->hdr);
+ thr->nhdr = 0;
+ thr->bogus = 0;
+ memset(&thr->arena, 0, sizeof thr->arena);
+ thr->top = 0;
+ memset(&thr->line, 0, sizeof thr->line);
+ memset(&thr->temp, 0, sizeof thr->temp);
+ if (thr->sock != -1)
+ close(thr->sock);
+ thr->sock = -1;
+}
+
+#define THREAD_FAIL ((struct thread *)-1)
+
+static pthread_attr_t thread_attr;
+
static struct thread *
thread_get(int fd, void *(*thread_main)(void *))
{
if (threads[fd] == NULL) {
threads[fd] = malloc(sizeof *threads[fd]);
assert(threads[fd] != NULL);
+ threads[fd]->sock = -1;
+ thread_clear(threads[fd]);
mailbox_create(&threads[fd]->mbox);
- if (pthread_create(&threads[fd]->thread_id, NULL,
+ if (pthread_create(&threads[fd]->thread_id, &thread_attr,
thread_main, threads[fd]) != 0) {
thread_log(0, errno, "pthread_create()");
mailbox_destroy(&threads[fd]->mbox);
freez(threads[fd]);
+ threads[fd] = THREAD_FAIL;
+ } else {
+ threads[fd]->fd = fd;
+ thread_log(0, 0, "thread %p:%d started",
+ (void *)threads[fd]->thread_id, fd);
}
- thread_log(0, 0, "thread %p started",
- (void *)threads[fd]->thread_id);
}
+ if (threads[fd] == THREAD_FAIL)
+ return (NULL);
return (threads[fd]);
}
thread_close(int fd)
{
- assert(fd == 0 || fd < nthreads);
- if (fd == 0) {
- for (fd = 1; fd < nthreads; ++fd)
+ if (fd == -1) {
+ for (fd = 0; fd < nthreads; ++fd)
thread_close(fd);
return;
}
+ assert(fd < nthreads);
+
if (threads[fd] == NULL)
return;
mailbox_close(&threads[fd]->mbox);
pthread_join(threads[fd]->thread_id, NULL);
thread_log(0, 0, "thread %p stopped",
(void *)threads[fd]->thread_id);
+ thread_clear(threads[fd]);
mailbox_destroy(&threads[fd]->mbox);
freez(threads[fd]);
}
/*
- * ...
+ * Allocate from thread arena
*/
-
-static int
-isprefix(const char *str, const char *prefix, const char *end, const char **next)
-{
-
- while (str < end && *str && *prefix &&
- tolower((int)*str) == tolower((int)*prefix))
- ++str, ++prefix;
- if (*str && *str != ' ')
- return (0);
- if (next) {
- while (str < end && *str && *str == ' ')
- ++str;
- *next = str;
- }
- return (1);
-}
-
-#if 0
-static int
-isequal(const char *str, const char *reference, const char *end)
+static void *
+thread_alloc(struct thread *thr, size_t len)
{
+ void *ptr;
- while (str < end && *str && *reference &&
- tolower((int)*str) == tolower((int)*reference))
- ++str, ++reference;
- if (str != end || *reference)
- return (0);
- return (1);
+ if (sizeof thr->arena - thr->top < len)
+ return (NULL);
+ ptr = thr->arena + thr->top;
+ thr->top += len;
+ return (ptr);
}
-#endif
/*
* Returns a copy of the entire string with leading and trailing spaces
* trimmed.
*/
static char *
-trimline(const char *str, const char *end)
+trimline(struct thread *thr, const char *str)
{
size_t len;
char *p;
/* skip leading space */
- while (str < end && *str && *str == ' ')
+ while (*str && *str == ' ')
++str;
/* seek to end of string */
- for (len = 0; &str[len] < end && str[len]; ++len)
+ for (len = 0; str[len]; ++len)
/* nothing */ ;
/* trim trailing space */
--len;
/* copy and return */
- p = malloc(len + 1);
- assert(p != NULL);
+ if ((p = thread_alloc(thr, len + 1)) == NULL)
+ return (NULL);
memcpy(p, str, len);
p[len] = '\0';
return (p);
}
-/* Initiate a connection to <address> by resolving the
- * hostname and returning a struct with necessary
- * connection info.
- */
-static struct vss_addr *
-init_connection(const char *address)
-{
- struct vss_addr **ta;
- struct vss_addr *tap;
- char *addr, *port;
- int i, n;
-
- if (VSS_parse(address, &addr, &port) != 0) {
- thread_log(0, 0, "Invalid address");
- exit(2);
- }
- n = VSS_resolve(addr, port, &ta);
- free(addr);
- free(port);
- if (n == 0) {
- thread_log(0, 0, "Could not connect to server");
- exit(2);
- }
- for (i = 1; i < n; ++i) {
- free(ta[i]);
- ta[i] = NULL;
- }
- tap = ta[0];
- free(ta);
-
- return (tap);
-}
-
/* Read a line from the socket and return the number of bytes read.
* After returning, line will point to the read bytes in memory.
* A line is terminated by \r\n
*/
static int
-read_line(char **line, int sock)
+read_line(struct thread *thr)
{
- char *buf;
- unsigned nbuf, lbuf;
- int i;
+ int i, len;
- lbuf = 4096;
- buf = malloc(lbuf);
- XXXAN(buf);
- nbuf = 0;
+ len = 0;
while (1) {
- if (nbuf + 2 >= lbuf) {
- lbuf += lbuf;
- buf = realloc(buf, lbuf);
- XXXAN(buf);
+ if (len + 2 > sizeof thr->line) {
+ thread_log(0, 0, "overflow");
+ return (-1);
}
- i = read(sock, buf + nbuf, 1);
+ i = read(thr->sock, thr->line + len, 1);
if (i < 0) {
thread_log(0, errno, "read(%d, %p, %d)",
- sock, buf + nbuf, 1);
- free(buf);
+ thr->sock, thr->line + len, 1);
return (-1);
}
- if (i == 0) {
- buf[nbuf] = '\0';
+ if (i == 0)
break;
- }
- nbuf += i;
- if (nbuf >= 2 && buf[nbuf-2] == '\r' && buf[nbuf-1] == '\n') {
- buf[nbuf-2] = '\0';
+ len += i;
+ if (len >= 2 && thr->line[len - 2] == '\r' &&
+ thr->line[len - 1] == '\n') {
+ len -= 2;
break;
}
-
}
- *line = buf;
- return (nbuf - 2);
+ thr->line[len] = '\0';
+ return (len);
}
/* Read a block of data from the socket, and do nothing with it.
* the number of bytes read.
*/
static int
-read_block(int length, int sock)
+read_block(struct thread *thr, int len)
{
- char *buf;
- int len, n, nbuf;
-
- buf = malloc(length);
- nbuf = 0;
- while (nbuf < length) {
- len = 2048 < length - nbuf ? 2048 : length - nbuf;
- n = read(sock, buf + nbuf, len);
- if (n < 0) {
+ int n, r, tot;
+
+ for (tot = 0; tot < len; tot += r) {
+ n = len - tot;
+ if (n > sizeof thr->temp)
+ n = sizeof thr->temp;
+ r = read(thr->sock, thr->temp, n);
+ if (r < 0) {
thread_log(0, errno, "read(%d, %p, %d)",
- sock, buf + nbuf, len);
- nbuf = -1;
- break;
+ thr->sock, thr->temp, n);
+ return (-1);
}
- if (n == 0)
+ if (r == 0)
break;
- nbuf += n;
}
- free(buf);
- return (nbuf);
+ return (tot);
}
/* Receive the response after sending a request.
*/
static int
-receive_response(int sock)
+receive_response(struct thread *thr)
{
- char *line, *end;
const char *next;
int line_len;
long chunk_length, content_length;
/* Read header */
for (;;) {
- line_len = read_line(&line, sock);
+ line_len = read_line(thr);
if (line_len < 0)
return (-1);
- thread_log(2, 0, "< %.*s", line_len, line);
- end = line + line_len;
- if (line_len == 0) {
- freez(line);
+ thread_log(2, 0, "< %.*s", line_len, thr->line);
+ if (line_len == 0)
break;
- }
- if (strncmp(line, "HTTP", 4) == 0) {
- sscanf(line, "%*s %d %*s\r\n", &status);
+ if (strncmp(thr->line, "HTTP", 4) == 0) {
+ sscanf(thr->line, "%*s %d %*s\r\n", &status);
failed = (status != 200);
- } else if (isprefix(line, "content-length:", end, &next)) {
+ } else if (isprefix(thr->line, "content-length:", &next)) {
content_length = strtol(next, NULL, 10);
- } else if (isprefix(line, "transfer-encoding:", end, &next)) {
+ } else if (isprefix(thr->line, "transfer-encoding:", &next)) {
chunked = (strcasecmp(next, "chunked") == 0);
- } else if (isprefix(line, "connection:", end, &next)) {
+ } else if (isprefix(thr->line, "connection:", &next)) {
connclose = (strcasecmp(next, "close") == 0);
}
- freez(line);
}
thread_log(1, 0, "status: %d", status);
/* Chunked encoding, read size and bytes until no more */
thread_log(1, 0, "chunked encoding");
for (;;) {
- if ((line_len = read_line(&line, sock)) < 0)
+ line_len = read_line(thr);
+ if (line_len < 0)
return (-1);
- end = line + line_len;
/* read_line() guarantees null-termination */
- chunk_length = strtol(line, NULL, 16);
- freez(line);
+ chunk_length = strtol(thr->line, NULL, 16);
if (chunk_length == 0)
break;
- if ((n = read_block(chunk_length, sock)) < 0)
+ if ((n = read_block(thr, chunk_length)) < 0)
return (-1);
if (n < chunk_length)
thread_log(0, 0, "short read: %d/%ld",
n, chunk_length);
thread_log(1, 0, "chunk length: %ld", chunk_length);
thread_log(1, 0, "bytes read: %d", n);
- /* trainling CR LF */
- if ((n = read_line(&line, sock)) < 0)
+ /* trailing CR LF */
+ if ((n = read_line(thr)) < 0)
return (-1);
- freez(line);
}
/* trailing CR LF */
- n = read_line(&line, sock);
- freez(line);
+ n = read_line(thr);
+ if (n < 0)
+ return (-1);
} else if (content_length > 0) {
/* Fixed body size, read content_length bytes */
thread_log(1, 0, "fixed length");
thread_log(1, 0, "content length: %ld", content_length);
- if ((n = read_block(content_length, sock)) < 0)
+ if ((n = read_block(thr, content_length)) < 0)
return (1);
if (n < content_length)
thread_log(0, 0, "short read: %d/%ld",
enum shmlogtag tag;
size_t len;
char *ptr;
- const char *end, *next;
-
- char *df_method = NULL; /* Request method*/
- char *df_proto = NULL; /* Protocol version */
- char *df_url = NULL; /* URL and query string */
- char *df_conn = NULL; /* Connection info (keep-alive, close) */
- char **df_hdr = NULL; /* Headers */
- size_t df_hdrsz = 0; /* Size of df_hdr */
- int df_nhdr = 0; /* Number of headers */
- int bogus = 0; /* bogus request */
- int i;
+ const char *next;
- int sock = -1, reopen = 1;
+ int i;
- df_hdrsz = 16;
- df_hdr = malloc(df_hdrsz * sizeof *df_hdr);
+ int reopen = 1;
while ((msg = mailbox_get(&thr->mbox)) != NULL) {
tag = msg->tag;
len = msg->len;
ptr = msg->ptr;
- end = ptr + len;
thread_log(2, 0, "%s(%s)", VSL_tags[tag], msg->ptr);
switch (tag) {
case SLT_RxRequest:
- if (df_method != NULL)
- bogus = 1;
+ if (thr->method != NULL)
+ thr->bogus = 1;
else
- df_method = trimline(ptr, end);
+ thr->method = trimline(thr, ptr);
break;
case SLT_RxURL:
- if (df_url != NULL)
- bogus = 1;
+ if (thr->url != NULL)
+ thr->bogus = 1;
else
- df_url = trimline(ptr, end);
+ thr->url = trimline(thr, ptr);
break;
case SLT_RxProtocol:
- if (df_proto != NULL)
- bogus = 1;
+ if (thr->proto != NULL)
+ thr->bogus = 1;
else
- df_proto = trimline(ptr, end);
+ thr->proto = trimline(thr, ptr);
break;
case SLT_RxHeader:
- while (df_hdrsz <= df_nhdr) {
- df_hdrsz *= 2;
- df_hdr = realloc(df_hdr, df_hdrsz * sizeof *df_hdr);
- XXXAN(df_hdr);
+ if (thr->nhdr >= sizeof thr->hdr / sizeof *thr->hdr) {
+ thr->bogus = 1;
+ } else {
+ thr->hdr[thr->nhdr++] = trimline(thr, ptr);
+ if (isprefix(ptr, "connection:", &next))
+ thr->conn = trimline(thr, next);
}
- df_hdr[df_nhdr++] = trimline(ptr, end);
- if (isprefix(ptr, "connection:", end, &next))
- df_conn = trimline(next, end);
break;
default:
if (tag != SLT_ReqEnd)
continue;
- if (!df_method || !df_url || !df_proto) {
- bogus = 1;
- } else if (strcmp(df_method, "GET") != 0 && strcmp(df_method, "HEAD") != 0) {
- bogus = 1;
- } else if (strcmp(df_proto, "HTTP/1.0") == 0) {
- reopen = !(df_conn && strcasecmp(df_conn, "keep-alive") == 0);
- } else if (strcmp(df_proto, "HTTP/1.1") == 0) {
- reopen = (df_conn && strcasecmp(df_conn, "close") == 0);
+ if (!thr->method || !thr->url || !thr->proto) {
+ thr->bogus = 1;
+ } else if (strcmp(thr->method, "GET") != 0 && strcmp(thr->method, "HEAD") != 0) {
+ thr->bogus = 1;
+ } else if (strcmp(thr->proto, "HTTP/1.0") == 0) {
+ reopen = !(thr->conn && strcasecmp(thr->conn, "keep-alive") == 0);
+ } else if (strcmp(thr->proto, "HTTP/1.1") == 0) {
+ reopen = (thr->conn && strcasecmp(thr->conn, "close") == 0);
} else {
- bogus = 1;
+ thr->bogus = 1;
}
- if (bogus) {
+ if (thr->bogus) {
thread_log(1, 0, "bogus");
- } else {
- if (sock == -1) {
- thread_log(1, 0, "open");
- sock = VSS_connect(addr_info);
- assert(sock != -1);
- }
+ goto clear;
+ }
- thread_log(1, 0, "%s %s %s", df_method, df_url, df_proto);
-
- iov[0].iov_base = df_method;
- iov[0].iov_len = strlen(df_method);
- iov[2].iov_base = df_url;
- iov[2].iov_len = strlen(df_url);
- iov[4].iov_base = df_proto;
- iov[4].iov_len = strlen(df_proto);
- iov[1].iov_base = iov[3].iov_base = space;
- iov[1].iov_len = iov[3].iov_len = 1;
- iov[5].iov_base = crlf;
- iov[5].iov_len = 2;
- if (writev(sock, iov, 6) == -1) {
- thread_log(0, errno, "writev()");
- goto close;
+ if (thr->sock == -1) {
+ for (;;) {
+ thread_log(1, 0, "sleeping before connect...");
+ usleep(1000 * (thr->fd % 3001));
+ if ((thr->sock = VSS_connect(addr_info)) >= 0)
+ break;
+ thread_log(0, errno, "connect failed");
}
+ }
- for (i = 0; i < df_nhdr; ++i) {
- thread_log(2, 0, "%d %s", i, df_hdr[i]);
- iov[0].iov_base = df_hdr[i];
- iov[0].iov_len = strlen(df_hdr[i]);
- iov[1].iov_base = crlf;
- iov[1].iov_len = 2;
- if (writev(sock, iov, 2) == -1) {
- thread_log(0, errno, "writev()");
- goto close;
- }
- }
- if (write(sock, crlf, 2) == -1) {
+ thread_log(1, 0, "%s %s %s", thr->method, thr->url, thr->proto);
+
+ iov[0].iov_base = thr->method;
+ iov[0].iov_len = strlen(thr->method);
+ iov[2].iov_base = thr->url;
+ iov[2].iov_len = strlen(thr->url);
+ iov[4].iov_base = thr->proto;
+ iov[4].iov_len = strlen(thr->proto);
+ iov[1].iov_base = iov[3].iov_base = space;
+ iov[1].iov_len = iov[3].iov_len = 1;
+ iov[5].iov_base = crlf;
+ iov[5].iov_len = 2;
+ if (writev(thr->sock, iov, 6) == -1) {
+ thread_log(0, errno, "writev()");
+ goto close;
+ }
+
+ for (i = 0; i < thr->nhdr; ++i) {
+ thread_log(2, 0, "%d %s", i, thr->hdr[i]);
+ iov[0].iov_base = thr->hdr[i];
+ iov[0].iov_len = strlen(thr->hdr[i]);
+ iov[1].iov_base = crlf;
+ iov[1].iov_len = 2;
+ if (writev(thr->sock, iov, 2) == -1) {
thread_log(0, errno, "writev()");
goto close;
}
- if (receive_response(sock) || reopen) {
+ }
+ if (write(thr->sock, crlf, 2) == -1) {
+ thread_log(0, errno, "writev()");
+ goto close;
+ }
+ if (receive_response(thr) || reopen) {
close:
- thread_log(1, 0, "close");
- close(sock);
- sock = -1;
- }
+ thread_log(1, 0, "close");
+ assert(thr->sock != -1);
+ close(thr->sock);
+ thr->sock = -1;
}
+ sleep(1);
+clear:
/* clean up */
- freez(df_method);
- freez(df_url);
- freez(df_proto);
- freez(df_conn);
- while (df_nhdr) {
- --df_nhdr;
- freez(df_hdr[df_nhdr]);
- }
- bogus = 0;
+ thread_clear(thr);
}
/* leftovers */
- freez(df_method);
- freez(df_url);
- freez(df_proto);
- freez(df_conn);
- while (df_nhdr) {
- --df_nhdr;
- freez(df_hdr[df_nhdr]);
- }
- freez(df_hdr);
+ thread_clear(thr);
return (0);
}
thread_log(3, 0, "%d %s", fd, VSL_tags[tag]);
thr = thread_get(fd, replay_thread);
+ if (thr == NULL)
+ return (0);
msg = malloc(sizeof (struct message));
msg->tag = tag;
msg->len = len;
return (0);
}
+/* Initiate a connection to <address> by resolving the
+ * hostname and returning a struct with necessary
+ * connection info.
+ */
+static struct vss_addr *
+init_connection(const char *address)
+{
+ struct vss_addr **ta;
+ struct vss_addr *tap;
+ char *addr, *port;
+ int i, n;
+
+ if (VSS_parse(address, &addr, &port) != 0) {
+ thread_log(0, 0, "Invalid address");
+ exit(2);
+ }
+ n = VSS_resolve(addr, port, &ta);
+ free(addr);
+ free(port);
+ if (n == 0) {
+ thread_log(0, 0, "Could not connect to server");
+ exit(2);
+ }
+ for (i = 1; i < n; ++i) {
+ free(ta[i]);
+ ta[i] = NULL;
+ }
+ tap = ta[0];
+ free(ta);
+
+ return (tap);
+}
+
/*--------------------------------------------------------------------*/
static void
signal(SIGPIPE, SIG_IGN);
+ pthread_attr_init(&thread_attr);
+ pthread_attr_setstacksize(&thread_attr, 16384);
+
while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
/* nothing */ ;
- thread_close(0);
+ thread_close(-1);
exit(0);
}