diff options
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.cpp')
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.cpp | 171 |
1 files changed, 147 insertions, 24 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index eca0e7516..97afb8803 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -22,6 +22,7 @@ #include <string.h> #include <boost/asio.hpp> #include <boost/bind.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time_duration.hpp> #include <boost/lexical_cast.hpp> #include <boost/thread.hpp> @@ -148,6 +149,13 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what())); } + // set socket keep alive + boost::asio::socket_base::keep_alive keepAlive(true); + m_socket.set_option(keepAlive); + // set no_delay + boost::asio::ip::tcp::no_delay noDelay(true); + m_socket.set_option(noDelay); + // // We put some OS dependent code here for timing out a socket. Mostly, this appears to // do nothing. Should we leave it in there? @@ -157,6 +165,74 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ return CONN_SUCCESS; } +void DrillClientImpl::startHeartbeatTimer(){ + DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with " + << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl; + m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency())); + m_heartbeatTimer.async_wait(boost::bind( + &DrillClientImpl::handleHeartbeatTimeout, + this, + boost::asio::placeholders::error + )); + startMessageListener(); // start this thread early so we don't have the timer blocked +} + +connectionStatus_t DrillClientImpl::sendHeartbeat(){ + connectionStatus_t status=CONN_SUCCESS; + exec::rpc::Ack ack; + ack.set_ok(true); + OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack); + boost::lock_guard<boost::mutex> prLock(this->m_prMutex); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl; + status=sendSync(heartbeatMsg); + status=status==CONN_SUCCESS?status:CONN_DEAD; + //If the server sends responses to a heartbeat, we need to increment the pending requests counter. + if(m_pendingRequests++==0){ + getNextResult(); // async wait for results + } + return status; +} + +void DrillClientImpl::resetHeartbeatTimer(){ + m_heartbeatTimer.cancel(); + DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl; + startHeartbeatTimer(); +} + + + +void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){ + DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl; + if(err != boost::asio::error::operation_aborted){ + // Check whether the deadline has passed. + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " + << to_simple_string(m_heartbeatTimer.expires_at()) + << " and time now is: " + << to_simple_string(boost::asio::deadline_timer::traits_type::now()) + << std::endl; + ; + if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ + // The deadline has passed. + m_heartbeatTimer.expires_at(boost::posix_time::pos_infin); + if(sendHeartbeat()==CONN_SUCCESS){ + startHeartbeatTimer(); + }else{ + // Close connection. + DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection."; + shutdownSocket(); + } + } + } + return; +} + + +void DrillClientImpl::Close() { + shutdownSocket(); +} + + connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){ DrillClientImpl::s_encoder.Encode(m_wbuf, msg); boost::system::error_code ec; @@ -205,6 +281,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){ return static_cast<connectionStatus_t>(m_pError->status); } #endif // WIN32_SHUTDOWN_ON_TIMEOUT + startHeartbeatTimer(); return CONN_SUCCESS; } @@ -285,6 +362,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope u2b.set_channel(exec::shared::USER); u2b.set_rpc_version(DRILL_RPC_VERSION); u2b.set_support_listening(true); + u2b.set_support_timeout(true); if(properties != NULL && properties->size()>0){ std::string username; @@ -369,6 +447,21 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope FieldDefPtr DrillClientQueryResult::s_emptyColDefs( new (std::vector<Drill::FieldMetadata*>)); +void DrillClientImpl::startMessageListener() { + if(this->m_pListenerThread==NULL){ + // Stopping the io_service from running out-of-work + if(m_io_service.stopped()){ + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl; + m_io_service.reset(); + } + this->m_pWork = new boost::asio::io_service::work(m_io_service); + this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run, + &this->m_io_service)); + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: " + << this->m_pListenerThread << std::endl; + } +} + DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener l, @@ -408,20 +501,8 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t } //run this in a new thread - { - if(this->m_pListenerThread==NULL){ - // Stopping the io_service from running out-of-work - if(m_io_service.stopped()){ - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::SubmitQuery: io_service is stopped. Restarting." <<std::endl; - m_io_service.reset(); - } - this->m_pWork = new boost::asio::io_service::work(m_io_service); - this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run, - &this->m_io_service)); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::SubmitQuery: Starting listener thread: " - << this->m_pListenerThread << std::endl; - } - } + startMessageListener(); + return pQuery; } @@ -437,6 +518,7 @@ void DrillClientImpl::getNextResult(){ AllocatedBuffer::s_memCV.wait(memLock); } } + //use free, not delete to free ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN); if (DrillClientConfig::getQueryTimeout() > 0){ @@ -450,6 +532,8 @@ void DrillClientImpl::getNextResult(){ )); } + resetHeartbeatTimer(); + async_read( this->m_socket, boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN), @@ -464,13 +548,15 @@ void DrillClientImpl::getNextResult(){ } void DrillClientImpl::waitForResults(){ - if(this->m_pListenerThread!=NULL){ - // do nothing. No we do not need to explicity wait for the listener thread to finish - delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit - this->m_pListenerThread->join(); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread " - << this->m_pListenerThread << " exited." << std::endl; - delete this->m_pListenerThread; this->m_pListenerThread=NULL; + // The listener thread never exists because it may be sending/receiving a heartbeat. Before the heartbeat was introduced + // we could check if the listener thread has exited to tell if the queries are done. We can no longer do so now. We check + // a condition variable instead + { + boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex); + //if no more data, return NULL; + while(this->m_pendingRequests>0) { + this->m_cv.wait(cvLock); + } } } @@ -511,6 +597,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, << (rmsgLen - leftover) << std::endl; ByteBuf_t b=currentBuffer->m_pBuffer + leftover; size_t bytesToRead=rmsgLen - leftover; + while(1){ size_t dataBytesRead=this->m_socket.read_some( boost::asio::buffer(b, bytesToRead), @@ -521,6 +608,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, bytesToRead-=dataBytesRead; b+=dataBytesRead; } + if(!error){ // read data successfully DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg); @@ -583,7 +671,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer ret=QRY_CANCELED; } delete allocatedBuffer; - return ret; + //return ret; }else{ // Normal query results come back with query_state not set. // Actually this is not strictly true. The query state is set to @@ -591,6 +679,12 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n"; } } + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl; + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } return ret; } @@ -841,7 +935,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, return; } - if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){ + if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away + m_pendingRequests--; + delete allocatedBuffer; + DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl; + if(m_pendingRequests!=0){ + boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + getNextResult(); + }else{ + boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex); + DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl; + m_cv.notify_one(); + } + return; + }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){ status_t s = processQueryResult(allocatedBuffer, msg); if(s !=QRY_SUCCESS && s!= QRY_NO_MORE_DATA){ if(m_pendingRequests!=0){ @@ -991,10 +1098,18 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){ std::map<int, DrillClientQueryResult*>::iterator iter; if(!m_queryIds.empty()){ for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) { - iter->second->signalError(pErr); + DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg); + iter->second->signalError(err); } } + delete pErr; } + // We have an error at the connection level. Cancel the heartbeat. + // And close the connection + m_heartbeatTimer.cancel(); + m_pendingRequests=0; + m_cv.notify_one(); + shutdownSocket(); return; } @@ -1054,6 +1169,14 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){ DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl; } +void DrillClientImpl::shutdownSocket(){ + m_io_service.stop(); + boost::system::error_code ignorederr; + m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); + m_bIsConnected=false; + DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl; +} + // This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this // class are used by the async callbacks. status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) { |