From df0f0af3d963c1b65eb01c3141fe84532c53f5a5 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Fri, 12 Feb 2016 15:42:53 -0800 Subject: DRILL-4313: C++ Client - Thread safe Logging. Improved Drill bit selection. - Update random drill bit selection. Shuffle the list initially, then round robin. Add Utility methods to get random numbers and to shuffle and add vectors. Whitespace cleanup - Add Git properties to build and print to log. - Add interface to get error based on query handle. - Add support for Pooled connections. Allows switching between pooled and unpooled connections based on environment variables --- contrib/native/client/CMakeLists.txt | 24 +- .../native/client/cmakeModules/FindZookeeper.cmake | 2 +- contrib/native/client/example/querySubmitter.cpp | 25 +- .../native/client/src/clientlib/drillClient.cpp | 33 +- .../client/src/clientlib/drillClientImpl.cpp | 600 +++++++++++++++------ .../client/src/clientlib/drillClientImpl.hpp | 169 +++++- contrib/native/client/src/clientlib/env.h.in | 26 + contrib/native/client/src/clientlib/errmsgs.cpp | 2 + contrib/native/client/src/clientlib/errmsgs.hpp | 4 +- contrib/native/client/src/clientlib/logger.cpp | 126 +++-- contrib/native/client/src/clientlib/logger.hpp | 85 +-- contrib/native/client/src/clientlib/utils.cpp | 109 ++-- contrib/native/client/src/clientlib/utils.hpp | 100 +++- contrib/native/client/src/include/drill/common.hpp | 9 +- .../client/src/include/drill/drillClient.hpp | 7 +- 15 files changed, 1001 insertions(+), 320 deletions(-) create mode 100644 contrib/native/client/src/clientlib/env.h.in (limited to 'contrib') diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt index 603586d0e..b22af42f7 100644 --- a/contrib/native/client/CMakeLists.txt +++ b/contrib/native/client/CMakeLists.txt @@ -22,8 +22,20 @@ project(drillclient) message("Project Dir = ${PROJECT_SOURCE_DIR}") message("Source Dir = ${CMAKE_SOURCE_DIR} ") +cmake_policy(SET CMP0043 NEW) + 
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmakeModules/") +# Get the latest git commit properties of the working branch +execute_process( + COMMAND git log -1 --format="\\nCommit: %H \\nDescription: %s \\nAuthor: %aN Date: %ai" + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} + OUTPUT_VARIABLE GIT_COMMIT_PROP + OUTPUT_STRIP_TRAILING_WHITESPACE + ) +add_definitions("-DGIT_COMMIT_PROP=${GIT_COMMIT_PROP}") + + # Find Boost if(MSVC) @@ -36,7 +48,7 @@ else() set(Boost_USE_STATIC_RUNTIME OFF) endif() -find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread ) +find_package(Boost 1.53.0 REQUIRED COMPONENTS regex system date_time chrono thread random) include_directories(${Boost_INCLUDE_DIRS}) if(CMAKE_COMPILER_IS_GNUCXX) @@ -63,6 +75,16 @@ include_directories(${PROTOBUF_INCLUDE_DIR}) #Find Zookeeper find_package(Zookeeper REQUIRED ) + +# Generated sources +configure_file( + ${CMAKE_SOURCE_DIR}/src/clientlib/env.h.in + ${CMAKE_BINARY_DIR}/generated/env.h + ) + +include_directories(${CMAKE_BINARY_DIR}/generated) + + # # TARGETS # diff --git a/contrib/native/client/cmakeModules/FindZookeeper.cmake b/contrib/native/client/cmakeModules/FindZookeeper.cmake index fd8247f28..151c05cda 100644 --- a/contrib/native/client/cmakeModules/FindZookeeper.cmake +++ b/contrib/native/client/cmakeModules/FindZookeeper.cmake @@ -40,7 +40,7 @@ if (MSVC) message("- CMAKE will look for zookeeper library files in $ZOOKEEPER_HOME/src/c/Debug or $ZOOKEEPER_HOME/src/c/Release.") else() FILE(TO_CMAKE_PATH ${ZOOKEEPER_HOME} Zookeeper_HomePath) - set(Zookeeper_LIB_PATHS ${Zookeeper_HomePath}/src/c/${ZK_BuildOutputDir}) + set(Zookeeper_LIB_PATHS ${Zookeeper_HomePath}/src/c/${ZK_BuildOutputDir} ${Zookeeper_HomePath}/src/c/x64/${ZK_BuildOutputDir} ) find_path(ZK_INCLUDE_DIR zookeeper.h ${Zookeeper_HomePath}/src/c/include) find_path(ZK_INCLUDE_DIR_GEN zookeeper.jute.h ${Zookeeper_HomePath}/src/c/generated) diff --git a/contrib/native/client/example/querySubmitter.cpp 
b/contrib/native/client/example/querySubmitter.cpp index 960ff4f44..d507d1bb2 100644 --- a/contrib/native/client/example/querySubmitter.cpp +++ b/contrib/native/client/example/querySubmitter.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "drill/drillc.hpp" int nOptions=13; @@ -65,11 +66,13 @@ Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::Dril } } +boost::mutex listenerMutex; Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::DrillClientError* err){ // Invariant: // (received an record batch and err is NULL) // or // (received query state message passed by `err` and b is NULL) + boost::lock_guard listenerLock(listenerMutex); if(!err){ if(b!=NULL){ b->print(std::cout, 0); // print all rows @@ -317,16 +320,24 @@ int main(int argc, char* argv[]) { std::vector::iterator queryHandleIter; Drill::DrillClient client; - // To log to file - //DrillClient::initLogging("/var/log/drill/", l); +#if defined _WIN32 || defined _WIN64 + TCHAR tempPath[MAX_PATH]; + GetTempPath(MAX_PATH, tempPath); + char logpathPrefix[MAX_PATH + 128]; + strcpy(logpathPrefix,tempPath); + strcat(logpathPrefix, "\\drillclient"); +#else + char* logpathPrefix = "/var/log/drill/drillclient"; +#endif + // To log to file + Drill::DrillClient::initLogging(logpathPrefix, l); // To log to stderr - Drill::DrillClient::initLogging(NULL, l); - //Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. Allows us to hold at least two record batches. - int nQueries=queryInputs.size(); - Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. Allows us to hold at least two record batches. + //Drill::DrillClient::initLogging(NULL, l); + int nQueries=queryInputs.size(); + Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. The size of a record batch may vary, but is unlikely to exceed the 256 MB which is the default. 
- if (!hshakeTimeout.empty()){ + if(!hshakeTimeout.empty()){ Drill::DrillClientConfig::setHandshakeTimeout(atoi(hshakeTimeout.c_str())); } if (!queryTimeout.empty()){ diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp index 708793867..92c5194d6 100644 --- a/contrib/native/client/src/clientlib/drillClient.cpp +++ b/contrib/native/client/src/clientlib/drillClient.cpp @@ -16,7 +16,7 @@ * limitations under the License. */ - +#include #include #include "drill/common.hpp" #include "drill/drillClient.hpp" @@ -56,21 +56,22 @@ int32_t DrillClientConfig::s_heartbeatFrequency=15; // 15 seconds boost::mutex DrillClientConfig::s_mutex; DrillClientConfig::DrillClientConfig(){ - initLogging(NULL); + // Do not initialize logging. The Logger object is static and may + // not have been initialized yet + //initLogging(NULL); } DrillClientConfig::~DrillClientConfig(){ - Logger::close(); } void DrillClientConfig::initLogging(const char* path){ - Logger::init(path); + getLogger().init(path); } void DrillClientConfig::setLogLevel(logLevel_t l){ boost::lock_guard configLock(DrillClientConfig::s_mutex); s_logLevel=l; - Logger::s_level=l; + getLogger().m_level=l; //boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel); } @@ -163,7 +164,7 @@ RecordIterator::~RecordIterator(){ delete this->m_pQueryResult; this->m_pQueryResult=NULL; if(this->m_pCurrentRecordBatch!=NULL){ - DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl;) delete this->m_pCurrentRecordBatch; this->m_pCurrentRecordBatch=NULL; } } @@ -224,7 +225,7 @@ status_t RecordIterator::next(){ if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){ boost::lock_guard bufferLock(this->m_recordBatchMutex); 
if(this->m_pCurrentRecordBatch !=NULL){ - DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;) delete this->m_pCurrentRecordBatch; //free the previous record batch this->m_pCurrentRecordBatch=NULL; } @@ -235,12 +236,12 @@ status_t RecordIterator::next(){ } this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext(); if(this->m_pCurrentRecordBatch != NULL){ - DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;) }else{ - DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;) } if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){ - DRILL_LOG(LOG_TRACE) << "No more data." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." 
<< std::endl;) ret = QRY_NO_MORE_DATA; }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){ ret=QRY_SUCCESS_WITH_INFO; @@ -315,7 +316,12 @@ void DrillClient::initLogging(const char* path, logLevel_t l){ } DrillClient::DrillClient(){ - this->m_pImpl=new DrillClientImpl; + const char* enablePooledClient=std::getenv(ENABLE_CONNECTION_POOL_ENV); + if(enablePooledClient!=NULL && atoi(enablePooledClient)!=0){ + this->m_pImpl=new PooledDrillClientImpl; + }else{ + this->m_pImpl=new DrillClientImpl; + } } DrillClient::~DrillClient(){ @@ -378,10 +384,12 @@ RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string& } void* DrillClient::getApplicationContext(QueryHandle_t handle){ + assert(handle!=NULL); return ((DrillClientQueryResult*)handle)->getListenerContext(); } status_t DrillClient::getQueryStatus(QueryHandle_t handle){ + assert(handle!=NULL); return ((DrillClientQueryResult*)handle)->getQueryStatus(); } @@ -389,6 +397,9 @@ std::string& DrillClient::getError(){ return m_pImpl->getError()->msg; } +const std::string& DrillClient::getError(QueryHandle_t handle){ + return ((DrillClientQueryResult*)handle)->getError()->msg; +} void DrillClient::waitForResults(){ this->m_pImpl->waitForResults(); diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index d4e9ed96e..3ec01f521 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -78,47 +78,46 @@ void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){ #endif } - -void DrillClientImpl::parseConnectStr(const char* connectStr, - std::string& pathToDrill, - std::string& protocol, - std::string& hostPortStr){ - char u[MAX_CONNECT_STR+1]; - strncpy(u,connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR]=0; - char* z=strtok(u, "="); - char* c=strtok(NULL, "/"); - char* p=strtok(NULL, ""); - - if(p!=NULL) pathToDrill=std::string("/")+p; - 
protocol=z; hostPortStr=c; - return; -} - connectionStatus_t DrillClientImpl::connect(const char* connStr){ std::string pathToDrill, protocol, hostPortStr; std::string host; std::string port; if(!this->m_bIsConnected){ - parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); + m_connectStr=connStr; + Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); if(!strcmp(protocol.c_str(), "zk")){ ZookeeperImpl zook; - if(zook.connectToZookeeper(hostPortStr.c_str(), pathToDrill.c_str())!=0){ + std::vector drillbits; + int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + if(!err){ + Utils::shuffle(drillbits); + exec::DrillbitEndpoint endpoint; + err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list + if(!err){ + host=boost::lexical_cast(endpoint.address()); + port=boost::lexical_cast(endpoint.user_port()); + } + } + if(err){ return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); } - zook.debugPrint(); - exec::DrillbitEndpoint e=zook.getEndPoint(); - host=boost::lexical_cast(e.address()); - port=boost::lexical_cast(e.user_port()); zook.close(); + m_bIsDirectConnection=true; }else if(!strcmp(protocol.c_str(), "local")){ + boost::lock_guard lock(m_dcMutex);//strtok is not reentrant char tempStr[MAX_CONNECT_STR+1]; strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; host=strtok(tempStr, ":"); port=strtok(NULL, ""); + m_bIsDirectConnection=false; }else{ return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); } - return this->connect(host.c_str(), port.c_str()); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;) + connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); + return ret; + }else if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already 
connected + return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN)); } return CONN_SUCCESS; } @@ -133,7 +132,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ tcp::resolver::iterator end; while (iter != end){ endpoint = *iter++; - DRILL_LOG(LOG_TRACE) << endpoint << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;) } boost::system::error_code ec; m_socket.connect(endpoint, ec); @@ -149,6 +148,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what())); } + m_bIsConnected=true; // set socket keep alive boost::asio::socket_base::keep_alive keepAlive(true); m_socket.set_option(keepAlive); @@ -156,35 +156,34 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ boost::asio::ip::tcp::no_delay noDelay(true); m_socket.set_option(noDelay); - // - // We put some OS dependent code here for timing out a socket. Mostly, this appears to - // do nothing. Should we leave it in there? - // - setSocketTimeout(m_socket, DrillClientConfig::getSocketTimeout()); - + std::ostringstream connectedHost; + connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port; + m_connectedHost = connectedHost.str(); + DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;) + return CONN_SUCCESS; } void DrillClientImpl::startHeartbeatTimer(){ - DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with " - << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with " + << DrillClientConfig::getHeartbeatFrequency() << " seconds." 
<< std::endl;) m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency())); m_heartbeatTimer.async_wait(boost::bind( &DrillClientImpl::handleHeartbeatTimeout, this, boost::asio::placeholders::error )); - startMessageListener(); // start this thread early so we don't have the timer blocked + startMessageListener(); // start this thread early so we don't have the timer blocked } connectionStatus_t DrillClientImpl::sendHeartbeat(){ - connectionStatus_t status=CONN_SUCCESS; + connectionStatus_t status=CONN_SUCCESS; exec::rpc::Ack ack; ack.set_ok(true); OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack); - boost::lock_guard prLock(this->m_prMutex); - boost::lock_guard lock(m_dcMutex); - DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl; + boost::lock_guard prLock(this->m_prMutex); + boost::lock_guard lock(m_dcMutex); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;) status=sendSync(heartbeatMsg); status=status==CONN_SUCCESS?status:CONN_DEAD; //If the server sends responses to a heartbeat, we need to increment the pending requests counter. @@ -196,21 +195,19 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){ void DrillClientImpl::resetHeartbeatTimer(){ m_heartbeatTimer.cancel(); - DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;) startHeartbeatTimer(); } - - void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;) if(err != boost::asio::error::operation_aborted){ // Check whether the deadline has passed. 
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " << to_simple_string(m_heartbeatTimer.expires_at()) << " and time now is: " << to_simple_string(boost::asio::deadline_timer::traits_type::now()) - << std::endl; + << std::endl;) ; if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. @@ -219,7 +216,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e startHeartbeatTimer(); }else{ // Close connection. - DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection."; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";) shutdownSocket(); } } @@ -227,7 +224,6 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e return; } - void DrillClientImpl::Close() { shutdownSocket(); } @@ -257,8 +253,8 @@ connectionStatus_t DrillClientImpl::recvHandshake(){ this, boost::asio::placeholders::error )); - DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with " - << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with " + << DrillClientConfig::getHandshakeTimeout() << " seconds." 
<< std::endl;) } async_read( @@ -271,7 +267,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){ boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) ); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";) m_io_service.run(); if(m_rbuf!=NULL){ Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL; @@ -292,7 +288,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, boost::system::error_code error=err; // cancel the timer m_deadlineTimer.cancel(); - DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;) if(!error){ InBoundRpcMessage msg; uint32_t length = 0; @@ -306,14 +302,14 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, boost::asio::buffer(b, bytesToRead), error); if(err) break; - DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;) if(dataBytesRead==bytesToRead) break; bytesToRead-=dataBytesRead; b+=dataBytesRead; } DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg); }else{ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";) handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake")); return; } @@ -344,7 +340,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. 
m_deadlineTimer.expires_at(boost::posix_time::pos_infin); - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";) handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT)); m_io_service.stop(); boost::system::error_code ignorederr; @@ -356,7 +352,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){ - DRILL_LOG(LOG_TRACE) << "validateHandShake\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";) exec::user::UserToBitHandshake u2b; u2b.set_channel(exec::shared::USER); @@ -368,7 +364,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope std::string username; std::string err; if(!properties->validate(err)){ - DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;) } exec::user::UserProperties* userProperties = u2b.mutable_properties(); @@ -376,8 +372,8 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope for(size_t i=0; isize(); i++){ std::map::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i)); if(it==DrillUserProperties::USER_PROPERTIES.end()){ - DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) - << ") is unknown and is being skipped" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) + << ") is unknown and is being skipped" << std::endl;) continue; } if(IS_BITSET((*it).second,USERPROP_FLAGS_SERVERPROP)){ @@ -392,9 +388,9 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope //u2b.set_credentials(&creds); } 
if(IS_BITSET((*it).second,USERPROP_FLAGS_PASSWORD)){ - DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ": ********** " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ": ********** " << std::endl;) }else{ - DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl;) } }// Server properties } @@ -406,7 +402,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b); sendSync(out_msg); - DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";) } connectionStatus_t ret = recvHandshake(); @@ -416,21 +412,21 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope if(this->m_handshakeStatus != exec::user::SUCCESS){ switch(this->m_handshakeStatus){ case exec::user::RPC_VERSION_MISMATCH: - DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected " - << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected " + << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;) return handleConnError(CONN_BAD_RPC_VER, getMessage(ERR_CONN_BAD_RPC_VER, DRILL_RPC_VERSION, m_handshakeVersion, this->m_handshakeErrorId.c_str(), this->m_handshakeErrorMsg.c_str())); case exec::user::AUTH_FAILED: - DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Authentication failed." 
<< std::endl;) return handleConnError(CONN_AUTH_FAILED, getMessage(ERR_CONN_AUTHFAIL, this->m_handshakeErrorId.c_str(), this->m_handshakeErrorMsg.c_str())); case exec::user::UNKNOWN_FAILURE: - DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;) return handleConnError(CONN_HANDSHAKE_FAILED, getMessage(ERR_CONN_UNKNOWN_ERR, this->m_handshakeErrorId.c_str(), @@ -451,14 +447,14 @@ void DrillClientImpl::startMessageListener() { if(this->m_pListenerThread==NULL){ // Stopping the io_service from running out-of-work if(m_io_service.stopped()){ - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <m_pWork = new boost::asio::io_service::work(m_io_service); this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run, &this->m_io_service)); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: " - << this->m_pListenerThread << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: " + << this->m_pListenerThread << std::endl;) } } @@ -480,22 +476,23 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query); sendSync(out_msg); - pQuery = new DrillClientQueryResult(this, coordId); + pQuery = new DrillClientQueryResult(this, coordId, plan); pQuery->registerListener(l, lCtx); bool sendRequest=false; this->m_queryIds[coordId]=pQuery; - DRILL_LOG(LOG_DEBUG) << "Sent query request. Coordination id = " << coordId << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. 
" << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;) if(m_pendingRequests++==0){ sendRequest=true; }else{ - DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl; - DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;) } if(sendRequest){ - DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " - << m_pendingRequests << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " + << m_pendingRequests << std::endl;) getNextResult(); // async wait for results } } @@ -513,7 +510,7 @@ void DrillClientImpl::getNextResult(){ { boost::unique_lock memLock(AllocatedBuffer::s_memCVMutex); - DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;) while(AllocatedBuffer::s_isBufferLimitReached){ AllocatedBuffer::s_memCV.wait(memLock); } @@ -522,8 +519,8 @@ void DrillClientImpl::getNextResult(){ //use free, not delete to free ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN); if (DrillClientConfig::getQueryTimeout() > 0){ - DRILL_LOG(LOG_TRACE) << "Started new query wait timer with " - << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new query wait timer with " + << DrillClientConfig::getQueryTimeout() << " seconds." 
<< std::endl;) m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout())); m_deadlineTimer.async_wait(boost::bind( &DrillClientImpl::handleReadTimeout, @@ -544,7 +541,7 @@ void DrillClientImpl::getNextResult(){ boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) ); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";) } void DrillClientImpl::waitForResults(){ @@ -565,8 +562,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, InBoundRpcMessage& msg, boost::system::error_code& error){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer " - << reinterpret_cast(_buf) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer " + << reinterpret_cast(_buf) << std::endl;) size_t leftover=0; uint32_t rmsgLen; AllocatedBufferPtr currentBuffer; @@ -576,15 +573,15 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, // but we don't have to keep the lock while we decode the rest of the buffer. 
boost::lock_guard lock(this->m_dcMutex); int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen); - DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl; - DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;) if(rmsgLen>0){ leftover = LEN_PREFIX_BUFLEN - bytes_read; // Allocate a buffer currentBuffer=new AllocatedBuffer(rmsgLen); - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ " - << currentBuffer << ", size = " << rmsgLen << " ]\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ " + << currentBuffer << ", size = " << rmsgLen << " ]\n";) if(currentBuffer==NULL){ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL); @@ -593,8 +590,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, if(leftover){ memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover); } - DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : " - << (rmsgLen - leftover) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : " + << (rmsgLen - leftover) << std::endl;) ByteBuf_t b=currentBuffer->m_pBuffer + leftover; size_t bytesToRead=rmsgLen - leftover; @@ -603,7 +600,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, boost::asio::buffer(b, bytesToRead), error); if(error) break; - DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;) if(dataBytesRead==bytesToRead) break; bytesToRead-=dataBytesRead; b+=dataBytesRead; @@ -612,7 +609,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, if(!error){ // read data successfully 
DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg); - DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <(_buf) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer " + << reinterpret_cast(_buf) << std::endl;) Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return QRY_SUCCESS; } @@ -639,9 +636,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer boost::lock_guard lock(this->m_dcMutex); exec::shared::QueryResult qr; - DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;) qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;) qid.CopyFrom(qr.query_id()); @@ -657,7 +654,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer std::string valErr; if( (ret=validateResultMessage(msg, qr, valErr)) != QRY_SUCCESS){ delete allocatedBuffer; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;) return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); } ret=processQueryStatusResult(&qr, pDrillClientQueryResult); @@ -665,9 +662,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer // We've received the final message for a query that has been cancelled // or for which the resources have been freed. We no longer need to listen // for more incoming messages for such a query. 
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;) m_pendingRequests--; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;) ret=QRY_CANCELED; } delete allocatedBuffer; @@ -676,10 +673,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer // Normal query results come back with query_state not set. // Actually this is not strictly true. The query state is set to // 0(i.e. PENDING), but protobuf thinks this means the value is not set. - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";) } } - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. // We keep the heartbeat going though. @@ -701,21 +698,21 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, boost::lock_guard lock(this->m_dcMutex); exec::shared::QueryData* qr = new exec::shared::QueryData; //Record Batch will own this object and free it up. 
- DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;) qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;) qid.CopyFrom(qr->query_id()); if(qid.part1()==0){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;) delete allocatedBuffer; return QRY_SUCCESS; } pDrillClientQueryResult=findQueryResult(qid); if(pDrillClientQueryResult==NULL){ - DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" - << debugPrintQid(qid) << ")." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" + << debugPrintQid(qid) << ")." 
<< std::endl;) delete qr; delete allocatedBuffer; return ret; @@ -726,23 +723,23 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ delete allocatedBuffer; delete qr; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";) pDrillClientQueryResult->setQueryStatus(ret); return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); } //Build Record Batch here - DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;) pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); pDrillClientQueryResult->m_numBatches++; - DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." 
<< (void*)pRecordBatch << std::endl;) pRecordBatch->build(); - DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords " - << pRecordBatch->getNumRecords() << std::endl; - DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields " - << pRecordBatch->getNumFields() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords " + << pRecordBatch->getNumRecords() << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields " + << pRecordBatch->getNumFields() << std::endl;) ret=pDrillClientQueryResult->setupColumnDefs(qr); if(ret==QRY_SUCCESS_WITH_INFO){ @@ -752,8 +749,8 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, pDrillClientQueryResult->setIsQueryPending(true); pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener; if(pDrillClientQueryResult->m_bIsLastChunk){ - DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) - << "Received last batch. " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) + << "Received last batch. " << std::endl;) ret=QRY_NO_MORE_DATA; } pDrillClientQueryResult->setQueryStatus(ret); @@ -770,7 +767,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are // pushed on the wire before the cancel is processed. pDrillClientQueryResult->setIsQueryPending(false); - DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." 
<< std::endl;) pDrillClientQueryResult->setQueryStatus(ret); clearMapEntries(pDrillClientQueryResult); return ret; @@ -780,27 +777,27 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; - DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; boost::lock_guard lock(m_dcMutex); std::map::iterator it; for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){ std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL"); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first - << " QueryId: "<< qidString << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first + << " QueryId: "<< qidString << std::endl;) } if(msg.m_coord_id==0){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." 
<< std::endl;) return QRY_SUCCESS; } it=this->m_queryIds.find(msg.m_coord_id); if(it!=this->m_queryIds.end()){ pDrillClientQueryResult=(*it).second; exec::shared::QueryId *qid = new exec::shared::QueryId; - DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;) m_queryResults[qid]=pDrillClientQueryResult; //save queryId allocated here so we can free it later pDrillClientQueryResult->setQueryId(qid); @@ -814,20 +811,20 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){ DrillClientQueryResult* pDrillClientQueryResult=NULL; - DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;) std::map::iterator it; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;) if(m_queryResults.size() != 0){ for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":" - << it->first->part2() << "]\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":" + << it->first->part2() << "]\n";) } } it=this->m_queryResults.find(&qid); if(it!=this->m_queryResults.end()){ 
pDrillClientQueryResult=(*it).second; - DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << - debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << + debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;) } return pDrillClientQueryResult; } @@ -870,7 +867,7 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr break; default: { - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";) ret=handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_UNKQRYSTATE), pDrillClientQueryResult); @@ -887,7 +884,7 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){ // Check whether the deadline has passed. if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";) handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL); // There is no longer an active deadline. The expiry is set to positive // infinity so that the timer never expires until a new deadline is set. 
@@ -913,18 +910,18 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, const boost::system::error_code& err, size_t bytes_transferred) { boost::system::error_code error=err; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " - << reinterpret_cast(_buf) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " + << reinterpret_cast(_buf) << std::endl;) if(DrillClientConfig::getQueryTimeout() > 0){ // Cancel the timeout if handleRead is called - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";) m_deadlineTimer.cancel(); } if(!error){ InBoundRpcMessage msg; boost::lock_guard lock(this->m_prMutex); - DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;) AllocatedBufferPtr allocatedBuffer=NULL; if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){ @@ -938,14 +935,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away m_pendingRequests--; delete allocatedBuffer; - DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;) if(m_pendingRequests!=0){ boost::lock_guard lock(this->m_dcMutex); getNextResult(); }else{ - boost::unique_lock cvLock(this->m_dcMutex); - DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl; - m_cv.notify_one(); + boost::unique_lock cvLock(this->m_dcMutex); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. 
" << std::endl;) + m_cv.notify_one(); } return; }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){ @@ -988,7 +985,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, // We have a socket read error, but we do not know which query this is for. // Signal ALL pending queries that they should stop waiting. delete allocatedBuffer; - DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;) handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); return; }else{ @@ -997,20 +994,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, // We should properly handle these handshake requests/responses if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){ if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";) exec::user::UserToBitHandshake u2b; u2b.set_channel(exec::shared::USER); u2b.set_rpc_version(DRILL_RPC_VERSION); u2b.set_support_listening(true); OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b); sendSync(out_msg); - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";) }else{ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";) } }else{ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " - << "QueryResult returned " << msg.m_rpc_type << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. 
" + << "QueryResult returned " << msg.m_rpc_type << std::endl;) handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); } delete allocatedBuffer; @@ -1025,8 +1022,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, // boost error Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); boost::lock_guard lock(this->m_dcMutex); - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " - "Boost Communication Error: " << error.message() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " + "Boost Communication Error: " << error.message() << std::endl;) handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); return; } @@ -1066,6 +1063,7 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s }else{ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} m_pError=pErr; + shutdownSocket(); } return status; } @@ -1158,7 +1156,7 @@ void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){ OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack); boost::lock_guard lock(m_dcMutex); sendSync(ack_msg); - DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;) } void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){ @@ -1166,7 +1164,7 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){ uint64_t coordId = this->getNextCoordinationId(); OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId); sendSync(cancel_msg); - DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;) } void DrillClientImpl::shutdownSocket(){ @@ -1174,7 +1172,7 @@ void DrillClientImpl::shutdownSocket(){ boost::system::error_code ignorederr; m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); m_bIsConnected=false; - 
DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;) } // This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this @@ -1236,7 +1234,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err) { //ctx; // unused, we already have the this pointer - DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;) //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server. if(this->m_bCancel){ if(b!=NULL) delete b; @@ -1247,8 +1245,8 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, { if(b!=NULL){ #ifdef DEBUG - DRILL_LOG(LOG_DEBUG)<getQueryResult()->query_id()) - << "Query result listener saved result to queue." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)<getQueryResult()->query_id()) + << "Query result listener saved result to queue." << std::endl;) #endif boost::lock_guard cvLock(this->m_cvMutex); this->m_recordBatches.push(b); @@ -1267,7 +1265,7 @@ RecordBatch* DrillClientQueryResult::peekNext(){ boost::unique_lock cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending) return NULL; - DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) { this->m_cv.wait(cvLock); } @@ -1281,14 +1279,14 @@ RecordBatch* DrillClientQueryResult::getNext() { boost::unique_lock cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending){ - DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query is done." 
<< std::endl;) if(!m_recordBatches.empty()){ - DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;) } return NULL; } - DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){ this->m_cv.wait(cvLock); } @@ -1367,7 +1365,7 @@ void DrillClientQueryResult::clearAndDestroy(){ m_columnDefs->clear(); } if(this->m_pQueryId!=NULL){ - DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;) } //Tell the parent to remove this from its lists m_pClient->clearMapEntries(this); @@ -1379,7 +1377,7 @@ void DrillClientQueryResult::clearAndDestroy(){ if(!m_recordBatches.empty()){ // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_ // the last chunk has been received. We eventually delete it. 
- DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;) RecordBatch* pR=NULL; while(!m_recordBatches.empty()){ pR=m_recordBatches.front(); @@ -1392,6 +1390,210 @@ void DrillClientQueryResult::clearAndDestroy(){ } } + +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ + connectionStatus_t stat = CONN_SUCCESS; + std::string pathToDrill, protocol, hostPortStr; + std::string host; + std::string port; + m_connectStr=connStr; + Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); + if(!strcmp(protocol.c_str(), "zk")){ + // Get a list of drillbits + ZookeeperImpl zook; + std::vector drillbits; + int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + if(!err){ + Utils::shuffle(drillbits); + // The original shuffled order is maintained if we shuffle first and then add any missing elements + Utils::add(m_drillbits, drillbits); + exec::DrillbitEndpoint e; + size_t nextIndex=0; + { + boost::lock_guard cLock(m_cMutex); + m_lastConnection++; + nextIndex = (m_lastConnection)%(getDrillbitCount()); + } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection" + << "(" << (void*)this << ")" + << ": Current counter is: " + << m_lastConnection << std::endl;) + err=zook.getEndPoint(m_drillbits, nextIndex, e); + if(!err){ + host=boost::lexical_cast(e.address()); + port=boost::lexical_cast(e.user_port()); + } + } + if(err){ + return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); + } + zook.close(); + m_bIsDirectConnection=false; + }else if(!strcmp(protocol.c_str(), "local")){ + char tempStr[MAX_CONNECT_STR+1]; + strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; + host=strtok(tempStr, ":"); + port=strtok(NULL, ""); + m_bIsDirectConnection=true; + }else{ + return handleConnError(CONN_INVALID_INPUT, 
getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); + } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;) + DrillClientImpl* pDrillClientImpl = new DrillClientImpl(); + stat = pDrillClientImpl->connect(host.c_str(), port.c_str()); + if(stat == CONN_SUCCESS){ + boost::lock_guard lock(m_poolMutex); + m_clientConnections.push_back(pDrillClientImpl); + }else{ + DrillClientError* pErr = pDrillClientImpl->getError(); + handleConnError((connectionStatus_t)pErr->status, pErr->msg); + delete pDrillClientImpl; + } + return stat; +} + +connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){ + // Assume there is one valid connection to at least one drillbit + connectionStatus_t stat=CONN_FAILURE; + // Keep a copy of the user properties + if(props!=NULL){ + m_pUserProperties = new DrillUserProperties; + for(size_t i=0; isize(); i++){ + m_pUserProperties->setProperty( + props->keyAt(i), + props->valueAt(i) + ); + } + } + DrillClientImpl* pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;) + stat=pDrillClientImpl->validateHandshake(m_pUserProperties); + } + else{ + stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN)); + } + return stat; +} + +DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){ + DrillClientQueryResult* pDrillClientQueryResult = NULL; + DrillClientImpl* pDrillClientImpl = NULL; + pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx); + m_queriesExecuted++; + } + return pDrillClientQueryResult; +} + +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){ + // Nothing to 
do. If this class ever keeps track of executing queries then it will need + // to implement this call to free any query specific resources the pool might have + // allocated + return; +} + +bool PooledDrillClientImpl::Active(){ + boost::lock_guard lock(m_poolMutex); + for(std::vector::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + if((*it)->Active()){ + return true; + } + } + return false; +} + +void PooledDrillClientImpl::Close() { + boost::lock_guard lock(m_poolMutex); + for(std::vector::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + (*it)->Close(); + delete *it; + } + m_clientConnections.clear(); + if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;} + if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} + m_lastConnection=-1; + m_queriesExecuted=0; +} + +DrillClientError* PooledDrillClientImpl::getError(){ + std::string errMsg; + std::string nl=""; + uint32_t stat; + boost::lock_guard lock(m_poolMutex); + for(std::vector::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + if((*it)->getError() != NULL){ + errMsg+=nl+"Query"/*+(*it)->queryId() +*/":"+(*it)->getError()->msg; + stat=(*it)->getError()->status; + } + } + if(errMsg.length()>0){ + if(m_pError!=NULL){ delete m_pError; m_pError=NULL; } + m_pError = new DrillClientError(stat, DrillClientError::QRY_ERROR_START+stat, errMsg); + } + return m_pError; +} + +//Waits as long as any one drillbit connection has results pending +void PooledDrillClientImpl::waitForResults(){ + boost::lock_guard lock(m_poolMutex); + for(std::vector::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + (*it)->waitForResults(); + } + return; +} + +connectionStatus_t PooledDrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){ + DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg); + 
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Connection Error: (Pooled) " << pErr->msg << std::endl;) + if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} + m_pError=pErr; + return status; +} + +DrillClientImpl* PooledDrillClientImpl::getOneConnection(){ + DrillClientImpl* pDrillClientImpl = NULL; + while(pDrillClientImpl==NULL){ + if(m_queriesExecuted == 0){ + // First query ever sent can use the connection already established to authenticate the user + boost::lock_guard lock(m_poolMutex); + pDrillClientImpl=m_clientConnections[0];// There should be one connection in the list when the first query is executed + }else if(m_clientConnections.size() == m_maxConcurrentConnections){ + // Pool is full. Use one of the already established connections + boost::lock_guard lock(m_poolMutex); + pDrillClientImpl = m_clientConnections[m_queriesExecuted%m_maxConcurrentConnections]; + if(!pDrillClientImpl->Active()){ + Utils::eraseRemove(m_clientConnections, pDrillClientImpl); + pDrillClientImpl=NULL; + } + }else{ + int tries=0; + connectionStatus_t ret=CONN_SUCCESS; + while(pDrillClientImpl==NULL && tries++ < 3){ + if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){ + boost::lock_guard lock(m_poolMutex); + pDrillClientImpl=m_clientConnections.back(); + ret=pDrillClientImpl->validateHandshake(m_pUserProperties); + if(ret!=CONN_SUCCESS){ + delete pDrillClientImpl; pDrillClientImpl=NULL; + m_clientConnections.erase(m_clientConnections.end()); + } + } + } // try a few times + if(ret!=CONN_SUCCESS){ + break; + } + } // need a new connection + }// while + + if(pDrillClientImpl==NULL){ + connectionStatus_t status = CONN_NOTCONNECTED; + handleConnError(status, getMessage(status)); + } + return pDrillClientImpl; +} + char ZookeeperImpl::s_drillRoot[]="/drill/"; char ZookeeperImpl::s_defaultCluster[]="drillbits1"; @@ -1427,6 +1629,96 @@ ZooLogLevel ZookeeperImpl::getZkLogLevel(){ return ZOO_LOG_LEVEL_ERROR; } +int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* 
pathToDrill, std::vector& drillbits){ + uint32_t waitTime=30000; // 10 seconds + zoo_set_debug_level(getZkLogLevel()); + zoo_deterministic_conn_order(1); // enable deterministic order + struct String_vector* pDrillbits=NULL; + m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0); + if(!m_zh) { + m_err = getMessage(ERR_CONN_ZKFAIL); + zookeeper_close(m_zh); + return -1; + }else{ + m_err=""; + //Wait for the completion handler to signal successful connection + boost::unique_lock bufferLock(this->m_cvMutex); + boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime); + while(this->m_bConnecting) { + if(!this->m_cv.timed_wait(bufferLock, timeout)){ + m_err = getMessage(ERR_CONN_ZKTIMOUT); + zookeeper_close(m_zh); + return -1; + } + } + } + if(m_state!=ZOO_CONNECTED_STATE){ + zookeeper_close(m_zh); + return -1; + } + int rc = ZOK; + if(pathToDrill==NULL || strlen(pathToDrill)==0){ + m_rootDir=s_drillRoot; + m_rootDir += s_defaultCluster; + }else{ + m_rootDir=pathToDrill; + } + + pDrillbits = new String_vector; + rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits); + if(rc!=ZOK){ + delete pDrillbits; + m_err=getMessage(ERR_CONN_ZKERR, rc); + zookeeper_close(m_zh); + return -1; + } + if(pDrillbits && pDrillbits->count > 0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster (" + << connectStr << "/" << pathToDrill + << ")." 
<count; i++){ + drillbits.push_back(pDrillbits->data[i]); + } + for(int i=0; i& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){ + int rc = ZOK; + exec::DrillServiceInstance drillServiceInstance; + if( drillbits.size() >0){ + // pick the drillbit at 'index' + const char * bit=drillbits[index].c_str(); + std::string s; + s=m_rootDir + std::string("/") + bit; + int buffer_len=MAX_CONNECT_STR; + char buffer[MAX_CONNECT_STR+1]; + struct Stat stat; + buffer[MAX_CONNECT_STR]=0; + rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat); + if(rc!=ZOK){ + m_err=getMessage(ERR_CONN_ZKDBITERR, rc); + zookeeper_close(m_zh); + return -1; + } + exec::DrillServiceInstance drillServiceInstance; + drillServiceInstance.ParseFromArray(buffer, buffer_len); + endpoint=drillServiceInstance.endpoint(); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <. Selected " << drillServiceInstance.DebugString() << std::endl;) + }else{ + + m_err=getMessage(ERR_CONN_ZKNODBIT); + zookeeper_close(m_zh); + return -1; + } + return 0; +} + +// Deprecated int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){ uint32_t waitTime=30000; // 10 seconds zoo_set_debug_level(getZkLogLevel()); @@ -1525,7 +1817,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat // signal the cond var { if (state == ZOO_CONNECTED_STATE){ - DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." 
<< std::endl;) } boost::lock_guard bufferLock(self->m_cvMutex); self->m_bConnecting=false; @@ -1535,7 +1827,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat void ZookeeperImpl:: debugPrint(){ if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){ - DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;) } } diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index f19a015e5..06f37e059 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -34,13 +34,18 @@ #include #include #include -#include -#include -#ifdef _WIN32 + +#if defined _WIN32 || defined _WIN64 #include +//Windows header files redefine 'random' +#ifdef random +#undef random +#endif #else #include #endif +#include +#include #include "drill/drillClient.hpp" #include "rpcEncoder.hpp" @@ -58,12 +63,50 @@ class RecordBatch; class RpcEncoder; class RpcDecoder; +/* + * Defines the interface used by DrillClient and implemented by DrillClientImpl and PooledDrillClientImpl + * */ +class DrillClientImplBase{ + public: + DrillClientImplBase(){ + } + + virtual ~DrillClientImplBase(){ + } + + //Connect via Zookeeper or directly. + //Makes an initial connection to a drillbit. successful connect adds the first drillbit to the pool. + virtual connectionStatus_t connect(const char* connStr)=0; + + // Test whether the client is active. Returns true if any one of the underlying connections is active + virtual bool Active()=0; + + // Closes all open connections. + virtual void Close()=0; + + // Returns the last error encountered by any of the underlying executing queries or connections + virtual DrillClientError* getError()=0; + + // Submits a query to a drillbit. 
+ virtual DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx)=0; + + //Waits as a connection has results pending + virtual void waitForResults()=0; + + //Validates handshake at connect time. + virtual connectionStatus_t validateHandshake(DrillUserProperties* props)=0; + + virtual void freeQueryResources(DrillClientQueryResult* pQryResult)=0; + +}; + class DrillClientQueryResult{ friend class DrillClientImpl; public: - DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId): + DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId, const std::string& query): m_pClient(pClient), m_coordinationId(coordId), + m_query(query), m_numBatches(0), m_columnDefs(new std::vector), m_bIsQueryPending(true), @@ -116,6 +159,7 @@ class DrillClientQueryResult{ bool isCancelled(){return this->m_bCancel;}; bool hasSchemaChanged(){return this->m_bHasSchemaChanged;}; int32_t getCoordinationId(){ return this->m_coordinationId;} + const std::string& getQuery(){ return this->m_query;} void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;} void* getListenerContext() {return this->m_pListenerCtx;} @@ -147,6 +191,8 @@ class DrillClientQueryResult{ DrillClientImpl* m_pClient; int32_t m_coordinationId; + const std::string& m_query; + size_t m_numBatches; // number of record batches received so far // Vector of Buffers holding data returned by the server @@ -189,7 +235,7 @@ class DrillClientQueryResult{ void * m_pListenerCtx; }; -class DrillClientImpl{ +class DrillClientImpl : public DrillClientImplBase{ public: DrillClientImpl(): m_coordinationId(1), @@ -256,9 +302,14 @@ class DrillClientImpl{ DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx); void waitForResults(); connectionStatus_t validateHandshake(DrillUserProperties* props); + void freeQueryResources(DrillClientQueryResult* 
pQryResult){ + // Doesn't need to do anything + return; + }; private: friend class DrillClientQueryResult; + friend class PooledDrillClientImpl; struct compareQueryId{ bool operator()(const exec::shared::QueryId* q1, const exec::shared::QueryId* q2) const { @@ -275,7 +326,6 @@ class DrillClientImpl{ void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out. int32_t getNextCoordinationId(){ return ++m_coordinationId; }; - void parseConnectStr(const char* connectStr, std::string& pathToDrill, std::string& protocol, std::string& hostPortStr); // send synchronous messages //connectionStatus_t recvSync(InBoundRpcMessage& msg); connectionStatus_t sendSync(OutBoundRpcMessage& msg); @@ -331,6 +381,9 @@ class DrillClientImpl{ std::string m_handshakeErrorMsg; bool m_bIsConnected; + std::string m_connectStr; + + // // number of outstanding read requests. // handleRead will keep asking for more results as long as this number is not zero. size_t m_pendingRequests; @@ -356,6 +409,8 @@ class DrillClientImpl{ boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages + std::string m_connectedHost; // The hostname and port the socket is connected to. + //for synchronous messages, like validate handshake ByteBuf_t m_rbuf; // buffer for receiving synchronous messages DataBuf m_wbuf; // buffer for sending synchronous message @@ -372,12 +427,106 @@ class DrillClientImpl{ // Condition variable to signal completion of all queries. boost::condition_variable m_cv; + bool m_bIsDirectConnection; }; inline bool DrillClientImpl::Active() { return this->m_bIsConnected;; } + +/* * + * Provides the same public interface as a DrillClientImpl but holds a pool of DrillClientImpls. + * Every submitQuery uses a different DrillClientImpl to distribute the load. 
+ * DrillClient can use this class instead of DrillClientImpl to get better load balancing. + * */ +class PooledDrillClientImpl : public DrillClientImplBase{ + public: + PooledDrillClientImpl(){ + m_bIsDirectConnection=false; + m_maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS; + char* maxConn=std::getenv(MAX_CONCURRENT_CONNECTIONS_ENV); + if(maxConn!=NULL){ + m_maxConcurrentConnections=atoi(maxConn); + } + m_lastConnection=-1; + m_pError=NULL; + m_queriesExecuted=0; + m_pUserProperties=NULL; + } + + ~PooledDrillClientImpl(){ + for(std::vector::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + delete *it; + } + m_clientConnections.clear(); + if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;} + if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} + } + + //Connect via Zookeeper or directly. + //Makes an initial connection to a drillbit. successful connect adds the first drillbit to the pool. + connectionStatus_t connect(const char* connStr); + + // Test whether the client is active. Returns true if any one of the underlying connections is active + bool Active(); + + // Closes all open connections. + void Close() ; + + // Returns the last error encountered by any of the underlying executing queries or connections + DrillClientError* getError(); + + // Submits a query to a drillbit. If more than one query is to be sent, we may choose a + // a different drillbit in the pool. No more than m_maxConcurrentConnections will be allowed. + // Connections once added to the pool will be removed only when the DrillClient is closed. + DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx); + + //Waits as long as any one drillbit connection has results pending + void waitForResults(); + + //Validates handshake only against the first drillbit connected to. 
+ connectionStatus_t validateHandshake(DrillUserProperties* props); + + void freeQueryResources(DrillClientQueryResult* pQryResult); + + int getDrillbitCount(){ return m_drillbits.size();}; + + private: + + std::string m_connectStr; + std::string m_lastQuery; + + // A list of all the current client connections. We choose a new one for every query. + // When picking a drillClientImpl to use, we see how many queries each drillClientImpl + // is currently executing. If none, + std::vector m_clientConnections; + boost::mutex m_poolMutex; // protect access to the vector + + //ZookeeperImpl zook; + + // Use this to decide which drillbit to select next from the list of drillbits. + size_t m_lastConnection; + boost::mutex m_cMutex; + + // Number of queries executed so far. Can be used to select a new Drillbit from the pool. + size_t m_queriesExecuted; + + size_t m_maxConcurrentConnections; + + bool m_bIsDirectConnection; + + DrillClientError* m_pError; + + connectionStatus_t handleConnError(connectionStatus_t status, std::string msg); + // get a connection from the pool or create a new one. Return NULL if none is found + DrillClientImpl* getOneConnection(); + + std::vector m_drillbits; + + DrillUserProperties* m_pUserProperties;//Keep a copy of user properties +}; + class ZookeeperImpl{ public: ZookeeperImpl(); @@ -385,12 +534,17 @@ class ZookeeperImpl{ static ZooLogLevel getZkLogLevel(); // comma separated host:port pairs, each corresponding to a zk // server. e.g. 
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002 - int connectToZookeeper(const char* connectStr, const char* pathToDrill); + DEPRECATED int connectToZookeeper(const char* connectStr, const char* pathToDrill); void close(); static void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context); void debugPrint(); std::string& getError(){return m_err;} const exec::DrillbitEndpoint& getEndPoint(){ return m_drillServiceInstance.endpoint();} + // return unshuffled list of drillbits + int getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector& drillbits); + // picks the index drillbit and returns the corresponding endpoint object + int getEndPoint(std::vector& drillbits, size_t index, exec::DrillbitEndpoint& endpoint); + private: static char s_drillRoot[]; @@ -407,6 +561,7 @@ class ZookeeperImpl{ boost::condition_variable m_cv; bool m_bConnecting; exec::DrillServiceInstance m_drillServiceInstance; + std::string m_rootDir; }; } // namespace Drill diff --git a/contrib/native/client/src/clientlib/env.h.in b/contrib/native/client/src/clientlib/env.h.in new file mode 100644 index 000000000..a32f1521d --- /dev/null +++ b/contrib/native/client/src/clientlib/env.h.in @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ENV_H +#define ENV_H + +#define GIT_COMMIT_PROP @GIT_COMMIT_PROP@ + +#endif + + diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp index 11661f8c4..47d165f69 100644 --- a/contrib/native/client/src/clientlib/errmsgs.cpp +++ b/contrib/native/client/src/clientlib/errmsgs.cpp @@ -47,6 +47,8 @@ static Drill::ErrorMessages errorMessages[]={ {ERR_CONN_AUTHFAIL, ERR_CATEGORY_CONN, 0, "User authentication failed (please check the username and password)." "[Server message was: (%s) %s]"}, {ERR_CONN_UNKNOWN_ERR, ERR_CATEGORY_CONN, 0, "Handshake Failed due to an error on the server. [Server message was: (%s) %s]"}, + {ERR_CONN_NOCONN, ERR_CATEGORY_CONN, 0, "There is no connection to the server."}, + {ERR_CONN_ALREADYCONN, ERR_CATEGORY_CONN, 0, "This client is already connected to a server."}, {ERR_QRY_OUTOFMEM, ERR_CATEGORY_QRY, 0, "Out of memory."}, {ERR_QRY_COMMERR, ERR_CATEGORY_QRY, 0, "Communication error. 
%s"}, {ERR_QRY_INVREADLEN, ERR_CATEGORY_QRY, 0, "Internal Error: Received a message with an invalid read length."}, diff --git a/contrib/native/client/src/clientlib/errmsgs.hpp b/contrib/native/client/src/clientlib/errmsgs.hpp index b82efaaa2..cfb56a6b0 100644 --- a/contrib/native/client/src/clientlib/errmsgs.hpp +++ b/contrib/native/client/src/clientlib/errmsgs.hpp @@ -49,7 +49,9 @@ namespace Drill{ #define ERR_CONN_BAD_RPC_VER DRILL_ERR_START+16 #define ERR_CONN_AUTHFAIL DRILL_ERR_START+17 #define ERR_CONN_UNKNOWN_ERR DRILL_ERR_START+18 -#define ERR_CONN_MAX DRILL_ERR_START+18 +#define ERR_CONN_NOCONN DRILL_ERR_START+19 +#define ERR_CONN_ALREADYCONN DRILL_ERR_START+20 +#define ERR_CONN_MAX DRILL_ERR_START+20 #define ERR_QRY_OUTOFMEM ERR_CONN_MAX+1 #define ERR_QRY_COMMERR ERR_CONN_MAX+2 diff --git a/contrib/native/client/src/clientlib/logger.cpp b/contrib/native/client/src/clientlib/logger.cpp index 5411d01eb..c498ee14b 100644 --- a/contrib/native/client/src/clientlib/logger.cpp +++ b/contrib/native/client/src/clientlib/logger.cpp @@ -1,28 +1,40 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include #include "boost/date_time/posix_time/posix_time.hpp" #include "boost/thread.hpp" - +#include "env.h" +#include "utils.hpp" #include "logger.hpp" namespace Drill{ +/* +* Creates a single instance of the logger the first time this is called +*/ +/* static */ boost::mutex g_logMutex; +Logger& getLogger() { + boost::lock_guard logLock(g_logMutex); + static Logger* logger = new Logger(); + return *logger; +} + std::string getTime(){ return to_simple_string(boost::posix_time::second_clock::local_time()); } @@ -31,37 +43,77 @@ std::string getTid(){ return boost::lexical_cast(boost::this_thread::get_id()); } -logLevel_t Logger::s_level=LOG_ERROR; -std::ostream* Logger::s_pOutStream=NULL; -std::ofstream* Logger::s_pOutFileStream=NULL; -char* Logger::s_filepath=NULL; - void Logger::init(const char* path){ - if(path!=NULL) { - s_pOutFileStream = new std::ofstream; - s_pOutFileStream->open(path, std::ofstream::out); - if(!s_pOutFileStream->is_open()){ - std::cerr << "Logfile could not be opened. 
Logging to stdout" << std::endl; + static bool initialized = false; + boost::lock_guard logLock(m_logMutex); + if (!initialized && path != NULL) { + std::string fullname = path; + size_t lastindex = fullname.find_last_of("."); + std::string filename; + if (lastindex != std::string::npos){ + filename = fullname.substr(0, lastindex) + + "-" + + Utils::to_string(Utils::s_randomNumber()) + + fullname.substr(lastindex, fullname.length()); } + else{ + filename = fullname.substr(0, fullname.length()) + + "-" + + Utils::to_string(Utils::s_randomNumber()) + + ".log"; + } + //m_filepath=path; + m_filepath = filename.c_str(); + m_pOutFileStream = new std::ofstream; + m_pOutFileStream->open(m_filepath.c_str(), std::ios_base::out | std::ios_base::app); + if (!m_pOutFileStream->is_open()){ + std::cerr << "Logfile ( " << m_filepath << ") could not be opened. Logging to stdout" << std::endl; + m_filepath.erase(); + delete m_pOutFileStream; m_pOutFileStream=NULL; + } + initialized = true; + + m_pOutStream = (m_pOutFileStream != NULL) ? m_pOutFileStream : &std::cout; +#if defined _WIN32 || defined _WIN64 + + TCHAR szFile[MAX_PATH]; + GetModuleFileName(NULL, szFile, MAX_PATH); +#endif + *m_pOutStream + << "Drill Client Library" << std::endl + << "Build info:" << GIT_COMMIT_PROP << std::endl + +#if defined _WIN32 || defined _WIN64 + << "Loaded by process: " << szFile << std::endl + << "Current process id is: " << ::GetCurrentProcessId() << std::endl +#else + << "Current process id is: " << getpid() << std::endl +#endif + << "Initialized Logging to file (" << ((path!=NULL)?path:"std::out") << "). 
" + << std::endl; } - s_pOutStream=(s_pOutFileStream!=NULL && s_pOutFileStream->is_open())?s_pOutFileStream:&std::cout; } void Logger::close(){ - if(s_pOutFileStream !=NULL){ - if(s_pOutFileStream->is_open()){ - s_pOutFileStream->close(); + //boost::lock_guard logLock(Drill::Logger::m_logMutex); + boost::lock_guard logLock(m_logMutex); + if (m_pOutFileStream != NULL){ + if (m_pOutFileStream->is_open()){ + m_pOutFileStream->close(); } - delete s_pOutFileStream; s_pOutFileStream=NULL; + delete m_pOutFileStream; m_pOutFileStream = NULL; + m_pOutStream = &std::cout; // set it back to std::cout in case someone tries to log even after close } } +// The log call itself cannot be thread safe. Use the DRILL_MT_LOG macro to make +// this thread safe std::ostream& Logger::log(logLevel_t level){ - *s_pOutStream << getTime(); - *s_pOutStream << " : "< #include +#include #include "drill/common.hpp" namespace Drill{ class Logger{ public: - Logger(){} + Logger(){ + m_level = LOG_ERROR; + m_pOutFileStream = NULL; + m_pOutStream = &std::cout; + } ~Logger(){ } - static void init(const char* path); - static void close(); - static std::ostream& log(logLevel_t level); - static std::string levelAsString(logLevel_t level) { + void init(const char* path); + void close(); + std::ostream& log(logLevel_t level); + std::string levelAsString(logLevel_t level) { static const char* const levelNames[] = { - "TRACE", - "DEBUG", - "INFO", + "TRACE ", + "DEBUG ", + "INFO ", "WARNING", - "ERROR", - "FATAL" + "ERROR ", + "FATAL " }; - return levelNames[level]; + return levelNames[level>=LOG_TRACE && level<=LOG_FATAL?level:LOG_ERROR]; } // The logging level - static logLevel_t s_level; - static std::ostream* s_pOutStream; + logLevel_t m_level; + std::ostream* m_pOutStream; + boost::mutex m_logMutex; private: - //static std::ostream* s_pOutStream; - static std::ofstream* s_pOutFileStream; - static char* s_filepath; + std::ofstream* m_pOutFileStream; + std::string m_filepath; }; // Logger -std::string 
getTime(); -std::string getTid(); + Logger& getLogger(); + std::string getTime(); + std::string getTid(); + +#define DRILL_MT_LOG(LOG) \ + { \ + boost::lock_guard logLock(getLogger().m_logMutex); \ + LOG \ + } #define DRILL_LOG(level) \ - if (Logger::s_pOutStream==NULL || level < Drill::Logger::s_level); \ - else Drill::Logger::log(level) \ + if (getLogger().m_pOutStream==NULL || level < getLogger().m_level); \ + else getLogger().log(level) \ + } // namespace Drill diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp index f1f03a1e3..1e6a8774e 100644 --- a/contrib/native/client/src/clientlib/utils.cpp +++ b/contrib/native/client/src/clientlib/utils.cpp @@ -1,68 +1,107 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include #include #include "utils.hpp" +#include "logger.hpp" #include "drill/common.hpp" namespace Drill{ + +boost::random::random_device Utils::s_RNG; +boost::random::mt19937 Utils::s_URNG(s_RNG()); +boost::uniform_int<> Utils::s_uniformDist(0,std::numeric_limits::max()-1); +boost::variate_generator > Utils::s_randomNumber(s_URNG, s_uniformDist); + boost::mutex AllocatedBuffer::s_memCVMutex; boost::condition_variable AllocatedBuffer::s_memCV; -size_t AllocatedBuffer::s_allocatedMem=0; -bool AllocatedBuffer::s_isBufferLimitReached=false; +size_t AllocatedBuffer::s_allocatedMem = 0; +bool AllocatedBuffer::s_isBufferLimitReached = false; +boost::mutex s_utilMutex; ByteBuf_t Utils::allocateBuffer(size_t len){ boost::lock_guard memLock(AllocatedBuffer::s_memCVMutex); - AllocatedBuffer::s_allocatedMem+=len; + AllocatedBuffer::s_allocatedMem += len; //http://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc - ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t)); - size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE; - if(b!=NULL && AllocatedBuffer::s_allocatedMem >= safeSize){ - AllocatedBuffer::s_isBufferLimitReached=true; + ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t)); + size_t safeSize = DrillClientConfig::getBufferLimit() - MEM_CHUNK_SIZE; + if (b != NULL && AllocatedBuffer::s_allocatedMem >= safeSize){ + AllocatedBuffer::s_isBufferLimitReached = true; } return b; } -void Utils::freeBuffer(ByteBuf_t b, size_t len){ +void Utils::freeBuffer(ByteBuf_t b, size_t len){ boost::lock_guard 
memLock(AllocatedBuffer::s_memCVMutex); - AllocatedBuffer::s_allocatedMem-=len; - free(b); - size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE; - if(b!=NULL && AllocatedBuffer::s_allocatedMem < safeSize){ - AllocatedBuffer::s_isBufferLimitReached=false; + AllocatedBuffer::s_allocatedMem -= len; + free(b); + size_t safeSize = DrillClientConfig::getBufferLimit() - MEM_CHUNK_SIZE; + if (b != NULL && AllocatedBuffer::s_allocatedMem < safeSize){ + AllocatedBuffer::s_isBufferLimitReached = false; //signal any waiting threads AllocatedBuffer::s_memCV.notify_one(); } } +void Utils::parseConnectStr(const char* connectStr, + std::string& pathToDrill, + std::string& protocol, + std::string& hostPortStr){ + boost::lock_guard memLock(s_utilMutex); + char u[MAX_CONNECT_STR + 1]; + strncpy(u, connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR] = 0; + char* z = strtok(u, "="); + char* c = strtok(NULL, "/"); + char* p = strtok(NULL, ""); + + if (p != NULL) pathToDrill = std::string("/") + p; + protocol = z; hostPortStr = c; + return; +} + +void Utils::shuffle(std::vector& vector){ + std::random_shuffle(vector.begin(), vector.end(), Utils::s_randomNumber); + return; +} + +void Utils::add(std::vector& vector1, std::vector& vector2){ + std::vector::iterator it; + for (it = vector2.begin(); it != vector2.end(); it++) { + std::vector::iterator it2 = std::find(vector1.begin(), vector1.end(), *it); + if (it2 == vector1.end()){ + vector1.push_back(*it); + } + } +} AllocatedBuffer::AllocatedBuffer(size_t l){ - m_pBuffer=NULL; - m_pBuffer=Utils::allocateBuffer(l); - m_bufSize=m_pBuffer!=NULL?l:0; + m_pBuffer = NULL; + m_pBuffer = Utils::allocateBuffer(l); + m_bufSize = m_pBuffer != NULL ? 
l : 0; } AllocatedBuffer::~AllocatedBuffer(){ - Utils::freeBuffer(m_pBuffer, m_bufSize); - m_pBuffer=NULL; - m_bufSize=0; + Utils::freeBuffer(m_pBuffer, m_bufSize); + m_pBuffer = NULL; + m_bufSize = 0; } } // namespace diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp index 0f26ad69c..36fb91f81 100644 --- a/contrib/native/client/src/clientlib/utils.hpp +++ b/contrib/native/client/src/clientlib/utils.hpp @@ -1,20 +1,20 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ #ifndef __UTILS_H #define __UTILS_H @@ -23,6 +23,19 @@ #include #include #include +#include + +#if defined _WIN32 || defined _WIN64 + //Windows header files redefine 'random' + #ifdef random + #undef random + #endif +#endif +#include +#include // for mt19937 +#include +#include +#include #include #include "drill/common.hpp" @@ -33,26 +46,57 @@ namespace Drill{ // Wrapper Class to keep track of allocated memory class AllocatedBuffer{ public: - AllocatedBuffer(size_t l); - ~AllocatedBuffer(); - - ByteBuf_t m_pBuffer; - size_t m_bufSize; - - // keep track of allocated memory. The client lib blocks - // if we have allocated up to a limit (defined in drillClientConfig). - static boost::mutex s_memCVMutex; - static boost::condition_variable s_memCV; - static size_t s_allocatedMem; - static bool s_isBufferLimitReached; + AllocatedBuffer(size_t l); + ~AllocatedBuffer(); + + ByteBuf_t m_pBuffer; + size_t m_bufSize; + + // keep track of allocated memory. The client lib blocks + // if we have allocated up to a limit (defined in drillClientConfig). 
+ static boost::mutex s_memCVMutex; + static boost::condition_variable s_memCV; + static size_t s_allocatedMem; + static bool s_isBufferLimitReached; + static boost::mutex s_utilMutex; // for provideing safety around strtok and other non-reentrant functions }; class Utils{ public: + static boost::random::random_device s_RNG; //Truly random (expensive and device dependent) + static boost::random::mt19937 s_URNG; //Pseudo random with a period of ( 2^19937 - 1 ) + static boost::uniform_int<> s_uniformDist; // Produces a uniform distribution + static boost::variate_generator > s_randomNumber; // a random number generator also usable by shuffle + //allocate memory for Record Batches static ByteBuf_t allocateBuffer(size_t len); static void freeBuffer(ByteBuf_t b, size_t len); + static void parseConnectStr(const char* connectStr, + std::string& pathToDrill, + std::string& protocol, + std::string& hostPortStr); + + // useful vector methods/idioms + + // performs a random shuffle on a string vector + static void shuffle(std::vector& vector); + + // adds the contents of vector2 to vector1 + static void add(std::vector& vector1, std::vector& vector2); + + // removes the element from the vector + template static void eraseRemove(std::vector& vector, T elem){ + vector.erase(std::remove(vector.begin(), vector.end(), elem), vector.end()); + } + + // Provide a to_string that works with older C++ compilers + template static std::string to_string(T val) { + std::stringstream stream; + stream << val; + return stream.str(); + } + }; // Utils } // namespace Drill diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp index bb8e2b4a3..a617dc71f 100644 --- a/contrib/native/client/src/include/drill/common.hpp +++ b/contrib/native/client/src/include/drill/common.hpp @@ -45,6 +45,11 @@ #define MEM_CHUNK_SIZE 64*1024; // 64K #define MAX_MEM_ALLOC_SIZE 256*1024*1024; // 256 MB +#define MAX_BATCH_SIZE 65536; // see RecordBatch.java 
+#define ENABLE_CONNECTION_POOL_ENV "DRILL_ENABLE_CONN_POOL" +#define DEFAULT_MAX_CONCURRENT_CONNECTIONS 10 +#define MAX_CONCURRENT_CONNECTIONS_ENV "DRILL_MAX_CONN" + #ifdef _DEBUG #define EXTRA_DEBUGGING #define CODER_DEBUGGING @@ -110,7 +115,9 @@ typedef enum{ CONN_HOSTNAME_RESOLUTION_ERROR=6, CONN_AUTH_FAILED=7, CONN_BAD_RPC_VER=8, - CONN_DEAD=9 + CONN_DEAD=9, + CONN_NOTCONNECTED=10, + CONN_ALREADYCONNECTED=11 } connectionStatus_t; typedef enum{ diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp index 4568ca1fc..a74f4bdc7 100644 --- a/contrib/native/client/src/include/drill/drillClient.hpp +++ b/contrib/native/client/src/include/drill/drillClient.hpp @@ -53,6 +53,7 @@ namespace exec{ namespace Drill{ //struct UserServerEndPoint; +class DrillClientImplBase; class DrillClientImpl; class DrillClientQueryResult; class FieldMetadata; @@ -339,6 +340,10 @@ class DECLSPEC_DRILL_CLIENT DrillClient{ */ std::string& getError(); + /* + * Returns the error message associated with the query handle + */ + const std::string& getError(QueryHandle_t handle); /* * Applications using the async query submit method can register a listener for schema changes * @@ -369,7 +374,7 @@ class DECLSPEC_DRILL_CLIENT DrillClient{ static DrillClientInitializer s_init; static DrillClientConfig s_config; - DrillClientImpl * m_pImpl; + DrillClientImplBase * m_pImpl; }; } // namespace Drill -- cgit v1.2.3