Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

third iteration: docs restructure #1315

Merged
merged 15 commits into from
Dec 15, 2023
Merged
2 changes: 1 addition & 1 deletion docs/getting_started/tasks_and_workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ workflow_with_subworkflow(num_samples=10, seed=3)
```

```{important}
Learn more about subworkflows in the {ref}`User Guide <subworkflows>`.
Learn more about subworkflows in the {ref}`User Guide <subworkflow>`.
```

### Specifying Dependencies without Passing Data
Expand Down
9 changes: 2 additions & 7 deletions examples/advanced_composition/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,14 @@ ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root

# This is necessary for opencv to work
RUN apt-get update && apt-get install -y libsm6 libxext6 libxrender-dev ffmpeg build-essential curl

WORKDIR /root
RUN apt-get update && apt-get install -y build-essential curl

# Virtual environment
ENV VENV /opt/venv
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

# Install Python dependencies
COPY requirements.txt /root
RUN pip install -r /root/requirements.txt
RUN pip install flytekit==1.10.2

# Copy the actual code
COPY . /root
Expand Down
9 changes: 4 additions & 5 deletions examples/advanced_composition/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

# Advanced Composition

This section of the user guide introduces the advanced features of the flytekit Python SDK.
This section of the user guide introduces the advanced features of the Flytekit Python SDK.
These examples cover more complex aspects of Flyte, including conditions, subworkflows,
dynamic workflows, map tasks, gate nodes and more.

```{auto-examples-toc}
conditions
conditional
chain_entities
subworkflows
dynamics
subworkflow
dynamic_workflow
map_task
merge_sort
eager_workflows
decorating_tasks
decorating_workflows
Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,40 @@
# %% [markdown]
# (chain_flyte_entities)=
#
# # Chain Flyte Entities
# # Chaining Flyte Entities
#
# ```{eval-rst}
# .. tags:: Basic
# ```
#
# flytekit provides a mechanism to chain Flyte entities using the `>>` operator.
# Flytekit offers a mechanism for chaining Flyte entities using the `>>` operator.
# This is particularly valuable when chaining tasks and subworkflows without the need for data flow between the entities.
#
# ## Tasks
#
# Let's enforce an order for `t1()` to happen after `t0()`, and for `t2()` to happen after `t1()`.
#
# Import the necessary dependencies.
# Let's establish a sequence where `t1()` occurs after `t0()`, and `t2()` follows `t1()`.
# %%
from flytekit import task, workflow


@task
def t2():
pass
print("Running t2")
return


@task
def t1():
pass
print("Running t1")
return


@task
def t0():
pass
print("Running t0")
return


# %% [markdown]
# We want to enforce an order here: `t0()` followed by `t1()` followed by `t2()`.
# %%
@workflow
def chain_tasks_wf():
t2_promise = t2()
Expand All @@ -47,9 +46,10 @@ def chain_tasks_wf():


# %% [markdown]
# ## Chain SubWorkflows
# (chain_subworkflow)=
# ## Sub workflows
#
# Similar to tasks, you can chain {ref}`subworkflows <subworkflows>`.
# Just like tasks, you can chain {ref}`subworkflows <subworkflow>`.
# %%
@workflow
def sub_workflow_1():
Expand All @@ -61,9 +61,6 @@ def sub_workflow_0():
t0()


# %% [markdown]
# Use `>>` to chain the subworkflows.
# %%
@workflow
def chain_workflows_wf():
sub_wf1 = sub_workflow_1()
Expand All @@ -73,10 +70,21 @@ def chain_workflows_wf():


# %% [markdown]
# Run the workflows locally.
# To run the provided workflows on the Flyte cluster, use the following commands:
#
# %%
if __name__ == "__main__":
print(f"Running {__file__} main...")
print(f"Running chain_tasks_wf()... {chain_tasks_wf()}")
print(f"Running chain_workflows_wf()... {chain_workflows_wf()}")
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/chain_entities.py \
# chain_tasks_wf
Comment on lines +75 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's figure out a way to templatize this at some point (not in this PR)

# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/chain_entities.py \
# chain_workflows_wf
# ```
#
# :::{note}
# Chaining tasks and subworkflows is not supported in local environments.
# Follow the progress of this issue [here](https://github.com/flyteorg/flyte/issues/4080).
# :::
117 changes: 63 additions & 54 deletions examples/advanced_composition/advanced_composition/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,48 @@
# .. tags:: MachineLearning, Intermediate
# ```
#
# :::{note}
# This feature is available from Flytekit version 0.30.0b6+ and needs a Flyte backend version of at least 0.19.0+.
# :::
# A checkpoint in Flyte serves to recover a task from a previous failure by preserving the task's state before the failure
# and resuming from the latest recorded state.
#
# A checkpoint recovers a task from a previous failure by recording the state of a task before the failure and
# resuming from the latest recorded state.
# ## Why intra-task checkpoints?
#
# ## Why Intra-task Checkpoints?
# The inherent design of Flyte, being a workflow engine, allows users to break down operations, programs or ideas
# into smaller tasks within workflows. In the event of a task failure, the workflow doesn't need to rerun the
# previously completed tasks. Instead, it can retry the specific task that encountered an issue.
# Once the problematic task succeeds, it won't be rerun. Consequently, the natural boundaries between tasks act as implicit checkpoints.
#
# Flyte, at its core, is a workflow engine. Workflows provide a way to break up an operation/program/idea
# logically into smaller tasks. If a task fails, the workflow does not need to run the previously completed tasks. It can
# simply retry the task that failed. Eventually, when the task succeeds, it will not run again. Thus, task boundaries
# naturally serve as checkpoints.
# However, there are scenarios where breaking a task into smaller tasks is either challenging or undesirable due to the associated overhead.
# This is especially true when running a substantial computation in a tight loop.
# In such cases, users may consider splitting each loop iteration into individual tasks using dynamic workflows.
# Yet, the overhead of spawning new tasks, recording intermediate results, and reconstructing the state can incur additional expenses.
#
# There are cases where it is not easy or desirable to break a task into smaller tasks, because running a task
# adds to the overhead. This is true when running a large computation in a tight-loop. In such cases, users can
# split each loop iteration into its own task using {ref}`dynamic workflows <Dynamic Workflows>`, but the overhead of spawning new tasks, recording
# intermediate results, and reconstituting the state can be expensive.
# ### Use case: Model training
#
# ### Model-training Use Case
# An exemplary scenario illustrating the utility of intra-task checkpointing is during model training.
# In situations where executing multiple epochs or iterations with the same dataset might be time-consuming,
# setting task boundaries can incur a high bootstrap time and be costly.
#
# An example of this case is model training. Running multiple epochs or different iterations with the same
# dataset can take a long time, but the bootstrap time may be high and creating task boundaries can be expensive.
# Flyte addresses this challenge by providing a mechanism to checkpoint progress within a task execution,
# saving it as a file or set of files. In the event of a failure, the checkpoint file can be re-read to
# resume most of the state without rerunning the entire task.
# This feature opens up possibilities to leverage alternate, more cost-effective compute systems,
# such as [AWS spot instances](https://aws.amazon.com/ec2/spot/),
# [GCP pre-emptible instances](https://cloud.google.com/compute/docs/instances/preemptible) and others.
#
# To tackle this, Flyte offers a way to checkpoint progress within a task execution as a file or a set of files. These
# checkpoints can be written synchronously or asynchronously. In case of failure, the checkpoint file can be re-read to resume
# most of the state without re-running the entire task. This opens up the opportunity to use alternate compute systems with
# lower guarantees like [AWS Spot Instances](https://aws.amazon.com/ec2/spot/), [GCP Pre-emptible Instances](https://cloud.google.com/compute/docs/instances/preemptible), etc.
# These instances offer great performance at significantly lower price points compared to their on-demand or reserved counterparts.
# This becomes feasible when tasks are constructed in a fault-tolerant manner.
# For tasks running within a short duration, e.g., less than 10 minutes, the likelihood of failure is negligible,
# and task-boundary-based recovery provides substantial fault tolerance for successful completion.
#
# These instances offer great performance at much lower price-points as compared to their on-demand or reserved alternatives.
# This is possible if you construct the tasks in a fault-tolerant manner. In most cases, when the task runs for a short duration,
# e.g., less than 10 minutes, the potential of failure is insignificant and task-boundary-based recovery offers
# significant fault-tolerance to ensure successful completion.
# However, as the task execution time increases, the cost of re-running it also increases,
# reducing the chances of successful completion. This is precisely where Flyte's intra-task checkpointing proves to be highly beneficial.
#
# But as the time for a task increases, the cost of re-running it increases, and reduces the chances of successful
# completion. This is where Flyte's intra-task checkpointing truly shines.
# Here's an example illustrating how to develop tasks that leverage intra-task checkpointing.
# It's important to note that Flyte currently offers the low-level API for checkpointing.
# Future integrations aim to incorporate higher-level checkpointing APIs from popular training frameworks
# like Keras, PyTorch, Scikit-learn, and big-data frameworks such as Spark and Flink, enhancing their fault-tolerance capabilities.
#
# Let's look at an example of how to develop tasks which utilize intra-task checkpointing. It only provides the low-level API, though. We intend to integrate
# higher-level checkpointing APIs available in popular training frameworks like Keras, Pytorch, Scikit-learn, and
# big-data frameworks like Spark and Flink to supercharge their fault-tolerance.

# To begin, import the necessary libraries and set the number of task retries to `3`.
# %%
from flytekit import current_context, task, workflow
from flytekit.exceptions.user import FlyteRecoverableException
Expand All @@ -54,50 +55,58 @@


# %% [markdown]
# This task shows how checkpoints can help resume execution in case of a failure. This is an example task and shows the API for
# the checkpointer. The checkpoint system exposes other APIs. For a detailed understanding, refer to the [checkpointer code](https://github.com/flyteorg/flytekit/blob/master/flytekit/core/checkpointer.py).
#
# The goal of this method is to loop for exactly n_iterations, checkpointing state and recovering from simualted failures.
# We define a task to iterate precisely `n_iterations`, checkpoint its state, and recover from simulated failures.
# %%
@task(retries=RETRIES)
def use_checkpoint(n_iterations: int) -> int:
cp = current_context().checkpoint
prev = cp.read()

start = 0
if prev:
start = int(prev.decode())

# create a failure interval so we can create failures for across 'n' iterations and then succeed after
# configured retries
# Create a failure interval to simulate failures across 'n' iterations and then succeed after configured retries
failure_interval = n_iterations // RETRIES
i = 0
for i in range(start, n_iterations):
# simulate a deterministic failure, for demonstration. We want to show how it eventually completes within
# the given retries
if i > start and i % failure_interval == 0:
raise FlyteRecoverableException(f"Failed at iteration {i}, failure_interval {failure_interval}")
# save progress state. It is also entirely possible save state every few intervals.
cp.write(f"{i + 1}".encode())

return i
index = 0
for index in range(start, n_iterations):
# Simulate a deterministic failure for demonstration. Showcasing how it eventually completes within the given retries
if index > start and index % failure_interval == 0:
raise FlyteRecoverableException(f"Failed at iteration {index}, failure_interval {failure_interval}.")
# Save progress state. It is also entirely possible to save state every few intervals
cp.write(f"{index + 1}".encode())
return index


# %% [markdown]
# The workflow here simply calls the task. The task itself
# will be retried for the {ref}`FlyteRecoverableException <flytekit:exception_handling>`.
# The checkpoint system offers additional APIs, documented in the code accessible at
# [checkpointer code](https://github.com/flyteorg/flytekit/blob/master/flytekit/core/checkpointer.py).
#
# Create a workflow that invokes the task.
# The task will automatically undergo retries in the event of a {ref}`FlyteRecoverableException <flytekit:exception_handling>`.
# %%
@workflow
def example(n_iterations: int) -> int:
def checkpointing_example(n_iterations: int) -> int:
return use_checkpoint(n_iterations=n_iterations)


# %% [markdown]
# The checkpoint is stored locally, but it is not used since retries are not supported.
#
# The local checkpoint is not utilized here because retries are not supported.
# %%
if __name__ == "__main__":
try:
example(n_iterations=10)
checkpointing_example(n_iterations=10)
except RuntimeError as e: # noqa : F841
# no retries are performed, so an exception is expected when run locally.
# Since no retries are performed, an exception is expected when run locally
pass

# %% [markdown]
# ## Run the example on the Flyte cluster
#
# To run the provided workflow on the Flyte cluster, use the following command:
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/checkpoint.py \
# checkpointing_example --n_iterations 10
# ```
Loading
Loading