Skip to content
This repository has been archived by the owner on Nov 14, 2019. It is now read-only.

support 'set' type metrics #32

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ SOURCES = \
src/bloom.c \
src/city.c \
src/histogram.c \
src/hs.c \
src/ht.c \
src/http.c \
src/internal_sampler.c \
Expand All @@ -25,6 +26,7 @@ SOURCES = \
src/samplers/statsd-secure.c \
src/samplers/statsd.c \
src/server.c \
src/set.c \
src/setproctitle.c \
src/slab.c \
src/utils.c
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Brubeck supports most of the metric types from statsd and many other implementat
- `C` - Counters
- `h` - Histograms
- `ms` - Timers (in milliseconds)
- `s` - Sets

Client-sent sampling rates are ignored.

Expand Down
6 changes: 6 additions & 0 deletions src/brubeck.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
#define MAX_ADDR 256

typedef double value_t;
typedef union {
value_t n;
char *s;
} sample_value_t;
typedef uint64_t hash_t;

struct brubeck_server;
Expand All @@ -34,6 +38,8 @@ struct brubeck_metric;
#include "utils.h"
#include "slab.h"
#include "histogram.h"
#include "hs.h"
#include "set.h"
#include "metric.h"
#include "sampler.h"
#include "backend.h"
Expand Down
54 changes: 54 additions & 0 deletions src/hs.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <stdlib.h>

#include "brubeck.h"
#include "../vendor/ck/src/ck_ht_hash.h"
#include "ck_malloc.h"

static unsigned long
hs_hash(const void *object, unsigned long seed)
{
const char *c = object;
return (unsigned long)MurmurHash64A(c, strlen(c), seed);
}

static bool
hs_compare(const void *previous, const void *compare)
{
return strcmp(previous, compare) == 0;
}

brubeck_hashset_t *
brubeck_hashset_new(const uint64_t size)
{
brubeck_hashset_t *hs = xmalloc(sizeof(brubeck_hashset_t));
if (!ck_hs_init(hs, CK_HS_MODE_DIRECT, hs_hash, hs_compare, &ALLOCATOR,
(uint64_t)size, 0xDD15EA5E)) {
free(hs);
return NULL;
}

return hs;
}

void
brubeck_hashset_free(brubeck_hashset_t *hs)
{
/* no-op */
}

bool
brubeck_hashset_add(brubeck_hashset_t *hs, const char *key) {
if(NULL != ck_hs_get(hs, CK_HS_HASH(hs, hs_hash, key), key))
return true;

return ck_hs_put(hs, CK_HS_HASH(hs, hs_hash, key), xstrdup(key));
}

bool
brubeck_hashset_clear(brubeck_hashset_t *hs) {
ck_hs_iterator_t iterator = CK_HS_ITERATOR_INITIALIZER;
void *key;
while(ck_hs_next(hs, &iterator, &key))
free(key);
return ck_hs_reset(hs);
}
14 changes: 14 additions & 0 deletions src/hs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef _HS_H_
#define _HS_H_

#include <stdbool.h>
#include "ck_hs.h"

typedef struct ck_hs brubeck_hashset_t;

brubeck_hashset_t * brubeck_hashset_new(const uint64_t size);
void brubeck_hashset_free(brubeck_hashset_t *hs);
bool brubeck_hashset_add(brubeck_hashset_t *hs, const char *key);
bool brubeck_hashset_clear(brubeck_hashset_t *hs);

#endif
2 changes: 1 addition & 1 deletion src/ht.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ht_free(void *p, size_t b, bool r)
free(p);
}

static struct ck_malloc ALLOCATOR = {
struct ck_malloc ALLOCATOR = {
.malloc = ht_malloc,
.free = ht_free
};
Expand Down
1 change: 1 addition & 0 deletions src/ht.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <stdbool.h>

extern struct ck_malloc ALLOCATOR;
struct brubeck_metric;
typedef struct brubeck_hashtable_t brubeck_hashtable_t;

Expand Down
2 changes: 1 addition & 1 deletion src/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static struct MHD_Response *
send_metric(struct brubeck_server *server, const char *url)
{
static const char *metric_types[] = {
"gauge", "meter", "counter", "histogram", "timer", "internal"
"gauge", "meter", "counter", "histogram", "timer", "set", "internal"
};
static const char *expire_status[] = {
"disabled", "inactive", "active"
Expand Down
65 changes: 52 additions & 13 deletions src/metric.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ new_metric(struct brubeck_server *server, const char *key, size_t key_len, uint8
return metric;
}

typedef void (*mt_prototype_record)(struct brubeck_metric *, value_t);
typedef void (*mt_prototype_record)(struct brubeck_metric *, sample_value_t);
typedef void (*mt_prototype_sample)(struct brubeck_metric *, brubeck_sample_cb, void *);


Expand All @@ -32,11 +32,11 @@ typedef void (*mt_prototype_sample)(struct brubeck_metric *, brubeck_sample_cb,
* ALLOC: mt + 4 bytes
*********************************************/
static void
gauge__record(struct brubeck_metric *metric, value_t value)
gauge__record(struct brubeck_metric *metric, sample_value_t value)
{
pthread_spin_lock(&metric->lock);
{
metric->as.gauge.value = value;
metric->as.gauge.value = value.n;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -62,11 +62,11 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
* ALLOC: mt + 4
*********************************************/
static void
meter__record(struct brubeck_metric *metric, value_t value)
meter__record(struct brubeck_metric *metric, sample_value_t value)
{
pthread_spin_lock(&metric->lock);
{
metric->as.meter.value += value;
metric->as.meter.value += value.n;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -93,19 +93,19 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
* ALLOC: mt + 4 + 4 + 4
*********************************************/
static void
counter__record(struct brubeck_metric *metric, value_t value)
counter__record(struct brubeck_metric *metric, sample_value_t value)
{
pthread_spin_lock(&metric->lock);
{
if (metric->as.counter.previous > 0.0) {
value_t diff = (value >= metric->as.counter.previous) ?
(value - metric->as.counter.previous) :
(value);
value_t diff = (value.n >= metric->as.counter.previous) ?
(value.n - metric->as.counter.previous) :
(value.n);

metric->as.counter.value += diff;
}

metric->as.counter.previous = value;
metric->as.counter.previous = value.n;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -132,11 +132,11 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o
* ALLOC: mt + 16 + 4
*********************************************/
static void
histogram__record(struct brubeck_metric *metric, value_t value)
histogram__record(struct brubeck_metric *metric, sample_value_t value)
{
pthread_spin_lock(&metric->lock);
{
brubeck_histo_push(&metric->as.histogram, value);
brubeck_histo_push(&metric->as.histogram, value.n);
}
pthread_spin_unlock(&metric->lock);
}
Expand Down Expand Up @@ -204,6 +204,39 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
}
}


/*********************************************
* Set
*
* ALLOC: ?
*********************************************/
static void
set__record(struct brubeck_metric *metric, sample_value_t value)
{
pthread_spin_lock(&metric->lock);
{
brubeck_set_add(metric, value.s);
}
pthread_spin_unlock(&metric->lock);
}

static void
set__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opaque)
{
value_t value = 0.0;

pthread_spin_lock(&metric->lock);
{
if(NULL != metric->as.set) {
value = brubeck_set_size(metric->as.set);
brubeck_set_clear(metric->as.set);
}
}
pthread_spin_unlock(&metric->lock);

sample(metric->key, value, opaque);
}

/********************************************************/

static struct brubeck_metric__proto {
Expand Down Expand Up @@ -240,6 +273,12 @@ static struct brubeck_metric__proto {
&histogram__sample
},

/* Set */
{
&set__record,
&set__sample
},

/* Internal -- used for sampling brubeck itself */
{
NULL, /* recorded manually */
Expand All @@ -252,7 +291,7 @@ void brubeck_metric_sample(struct brubeck_metric *metric, brubeck_sample_cb cb,
_prototypes[metric->type].sample(metric, cb, backend);
}

void brubeck_metric_record(struct brubeck_metric *metric, value_t value)
void brubeck_metric_record(struct brubeck_metric *metric, sample_value_t value)
{
_prototypes[metric->type].record(metric, value);
}
Expand Down
4 changes: 3 additions & 1 deletion src/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ enum brubeck_metric_t {
BRUBECK_MT_COUNTER, /** C */
BRUBECK_MT_HISTO, /** h */
BRUBECK_MT_TIMER, /** ms */
BRUBECK_MT_SET, /** s */
BRUBECK_MT_INTERNAL_STATS
};

Expand Down Expand Up @@ -39,6 +40,7 @@ struct brubeck_metric {
struct {
value_t value, previous;
} counter;
brubeck_hashset_t *set;
struct brubeck_histo histogram;
void *other;
} as;
Expand All @@ -52,7 +54,7 @@ typedef void (*brubeck_sample_cb)(
void *backend);

void brubeck_metric_sample(struct brubeck_metric *metric, brubeck_sample_cb cb, void *backend);
void brubeck_metric_record(struct brubeck_metric *metric, value_t value);
void brubeck_metric_record(struct brubeck_metric *metric, sample_value_t value);

struct brubeck_metric *brubeck_metric_new(struct brubeck_server *server, const char *, size_t, uint8_t);
struct brubeck_metric *brubeck_metric_find(struct brubeck_server *server, const char *, size_t, uint8_t);
Expand Down
Loading