aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/native/client/example/querySubmitter.cpp40
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.cpp53
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp1
3 files changed, 63 insertions, 31 deletions
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 9ecee2457..2b0f000c3 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -22,7 +22,7 @@
#include <stdlib.h>
#include "drill/drillc.hpp"
-int nOptions=10;
+int nOptions=11;
struct Option{
char name[32];
@@ -37,6 +37,7 @@ struct Option{
{"api", "API type [sync|async]", true},
{"logLevel", "Logging level [trace|debug|info|warn|error|fatal]", false},
{"testCancel", "Cancel the query afterthe first record batch.", false},
+ {"syncSend", "Send query only after previous result is received", false},
{"hshakeTimeout", "Handshake timeout (second).", false},
{"queryTimeout", "Query timeout (second).", false}
};
@@ -44,6 +45,7 @@ struct Option{
std::map<std::string, std::string> qsOptionValues;
bool bTestCancel=false;
+bool bSyncSend=false;
Drill::status_t SchemaListener(void* ctx, Drill::FieldDefPtr fields, Drill::DrillClientError* err){
@@ -268,6 +270,7 @@ int main(int argc, char* argv[]) {
std::string type_str=qsOptionValues["type"];
std::string logLevel=qsOptionValues["logLevel"];
std::string testCancel=qsOptionValues["testCancel"];
+ std::string syncSend=qsOptionValues["syncSend"];
std::string hshakeTimeout=qsOptionValues["hshakeTimeout"];
std::string queryTimeout=qsOptionValues["queryTimeout"];
@@ -295,6 +298,7 @@ int main(int argc, char* argv[]) {
}
bTestCancel = !strcmp(testCancel.c_str(), "true")?true:false;
+ bSyncSend = !strcmp(syncSend.c_str(), "true")?true:false;
std::vector<std::string>::iterator queryInpIter;
@@ -371,16 +375,30 @@ int main(int argc, char* argv[]) {
client.freeQueryIterator(&pRecIter);
}
}else{
- for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
- Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
- client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
- client.registerSchemaChangeListener(qryHandle, SchemaListener);
- queryHandles.push_back(qryHandle);
- }
- client.waitForResults();
- for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) {
- client.freeQueryResources(*queryHandleIter);
- delete *queryHandleIter;
+ if(bSyncSend){
+ for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
+ Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
+ client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
+ client.registerSchemaChangeListener(qryHandle, SchemaListener);
+
+ client.waitForResults();
+
+ client.freeQueryResources(qryHandle);
+ delete qryHandle;
+ }
+
+ }else{
+ for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
+ Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t;
+ client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
+ client.registerSchemaChangeListener(qryHandle, SchemaListener);
+ queryHandles.push_back(qryHandle);
+ }
+ client.waitForResults();
+ for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) {
+ client.freeQueryResources(*queryHandleIter);
+ delete *queryHandleIter;
+ }
}
}
client.close();
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 40bd81eae..f9c17f9de 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -397,12 +397,14 @@ void DrillClientImpl::getNextResult(){
}
void DrillClientImpl::waitForResults(){
- // do nothing. No we do not need to explicity wait for the listener thread to finish
- delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit
- this->m_pListenerThread->join();
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread "
- << this->m_pListenerThread << " exited." << std::endl;
- delete this->m_pListenerThread; this->m_pListenerThread=NULL;
+ if(this->m_pListenerThread!=NULL){
+ // do nothing. No we do not need to explicity wait for the listener thread to finish
+ delete this->m_pWork; this->m_pWork = NULL; // inform io_service that io_service is permited to exit
+ this->m_pListenerThread->join();
+ DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::waitForResults: Listener thread "
+ << this->m_pListenerThread << " exited." << std::endl;
+ delete this->m_pListenerThread; this->m_pListenerThread=NULL;
+ }
}
status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
@@ -638,7 +640,9 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
boost::lock_guard<boost::mutex> lock(m_dcMutex);
std::map<int,DrillClientQueryResult*>::iterator it;
for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){
- DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: " << it->first << std::endl;
+ std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL");
+ DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first
+ << " QueryId: "<< qidString << std::endl;
}
if(msg.m_coord_id==0){
DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;
@@ -855,22 +859,25 @@ status_t DrillClientImpl::validateMessage(InBoundRpcMessage& msg, exec::shared::
connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
m_pendingRequests=0;
- if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
- m_pError=pErr;
- broadcastError(this->m_pError);
+ if(!m_queryIds.empty()){
+ // set query error only if queries are running
+ broadcastError(pErr);
+ }else{
+ if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
+ m_pError=pErr;
+ }
return status;
}
status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
- if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
- m_pError=pErr;
+ // set query error only if queries are running
if(pQueryResult!=NULL){
m_pendingRequests--;
pQueryResult->signalError(pErr);
}else{
m_pendingRequests=0;
- broadcastError(this->m_pError);
+ broadcastError(pErr);
}
return status;
}
@@ -879,9 +886,8 @@ status_t DrillClientImpl::handleQryError(status_t status,
const exec::shared::DrillPBError& e,
DrillClientQueryResult* pQueryResult){
assert(pQueryResult!=NULL);
- if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
- this->m_pError = DrillClientError::getErrorObject(e);
- pQueryResult->signalError(this->m_pError);
+ DrillClientError* pErr = DrillClientError::getErrorObject(e);
+ pQueryResult->signalError(pErr);
m_pendingRequests--;
return status;
}
@@ -904,10 +910,11 @@ status_t DrillClientImpl::handleTerminatedQryState(
std::string msg,
DrillClientQueryResult* pQueryResult){
assert(pQueryResult!=NULL);
- DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
- if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
- m_pError=pErr;
- pQueryResult->signalError(pErr);
+ if(status!=QRY_COMPLETED){
+ // set query error only if queries did not complete successfully
+ DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
+ pQueryResult->signalError(pErr);
+ }
return status;
}
@@ -1109,6 +1116,9 @@ void DrillClientQueryResult::cancel() {
void DrillClientQueryResult::signalError(DrillClientError* pErr){
// Ignore return values from the listener.
if(pErr!=NULL){
+ if(m_pError!=NULL){
+ delete m_pError; m_pError=NULL;
+ }
m_pError=pErr;
pfnQueryResultsListener pResultsListener=this->m_pResultsListener;
if(pResultsListener!=NULL){
@@ -1157,6 +1167,9 @@ void DrillClientQueryResult::clearAndDestroy(){
delete pR;
}
}
+ if(m_pError!=NULL){
+ delete m_pError; m_pError=NULL;
+}
}
char ZookeeperImpl::s_drillRoot[]="/drill/";
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index bd33317f1..c87e1b7d4 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -134,6 +134,7 @@ class DrillClientQueryResult{
status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err);
// Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables.
// Also used when a query is cancelled or when a query completed response is received.
+ // Error object is now owned by the DrillClientQueryResult object.
void signalError(DrillClientError* pErr);
void clearAndDestroy();