diff options
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.cpp')
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.cpp | 1315 |
1 files changed, 777 insertions, 538 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index b5d5a31e7..7ecf910f9 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -19,32 +19,30 @@ #include "drill/common.hpp" #include <queue> -#include <string.h> +#include <string> #include <boost/asio.hpp> +#include <boost/assign.hpp> #include <boost/bind.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time_duration.hpp> +#include <boost/functional/factory.hpp> #include <boost/lexical_cast.hpp> #include <boost/thread.hpp> -#ifdef _WIN32 -#include <zookeeper.h> -#else -#include <zookeeper/zookeeper.h> -#endif -#include <boost/assign.hpp> + #include "drill/drillClient.hpp" +#include "drill/fieldmeta.hpp" #include "drill/recordBatch.hpp" #include "drillClientImpl.hpp" +#include "collectionsImpl.hpp" #include "errmsgs.hpp" #include "logger.hpp" -#include "rpcEncoder.hpp" -#include "rpcDecoder.hpp" +#include "metadata.hpp" #include "rpcMessage.hpp" #include "utils.hpp" - #include "GeneralRPC.pb.h" #include "UserBitShared.pb.h" +#include "zookeeperClient.hpp" namespace Drill{ @@ -56,70 +54,57 @@ static std::map<exec::shared::QueryResult_QueryState, status_t> QUERYSTATE_TO_ST (exec::shared::QueryResult_QueryState_FAILED, QRY_FAILED) ; -RpcEncoder DrillClientImpl::s_encoder; -RpcDecoder DrillClientImpl::s_decoder; - -std::string debugPrintQid(const exec::shared::QueryId& qid){ +static std::string debugPrintQid(const exec::shared::QueryId& qid){ return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] "); } -void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){ -#if defined _WIN32 - int32_t timeoutMsecs=timeout*1000; - setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs)); - setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs)); -#else - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - int e=0; - e=setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - e=setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); -#endif -} - connectionStatus_t DrillClientImpl::connect(const char* connStr){ std::string pathToDrill, protocol, hostPortStr; std::string host; std::string port; - if(!this->m_bIsConnected){ - m_connectStr=connStr; - Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); - if(!strcmp(protocol.c_str(), "zk")){ - ZookeeperImpl zook; - std::vector<std::string> drillbits; - int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + + if (this->m_bIsConnected) { + if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected + return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN)); + } + return CONN_SUCCESS; + } + + m_connectStr=connStr; + Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); + if(protocol == "zk"){ + ZookeeperClient zook(pathToDrill); + std::vector<std::string> drillbits; + int err = zook.getAllDrillbits(hostPortStr, drillbits); + if(!err){ + Utils::shuffle(drillbits); + exec::DrillbitEndpoint endpoint; + err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list if(!err){ - Utils::shuffle(drillbits); - exec::DrillbitEndpoint endpoint; - err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list - if(!err){ - host=boost::lexical_cast<std::string>(endpoint.address()); - port=boost::lexical_cast<std::string>(endpoint.user_port()); - } + host=boost::lexical_cast<std::string>(endpoint.address()); + port=boost::lexical_cast<std::string>(endpoint.user_port()); } - if(err){ - return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); - } - zook.close(); - m_bIsDirectConnection=true; - }else if(!strcmp(protocol.c_str(), "local")){ - boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant - char tempStr[MAX_CONNECT_STR+1]; - strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; - host=strtok(tempStr, ":"); - port=strtok(NULL, ""); - m_bIsDirectConnection=false; - }else{ - return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1) << ">. Selected " << endpoint.DebugString() << std::endl;) + } - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;) - connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); - return ret; - }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect to a different address is not allowed if already connected - return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN)); + if(err){ + return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); + } + zook.close(); + m_bIsDirectConnection=true; + }else if(protocol == "local"){ + boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant + char tempStr[MAX_CONNECT_STR+1]; + strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; + host=strtok(tempStr, ":"); + port=strtok(NULL, ""); + m_bIsDirectConnection=false; + }else{ + return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); } - return CONN_SUCCESS; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;) + connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); + return ret; } connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ @@ -140,7 +125,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str())); } - }catch(std::exception e){ + }catch(const std::exception & e){ // Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve if (!strcmp(e.what(), "resolve")) { return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what())); @@ -152,7 +137,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ // set socket keep alive boost::asio::socket_base::keep_alive keepAlive(true); m_socket.set_option(keepAlive); - // set no_delay + // set no_delay boost::asio::ip::tcp::no_delay noDelay(true); m_socket.set_option(noDelay); @@ -160,7 +145,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port; m_connectedHost = connectedHost.str(); DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;) - + return CONN_SUCCESS; } @@ -180,7 +165,7 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){ connectionStatus_t status=CONN_SUCCESS; exec::rpc::Ack ack; ack.set_ok(true); - OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack); + rpc::OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack); boost::lock_guard<boost::mutex> prLock(this->m_prMutex); boost::lock_guard<boost::mutex> lock(m_dcMutex); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;) @@ -203,7 +188,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;) if(err != boost::asio::error::operation_aborted){ // Check whether the deadline has passed. - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " << to_simple_string(m_heartbeatTimer.expires_at()) << " and time now is: " << to_simple_string(boost::asio::deadline_timer::traits_type::now()) @@ -231,8 +216,8 @@ void DrillClientImpl::Close() { } -connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){ - DrillClientImpl::s_encoder.Encode(m_wbuf, msg); +connectionStatus_t DrillClientImpl::sendSync(rpc::OutBoundRpcMessage& msg){ + encode(m_wbuf, msg); boost::system::error_code ec; size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec); if(!ec && s!=0){ @@ -292,9 +277,9 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, m_deadlineTimer.cancel(); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;) if(!error){ - InBoundRpcMessage msg; + rpc::InBoundRpcMessage msg; uint32_t length = 0; - int bytes_read = DrillClientImpl::s_decoder.LengthDecode(m_rbuf, &length); + std::size_t bytes_read = rpc::lengthDecode(m_rbuf, length); if(length>0){ size_t leftover = LEN_PREFIX_BUFLEN - bytes_read; ByteBuf_t b=m_rbuf + LEN_PREFIX_BUFLEN; @@ -309,7 +294,11 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, bytesToRead-=dataBytesRead; b+=dataBytesRead; } - DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg); + if (!decode(m_rbuf+bytes_read, length, msg)) { + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. Cannot decode handshake.\n";) + handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Cannot decode handshake")); + return; + } }else{ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";) handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake")); @@ -321,6 +310,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, this->m_handshakeStatus=b2u.status(); this->m_handshakeErrorId=b2u.errorid(); this->m_handshakeErrorMsg=b2u.errormessage(); + this->m_serverInfos = b2u.server_infos(); }else{ // boost error @@ -362,6 +352,14 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope u2b.set_support_listening(true); u2b.set_support_timeout(true); + // Adding version info + exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos(); + infos->set_name(DRILL_CONNECTOR_NAME); + infos->set_version(DRILL_VERSION_STRING); + infos->set_majorversion(DRILL_VERSION_MAJOR); + infos->set_minorversion(DRILL_VERSION_MINOR); + infos->set_patchversion(DRILL_VERSION_PATCH); + if(properties != NULL && properties->size()>0){ std::string username; std::string err; @@ -374,7 +372,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope for(size_t i=0; i<properties->size(); i++){ std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i)); if(it==DrillUserProperties::USER_PROPERTIES.end()){ - DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) + DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) << ") is unknown and is being skipped" << std::endl;) continue; } @@ -402,7 +400,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope boost::lock_guard<boost::mutex> lock(this->m_dcMutex); uint64_t coordId = this->getNextCoordinationId(); - OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b); + rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b); sendSync(out_msg); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";) } @@ -469,38 +467,159 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t query.set_type(t); query.set_plan(plan); - uint64_t coordId; - DrillClientQueryResult* pQuery=NULL; + boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientQueryResult*>(), + boost::ref(*this), + _1, + boost::cref(plan), + l, + lCtx); + return sendMsg(factory, ::exec::user::RUN_QUERY, query); +} + +DrillClientPrepareHandle* DrillClientImpl::PrepareQuery(const std::string& plan, + pfnPreparedStatementListener l, + void* lCtx){ + exec::user::CreatePreparedStatementReq query; + query.set_sql_query(plan); + + boost::function<DrillClientPrepareHandle*(int32_t)> factory = boost::bind( + boost::factory<DrillClientPrepareHandle*>(), + boost::ref(*this), + _1, + boost::cref(plan), + l, + lCtx); + return sendMsg(factory, ::exec::user::CREATE_PREPARED_STATEMENT, query); +} + +DrillClientQueryResult* DrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, + pfnQueryResultsListener l, + void* lCtx){ + const DrillClientPrepareHandle& handle = static_cast<const DrillClientPrepareHandle&>(pstmt); + + exec::user::RunQuery query; + query.set_results_mode(exec::user::STREAM_FULL); + query.set_type(::exec::shared::PREPARED_STATEMENT); + query.set_allocated_prepared_statement_handle(new ::exec::user::PreparedStatementHandle(handle.m_preparedStatementHandle)); + + boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientQueryResult*>(), + boost::ref(*this), + _1, + boost::cref(handle.m_query), + l, + lCtx); + return sendMsg(factory, ::exec::user::RUN_QUERY, query); +} + +DrillClientCatalogResult* DrillClientImpl::getCatalogs(const std::string& catalogPattern, + Metadata::pfnCatalogMetadataListener listener, + void* listenerCtx) { + exec::user::GetCatalogsReq query; + exec::user::LikeFilter* catalogFilter(query.mutable_catalog_name_filter()); + catalogFilter->set_pattern(catalogPattern); + + boost::function<DrillClientCatalogResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientCatalogResult*>(), + boost::ref(*this), + _1, + listener, + listenerCtx); + return sendMsg(factory, ::exec::user::GET_CATALOGS, query); +} + +DrillClientSchemaResult* DrillClientImpl::getSchemas(const std::string& catalogPattern, + const std::string& schemaPattern, + Metadata::pfnSchemaMetadataListener listener, + void* listenerCtx) { + exec::user::GetSchemasReq query; + query.mutable_catalog_name_filter()->set_pattern(catalogPattern); + query.mutable_schema_name_filter()->set_pattern(schemaPattern); + + boost::function<DrillClientSchemaResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientSchemaResult*>(), + boost::ref(*this), + _1, + listener, + listenerCtx); + return sendMsg(factory, ::exec::user::GET_SCHEMAS, query); +} + +DrillClientTableResult* DrillClientImpl::getTables(const std::string& catalogPattern, + const std::string& schemaPattern, + const std::string& tablePattern, + const std::vector<std::string>* tableTypes, + Metadata::pfnTableMetadataListener listener, + void* listenerCtx) { + exec::user::GetTablesReq query; + query.mutable_catalog_name_filter()->set_pattern(catalogPattern); + query.mutable_schema_name_filter()->set_pattern(schemaPattern); + query.mutable_table_name_filter()->set_pattern(tablePattern); + if (tableTypes) { + std::copy(tableTypes->begin(), tableTypes->end(), + google::protobuf::RepeatedFieldBackInserter(query.mutable_table_type_filter())); + } + + boost::function<DrillClientTableResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientTableResult*>(), + boost::ref(*this), + _1, + listener, + listenerCtx); + return sendMsg(factory, ::exec::user::GET_TABLES, query); +} + +DrillClientColumnResult* DrillClientImpl::getColumns(const std::string& catalogPattern, + const std::string& schemaPattern, + const std::string& tablePattern, + const std::string& columnsPattern, + Metadata::pfnColumnMetadataListener listener, + void* listenerCtx) { + exec::user::GetColumnsReq query; + query.mutable_catalog_name_filter()->set_pattern(catalogPattern); + query.mutable_schema_name_filter()->set_pattern(schemaPattern); + query.mutable_table_name_filter()->set_pattern(tablePattern); + query.mutable_column_name_filter()->set_pattern(columnsPattern); + + boost::function<DrillClientColumnResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientColumnResult*>(), + boost::ref(*this), + _1, + listener, + listenerCtx); + return sendMsg(factory, ::exec::user::GET_COLUMNS, query); +} + +template<typename Handle> +Handle* DrillClientImpl::sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& message) { + int32_t coordId; + Handle* phandle=NULL; connectionStatus_t cStatus=CONN_SUCCESS; { boost::lock_guard<boost::mutex> prLock(this->m_prMutex); boost::lock_guard<boost::mutex> dcLock(this->m_dcMutex); coordId = this->getNextCoordinationId(); - OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query); + rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, type, coordId, &message); - // Create the result object and register the listener before we send the query - // because sometimes the caller is not checking the status of the submitQuery call. - // This way, the broadcast error call will cause the results listener to be called - // with a COMM_ERROR status. - pQuery = new DrillClientQueryResult(this, coordId, plan); - pQuery->registerListener(l, lCtx); - this->m_queryIds[coordId]=pQuery; + phandle = handleFactory(coordId); + this->m_queryHandles[coordId]=phandle; connectionStatus_t cStatus=sendSync(out_msg); if(cStatus == CONN_SUCCESS){ bool sendRequest=false; - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;) if(m_pendingRequests++==0){ sendRequest=true; }else{ - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) << " request to server" << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;) } if(sendRequest){ - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending " << ::exec::user::RpcType_Name(type) << " request. Number of pending requests = " << m_pendingRequests << std::endl;) getNextResult(); // async wait for results } @@ -508,21 +627,18 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t } if(cStatus!=CONN_SUCCESS){ - this->m_queryIds.erase(coordId); - delete pQuery; + this->m_queryHandles.erase(coordId); + delete phandle; return NULL; } - - //run this in a new thread startMessageListener(); - return pQuery; + return phandle; } void DrillClientImpl::getNextResult(){ - // This call is always made from within a function where the mutex has already been acquired //boost::lock_guard<boost::mutex> lock(this->m_dcMutex); @@ -533,7 +649,7 @@ void DrillClientImpl::getNextResult(){ AllocatedBuffer::s_memCV.wait(memLock); } } - + //use free, not delete to free ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN); if (DrillClientConfig::getQueryTimeout() > 0){ @@ -577,8 +693,7 @@ void DrillClientImpl::waitForResults(){ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, AllocatedBufferPtr* allocatedBuffer, - InBoundRpcMessage& msg, - boost::system::error_code& error){ + rpc::InBoundRpcMessage& msg){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer " << reinterpret_cast<int*>(_buf) << std::endl;) @@ -590,7 +705,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, // We need to protect the readLength and read buffer, and the pending requests counter, // but we don't have to keep the lock while we decode the rest of the buffer. boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen); + std::size_t bytes_read = rpc::lengthDecode(_buf, rmsgLen); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;) @@ -612,7 +727,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, << (rmsgLen - leftover) << std::endl;) ByteBuf_t b=currentBuffer->m_pBuffer + leftover; size_t bytesToRead=rmsgLen - leftover; - + boost::system::error_code error; while(1){ size_t dataBytesRead=this->m_socket.read_some( boost::asio::buffer(b, bytesToRead), @@ -623,10 +738,14 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, bytesToRead-=dataBytesRead; b+=dataBytesRead; } - + if(!error){ // read data successfully - DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg); + if (!decode(currentBuffer->m_pBuffer, rmsgLen, msg)) { + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); + return handleQryError(QRY_COMM_ERROR, + getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);; + } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;) }else{ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); @@ -645,7 +764,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, return QRY_SUCCESS; } -status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ +status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; exec::shared::QueryId qid; @@ -657,15 +776,15 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;) qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;) - + qid.CopyFrom(qr.query_id()); - + if (qr.has_query_state() && qr.query_state() != exec::shared::QueryResult_QueryState_RUNNING && qr.query_state() != exec::shared::QueryResult_QueryState_STARTING) { pDrillClientQueryResult=findQueryResult(qid); - //Queries that have been cancelled or whose resources are freed before completion - //do not have a DrillClientQueryResult object. We need not handle the terminal message + //Queries that have been cancelled or whose resources are freed before completion + //do not have a DrillClientQueryResult object. We need not handle the terminal message //in that case since all it does is to free resources (and they have already been freed) if(pDrillClientQueryResult!=NULL){ //Validate the RPC message @@ -703,10 +822,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer return ret; } -status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ +status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; - exec::shared::QueryId qid; + ::exec::shared::QueryId qid; // Be a good client and send ack as early as possible. // Drillbit pushed the query result to the client, the client should send ack // whenever it receives the message @@ -720,7 +839,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;) - qid.CopyFrom(qr->query_id()); + qid = ::exec::shared::QueryId(qr->query_id()); if(qid.part1()==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;) delete allocatedBuffer; @@ -729,13 +848,13 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, pDrillClientQueryResult=findQueryResult(qid); if(pDrillClientQueryResult==NULL){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" << debugPrintQid(qid) << ")." << std::endl;) delete qr; delete allocatedBuffer; return ret; } - + //Validate the RPC message std::string valErr; if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ @@ -765,20 +884,13 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, } pDrillClientQueryResult->setIsQueryPending(true); - pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener; if(pDrillClientQueryResult->m_bIsLastChunk){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << "Received last batch. " << std::endl;) ret=QRY_NO_MORE_DATA; } pDrillClientQueryResult->setQueryStatus(ret); - if(pResultsListener!=NULL){ - ret = pResultsListener(pDrillClientQueryResult, pRecordBatch, NULL); - }else{ - //Use a default callback that is called when a record batch is received - ret = pDrillClientQueryResult->defaultQueryResultsListener(pDrillClientQueryResult, - pRecordBatch, NULL); - } + ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL); } // release lock if(ret==QRY_FAILURE){ sendCancel(&qid); @@ -787,31 +899,37 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, pDrillClientQueryResult->setIsQueryPending(false); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;) pDrillClientQueryResult->setQueryStatus(ret); - clearMapEntries(pDrillClientQueryResult); + removeQueryHandle(pDrillClientQueryResult); + removeQueryResult(pDrillClientQueryResult); return ret; } return ret; } -status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ - DrillClientQueryResult* pDrillClientQueryResult=NULL; +status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); boost::lock_guard<boost::mutex> lock(m_dcMutex); - std::map<int,DrillClientQueryResult*>::iterator it; - for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){ - std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL"); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first + for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){ + DrillClientQueryResult* pDrillClientQueryResult=it->second; + std::string qidString = (pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL"); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId << " QueryId: "<< qidString << std::endl;) } if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) return QRY_SUCCESS; } - it=this->m_queryIds.find(msg.m_coord_id); - if(it!=this->m_queryIds.end()){ - pDrillClientQueryResult=(*it).second; + std::map<int, DrillClientQueryHandle*>::const_iterator it; + it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientQueryResult* pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second); + if (!pDrillClientQueryResult) { + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } exec::shared::QueryId *qid = new exec::shared::QueryId; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); @@ -820,14 +938,241 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB //save queryId allocated here so we can free it later pDrillClientQueryResult->setQueryId(qid); }else{ - delete allocatedBuffer; return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } - delete allocatedBuffer; return ret; } -DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){ +status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Prepared Statement with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStatement: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientPrepareHandle* pDrillClientPrepareHandle=static_cast<DrillClientPrepareHandle*>((*it).second); + exec::user::CreatePreparedStatementResp resp; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Prepared Statement Handle " << msg.m_pbody.size() << std::endl;) + if (!resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size())) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode prepared statement", pDrillClientPrepareHandle); + } + if (resp.has_status() && resp.status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp.error(), pDrillClientPrepareHandle); + } + pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement()); + pDrillClientPrepareHandle->notifyListener(pDrillClientPrepareHandle, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Prepared Statement handle - " << resp.prepared_statement().server_handle().DebugString() << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStament: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +status_t DrillClientImpl::processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetCatalogsResp with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientCatalogResult* pHandle=static_cast<DrillClientCatalogResult*>((*it).second); + exec::user::GetCatalogsResp* resp = new exec::user::GetCatalogsResp; + pHandle->attachMetadataResult(resp); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetCatalogs result Handle " << msg.m_pbody.size() << std::endl;) + if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode getcatalogs results", pHandle); + } + if (resp->status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp->error(), pHandle); + } + + const ::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>& catalogs = resp->catalogs(); + pHandle->m_meta.clear(); + pHandle->m_meta.reserve(resp->catalogs_size()); + + for(::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>::const_iterator it = catalogs.begin(); it != catalogs.end(); ++it) { + meta::DrillCatalogMetadata meta(*it); + pHandle->m_meta.push_back(meta); + } + pHandle->notifyListener(&pHandle->m_meta, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetCatalogs result - " << resp->catalogs_size() << " catalog(s)" << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +status_t DrillClientImpl::processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetSchemaResp with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientSchemaResult* pHandle=static_cast<DrillClientSchemaResult*>((*it).second); + exec::user::GetSchemasResp* resp = new exec::user::GetSchemasResp(); + pHandle->attachMetadataResult(resp); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetSchemasResp result Handle " << msg.m_pbody.size() << std::endl;) + if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode getschemas results", pHandle); + } + if (resp->status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp->error(), pHandle); + } + + const ::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>& schemas = resp->schemas(); + pHandle->m_meta.clear(); + pHandle->m_meta.reserve(resp->schemas_size()); + + for(::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>::const_iterator it = schemas.begin(); it != schemas.end(); ++it) { + meta::DrillSchemaMetadata meta(*it); + pHandle->m_meta.push_back(meta); + } + pHandle->notifyListener(&pHandle->m_meta, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetSchemaResp result - " << resp->schemas_size() << " schema(s)" << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +status_t DrillClientImpl::processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetTablesResp with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientTableResult* pHandle=static_cast<DrillClientTableResult*>((*it).second); + exec::user::GetTablesResp* resp = new exec::user::GetTablesResp(); + pHandle->attachMetadataResult(resp); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GeTablesResp result Handle " << msg.m_pbody.size() << std::endl;) + if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode gettables results", pHandle); + } + if (resp->status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp->error(), pHandle); + } + const ::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>& tables = resp->tables(); + pHandle->m_meta.clear(); + pHandle->m_meta.reserve(resp->tables_size()); + + for(::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>::const_iterator it = tables.begin(); it != tables.end(); ++it) { + meta::DrillTableMetadata meta(*it); + pHandle->m_meta.push_back(meta); + } + pHandle->notifyListener(&pHandle->m_meta, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetTables result - " << resp->tables_size() << " table(s)" << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +status_t DrillClientImpl::processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetColumnsResp with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientColumnResult* pHandle=static_cast<DrillClientColumnResult*>((*it).second); + exec::user::GetColumnsResp* resp = new exec::user::GetColumnsResp(); + pHandle->attachMetadataResult(resp); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetColumnsResp result Handle " << msg.m_pbody.size() << std::endl;) + if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode getcolumns results", pHandle); + } + if (resp->status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp->error(), pHandle); + } + const ::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>& columns = resp->columns(); + pHandle->m_meta.clear(); + pHandle->m_meta.reserve(resp->columns_size()); + + for(::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) { + meta::DrillColumnMetadata meta(*it); + pHandle->m_meta.push_back(meta); + } + pHandle->notifyListener(&pHandle->m_meta, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetColumnsResp result - " << resp->columns_size() << " columns(s)" << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +DrillClientQueryResult* DrillClientImpl::findQueryResult(const exec::shared::QueryId& qid){ DrillClientQueryResult* pDrillClientQueryResult=NULL; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;) std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it; @@ -838,7 +1183,7 @@ DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& << it->first->part2() << "]\n";) } } - it=this->m_queryResults.find(&qid); + it=this->m_queryResults.find(const_cast<exec::shared::QueryId * const>(&qid)); if(it!=this->m_queryResults.end()){ pDrillClientQueryResult=(*it).second; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << @@ -925,9 +1270,8 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){ } void DrillClientImpl::handleRead(ByteBuf_t _buf, - const boost::system::error_code& err, + const boost::system::error_code& error, size_t bytes_transferred) { - boost::system::error_code error=err; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " << reinterpret_cast<int*>(_buf) << std::endl;) if(DrillClientConfig::getQueryTimeout() > 0){ @@ -935,120 +1279,153 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";) m_deadlineTimer.cancel(); } - if(!error){ - InBoundRpcMessage msg; - boost::lock_guard<boost::mutex> lock(this->m_prMutex); + if (error) { + // boost error + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); + boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " + "Boost Communication Error: " << error.message() << std::endl;) + handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); + return; + } - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;) - AllocatedBufferPtr allocatedBuffer=NULL; + rpc::InBoundRpcMessage msg; + boost::lock_guard<boost::mutex> lockPR(this->m_prMutex); - if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){ - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;) + AllocatedBufferPtr allocatedBuffer=NULL; + + if(readMsg(_buf, &allocatedBuffer, msg)!=QRY_SUCCESS){ + delete allocatedBuffer; + if(m_pendingRequests!=0){ + boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + getNextResult(); } + return; + } - if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away - m_pendingRequests--; + if(msg.m_mode==exec::rpc::PONG) { //heartbeat response. Throw it away + m_pendingRequests--; + delete allocatedBuffer; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;) + if(m_pendingRequests!=0){ + boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + getNextResult(); + }else{ + boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;) + m_cv.notify_one(); + } + + return; + } + + if(msg.m_mode == exec::rpc::RESPONSE) { + status_t s; + switch(msg.m_rpc_type) { + case exec::user::QUERY_HANDLE: + s = processQueryId(allocatedBuffer, msg); + break; + + case exec::user::PREPARED_STATEMENT: + s = processPreparedStatement(allocatedBuffer, msg); + break; + + case exec::user::CATALOGS: + s = processCatalogsResult(allocatedBuffer, msg); + break; + + case exec::user::SCHEMAS: + s = processSchemasResult(allocatedBuffer, msg); + break; + + case exec::user::TABLES: + s = processTablesResult(allocatedBuffer, msg); + break; + + case exec::user::COLUMNS: + s = processColumnsResult(allocatedBuffer, msg); + break; + + case exec::user::HANDSHAKE: + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";) delete allocatedBuffer; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;) - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - }else{ - boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex); - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;) - m_cv.notify_one(); - } - return; - }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){ - status_t s = processQueryResult(allocatedBuffer, msg); - if(s !=QRY_SUCCESS && s!= QRY_NO_MORE_DATA){ - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; - } - }else if(!error && msg.m_rpc_type==exec::user::QUERY_DATA){ - if(processQueryData(allocatedBuffer, msg)!=QRY_SUCCESS){ - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; - } - }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){ - if(processQueryId(allocatedBuffer, msg)!=QRY_SUCCESS){ - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; - } - }else if(!error && msg.m_rpc_type==exec::user::ACK){ + break; + + case exec::user::ACK: // Cancel requests will result in an ACK sent back. // Consume silently + s = QRY_CANCELED; delete allocatedBuffer; - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; - }else{ + break; + + default: + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " + << "QueryResult returned " << msg.m_rpc_type << std::endl;) + delete allocatedBuffer; + handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); + } + + if (m_pendingRequests != 0) { boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - if(error){ - // We have a socket read error, but we do not know which query this is for. - // Signal ALL pending queries that they should stop waiting. - delete allocatedBuffer; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;) - handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); - return; - }else{ - // If not QUERY_RESULT, then we think something serious has gone wrong? - // In one case when the client hung, we observed that the server was sending a handshake request to the client - // We should properly handle these handshake requests/responses - if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){ - if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";) - exec::user::UserToBitHandshake u2b; - u2b.set_channel(exec::shared::USER); - u2b.set_rpc_version(DRILL_RPC_VERSION); - u2b.set_support_listening(true); - OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b); - sendSync(out_msg); - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";) - }else{ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";) - } - }else{ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " - << "QueryResult returned " << msg.m_rpc_type << std::endl;) - handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); - } - delete allocatedBuffer; - return; + getNextResult(); + } + + return; + } + + if (msg.has_mode() && msg.m_mode == exec::rpc::REQUEST) { + status_t s; + switch(msg.m_rpc_type) { + case exec::user::QUERY_RESULT: + s = processQueryResult(allocatedBuffer, msg); + break; + + case exec::user::QUERY_DATA: + s = processQueryData(allocatedBuffer, msg); + break; + + case exec::user::HANDSHAKE: + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";) + delete allocatedBuffer; + // In one case when the client hung, we observed that the server was sending a handshake request to the client + // We should properly handle these handshake requests/responses + { + boost::lock_guard<boost::mutex> lockDC(this->m_dcMutex); + exec::user::UserToBitHandshake u2b; + u2b.set_channel(exec::shared::USER); + u2b.set_rpc_version(DRILL_RPC_VERSION); + u2b.set_support_listening(true); + rpc::OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b); + sendSync(out_msg); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";) } + break; + + default: + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " + << "QueryResult returned " << msg.m_rpc_type << std::endl;) + delete allocatedBuffer; + handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); } - { + + if (m_pendingRequests != 0) { boost::lock_guard<boost::mutex> lock(this->m_dcMutex); getNextResult(); } - }else{ - // boost error - Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " - "Boost Communication Error: " << error.message() << std::endl;) - handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); + return; } - return; + + // If not QUERY_RESULT, then we think something serious has gone wrong? + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " + << "QueryResult returned " << msg.m_rpc_type << " for " << msg.m_mode << std::endl;) + handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); + delete allocatedBuffer; + } -status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valErr){ +status_t DrillClientImpl::validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valErr){ if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){ valErr=getMessage(ERR_QRY_RESPFAIL); return QRY_FAILURE; @@ -1060,7 +1437,7 @@ status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shar return QRY_SUCCESS; } -status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valErr){ +status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valErr){ if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){ valErr=getMessage(ERR_QRY_RESPFAIL); return QRY_FAILURE; @@ -1072,10 +1449,10 @@ status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::sh return QRY_SUCCESS; } -connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){ +connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, const std::string& msg){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg); m_pendingRequests=0; - if(!m_queryIds.empty()){ + if(!m_queryHandles.empty()){ // set query error only if queries are running broadcastError(pErr); }else{ @@ -1086,12 +1463,12 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s return status; } -status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){ +status_t DrillClientImpl::handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); // set query error only if queries are running - if(pQueryResult!=NULL){ + if(pQueryHandle!=NULL){ m_pendingRequests--; - pQueryResult->signalError(pErr); + pQueryHandle->signalError(pErr); }else{ m_pendingRequests=0; broadcastError(pErr); @@ -1101,27 +1478,27 @@ status_t DrillClientImpl::handleQryError(status_t status, std::string msg, Drill status_t DrillClientImpl::handleQryError(status_t status, const exec::shared::DrillPBError& e, - DrillClientQueryResult* pQueryResult){ - assert(pQueryResult!=NULL); + DrillClientQueryHandle* pQueryHandle){ + assert(pQueryHandle!=NULL); DrillClientError* pErr = DrillClientError::getErrorObject(e); - pQueryResult->signalError(pErr); + pQueryHandle->signalError(pErr); m_pendingRequests--; return status; } void DrillClientImpl::broadcastError(DrillClientError* pErr){ if(pErr!=NULL){ - std::map<int, DrillClientQueryResult*>::iterator iter; - if(!m_queryIds.empty()){ - for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) { + std::map<int, DrillClientQueryHandle*>::const_iterator iter; + if(!m_queryHandles.empty()){ + for(iter = m_queryHandles.begin(); iter != m_queryHandles.end(); iter++) { DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg); iter->second->signalError(err); } } delete pErr; } - // We have an error at the connection level. Cancel the heartbeat. - // And close the connection + // We have an error at the connection level. Cancel the heartbeat. + // And close the connection m_heartbeatTimer.cancel(); m_pendingRequests=0; m_cv.notify_one(); @@ -1132,7 +1509,7 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){ // The implementation is similar to handleQryError status_t DrillClientImpl::handleTerminatedQryState( status_t status, - std::string msg, + const std::string& msg, DrillClientQueryResult* pQueryResult){ assert(pQueryResult!=NULL); if(status==QRY_COMPLETED){ @@ -1145,21 +1522,22 @@ status_t DrillClientImpl::handleTerminatedQryState( return status; } - -void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){ - std::map<int, DrillClientQueryResult*>::iterator iter; +void DrillClientImpl::removeQueryHandle(DrillClientQueryHandle* pQueryHandle){ boost::lock_guard<boost::mutex> lock(m_dcMutex); - if(!m_queryIds.empty()){ - for(iter=m_queryIds.begin(); iter!=m_queryIds.end(); iter++) { - if(pQueryResult==(DrillClientQueryResult*)iter->second){ - m_queryIds.erase(iter->first); + if(!m_queryHandles.empty()){ + for(std::map<int, DrillClientQueryHandle*>::const_iterator iter=m_queryHandles.begin(); iter!=m_queryHandles.end(); iter++) { + if(pQueryHandle==(DrillClientQueryHandle*)iter->second){ + m_queryHandles.erase(iter->first); break; } } } +} + +void DrillClientImpl::removeQueryResult(DrillClientQueryResult* pQueryResult){ + boost::lock_guard<boost::mutex> lock(m_dcMutex); if(!m_queryResults.empty()){ - std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it; - for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) { + for(std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::const_iterator it=m_queryResults.begin(); it!=m_queryResults.end(); it++) { if(pQueryResult==(DrillClientQueryResult*)it->second){ m_queryResults.erase(it->first); break; @@ -1168,19 +1546,19 @@ void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){ } } -void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){ +void DrillClientImpl::sendAck(const rpc::InBoundRpcMessage& msg, bool isOk){ exec::rpc::Ack ack; ack.set_ok(isOk); - OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack); + rpc::OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack); boost::lock_guard<boost::mutex> lock(m_dcMutex); sendSync(ack_msg); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;) } -void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){ +void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){ boost::lock_guard<boost::mutex> lock(m_dcMutex); uint64_t coordId = this->getNextCoordinationId(); - OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId); + rpc::OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId); sendSync(cancel_msg); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;) } @@ -1193,6 +1571,14 @@ void DrillClientImpl::shutdownSocket(){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;) } +meta::DrillMetadata* DrillClientImpl::getMetadata() { + return new meta::DrillMetadata(*this); +} + +void DrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) { + delete metadata; +} + // This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this // class are used by the async callbacks. status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) { @@ -1254,7 +1640,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, //ctx; // unused, we already have the this pointer DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;) //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server. - if(this->m_bCancel){ + if(this->isCancelled()){ if(b!=NULL) delete b; return QRY_FAILURE; } @@ -1284,7 +1670,7 @@ RecordBatch* DrillClientQueryResult::peekNext(){ //if no more data, return NULL; if(!m_bIsQueryPending) return NULL; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) - while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) { + while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) { this->m_cv.wait(cvLock); } // READ but not remove first element from queue @@ -1305,7 +1691,7 @@ RecordBatch* DrillClientQueryResult::getNext() { } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) - while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){ + while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending){ this->m_cv.wait(cvLock); } // remove first element from queue @@ -1322,33 +1708,60 @@ void DrillClientQueryResult::waitForData() { boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending) return; - while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) { + while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) { this->m_cv.wait(cvLock); } } -void DrillClientQueryResult::cancel() { +template<typename Listener, typename Value> +status_t DrillClientBaseHandle<Listener, Value>::notifyListener(Value v, DrillClientError* pErr){ + return m_pApplicationListener(getApplicationContext(), v, pErr); +} + +void DrillClientQueryHandle::cancel() { this->m_bCancel=true; } -void DrillClientQueryResult::signalError(DrillClientError* pErr){ +void DrillClientQueryHandle::signalError(DrillClientError* pErr){ // Ignore return values from the listener. if(pErr!=NULL){ if(m_pError!=NULL){ delete m_pError; m_pError=NULL; } m_pError=pErr; - pfnQueryResultsListener pResultsListener=this->m_pResultsListener; - if(pResultsListener!=NULL){ - pResultsListener(this, NULL, pErr); - }else{ - defaultQueryResultsListener(this, NULL, pErr); - } + // TODO should it be protected by m_cvMutex? + m_bHasError=true; + } + return; +} + +template<typename Listener, typename Value> +void DrillClientBaseHandle<Listener, Value>::signalError(DrillClientError* pErr){ + DrillClientQueryHandle::signalError(pErr); + // Ignore return values from the listener. + if(pErr!=NULL){ + this->notifyListener(NULL, pErr); + } +} + +status_t DrillClientQueryResult::notifyListener(RecordBatch* batch, DrillClientError* pErr) { + pfnQueryResultsListener pResultsListener=getApplicationListener(); + if(pResultsListener!=NULL){ + return pResultsListener(this, batch, pErr); + }else{ + return defaultQueryResultsListener(this, batch, pErr); + } +} + +void DrillClientQueryResult::signalError(DrillClientError* pErr){ + DrillClientQueryHandle::signalError(pErr); + // Ignore return values from the listener. + if(pErr!=NULL){ + this->notifyListener(NULL, pErr); { boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex); m_bIsQueryPending=false; m_bHasData=false; - m_bHasError=true; } //Signal the cv in case there is a client waiting for data already. m_cv.notify_one(); @@ -1357,24 +1770,27 @@ void DrillClientQueryResult::signalError(DrillClientError* pErr){ } void DrillClientQueryResult::signalComplete(){ - pfnQueryResultsListener pResultsListener=this->m_pResultsListener; - if(pResultsListener!=NULL){ - pResultsListener(this, NULL, NULL); - }else{ - defaultQueryResultsListener(this, NULL, NULL); - } + this->notifyListener(NULL, NULL); { boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex); - m_bIsQueryPending=false; m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED); - m_bHasError=false; + resetError(); } //Signal the cv in case there is a client waiting for data already. m_cv.notify_one(); return; } +void DrillClientQueryHandle::clearAndDestroy(){ + //Tell the parent to remove this from its lists + m_client.removeQueryHandle(this); + + if(m_pError!=NULL){ + delete m_pError; m_pError=NULL; + } +} void DrillClientQueryResult::clearAndDestroy(){ + DrillClientQueryHandle::clearAndDestroy(); //free memory allocated for FieldMetadata objects saved in m_columnDefs; if(!m_columnDefs->empty()){ for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){ @@ -1385,15 +1801,16 @@ void DrillClientQueryResult::clearAndDestroy(){ if(this->m_pQueryId!=NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;) } + //Tell the parent to remove this from its lists - m_pClient->clearMapEntries(this); + this->client().removeQueryResult(this); //clear query id map entries. if(this->m_pQueryId!=NULL){ delete this->m_pQueryId; this->m_pQueryId=NULL; } if(!m_recordBatches.empty()){ - // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_ + // When multiple queries execute in parallel we sometimes get an empty record batch back from the server _after_ // the last chunk has been received. We eventually delete it. DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;) RecordBatch* pR=NULL; @@ -1403,11 +1820,32 @@ void DrillClientQueryResult::clearAndDestroy(){ delete pR; } } - if(m_pError!=NULL){ - delete m_pError; m_pError=NULL; +} + +status_t DrillClientPrepareHandle::setupPreparedStatement(const exec::user::PreparedStatement& pstmt) { + // Get columns schema information + const ::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>& columns = pstmt.columns(); + for(::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) { + FieldMetadata* metadata = new FieldMetadata; + metadata->set(*it); + m_columnDefs->push_back(metadata); } + + // Copy server handle + this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle()); + return QRY_SUCCESS; } +void DrillClientPrepareHandle::clearAndDestroy(){ + DrillClientQueryHandle::clearAndDestroy(); + //free memory allocated for FieldMetadata objects saved in m_columnDefs; + if(!m_columnDefs->empty()){ + for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){ + delete *it; + } + m_columnDefs->clear(); + } +} connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ connectionStatus_t stat = CONN_SUCCESS; @@ -1418,9 +1856,9 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); if(!strcmp(protocol.c_str(), "zk")){ // Get a list of drillbits - ZookeeperImpl zook; + ZookeeperClient zook(pathToDrill); std::vector<std::string> drillbits; - int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + int err = zook.getAllDrillbits(hostPortStr, drillbits); if(!err){ Utils::shuffle(drillbits); // The original shuffled order is maintained if we shuffle first and then add any missing elements @@ -1432,15 +1870,17 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ m_lastConnection++; nextIndex = (m_lastConnection)%(getDrillbitCount()); } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection" << "(" << (void*)this << ")" - << ": Current counter is: " + << ": Current counter is: " << m_lastConnection << std::endl;) - err=zook.getEndPoint(m_drillbits, nextIndex, e); + err=zook.getEndPoint(m_drillbits[nextIndex], e); if(!err){ host=boost::lexical_cast<std::string>(e.address()); port=boost::lexical_cast<std::string>(e.user_port()); } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << nextIndex << ">. Selected " << e.DebugString() << std::endl;) } if(err){ return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); @@ -1475,7 +1915,7 @@ connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* connectionStatus_t stat=CONN_FAILURE; // Keep a copy of the user properties if(props!=NULL){ - m_pUserProperties = new DrillUserProperties; + m_pUserProperties = boost::shared_ptr<DrillUserProperties>(new DrillUserProperties); for(size_t i=0; i<props->size(); i++){ m_pUserProperties->setProperty( props->keyAt(i), @@ -1486,10 +1926,10 @@ connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* DrillClientImpl* pDrillClientImpl = getOneConnection(); if(pDrillClientImpl != NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;) - stat=pDrillClientImpl->validateHandshake(m_pUserProperties); + stat = pDrillClientImpl->validateHandshake(m_pUserProperties.get()); } else{ - stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN)); + stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN)); } return stat; } @@ -1505,16 +1945,52 @@ DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::Query return pDrillClientQueryResult; } -void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){ - // Nothing to do. If this class ever keeps track of executing queries then it will need - // to implement this call to free any query specific resources the pool might have +DrillClientPrepareHandle* PooledDrillClientImpl::PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx){ + DrillClientPrepareHandle* pDrillClientPrepareHandle = NULL; + DrillClientImpl* pDrillClientImpl = NULL; + pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + pDrillClientPrepareHandle=pDrillClientImpl->PrepareQuery(plan,listener,listenerCtx); + m_queriesExecuted++; + } + return pDrillClientPrepareHandle; +} + +DrillClientQueryResult* PooledDrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx){ + DrillClientQueryResult* pDrillClientQueryResult = NULL; + DrillClientImpl* pDrillClientImpl = NULL; + pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + pDrillClientQueryResult=pDrillClientImpl->ExecuteQuery(pstmt, listener, listenerCtx); + m_queriesExecuted++; + } + return pDrillClientQueryResult; +} + +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryHandle* pQryHandle){ + // If this class ever keeps track of executing queries then it will need + // to implement this call to free any query specific resources the pool might have // allocated - return; + + pQryHandle->client().freeQueryResources(pQryHandle); +} + +meta::DrillMetadata* PooledDrillClientImpl::getMetadata() { + meta::DrillMetadata* metadata = NULL; + DrillClientImpl* pDrillClientImpl = getOneConnection(); + if (pDrillClientImpl != NULL) { + metadata = pDrillClientImpl->getMetadata(); + } + return metadata; +} + +void PooledDrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) { + metadata->client().freeMetadata(metadata); } bool PooledDrillClientImpl::Active(){ boost::lock_guard<boost::mutex> lock(m_poolMutex); - for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + for(std::vector<DrillClientImpl*>::const_iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ if((*it)->Active()){ return true; } @@ -1529,7 +2005,7 @@ void PooledDrillClientImpl::Close() { delete *it; } m_clientConnections.clear(); - if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;} + m_pUserProperties.reset(); if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} m_lastConnection=-1; m_queriesExecuted=0; @@ -1592,7 +2068,7 @@ DrillClientImpl* PooledDrillClientImpl::getOneConnection(){ if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){ boost::lock_guard<boost::mutex> lock(m_poolMutex); pDrillClientImpl=m_clientConnections.back(); - ret=pDrillClientImpl->validateHandshake(m_pUserProperties); + ret=pDrillClientImpl->validateHandshake(m_pUserProperties.get()); if(ret!=CONN_SUCCESS){ delete pDrillClientImpl; pDrillClientImpl=NULL; m_clientConnections.erase(m_clientConnections.end()); @@ -1602,251 +2078,14 @@ DrillClientImpl* PooledDrillClientImpl::getOneConnection(){ if(ret!=CONN_SUCCESS){ break; } - } // need a new connection + } // need a new connection }// while if(pDrillClientImpl==NULL){ connectionStatus_t status = CONN_NOTCONNECTED; - handleConnError(status, getMessage(status)); + handleConnError(status, getMessage(ERR_CONN_NOCONN)); } return pDrillClientImpl; } -char ZookeeperImpl::s_drillRoot[]="/drill/"; -char ZookeeperImpl::s_defaultCluster[]="drillbits1"; - -ZookeeperImpl::ZookeeperImpl(){ - m_pDrillbits=new String_vector; - m_bConnecting=true; - memset(&m_id, 0, sizeof(m_id)); -} - -ZookeeperImpl::~ZookeeperImpl(){ - delete m_pDrillbits; -} - -ZooLogLevel ZookeeperImpl::getZkLogLevel(){ - //typedef enum {ZOO_LOG_LEVEL_ERROR=1, - // ZOO_LOG_LEVEL_WARN=2, - // ZOO_LOG_LEVEL_INFO=3, - // ZOO_LOG_LEVEL_DEBUG=4 - //} ZooLogLevel; - switch(DrillClientConfig::getLogLevel()){ - case LOG_TRACE: - case LOG_DEBUG: - return ZOO_LOG_LEVEL_DEBUG; - case LOG_INFO: - return ZOO_LOG_LEVEL_INFO; - case LOG_WARNING: - return ZOO_LOG_LEVEL_WARN; - case LOG_ERROR: - case LOG_FATAL: - default: - return ZOO_LOG_LEVEL_ERROR; - } - return ZOO_LOG_LEVEL_ERROR; -} - -int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits){ - uint32_t waitTime=30000; // 10 seconds - zoo_set_debug_level(getZkLogLevel()); - zoo_deterministic_conn_order(1); // enable deterministic order - struct String_vector* pDrillbits=NULL; - m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0); - if(!m_zh) { - m_err = getMessage(ERR_CONN_ZKFAIL); - zookeeper_close(m_zh); - return -1; - }else{ - m_err=""; - //Wait for the completion handler to signal successful connection - boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex); - boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime); - while(this->m_bConnecting) { - if(!this->m_cv.timed_wait(bufferLock, timeout)){ - m_err = getMessage(ERR_CONN_ZKTIMOUT); - zookeeper_close(m_zh); - return -1; - } - } - } - if(m_state!=ZOO_CONNECTED_STATE){ - zookeeper_close(m_zh); - return -1; - } - int rc = ZOK; - if(pathToDrill==NULL || strlen(pathToDrill)==0){ - m_rootDir=s_drillRoot; - m_rootDir += s_defaultCluster; - }else{ - m_rootDir=pathToDrill; - } - - pDrillbits = new String_vector; - rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits); - if(rc!=ZOK){ - delete pDrillbits; - m_err=getMessage(ERR_CONN_ZKERR, rc); - zookeeper_close(m_zh); - return -1; - } - if(pDrillbits && pDrillbits->count > 0){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster (" - << connectStr << "/" << pathToDrill - << ")." <<std::endl;) - for(int i=0; i<pDrillbits->count; i++){ - drillbits.push_back(pDrillbits->data[i]); - } - for(int i=0; i<drillbits.size(); i++){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;) - } - } - delete pDrillbits; - return 0; -} - -int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){ - int rc = ZOK; - exec::DrillServiceInstance drillServiceInstance; - if( drillbits.size() >0){ - // pick the drillbit at 'index' - const char * bit=drillbits[index].c_str(); - std::string s; - s=m_rootDir + std::string("/") + bit; - int buffer_len=MAX_CONNECT_STR; - char buffer[MAX_CONNECT_STR+1]; - struct Stat stat; - buffer[MAX_CONNECT_STR]=0; - rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat); - if(rc!=ZOK){ - m_err=getMessage(ERR_CONN_ZKDBITERR, rc); - zookeeper_close(m_zh); - return -1; - } - exec::DrillServiceInstance drillServiceInstance; - drillServiceInstance.ParseFromArray(buffer, buffer_len); - endpoint=drillServiceInstance.endpoint(); - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << ">. Selected " << drillServiceInstance.DebugString() << std::endl;) - }else{ - - m_err=getMessage(ERR_CONN_ZKNODBIT); - zookeeper_close(m_zh); - return -1; - } - return 0; -} - -// Deprecated -int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){ - uint32_t waitTime=30000; // 10 seconds - zoo_set_debug_level(getZkLogLevel()); - zoo_deterministic_conn_order(1); // enable deterministic order - m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0); - if(!m_zh) { - m_err = getMessage(ERR_CONN_ZKFAIL); - return CONN_FAILURE; - }else{ - m_err=""; - //Wait for the completion handler to signal successful connection - boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex); - boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime); - while(this->m_bConnecting) { - if(!this->m_cv.timed_wait(bufferLock, timeout)){ - m_err = getMessage(ERR_CONN_ZKTIMOUT); - return CONN_FAILURE; - } - } - } - if(m_state!=ZOO_CONNECTED_STATE){ - return CONN_FAILURE; - } - int rc = ZOK; - char rootDir[MAX_CONNECT_STR+1]; - if(pathToDrill==NULL || strlen(pathToDrill)==0){ - strcpy(rootDir, (char*)s_drillRoot); - strcat(rootDir, s_defaultCluster); - }else{ - strncpy(rootDir, pathToDrill, MAX_CONNECT_STR); rootDir[MAX_CONNECT_STR]=0; - } - rc=zoo_get_children(m_zh, (char*)rootDir, 0, m_pDrillbits); - if(rc!=ZOK){ - m_err=getMessage(ERR_CONN_ZKERR, rc); - zookeeper_close(m_zh); - return -1; - } - - //Let's pick a random drillbit. - if(m_pDrillbits && m_pDrillbits->count >0){ - - std::vector<std::string> randomDrillbits; - for(int i=0; i<m_pDrillbits->count; i++){ - randomDrillbits.push_back(m_pDrillbits->data[i]); - } - //Use the same random shuffle as the Java client instead of picking a drillbit at random. - //Gives much better randomization when the size of the cluster is small. - std::random_shuffle(randomDrillbits.begin(), randomDrillbits.end()); - const char * bit=randomDrillbits[0].c_str(); - std::string s; - - s=rootDir + std::string("/") + bit; - int buffer_len=MAX_CONNECT_STR; - char buffer[MAX_CONNECT_STR+1]; - struct Stat stat; - buffer[MAX_CONNECT_STR]=0; - rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat); - if(rc!=ZOK){ - m_err=getMessage(ERR_CONN_ZKDBITERR, rc); - zookeeper_close(m_zh); - return -1; - } - m_drillServiceInstance.ParseFromArray(buffer, buffer_len); - }else{ - m_err=getMessage(ERR_CONN_ZKNODBIT); - zookeeper_close(m_zh); - return -1; - } - return 0; -} - -void ZookeeperImpl::close(){ - zookeeper_close(m_zh); -} - -void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *path, void* context) { - //From cli.c - - /* Be careful using zh here rather than zzh - as this may be mt code - * the client lib may call the watcher before zookeeper_init returns */ - - ZookeeperImpl* self=(ZookeeperImpl*)context; - self->m_state=state; - if (type == ZOO_SESSION_EVENT) { - if (state == ZOO_CONNECTED_STATE) { - } else if (state == ZOO_AUTH_FAILED_STATE) { - self->m_err= getMessage(ERR_CONN_ZKNOAUTH); - zookeeper_close(zzh); - self->m_zh=0; - } else if (state == ZOO_EXPIRED_SESSION_STATE) { - self->m_err= getMessage(ERR_CONN_ZKEXP); - zookeeper_close(zzh); - self->m_zh=0; - } - } - // signal the cond var - { - if (state == ZOO_CONNECTED_STATE){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;) - } - boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex); - self->m_bConnecting=false; - } - self->m_cv.notify_one(); -} - -void ZookeeperImpl:: debugPrint(){ - if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;) - } -} - } // namespace Drill |