diff options
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.hpp')
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.hpp | 36 |
1 files changed, 26 insertions, 10 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index 04d59c763..ada63e113 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -201,6 +201,7 @@ class DrillClientImpl{ m_pWork(NULL), m_socket(m_io_service), m_deadlineTimer(m_io_service), + m_heartbeatTimer(m_io_service), m_rbuf(NULL), m_wbuf(MAX_SOCK_RD_BUFSIZE) { @@ -218,6 +219,7 @@ class DrillClientImpl{ this->m_pWork = NULL; } + m_heartbeatTimer.cancel(); m_deadlineTimer.cancel(); m_io_service.stop(); boost::system::error_code ignorederr; @@ -229,6 +231,13 @@ class DrillClientImpl{ if(m_pError!=NULL){ delete m_pError; m_pError=NULL; } + //Terminate and free the heartbeat thread + //if(this->m_pHeartbeatThread!=NULL){ + // this->m_pHeartbeatThread->interrupt(); + // this->m_pHeartbeatThread->join(); + // delete this->m_pHeartbeatThread; + // this->m_pHeartbeatThread = NULL; + //} //Terminate and free the listener thread if(this->m_pListenerThread!=NULL){ this->m_pListenerThread->interrupt(); @@ -260,6 +269,11 @@ class DrillClientImpl{ // Direct connection to a drillbit // host can be name or ip address, port can be port number or name of service in /etc/services connectionStatus_t connect(const char* host, const char* port); + void startHeartbeatTimer();// start a heartbeat timer + connectionStatus_t sendHeartbeat(); // send a heartbeat to the server + void resetHeartbeatTimer(); // reset the heartbeat timer (called every time one sends a message to the server (after sendAck, or submitQuery) + void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out. + int32_t getNextCoordinationId(){ return ++m_coordinationId; }; void parseConnectStr(const char* connectStr, std::string& pathToDrill, std::string& protocol, std::string& hostPortStr); // send synchronous messages @@ -269,6 +283,8 @@ class DrillClientImpl{ connectionStatus_t recvHandshake(); void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred ); void handleHShakeReadTimeout(const boost::system::error_code & err); + // starts the listener thread that receives responses/messages from the server + void startMessageListener(); // Query results void getNextResult(); status_t readMsg( @@ -302,6 +318,8 @@ class DrillClientImpl{ void sendAck(InBoundRpcMessage& msg, bool isOk); void sendCancel(exec::shared::QueryId* pQueryId); + void shutdownSocket(); + static RpcEncoder s_encoder; static RpcDecoder s_decoder; @@ -325,6 +343,10 @@ class DrillClientImpl{ // If the error is query specific, only the query results object will have the error set. DrillClientError* m_pError; + //Started after the connection is established and sends heartbeat messages after {heartbeat frequency} seconds + //The thread is killed on disconnect. + //boost::thread * m_pHeartbeatThread; + // for boost asio boost::thread * m_pListenerThread; boost::asio::io_service m_io_service; @@ -332,6 +354,7 @@ class DrillClientImpl{ boost::asio::io_service::work * m_pWork; boost::asio::ip::tcp::socket m_socket; boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return + boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages //for synchronous messages, like validate handshake ByteBuf_t m_rbuf; // buffer for receiving synchronous messages @@ -346,22 +369,15 @@ class DrillClientImpl{ // Map of query id to query result for currently executing queries std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults; + // Condition variable to signal completion of all queries. + boost::condition_variable m_cv; + }; inline bool DrillClientImpl::Active() { return this->m_bIsConnected;; } -inline void DrillClientImpl::Close() { - //TODO: cancel pending query - if(this->m_bIsConnected){ - boost::system::error_code ignorederr; - m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); - m_socket.close(); - m_bIsConnected=false; - } -} - class ZookeeperImpl{ public: ZookeeperImpl(); |