Skip to content

Commit

Permalink
Add example for file streaming (#1759)
Browse files Browse the repository at this point in the history
Add example for directory streaming

Signed-off-by: DenChenn <aaaa102234@gmail.com>
  • Loading branch information
DenChenn authored Oct 23, 2024
1 parent 9aadec2 commit 135ae99
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions examples/data_types_and_io/data_types_and_io/file_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from flytekit import task, workflow
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
import pandas as pd
import os


@task()
def remove_some_rows(ff: FlyteFile) -> FlyteFile:
"""
Remove the rows that the value of city is 'Seattle'.
This is an example with streaming support.
"""
new_file = FlyteFile.new_remote_file("data_without_seattle.csv")
with ff.open("r") as r:
with new_file.open("w") as w:
df = pd.read_csv(r)
df = df[df["City"] != "Seattle"]
df.to_csv(w, index=False)
return new_file


@task
def process_folder(fd: FlyteDirectory) -> FlyteDirectory:
out_fd = FlyteDirectory.new_remote("folder-copy")
for base, x in fd.crawl():
src = str(os.path.join(base, x))
out_file = out_fd.new_file(x)
with FlyteFile(src).open("rb") as f:
with out_file.open("wb") as o:
o.write(f.read())
# The output path will be s3://my-s3-bucket/data/77/<execution-id>-<node-id>-0/folder-copy
return out_fd


@workflow()
def wf():
remove_some_rows(ff=FlyteFile("s3://custom-bucket/data.csv"))
process_folder(fd=FlyteDirectory("s3://my-s3-bucket/folder"))
return


if __name__ == "__main__":
print(f"Running wf() {wf()}")

0 comments on commit 135ae99

Please sign in to comment.