From 0553798c18aae6b3dfc14227e0cef5f2baea11fe Mon Sep 17 00:00:00 2001 From: Xiao Meng Date: Tue, 10 Feb 2015 17:59:23 -0800 Subject: DRILL-1197: C++ Client. Differentiate socket/handshake/query timeout for deadline timer. It also - returns more detailed connection status for validate handshake. - adds timeout options for query submitter. --- .../native/client/src/clientlib/drillClient.cpp | 31 +++++++- .../client/src/clientlib/drillClientImpl.cpp | 84 ++++++++++++---------- .../client/src/clientlib/drillClientImpl.hpp | 6 +- contrib/native/client/src/clientlib/errmsgs.cpp | 1 + contrib/native/client/src/clientlib/errmsgs.hpp | 3 +- contrib/native/client/src/include/drill/common.hpp | 6 +- .../client/src/include/drill/drillClient.hpp | 24 ++++++- 7 files changed, 110 insertions(+), 45 deletions(-) (limited to 'contrib/native/client/src') diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp index 02bc1a47c..878dad4ba 100644 --- a/contrib/native/client/src/clientlib/drillClient.cpp +++ b/contrib/native/client/src/clientlib/drillClient.cpp @@ -43,9 +43,12 @@ DrillClientInitializer::~DrillClientInitializer(){ google::protobuf::ShutdownProtobufLibrary(); } +// Initialize static member of DrillClientConfig logLevel_t DrillClientConfig::s_logLevel=LOG_ERROR; uint64_t DrillClientConfig::s_bufferLimit=MAX_MEM_ALLOC_SIZE; -int32_t DrillClientConfig::s_socketTimeout=180; +int32_t DrillClientConfig::s_socketTimeout=0; +int32_t DrillClientConfig::s_handshakeTimeout=5; +int32_t DrillClientConfig::s_queryTimeout=180; boost::mutex DrillClientConfig::s_mutex; DrillClientConfig::DrillClientConfig(){ @@ -82,11 +85,35 @@ void DrillClientConfig::setSocketTimeout(int32_t t){ s_socketTimeout=t; } +void DrillClientConfig::setHandshakeTimeout(int32_t t){ + if (t > 0) { + boost::lock_guard configLock(DrillClientConfig::s_mutex); + s_handshakeTimeout = t; + } +} + +void DrillClientConfig::setQueryTimeout(int32_t t){ + if 
(t>0){ + boost::lock_guard configLock(DrillClientConfig::s_mutex); + s_queryTimeout=t; + } +} + int32_t DrillClientConfig::getSocketTimeout(){ boost::lock_guard configLock(DrillClientConfig::s_mutex); return s_socketTimeout; } +int32_t DrillClientConfig::getHandshakeTimeout(){ + boost::lock_guard configLock(DrillClientConfig::s_mutex); + return s_handshakeTimeout; +} + +int32_t DrillClientConfig::getQueryTimeout(){ + boost::lock_guard configLock(DrillClientConfig::s_mutex); + return s_queryTimeout; +} + logLevel_t DrillClientConfig::getLogLevel(){ boost::lock_guard configLock(DrillClientConfig::s_mutex); return s_logLevel; @@ -263,7 +290,7 @@ connectionStatus_t DrillClient::connect(const char* connectStr, const char* defa ret=this->m_pImpl->connect(connectStr); if(ret==CONN_SUCCESS) - ret=this->m_pImpl->validateHandShake(defaultSchema)?CONN_SUCCESS:CONN_HANDSHAKE_FAILED; + ret=this->m_pImpl->validateHandShake(defaultSchema); return ret; } diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index c832a7916..c0382ba0a 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -169,13 +169,17 @@ connectionStatus_t DrillClientImpl::recvHandshake(){ m_rbuf = Utils::allocateBuffer(MAX_SOCK_RD_BUFSIZE); } - m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getSocketTimeout())); - m_deadlineTimer.async_wait(boost::bind( - &DrillClientImpl::handleHShakeReadTimeout, - this, - boost::asio::placeholders::error - )); - DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer." 
<< std::endl; + m_io_service.reset(); + if (DrillClientConfig::getHandshakeTimeout() > 0){ + m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHandshakeTimeout())); + m_deadlineTimer.async_wait(boost::bind( + &DrillClientImpl::handleHShakeReadTimeout, + this, + boost::asio::placeholders::error + )); + DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with " + << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl; + } async_read( this->m_socket, @@ -201,7 +205,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, boost::system::error_code error=err; // cancel the timer m_deadlineTimer.cancel(); - DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl; + DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl; if(!error){ InBoundRpcMessage msg; uint32_t length = 0; @@ -222,7 +226,9 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, } DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg); }else{ + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n"; handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake")); + return; } exec::user::BitToUserHandshake b2u; b2u.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); @@ -243,21 +249,22 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & err){ // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer. - if(!err){ + if(err != boost::asio::error::operation_aborted){ // Check whether the deadline has passed. if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. m_deadlineTimer.expires_at(boost::posix_time::pos_infin); - DRILL_LOG(LOG_TRACE) << "Deadline timer expired." 
<< std::endl; + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n"; + handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT)); + m_io_service.stop(); boost::system::error_code ignorederr; - m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ignorederr); - m_socket.close(); + m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); } } return; } -bool DrillClientImpl::validateHandShake(const char* defaultSchema){ +connectionStatus_t DrillClientImpl::validateHandShake(const char* defaultSchema){ DRILL_LOG(LOG_TRACE) << "validateHandShake\n"; @@ -282,19 +289,19 @@ bool DrillClientImpl::validateHandShake(const char* defaultSchema){ DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n"; } - recvHandshake(); - this->m_io_service.reset(); - if(this->m_pError!=NULL){ - return false; + connectionStatus_t ret = recvHandshake(); + if(ret!=CONN_SUCCESS){ + return ret; } if(m_handshakeVersion != u2b.rpc_version()) { DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected << " << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl; - handleConnError(CONN_HANDSHAKE_FAILED, + return handleConnError(CONN_HANDSHAKE_FAILED, getMessage(ERR_CONN_NOHSHAKE, DRILL_RPC_VERSION, m_handshakeVersion)); - return false; } - return true; + // reset io_service after handshake is validated before running queries + m_io_service.reset(); + return CONN_SUCCESS; } @@ -365,14 +372,16 @@ void DrillClientImpl::getNextResult(){ } //use free, not delete to free ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN); - - m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getSocketTimeout())); - m_deadlineTimer.async_wait(boost::bind( - &DrillClientImpl::handleReadTimeout, - this, - boost::asio::placeholders::error - )); - DRILL_LOG(LOG_TRACE) << "Started new async wait timer." 
<< std::endl; + if (DrillClientConfig::getQueryTimeout() > 0){ + DRILL_LOG(LOG_TRACE) << "Started new query wait timer with " + << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl; + m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout())); + m_deadlineTimer.async_wait(boost::bind( + &DrillClientImpl::handleReadTimeout, + this, + boost::asio::placeholders::error + )); + } async_read( this->m_socket, @@ -677,9 +686,9 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr case exec::shared::QueryResult_QueryState_COMPLETED: { //Not clean to call the handleTerminatedQryState method - //because it signals an error to the listener. + //because it signals an error to the listener. //The ODBC driver expects this though and the sync API - //handles this (luckily). + //handles this (luckily). ret=handleTerminatedQryState(ret, getMessage(ERR_QRY_COMPLETED), pDrillClientQueryResult); @@ -700,16 +709,17 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){ // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer. - if(!err){ + if(err != boost::asio::error::operation_aborted){ + // Check whether the deadline has passed. if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. - handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_TIMOUT), NULL); + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n"; + handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL); // There is no longer an active deadline. The expiry is set to positive // infinity so that the timer never expires until a new deadline is set. // Note that at this time, the caller is not in a (async) wait for the timer. 
m_deadlineTimer.expires_at(boost::posix_time::pos_infin); - DRILL_LOG(LOG_TRACE) << "Deadline timer expired." << std::endl; // Cancel all pending async IOs. // The cancel call _MAY_ not work on all platforms. To be a little more reliable we need // to have the BOOST_ASIO_ENABLE_CANCELIO macro (as well as the BOOST_ASIO_DISABLE_IOCP macro?) @@ -725,11 +735,13 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, const boost::system::error_code& err, size_t bytes_transferred) { boost::system::error_code error=err; - // cancel the timer DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " << reinterpret_cast(_buf) << std::endl; - m_deadlineTimer.cancel(); - DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl; + if(DrillClientConfig::getQueryTimeout() > 0){ + // Cancel the timeout if handleRead is called + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n"; + m_deadlineTimer.cancel(); + } if(!error){ InBoundRpcMessage msg; diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index a5eeb7775..fdcf17882 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -211,7 +211,7 @@ class DrillClientImpl{ m_deadlineTimer.cancel(); m_io_service.stop(); boost::system::error_code ignorederr; - m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ignorederr); + m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); m_socket.close(); if(m_rbuf!=NULL){ Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL; @@ -235,7 +235,7 @@ class DrillClientImpl{ DrillClientError* getError(){ return m_pError;} DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx); void waitForResults(); - bool validateHandShake(const char* defaultSchema); + connectionStatus_t 
validateHandShake(const char* defaultSchema); private: friend class DrillClientQueryResult; @@ -341,7 +341,7 @@ inline void DrillClientImpl::Close() { //TODO: cancel pending query if(this->m_bIsConnected){ boost::system::error_code ignorederr; - m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ignorederr); + m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); m_socket.close(); m_bIsConnected=false; } diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp index a5e72172f..fa7272151 100644 --- a/contrib/native/client/src/clientlib/errmsgs.cpp +++ b/contrib/native/client/src/clientlib/errmsgs.cpp @@ -40,6 +40,7 @@ static Drill::ErrorMessages errorMessages[]={ {ERR_CONN_ZKNODBIT, ERR_CATEGORY_CONN, 0, "No drillbit found with this Zookeeper."}, {ERR_CONN_ZKNOAUTH, ERR_CATEGORY_CONN, 0, "Authentication failed."}, {ERR_CONN_ZKEXP, ERR_CATEGORY_CONN, 0, "Session expired."}, + {ERR_CONN_HSHAKETIMOUT, ERR_CATEGORY_CONN, 0, "Handshake Timeout."}, {ERR_QRY_OUTOFMEM, ERR_CATEGORY_QRY, 0, "Out of memory."}, {ERR_QRY_COMMERR, ERR_CATEGORY_QRY, 0, "Communication error. 
%s"}, {ERR_QRY_INVREADLEN, ERR_CATEGORY_QRY, 0, "Internal Error: Received a message with an invalid read length."}, diff --git a/contrib/native/client/src/clientlib/errmsgs.hpp b/contrib/native/client/src/clientlib/errmsgs.hpp index 9a69f213c..22e544f08 100644 --- a/contrib/native/client/src/clientlib/errmsgs.hpp +++ b/contrib/native/client/src/clientlib/errmsgs.hpp @@ -45,7 +45,8 @@ namespace Drill{ #define ERR_CONN_ZKNODBIT DRILL_ERR_START+12 #define ERR_CONN_ZKNOAUTH DRILL_ERR_START+13 #define ERR_CONN_ZKEXP DRILL_ERR_START+14 -#define ERR_CONN_MAX DRILL_ERR_START+14 +#define ERR_CONN_HSHAKETIMOUT DRILL_ERR_START+15 +#define ERR_CONN_MAX DRILL_ERR_START+15 #define ERR_QRY_OUTOFMEM ERR_CONN_MAX+1 #define ERR_QRY_COMMERR ERR_CONN_MAX+2 diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp index f83aae403..824d67062 100644 --- a/contrib/native/client/src/include/drill/common.hpp +++ b/contrib/native/client/src/include/drill/common.hpp @@ -78,7 +78,8 @@ typedef enum{ QRY_COMPLETED = 11, QRY_CANCELED = 12, QRY_FAILED = 13, - QRY_UNKNOWN_QUERY = 14 + QRY_UNKNOWN_QUERY = 14, + QRY_TIMEOUT = 15 } status_t; typedef enum{ @@ -86,7 +87,8 @@ typedef enum{ CONN_FAILURE=1, CONN_HANDSHAKE_FAILED=2, CONN_INVALID_INPUT=3, - CONN_ZOOKEEPER_ERROR=4 + CONN_ZOOKEEPER_ERROR=4, + CONN_HANDSHAKE_TIMEOUT=5 } connectionStatus_t; typedef enum{ diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp index 0204855b2..19fec6985 100644 --- a/contrib/native/client/src/include/drill/drillClient.hpp +++ b/contrib/native/client/src/include/drill/drillClient.hpp @@ -99,7 +99,11 @@ class DECLSPEC_DRILL_CLIENT DrillClientConfig{ static void setBufferLimit(uint64_t l); static uint64_t getBufferLimit(); static void setSocketTimeout(int32_t l); + static void setHandshakeTimeout(int32_t l); + static void setQueryTimeout(int32_t l); static int32_t getSocketTimeout(); + 
static int32_t getHandshakeTimeout(); + static int32_t getQueryTimeout(); static logLevel_t getLogLevel(); private: // The logging level @@ -107,8 +111,26 @@ class DECLSPEC_DRILL_CLIENT DrillClientConfig{ // The total amount of memory to be allocated by an instance of DrillClient. // For future use. Currently, not enforced. static uint64_t s_bufferLimit; - // Timeout (in seconds) for asynchronous read operations. Default is 180 seconds + + /** + * DrillClient configures timeouts (in seconds) at a fine granularity. + * Disabled by setting the value to zero. + * + * s_socketTimeout: (default 0) + * set SO_RCVTIMEO and SO_SNDTIMEO socket options and place a + * timeout on socket receives and sends. It is disabled by default. + * + * s_handshakeTimeout: (default 5) + * place a timeout on validating handshake. When an endpoint (host:port) + * is reachable but the drillbit hangs or another service is running + * there, this keeps the client from hanging. + * + * s_queryTimeout: (default 180) + * place a timeout on waiting for query results. + */ static int32_t s_socketTimeout; + static int32_t s_handshakeTimeout; + static int32_t s_queryTimeout; static boost::mutex s_mutex; }; -- cgit v1.2.3