Skip to content
This repository has been archived by the owner on Nov 14, 2019. It is now read-only.

remove expire mechanism #52

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config.default.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"server_name" : "brubeck_debug",
"dumpfile" : "./brubeck.dump",
"capacity" : 15,
"expire" : 20,
"http" : ":8080",

"backends" : [
Expand Down
2 changes: 1 addition & 1 deletion src/backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static void *backend__thread(void *_ptr)
self->tick_time = now.tv_sec;

for (mt = self->queue; mt; mt = mt->next) {
if (mt->expire > BRUBECK_EXPIRE_DISABLED)
if (mt->state != BRUBECK_STATE_DISABLED)
brubeck_metric_sample(mt, self->sample, self);
}

Expand Down
8 changes: 4 additions & 4 deletions src/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ expire_metric(struct brubeck_server *server, const char *url)
server, url + strlen("/expire/"));

if (metric) {
metric->expire = BRUBECK_EXPIRE_DISABLED;
metric->state = BRUBECK_STATE_DISABLED;
return MHD_create_response_from_data(
0, "", 0, 0);
}
Expand All @@ -76,8 +76,8 @@ send_metric(struct brubeck_server *server, const char *url)
static const char *metric_types[] = {
"gauge", "meter", "counter", "histogram", "timer", "internal"
};
static const char *expire_status[] = {
"disabled", "inactive", "active"
static const char *metric_status[] = {
"inactive", "active"
};

struct brubeck_metric *metric = safe_lookup_metric(
Expand All @@ -92,7 +92,7 @@ send_metric(struct brubeck_server *server, const char *url)
#else
"shard", 0,
#endif
"expire", expire_status[metric->expire]
"status", metric_status[metric->state]
);

char *jsonr = json_dumps(mj, JSON_INDENT(4) | JSON_PRESERVE_ORDER);
Expand Down
2 changes: 1 addition & 1 deletion src/internal_sampler.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ brubeck_internal__sample(struct brubeck_metric *metric, brubeck_sample_cb sample
* Mark the metric as active so it doesn't get disabled
* by the inactive metrics pruner
*/
metric->expire = BRUBECK_EXPIRE_ACTIVE;
metric->state = BRUBECK_STATE_ACTIVE;
}

void brubeck_internal__init(struct brubeck_server *server)
Expand Down
16 changes: 9 additions & 7 deletions src/metric.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ new_metric(struct brubeck_server *server, const char *key, size_t key_len, uint8
metric->key[key_len] = '\0';
metric->key_len = (uint16_t)key_len;

metric->expire = BRUBECK_EXPIRE_ACTIVE;
metric->state = BRUBECK_STATE_ACTIVE;
metric->type = type;
pthread_spin_init(&metric->lock, PTHREAD_PROCESS_PRIVATE);

Expand Down Expand Up @@ -49,6 +49,7 @@ gauge__record(struct brubeck_metric *metric, value_t value, value_t sample_freq,
} else {
metric->as.gauge.value = value;
}
metric->state= BRUBECK_STATE_ACTIVE;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -61,6 +62,7 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
pthread_spin_lock(&metric->lock);
{
value = metric->as.gauge.value;
metric->state = BRUBECK_STATE_INACTIVE;
}
pthread_spin_unlock(&metric->lock);

Expand All @@ -82,6 +84,7 @@ meter__record(struct brubeck_metric *metric, value_t value, value_t sample_freq,
pthread_spin_lock(&metric->lock);
{
metric->as.meter.value += value;
metric->state= BRUBECK_STATE_ACTIVE;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -95,6 +98,7 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
{
value = metric->as.meter.value;
metric->as.meter.value = 0.0;
metric->state = BRUBECK_STATE_INACTIVE;
}
pthread_spin_unlock(&metric->lock);

Expand Down Expand Up @@ -124,6 +128,7 @@ counter__record(struct brubeck_metric *metric, value_t value, value_t sample_fre
}

metric->as.counter.previous = value;
metric->state= BRUBECK_STATE_ACTIVE;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -137,6 +142,7 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o
{
value = metric->as.counter.value;
metric->as.counter.value = 0.0;
metric->state = BRUBECK_STATE_INACTIVE;
}
pthread_spin_unlock(&metric->lock);

Expand All @@ -155,6 +161,7 @@ histogram__record(struct brubeck_metric *metric, value_t value, value_t sample_f
pthread_spin_lock(&metric->lock);
{
brubeck_histo_push(&metric->as.histogram, value, sample_freq);
metric->state= BRUBECK_STATE_ACTIVE;
}
pthread_spin_unlock(&metric->lock);
}
Expand All @@ -168,6 +175,7 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
pthread_spin_lock(&metric->lock);
{
brubeck_histo_sample(&hsample, &metric->as.histogram);
metric->state = BRUBECK_STATE_INACTIVE;
}
pthread_spin_unlock(&metric->lock);

Expand All @@ -187,11 +195,6 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
sample(key, hsample.count / (double)backend->sample_freq, opaque);
}

/* if there have been no metrics during this sampling period,
* we don't need to report any of the histogram samples */
if (hsample.count == 0.0)
return;

WITH_SUFFIX(".min") {
sample(key, hsample.min, opaque);
}
Expand Down Expand Up @@ -333,6 +336,5 @@ brubeck_metric_find(struct brubeck_server *server, const char *key, size_t key_l
brubeck_atomic_inc(&metric->flow);
#endif

metric->expire = BRUBECK_EXPIRE_ACTIVE;
return metric;
}
7 changes: 3 additions & 4 deletions src/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ enum brubeck_aggregate_t {
};

enum {
BRUBECK_EXPIRE_DISABLED = 0,
BRUBECK_EXPIRE_INACTIVE = 1,
BRUBECK_EXPIRE_ACTIVE = 2
BRUBECK_STATE_INACTIVE = 0,
BRUBECK_STATE_ACTIVE = 1
};

struct brubeck_metric {
Expand All @@ -38,7 +37,7 @@ struct brubeck_metric {
pthread_spinlock_t lock;
uint16_t key_len;
uint8_t type;
uint8_t expire;
uint8_t state;

union {
struct {
Expand Down
28 changes: 2 additions & 26 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@ update_proctitle(struct brubeck_server *server)
setproctitle("brubeck", buf);
}

static void
expire_metric(struct brubeck_metric *mt, void *_)
{
/* If this metric is not disabled, turn "inactive"
* into "disabled" and "active" into "inactive"
*/
if (mt->expire > BRUBECK_EXPIRE_DISABLED)
mt->expire = mt->expire - 1;
}

static void
dump_metric(struct brubeck_metric *mt, void *out_file)
{
Expand Down Expand Up @@ -192,7 +182,6 @@ static void load_config(struct brubeck_server *server, const char *path)
json_t *backends, *samplers;

/* optional */
int expire = 0;
char *http = NULL;

server->name = "brubeck";
Expand All @@ -211,8 +200,7 @@ static void load_config(struct brubeck_server *server, const char *path)
"capacity", &capacity,
"backends", &backends,
"samplers", &samplers,
"http", &http,
"expire", &expire);
"http", &http);

gh_log_set_instance(server->name);

Expand All @@ -224,7 +212,6 @@ static void load_config(struct brubeck_server *server, const char *path)
load_samplers(server, samplers);

if (http) brubeck_http_endpoint_init(server, http);
if (expire) server->fd_expire = load_timerfd(expire);
}

void brubeck_server_init(struct brubeck_server *server, const char *config)
Expand All @@ -237,7 +224,6 @@ void brubeck_server_init(struct brubeck_server *server, const char *config)

server->fd_signal = load_signalfd();
server->fd_update = load_timerfd(1);
server->fd_expire = -1;

/* init the memory allocator */
brubeck_slab_init(&server->slab);
Expand Down Expand Up @@ -272,7 +258,7 @@ static int signal_triggered(struct pollfd *fd)

int brubeck_server_run(struct brubeck_server *server)
{
struct pollfd fds[3];
struct pollfd fds[2];
int nfd = 2;
size_t i;

Expand All @@ -284,11 +270,6 @@ int brubeck_server_run(struct brubeck_server *server)
fds[1].fd = server->fd_update;
fds[1].events = POLLIN;

if (server->fd_expire >= 0) {
fds[2].fd = server->fd_expire;
fds[2].events = POLLIN;
nfd++;
}

server->running = 1;
log_splunk("event=listening");
Expand All @@ -315,11 +296,6 @@ int brubeck_server_run(struct brubeck_server *server)
update_flows(server);
update_proctitle(server);
}

if (timer_elapsed(&fds[2])) {
log_splunk("event=expire_metrics");
brubeck_hashtable_foreach(server->metrics, &expire_metric, NULL);
}
}

for (i = 0; i < server->active_backends; ++i)
Expand Down
1 change: 0 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct brubeck_server {
int active_samplers;

int fd_signal;
int fd_expire;
int fd_update;

struct brubeck_slab slab;
Expand Down