Skip to content

Commit

Permalink
Implements Carbon namespacing
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc committed Nov 13, 2019
1 parent 236d619 commit 160478c
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 33 deletions.
10 changes: 8 additions & 2 deletions config.default.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
{
"type" : "carbon",
"address" : "localhost",
"port" : 2003,
"frequency" : 10
"port" : 2004,
"frequency" : 10,
"pickle" : true,
"global_prefix" : "stats",
"prefix_counter" : "counters",
"prefix_timer" : "timers",
"prefix_histo" : "histos",
"prefix_gauge" : "gauges"
}
],

Expand Down
2 changes: 1 addition & 1 deletion src/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ struct brubeck_backend {

int (*connect)(void *);
bool (*is_connected)(void *);
void (*sample)(const char *, value_t, void *);
void (*sample)(uint8_t, const char *, value_t, void *);
void (*flush)(void *);

uint32_t tick_time;
Expand Down
119 changes: 111 additions & 8 deletions src/backends/carbon.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ static void carbon_disconnect(struct brubeck_carbon *self)
}

static void plaintext_each(
uint8_t type,
const char *key,
value_t value,
void *backend)
Expand All @@ -58,6 +59,30 @@ static void plaintext_each(
if (!carbon_is_connected(carbon))
return;

if (carbon->namespacing.global) {
memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len);
ptr += carbon->namespacing.global_len;
*ptr++ = '.';
}

if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) {
memcpy(ptr, carbon->namespacing.counter, carbon->namespacing.counter_len);
ptr += carbon->namespacing.counter_len;
*ptr++ = '.';
} else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) {
memcpy(ptr, carbon->namespacing.timer, carbon->namespacing.timer_len);
ptr += carbon->namespacing.timer_len;
*ptr++ = '.';
} else if (type == BRUBECK_MT_HISTO && carbon->namespacing.histo) {
memcpy(ptr, carbon->namespacing.histo, carbon->namespacing.histo_len);
ptr += carbon->namespacing.histo_len;
*ptr++ = '.';
} else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) {
memcpy(ptr, carbon->namespacing.gauge, carbon->namespacing.gauge_len);
ptr += carbon->namespacing.gauge_len;
*ptr++ = '.';
}

memcpy(ptr, key, key_len);
ptr += key_len;
*ptr++ = ' ';
Expand Down Expand Up @@ -103,18 +128,61 @@ static inline size_t pickle1_double(char *ptr, void *_src)
}

static void pickle1_push(
struct pickler *buf,
struct brubeck_carbon *carbon,
uint8_t type,
const char *key,
uint8_t key_len,
uint32_t timestamp,
value_t value)
{
uint8_t namespaced_key_len = 0;
char *type_namespace = NULL;
size_t type_namespace_len = 0;
struct pickler *buf = &carbon->pickler;
char *ptr = buf->ptr + buf->pos;

if (carbon->namespacing.global) {
// the global namespace plus the "." character
namespaced_key_len += carbon->namespacing.global_len + 1;
}

if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) {
type_namespace = carbon->namespacing.counter;
type_namespace_len = carbon->namespacing.counter_len;
// the counter namespace plus the "." character
namespaced_key_len += carbon->namespacing.counter_len + 1;
} else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) {
type_namespace = carbon->namespacing.timer;
type_namespace_len = carbon->namespacing.timer_len;
// the counter namespace plus the "." character
namespaced_key_len += carbon->namespacing.timer_len + 1;
} else if (type == BRUBECK_MT_HISTO && carbon->namespacing.histo) {
type_namespace = carbon->namespacing.histo;
type_namespace_len = carbon->namespacing.histo_len;
// the counter namespace plus the "." character
namespaced_key_len += carbon->namespacing.histo_len + 1;
} else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) {
type_namespace = carbon->namespacing.gauge;
type_namespace_len = carbon->namespacing.gauge_len;
// the counter namespace plus the "." character
namespaced_key_len += carbon->namespacing.gauge_len + 1;
}

namespaced_key_len += key_len;

*ptr++ = '(';

*ptr++ = 'U';
*ptr++ = key_len;
*ptr++ = namespaced_key_len;
if (carbon->namespacing.global) {
memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len);
ptr += carbon->namespacing.global_len;
*ptr++ = '.';
}
if (type_namespace) {
memcpy(ptr, type_namespace, type_namespace_len);
ptr += type_namespace_len;
*ptr++ = '.';
}
memcpy(ptr, key, key_len);
ptr += key_len;

Expand All @@ -123,7 +191,7 @@ static void pickle1_push(

*ptr++ = '(';

ptr += pickle1_int32(ptr, &timestamp);
ptr += pickle1_int32(ptr, &carbon->backend.tick_time);
ptr += pickle1_double(ptr, &value);

*ptr++ = 't';
Expand Down Expand Up @@ -177,6 +245,7 @@ static void pickle1_flush(void *backend)
}

static void pickle1_each(
uint8_t type,
const char *key,
value_t value,
void *backend)
Expand All @@ -192,23 +261,32 @@ static void pickle1_each(
if (!carbon_is_connected(carbon))
return;

pickle1_push(&carbon->pickler, key, key_len,
carbon->backend.tick_time, value);
pickle1_push(carbon, type, key, key_len, value);
}

struct brubeck_backend *
brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n)
{
struct brubeck_carbon *carbon = xcalloc(1, sizeof(struct brubeck_carbon));
char *address;
char *global_prefix = NULL,
*prefix_counter = NULL,
*prefix_timer = NULL,
*prefix_histo = NULL,
*prefix_gauge = NULL;
int port, frequency, pickle = 0;

json_unpack_or_die(settings,
"{s:s, s:i, s?:b, s:i}",
"{s:s, s:i, s?:b, s:i, s?:s, s?:s, s?:s, s?:s, s?:s}",
"address", &address,
"port", &port,
"pickle", &pickle,
"frequency", &frequency);
"frequency", &frequency,
"global_prefix", &global_prefix,
"prefix_counter", &prefix_counter,
"prefix_timer", &prefix_timer,
"prefix_histo", &prefix_histo,
"prefix_gauge", &prefix_gauge);

carbon->backend.type = BRUBECK_BACKEND_CARBON;
carbon->backend.shard_n = shard_n;
Expand All @@ -225,6 +303,31 @@ brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n)
carbon->backend.flush = NULL;
}

if (global_prefix) {
carbon->namespacing.global = global_prefix;
carbon->namespacing.global_len = strlen(global_prefix);
}

if (prefix_counter) {
carbon->namespacing.counter = prefix_counter;
carbon->namespacing.counter_len = strlen(prefix_counter);
}

if (prefix_timer) {
carbon->namespacing.timer = prefix_timer;
carbon->namespacing.timer_len = strlen(prefix_timer);
}

if (prefix_histo) {
carbon->namespacing.histo = prefix_histo;
carbon->namespacing.histo_len = strlen(prefix_histo);
}

if (prefix_gauge) {
carbon->namespacing.gauge = prefix_gauge;
carbon->namespacing.gauge_len = strlen(prefix_gauge);
}

carbon->backend.sample_freq = frequency;
carbon->backend.server = server;
carbon->out_sock = -1;
Expand Down
16 changes: 16 additions & 0 deletions src/backends/carbon.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ struct brubeck_carbon {
uint16_t pos;
uint16_t pt;
} pickler;
struct namespacing {
char *global;
size_t global_len;

char *counter;
size_t counter_len;

char *timer;
size_t timer_len;

char *histo;
size_t histo_len;

char *gauge;
size_t gauge_len;
} namespacing;
size_t sent;
};

Expand Down
14 changes: 7 additions & 7 deletions src/internal_sampler.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,44 @@ brubeck_internal__sample(struct brubeck_metric *metric, brubeck_sample_cb sample
WITH_SUFFIX(".metrics") {
value = brubeck_atomic_swap(&stats->live.metrics, 0);
stats->sample.metrics = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".errors") {
value = brubeck_atomic_swap(&stats->live.errors, 0);
stats->sample.errors = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".unique_keys") {
value = brubeck_atomic_fetch(&stats->live.unique_keys);
stats->sample.unique_keys = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

/* Secure statsd endpoint */
WITH_SUFFIX(".secure.failed") {
value = brubeck_atomic_swap(&stats->live.secure.failed, 0);
stats->sample.secure.failed = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".secure.from_future") {
value = brubeck_atomic_swap(&stats->live.secure.from_future, 0);
stats->sample.secure.from_future = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".secure.delayed") {
value = brubeck_atomic_swap(&stats->live.secure.delayed, 0);
stats->sample.secure.delayed = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".secure.replayed") {
value = brubeck_atomic_swap(&stats->live.secure.replayed, 0);
stats->sample.secure.replayed = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

/*
Expand Down
30 changes: 15 additions & 15 deletions src/metric.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
}
pthread_spin_unlock(&metric->lock);

sample(metric->key, value, opaque);
sample(metric->type, metric->key, value, opaque);
}


Expand Down Expand Up @@ -98,7 +98,7 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
}
pthread_spin_unlock(&metric->lock);

sample(metric->key, value, opaque);
sample(metric->type, metric->key, value, opaque);
}


Expand Down Expand Up @@ -140,7 +140,7 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o
}
pthread_spin_unlock(&metric->lock);

sample(metric->key, value, opaque);
sample(metric->type, metric->key, value, opaque);
}


Expand Down Expand Up @@ -179,12 +179,12 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void


WITH_SUFFIX(".count") {
sample(key, hsample.count, opaque);
sample(metric->type, key, hsample.count, opaque);
}

WITH_SUFFIX(".count_ps") {
struct brubeck_backend *backend = opaque;
sample(key, hsample.count / (double)backend->sample_freq, opaque);
sample(metric->type, key, hsample.count / (double)backend->sample_freq, opaque);
}

/* if there have been no metrics during this sampling period,
Expand All @@ -193,43 +193,43 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
return;

WITH_SUFFIX(".min") {
sample(key, hsample.min, opaque);
sample(metric->type, key, hsample.min, opaque);
}

WITH_SUFFIX(".max") {
sample(key, hsample.max, opaque);
sample(metric->type, key, hsample.max, opaque);
}

WITH_SUFFIX(".sum") {
sample(key, hsample.sum, opaque);
sample(metric->type, key, hsample.sum, opaque);
}

WITH_SUFFIX(".mean") {
sample(key, hsample.mean, opaque);
sample(metric->type, key, hsample.mean, opaque);
}

WITH_SUFFIX(".median") {
sample(key, hsample.median, opaque);
sample(metric->type, key, hsample.median, opaque);
}

WITH_SUFFIX(".percentile.75") {
sample(key, hsample.percentile[PC_75], opaque);
sample(metric->type, key, hsample.percentile[PC_75], opaque);
}

WITH_SUFFIX(".percentile.95") {
sample(key, hsample.percentile[PC_95], opaque);
sample(metric->type, key, hsample.percentile[PC_95], opaque);
}

WITH_SUFFIX(".percentile.98") {
sample(key, hsample.percentile[PC_98], opaque);
sample(metric->type, key, hsample.percentile[PC_98], opaque);
}

WITH_SUFFIX(".percentile.99") {
sample(key, hsample.percentile[PC_99], opaque);
sample(metric->type, key, hsample.percentile[PC_99], opaque);
}

WITH_SUFFIX(".percentile.999") {
sample(key, hsample.percentile[PC_999], opaque);
sample(metric->type, key, hsample.percentile[PC_999], opaque);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct brubeck_metric {
};

typedef void (*brubeck_sample_cb)(
uint8_t type,
const char *key,
value_t value,
void *backend);
Expand Down

0 comments on commit 160478c

Please sign in to comment.