From 6e409ce10d134625626d1eddfd6152755ef1908d Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 27 Dec 2011 22:51:46 +0100 Subject: [PATCH] journald: implement sophisticated rate limiting --- Makefile.am | 1 + TODO | 2 + src/journal/journal-rate-limit.c | 273 +++++++++++++++++++++++++++++++ src/journal/journal-rate-limit.h | 34 ++++ src/journal/journald.c | 213 ++++++++++++++++++++++-- src/journal/sd-journal.h | 1 + 6 files changed, 512 insertions(+), 12 deletions(-) create mode 100644 src/journal/journal-rate-limit.c create mode 100644 src/journal/journal-rate-limit.h diff --git a/Makefile.am b/Makefile.am index 81b5c501..5fe67fd1 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1042,6 +1042,7 @@ systemd_journald_SOURCES = \ src/journal/sd-journal.c \ src/journal/journal-file.c \ src/journal/lookup3.c \ + src/journal/journal-rate-limit.c \ src/sd-id128.c \ src/acl-util.c \ src/cgroup-util.c diff --git a/TODO b/TODO index 8daf79a7..02688ad9 100644 --- a/TODO +++ b/TODO @@ -21,6 +21,8 @@ Bugfixes: Features: +* logind: selinux is borked... + * logind: sends SessionNew on Lock()? * logind: allow showing logout dialog from system diff --git a/src/journal/journal-rate-limit.c b/src/journal/journal-rate-limit.c new file mode 100644 index 00000000..f69ab277 --- /dev/null +++ b/src/journal/journal-rate-limit.c @@ -0,0 +1,273 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see . +***/ + +#include +#include + +#include "journal-rate-limit.h" +#include "list.h" +#include "util.h" +#include "hashmap.h" + +#define POOLS_MAX 5 +#define BUCKETS_MAX 127 +#define GROUPS_MAX 2047 + +static const int priority_map[] = { + [LOG_EMERG] = 0, + [LOG_ALERT] = 0, + [LOG_CRIT] = 0, + [LOG_ERR] = 1, + [LOG_WARNING] = 2, + [LOG_NOTICE] = 3, + [LOG_INFO] = 3, + [LOG_DEBUG] = 4 +}; + +typedef struct JournalRateLimitPool JournalRateLimitPool; +typedef struct JournalRateLimitGroup JournalRateLimitGroup; + +struct JournalRateLimitPool { + usec_t begin; + unsigned num; + unsigned suppressed; +}; + +struct JournalRateLimitGroup { + JournalRateLimit *parent; + + char *id; + JournalRateLimitPool pools[POOLS_MAX]; + unsigned hash; + + LIST_FIELDS(JournalRateLimitGroup, bucket); + LIST_FIELDS(JournalRateLimitGroup, lru); +}; + +struct JournalRateLimit { + usec_t interval; + unsigned burst; + + JournalRateLimitGroup* buckets[BUCKETS_MAX]; + JournalRateLimitGroup *lru, *lru_tail; + + unsigned n_groups; +}; + +JournalRateLimit *journal_rate_limit_new(usec_t interval, unsigned burst) { + JournalRateLimit *r; + + assert(interval > 0 || burst == 0); + + r = new0(JournalRateLimit, 1); + if (!r) + return NULL; + + r->interval = interval; + r->burst = burst; + + return r; +} + +static void journal_rate_limit_group_free(JournalRateLimitGroup *g) { + assert(g); + + if (g->parent) { + assert(g->parent->n_groups > 0); + + if (g->parent->lru_tail == g) + g->parent->lru_tail = g->lru_prev; + + LIST_REMOVE(JournalRateLimitGroup, lru, g->parent->lru, g); + LIST_REMOVE(JournalRateLimitGroup, bucket, g->parent->buckets[g->hash % BUCKETS_MAX], g); + + g->parent->n_groups --; + } + + free(g->id); + free(g); +} + +void journal_rate_limit_free(JournalRateLimit *r) { + assert(r); + + while (r->lru) + journal_rate_limit_group_free(r->lru); +} + +static bool journal_rate_limit_group_expired(JournalRateLimitGroup *g, usec_t ts) { + unsigned i; + + assert(g); + + for (i = 0; i < POOLS_MAX; i++) + if (g->pools[i].begin + g->parent->interval >= ts) + return false; + + return true; +} + +static void journal_rate_limit_vacuum(JournalRateLimit *r, usec_t ts) { + assert(r); + + /* Makes room for at least one new item, but drop all + * expored items too. */ + + while (r->n_groups >= GROUPS_MAX || + (r->lru_tail && journal_rate_limit_group_expired(r->lru_tail, ts))) + journal_rate_limit_group_free(r->lru_tail); +} + +static JournalRateLimitGroup* journal_rate_limit_group_new(JournalRateLimit *r, const char *id, usec_t ts) { + JournalRateLimitGroup *g; + + assert(r); + assert(id); + + g = new0(JournalRateLimitGroup, 1); + if (!g) + return NULL; + + g->id = strdup(id); + if (!g->id) + goto fail; + + g->hash = string_hash_func(g->id); + + journal_rate_limit_vacuum(r, ts); + + LIST_PREPEND(JournalRateLimitGroup, bucket, r->buckets[g->hash % BUCKETS_MAX], g); + LIST_PREPEND(JournalRateLimitGroup, lru, r->lru, g); + if (!g->lru_next) + r->lru_tail = g; + r->n_groups ++; + + g->parent = r; + return g; + +fail: + journal_rate_limit_group_free(g); + return NULL; +} + +static uint64_t u64log2(uint64_t n) { + unsigned r; + + if (n <= 1) + return 0; + + r = 0; + for (;;) { + n = n >> 1; + if (!n) + return r; + r++; + } +} + +static unsigned burst_modulate(unsigned burst, uint64_t available) { + unsigned k; + + /* Modulates the burst rate a bit with the amount of available + * disk space */ + + k = u64log2(available); + + /* 1MB */ + if (k <= 20) + return burst; + + burst = (burst * (k-20)) / 4; + + /* + * Example: + * + * <= 1MB = rate * 1 + * 16MB = rate * 2 + * 256MB = rate * 3 + * 4GB = rate * 4 + * 64GB = rate * 5 + * 1TB = rate * 6 + */ + + return burst; +} + +int journal_rate_limit_test(JournalRateLimit *r, const char *id, int priority, uint64_t available) { + unsigned h; + JournalRateLimitGroup *g; + JournalRateLimitPool *p; + unsigned burst; + usec_t ts; + + assert(id); + + if (!r) + return 1; + + if (r->interval == 0 || r->burst == 0) + return 1; + + burst = burst_modulate(r->burst, available); + + ts = now(CLOCK_MONOTONIC); + + h = string_hash_func(id); + g = r->buckets[h % BUCKETS_MAX]; + + LIST_FOREACH(bucket, g, g) + if (streq(g->id, id)) + break; + + if (!g) { + g = journal_rate_limit_group_new(r, id, ts); + if (!g) + return -ENOMEM; + } + + p = &g->pools[priority_map[priority]]; + + if (p->begin <= 0) { + p->suppressed = 0; + p->num = 1; + p->begin = ts; + return 1; + } + + if (p->begin + r->interval < ts) { + unsigned s; + + s = p->suppressed; + p->suppressed = 0; + p->num = 1; + p->begin = ts; + + return 1 + s; + } + + if (p->num <= burst) { + p->num++; + return 1; + } + + p->suppressed++; + return 0; +} diff --git a/src/journal/journal-rate-limit.h b/src/journal/journal-rate-limit.h new file mode 100644 index 00000000..2bbdd5f9 --- /dev/null +++ b/src/journal/journal-rate-limit.h @@ -0,0 +1,34 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournalratelimithfoo +#define foojournalratelimithfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see . +***/ + +#include "macro.h" +#include "util.h" + +typedef struct JournalRateLimit JournalRateLimit; + +JournalRateLimit *journal_rate_limit_new(usec_t interval, unsigned burst); +void journal_rate_limit_free(JournalRateLimit *r); +int journal_rate_limit_test(JournalRateLimit *r, const char *id, int priority, uint64_t available); + +#endif diff --git a/src/journal/journald.c b/src/journal/journald.c index 5d3956ea..9f753013 100644 --- a/src/journal/journald.c +++ b/src/journal/journald.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "hashmap.h" #include "journal-file.h" @@ -38,6 +39,7 @@ #include "acl-util.h" #include "cgroup-util.h" #include "list.h" +#include "journal-rate-limit.h" #define USER_JOURNALS_MAX 1024 #define STDOUT_STREAMS_MAX 4096 @@ -60,6 +62,8 @@ typedef struct Server { char *buffer; size_t buffer_size; + JournalRateLimit *rate_limit; + JournalMetrics metrics; uint64_t max_use; bool compress; @@ -95,6 +99,76 @@ struct StdoutStream { LIST_FIELDS(StdoutStream, stdout_stream); }; +static uint64_t available_space(Server *s) { + char ids[33]; + sd_id128_t machine; + char *p; + const char *f; + struct statvfs ss; + uint64_t sum = 0, avail = 0, ss_avail = 0; + int r; + DIR *d; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return 0; + + if (s->system_journal) + f = "/var/log/journal/"; + else + f = "/run/log/journal/"; + + p = strappend(f, sd_id128_to_string(machine, ids)); + if (!p) + return 0; + + d = opendir(p); + free(p); + + if (!d) + return 0; + + if (fstatvfs(dirfd(d), &ss) < 0) + goto finish; + + for (;;) { + struct stat st; + struct dirent buf, *de; + int k; + + k = readdir_r(d, &buf, &de); + if (k != 0) { + r = -k; + goto finish; + } + + if (!de) + break; + + if (!dirent_is_file_with_suffix(de, ".journal")) + continue; + + if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) + continue; + + sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize; + } + + avail = sum >= s->max_use ? 0 : s->max_use - sum; + + ss_avail = ss.f_bsize * ss.f_bavail; + + ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free; + + if (ss_avail < avail) + avail = ss_avail; + +finish: + closedir(d); + + return avail; +} + static void fix_perms(JournalFile *f, uid_t uid) { acl_t acl; acl_entry_t entry; @@ -254,7 +328,40 @@ static void server_vacuum(Server *s) { free(p); } -static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigned m, struct ucred *ucred, struct timeval *tv) { +static char *shortened_cgroup_path(pid_t pid) { + int r; + char *process_path, *init_path, *path; + + assert(pid > 0); + + r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path); + if (r < 0) + return NULL; + + r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path); + if (r < 0) { + free(process_path); + return NULL; + } + + if (streq(init_path, "/")) + init_path[0] = 0; + + if (startswith(process_path, init_path)) + path = process_path + strlen(init_path); + else + path = process_path; + + free(init_path); + + return path; +} + +static void dispatch_message_real(Server *s, + struct iovec *iovec, unsigned n, unsigned m, + struct ucred *ucred, + struct timeval *tv) { + char *pid = NULL, *uid = NULL, *gid = NULL, *source_time = NULL, *boot_id = NULL, *machine_id = NULL, *comm = NULL, *cmdline = NULL, *hostname = NULL, @@ -270,11 +377,8 @@ static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigne bool vacuumed = false; assert(s); - assert(iovec || n == 0); - - if (n == 0) - return; - + assert(iovec); + assert(n > 0); assert(n + 13 <= m); if (ucred) { @@ -326,11 +430,12 @@ static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigne if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0) IOVEC_SET_STRING(iovec[n++], audit_loginuid); - r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, ucred->pid, &path); - if (r >= 0) { + path = shortened_cgroup_path(ucred->pid); + if (path) { cgroup = strappend("_SYSTEMD_CGROUP=", path); if (cgroup) IOVEC_SET_STRING(iovec[n++], cgroup); + free(path); } } @@ -400,6 +505,72 @@ retry: free(cgroup); } +static void dispatch_message(Server *s, + struct iovec *iovec, unsigned n, unsigned m, + struct ucred *ucred, + struct timeval *tv, + int priority) { + int rl; + char *path, *c; + + assert(s); + assert(iovec || n == 0); + + if (n == 0) + return; + + if (!ucred) + goto finish; + + path = shortened_cgroup_path(ucred->pid); + if (!path) + goto finish; + + /* example: /user/lennart/3/foobar + * /system/dbus.service/foobar + * + * So let's cut of everything past the third /, since that is + * wher user directories start */ + + c = strchr(path, '/'); + if (c) { + c = strchr(c+1, '/'); + if (c) { + c = strchr(c+1, '/'); + if (c) + *c = 0; + } + } + + rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s)); + + if (rl == 0) { + free(path); + return; + } + + if (rl > 1) { + int j = 0; + char suppress_message[LINE_MAX]; + struct iovec suppress_iovec[15]; + + /* Write a suppression message if we suppressed something */ + + snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path); + char_array_0(suppress_message); + + IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5"); + IOVEC_SET_STRING(suppress_iovec[j++], suppress_message); + + dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL); + } + + free(path); + +finish: + dispatch_message_real(s, iovec, n, m, ucred, tv); +} + static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) { char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL; struct iovec iovec[16]; @@ -422,7 +593,7 @@ static void process_syslog_message(Server *s, const char *buf, struct ucred *ucr if (message) IOVEC_SET_STRING(iovec[n++], message); - dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv); + dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK); free(message); free(syslog_facility); @@ -469,6 +640,7 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ unsigned n = 0, m = 0, j; const char *p; size_t remaining; + int priority = LOG_INFO; assert(s); assert(buffer || n == 0); @@ -489,8 +661,9 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ if (e == p) { /* Entry separator */ - dispatch_message(s, iovec, n, m, ucred, tv); + dispatch_message(s, iovec, n, m, ucred, tv, priority); n = 0; + priority = LOG_INFO; p++; remaining--; @@ -532,6 +705,15 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ iovec[n].iov_base = (char*) p; iovec[n].iov_len = e - p; n++; + + /* We need to determine the priority + * of this entry for the rate limiting + * logic */ + if (e - p == 10 && + memcmp(p, "PRIORITY=", 10) == 0 && + p[10] >= '0' && + p[10] <= '9') + priority = p[10] - '0'; } remaining -= (e - p) + 1; @@ -577,7 +759,7 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ } } - dispatch_message(s, iovec, n, m, ucred, tv); + dispatch_message(s, iovec, n, m, ucred, tv, priority); for (j = 0; j < n; j++) if (iovec[j].iov_base < buffer || @@ -630,7 +812,7 @@ static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) { n++; } - dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL); + dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority); if (s->tee_console) { int console; @@ -1378,6 +1560,10 @@ static int server_init(Server *s) { if (r < 0) return r; + s->rate_limit = journal_rate_limit_new(10*USEC_PER_SEC, 2); + if (!s->rate_limit) + return -ENOMEM; + return 0; } @@ -1413,6 +1599,9 @@ static void server_done(Server *s) { if (s->stdout_fd >= 0) close_nointr_nofail(s->stdout_fd); + + if (s->rate_limit) + journal_rate_limit_free(s->rate_limit); } int main(int argc, char *argv[]) { diff --git a/src/journal/sd-journal.h b/src/journal/sd-journal.h index 0333db4a..7e2ef153 100644 --- a/src/journal/sd-journal.h +++ b/src/journal/sd-journal.h @@ -38,6 +38,7 @@ * - throttling * - cryptographic hash * - never access beyond fle size check + * - OR of matches is borked... */ /* Write to daemon */ -- 2.39.5