aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/drillClientImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.cpp')
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.cpp171
1 files changed, 147 insertions, 24 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index eca0e7516..97afb8803 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -22,6 +22,7 @@
#include <string.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
@@ -148,6 +149,13 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
}
+ // set socket keep alive
+ boost::asio::socket_base::keep_alive keepAlive(true);
+ m_socket.set_option(keepAlive);
+ // set no_delay
+ boost::asio::ip::tcp::no_delay noDelay(true);
+ m_socket.set_option(noDelay);
+
//
// We put some OS dependent code here for timing out a socket. Mostly, this appears to
// do nothing. Should we leave it in there?
@@ -157,6 +165,74 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
return CONN_SUCCESS;
}
+void DrillClientImpl::startHeartbeatTimer(){
+ DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
+ << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;
+ m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
+ m_heartbeatTimer.async_wait(boost::bind(
+ &DrillClientImpl::handleHeartbeatTimeout,
+ this,
+ boost::asio::placeholders::error
+ ));
+ startMessageListener(); // start this thread early so we don't have the timer blocked
+}
+
+connectionStatus_t DrillClientImpl::sendHeartbeat(){
+ connectionStatus_t status=CONN_SUCCESS;
+ exec::rpc::Ack ack;
+ ack.set_ok(true);
+ OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
+ boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+ DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;
+ status=sendSync(heartbeatMsg);
+ status=status==CONN_SUCCESS?status:CONN_DEAD;
+ //If the server sends responses to a heartbeat, we need to increment the pending requests counter.
+ if(m_pendingRequests++==0){
+ getNextResult(); // async wait for results
+ }
+ return status;
+}
+
+void DrillClientImpl::resetHeartbeatTimer(){
+ m_heartbeatTimer.cancel();
+ DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;
+ startHeartbeatTimer();
+}
+
+
+
+void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;
+ if(err != boost::asio::error::operation_aborted){
+ // Check whether the deadline has passed.
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
+ << to_simple_string(m_heartbeatTimer.expires_at())
+ << " and time now is: "
+ << to_simple_string(boost::asio::deadline_timer::traits_type::now())
+ << std::endl;
+ ;
+ if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
+ // The deadline has passed.
+ m_heartbeatTimer.expires_at(boost::posix_time::pos_infin);
+ if(sendHeartbeat()==CONN_SUCCESS){
+ startHeartbeatTimer();
+ }else{
+ // Close connection.
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";
+ shutdownSocket();
+ }
+ }
+ }
+ return;
+}
+
+
+void DrillClientImpl::Close() {
+ shutdownSocket();
+}
+
+
connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
DrillClientImpl::s_encoder.Encode(m_wbuf, msg);
boost::system::error_code ec;
@@ -205,6 +281,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
return static_cast<connectionStatus_t>(m_pError->status);
}
#endif // WIN32_SHUTDOWN_ON_TIMEOUT
+ startHeartbeatTimer();
return CONN_SUCCESS;
}
@@ -285,6 +362,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
u2b.set_channel(exec::shared::USER);
u2b.set_rpc_version(DRILL_RPC_VERSION);
u2b.set_support_listening(true);
+ u2b.set_support_timeout(true);
if(properties != NULL && properties->size()>0){
std::string username;
@@ -369,6 +447,21 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
FieldDefPtr DrillClientQueryResult::s_emptyColDefs( new (std::vector<Drill::FieldMetadata*>));
+void DrillClientImpl::startMessageListener() {
+ if(this->m_pListenerThread==NULL){
+ // Stopping the io_service from running out-of-work
+ if(m_io_service.stopped()){
+ DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;
+ m_io_service.reset();
+ }
+ this->m_pWork = new boost::asio::io_service::work(m_io_service);
+ this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
+ &this->m_io_service));
+ DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
+ << this->m_pListenerThread << std::endl;
+ }
+}
+
DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t,
const std::string& plan,
pfnQueryResultsListener l,
@@ -408,20 +501,8 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
}
//run this in a new thread
- {
- if(this->m_pListenerThread==NULL){
- // Stopping the io_service from running out-of-work
- if(m_io_service.stopped()){
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::SubmitQuery: io_service is stopped. Restarting." <<std::endl;
- m_io_service.reset();
- }
- this->m_pWork = new boost::asio::io_service::work(m_io_service);
- this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
- &this->m_io_service));
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::SubmitQuery: Starting listener thread: "
- << this->m_pListenerThread << std::endl;
- }
- }
+ startMessageListener();
+
return pQuery;
}
@@ -437,6 +518,7 @@ void DrillClientImpl::getNextResult(){
AllocatedBuffer::s_memCV.wait(memLock);
}
}
+
//use free, not delete to free
ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
if (DrillClientConfig::getQueryTimeout() > 0){
@@ -450,6 +532,8 @@ void DrillClientImpl::getNextResult(){
));
}
+ resetHeartbeatTimer();
+
async_read(
this->m_socket,
boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
@@ -464,13 +548,15 @@ void DrillClientImpl::getNextResult(){
}
void DrillClientImpl::waitForResults(){
- if(this->m_pListenerThread!=NULL){
- // do nothing. No we do not need to explicity wait for the listener thread to finish
- delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit
- this->m_pListenerThread->join();
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread "
- << this->m_pListenerThread << " exited." << std::endl;
- delete this->m_pListenerThread; this->m_pListenerThread=NULL;
+ // The listener thread never exists because it may be sending/receiving a heartbeat. Before the heartbeat was introduced
+ // we could check if the listener thread has exited to tell if the queries are done. We can no longer do so now. We check
+ // a condition variable instead
+ {
+ boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+ //if no more data, return NULL;
+ while(this->m_pendingRequests>0) {
+ this->m_cv.wait(cvLock);
+ }
}
}
@@ -511,6 +597,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
<< (rmsgLen - leftover) << std::endl;
ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
size_t bytesToRead=rmsgLen - leftover;
+
while(1){
size_t dataBytesRead=this->m_socket.read_some(
boost::asio::buffer(b, bytesToRead),
@@ -521,6 +608,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
bytesToRead-=dataBytesRead;
b+=dataBytesRead;
}
+
if(!error){
// read data successfully
DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg);
@@ -583,7 +671,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
ret=QRY_CANCELED;
}
delete allocatedBuffer;
- return ret;
+ //return ret;
}else{
// Normal query results come back with query_state not set.
// Actually this is not strictly true. The query state is set to
@@ -591,6 +679,12 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";
}
}
+ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;
+ if(m_pendingRequests==0){
+ // signal any waiting client that it can exit because there are no more any query results to arrive.
+ // We keep the heartbeat going though.
+ m_cv.notify_one();
+ }
return ret;
}
@@ -841,7 +935,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
return;
}
- if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
+ if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away
+ m_pendingRequests--;
+ delete allocatedBuffer;
+ DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;
+ if(m_pendingRequests!=0){
+ boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+ getNextResult();
+ }else{
+ boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+ DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;
+ m_cv.notify_one();
+ }
+ return;
+ }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
status_t s = processQueryResult(allocatedBuffer, msg);
if(s !=QRY_SUCCESS && s!= QRY_NO_MORE_DATA){
if(m_pendingRequests!=0){
@@ -991,10 +1098,18 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){
std::map<int, DrillClientQueryResult*>::iterator iter;
if(!m_queryIds.empty()){
for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) {
- iter->second->signalError(pErr);
+ DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg);
+ iter->second->signalError(err);
}
}
+ delete pErr;
}
+ // We have an error at the connection level. Cancel the heartbeat.
+ // And close the connection
+ m_heartbeatTimer.cancel();
+ m_pendingRequests=0;
+ m_cv.notify_one();
+ shutdownSocket();
return;
}
@@ -1054,6 +1169,14 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
}
+void DrillClientImpl::shutdownSocket(){
+ m_io_service.stop();
+ boost::system::error_code ignorederr;
+ m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
+ m_bIsConnected=false;
+ DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;
+}
+
// This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this
// class are used by the async callbacks.
status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) {