/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "drill/common.hpp"
// NOTE(review): the header names of the following system/boost includes are missing in this
// copy of the file (and template arguments throughout are stripped); restore them from
// version control before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "drill/drillClient.hpp"
#include "drill/fieldmeta.hpp"
#include "drill/recordBatch.hpp"
#include "drillClientImpl.hpp"
#include "collectionsImpl.hpp"
#include "errmsgs.hpp"
#include "logger.hpp"
#include "metadata.hpp"
#include "rpcMessage.hpp"
#include "utils.hpp"
#include "GeneralRPC.pb.h"
#include "UserBitShared.pb.h"
#include "zookeeperClient.hpp"
#include "saslAuthenticatorImpl.hpp"

namespace Drill{

namespace { // anonymous namespace

// Maps the server-side QueryResult query states to the client-facing query status codes.
static std::map QUERYSTATE_TO_STATUS_MAP = boost::assign::map_list_of
    (exec::shared::QueryResult_QueryState_STARTING, QRY_PENDING)
    (exec::shared::QueryResult_QueryState_RUNNING, QRY_RUNNING)
    (exec::shared::QueryResult_QueryState_COMPLETED, QRY_COMPLETED)
    (exec::shared::QueryResult_QueryState_CANCELED, QRY_CANCELED)
    (exec::shared::QueryResult_QueryState_FAILED, QRY_FAILED)
    ;

// Formats a QueryId as "[part1:part2] " for use as a log-message prefix.
static std::string debugPrintQid(const exec::shared::QueryId& qid){
    return std::string("[")+boost::lexical_cast(qid.part1()) +std::string(":") + boost::lexical_cast(qid.part2())+std::string("] ");
}

// Conversion helper: casts a raw protobuf int32 into exec::user::RpcType.
// Used with std::transform in handleHandshake to fill m_supportedMethods.
struct ToRpcType: public
std::unary_function {
    exec::user::RpcType operator() (google::protobuf::int32 i) const {
        return static_cast(i);
    }
};
}

/*
 * Connects using a connection string.
 *
 * connStr is split by Utils::parseConnectStr into (pathToDrill, protocol, hostPortStr):
 *   - protocol "zk":    asks ZooKeeper for all registered drillbits, shuffles them and uses the
 *                       last one's address/user_port.
 *   - protocol "local": hostPortStr is parsed as "host:port".
 *   - anything else:    fails with CONN_INVALID_INPUT.
 * If props has no USERPROP_SERVICE_HOST entry it is set to the chosen host, then the call is
 * delegated to connect(host, port).
 *
 * If already connected: returns CONN_SUCCESS for the same connStr, CONN_ALREADYCONNECTED
 * (via handleConnError) for a different one.
 */
connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
    std::string pathToDrill, protocol, hostPortStr;
    std::string host;
    std::string port;

    if (this->m_bIsConnected) {
        if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected
            return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
        }
        return CONN_SUCCESS;
    }

    m_connectStr=connStr;
    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
    if(protocol == "zk"){
        ZookeeperClient zook(pathToDrill);
        std::vector drillbits;
        int err = zook.getAllDrillbits(hostPortStr, drillbits);
        if(!err){
            if (drillbits.empty()){
                return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT));
            }
            Utils::shuffle(drillbits);
            exec::DrillbitEndpoint endpoint;
            err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list
            if(!err){
                host=boost::lexical_cast(endpoint.address());
                port=boost::lexical_cast(endpoint.user_port());
            }
            // NOTE(review): this is logged even when getEndPoint failed (endpoint then left unset).
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1) << ">. Selected " << endpoint.DebugString() << std::endl;)
        }
        if(err){
            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
        }
        zook.close();
        m_bIsDirectConnection=true;
    }else if(protocol == "local"){
        boost::lock_guard lock(m_dcMutex);//strtok is not reentrant
        char tempStr[MAX_CONNECT_STR+1];
        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
        // NOTE(review): strtok returns NULL when no token remains (e.g. empty input, or no ':'
        // so the port token is missing); assigning NULL to std::string is undefined behavior.
        // Consider validating both tokens before assigning.
        host=strtok(tempStr, ":");
        port=strtok(NULL, "");
        m_bIsDirectConnection=false;
    }else{
        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
    }
    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
    // Default the SASL service host to the endpoint host unless the caller supplied one.
    // NOTE(review): props is dereferenced without a NULL check (validateHandshake does check
    // for NULL properties) -- confirm callers always pass a non-NULL object.
    std::string serviceHost;
    for (size_t i = 0; i < props->size(); i++) {
        if (props->keyAt(i) == USERPROP_SERVICE_HOST) {
            serviceHost = props->valueAt(i);
        }
    }
    if (serviceHost.empty()) {
        props->setProperty(USERPROP_SERVICE_HOST, host);
    }
    connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
    return ret;
}

/*
 * Resolves host/port (IPv4 only) and connects m_socket to the LAST endpoint returned by the
 * resolver. On success marks the client connected, enables TCP keep-alive and TCP_NODELAY, and
 * records "id: <native handle> address: host:port" in m_connectedHost.
 *
 * Errors are reported through handleConnError: CONN_FAILURE for connect failures and generic
 * exceptions, CONN_HOSTNAME_RESOLUTION_ERROR when the exception text is exactly "resolve".
 */
connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
    using boost::asio::ip::tcp;
    tcp::endpoint endpoint;
    try{
        tcp::resolver resolver(m_io_service);
        tcp::resolver::query query(tcp::v4(), host, port);
        tcp::resolver::iterator iter = resolver.resolve(query);
        tcp::resolver::iterator end;
        // Walks all resolved endpoints (logging each); only the last one is kept.
        while (iter != end){
            endpoint = *iter++;
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;)
        }
        boost::system::error_code ec;
        m_socket.connect(endpoint, ec);
        if(ec){
            return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str()));
        }

    }catch(const std::exception & e){
        // Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve
        if (!strcmp(e.what(), "resolve")) {
            return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what()));
        }
        return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
    }

    m_bIsConnected=true;
    // set socket keep alive
    boost::asio::socket_base::keep_alive keepAlive(true);
    m_socket.set_option(keepAlive);
    // set no_delay
    boost::asio::ip::tcp::no_delay noDelay(true);
    m_socket.set_option(noDelay);

    std::ostringstream connectedHost;
    connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
    m_connectedHost = connectedHost.str();
    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)

    return CONN_SUCCESS;
}

/*
 * Arms m_heartbeatTimer to fire handleHeartbeatTimeout after
 * DrillClientConfig::getHeartbeatFrequency() seconds, and ensures the listener thread is running.
 * Does nothing when the configured frequency is <= 0.
 */
void DrillClientImpl::startHeartbeatTimer(){
    if (DrillClientConfig::getHeartbeatFrequency() > 0) {
        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
                << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;)
        m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
        m_heartbeatTimer.async_wait(boost::bind(
                    &DrillClientImpl::handleHeartbeatTimeout,
                    this,
                    boost::asio::placeholders::error
                    ));
        startMessageListener(); // start this thread early so we don't have the timer blocked
    }
}

/*
 * Sends a PING message carrying an Ack(ok=true), under m_prMutex and m_dcMutex.
 * Returns CONN_SUCCESS if the send succeeded, CONN_DEAD otherwise.
 * If no request was pending before this one, starts an asynchronous read via getNextResult().
 */
connectionStatus_t DrillClientImpl::sendHeartbeat(){
    connectionStatus_t status=CONN_SUCCESS;
    exec::rpc::Ack ack;
    ack.set_ok(true);
    rpc::OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
    boost::lock_guard prLock(this->m_prMutex);
    boost::lock_guard lock(m_dcMutex);
    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
    status=sendSyncCommon(heartbeatMsg);
    status=status==CONN_SUCCESS?status:CONN_DEAD;
    //If the server sends responses to a heartbeat, we need to increment the pending requests counter.
    // NOTE(review): the counter is incremented even when the send failed (status == CONN_DEAD).
    if(m_pendingRequests++==0){
        getNextResult(); // async wait for results
    }
    return status;
}

/*
 * Completion handler for m_heartbeatTimer. Ignores cancellation (operation_aborted).
 * When the deadline has really passed it sends a heartbeat: on success the timer is re-armed,
 * otherwise the socket is shut down and a CONN_FAILURE ("Connection to drillbit lost.") is
 * reported through handleConnError.
 */
void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
    if(err != boost::asio::error::operation_aborted){
        // Check whether the deadline has passed.
        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
            << to_simple_string(m_heartbeatTimer.expires_at())
            << " and time now is: "
            << to_simple_string(boost::asio::deadline_timer::traits_type::now())
            << std::endl;)
            ;
        if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
            // The deadline has passed.
            m_heartbeatTimer.expires_at(boost::posix_time::pos_infin);
            if(sendHeartbeat()==CONN_SUCCESS){
                startHeartbeatTimer();
            }else{
                // Close connection.
                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";)
                shutdownSocket();
                //broadcast to any executing queries
                handleConnError(CONN_FAILURE, getMessage(ERR_QRY_COMMERR, "Connection to drillbit lost."));
            }
        }
    }
    return;
}

// Closes the connection by shutting down the socket.
void DrillClientImpl::Close() {
    shutdownSocket();
}

/*
 * Writes bytesToWrite bytes starting at dataPtr to the socket. An EINTR error
 * (boost::asio::error::interrupted) from write_some is retried; any other error stops the loop.
 *
 * Parameters:
 *     dataPtr      - in param  - Pointer to the data bytes to write to the socket.
 *     bytesToWrite - in param  - Number of bytes to write from dataPtr.
 *     errorCode    - out param - Error code set by boost (from the last write_some call).
 */
void DrillClientImpl::doWriteToSocket(const char* dataPtr, size_t bytesToWrite,
                                        boost::system::error_code& errorCode) {
    if(0 == bytesToWrite) {
        return;
    }

    // Write all the bytes to socket. If not all bytes can be written, errorCode is set
    // to the failing call's error.
    while(1) {
        size_t bytesWritten = m_socket.write_some(boost::asio::buffer(dataPtr, bytesToWrite), errorCode);

        if(errorCode && boost::asio::error::interrupted != errorCode){
            break;
        }

        // Update the state
        bytesToWrite -= bytesWritten;
        dataPtr += bytesWritten;

        // Check if all the data is written then break from loop
        if(0 == bytesToWrite) break;
    }
}

/*
 * Common wrapper for sending either a plain or an encrypted message. Encodes msg into the send
 * buffer m_wbuf and then invokes the current send handler (m_fpCurrentSendHandler), which points
 * at either sendSyncPlain or sendSyncEncrypted.
 *
 * Return:
 *  connectionStatus_t  -   CONN_SUCCESS - In case of successful send
 *                      -   CONN_FAILURE - In case of failure to send
 */
connectionStatus_t DrillClientImpl::sendSyncCommon(rpc::OutBoundRpcMessage& msg) {
    encode(m_wbuf, msg);
    return (this->*m_fpCurrentSendHandler)();
}

/*
 * Send handler for plain (unencrypted) messages: writes the whole of m_wbuf to the socket.
 *
 * Return:
 *  connectionStatus_t  -   CONN_SUCCESS - In case of successful send
 *                      -   CONN_FAILURE - In case of failure to send (reported via handleConnError)
 */
connectionStatus_t DrillClientImpl::sendSyncPlain(){

    boost::system::error_code ec;
    doWriteToSocket(reinterpret_cast(m_wbuf.data()), m_wbuf.size(), ec);

    if(!ec) {
        return CONN_SUCCESS;
    } else {
        return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, ec.message().c_str()));
    }
}

/*
 * Send handler for encrypted messages. Encrypts m_wbuf with the wrap API of
 * m_saslAuthenticator and writes the encrypted bytes to the socket.
 *
 * Return:
 *  connectionStatus_t  -   CONN_SUCCESS - In case of successful send
 *                      -   CONN_FAILURE - In case of failure to wrap or send (reported via handleConnError)
 */
connectionStatus_t DrillClientImpl::sendSyncEncrypted() {

    boost::system::error_code ec;

    // The encoded message is encrypted in chunks of size <= WrapSizeLimit. Each encrypted chunk, along with
    // its encrypted length in network order (added by the Cyrus-SASL plugin), is sent over the wire.
    // NOTE(review): assumes getWrapSizeLimit() > 0; with a non-positive limit currentChunkLen
    // would never be positive and the loop below would make no progress.
    const int wrapChunkSize = m_encryptionCtxt.getWrapSizeLimit();
    int lengthToEncrypt = m_wbuf.size();

    int currentChunkLen = std::min(wrapChunkSize, lengthToEncrypt);
    uint32_t currentChunkOffset = 0;
    std::stringstream errorMsg;

    // Encrypt and send each chunk
    while(lengthToEncrypt != 0) {
        const char* wrappedChunk = NULL;
        uint32_t wrappedLen = 0;
        const int wrapResult = m_saslAuthenticator->wrap(reinterpret_cast(m_wbuf.data() + currentChunkOffset),
                                                         currentChunkLen, &wrappedChunk, wrappedLen);
        if(SASL_OK != wrapResult) {
            errorMsg << "Sasl wrap failed while encrypting chunk of length: " << currentChunkLen << " , EncodeError: "
                     << wrapResult;
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::sendSyncEncrypted - " << errorMsg.str()
                                              << " ,ChunkOffset: " << currentChunkOffset << ", Message Len: " << m_wbuf.size()
                                              << ", Closing connection.";)
            return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, errorMsg.str().c_str()));
        }

        // Send the encrypted chunk.
        doWriteToSocket(wrappedChunk, wrappedLen, ec);

        if(ec) {
            errorMsg << "Failure while sending encrypted chunk. Error: " << ec.message().c_str();
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::sendSyncEncrypted - " << errorMsg.str()
                                              << ", Chunk Length: " << currentChunkLen << ", ChunkOffset:" << currentChunkOffset
                                              << ", Message Len: " << m_wbuf.size() << ", Closing connection.";)
            return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, errorMsg.str().c_str()));
        }

        // Update variables after sending each encrypted chunk
        lengthToEncrypt -= currentChunkLen;
        currentChunkOffset += currentChunkLen;
        currentChunkLen = std::min(wrapChunkSize, lengthToEncrypt);
    }

    return CONN_SUCCESS;
}

/*
 * Waits synchronously for the server's handshake response.
 * Allocates m_rbuf if needed, optionally arms the handshake deadline timer, and starts an async
 * read of the length prefix handled by handleHandshake. It then runs m_io_service until the
 * handlers complete and frees m_rbuf.
 * Returns m_pError->status if an error was recorded; otherwise starts the heartbeat timer and
 * returns CONN_SUCCESS.
 */
connectionStatus_t DrillClientImpl::recvHandshake(){
    if(m_rbuf==NULL){
        m_rbuf = Utils::allocateBuffer(MAX_SOCK_RD_BUFSIZE);
    }

    m_io_service.reset();
    if (DrillClientConfig::getHandshakeTimeout() > 0){
        m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHandshakeTimeout()));
        m_deadlineTimer.async_wait(boost::bind(
                    &DrillClientImpl::handleHShakeReadTimeout,
                    this,
                    boost::asio::placeholders::error
                    ));
        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
                << DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;)
    }

    async_read(
            this->m_socket,
            boost::asio::buffer(m_rbuf, LEN_PREFIX_BUFLEN),
            boost::bind(
                &DrillClientImpl::handleHandshake,
                this,
                m_rbuf,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred)
            );
    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";)
    m_io_service.run();
    if(m_rbuf!=NULL){
        Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
    }

    if (m_pError != NULL) {
        DRILL_MT_LOG(DRILL_LOG(LOG_ERROR) << "DrillClientImpl::recvHandshake: failed to complete handshake with server."
                                      << m_pError->msg << "\n";)
        return static_cast(m_pError->status);
    }

    startHeartbeatTimer();

    return CONN_SUCCESS;
}

/*
 * Reads bytesToRead bytes from the socket into inBuf. An EINTR error
 * (boost::asio::error::interrupted) from read_some is retried; any other error stops the loop.
 *
 * Parameters:
 *     inBuf        - out param - Buffer that receives the data read from the socket.
 *     bytesToRead  - in param  - Number of bytes to read from the socket.
 *     errorCode    - out param - Error code set by boost (from the last read_some call).
 */
void DrillClientImpl::doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead,
                                       boost::system::error_code& errorCode) {

    // Check if bytesToRead is zero
    if(0 == bytesToRead) {
        return;
    }

    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket read: reading " << bytesToRead << "data bytes" << std::endl;)
    // Read all the bytes. If not all bytes can be read, errorCode is set to the
    // failing call's error.
    while(1){
        size_t dataBytesRead = m_socket.read_some(boost::asio::buffer(inBuf, bytesToRead),
                                                  errorCode);
        // Check if errorCode is EINTR then just retry otherwise break from loop
        if(errorCode && boost::asio::error::interrupted != errorCode){
            break;
        }

        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket read: actual bytes read = " << dataBytesRead << std::endl;)
        // Update the state
        bytesToRead -= dataBytesRead;
        inBuf += dataBytesRead;

        // Check if all the data is read then break from loop
        if(0 == bytesToRead) break;
    }
}

/*
 * Completion handler for the handshake length-prefix read started in recvHandshake.
 * Cancels the handshake deadline timer. It then decodes the length prefix from m_rbuf, reads the
 * rest of the message synchronously and decodes it as an RPC message. Finally it parses the
 * BitToUserHandshake body into the m_handshake* members, m_serverInfos, m_supportedMethods,
 * m_serverAuthMechanisms and the encryption context.
 * All failures are reported through handleConnError.
 *
 * NOTE(review): inBuf and bytes_transferred are not used; the body reads m_rbuf directly
 * (recvHandshake binds m_rbuf as inBuf, so they refer to the same buffer).
 */
void DrillClientImpl::handleHandshake(ByteBuf_t inBuf,
        const boost::system::error_code& err,
        size_t bytes_transferred) {
    boost::system::error_code error=err;
    // cancel the timer
    m_deadlineTimer.cancel();
    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;)
    if(!error){
        rpc::InBoundRpcMessage msg;
        uint32_t length = 0;
        std::size_t bytes_read = rpcLengthDecode(m_rbuf, length);
        if(length>0){
            // Part of the message body may already be in the LEN_PREFIX_BUFLEN bytes read so far;
            // read only the remainder, appending after those bytes.
            const size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
            const ByteBuf_t b = m_rbuf + LEN_PREFIX_BUFLEN;
            // NOTE(review): if the advertised length is smaller than leftover this unsigned
            // subtraction wraps around to a huge read size -- consider validating length first.
            const size_t bytesToRead=length - leftover;
            doReadFromSocket(b, bytesToRead, error);

            // Check if any error happened while reading the message bytes. If yes then return before decoding the Msg
            if(error) {
                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. "
                                                  << " Failed to read entire handshake message. with error: "
                                                  << error.message().c_str() << "\n";)
                handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Failed to read entire handshake message"));
                return;
            }

            // Decode the bytes into a valid RPC Message
            if (!decode(m_rbuf+bytes_read, length, msg)) {
                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. Cannot decode handshake.\n";)
                handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Cannot decode handshake"));
                return;
            }
        }else{
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";)
            handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
            return;
        }
        exec::user::BitToUserHandshake b2u;
        // NOTE(review): the return value of ParseFromArray is not checked.
        b2u.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
        this->m_handshakeVersion=b2u.rpc_version();
        this->m_handshakeStatus=b2u.status();
        this->m_handshakeErrorId=b2u.errorid();
        this->m_handshakeErrorMsg=b2u.errormessage();
        this->m_serverInfos = b2u.server_infos();
        std::transform(b2u.supported_methods().begin(), b2u.supported_methods().end(),
                std::back_inserter(this->m_supportedMethods),
                ToRpcType());
        // NOTE(review): the loop header and body below are garbled in this copy (text between
        // "i=0; i" and "m_serverAuthMechanisms" is missing); it is meant to collect the server's
        // authentication mechanisms into m_serverAuthMechanisms -- restore from version control.
        for (int i=0; im_serverAuthMechanisms.push_back(mechanism);
        }

        // Update the encryption context based on the server response
        this->m_encryptionCtxt.setEncryptionReqd(b2u.has_encrypted() && b2u.encrypted());
        if(b2u.has_maxwrappedsize()) {
            this->m_encryptionCtxt.setMaxWrappedSize(b2u.maxwrappedsize());
        }

    }else{
        // boost error
        if(error==boost::asio::error::eof){ // Server broke off the connection
            handleConnError(CONN_HANDSHAKE_FAILED,
                getMessage(ERR_CONN_NOHSHAKE, DRILL_RPC_VERSION));
        }else{
            handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, error.message().c_str()));
        }
        return;
    }
    return;
}

/*
 * Completion handler for the handshake deadline timer. If the timer really expired (not
 * cancelled), reports CONN_HANDSHAKE_TIMEOUT and stops m_io_service, which lets
 * recvHandshake's run() return. It also shuts down both directions of the socket, ignoring
 * any shutdown error.
 */
void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & err){
    // If err == boost::asio::error::operation_aborted then the caller cancelled the timer.
    if(err != boost::asio::error::operation_aborted){
        // Check whether the deadline has passed.
        if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
            // The deadline has passed.
            m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: "
                << "Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
            handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
            m_io_service.stop();
            boost::system::error_code ignorederr;
            m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
        }
    }
    return;
}

/*
 * Checks whether the client has explicitly asked for encrypted connections only, i.e. whether
 * the USERPROP_SASL_ENCRYPT connection property is set to "true" (case-insensitive).
 * Returns false when userProperties is NULL or the property is absent or has any other value.
 */
bool DrillClientImpl::clientNeedsEncryption(const DrillUserProperties* userProperties) {
    bool needsEncryption = false;
    // check if userProperties is null
    if(!userProperties) {
        return needsEncryption;
    }

    std::string val;
    needsEncryption = userProperties->isPropSet(USERPROP_SASL_ENCRYPT) &&
                      boost::iequals(userProperties->getProp(USERPROP_SASL_ENCRYPT, val), "true") ;
    return needsEncryption;
}

/*
 * Performs the client side of the user handshake.
 * Builds a UserToBitHandshake (channel, RPC version, listening/timeout/SASL support, client
 * version info and the connection properties), sends it and waits for the reply via
 * recvHandshake. The server's handshake status is then mapped to a connectionStatus_t;
 * AUTH_REQUIRED continues with handleAuthentication(properties).
 *
 * Properties known to DrillUserProperties::USER_PROPERTIES are forwarded only when flagged
 * USERPROP_FLAGS_SERVERPROP. Unknown properties are always forwarded. Values of properties
 * flagged USERPROP_FLAGS_PASSWORD are masked in the log.
 */
connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){

    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";)

    exec::user::UserToBitHandshake u2b;
    u2b.set_channel(exec::shared::USER);
    u2b.set_rpc_version(DRILL_RPC_VERSION);
    u2b.set_support_listening(true);
    u2b.set_support_timeout(DrillClientConfig::getHeartbeatFrequency() > 0);
    u2b.set_sasl_support(exec::user::SASL_PRIVACY);

    // Adding version info
    exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos();
    infos->set_name(DrillClientConfig::getClientName());
    infos->set_application(DrillClientConfig::getApplicationName());
    infos->set_version(DRILL_VERSION_STRING);
    infos->set_majorversion(DRILL_VERSION_MAJOR);
    infos->set_minorversion(DRILL_VERSION_MINOR);
    infos->set_patchversion(DRILL_VERSION_PATCH);

    if(properties != NULL && properties->size()>0){
        std::string username;
        std::string err;
        // Validation failures are only logged; the handshake proceeds regardless.
        if(!properties->validate(err)){
            DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;)
        }
        exec::user::UserProperties* userProperties = u2b.mutable_properties();

        // NOTE(review): this iterator is never used; it is shadowed by the loop-local `it` below.
        std::map::iterator it;
        for (std::map::const_iterator propIter=properties->begin(); propIter!=properties->end(); ++propIter){
            std::string currKey=propIter->first;
            std::string currVal=propIter->second;
            std::map::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(currKey);
            if(it==DrillUserProperties::USER_PROPERTIES.end()){
                DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connection property ("<< currKey
                    << ") is unknown" << std::endl;)

                exec::user::Property* connProp = userProperties->add_properties();
                connProp->set_key(currKey);
                connProp->set_value(currVal);

                continue;
            }
            if(IS_BITSET((*it).second,USERPROP_FLAGS_SERVERPROP)){
                exec::user::Property* connProp = userProperties->add_properties();
                connProp->set_key(currKey);
                connProp->set_value(currVal);
                //Username(but not the password) also needs to be set in UserCredentials
                if(IS_BITSET((*it).second,USERPROP_FLAGS_USERNAME)){
                    exec::shared::UserCredentials* creds = u2b.mutable_credentials();
                    username=currVal;
                    creds->set_user_name(username);
                    //u2b.set_credentials(&creds);
                }
                if(IS_BITSET((*it).second,USERPROP_FLAGS_PASSWORD)){
                    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << currKey << ": ********** " << std::endl;)
                }else{
                    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << currKey << ":" << currVal << std::endl;)
                }
            }// Server properties
        }
    }

    {
        boost::lock_guard lock(this->m_dcMutex);
        uint64_t coordId = this->getNextCoordinationId();

        rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
        // NOTE(review): the send status is not checked here.
        sendSyncCommon(out_msg);
        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
    }

    connectionStatus_t ret = recvHandshake();
    if(ret!=CONN_SUCCESS){
        return ret;
    }

    switch(this->m_handshakeStatus) {
        case exec::user::SUCCESS:
            // reset io_service after handshake is validated before running queries
            m_io_service.reset();
            return CONN_SUCCESS;
        case exec::user::RPC_VERSION_MISMATCH:
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected "
                << DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;)
            return handleConnError(CONN_BAD_RPC_VER, getMessage(ERR_CONN_BAD_RPC_VER, DRILL_RPC_VERSION,
                        m_handshakeVersion,
                        this->m_handshakeErrorId.c_str(),
                        this->m_handshakeErrorMsg.c_str()));
        case exec::user::AUTH_FAILED:
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;)
            return handleConnError(CONN_AUTH_FAILED, getMessage(ERR_CONN_AUTHFAIL,
                        this->m_handshakeErrorId.c_str(),
                        this->m_handshakeErrorMsg.c_str()));
        case exec::user::UNKNOWN_FAILURE:
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;)
            return handleConnError(CONN_HANDSHAKE_FAILED, getMessage(ERR_CONN_UNKNOWN_ERR,
                        this->m_handshakeErrorId.c_str(),
                        this->m_handshakeErrorMsg.c_str()));
        case exec::user::AUTH_REQUIRED:
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Server requires SASL authentication." << std::endl;)
            return handleAuthentication(properties);
        default:
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown return status." << std::endl;)
            return handleConnError(CONN_HANDSHAKE_FAILED, getMessage(ERR_CONN_UNKNOWN_ERR,
                        this->m_handshakeErrorId.c_str(),
                        this->m_handshakeErrorMsg.c_str()));
    }
}

/*
 * Runs the SASL authentication exchange after the server answered AUTH_REQUIRED.
 *
 * Fails with CONN_AUTH_FAILED if the client requires encryption but the server does not offer
 * it, or if the SaslAuthenticatorImpl constructor throws std::runtime_error. Otherwise it starts
 * the listener thread and the exchange, then blocks on m_saslCv until m_saslDone is set (see
 * finishAuthentication).
 * On SASL_OK, when encryption is required, it verifies the negotiated properties and switches
 * the send/read handlers to the encrypting/decrypting variants.
 *
 * NOTE(review): m_saslAuthenticator is allocated with `new` here; where it is released is not
 * visible in this chunk -- confirm there is no leak on repeated connects.
 */
connectionStatus_t DrillClientImpl::handleAuthentication(const DrillUserProperties *userProperties) {

    // Check if client needs encryption and server is configured for encryption or not before starting handshake
    if(clientNeedsEncryption(userProperties) && !m_encryptionCtxt.isEncryptionReqd()) {
        return handleConnError(CONN_AUTH_FAILED, "Client needs encryption but on server side encryption is disabled."
                                                 " Please check connection parameters or contact administrator?");
    }

    try {
        m_saslAuthenticator = new SaslAuthenticatorImpl(userProperties);
    } catch (std::runtime_error& e) {
        return handleConnError(CONN_AUTH_FAILED, e.what());
    }

    startMessageListener();
    initiateAuthentication();

    { // block until SASL exchange is complete
        boost::mutex::scoped_lock lock(m_saslMutex);
        while (!m_saslDone) {
            m_saslCv.wait(lock);
        }
    }

    // Pre-filled with the failure prefix; reset below on the success path.
    std::stringstream logMsg;
    logMsg << "DrillClientImpl::handleAuthentication: Authentication failed. [Details: ";

    if (SASL_OK == m_saslResultCode) {
        // Check the negotiated SSF value and change the handlers.
        if(m_encryptionCtxt.isEncryptionReqd()) {
            if(SASL_OK != m_saslAuthenticator->verifyAndUpdateSaslProps()) {
                logMsg << m_encryptionCtxt << "]. Negotiated Parameter is invalid."
                       << " Error: " << m_saslResultCode;
                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
                return handleConnError(CONN_AUTH_FAILED, logMsg.str().c_str());
            }

            // Successfully negotiated for encryption related security parameters.
            // Start using Encrypt and Decrypt handlers.
            m_fpCurrentSendHandler = &DrillClientImpl::sendSyncEncrypted;
            m_fpCurrentReadMsgHandler = &DrillClientImpl::readAndDecryptMsg;
        }

        // Reset the errorMsg stream since this is success case.
        logMsg.str(std::string());
        logMsg << "DrillClientImpl::handleAuthentication: Successfully authenticated! [Details: "
               << m_encryptionCtxt << " ]";

        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
        m_io_service.reset();
        return CONN_SUCCESS;
    } else {
        logMsg << m_encryptionCtxt << ", Error: " << m_saslResultCode;
        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)

        // shuts down socket as well
        logMsg << "]. Check connection parameters?";
        return handleConnError(CONN_AUTH_FAILED, logMsg.str().c_str());
    }
}

/*
 * Starts the SASL exchange. It initializes m_saslAuthenticator with the mechanisms advertised by
 * the server and stores the result in m_saslResultCode. On SASL_CONTINUE/SASL_OK it sends the
 * initial response (under m_prMutex); any other result ends authentication immediately.
 */
void DrillClientImpl::initiateAuthentication() {
    exec::shared::SaslMessage response;
    m_saslResultCode = m_saslAuthenticator->init(m_serverAuthMechanisms, response, &m_encryptionCtxt);

    switch (m_saslResultCode) {
        case SASL_CONTINUE:
        case SASL_OK: {
            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::initiateAuthentication: initiated. " << std::endl;)
            boost::lock_guard prLock(m_prMutex);
            sendSaslResponse(response); // the challenge returned by server is handled by processSaslChallenge
            break;
        }
        case SASL_NOMECH:
            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::initiateAuthentication: "
                                              << "Mechanism is not supported (by server/client)." << std::endl;)
            // No break: falls through to `default`, so the generic failure is also logged and
            // authentication is finished.
        default:
            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::initiateAuthentication: "
                                              << "Failed to initiate authentication." << std::endl;)
            finishAuthentication();
            break;
    }
}

/*
 * Sends a SASL_MESSAGE request carrying `response` under m_dcMutex. If no request was pending
 * before it, starts an asynchronous read via getNextResult().
 * NOTE(review): the status returned by sendSyncCommon is not checked.
 */
void DrillClientImpl::sendSaslResponse(const exec::shared::SaslMessage& response) {
    boost::lock_guard lock(m_dcMutex);
    const int32_t coordId = getNextCoordinationId();
    rpc::OutBoundRpcMessage msg(exec::rpc::REQUEST, exec::user::SASL_MESSAGE, coordId, &response);
    sendSyncCommon(msg);
    if (m_pendingRequests++ == 0) {
        getNextResult();
    }
}

/*
 * Handles a SASL challenge received from the server.
 * The buffer is owned by a shared_ptr guard for the duration of the call. Behavior by status:
 *   - SASL_IN_PROGRESS: steps the authenticator and replies, or finishes on failure.
 *   - SASL_SUCCESS:     performs one more client step if the last result was SASL_CONTINUE,
 *                       then finishes.
 *   - parse failure or any other status: sets SASL_FAIL and finishes.
 * m_pendingRequests is decremented on every path.
 */
void DrillClientImpl::processSaslChallenge(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg) {
    boost::shared_ptr deallocationGuard(allocatedBuffer);
    assert(m_saslAuthenticator != NULL);

    // parse challenge
    exec::shared::SaslMessage challenge;
    const bool parseStatus = challenge.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
    if (!parseStatus) {
        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Failed to parse challenge." << std::endl;)
        m_saslResultCode = SASL_FAIL;
        finishAuthentication();
        m_pendingRequests--;
        return;
    }

    // respond accordingly
    exec::shared::SaslMessage response;
    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSaslChallenge: status: "
                                      << exec::shared::SaslStatus_Name(challenge.status()) << std::endl;)
    switch (challenge.status()) {
        case exec::shared::SASL_IN_PROGRESS:
            m_saslResultCode = m_saslAuthenticator->step(challenge, response);
            if (m_saslResultCode == SASL_CONTINUE || m_saslResultCode == SASL_OK) {
                sendSaslResponse(response);
            } else { // failure
                finishAuthentication();
            }
            break;
        case exec::shared::SASL_SUCCESS:
            if (SASL_CONTINUE == m_saslResultCode) { // client may need to evaluate once more
                m_saslResultCode = m_saslAuthenticator->step(challenge, response);
                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "SASL succeeded on client? " << m_saslResultCode << std::endl;)
            }
            finishAuthentication();
            break;
        default:
            m_saslResultCode = SASL_FAIL;
            finishAuthentication();
            break;
    }
    m_pendingRequests--;
}

// Marks the SASL exchange as done and wakes the thread blocked in handleAuthentication.
void DrillClientImpl::finishAuthentication() {
    boost::mutex::scoped_lock lock(m_saslMutex);
    m_saslDone = true;
    m_saslCv.notify_one();
}

// Shared, initially empty, column-definition list.
FieldDefPtr DrillClientQueryResult::s_emptyColDefs( new (std::vector));

/*
 * Starts the listener thread if it is not already running. It installs an io_service::work
 * object so that run() does not return when there is no pending work, and then starts a
 * boost::thread that executes m_io_service.run().
 */
void DrillClientImpl::startMessageListener() {
    if(this->m_pListenerThread==NULL){
        // Stopping the io_service from running out-of-work
        if(m_io_service.stopped()){
            // NOTE(review): the next line is garbled in this copy -- the end of the log statement,
            // the io_service restart and the closing brace of this `if` are missing before
            // "m_pWork". Restore from version control.
            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <m_pWork = new boost::asio::io_service::work(m_io_service);
        this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
            &this->m_io_service));
        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
            << this->m_pListenerThread << std::endl;)
    }
}

/*
 * Submits a query: builds a RunQuery (results mode STREAM_FULL, type t, plan text) and sends it
 * as RUN_QUERY through sendMsg. The result handle is created from (*this, coordination id,
 * plan, l, lCtx).
 */
DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener l, void* lCtx){

    exec::user::RunQuery query;
    query.set_results_mode(exec::user::STREAM_FULL);
    query.set_type(t);
    query.set_plan(plan);

    boost::function factory = boost::bind(
            boost::factory(),
            boost::ref(*this),
            _1,
            boost::cref(plan),
            l,
            lCtx);
    return sendMsg(factory, ::exec::user::RUN_QUERY, query);
}

// Sends a CREATE_PREPARED_STATEMENT request for the SQL text `plan` through sendMsg.
DrillClientPrepareHandle* DrillClientImpl::PrepareQuery(const std::string& plan, pfnPreparedStatementListener l, void* lCtx){
    exec::user::CreatePreparedStatementReq query;
    query.set_sql_query(plan);

    boost::function factory = boost::bind(
            boost::factory(),
            boost::ref(*this),
            _1,
            boost::cref(plan),
            l,
            lCtx);
    return sendMsg(factory, ::exec::user::CREATE_PREPARED_STATEMENT, query);
}

/*
 * Executes a prepared statement: sends a RUN_QUERY of type PREPARED_STATEMENT carrying a heap
 * copy of the statement's server-side handle (handed to the request via set_allocated_*).
 */
DrillClientQueryResult* DrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener l, void* lCtx){
    const DrillClientPrepareHandle& handle = static_cast(pstmt);

    exec::user::RunQuery query;
    query.set_results_mode(exec::user::STREAM_FULL);
    query.set_type(::exec::shared::PREPARED_STATEMENT);
    query.set_allocated_prepared_statement_handle(new ::exec::user::PreparedStatementHandle(handle.m_preparedStatementHandle));

    boost::function factory = boost::bind(
            boost::factory(),
            boost::ref(*this),
            _1,
            boost::cref(handle.m_query),
            l,
            lCtx);
    return sendMsg(factory, ::exec::user::RUN_QUERY, query);
}

// Fills a LIKE filter with the given pattern and escape string.
static void updateLikeFilter(exec::user::LikeFilter& likeFilter, const std::string& pattern,
        const std::string& searchEscapeString) {
    likeFilter.set_pattern(pattern);
    likeFilter.set_escape(searchEscapeString);
}

// Sends a GET_CATALOGS metadata request filtered by catalogPattern.
DrillClientCatalogResult*
DrillClientImpl::getCatalogs(const std::string& catalogPattern,
        const std::string& searchEscapeString,
        Metadata::pfnCatalogMetadataListener listener,
        void* listenerCtx) {
    exec::user::GetCatalogsReq query;
    updateLikeFilter(*query.mutable_catalog_name_filter(), catalogPattern, searchEscapeString);

    boost::function factory = boost::bind(
            boost::factory(),
            boost::ref(*this),
            _1,
            listener,
            listenerCtx);
    return sendMsg(factory, ::exec::user::GET_CATALOGS, query);
}

// Sends a GET_SCHEMAS metadata request filtered by catalog and schema patterns.
DrillClientSchemaResult* DrillClientImpl::getSchemas(const std::string& catalogPattern,
        const std::string& schemaPattern,
        const std::string& searchEscapeString,
        Metadata::pfnSchemaMetadataListener listener,
        void* listenerCtx) {
    exec::user::GetSchemasReq query;
    updateLikeFilter(*query.mutable_catalog_name_filter(), catalogPattern, searchEscapeString);
    updateLikeFilter(*query.mutable_schema_name_filter(), schemaPattern, searchEscapeString);

    boost::function factory = boost::bind(
            boost::factory(),
            boost::ref(*this),
            _1,
            listener,
            listenerCtx);
    return sendMsg(factory, ::exec::user::GET_SCHEMAS, query);
}

/*
 * Sends a GET_TABLES metadata request filtered by catalog/schema/table patterns and, when
 * tableTypes is non-NULL, by the listed table types.
 */
DrillClientTableResult* DrillClientImpl::getTables(const std::string& catalogPattern,
        const std::string& schemaPattern,
        const std::string& tablePattern,
        const std::vector* tableTypes,
        const std::string& searchEscapeString,
        Metadata::pfnTableMetadataListener listener,
        void* listenerCtx) {
    exec::user::GetTablesReq query;
    updateLikeFilter(*query.mutable_catalog_name_filter(), catalogPattern, searchEscapeString);
    updateLikeFilter(*query.mutable_schema_name_filter(), schemaPattern, searchEscapeString);
    updateLikeFilter(*query.mutable_table_name_filter(), tablePattern, searchEscapeString);
    if (tableTypes) {
        std::copy(tableTypes->begin(), tableTypes->end(),
                google::protobuf::RepeatedFieldBackInserter(query.mutable_table_type_filter()));
    }

    boost::function factory = boost::bind(
            boost::factory(),
            boost::ref(*this),
            _1,
            listener,
            listenerCtx);
    return sendMsg(factory, ::exec::user::GET_TABLES, query);
}
DrillClientColumnResult* DrillClientImpl::getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::string& columnsPattern, const std::string& searchEscapeString, Metadata::pfnColumnMetadataListener listener, void* listenerCtx) { exec::user::GetColumnsReq query; updateLikeFilter(*query.mutable_catalog_name_filter(), catalogPattern, searchEscapeString); updateLikeFilter(*query.mutable_schema_name_filter(), schemaPattern, searchEscapeString); updateLikeFilter(*query.mutable_table_name_filter(), tablePattern, searchEscapeString); updateLikeFilter(*query.mutable_column_name_filter(), columnsPattern, searchEscapeString); boost::function factory = boost::bind( boost::factory(), boost::ref(*this), _1, listener, listenerCtx); return sendMsg(factory, ::exec::user::GET_COLUMNS, query); } template Handle* DrillClientImpl::sendMsg(boost::function handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& message) { int32_t coordId; Handle* phandle=NULL; connectionStatus_t cStatus=CONN_SUCCESS; { boost::lock_guard prLock(this->m_prMutex); boost::lock_guard dcLock(this->m_dcMutex); coordId = this->getNextCoordinationId(); rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, type, coordId, &message); phandle = handleFactory(coordId); this->m_queryHandles[coordId]=phandle; connectionStatus_t cStatus = sendSyncCommon(out_msg); if(cStatus == CONN_SUCCESS){ bool sendRequest=false; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " request. 
" << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;) if(m_pendingRequests++==0){ sendRequest=true; }else{ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) << " request to server" << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;) } if(sendRequest){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending " << ::exec::user::RpcType_Name(type) << " request. Number of pending requests = " << m_pendingRequests << std::endl;) getNextResult(); // async wait for results } } } if(cStatus!=CONN_SUCCESS){ this->m_queryHandles.erase(coordId); delete phandle; return NULL; } //run this in a new thread startMessageListener(); return phandle; } void DrillClientImpl::getNextResult(){ // This call is always made from within a function where the mutex has already been acquired //boost::lock_guard lock(this->m_dcMutex); { boost::unique_lock memLock(AllocatedBuffer::s_memCVMutex); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;) while(AllocatedBuffer::s_isBufferLimitReached){ AllocatedBuffer::s_memCV.wait(memLock); } } //use free, not delete to free ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN); if (DrillClientConfig::getQueryTimeout() > 0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new query wait timer with " << DrillClientConfig::getQueryTimeout() << " seconds." 
<< std::endl;) m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout())); m_deadlineTimer.async_wait(boost::bind( &DrillClientImpl::handleReadTimeout, this, boost::asio::placeholders::error )); } startHeartbeatTimer(); async_read( this->m_socket, boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN), boost::bind( &DrillClientImpl::handleRead, this, readBuf, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) ); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";) } void DrillClientImpl::waitForResults(){ // The listener thread never exists because it may be sending/receiving a heartbeat. Before the heartbeat was introduced // we could check if the listener thread has exited to tell if the queries are done. We can no longer do so now. We check // a condition variable instead { boost::unique_lock cvLock(this->m_dcMutex); //if no more data, return NULL; while(this->m_pendingRequests>0) { this->m_cv.wait(cvLock); } } } /* * Decode the length of the message from bufWithLen and then read entire message from the socket. * Parameters: * bufWithLenField - in param - buffer containing the length of the RPC message/encrypted chunk * bufferWithDataAndLenBytes - out param - buffer pointer which points to memory allocated in this function and has the * entire one RPC message / encrypted chunk along with the length of the message. * Memory for this buffer is released by caller. * lengthFieldLength - out param - bytes of bufWithLen which contains the length of the entire RPC message or * encrypted chunk * lengthDecodeHandler - in param - function pointer with length decoder to use. For encrypted chunk we use * lengthDecode and for plain RPC message we use rpcLengthDecode. * Return: * status_t - QRY_SUCCESS - In case of success. * - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error. 
*/ status_t DrillClientImpl::readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes, uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler) { uint32_t rmsgLen = 0; boost::system::error_code error; *bufferWithDataAndLenBytes = NULL; // Decode the length field lengthFieldLength = (this->*lengthDecodeHandler)(bufWithLenField, rmsgLen); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Length bytes = " << lengthFieldLength << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Msg Length = " << rmsgLen << std::endl;) if(rmsgLen>0) { const size_t leftover = LEN_PREFIX_BUFLEN - lengthFieldLength; // Allocate a buffer for reading all the bytes in bufWithLen and length number of bytes. const size_t bufferSizeWithLenBytes = rmsgLen + lengthFieldLength; *bufferWithDataAndLenBytes = new AllocatedBuffer(bufferSizeWithLenBytes); if(*bufferWithDataAndLenBytes == NULL) { return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readLenBytesFromSocket: Allocated and locked buffer: [ " << *bufferWithDataAndLenBytes << ", size = " << bufferSizeWithLenBytes << " ]\n";) // Copy the memory of bufWithLen into bufferWithLenBytesSize memcpy((*bufferWithDataAndLenBytes)->m_pBuffer, bufWithLenField, LEN_PREFIX_BUFLEN); const size_t bytesToRead = rmsgLen - leftover; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Copied bufWithLen into bufferWithLenBytes. " << "Now reading data (rmsgLen - leftover) : " << bytesToRead << std::endl;) // Read the entire data left from socket and copy to currentBuffer. const ByteBuf_t b = (*bufferWithDataAndLenBytes)->m_pBuffer + LEN_PREFIX_BUFLEN; doReadFromSocket(b, bytesToRead, error); } else { return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL); } return error ? 
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL) : QRY_SUCCESS; } /* * Function to read entire RPC message from socket and decode it to InboundRpcMessage * Parameters: * inBuf - in param - Buffer containing the length bytes. * allocatedBuffer - out param - Buffer containing the length bytes and entire RPC message bytes. * msg - out param - Decoded InBoundRpcMessage from the bytes in allocatedBuffer * Return: * status_t - QRY_SUCCESS - In case of success. * - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error. */ status_t DrillClientImpl::readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer " << reinterpret_cast(inBuf) << std::endl;) *allocatedBuffer = NULL; { // We need to protect the readLength and read buffer, and the pending requests counter, // but we don't have to keep the lock while we decode the rest of the buffer. boost::lock_guard lock(this->m_dcMutex); uint32_t lengthFieldSize = 0; // Read the message length and extract length size bytes to form InBoundRpcMessage const status_t statusCode = readLenBytesFromSocket(inBuf, allocatedBuffer, lengthFieldSize, &DrillClientImpl::rpcLengthDecode); // Check for error conditions if(QRY_SUCCESS != statusCode) { Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); return statusCode; } // Get the message size size_t msgLen = (*allocatedBuffer)->m_bufSize; // Read data successfully, now let's try to decode the buffer and form a valid RPC message. // allocatedBuffer also contains the length bytes which is not needed by decodes so skip that part of buffer. 
// We have it since in case of encryption the unwrap function expects it if (!decode((*allocatedBuffer)->m_pBuffer + lengthFieldSize, msgLen - lengthFieldSize, msg)) { Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC message with Coordination id: " << msg.m_coord_id << std::endl;) } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer " << reinterpret_cast(inBuf) << std::endl;) Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); return QRY_SUCCESS; } /* * Read ENCRYPT_LEN_PREFIX_BUFLEN bytes to decode length of one complete encrypted chunk. The length bytes are expected * to be in network order. It is converted to host order and the value is stored in rmsgLen parameter. * Parameters: * inBuf - in param - ByteBuf_t containing atleast the length bytes. * rmsgLen - out param - Contain the decoded value of length. * Return: * size_t - length bytes read to decode */ size_t DrillClientImpl::lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen) { memcpy(&rmsgLen, inBuf, ENCRYPT_LEN_PREFIX_BUFLEN); rmsgLen = ntohl(rmsgLen); return ENCRYPT_LEN_PREFIX_BUFLEN; } /* * Wrapper which uses RPC message length decoder to get length of one complete RPC message from _buf. * Parameters: * inBuf - in param - ByteBuf_t containing atleast the length bytes. * rmsgLen - out param - Contain the decoded value of length. * Return: * size_t - length bytes read to decode */ size_t DrillClientImpl::rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen) { return rpc::lengthDecode(inBuf, rmsgLen); } /* * Read all the encrypted chunk needed to form a complete RPC message. Read an entire chunk from network, decrypt it * and put in a buffer. The same process is repeated until the entire buffer to form a completed RPC message is read. * Parameters: * inBuf - in param - ByteBuf_t containing atleast the length bytes. 
* allocatedBuffer - out param - Buffer containing the entire RPC message bytes which is formed by reading all the * required encrypted chunk from network and decrypting each individual chunk. The * buffer memory is released by caller. .* msg - out param - InBoundRpcMessage formed from bytes in allocatedBuffer * Return: * status_t - QRY_SUCCESS - In case of success. * - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error. */ status_t DrillClientImpl::readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg) { DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Read message from buffer " << reinterpret_cast(inBuf) << std::endl;) size_t leftover = 0; uint32_t rpcMsgLen = 0; size_t bytes_read = 0; uint32_t writeIndex = 0; size_t bytesToRead = 0; *allocatedBuffer = NULL; boost::system::error_code error; std::stringstream errorMsg; { // We need to protect the readLength and read buffer, and the pending requests counter, // but we don't have to keep the lock while we decode the rest of the buffer. boost::lock_guard lock(this->m_dcMutex); do{ AllocatedBufferPtr currentBuffer = NULL; uint32_t lengthFieldSize = 0; const status_t statusCode = readLenBytesFromSocket(inBuf, ¤tBuffer, lengthFieldSize, &DrillClientImpl::lengthDecode); if(QRY_SUCCESS != statusCode) { Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); // Release the buffer allocated to hold chunk if(currentBuffer != NULL) { Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize); currentBuffer = NULL; } return statusCode; } // read one chunk successfully. 
Let's try to decrypt the message const char* unWrappedData = NULL; uint32_t unWrappedLen = 0; const int decryptResult = m_saslAuthenticator->unwrap(reinterpret_cast(currentBuffer->m_pBuffer), currentBuffer->m_bufSize, &unWrappedData, unWrappedLen); if(SASL_OK != decryptResult) { errorMsg << "Sasl unwrap failed for the buffer of size:" << currentBuffer->m_bufSize << " , Error: " << decryptResult; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::readAndDecryptMsg: " << errorMsg.str() << std::endl;) Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); // Release the buffer allocated to hold chunk Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize); currentBuffer = NULL; return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, errorMsg.str().c_str()), NULL); } // Check for case if the unWrappedLen is 0, since Cyrus SASL plugin verifies if the length of wrapped data // is less than the length specified by prepended 4 octets as per RFC 4422/2222. If so it just returns // and waits for more data if(unWrappedLen == 0 || (unWrappedData == NULL)) { errorMsg << "Sasl unwrap failed with mismatch in length of wrapped data and the prepended length value"; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::readAndDecryptMsg: " << errorMsg.str() << std::endl;) Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); // Release the buffer allocated to hold chunk Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize); currentBuffer = NULL; return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, errorMsg.str().c_str()), NULL); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Successfully decrypted the buffer" << " Sizes - Before Decryption = " << currentBuffer->m_bufSize << " and After Decryption = " << unWrappedLen << std::endl;) // Release the buffer allocated to hold chunk Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize); currentBuffer = NULL; bytes_read = 0; if(*allocatedBuffer == NULL) { // 
This is the first chunk of the RPC message. We will decode the RPC message full length bytes_read = rpcLengthDecode(reinterpret_cast(const_cast(unWrappedData)), rpcMsgLen); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Rpc Message Length bytes = " << bytes_read << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Rpc Message Length = " << rpcMsgLen << std::endl;) if(rpcMsgLen == 0) { Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL); } // Allocate a buffer for storing full RPC message. This is released by the caller *allocatedBuffer = new AllocatedBuffer(rpcMsgLen); if(*allocatedBuffer == NULL){ Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Allocated and locked buffer:" << "[ " << *allocatedBuffer << ", size = " << rpcMsgLen << " ]\n";) bytesToRead = rpcMsgLen; } // Update the leftover bytes that is not copied yet leftover = unWrappedLen - bytes_read; // Copy rest of decrypted message to the buffer. We can do this since it is assured that one // entire decrypted chunk is part of the same RPC message. if(leftover) { memcpy((*allocatedBuffer)->m_pBuffer + writeIndex, unWrappedData + bytes_read, leftover); } // Update bytes left to read to form full RPC message. 
bytesToRead -= leftover; writeIndex += leftover; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Left to read unencrypted data" << " of length (bytesToRead) : " << bytesToRead << std::endl;) if(bytesToRead > 0) { // Read synchronously buffer of size LEN_PREFIX_BUFLEN to get length of next chunk doReadFromSocket(inBuf, LEN_PREFIX_BUFLEN, error); if(error) { Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); } } }while(bytesToRead > 0); // more chunks to read for entire RPC message DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Done decrypting entire RPC message " << " of length: " << rpcMsgLen << ". Now starting decode:" << std::endl;) // Decode the buffer and form a RPC message if (!decode((*allocatedBuffer)->m_pBuffer, rpcMsgLen, msg)) { Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, "Cannot decode server message into valid RPC message"), NULL); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC message with Coordination id: " << msg.m_coord_id << std::endl;) } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Free buffer " << reinterpret_cast(inBuf) << std::endl;) Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); return QRY_SUCCESS; } status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; exec::shared::QueryId qid; sendAck(msg, true); { boost::lock_guard lock(this->m_dcMutex); exec::shared::QueryResult qr; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;) qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;) qid.CopyFrom(qr.query_id()); if (qr.has_query_state() && qr.query_state() 
!= exec::shared::QueryResult_QueryState_RUNNING && qr.query_state() != exec::shared::QueryResult_QueryState_STARTING) { pDrillClientQueryResult=findQueryResult(qid); //Queries that have been cancelled or whose resources are freed before completion //do not have a DrillClientQueryResult object. We need not handle the terminal message //in that case since all it does is to free resources (and they have already been freed) if(pDrillClientQueryResult!=NULL){ //Validate the RPC message std::string valErr; if( (ret=validateResultMessage(msg, qr, valErr)) != QRY_SUCCESS){ delete allocatedBuffer; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;) return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); } ret=processQueryStatusResult(&qr, pDrillClientQueryResult); }else{ // We've received the final message for a query that has been cancelled // or for which the resources have been freed. We no longer need to listen // for more incoming messages for such a query. DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;) m_pendingRequests--; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;) ret=QRY_CANCELED; } delete allocatedBuffer; //return ret; }else{ // Normal query results come back with query_state not set. // Actually this is not strictly true. The query state is set to // 0(i.e. PENDING), but protobuf thinks this means the value is not set. DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";) } } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. 
// We keep the heartbeat going though. m_cv.notify_one(); } return ret; } status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; // Be a good client and send ack as early as possible. // Drillbit pushed the query result to the client, the client should send ack // whenever it receives the message sendAck(msg, true); RecordBatch* pRecordBatch=NULL; { boost::lock_guard lock(this->m_dcMutex); exec::shared::QueryData* qr = new exec::shared::QueryData; //Record Batch will own this object and free it up. DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;) qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;) const ::exec::shared::QueryId& qid = qr->query_id(); if(qid.part1()==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;) delete allocatedBuffer; return QRY_SUCCESS; } pDrillClientQueryResult=findQueryResult(qid); if(pDrillClientQueryResult==NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" << debugPrintQid(qid) << ")." 
<< std::endl;) delete qr; delete allocatedBuffer; return ret; } // check if query has been cancelled if (pDrillClientQueryResult->isCancelled()) { DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query cancellation " << std::endl;) delete qr; delete allocatedBuffer; ret = QRY_CANCELED; } else { //Validate the RPC message std::string valErr; if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ delete allocatedBuffer; delete qr; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";) pDrillClientQueryResult->setQueryStatus(ret); return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); } //Build Record Batch here DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qid) << std::endl;) pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); pDrillClientQueryResult->m_numBatches++; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;) pRecordBatch->build(); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numRecords " << pRecordBatch->getNumRecords() << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numFields " << pRecordBatch->getNumFields() << std::endl;) ret=pDrillClientQueryResult->setupColumnDefs(qr); if(ret==QRY_SUCCESS_WITH_INFO){ pRecordBatch->schemaChanged(true); } pDrillClientQueryResult->setIsQueryPending(true); if(pDrillClientQueryResult->m_bIsLastChunk){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid) << "Received last batch. 
" << std::endl;) ret=QRY_NO_MORE_DATA; } pDrillClientQueryResult->setQueryStatus(ret); ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL); } } // release lock if((ret==QRY_FAILURE || ret==QRY_CANCELED) && pDrillClientQueryResult != NULL){ return handleQryCancellation(ret, pDrillClientQueryResult); } return ret; } status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;) DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr deallocationGuard(allocatedBuffer); { boost::lock_guard lock(m_dcMutex); if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) return QRY_SUCCESS; } for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){ DrillClientQueryResult* pQueryResult=it->second; std::string qidString = (pQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pQueryResult->m_pQueryId):std::string("NULL"); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pQueryResult->m_coordinationId << " QueryId: "<< qidString << std::endl;) } std::map::const_iterator it; it=this->m_queryHandles.find(msg.m_coord_id); if(it==this->m_queryHandles.end()){ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } pDrillClientQueryResult=dynamic_cast((*it).second); if (!pDrillClientQueryResult) { return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } // Check for cancellation to notify if (pDrillClientQueryResult->isCancelled()) { ret = QRY_CANCELED; } else { exec::shared::QueryId *qid = new exec::shared::QueryId; 
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;) m_queryResults[qid]=pDrillClientQueryResult; //save queryId allocated here so we can free it later pDrillClientQueryResult->setQueryId(qid); } } if (ret == QRY_CANCELED && pDrillClientQueryResult != NULL) { return handleQryCancellation(ret, pDrillClientQueryResult); } return ret; } status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Prepared Statement with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr deallocationGuard(allocatedBuffer); boost::lock_guard lock(m_dcMutex); if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStatement: m_coord_id=0. Ignore and return QRY_SUCCESS." 
<< std::endl;) return QRY_SUCCESS; } std::map::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); if(it!=this->m_queryHandles.end()){ DrillClientPrepareHandle* pDrillClientPrepareHandle=static_cast((*it).second); if (!validateResultRPCType(pDrillClientPrepareHandle, msg)){ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for prepared statement.", pDrillClientPrepareHandle); } exec::user::CreatePreparedStatementResp resp; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Prepared Statement Handle " << msg.m_pbody.size() << std::endl;) if (!resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size())) { return handleQryError(QRY_COMM_ERROR, "Cannot decode prepared statement", pDrillClientPrepareHandle); } if (resp.has_status() && resp.status() != exec::user::OK) { return handleQryError(QRY_FAILED, resp.error(), pDrillClientPrepareHandle); } if (QRY_SUCCESS != pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement())){ return handleQryError(QRY_FAILED, "Error during prepared statement setup.", pDrillClientPrepareHandle); } pDrillClientPrepareHandle->notifyListener(pDrillClientPrepareHandle, NULL); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Prepared Statement handle - " << resp.prepared_statement().server_handle().DebugString() << std::endl;) }else{ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } m_pendingRequests--; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStament: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. // We keep the heartbeat going though. 
m_cv.notify_one(); } return ret; } status_t DrillClientImpl::processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetCatalogsResp with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr deallocationGuard(allocatedBuffer); boost::lock_guard lock(m_dcMutex); if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) return QRY_SUCCESS; } std::map::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); if(it!=this->m_queryHandles.end()){ DrillClientCatalogResult* pHandle=static_cast((*it).second); if (!validateResultRPCType(pHandle, msg)){ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getcatalogs results.", pHandle); } exec::user::GetCatalogsResp* resp = new exec::user::GetCatalogsResp; pHandle->attachMetadataResult(resp); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetCatalogs result Handle " << msg.m_pbody.size() << std::endl;) if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { return handleQryError(QRY_COMM_ERROR, "Cannot decode getcatalogs results", pHandle); } if (resp->status() != exec::user::OK) { return handleQryError(QRY_FAILED, resp->error(), pHandle); } const ::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>& catalogs = resp->catalogs(); pHandle->m_meta.clear(); pHandle->m_meta.reserve(resp->catalogs_size()); for(::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>::const_iterator it = catalogs.begin(); it != catalogs.end(); ++it) { meta::DrillCatalogMetadata meta(*it); pHandle->m_meta.push_back(meta); } pHandle->notifyListener(&pHandle->m_meta, NULL); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetCatalogs result - " << resp->catalogs_size() << " catalog(s)" << std::endl;) }else{ return 
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } m_pendingRequests--; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. // We keep the heartbeat going though. m_cv.notify_one(); } return ret; } status_t DrillClientImpl::processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetSchemaResp with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr deallocationGuard(allocatedBuffer); boost::lock_guard lock(m_dcMutex); if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) return QRY_SUCCESS; } std::map::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); if(it!=this->m_queryHandles.end()){ DrillClientSchemaResult* pHandle=static_cast((*it).second); if (!validateResultRPCType(pHandle, msg)){ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getschemas results.", pHandle); } exec::user::GetSchemasResp* resp = new exec::user::GetSchemasResp(); pHandle->attachMetadataResult(resp); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetSchemasResp result Handle " << msg.m_pbody.size() << std::endl;) if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { return handleQryError(QRY_COMM_ERROR, "Cannot decode getschemas results", pHandle); } if (resp->status() != exec::user::OK) { return handleQryError(QRY_FAILED, resp->error(), pHandle); } const ::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>& schemas = resp->schemas(); pHandle->m_meta.clear(); pHandle->m_meta.reserve(resp->schemas_size()); 
for(::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>::const_iterator it = schemas.begin(); it != schemas.end(); ++it) { meta::DrillSchemaMetadata meta(*it); pHandle->m_meta.push_back(meta); } pHandle->notifyListener(&pHandle->m_meta, NULL); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetSchemaResp result - " << resp->schemas_size() << " schema(s)" << std::endl;) }else{ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } m_pendingRequests--; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. // We keep the heartbeat going though. m_cv.notify_one(); } return ret; } status_t DrillClientImpl::processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetTablesResp with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr deallocationGuard(allocatedBuffer); boost::lock_guard lock(m_dcMutex); if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: m_coord_id=0. Ignore and return QRY_SUCCESS." 
<< std::endl;) return QRY_SUCCESS; } std::map::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); if(it!=this->m_queryHandles.end()){ DrillClientTableResult* pHandle=static_cast((*it).second); if (!validateResultRPCType(pHandle, msg)){ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for gettables results.", pHandle); } exec::user::GetTablesResp* resp = new exec::user::GetTablesResp(); pHandle->attachMetadataResult(resp); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GeTablesResp result Handle " << msg.m_pbody.size() << std::endl;) if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { return handleQryError(QRY_COMM_ERROR, "Cannot decode gettables results", pHandle); } if (resp->status() != exec::user::OK) { return handleQryError(QRY_FAILED, resp->error(), pHandle); } const ::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>& tables = resp->tables(); pHandle->m_meta.clear(); pHandle->m_meta.reserve(resp->tables_size()); for(::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>::const_iterator it = tables.begin(); it != tables.end(); ++it) { meta::DrillTableMetadata meta(*it); pHandle->m_meta.push_back(meta); } pHandle->notifyListener(&pHandle->m_meta, NULL); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetTables result - " << resp->tables_size() << " table(s)" << std::endl;) }else{ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } m_pendingRequests--; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. // We keep the heartbeat going though. 
m_cv.notify_one(); } return ret; } status_t DrillClientImpl::processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetColumnsResp with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr deallocationGuard(allocatedBuffer); boost::lock_guard lock(m_dcMutex); if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) return QRY_SUCCESS; } std::map::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); if(it!=this->m_queryHandles.end()){ DrillClientColumnResult* pHandle=static_cast((*it).second); if (!validateResultRPCType(pHandle, msg)){ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getcolumns results.", pHandle); } exec::user::GetColumnsResp* resp = new exec::user::GetColumnsResp(); pHandle->attachMetadataResult(resp); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetColumnsResp result Handle " << msg.m_pbody.size() << std::endl;) if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { return handleQryError(QRY_COMM_ERROR, "Cannot decode getcolumns results", pHandle); } if (resp->status() != exec::user::OK) { return handleQryError(QRY_FAILED, resp->error(), pHandle); } const ::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>& columns = resp->columns(); pHandle->m_meta.clear(); pHandle->m_meta.reserve(resp->columns_size()); for(::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) { meta::DrillColumnMetadata meta(*it); pHandle->m_meta.push_back(meta); } pHandle->notifyListener(&pHandle->m_meta, NULL); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetColumnsResp result - " << resp->columns_size() << " columns(s)" << std::endl;) }else{ return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVQUERYID), NULL); } m_pendingRequests--; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. // We keep the heartbeat going though. m_cv.notify_one(); } return ret; } status_t DrillClientImpl::processServerMetaResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetServerMetaResp with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr deallocationGuard(allocatedBuffer); boost::lock_guard lock(m_dcMutex); if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processServerMetaResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) return QRY_SUCCESS; } std::map::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); if(it!=this->m_queryHandles.end()){ DrillClientServerMetaHandle* pHandle=static_cast((*it).second); if (!validateResultRPCType(pHandle, msg)){ return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for GetServerMetaResp results.", pHandle); } exec::user::GetServerMetaResp* resp = new exec::user::GetServerMetaResp(); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetServerMetaResp result Handle " << msg.m_pbody.size() << std::endl;) if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { return handleQryError(QRY_COMM_ERROR, "Cannot decode GetServerMetaResp results", pHandle); } if (resp->status() != exec::user::OK) { return handleQryError(QRY_FAILED, resp->error(), pHandle); } pHandle->notifyListener(&(resp->server_meta()), NULL); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetServerMetaResp result " << std::endl;) }else{ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } 
m_pendingRequests--; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processServerMetaResult: " << m_pendingRequests << " requests pending." << std::endl;) if(m_pendingRequests==0){ // signal any waiting client that it can exit because there are no more any query results to arrive. // We keep the heartbeat going though. m_cv.notify_one(); } return ret; } DrillClientQueryResult* DrillClientImpl::findQueryResult(const exec::shared::QueryId& qid){ DrillClientQueryResult* pDrillClientQueryResult=NULL; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;) std::map::iterator it; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;) if(m_queryResults.size() != 0){ for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":" << it->first->part2() << "]\n";) } } it=this->m_queryResults.find(const_cast(&qid)); if(it!=this->m_queryResults.end()){ pDrillClientQueryResult=(*it).second; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;) } return pDrillClientQueryResult; } status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr, DrillClientQueryResult* pDrillClientQueryResult){ status_t ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()]; if(pDrillClientQueryResult!=NULL){ pDrillClientQueryResult->setQueryStatus(ret); pDrillClientQueryResult->setQueryState(qr->query_state()); } switch(qr->query_state()) { case exec::shared::QueryResult_QueryState_FAILED: { // get the error message from protobuf and handle errors ret = (0 == qr->error_size()) ? 
handleQryError(ret, "Unknown protobuf error.", pDrillClientQueryResult) : handleQryError(ret, qr->error(0), pDrillClientQueryResult); } break; // m_pendingRequests should be decremented when the query is // completed case exec::shared::QueryResult_QueryState_CANCELED: { ret=handleTerminatedQryState(ret, getMessage(ERR_QRY_CANCELED), pDrillClientQueryResult); m_pendingRequests--; } break; case exec::shared::QueryResult_QueryState_COMPLETED: { //Not clean to call the handleTerminatedQryState method //because it signals an error to the listener. //The ODBC driver expects this though and the sync API //handles this (luckily). ret=handleTerminatedQryState(ret, getMessage(ERR_QRY_COMPLETED), pDrillClientQueryResult); m_pendingRequests--; } break; default: { DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";) ret=handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_UNKQRYSTATE), pDrillClientQueryResult); } break; } return ret; } void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){ // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer. if(err != boost::asio::error::operation_aborted){ // Check whether the deadline has passed. if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){ // The deadline has passed. DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";) handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL); // There is no longer an active deadline. The expiry is set to positive // infinity so that the timer never expires until a new deadline is set. // Note that at this time, the caller is not in a (async) wait for the timer. m_deadlineTimer.expires_at(boost::posix_time::pos_infin); // Cancel all pending async IOs. // The cancel call _MAY_ not work on all platforms. 
To be a little more reliable we need // to have the BOOST_ASIO_ENABLE_CANCELIO macro (as well as the BOOST_ASIO_DISABLE_IOCP macro?) // defined. To be really sure, we need to close the socket. Closing the socket is a bit // drastic and we will defer that till a later release. #ifdef WIN32_SHUTDOWN_ON_TIMEOUT boost::system::error_code ignorederr; m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); #else // NOT WIN32_SHUTDOWN_ON_TIMEOUT m_socket.cancel(); #endif // WIN32_SHUTDOWN_ON_TIMEOUT } } return; } void DrillClientImpl::handleRead(ByteBuf_t inBuf, const boost::system::error_code& error, size_t bytes_transferred) { DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " << reinterpret_cast(inBuf) << std::endl;) if(DrillClientConfig::getQueryTimeout() > 0){ // Cancel the timeout if handleRead is called DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";) m_deadlineTimer.cancel(); } if (error) { // boost error Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN); boost::lock_guard lock(this->m_dcMutex); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " "Boost Communication Error: " << error.message() << std::endl;) handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); return; } rpc::InBoundRpcMessage msg; boost::lock_guard lockPR(this->m_prMutex); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;) AllocatedBufferPtr allocatedBuffer=NULL; if((this->*m_fpCurrentReadMsgHandler)(inBuf, &allocatedBuffer, msg)!=QRY_SUCCESS){ delete allocatedBuffer; if(m_pendingRequests!=0){ boost::lock_guard lock(this->m_dcMutex); getNextResult(); } return; } if(msg.m_mode==exec::rpc::PONG) { //heartbeat response. Throw it away m_pendingRequests--; delete allocatedBuffer; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. 
" << std::endl;) if(m_pendingRequests!=0){ boost::lock_guard lock(this->m_dcMutex); getNextResult(); }else{ boost::unique_lock cvLock(this->m_dcMutex); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;) m_cv.notify_one(); } return; } if(msg.m_mode == exec::rpc::RESPONSE) { status_t s; switch(msg.m_rpc_type) { case exec::user::QUERY_HANDLE: s = processQueryId(allocatedBuffer, msg); break; case exec::user::PREPARED_STATEMENT: s = processPreparedStatement(allocatedBuffer, msg); break; case exec::user::CATALOGS: s = processCatalogsResult(allocatedBuffer, msg); break; case exec::user::SCHEMAS: s = processSchemasResult(allocatedBuffer, msg); break; case exec::user::TABLES: s = processTablesResult(allocatedBuffer, msg); break; case exec::user::COLUMNS: s = processColumnsResult(allocatedBuffer, msg); break; case exec::user::HANDSHAKE: DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";) delete allocatedBuffer; break; case exec::user::SASL_MESSAGE: processSaslChallenge(allocatedBuffer, msg); break; case exec::user::SERVER_META: processServerMetaResult(allocatedBuffer, msg); break; case exec::user::ACK: // Cancel requests will result in an ACK sent back. // Consume silently s = QRY_CANCELED; delete allocatedBuffer; break; default: DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. 
" << "QueryResult returned " << msg.m_rpc_type << std::endl;) delete allocatedBuffer; handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); } if (m_pendingRequests != 0) { boost::lock_guard lock(this->m_dcMutex); getNextResult(); } return; } if (msg.has_mode() && msg.m_mode == exec::rpc::REQUEST) { status_t s; switch(msg.m_rpc_type) { case exec::user::QUERY_RESULT: s = processQueryResult(allocatedBuffer, msg); break; case exec::user::QUERY_DATA: s = processQueryData(allocatedBuffer, msg); break; default: DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " << "QueryResult returned " << msg.m_rpc_type << std::endl;) delete allocatedBuffer; handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); } if (m_pendingRequests != 0) { boost::lock_guard lock(this->m_dcMutex); getNextResult(); } return; } // If not QUERY_RESULT, then we think something serious has gone wrong? DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. 
" << "QueryResult returned " << msg.m_rpc_type << " for " << msg.m_mode << std::endl;) handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); delete allocatedBuffer; } status_t DrillClientImpl::validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valErr){ if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){ valErr=getMessage(ERR_QRY_RESPFAIL); return QRY_FAILURE; } if(qd.def().carries_two_byte_selection_vector() == true){ valErr=getMessage(ERR_QRY_SELVEC2); return QRY_FAILURE; } return QRY_SUCCESS; } status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valErr){ if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){ valErr=getMessage(ERR_QRY_RESPFAIL); return QRY_FAILURE; } if(qr.query_state()==exec::shared::QueryResult_QueryState_CANCELED){ valErr=getMessage(ERR_QRY_CANCELED); return QRY_FAILURE; } return QRY_SUCCESS; } bool DrillClientImpl::validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg) { DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::validateResultRPCType" << std::endl;) if (NULL != pQueryHandle) { DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::validateResultRPCType: Expected RPC Type: " << pQueryHandle->getExpectedRPCType() << " inbound RPC Type: " << msg.m_rpc_type << std::endl;) return (pQueryHandle->getExpectedRPCType() == msg.m_rpc_type); } return false; } /* * Called when there is failure in connect/send. 
*/ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, const std::string& msg){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg); m_pendingRequests=0; if(!m_queryHandles.empty()){ // set query error only if queries are running broadcastError(pErr); }else{ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} m_pError=pErr; shutdownSocket(); } return status; } /* * Always called with NULL QueryHandle when there is any error while reading data from socket. Once enough data is read * and a valid RPC message is formed then it can get called with NULL/valid QueryHandle depending on if QueryHandle is found * for the created RPC message. */ status_t DrillClientImpl::handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); // Set query error only if queries are running. If valid QueryHandle that means the bytes to form a valid // RPC message was read successfully from socket. So there is no socket/connection issues. if(pQueryHandle!=NULL){ m_pendingRequests--; pQueryHandle->signalError(pErr); }else{ // This means error was while reading from socket, hence call broadcastError which eventually closes socket. m_pendingRequests=0; broadcastError(pErr); } return status; } /* * Always called with valid QueryHandle when there is any error processing Query related data. */ status_t DrillClientImpl::handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle){ assert(pQueryHandle!=NULL); DrillClientError* pErr = DrillClientError::getErrorObject(e); pQueryHandle->signalError(pErr); m_pendingRequests--; return status; } status_t DrillClientImpl::handleQryCancellation(status_t status, DrillClientQueryResult* pQueryHandle) { sendCancel(&pQueryHandle->getQueryId()); // Do not decrement pending requests here. 
We have sent a cancel and we may still receive results that are // pushed on the wire before the cancel is processed. pQueryHandle->setIsQueryPending(false); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;) pQueryHandle->setQueryStatus(status); removeQueryHandle(pQueryHandle); return status; } void DrillClientImpl::broadcastError(DrillClientError* pErr){ if(pErr!=NULL){ std::map::const_iterator iter; if(!m_queryHandles.empty()){ for(iter = m_queryHandles.begin(); iter != m_queryHandles.end(); iter++) { DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg); iter->second->signalError(err); } } delete pErr; } // We have an error at the connection level. Cancel the heartbeat. // And close the connection m_heartbeatTimer.cancel(); m_pendingRequests=0; m_cv.notify_one(); shutdownSocket(); return; } // The implementation is similar to handleQryError status_t DrillClientImpl::handleTerminatedQryState( status_t status, const std::string& msg, DrillClientQueryResult* pQueryResult){ assert(pQueryResult!=NULL); if(status==QRY_COMPLETED){ pQueryResult->signalComplete(); }else{ // set query error only if queries did not complete successfully DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); pQueryResult->signalError(pErr); } return status; } void DrillClientImpl::removeQueryHandle(DrillClientQueryHandle* pQueryHandle){ boost::lock_guard lock(m_dcMutex); // Removing first the base handle for(std::map::const_iterator iter=m_queryHandles.begin(); iter!=m_queryHandles.end(); iter++) { if(pQueryHandle==(DrillClientQueryHandle*)iter->second){ m_queryHandles.erase(iter->first); break; } } // if the query handle is a result handle, m_queryResults also need to be cleaned. 
DrillClientQueryResult* pQueryResult = dynamic_cast(pQueryHandle); if (pQueryResult) { for(std::map::const_iterator it=m_queryResults.begin(); it!=m_queryResults.end(); it++) { if(pQueryResult==(DrillClientQueryResult*)it->second){ m_queryResults.erase(it->first); break; } } } } void DrillClientImpl::sendAck(const rpc::InBoundRpcMessage& msg, bool isOk){ exec::rpc::Ack ack; ack.set_ok(isOk); rpc::OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack); boost::lock_guard lock(m_dcMutex); sendSyncCommon(ack_msg); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;) } void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){ boost::lock_guard lock(m_dcMutex); uint64_t coordId = this->getNextCoordinationId(); rpc::OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId); sendSyncCommon(cancel_msg); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;) } void DrillClientImpl::shutdownSocket(){ m_io_service.stop(); boost::system::error_code ignorederr; m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr); m_bIsConnected=false; // Delete the saslAuthenticatorImpl instance since connection is broken. It will recreated on next // call to connect. if(m_saslAuthenticator != NULL) { delete m_saslAuthenticator; m_saslAuthenticator = NULL; } // Reset the SASL states. m_saslDone = false; m_saslResultCode = SASL_OK; // Reset the encryption context since connection is invalid m_encryptionCtxt.reset(); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;) } namespace { // anonymous } namespace { // anonymous // Helper class to wait on ServerMeta results struct ServerMetaContext { ServerMetaContext() : m_done(false), m_status(QRY_FAILURE) { ; // Do nothing. 
} bool m_done; status_t m_status; exec::user::ServerMeta m_serverMeta; boost::mutex m_mutex; boost::condition_variable m_cv; static status_t listener(void* ctx, const exec::user::ServerMeta* serverMeta, DrillClientError* err) { ServerMetaContext* context = static_cast(ctx); if (err) { context->m_status = QRY_FAILURE; } else { context->m_status = QRY_SUCCESS; context->m_serverMeta.CopyFrom(*serverMeta); } { boost::lock_guard lock(context->m_mutex); context->m_done = true; } context->m_cv.notify_one(); return QRY_SUCCESS; } }; } meta::DrillMetadata* DrillClientImpl::getMetadata() { DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting metadata" << std::endl;) if (std::find(m_supportedMethods.begin(), m_supportedMethods.end(), exec::user::GET_SERVER_META) == m_supportedMethods.end()) { DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Server metadata not supported " << m_supportedMethods.size() << ". Falling back to default." << std::endl;) return new meta::DrillMetadata(*this, meta::DrillMetadata::s_defaultServerMeta); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Server metadata supported." << std::endl;) exec::user::GetServerMetaReq req; ServerMetaContext ctx; boost::function factory = boost::bind( boost::factory(), boost::ref(*this), _1, ServerMetaContext::listener, &ctx); // Getting a query handle, and make sure to free when done boost::shared_ptr handle = boost::shared_ptr( sendMsg(factory, exec::user::GET_SERVER_META, req), boost::bind(&DrillClientImpl::freeQueryResources, this, _1)); { boost::unique_lock lock(ctx.m_mutex); while(!ctx.m_done) { ctx.m_cv.wait(lock); } } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Server metadata received." << std::endl;) if (ctx.m_status != QRY_SUCCESS) { return NULL; } return new meta::DrillMetadata(*this, ctx.m_serverMeta); } void DrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) { delete metadata; } // This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this // class are used by the async callbacks. 
status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) { bool hasSchemaChanged=false; bool isFirstIter=false; boost::lock_guard schLock(this->m_schemaMutex); isFirstIter=this->m_numBatches==1?true:false; std::map oldSchema; if(!m_columnDefs->empty()){ for(std::vector::iterator it = this->m_columnDefs->begin(); it != this->m_columnDefs->end(); ++it){ // the key is the field_name + type char type[256]; snprintf(type, sizeof(type), ":%d:%d",(*it)->getMinorType(), (*it)->getDataMode() ); std::string k= (*it)->getName()+type; oldSchema[k]=*it; delete *it; } } m_columnDefs->clear(); size_t numFields=pQueryData->def().field_size(); if (numFields > 0){ for(size_t i=0; iset(pQueryData->def().field(i)); this->m_columnDefs->push_back(fmd); //Look for changes in the vector and trigger a Schema change event if necessary. //If vectors are different, then call the schema change listener. char type[256]; snprintf(type, sizeof(type), ":%d:%d",fmd->getMinorType(), fmd->getDataMode() ); std::string k= fmd->getName()+type; std::map::iterator iter=oldSchema.find(k); if(iter==oldSchema.end()){ // not found hasSchemaChanged=true; }else{ oldSchema.erase(iter); } } if(oldSchema.size()>0){ hasSchemaChanged=true; oldSchema.clear(); } } this->m_bHasSchemaChanged=hasSchemaChanged&&!isFirstIter; if(this->m_bHasSchemaChanged){ //invoke schema change Listener if(m_pSchemaListener!=NULL){ m_pSchemaListener(this, m_columnDefs, NULL); } } return this->m_bHasSchemaChanged?QRY_SUCCESS_WITH_INFO:QRY_SUCCESS; } status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err) { //ctx; // unused, we already have the this pointer DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;) //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server. 
if(this->isCancelled()){ if(b!=NULL) delete b; return QRY_FAILURE; } if (!err) { // signal the cond var { if(b!=NULL){ #ifdef DEBUG DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)<getQueryResult()->query_id()) << "Query result listener saved result to queue." << std::endl;) #endif boost::lock_guard cvLock(this->m_cvMutex); this->m_recordBatches.push(b); this->m_bHasData=true; } } m_cv.notify_one(); }else{ return QRY_FAILURE; } return QRY_SUCCESS; } RecordBatch* DrillClientQueryResult::peekNext(){ RecordBatch* pRecordBatch=NULL; boost::unique_lock cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending) return NULL; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) { this->m_cv.wait(cvLock); } // READ but not remove first element from queue pRecordBatch = this->m_recordBatches.front(); return pRecordBatch; } void DrillClientQueryResult::cancel() { // Calling parent class DrillClientBaseHandle::cancel(); // If queryId has already been received, don't wait to send the // cancellation message if (this->m_pQueryId) { this->client().handleQryCancellation(QRY_CANCELED, this); } } RecordBatch* DrillClientQueryResult::getNext() { RecordBatch* pRecordBatch=NULL; boost::unique_lock cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;) if(!m_recordBatches.empty()){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;) } return NULL; } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." 
<< std::endl;) while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending){ this->m_cv.wait(cvLock); } // remove first element from queue pRecordBatch = this->m_recordBatches.front(); this->m_recordBatches.pop(); this->m_bHasData=!this->m_recordBatches.empty(); // if vector is empty, set m_bHasDataPending to false; m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED); return pRecordBatch; } // Blocks until data is available void DrillClientQueryResult::waitForData() { boost::unique_lock cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending) return; while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) { this->m_cv.wait(cvLock); } } template status_t DrillClientBaseHandle::notifyListener(Value v, DrillClientError* pErr){ return m_pApplicationListener(getApplicationContext(), v, pErr); } void DrillClientQueryHandle::cancel() { this->m_bCancel=true; } void DrillClientQueryHandle::signalError(DrillClientError* pErr){ // Ignore return values from the listener. if(pErr!=NULL){ if(m_pError!=NULL){ delete m_pError; m_pError=NULL; } m_pError=pErr; // TODO should it be protected by m_cvMutex? m_bHasError=true; } return; } template void DrillClientBaseHandle::signalError(DrillClientError* pErr){ DrillClientQueryHandle::signalError(pErr); // Ignore return values from the listener. if(pErr!=NULL){ this->notifyListener(NULL, pErr); } } status_t DrillClientQueryResult::notifyListener(RecordBatch* batch, DrillClientError* pErr) { pfnQueryResultsListener pResultsListener=getApplicationListener(); if(pResultsListener!=NULL){ return pResultsListener(this, batch, pErr); }else{ return defaultQueryResultsListener(this, batch, pErr); } } void DrillClientQueryResult::signalError(DrillClientError* pErr){ DrillClientQueryHandle::signalError(pErr); // Ignore return values from the listener. 
if(pErr!=NULL){ this->notifyListener(NULL, pErr); { boost::lock_guard cvLock(this->m_cvMutex); m_bIsQueryPending=false; m_bHasData=false; } //Signal the cv in case there is a client waiting for data already. m_cv.notify_one(); } return; } void DrillClientQueryResult::signalComplete(){ this->notifyListener(NULL, NULL); { boost::lock_guard cvLock(this->m_cvMutex); m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED); resetError(); } //Signal the cv in case there is a client waiting for data already. m_cv.notify_one(); return; } void DrillClientQueryHandle::clearAndDestroy(){ //Tell the parent to remove this from its lists m_client.removeQueryHandle(this); if(m_pError!=NULL){ delete m_pError; m_pError=NULL; } } void DrillClientQueryResult::clearAndDestroy(){ DrillClientQueryHandle::clearAndDestroy(); //free memory allocated for FieldMetadata objects saved in m_columnDefs; if(!m_columnDefs->empty()){ for(std::vector::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){ delete *it; } m_columnDefs->clear(); } if(this->m_pQueryId!=NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;) } //clear query id map entries. if(this->m_pQueryId!=NULL){ delete this->m_pQueryId; this->m_pQueryId=NULL; } if(!m_recordBatches.empty()){ // When multiple queries execute in parallel we sometimes get an empty record batch back from the server _after_ // the last chunk has been received. We eventually delete it. 
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;) RecordBatch* pR=NULL; while(!m_recordBatches.empty()){ pR=m_recordBatches.front(); m_recordBatches.pop(); delete pR; } } } status_t DrillClientPrepareHandle::setupPreparedStatement(const exec::user::PreparedStatement& pstmt) { // Get columns schema information const ::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>& columns = pstmt.columns(); for(::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) { FieldMetadata* metadata = new FieldMetadata; metadata->set(*it); m_columnDefs->push_back(metadata); } // Copy server handle if (pstmt.has_server_handle()){ this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle()); return QRY_SUCCESS; } return QRY_FAILURE; } void DrillClientPrepareHandle::clearAndDestroy(){ DrillClientQueryHandle::clearAndDestroy(); //free memory allocated for FieldMetadata objects saved in m_columnDefs; if(!m_columnDefs->empty()){ for(std::vector::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){ delete *it; } m_columnDefs->clear(); } } connectionStatus_t PooledDrillClientImpl::connect(const char* connStr, DrillUserProperties* props){ connectionStatus_t stat = CONN_SUCCESS; std::string pathToDrill, protocol, hostPortStr; std::string host; std::string port; m_connectStr=connStr; Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); if(!strcmp(protocol.c_str(), "zk")){ // Get a list of drillbits ZookeeperClient zook(pathToDrill); std::vector drillbits; int err = zook.getAllDrillbits(hostPortStr, drillbits); if(!err){ if (drillbits.empty()){ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT)); } Utils::shuffle(drillbits); // The original shuffled order is maintained if we shuffle first and then add any missing elements Utils::add(m_drillbits, drillbits); exec::DrillbitEndpoint e; size_t nextIndex=0; 
{ boost::lock_guard cLock(m_cMutex); m_lastConnection++; nextIndex = (m_lastConnection)%(getDrillbitCount()); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection" << "(" << (void*)this << ")" << ": Current counter is: " << m_lastConnection << std::endl;) err=zook.getEndPoint(m_drillbits[nextIndex], e); if(!err){ host=boost::lexical_cast(e.address()); port=boost::lexical_cast(e.user_port()); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << nextIndex << ">. Selected " << e.DebugString() << std::endl;) } if(err){ return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); } zook.close(); m_bIsDirectConnection=false; }else if(!strcmp(protocol.c_str(), "local")){ char tempStr[MAX_CONNECT_STR+1]; strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; host=strtok(tempStr, ":"); port=strtok(NULL, ""); m_bIsDirectConnection=true; }else{ return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;) DrillClientImpl* pDrillClientImpl = new DrillClientImpl(); stat = pDrillClientImpl->connect(host.c_str(), port.c_str()); if(stat == CONN_SUCCESS){ boost::lock_guard lock(m_poolMutex); m_clientConnections.push_back(pDrillClientImpl); }else{ DrillClientError* pErr = pDrillClientImpl->getError(); handleConnError((connectionStatus_t)pErr->status, pErr->msg); delete pDrillClientImpl; } return stat; } connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){ // Assume there is one valid connection to at least one drillbit connectionStatus_t stat=CONN_FAILURE; // Keep a copy of the user properties if(props!=NULL){ m_pUserProperties = boost::shared_ptr(new DrillUserProperties); //for(size_t i=0; isize(); i++){ for(std::map::const_iterator propIter = props->begin(); propIter != props->end(); ++propIter){ std::string 
currKey=propIter->first; std::string currVal=propIter->second; m_pUserProperties->setProperty( currKey, currVal ); } } DrillClientImpl* pDrillClientImpl = getOneConnection(); if(pDrillClientImpl != NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;) stat = pDrillClientImpl->validateHandshake(m_pUserProperties.get()); } else{ stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN)); } return stat; } DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){ DrillClientQueryResult* pDrillClientQueryResult = NULL; DrillClientImpl* pDrillClientImpl = NULL; pDrillClientImpl = getOneConnection(); if(pDrillClientImpl != NULL){ pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx); m_queriesExecuted++; } return pDrillClientQueryResult; } DrillClientPrepareHandle* PooledDrillClientImpl::PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx){ DrillClientPrepareHandle* pDrillClientPrepareHandle = NULL; DrillClientImpl* pDrillClientImpl = NULL; pDrillClientImpl = getOneConnection(); if(pDrillClientImpl != NULL){ pDrillClientPrepareHandle=pDrillClientImpl->PrepareQuery(plan,listener,listenerCtx); m_queriesExecuted++; } return pDrillClientPrepareHandle; } DrillClientQueryResult* PooledDrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx){ DrillClientQueryResult* pDrillClientQueryResult = NULL; DrillClientImpl* pDrillClientImpl = NULL; pDrillClientImpl = getOneConnection(); if(pDrillClientImpl != NULL){ pDrillClientQueryResult=pDrillClientImpl->ExecuteQuery(pstmt, listener, listenerCtx); m_queriesExecuted++; } return pDrillClientQueryResult; } void PooledDrillClientImpl::freeQueryResources(DrillClientQueryHandle* pQryHandle){ // If this class ever keeps 
track of executing queries then it will need // to implement this call to free any query specific resources the pool might have // allocated pQryHandle->client().freeQueryResources(pQryHandle); } meta::DrillMetadata* PooledDrillClientImpl::getMetadata() { meta::DrillMetadata* metadata = NULL; DrillClientImpl* pDrillClientImpl = getOneConnection(); if (pDrillClientImpl != NULL) { metadata = pDrillClientImpl->getMetadata(); } return metadata; } void PooledDrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) { metadata->client().freeMetadata(metadata); } bool PooledDrillClientImpl::Active(){ boost::lock_guard lock(m_poolMutex); for(std::vector::const_iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ if((*it)->Active()){ return true; } } return false; } void PooledDrillClientImpl::Close() { boost::lock_guard lock(m_poolMutex); for(std::vector::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ (*it)->Close(); delete *it; } m_clientConnections.clear(); m_pUserProperties.reset(); if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} m_lastConnection=-1; m_queriesExecuted=0; } DrillClientError* PooledDrillClientImpl::getError(){ std::string errMsg; std::string nl=""; uint32_t stat; boost::lock_guard lock(m_poolMutex); for(std::vector::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ if((*it)->getError() != NULL){ errMsg+=nl+"Query"/*+(*it)->queryId() +*/":"+(*it)->getError()->msg; stat=(*it)->getError()->status; } } if(errMsg.length()>0){ if(m_pError!=NULL){ delete m_pError; m_pError=NULL; } m_pError = new DrillClientError(stat, DrillClientError::QRY_ERROR_START+stat, errMsg); } return m_pError; } //Waits as long as any one drillbit connection has results pending void PooledDrillClientImpl::waitForResults(){ boost::lock_guard lock(m_poolMutex); for(std::vector::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ (*it)->waitForResults(); 
} return; } connectionStatus_t PooledDrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Connection Error: (Pooled) " << pErr->msg << std::endl;) if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} m_pError=pErr; return status; } DrillClientImpl* PooledDrillClientImpl::getOneConnection(){ DrillClientImpl* pDrillClientImpl = NULL; while(pDrillClientImpl==NULL){ if(m_queriesExecuted == 0){ // First query ever sent can use the connection already established to handleAuthentication the user boost::lock_guard lock(m_poolMutex); pDrillClientImpl=m_clientConnections[0];// There should be one connection in the list when the first query is executed }else if(m_clientConnections.size() == m_maxConcurrentConnections){ // Pool is full. Use one of the already established connections boost::lock_guard lock(m_poolMutex); pDrillClientImpl = m_clientConnections[m_queriesExecuted%m_maxConcurrentConnections]; if(!pDrillClientImpl->Active()){ Utils::eraseRemove(m_clientConnections, pDrillClientImpl); pDrillClientImpl=NULL; } }else{ int tries=0; connectionStatus_t ret=CONN_SUCCESS; while(pDrillClientImpl==NULL && tries++ < 3){ if((ret=connect(m_connectStr.c_str(), m_pUserProperties.get()))==CONN_SUCCESS){ boost::lock_guard lock(m_poolMutex); pDrillClientImpl=m_clientConnections.back(); ret=pDrillClientImpl->validateHandshake(m_pUserProperties.get()); if(ret!=CONN_SUCCESS){ delete pDrillClientImpl; pDrillClientImpl=NULL; m_clientConnections.erase(m_clientConnections.end()); } } } // try a few times if(ret!=CONN_SUCCESS){ break; } } // need a new connection }// while if(pDrillClientImpl==NULL){ connectionStatus_t status = CONN_NOTCONNECTED; handleConnError(status, getMessage(ERR_CONN_NOCONN)); } return pDrillClientImpl; } } // namespace Drill