
Added literal offloading for array node map tasks #5697

Merged
merged 9 commits into master from offload-literal-upstream-2 on Sep 16, 2024

Conversation

pmahindrakar-oss
Contributor

@pmahindrakar-oss pmahindrakar-oss commented Aug 27, 2024

Tracking issue

RFC #5103
This PR handles map task output results that exceed the configured minimum offloading size.

Why are the changes needed?

Without these changes, we hit the gRPC message size limit when propeller tries to create an execution by passing inline the inputs it received from the output of a large map task.

Here you can see the failure where we hit the default 4 MB gRPC message size limit. This is the line where it fails:

_, err = a.adminClient.CreateExecution(ctx, req)

[UserError] failed to launch workflow, caused by: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (26624663 vs. 4194304)
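For context, 4194304 bytes is gRPC's default maximum receive message size on the server side. Raising that limit is the usual stopgap; the snippet below only illustrates where the 4 MiB default comes from and is not Flyte code. This PR removes the need for it by offloading large literals instead.

package main

import "google.golang.org/grpc"

func main() {
	// gRPC servers reject incoming messages larger than 4 MiB (4194304 bytes) by
	// default, which is exactly the limit reported in the error above. A server can
	// raise it, but offloading keeps the request small in the first place.
	_ = grpc.NewServer(
		grpc.MaxRecvMsgSize(64 * 1024 * 1024), // 64 MiB; value is illustrative
	)
}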

See the testing details below for the workflow used, in particular this section:

def ref_wf(mbs: int):
    processed_results = my_wf(mbs=mbs)  # Promise of List[str]
    big_inputs_wf_lp(input=processed_results) 

Here the large task output returned in processed_results is passed to the launch plan. Propeller sends these inputs inline, so if the value is too large we exceed the gRPC limit.

What changes were proposed in this pull request?

  • Adds a literal offloading config that defines the minimum and maximum offloading sizes for a literal, as well as the minimum supported SDK version.
  • After the outputs are gathered from the map tasks and before the combined output literal is written, the data is written to an offloaded location; the literal value is cleared and replaced with a reference to that location. This way, when propeller creates an execution, the inputs are small and contain only references to the data rather than the data itself. A rough sketch of this flow appears below.
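The following Go sketch illustrates that flow. All names here (OffloadingConfig, Literal, BlobStore, maybeOffload) are illustrative stand-ins rather than the actual flytepropeller identifiers; the sketch only shows the shape of the config and the offload-and-replace step.

package offload

import (
	"context"
	"fmt"
)

// OffloadingConfig mirrors the idea of the new propeller config: literals whose size
// falls between the min and max thresholds are written to blob storage instead of
// being kept inline. Field names are illustrative, not the actual config keys.
type OffloadingConfig struct {
	Enabled                  bool
	MinSizeInMBForOffloading int64
	MaxSizeInMBForOffloading int64
	MinSupportedSDKVersion   string
}

// Literal is a stand-in for flyteidl's core.Literal: either an inline value or a
// reference to offloaded metadata (a URI plus a size).
type Literal struct {
	InlineValue   []byte
	OffloadedURI  string
	OffloadedSize int64
}

// BlobStore is a stand-in for propeller's data store abstraction.
type BlobStore interface {
	Write(ctx context.Context, uri string, data []byte) error
}

// maybeOffload replaces a large inline literal with a pointer to its offloaded copy,
// so CreateExecution requests carry references instead of the data itself.
func maybeOffload(ctx context.Context, cfg OffloadingConfig, store BlobStore, lit *Literal, uri string) error {
	size := int64(len(lit.InlineValue))
	minBytes := cfg.MinSizeInMBForOffloading * 1024 * 1024
	maxBytes := cfg.MaxSizeInMBForOffloading * 1024 * 1024

	if !cfg.Enabled || size < minBytes {
		return nil // small enough to stay inline
	}
	if size > maxBytes {
		return fmt.Errorf("literal size %d bytes exceeds max offloadable size %d bytes", size, maxBytes)
	}
	if err := store.Write(ctx, uri, lit.InlineValue); err != nil {
		return err
	}
	lit.InlineValue = nil // clear the inline value, keep only the reference
	lit.OffloadedURI = uri
	lit.OffloadedSize = size
	return nil
}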

How was this patch tested?

The following workflow, ref_wf, generates roughly 20 MB of map task output when launched with mbs=20.

Workflow used for testing

import logging
from typing import List
from flytekit import map_task, task, workflow, LaunchPlan

# Set the logging level to DEBUG
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("flytekit")
logger.setLevel(logging.DEBUG)

# Task to generate 1MB of string
@task
def my_1mb_task(i: str) -> str:
    return f"Hello world {i}" * 100 * 1024

# Task to generate a list of strings
@task
def generate_strs(count: int) -> List[str]:
    return ["a"] * count

# Workflow to process the list of strings and return processed results
@workflow
def my_wf(mbs: int) -> List[str]:
    strs = generate_strs(count=mbs)
    return map_task(my_1mb_task)(i=strs)

# Task to handle inputs (with metadata printing)
@task
def noop_with_inputs(input: List[str]):
    # Print metadata about the input
    print("Input Metadata:")
    print(f"Number of items: {len(input)}")
    print(f"Total size: {sum(len(s) for s in input)} bytes")

# Workflow to handle big inputs
@workflow
def big_inputs_wf(input: List[str]):
    noop_with_inputs(input=input)

# LaunchPlan for big inputs workflow
big_inputs_wf_lp = LaunchPlan.get_or_create(name="big_inputs_wf_lp", workflow=big_inputs_wf)

# Main workflow that orchestrates other workflows
@workflow
def ref_wf(mbs: int):
    processed_results = my_wf(mbs=mbs)  # Promise of List[str]
    big_inputs_wf_lp(input=processed_results) 

Before the change, launching fails with the following error:

[UserError] failed to launch workflow, caused by: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (26624663 vs. 4194304)

After the Change

The workflow succeeds

Following is the subworkflow CRD, which shows the offloaded URI being used as an input when propeller launches the execution:

inputs:
  literals:
    input:
      offloadedMetadata:
        inferredType:
          collectionType:
            simple: STRING
        sizeBytes: "26624325"
        uri: s3://my-s3-bucket/metadata/propeller/flytesnacks-development-argv96zc87ljhdz2fmgt/n0/data/0/n1/0/o0_offloaded_metadata.pb

Logs from flytekit reading the offloaded literal and passing it to the subworkflow. This uses the flytekit changes from flyteorg/flytekit#2685, which reads the offloaded literal if present:

Download data to local from s3://my-s3-bucket/metadata/propeller/flytesnacks-development-argv96zc87ljhdz2fmgt/n0/data/0/n1/0/o0_offloaded_metadata.pb. [Time: 0.081294s]", "taskName": null}

Pending: unit tests.

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link


codecov bot commented Aug 27, 2024

Codecov Report

Attention: Patch coverage is 80.59701% with 26 lines in your changes missing coverage. Please review.

Project coverage is 36.29%. Comparing base (89efcc6) to head (4a03f91).
Report is 5 commits behind head on master.

Files with missing lines Patch % Lines
...lytepropeller/pkg/controller/nodes/common/utils.go 78.18% 8 Missing and 4 partials ⚠️
flytepropeller/pkg/controller/config/config.go 75.00% 4 Missing ⚠️
...propeller/pkg/apis/flyteworkflow/v1alpha1/iface.go 0.00% 2 Missing ⚠️
...epropeller/pkg/compiler/transformers/k8s/inputs.go 66.66% 2 Missing ⚠️
flytepropeller/pkg/controller/controller.go 0.00% 2 Missing ⚠️
...ytepropeller/pkg/controller/nodes/array/handler.go 71.42% 1 Missing and 1 partial ⚠️
flytepropeller/pkg/controller/nodes/executor.go 50.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5697      +/-   ##
==========================================
+ Coverage   36.21%   36.29%   +0.07%     
==========================================
  Files        1303     1305       +2     
  Lines      109568   109991     +423     
==========================================
+ Hits        39683    39924     +241     
- Misses      65764    65912     +148     
- Partials     4121     4155      +34     
Flag Coverage Δ
unittests-datacatalog 51.37% <ø> (ø)
unittests-flyteadmin 55.62% <100.00%> (-0.01%) ⬇️
unittests-flytecopilot 12.17% <ø> (ø)
unittests-flytectl 62.21% <ø> (-0.05%) ⬇️
unittests-flyteidl 7.12% <ø> (ø)
unittests-flyteplugins 53.35% <ø> (ø)
unittests-flytepropeller 41.87% <75.92%> (+0.11%) ⬆️
unittests-flytestdlib 55.21% <ø> (-0.15%) ⬇️

Flags with carried forward coverage won't be shown.

Contributor

@katrogan katrogan left a comment


looks great to me!

Contributor

@hamersaw hamersaw left a comment


High-level question - it looks like we're using a semver comparison on the flytekit version of the specific task that is executing. Is this right? For example, we have a workflow with an ArrayNode (n0) that sends data to a TaskNode (n1). We're checking whether the flytekit version of the ArrayNode task for n0 supports offloading. IIUC that task will execute normally and we will transparently offload; what we're really concerned about is whether the task for n1 supports offloading.

@pmahindrakar-oss
Contributor Author

High-level question - it looks like we're using a semver comparison on the flytekit version of the specific task that is executing. Is this right? For example, we have a workflow with an ArrayNode (n0) that sends data to a TaskNode (n1). We're checking whether the flytekit version of the ArrayNode task for n0 supports offloading. IIUC that task will execute normally and we will transparently offload; what we're really concerned about is whether the task for n1 supports offloading.

@hamersaw I agree with this concern. I think we raised a similar concern during the spec review (@katrogan), and we decided to allow things to fail for these heterogeneous scenarios.

I'm still getting up to speed on the type system and propeller code, but if we need to support this, what do you think would be needed here?
Could we fail fast at execution time if we know this is bound to happen, and prevent it from happening?

@hamersaw
Contributor

hamersaw commented Aug 30, 2024

@pmahindrakar-oss

I'm still getting up to speed on the type system and propeller code, but if we need to support this, what do you think would be needed here?

This is really difficult, because it's unclear if we want to fail just because propeller supports offloading and the task (or downstream tasks) do not. If the outputs aren't large enough to offload, we would want it to run successfully rather than fail even though it could succeed, right?

If we absolutely need to validate the flytekit version of tasks to support offloading the only place that makes sense to me is when propeller begins evaluation (in the handleReadyWorkflow function). We can't do it during registration or compilation because flyteadmin doesn't know if propeller has offloading enabled. In this function we could iterate through all tasks in the WorkflowSpec and validate the flytekit versions the same way you already have. This would have to parse through SubWorkflows and LaunchPlans as well. Of course, this still wouldn't catch issues with dynamics since we do not know what the underlying workflow looks like.

Could we fail fast at execution time if we know this is bound to happen, and prevent it from happening?

This would be much easier. In the node executor we copy inputs from previous nodes when transitioning the node from NotYetStarted to Queued here; if the inputs are offloaded literals and the node is a TaskNode that does not have a compatible flytekit version, we can fail. It might make more sense to do this in the TaskHandler, or the PodPlugin? Offloaded literals won't work for things like Spark, Dask, Agents, etc., right? I have to admit I wasn't able to read the doc in depth.
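To make the fail-fast idea concrete, the check essentially reduces to a semver comparison between the task's recorded runtime version and a configured minimum, applied only when the node's inputs contain offloaded literals. Below is a minimal sketch using github.com/Masterminds/semver/v3; the helper name and its call site are assumptions for illustration, not the PR's actual code.

package validation

import (
	"fmt"

	"github.com/Masterminds/semver/v3"
)

// validateRuntimeVersion fails fast when a node would consume offloaded literals but
// its task was serialized with an SDK older than the configured minimum.
func validateRuntimeVersion(runtimeVersion, minSupportedVersion string, hasOffloadedInputs bool) error {
	if !hasOffloadedInputs {
		return nil // nothing to check if no inputs were offloaded
	}
	actual, err := semver.NewVersion(runtimeVersion)
	if err != nil {
		return fmt.Errorf("unparseable runtime version %q: %w", runtimeVersion, err)
	}
	minimum, err := semver.NewVersion(minSupportedVersion)
	if err != nil {
		return fmt.Errorf("unparseable minimum supported version %q: %w", minSupportedVersion, err)
	}
	if actual.LessThan(minimum) {
		return fmt.Errorf("flytekit version %s does not support offloaded literals (minimum %s)",
			runtimeVersion, minSupportedVersion)
	}
	return nil
}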

@pmahindrakar-oss
Contributor Author

pmahindrakar-oss commented Aug 30, 2024

Thanks @hamersaw. I would like to avoid complicated logic if possible and rely on a rollout process to solve this.
Since we are enabling offloading behind a config, a controlled rollout is possible.
By default I will keep this disabled and add notes to enable it only when all workflows, tasks, and launch plans have been upgraded to the new SDK version. That does sound heavy-handed, and I'm not sure how many of our users would be willing to do it. But given that this solves a very specific problem (the data output by a task being large), I'm assuming the user has already worked around it by splitting up their workflows or increasing the gRPC message size, and will be aware of the release-note actions when enabling this feature; hence my point about allowing things to fail in the heterogeneous case.

If we can't rely on this process and we want automated checks, then we should think about how to make this fully work (even in the dynamic case) for old workflows without failing them.

Offloaded literals won't work for things like Spark, Dask, Agents, etc., right?

Yes, this is not supported for plugins. I don't think we have even scoped this for plugins in general, as we are tackling map tasks as the first use case of this broad problem with large datasets.

cc : @katrogan

@katrogan
Contributor

katrogan commented Sep 3, 2024

This would be much easier. In the node executor we copy inputs from previous nodes when transitioning the node from NotYetStarted to Queued here; if the inputs are offloaded literals and the node is a TaskNode that does not have a compatible flytekit version, we can fail. It might make more sense to do this in the TaskHandler, or the PodPlugin? Offloaded literals won't work for things like Spark, Dask, Agents, etc., right? I have to admit I wasn't able to read the doc in depth.

This sounds reasonable: we don't attempt to type check at the onset of execution, but we can lazily decide to fail if a downstream task appears incapable of consuming an offloaded output, and only in the scenario where we have actually offloaded an output.

Re: plugins: I believe spark, agents and flytekit plugins should continue to work (they use flytekit type transformers to unmarshal literals which is the SDK check we're gating against). For example, spark calls pyflyte execute: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-spark/flytekitplugins/spark/task.py#L221

cc @eapolinario would love for you to double check this

There is follow-up work, slated for after this, to make copilot data reading work (thank you @pmahindrakar-oss).

@pmahindrakar-oss
Contributor Author

@katrogan @hamersaw I tested with the new changes, which now perform version checks in the heterogeneous-tasks scenario; here's the result:
(Screenshot: 2024-09-05 at 5:09 PM)

Contributor

@katrogan katrogan left a comment


looks great!

katrogan previously approved these changes Sep 9, 2024
Contributor

@katrogan katrogan left a comment


LGTM but please wait for @hamersaw's review too

phaseInfo = handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "GetTaskIDFailure", err.Error(), nil)
return &phaseInfo
}
runtimeData := taskNode.CoreTask().GetMetadata().GetRuntime()
Contributor


I don't have good context here, but TaskNodes encompass all task executions in Flyte (actors, agents, spark, dask, ray, etc.). How is this runtimeData set for each? For example, if we offload into an agent task, is there a specific check we need to do there?


Contributor


cc @eapolinario let me know if this sounds correct to you, would love someone from OSS to verify 🙂


Contributor Author

@pmahindrakar-oss pmahindrakar-oss Sep 13, 2024


users may use flytekit 1.13.5 to serialize the workflow, but use flytekit 1.12 in the container.

That should be fine; the version checks ensure compatibility before this feature is used.

yes, type transformer is used to ser/de literal, so we should only change the transformer.

Given the path @katrogan pointed out above, I don't see that we need to change anything here, as the base changes already take care of reading the offloaded values.

Member


The more concerning bit is that the runtime metadata doesn't correspond to the container image. Is the only way to verify the SDK version then to introspect the image?

I think so. We could probably release flytekit first and ensure most users have upgraded it.

Contributor Author

@pmahindrakar-oss pmahindrakar-oss Sep 13, 2024


We handle the flytekit version check, so IMO it shouldn't block this PR. If folks are OK with the current version of the code, let's check this in unless there are other concerns. @hamersaw @katrogan @pingsutw I can bump the required flytekit version in this PR or wait on the flytekit release.

Contributor


sounds good to me, I think this is the best check we have for compatibility

Contributor


I'm OK with this.

hamersaw previously approved these changes Sep 16, 2024
Contributor

@hamersaw hamersaw left a comment


Small nit.

@pmahindrakar-oss pmahindrakar-oss merged commit 7989209 into master Sep 16, 2024
50 of 52 checks passed
@pmahindrakar-oss pmahindrakar-oss deleted the offload-literal-upstream-2 branch September 16, 2024 16:52