feat: Add minimal Pyspark support #908
base: main
Conversation
This PR diff is getting big because of all the …
For dask we started with its own test file, so that we didn't have to modify every other file. Would that be a good strategy again?
This is finally ready for review 🥵 I still have to fix the tests on Windows (I think I may need to set up Java or something similar) and the test with the old version of pandas (which is not compatible with PySpark). I tried my best to keep this as small as possible while implementing the main functionality. Let me know what you think!
```python
if self._backend_version < (3, 4) or parse_version(np.__version__) > (2, 0):
    from pyspark.sql.functions import stddev

    _ = ddof
    return stddev(_input)
from pyspark.pandas.spark.functions import stddev

return stddev(_input, ddof=ddof)
```
Not sure this is ideal. Unfortunately `stddev` in pyspark SQL does not support `ddof`. From 3.4 they added a function in the pandas namespace that supports it (but that one is not available with numpy 2.0.0). Depending on which version one has, `std` may return a different result :( Any ideas?
(Correct numbers will depend on whether Spark's default ddof changes, but for now it seems to be ddof=1.)
The formula for adjusting should be fairly easy, something along the lines of:

```python
import pyspark.pandas.spark.functions as F

N = F.length(_input)
return F.stddev(_input) * F.sqrt((N - 1) / (N - ddof))
```
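A runnable variant of that correction, sketched with only `pyspark.sql.functions` (available on all versions discussed above); it uses `F.count` rather than `F.length`, since the correction needs the number of non-null values, and `std_with_ddof` is an illustrative name, not an API in this PR:

```python
from pyspark.sql import Column
from pyspark.sql import functions as F  # noqa: N812


def std_with_ddof(_input: Column, ddof: int) -> Column:
    # F.stddev is the sample standard deviation (ddof=1); rescaling by
    # sqrt((N - 1) / (N - ddof)) emulates an arbitrary ddof.
    n = F.count(_input)
    return F.stddev(_input) * F.sqrt((n - F.lit(1)) / (n - F.lit(ddof)))
```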
@MarcoGorelli @FBruzzesi which criteria did we use to decide the minimal supported versions? Popularity? Time of release?
I'd suggest the lowest one that's not too difficult to support 😄 I think it'd be OK to set it quite high here; we can always work on lowering it later if there's demand.
Because of […] I decided to make […]:
- technically compatible: Ubuntu tests on 3.12 seem fine, Windows complains. But Python 3.12 will be officially supported starting from 4.0.0: https://issues.apache.org/jira/browse/SPARK-44120
Hey @EdAbati this is an awesome effort! And the CI failing just due to coverage, with no failing tests, is even more incredible!
I have not worked with pyspark for some time, but I tried to look at it with some critical eyes! I hope it helps.
```python
from narwhals._pyspark.dataframe import PySparkLazyFrame
from narwhals._pyspark.expr import PySparkExpr
from narwhals._pyspark.namespace import PySparkNamespace
from narwhals._pyspark.typing import IntoPySparkExpr
```
Commenting here as a first encounter, and this is very nitpicky/opinionated: I would rather go with `SparkLazyFrame` and so on, since for pyarrow everything is just `Arrow<class-name>` and not `PyArrow<class-name>`.
```python
def collect(self) -> Any:
    import pandas as pd  # ignore-banned-import()

    from narwhals._pandas_like.dataframe import PandasLikeDataFrame

    return PandasLikeDataFrame(
        native_dataframe=self._native_frame.toPandas(),
        implementation=Implementation.PANDAS,
        backend_version=parse_version(pd.__version__),
        dtypes=self._dtypes,
    )
```
A similar discussion was happening when I opened #1042, with Marco's concern being how to collect duckdb.
My opinion is that we should let the user decide which eager backend to collect to (maybe with one as default).
I have not used pyspark in a couple of years, but if pandas is not a dependency, then this collect may also fail.
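One way to honour that, as a minimal sketch (the `backend` parameter, its default, and the error branch are assumptions, not part of this PR):

```python
def collect(self, backend: str = "pandas") -> Any:
    # Hypothetical: let the caller pick the eager backend instead of
    # hard-coding pandas; only the pandas path mirrors the PR's code.
    if backend == "pandas":
        import pandas as pd  # ignore-banned-import()

        from narwhals._pandas_like.dataframe import PandasLikeDataFrame

        return PandasLikeDataFrame(
            native_dataframe=self._native_frame.toPandas(),
            implementation=Implementation.PANDAS,
            backend_version=parse_version(pd.__version__),
            dtypes=self._dtypes,
        )
    msg = f"Unsupported collect backend: {backend}"
    raise NotImplementedError(msg)
```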
```python
if self._backend_version >= (3, 3, 0):
    spark_session = self._native_frame.sparkSession
else:  # pragma: no cover
    from pyspark.sql import SparkSession

    spark_session = SparkSession.builder.getOrCreate()
```
My understanding is that 3.3 is the min we want/can support (?), so this would get cleaner?
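If 3.3 did become the floor, the whole branch above could presumably collapse to a single line, since `DataFrame.sparkSession` is available from pyspark 3.3 onwards:

```python
# No version check or SparkSession.builder fallback needed on >= 3.3.
spark_session = self._native_frame.sparkSession
```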
```python
new_columns_list = [col.alias(col_name) for col_name, col in new_columns.items()]
return self._from_native_frame(self._native_frame.select(*new_columns_list))
```
Nice 👌 How do aggregations/reductions behave?
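For context, pyspark collapses a select made purely of aggregate expressions into a single-row frame, so reductions compose with the alias pattern above; a small illustration (session and data invented for the example):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # noqa: N812

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,), (3,)], ["a"])

# A select containing only aggregate expressions yields one row.
df.select(F.sum("a").alias("a")).show()
# +---+
# |  a|
# +---+
# |  6|
# +---+
```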
```python
def func(df: PySparkLazyFrame) -> list[Column]:
    from pyspark.sql import functions as F  # noqa: N812

    _ = df
    return [F.col(col_name) for col_name in column_names]
```
I am assuming that `_ = df` is just to avoid complaints from linters. You can do:
```diff
-def func(df: PySparkLazyFrame) -> list[Column]:
-    from pyspark.sql import functions as F  # noqa: N812
-
-    _ = df
-    return [F.col(col_name) for col_name in column_names]
+def func(_: PySparkLazyFrame) -> list[Column]:
+    from pyspark.sql import functions as F  # noqa: N812
+
+    return [F.col(col_name) for col_name in column_names]
```
```python
def func(df: PySparkLazyFrame) -> list[Column]:
    cols = [c for _expr in parsed_exprs for c in _expr._call(df)]
    col_name = get_column_name(df, cols[0])
    return [reduce(operator.and_, cols).alias(col_name)]
```
I was not able to find any docs on `Column.<__and__|__or__>`, happy to see they just work.
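A quick sanity check (session and data invented for the example) that pyspark `Column`s support `&`, which `reduce(operator.and_, cols)` relies on via `Column.__and__`:

```python
import operator
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # noqa: N812

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, True), (2, False)], ["a", "b"])

# operator.and_ invokes Column.__and__, i.e. the & operator.
conditions = [F.col("a") > 0, F.col("b")]
df.select(reduce(operator.and_, conditions).alias("both")).show()
```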
```python
    dtypes=self._dtypes,
)

def __add__(self, other: PySparkExpr) -> Self:
```
Would `other` need to be parsed into a column (e.g. if it is an expr)? Same question for all the other dunder methods.
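One possible shape for that parsing, sketched on the assumption that a `PySparkExpr` evaluates to a single `Column` (the `_to_column` helper is made up for illustration):

```python
from typing import Any

from pyspark.sql import Column
from pyspark.sql import functions as F  # noqa: N812


def _to_column(df: PySparkLazyFrame, other: Any) -> Column:
    # Hypothetical helper: evaluate an expression to its Column, and wrap
    # plain Python scalars as literal columns for the binary op.
    if isinstance(other, PySparkExpr):
        return other._call(df)[0]
    return F.lit(other)
```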
What type of PR is this? (check all applicable)
Related issues
Checklist
If you have comments or can explain your changes, please do so below.
As mentioned in the latest call, I've started working on support for PySpark.
The goal of this PR is to have a minimal initial implementation as a starting point. As we did for Dask, we can implement individual methods in follow-up PRs!