pyspark.sql.protobuf.functions.to_protobuf¶
-
pyspark.sql.protobuf.functions.
to_protobuf
(data: ColumnOrName, messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None, binaryDescriptorSet: Optional[bytes] = None) → pyspark.sql.column.Column[source]¶ Converts a column into binary of protobuf format. The Protobuf definition is provided in one of these ways:
- Protobuf descriptor file: E.g. a descriptor file created with
protoc –include_imports –descriptor_set_out=abc.desc abc.proto
Protobuf descriptor as binary: Rather than file path as in previous option, we can provide the binary content of the file. This allows flexibility in how the descriptor set is created and fetched.
Jar containing Protobuf Java class: The jar containing Java class should be shaded. Specifically, com.google.protobuf.* should be shaded to org.sparkproject.spark_protobuf.protobuf.*. https://github.com/rangadi/shaded-protobuf-classes is useful to create shaded jar from Protobuf files. The jar file can be added with spark-submit option –jars.
New in version 3.4.0.
Changed in version 3.5.0: Supports binaryDescriptorSet arg to pass binary descriptor directly. Supports Spark Connect.
- Parameters
- data
Column
or str the data column.
- messageName: str, optional
the protobuf message name to look for in descriptor file, or The Protobuf class name when descFilePath parameter is not set. E.g. com.example.protos.ExampleEvent.
- descFilePathstr, optional
the Protobuf descriptor file.
- optionsdict, optional
- binaryDescriptorSet: bytes, optional
The Protobuf FileDescriptorSet serialized as binary.
- data
Notes
Protobuf functionality is provided as a pluggable external module
Examples
>>> import tempfile >>> data = [([(2, "Alice", 13093020)])] >>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>" >>> df = spark.createDataFrame(data, ddl_schema) >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726' ... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61' ... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121' ... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363' ... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707' ... '26F746F33') >>> # Writing a protobuf description into a file, generated by using >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file >>> with tempfile.TemporaryDirectory() as tmp_dir: ... desc_file_path = "%s/pyspark_test.desc" % tmp_dir ... with open(desc_file_path, "wb") as f: ... _ = f.write(bytearray.fromhex(desc_hex)) ... f.flush() ... message_name = 'SimpleMessage' ... proto_df = df.select( # With file name for descriptor ... to_protobuf(df.value, message_name, desc_file_path).alias("suite")) ... proto_df.show(truncate=False) ... proto_df_2 = df.select( # With binary for descriptor ... to_protobuf(df.value, message_name, ... binaryDescriptorSet=bytearray.fromhex(desc_hex)) ... .alias("suite")) ... proto_df_2.show(truncate=False) +-------------------------------------------+ |suite | +-------------------------------------------+ |[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]| +-------------------------------------------+ +-------------------------------------------+ |suite | +-------------------------------------------+ |[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]| +-------------------------------------------+ >>> data = [([(1668035962, 2020)])] >>> ddl_schema = "value struct<seconds: LONG, nanos: INT>" >>> df = spark.createDataFrame(data, ddl_schema) >>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp" >>> proto_df = df.select(to_protobuf(df.value, message_class_name).alias("suite")) >>> proto_df.show(truncate=False) +----------------------------+ |suite | +----------------------------+ |[08 FA EA B0 9B 06 10 E4 0F]| +----------------------------+