From 83513daf0903e0d94fcaad7b1ae4e8ad6272b494 Mon Sep 17 00:00:00 2001 From: Laurent Goujon Date: Tue, 11 Oct 2016 16:35:18 -0700 Subject: DRILL-1996: Add cancel method to Drill C++ connector This closes #602 --- contrib/native/client/example/querySubmitter.cpp | 9 +- .../native/client/src/clientlib/drillClient.cpp | 8 + .../client/src/clientlib/drillClientImpl.cpp | 174 ++++++++++++--------- .../client/src/clientlib/drillClientImpl.hpp | 1 + .../client/src/include/drill/drillClient.hpp | 7 + 5 files changed, 124 insertions(+), 75 deletions(-) (limited to 'contrib/native') diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp index 306db5669..2eeaf35cb 100644 --- a/contrib/native/client/example/querySubmitter.cpp +++ b/contrib/native/client/example/querySubmitter.cpp @@ -415,7 +415,14 @@ int main(int argc, char* argv[]) { client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, &qryHandle); client.registerSchemaChangeListener(&qryHandle, SchemaListener); - client.waitForResults(); + if(bTestCancel) { + // Send cancellation request after 5seconds + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + std::cout<< "\n Cancelling query: " << *queryInpIter << "\n" << std::endl; + client.cancelQuery(qryHandle); + } else { + client.waitForResults(); + } client.freeQueryResources(&qryHandle); } diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp index 20a466e68..b02f99359 100644 --- a/contrib/native/client/src/clientlib/drillClient.cpp +++ b/contrib/native/client/src/clientlib/drillClient.cpp @@ -400,6 +400,14 @@ status_t DrillClient::executeQuery(const PreparedStatement& pstmt, pfnQueryResul return QRY_SUCCESS; } +void DrillClient::cancelQuery(QueryHandle_t handle) { + if (!handle) { + return; + } + DrillClientQueryHandle* pHandle = static_cast(handle); + pHandle->cancel(); +} + void* DrillClient::getApplicationContext(QueryHandle_t handle){ assert(handle!=NULL); return (static_cast(handle))->getApplicationContext(); diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index 7ecf910f9..51ae1a2cd 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -825,7 +825,6 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; - ::exec::shared::QueryId qid; // Be a good client and send ack as early as possible. // Drillbit pushed the query result to the client, the client should send ack // whenever it receives the message @@ -839,7 +838,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;) - qid = ::exec::shared::QueryId(qr->query_id()); + const ::exec::shared::QueryId& qid = qr->query_id(); if(qid.part1()==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;) delete allocatedBuffer; @@ -855,90 +854,105 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c return ret; } - //Validate the RPC message - std::string valErr; - if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ - delete allocatedBuffer; - delete qr; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";) - pDrillClientQueryResult->setQueryStatus(ret); - return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); - } - - //Build Record Batch here - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;) - - pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); - pDrillClientQueryResult->m_numBatches++; - - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;) - pRecordBatch->build(); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords " - << pRecordBatch->getNumRecords() << std::endl;) - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields " - << pRecordBatch->getNumFields() << std::endl;) - - ret=pDrillClientQueryResult->setupColumnDefs(qr); - if(ret==QRY_SUCCESS_WITH_INFO){ - pRecordBatch->schemaChanged(true); - } - - pDrillClientQueryResult->setIsQueryPending(true); - if(pDrillClientQueryResult->m_bIsLastChunk){ - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) - << "Received last batch. " << std::endl;) - ret=QRY_NO_MORE_DATA; + // check if query has been cancelled + if (pDrillClientQueryResult->isCancelled()) { + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query cancellation " << std::endl;) + delete qr; + delete allocatedBuffer; + ret = QRY_CANCEL; + } else { + //Validate the RPC message + std::string valErr; + if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ + delete allocatedBuffer; + delete qr; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";) + pDrillClientQueryResult->setQueryStatus(ret); + return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); + } + + //Build Record Batch here + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qid) << std::endl;) + + pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); + pDrillClientQueryResult->m_numBatches++; + + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;) + pRecordBatch->build(); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numRecords " + << pRecordBatch->getNumRecords() << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numFields " + << pRecordBatch->getNumFields() << std::endl;) + + ret=pDrillClientQueryResult->setupColumnDefs(qr); + if(ret==QRY_SUCCESS_WITH_INFO){ + pRecordBatch->schemaChanged(true); + } + + pDrillClientQueryResult->setIsQueryPending(true); + if(pDrillClientQueryResult->m_bIsLastChunk){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid) + << "Received last batch. " << std::endl;) + ret=QRY_NO_MORE_DATA; + } + pDrillClientQueryResult->setQueryStatus(ret); + ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL); } - pDrillClientQueryResult->setQueryStatus(ret); - ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL); } // release lock - if(ret==QRY_FAILURE){ - sendCancel(&qid); - // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are - // pushed on the wire before the cancel is processed. - pDrillClientQueryResult->setIsQueryPending(false); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;) - pDrillClientQueryResult->setQueryStatus(ret); - removeQueryHandle(pDrillClientQueryResult); - removeQueryResult(pDrillClientQueryResult); - return ret; + if((ret==QRY_FAILURE || ret==QRY_CANCELED) && pDrillClientQueryResult != NULL){ + return handleQryCancellation(ret, pDrillClientQueryResult); } return ret; } status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;) + DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr deallocationGuard(allocatedBuffer); - boost::lock_guard lock(m_dcMutex); - for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){ - DrillClientQueryResult* pDrillClientQueryResult=it->second; - std::string qidString = (pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL"); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId - << " QueryId: "<< qidString << std::endl;) - } - if(msg.m_coord_id==0){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) - return QRY_SUCCESS; - } - std::map::const_iterator it; - it=this->m_queryHandles.find(msg.m_coord_id); - if(it!=this->m_queryHandles.end()){ - DrillClientQueryResult* pDrillClientQueryResult=dynamic_cast((*it).second); - if (!pDrillClientQueryResult) { - return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); - } - exec::shared::QueryId *qid = new exec::shared::QueryId; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) - qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;) - m_queryResults[qid]=pDrillClientQueryResult; - //save queryId allocated here so we can free it later - pDrillClientQueryResult->setQueryId(qid); - }else{ - return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + { + boost::lock_guard lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + + for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){ + DrillClientQueryResult* pQueryResult=it->second; + std::string qidString = (pQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pQueryResult->m_pQueryId):std::string("NULL"); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pQueryResult->m_coordinationId + << " QueryId: "<< qidString << std::endl;) + } + + std::map::const_iterator it; + it=this->m_queryHandles.find(msg.m_coord_id); + if(it==this->m_queryHandles.end()){ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + pDrillClientQueryResult=dynamic_cast((*it).second); + if (!pDrillClientQueryResult) { + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + + // Check for cancellation to notify + if (pDrillClientQueryResult->isCancelled()) { + ret = QRY_CANCELED; + } + else { + exec::shared::QueryId *qid = new exec::shared::QueryId; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) + qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;) + m_queryResults[qid]=pDrillClientQueryResult; + //save queryId allocated here so we can free it later + pDrillClientQueryResult->setQueryId(qid); + } + } + if (ret == QRY_CANCELED && pDrillClientQueryResult != NULL) { + return handleQryCancellation(ret, pDrillClientQueryResult); } return ret; } @@ -1486,6 +1500,18 @@ status_t DrillClientImpl::handleQryError(status_t status, return status; } +status_t DrillClientImpl::handleQryCancellation(status_t status, DrillClientQueryResult* pQueryHandle) { + sendCancel(&pQueryHandle->getQueryId()); + // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are + // pushed on the wire before the cancel is processed. + pQueryHandle->setIsQueryPending(false); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;) + pQueryHandle->setQueryStatus(status); + removeQueryResult(pQueryHandle); + removeQueryHandle(pQueryHandle); + return status; +} + void DrillClientImpl::broadcastError(DrillClientError* pErr){ if(pErr!=NULL){ std::map::const_iterator iter; diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index f9d077957..8da37b6ae 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -489,6 +489,7 @@ class DrillClientImpl : public DrillClientImplBase{ status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError); status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError); connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg); + status_t handleQryCancellation(status_t status, DrillClientQueryResult* pQueryResult); status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle); status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle); // handle query state indicating query is COMPLETED or CANCELED diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp index 5e59885d3..29ae6c284 100644 --- a/contrib/native/client/src/include/drill/drillClient.hpp +++ b/contrib/native/client/src/include/drill/drillClient.hpp @@ -1275,6 +1275,13 @@ class DECLSPEC_DRILL_CLIENT DrillClient{ */ status_t executeQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle); + /* + * Cancel a query. + * + * @param[in] the handle of the query to cancel + */ + void cancelQuery(QueryHandle_t handle); + /* * The client application should call this function to wait for results if it has registered a * listener. -- cgit v1.2.3