diff --git a/flyteadmin/pkg/repositories/config/migrations.go b/flyteadmin/pkg/repositories/config/migrations.go index 27da97f29c..d48356fe01 100644 --- a/flyteadmin/pkg/repositories/config/migrations.go +++ b/flyteadmin/pkg/repositories/config/migrations.go @@ -1263,20 +1263,28 @@ var ContinuedMigrations = []*gormigrate.Migration{ return tx.Migrator().DropTable("execution_tags") }, }, - { ID: "2024-06-06-drop-execution_admin-tags", Migrate: func(tx *gorm.DB) error { return tx.Migrator().DropTable("execution_admin_tags") }, }, - { ID: "2024-06-06-drop-admin-tags", Migrate: func(tx *gorm.DB) error { return tx.Migrator().DropTable("admin_tags") }, }, + { + ID: "2024-08-08-remove-input-uri-for-start-nodes", + Migrate: func(db *gorm.DB) error { + return db.Exec("UPDATE node_executions SET input_uri = '' WHERE node_id = 'start-node'").Error + }, + Rollback: func(db *gorm.DB) error { + // can't rollback missing data + return nil + }, + }, } var m = append(LegacyMigrations, NoopMigrations...) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 2c3103e4ad..b25ad64fb6 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -1248,10 +1248,17 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter targetEntity := common.GetTargetEntity(ctx, nCtx) - nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), - p, nCtx.InputReader().GetInputPath().String(), nCtx.NodeStatus(), nCtx.ExecutionContext().GetEventVersion(), - nCtx.ExecutionContext().GetParentInfo(), nCtx.Node(), c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase, - c.eventConfig, targetEntity) + nev, err := ToNodeExecutionEvent( + nCtx.NodeExecutionMetadata().GetNodeExecutionID(), + p, + nCtx.InputReader().GetInputPath().String(), + nCtx.NodeStatus(), + nCtx.ExecutionContext().GetEventVersion(), + nCtx.ExecutionContext().GetParentInfo(), nCtx.Node(), + c.clusterID, + nCtx.NodeStateReader().GetDynamicNodeState().Phase, + c.eventConfig, + targetEntity) if err != nil { return interfaces.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event") } diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 7fc4c05992..35ab105623 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -1723,6 +1723,7 @@ func TestNodeExecutor_FinalizeHandler(t *testing.T) { assert.NoError(t, exec.FinalizeHandler(ctx, nil, nil, nl, n)) }) } + func TestNodeExecutionEventStartNode(t *testing.T) { execID := &core.WorkflowExecutionIdentifier{ Name: "e1", @@ -1763,9 +1764,11 @@ func TestNodeExecutionEventStartNode(t *testing.T) { ns.OnGetParentTaskID().Return(tID) ns.OnGetOutputDirMatch(mock.Anything).Return("dummy://dummyOutUrl") ns.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{}) + ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{ RawOutputPolicy: config.RawOutputPolicyReference, }, subWfID) + assert.NoError(t, err) assert.Equal(t, "start-node", ev.Id.NodeId) assert.Equal(t, execID, ev.Id.ExecutionId) @@ -1778,6 +1781,7 @@ func TestNodeExecutionEventStartNode(t *testing.T) { ev.OutputResult.(*event.NodeExecutionEvent_OutputUri).OutputUri) assert.Equal(t, ev.ProducerId, testClusterID) assert.Equal(t, subWfID, ev.GetTargetEntity()) + assert.Nil(t, ev.InputValue) } func TestNodeExecutionEventV0(t *testing.T) { @@ -1821,6 +1825,7 @@ func TestNodeExecutionEventV0(t *testing.T) { assert.Empty(t, ev.NodeName) assert.Empty(t, ev.RetryGroup) assert.Empty(t, ev.TargetEntity) + assert.Equal(t, "reference", ev.GetInputUri()) } func TestNodeExecutionEventV1(t *testing.T) { @@ -1859,9 +1864,11 @@ func TestNodeExecutionEventV1(t *testing.T) { ns.OnGetPhase().Return(v1alpha1.NodePhaseNotYetStarted) nl.OnGetNodeExecutionStatusMatch(mock.Anything, id).Return(ns) ns.OnGetParentTaskID().Return(tID) + eventOpt, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion1, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{ RawOutputPolicy: config.RawOutputPolicyInline, }, nil) + assert.NoError(t, err) assert.Equal(t, "np1-2-n1", eventOpt.Id.NodeId) assert.Equal(t, execID, eventOpt.Id.ExecutionId) @@ -1875,6 +1882,7 @@ func TestNodeExecutionEventV1(t *testing.T) { assert.Equal(t, "2", eventOpt.RetryGroup) assert.True(t, proto.Equal(eventOpt.GetInputData(), inputs)) assert.Empty(t, eventOpt.TargetEntity) + assert.Equal(t, inputs, eventOpt.GetInputData()) } func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index c9f7d5fc76..a252d17344 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -76,7 +76,8 @@ func ToNodeExecEventPhase(p handler.EPhase) core.NodeExecution_Phase { } } -func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, +func ToNodeExecutionEvent( + nodeExecID *core.NodeExecutionIdentifier, info handler.PhaseInfo, inputPath string, status v1alpha1.ExecutableNodeStatus, @@ -109,9 +110,11 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, dynamicChain = true } + eInfo := info.GetInfo() var nev *event.NodeExecutionEvent - // Start node is special case where the Inputs and Outputs are the same and hence here we copy the Output file + // Start node is special case where the Outputs are the same and hence here we copy the Output file // into the OutputResult and in admin we copy it over into input as well. + // Start node doesn't have inputs. if nodeExecID.NodeId == v1alpha1.StartNodeID { outputsFile := v1alpha1.GetOutputsFile(status.GetOutputDir()) nev = &event.NodeExecutionEvent{ @@ -139,6 +142,17 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, TargetEntity: targetEntity, IsInDynamicChain: dynamicChain, } + if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline { + if eInfo != nil { + nev.InputValue = &event.NodeExecutionEvent_InputData{ + InputData: eInfo.Inputs, + } + } + } else { + nev.InputValue = &event.NodeExecutionEvent_InputUri{ + InputUri: inputPath, + } + } } if eventVersion == v1alpha1.EventVersion0 && status.GetParentTaskID() != nil { @@ -163,7 +177,6 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, nev.NodeName = node.GetName() } - eInfo := info.GetInfo() if eInfo != nil { if eInfo.WorkflowNodeInfo != nil { v := ToNodeExecWorkflowNodeMetadata(eInfo.WorkflowNodeInfo) @@ -201,17 +214,6 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, nev.IsParent = true } } - if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline { - if eInfo != nil { - nev.InputValue = &event.NodeExecutionEvent_InputData{ - InputData: eInfo.Inputs, - } - } - } else { - nev.InputValue = &event.NodeExecutionEvent_InputUri{ - InputUri: inputPath, - } - } return nev, nil }