aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/drillClientImpl.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/drillClientImpl.hpp')
-rw-r--r--contrib/native/client/src/clientlib/drillClientImpl.hpp41
1 files changed, 30 insertions, 11 deletions
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index d37076e6a..852233f8b 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -368,16 +368,17 @@ class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnCol
DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {}
};
+// Length Decoder Function Pointer definition
+typedef size_t (DrillClientImpl::*lengthDecoder)(const ByteBuf_t, uint32_t&);
class DrillClientImpl : public DrillClientImplBase{
public:
DrillClientImpl():
- m_coordinationId(1),
m_handshakeVersion(0),
m_handshakeStatus(exec::user::SUCCESS),
m_bIsConnected(false),
m_saslAuthenticator(NULL),
- m_saslResultCode(SASL_OK),
+ m_saslResultCode(SASL_OK),
m_saslDone(false),
m_pendingRequests(0),
m_pError(NULL),
@@ -388,9 +389,11 @@ class DrillClientImpl : public DrillClientImplBase{
m_heartbeatTimer(m_io_service),
m_rbuf(NULL),
m_wbuf(MAX_SOCK_RD_BUFSIZE),
- m_bIsDirectConnection(false)
+ m_bIsDirectConnection(false)
{
m_coordinationId=rand()%1729+1;
+ m_fpCurrentReadMsgHandler = &DrillClientImpl::readMsg;
+ m_fpCurrentSendHandler = &DrillClientImpl::sendSyncPlain;
};
~DrillClientImpl(){
@@ -477,9 +480,10 @@ class DrillClientImpl : public DrillClientImplBase{
void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
int32_t getNextCoordinationId(){ return ++m_coordinationId; };
- // send synchronous messages
- //connectionStatus_t recvSync(rpc::InBoundRpcMessage& msg);
- connectionStatus_t sendSync(rpc::OutBoundRpcMessage& msg);
+ // synchronous message send handlers
+ connectionStatus_t sendSyncCommon(rpc::OutBoundRpcMessage& msg);
+ connectionStatus_t sendSyncPlain();
+ connectionStatus_t sendSyncEncrypted();
// handshake
connectionStatus_t recvHandshake();
void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred );
@@ -488,10 +492,16 @@ class DrillClientImpl : public DrillClientImplBase{
void startMessageListener();
// Query results
void getNextResult();
- status_t readMsg(
- ByteBuf_t _buf,
- AllocatedBufferPtr* allocatedBuffer,
- rpc::InBoundRpcMessage& msg);
+ // Read Message Handlers
+ status_t readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+ status_t readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+ status_t readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes,
+ uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler);
+ void doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead, boost::system::error_code& errorCode);
+ void doWriteToSocket(const char* dataPtr, size_t bytesToWrite, boost::system::error_code& errorCode);
+ // Length decode handlers
+ size_t lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen);
+ size_t rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen);
status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
status_t processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
@@ -506,7 +516,7 @@ class DrillClientImpl : public DrillClientImplBase{
status_t processQueryStatusResult( exec::shared::QueryResult* qr,
DrillClientQueryResult* pDrillClientQueryResult);
void handleReadTimeout(const boost::system::error_code & err);
- void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
+ void handleRead(ByteBuf_t inBuf, const boost::system::error_code & err, size_t bytes_transferred) ;
status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
@@ -540,6 +550,7 @@ class DrillClientImpl : public DrillClientImplBase{
void finishAuthentication();
void shutdownSocket();
+ bool clientNeedsEncryption(const DrillUserProperties* userProperties);
int32_t m_coordinationId;
int32_t m_handshakeVersion;
@@ -557,6 +568,14 @@ class DrillClientImpl : public DrillClientImplBase{
boost::mutex m_saslMutex; // mutex to protect m_saslDone
boost::condition_variable m_saslCv; // to signal completion of SASL exchange
+ // Used for encryption and is set when server notifies in first handshake response.
+ EncryptionContext m_encryptionCtxt;
+
+ // Function pointer for read and send handler. By default these are referred to handler for plain message read/send. When encryption is enabled
+ // then after successful handshake these pointers refer to handler for encrypted message read/send over wire.
+ status_t (DrillClientImpl::*m_fpCurrentReadMsgHandler)(ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
+ connectionStatus_t (DrillClientImpl::*m_fpCurrentSendHandler)();
+
std::string m_connectStr;
//