aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/drillClientImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.cpp')
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.cpp1315
1 files changed, 777 insertions, 538 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index b5d5a31e7..7ecf910f9 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -19,32 +19,30 @@
#include "drill/common.hpp"
#include <queue>
-#include <string.h>
+#include <string>
#include <boost/asio.hpp>
+#include <boost/assign.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/functional/factory.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
-#ifdef _WIN32
-#include <zookeeper.h>
-#else
-#include <zookeeper/zookeeper.h>
-#endif
-#include <boost/assign.hpp>
+
#include "drill/drillClient.hpp"
+#include "drill/fieldmeta.hpp"
#include "drill/recordBatch.hpp"
#include "drillClientImpl.hpp"
+#include "collectionsImpl.hpp"
#include "errmsgs.hpp"
#include "logger.hpp"
-#include "rpcEncoder.hpp"
-#include "rpcDecoder.hpp"
+#include "metadata.hpp"
#include "rpcMessage.hpp"
#include "utils.hpp"
-
#include "GeneralRPC.pb.h"
#include "UserBitShared.pb.h"
+#include "zookeeperClient.hpp"
namespace Drill{
@@ -56,70 +54,57 @@ static std::map<exec::shared::QueryResult_QueryState, status_t> QUERYSTATE_TO_ST
(exec::shared::QueryResult_QueryState_FAILED, QRY_FAILED)
;
-RpcEncoder DrillClientImpl::s_encoder;
-RpcDecoder DrillClientImpl::s_decoder;
-
-std::string debugPrintQid(const exec::shared::QueryId& qid){
+static std::string debugPrintQid(const exec::shared::QueryId& qid){
return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] ");
}
-void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){
-#if defined _WIN32
- int32_t timeoutMsecs=timeout*1000;
- setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs));
- setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs));
-#else
- struct timeval tv;
- tv.tv_sec = timeout;
- tv.tv_usec = 0;
- int e=0;
- e=setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
- e=setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
-#endif
-}
-
connectionStatus_t DrillClientImpl::connect(const char* connStr){
std::string pathToDrill, protocol, hostPortStr;
std::string host;
std::string port;
- if(!this->m_bIsConnected){
- m_connectStr=connStr;
- Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
- if(!strcmp(protocol.c_str(), "zk")){
- ZookeeperImpl zook;
- std::vector<std::string> drillbits;
- int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+
+ if (this->m_bIsConnected) {
+ if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected
+ return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
+ }
+ return CONN_SUCCESS;
+ }
+
+ m_connectStr=connStr;
+ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
+ if(protocol == "zk"){
+ ZookeeperClient zook(pathToDrill);
+ std::vector<std::string> drillbits;
+ int err = zook.getAllDrillbits(hostPortStr, drillbits);
+ if(!err){
+ Utils::shuffle(drillbits);
+ exec::DrillbitEndpoint endpoint;
+ err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list
if(!err){
- Utils::shuffle(drillbits);
- exec::DrillbitEndpoint endpoint;
- err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list
- if(!err){
- host=boost::lexical_cast<std::string>(endpoint.address());
- port=boost::lexical_cast<std::string>(endpoint.user_port());
- }
+ host=boost::lexical_cast<std::string>(endpoint.address());
+ port=boost::lexical_cast<std::string>(endpoint.user_port());
}
- if(err){
- return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
- }
- zook.close();
- m_bIsDirectConnection=true;
- }else if(!strcmp(protocol.c_str(), "local")){
- boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
- char tempStr[MAX_CONNECT_STR+1];
- strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
- host=strtok(tempStr, ":");
- port=strtok(NULL, "");
- m_bIsDirectConnection=false;
- }else{
- return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1) << ">. Selected " << endpoint.DebugString() << std::endl;)
+
}
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
- connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
- return ret;
- }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect to a different address is not allowed if already connected
- return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
+ if(err){
+ return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
+ }
+ zook.close();
+ m_bIsDirectConnection=true;
+ }else if(protocol == "local"){
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
+ char tempStr[MAX_CONNECT_STR+1];
+ strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
+ host=strtok(tempStr, ":");
+ port=strtok(NULL, "");
+ m_bIsDirectConnection=false;
+ }else{
+ return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
}
- return CONN_SUCCESS;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
+ connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
+ return ret;
}
connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
@@ -140,7 +125,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str()));
}
- }catch(std::exception e){
+ }catch(const std::exception & e){
// Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve
if (!strcmp(e.what(), "resolve")) {
return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what()));
@@ -152,7 +137,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
// set socket keep alive
boost::asio::socket_base::keep_alive keepAlive(true);
m_socket.set_option(keepAlive);
- // set no_delay
+ // set no_delay
boost::asio::ip::tcp::no_delay noDelay(true);
m_socket.set_option(noDelay);
@@ -160,7 +145,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
m_connectedHost = connectedHost.str();
DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)
-
+
return CONN_SUCCESS;
}
@@ -180,7 +165,7 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
connectionStatus_t status=CONN_SUCCESS;
exec::rpc::Ack ack;
ack.set_ok(true);
- OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
+ rpc::OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
@@ -203,7 +188,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
if(err != boost::asio::error::operation_aborted){
// Check whether the deadline has passed.
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
<< to_simple_string(m_heartbeatTimer.expires_at())
<< " and time now is: "
<< to_simple_string(boost::asio::deadline_timer::traits_type::now())
@@ -231,8 +216,8 @@ void DrillClientImpl::Close() {
}
-connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
- DrillClientImpl::s_encoder.Encode(m_wbuf, msg);
+connectionStatus_t DrillClientImpl::sendSync(rpc::OutBoundRpcMessage& msg){
+ encode(m_wbuf, msg);
boost::system::error_code ec;
size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec);
if(!ec && s!=0){
@@ -292,9 +277,9 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
m_deadlineTimer.cancel();
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;)
if(!error){
- InBoundRpcMessage msg;
+ rpc::InBoundRpcMessage msg;
uint32_t length = 0;
- int bytes_read = DrillClientImpl::s_decoder.LengthDecode(m_rbuf, &length);
+ std::size_t bytes_read = rpc::lengthDecode(m_rbuf, length);
if(length>0){
size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
ByteBuf_t b=m_rbuf + LEN_PREFIX_BUFLEN;
@@ -309,7 +294,11 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
bytesToRead-=dataBytesRead;
b+=dataBytesRead;
}
- DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg);
+ if (!decode(m_rbuf+bytes_read, length, msg)) {
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. Cannot decode handshake.\n";)
+ handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Cannot decode handshake"));
+ return;
+ }
}else{
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";)
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
@@ -321,6 +310,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf,
this->m_handshakeStatus=b2u.status();
this->m_handshakeErrorId=b2u.errorid();
this->m_handshakeErrorMsg=b2u.errormessage();
+ this->m_serverInfos = b2u.server_infos();
}else{
// boost error
@@ -362,6 +352,14 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
u2b.set_support_listening(true);
u2b.set_support_timeout(true);
+ // Adding version info
+ exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos();
+ infos->set_name(DRILL_CONNECTOR_NAME);
+ infos->set_version(DRILL_VERSION_STRING);
+ infos->set_majorversion(DRILL_VERSION_MAJOR);
+ infos->set_minorversion(DRILL_VERSION_MINOR);
+ infos->set_patchversion(DRILL_VERSION_PATCH);
+
if(properties != NULL && properties->size()>0){
std::string username;
std::string err;
@@ -374,7 +372,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
for(size_t i=0; i<properties->size(); i++){
std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i));
if(it==DrillUserProperties::USER_PROPERTIES.end()){
- DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i)
+ DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i)
<< ") is unknown and is being skipped" << std::endl;)
continue;
}
@@ -402,7 +400,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
uint64_t coordId = this->getNextCoordinationId();
- OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
+ rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
sendSync(out_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
}
@@ -469,38 +467,159 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
query.set_type(t);
query.set_plan(plan);
- uint64_t coordId;
- DrillClientQueryResult* pQuery=NULL;
+ boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind(
+ boost::factory<DrillClientQueryResult*>(),
+ boost::ref(*this),
+ _1,
+ boost::cref(plan),
+ l,
+ lCtx);
+ return sendMsg(factory, ::exec::user::RUN_QUERY, query);
+}
+
+DrillClientPrepareHandle* DrillClientImpl::PrepareQuery(const std::string& plan,
+ pfnPreparedStatementListener l,
+ void* lCtx){
+ exec::user::CreatePreparedStatementReq query;
+ query.set_sql_query(plan);
+
+ boost::function<DrillClientPrepareHandle*(int32_t)> factory = boost::bind(
+ boost::factory<DrillClientPrepareHandle*>(),
+ boost::ref(*this),
+ _1,
+ boost::cref(plan),
+ l,
+ lCtx);
+ return sendMsg(factory, ::exec::user::CREATE_PREPARED_STATEMENT, query);
+}
+
+DrillClientQueryResult* DrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt,
+ pfnQueryResultsListener l,
+ void* lCtx){
+ const DrillClientPrepareHandle& handle = static_cast<const DrillClientPrepareHandle&>(pstmt);
+
+ exec::user::RunQuery query;
+ query.set_results_mode(exec::user::STREAM_FULL);
+ query.set_type(::exec::shared::PREPARED_STATEMENT);
+ query.set_allocated_prepared_statement_handle(new ::exec::user::PreparedStatementHandle(handle.m_preparedStatementHandle));
+
+ boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind(
+ boost::factory<DrillClientQueryResult*>(),
+ boost::ref(*this),
+ _1,
+ boost::cref(handle.m_query),
+ l,
+ lCtx);
+ return sendMsg(factory, ::exec::user::RUN_QUERY, query);
+}
+
+DrillClientCatalogResult* DrillClientImpl::getCatalogs(const std::string& catalogPattern,
+ Metadata::pfnCatalogMetadataListener listener,
+ void* listenerCtx) {
+ exec::user::GetCatalogsReq query;
+ exec::user::LikeFilter* catalogFilter(query.mutable_catalog_name_filter());
+ catalogFilter->set_pattern(catalogPattern);
+
+ boost::function<DrillClientCatalogResult*(int32_t)> factory = boost::bind(
+ boost::factory<DrillClientCatalogResult*>(),
+ boost::ref(*this),
+ _1,
+ listener,
+ listenerCtx);
+ return sendMsg(factory, ::exec::user::GET_CATALOGS, query);
+}
+
+DrillClientSchemaResult* DrillClientImpl::getSchemas(const std::string& catalogPattern,
+ const std::string& schemaPattern,
+ Metadata::pfnSchemaMetadataListener listener,
+ void* listenerCtx) {
+ exec::user::GetSchemasReq query;
+ query.mutable_catalog_name_filter()->set_pattern(catalogPattern);
+ query.mutable_schema_name_filter()->set_pattern(schemaPattern);
+
+ boost::function<DrillClientSchemaResult*(int32_t)> factory = boost::bind(
+ boost::factory<DrillClientSchemaResult*>(),
+ boost::ref(*this),
+ _1,
+ listener,
+ listenerCtx);
+ return sendMsg(factory, ::exec::user::GET_SCHEMAS, query);
+}
+
+DrillClientTableResult* DrillClientImpl::getTables(const std::string& catalogPattern,
+ const std::string& schemaPattern,
+ const std::string& tablePattern,
+ const std::vector<std::string>* tableTypes,
+ Metadata::pfnTableMetadataListener listener,
+ void* listenerCtx) {
+ exec::user::GetTablesReq query;
+ query.mutable_catalog_name_filter()->set_pattern(catalogPattern);
+ query.mutable_schema_name_filter()->set_pattern(schemaPattern);
+ query.mutable_table_name_filter()->set_pattern(tablePattern);
+ if (tableTypes) {
+ std::copy(tableTypes->begin(), tableTypes->end(),
+ google::protobuf::RepeatedFieldBackInserter(query.mutable_table_type_filter()));
+ }
+
+ boost::function<DrillClientTableResult*(int32_t)> factory = boost::bind(
+ boost::factory<DrillClientTableResult*>(),
+ boost::ref(*this),
+ _1,
+ listener,
+ listenerCtx);
+ return sendMsg(factory, ::exec::user::GET_TABLES, query);
+}
+
+DrillClientColumnResult* DrillClientImpl::getColumns(const std::string& catalogPattern,
+ const std::string& schemaPattern,
+ const std::string& tablePattern,
+ const std::string& columnsPattern,
+ Metadata::pfnColumnMetadataListener listener,
+ void* listenerCtx) {
+ exec::user::GetColumnsReq query;
+ query.mutable_catalog_name_filter()->set_pattern(catalogPattern);
+ query.mutable_schema_name_filter()->set_pattern(schemaPattern);
+ query.mutable_table_name_filter()->set_pattern(tablePattern);
+ query.mutable_column_name_filter()->set_pattern(columnsPattern);
+
+ boost::function<DrillClientColumnResult*(int32_t)> factory = boost::bind(
+ boost::factory<DrillClientColumnResult*>(),
+ boost::ref(*this),
+ _1,
+ listener,
+ listenerCtx);
+ return sendMsg(factory, ::exec::user::GET_COLUMNS, query);
+}
+
+template<typename Handle>
+Handle* DrillClientImpl::sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& message) {
+ int32_t coordId;
+ Handle* phandle=NULL;
connectionStatus_t cStatus=CONN_SUCCESS;
{
boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
boost::lock_guard<boost::mutex> dcLock(this->m_dcMutex);
coordId = this->getNextCoordinationId();
- OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query);
+ rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, type, coordId, &message);
- // Create the result object and register the listener before we send the query
- // because sometimes the caller is not checking the status of the submitQuery call.
- // This way, the broadcast error call will cause the results listener to be called
- // with a COMM_ERROR status.
- pQuery = new DrillClientQueryResult(this, coordId, plan);
- pQuery->registerListener(l, lCtx);
- this->m_queryIds[coordId]=pQuery;
+ phandle = handleFactory(coordId);
+ this->m_queryHandles[coordId]=phandle;
connectionStatus_t cStatus=sendSync(out_msg);
if(cStatus == CONN_SUCCESS){
bool sendRequest=false;
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;)
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;)
if(m_pendingRequests++==0){
sendRequest=true;
}else{
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) << " request to server" << std::endl;)
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
}
if(sendRequest){
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending " << ::exec::user::RpcType_Name(type) << " request. Number of pending requests = "
<< m_pendingRequests << std::endl;)
getNextResult(); // async wait for results
}
@@ -508,21 +627,18 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
}
if(cStatus!=CONN_SUCCESS){
- this->m_queryIds.erase(coordId);
- delete pQuery;
+ this->m_queryHandles.erase(coordId);
+ delete phandle;
return NULL;
}
-
-
//run this in a new thread
startMessageListener();
- return pQuery;
+ return phandle;
}
void DrillClientImpl::getNextResult(){
-
// This call is always made from within a function where the mutex has already been acquired
//boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
@@ -533,7 +649,7 @@ void DrillClientImpl::getNextResult(){
AllocatedBuffer::s_memCV.wait(memLock);
}
}
-
+
//use free, not delete to free
ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
if (DrillClientConfig::getQueryTimeout() > 0){
@@ -577,8 +693,7 @@ void DrillClientImpl::waitForResults(){
status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
AllocatedBufferPtr* allocatedBuffer,
- InBoundRpcMessage& msg,
- boost::system::error_code& error){
+ rpc::InBoundRpcMessage& msg){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
<< reinterpret_cast<int*>(_buf) << std::endl;)
@@ -590,7 +705,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
// We need to protect the readLength and read buffer, and the pending requests counter,
// but we don't have to keep the lock while we decode the rest of the buffer.
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen);
+ std::size_t bytes_read = rpc::lengthDecode(_buf, rmsgLen);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;)
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;)
@@ -612,7 +727,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
<< (rmsgLen - leftover) << std::endl;)
ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
size_t bytesToRead=rmsgLen - leftover;
-
+ boost::system::error_code error;
while(1){
size_t dataBytesRead=this->m_socket.read_some(
boost::asio::buffer(b, bytesToRead),
@@ -623,10 +738,14 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
bytesToRead-=dataBytesRead;
b+=dataBytesRead;
}
-
+
if(!error){
// read data successfully
- DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg);
+ if (!decode(currentBuffer->m_pBuffer, rmsgLen, msg)) {
+ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
+ return handleQryError(QRY_COMM_ERROR,
+ getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);;
+ }
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;)
}else{
Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
@@ -645,7 +764,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
return QRY_SUCCESS;
}
-status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
exec::shared::QueryId qid;
@@ -657,15 +776,15 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;)
qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;)
-
+
qid.CopyFrom(qr.query_id());
-
+
if (qr.has_query_state() &&
qr.query_state() != exec::shared::QueryResult_QueryState_RUNNING &&
qr.query_state() != exec::shared::QueryResult_QueryState_STARTING) {
pDrillClientQueryResult=findQueryResult(qid);
- //Queries that have been cancelled or whose resources are freed before completion
- //do not have a DrillClientQueryResult object. We need not handle the terminal message
+ //Queries that have been cancelled or whose resources are freed before completion
+ //do not have a DrillClientQueryResult object. We need not handle the terminal message
//in that case since all it does is to free resources (and they have already been freed)
if(pDrillClientQueryResult!=NULL){
//Validate the RPC message
@@ -703,10 +822,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
return ret;
}
-status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
- exec::shared::QueryId qid;
+ ::exec::shared::QueryId qid;
// Be a good client and send ack as early as possible.
// Drillbit pushed the query result to the client, the client should send ack
// whenever it receives the message
@@ -720,7 +839,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
- qid.CopyFrom(qr->query_id());
+ qid = ::exec::shared::QueryId(qr->query_id());
if(qid.part1()==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;)
delete allocatedBuffer;
@@ -729,13 +848,13 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
pDrillClientQueryResult=findQueryResult(qid);
if(pDrillClientQueryResult==NULL){
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query ("
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query ("
<< debugPrintQid(qid) << ")." << std::endl;)
delete qr;
delete allocatedBuffer;
return ret;
}
-
+
//Validate the RPC message
std::string valErr;
if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
@@ -765,20 +884,13 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
}
pDrillClientQueryResult->setIsQueryPending(true);
- pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
if(pDrillClientQueryResult->m_bIsLastChunk){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
<< "Received last batch. " << std::endl;)
ret=QRY_NO_MORE_DATA;
}
pDrillClientQueryResult->setQueryStatus(ret);
- if(pResultsListener!=NULL){
- ret = pResultsListener(pDrillClientQueryResult, pRecordBatch, NULL);
- }else{
- //Use a default callback that is called when a record batch is received
- ret = pDrillClientQueryResult->defaultQueryResultsListener(pDrillClientQueryResult,
- pRecordBatch, NULL);
- }
+ ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL);
} // release lock
if(ret==QRY_FAILURE){
sendCancel(&qid);
@@ -787,31 +899,37 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
pDrillClientQueryResult->setIsQueryPending(false);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
pDrillClientQueryResult->setQueryStatus(ret);
- clearMapEntries(pDrillClientQueryResult);
+ removeQueryHandle(pDrillClientQueryResult);
+ removeQueryResult(pDrillClientQueryResult);
return ret;
}
return ret;
}
-status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
- DrillClientQueryResult* pDrillClientQueryResult=NULL;
+status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
+ // make sure to deallocate buffer
+ boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
- std::map<int,DrillClientQueryResult*>::iterator it;
- for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
- std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL");
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
+ for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
+ DrillClientQueryResult* pDrillClientQueryResult=it->second;
+ std::string qidString = (pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL");
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId
<< " QueryId: "<< qidString << std::endl;)
}
if(msg.m_coord_id==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
- it=this->m_queryIds.find(msg.m_coord_id);
- if(it!=this->m_queryIds.end()){
- pDrillClientQueryResult=(*it).second;
+ std::map<int, DrillClientQueryHandle*>::const_iterator it;
+ it=this->m_queryHandles.find(msg.m_coord_id);
+ if(it!=this->m_queryHandles.end()){
+ DrillClientQueryResult* pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
+ if (!pDrillClientQueryResult) {
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ }
exec::shared::QueryId *qid = new exec::shared::QueryId;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
@@ -820,14 +938,241 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
//save queryId allocated here so we can free it later
pDrillClientQueryResult->setQueryId(qid);
}else{
- delete allocatedBuffer;
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
- delete allocatedBuffer;
return ret;
}
-DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){
+status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Prepared Statement with coordination id:" << msg.m_coord_id << std::endl;)
+ status_t ret=QRY_SUCCESS;
+
+ // make sure to deallocate buffer
+ boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+ if(msg.m_coord_id==0){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStatement: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+ return QRY_SUCCESS;
+ }
+ std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+ if(it!=this->m_queryHandles.end()){
+ DrillClientPrepareHandle* pDrillClientPrepareHandle=static_cast<DrillClientPrepareHandle*>((*it).second);
+ exec::user::CreatePreparedStatementResp resp;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Prepared Statement Handle " << msg.m_pbody.size() << std::endl;)
+ if (!resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size())) {
+ return handleQryError(QRY_COMM_ERROR, "Cannot decode prepared statement", pDrillClientPrepareHandle);
+ }
+ if (resp.has_status() && resp.status() != exec::user::OK) {
+ return handleQryError(QRY_FAILED, resp.error(), pDrillClientPrepareHandle);
+ }
+ pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement());
+ pDrillClientPrepareHandle->notifyListener(pDrillClientPrepareHandle, NULL);
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Prepared Statement handle - " << resp.prepared_statement().server_handle().DebugString() << std::endl;)
+ }else{
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ }
+ m_pendingRequests--;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStament: " << m_pendingRequests << " requests pending." << std::endl;)
+ if(m_pendingRequests==0){
+ // signal any waiting client that it can exit because there are no more any query results to arrive.
+ // We keep the heartbeat going though.
+ m_cv.notify_one();
+ }
+ return ret;
+}
+
+status_t DrillClientImpl::processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetCatalogsResp with coordination id:" << msg.m_coord_id << std::endl;)
+ status_t ret=QRY_SUCCESS;
+
+ // make sure to deallocate buffer
+ boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+ if(msg.m_coord_id==0){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+ return QRY_SUCCESS;
+ }
+ std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+ if(it!=this->m_queryHandles.end()){
+ DrillClientCatalogResult* pHandle=static_cast<DrillClientCatalogResult*>((*it).second);
+ exec::user::GetCatalogsResp* resp = new exec::user::GetCatalogsResp;
+ pHandle->attachMetadataResult(resp);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetCatalogs result Handle " << msg.m_pbody.size() << std::endl;)
+ if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+ return handleQryError(QRY_COMM_ERROR, "Cannot decode getcatalogs results", pHandle);
+ }
+ if (resp->status() != exec::user::OK) {
+ return handleQryError(QRY_FAILED, resp->error(), pHandle);
+ }
+
+ const ::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>& catalogs = resp->catalogs();
+ pHandle->m_meta.clear();
+ pHandle->m_meta.reserve(resp->catalogs_size());
+
+ for(::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>::const_iterator it = catalogs.begin(); it != catalogs.end(); ++it) {
+ meta::DrillCatalogMetadata meta(*it);
+ pHandle->m_meta.push_back(meta);
+ }
+ pHandle->notifyListener(&pHandle->m_meta, NULL);
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetCatalogs result - " << resp->catalogs_size() << " catalog(s)" << std::endl;)
+ }else{
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ }
+ m_pendingRequests--;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: " << m_pendingRequests << " requests pending." << std::endl;)
+ if(m_pendingRequests==0){
+ // signal any waiting client that it can exit because there are no more any query results to arrive.
+ // We keep the heartbeat going though.
+ m_cv.notify_one();
+ }
+ return ret;
+}
+
+status_t DrillClientImpl::processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetSchemaResp with coordination id:" << msg.m_coord_id << std::endl;)
+ status_t ret=QRY_SUCCESS;
+
+ // make sure to deallocate buffer
+ boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+ if(msg.m_coord_id==0){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+ return QRY_SUCCESS;
+ }
+ std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+ if(it!=this->m_queryHandles.end()){
+ DrillClientSchemaResult* pHandle=static_cast<DrillClientSchemaResult*>((*it).second);
+ exec::user::GetSchemasResp* resp = new exec::user::GetSchemasResp();
+ pHandle->attachMetadataResult(resp);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetSchemasResp result Handle " << msg.m_pbody.size() << std::endl;)
+ if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+ return handleQryError(QRY_COMM_ERROR, "Cannot decode getschemas results", pHandle);
+ }
+ if (resp->status() != exec::user::OK) {
+ return handleQryError(QRY_FAILED, resp->error(), pHandle);
+ }
+
+ const ::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>& schemas = resp->schemas();
+ pHandle->m_meta.clear();
+ pHandle->m_meta.reserve(resp->schemas_size());
+
+ for(::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>::const_iterator it = schemas.begin(); it != schemas.end(); ++it) {
+ meta::DrillSchemaMetadata meta(*it);
+ pHandle->m_meta.push_back(meta);
+ }
+ pHandle->notifyListener(&pHandle->m_meta, NULL);
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetSchemaResp result - " << resp->schemas_size() << " schema(s)" << std::endl;)
+ }else{
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ }
+ m_pendingRequests--;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: " << m_pendingRequests << " requests pending." << std::endl;)
+ if(m_pendingRequests==0){
+ // signal any waiting client that it can exit because there are no more any query results to arrive.
+ // We keep the heartbeat going though.
+ m_cv.notify_one();
+ }
+ return ret;
+}
+
+status_t DrillClientImpl::processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetTablesResp with coordination id:" << msg.m_coord_id << std::endl;)
+ status_t ret=QRY_SUCCESS;
+
+ // make sure to deallocate buffer
+ boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+ if(msg.m_coord_id==0){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+ return QRY_SUCCESS;
+ }
+ std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+ if(it!=this->m_queryHandles.end()){
+ DrillClientTableResult* pHandle=static_cast<DrillClientTableResult*>((*it).second);
+ exec::user::GetTablesResp* resp = new exec::user::GetTablesResp();
+ pHandle->attachMetadataResult(resp);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GeTablesResp result Handle " << msg.m_pbody.size() << std::endl;)
+ if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+ return handleQryError(QRY_COMM_ERROR, "Cannot decode gettables results", pHandle);
+ }
+ if (resp->status() != exec::user::OK) {
+ return handleQryError(QRY_FAILED, resp->error(), pHandle);
+ }
+ const ::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>& tables = resp->tables();
+ pHandle->m_meta.clear();
+ pHandle->m_meta.reserve(resp->tables_size());
+
+ for(::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>::const_iterator it = tables.begin(); it != tables.end(); ++it) {
+ meta::DrillTableMetadata meta(*it);
+ pHandle->m_meta.push_back(meta);
+ }
+ pHandle->notifyListener(&pHandle->m_meta, NULL);
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetTables result - " << resp->tables_size() << " table(s)" << std::endl;)
+ }else{
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ }
+ m_pendingRequests--;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: " << m_pendingRequests << " requests pending." << std::endl;)
+ if(m_pendingRequests==0){
+ // signal any waiting client that it can exit because there are no more any query results to arrive.
+ // We keep the heartbeat going though.
+ m_cv.notify_one();
+ }
+ return ret;
+}
+
+status_t DrillClientImpl::processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetColumnsResp with coordination id:" << msg.m_coord_id << std::endl;)
+ status_t ret=QRY_SUCCESS;
+
+ // make sure to deallocate buffer
+ boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+ if(msg.m_coord_id==0){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+ return QRY_SUCCESS;
+ }
+ std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
+ if(it!=this->m_queryHandles.end()){
+ DrillClientColumnResult* pHandle=static_cast<DrillClientColumnResult*>((*it).second);
+ exec::user::GetColumnsResp* resp = new exec::user::GetColumnsResp();
+ pHandle->attachMetadataResult(resp);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetColumnsResp result Handle " << msg.m_pbody.size() << std::endl;)
+ if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
+ return handleQryError(QRY_COMM_ERROR, "Cannot decode getcolumns results", pHandle);
+ }
+ if (resp->status() != exec::user::OK) {
+ return handleQryError(QRY_FAILED, resp->error(), pHandle);
+ }
+ const ::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>& columns = resp->columns();
+ pHandle->m_meta.clear();
+ pHandle->m_meta.reserve(resp->columns_size());
+
+ for(::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) {
+ meta::DrillColumnMetadata meta(*it);
+ pHandle->m_meta.push_back(meta);
+ }
+ pHandle->notifyListener(&pHandle->m_meta, NULL);
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetColumnsResp result - " << resp->columns_size() << " columns(s)" << std::endl;)
+ }else{
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ }
+ m_pendingRequests--;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: " << m_pendingRequests << " requests pending." << std::endl;)
+ if(m_pendingRequests==0){
+ // signal any waiting client that it can exit because there are no more any query results to arrive.
+ // We keep the heartbeat going though.
+ m_cv.notify_one();
+ }
+ return ret;
+}
+
+DrillClientQueryResult* DrillClientImpl::findQueryResult(const exec::shared::QueryId& qid){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;)
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
@@ -838,7 +1183,7 @@ DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId&
<< it->first->part2() << "]\n";)
}
}
- it=this->m_queryResults.find(&qid);
+ it=this->m_queryResults.find(const_cast<exec::shared::QueryId * const>(&qid));
if(it!=this->m_queryResults.end()){
pDrillClientQueryResult=(*it).second;
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
@@ -925,9 +1270,8 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
}
void DrillClientImpl::handleRead(ByteBuf_t _buf,
- const boost::system::error_code& err,
+ const boost::system::error_code& error,
size_t bytes_transferred) {
- boost::system::error_code error=err;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
<< reinterpret_cast<int*>(_buf) << std::endl;)
if(DrillClientConfig::getQueryTimeout() > 0){
@@ -935,120 +1279,153 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
m_deadlineTimer.cancel();
}
- if(!error){
- InBoundRpcMessage msg;
- boost::lock_guard<boost::mutex> lock(this->m_prMutex);
+ if (error) {
+ // boost error
+ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
+ boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
+ "Boost Communication Error: " << error.message() << std::endl;)
+ handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
+ return;
+ }
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
- AllocatedBufferPtr allocatedBuffer=NULL;
+ rpc::InBoundRpcMessage msg;
+ boost::lock_guard<boost::mutex> lockPR(this->m_prMutex);
- if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
- if(m_pendingRequests!=0){
- boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- getNextResult();
- }
- return;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
+ AllocatedBufferPtr allocatedBuffer=NULL;
+
+ if(readMsg(_buf, &allocatedBuffer, msg)!=QRY_SUCCESS){
+ delete allocatedBuffer;
+ if(m_pendingRequests!=0){
+ boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+ getNextResult();
}
+ return;
+ }
- if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away
- m_pendingRequests--;
+ if(msg.m_mode==exec::rpc::PONG) { //heartbeat response. Throw it away
+ m_pendingRequests--;
+ delete allocatedBuffer;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;)
+ if(m_pendingRequests!=0){
+ boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+ getNextResult();
+ }else{
+ boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;)
+ m_cv.notify_one();
+ }
+
+ return;
+ }
+
+ if(msg.m_mode == exec::rpc::RESPONSE) {
+ status_t s;
+ switch(msg.m_rpc_type) {
+ case exec::user::QUERY_HANDLE:
+ s = processQueryId(allocatedBuffer, msg);
+ break;
+
+ case exec::user::PREPARED_STATEMENT:
+ s = processPreparedStatement(allocatedBuffer, msg);
+ break;
+
+ case exec::user::CATALOGS:
+ s = processCatalogsResult(allocatedBuffer, msg);
+ break;
+
+ case exec::user::SCHEMAS:
+ s = processSchemasResult(allocatedBuffer, msg);
+ break;
+
+ case exec::user::TABLES:
+ s = processTablesResult(allocatedBuffer, msg);
+ break;
+
+ case exec::user::COLUMNS:
+ s = processColumnsResult(allocatedBuffer, msg);
+ break;
+
+ case exec::user::HANDSHAKE:
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
delete allocatedBuffer;
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;)
- if(m_pendingRequests!=0){
- boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- getNextResult();
- }else{
- boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;)
- m_cv.notify_one();
- }
- return;
- }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){
- status_t s = processQueryResult(allocatedBuffer, msg);
- if(s !=QRY_SUCCESS && s!= QRY_NO_MORE_DATA){
- if(m_pendingRequests!=0){
- boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- getNextResult();
- }
- return;
- }
- }else if(!error && msg.m_rpc_type==exec::user::QUERY_DATA){
- if(processQueryData(allocatedBuffer, msg)!=QRY_SUCCESS){
- if(m_pendingRequests!=0){
- boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- getNextResult();
- }
- return;
- }
- }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){
- if(processQueryId(allocatedBuffer, msg)!=QRY_SUCCESS){
- if(m_pendingRequests!=0){
- boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- getNextResult();
- }
- return;
- }
- }else if(!error && msg.m_rpc_type==exec::user::ACK){
+ break;
+
+ case exec::user::ACK:
// Cancel requests will result in an ACK sent back.
// Consume silently
+ s = QRY_CANCELED;
delete allocatedBuffer;
- if(m_pendingRequests!=0){
- boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- getNextResult();
- }
- return;
- }else{
+ break;
+
+ default:
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+ << "QueryResult returned " << msg.m_rpc_type << std::endl;)
+ delete allocatedBuffer;
+ handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
+ }
+
+ if (m_pendingRequests != 0) {
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- if(error){
- // We have a socket read error, but we do not know which query this is for.
- // Signal ALL pending queries that they should stop waiting.
- delete allocatedBuffer;
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;)
- handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
- return;
- }else{
- // If not QUERY_RESULT, then we think something serious has gone wrong?
- // In one case when the client hung, we observed that the server was sending a handshake request to the client
- // We should properly handle these handshake requests/responses
- if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){
- if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
- exec::user::UserToBitHandshake u2b;
- u2b.set_channel(exec::shared::USER);
- u2b.set_rpc_version(DRILL_RPC_VERSION);
- u2b.set_support_listening(true);
- OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
- sendSync(out_msg);
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
- }else{
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
- }
- }else{
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
- << "QueryResult returned " << msg.m_rpc_type << std::endl;)
- handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
- }
- delete allocatedBuffer;
- return;
+ getNextResult();
+ }
+
+ return;
+ }
+
+ if (msg.has_mode() && msg.m_mode == exec::rpc::REQUEST) {
+ status_t s;
+ switch(msg.m_rpc_type) {
+ case exec::user::QUERY_RESULT:
+ s = processQueryResult(allocatedBuffer, msg);
+ break;
+
+ case exec::user::QUERY_DATA:
+ s = processQueryData(allocatedBuffer, msg);
+ break;
+
+ case exec::user::HANDSHAKE:
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
+ delete allocatedBuffer;
+ // In one case when the client hung, we observed that the server was sending a handshake request to the client
+ // We should properly handle these handshake requests/responses
+ {
+ boost::lock_guard<boost::mutex> lockDC(this->m_dcMutex);
+ exec::user::UserToBitHandshake u2b;
+ u2b.set_channel(exec::shared::USER);
+ u2b.set_rpc_version(DRILL_RPC_VERSION);
+ u2b.set_support_listening(true);
+ rpc::OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
+ sendSync(out_msg);
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
}
+ break;
+
+ default:
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+ << "QueryResult returned " << msg.m_rpc_type << std::endl;)
+ delete allocatedBuffer;
+ handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
}
- {
+
+ if (m_pendingRequests != 0) {
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
- }else{
- // boost error
- Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
- boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
- "Boost Communication Error: " << error.message() << std::endl;)
- handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
+
return;
}
- return;
+
+ // If not QUERY_RESULT, then we think something serious has gone wrong?
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
+ << "QueryResult returned " << msg.m_rpc_type << " for " << msg.m_mode << std::endl;)
+ handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
+ delete allocatedBuffer;
+
}
-status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valErr){
+status_t DrillClientImpl::validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valErr){
if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){
valErr=getMessage(ERR_QRY_RESPFAIL);
return QRY_FAILURE;
@@ -1060,7 +1437,7 @@ status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shar
return QRY_SUCCESS;
}
-status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valErr){
+status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valErr){
if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){
valErr=getMessage(ERR_QRY_RESPFAIL);
return QRY_FAILURE;
@@ -1072,10 +1449,10 @@ status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::sh
return QRY_SUCCESS;
}
-connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
+connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, const std::string& msg){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
m_pendingRequests=0;
- if(!m_queryIds.empty()){
+ if(!m_queryHandles.empty()){
// set query error only if queries are running
broadcastError(pErr);
}else{
@@ -1086,12 +1463,12 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s
return status;
}
-status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){
+status_t DrillClientImpl::handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
// set query error only if queries are running
- if(pQueryResult!=NULL){
+ if(pQueryHandle!=NULL){
m_pendingRequests--;
- pQueryResult->signalError(pErr);
+ pQueryHandle->signalError(pErr);
}else{
m_pendingRequests=0;
broadcastError(pErr);
@@ -1101,27 +1478,27 @@ status_t DrillClientImpl::handleQryError(status_t status, std::string msg, Drill
status_t DrillClientImpl::handleQryError(status_t status,
const exec::shared::DrillPBError& e,
- DrillClientQueryResult* pQueryResult){
- assert(pQueryResult!=NULL);
+ DrillClientQueryHandle* pQueryHandle){
+ assert(pQueryHandle!=NULL);
DrillClientError* pErr = DrillClientError::getErrorObject(e);
- pQueryResult->signalError(pErr);
+ pQueryHandle->signalError(pErr);
m_pendingRequests--;
return status;
}
void DrillClientImpl::broadcastError(DrillClientError* pErr){
if(pErr!=NULL){
- std::map<int, DrillClientQueryResult*>::iterator iter;
- if(!m_queryIds.empty()){
- for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) {
+ std::map<int, DrillClientQueryHandle*>::const_iterator iter;
+ if(!m_queryHandles.empty()){
+ for(iter = m_queryHandles.begin(); iter != m_queryHandles.end(); iter++) {
DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg);
iter->second->signalError(err);
}
}
delete pErr;
}
- // We have an error at the connection level. Cancel the heartbeat.
- // And close the connection
+ // We have an error at the connection level. Cancel the heartbeat.
+ // And close the connection
m_heartbeatTimer.cancel();
m_pendingRequests=0;
m_cv.notify_one();
@@ -1132,7 +1509,7 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){
// The implementation is similar to handleQryError
status_t DrillClientImpl::handleTerminatedQryState(
status_t status,
- std::string msg,
+ const std::string& msg,
DrillClientQueryResult* pQueryResult){
assert(pQueryResult!=NULL);
if(status==QRY_COMPLETED){
@@ -1145,21 +1522,22 @@ status_t DrillClientImpl::handleTerminatedQryState(
return status;
}
-
-void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
- std::map<int, DrillClientQueryResult*>::iterator iter;
+void DrillClientImpl::removeQueryHandle(DrillClientQueryHandle* pQueryHandle){
boost::lock_guard<boost::mutex> lock(m_dcMutex);
- if(!m_queryIds.empty()){
- for(iter=m_queryIds.begin(); iter!=m_queryIds.end(); iter++) {
- if(pQueryResult==(DrillClientQueryResult*)iter->second){
- m_queryIds.erase(iter->first);
+ if(!m_queryHandles.empty()){
+ for(std::map<int, DrillClientQueryHandle*>::const_iterator iter=m_queryHandles.begin(); iter!=m_queryHandles.end(); iter++) {
+ if(pQueryHandle==(DrillClientQueryHandle*)iter->second){
+ m_queryHandles.erase(iter->first);
break;
}
}
}
+}
+
+void DrillClientImpl::removeQueryResult(DrillClientQueryResult* pQueryResult){
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
if(!m_queryResults.empty()){
- std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
- for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
+ for(std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::const_iterator it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
if(pQueryResult==(DrillClientQueryResult*)it->second){
m_queryResults.erase(it->first);
break;
@@ -1168,19 +1546,19 @@ void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
}
}
-void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){
+void DrillClientImpl::sendAck(const rpc::InBoundRpcMessage& msg, bool isOk){
exec::rpc::Ack ack;
ack.set_ok(isOk);
- OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
+ rpc::OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
sendSync(ack_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
}
-void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){
+void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){
boost::lock_guard<boost::mutex> lock(m_dcMutex);
uint64_t coordId = this->getNextCoordinationId();
- OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
+ rpc::OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
sendSync(cancel_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
}
@@ -1193,6 +1571,14 @@ void DrillClientImpl::shutdownSocket(){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
}
+meta::DrillMetadata* DrillClientImpl::getMetadata() {
+ return new meta::DrillMetadata(*this);
+}
+
+void DrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) {
+ delete metadata;
+}
+
// This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this
// class are used by the async callbacks.
status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) {
@@ -1254,7 +1640,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
//ctx; // unused, we already have the this pointer
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;)
//check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
- if(this->m_bCancel){
+ if(this->isCancelled()){
if(b!=NULL) delete b;
return QRY_FAILURE;
}
@@ -1284,7 +1670,7 @@ RecordBatch* DrillClientQueryResult::peekNext(){
//if no more data, return NULL;
if(!m_bIsQueryPending) return NULL;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
- while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
+ while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) {
this->m_cv.wait(cvLock);
}
// READ but not remove first element from queue
@@ -1305,7 +1691,7 @@ RecordBatch* DrillClientQueryResult::getNext() {
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
- while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){
+ while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending){
this->m_cv.wait(cvLock);
}
// remove first element from queue
@@ -1322,33 +1708,60 @@ void DrillClientQueryResult::waitForData() {
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
//if no more data, return NULL;
if(!m_bIsQueryPending) return;
- while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) {
+ while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) {
this->m_cv.wait(cvLock);
}
}
-void DrillClientQueryResult::cancel() {
+template<typename Listener, typename Value>
+status_t DrillClientBaseHandle<Listener, Value>::notifyListener(Value v, DrillClientError* pErr){
+ return m_pApplicationListener(getApplicationContext(), v, pErr);
+}
+
+void DrillClientQueryHandle::cancel() {
this->m_bCancel=true;
}
-void DrillClientQueryResult::signalError(DrillClientError* pErr){
+void DrillClientQueryHandle::signalError(DrillClientError* pErr){
// Ignore return values from the listener.
if(pErr!=NULL){
if(m_pError!=NULL){
delete m_pError; m_pError=NULL;
}
m_pError=pErr;
- pfnQueryResultsListener pResultsListener=this->m_pResultsListener;
- if(pResultsListener!=NULL){
- pResultsListener(this, NULL, pErr);
- }else{
- defaultQueryResultsListener(this, NULL, pErr);
- }
+ // TODO should it be protected by m_cvMutex?
+ m_bHasError=true;
+ }
+ return;
+}
+
+template<typename Listener, typename Value>
+void DrillClientBaseHandle<Listener, Value>::signalError(DrillClientError* pErr){
+ DrillClientQueryHandle::signalError(pErr);
+ // Ignore return values from the listener.
+ if(pErr!=NULL){
+ this->notifyListener(NULL, pErr);
+ }
+}
+
+status_t DrillClientQueryResult::notifyListener(RecordBatch* batch, DrillClientError* pErr) {
+ pfnQueryResultsListener pResultsListener=getApplicationListener();
+ if(pResultsListener!=NULL){
+ return pResultsListener(this, batch, pErr);
+ }else{
+ return defaultQueryResultsListener(this, batch, pErr);
+ }
+}
+
+void DrillClientQueryResult::signalError(DrillClientError* pErr){
+ DrillClientQueryHandle::signalError(pErr);
+ // Ignore return values from the listener.
+ if(pErr!=NULL){
+ this->notifyListener(NULL, pErr);
{
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
m_bIsQueryPending=false;
m_bHasData=false;
- m_bHasError=true;
}
//Signal the cv in case there is a client waiting for data already.
m_cv.notify_one();
@@ -1357,24 +1770,27 @@ void DrillClientQueryResult::signalError(DrillClientError* pErr){
}
void DrillClientQueryResult::signalComplete(){
- pfnQueryResultsListener pResultsListener=this->m_pResultsListener;
- if(pResultsListener!=NULL){
- pResultsListener(this, NULL, NULL);
- }else{
- defaultQueryResultsListener(this, NULL, NULL);
- }
+ this->notifyListener(NULL, NULL);
{
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
- m_bIsQueryPending=false;
m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED);
- m_bHasError=false;
+ resetError();
}
//Signal the cv in case there is a client waiting for data already.
m_cv.notify_one();
return;
}
+void DrillClientQueryHandle::clearAndDestroy(){
+ //Tell the parent to remove this from its lists
+ m_client.removeQueryHandle(this);
+
+ if(m_pError!=NULL){
+ delete m_pError; m_pError=NULL;
+ }
+}
void DrillClientQueryResult::clearAndDestroy(){
+ DrillClientQueryHandle::clearAndDestroy();
//free memory allocated for FieldMetadata objects saved in m_columnDefs;
if(!m_columnDefs->empty()){
for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){
@@ -1385,15 +1801,16 @@ void DrillClientQueryResult::clearAndDestroy(){
if(this->m_pQueryId!=NULL){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;)
}
+
//Tell the parent to remove this from its lists
- m_pClient->clearMapEntries(this);
+ this->client().removeQueryResult(this);
//clear query id map entries.
if(this->m_pQueryId!=NULL){
delete this->m_pQueryId; this->m_pQueryId=NULL;
}
if(!m_recordBatches.empty()){
- // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_
+ // When multiple queries execute in parallel we sometimes get an empty record batch back from the server _after_
// the last chunk has been received. We eventually delete it.
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;)
RecordBatch* pR=NULL;
@@ -1403,11 +1820,32 @@ void DrillClientQueryResult::clearAndDestroy(){
delete pR;
}
}
- if(m_pError!=NULL){
- delete m_pError; m_pError=NULL;
+}
+
+status_t DrillClientPrepareHandle::setupPreparedStatement(const exec::user::PreparedStatement& pstmt) {
+ // Get columns schema information
+ const ::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>& columns = pstmt.columns();
+ for(::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) {
+ FieldMetadata* metadata = new FieldMetadata;
+ metadata->set(*it);
+ m_columnDefs->push_back(metadata);
}
+
+ // Copy server handle
+ this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle());
+ return QRY_SUCCESS;
}
+void DrillClientPrepareHandle::clearAndDestroy(){
+ DrillClientQueryHandle::clearAndDestroy();
+ //free memory allocated for FieldMetadata objects saved in m_columnDefs;
+ if(!m_columnDefs->empty()){
+ for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){
+ delete *it;
+ }
+ m_columnDefs->clear();
+ }
+}
connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
connectionStatus_t stat = CONN_SUCCESS;
@@ -1418,9 +1856,9 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
if(!strcmp(protocol.c_str(), "zk")){
// Get a list of drillbits
- ZookeeperImpl zook;
+ ZookeeperClient zook(pathToDrill);
std::vector<std::string> drillbits;
- int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
+ int err = zook.getAllDrillbits(hostPortStr, drillbits);
if(!err){
Utils::shuffle(drillbits);
// The original shuffled order is maintained if we shuffle first and then add any missing elements
@@ -1432,15 +1870,17 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
m_lastConnection++;
nextIndex = (m_lastConnection)%(getDrillbitCount());
}
+
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
<< "(" << (void*)this << ")"
- << ": Current counter is: "
+ << ": Current counter is: "
<< m_lastConnection << std::endl;)
- err=zook.getEndPoint(m_drillbits, nextIndex, e);
+ err=zook.getEndPoint(m_drillbits[nextIndex], e);
if(!err){
host=boost::lexical_cast<std::string>(e.address());
port=boost::lexical_cast<std::string>(e.user_port());
}
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << nextIndex << ">. Selected " << e.DebugString() << std::endl;)
}
if(err){
return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
@@ -1475,7 +1915,7 @@ connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties*
connectionStatus_t stat=CONN_FAILURE;
// Keep a copy of the user properties
if(props!=NULL){
- m_pUserProperties = new DrillUserProperties;
+ m_pUserProperties = boost::shared_ptr<DrillUserProperties>(new DrillUserProperties);
for(size_t i=0; i<props->size(); i++){
m_pUserProperties->setProperty(
props->keyAt(i),
@@ -1486,10 +1926,10 @@ connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties*
DrillClientImpl* pDrillClientImpl = getOneConnection();
if(pDrillClientImpl != NULL){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
- stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
+ stat = pDrillClientImpl->validateHandshake(m_pUserProperties.get());
}
else{
- stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
+ stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
}
return stat;
}
@@ -1505,16 +1945,52 @@ DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::Query
return pDrillClientQueryResult;
}
-void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
- // Nothing to do. If this class ever keeps track of executing queries then it will need
- // to implement this call to free any query specific resources the pool might have
+DrillClientPrepareHandle* PooledDrillClientImpl::PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx){
+ DrillClientPrepareHandle* pDrillClientPrepareHandle = NULL;
+ DrillClientImpl* pDrillClientImpl = NULL;
+ pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+ pDrillClientPrepareHandle=pDrillClientImpl->PrepareQuery(plan,listener,listenerCtx);
+ m_queriesExecuted++;
+ }
+ return pDrillClientPrepareHandle;
+}
+
+DrillClientQueryResult* PooledDrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx){
+ DrillClientQueryResult* pDrillClientQueryResult = NULL;
+ DrillClientImpl* pDrillClientImpl = NULL;
+ pDrillClientImpl = getOneConnection();
+ if(pDrillClientImpl != NULL){
+ pDrillClientQueryResult=pDrillClientImpl->ExecuteQuery(pstmt, listener, listenerCtx);
+ m_queriesExecuted++;
+ }
+ return pDrillClientQueryResult;
+}
+
+void PooledDrillClientImpl::freeQueryResources(DrillClientQueryHandle* pQryHandle){
+ // If this class ever keeps track of executing queries then it will need
+ // to implement this call to free any query specific resources the pool might have
// allocated
- return;
+
+ pQryHandle->client().freeQueryResources(pQryHandle);
+}
+
+meta::DrillMetadata* PooledDrillClientImpl::getMetadata() {
+ meta::DrillMetadata* metadata = NULL;
+ DrillClientImpl* pDrillClientImpl = getOneConnection();
+ if (pDrillClientImpl != NULL) {
+ metadata = pDrillClientImpl->getMetadata();
+ }
+ return metadata;
+}
+
+void PooledDrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) {
+ metadata->client().freeMetadata(metadata);
}
bool PooledDrillClientImpl::Active(){
boost::lock_guard<boost::mutex> lock(m_poolMutex);
- for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
+ for(std::vector<DrillClientImpl*>::const_iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
if((*it)->Active()){
return true;
}
@@ -1529,7 +2005,7 @@ void PooledDrillClientImpl::Close() {
delete *it;
}
m_clientConnections.clear();
- if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;}
+ m_pUserProperties.reset();
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
m_lastConnection=-1;
m_queriesExecuted=0;
@@ -1592,7 +2068,7 @@ DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){
boost::lock_guard<boost::mutex> lock(m_poolMutex);
pDrillClientImpl=m_clientConnections.back();
- ret=pDrillClientImpl->validateHandshake(m_pUserProperties);
+ ret=pDrillClientImpl->validateHandshake(m_pUserProperties.get());
if(ret!=CONN_SUCCESS){
delete pDrillClientImpl; pDrillClientImpl=NULL;
m_clientConnections.erase(m_clientConnections.end());
@@ -1602,251 +2078,14 @@ DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
if(ret!=CONN_SUCCESS){
break;
}
- } // need a new connection
+ } // need a new connection
}// while
if(pDrillClientImpl==NULL){
connectionStatus_t status = CONN_NOTCONNECTED;
- handleConnError(status, getMessage(status));
+ handleConnError(status, getMessage(ERR_CONN_NOCONN));
}
return pDrillClientImpl;
}
-char ZookeeperImpl::s_drillRoot[]="/drill/";
-char ZookeeperImpl::s_defaultCluster[]="drillbits1";
-
-ZookeeperImpl::ZookeeperImpl(){
- m_pDrillbits=new String_vector;
- m_bConnecting=true;
- memset(&m_id, 0, sizeof(m_id));
-}
-
-ZookeeperImpl::~ZookeeperImpl(){
- delete m_pDrillbits;
-}
-
-ZooLogLevel ZookeeperImpl::getZkLogLevel(){
- //typedef enum {ZOO_LOG_LEVEL_ERROR=1,
- // ZOO_LOG_LEVEL_WARN=2,
- // ZOO_LOG_LEVEL_INFO=3,
- // ZOO_LOG_LEVEL_DEBUG=4
- //} ZooLogLevel;
- switch(DrillClientConfig::getLogLevel()){
- case LOG_TRACE:
- case LOG_DEBUG:
- return ZOO_LOG_LEVEL_DEBUG;
- case LOG_INFO:
- return ZOO_LOG_LEVEL_INFO;
- case LOG_WARNING:
- return ZOO_LOG_LEVEL_WARN;
- case LOG_ERROR:
- case LOG_FATAL:
- default:
- return ZOO_LOG_LEVEL_ERROR;
- }
- return ZOO_LOG_LEVEL_ERROR;
-}
-
-int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits){
- uint32_t waitTime=30000; // 10 seconds
- zoo_set_debug_level(getZkLogLevel());
- zoo_deterministic_conn_order(1); // enable deterministic order
- struct String_vector* pDrillbits=NULL;
- m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0);
- if(!m_zh) {
- m_err = getMessage(ERR_CONN_ZKFAIL);
- zookeeper_close(m_zh);
- return -1;
- }else{
- m_err="";
- //Wait for the completion handler to signal successful connection
- boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
- boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
- while(this->m_bConnecting) {
- if(!this->m_cv.timed_wait(bufferLock, timeout)){
- m_err = getMessage(ERR_CONN_ZKTIMOUT);
- zookeeper_close(m_zh);
- return -1;
- }
- }
- }
- if(m_state!=ZOO_CONNECTED_STATE){
- zookeeper_close(m_zh);
- return -1;
- }
- int rc = ZOK;
- if(pathToDrill==NULL || strlen(pathToDrill)==0){
- m_rootDir=s_drillRoot;
- m_rootDir += s_defaultCluster;
- }else{
- m_rootDir=pathToDrill;
- }
-
- pDrillbits = new String_vector;
- rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits);
- if(rc!=ZOK){
- delete pDrillbits;
- m_err=getMessage(ERR_CONN_ZKERR, rc);
- zookeeper_close(m_zh);
- return -1;
- }
- if(pDrillbits && pDrillbits->count > 0){
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster ("
- << connectStr << "/" << pathToDrill
- << ")." <<std::endl;)
- for(int i=0; i<pDrillbits->count; i++){
- drillbits.push_back(pDrillbits->data[i]);
- }
- for(int i=0; i<drillbits.size(); i++){
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;)
- }
- }
- delete pDrillbits;
- return 0;
-}
-
-int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){
- int rc = ZOK;
- exec::DrillServiceInstance drillServiceInstance;
- if( drillbits.size() >0){
- // pick the drillbit at 'index'
- const char * bit=drillbits[index].c_str();
- std::string s;
- s=m_rootDir + std::string("/") + bit;
- int buffer_len=MAX_CONNECT_STR;
- char buffer[MAX_CONNECT_STR+1];
- struct Stat stat;
- buffer[MAX_CONNECT_STR]=0;
- rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat);
- if(rc!=ZOK){
- m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
- zookeeper_close(m_zh);
- return -1;
- }
- exec::DrillServiceInstance drillServiceInstance;
- drillServiceInstance.ParseFromArray(buffer, buffer_len);
- endpoint=drillServiceInstance.endpoint();
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << ">. Selected " << drillServiceInstance.DebugString() << std::endl;)
- }else{
-
- m_err=getMessage(ERR_CONN_ZKNODBIT);
- zookeeper_close(m_zh);
- return -1;
- }
- return 0;
-}
-
-// Deprecated
-int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){
- uint32_t waitTime=30000; // 10 seconds
- zoo_set_debug_level(getZkLogLevel());
- zoo_deterministic_conn_order(1); // enable deterministic order
- m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0);
- if(!m_zh) {
- m_err = getMessage(ERR_CONN_ZKFAIL);
- return CONN_FAILURE;
- }else{
- m_err="";
- //Wait for the completion handler to signal successful connection
- boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
- boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
- while(this->m_bConnecting) {
- if(!this->m_cv.timed_wait(bufferLock, timeout)){
- m_err = getMessage(ERR_CONN_ZKTIMOUT);
- return CONN_FAILURE;
- }
- }
- }
- if(m_state!=ZOO_CONNECTED_STATE){
- return CONN_FAILURE;
- }
- int rc = ZOK;
- char rootDir[MAX_CONNECT_STR+1];
- if(pathToDrill==NULL || strlen(pathToDrill)==0){
- strcpy(rootDir, (char*)s_drillRoot);
- strcat(rootDir, s_defaultCluster);
- }else{
- strncpy(rootDir, pathToDrill, MAX_CONNECT_STR); rootDir[MAX_CONNECT_STR]=0;
- }
- rc=zoo_get_children(m_zh, (char*)rootDir, 0, m_pDrillbits);
- if(rc!=ZOK){
- m_err=getMessage(ERR_CONN_ZKERR, rc);
- zookeeper_close(m_zh);
- return -1;
- }
-
- //Let's pick a random drillbit.
- if(m_pDrillbits && m_pDrillbits->count >0){
-
- std::vector<std::string> randomDrillbits;
- for(int i=0; i<m_pDrillbits->count; i++){
- randomDrillbits.push_back(m_pDrillbits->data[i]);
- }
- //Use the same random shuffle as the Java client instead of picking a drillbit at random.
- //Gives much better randomization when the size of the cluster is small.
- std::random_shuffle(randomDrillbits.begin(), randomDrillbits.end());
- const char * bit=randomDrillbits[0].c_str();
- std::string s;
-
- s=rootDir + std::string("/") + bit;
- int buffer_len=MAX_CONNECT_STR;
- char buffer[MAX_CONNECT_STR+1];
- struct Stat stat;
- buffer[MAX_CONNECT_STR]=0;
- rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat);
- if(rc!=ZOK){
- m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
- zookeeper_close(m_zh);
- return -1;
- }
- m_drillServiceInstance.ParseFromArray(buffer, buffer_len);
- }else{
- m_err=getMessage(ERR_CONN_ZKNODBIT);
- zookeeper_close(m_zh);
- return -1;
- }
- return 0;
-}
-
-void ZookeeperImpl::close(){
- zookeeper_close(m_zh);
-}
-
-void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *path, void* context) {
- //From cli.c
-
- /* Be careful using zh here rather than zzh - as this may be mt code
- * the client lib may call the watcher before zookeeper_init returns */
-
- ZookeeperImpl* self=(ZookeeperImpl*)context;
- self->m_state=state;
- if (type == ZOO_SESSION_EVENT) {
- if (state == ZOO_CONNECTED_STATE) {
- } else if (state == ZOO_AUTH_FAILED_STATE) {
- self->m_err= getMessage(ERR_CONN_ZKNOAUTH);
- zookeeper_close(zzh);
- self->m_zh=0;
- } else if (state == ZOO_EXPIRED_SESSION_STATE) {
- self->m_err= getMessage(ERR_CONN_ZKEXP);
- zookeeper_close(zzh);
- self->m_zh=0;
- }
- }
- // signal the cond var
- {
- if (state == ZOO_CONNECTED_STATE){
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;)
- }
- boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex);
- self->m_bConnecting=false;
- }
- self->m_cv.notify_one();
-}
-
-void ZookeeperImpl:: debugPrint(){
- if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;)
- }
-}
-
} // namespace Drill