aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorXiao Meng <xiaom.cs@gmail.com>2015-02-10 17:59:23 -0800
committerParth Chandra <pchandra@maprtech.com>2015-02-13 21:50:48 -0800
commit0553798c18aae6b3dfc14227e0cef5f2baea11fe (patch)
tree18c03ec489d97c23522b022bd530f514b0371c22
parent30769783ec22503d4ab0265da0b449df46a856fd (diff)
DRILL-1197: C++ Client. Differentiate socket/handshake/query timeout for deadline timer.
It also - returns more detailed connection status for validate handshake. - adds timeout options for query submitter.
-rw-r--r--contrib/native/client/example/querySubmitter.cpp15
-rw-r--r--contrib/native/client/src/clientlib/drillClient.cpp31
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.cpp84
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp6
-rw-r--r--contrib/native/client/src/clientlib/errmsgs.cpp1
-rw-r--r--contrib/native/client/src/clientlib/errmsgs.hpp3
-rw-r--r--contrib/native/client/src/include/drill/common.hpp6
-rw-r--r--contrib/native/client/src/include/drill/drillClient.hpp24
8 files changed, 123 insertions, 47 deletions
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 2d8922379..9ecee2457 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -22,7 +22,7 @@
#include <stdlib.h>
#include "drill/drillc.hpp"
-int nOptions=8;
+int nOptions=10;
struct Option{
char name[32];
@@ -36,7 +36,9 @@ struct Option{
{"schema", "Default schema", false},
{"api", "API type [sync|async]", true},
{"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
- {"testCancel", "Cancel the query afterthe first record batch.", false}
+ {"testCancel", "Cancel the query afterthe first record batch.", false},
+ {"hshakeTimeout", "Handshake timeout (second).", false},
+ {"queryTimeout", "Query timeout (second).", false}
};
std::map<std::string, std::string> qsOptionValues;
@@ -266,6 +268,8 @@ int main(int argc, char* argv[]) {
std::string type_str=qsOptionValues["type"];
std::string logLevel=qsOptionValues["logLevel"];
std::string testCancel=qsOptionValues["testCancel"];
+ std::string hshakeTimeout=qsOptionValues["hshakeTimeout"];
+ std::string queryTimeout=qsOptionValues["queryTimeout"];
Drill::QueryType type;
@@ -309,6 +313,13 @@ int main(int argc, char* argv[]) {
int nQueries=queryInputs.size();
Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. Allows us to hold at least two record batches.
+
+ if (!hshakeTimeout.empty()){
+ Drill::DrillClientConfig::setHandshakeTimeout(atoi(hshakeTimeout.c_str()));
+ }
+ if (!queryTimeout.empty()){
+ Drill::DrillClientConfig::setQueryTimeout(atoi(queryTimeout.c_str()));
+ }
if(client.connect(connectStr.c_str(), schema.c_str())!=Drill::CONN_SUCCESS){
std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;
return -1;
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 02bc1a47c..878dad4ba 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -43,9 +43,12 @@ DrillClientInitializer::~DrillClientInitializer(){
google::protobuf::ShutdownProtobufLibrary();
}
+// Initialize static member of DrillClientConfig
logLevel_t DrillClientConfig::s_logLevel=LOG_ERROR;
uint64_t DrillClientConfig::s_bufferLimit=MAX_MEM_ALLOC_SIZE;
-int32_t DrillClientConfig::s_socketTimeout=180;
+int32_t DrillClientConfig::s_socketTimeout=0;
+int32_t DrillClientConfig::s_handshakeTimeout=5;
+int32_t DrillClientConfig::s_queryTimeout=180;
boost::mutex DrillClientConfig::s_mutex;
DrillClientConfig::DrillClientConfig(){
@@ -82,11 +85,35 @@ void DrillClientConfig::setSocketTimeout(int32_t t){
s_socketTimeout=t;
}
+void DrillClientConfig::setHandshakeTimeout(int32_t t){
+ if (t > 0) {
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+ s_handshakeTimeout = t;
+ }
+}
+
+void DrillClientConfig::setQueryTimeout(int32_t t){
+ if (t>0){
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+ s_queryTimeout=t;
+ }
+}
+
int32_t DrillClientConfig::getSocketTimeout(){
boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
return s_socketTimeout;
}
+int32_t DrillClientConfig::getHandshakeTimeout(){
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+ return s_handshakeTimeout;
+}
+
+int32_t DrillClientConfig::getQueryTimeout(){
+ boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
+ return s_queryTimeout;
+}
+
logLevel_t DrillClientConfig::getLogLevel(){
boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
return s_logLevel;
@@ -263,7 +290,7 @@ connectionStatus_t DrillClient::connect(const char* connectStr, const char* defa
ret=this->m_pImpl->connect(connectStr);
if(ret==CONN_SUCCESS)
- ret=this->m_pImpl->validateHandShake(defaultSchema)?CONN_SUCCESS:CONN_HANDSHAKE_FAILED;
+ ret=this->m_pImpl->validateHandShake(defaultSchema);
return ret;
}
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index c832a7916..c0382ba0a 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -169,13 +169,17 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
m_rbuf = Utils::allocateBuffer(MAX_SOCK_RD_BUFSIZE);
}
- m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getSocketTimeout()));
- m_deadlineTimer.async_wait(boost::bind(
- &DrillClientImpl::handleHShakeReadTimeout,
- this,
- boost::asio::placeholders::error
- ));
- DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer." << std::endl;
+ m_io_service.reset();
+ if (DrillClientConfig::getHandshakeTimeout() > 0){
+ m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHandshakeTimeout()));
+ m_deadlineTimer.async_wait(boost::bind(
+ &DrillClientImpl::handleHShakeReadTimeout,
+ this,
+ boost::asio::placeholders::error
+ ));
+ DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
+ << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;
+ }
async_read(
this->m_socket,
@@ -201,7 +205,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
boost::system::error_code error=err;
// cancel the timer
m_deadlineTimer.cancel();
- DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
+ DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
if(!error){
InBoundRpcMessage msg;
uint32_t length = 0;
@@ -222,7 +226,9 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
}
DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
}else{
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
+ return;
}
exec::user::BitToUserHandshake b2u;
b2u.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
@@ -243,21 +249,22 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & err){
// if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
- if(!err){
+ if(err != boost::asio::error::operation_aborted){
// Check whether the deadline has passed.
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
- DRILL_LOG(LOG_TRACE) << "Deadline timer expired." << std::endl;
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";
+ handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
+ m_io_service.stop();
boost::system::error_code ignorederr;
- m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ignorederr);
- m_socket.close();
+ m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
}
}
return;
}
-bool DrillClientImpl::validateHandShake(const char* defaultSchema){
+connectionStatus_t DrillClientImpl::validateHandShake(const char* defaultSchema){
DRILL_LOG(LOG_TRACE) << "validateHandShake\n";
@@ -282,19 +289,19 @@ bool DrillClientImpl::validateHandShake(const char* defaultSchema){
DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";
}
- recvHandshake();
- this->m_io_service.reset();
- if(this->m_pError!=NULL){
- return false;
+ connectionStatus_t ret = recvHandshake();
+ if(ret!=CONN_SUCCESS){
+ return ret;
}
if(m_handshakeVersion != u2b.rpc_version()) {
DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected << "
<< DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;
- handleConnError(CONN_HANDSHAKE_FAILED,
+ return handleConnError(CONN_HANDSHAKE_FAILED,
getMessage(ERR_CONN_NOHSHAKE, DRILL_RPC_VERSION, m_handshakeVersion));
- return false;
}
- return true;
+ // reset io_service after handshake is validated before running queries
+ m_io_service.reset();
+ return CONN_SUCCESS;
}
@@ -365,14 +372,16 @@ void DrillClientImpl::getNextResult(){
}
//use free, not delete to free
ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
-
- m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getSocketTimeout()));
- m_deadlineTimer.async_wait(boost::bind(
- &DrillClientImpl::handleReadTimeout,
- this,
- boost::asio::placeholders::error
- ));
- DRILL_LOG(LOG_TRACE) << "Started new async wait timer." << std::endl;
+ if (DrillClientConfig::getQueryTimeout() > 0){
+ DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
+ << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;
+ m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout()));
+ m_deadlineTimer.async_wait(boost::bind(
+ &DrillClientImpl::handleReadTimeout,
+ this,
+ boost::asio::placeholders::error
+ ));
+ }
async_read(
this->m_socket,
@@ -677,9 +686,9 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr
case exec::shared::QueryResult_QueryState_COMPLETED:
{
//Not clean to call the handleTerminatedQryState method
- //because it signals an error to the listener.
+ //because it signals an error to the listener.
//The ODBC driver expects this though and the sync API
- //handles this (luckily).
+ //handles this (luckily).
ret=handleTerminatedQryState(ret,
getMessage(ERR_QRY_COMPLETED),
pDrillClientQueryResult);
@@ -700,16 +709,17 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr
void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
// if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
- if(!err){
+ if(err != boost::asio::error::operation_aborted){
+
// Check whether the deadline has passed.
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
- handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_TIMOUT), NULL);
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";
+ handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL);
// There is no longer an active deadline. The expiry is set to positive
// infinity so that the timer never expires until a new deadline is set.
// Note that at this time, the caller is not in a (async) wait for the timer.
m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
- DRILL_LOG(LOG_TRACE) << "Deadline timer expired." << std::endl;
// Cancel all pending async IOs.
// The cancel call _MAY_ not work on all platforms. To be a little more reliable we need
// to have the BOOST_ASIO_ENABLE_CANCELIO macro (as well as the BOOST_ASIO_DISABLE_IOCP macro?)
@@ -725,11 +735,13 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
const boost::system::error_code& err,
size_t bytes_transferred) {
boost::system::error_code error=err;
- // cancel the timer
DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
<< reinterpret_cast<int*>(_buf) << std::endl;
- m_deadlineTimer.cancel();
- DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
+ if(DrillClientConfig::getQueryTimeout() > 0){
+ // Cancel the timeout if handleRead is called
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";
+ m_deadlineTimer.cancel();
+ }
if(!error){
InBoundRpcMessage msg;
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index a5eeb7775..fdcf17882 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -211,7 +211,7 @@ class DrillClientImpl{
m_deadlineTimer.cancel();
m_io_service.stop();
boost::system::error_code ignorederr;
- m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ignorederr);
+ m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
m_socket.close();
if(m_rbuf!=NULL){
Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
@@ -235,7 +235,7 @@ class DrillClientImpl{
DrillClientError* getError(){ return m_pError;}
DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
void waitForResults();
- bool validateHandShake(const char* defaultSchema);
+ connectionStatus_t validateHandShake(const char* defaultSchema);
private:
friend class DrillClientQueryResult;
@@ -341,7 +341,7 @@ inline void DrillClientImpl::Close() {
//TODO: cancel pending query
if(this->m_bIsConnected){
boost::system::error_code ignorederr;
- m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ignorederr);
+ m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
m_socket.close();
m_bIsConnected=false;
}
diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp
index a5e72172f..fa7272151 100644
--- a/contrib/native/client/src/clientlib/errmsgs.cpp
+++ b/contrib/native/client/src/clientlib/errmsgs.cpp
@@ -40,6 +40,7 @@ static Drill::ErrorMessages errorMessages[]={
{ERR_CONN_ZKNODBIT, ERR_CATEGORY_CONN, 0, "No drillbit found with this Zookeeper."},
{ERR_CONN_ZKNOAUTH, ERR_CATEGORY_CONN, 0, "Authentication failed."},
{ERR_CONN_ZKEXP, ERR_CATEGORY_CONN, 0, "Session expired."},
+ {ERR_CONN_HSHAKETIMOUT, ERR_CATEGORY_CONN, 0, "Handshake Timeout."},
{ERR_QRY_OUTOFMEM, ERR_CATEGORY_QRY, 0, "Out of memory."},
{ERR_QRY_COMMERR, ERR_CATEGORY_QRY, 0, "Communication error. %s"},
{ERR_QRY_INVREADLEN, ERR_CATEGORY_QRY, 0, "Internal Error: Received a message with an invalid read length."},
diff --git a/contrib/native/client/src/clientlib/errmsgs.hpp b/contrib/native/client/src/clientlib/errmsgs.hpp
index 9a69f213c..22e544f08 100644
--- a/contrib/native/client/src/clientlib/errmsgs.hpp
+++ b/contrib/native/client/src/clientlib/errmsgs.hpp
@@ -45,7 +45,8 @@ namespace Drill{
#define ERR_CONN_ZKNODBIT DRILL_ERR_START+12
#define ERR_CONN_ZKNOAUTH DRILL_ERR_START+13
#define ERR_CONN_ZKEXP DRILL_ERR_START+14
-#define ERR_CONN_MAX DRILL_ERR_START+14
+#define ERR_CONN_HSHAKETIMOUT DRILL_ERR_START+15
+#define ERR_CONN_MAX DRILL_ERR_START+15
#define ERR_QRY_OUTOFMEM ERR_CONN_MAX+1
#define ERR_QRY_COMMERR ERR_CONN_MAX+2
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index f83aae403..824d67062 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -78,7 +78,8 @@ typedef enum{
QRY_COMPLETED = 11,
QRY_CANCELED = 12,
QRY_FAILED = 13,
- QRY_UNKNOWN_QUERY = 14
+ QRY_UNKNOWN_QUERY = 14,
+ QRY_TIMEOUT = 15
} status_t;
typedef enum{
@@ -86,7 +87,8 @@ typedef enum{
CONN_FAILURE=1,
CONN_HANDSHAKE_FAILED=2,
CONN_INVALID_INPUT=3,
- CONN_ZOOKEEPER_ERROR=4
+ CONN_ZOOKEEPER_ERROR=4,
+ CONN_HANDSHAKE_TIMEOUT=5
} connectionStatus_t;
typedef enum{
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index 0204855b2..19fec6985 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -99,7 +99,11 @@ class DECLSPEC_DRILL_CLIENT DrillClientConfig{
static void setBufferLimit(uint64_t l);
static uint64_t getBufferLimit();
static void setSocketTimeout(int32_t l);
+ static void setHandshakeTimeout(int32_t l);
+ static void setQueryTimeout(int32_t l);
static int32_t getSocketTimeout();
+ static int32_t getHandshakeTimeout();
+ static int32_t getQueryTimeout();
static logLevel_t getLogLevel();
private:
// The logging level
@@ -107,8 +111,26 @@ class DECLSPEC_DRILL_CLIENT DrillClientConfig{
// The total amount of memory to be allocated by an instance of DrillClient.
// For future use. Currently, not enforced.
static uint64_t s_bufferLimit;
- // Timeout (in seconds) for asynchronous read operations. Default is 180 seconds
+
+ /**
+ * DrillClient configures timeout (in seconds) in a fine granularity.
+ * Disabled by setting the value to zero.
+ *
+ * s_socketTimout: (default 0)
+ * set SO_RCVTIMEO and SO_SNDTIMEO socket options and place a
+ * timeout on socket receives and sends. It is disabled by default.
+ *
+ * s_handshakeTimeout: (default 5)
+ * place a timeout on validating handshake. When an endpoint (host:port)
+ * is reachable but drillbit hangs or running another service. It will
+ * avoid the client hanging.
+ *
+ * s_queryTimeout: (default 180)
+ * place a timeout on waiting result of querying.
+ */
static int32_t s_socketTimeout;
+ static int32_t s_handshakeTimeout;
+ static int32_t s_queryTimeout;
static boost::mutex s_mutex;
};