Skip to content

Commit

Permalink
feat: add NATS discovery provider (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Oct 23, 2023
1 parent 6595ebd commit 2ad725a
Show file tree
Hide file tree
Showing 24 changed files with 1,469 additions and 71 deletions.
1 change: 1 addition & 0 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func TestActorSystem(t *testing.T) {
provider.EXPECT().Deregister().Return(nil)
provider.EXPECT().SetConfig(config).Return(nil)
provider.EXPECT().DiscoverPeers().Return(addrs, nil)
provider.EXPECT().Close().Return(nil)

// start the actor system
err = newActorSystem.Start(ctx)
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Cluster struct {
kvStore olric.DMap

// specifies the Cluster host
host *node
host *discovery.Node

// specifies the hasher
hasher hash.Hasher
Expand Down Expand Up @@ -117,7 +117,7 @@ func New(name string, serviceDiscovery *discovery.ServiceDiscovery, opts ...Opti
}

// get the host info
hostNode, err := hostNode()
hostNode, err := discovery.HostNode()
// handle the error
if err != nil {
cl.logger.Error(errors.Wrap(err, "failed get the host node.💥"))
Expand Down
2 changes: 2 additions & 0 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestCluster(t *testing.T) {
provider.EXPECT().Deregister().Return(nil)
provider.EXPECT().SetConfig(config).Return(nil)
provider.EXPECT().DiscoverPeers().Return(addrs, nil)
provider.EXPECT().Close().Return(nil)

// create the service discovery
serviceDiscovery := discovery.NewServiceDiscovery(provider, config)
Expand Down Expand Up @@ -125,6 +126,7 @@ func TestCluster(t *testing.T) {
provider.EXPECT().Deregister().Return(nil)
provider.EXPECT().SetConfig(config).Return(nil)
provider.EXPECT().DiscoverPeers().Return(addrs, nil)
provider.EXPECT().Close().Return(nil)

// create the service discovery
serviceDiscovery := discovery.NewServiceDiscovery(provider, config)
Expand Down
2 changes: 1 addition & 1 deletion cluster/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,5 @@ func (d *discoveryProvider) DiscoverPeers() ([]string, error) {

// Close implementation
func (d *discoveryProvider) Close() error {
return nil
return d.provider.Close()
}
1 change: 1 addition & 0 deletions cluster/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func TestDiscoveryProvider(t *testing.T) {
t.Run("With Close", func(t *testing.T) {
// mock the underlying discovery provider
provider := new(testkit.Provider)
provider.EXPECT().Close().Return(nil)
wrapper := &discoveryProvider{
provider: provider,
log: log.DefaultLogger.StdLogger(),
Expand Down
2 changes: 2 additions & 0 deletions discovery/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ var (
ErrNotInitialized = errors.New("provider not initialized")
// ErrAlreadyRegistered is used when attempting to re-register the provider
ErrAlreadyRegistered = errors.New("provider already registered")
// ErrNotRegistered is used when attempting to de-register the provider
ErrNotRegistered = errors.New("provider is not registered")
)
43 changes: 24 additions & 19 deletions discovery/kubernetes/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ const (
RemotingPortName = "remoting-port"
)

// option represents the kubernetes provider option
type option struct {
// discoConfig represents the kubernetes provider discoConfig
type discoConfig struct {
// Provider specifies the provider name
Provider string
// NameSpace specifies the namespace
Expand All @@ -65,13 +65,13 @@ type option struct {

// Discovery represents the kubernetes discovery
type Discovery struct {
option *option
option *discoConfig
client kubernetes.Interface
mu sync.Mutex

stopChan chan struct{}
// states whether the actor system has started or not
isInitialized *atomic.Bool
initialized *atomic.Bool
}

// enforce compilation error
Expand All @@ -80,14 +80,14 @@ var _ discovery.Provider = &Discovery{}
// NewDiscovery returns an instance of the kubernetes discovery provider
func NewDiscovery() *Discovery {
// create an instance of
k8 := &Discovery{
mu: sync.Mutex{},
stopChan: make(chan struct{}, 1),
isInitialized: atomic.NewBool(false),
option: &option{},
discovery := &Discovery{
mu: sync.Mutex{},
stopChan: make(chan struct{}, 1),
initialized: atomic.NewBool(false),
option: &discoConfig{},
}

return k8
return discovery
}

// ID returns the discovery provider id
Expand All @@ -102,7 +102,7 @@ func (d *Discovery) Initialize() error {
// release the lock
defer d.mu.Unlock()
// first check whether the discovery provider is running
if d.isInitialized.Load() {
if d.initialized.Load() {
return discovery.ErrAlreadyInitialized
}

Expand All @@ -122,7 +122,7 @@ func (d *Discovery) SetConfig(meta discovery.Config) error {
defer d.mu.Unlock()

// first check whether the discovery provider is running
if d.isInitialized.Load() {
if d.initialized.Load() {
return discovery.ErrAlreadyInitialized
}

Expand All @@ -138,7 +138,7 @@ func (d *Discovery) Register() error {

// first check whether the discovery provider has started
// avoid to re-register the discovery
if d.isInitialized.Load() {
if d.initialized.Load() {
return discovery.ErrAlreadyRegistered
}

Expand All @@ -157,7 +157,7 @@ func (d *Discovery) Register() error {
// set the k8 client
d.client = client
// set initialized
d.isInitialized = atomic.NewBool(true)
d.initialized = atomic.NewBool(true)
return nil
}

Expand All @@ -169,11 +169,11 @@ func (d *Discovery) Deregister() error {
defer d.mu.Unlock()

// first check whether the discovery provider has started
if !d.isInitialized.Load() {
if !d.initialized.Load() {
return discovery.ErrNotInitialized
}
// set the initialized to false
d.isInitialized = atomic.NewBool(false)
d.initialized = atomic.NewBool(false)
// stop the watchers
close(d.stopChan)
// return
Expand All @@ -183,7 +183,7 @@ func (d *Discovery) Deregister() error {
// DiscoverPeers returns a list of known nodes.
func (d *Discovery) DiscoverPeers() ([]string, error) {
// first check whether the discovery provider is running
if !d.isInitialized.Load() {
if !d.initialized.Load() {
return nil, discovery.ErrNotInitialized
}

Expand Down Expand Up @@ -249,10 +249,15 @@ MainLoop:
return addresses.ToSlice(), nil
}

// setConfig sets the kubernetes option
// Close closes the provider
func (d *Discovery) Close() error {
return nil
}

// setConfig sets the kubernetes discoConfig
func (d *Discovery) setConfig(config discovery.Config) (err error) {
// create an instance of option
option := new(option)
option := new(discoConfig)
// extract the namespace
option.NameSpace, err = config.GetString(Namespace)
// handle the error in case the namespace value is not properly set
Expand Down
66 changes: 59 additions & 7 deletions discovery/kubernetes/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func TestDiscovery(t *testing.T) {
client := testclient.NewSimpleClientset(pods...)
// create the kubernetes discovery provider
provider := Discovery{
client: client,
isInitialized: atomic.NewBool(true),
option: &option{
client: client,
initialized: atomic.NewBool(true),
option: &discoConfig{
NameSpace: ns,
ActorSystemName: actorSystemName,
ApplicationName: appName,
Expand All @@ -182,6 +182,14 @@ func TestDiscovery(t *testing.T) {
}

assert.ElementsMatch(t, expected, actual)
assert.NoError(t, provider.Close())
})
t.Run("With DiscoverPeers: not initialized", func(t *testing.T) {
provider := NewDiscovery()
peers, err := provider.DiscoverPeers()
assert.Error(t, err)
assert.Empty(t, peers)
assert.EqualError(t, err, discovery.ErrNotInitialized.Error())
})
t.Run("With SetConfig", func(t *testing.T) {
// create the various config option
Expand All @@ -207,7 +215,7 @@ func TestDiscovery(t *testing.T) {
actorSystemName := "AccountsSystem"
// create the instance of provider
provider := NewDiscovery()
provider.isInitialized = atomic.NewBool(true)
provider.initialized = atomic.NewBool(true)
// create the config
config := discovery.Config{
ApplicationName: applicationName,
Expand All @@ -220,6 +228,50 @@ func TestDiscovery(t *testing.T) {
assert.Error(t, err)
assert.EqualError(t, err, discovery.ErrAlreadyInitialized.Error())
})
t.Run("With SetConfig: actor system not set", func(t *testing.T) {
// create the various config option
namespace := "default"
applicationName := "accounts"
// create the instance of provider
provider := NewDiscovery()
// create the config
config := discovery.Config{
ApplicationName: applicationName,
Namespace: namespace,
}

// set config
assert.Error(t, provider.SetConfig(config))
})
t.Run("With SetConfig: application name not set", func(t *testing.T) {
// create the various config option
namespace := "default"
actorSystemName := "AccountsSystem"
// create the instance of provider
provider := NewDiscovery()
// create the config
config := discovery.Config{
ActorSystemName: actorSystemName,
Namespace: namespace,
}

// set config
assert.Error(t, provider.SetConfig(config))
})
t.Run("With SetConfig: namespace not set", func(t *testing.T) {
applicationName := "accounts"
actorSystemName := "AccountsSystem"
// create the instance of provider
provider := NewDiscovery()
// create the config
config := discovery.Config{
ApplicationName: applicationName,
ActorSystemName: actorSystemName,
}

// set config
assert.Error(t, provider.SetConfig(config))
})
t.Run("With Initialize", func(t *testing.T) {
// create the various config option
namespace := "default"
Expand All @@ -241,21 +293,21 @@ func TestDiscovery(t *testing.T) {
t.Run("With Initialize: already initialized", func(t *testing.T) {
// create the instance of provider
provider := NewDiscovery()
provider.isInitialized = atomic.NewBool(true)
provider.initialized = atomic.NewBool(true)
assert.Error(t, provider.Initialize())
})
t.Run("With Deregister", func(t *testing.T) {
// create the instance of provider
provider := NewDiscovery()
// for the sake of the test
provider.isInitialized = atomic.NewBool(true)
provider.initialized = atomic.NewBool(true)
assert.NoError(t, provider.Deregister())
})
t.Run("With Deregister when not initialized", func(t *testing.T) {
// create the instance of provider
provider := NewDiscovery()
// for the sake of the test
provider.isInitialized = atomic.NewBool(false)
provider.initialized = atomic.NewBool(false)
err := provider.Deregister()
assert.Error(t, err)
assert.EqualError(t, err, discovery.ErrNotInitialized.Error())
Expand Down
Loading

0 comments on commit 2ad725a

Please sign in to comment.