SNOW-859943: Add basic support for functions.window #2545
base: main
Changes from all commits
7a2a050
d3904ee
af44942
1dabe75
e6893be
1dfceb9
@@ -322,5 +322,6 @@ Functions
when
when_matched
when_not_matched
window
xmlget
year
@@ -224,6 +224,66 @@ class TempObjectType(Enum):
    CTE = "CTE"


# More info about all allowed aliases here:
# https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts

DATETIME_PART_TO_ALIASES = {
    "year": {"year", "y", "yy", "yyy", "yyyy", "yr", "years", "yrs"},
    "quarter": {"quarter", "q", "qtr", "qtrs", "quarters"},
    "month": {"month", "mm", "mon", "mons", "months"},
    "week": {"week", "w", "wk", "weekofyear", "woy", "wy"},
    "day": {"day", "d", "dd", "days", "dayofmonth"},
    "hour": {"hour", "h", "hh", "hr", "hours", "hrs"},
    "minute": {"minute", "m", "mi", "min", "minutes", "mins"},
    "second": {"second", "s", "sec", "seconds", "secs"},
    "millisecond": {"millisecond", "ms", "msec", "milliseconds"},
    "microsecond": {"microsecond", "us", "usec", "microseconds"},
    "nanosecond": {
        "nanosecond",
        "ns",
        "nsec",
        "nanosec",
        "nsecond",
        "nanoseconds",
        "nanosecs",
        "nseconds",
    },
    "dayofweek": {"dayofweek", "weekday", "dow", "dw"},
    "dayofweekiso": {"dayofweekiso", "weekday_iso", "dow_iso", "dw_iso"},
    "dayofyear": {"dayofyear", "yearday", "doy", "dy"},
    "weekiso": {"weekiso", "week_iso", "weekofyeariso", "weekofyear_iso"},
    "yearofweek": {"yearofweek"},
    "yearofweekiso": {"yearofweekiso"},
    "epoch_second": {"epoch_second", "epoch", "epoch_seconds"},
    "epoch_millisecond": {"epoch_millisecond", "epoch_milliseconds"},
    "epoch_microsecond": {"epoch_microsecond", "epoch_microseconds"},
    "epoch_nanosecond": {"epoch_nanosecond", "epoch_nanoseconds"},
    "timezone_hour": {"timezone_hour", "tzh"},
    "timezone_minute": {"timezone_minute", "tzm"},
}
Comment on lines +231 to +263:

did you do a lot of trial and error for this? Are these aliases documented somewhere?

There's a handy chart here: https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts
DATETIME_PARTS = set(DATETIME_PART_TO_ALIASES.keys())
ALIASES_TO_DATETIME_PART = {
    v: k for k, l in DATETIME_PART_TO_ALIASES.items() for v in l
}
DATETIME_ALIASES = set(ALIASES_TO_DATETIME_PART.keys())


def unalias_datetime_part(part):
    lowered_part = part.lower()
    if lowered_part in DATETIME_ALIASES:
        return ALIASES_TO_DATETIME_PART[lowered_part]
    else:
        raise ValueError(f"{part} is not a recognized date or time part.")


def parse_duration_string(duration: str) -> Tuple[int, str]:
    length, unit = duration.split(" ")
    length = int(length)
    unit = unalias_datetime_part(unit)
    return length, unit


def validate_object_name(name: str):
    if not SNOWFLAKE_OBJECT_RE_PATTERN.match(name):
        raise SnowparkClientExceptionMessages.GENERAL_INVALID_OBJECT_NAME(name)
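A quick sketch of how these helpers behave. The import path follows the `from snowflake.snowpark._internal.utils import ...` line used later in this PR; the expected values follow directly from the alias table above:

```python
# Illustrative usage of the helpers added above.
from snowflake.snowpark._internal.utils import (
    parse_duration_string,
    unalias_datetime_part,
)

assert unalias_datetime_part("mins") == "minute"
assert unalias_datetime_part("YYYY") == "year"  # lookup is case-insensitive
assert parse_duration_string("5 minutes") == (5, "minute")
# An unrecognized unit raises ValueError:
# parse_duration_string("5 fortnights")
```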
@@ -191,6 +191,7 @@
)
from snowflake.snowpark._internal.udf_utils import check_decorator_args
from snowflake.snowpark._internal.utils import (
    parse_duration_string,
    parse_positional_args_to_list,
    validate_object_name,
)
@@ -4594,6 +4595,90 @@ def dayofyear(e: ColumnOrName) -> Column:
    return builtin("dayofyear")(c)


def window(
    timeColumn: ColumnOrName,
    windowDuration: str,
    slideDuration: Optional[str] = None,
    startTime: Optional[str] = None,
Comment on lines +4599 to +4602:

camelCase is generally not pythonic. Can we convert this into snake_case?

I agree, but to maintain compatibility with the pyspark implementation I think camelCase might be required. I could modify so that either version is accepted.

IMHO api style consistency within the snowpark lib outweighs consistency with pyspark.
) -> Column:
""" | ||
Converts a time column into a window object with start and end times. Window start times are | ||
inclusive while end times are exclusive. For example 9:30 is in the window [9:30, 10:00), but not [9:00, 9:30). | ||
|
||
Args: | ||
timeColumn: The column to apply the window transformation to. | ||
windowDuration: An interval string that determines the length of each window. | ||
slideDuration: An interval string representing the amount of time in-between the start of | ||
each window. Note that this parameter is not supported yet. Specifying it will raise a | ||
NotImplementedError exception. | ||
startTime: An interval string representing the amount of time the start of each window is | ||
offset. eg. a five minute window with startTime of '2 minutes' will be from [9:02, 9:07) | ||
instead of [9:00, 9:05) | ||
|
||
Note: | ||
Interval strings are of the form 'quantity unit' where quantity is an integer and unitis | ||
is a supported time unit. This function supports the same time units as dateadd. see | ||
`supported time units <https://docs.snowflake.com/en/sql-reference/functions-date-time#label-supported-date-time-parts>`_ | ||
for more information. | ||
|
||
Example:: | ||
|
||
>>> import datetime | ||
>>> from snowflake.snowpark.functions import window | ||
>>> df = session.createDataFrame( | ||
... [(datetime.datetime.strptime("2024-10-31 09:05:00.000", "%Y-%m-%d %H:%M:%S.%f"),)], | ||
... schema=["time"] | ||
... ) | ||
>>> df.select(window(df.time, "5 minutes")).show() | ||
---------------------------------------- | ||
|"WINDOW" | | ||
---------------------------------------- | ||
|{ | | ||
| "end": "2024-10-31 09:10:00.000", | | ||
| "start": "2024-10-31 09:05:00.000" | | ||
|} | | ||
---------------------------------------- | ||
<BLANKLINE> | ||
>>> df.select(window(df.time, "5 minutes", startTime="2 minutes")).show() | ||
---------------------------------------- | ||
|"WINDOW" | | ||
---------------------------------------- | ||
|{ | | ||
| "end": "2024-10-31 09:07:00.000", | | ||
| "start": "2024-10-31 09:02:00.000" | | ||
|} | | ||
---------------------------------------- | ||
<BLANKLINE> | ||
""" | ||
    if slideDuration:
        # SNOW-1063685: slideDuration changes this function from a 1:1 mapping to a 1:N mapping. That
        # currently would require a udtf which may have significantly different performance.
        raise NotImplementedError(
            "snowflake.snowpark.functions.window does not support slideDuration parameter yet."
        )

Comment: since @sfc-gh-qding volunteered as a sql expert for our team, we can discuss if doing this is possible without udtf.
    epoch = lit("1970-01-01 00:00:00").cast(TimestampType())

Comment: do we need to care about timezone here?

    time = _to_col_if_str(timeColumn, "window")
    window_duration, window_unit = parse_duration_string(windowDuration)
    window_duration = lit(window_duration)
    window_unit = f"{window_unit}s"

    base = epoch
    if startTime:
        start_duration, start_unit = parse_duration_string(startTime)
        base += make_interval(**{f"{start_unit}s": start_duration})

    window = floor(datediff(window_unit, base, time) / window_duration)
    window_start = dateadd(window_unit, window * window_duration, base)
    return object_construct_keep_null(
        lit("start"),
        window_start,
        lit("end"),
        dateadd(window_unit, window_duration, window_start),
    ).alias("window")
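The bucketing arithmetic above (window index is floor(elapsed / duration); window start is the base plus index whole windows) can be sanity-checked with a small plain-Python sketch. The helper below is hypothetical and only illustrates the math, not the generated SQL:

```python
import datetime
import math

def window_start(time, base, duration_minutes):
    # Window index: how many full windows fit between base and time.
    elapsed = (time - base).total_seconds() / 60
    index = math.floor(elapsed / duration_minutes)
    # Window start: base plus index whole windows.
    return base + datetime.timedelta(minutes=index * duration_minutes)

base = datetime.datetime(1970, 1, 1)
t = datetime.datetime(2024, 10, 31, 9, 5)
# 9:05 lands exactly on a 5-minute boundary: window [9:05, 9:10).
assert window_start(t, base, 5) == datetime.datetime(2024, 10, 31, 9, 5)
# A 2-minute startTime shifts the base, so 9:05 falls into [9:02, 9:07).
offset_base = base + datetime.timedelta(minutes=2)
assert window_start(t, offset_base, 5) == datetime.datetime(2024, 10, 31, 9, 2)
```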
def is_array(col: ColumnOrName) -> Column:
    """
    Returns true if the specified VARIANT column contains an ARRAY value.
@@ -871,9 +871,16 @@ def execute_mock_plan(
                     col_name = f"<local_test_internal_{str(exp.value)}>"
                     by_column_expression.append(child_rf[col_name])
                 else:
-                    by_column_expression.append(
-                        child_rf[plan.session._analyzer.analyze(exp)]
-                    )
+                    column_name = plan.session._analyzer.analyze(exp)
+                    if isinstance(exp, FunctionExpression):
+                        materialized_column = calculate_expression(
+                            exp, child_rf, plan.session._analyzer, expr_to_alias
+                        )
+                        # Only function expressions that are a mapping of existing columns can be aggregated on.
+                        # Any increase or reduction in number of rows is an invalid function expression.
+                        if len(materialized_column) == len(child_rf):
+                            child_rf[column_name] = materialized_column
Comment on lines +879 to +882:

is this related to the window function support in local testing as groupby expression?

This is related to having any function expression in the groupby. For example:
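A hypothetical sketch of such a group-by, assuming an existing `session` (the concrete snippet here is illustrative, not the reviewer's original):

```python
# Hypothetical illustration: any function expression as a group-by key,
# e.g. grouping rows by the upper-cased value of an existing column.
from snowflake.snowpark.functions import upper

df = session.create_dataframe([("a",), ("A",), ("b",)], schema=["name"])
df.group_by(upper(df.name)).count().show()
```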
+                    by_column_expression.append(child_rf[column_name])
             except KeyError as e:
                 raise SnowparkLocalTestingException(
                     f"This is not a valid group by expression due to exception {e!r}"
Comment: do we have a test for the new window function? I saw that pyspark has an example of using groupby + window function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html

I'll add a doctest example that shows it being used as a groupby expression.
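A sketch of the kind of doctest the author proposes, assuming an existing `session` and this PR's `window` function (mirroring the linked pyspark example):

```python
# Sketch: window() as a group-by expression, counting rows per 5-minute bucket.
import datetime
from snowflake.snowpark.functions import window

df = session.create_dataframe(
    [
        (datetime.datetime(2024, 10, 31, 9, 1), 10),
        (datetime.datetime(2024, 10, 31, 9, 4), 20),
        (datetime.datetime(2024, 10, 31, 9, 7), 30),
    ],
    schema=["time", "value"],
)
df.group_by(window(df.time, "5 minutes")).count().show()
```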
Comment: move it up to 1.25.0

Comment: should we call it partial support? Let's document its capabilities in the function description and not word it as partial support.