From 5cb10c9dce12d22e175e62c19c850aff60c8d670 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 2 Dec 2020 20:41:34 -0500 Subject: [PATCH] Reset gateway connection status when health checker reports connected When the health checker reports a connection error, the gateway syncer sets the gateway connection status to error. When the health checker subsequently reports connected, the gateway syncer needs to set the status back to connected. The rest of the changes were related to adding unit tests for the health checker's interaction with the gateway syncer. This entailed creating a PingerInterface and a fake implementation that can be injected into the health checker. Signed-off-by: Tom Pantelis --- main.go | 9 +- pkg/cableengine/healthchecker/fake/pinger.go | 62 ++++++ .../healthchecker/healthchecker.go | 95 ++++----- pkg/cableengine/healthchecker/pinger.go | 55 +++-- pkg/cableengine/syncer/syncer.go | 3 + pkg/cableengine/syncer/syncer_test.go | 200 +++++++++++++++--- 6 files changed, 332 insertions(+), 92 deletions(-) create mode 100644 pkg/cableengine/healthchecker/fake/pinger.go diff --git a/main.go b/main.go index 2aae4c0d4..606157f57 100644 --- a/main.go +++ b/main.go @@ -125,8 +125,13 @@ func main() { var cableHealthchecker healthchecker.Interface if len(submSpec.GlobalCidr) == 0 && submSpec.HealthCheckEnabled { - cableHealthchecker, err = healthchecker.New(&watcher.Config{RestConfig: cfg}, submSpec.Namespace, - submSpec.ClusterID, submSpec.HealthCheckInterval, submSpec.HealthCheckMaxPacketLossCount) + cableHealthchecker, err = healthchecker.New(&healthchecker.Config{ + WatcherConfig: &watcher.Config{RestConfig: cfg}, + EndpointNamespace: submSpec.Namespace, + ClusterID: submSpec.ClusterID, + PingInterval: submSpec.HealthCheckInterval, + MaxPacketLossCount: submSpec.HealthCheckMaxPacketLossCount, + }) if err != nil { klog.Errorf("Error creating healthChecker: %v", err) } diff --git a/pkg/cableengine/healthchecker/fake/pinger.go b/pkg/cableengine/healthchecker/fake/pinger.go new file mode 100644 index 000000000..68a814172 --- /dev/null +++ b/pkg/cableengine/healthchecker/fake/pinger.go @@ -0,0 +1,62 @@ +package fake + +import ( + "sync/atomic" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/submariner-io/submariner/pkg/cableengine/healthchecker" +) + +type Pinger struct { + ip string + latencyInfo atomic.Value + start chan struct{} + stop chan struct{} +} + +func NewPinger(ip string) *Pinger { + return &Pinger{ + ip: ip, + start: make(chan struct{}), + stop: make(chan struct{}), + } +} + +func (p *Pinger) Start() { + defer GinkgoRecover() + Expect(p.start).ToNot(BeClosed()) + close(p.start) +} + +func (p *Pinger) Stop() { + defer GinkgoRecover() + Expect(p.stop).ToNot(BeClosed()) + close(p.stop) +} + +func (p *Pinger) GetLatencyInfo() *healthchecker.LatencyInfo { + o := p.latencyInfo.Load() + if o != nil { + info := o.(healthchecker.LatencyInfo) + return &info + } + + return nil +} + +func (p *Pinger) SetLatencyInfo(info *healthchecker.LatencyInfo) { + p.latencyInfo.Store(*info) +} + +func (p *Pinger) GetIP() string { + return p.ip +} + +func (p *Pinger) AwaitStart() { + Eventually(p.start).Should(BeClosed(), "Start was not called") +} + +func (p *Pinger) AwaitStop() { + Eventually(p.stop).Should(BeClosed(), "Stop was not called") +} diff --git a/pkg/cableengine/healthchecker/healthchecker.go b/pkg/cableengine/healthchecker/healthchecker.go index 404619ea4..c848263cc 100644 --- a/pkg/cableengine/healthchecker/healthchecker.go +++ b/pkg/cableengine/healthchecker/healthchecker.go @@ -1,7 +1,6 @@ package healthchecker import ( - "strconv" "sync" "time" @@ -23,21 +22,27 @@ type Interface interface { GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo } +type Config struct { + WatcherConfig *watcher.Config + EndpointNamespace string + ClusterID string + PingInterval uint + MaxPacketLossCount uint + NewPinger func(string, time.Duration, uint) PingerInterface +} + type controller struct { - endpointWatcher watcher.Interface - pingers sync.Map - clusterID string - pingInterval uint - maxPacketLossCount uint + endpointWatcher watcher.Interface + pingers sync.Map + config *Config } -func New(config *watcher.Config, endpointNameSpace, clusterID string, pingInterval, maxPacketLossCount uint) (Interface, error) { +func New(config *Config) (Interface, error) { controller := &controller{ - clusterID: clusterID, - pingInterval: pingInterval, - maxPacketLossCount: maxPacketLossCount, + config: config, } - config.ResourceConfigs = []watcher.ResourceConfig{ + + config.WatcherConfig.ResourceConfigs = []watcher.ResourceConfig{ { Name: "HealthChecker Endpoint Controller", ResourceType: &submarinerv1.Endpoint{}, @@ -46,41 +51,23 @@ func New(config *watcher.Config, endpointNameSpace, clusterID string, pingInterv OnUpdateFunc: controller.endpointCreatedorUpdated, OnDeleteFunc: controller.endpointDeleted, }, - SourceNamespace: endpointNameSpace, + SourceNamespace: config.EndpointNamespace, }, } - endpointWatcher, err := watcher.New(config) + var err error + controller.endpointWatcher, err = watcher.New(config.WatcherConfig) if err != nil { return nil, err } - controller.endpointWatcher = endpointWatcher - return controller, nil } func (h *controller) GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo { if obj, found := h.pingers.Load(endpoint.CableName); found { - pinger := obj.(*pingerInfo) - - lastTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.lastRtt, 10) + "ns") - minTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.minRtt, 10) + "ns") - averageTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.mean, 10) + "ns") - maxTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.maxRtt, 10) + "ns") - stdDevTime, _ := time.ParseDuration(strconv.FormatUint(pinger.statistics.stdDev, 10) + "ns") - - return &LatencyInfo{ - ConnectionError: pinger.failureMsg, - Spec: &submarinerv1.LatencyRTTSpec{ - Last: lastTime.String(), - Min: minTime.String(), - Average: averageTime.String(), - Max: maxTime.String(), - StdDev: stdDevTime.String(), - }, - } + return obj.(PingerInterface).GetLatencyInfo() } return nil @@ -91,13 +78,16 @@ func (h *controller) Start(stopCh <-chan struct{}) error { return err } + klog.Infof("CableEngine HealthChecker started with PingInterval: %v, MaxPacketLossCount: %v", h.config.PingInterval, + h.config.MaxPacketLossCount) + return nil } func (h *controller) endpointCreatedorUpdated(obj runtime.Object) bool { klog.V(log.TRACE).Infof("Endpoint created: %#v", obj) endpointCreated := obj.(*submarinerv1.Endpoint) - if endpointCreated.Spec.ClusterID == h.clusterID { + if endpointCreated.Spec.ClusterID == h.config.ClusterID { return false } @@ -108,33 +98,38 @@ func (h *controller) endpointCreatedorUpdated(obj runtime.Object) bool { } if obj, found := h.pingers.Load(endpointCreated.Spec.CableName); found { - pinger := obj.(*pingerInfo) - if pinger.healthCheckIP == endpointCreated.Spec.HealthCheckIP { + pinger := obj.(PingerInterface) + if pinger.GetIP() == endpointCreated.Spec.HealthCheckIP { return false } klog.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", endpointCreated.Name) - pinger.stop() + pinger.Stop() h.pingers.Delete(endpointCreated.Spec.CableName) } - klog.V(log.TRACE).Infof("Starting Pinger for CableName: %q, with HealthCheckIP: %q", - endpointCreated.Spec.CableName, endpointCreated.Spec.HealthCheckIP) - - pingInterval := DefaultPingInterval - if h.pingInterval != 0 { - pingInterval = time.Second * time.Duration(h.pingInterval) + pingInterval := defaultPingInterval + if h.config.PingInterval != 0 { + pingInterval = time.Second * time.Duration(h.config.PingInterval) } - maxPacketLossCount := DefaultMaxPacketLossCount + maxPacketLossCount := defaultMaxPacketLossCount + + if h.config.MaxPacketLossCount != 0 { + maxPacketLossCount = h.config.MaxPacketLossCount + } - if h.maxPacketLossCount != 0 { - maxPacketLossCount = h.maxPacketLossCount + newPingerFunc := h.config.NewPinger + if newPingerFunc == nil { + newPingerFunc = newPinger } - pinger := newPinger(endpointCreated.Spec.HealthCheckIP, pingInterval, maxPacketLossCount) + pinger := newPingerFunc(endpointCreated.Spec.HealthCheckIP, pingInterval, maxPacketLossCount) h.pingers.Store(endpointCreated.Spec.CableName, pinger) - pinger.start() + pinger.Start() + + klog.Infof("CableEngine HealthChecker started pinger for CableName: %q with HealthCheckIP %q", + endpointCreated.Spec.CableName, endpointCreated.Spec.HealthCheckIP) return false } @@ -146,8 +141,8 @@ func (h *controller) endpointDeleted(obj runtime.Object) bool { } if obj, found := h.pingers.Load(endpointDeleted.Spec.CableName); found { - pinger := obj.(*pingerInfo) - pinger.stop() + pinger := obj.(PingerInterface) + pinger.Stop() h.pingers.Delete(endpointDeleted.Spec.CableName) } diff --git a/pkg/cableengine/healthchecker/pinger.go b/pkg/cableengine/healthchecker/pinger.go index 30196244a..d0b45f175 100644 --- a/pkg/cableengine/healthchecker/pinger.go +++ b/pkg/cableengine/healthchecker/pinger.go @@ -2,25 +2,34 @@ package healthchecker import ( "fmt" + "strconv" "time" "github.com/go-ping/ping" + submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "k8s.io/klog" ) var waitTime = 15 * time.Second -var DefaultMaxPacketLossCount uint = 5 +var defaultMaxPacketLossCount uint = 5 // The RTT will be stored and will be used to calculate the statistics until // the size is reached. Once the size is reached the array will be reset and // the last elements will be added to the array for statistics. var size uint64 = 1000 -var DefaultPingInterval = 1 * time.Second +var defaultPingInterval = 1 * time.Second + +type PingerInterface interface { + Start() + Stop() + GetLatencyInfo() *LatencyInfo + GetIP() string +} type pingerInfo struct { - healthCheckIP string + ip string pingInterval time.Duration maxPacketLossCount uint statistics statistics @@ -28,9 +37,9 @@ type pingerInfo struct { stopCh chan struct{} } -func newPinger(healthCheckIP string, pingInterval time.Duration, maxPacketLossCount uint) *pingerInfo { +func newPinger(ip string, pingInterval time.Duration, maxPacketLossCount uint) PingerInterface { return &pingerInfo{ - healthCheckIP: healthCheckIP, + ip: ip, pingInterval: pingInterval, maxPacketLossCount: maxPacketLossCount, statistics: statistics{ @@ -41,7 +50,7 @@ func newPinger(healthCheckIP string, pingInterval time.Duration, maxPacketLossCo } } -func (p *pingerInfo) start() { +func (p *pingerInfo) Start() { go func() { for { select { @@ -52,17 +61,16 @@ func (p *pingerInfo) start() { } } }() - klog.Infof("CableEngine HealthChecker started pinger for IP %q", p.healthCheckIP) } -func (p *pingerInfo) stop() { +func (p *pingerInfo) Stop() { close(p.stopCh) } func (p *pingerInfo) sendPing() { - pinger, err := ping.NewPinger(p.healthCheckIP) + pinger, err := ping.NewPinger(p.ip) if err != nil { - klog.Errorf("Error creating pinger for IP %q: %v", p.healthCheckIP, err) + klog.Errorf("Error creating pinger for IP %q: %v", p.ip, err) return } @@ -73,7 +81,7 @@ func (p *pingerInfo) sendPing() { pinger.OnSend = func(packet *ping.Packet) { // Pinger will mark a connection as an error if the packet loss reaches the threshold if pinger.PacketsSent-pinger.PacketsRecv > int(p.maxPacketLossCount) { - p.failureMsg = fmt.Sprintf("Failed to successfully ping the remote endpoint IP %q", p.healthCheckIP) + p.failureMsg = fmt.Sprintf("Failed to successfully ping the remote endpoint IP %q", p.ip) pinger.PacketsSent = 0 pinger.PacketsRecv = 0 } @@ -86,6 +94,29 @@ func (p *pingerInfo) sendPing() { err = pinger.Run() if err != nil { - klog.Errorf("Error running ping for the remote endpoint IP %q: %v", p.healthCheckIP, err) + klog.Errorf("Error running ping for the remote endpoint IP %q: %v", p.ip, err) + } +} + +func (p *pingerInfo) GetIP() string { + return p.ip +} + +func (p *pingerInfo) GetLatencyInfo() *LatencyInfo { + lastTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.lastRtt, 10) + "ns") + minTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.minRtt, 10) + "ns") + averageTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.mean, 10) + "ns") + maxTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.maxRtt, 10) + "ns") + stdDevTime, _ := time.ParseDuration(strconv.FormatUint(p.statistics.stdDev, 10) + "ns") + + return &LatencyInfo{ + ConnectionError: p.failureMsg, + Spec: &submarinerv1.LatencyRTTSpec{ + Last: lastTime.String(), + Min: minTime.String(), + Average: averageTime.String(), + Max: maxTime.String(), + StdDev: stdDevTime.String(), + }, } } diff --git a/pkg/cableengine/syncer/syncer.go b/pkg/cableengine/syncer/syncer.go index 6cc9337c6..05539e558 100644 --- a/pkg/cableengine/syncer/syncer.go +++ b/pkg/cableengine/syncer/syncer.go @@ -218,6 +218,9 @@ func (i *GatewaySyncer) generateGatewayObject() *v1.Gateway { connection.Status = v1.ConnectionError connection.StatusMessage = latencyInfo.ConnectionError } + } else if connection.Status == v1.ConnectionError && latencyInfo.ConnectionError == "" { + connection.Status = v1.Connected + connection.StatusMessage = "" } } } diff --git a/pkg/cableengine/syncer/syncer_test.go b/pkg/cableengine/syncer/syncer_test.go index d54ba008f..3261e7f2e 100644 --- a/pkg/cableengine/syncer/syncer_test.go +++ b/pkg/cableengine/syncer/syncer_test.go @@ -1,13 +1,16 @@ package syncer_test import ( - "fmt" + "reflect" "strconv" + "strings" "testing" "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/onsi/gomega/format" + gomegaTypes "github.com/onsi/gomega/types" "github.com/pkg/errors" . "github.com/submariner-io/admiral/pkg/gomega" "github.com/submariner-io/admiral/pkg/syncer/test" @@ -15,14 +18,17 @@ import ( submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" fakeEngine "github.com/submariner-io/submariner/pkg/cableengine/fake" "github.com/submariner-io/submariner/pkg/cableengine/healthchecker" + "github.com/submariner-io/submariner/pkg/cableengine/healthchecker/fake" "github.com/submariner-io/submariner/pkg/cableengine/syncer" fakeClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned/fake" fakeClientsetv1 "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1/fake" submarinerInformers "github.com/submariner-io/submariner/pkg/client/informers/externalversions" "github.com/submariner-io/submariner/pkg/types" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/submariner-io/submariner/pkg/util" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" kubeScheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" @@ -33,9 +39,9 @@ const ( namespace = "submariner" ) -var _ = Describe("", func() { +func init() { klog.InitFlags(nil) -}) +} var _ = BeforeSuite(func() { syncer.GatewayUpdateInterval = 200 * time.Millisecond @@ -46,6 +52,7 @@ var _ = Describe("", func() { Context("Gateway syncing", testGatewaySyncing) Context("Stale Gateway cleanup", testStaleGatewayCleanup) Context("Gateway sync errors", testGatewaySyncErrors) + Context("Gateway latency info", testGatewayLatencyInfo) }) func testGatewaySyncing() { @@ -133,7 +140,7 @@ func testStaleGatewayCleanup() { BeforeEach(func() { t = newTestDriver() staleGateway = &submarinerv1.Gateway{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "raiders", }, Status: submarinerv1.GatewayStatus{ @@ -285,11 +292,101 @@ func testGatewaySyncErrors() { }) } +func testGatewayLatencyInfo() { + var t *testDriver + + BeforeEach(func() { + t = newTestDriver() + }) + + JustBeforeEach(func() { + t.run() + }) + + AfterEach(func() { + t.stop() + }) + + When("the health checker provides latency info", func() { + It("should correctly update the Gateway Status information", func() { + t.awaitGatewayUpdated(t.expectedGateway) + + endpointSpec := &submarinerv1.EndpointSpec{ + ClusterID: "north", + CableName: "submariner-cable-north-192-68-1-20", + PrivateIP: "192-68-1-20", + HealthCheckIP: t.pinger.GetIP(), + } + + endpointName, err := util.GetEndpointCRDNameFromParams(endpointSpec.ClusterID, endpointSpec.CableName) + Expect(err).To(Succeed()) + + test.CreateResource(t.endpoints, &submarinerv1.Endpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: endpointName, + }, + Spec: *endpointSpec, + }) + + t.engine.Lock() + + t.expectedGateway.Status.HAStatus = submarinerv1.HAStatusActive + t.engine.HAStatus = t.expectedGateway.Status.HAStatus + + t.expectedGateway.Status.Connections = []submarinerv1.Connection{ + { + Status: submarinerv1.Connected, + Endpoint: *endpointSpec, + }, + } + + t.engine.Connections = []submarinerv1.Connection{t.expectedGateway.Status.Connections[0]} + + t.expectedGateway.Status.Connections[0].LatencyRTT = &submarinerv1.LatencyRTTSpec{ + Last: "93ms", + Min: "90ms", + Average: "95ms", + Max: "100ms", + StdDev: "94ms", + } + + t.pinger.SetLatencyInfo(&healthchecker.LatencyInfo{ + Spec: t.expectedGateway.Status.Connections[0].LatencyRTT, + }) + + t.engine.Unlock() + + t.awaitGatewayUpdated(t.expectedGateway) + + t.expectedGateway.Status.Connections[0].Status = submarinerv1.ConnectionError + t.expectedGateway.Status.Connections[0].StatusMessage = "Ping failed" + + t.pinger.SetLatencyInfo(&healthchecker.LatencyInfo{ + ConnectionError: t.expectedGateway.Status.Connections[0].StatusMessage, + Spec: t.expectedGateway.Status.Connections[0].LatencyRTT, + }) + + t.awaitGatewayUpdated(t.expectedGateway) + + t.expectedGateway.Status.Connections[0].Status = submarinerv1.Connected + t.expectedGateway.Status.Connections[0].StatusMessage = "" + + t.pinger.SetLatencyInfo(&healthchecker.LatencyInfo{ + Spec: t.expectedGateway.Status.Connections[0].LatencyRTT, + }) + + t.awaitGatewayUpdated(t.expectedGateway) + }) + }) +} + type testDriver struct { engine *fakeEngine.Engine gateways *fakeClientsetv1.FailingGateways syncer *syncer.GatewaySyncer healthChecker healthchecker.Interface + pinger *fake.Pinger + endpoints dynamic.ResourceInterface expectedGateway *submarinerv1.Gateway expectedDeletedAfter *submarinerv1.Gateway gatewayUpdated chan *submarinerv1.Gateway @@ -321,7 +418,7 @@ func newTestDriver() *testDriver { }} t.expectedGateway = &submarinerv1.Gateway{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: t.engine.LocalEndPoint.Spec.Hostname, }, Status: submarinerv1.GatewayStatus{ @@ -352,11 +449,26 @@ func (t *testDriver) run() { client := fakeClientset.NewSimpleClientset() t.gateways.GatewayInterface = client.SubmarinerV1().Gateways(namespace) - t.healthChecker, _ = healthchecker.New(&watcher.Config{ - RestMapper: restMapper, - Client: dynamicClient, - Scheme: scheme, - }, namespace, "west", 1, 15) + + t.pinger = fake.NewPinger("10.130.2.2") + + t.healthChecker, _ = healthchecker.New(&healthchecker.Config{ + WatcherConfig: &watcher.Config{ + RestMapper: restMapper, + Client: dynamicClient, + Scheme: scheme, + }, + EndpointNamespace: namespace, + ClusterID: t.engine.LocalEndPoint.Spec.ClusterID, + NewPinger: func(ip string, i time.Duration, m uint) healthchecker.PingerInterface { + defer GinkgoRecover() + Expect(ip).To(Equal(t.pinger.GetIP())) + return t.pinger + }, + }) + + t.endpoints = dynamicClient.Resource(*test.GetGroupVersionResourceFor(restMapper, &submarinerv1.Endpoint{})).Namespace(namespace) + t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version, t.healthChecker) informerFactory := submarinerInformers.NewSharedInformerFactory(client, 0) @@ -378,6 +490,8 @@ func (t *testDriver) run() { Expect(cache.WaitForCacheSync(t.stopInformer, informer.HasSynced)).To(BeTrue()) t.syncer.Run(t.stopSyncer) + + Expect(t.healthChecker.Start(t.stopSyncer)).To(Succeed()) } func (t *testDriver) stop() { @@ -392,7 +506,7 @@ func (t *testDriver) stop() { } func (t *testDriver) awaitGatewayUpdated(expected *submarinerv1.Gateway) { - t.awaitGateway(t.gatewayUpdated, fmt.Sprintf("Gateway was not received - %#v", expected), expected) + t.awaitGateway(t.gatewayUpdated, expected) } func (t *testDriver) awaitNoGatewayUpdated() { @@ -400,36 +514,66 @@ func (t *testDriver) awaitNoGatewayUpdated() { } func (t *testDriver) awaitGatewayDeleted(expected *submarinerv1.Gateway) { - t.awaitGateway(t.gatewayDeleted, fmt.Sprintf("Gateway was not deleted - %#v", expected), expected) + t.awaitGateway(t.gatewayDeleted, expected) } func (t *testDriver) awaitNoGatewayDeleted() { Consistently(t.gatewayDeleted, syncer.GatewayUpdateInterval+50).ShouldNot(Receive(), "Gateway was unexpectedly deleted") } -func (t *testDriver) awaitGateway(gatewayChan chan *submarinerv1.Gateway, msg string, expected *submarinerv1.Gateway) { - actual, err := func() (*submarinerv1.Gateway, error) { +func (t *testDriver) awaitGateway(gatewayChan chan *submarinerv1.Gateway, expected *submarinerv1.Gateway) { + var last *submarinerv1.Gateway + + Eventually(func() *submarinerv1.Gateway { select { case gw := <-gatewayChan: - return gw, nil - case <-time.After(5 * time.Second): - return nil, fmt.Errorf(msg) + last = gw + return gw + default: + return last } - }() + }, 5).Should(equalGateway(expected)) +} + +func TestSyncer(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Cable engine syncer Suite") +} + +type equalGatewayMatcher struct { + expected *submarinerv1.Gateway +} + +func equalGateway(expected *submarinerv1.Gateway) gomegaTypes.GomegaMatcher { + return &equalGatewayMatcher{expected} +} - Expect(err).To(Succeed()) - Expect(actual.Name).To(Equal(expected.Name)) +func (m *equalGatewayMatcher) Match(x interface{}) (bool, error) { + actual := x.(*submarinerv1.Gateway) + if actual == nil { + return false, nil + } + + if actual.Name != m.expected.Name { + return false, nil + } + + if m.expected.Status.StatusFailure != "" { + if !strings.Contains(actual.Status.StatusFailure, m.expected.Status.StatusFailure) { + return false, nil + } - if expected.Status.StatusFailure != "" { - Expect(actual.Status.StatusFailure).To(ContainSubstring(expected.Status.StatusFailure)) actual = actual.DeepCopy() - actual.Status.StatusFailure = expected.Status.StatusFailure + actual.Status.StatusFailure = m.expected.Status.StatusFailure } - Expect(actual.Status).To(Equal(expected.Status)) + return reflect.DeepEqual(actual.Status, m.expected.Status), nil } -func TestSyncer(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Cable engine syncer Suite") +func (m *equalGatewayMatcher) FailureMessage(actual interface{}) string { + return format.Message(actual, "to equal", m.expected) +} + +func (m *equalGatewayMatcher) NegatedFailureMessage(actual interface{}) (message string) { + return format.Message(actual, "not to equal", m.expected) }