aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/drillClientImpl.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.hpp')
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp36
1 files changed, 26 insertions, 10 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 04d59c763..ada63e113 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -201,6 +201,7 @@ class DrillClientImpl{
m_pWork(NULL),
m_socket(m_io_service),
m_deadlineTimer(m_io_service),
+ m_heartbeatTimer(m_io_service),
m_rbuf(NULL),
m_wbuf(MAX_SOCK_RD_BUFSIZE)
{
@@ -218,6 +219,7 @@ class DrillClientImpl{
this->m_pWork = NULL;
}
+ m_heartbeatTimer.cancel();
m_deadlineTimer.cancel();
m_io_service.stop();
boost::system::error_code ignorederr;
@@ -229,6 +231,13 @@ class DrillClientImpl{
if(m_pError!=NULL){
delete m_pError; m_pError=NULL;
}
+ //Terminate and free the heartbeat thread
+ //if(this->m_pHeartbeatThread!=NULL){
+ // this->m_pHeartbeatThread->interrupt();
+ // this->m_pHeartbeatThread->join();
+ // delete this->m_pHeartbeatThread;
+ // this->m_pHeartbeatThread = NULL;
+ //}
//Terminate and free the listener thread
if(this->m_pListenerThread!=NULL){
this->m_pListenerThread->interrupt();
@@ -260,6 +269,11 @@ class DrillClientImpl{
// Direct connection to a drillbit
// host can be name or ip address, port can be port number or name of service in /etc/services
connectionStatus_t connect(const char* host, const char* port);
+ void startHeartbeatTimer();// start a heartbeat timer
+ connectionStatus_t sendHeartbeat(); // send a heartbeat to the server
+ void resetHeartbeatTimer(); // reset the heartbeat timer (called every time one sends a message to the server (after sendAck, or submitQuery)
+ void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
+
int32_t getNextCoordinationId(){ return ++m_coordinationId; };
void parseConnectStr(const char* connectStr, std::string& pathToDrill, std::string& protocol, std::string& hostPortStr);
// send synchronous messages
@@ -269,6 +283,8 @@ class DrillClientImpl{
connectionStatus_t recvHandshake();
void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred );
void handleHShakeReadTimeout(const boost::system::error_code & err);
+ // starts the listener thread that receives responses/messages from the server
+ void startMessageListener();
// Query results
void getNextResult();
status_t readMsg(
@@ -302,6 +318,8 @@ class DrillClientImpl{
void sendAck(InBoundRpcMessage& msg, bool isOk);
void sendCancel(exec::shared::QueryId* pQueryId);
+ void shutdownSocket();
+
static RpcEncoder s_encoder;
static RpcDecoder s_decoder;
@@ -325,6 +343,10 @@ class DrillClientImpl{
// If the error is query specific, only the query results object will have the error set.
DrillClientError* m_pError;
+ //Started after the connection is established and sends heartbeat messages after {heartbeat frequency} seconds
+ //The thread is killed on disconnect.
+ //boost::thread * m_pHeartbeatThread;
+
// for boost asio
boost::thread * m_pListenerThread;
boost::asio::io_service m_io_service;
@@ -332,6 +354,7 @@ class DrillClientImpl{
boost::asio::io_service::work * m_pWork;
boost::asio::ip::tcp::socket m_socket;
boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return
+ boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages
//for synchronous messages, like validate handshake
ByteBuf_t m_rbuf; // buffer for receiving synchronous messages
@@ -346,22 +369,15 @@ class DrillClientImpl{
// Map of query id to query result for currently executing queries
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults;
+ // Condition variable to signal completion of all queries.
+ boost::condition_variable m_cv;
+
};
inline bool DrillClientImpl::Active() {
return this->m_bIsConnected;;
}
-inline void DrillClientImpl::Close() {
- //TODO: cancel pending query
- if(this->m_bIsConnected){
- boost::system::error_code ignorederr;
- m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
- m_socket.close();
- m_bIsConnected=false;
- }
-}
-
class ZookeeperImpl{
public:
ZookeeperImpl();