aboutsummaryrefslogtreecommitdiff
path: root/contrib/native
diff options
context:
space:
mode:
authorParth Chandra <parthc@apache.org>2017-07-24 12:55:02 -0700
committerParth Chandra <parthc@apache.org>2017-10-11 19:27:48 -0700
commitf246c3cad7f44baeb8153913052ebc963c62276a (patch)
treee9a9457ae955876e4805f14ed0bc7db1dc253bc4 /contrib/native
parentfacbb92ba319373dd8b8baa171ac1d7978c926c5 (diff)
DRILL-5431: SSL Support (C++) - Update DrillClientImpl to use Channel implementation
Also remove ChannelContextFactory and merge it into ChannelFactory
Diffstat (limited to 'contrib/native')
-rw-r--r--contrib/native/client/CMakeLists.txt3
-rw-r--r--contrib/native/client/example/querySubmitter.cpp33
-rw-r--r--contrib/native/client/readme.linux13
-rw-r--r--contrib/native/client/readme.macos33
-rw-r--r--contrib/native/client/readme.sasl93
-rw-r--r--contrib/native/client/readme.ssl4
-rw-r--r--contrib/native/client/readme.win.txt14
-rw-r--r--contrib/native/client/src/clientlib/CMakeLists.txt2
-rw-r--r--contrib/native/client/src/clientlib/channel.cpp146
-rw-r--r--contrib/native/client/src/clientlib/channel.hpp58
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.cpp270
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp38
-rw-r--r--contrib/native/client/src/clientlib/drillConfig.cpp2
-rw-r--r--contrib/native/client/src/include/drill/common.hpp7
-rw-r--r--contrib/native/client/src/include/drill/drillConfig.hpp22
-rw-r--r--contrib/native/client/src/include/drill/userProperties.hpp9
-rw-r--r--contrib/native/client/test/ssl/testSSL.cpp7
17 files changed, 428 insertions, 326 deletions
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt
index 0c104abc0..3f6c44b60 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -126,6 +126,9 @@ endif()
find_package(Protobuf REQUIRED )
include_directories(${PROTOBUF_INCLUDE_DIR})
+if (MSVC)
+ set(OPENSSL_USE_STATIC_LIBS TRUE)
+endif()
#Find SSL
find_package(OpenSSL REQUIRED )
if(OPENSSL_FOUND)
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 47e55de2d..43b909d26 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -25,7 +24,7 @@
#include <boost/algorithm/string/join.hpp>
#include "drill/drillc.hpp"
-int nOptions=19;
+int nOptions=25;
struct Option{
char name[32];
@@ -50,7 +49,14 @@ struct Option{
{"service_host", "Service host for Kerberos", false},
{"service_name", "Service name for Kerberos", false},
{"auth", "Authentication mechanism to use", false},
- {"sasl_encrypt", "Negotiate for encrypted connection", false}
+ {"sasl_encrypt", "Negotiate for encrypted connection", false},
+ {"enableSSL", "Enable SSL", false},
+ {"TLSProtocol", "TLS protocol version", false},
+ {"certFilePath", "Path to SSL certificate file", false},
+ {"disableHostnameVerification", "disable host name verification", false},
+ {"disableCertVerification", "disable certificate verification", false},
+ {"useSystemTrustStore", "[Windows only]. Use the system truststore.", false }
+
};
std::map<std::string, std::string> qsOptionValues;
@@ -304,6 +310,12 @@ int main(int argc, char* argv[]) {
std::string serviceHost=qsOptionValues["service_host"];
std::string serviceName=qsOptionValues["service_name"];
std::string auth=qsOptionValues["auth"];
+ std::string enableSSL=qsOptionValues["enableSSL"];
+ std::string tlsProtocol=qsOptionValues["TLSProtocol"];
+ std::string certFilePath=qsOptionValues["certFilePath"];
+ std::string disableHostnameVerification=qsOptionValues["disableHostnameVerification"];
+ std::string disableCertVerification=qsOptionValues["disableCertVerification"];
+ std::string useSystemTrustStore = qsOptionValues["useSystemTrustStore"];
Drill::QueryType type;
@@ -392,6 +404,20 @@ int main(int argc, char* argv[]) {
if(auth.length()>0){
props.setProperty(USERPROP_AUTH_MECHANISM, auth);
}
+ if(enableSSL.length()>0){
+ props.setProperty(USERPROP_USESSL, enableSSL);
+ if (enableSSL == "true" && certFilePath.length() <= 0 && useSystemTrustStore.length() <= 0){
+ std::cerr<< "SSL is enabled but no certificate or truststore provided. " << std::endl;
+ return -1;
+ }
+ props.setProperty(USERPROP_TLSPROTOCOL, tlsProtocol);
+ props.setProperty(USERPROP_CERTFILEPATH, certFilePath);
+ props.setProperty(USERPROP_DISABLE_HOSTVERIFICATION, disableHostnameVerification);
+ props.setProperty(USERPROP_DISABLE_CERTVERIFICATION, disableCertVerification);
+ if (useSystemTrustStore.length() > 0){
+ props.setProperty(USERPROP_USESYSTEMTRUSTSTORE, useSystemTrustStore);
+ }
+ }
if(client.connect(connectStr.c_str(), &props)!=Drill::CONN_SUCCESS){
std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;
@@ -548,3 +574,4 @@ int main(int argc, char* argv[]) {
return 0;
}
+
diff --git a/contrib/native/client/readme.linux b/contrib/native/client/readme.linux
index 4eaeea5de..34c791b6f 100644
--- a/contrib/native/client/readme.linux
+++ b/contrib/native/client/readme.linux
@@ -84,6 +84,19 @@ OR
ln -svf libboost_filesystem.a libboost_filesystem-mt.a
ln -svf libboost_date_time.a libboost_date_time-mt.a
+5) Install or build Cyrus SASL
+ To Install
+ yum install cyrus-sasl-devel cyrus-sasl-gssapi
+ libs are installed in /usr/lib64/sasl2
+ includes are installed in /usr/include
+
+ To build your own
+ See readme.sasl for instructions
+
+6) Install OpenSSL
+ yum install openssl-devel openssl
+
+
(Optional) Refresh protobuf source files
----------------------------------------
When changes have been introduced to the protocol module, you might need to refresh the protobuf C++ source files too.
diff --git a/contrib/native/client/readme.macos b/contrib/native/client/readme.macos
index eee017ef5..e9be71267 100644
--- a/contrib/native/client/readme.macos
+++ b/contrib/native/client/readme.macos
@@ -44,12 +44,23 @@ Install Prerequisites
2.2) Install zookeeper
$> brew install zookeeper
-2.3) Install boost
+2.3) Install or build Cyrus SASL
+ To Install (need macports, brew did not work for me)
+ port install cyrus-sasl2
+
+ To build your own
+ See readme.sasl for instructions
+
+2.4) Install OpenSSL
+ [NOTE: MacOS has a neutered version of openssl installed. Install the build using brew. Make sure the brew installation is picked up]
+ brew install openssl
+
+2.5) Install boost
$> brew install boost
-2.3.1) For production builds, see the readme.boost file
+2.5.1) For production builds, see the readme.boost file
-2.3.1.1 Build using XCODE
+2.6) Build using XCODE
=========================
(Optional) Refresh protobuf source files
----------------------------------------
@@ -66,7 +77,7 @@ Build drill client
-------------------
$> cd DRILL_DIR/contrib/native/client
$> mkdir build
- $> cd build && cmake -G "Xcode" -D CMAKE_BUILD_TYPE=Debug ..
+ $> cd build && cmake -G "Xcode" -DOPENSSL_ROOT_DIR="/usr/local/opt/openssl "-D CMAKE_BUILD_TYPE=Debug ..
$> xcodebuild -project drillclient.xcodeproj -configuration ${BUILDTYPE} -target ALL_BUILD
@@ -74,7 +85,7 @@ XCode IDE
---------
You can open the drillclient.xcodeproj file in the XCode ide and run/debug as with any other command line app
-2.3.1.2 Build using MAKE
+2.7) Build using MAKE
========================
(Optional) Refresh protobuf source files
----------------------------------------
@@ -90,16 +101,22 @@ Build drill client
-------------------
$> cd DRILL_DIR/contrib/native/client
$> mkdir build
- $> cd build && cmake3 -G "Unix Makefiles" -D CMAKE_BUILD_TYPE=Debug ..
+ $> cd build && cmake3 -G "Unix Makefiles" -DOPENSSL_ROOT_DIR="/usr/local/opt/openssl" -D CMAKE_BUILD_TYPE=Debug ..
$> make
+2.10) Build using CLion
+CLion can recognize cmake projects automatically. Check CLion documentation for help on how to use CMake with CLion.
+To prevent CLion's cmake from picking up the system installed OpenSSL set the following define in CLion/Preferences/Build, Execution, Deployment/CMake/CMake Options
+-DOPENSSL_ROOT_DIR="/usr/local/opt/openssl
+Then reload the CMake project making sure to invalidate the CMake cache
+Tools/CMake/Reset Cache and Reload Project
-2.4 Test
+2.9 Test
--------
Run query submitter from the command line
$> querySubmitter query='select * from dfs.`/Users/pchandra/work/data/tpc-h/customer.parquet`' type=sql connectStr=local=10.250.0.146:31010 api=async logLevel=trace user=yourUserName password=yourPassWord
-2.5 Valgrind
+2.10 Valgrind
------------
Install valgrind using brew
$> brew install valgrind
diff --git a/contrib/native/client/readme.sasl b/contrib/native/client/readme.sasl
new file mode 100644
index 000000000..3eab47192
--- /dev/null
+++ b/contrib/native/client/readme.sasl
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+## On Mac OS and Linux
+
+* Download cyrus-sasl tarball or clone git sources.
+
+You can get the latest tarball from [here](ftp://ftp.cyrusimap.org/cyrus-sasl/).
+
+Note: If you download the source tarball, the configure script is already present. If you have cloned the source from git. It will be missing the .configure script and that needs to be generated. The steps to generate it are not clear. You can overcome this issue by just copying the .configure script form the tarball onto the folder containing the git sources.
+
+* Run configure
+
+`./configure --disable-cram --disable-digest --disable-scram --disable-login --disable-otp --disable-passdss --disable-krb4 --disable-srp --without-des CFLAGS="-g"`
+
+* Build the sources and install
+
+ `make`
+
+ `make install`
+
+(On Mac) - The SASL library should now be available in _/usr/local/lib/sasl2_
+
+ -rwxr-xr-x 1 root admin 659 Dec 23 15:24 libanonymous.la
+ -rwxr-xr-x 1 root admin 22276 Dec 23 15:24 libanonymous.plugin
+ -rwxr-xr-x 1 root admin 706 Dec 23 15:24 libgssapiv2.la
+ -rwxr-xr-x 1 root admin 37052 Dec 23 15:24 libgssapiv2.plugin
+ -rwxr-xr-x 1 root admin 643 Dec 23 15:24 libplain.la
+ -rwxr-xr-x 1 root admin 22028 Dec 23 15:24 libplain.plugin
+ -rwxr-xr-x 1 root admin 665 Dec 23 15:24 libsasldb.la
+ -rwxr-xr-x 1 root admin 33916 Dec 23 15:24 libsasldb.plugin
+
+## (On CentOS Linux) -
+
+* You need to download the "krb5-devel" rpm using command below. This will download `gssapi` directory and `gssapi.h` file under `/usr/include`.
+`yum install krb5-devel`
+
+* Copy the `gssapi` directory and `gssapi.h file` into the plugin directory found under cyrus-sasl untar directory.
+Example:
+ * `cp -r /usr/include/gssapi ~/cyrusSasl/cyrus-sasl-2.1.26/plugins`
+ * `cp /usr/include/gssapi.h ~/cyrusSasl/cyrus-sasl-2.1.26/plugins`
+
+* Remove the config cache file if any in cyrus-sasl directory
+ * `rm -f ~/cyrusSasl/cyrus-sasl-2.1.26/config.cache`
+
+* Run configure
+
+`./configure --disable-cram --disable-digest --disable-scram --disable-login --disable-otp --disable-passdss --disable-krb4 --disable-srp --without-des CFLAGS="-g"`
+
+* Build the sources and install
+ * `make clean`
+ * `make`
+ * `make install`
+
+(On CentOS Linux) The SASL library should now be available in _/usr/local/lib/sasl2_
+
+
+ -rwxr-xr-x 1 root root 684 Mar 15 18:12 libanonymous.la
+ -rwxr-xr-x 1 root root 53751 Mar 15 18:12 libanonymous.so.3.0.0
+ -rwxr-xr-x 1 root root 704 Mar 15 18:12 libgs2.la
+ -rwxr-xr-x 1 root root 81808 Mar 15 18:12 libgs2.so.3.0.0
+ -rwxr-xr-x 1 root root 734 Mar 15 18:12 libgssapiv2.la
+ -rwxr-xr-x 1 root root 83549 Mar 15 18:12 libgssapiv2.so.3.0.0
+ -rwxr-xr-x 1 root root 668 Mar 15 18:12 libplain.la
+ -rwxr-xr-x 1 root root 54515 Mar 15 18:12 libplain.so.3.0.0
+ -rwxr-xr-x 1 root root 684 Mar 15 18:12 libsasldb.la
+ -rwxr-xr-x 1 root root 98219 Mar 15 18:12 libsasldb.so.3.0.0
+
+## On Win 64
+
+* To build the base library and the plain plugin
+
+Change the _PLUGINS_ variable in _plugins/NTMakefile_ to include only the PLAIN plugin. Then run `nmake /f NTMakefile CFG=Release`
+
+The build will fail in sasldb but it would have already built the base library and PLAIN.
+
+Use `nmake /f NTMakefile CFG=Debug` to get a debug build.Building Cyrus SASL
+
diff --git a/contrib/native/client/readme.ssl b/contrib/native/client/readme.ssl
index 86d46fc79..8c875cfda 100644
--- a/contrib/native/client/readme.ssl
+++ b/contrib/native/client/readme.ssl
@@ -21,7 +21,9 @@ Installing OpenSSL -
brew install openssl
On Linux :
-Set up the certificate
+Set up the certificate for testing. The files generated by this set of steps are used by the boost example programs.
+These are also the steps used to generate the test certificates used by the Drillbit unit tests.
+
Generate a private key
openssl genrsa -des3 -out drillTestServerKey.pem 1024
diff --git a/contrib/native/client/readme.win.txt b/contrib/native/client/readme.win.txt
index 93910381b..bd84b915f 100644
--- a/contrib/native/client/readme.win.txt
+++ b/contrib/native/client/readme.win.txt
@@ -121,7 +121,6 @@ Windows platforms should be more or less similar.
2.2 Protobuf (2.5.0)
Get protobuf from here: https://protobuf.googlecode.com/files/protobuf-2.5.0.zip
-
a) Protobuf builds static libraries
b) In Visual Studio, open <PROTOBUF_HOME>/vsprojects/protobuf.sln. The IDE may
update the solution file. This should go thru successfully.
@@ -152,6 +151,15 @@ Windows platforms should be more or less similar.
c) InVisual Studio 2010 Express open <CPPUNIT_HOME>/src/CppUnitLibraries2010.sln
i) Build cppunit project
+2.5 Install or build Cyrus SASL
+ To build your own see readme.sasl for instructions
+
+2.6 Install OpenSSL
+ Download from https://slproweb.com/products/Win32OpenSSL.html
+ At the time of writing the compatible version is Win32OpenSSL-1_0_2L
+ OpenSSL is installed into C:\OpenSSL-Win64, If you install DLL's into bin directory, make sure the directory is added to the PATH
+
+
3 Building Drill Clientlib
3.1 SET the following environment variables
set BOOST_LIBRARYDIR=<BOOST_HOME>\BUILD_TYPE
@@ -163,10 +171,10 @@ Windows platforms should be more or less similar.
C:> cd build
a) For the 32 bit build :
- C:> cmake -G "Visual Studio 10" -D ZOOKEEPER_HOME=<ZOOKEPER_HOME> -D PROTOBUF_SRC_ROOT_FOLDER=<PROTOBUF_HOME> -D CPPUNIT_HOME=<CPPUNIT_HOME> -D CMAKE_BUILD_TYPE=Debug ..
+ C:> cmake -G "Visual Studio 10" -D ZOOKEEPER_HOME=<ZOOKEPER_HOME> -D PROTOBUF_SRC_ROOT_FOLDER=<PROTOBUF_HOME> -D CPPUNIT_HOME=<CPPUNIT_HOME> -D SASL_LIBRARY="<SASL_HOME>\lib64_debug\libsasl.lib" -D SASL_HOME=<SASL_HOME> -D OPENSSL_ROOT=<OPENSSL_HOME> -D CMAKE_BUILD_TYPE=Debug ..
b) For the 64 bit build :
- C:> cmake -G "Visual Studio 10 Win64 " -D ZOOKEEPER_HOME=<ZOOKEPER_HOME> -D PROTOBUF_SRC_ROOT_FOLDER=<PROTOBUF_HOME> -D CPPUNIT_HOME=<CPPUNIT_HOME> -D CMAKE_BUILD_TYPE=Debug ..
+ C:> cmake -G "Visual Studio 10 Win64 " -D ZOOKEEPER_HOME=<ZOOKEPER_HOME> -D PROTOBUF_SRC_ROOT_FOLDER=<PROTOBUF_HOME> -D CPPUNIT_HOME=<CPPUNIT_HOME> -D SASL_LIBRARY="<SASL_HOME>\lib64_debug\libsasl.lib" -D SASL_HOME=<SASL_HOME> -D OPENSSL_ROOT=<OPENSSL_HOME> -D CMAKE_BUILD_TYPE=Debug ..
3.3 Open the generated <DRILL_HOME>/contrib/native/client/build/drillclient.sln
file in Visual Studio.
diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt b/contrib/native/client/src/clientlib/CMakeLists.txt
index 2270c91ee..7b9ecc3c0 100644
--- a/contrib/native/client/src/clientlib/CMakeLists.txt
+++ b/contrib/native/client/src/clientlib/CMakeLists.txt
@@ -40,7 +40,7 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../i
include_directories(${PROTOBUF_INCLUDE_DIR})
include_directories(${Zookeeper_INCLUDE_DIRS})
include_directories(${SASL_INCLUDE_DIRS})
-include_directories("${OPENSSL_INCLUDE_DIR}")
+include_directories(${OPENSSL_INCLUDE_DIR})
link_directories(/usr/local/lib)
diff --git a/contrib/native/client/src/clientlib/channel.cpp b/contrib/native/client/src/clientlib/channel.cpp
index 62ce976b1..84f3eb43f 100644
--- a/contrib/native/client/src/clientlib/channel.cpp
+++ b/contrib/native/client/src/clientlib/channel.cpp
@@ -39,7 +39,7 @@ ConnectionEndpoint::ConnectionEndpoint(const char* connStr){
ConnectionEndpoint::ConnectionEndpoint(const char* host, const char* port){
m_host=host;
m_port=port;
- m_protocol="drillbit"; // direct connection
+ m_protocol=PROTOCOL_TYPE_DIRECT; // direct connection
m_pError=NULL;
}
@@ -61,7 +61,7 @@ connectionStatus_t ConnectionEndpoint::getDrillbitEndpoint(){
DRILL_LOG(LOG_INFO) << "Failed to get endpoint from zk" << std::endl;
return ret;
}
- }else if(!this->isDirectConnection()){
+ }else if(!isDirectConnection()){
return handleError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, this->getProtocol().c_str()));
}
}else{
@@ -77,19 +77,19 @@ void ConnectionEndpoint::parseConnectString(){
boost::cmatch matched;
if(boost::regex_match(m_connectString.c_str(), matched, connStrExpr)){
- m_protocol.assign(matched[1].first, matched[1].second);
+ m_protocol = matched[1].str();
if(isDirectConnection()){
- m_host.assign(matched[4].first, matched[4].second);
- m_port.assign(matched[5].first, matched[5].second);
+ m_host = matched[4].str();
+ m_port = matched[5].str();
}else {
// if the connection is to a zookeeper,
// we will get the host and the port only after connecting to the Zookeeper
m_host = "";
m_port = "";
}
- m_hostPortStr.assign(matched[2].first, matched[2].second);
+ m_hostPortStr = matched[2].str();
if(matched[6].matched) {
- m_pathToDrill.assign(matched[6].first, matched[6].second);
+ m_pathToDrill = matched[6].str();
}
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)
<< "Conn str: "<< m_connectString
@@ -106,12 +106,12 @@ void ConnectionEndpoint::parseConnectString(){
bool ConnectionEndpoint::isDirectConnection(){
assert(!m_protocol.empty());
- return (!strcmp(m_protocol.c_str(), "local") || !strcmp(m_protocol.c_str(), "drillbit"));
+ return ( m_protocol == PROTOCOL_TYPE_DIRECT || m_protocol == PROTOCOL_TYPE_DIRECT_2 );
}
bool ConnectionEndpoint::isZookeeperConnection(){
assert(!m_protocol.empty());
- return (!strcmp(m_protocol.c_str(), "zk"));
+ return (m_protocol == PROTOCOL_TYPE_ZK);
}
connectionStatus_t ConnectionEndpoint::getDrillbitEndpointFromZk(){
@@ -148,138 +148,86 @@ connectionStatus_t ConnectionEndpoint::handleError(connectionStatus_t status, st
return status;
}
-/****************************
- * Channel Context Factory
- ****************************/
-ChannelContext* ChannelContextFactory::getChannelContext(channelType_t t, DrillUserProperties* props){
- ChannelContext* pChannelContext=NULL;
- switch(t){
- case CHANNEL_TYPE_SOCKET:
- pChannelContext=new ChannelContext(props);
- break;
-#if defined(IS_SSL_ENABLED)
- case CHANNEL_TYPE_SSLSTREAM: {
-
- std::string protocol;
- props->getProp(USERPROP_TLSPROTOCOL, protocol);
- boost::asio::ssl::context::method tlsVersion = SSLChannelContext::getTlsVersion(protocol);
-
- std::string noVerifyCert;
- props->getProp(USERPROP_DISABLE_CERTVERIFICATION, noVerifyCert);
- boost::asio::ssl::context::verify_mode verifyMode = boost::asio::ssl::context::verify_peer;
- if (noVerifyCert == "true") {
- verifyMode = boost::asio::ssl::context::verify_none;
- }
-
- pChannelContext = new SSLChannelContext(props, tlsVersion, verifyMode);
- }
- break;
-#endif
- default:
- DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
- break;
- }
- return pChannelContext;
-}
-
/*******************
* ChannelFactory
* *****************/
-Channel* ChannelFactory::getChannel(channelType_t t, const char* connStr){
+Channel* ChannelFactory::getChannel(channelType_t t, boost::asio::io_service& ioService, const char* connStr, DrillUserProperties* props){
Channel* pChannel=NULL;
+ ChannelContext_t * pChannelContext = ChannelFactory::getChannelContext(t, props);
switch(t){
case CHANNEL_TYPE_SOCKET:
- pChannel=new SocketChannel(connStr);
+ pChannel=new SocketChannel(ioService, connStr);
break;
#if defined(IS_SSL_ENABLED)
case CHANNEL_TYPE_SSLSTREAM:
- pChannel=new SSLStreamChannel(connStr);
+ pChannel=new SSLStreamChannel(ioService, connStr);
break;
#endif
default:
DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
break;
}
+ pChannel->m_pContext = pChannelContext;
return pChannel;
}
-Channel* ChannelFactory::getChannel(channelType_t t, const char* host, const char* port){
+Channel* ChannelFactory::getChannel(channelType_t t, boost::asio::io_service& ioService, const char* host, const char* port, DrillUserProperties* props){
Channel* pChannel=NULL;
+ ChannelContext_t * pChannelContext = ChannelFactory::getChannelContext(t, props);
switch(t){
case CHANNEL_TYPE_SOCKET:
- pChannel=new SocketChannel(host, port);
+ pChannel=new SocketChannel(ioService, host, port);
break;
#if defined(IS_SSL_ENABLED)
case CHANNEL_TYPE_SSLSTREAM:
- pChannel=new SSLStreamChannel(host, port);
+ pChannel=new SSLStreamChannel(ioService, host, port);
break;
#endif
default:
DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
break;
}
+ pChannel->m_pContext = pChannelContext;
return pChannel;
}
-Channel* ChannelFactory::getChannel(channelType_t t, boost::asio::io_service& ioService, const char* connStr){
- Channel* pChannel=NULL;
+ChannelContext* ChannelFactory::getChannelContext(channelType_t t, DrillUserProperties* props){
+ ChannelContext* pChannelContext=NULL;
switch(t){
case CHANNEL_TYPE_SOCKET:
- pChannel=new SocketChannel(ioService, connStr);
+ pChannelContext=new ChannelContext(props);
break;
#if defined(IS_SSL_ENABLED)
- case CHANNEL_TYPE_SSLSTREAM:
- pChannel=new SSLStreamChannel(ioService, connStr);
- break;
-#endif
- default:
- DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
- break;
- }
- return pChannel;
-}
+ case CHANNEL_TYPE_SSLSTREAM: {
-Channel* ChannelFactory::getChannel(channelType_t t, boost::asio::io_service& ioService, const char* host, const char* port){
- Channel* pChannel=NULL;
- switch(t){
- case CHANNEL_TYPE_SOCKET:
- pChannel=new SocketChannel(ioService, host, port);
- break;
-#if defined(IS_SSL_ENABLED)
- case CHANNEL_TYPE_SSLSTREAM:
- pChannel=new SSLStreamChannel(ioService, host, port);
+ std::string protocol;
+ props->getProp(USERPROP_TLSPROTOCOL, protocol);
+ boost::asio::ssl::context::method tlsVersion = SSLChannelContext::getTlsVersion(protocol);
+
+ std::string noVerifyCert;
+ props->getProp(USERPROP_DISABLE_CERTVERIFICATION, noVerifyCert);
+ boost::asio::ssl::context::verify_mode verifyMode = boost::asio::ssl::context::verify_peer;
+ if (noVerifyCert == "true") {
+ verifyMode = boost::asio::ssl::context::verify_none;
+ }
+
+ pChannelContext = new SSLChannelContext(props, tlsVersion, verifyMode);
+ }
break;
#endif
default:
DRILL_LOG(LOG_ERROR) << "Channel type " << t << " is not supported." << std::endl;
break;
}
- return pChannel;
+ return pChannelContext;
}
/*******************
* Channel
* *****************/
-Channel::Channel(const char* connStr) : m_ioService(m_ioServiceFallback){
- m_pEndpoint=new ConnectionEndpoint(connStr);
- m_ownIoService = true;
- m_pSocket=NULL;
- m_state=CHANNEL_UNINITIALIZED;
- m_pError=NULL;
-}
-
-Channel::Channel(const char* host, const char* port) : m_ioService(m_ioServiceFallback){
- m_pEndpoint=new ConnectionEndpoint(host, port);
- m_ownIoService = true;
- m_pSocket=NULL;
- m_state=CHANNEL_UNINITIALIZED;
- m_pError=NULL;
-}
-
Channel::Channel(boost::asio::io_service& ioService, const char* connStr):m_ioService(ioService){
m_pEndpoint=new ConnectionEndpoint(connStr);
- m_ownIoService = false;
m_pSocket=NULL;
m_state=CHANNEL_UNINITIALIZED;
m_pError=NULL;
@@ -287,7 +235,6 @@ Channel::Channel(boost::asio::io_service& ioService, const char* connStr):m_ioSe
Channel::Channel(boost::asio::io_service& ioService, const char* host, const char* port) : m_ioService(ioService){
m_pEndpoint=new ConnectionEndpoint(host, port);
- m_ownIoService = true;
m_pSocket=NULL;
m_state=CHANNEL_UNINITIALIZED;
m_pError=NULL;
@@ -311,10 +258,9 @@ template <typename SettableSocketOption> void Channel::setOption(SettableSocketO
assert(0);
}
-connectionStatus_t Channel::init(ChannelContext_t* pContext){
+connectionStatus_t Channel::init(){
connectionStatus_t ret=CONN_SUCCESS;
this->m_state=CHANNEL_INITIALIZED;
- this->m_pContext = pContext;
return ret;
}
@@ -389,11 +335,11 @@ connectionStatus_t Channel::connectInternal() {
}
-connectionStatus_t SocketChannel::init(ChannelContext_t* pContext){
+connectionStatus_t SocketChannel::init(){
connectionStatus_t ret=CONN_SUCCESS;
m_pSocket=new Socket(m_ioService);
if(m_pSocket!=NULL){
- ret=Channel::init(pContext);
+ ret=Channel::init();
}else{
DRILL_LOG(LOG_ERROR) << "Channel initialization failure. " << std::endl;
handleError(CONN_NOSOCKET, getMessage(ERR_CONN_NOSOCKET));
@@ -403,17 +349,17 @@ connectionStatus_t SocketChannel::init(ChannelContext_t* pContext){
}
#if defined(IS_SSL_ENABLED)
-connectionStatus_t SSLStreamChannel::init(ChannelContext_t* pContext){
+connectionStatus_t SSLStreamChannel::init(){
connectionStatus_t ret=CONN_SUCCESS;
- const DrillUserProperties* props = pContext->getUserProperties();
+ const DrillUserProperties* props = m_pContext->getUserProperties();
std::string useSystemTrustStore;
props->getProp(USERPROP_USESYSTEMTRUSTSTORE, useSystemTrustStore);
if (useSystemTrustStore != "true"){
std::string certFile;
props->getProp(USERPROP_CERTFILEPATH, certFile);
try{
- ((SSLChannelContext_t*)pContext)->getSslContext().load_verify_file(certFile);
+ ((SSLChannelContext_t*)m_pContext)->getSslContext().load_verify_file(certFile);
}
catch (boost::system::system_error e){
DRILL_LOG(LOG_ERROR) << "Channel initialization failure. Certificate file "
@@ -429,13 +375,13 @@ connectionStatus_t SSLStreamChannel::init(ChannelContext_t* pContext){
props->getProp(USERPROP_DISABLE_HOSTVERIFICATION, disableHostVerification);
if (disableHostVerification != "true") {
std::string hostPortStr = m_pEndpoint->getHost() + ":" + m_pEndpoint->getPort();
- ((SSLChannelContext_t *) pContext)->getSslContext().set_verify_callback(
+ ((SSLChannelContext_t *) m_pContext)->getSslContext().set_verify_callback(
boost::asio::ssl::rfc2818_verification(hostPortStr.c_str()));
}
- m_pSocket=new SslSocket(m_ioService, ((SSLChannelContext_t*)pContext)->getSslContext() );
+ m_pSocket=new SslSocket(m_ioService, ((SSLChannelContext_t*)m_pContext)->getSslContext() );
if(m_pSocket!=NULL){
- ret=Channel::init(pContext);
+ ret=Channel::init();
}else{
DRILL_LOG(LOG_ERROR) << "Channel initialization failure. " << std::endl;
handleError(CONN_NOSOCKET, getMessage(ERR_CONN_NOSOCKET));
diff --git a/contrib/native/client/src/clientlib/channel.hpp b/contrib/native/client/src/clientlib/channel.hpp
index 7f310e899..c7ebfeee6 100644
--- a/contrib/native/client/src/clientlib/channel.hpp
+++ b/contrib/native/client/src/clientlib/channel.hpp
@@ -36,9 +36,9 @@ class UserProperties;
//parse the connection string and set up the host and port to connect to
connectionStatus_t getDrillbitEndpoint();
- std::string& getProtocol(){return m_protocol;}
- std::string& getHost(){return m_host;}
- std::string& getPort(){return m_port;}
+ const std::string& getProtocol() const {return m_protocol;}
+ const std::string& getHost() const {return m_host;}
+ const std::string& getPort() const {return m_port;}
DrillClientError* getError(){ return m_pError;};
private:
@@ -70,25 +70,21 @@ class UserProperties;
class SSLChannelContext: public ChannelContext{
public:
- static boost::asio::ssl::context::method getTlsVersion(std::string version){
- if(version.empty()){
- return boost::asio::ssl::context::tlsv12;
- } else if (version == "tlsv12") {
+ static boost::asio::ssl::context::method getTlsVersion(const std::string & version){
+ if (version == "tlsv12") {
return boost::asio::ssl::context::tlsv12;
} else if (version == "tlsv11") {
return boost::asio::ssl::context::tlsv11;
- } else if (version == "sslv23") {
- return boost::asio::ssl::context::sslv23;
} else if (version == "tlsv1") {
return boost::asio::ssl::context::tlsv1;
- } else if (version == "sslv3") {
- return boost::asio::ssl::context::sslv3;
} else {
return boost::asio::ssl::context::tlsv12;
}
}
- SSLChannelContext(DrillUserProperties *props, boost::asio::ssl::context::method tlsVersion, boost::asio::ssl::verify_mode verifyMode) :
+ SSLChannelContext(DrillUserProperties *props,
+ boost::asio::ssl::context::method tlsVersion,
+ boost::asio::ssl::verify_mode verifyMode) :
ChannelContext(props),
m_SSLContext(tlsVersion) {
m_SSLContext.set_default_verify_paths();
@@ -108,11 +104,6 @@ class UserProperties;
typedef ChannelContext ChannelContext_t;
typedef SSLChannelContext SSLChannelContext_t;
- class ChannelContextFactory{
- public:
- static ChannelContext_t* getChannelContext(channelType_t t, DrillUserProperties* props);
- };
-
/***
* The Channel class encapsulates a connection to a drillbit. Based on
* the connection string and the options, the connection will be either
@@ -122,13 +113,12 @@ class UserProperties;
* will use to communicate with the server.
***/
class Channel{
+ friend class ChannelFactory;
public:
- Channel(const char* connStr);
- Channel(const char* host, const char* port);
Channel(boost::asio::io_service& ioService, const char* connStr);
Channel(boost::asio::io_service& ioService, const char* host, const char* port);
virtual ~Channel();
- virtual connectionStatus_t init(ChannelContext_t* context)=0;
+ virtual connectionStatus_t init()=0;
connectionStatus_t connect();
bool isConnected(){ return m_state == CHANNEL_CONNECTED;}
template <typename SettableSocketOption> void setOption(SettableSocketOption& option);
@@ -168,7 +158,6 @@ class UserProperties;
ChannelContext_t *m_pContext;
private:
-
typedef enum channelState{
CHANNEL_UNINITIALIZED=1,
CHANNEL_INITIALIZED,
@@ -189,45 +178,42 @@ class UserProperties;
channelState_t m_state;
DrillClientError* m_pError;
- bool m_ownIoService;
};
class SocketChannel: public Channel{
public:
- SocketChannel(const char* connStr):Channel(connStr){
- }
- SocketChannel(const char* host, const char* port):Channel(host, port){
- }
SocketChannel(boost::asio::io_service& ioService, const char* connStr)
:Channel(ioService, connStr){
}
SocketChannel(boost::asio::io_service& ioService, const char* host, const char* port)
:Channel(ioService, host, port){
}
- connectionStatus_t init(ChannelContext_t* context=NULL);
+ connectionStatus_t init();
};
class SSLStreamChannel: public Channel{
public:
- SSLStreamChannel(const char* connStr):Channel(connStr){
- }
- SSLStreamChannel(const char* host, const char* port):Channel(host, port){
- }
SSLStreamChannel(boost::asio::io_service& ioService, const char* connStr)
:Channel(ioService, connStr){
}
SSLStreamChannel(boost::asio::io_service& ioService, const char* host, const char* port)
:Channel(ioService, host, port){
}
- connectionStatus_t init(ChannelContext_t* context);
+ connectionStatus_t init();
};
class ChannelFactory{
public:
- static Channel* getChannel(channelType_t t, const char* connStr);
- static Channel* getChannel(channelType_t t, const char* host, const char* port);
- static Channel* getChannel(channelType_t t, boost::asio::io_service& ioService, const char* connStr);
- static Channel* getChannel(channelType_t t, boost::asio::io_service& ioService, const char* host, const char* port);
+ static Channel* getChannel(channelType_t t,
+ boost::asio::io_service& ioService,
+ const char* connStr, DrillUserProperties* props);
+ static Channel* getChannel(channelType_t t,
+ boost::asio::io_service& ioService,
+ const char* host,
+ const char* port,
+ DrillUserProperties* props);
+ private:
+ static ChannelContext_t* getChannelContext(channelType_t t, DrillUserProperties* props);
};
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 39ac847c6..f0bb636b3 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -19,31 +19,22 @@
#include "drill/common.hpp"
#include <queue>
-#include <string>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/assign.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/functional/factory.hpp>
-#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include "drill/drillClient.hpp"
#include "drill/fieldmeta.hpp"
#include "drill/recordBatch.hpp"
+#include "drill/userProperties.hpp"
#include "drillClientImpl.hpp"
-#include "collectionsImpl.hpp"
#include "errmsgs.hpp"
#include "logger.hpp"
-#include "metadata.hpp"
-#include "rpcMessage.hpp"
-#include "utils.hpp"
-#include "GeneralRPC.pb.h"
-#include "UserBitShared.pb.h"
#include "zookeeperClient.hpp"
-#include "saslAuthenticatorImpl.hpp"
namespace Drill{
namespace { // anonymous namespace
@@ -65,108 +56,69 @@ struct ToRpcType: public std::unary_function<google::protobuf::int32, exec::user
return static_cast<exec::user::RpcType>(i);
}
};
-}
-connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
- std::string pathToDrill, protocol, hostPortStr;
- std::string host;
- std::string port;
+} // anonymous
+connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
if (this->m_bIsConnected) {
- if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected
+ if(!std::strcmp(connStr, m_connectStr.c_str())){
+ // trying to connect to a different address is not allowed if already connected
return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
}
return CONN_SUCCESS;
}
-
- m_connectStr=connStr;
- Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
- if(protocol == "zk"){
- ZookeeperClient zook(pathToDrill);
- std::vector<std::string> drillbits;
- int err = zook.getAllDrillbits(hostPortStr, drillbits);
- if(!err){
- if (drillbits.empty()){
- return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT));
- }
- Utils::shuffle(drillbits);
- exec::DrillbitEndpoint endpoint;
- err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list
- if(!err){
- host=boost::lexical_cast<std::string>(endpoint.address());
- port=boost::lexical_cast<std::string>(endpoint.user_port());
- }
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1) << ">. Selected " << endpoint.DebugString() << std::endl;)
-
- }
- if(err){
- return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
- }
- zook.close();
- m_bIsDirectConnection=true;
- }else if(protocol == "local"){
- boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
- char tempStr[MAX_CONNECT_STR+1];
- strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
- host=strtok(tempStr, ":");
- port=strtok(NULL, "");
- m_bIsDirectConnection=false;
- }else{
- return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
- }
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
- std::string serviceHost;
- for (size_t i = 0; i < props->size(); i++) {
- if (props->keyAt(i) == USERPROP_SERVICE_HOST) {
- serviceHost = props->valueAt(i);
- }
+ std::string val;
+ channelType_t type = ( props->isPropSet(USERPROP_USESSL) &&
+ props->getProp(USERPROP_USESSL, val) =="true") ?
+ CHANNEL_TYPE_SSLSTREAM :
+ CHANNEL_TYPE_SOCKET;
+
+ connectionStatus_t ret = CONN_SUCCESS;
+ m_pChannel= ChannelFactory::getChannel(type, m_io_service, connStr, props);
+ ret=m_pChannel->init();
+ if(ret!=CONN_SUCCESS){
+ handleConnError(m_pChannel->getError());
+ return ret;
}
- if (serviceHost.empty()) {
- props->setProperty(USERPROP_SERVICE_HOST, host);
+ ret= m_pChannel->connect();
+ if(ret!=CONN_SUCCESS){
+ handleConnError(m_pChannel->getError());
+ return ret;
}
- connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
+ props->setProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost());
+ m_bIsConnected = true;
return ret;
}
-connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
- using boost::asio::ip::tcp;
- tcp::endpoint endpoint;
- try{
- tcp::resolver resolver(m_io_service);
- tcp::resolver::query query(tcp::v4(), host, port);
- tcp::resolver::iterator iter = resolver.resolve(query);
- tcp::resolver::iterator end;
- while (iter != end){
- endpoint = *iter++;
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;)
- }
- boost::system::error_code ec;
- m_socket.connect(endpoint, ec);
- if(ec){
- return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str()));
- }
-
- }catch(const std::exception & e){
- // Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve
- if (!strcmp(e.what(), "resolve")) {
- return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what()));
+connectionStatus_t DrillClientImpl::connect(const char* host, const char* port, DrillUserProperties* props){
+ if (this->m_bIsConnected) {
+ std::string connStr = std::string(host)+":"+std::string(port);
+ if(!std::strcmp(connStr.c_str(), m_connectStr.c_str())){
+ // trying to connect to a different address is not allowed if already connected
+ return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
}
- return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
+ return CONN_SUCCESS;
}
-
- m_bIsConnected=true;
- // set socket keep alive
- boost::asio::socket_base::keep_alive keepAlive(true);
- m_socket.set_option(keepAlive);
- // set no_delay
- boost::asio::ip::tcp::no_delay noDelay(true);
- m_socket.set_option(noDelay);
-
- std::ostringstream connectedHost;
- connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
- m_connectedHost = connectedHost.str();
- DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)
-
- return CONN_SUCCESS;
+ std::string val;
+ channelType_t type = ( props->isPropSet(USERPROP_USESSL) &&
+ props->getProp(USERPROP_USESSL, val) =="true") ?
+ CHANNEL_TYPE_SSLSTREAM :
+ CHANNEL_TYPE_SOCKET;
+
+ connectionStatus_t ret = CONN_SUCCESS;
+ m_pChannel= ChannelFactory::getChannel(type, m_io_service, host, port, props);
+ ret=m_pChannel->init();
+ if(ret!=CONN_SUCCESS){
+ handleConnError(m_pChannel->getError());
+ return ret;
+ }
+ ret=m_pChannel->connect();
+ if(ret!=CONN_SUCCESS){
+ handleConnError(m_pChannel->getError());
+ return ret;
+ }
+ props->setProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost());
+ m_bIsConnected = true;
+ return ret;
}
void DrillClientImpl::startHeartbeatTimer(){
@@ -250,7 +202,15 @@ void DrillClientImpl::doWriteToSocket(const char* dataPtr, size_t bytesToWrite,
// Write all the bytes to socket. In case of error when all bytes are not successfully written
// proper errorCode will be set.
while(1) {
- size_t bytesWritten = m_socket.write_some(boost::asio::buffer(dataPtr, bytesToWrite), errorCode);
+ size_t bytesWritten;
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if(m_pChannel==NULL){
+ return;
+ }
+ bytesWritten = m_pChannel->getSocketStream().writeSome(boost::asio::buffer(dataPtr, bytesToWrite),
+ errorCode);
+ }
if(errorCode && boost::asio::error::interrupted != errorCode){
break;
@@ -359,8 +319,10 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
}
m_io_service.reset();
- if (DrillClientConfig::getHandshakeTimeout() > 0){
- m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHandshakeTimeout()));
+
+ int32_t handshakeTimeout=DrillClientConfig::getHandshakeTimeout();
+ if (handshakeTimeout > 0){
+ m_deadlineTimer.expires_from_now(boost::posix_time::seconds(handshakeTimeout));
m_deadlineTimer.async_wait(boost::bind(
&DrillClientImpl::handleHShakeReadTimeout,
this,
@@ -370,16 +332,21 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
<< DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;)
}
- async_read(
- this->m_socket,
- boost::asio::buffer(m_rbuf, LEN_PREFIX_BUFLEN),
- boost::bind(
- &DrillClientImpl::handleHandshake,
- this,
- m_rbuf,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred)
- );
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if (m_pChannel == NULL) {
+ return CONN_NOSOCKET;
+ }
+ m_pChannel->getSocketStream().asyncRead(
+ boost::asio::buffer(m_rbuf, LEN_PREFIX_BUFLEN),
+ boost::bind(
+ &DrillClientImpl::handleHandshake,
+ this,
+ m_rbuf,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred)
+ );
+ }
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";)
m_io_service.run();
if(m_rbuf!=NULL){
@@ -418,8 +385,15 @@ void DrillClientImpl::doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead,
// Read all the bytes. In case when all the bytes were not read the proper
// errorCode will be set.
while(1){
- size_t dataBytesRead = m_socket.read_some(boost::asio::buffer(inBuf, bytesToRead),
+ size_t dataBytesRead;
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if(m_pChannel==NULL){
+ return;
+ }
+ dataBytesRead = m_pChannel->getSocketStream().readSome(boost::asio::buffer(inBuf, bytesToRead),
errorCode);
+ }
// Check if errorCode is EINTR then just retry otherwise break from loop
if(errorCode && boost::asio::error::interrupted != errorCode){
break;
@@ -518,8 +492,10 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
<< "Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
m_io_service.stop();
- boost::system::error_code ignorederr;
- m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if(m_pChannel != NULL) m_pChannel->close();
+ }
}
}
return;
@@ -1027,16 +1003,21 @@ void DrillClientImpl::getNextResult(){
startHeartbeatTimer();
- async_read(
- this->m_socket,
- boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
- boost::bind(
- &DrillClientImpl::handleRead,
- this,
- readBuf,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred)
- );
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if (m_pChannel == NULL) {
+ return;
+ }
+ m_pChannel->getSocketStream().asyncRead(
+ boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
+ boost::bind(
+ &DrillClientImpl::handleRead,
+ this,
+ readBuf,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred)
+ );
+ }
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";)
}
@@ -1937,10 +1918,16 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
// defined. To be really sure, we need to close the socket. Closing the socket is a bit
// drastic and we will defer that till a later release.
#ifdef WIN32_SHUTDOWN_ON_TIMEOUT
- boost::system::error_code ignorederr;
- m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if(m_pChannel != NULL) m_pChannel->close();
+ }
+ m_pChannel->close();
#else // NOT WIN32_SHUTDOWN_ON_TIMEOUT
- m_socket.cancel();
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if(m_pChannel != NULL) m_pChannel->getInnerSocket().cancel();
+ }
#endif // WIN32_SHUTDOWN_ON_TIMEOUT
}
}
@@ -2149,6 +2136,20 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, c
return status;
}
+connectionStatus_t DrillClientImpl::handleConnError(DrillClientError* err){
+ DrillClientError* pErr = new DrillClientError(*err);
+ m_pendingRequests=0;
+ if(!m_queryHandles.empty()){
+ // set query error only if queries are running
+ broadcastError(pErr);
+ }else{
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+ m_pError=pErr;
+ shutdownSocket();
+ }
+ return (connectionStatus_t)pErr->status;
+}
+
/*
* Always called with NULL QueryHandle when there is any error while reading data from socket. Once enough data is read
* and a valid RPC message is formed then it can get called with NULL/valid QueryHandle depending on if QueryHandle is found
@@ -2268,9 +2269,16 @@ void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){
}
void DrillClientImpl::shutdownSocket(){
+ m_pendingRequests=0;
+ m_heartbeatTimer.cancel();
+ m_deadlineTimer.cancel();
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if (m_pChannel != NULL) {
+ m_pChannel->close();
+ }
+ }
m_io_service.stop();
- boost::system::error_code ignorederr;
- m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
m_bIsConnected=false;
// Delete the saslAuthenticatorImpl instance since connection is broken. It will recreated on next
@@ -2697,7 +2705,7 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr, DrillUser
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
- stat = pDrillClientImpl->connect(host.c_str(), port.c_str());
+ stat = pDrillClientImpl->connect(host.c_str(), port.c_str(), props);
if(stat == CONN_SUCCESS){
boost::lock_guard<boost::mutex> lock(m_poolMutex);
m_clientConnections.push_back(pDrillClientImpl);
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index dacc2c30a..dc4a67eaa 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -45,6 +45,7 @@
#include "drill/drillConfig.hpp"
#include "drill/drillError.hpp"
#include "drill/preparedStatement.hpp"
+#include "channel.hpp"
#include "collectionsImpl.hpp"
#include "metadata.hpp"
#include "rpcMessage.hpp"
@@ -386,7 +387,8 @@ class DrillClientImpl : public DrillClientImplBase{
m_pError(NULL),
m_pListenerThread(NULL),
m_pWork(NULL),
- m_socket(m_io_service),
+ m_pChannel(NULL),
+ m_pChannelContext(NULL),
m_deadlineTimer(m_io_service),
m_heartbeatTimer(m_io_service),
m_rbuf(NULL),
@@ -399,9 +401,11 @@ class DrillClientImpl : public DrillClientImplBase{
};
~DrillClientImpl(){
- //TODO: Cleanup.
- //Free any record batches or buffers remaining
//Cancel any pending requests
+ m_heartbeatTimer.cancel();
+ m_deadlineTimer.cancel();
+ m_io_service.stop();
+ //Free any record batches or buffers remaining
//Clear and destroy DrillClientQueryResults vector?
if(this->m_pWork!=NULL){
delete this->m_pWork;
@@ -411,13 +415,19 @@ class DrillClientImpl : public DrillClientImplBase{
delete this->m_saslAuthenticator;
this->m_saslAuthenticator = NULL;
}
+ {
+ boost::lock_guard<boost::mutex> lock(m_channelMutex);
+ if (this->m_pChannel != NULL) {
+ m_pChannel->close();
+ delete this->m_pChannel;
+ this->m_pChannel = NULL;
+ }
+ if (this->m_pChannelContext != NULL) {
+ delete this->m_pChannelContext;
+ this->m_pChannelContext = NULL;
+ }
+ }
- m_heartbeatTimer.cancel();
- m_deadlineTimer.cancel();
- m_io_service.stop();
- boost::system::error_code ignorederr;
- m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
- m_socket.close();
if(m_rbuf!=NULL){
Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
}
@@ -442,6 +452,8 @@ class DrillClientImpl : public DrillClientImplBase{
//Connect via Zookeeper or directly
connectionStatus_t connect(const char* connStr, DrillUserProperties* props);
+ connectionStatus_t connect(const char* host, const char* port, DrillUserProperties* props);
+
// test whether the client is active
bool Active();
void Close() ;
@@ -523,6 +535,7 @@ class DrillClientImpl : public DrillClientImplBase{
status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
bool validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg);
connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
+ connectionStatus_t handleConnError(DrillClientError* err);
status_t handleQryCancellation(status_t status, DrillClientQueryResult* pQueryResult);
status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle);
status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle);
@@ -603,7 +616,12 @@ class DrillClientImpl : public DrillClientImplBase{
boost::asio::io_service m_io_service;
// the work object prevent io_service running out of work
boost::asio::io_service::work * m_pWork;
- boost::asio::ip::tcp::socket m_socket;
+
+ // Mutex to protect channel
+ boost::mutex m_channelMutex;
+ Channel* m_pChannel;
+ ChannelContext_t* m_pChannelContext;
+
boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return
boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages
diff --git a/contrib/native/client/src/clientlib/drillConfig.cpp b/contrib/native/client/src/clientlib/drillConfig.cpp
index abaa79aff..90a751a19 100644
--- a/contrib/native/client/src/clientlib/drillConfig.cpp
+++ b/contrib/native/client/src/clientlib/drillConfig.cpp
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-
+#include <boost/thread/lock_guard.hpp>
#include "drill/common.hpp"
#include "drill/drillConfig.hpp"
#include "env.h"
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index 012bd1974..d8e2da78d 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -165,6 +165,10 @@ typedef enum{
RET_FAILURE=1
} ret_t;
+// Connect string protocol types
+#define PROTOCOL_TYPE_ZK "zk"
+#define PROTOCOL_TYPE_DIRECT "drillbit"
+#define PROTOCOL_TYPE_DIRECT_2 "local"
// User Property Names
#define USERPROP_USERNAME "userName"
@@ -173,7 +177,8 @@ typedef enum{
#define USERPROP_USESSL "enableTLS"
#define USERPROP_TLSPROTOCOL "TLSProtocol" //TLS version
#define USERPROP_CERTFILEPATH "certFilePath" // pem file path and name
-#define USERPROP_CERTPASSWORD "certPassword" // Password for certificate file
+// TODO: support truststore protected by password.
+// #define USERPROP_CERTPASSWORD "certPassword" // Password for certificate file.
#define USERPROP_DISABLE_HOSTVERIFICATION "disableHostVerification"
#define USERPROP_DISABLE_CERTVERIFICATION "disableCertVerification"
#define USERPROP_USESYSTEMTRUSTSTORE "useSystemTrustStore" //Windows only, use the system trust store
diff --git a/contrib/native/client/src/include/drill/drillConfig.hpp b/contrib/native/client/src/include/drill/drillConfig.hpp
index 669267d86..46bbbb2d2 100644
--- a/contrib/native/client/src/include/drill/drillConfig.hpp
+++ b/contrib/native/client/src/include/drill/drillConfig.hpp
@@ -21,27 +21,7 @@
#define DRILL_CONFIG_H
#include "drill/common.hpp"
-#include <boost/thread.hpp>
-
-
-
-#if defined _WIN32 || defined __CYGWIN__
- #ifdef DRILL_CLIENT_EXPORTS
- #define DECLSPEC_DRILL_CLIENT __declspec(dllexport)
- #else
- #ifdef USE_STATIC_LIBDRILL
- #define DECLSPEC_DRILL_CLIENT
- #else
- #define DECLSPEC_DRILL_CLIENT __declspec(dllimport)
- #endif
- #endif
-#else
- #if __GNUC__ >= 4
- #define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default")))
- #else
- #define DECLSPEC_DRILL_CLIENT
- #endif
-#endif
+#include <boost/thread/mutex.hpp>
namespace exec{
namespace shared{
diff --git a/contrib/native/client/src/include/drill/userProperties.hpp b/contrib/native/client/src/include/drill/userProperties.hpp
index 3490dce7a..62a04f787 100644
--- a/contrib/native/client/src/include/drill/userProperties.hpp
+++ b/contrib/native/client/src/include/drill/userProperties.hpp
@@ -36,20 +36,17 @@ class DECLSPEC_DRILL_CLIENT DrillUserProperties{
size_t size() const { return m_properties.size(); }
- //const std::string& keyAt(size_t i) const { return m_properties.at(i).first; }
-
- //const std::string& valueAt(size_t i) const { return m_properties.at(i).second; }
-
const bool isPropSet(const std::string& key) const{
bool isSet=true;
- auto f= m_properties.find(key);
+ std::map<std::string, std::string>::const_iterator f=m_properties.find(key);
if(f==m_properties.end()){
isSet=false;
}
return isSet;
}
+
const std::string& getProp(const std::string& key, std::string& value) const{
- auto f= m_properties.find(key);
+ std::map<std::string, std::string>::const_iterator f=m_properties.find(key);
if(f!=m_properties.end()){
value=f->second;
}
diff --git a/contrib/native/client/test/ssl/testSSL.cpp b/contrib/native/client/test/ssl/testSSL.cpp
index 3eaac4876..426256616 100644
--- a/contrib/native/client/test/ssl/testSSL.cpp
+++ b/contrib/native/client/test/ssl/testSSL.cpp
@@ -330,6 +330,7 @@ int main(int argc, char* argv[]){
std::string connectStr = "zk=localhost:2181/drill/drillbits1";
//std::string connectStr = "drillbit=localhost:31090";
channelType_t type;
+ boost::asio::io_service ioService;
bool isSSL = argc==2 && !(strcmp(argv[1], "ssl"));
type = CHANNEL_TYPE_SOCKET;
@@ -341,12 +342,10 @@ int main(int argc, char* argv[]){
props.setProperty(USERPROP_PASSWORD, "admin");
props.setProperty(USERPROP_CERTFILEPATH, "../../../test/ssl/drillTestCert.pem");
- pChannelContext = ChannelContextFactory::getChannelContext(type, &props);
-
- pChannel = ChannelFactory::getChannel(type, connectStr.c_str());
+ pChannel = ChannelFactory::getChannel(type, ioService, connectStr.c_str(), &props);
if(pChannel != NULL){
connectionStatus_t connStat;
- connStat = pChannel->init(pChannelContext);
+ connStat = pChannel->init();
if(connStat != CONN_SUCCESS){
std::cout << "Init Failed." << std::endl;
return -1;