diff options
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.cpp')
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.cpp | 600 |
1 files changed, 446 insertions, 154 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index d4e9ed96e..3ec01f521 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -78,47 +78,46 @@ void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){ #endif } - -void DrillClientImpl::parseConnectStr(const char* connectStr, - std::string& pathToDrill, - std::string& protocol, - std::string& hostPortStr){ - char u[MAX_CONNECT_STR+1]; - strncpy(u,connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR]=0; - char* z=strtok(u, "="); - char* c=strtok(NULL, "/"); - char* p=strtok(NULL, ""); - - if(p!=NULL) pathToDrill=std::string("/")+p; - protocol=z; hostPortStr=c; - return; -} - connectionStatus_t DrillClientImpl::connect(const char* connStr){ std::string pathToDrill, protocol, hostPortStr; std::string host; std::string port; if(!this->m_bIsConnected){ - parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); + m_connectStr=connStr; + Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); if(!strcmp(protocol.c_str(), "zk")){ ZookeeperImpl zook; - if(zook.connectToZookeeper(hostPortStr.c_str(), pathToDrill.c_str())!=0){ + std::vector<std::string> drillbits; + int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + if(!err){ + Utils::shuffle(drillbits); + exec::DrillbitEndpoint endpoint; + err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list + if(!err){ + host=boost::lexical_cast<std::string>(endpoint.address()); + port=boost::lexical_cast<std::string>(endpoint.user_port()); + } + } + if(err){ return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); } - zook.debugPrint(); - exec::DrillbitEndpoint e=zook.getEndPoint(); - host=boost::lexical_cast<std::string>(e.address()); - port=boost::lexical_cast<std::string>(e.user_port()); zook.close(); + m_bIsDirectConnection=true; }else if(!strcmp(protocol.c_str(), "local")){ + boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant char tempStr[MAX_CONNECT_STR+1]; strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; host=strtok(tempStr, ":"); port=strtok(NULL, ""); + m_bIsDirectConnection=false; }else{ return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); } - return this->connect(host.c_str(), port.c_str()); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;) + connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); + return ret; + }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect to a different address is not allowed if already connected + return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN)); } return CONN_SUCCESS; } @@ -133,7 +132,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ tcp::resolver::iterator end; while (iter != end){ endpoint = *iter++; - DRILL_LOG(LOG_TRACE) << endpoint << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;) } boost::system::error_code ec; m_socket.connect(endpoint, ec); @@ -149,6 +148,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what())); } + m_bIsConnected=true; // set socket keep alive boost::asio::socket_base::keep_alive keepAlive(true); m_socket.set_option(keepAlive); @@ -156,35 +156,34 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ boost::asio::ip::tcp::no_delay noDelay(true); m_socket.set_option(noDelay); - // - // We put some OS dependent code here for timing out a socket. Mostly, this appears to - // do nothing. Should we leave it in there? - // - setSocketTimeout(m_socket, DrillClientConfig::getSocketTimeout()); - + std::ostringstream connectedHost; + connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port; + m_connectedHost = connectedHost.str(); + DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;) + return CONN_SUCCESS; } void DrillClientImpl::startHeartbeatTimer(){ - DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with " - << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with " + << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;) m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency())); m_heartbeatTimer.async_wait(boost::bind( &DrillClientImpl::handleHeartbeatTimeout, this, boost::asio::placeholders::error )); - startMessageListener(); // start this thread early so we don't have the timer blocked + startMessageListener(); // start this thread early so we don't have the timer blocked } connectionStatus_t DrillClientImpl::sendHeartbeat(){ - connectionStatus_t status=CONN_SUCCESS; + connectionStatus_t status=CONN_SUCCESS; exec::rpc::Ack ack; ack.set_ok(true); OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack); - boost::lock_guard<boost::mutex> prLock(this->m_prMutex); - boost::lock_guard<boost::mutex> lock(m_dcMutex); - DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl; + boost::lock_guard<boost::mutex> prLock(this->m_prMutex); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;) status=sendSync(heartbeatMsg); status=status==CONN_SUCCESS?status:CONN_DEAD; //If the server sends responses to a heartbeat, we need to increment the pending requests counter. @@ -196,21 +195,19 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){ void DrillClientImpl::resetHeartbeatTimer(){ m_heartbeatTimer.cancel(); - DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;) startHeartbeatTimer(); } - - void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;) if(err != boost::asio::error::operation_aborted){ // Check whether the deadline has passed. - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " << to_simple_string(m_heartbeatTimer.expires_at()) << " and time now is: " << to_simple_string(boost::asio::deadline_timer::traits_type::now()) - << std::endl; + << std::endl;) ; if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. @@ -219,7 +216,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e startHeartbeatTimer(); }else{ // Close connection. - DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection."; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";) shutdownSocket(); } } @@ -227,7 +224,6 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e return; } - void DrillClientImpl::Close() { shutdownSocket(); } @@ -257,8 +253,8 @@ connectionStatus_t DrillClientImpl::recvHandshake(){ this, boost::asio::placeholders::error )); - DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with " - << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with " + << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;) } async_read( @@ -271,7 +267,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){ boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) ); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";) m_io_service.run(); if(m_rbuf!=NULL){ Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL; @@ -292,7 +288,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, boost::system::error_code error=err; // cancel the timer m_deadlineTimer.cancel(); - DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;) if(!error){ InBoundRpcMessage msg; uint32_t length = 0; @@ -306,14 +302,14 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, boost::asio::buffer(b, bytesToRead), error); if(err) break; - DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;) if(dataBytesRead==bytesToRead) break; bytesToRead-=dataBytesRead; b+=dataBytesRead; } DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg); }else{ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";) handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake")); return; } @@ -344,7 +340,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. m_deadlineTimer.expires_at(boost::posix_time::pos_infin); - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";) handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT)); m_io_service.stop(); boost::system::error_code ignorederr; @@ -356,7 +352,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){ - DRILL_LOG(LOG_TRACE) << "validateHandShake\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";) exec::user::UserToBitHandshake u2b; u2b.set_channel(exec::shared::USER); @@ -368,7 +364,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope std::string username; std::string err; if(!properties->validate(err)){ - DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;) } exec::user::UserProperties* userProperties = u2b.mutable_properties(); @@ -376,8 +372,8 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope for(size_t i=0; i<properties->size(); i++){ std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i)); if(it==DrillUserProperties::USER_PROPERTIES.end()){ - DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) - << ") is unknown and is being skipped" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) + << ") is unknown and is being skipped" << std::endl;) continue; } if(IS_BITSET((*it).second,USERPROP_FLAGS_SERVERPROP)){ @@ -392,9 +388,9 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope //u2b.set_credentials(&creds); } if(IS_BITSET((*it).second,USERPROP_FLAGS_PASSWORD)){ - DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ": ********** " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ": ********** " << std::endl;) }else{ - DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl;) } }// Server properties } @@ -406,7 +402,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b); sendSync(out_msg); - DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";) } connectionStatus_t ret = recvHandshake(); @@ -416,21 +412,21 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope if(this->m_handshakeStatus != exec::user::SUCCESS){ switch(this->m_handshakeStatus){ case exec::user::RPC_VERSION_MISMATCH: - DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected " - << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected " + << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;) return handleConnError(CONN_BAD_RPC_VER, getMessage(ERR_CONN_BAD_RPC_VER, DRILL_RPC_VERSION, m_handshakeVersion, this->m_handshakeErrorId.c_str(), this->m_handshakeErrorMsg.c_str())); case exec::user::AUTH_FAILED: - DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;) return handleConnError(CONN_AUTH_FAILED, getMessage(ERR_CONN_AUTHFAIL, this->m_handshakeErrorId.c_str(), this->m_handshakeErrorMsg.c_str())); case exec::user::UNKNOWN_FAILURE: - DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;) return handleConnError(CONN_HANDSHAKE_FAILED, getMessage(ERR_CONN_UNKNOWN_ERR, this->m_handshakeErrorId.c_str(), @@ -451,14 +447,14 @@ void DrillClientImpl::startMessageListener() { if(this->m_pListenerThread==NULL){ // Stopping the io_service from running out-of-work if(m_io_service.stopped()){ - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;) m_io_service.reset(); } this->m_pWork = new boost::asio::io_service::work(m_io_service); this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run, &this->m_io_service)); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: " - << this->m_pListenerThread << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: " + << this->m_pListenerThread << std::endl;) } } @@ -480,22 +476,23 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query); sendSync(out_msg); - pQuery = new DrillClientQueryResult(this, coordId); + pQuery = new DrillClientQueryResult(this, coordId, plan); pQuery->registerListener(l, lCtx); bool sendRequest=false; this->m_queryIds[coordId]=pQuery; - DRILL_LOG(LOG_DEBUG) << "Sent query request. Coordination id = " << coordId << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;) if(m_pendingRequests++==0){ sendRequest=true; }else{ - DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl; - DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;) } if(sendRequest){ - DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " - << m_pendingRequests << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " + << m_pendingRequests << std::endl;) getNextResult(); // async wait for results } } @@ -513,7 +510,7 @@ void DrillClientImpl::getNextResult(){ { boost::unique_lock<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex); - DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;) while(AllocatedBuffer::s_isBufferLimitReached){ AllocatedBuffer::s_memCV.wait(memLock); } @@ -522,8 +519,8 @@ void DrillClientImpl::getNextResult(){ //use free, not delete to free ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN); if (DrillClientConfig::getQueryTimeout() > 0){ - DRILL_LOG(LOG_TRACE) << "Started new query wait timer with " - << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new query wait timer with " + << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;) m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout())); m_deadlineTimer.async_wait(boost::bind( &DrillClientImpl::handleReadTimeout, @@ -544,7 +541,7 @@ void DrillClientImpl::getNextResult(){ boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) ); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";) } void DrillClientImpl::waitForResults(){ @@ -565,8 +562,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, InBoundRpcMessage& msg, boost::system::error_code& error){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer " - << reinterpret_cast<int*>(_buf) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer " + << reinterpret_cast<int*>(_buf) << std::endl;) size_t leftover=0; uint32_t rmsgLen; AllocatedBufferPtr currentBuffer; @@ -576,15 +573,15 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, // but we don't have to keep the lock while we decode the rest of the buffer. boost::lock_guard<boost::mutex> lock(this->m_dcMutex); int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen); - DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl; - DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;) if(rmsgLen>0){ leftover = LEN_PREFIX_BUFLEN - bytes_read; // Allocate a buffer currentBuffer=new AllocatedBuffer(rmsgLen); - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ " - << currentBuffer << ", size = " << rmsgLen << " ]\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ " + << currentBuffer << ", size = " << rmsgLen << " ]\n";) if(currentBuffer==NULL){ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL); @@ -593,8 +590,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, if(leftover){ memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover); } - DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : " - << (rmsgLen - leftover) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : " + << (rmsgLen - leftover) << std::endl;) ByteBuf_t b=currentBuffer->m_pBuffer + leftover; size_t bytesToRead=rmsgLen - leftover; @@ -603,7 +600,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, boost::asio::buffer(b, bytesToRead), error); if(error) break; - DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;) if(dataBytesRead==bytesToRead) break; bytesToRead-=dataBytesRead; b+=dataBytesRead; @@ -612,7 +609,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, if(!error){ // read data successfully DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg); - DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;) }else{ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_COMM_ERROR, @@ -624,8 +621,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL); } } - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer " - << reinterpret_cast<int*>(_buf) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer " + << reinterpret_cast<int*>(_buf) << std::endl;) Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return QRY_SUCCESS; } @@ -639,9 +636,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer boost::lock_guard<boost::mutex> lock(this->m_dcMutex); exec::shared::QueryResult qr; - DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;) qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;) qid.CopyFrom(qr.query_id()); @@ -657,7 +654,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer std::string valErr; if( (ret=validateResultMessage(msg, qr, valErr)) != QRY_SUCCESS){ delete allocatedBuffer; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;) return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); } ret=processQueryStatusResult(&qr, pDrillClientQueryResult); @@ -665,9 +662,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer // We've received the final message for a query that has been cancelled // or for which the resources have been freed. We no longer need to listen // for more incoming messages for such a query. - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;) m_pendingRequests--; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;) ret=QRY_CANCELED; } delete allocatedBuffer; @@ -676,10 +673,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer // Normal query results come back with query_state not set. // Actually this is not strictly true. The query state is set to // 0(i.e. PENDING), but protobuf thinks this means the value is not set. - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";) } } - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. // We keep the heartbeat going though. @@ -701,21 +698,21 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, boost::lock_guard<boost::mutex> lock(this->m_dcMutex); exec::shared::QueryData* qr = new exec::shared::QueryData; //Record Batch will own this object and free it up. - DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;) qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;) qid.CopyFrom(qr->query_id()); if(qid.part1()==0){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;) delete allocatedBuffer; return QRY_SUCCESS; } pDrillClientQueryResult=findQueryResult(qid); if(pDrillClientQueryResult==NULL){ - DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" - << debugPrintQid(qid) << ")." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" + << debugPrintQid(qid) << ")." << std::endl;) delete qr; delete allocatedBuffer; return ret; @@ -726,23 +723,23 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ delete allocatedBuffer; delete qr; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";) pDrillClientQueryResult->setQueryStatus(ret); return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); } //Build Record Batch here - DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;) pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); pDrillClientQueryResult->m_numBatches++; - DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;) pRecordBatch->build(); - DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords " - << pRecordBatch->getNumRecords() << std::endl; - DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields " - << pRecordBatch->getNumFields() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords " + << pRecordBatch->getNumRecords() << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields " + << pRecordBatch->getNumFields() << std::endl;) ret=pDrillClientQueryResult->setupColumnDefs(qr); if(ret==QRY_SUCCESS_WITH_INFO){ @@ -752,8 +749,8 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, pDrillClientQueryResult->setIsQueryPending(true); pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener; if(pDrillClientQueryResult->m_bIsLastChunk){ - DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) - << "Received last batch. " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) + << "Received last batch. " << std::endl;) ret=QRY_NO_MORE_DATA; } pDrillClientQueryResult->setQueryStatus(ret); @@ -770,7 +767,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are // pushed on the wire before the cancel is processed. pDrillClientQueryResult->setIsQueryPending(false); - DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;) pDrillClientQueryResult->setQueryStatus(ret); clearMapEntries(pDrillClientQueryResult); return ret; @@ -780,27 +777,27 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; - DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; boost::lock_guard<boost::mutex> lock(m_dcMutex); std::map<int,DrillClientQueryResult*>::iterator it; for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){ std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL"); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first - << " QueryId: "<< qidString << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first + << " QueryId: "<< qidString << std::endl;) } if(msg.m_coord_id==0){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) return QRY_SUCCESS; } it=this->m_queryIds.find(msg.m_coord_id); if(it!=this->m_queryIds.end()){ pDrillClientQueryResult=(*it).second; exec::shared::QueryId *qid = new exec::shared::QueryId; - DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;) m_queryResults[qid]=pDrillClientQueryResult; //save queryId allocated here so we can free it later pDrillClientQueryResult->setQueryId(qid); @@ -814,20 +811,20 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){ DrillClientQueryResult* pDrillClientQueryResult=NULL; - DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;) std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;) if(m_queryResults.size() != 0){ for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":" - << it->first->part2() << "]\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":" + << it->first->part2() << "]\n";) } } it=this->m_queryResults.find(&qid); if(it!=this->m_queryResults.end()){ pDrillClientQueryResult=(*it).second; - DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << - debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << + debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;) } return pDrillClientQueryResult; } @@ -870,7 +867,7 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr break; default: { - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";) ret=handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_UNKQRYSTATE), pDrillClientQueryResult); @@ -887,7 +884,7 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){ // Check whether the deadline has passed. if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";) handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL); // There is no longer an active deadline. The expiry is set to positive // infinity so that the timer never expires until a new deadline is set. @@ -913,18 +910,18 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, const boost::system::error_code& err, size_t bytes_transferred) { boost::system::error_code error=err; - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " - << reinterpret_cast<int*>(_buf) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " + << reinterpret_cast<int*>(_buf) << std::endl;) if(DrillClientConfig::getQueryTimeout() > 0){ // Cancel the timeout if handleRead is called - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";) m_deadlineTimer.cancel(); } if(!error){ InBoundRpcMessage msg; boost::lock_guard<boost::mutex> lock(this->m_prMutex); - DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;) AllocatedBufferPtr allocatedBuffer=NULL; if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){ @@ -938,14 +935,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away m_pendingRequests--; delete allocatedBuffer; - DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;) if(m_pendingRequests!=0){ boost::lock_guard<boost::mutex> lock(this->m_dcMutex); getNextResult(); }else{ - boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex); - DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl; - m_cv.notify_one(); + boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;) + m_cv.notify_one(); } return; }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){ @@ -988,7 +985,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, // We have a socket read error, but we do not know which query this is for. // Signal ALL pending queries that they should stop waiting. delete allocatedBuffer; - DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;) handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); return; }else{ @@ -997,20 +994,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, // We should properly handle these handshake requests/responses if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){ if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";) exec::user::UserToBitHandshake u2b; u2b.set_channel(exec::shared::USER); u2b.set_rpc_version(DRILL_RPC_VERSION); u2b.set_support_listening(true); OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b); sendSync(out_msg); - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";) }else{ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n"; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";) } }else{ - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " - << "QueryResult returned " << msg.m_rpc_type << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " + << "QueryResult returned " << msg.m_rpc_type << std::endl;) handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); } delete allocatedBuffer; @@ -1025,8 +1022,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, // boost error Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " - "Boost Communication Error: " << error.message() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " + "Boost Communication Error: " << error.message() << std::endl;) handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); return; } @@ -1066,6 +1063,7 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s }else{ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} m_pError=pErr; + shutdownSocket(); } return status; } @@ -1158,7 +1156,7 @@ void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){ OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack); boost::lock_guard<boost::mutex> lock(m_dcMutex); sendSync(ack_msg); - DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;) } void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){ @@ -1166,7 +1164,7 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){ uint64_t coordId = this->getNextCoordinationId(); OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId); sendSync(cancel_msg); - DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;) } void DrillClientImpl::shutdownSocket(){ @@ -1174,7 +1172,7 @@ void DrillClientImpl::shutdownSocket(){ boost::system::error_code ignorederr; m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); m_bIsConnected=false; - DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;) } // This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this @@ -1236,7 +1234,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err) { //ctx; // unused, we already have the this pointer - DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;) //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server. if(this->m_bCancel){ if(b!=NULL) delete b; @@ -1247,8 +1245,8 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, { if(b!=NULL){ #ifdef DEBUG - DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id()) - << "Query result listener saved result to queue." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id()) + << "Query result listener saved result to queue." << std::endl;) #endif boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex); this->m_recordBatches.push(b); @@ -1267,7 +1265,7 @@ RecordBatch* DrillClientQueryResult::peekNext(){ boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending) return NULL; - DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) { this->m_cv.wait(cvLock); } @@ -1281,14 +1279,14 @@ RecordBatch* DrillClientQueryResult::getNext() { boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending){ - DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;) if(!m_recordBatches.empty()){ - DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;) } return NULL; } - DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){ this->m_cv.wait(cvLock); } @@ -1367,7 +1365,7 @@ void DrillClientQueryResult::clearAndDestroy(){ m_columnDefs->clear(); } if(this->m_pQueryId!=NULL){ - DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;) } //Tell the parent to remove this from its lists m_pClient->clearMapEntries(this); @@ -1379,7 +1377,7 @@ void DrillClientQueryResult::clearAndDestroy(){ if(!m_recordBatches.empty()){ // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_ // the last chunk has been received. We eventually delete it. - DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;) RecordBatch* pR=NULL; while(!m_recordBatches.empty()){ pR=m_recordBatches.front(); @@ -1392,6 +1390,210 @@ void DrillClientQueryResult::clearAndDestroy(){ } } + +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ + connectionStatus_t stat = CONN_SUCCESS; + std::string pathToDrill, protocol, hostPortStr; + std::string host; + std::string port; + m_connectStr=connStr; + Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); + if(!strcmp(protocol.c_str(), "zk")){ + // Get a list of drillbits + ZookeeperImpl zook; + std::vector<std::string> drillbits; + int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + if(!err){ + Utils::shuffle(drillbits); + // The original shuffled order is maintained if we shuffle first and then add any missing elements + Utils::add(m_drillbits, drillbits); + exec::DrillbitEndpoint e; + size_t nextIndex=0; + { + boost::lock_guard<boost::mutex> cLock(m_cMutex); + m_lastConnection++; + nextIndex = (m_lastConnection)%(getDrillbitCount()); + } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection" + << "(" << (void*)this << ")" + << ": Current counter is: " + << m_lastConnection << std::endl;) + err=zook.getEndPoint(m_drillbits, nextIndex, e); + if(!err){ + host=boost::lexical_cast<std::string>(e.address()); + port=boost::lexical_cast<std::string>(e.user_port()); + } + } + if(err){ + return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); + } + zook.close(); + m_bIsDirectConnection=false; + }else if(!strcmp(protocol.c_str(), "local")){ + char tempStr[MAX_CONNECT_STR+1]; + strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; + host=strtok(tempStr, ":"); + port=strtok(NULL, ""); + m_bIsDirectConnection=true; + }else{ + return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); + } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;) + DrillClientImpl* pDrillClientImpl = new DrillClientImpl(); + stat = pDrillClientImpl->connect(host.c_str(), port.c_str()); + if(stat == CONN_SUCCESS){ + boost::lock_guard<boost::mutex> lock(m_poolMutex); + m_clientConnections.push_back(pDrillClientImpl); + }else{ + DrillClientError* pErr = pDrillClientImpl->getError(); + handleConnError((connectionStatus_t)pErr->status, pErr->msg); + delete pDrillClientImpl; + } + return stat; +} + +connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){ + // Assume there is one valid connection to at least one drillbit + connectionStatus_t stat=CONN_FAILURE; + // Keep a copy of the user properties + if(props!=NULL){ + m_pUserProperties = new DrillUserProperties; + for(size_t i=0; i<props->size(); i++){ + m_pUserProperties->setProperty( + props->keyAt(i), + props->valueAt(i) + ); + } + } + DrillClientImpl* pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;) + stat=pDrillClientImpl->validateHandshake(m_pUserProperties); + } + else{ + stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN)); + } + return stat; +} + +DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){ + DrillClientQueryResult* pDrillClientQueryResult = NULL; + DrillClientImpl* pDrillClientImpl = NULL; + pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx); + m_queriesExecuted++; + } + return pDrillClientQueryResult; +} + +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){ + // Nothing to do. If this class ever keeps track of executing queries then it will need + // to implement this call to free any query specific resources the pool might have + // allocated + return; +} + +bool PooledDrillClientImpl::Active(){ + boost::lock_guard<boost::mutex> lock(m_poolMutex); + for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + if((*it)->Active()){ + return true; + } + } + return false; +} + +void PooledDrillClientImpl::Close() { + boost::lock_guard<boost::mutex> lock(m_poolMutex); + for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + (*it)->Close(); + delete *it; + } + m_clientConnections.clear(); + if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;} + if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} + m_lastConnection=-1; + m_queriesExecuted=0; +} + +DrillClientError* PooledDrillClientImpl::getError(){ + std::string errMsg; + std::string nl=""; + uint32_t stat; + boost::lock_guard<boost::mutex> lock(m_poolMutex); + for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + if((*it)->getError() != NULL){ + errMsg+=nl+"Query"/*+(*it)->queryId() +*/":"+(*it)->getError()->msg; + stat=(*it)->getError()->status; + } + } + if(errMsg.length()>0){ + if(m_pError!=NULL){ delete m_pError; m_pError=NULL; } + m_pError = new DrillClientError(stat, DrillClientError::QRY_ERROR_START+stat, errMsg); + } + return m_pError; +} + +//Waits as long as any one drillbit connection has results pending +void PooledDrillClientImpl::waitForResults(){ + boost::lock_guard<boost::mutex> lock(m_poolMutex); + for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + (*it)->waitForResults(); + } + return; +} + +connectionStatus_t PooledDrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){ + DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Connection Error: (Pooled) " << pErr->msg << std::endl;) + if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} + m_pError=pErr; + return status; +} + +DrillClientImpl* PooledDrillClientImpl::getOneConnection(){ + DrillClientImpl* pDrillClientImpl = NULL; + while(pDrillClientImpl==NULL){ + if(m_queriesExecuted == 0){ + // First query ever sent can use the connection already established to authenticate the user + boost::lock_guard<boost::mutex> lock(m_poolMutex); + pDrillClientImpl=m_clientConnections[0];// There should be one connection in the list when the first query is executed + }else if(m_clientConnections.size() == m_maxConcurrentConnections){ + // Pool is full. Use one of the already established connections + boost::lock_guard<boost::mutex> lock(m_poolMutex); + pDrillClientImpl = m_clientConnections[m_queriesExecuted%m_maxConcurrentConnections]; + if(!pDrillClientImpl->Active()){ + Utils::eraseRemove(m_clientConnections, pDrillClientImpl); + pDrillClientImpl=NULL; + } + }else{ + int tries=0; + connectionStatus_t ret=CONN_SUCCESS; + while(pDrillClientImpl==NULL && tries++ < 3){ + if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){ + boost::lock_guard<boost::mutex> lock(m_poolMutex); + pDrillClientImpl=m_clientConnections.back(); + ret=pDrillClientImpl->validateHandshake(m_pUserProperties); + if(ret!=CONN_SUCCESS){ + delete pDrillClientImpl; pDrillClientImpl=NULL; + m_clientConnections.erase(m_clientConnections.end()); + } + } + } // try a few times + if(ret!=CONN_SUCCESS){ + break; + } + } // need a new connection + }// while + + if(pDrillClientImpl==NULL){ + connectionStatus_t status = CONN_NOTCONNECTED; + handleConnError(status, getMessage(status)); + } + return pDrillClientImpl; +} + char ZookeeperImpl::s_drillRoot[]="/drill/"; char ZookeeperImpl::s_defaultCluster[]="drillbits1"; @@ -1427,6 +1629,96 @@ ZooLogLevel ZookeeperImpl::getZkLogLevel(){ return ZOO_LOG_LEVEL_ERROR; } +int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits){ + uint32_t waitTime=30000; // 10 seconds + zoo_set_debug_level(getZkLogLevel()); + zoo_deterministic_conn_order(1); // enable deterministic order + struct String_vector* pDrillbits=NULL; + m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0); + if(!m_zh) { + m_err = getMessage(ERR_CONN_ZKFAIL); + zookeeper_close(m_zh); + return -1; + }else{ + m_err=""; + //Wait for the completion handler to signal successful connection + boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex); + boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime); + while(this->m_bConnecting) { + if(!this->m_cv.timed_wait(bufferLock, timeout)){ + m_err = getMessage(ERR_CONN_ZKTIMOUT); + zookeeper_close(m_zh); + return -1; + } + } + } + if(m_state!=ZOO_CONNECTED_STATE){ + zookeeper_close(m_zh); + return -1; + } + int rc = ZOK; + if(pathToDrill==NULL || strlen(pathToDrill)==0){ + m_rootDir=s_drillRoot; + m_rootDir += s_defaultCluster; + }else{ + m_rootDir=pathToDrill; + } + + pDrillbits = new String_vector; + rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits); + if(rc!=ZOK){ + delete pDrillbits; + m_err=getMessage(ERR_CONN_ZKERR, rc); + zookeeper_close(m_zh); + return -1; + } + if(pDrillbits && pDrillbits->count > 0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster (" + << connectStr << "/" << pathToDrill + << ")." <<std::endl;) + for(int i=0; i<pDrillbits->count; i++){ + drillbits.push_back(pDrillbits->data[i]); + } + for(int i=0; i<drillbits.size(); i++){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;) + } + } + delete pDrillbits; + return 0; +} + +int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){ + int rc = ZOK; + exec::DrillServiceInstance drillServiceInstance; + if( drillbits.size() >0){ + // pick the drillbit at 'index' + const char * bit=drillbits[index].c_str(); + std::string s; + s=m_rootDir + std::string("/") + bit; + int buffer_len=MAX_CONNECT_STR; + char buffer[MAX_CONNECT_STR+1]; + struct Stat stat; + buffer[MAX_CONNECT_STR]=0; + rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat); + if(rc!=ZOK){ + m_err=getMessage(ERR_CONN_ZKDBITERR, rc); + zookeeper_close(m_zh); + return -1; + } + exec::DrillServiceInstance drillServiceInstance; + drillServiceInstance.ParseFromArray(buffer, buffer_len); + endpoint=drillServiceInstance.endpoint(); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << ">. Selected " << drillServiceInstance.DebugString() << std::endl;) + }else{ + + m_err=getMessage(ERR_CONN_ZKNODBIT); + zookeeper_close(m_zh); + return -1; + } + return 0; +} + +// Deprecated int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){ uint32_t waitTime=30000; // 10 seconds zoo_set_debug_level(getZkLogLevel()); @@ -1525,7 +1817,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat // signal the cond var { if (state == ZOO_CONNECTED_STATE){ - DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;) } boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex); self->m_bConnecting=false; @@ -1535,7 +1827,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat void ZookeeperImpl:: debugPrint(){ if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){ - DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;) } } |