]> err.no Git - varnish/commitdiff
Rework the binary heap, we use for expiry processing, to deal more
authorphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Fri, 14 Nov 2008 00:19:33 +0000 (00:19 +0000)
committerphk <phk@d4fa192b-c00b-0410-8231-f00ffab90ce4>
Fri, 14 Nov 2008 00:19:33 +0000 (00:19 +0000)
gracefully with large number of objects.

Previously we kept all objects in a single array which resultined
in increasingly infrequent but increasingly demanding calls to
calloc(3) with the consequent massive memory copies.  We also did
not release memory again if unused.

Now we stripe the array into rows of 64k objects each.

This number is a compromise between space wastage, max 1MB on a
64bit machine, and the desire to not add and delete rows all the
time.  With 64k objects in a row, even on a very busy server would
only add a new row every 5...10 seconds during ramp up.

Delete unused rows, but keep a hysteresis of an entire empty row
to avoid silly add-delete-add-delete-add-delete behaviour at row
boundaries.

Streamline some of the functions a bit.

Fixes #210

git-svn-id: svn+ssh://projects.linpro.no/svn/varnish/trunk@3390 d4fa192b-c00b-0410-8231-f00ffab90ce4

varnish-cache/lib/libvarnish/binary_heap.c

index 725f0fecce199d67fb0d3b09ad7e80a932f7a926..0330ac728937346b7144c279e3672bc1d3d6f23d 100644 (file)
@@ -32,8 +32,6 @@
  *
  * We use a malloc(3)/realloc(3) array to store the pointers using the
  * classical FORTRAN strategy.
- *
- * XXX: the array is not scaled back when items are deleted.
  */
 
 #include "config.h"
 #include "binary_heap.h"
 #include "libvarnish.h"
 
-/* Private definitions -----------------------------------------------*/
+/* Paramters ---------------------------------------------------------*/
+
+/*
+ * The number of elements in a row has to be a compromise between
+ * wasted space and number of memory allocations.
+ * With 64k objects per row, there will be at least 5...10 seconds
+ * between row additions on a very busy server.
+ * At the same time, the worst case amount of wasted memory is kept
+ * at a reasonable 1 MB -- two rows on 64bit system.
+ * Finally, but without practical significance: 16 bits should be
+ * easier for the compiler to optimize.
+ */
+#define ROW_SHIFT              16
 
-#define MIN_LENGTH             16
+/* Private definitions -----------------------------------------------*/
 
 #define ROOT_IDX               1
 
+#define ROW_WIDTH              (1 << ROW_SHIFT)
+
+/*lint -emacro(572, ROW) shift 0 >> by 16 */
+/*lint -emacro(835, ROW) 0 left of >> */
+/*lint -emacro(778, ROW) const >> evaluates to zero */
+#define ROW(b, n)              ((b)->array[(n) >> ROW_SHIFT])
+
+/*lint -emacro(835, A) 0 left of & */
+#define A(b, n)                        ROW(b, n)[(n) & (ROW_WIDTH - 1)]
+
 struct binheap {
        unsigned                magic;
 #define BINHEAP_MAGIC          0xf581581aU     /* from /dev/random */
        void                    *priv;
        binheap_cmp_t           *cmp;
        binheap_update_t        *update;
-       void                    **array;
+       void                    ***array;
+       unsigned                rows;
        unsigned                length;
        unsigned                next;
-       unsigned                granularity;
 };
 
 #define PARENT(u)      ((u) / 2)
+/*lint -emacro(835, CHILD) 0 right of + */
 #define CHILD(u,n)     ((u) * 2 + (n))
 
 /* Implementation ----------------------------------------------------*/
 
 static void
-binheap_update(const struct binheap *bh, unsigned u)
+binheap_addrow(struct binheap *bh)
 {
-       assert(bh->magic == BINHEAP_MAGIC);
-       assert(u < bh->next);
-       assert(bh->array[u] != NULL);
-       if (bh->update == NULL)
-               return;
-       bh->update(bh->priv, bh->array[u], u);
+       unsigned u;
+
+       /* First make sure we have space for another row */
+       if (&ROW(bh, bh->length) >= bh->array + bh->rows) {
+               u = bh->rows * 2;
+               bh->array = realloc(bh->array, sizeof(*bh->array) * u);
+               assert(bh->array != NULL);
+
+               /* NULL out new pointers */
+               while (bh->rows < u)
+                       bh->array[bh->rows++] = NULL;
+       }
+       assert(ROW(bh, bh->length) == NULL);
+       ROW(bh, bh->length) = malloc(sizeof(**bh->array) * ROW_WIDTH);
+       assert(ROW(bh, bh->length));
+       bh->length += ROW_WIDTH;
 }
 
 struct binheap *
@@ -90,14 +121,26 @@ binheap_new(void *priv, binheap_cmp_t *cmp_f, binheap_update_t *update_f)
        bh->cmp = cmp_f;
        bh->update = update_f;
        bh->next = ROOT_IDX;
-       bh->length = MIN_LENGTH;
-       bh->array = calloc(sizeof *bh->array, bh->length);
+       bh->rows = 16;          /* A tiny-ish number */
+       bh->array = calloc(sizeof *bh->array, bh->rows);
        assert(bh->array != NULL);
-       bh->granularity = getpagesize() / sizeof *bh->array;
+       binheap_addrow(bh);
+       A(bh, ROOT_IDX) = NULL;
        bh->magic = BINHEAP_MAGIC;
        return (bh);
 }
 
+static void
+binheap_update(const struct binheap *bh, unsigned u)
+{
+       assert(bh->magic == BINHEAP_MAGIC);
+       assert(u < bh->next);
+       assert(A(bh, u) != NULL);
+       if (bh->update == NULL)
+               return;
+       bh->update(bh->priv, A(bh, u), u);
+}
+
 static void
 binhead_swap(const struct binheap *bh, unsigned u, unsigned v)
 {
@@ -106,9 +149,9 @@ binhead_swap(const struct binheap *bh, unsigned u, unsigned v)
        assert(bh->magic == BINHEAP_MAGIC);
        assert(u < bh->next);
        assert(v < bh->next);
-       p = bh->array[u];
-       bh->array[u] = bh->array[v];
-       bh->array[v] = p;
+       p = A(bh, u);
+       A(bh, u) = A(bh, v);
+       A(bh, v) = p;
        binheap_update(bh, u);
        binheap_update(bh, v);
 }
@@ -121,11 +164,10 @@ binheap_trickleup(const struct binheap *bh, unsigned u)
        assert(bh->magic == BINHEAP_MAGIC);
        while (u > ROOT_IDX) {
                v = PARENT(u);
-               if (bh->cmp(bh->priv, bh->array[u], bh->array[v])) {
-                       binhead_swap(bh, u, v);
-                       u = v;
-               } else
+               if (!bh->cmp(bh->priv, A(bh, u), A(bh, v)))
                        break;
+               binhead_swap(bh, u, v);
+               u = v;
        }
        return (u);
 }
@@ -138,28 +180,15 @@ binheap_trickledown(const struct binheap *bh, unsigned u)
        assert(bh->magic == BINHEAP_MAGIC);
        while (1) {
                v1 = CHILD(u, 0);
-               v2 = CHILD(u, 1);
                if (v1 >= bh->next)
                        return;
-               if (v2 >= bh->next) {
-                       if (!bh->cmp(bh->priv, bh->array[u], bh->array[v1]))
-                               binhead_swap(bh, u, v1);
+               v2 = CHILD(u, 1);
+               if (v2 < bh->next && bh->cmp(bh->priv, A(bh, v2), A(bh, v1)))
+                       v1 = v2;
+               if (bh->cmp(bh->priv, A(bh, u), A(bh, v1))) 
                        return;
-               }
-               if (bh->cmp(bh->priv, bh->array[v1], bh->array[v2])) {
-                       if (!bh->cmp(bh->priv, bh->array[u], bh->array[v1])) {
-                               binhead_swap(bh, u, v1);
-                               u = v1;
-                               continue;
-                       }
-               } else {
-                       if (!bh->cmp(bh->priv, bh->array[u], bh->array[v2])) {
-                               binhead_swap(bh, u, v2);
-                               u = v2;
-                               continue;
-                       }
-               }
-               return;
+               binhead_swap(bh, u, v1);
+               u = v1;
        }
 }
 
@@ -171,18 +200,10 @@ binheap_insert(struct binheap *bh, void *p)
        assert(bh != NULL);
        assert(bh->magic == BINHEAP_MAGIC);
        assert(bh->length >= bh->next);
-       if (bh->length == bh->next) {
-               if (bh->length >= bh->granularity * 32)
-                       bh->length += bh->granularity * 32;
-               else if (bh->length > bh->granularity)
-                       bh->length += bh->granularity;
-               else
-                       bh->length += bh->length;
-               bh->array = realloc(bh->array, bh->length * sizeof *bh->array);
-               assert(bh->array != NULL);
-       }
+       if (bh->length == bh->next)
+               binheap_addrow(bh);
        u = bh->next++;
-       bh->array[u] = p;
+       A(bh, u) = p;
        binheap_update(bh, u);
        (void)binheap_trickleup(bh, u);
 }
@@ -193,11 +214,33 @@ binheap_root(const struct binheap *bh)
 
        assert(bh != NULL);
        assert(bh->magic == BINHEAP_MAGIC);
-       if (bh->next == ROOT_IDX)
-               return (NULL);
-       return (bh->array[ROOT_IDX]);
+       return (A(bh, ROOT_IDX));
 }
 
+/*
+ * It may seem counter-intuitive that we delete by replacement with
+ * the tail object. "That's almost certain to not belong there, in
+ * particular when we delete the root ?" is the typical reaction.
+ *
+ * If we tried to trickle up into the empty position, we would,
+ * eventually, end up with a hole in the bottom row, at which point
+ * we would move the tail object there.
+ * But there is no guarantee that the tail object would not need to
+ * trickle up from that position, in fact, it might be the new root
+ * of this half of the subtree.
+ * The total number of operations is guaranteed to be at least
+ * N{height} downward selections, because we have to get the hole
+ * all the way down, but in addition to that, we may get up to
+ * N{height}-1 upward trickles.
+ *
+ * When we fill the hole with the tail object, the worst case is
+ * that it trickles all the way down to become the tail object
+ * again.
+ * In other words worst case is N{height} downward trickles.
+ * But there is a pretty decent chance that it does not make
+ * it all the way down.
+ */
+
 void
 binheap_delete(struct binheap *bh, unsigned idx)
 {
@@ -207,18 +250,28 @@ binheap_delete(struct binheap *bh, unsigned idx)
        assert(bh->next > ROOT_IDX);
        assert(idx < bh->next);
        assert(idx > 0);
-       assert(bh->array[idx] != NULL);
-       bh->update(bh->priv, bh->array[idx], 0);
+       assert(A(bh, idx) != NULL);
+       bh->update(bh->priv, A(bh, idx), 0);
        if (idx == --bh->next) {
-               bh->array[bh->next] = NULL;
+               A(bh, bh->next) = NULL;
                return;
        }
-       bh->array[idx] = bh->array[bh->next];
-       bh->array[bh->next] = NULL;
+       A(bh, idx) = A(bh, bh->next);
+       A(bh, bh->next) = NULL;
        binheap_update(bh, idx);
        idx = binheap_trickleup(bh, idx);
        binheap_trickledown(bh, idx);
-       /* XXX: free part of array ? */
+
+       /*
+        * We keep a hysteresis of one full row before we start to
+        * return space to the OS to avoid silly behaviour around
+        * row boundaries.
+        */
+       if (bh->next + 2 * ROW_WIDTH <= bh->length) {
+               free(ROW(bh, bh->length - 1));
+               ROW(bh, bh->length - 1) = NULL;
+               bh->length -= ROW_WIDTH;
+       }
 }
 
 
@@ -226,7 +279,10 @@ binheap_delete(struct binheap *bh, unsigned idx)
 /* Test driver -------------------------------------------------------*/
 #include <stdio.h>
 
-#if 0
+#if 1
+
+#define N 23
+
 static int
 cmp(void *priv, void *a, void *b)
 {
@@ -247,14 +303,14 @@ dump(struct binheap *bh, const char *what, unsigned ptr)
        unsigned u, *up;
 
        printf("dump\n");
-       f = popen("dot -Tps >> /tmp/_.ps", "w");
+       f = popen("dot -Tps >> /tmp/_.ps 2>/dev/null", "w");
        assert(f != NULL);
        fprintf(f, "digraph binheap {\n");
        fprintf(f, "size=\"7,10\"\n");
        fprintf(f, "ptr [label=\"%s\"]\n", what);
        fprintf(f, "ptr -> node_%u\n", ptr);
        for (u = 1; u < bh->next; u++) {
-               up = bh->array[u];
+               up = A(bh, u);
                fprintf(f, "node_%u [label=\"%u\"];\n", u, *up);
                if (u > 0)
                        fprintf(f, "node_%u -> node_%u\n", PARENT(u), u);
@@ -263,7 +319,6 @@ dump(struct binheap *bh, const char *what, unsigned ptr)
        pclose(f);
 }
 
-#define N 31
 int
 main(int argc, char **argv)
 {
@@ -286,7 +341,7 @@ main(int argc, char **argv)
                        break;
                assert(*up >= lu);
                lu = *up;
-               u = random() % bh->next;
+               u = (random() % (bh->next - 1)) + 1;
                binheap_delete(bh, u);
                if (1)
                        dump(bh, "Delete", u);
@@ -301,7 +356,7 @@ struct foo {
 };
 
 #define M 1311191
-#define N 131
+#define N 1311
 
 struct foo ff[N];
 
@@ -332,8 +387,8 @@ chk(struct binheap *bh)
 
        for (u = 2; u < bh->next; u++) {
                v = PARENT(u);
-               fa = bh->array[u];
-               fb = bh->array[v];
+               fa = A(bh, u);
+               fb = A(bh, v);
                assert(fa->key > fb->key);
                continue;
                printf("[%2u/%2u] %10u > [%2u/%2u] %10u %s\n",
@@ -363,11 +418,17 @@ main(int argc, char **argv)
        for (u = 0; u < M; u++) {
                v = random() % N;
                if (ff[v].idx > 0) {
-                       printf("Delete [%u] %'u\n", v, ff[v].key);
+                       if (0)
+                               printf("Delete [%u] %'u\n", v, ff[v].key);
+                       else
+                               printf("-%u", v);
                        binheap_delete(bh, ff[v].idx);
                } else {
                        ff[v].key = random();
-                       printf("Insert [%u] %'u\n", v, ff[v].key);
+                       if (0) 
+                               printf("Insert [%u] %'u\n", v, ff[v].key);
+                       else
+                               printf("+%u", v);
                        binheap_insert(bh, &ff[v]);
                }
                chk(bh);