aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/drillClientImpl.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.hpp')
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp392
1 files changed, 249 insertions, 143 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 06f37e059..f9d077957 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -21,7 +21,6 @@
#define DRILL_CLIENT_IMPL_H
#include "drill/common.hpp"
-
// Define some BOOST defines
// WIN32_SHUTDOWN_ON_TIMEOUT is defined in "drill/common.hpp" for Windows 32 bit platform
#ifndef WIN32_SHUTDOWN_ON_TIMEOUT
@@ -29,27 +28,25 @@
#endif //WIN32_SHUTDOWN_ON_TIMEOUT
#include <algorithm>
-#include <stdlib.h>
-#include <time.h>
#include <queue>
#include <vector>
-#include <boost/asio.hpp>
+#include <boost/asio.hpp>
#if defined _WIN32 || defined _WIN64
-#include <zookeeper.h>
//Windows header files redefine 'random'
#ifdef random
#undef random
#endif
-#else
-#include <zookeeper/zookeeper.h>
#endif
#include <boost/asio/deadline_timer.hpp>
+#include <boost/function.hpp>
#include <boost/thread.hpp>
#include "drill/drillClient.hpp"
-#include "rpcEncoder.hpp"
-#include "rpcDecoder.hpp"
+#include "drill/preparedStatement.hpp"
+#include "collectionsImpl.hpp"
+#include "metadata.hpp"
+#include "rpcMessage.hpp"
#include "utils.hpp"
#include "User.pb.h"
#include "UserBitShared.pb.h"
@@ -57,11 +54,11 @@
namespace Drill {
class DrillClientImpl;
-class InBoundRpcMessage;
-class OutBoundRpcMessage;
+
+class DrillClientQueryHandle;
+
+class DrillClientPrepareHandle;
class RecordBatch;
-class RpcEncoder;
-class RpcDecoder;
/*
* Defines the interface used by DrillClient and implemented by DrillClientImpl and PooledDrillClientImpl
@@ -89,6 +86,8 @@ class DrillClientImplBase{
// Submits a query to a drillbit.
virtual DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx)=0;
+ virtual DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx)=0;
+ virtual DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx)=0;
//Waits as a connection has results pending
virtual void waitForResults()=0;
@@ -96,31 +95,109 @@ class DrillClientImplBase{
//Validates handshake at connect time.
virtual connectionStatus_t validateHandshake(DrillUserProperties* props)=0;
- virtual void freeQueryResources(DrillClientQueryResult* pQryResult)=0;
+ virtual void freeQueryResources(DrillClientQueryHandle* pQryHandle)=0;
+
+ virtual meta::DrillMetadata* getMetadata() = 0;
+ virtual void freeMetadata(meta::DrillMetadata* metadata) = 0;
};
-class DrillClientQueryResult{
+/**
+ * Base type for query handles
+ */
+class DrillClientQueryHandle{
friend class DrillClientImpl;
public:
- DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId, const std::string& query):
- m_pClient(pClient),
+ DrillClientQueryHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, void* context):
+ m_client(client),
m_coordinationId(coordId),
m_query(query),
+ m_status(QRY_SUCCESS),
+ m_bCancel(false),
+ m_bHasError(false),
+ m_pError(NULL),
+ m_pApplicationContext(context){
+ };
+
+ virtual ~DrillClientQueryHandle(){
+ clearAndDestroy();
+ };
+
+ virtual void cancel();
+ bool isCancelled() const {return m_bCancel;};
+ int32_t getCoordinationId() const { return m_coordinationId;}
+ const std::string& getQuery() const { return m_query;}
+
+ bool hasError() const { return m_bHasError;}
+ void resetError() { m_bHasError = false; }
+
+ status_t getErrorStatus() const { return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;}
+ const DrillClientError* getError() const { return m_pError;}
+ void setQueryStatus(status_t s){ m_status = s;}
+ status_t getQueryStatus() const { return m_status;}
+ inline DrillClientImpl& client() const { return m_client; };
+
+ inline void* getApplicationContext() const { return m_pApplicationContext; }
+
+ protected:
+
+ virtual void signalError(DrillClientError* pErr);
+ virtual void clearAndDestroy();
+
+ private:
+ DrillClientImpl& m_client;
+
+ int32_t m_coordinationId;
+ std::string m_query;
+ status_t m_status;
+ bool m_bCancel;
+ bool m_bHasError;
+
+ const DrillClientError* m_pError;
+
+ void* m_pApplicationContext;
+};
+
+template<typename Listener, typename ListenerValue>
+class DrillClientBaseHandle: public DrillClientQueryHandle {
+ friend class DrillClientImpl;
+ public:
+ DrillClientBaseHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* context):
+ DrillClientQueryHandle(client, coordId, query, context),
+ m_pApplicationListener(listener){
+ };
+
+ virtual ~DrillClientBaseHandle(){
+ clearAndDestroy();
+ };
+
+ inline Listener getApplicationListener() const { return m_pApplicationListener; }
+
+
+ protected:
+ virtual status_t notifyListener(ListenerValue v, DrillClientError* pErr);
+
+ virtual void signalError(DrillClientError* pErr);
+ void setHasError(bool hasError) { m_bHasError = hasError; }
+
+ private:
+ Listener m_pApplicationListener;
+};
+
+class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>{
+ friend class DrillClientImpl;
+ public:
+ DrillClientQueryResult(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnQueryResultsListener listener, void* listenerCtx):
+ DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>(client, coordId, query, listener, listenerCtx),
m_numBatches(0),
m_columnDefs(new std::vector<Drill::FieldMetadata*>),
m_bIsQueryPending(true),
m_bIsLastChunk(false),
- m_bCancel(false),
m_bHasSchemaChanged(false),
m_bHasData(false),
- m_bHasError(false),
m_queryState(exec::shared::QueryResult_QueryState_STARTING),
- m_pError(NULL),
m_pQueryId(NULL),
- m_pSchemaListener(NULL),
- m_pResultsListener(NULL),
- m_pListenerCtx(NULL) {
+ m_pSchemaListener(NULL) {
};
~DrillClientQueryResult(){
@@ -128,20 +205,15 @@ class DrillClientQueryResult{
};
// get data asynchronously
- void registerListener(pfnQueryResultsListener listener, void* listenerCtx){
- this->m_pResultsListener=listener;
- this->m_pListenerCtx = listenerCtx;
- }
-
void registerSchemaChangeListener(pfnSchemaListener l){
m_pSchemaListener=l;
}
- // Synchronous call to get data. Caller assumes ownership of the recod batch
+ // Synchronous call to get data. Caller assumes ownership of the record batch
// returned and it is assumed to have been consumed.
RecordBatch* getNext();
// Synchronous call to get a look at the next Record Batch. This
- // call does not move the current pointer forward. Repeatied calls
+ // call does not move the current pointer forward. Repeated calls
// to peekNext return the same value until getNext is called.
RecordBatch* peekNext();
// Blocks until data is available.
@@ -150,32 +222,26 @@ class DrillClientQueryResult{
// placeholder to return an empty col def vector when calls are made out of order.
static FieldDefPtr s_emptyColDefs;
- FieldDefPtr getColumnDefs(){
+ FieldDefPtr getColumnDefs() {
boost::lock_guard<boost::mutex> bufferLock(this->m_schemaMutex);
return this->m_columnDefs;
}
- void cancel();
- bool isCancelled(){return this->m_bCancel;};
- bool hasSchemaChanged(){return this->m_bHasSchemaChanged;};
- int32_t getCoordinationId(){ return this->m_coordinationId;}
- const std::string& getQuery(){ return this->m_query;}
+ bool hasSchemaChanged() const {return this->m_bHasSchemaChanged;};
void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;}
- void* getListenerContext() {return this->m_pListenerCtx;}
- exec::shared::QueryId& getQueryId(){ return *(this->m_pQueryId); }
- bool hasError(){ return m_bHasError;}
- status_t getErrorStatus(){ return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;}
- const DrillClientError* getError(){ return m_pError;}
- void setQueryStatus(status_t s){ m_status = s;}
- status_t getQueryStatus(){ return m_status;}
+ exec::shared::QueryId& getQueryId() const { return *(this->m_pQueryId); }
void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;}
- exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;}
+ exec::shared::QueryResult_QueryState getQueryState() const { return m_queryState;}
void setIsQueryPending(bool isPending){
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
m_bIsQueryPending=isPending;
}
+ protected:
+ virtual status_t notifyListener(RecordBatch* batch, DrillClientError* pErr);
+ virtual void signalError(DrillClientError* pErr);
+ virtual void clearAndDestroy();
private:
status_t setupColumnDefs(exec::shared::QueryData* pQueryData);
@@ -183,15 +249,7 @@ class DrillClientQueryResult{
// Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables.
// Also used when a query is cancelled or when a query completed response is received.
// Error object is now owned by the DrillClientQueryResult object.
- void signalError(DrillClientError* pErr);
void signalComplete();
- void clearAndDestroy();
-
-
- DrillClientImpl* m_pClient;
-
- int32_t m_coordinationId;
- const std::string& m_query;
size_t m_numBatches; // number of record batches received so far
@@ -213,28 +271,90 @@ class DrillClientQueryResult{
// if m_bIsQueryPending is true, we continue to wait for results
bool m_bIsQueryPending;
bool m_bIsLastChunk;
- bool m_bCancel;
bool m_bHasSchemaChanged;
bool m_bHasData;
- bool m_bHasError;
// state in the last query result received from the server.
exec::shared::QueryResult_QueryState m_queryState;
- const DrillClientError* m_pError;
-
exec::shared::QueryId* m_pQueryId;
- status_t m_status;
// Schema change listener
pfnSchemaListener m_pSchemaListener;
- // Results callback
- pfnQueryResultsListener m_pResultsListener;
+};
+
+class DrillClientPrepareHandle: public DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>, public PreparedStatement {
+ public:
+ DrillClientPrepareHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnPreparedStatementListener listener, void* listenerCtx):
+ DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>(client, coordId, query, listener, listenerCtx),
+ PreparedStatement(),
+ m_columnDefs(new std::vector<Drill::FieldMetadata*>) {
+ };
+
+ // PreparedStatement overrides
+ virtual std::size_t getNumFields() const { return m_columnDefs->size(); }
+ virtual const Drill::FieldMetadata& getFieldMetadata(std::size_t index) const { return *m_columnDefs->at(index);}
+
+ protected:
+ virtual void clearAndDestroy();
+
+ private:
+ friend class DrillClientImpl;
+ status_t setupPreparedStatement(const exec::user::PreparedStatement& pstmt);
+
+ FieldDefPtr m_columnDefs;
+ ::exec::user::PreparedStatementHandle m_preparedStatementHandle;
+};
+
+template<typename Listener, typename MetaType, typename MetaImpl, typename MetadataResult>
+class DrillClientMetadataResult: public DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*> {
+public:
+ DrillClientMetadataResult(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* listenerCtx):
+ DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*>(client, coordId, query, listener, listenerCtx) {}
+
+ void attachMetadataResult(MetadataResult* result) { this->m_pMetadata.reset(result); }
+
+private:
+ friend class DrillClientImpl;
+
+ // Meta informations returned to the user, linked to the handle
+ DrillVector<MetaType, MetaImpl> m_meta;
+
+ // to keep a reference on the underlying metadata object, and
+ // make sure it's clean when this handle is destroyed
+ boost::shared_ptr<MetadataResult> m_pMetadata;
+
+};
+
+class DrillClientCatalogResult: public DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp> {
+ friend class DrillClientImpl;
+public:
+ DrillClientCatalogResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx):
+ DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp>(client, coordId, "getCatalog", listener, listenerCtx) {}
+};
+
+class DrillClientSchemaResult: public DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp> {
+ friend class DrillClientImpl;
+public:
+ DrillClientSchemaResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx):
+ DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp>(client, coordId, "getSchemas", listener, listenerCtx) {}
+};
+
+class DrillClientTableResult: public DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp> {
+ friend class DrillClientImpl;
+public:
+ DrillClientTableResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnTableMetadataListener listener, void* listenerCtx):
+ DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp>(client, coordId, "getTables", listener, listenerCtx) {}
+};
- // Listener context
- void * m_pListenerCtx;
+class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp> {
+ friend class DrillClientImpl;
+ public:
+ DrillClientColumnResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnColumnMetadataListener listener, void* listenerCtx):
+ DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {}
};
+
class DrillClientImpl : public DrillClientImplBase{
public:
DrillClientImpl():
@@ -250,7 +370,8 @@ class DrillClientImpl : public DrillClientImplBase{
m_deadlineTimer(m_io_service),
m_heartbeatTimer(m_io_service),
m_rbuf(NULL),
- m_wbuf(MAX_SOCK_RD_BUFSIZE)
+ m_wbuf(MAX_SOCK_RD_BUFSIZE),
+ m_bIsDirectConnection(false)
{
m_coordinationId=rand()%1729+1;
};
@@ -300,14 +421,24 @@ class DrillClientImpl : public DrillClientImplBase{
void Close() ;
DrillClientError* getError(){ return m_pError;}
DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
+ DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx);
+ DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx);
+
void waitForResults();
connectionStatus_t validateHandshake(DrillUserProperties* props);
- void freeQueryResources(DrillClientQueryResult* pQryResult){
- // Doesn't need to do anything
- return;
+ void freeQueryResources(DrillClientQueryHandle* pQryHandle){
+ delete pQryHandle;
};
+
+ const exec::user::RpcEndpointInfos& getServerInfos() const { return m_serverInfos; }
+
+ meta::DrillMetadata* getMetadata();
+
+ void freeMetadata(meta::DrillMetadata* metadata);
private:
+ friend class meta::DrillMetadata;
+ friend class DrillClientQueryHandle;
friend class DrillClientQueryResult;
friend class PooledDrillClientImpl;
@@ -327,8 +458,8 @@ class DrillClientImpl : public DrillClientImplBase{
int32_t getNextCoordinationId(){ return ++m_coordinationId; };
// send synchronous messages
- //connectionStatus_t recvSync(InBoundRpcMessage& msg);
- connectionStatus_t sendSync(OutBoundRpcMessage& msg);
+ //connectionStatus_t recvSync(rpc::InBoundRpcMessage& msg);
+ connectionStatus_t sendSync(rpc::OutBoundRpcMessage& msg);
// handshake
connectionStatus_t recvHandshake();
void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred );
@@ -340,45 +471,54 @@ class DrillClientImpl : public DrillClientImplBase{
status_t readMsg(
ByteBuf_t _buf,
AllocatedBufferPtr* allocatedBuffer,
- InBoundRpcMessage& msg,
- boost::system::error_code& error);
- status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
- status_t processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
+ rpc::InBoundRpcMessage& msg);
+ status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
+ status_t processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
- status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg );
- DrillClientQueryResult* findQueryResult(exec::shared::QueryId& qid);
+ status_t processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
+ status_t processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
+ status_t processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
+ status_t processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
+ status_t processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
+ status_t processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
+ DrillClientQueryResult* findQueryResult(const exec::shared::QueryId& qid);
status_t processQueryStatusResult( exec::shared::QueryResult* qr,
DrillClientQueryResult* pDrillClientQueryResult);
void handleReadTimeout(const boost::system::error_code & err);
void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
- status_t validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valError);
- status_t validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError);
- connectionStatus_t handleConnError(connectionStatus_t status, std::string msg);
- status_t handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult);
- status_t handleQryError(status_t status,
- const exec::shared::DrillPBError& e,
- DrillClientQueryResult* pQueryResult);
- // handle query state indicating query is COMPELTED or CANCELED
- // (i.e., COMPELTED or CANCELED)
+ status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
+ status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
+ connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
+ status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle);
+ status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle);
+ // handle query state indicating query is COMPLETED or CANCELED
+ // (i.e., COMPLETED or CANCELED)
status_t handleTerminatedQryState(status_t status,
- std::string msg,
+ const std::string& msg,
DrillClientQueryResult* pQueryResult);
void broadcastError(DrillClientError* pErr);
- void clearMapEntries(DrillClientQueryResult* pQueryResult);
- void sendAck(InBoundRpcMessage& msg, bool isOk);
- void sendCancel(exec::shared::QueryId* pQueryId);
+ void removeQueryHandle(DrillClientQueryHandle* pQueryHandle);
+ void removeQueryResult(DrillClientQueryResult* pQueryResult);
+ void sendAck(const rpc::InBoundRpcMessage& msg, bool isOk);
+ void sendCancel(const exec::shared::QueryId* pQueryId);
- void shutdownSocket();
+ template<typename Handle>
+ Handle* sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& msg);
+ // metadata requests
+ DrillClientCatalogResult* getCatalogs(const std::string& catalogPattern, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx);
+ DrillClientSchemaResult* getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx);
+ DrillClientTableResult* getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, Metadata::pfnTableMetadataListener listener, void* listenerCtx);
+ DrillClientColumnResult* getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::string& columnPattern, Metadata::pfnColumnMetadataListener listener, void* listenerCtx);
- static RpcEncoder s_encoder;
- static RpcDecoder s_decoder;
+ void shutdownSocket();
int32_t m_coordinationId;
int32_t m_handshakeVersion;
exec::user::HandshakeStatus m_handshakeStatus;
std::string m_handshakeErrorId;
std::string m_handshakeErrorMsg;
+ exec::user::RpcEndpointInfos m_serverInfos;
bool m_bIsConnected;
std::string m_connectStr;
@@ -418,8 +558,8 @@ class DrillClientImpl : public DrillClientImplBase{
// Mutex to protect drill client operations
boost::mutex m_dcMutex;
- // Map of coordination id to Query Ids.
- std::map<int, DrillClientQueryResult*> m_queryIds;
+ // Map of coordination id to Query handles.
+ std::map<int, DrillClientQueryHandle*> m_queryHandles;
// Map of query id to query result for currently executing queries
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults;
@@ -431,7 +571,7 @@ class DrillClientImpl : public DrillClientImplBase{
};
inline bool DrillClientImpl::Active() {
- return this->m_bIsConnected;;
+ return this->m_bIsConnected;
}
@@ -442,17 +582,17 @@ inline bool DrillClientImpl::Active() {
* */
class PooledDrillClientImpl : public DrillClientImplBase{
public:
- PooledDrillClientImpl(){
- m_bIsDirectConnection=false;
- m_maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS;
+ PooledDrillClientImpl():
+ m_lastConnection(-1),
+ m_queriesExecuted(0),
+ m_maxConcurrentConnections(DEFAULT_MAX_CONCURRENT_CONNECTIONS),
+ m_bIsDirectConnection(false),
+ m_pError(NULL),
+ m_pUserProperties() {
char* maxConn=std::getenv(MAX_CONCURRENT_CONNECTIONS_ENV);
if(maxConn!=NULL){
m_maxConcurrentConnections=atoi(maxConn);
}
- m_lastConnection=-1;
- m_pError=NULL;
- m_queriesExecuted=0;
- m_pUserProperties=NULL;
}
~PooledDrillClientImpl(){
@@ -460,7 +600,6 @@ class PooledDrillClientImpl : public DrillClientImplBase{
delete *it;
}
m_clientConnections.clear();
- if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;}
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
}
@@ -482,15 +621,22 @@ class PooledDrillClientImpl : public DrillClientImplBase{
// Connections once added to the pool will be removed only when the DrillClient is closed.
DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
+ DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx);
+ DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx);
+
//Waits as long as any one drillbit connection has results pending
void waitForResults();
//Validates handshake only against the first drillbit connected to.
connectionStatus_t validateHandshake(DrillUserProperties* props);
- void freeQueryResources(DrillClientQueryResult* pQryResult);
+ void freeQueryResources(DrillClientQueryHandle* pQueryHandle);
+
+ int getDrillbitCount() const { return m_drillbits.size();};
+
+ meta::DrillMetadata* getMetadata();
- int getDrillbitCount(){ return m_drillbits.size();};
+ void freeMetadata(meta::DrillMetadata* metadata);
private:
@@ -502,9 +648,6 @@ class PooledDrillClientImpl : public DrillClientImplBase{
// is currently executing. If none,
std::vector<DrillClientImpl*> m_clientConnections;
boost::mutex m_poolMutex; // protect access to the vector
-
- //ZookeeperImpl zook;
-
// Use this to decide which drillbit to select next from the list of drillbits.
size_t m_lastConnection;
boost::mutex m_cMutex;
@@ -524,44 +667,7 @@ class PooledDrillClientImpl : public DrillClientImplBase{
std::vector<std::string> m_drillbits;
- DrillUserProperties* m_pUserProperties;//Keep a copy of user properties
-};
-
-class ZookeeperImpl{
- public:
- ZookeeperImpl();
- ~ZookeeperImpl();
- static ZooLogLevel getZkLogLevel();
- // comma separated host:port pairs, each corresponding to a zk
- // server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
- DEPRECATED int connectToZookeeper(const char* connectStr, const char* pathToDrill);
- void close();
- static void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context);
- void debugPrint();
- std::string& getError(){return m_err;}
- const exec::DrillbitEndpoint& getEndPoint(){ return m_drillServiceInstance.endpoint();}
- // return unshuffled list of drillbits
- int getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits);
- // picks the index drillbit and returns the corresponding endpoint object
- int getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint);
-
-
- private:
- static char s_drillRoot[];
- static char s_defaultCluster[];
- zhandle_t* m_zh;
- clientid_t m_id;
- int m_state;
- std::string m_err;
-
- struct String_vector* m_pDrillbits;
-
- boost::mutex m_cvMutex;
- // Condition variable to signal connection callback has been processed
- boost::condition_variable m_cv;
- bool m_bConnecting;
- exec::DrillServiceInstance m_drillServiceInstance;
- std::string m_rootDir;
+ boost::shared_ptr<DrillUserProperties> m_pUserProperties;//Keep a copy of user properties
};
} // namespace Drill