aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client
diff options
context:
space:
mode:
authorParth Chandra <parthc@apache.org>2016-02-12 15:42:53 -0800
committerParth Chandra <parthc@apache.org>2016-03-07 17:49:50 -0800
commitdf0f0af3d963c1b65eb01c3141fe84532c53f5a5 (patch)
tree1aee84f184cddafb7e89815a87dbf29d9fadaf20 /contrib/native/client
parenta2fec78695df979e240231cb9d32c7f18274a333 (diff)
DRILL-4313: C++ Client - Thread safe Logging. Improved Drill bit selection.
- Update random drill bit selection. Shuffle the list initially, then round robin. Add Utility methods to get random numbers and to shuffle and add vectors. Whitespace cleanup - Add Git properties to build and print to log. - Add interface to get error based on query handle. - Add support for Pooled connections. Allows switching between pooled and unpooled connections based on environment variables
Diffstat (limited to 'contrib/native/client')
-rw-r--r--contrib/native/client/CMakeLists.txt24
-rw-r--r--contrib/native/client/cmakeModules/FindZookeeper.cmake2
-rw-r--r--contrib/native/client/example/querySubmitter.cpp25
-rw-r--r--contrib/native/client/src/clientlib/drillClient.cpp33
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.cpp600
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp169
-rw-r--r--contrib/native/client/src/clientlib/env.h.in26
-rw-r--r--contrib/native/client/src/clientlib/errmsgs.cpp2
-rw-r--r--contrib/native/client/src/clientlib/errmsgs.hpp4
-rw-r--r--contrib/native/client/src/clientlib/logger.cpp126
-rw-r--r--contrib/native/client/src/clientlib/logger.hpp85
-rw-r--r--contrib/native/client/src/clientlib/utils.cpp109
-rw-r--r--contrib/native/client/src/clientlib/utils.hpp100
-rw-r--r--contrib/native/client/src/include/drill/common.hpp9
-rw-r--r--contrib/native/client/src/include/drill/drillClient.hpp7
15 files changed, 1001 insertions, 320 deletions
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt
index 603586d0e..b22af42f7 100644
--- a/contrib/native/client/CMakeLists.txt
+++ b/contrib/native/client/CMakeLists.txt
@@ -22,8 +22,20 @@ project(drillclient)
message("Project Dir = ${PROJECT_SOURCE_DIR}")
message("Source Dir = ${CMAKE_SOURCE_DIR} ")
+cmake_policy(SET CMP0043 NEW)
+
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmakeModules/")
+# Get the latest git commit properties of the working branch
+execute_process(
+ COMMAND git log -1 --format="\\nCommit: %H \\nDescription: %s \\nAuthor: %aN Date: %ai"
+ WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
+ OUTPUT_VARIABLE GIT_COMMIT_PROP
+ OUTPUT_STRIP_TRAILING_WHITESPACE
+ )
+add_definitions("-DGIT_COMMIT_PROP=${GIT_COMMIT_PROP}")
+
+
# Find Boost
if(MSVC)
@@ -36,7 +48,7 @@ else()
set(Boost_USE_STATIC_RUNTIME OFF)
endif()
-find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread )
+find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread random)
include_directories(${Boost_INCLUDE_DIRS})
if(CMAKE_COMPILER_IS_GNUCXX)
@@ -63,6 +75,16 @@ include_directories(${PROTOBUF_INCLUDE_DIR})
#Find Zookeeper
find_package(Zookeeper REQUIRED )
+
+# Generated sources
+configure_file(
+ ${CMAKE_SOURCE_DIR}/src/clientlib/env.h.in
+ ${CMAKE_BINARY_DIR}/generated/env.h
+ )
+
+include_directories(${CMAKE_BINARY_DIR}/generated)
+
+
#
# TARGETS
#
diff --git a/contrib/native/client/cmakeModules/FindZookeeper.cmake b/contrib/native/client/cmakeModules/FindZookeeper.cmake
index fd8247f28..151c05cda 100644
--- a/contrib/native/client/cmakeModules/FindZookeeper.cmake
+++ b/contrib/native/client/cmakeModules/FindZookeeper.cmake
@@ -40,7 +40,7 @@ if (MSVC)
message("- CMAKE will look for zookeeper library files in $ZOOKEEPER_HOME/src/c/Debug or $ZOOKEEPER_HOME/src/c/Release.")
else()
FILE(TO_CMAKE_PATH ${ZOOKEEPER_HOME} Zookeeper_HomePath)
- set(Zookeeper_LIB_PATHS ${Zookeeper_HomePath}/src/c/${ZK_BuildOutputDir})
+ set(Zookeeper_LIB_PATHS ${Zookeeper_HomePath}/src/c/${ZK_BuildOutputDir} ${Zookeeper_HomePath}/src/c/x64/${ZK_BuildOutputDir} )
find_path(ZK_INCLUDE_DIR zookeeper.h ${Zookeeper_HomePath}/src/c/include)
find_path(ZK_INCLUDE_DIR_GEN zookeeper.jute.h ${Zookeeper_HomePath}/src/c/generated)
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 960ff4f44..d507d1bb2 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -20,6 +20,7 @@
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
+#include <boost/thread.hpp>
#include "drill/drillc.hpp"
int nOptions=13;
@@ -65,11 +66,13 @@ Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::Dril
}
}
+boost::mutex listenerMutex;
Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::DrillClientError* err){
// Invariant:
// (received an record batch and err is NULL)
// or
// (received query state message passed by `err` and b is NULL)
+ boost::lock_guard<boost::mutex> listenerLock(listenerMutex);
if(!err){
if(b!=NULL){
b->print(std::cout, 0); // print all rows
@@ -317,16 +320,24 @@ int main(int argc, char* argv[]) {
std::vector<Drill::QueryHandle_t*>::iterator queryHandleIter;
Drill::DrillClient client;
- // To log to file
- //DrillClient::initLogging("/var/log/drill/", l);
+#if defined _WIN32 || defined _WIN64
+ TCHAR tempPath[MAX_PATH];
+ GetTempPath(MAX_PATH, tempPath);
+ char logpathPrefix[MAX_PATH + 128];
+ strcpy(logpathPrefix,tempPath);
+ strcat(logpathPrefix, "\\drillclient");
+#else
+ char* logpathPrefix = "/var/log/drill/drillclient";
+#endif
+ // To log to file
+ Drill::DrillClient::initLogging(logpathPrefix, l);
// To log to stderr
- Drill::DrillClient::initLogging(NULL, l);
- //Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. Allows us to hold at least two record batches.
- int nQueries=queryInputs.size();
- Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. Allows us to hold at least two record batches.
+ //Drill::DrillClient::initLogging(NULL, l);
+ int nQueries=queryInputs.size();
+ Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. The size of a record batch may vary, but is unlikely to exceed the 256 MB which is the default.
- if (!hshakeTimeout.empty()){
+ if(!hshakeTimeout.empty()){
Drill::DrillClientConfig::setHandshakeTimeout(atoi(hshakeTimeout.c_str()));
}
if (!queryTimeout.empty()){
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 708793867..92c5194d6 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-
+#include <stdlib.h>
#include <boost/assign.hpp>
#include "drill/common.hpp"
#include "drill/drillClient.hpp"
@@ -56,21 +56,22 @@ int32_t DrillClientConfig::s_heartbeatFrequency=15; // 15 seconds
boost::mutex DrillClientConfig::s_mutex;
DrillClientConfig::DrillClientConfig(){
- initLogging(NULL);
+ // Do not initialize logging. The Logger object is static and may
+ // not have been initialized yet
+ //initLogging(NULL);
}
DrillClientConfig::~DrillClientConfig(){
- Logger::close();
}
void DrillClientConfig::initLogging(const char* path){
- Logger::init(path);
+ getLogger().init(path);
}
void DrillClientConfig::setLogLevel(logLevel_t l){
boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
s_logLevel=l;
- Logger::s_level=l;
+ getLogger().m_level=l;
//boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel);
}
@@ -163,7 +164,7 @@ RecordIterator::~RecordIterator(){
delete this->m_pQueryResult;
this->m_pQueryResult=NULL;
if(this->m_pCurrentRecordBatch!=NULL){
- DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl;)
delete this->m_pCurrentRecordBatch; this->m_pCurrentRecordBatch=NULL;
}
}
@@ -224,7 +225,7 @@ status_t RecordIterator::next(){
if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){
boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex);
if(this->m_pCurrentRecordBatch !=NULL){
- DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;)
delete this->m_pCurrentRecordBatch; //free the previous record batch
this->m_pCurrentRecordBatch=NULL;
}
@@ -235,12 +236,12 @@ status_t RecordIterator::next(){
}
this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
if(this->m_pCurrentRecordBatch != NULL){
- DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;)
}else{
- DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;)
}
if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){
- DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;)
ret = QRY_NO_MORE_DATA;
}else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){
ret=QRY_SUCCESS_WITH_INFO;
@@ -315,7 +316,12 @@ void DrillClient::initLogging(const char* path, logLevel_t l){
}
DrillClient::DrillClient(){
- this->m_pImpl=new DrillClientImpl;
+ const char* enablePooledClient=std::getenv(ENABLE_CONNECTION_POOL_ENV);
+ if(enablePooledClient!=NULL && atoi(enablePooledClient)!=0){
+ this->m_pImpl=new PooledDrillClientImpl;
+ }else{
+ this->m_pImpl=new DrillClientImpl;
+ }
}
DrillClient::~DrillClient(){
@@ -378,10 +384,12 @@ RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string&
}
void* DrillClient::getApplicationContext(QueryHandle_t handle){
+ assert(handle!=NULL);
return ((DrillClientQueryResult*)handle)->getListenerContext();
}
status_t DrillClient::getQueryStatus(QueryHandle_t handle){
+ assert(handle!=NULL);
return ((DrillClientQueryResult*)handle)->getQueryStatus();
}
@@ -389,6 +397,9 @@ std::string& DrillClient::getError(){
return m_pImpl->getError()->msg;
}
+const std::string& DrillClient::getError(QueryHandle_t handle){
+ return ((DrillClientQueryResult*)handle)->getError()->msg;
+}
void DrillClient::waitForResults(){
this->m_pImpl->waitForResults();
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index d4e9ed96e..3ec01f521 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -78,47 +78,46 @@ void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){
#endif
}
-
-void DrillClientImpl::parseConnectStr(const char* connectStr,
- std::string& pathToDrill,
- std::string& protocol,
- std::string& hostPortStr){
- char u[MAX_CONNECT_STR+1];
- strncpy(u,connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR]=0;
- char* z=strtok(u, "=");
- char* c=strtok(NULL, "/");
- char* p=strtok(NULL, "");
-
- if(p!=NULL) pathToDrill=std::string("/")+p;
- protocol=z; hostPortStr=c;
- return;
-}
-
connectionStatus_t DrillClientImpl::connect(const char* connStr){
std::string pathToDrill, protocol, hostPortStr;
std::string host;
std::string port;
if(!this->m_bIsConnected){
- parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+ m_connectStr=connStr;
+ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
if(!strcmp(protocol.c_str(), "zk")){
ZookeeperImpl zook;
- if(zook.connectToZookeeper(hostPortStr.c_str(), pathToDrill.c_str())!=0){
+ std::vector<std::string> drillbits;
+ int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+ if(!err){
+ Utils::shuffle(drillbits);
+ exec::DrillbitEndpoint endpoint;
+ err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list
+ if(!err){
+ host=boost::lexical_cast<std::string>(endpoint.address());
+ port=boost::lexical_cast<std::string>(endpoint.user_port());
+ }
+ }
+ if(err){
return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
}
- zook.debugPrint();
- exec::DrillbitEndpoint e=zook.getEndPoint();
- host=boost::lexical_cast<std::string>(e.address());
- port=boost::lexical_cast<std::string>(e.user_port());
zook.close();
+ m_bIsDirectConnection=true;
}else if(!strcmp(protocol.c_str(), "local")){
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
char tempStr[MAX_CONNECT_STR+1];
strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
host=strtok(tempStr, ":");
port=strtok(NULL, "");
+ m_bIsDirectConnection=false;
}else{
return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
}
- return this->connect(host.c_str(), port.c_str());
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
+ connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
+ return ret;
+ }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect to a different address is not allowed if already connected
+ return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
}
return CONN_SUCCESS;
}
@@ -133,7 +132,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
tcp::resolver::iterator end;
while (iter != end){
endpoint = *iter++;
- DRILL_LOG(LOG_TRACE) << endpoint << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;)
}
boost::system::error_code ec;
m_socket.connect(endpoint, ec);
@@ -149,6 +148,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
}
+ m_bIsConnected=true;
// set socket keep alive
boost::asio::socket_base::keep_alive keepAlive(true);
m_socket.set_option(keepAlive);
@@ -156,35 +156,34 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
boost::asio::ip::tcp::no_delay noDelay(true);
m_socket.set_option(noDelay);
- //
- // We put some OS dependent code here for timing out a socket. Mostly, this appears to
- // do nothing. Should we leave it in there?
- //
- setSocketTimeout(m_socket, DrillClientConfig::getSocketTimeout());
-
+ std::ostringstream connectedHost;
+ connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
+ m_connectedHost = connectedHost.str();
+ DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)
+
return CONN_SUCCESS;
}
void DrillClientImpl::startHeartbeatTimer(){
- DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
- << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
+ << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;)
m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
m_heartbeatTimer.async_wait(boost::bind(
&DrillClientImpl::handleHeartbeatTimeout,
this,
boost::asio::placeholders::error
));
- startMessageListener(); // start this thread early so we don't have the timer blocked
+ startMessageListener(); // start this thread early so we don't have the timer blocked
}
connectionStatus_t DrillClientImpl::sendHeartbeat(){
- connectionStatus_t status=CONN_SUCCESS;
+ connectionStatus_t status=CONN_SUCCESS;
exec::rpc::Ack ack;
ack.set_ok(true);
OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
- boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
- boost::lock_guard<boost::mutex> lock(m_dcMutex);
- DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;
+ boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
status=sendSync(heartbeatMsg);
status=status==CONN_SUCCESS?status:CONN_DEAD;
//If the server sends responses to a heartbeat, we need to increment the pending requests counter.
@@ -196,21 +195,19 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
void DrillClientImpl::resetHeartbeatTimer(){
m_heartbeatTimer.cancel();
- DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;)
startHeartbeatTimer();
}
-
-
void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
if(err != boost::asio::error::operation_aborted){
// Check whether the deadline has passed.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
<< to_simple_string(m_heartbeatTimer.expires_at())
<< " and time now is: "
<< to_simple_string(boost::asio::deadline_timer::traits_type::now())
- << std::endl;
+ << std::endl;)
;
if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
@@ -219,7 +216,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
startHeartbeatTimer();
}else{
// Close connection.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";)
shutdownSocket();
}
}
@@ -227,7 +224,6 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
return;
}
-
void DrillClientImpl::Close() {
shutdownSocket();
}
@@ -257,8 +253,8 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
this,
boost::asio::placeholders::error
));
- DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
- << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
+ << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;)
}
async_read(
@@ -271,7 +267,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)
);
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";)
m_io_service.run();
if(m_rbuf!=NULL){
Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
@@ -292,7 +288,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
boost::system::error_code error=err;
// cancel the timer
m_deadlineTimer.cancel();
- DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;)
if(!error){
InBoundRpcMessage msg;
uint32_t length = 0;
@@ -306,14 +302,14 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
boost::asio::buffer(b, bytesToRead),
error);
if(err) break;
- DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;)
if(dataBytesRead==bytesToRead) break;
bytesToRead-=dataBytesRead;
b+=dataBytesRead;
}
DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
}else{
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";)
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
return;
}
@@ -344,7 +340,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
m_io_service.stop();
boost::system::error_code ignorederr;
@@ -356,7 +352,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){
- DRILL_LOG(LOG_TRACE) << "validateHandShake\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";)
exec::user::UserToBitHandshake u2b;
u2b.set_channel(exec::shared::USER);
@@ -368,7 +364,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
std::string username;
std::string err;
if(!properties->validate(err)){
- DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;)
}
exec::user::UserProperties* userProperties = u2b.mutable_properties();
@@ -376,8 +372,8 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
for(size_t i=0; i<properties->size(); i++){
std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i));
if(it==DrillUserProperties::USER_PROPERTIES.end()){
- DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i)
- << ") is unknown and is being skipped" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i)
+ << ") is unknown and is being skipped" << std::endl;)
continue;
}
if(IS_BITSET((*it).second,USERPROP_FLAGS_SERVERPROP)){
@@ -392,9 +388,9 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
//u2b.set_credentials(&creds);
}
if(IS_BITSET((*it).second,USERPROP_FLAGS_PASSWORD)){
- DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ": ********** " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ": ********** " << std::endl;)
}else{
- DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl;)
}
}// Server properties
}
@@ -406,7 +402,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
sendSync(out_msg);
- DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
}
connectionStatus_t ret = recvHandshake();
@@ -416,21 +412,21 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
if(this->m_handshakeStatus != exec::user::SUCCESS){
switch(this->m_handshakeStatus){
case exec::user::RPC_VERSION_MISMATCH:
- DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected "
- << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected "
+ << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;)
return handleConnError(CONN_BAD_RPC_VER,
getMessage(ERR_CONN_BAD_RPC_VER, DRILL_RPC_VERSION,
m_handshakeVersion,
this->m_handshakeErrorId.c_str(),
this->m_handshakeErrorMsg.c_str()));
case exec::user::AUTH_FAILED:
- DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;)
return handleConnError(CONN_AUTH_FAILED,
getMessage(ERR_CONN_AUTHFAIL,
this->m_handshakeErrorId.c_str(),
this->m_handshakeErrorMsg.c_str()));
case exec::user::UNKNOWN_FAILURE:
- DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;)
return handleConnError(CONN_HANDSHAKE_FAILED,
getMessage(ERR_CONN_UNKNOWN_ERR,
this->m_handshakeErrorId.c_str(),
@@ -451,14 +447,14 @@ void DrillClientImpl::startMessageListener() {
if(this->m_pListenerThread==NULL){
// Stopping the io_service from running out-of-work
if(m_io_service.stopped()){
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;)
m_io_service.reset();
}
this->m_pWork = new boost::asio::io_service::work(m_io_service);
this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
&this->m_io_service));
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
- << this->m_pListenerThread << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
+ << this->m_pListenerThread << std::endl;)
}
}
@@ -480,22 +476,23 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query);
sendSync(out_msg);
- pQuery = new DrillClientQueryResult(this, coordId);
+ pQuery = new DrillClientQueryResult(this, coordId, plan);
pQuery->registerListener(l, lCtx);
bool sendRequest=false;
this->m_queryIds[coordId]=pQuery;
- DRILL_LOG(LOG_DEBUG) << "Sent query request. Coordination id = " << coordId << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;)
if(m_pendingRequests++==0){
sendRequest=true;
}else{
- DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;
- DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
}
if(sendRequest){
- DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
- << m_pendingRequests << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
+ << m_pendingRequests << std::endl;)
getNextResult(); // async wait for results
}
}
@@ -513,7 +510,7 @@ void DrillClientImpl::getNextResult(){
{
boost::unique_lock<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
- DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;)
while(AllocatedBuffer::s_isBufferLimitReached){
AllocatedBuffer::s_memCV.wait(memLock);
}
@@ -522,8 +519,8 @@ void DrillClientImpl::getNextResult(){
//use free, not delete to free
ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
if (DrillClientConfig::getQueryTimeout() > 0){
- DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
- << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
+ << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;)
m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout()));
m_deadlineTimer.async_wait(boost::bind(
&DrillClientImpl::handleReadTimeout,
@@ -544,7 +541,7 @@ void DrillClientImpl::getNextResult(){
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)
);
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";)
}
void DrillClientImpl::waitForResults(){
@@ -565,8 +562,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
InBoundRpcMessage& msg,
boost::system::error_code& error){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
+ << reinterpret_cast<int*>(_buf) << std::endl;)
size_t leftover=0;
uint32_t rmsgLen;
AllocatedBufferPtr currentBuffer;
@@ -576,15 +573,15 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
// but we don't have to keep the lock while we decode the rest of the buffer.
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen);
- DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;
- DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;)
if(rmsgLen>0){
leftover = LEN_PREFIX_BUFLEN - bytes_read;
// Allocate a buffer
currentBuffer=new AllocatedBuffer(rmsgLen);
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ "
- << currentBuffer << ", size = " << rmsgLen << " ]\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ "
+ << currentBuffer << ", size = " << rmsgLen << " ]\n";)
if(currentBuffer==NULL){
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
@@ -593,8 +590,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
if(leftover){
memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover);
}
- DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
- << (rmsgLen - leftover) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
+ << (rmsgLen - leftover) << std::endl;)
ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
size_t bytesToRead=rmsgLen - leftover;
@@ -603,7 +600,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
boost::asio::buffer(b, bytesToRead),
error);
if(error) break;
- DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;)
if(dataBytesRead==bytesToRead) break;
bytesToRead-=dataBytesRead;
b+=dataBytesRead;
@@ -612,7 +609,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
if(!error){
// read data successfully
DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg);
- DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;)
}else{
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_COMM_ERROR,
@@ -624,8 +621,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
}
}
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
+ << reinterpret_cast<int*>(_buf) << std::endl;)
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return QRY_SUCCESS;
}
@@ -639,9 +636,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
exec::shared::QueryResult qr;
- DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;)
qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;)
qid.CopyFrom(qr.query_id());
@@ -657,7 +654,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
std::string valErr;
if( (ret=validateResultMessage(msg, qr, valErr)) != QRY_SUCCESS){
delete allocatedBuffer;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;)
return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
}
ret=processQueryStatusResult(&qr, pDrillClientQueryResult);
@@ -665,9 +662,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
// We've received the final message for a query that has been cancelled
// or for which the resources have been freed. We no longer need to listen
// for more incoming messages for such a query.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;)
m_pendingRequests--;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;)
ret=QRY_CANCELED;
}
delete allocatedBuffer;
@@ -676,10 +673,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
// Normal query results come back with query_state not set.
// Actually this is not strictly true. The query state is set to
// 0(i.e. PENDING), but protobuf thinks this means the value is not set.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";)
}
}
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
@@ -701,21 +698,21 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
exec::shared::QueryData* qr = new exec::shared::QueryData; //Record Batch will own this object and free it up.
- DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;)
qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
qid.CopyFrom(qr->query_id());
if(qid.part1()==0){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;)
delete allocatedBuffer;
return QRY_SUCCESS;
}
pDrillClientQueryResult=findQueryResult(qid);
if(pDrillClientQueryResult==NULL){
- DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query ("
- << debugPrintQid(qid) << ")." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query ("
+ << debugPrintQid(qid) << ")." << std::endl;)
delete qr;
delete allocatedBuffer;
return ret;
@@ -726,23 +723,23 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
delete allocatedBuffer;
delete qr;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
pDrillClientQueryResult->setQueryStatus(ret);
return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
}
//Build Record Batch here
- DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;)
pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody);
pDrillClientQueryResult->m_numBatches++;
- DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;)
pRecordBatch->build();
- DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
- << pRecordBatch->getNumRecords() << std::endl;
- DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
- << pRecordBatch->getNumFields() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
+ << pRecordBatch->getNumRecords() << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
+ << pRecordBatch->getNumFields() << std::endl;)
ret=pDrillClientQueryResult->setupColumnDefs(qr);
if(ret==QRY_SUCCESS_WITH_INFO){
@@ -752,8 +749,8 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
pDrillClientQueryResult->setIsQueryPending(true);
pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
if(pDrillClientQueryResult->m_bIsLastChunk){
- DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
- << "Received last batch. " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
+ << "Received last batch. " << std::endl;)
ret=QRY_NO_MORE_DATA;
}
pDrillClientQueryResult->setQueryStatus(ret);
@@ -770,7 +767,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
// Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
// pushed on the wire before the cancel is processed.
pDrillClientQueryResult->setIsQueryPending(false);
- DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
pDrillClientQueryResult->setQueryStatus(ret);
clearMapEntries(pDrillClientQueryResult);
return ret;
@@ -780,27 +777,27 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
- DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
boost::lock_guard<boost::mutex> lock(m_dcMutex);
std::map<int,DrillClientQueryResult*>::iterator it;
for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL");
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
- << " QueryId: "<< qidString << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
+ << " QueryId: "<< qidString << std::endl;)
}
if(msg.m_coord_id==0){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
it=this->m_queryIds.find(msg.m_coord_id);
if(it!=this->m_queryIds.end()){
pDrillClientQueryResult=(*it).second;
exec::shared::QueryId *qid = new exec::shared::QueryId;
- DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;)
m_queryResults[qid]=pDrillClientQueryResult;
//save queryId allocated here so we can free it later
pDrillClientQueryResult->setQueryId(qid);
@@ -814,20 +811,20 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
- DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;)
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;)
if(m_queryResults.size() != 0){
for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
- << it->first->part2() << "]\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
+ << it->first->part2() << "]\n";)
}
}
it=this->m_queryResults.find(&qid);
if(it!=this->m_queryResults.end()){
pDrillClientQueryResult=(*it).second;
- DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
- debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
+ debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;)
}
return pDrillClientQueryResult;
}
@@ -870,7 +867,7 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr
break;
default:
{
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";)
ret=handleQryError(QRY_INTERNAL_ERROR,
getMessage(ERR_QRY_UNKQRYSTATE),
pDrillClientQueryResult);
@@ -887,7 +884,7 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
// Check whether the deadline has passed.
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";)
handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL);
// There is no longer an active deadline. The expiry is set to positive
// infinity so that the timer never expires until a new deadline is set.
@@ -913,18 +910,18 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
const boost::system::error_code& err,
size_t bytes_transferred) {
boost::system::error_code error=err;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
+ << reinterpret_cast<int*>(_buf) << std::endl;)
if(DrillClientConfig::getQueryTimeout() > 0){
// Cancel the timeout if handleRead is called
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
m_deadlineTimer.cancel();
}
if(!error){
InBoundRpcMessage msg;
boost::lock_guard<boost::mutex> lock(this->m_prMutex);
- DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
AllocatedBufferPtr allocatedBuffer=NULL;
if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
@@ -938,14 +935,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away
m_pendingRequests--;
delete allocatedBuffer;
- DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;)
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}else{
- boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
- DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;
- m_cv.notify_one();
+ boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;)
+ m_cv.notify_one();
}
return;
}else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
@@ -988,7 +985,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
// We have a socket read error, but we do not know which query this is for.
// Signal ALL pending queries that they should stop waiting.
delete allocatedBuffer;
- DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;)
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
}else{
@@ -997,20 +994,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
// We should properly handle these handshake requests/responses
if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){
if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
exec::user::UserToBitHandshake u2b;
u2b.set_channel(exec::shared::USER);
u2b.set_rpc_version(DRILL_RPC_VERSION);
u2b.set_support_listening(true);
OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
sendSync(out_msg);
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
}else{
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
}
}else{
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
- << "QueryResult returned " << msg.m_rpc_type << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+ << "QueryResult returned " << msg.m_rpc_type << std::endl;)
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
}
delete allocatedBuffer;
@@ -1025,8 +1022,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
// boost error
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
- "Boost Communication Error: " << error.message() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
+ "Boost Communication Error: " << error.message() << std::endl;)
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
}
@@ -1066,6 +1063,7 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s
}else{
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
m_pError=pErr;
+ shutdownSocket();
}
return status;
}
@@ -1158,7 +1156,7 @@ void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){
OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
sendSync(ack_msg);
- DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
}
void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
@@ -1166,7 +1164,7 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
uint64_t coordId = this->getNextCoordinationId();
OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
sendSync(cancel_msg);
- DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
}
void DrillClientImpl::shutdownSocket(){
@@ -1174,7 +1172,7 @@ void DrillClientImpl::shutdownSocket(){
boost::system::error_code ignorederr;
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
m_bIsConnected=false;
- DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
}
// This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this
@@ -1236,7 +1234,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
RecordBatch* b,
DrillClientError* err) {
//ctx; // unused, we already have the this pointer
- DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;)
//check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
if(this->m_bCancel){
if(b!=NULL) delete b;
@@ -1247,8 +1245,8 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
{
if(b!=NULL){
#ifdef DEBUG
- DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
- << "Query result listener saved result to queue." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
+ << "Query result listener saved result to queue." << std::endl;)
#endif
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
this->m_recordBatches.push(b);
@@ -1267,7 +1265,7 @@ RecordBatch* DrillClientQueryResult::peekNext(){
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
//if no more data, return NULL;
if(!m_bIsQueryPending) return NULL;
- DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
this->m_cv.wait(cvLock);
}
@@ -1281,14 +1279,14 @@ RecordBatch* DrillClientQueryResult::getNext() {
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
//if no more data, return NULL;
if(!m_bIsQueryPending){
- DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;)
if(!m_recordBatches.empty()){
- DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;)
}
return NULL;
}
- DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){
this->m_cv.wait(cvLock);
}
@@ -1367,7 +1365,7 @@ void DrillClientQueryResult::clearAndDestroy(){
m_columnDefs->clear();
}
if(this->m_pQueryId!=NULL){
- DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;)
}
//Tell the parent to remove this from its lists
m_pClient->clearMapEntries(this);
@@ -1379,7 +1377,7 @@ void DrillClientQueryResult::clearAndDestroy(){
if(!m_recordBatches.empty()){
// When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_
// the last chunk has been received. We eventually delete it.
- DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;)
RecordBatch* pR=NULL;
while(!m_recordBatches.empty()){
pR=m_recordBatches.front();
@@ -1392,6 +1390,210 @@ void DrillClientQueryResult::clearAndDestroy(){
}
}
+
+connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
+ connectionStatus_t stat = CONN_SUCCESS;
+ std::string pathToDrill, protocol, hostPortStr;
+ std::string host;
+ std::string port;
+ m_connectStr=connStr;
+ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+ if(!strcmp(protocol.c_str(), "zk")){
+ // Get a list of drillbits
+ ZookeeperImpl zook;
+ std::vector<std::string> drillbits;
+ int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+ if(!err){
+ Utils::shuffle(drillbits);
+ // The original shuffled order is maintained if we shuffle first and then add any missing elements
+ Utils::add(m_drillbits, drillbits);
+ exec::DrillbitEndpoint e;
+ size_t nextIndex=0;
+ {
+ boost::lock_guard<boost::mutex> cLock(m_cMutex);
+ m_lastConnection++;
+ nextIndex = (m_lastConnection)%(getDrillbitCount());
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
+ << "(" << (void*)this << ")"
+ << ": Current counter is: "
+ << m_lastConnection << std::endl;)
+ err=zook.getEndPoint(m_drillbits, nextIndex, e);
+ if(!err){
+ host=boost::lexical_cast<std::string>(e.address());
+ port=boost::lexical_cast<std::string>(e.user_port());
+ }
+ }
+ if(err){
+ return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
+ }
+ zook.close();
+ m_bIsDirectConnection=false;
+ }else if(!strcmp(protocol.c_str(), "local")){
+ char tempStr[MAX_CONNECT_STR+1];
+ strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
+ host=strtok(tempStr, ":");
+ port=strtok(NULL, "");
+ m_bIsDirectConnection=true;
+ }else{
+ return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
+ DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
+ stat = pDrillClientImpl->connect(host.c_str(), port.c_str());
+ if(stat == CONN_SUCCESS){
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ m_clientConnections.push_back(pDrillClientImpl);
+ }else{
+ DrillClientError* pErr = pDrillClientImpl->getError();
+ handleConnError((connectionStatus_t)pErr->status, pErr->msg);
+ delete pDrillClientImpl;
+ }
+ return stat;
+}
+
+connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
+ // Assume there is one valid connection to at least one drillbit
+ connectionStatus_t stat=CONN_FAILURE;
+ // Keep a copy of the user properties
+ if(props!=NULL){
+ m_pUserProperties = new DrillUserProperties;
+ for(size_t i=0; i<props->size(); i++){
+ m_pUserProperties->setProperty(
+ props->keyAt(i),
+ props->valueAt(i)
+ );
+ }
+ }
+ DrillClientImpl* pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
+ stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
+ }
+ else{
+ stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
+ }
+ return stat;
+}
+
+DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
+ DrillClientQueryResult* pDrillClientQueryResult = NULL;
+ DrillClientImpl* pDrillClientImpl = NULL;
+ pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+ pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
+ m_queriesExecuted++;
+ }
+ return pDrillClientQueryResult;
+}
+
+void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
+ // Nothing to do. If this class ever keeps track of executing queries then it will need
+ // to implement this call to free any query specific resources the pool might have
+ // allocated
+ return;
+}
+
+bool PooledDrillClientImpl::Active(){
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ if((*it)->Active()){
+ return true;
+ }
+ }
+ return false;
+}
+
+void PooledDrillClientImpl::Close() {
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ (*it)->Close();
+ delete *it;
+ }
+ m_clientConnections.clear();
+ if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;}
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+ m_lastConnection=-1;
+ m_queriesExecuted=0;
+}
+
+DrillClientError* PooledDrillClientImpl::getError(){
+ std::string errMsg;
+ std::string nl="";
+ uint32_t stat;
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ if((*it)->getError() != NULL){
+ errMsg+=nl+"Query"/*+(*it)->queryId() +*/":"+(*it)->getError()->msg;
+ stat=(*it)->getError()->status;
+ }
+ }
+ if(errMsg.length()>0){
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL; }
+ m_pError = new DrillClientError(stat, DrillClientError::QRY_ERROR_START+stat, errMsg);
+ }
+ return m_pError;
+}
+
+//Waits as long as any one drillbit connection has results pending
+void PooledDrillClientImpl::waitForResults(){
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ (*it)->waitForResults();
+ }
+ return;
+}
+
+connectionStatus_t PooledDrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
+ DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Connection Error: (Pooled) " << pErr->msg << std::endl;)
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+ m_pError=pErr;
+ return status;
+}
+
+DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
+ DrillClientImpl* pDrillClientImpl = NULL;
+ while(pDrillClientImpl==NULL){
+ if(m_queriesExecuted == 0){
+ // First query ever sent can use the connection already established to authenticate the user
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ pDrillClientImpl=m_clientConnections[0];// There should be one connection in the list when the first query is executed
+ }else if(m_clientConnections.size() == m_maxConcurrentConnections){
+ // Pool is full. Use one of the already established connections
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ pDrillClientImpl = m_clientConnections[m_queriesExecuted%m_maxConcurrentConnections];
+ if(!pDrillClientImpl->Active()){
+ Utils::eraseRemove(m_clientConnections, pDrillClientImpl);
+ pDrillClientImpl=NULL;
+ }
+ }else{
+ int tries=0;
+ connectionStatus_t ret=CONN_SUCCESS;
+ while(pDrillClientImpl==NULL && tries++ < 3){
+ if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ pDrillClientImpl=m_clientConnections.back();
+ ret=pDrillClientImpl->validateHandshake(m_pUserProperties);
+ if(ret!=CONN_SUCCESS){
+ delete pDrillClientImpl; pDrillClientImpl=NULL;
+ m_clientConnections.erase(m_clientConnections.end());
+ }
+ }
+ } // try a few times
+ if(ret!=CONN_SUCCESS){
+ break;
+ }
+ } // need a new connection
+ }// while
+
+ if(pDrillClientImpl==NULL){
+ connectionStatus_t status = CONN_NOTCONNECTED;
+ handleConnError(status, getMessage(status));
+ }
+ return pDrillClientImpl;
+}
+
char ZookeeperImpl::s_drillRoot[]="/drill/";
char ZookeeperImpl::s_defaultCluster[]="drillbits1";
@@ -1427,6 +1629,96 @@ ZooLogLevel ZookeeperImpl::getZkLogLevel(){
return ZOO_LOG_LEVEL_ERROR;
}
+int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits){
+ uint32_t waitTime=30000; // 10 seconds
+ zoo_set_debug_level(getZkLogLevel());
+ zoo_deterministic_conn_order(1); // enable deterministic order
+ struct String_vector* pDrillbits=NULL;
+ m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0);
+ if(!m_zh) {
+ m_err = getMessage(ERR_CONN_ZKFAIL);
+ zookeeper_close(m_zh);
+ return -1;
+ }else{
+ m_err="";
+ //Wait for the completion handler to signal successful connection
+ boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
+ boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
+ while(this->m_bConnecting) {
+ if(!this->m_cv.timed_wait(bufferLock, timeout)){
+ m_err = getMessage(ERR_CONN_ZKTIMOUT);
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ }
+ }
+ if(m_state!=ZOO_CONNECTED_STATE){
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ int rc = ZOK;
+ if(pathToDrill==NULL || strlen(pathToDrill)==0){
+ m_rootDir=s_drillRoot;
+ m_rootDir += s_defaultCluster;
+ }else{
+ m_rootDir=pathToDrill;
+ }
+
+ pDrillbits = new String_vector;
+ rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits);
+ if(rc!=ZOK){
+ delete pDrillbits;
+ m_err=getMessage(ERR_CONN_ZKERR, rc);
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ if(pDrillbits && pDrillbits->count > 0){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster ("
+ << connectStr << "/" << pathToDrill
+ << ")." <<std::endl;)
+ for(int i=0; i<pDrillbits->count; i++){
+ drillbits.push_back(pDrillbits->data[i]);
+ }
+ for(int i=0; i<drillbits.size(); i++){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;)
+ }
+ }
+ delete pDrillbits;
+ return 0;
+}
+
+int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){
+ int rc = ZOK;
+ exec::DrillServiceInstance drillServiceInstance;
+ if( drillbits.size() >0){
+ // pick the drillbit at 'index'
+ const char * bit=drillbits[index].c_str();
+ std::string s;
+ s=m_rootDir + std::string("/") + bit;
+ int buffer_len=MAX_CONNECT_STR;
+ char buffer[MAX_CONNECT_STR+1];
+ struct Stat stat;
+ buffer[MAX_CONNECT_STR]=0;
+ rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat);
+ if(rc!=ZOK){
+ m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ exec::DrillServiceInstance drillServiceInstance;
+ drillServiceInstance.ParseFromArray(buffer, buffer_len);
+ endpoint=drillServiceInstance.endpoint();
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << ">. Selected " << drillServiceInstance.DebugString() << std::endl;)
+ }else{
+
+ m_err=getMessage(ERR_CONN_ZKNODBIT);
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ return 0;
+}
+
+// Deprecated
int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){
uint32_t waitTime=30000; // 10 seconds
zoo_set_debug_level(getZkLogLevel());
@@ -1525,7 +1817,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
// signal the cond var
{
if (state == ZOO_CONNECTED_STATE){
- DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;)
}
boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex);
self->m_bConnecting=false;
@@ -1535,7 +1827,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
void ZookeeperImpl:: debugPrint(){
if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
- DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;)
}
}
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index f19a015e5..06f37e059 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -34,13 +34,18 @@
#include <queue>
#include <vector>
#include <boost/asio.hpp>
-#include <boost/asio/deadline_timer.hpp>
-#include <boost/thread.hpp>
-#ifdef _WIN32
+
+#if defined _WIN32 || defined _WIN64
#include <zookeeper.h>
+//Windows header files redefine 'random'
+#ifdef random
+#undef random
+#endif
#else
#include <zookeeper/zookeeper.h>
#endif
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/thread.hpp>
#include "drill/drillClient.hpp"
#include "rpcEncoder.hpp"
@@ -58,12 +63,50 @@ class RecordBatch;
class RpcEncoder;
class RpcDecoder;
+/*
+ * Defines the interface used by DrillClient and implemented by DrillClientImpl and PooledDrillClientImpl
+ * */
+class DrillClientImplBase{
+ public:
+ DrillClientImplBase(){
+ }
+
+ virtual ~DrillClientImplBase(){
+ }
+
+ //Connect via Zookeeper or directly.
+ //Makes an initial connection to a drillbit. successful connect adds the first drillbit to the pool.
+ virtual connectionStatus_t connect(const char* connStr)=0;
+
+ // Test whether the client is active. Returns true if any one of the underlying connections is active
+ virtual bool Active()=0;
+
+ // Closes all open connections.
+ virtual void Close()=0;
+
+ // Returns the last error encountered by any of the underlying executing queries or connections
+ virtual DrillClientError* getError()=0;
+
+ // Submits a query to a drillbit.
+ virtual DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx)=0;
+
+ //Waits as a connection has results pending
+ virtual void waitForResults()=0;
+
+ //Validates handshake at connect time.
+ virtual connectionStatus_t validateHandshake(DrillUserProperties* props)=0;
+
+ virtual void freeQueryResources(DrillClientQueryResult* pQryResult)=0;
+
+};
+
class DrillClientQueryResult{
friend class DrillClientImpl;
public:
- DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId):
+ DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId, const std::string& query):
m_pClient(pClient),
m_coordinationId(coordId),
+ m_query(query),
m_numBatches(0),
m_columnDefs(new std::vector<Drill::FieldMetadata*>),
m_bIsQueryPending(true),
@@ -116,6 +159,7 @@ class DrillClientQueryResult{
bool isCancelled(){return this->m_bCancel;};
bool hasSchemaChanged(){return this->m_bHasSchemaChanged;};
int32_t getCoordinationId(){ return this->m_coordinationId;}
+ const std::string& getQuery(){ return this->m_query;}
void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;}
void* getListenerContext() {return this->m_pListenerCtx;}
@@ -147,6 +191,8 @@ class DrillClientQueryResult{
DrillClientImpl* m_pClient;
int32_t m_coordinationId;
+ const std::string& m_query;
+
size_t m_numBatches; // number of record batches received so far
// Vector of Buffers holding data returned by the server
@@ -189,7 +235,7 @@ class DrillClientQueryResult{
void * m_pListenerCtx;
};
-class DrillClientImpl{
+class DrillClientImpl : public DrillClientImplBase{
public:
DrillClientImpl():
m_coordinationId(1),
@@ -256,9 +302,14 @@ class DrillClientImpl{
DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
void waitForResults();
connectionStatus_t validateHandshake(DrillUserProperties* props);
+ void freeQueryResources(DrillClientQueryResult* pQryResult){
+ // Doesn't need to do anything
+ return;
+ };
private:
friend class DrillClientQueryResult;
+ friend class PooledDrillClientImpl;
struct compareQueryId{
bool operator()(const exec::shared::QueryId* q1, const exec::shared::QueryId* q2) const {
@@ -275,7 +326,6 @@ class DrillClientImpl{
void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
int32_t getNextCoordinationId(){ return ++m_coordinationId; };
- void parseConnectStr(const char* connectStr, std::string& pathToDrill, std::string& protocol, std::string& hostPortStr);
// send synchronous messages
//connectionStatus_t recvSync(InBoundRpcMessage& msg);
connectionStatus_t sendSync(OutBoundRpcMessage& msg);
@@ -331,6 +381,9 @@ class DrillClientImpl{
std::string m_handshakeErrorMsg;
bool m_bIsConnected;
+ std::string m_connectStr;
+
+ //
// number of outstanding read requests.
// handleRead will keep asking for more results as long as this number is not zero.
size_t m_pendingRequests;
@@ -356,6 +409,8 @@ class DrillClientImpl{
boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return
boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages
+ std::string m_connectedHost; // The hostname and port the socket is connected to.
+
//for synchronous messages, like validate handshake
ByteBuf_t m_rbuf; // buffer for receiving synchronous messages
DataBuf m_wbuf; // buffer for sending synchronous message
@@ -372,12 +427,106 @@ class DrillClientImpl{
// Condition variable to signal completion of all queries.
boost::condition_variable m_cv;
+ bool m_bIsDirectConnection;
};
inline bool DrillClientImpl::Active() {
return this->m_bIsConnected;;
}
+
+/* *
+ * Provides the same public interface as a DrillClientImpl but holds a pool of DrillClientImpls.
+ * Every submitQuery uses a different DrillClientImpl to distribute the load.
+ * DrillClient can use this class instead of DrillClientImpl to get better load balancing.
+ * */
+class PooledDrillClientImpl : public DrillClientImplBase{
+ public:
+ PooledDrillClientImpl(){
+ m_bIsDirectConnection=false;
+ m_maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS;
+ char* maxConn=std::getenv(MAX_CONCURRENT_CONNECTIONS_ENV);
+ if(maxConn!=NULL){
+ m_maxConcurrentConnections=atoi(maxConn);
+ }
+ m_lastConnection=-1;
+ m_pError=NULL;
+ m_queriesExecuted=0;
+ m_pUserProperties=NULL;
+ }
+
+ ~PooledDrillClientImpl(){
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ delete *it;
+ }
+ m_clientConnections.clear();
+ if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;}
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+ }
+
+ //Connect via Zookeeper or directly.
+ //Makes an initial connection to a drillbit. successful connect adds the first drillbit to the pool.
+ connectionStatus_t connect(const char* connStr);
+
+ // Test whether the client is active. Returns true if any one of the underlying connections is active
+ bool Active();
+
+ // Closes all open connections.
+ void Close() ;
+
+ // Returns the last error encountered by any of the underlying executing queries or connections
+ DrillClientError* getError();
+
+ // Submits a query to a drillbit. If more than one query is to be sent, we may choose a
+ // a different drillbit in the pool. No more than m_maxConcurrentConnections will be allowed.
+ // Connections once added to the pool will be removed only when the DrillClient is closed.
+ DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
+
+ //Waits as long as any one drillbit connection has results pending
+ void waitForResults();
+
+ //Validates handshake only against the first drillbit connected to.
+ connectionStatus_t validateHandshake(DrillUserProperties* props);
+
+ void freeQueryResources(DrillClientQueryResult* pQryResult);
+
+ int getDrillbitCount(){ return m_drillbits.size();};
+
+ private:
+
+ std::string m_connectStr;
+ std::string m_lastQuery;
+
+ // A list of all the current client connections. We choose a new one for every query.
+ // When picking a drillClientImpl to use, we see how many queries each drillClientImpl
+ // is currently executing. If none,
+ std::vector<DrillClientImpl*> m_clientConnections;
+ boost::mutex m_poolMutex; // protect access to the vector
+
+ //ZookeeperImpl zook;
+
+ // Use this to decide which drillbit to select next from the list of drillbits.
+ size_t m_lastConnection;
+ boost::mutex m_cMutex;
+
+ // Number of queries executed so far. Can be used to select a new Drillbit from the pool.
+ size_t m_queriesExecuted;
+
+ size_t m_maxConcurrentConnections;
+
+ bool m_bIsDirectConnection;
+
+ DrillClientError* m_pError;
+
+ connectionStatus_t handleConnError(connectionStatus_t status, std::string msg);
+ // get a connection from the pool or create a new one. Return NULL if none is found
+ DrillClientImpl* getOneConnection();
+
+ std::vector<std::string> m_drillbits;
+
+ DrillUserProperties* m_pUserProperties;//Keep a copy of user properties
+};
+
class ZookeeperImpl{
public:
ZookeeperImpl();
@@ -385,12 +534,17 @@ class ZookeeperImpl{
static ZooLogLevel getZkLogLevel();
// comma separated host:port pairs, each corresponding to a zk
// server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
- int connectToZookeeper(const char* connectStr, const char* pathToDrill);
+ DEPRECATED int connectToZookeeper(const char* connectStr, const char* pathToDrill);
void close();
static void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context);
void debugPrint();
std::string& getError(){return m_err;}
const exec::DrillbitEndpoint& getEndPoint(){ return m_drillServiceInstance.endpoint();}
+ // return unshuffled list of drillbits
+ int getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits);
+ // picks the index drillbit and returns the corresponding endpoint object
+ int getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint);
+
private:
static char s_drillRoot[];
@@ -407,6 +561,7 @@ class ZookeeperImpl{
boost::condition_variable m_cv;
bool m_bConnecting;
exec::DrillServiceInstance m_drillServiceInstance;
+ std::string m_rootDir;
};
} // namespace Drill
diff --git a/contrib/native/client/src/clientlib/env.h.in b/contrib/native/client/src/clientlib/env.h.in
new file mode 100644
index 000000000..a32f1521d
--- /dev/null
+++ b/contrib/native/client/src/clientlib/env.h.in
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ENV_H
+#define ENV_H
+
+#define GIT_COMMIT_PROP @GIT_COMMIT_PROP@
+
+#endif
+
+
diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp
index 11661f8c4..47d165f69 100644
--- a/contrib/native/client/src/clientlib/errmsgs.cpp
+++ b/contrib/native/client/src/clientlib/errmsgs.cpp
@@ -47,6 +47,8 @@ static Drill::ErrorMessages errorMessages[]={
{ERR_CONN_AUTHFAIL, ERR_CATEGORY_CONN, 0, "User authentication failed (please check the username and password)."
"[Server message was: (%s) %s]"},
{ERR_CONN_UNKNOWN_ERR, ERR_CATEGORY_CONN, 0, "Handshake Failed due to an error on the server. [Server message was: (%s) %s]"},
+ {ERR_CONN_NOCONN, ERR_CATEGORY_CONN, 0, "There is no connection to the server."},
+ {ERR_CONN_ALREADYCONN, ERR_CATEGORY_CONN, 0, "This client is already connected to a server."},
{ERR_QRY_OUTOFMEM, ERR_CATEGORY_QRY, 0, "Out of memory."},
{ERR_QRY_COMMERR, ERR_CATEGORY_QRY, 0, "Communication error. %s"},
{ERR_QRY_INVREADLEN, ERR_CATEGORY_QRY, 0, "Internal Error: Received a message with an invalid read length."},
diff --git a/contrib/native/client/src/clientlib/errmsgs.hpp b/contrib/native/client/src/clientlib/errmsgs.hpp
index b82efaaa2..cfb56a6b0 100644
--- a/contrib/native/client/src/clientlib/errmsgs.hpp
+++ b/contrib/native/client/src/clientlib/errmsgs.hpp
@@ -49,7 +49,9 @@ namespace Drill{
#define ERR_CONN_BAD_RPC_VER DRILL_ERR_START+16
#define ERR_CONN_AUTHFAIL DRILL_ERR_START+17
#define ERR_CONN_UNKNOWN_ERR DRILL_ERR_START+18
-#define ERR_CONN_MAX DRILL_ERR_START+18
+#define ERR_CONN_NOCONN DRILL_ERR_START+19
+#define ERR_CONN_ALREADYCONN DRILL_ERR_START+20
+#define ERR_CONN_MAX DRILL_ERR_START+20
#define ERR_QRY_OUTOFMEM ERR_CONN_MAX+1
#define ERR_QRY_COMMERR ERR_CONN_MAX+2
diff --git a/contrib/native/client/src/clientlib/logger.cpp b/contrib/native/client/src/clientlib/logger.cpp
index 5411d01eb..c498ee14b 100644
--- a/contrib/native/client/src/clientlib/logger.cpp
+++ b/contrib/native/client/src/clientlib/logger.cpp
@@ -1,28 +1,40 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <sys/types.h>
#include "boost/date_time/posix_time/posix_time.hpp"
#include "boost/thread.hpp"
-
+#include "env.h"
+#include "utils.hpp"
#include "logger.hpp"
namespace Drill{
+/*
+* Creates a single instance of the logger the first time this is called
+*/
+/* static */ boost::mutex g_logMutex;
+Logger& getLogger() {
+ boost::lock_guard<boost::mutex> logLock(g_logMutex);
+ static Logger* logger = new Logger();
+ return *logger;
+}
+
std::string getTime(){
return to_simple_string(boost::posix_time::second_clock::local_time());
}
@@ -31,37 +43,77 @@ std::string getTid(){
return boost::lexical_cast<std::string>(boost::this_thread::get_id());
}
-logLevel_t Logger::s_level=LOG_ERROR;
-std::ostream* Logger::s_pOutStream=NULL;
-std::ofstream* Logger::s_pOutFileStream=NULL;
-char* Logger::s_filepath=NULL;
-
void Logger::init(const char* path){
- if(path!=NULL) {
- s_pOutFileStream = new std::ofstream;
- s_pOutFileStream->open(path, std::ofstream::out);
- if(!s_pOutFileStream->is_open()){
- std::cerr << "Logfile could not be opened. Logging to stdout" << std::endl;
+ static bool initialized = false;
+ boost::lock_guard<boost::mutex> logLock(m_logMutex);
+ if (!initialized && path != NULL) {
+ std::string fullname = path;
+ size_t lastindex = fullname.find_last_of(".");
+ std::string filename;
+ if (lastindex != std::string::npos){
+ filename = fullname.substr(0, lastindex)
+ + "-"
+ + Utils::to_string(Utils::s_randomNumber())
+ + fullname.substr(lastindex, fullname.length());
}
+ else{
+ filename = fullname.substr(0, fullname.length())
+ + "-"
+ + Utils::to_string(Utils::s_randomNumber())
+ + ".log";
+ }
+ //m_filepath=path;
+ m_filepath = filename.c_str();
+ m_pOutFileStream = new std::ofstream;
+ m_pOutFileStream->open(m_filepath.c_str(), std::ios_base::out | std::ios_base::app);
+ if (!m_pOutFileStream->is_open()){
+ std::cerr << "Logfile ( " << m_filepath << ") could not be opened. Logging to stdout" << std::endl;
+ m_filepath.erase();
+ delete m_pOutFileStream; m_pOutFileStream=NULL;
+ }
+ initialized = true;
+
+ m_pOutStream = (m_pOutFileStream != NULL) ? m_pOutFileStream : &std::cout;
+#if defined _WIN32 || defined _WIN64
+
+ TCHAR szFile[MAX_PATH];
+ GetModuleFileName(NULL, szFile, MAX_PATH);
+#endif
+ *m_pOutStream
+ << "Drill Client Library" << std::endl
+ << "Build info:" << GIT_COMMIT_PROP << std::endl
+
+#if defined _WIN32 || defined _WIN64
+ << "Loaded by process: " << szFile << std::endl
+ << "Current process id is: " << ::GetCurrentProcessId() << std::endl
+#else
+ << "Current process id is: " << getpid() << std::endl
+#endif
+ << "Initialized Logging to file (" << ((path!=NULL)?path:"std::out") << "). "
+ << std::endl;
}
- s_pOutStream=(s_pOutFileStream!=NULL && s_pOutFileStream->is_open())?s_pOutFileStream:&std::cout;
}
void Logger::close(){
- if(s_pOutFileStream !=NULL){
- if(s_pOutFileStream->is_open()){
- s_pOutFileStream->close();
+ //boost::lock_guard<boost::mutex> logLock(Drill::Logger::m_logMutex);
+ boost::lock_guard<boost::mutex> logLock(m_logMutex);
+ if (m_pOutFileStream != NULL){
+ if (m_pOutFileStream->is_open()){
+ m_pOutFileStream->close();
}
- delete s_pOutFileStream; s_pOutFileStream=NULL;
+ delete m_pOutFileStream; m_pOutFileStream = NULL;
+ m_pOutStream = &std::cout; // set it back to std::cout in case someone tries to log even after close
}
}
+// The log call itself cannot be thread safe. Use the DRILL_MT_LOG macro to make
+// this thread safe
std::ostream& Logger::log(logLevel_t level){
- *s_pOutStream << getTime();
- *s_pOutStream << " : "<<levelAsString(level);
- *s_pOutStream << " : "<<getTid();
- *s_pOutStream << " : ";
- return *s_pOutStream;
+ *m_pOutStream << getTime();
+ *m_pOutStream << " : " << levelAsString(level);
+ *m_pOutStream << " : " << getTid();
+ *m_pOutStream << " : ";
+ return *m_pOutStream;
}
diff --git a/contrib/native/client/src/clientlib/logger.hpp b/contrib/native/client/src/clientlib/logger.hpp
index e3edb1329..7baf50c41 100644
--- a/contrib/native/client/src/clientlib/logger.hpp
+++ b/contrib/native/client/src/clientlib/logger.hpp
@@ -1,20 +1,20 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
#ifndef __LOGGER_H
#define __LOGGER_H
@@ -25,47 +25,60 @@
#include <string>
#include <stdio.h>
+#include <boost/thread/mutex.hpp>
#include "drill/common.hpp"
namespace Drill{
class Logger{
public:
- Logger(){}
+ Logger(){
+ m_level = LOG_ERROR;
+ m_pOutFileStream = NULL;
+ m_pOutStream = &std::cout;
+ }
~Logger(){ }
- static void init(const char* path);
- static void close();
- static std::ostream& log(logLevel_t level);
- static std::string levelAsString(logLevel_t level) {
+ void init(const char* path);
+ void close();
+ std::ostream& log(logLevel_t level);
+ std::string levelAsString(logLevel_t level) {
static const char* const levelNames[] = {
- "TRACE",
- "DEBUG",
- "INFO",
+ "TRACE ",
+ "DEBUG ",
+ "INFO ",
"WARNING",
- "ERROR",
- "FATAL"
+ "ERROR ",
+ "FATAL "
};
- return levelNames[level];
+ return levelNames[level>=LOG_TRACE && level<=LOG_FATAL?level:LOG_ERROR];
}
// The logging level
- static logLevel_t s_level;
- static std::ostream* s_pOutStream;
+ logLevel_t m_level;
+ std::ostream* m_pOutStream;
+ boost::mutex m_logMutex;
private:
- //static std::ostream* s_pOutStream;
- static std::ofstream* s_pOutFileStream;
- static char* s_filepath;
+ std::ofstream* m_pOutFileStream;
+ std::string m_filepath;
}; // Logger
-std::string getTime();
-std::string getTid();
+ Logger& getLogger();
+ std::string getTime();
+ std::string getTid();
+
+#define DRILL_MT_LOG(LOG) \
+ { \
+ boost::lock_guard<boost::mutex> logLock(getLogger().m_logMutex); \
+ LOG \
+ }
#define DRILL_LOG(level) \
- if (Logger::s_pOutStream==NULL || level < Drill::Logger::s_level); \
- else Drill::Logger::log(level) \
+ if (getLogger().m_pOutStream==NULL || level < getLogger().m_level); \
+ else getLogger().log(level) \
+
} // namespace Drill
diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp
index f1f03a1e3..1e6a8774e 100644
--- a/contrib/native/client/src/clientlib/utils.cpp
+++ b/contrib/native/client/src/clientlib/utils.cpp
@@ -1,68 +1,107 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <limits.h>
#include <stdlib.h>
#include "utils.hpp"
+#include "logger.hpp"
#include "drill/common.hpp"
namespace Drill{
+
+boost::random::random_device Utils::s_RNG;
+boost::random::mt19937 Utils::s_URNG(s_RNG());
+boost::uniform_int<> Utils::s_uniformDist(0,std::numeric_limits<int>::max()-1);
+boost::variate_generator<boost::random::mt19937&, boost::uniform_int<> > Utils::s_randomNumber(s_URNG, s_uniformDist);
+
boost::mutex AllocatedBuffer::s_memCVMutex;
boost::condition_variable AllocatedBuffer::s_memCV;
-size_t AllocatedBuffer::s_allocatedMem=0;
-bool AllocatedBuffer::s_isBufferLimitReached=false;
+size_t AllocatedBuffer::s_allocatedMem = 0;
+bool AllocatedBuffer::s_isBufferLimitReached = false;
+boost::mutex s_utilMutex;
ByteBuf_t Utils::allocateBuffer(size_t len){
boost::lock_guard<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
- AllocatedBuffer::s_allocatedMem+=len;
+ AllocatedBuffer::s_allocatedMem += len;
//http://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc
- ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t));
- size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE;
- if(b!=NULL && AllocatedBuffer::s_allocatedMem >= safeSize){
- AllocatedBuffer::s_isBufferLimitReached=true;
+ ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t));
+ size_t safeSize = DrillClientConfig::getBufferLimit() - MEM_CHUNK_SIZE;
+ if (b != NULL && AllocatedBuffer::s_allocatedMem >= safeSize){
+ AllocatedBuffer::s_isBufferLimitReached = true;
}
return b;
}
-void Utils::freeBuffer(ByteBuf_t b, size_t len){
+void Utils::freeBuffer(ByteBuf_t b, size_t len){
boost::lock_guard<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
- AllocatedBuffer::s_allocatedMem-=len;
- free(b);
- size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE;
- if(b!=NULL && AllocatedBuffer::s_allocatedMem < safeSize){
- AllocatedBuffer::s_isBufferLimitReached=false;
+ AllocatedBuffer::s_allocatedMem -= len;
+ free(b);
+ size_t safeSize = DrillClientConfig::getBufferLimit() - MEM_CHUNK_SIZE;
+ if (b != NULL && AllocatedBuffer::s_allocatedMem < safeSize){
+ AllocatedBuffer::s_isBufferLimitReached = false;
//signal any waiting threads
AllocatedBuffer::s_memCV.notify_one();
}
}
+void Utils::parseConnectStr(const char* connectStr,
+ std::string& pathToDrill,
+ std::string& protocol,
+ std::string& hostPortStr){
+ boost::lock_guard<boost::mutex> memLock(s_utilMutex);
+ char u[MAX_CONNECT_STR + 1];
+ strncpy(u, connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR] = 0;
+ char* z = strtok(u, "=");
+ char* c = strtok(NULL, "/");
+ char* p = strtok(NULL, "");
+
+ if (p != NULL) pathToDrill = std::string("/") + p;
+ protocol = z; hostPortStr = c;
+ return;
+}
+
+void Utils::shuffle(std::vector<std::string>& vector){
+ std::random_shuffle(vector.begin(), vector.end(), Utils::s_randomNumber);
+ return;
+}
+
+void Utils::add(std::vector<std::string>& vector1, std::vector<std::string>& vector2){
+ std::vector<std::string>::iterator it;
+ for (it = vector2.begin(); it != vector2.end(); it++) {
+ std::vector<std::string>::iterator it2 = std::find(vector1.begin(), vector1.end(), *it);
+ if (it2 == vector1.end()){
+ vector1.push_back(*it);
+ }
+ }
+}
AllocatedBuffer::AllocatedBuffer(size_t l){
- m_pBuffer=NULL;
- m_pBuffer=Utils::allocateBuffer(l);
- m_bufSize=m_pBuffer!=NULL?l:0;
+ m_pBuffer = NULL;
+ m_pBuffer = Utils::allocateBuffer(l);
+ m_bufSize = m_pBuffer != NULL ? l : 0;
}
AllocatedBuffer::~AllocatedBuffer(){
- Utils::freeBuffer(m_pBuffer, m_bufSize);
- m_pBuffer=NULL;
- m_bufSize=0;
+ Utils::freeBuffer(m_pBuffer, m_bufSize);
+ m_pBuffer = NULL;
+ m_bufSize = 0;
}
} // namespace
diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp
index 0f26ad69c..36fb91f81 100644
--- a/contrib/native/client/src/clientlib/utils.hpp
+++ b/contrib/native/client/src/clientlib/utils.hpp
@@ -1,20 +1,20 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
#ifndef __UTILS_H
#define __UTILS_H
@@ -23,6 +23,19 @@
#include <ostream>
#include <fstream>
#include <string>
+#include <vector>
+
+#if defined _WIN32 || defined _WIN64
+ //Windows header files redefine 'random'
+ #ifdef random
+ #undef random
+ #endif
+#endif
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/random/mersenne_twister.hpp> // for mt19937
+#include <boost/random/random_device.hpp>
+#include <boost/random/uniform_int.hpp>
+#include <boost/random/variate_generator.hpp>
#include <boost/thread.hpp>
#include "drill/common.hpp"
@@ -33,26 +46,57 @@ namespace Drill{
// Wrapper Class to keep track of allocated memory
class AllocatedBuffer{
public:
- AllocatedBuffer(size_t l);
- ~AllocatedBuffer();
-
- ByteBuf_t m_pBuffer;
- size_t m_bufSize;
-
- // keep track of allocated memory. The client lib blocks
- // if we have allocated up to a limit (defined in drillClientConfig).
- static boost::mutex s_memCVMutex;
- static boost::condition_variable s_memCV;
- static size_t s_allocatedMem;
- static bool s_isBufferLimitReached;
+ AllocatedBuffer(size_t l);
+ ~AllocatedBuffer();
+
+ ByteBuf_t m_pBuffer;
+ size_t m_bufSize;
+
+ // keep track of allocated memory. The client lib blocks
+ // if we have allocated up to a limit (defined in drillClientConfig).
+ static boost::mutex s_memCVMutex;
+ static boost::condition_variable s_memCV;
+ static size_t s_allocatedMem;
+ static bool s_isBufferLimitReached;
+ static boost::mutex s_utilMutex; // for provideing safety around strtok and other non-reentrant functions
};
class Utils{
public:
+ static boost::random::random_device s_RNG; //Truly random (expensive and device dependent)
+ static boost::random::mt19937 s_URNG; //Pseudo random with a period of ( 2^19937 - 1 )
+ static boost::uniform_int<> s_uniformDist; // Produces a uniform distribution
+ static boost::variate_generator<boost::random::mt19937&, boost::uniform_int<> > s_randomNumber; // a random number generator also usable by shuffle
+
//allocate memory for Record Batches
static ByteBuf_t allocateBuffer(size_t len);
static void freeBuffer(ByteBuf_t b, size_t len);
+ static void parseConnectStr(const char* connectStr,
+ std::string& pathToDrill,
+ std::string& protocol,
+ std::string& hostPortStr);
+
+ // useful vector methods/idioms
+
+ // performs a random shuffle on a string vector
+ static void shuffle(std::vector<std::string>& vector);
+
+ // adds the contents of vector2 to vector1
+ static void add(std::vector<std::string>& vector1, std::vector<std::string>& vector2);
+
+ // removes the element from the vector
+ template <typename T> static void eraseRemove(std::vector<T>& vector, T elem){
+ vector.erase(std::remove(vector.begin(), vector.end(), elem), vector.end());
+ }
+
+ // Provide a to_string that works with older C++ compilers
+ template <typename T> static std::string to_string(T val) {
+ std::stringstream stream;
+ stream << val;
+ return stream.str();
+ }
+
}; // Utils
} // namespace Drill
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index bb8e2b4a3..a617dc71f 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -45,6 +45,11 @@
#define MEM_CHUNK_SIZE 64*1024; // 64K
#define MAX_MEM_ALLOC_SIZE 256*1024*1024; // 256 MB
+#define MAX_BATCH_SIZE 65536; // see RecordBatch.java
+#define ENABLE_CONNECTION_POOL_ENV "DRILL_ENABLE_CONN_POOL"
+#define DEFAULT_MAX_CONCURRENT_CONNECTIONS 10
+#define MAX_CONCURRENT_CONNECTIONS_ENV "DRILL_MAX_CONN"
+
#ifdef _DEBUG
#define EXTRA_DEBUGGING
#define CODER_DEBUGGING
@@ -110,7 +115,9 @@ typedef enum{
CONN_HOSTNAME_RESOLUTION_ERROR=6,
CONN_AUTH_FAILED=7,
CONN_BAD_RPC_VER=8,
- CONN_DEAD=9
+ CONN_DEAD=9,
+ CONN_NOTCONNECTED=10,
+ CONN_ALREADYCONNECTED=11
} connectionStatus_t;
typedef enum{
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index 4568ca1fc..a74f4bdc7 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -53,6 +53,7 @@ namespace exec{
namespace Drill{
//struct UserServerEndPoint;
+class DrillClientImplBase;
class DrillClientImpl;
class DrillClientQueryResult;
class FieldMetadata;
@@ -340,6 +341,10 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
std::string& getError();
/*
+ * Returns the error message associated with the query handle
+ */
+ const std::string& getError(QueryHandle_t handle);
+ /*
* Applications using the async query submit method can register a listener for schema changes
*
*/
@@ -369,7 +374,7 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
static DrillClientInitializer s_init;
static DrillClientConfig s_config;
- DrillClientImpl * m_pImpl;
+ DrillClientImplBase * m_pImpl;
};
} // namespace Drill