Skip to content

Commit

Permalink
Resolved Comment
Browse files Browse the repository at this point in the history
Signed-off-by: Utkarsh Bhatt <utkarsh.bhatt@canonical.com>
  • Loading branch information
UtkarshBhatthere committed Oct 15, 2024
1 parent 38f0840 commit 10c86c5
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 47 deletions.
7 changes: 4 additions & 3 deletions microceph/api/ops_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,21 @@ func deleteOpsReplicationResource(s state.State, r *http.Request) response.Respo

// cmdOpsReplication is the common handler for all requests on replication endpoint.
func cmdOpsReplication(s state.State, r *http.Request, patchRequest types.ReplicationRequestType) response.Response {
// NOTE (utkarshbhatthere): unescaping API $wl and $name is not required
// as that information is present in payload.
// Get workload name from API
wl, err := url.PathUnescape(mux.Vars(r)["wl"])
if err != nil {
logger.Errorf("REP: %v", err.Error())
return response.InternalError(err)
}

// Get resource name from API
resource, err := url.PathUnescape(mux.Vars(r)["name"])
if err != nil {
logger.Errorf("REP: %v", err.Error())
return response.InternalError(err)
}

// Populate the replication request with necessary information for RESTfullnes
var req types.ReplicationRequest
if wl == string(types.RbdWorkload) {
var data types.RbdReplicationRequest
Expand All @@ -101,7 +102,7 @@ func cmdOpsReplication(s state.State, r *http.Request, patchRequest types.Replic

req = data
} else {
return response.SmartError(fmt.Errorf(""))
return response.SmartError(fmt.Errorf("unknown workload %s, resource %s", wl, resource))
}

return handleReplicationRequest(s, r.Context(), req)
Expand Down
7 changes: 6 additions & 1 deletion microceph/api/types/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
)

// ################################## Generic Replication Request ##################################
// ReplicationRequestType defines the various events replication request types.
type ReplicationRequestType string

// This value is split till '-' to get the API request value.
// This value is split till '-' to get the API request type and the event name encoded in one string.
const (
EnableReplicationRequest ReplicationRequestType = "POST-" + constants.EventEnableReplication
ConfigureReplicationRequest ReplicationRequestType = "PUT-" + constants.EventConfigureReplication
Expand All @@ -24,6 +25,10 @@ const (
RgwWorkload CephWorkloadType = "rgw"
)

// ReplicationRequest is interface for all Replication implementations (rbd, cephfs, rgw).
// It defines methods used by:
// 1. client code to make the API request
// 2. Replication state machine to feed the correct event trigger.
type ReplicationRequest interface {
GetWorkloadType() CephWorkloadType
GetAPIObjectId() string
Expand Down
70 changes: 47 additions & 23 deletions microceph/ceph/rbd_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ func GetRbdMirrorPoolInfo(pool string, cluster string, client string) (RbdReplic

output, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Warnf("failed info operation on res(%s): %v", pool, err)
logger.Warnf("REPRBD: failed pool info operation on res(%s): %v", pool, err)
return RbdReplicationPoolInfo{Mode: types.RbdResourceDisabled}, nil
}

err = json.Unmarshal([]byte(output), &response)
if err != nil {
ne := fmt.Errorf("cannot unmarshal rbd response: %v", err)
logger.Errorf(ne.Error())
logger.Errorf("REPRBD: %s", ne.Error())
return RbdReplicationPoolInfo{Mode: types.RbdResourceDisabled}, ne
}

Expand All @@ -52,6 +52,7 @@ func populatePoolStatus(status string) (RbdReplicationPoolStatus, error) {

err := json.Unmarshal([]byte(status), &summary)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return RbdReplicationPoolStatus{}, err
}

Expand All @@ -64,7 +65,7 @@ func GetRbdMirrorPoolStatus(pool string, cluster string, client string) (RbdRepl

output, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Warnf("failed info operation on res(%s): %v", pool, err)
logger.Warnf("failed pool status operation on res(%s): %v", pool, err)
return RbdReplicationPoolStatus{State: StateDisabledReplication}, nil
}

Expand Down Expand Up @@ -100,7 +101,7 @@ func GetRbdMirrorVerbosePoolStatus(pool string, cluster string, client string) (
// Get verbose pool status
output, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Warnf("failed info operation on res(%s): %v", pool, err)
logger.Warnf("REPRBD: failed verbose pool status operation on res(%s): %v", pool, err)
return RbdReplicationVerbosePoolStatus{Summary: RbdReplicationPoolStatus{State: StateDisabledReplication}}, nil
}

Expand Down Expand Up @@ -147,7 +148,7 @@ func GetRbdMirrorImageStatus(pool string, image string, cluster string, client s

output, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Warnf("failed info operation on res(%s): %v", resource, err)
logger.Warnf("failed image status operation on res(%s): %v", resource, err)
return RbdReplicationImageStatus{State: StateDisabledReplication}, nil
}

Expand All @@ -172,12 +173,14 @@ func EnablePoolMirroring(pool string, mode types.RbdResourceType, localName stri
// Enable pool mirroring on the local cluster.
err := configurePoolMirroring(pool, mode, "", "")
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return err
}

// Enable pool mirroring on the remote cluster.
err = configurePoolMirroring(pool, mode, localName, remoteName)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return err
}

Expand All @@ -190,21 +193,21 @@ func DisablePoolMirroring(pool string, peer RbdReplicationPeer, localName string
// remove peer permissions
err := RemovePeer(pool, localName, remoteName)
if err != nil {
logger.Error(err.Error())
logger.Errorf("REPRBD: %s", err.Error())
return err
}

// Disable pool mirroring on the local cluster.
err = configurePoolMirroring(pool, types.RbdResourceDisabled, "", "")
if err != nil {
logger.Error(err.Error())
logger.Errorf("REPRBD: %s", err.Error())
return err
}

// Disable pool mirroring on the remote cluster.
err = configurePoolMirroring(pool, types.RbdResourceDisabled, localName, remoteName)
if err != nil {
logger.Error(err.Error())
logger.Errorf("REPRBD: %s", err.Error())
return err
}

Expand All @@ -216,7 +219,7 @@ func DisableMirroringAllImagesInPool(poolName string) error {
poolStatus, err := GetRbdMirrorVerbosePoolStatus(poolName, "", "")
if err != nil {
err := fmt.Errorf("failed to fetch status for %s pool: %v", poolName, err)
logger.Error(err.Error())
logger.Errorf("REPRBD: %s", err.Error())
return err
}

Expand All @@ -234,39 +237,46 @@ func DisableMirroringAllImagesInPool(poolName string) error {
}

// getPeerUUID returns the peer ID for the requested peer name.
func getPeerUUID(pool string, peerName string, client string, cluster string) string {
func getPeerUUID(pool string, peerName string, client string, cluster string) (string, error) {
poolInfo, err := GetRbdMirrorPoolInfo(pool, cluster, client)
if err != nil {
logger.Error(err.Error())
return ""
return "", err
}

for _, peer := range poolInfo.Peers {
if peer.RemoteName == peerName {
return peer.Id
return peer.Id, nil
}
}

return ""
return "", fmt.Errorf("no peer found")
}

// RemovePeer removes the rbd-mirror peer permissions for requested pool.
func RemovePeer(pool string, localName string, remoteName string) error {
// find local site's peer with name $remoteName
localPeer := getPeerUUID(pool, remoteName, "", "")
remotePeer := getPeerUUID(pool, localName, localName, remoteName)
localPeer, err := getPeerUUID(pool, remoteName, "", "")
if err != nil {
return err
}

remotePeer, err := getPeerUUID(pool, localName, localName, remoteName)
if err != nil {
return err
}

// Remove local cluster's peer
err := peerRemove(pool, localPeer, "", "")
err = peerRemove(pool, localPeer, "", "")
if err != nil {
logger.Error(err.Error())
logger.Errorf("REPRBD: %s", err.Error())
return err
}

// Remove remote's peer
err = peerRemove(pool, remotePeer, localName, remoteName)
if err != nil {
logger.Error(err.Error())
logger.Errorf("REPRBD: %s", err.Error())
return err
}

Expand All @@ -278,14 +288,14 @@ func BootstrapPeer(pool string, localName string, remoteName string) error {
// create bootstrap token on local site.
token, err := peerBootstrapCreate(pool, "", localName)
if err != nil {
logger.Error(err.Error())
logger.Errorf("REPRBD: %s", err.Error())
return err
}

// persist the peer token
err = writeRemotePeerToken(token, remoteName)
if err != nil {
logger.Error(err.Error())
logger.Errorf("REPRBD: %s", err.Error())
return err
}

Expand All @@ -307,6 +317,7 @@ func configurePoolMirroring(pool string, mode types.RbdResourceType, localName s

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return fmt.Errorf("failed to execute rbd command: %v", err)
}

Expand All @@ -329,17 +340,20 @@ func configureImageMirroring(req types.RbdReplicationRequest) error {

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return fmt.Errorf("failed to configure rbd image feature: %v", err)
}

if mode == types.RbdReplicationSnapshot {
err = createSnapshot(pool, image)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return fmt.Errorf("failed to create image(%s/%s) snapshot : %v", pool, image, err)
}

err = configureSnapshotSchedule(pool, image, schedule, "")
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return fmt.Errorf("failed to create image(%s/%s) snapshot schedule(%s) : %v", pool, image, schedule, err)
}
}
Expand All @@ -354,12 +368,14 @@ func getSnapshotSchedule(pool string, image string) (imageSnapshotSchedule, erro

output, err := listSnapshotSchedule(pool, image)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return imageSnapshotSchedule{}, err
}

ret := []imageSnapshotSchedule{}
err = json.Unmarshal(output, &ret)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return imageSnapshotSchedule{}, nil
}

Expand All @@ -381,6 +397,7 @@ func listSnapshotSchedule(pool string, image string) ([]byte, error) {

output, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return []byte(""), err
}

Expand All @@ -401,7 +418,7 @@ func listAllImagesInPool(pool string, localName string, remoteName string) []str
var ret []string
err = json.Unmarshal([]byte(output), &ret)
if err != nil {
logger.Errorf("unexpected error encountered while parsing json output %s", output)
logger.Errorf("REPRBD: unexpected error encountered while parsing json output %s", output)
return []string{}
}

Expand Down Expand Up @@ -433,6 +450,7 @@ func configureSnapshotSchedule(pool string, image string, schedule string, start

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return err
}

Expand All @@ -445,6 +463,7 @@ func createSnapshot(pool string, image string) error {

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return err
}

Expand All @@ -458,6 +477,7 @@ func configureImageFeatures(pool string, image string, op string, feature string

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return fmt.Errorf("failed to configure rbd image feature: %v", err)
}

Expand All @@ -477,6 +497,7 @@ func peerBootstrapCreate(pool string, client string, cluster string) (string, er

output, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return "", fmt.Errorf("failed to bootstrap peer token: %v", err)
}

Expand All @@ -502,6 +523,7 @@ func peerBootstrapImport(pool string, client string, cluster string) error {

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return fmt.Errorf("failed to import peer bootstrap token: %v", err)
}

Expand All @@ -519,6 +541,7 @@ func peerRemove(pool string, peerId string, localName string, remoteName string)

_, err := processExec.RunCommand("rbd", args...)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return fmt.Errorf("failed to remove peer(%s) for pool(%s): %v", peerId, pool, err)
}

Expand Down Expand Up @@ -580,6 +603,7 @@ func writeRemotePeerToken(token string, remoteName string) error {

err := os.MkdirAll(tokenDirPath, constants.PermissionWorldNoAccess)
if err != nil {
logger.Errorf("REPRBD: %s", err.Error())
return fmt.Errorf("unable to create %q: %w", tokenDirPath, err)
}

Expand All @@ -591,15 +615,15 @@ func writeRemotePeerToken(token string, remoteName string) error {
file, err := os.Create(tokenFilePath)
if err != nil {
ne := fmt.Errorf("failed to create the token file(%s): %w", tokenFilePath, err)
logger.Error(ne.Error())
logger.Errorf("REPRBD: %s", ne.Error())
return ne
}

// write to file
_, err = file.WriteString(token)
if err != nil {
ne := fmt.Errorf("failed to write the token file(%s): %w", tokenFilePath, err)
logger.Error(ne.Error())
logger.Errorf("REPRBD: %s", ne.Error())
return ne
}

Expand Down
Loading

0 comments on commit 10c86c5

Please sign in to comment.