Commit

Merge branch 'master' into release-1.5.2
VladoLavor authored Jul 23, 2018
2 parents 3b0501a + 366dea1 commit 19109ed
Showing 1 changed file with 170 additions and 145 deletions.
plugins/telemetry/telemetry.go: 315 changes (170 additions, 145 deletions)
@@ -21,7 +21,6 @@ import (
govppapi "git.fd.io/govpp.git/api"
"github.com/ligato/cn-infra/flavors/local"
prom "github.com/ligato/cn-infra/rpc/prometheus"
"github.com/ligato/cn-infra/utils/safeclose"
"github.com/ligato/vpp-agent/plugins/govppmux"
"github.com/ligato/vpp-agent/plugins/govppmux/vppcalls"
"github.com/prometheus/client_golang/prometheus"
@@ -85,8 +84,6 @@ const (
type Plugin struct {
Deps

vppCh govppapi.Channel

runtimeGaugeVecs map[string]*prometheus.GaugeVec
runtimeStats map[string]*runtimeStats

@@ -102,6 +99,8 @@ type Plugin struct {
// From config file
updatePeriod time.Duration
disabled bool

quit chan struct{}
}

// Deps represents dependencies of Telemetry Plugin
@@ -151,14 +150,19 @@ func (p *Plugin) Init() error {
p.disabled = true
return nil
}
if config.PollingInterval > 0 {
// This prevents setting the update period to less than 5 seconds,
// which could cause a significant performance hit.
if config.PollingInterval > time.Second*5 {
p.updatePeriod = config.PollingInterval
p.Log.Infof("Telemetry polling period changed to %v", p.updatePeriod)
} else {
// Set default value
p.updatePeriod = defaultUpdatePeriod
} else if config.PollingInterval > 0 {
p.Log.Warnf("Telemetry polling period has to be at least 5s, using default: %v", defaultUpdatePeriod)
}
}
// This serves as a fallback if the config was not found or the value is not set in the config.
if p.updatePeriod == 0 {
p.updatePeriod = defaultUpdatePeriod
}

// Register '/vpp' registry path
err = p.Prometheus.NewRegistry(registryPath, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError})
@@ -292,13 +296,6 @@ func (p *Plugin) Init() error {
}
}

// Create GoVPP channel
p.vppCh, err = p.GoVppmux.NewAPIChannel()
if err != nil {
p.Log.Errorf("Error creating channel: %v", err)
return err
}

return nil
}

@@ -309,156 +306,184 @@ func (p *Plugin) AfterInit() error {
return nil
}

// Periodically update data
go func() {
for {
// Update runtime
runtimeInfo, err := vppcalls.GetRuntimeInfo(p.vppCh)
if err != nil {
p.Log.Errorf("Command failed: %v", err)
} else {
for _, thread := range runtimeInfo.Threads {
for _, item := range thread.Items {
stats, ok := p.runtimeStats[item.Name]
if !ok {
stats = &runtimeStats{
threadID: thread.ID,
threadName: thread.Name,
itemName: item.Name,
metrics: map[string]prometheus.Gauge{},
}

// add gauges with corresponding labels into vectors
for k, vec := range p.runtimeGaugeVecs {
stats.metrics[k], err = vec.GetMetricWith(prometheus.Labels{
runtimeItemLabel: item.Name,
runtimeThreadLabel: thread.Name,
runtimeThreadIDLabel: strconv.Itoa(int(thread.ID)),
})
if err != nil {
p.Log.Error(err)
}
}
}
p.quit = make(chan struct{})

go p.periodicUpdates()

return nil
}

// Close is used to clean up resources used by Telemetry Plugin
func (p *Plugin) Close() error {
if p.quit != nil {
close(p.quit)
p.quit = nil
}
return nil
}

// periodicUpdates periodically updates the metrics data
func (p *Plugin) periodicUpdates() {
// Create GoVPP channel
vppCh, err := p.GoVppmux.NewAPIChannel()
if err != nil {
p.Log.Errorf("Error creating channel: %v", err)
return
}

Loop:
for {
select {
// Delay period between updates
case <-time.After(p.updatePeriod):
p.updateData(vppCh)
// Plugin has stopped.
case <-p.quit:
break Loop
}
}

// Close GoVPP channel
vppCh.Close()
}

func (p *Plugin) updateData(vppCh govppapi.Channel) {
// Update runtime
runtimeInfo, err := vppcalls.GetRuntimeInfo(vppCh)
if err != nil {
p.Log.Errorf("Command failed: %v", err)
} else {
for _, thread := range runtimeInfo.Threads {
for _, item := range thread.Items {
stats, ok := p.runtimeStats[item.Name]
if !ok {
stats = &runtimeStats{
threadID: thread.ID,
threadName: thread.Name,
itemName: item.Name,
metrics: map[string]prometheus.Gauge{},
}

stats.metrics[runtimeCallsMetric].Set(float64(item.Calls))
stats.metrics[runtimeVectorsMetric].Set(float64(item.Vectors))
stats.metrics[runtimeSuspendsMetric].Set(float64(item.Suspends))
stats.metrics[runtimeClocksMetric].Set(item.Clocks)
stats.metrics[runtimeVectorsPerCallMetric].Set(item.VectorsPerCall)
// add gauges with corresponding labels into vectors
for k, vec := range p.runtimeGaugeVecs {
stats.metrics[k], err = vec.GetMetricWith(prometheus.Labels{
runtimeItemLabel: item.Name,
runtimeThreadLabel: thread.Name,
runtimeThreadIDLabel: strconv.Itoa(int(thread.ID)),
})
if err != nil {
p.Log.Error(err)
}
}
}

stats.metrics[runtimeCallsMetric].Set(float64(item.Calls))
stats.metrics[runtimeVectorsMetric].Set(float64(item.Vectors))
stats.metrics[runtimeSuspendsMetric].Set(float64(item.Suspends))
stats.metrics[runtimeClocksMetric].Set(item.Clocks)
stats.metrics[runtimeVectorsPerCallMetric].Set(item.VectorsPerCall)
}
}
}

// Update memory
memoryInfo, err := vppcalls.GetMemory(p.vppCh)
if err != nil {
p.Log.Errorf("Command failed: %v", err)
} else {
for _, thread := range memoryInfo.Threads {
stats, ok := p.memoryStats[thread.Name]
if !ok {
stats = &memoryStats{
threadName: thread.Name,
threadID: thread.ID,
metrics: map[string]prometheus.Gauge{},
}
// Update memory
memoryInfo, err := vppcalls.GetMemory(vppCh)
if err != nil {
p.Log.Errorf("Command failed: %v", err)
} else {
for _, thread := range memoryInfo.Threads {
stats, ok := p.memoryStats[thread.Name]
if !ok {
stats = &memoryStats{
threadName: thread.Name,
threadID: thread.ID,
metrics: map[string]prometheus.Gauge{},
}

// add gauges with corresponding labels into vectors
for k, vec := range p.memoryGaugeVecs {
stats.metrics[k], err = vec.GetMetricWith(prometheus.Labels{
memoryThreadLabel: thread.Name,
memoryThreadIDLabel: strconv.Itoa(int(thread.ID)),
})
if err != nil {
p.Log.Error(err)
}
}
// add gauges with corresponding labels into vectors
for k, vec := range p.memoryGaugeVecs {
stats.metrics[k], err = vec.GetMetricWith(prometheus.Labels{
memoryThreadLabel: thread.Name,
memoryThreadIDLabel: strconv.Itoa(int(thread.ID)),
})
if err != nil {
p.Log.Error(err)
}

stats.metrics[memoryObjectsMetric].Set(float64(thread.Objects))
stats.metrics[memoryUsedMetric].Set(float64(thread.Used))
stats.metrics[memoryTotalMetric].Set(float64(thread.Total))
stats.metrics[memoryFreeMetric].Set(float64(thread.Free))
stats.metrics[memoryReclaimedMetric].Set(float64(thread.Reclaimed))
stats.metrics[memoryOverheadMetric].Set(float64(thread.Overhead))
stats.metrics[memoryCapacityMetric].Set(float64(thread.Capacity))
}
}

// Update buffers
buffersInfo, err := vppcalls.GetBuffersInfo(p.vppCh)
if err != nil {
p.Log.Errorf("Command failed: %v", err)
} else {
for _, item := range buffersInfo.Items {
stats, ok := p.buffersStats[item.Name]
if !ok {
stats = &buffersStats{
threadID: item.ThreadID,
itemName: item.Name,
itemIndex: item.Index,
metrics: map[string]prometheus.Gauge{},
}
stats.metrics[memoryObjectsMetric].Set(float64(thread.Objects))
stats.metrics[memoryUsedMetric].Set(float64(thread.Used))
stats.metrics[memoryTotalMetric].Set(float64(thread.Total))
stats.metrics[memoryFreeMetric].Set(float64(thread.Free))
stats.metrics[memoryReclaimedMetric].Set(float64(thread.Reclaimed))
stats.metrics[memoryOverheadMetric].Set(float64(thread.Overhead))
stats.metrics[memoryCapacityMetric].Set(float64(thread.Capacity))
}
}

// add gauges with corresponding labels into vectors
for k, vec := range p.buffersGaugeVecs {
stats.metrics[k], err = vec.GetMetricWith(prometheus.Labels{
buffersThreadIDLabel: strconv.Itoa(int(item.ThreadID)),
buffersItemLabel: item.Name,
buffersIndexLabel: strconv.Itoa(int(item.Index)),
})
if err != nil {
p.Log.Error(err)
}
}
}
// Update buffers
buffersInfo, err := vppcalls.GetBuffersInfo(vppCh)
if err != nil {
p.Log.Errorf("Command failed: %v", err)
} else {
for _, item := range buffersInfo.Items {
stats, ok := p.buffersStats[item.Name]
if !ok {
stats = &buffersStats{
threadID: item.ThreadID,
itemName: item.Name,
itemIndex: item.Index,
metrics: map[string]prometheus.Gauge{},
}

stats.metrics[buffersSizeMetric].Set(float64(item.Size))
stats.metrics[buffersAllocMetric].Set(float64(item.Alloc))
stats.metrics[buffersFreeMetric].Set(float64(item.Free))
stats.metrics[buffersNumAllocMetric].Set(float64(item.NumAlloc))
stats.metrics[buffersNumFreeMetric].Set(float64(item.NumFree))
// add gauges with corresponding labels into vectors
for k, vec := range p.buffersGaugeVecs {
stats.metrics[k], err = vec.GetMetricWith(prometheus.Labels{
buffersThreadIDLabel: strconv.Itoa(int(item.ThreadID)),
buffersItemLabel: item.Name,
buffersIndexLabel: strconv.Itoa(int(item.Index)),
})
if err != nil {
p.Log.Error(err)
}
}
}

// Update node counters
nodeCountersInfo, err := vppcalls.GetNodeCounters(p.vppCh)
if err != nil {
p.Log.Errorf("Command failed: %v", err)
} else {
for _, item := range nodeCountersInfo.Counters {
stats, ok := p.nodeCounterStats[item.Node]
if !ok {
stats = &nodeCounterStats{
itemName: item.Node,
metrics: map[string]prometheus.Gauge{},
}
stats.metrics[buffersSizeMetric].Set(float64(item.Size))
stats.metrics[buffersAllocMetric].Set(float64(item.Alloc))
stats.metrics[buffersFreeMetric].Set(float64(item.Free))
stats.metrics[buffersNumAllocMetric].Set(float64(item.NumAlloc))
stats.metrics[buffersNumFreeMetric].Set(float64(item.NumFree))
}
}

// add gauges with corresponding labels into vectors
for k, vec := range p.nodeCounterGaugeVecs {
stats.metrics[k], err = vec.GetMetricWith(prometheus.Labels{
nodeCounterItemLabel: item.Node,
nodeCounterReasonLabel: item.Reason,
})
if err != nil {
p.Log.Error(err)
}
}
}
// Update node counters
nodeCountersInfo, err := vppcalls.GetNodeCounters(vppCh)
if err != nil {
p.Log.Errorf("Command failed: %v", err)
} else {
for _, item := range nodeCountersInfo.Counters {
stats, ok := p.nodeCounterStats[item.Node]
if !ok {
stats = &nodeCounterStats{
itemName: item.Node,
metrics: map[string]prometheus.Gauge{},
}

stats.metrics[nodeCounterCountMetric].Set(float64(item.Count))
// add gauges with corresponding labels into vectors
for k, vec := range p.nodeCounterGaugeVecs {
stats.metrics[k], err = vec.GetMetricWith(prometheus.Labels{
nodeCounterItemLabel: item.Node,
nodeCounterReasonLabel: item.Reason,
})
if err != nil {
p.Log.Error(err)
}
}
}

// Delay period between updates
time.Sleep(p.updatePeriod)
stats.metrics[nodeCounterCountMetric].Set(float64(item.Count))
}
}()
return nil
}

// Close is used to clean up resources used by Telemetry Plugin
func (p *Plugin) Close() error {
return safeclose.Close(p.vppCh)
}
}
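
The core of this change is the new shutdown path: AfterInit now creates a quit channel and starts periodicUpdates in its own goroutine, and Close signals that goroutine to stop instead of closing a shared GoVPP channel with safeclose. Below is a minimal, self-contained sketch of the pattern; Poller, Start, Stop and collect are hypothetical stand-ins for the plugin, AfterInit, Close and updateData, and the quit channel is passed as an argument (a small deviation from the commit) to keep the sketch race-free.

package main

import (
	"fmt"
	"time"
)

// Poller is a hypothetical stand-in for the telemetry Plugin: it owns the
// polling period and the quit channel added in this commit.
type Poller struct {
	updatePeriod time.Duration
	quit         chan struct{}
}

// Start mirrors AfterInit: create the quit channel and launch the worker.
func (p *Poller) Start() {
	p.quit = make(chan struct{})
	go p.periodicUpdates(p.quit)
}

// periodicUpdates mirrors the loop added in telemetry.go: wait for the update
// period or for the quit channel to be closed, whichever comes first.
func (p *Poller) periodicUpdates(quit chan struct{}) {
Loop:
	for {
		select {
		case <-time.After(p.updatePeriod):
			p.collect() // stand-in for updateData(vppCh)
		case <-quit:
			break Loop
		}
	}
	// Cleanup that used to live in Close (closing the GoVPP channel) now
	// happens here, after the loop exits.
	fmt.Println("worker stopped, resources released")
}

// collect is a hypothetical stand-in for updateData.
func (p *Poller) collect() {
	fmt.Println("collecting metrics")
}

// Stop mirrors Close: closing the quit channel tells the goroutine to exit.
func (p *Poller) Stop() {
	if p.quit != nil {
		close(p.quit)
		p.quit = nil
	}
}

func main() {
	p := &Poller{updatePeriod: 100 * time.Millisecond}
	p.Start()
	time.Sleep(350 * time.Millisecond) // let a few updates run
	p.Stop()
	time.Sleep(50 * time.Millisecond) // give the worker time to exit
}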

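Init also tightens the handling of the configured polling interval: values above five seconds are accepted, smaller positive values are rejected with a warning, and the default applies whenever nothing valid was configured. A compact sketch of that decision, assuming a stand-in default of 30 seconds (the real defaultUpdatePeriod constant is defined elsewhere in telemetry.go):

package main

import (
	"fmt"
	"time"
)

// assumedDefaultUpdatePeriod stands in for the plugin's defaultUpdatePeriod
// constant; its real value is defined elsewhere in telemetry.go.
const assumedDefaultUpdatePeriod = 30 * time.Second

// resolveUpdatePeriod mirrors the Init logic added in this commit.
func resolveUpdatePeriod(pollingInterval time.Duration) time.Duration {
	var updatePeriod time.Duration
	// Intervals of 5s or less are refused to avoid a significant performance hit.
	if pollingInterval > 5*time.Second {
		updatePeriod = pollingInterval
	} else if pollingInterval > 0 {
		fmt.Printf("polling period has to be at least 5s, using default: %v\n",
			assumedDefaultUpdatePeriod)
	}
	// Fallback if the config was not found or the value was not set.
	if updatePeriod == 0 {
		updatePeriod = assumedDefaultUpdatePeriod
	}
	return updatePeriod
}

func main() {
	fmt.Println(resolveUpdatePeriod(10 * time.Second)) // accepted: 10s
	fmt.Println(resolveUpdatePeriod(2 * time.Second))  // warned, falls back to default
	fmt.Println(resolveUpdatePeriod(0))                // unset, falls back to default
}
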
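The per-item update logic moved from the AfterInit goroutine into updateData is unchanged in spirit: for each reported item it lazily creates labeled gauges from the GaugeVecs, caches them, and then sets the current values. A hedged sketch of that caching pattern using the same prometheus client library (the metric and label names here are illustrative, not the plugin's):

package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// itemStats caches the labeled gauges for one item, similar to the
// runtimeStats, memoryStats and buffersStats bookkeeping in telemetry.go.
type itemStats struct {
	metrics map[string]prometheus.Gauge
}

func main() {
	// One vector per metric kind; the plugin keeps these in maps such as
	// p.runtimeGaugeVecs.
	gaugeVecs := map[string]*prometheus.GaugeVec{
		"calls": prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name: "runtime_calls",
			Help: "Number of calls",
		}, []string{"item", "thread", "thread_id"}),
	}

	cache := map[string]*itemStats{}

	// On every update, look up (or lazily create) the per-item gauges and
	// set their current values, as updateData does for each thread item.
	itemName, threadName, threadID := "ip4-input", "vpp_main", 0
	stats, ok := cache[itemName]
	if !ok {
		stats = &itemStats{metrics: map[string]prometheus.Gauge{}}
		for k, vec := range gaugeVecs {
			gauge, err := vec.GetMetricWith(prometheus.Labels{
				"item":      itemName,
				"thread":    threadName,
				"thread_id": strconv.Itoa(threadID),
			})
			if err != nil {
				fmt.Println(err)
				continue
			}
			stats.metrics[k] = gauge
		}
		cache[itemName] = stats
	}
	stats.metrics["calls"].Set(12345)

	fmt.Println("gauge cached and updated for", itemName)
}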