Skip to content

Commit

Permalink
Added literal offloading checks across heterogeneous tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
pmahindrakar-oss committed Sep 6, 2024
1 parent 16474e5 commit 114e004
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,
catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, nodeHandlerFactory, scope)
catalogClient, recoveryClient, cfg.LiteralOffloadingConfig, &cfg.EventConfig, cfg.ClusterID, signalClient, nodeHandlerFactory, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}
Expand Down
36 changes: 35 additions & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ type nodeExecutor struct {
defaultExecutionDeadline time.Duration
enqueueWorkflow v1alpha1.EnqueueWorkflow
eventConfig *config.EventConfig
literalOffloadingConfig config.LiteralOffloadingConfig
interruptibleFailureThreshold int32
maxNodeRetriesForSystemFailures uint32
metrics *nodeMetrics
Expand Down Expand Up @@ -764,6 +765,10 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur
}

if nodeInputs != nil {
p := c.checkOffloadingCompat(ctx, nCtx, nodeInputs.Literals, node)
if p != handler.PhaseInfoUndefined {
return p, nil
}
inputsFile := v1alpha1.GetInputsFile(dataDir)
if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
Expand All @@ -790,6 +795,34 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur
return handler.PhaseInfoNotReady("predecessor node not yet complete"), nil
}

// checkOffloadingCompat checks whether a node that is about to receive its inputs can consume
// offloaded literals.
//
// It first scans inputLiterals for any non-nil literal that carries offloaded metadata. For task
// nodes it then resolves the task from the execution context and, when an offloaded literal is
// present and literal offloading is enabled, asks the offloading config whether the task's runtime
// SDK type and version are supported. Node kinds other than task are not checked; a warning is
// logged and they are allowed through.
//
// It returns handler.PhaseInfoUndefined when the node may proceed. Otherwise it returns a failure
// PhaseInfo: a SYSTEM failure when the task node has no task ID or the task cannot be resolved, and
// a USER failure when the task's SDK version does not support offloaded literals.
func (c *nodeExecutor) checkOffloadingCompat(ctx context.Context, nCtx interfaces.NodeExecutionContext, inputLiterals map[string]*core.Literal, node v1alpha1.ExecutableNode) handler.PhaseInfo {
	isOffloadLiteral := false
	for _, val := range inputLiterals {
		if val != nil && val.GetOffloadedMetadata() != nil {
			isOffloadLiteral = true
			break
		}
	}

	switch node.GetKind() {
	case v1alpha1.NodeKindTask:
		// Guard the dereference: a task node without a task reference must fail the node,
		// not panic the executor.
		taskIDRef := node.GetTaskID()
		if taskIDRef == nil {
			return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "GetTaskIDFailure", "task node does not have a task ID", nil)
		}
		taskID := *taskIDRef

		taskNode, err := nCtx.ExecutionContext().GetTask(taskID)
		if err != nil {
			return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "GetTaskIDFailure", err.Error(), nil)
		}

		runtimeData := taskNode.CoreTask().GetMetadata().GetRuntime()
		// Only reject when all three hold: an offloaded literal is actually being consumed, the
		// feature is enabled, and the task's SDK is reported as unsupported.
		if isOffloadLiteral && (c.literalOffloadingConfig.Enabled && !c.literalOffloadingConfig.IsSupportedSDKVersion(runtimeData.GetType().String(), runtimeData.GetVersion())) {
			logger.Debugf(ctx, "literal offloading : sdk version check failed for task [%s]", taskID)
			return handler.PhaseInfoFailure(core.ExecutionError_USER, "LiteralOffloadingNotSupported", "Literal offloading is not supported for this task", nil)
		}
	default:
		logger.Warnf(ctx, "literal offloading : skipping sdk version check for node kind '%s'", node.GetKind())
	}
	return handler.PhaseInfoUndefined
}

func isTimeoutExpired(queuedAt *metav1.Time, timeout time.Duration) bool {
if !queuedAt.IsZero() && timeout != 0 {
deadline := queuedAt.Add(timeout)
Expand Down Expand Up @@ -1417,7 +1450,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur

func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink,
workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client,
catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, clusterID string, signalClient service.SignalServiceClient,
catalogClient catalog.Client, recoveryClient recovery.Client, literalOffloadingConfig config.LiteralOffloadingConfig, eventConfig *config.EventConfig, clusterID string, signalClient service.SignalServiceClient,
nodeHandlerFactory interfaces.HandlerFactory, scope promutils.Scope) (interfaces.Node, error) {

// TODO we may want to make this configurable.
Expand Down Expand Up @@ -1469,6 +1502,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration,
enqueueWorkflow: enQWorkflow,
eventConfig: eventConfig,
literalOffloadingConfig: literalOffloadingConfig,
interruptibleFailureThreshold: nodeConfig.InterruptibleFailureThreshold,
maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures),
metrics: metrics,
Expand Down

0 comments on commit 114e004

Please sign in to comment.