From 10c86c5cc0c88a2d66344e4b1546e53d99ad98b9 Mon Sep 17 00:00:00 2001 From: Utkarsh Bhatt Date: Tue, 15 Oct 2024 16:26:43 +0530 Subject: [PATCH] Resolved Comment Signed-off-by: Utkarsh Bhatt --- microceph/api/ops_replication.go | 7 +-- microceph/api/types/replication.go | 7 ++- microceph/ceph/rbd_mirror.go | 70 ++++++++++++++++++++---------- microceph/ceph/replication.go | 38 ++++++++-------- microceph/ceph/replication_rbd.go | 10 +++++ 5 files changed, 85 insertions(+), 47 deletions(-) diff --git a/microceph/api/ops_replication.go b/microceph/api/ops_replication.go index 8eb66741..406ff148 100644 --- a/microceph/api/ops_replication.go +++ b/microceph/api/ops_replication.go @@ -69,20 +69,21 @@ func deleteOpsReplicationResource(s state.State, r *http.Request) response.Respo // cmdOpsReplication is the common handler for all requests on replication endpoint. func cmdOpsReplication(s state.State, r *http.Request, patchRequest types.ReplicationRequestType) response.Response { - // NOTE (utkarshbhatthere): unescaping API $wl and $name is not required - // as that information is present in payload. + // Get workload name from API wl, err := url.PathUnescape(mux.Vars(r)["wl"]) if err != nil { logger.Errorf("REP: %v", err.Error()) return response.InternalError(err) } + // Get resource name from API resource, err := url.PathUnescape(mux.Vars(r)["name"]) if err != nil { logger.Errorf("REP: %v", err.Error()) return response.InternalError(err) } + // Populate the replication request with necessary information for RESTfullnes var req types.ReplicationRequest if wl == string(types.RbdWorkload) { var data types.RbdReplicationRequest @@ -101,7 +102,7 @@ func cmdOpsReplication(s state.State, r *http.Request, patchRequest types.Replic req = data } else { - return response.SmartError(fmt.Errorf("")) + return response.SmartError(fmt.Errorf("unknown workload %s, resource %s", wl, resource)) } return handleReplicationRequest(s, r.Context(), req) diff --git a/microceph/api/types/replication.go b/microceph/api/types/replication.go index f28aadd5..fb512c17 100644 --- a/microceph/api/types/replication.go +++ b/microceph/api/types/replication.go @@ -5,9 +5,10 @@ import ( ) // ################################## Generic Replication Request ################################## +// ReplicationRequestType defines the various events replication request types. type ReplicationRequestType string -// This value is split till '-' to get the API request value. +// This value is split till '-' to get the API request type and the event name encoded in one string. const ( EnableReplicationRequest ReplicationRequestType = "POST-" + constants.EventEnableReplication ConfigureReplicationRequest ReplicationRequestType = "PUT-" + constants.EventConfigureReplication @@ -24,6 +25,10 @@ const ( RgwWorkload CephWorkloadType = "rgw" ) +// ReplicationRequest is interface for all Replication implementations (rbd, cephfs, rgw). +// It defines methods used by: +// 1. client code to make the API request +// 2. Replication state machine to feed the correct event trigger. type ReplicationRequest interface { GetWorkloadType() CephWorkloadType GetAPIObjectId() string diff --git a/microceph/ceph/rbd_mirror.go b/microceph/ceph/rbd_mirror.go index aa2355ec..6fc2b85f 100644 --- a/microceph/ceph/rbd_mirror.go +++ b/microceph/ceph/rbd_mirror.go @@ -30,14 +30,14 @@ func GetRbdMirrorPoolInfo(pool string, cluster string, client string) (RbdReplic output, err := processExec.RunCommand("rbd", args...) if err != nil { - logger.Warnf("failed info operation on res(%s): %v", pool, err) + logger.Warnf("REPRBD: failed pool info operation on res(%s): %v", pool, err) return RbdReplicationPoolInfo{Mode: types.RbdResourceDisabled}, nil } err = json.Unmarshal([]byte(output), &response) if err != nil { ne := fmt.Errorf("cannot unmarshal rbd response: %v", err) - logger.Errorf(ne.Error()) + logger.Errorf("REPRBD: %s", ne.Error()) return RbdReplicationPoolInfo{Mode: types.RbdResourceDisabled}, ne } @@ -52,6 +52,7 @@ func populatePoolStatus(status string) (RbdReplicationPoolStatus, error) { err := json.Unmarshal([]byte(status), &summary) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return RbdReplicationPoolStatus{}, err } @@ -64,7 +65,7 @@ func GetRbdMirrorPoolStatus(pool string, cluster string, client string) (RbdRepl output, err := processExec.RunCommand("rbd", args...) if err != nil { - logger.Warnf("failed info operation on res(%s): %v", pool, err) + logger.Warnf("failed pool status operation on res(%s): %v", pool, err) return RbdReplicationPoolStatus{State: StateDisabledReplication}, nil } @@ -100,7 +101,7 @@ func GetRbdMirrorVerbosePoolStatus(pool string, cluster string, client string) ( // Get verbose pool status output, err := processExec.RunCommand("rbd", args...) if err != nil { - logger.Warnf("failed info operation on res(%s): %v", pool, err) + logger.Warnf("REPRBD: failed verbose pool status operation on res(%s): %v", pool, err) return RbdReplicationVerbosePoolStatus{Summary: RbdReplicationPoolStatus{State: StateDisabledReplication}}, nil } @@ -147,7 +148,7 @@ func GetRbdMirrorImageStatus(pool string, image string, cluster string, client s output, err := processExec.RunCommand("rbd", args...) if err != nil { - logger.Warnf("failed info operation on res(%s): %v", resource, err) + logger.Warnf("failed image status operation on res(%s): %v", resource, err) return RbdReplicationImageStatus{State: StateDisabledReplication}, nil } @@ -172,12 +173,14 @@ func EnablePoolMirroring(pool string, mode types.RbdResourceType, localName stri // Enable pool mirroring on the local cluster. err := configurePoolMirroring(pool, mode, "", "") if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return err } // Enable pool mirroring on the remote cluster. err = configurePoolMirroring(pool, mode, localName, remoteName) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return err } @@ -190,21 +193,21 @@ func DisablePoolMirroring(pool string, peer RbdReplicationPeer, localName string // remove peer permissions err := RemovePeer(pool, localName, remoteName) if err != nil { - logger.Error(err.Error()) + logger.Errorf("REPRBD: %s", err.Error()) return err } // Disable pool mirroring on the local cluster. err = configurePoolMirroring(pool, types.RbdResourceDisabled, "", "") if err != nil { - logger.Error(err.Error()) + logger.Errorf("REPRBD: %s", err.Error()) return err } // Disable pool mirroring on the remote cluster. err = configurePoolMirroring(pool, types.RbdResourceDisabled, localName, remoteName) if err != nil { - logger.Error(err.Error()) + logger.Errorf("REPRBD: %s", err.Error()) return err } @@ -216,7 +219,7 @@ func DisableMirroringAllImagesInPool(poolName string) error { poolStatus, err := GetRbdMirrorVerbosePoolStatus(poolName, "", "") if err != nil { err := fmt.Errorf("failed to fetch status for %s pool: %v", poolName, err) - logger.Error(err.Error()) + logger.Errorf("REPRBD: %s", err.Error()) return err } @@ -234,39 +237,46 @@ func DisableMirroringAllImagesInPool(poolName string) error { } // getPeerUUID returns the peer ID for the requested peer name. -func getPeerUUID(pool string, peerName string, client string, cluster string) string { +func getPeerUUID(pool string, peerName string, client string, cluster string) (string, error) { poolInfo, err := GetRbdMirrorPoolInfo(pool, cluster, client) if err != nil { logger.Error(err.Error()) - return "" + return "", err } for _, peer := range poolInfo.Peers { if peer.RemoteName == peerName { - return peer.Id + return peer.Id, nil } } - return "" + return "", fmt.Errorf("no peer found") } // RemovePeer removes the rbd-mirror peer permissions for requested pool. func RemovePeer(pool string, localName string, remoteName string) error { // find local site's peer with name $remoteName - localPeer := getPeerUUID(pool, remoteName, "", "") - remotePeer := getPeerUUID(pool, localName, localName, remoteName) + localPeer, err := getPeerUUID(pool, remoteName, "", "") + if err != nil { + return err + } + + remotePeer, err := getPeerUUID(pool, localName, localName, remoteName) + if err != nil { + return err + } // Remove local cluster's peer - err := peerRemove(pool, localPeer, "", "") + err = peerRemove(pool, localPeer, "", "") if err != nil { - logger.Error(err.Error()) + logger.Errorf("REPRBD: %s", err.Error()) return err } // Remove remote's peer err = peerRemove(pool, remotePeer, localName, remoteName) if err != nil { - logger.Error(err.Error()) + logger.Errorf("REPRBD: %s", err.Error()) return err } @@ -278,14 +288,14 @@ func BootstrapPeer(pool string, localName string, remoteName string) error { // create bootstrap token on local site. token, err := peerBootstrapCreate(pool, "", localName) if err != nil { - logger.Error(err.Error()) + logger.Errorf("REPRBD: %s", err.Error()) return err } // persist the peer token err = writeRemotePeerToken(token, remoteName) if err != nil { - logger.Error(err.Error()) + logger.Errorf("REPRBD: %s", err.Error()) return err } @@ -307,6 +317,7 @@ func configurePoolMirroring(pool string, mode types.RbdResourceType, localName s _, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return fmt.Errorf("failed to execute rbd command: %v", err) } @@ -329,17 +340,20 @@ func configureImageMirroring(req types.RbdReplicationRequest) error { _, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return fmt.Errorf("failed to configure rbd image feature: %v", err) } if mode == types.RbdReplicationSnapshot { err = createSnapshot(pool, image) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return fmt.Errorf("failed to create image(%s/%s) snapshot : %v", pool, image, err) } err = configureSnapshotSchedule(pool, image, schedule, "") if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return fmt.Errorf("failed to create image(%s/%s) snapshot schedule(%s) : %v", pool, image, schedule, err) } } @@ -354,12 +368,14 @@ func getSnapshotSchedule(pool string, image string) (imageSnapshotSchedule, erro output, err := listSnapshotSchedule(pool, image) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return imageSnapshotSchedule{}, err } ret := []imageSnapshotSchedule{} err = json.Unmarshal(output, &ret) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return imageSnapshotSchedule{}, nil } @@ -381,6 +397,7 @@ func listSnapshotSchedule(pool string, image string) ([]byte, error) { output, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return []byte(""), err } @@ -401,7 +418,7 @@ func listAllImagesInPool(pool string, localName string, remoteName string) []str var ret []string err = json.Unmarshal([]byte(output), &ret) if err != nil { - logger.Errorf("unexpected error encountered while parsing json output %s", output) + logger.Errorf("REPRBD: unexpected error encountered while parsing json output %s", output) return []string{} } @@ -433,6 +450,7 @@ func configureSnapshotSchedule(pool string, image string, schedule string, start _, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return err } @@ -445,6 +463,7 @@ func createSnapshot(pool string, image string) error { _, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return err } @@ -458,6 +477,7 @@ func configureImageFeatures(pool string, image string, op string, feature string _, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return fmt.Errorf("failed to configure rbd image feature: %v", err) } @@ -477,6 +497,7 @@ func peerBootstrapCreate(pool string, client string, cluster string) (string, er output, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return "", fmt.Errorf("failed to bootstrap peer token: %v", err) } @@ -502,6 +523,7 @@ func peerBootstrapImport(pool string, client string, cluster string) error { _, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return fmt.Errorf("failed to import peer bootstrap token: %v", err) } @@ -519,6 +541,7 @@ func peerRemove(pool string, peerId string, localName string, remoteName string) _, err := processExec.RunCommand("rbd", args...) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return fmt.Errorf("failed to remove peer(%s) for pool(%s): %v", peerId, pool, err) } @@ -580,6 +603,7 @@ func writeRemotePeerToken(token string, remoteName string) error { err := os.MkdirAll(tokenDirPath, constants.PermissionWorldNoAccess) if err != nil { + logger.Errorf("REPRBD: %s", err.Error()) return fmt.Errorf("unable to create %q: %w", tokenDirPath, err) } @@ -591,7 +615,7 @@ func writeRemotePeerToken(token string, remoteName string) error { file, err := os.Create(tokenFilePath) if err != nil { ne := fmt.Errorf("failed to create the token file(%s): %w", tokenFilePath, err) - logger.Error(ne.Error()) + logger.Errorf("REPRBD: %s", ne.Error()) return ne } @@ -599,7 +623,7 @@ func writeRemotePeerToken(token string, remoteName string) error { _, err = file.WriteString(token) if err != nil { ne := fmt.Errorf("failed to write the token file(%s): %w", tokenFilePath, err) - logger.Error(ne.Error()) + logger.Errorf("REPRBD: %s", ne.Error()) return ne } diff --git a/microceph/ceph/replication.go b/microceph/ceph/replication.go index 7195ac00..829e7eff 100644 --- a/microceph/ceph/replication.go +++ b/microceph/ceph/replication.go @@ -50,6 +50,16 @@ func GetReplicationHandler(name string) ReplicationHandlerInterface { return rh } +func getAllEvents() []stateless.Trigger { + return []stateless.Trigger{ + constants.EventEnableReplication, + constants.EventDisableReplication, + constants.EventConfigureReplication, + constants.EventListReplication, + constants.EventStatusReplication, + } +} + func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateMachine { newFsm := stateless.NewStateMachine(initialState) // Configure transitions for disabled state. @@ -63,23 +73,17 @@ func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateM newFsm.Configure(StateEnabledReplication). Permit(constants.EventDisableReplication, StateDisabledReplication). OnEntryFrom(constants.EventEnableReplication, enableHandler). - InternalTransition(constants.EventEnableReplication, enableHandler). InternalTransition(constants.EventConfigureReplication, configureHandler). InternalTransition(constants.EventListReplication, listHandler). InternalTransition(constants.EventStatusReplication, statusHandler) // Check Event params type. - var output *string - var dummyState interfaces.CephState - var eventHandler ReplicationHandlerInterface - inputType := reflect.TypeOf(&eventHandler).Elem() - outputType := reflect.TypeOf(output) - stateType := reflect.TypeOf(dummyState) - newFsm.SetTriggerParameters(constants.EventEnableReplication, inputType, outputType, stateType) - newFsm.SetTriggerParameters(constants.EventDisableReplication, inputType, outputType, stateType) - newFsm.SetTriggerParameters(constants.EventConfigureReplication, inputType, outputType, stateType) - newFsm.SetTriggerParameters(constants.EventListReplication, inputType, outputType, stateType) - newFsm.SetTriggerParameters(constants.EventStatusReplication, inputType, outputType, stateType) + var outputType *string + var stateType interfaces.CephState + var inputType ReplicationHandlerInterface + for _, event := range getAllEvents() { + newFsm.SetTriggerParameters(event, reflect.TypeOf(&inputType).Elem(), reflect.TypeOf(outputType), reflect.TypeOf(stateType)) + } // Add logger callback for all transitions newFsm.OnTransitioning(logTransitionHandler) @@ -87,8 +91,7 @@ func GetReplicationStateMachine(initialState ReplicationState) *stateless.StateM // Add handler for unhandled transitions. newFsm.OnUnhandledTrigger(unhandledTransitionHandler) - logger.Infof("REPFSM: Created from state: %s", initialState) - + logger.Debugf("REPFSM: Created from state: %s", initialState) return newFsm } @@ -97,31 +100,26 @@ func logTransitionHandler(_ context.Context, t stateless.Transition) { } func unhandledTransitionHandler(_ context.Context, state stateless.State, trigger stateless.Trigger, _ []string) error { - return fmt.Errorf("operation: %s is not permitted at %s state", trigger, state) + return fmt.Errorf("REPFSM: operation: %s is not permitted at %s state", trigger, state) } func enableHandler(ctx context.Context, args ...any) error { rh := args[repArgHandler].(ReplicationHandlerInterface) - logger.Infof("REPFSM: Entered Enable Handler") return rh.EnableHandler(ctx, args...) } func disableHandler(ctx context.Context, args ...any) error { rh := args[repArgHandler].(ReplicationHandlerInterface) - logger.Infof("REPFSM: Entered Disable Handler") return rh.DisableHandler(ctx, args...) } func configureHandler(ctx context.Context, args ...any) error { rh := args[repArgHandler].(ReplicationHandlerInterface) - logger.Infof("REPFSM: Entered Configure Handler") return rh.ConfigureHandler(ctx, args...) } func listHandler(ctx context.Context, args ...any) error { rh := args[repArgHandler].(ReplicationHandlerInterface) - logger.Infof("REPFSM: Entered List Handler") return rh.ListHandler(ctx, args...) } func statusHandler(ctx context.Context, args ...any) error { rh := args[repArgHandler].(ReplicationHandlerInterface) - logger.Infof("REPFSM: Entered Status Handler") return rh.StatusHandler(ctx, args...) } diff --git a/microceph/ceph/replication_rbd.go b/microceph/ceph/replication_rbd.go index 94f0b0de..70fba9b1 100644 --- a/microceph/ceph/replication_rbd.go +++ b/microceph/ceph/replication_rbd.go @@ -121,6 +121,8 @@ func (rh *RbdReplicationHandler) GetResourceState() ReplicationState { // EnableHandler enables mirroring for requested rbd pool/image. func (rh *RbdReplicationHandler) EnableHandler(ctx context.Context, args ...any) error { + logger.Debugf("REPFSM: Enable handler, Req %v", rh.Request) + st := args[repArgState].(interfaces.CephState).ClusterState() dbRec, err := database.GetRemoteDb(ctx, st, rh.Request.RemoteName) if err != nil { @@ -140,6 +142,8 @@ func (rh *RbdReplicationHandler) EnableHandler(ctx context.Context, args ...any) // DisableHandler disables mirroring configured for requested rbd pool/image. func (rh *RbdReplicationHandler) DisableHandler(ctx context.Context, args ...any) error { + logger.Debugf("REPFSM: Disable handler, Req %v", rh.Request) + st := args[repArgState].(interfaces.CephState).ClusterState() dbRec, err := database.GetRemoteDb(ctx, st, rh.Request.RemoteName) if err != nil { @@ -159,6 +163,8 @@ func (rh *RbdReplicationHandler) DisableHandler(ctx context.Context, args ...any // ConfigureHandler configures replication properties for requested rbd pool/image. func (rh *RbdReplicationHandler) ConfigureHandler(ctx context.Context, args ...any) error { + logger.Debugf("REPFSM: Configure handler, Req %v", rh.Request) + schedule, err := getSnapshotSchedule(rh.Request.SourcePool, rh.Request.SourceImage) if err != nil { return err @@ -173,6 +179,8 @@ func (rh *RbdReplicationHandler) ConfigureHandler(ctx context.Context, args ...a // ListHandler fetches a list of rbd pools/images configured for mirroring. func (rh *RbdReplicationHandler) ListHandler(ctx context.Context, args ...any) error { + logger.Debugf("REPFSM: List handler, Req %v", rh.Request) + // fetch all ceph pools initialised with rbd application. pools := ListPools("rbd") @@ -223,6 +231,8 @@ func (rh *RbdReplicationHandler) ListHandler(ctx context.Context, args ...any) e // StatusHandler fetches the status of requested rbd pool/image resource. func (rh *RbdReplicationHandler) StatusHandler(ctx context.Context, args ...any) error { + logger.Debugf("REPFSM: Status handler, Req %v", rh.Request) + var resp any // Populate Status resp.