diff --git a/.travis.yml b/.travis.yml
index 530e56db43ca..11e283840ca4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,27 +28,43 @@ group: deprecated-2017Q3
cache:
directories:
- $HOME/.m2/repository
+ - $HOME/.thrift
services:
- docker
+before_install:
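+  # build the Thrift 0.9.3 compiler once; it is reused from the $HOME/.thrift cache on later runs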
+ - |
+ if [[ ! -e $HOME/.thrift/bin/thrift ]]; then
+ sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libboost-filesystem-dev libboost-thread-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev
+ wget https://www.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz
+ tar xfz thrift-0.9.3.tar.gz
+ cd thrift-0.9.3 && ./configure --without-cpp --without-c_glib --without-python --without-ruby --without-php --without-erlang --without-go --without-nodejs -q --prefix=$HOME/.thrift
+ sudo make install > thrift_make_install.log
+ cd ..
+ fi
+ - |
+ if [[ ! -e /usr/local/bin/thrift ]]; then
+ sudo ln -s $HOME/.thrift/bin/thrift /usr/local/bin/thrift
+ fi
+
install:
- ./mvnw -v
- |
if [[ -v TEST_SPECIFIC_MODULES ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl $TEST_SPECIFIC_MODULES -am
+ ./mvnw install $MAVEN_FAST_INSTALL -P !twitter-modules -pl $TEST_SPECIFIC_MODULES -am
fi
- |
if [[ -v TEST_OTHER_MODULES ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl '!presto-docs,!presto-server,!presto-server-rpm'
+ ./mvnw install $MAVEN_FAST_INSTALL -P !twitter-modules -pl '!presto-docs,!presto-server,!presto-server-rpm'
fi
- |
if [[ -v PRODUCT_TESTS_BASIC_ENVIRONMENT || -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl '!presto-docs,!presto-server-rpm'
+ ./mvnw install $MAVEN_FAST_INSTALL -P !twitter-modules -pl '!presto-docs,!presto-server-rpm'
fi
- |
if [[ -v HIVE_TESTS ]]; then
- ./mvnw install $MAVEN_FAST_INSTALL -pl presto-hive-hadoop2 -am
+ ./mvnw install $MAVEN_FAST_INSTALL -P !twitter-modules -pl presto-hive-hadoop2 -am
fi
before_script:
@@ -61,30 +77,40 @@ before_script:
script:
- |
if [[ -v MAVEN_CHECKS ]]; then
- ./mvnw install -DskipTests -B -T C1 -P ci
+ ./mvnw install -DskipTests -B -T C1 -P 'ci,!twitter-modules'
fi
- |
if [[ -v TEST_SPECIFIC_MODULES ]]; then
- ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -B -pl $TEST_SPECIFIC_MODULES $TEST_FLAGS
+ ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -P !twitter-modules -B -pl $TEST_SPECIFIC_MODULES $TEST_FLAGS
fi
- |
if [[ -v TEST_OTHER_MODULES ]]; then
- ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -B -pl $TEST_OTHER_MODULES
+ ./mvnw test $MAVEN_SKIP_CHECKS_AND_DOCS -P !twitter-modules -B -pl $TEST_OTHER_MODULES
fi
- |
if [[ -v PRODUCT_TESTS_BASIC_ENVIRONMENT ]]; then
presto-product-tests/bin/run_on_docker.sh \
- multinode -x quarantine,big_query,storage_formats,profile_specific_tests,tpcds
+ multinode -x quarantine,big_query,storage_formats,profile_specific_tests,tpcds,cli,hive_connector
+ fi
+ - |
+ if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT ]]; then
+ presto-product-tests/bin/run_on_docker.sh \
+ singlenode -g hdfs_no_impersonation
+ fi
+ - |
+ if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT ]]; then
+ presto-product-tests/bin/run_on_docker.sh \
+ singlenode-hdfs-impersonation -g storage_formats,hdfs_impersonation
fi
- |
if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT ]]; then
presto-product-tests/bin/run_on_docker.sh \
- singlenode-kerberos-hdfs-impersonation -g storage_formats,cli,hdfs_impersonation,authorization
+ singlenode-kerberos-hdfs-impersonation -g storage_formats,hdfs_impersonation,authorization
fi
- |
if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT ]]; then
presto-product-tests/bin/run_on_docker.sh \
- singlenode-ldap -g ldap -x simba_jdbc
+ singlenode-ldap -g ldap -x simba_jdbc,ldap_cli
fi
# SQL server image sporadically hangs during the startup
# TODO: Uncomment it once issue is fixed
@@ -97,7 +123,7 @@ script:
- |
if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT ]]; then
presto-product-tests/bin/run_on_docker.sh \
- multinode-tls -g smoke,cli,group-by,join,tls
+ multinode-tls -g smoke,group-by,join,tls
fi
- |
if [[ -v HIVE_TESTS ]]; then
@@ -109,8 +135,9 @@ before_cache:
- rm -rf $HOME/.m2/repository/com/facebook
notifications:
- slack:
- secure: V5eyoGShxFoCcYJcp858vf/T6gC9KeMxL0C1EElcpZRcKBrIVZzvhek3HLHxZOxlghqnvNVsyDtU3u5orkEaAXeXj5c2dN+4XBsAB9oeN5MtQ0Z3VLAhZDqKIW1LzcXrq4DpzM0PkGhjfjum/P94/qFYk0UckPtB6a341AuYRo8=
+ hipchat:
+ rooms:
+ secure: peNh1KxwlxIpFyb60S8AMvaJThgh1LsjE+Whf1rYkJalVd2wUrqBIoyDKVSueyHD01hQ06gT7rBV6Pu/QcBMR1a9BbMCjERfxLZFUAheuC2Rsb+p1c4dyvBcFUGacgW7XWKCaVYGDGxuUvb0I3Z8cR6KxhK2xi88tHiqBGVGV2yI6zzOTpWVknMfFBtn+ONU1Ob2P6trclXaDyFd4MxubULri6CQdl35eQAq/VnmR3SZOgyVu3V30MGKwI3zhSli+3VqmW0JmaDGoHN6gznM1+VqABLgmIq0P+n+r5gdZWRCorq10NZCFMhVQ8U6rQHcL7sAniYJJsC/yRt6+pjyzIF4N+LSzZ7T+FLxQqT7k/1ukNgrujLDfTpn76Mo9eYTZmfAdzbm1QKJDACwr8Slqhq1jGzcrFMHunvXhVqjOs24R+JAHblY0O9PXvv7aR29GOQWDCvD7nV5QBUr8Xz5q7ozbLqHTI+yH02Jj4EaZ+azWYdRmnr9wDBxWMYBEgOdj4pII9b298XEDB72TxA3KpLTpdLxBTR+gIk/LjJqb/wb84xUv8gPXkaXccltGd5YI90c84cX8isbzNkAylzyfF2Eyueh0XbnMHfpFqBS7qaVM0/D+UxZkU0WNJ0x7G9XJvkiq49bZz2q1KLE4XuvVnTZSSjVSUAS8RtHfwUV33c=
before_deploy:
- mkdir /tmp/artifacts
diff --git a/README.md b/README.md
index f7428dba837d..9325bc3edef5 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Presto [![Build Status](https://travis-ci.org/prestodb/presto.svg?branch=master)](https://travis-ci.org/prestodb/presto)
+# Presto [![Build Status](https://travis-ci.org/twitter-forks/presto.svg?branch=twitter-master)](https://travis-ci.org/twitter-forks/presto)
Presto is a distributed SQL query engine for big data.
diff --git a/pom.xml b/pom.xml
index 2882da39f559..a55f9154446a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-root</artifactId>
-    <version>0.188</version>
+    <version>0.188-tw-0.43</version>
    <packaging>pom</packaging>

    <name>presto-root</name>
@@ -28,9 +28,9 @@
-        <connection>scm:git:git://github.com/facebook/presto.git</connection>
-        <url>https://github.com/facebook/presto</url>
-        <tag>0.188</tag>
+        <connection>scm:git:git://github.com/twitter-forks/presto.git</connection>
+        <url>https://github.com/twitter-forks/presto</url>
+        <tag>0.188-tw-0.43</tag>
@@ -764,6 +764,12 @@
                <version>3.6.1</version>
            </dependency>

+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-pool2</artifactId>
+                <version>2.4.2</version>
+            </dependency>
+
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
@@ -846,6 +852,54 @@
                <version>1.1.1.7</version>
            </dependency>

+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-recipes</artifactId>
+                <version>4.0.0</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-framework</artifactId>
+                <version>4.0.0</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-client</artifactId>
+                <version>4.0.0</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-test</artifactId>
+                <version>2.12.0</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
@@ -869,7 +923,7 @@
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
-                <version>0.8</version>
+                <version>0.10</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
@@ -879,6 +933,10 @@
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
@@ -978,6 +1036,11 @@
                <version>3.1.4-1</version>
            </dependency>

+            <dependency>
+                <groupId>com.hadoop.gplcompression</groupId>
+                <artifactId>hadoop-lzo</artifactId>
+                <version>0.4.16</version>
+            </dependency>

            <dependency>
                <groupId>org.javassist</groupId>
@@ -1257,4 +1320,18 @@
+
+    <profiles>
+        <profile>
+            <id>twitter-modules</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <modules>
+                <module>presto-kafka07</module>
+                <module>twitter-eventlistener-plugin</module>
+                <module>presto-twitter-server</module>
+            </modules>
+        </profile>
+    </profiles>
diff --git a/presto-accumulo/pom.xml b/presto-accumulo/pom.xml
index 7aff1063715f..bd39c22d518c 100644
--- a/presto-accumulo/pom.xml
+++ b/presto-accumulo/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-accumulo</artifactId>
diff --git a/presto-array/pom.xml b/presto-array/pom.xml
index f82338b19199..b8e220e16f9b 100644
--- a/presto-array/pom.xml
+++ b/presto-array/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-array</artifactId>
diff --git a/presto-atop/pom.xml b/presto-atop/pom.xml
index b2dd0f44c1c2..092fdbdf6e7b 100644
--- a/presto-atop/pom.xml
+++ b/presto-atop/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-atop</artifactId>
diff --git a/presto-base-jdbc/pom.xml b/presto-base-jdbc/pom.xml
index 18daf2cd97aa..d8b8013517e1 100644
--- a/presto-base-jdbc/pom.xml
+++ b/presto-base-jdbc/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-base-jdbc</artifactId>
diff --git a/presto-benchmark-driver/pom.xml b/presto-benchmark-driver/pom.xml
index 8e74cc381e76..283f7bf2068b 100644
--- a/presto-benchmark-driver/pom.xml
+++ b/presto-benchmark-driver/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-benchmark-driver</artifactId>
diff --git a/presto-benchmark/pom.xml b/presto-benchmark/pom.xml
index e17ba6de0e2c..0ac504eb6fcd 100644
--- a/presto-benchmark/pom.xml
+++ b/presto-benchmark/pom.xml
@@ -5,7 +5,7 @@
        <artifactId>presto-root</artifactId>
        <groupId>com.facebook.presto</groupId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-benchmark</artifactId>
diff --git a/presto-benchto-benchmarks/pom.xml b/presto-benchto-benchmarks/pom.xml
index 0051682b4800..bdf040f33943 100644
--- a/presto-benchto-benchmarks/pom.xml
+++ b/presto-benchto-benchmarks/pom.xml
@@ -4,7 +4,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-benchto-benchmarks</artifactId>
diff --git a/presto-blackhole/pom.xml b/presto-blackhole/pom.xml
index 2550b56517e1..667ef78ec3e8 100644
--- a/presto-blackhole/pom.xml
+++ b/presto-blackhole/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-blackhole</artifactId>
diff --git a/presto-bytecode/pom.xml b/presto-bytecode/pom.xml
index b8fc8e151823..3b315bc3ab04 100644
--- a/presto-bytecode/pom.xml
+++ b/presto-bytecode/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-bytecode</artifactId>
diff --git a/presto-cassandra/pom.xml b/presto-cassandra/pom.xml
index 8ade3e8b43be..6677efaf5215 100644
--- a/presto-cassandra/pom.xml
+++ b/presto-cassandra/pom.xml
@@ -4,7 +4,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-cassandra</artifactId>
diff --git a/presto-cli/pom.xml b/presto-cli/pom.xml
index 48d22e70392e..a4463ddbf9cc 100644
--- a/presto-cli/pom.xml
+++ b/presto-cli/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-cli</artifactId>
diff --git a/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java b/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java
index 12f5b67217df..79bf95a058ef 100644
--- a/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java
+++ b/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java
@@ -17,6 +17,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
+import com.sun.security.auth.module.UnixSystem;
import io.airlift.airline.Option;
import io.airlift.units.Duration;
@@ -71,15 +72,16 @@ public class ClientOptions
@Option(name = "--keystore-password", title = "keystore password", description = "Keystore password")
public String keystorePassword;
+    // Pick the user name of the logged-in user.
+    // Do not let it be overridden by users.
+ public String user = new UnixSystem().getUsername();
+
@Option(name = "--truststore-path", title = "truststore path", description = "Truststore path")
public String truststorePath;
@Option(name = "--truststore-password", title = "truststore password", description = "Truststore password")
public String truststorePassword;
- @Option(name = "--user", title = "user", description = "Username")
- public String user = System.getProperty("user.name");
-
@Option(name = "--password", title = "password", description = "Prompt for password")
public boolean password;
diff --git a/presto-client/pom.xml b/presto-client/pom.xml
index ca418f9c2531..a1668221505e 100644
--- a/presto-client/pom.xml
+++ b/presto-client/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-client</artifactId>
diff --git a/presto-docs/pom.xml b/presto-docs/pom.xml
index 3d360add49d8..159a3a95d1f6 100644
--- a/presto-docs/pom.xml
+++ b/presto-docs/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-docs</artifactId>
diff --git a/presto-example-http/pom.xml b/presto-example-http/pom.xml
index 234d3117d8a1..1cacfb08cb11 100644
--- a/presto-example-http/pom.xml
+++ b/presto-example-http/pom.xml
@@ -4,7 +4,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-example-http</artifactId>
diff --git a/presto-geospatial/pom.xml b/presto-geospatial/pom.xml
index e7dc3d2a7db2..04519e7bc63d 100644
--- a/presto-geospatial/pom.xml
+++ b/presto-geospatial/pom.xml
@@ -4,7 +4,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-geospatial</artifactId>
diff --git a/presto-hive-hadoop2/bin/run_on_docker.sh b/presto-hive-hadoop2/bin/run_on_docker.sh
index 150194bef124..f0ba2e69741e 100755
--- a/presto-hive-hadoop2/bin/run_on_docker.sh
+++ b/presto-hive-hadoop2/bin/run_on_docker.sh
@@ -110,7 +110,7 @@ fi
# run product tests
pushd $PROJECT_ROOT
set +e
-./mvnw -pl presto-hive-hadoop2 test -P test-hive-hadoop2 \
+./mvnw -pl presto-hive-hadoop2 test -P test-hive-hadoop2,!twitter-modules \
-Dhive.hadoop2.timeZone=UTC \
-DHADOOP_USER_NAME=hive \
-Dhive.hadoop2.metastoreHost=hadoop-master \
diff --git a/presto-hive-hadoop2/pom.xml b/presto-hive-hadoop2/pom.xml
index 3c8520857f14..5a12438c8778 100644
--- a/presto-hive-hadoop2/pom.xml
+++ b/presto-hive-hadoop2/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-hive-hadoop2</artifactId>
diff --git a/presto-hive/pom.xml b/presto-hive/pom.xml
index 388d4352f8dc..c0c7c820729b 100644
--- a/presto-hive/pom.xml
+++ b/presto-hive/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
-        <version>0.188</version>
+        <version>0.188-tw-0.43</version>
    </parent>

    <artifactId>presto-hive</artifactId>
@@ -48,11 +48,37 @@
            <artifactId>hive-apache</artifactId>
        </dependency>

+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
        </dependency>

+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+        </dependency>
+
        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>aircompressor</artifactId>
@@ -93,6 +119,12 @@
            <artifactId>configuration</artifactId>
        </dependency>

+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1</version>
+        </dependency>
+
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
@@ -183,6 +215,41 @@
            <artifactId>jackson-databind</artifactId>
        </dependency>

+        <dependency>
+            <groupId>com.twitter.elephantbird</groupId>
+            <artifactId>elephant-bird-core</artifactId>
+            <version>4.14</version>
+            <classifier>thrift9</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.hadoop.gplcompression</groupId>
+                    <artifactId>hadoop-lzo</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.5</version>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.4</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.anarres.lzo</groupId>
+            <artifactId>lzo-hadoop</artifactId>
+        </dependency>
+
        <dependency>
            <groupId>io.airlift</groupId>
@@ -211,7 +278,26 @@
            <artifactId>jackson-annotations</artifactId>
        </dependency>

+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+            <scope>test</scope>
+        </dependency>
+
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-main</artifactId>
@@ -254,12 +340,6 @@
            <scope>test</scope>
        </dependency>

-        <dependency>
-            <groupId>org.anarres.lzo</groupId>
-            <artifactId>lzo-hadoop</artifactId>
-            <scope>test</scope>
-        </dependency>
-
        <dependency>
            <groupId>com.facebook.presto</groupId>
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java
index 6f6f4e690e38..5686d835a3ca 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java
@@ -33,6 +33,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.io.CharStreams;
+import com.hadoop.compression.lzo.LzoIndex;
import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -79,6 +80,9 @@
import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
import static com.facebook.presto.hive.HiveUtil.checkCondition;
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
+import static com.facebook.presto.hive.HiveUtil.getLzopIndexPath;
+import static com.facebook.presto.hive.HiveUtil.isLzopCompressedFile;
+import static com.facebook.presto.hive.HiveUtil.isLzopIndexFile;
import static com.facebook.presto.hive.HiveUtil.isSplittable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
@@ -528,6 +532,7 @@ private Iterator<InternalHiveSplit> createHiveSplitIterator(
             Optional<Domain> pathDomain)
throws IOException
{
+ Path filePath = new Path(path);
if (!pathMatchesPredicate(pathDomain, path)) {
return emptyIterator();
}
@@ -540,6 +545,7 @@ private Iterator createHiveSplitIterator(
        return new AbstractIterator<InternalHiveSplit>()
{
private long chunkOffset = 0;
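+            // for LZOP files, read the companion .lzo.index so chunk ends can be aligned to compressed block boundaries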
+ private LzoIndex index = isLzopCompressedFile(filePath) ? LzoIndex.readIndex(hdfsEnvironment.getFileSystem(hdfsContext, getLzopIndexPath(filePath)), filePath) : null;
@Override
protected InternalHiveSplit computeNext()
@@ -570,6 +576,17 @@ protected InternalHiveSplit computeNext()
// adjust the actual chunk size to account for the overrun when chunks are slightly bigger than necessary (see above)
long chunkLength = Math.min(targetChunkSize, blockLocation.getLength() - chunkOffset);
+ // align the end point to the indexed point for lzo compressed file
+ if (isLzopCompressedFile(filePath)) {
+ long offset = blockLocation.getOffset() + chunkOffset;
+ if (index.isEmpty()) {
+ chunkLength = length - offset;
+ }
+ else {
+ chunkLength = index.alignSliceEndToIndex(offset + chunkLength, length) - offset;
+ }
+ }
+
InternalHiveSplit result = new InternalHiveSplit(
partitionName,
path,
@@ -585,10 +602,19 @@ protected InternalHiveSplit computeNext()
chunkOffset += chunkLength;
- if (chunkOffset >= blockLocation.getLength()) {
- checkState(chunkOffset == blockLocation.getLength(), "Error splitting blocks");
+ while (chunkOffset >= blockLocation.getLength()) {
+ // allow overrun for lzo compressed file for intermediate blocks
+ if (!isLzopCompressedFile(filePath) || blockLocation.getOffset() + blockLocation.getLength() >= length) {
+ checkState(chunkOffset == blockLocation.getLength(), "Error splitting blocks for file: " + filePath.toString());
+ }
blockLocationIterator.next();
- chunkOffset = 0;
+ chunkOffset -= blockLocation.getLength();
+ if (chunkOffset == 0) {
+ break;
+ }
+ if (blockLocationIterator.hasNext()) {
+ blockLocation = blockLocationIterator.peek();
+ }
}
return result;
@@ -684,6 +710,10 @@ private static Optional<Domain> getPathDomain(TupleDomain<HiveColumnHandle> effe
    private static boolean pathMatchesPredicate(Optional<Domain> pathDomain, String path)
{
+ if (isLzopIndexFile(new Path(path))) {
+ return false;
+ }
+
if (!pathDomain.isPresent()) {
return true;
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
index 8764a03b4d6b..64b49b4a22ea 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java
@@ -26,6 +26,10 @@
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.type.TypeManager;
+import com.facebook.presto.twitter.hive.PooledHiveMetastoreClientFactory;
+import com.facebook.presto.twitter.hive.thrift.HiveThriftFieldIdResolverFactory;
+import com.facebook.presto.twitter.hive.thrift.ThriftFieldIdResolverFactory;
+import com.facebook.presto.twitter.hive.thrift.ThriftHiveRecordCursorProvider;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
@@ -83,15 +87,16 @@ public void configure(Binder binder)
newExporter(binder).export(NamenodeStats.class).as(generatedNameOf(NamenodeStats.class, connectorId));
binder.bind(HiveMetastoreClientFactory.class).in(Scopes.SINGLETON);
- binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON);
- configBinder(binder).bindConfig(StaticMetastoreConfig.class);
+ binder.bind(PooledHiveMetastoreClientFactory.class).in(Scopes.SINGLETON);
binder.bind(NodeManager.class).toInstance(nodeManager);
binder.bind(TypeManager.class).toInstance(typeManager);
binder.bind(PageIndexerFactory.class).toInstance(pageIndexerFactory);
+ binder.bind(ThriftFieldIdResolverFactory.class).toInstance(new HiveThriftFieldIdResolverFactory());
        Multibinder<HiveRecordCursorProvider> recordCursorProviderBinder = newSetBinder(binder, HiveRecordCursorProvider.class);
recordCursorProviderBinder.addBinding().to(ParquetRecordCursorProvider.class).in(Scopes.SINGLETON);
+ recordCursorProviderBinder.addBinding().to(ThriftHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
recordCursorProviderBinder.addBinding().to(GenericHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
binder.bind(HiveWriterStats.class).in(Scopes.SINGLETON);
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveCoercionPolicy.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveCoercionPolicy.java
index ed9a5a505ad9..7224b35cc791 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveCoercionPolicy.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveCoercionPolicy.java
@@ -16,15 +16,23 @@
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.VarcharType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import javax.inject.Inject;
+import java.util.List;
+import java.util.stream.Collectors;
+
import static com.facebook.presto.hive.HiveType.HIVE_BYTE;
import static com.facebook.presto.hive.HiveType.HIVE_DOUBLE;
import static com.facebook.presto.hive.HiveType.HIVE_FLOAT;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.HiveType.HIVE_LONG;
import static com.facebook.presto.hive.HiveType.HIVE_SHORT;
+
import static java.util.Objects.requireNonNull;
public class HiveCoercionPolicy
@@ -62,6 +70,49 @@ public boolean canCoerce(HiveType fromHiveType, HiveType toHiveType)
return toHiveType.equals(HIVE_DOUBLE);
}
- return false;
+ return canCoerceForList(fromHiveType, toHiveType) || canCoerceForMap(fromHiveType, toHiveType) || canCoerceForStruct(fromHiveType, toHiveType);
+ }
+
+ private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType)
+ {
+ if (!fromHiveType.getCategory().equals(Category.MAP) || !toHiveType.getCategory().equals(Category.MAP)) {
+ return false;
+ }
+ HiveType fromKeyType = HiveType.valueOf(((MapTypeInfo) fromHiveType.getTypeInfo()).getMapKeyTypeInfo().getTypeName());
+ HiveType fromValueType = HiveType.valueOf(((MapTypeInfo) fromHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName());
+ HiveType toKeyType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapKeyTypeInfo().getTypeName());
+ HiveType toValueType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName());
+ return (fromKeyType.equals(toKeyType) || canCoerce(fromKeyType, toKeyType)) && (fromValueType.equals(toValueType) || canCoerce(fromValueType, toValueType));
+ }
+
+ private boolean canCoerceForList(HiveType fromHiveType, HiveType toHiveType)
+ {
+ if (!fromHiveType.getCategory().equals(Category.LIST) || !toHiveType.getCategory().equals(Category.LIST)) {
+ return false;
+ }
+ HiveType fromElementType = HiveType.valueOf(((ListTypeInfo) fromHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName());
+ HiveType toElementType = HiveType.valueOf(((ListTypeInfo) toHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName());
+ return fromElementType.equals(toElementType) || canCoerce(fromElementType, toElementType);
+ }
+
+ private boolean canCoerceForStruct(HiveType fromHiveType, HiveType toHiveType)
+ {
+ if (!fromHiveType.getCategory().equals(Category.STRUCT) || !toHiveType.getCategory().equals(Category.STRUCT)) {
+ return false;
+ }
+        List<HiveType> fromFieldTypes = getAllStructFieldTypeInfos(fromHiveType);
+        List<HiveType> toFieldTypes = getAllStructFieldTypeInfos(toHiveType);
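+        // only the fields present on both sides must be coercible; extra target fields are null-padded at read time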
+ for (int i = 0; i < Math.min(fromFieldTypes.size(), toFieldTypes.size()); i++) {
+ if (!fromFieldTypes.get(i).equals(toFieldTypes.get(i)) && !canCoerce(fromFieldTypes.get(i), toFieldTypes.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+    private List<HiveType> getAllStructFieldTypeInfos(HiveType hiveType)
+ {
+ return ((StructTypeInfo) hiveType.getTypeInfo()).getAllStructFieldTypeInfos()
+ .stream().map(typeInfo -> HiveType.valueOf(typeInfo.getTypeName())).collect(Collectors.toList());
}
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveCoercionRecordCursor.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveCoercionRecordCursor.java
index 442be5042d91..7f3f77dd1a96 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveCoercionRecordCursor.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveCoercionRecordCursor.java
@@ -16,14 +16,23 @@
import com.facebook.presto.hive.HivePageSourceProvider.ColumnMapping;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.VarcharType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
import static com.facebook.presto.hive.HiveType.HIVE_BYTE;
import static com.facebook.presto.hive.HiveType.HIVE_DOUBLE;
@@ -288,6 +297,16 @@ else if (fromHiveType.equals(HIVE_INT) && toHiveType.equals(HIVE_LONG)) {
else if (fromHiveType.equals(HIVE_FLOAT) && toHiveType.equals(HIVE_DOUBLE)) {
return new FloatToDoubleCoercer();
}
+ else if (HiveUtil.isArrayType(fromType) && HiveUtil.isArrayType(toType)) {
+ return new ListToListCoercer(typeManager, fromHiveType, toHiveType);
+ }
+ else if (HiveUtil.isMapType(fromType) && HiveUtil.isMapType(toType)) {
+ return new MapToMapCoercer(typeManager, fromHiveType, toHiveType);
+ }
+ else if (HiveUtil.isRowType(fromType) && HiveUtil.isRowType(toType)) {
+ return new StructToStructCoercer(typeManager, fromHiveType, toHiveType);
+ }
+
throw new PrestoException(NOT_SUPPORTED, format("Unsupported coercion from %s to %s", fromHiveType, toHiveType));
}
@@ -367,4 +386,291 @@ public void coerce(RecordCursor delegate, int field)
}
}
}
+
+ private static class ListToListCoercer
+ extends Coercer
+ {
+ private final TypeManager typeManager;
+ private final HiveType fromHiveType;
+ private final HiveType toHiveType;
+ private final HiveType fromElementHiveType;
+ private final HiveType toElementHiveType;
+ private final Coercer elementCoercer;
+
+ public ListToListCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType)
+ {
+            this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.fromHiveType = requireNonNull(fromHiveType, "fromHiveType is null");
+ this.toHiveType = requireNonNull(toHiveType, "toHiveType is null");
+ this.fromElementHiveType = HiveType.valueOf(((ListTypeInfo) fromHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName());
+ this.toElementHiveType = HiveType.valueOf(((ListTypeInfo) toHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName());
+ this.elementCoercer = fromElementHiveType.equals(toElementHiveType) ? null : createCoercer(typeManager, fromElementHiveType, toElementHiveType);
+ }
+
+ @Override
+ public void coerce(RecordCursor delegate, int field)
+ {
+ if (delegate.isNull(field)) {
+ setIsNull(true);
+ return;
+ }
+ Block block = (Block) delegate.getObject(field);
+ BlockBuilder builder = toHiveType.getType(typeManager).createBlockBuilder(new BlockBuilderStatus(), 1);
+ BlockBuilder listBuilder = builder.beginBlockEntry();
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ if (block.isNull(i)) {
+ listBuilder.appendNull();
+ }
+ else if (elementCoercer == null) {
+ block.writePositionTo(i, listBuilder);
+ listBuilder.closeEntry();
+ }
+ else {
+ rewriteBlock(fromElementHiveType, toElementHiveType, block, i, listBuilder, elementCoercer, typeManager, field);
+ }
+ }
+ builder.closeEntry();
+ setObject(builder.build().getObject(0, Block.class));
+ }
+ }
+
+ private static class MapToMapCoercer
+ extends Coercer
+ {
+ private final TypeManager typeManager;
+ private final HiveType fromHiveType;
+ private final HiveType toHiveType;
+ private final HiveType fromKeyHiveType;
+ private final HiveType toKeyHiveType;
+ private final HiveType fromValueHiveType;
+ private final HiveType toValueHiveType;
+ private final Coercer keyCoercer;
+ private final Coercer valueCoercer;
+
+ public MapToMapCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType)
+ {
+            this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.fromHiveType = requireNonNull(fromHiveType, "fromHiveType is null");
+ this.toHiveType = requireNonNull(toHiveType, "toHiveType is null");
+ this.fromKeyHiveType = HiveType.valueOf(((MapTypeInfo) fromHiveType.getTypeInfo()).getMapKeyTypeInfo().getTypeName());
+ this.fromValueHiveType = HiveType.valueOf(((MapTypeInfo) fromHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName());
+ this.toKeyHiveType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapKeyTypeInfo().getTypeName());
+ this.toValueHiveType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName());
+ this.keyCoercer = fromKeyHiveType.equals(toKeyHiveType) ? null : createCoercer(typeManager, fromKeyHiveType, toKeyHiveType);
+ this.valueCoercer = fromValueHiveType.equals(toValueHiveType) ? null : createCoercer(typeManager, fromValueHiveType, toValueHiveType);
+ }
+
+ @Override
+ public void coerce(RecordCursor delegate, int field)
+ {
+ if (delegate.isNull(field)) {
+ setIsNull(true);
+ return;
+ }
+ Block block = (Block) delegate.getObject(field);
+ BlockBuilder builder = toHiveType.getType(typeManager).createBlockBuilder(new BlockBuilderStatus(), 1);
+ BlockBuilder mapBuilder = builder.beginBlockEntry();
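+            // the map block interleaves keys and values at alternating positions, hence the step of two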
+ for (int i = 0; i < block.getPositionCount(); i += 2) {
+ if (block.isNull(i)) {
+ mapBuilder.appendNull();
+ }
+ else if (keyCoercer == null) {
+ block.writePositionTo(i, mapBuilder);
+ mapBuilder.closeEntry();
+ }
+ else {
+ rewriteBlock(fromKeyHiveType, toKeyHiveType, block.getSingleValueBlock(i), 0, mapBuilder, keyCoercer, typeManager, field);
+ }
+ if (block.isNull(i + 1)) {
+ mapBuilder.appendNull();
+ }
+                else if (valueCoercer == null) {
+ block.writePositionTo(i + 1, mapBuilder);
+ mapBuilder.closeEntry();
+ }
+ else {
+ rewriteBlock(fromValueHiveType, toValueHiveType, block.getSingleValueBlock(i + 1), 0, mapBuilder, valueCoercer, typeManager, field);
+ }
+ }
+ builder.closeEntry();
+ setObject(builder.build().getObject(0, Block.class));
+ }
+ }
+
+ private static class StructToStructCoercer
+ extends Coercer
+ {
+ private final TypeManager typeManager;
+ private final HiveType fromHiveType;
+ private final HiveType toHiveType;
+        private final List<HiveType> fromFieldTypes;
+        private final List<HiveType> toFieldTypes;
+        private final Coercer[] coercers;
+
+ public StructToStructCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType)
+ {
+            this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.fromHiveType = requireNonNull(fromHiveType, "fromHiveType is null");
+ this.toHiveType = requireNonNull(toHiveType, "toHiveType is null");
+ this.fromFieldTypes = getAllStructFieldTypeInfos(fromHiveType);
+ this.toFieldTypes = getAllStructFieldTypeInfos(toHiveType);
+ this.coercers = new Coercer[toFieldTypes.size()];
+ Arrays.fill(this.coercers, null);
+ for (int i = 0; i < Math.min(fromFieldTypes.size(), toFieldTypes.size()); i++) {
+ if (!fromFieldTypes.get(i).equals(toFieldTypes.get(i))) {
+ coercers[i] = createCoercer(typeManager, fromFieldTypes.get(i), toFieldTypes.get(i));
+ }
+ }
+ }
+
+ @Override
+ public void coerce(RecordCursor delegate, int field)
+ {
+ if (delegate.isNull(field)) {
+ setIsNull(true);
+ return;
+ }
+ Block block = (Block) delegate.getObject(field);
+ BlockBuilder builder = toHiveType.getType(typeManager).createBlockBuilder(new BlockBuilderStatus(), 1);
+ BlockBuilder rowBuilder = builder.beginBlockEntry();
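+            // fields that exist only in the target schema have no source value and are padded with nulls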
+ for (int i = 0; i < toFieldTypes.size(); i++) {
+ if (i >= fromFieldTypes.size() || block.isNull(i)) {
+ rowBuilder.appendNull();
+ }
+ else if (coercers[i] == null) {
+ block.writePositionTo(i, rowBuilder);
+ rowBuilder.closeEntry();
+ }
+ else {
+ rewriteBlock(fromFieldTypes.get(i), toFieldTypes.get(i), block, i, rowBuilder, coercers[i], typeManager, field);
+ }
+ }
+ builder.closeEntry();
+ setObject(builder.build().getObject(0, Block.class));
+ }
+ }
+
+ private static void rewriteBlock(
+ HiveType fromFieldHiveType,
+ HiveType toFieldHiveType,
+ Block block,
+ int position,
+ BlockBuilder builder,
+ Coercer coercer,
+ TypeManager typeManager,
+ int field)
+ {
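+        // funnel the single extracted value through a bridging cursor so the scalar Coercers can be reused on nested data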
+ Type fromFieldType = fromFieldHiveType.getType(typeManager);
+ Type toFieldType = toFieldHiveType.getType(typeManager);
+ Object value = null;
+ if (fromFieldHiveType.equals(HIVE_BYTE) || fromFieldHiveType.equals(HIVE_SHORT) || fromFieldHiveType.equals(HIVE_INT) || fromFieldHiveType.equals(HIVE_LONG) || fromFieldHiveType.equals(HIVE_FLOAT)) {
+ value = fromFieldType.getLong(block, position);
+ }
+ else if (fromFieldType instanceof VarcharType) {
+ value = fromFieldType.getSlice(block, position);
+ }
+ else if (HiveUtil.isStructuralType(fromFieldHiveType)) {
+ value = fromFieldType.getObject(block, position);
+ }
+ coercer.reset();
+ RecordCursor bridgingRecordCursor = createBridgingRecordCursor(value, typeManager, fromFieldHiveType);
+ if (coercer.isNull(bridgingRecordCursor, field)) {
+ builder.appendNull();
+ }
+ else if (toFieldHiveType.equals(HIVE_BYTE) || toFieldHiveType.equals(HIVE_SHORT) || toFieldHiveType.equals(HIVE_INT) || toFieldHiveType.equals(HIVE_LONG) || toFieldHiveType.equals(HIVE_FLOAT)) {
+ toFieldType.writeLong(builder, coercer.getLong(bridgingRecordCursor, field));
+ }
+ else if (toFieldHiveType.equals(HIVE_DOUBLE)) {
+ toFieldType.writeDouble(builder, coercer.getDouble(bridgingRecordCursor, field));
+ }
+ else if (toFieldType instanceof VarcharType) {
+ toFieldType.writeSlice(builder, coercer.getSlice(bridgingRecordCursor, field));
+ }
+ else if (HiveUtil.isStructuralType(toFieldHiveType)) {
+ toFieldType.writeObject(builder, coercer.getObject(bridgingRecordCursor, field));
+ }
+ else {
+ throw new PrestoException(NOT_SUPPORTED, format("Unsupported coercion from %s to %s", fromFieldHiveType, toFieldHiveType));
+ }
+ coercer.reset();
+ }
+
+ private static RecordCursor createBridgingRecordCursor(
+ Object value,
+ TypeManager typeManager,
+ HiveType hiveType)
+ {
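+        // single-use cursor exposing one pre-extracted value; progress and byte accounting are stubbed out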
+ return new RecordCursor() {
+ @Override
+ public long getCompletedBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return 0;
+ }
+
+ @Override
+ public Type getType(int field)
+ {
+ return hiveType.getType(typeManager);
+ }
+
+ @Override
+ public boolean advanceNextPosition()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean getBoolean(int field)
+ {
+ return (Boolean) value;
+ }
+
+ @Override
+ public long getLong(int field)
+ {
+ return (Long) value;
+ }
+
+ @Override
+ public double getDouble(int field)
+ {
+ return (Double) value;
+ }
+
+ @Override
+ public Slice getSlice(int field)
+ {
+ return (Slice) value;
+ }
+
+ @Override
+ public Object getObject(int field)
+ {
+ return value;
+ }
+
+ @Override
+ public boolean isNull(int field)
+ {
+ return Objects.isNull(value);
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ };
+ }
+
+    private static List<HiveType> getAllStructFieldTypeInfos(HiveType hiveType)
+ {
+ return ((StructTypeInfo) hiveType.getTypeInfo()).getAllStructFieldTypeInfos()
+ .stream().map(typeInfo -> HiveType.valueOf(typeInfo.getTypeName())).collect(Collectors.toList());
+ }
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java
index bca390b7d088..2b7fdd2bc287 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java
@@ -32,6 +32,9 @@
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
+import com.facebook.presto.twitter.hive.MetastoreStaticClusterModule;
+import com.facebook.presto.twitter.hive.MetastoreZkDiscoveryBasedModule;
+import com.facebook.presto.twitter.hive.ZookeeperServersetMetastoreConfig;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
@@ -49,6 +52,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static io.airlift.configuration.ConditionalModule.installModuleIf;
import static java.util.Objects.requireNonNull;
public class HiveConnectorFactory
@@ -88,6 +92,14 @@ public Connector create(String connectorId, Map<String, String> config, Connecto
new EventModule(),
new MBeanModule(),
new JsonModule(),
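+                    // pick static metastore cluster wiring or ZooKeeper serverset discovery, depending on whether a ZooKeeper endpoint is configured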
+ installModuleIf(
+ ZookeeperServersetMetastoreConfig.class,
+ zkMetastoreConfig -> zkMetastoreConfig.getZookeeperServerHostAndPort() == null,
+ new MetastoreStaticClusterModule()),
+ installModuleIf(
+ ZookeeperServersetMetastoreConfig.class,
+ zkMetastoreConfig -> zkMetastoreConfig.getZookeeperServerHostAndPort() != null,
+ new MetastoreZkDiscoveryBasedModule()),
new HiveClientModule(
connectorId,
context.getTypeManager(),
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSource.java
index 4fcebf1798ec..6237f949b57d 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSource.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSource.java
@@ -28,11 +28,16 @@
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.VarcharType;
import com.google.common.base.Throwables;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.joda.time.DateTimeZone;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
import static com.facebook.presto.hive.HiveType.HIVE_BYTE;
@@ -287,6 +292,15 @@ else if (fromHiveType.equals(HIVE_INT) && toHiveType.equals(HIVE_LONG)) {
else if (fromHiveType.equals(HIVE_FLOAT) && toHiveType.equals(HIVE_DOUBLE)) {
return new FloatToDoubleCoercer();
}
+ else if (HiveUtil.isArrayType(fromType) && HiveUtil.isArrayType(toType)) {
+ return new ListToListCoercer(typeManager, fromHiveType, toHiveType);
+ }
+ else if (HiveUtil.isMapType(fromType) && HiveUtil.isMapType(toType)) {
+ return new MapToMapCoercer(typeManager, fromHiveType, toHiveType);
+ }
+ else if (HiveUtil.isRowType(fromType) && HiveUtil.isRowType(toType)) {
+ return new StructToStructCoercer(typeManager, fromHiveType, toHiveType);
+ }
throw new PrestoException(NOT_SUPPORTED, format("Unsupported coercion from %s to %s", fromHiveType, toHiveType));
}
@@ -424,6 +438,195 @@ public Block apply(Block block)
}
}
+ private static class ListToListCoercer
+            implements Function<Block, Block>
+ {
+ private final TypeManager typeManager;
+ private final HiveType fromHiveType;
+ private final HiveType toHiveType;
+        private final Function<Block, Block> elementCoercer;
+
+ public ListToListCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType)
+ {
+            this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.fromHiveType = requireNonNull(fromHiveType, "fromHiveType is null");
+ this.toHiveType = requireNonNull(toHiveType, "toHiveType is null");
+ HiveType fromElementHiveType = HiveType.valueOf(((ListTypeInfo) fromHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName());
+ HiveType toElementHiveType = HiveType.valueOf(((ListTypeInfo) toHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName());
+ this.elementCoercer = fromElementHiveType.equals(toElementHiveType) ? null : createCoercer(typeManager, fromElementHiveType, toElementHiveType);
+ }
+
+ @Override
+ public Block apply(Block block)
+ {
+ BlockBuilder blockBuilder = toHiveType.getType(typeManager).createBlockBuilder(new BlockBuilderStatus(), block.getPositionCount());
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ if (block.isNull(i)) {
+ blockBuilder.appendNull();
+ continue;
+ }
+ Block singleArrayBlock = block.getObject(i, Block.class);
+ BlockBuilder singleArrayBuilder = blockBuilder.beginBlockEntry();
+ for (int j = 0; j < singleArrayBlock.getPositionCount(); j++) {
+ if (singleArrayBlock.isNull(j)) {
+ singleArrayBuilder.appendNull();
+ }
+ else if (elementCoercer == null) {
+ singleArrayBlock.writePositionTo(j, singleArrayBuilder);
+ singleArrayBuilder.closeEntry();
+ }
+ else {
+ Block singleElementBlock = elementCoercer.apply(singleArrayBlock.getSingleValueBlock(j));
+ if (singleElementBlock.isNull(0)) {
+ singleArrayBuilder.appendNull();
+ }
+ else {
+ singleElementBlock.writePositionTo(0, singleArrayBuilder);
+ singleArrayBuilder.closeEntry();
+ }
+ }
+ }
+ blockBuilder.closeEntry();
+ }
+ return blockBuilder.build();
+ }
+ }
+
+ private static class MapToMapCoercer
+            implements Function<Block, Block>
+ {
+ private final TypeManager typeManager;
+ private final HiveType fromHiveType;
+ private final HiveType toHiveType;
+        private final Function<Block, Block> keyCoercer;
+        private final Function<Block, Block> valueCoercer;
+
+ public MapToMapCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType)
+ {
+            this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.fromHiveType = requireNonNull(fromHiveType, "fromHiveType is null");
+ this.toHiveType = requireNonNull(toHiveType, "toHiveType is null");
+ HiveType fromKeyHiveType = HiveType.valueOf(((MapTypeInfo) fromHiveType.getTypeInfo()).getMapKeyTypeInfo().getTypeName());
+ HiveType fromValueHiveType = HiveType.valueOf(((MapTypeInfo) fromHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName());
+ HiveType toKeyHiveType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapKeyTypeInfo().getTypeName());
+ HiveType toValueHiveType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName());
+ this.keyCoercer = fromKeyHiveType.equals(toKeyHiveType) ? null : createCoercer(typeManager, fromKeyHiveType, toKeyHiveType);
+ this.valueCoercer = fromValueHiveType.equals(toValueHiveType) ? null : createCoercer(typeManager, fromValueHiveType, toValueHiveType);
+ }
+
+ @Override
+ public Block apply(Block block)
+ {
+ BlockBuilder blockBuilder = toHiveType.getType(typeManager).createBlockBuilder(new BlockBuilderStatus(), block.getPositionCount());
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ if (block.isNull(i)) {
+ blockBuilder.appendNull();
+ continue;
+ }
+ Block singleMapBlock = block.getObject(i, Block.class);
+ BlockBuilder singleMapBuilder = blockBuilder.beginBlockEntry();
+ if (singleMapBlock.isNull(0)) {
+ singleMapBuilder.appendNull();
+ }
+ else if (keyCoercer == null) {
+ singleMapBlock.writePositionTo(0, singleMapBuilder);
+ singleMapBuilder.closeEntry();
+ }
+ else {
+ Block singleKeyBlock = keyCoercer.apply(singleMapBlock.getSingleValueBlock(0));
+ if (singleKeyBlock.isNull(0)) {
+ singleMapBuilder.appendNull();
+ }
+ else {
+ singleKeyBlock.writePositionTo(0, singleMapBuilder);
+ singleMapBuilder.closeEntry();
+ }
+ }
+ if (singleMapBlock.isNull(1)) {
+ singleMapBuilder.appendNull();
+ }
+ else if (valueCoercer == null) {
+ singleMapBlock.writePositionTo(1, singleMapBuilder);
+ singleMapBuilder.closeEntry();
+ }
+ else {
+ Block singleValueBlock = valueCoercer.apply(singleMapBlock.getSingleValueBlock(1));
+ if (singleValueBlock.isNull(0)) {
+ singleMapBuilder.appendNull();
+ }
+ else {
+ singleValueBlock.writePositionTo(0, singleMapBuilder);
+ singleMapBuilder.closeEntry();
+ }
+ }
+ blockBuilder.closeEntry();
+ }
+ return blockBuilder.build();
+ }
+ }
+
+ private static class StructToStructCoercer
+            implements Function<Block, Block>
+ {
+ private final TypeManager typeManager;
+ private final HiveType fromHiveType;
+ private final HiveType toHiveType;
+        private final List<HiveType> fromFieldTypes;
+        private final List<HiveType> toFieldTypes;
+        private final Function<Block, Block>[] coercers;
+
+ public StructToStructCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType)
+ {
+            this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.fromHiveType = requireNonNull(fromHiveType, "fromHiveType is null");
+ this.toHiveType = requireNonNull(toHiveType, "toHiveType is null");
+ this.fromFieldTypes = getAllStructFieldTypeInfos(fromHiveType);
+ this.toFieldTypes = getAllStructFieldTypeInfos(toHiveType);
+ this.coercers = new Function[toFieldTypes.size()];
+ Arrays.fill(this.coercers, null);
+ for (int i = 0; i < Math.min(fromFieldTypes.size(), toFieldTypes.size()); i++) {
+ if (!fromFieldTypes.get(i).equals(toFieldTypes.get(i))) {
+ coercers[i] = createCoercer(typeManager, fromFieldTypes.get(i), toFieldTypes.get(i));
+ }
+ }
+ }
+
+ @Override
+ public Block apply(Block block)
+ {
+ BlockBuilder blockBuilder = toHiveType.getType(typeManager).createBlockBuilder(new BlockBuilderStatus(), block.getPositionCount());
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ if (block.isNull(i)) {
+ blockBuilder.appendNull();
+ continue;
+ }
+ Block singleRowBlock = block.getObject(i, Block.class);
+ BlockBuilder singleRowBuilder = blockBuilder.beginBlockEntry();
+ for (int j = 0; j < toFieldTypes.size(); j++) {
+ if (j >= fromFieldTypes.size() || singleRowBlock.isNull(j)) {
+ singleRowBuilder.appendNull();
+ }
+ else if (coercers[j] == null) {
+ singleRowBlock.writePositionTo(j, singleRowBuilder);
+ singleRowBuilder.closeEntry();
+ }
+ else {
+ Block singleFieldBlock = coercers[j].apply(singleRowBlock.getSingleValueBlock(j));
+ if (singleFieldBlock.isNull(0)) {
+ singleRowBuilder.appendNull();
+ }
+ else {
+ singleFieldBlock.writePositionTo(0, singleRowBuilder);
+ singleRowBuilder.closeEntry();
+ }
+ }
+ }
+ blockBuilder.closeEntry();
+ }
+ return blockBuilder.build();
+ }
+ }
+
private final class CoercionLazyBlockLoader
            implements LazyBlockLoader<LazyBlock>
{
@@ -450,4 +653,10 @@ public void load(LazyBlock lazyBlock)
block = null;
}
}
+
+    private static List<HiveType> getAllStructFieldTypeInfos(HiveType hiveType)
+ {
+ return ((StructTypeInfo) hiveType.getTypeInfo()).getAllStructFieldTypeInfos()
+ .stream().map(typeInfo -> HiveType.valueOf(typeInfo.getTypeName())).collect(Collectors.toList());
+ }
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveStorageFormat.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveStorageFormat.java
index 4e4523d04185..5a9edf5b383c 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveStorageFormat.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveStorageFormat.java
@@ -14,6 +14,7 @@
package com.facebook.presto.hive;
import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.twitter.hive.thrift.ThriftGeneralInputFormat;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -32,6 +33,7 @@
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -93,6 +95,11 @@ public enum HiveStorageFormat
LazySimpleSerDe.class.getName(),
TextInputFormat.class.getName(),
HiveIgnoreKeyTextOutputFormat.class.getName(),
+ new DataSize(8, Unit.MEGABYTE)),
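+    // Twitter addition: Thrift-serialized tables, read through elephant-bird's ThriftGeneralInputFormat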
+ THRIFTBINARY(
+ LazyBinarySerDe.class.getName(),
+ ThriftGeneralInputFormat.class.getName(),
+ HiveIgnoreKeyTextOutputFormat.class.getName(),
new DataSize(8, Unit.MEGABYTE));
private final String serde;
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
index f0965b522761..ff0c9bb03cb8 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
@@ -27,6 +27,7 @@
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
+import com.facebook.presto.twitter.hive.thrift.ThriftGeneralInputFormat;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
@@ -39,6 +40,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -110,6 +112,7 @@
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.transform;
+import static com.hadoop.compression.lzo.LzoIndex.LZO_INDEX_SUFFIX;
import static java.lang.Byte.parseByte;
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
@@ -146,6 +149,22 @@ public final class HiveUtil
private static final String BIG_DECIMAL_POSTFIX = "BD";
+ private static final PathFilter LZOP_DEFAULT_SUFFIX_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path)
+ {
+ return path.toString().endsWith(".lzo");
+ }
+ };
+
+ private static final PathFilter LZOP_INDEX_DEFAULT_SUFFIX_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path path)
+ {
+ return path.toString().endsWith(".lzo.index");
+ }
+ };
+
static {
DateTimeParser[] timestampWithoutTimeZoneParser = {
DateTimeFormat.forPattern("yyyy-M-d").getParser(),
@@ -178,7 +197,7 @@ private HiveUtil()
// propagate serialization configuration to getRecordReader
schema.stringPropertyNames().stream()
- .filter(name -> name.startsWith("serialization."))
+ .filter(name -> name.startsWith("serialization.") || name.startsWith("elephantbird."))
.forEach(name -> jobConf.set(name, schema.getProperty(name)));
// add Airlift LZO and LZOP to head of codecs list so as to not override existing entries
@@ -242,6 +261,11 @@ public static void setReadColumns(Configuration configuration, List<Integer> rea
return MapredParquetInputFormat.class;
}
+ // Remove this after https://github.com/twitter/elephant-bird/pull/481 is included in a release
+ if ("com.twitter.elephantbird.mapred.input.HiveMultiInputFormat".equals(inputFormatName)) {
+ return ThriftGeneralInputFormat.class;
+ }
+
        Class<?> clazz = conf.getClassByName(inputFormatName);
        return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
}
@@ -294,6 +318,21 @@ static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem
}
}
+ public static boolean isLzopCompressedFile(Path filePath)
+ {
+ return LZOP_DEFAULT_SUFFIX_FILTER.accept(filePath);
+ }
+
+ public static boolean isLzopIndexFile(Path filePath)
+ {
+ return LZOP_INDEX_DEFAULT_SUFFIX_FILTER.accept(filePath);
+ }
+
+ public static Path getLzopIndexPath(Path lzoPath)
+ {
+ return lzoPath.suffix(LZO_INDEX_SUFFIX);
+ }
+
public static StructObjectInspector getTableObjectInspector(Properties schema)
{
return getTableObjectInspector(getDeserializer(schema));
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java
index 089d0c56cdde..f9d4ac8893f0 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveWriteUtils.java
@@ -445,6 +445,17 @@ public static boolean isViewFileSystem(HdfsContext context, HdfsEnvironment hdfs
}
}
+ public static boolean isHDFSCompatibleViewFileSystem(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path)
+ {
+ try {
+ return getRawFileSystem(hdfsEnvironment.getFileSystem(context, path))
+ .getClass().getName().equals("org.apache.hadoop.fs.viewfs.HDFSCompatibleViewFileSystem");
+ }
+ catch (IOException e) {
+ throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed checking path: " + path, e);
+ }
+ }
+
private static FileSystem getRawFileSystem(FileSystem fileSystem)
{
if (fileSystem instanceof FilterFileSystem) {
@@ -473,6 +484,11 @@ public static Path createTemporaryPath(HdfsContext context, HdfsEnvironment hdfs
temporaryPrefix = ".hive-staging";
}
+ // use relative temporary directory on HDFSCompatibleViewFileSystem
+ if (isHDFSCompatibleViewFileSystem(context, hdfsEnvironment, targetPath)) {
+ temporaryPrefix = "../.hive-staging";
+ }
+
// create a temporary directory on the same filesystem
Path temporaryRoot = new Path(targetPath, temporaryPrefix);
Path temporaryPath = new Path(temporaryRoot, randomUUID().toString());
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java b/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java
index 3b87f79c3b62..79baf4065ef4 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java
@@ -138,16 +138,18 @@ public <V> V run(String callableName, Callable<V> callable)
return callable.call();
}
catch (Exception e) {
+ log.debug("Failed on executing %s with attempt %d, Exception: %s", callableName, attempt, e.getMessage());
e = exceptionMapper.apply(e);
            for (Class<? extends Exception> clazz : exceptionWhiteList) {
if (clazz.isInstance(e)) {
+ log.debug("Exception is in whitelist.");
throw e;
}
}
if (attempt >= maxAttempts || Duration.nanosSince(startTime).compareTo(maxRetryTime) >= 0) {
+ log.debug("Maximum attempts or maximum retry time reached. attempt: %d, maxAttempts: %d, duration: [%s] maxRetryTime: [%s]", attempt, maxAttempts, Duration.nanosSince(startTime).toString(), maxRetryTime.toString());
throw e;
}
- log.debug("Failed on executing %s with attempt %d, will retry. Exception: %s", callableName, attempt, e.getMessage());
int delayInMs = (int) Math.min(minSleepTime.toMillis() * Math.pow(scaleFactor, attempt - 1), maxSleepTime.toMillis());
int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayInMs * 0.1)));
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/authentication/SimpleHadoopAuthentication.java b/presto-hive/src/main/java/com/facebook/presto/hive/authentication/SimpleHadoopAuthentication.java
index 663d852e4399..0aced6734483 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/authentication/SimpleHadoopAuthentication.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/authentication/SimpleHadoopAuthentication.java
@@ -25,7 +25,7 @@ public class SimpleHadoopAuthentication
public UserGroupInformation getUserGroupInformation()
{
try {
- return UserGroupInformation.getCurrentUser();
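+            // use the JVM-wide login user, which is stable across threads, rather than the current UGI, which may carry a doAs proxy context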
+ return UserGroupInformation.getLoginUser();
}
catch (IOException e) {
throw Throwables.propagate(e);
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ByteBufferInputStream.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ByteBufferInputStream.java
new file mode 100644
index 000000000000..1b93ffeea316
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ByteBufferInputStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.hive.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/*
+ * Implementation adapted from https://github.com/google/mr4c
+ */
+public class ByteBufferInputStream
+ extends InputStream
+{
+    private final ByteBuffer buf;
+
+ public ByteBufferInputStream(ByteBuffer buf)
+ {
+ this.buf = buf;
+ }
+
+ public int read() throws IOException
+ {
+ if (!buf.hasRemaining()) {
+ return -1;
+ }
+ return buf.get() & 0xFF;
+ }
+
+ public int read(byte[] bytes, int off, int len) throws IOException
+ {
+ if (!buf.hasRemaining()) {
+ return -1;
+ }
+
+ len = Math.min(len, buf.remaining());
+ buf.get(bytes, off, len);
+ return len;
+ }
+}
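A short usage sketch of the wrapper above, mirroring how decompressLZOP later feeds a Slice-backed buffer to a codec that expects an InputStream:

    import java.io.InputStream;
    import java.nio.ByteBuffer;

    final class ByteBufferInputStreamDemo
    {
        public static void main(String[] args) throws Exception
        {
            ByteBuffer buf = ByteBuffer.wrap(new byte[] {1, 2, 3});
            try (InputStream in = new ByteBufferInputStream(buf)) {
                byte[] out = new byte[8];
                System.out.println(in.read(out, 0, out.length)); // 3: capped at the bytes remaining
                System.out.println(in.read());                   // -1: buffer exhausted
            }
        }
    }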
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetCompressionUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetCompressionUtils.java
index ef9a300a4acd..e391994a0da3 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetCompressionUtils.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetCompressionUtils.java
@@ -13,11 +13,14 @@
*/
package com.facebook.presto.hive.parquet;
+import com.hadoop.compression.lzo.LzoCodec;
import io.airlift.compress.Decompressor;
import io.airlift.compress.lzo.LzoDecompressor;
import io.airlift.compress.snappy.SnappyDecompressor;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
+import org.apache.hadoop.conf.Configuration;
import parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
@@ -38,7 +41,7 @@ public final class ParquetCompressionUtils
private ParquetCompressionUtils() {}
- public static Slice decompress(CompressionCodecName codec, Slice input, int uncompressedSize)
+ public static Slice decompress(CompressionCodecName codec, Slice input, int uncompressedSize, Configuration configuration)
throws IOException
{
requireNonNull(input, "input is null");
@@ -55,7 +58,7 @@ public static Slice decompress(CompressionCodecName codec, Slice input, int unco
case UNCOMPRESSED:
return input;
case LZO:
- return decompressLZO(input, uncompressedSize);
+ return decompressLZOP(input, uncompressedSize, configuration);
default:
throw new ParquetCorruptionException("Codec not supported in Parquet: " + codec);
}
@@ -86,6 +89,21 @@ private static Slice decompressGzip(Slice input, int uncompressedSize)
}
}
+ private static Slice decompressLZOP(Slice input, int uncompressedSize, Configuration configuration)
+ throws IOException
+ {
+ // over-allocate the output buffer, which makes decompression easier
+ byte[] output = new byte[uncompressedSize + SIZE_OF_LONG];
+
+ LzoCodec lzoCodec = new LzoCodec();
+ lzoCodec.setConf(configuration);
+ int decompressedSize = lzoCodec
+ .createInputStream(new ByteBufferInputStream(input.toByteBuffer()))
+ .read(output, 0, uncompressedSize);
+ checkArgument(decompressedSize == uncompressedSize, "expected %s uncompressed bytes but got %s", uncompressedSize, decompressedSize);
+ return wrappedBuffer(output, 0, uncompressedSize);
+ }
+
private static Slice decompressLZO(Slice input, int uncompressedSize)
{
LzoDecompressor lzoDecompressor = new LzoDecompressor();
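The new decompressLZOP path delegates to hadoop-lzo's LzoCodec, which, unlike the raw airlift LzoDecompressor kept below it, is configured through a Hadoop Configuration and can read the framed LZO streams written by Hadoop writers. A hedged call-site sketch (the compressed slice and size are assumed to come from a parquet page header):

    import io.airlift.slice.Slice;
    import org.apache.hadoop.conf.Configuration;
    import parquet.hadoop.metadata.CompressionCodecName;
    import java.io.IOException;

    final class LzoPageSketch
    {
        static Slice decompressLzoPage(Slice compressed, int uncompressedSize, Configuration conf)
                throws IOException
        {
            return ParquetCompressionUtils.decompress(CompressionCodecName.LZO, compressed, uncompressedSize, conf);
        }
    }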
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java
index 2b3b6090dc4a..157aba8c6b28 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java
@@ -347,7 +347,7 @@ private ParquetRecordReader createParquetRecordReader(
if (predicatePushdownEnabled) {
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(fileSchema, requestedSchema, effectivePredicate);
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, fileSchema);
- if (predicateMatches(parquetPredicate, block, dataSource, fileSchema, requestedSchema, parquetTupleDomain)) {
+ if (predicateMatches(parquetPredicate, block, dataSource, fileSchema, requestedSchema, parquetTupleDomain, configuration)) {
offsets.add(block.getStartingPos());
}
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
index d128e3412f52..7e25190e5e74 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
@@ -170,7 +170,7 @@ public static ParquetPageSource createParquetPageSource(
ParquetPredicate parquetPredicate = buildParquetPredicate(requestedSchema, parquetTupleDomain, fileMetaData.getSchema());
final ParquetDataSource finalDataSource = dataSource;
blocks = blocks.stream()
- .filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, fileSchema, requestedSchema, parquetTupleDomain))
+ .filter(block -> predicateMatches(parquetPredicate, block, finalDataSource, fileSchema, requestedSchema, parquetTupleDomain, configuration))
.collect(toList());
}
@@ -180,7 +180,8 @@ public static ParquetPageSource createParquetPageSource(
blocks,
dataSource,
typeManager,
- systemMemoryContext);
+ systemMemoryContext,
+ configuration);
return new ParquetPageSource(
parquetReader,
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java
index 4e6fd09b880a..8cf4a8c11157 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java
@@ -146,7 +146,7 @@ public static int getFieldIndex(MessageType fileSchema, String name)
public static parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
- return getParquetTypeByName(column.getName(), messageType);
+ return findParquetTypeByName(column, messageType);
}
if (column.getHiveColumnIndex() < messageType.getFieldCount()) {
@@ -155,6 +155,28 @@ public static parquet.schema.Type getParquetType(HiveColumnHandle column, Messag
return null;
}
+ /**
+ * Find the column type by name, returning the first match in the following order:
+ *
+ * - direct match
+ * - case-insensitive match
+ * - if the name ends with _, remove it and try a direct match
+ * - if the name ends with _, remove it and try a case-insensitive match
+ */
+ private static parquet.schema.Type findParquetTypeByName(HiveColumnHandle column, MessageType messageType)
+ {
+ String name = column.getName();
+ parquet.schema.Type type = getParquetTypeByName(name, messageType);
+
+ // When a parquet field name collides with a Hive keyword, Hive appends an underscore
+ // to the column name. For a name-based lookup we need to strip it off again if there
+ // was no direct match.
+ if (type == null && name.endsWith("_")) {
+ type = getParquetTypeByName(name.substring(0, name.length() - 1), messageType);
+ }
+ return type;
+ }
+
public static ParquetEncoding getParquetEncoding(Encoding encoding)
{
switch (encoding) {
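A concrete example of the underscore fallback above: a Hive column named timestamp_ (suffixed because timestamp is a reserved word in Hive) resolving against a parquet schema whose field is plain timestamp. The schema text is illustrative:

    import parquet.schema.MessageType;
    import parquet.schema.MessageTypeParser;

    final class NameLookupSketch
    {
        static void demo()
        {
            MessageType schema = MessageTypeParser.parseMessageType(
                    "message doc { optional binary timestamp; }");
            // getParquetTypeByName("timestamp_", schema) finds no direct match,
            // so findParquetTypeByName strips the trailing underscore and
            // resolves the field named "timestamp" instead.
        }
    }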
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/predicate/ParquetPredicateUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/predicate/ParquetPredicateUtils.java
index bebed56b95c2..0faedf936f8d 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/predicate/ParquetPredicateUtils.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/predicate/ParquetPredicateUtils.java
@@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.slice.Slice;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.column.statistics.Statistics;
@@ -103,14 +104,14 @@ public static ParquetPredicate buildParquetPredicate(MessageType requestedSchema
return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build());
}
- public static boolean predicateMatches(ParquetPredicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, MessageType fileSchema, MessageType requestedSchema, TupleDomain<ColumnDescriptor> parquetTupleDomain)
+ public static boolean predicateMatches(ParquetPredicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, MessageType fileSchema, MessageType requestedSchema, TupleDomain<ColumnDescriptor> parquetTupleDomain, Configuration configuration)
{
Map<ColumnDescriptor, Statistics<?>> columnStatistics = getStatistics(block, fileSchema, requestedSchema);
if (!parquetPredicate.matches(block.getRowCount(), columnStatistics)) {
return false;
}
- Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = getDictionaries(block, dataSource, fileSchema, requestedSchema, parquetTupleDomain);
+ Map<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = getDictionaries(block, dataSource, fileSchema, requestedSchema, parquetTupleDomain, configuration);
return parquetPredicate.matches(dictionaries);
}
@@ -129,7 +130,7 @@ private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData
return statistics.build();
}
- private static Map<ColumnDescriptor, ParquetDictionaryDescriptor> getDictionaries(BlockMetaData blockMetadata, ParquetDataSource dataSource, MessageType fileSchema, MessageType requestedSchema, TupleDomain<ColumnDescriptor> parquetTupleDomain)
+ private static Map<ColumnDescriptor, ParquetDictionaryDescriptor> getDictionaries(BlockMetaData blockMetadata, ParquetDataSource dataSource, MessageType fileSchema, MessageType requestedSchema, TupleDomain<ColumnDescriptor> parquetTupleDomain, Configuration configuration)
{
ImmutableMap.Builder<ColumnDescriptor, ParquetDictionaryDescriptor> dictionaries = ImmutableMap.builder();
for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) {
@@ -141,7 +142,7 @@ private static Map getDictionarie
int totalSize = toIntExact(columnMetaData.getTotalSize());
byte[] buffer = new byte[totalSize];
dataSource.readFully(columnMetaData.getStartingPos(), buffer);
- Optional<ParquetDictionaryPage> dictionaryPage = readDictionaryPage(buffer, columnMetaData.getCodec());
+ Optional<ParquetDictionaryPage> dictionaryPage = readDictionaryPage(buffer, columnMetaData.getCodec(), configuration);
dictionaries.put(columnDescriptor, new ParquetDictionaryDescriptor(columnDescriptor, dictionaryPage));
}
catch (IOException ignored) {
@@ -153,7 +154,7 @@ private static Map getDictionarie
return dictionaries.build();
}
- private static Optional<ParquetDictionaryPage> readDictionaryPage(byte[] data, CompressionCodecName codecName)
+ private static Optional<ParquetDictionaryPage> readDictionaryPage(byte[] data, CompressionCodecName codecName, Configuration configuration)
{
try {
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
@@ -168,7 +169,7 @@ private static Optional readDictionaryPage(byte[] data, C
ParquetEncoding encoding = getParquetEncoding(Encoding.valueOf(dicHeader.getEncoding().name()));
int dictionarySize = dicHeader.getNum_values();
- return Optional.of(new ParquetDictionaryPage(decompress(codecName, compressedData, pageHeader.getUncompressed_page_size()), dictionarySize, encoding));
+ return Optional.of(new ParquetDictionaryPage(decompress(codecName, compressedData, pageHeader.getUncompressed_page_size(), configuration), dictionarySize, encoding));
}
catch (IOException ignored) {
return Optional.empty();
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetBinaryColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetBinaryColumnReader.java
index 224689da5c7d..e130775c53b3 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetBinaryColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetBinaryColumnReader.java
@@ -16,6 +16,7 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
import io.airlift.slice.Slice;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
import parquet.io.api.Binary;
@@ -29,9 +30,9 @@
public class ParquetBinaryColumnReader
extends ParquetColumnReader
{
- public ParquetBinaryColumnReader(ColumnDescriptor descriptor)
+ public ParquetBinaryColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetBooleanColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetBooleanColumnReader.java
index 398cbae0b9ae..93ee33757fa1 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetBooleanColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetBooleanColumnReader.java
@@ -15,14 +15,15 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
public class ParquetBooleanColumnReader
extends ParquetColumnReader
{
- public ParquetBooleanColumnReader(ColumnDescriptor descriptor)
+ public ParquetBooleanColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetColumnReader.java
index 0433213816ba..09dec7f89b3a 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetColumnReader.java
@@ -28,6 +28,7 @@
import com.facebook.presto.spi.type.Type;
import io.airlift.slice.Slice;
import it.unimi.dsi.fastutil.ints.IntList;
+import org.apache.hadoop.conf.Configuration;
import parquet.bytes.BytesUtils;
import parquet.column.ColumnDescriptor;
import parquet.column.values.ValuesReader;
@@ -55,6 +56,7 @@ public abstract class ParquetColumnReader
protected ValuesReader valuesReader;
protected int nextBatchSize;
+ private Configuration configuration;
private ParquetLevelReader repetitionReader;
private ParquetLevelReader definitionReader;
private int repetitionLevel;
@@ -70,44 +72,45 @@ public abstract class ParquetColumnReader
protected abstract void skipValue();
- public static ParquetColumnReader createReader(RichColumnDescriptor descriptor)
+ public static ParquetColumnReader createReader(RichColumnDescriptor descriptor, Configuration configuration)
{
switch (descriptor.getType()) {
case BOOLEAN:
- return new ParquetBooleanColumnReader(descriptor);
+ return new ParquetBooleanColumnReader(descriptor, configuration);
case INT32:
- return createDecimalColumnReader(descriptor).orElse(new ParquetIntColumnReader(descriptor));
+ return createDecimalColumnReader(descriptor, configuration).orElse(new ParquetIntColumnReader(descriptor, configuration));
case INT64:
- return createDecimalColumnReader(descriptor).orElse(new ParquetLongColumnReader(descriptor));
+ return createDecimalColumnReader(descriptor, configuration).orElse(new ParquetLongColumnReader(descriptor, configuration));
case INT96:
- return new ParquetTimestampColumnReader(descriptor);
+ return new ParquetTimestampColumnReader(descriptor, configuration);
case FLOAT:
- return new ParquetFloatColumnReader(descriptor);
+ return new ParquetFloatColumnReader(descriptor, configuration);
case DOUBLE:
- return new ParquetDoubleColumnReader(descriptor);
+ return new ParquetDoubleColumnReader(descriptor, configuration);
case BINARY:
- return createDecimalColumnReader(descriptor).orElse(new ParquetBinaryColumnReader(descriptor));
+ return createDecimalColumnReader(descriptor, configuration).orElse(new ParquetBinaryColumnReader(descriptor, configuration));
case FIXED_LEN_BYTE_ARRAY:
- return createDecimalColumnReader(descriptor)
+ return createDecimalColumnReader(descriptor, configuration)
.orElseThrow(() -> new PrestoException(NOT_SUPPORTED, "Parquet type FIXED_LEN_BYTE_ARRAY supported as DECIMAL; got " + descriptor.getPrimitiveType().getOriginalType()));
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getType());
}
}
- private static Optional<ParquetColumnReader> createDecimalColumnReader(RichColumnDescriptor descriptor)
+ private static Optional<ParquetColumnReader> createDecimalColumnReader(RichColumnDescriptor descriptor, Configuration configuration)
{
Optional<Type> type = createDecimalType(descriptor);
if (type.isPresent()) {
DecimalType decimalType = (DecimalType) type.get();
- return Optional.of(ParquetDecimalColumnReaderFactory.createReader(descriptor, decimalType.getPrecision(), decimalType.getScale()));
+ return Optional.of(ParquetDecimalColumnReaderFactory.createReader(descriptor, decimalType.getPrecision(), decimalType.getScale(), configuration));
}
return Optional.empty();
}
- public ParquetColumnReader(ColumnDescriptor columnDescriptor)
+ public ParquetColumnReader(ColumnDescriptor columnDescriptor, Configuration configuration)
{
this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor");
+ this.configuration = requireNonNull(configuration, "configuration is null");
pageReader = null;
}
@@ -119,7 +122,8 @@ public ParquetPageReader getPageReader()
public void setPageReader(ParquetPageReader pageReader)
{
this.pageReader = requireNonNull(pageReader, "pageReader");
- ParquetDictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ this.pageReader.setConfiguration(configuration);
+ ParquetDictionaryPage dictionaryPage = this.pageReader.readDictionaryPage();
if (dictionaryPage != null) {
try {
@@ -132,8 +136,8 @@ public void setPageReader(ParquetPageReader pageReader)
else {
dictionary = null;
}
- checkArgument(pageReader.getTotalValueCount() > 0, "page is empty");
- totalValueCount = pageReader.getTotalValueCount();
+ checkArgument(this.pageReader.getTotalValueCount() > 0, "page is empty");
+ totalValueCount = this.pageReader.getTotalValueCount();
}
public void prepareNextRead(int batchSize)
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetDecimalColumnReaderFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetDecimalColumnReaderFactory.java
index cb1b28a88dd8..7f3eb1a3ad82 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetDecimalColumnReaderFactory.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetDecimalColumnReaderFactory.java
@@ -15,19 +15,20 @@
import com.facebook.presto.hive.parquet.RichColumnDescriptor;
import com.facebook.presto.spi.type.DecimalType;
+import org.apache.hadoop.conf.Configuration;
public final class ParquetDecimalColumnReaderFactory
{
private ParquetDecimalColumnReaderFactory() {}
- public static ParquetColumnReader createReader(RichColumnDescriptor descriptor, int precision, int scale)
+ public static ParquetColumnReader createReader(RichColumnDescriptor descriptor, int precision, int scale, Configuration configuration)
{
DecimalType decimalType = DecimalType.createDecimalType(precision, scale);
if (decimalType.isShort()) {
- return new ParquetShortDecimalColumnReader(descriptor);
+ return new ParquetShortDecimalColumnReader(descriptor, configuration);
}
else {
- return new ParquetLongDecimalColumnReader(descriptor);
+ return new ParquetLongDecimalColumnReader(descriptor, configuration);
}
}
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetDoubleColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetDoubleColumnReader.java
index 07564b358797..669604799d5b 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetDoubleColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetDoubleColumnReader.java
@@ -15,14 +15,15 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
public class ParquetDoubleColumnReader
extends ParquetColumnReader
{
- public ParquetDoubleColumnReader(ColumnDescriptor descriptor)
+ public ParquetDoubleColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetFloatColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetFloatColumnReader.java
index 60ce674ca137..7f789ce57585 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetFloatColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetFloatColumnReader.java
@@ -15,6 +15,7 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
import static java.lang.Float.floatToRawIntBits;
@@ -22,9 +23,9 @@
public class ParquetFloatColumnReader
extends ParquetColumnReader
{
- public ParquetFloatColumnReader(ColumnDescriptor descriptor)
+ public ParquetFloatColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetIntColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetIntColumnReader.java
index 1ac1a2bc3aa2..6b3e64163c52 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetIntColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetIntColumnReader.java
@@ -15,14 +15,15 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
public class ParquetIntColumnReader
extends ParquetColumnReader
{
- public ParquetIntColumnReader(ColumnDescriptor descriptor)
+ public ParquetIntColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetLongColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetLongColumnReader.java
index 441b7d87ecf2..1f8eb5718c9a 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetLongColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetLongColumnReader.java
@@ -15,14 +15,15 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
public class ParquetLongColumnReader
extends ParquetColumnReader
{
- public ParquetLongColumnReader(ColumnDescriptor descriptor)
+ public ParquetLongColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetLongDecimalColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetLongDecimalColumnReader.java
index 611625894a11..1bd6e4087623 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetLongDecimalColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetLongDecimalColumnReader.java
@@ -16,6 +16,7 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Decimals;
import com.facebook.presto.spi.type.Type;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
import parquet.io.api.Binary;
@@ -24,9 +25,9 @@
public class ParquetLongDecimalColumnReader
extends ParquetColumnReader
{
- ParquetLongDecimalColumnReader(ColumnDescriptor descriptor)
+ ParquetLongDecimalColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetPageReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetPageReader.java
index 4302c3be22fe..1202fda6298a 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetPageReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetPageReader.java
@@ -17,6 +17,7 @@
import com.facebook.presto.hive.parquet.ParquetDataPageV1;
import com.facebook.presto.hive.parquet.ParquetDataPageV2;
import com.facebook.presto.hive.parquet.ParquetDictionaryPage;
+import org.apache.hadoop.conf.Configuration;
import parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
@@ -28,6 +29,7 @@
class ParquetPageReader
{
+ private Configuration configuration;
private final CompressionCodecName codec;
private final long valueCount;
private final List<ParquetDataPage> compressedPages;
@@ -47,6 +49,11 @@ public ParquetPageReader(CompressionCodecName codec,
this.valueCount = count;
}
+ public void setConfiguration(Configuration configuration)
+ {
+ this.configuration = configuration;
+ }
+
public long getTotalValueCount()
{
return valueCount;
@@ -62,7 +69,7 @@ public ParquetDataPage readPage()
if (compressedPage instanceof ParquetDataPageV1) {
ParquetDataPageV1 dataPageV1 = (ParquetDataPageV1) compressedPage;
return new ParquetDataPageV1(
- decompress(codec, dataPageV1.getSlice(), dataPageV1.getUncompressedSize()),
+ decompress(codec, dataPageV1.getSlice(), dataPageV1.getUncompressedSize(), configuration),
dataPageV1.getValueCount(),
dataPageV1.getUncompressedSize(),
dataPageV1.getStatistics(),
@@ -85,7 +92,7 @@ public ParquetDataPage readPage()
dataPageV2.getRepetitionLevels(),
dataPageV2.getDefinitionLevels(),
dataPageV2.getDataEncoding(),
- decompress(codec, dataPageV2.getSlice(), uncompressedSize),
+ decompress(codec, dataPageV2.getSlice(), uncompressedSize, configuration),
dataPageV2.getUncompressedSize(),
dataPageV2.getStatistics(),
false);
@@ -103,7 +110,7 @@ public ParquetDictionaryPage readDictionaryPage()
}
try {
return new ParquetDictionaryPage(
- decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()),
+ decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize(), configuration),
compressedDictionaryPage.getDictionarySize(),
compressedDictionaryPage.getEncoding());
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetReader.java
index 6e3ff5ed93d1..a96c2df149c8 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetReader.java
@@ -29,6 +29,7 @@
import com.facebook.presto.spi.type.TypeSignatureParameter;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -64,6 +65,7 @@ public class ParquetReader
private static final String ARRAY_TYPE_NAME = "bag";
private static final String ARRAY_ELEMENT_NAME = "array_element";
+ private final Configuration configuration;
private final MessageType fileSchema;
private final MessageType requestedSchema;
private final List<BlockMetaData> blocks;
@@ -86,7 +88,8 @@ public ParquetReader(MessageType fileSchema,
List<BlockMetaData> blocks,
ParquetDataSource dataSource,
TypeManager typeManager,
- AggregatedMemoryContext systemMemoryContext)
+ AggregatedMemoryContext systemMemoryContext,
+ Configuration configuration)
{
this.fileSchema = fileSchema;
this.requestedSchema = requestedSchema;
@@ -95,6 +98,7 @@ public ParquetReader(MessageType fileSchema,
this.typeManager = typeManager;
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
this.currentRowGroupMemoryContext = systemMemoryContext.newAggregatedMemoryContext();
+ this.configuration = configuration;
initializeColumnReaders();
}
@@ -288,7 +292,7 @@ private void initializeColumnReaders()
for (PrimitiveColumnIO columnIO : getColumns(fileSchema, requestedSchema)) {
ColumnDescriptor descriptor = columnIO.getColumnDescriptor();
RichColumnDescriptor column = new RichColumnDescriptor(descriptor.getPath(), columnIO.getType().asPrimitiveType(), descriptor.getMaxRepetitionLevel(), descriptor.getMaxDefinitionLevel());
- columnReadersMap.put(column, ParquetColumnReader.createReader(column));
+ columnReadersMap.put(column, ParquetColumnReader.createReader(column, configuration));
}
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetShortDecimalColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetShortDecimalColumnReader.java
index beef2337a121..e97f4c871b44 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetShortDecimalColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetShortDecimalColumnReader.java
@@ -15,6 +15,7 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
import static com.facebook.presto.hive.util.DecimalUtils.getShortDecimalValue;
@@ -24,9 +25,9 @@
public class ParquetShortDecimalColumnReader
extends ParquetColumnReader
{
- ParquetShortDecimalColumnReader(ColumnDescriptor descriptor)
+ ParquetShortDecimalColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetTimestampColumnReader.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetTimestampColumnReader.java
index 88fd27c7e0b5..09f4725bd8ea 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetTimestampColumnReader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/reader/ParquetTimestampColumnReader.java
@@ -15,6 +15,7 @@
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
+import org.apache.hadoop.conf.Configuration;
import parquet.column.ColumnDescriptor;
import parquet.io.api.Binary;
@@ -23,9 +24,9 @@
public class ParquetTimestampColumnReader
extends ParquetColumnReader
{
- public ParquetTimestampColumnReader(ColumnDescriptor descriptor)
+ public ParquetTimestampColumnReader(ColumnDescriptor descriptor, Configuration configuration)
{
- super(descriptor);
+ super(descriptor, configuration);
}
@Override
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
index 29b5678927b1..d3a80f445ced 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
@@ -35,7 +35,6 @@
import java.util.Properties;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
-import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
import static java.util.Objects.requireNonNull;
public class HiveFileIterator
@@ -99,7 +98,8 @@ protected LocatedFileStatus computeNext()
return endOfData();
}
catch (FileNotFoundException e) {
- throw new PrestoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + path);
+ // A missing partition location is acceptable; treat it as an empty partition.
+ return endOfData();
}
catch (IOException e) {
throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed to list directory: " + path, e);
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java
new file mode 100644
index 000000000000..b8c1b7dc9a60
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.facebook.presto.hive.HiveCluster;
+import com.facebook.presto.hive.StaticHiveCluster;
+import com.facebook.presto.hive.StaticMetastoreConfig;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+public class MetastoreStaticClusterModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(StaticMetastoreConfig.class);
+ }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java
new file mode 100644
index 000000000000..775a5afaf4c8
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.facebook.presto.hive.HiveCluster;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+public class MetastoreZkDiscoveryBasedModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(HiveCluster.class).to(ZookeeperServersetHiveCluster.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(ZookeeperServersetMetastoreConfig.class);
+ }
+}
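The two modules above are alternative providers of the same HiveCluster binding. A hypothetical sketch of selecting one while composing the connector's Guice injector (the boolean flag is invented for illustration):

    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Module;

    final class MetastoreModuleSelectionSketch
    {
        static Injector create(boolean zookeeperDiscovery)
        {
            Module metastoreModule = zookeeperDiscovery
                    ? new MetastoreZkDiscoveryBasedModule()
                    : new MetastoreStaticClusterModule();
            return Guice.createInjector(metastoreModule /* plus the remaining connector modules */);
        }
    }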
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java
new file mode 100644
index 000000000000..8bbb11972338
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.ThriftHiveMetastoreClient;
+import com.facebook.presto.hive.authentication.HiveMetastoreAuthentication;
+import com.facebook.presto.hive.metastore.HiveMetastoreClient;
+import com.facebook.presto.twitter.hive.util.PooledTTransportFactory;
+import com.facebook.presto.twitter.hive.util.TTransportPool;
+import com.google.common.net.HostAndPort;
+import io.airlift.units.Duration;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+
+public class PooledHiveMetastoreClientFactory
+{
+ private final HostAndPort socksProxy;
+ private final int timeoutMillis;
+ private final HiveMetastoreAuthentication metastoreAuthentication;
+ private final TTransportPool transportPool;
+
+ public PooledHiveMetastoreClientFactory(@Nullable HostAndPort socksProxy,
+ Duration timeout,
+ HiveMetastoreAuthentication metastoreAuthentication,
+ int maxTransport,
+ long idleTimeout,
+ long transportEvictInterval,
+ int evictNumTests)
+ {
+ this.socksProxy = socksProxy;
+ this.timeoutMillis = toIntExact(timeout.toMillis());
+ this.metastoreAuthentication = requireNonNull(metastoreAuthentication, "metastoreAuthentication is null");
+ GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+ poolConfig.setMaxIdle(maxTransport);
+ poolConfig.setMaxTotal(maxTransport);
+ poolConfig.setMinEvictableIdleTimeMillis(idleTimeout);
+ poolConfig.setTimeBetweenEvictionRunsMillis(transportEvictInterval);
+ poolConfig.setNumTestsPerEvictionRun(evictNumTests);
+ this.transportPool = new TTransportPool(poolConfig);
+ }
+
+ @Inject
+ public PooledHiveMetastoreClientFactory(HiveClientConfig config,
+ ZookeeperServersetMetastoreConfig zkConfig,
+ HiveMetastoreAuthentication metastoreAuthentication)
+ {
+ this(config.getMetastoreSocksProxy(),
+ config.getMetastoreTimeout(),
+ metastoreAuthentication,
+ zkConfig.getMaxTransport(),
+ zkConfig.getTransportIdleTimeout(),
+ zkConfig.getTransportEvictInterval(),
+ zkConfig.getTransportEvictNumTests());
+ }
+
+ public HiveMetastoreClient create(String host, int port)
+ throws TTransportException
+ {
+ try {
+ TTransport transport = transportPool.borrowObject(host, port);
+ if (transport == null) {
+ transport = transportPool.borrowObject(host, port,
+ new PooledTTransportFactory(transportPool,
+ host, port, socksProxy,
+ timeoutMillis, metastoreAuthentication));
+ }
+ return new ThriftHiveMetastoreClient(transport);
+ }
+ catch (Exception e) {
+ throw new TTransportException(String.format("%s: %s", host, e.getMessage()), e);
+ }
+ }
+}
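A usage sketch (host and port are placeholders): create() first tries to reuse a pooled transport and only builds a fresh one through PooledTTransportFactory when the pool has none for that endpoint:

    import com.facebook.presto.hive.metastore.HiveMetastoreClient;
    import org.apache.thrift.transport.TTransportException;

    final class PooledClientSketch
    {
        static HiveMetastoreClient connect(PooledHiveMetastoreClientFactory factory)
                throws TTransportException
        {
            return factory.create("hms.example.com", 9083);
        }
    }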
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java
new file mode 100644
index 000000000000..f340abf47334
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.google.common.net.HostAndPort;
+
+import io.airlift.log.Logger;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+public class ZookeeperMetastoreMonitor
+ implements PathChildrenCacheListener
+{
+ public static final Logger log = Logger.get(ZookeeperMetastoreMonitor.class);
+ private CuratorFramework client;
+ private PathChildrenCache cache;
+ private ConcurrentMap<String, HostAndPort> servers; // node name -> HostAndPort
+
+ public ZookeeperMetastoreMonitor(String zkServer, String watchPath, int maxRetries, int retrySleepTime)
+ throws Exception
+ {
+ client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(retrySleepTime, maxRetries));
+ client.start();
+
+ cache = new PathChildrenCache(client, watchPath, true); // true indicating cache node contents in addition to the stat
+ try {
+ cache.start();
+ }
+ catch (Exception ex) {
+ throw new RuntimeException("Curator PathChildrenCache creation failed: " + ex.getMessage(), ex);
+ }
+
+ cache.getListenable().addListener(this);
+ servers = new ConcurrentHashMap<>();
+ }
+
+ public void close()
+ {
+ client.close();
+
+ try {
+ cache.close();
+ }
+ catch (IOException ex) {
+ // do nothing
+ }
+ }
+
+ public List<HostAndPort> getServers()
+ {
+ return servers.values().stream().collect(Collectors.toList());
+ }
+
+ private HostAndPort deserialize(byte[] bytes)
+ {
+ String serviceEndpoint = "serviceEndpoint";
+ JSONObject data = (JSONObject) JSONValue.parse(new String(bytes));
+ if (data != null && data.containsKey(serviceEndpoint)) {
+ Map hostPortMap = (Map) data.get(serviceEndpoint);
+ String host = hostPortMap.get("host").toString();
+ int port = Integer.parseInt(hostPortMap.get("port").toString());
+ return HostAndPort.fromParts(host, port);
+ }
+ else {
+ log.warn("failed to deserialize child node data");
+ throw new IllegalArgumentException("No host:port found");
+ }
+ }
+
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ case CHILD_UPDATED: {
+ HostAndPort hostPort = deserialize(event.getData().getData());
+ String node = ZKPaths.getNodeFromPath(event.getData().getPath());
+ log.info("child updated: " + node + ": " + hostPort);
+ servers.put(node, hostPort);
+ break;
+ }
+
+ case CHILD_REMOVED: {
+ String node = ZKPaths.getNodeFromPath(event.getData().getPath());
+ log.info("child removed: " + node);
+ servers.remove(node);
+ break;
+ }
+
+ default:
+ log.info("connection state changed: " + event.getType());
+ break;
+ }
+ }
+}
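deserialize() above expects serverset-style znode data: a JSON object whose serviceEndpoint entry carries host and port. A minimal payload it would accept (the hostname is hypothetical):

    final class ServersetPayloadSketch
    {
        static final String EXAMPLE =
                "{\"serviceEndpoint\": {\"host\": \"hms1.example.com\", \"port\": 9083}}";
        // deserialize(EXAMPLE.getBytes(UTF_8))
        //     -> HostAndPort.fromParts("hms1.example.com", 9083)
    }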
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java
new file mode 100644
index 000000000000..c4f803311b80
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.facebook.presto.hive.HiveCluster;
+import com.facebook.presto.hive.metastore.HiveMetastoreClient;
+import com.google.common.net.HostAndPort;
+import io.airlift.log.Logger;
+import org.apache.thrift.transport.TTransportException;
+
+import javax.inject.Inject;
+
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ZookeeperServersetHiveCluster
+ implements HiveCluster
+{
+ private static final Logger log = Logger.get(ZookeeperServersetHiveCluster.class);
+ private final PooledHiveMetastoreClientFactory clientFactory;
+ private ZookeeperMetastoreMonitor zkMetastoreMonitor;
+
+ @Inject
+ public ZookeeperServersetHiveCluster(ZookeeperServersetMetastoreConfig config, PooledHiveMetastoreClientFactory clientFactory)
+ throws Exception
+ {
+ String zkServerHostAndPort = requireNonNull(config.getZookeeperServerHostAndPort(), "zkServerHostAndPort is null");
+ String zkMetastorePath = requireNonNull(config.getZookeeperMetastorePath(), "zkMetastorePath is null");
+ int zkRetries = requireNonNull(config.getZookeeperMaxRetries(), "zkMaxRetries is null");
+ int zkRetrySleepTime = requireNonNull(config.getZookeeperRetrySleepTime(), "zkRetrySleepTime is null");
+ this.clientFactory = requireNonNull(clientFactory, "clientFactory is null");
+ this.zkMetastoreMonitor = new ZookeeperMetastoreMonitor(zkServerHostAndPort, zkMetastorePath, zkRetries, zkRetrySleepTime);
+ }
+
+ @Override
+ public HiveMetastoreClient createMetastoreClient()
+ {
+ List<HostAndPort> metastores = zkMetastoreMonitor.getServers();
+ Collections.shuffle(metastores);
+ TTransportException lastException = null;
+ for (HostAndPort metastore : metastores) {
+ try {
+ log.info("Connecting to metastore at: %s", metastore.toString());
+ return clientFactory.create(metastore.getHostText(), metastore.getPort());
+ }
+ catch (TTransportException e) {
+ log.debug("Failed connecting to Hive metastore at: %s", metastore.toString());
+ lastException = e;
+ }
+ }
+
+ throw new RuntimeException("Failed connecting to Hive metastore.", lastException);
+ }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java
new file mode 100644
index 000000000000..65b424b6c437
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+public class ZookeeperServersetMetastoreConfig
+{
+ private String zookeeperServerHostAndPort;
+ private String zookeeperMetastorePath;
+ private int zookeeperRetrySleepTime = 500; // ms
+ private int zookeeperMaxRetries = 3;
+ private int maxTransport = 128;
+ private long transportIdleTimeout = 300_000L;
+ private long transportEvictInterval = 10_000L;
+ private int transportEvictNumTests = 3;
+
+ public String getZookeeperServerHostAndPort()
+ {
+ return zookeeperServerHostAndPort;
+ }
+
+ @Config("hive.metastore.zookeeper.uri")
+ @ConfigDescription("Zookeeper Host and Port")
+ public ZookeeperServersetMetastoreConfig setZookeeperServerHostAndPort(String zookeeperServerHostAndPort)
+ {
+ this.zookeeperServerHostAndPort = zookeeperServerHostAndPort;
+ return this;
+ }
+
+ public String getZookeeperMetastorePath()
+ {
+ return zookeeperMetastorePath;
+ }
+
+ @Config("hive.metastore.zookeeper.path")
+ @ConfigDescription("Hive metastore Zookeeper path")
+ public ZookeeperServersetMetastoreConfig setZookeeperMetastorePath(String zkPath)
+ {
+ this.zookeeperMetastorePath = zkPath;
+ return this;
+ }
+
+ @NotNull
+ public int getZookeeperRetrySleepTime()
+ {
+ return zookeeperRetrySleepTime;
+ }
+
+ @Config("hive.metastore.zookeeper.retry.sleeptime")
+ @ConfigDescription("Zookeeper sleep time between reties")
+ public ZookeeperServersetMetastoreConfig setZookeeperRetrySleepTime(int zookeeperRetrySleepTime)
+ {
+ this.zookeeperRetrySleepTime = zookeeperRetrySleepTime;
+ return this;
+ }
+
+ @Min(1)
+ public int getZookeeperMaxRetries()
+ {
+ return zookeeperMaxRetries;
+ }
+
+ @Config("hive.metastore.zookeeper.max.retries")
+ @ConfigDescription("Zookeeper max reties")
+ public ZookeeperServersetMetastoreConfig setZookeeperMaxRetries(int zookeeperMaxRetries)
+ {
+ this.zookeeperMaxRetries = zookeeperMaxRetries;
+ return this;
+ }
+
+ @Min(1)
+ public int getMaxTransport()
+ {
+ return maxTransport;
+ }
+
+ @Config("hive.metastore.max-transport-num")
+ public ZookeeperServersetMetastoreConfig setMaxTransport(int maxTransport)
+ {
+ this.maxTransport = maxTransport;
+ return this;
+ }
+
+ public long getTransportIdleTimeout()
+ {
+ return transportIdleTimeout;
+ }
+
+ @Config("hive.metastore.transport-idle-timeout")
+ public ZookeeperServersetMetastoreConfig setTransportIdleTimeout(long transportIdleTimeout)
+ {
+ this.transportIdleTimeout = transportIdleTimeout;
+ return this;
+ }
+
+ public long getTransportEvictInterval()
+ {
+ return transportEvictInterval;
+ }
+
+ @Config("hive.metastore.transport-eviction-interval")
+ public ZookeeperServersetMetastoreConfig setTransportEvictInterval(long transportEvictInterval)
+ {
+ this.transportEvictInterval = transportEvictInterval;
+ return this;
+ }
+
+ @Min(0)
+ public int getTransportEvictNumTests()
+ {
+ return transportEvictNumTests;
+ }
+
+ @Config("hive.metastore.transport-eviction-num-tests")
+ public ZookeeperServersetMetastoreConfig setTransportEvictNumTests(int transportEvictNumTests)
+ {
+ this.transportEvictNumTests = transportEvictNumTests;
+ return this;
+ }
+}
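Put together, a hive catalog properties snippet wiring these settings might look like this; the URI and path are placeholders, and the numeric values repeat the defaults above:

    hive.metastore.zookeeper.uri=zk1.example.com:2181
    hive.metastore.zookeeper.path=/services/hivemetastore
    hive.metastore.zookeeper.retry.sleeptime=500
    hive.metastore.zookeeper.max.retries=3
    hive.metastore.max-transport-num=128
    hive.metastore.transport-idle-timeout=300000
    hive.metastore.transport-eviction-interval=10000
    hive.metastore.transport-eviction-num-tests=3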
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolver.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolver.java
new file mode 100644
index 000000000000..077a409fd2a2
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolver.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static com.facebook.presto.hive.HiveUtil.checkCondition;
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Resolves the mapping from contiguous Hive column ids to (possibly non-contiguous) Thrift field ids using a JSON property.
+ * Example:
+ * We have the thrift definition:
+ *
+ * struct Name {
+ * 1: string first,
+ * 2: string last
+ * }
+ * struct Person {
+ * 1: Name name,
+ * 3: string phone
+ * }
+ *
+ * Hive table for Person:
+ *
+ * +---------+-------------+----------------------------------+-----------------+
+ * | hive id | column name | type                             | thrift field id |
+ * +---------+-------------+----------------------------------+-----------------+
+ * | 0       | name        | struct<first:string,last:string> | 1               |
+ * +---------+-------------+----------------------------------+-----------------+
+ * | 1       | phone       | string                           | 3               |
+ * +---------+-------------+----------------------------------+-----------------+
+ *
+ * The corresponding id mapping object is:
+ *
+ * x = {
+ * '0': {
+ * '0': 1,
+ * '1': 2,
+ * },
+ * '1': 3
+ * }
+ *
+ * The json property is:
+ *
+ * {"0":{"0":1,"1":2},"1":3}
+ */
+public class HiveThriftFieldIdResolver
+ implements ThriftFieldIdResolver
+{
+ private final JsonNode root;
+ private final Map<Integer, ThriftFieldIdResolver> nestedResolvers = new HashMap<>();
+ private final Map<Integer, Short> thriftIds = new HashMap<>();
+
+ public HiveThriftFieldIdResolver(JsonNode root)
+ {
+ this.root = root;
+ }
+
+ @Override
+ public short getThriftId(int hiveIndex)
+ {
+ if (root == null) {
+ return (short) (hiveIndex + 1);
+ }
+
+ Short thriftId = thriftIds.get(hiveIndex);
+ if (thriftId != null) {
+ return thriftId;
+ }
+ else {
+ JsonNode child = root.get(String.valueOf(hiveIndex));
+ checkCondition(child != null, HIVE_INVALID_METADATA, "Missing json value for hiveIndex: %s, root: %s", hiveIndex, root);
+ if (child.isNumber()) {
+ thriftId = (short) child.asInt();
+ }
+ else {
+ checkCondition(child.get("id") != null, HIVE_INVALID_METADATA, "Missed id for hiveIndex: %s, root: %s", hiveIndex, root);
+ thriftId = (short) child.get("id").asInt();
+ }
+ thriftIds.put(hiveIndex, thriftId);
+ return thriftId;
+ }
+ }
+
+ @Override
+ public ThriftFieldIdResolver getNestedResolver(int hiveIndex)
+ {
+ if (root == null) {
+ return this;
+ }
+
+ ThriftFieldIdResolver nestedResolver = nestedResolvers.get(hiveIndex);
+ if (nestedResolver != null) {
+ return nestedResolver;
+ }
+ else {
+ JsonNode child = root.get(String.valueOf(hiveIndex));
+ checkCondition(child != null, HIVE_INVALID_METADATA, "Missing json value for hiveIndex: %s, root: %s", hiveIndex, root);
+ nestedResolver = new HiveThriftFieldIdResolver(child);
+ nestedResolvers.put(hiveIndex, nestedResolver);
+ return nestedResolver;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("root", root)
+ .add("nestedResolvers", nestedResolvers)
+ .add("thriftIds", thriftIds)
+ .toString();
+ }
+}
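A worked example against the mapping from the class javadoc:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    final class FieldIdResolverSketch
    {
        static void demo() throws Exception
        {
            JsonNode root = new ObjectMapper().readTree("{\"0\":{\"0\":1,\"1\":2},\"1\":3}");
            ThriftFieldIdResolver resolver = new HiveThriftFieldIdResolver(root);
            // hive column 1 (phone) -> thrift field 3
            assert resolver.getThriftId(1) == 3;
            // nested field 1 of column 0 (name.last) -> thrift field 2
            assert resolver.getNestedResolver(0).getThriftId(1) == 2;
        }
    }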
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolverFactory.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolverFactory.java
new file mode 100644
index 000000000000..c1c736a9f453
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/HiveThriftFieldIdResolverFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airlift.log.Logger;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class HiveThriftFieldIdResolverFactory
+ implements ThriftFieldIdResolverFactory
+{
+ private static final Logger log = Logger.get(HiveThriftFieldIdResolverFactory.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ public static final String THRIFT_FIELD_ID_JSON = "thrift.field.id.json";
+ // The default resolver, which maps a hive id to thrift id hiveId + 1
+ public static final ThriftFieldIdResolver HIVE_THRIFT_FIELD_ID_DEFAULT_RESOLVER = new HiveThriftFieldIdResolver(null);
+
+ public ThriftFieldIdResolver createResolver(Properties schema)
+ {
+ String jsonData = schema.getProperty(THRIFT_FIELD_ID_JSON);
+ if (jsonData == null) {
+ return HIVE_THRIFT_FIELD_ID_DEFAULT_RESOLVER;
+ }
+
+ try {
+ JsonNode root = objectMapper.readTree(jsonData);
+ return new HiveThriftFieldIdResolver(root);
+ }
+ catch (IOException e) {
+ log.debug(e, "Failed to create an optimized thrift id resolver, json string: %s, schema: %s. Will use a default resolver.", jsonData, schema);
+ }
+
+ return HIVE_THRIFT_FIELD_ID_DEFAULT_RESOLVER;
+ }
+}
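
Taken together with the resolver above, a minimal usage sketch of the factory (the JSON string and field numbers are hypothetical; with no thrift.field.id.json property the factory returns the default resolver, which maps a hive index to thrift id hiveIndex + 1):

    Properties schema = new Properties();
    schema.setProperty(HiveThriftFieldIdResolverFactory.THRIFT_FIELD_ID_JSON,
            "{\"0\": 2, \"1\": {\"id\": 5, \"0\": 1}}");

    ThriftFieldIdResolver resolver = new HiveThriftFieldIdResolverFactory().createResolver(schema);
    resolver.getThriftId(0);                      // 2: plain numeric value
    resolver.getThriftId(1);                      // 5: the "id" of the nested struct
    resolver.getNestedResolver(1).getThriftId(0); // 1: resolved against the child node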
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolver.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolver.java
new file mode 100644
index 000000000000..83047e1d953f
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolver.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+public interface ThriftFieldIdResolver
+{
+ ThriftFieldIdResolver getNestedResolver(int hiveIndex);
+ short getThriftId(int hiveIndex);
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolverFactory.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolverFactory.java
new file mode 100644
index 000000000000..034308aaa569
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftFieldIdResolverFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import java.util.Properties;
+
+public interface ThriftFieldIdResolverFactory
+{
+ ThriftFieldIdResolver createResolver(Properties schema);
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralDeserializer.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralDeserializer.java
new file mode 100644
index 000000000000..001994f7a7e4
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralDeserializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.thrift.TException;
+
+import java.util.Properties;
+
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
+import static com.facebook.presto.hive.HiveUtil.checkCondition;
+import static org.apache.hadoop.hive.serde.Constants.SERIALIZATION_CLASS;
+
+public class ThriftGeneralDeserializer
+{
+ private static final String REQUIRED_SERIALIZATION_CLASS = ThriftGenericRow.class.getName();
+
+ public ThriftGeneralDeserializer(Configuration conf, Properties properties)
+ {
+ String thriftClassName = properties.getProperty(SERIALIZATION_CLASS, null);
+ checkCondition(thriftClassName != null, HIVE_INVALID_METADATA, "Table or partition is missing Hive deserializer property: %s", SERIALIZATION_CLASS);
+ checkCondition(thriftClassName.equals(REQUIRED_SERIALIZATION_CLASS), HIVE_INVALID_METADATA, "%s %s does not match required %s", SERIALIZATION_CLASS, thriftClassName, REQUIRED_SERIALIZATION_CLASS);
+ }
+
+ public ThriftGenericRow deserialize(Writable writable, short[] thriftIds)
+ {
+ checkCondition(writable instanceof ThriftWritable, HIVE_UNKNOWN_ERROR, "Not an instance of ThriftWritable: %s", writable);
+ ThriftGenericRow row = (ThriftGenericRow) ((ThriftWritable) writable).get();
+ try {
+ row.parse(thriftIds);
+ }
+ catch (TException e) {
+ throw new IllegalStateException("ThriftGenericRow failed to parse values", e);
+ }
+ return row;
+ }
+}
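
A sketch of how this deserializer is meant to be driven; the parameters stand in for values supplied by the surrounding record reader, and the requested field ids are hypothetical:

    static ThriftGenericRow decodeRow(Configuration conf, Properties schema, Writable writable)
    {
        // schema must declare serialization.class = ThriftGenericRow.class.getName(),
        // otherwise the constructor fails with HIVE_INVALID_METADATA
        ThriftGeneralDeserializer deserializer = new ThriftGeneralDeserializer(conf, schema);
        short[] wantedThriftIds = {2, 5}; // parse() materializes only these fields
        return deserializer.deserialize(writable, wantedThriftIds);
    }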
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralInputFormat.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralInputFormat.java
new file mode 100644
index 000000000000..fea6a8d76875
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGeneralInputFormat.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import com.facebook.presto.spi.PrestoException;
+import com.twitter.elephantbird.mapred.input.DeprecatedFileInputFormatWrapper;
+import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
+import com.twitter.elephantbird.util.TypeRef;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static com.facebook.presto.hive.HiveUtil.checkCondition;
+import static java.lang.String.format;
+import static org.apache.hadoop.hive.serde.Constants.SERIALIZATION_CLASS;
+
+/**
+ * Mirror of com.twitter.elephantbird.mapred.input.HiveMultiInputFormat that allows the
+ * thriftClassName to be passed directly as a property of the JobConf.
+ * PR for twitter/elephant-bird: https://github.com/twitter/elephant-bird/pull/481
+ * Remove this class once #481 is included in a release.
+ */
+@SuppressWarnings("deprecation")
+public class ThriftGeneralInputFormat
+ extends DeprecatedFileInputFormatWrapper
+{
+ public ThriftGeneralInputFormat()
+ {
+ super(new MultiInputFormat());
+ }
+
+ private void initialize(FileSplit split, JobConf job) throws IOException
+ {
+ String thriftClassName = job.get(SERIALIZATION_CLASS);
+ checkCondition(thriftClassName != null, HIVE_INVALID_METADATA, "Table or partition is missing Hive deserializer property: %s", SERIALIZATION_CLASS);
+
+ try {
+ Class<?> thriftClass = job.getClassByName(thriftClassName);
+ setInputFormatInstance(new MultiInputFormat(new TypeRef(thriftClass) {}));
+ }
+ catch (ClassNotFoundException e) {
+ throw new PrestoException(HIVE_INVALID_METADATA, format("Failed getting class for %s", thriftClassName), e);
+ }
+ }
+
+ @Override
+ public RecordReader getRecordReader(
+ InputSplit split,
+ JobConf job,
+ Reporter reporter)
+ throws IOException
+ {
+ initialize((FileSplit) split, job);
+ return super.getRecordReader(split, job, reporter);
+ }
+}
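
For context, a sketch of the JobConf wiring this input format expects; the thrift class name is hypothetical, and the split and reporter come from the Hadoop runtime:

    static RecordReader openReader(FileSplit split, Reporter reporter)
            throws IOException
    {
        JobConf job = new JobConf();
        // initialize() fails with HIVE_INVALID_METADATA when this property is absent
        job.set(SERIALIZATION_CLASS, "com.example.thrift.LogEntry"); // same static import as above
        return new ThriftGeneralInputFormat().getRecordReader(split, job, reporter);
    }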
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGenericRow.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGenericRow.java
new file mode 100644
index 000000000000..c319bba2d375
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/thrift/ThriftGenericRow.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.thrift;
+
+import io.airlift.log.Logger;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TMap;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolUtil;
+import org.apache.thrift.protocol.TSet;
+import org.apache.thrift.protocol.TType;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TTransport;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ThriftGenericRow
+ implements TBase
+{
+ private static final Logger log = Logger.get(ThriftGenericRow.class);
+ private final Map<Short, Object> values = new HashMap<>();
+ private byte[] buf = null;
+ private int off = 0;
+ private int len = 0;
+
+ public ThriftGenericRow()
+ {
+ }
+
+ public ThriftGenericRow(Map<Short, Object> values)
+ {
+ this.values.putAll(values);
+ }
+
+ public class Fields
+ implements TFieldIdEnum
+ {
+ private final short thriftId;
+ private final String fieldName;
+
+ Fields(short thriftId, String fieldName)
+ {
+ this.thriftId = thriftId;
+ this.fieldName = fieldName;
+ }
+
+ public short getThriftFieldId()
+ {
+ return thriftId;
+ }
+
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+ }
+
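+ // read() does not decode the struct; it only records the byte range the
+ // struct occupies in the transport's buffer. parse() later replays that
+ // range with a TBinaryProtocol and skips any field whose thrift id was
+ // not requested, so unused columns are never materialized.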
+ public void read(TProtocol iprot)
+ throws TException
+ {
+ TTransport trans = iprot.getTransport();
+ buf = trans.getBuffer();
+ off = trans.getBufferPosition();
+ TProtocolUtil.skip(iprot, TType.STRUCT);
+ len = trans.getBufferPosition() - off;
+ }
+
+ public void parse()
+ throws TException
+ {
+ parse(null);
+ }
+
+ public void parse(short[] thriftIds)
+ throws TException
+ {
+ Set<Short> idSet = thriftIds == null ? null : new HashSet<>(Arrays.asList(ArrayUtils.toObject(thriftIds)));
+ TMemoryInputTransport trans = new TMemoryInputTransport(buf, off, len);
+ TBinaryProtocol iprot = new TBinaryProtocol(trans);
+ TField field;
+ iprot.readStructBegin();
+ while (true) {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ if (idSet != null && !idSet.remove(Short.valueOf(field.id))) {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ else {
+ values.put(field.id, readElem(iprot, field.type));
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ }
+
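+ // Recursively converts thrift primitives and containers into plain Java
+ // objects; nested structs become ThriftGenericRow instances that are
+ // parsed eagerly (no field filtering below the top level).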
+ private Object readElem(TProtocol iprot, byte type)
+ throws TException
+ {
+ switch (type) {
+ case TType.BOOL:
+ return iprot.readBool();
+ case TType.BYTE:
+ return iprot.readByte();
+ case TType.I16:
+ return iprot.readI16();
+ case TType.ENUM:
+ case TType.I32:
+ return iprot.readI32();
+ case TType.I64:
+ return iprot.readI64();
+ case TType.DOUBLE:
+ return iprot.readDouble();
+ case TType.STRING:
+ return iprot.readString();
+ case TType.STRUCT:
+ return readStruct(iprot);
+ case TType.LIST:
+ return readList(iprot);
+ case TType.SET:
+ return readSet(iprot);
+ case TType.MAP:
+ return readMap(iprot);
+ default:
+ TProtocolUtil.skip(iprot, type);
+ return null;
+ }
+ }
+
+ private Object readStruct(TProtocol iprot)
+ throws TException
+ {
+ ThriftGenericRow elem = new ThriftGenericRow();
+ elem.read(iprot);
+ elem.parse();
+ return elem;
+ }
+
+ private Object readList(TProtocol iprot)
+ throws TException
+ {
+ TList ilist = iprot.readListBegin();
+ List