
[DO NOT MERGE] [SNOW-1491199] merge phase0 server side #2136

Draft · wants to merge 123 commits into base: main

Conversation

@sfc-gh-lspiegelberg (Contributor) commented Aug 21, 2024

Captures work for server-side snowpark phase0.

sfc-gh-azwiegincew and others added 30 commits May 3, 2024 15:17
…: table(), filter() (#1468)

A very basic initial attempt at serializing the AST.

I'm trying to maintain a parallel codebase for phases 0 and 1 for now,
since it would be a shame to do this work twice. Once we complete and
ship phase 0, we'll be able to drastically simplify the phase 1 client.

Unlike what I mentioned before, this implementation doesn't flush
dependencies of eagerly evaluated expressions. Instead, any client-side
value is appended to the pending batch. This is simpler to implement and
will likely work well, although we may need to do some dependency
analysis on the server to ensure we don't issue unnecessary queries.
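
A minimal, illustrative sketch of that batching idea (toy names, not the actual implementation):

```
# Illustrative sketch: lazy operations append their AST statements to a
# pending batch; an eager evaluation sends the whole batch with the next
# query instead of flushing dependencies individually.
pending_batch = []

def record_statement(stmt):
    # Every client-side operation appends its AST statement.
    pending_batch.append(stmt)

def flush_batch():
    # Called when a query is issued; the server receives the full batch
    # and can decide which statements actually need to run.
    batch = list(pending_batch)
    pending_batch.clear()
    return batch

record_statement("table('T')")
record_statement("filter(col('A') > 1)")
assert flush_batch() == ["table('T')", "filter(col('A') > 1)"]
assert flush_batch() == []
```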
Updates our server branch with recent snowpark changes.

1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.


   Fixes SNOW-0

2. Fill out the following pre-review checklist:

   - [ ] I am adding a new automated test(s) to verify correctness of my new code
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [x] I am adding a new dependency

3. Please describe how your code solves the related issue.

Update `ast_pb2.py` (already present in the repository).
Add the `setuptools` dependencies required for development.
Include the module path for `ast_pb2.py` in the manifest, so that the
file makes it into the Snowpark wheel.
Run `update-from-devvm.sh` from within `src/snowflake/snowpark/_internal/proto/` with a running local devvm to update the proto file on the thin client.
….py (#1766)

Modifies `setup.py` to use the latest HEAD of https://github.com/snowflakedb/snowflake-connector-python/tree/server-side-snowpark, which includes the connector changes (most notably, adding the `_dataframe_ast` field for phase 0).

To update your local dev environment, run:
```
pip uninstall snowflake-connector-python -y
python -m pip install --no-cache -e ".[development,pandas]"
```
Running the pip command should show `git clone` in the logs.
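
Optionally, you can sanity-check which connector got installed from Python (the exact version string will differ depending on the branch state):

```
# Confirm the editable install picked up the connector; a source install
# from the server-side-snowpark branch will typically report a version
# different from the latest PyPI release.
import snowflake.connector
print(snowflake.connector.__version__)
```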
…frameAST field. (#1794)

Vendors snowflake-vcrpy from https://github.com/Snowflake-Labs/snowflake-vcrpy (installing it directly did not work, hence the vendoring) with custom Snowflake changes to track requests in the vendored urllib3 within the Snowflake Python connector.

Adds the decorator `check_ast_encode_invoked` (applied with `autouse=True` to all tests), which checks that every query sent contains the `dataframeAst` property for phase 0, and errors out with traceback information whenever tests need to be fixed or APIs that need to be encoded within the AST are missing.
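
A hedged sketch of what such an autouse check might look like (the `recorded_queries` fixture and `query.body` attribute are hypothetical stand-ins for the vendored vcrpy plumbing):

```
import pytest

@pytest.fixture(autouse=True)
def check_ast_encode_invoked(recorded_queries):  # hypothetical fixture
    yield  # run the test, then inspect the queries it issued
    for query in recorded_queries:
        # Fail with the offending query so missing AST encodings for
        # phase 0 APIs surface immediately, with a usable traceback.
        assert "dataframeAst" in query.body, (
            f"query sent without dataframeAst property: {query.body[:200]}"
        )
```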
… with Python 3.8 (#1796)

Temporarily removes Modin tests, as Modin is incompatible with Python 3.8.
Excludes the protobuf file in pre-commit (flake8), as generated protobuf code does not adhere to the enforced coding standards, so a protobuf update would otherwise fail pre-commit.
…ad of sbt (#1811)

The convenience script `update-unparser.sh` currently uses the sbt build. With the introduction of the Bazel Scala target dependencies (i.e., IR changes), switch to using them to build an updated unparser, so that IR/protobuf updates are reflected.
sfc-gh-evandenberg and others added 6 commits October 23, 2024 11:11
…bility and readability (#2464)

Fixes SNOW-1738538: update test expectations to textproto AST for better stability and readability.
Fixes doctests for phase0 with `--ast-enabled` by switching the runner to macOS and fixing up various `logging.<func>` usages, switching them to `_logger` as in the rest of the code base. Doctests do not pass at the moment under Ubuntu (error: returning <BLANKLINE>, interference with the logging module, cf. JIRA); Windows is untested.

Other:
- Warn about a missing unparser.jar file only if `--update-expectations` is passed.
- Use file-specific logger instead of global loggers.
…st ast (#2497)

Fixes multi-AST eval validation when decoding the expectation test AST.
…st_dataframe.py (#2498)

Fixes several tests to reduce test failures for the merge gate with `session.ast_enabled=True`. With this PR, all tests in `test_dataframe.py` pass except the interval-related ones.

Details:
- Adds `ast_id` to the shallow copy protocol for DataFrame.
- Modify APIs so checks happen before AST encoding to pass negative
tests.
- Add missing `ast_stmt` to `select(table_function(...))`
- Fix filter test.
- Fix overflow issue when using `datetime.time` due to timezone
encoding.
- Support the stdlib array type (`array.array`) by mapping it to a list (see the sketch after this list).
- Fix na functions to carry out checks for `subset` parameter.
- Explicitly type check for `col` and `column` as expected in existing
tests.
- Add `NotImplementedError` for interval type in `make_interval`.
- Fix wrong usage of Expression in `function.col(...)`, use `Column`
instead in `apply_in_pandas`.
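
A minimal sketch of the stdlib array handling mentioned in the list above (`_normalize_value` is an illustrative name, not the actual helper):

```
import array

def _normalize_value(value):
    # AST encoding has no array.array representation, so stdlib arrays
    # are mapped to plain lists before encoding.
    if isinstance(value, array.array):
        return list(value)
    return value

assert _normalize_value(array.array("i", [1, 2, 3])) == [1, 2, 3]
```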
Addresses PR feedback for dataframe.py file.
Comment on lines +89 to 91
@publicapi
def mode(self, save_mode: str, _emit_ast: bool = True) -> "DataFrameWriter":
"""Set the save mode of this :class:`DataFrameWriter`.

FYI, some more public APIs were added: `.partition_by`, `option`, `options`

@@ -829,6 +964,9 @@ def create_map(*cols: Union[ColumnOrName, Iterable[ColumnOrName]]) -> Column:
-----------------------
<BLANKLINE>
"""

@sfc-gh-aalam (Contributor) commented Oct 29, 2024

The type hint is correct, although we should probably support Tuple instead of Set. It allows the following:

create_map(col_key, col_value)
create_map([col_key, col_value])
create_map((col_key, col_value))
create_map(col_key1, col_value1, col_key2, col_value2)

Comment on lines +980 to +981
if isinstance(cols, set):
cols = tuple(sorted(list(cols)))
@sfc-gh-aalam (Contributor) commented Oct 29, 2024

let's not do this. This will break the behavior

def create_map(*cols: Union[ColumnOrName, Iterable[ColumnOrName]]) -> Column:
@publicapi
def create_map(
*cols: Union[ColumnOrName, List[ColumnOrName], Set[ColumnOrName]],

Suggested change
*cols: Union[ColumnOrName, List[ColumnOrName], Set[ColumnOrName]],
*cols: Union[ColumnOrName, List[ColumnOrName], Tuple[ColumnOrName]],

Comment on lines +2835 to +2844
b = (
lit(base, _emit_ast=_emit_ast)
if isinstance(base, (int, float))
else _to_col_if_str(base, "log")
)
arg = (
lit(x, _emit_ast=_emit_ast)
if isinstance(x, (int, float))
else _to_col_if_str(x, "log")
)

shouldn't _emit_ast=False in all these cases?
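
A self-contained sketch of the convention the reviewer is pointing at (illustrative toy names, not the Snowpark API): the top-level public function records its own AST entry, so inner `lit` calls should pass `_emit_ast=False` to avoid recording the literals a second time.

```
ast_log = []

def lit(value, _emit_ast=True):
    if _emit_ast:
        ast_log.append(("lit", value))
    return ("lit", value)

def log(base, x, _emit_ast=True):
    if _emit_ast:
        ast_log.append(("log", base, x))
    # Inner helpers suppress emission; the top-level entry above is the
    # only one recorded for this expression.
    b = lit(base, _emit_ast=False)
    arg = lit(x, _emit_ast=False)
    return ("expr", b, arg)

log(10, 100)
assert ast_log == [("log", 10, 100)]
```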

Comment on lines +2868 to +2875
lit(left, _emit_ast=_emit_ast)
if isinstance(left, (int, float))
else _to_col_if_str(left, "pow")
)
power = (
lit(right) if isinstance(right, (int, float)) else _to_col_if_str(right, "pow")
lit(right, _emit_ast=_emit_ast)
if isinstance(right, (int, float))
else _to_col_if_str(right, "pow")

_emit_ast=False?

Comment on lines +3019 to 3023
p = pos if isinstance(pos, Column) else lit(pos, _emit_ast=_emit_ast)
length = len if isinstance(len, Column) else lit(len, _emit_ast=_emit_ast)
return builtin("substring", _emit_ast=_emit_ast)(s, p, length)



same comment

Comment on lines +3068 to +3070
strtok_array = builtin("strtok_to_array", _emit_ast=False)(s, delim)
c = builtin("array_to_string", _emit_ast=False)(
builtin("array_slice", _emit_ast=False)(

more of the same

@@ -2713,11 +3113,15 @@ def regexp_count(
pos = lit(position)

params = [lit(p) for p in parameters]
return builtin(sql_func_name)(sub, pat, pos, *params)
return builtin(sql_func_name, _emit_ast=_emit_ast)(sub, pat, pos, *params)

same

…ner (#2524)

Reduces CI merge gate failures for the AST-enabled gate. The temp auto cleaner is buggy when AST is enabled; for tests like test_dataframe_copy_into, tables are deleted prematurely. For now, disable the auto temp cleaner whenever `ast_enabled` is true in the session. The temp auto cleaner should be moved server-side anyway.

This reduces phase0 ast_enabled test failures from 95 to 39 on our merge gate.
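
A hedged sketch of the guard described above, assuming the session exposes the cleaner toggle as `auto_clean_up_temp_table_enabled` (the attribute name is an assumption; check session.py):

```
# Disable the temp table auto cleaner whenever AST emission is on, so
# tables referenced by batched statements are not dropped prematurely.
if session.ast_enabled:
    session.auto_clean_up_temp_table_enabled = False
```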

session = snowflake.snowpark.session._get_sandbox_conditional_active_session(
session
# Same as udtf, but pandas_udtf has kwargs input_names, max_batch_size in addition to plain udtf.

thanks


class QueryListener:
@abstractmethod
def _notify(self, query_record: QueryRecord, *args, **kwargs) -> None:

args is not really used now. Is it only there for future-proofing? Do you foresee args being used anytime soon?
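
For illustration, a minimal listener built on this abstract class (`PrintingListener` is hypothetical); the unused `*args`/`**kwargs` slots would presumably carry future payloads such as the dataframe AST:

```
from abc import ABC, abstractmethod
from snowflake.snowpark.query_history import QueryRecord

class QueryListener(ABC):
    @abstractmethod
    def _notify(self, query_record: QueryRecord, *args, **kwargs) -> None:
        ...

class PrintingListener(QueryListener):
    def _notify(self, query_record: QueryRecord, *args, **kwargs) -> None:
        # Only the record is consumed today; extra arguments are ignored.
        print(query_record.query_id, query_record.sql_text)
```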

Comment on lines +122 to +127
self._ast = with_src_position(proto.SpGroupingSets())
set_list, self._ast.sets.variadic = parse_positional_args_to_list_variadic(
*sets
)
for s in set_list:
build_expr_from_python_val(self._ast.sets.args.add(), s)

this should be param protected?
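
A sketch of the param protection being asked about, assuming a session-level `ast_enabled` flag is reachable here (that gating condition is an assumption):

```
# Only build the AST node when AST emission is enabled for the session.
if session.ast_enabled:
    self._ast = with_src_position(proto.SpGroupingSets())
    set_list, self._ast.sets.variadic = parse_positional_args_to_list_variadic(*sets)
    for s in set_list:
        build_expr_from_python_val(self._ast.sets.args.add(), s)
else:
    self._ast = None
```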

@@ -450,79 +529,150 @@ def pivot(
------------------------------
<BLANKLINE>
"""
self._df, pc, pivot_values, default_on_null = prepare_pivot_arguments(

self._df, pc, pivot_values, pivot_default_on_null = prepare_pivot_arguments(

I know we made an earlier effort to replace `self._df` with `self._dataframe`. Shall we do it here as well?

@@ -68,6 +74,9 @@ def __init__(
execute_as: typing.Literal["caller", "owner"] = "owner",
anonymous_sp_sql: Optional[str] = None,
packages: Optional[List[Union[str, ModuleType]]] = None,
_ast: Optional[proto.StoredProcedure] = None,
_ast_id: Optional[int] = None,
_stmt: Optional[proto.Assign] = None,

should this be called `_ast_stmt` as done in some other places? Or is it different?

@@ -408,7 +445,7 @@ class StoredProcedureRegistration:
-------------
<BLANKLINE>

Example 9
Example 11

thank you. I've been meaning to open a small PR to fix this but have been too lazy to do so.

)


class StoredProcedureProfiler:

might need to cover this file as well?

Comment on lines -118 to +130
self._clause = UpdateMergeExpression(
self._condition_expr,
{Column(k)._expression: Column._to_expr(v) for k, v in assignments.items()},
)
self._clause = UpdateMergeExpression(self._condition_expr, assignments)

I don't understand these changes. Are they unrelated to the AST changes?

Comment on lines -225 to +242
keys = [Column(k)._expression for k in assignments.keys()]
values = [Column._to_expr(v) for v in assignments.values()]
keys = list(assignments.keys())
values = list(assignments.values())
else:
keys = []
values = [Column._to_expr(v) for v in assignments]
values = list(assignments)

same comment here.

Comment on lines +19 to +21
# TODO: connector installed_pandas is broken. If pyarrow is not installed, but pandas is this function returns the wrong answer.
# The core issue is that in the connector detection of both pandas/arrow are mixed, which is wrong.
# from snowflake.connector.options import installed_pandas, pandas

this is a good observation. I'll create a JIRA to replace `from snowflake.connector.options import installed_pandas` with `from snowflake.snowpark._internal.utils import installed_pandas`
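
A minimal sketch of what such a snowpark-local helper could look like, assuming importlib-based detection independent of pyarrow (hypothetical, not the actual `_internal.utils` implementation):

```
from importlib.util import find_spec

# Detect pandas on its own, rather than via the connector's combined
# pandas/pyarrow check, which returns the wrong answer when pyarrow is
# missing but pandas is installed.
installed_pandas = find_spec("pandas") is not None
```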

@@ -41,12 +48,26 @@ def __repr__(self):
def is_primitive(self):
return True

def _fill_ast(self, ast: proto.SpDataType) -> None:

I'm guessing that this would not raise a module-not-found error since we are packaging proto.SpDataType with Snowpark now. I mentioned this to @sfc-gh-lspiegelberg offline, but we should add a merge gate without proto and tzlocal installed to ensure tests work correctly when AST is disabled. This can be added to the daily precommit, similar to https://github.com/snowflakedb/snowpark-python/actions/runs/11609891507/job/32328205853

Comment on lines +272 to +273
if isinstance(start, WindowRelativePosition):
if start == WindowRelativePosition.CURRENT_ROW:

@sfc-gh-jdu can you review this part. I know you added additional window features last quarter.

Comment on lines +2237 to +2255
if isinstance(self._conn, MockServerConnection):
if self._conn._suppress_not_implemented_error:

# TODO: Snowpark does not allow empty dataframes (no schema, no data). Have a dummy schema here.
ans = self.createDataFrame(
[],
schema=StructType([StructField("row", IntegerType())]),
_emit_ast=False,
)
if _emit_ast:
ans._ast_id = stmt.var_id.bitfield1
return ans
else:
# TODO: Implement table_function properly in local testing mode.
# self._conn.log_not_supported_error(
# external_feature_name="Session.table_function",
# raise_error=NotImplementedError,
# )
pass

I'm not sure what is going on here. Can someone elaborate on this change?

Labels: DO-NOT-MERGE, local testing