SNOW-859943: Add basic support for functions.window #2545

Open · wants to merge 6 commits into base: main
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@
- Added support for `Index.to_numpy`.
- Added support for `DataFrame.align` and `Series.align` for `axis=0`.
- Added support for `size` in `GroupBy.aggregate`, `DataFrame.aggregate`, and `Series.aggregate`.
- Added partial support for `snowflake.snowpark.functions.window`.
Contributor:

move it up to 1.25.0

Contributor:

Should we call it partial support? Let's document its capabilities in the function description instead of wording it as partial support.

- Added support for `pd.read_pickle` (Uses native pandas for processing).
- Added support for `pd.read_html` (Uses native pandas for processing).
- Added support for `pd.read_xml` (Uses native pandas for processing).
1 change: 1 addition & 0 deletions docs/source/snowpark/functions.rst
@@ -322,5 +322,6 @@ Functions
when
when_matched
when_not_matched
window
xmlget
year
60 changes: 60 additions & 0 deletions src/snowflake/snowpark/_internal/utils.py
@@ -224,6 +224,66 @@ class TempObjectType(Enum):
CTE = "CTE"


# More info about all allowed aliases here:
# https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts

DATETIME_PART_TO_ALIASES = {
"year": {"year", "y", "yy", "yyy", "yyyy", "yr", "years", "yrs"},
"quarter": {"quarter", "q", "qtr", "qtrs", "quarters"},
"month": {"month", "mm", "mon", "mons", "months"},
"week": {"week", "w", "wk", "weekofyear", "woy", "wy"},
"day": {"day", "d", "dd", "days", "dayofmonth"},
"hour": {"hour", "h", "hh", "hr", "hours", "hrs"},
"minute": {"minute", "m", "mi", "min", "minutes", "mins"},
"second": {"second", "s", "sec", "seconds", "secs"},
"millisecond": {"millisecond", "ms", "msec", "milliseconds"},
"microsecond": {"microsecond", "us", "usec", "microseconds"},
"nanosecond": {
"nanosecond",
"ns",
"nsec",
"nanosec",
"nsecond",
"nanoseconds",
"nanosecs",
"nseconds",
},
"dayofweek": {"dayofweek", "weekday", "dow", "dw"},
"dayofweekiso": {"dayofweekiso", "weekday_iso", "dow_iso", "dw_iso"},
"dayofyear": {"dayofyear", "yearday", "doy", "dy"},
"weekiso": {"weekiso", "week_iso", "weekofyeariso", "weekofyear_iso"},
"yearofweek": {"yearofweek"},
"yearofweekiso": {"yearofweekiso"},
"epoch_second": {"epoch_second", "epoch", "epoch_seconds"},
"epoch_millisecond": {"epoch_millisecond", "epoch_milliseconds"},
"epoch_microsecond": {"epoch_microsecond", "epoch_microseconds"},
"epoch_nanosecond": {"epoch_nanosecond", "epoch_nanoseconds"},
"timezone_hour": {"timezone_hour", "tzh"},
"timezone_minute": {"timezone_minute", "tzm"},
}
Comment on lines +231 to +263
Contributor:

Did you do a lot of trial and error for this? Are these aliases documented somewhere?


DATETIME_PARTS = set(DATETIME_PART_TO_ALIASES.keys())
ALIASES_TO_DATETIME_PART = {
v: k for k, l in DATETIME_PART_TO_ALIASES.items() for v in l
}
DATETIME_ALIASES = set(ALIASES_TO_DATETIME_PART.keys())


def unalias_datetime_part(part):
lowered_part = part.lower()
if lowered_part in DATETIME_ALIASES:
return ALIASES_TO_DATETIME_PART[lowered_part]
else:
raise ValueError(f"{part} is not a recognized date or time part.")


def parse_duration_string(duration: str) -> Tuple[int, str]:
length, unit = duration.split(" ")
length = int(length)
unit = unalias_datetime_part(unit)
return length, unit
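
A quick sketch of how these helpers are expected to behave (illustrative only; assumes the import path shown in this diff):

# Illustrative usage of the helpers above -- not part of the diff.
from snowflake.snowpark._internal.utils import (
    parse_duration_string,
    unalias_datetime_part,
)

# Aliases are case-insensitive and resolve to their canonical date/time part.
assert unalias_datetime_part("MINS") == "minute"
assert unalias_datetime_part("yy") == "year"

# Duration strings are "<integer> <unit>", where the unit may be any supported alias.
assert parse_duration_string("5 minutes") == (5, "minute")
assert parse_duration_string("2 hrs") == (2, "hour")

# Unrecognized units raise a ValueError.
# parse_duration_string("3 fortnights")  # ValueError: fortnights is not a recognized date or time part.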


def validate_object_name(name: str):
if not SNOWFLAKE_OBJECT_RE_PATTERN.match(name):
raise SnowparkClientExceptionMessages.GENERAL_INVALID_OBJECT_NAME(name)
5 changes: 4 additions & 1 deletion src/snowflake/snowpark/column.py
@@ -668,7 +668,10 @@ def alias(self, alias: str) -> "Column":

def name(self, alias: str) -> "Column":
"""Returns a new renamed Column."""
return Column(Alias(self._expression, quote_name(alias)))
expr = self._expression
if isinstance(expr, Alias):
expr = expr.child
return Column(Alias(expr, quote_name(alias)))

def over(self, window: Optional[WindowSpec] = None) -> "Column":
"""
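
A minimal sketch of the behavior this `name()` change enables (assumes an active `session`; names are illustrative):

# Chained aliases now replace the previous alias instead of nesting Alias expressions.
from snowflake.snowpark.functions import col

df = session.create_dataframe(["v"], schema=["c"])
df2 = df.select(col("c").alias("foo").alias("bar"))
print(df2.columns)  # expected: ['BAR'] -- only the outermost alias is kept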
85 changes: 85 additions & 0 deletions src/snowflake/snowpark/functions.py
@@ -191,6 +191,7 @@
)
from snowflake.snowpark._internal.udf_utils import check_decorator_args
from snowflake.snowpark._internal.utils import (
parse_duration_string,
parse_positional_args_to_list,
validate_object_name,
)
@@ -4594,6 +4595,90 @@ def dayofyear(e: ColumnOrName) -> Column:
return builtin("dayofyear")(c)


def window(
timeColumn: ColumnOrName,
windowDuration: str,
slideDuration: Optional[str] = None,
startTime: Optional[str] = None,
Comment on lines +4599 to +4602
Contributor:

camelCase is generally not Pythonic. Can we convert this to snake_case?

Contributor Author:

I agree, but to maintain compatibility with the PySpark implementation I think camelCase might be required. I could modify it so that either version is accepted.

Contributor:

IMHO, API style consistency within the Snowpark library outweighs consistency with PySpark.

) -> Column:
"""
Converts a time column into a window object with start and end times. Window start times are
inclusive, while end times are exclusive. For example, 9:30 is in the window [9:30, 10:00), but not [9:00, 9:30).

Args:
timeColumn: The column to apply the window transformation to.
windowDuration: An interval string that determines the length of each window.
slideDuration: An interval string representing the amount of time between the start of
each window. Note that this parameter is not supported yet; specifying it will raise a
NotImplementedError.
startTime: An interval string representing the amount of time the start of each window is
offset, e.g. a five-minute window with a startTime of '2 minutes' will run from [9:02, 9:07)
instead of [9:00, 9:05).

Note:
Interval strings are of the form 'quantity unit' where quantity is an integer and unit
is a supported time unit. This function supports the same time units as dateadd. See
`supported time units <https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts>`_
for more information.

Example::

>>> import datetime
>>> from snowflake.snowpark.functions import window
>>> df = session.createDataFrame(
... [(datetime.datetime.strptime("2024-10-31 09:05:00.000", "%Y-%m-%d %H:%M:%S.%f"),)],
... schema=["time"]
... )
>>> df.select(window(df.time, "5 minutes")).show()
----------------------------------------
|"WINDOW" |
----------------------------------------
|{ |
| "end": "2024-10-31 09:10:00.000", |
| "start": "2024-10-31 09:05:00.000" |
|} |
----------------------------------------
<BLANKLINE>
>>> df.select(window(df.time, "5 minutes", startTime="2 minutes")).show()
----------------------------------------
|"WINDOW" |
----------------------------------------
|{ |
| "end": "2024-10-31 09:07:00.000", |
| "start": "2024-10-31 09:02:00.000" |
|} |
----------------------------------------
<BLANKLINE>
"""
if slideDuration:
# SNOW-1063685: slideDuration changes this function from a 1:1 mapping to a 1:N mapping. That
# currently would require a UDTF, which may have significantly different performance.
raise NotImplementedError(
"snowflake.snowpark.functions.window does not support slideDuration parameter yet."
Contributor:

Since @sfc-gh-qding volunteered as a SQL expert for our team, we can discuss whether doing this is possible without a UDTF.

)

epoch = lit("1970-01-01 00:00:00").cast(TimestampType())
Contributor:

Do we need to care about timezone here?

time = _to_col_if_str(timeColumn, "window")

window_duration, window_unit = parse_duration_string(windowDuration)
window_duration = lit(window_duration)
window_unit = f"{window_unit}s"

base = epoch
if startTime:
start_duration, start_unit = parse_duration_string(startTime)
base += make_interval(**{f"{start_unit}s": start_duration})

window = floor(datediff(window_unit, base, time) / window_duration)
window_start = dateadd(window_unit, window * window_duration, base)
return object_construct_keep_null(
lit("start"),
window_start,
lit("end"),
dateadd(window_unit, window_duration, window_start),
).alias("window")
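
A plain-Python sketch of the bucketing arithmetic used above (illustrative only; not the Snowpark code path):

from datetime import datetime, timedelta

def window_bounds(time: datetime, width: timedelta, offset: timedelta = timedelta(0)):
    base = datetime(1970, 1, 1) + offset  # epoch, optionally shifted by startTime
    bucket = (time - base) // width       # floor(datediff(...) / window_duration)
    start = base + bucket * width         # dateadd(unit, bucket * duration, base)
    return start, start + width           # the "start"/"end" fields of the window object

print(window_bounds(datetime(2024, 10, 31, 9, 5), timedelta(minutes=5)))
# expected: 2024-10-31 09:05:00 to 09:10:00
print(window_bounds(datetime(2024, 10, 31, 9, 5), timedelta(minutes=5), timedelta(minutes=2)))
# expected: 2024-10-31 09:02:00 to 09:07:00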


def is_array(col: ColumnOrName) -> Column:
"""
Returns true if the specified VARIANT column contains an ARRAY value.
2 changes: 1 addition & 1 deletion src/snowflake/snowpark/mock/_functions.py
@@ -21,6 +21,7 @@

import snowflake.snowpark
from snowflake.snowpark._internal.analyzer.expression import FunctionExpression
from snowflake.snowpark._internal.utils import unalias_datetime_part
from snowflake.snowpark.mock._options import numpy, pandas
from snowflake.snowpark.mock._snowflake_data_type import (
_TIMESTAMP_TYPE_MAPPING,
@@ -57,7 +58,6 @@
convert_numeric_string_value_to_float_seconds,
convert_snowflake_datetime_format,
process_string_time_with_fractional_seconds,
unalias_datetime_part,
)

RETURN_TYPE = Union[ColumnEmulator, TableEmulator]
13 changes: 10 additions & 3 deletions src/snowflake/snowpark/mock/_plan.py
@@ -871,9 +871,16 @@ def execute_mock_plan(
col_name = f"<local_test_internal_{str(exp.value)}>"
by_column_expression.append(child_rf[col_name])
else:
by_column_expression.append(
child_rf[plan.session._analyzer.analyze(exp)]
)
column_name = plan.session._analyzer.analyze(exp)
if isinstance(exp, FunctionExpression):
materialized_column = calculate_expression(
exp, child_rf, plan.session._analyzer, expr_to_alias
)
# Only function expressions that are a mapping of existing columns can be aggregated on.
# Any increase or reduction in number of rows is an invalid function expression.
if len(materialized_column) == len(child_rf):
child_rf[column_name] = materialized_column
Comment on lines +879 to +882
Contributor:

Is this related to the window function support in local testing as a group-by expression?
Also, I don't understand why we want the materialized_column here -- what case is it addressing? Is there an example?

Contributor Author:

This is related to having any function expression in the groupby. For example:
df2 = df.group_by(upper(df.a)).agg(max_(df.b))
This is perfectly valid in live mode, but does not work in local testing at the moment.

by_column_expression.append(child_rf[column_name])
except KeyError as e:
raise SnowparkLocalTestingException(
f"This is not a valid group by expression due to exception {e!r}"
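
A rough end-to-end sketch of the group-by case this change fixes under local testing (session setup and data are illustrative assumptions):

from snowflake.snowpark import Session
from snowflake.snowpark.functions import max as max_, upper

# Local-testing session; configuration here is an assumption for illustration.
session = Session.builder.config("local_testing", True).create()
df = session.create_dataframe([("x", 1), ("x", 2), ("y", 1)], schema=["a", "b"])

# Grouping by a function expression previously failed in local testing with a KeyError,
# because UPPER(A) was not a materialized column of the emulated frame.
result = df.group_by(upper(df.a)).agg(max_(df.b)).collect()
print(result)  # expected: [Row('X', 2), Row('Y', 1)]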
56 changes: 0 additions & 56 deletions src/snowflake/snowpark/mock/_util.py
@@ -13,7 +13,6 @@
from snowflake.snowpark._internal.utils import parse_table_name, quote_name
from snowflake.snowpark.mock._options import pandas as pd
from snowflake.snowpark.mock._snowflake_data_type import ColumnEmulator
from snowflake.snowpark.mock.exceptions import SnowparkLocalTestingException
from snowflake.snowpark.types import (
ArrayType,
BinaryType,
@@ -263,61 +262,6 @@ def fix_drift_between_column_sf_type_and_dtype(col: ColumnEmulator):
return col


# More info about all allowed aliases here:
# https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts

DATETIME_PART_TO_ALIASES = {
"year": {"year", "y", "yy", "yyy", "yyyy", "yr", "years", "yrs"},
"quarter": {"quarter", "q", "qtr", "qtrs", "quarters"},
"month": {"month", "mm", "mon", "mons", "months"},
"week": {"week", "w", "wk", "weekofyear", "woy", "wy"},
"day": {"day", "d", "dd", "days", "dayofmonth"},
"hour": {"hour", "h", "hh", "hr", "hours", "hrs"},
"minute": {"minute", "m", "mi", "min", "minutes", "mins"},
"second": {"second", "s", "sec", "seconds", "secs"},
"millisecond": {"millisecond", "ms", "msec", "milliseconds"},
"microsecond": {"microsecond", "us", "usec", "microseconds"},
"nanosecond": {
"nanosecond",
"ns",
"nsec",
"nanosec",
"nsecond",
"nanoseconds",
"nanosecs",
"nseconds",
},
"dayofweek": {"dayofweek", "weekday", "dow", "dw"},
"dayofweekiso": {"dayofweekiso", "weekday_iso", "dow_iso", "dw_iso"},
"dayofyear": {"dayofyear", "yearday", "doy", "dy"},
"weekiso": {"weekiso", "week_iso", "weekofyeariso", "weekofyear_iso"},
"yearofweek": {"yearofweek"},
"yearofweekiso": {"yearofweekiso"},
"epoch_second": {"epoch_second", "epoch", "epoch_seconds"},
"epoch_millisecond": {"epoch_millisecond", "epoch_milliseconds"},
"epoch_microsecond": {"epoch_microsecond", "epoch_microseconds"},
"epoch_nanosecond": {"epoch_nanosecond", "epoch_nanoseconds"},
"timezone_hour": {"timezone_hour", "tzh"},
"timezone_minute": {"timezone_minute", "tzm"},
}

DATETIME_PARTS = set(DATETIME_PART_TO_ALIASES.keys())
ALIASES_TO_DATETIME_PART = {
v: k for k, l in DATETIME_PART_TO_ALIASES.items() for v in l
}
DATETIME_ALIASES = set(ALIASES_TO_DATETIME_PART.keys())


def unalias_datetime_part(part):
lowered_part = part.lower()
if lowered_part in DATETIME_ALIASES:
return ALIASES_TO_DATETIME_PART[lowered_part]
else:
SnowparkLocalTestingException.raise_from_error(
ValueError(f"{part} is not a recognized date or time part.")
)


def get_fully_qualified_name(
name: Union[str, Iterable[str]], current_schema: str, current_database: str
) -> str:
13 changes: 9 additions & 4 deletions src/snowflake/snowpark/relational_grouped_dataframe.py
@@ -156,30 +156,35 @@ def _to_df(self, agg_exprs: List[Expression]) -> DataFrame:
used = set()
unique = [a for a in aliased_agg if a not in used and (used.add(a) or True)]
aliased_agg = [_alias(a) for a in unique]
# Aliases cannot be used in grouping statements, but the child expression can
unaliased_grouping = [
expr.child if isinstance(expr, Alias) else expr
for expr in self._grouping_exprs
]

if isinstance(self._group_type, _GroupByType):
group_plan = Aggregate(
self._grouping_exprs,
unaliased_grouping,
aliased_agg,
self._df._select_statement or self._df._plan,
)
elif isinstance(self._group_type, _RollupType):
group_plan = Aggregate(
[Rollup(self._grouping_exprs)],
[Rollup(unaliased_grouping)],
aliased_agg,
self._df._select_statement or self._df._plan,
)
elif isinstance(self._group_type, _CubeType):
group_plan = Aggregate(
[Cube(self._grouping_exprs)],
[Cube(unaliased_grouping)],
aliased_agg,
self._df._select_statement or self._df._plan,
)
elif isinstance(self._group_type, _PivotType):
if len(agg_exprs) != 1:
raise SnowparkClientExceptionMessages.DF_PIVOT_ONLY_SUPPORT_ONE_AGG_EXPR()
group_plan = Pivot(
self._grouping_exprs,
unaliased_grouping,
self._group_type.pivot_col,
self._group_type.values,
agg_exprs,
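
A small sketch of why the alias is stripped for grouping expressions (reusing the illustrative `df` and imports from the sketch above):

# The alias still names the output column, but grouping uses the child expression.
df3 = df.group_by(upper(df.a).alias("UPPER")).agg(max_(df.b).alias("MAX"))
print(df3.columns)  # expected: ['UPPER', 'MAX']
# Conceptually: SELECT UPPER(A) AS "UPPER", MAX(B) AS "MAX" FROM ... GROUP BY UPPER(A)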
8 changes: 8 additions & 0 deletions tests/integ/test_column_names.py
@@ -91,6 +91,14 @@ def verify_column_result(
assert res == expected_rows


def test_nested_alias(session):
df = session.create_dataframe(["v"], schema=["c"])
df2 = df.select(df.c.alias("foo").alias("bar"))
rows = df2.collect()
assert df2.columns == ["BAR"]
assert rows == [Row(BAR="v")]


def test_like(session):
df1 = session.create_dataframe(["v"], schema=["c"])
df2 = df1.select(df1["c"].like(lit("v%")))
22 changes: 22 additions & 0 deletions tests/integ/test_df_aggregate.py
Contributor:

Do we have a test for the new window function?

I saw that PySpark has an example of using groupby + the window function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html

Contributor Author:

I'll add a doctest example that shows it being used as a groupby expression.

@@ -28,6 +28,7 @@
stddev,
stddev_pop,
sum as sum_,
upper,
)
from snowflake.snowpark.mock._snowflake_data_type import ColumnEmulator, ColumnType
from snowflake.snowpark.types import DoubleType
@@ -664,3 +665,24 @@ def mock_stddev_pop(column: ColumnEmulator):
Utils.check_answer(
origin_df.select(stddev("n"), stddev_pop("m")).collect(), Row(123.0, 456.0)
)


def test_agg_column_naming(session):
df = session.create_dataframe(
[
("x", 1),
("x", 2),
("y", 1),
],
schema=["a", "b"],
)

# DF with automatic naming
df2 = df.group_by(upper(df.a)).agg(max_(df.b))

# DF with specific naming
df3 = df.group_by(upper(df.a).alias("UPPER")).agg(max_(df.b).alias("max"))

assert df2.columns == ['"UPPER(A)"', '"MAX(B)"']
assert df3.columns == ["UPPER", "MAX"]
assert df2.collect() == df3.collect() == [Row("X", 2), Row("Y", 1)]