aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/drillClientImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.cpp')
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.cpp600
1 files changed, 446 insertions, 154 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index d4e9ed96e..3ec01f521 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -78,47 +78,46 @@ void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){
#endif
}
-
-void DrillClientImpl::parseConnectStr(const char* connectStr,
- std::string& pathToDrill,
- std::string& protocol,
- std::string& hostPortStr){
- char u[MAX_CONNECT_STR+1];
- strncpy(u,connectStr, MAX_CONNECT_STR); u[MAX_CONNECT_STR]=0;
- char* z=strtok(u, "=");
- char* c=strtok(NULL, "/");
- char* p=strtok(NULL, "");
-
- if(p!=NULL) pathToDrill=std::string("/")+p;
- protocol=z; hostPortStr=c;
- return;
-}
-
connectionStatus_t DrillClientImpl::connect(const char* connStr){
std::string pathToDrill, protocol, hostPortStr;
std::string host;
std::string port;
if(!this->m_bIsConnected){
- parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+ m_connectStr=connStr;
+ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
if(!strcmp(protocol.c_str(), "zk")){
ZookeeperImpl zook;
- if(zook.connectToZookeeper(hostPortStr.c_str(), pathToDrill.c_str())!=0){
+ std::vector<std::string> drillbits;
+ int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+ if(!err){
+ Utils::shuffle(drillbits);
+ exec::DrillbitEndpoint endpoint;
+ err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list
+ if(!err){
+ host=boost::lexical_cast<std::string>(endpoint.address());
+ port=boost::lexical_cast<std::string>(endpoint.user_port());
+ }
+ }
+ if(err){
return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
}
- zook.debugPrint();
- exec::DrillbitEndpoint e=zook.getEndPoint();
- host=boost::lexical_cast<std::string>(e.address());
- port=boost::lexical_cast<std::string>(e.user_port());
zook.close();
+ m_bIsDirectConnection=true;
}else if(!strcmp(protocol.c_str(), "local")){
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
char tempStr[MAX_CONNECT_STR+1];
strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
host=strtok(tempStr, ":");
port=strtok(NULL, "");
+ m_bIsDirectConnection=false;
}else{
return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
}
- return this->connect(host.c_str(), port.c_str());
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
+ connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
+ return ret;
+ }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect to a different address is not allowed if already connected
+ return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
}
return CONN_SUCCESS;
}
@@ -133,7 +132,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
tcp::resolver::iterator end;
while (iter != end){
endpoint = *iter++;
- DRILL_LOG(LOG_TRACE) << endpoint << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;)
}
boost::system::error_code ec;
m_socket.connect(endpoint, ec);
@@ -149,6 +148,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
}
+ m_bIsConnected=true;
// set socket keep alive
boost::asio::socket_base::keep_alive keepAlive(true);
m_socket.set_option(keepAlive);
@@ -156,35 +156,34 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
boost::asio::ip::tcp::no_delay noDelay(true);
m_socket.set_option(noDelay);
- //
- // We put some OS dependent code here for timing out a socket. Mostly, this appears to
- // do nothing. Should we leave it in there?
- //
- setSocketTimeout(m_socket, DrillClientConfig::getSocketTimeout());
-
+ std::ostringstream connectedHost;
+ connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
+ m_connectedHost = connectedHost.str();
+ DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)
+
return CONN_SUCCESS;
}
void DrillClientImpl::startHeartbeatTimer(){
- DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
- << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
+ << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;)
m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
m_heartbeatTimer.async_wait(boost::bind(
&DrillClientImpl::handleHeartbeatTimeout,
this,
boost::asio::placeholders::error
));
- startMessageListener(); // start this thread early so we don't have the timer blocked
+ startMessageListener(); // start this thread early so we don't have the timer blocked
}
connectionStatus_t DrillClientImpl::sendHeartbeat(){
- connectionStatus_t status=CONN_SUCCESS;
+ connectionStatus_t status=CONN_SUCCESS;
exec::rpc::Ack ack;
ack.set_ok(true);
OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
- boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
- boost::lock_guard<boost::mutex> lock(m_dcMutex);
- DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;
+ boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
status=sendSync(heartbeatMsg);
status=status==CONN_SUCCESS?status:CONN_DEAD;
//If the server sends responses to a heartbeat, we need to increment the pending requests counter.
@@ -196,21 +195,19 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
void DrillClientImpl::resetHeartbeatTimer(){
m_heartbeatTimer.cancel();
- DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;)
startHeartbeatTimer();
}
-
-
void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
if(err != boost::asio::error::operation_aborted){
// Check whether the deadline has passed.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
<< to_simple_string(m_heartbeatTimer.expires_at())
<< " and time now is: "
<< to_simple_string(boost::asio::deadline_timer::traits_type::now())
- << std::endl;
+ << std::endl;)
;
if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
@@ -219,7 +216,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
startHeartbeatTimer();
}else{
// Close connection.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";)
shutdownSocket();
}
}
@@ -227,7 +224,6 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
return;
}
-
void DrillClientImpl::Close() {
shutdownSocket();
}
@@ -257,8 +253,8 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
this,
boost::asio::placeholders::error
));
- DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
- << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
+ << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;)
}
async_read(
@@ -271,7 +267,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)
);
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";)
m_io_service.run();
if(m_rbuf!=NULL){
Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
@@ -292,7 +288,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
boost::system::error_code error=err;
// cancel the timer
m_deadlineTimer.cancel();
- DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;)
if(!error){
InBoundRpcMessage msg;
uint32_t length = 0;
@@ -306,14 +302,14 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
boost::asio::buffer(b, bytesToRead),
error);
if(err) break;
- DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Handshake Message: actual bytes read = " << dataBytesRead << std::endl;)
if(dataBytesRead==bytesToRead) break;
bytesToRead-=dataBytesRead;
b+=dataBytesRead;
}
DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
}else{
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";)
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
return;
}
@@ -344,7 +340,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
m_io_service.stop();
boost::system::error_code ignorederr;
@@ -356,7 +352,7 @@ void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code &
connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){
- DRILL_LOG(LOG_TRACE) << "validateHandShake\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";)
exec::user::UserToBitHandshake u2b;
u2b.set_channel(exec::shared::USER);
@@ -368,7 +364,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
std::string username;
std::string err;
if(!properties->validate(err)){
- DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;)
}
exec::user::UserProperties* userProperties = u2b.mutable_properties();
@@ -376,8 +372,8 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
for(size_t i=0; i<properties->size(); i++){
std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i));
if(it==DrillUserProperties::USER_PROPERTIES.end()){
- DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i)
- << ") is unknown and is being skipped" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i)
+ << ") is unknown and is being skipped" << std::endl;)
continue;
}
if(IS_BITSET((*it).second,USERPROP_FLAGS_SERVERPROP)){
@@ -392,9 +388,9 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
//u2b.set_credentials(&creds);
}
if(IS_BITSET((*it).second,USERPROP_FLAGS_PASSWORD)){
- DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ": ********** " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ": ********** " << std::endl;)
}else{
- DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << properties->keyAt(i) << ":" << properties->valueAt(i) << std::endl;)
}
}// Server properties
}
@@ -406,7 +402,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
sendSync(out_msg);
- DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
}
connectionStatus_t ret = recvHandshake();
@@ -416,21 +412,21 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
if(this->m_handshakeStatus != exec::user::SUCCESS){
switch(this->m_handshakeStatus){
case exec::user::RPC_VERSION_MISMATCH:
- DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected "
- << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected "
+ << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;)
return handleConnError(CONN_BAD_RPC_VER,
getMessage(ERR_CONN_BAD_RPC_VER, DRILL_RPC_VERSION,
m_handshakeVersion,
this->m_handshakeErrorId.c_str(),
this->m_handshakeErrorMsg.c_str()));
case exec::user::AUTH_FAILED:
- DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;)
return handleConnError(CONN_AUTH_FAILED,
getMessage(ERR_CONN_AUTHFAIL,
this->m_handshakeErrorId.c_str(),
this->m_handshakeErrorMsg.c_str()));
case exec::user::UNKNOWN_FAILURE:
- DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;)
return handleConnError(CONN_HANDSHAKE_FAILED,
getMessage(ERR_CONN_UNKNOWN_ERR,
this->m_handshakeErrorId.c_str(),
@@ -451,14 +447,14 @@ void DrillClientImpl::startMessageListener() {
if(this->m_pListenerThread==NULL){
// Stopping the io_service from running out-of-work
if(m_io_service.stopped()){
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;)
m_io_service.reset();
}
this->m_pWork = new boost::asio::io_service::work(m_io_service);
this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
&this->m_io_service));
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
- << this->m_pListenerThread << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
+ << this->m_pListenerThread << std::endl;)
}
}
@@ -480,22 +476,23 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query);
sendSync(out_msg);
- pQuery = new DrillClientQueryResult(this, coordId);
+ pQuery = new DrillClientQueryResult(this, coordId, plan);
pQuery->registerListener(l, lCtx);
bool sendRequest=false;
this->m_queryIds[coordId]=pQuery;
- DRILL_LOG(LOG_DEBUG) << "Sent query request. Coordination id = " << coordId << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;)
if(m_pendingRequests++==0){
sendRequest=true;
}else{
- DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;
- DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
}
if(sendRequest){
- DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
- << m_pendingRequests << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
+ << m_pendingRequests << std::endl;)
getNextResult(); // async wait for results
}
}
@@ -513,7 +510,7 @@ void DrillClientImpl::getNextResult(){
{
boost::unique_lock<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
- DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;)
while(AllocatedBuffer::s_isBufferLimitReached){
AllocatedBuffer::s_memCV.wait(memLock);
}
@@ -522,8 +519,8 @@ void DrillClientImpl::getNextResult(){
//use free, not delete to free
ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
if (DrillClientConfig::getQueryTimeout() > 0){
- DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
- << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
+ << DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;)
m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout()));
m_deadlineTimer.async_wait(boost::bind(
&DrillClientImpl::handleReadTimeout,
@@ -544,7 +541,7 @@ void DrillClientImpl::getNextResult(){
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)
);
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";)
}
void DrillClientImpl::waitForResults(){
@@ -565,8 +562,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
InBoundRpcMessage& msg,
boost::system::error_code& error){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
+ << reinterpret_cast<int*>(_buf) << std::endl;)
size_t leftover=0;
uint32_t rmsgLen;
AllocatedBufferPtr currentBuffer;
@@ -576,15 +573,15 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
// but we don't have to keep the lock while we decode the rest of the buffer.
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen);
- DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;
- DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;)
if(rmsgLen>0){
leftover = LEN_PREFIX_BUFLEN - bytes_read;
// Allocate a buffer
currentBuffer=new AllocatedBuffer(rmsgLen);
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ "
- << currentBuffer << ", size = " << rmsgLen << " ]\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Allocated and locked buffer: [ "
+ << currentBuffer << ", size = " << rmsgLen << " ]\n";)
if(currentBuffer==NULL){
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
@@ -593,8 +590,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
if(leftover){
memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover);
}
- DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
- << (rmsgLen - leftover) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
+ << (rmsgLen - leftover) << std::endl;)
ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
size_t bytesToRead=rmsgLen - leftover;
@@ -603,7 +600,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
boost::asio::buffer(b, bytesToRead),
error);
if(error) break;
- DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual bytes read = " << dataBytesRead << std::endl;)
if(dataBytesRead==bytesToRead) break;
bytesToRead-=dataBytesRead;
b+=dataBytesRead;
@@ -612,7 +609,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
if(!error){
// read data successfully
DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg);
- DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;)
}else{
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_COMM_ERROR,
@@ -624,8 +621,8 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
}
}
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
+ << reinterpret_cast<int*>(_buf) << std::endl;)
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return QRY_SUCCESS;
}
@@ -639,9 +636,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
exec::shared::QueryResult qr;
- DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;)
qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;)
qid.CopyFrom(qr.query_id());
@@ -657,7 +654,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
std::string valErr;
if( (ret=validateResultMessage(msg, qr, valErr)) != QRY_SUCCESS){
delete allocatedBuffer;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;)
return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
}
ret=processQueryStatusResult(&qr, pDrillClientQueryResult);
@@ -665,9 +662,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
// We've received the final message for a query that has been cancelled
// or for which the resources have been freed. We no longer need to listen
// for more incoming messages for such a query.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;)
m_pendingRequests--;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;)
ret=QRY_CANCELED;
}
delete allocatedBuffer;
@@ -676,10 +673,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
// Normal query results come back with query_state not set.
// Actually this is not strictly true. The query state is set to
// 0(i.e. PENDING), but protobuf thinks this means the value is not set.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";)
}
}
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
@@ -701,21 +698,21 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
exec::shared::QueryData* qr = new exec::shared::QueryData; //Record Batch will own this object and free it up.
- DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;)
qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
qid.CopyFrom(qr->query_id());
if(qid.part1()==0){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;)
delete allocatedBuffer;
return QRY_SUCCESS;
}
pDrillClientQueryResult=findQueryResult(qid);
if(pDrillClientQueryResult==NULL){
- DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query ("
- << debugPrintQid(qid) << ")." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query ("
+ << debugPrintQid(qid) << ")." << std::endl;)
delete qr;
delete allocatedBuffer;
return ret;
@@ -726,23 +723,23 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
delete allocatedBuffer;
delete qr;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
pDrillClientQueryResult->setQueryStatus(ret);
return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
}
//Build Record Batch here
- DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;)
pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody);
pDrillClientQueryResult->m_numBatches++;
- DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;)
pRecordBatch->build();
- DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
- << pRecordBatch->getNumRecords() << std::endl;
- DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
- << pRecordBatch->getNumFields() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
+ << pRecordBatch->getNumRecords() << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
+ << pRecordBatch->getNumFields() << std::endl;)
ret=pDrillClientQueryResult->setupColumnDefs(qr);
if(ret==QRY_SUCCESS_WITH_INFO){
@@ -752,8 +749,8 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
pDrillClientQueryResult->setIsQueryPending(true);
pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
if(pDrillClientQueryResult->m_bIsLastChunk){
- DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
- << "Received last batch. " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
+ << "Received last batch. " << std::endl;)
ret=QRY_NO_MORE_DATA;
}
pDrillClientQueryResult->setQueryStatus(ret);
@@ -770,7 +767,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
// Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
// pushed on the wire before the cancel is processed.
pDrillClientQueryResult->setIsQueryPending(false);
- DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
pDrillClientQueryResult->setQueryStatus(ret);
clearMapEntries(pDrillClientQueryResult);
return ret;
@@ -780,27 +777,27 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
- DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
boost::lock_guard<boost::mutex> lock(m_dcMutex);
std::map<int,DrillClientQueryResult*>::iterator it;
for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL");
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
- << " QueryId: "<< qidString << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
+ << " QueryId: "<< qidString << std::endl;)
}
if(msg.m_coord_id==0){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
it=this->m_queryIds.find(msg.m_coord_id);
if(it!=this->m_queryIds.end()){
pDrillClientQueryResult=(*it).second;
exec::shared::QueryId *qid = new exec::shared::QueryId;
- DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;)
m_queryResults[qid]=pDrillClientQueryResult;
//save queryId allocated here so we can free it later
pDrillClientQueryResult->setQueryId(qid);
@@ -814,20 +811,20 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
- DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;)
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;)
if(m_queryResults.size() != 0){
for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
- << it->first->part2() << "]\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
+ << it->first->part2() << "]\n";)
}
}
it=this->m_queryResults.find(&qid);
if(it!=this->m_queryResults.end()){
pDrillClientQueryResult=(*it).second;
- DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
- debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
+ debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;)
}
return pDrillClientQueryResult;
}
@@ -870,7 +867,7 @@ status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr
break;
default:
{
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";)
ret=handleQryError(QRY_INTERNAL_ERROR,
getMessage(ERR_QRY_UNKQRYSTATE),
pDrillClientQueryResult);
@@ -887,7 +884,7 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
// Check whether the deadline has passed.
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";)
handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL);
// There is no longer an active deadline. The expiry is set to positive
// infinity so that the timer never expires until a new deadline is set.
@@ -913,18 +910,18 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
const boost::system::error_code& err,
size_t bytes_transferred) {
boost::system::error_code error=err;
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
- << reinterpret_cast<int*>(_buf) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
+ << reinterpret_cast<int*>(_buf) << std::endl;)
if(DrillClientConfig::getQueryTimeout() > 0){
// Cancel the timeout if handleRead is called
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
m_deadlineTimer.cancel();
}
if(!error){
InBoundRpcMessage msg;
boost::lock_guard<boost::mutex> lock(this->m_prMutex);
- DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
AllocatedBufferPtr allocatedBuffer=NULL;
if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
@@ -938,14 +935,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away
m_pendingRequests--;
delete allocatedBuffer;
- DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;)
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}else{
- boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
- DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;
- m_cv.notify_one();
+ boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;)
+ m_cv.notify_one();
}
return;
}else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
@@ -988,7 +985,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
// We have a socket read error, but we do not know which query this is for.
// Signal ALL pending queries that they should stop waiting.
delete allocatedBuffer;
- DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;)
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
}else{
@@ -997,20 +994,20 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
// We should properly handle these handshake requests/responses
if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){
if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
exec::user::UserToBitHandshake u2b;
u2b.set_channel(exec::shared::USER);
u2b.set_rpc_version(DRILL_RPC_VERSION);
u2b.set_support_listening(true);
OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
sendSync(out_msg);
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
}else{
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
}
}else{
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
- << "QueryResult returned " << msg.m_rpc_type << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+ << "QueryResult returned " << msg.m_rpc_type << std::endl;)
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
}
delete allocatedBuffer;
@@ -1025,8 +1022,8 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
// boost error
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
- "Boost Communication Error: " << error.message() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
+ "Boost Communication Error: " << error.message() << std::endl;)
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
}
@@ -1066,6 +1063,7 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s
}else{
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
m_pError=pErr;
+ shutdownSocket();
}
return status;
}
@@ -1158,7 +1156,7 @@ void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){
OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
sendSync(ack_msg);
- DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
}
void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
@@ -1166,7 +1164,7 @@ void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
uint64_t coordId = this->getNextCoordinationId();
OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
sendSync(cancel_msg);
- DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
}
void DrillClientImpl::shutdownSocket(){
@@ -1174,7 +1172,7 @@ void DrillClientImpl::shutdownSocket(){
boost::system::error_code ignorederr;
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
m_bIsConnected=false;
- DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
}
// This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this
@@ -1236,7 +1234,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
RecordBatch* b,
DrillClientError* err) {
//ctx; // unused, we already have the this pointer
- DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;)
//check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
if(this->m_bCancel){
if(b!=NULL) delete b;
@@ -1247,8 +1245,8 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
{
if(b!=NULL){
#ifdef DEBUG
- DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
- << "Query result listener saved result to queue." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
+ << "Query result listener saved result to queue." << std::endl;)
#endif
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
this->m_recordBatches.push(b);
@@ -1267,7 +1265,7 @@ RecordBatch* DrillClientQueryResult::peekNext(){
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
//if no more data, return NULL;
if(!m_bIsQueryPending) return NULL;
- DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
this->m_cv.wait(cvLock);
}
@@ -1281,14 +1279,14 @@ RecordBatch* DrillClientQueryResult::getNext() {
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
//if no more data, return NULL;
if(!m_bIsQueryPending){
- DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;)
if(!m_recordBatches.empty()){
- DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;)
}
return NULL;
}
- DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){
this->m_cv.wait(cvLock);
}
@@ -1367,7 +1365,7 @@ void DrillClientQueryResult::clearAndDestroy(){
m_columnDefs->clear();
}
if(this->m_pQueryId!=NULL){
- DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;)
}
//Tell the parent to remove this from its lists
m_pClient->clearMapEntries(this);
@@ -1379,7 +1377,7 @@ void DrillClientQueryResult::clearAndDestroy(){
if(!m_recordBatches.empty()){
// When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_
// the last chunk has been received. We eventually delete it.
- DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;)
RecordBatch* pR=NULL;
while(!m_recordBatches.empty()){
pR=m_recordBatches.front();
@@ -1392,6 +1390,210 @@ void DrillClientQueryResult::clearAndDestroy(){
}
}
+
+connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
+ connectionStatus_t stat = CONN_SUCCESS;
+ std::string pathToDrill, protocol, hostPortStr;
+ std::string host;
+ std::string port;
+ m_connectStr=connStr;
+ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+ if(!strcmp(protocol.c_str(), "zk")){
+ // Get a list of drillbits
+ ZookeeperImpl zook;
+ std::vector<std::string> drillbits;
+ int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+ if(!err){
+ Utils::shuffle(drillbits);
+ // The original shuffled order is maintained if we shuffle first and then add any missing elements
+ Utils::add(m_drillbits, drillbits);
+ exec::DrillbitEndpoint e;
+ size_t nextIndex=0;
+ {
+ boost::lock_guard<boost::mutex> cLock(m_cMutex);
+ m_lastConnection++;
+ nextIndex = (m_lastConnection)%(getDrillbitCount());
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
+ << "(" << (void*)this << ")"
+ << ": Current counter is: "
+ << m_lastConnection << std::endl;)
+ err=zook.getEndPoint(m_drillbits, nextIndex, e);
+ if(!err){
+ host=boost::lexical_cast<std::string>(e.address());
+ port=boost::lexical_cast<std::string>(e.user_port());
+ }
+ }
+ if(err){
+ return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
+ }
+ zook.close();
+ m_bIsDirectConnection=false;
+ }else if(!strcmp(protocol.c_str(), "local")){
+ char tempStr[MAX_CONNECT_STR+1];
+ strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
+ host=strtok(tempStr, ":");
+ port=strtok(NULL, "");
+ m_bIsDirectConnection=true;
+ }else{
+ return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
+ }
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
+ DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
+ stat = pDrillClientImpl->connect(host.c_str(), port.c_str());
+ if(stat == CONN_SUCCESS){
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ m_clientConnections.push_back(pDrillClientImpl);
+ }else{
+ DrillClientError* pErr = pDrillClientImpl->getError();
+ handleConnError((connectionStatus_t)pErr->status, pErr->msg);
+ delete pDrillClientImpl;
+ }
+ return stat;
+}
+
+connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
+ // Assume there is one valid connection to at least one drillbit
+ connectionStatus_t stat=CONN_FAILURE;
+ // Keep a copy of the user properties
+ if(props!=NULL){
+ m_pUserProperties = new DrillUserProperties;
+ for(size_t i=0; i<props->size(); i++){
+ m_pUserProperties->setProperty(
+ props->keyAt(i),
+ props->valueAt(i)
+ );
+ }
+ }
+ DrillClientImpl* pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
+ stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
+ }
+ else{
+ stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
+ }
+ return stat;
+}
+
+DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
+ DrillClientQueryResult* pDrillClientQueryResult = NULL;
+ DrillClientImpl* pDrillClientImpl = NULL;
+ pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+ pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
+ m_queriesExecuted++;
+ }
+ return pDrillClientQueryResult;
+}
+
+void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
+ // Nothing to do. If this class ever keeps track of executing queries then it will need
+ // to implement this call to free any query specific resources the pool might have
+ // allocated
+ return;
+}
+
+bool PooledDrillClientImpl::Active(){
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ if((*it)->Active()){
+ return true;
+ }
+ }
+ return false;
+}
+
+void PooledDrillClientImpl::Close() {
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ (*it)->Close();
+ delete *it;
+ }
+ m_clientConnections.clear();
+ if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;}
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+ m_lastConnection=-1;
+ m_queriesExecuted=0;
+}
+
+DrillClientError* PooledDrillClientImpl::getError(){
+ std::string errMsg;
+ std::string nl="";
+ uint32_t stat;
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ if((*it)->getError() != NULL){
+ errMsg+=nl+"Query"/*+(*it)->queryId() +*/":"+(*it)->getError()->msg;
+ stat=(*it)->getError()->status;
+ }
+ }
+ if(errMsg.length()>0){
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL; }
+ m_pError = new DrillClientError(stat, DrillClientError::QRY_ERROR_START+stat, errMsg);
+ }
+ return m_pError;
+}
+
+//Waits as long as any one drillbit connection has results pending
+void PooledDrillClientImpl::waitForResults(){
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ (*it)->waitForResults();
+ }
+ return;
+}
+
+connectionStatus_t PooledDrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
+ DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Connection Error: (Pooled) " << pErr->msg << std::endl;)
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+ m_pError=pErr;
+ return status;
+}
+
+DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
+ DrillClientImpl* pDrillClientImpl = NULL;
+ while(pDrillClientImpl==NULL){
+ if(m_queriesExecuted == 0){
+ // First query ever sent can use the connection already established to authenticate the user
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ pDrillClientImpl=m_clientConnections[0];// There should be one connection in the list when the first query is executed
+ }else if(m_clientConnections.size() == m_maxConcurrentConnections){
+ // Pool is full. Use one of the already established connections
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ pDrillClientImpl = m_clientConnections[m_queriesExecuted%m_maxConcurrentConnections];
+ if(!pDrillClientImpl->Active()){
+ Utils::eraseRemove(m_clientConnections, pDrillClientImpl);
+ pDrillClientImpl=NULL;
+ }
+ }else{
+ int tries=0;
+ connectionStatus_t ret=CONN_SUCCESS;
+ while(pDrillClientImpl==NULL && tries++ < 3){
+ if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){
+ boost::lock_guard<boost::mutex> lock(m_poolMutex);
+ pDrillClientImpl=m_clientConnections.back();
+ ret=pDrillClientImpl->validateHandshake(m_pUserProperties);
+ if(ret!=CONN_SUCCESS){
+ delete pDrillClientImpl; pDrillClientImpl=NULL;
+ m_clientConnections.erase(m_clientConnections.end());
+ }
+ }
+ } // try a few times
+ if(ret!=CONN_SUCCESS){
+ break;
+ }
+ } // need a new connection
+ }// while
+
+ if(pDrillClientImpl==NULL){
+ connectionStatus_t status = CONN_NOTCONNECTED;
+ handleConnError(status, getMessage(status));
+ }
+ return pDrillClientImpl;
+}
+
char ZookeeperImpl::s_drillRoot[]="/drill/";
char ZookeeperImpl::s_defaultCluster[]="drillbits1";
@@ -1427,6 +1629,96 @@ ZooLogLevel ZookeeperImpl::getZkLogLevel(){
return ZOO_LOG_LEVEL_ERROR;
}
+int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits){
+ uint32_t waitTime=30000; // 10 seconds
+ zoo_set_debug_level(getZkLogLevel());
+ zoo_deterministic_conn_order(1); // enable deterministic order
+ struct String_vector* pDrillbits=NULL;
+ m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0);
+ if(!m_zh) {
+ m_err = getMessage(ERR_CONN_ZKFAIL);
+ zookeeper_close(m_zh);
+ return -1;
+ }else{
+ m_err="";
+ //Wait for the completion handler to signal successful connection
+ boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
+ boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
+ while(this->m_bConnecting) {
+ if(!this->m_cv.timed_wait(bufferLock, timeout)){
+ m_err = getMessage(ERR_CONN_ZKTIMOUT);
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ }
+ }
+ if(m_state!=ZOO_CONNECTED_STATE){
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ int rc = ZOK;
+ if(pathToDrill==NULL || strlen(pathToDrill)==0){
+ m_rootDir=s_drillRoot;
+ m_rootDir += s_defaultCluster;
+ }else{
+ m_rootDir=pathToDrill;
+ }
+
+ pDrillbits = new String_vector;
+ rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits);
+ if(rc!=ZOK){
+ delete pDrillbits;
+ m_err=getMessage(ERR_CONN_ZKERR, rc);
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ if(pDrillbits && pDrillbits->count > 0){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster ("
+ << connectStr << "/" << pathToDrill
+ << ")." <<std::endl;)
+ for(int i=0; i<pDrillbits->count; i++){
+ drillbits.push_back(pDrillbits->data[i]);
+ }
+ for(int i=0; i<drillbits.size(); i++){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;)
+ }
+ }
+ delete pDrillbits;
+ return 0;
+}
+
+int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){
+ int rc = ZOK;
+ exec::DrillServiceInstance drillServiceInstance;
+ if( drillbits.size() >0){
+ // pick the drillbit at 'index'
+ const char * bit=drillbits[index].c_str();
+ std::string s;
+ s=m_rootDir + std::string("/") + bit;
+ int buffer_len=MAX_CONNECT_STR;
+ char buffer[MAX_CONNECT_STR+1];
+ struct Stat stat;
+ buffer[MAX_CONNECT_STR]=0;
+ rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat);
+ if(rc!=ZOK){
+ m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ exec::DrillServiceInstance drillServiceInstance;
+ drillServiceInstance.ParseFromArray(buffer, buffer_len);
+ endpoint=drillServiceInstance.endpoint();
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << ">. Selected " << drillServiceInstance.DebugString() << std::endl;)
+ }else{
+
+ m_err=getMessage(ERR_CONN_ZKNODBIT);
+ zookeeper_close(m_zh);
+ return -1;
+ }
+ return 0;
+}
+
+// Deprecated
int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){
uint32_t waitTime=30000; // 10 seconds
zoo_set_debug_level(getZkLogLevel());
@@ -1525,7 +1817,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
// signal the cond var
{
if (state == ZOO_CONNECTED_STATE){
- DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;)
}
boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex);
self->m_bConnecting=false;
@@ -1535,7 +1827,7 @@ void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *pat
void ZookeeperImpl:: debugPrint(){
if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
- DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;)
}
}