diff options
-rw-r--r-- | contrib/native/client/example/querySubmitter.cpp | 40 | ||||
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.cpp | 53 | ||||
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.hpp | 1 |
3 files changed, 63 insertions, 31 deletions
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp index 9ecee2457..2b0f000c3 100644 --- a/contrib/native/client/example/querySubmitter.cpp +++ b/contrib/native/client/example/querySubmitter.cpp @@ -22,7 +22,7 @@ #include <stdlib.h> #include "drill/drillc.hpp" -int nOptions=10; +int nOptions=11; struct Option{ char name[32]; @@ -37,6 +37,7 @@ struct Option{ {"api", "API type [sync|async]", true}, {"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false}, {"testCancel", "Cancel the query afterthe first record batch.", false}, + {"syncSend", "Send query only after previous result is received", false}, {"hshakeTimeout", "Handshake timeout (second).", false}, {"queryTimeout", "Query timeout (second).", false} }; @@ -44,6 +45,7 @@ struct Option{ std::map<std::string, std::string> qsOptionValues; bool bTestCancel=false; +bool bSyncSend=false; Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::DrillClientError* err){ @@ -268,6 +270,7 @@ int main(int argc, char* argv[]) { std::string type_str=qsOptionValues["type"]; std::string logLevel=qsOptionValues["logLevel"]; std::string testCancel=qsOptionValues["testCancel"]; + std::string syncSend=qsOptionValues["syncSend"]; std::string hshakeTimeout=qsOptionValues["hshakeTimeout"]; std::string queryTimeout=qsOptionValues["queryTimeout"]; @@ -295,6 +298,7 @@ int main(int argc, char* argv[]) { } bTestCancel = !strcmp(testCancel.c_str(), "true")?true:false; + bSyncSend = !strcmp(syncSend.c_str(), "true")?true:false; std::vector<std::string>::iterator queryInpIter; @@ -371,16 +375,30 @@ int main(int argc, char* argv[]) { client.freeQueryIterator(&pRecIter); } }else{ - for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) { - Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t; - client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle); - client.registerSchemaChangeListener(qryHandle, SchemaListener); - queryHandles.push_back(qryHandle); - } - client.waitForResults(); - for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) { - client.freeQueryResources(*queryHandleIter); - delete *queryHandleIter; + if(bSyncSend){ + for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) { + Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t; + client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle); + client.registerSchemaChangeListener(qryHandle, SchemaListener); + + client.waitForResults(); + + client.freeQueryResources(qryHandle); + delete qryHandle; + } + + }else{ + for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) { + Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t; + client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle); + client.registerSchemaChangeListener(qryHandle, SchemaListener); + queryHandles.push_back(qryHandle); + } + client.waitForResults(); + for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) { + client.freeQueryResources(*queryHandleIter); + delete *queryHandleIter; + } } } client.close(); diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index 40bd81eae..f9c17f9de 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -397,12 +397,14 @@ void DrillClientImpl::getNextResult(){ } void DrillClientImpl::waitForResults(){ - // do nothing. No we do not need to explicity wait for the listener thread to finish - delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit - this->m_pListenerThread->join(); - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread " - << this->m_pListenerThread << " exited." << std::endl; - delete this->m_pListenerThread; this->m_pListenerThread=NULL; + if(this->m_pListenerThread!=NULL){ + // do nothing. No we do not need to explicity wait for the listener thread to finish + delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit + this->m_pListenerThread->join(); + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread " + << this->m_pListenerThread << " exited." << std::endl; + delete this->m_pListenerThread; this->m_pListenerThread=NULL; + } } status_t DrillClientImpl::readMsg(ByteBuf_t _buf, @@ -638,7 +640,9 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB boost::lock_guard<boost::mutex> lock(m_dcMutex); std::map<int,DrillClientQueryResult*>::iterator it; for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){ - DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: " << it->first << std::endl; + std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL"); + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first + << " QueryId: "<< qidString << std::endl; } if(msg.m_coord_id==0){ DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl; @@ -855,22 +859,25 @@ status_t DrillClientImpl::validateMessage(InBoundRpcMessage& msg, exec::shared:: connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg); m_pendingRequests=0; - if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} - m_pError=pErr; - broadcastError(this->m_pError); + if(!m_queryIds.empty()){ + // set query error only if queries are running + broadcastError(pErr); + }else{ + if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} + m_pError=pErr; + } return status; } status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); - if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} - m_pError=pErr; + // set query error only if queries are running if(pQueryResult!=NULL){ m_pendingRequests--; pQueryResult->signalError(pErr); }else{ m_pendingRequests=0; - broadcastError(this->m_pError); + broadcastError(pErr); } return status; } @@ -879,9 +886,8 @@ status_t DrillClientImpl::handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryResult* pQueryResult){ assert(pQueryResult!=NULL); - if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} - this->m_pError = DrillClientError::getErrorObject(e); - pQueryResult->signalError(this->m_pError); + DrillClientError* pErr = DrillClientError::getErrorObject(e); + pQueryResult->signalError(pErr); m_pendingRequests--; return status; } @@ -904,10 +910,11 @@ status_t DrillClientImpl::handleTerminatedQryState( std::string msg, DrillClientQueryResult* pQueryResult){ assert(pQueryResult!=NULL); - DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); - if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} - m_pError=pErr; - pQueryResult->signalError(pErr); + if(status!=QRY_COMPLETED){ + // set query error only if queries did not complete successfully + DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); + pQueryResult->signalError(pErr); + } return status; } @@ -1109,6 +1116,9 @@ void DrillClientQueryResult::cancel() { void DrillClientQueryResult::signalError(DrillClientError* pErr){ // Ignore return values from the listener. if(pErr!=NULL){ + if(m_pError!=NULL){ + delete m_pError; m_pError=NULL; + } m_pError=pErr; pfnQueryResultsListener pResultsListener=this->m_pResultsListener; if(pResultsListener!=NULL){ @@ -1157,6 +1167,9 @@ void DrillClientQueryResult::clearAndDestroy(){ delete pR; } } + if(m_pError!=NULL){ + delete m_pError; m_pError=NULL; +} } char ZookeeperImpl::s_drillRoot[]="/drill/"; diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index bd33317f1..c87e1b7d4 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -134,6 +134,7 @@ class DrillClientQueryResult{ status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err); // Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables. // Also used when a query is cancelled or when a query completed response is received. + // Error object is now owned by the DrillClientQueryResult object. void signalError(DrillClientError* pErr); void clearAndDestroy(); |