+++ /dev/null
-/*
- * $Id$
- */
-
-#include <sys/param.h>
-#include <sys/socket.h>
-#include <sys/time.h>
-
-#include <err.h>
-#include <netdb.h>
-#include <stdio.h>
-#include <fetch.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-static int random_order;
-static int verbose;
-
-static char data[8192];
-
-static const char *
-read_line(FILE *f)
-{
- static char *buf;
- static size_t bufsz;
- const char *line;
- size_t len;
-
- if ((line = fgetln(f, &len)) == NULL)
- return (NULL);
- while (len && (line[len - 1] == '\r' || line[len - 1] == '\n'))
- --len;
- if (bufsz < len + 1) {
- bufsz = len * 2;
- if ((buf = realloc(buf, bufsz)) == NULL)
- err(1, "realloc()");
- }
- memcpy(buf, line, len);
- buf[len] = '\0';
- if (verbose)
- fprintf(stderr, "<<< [%s]\n", buf);
- return (buf);
-}
-
-static int
-open_socket(const char *host, const char *port)
-{
- struct addrinfo hints, *res;
- int error, sd;
-
- /* connect to accelerator */
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_STREAM;
- if ((error = getaddrinfo(host, port, &hints, &res)) != 0)
- errx(1, "%s", gai_strerror(error));
- if ((sd = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0)
- err(1, "socket()");
- if (connect(sd, res->ai_addr, res->ai_addrlen) < 0)
- err(1, "connect()");
- return (sd);
-}
-
-static const char HEAD[] = "HEAD";
-static const char GET[] = "GET";
-
-static int reqcount;
-
-static void
-send_request(FILE *f, const char *method, const char *host, const char *url)
-{
- static const char *req_pattern =
- "%s %s HTTP/1.1\r\n"
- "Host: %s\r\n"
- "Connection: Keep-Alive\r\n"
- "\r\n";
-
- reqcount++;
-
- /* send request */
- if (fprintf(f, req_pattern, method, url, host) < 0)
- errx(1, "fprintf()");
- if (verbose)
- fprintf(stderr, req_pattern, method, url, host);
-}
-
-static int respcount;
-
-static void
-receive_response(FILE *f, const char *method)
-{
- const char *line;
- size_t clen, rlen;
- int code;
-
- respcount++;
-
- /* get response header */
- if ((line = read_line(f)) == NULL)
- errx(1, "protocol error");
- if (sscanf(line, "HTTP/%*d.%*d %d %*s", &code) != 1)
- errx(1, "protocol error");
- if (code != 200)
- errx(1, "code %d", code);
-
- /* get content-length */
- clen = 0;
- for (;;) {
- if ((line = read_line(f)) == NULL)
- errx(1, "protocol error");
- if (line[0] == '\0')
- break;
- sscanf(line, "Content-Length: %zu\n", &clen);
- }
-
- /* eat contents */
- if (method == HEAD)
- return;
- while (clen > 0) {
- rlen = clen > sizeof(data) ? sizeof(data) : clen;
- rlen = fread(data, 1, rlen, f);
- if (rlen == 0)
- err(1, "fread()");
- if (verbose)
- fprintf(stderr, "read %zu bytes\n", rlen);
- clen -= rlen;
- }
-}
-
-static volatile sig_atomic_t got_sig;
-
-static void
-handler(int sig)
-{
- got_sig = sig;
-}
-
-static void
-usage(void)
-{
- fprintf(stderr, "usage: fetcher [-h]\n");
- exit(1);
-}
-
-#define MAX_CTR 100000
-
-int
-main(int argc, char *argv[])
-{
- struct timeval start, stop;
- double elapsed;
- char url[PATH_MAX];
- int opt, sd;
- FILE *f;
-
- const char *method = GET;
- const char *host = "varnish-test-1.linpro.no";
- const char *url_pattern = "/cgi-bin/recursor.pl?foo=%d";
- int ctr = MAX_CTR * 10;
- int depth = 1;
-
- while ((opt = getopt(argc, argv, "c:d:hrv")) != -1)
- switch (opt) {
- case 'c':
- ctr = atoi(optarg);
- break;
- case 'd':
- depth = atoi(optarg);
- break;
- case 'h':
- method = HEAD;
- break;
- case 'r':
- random_order++;
- break;
- case 'v':
- verbose++;
- break;
- default:
- usage();
- }
-
- argc -= optind;
- argv += optind;
-
- if (argc != 0)
- usage();
-
- if (random_order)
- srandomdev();
-
- sd = open_socket("varnish-test-2.linpro.no", "8080");
- if ((f = fdopen(sd, "w+")) == NULL)
- err(1, "fdopen()");
-
- got_sig = 0;
- signal(SIGINT, handler);
- signal(SIGTERM, handler);
- gettimeofday(&start, NULL);
- while (respcount < ctr && !got_sig) {
- while (reqcount < ctr && reqcount - respcount < depth && !got_sig) {
- int serial = (random_order ? random() : reqcount) % MAX_CTR;
- if (!verbose && (random_order || (reqcount % 29) == 0))
- fprintf(stderr, "\r%d ", serial);
- snprintf(url, sizeof url, url_pattern, serial);
- send_request(f, method, host, url);
- }
- receive_response(f, method);
- }
- gettimeofday(&stop, NULL);
- fclose(f);
-
- elapsed = (stop.tv_sec * 1000000.0 + stop.tv_usec) -
- (start.tv_sec * 1000000.0 + start.tv_usec);
- fprintf(stderr, "%d requests in %.3f seconds (%d rps)\n",
- reqcount, elapsed / 1000000, (int)(reqcount / (elapsed / 1000000)));
-
- exit(got_sig);
-}
--- /dev/null
+#!/usr/bin/perl -w
+#-
+# Copyright (c) 2007 Linpro AS
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer
+# in this position and unchanged.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+# $Id$
+#
+
+package Varnish::Fetcher;
+
+use strict;
+use Getopt::Long;
+use IO::Handle;
+use IO::Multiplex;
+use LWP::UserAgent;
+use Socket;
+use URI;
+
+our %TODO;
+our %DONE;
+our %CHILD;
+our $BUSY;
+
+sub new($$) {
+ my ($this, $mux, $fh) = @_;
+ my $class = ref($this) || $this;
+
+ bless {
+ 'mux' => $mux,
+ 'fh' => $fh,
+ 'url' => undef,
+ };
+}
+
+sub run($$) {
+ my ($self, $s) = @_;
+
+ my $ua = new LWP::UserAgent();
+ for (;;) {
+ $0 = "[fetcher] idle";
+ my $url = <$s>;
+ exit(0)
+ unless defined($url);
+ chomp($url);
+ die "no more work\n"
+ if $url eq "done";
+ $0 = "[fetcher] requesting $url";
+ print STDERR "Retrieving $url\n";
+ my $resp = $ua->get($url);
+ $0 = "[fetcher] checking $url";
+ if ($resp->header('Content-Type') =~ m/^text\//) {
+ my %urls = map { $_ => 1 }
+ ($resp->content =~ m/\b(?:href|src)=[\'\"](.+?)[\'\"]/g);
+ foreach (keys(%urls)) {
+ $s->write("add $_\n");
+ }
+ }
+ $0 = "[fetcher] ready";
+ $s->write("ready\n");
+ }
+}
+
+sub send_url($) {
+ my ($child) = @_;
+
+ die "child busy\n"
+ if $child->{'url'};
+ return undef
+ unless (keys(%TODO));
+ my $url = (keys(%TODO))[0];
+ delete $TODO{$url};
+ $DONE{$url} = 1;
+ $child->{'url'} = $url;
+ $child->{'mux'}->write($child->{'fh'}, "$url\n");
+ ++$BUSY;
+}
+
+sub get_url($$) {
+ my ($child, $url) = @_;
+
+ die "child not busy\n"
+ unless $child->{'url'};
+ my $uri = URI->new_abs($1, $child->{'url'});
+ $url = $uri->canonical;
+ if ($uri->scheme() ne 'http' ||
+ $uri->host_port() ne URI->new($child->{'url'})->host_port()) {
+ print STDERR "Rejected $url\n";
+ return;
+ }
+ return if $TODO{$url} || $DONE{$url};
+ $TODO{$url} = 1;
+}
+
+sub mux_input($$$$) {
+ my ($child, $mux, $fh, $input) = @_;
+
+ die "unknown child\n"
+ unless $child;
+
+ while ($$input =~ s/^(.*?)\n//) {
+ my $line = $1;
+ if ($line eq "ready") {
+ $$child{'url'} = '';
+ --$BUSY;
+ $mux->endloop();
+ } elsif ($line =~ m/^add (.*?)$/) {
+ get_url($child, $1);
+ } else {
+ die "can't grok [$line]\n";
+ }
+ }
+}
+
+sub fetcher($@) {
+ my ($n, @urls) = @_;
+
+ my $mux = new IO::Multiplex;
+
+ # prepare work queue
+ foreach my $url (@urls) {
+ $TODO{URI->new($url)->canonical} = 1;
+ }
+
+ # start children
+ $BUSY = 0;
+ for (my $i = 0; $i < $n; ++$i) {
+ my ($s1, $s2);
+ socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, PF_UNSPEC);
+ $s1->autoflush(1);
+ $s2->autoflush(1);
+ my $child = __PACKAGE__->new($mux, $s1);
+ my $pid = fork();
+ last unless defined($pid);
+ if ($pid == 0) {
+ close($s1);
+ $child->run($s2);
+ die "not reachable";
+ } else {
+ close($s2);
+ $CHILD{$i} = $child;
+ $mux->add($s1);
+ $mux->set_callback_object($child, $s1);
+ }
+ }
+
+ # main loop
+ for (;;) {
+ foreach my $child (values(%CHILD)) {
+ $child->send_url()
+ unless $child->{'url'};
+ }
+ last unless $BUSY;
+ $mux->loop();
+ }
+
+ # done
+ foreach my $child (values(%CHILD)) {
+ $mux->close($$child{'fh'});
+ }
+}
+
+sub usage() {
+
+ print STDERR "usage: $0 [-j n] URL ...\n";
+ exit(1);
+}
+
+MAIN:{
+ my $jobs = 1;
+ GetOptions("j|jobs=i" => \$jobs)
+ or usage();
+ $jobs > 0
+ or usage();
+ @ARGV
+ or usage();
+ fetcher($jobs, @ARGV);
+}