diff options
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.hpp')
-rw-r--r-- | contrib/native/client/src/clientlib/drillClientImpl.hpp | 41 |
1 files changed, 30 insertions, 11 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index d37076e6a..852233f8b 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -368,16 +368,17 @@ class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnCol DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {} }; +// Length Decoder Function Pointer definition +typedef size_t (DrillClientImpl::*lengthDecoder)(const ByteBuf_t, uint32_t&); class DrillClientImpl : public DrillClientImplBase{ public: DrillClientImpl(): - m_coordinationId(1), m_handshakeVersion(0), m_handshakeStatus(exec::user::SUCCESS), m_bIsConnected(false), m_saslAuthenticator(NULL), - m_saslResultCode(SASL_OK), + m_saslResultCode(SASL_OK), m_saslDone(false), m_pendingRequests(0), m_pError(NULL), @@ -388,9 +389,11 @@ class DrillClientImpl : public DrillClientImplBase{ m_heartbeatTimer(m_io_service), m_rbuf(NULL), m_wbuf(MAX_SOCK_RD_BUFSIZE), - m_bIsDirectConnection(false) + m_bIsDirectConnection(false) { m_coordinationId=rand()%1729+1; + m_fpCurrentReadMsgHandler = &DrillClientImpl::readMsg; + m_fpCurrentSendHandler = &DrillClientImpl::sendSyncPlain; }; ~DrillClientImpl(){ @@ -477,9 +480,10 @@ class DrillClientImpl : public DrillClientImplBase{ void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out. int32_t getNextCoordinationId(){ return ++m_coordinationId; }; - // send synchronous messages - //connectionStatus_t recvSync(rpc::InBoundRpcMessage& msg); - connectionStatus_t sendSync(rpc::OutBoundRpcMessage& msg); + // synchronous message send handlers + connectionStatus_t sendSyncCommon(rpc::OutBoundRpcMessage& msg); + connectionStatus_t sendSyncPlain(); + connectionStatus_t sendSyncEncrypted(); // handshake connectionStatus_t recvHandshake(); void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred ); @@ -488,10 +492,16 @@ class DrillClientImpl : public DrillClientImplBase{ void startMessageListener(); // Query results void getNextResult(); - status_t readMsg( - ByteBuf_t _buf, - AllocatedBufferPtr* allocatedBuffer, - rpc::InBoundRpcMessage& msg); + // Read Message Handlers + status_t readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg); + status_t readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg); + status_t readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes, + uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler); + void doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead, boost::system::error_code& errorCode); + void doWriteToSocket(const char* dataPtr, size_t bytesToWrite, boost::system::error_code& errorCode); + // Length decode handlers + size_t lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen); + size_t rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen); status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg); status_t processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg); status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr); @@ -506,7 +516,7 @@ class DrillClientImpl : public DrillClientImplBase{ status_t processQueryStatusResult( exec::shared::QueryResult* qr, DrillClientQueryResult* pDrillClientQueryResult); void handleReadTimeout(const boost::system::error_code & err); - void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ; + void handleRead(ByteBuf_t inBuf, const boost::system::error_code & err, size_t bytes_transferred) ; status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError); status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError); connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg); @@ -540,6 +550,7 @@ class DrillClientImpl : public DrillClientImplBase{ void finishAuthentication(); void shutdownSocket(); + bool clientNeedsEncryption(const DrillUserProperties* userProperties); int32_t m_coordinationId; int32_t m_handshakeVersion; @@ -557,6 +568,14 @@ class DrillClientImpl : public DrillClientImplBase{ boost::mutex m_saslMutex; // mutex to protect m_saslDone boost::condition_variable m_saslCv; // to signal completion of SASL exchange + // Used for encryption and is set when server notifies in first handshake response. + EncryptionContext m_encryptionCtxt; + + // Function pointer for read and send handler. By default these are referred to handler for plain message read/send. When encryption is enabled + // then after successful handshake these pointers refer to handler for encrypted message read/send over wire. + status_t (DrillClientImpl::*m_fpCurrentReadMsgHandler)(ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg); + connectionStatus_t (DrillClientImpl::*m_fpCurrentSendHandler)(); + std::string m_connectStr; // |