From 7a2a05026c29051d466083aeb4b970f2e6be52b6 Mon Sep 17 00:00:00 2001 From: Jamison Date: Thu, 31 Oct 2024 10:27:27 -0700 Subject: [PATCH 1/5] SNOW-859943: Add basic support for functions.window --- docs/source/snowpark/functions.rst | 1 + src/snowflake/snowpark/_internal/utils.py | 60 +++++++++++++ src/snowflake/snowpark/column.py | 5 +- src/snowflake/snowpark/functions.py | 85 +++++++++++++++++++ src/snowflake/snowpark/mock/_functions.py | 2 +- src/snowflake/snowpark/mock/_util.py | 56 ------------ .../snowpark/relational_grouped_dataframe.py | 13 ++- tests/integ/test_column_names.py | 8 ++ tests/integ/test_df_aggregate.py | 22 +++++ 9 files changed, 190 insertions(+), 62 deletions(-) diff --git a/docs/source/snowpark/functions.rst b/docs/source/snowpark/functions.rst index e9dfb7ce1ac..f2c6e9f258d 100644 --- a/docs/source/snowpark/functions.rst +++ b/docs/source/snowpark/functions.rst @@ -321,5 +321,6 @@ Functions when when_matched when_not_matched + window xmlget year diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py index b2c5ce0f753..b771566fb05 100644 --- a/src/snowflake/snowpark/_internal/utils.py +++ b/src/snowflake/snowpark/_internal/utils.py @@ -224,6 +224,66 @@ class TempObjectType(Enum): CTE = "CTE" +# More info about all allowed aliases here: +# https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts + +DATETIME_PART_TO_ALIASES = { + "year": {"year", "y", "yy", "yyy", "yyyy", "yr", "years", "yrs"}, + "quarter": {"quarter", "q", "qtr", "qtrs", "quarters"}, + "month": {"month", "mm", "mon", "mons", "months"}, + "week": {"week", "w", "wk", "weekofyear", "woy", "wy"}, + "day": {"day", "d", "dd", "days", "dayofmonth"}, + "hour": {"hour", "h", "hh", "hr", "hours", "hrs"}, + "minute": {"minute", "m", "mi", "min", "minutes", "mins"}, + "second": {"second", "s", "sec", "seconds", "secs"}, + "millisecond": {"millisecond", "ms", "msec", "milliseconds"}, + "microsecond": {"microsecond", "us", "usec", "microseconds"}, + "nanosecond": { + "nanosecond", + "ns", + "nsec", + "nanosec", + "nsecond", + "nanoseconds", + "nanosecs", + "nseconds", + }, + "dayofweek": {"dayofweek", "weekday", "dow", "dw"}, + "dayofweekiso": {"dayofweekiso", "weekday_iso", "dow_iso", "dw_iso"}, + "dayofyear": {"dayofyear", "yearday", "doy", "dy"}, + "weekiso": {"weekiso", "week_iso", "weekofyeariso", "weekofyear_iso"}, + "yearofweek": {"yearofweek"}, + "yearofweekiso": {"yearofweekiso"}, + "epoch_second": {"epoch_second", "epoch", "epoch_seconds"}, + "epoch_millisecond": {"epoch_millisecond", "epoch_milliseconds"}, + "epoch_microsecond": {"epoch_microsecond", "epoch_microseconds"}, + "epoch_nanosecond": {"epoch_nanosecond", "epoch_nanoseconds"}, + "timezone_hour": {"timezone_hour", "tzh"}, + "timezone_minute": {"timezone_minute", "tzm"}, +} + +DATETIME_PARTS = set(DATETIME_PART_TO_ALIASES.keys()) +ALIASES_TO_DATETIME_PART = { + v: k for k, l in DATETIME_PART_TO_ALIASES.items() for v in l +} +DATETIME_ALIASES = set(ALIASES_TO_DATETIME_PART.keys()) + + +def unalias_datetime_part(part): + lowered_part = part.lower() + if lowered_part in DATETIME_ALIASES: + return ALIASES_TO_DATETIME_PART[lowered_part] + else: + raise ValueError(f"{part} is not a recognized date or time part.") + + +def parse_duration_string(duration: str) -> tuple[int, str]: + length, unit = duration.split(" ") + length = int(length) + unit = unalias_datetime_part(unit) + return length, unit + + def validate_object_name(name: str): if not 
SNOWFLAKE_OBJECT_RE_PATTERN.match(name):
        raise SnowparkClientExceptionMessages.GENERAL_INVALID_OBJECT_NAME(name)

diff --git a/src/snowflake/snowpark/column.py b/src/snowflake/snowpark/column.py
index d45835a7ff9..c3716a1ae72 100644
--- a/src/snowflake/snowpark/column.py
+++ b/src/snowflake/snowpark/column.py
@@ -668,7 +668,10 @@ def alias(self, alias: str) -> "Column":

     def name(self, alias: str) -> "Column":
         """Returns a new renamed Column."""
-        return Column(Alias(self._expression, quote_name(alias)))
+        expr = self._expression
+        if isinstance(expr, Alias):
+            expr = expr.child
+        return Column(Alias(expr, quote_name(alias)))

     def over(self, window: Optional[WindowSpec] = None) -> "Column":
         """
diff --git a/src/snowflake/snowpark/functions.py b/src/snowflake/snowpark/functions.py
index db98cac2c5e..809c61147d8 100644
--- a/src/snowflake/snowpark/functions.py
+++ b/src/snowflake/snowpark/functions.py
@@ -191,6 +191,7 @@
 )
 from snowflake.snowpark._internal.udf_utils import check_decorator_args
 from snowflake.snowpark._internal.utils import (
+    parse_duration_string,
     parse_positional_args_to_list,
     validate_object_name,
 )
@@ -4594,6 +4595,90 @@ def dayofyear(e: ColumnOrName) -> Column:
     return builtin("dayofyear")(c)


+def window(
+    timeColumn: ColumnOrName,
+    windowDuration: str,
+    slideDuration: Optional[str] = None,
+    startTime: Optional[str] = None,
+) -> Column:
+    """
+    Converts a time column into a window object with start and end times. Window start times areinclusive
+    inclusive while end times are exclusive. For example 9:30 is in the window [9:30, 10:00), but not [9:00, 9:30).
+
+    Args:
+        timeColumn: The column to apply the window transformation to.
+        windowDuration: An interval string that determines the length of each window.
+        slideDuration: An interval string representing the amount of time between the start of
+            each window. Note that this parameter is not supported yet. Specifying it will raise a
+            NotImplementedError exception.
+        startTime: An interval string representing the amount of time the start of each window is
+            offset. E.g., a five-minute window with a startTime of '2 minutes' will be from [9:02, 9:07)
+            instead of [9:00, 9:05).
+
+    Note:
+        Interval strings are of the form 'quantity unit' where quantity is an integer and unit
+        is a supported time unit. This function supports the same time units as dateadd. See
+        `supported time units <https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts>`_
+        for more information.
+
+    Example::
+
+        >>> import datetime
+        >>> from snowflake.snowpark.functions import window
+        >>> df = session.createDataFrame(
+        ...     [(datetime.datetime.strptime("2024-10-31 09:05:00.000", "%Y-%m-%d %H:%M:%S.%f"),)],
+        ...     schema=["time"]
+        ... )
+        >>> df.select(window(df.time, "5 minutes")).show()
+        ----------------------------------------
+        |"WINDOW"                              |
+        ----------------------------------------
+        |{                                     |
+        |  "end": "2024-10-31 09:10:00.000",   |
+        |  "start": "2024-10-31 09:05:00.000"  |
+        |}                                     |
+        ----------------------------------------
+
+        >>> df.select(window(df.time, "5 minutes", startTime="2 minutes")).show()
+        ----------------------------------------
+        |"WINDOW"                              |
+        ----------------------------------------
+        |{                                     |
+        |  "end": "2024-10-31 09:07:00.000",   |
+        |  "start": "2024-10-31 09:02:00.000"  |
+        |}                                     |
+        ----------------------------------------
+
+    """
+    if slideDuration:
+        # SNOW- : slideDuration changes this function from a 1:1 mapping to a 1:N mapping. That
+        # currently would require a udtf which may have significantly different performance.
+ raise NotImplementedError( + "snowflake.snowpark.functions.window does not support slideDuration parameter yet." + ) + + epoch = lit("1970-01-01 00:00:00").cast(TimestampType()) + time = _to_col_if_str(timeColumn, "window") + + window_duration, window_unit = parse_duration_string(windowDuration) + window_duration = lit(window_duration) + window_unit = f"{window_unit}s" + + base = epoch + if startTime: + start_duration, start_unit = parse_duration_string(startTime) + base += make_interval(**{f"{start_unit}s": start_duration}) + + window = floor(datediff(window_unit, base, time) / window_duration) + window_start = dateadd(window_unit, window * window_duration, base) + return object_construct_keep_null( + lit("start"), + window_start, + lit("end"), + dateadd(window_unit, window_duration, window_start), + ).alias("window") + + def is_array(col: ColumnOrName) -> Column: """ Returns true if the specified VARIANT column contains an ARRAY value. diff --git a/src/snowflake/snowpark/mock/_functions.py b/src/snowflake/snowpark/mock/_functions.py index ef469cec917..cfc50314a50 100644 --- a/src/snowflake/snowpark/mock/_functions.py +++ b/src/snowflake/snowpark/mock/_functions.py @@ -21,6 +21,7 @@ import snowflake.snowpark from snowflake.snowpark._internal.analyzer.expression import FunctionExpression +from snowflake.snowpark._internal.utils import unalias_datetime_part from snowflake.snowpark.mock._options import numpy, pandas from snowflake.snowpark.mock._snowflake_data_type import ( _TIMESTAMP_TYPE_MAPPING, @@ -57,7 +58,6 @@ convert_numeric_string_value_to_float_seconds, convert_snowflake_datetime_format, process_string_time_with_fractional_seconds, - unalias_datetime_part, ) RETURN_TYPE = Union[ColumnEmulator, TableEmulator] diff --git a/src/snowflake/snowpark/mock/_util.py b/src/snowflake/snowpark/mock/_util.py index 48418d7cd26..085d380eec7 100644 --- a/src/snowflake/snowpark/mock/_util.py +++ b/src/snowflake/snowpark/mock/_util.py @@ -13,7 +13,6 @@ from snowflake.snowpark._internal.utils import parse_table_name, quote_name from snowflake.snowpark.mock._options import pandas as pd from snowflake.snowpark.mock._snowflake_data_type import ColumnEmulator -from snowflake.snowpark.mock.exceptions import SnowparkLocalTestingException from snowflake.snowpark.types import ( ArrayType, BinaryType, @@ -263,61 +262,6 @@ def fix_drift_between_column_sf_type_and_dtype(col: ColumnEmulator): return col -# More info about all allowed aliases here: -# https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts - -DATETIME_PART_TO_ALIASES = { - "year": {"year", "y", "yy", "yyy", "yyyy", "yr", "years", "yrs"}, - "quarter": {"quarter", "q", "qtr", "qtrs", "quarters"}, - "month": {"month", "mm", "mon", "mons", "months"}, - "week": {"week", "w", "wk", "weekofyear", "woy", "wy"}, - "day": {"day", "d", "dd", "days", "dayofmonth"}, - "hour": {"hour", "h", "hh", "hr", "hours", "hrs"}, - "minute": {"minute", "m", "mi", "min", "minutes", "mins"}, - "second": {"second", "s", "sec", "seconds", "secs"}, - "millisecond": {"millisecond", "ms", "msec", "milliseconds"}, - "microsecond": {"microsecond", "us", "usec", "microseconds"}, - "nanosecond": { - "nanosecond", - "ns", - "nsec", - "nanosec", - "nsecond", - "nanoseconds", - "nanosecs", - "nseconds", - }, - "dayofweek": {"dayofweek", "weekday", "dow", "dw"}, - "dayofweekiso": {"dayofweekiso", "weekday_iso", "dow_iso", "dw_iso"}, - "dayofyear": {"dayofyear", "yearday", "doy", "dy"}, - "weekiso": {"weekiso", "week_iso", "weekofyeariso", 
"weekofyear_iso"}, - "yearofweek": {"yearofweek"}, - "yearofweekiso": {"yearofweekiso"}, - "epoch_second": {"epoch_second", "epoch", "epoch_seconds"}, - "epoch_millisecond": {"epoch_millisecond", "epoch_milliseconds"}, - "epoch_microsecond": {"epoch_microsecond", "epoch_microseconds"}, - "epoch_nanosecond": {"epoch_nanosecond", "epoch_nanoseconds"}, - "timezone_hour": {"timezone_hour", "tzh"}, - "timezone_minute": {"timezone_minute", "tzm"}, -} - -DATETIME_PARTS = set(DATETIME_PART_TO_ALIASES.keys()) -ALIASES_TO_DATETIME_PART = { - v: k for k, l in DATETIME_PART_TO_ALIASES.items() for v in l -} -DATETIME_ALIASES = set(ALIASES_TO_DATETIME_PART.keys()) - - -def unalias_datetime_part(part): - lowered_part = part.lower() - if lowered_part in DATETIME_ALIASES: - return ALIASES_TO_DATETIME_PART[lowered_part] - else: - SnowparkLocalTestingException.raise_from_error( - ValueError(f"{part} is not a recognized date or time part.") - ) - - def get_fully_qualified_name( name: Union[str, Iterable[str]], current_schema: str, current_database: str ) -> str: diff --git a/src/snowflake/snowpark/relational_grouped_dataframe.py b/src/snowflake/snowpark/relational_grouped_dataframe.py index 7b525ee9cd5..86a20b60427 100644 --- a/src/snowflake/snowpark/relational_grouped_dataframe.py +++ b/src/snowflake/snowpark/relational_grouped_dataframe.py @@ -156,22 +156,27 @@ def _to_df(self, agg_exprs: List[Expression]) -> DataFrame: used = set() unique = [a for a in aliased_agg if a not in used and (used.add(a) or True)] aliased_agg = [_alias(a) for a in unique] + # Aliases cannot be used in grouping statements, but the child expresion can + unaliased_grouping = [ + expr.child if isinstance(expr, Alias) else expr + for expr in self._grouping_exprs + ] if isinstance(self._group_type, _GroupByType): group_plan = Aggregate( - self._grouping_exprs, + unaliased_grouping, aliased_agg, self._df._select_statement or self._df._plan, ) elif isinstance(self._group_type, _RollupType): group_plan = Aggregate( - [Rollup(self._grouping_exprs)], + [Rollup(unaliased_grouping)], aliased_agg, self._df._select_statement or self._df._plan, ) elif isinstance(self._group_type, _CubeType): group_plan = Aggregate( - [Cube(self._grouping_exprs)], + [Cube(unaliased_grouping)], aliased_agg, self._df._select_statement or self._df._plan, ) @@ -179,7 +184,7 @@ def _to_df(self, agg_exprs: List[Expression]) -> DataFrame: if len(agg_exprs) != 1: raise SnowparkClientExceptionMessages.DF_PIVOT_ONLY_SUPPORT_ONE_AGG_EXPR() group_plan = Pivot( - self._grouping_exprs, + unaliased_grouping, self._group_type.pivot_col, self._group_type.values, agg_exprs, diff --git a/tests/integ/test_column_names.py b/tests/integ/test_column_names.py index 8ee0dc5536c..cb93cedfe6e 100644 --- a/tests/integ/test_column_names.py +++ b/tests/integ/test_column_names.py @@ -91,6 +91,14 @@ def verify_column_result( assert res == expected_rows +def test_nested_alias(session): + df = session.create_dataframe(["v"], schema=["c"]) + df2 = df.select(df.c.alias("foo").alias("bar")) + rows = df.collect() + assert df2.columns == ["BAR"] + assert rows == [Row(BAR="v")] + + def test_like(session): df1 = session.create_dataframe(["v"], schema=["c"]) df2 = df1.select(df1["c"].like(lit("v%"))) diff --git a/tests/integ/test_df_aggregate.py b/tests/integ/test_df_aggregate.py index bc0f25064b3..59f0677c0a1 100644 --- a/tests/integ/test_df_aggregate.py +++ b/tests/integ/test_df_aggregate.py @@ -12,6 +12,7 @@ from snowflake.snowpark.exceptions import SnowparkSQLException from 
snowflake.snowpark.functions import ( approx_percentile_combine, + ascii, avg, col, count, @@ -664,3 +665,24 @@ def mock_stddev_pop(column: ColumnEmulator): Utils.check_answer( origin_df.select(stddev("n"), stddev_pop("m")).collect(), Row(123.0, 456.0) ) + + +def test_agg_column_naming(session): + df = session.create_dataframe( + [ + ("x", 1), + ("x", 2), + ("y", 1), + ], + schema=["a", "b"], + ) + + # DF with automatic naming + df2 = df.group_by(ascii(df.a)).agg(max_(df.b)) + + # DF with specific naming + df3 = df.group_by(ascii(df.a).alias("ord")).agg(max_(df.b).alias("max")) + + assert df2.columns == ['"ASCII(A)"', '"MAX(B)"'] + assert df3.columns == ["ORD", "MAX"] + assert df2.collect() == df3.collect() == [Row(120, 2), Row(121, 1)] From d3904ee7ee757c0d79acee7198b1635edda298a2 Mon Sep 17 00:00:00 2001 From: Jamison Date: Thu, 31 Oct 2024 10:39:08 -0700 Subject: [PATCH 2/5] CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 22ddfe7a7bd..d70019c8d3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Added support for `Index.to_numpy`. - Added support for `DataFrame.align` and `Series.align` for `axis=0`. - Added support for `size` in `GroupBy.aggregate`, `DataFrame.aggregate`, and `Series.aggregate`. +- Added partial support for `snowflake.snowpark.functions.window` ### Snowpark Local Testing Updates From 1dabe7591b4048cc97db8ac48a17dd3aee6e0a53 Mon Sep 17 00:00:00 2001 From: Jamison Date: Thu, 31 Oct 2024 13:32:08 -0700 Subject: [PATCH 3/5] fix local test --- src/snowflake/snowpark/functions.py | 2 +- src/snowflake/snowpark/mock/_plan.py | 13 ++++++++++--- tests/integ/test_df_aggregate.py | 12 ++++++------ 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/snowflake/snowpark/functions.py b/src/snowflake/snowpark/functions.py index 809c61147d8..f562fed2593 100644 --- a/src/snowflake/snowpark/functions.py +++ b/src/snowflake/snowpark/functions.py @@ -4651,7 +4651,7 @@ def window( """ if slideDuration: - # SNOW- : slideDuration changes this function from a 1:1 mapping to a 1:N mapping. That + # SNOW-1063685: slideDuration changes this function from a 1:1 mapping to a 1:N mapping. That # currently would require a udtf which may have significantly different performance. raise NotImplementedError( "snowflake.snowpark.functions.window does not support slideDuration parameter yet." diff --git a/src/snowflake/snowpark/mock/_plan.py b/src/snowflake/snowpark/mock/_plan.py index a87051b568a..0b5197d7c24 100644 --- a/src/snowflake/snowpark/mock/_plan.py +++ b/src/snowflake/snowpark/mock/_plan.py @@ -871,9 +871,16 @@ def execute_mock_plan( col_name = f"" by_column_expression.append(child_rf[col_name]) else: - by_column_expression.append( - child_rf[plan.session._analyzer.analyze(exp)] - ) + column_name = plan.session._analyzer.analyze(exp) + if isinstance(exp, FunctionExpression): + materialized_column = calculate_expression( + exp, child_rf, plan.session._analyzer, expr_to_alias + ) + # Only function expressions that are a mapping of existing columns can be aggregated on. + # Any increase or reduction in number of rows is an invalid function expression. 
+ if len(materialized_column) == len(child_rf): + child_rf[column_name] = materialized_column + by_column_expression.append(child_rf[column_name]) except KeyError as e: raise SnowparkLocalTestingException( f"This is not a valid group by expression due to exception {e!r}" diff --git a/tests/integ/test_df_aggregate.py b/tests/integ/test_df_aggregate.py index 59f0677c0a1..1572355839d 100644 --- a/tests/integ/test_df_aggregate.py +++ b/tests/integ/test_df_aggregate.py @@ -12,7 +12,6 @@ from snowflake.snowpark.exceptions import SnowparkSQLException from snowflake.snowpark.functions import ( approx_percentile_combine, - ascii, avg, col, count, @@ -29,6 +28,7 @@ stddev, stddev_pop, sum as sum_, + upper, ) from snowflake.snowpark.mock._snowflake_data_type import ColumnEmulator, ColumnType from snowflake.snowpark.types import DoubleType @@ -678,11 +678,11 @@ def test_agg_column_naming(session): ) # DF with automatic naming - df2 = df.group_by(ascii(df.a)).agg(max_(df.b)) + df2 = df.group_by(upper(df.a)).agg(max_(df.b)) # DF with specific naming - df3 = df.group_by(ascii(df.a).alias("ord")).agg(max_(df.b).alias("max")) + df3 = df.group_by(upper(df.a).alias("UPPER")).agg(max_(df.b).alias("max")) - assert df2.columns == ['"ASCII(A)"', '"MAX(B)"'] - assert df3.columns == ["ORD", "MAX"] - assert df2.collect() == df3.collect() == [Row(120, 2), Row(121, 1)] + assert df2.columns == ['"UPPER(A)"', '"MAX(B)"'] + assert df3.columns == ["UPPER", "MAX"] + assert df2.collect() == df3.collect() == [Row("X", 2), Row("Y", 1)] From e6893be32f17718fed17cff761dbbb90decb6b82 Mon Sep 17 00:00:00 2001 From: Jamison Date: Thu, 31 Oct 2024 13:50:25 -0700 Subject: [PATCH 4/5] update type hint --- src/snowflake/snowpark/_internal/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py index b771566fb05..8d174d4b40e 100644 --- a/src/snowflake/snowpark/_internal/utils.py +++ b/src/snowflake/snowpark/_internal/utils.py @@ -277,7 +277,7 @@ def unalias_datetime_part(part): raise ValueError(f"{part} is not a recognized date or time part.") -def parse_duration_string(duration: str) -> tuple[int, str]: +def parse_duration_string(duration: str) -> Tuple[int, str]: length, unit = duration.split(" ") length = int(length) unit = unalias_datetime_part(unit) From 1dfceb969fb9891d2918b98ecd68e5a752e1d84e Mon Sep 17 00:00:00 2001 From: Jamison Rose Date: Thu, 31 Oct 2024 15:06:38 -0700 Subject: [PATCH 5/5] Update src/snowflake/snowpark/functions.py Co-authored-by: Afroz Alam --- src/snowflake/snowpark/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/snowpark/functions.py b/src/snowflake/snowpark/functions.py index f562fed2593..ee8fa5c4bad 100644 --- a/src/snowflake/snowpark/functions.py +++ b/src/snowflake/snowpark/functions.py @@ -4602,7 +4602,7 @@ def window( startTime: Optional[str] = None, ) -> Column: """ - Converts a time column into a window object with start and end times. Window start times areinclusive + Converts a time column into a window object with start and end times. Window start times are inclusive while end times are exclusive. For example 9:30 is in the window [9:30, 10:00), but not [9:00, 9:30). Args:
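
Two short usage sketches to close out the series. First, the behavior the new `_internal/utils.py` helpers from PATCH 1/5 implement — the inputs below are made up for illustration, and since the helpers live in an internal module this is a behavioral note rather than a public-API recommendation:

```python
from snowflake.snowpark._internal.utils import (
    parse_duration_string,
    unalias_datetime_part,
)

# Aliases fold case-insensitively to their canonical date/time part.
assert unalias_datetime_part("MINS") == "minute"
assert unalias_datetime_part("yy") == "year"

# Interval strings are "<quantity> <unit>"; the unit is unaliased on the way out.
assert parse_duration_string("5 minutes") == (5, "minute")
assert parse_duration_string("2 hrs") == (2, "hour")

# Unrecognized units raise ValueError.
try:
    parse_duration_string("3 fortnights")
except ValueError as exc:
    print(exc)  # fortnights is not a recognized date or time part.
```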
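Second, an end-to-end sketch of the windowed aggregation the series enables: `window` produces an OBJECT column aliased `window`, and the group-by fix in `relational_grouped_dataframe.py` unwraps that alias so the expression can be grouped on. This assumes a connected `session` as in the doctests above; the sample rows and the `avg` aggregation are invented for illustration:

```python
import datetime

from snowflake.snowpark.functions import avg, window

# Three events: two fall in [09:00, 09:05), one in [09:05, 09:10).
df = session.create_dataframe(
    [
        (datetime.datetime(2024, 10, 31, 9, 1), 10),
        (datetime.datetime(2024, 10, 31, 9, 4), 20),
        (datetime.datetime(2024, 10, 31, 9, 7), 30),
    ],
    schema=["ts", "value"],
)

# Tumbling 5-minute windows; one output row per distinct window object.
df.group_by(window(df.ts, "5 minutes")).agg(avg(df.value).alias("avg_value")).show()
# Expected groups: [09:00, 09:05) -> 15.0 and [09:05, 09:10) -> 30.0
```

Because `slideDuration` is rejected, each input row maps to exactly one window, which is what lets `window` stay a plain column expression (and thus a valid grouping key) instead of requiring a UDTF.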