12.3. Hive Connector
Overview
The Hive connector allows querying data stored in a Hive data warehouse. Hive is a combination of three components:
- Data files in varying formats that are typically stored in the Hadoop Distributed File System (HDFS) or in Amazon S3.
- Metadata about how the data files are mapped to schemas and tables. This metadata is stored in a database such as MySQL and is accessed via the Hive metastore service.
- A query language called HiveQL. This query language is executed on a distributed computing framework such as MapReduce or Tez.
Presto only uses the first two components: the data and the metadata. It does not use HiveQL or any part of Hive’s execution environment.
Supported File Types
The following file types are supported for the Hive connector:
- ORC
- Parquet
- Avro
- RCFile
- SequenceFile
- JSON
- Text
Configuration
The Hive connector supports Apache Hadoop 2.x and derivative distributions including Cloudera CDH 5 and Hortonworks Data Platform (HDP).
Create ~/.prestoadmin/catalog/hive.properties with the following contents to mount the hive-hadoop2 connector as the hive catalog, replacing example.net:9083 with the correct host and port for your Hive metastore Thrift service:
connector.name=hive-hadoop2
hive.metastore.uri=thrift://example.net:9083
Use presto-admin to deploy the connector file. See Adding a Catalog.
Multiple Hive Clusters
You can have as many catalogs as you need, so if you have additional Hive clusters, simply add another properties file to ~/.prestoadmin/catalog with a different name (making sure it ends in .properties). For example, if you name the property file sales.properties, Presto will create a catalog named sales using the configured connector.
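As a rough illustration of the naming rule, the following Python sketch writes one catalog file per Hive cluster. The helper name write_catalog, the catalog name sales, and the host name in the usage note are hypothetical; only the two property keys come from the configuration shown earlier.

```python
from pathlib import Path


def write_catalog(catalog_dir, catalog_name, metastore_uri):
    """Write <catalog_name>.properties into catalog_dir and return its path.

    The file name without the .properties suffix becomes the catalog name.
    """
    catalog_dir = Path(catalog_dir)
    catalog_dir.mkdir(parents=True, exist_ok=True)
    path = catalog_dir / f"{catalog_name}.properties"
    lines = [
        "connector.name=hive-hadoop2",
        f"hive.metastore.uri={metastore_uri}",
    ]
    path.write_text("\n".join(lines) + "\n")
    return path
```

Calling write_catalog("catalog", "sales", "thrift://sales.example.net:9083") would produce catalog/sales.properties, which would then still need to be placed in ~/.prestoadmin/catalog and deployed with presto-admin.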
HDFS Configuration
For basic setups, Presto configures the HDFS client automatically and
does not require any configuration files. In some cases, such as when using
federated HDFS or NameNode high availability, it is necessary to specify
additional HDFS client options in order to access your HDFS cluster. To do so,
add the hive.config.resources property to reference your HDFS config files:
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
Only specify additional configuration files if necessary for your setup. We also recommend reducing the configuration files to have the minimum set of required properties, as additional properties may cause problems.
The configuration files must exist on all Presto nodes. If you are referencing existing Hadoop config files, make sure to copy them to any Presto nodes that are not running Hadoop.
HDFS Username
When not using Kerberos with HDFS, Presto will access HDFS using the OS user of the Presto process. For example, if Presto is running as nobody, it will access HDFS as nobody. You can override this username by setting the HADOOP_USER_NAME system property in the Presto JVM Config, replacing hdfs_user with the appropriate username:
-DHADOOP_USER_NAME=hdfs_user
Accessing Hadoop clusters protected with Kerberos authentication
Kerberos authentication is supported for both HDFS and the Hive metastore. However, Kerberos authentication by ticket cache is not yet supported.
The properties that apply to Hive connector security are listed in the Hive Configuration Properties table. Please see the Hive Security Configuration section for a more detailed discussion of the security options in the Hive connector.
HDFS Permissions
Before running any CREATE TABLE or CREATE TABLE ... AS statements for Hive tables in Presto, check that the operating system user running the Presto server has access to the Hive warehouse directory on HDFS. The Hive warehouse directory is specified by the configuration variable hive.metastore.warehouse.dir in hive-site.xml, and the default value is /user/hive/warehouse. If the user does not have access, either start the Presto server as a user that does, or add the following to jvm.config on all of the nodes, where USER is an operating system user that has proper permissions for the Hive warehouse directory:
-DHADOOP_USER_NAME=USER
The hive user generally works as USER, since Hive is often started with the hive user. If you run into HDFS permissions problems on CREATE TABLE ... AS, remove /tmp/presto-* on HDFS, fix the user as described above, then restart all of the Presto servers.
Hive Configuration Properties
Property Name | Description | Default |
---|---|---|
hive.metastore.uri | The URI(s) of the Hive metastore to connect to using the Thrift protocol. If multiple URIs are provided, the first URI is used by default and the rest of the URIs are fallback metastores. This property is required. Example: thrift://192.0.2.3:9083 or thrift://192.0.2.3:9083,thrift://192.0.2.4:9083 | |
hive.config.resources | An optional comma-separated list of HDFS configuration files. These files must exist on the machines running Presto. Only specify this if absolutely necessary to access HDFS. Example: /etc/hdfs-site.xml | |
hive.storage-format | The default file format used when creating new tables. | RCBINARY |
hive.compression-codec | The compression codec to use when writing files. | GZIP |
hive.force-local-scheduling | See the Tuning section. | false |
hive.respect-table-format | Should new partitions be written using the existing table format or the default Presto format? | true |
hive.immutable-partitions | Can new data be inserted into existing partitions? | false |
hive.max-partitions-per-writers | Maximum number of partitions per writer. | 100 |
hive.metastore.authentication.type | Hive metastore authentication type. Possible values are NONE or KERBEROS. | NONE |
hive.metastore.service.principal | The Kerberos principal of the Hive metastore service. | |
hive.metastore.client.principal | The Kerberos principal that Presto will use when connecting to the Hive metastore service. | |
hive.metastore.client.keytab | Hive metastore client keytab location. | |
hive.hdfs.authentication.type | HDFS authentication type. Possible values are NONE or KERBEROS. | NONE |
hive.hdfs.impersonation.enabled | Enable HDFS end user impersonation. | false |
hive.hdfs.presto.principal | The Kerberos principal that Presto will use when connecting to HDFS. | |
hive.hdfs.presto.keytab | HDFS client keytab location. | |
hive.security | See Hive Security Configuration. | |
security.config-file | Path of config file to use when hive.security=file. See File Based Authorization for details. | |
hive.multi-file-bucketing.enabled | Enable support for multiple files per bucket for Hive clustered tables. See Clustered Hive tables support. | false |
hive.empty-bucketed-partitions.enabled | Enable support for clustered tables with empty partitions. See Clustered Hive tables support. | false |
Amazon S3 Configuration
The Hive Connector can read and write tables that are stored in S3. This is accomplished by having a table or database location that uses an S3 prefix rather than an HDFS prefix.
Presto uses its own S3 filesystem for the URI prefixes s3://, s3n:// and s3a://.
S3 Configuration Properties
Property Name | Description |
---|---|
hive.s3.use-instance-credentials | Use the EC2 metadata service to retrieve API credentials (defaults to true). This works with IAM roles in EC2. |
hive.s3.aws-access-key | Default AWS access key to use. |
hive.s3.aws-secret-key | Default AWS secret key to use. |
hive.s3.endpoint | The S3 storage endpoint server. This can be used to connect to an S3-compatible storage system instead of AWS. When using v4 signatures, it is recommended to set this to the AWS region-specific endpoint (e.g., http[s]://<bucket>.s3-<AWS-region>.amazonaws.com). |
hive.s3.signer-type | Specify a different signer type for S3-compatible storage. Example: S3SignerType for v2 signer type. |
hive.s3.staging-directory | Local staging directory for data written to S3. This defaults to the Java temporary directory specified by the JVM system property java.io.tmpdir. |
hive.s3.pin-client-to-current-region | Pin S3 requests to the same region as the EC2 instance where Presto is running (defaults to false). |
hive.s3.ssl.enabled | Use HTTPS to communicate with the S3 API (defaults to true). |
hive.s3.sse.enabled | Use S3 server-side encryption (defaults to false). |
hive.s3.sse.type | The type of key management for S3 server-side encryption. Use S3 for S3-managed keys or KMS for KMS-managed keys (defaults to S3). |
hive.s3.sse.kms-key-id | The KMS Key ID to use for S3 server-side encryption with KMS-managed keys. If not set, the default key is used. |
hive.s3.kms-key-id | If set, use S3 client-side encryption, use the AWS KMS to store encryption keys, and use the value of this property as the KMS Key ID for newly created objects. |
hive.s3.encryption-materials-provider | If set, use S3 client-side encryption and use the value of this property as the fully qualified name of a Java class which implements the AWS SDK’s EncryptionMaterialsProvider interface. If the class also implements Configurable from the Hadoop API, the Hadoop configuration will be passed in after the object has been created. |
S3 Credentials
If you are running Presto on Amazon EC2 using EMR or another facility, it is highly recommended that you set hive.s3.use-instance-credentials to true and use IAM Roles for EC2 to govern access to S3. If this is the case, your EC2 instances will need to be assigned an IAM Role which grants appropriate access to the data stored in the S3 bucket(s) you wish to use. This is much cleaner than setting AWS access and secret keys in the hive.s3.aws-access-key and hive.s3.aws-secret-key settings, and also allows EC2 to automatically rotate credentials on a regular basis without any additional work on your part.
Custom S3 Credentials Provider
You can configure a custom S3 credentials provider by setting the Hadoop configuration property presto.s3.credentials-provider to be the fully qualified class name of a custom AWS credentials provider implementation. This class must implement the AWSCredentialsProvider interface and provide a two-argument constructor that takes a java.net.URI and a Hadoop org.apache.hadoop.conf.Configuration as arguments. A custom credentials provider can be used to provide temporary credentials from STS (using STSSessionCredentialsProvider), IAM role-based credentials (using STSAssumeRoleSessionCredentialsProvider), or credentials for a specific use case (e.g., bucket/user specific credentials). This Hadoop configuration property must be set in the Hadoop configuration files referenced by the hive.config.resources Hive connector property.
Tuning Properties
The following tuning properties affect the behavior of the client used by the Presto S3 filesystem when communicating with S3. Most of these parameters affect settings on the ClientConfiguration object associated with the AmazonS3Client.
Property Name | Description | Default |
---|---|---|
hive.s3.max-error-retries | Maximum number of error retries, set on the S3 client. | 10 |
hive.s3.max-client-retries | Maximum number of read attempts to retry. | 3 |
hive.s3.max-backoff-time | Use exponential backoff starting at 1 second up to this maximum value when communicating with S3. | 10 minutes |
hive.s3.max-retry-time | Maximum time to retry communicating with S3. | 10 minutes |
hive.s3.connect-timeout | TCP connect timeout. | 5 seconds |
hive.s3.socket-timeout | TCP socket read timeout. | 5 seconds |
hive.s3.max-connections | Maximum number of simultaneous open connections to S3. | 500 |
hive.s3.multipart.min-file-size | Minimum file size before multi-part upload to S3 is used. | 16 MB |
hive.s3.multipart.min-part-size | Minimum multi-part upload part size. | 5 MB |
S3 Data Encryption
Presto supports reading and writing encrypted data in S3 using both server-side encryption with S3 managed keys and client-side encryption using either the Amazon KMS or a software plugin to manage AES encryption keys.
With S3 server-side encryption (called SSE-S3 in the Amazon documentation), the S3 infrastructure takes care of all encryption and decryption work (with the exception of SSL to the client, assuming you have hive.s3.ssl.enabled set to true). S3 also manages all the encryption keys for you. To enable this, set hive.s3.sse.enabled to true.
With S3 client-side encryption, S3 stores encrypted data and the encryption keys are managed outside of the S3 infrastructure. Data is encrypted and decrypted by Presto instead of in the S3 infrastructure. In this case, encryption keys can be managed either by using the AWS KMS or your own key management system. To use the AWS KMS for key management, set hive.s3.kms-key-id to the UUID of a KMS key. Your AWS credentials or EC2 IAM role will need to be granted permission to use the given key as well.
To use a custom encryption key management system, set hive.s3.encryption-materials-provider to the fully qualified name of a class which implements the EncryptionMaterialsProvider interface from the AWS Java SDK. This class will have to be accessible to the Hive Connector through the classpath and must be able to communicate with your custom key management system. If this class also implements the org.apache.hadoop.conf.Configurable interface from the Hadoop Java API, then the Hadoop configuration will be passed in after the object instance is created and before it is asked to provision or retrieve any encryption keys.
Schema Evolution
Hive allows the partitions in a table to have a different schema than the table. This occurs when the column types of a table are changed after partitions already exist (that use the original column types). The Hive connector supports this by allowing the same conversions as Hive:
- varchar to and from tinyint, smallint, integer and bigint
- real to double
- Widening conversions for integers, such as tinyint to smallint
Any conversion failure will result in null, which is the same behavior as Hive. For example, converting the string 'foo' to a number results in null, as does converting the string '1234' to a tinyint (which has a maximum value of 127).
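The null-on-failure rule can be illustrated with a small Python sketch. It is not the connector's implementation: the function name coerce_varchar and the range table are assumptions made for illustration, and Python's int() parsing is only an approximation of how Hive parses numeric strings.

```python
# Signed ranges of the integer types named above (illustrative table).
INTEGER_RANGES = {
    "tinyint": (-2**7, 2**7 - 1),
    "smallint": (-2**15, 2**15 - 1),
    "integer": (-2**31, 2**31 - 1),
    "bigint": (-2**63, 2**63 - 1),
}


def coerce_varchar(value, target_type):
    """Return value as an int that fits target_type, or None if conversion fails."""
    low, high = INTEGER_RANGES[target_type]
    try:
        number = int(value)
    except ValueError:
        return None  # not a number at all, e.g. 'foo'
    if not low <= number <= high:
        return None  # parses, but does not fit the target type
    return number
```

With this sketch, coerce_varchar('1234', 'tinyint') yields None because 1234 exceeds 127, while coerce_varchar('1234', 'smallint') yields 1234.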
Examples
The Hive connector supports querying and manipulating Hive tables and schemas (databases). While some uncommon operations will need to be performed using Hive directly, most operations can be performed using Presto.
Create a new Hive schema named web that will store tables in an S3 bucket named my-bucket:
CREATE SCHEMA hive.web
WITH (location = 's3://my-bucket/')
Create a new Hive table named page_views in the web schema that is stored using the ORC file format, partitioned by date and country, and bucketed by user into 50 buckets (note that Hive requires the partition columns to be the last columns in the table):
CREATE TABLE hive.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  ds date,
  country varchar
)
WITH (
  format = 'ORC',
  partitioned_by = ARRAY['ds', 'country'],
  bucketed_by = ARRAY['user_id'],
  bucket_count = 50
)
Drop a partition from the page_views table:
DELETE FROM hive.web.page_views
WHERE ds = DATE '2016-08-09'
  AND country = 'US'
Query the page_views table:
SELECT * FROM hive.web.page_views
Create an external Hive table named request_logs that points at existing data in S3:
CREATE TABLE hive.web.request_logs (
  request_time timestamp,
  url varchar,
  ip varchar,
  user_agent varchar
)
WITH (
  format = 'TEXTFILE',
  external_location = 's3://my-bucket/data/logs/'
)
Drop the external table request_logs. This only drops the metadata for the table. The referenced data directory is not deleted:
DROP TABLE hive.web.request_logs
Drop a schema:
DROP SCHEMA hive.web
Tuning
The following configuration properties may have an impact on connector performance:
hive.assume-canonical-partition-keys
- Type: Boolean
- Default value: false
- Description: Enable optimized metastore partition fetching for non-string partition keys. Setting this property allows Presto to filter non-string partition keys while reading them from Hive, based on the assumption that they are stored in canonical (Java) format. This is disabled by default because Hive also allows non-canonical formats (e.g. the boolean value false may be represented as 0, false, False and more). Used correctly, this property may drastically improve read time by reducing the number of partitions loaded from Hive. Setting this property for data in a non-canonical format may cause erratic behavior.
hive.domain-compaction-threshold
- Type: Integer (at least 1)
- Default value: 100
- Description: Maximum number of ranges/values allowed while reading Hive data without compacting it. A higher value will cause more data fragmentation but allow the use of the row skipping feature when reading ORC data. Increasing this value may have a large impact on IN and OR clause performance in scenarios making use of row skipping.
hive.force-local-scheduling
- Type: Boolean
- Default value: false
- Description: Force splits to be scheduled on the same node (ignoring normal node selection procedures) as the Hadoop DataNode process serving the split data. This is useful for installations where Presto is collocated with every DataNode and may decrease query time significantly. The drawback is that if some data is accessed more often than the rest, the utilization of some nodes may be low even if the whole system is heavily loaded. See also node-scheduler.network-topology if a less strict constraint is preferred, especially if some nodes are overloaded and others are not fully utilized.
hive.max-initial-split-size
- Type: String (data size)
- Default value: hive.max-split-size / 2 (32 MB)
- Description: This property describes the maximum size of the first hive.max-initial-splits splits created for a query. The logic behind initial splits is described in hive.max-initial-splits. Lower values will increase concurrency for small queries. This property represents the maximum size, as the real size may be lower when the amount of data to read is less than hive.max-initial-split-size (e.g. at the end of a block on a DataNode).
hive.max-initial-splits
- Type: Integer
- Default value: 200
- Description: This property describes how many splits may be initially created for a single query using hive.max-initial-split-size instead of hive.max-split-size. A higher value will force more splits to have a smaller size (hive.max-initial-split-size is expected to be smaller than hive.max-split-size), effectively widening the definition of what is considered a “small query”. The purpose of the smaller split size for the initial splits is to increase concurrency for smaller queries.
hive.max-outstanding-splits
- Type: Integer (at least 1)
- Default value: 1000
- Description: Limit on the number of splits waiting to be served by a split source. After reaching this limit, writers will stop writing new splits until some of them are used by workers. Higher values will increase memory usage, but allow IO to be concentrated at one time, which may be faster and increase resource utilization.
hive.max-partitions-per-writers
- Type: Integer (at least 1)
- Default value: 100
- Description: Maximum number of partitions per writer. A query will fail if it requires more partitions per writer than allowed by this property. Failing queries that exceed the expected maximum number of partitions can help with error detection, and may also proactively avoid out-of-memory problems.
hive.max-split-iterator-threads
- Type: Integer (at least 1)
- Default value: 1000
- Description: This property describes how many threads may be used to iterate through splits when loading them to the worker nodes. A higher value may increase parallelism, but increased concurrency may cause too much time to be spent on context switching.
hive.max-split-size
- Type: String (data size)
- Default value: 64 MB
- Description: The maximum size of splits created after the initial splits. The logic for initial splits is described in hive.max-initial-splits. A higher value will reduce parallelism. This may be desirable for very large queries and a stable cluster because it allows for more efficient processing of local data without the context switching, synchronization and data collection that result from parallelization. The optimal value should be aligned with the average query size in the system.
hive.metastore.partition-batch-size.max
- Type: Integer (at least 1)
- Default value: 100
- Description: This property, together with hive.metastore.partition-batch-size.min, defines the range of partition batch sizes read from Hive. The first batch is always of size hive.metastore.partition-batch-size.min, and each following batch is twice as large as the previous one, up to hive.metastore.partition-batch-size.max. The size of batch n is min(hive.metastore.partition-batch-size.max, 2^n * hive.metastore.partition-batch-size.min). This algorithm allows for live adjustment of the batch size according to the processing requirements. If the queries in the system differ significantly from each other in size, then this range should be extended to better adjust to processing requirements. If the queries in the system are mostly of the same size, then setting both values to the same maximally tuned value may give a slight edge in processing time.
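The doubling rule can be made concrete with a short Python sketch. It assumes that n starts at 0 (so the first batch has the minimum size) and that the final batch is truncated to the number of partitions left; the function name partition_batch_sizes is hypothetical and the defaults are the values listed for the two properties.

```python
def partition_batch_sizes(partition_count, batch_min=10, batch_max=100):
    """Return the sizes of successive batches needed to read partition_count partitions."""
    sizes = []
    remaining = partition_count
    n = 0
    while remaining > 0:
        # Batch n follows min(max, 2^n * min), capped by what is left to read.
        size = min(batch_max, (2 ** n) * batch_min)
        size = min(size, remaining)
        sizes.append(size)
        remaining -= size
        n += 1
    return sizes
```

Under these assumptions, reading 500 partitions with the default settings uses batches of 10, 20, 40, 80, 100, 100, 100 and 50 partitions.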
hive.metastore.partition-batch-size.min
- Type: Integer (at least 1)
- Default value: 10
- Description: See hive.metastore.partition-batch-size.max.
hive.orc.max-buffer-size
- Type: String (data size)
- Default value: 8 MB
- Description: Serves as the default value for the orc_max_buffer_size session property, which defines the maximum size of ORC read operators. A higher value will allow bigger chunks to be processed, but will decrease concurrency.
hive.orc.max-merge-distance
- Type: String (data size)
- Default value: 1 MB
- Description: Serves as the default value for the orc_max_merge_distance session property. Two reads from an ORC file may be merged into a single read if the distance between the requested data ranges in the data source is less than or equal to this value.
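The merge rule can be sketched in Python as follows. This illustrates the idea only and is not the ORC reader's actual code; the function name merge_reads and the (offset, length) representation of a read are assumptions.

```python
def merge_reads(ranges, max_merge_distance):
    """Merge (offset, length) byte ranges whose gap is at most max_merge_distance."""
    merged = []  # list of [start, end) pairs, kept in offset order
    for offset, length in sorted(ranges):
        end = offset + length
        if merged and offset - merged[-1][1] <= max_merge_distance:
            # Close enough to the previous read: extend it instead of issuing a new one.
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([offset, end])
    return [(start, end - start) for start, end in merged]
```

A larger distance trades extra bytes read (the gaps between ranges) for fewer separate read requests.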
hive.orc.stream-buffer-size
- Type: String (data size)
- Default value: 8 MB
- Description: Serves as the default value for the orc_stream_buffer_size session property. It defines the maximum size of ORC read operators. A higher value will allow bigger chunks to be processed, but will decrease concurrency.
hive.orc.use-column-names
- Type: Boolean
- Default value: false
- Description: Access ORC columns using names from the file. By default, columns in ORC files are accessed using the order recorded in the Hive metastore. Setting this property allows the column names recorded in the ORC file to be used instead.
hive.parquet-optimized-reader.enabled
- Type: Boolean
- Default value: false
- Description: Deprecated. Serves as the default value for the parquet_optimized_reader_enabled session property. Enables a number of reader improvements introduced by an alternative Parquet implementation. The new reader supports vectorized reads, lazy loading, and predicate push down, all of which make the reader more efficient and typically reduce wall clock time for a query. However, because the code has changed significantly, it may introduce some minor issues, so it can be disabled if problems are noticed in your environment. This property enables or disables all optimizations except predicate push down, which is managed by the hive.parquet-predicate-pushdown.enabled property.
hive.parquet-predicate-pushdown.enabled
- Type: Boolean
- Default value: false
- Description: Deprecated. Serves as the default value for the parquet_predicate_pushdown_enabled session property. See hive.parquet-optimized-reader.enabled.
hive.parquet.use-column-names
- Type: Boolean
- Default value: false
- Description: Access Parquet columns using names from the file. By default, columns in Parquet files are accessed by their ordinal position in the Hive metastore. Setting this property allows access by column name recorded in the Parquet file instead.
hive.s3.max-connections
- Type: Integer (at least 1)
- Default value: 500
- Description: The maximum number of connections to S3 that may be open at a time by the S3 driver. A higher value may increase network utilization when a cluster is used on a high speed network. However, a higher value relies more on the S3 servers being well configured for high parallelism.
hive.s3.multipart.min-file-size
- Type: String (data size, at least 16 MB)
- Default value: 16 MB
- Description: This property describes how big a file must be to be uploaded to an S3 cluster using the multipart upload feature. Amazon recommends using 100 MB, but a lower value may increase upload parallelism and decrease the data lost / data sent ratio in unstable network conditions.
hive.s3.multipart.min-part-size
- Type: String (data size, at least 5 MB)
- Default value: 5 MB
- Description: Defines the minimum part size for upload parts. Decreasing the minimum part size causes multipart uploads to be split into a larger number of smaller parts. Setting this value too low has a negative effect on transfer speeds, causing extra latency and network communication for each part.
The following session properties control connector behavior on a per-query basis:
orc_max_buffer_size
- Type: String (data size)
- Default value: hive.orc.max-buffer-size (8 MB)
- Description: See hive.orc.max-buffer-size.
orc_max_merge_distance
- Type: String (data size)
- Default value: hive.orc.max-merge-distance (1 MB)
- Description: See hive.orc.max-merge-distance.
orc_stream_buffer_size
- Type: String (data size)
- Default value: hive.orc.stream-buffer-size (8 MB)
- Description: See hive.orc.stream-buffer-size.
Clustered Hive tables support
By default, Presto supports only one data file per bucket per partition for clustered tables (Hive tables declared with a CLUSTERED BY clause). If the number of files does not match the number of buckets, an exception is thrown.
To enable support for cases where there is more than one file per bucket, such as when multiple INSERTs were done to a single partition of the clustered table, you can use:
- hive.multi-file-bucketing.enabled config property
- multi_file_bucketing_enabled session property (using SET SESSION <connector_name>.multi_file_bucketing_enabled)
The config property changes behaviour globally, and the session property can be used on a per-query basis. The default value of the session property is taken from the config property.
If support for multiple files per bucket is enabled, Presto will group the files in the partition directory. It sorts the file names lexicographically, then treats the part of each file name up to the first underscore character as the bucket key. This pattern matches the naming convention of files in the directory when Hive is used to insert data into the table.
Presto will still validate that the number of file groups matches the number of buckets declared for the table, and fail if it does not.
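The grouping and validation steps can be sketched in Python as follows. This is an illustration of the described behaviour rather than Presto's code; the function name group_bucket_files is hypothetical, and the file names in the usage note are assumed examples of Hive-style names.

```python
def group_bucket_files(file_names, bucket_count):
    """Group file names by the prefix before the first underscore (the bucket key)."""
    groups = {}
    for name in sorted(file_names):  # lexicographic order
        bucket_key = name.split("_", 1)[0]
        groups.setdefault(bucket_key, []).append(name)
    if len(groups) != bucket_count:
        # Mirrors the validation step: group count must equal the declared bucket count.
        raise ValueError(
            f"found {len(groups)} file groups but the table declares {bucket_count} buckets"
        )
    return list(groups.values())
```

For a table with two buckets, the files 000000_0, 000000_0_copy_1, 000001_0 and 000001_0_copy_1 would form two groups keyed by 000000 and 000001, while the same files checked against three declared buckets would raise an error.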
Similarly, by default, empty partitions (partitions with no files) are not allowed for clustered Hive tables. To enable support for empty partitions, you can use:
- hive.empty-bucketed-partitions.enabled config property
- empty_bucketed_partitions_enabled session property (using SET SESSION <connector_name>.empty_bucketed_partitions_enabled)
Hive Connector Limitations
DELETE is only supported if the WHERE clause matches entire partitions.