72 changes: 39 additions & 33 deletions python/sedona/spark/geopandas/geodataframe.py
@@ -23,7 +23,6 @@
import shapely
import warnings
import numpy as np
-import shapely
import geopandas as gpd
import pandas as pd
import pyspark.pandas as pspd
@@ -304,13 +303,13 @@ def __getitem__(self, key: Any) -> Any:
Name: value, dtype: int64
"""

-# Here we are getting a ps.Series with the same underlying anchor (ps.Dataframe).
-# This is important so we don't unnecessarily try to perform operations on different dataframes
+# Here we are getting a ps.Series with the same underlying anchor (ps.DataFrame).
+# This is important so we don't unnecessarily try to perform operations on different dataframes.
item = pspd.DataFrame.__getitem__(self, key)

if isinstance(item, pspd.DataFrame):
-# don't specify crs=self.crs here because it might not include the geometry column
-# if it does include the geometry column, we don't need to set crs anyways
+# Don't specify crs=self.crs here because it might not include the geometry column.
+# If it does include the geometry column, we don't need to set crs anyways.
return GeoDataFrame(item)
elif isinstance(item, pspd.Series):
ps_series: pspd.Series = item
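The anchor point matters because pyspark.pandas refuses to combine Series that hang off different DataFrames unless an option is enabled. A minimal sketch with plain pyspark.pandas (the frames here are illustrative, not part of this PR):

import pyspark.pandas as ps

pdf = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# Column selections share pdf's anchor, so this is a same-frame operation:
total = pdf["a"] + pdf["b"]

# A Series anchored to a different frame cannot be combined without first
# calling ps.set_option("compute.ops_on_diff_frames", True):
other = ps.DataFrame({"a": [1, 2, 3]})["a"]
# pdf["a"] + other  # raises ValueError unless the option above is set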
@@ -352,7 +351,7 @@ def __init__(

# For each of these super().__init__() calls, we let pyspark decide which inputs are valid or not
# instead of calling e.g assert not dtype ourselves.
-# This way, if Spark adds support later, than we inherit those changes naturally
+# This way, if Spark adds support later, we inherit those changes naturally.
super().__init__(data, index=index, columns=columns, dtype=dtype, copy=copy)

elif isinstance(data, (PandasOnSparkDataFrame, SparkDataFrame)):
@@ -367,12 +366,12 @@ def __init__(

super().__init__(data, index=index, columns=columns, dtype=dtype, copy=copy)
else:
-# below are not distributed dataframe types
+# Below are not distributed dataframe types
if isinstance(data, gpd.GeoDataFrame):
-# We can use GeoDataFrame.active_geometry_name once we drop support for geopandas < 1.0.0
-# Below is the equivalent, since active_geometry_name simply calls _geometry_column_name
+# We can use GeoDataFrame.active_geometry_name once we drop support for geopandas < 1.0.0.
+# Below is the equivalent, since active_geometry_name simply calls _geometry_column_name.
if data._geometry_column_name:
-# Geopandas stores crs as metadata instead of inside of the shapely objects so we must save it and set it manually later
+# GeoPandas stores CRS as metadata instead of inside shapely objects, so we must save it and set it manually later.
if not crs:
crs = data.crs
if not geometry:
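For readers unfamiliar with the metadata point above, a short plain-GeoPandas illustration (a sketch, not code from this PR): the CRS lives on the GeoPandas objects, not on the shapely geometries themselves, which is why it has to be captured before the data is flattened.

import geopandas as gpd
from shapely.geometry import Point

g = gpd.GeoDataFrame({"geometry": [Point(0, 0), Point(1, 1)]}, crs="EPSG:4326")
print(g.crs)               # EPSG:4326 -- stored as frame/series metadata
print(g.geometry.iloc[0])  # POINT (0 0) -- the shapely object carries no CRS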
@@ -386,11 +385,11 @@ def __init__(
copy=copy,
)

-# Spark complains if it's left as a geometry type
+# Spark complains if it's left as a geometry type.
geom_type_cols = pd_df.select_dtypes(include=["geometry"]).columns
pd_df[geom_type_cols] = pd_df[geom_type_cols].astype(object)

-# initialize the parent class pyspark Dataframe with the pandas Dataframe
+# Initialize the parent class pyspark DataFrame with the pandas DataFrame.
super().__init__(
data=pd_df,
index=index,
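The dtype cast above can be reproduced standalone in plain pandas/GeoPandas (a sketch; the frame here is illustrative):

import geopandas as gpd
from shapely.geometry import Point

pd_df = gpd.GeoDataFrame({"geometry": [Point(0, 0)], "value": [1]})
geom_type_cols = pd_df.select_dtypes(include=["geometry"]).columns
pd_df[geom_type_cols] = pd_df[geom_type_cols].astype(object)
print(pd_df.dtypes)  # the geometry column is now plain "object", which Spark accepts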
@@ -420,7 +419,7 @@ def __init__(
if crs is not None:
self.set_geometry(geometry, inplace=True, crs=crs)

-# No need to call set_geometry() here since it's already part of the df, just set the name
+# No need to call set_geometry() here since it's already part of the df, just set the name.
self._geometry_column_name = "geometry"

if geometry is None and crs:
@@ -468,8 +467,8 @@ def _get_geometry(self) -> sgpd.GeoSeries:
def _set_geometry(self, col):
# This check is included in the original geopandas. Note that this prevents assigning a str to the property
# e.g. df.geometry = "geometry"
-# However the user can still use specify a str in the public .set_geometry() method
-# ie. df.geometry = "geometry1" errors, but df.set_geometry("geometry1") works
+# However, the user can still specify a str in the public .set_geometry() method
+# i.e. df.geometry = "geometry1" errors, but df.set_geometry("geometry1") works
if not pd.api.types.is_list_like(col):
raise ValueError("Must use a list-like to set the geometry property")
self.set_geometry(col, inplace=True)
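The str-rejection above comes straight from pandas' is_list_like check; a two-line sketch of why the property setter and the public method behave differently:

import pandas as pd

print(pd.api.types.is_list_like("geometry1"))   # False -> df.geometry = "geometry1" raises
print(pd.api.types.is_list_like(["g1", "g2"]))  # True  -> list-likes pass the check

The public set_geometry() accepts a column name precisely because it does not apply this check to strings.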
@@ -624,10 +623,10 @@ def set_geometry(
"supported behaviour will match drop=False. To silence this "
"warning and adopt the future behaviour, stop providing "
"`drop` as a keyword to `set_geometry`. To replicate the "
"`drop=True` behaviour you should update "
"your code to\n`geo_col_name = gdf.active_geometry_name;"
" gdf.set_geometry(new_geo_col).drop("
"columns=geo_col_name).rename_geometry(geo_col_name)`."
"`drop=True` behaviour you should update your code to:\n"
"`geo_col_name = gdf.active_geometry_name; "
"gdf.set_geometry(new_geo_col).drop(columns=geo_col_name)"
".rename_geometry(geo_col_name)`."
)

if drop is False: # specifically False, not falsy i.e. None
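Spelled out, the migration recipe quoted in the warning above looks like this in plain GeoPandas (a sketch; requires GeoPandas >= 1.0 for active_geometry_name, per the version note earlier in this file):

import geopandas as gpd
from shapely.geometry import Point

gdf = gpd.GeoDataFrame({"geometry": [Point(0, 0)], "v": [1]})
new_geo_col = gpd.GeoSeries([Point(1, 1)], name="new_geom")

geo_col_name = gdf.active_geometry_name   # "geometry"
gdf = (
    gdf.set_geometry(new_geo_col)         # active geometry becomes "new_geom"
    .drop(columns=geo_col_name)           # drop the old geometry column
    .rename_geometry(geo_col_name)        # restore the original name
)
print(gdf.active_geometry_name)           # "geometry"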
@@ -645,7 +644,7 @@
)
)
else:
-# if not dropping, set the active geometry name to the given col name
+# If not dropping, set the active geometry name to the given col name.
geo_column_name = col

# This operation throws a warning to the user asking them to set pspd.set_option('compute.ops_on_diff_frames', True)
@@ -656,7 +655,7 @@

frame._geometry_column_name = geo_column_name
if new_series:
-# Note: This casts GeoSeries back into pspd.Series, so we lose any metadata that's not serialized
+# Note: This casts GeoSeries back into pspd.Series, so we lose any metadata that's not serialized.
frame[geo_column_name] = level

if not inplace:
@@ -810,15 +809,23 @@ def copy(self, deep=False) -> GeoDataFrame:
geometry value1 value2
0 POINT (1 1) 2 3
"""
-# Note: The deep parameter is a dummy parameter just as it is in PySpark pandas
+# Note: The deep parameter is a dummy parameter just as it is in PySpark pandas.
return GeoDataFrame(
pspd.DataFrame(self._internal.copy()), geometry=self.active_geometry_name
)

def _safe_get_crs(self):
"""
-Helper method for getting the crs of the GeoDataframe safely.
+Helper method for getting the CRS of the GeoDataFrame safely.

+Returns None if no geometry column is set instead of raising an error.
+This is useful for operations that need to check the CRS without
+requiring a geometry column to be present.
+
+Returns
+-------
+Any or None
+    The CRS of the active geometry column, or None if no geometry column is set.
"""
try:
return self.geometry.crs
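The pattern above in self-contained form (a sketch: the exact exception type caught by the real method is truncated out of this diff, so AttributeError here is an assumption):

def safe_get_crs(frame):
    try:
        return frame.geometry.crs  # fails when no active geometry column is set
    except AttributeError:         # assumption: whatever error .geometry raises
        return None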
@@ -831,7 +838,7 @@ def crs(self):

@crs.setter
def crs(self, value):
-# Since pyspark dataframes are immutable, we can't modify in place, so we create the new geoseries and replace it
+# Since PySpark DataFrames are immutable, we can't modify in place, so we create the new GeoSeries and replace it.
self.geometry = self.geometry.set_crs(value)

def set_crs(self, crs, inplace=False, allow_override=True):
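Per the immutability comment above, assigning to .crs is just sugar for rebuilding the geometry column. A runnable plain-GeoPandas sketch of what the setter body does:

import geopandas as gpd
from shapely.geometry import Point

gdf = gpd.GeoDataFrame({"geometry": [Point(0, 0)]})
gdf.geometry = gdf.geometry.set_crs("EPSG:4326")  # build a new GeoSeries, then replace
print(gdf.crs)                                    # EPSG:4326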
@@ -910,7 +917,7 @@ def set_crs(self, crs, inplace=False, allow_override=True):
--------
GeoDataFrame.to_crs : re-project to another CRS
"""
-# Since pyspark dataframes are immutable, we can't modify in place, so we create the new geoseries and replace it
+# Since PySpark DataFrames are immutable, we can't modify in place, so we create the new GeoSeries and replace it.
new_geometry = self.geometry.set_crs(crs, allow_override=allow_override)
if inplace:
self.geometry = new_geometry
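Usage-wise, the inplace switch above mirrors its GeoPandas counterpart (plain-GeoPandas sketch for concreteness; note Sedona's signature shown in this diff defaults allow_override=True):

import geopandas as gpd
from shapely.geometry import Point

gdf = gpd.GeoDataFrame({"geometry": [Point(0, 0)]})
out = gdf.set_crs("EPSG:4326")   # inplace=False: returns a frame with the CRS set
print(out.crs)                   # EPSG:4326
gdf.set_crs("EPSG:3857", inplace=True, allow_override=True)
print(gdf.crs)                   # EPSG:3857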
@@ -1169,12 +1176,11 @@ def to_json(
--------
GeoDataFrame.to_file : write GeoDataFrame to file
"""
-# Because this function returns the geojson string in memory,
-# we simply rely on geopandas's implementation.
-# Additionally, spark doesn't seem to have a straight forward way to get the string
-# without writing to a file first by using sdf.write.format("geojson").save(path, **kwargs)
-# return self.to_geopandas().to_json(na, show_bbox, drop_id, to_wgs84, **kwargs)
-# ST_AsGeoJSON() works only for one column
+# Because this function returns the GeoJSON string in memory,
+# we simply rely on GeoPandas's implementation.
+# Additionally, Spark doesn't seem to have a straightforward way to get the string
+# without writing to a file first by using sdf.write.format("geojson").save(path, **kwargs).
+# ST_AsGeoJSON() works only for one column.
result = self.to_geopandas()
return result.to_json(na, show_bbox, drop_id, to_wgs84, **kwargs)
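Since the implementation above collects to a GeoPandas frame first, the output is exactly GeoPandas' GeoJSON string; a small illustration (keeping in mind this brings the whole distributed frame to the driver, so it only suits data that fits in memory):

import geopandas as gpd
from shapely.geometry import Point

g = gpd.GeoDataFrame({"geometry": [Point(1, 1)], "value1": [2]})
print(g.to_json())
# {"type": "FeatureCollection", "features": [{"id": "0", "type": "Feature", ...}]}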

@@ -1276,8 +1282,8 @@ def to_arrow(
geometry: [[0101000000000000000000F03F0000000000000040,\
01010000000000000000000040000000000000F03F]]
"""
-# Because this function returns the arrow table in memory, we simply rely on geopandas's implementation.
-# This also returns a geopandas specific data type, which can be converted to an actual pyarrow table,
+# Because this function returns the Arrow table in memory, we simply rely on GeoPandas's implementation.
+# This also returns a GeoPandas-specific data type, which can be converted to an actual PyArrow table,
# so there is no direct Sedona equivalent. This way we also get all of the arguments implemented for free.
return self.to_geopandas().to_arrow(
index=index,
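A sketch of what the delegation above yields (assumes GeoPandas >= 1.0): to_arrow() returns an Arrow-protocol wrapper rather than a pyarrow.Table, so one extra conversion step gives a real table:

import geopandas as gpd
import pyarrow as pa
from shapely.geometry import Point

g = gpd.GeoDataFrame({"geometry": [Point(1, 2)]})
arrow_obj = g.to_arrow()     # GeoPandas wrapper implementing the Arrow PyCapsule protocol
table = pa.table(arrow_obj)  # convert to an actual pyarrow.Table
print(table.schema)          # includes a WKB-encoded geometry column by default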