Skip to content

Commit

Permalink
Create Reaper StatefulSet (part 1)
Browse files Browse the repository at this point in the history
  • Loading branch information
emerkle826 committed May 8, 2024
1 parent caa8b5b commit a846f89
Show file tree
Hide file tree
Showing 3 changed files with 424 additions and 17 deletions.
141 changes: 132 additions & 9 deletions controllers/reaper/reaper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,29 @@ func (r *ReaperReconciler) reconcile(ctx context.Context, actualReaper *reaperap
actualReaper.Status.Progress = reaperapi.ReaperProgressPending
actualReaper.Status.SetNotReady()

var actualDc *cassdcapi.CassandraDatacenter
// skip DC reconcile if Reaper is in Control Plane
if actualReaper.Spec.ControlPlaneMode {
logger.Info("Skipping Reaper DC reconcile, Reaper is in Control Plane mode")
logger.Info("Reconciling Reaper in Control Plane mode as StatefulSet")
if result, err := r.reconcileAsStatefulSet(ctx, actualReaper, logger); !result.IsZero() || err != nil {
return result, err
}
} else {
dc, result, err := r.reconcileDatacenter(ctx, actualReaper, logger)
if !result.IsZero() || err != nil {
logger.Info("Reconciling Reaper as Deployment")
if result, err := r.reconcileAsDeployment(ctx, actualReaper, logger); !result.IsZero() || err != nil {
return result, err
}
actualDc = dc
}

actualReaper.Status.Progress = reaperapi.ReaperProgressRunning
actualReaper.Status.SetReady()

logger.Info("Reaper successfully reconciled")
return ctrl.Result{}, nil
}

func (r *ReaperReconciler) reconcileAsDeployment(ctx context.Context, actualReaper *reaperapi.Reaper, logger logr.Logger) (ctrl.Result, error) {
actualDc, result, err := r.reconcileDatacenter(ctx, actualReaper, logger)
if !result.IsZero() || err != nil {
return result, err
}

actualReaper.Status.Progress = reaperapi.ReaperProgressDeploying
Expand All @@ -119,13 +132,26 @@ func (r *ReaperReconciler) reconcile(ctx context.Context, actualReaper *reaperap
return result, err
}

actualReaper.Status.Progress = reaperapi.ReaperProgressRunning
actualReaper.Status.SetReady()
return ctrl.Result{}, nil
}

func (r *ReaperReconciler) reconcileAsStatefulSet(ctx context.Context, actualReaper *reaperapi.Reaper, logger logr.Logger) (ctrl.Result, error) {
actualReaper.Status.Progress = reaperapi.ReaperProgressDeploying

if result, err := r.reconcileStatefulSet(ctx, actualReaper, logger); !result.IsZero() || err != nil {
return result, err
}

if result, err := r.reconcileService(ctx, actualReaper, logger); !result.IsZero() || err != nil {
return result, err
}

actualReaper.Status.Progress = reaperapi.ReaperProgressConfiguring

logger.Info("Reaper successfully reconciled")
return ctrl.Result{}, nil
}


func (r *ReaperReconciler) reconcileDatacenter(
ctx context.Context,
actualReaper *reaperapi.Reaper,
Expand Down Expand Up @@ -254,6 +280,103 @@ func (r *ReaperReconciler) reconcileDeployment(
return ctrl.Result{}, nil
}

func (r *ReaperReconciler) reconcileStatefulSet(
ctx context.Context,
actualReaper *reaperapi.Reaper,
logger logr.Logger,
) (ctrl.Result, error) {

statefuleSetKey := types.NamespacedName{Namespace: actualReaper.Namespace, Name: actualReaper.Name}
logger = logger.WithValues("StatefulSet", statefuleSetKey)
logger.Info("Reconciling Reaper StatefulSet, req was %#v", actualReaper)

authVars, err := r.collectAuthVars(ctx, actualReaper, logger)
if err != nil {
logger.Error(err, "Failed to collect Reaper auth variables")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
}
logger.Info("Collected Reaper auth variables", "authVars", authVars)

var keystorePassword *string
var truststorePassword *string

if actualReaper.Spec.ClientEncryptionStores != nil && !actualReaper.Spec.UseExternalSecrets() {
if password, err := cassandra.ReadEncryptionStorePassword(ctx, actualReaper.Namespace, r.Client, actualReaper.Spec.ClientEncryptionStores, encryption.StoreNameKeystore); err != nil {
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
} else {
keystorePassword = pointer.String(password)
}

if password, err := cassandra.ReadEncryptionStorePassword(ctx, actualReaper.Namespace, r.Client, actualReaper.Spec.ClientEncryptionStores, encryption.StoreNameTruststore); err != nil {
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
} else {
truststorePassword = pointer.String(password)
}
}

// TODO reconcile Vector configmap
//if vectorReconcileResult, err := r.reconcileVectorConfigMap(ctx, *actualReaper, actualDc, r.Client, logger); err != nil {
// return vectorReconcileResult, err
//} else if vectorReconcileResult.Requeue {
// return vectorReconcileResult, nil
//}

logger.Info("Reconciling reaper statefulset", "actualReaper", actualReaper)
desiredStatefulSet := reaper.NewStatefulSet(actualReaper, keystorePassword, truststorePassword, logger, authVars...)

actualStatefuleSet := &appsv1.StatefulSet{}
if err := r.Get(ctx, statefuleSetKey, actualStatefuleSet); err != nil {
if errors.IsNotFound(err) {
if err = controllerutil.SetControllerReference(actualReaper, desiredStatefulSet, r.Scheme); err != nil {
logger.Error(err, "Failed to set owner on Reaper StatefulSet")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
} else if err = r.Create(ctx, desiredStatefulSet); err != nil {
if errors.IsAlreadyExists(err) {
// the read from the local cache didn't catch that the resource was created
// already; simply requeue until the cache is up-to-date
return ctrl.Result{Requeue: true}, nil
} else {
logger.Error(err, "Failed to create Reaper StatefulSet")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
}
}
logger.Info("Reaper StatefulSet created successfully")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
} else {
logger.Error(err, "Failed to get Reaper StatefulSet")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
}
}

if err = r.reconcileReaperTelemetry(ctx, actualReaper, logger, r.Client); err != nil {
logger.Error(err, "reconcileReaperTelemetry failed")
return ctrl.Result{}, err
}

actualStatefuleSet = actualStatefuleSet.DeepCopy()

// Check if the statefulset needs to be updated
if !annotations.CompareHashAnnotations(actualStatefuleSet, desiredStatefulSet) {
logger.Info("Updating Reaper StatefulSet")
resourceVersion := actualStatefuleSet.GetResourceVersion()
desiredStatefulSet.DeepCopyInto(actualStatefuleSet)
actualStatefuleSet.SetResourceVersion(resourceVersion)
if err := controllerutil.SetControllerReference(actualReaper, actualStatefuleSet, r.Scheme); err != nil {
logger.Error(err, "Failed to set controller reference on updated Reaper StatefulSet")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
} else if err := r.Update(ctx, actualStatefuleSet); err != nil {
logger.Error(err, "Failed to update Reaper StatefulSet")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
} else {
logger.Info("Reaper StatefulSet updated successfully")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
}
}

logger.Info("Reaper StatefulSet ready")
return ctrl.Result{}, nil
}

func (r *ReaperReconciler) reconcileService(
ctx context.Context,
actualReaper *reaperapi.Reaper,
Expand Down
Loading

0 comments on commit a846f89

Please sign in to comment.