public class ParquetRelation2 extends BaseRelation implements CatalystScan, InsertableRelation, SparkHadoopMapReduceUtil, Logging, scala.Product, scala.Serializable
An alternative to ParquetRelation that plugs in using the data sources API. This class is intended as a full replacement of the Parquet support in Spark SQL. The old implementation will be deprecated and eventually removed once this version is proven to be stable enough.
Compared with the old implementation, this class has the following notable differences:
- Partitioning discovery: Hive-style multi-level partitions are auto-discovered (see the usage sketch after this list).
- Metadata discovery: Parquet is a format that supports schema evolution. This data source can detect and merge schemas from all Parquet part-files as long as they are compatible. Also, metadata and FileStatuses are cached for better performance.
- Statistics: Statistics for the size of the table are automatically populated during schema
discovery.
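A minimal usage sketch of the partition discovery behavior, not taken from the Spark documentation: the directory layout, paths, and column names below are hypothetical, and the snippet assumes the data source API path is enabled (the default in this version).

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Hypothetical Hive-style partitioned layout:
//   /data/events/year=2015/month=01/part-00000.parquet
//   /data/events/year=2015/month=02/part-00000.parquet

val sc = new SparkContext("local", "parquet-partition-discovery")
val sqlContext = new SQLContext(sc)

// Pointing at the root directory is enough: the partition columns (`year`,
// `month`) are discovered from the directory names and appended to the
// schema read from the Parquet footers.
val events = sqlContext.parquetFile("/data/events")
events.printSchema()  // original Parquet columns plus year: integer, month: integer
```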
Modifier and Type | Class and Description
---|---
static class | ParquetRelation2.PartitionValues
static class | ParquetRelation2.PartitionValues$
Constructor and Description
---
ParquetRelation2(scala.collection.Seq<String> paths, scala.collection.immutable.Map<String,String> parameters, scala.Option<org.apache.spark.sql.types.StructType> maybeSchema, scala.Option<PartitionSpec> maybePartitionSpec, SQLContext sqlContext)
Modifier and Type | Method and Description
---|---
RDD<org.apache.spark.sql.Row> | buildScan(scala.collection.Seq<org.apache.spark.sql.catalyst.expressions.Attribute> output, scala.collection.Seq<org.apache.spark.sql.catalyst.expressions.Expression> predicates)
static String | DEFAULT_PARTITION_NAME()
boolean | equals(Object other)
static org.apache.spark.sql.catalyst.expressions.Literal | inferPartitionColumnValue(String raw, String defaultPartitionName) - Converts a string to a Literal with automatic type inference.
void | insert(DataFrame data, boolean overwrite)
boolean | isPartitioned()
scala.Option<PartitionSpec> | maybePartitionSpec()
scala.Option<org.apache.spark.sql.types.StructType> | maybeSchema()
static String | MERGE_SCHEMA()
static org.apache.spark.sql.types.StructType | mergeMetastoreParquetSchema(org.apache.spark.sql.types.StructType metastoreSchema, org.apache.spark.sql.types.StructType parquetSchema) - Reconciles the Hive Metastore case insensitivity issue and data type conflicts between the Metastore schema and the Parquet schema.
static String | METASTORE_SCHEMA()
scala.collection.immutable.Map<String,String> | parameters()
static ParquetRelation2.PartitionValues | parsePartition(org.apache.hadoop.fs.Path path, String defaultPartitionName) - Parses a single partition, returns column names and values of each partition column.
static PartitionSpec | parsePartitions(scala.collection.Seq<org.apache.hadoop.fs.Path> paths, String defaultPartitionName) - Given a group of qualified paths, tries to parse them and returns a partition specification.
org.apache.spark.sql.types.StructType | partitionColumns()
scala.collection.Seq<Partition> | partitions()
PartitionSpec | partitionSpec()
scala.collection.Seq<String> | paths()
static scala.Option<org.apache.spark.sql.types.StructType> | readSchema(scala.collection.Seq<parquet.hadoop.Footer> footers, SQLContext sqlContext)
static scala.collection.Seq<ParquetRelation2.PartitionValues> | resolvePartitions(scala.collection.Seq<ParquetRelation2.PartitionValues> values) - Resolves possible type conflicts between partitions by up-casting "lower" types.
org.apache.spark.sql.types.StructType | schema()
long | sizeInBytes() - Returns an estimated size of this relation in bytes.
SparkContext | sparkContext()
SQLContext | sqlContext()
Methods inherited from class Object: getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SparkHadoopMapReduceUtil: firstAvailableClass, newJobContext, newTaskAttemptContext, newTaskAttemptID
Methods inherited from interface Logging: initializeIfNecessary, initializeLogging, isTraceEnabled, log_, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning
public ParquetRelation2(scala.collection.Seq<String> paths, scala.collection.immutable.Map<String,String> parameters, scala.Option<org.apache.spark.sql.types.StructType> maybeSchema, scala.Option<PartitionSpec> maybePartitionSpec, SQLContext sqlContext)
public static String MERGE_SCHEMA()
public static String DEFAULT_PARTITION_NAME()
public static String METASTORE_SCHEMA()
public static scala.Option<org.apache.spark.sql.types.StructType> readSchema(scala.collection.Seq<parquet.hadoop.Footer> footers, SQLContext sqlContext)
public static org.apache.spark.sql.types.StructType mergeMetastoreParquetSchema(org.apache.spark.sql.types.StructType metastoreSchema, org.apache.spark.sql.types.StructType parquetSchema)
Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't distinguish binary and string). This method generates a correct schema by merging Metastore schema data types and Parquet schema field names.
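The sketch below is an illustrative re-implementation of that reconciliation rule, not Spark's actual code: it keeps Parquet's case-preserving field names but takes the authoritative data types from the Metastore schema. The field names and schemas are made up.

```scala
import org.apache.spark.sql.types._

// Illustrative only (not Spark's implementation): keep Parquet's original,
// case-preserving field names, but take data types from the Metastore schema,
// matching fields case-insensitively.
def reconcile(metastoreSchema: StructType, parquetSchema: StructType): StructType = {
  val metastoreTypes = metastoreSchema.fields.map(f => f.name.toLowerCase -> f.dataType).toMap
  StructType(parquetSchema.fields.map { f =>
    f.copy(dataType = metastoreTypes.getOrElse(f.name.toLowerCase, f.dataType))
  })
}

// Hypothetical schemas:
val metastoreSchema = StructType(Seq(
  StructField("eventtime", LongType),   // Hive lower-cases field names
  StructField("payload", StringType)))  // the Metastore knows this is a string
val parquetSchema = StructType(Seq(
  StructField("eventTime", LongType),   // Parquet preserves the original case
  StructField("payload", BinaryType)))  // older writers report strings as binary

// reconcile(metastoreSchema, parquetSchema) yields:
//   StructType(StructField("eventTime", LongType), StructField("payload", StringType))
```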
public static PartitionSpec parsePartitions(scala.collection.Seq<org.apache.hadoop.fs.Path> paths, String defaultPartitionName)
Given a group of qualified paths, tries to parse them and returns a partition specification. For example, given:
hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
it returns:
PartitionSpec(
partitionColumns = StructType(
StructField(name = "a", dataType = IntegerType, nullable = true),
StructField(name = "b", dataType = StringType, nullable = true),
StructField(name = "c", dataType = DoubleType, nullable = true)),
partitions = Seq(
Partition(
values = Row(1, "hello", 3.14),
path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
Partition(
values = Row(2, "world", 6.28),
path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
public static ParquetRelation2.PartitionValues parsePartition(org.apache.hadoop.fs.Path path, String defaultPartitionName)
Parses a single partition, returns column names and values of each partition column. For example, given:
path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
it returns:
PartitionValues(
Seq("a", "b", "c"),
Seq(
Literal(42, IntegerType),
Literal("hello", StringType),
Literal(3.14, FloatType)))
public static scala.collection.Seq<ParquetRelation2.PartitionValues> resolvePartitions(scala.collection.Seq<ParquetRelation2.PartitionValues> values)
Resolves possible type conflicts between partitions by up-casting "lower" types. The up-casting order is:
NullType -> IntegerType -> LongType -> FloatType -> DoubleType -> DecimalType.Unlimited -> StringType
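Grounded in the up-casting chain above, the following is an illustrative sketch of the resolution rule, not Spark's implementation (the real method operates on ParquetRelation2.PartitionValues rather than bare data types):

```scala
import org.apache.spark.sql.types._

// Rank of each type in the documented up-casting chain; a conflicting
// partition column is resolved to the highest-ranked type observed.
val upcastOrder: Seq[DataType] =
  Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType)

def resolveColumnType(observed: Seq[DataType]): DataType =
  observed.maxBy(t => upcastOrder.indexOf(t))

// e.g. a=1 in one partition directory and a=3.14 in another:
resolveColumnType(Seq(IntegerType, FloatType))   // FloatType
```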
public static org.apache.spark.sql.catalyst.expressions.Literal inferPartitionColumnValue(String raw, String defaultPartitionName)
Converts a string to a Literal with automatic type inference. Currently only supports IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, and StringType.
public scala.collection.Seq<String> paths()
public scala.collection.immutable.Map<String,String> parameters()
public scala.Option<org.apache.spark.sql.types.StructType> maybeSchema()
public scala.Option<PartitionSpec> maybePartitionSpec()
public SQLContext sqlContext()
Specified by:
sqlContext in class BaseRelation
public boolean equals(Object other)
Specified by:
equals in interface scala.Equals
Overrides:
equals in class Object
public SparkContext sparkContext()
public PartitionSpec partitionSpec()
public org.apache.spark.sql.types.StructType partitionColumns()
public scala.collection.Seq<Partition> partitions()
public boolean isPartitioned()
public org.apache.spark.sql.types.StructType schema()
Specified by:
schema in class BaseRelation
public long sizeInBytes()
Description copied from class: BaseRelation
Returns an estimated size of this relation in bytes. Note that it is always better to overestimate size than underestimate, because underestimation could lead to execution plans that are suboptimal (i.e. broadcasting a very large table).
Overrides:
sizeInBytes in class BaseRelation
public RDD<org.apache.spark.sql.Row> buildScan(scala.collection.Seq<org.apache.spark.sql.catalyst.expressions.Attribute> output, scala.collection.Seq<org.apache.spark.sql.catalyst.expressions.Expression> predicates)
Specified by:
buildScan in interface CatalystScan
public void insert(DataFrame data, boolean overwrite)
Specified by:
insert in interface InsertableRelation
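A hedged usage sketch (the table name "events" and its columns are hypothetical): inserting a DataFrame into a table backed by this relation is typically done through DataFrame.insertInto, which the query planner turns into a call to this InsertableRelation.insert method.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "insert-example")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Hypothetical table "events", assumed to be registered and backed by this
// Parquet relation.
val newEvents = sc.parallelize(Seq((2015, 3, "click"))).toDF("year", "month", "action")

// The second argument maps to the `overwrite` flag of insert(data, overwrite):
// false appends, true replaces the existing data.
newEvents.insertInto("events", false)
```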