Skip to content

Commit

Permalink
discover: sync DirectPV nodes with CSI nodes (#795)
Browse files Browse the repository at this point in the history
Signed-off-by: Bala.FA <bala@minio.io>
  • Loading branch information
balamurugana authored Jun 13, 2023
1 parent d49add7 commit 514fc98
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 50 deletions.
44 changes: 44 additions & 0 deletions cmd/kubectl-directpv/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,51 @@ func discoverDevices(ctx context.Context, nodes []types.Node, teaProgram *tea.Pr
}
}

func syncNodes(ctx context.Context) (err error) {
csiNodes, err := getCSINodes(ctx)
if err != nil {
return fmt.Errorf("unable to get CSI nodes; %w", err)
}

nodes, err := node.NewLister().Get(ctx)
if err != nil {
return fmt.Errorf("unable to get nodes; %w", err)
}

var nodeNames []string
for _, node := range nodes {
nodeNames = append(nodeNames, node.Name)
}

// Add missing nodes.
for _, csiNode := range csiNodes {
if !utils.Contains(nodeNames, csiNode) {
node := types.NewNode(directpvtypes.NodeID(csiNode), nil)
node.Spec.Refresh = true
if _, err = client.NodeClient().Create(ctx, node, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("unable to create node %v; %w", csiNode, err)
}
}
}

// Remove non-existing nodes.
for _, nodeName := range nodeNames {
if !utils.Contains(csiNodes, nodeName) {
if err = client.NodeClient().Delete(ctx, nodeName, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("unable to remove non-existing node %v; %w", nodeName, err)
}
}
}

return nil
}

func discoverMain(ctx context.Context) {
if err := syncNodes(ctx); err != nil {
utils.Eprintf(quietFlag, true, "sync failed; %v\n", err)
os.Exit(1)
}

nodes, err := node.NewLister().
NodeSelector(toLabelValues(nodesArgs)).
Get(ctx)
Expand Down
51 changes: 1 addition & 50 deletions cmd/kubectl-directpv/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/dustin/go-humanize"
"github.com/fatih/color"
Expand All @@ -32,10 +31,7 @@ import (
"github.com/minio/directpv/pkg/utils"
"github.com/minio/directpv/pkg/volume"
"github.com/spf13/cobra"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
)

var infoCmd = &cobra.Command{
Expand Down Expand Up @@ -70,57 +66,12 @@ func infoMain(ctx context.Context) {
os.Exit(1)
}

storageClient, gvk, err := k8s.GetClientForNonCoreGroupVersionKind("storage.k8s.io", "CSINode", "v1", "v1beta1", "v1alpha1")
nodeList, err := getCSINodes(ctx)
if err != nil {
utils.Eprintf(quietFlag, true, "%v\n", err)
os.Exit(1)
}

nodeList := []string{}
switch gvk.Version {
case "v1":
result := &storagev1.CSINodeList{}
if err := storageClient.Get().
Resource("csinodes").
VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
Timeout(10 * time.Second).
Do(ctx).
Into(result); err != nil {
utils.Eprintf(quietFlag, true, "unable to get CSI nodes; %v\n", err)
os.Exit(1)
}
for _, csiNode := range result.Items {
for _, driver := range csiNode.Spec.Drivers {
if driver.Name == consts.Identity {
nodeList = append(nodeList, csiNode.Name)
break
}
}
}
case "v1beta1":
result := &storagev1beta1.CSINodeList{}
if err := storageClient.Get().
Resource(gvk.Kind).
VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
Timeout(10 * time.Second).
Do(ctx).
Into(result); err != nil {
utils.Eprintf(quietFlag, true, "unable to get CSI nodes; %v\n", err)
os.Exit(1)
}
for _, csiNode := range result.Items {
for _, driver := range csiNode.Spec.Drivers {
if driver.Name == consts.Identity {
nodeList = append(nodeList, csiNode.Name)
break
}
}
}
case "v1apha1":
utils.Eprintf(quietFlag, true, "storage.k8s.io/v1alpha1 is not supported\n")
os.Exit(1)
}

if len(nodeList) == 0 {
utils.Eprintf(quietFlag, true, "%v not installed\n", consts.AppPrettyName)
os.Exit(1)
Expand Down
59 changes: 59 additions & 0 deletions cmd/kubectl-directpv/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@
package main

import (
"context"
"errors"
"fmt"
"os"
"path"
"time"

"github.com/dustin/go-humanize"
"github.com/jedib0t/go-pretty/v6/table"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/k8s"
"github.com/minio/directpv/pkg/utils"
"github.com/mitchellh/go-homedir"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -127,3 +134,55 @@ func validateOutputFormat(isWideSupported bool) error {
}
return nil
}

func getCSINodes(ctx context.Context) (nodes []string, err error) {
storageClient, gvk, err := k8s.GetClientForNonCoreGroupVersionKind("storage.k8s.io", "CSINode", "v1", "v1beta1", "v1alpha1")
if err != nil {
return nil, err
}

switch gvk.Version {
case "v1apha1":
err = fmt.Errorf("unsupported CSINode storage.k8s.io/v1alpha1")
case "v1":
result := &storagev1.CSINodeList{}
if err = storageClient.Get().
Resource("csinodes").
VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
Timeout(10 * time.Second).
Do(ctx).
Into(result); err != nil {
err = fmt.Errorf("unable to get csinodes; %w", err)
break
}
for _, csiNode := range result.Items {
for _, driver := range csiNode.Spec.Drivers {
if driver.Name == consts.Identity {
nodes = append(nodes, csiNode.Name)
break
}
}
}
case "v1beta1":
result := &storagev1beta1.CSINodeList{}
if err = storageClient.Get().
Resource(gvk.Kind).
VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
Timeout(10 * time.Second).
Do(ctx).
Into(result); err != nil {
err = fmt.Errorf("unable to get csinodes; %w", err)
break
}
for _, csiNode := range result.Items {
for _, driver := range csiNode.Spec.Drivers {
if driver.Name == consts.Identity {
nodes = append(nodes, csiNode.Name)
break
}
}
}
}

return nodes, err
}

0 comments on commit 514fc98

Please sign in to comment.