diff --git a/client/client_api.go b/client/client_api.go index e00fec280f..cd78538b3b 100644 --- a/client/client_api.go +++ b/client/client_api.go @@ -35,6 +35,18 @@ type UpdateItem struct { Labels map[string]string } +// GetValue exists so that UpdateItem satisfies datasync.LazyValue interface. +func (item UpdateItem) GetValue(out proto.Message) error { + if item.Message != nil { + proto.Merge(out, item.Message) + } + return nil +} + +func (item UpdateItem) GetLabels() map[string]string { + return item.Labels +} + type UpdateResult struct { Key string Status *generic.ItemStatus @@ -62,6 +74,9 @@ type GenericClient interface { // ChangeRequest returns transaction for changing config. ChangeRequest() ChangeRequest + // NewItemChange() return transaction with label support for changing config. + NewItemChange() ItemChange + // ResyncConfig overwrites existing config. ResyncConfig(items ...proto.Message) error @@ -94,3 +109,15 @@ type ChangeRequest interface { // Send sends the request. Send(ctx context.Context) error } + +// ItemChange is interface for update item change request (so it supports item labels as well). +type ItemChange interface { + // Update appends updates for given items to the batch change request. + Update(items ...UpdateItem) ItemChange + + // Delete appends deletes for given items to the batch change request. + Delete(items ...UpdateItem) ItemChange + + // Send sends the batch change request. + Send(ctx context.Context) error +} diff --git a/client/local_client.go b/client/local_client.go index 85708bc720..0b0b5d51d2 100644 --- a/client/local_client.go +++ b/client/local_client.go @@ -18,11 +18,12 @@ import ( "context" "fmt" "strings" + "sync" "github.com/sirupsen/logrus" + "go.ligato.io/cn-infra/v2/datasync" "go.ligato.io/cn-infra/v2/datasync/kvdbsync/local" "go.ligato.io/cn-infra/v2/datasync/syncbase" - "go.ligato.io/cn-infra/v2/db/keyval" "google.golang.org/protobuf/proto" "go.ligato.io/vpp-agent/v3/pkg/models" @@ -36,17 +37,17 @@ import ( // Updates and resyncs of this client use local.DefaultRegistry for propagating data to orchestrator.Dispatcher // (going through watcher.Aggregator together with other data sources). However, data retrieval uses // orchestrator.Dispatcher directly. -var LocalClient = NewClient(&txnFactory{local.DefaultRegistry}, &orchestrator.DefaultPlugin) +var LocalClient = NewClient(local.DefaultRegistry, &orchestrator.DefaultPlugin) type client struct { - txnFactory ProtoTxnFactory + registry *syncbase.Registry dispatcher orchestrator.Dispatcher } // NewClient returns new instance that uses given registry for data propagation and dispatcher for data retrieval. -func NewClient(factory ProtoTxnFactory, dispatcher orchestrator.Dispatcher) ConfigClient { +func NewClient(registry *syncbase.Registry, dispatcher orchestrator.Dispatcher) ConfigClient { return &client{ - txnFactory: factory, + registry: registry, dispatcher: dispatcher, } } @@ -64,15 +65,16 @@ func (c *client) KnownModels(class string) ([]*ModelInfo, error) { return modules, nil } -func (c *client) ResyncConfig(items ...proto.Message) error { - txn := c.txnFactory.NewTxn(true) +func (c *client) ResyncConfig(msgs ...proto.Message) error { + txn := c.newLazyValTxn(true) - for _, item := range items { - key, err := models.GetKey(item) + uis := ProtosToUpdateItems(msgs) + for _, ui := range uis { + key, err := models.GetKey(ui.Message) if err != nil { return err } - txn.Put(key, item) + txn.Put(key, ui) } ctx := context.Background() @@ -161,45 +163,53 @@ func (c *client) DumpState() ([]*generic.StateItem, error) { } func (c *client) ChangeRequest() ChangeRequest { - return &changeRequest{txn: c.txnFactory.NewTxn(false)} + return &changeRequest{itemChange: itemChange{txn: c.newLazyValTxn(false)}} } -type changeRequest struct { - txn keyval.ProtoTxn - err error +func (c *client) NewItemChange() ItemChange { + return &itemChange{txn: c.newLazyValTxn(false)} } -func (r *changeRequest) Update(items ...proto.Message) ChangeRequest { - if r.err != nil { - return r - } - for _, item := range items { - key, err := models.GetKey(item) - if err != nil { - r.err = err - return r - } - r.txn.Put(key, item) +func (c *client) newLazyValTxn(resync bool) *LazyValTxn { + if resync { + return NewLazyValTxn(c.registry.PropagateResync) } - return r + return NewLazyValTxn(c.registry.PropagateChanges) +} + +type itemChange struct { + txn *LazyValTxn + err error } -func (r *changeRequest) Delete(items ...proto.Message) ChangeRequest { +func (r *itemChange) update(delete bool, items ...UpdateItem) *itemChange { if r.err != nil { return r } for _, item := range items { - key, err := models.GetKey(item) + key, err := models.GetKey(item.Message) if err != nil { r.err = err return r } - r.txn.Delete(key) + if delete { + r.txn.Delete(key) + } else { + r.txn.Put(key, item) + } } return r } -func (r *changeRequest) Send(ctx context.Context) error { +func (r *itemChange) Update(items ...UpdateItem) ItemChange { + return r.update(false, items...) +} + +func (r *itemChange) Delete(items ...UpdateItem) ItemChange { + return r.update(true, items...) +} + +func (r *itemChange) Send(ctx context.Context) error { if r.err != nil { return r.err } @@ -210,20 +220,76 @@ func (r *changeRequest) Send(ctx context.Context) error { return r.txn.Commit(ctx) } -// ProtoTxnFactory defines interface for keyval transaction provider. -type ProtoTxnFactory interface { - NewTxn(resync bool) keyval.ProtoTxn +type changeRequest struct { + itemChange +} + +func (r *changeRequest) Update(msgs ...proto.Message) ChangeRequest { + uis := ProtosToUpdateItems(msgs) + r.itemChange = *r.update(false, uis...) + return r } -type txnFactory struct { - registry *syncbase.Registry +func (r *changeRequest) Delete(msgs ...proto.Message) ChangeRequest { + uis := ProtosToUpdateItems(msgs) + r.itemChange = *r.update(true, uis...) + return r } -func (p *txnFactory) NewTxn(resync bool) keyval.ProtoTxn { - if resync { - return local.NewProtoTxn(p.registry.PropagateResync) +func (r *changeRequest) Send(ctx context.Context) error { + return r.itemChange.Send(ctx) +} + +type LazyValTxn struct { + mu sync.Mutex + changes map[string]datasync.ChangeValue + commit func(context.Context, map[string]datasync.ChangeValue) error +} + +func NewLazyValTxn(commit func(context.Context, map[string]datasync.ChangeValue) error) *LazyValTxn { + return &LazyValTxn{ + changes: make(map[string]datasync.ChangeValue), + commit: commit, + } +} + +func (txn *LazyValTxn) Put(key string, value datasync.LazyValue) *LazyValTxn { + txn.mu.Lock() + defer txn.mu.Unlock() + + txn.changes[key] = NewChangeLazy(key, value, 0, datasync.Put) + return txn +} + +func (txn *LazyValTxn) Delete(key string) *LazyValTxn { + txn.mu.Lock() + defer txn.mu.Unlock() + + txn.changes[key] = NewChangeLazy(key, nil, 0, datasync.Delete) + return txn +} + +func (txn *LazyValTxn) Commit(ctx context.Context) error { + txn.mu.Lock() + defer txn.mu.Unlock() + + return txn.commit(ctx, txn.changes) +} + +func NewChangeLazy(key string, value datasync.LazyValue, rev int64, changeType datasync.Op) *syncbase.Change { + // syncbase.Change does not export its changeType field so we set it first with syncbase.NewChange + change := syncbase.NewChange("", nil, 0, changeType) + change.KeyVal = syncbase.NewKeyVal(key, value, rev) + return change +} + +func ProtosToUpdateItems(msgs []proto.Message) []UpdateItem { + var uis []UpdateItem + for _, msg := range msgs { + // resulting UpdateItem will have no labels + uis = append(uis, UpdateItem{Message: msg}) } - return local.NewProtoTxn(p.registry.PropagateChanges) + return uis } func extractProtoMessages(dsts []interface{}) []proto.Message { diff --git a/client/remoteclient/grpc_client.go b/client/remoteclient/grpc_client.go index a751e99467..bcebf3697e 100644 --- a/client/remoteclient/grpc_client.go +++ b/client/remoteclient/grpc_client.go @@ -128,6 +128,16 @@ func (c *grpcClient) KnownModels(class string) ([]*client.ModelInfo, error) { func (c *grpcClient) ChangeRequest() client.ChangeRequest { return &setConfigRequest{ + setItemRequest: setItemRequest{ + client: c.manager, + modelRegistry: c.modelRegistry, + req: &generic.SetConfigRequest{}, + }, + } +} + +func (c *grpcClient) NewItemChange() client.ItemChange { + return &setItemRequest{ client: c.manager, modelRegistry: c.modelRegistry, req: &generic.SetConfigRequest{}, @@ -271,56 +281,72 @@ func (c *grpcClient) DumpState() ([]*client.StateItem, error) { return resp.GetItems(), nil } -type setConfigRequest struct { +type setItemRequest struct { client generic.ManagerServiceClient modelRegistry models.Registry req *generic.SetConfigRequest err error } -func (r *setConfigRequest) Update(items ...proto.Message) client.ChangeRequest { +func (r *setItemRequest) update(delete bool, items ...client.UpdateItem) *setItemRequest { if r.err != nil { return r } - for _, protoModel := range items { + for _, ui := range items { var item *generic.Item - item, r.err = models.MarshalItemUsingModelRegistry(protoModel, r.modelRegistry) - if r.err != nil { - return r - } - r.req.Updates = append(r.req.Updates, &generic.UpdateItem{ - Item: item, - }) - } - return r -} - -func (r *setConfigRequest) Delete(items ...proto.Message) client.ChangeRequest { - if r.err != nil { - return r - } - for _, protoModel := range items { - item, err := models.MarshalItemUsingModelRegistry(protoModel, r.modelRegistry) + item, err := models.MarshalItemUsingModelRegistry(ui.Message, r.modelRegistry) if err != nil { r.err = err return r } - item.Data = nil // delete + if delete { + item.Data = nil + ui.Labels = nil + } r.req.Updates = append(r.req.Updates, &generic.UpdateItem{ - Item: item, + Item: item, + Labels: ui.Labels, }) } return r } -func (r *setConfigRequest) Send(ctx context.Context) (err error) { +func (r *setItemRequest) Update(items ...client.UpdateItem) client.ItemChange { + return r.update(false, items...) +} + +func (r *setItemRequest) Delete(items ...client.UpdateItem) client.ItemChange { + return r.update(true, items...) +} + +func (r *setItemRequest) Send(ctx context.Context) error { if r.err != nil { return r.err } - _, err = r.client.SetConfig(ctx, r.req) + _, err := r.client.SetConfig(ctx, r.req) return err } +type setConfigRequest struct { + setItemRequest +} + +func (r *setConfigRequest) Update(msgs ...proto.Message) client.ChangeRequest { + uis := client.ProtosToUpdateItems(msgs) + r.setItemRequest = *r.update(false, uis...) + return r +} + +func (r *setConfigRequest) Delete(msgs ...proto.Message) client.ChangeRequest { + uis := client.ProtosToUpdateItems(msgs) + r.setItemRequest = *r.update(true, uis...) + return r +} + +func (r *setConfigRequest) Send(ctx context.Context) error { + return r.setItemRequest.Send(ctx) +} + // UseRemoteRegistry modifies remote client to use copy of remote model registry instead of local model registry. // The copy is filled with remote known models for given class (modelClass). func UseRemoteRegistry(modelClass string) NewClientOption { diff --git a/plugins/orchestrator/localregistry/initfileregistry.go b/plugins/orchestrator/localregistry/initfileregistry.go index ab6855afe3..af8dbabe4d 100644 --- a/plugins/orchestrator/localregistry/initfileregistry.go +++ b/plugins/orchestrator/localregistry/initfileregistry.go @@ -22,10 +22,8 @@ import ( yaml2 "github.com/ghodss/yaml" "go.ligato.io/cn-infra/v2/config" "go.ligato.io/cn-infra/v2/datasync" - "go.ligato.io/cn-infra/v2/datasync/kvdbsync/local" "go.ligato.io/cn-infra/v2/datasync/resync" "go.ligato.io/cn-infra/v2/datasync/syncbase" - "go.ligato.io/cn-infra/v2/db/keyval" "go.ligato.io/cn-infra/v2/infra" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -198,7 +196,7 @@ func (r *InitFileRegistry) watchResync(resyncReg resync.Registration) { // resyncReg.StatusChan == Started => resync if resyncStatus.ResyncStatus() == resync.Started && !r.pushedToWatchedRegistry { if !r.Empty() { // put preloaded NB init file data into watched p.registry - c := client.NewClient(&txnFactory{r.watchedRegistry}, &orchestrator.DefaultPlugin) + c := client.NewClient(r.watchedRegistry, &orchestrator.DefaultPlugin) if err := c.ResyncConfig(r.preloadedNBConfigs...); err != nil { r.Log.Errorf("resyncing preloaded NB init file data "+ "into watched registry failed: %w", err) @@ -269,14 +267,3 @@ func (r *InitFileRegistry) preloadNBConfigs(filePath string) error { return nil } - -type txnFactory struct { - registry *syncbase.Registry -} - -func (p *txnFactory) NewTxn(resync bool) keyval.ProtoTxn { - if resync { - return local.NewProtoTxn(p.registry.PropagateResync) - } - return local.NewProtoTxn(p.registry.PropagateChanges) -} diff --git a/plugins/orchestrator/orchestrator.go b/plugins/orchestrator/orchestrator.go index e9676b8071..2db3dba4f1 100644 --- a/plugins/orchestrator/orchestrator.go +++ b/plugins/orchestrator/orchestrator.go @@ -23,6 +23,7 @@ import ( "github.com/go-errors/errors" "go.ligato.io/cn-infra/v2/datasync" "go.ligato.io/cn-infra/v2/datasync/resync" + "go.ligato.io/cn-infra/v2/datasync/syncbase" "go.ligato.io/cn-infra/v2/infra" "go.ligato.io/cn-infra/v2/logging" "go.ligato.io/cn-infra/v2/rpc/grpc" @@ -184,6 +185,7 @@ func (p *Plugin) watchEvents() { var err error var kvPairs []KeyVal + labels := make(map[string]Labels) for _, x := range e.GetChanges() { kv := KeyVal{ @@ -196,6 +198,9 @@ func (p *Plugin) watchEvents() { continue } } + if item, ok := assertUpdateItem(x); ok { + labels[kv.Key] = item.GetLabels() + } kvPairs = append(kvPairs, kv) } @@ -216,13 +221,14 @@ func (p *Plugin) watchEvents() { ctx = contextdecorator.DataSrcContext(ctx, "datasync") } ctx = kvs.WithRetryDefault(ctx) - _, err = p.PushData(ctx, kvPairs, nil) + _, err = p.PushData(ctx, kvPairs, labels) e.Done(err) case e := <-p.resyncChan: p.log.Debugf("=> received RESYNC event (%v prefixes)", len(e.GetValues())) var kvPairs []KeyVal + labels := make(map[string]Labels) for prefix, iter := range e.GetValues() { var keyVals []datasync.KeyVal @@ -233,6 +239,11 @@ func (p *Plugin) watchEvents() { p.log.Errorf("unmarshal value for key %q failed: %v", key, err) continue } + if kv, ok := x.(*syncbase.KeyVal); ok { + if item, ok := kv.LazyValue.(updateItem); ok { + labels[key] = item.GetLabels() + } + } kvPairs = append(kvPairs, KeyVal{ Key: key, Val: val, @@ -263,7 +274,7 @@ func (p *Plugin) watchEvents() { ctx = kvs.WithResync(ctx, kvs.FullResync, true) ctx = kvs.WithRetryDefault(ctx) - _, err := p.PushData(ctx, kvPairs, nil) + _, err := p.PushData(ctx, kvPairs, labels) e.Done(err) case <-p.quit: @@ -272,6 +283,24 @@ func (p *Plugin) watchEvents() { } } +type updateItem interface { + datasync.LazyValue + GetLabels() map[string]string +} + +func assertUpdateItem(protoResp datasync.ProtoWatchResp) (updateItem, bool) { + if changeResp, ok := protoResp.(*syncbase.ChangeResp); ok { + if change, ok := changeResp.CurrVal.(*syncbase.Change); ok { + if kv, ok := change.KeyVal.(*syncbase.KeyVal); ok { + if item, ok := kv.LazyValue.(updateItem); ok { + return item, ok + } + } + } + } + return nil, false +} + func (p *Plugin) watchStatus(ch <-chan *kvscheduler.BaseValueStatus) { defer p.wg.Done()