Skip to content

Commit

Permalink
Merge pull request #324 from shriramsharma/generate-vs-cross-cluster-…
Browse files Browse the repository at this point in the history
…routing

generating ingress virtualservice to enable cross-cluster communication with passthrough gateway
  • Loading branch information
shriramsharma authored Aug 26, 2024
2 parents 09565cb + 91d212c commit 2d8556d
Show file tree
Hide file tree
Showing 6 changed files with 841 additions and 1 deletion.
6 changes: 6 additions & 0 deletions admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ func GetRootCmd(args []string) *cobra.Command {
rootCmd.PersistentFlags().StringVar(&params.ShardIdentityValue, "shard_identity_value", "", "Admiral operator should watch shards where shardIdentityLabelKey == shardIdentityValue")
rootCmd.PersistentFlags().StringVar(&params.OperatorSecretFilterTags, "operator_secret_filter_tags", "admiral/syncoperator",
"Filter tags for the specific admiral operator namespace secret to watch")

// Flags pertaining to VS based routing
rootCmd.PersistentFlags().BoolVar(&params.EnableVSRouting, "enable_vs_routing", false, "Enable/Disable VS Based Routing")
rootCmd.PersistentFlags().StringArrayVar(&params.VSRoutingGateways, "vs_routing_gateways", []string{}, "The PASSTHROUGH gateways to use for VS based routing")
rootCmd.PersistentFlags().StringArrayVar(&params.IngressVSExportToNamespaces, "ingress_vs_export_to_namespaces", []string{"istio-system"}, "List of namespaces where the ingress VS should be exported")

return rootCmd
}

Expand Down
32 changes: 32 additions & 0 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
gtpManagedByMeshAgent = "mesh-agent"
gtpManagerMeshAgentFieldValue = "ewok-mesh-agent"
errorCluster = "error-cluster"
ingressVSGenerationErrorMessage = "skipped generating ingress virtual service on cluster %s due to error %w"
)

func createServiceEntryForDeployment(
Expand Down Expand Up @@ -159,6 +160,7 @@ func modifyServiceEntryForNewServiceOrPod(
sourceClusters []string
isAdditionalEndpointGenerationEnabled bool
deployRolloutMigration = make(map[string]bool)
sourceIngressVirtualService = make(map[string]*v1alpha3.VirtualService)
)

clusterName, ok := ctx.Value(common.ClusterName).(string)
Expand Down Expand Up @@ -290,6 +292,16 @@ func modifyServiceEntryForNewServiceOrPod(
Namespace: namespace,
Type: common.Deployment,
}
if common.IsVSBasedRoutingEnabled() {
err := generateIngressVirtualServiceForDeployment(deployment, sourceIngressVirtualService)
if err != nil {
err = fmt.Errorf(ingressVSGenerationErrorMessage, clusterId, err)
ctxLogger.Errorf(common.CtxLogFormat, "generateIngressVirtualServiceForDeployment",
deployment.Name, deployment.Namespace, clusterId, err.Error())
modifySEerr = common.AppendError(modifySEerr, err)
}
}

}

if rollout != nil {
Expand Down Expand Up @@ -339,6 +351,16 @@ func modifyServiceEntryForNewServiceOrPod(
Namespace: namespace,
Type: common.Rollout,
}

if common.IsVSBasedRoutingEnabled() {
err := generateIngressVirtualServiceForRollout(rollout, sourceIngressVirtualService)
if err != nil {
err = fmt.Errorf(ingressVSGenerationErrorMessage, clusterId, err)
ctxLogger.Errorf(common.CtxLogFormat, "generateIngressVirtualServiceForRollout",
rollout.Name, rollout.Namespace, clusterId, err.Error())
modifySEerr = common.AppendError(modifySEerr, err)
}
}
}

start = time.Now()
Expand Down Expand Up @@ -768,6 +790,16 @@ func modifyServiceEntryForNewServiceOrPod(
ctxLogger.Infof(common.CtxLogFormat, "updateRegistryConfigForClusterPerEnvironment", deploymentOrRolloutName, deploymentOrRolloutNS, "", "done writing")
return nil, nil
}
// If VS based routing is enabled, generate VirtualServices for the source cluster's ingress
// This is done after the ServiceEntries are created for the source cluster
if common.IsVSBasedRoutingEnabled() && len(sourceIngressVirtualService) > 0 {
err := addUpdateVirtualServicesForSourceIngress(
ctx, ctxLogger, remoteRegistry, sourceServices, sourceIngressVirtualService, sourceDeployments, sourceRollouts)
if err != nil {
modifySEerr = common.AppendError(modifySEerr, err)
}
}

//Write to dependent clusters
start = time.Now()
isServiceEntryModifyCalledForSourceCluster = false
Expand Down
211 changes: 210 additions & 1 deletion admiral/pkg/clusters/virtualservice_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (

"github.com/google/uuid"
commonUtil "github.com/istio-ecosystem/admiral/admiral/pkg/util"
k8sAppsV1 "k8s.io/api/apps/v1"
k8sV1 "k8s.io/api/core/v1"

argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
constUtil "github.com/istio-ecosystem/admiral/admiral/pkg/util"
log "github.com/sirupsen/logrus"
networkingV1Alpha3 "istio.io/api/networking/v1alpha3"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
Expand All @@ -22,6 +25,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
vsRoutingLabel = "admiral.io/vs-routing"
)

// NewVirtualServiceHandler returns a new instance of VirtualServiceHandler after verifying
// the required properties are set correctly
func NewVirtualServiceHandler(remoteRegistry *RemoteRegistry, clusterID string) (*VirtualServiceHandler, error) {
Expand Down Expand Up @@ -474,7 +481,16 @@ func addUpdateVirtualService(
newCopy.Annotations = map[string]string{}
}
newCopy.Annotations["app.kubernetes.io/created-by"] = "admiral"
if common.EnableExportTo(newCopy.Spec.Hosts[0]) {

skipAddingExportTo := false
//Check if VS has the admiral.io/vs-routing label
// If it does, skip adding ExportTo since it is already set to "istio-system" only
// The VS created for routing cross cluster traffic should only be exported to istio-system
if newCopy.Labels != nil && newCopy.Labels[vsRoutingLabel] == "enabled" {
skipAddingExportTo = true
}

if common.EnableExportTo(newCopy.Spec.Hosts[0]) && !skipAddingExportTo {
sortedDependentNamespaces := getSortedDependentNamespaces(rr.AdmiralCache, newCopy.Spec.Hosts[0], rc.ClusterID, ctxLogger)
newCopy.Spec.ExportTo = sortedDependentNamespaces
ctxLogger.Infof(LogFormat, "ExportTo", common.VirtualServiceResourceType, newCopy.Name, rc.ClusterID, fmt.Sprintf("VS usecase-ExportTo updated to %v", newCopy.Spec.ExportTo))
Expand Down Expand Up @@ -612,3 +628,196 @@ func deleteVirtualService(ctx context.Context, exist *v1alpha3.VirtualService, n
}
return nil
}

// getBaseVirtualServiceForIngress generates the base virtual service for the ingress gateway
// The destinations should be added separately, this func does not have the context of the destinations.
// TODO: Support for multiple hosts and sniHosts. Will be needed when additional endpoints are generated using GTPs
func getBaseVirtualServiceForIngress(host, sniHost string) (*v1alpha3.VirtualService, error) {

if host == "" {
return nil, fmt.Errorf("host is empty")
}

if sniHost == "" {
return nil, fmt.Errorf("sniHost is empty")
}

gateways := common.GetVSRoutingGateways()
if len(gateways) == 0 {
return nil, fmt.Errorf("no gateways configured for ingress virtual service")
}

vs := networkingV1Alpha3.VirtualService{
// We are using the SNI host in hosts field as they need to match
Hosts: []string{sniHost},
Gateways: gateways,
ExportTo: common.GetIngressVSExportToNamespace(),
Tls: []*networkingV1Alpha3.TLSRoute{
{
Match: []*networkingV1Alpha3.TLSMatchAttributes{
{
Port: common.DefaultMtlsPort,
SniHosts: []string{sniHost},
},
},
},
},
}

// Explicitly labeling the VS for routing
vsLabels := map[string]string{
vsRoutingLabel: "enabled",
}

return &v1alpha3.VirtualService{
ObjectMeta: metaV1.ObjectMeta{
Name: host + "-routing-vs",
Namespace: common.GetSyncNamespace(),
Labels: vsLabels,
},
Spec: vs,
}, nil
}

// generateIngressVirtualServiceForDeployment generates the base virtual service for a given deployment
// and adds it to the sourceIngressVirtualService map
// sourceIngressVirtualService is a map of globalFQDN (.mesh) endpoint to VirtualService
func generateIngressVirtualServiceForDeployment(
deployment *k8sAppsV1.Deployment,
sourceIngressVirtualService map[string]*v1alpha3.VirtualService) error {
if deployment == nil {
return fmt.Errorf("deployment is nil")
}
workloadIdentityKey := common.GetWorkloadIdentifier()
cname := common.GetCname(deployment, workloadIdentityKey, common.GetHostnameSuffix())
if cname == "" {
return fmt.Errorf("cname is empty")
}
sniHost, err := generateSNIHost(cname)
if err != nil {
return err
}
baseVS, err := getBaseVirtualServiceForIngress(cname, sniHost)
if err != nil {
return err
}
sourceIngressVirtualService[cname] = baseVS
return nil
}

// generateIngressVirtualServiceForRollout generates the base virtual service for a given rollout
// and adds it to the sourceIngressVirtualService map
// sourceIngressVirtualService is a map of globalFQDN (.mesh) endpoint to VirtualService
// TODO: Generate VS for canary and blue green strategies
func generateIngressVirtualServiceForRollout(
rollout *argo.Rollout,
sourceIngressVirtualService map[string]*v1alpha3.VirtualService) error {
if rollout == nil {
return fmt.Errorf("rollout is nil")
}
workloadIdentityKey := common.GetWorkloadIdentifier()
cname := common.GetCnameForRollout(rollout, workloadIdentityKey, common.GetHostnameSuffix())
if cname == "" {
return fmt.Errorf("cname is empty")
}
sniHost, err := generateSNIHost(cname)
if err != nil {
return err
}
baseVS, err := getBaseVirtualServiceForIngress(cname, sniHost)
if err != nil {
return err
}
sourceIngressVirtualService[cname] = baseVS
return nil
}

// generateSNIHost generates the SNI host for the virtual service in the format outbound_.80_._.<fqdn>
// Example: outbound_.80_._.httpbin.global.mesh
func generateSNIHost(fqdn string) (string, error) {
if fqdn == "" {
return "", fmt.Errorf("fqdn is empty")
}
return fmt.Sprintf("outbound_.%d_._.%s", common.DefaultServiceEntryPort, fqdn), nil
}

// addUpdateVirtualServicesForSourceIngress adds or updates the cross-cluster routing VirtualServices exported to
// istio-system namespace.
func addUpdateVirtualServicesForSourceIngress(
ctx context.Context,
ctxLogger *log.Entry,
remoteRegistry *RemoteRegistry,
sourceServices map[string]map[string]*k8sV1.Service,
sourceIngressVirtualService map[string]*v1alpha3.VirtualService,
sourceDeployment map[string]*k8sAppsV1.Deployment,
sourceRollout map[string]*argo.Rollout) error {

if remoteRegistry == nil {
return fmt.Errorf("remoteRegistry is nil")
}

for sourceCluster, serviceInstance := range sourceServices {

destinationHostPort := make(map[string]uint32)
if serviceInstance[common.Deployment] != nil {
host := serviceInstance[common.Deployment].Name + "." + serviceInstance[common.Deployment].Namespace + ".svc.cluster.local"
protocolPortMap := common.GetMeshPortsForDeployments(sourceCluster, serviceInstance[common.Deployment], sourceDeployment[sourceCluster])
if port, ok := protocolPortMap[constUtil.Http]; ok {
destinationHostPort[host] = port
}
}
if serviceInstance[common.Rollout] != nil {
host := serviceInstance[common.Rollout].Name + "." + serviceInstance[common.Rollout].Namespace + ".svc.cluster.local"
protocolPortMap := GetMeshPortsForRollout(sourceCluster, serviceInstance[common.Deployment], sourceRollout[sourceCluster])
if port, ok := protocolPortMap[constUtil.Http]; ok {
destinationHostPort[host] = port
}
}

if len(destinationHostPort) == 0 {
return fmt.Errorf("no destination found for the ingress virtualservice")
}

rc := remoteRegistry.GetRemoteController(sourceCluster)

if rc == nil {
ctxLogger.Warnf(common.CtxLogFormat, "addUpdateVirtualServicesForSourceIngress",
"", "", sourceCluster, "remote controller not initialized on this cluster")
continue
}

for fqdn, virtualService := range sourceIngressVirtualService {
if len(virtualService.Spec.Tls) == 0 {
return fmt.Errorf("no TLSRoute found in the ingress virtualservice with host %s", fqdn)
}
if len(virtualService.Spec.Tls) > 1 {
return fmt.Errorf("more than one TLSRoute found in the ingress virtualservice with host %s", fqdn)
}
routeDestinations := make([]*networkingV1Alpha3.RouteDestination, 0)
for host, port := range destinationHostPort {
routeDestination := &networkingV1Alpha3.RouteDestination{
Destination: &networkingV1Alpha3.Destination{
Host: host,
Port: &networkingV1Alpha3.PortSelector{
Number: port,
},
},
}
routeDestinations = append(routeDestinations, routeDestination)
}
virtualService.Spec.Tls[0].Route = routeDestinations

existingVS, err := getExistingVS(ctxLogger, ctx, rc, virtualService.Name)
if err != nil {
ctxLogger.Warn(err.Error())
}

ctxLogger.Infof(common.CtxLogFormat, "addUpdateVirtualServicesForSourceIngress",
virtualService.Name, virtualService.Namespace, sourceCluster, "Add/Update ingress virtualservice")
err = addUpdateVirtualService(
ctxLogger, ctx, virtualService, existingVS, common.GetSyncNamespace(), rc, remoteRegistry)

}
}
return nil
}
Loading

0 comments on commit 2d8556d

Please sign in to comment.