diff options
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.hpp')
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.hpp | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index 33f81dbbf..95fe92254 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -127,14 +127,19 @@ class DrillClientQueryResult{ void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;} exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;} + void setIsQueryPending(bool isPending){ + boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex); + m_bIsQueryPending=isPending; + } private: - status_t setupColumnDefs(exec::shared::QueryResult* pQueryResult); + status_t setupColumnDefs(exec::shared::QueryData* pQueryData); status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err); // Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables. // Also used when a query is cancelled or when a query completed response is received. // Error object is now owned by the DrillClientQueryResult object. void signalError(DrillClientError* pErr); + void signalComplete(); void clearAndDestroy(); @@ -212,7 +217,6 @@ class DrillClientImpl{ this->m_pWork = NULL; } - clearCancelledEntries(); m_deadlineTimer.cancel(); m_io_service.stop(); boost::system::error_code ignorederr; @@ -272,13 +276,16 @@ class DrillClientImpl{ InBoundRpcMessage& msg, boost::system::error_code& error); status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); + status_t processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr); status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ); + DrillClientQueryResult* findQueryResult(exec::shared::QueryId& qid); status_t processQueryStatusResult( exec::shared::QueryResult* qr, DrillClientQueryResult* pDrillClientQueryResult); void handleReadTimeout(const boost::system::error_code & err); void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ; - status_t validateMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError); + status_t validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valError); + status_t validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError); connectionStatus_t handleConnError(connectionStatus_t status, std::string msg); status_t handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult); status_t handleQryError(status_t status, @@ -291,7 +298,6 @@ class DrillClientImpl{ DrillClientQueryResult* pQueryResult); void broadcastError(DrillClientError* pErr); void clearMapEntries(DrillClientQueryResult* pQueryResult); - void clearCancelledEntries(); void sendAck(InBoundRpcMessage& msg, bool isOk); void sendCancel(exec::shared::QueryId* pQueryId); @@ -335,11 +341,6 @@ class DrillClientImpl{ // Map of query id to query result for currently executing queries std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults; - // - // State for every Query id whose queries have result data pending but which - // have been cancelled and whose resources have been released by the client application. - // The entry is cleared when the state changes to completed or failed. - std::set<exec::shared::QueryId*, compareQueryId> m_cancelledQueries; }; |