diff options
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.hpp')
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.hpp | 392 |
1 files changed, 249 insertions, 143 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index 06f37e059..f9d077957 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -21,7 +21,6 @@ #define DRILL_CLIENT_IMPL_H #include "drill/common.hpp" - // Define some BOOST defines // WIN32_SHUTDOWN_ON_TIMEOUT is defined in "drill/common.hpp" for Windows 32 bit platform #ifndef WIN32_SHUTDOWN_ON_TIMEOUT @@ -29,27 +28,25 @@ #endif //WIN32_SHUTDOWN_ON_TIMEOUT #include <algorithm> -#include <stdlib.h> -#include <time.h> #include <queue> #include <vector> -#include <boost/asio.hpp> +#include <boost/asio.hpp> #if defined _WIN32 || defined _WIN64 -#include <zookeeper.h> //Windows header files redefine 'random' #ifdef random #undef random #endif -#else -#include <zookeeper/zookeeper.h> #endif #include <boost/asio/deadline_timer.hpp> +#include <boost/function.hpp> #include <boost/thread.hpp> #include "drill/drillClient.hpp" -#include "rpcEncoder.hpp" -#include "rpcDecoder.hpp" +#include "drill/preparedStatement.hpp" +#include "collectionsImpl.hpp" +#include "metadata.hpp" +#include "rpcMessage.hpp" #include "utils.hpp" #include "User.pb.h" #include "UserBitShared.pb.h" @@ -57,11 +54,11 @@ namespace Drill { class DrillClientImpl; -class InBoundRpcMessage; -class OutBoundRpcMessage; + +class DrillClientQueryHandle; + +class DrillClientPrepareHandle; class RecordBatch; -class RpcEncoder; -class RpcDecoder; /* * Defines the interface used by DrillClient and implemented by DrillClientImpl and PooledDrillClientImpl @@ -89,6 +86,8 @@ class DrillClientImplBase{ // Submits a query to a drillbit. virtual DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx)=0; + virtual DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx)=0; + virtual DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx)=0; //Waits as a connection has results pending virtual void waitForResults()=0; @@ -96,31 +95,109 @@ class DrillClientImplBase{ //Validates handshake at connect time. virtual connectionStatus_t validateHandshake(DrillUserProperties* props)=0; - virtual void freeQueryResources(DrillClientQueryResult* pQryResult)=0; + virtual void freeQueryResources(DrillClientQueryHandle* pQryHandle)=0; + + virtual meta::DrillMetadata* getMetadata() = 0; + virtual void freeMetadata(meta::DrillMetadata* metadata) = 0; }; -class DrillClientQueryResult{ +/** + * Base type for query handles + */ +class DrillClientQueryHandle{ friend class DrillClientImpl; public: - DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId, const std::string& query): - m_pClient(pClient), + DrillClientQueryHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, void* context): + m_client(client), m_coordinationId(coordId), m_query(query), + m_status(QRY_SUCCESS), + m_bCancel(false), + m_bHasError(false), + m_pError(NULL), + m_pApplicationContext(context){ + }; + + virtual ~DrillClientQueryHandle(){ + clearAndDestroy(); + }; + + virtual void cancel(); + bool isCancelled() const {return m_bCancel;}; + int32_t getCoordinationId() const { return m_coordinationId;} + const std::string& getQuery() const { return m_query;} + + bool hasError() const { return m_bHasError;} + void resetError() { m_bHasError = false; } + + status_t getErrorStatus() const { return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;} + const DrillClientError* getError() const { return m_pError;} + void setQueryStatus(status_t s){ m_status = s;} + status_t getQueryStatus() const { return m_status;} + inline DrillClientImpl& client() const { return m_client; }; + + inline void* getApplicationContext() const { return m_pApplicationContext; } + + protected: + + virtual void signalError(DrillClientError* pErr); + virtual void clearAndDestroy(); + + private: + DrillClientImpl& m_client; + + int32_t m_coordinationId; + std::string m_query; + status_t m_status; + bool m_bCancel; + bool m_bHasError; + + const DrillClientError* m_pError; + + void* m_pApplicationContext; +}; + +template<typename Listener, typename ListenerValue> +class DrillClientBaseHandle: public DrillClientQueryHandle { + friend class DrillClientImpl; + public: + DrillClientBaseHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* context): + DrillClientQueryHandle(client, coordId, query, context), + m_pApplicationListener(listener){ + }; + + virtual ~DrillClientBaseHandle(){ + clearAndDestroy(); + }; + + inline Listener getApplicationListener() const { return m_pApplicationListener; } + + + protected: + virtual status_t notifyListener(ListenerValue v, DrillClientError* pErr); + + virtual void signalError(DrillClientError* pErr); + void setHasError(bool hasError) { m_bHasError = hasError; } + + private: + Listener m_pApplicationListener; +}; + +class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>{ + friend class DrillClientImpl; + public: + DrillClientQueryResult(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnQueryResultsListener listener, void* listenerCtx): + DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>(client, coordId, query, listener, listenerCtx), m_numBatches(0), m_columnDefs(new std::vector<Drill::FieldMetadata*>), m_bIsQueryPending(true), m_bIsLastChunk(false), - m_bCancel(false), m_bHasSchemaChanged(false), m_bHasData(false), - m_bHasError(false), m_queryState(exec::shared::QueryResult_QueryState_STARTING), - m_pError(NULL), m_pQueryId(NULL), - m_pSchemaListener(NULL), - m_pResultsListener(NULL), - m_pListenerCtx(NULL) { + m_pSchemaListener(NULL) { }; ~DrillClientQueryResult(){ @@ -128,20 +205,15 @@ class DrillClientQueryResult{ }; // get data asynchronously - void registerListener(pfnQueryResultsListener listener, void* listenerCtx){ - this->m_pResultsListener=listener; - this->m_pListenerCtx = listenerCtx; - } - void registerSchemaChangeListener(pfnSchemaListener l){ m_pSchemaListener=l; } - // Synchronous call to get data. Caller assumes ownership of the recod batch + // Synchronous call to get data. Caller assumes ownership of the record batch // returned and it is assumed to have been consumed. RecordBatch* getNext(); // Synchronous call to get a look at the next Record Batch. This - // call does not move the current pointer forward. Repeatied calls + // call does not move the current pointer forward. Repeated calls // to peekNext return the same value until getNext is called. RecordBatch* peekNext(); // Blocks until data is available. @@ -150,32 +222,26 @@ class DrillClientQueryResult{ // placeholder to return an empty col def vector when calls are made out of order. static FieldDefPtr s_emptyColDefs; - FieldDefPtr getColumnDefs(){ + FieldDefPtr getColumnDefs() { boost::lock_guard<boost::mutex> bufferLock(this->m_schemaMutex); return this->m_columnDefs; } - void cancel(); - bool isCancelled(){return this->m_bCancel;}; - bool hasSchemaChanged(){return this->m_bHasSchemaChanged;}; - int32_t getCoordinationId(){ return this->m_coordinationId;} - const std::string& getQuery(){ return this->m_query;} + bool hasSchemaChanged() const {return this->m_bHasSchemaChanged;}; void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;} - void* getListenerContext() {return this->m_pListenerCtx;} - exec::shared::QueryId& getQueryId(){ return *(this->m_pQueryId); } - bool hasError(){ return m_bHasError;} - status_t getErrorStatus(){ return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;} - const DrillClientError* getError(){ return m_pError;} - void setQueryStatus(status_t s){ m_status = s;} - status_t getQueryStatus(){ return m_status;} + exec::shared::QueryId& getQueryId() const { return *(this->m_pQueryId); } void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;} - exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;} + exec::shared::QueryResult_QueryState getQueryState() const { return m_queryState;} void setIsQueryPending(bool isPending){ boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex); m_bIsQueryPending=isPending; } + protected: + virtual status_t notifyListener(RecordBatch* batch, DrillClientError* pErr); + virtual void signalError(DrillClientError* pErr); + virtual void clearAndDestroy(); private: status_t setupColumnDefs(exec::shared::QueryData* pQueryData); @@ -183,15 +249,7 @@ class DrillClientQueryResult{ // Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables. // Also used when a query is cancelled or when a query completed response is received. // Error object is now owned by the DrillClientQueryResult object. - void signalError(DrillClientError* pErr); void signalComplete(); - void clearAndDestroy(); - - - DrillClientImpl* m_pClient; - - int32_t m_coordinationId; - const std::string& m_query; size_t m_numBatches; // number of record batches received so far @@ -213,28 +271,90 @@ class DrillClientQueryResult{ // if m_bIsQueryPending is true, we continue to wait for results bool m_bIsQueryPending; bool m_bIsLastChunk; - bool m_bCancel; bool m_bHasSchemaChanged; bool m_bHasData; - bool m_bHasError; // state in the last query result received from the server. exec::shared::QueryResult_QueryState m_queryState; - const DrillClientError* m_pError; - exec::shared::QueryId* m_pQueryId; - status_t m_status; // Schema change listener pfnSchemaListener m_pSchemaListener; - // Results callback - pfnQueryResultsListener m_pResultsListener; +}; + +class DrillClientPrepareHandle: public DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>, public PreparedStatement { + public: + DrillClientPrepareHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnPreparedStatementListener listener, void* listenerCtx): + DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>(client, coordId, query, listener, listenerCtx), + PreparedStatement(), + m_columnDefs(new std::vector<Drill::FieldMetadata*>) { + }; + + // PreparedStatement overrides + virtual std::size_t getNumFields() const { return m_columnDefs->size(); } + virtual const Drill::FieldMetadata& getFieldMetadata(std::size_t index) const { return *m_columnDefs->at(index);} + + protected: + virtual void clearAndDestroy(); + + private: + friend class DrillClientImpl; + status_t setupPreparedStatement(const exec::user::PreparedStatement& pstmt); + + FieldDefPtr m_columnDefs; + ::exec::user::PreparedStatementHandle m_preparedStatementHandle; +}; + +template<typename Listener, typename MetaType, typename MetaImpl, typename MetadataResult> +class DrillClientMetadataResult: public DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*> { +public: + DrillClientMetadataResult(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* listenerCtx): + DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*>(client, coordId, query, listener, listenerCtx) {} + + void attachMetadataResult(MetadataResult* result) { this->m_pMetadata.reset(result); } + +private: + friend class DrillClientImpl; + + // Meta informations returned to the user, linked to the handle + DrillVector<MetaType, MetaImpl> m_meta; + + // to keep a reference on the underlying metadata object, and + // make sure it's clean when this handle is destroyed + boost::shared_ptr<MetadataResult> m_pMetadata; + +}; + +class DrillClientCatalogResult: public DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp> { + friend class DrillClientImpl; +public: + DrillClientCatalogResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx): + DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp>(client, coordId, "getCatalog", listener, listenerCtx) {} +}; + +class DrillClientSchemaResult: public DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp> { + friend class DrillClientImpl; +public: + DrillClientSchemaResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx): + DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp>(client, coordId, "getSchemas", listener, listenerCtx) {} +}; + +class DrillClientTableResult: public DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp> { + friend class DrillClientImpl; +public: + DrillClientTableResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnTableMetadataListener listener, void* listenerCtx): + DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp>(client, coordId, "getTables", listener, listenerCtx) {} +}; - // Listener context - void * m_pListenerCtx; +class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp> { + friend class DrillClientImpl; + public: + DrillClientColumnResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnColumnMetadataListener listener, void* listenerCtx): + DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {} }; + class DrillClientImpl : public DrillClientImplBase{ public: DrillClientImpl(): @@ -250,7 +370,8 @@ class DrillClientImpl : public DrillClientImplBase{ m_deadlineTimer(m_io_service), m_heartbeatTimer(m_io_service), m_rbuf(NULL), - m_wbuf(MAX_SOCK_RD_BUFSIZE) + m_wbuf(MAX_SOCK_RD_BUFSIZE), + m_bIsDirectConnection(false) { m_coordinationId=rand()%1729+1; }; @@ -300,14 +421,24 @@ class DrillClientImpl : public DrillClientImplBase{ void Close() ; DrillClientError* getError(){ return m_pError;} DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx); + DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx); + DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx); + void waitForResults(); connectionStatus_t validateHandshake(DrillUserProperties* props); - void freeQueryResources(DrillClientQueryResult* pQryResult){ - // Doesn't need to do anything - return; + void freeQueryResources(DrillClientQueryHandle* pQryHandle){ + delete pQryHandle; }; + + const exec::user::RpcEndpointInfos& getServerInfos() const { return m_serverInfos; } + + meta::DrillMetadata* getMetadata(); + + void freeMetadata(meta::DrillMetadata* metadata); private: + friend class meta::DrillMetadata; + friend class DrillClientQueryHandle; friend class DrillClientQueryResult; friend class PooledDrillClientImpl; @@ -327,8 +458,8 @@ class DrillClientImpl : public DrillClientImplBase{ int32_t getNextCoordinationId(){ return ++m_coordinationId; }; // send synchronous messages - //connectionStatus_t recvSync(InBoundRpcMessage& msg); - connectionStatus_t sendSync(OutBoundRpcMessage& msg); + //connectionStatus_t recvSync(rpc::InBoundRpcMessage& msg); + connectionStatus_t sendSync(rpc::OutBoundRpcMessage& msg); // handshake connectionStatus_t recvHandshake(); void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred ); @@ -340,45 +471,54 @@ class DrillClientImpl : public DrillClientImplBase{ status_t readMsg( ByteBuf_t _buf, AllocatedBufferPtr* allocatedBuffer, - InBoundRpcMessage& msg, - boost::system::error_code& error); - status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); - status_t processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); + rpc::InBoundRpcMessage& msg); + status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg); + status_t processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg); status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr); - status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ); - DrillClientQueryResult* findQueryResult(exec::shared::QueryId& qid); + status_t processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + DrillClientQueryResult* findQueryResult(const exec::shared::QueryId& qid); status_t processQueryStatusResult( exec::shared::QueryResult* qr, DrillClientQueryResult* pDrillClientQueryResult); void handleReadTimeout(const boost::system::error_code & err); void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ; - status_t validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valError); - status_t validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError); - connectionStatus_t handleConnError(connectionStatus_t status, std::string msg); - status_t handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult); - status_t handleQryError(status_t status, - const exec::shared::DrillPBError& e, - DrillClientQueryResult* pQueryResult); - // handle query state indicating query is COMPELTED or CANCELED - // (i.e., COMPELTED or CANCELED) + status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError); + status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError); + connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg); + status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle); + status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle); + // handle query state indicating query is COMPLETED or CANCELED + // (i.e., COMPLETED or CANCELED) status_t handleTerminatedQryState(status_t status, - std::string msg, + const std::string& msg, DrillClientQueryResult* pQueryResult); void broadcastError(DrillClientError* pErr); - void clearMapEntries(DrillClientQueryResult* pQueryResult); - void sendAck(InBoundRpcMessage& msg, bool isOk); - void sendCancel(exec::shared::QueryId* pQueryId); + void removeQueryHandle(DrillClientQueryHandle* pQueryHandle); + void removeQueryResult(DrillClientQueryResult* pQueryResult); + void sendAck(const rpc::InBoundRpcMessage& msg, bool isOk); + void sendCancel(const exec::shared::QueryId* pQueryId); - void shutdownSocket(); + template<typename Handle> + Handle* sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& msg); + // metadata requests + DrillClientCatalogResult* getCatalogs(const std::string& catalogPattern, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx); + DrillClientSchemaResult* getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx); + DrillClientTableResult* getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, Metadata::pfnTableMetadataListener listener, void* listenerCtx); + DrillClientColumnResult* getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::string& columnPattern, Metadata::pfnColumnMetadataListener listener, void* listenerCtx); - static RpcEncoder s_encoder; - static RpcDecoder s_decoder; + void shutdownSocket(); int32_t m_coordinationId; int32_t m_handshakeVersion; exec::user::HandshakeStatus m_handshakeStatus; std::string m_handshakeErrorId; std::string m_handshakeErrorMsg; + exec::user::RpcEndpointInfos m_serverInfos; bool m_bIsConnected; std::string m_connectStr; @@ -418,8 +558,8 @@ class DrillClientImpl : public DrillClientImplBase{ // Mutex to protect drill client operations boost::mutex m_dcMutex; - // Map of coordination id to Query Ids. - std::map<int, DrillClientQueryResult*> m_queryIds; + // Map of coordination id to Query handles. + std::map<int, DrillClientQueryHandle*> m_queryHandles; // Map of query id to query result for currently executing queries std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults; @@ -431,7 +571,7 @@ class DrillClientImpl : public DrillClientImplBase{ }; inline bool DrillClientImpl::Active() { - return this->m_bIsConnected;; + return this->m_bIsConnected; } @@ -442,17 +582,17 @@ inline bool DrillClientImpl::Active() { * */ class PooledDrillClientImpl : public DrillClientImplBase{ public: - PooledDrillClientImpl(){ - m_bIsDirectConnection=false; - m_maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS; + PooledDrillClientImpl(): + m_lastConnection(-1), + m_queriesExecuted(0), + m_maxConcurrentConnections(DEFAULT_MAX_CONCURRENT_CONNECTIONS), + m_bIsDirectConnection(false), + m_pError(NULL), + m_pUserProperties() { char* maxConn=std::getenv(MAX_CONCURRENT_CONNECTIONS_ENV); if(maxConn!=NULL){ m_maxConcurrentConnections=atoi(maxConn); } - m_lastConnection=-1; - m_pError=NULL; - m_queriesExecuted=0; - m_pUserProperties=NULL; } ~PooledDrillClientImpl(){ @@ -460,7 +600,6 @@ class PooledDrillClientImpl : public DrillClientImplBase{ delete *it; } m_clientConnections.clear(); - if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;} if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} } @@ -482,15 +621,22 @@ class PooledDrillClientImpl : public DrillClientImplBase{ // Connections once added to the pool will be removed only when the DrillClient is closed. DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx); + DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx); + DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx); + //Waits as long as any one drillbit connection has results pending void waitForResults(); //Validates handshake only against the first drillbit connected to. connectionStatus_t validateHandshake(DrillUserProperties* props); - void freeQueryResources(DrillClientQueryResult* pQryResult); + void freeQueryResources(DrillClientQueryHandle* pQueryHandle); + + int getDrillbitCount() const { return m_drillbits.size();}; + + meta::DrillMetadata* getMetadata(); - int getDrillbitCount(){ return m_drillbits.size();}; + void freeMetadata(meta::DrillMetadata* metadata); private: @@ -502,9 +648,6 @@ class PooledDrillClientImpl : public DrillClientImplBase{ // is currently executing. If none, std::vector<DrillClientImpl*> m_clientConnections; boost::mutex m_poolMutex; // protect access to the vector - - //ZookeeperImpl zook; - // Use this to decide which drillbit to select next from the list of drillbits. size_t m_lastConnection; boost::mutex m_cMutex; @@ -524,44 +667,7 @@ class PooledDrillClientImpl : public DrillClientImplBase{ std::vector<std::string> m_drillbits; - DrillUserProperties* m_pUserProperties;//Keep a copy of user properties -}; - -class ZookeeperImpl{ - public: - ZookeeperImpl(); - ~ZookeeperImpl(); - static ZooLogLevel getZkLogLevel(); - // comma separated host:port pairs, each corresponding to a zk - // server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002 - DEPRECATED int connectToZookeeper(const char* connectStr, const char* pathToDrill); - void close(); - static void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context); - void debugPrint(); - std::string& getError(){return m_err;} - const exec::DrillbitEndpoint& getEndPoint(){ return m_drillServiceInstance.endpoint();} - // return unshuffled list of drillbits - int getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits); - // picks the index drillbit and returns the corresponding endpoint object - int getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint); - - - private: - static char s_drillRoot[]; - static char s_defaultCluster[]; - zhandle_t* m_zh; - clientid_t m_id; - int m_state; - std::string m_err; - - struct String_vector* m_pDrillbits; - - boost::mutex m_cvMutex; - // Condition variable to signal connection callback has been processed - boost::condition_variable m_cv; - bool m_bConnecting; - exec::DrillServiceInstance m_drillServiceInstance; - std::string m_rootDir; + boost::shared_ptr<DrillUserProperties> m_pUserProperties;//Keep a copy of user properties }; } // namespace Drill |