SNOW-859943: Add basic support for functions.window #2545

Open · wants to merge 6 commits into main

Conversation

@sfc-gh-jrose (Contributor) commented Oct 31, 2024

  1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.

    Fixes SNOW-859943

  2. Fill out the following pre-review checklist:

    • I am adding a new automated test(s) to verify correctness of my new code
      • If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
    • I am adding new logging messages
    • I am adding a new telemetry message
    • I am adding new credentials
    • I am adding a new dependency
    • If this is a new feature/behavior, I'm adding the Local Testing parity changes.
    • I acknowledge that I have ensured my changes to be thread-safe. Follow the link for more information: Thread-safe Developer Guidelines
  3. Please describe how your code solves the related issue.

    This PR adds partial support for functions.window. This involved a few notable changes:

  • Moved the time-part alias utilities from the local testing mock utils to the internal analyzer utils. The PySpark equivalent uses interval strings to support this feature, so we need some parsing logic for compatibility.
  • This function does not have a SQL equivalent, so it is instead composed of DataFrame operations that implement the equivalent logic.
    • The resulting column would have an overly verbose name, so I've instead aliased it to the default value of window. These columns are often used as aggregate keys, which cannot be aliased. To support this use case, I've modified the aggregation logic to remove the alias if present. This has the side effect of also allowing a statement like df.group_by(upper(col("cat")).alias("cat")).agg(...); the resulting aggregate key column is named cat instead of UPPER("CAT").
    • Users may want to immediately re-alias the column, so I've modified Column so that aliasing an already aliased column replaces the existing alias instead of nesting two aliases. This allows a statement like col("cat").alias("a1").alias("a2"), which results in a column named a2 (see the sketch after this list).
  • This addition does not yet have full parity with the PySpark window function. The slideDuration parameter changes this function from a simple mapping into something that would require a UDTF, which would have a significantly different performance profile, so I've left it unimplemented until we have better SQL support.
  • Modified the local testing groupby to allow an arbitrary function expression. As far as I can tell, in live mode function expressions are allowed as groupby keys as long as they aren't aggregate or generator expressions.
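For reference, a minimal usage sketch of the behaviors described above, assuming a DataFrame df with columns ts and cat and the tumbling-window signature this PR adds:

from snowflake.snowpark.functions import col, count, lit, upper, window

# Tumbling 5-minute windows; the result column gets the default
# alias "window" as described above.
df.group_by(window(col("ts"), "5 minutes")).agg(count(lit(1))).show()

# Aliased columns are accepted as group-by keys; the alias is
# stripped for aggregation, so the key column is named CAT.
df.group_by(upper(col("cat")).alias("cat")).agg(count(lit(1))).show()

# Re-aliasing replaces the previous alias instead of nesting:
# this selects a column named A2.
df.select(col("cat").alias("a1").alias("a2")).show()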

@sfc-gh-jrose sfc-gh-jrose requested review from a team as code owners October 31, 2024 17:41
@github-actions github-actions bot added the local testing Local Testing issues/PRs label Oct 31, 2024
@sfc-gh-jrose sfc-gh-jrose requested a review from a team October 31, 2024 17:42
@@ -27,6 +27,7 @@
- Added support for `Index.to_numpy`.
- Added support for `DataFrame.align` and `Series.align` for `axis=0`.
- Added support for `size` in `GroupBy.aggregate`, `DataFrame.aggregate`, and `Series.aggregate`.
- Added partial support for `snowflake.snowpark.functions.window`
Contributor:

move it up to 1.25.0

Contributor:

Should we call it partial support? Let's document its capabilities in the function description rather than wording it as partial support.

Comment on lines +231 to +263
"year": {"year", "y", "yy", "yyy", "yyyy", "yr", "years", "yrs"},
"quarter": {"quarter", "q", "qtr", "qtrs", "quarters"},
"month": {"month", "mm", "mon", "mons", "months"},
"week": {"week", "w", "wk", "weekofyear", "woy", "wy"},
"day": {"day", "d", "dd", "days", "dayofmonth"},
"hour": {"hour", "h", "hh", "hr", "hours", "hrs"},
"minute": {"minute", "m", "mi", "min", "minutes", "mins"},
"second": {"second", "s", "sec", "seconds", "secs"},
"millisecond": {"millisecond", "ms", "msec", "milliseconds"},
"microsecond": {"microsecond", "us", "usec", "microseconds"},
"nanosecond": {
"nanosecond",
"ns",
"nsec",
"nanosec",
"nsecond",
"nanoseconds",
"nanosecs",
"nseconds",
},
"dayofweek": {"dayofweek", "weekday", "dow", "dw"},
"dayofweekiso": {"dayofweekiso", "weekday_iso", "dow_iso", "dw_iso"},
"dayofyear": {"dayofyear", "yearday", "doy", "dy"},
"weekiso": {"weekiso", "week_iso", "weekofyeariso", "weekofyear_iso"},
"yearofweek": {"yearofweek"},
"yearofweekiso": {"yearofweekiso"},
"epoch_second": {"epoch_second", "epoch", "epoch_seconds"},
"epoch_millisecond": {"epoch_millisecond", "epoch_milliseconds"},
"epoch_microsecond": {"epoch_microsecond", "epoch_microseconds"},
"epoch_nanosecond": {"epoch_nanosecond", "epoch_nanoseconds"},
"timezone_hour": {"timezone_hour", "tzh"},
"timezone_minute": {"timezone_minute", "tzm"},
}
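
For context, a minimal sketch of how an alias table like this can normalize a PySpark-style interval string; the parse_duration helper and the TIME_PART_ALIASES name are illustrative, not the PR's actual parsing code:

import re

def parse_duration(duration: str) -> tuple[int, str]:
    # Split e.g. "5 minutes" into an amount and a canonical time part.
    match = re.fullmatch(r"\s*(\d+)\s+([A-Za-z_]+)\s*", duration)
    if match is None:
        raise ValueError(f"invalid interval string: {duration!r}")
    amount, unit = int(match.group(1)), match.group(2).lower()
    for part, aliases in TIME_PART_ALIASES.items():
        if unit in aliases:
            return amount, part
    raise ValueError(f"unknown time part: {unit!r}")

parse_duration("5 minutes")  # -> (5, "minute")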
Contributor:

did you do a lot of trial and error for this? Are these aliases documented somewhere?

src/snowflake/snowpark/functions.py (outdated; resolved)
Comment on lines +4599 to +4602
timeColumn: ColumnOrName,
windowDuration: str,
slideDuration: Optional[str] = None,
startTime: Optional[str] = None,
Contributor:

camelCase is generally not Pythonic. Can we convert this to snake_case?

Contributor Author:

I agree, but to maintain compatibility with the PySpark implementation I think camelCase might be required. I could modify it so that either version is accepted.
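
For illustration, one way to accept both spellings without breaking PySpark compatibility; a hypothetical sketch, not what this PR implements:

def window(time_column=None, window_duration=None, *,
           timeColumn=None, windowDuration=None):
    # Prefer the snake_case spelling; fall back to the PySpark-style
    # camelCase so code ported from PySpark keeps working.
    time_column = time_column if time_column is not None else timeColumn
    window_duration = (window_duration if window_duration is not None
                       else windowDuration)
    ...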

Contributor:

IMHO, API style consistency within the Snowpark library outweighs consistency with PySpark.

"snowflake.snowpark.functions.window does not support slideDuration parameter yet."
)

epoch = lit("1970-01-01 00:00:00").cast(TimestampType())
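
For context, a sketch of the tumbling-window arithmetic this epoch anchors: bucket each timestamp into whole multiples of the window length since the epoch. The shape is assumed from the PR description, not copied from the diff:

from snowflake.snowpark.functions import col, dateadd, datediff, floor, lit
from snowflake.snowpark.types import TimestampType

epoch = lit("1970-01-01 00:00:00").cast(TimestampType())
window_secs = 300  # e.g. a parsed "5 minutes" duration
# Whole windows elapsed since the epoch for each row's timestamp.
buckets = floor(datediff("second", epoch, col("ts")) / window_secs)
window_start = dateadd("second", buckets * window_secs, epoch)
window_end = dateadd("second", lit(window_secs), window_start)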
Contributor:

do we need to care about timezone here?

# SNOW-1063685: slideDuration changes this function from a 1:1 mapping to a 1:N mapping. That
# currently would require a udtf which may have significantly different performance.
raise NotImplementedError(
"snowflake.snowpark.functions.window does not support slideDuration parameter yet."
Contributor:

Since @sfc-gh-qding volunteered as a SQL expert for our team, we can discuss whether doing this is possible without a UDTF.

Co-authored-by: Afroz Alam <afroz.alam@snowflake.com>
Comment on lines +879 to +882
# Only function expressions that are a mapping of existing columns can be aggregated on.
# Any increase or reduction in number of rows is an invalid function expression.
if len(materialized_column) == len(child_rf):
child_rf[column_name] = materialized_column
Contributor:

Is this related to the window function support in local testing as a groupby expression?
Also, I don't understand why we want materialized_column here; what case is it addressing? Is there an example?

Contributor Author:

This is related to having any function expression in the groupby. For example:
df2 = df.group_by(upper(df.a)).agg(max_(df.b))
This is perfectly valid in live mode, but does not work in local testing at the moment.
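
For reference, a runnable version of that example; the session setup and data are hypothetical:

from snowflake.snowpark import Session
from snowflake.snowpark.functions import max as max_, upper

session = Session.builder.create()  # assumes configured credentials
df = session.create_dataframe(
    [("a", 1), ("A", 2), ("b", 3)], schema=["a", "b"]
)
# Group by a plain function expression over an existing column;
# valid in live mode, and enabled for local testing by this change.
df2 = df.group_by(upper(df.a)).agg(max_(df.b))
df2.show()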

Contributor:

Do we have tests for the new window function?

I saw that PySpark has an example of using groupby + the window function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html

Contributor Author:

I'll add a doctest example that shows it being used as a groupby expression.
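
For illustration, such an example might look like the following; a sketch mirroring the PySpark doc, assuming the signature in this PR and an existing session:

from snowflake.snowpark.functions import col, sum as sum_, window
from snowflake.snowpark.types import TimestampType

# Hypothetical (timestamp, value) rows.
df = session.create_dataframe(
    [("2024-01-01 00:01:00", 1), ("2024-01-01 00:06:00", 2)],
    schema=["ts", "v"],
).select(col("ts").cast(TimestampType()).alias("ts"), col("v"))

# Aggregate per 5-minute tumbling window, used as a group-by key.
df.group_by(window(col("ts"), "5 minutes")).agg(sum_(col("v"))).show()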


Labels: local testing (Local Testing issues/PRs)
3 participants