diff options
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.cpp')
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.cpp | 174 |
1 files changed, 100 insertions, 74 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index 7ecf910f9..51ae1a2cd 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -825,7 +825,6 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; - ::exec::shared::QueryId qid; // Be a good client and send ack as early as possible. // Drillbit pushed the query result to the client, the client should send ack // whenever it receives the message @@ -839,7 +838,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;) - qid = ::exec::shared::QueryId(qr->query_id()); + const ::exec::shared::QueryId& qid = qr->query_id(); if(qid.part1()==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;) delete allocatedBuffer; @@ -855,90 +854,105 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c return ret; } - //Validate the RPC message - std::string valErr; - if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ - delete allocatedBuffer; - delete qr; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";) - pDrillClientQueryResult->setQueryStatus(ret); - return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); - } - - //Build Record Batch here - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;) - - pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); - pDrillClientQueryResult->m_numBatches++; - - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;) - pRecordBatch->build(); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords " - << pRecordBatch->getNumRecords() << std::endl;) - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields " - << pRecordBatch->getNumFields() << std::endl;) - - ret=pDrillClientQueryResult->setupColumnDefs(qr); - if(ret==QRY_SUCCESS_WITH_INFO){ - pRecordBatch->schemaChanged(true); - } - - pDrillClientQueryResult->setIsQueryPending(true); - if(pDrillClientQueryResult->m_bIsLastChunk){ - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) - << "Received last batch. " << std::endl;) - ret=QRY_NO_MORE_DATA; + // check if query has been cancelled + if (pDrillClientQueryResult->isCancelled()) { + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query cancellation " << std::endl;) + delete qr; + delete allocatedBuffer; + ret = QRY_CANCEL; + } else { + //Validate the RPC message + std::string valErr; + if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ + delete allocatedBuffer; + delete qr; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";) + pDrillClientQueryResult->setQueryStatus(ret); + return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); + } + + //Build Record Batch here + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qid) << std::endl;) + + pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody); + pDrillClientQueryResult->m_numBatches++; + + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;) + pRecordBatch->build(); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numRecords " + << pRecordBatch->getNumRecords() << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numFields " + << pRecordBatch->getNumFields() << std::endl;) + + ret=pDrillClientQueryResult->setupColumnDefs(qr); + if(ret==QRY_SUCCESS_WITH_INFO){ + pRecordBatch->schemaChanged(true); + } + + pDrillClientQueryResult->setIsQueryPending(true); + if(pDrillClientQueryResult->m_bIsLastChunk){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid) + << "Received last batch. " << std::endl;) + ret=QRY_NO_MORE_DATA; + } + pDrillClientQueryResult->setQueryStatus(ret); + ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL); } - pDrillClientQueryResult->setQueryStatus(ret); - ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL); } // release lock - if(ret==QRY_FAILURE){ - sendCancel(&qid); - // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are - // pushed on the wire before the cancel is processed. - pDrillClientQueryResult->setIsQueryPending(false); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;) - pDrillClientQueryResult->setQueryStatus(ret); - removeQueryHandle(pDrillClientQueryResult); - removeQueryResult(pDrillClientQueryResult); - return ret; + if((ret==QRY_FAILURE || ret==QRY_CANCELED) && pDrillClientQueryResult != NULL){ + return handleQryCancellation(ret, pDrillClientQueryResult); } return ret; } status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;) + DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; // make sure to deallocate buffer boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); - boost::lock_guard<boost::mutex> lock(m_dcMutex); - for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){ - DrillClientQueryResult* pDrillClientQueryResult=it->second; - std::string qidString = (pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL"); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId - << " QueryId: "<< qidString << std::endl;) - } - if(msg.m_coord_id==0){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) - return QRY_SUCCESS; - } - std::map<int, DrillClientQueryHandle*>::const_iterator it; - it=this->m_queryHandles.find(msg.m_coord_id); - if(it!=this->m_queryHandles.end()){ - DrillClientQueryResult* pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second); - if (!pDrillClientQueryResult) { - return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); - } - exec::shared::QueryId *qid = new exec::shared::QueryId; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) - qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;) - m_queryResults[qid]=pDrillClientQueryResult; - //save queryId allocated here so we can free it later - pDrillClientQueryResult->setQueryId(qid); - }else{ - return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + { + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + + for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){ + DrillClientQueryResult* pQueryResult=it->second; + std::string qidString = (pQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pQueryResult->m_pQueryId):std::string("NULL"); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pQueryResult->m_coordinationId + << " QueryId: "<< qidString << std::endl;) + } + + std::map<int, DrillClientQueryHandle*>::const_iterator it; + it=this->m_queryHandles.find(msg.m_coord_id); + if(it==this->m_queryHandles.end()){ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second); + if (!pDrillClientQueryResult) { + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + + // Check for cancellation to notify + if (pDrillClientQueryResult->isCancelled()) { + ret = QRY_CANCELED; + } + else { + exec::shared::QueryId *qid = new exec::shared::QueryId; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) + qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;) + m_queryResults[qid]=pDrillClientQueryResult; + //save queryId allocated here so we can free it later + pDrillClientQueryResult->setQueryId(qid); + } + } + if (ret == QRY_CANCELED && pDrillClientQueryResult != NULL) { + return handleQryCancellation(ret, pDrillClientQueryResult); } return ret; } @@ -1486,6 +1500,18 @@ status_t DrillClientImpl::handleQryError(status_t status, return status; } +status_t DrillClientImpl::handleQryCancellation(status_t status, DrillClientQueryResult* pQueryHandle) { + sendCancel(&pQueryHandle->getQueryId()); + // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are + // pushed on the wire before the cancel is processed. + pQueryHandle->setIsQueryPending(false); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;) + pQueryHandle->setQueryStatus(status); + removeQueryResult(pQueryHandle); + removeQueryHandle(pQueryHandle); + return status; +} + void DrillClientImpl::broadcastError(DrillClientError* pErr){ if(pErr!=NULL){ std::map<int, DrillClientQueryHandle*>::const_iterator iter; |