Skip to content

Commit

Permalink
Reset gateway connection status when health checker reports connected
Browse files Browse the repository at this point in the history
When the health checker reports a connection error, the gateway syncer
sets the gateway connection status to error. When the health checker
subsequently reports connected, the gateway syncer needs to set the
status back to connected.

The rest of the changes were related to adding unit tests for the health
checker's interaction with the gateway syncer. This entailed creating
a PingerInterface and a fake implementation that can be injected into the
health checker.

Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
  • Loading branch information
tpantelis committed Dec 3, 2020
1 parent a12d325 commit 5cb10c9
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 92 deletions.
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,13 @@ func main() {

var cableHealthchecker healthchecker.Interface
if len(submSpec.GlobalCidr) == 0 && submSpec.HealthCheckEnabled {
cableHealthchecker, err = healthchecker.New(&watcher.Config{RestConfig: cfg}, submSpec.Namespace,
submSpec.ClusterID, submSpec.HealthCheckInterval, submSpec.HealthCheckMaxPacketLossCount)
cableHealthchecker, err = healthchecker.New(&healthchecker.Config{
WatcherConfig: &watcher.Config{RestConfig: cfg},
EndpointNamespace: submSpec.Namespace,
ClusterID: submSpec.ClusterID,
PingInterval: submSpec.HealthCheckInterval,
MaxPacketLossCount: submSpec.HealthCheckMaxPacketLossCount,
})
if err != nil {
klog.Errorf("Error creating healthChecker: %v", err)
}
Expand Down
62 changes: 62 additions & 0 deletions pkg/cableengine/healthchecker/fake/pinger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package fake

import (
"sync/atomic"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
)

type Pinger struct {
ip string
latencyInfo atomic.Value
start chan struct{}
stop chan struct{}
}

func NewPinger(ip string) *Pinger {
return &Pinger{
ip: ip,
start: make(chan struct{}),
stop: make(chan struct{}),
}
}

func (p *Pinger) Start() {
defer GinkgoRecover()
Expect(p.start).ToNot(BeClosed())
close(p.start)
}

func (p *Pinger) Stop() {
defer GinkgoRecover()
Expect(p.stop).ToNot(BeClosed())
close(p.stop)
}

func (p *Pinger) GetLatencyInfo() *healthchecker.LatencyInfo {
o := p.latencyInfo.Load()
if o != nil {
info := o.(healthchecker.LatencyInfo)
return &info
}

return nil
}

func (p *Pinger) SetLatencyInfo(info *healthchecker.LatencyInfo) {
p.latencyInfo.Store(*info)
}

func (p *Pinger) GetIP() string {
return p.ip
}

func (p *Pinger) AwaitStart() {
Eventually(p.start).Should(BeClosed(), "Start was not called")
}

func (p *Pinger) AwaitStop() {
Eventually(p.stop).Should(BeClosed(), "Stop was not called")
}
95 changes: 45 additions & 50 deletions pkg/cableengine/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package healthchecker

import (
"strconv"
"sync"
"time"

Expand All @@ -23,21 +22,27 @@ type Interface interface {
GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo
}

type Config struct {
WatcherConfig *watcher.Config
EndpointNamespace string
ClusterID string
PingInterval uint
MaxPacketLossCount uint
NewPinger func(string, time.Duration, uint) PingerInterface
}

type controller struct {
endpointWatcher watcher.Interface
pingers sync.Map
clusterID string
pingInterval uint
maxPacketLossCount uint
endpointWatcher watcher.Interface
pingers sync.Map
config *Config
}

func New(config *watcher.Config, endpointNameSpace, clusterID string, pingInterval, maxPacketLossCount uint) (Interface, error) {
func New(config *Config) (Interface, error) {
controller := &controller{
clusterID: clusterID,
pingInterval: pingInterval,
maxPacketLossCount: maxPacketLossCount,
config: config,
}
config.ResourceConfigs = []watcher.ResourceConfig{

config.WatcherConfig.ResourceConfigs = []watcher.ResourceConfig{
{
Name: "HealthChecker Endpoint Controller",
ResourceType: &submarinerv1.Endpoint{},
Expand All @@ -46,41 +51,23 @@ func New(config *watcher.Config, endpointNameSpace, clusterID string, pingInterv
OnUpdateFunc: controller.endpointCreatedorUpdated,
OnDeleteFunc: controller.endpointDeleted,
},
SourceNamespace: endpointNameSpace,
SourceNamespace: config.EndpointNamespace,
},
}

endpointWatcher, err := watcher.New(config)
var err error

controller.endpointWatcher, err = watcher.New(config.WatcherConfig)
if err != nil {
return nil, err
}

controller.endpointWatcher = endpointWatcher

return controller, nil
}

func (h *controller) GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo {
if obj, found := h.pingers.Load(endpoint.CableName); found {
pinger := obj.(*pingerInfo)

lastTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.lastRtt, 10) + "ns")
minTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.minRtt, 10) + "ns")
averageTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.mean, 10) + "ns")
maxTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.maxRtt, 10) + "ns")
stdDevTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.stdDev, 10) + "ns")

return &LatencyInfo{
ConnectionError: pinger.failureMsg,
Spec: &submarinerv1.LatencyRTTSpec{
Last: lastTime.String(),
Min: minTime.String(),
Average: averageTime.String(),
Max: maxTime.String(),
StdDev: stdDevTime.String(),
},
}
return obj.(PingerInterface).GetLatencyInfo()
}

return nil
Expand All @@ -91,13 +78,16 @@ func (h *controller) Start(stopCh <-chan struct{}) error {
return err
}

klog.Infof("CableEngine HealthChecker started with PingInterval: %v, MaxPacketLossCount: %v", h.config.PingInterval,
h.config.MaxPacketLossCount)

return nil
}

func (h *controller) endpointCreatedorUpdated(obj runtime.Object) bool {
klog.V(log.TRACE).Infof("Endpoint created: %#v", obj)
endpointCreated := obj.(*submarinerv1.Endpoint)
if endpointCreated.Spec.ClusterID == h.clusterID {
if endpointCreated.Spec.ClusterID == h.config.ClusterID {
return false
}

Expand All @@ -108,33 +98,38 @@ func (h *controller) endpointCreatedorUpdated(obj runtime.Object) bool {
}

if obj, found := h.pingers.Load(endpointCreated.Spec.CableName); found {
pinger := obj.(*pingerInfo)
if pinger.healthCheckIP == endpointCreated.Spec.HealthCheckIP {
pinger := obj.(PingerInterface)
if pinger.GetIP() == endpointCreated.Spec.HealthCheckIP {
return false
}

klog.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", endpointCreated.Name)
pinger.stop()
pinger.Stop()
h.pingers.Delete(endpointCreated.Spec.CableName)
}

klog.V(log.TRACE).Infof("Starting Pinger for CableName: %q, with HealthCheckIP: %q",
endpointCreated.Spec.CableName, endpointCreated.Spec.HealthCheckIP)

pingInterval := DefaultPingInterval
if h.pingInterval != 0 {
pingInterval = time.Second * time.Duration(h.pingInterval)
pingInterval := defaultPingInterval
if h.config.PingInterval != 0 {
pingInterval = time.Second * time.Duration(h.config.PingInterval)
}

maxPacketLossCount := DefaultMaxPacketLossCount
maxPacketLossCount := defaultMaxPacketLossCount

if h.config.MaxPacketLossCount != 0 {
maxPacketLossCount = h.config.MaxPacketLossCount
}

if h.maxPacketLossCount != 0 {
maxPacketLossCount = h.maxPacketLossCount
newPingerFunc := h.config.NewPinger
if newPingerFunc == nil {
newPingerFunc = newPinger
}

pinger := newPinger(endpointCreated.Spec.HealthCheckIP, pingInterval, maxPacketLossCount)
pinger := newPingerFunc(endpointCreated.Spec.HealthCheckIP, pingInterval, maxPacketLossCount)
h.pingers.Store(endpointCreated.Spec.CableName, pinger)
pinger.start()
pinger.Start()

klog.Infof("CableEngine HealthChecker started pinger for CableName: %q with HealthCheckIP %q",
endpointCreated.Spec.CableName, endpointCreated.Spec.HealthCheckIP)

return false
}
Expand All @@ -146,8 +141,8 @@ func (h *controller) endpointDeleted(obj runtime.Object) bool {
}

if obj, found := h.pingers.Load(endpointDeleted.Spec.CableName); found {
pinger := obj.(*pingerInfo)
pinger.stop()
pinger := obj.(PingerInterface)
pinger.Stop()
h.pingers.Delete(endpointDeleted.Spec.CableName)
}

Expand Down
55 changes: 43 additions & 12 deletions pkg/cableengine/healthchecker/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,44 @@ package healthchecker

import (
"fmt"
"strconv"
"time"

"github.com/go-ping/ping"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"k8s.io/klog"
)

var waitTime = 15 * time.Second

var DefaultMaxPacketLossCount uint = 5
var defaultMaxPacketLossCount uint = 5

// The RTT will be stored and will be used to calculate the statistics until
// the size is reached. Once the size is reached the array will be reset and
// the last elements will be added to the array for statistics.
var size uint64 = 1000

var DefaultPingInterval = 1 * time.Second
var defaultPingInterval = 1 * time.Second

type PingerInterface interface {
Start()
Stop()
GetLatencyInfo() *LatencyInfo
GetIP() string
}

type pingerInfo struct {
healthCheckIP string
ip string
pingInterval time.Duration
maxPacketLossCount uint
statistics statistics
failureMsg string
stopCh chan struct{}
}

func newPinger(healthCheckIP string, pingInterval time.Duration, maxPacketLossCount uint) *pingerInfo {
func newPinger(ip string, pingInterval time.Duration, maxPacketLossCount uint) PingerInterface {
return &pingerInfo{
healthCheckIP: healthCheckIP,
ip: ip,
pingInterval: pingInterval,
maxPacketLossCount: maxPacketLossCount,
statistics: statistics{
Expand All @@ -41,7 +50,7 @@ func newPinger(healthCheckIP string, pingInterval time.Duration, maxPacketLossCo
}
}

func (p *pingerInfo) start() {
func (p *pingerInfo) Start() {
go func() {
for {
select {
Expand All @@ -52,17 +61,16 @@ func (p *pingerInfo) start() {
}
}
}()
klog.Infof("CableEngine HealthChecker started pinger for IP %q", p.healthCheckIP)
}

func (p *pingerInfo) stop() {
func (p *pingerInfo) Stop() {
close(p.stopCh)
}

func (p *pingerInfo) sendPing() {
pinger, err := ping.NewPinger(p.healthCheckIP)
pinger, err := ping.NewPinger(p.ip)
if err != nil {
klog.Errorf("Error creating pinger for IP %q: %v", p.healthCheckIP, err)
klog.Errorf("Error creating pinger for IP %q: %v", p.ip, err)
return
}

Expand All @@ -73,7 +81,7 @@ func (p *pingerInfo) sendPing() {
pinger.OnSend = func(packet *ping.Packet) {
// Pinger will mark a connection as an error if the packet loss reaches the threshold
if pinger.PacketsSent-pinger.PacketsRecv > int(p.maxPacketLossCount) {
p.failureMsg = fmt.Sprintf("Failed to successfully ping the remote endpoint IP %q", p.healthCheckIP)
p.failureMsg = fmt.Sprintf("Failed to successfully ping the remote endpoint IP %q", p.ip)
pinger.PacketsSent = 0
pinger.PacketsRecv = 0
}
Expand All @@ -86,6 +94,29 @@ func (p *pingerInfo) sendPing() {

err = pinger.Run()
if err != nil {
klog.Errorf("Error running ping for the remote endpoint IP %q: %v", p.healthCheckIP, err)
klog.Errorf("Error running ping for the remote endpoint IP %q: %v", p.ip, err)
}
}

func (p *pingerInfo) GetIP() string {
return p.ip
}

func (p *pingerInfo) GetLatencyInfo() *LatencyInfo {
lastTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.lastRtt, 10) + "ns")
minTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.minRtt, 10) + "ns")
averageTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.mean, 10) + "ns")
maxTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.maxRtt, 10) + "ns")
stdDevTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.stdDev, 10) + "ns")

return &LatencyInfo{
ConnectionError: p.failureMsg,
Spec: &submarinerv1.LatencyRTTSpec{
Last: lastTime.String(),
Min: minTime.String(),
Average: averageTime.String(),
Max: maxTime.String(),
StdDev: stdDevTime.String(),
},
}
}
3 changes: 3 additions & 0 deletions pkg/cableengine/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ func (i *GatewaySyncer) generateGatewayObject() *v1.Gateway {
connection.Status = v1.ConnectionError
connection.StatusMessage = latencyInfo.ConnectionError
}
} else if connection.Status == v1.ConnectionError && latencyInfo.ConnectionError == "" {
connection.Status = v1.Connected
connection.StatusMessage = ""
}
}
}
Expand Down
Loading

0 comments on commit 5cb10c9

Please sign in to comment.