pandas_on_spark.
transform_batch
Transform chunks with a function that takes pandas DataFrame and outputs pandas DataFrame. The pandas DataFrame given to the function is of a batch used internally. The length of each input and output should be the same.
See also Transform and apply a function.
Note
the func is unable to access the whole input frame. pandas-on-Spark internally splits the input series into multiple batches and calls func with each batch multiple times. Therefore, operations such as global aggregations are impossible. See the example below.
>>> # This case does not return the length of whole frame but of the batch internally ... # used. ... def length(pdf) -> ps.DataFrame[int]: ... return pd.DataFrame([len(pdf)] * len(pdf)) ... >>> df = ps.DataFrame({'A': range(1000)}) >>> df.pandas_on_spark.transform_batch(length) c0 0 83 1 83 2 83 ...
this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting.
To avoid this, specify return type in func, for instance, as below:
func
>>> def plus_one(x) -> ps.DataFrame[int, [float, float]]: ... return x + 1
If the return type is specified, the output column names become c0, c1, c2 … cn. These names are positionally mapped to the returned DataFrame in func.
To specify the column names, you can assign them in a NumPy compound type style as below:
>>> def plus_one(x) -> ps.DataFrame[("index", int), [("a", float), ("b", float)]]: ... return x + 1
>>> pdf = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]}) >>> def plus_one(x) -> ps.DataFrame[ ... (pdf.index.name, pdf.index.dtype), zip(pdf.dtypes, pdf.columns)]: ... return x + 1
Function to transform each pandas frame.
Positional arguments to pass to func.
Keyword arguments to pass to func.
See also
DataFrame.pandas_on_spark.apply_batch
For row/columnwise operations.
Series.pandas_on_spark.transform_batch
transform the search as each pandas chunks.
Examples
>>> df = ps.DataFrame([(1, 2), (3, 4), (5, 6)], columns=['A', 'B']) >>> df A B 0 1 2 1 3 4 2 5 6
>>> def plus_one_func(pdf) -> ps.DataFrame[int, [int, int]]: ... return pdf + 1 >>> df.pandas_on_spark.transform_batch(plus_one_func) c0 c1 0 2 3 1 4 5 2 6 7
>>> def plus_one_func(pdf) -> ps.DataFrame[("index", int), [('A', int), ('B', int)]]: ... return pdf + 1 >>> df.pandas_on_spark.transform_batch(plus_one_func) A B index 0 2 3 1 4 5 2 6 7
>>> def plus_one_func(pdf) -> ps.Series[int]: ... return pdf.B + 1 >>> df.pandas_on_spark.transform_batch(plus_one_func) 0 3 1 5 2 7 dtype: int64
You can also omit the type hints so pandas-on-Spark infers the return schema as below:
>>> df.pandas_on_spark.transform_batch(lambda pdf: pdf + 1) A B 0 2 3 1 4 5 2 6 7
>>> (df * -1).pandas_on_spark.transform_batch(abs) A B 0 1 2 1 3 4 2 5 6
Note that you should not transform the index. The index information will not change.
>>> df.pandas_on_spark.transform_batch(lambda pdf: pdf.B + 1) 0 3 1 5 2 7 Name: B, dtype: int64
You can also specify extra arguments as below.
>>> df.pandas_on_spark.transform_batch(lambda pdf, a, b, c: pdf.B + a + b + c, 1, 2, c=3) 0 8 1 10 2 12 Name: B, dtype: int64