Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

usm: http2: Use mmapable telemetry #30363

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions pkg/network/protocols/http2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@
package http2

import (
"bytes"
"encoding/binary"
"fmt"
"io"
"os"
"time"
"unsafe"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/features"
"github.com/davecgh/go-spew/spew"
"golang.org/x/sys/unix"

manager "github.com/DataDog/ebpf-manager"

Expand All @@ -39,6 +44,8 @@ type Protocol struct {

// http2Telemetry is used to retrieve metrics from the kernel
http2Telemetry *kernelTelemetry
http2TelemetryBuffer *bytes.Reader
http2TelemetryBufferRawBuf []byte
kernelTelemetryStopChannel chan struct{}

dynamicTable *DynamicTable
Expand Down Expand Up @@ -272,6 +279,13 @@ func (p *Protocol) ConfigureOptions(mgr *manager.Manager, opts *manager.Options)
// Configure event stream
events.Configure(p.cfg, eventStream, mgr, opts)
p.dynamicTable.configureOptions(mgr, opts)

if features.HaveMapFlag(features.BPF_F_MMAPABLE) == nil {
opts.MapSpecEditors[TelemetryMap] = manager.MapSpecEditor{
Flags: unix.BPF_F_MMAPABLE,
EditorFlag: manager.EditFlags,
}
}
}

// PreStart is called before the start of the provided eBPF manager.
Expand All @@ -295,6 +309,17 @@ func (p *Protocol) PreStart(mgr *manager.Manager) (err error) {
p.statkeeper = http.NewStatkeeper(p.cfg, p.telemetry, NewIncompleteBuffer(p.cfg))
p.eventsConsumer.Start()

if telemetryMap, _, mapErr := mgr.GetMap(TelemetryMap); mapErr == nil {
if telemetryMap.Flags()&unix.BPF_F_MMAPABLE != 0 {
// Telemetry struct is 240 bytes, but the mmaped buffer must be page-aligned, so using a single page
p.http2TelemetryBufferRawBuf, err = unix.Mmap(telemetryMap.FD(), 0, os.Getpagesize(), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
if err != nil {
return fmt.Errorf("failed to mmap %q map: %w", TelemetryMap, err)
}
p.http2TelemetryBuffer = bytes.NewReader(p.http2TelemetryBufferRawBuf)
}
}

return
}

Expand All @@ -310,6 +335,30 @@ func (p *Protocol) PostStart(mgr *manager.Manager) error {
}

func (p *Protocol) updateKernelTelemetry(mgr *manager.Manager) {
ticker := time.NewTicker(30 * time.Second)
http2Telemetry := &HTTP2Telemetry{}

if p.http2TelemetryBuffer != nil {
go func() {
defer ticker.Stop()

for {
select {
case <-ticker.C:
p.http2TelemetryBuffer.Seek(0, io.SeekStart)
binary.Read(p.http2TelemetryBuffer, binary.LittleEndian, http2Telemetry)
p.http2Telemetry.update(http2Telemetry, false)

p.http2Telemetry.Log()
case <-p.kernelTelemetryStopChannel:
return
}
}
}()
return
}

// Backward compatibility for kernels that do not support mmapable maps
mp, err := protocols.GetMap(mgr, TelemetryMap)
if err != nil {
log.Warn(err)
Expand All @@ -323,8 +372,6 @@ func (p *Protocol) updateKernelTelemetry(mgr *manager.Manager) {
}

var zero uint32
http2Telemetry := &HTTP2Telemetry{}
ticker := time.NewTicker(30 * time.Second)

go func() {
defer ticker.Stop()
Expand Down Expand Up @@ -369,6 +416,12 @@ func (p *Protocol) Stop(_ *manager.Manager) {
p.statkeeper.Close()
}

if p.http2TelemetryBufferRawBuf != nil {
if err := unix.Munmap(p.http2TelemetryBufferRawBuf); err != nil {
log.Errorf("failed to munmap http2 telemetry buffer: %s", err)
}
p.http2TelemetryBuffer = nil
}
close(p.kernelTelemetryStopChannel)
}

Expand Down
Loading