From: des Date: Wed, 4 Jul 2007 13:15:32 +0000 (+0000) Subject: Replace the old C fetcher with a Perl script which recursively retrieves the X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3405aa3f1319631ffb67ccc769fd409235574125;p=varnish Replace the old C fetcher with a Perl script which recursively retrieves the specified URLs using multiple parallel clients. git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@1641 d4fa192b-c00b-0410-8231-f00ffab90ce4 --- diff --git a/varnish-tools/fetcher/Makefile b/varnish-tools/fetcher/Makefile index eb9aea09..ec3d7a09 100644 --- a/varnish-tools/fetcher/Makefile +++ b/varnish-tools/fetcher/Makefile @@ -2,8 +2,10 @@ # $Id$ # -PROG = fetcher -WARNS ?= 6 -MAN = +PREFIX?= ${HOME} +BINDIR?= ${DESTDIR}${PREFIX}/bin -.include +all: + +install: + install -m 0755 fetcher.pl ${BINDIR}/fetcher diff --git a/varnish-tools/fetcher/fetcher.c b/varnish-tools/fetcher/fetcher.c deleted file mode 100644 index b23fccbd..00000000 --- a/varnish-tools/fetcher/fetcher.c +++ /dev/null @@ -1,220 +0,0 @@ -/* - * $Id$ - */ - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -static int random_order; -static int verbose; - -static char data[8192]; - -static const char * -read_line(FILE *f) -{ - static char *buf; - static size_t bufsz; - const char *line; - size_t len; - - if ((line = fgetln(f, &len)) == NULL) - return (NULL); - while (len && (line[len - 1] == '\r' || line[len - 1] == '\n')) - --len; - if (bufsz < len + 1) { - bufsz = len * 2; - if ((buf = realloc(buf, bufsz)) == NULL) - err(1, "realloc()"); - } - memcpy(buf, line, len); - buf[len] = '\0'; - if (verbose) - fprintf(stderr, "<<< [%s]\n", buf); - return (buf); -} - -static int -open_socket(const char *host, const char *port) -{ - struct addrinfo hints, *res; - int error, sd; - - /* connect to accelerator */ - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - if 
((error = getaddrinfo(host, port, &hints, &res)) != 0) - errx(1, "%s", gai_strerror(error)); - if ((sd = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) - err(1, "socket()"); - if (connect(sd, res->ai_addr, res->ai_addrlen) < 0) - err(1, "connect()"); - return (sd); -} - -static const char HEAD[] = "HEAD"; -static const char GET[] = "GET"; - -static int reqcount; - -static void -send_request(FILE *f, const char *method, const char *host, const char *url) -{ - static const char *req_pattern = - "%s %s HTTP/1.1\r\n" - "Host: %s\r\n" - "Connection: Keep-Alive\r\n" - "\r\n"; - - reqcount++; - - /* send request */ - if (fprintf(f, req_pattern, method, url, host) < 0) - errx(1, "fprintf()"); - if (verbose) - fprintf(stderr, req_pattern, method, url, host); -} - -static int respcount; - -static void -receive_response(FILE *f, const char *method) -{ - const char *line; - size_t clen, rlen; - int code; - - respcount++; - - /* get response header */ - if ((line = read_line(f)) == NULL) - errx(1, "protocol error"); - if (sscanf(line, "HTTP/%*d.%*d %d %*s", &code) != 1) - errx(1, "protocol error"); - if (code != 200) - errx(1, "code %d", code); - - /* get content-length */ - clen = 0; - for (;;) { - if ((line = read_line(f)) == NULL) - errx(1, "protocol error"); - if (line[0] == '\0') - break; - sscanf(line, "Content-Length: %zu\n", &clen); - } - - /* eat contents */ - if (method == HEAD) - return; - while (clen > 0) { - rlen = clen > sizeof(data) ? 
sizeof(data) : clen; - rlen = fread(data, 1, rlen, f); - if (rlen == 0) - err(1, "fread()"); - if (verbose) - fprintf(stderr, "read %zu bytes\n", rlen); - clen -= rlen; - } -} - -static volatile sig_atomic_t got_sig; - -static void -handler(int sig) -{ - got_sig = sig; -} - -static void -usage(void) -{ - fprintf(stderr, "usage: fetcher [-h]\n"); - exit(1); -} - -#define MAX_CTR 100000 - -int -main(int argc, char *argv[]) -{ - struct timeval start, stop; - double elapsed; - char url[PATH_MAX]; - int opt, sd; - FILE *f; - - const char *method = GET; - const char *host = "varnish-test-1.linpro.no"; - const char *url_pattern = "/cgi-bin/recursor.pl?foo=%d"; - int ctr = MAX_CTR * 10; - int depth = 1; - - while ((opt = getopt(argc, argv, "c:d:hrv")) != -1) - switch (opt) { - case 'c': - ctr = atoi(optarg); - break; - case 'd': - depth = atoi(optarg); - break; - case 'h': - method = HEAD; - break; - case 'r': - random_order++; - break; - case 'v': - verbose++; - break; - default: - usage(); - } - - argc -= optind; - argv += optind; - - if (argc != 0) - usage(); - - if (random_order) - srandomdev(); - - sd = open_socket("varnish-test-2.linpro.no", "8080"); - if ((f = fdopen(sd, "w+")) == NULL) - err(1, "fdopen()"); - - got_sig = 0; - signal(SIGINT, handler); - signal(SIGTERM, handler); - gettimeofday(&start, NULL); - while (respcount < ctr && !got_sig) { - while (reqcount < ctr && reqcount - respcount < depth && !got_sig) { - int serial = (random_order ? 
random() : reqcount) % MAX_CTR; - if (!verbose && (random_order || (reqcount % 29) == 0)) - fprintf(stderr, "\r%d ", serial); - snprintf(url, sizeof url, url_pattern, serial); - send_request(f, method, host, url); - } - receive_response(f, method); - } - gettimeofday(&stop, NULL); - fclose(f); - - elapsed = (stop.tv_sec * 1000000.0 + stop.tv_usec) - - (start.tv_sec * 1000000.0 + start.tv_usec); - fprintf(stderr, "%d requests in %.3f seconds (%d rps)\n", - reqcount, elapsed / 1000000, (int)(reqcount / (elapsed / 1000000))); - - exit(got_sig); -} diff --git a/varnish-tools/fetcher/fetcher.pl b/varnish-tools/fetcher/fetcher.pl new file mode 100755 index 00000000..275b89e0 --- /dev/null +++ b/varnish-tools/fetcher/fetcher.pl @@ -0,0 +1,199 @@ +#!/usr/bin/perl -w +#- +# Copyright (c) 2007 Linpro AS +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer +# in this position and unchanged. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. 
# IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
#

#
# Recursive, multi-process URL fetcher.
#
# The parent process keeps the work queue and forks a number of worker
# children.  Parent and child talk over a socketpair using a line
# protocol:
#
#   parent -> child:  "<url>\n"        fetch this URL
#   child -> parent:  "add <link>\n"   a link found in the fetched document
#   child -> parent:  "ready\n"        done with the current URL
#
package Varnish::Fetcher;

use strict;
use Getopt::Long;
use IO::Handle;
use IO::Multiplex;
use LWP::UserAgent;
use Socket;
use URI;

our %TODO;      # URLs waiting to be handed to a child
our %DONE;      # URLs that have already been handed to a child
our %CHILD;     # child index => Varnish::Fetcher object (parent side)
our $BUSY;      # number of children currently working on a URL

#
# Create the parent-side record for one child.
#
#   $mux  the IO::Multiplex object the child's socket is registered with
#   $fh   the parent's end of the socketpair
#
# The 'url' slot holds the URL the child is working on, or a false
# value while it is idle.
#
sub new {
    my ($this, $mux, $fh) = @_;
    my $class = ref($this) || $this;

    # Bless into the requested class; one-argument bless would ignore
    # $class and always use the current package.
    return bless {
        'mux' => $mux,
        'fh'  => $fh,
        'url' => undef,
    }, $class;
}

#
# Child main loop: read one URL per line from $s, fetch it, report the
# links found in text documents and then report "ready".  Exits when
# the parent closes its end of the socket.  Never returns.
#
sub run {
    my ($self, $s) = @_;

    my $ua = LWP::UserAgent->new();
    for (;;) {
        $0 = "[fetcher] idle";
        my $url = <$s>;
        exit(0)
            unless defined($url);
        chomp($url);
        die "no more work\n"
            if $url eq "done";
        $0 = "[fetcher] requesting $url";
        print STDERR "Retrieving $url\n";
        my $resp = $ua->get($url);
        $0 = "[fetcher] checking $url";
        # A response may carry no Content-Type header at all; treat
        # that as "not text" instead of matching against undef.
        my $ctype = $resp->header('Content-Type');
        if (defined($ctype) && $ctype =~ m/^text\//) {
            # the hash collapses duplicate links within one document
            my %urls = map { $_ => 1 }
                ($resp->content =~ m/\b(?:href|src)=[\'\"](.+?)[\'\"]/g);
            foreach (keys(%urls)) {
                $s->write("add $_\n");
            }
        }
        $0 = "[fetcher] ready";
        $s->write("ready\n");
    }
}

#
# Hand the next queued URL to an idle child.  Dies if the child is
# already busy.  Returns nothing if the queue is empty, otherwise the
# new value of $BUSY.
#
sub send_url {
    my ($child) = @_;

    die "child busy\n"
        if $child->{'url'};
    return
        unless (keys(%TODO));
    my $url = (keys(%TODO))[0];
    delete $TODO{$url};
    # marked as done when dispatched so it is never queued twice
    $DONE{$url} = 1;
    $child->{'url'} = $url;
    $child->{'mux'}->write($child->{'fh'}, "$url\n");
    return ++$BUSY;
}

#
# Queue a link reported by a child.  The link is resolved relative to
# the URL the child is currently fetching.  Links that are not http, or
# that point at a different host:port than that URL, are rejected.
#
sub get_url($$) {
    my ($child, $url) = @_;

    die "child not busy\n"
        unless $child->{'url'};
    # Resolve the argument itself, not the caller's $1.
    my $uri = URI->new_abs($url, $child->{'url'});
    # A fragment only names a position inside the document; drop it so
    # the same document is not fetched once per anchor.
    $uri->fragment(undef);
    $url = $uri->canonical;
    # The scheme test must come first: host_port() is only available
    # for server-based schemes.
    if (($uri->scheme() || '') ne 'http' ||
        $uri->host_port() ne URI->new($child->{'url'})->host_port()) {
        print STDERR "Rejected $url\n";
        return;
    }
    return if $TODO{$url} || $DONE{$url};
    $TODO{$url} = 1;
}

#
# IO::Multiplex input callback: consume every complete line a child has
# sent so far.  "ready" marks the child idle and ends the current
# $mux->loop() so the main loop can dispatch more work.
#
sub mux_input {
    my ($child, $mux, $fh, $input) = @_;

    die "unknown child\n"
        unless $child;

    while ($$input =~ s/^(.*?)\n//) {
        my $line = $1;
        if ($line eq "ready") {
            $$child{'url'} = '';
            --$BUSY;
            $mux->endloop();
        } elsif ($line =~ m/^add (.*?)$/) {
            get_url($child, $1);
        } else {
            die "can't grok [$line]\n";
        }
    }
}

#
# Fetch @urls and everything reachable from them, using up to $n
# children in parallel.  Returns when the queue is empty and all
# children are idle.
#
sub fetcher($@) {
    my ($n, @urls) = @_;

    my $mux = IO::Multiplex->new();

    # prepare work queue
    foreach my $url (@urls) {
        $TODO{URI->new($url)->canonical} = 1;
    }

    # start children
    $BUSY = 0;
    for (my $i = 0; $i < $n; ++$i) {
        my ($s1, $s2);
        socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
            or die "socketpair(): $!\n";
        $s1->autoflush(1);
        $s2->autoflush(1);
        my $child = __PACKAGE__->new($mux, $s1);
        my $pid = fork();
        unless (defined($pid)) {
            # carry on with the children we already have
            warn "fork(): $!\n";
            last;
        }
        if ($pid == 0) {
            close($s1);
            $child->run($s2);
            die "not reachable";
        } else {
            close($s2);
            $CHILD{$i} = $child;
            $mux->add($s1);
            $mux->set_callback_object($child, $s1);
        }
    }
    # without any children nothing would be fetched; say so
    die "unable to start any children\n"
        unless %CHILD;

    # main loop
    for (;;) {
        foreach my $child (values(%CHILD)) {
            $child->send_url()
                unless $child->{'url'};
        }
        # nothing was dispatched and nobody is working: we are done
        last unless $BUSY;
        $mux->loop();
    }

    # done
    foreach my $child (values(%CHILD)) {
        $mux->close($$child{'fh'});
    }
}

#
# Print a usage message and exit.
#
sub usage() {

    print STDERR "usage: $0 [-j n] URL ...\n";
    exit(1);
}

MAIN:{
    my $jobs = 1;
    GetOptions("j|jobs=i" => \$jobs)
        or usage();
    $jobs > 0
        or usage();
    @ARGV
        or usage();
    fetcher($jobs, @ARGV);
}