From c051bbd888ad1db5de54283d0e80ac9a08accac5 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Mon, 10 Nov 2014 14:20:57 -0800 Subject: DRILL-1568: C++ Client - Handle Query Cancel --- contrib/native/client/example/querySubmitter.cpp | 13 +- .../client/src/clientlib/drillClientImpl.cpp | 210 ++++++++++++++------- .../client/src/clientlib/drillClientImpl.hpp | 23 ++- contrib/native/client/src/clientlib/errmsgs.cpp | 8 +- contrib/native/client/src/include/drill/common.hpp | 1 + 5 files changed, 178 insertions(+), 77 deletions(-) diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp index 7b98bc97b..2d8922379 100644 --- a/contrib/native/client/example/querySubmitter.cpp +++ b/contrib/native/client/example/querySubmitter.cpp @@ -67,6 +67,7 @@ Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::Dr if(!err){ assert(b!=NULL); b->print(std::cout, 0); // print all rows + std::cout << "DATA RECEIVED ..." << std::endl; delete b; // we're done with this batch, we can delete it if(bTestCancel){ return Drill::QRY_FAILURE; @@ -304,23 +305,27 @@ int main(int argc, char* argv[]) { //DrillClient::initLogging("/var/log/drill/", l); // To log to stderr Drill::DrillClient::initLogging(NULL, l); - Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. Allows us to hold at least two record batches. + //Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. Allows us to hold at least two record batches. + int nQueries=queryInputs.size(); + Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. Allows us to hold at least two record batches. if(client.connect(connectStr.c_str(), schema.c_str())!=Drill::CONN_SUCCESS){ std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<cancel(); - printf("Application canceled the query.\n"); - } + printf("Application cancelled the query.\n"); + } } if(ret!=Drill::QRY_NO_MORE_DATA && ret!=Drill::QRY_CANCEL){ std::cerr<< pRecIter->getError() << std::endl; diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index cc7002038..23dc407d4 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -158,7 +158,7 @@ connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){ boost::system::error_code ec; size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec); if(!ec && s!=0){ - return CONN_SUCCESS; + return CONN_SUCCESS; }else{ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, ec.message().c_str())); } @@ -465,6 +465,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer // Drillbit pushed the query result to the client, the client should send ack // whenever it receives the message sendAck(msg, true); + RecordBatch* pRecordBatch=NULL; { boost::lock_guard lock(this->m_dcMutex); exec::shared::QueryResult* qr = new exec::shared::QueryResult; //Record Batch will own this object and free it up. @@ -481,62 +482,29 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer if(it!=this->m_queryResults.end()){ pDrillClientQueryResult=(*it).second; }else{ - assert(0); - //assert might be compiled away in a release build. So return an error to the app. - status_t ret= handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_OUTOFORDER), NULL); + ret=processCancelledQueryResult(qid, qr); + DRILL_LOG(LOG_TRACE) << "Cleaning up resource allocated for canceled quyery." << std::endl; delete qr; + delete allocatedBuffer; return ret; } DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl; - // Drillbit may send query state message which does not contain any + // Drillbit may send a query state change message which does not contain any // record batch. - if (qr->has_query_state()) { - ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()]; - pDrillClientQueryResult->setQueryStatus(ret); - switch(qr->query_state()) { - case exec::shared::QueryResult_QueryState_FAILED: - case exec::shared::QueryResult_QueryState_UNKNOWN_QUERY: - // get the error message from protobuf and handle errors - ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult); - delete allocatedBuffer; - delete qr; - break; - - case exec::shared::QueryResult_QueryState_PENDING: - case exec::shared::QueryResult_QueryState_RUNNING: - // Ignore these state messages since they means the query is not completed. - // I have not observed those messages in testing though. - break; - - // m_pendingRequests should be decremented when the query is - // canceled or completed - // in both cases, fall back to free mememory - case exec::shared::QueryResult_QueryState_CANCELED: - ret=handleTerminatedQryState(ret, - getMessage(ERR_QRY_CANCELED), - pDrillClientQueryResult); - case exec::shared::QueryResult_QueryState_COMPLETED: - ret=handleTerminatedQryState(ret, - getMessage(ERR_QRY_COMPLETED), - pDrillClientQueryResult); - delete allocatedBuffer; - delete qr; - break; - - default: - DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Unknown Query State.\n"; - ret=handleQryError(QRY_INTERNAL_ERROR, - getMessage(ERR_QRY_UNKQRYSTATE), - pDrillClientQueryResult); - delete allocatedBuffer; - delete qr; - break; - } + if (qr->has_query_state() && + qr->query_state() != exec::shared::QueryResult_QueryState_RUNNING && + qr->query_state() != exec::shared::QueryResult_QueryState_PENDING) { + ret=processQueryStatusResult(qr, pDrillClientQueryResult); + delete allocatedBuffer; + delete qr; return ret; }else{ - DRILL_LOG(LOG_WARNING) << "DrillClientImpl::processQueryResult: Query State was not set (assuming a query with no result set.\n"; + // Normal query results come back with query_state not set. + // Actually this is not strictly true. The query state is set to + // 0(i.e. PENDING), but protobuf thinks this means the value is not set. + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n"; } //Validate the RPC message @@ -550,9 +518,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer } //Build Record Batch here - DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl; + DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl; - RecordBatch* pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); + pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); pDrillClientQueryResult->m_numBatches++; DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl; @@ -572,6 +540,12 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer pDrillClientQueryResult->m_bIsQueryPending=true; pDrillClientQueryResult->m_bIsLastChunk=qr->is_last_chunk(); pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener; + if(pDrillClientQueryResult->m_bIsLastChunk){ + DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) + << "Received last batch. " << std::endl; + ret=QRY_NO_MORE_DATA; + } + pDrillClientQueryResult->setQueryStatus(ret); if(pResultsListener!=NULL){ ret = pResultsListener(pDrillClientQueryResult, pRecordBatch, NULL); }else{ @@ -582,23 +556,50 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer } // release lock if(ret==QRY_FAILURE){ sendCancel(&qid); - { - boost::lock_guard lock(this->m_dcMutex); - m_pendingRequests--; - } + // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are + // pushed on the wire before the cancel is processed. pDrillClientQueryResult->m_bIsQueryPending=false; DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl; pDrillClientQueryResult->setQueryStatus(ret); + clearMapEntries(pDrillClientQueryResult); return ret; } - if(pDrillClientQueryResult->m_bIsLastChunk){ - DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) - << "Received last batch. " << std::endl; - ret=QRY_NO_MORE_DATA; - pDrillClientQueryResult->setQueryStatus(ret); - return ret; + return ret; +} + +status_t DrillClientImpl::processCancelledQueryResult(exec::shared::QueryId& qid, exec::shared::QueryResult* qr){ + status_t ret=QRY_SUCCESS; + // look in cancelled queries + DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(qr->query_id()) << " has been cancelled." << std::endl; + std::set::iterator it2; + exec::shared::QueryId* pQid=NULL;// + it2=this->m_cancelledQueries.find(&qid); + if(it2!=this->m_cancelledQueries.end()){ + pQid=(*it2); + if(qr->has_query_state()){ + ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()]; + if(qr->query_state()==exec::shared::QueryResult_QueryState_COMPLETED + || qr->query_state()==exec::shared::QueryResult_QueryState_CANCELED + || qr->query_state()==exec::shared::QueryResult_QueryState_FAILED) { + this->m_pendingRequests--; + this->m_cancelledQueries.erase(it2); + delete pQid; + DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(qr->query_id()) << " completed." << std::endl; + DRILL_LOG(LOG_DEBUG) << "Pending requests - " << this->m_pendingRequests << std::endl; + } + } + }else{ + status_t ret=QRY_FAILED; + if(qr->has_query_state() && qr->query_state()==exec::shared::QueryResult_QueryState_COMPLETED){ + ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()]; + }else if(!qr->has_query_state() && qr->row_count()==0){ + ret=QRY_SUCCESS; + }else{ + //ret= handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_OUTOFORDER), NULL); + DRILL_LOG(LOG_DEBUG) << "Pending requests - " << getMessage(ERR_QRY_OUTOFORDER) << std::endl; + ret= QRY_SUCCESS; + } } - pDrillClientQueryResult->setQueryStatus(ret); return ret; } @@ -613,9 +614,9 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB if(it!=this->m_queryIds.end()){ pDrillClientQueryResult=(*it).second; exec::shared::QueryId *qid = new exec::shared::QueryId; - DRILL_LOG(LOG_TRACE) << "Received Query Handle" << msg.m_pbody.size() << std::endl; + DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl; qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_LOG(LOG_TRACE) << qid->DebugString() << std::endl; + DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl; m_queryResults[qid]=pDrillClientQueryResult; //save queryId allocated here so we can free it later pDrillClientQueryResult->setQueryId(qid); @@ -627,6 +628,53 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB return ret; } +status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr, + DrillClientQueryResult* pDrillClientQueryResult){ + status_t ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()]; + pDrillClientQueryResult->setQueryStatus(ret); + pDrillClientQueryResult->setQueryState(qr->query_state()); + switch(qr->query_state()) { + case exec::shared::QueryResult_QueryState_FAILED: + case exec::shared::QueryResult_QueryState_UNKNOWN_QUERY: + { + // get the error message from protobuf and handle errors + ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult); + } + break; + + // m_pendingRequests should be decremented when the query is + // completed + case exec::shared::QueryResult_QueryState_CANCELED: + { + ret=handleTerminatedQryState(ret, + getMessage(ERR_QRY_CANCELED), + pDrillClientQueryResult); + m_pendingRequests--; + } + break; + case exec::shared::QueryResult_QueryState_COMPLETED: + { + // DO NOT call handleTerminateQryState because that + // signals an error condition and the synchronous API + // will then free the query result object without it + // being processed by the application. + ret=QRY_COMPLETED; + m_pendingRequests--; + } + break; + + default: + { + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Unknown Query State.\n"; + ret=handleQryError(QRY_INTERNAL_ERROR, + getMessage(ERR_QRY_UNKQRYSTATE), + pDrillClientQueryResult); + } + break; + } + return ret; +} + void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){ // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer. if(!err){ @@ -690,6 +738,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, } return; } + }else if(!error && msg.m_rpc_type==exec::user::ACK){ + // Cancel requests will result in an ACK sent back. + // Consume silently + if(m_pendingRequests!=0){ + boost::lock_guard lock(this->m_dcMutex); + getNextResult(); + } + return; }else{ boost::lock_guard lock(this->m_dcMutex); if(error){ @@ -769,6 +825,7 @@ status_t DrillClientImpl::handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryResult* pQueryResult){ assert(pQueryResult!=NULL); + if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} this->m_pError = DrillClientError::getErrorObject(e); pQueryResult->signalError(this->m_pError); m_pendingRequests--; @@ -796,11 +853,23 @@ status_t DrillClientImpl::handleTerminatedQryState( DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} m_pError=pErr; - m_pendingRequests--; pQueryResult->signalError(pErr); return status; } + +void DrillClientImpl::clearCancelledEntries(){ + + std::map::iterator iter; + boost::lock_guard lock(m_dcMutex); + + if(!m_cancelledQueries.empty()){ + std::set::iterator it; + m_cancelledQueries.erase(m_cancelledQueries.begin(), m_cancelledQueries.end()); + } +} + + void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){ std::map::iterator iter; boost::lock_guard lock(m_dcMutex); @@ -813,6 +882,13 @@ void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){ } } if(!m_queryResults.empty()){ + // Save the query id and state and free when the query is complete + if(pQueryResult->getQueryState()!=exec::shared::QueryResult_QueryState_COMPLETED + && pQueryResult->getQueryState()!=exec::shared::QueryResult_QueryState_FAILED){ + exec::shared::QueryId* pQueryId=new exec::shared::QueryId(); + pQueryId->CopyFrom(pQueryResult->getQueryId()); + m_cancelledQueries.insert(pQueryId); + } std::map::iterator it; for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) { if(pQueryResult==(DrillClientQueryResult*)it->second){ @@ -907,6 +983,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl; //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server. if(this->m_bCancel){ + delete b; return QRY_FAILURE; } if (!err) { @@ -1010,7 +1087,8 @@ void DrillClientQueryResult::clearAndDestroy(){ } m_columnDefs->clear(); } - //Tell the parent to remove this from it's lists + DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl; + //Tell the parent to remove this from its lists m_pClient->clearMapEntries(this); //clear query id map entries. @@ -1018,7 +1096,7 @@ void DrillClientQueryResult::clearAndDestroy(){ delete this->m_pQueryId; this->m_pQueryId=NULL; } if(!m_recordBatches.empty()){ - // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the servrer _after_ + // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_ // the last chunk has been received. We eventually delete it. DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl; RecordBatch* pR=NULL; diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index 8e2f437f3..a5eeb7775 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -42,7 +42,6 @@ #include #endif -#include "drill/common.hpp" #include "drill/drillClient.hpp" #include "rpcEncoder.hpp" #include "rpcDecoder.hpp" @@ -73,6 +72,7 @@ class DrillClientQueryResult{ m_bHasSchemaChanged(false), m_bHasData(false), m_bHasError(false), + m_queryState(exec::shared::QueryResult_QueryState_PENDING), m_pError(NULL), m_pQueryId(NULL), m_pSchemaListener(NULL), @@ -126,10 +126,14 @@ class DrillClientQueryResult{ void setQueryStatus(status_t s){ m_status = s;} status_t getQueryStatus(){ return m_status;} + void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;} + exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;} + private: status_t setupColumnDefs(exec::shared::QueryResult* pQueryResult); status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err); // Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables. + // Also used when a query is cancelled or when a query completed response is received. void signalError(DrillClientError* pErr); void clearAndDestroy(); @@ -162,6 +166,9 @@ class DrillClientQueryResult{ bool m_bHasData; bool m_bHasError; + // state in the last query result received from the server. + exec::shared::QueryResult_QueryState m_queryState; + const DrillClientError* m_pError; exec::shared::QueryId* m_pQueryId; @@ -200,6 +207,7 @@ class DrillClientImpl{ //Cancel any pending requests //Clear and destroy DrillClientQueryResults vector? + clearCancelledEntries(); m_deadlineTimer.cancel(); m_io_service.stop(); boost::system::error_code ignorederr; @@ -258,7 +266,10 @@ class DrillClientImpl{ InBoundRpcMessage& msg, boost::system::error_code& error); status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); + status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr); status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ); + status_t processQueryStatusResult( exec::shared::QueryResult* qr, + DrillClientQueryResult* pDrillClientQueryResult); void handleReadTimeout(const boost::system::error_code & err); void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ; status_t validateMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError); @@ -268,12 +279,13 @@ class DrillClientImpl{ const exec::shared::DrillPBError& e, DrillClientQueryResult* pQueryResult); // handle query state indicating query is COMPELTED or CANCELED - // (i.e., COMPELTED or CANCELE) + // (i.e., COMPELTED or CANCELED) status_t handleTerminatedQryState(status_t status, std::string msg, DrillClientQueryResult* pQueryResult); void broadcastError(DrillClientError* pErr); void clearMapEntries(DrillClientQueryResult* pQueryResult); + void clearCancelledEntries(); void sendAck(InBoundRpcMessage& msg, bool isOk); void sendCancel(exec::shared::QueryId* pQueryId); @@ -311,8 +323,13 @@ class DrillClientImpl{ // Map of coordination id to Query Ids. std::map m_queryIds; - // Map of query id to query result + // Map of query id to query result for currently executing queries std::map m_queryResults; + // + // State for every Query id whose queries have result data pending but which + // have been cancelled and whose resources have been released by the client application. + // The entry is cleared when the state changes to completed or failed. + std::set m_cancelledQueries; }; diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp index 7a7fa6a38..e09bda14e 100644 --- a/contrib/native/client/src/clientlib/errmsgs.cpp +++ b/contrib/native/client/src/clientlib/errmsgs.cpp @@ -50,11 +50,11 @@ static Drill::ErrorMessages errorMessages[]={ {ERR_QRY_TIMOUT, ERR_CATEGORY_QRY, 0, "Timed out waiting for server to respond."}, {ERR_QRY_FAILURE, ERR_CATEGORY_QRY, 0, "Query execution error. Details:[ \n%s\n]"}, {ERR_QRY_SELVEC2, ERR_CATEGORY_QRY, 0, "Receiving a selection_vector_2 from the server came as a complete surprise at this point"}, - {ERR_QRY_RESPFAIL, ERR_CATEGORY_QRY, 0, "Got a RESPONSE_FAILURE from the server and don't know what to do"}, + {ERR_QRY_RESPFAIL, ERR_CATEGORY_QRY, 0, "Received a RESPONSE_FAILURE from the server."}, {ERR_QRY_UNKQRYSTATE, ERR_CATEGORY_QRY, 0, "Got an unknown query state message from the server."}, - {ERR_QRY_UNKQRY, ERR_CATEGORY_QRY, 0, "The server didn't find this query"}, - {ERR_QRY_CANCELED, ERR_CATEGORY_QRY, 0, "The server says this query has been cancelled"}, - {ERR_QRY_COMPLETED, ERR_CATEGORY_QRY, 0, "Received query_state: COMPLETED."}, + {ERR_QRY_UNKQRY, ERR_CATEGORY_QRY, 0, "Query not found on server. It might have been terminated already."}, + {ERR_QRY_CANCELED, ERR_CATEGORY_QRY, 0, "Query has been cancelled"}, + {ERR_QRY_COMPLETED, ERR_CATEGORY_QRY, 0, "Query completed."}, {ERR_QRY_16, ERR_CATEGORY_QRY, 0, "Query Failed."}, {ERR_QRY_17, ERR_CATEGORY_QRY, 0, "Query Failed."}, {ERR_QRY_18, ERR_CATEGORY_QRY, 0, "Query Failed."}, diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp index 59537f151..f83aae403 100644 --- a/contrib/native/client/src/include/drill/common.hpp +++ b/contrib/native/client/src/include/drill/common.hpp @@ -29,6 +29,7 @@ #endif #include +#include #include #include #include -- cgit v1.2.3