diff --git a/Makefile b/Makefile index 84c4473..d0a9018 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,7 @@ SOURCES = \ src/bloom.c \ src/city.c \ src/histogram.c \ + src/hs.c \ src/ht.c \ src/http.c \ src/internal_sampler.c \ @@ -25,6 +26,7 @@ SOURCES = \ src/samplers/statsd-secure.c \ src/samplers/statsd.c \ src/server.c \ + src/set.c \ src/setproctitle.c \ src/slab.c \ src/utils.c diff --git a/README.md b/README.md index 4bd6de5..5b220ee 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ Brubeck supports most of the metric types from statsd and many other implementat - `C` - Counters - `h` - Histograms - `ms` - Timers (in milliseconds) +- `s` - Sets Client-sent sampling rates are ignored. diff --git a/src/brubeck.h b/src/brubeck.h index fcabe86..f89ebb7 100644 --- a/src/brubeck.h +++ b/src/brubeck.h @@ -24,6 +24,10 @@ #define MAX_ADDR 256 typedef double value_t; +typedef union { + value_t n; + char *s; +} sample_value_t; typedef uint64_t hash_t; struct brubeck_server; @@ -34,6 +38,8 @@ struct brubeck_metric; #include "utils.h" #include "slab.h" #include "histogram.h" +#include "hs.h" +#include "set.h" #include "metric.h" #include "sampler.h" #include "backend.h" diff --git a/src/hs.c b/src/hs.c new file mode 100644 index 0000000..92b9667 --- /dev/null +++ b/src/hs.c @@ -0,0 +1,54 @@ +#include + +#include "brubeck.h" +#include "../vendor/ck/src/ck_ht_hash.h" +#include "ck_malloc.h" + +static unsigned long +hs_hash(const void *object, unsigned long seed) +{ + const char *c = object; + return (unsigned long)MurmurHash64A(c, strlen(c), seed); +} + +static bool +hs_compare(const void *previous, const void *compare) +{ + return strcmp(previous, compare) == 0; +} + +brubeck_hashset_t * +brubeck_hashset_new(const uint64_t size) +{ + brubeck_hashset_t *hs = xmalloc(sizeof(brubeck_hashset_t)); + if (!ck_hs_init(hs, CK_HS_MODE_DIRECT, hs_hash, hs_compare, &ALLOCATOR, + (uint64_t)size, 0xDD15EA5E)) { + free(hs); + return NULL; + } + + return hs; +} + +void +brubeck_hashset_free(brubeck_hashset_t *hs) +{ + /* no-op */ +} + +bool +brubeck_hashset_add(brubeck_hashset_t *hs, const char *key) { + if(NULL != ck_hs_get(hs, CK_HS_HASH(hs, hs_hash, key), key)) + return true; + + return ck_hs_put(hs, CK_HS_HASH(hs, hs_hash, key), xstrdup(key)); +} + +bool +brubeck_hashset_clear(brubeck_hashset_t *hs) { + ck_hs_iterator_t iterator = CK_HS_ITERATOR_INITIALIZER; + void *key; + while(ck_hs_next(hs, &iterator, &key)) + free(key); + return ck_hs_reset(hs); +} diff --git a/src/hs.h b/src/hs.h new file mode 100644 index 0000000..77cbea6 --- /dev/null +++ b/src/hs.h @@ -0,0 +1,14 @@ +#ifndef _HS_H_ +#define _HS_H_ + +#include +#include "ck_hs.h" + +typedef struct ck_hs brubeck_hashset_t; + +brubeck_hashset_t * brubeck_hashset_new(const uint64_t size); +void brubeck_hashset_free(brubeck_hashset_t *hs); +bool brubeck_hashset_add(brubeck_hashset_t *hs, const char *key); +bool brubeck_hashset_clear(brubeck_hashset_t *hs); + +#endif diff --git a/src/ht.c b/src/ht.c index 890696b..823b600 100644 --- a/src/ht.c +++ b/src/ht.c @@ -21,7 +21,7 @@ ht_free(void *p, size_t b, bool r) free(p); } -static struct ck_malloc ALLOCATOR = { +struct ck_malloc ALLOCATOR = { .malloc = ht_malloc, .free = ht_free }; diff --git a/src/ht.h b/src/ht.h index 0881c3d..d228141 100644 --- a/src/ht.h +++ b/src/ht.h @@ -3,6 +3,7 @@ #include +extern struct ck_malloc ALLOCATOR; struct brubeck_metric; typedef struct brubeck_hashtable_t brubeck_hashtable_t; diff --git a/src/http.c b/src/http.c index adf8935..6b7c561 100644 --- a/src/http.c +++ b/src/http.c @@ -27,7 +27,7 @@ static struct MHD_Response * send_metric(struct brubeck_server *server, const char *url) { static const char *metric_types[] = { - "gauge", "meter", "counter", "histogram", "timer", "internal" + "gauge", "meter", "counter", "histogram", "timer", "set", "internal" }; static const char *expire_status[] = { "disabled", "inactive", "active" diff --git a/src/metric.c b/src/metric.c index f34a239..ef691f2 100644 --- a/src/metric.c +++ b/src/metric.c @@ -22,7 +22,7 @@ new_metric(struct brubeck_server *server, const char *key, size_t key_len, uint8 return metric; } -typedef void (*mt_prototype_record)(struct brubeck_metric *, value_t); +typedef void (*mt_prototype_record)(struct brubeck_metric *, sample_value_t); typedef void (*mt_prototype_sample)(struct brubeck_metric *, brubeck_sample_cb, void *); @@ -32,11 +32,11 @@ typedef void (*mt_prototype_sample)(struct brubeck_metric *, brubeck_sample_cb, * ALLOC: mt + 4 bytes *********************************************/ static void -gauge__record(struct brubeck_metric *metric, value_t value) +gauge__record(struct brubeck_metric *metric, sample_value_t value) { pthread_spin_lock(&metric->lock); { - metric->as.gauge.value = value; + metric->as.gauge.value = value.n; } pthread_spin_unlock(&metric->lock); } @@ -62,11 +62,11 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa * ALLOC: mt + 4 *********************************************/ static void -meter__record(struct brubeck_metric *metric, value_t value) +meter__record(struct brubeck_metric *metric, sample_value_t value) { pthread_spin_lock(&metric->lock); { - metric->as.meter.value += value; + metric->as.meter.value += value.n; } pthread_spin_unlock(&metric->lock); } @@ -93,19 +93,19 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa * ALLOC: mt + 4 + 4 + 4 *********************************************/ static void -counter__record(struct brubeck_metric *metric, value_t value) +counter__record(struct brubeck_metric *metric, sample_value_t value) { pthread_spin_lock(&metric->lock); { if (metric->as.counter.previous > 0.0) { - value_t diff = (value >= metric->as.counter.previous) ? - (value - metric->as.counter.previous) : - (value); + value_t diff = (value.n >= metric->as.counter.previous) ? + (value.n - metric->as.counter.previous) : + (value.n); metric->as.counter.value += diff; } - metric->as.counter.previous = value; + metric->as.counter.previous = value.n; } pthread_spin_unlock(&metric->lock); } @@ -132,11 +132,11 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o * ALLOC: mt + 16 + 4 *********************************************/ static void -histogram__record(struct brubeck_metric *metric, value_t value) +histogram__record(struct brubeck_metric *metric, sample_value_t value) { pthread_spin_lock(&metric->lock); { - brubeck_histo_push(&metric->as.histogram, value); + brubeck_histo_push(&metric->as.histogram, value.n); } pthread_spin_unlock(&metric->lock); } @@ -204,6 +204,39 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void } } + +/********************************************* + * Set + * + * ALLOC: ? + *********************************************/ +static void +set__record(struct brubeck_metric *metric, sample_value_t value) +{ + pthread_spin_lock(&metric->lock); + { + brubeck_set_add(metric, value.s); + } + pthread_spin_unlock(&metric->lock); +} + +static void +set__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opaque) +{ + value_t value = 0.0; + + pthread_spin_lock(&metric->lock); + { + if(NULL != metric->as.set) { + value = brubeck_set_size(metric->as.set); + brubeck_set_clear(metric->as.set); + } + } + pthread_spin_unlock(&metric->lock); + + sample(metric->key, value, opaque); +} + /********************************************************/ static struct brubeck_metric__proto { @@ -240,6 +273,12 @@ static struct brubeck_metric__proto { &histogram__sample }, + /* Set */ + { + &set__record, + &set__sample + }, + /* Internal -- used for sampling brubeck itself */ { NULL, /* recorded manually */ @@ -252,7 +291,7 @@ void brubeck_metric_sample(struct brubeck_metric *metric, brubeck_sample_cb cb, _prototypes[metric->type].sample(metric, cb, backend); } -void brubeck_metric_record(struct brubeck_metric *metric, value_t value) +void brubeck_metric_record(struct brubeck_metric *metric, sample_value_t value) { _prototypes[metric->type].record(metric, value); } diff --git a/src/metric.h b/src/metric.h index 1be7fed..877c381 100644 --- a/src/metric.h +++ b/src/metric.h @@ -7,6 +7,7 @@ enum brubeck_metric_t { BRUBECK_MT_COUNTER, /** C */ BRUBECK_MT_HISTO, /** h */ BRUBECK_MT_TIMER, /** ms */ + BRUBECK_MT_SET, /** s */ BRUBECK_MT_INTERNAL_STATS }; @@ -39,6 +40,7 @@ struct brubeck_metric { struct { value_t value, previous; } counter; + brubeck_hashset_t *set; struct brubeck_histo histogram; void *other; } as; @@ -52,7 +54,7 @@ typedef void (*brubeck_sample_cb)( void *backend); void brubeck_metric_sample(struct brubeck_metric *metric, brubeck_sample_cb cb, void *backend); -void brubeck_metric_record(struct brubeck_metric *metric, value_t value); +void brubeck_metric_record(struct brubeck_metric *metric, sample_value_t value); struct brubeck_metric *brubeck_metric_new(struct brubeck_server *server, const char *, size_t, uint8_t); struct brubeck_metric *brubeck_metric_find(struct brubeck_server *server, const char *, size_t, uint8_t); diff --git a/src/samplers/statsd.c b/src/samplers/statsd.c index 622156b..e4f55d9 100644 --- a/src/samplers/statsd.c +++ b/src/samplers/statsd.c @@ -129,6 +129,7 @@ static void statsd_run_recvmsg(struct brubeck_statsd *statsd, int sock) int brubeck_statsd_msg_parse(struct brubeck_statsd_msg *msg, char *buffer, size_t length) { + char *start; char *end = buffer + length; *end = '\0'; @@ -158,18 +159,82 @@ int brubeck_statsd_msg_parse(struct brubeck_statsd_msg *msg, char *buffer, size_ return -1; } + /** + * Message type: one or two char identifier with the + * message type. Valid values: g, c, C, h, ms, s + * + * gaugor:333|g + * ^ + */ + { + msg->type = -1; + start = buffer; + + while(*buffer != '|') { + if(*buffer == '\0' || *buffer == '\n') + return -1; + ++buffer; + } + *buffer++ = '\0'; // null-terminate value + + switch (*buffer) { + case 'g': msg->type = BRUBECK_MT_GAUGE; break; + case 'c': msg->type = BRUBECK_MT_METER; break; + case 'C': msg->type = BRUBECK_MT_COUNTER; break; + case 'h': msg->type = BRUBECK_MT_HISTO; break; + case 's': msg->type = BRUBECK_MT_SET; break; + case 'm': + ++buffer; + if (*buffer == 's') { + msg->type = BRUBECK_MT_TIMER; + break; + } + + default: + return -1; + } + if(-1 == msg->type) + return -1; + } + + /** + * Trailing bytes: data appended at the end of the message. + * This is stored verbatim and will be parsed when processing + * the specific message type. This is optional. + * + * gorets:1|c|@0.1 + * ^^^^---- + */ + { + buffer++; + + if (buffer[0] == '\0' || (buffer[0] == '\n' && buffer[1] == '\0')) { + msg->trail = NULL; + } else if (*buffer == '@' || *buffer == '|') { + msg->trail = buffer; + } else { + return -1; + } + } + + /////////////////////////////////////////// + /** * Message value: the numeric value between ':' and '|'. * This is already converted to an integer. * + * TODO: support non-numeric values for set metrics + * * gaugor:333|g * ^^^ */ - { + if(BRUBECK_MT_SET == msg->type) { + msg->value.s = start; + } else { int negative = 0; - char *start = buffer; + buffer = start; - msg->value = 0.0; + msg->value.n = 0.0; if (*buffer == '-') { ++buffer; @@ -177,7 +242,7 @@ int brubeck_statsd_msg_parse(struct brubeck_statsd_msg *msg, char *buffer, size_ } while (*buffer >= '0' && *buffer <= '9') { - msg->value = (msg->value * 10.0) + (*buffer - '0'); + msg->value.n = (msg->value.n * 10.0) + (*buffer - '0'); ++buffer; } @@ -191,70 +256,20 @@ int brubeck_statsd_msg_parse(struct brubeck_statsd_msg *msg, char *buffer, size_ n += 1.0; } - msg->value += f / pow(10.0, n); + msg->value.n += f / pow(10.0, n); } if (negative) - msg->value = -msg->value; + msg->value.n = -msg->value.n; if (unlikely(*buffer == 'e')) { - msg->value = strtod(start, &buffer); + msg->value.n = strtod(start, &buffer); } - if (*buffer != '|') + if (*buffer != '\0') return -1; - - buffer++; - } - - /** - * Message type: one or two char identifier with the - * message type. Valid values: g, c, C, h, ms - * - * gaugor:333|g - * ^ - */ - { - switch (*buffer) { - case 'g': msg->type = BRUBECK_MT_GAUGE; break; - case 'c': msg->type = BRUBECK_MT_METER; break; - case 'C': msg->type = BRUBECK_MT_COUNTER; break; - case 'h': msg->type = BRUBECK_MT_HISTO; break; - case 'm': - ++buffer; - if (*buffer == 's') { - msg->type = BRUBECK_MT_TIMER; - break; - } - - default: - return -1; - } - } - - /** - * Trailing bytes: data appended at the end of the message. - * This is stored verbatim and will be parsed when processing - * the specific message type. This is optional. - * - * gorets:1|c|@0.1 - * ^^^^---- - */ - { - buffer++; - - if (buffer[0] == '\0' || (buffer[0] == '\n' && buffer[1] == '\0')) { - msg->trail = NULL; - return 0; - } - - if (*buffer == '@' || *buffer == '|') { - msg->trail = buffer; - return 0; - } - - return -1; } + return 0; } static void *statsd__thread(void *_in) diff --git a/src/samplers/statsd.h b/src/samplers/statsd.h index 3b7e3d3..04eb1b3 100644 --- a/src/samplers/statsd.h +++ b/src/samplers/statsd.h @@ -7,7 +7,7 @@ struct brubeck_statsd_msg { char *key; /* The key of the message, NULL terminated */ uint16_t key_len; /* length of the key */ uint16_t type; /* type of the messaged, as a brubeck_mt_t */ - value_t value; /* integer value of the message */ + sample_value_t value; /* value of the message */ char *trail; /* Any data following the 'key:value|type' construct, NULL terminated*/ }; diff --git a/src/server.c b/src/server.c index 0584697..1f625a4 100644 --- a/src/server.c +++ b/src/server.c @@ -76,7 +76,7 @@ expire_metric(struct brubeck_metric *mt, void *_) static void dump_metric(struct brubeck_metric *mt, void *out_file) { - static const char *METRIC_NAMES[] = {"g", "c", "C", "h", "ms", "internal"}; + static const char *METRIC_NAMES[] = {"g", "c", "C", "h", "ms", "s", "internal"}; fprintf((FILE *)out_file, "%s|%s\n", mt->key, METRIC_NAMES[mt->type]); } diff --git a/src/set.c b/src/set.c new file mode 100644 index 0000000..d517032 --- /dev/null +++ b/src/set.c @@ -0,0 +1,23 @@ +#include "brubeck.h" + +#define SET_INIT_SIZE 16 + +void brubeck_set_add(struct brubeck_metric *metric, const char *key) { + if(NULL == metric->as.set) + metric->as.set = brubeck_hashset_new(SET_INIT_SIZE); + + if(NULL != metric->as.set) + brubeck_hashset_add(metric->as.set, key); +} + +size_t +brubeck_set_size(brubeck_hashset_t *hs) +{ + return ck_hs_count(hs); +} + +bool +brubeck_set_clear(brubeck_hashset_t *hs) +{ + return brubeck_hashset_clear(hs); +} diff --git a/src/set.h b/src/set.h new file mode 100644 index 0000000..08f7fc6 --- /dev/null +++ b/src/set.h @@ -0,0 +1,8 @@ +#ifndef __BRUBECK_SET_H__ +#define __BRUBECK_SET_H__ + +void brubeck_set_add(struct brubeck_metric *metric, const char *key); +size_t brubeck_set_size(brubeck_hashset_t *hs); +bool brubeck_set_clear(brubeck_hashset_t *hs); + +#endif diff --git a/src/utils.h b/src/utils.h index 2dc4f66..dd7ddf6 100644 --- a/src/utils.h +++ b/src/utils.h @@ -57,6 +57,13 @@ static inline void *xrealloc(void *ptr, size_t size) return new_ptr; } +// strdup +static inline char *xstrdup(const char *s) +{ + void *ptr = xmalloc(strlen(s) + 1); + return strcpy(ptr, s); +} + #define brubeck_atomic_inc(P) __sync_add_and_fetch((P), 1) #define brubeck_atomic_dec(P) __sync_add_and_fetch((P), -1) #define brubeck_atomic_add(P, V) __sync_add_and_fetch((P), (V)) diff --git a/tests/statsd_msg.c b/tests/statsd_msg.c index 5ca232f..58d2c15 100644 --- a/tests/statsd_msg.c +++ b/tests/statsd_msg.c @@ -10,7 +10,17 @@ static void try_parse(struct brubeck_statsd_msg *msg, const char *msg_text, doub memcpy(buffer, msg_text, len); sput_fail_unless(brubeck_statsd_msg_parse(msg, buffer, len) == 0, msg_text); - sput_fail_unless(expected == msg->value, "msg.value == expected"); + sput_fail_unless(expected == msg->value.n, "msg.value.n == expected"); +} + +static void try_parse_set(struct brubeck_statsd_msg *msg, const char *msg_text, const char *expected) +{ + char buffer[64]; + size_t len = strlen(msg_text); + memcpy(buffer, msg_text, len); + + sput_fail_unless(brubeck_statsd_msg_parse(msg, buffer, len) == 0, msg_text); + sput_fail_unless(0 == strcmp(msg->value.s, expected), "msg.value.s == expected"); } void test_statsd_msg__parse_strings(void) @@ -26,4 +36,8 @@ void test_statsd_msg__parse_strings(void) try_parse(&msg, "this.is.sparta:23.23|g", 23.23); try_parse(&msg, "this.is.sparta:0.232030|g", 0.23203); try_parse(&msg, "this.are.some.floats:1234567.89|g", 1234567.89); + +#define EXPECT "some.non-float.value" + try_parse_set(&msg, "this.is.a.set:" EXPECT "|s|@args", EXPECT); + try_parse_set(&msg, "this.is.a.set:" EXPECT "|s", EXPECT); }