Skip to content

Commit

Permalink
add sleeps before getting metrics for e2e tests, move collector to pa…
Browse files Browse the repository at this point in the history
…ckage

Signed-off-by: glightfoot <glightfoot@rsglab.com>
  • Loading branch information
glightfoot committed Jun 22, 2020
1 parent f6262ec commit dfccf08
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 124 deletions.
4 changes: 3 additions & 1 deletion e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestIssue61(t *testing.T) {
}
defer exporter.Process.Kill()

for i := 0; i < 10; i++ {
for i := 0; i < 20; i++ {
if i > 0 {
time.Sleep(1 * time.Second)
}
Expand Down Expand Up @@ -93,6 +93,8 @@ rspamd.spam_count 3 NOW`
t.Fatalf("write error: %v", err)
}

time.Sleep(5 * time.Second)

resp, err := http.Get("http://" + path.Join(webAddr, "metrics"))
if err != nil {
t.Fatalf("get error: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions e2e/issue90_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func TestIssue90(t *testing.T) {
conn.Close()
}

time.Sleep(5 * time.Second)

resp, err := http.Get("http://" + path.Join(webAddr, "metrics"))
if err != nil {
t.Fatalf("get error: %v", err)
Expand Down
123 changes: 8 additions & 115 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ import (
"bufio"
"bytes"
"fmt"
"io"
"net"
"net/http"
_ "net/http/pprof"
"os"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -35,9 +32,7 @@ import (
"github.com/prometheus/statsd_exporter/pkg/mapper"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/prometheus/graphite_exporter/pkg/graphitesample"
"github.com/prometheus/graphite_exporter/pkg/line"
"github.com/prometheus/graphite_exporter/pkg/metricmapper"
"github.com/prometheus/graphite_exporter/pkg/collector"
)

var (
Expand Down Expand Up @@ -77,108 +72,6 @@ var (
)
)

type graphiteCollector struct {
samples map[string]*graphitesample.GraphiteSample
mu *sync.Mutex
mapper metricmapper.MetricMapper
sampleCh chan *graphitesample.GraphiteSample
lineCh chan string
strictMatch bool
logger log.Logger
}

func newGraphiteCollector(logger log.Logger) *graphiteCollector {
c := &graphiteCollector{
sampleCh: make(chan *graphitesample.GraphiteSample),
lineCh: make(chan string),
mu: &sync.Mutex{},
samples: map[string]*graphitesample.GraphiteSample{},
strictMatch: *strictMatch,
logger: logger,
}

go c.processSamples()
go c.processLines()

return c
}

func (c *graphiteCollector) processReader(reader io.Reader) {
lineScanner := bufio.NewScanner(reader)

for {
if ok := lineScanner.Scan(); !ok {
break
}
c.lineCh <- lineScanner.Text()
}
}

func (c *graphiteCollector) processLines() {
for l := range c.lineCh {
line.ProcessLine(l, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger)
}
}

func (c *graphiteCollector) processSamples() {
ticker := time.NewTicker(time.Minute).C

for {
select {
case sample, ok := <-c.sampleCh:
if sample == nil || !ok {
return
}

c.mu.Lock()
c.samples[sample.OriginalName] = sample
c.mu.Unlock()
case <-ticker:
// Garbage collect expired samples.
ageLimit := time.Now().Add(-*sampleExpiry)

c.mu.Lock()
for k, sample := range c.samples {
if ageLimit.After(sample.Timestamp) {
delete(c.samples, k)
}
}
c.mu.Unlock()
}
}
}

// Collect implements prometheus.Collector.
func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) {
ch <- lastProcessed

c.mu.Lock()
samples := make([]*graphitesample.GraphiteSample, 0, len(c.samples))

for _, sample := range c.samples {
samples = append(samples, sample)
}
c.mu.Unlock()

ageLimit := time.Now().Add(-*sampleExpiry)

for _, sample := range samples {
if ageLimit.After(sample.Timestamp) {
continue
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(sample.Name, sample.Help, []string{}, sample.Labels),
sample.Type,
sample.Value,
)
}
}

// Describe implements prometheus.Collector.
func (c graphiteCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- lastProcessed.Desc()
}

func init() {
prometheus.MustRegister(version.NewCollector("graphite_exporter"))
}
Expand Down Expand Up @@ -223,24 +116,24 @@ func main() {

http.Handle(*metricsPath, promhttp.Handler())

c := newGraphiteCollector(logger)
c := collector.NewGraphiteCollector(logger, *strictMatch, *sampleExpiry, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics)
prometheus.MustRegister(c)

c.mapper = &mapper.MetricMapper{}
c.Mapper = &mapper.MetricMapper{}
cacheOption := mapper.WithCacheType(*cacheType)

if *mappingConfig != "" {
err := c.mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
err := c.Mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
if err != nil {
level.Error(logger).Log("msg", "Error loading metric mapping config", "err", err)
os.Exit(1)
}
} else {
c.mapper.InitCache(*cacheSize, cacheOption)
c.Mapper.InitCache(*cacheSize, cacheOption)
}

if *dumpFSMPath != "" {
err := dumpFSM(c.mapper.(*mapper.MetricMapper), *dumpFSMPath, logger)
err := dumpFSM(c.Mapper.(*mapper.MetricMapper), *dumpFSMPath, logger)
if err != nil {
level.Error(logger).Log("msg", "Error dumping FSM", "err", err)
os.Exit(1)
Expand All @@ -263,7 +156,7 @@ func main() {

go func() {
defer conn.Close()
c.processReader(conn)
c.ProcessReader(conn)
}()
}
}()
Expand Down Expand Up @@ -292,7 +185,7 @@ func main() {
continue
}

go c.processReader(bytes.NewReader(buf[0:chars]))
go c.ProcessReader(bytes.NewReader(buf[0:chars]))
}
}()

Expand Down
17 changes: 10 additions & 7 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package main
import (
"strings"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/stretchr/testify/assert"

"github.com/prometheus/graphite_exporter/pkg/collector"
"github.com/prometheus/graphite_exporter/pkg/line"
)

Expand Down Expand Up @@ -145,31 +147,32 @@ func TestProcessLine(t *testing.T) {
},
}

c := newGraphiteCollector(log.NewNopLogger())
// func NewGraphiteCollector(logger log.Logger, strictMatch bool, sampleExpiry time.Duration, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, sampleExpiryMetric prometheus.Gauge, invalidMetrics prometheus.Counter) *graphiteCollector {
c := collector.NewGraphiteCollector(log.NewNopLogger(), false, 5*time.Minute, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics)

for _, testCase := range testCases {
if testCase.mappingPresent {
c.mapper = &mockMapper{
c.Mapper = &mockMapper{
name: testCase.name,
labels: testCase.mappingLabels,
action: testCase.action,
present: testCase.mappingPresent,
}
} else {
c.mapper = &mockMapper{
c.Mapper = &mockMapper{
present: testCase.mappingPresent,
}
}

c.strictMatch = testCase.strict
line.ProcessLine(testCase.line, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger)
c.StrictMatch = testCase.strict
line.ProcessLine(testCase.line, c.Mapper, c.SampleCh, c.StrictMatch, tagErrors, lastProcessed, invalidMetrics, c.Logger)
}

c.sampleCh <- nil
c.SampleCh <- nil

for _, k := range testCases {
originalName := strings.Split(k.line, " ")[0]
sample := c.samples[originalName]
sample := c.Samples[originalName]

if k.willFail {
assert.Nil(t, sample, "Found %s", k.name)
Expand Down
Loading

0 comments on commit dfccf08

Please sign in to comment.