Skip to content

Commit

Permalink
Improve config label support (ligato#1930)
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Motičák <peter.moticak@pantheon.tech>
Signed-off-by: pemoticak <109155041+pemoticak@users.noreply.github.com>
  • Loading branch information
pemoticak authored Jun 2, 2023
1 parent 838a647 commit a5dcb5a
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 79 deletions.
27 changes: 27 additions & 0 deletions client/client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ type UpdateItem struct {
Labels map[string]string
}

// GetValue exists so that UpdateItem satisfies datasync.LazyValue interface.
func (item UpdateItem) GetValue(out proto.Message) error {
if item.Message != nil {
proto.Merge(out, item.Message)
}
return nil
}

func (item UpdateItem) GetLabels() map[string]string {
return item.Labels
}

type UpdateResult struct {
Key string
Status *generic.ItemStatus
Expand Down Expand Up @@ -62,6 +74,9 @@ type GenericClient interface {
// ChangeRequest returns transaction for changing config.
ChangeRequest() ChangeRequest

// NewItemChange() return transaction with label support for changing config.
NewItemChange() ItemChange

// ResyncConfig overwrites existing config.
ResyncConfig(items ...proto.Message) error

Expand Down Expand Up @@ -94,3 +109,15 @@ type ChangeRequest interface {
// Send sends the request.
Send(ctx context.Context) error
}

// ItemChange is interface for update item change request (so it supports item labels as well).
type ItemChange interface {
// Update appends updates for given items to the batch change request.
Update(items ...UpdateItem) ItemChange

// Delete appends deletes for given items to the batch change request.
Delete(items ...UpdateItem) ItemChange

// Send sends the batch change request.
Send(ctx context.Context) error
}
144 changes: 105 additions & 39 deletions client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/sirupsen/logrus"
"go.ligato.io/cn-infra/v2/datasync"
"go.ligato.io/cn-infra/v2/datasync/kvdbsync/local"
"go.ligato.io/cn-infra/v2/datasync/syncbase"
"go.ligato.io/cn-infra/v2/db/keyval"
"google.golang.org/protobuf/proto"

"go.ligato.io/vpp-agent/v3/pkg/models"
Expand All @@ -36,17 +37,17 @@ import (
// Updates and resyncs of this client use local.DefaultRegistry for propagating data to orchestrator.Dispatcher
// (going through watcher.Aggregator together with other data sources). However, data retrieval uses
// orchestrator.Dispatcher directly.
var LocalClient = NewClient(&txnFactory{local.DefaultRegistry}, &orchestrator.DefaultPlugin)
var LocalClient = NewClient(local.DefaultRegistry, &orchestrator.DefaultPlugin)

type client struct {
txnFactory ProtoTxnFactory
registry *syncbase.Registry
dispatcher orchestrator.Dispatcher
}

// NewClient returns new instance that uses given registry for data propagation and dispatcher for data retrieval.
func NewClient(factory ProtoTxnFactory, dispatcher orchestrator.Dispatcher) ConfigClient {
func NewClient(registry *syncbase.Registry, dispatcher orchestrator.Dispatcher) ConfigClient {
return &client{
txnFactory: factory,
registry: registry,
dispatcher: dispatcher,
}
}
Expand All @@ -64,15 +65,16 @@ func (c *client) KnownModels(class string) ([]*ModelInfo, error) {
return modules, nil
}

func (c *client) ResyncConfig(items ...proto.Message) error {
txn := c.txnFactory.NewTxn(true)
func (c *client) ResyncConfig(msgs ...proto.Message) error {
txn := c.newLazyValTxn(true)

for _, item := range items {
key, err := models.GetKey(item)
uis := ProtosToUpdateItems(msgs)
for _, ui := range uis {
key, err := models.GetKey(ui.Message)
if err != nil {
return err
}
txn.Put(key, item)
txn.Put(key, ui)
}

ctx := context.Background()
Expand Down Expand Up @@ -161,45 +163,53 @@ func (c *client) DumpState() ([]*generic.StateItem, error) {
}

func (c *client) ChangeRequest() ChangeRequest {
return &changeRequest{txn: c.txnFactory.NewTxn(false)}
return &changeRequest{itemChange: itemChange{txn: c.newLazyValTxn(false)}}
}

type changeRequest struct {
txn keyval.ProtoTxn
err error
func (c *client) NewItemChange() ItemChange {
return &itemChange{txn: c.newLazyValTxn(false)}
}

func (r *changeRequest) Update(items ...proto.Message) ChangeRequest {
if r.err != nil {
return r
}
for _, item := range items {
key, err := models.GetKey(item)
if err != nil {
r.err = err
return r
}
r.txn.Put(key, item)
func (c *client) newLazyValTxn(resync bool) *LazyValTxn {
if resync {
return NewLazyValTxn(c.registry.PropagateResync)
}
return r
return NewLazyValTxn(c.registry.PropagateChanges)
}

type itemChange struct {
txn *LazyValTxn
err error
}

func (r *changeRequest) Delete(items ...proto.Message) ChangeRequest {
func (r *itemChange) update(delete bool, items ...UpdateItem) *itemChange {
if r.err != nil {
return r
}
for _, item := range items {
key, err := models.GetKey(item)
key, err := models.GetKey(item.Message)
if err != nil {
r.err = err
return r
}
r.txn.Delete(key)
if delete {
r.txn.Delete(key)
} else {
r.txn.Put(key, item)
}
}
return r
}

func (r *changeRequest) Send(ctx context.Context) error {
func (r *itemChange) Update(items ...UpdateItem) ItemChange {
return r.update(false, items...)
}

func (r *itemChange) Delete(items ...UpdateItem) ItemChange {
return r.update(true, items...)
}

func (r *itemChange) Send(ctx context.Context) error {
if r.err != nil {
return r.err
}
Expand All @@ -210,20 +220,76 @@ func (r *changeRequest) Send(ctx context.Context) error {
return r.txn.Commit(ctx)
}

// ProtoTxnFactory defines interface for keyval transaction provider.
type ProtoTxnFactory interface {
NewTxn(resync bool) keyval.ProtoTxn
type changeRequest struct {
itemChange
}

func (r *changeRequest) Update(msgs ...proto.Message) ChangeRequest {
uis := ProtosToUpdateItems(msgs)
r.itemChange = *r.update(false, uis...)
return r
}

type txnFactory struct {
registry *syncbase.Registry
func (r *changeRequest) Delete(msgs ...proto.Message) ChangeRequest {
uis := ProtosToUpdateItems(msgs)
r.itemChange = *r.update(true, uis...)
return r
}

func (p *txnFactory) NewTxn(resync bool) keyval.ProtoTxn {
if resync {
return local.NewProtoTxn(p.registry.PropagateResync)
func (r *changeRequest) Send(ctx context.Context) error {
return r.itemChange.Send(ctx)
}

type LazyValTxn struct {
mu sync.Mutex
changes map[string]datasync.ChangeValue
commit func(context.Context, map[string]datasync.ChangeValue) error
}

func NewLazyValTxn(commit func(context.Context, map[string]datasync.ChangeValue) error) *LazyValTxn {
return &LazyValTxn{
changes: make(map[string]datasync.ChangeValue),
commit: commit,
}
}

func (txn *LazyValTxn) Put(key string, value datasync.LazyValue) *LazyValTxn {
txn.mu.Lock()
defer txn.mu.Unlock()

txn.changes[key] = NewChangeLazy(key, value, 0, datasync.Put)
return txn
}

func (txn *LazyValTxn) Delete(key string) *LazyValTxn {
txn.mu.Lock()
defer txn.mu.Unlock()

txn.changes[key] = NewChangeLazy(key, nil, 0, datasync.Delete)
return txn
}

func (txn *LazyValTxn) Commit(ctx context.Context) error {
txn.mu.Lock()
defer txn.mu.Unlock()

return txn.commit(ctx, txn.changes)
}

func NewChangeLazy(key string, value datasync.LazyValue, rev int64, changeType datasync.Op) *syncbase.Change {
// syncbase.Change does not export its changeType field so we set it first with syncbase.NewChange
change := syncbase.NewChange("", nil, 0, changeType)
change.KeyVal = syncbase.NewKeyVal(key, value, rev)
return change
}

func ProtosToUpdateItems(msgs []proto.Message) []UpdateItem {
var uis []UpdateItem
for _, msg := range msgs {
// resulting UpdateItem will have no labels
uis = append(uis, UpdateItem{Message: msg})
}
return local.NewProtoTxn(p.registry.PropagateChanges)
return uis
}

func extractProtoMessages(dsts []interface{}) []proto.Message {
Expand Down
74 changes: 50 additions & 24 deletions client/remoteclient/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ func (c *grpcClient) KnownModels(class string) ([]*client.ModelInfo, error) {

func (c *grpcClient) ChangeRequest() client.ChangeRequest {
return &setConfigRequest{
setItemRequest: setItemRequest{
client: c.manager,
modelRegistry: c.modelRegistry,
req: &generic.SetConfigRequest{},
},
}
}

func (c *grpcClient) NewItemChange() client.ItemChange {
return &setItemRequest{
client: c.manager,
modelRegistry: c.modelRegistry,
req: &generic.SetConfigRequest{},
Expand Down Expand Up @@ -271,56 +281,72 @@ func (c *grpcClient) DumpState() ([]*client.StateItem, error) {
return resp.GetItems(), nil
}

type setConfigRequest struct {
type setItemRequest struct {
client generic.ManagerServiceClient
modelRegistry models.Registry
req *generic.SetConfigRequest
err error
}

func (r *setConfigRequest) Update(items ...proto.Message) client.ChangeRequest {
func (r *setItemRequest) update(delete bool, items ...client.UpdateItem) *setItemRequest {
if r.err != nil {
return r
}
for _, protoModel := range items {
for _, ui := range items {
var item *generic.Item
item, r.err = models.MarshalItemUsingModelRegistry(protoModel, r.modelRegistry)
if r.err != nil {
return r
}
r.req.Updates = append(r.req.Updates, &generic.UpdateItem{
Item: item,
})
}
return r
}

func (r *setConfigRequest) Delete(items ...proto.Message) client.ChangeRequest {
if r.err != nil {
return r
}
for _, protoModel := range items {
item, err := models.MarshalItemUsingModelRegistry(protoModel, r.modelRegistry)
item, err := models.MarshalItemUsingModelRegistry(ui.Message, r.modelRegistry)
if err != nil {
r.err = err
return r
}
item.Data = nil // delete
if delete {
item.Data = nil
ui.Labels = nil
}
r.req.Updates = append(r.req.Updates, &generic.UpdateItem{
Item: item,
Item: item,
Labels: ui.Labels,
})
}
return r
}

func (r *setConfigRequest) Send(ctx context.Context) (err error) {
func (r *setItemRequest) Update(items ...client.UpdateItem) client.ItemChange {
return r.update(false, items...)
}

func (r *setItemRequest) Delete(items ...client.UpdateItem) client.ItemChange {
return r.update(true, items...)
}

func (r *setItemRequest) Send(ctx context.Context) error {
if r.err != nil {
return r.err
}
_, err = r.client.SetConfig(ctx, r.req)
_, err := r.client.SetConfig(ctx, r.req)
return err
}

type setConfigRequest struct {
setItemRequest
}

func (r *setConfigRequest) Update(msgs ...proto.Message) client.ChangeRequest {
uis := client.ProtosToUpdateItems(msgs)
r.setItemRequest = *r.update(false, uis...)
return r
}

func (r *setConfigRequest) Delete(msgs ...proto.Message) client.ChangeRequest {
uis := client.ProtosToUpdateItems(msgs)
r.setItemRequest = *r.update(true, uis...)
return r
}

func (r *setConfigRequest) Send(ctx context.Context) error {
return r.setItemRequest.Send(ctx)
}

// UseRemoteRegistry modifies remote client to use copy of remote model registry instead of local model registry.
// The copy is filled with remote known models for given class (modelClass).
func UseRemoteRegistry(modelClass string) NewClientOption {
Expand Down
Loading

0 comments on commit a5dcb5a

Please sign in to comment.