aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/channel.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/channel.hpp')
-rw-r--r--contrib/native/client/src/clientlib/channel.hpp237
1 files changed, 237 insertions, 0 deletions
diff --git a/contrib/native/client/src/clientlib/channel.hpp b/contrib/native/client/src/clientlib/channel.hpp
new file mode 100644
index 000000000..7f310e899
--- /dev/null
+++ b/contrib/native/client/src/clientlib/channel.hpp
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CHANNEL_HPP
+#define CHANNEL_HPP
+
+#include "drill/common.hpp"
+#include "drill/drillClient.hpp"
+#include "streamSocket.hpp"
+
+namespace Drill {
+
+class UserProperties;
+
+ class ConnectionEndpoint{
+ public:
+ ConnectionEndpoint(const char* connStr);
+ ConnectionEndpoint(const char* host, const char* port);
+ ~ConnectionEndpoint();
+
+ //parse the connection string and set up the host and port to connect to
+ connectionStatus_t getDrillbitEndpoint();
+
+ std::string& getProtocol(){return m_protocol;}
+ std::string& getHost(){return m_host;}
+ std::string& getPort(){return m_port;}
+ DrillClientError* getError(){ return m_pError;};
+
+ private:
+ void parseConnectString();
+ bool isDirectConnection();
+ bool isZookeeperConnection();
+ connectionStatus_t getDrillbitEndpointFromZk();
+ connectionStatus_t handleError(connectionStatus_t status, std::string msg);
+
+ std::string m_connectString;
+ std::string m_pathToDrill;
+ std::string m_protocol;
+ std::string m_hostPortStr;
+ std::string m_host;
+ std::string m_port;
+
+ DrillClientError* m_pError;
+
+ };
+
+ class ChannelContext{
+ public:
+ ChannelContext(DrillUserProperties* props):m_properties(props){};
+ virtual ~ChannelContext(){};
+ const DrillUserProperties* getUserProperties() const { return m_properties;}
+ protected:
+ DrillUserProperties* m_properties;
+ };
+
+ class SSLChannelContext: public ChannelContext{
+ public:
+ static boost::asio::ssl::context::method getTlsVersion(std::string version){
+ if(version.empty()){
+ return boost::asio::ssl::context::tlsv12;
+ } else if (version == "tlsv12") {
+ return boost::asio::ssl::context::tlsv12;
+ } else if (version == "tlsv11") {
+ return boost::asio::ssl::context::tlsv11;
+ } else if (version == "sslv23") {
+ return boost::asio::ssl::context::sslv23;
+ } else if (version == "tlsv1") {
+ return boost::asio::ssl::context::tlsv1;
+ } else if (version == "sslv3") {
+ return boost::asio::ssl::context::sslv3;
+ } else {
+ return boost::asio::ssl::context::tlsv12;
+ }
+ }
+
+ SSLChannelContext(DrillUserProperties *props, boost::asio::ssl::context::method tlsVersion, boost::asio::ssl::verify_mode verifyMode) :
+ ChannelContext(props),
+ m_SSLContext(tlsVersion) {
+ m_SSLContext.set_default_verify_paths();
+ m_SSLContext.set_options(
+ boost::asio::ssl::context::default_workarounds
+ | boost::asio::ssl::context::no_sslv2
+ | boost::asio::ssl::context::single_dh_use
+ );
+ m_SSLContext.set_verify_mode(verifyMode);
+ };
+ ~SSLChannelContext(){};
+ boost::asio::ssl::context& getSslContext(){ return m_SSLContext;}
+ private:
+ boost::asio::ssl::context m_SSLContext;
+ };
+
+ typedef ChannelContext ChannelContext_t;
+ typedef SSLChannelContext SSLChannelContext_t;
+
+ class ChannelContextFactory{
+ public:
+ static ChannelContext_t* getChannelContext(channelType_t t, DrillUserProperties* props);
+ };
+
+ /***
+ * The Channel class encapsulates a connection to a drillbit. Based on
+ * the connection string and the options, the connection will be either
+ * a simple socket or a socket using an ssl stream. The class also encapsulates
+ * connecting to a drillbit directly or thru zookeeper.
+ * The channel class owns the socket and the io_service that the applications
+ * will use to communicate with the server.
+ ***/
+ class Channel{
+ public:
+ Channel(const char* connStr);
+ Channel(const char* host, const char* port);
+ Channel(boost::asio::io_service& ioService, const char* connStr);
+ Channel(boost::asio::io_service& ioService, const char* host, const char* port);
+ virtual ~Channel();
+ virtual connectionStatus_t init(ChannelContext_t* context)=0;
+ connectionStatus_t connect();
+ bool isConnected(){ return m_state == CHANNEL_CONNECTED;}
+ template <typename SettableSocketOption> void setOption(SettableSocketOption& option);
+ DrillClientError* getError(){ return m_pError;}
+ void close(){
+ if(m_state==CHANNEL_INITIALIZED||m_state==CHANNEL_CONNECTED){
+ m_pSocket->protocolClose();
+ m_state=CHANNEL_CLOSED;
+ }
+ } // Not OK to use the channel after this call.
+
+ boost::asio::io_service& getIOService(){
+ return m_ioService;
+ }
+
+ // returns a reference to the underlying socket
+ // This access should really be removed and encapsulated in calls that
+ // manage async_send and async_recv
+ // Until then we will let DrillClientImpl have direct access
+ streamSocket_t& getInnerSocket(){
+ return m_pSocket->getInnerSocket();
+ }
+
+ AsioStreamSocket& getSocketStream(){
+ return *m_pSocket;
+ }
+
+ ConnectionEndpoint* getEndpoint(){return m_pEndpoint;}
+
+ protected:
+ connectionStatus_t handleError(connectionStatus_t status, std::string msg);
+
+ boost::asio::io_service& m_ioService;
+ boost::asio::io_service m_ioServiceFallback; // used if m_ioService is not provided
+ AsioStreamSocket* m_pSocket;
+ ConnectionEndpoint *m_pEndpoint;
+ ChannelContext_t *m_pContext;
+
+ private:
+
+ typedef enum channelState{
+ CHANNEL_UNINITIALIZED=1,
+ CHANNEL_INITIALIZED,
+ CHANNEL_CONNECTED,
+ CHANNEL_CLOSED
+ } channelState_t;
+
+ connectionStatus_t connectInternal();
+ connectionStatus_t protocolHandshake(bool useSystemConfig){
+ connectionStatus_t status = CONN_SUCCESS;
+ try{
+ m_pSocket->protocolHandshake(useSystemConfig);
+ } catch (boost::system::system_error e) {
+ status = handleError(CONN_HANDSHAKE_FAILED, e.what());
+ }
+ return status;
+ }
+
+ channelState_t m_state;
+ DrillClientError* m_pError;
+ bool m_ownIoService;
+ };
+
+ class SocketChannel: public Channel{
+ public:
+ SocketChannel(const char* connStr):Channel(connStr){
+ }
+ SocketChannel(const char* host, const char* port):Channel(host, port){
+ }
+ SocketChannel(boost::asio::io_service& ioService, const char* connStr)
+ :Channel(ioService, connStr){
+ }
+ SocketChannel(boost::asio::io_service& ioService, const char* host, const char* port)
+ :Channel(ioService, host, port){
+ }
+ connectionStatus_t init(ChannelContext_t* context=NULL);
+ };
+
+ class SSLStreamChannel: public Channel{
+ public:
+ SSLStreamChannel(const char* connStr):Channel(connStr){
+ }
+ SSLStreamChannel(const char* host, const char* port):Channel(host, port){
+ }
+ SSLStreamChannel(boost::asio::io_service& ioService, const char* connStr)
+ :Channel(ioService, connStr){
+ }
+ SSLStreamChannel(boost::asio::io_service& ioService, const char* host, const char* port)
+ :Channel(ioService, host, port){
+ }
+ connectionStatus_t init(ChannelContext_t* context);
+ };
+
+ class ChannelFactory{
+ public:
+ static Channel* getChannel(channelType_t t, const char* connStr);
+ static Channel* getChannel(channelType_t t, const char* host, const char* port);
+ static Channel* getChannel(channelType_t t, boost::asio::io_service& ioService, const char* connStr);
+ static Channel* getChannel(channelType_t t, boost::asio::io_service& ioService, const char* host, const char* port);
+ };
+
+
+} // namespace Drill
+
+#endif // CHANNEL_HPP
+