aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/drillClientImpl.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.hpp')
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp19
1 files changed, 10 insertions, 9 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 33f81dbbf..95fe92254 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -127,14 +127,19 @@ class DrillClientQueryResult{
void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;}
exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;}
+ void setIsQueryPending(bool isPending){
+ boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
+ m_bIsQueryPending=isPending;
+ }
private:
- status_t setupColumnDefs(exec::shared::QueryResult* pQueryResult);
+ status_t setupColumnDefs(exec::shared::QueryData* pQueryData);
status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err);
// Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables.
// Also used when a query is cancelled or when a query completed response is received.
// Error object is now owned by the DrillClientQueryResult object.
void signalError(DrillClientError* pErr);
+ void signalComplete();
void clearAndDestroy();
@@ -212,7 +217,6 @@ class DrillClientImpl{
this->m_pWork = NULL;
}
- clearCancelledEntries();
m_deadlineTimer.cancel();
m_io_service.stop();
boost::system::error_code ignorederr;
@@ -272,13 +276,16 @@ class DrillClientImpl{
InBoundRpcMessage& msg,
boost::system::error_code& error);
status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
+ status_t processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg );
+ DrillClientQueryResult* findQueryResult(exec::shared::QueryId& qid);
status_t processQueryStatusResult( exec::shared::QueryResult* qr,
DrillClientQueryResult* pDrillClientQueryResult);
void handleReadTimeout(const boost::system::error_code & err);
void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
- status_t validateMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError);
+ status_t validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valError);
+ status_t validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError);
connectionStatus_t handleConnError(connectionStatus_t status, std::string msg);
status_t handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult);
status_t handleQryError(status_t status,
@@ -291,7 +298,6 @@ class DrillClientImpl{
DrillClientQueryResult* pQueryResult);
void broadcastError(DrillClientError* pErr);
void clearMapEntries(DrillClientQueryResult* pQueryResult);
- void clearCancelledEntries();
void sendAck(InBoundRpcMessage& msg, bool isOk);
void sendCancel(exec::shared::QueryId* pQueryId);
@@ -335,11 +341,6 @@ class DrillClientImpl{
// Map of query id to query result for currently executing queries
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults;
- //
- // State for every Query id whose queries have result data pending but which
- // have been cancelled and whose resources have been released by the client application.
- // The entry is cleared when the state changes to completed or failed.
- std::set<exec::shared::QueryId*, compareQueryId> m_cancelledQueries;
};