Skip to content

Commit

Permalink
compatible with v1 and v1alpha2 cri api version
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
  • Loading branch information
mingzhou.swx committed Aug 4, 2023
1 parent da374ff commit 23b21f5
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 17 deletions.
12 changes: 6 additions & 6 deletions pkg/daemon/criruntime/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,6 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan
var typedVersion *runtimeapi.VersionResponse

switch cfg.runtimeType {
case ContainerRuntimeDocker:
imageService, err = runtimeimage.NewDockerImageService(cfg.runtimeURI, accountManager)
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
case ContainerRuntimeContainerd, ContainerRuntimeCommonCRI, ContainerRuntimePouch:
addr, _, err := kubeletutil.GetAddressAndDialer(cfg.runtimeRemoteURI)
if err != nil {
Expand All @@ -107,6 +101,12 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
case ContainerRuntimeDocker:
imageService, err = runtimeimage.NewDockerImageService(cfg.runtimeURI, accountManager)
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
continue
}
}

if _, err = imageService.ListImages(context.TODO()); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/daemon/criruntime/imageruntime/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func NewContainerdImageService(
accountManager: accountManager,
snapshotter: snapshotter,
client: client,
// TODO: compatible with v1alpha2 cri api
criImageClient: runtimeapi.NewImageServiceClient(conn),
httpProxy: httpProxy,
}, nil
Expand Down Expand Up @@ -325,6 +326,7 @@ func getDefaultValuesFromCRIStatus(conn *grpc.ClientConn) (snapshotter string, h
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()

// TODO: compatible with v1alpha2 cri api
rclient := runtimeapi.NewRuntimeServiceClient(conn)
resp, err := rclient.Status(ctx, &runtimeapi.StatusRequest{Verbose: true})
if err != nil {
Expand Down
165 changes: 154 additions & 11 deletions pkg/daemon/criruntime/imageruntime/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package imageruntime
import (
"context"
"io"
"reflect"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
Expand All @@ -26,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimeapiv1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cri/remote/util"
"k8s.io/kubernetes/pkg/util/parsers"
Expand Down Expand Up @@ -53,26 +55,47 @@ func NewCRIImageService(runtimeURI string, accountManager daemonutil.ImagePullAc
return nil, err
}

klog.V(4).InfoS("Finding the CRI API image version")
imageClient := runtimeapi.NewImageServiceClient(conn)

if _, err := imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{}); err == nil {
klog.V(2).InfoS("Using CRI v1 image API")
imageClientV1, imageClientV1alpha2, err := determineImageClientAPIVersion(conn)
if err != nil {
klog.ErrorS(err, "Failed to determine CRI image API version")
return nil, err
}

return &commonCRIImageService{
accountManager: accountManager,
criImageClient: imageClient,
accountManager: accountManager,
criImageClient: imageClientV1,
criImageClientV1alpha2: imageClientV1alpha2,
}, nil
}

type commonCRIImageService struct {
accountManager daemonutil.ImagePullAccountManager
criImageClient runtimeapi.ImageServiceClient
accountManager daemonutil.ImagePullAccountManager
criImageClient runtimeapi.ImageServiceClient
criImageClientV1alpha2 runtimeapiv1alpha2.ImageServiceClient
}

func (c *commonCRIImageService) useV1API() bool {
return c.criImageClientV1alpha2 == nil || reflect.ValueOf(c.criImageClientV1alpha2).IsNil()
}

// PullImage implements ImageService.PullImage.
func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) {
if c.useV1API() {
return c.pullImageV1(ctx, imageName, tag, pullSecrets, sandboxConfig)
}
return c.pullImageV1alpha2(ctx, imageName, tag, pullSecrets, sandboxConfig)
}

// ListImages implements ImageService.ListImages.
func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
if c.useV1API() {
return c.listImagesV1(ctx)
}
return c.listImagesV1alpha2(ctx)
}

// PullImage implements ImageService.PullImage using v1 CRI client.
func (c *commonCRIImageService) pullImageV1(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) {
registry := daemonutil.ParseRegistry(imageName)
fullImageName := imageName + ":" + tag
repoToPull, _, _, err := parsers.ParseImageName(fullImageName)
Expand Down Expand Up @@ -172,8 +195,8 @@ func (c *commonCRIImageService) PullImage(ctx context.Context, imageName, tag st
return newImagePullStatusReader(pipeR), nil
}

// ListImages implements ImageService.ListImages.
func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, error) {
// ListImages implements ImageService.ListImages using V1 CRI client.
func (c *commonCRIImageService) listImagesV1(ctx context.Context) ([]ImageInfo, error) {
listImagesReq := &runtimeapi.ListImagesRequest{}
listImagesResp, err := c.criImageClient.ListImages(ctx, listImagesReq)
if err != nil {
Expand All @@ -190,3 +213,123 @@ func (c *commonCRIImageService) ListImages(ctx context.Context) ([]ImageInfo, er
}
return collection, nil
}

// PullImage implements ImageService.PullImage using v1alpha2 CRI client.
func (c *commonCRIImageService) pullImageV1alpha2(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (ImagePullStatusReader, error) {
registry := daemonutil.ParseRegistry(imageName)
fullImageName := imageName + ":" + tag
repoToPull, _, _, err := parsers.ParseImageName(fullImageName)
if err != nil {
return nil, err
}
// Reader
pipeR, pipeW := io.Pipe()
defer pipeW.Close()

var auth *runtimeapiv1alpha2.AuthConfig
pullImageReq := &runtimeapiv1alpha2.PullImageRequest{
Image: &runtimeapiv1alpha2.ImageSpec{
Image: fullImageName,
Annotations: make(map[string]string),
},
Auth: auth, //default is nil
}
if sandboxConfig != nil {
pullImageReq.SandboxConfig = &runtimeapiv1alpha2.PodSandboxConfig{
Annotations: sandboxConfig.Annotations,
Labels: sandboxConfig.Labels,
}
if pullImageReq.SandboxConfig.Annotations == nil {
pullImageReq.SandboxConfig.Annotations = map[string]string{}
}
} else {
pullImageReq.SandboxConfig = &runtimeapiv1alpha2.PodSandboxConfig{
Annotations: map[string]string{},
}
}
// Add this default annotation to avoid unexpected panic caused by sandboxConfig is nil
// for some runtime implementations.
pullImageReq.SandboxConfig.Annotations[pullingImageSandboxConfigAnno] = "kruise-daemon"

if len(pullSecrets) > 0 {
var authInfos []daemonutil.AuthInfo
authInfos, err = convertToRegistryAuths(pullSecrets, repoToPull)
if err == nil {
var pullErrs []error
for _, authInfo := range authInfos {
var pullErr error
klog.V(5).Infof("Pull image %v:%v with user %v", imageName, tag, authInfo.Username)
pullImageReq.Auth = &runtimeapiv1alpha2.AuthConfig{
Username: authInfo.Username,
Password: authInfo.Password,
}
_, pullErr = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq)
if pullErr == nil {
pipeW.CloseWithError(io.EOF)
return newImagePullStatusReader(pipeR), nil
}
klog.Warningf("Failed to pull image %v:%v with user %v, err %v", imageName, tag, authInfo.Username, pullErr)
pullErrs = append(pullErrs, pullErr)

}
if len(pullErrs) > 0 {
err = utilerrors.NewAggregate(pullErrs)
}
}
}

// Try the default secret
if c.accountManager != nil {
var authInfo *daemonutil.AuthInfo
var defaultErr error
authInfo, defaultErr = c.accountManager.GetAccountInfo(registry)
if defaultErr != nil {
klog.Warningf("Failed to get account for registry %v, err %v", registry, defaultErr)
// When the default account acquisition fails, try to pull anonymously
} else if authInfo != nil {
klog.V(5).Infof("Pull image %v:%v with user %v", imageName, tag, authInfo.Username)
pullImageReq.Auth = &runtimeapiv1alpha2.AuthConfig{
Username: authInfo.Username,
Password: authInfo.Password,
}
_, err = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq)
if err == nil {
pipeW.CloseWithError(io.EOF)
return newImagePullStatusReader(pipeR), nil
}
klog.Warningf("Failed to pull image %v:%v, err %v", imageName, tag, err)
return nil, err
}
}

if err != nil {
return nil, err
}

// Anonymous pull
_, err = c.criImageClientV1alpha2.PullImage(ctx, pullImageReq)
if err != nil {
return nil, errors.Wrapf(err, "Failed to pull image reference %q", fullImageName)
}
pipeW.CloseWithError(io.EOF)
return newImagePullStatusReader(pipeR), nil
}

// ListImages implements ImageService.ListImages using V1alpha2 CRI client.
func (c *commonCRIImageService) listImagesV1alpha2(ctx context.Context) ([]ImageInfo, error) {
listImagesReq := &runtimeapiv1alpha2.ListImagesRequest{}
listImagesResp, err := c.criImageClientV1alpha2.ListImages(ctx, listImagesReq)
if err != nil {
return nil, err
}
collection := make([]ImageInfo, 0, len(listImagesResp.GetImages()))
for _, img := range listImagesResp.GetImages() {
collection = append(collection, ImageInfo{
ID: img.GetId(),
RepoTags: img.GetRepoTags(),
RepoDigests: img.GetRepoDigests(),
Size: int64(img.GetSize_()),
})
}
return collection, nil
}
27 changes: 27 additions & 0 deletions pkg/daemon/criruntime/imageruntime/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@ limitations under the License.
package imageruntime

import (
"context"
"encoding/json"
"fmt"
"io"
"time"

dockermessage "github.com/docker/docker/pkg/jsonmessage"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/util"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimeapiv1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/credentialprovider"
credentialprovidersecrets "k8s.io/kubernetes/pkg/credentialprovider/secrets"
Expand Down Expand Up @@ -239,3 +246,23 @@ func (c ImageInfo) ContainsImage(name string, tag string) bool {
}
return false
}

func determineImageClientAPIVersion(conn *grpc.ClientConn) (runtimeapi.ImageServiceClient, runtimeapiv1alpha2.ImageServiceClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

klog.V(4).InfoS("Finding the CRI API image version")
imageClientV1 := runtimeapi.NewImageServiceClient(conn)

_, err := imageClientV1.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{})
if err == nil {
klog.V(2).InfoS("Using CRI v1 image API")
return imageClientV1, nil, nil

} else if status.Code(err) == codes.Unimplemented {
klog.V(2).InfoS("Falling back to CRI v1alpha2 image API (deprecated in k8s 1.24)")
return nil, runtimeapiv1alpha2.NewImageServiceClient(conn), nil
}

return nil, nil, fmt.Errorf("unable to determine image API version: %w", err)
}

0 comments on commit 23b21f5

Please sign in to comment.