Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements Index.putmask #1560

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
101 changes: 100 additions & 1 deletion databricks/koalas/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,18 @@
is_object_dtype,
)
from pandas.io.formats.printing import pprint_thing
from pyspark.sql.functions import pandas_udf

import pyspark
from pyspark import sql as spark
from pyspark.sql import functions as F, Window
from pyspark.sql.types import BooleanType, NumericType, StringType, TimestampType, IntegralType
from pyspark.sql.types import (
BooleanType,
NumericType,
StringType,
TimestampType,
IntegralType,
)

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.config import get_option, option_context
Expand All @@ -49,6 +56,7 @@
from databricks.koalas.frame import DataFrame
from databricks.koalas.missing.indexes import MissingPandasLikeIndex, MissingPandasLikeMultiIndex
from databricks.koalas.series import Series, first_series
from databricks.koalas.typedef import as_spark_type
from databricks.koalas.utils import (
compare_allow_null,
compare_disallow_null,
Expand Down Expand Up @@ -1546,6 +1554,97 @@ def argmin(self):

return sdf.orderBy(self.spark.column.asc(), F.col(sequence_col).asc()).first()[0]

def putmask(self, mask, value):
    """
    Return a new Index of the values set with the mask.

    .. note:: this API can be pretty expensive since it is based on
        a global sequence internally.

    Parameters
    ----------
    mask : list, tuple, Index or Series of booleans
        Positions where ``value`` should replace the original values.
        Non-scalar masks must have the same size as the Index.
    value : scalar, list, tuple, Index or Series
        Replacement value(s). Non-scalar values must have the same
        size as the Index.

    Returns
    -------
    Index

    Raises
    ------
    ValueError
        If a non-scalar ``mask`` or ``value`` has a different size
        than the Index.
    TypeError
        If ``mask`` is not a list, tuple, Index or Series.

    Example
    -------
    >>> kidx = ks.Index([1, 2, 3, 4, 5])
    >>> kidx
    Int64Index([1, 2, 3, 4, 5], dtype='int64')

    >>> kidx.putmask(kidx > 3, 100).sort_values()
    Int64Index([1, 2, 3, 100, 100], dtype='int64')

    >>> kidx.putmask(kidx > 3, ks.Index([100, 200, 300, 400, 500])).sort_values()
    Int64Index([1, 2, 3, 400, 500], dtype='int64')
    """
    scol_name = self._internal.index_spark_column_names[0]
    sdf = self._internal.spark_frame.select(self.spark.column)

    # Attach a globally-ordered sequence column so that positional
    # lookups into the collected mask/value line up with the original
    # order of the Index.
    dist_sequence_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
    sdf = InternalFrame.attach_distributed_sequence_column(
        sdf, column_name=dist_sequence_col_name
    )

    replace_col = verify_temp_column_name(sdf, "__replace_column__")
    masking_col = verify_temp_column_name(sdf, "__masking_column__")

    if isinstance(value, (list, tuple, Index, Series)):
        # NOTE(review): this collects `value` onto the driver
        # (pd.Series(...) / to_pandas()); it is bounded only by the
        # same-size check below, so it can be expensive for large data.
        if isinstance(value, (list, tuple)):
            pandas_value = pd.Series(value)
        else:
            pandas_value = value.to_pandas()

        if self.size != pandas_value.size:
            # TODO: We can't support different size of value for now.
            raise ValueError("value and data must be the same size")

        replace_return_type = as_spark_type(pandas_value.dtype.type)

        # Fall back to StringType when the pandas dtype has no Spark
        # counterpart (as_spark_type returns a falsy value).
        @pandas_udf(returnType=replace_return_type if replace_return_type else StringType())
        def replace_pandas_udf(sequence):
            # Map each row's global position to its replacement value.
            return pandas_value[sequence]

        sdf = sdf.withColumn(replace_col, replace_pandas_udf(dist_sequence_col_name))
    else:
        # Scalar replacement: the same value applies everywhere.
        sdf = sdf.withColumn(replace_col, F.lit(value))

    if isinstance(mask, (list, tuple, Index, Series)):
        if isinstance(mask, (list, tuple)):
            pandas_mask = pd.Series(mask)
        else:
            pandas_mask = mask.to_pandas()

        if self.size != pandas_mask.size:
            raise ValueError("mask and data must be the same size")

        @pandas_udf(returnType=BooleanType())
        def masking_pandas_udf(sequence):
            # Map each row's global position to its mask flag.
            return pandas_mask[sequence]

        sdf = sdf.withColumn(masking_col, masking_pandas_udf(dist_sequence_col_name))
    else:
        # list/tuple/Index/Series were all consumed by the branch above,
        # so anything reaching here is an unsupported mask type.
        raise TypeError("Mask data doesn't support type " "{0}".format(type(mask).__name__))

    # spark_frame here looks like below
    # +-------------------------------+-----------------+------------------+------------------+
    # |__distributed_sequence_column__|__index_level_0__|__replace_column__|__masking_column__|
    # +-------------------------------+-----------------+------------------+------------------+
    # |                              0|                a|               100|              true|
    # |                              3|                d|               400|             false|
    # |                              1|                b|               200|              true|
    # |                              2|                c|               300|             false|
    # |                              4|                e|               500|             false|
    # +-------------------------------+-----------------+------------------+------------------+

    # Where the mask is true take the replacement, otherwise keep the
    # original index value.
    cond = F.when(scol_for(sdf, masking_col), scol_for(sdf, replace_col)).otherwise(
        scol_for(sdf, scol_name)
    )
    sdf = sdf.select(cond.alias(scol_name))

    internal = InternalFrame(spark_frame=sdf, index_map=self._internal.index_map)

    return ks.DataFrame(internal).index

def set_names(self, names, level=None, inplace=False):
"""
Set Index or MultiIndex name.
Expand Down
2 changes: 0 additions & 2 deletions databricks/koalas/missing/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class MissingPandasLikeIndex(object):
is_type_compatible = _unsupported_function("is_type_compatible")
join = _unsupported_function("join")
map = _unsupported_function("map")
putmask = _unsupported_function("putmask")
ravel = _unsupported_function("ravel")
reindex = _unsupported_function("reindex")
searchsorted = _unsupported_function("searchsorted")
Expand Down Expand Up @@ -129,7 +128,6 @@ class MissingPandasLikeMultiIndex(object):
is_type_compatible = _unsupported_function("is_type_compatible")
join = _unsupported_function("join")
map = _unsupported_function("map")
putmask = _unsupported_function("putmask")
ravel = _unsupported_function("ravel")
reindex = _unsupported_function("reindex")
remove_unused_levels = _unsupported_function("remove_unused_levels")
Expand Down
30 changes: 30 additions & 0 deletions databricks/koalas/tests/test_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,36 @@ def test_dropna(self):
self.assert_eq(kidx.dropna(), pidx.dropna())
self.assert_eq((kidx + 1).dropna(), (pidx + 1).dropna())

def test_putmask(self):
    """Check Index.putmask against pandas for scalar, list, tuple,
    Index and Series replacement values, plus size-mismatch errors."""
    pidx = pd.Index(["a", "b", "c", "d", "e"])
    kidx = ks.from_pandas(pidx)

    # Scalar replacement value.
    self.assert_eq(
        kidx.putmask(kidx < "c", "k").sort_values(), pidx.putmask(pidx < "c", "k").sort_values()
    )
    # List replacement value.
    self.assert_eq(
        kidx.putmask(kidx < "c", ["g", "h", "i", "j", "k"]).sort_values(),
        pidx.putmask(pidx < "c", ["g", "h", "i", "j", "k"]).sort_values(),
    )
    # Tuple replacement value.
    self.assert_eq(
        kidx.putmask(kidx < "c", ("g", "h", "i", "j", "k")).sort_values(),
        pidx.putmask(pidx < "c", ("g", "h", "i", "j", "k")).sort_values(),
    )
    # Index replacement value.
    self.assert_eq(
        kidx.putmask(kidx < "c", ks.Index(["g", "h", "i", "j", "k"])).sort_values(),
        pidx.putmask(pidx < "c", pd.Index(["g", "h", "i", "j", "k"])).sort_values(),
    )
    # Series replacement value.
    self.assert_eq(
        kidx.putmask(kidx < "c", ks.Series(["g", "h", "i", "j", "k"])).sort_values(),
        pidx.putmask(pidx < "c", pd.Series(["g", "h", "i", "j", "k"])).sort_values(),
    )

    # Non-scalar value of a different size is rejected (pandas allows
    # some of these; Koalas does not support them for now).
    with self.assertRaisesRegexp(ValueError, "value and data must be the same size"):
        kidx.putmask(kidx < "c", ks.Series(["g", "h"]))

    # A mask of a different size is rejected, matching pandas.
    with self.assertRaisesRegexp(ValueError, "mask and data must be the same size"):
        kidx.putmask([True, False], ks.Series(["g", "h", "i", "j", "k"]))

def test_index_symmetric_difference(self):
pidx1 = pd.Index([1, 2, 3, 4])
pidx2 = pd.Index([2, 3, 4, 5])
Expand Down