aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src
diff options
context:
space:
mode:
authorLaurent Goujon <laurent@dremio.com>2016-10-11 16:35:18 -0700
committerParth Chandra <parthc@apache.org>2016-11-01 11:33:23 -0700
commit83513daf0903e0d94fcaad7b1ae4e8ad6272b494 (patch)
treec0b097420f1dcfa7eaef81ed4312dfe308c4483f /contrib/native/client/src
parent166c4ce7600b5571249a6748dd57383479313e2e (diff)
DRILL-1996: Add cancel method to Drill C++ connector
This closes #602
Diffstat (limited to 'contrib/native/client/src')
-rw-r--r--contrib/native/client/src/clientlib/drillClient.cpp8
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.cpp174
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp1
-rw-r--r--contrib/native/client/src/include/drill/drillClient.hpp7
4 files changed, 116 insertions, 74 deletions
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 20a466e68..b02f99359 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -400,6 +400,14 @@ status_t DrillClient::executeQuery(const PreparedStatement& pstmt, pfnQueryResul
return QRY_SUCCESS;
}
+void DrillClient::cancelQuery(QueryHandle_t handle) {
+ if (!handle) {
+ return;
+ }
+ DrillClientQueryHandle* pHandle = static_cast<DrillClientQueryHandle*>(handle);
+ pHandle->cancel();
+}
+
void* DrillClient::getApplicationContext(QueryHandle_t handle){
assert(handle!=NULL);
return (static_cast<DrillClientQueryHandle*>(handle))->getApplicationContext();
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 7ecf910f9..51ae1a2cd 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -825,7 +825,6 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer
status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
- ::exec::shared::QueryId qid;
// Be a good client and send ack as early as possible.
// Drillbit pushed the query result to the client, the client should send ack
// whenever it receives the message
@@ -839,7 +838,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c
qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
- qid = ::exec::shared::QueryId(qr->query_id());
+ const ::exec::shared::QueryId& qid = qr->query_id();
if(qid.part1()==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;)
delete allocatedBuffer;
@@ -855,90 +854,105 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c
return ret;
}
- //Validate the RPC message
- std::string valErr;
- if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
- delete allocatedBuffer;
- delete qr;
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
- pDrillClientQueryResult->setQueryStatus(ret);
- return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
- }
-
- //Build Record Batch here
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;)
-
- pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody);
- pDrillClientQueryResult->m_numBatches++;
-
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;)
- pRecordBatch->build();
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
- << pRecordBatch->getNumRecords() << std::endl;)
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
- << pRecordBatch->getNumFields() << std::endl;)
-
- ret=pDrillClientQueryResult->setupColumnDefs(qr);
- if(ret==QRY_SUCCESS_WITH_INFO){
- pRecordBatch->schemaChanged(true);
- }
-
- pDrillClientQueryResult->setIsQueryPending(true);
- if(pDrillClientQueryResult->m_bIsLastChunk){
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
- << "Received last batch. " << std::endl;)
- ret=QRY_NO_MORE_DATA;
+ // check if query has been cancelled
+ if (pDrillClientQueryResult->isCancelled()) {
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query cancellation " << std::endl;)
+ delete qr;
+ delete allocatedBuffer;
+ ret = QRY_CANCEL;
+ } else {
+ //Validate the RPC message
+ std::string valErr;
+ if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
+ delete allocatedBuffer;
+ delete qr;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
+ pDrillClientQueryResult->setQueryStatus(ret);
+ return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
+ }
+
+ //Build Record Batch here
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qid) << std::endl;)
+
+ pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody);
+ pDrillClientQueryResult->m_numBatches++;
+
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;)
+ pRecordBatch->build();
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numRecords "
+ << pRecordBatch->getNumRecords() << std::endl;)
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numFields "
+ << pRecordBatch->getNumFields() << std::endl;)
+
+ ret=pDrillClientQueryResult->setupColumnDefs(qr);
+ if(ret==QRY_SUCCESS_WITH_INFO){
+ pRecordBatch->schemaChanged(true);
+ }
+
+ pDrillClientQueryResult->setIsQueryPending(true);
+ if(pDrillClientQueryResult->m_bIsLastChunk){
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)
+ << "Received last batch. " << std::endl;)
+ ret=QRY_NO_MORE_DATA;
+ }
+ pDrillClientQueryResult->setQueryStatus(ret);
+ ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL);
}
- pDrillClientQueryResult->setQueryStatus(ret);
- ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL);
} // release lock
- if(ret==QRY_FAILURE){
- sendCancel(&qid);
- // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
- // pushed on the wire before the cancel is processed.
- pDrillClientQueryResult->setIsQueryPending(false);
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
- pDrillClientQueryResult->setQueryStatus(ret);
- removeQueryHandle(pDrillClientQueryResult);
- removeQueryResult(pDrillClientQueryResult);
- return ret;
+ if((ret==QRY_FAILURE || ret==QRY_CANCELED) && pDrillClientQueryResult != NULL){
+ return handleQryCancellation(ret, pDrillClientQueryResult);
}
return ret;
}
status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;)
+ DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
// make sure to deallocate buffer
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
- boost::lock_guard<boost::mutex> lock(m_dcMutex);
- for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
- DrillClientQueryResult* pDrillClientQueryResult=it->second;
- std::string qidString = (pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL");
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId
- << " QueryId: "<< qidString << std::endl;)
- }
- if(msg.m_coord_id==0){
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
- return QRY_SUCCESS;
- }
- std::map<int, DrillClientQueryHandle*>::const_iterator it;
- it=this->m_queryHandles.find(msg.m_coord_id);
- if(it!=this->m_queryHandles.end()){
- DrillClientQueryResult* pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
- if (!pDrillClientQueryResult) {
- return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
- }
- exec::shared::QueryId *qid = new exec::shared::QueryId;
- DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
- qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
- DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;)
- m_queryResults[qid]=pDrillClientQueryResult;
- //save queryId allocated here so we can free it later
- pDrillClientQueryResult->setQueryId(qid);
- }else{
- return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ {
+ boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+ if(msg.m_coord_id==0){
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+ return QRY_SUCCESS;
+ }
+
+ for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
+ DrillClientQueryResult* pQueryResult=it->second;
+ std::string qidString = (pQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pQueryResult->m_pQueryId):std::string("NULL");
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pQueryResult->m_coordinationId
+ << " QueryId: "<< qidString << std::endl;)
+ }
+
+ std::map<int, DrillClientQueryHandle*>::const_iterator it;
+ it=this->m_queryHandles.find(msg.m_coord_id);
+ if(it==this->m_queryHandles.end()){
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ }
+ pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
+ if (!pDrillClientQueryResult) {
+ return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+ }
+
+ // Check for cancellation to notify
+ if (pDrillClientQueryResult->isCancelled()) {
+ ret = QRY_CANCELED;
+ }
+ else {
+ exec::shared::QueryId *qid = new exec::shared::QueryId;
+ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
+ qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;)
+ m_queryResults[qid]=pDrillClientQueryResult;
+ //save queryId allocated here so we can free it later
+ pDrillClientQueryResult->setQueryId(qid);
+ }
+ }
+ if (ret == QRY_CANCELED && pDrillClientQueryResult != NULL) {
+ return handleQryCancellation(ret, pDrillClientQueryResult);
}
return ret;
}
@@ -1486,6 +1500,18 @@ status_t DrillClientImpl::handleQryError(status_t status,
return status;
}
+status_t DrillClientImpl::handleQryCancellation(status_t status, DrillClientQueryResult* pQueryHandle) {
+ sendCancel(&pQueryHandle->getQueryId());
+ // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
+ // pushed on the wire before the cancel is processed.
+ pQueryHandle->setIsQueryPending(false);
+ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
+ pQueryHandle->setQueryStatus(status);
+ removeQueryResult(pQueryHandle);
+ removeQueryHandle(pQueryHandle);
+ return status;
+}
+
void DrillClientImpl::broadcastError(DrillClientError* pErr){
if(pErr!=NULL){
std::map<int, DrillClientQueryHandle*>::const_iterator iter;
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index f9d077957..8da37b6ae 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -489,6 +489,7 @@ class DrillClientImpl : public DrillClientImplBase{
status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
+ status_t handleQryCancellation(status_t status, DrillClientQueryResult* pQueryResult);
status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle);
status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle);
// handle query state indicating query is COMPLETED or CANCELED
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index 5e59885d3..29ae6c284 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -1276,6 +1276,13 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
status_t executeQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle);
/*
+ * Cancel a query.
+ *
+ * @param[in] the handle of the query to cancel
+ */
+ void cancelQuery(QueryHandle_t handle);
+
+ /*
* The client application should call this function to wait for results if it has registered a
* listener.
*/