diff options
33 files changed, 4794 insertions, 1215 deletions
diff --git a/contrib/native/client/CMakeLists.txt b/contrib/native/client/CMakeLists.txt index e61eb9c9b..7e22ce88f 100644 --- a/contrib/native/client/CMakeLists.txt +++ b/contrib/native/client/CMakeLists.txt @@ -57,10 +57,9 @@ project(drillclient ) message("Project Dir = ${PROJECT_SOURCE_DIR}") -message("Project version = ${PROJECT_VERSION} ") +message("Project Version = ${PROJECT_VERSION} ") message("Source Dir = ${CMAKE_SOURCE_DIR} ") -cmake_policy(SET CMP0043 NEW) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmakeModules/") @@ -71,6 +70,12 @@ execute_process( OUTPUT_VARIABLE GIT_COMMIT_PROP OUTPUT_STRIP_TRAILING_WHITESPACE ) +execute_process( + COMMAND git log -1 --format="%H" + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} + OUTPUT_VARIABLE GIT_SHA_PROP + OUTPUT_STRIP_TRAILING_WHITESPACE + ) STRING(REPLACE . " " GIT_COMMIT_PROP "${GIT_COMMIT_PROP}") STRING(REPLACE \" "" GIT_COMMIT_PROP "${GIT_COMMIT_PROP}") set(GIT_COMMIT_PROP "\"${GIT_COMMIT_PROP}\"") diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp index d507d1bb2..306db5669 100644 --- a/contrib/native/client/example/querySubmitter.cpp +++ b/contrib/native/client/example/querySubmitter.cpp @@ -316,8 +316,8 @@ int main(int argc, char* argv[]) { std::vector<Drill::RecordIterator*> recordIterators; std::vector<Drill::RecordIterator*>::iterator recordIterIter; - std::vector<Drill::QueryHandle_t*> queryHandles; - std::vector<Drill::QueryHandle_t*>::iterator queryHandleIter; + std::vector<Drill::QueryHandle_t> queryHandles; + std::vector<Drill::QueryHandle_t>::iterator queryHandleIter; Drill::DrillClient client; #if defined _WIN32 || defined _WIN64 @@ -327,7 +327,7 @@ int main(int argc, char* argv[]) { strcpy(logpathPrefix,tempPath); strcat(logpathPrefix, "\\drillclient"); #else - char* logpathPrefix = "/var/log/drill/drillclient"; + const char* logpathPrefix = "/var/log/drill/drillclient"; #endif // To log to file Drill::DrillClient::initLogging(logpathPrefix, l); @@ -411,27 +411,25 @@ int main(int argc, char* argv[]) { }else{ if(bSyncSend){ for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) { - Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t; - client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle); - client.registerSchemaChangeListener(qryHandle, SchemaListener); + Drill::QueryHandle_t qryHandle; + client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, &qryHandle); + client.registerSchemaChangeListener(&qryHandle, SchemaListener); client.waitForResults(); - client.freeQueryResources(qryHandle); - delete qryHandle; + client.freeQueryResources(&qryHandle); } }else{ for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) { - Drill::QueryHandle_t* qryHandle = new Drill::QueryHandle_t; - client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle); - client.registerSchemaChangeListener(qryHandle, SchemaListener); + Drill::QueryHandle_t qryHandle; + client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, &qryHandle); + client.registerSchemaChangeListener(&qryHandle, SchemaListener); queryHandles.push_back(qryHandle); } client.waitForResults(); for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) { - client.freeQueryResources(*queryHandleIter); - delete *queryHandleIter; + client.freeQueryResources(&*queryHandleIter); } } } diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt b/contrib/native/client/src/clientlib/CMakeLists.txt index a2e705273..68326e2fc 100644 --- a/contrib/native/client/src/clientlib/CMakeLists.txt +++ b/contrib/native/client/src/clientlib/CMakeLists.txt @@ -22,12 +22,14 @@ set (CLIENTLIB_SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/decimalUtils.cpp ${CMAKE_CURRENT_SOURCE_DIR}/drillClient.cpp ${CMAKE_CURRENT_SOURCE_DIR}/drillClientImpl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/fieldmeta.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/metadata.cpp ${CMAKE_CURRENT_SOURCE_DIR}/recordBatch.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/rpcEncoder.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/rpcMessage.cpp ${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/logger.cpp ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/zookeeperClient.cpp ) include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include ) @@ -43,7 +45,7 @@ set_property( if(MSVC) set(CMAKE_CXX_FLAGS "/EHsc") - add_definitions(-DDRILL_CLIENT_EXPORTS) + add_definitions(-DDRILL_CLIENT_EXPORTS -D_SCL_SECURE_NO_WARNINGS) endif() add_library(drillClient SHARED ${CLIENTLIB_SRC_FILES} ) diff --git a/contrib/native/client/src/clientlib/collectionsImpl.hpp b/contrib/native/client/src/clientlib/collectionsImpl.hpp new file mode 100644 index 000000000..be1b54f48 --- /dev/null +++ b/contrib/native/client/src/clientlib/collectionsImpl.hpp @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DRILL_COLLECTIONSIMPL_H +#define DRILL_COLLECTIONSIMPL_H + +#include <iterator> +#include <drill/collections.hpp> + +namespace Drill { +namespace impl { +template<typename T, typename Iterator> +class DrillContainerIterator: public DrillIteratorImpl<T> { +public: + typedef DrillContainerIterator<T, Iterator> type; + typedef DrillIteratorImpl<T> supertype; + typedef typename supertype::iterator iterator; + typedef typename iterator::value_type value_type; + typedef typename iterator::reference reference; + typedef typename iterator::pointer pointer; + + DrillContainerIterator(Iterator it): supertype(), m_it(it) {}; + + operator typename DrillIteratorImpl<const T>::iterator_ptr() const { return typename DrillIteratorImpl<const T>::iterator_ptr(new DrillContainerIterator<const T, Iterator>(m_it)); } + + reference operator*() const { return m_it.operator *();} + pointer operator->() const { return m_it.operator->(); } + + iterator& operator++() { m_it++; return *this; } + + bool operator==(const iterator& x) const { + const type& other(dynamic_cast<const type&>(x)); + return m_it == other.m_it; + } + + bool operator!=(const iterator& x) const { return !(*this==x); } + +private: + Iterator m_it; +}; + +template<typename T, typename Container> +class DrillContainerCollection: public DrillCollectionImpl<T> { +public: + typedef DrillCollectionImpl<T> supertype; + typedef typename supertype::value_type value_type; + typedef typename supertype::iterator iterator; + typedef typename supertype::const_iterator const_iterator; + + typedef typename supertype::iterator_ptr iterator_ptr; + typedef typename supertype::const_iterator_ptr const_iterator_ptr; + + DrillContainerCollection(): supertype(), m_container() {}; + + Container& operator*() { return &m_container; } + const Container& operator*() const { return &m_container; } + Container* operator->() { return &m_container; } + const Container* operator->() const { return &m_container; } + + iterator_ptr begin() { return iterator_ptr(new IteratorImpl(m_container.begin())); } + const_iterator_ptr begin() const { return const_iterator_ptr(new ConstIteratorImpl(m_container.begin())); } + iterator_ptr end() { return iterator_ptr(new IteratorImpl(m_container.end())); } + const_iterator_ptr end() const { return const_iterator_ptr(new ConstIteratorImpl(m_container.end())); } + +private: + typedef DrillContainerIterator<value_type, typename Container::iterator> IteratorImpl; + typedef DrillContainerIterator<const value_type, typename Container::const_iterator> ConstIteratorImpl; + + Container m_container; +}; +} /* namespace impl */ + + +/** + * Drill collection backed up by a vector + * Offer a view over a collection of Iface instances, + * where concrete implementation of Iface is T + */ +template<typename Iface, typename T> +class DrillVector: public DrillCollection<Iface> { +public: + DrillVector(): DrillCollection<Iface>(ImplPtr(new Impl())) {}; + + void clear() { + Impl& impl = static_cast<Impl&>(**this); + impl->clear(); + } + + void push_back( const T& value ) { + Impl& impl = static_cast<Impl&>(**this); + impl->push_back(value); + } + + void reserve(std::size_t new_cap) { + Impl& impl = static_cast<Impl&>(**this); + impl->reserve(new_cap); + } + + +private: + typedef impl::DrillContainerCollection<Iface, std::vector<T> > Impl; + typedef boost::shared_ptr<Impl> ImplPtr; +}; +} + + + +#endif /* DRILL_COLLECTIONSIMPL_H */ diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp index 1251058fc..20a466e68 100644 --- a/contrib/native/client/src/clientlib/drillClient.cpp +++ b/contrib/native/client/src/clientlib/drillClient.cpp @@ -20,11 +20,11 @@ #include <boost/assign.hpp> #include "drill/common.hpp" #include "drill/drillClient.hpp" +#include "drill/fieldmeta.hpp" #include "drill/recordBatch.hpp" #include "drillClientImpl.hpp" #include "errmsgs.hpp" #include "logger.hpp" - #include "Types.pb.h" namespace Drill{ @@ -173,83 +173,86 @@ FieldDefPtr RecordIterator::getColDefs(){ if(m_pQueryResult->hasError()){ return DrillClientQueryResult::s_emptyColDefs; } + + if (this->m_pColDefs != NULL && !this->hasSchemaChanged()) { + return this->m_pColDefs; + } + //NOTE: if query is cancelled, return whatever you have. Client applications job to deal with it. - if(this->m_pColDefs==NULL || this->hasSchemaChanged()){ - if(this->m_pCurrentRecordBatch==NULL){ - this->m_pQueryResult->waitForData(); - if(m_pQueryResult->hasError()){ - return DrillClientQueryResult::s_emptyColDefs; - } - } - if(this->hasSchemaChanged()){ - if(m_pColDefs!=NULL){ - for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin(); - it!=m_pColDefs->end(); - ++it){ - delete *it; - } - m_pColDefs->clear(); - //delete m_pColDefs; m_pColDefs=NULL; - } - } - FieldDefPtr pColDefs( new std::vector<Drill::FieldMetadata*>); - { //lock after we come out of the wait. - boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex); - boost::shared_ptr< std::vector<Drill::FieldMetadata*> > currentColDefs=DrillClientQueryResult::s_emptyColDefs; - if(this->m_pCurrentRecordBatch!=NULL){ - currentColDefs=this->m_pCurrentRecordBatch->getColumnDefs(); - }else{ - // This is reached only when the first results have been received but - // the getNext call has not been made to retrieve the record batch - RecordBatch* pR=this->m_pQueryResult->peekNext(); - if(pR!=NULL){ - currentColDefs=pR->getColumnDefs(); - } - } - for(std::vector<Drill::FieldMetadata*>::iterator it=currentColDefs->begin(); it!=currentColDefs->end(); ++it){ - Drill::FieldMetadata* fmd= new Drill::FieldMetadata; - fmd->copy(*(*it));//Yup, that's 2 stars - pColDefs->push_back(fmd); - } - } - this->m_pColDefs = pColDefs; + if(this->m_pCurrentRecordBatch==NULL){ + this->m_pQueryResult->waitForData(); + if(m_pQueryResult->hasError()){ + return DrillClientQueryResult::s_emptyColDefs; + } + } + if(this->hasSchemaChanged()){ + if(m_pColDefs!=NULL){ + for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin(); + it!=m_pColDefs->end(); + ++it){ + delete *it; + } + m_pColDefs->clear(); + //delete m_pColDefs; m_pColDefs=NULL; + } + } + FieldDefPtr pColDefs( new std::vector<Drill::FieldMetadata*>); + { //lock after we come out of the wait. + boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex); + boost::shared_ptr< std::vector<Drill::FieldMetadata*> > currentColDefs=DrillClientQueryResult::s_emptyColDefs; + if(this->m_pCurrentRecordBatch!=NULL){ + currentColDefs=this->m_pCurrentRecordBatch->getColumnDefs(); + }else{ + // This is reached only when the first results have been received but + // the getNext call has not been made to retrieve the record batch + RecordBatch* pR=this->m_pQueryResult->peekNext(); + if(pR!=NULL){ + currentColDefs=pR->getColumnDefs(); + } + } + for(std::vector<Drill::FieldMetadata*>::const_iterator it=currentColDefs->begin(); it!=currentColDefs->end(); ++it){ + Drill::FieldMetadata* fmd= new Drill::FieldMetadata; + fmd->copy(*(*it));//Yup, that's 2 stars + pColDefs->push_back(fmd); + } } + this->m_pColDefs = pColDefs; return this->m_pColDefs; } status_t RecordIterator::next(){ status_t ret=QRY_SUCCESS; this->m_currentRecord++; + if(this->m_pQueryResult->isCancelled()){ + return QRY_CANCEL; + } - if(!this->m_pQueryResult->isCancelled()){ - if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){ - boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex); - if(this->m_pCurrentRecordBatch !=NULL){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;) - delete this->m_pCurrentRecordBatch; //free the previous record batch - this->m_pCurrentRecordBatch=NULL; - } - this->m_currentRecord=0; - this->m_pQueryResult->waitForData(); - if(m_pQueryResult->hasError()){ - return m_pQueryResult->getErrorStatus(); - } - this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext(); - if(this->m_pCurrentRecordBatch != NULL){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;) - }else{ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;) - } - if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;) - ret = QRY_NO_MORE_DATA; - }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){ - ret=QRY_SUCCESS_WITH_INFO; - } - } - }else{ - ret=QRY_CANCEL; + if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){ + boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex); + if(this->m_pCurrentRecordBatch !=NULL){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;) + delete this->m_pCurrentRecordBatch; //free the previous record batch + this->m_pCurrentRecordBatch=NULL; + } + this->m_currentRecord=0; + this->m_pQueryResult->waitForData(); + if(m_pQueryResult->hasError()){ + return m_pQueryResult->getErrorStatus(); + } + this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext(); + if(this->m_pCurrentRecordBatch != NULL){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;) + }else{ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;) + } + if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;) + ret = QRY_NO_MORE_DATA; + }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){ + ret=QRY_SUCCESS_WITH_INFO; + } } + return ret; } @@ -258,30 +261,28 @@ status_t RecordIterator::getCol(size_t i, void** b, size_t* sz){ //TODO: check fields out of bounds without calling getColDefs //if(i>=getColDefs().size()) return QRY_OUT_OF_BOUNDS; //return raw byte buffer - if(!this->m_pQueryResult->isCancelled()){ - const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector(); - if(!pVector->isNull(this->m_currentRecord)){ - *b=pVector->getRaw(this->m_currentRecord); - *sz=pVector->getSize(this->m_currentRecord); - }else{ - *b=NULL; - *sz=0; - - } - return QRY_SUCCESS; + if(this->m_pQueryResult->isCancelled()){ + return QRY_CANCEL; + } + const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector(); + if(!pVector->isNull(this->m_currentRecord)){ + *b=pVector->getRaw(this->m_currentRecord); + *sz=pVector->getSize(this->m_currentRecord); }else{ - return QRY_CANCEL; + *b=NULL; + *sz=0; } + return QRY_SUCCESS; } /* true if ith column in the current record is NULL. */ bool RecordIterator::isNull(size_t i){ - if(!this->m_pQueryResult->isCancelled()){ - const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector(); - return pVector->isNull(this->m_currentRecord); - }else{ - return false; + if(this->m_pQueryResult->isCancelled()){ + return false; } + + const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector(); + return pVector->isNull(this->m_currentRecord); } status_t RecordIterator::cancel(){ @@ -329,19 +330,15 @@ DrillClient::~DrillClient(){ } connectionStatus_t DrillClient::connect(const char* connectStr, const char* defaultSchema){ - connectionStatus_t ret=CONN_SUCCESS; - ret=this->m_pImpl->connect(connectStr); DrillUserProperties props; std::string schema(defaultSchema); props.setProperty(USERPROP_SCHEMA, schema); - if(ret==CONN_SUCCESS){ - if(defaultSchema!=NULL){ - ret=this->m_pImpl->validateHandshake(&props); - }else{ - ret=this->m_pImpl->validateHandshake(NULL); - } + if (defaultSchema != NULL) { + return connect(connectStr, static_cast<DrillUserProperties*>(NULL)); + } + else { + return connect(connectStr, &props); } - return ret; } connectionStatus_t DrillClient::connect(const char* connectStr, DrillUserProperties* properties){ @@ -366,14 +363,12 @@ void DrillClient::close() { } status_t DrillClient::submitQuery(Drill::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle){ - ::exec::shared::QueryType castedType = static_cast< ::exec::shared::QueryType> (t); DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, plan, listener, listenerCtx); + *qHandle=static_cast<QueryHandle_t>(pResult); if(pResult==NULL){ - *qHandle=NULL; return (status_t)this->m_pImpl->getError()->status; } - *qHandle=(QueryHandle_t)pResult; return QRY_SUCCESS; } @@ -387,14 +382,32 @@ RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string& return pIter; } +status_t DrillClient::prepareQuery(const std::string& sql, pfnPreparedStatementListener listener, void* listenerCtx, QueryHandle_t* qHandle) { + DrillClientPrepareHandle* pResult=this->m_pImpl->PrepareQuery(sql, listener, listenerCtx); + *qHandle=static_cast<QueryHandle_t>(pResult); + if(pResult==NULL){ + return static_cast<status_t>(this->m_pImpl->getError()->status); + } + return QRY_SUCCESS; +} + +status_t DrillClient::executeQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle) { + DrillClientQueryResult* pResult=this->m_pImpl->ExecuteQuery(pstmt, listener, listenerCtx); + *qHandle=static_cast<QueryHandle_t>(pResult); + if(pResult==NULL){ + return static_cast<status_t>(this->m_pImpl->getError()->status); + } + return QRY_SUCCESS; +} + void* DrillClient::getApplicationContext(QueryHandle_t handle){ assert(handle!=NULL); - return ((DrillClientQueryResult*)handle)->getListenerContext(); + return (static_cast<DrillClientQueryHandle*>(handle))->getApplicationContext(); } status_t DrillClient::getQueryStatus(QueryHandle_t handle){ assert(handle!=NULL); - return ((DrillClientQueryResult*)handle)->getQueryStatus(); + return static_cast<DrillClientQueryHandle*>(handle)->getQueryStatus(); } std::string& DrillClient::getError(){ @@ -402,7 +415,7 @@ std::string& DrillClient::getError(){ } const std::string& DrillClient::getError(QueryHandle_t handle){ - return ((DrillClientQueryResult*)handle)->getError()->msg; + return static_cast<DrillClientQueryHandle*>(handle)->getError()->msg; } void DrillClient::waitForResults(){ @@ -410,13 +423,23 @@ void DrillClient::waitForResults(){ } void DrillClient::registerSchemaChangeListener(QueryHandle_t* handle, pfnSchemaListener l){ - if(handle!=NULL){ - ((DrillClientQueryResult*)(*handle))->registerSchemaChangeListener(l); + if (!handle) { + return; + } + + // Let's ensure that handle is really an instance of DrillClientQueryResult + // by using dynamic_cast to verify. Since void is not a class, we first have + // to static_cast to a DrillClientQueryHandle + DrillClientQueryHandle* pHandle = static_cast<DrillClientQueryHandle*>(*handle); + DrillClientQueryResult* result = dynamic_cast<DrillClientQueryResult*>(pHandle); + + if (result) { + result->registerSchemaChangeListener(l); } } void DrillClient::freeQueryResources(QueryHandle_t* handle){ - delete (DrillClientQueryResult*)(*handle); + this->m_pImpl->freeQueryResources(static_cast<DrillClientQueryHandle*>(*handle)); *handle=NULL; } @@ -424,4 +447,12 @@ void DrillClient::freeRecordBatch(RecordBatch* pRecordBatch){ delete pRecordBatch; } +Metadata* DrillClient::getMetadata() { + return this->m_pImpl->getMetadata(); +} + +void DrillClient::freeMetadata(Metadata** metadata) { + this->m_pImpl->freeMetadata(static_cast<meta::DrillMetadata*>(*metadata)); + *metadata = NULL; +} } // namespace Drill diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index b5d5a31e7..7ecf910f9 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -19,32 +19,30 @@ #include "drill/common.hpp" #include <queue> -#include <string.h> +#include <string> #include <boost/asio.hpp> +#include <boost/assign.hpp> #include <boost/bind.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time_duration.hpp> +#include <boost/functional/factory.hpp> #include <boost/lexical_cast.hpp> #include <boost/thread.hpp> -#ifdef _WIN32 -#include <zookeeper.h> -#else -#include <zookeeper/zookeeper.h> -#endif -#include <boost/assign.hpp> + #include "drill/drillClient.hpp" +#include "drill/fieldmeta.hpp" #include "drill/recordBatch.hpp" #include "drillClientImpl.hpp" +#include "collectionsImpl.hpp" #include "errmsgs.hpp" #include "logger.hpp" -#include "rpcEncoder.hpp" -#include "rpcDecoder.hpp" +#include "metadata.hpp" #include "rpcMessage.hpp" #include "utils.hpp" - #include "GeneralRPC.pb.h" #include "UserBitShared.pb.h" +#include "zookeeperClient.hpp" namespace Drill{ @@ -56,70 +54,57 @@ static std::map<exec::shared::QueryResult_QueryState, status_t> QUERYSTATE_TO_ST (exec::shared::QueryResult_QueryState_FAILED, QRY_FAILED) ; -RpcEncoder DrillClientImpl::s_encoder; -RpcDecoder DrillClientImpl::s_decoder; - -std::string debugPrintQid(const exec::shared::QueryId& qid){ +static std::string debugPrintQid(const exec::shared::QueryId& qid){ return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] "); } -void setSocketTimeout(boost::asio::ip::tcp::socket& socket, int32_t timeout){ -#if defined _WIN32 - int32_t timeoutMsecs=timeout*1000; - setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs)); - setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeoutMsecs, sizeof(timeoutMsecs)); -#else - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - int e=0; - e=setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - e=setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); -#endif -} - connectionStatus_t DrillClientImpl::connect(const char* connStr){ std::string pathToDrill, protocol, hostPortStr; std::string host; std::string port; - if(!this->m_bIsConnected){ - m_connectStr=connStr; - Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); - if(!strcmp(protocol.c_str(), "zk")){ - ZookeeperImpl zook; - std::vector<std::string> drillbits; - int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + + if (this->m_bIsConnected) { + if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected + return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN)); + } + return CONN_SUCCESS; + } + + m_connectStr=connStr; + Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); + if(protocol == "zk"){ + ZookeeperClient zook(pathToDrill); + std::vector<std::string> drillbits; + int err = zook.getAllDrillbits(hostPortStr, drillbits); + if(!err){ + Utils::shuffle(drillbits); + exec::DrillbitEndpoint endpoint; + err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list if(!err){ - Utils::shuffle(drillbits); - exec::DrillbitEndpoint endpoint; - err = zook.getEndPoint(drillbits, drillbits.size()-1, endpoint);// get the last one in the list - if(!err){ - host=boost::lexical_cast<std::string>(endpoint.address()); - port=boost::lexical_cast<std::string>(endpoint.user_port()); - } + host=boost::lexical_cast<std::string>(endpoint.address()); + port=boost::lexical_cast<std::string>(endpoint.user_port()); } - if(err){ - return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); - } - zook.close(); - m_bIsDirectConnection=true; - }else if(!strcmp(protocol.c_str(), "local")){ - boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant - char tempStr[MAX_CONNECT_STR+1]; - strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; - host=strtok(tempStr, ":"); - port=strtok(NULL, ""); - m_bIsDirectConnection=false; - }else{ - return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1) << ">. Selected " << endpoint.DebugString() << std::endl;) + } - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;) - connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); - return ret; - }else if(std::strcmp(connStr, m_connectStr.c_str())){ // tring to connect to a different address is not allowed if already connected - return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN)); + if(err){ + return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); + } + zook.close(); + m_bIsDirectConnection=true; + }else if(protocol == "local"){ + boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant + char tempStr[MAX_CONNECT_STR+1]; + strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; + host=strtok(tempStr, ":"); + port=strtok(NULL, ""); + m_bIsDirectConnection=false; + }else{ + return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); } - return CONN_SUCCESS; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;) + connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); + return ret; } connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ @@ -140,7 +125,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str())); } - }catch(std::exception e){ + }catch(const std::exception & e){ // Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve if (!strcmp(e.what(), "resolve")) { return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what())); @@ -152,7 +137,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ // set socket keep alive boost::asio::socket_base::keep_alive keepAlive(true); m_socket.set_option(keepAlive); - // set no_delay + // set no_delay boost::asio::ip::tcp::no_delay noDelay(true); m_socket.set_option(noDelay); @@ -160,7 +145,7 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port; m_connectedHost = connectedHost.str(); DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;) - + return CONN_SUCCESS; } @@ -180,7 +165,7 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){ connectionStatus_t status=CONN_SUCCESS; exec::rpc::Ack ack; ack.set_ok(true); - OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack); + rpc::OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack); boost::lock_guard<boost::mutex> prLock(this->m_prMutex); boost::lock_guard<boost::mutex> lock(m_dcMutex); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;) @@ -203,7 +188,7 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;) if(err != boost::asio::error::operation_aborted){ // Check whether the deadline has passed. - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: " << to_simple_string(m_heartbeatTimer.expires_at()) << " and time now is: " << to_simple_string(boost::asio::deadline_timer::traits_type::now()) @@ -231,8 +216,8 @@ void DrillClientImpl::Close() { } -connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){ - DrillClientImpl::s_encoder.Encode(m_wbuf, msg); +connectionStatus_t DrillClientImpl::sendSync(rpc::OutBoundRpcMessage& msg){ + encode(m_wbuf, msg); boost::system::error_code ec; size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec); if(!ec && s!=0){ @@ -292,9 +277,9 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, m_deadlineTimer.cancel(); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;) if(!error){ - InBoundRpcMessage msg; + rpc::InBoundRpcMessage msg; uint32_t length = 0; - int bytes_read = DrillClientImpl::s_decoder.LengthDecode(m_rbuf, &length); + std::size_t bytes_read = rpc::lengthDecode(m_rbuf, length); if(length>0){ size_t leftover = LEN_PREFIX_BUFLEN - bytes_read; ByteBuf_t b=m_rbuf + LEN_PREFIX_BUFLEN; @@ -309,7 +294,11 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, bytesToRead-=dataBytesRead; b+=dataBytesRead; } - DrillClientImpl::s_decoder.Decode(m_rbuf+bytes_read, length, msg); + if (!decode(m_rbuf+bytes_read, length, msg)) { + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. Cannot decode handshake.\n";) + handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Cannot decode handshake")); + return; + } }else{ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";) handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake")); @@ -321,6 +310,7 @@ void DrillClientImpl::handleHandshake(ByteBuf_t _buf, this->m_handshakeStatus=b2u.status(); this->m_handshakeErrorId=b2u.errorid(); this->m_handshakeErrorMsg=b2u.errormessage(); + this->m_serverInfos = b2u.server_infos(); }else{ // boost error @@ -362,6 +352,14 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope u2b.set_support_listening(true); u2b.set_support_timeout(true); + // Adding version info + exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos(); + infos->set_name(DRILL_CONNECTOR_NAME); + infos->set_version(DRILL_VERSION_STRING); + infos->set_majorversion(DRILL_VERSION_MAJOR); + infos->set_minorversion(DRILL_VERSION_MINOR); + infos->set_patchversion(DRILL_VERSION_PATCH); + if(properties != NULL && properties->size()>0){ std::string username; std::string err; @@ -374,7 +372,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope for(size_t i=0; i<properties->size(); i++){ std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(properties->keyAt(i)); if(it==DrillUserProperties::USER_PROPERTIES.end()){ - DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) + DRILL_MT_LOG(DRILL_LOG(LOG_WARNING) << "Connection property ("<< properties->keyAt(i) << ") is unknown and is being skipped" << std::endl;) continue; } @@ -402,7 +400,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope boost::lock_guard<boost::mutex> lock(this->m_dcMutex); uint64_t coordId = this->getNextCoordinationId(); - OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b); + rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b); sendSync(out_msg); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";) } @@ -469,38 +467,159 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t query.set_type(t); query.set_plan(plan); - uint64_t coordId; - DrillClientQueryResult* pQuery=NULL; + boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientQueryResult*>(), + boost::ref(*this), + _1, + boost::cref(plan), + l, + lCtx); + return sendMsg(factory, ::exec::user::RUN_QUERY, query); +} + +DrillClientPrepareHandle* DrillClientImpl::PrepareQuery(const std::string& plan, + pfnPreparedStatementListener l, + void* lCtx){ + exec::user::CreatePreparedStatementReq query; + query.set_sql_query(plan); + + boost::function<DrillClientPrepareHandle*(int32_t)> factory = boost::bind( + boost::factory<DrillClientPrepareHandle*>(), + boost::ref(*this), + _1, + boost::cref(plan), + l, + lCtx); + return sendMsg(factory, ::exec::user::CREATE_PREPARED_STATEMENT, query); +} + +DrillClientQueryResult* DrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, + pfnQueryResultsListener l, + void* lCtx){ + const DrillClientPrepareHandle& handle = static_cast<const DrillClientPrepareHandle&>(pstmt); + + exec::user::RunQuery query; + query.set_results_mode(exec::user::STREAM_FULL); + query.set_type(::exec::shared::PREPARED_STATEMENT); + query.set_allocated_prepared_statement_handle(new ::exec::user::PreparedStatementHandle(handle.m_preparedStatementHandle)); + + boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientQueryResult*>(), + boost::ref(*this), + _1, + boost::cref(handle.m_query), + l, + lCtx); + return sendMsg(factory, ::exec::user::RUN_QUERY, query); +} + +DrillClientCatalogResult* DrillClientImpl::getCatalogs(const std::string& catalogPattern, + Metadata::pfnCatalogMetadataListener listener, + void* listenerCtx) { + exec::user::GetCatalogsReq query; + exec::user::LikeFilter* catalogFilter(query.mutable_catalog_name_filter()); + catalogFilter->set_pattern(catalogPattern); + + boost::function<DrillClientCatalogResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientCatalogResult*>(), + boost::ref(*this), + _1, + listener, + listenerCtx); + return sendMsg(factory, ::exec::user::GET_CATALOGS, query); +} + +DrillClientSchemaResult* DrillClientImpl::getSchemas(const std::string& catalogPattern, + const std::string& schemaPattern, + Metadata::pfnSchemaMetadataListener listener, + void* listenerCtx) { + exec::user::GetSchemasReq query; + query.mutable_catalog_name_filter()->set_pattern(catalogPattern); + query.mutable_schema_name_filter()->set_pattern(schemaPattern); + + boost::function<DrillClientSchemaResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientSchemaResult*>(), + boost::ref(*this), + _1, + listener, + listenerCtx); + return sendMsg(factory, ::exec::user::GET_SCHEMAS, query); +} + +DrillClientTableResult* DrillClientImpl::getTables(const std::string& catalogPattern, + const std::string& schemaPattern, + const std::string& tablePattern, + const std::vector<std::string>* tableTypes, + Metadata::pfnTableMetadataListener listener, + void* listenerCtx) { + exec::user::GetTablesReq query; + query.mutable_catalog_name_filter()->set_pattern(catalogPattern); + query.mutable_schema_name_filter()->set_pattern(schemaPattern); + query.mutable_table_name_filter()->set_pattern(tablePattern); + if (tableTypes) { + std::copy(tableTypes->begin(), tableTypes->end(), + google::protobuf::RepeatedFieldBackInserter(query.mutable_table_type_filter())); + } + + boost::function<DrillClientTableResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientTableResult*>(), + boost::ref(*this), + _1, + listener, + listenerCtx); + return sendMsg(factory, ::exec::user::GET_TABLES, query); +} + +DrillClientColumnResult* DrillClientImpl::getColumns(const std::string& catalogPattern, + const std::string& schemaPattern, + const std::string& tablePattern, + const std::string& columnsPattern, + Metadata::pfnColumnMetadataListener listener, + void* listenerCtx) { + exec::user::GetColumnsReq query; + query.mutable_catalog_name_filter()->set_pattern(catalogPattern); + query.mutable_schema_name_filter()->set_pattern(schemaPattern); + query.mutable_table_name_filter()->set_pattern(tablePattern); + query.mutable_column_name_filter()->set_pattern(columnsPattern); + + boost::function<DrillClientColumnResult*(int32_t)> factory = boost::bind( + boost::factory<DrillClientColumnResult*>(), + boost::ref(*this), + _1, + listener, + listenerCtx); + return sendMsg(factory, ::exec::user::GET_COLUMNS, query); +} + +template<typename Handle> +Handle* DrillClientImpl::sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& message) { + int32_t coordId; + Handle* phandle=NULL; connectionStatus_t cStatus=CONN_SUCCESS; { boost::lock_guard<boost::mutex> prLock(this->m_prMutex); boost::lock_guard<boost::mutex> dcLock(this->m_dcMutex); coordId = this->getNextCoordinationId(); - OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query); + rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, type, coordId, &message); - // Create the result object and register the listener before we send the query - // because sometimes the caller is not checking the status of the submitQuery call. - // This way, the broadcast error call will cause the results listener to be called - // with a COMM_ERROR status. - pQuery = new DrillClientQueryResult(this, coordId, plan); - pQuery->registerListener(l, lCtx); - this->m_queryIds[coordId]=pQuery; + phandle = handleFactory(coordId); + this->m_queryHandles[coordId]=phandle; connectionStatus_t cStatus=sendSync(out_msg); if(cStatus == CONN_SUCCESS){ bool sendRequest=false; - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent query " << "Coordination id = " << coordId << " query: " << plan << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;) if(m_pendingRequests++==0){ sendRequest=true; }else{ - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;) + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) << " request to server" << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;) } if(sendRequest){ - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = " + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending " << ::exec::user::RpcType_Name(type) << " request. Number of pending requests = " << m_pendingRequests << std::endl;) getNextResult(); // async wait for results } @@ -508,21 +627,18 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t } if(cStatus!=CONN_SUCCESS){ - this->m_queryIds.erase(coordId); - delete pQuery; + this->m_queryHandles.erase(coordId); + delete phandle; return NULL; } - - //run this in a new thread startMessageListener(); - return pQuery; + return phandle; } void DrillClientImpl::getNextResult(){ - // This call is always made from within a function where the mutex has already been acquired //boost::lock_guard<boost::mutex> lock(this->m_dcMutex); @@ -533,7 +649,7 @@ void DrillClientImpl::getNextResult(){ AllocatedBuffer::s_memCV.wait(memLock); } } - + //use free, not delete to free ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN); if (DrillClientConfig::getQueryTimeout() > 0){ @@ -577,8 +693,7 @@ void DrillClientImpl::waitForResults(){ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, AllocatedBufferPtr* allocatedBuffer, - InBoundRpcMessage& msg, - boost::system::error_code& error){ + rpc::InBoundRpcMessage& msg){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer " << reinterpret_cast<int*>(_buf) << std::endl;) @@ -590,7 +705,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, // We need to protect the readLength and read buffer, and the pending requests counter, // but we don't have to keep the lock while we decode the rest of the buffer. boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - int bytes_read = DrillClientImpl::s_decoder.LengthDecode(_buf, &rmsgLen); + std::size_t bytes_read = rpc::lengthDecode(_buf, rmsgLen); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read << std::endl;) DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << std::endl;) @@ -612,7 +727,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, << (rmsgLen - leftover) << std::endl;) ByteBuf_t b=currentBuffer->m_pBuffer + leftover; size_t bytesToRead=rmsgLen - leftover; - + boost::system::error_code error; while(1){ size_t dataBytesRead=this->m_socket.read_some( boost::asio::buffer(b, bytesToRead), @@ -623,10 +738,14 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, bytesToRead-=dataBytesRead; b+=dataBytesRead; } - + if(!error){ // read data successfully - DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg); + if (!decode(currentBuffer->m_pBuffer, rmsgLen, msg)) { + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); + return handleQryError(QRY_COMM_ERROR, + getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);; + } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;) }else{ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); @@ -645,7 +764,7 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, return QRY_SUCCESS; } -status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ +status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; exec::shared::QueryId qid; @@ -657,15 +776,15 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;) qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;) - + qid.CopyFrom(qr.query_id()); - + if (qr.has_query_state() && qr.query_state() != exec::shared::QueryResult_QueryState_RUNNING && qr.query_state() != exec::shared::QueryResult_QueryState_STARTING) { pDrillClientQueryResult=findQueryResult(qid); - //Queries that have been cancelled or whose resources are freed before completion - //do not have a DrillClientQueryResult object. We need not handle the terminal message + //Queries that have been cancelled or whose resources are freed before completion + //do not have a DrillClientQueryResult object. We need not handle the terminal message //in that case since all it does is to free resources (and they have already been freed) if(pDrillClientQueryResult!=NULL){ //Validate the RPC message @@ -703,10 +822,10 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer return ret; } -status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ +status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; - exec::shared::QueryId qid; + ::exec::shared::QueryId qid; // Be a good client and send ack as early as possible. // Drillbit pushed the query result to the client, the client should send ack // whenever it receives the message @@ -720,7 +839,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;) - qid.CopyFrom(qr->query_id()); + qid = ::exec::shared::QueryId(qr->query_id()); if(qid.part1()==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;) delete allocatedBuffer; @@ -729,13 +848,13 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, pDrillClientQueryResult=findQueryResult(qid); if(pDrillClientQueryResult==NULL){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query (" << debugPrintQid(qid) << ")." << std::endl;) delete qr; delete allocatedBuffer; return ret; } - + //Validate the RPC message std::string valErr; if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){ @@ -765,20 +884,13 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, } pDrillClientQueryResult->setIsQueryPending(true); - pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener; if(pDrillClientQueryResult->m_bIsLastChunk){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << "Received last batch. " << std::endl;) ret=QRY_NO_MORE_DATA; } pDrillClientQueryResult->setQueryStatus(ret); - if(pResultsListener!=NULL){ - ret = pResultsListener(pDrillClientQueryResult, pRecordBatch, NULL); - }else{ - //Use a default callback that is called when a record batch is received - ret = pDrillClientQueryResult->defaultQueryResultsListener(pDrillClientQueryResult, - pRecordBatch, NULL); - } + ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL); } // release lock if(ret==QRY_FAILURE){ sendCancel(&qid); @@ -787,31 +899,37 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, pDrillClientQueryResult->setIsQueryPending(false); DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;) pDrillClientQueryResult->setQueryStatus(ret); - clearMapEntries(pDrillClientQueryResult); + removeQueryHandle(pDrillClientQueryResult); + removeQueryResult(pDrillClientQueryResult); return ret; } return ret; } -status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ - DrillClientQueryResult* pDrillClientQueryResult=NULL; +status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;) status_t ret=QRY_SUCCESS; + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); boost::lock_guard<boost::mutex> lock(m_dcMutex); - std::map<int,DrillClientQueryResult*>::iterator it; - for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){ - std::string qidString = it->second->m_pQueryId!=NULL?debugPrintQid(*it->second->m_pQueryId):std::string("NULL"); - DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << it->first + for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){ + DrillClientQueryResult* pDrillClientQueryResult=it->second; + std::string qidString = (pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL"); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId << " QueryId: "<< qidString << std::endl;) } if(msg.m_coord_id==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) return QRY_SUCCESS; } - it=this->m_queryIds.find(msg.m_coord_id); - if(it!=this->m_queryIds.end()){ - pDrillClientQueryResult=(*it).second; + std::map<int, DrillClientQueryHandle*>::const_iterator it; + it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientQueryResult* pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second); + if (!pDrillClientQueryResult) { + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } exec::shared::QueryId *qid = new exec::shared::QueryId; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;) qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()); @@ -820,14 +938,241 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB //save queryId allocated here so we can free it later pDrillClientQueryResult->setQueryId(qid); }else{ - delete allocatedBuffer; return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } - delete allocatedBuffer; return ret; } -DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& qid){ +status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Prepared Statement with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStatement: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientPrepareHandle* pDrillClientPrepareHandle=static_cast<DrillClientPrepareHandle*>((*it).second); + exec::user::CreatePreparedStatementResp resp; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Prepared Statement Handle " << msg.m_pbody.size() << std::endl;) + if (!resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size())) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode prepared statement", pDrillClientPrepareHandle); + } + if (resp.has_status() && resp.status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp.error(), pDrillClientPrepareHandle); + } + pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement()); + pDrillClientPrepareHandle->notifyListener(pDrillClientPrepareHandle, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Prepared Statement handle - " << resp.prepared_statement().server_handle().DebugString() << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStament: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +status_t DrillClientImpl::processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetCatalogsResp with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientCatalogResult* pHandle=static_cast<DrillClientCatalogResult*>((*it).second); + exec::user::GetCatalogsResp* resp = new exec::user::GetCatalogsResp; + pHandle->attachMetadataResult(resp); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetCatalogs result Handle " << msg.m_pbody.size() << std::endl;) + if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode getcatalogs results", pHandle); + } + if (resp->status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp->error(), pHandle); + } + + const ::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>& catalogs = resp->catalogs(); + pHandle->m_meta.clear(); + pHandle->m_meta.reserve(resp->catalogs_size()); + + for(::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>::const_iterator it = catalogs.begin(); it != catalogs.end(); ++it) { + meta::DrillCatalogMetadata meta(*it); + pHandle->m_meta.push_back(meta); + } + pHandle->notifyListener(&pHandle->m_meta, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetCatalogs result - " << resp->catalogs_size() << " catalog(s)" << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +status_t DrillClientImpl::processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetSchemaResp with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientSchemaResult* pHandle=static_cast<DrillClientSchemaResult*>((*it).second); + exec::user::GetSchemasResp* resp = new exec::user::GetSchemasResp(); + pHandle->attachMetadataResult(resp); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetSchemasResp result Handle " << msg.m_pbody.size() << std::endl;) + if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode getschemas results", pHandle); + } + if (resp->status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp->error(), pHandle); + } + + const ::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>& schemas = resp->schemas(); + pHandle->m_meta.clear(); + pHandle->m_meta.reserve(resp->schemas_size()); + + for(::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>::const_iterator it = schemas.begin(); it != schemas.end(); ++it) { + meta::DrillSchemaMetadata meta(*it); + pHandle->m_meta.push_back(meta); + } + pHandle->notifyListener(&pHandle->m_meta, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetSchemaResp result - " << resp->schemas_size() << " schema(s)" << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +status_t DrillClientImpl::processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetTablesResp with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientTableResult* pHandle=static_cast<DrillClientTableResult*>((*it).second); + exec::user::GetTablesResp* resp = new exec::user::GetTablesResp(); + pHandle->attachMetadataResult(resp); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GeTablesResp result Handle " << msg.m_pbody.size() << std::endl;) + if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode gettables results", pHandle); + } + if (resp->status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp->error(), pHandle); + } + const ::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>& tables = resp->tables(); + pHandle->m_meta.clear(); + pHandle->m_meta.reserve(resp->tables_size()); + + for(::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>::const_iterator it = tables.begin(); it != tables.end(); ++it) { + meta::DrillTableMetadata meta(*it); + pHandle->m_meta.push_back(meta); + } + pHandle->notifyListener(&pHandle->m_meta, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetTables result - " << resp->tables_size() << " table(s)" << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +status_t DrillClientImpl::processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){ + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetColumnsResp with coordination id:" << msg.m_coord_id << std::endl;) + status_t ret=QRY_SUCCESS; + + // make sure to deallocate buffer + boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer); + boost::lock_guard<boost::mutex> lock(m_dcMutex); + + if(msg.m_coord_id==0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;) + return QRY_SUCCESS; + } + std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id); + if(it!=this->m_queryHandles.end()){ + DrillClientColumnResult* pHandle=static_cast<DrillClientColumnResult*>((*it).second); + exec::user::GetColumnsResp* resp = new exec::user::GetColumnsResp(); + pHandle->attachMetadataResult(resp); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetColumnsResp result Handle " << msg.m_pbody.size() << std::endl;) + if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) { + return handleQryError(QRY_COMM_ERROR, "Cannot decode getcolumns results", pHandle); + } + if (resp->status() != exec::user::OK) { + return handleQryError(QRY_FAILED, resp->error(), pHandle); + } + const ::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>& columns = resp->columns(); + pHandle->m_meta.clear(); + pHandle->m_meta.reserve(resp->columns_size()); + + for(::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) { + meta::DrillColumnMetadata meta(*it); + pHandle->m_meta.push_back(meta); + } + pHandle->notifyListener(&pHandle->m_meta, NULL); + DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetColumnsResp result - " << resp->columns_size() << " columns(s)" << std::endl;) + }else{ + return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); + } + m_pendingRequests--; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: " << m_pendingRequests << " requests pending." << std::endl;) + if(m_pendingRequests==0){ + // signal any waiting client that it can exit because there are no more any query results to arrive. + // We keep the heartbeat going though. + m_cv.notify_one(); + } + return ret; +} + +DrillClientQueryResult* DrillClientImpl::findQueryResult(const exec::shared::QueryId& qid){ DrillClientQueryResult* pDrillClientQueryResult=NULL; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;) std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it; @@ -838,7 +1183,7 @@ DrillClientQueryResult* DrillClientImpl::findQueryResult(exec::shared::QueryId& << it->first->part2() << "]\n";) } } - it=this->m_queryResults.find(&qid); + it=this->m_queryResults.find(const_cast<exec::shared::QueryId * const>(&qid)); if(it!=this->m_queryResults.end()){ pDrillClientQueryResult=(*it).second; DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " << @@ -925,9 +1270,8 @@ void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){ } void DrillClientImpl::handleRead(ByteBuf_t _buf, - const boost::system::error_code& err, + const boost::system::error_code& error, size_t bytes_transferred) { - boost::system::error_code error=err; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer " << reinterpret_cast<int*>(_buf) << std::endl;) if(DrillClientConfig::getQueryTimeout() > 0){ @@ -935,120 +1279,153 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";) m_deadlineTimer.cancel(); } - if(!error){ - InBoundRpcMessage msg; - boost::lock_guard<boost::mutex> lock(this->m_prMutex); + if (error) { + // boost error + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); + boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " + "Boost Communication Error: " << error.message() << std::endl;) + handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); + return; + } - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;) - AllocatedBufferPtr allocatedBuffer=NULL; + rpc::InBoundRpcMessage msg; + boost::lock_guard<boost::mutex> lockPR(this->m_prMutex); - if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){ - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;) + AllocatedBufferPtr allocatedBuffer=NULL; + + if(readMsg(_buf, &allocatedBuffer, msg)!=QRY_SUCCESS){ + delete allocatedBuffer; + if(m_pendingRequests!=0){ + boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + getNextResult(); } + return; + } - if(!error && msg.m_mode==exec::rpc::PONG){ //heartbeat response. Throw it away - m_pendingRequests--; + if(msg.m_mode==exec::rpc::PONG) { //heartbeat response. Throw it away + m_pendingRequests--; + delete allocatedBuffer; + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;) + if(m_pendingRequests!=0){ + boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + getNextResult(); + }else{ + boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;) + m_cv.notify_one(); + } + + return; + } + + if(msg.m_mode == exec::rpc::RESPONSE) { + status_t s; + switch(msg.m_rpc_type) { + case exec::user::QUERY_HANDLE: + s = processQueryId(allocatedBuffer, msg); + break; + + case exec::user::PREPARED_STATEMENT: + s = processPreparedStatement(allocatedBuffer, msg); + break; + + case exec::user::CATALOGS: + s = processCatalogsResult(allocatedBuffer, msg); + break; + + case exec::user::SCHEMAS: + s = processSchemasResult(allocatedBuffer, msg); + break; + + case exec::user::TABLES: + s = processTablesResult(allocatedBuffer, msg); + break; + + case exec::user::COLUMNS: + s = processColumnsResult(allocatedBuffer, msg); + break; + + case exec::user::HANDSHAKE: + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";) delete allocatedBuffer; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;) - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - }else{ - boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex); - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;) - m_cv.notify_one(); - } - return; - }else if(!error && msg.m_rpc_type==exec::user::QUERY_RESULT){ - status_t s = processQueryResult(allocatedBuffer, msg); - if(s !=QRY_SUCCESS && s!= QRY_NO_MORE_DATA){ - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; - } - }else if(!error && msg.m_rpc_type==exec::user::QUERY_DATA){ - if(processQueryData(allocatedBuffer, msg)!=QRY_SUCCESS){ - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; - } - }else if(!error && msg.m_rpc_type==exec::user::QUERY_HANDLE){ - if(processQueryId(allocatedBuffer, msg)!=QRY_SUCCESS){ - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; - } - }else if(!error && msg.m_rpc_type==exec::user::ACK){ + break; + + case exec::user::ACK: // Cancel requests will result in an ACK sent back. // Consume silently + s = QRY_CANCELED; delete allocatedBuffer; - if(m_pendingRequests!=0){ - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - getNextResult(); - } - return; - }else{ + break; + + default: + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " + << "QueryResult returned " << msg.m_rpc_type << std::endl;) + delete allocatedBuffer; + handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); + } + + if (m_pendingRequests != 0) { boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - if(error){ - // We have a socket read error, but we do not know which query this is for. - // Signal ALL pending queries that they should stop waiting. - delete allocatedBuffer; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "read error: " << error << std::endl;) - handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); - return; - }else{ - // If not QUERY_RESULT, then we think something serious has gone wrong? - // In one case when the client hung, we observed that the server was sending a handshake request to the client - // We should properly handle these handshake requests/responses - if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){ - if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";) - exec::user::UserToBitHandshake u2b; - u2b.set_channel(exec::shared::USER); - u2b.set_rpc_version(DRILL_RPC_VERSION); - u2b.set_support_listening(true); - OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b); - sendSync(out_msg); - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";) - }else{ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";) - } - }else{ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " - << "QueryResult returned " << msg.m_rpc_type << std::endl;) - handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); - } - delete allocatedBuffer; - return; + getNextResult(); + } + + return; + } + + if (msg.has_mode() && msg.m_mode == exec::rpc::REQUEST) { + status_t s; + switch(msg.m_rpc_type) { + case exec::user::QUERY_RESULT: + s = processQueryResult(allocatedBuffer, msg); + break; + + case exec::user::QUERY_DATA: + s = processQueryData(allocatedBuffer, msg); + break; + + case exec::user::HANDSHAKE: + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";) + delete allocatedBuffer; + // In one case when the client hung, we observed that the server was sending a handshake request to the client + // We should properly handle these handshake requests/responses + { + boost::lock_guard<boost::mutex> lockDC(this->m_dcMutex); + exec::user::UserToBitHandshake u2b; + u2b.set_channel(exec::shared::USER); + u2b.set_rpc_version(DRILL_RPC_VERSION); + u2b.set_support_listening(true); + rpc::OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b); + sendSync(out_msg); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";) } + break; + + default: + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " + << "QueryResult returned " << msg.m_rpc_type << std::endl;) + delete allocatedBuffer; + handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); } - { + + if (m_pendingRequests != 0) { boost::lock_guard<boost::mutex> lock(this->m_dcMutex); getNextResult(); } - }else{ - // boost error - Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); - boost::lock_guard<boost::mutex> lock(this->m_dcMutex); - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. " - "Boost Communication Error: " << error.message() << std::endl;) - handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); + return; } - return; + + // If not QUERY_RESULT, then we think something serious has gone wrong? + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " + << "QueryResult returned " << msg.m_rpc_type << " for " << msg.m_mode << std::endl;) + handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); + delete allocatedBuffer; + } -status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valErr){ +status_t DrillClientImpl::validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valErr){ if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){ valErr=getMessage(ERR_QRY_RESPFAIL); return QRY_FAILURE; @@ -1060,7 +1437,7 @@ status_t DrillClientImpl::validateDataMessage(InBoundRpcMessage& msg, exec::shar return QRY_SUCCESS; } -status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valErr){ +status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valErr){ if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){ valErr=getMessage(ERR_QRY_RESPFAIL); return QRY_FAILURE; @@ -1072,10 +1449,10 @@ status_t DrillClientImpl::validateResultMessage(InBoundRpcMessage& msg, exec::sh return QRY_SUCCESS; } -connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){ +connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, const std::string& msg){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg); m_pendingRequests=0; - if(!m_queryIds.empty()){ + if(!m_queryHandles.empty()){ // set query error only if queries are running broadcastError(pErr); }else{ @@ -1086,12 +1463,12 @@ connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, s return status; } -status_t DrillClientImpl::handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult){ +status_t DrillClientImpl::handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle){ DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg); // set query error only if queries are running - if(pQueryResult!=NULL){ + if(pQueryHandle!=NULL){ m_pendingRequests--; - pQueryResult->signalError(pErr); + pQueryHandle->signalError(pErr); }else{ m_pendingRequests=0; broadcastError(pErr); @@ -1101,27 +1478,27 @@ status_t DrillClientImpl::handleQryError(status_t status, std::string msg, Drill status_t DrillClientImpl::handleQryError(status_t status, const exec::shared::DrillPBError& e, - DrillClientQueryResult* pQueryResult){ - assert(pQueryResult!=NULL); + DrillClientQueryHandle* pQueryHandle){ + assert(pQueryHandle!=NULL); DrillClientError* pErr = DrillClientError::getErrorObject(e); - pQueryResult->signalError(pErr); + pQueryHandle->signalError(pErr); m_pendingRequests--; return status; } void DrillClientImpl::broadcastError(DrillClientError* pErr){ if(pErr!=NULL){ - std::map<int, DrillClientQueryResult*>::iterator iter; - if(!m_queryIds.empty()){ - for(iter = m_queryIds.begin(); iter != m_queryIds.end(); iter++) { + std::map<int, DrillClientQueryHandle*>::const_iterator iter; + if(!m_queryHandles.empty()){ + for(iter = m_queryHandles.begin(); iter != m_queryHandles.end(); iter++) { DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg); iter->second->signalError(err); } } delete pErr; } - // We have an error at the connection level. Cancel the heartbeat. - // And close the connection + // We have an error at the connection level. Cancel the heartbeat. + // And close the connection m_heartbeatTimer.cancel(); m_pendingRequests=0; m_cv.notify_one(); @@ -1132,7 +1509,7 @@ void DrillClientImpl::broadcastError(DrillClientError* pErr){ // The implementation is similar to handleQryError status_t DrillClientImpl::handleTerminatedQryState( status_t status, - std::string msg, + const std::string& msg, DrillClientQueryResult* pQueryResult){ assert(pQueryResult!=NULL); if(status==QRY_COMPLETED){ @@ -1145,21 +1522,22 @@ status_t DrillClientImpl::handleTerminatedQryState( return status; } - -void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){ - std::map<int, DrillClientQueryResult*>::iterator iter; +void DrillClientImpl::removeQueryHandle(DrillClientQueryHandle* pQueryHandle){ boost::lock_guard<boost::mutex> lock(m_dcMutex); - if(!m_queryIds.empty()){ - for(iter=m_queryIds.begin(); iter!=m_queryIds.end(); iter++) { - if(pQueryResult==(DrillClientQueryResult*)iter->second){ - m_queryIds.erase(iter->first); + if(!m_queryHandles.empty()){ + for(std::map<int, DrillClientQueryHandle*>::const_iterator iter=m_queryHandles.begin(); iter!=m_queryHandles.end(); iter++) { + if(pQueryHandle==(DrillClientQueryHandle*)iter->second){ + m_queryHandles.erase(iter->first); break; } } } +} + +void DrillClientImpl::removeQueryResult(DrillClientQueryResult* pQueryResult){ + boost::lock_guard<boost::mutex> lock(m_dcMutex); if(!m_queryResults.empty()){ - std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it; - for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) { + for(std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::const_iterator it=m_queryResults.begin(); it!=m_queryResults.end(); it++) { if(pQueryResult==(DrillClientQueryResult*)it->second){ m_queryResults.erase(it->first); break; @@ -1168,19 +1546,19 @@ void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){ } } -void DrillClientImpl::sendAck(InBoundRpcMessage& msg, bool isOk){ +void DrillClientImpl::sendAck(const rpc::InBoundRpcMessage& msg, bool isOk){ exec::rpc::Ack ack; ack.set_ok(isOk); - OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack); + rpc::OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack); boost::lock_guard<boost::mutex> lock(m_dcMutex); sendSync(ack_msg); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;) } -void DrillClientImpl::sendCancel(exec::shared::QueryId* pQueryId){ +void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){ boost::lock_guard<boost::mutex> lock(m_dcMutex); uint64_t coordId = this->getNextCoordinationId(); - OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId); + rpc::OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId); sendSync(cancel_msg); DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;) } @@ -1193,6 +1571,14 @@ void DrillClientImpl::shutdownSocket(){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;) } +meta::DrillMetadata* DrillClientImpl::getMetadata() { + return new meta::DrillMetadata(*this); +} + +void DrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) { + delete metadata; +} + // This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this // class are used by the async callbacks. status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) { @@ -1254,7 +1640,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, //ctx; // unused, we already have the this pointer DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;) //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server. - if(this->m_bCancel){ + if(this->isCancelled()){ if(b!=NULL) delete b; return QRY_FAILURE; } @@ -1284,7 +1670,7 @@ RecordBatch* DrillClientQueryResult::peekNext(){ //if no more data, return NULL; if(!m_bIsQueryPending) return NULL; DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) - while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) { + while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) { this->m_cv.wait(cvLock); } // READ but not remove first element from queue @@ -1305,7 +1691,7 @@ RecordBatch* DrillClientQueryResult::getNext() { } DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;) - while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending){ + while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending){ this->m_cv.wait(cvLock); } // remove first element from queue @@ -1322,33 +1708,60 @@ void DrillClientQueryResult::waitForData() { boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex); //if no more data, return NULL; if(!m_bIsQueryPending) return; - while(!this->m_bHasData && !m_bHasError && m_bIsQueryPending) { + while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) { this->m_cv.wait(cvLock); } } -void DrillClientQueryResult::cancel() { +template<typename Listener, typename Value> +status_t DrillClientBaseHandle<Listener, Value>::notifyListener(Value v, DrillClientError* pErr){ + return m_pApplicationListener(getApplicationContext(), v, pErr); +} + +void DrillClientQueryHandle::cancel() { this->m_bCancel=true; } -void DrillClientQueryResult::signalError(DrillClientError* pErr){ +void DrillClientQueryHandle::signalError(DrillClientError* pErr){ // Ignore return values from the listener. if(pErr!=NULL){ if(m_pError!=NULL){ delete m_pError; m_pError=NULL; } m_pError=pErr; - pfnQueryResultsListener pResultsListener=this->m_pResultsListener; - if(pResultsListener!=NULL){ - pResultsListener(this, NULL, pErr); - }else{ - defaultQueryResultsListener(this, NULL, pErr); - } + // TODO should it be protected by m_cvMutex? + m_bHasError=true; + } + return; +} + +template<typename Listener, typename Value> +void DrillClientBaseHandle<Listener, Value>::signalError(DrillClientError* pErr){ + DrillClientQueryHandle::signalError(pErr); + // Ignore return values from the listener. + if(pErr!=NULL){ + this->notifyListener(NULL, pErr); + } +} + +status_t DrillClientQueryResult::notifyListener(RecordBatch* batch, DrillClientError* pErr) { + pfnQueryResultsListener pResultsListener=getApplicationListener(); + if(pResultsListener!=NULL){ + return pResultsListener(this, batch, pErr); + }else{ + return defaultQueryResultsListener(this, batch, pErr); + } +} + +void DrillClientQueryResult::signalError(DrillClientError* pErr){ + DrillClientQueryHandle::signalError(pErr); + // Ignore return values from the listener. + if(pErr!=NULL){ + this->notifyListener(NULL, pErr); { boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex); m_bIsQueryPending=false; m_bHasData=false; - m_bHasError=true; } //Signal the cv in case there is a client waiting for data already. m_cv.notify_one(); @@ -1357,24 +1770,27 @@ void DrillClientQueryResult::signalError(DrillClientError* pErr){ } void DrillClientQueryResult::signalComplete(){ - pfnQueryResultsListener pResultsListener=this->m_pResultsListener; - if(pResultsListener!=NULL){ - pResultsListener(this, NULL, NULL); - }else{ - defaultQueryResultsListener(this, NULL, NULL); - } + this->notifyListener(NULL, NULL); { boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex); - m_bIsQueryPending=false; m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED); - m_bHasError=false; + resetError(); } //Signal the cv in case there is a client waiting for data already. m_cv.notify_one(); return; } +void DrillClientQueryHandle::clearAndDestroy(){ + //Tell the parent to remove this from its lists + m_client.removeQueryHandle(this); + + if(m_pError!=NULL){ + delete m_pError; m_pError=NULL; + } +} void DrillClientQueryResult::clearAndDestroy(){ + DrillClientQueryHandle::clearAndDestroy(); //free memory allocated for FieldMetadata objects saved in m_columnDefs; if(!m_columnDefs->empty()){ for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){ @@ -1385,15 +1801,16 @@ void DrillClientQueryResult::clearAndDestroy(){ if(this->m_pQueryId!=NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;) } + //Tell the parent to remove this from its lists - m_pClient->clearMapEntries(this); + this->client().removeQueryResult(this); //clear query id map entries. if(this->m_pQueryId!=NULL){ delete this->m_pQueryId; this->m_pQueryId=NULL; } if(!m_recordBatches.empty()){ - // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_ + // When multiple queries execute in parallel we sometimes get an empty record batch back from the server _after_ // the last chunk has been received. We eventually delete it. DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;) RecordBatch* pR=NULL; @@ -1403,11 +1820,32 @@ void DrillClientQueryResult::clearAndDestroy(){ delete pR; } } - if(m_pError!=NULL){ - delete m_pError; m_pError=NULL; +} + +status_t DrillClientPrepareHandle::setupPreparedStatement(const exec::user::PreparedStatement& pstmt) { + // Get columns schema information + const ::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>& columns = pstmt.columns(); + for(::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) { + FieldMetadata* metadata = new FieldMetadata; + metadata->set(*it); + m_columnDefs->push_back(metadata); } + + // Copy server handle + this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle()); + return QRY_SUCCESS; } +void DrillClientPrepareHandle::clearAndDestroy(){ + DrillClientQueryHandle::clearAndDestroy(); + //free memory allocated for FieldMetadata objects saved in m_columnDefs; + if(!m_columnDefs->empty()){ + for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){ + delete *it; + } + m_columnDefs->clear(); + } +} connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ connectionStatus_t stat = CONN_SUCCESS; @@ -1418,9 +1856,9 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); if(!strcmp(protocol.c_str(), "zk")){ // Get a list of drillbits - ZookeeperImpl zook; + ZookeeperClient zook(pathToDrill); std::vector<std::string> drillbits; - int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + int err = zook.getAllDrillbits(hostPortStr, drillbits); if(!err){ Utils::shuffle(drillbits); // The original shuffled order is maintained if we shuffle first and then add any missing elements @@ -1432,15 +1870,17 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ m_lastConnection++; nextIndex = (m_lastConnection)%(getDrillbitCount()); } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection" << "(" << (void*)this << ")" - << ": Current counter is: " + << ": Current counter is: " << m_lastConnection << std::endl;) - err=zook.getEndPoint(m_drillbits, nextIndex, e); + err=zook.getEndPoint(m_drillbits[nextIndex], e); if(!err){ host=boost::lexical_cast<std::string>(e.address()); port=boost::lexical_cast<std::string>(e.user_port()); } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << nextIndex << ">. Selected " << e.DebugString() << std::endl;) } if(err){ return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); @@ -1475,7 +1915,7 @@ connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* connectionStatus_t stat=CONN_FAILURE; // Keep a copy of the user properties if(props!=NULL){ - m_pUserProperties = new DrillUserProperties; + m_pUserProperties = boost::shared_ptr<DrillUserProperties>(new DrillUserProperties); for(size_t i=0; i<props->size(); i++){ m_pUserProperties->setProperty( props->keyAt(i), @@ -1486,10 +1926,10 @@ connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* DrillClientImpl* pDrillClientImpl = getOneConnection(); if(pDrillClientImpl != NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;) - stat=pDrillClientImpl->validateHandshake(m_pUserProperties); + stat = pDrillClientImpl->validateHandshake(m_pUserProperties.get()); } else{ - stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN)); + stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN)); } return stat; } @@ -1505,16 +1945,52 @@ DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::Query return pDrillClientQueryResult; } -void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){ - // Nothing to do. If this class ever keeps track of executing queries then it will need - // to implement this call to free any query specific resources the pool might have +DrillClientPrepareHandle* PooledDrillClientImpl::PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx){ + DrillClientPrepareHandle* pDrillClientPrepareHandle = NULL; + DrillClientImpl* pDrillClientImpl = NULL; + pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + pDrillClientPrepareHandle=pDrillClientImpl->PrepareQuery(plan,listener,listenerCtx); + m_queriesExecuted++; + } + return pDrillClientPrepareHandle; +} + +DrillClientQueryResult* PooledDrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx){ + DrillClientQueryResult* pDrillClientQueryResult = NULL; + DrillClientImpl* pDrillClientImpl = NULL; + pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + pDrillClientQueryResult=pDrillClientImpl->ExecuteQuery(pstmt, listener, listenerCtx); + m_queriesExecuted++; + } + return pDrillClientQueryResult; +} + +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryHandle* pQryHandle){ + // If this class ever keeps track of executing queries then it will need + // to implement this call to free any query specific resources the pool might have // allocated - return; + + pQryHandle->client().freeQueryResources(pQryHandle); +} + +meta::DrillMetadata* PooledDrillClientImpl::getMetadata() { + meta::DrillMetadata* metadata = NULL; + DrillClientImpl* pDrillClientImpl = getOneConnection(); + if (pDrillClientImpl != NULL) { + metadata = pDrillClientImpl->getMetadata(); + } + return metadata; +} + +void PooledDrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) { + metadata->client().freeMetadata(metadata); } bool PooledDrillClientImpl::Active(){ boost::lock_guard<boost::mutex> lock(m_poolMutex); - for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + for(std::vector<DrillClientImpl*>::const_iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ if((*it)->Active()){ return true; } @@ -1529,7 +2005,7 @@ void PooledDrillClientImpl::Close() { delete *it; } m_clientConnections.clear(); - if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;} + m_pUserProperties.reset(); if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} m_lastConnection=-1; m_queriesExecuted=0; @@ -1592,7 +2068,7 @@ DrillClientImpl* PooledDrillClientImpl::getOneConnection(){ if((ret=connect(m_connectStr.c_str()))==CONN_SUCCESS){ boost::lock_guard<boost::mutex> lock(m_poolMutex); pDrillClientImpl=m_clientConnections.back(); - ret=pDrillClientImpl->validateHandshake(m_pUserProperties); + ret=pDrillClientImpl->validateHandshake(m_pUserProperties.get()); if(ret!=CONN_SUCCESS){ delete pDrillClientImpl; pDrillClientImpl=NULL; m_clientConnections.erase(m_clientConnections.end()); @@ -1602,251 +2078,14 @@ DrillClientImpl* PooledDrillClientImpl::getOneConnection(){ if(ret!=CONN_SUCCESS){ break; } - } // need a new connection + } // need a new connection }// while if(pDrillClientImpl==NULL){ connectionStatus_t status = CONN_NOTCONNECTED; - handleConnError(status, getMessage(status)); + handleConnError(status, getMessage(ERR_CONN_NOCONN)); } return pDrillClientImpl; } -char ZookeeperImpl::s_drillRoot[]="/drill/"; -char ZookeeperImpl::s_defaultCluster[]="drillbits1"; - -ZookeeperImpl::ZookeeperImpl(){ - m_pDrillbits=new String_vector; - m_bConnecting=true; - memset(&m_id, 0, sizeof(m_id)); -} - -ZookeeperImpl::~ZookeeperImpl(){ - delete m_pDrillbits; -} - -ZooLogLevel ZookeeperImpl::getZkLogLevel(){ - //typedef enum {ZOO_LOG_LEVEL_ERROR=1, - // ZOO_LOG_LEVEL_WARN=2, - // ZOO_LOG_LEVEL_INFO=3, - // ZOO_LOG_LEVEL_DEBUG=4 - //} ZooLogLevel; - switch(DrillClientConfig::getLogLevel()){ - case LOG_TRACE: - case LOG_DEBUG: - return ZOO_LOG_LEVEL_DEBUG; - case LOG_INFO: - return ZOO_LOG_LEVEL_INFO; - case LOG_WARNING: - return ZOO_LOG_LEVEL_WARN; - case LOG_ERROR: - case LOG_FATAL: - default: - return ZOO_LOG_LEVEL_ERROR; - } - return ZOO_LOG_LEVEL_ERROR; -} - -int ZookeeperImpl::getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits){ - uint32_t waitTime=30000; // 10 seconds - zoo_set_debug_level(getZkLogLevel()); - zoo_deterministic_conn_order(1); // enable deterministic order - struct String_vector* pDrillbits=NULL; - m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0); - if(!m_zh) { - m_err = getMessage(ERR_CONN_ZKFAIL); - zookeeper_close(m_zh); - return -1; - }else{ - m_err=""; - //Wait for the completion handler to signal successful connection - boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex); - boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime); - while(this->m_bConnecting) { - if(!this->m_cv.timed_wait(bufferLock, timeout)){ - m_err = getMessage(ERR_CONN_ZKTIMOUT); - zookeeper_close(m_zh); - return -1; - } - } - } - if(m_state!=ZOO_CONNECTED_STATE){ - zookeeper_close(m_zh); - return -1; - } - int rc = ZOK; - if(pathToDrill==NULL || strlen(pathToDrill)==0){ - m_rootDir=s_drillRoot; - m_rootDir += s_defaultCluster; - }else{ - m_rootDir=pathToDrill; - } - - pDrillbits = new String_vector; - rc=zoo_get_children(m_zh, m_rootDir.c_str(), 0, pDrillbits); - if(rc!=ZOK){ - delete pDrillbits; - m_err=getMessage(ERR_CONN_ZKERR, rc); - zookeeper_close(m_zh); - return -1; - } - if(pDrillbits && pDrillbits->count > 0){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << pDrillbits->count << " drillbits in cluster (" - << connectStr << "/" << pathToDrill - << ")." <<std::endl;) - for(int i=0; i<pDrillbits->count; i++){ - drillbits.push_back(pDrillbits->data[i]); - } - for(int i=0; i<drillbits.size(); i++){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;) - } - } - delete pDrillbits; - return 0; -} - -int ZookeeperImpl::getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint){ - int rc = ZOK; - exec::DrillServiceInstance drillServiceInstance; - if( drillbits.size() >0){ - // pick the drillbit at 'index' - const char * bit=drillbits[index].c_str(); - std::string s; - s=m_rootDir + std::string("/") + bit; - int buffer_len=MAX_CONNECT_STR; - char buffer[MAX_CONNECT_STR+1]; - struct Stat stat; - buffer[MAX_CONNECT_STR]=0; - rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat); - if(rc!=ZOK){ - m_err=getMessage(ERR_CONN_ZKDBITERR, rc); - zookeeper_close(m_zh); - return -1; - } - exec::DrillServiceInstance drillServiceInstance; - drillServiceInstance.ParseFromArray(buffer, buffer_len); - endpoint=drillServiceInstance.endpoint(); - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" <<index << ">. Selected " << drillServiceInstance.DebugString() << std::endl;) - }else{ - - m_err=getMessage(ERR_CONN_ZKNODBIT); - zookeeper_close(m_zh); - return -1; - } - return 0; -} - -// Deprecated -int ZookeeperImpl::connectToZookeeper(const char* connectStr, const char* pathToDrill){ - uint32_t waitTime=30000; // 10 seconds - zoo_set_debug_level(getZkLogLevel()); - zoo_deterministic_conn_order(1); // enable deterministic order - m_zh = zookeeper_init(connectStr, watcher, waitTime, 0, this, 0); - if(!m_zh) { - m_err = getMessage(ERR_CONN_ZKFAIL); - return CONN_FAILURE; - }else{ - m_err=""; - //Wait for the completion handler to signal successful connection - boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex); - boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime); - while(this->m_bConnecting) { - if(!this->m_cv.timed_wait(bufferLock, timeout)){ - m_err = getMessage(ERR_CONN_ZKTIMOUT); - return CONN_FAILURE; - } - } - } - if(m_state!=ZOO_CONNECTED_STATE){ - return CONN_FAILURE; - } - int rc = ZOK; - char rootDir[MAX_CONNECT_STR+1]; - if(pathToDrill==NULL || strlen(pathToDrill)==0){ - strcpy(rootDir, (char*)s_drillRoot); - strcat(rootDir, s_defaultCluster); - }else{ - strncpy(rootDir, pathToDrill, MAX_CONNECT_STR); rootDir[MAX_CONNECT_STR]=0; - } - rc=zoo_get_children(m_zh, (char*)rootDir, 0, m_pDrillbits); - if(rc!=ZOK){ - m_err=getMessage(ERR_CONN_ZKERR, rc); - zookeeper_close(m_zh); - return -1; - } - - //Let's pick a random drillbit. - if(m_pDrillbits && m_pDrillbits->count >0){ - - std::vector<std::string> randomDrillbits; - for(int i=0; i<m_pDrillbits->count; i++){ - randomDrillbits.push_back(m_pDrillbits->data[i]); - } - //Use the same random shuffle as the Java client instead of picking a drillbit at random. - //Gives much better randomization when the size of the cluster is small. - std::random_shuffle(randomDrillbits.begin(), randomDrillbits.end()); - const char * bit=randomDrillbits[0].c_str(); - std::string s; - - s=rootDir + std::string("/") + bit; - int buffer_len=MAX_CONNECT_STR; - char buffer[MAX_CONNECT_STR+1]; - struct Stat stat; - buffer[MAX_CONNECT_STR]=0; - rc= zoo_get(m_zh, s.c_str(), 0, buffer, &buffer_len, &stat); - if(rc!=ZOK){ - m_err=getMessage(ERR_CONN_ZKDBITERR, rc); - zookeeper_close(m_zh); - return -1; - } - m_drillServiceInstance.ParseFromArray(buffer, buffer_len); - }else{ - m_err=getMessage(ERR_CONN_ZKNODBIT); - zookeeper_close(m_zh); - return -1; - } - return 0; -} - -void ZookeeperImpl::close(){ - zookeeper_close(m_zh); -} - -void ZookeeperImpl::watcher(zhandle_t *zzh, int type, int state, const char *path, void* context) { - //From cli.c - - /* Be careful using zh here rather than zzh - as this may be mt code - * the client lib may call the watcher before zookeeper_init returns */ - - ZookeeperImpl* self=(ZookeeperImpl*)context; - self->m_state=state; - if (type == ZOO_SESSION_EVENT) { - if (state == ZOO_CONNECTED_STATE) { - } else if (state == ZOO_AUTH_FAILED_STATE) { - self->m_err= getMessage(ERR_CONN_ZKNOAUTH); - zookeeper_close(zzh); - self->m_zh=0; - } else if (state == ZOO_EXPIRED_SESSION_STATE) { - self->m_err= getMessage(ERR_CONN_ZKEXP); - zookeeper_close(zzh); - self->m_zh=0; - } - } - // signal the cond var - { - if (state == ZOO_CONNECTED_STATE){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;) - } - boost::lock_guard<boost::mutex> bufferLock(self->m_cvMutex); - self->m_bConnecting=false; - } - self->m_cv.notify_one(); -} - -void ZookeeperImpl:: debugPrint(){ - if(m_zh!=NULL && m_state==ZOO_CONNECTED_STATE){ - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << m_drillServiceInstance.DebugString() << std::endl;) - } -} - } // namespace Drill diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index 06f37e059..f9d077957 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -21,7 +21,6 @@ #define DRILL_CLIENT_IMPL_H #include "drill/common.hpp" - // Define some BOOST defines // WIN32_SHUTDOWN_ON_TIMEOUT is defined in "drill/common.hpp" for Windows 32 bit platform #ifndef WIN32_SHUTDOWN_ON_TIMEOUT @@ -29,27 +28,25 @@ #endif //WIN32_SHUTDOWN_ON_TIMEOUT #include <algorithm> -#include <stdlib.h> -#include <time.h> #include <queue> #include <vector> -#include <boost/asio.hpp> +#include <boost/asio.hpp> #if defined _WIN32 || defined _WIN64 -#include <zookeeper.h> //Windows header files redefine 'random' #ifdef random #undef random #endif -#else -#include <zookeeper/zookeeper.h> #endif #include <boost/asio/deadline_timer.hpp> +#include <boost/function.hpp> #include <boost/thread.hpp> #include "drill/drillClient.hpp" -#include "rpcEncoder.hpp" -#include "rpcDecoder.hpp" +#include "drill/preparedStatement.hpp" +#include "collectionsImpl.hpp" +#include "metadata.hpp" +#include "rpcMessage.hpp" #include "utils.hpp" #include "User.pb.h" #include "UserBitShared.pb.h" @@ -57,11 +54,11 @@ namespace Drill { class DrillClientImpl; -class InBoundRpcMessage; -class OutBoundRpcMessage; + +class DrillClientQueryHandle; + +class DrillClientPrepareHandle; class RecordBatch; -class RpcEncoder; -class RpcDecoder; /* * Defines the interface used by DrillClient and implemented by DrillClientImpl and PooledDrillClientImpl @@ -89,6 +86,8 @@ class DrillClientImplBase{ // Submits a query to a drillbit. virtual DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx)=0; + virtual DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx)=0; + virtual DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx)=0; //Waits as a connection has results pending virtual void waitForResults()=0; @@ -96,31 +95,109 @@ class DrillClientImplBase{ //Validates handshake at connect time. virtual connectionStatus_t validateHandshake(DrillUserProperties* props)=0; - virtual void freeQueryResources(DrillClientQueryResult* pQryResult)=0; + virtual void freeQueryResources(DrillClientQueryHandle* pQryHandle)=0; + + virtual meta::DrillMetadata* getMetadata() = 0; + virtual void freeMetadata(meta::DrillMetadata* metadata) = 0; }; -class DrillClientQueryResult{ +/** + * Base type for query handles + */ +class DrillClientQueryHandle{ friend class DrillClientImpl; public: - DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId, const std::string& query): - m_pClient(pClient), + DrillClientQueryHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, void* context): + m_client(client), m_coordinationId(coordId), m_query(query), + m_status(QRY_SUCCESS), + m_bCancel(false), + m_bHasError(false), + m_pError(NULL), + m_pApplicationContext(context){ + }; + + virtual ~DrillClientQueryHandle(){ + clearAndDestroy(); + }; + + virtual void cancel(); + bool isCancelled() const {return m_bCancel;}; + int32_t getCoordinationId() const { return m_coordinationId;} + const std::string& getQuery() const { return m_query;} + + bool hasError() const { return m_bHasError;} + void resetError() { m_bHasError = false; } + + status_t getErrorStatus() const { return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;} + const DrillClientError* getError() const { return m_pError;} + void setQueryStatus(status_t s){ m_status = s;} + status_t getQueryStatus() const { return m_status;} + inline DrillClientImpl& client() const { return m_client; }; + + inline void* getApplicationContext() const { return m_pApplicationContext; } + + protected: + + virtual void signalError(DrillClientError* pErr); + virtual void clearAndDestroy(); + + private: + DrillClientImpl& m_client; + + int32_t m_coordinationId; + std::string m_query; + status_t m_status; + bool m_bCancel; + bool m_bHasError; + + const DrillClientError* m_pError; + + void* m_pApplicationContext; +}; + +template<typename Listener, typename ListenerValue> +class DrillClientBaseHandle: public DrillClientQueryHandle { + friend class DrillClientImpl; + public: + DrillClientBaseHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* context): + DrillClientQueryHandle(client, coordId, query, context), + m_pApplicationListener(listener){ + }; + + virtual ~DrillClientBaseHandle(){ + clearAndDestroy(); + }; + + inline Listener getApplicationListener() const { return m_pApplicationListener; } + + + protected: + virtual status_t notifyListener(ListenerValue v, DrillClientError* pErr); + + virtual void signalError(DrillClientError* pErr); + void setHasError(bool hasError) { m_bHasError = hasError; } + + private: + Listener m_pApplicationListener; +}; + +class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>{ + friend class DrillClientImpl; + public: + DrillClientQueryResult(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnQueryResultsListener listener, void* listenerCtx): + DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>(client, coordId, query, listener, listenerCtx), m_numBatches(0), m_columnDefs(new std::vector<Drill::FieldMetadata*>), m_bIsQueryPending(true), m_bIsLastChunk(false), - m_bCancel(false), m_bHasSchemaChanged(false), m_bHasData(false), - m_bHasError(false), m_queryState(exec::shared::QueryResult_QueryState_STARTING), - m_pError(NULL), m_pQueryId(NULL), - m_pSchemaListener(NULL), - m_pResultsListener(NULL), - m_pListenerCtx(NULL) { + m_pSchemaListener(NULL) { }; ~DrillClientQueryResult(){ @@ -128,20 +205,15 @@ class DrillClientQueryResult{ }; // get data asynchronously - void registerListener(pfnQueryResultsListener listener, void* listenerCtx){ - this->m_pResultsListener=listener; - this->m_pListenerCtx = listenerCtx; - } - void registerSchemaChangeListener(pfnSchemaListener l){ m_pSchemaListener=l; } - // Synchronous call to get data. Caller assumes ownership of the recod batch + // Synchronous call to get data. Caller assumes ownership of the record batch // returned and it is assumed to have been consumed. RecordBatch* getNext(); // Synchronous call to get a look at the next Record Batch. This - // call does not move the current pointer forward. Repeatied calls + // call does not move the current pointer forward. Repeated calls // to peekNext return the same value until getNext is called. RecordBatch* peekNext(); // Blocks until data is available. @@ -150,32 +222,26 @@ class DrillClientQueryResult{ // placeholder to return an empty col def vector when calls are made out of order. static FieldDefPtr s_emptyColDefs; - FieldDefPtr getColumnDefs(){ + FieldDefPtr getColumnDefs() { boost::lock_guard<boost::mutex> bufferLock(this->m_schemaMutex); return this->m_columnDefs; } - void cancel(); - bool isCancelled(){return this->m_bCancel;}; - bool hasSchemaChanged(){return this->m_bHasSchemaChanged;}; - int32_t getCoordinationId(){ return this->m_coordinationId;} - const std::string& getQuery(){ return this->m_query;} + bool hasSchemaChanged() const {return this->m_bHasSchemaChanged;}; void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;} - void* getListenerContext() {return this->m_pListenerCtx;} - exec::shared::QueryId& getQueryId(){ return *(this->m_pQueryId); } - bool hasError(){ return m_bHasError;} - status_t getErrorStatus(){ return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;} - const DrillClientError* getError(){ return m_pError;} - void setQueryStatus(status_t s){ m_status = s;} - status_t getQueryStatus(){ return m_status;} + exec::shared::QueryId& getQueryId() const { return *(this->m_pQueryId); } void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;} - exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;} + exec::shared::QueryResult_QueryState getQueryState() const { return m_queryState;} void setIsQueryPending(bool isPending){ boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex); m_bIsQueryPending=isPending; } + protected: + virtual status_t notifyListener(RecordBatch* batch, DrillClientError* pErr); + virtual void signalError(DrillClientError* pErr); + virtual void clearAndDestroy(); private: status_t setupColumnDefs(exec::shared::QueryData* pQueryData); @@ -183,15 +249,7 @@ class DrillClientQueryResult{ // Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables. // Also used when a query is cancelled or when a query completed response is received. // Error object is now owned by the DrillClientQueryResult object. - void signalError(DrillClientError* pErr); void signalComplete(); - void clearAndDestroy(); - - - DrillClientImpl* m_pClient; - - int32_t m_coordinationId; - const std::string& m_query; size_t m_numBatches; // number of record batches received so far @@ -213,28 +271,90 @@ class DrillClientQueryResult{ // if m_bIsQueryPending is true, we continue to wait for results bool m_bIsQueryPending; bool m_bIsLastChunk; - bool m_bCancel; bool m_bHasSchemaChanged; bool m_bHasData; - bool m_bHasError; // state in the last query result received from the server. exec::shared::QueryResult_QueryState m_queryState; - const DrillClientError* m_pError; - exec::shared::QueryId* m_pQueryId; - status_t m_status; // Schema change listener pfnSchemaListener m_pSchemaListener; - // Results callback - pfnQueryResultsListener m_pResultsListener; +}; + +class DrillClientPrepareHandle: public DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>, public PreparedStatement { + public: + DrillClientPrepareHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnPreparedStatementListener listener, void* listenerCtx): + DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>(client, coordId, query, listener, listenerCtx), + PreparedStatement(), + m_columnDefs(new std::vector<Drill::FieldMetadata*>) { + }; + + // PreparedStatement overrides + virtual std::size_t getNumFields() const { return m_columnDefs->size(); } + virtual const Drill::FieldMetadata& getFieldMetadata(std::size_t index) const { return *m_columnDefs->at(index);} + + protected: + virtual void clearAndDestroy(); + + private: + friend class DrillClientImpl; + status_t setupPreparedStatement(const exec::user::PreparedStatement& pstmt); + + FieldDefPtr m_columnDefs; + ::exec::user::PreparedStatementHandle m_preparedStatementHandle; +}; + +template<typename Listener, typename MetaType, typename MetaImpl, typename MetadataResult> +class DrillClientMetadataResult: public DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*> { +public: + DrillClientMetadataResult(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* listenerCtx): + DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*>(client, coordId, query, listener, listenerCtx) {} + + void attachMetadataResult(MetadataResult* result) { this->m_pMetadata.reset(result); } + +private: + friend class DrillClientImpl; + + // Meta informations returned to the user, linked to the handle + DrillVector<MetaType, MetaImpl> m_meta; + + // to keep a reference on the underlying metadata object, and + // make sure it's clean when this handle is destroyed + boost::shared_ptr<MetadataResult> m_pMetadata; + +}; + +class DrillClientCatalogResult: public DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp> { + friend class DrillClientImpl; +public: + DrillClientCatalogResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx): + DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener, meta::CatalogMetadata, meta::DrillCatalogMetadata, exec::user::GetCatalogsResp>(client, coordId, "getCatalog", listener, listenerCtx) {} +}; + +class DrillClientSchemaResult: public DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp> { + friend class DrillClientImpl; +public: + DrillClientSchemaResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx): + DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener, meta::SchemaMetadata, meta::DrillSchemaMetadata, exec::user::GetSchemasResp>(client, coordId, "getSchemas", listener, listenerCtx) {} +}; + +class DrillClientTableResult: public DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp> { + friend class DrillClientImpl; +public: + DrillClientTableResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnTableMetadataListener listener, void* listenerCtx): + DrillClientMetadataResult<Metadata::pfnTableMetadataListener, meta::TableMetadata, meta::DrillTableMetadata, exec::user::GetTablesResp>(client, coordId, "getTables", listener, listenerCtx) {} +}; - // Listener context - void * m_pListenerCtx; +class DrillClientColumnResult: public DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp> { + friend class DrillClientImpl; + public: + DrillClientColumnResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnColumnMetadataListener listener, void* listenerCtx): + DrillClientMetadataResult<Metadata::pfnColumnMetadataListener, meta::ColumnMetadata, meta::DrillColumnMetadata, exec::user::GetColumnsResp>(client, coordId, "getColumns", listener, listenerCtx) {} }; + class DrillClientImpl : public DrillClientImplBase{ public: DrillClientImpl(): @@ -250,7 +370,8 @@ class DrillClientImpl : public DrillClientImplBase{ m_deadlineTimer(m_io_service), m_heartbeatTimer(m_io_service), m_rbuf(NULL), - m_wbuf(MAX_SOCK_RD_BUFSIZE) + m_wbuf(MAX_SOCK_RD_BUFSIZE), + m_bIsDirectConnection(false) { m_coordinationId=rand()%1729+1; }; @@ -300,14 +421,24 @@ class DrillClientImpl : public DrillClientImplBase{ void Close() ; DrillClientError* getError(){ return m_pError;} DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx); + DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx); + DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx); + void waitForResults(); connectionStatus_t validateHandshake(DrillUserProperties* props); - void freeQueryResources(DrillClientQueryResult* pQryResult){ - // Doesn't need to do anything - return; + void freeQueryResources(DrillClientQueryHandle* pQryHandle){ + delete pQryHandle; }; + + const exec::user::RpcEndpointInfos& getServerInfos() const { return m_serverInfos; } + + meta::DrillMetadata* getMetadata(); + + void freeMetadata(meta::DrillMetadata* metadata); private: + friend class meta::DrillMetadata; + friend class DrillClientQueryHandle; friend class DrillClientQueryResult; friend class PooledDrillClientImpl; @@ -327,8 +458,8 @@ class DrillClientImpl : public DrillClientImplBase{ int32_t getNextCoordinationId(){ return ++m_coordinationId; }; // send synchronous messages - //connectionStatus_t recvSync(InBoundRpcMessage& msg); - connectionStatus_t sendSync(OutBoundRpcMessage& msg); + //connectionStatus_t recvSync(rpc::InBoundRpcMessage& msg); + connectionStatus_t sendSync(rpc::OutBoundRpcMessage& msg); // handshake connectionStatus_t recvHandshake(); void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred ); @@ -340,45 +471,54 @@ class DrillClientImpl : public DrillClientImplBase{ status_t readMsg( ByteBuf_t _buf, AllocatedBufferPtr* allocatedBuffer, - InBoundRpcMessage& msg, - boost::system::error_code& error); - status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); - status_t processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); + rpc::InBoundRpcMessage& msg); + status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg); + status_t processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg); status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr); - status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ); - DrillClientQueryResult* findQueryResult(exec::shared::QueryId& qid); + status_t processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + status_t processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ); + DrillClientQueryResult* findQueryResult(const exec::shared::QueryId& qid); status_t processQueryStatusResult( exec::shared::QueryResult* qr, DrillClientQueryResult* pDrillClientQueryResult); void handleReadTimeout(const boost::system::error_code & err); void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ; - status_t validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valError); - status_t validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError); - connectionStatus_t handleConnError(connectionStatus_t status, std::string msg); - status_t handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult); - status_t handleQryError(status_t status, - const exec::shared::DrillPBError& e, - DrillClientQueryResult* pQueryResult); - // handle query state indicating query is COMPELTED or CANCELED - // (i.e., COMPELTED or CANCELED) + status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError); + status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError); + connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg); + status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle); + status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle); + // handle query state indicating query is COMPLETED or CANCELED + // (i.e., COMPLETED or CANCELED) status_t handleTerminatedQryState(status_t status, - std::string msg, + const std::string& msg, DrillClientQueryResult* pQueryResult); void broadcastError(DrillClientError* pErr); - void clearMapEntries(DrillClientQueryResult* pQueryResult); - void sendAck(InBoundRpcMessage& msg, bool isOk); - void sendCancel(exec::shared::QueryId* pQueryId); + void removeQueryHandle(DrillClientQueryHandle* pQueryHandle); + void removeQueryResult(DrillClientQueryResult* pQueryResult); + void sendAck(const rpc::InBoundRpcMessage& msg, bool isOk); + void sendCancel(const exec::shared::QueryId* pQueryId); - void shutdownSocket(); + template<typename Handle> + Handle* sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& msg); + // metadata requests + DrillClientCatalogResult* getCatalogs(const std::string& catalogPattern, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx); + DrillClientSchemaResult* getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx); + DrillClientTableResult* getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, Metadata::pfnTableMetadataListener listener, void* listenerCtx); + DrillClientColumnResult* getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::string& columnPattern, Metadata::pfnColumnMetadataListener listener, void* listenerCtx); - static RpcEncoder s_encoder; - static RpcDecoder s_decoder; + void shutdownSocket(); int32_t m_coordinationId; int32_t m_handshakeVersion; exec::user::HandshakeStatus m_handshakeStatus; std::string m_handshakeErrorId; std::string m_handshakeErrorMsg; + exec::user::RpcEndpointInfos m_serverInfos; bool m_bIsConnected; std::string m_connectStr; @@ -418,8 +558,8 @@ class DrillClientImpl : public DrillClientImplBase{ // Mutex to protect drill client operations boost::mutex m_dcMutex; - // Map of coordination id to Query Ids. - std::map<int, DrillClientQueryResult*> m_queryIds; + // Map of coordination id to Query handles. + std::map<int, DrillClientQueryHandle*> m_queryHandles; // Map of query id to query result for currently executing queries std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults; @@ -431,7 +571,7 @@ class DrillClientImpl : public DrillClientImplBase{ }; inline bool DrillClientImpl::Active() { - return this->m_bIsConnected;; + return this->m_bIsConnected; } @@ -442,17 +582,17 @@ inline bool DrillClientImpl::Active() { * */ class PooledDrillClientImpl : public DrillClientImplBase{ public: - PooledDrillClientImpl(){ - m_bIsDirectConnection=false; - m_maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS; + PooledDrillClientImpl(): + m_lastConnection(-1), + m_queriesExecuted(0), + m_maxConcurrentConnections(DEFAULT_MAX_CONCURRENT_CONNECTIONS), + m_bIsDirectConnection(false), + m_pError(NULL), + m_pUserProperties() { char* maxConn=std::getenv(MAX_CONCURRENT_CONNECTIONS_ENV); if(maxConn!=NULL){ m_maxConcurrentConnections=atoi(maxConn); } - m_lastConnection=-1; - m_pError=NULL; - m_queriesExecuted=0; - m_pUserProperties=NULL; } ~PooledDrillClientImpl(){ @@ -460,7 +600,6 @@ class PooledDrillClientImpl : public DrillClientImplBase{ delete *it; } m_clientConnections.clear(); - if(m_pUserProperties!=NULL){ delete m_pUserProperties; m_pUserProperties=NULL;} if(m_pError!=NULL){ delete m_pError; m_pError=NULL;} } @@ -482,15 +621,22 @@ class PooledDrillClientImpl : public DrillClientImplBase{ // Connections once added to the pool will be removed only when the DrillClient is closed. DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx); + DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx); + DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx); + //Waits as long as any one drillbit connection has results pending void waitForResults(); //Validates handshake only against the first drillbit connected to. connectionStatus_t validateHandshake(DrillUserProperties* props); - void freeQueryResources(DrillClientQueryResult* pQryResult); + void freeQueryResources(DrillClientQueryHandle* pQueryHandle); + + int getDrillbitCount() const { return m_drillbits.size();}; + + meta::DrillMetadata* getMetadata(); - int getDrillbitCount(){ return m_drillbits.size();}; + void freeMetadata(meta::DrillMetadata* metadata); private: @@ -502,9 +648,6 @@ class PooledDrillClientImpl : public DrillClientImplBase{ // is currently executing. If none, std::vector<DrillClientImpl*> m_clientConnections; boost::mutex m_poolMutex; // protect access to the vector - - //ZookeeperImpl zook; - // Use this to decide which drillbit to select next from the list of drillbits. size_t m_lastConnection; boost::mutex m_cMutex; @@ -524,44 +667,7 @@ class PooledDrillClientImpl : public DrillClientImplBase{ std::vector<std::string> m_drillbits; - DrillUserProperties* m_pUserProperties;//Keep a copy of user properties -}; - -class ZookeeperImpl{ - public: - ZookeeperImpl(); - ~ZookeeperImpl(); - static ZooLogLevel getZkLogLevel(); - // comma separated host:port pairs, each corresponding to a zk - // server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002 - DEPRECATED int connectToZookeeper(const char* connectStr, const char* pathToDrill); - void close(); - static void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context); - void debugPrint(); - std::string& getError(){return m_err;} - const exec::DrillbitEndpoint& getEndPoint(){ return m_drillServiceInstance.endpoint();} - // return unshuffled list of drillbits - int getAllDrillbits(const char* connectStr, const char* pathToDrill, std::vector<std::string>& drillbits); - // picks the index drillbit and returns the corresponding endpoint object - int getEndPoint(std::vector<std::string>& drillbits, size_t index, exec::DrillbitEndpoint& endpoint); - - - private: - static char s_drillRoot[]; - static char s_defaultCluster[]; - zhandle_t* m_zh; - clientid_t m_id; - int m_state; - std::string m_err; - - struct String_vector* m_pDrillbits; - - boost::mutex m_cvMutex; - // Condition variable to signal connection callback has been processed - boost::condition_variable m_cv; - bool m_bConnecting; - exec::DrillServiceInstance m_drillServiceInstance; - std::string m_rootDir; + boost::shared_ptr<DrillUserProperties> m_pUserProperties;//Keep a copy of user properties }; } // namespace Drill diff --git a/contrib/native/client/src/clientlib/env.h.in b/contrib/native/client/src/clientlib/env.h.in index a32f1521d..746a500a4 100644 --- a/contrib/native/client/src/clientlib/env.h.in +++ b/contrib/native/client/src/clientlib/env.h.in @@ -19,6 +19,15 @@ #ifndef ENV_H #define ENV_H +#define DRILL_NAME "Apache Drill" +#define DRILL_CONNECTOR_NAME "Apache Drill C++ client" +#define DRILL_VERSION_STRING "@PROJECT_VERSION@" + +#define DRILL_VERSION_MAJOR @PROJECT_VERSION_MAJOR@ +#define DRILL_VERSION_MINOR @PROJECT_VERSION_MINOR@ +#define DRILL_VERSION_PATCH @PROJECT_VERSION_PATCH@ + +#define GIT_SHA_PROP @GIT_SHA_PROP@ #define GIT_COMMIT_PROP @GIT_COMMIT_PROP@ #endif diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp index 47d165f69..56510ec80 100644 --- a/contrib/native/client/src/clientlib/errmsgs.cpp +++ b/contrib/native/client/src/clientlib/errmsgs.cpp @@ -74,7 +74,7 @@ static Drill::ErrorMessages errorMessages[]={ std::string getMessage(uint32_t msgId, ...){ char str[10240]; std::string s; - assert(msgId <= ERR_QRY_MAX); + assert((ERR_NONE <= msgId) && (msgId < ERR_QRY_MAX)); va_list args; va_start (args, msgId); vsprintf (str, errorMessages[msgId-DRILL_ERR_START].msgFormatStr, args); diff --git a/contrib/native/client/src/clientlib/fieldmeta.cpp b/contrib/native/client/src/clientlib/fieldmeta.cpp new file mode 100644 index 000000000..d9d6bd182 --- /dev/null +++ b/contrib/native/client/src/clientlib/fieldmeta.cpp @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "drill/common.hpp" +#include "drill/fieldmeta.hpp" +#include "../protobuf/UserBitShared.pb.h" +#include "../protobuf/User.pb.h" + +namespace { +// List of SQL types as string constants +static std::string SQLAny("ANY"); +static std::string SQLArray("ARRAY"); +static std::string SQLBigint("BIGINT"); +static std::string SQLBinary("BINARY"); +static std::string SQLBoolean("BOOLEAN"); +static std::string SQLChar("CHARACTER"); +static std::string SQLDate("DATE"); +static std::string SQLDecimal("DECIMAL"); +static std::string SQLDouble("DOUBLE"); +static std::string SQLFloat("FLOAT"); +static std::string SQLInteger("INTEGER"); +static std::string SQLInterval("INTERVAL"); +static std::string SQLIntervalYearMonth("INTERVAL YEAR TO MONTH"); +static std::string SQLIntervalDaySecond("INTERVAL DAY TO SECOND"); +static std::string SQLNChar("NATIONAL CHARACTER"); +static std::string SQLNull("NULL"); +static std::string SQLMap("MAP"); +static std::string SQLSmallint("SMALLINT"); +static std::string SQLTime("TIME"); +static std::string SQLTimestamp("TIMESTAMP"); +static std::string SQLTimestampTZ("TIMESTAMP WITH TIME ZONE"); +static std::string SQLTimeTZ("TIME WITH TIME ZONE"); +static std::string SQLTinyint("TINYINT"); +static std::string SQLUnion("UNION"); +static std::string SQLVarbinary("BINARY VARYING"); +static std::string SQLVarchar("CHARACTER VARYING"); +static std::string SQLVarnchar("NATIONAL CHARACTER VARYING"); +static std::string SQLUnknown("__unknown__"); + +static const std::string& getSQLType(common::MinorType type, common::DataMode mode) { + if (mode == common::DM_REPEATED || type == common::LIST) { + return SQLArray; + } + + switch(type) { + case common::BIT: return SQLBoolean; + + case common::TINYINT: return SQLTinyint; + case common::SMALLINT: return SQLSmallint; + case common::INT: return SQLInteger; + case common::BIGINT: return SQLBigint; + case common::FLOAT4: return SQLFloat; + case common::FLOAT8: return SQLDouble; + + case common::DECIMAL9: + case common::DECIMAL18: + case common::DECIMAL28DENSE: + case common::DECIMAL28SPARSE: + case common::DECIMAL38DENSE: + case common::DECIMAL38SPARSE: return SQLDecimal; + + case common::VARCHAR: return SQLVarchar; + case common::FIXEDCHAR: return SQLChar; + + case common::VAR16CHAR: return SQLVarnchar; + case common::FIXED16CHAR: return SQLNChar; + + case common::VARBINARY: return SQLVarbinary; + case common::FIXEDBINARY: return SQLBinary; + + case common::DATE: return SQLDate; + case common::TIME: return SQLTime; + case common::TIMETZ: return SQLTimeTZ; + case common::TIMESTAMP: return SQLTimestamp; + case common::TIMESTAMPTZ: return SQLTimestampTZ; + + case common::INTERVALYEAR: return SQLIntervalYearMonth; + case common::INTERVALDAY: return SQLIntervalDaySecond; + case common::INTERVAL: return SQLInterval; + case common::MONEY: return SQLDecimal; + + case common::MAP: return SQLMap; + case common::LATE: return SQLAny; + case common::DM_UNKNOWN: return SQLNull; + case common::UNION: return SQLUnion; + + case common::UINT1: return SQLTinyint; + case common::UINT2: return SQLSmallint; + case common::UINT4: return SQLInteger; + case common::UINT8: return SQLBigint; + + default: + return SQLUnknown; + } +} + +static bool isSortable(common::MinorType type) { + return type != common::MAP && type != common::LIST; +} + +static bool isNullable(common::DataMode mode) { + return mode == common::DM_OPTIONAL; // Same behaviour as JDBC +} + +static bool isSigned(common::MinorType type, common::DataMode mode) { + if (mode == common::DM_REPEATED) { + return false;// SQL ARRAY + } + + switch(type) { + case common::SMALLINT: + case common::INT: + case common::BIGINT: + case common::FLOAT4: + case common::FLOAT8: + + case common::DECIMAL9: + case common::DECIMAL18: + case common::DECIMAL28DENSE: + case common::DECIMAL38DENSE: + case common::DECIMAL38SPARSE: + + case common::INTERVALYEAR: + case common::INTERVALDAY: + case common::INTERVAL: + case common::MONEY: + case common::TINYINT: + return true; + + case common::BIT: + case common::VARCHAR: + case common::FIXEDCHAR: + + case common::VAR16CHAR: + case common::FIXED16CHAR: + + case common::VARBINARY: + case common::FIXEDBINARY: + + case common::DATE: + case common::TIME: + case common::TIMETZ: + case common::TIMESTAMP: + case common::TIMESTAMPTZ: + + case common::MAP: + case common::LATE: + case common::DM_UNKNOWN: + case common::UNION: + + case common::UINT1: + case common::UINT2: + case common::UINT4: + case common::UINT8: + return false; + + default: + return false; + } +} + +static Drill::FieldMetadata::ColumnSearchability getSearchability(exec::user::ColumnSearchability s) { + switch(s) { + case exec::user::UNKNOWN_SEARCHABILITY: return Drill::FieldMetadata::UNKNOWN_SEARCHABILITY; + case exec::user::NONE: return Drill::FieldMetadata::NONE; + case exec::user::CHAR: return Drill::FieldMetadata::CHAR; + case exec::user::NUMBER: return Drill::FieldMetadata::NUMBER; + case exec::user::ALL: return Drill::FieldMetadata::ALL; + + default: + return Drill::FieldMetadata::UNKNOWN_SEARCHABILITY; + } +} + +static Drill::FieldMetadata::ColumnUpdatability getUpdatability(exec::user::ColumnUpdatability u) { + switch(u) { + case exec::user::UNKNOWN_UPDATABILITY: return Drill::FieldMetadata::UNKNOWN_UPDATABILITY; + case exec::user::READ_ONLY: return Drill::FieldMetadata::READ_ONLY; + case exec::user::WRITABLE: return Drill::FieldMetadata::WRITABLE; + + default: + return Drill::FieldMetadata::UNKNOWN_UPDATABILITY; + } +} + +// Based on ODBC spec +// https://msdn.microsoft.com/en-us/library/ms711786(v=vs.85).aspx +static uint32_t getColumnSize(const std::string& type, uint32_t precision) { + if (type == SQLBoolean) { + return 1; + } + else if (type == SQLTinyint) { + return 3; + } + else if (type == SQLSmallint) { + return 5; + } + else if (type == SQLInteger) { + return 10; + } + else if (type == SQLBigint) { + return 19; + } + else if (type == SQLFloat) { + return 7; + } + else if (type == SQLDouble) { + return 15; + } + else if (type == SQLDecimal) { + return precision; + } + else if (type == SQLBinary || type == SQLVarbinary + || type == SQLChar || type == SQLVarchar + || type == SQLNChar || type == SQLVarnchar) { + return precision; + } + else if (type == SQLDate) { + return 10; // 'yyyy-MM-dd' format + } + else if (type == SQLTime) { + if (precision > 0) { + return 9 + precision; + } + else return 8; // 'hh-mm-ss' format + } + else if (type == SQLTimestamp) { + return (precision > 0) + ? 20 + precision + : 19; // 'yyyy-MM-ddThh-mm-ss' format + } + else if (type == SQLIntervalYearMonth) { + return (precision > 0) + ? 5 + precision // P..M31 + : 0; // if precision is not set, return 0 because there's not enough info + } + else if (type == SQLIntervalDaySecond) { + return (precision > 0) + ? 12 + precision // P..DT12H60M60....S + : 0; // if precision is not set, return 0 because there's not enough info + } + else { + return 0; + } +} + +static uint32_t getPrecision(const ::common::MajorType& type) { + const ::common::MinorType& minor_type = type.minor_type(); + + if (type.has_precision()) { + return type.precision(); + } + + if (minor_type == ::common::VARBINARY || minor_type == ::common::VARCHAR) { + return 65536; + } + + return 0; +} + +// From Types.java +// Based on ODBC spec: +// https://msdn.microsoft.com/en-us/library/ms713974(v=vs.85).aspx +static uint32_t getDisplaySize(const ::common::MajorType& type) { + if (type.mode() == ::common::DM_REPEATED || type.minor_type() == ::common::LIST) { + return 0; + } + + uint32_t precision = getPrecision(type); + + switch(type.minor_type()) { + case ::common::BIT: return 1; // 1 digit + + case ::common::TINYINT: return 4; // sign + 3 digit + case ::common::SMALLINT: return 6; // sign + 5 digits + case ::common::INT: return 11; // sign + 10 digits + case ::common::BIGINT: return 20; // sign + 19 digits + + case ::common::UINT1: return 3; // 3 digits + case ::common::UINT2: return 5; // 5 digits + case ::common::UINT4: return 10; // 10 digits + case ::common::UINT8: return 19; // 19 digits + + case ::common::FLOAT4: return 14; // sign + 7 digits + decimal point + E + 2 digits + case ::common::FLOAT8: return 24; // sign + 15 digits + decimal point + E + 3 digits + + case ::common::DECIMAL9: + case ::common::DECIMAL18: + case ::common::DECIMAL28DENSE: + case ::common::DECIMAL28SPARSE: + case ::common::DECIMAL38DENSE: + case ::common::DECIMAL38SPARSE: + case ::common::MONEY: return 2 + precision; // precision of the column plus a sign and a decimal point + + case ::common::VARCHAR: + case ::common::FIXEDCHAR: + case ::common::VAR16CHAR: + case ::common::FIXED16CHAR: return precision; // number of characters + + case ::common::VARBINARY: + case ::common::FIXEDBINARY: return 2 * precision; // each binary byte is represented as a 2digit hex number + + case ::common::DATE: return 10; // yyyy-mm-dd + case ::common::TIME: + return precision > 0 + ? 9 + precision // hh-mm-ss.SSS + : 8; // hh-mm-ss + case ::common::TIMETZ: + return precision > 0 + ? 15 + precision // hh-mm-ss.SSS-zz:zz + : 14; // hh-mm-ss-zz:zz + case ::common::TIMESTAMP: + return precision > 0 + ? 20 + precision // yyyy-mm-ddThh:mm:ss.SSS + : 19; // yyyy-mm-ddThh:mm:ss + case ::common::TIMESTAMPTZ: + return precision > 0 + ? 26 + precision // yyyy-mm-ddThh:mm:ss.SSS:ZZ-ZZ + : 25; // yyyy-mm-ddThh:mm:ss-ZZ:ZZ + + case ::common::INTERVALYEAR: + return precision > 0 + ? 5 + precision // P..Y12M + : 0; // if precision is not set, return 0 because there's not enough info + + case ::common::INTERVALDAY: + return precision > 0 + ? 12 + precision // P..DT12H60M60S assuming fractional seconds precision is not supported + : 0; // if precision is not set, return 0 because there's not enough info + + default: + // We don't know how to compute a display size, let's return 0 (unknown) + return 0; +} +} +} // namespace + +namespace Drill{ + +void FieldMetadata::set(const exec::shared::SerializedField& f){ + m_name=f.name_part().name(); + m_minorType=f.major_type().minor_type(); + m_dataMode=f.major_type().mode(); + m_valueCount=f.value_count(); + m_scale=f.major_type().scale(); + m_precision=f.major_type().precision(); + m_bufferLength=f.buffer_length(); + m_catalogName="DRILL"; + m_schemaName=""; // unknown + m_tableName=""; // unknown; + m_label=m_name; + m_sqlType=::getSQLType(m_minorType, m_dataMode); + m_nullable=::isNullable(m_dataMode); + m_signed=::isSigned(m_minorType, m_dataMode); + m_displaySize=::getDisplaySize(f.major_type()); + m_searchability=ALL; + m_updatability=READ_ONLY; + m_autoIncremented=false; + m_caseSensitive=false; + m_sortable=::isSortable(m_minorType); + m_currency=false; + m_columnSize = ::getColumnSize(m_sqlType, m_precision); +} + +void FieldMetadata::set(const exec::user::ResultColumnMetadata& m){ + m_name=m.column_name(); + m_minorType=static_cast<common::MinorType>(-1); + m_dataMode=static_cast<common::DataMode>(-1); + m_valueCount=0; + m_scale=m.scale(); + m_precision=m.precision(); + m_bufferLength=0; + m_catalogName=m.catalog_name(); + m_schemaName=m.schema_name(); + m_tableName=m.table_name(); + m_label=m.label(); + m_sqlType=m.data_type(); + m_nullable=m.is_nullable(); + m_displaySize=m.display_size(); + m_signed=m.signed_(); + m_searchability=::getSearchability(m.searchability()); + m_updatability=::getUpdatability(m.updatability()); + m_autoIncremented=m.auto_increment(); + m_caseSensitive=m.case_sensitivity(); + m_sortable=m.sortable(); + m_currency=m.is_currency(); + m_columnSize =::getColumnSize(m_sqlType, m_precision); +} + +}// namespace Drill + diff --git a/contrib/native/client/src/clientlib/metadata.cpp b/contrib/native/client/src/clientlib/metadata.cpp new file mode 100644 index 000000000..0364c7d81 --- /dev/null +++ b/contrib/native/client/src/clientlib/metadata.cpp @@ -0,0 +1,748 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <boost/assign.hpp> +#include <boost/functional/hash.hpp> +#include <boost/unordered_set.hpp> +#include "drillClientImpl.hpp" + +#include "metadata.hpp" + +const std::string Drill::meta::DrillMetadata::s_connectorName(DRILL_CONNECTOR_NAME); +const std::string Drill::meta::DrillMetadata::s_connectorVersion(DRILL_VERSION_STRING); + +const std::string Drill::meta::DrillMetadata::s_catalogSeparator("."); +const std::string Drill::meta::DrillMetadata::s_catalogTerm("catalog"); +const std::string Drill::meta::DrillMetadata::s_identifierQuoteString("`"); + +const std::vector<std::string> Drill::meta::DrillMetadata::s_sqlKeywords = boost::assign::list_of + ("ABS")("ALLOW")("ARRAY")("ASENSITIVE")("ASYMMETRIC")("ATOMIC")("BIGINT")("BINARY")("BLOB") + ("BOOLEAN")("CALL")("CALLED")("CARDINALITY")("CEIL")("CEILING")("CLOB")("COLLECT")("CONDITION") + ("CORR")("COVAR_POP")("COVAR_SAMP")("CUBE")("CUME_DIST")("CURRENT_CATALOG") + ("CURRENT_DEFAULT_TRANSFORM_GROUP")("CURRENT_PATH")("CURRENT_ROLE")("CURRENT_SCHEMA") + ("CURRENT_TRANSFORM_GROUP_FOR_TYPE")("CYCLE")("DATABASE")("DATABASES")("DENSE_RANK")("DEREF") + ("DETERMINISTIC")("DISALLOW")("DYNAMIC")("EACH")("ELEMENT")("EVERY")("EXP")("EXPLAIN") + ("EXTEND")("FILES")("FILTER")("FIRST_VALUE")("FLOOR")("FREE")("FUNCTION")("FUSION")("GROUPING") + ("HOLD")("IF")("IMPORT")("INOUT")("INTERSECTION")("LARGE")("LAST_VALUE")("LATERAL")("LIMIT")("LN") + ("LOCALTIME")("LOCALTIMESTAMP")("MEMBER")("MERGE")("METADATA")("METHOD")("MOD")("MODIFIES") + ("MULTISET")("NCLOB")("NEW")("NONE")("NORMALIZE")("OFFSET")("OLD")("OUT")("OVER")("OVERLAY") + ("PARAMETER")("PARTITION")("PERCENTILE_CONT")("PERCENTILE_DISC")("PERCENT_RANK")("POWER") + ("RANGE")("RANK")("READS")("RECURSIVE")("REF")("REFERENCING")("REFRESH")("REGR_AVGX")("REGR_AVGY") + ("REGR_COUNT")("REGR_INTERCEPT")("REGR_R2")("REGR_SLOPE")("REGR_SXX")("REGR_SXY")("REGR_SYY") + ("RELEASE")("REPLACE")("RESET")("RESULT")("RETURN")("RETURNS")("ROLLUP")("ROW")("ROW_NUMBER") + ("SAVEPOINT")("SCHEMAS")("SCOPE")("SEARCH")("SENSITIVE")("SHOW")("SIMILAR")("SPECIFIC")("SPECIFICTYPE") + ("SQLEXCEPTION")("SQLWARNING")("SQRT")("START")("STATIC")("STDDEV_POP")("STDDEV_SAMP")("STREAM") + ("SUBMULTISET")("SYMMETRIC")("SYSTEM")("TABLES")("TABLESAMPLE")("TINYINT")("TREAT")("TRIGGER") + ("UESCAPE")("UNNEST")("UPSERT")("USE")("VARBINARY")("VAR_POP")("VAR_SAMP")("WIDTH_BUCKET") + ("WINDOW")("WITHIN")("WITHOUT"); + +const std::vector<std::string> Drill::meta::DrillMetadata::s_numericFunctions = boost::assign::list_of + ("ABS")("ACOS")("ASIN")("ATAN")("ATAN2")("CEILING")("COS")("COT") + ("DEGREES")("EXP")("FLOOR")("LOG")("LOG10")("MOD")("PI") + ("POWER")("RADIANS")("RAND")("ROUND")("SIGN")("SIN")("SQRT") + ("TAN")("TRUNCATE"); + +const std::string Drill::meta::DrillMetadata::s_schemaTerm("schema"); +const std::string Drill::meta::DrillMetadata::s_searchEscapeString("\\"); +const std::string Drill::meta::DrillMetadata::s_specialCharacters; + +const std::vector<std::string> Drill::meta::DrillMetadata::s_stringFunctions = boost::assign::list_of + ("ASCII")("CHAR")("CONCAT")("DIFFERENCE")("INSERT")("LCASE") + ("LEFT")("LENGTH")("LOCATE")("LTRIM")("REPEAT")("REPLACE") + ("RIGHT")("RTRIM")("SOUNDEX")("SPACE")("SUBSTRING")("UCASE"); + +const std::vector<std::string> Drill::meta::DrillMetadata::s_systemFunctions = boost::assign::list_of + ("DATABASE")("IFNULL")("USER"); + +const std::string Drill::meta::DrillMetadata::s_tableTerm("table"); + +const std::vector<std::string> Drill::meta::DrillMetadata::s_dateTimeFunctions = boost::assign::list_of + ("CURDATE")("CURTIME")("DAYNAME")("DAYOFMONTH")("DAYOFWEEK") + ("DAYOFYEAR")("HOUR")("MINUTE")("MONTH")("MONTHNAME")("NOW") + ("QUARTER")("SECOND")("TIMESTAMPADD")("TIMESTAMPDIFF")("WEEK")("YEAR"); + +namespace Drill { +namespace meta { +namespace { +using boost::assign::list_of; + +struct FromTo { + FromTo(common::MinorType from, common::MinorType to): m_from(from), m_to(to) {} + + common::MinorType m_from; + common::MinorType m_to; +}; + +bool operator==(FromTo const& ft1, FromTo const& ft2) { + return ft1.m_from == ft2.m_from && ft1.m_to == ft2.m_to; +} + +std::size_t hash_value(FromTo const& ft) { + std::size_t hash = 0; + boost::hash_combine(hash, ft.m_from); + boost::hash_combine(hash, ft.m_to); + + return hash; +} + +static boost::unordered_set<FromTo> s_convertMap = boost::assign::list_of + (FromTo(common::TINYINT, common::INT)) + (FromTo(common::TINYINT, common::BIGINT)) + (FromTo(common::TINYINT, common::DECIMAL9)) + (FromTo(common::TINYINT, common::DECIMAL18)) + (FromTo(common::TINYINT, common::DECIMAL28SPARSE)) + (FromTo(common::TINYINT, common::DECIMAL38SPARSE)) + (FromTo(common::TINYINT, common::DATE)) + (FromTo(common::TINYINT, common::TIME)) + (FromTo(common::TINYINT, common::TIMESTAMP)) + (FromTo(common::TINYINT, common::INTERVAL)) + (FromTo(common::TINYINT, common::FLOAT4)) + (FromTo(common::TINYINT, common::FLOAT8)) + (FromTo(common::TINYINT, common::BIT)) + (FromTo(common::TINYINT, common::VARCHAR)) + (FromTo(common::TINYINT, common::VAR16CHAR)) + (FromTo(common::TINYINT, common::VARBINARY)) + (FromTo(common::TINYINT, common::INTERVALYEAR)) + (FromTo(common::TINYINT, common::INTERVALDAY)) + (FromTo(common::SMALLINT, common::INT)) + (FromTo(common::SMALLINT, common::BIGINT)) + (FromTo(common::SMALLINT, common::DECIMAL9)) + (FromTo(common::SMALLINT, common::DECIMAL18)) + (FromTo(common::SMALLINT, common::DECIMAL28SPARSE)) + (FromTo(common::SMALLINT, common::DECIMAL38SPARSE)) + (FromTo(common::SMALLINT, common::DATE)) + (FromTo(common::SMALLINT, common::TIME)) + (FromTo(common::SMALLINT, common::TIMESTAMP)) + (FromTo(common::SMALLINT, common::INTERVAL)) + (FromTo(common::SMALLINT, common::FLOAT4)) + (FromTo(common::SMALLINT, common::FLOAT8)) + (FromTo(common::SMALLINT, common::BIT)) + (FromTo(common::SMALLINT, common::VARCHAR)) + (FromTo(common::SMALLINT, common::VAR16CHAR)) + (FromTo(common::SMALLINT, common::VARBINARY)) + (FromTo(common::SMALLINT, common::INTERVALYEAR)) + (FromTo(common::SMALLINT, common::INTERVALDAY)) + (FromTo(common::INT, common::INT)) + (FromTo(common::INT, common::BIGINT)) + (FromTo(common::INT, common::DECIMAL9)) + (FromTo(common::INT, common::DECIMAL18)) + (FromTo(common::INT, common::DECIMAL28SPARSE)) + (FromTo(common::INT, common::DECIMAL38SPARSE)) + (FromTo(common::INT, common::DATE)) + (FromTo(common::INT, common::TIME)) + (FromTo(common::INT, common::TIMESTAMP)) + (FromTo(common::INT, common::INTERVAL)) + (FromTo(common::INT, common::FLOAT4)) + (FromTo(common::INT, common::FLOAT8)) + (FromTo(common::INT, common::BIT)) + (FromTo(common::INT, common::VARCHAR)) + (FromTo(common::INT, common::VAR16CHAR)) + (FromTo(common::INT, common::VARBINARY)) + (FromTo(common::INT, common::INTERVALYEAR)) + (FromTo(common::INT, common::INTERVALDAY)) + (FromTo(common::BIGINT, common::INT)) + (FromTo(common::BIGINT, common::BIGINT)) + (FromTo(common::BIGINT, common::DECIMAL9)) + (FromTo(common::BIGINT, common::DECIMAL18)) + (FromTo(common::BIGINT, common::DECIMAL28SPARSE)) + (FromTo(common::BIGINT, common::DECIMAL38SPARSE)) + (FromTo(common::BIGINT, common::DATE)) + (FromTo(common::BIGINT, common::TIME)) + (FromTo(common::BIGINT, common::TIMESTAMP)) + (FromTo(common::BIGINT, common::INTERVAL)) + (FromTo(common::BIGINT, common::FLOAT4)) + (FromTo(common::BIGINT, common::FLOAT8)) + (FromTo(common::BIGINT, common::BIT)) + (FromTo(common::BIGINT, common::VARCHAR)) + (FromTo(common::BIGINT, common::VAR16CHAR)) + (FromTo(common::BIGINT, common::VARBINARY)) + (FromTo(common::BIGINT, common::INTERVALYEAR)) + (FromTo(common::BIGINT, common::INTERVALDAY)) + (FromTo(common::DECIMAL9, common::INT)) + (FromTo(common::DECIMAL9, common::BIGINT)) + (FromTo(common::DECIMAL9, common::DECIMAL9)) + (FromTo(common::DECIMAL9, common::DECIMAL18)) + (FromTo(common::DECIMAL9, common::DECIMAL28SPARSE)) + (FromTo(common::DECIMAL9, common::DECIMAL38SPARSE)) + (FromTo(common::DECIMAL9, common::DATE)) + (FromTo(common::DECIMAL9, common::TIME)) + (FromTo(common::DECIMAL9, common::TIMESTAMP)) + (FromTo(common::DECIMAL9, common::INTERVAL)) + (FromTo(common::DECIMAL9, common::FLOAT4)) + (FromTo(common::DECIMAL9, common::FLOAT8)) + (FromTo(common::DECIMAL9, common::BIT)) + (FromTo(common::DECIMAL9, common::VARCHAR)) + (FromTo(common::DECIMAL9, common::VAR16CHAR)) + (FromTo(common::DECIMAL9, common::VARBINARY)) + (FromTo(common::DECIMAL9, common::INTERVALYEAR)) + (FromTo(common::DECIMAL9, common::INTERVALDAY)) + (FromTo(common::DECIMAL18, common::INT)) + (FromTo(common::DECIMAL18, common::BIGINT)) + (FromTo(common::DECIMAL18, common::DECIMAL9)) + (FromTo(common::DECIMAL18, common::DECIMAL18)) + (FromTo(common::DECIMAL18, common::DECIMAL28SPARSE)) + (FromTo(common::DECIMAL18, common::DECIMAL38SPARSE)) + (FromTo(common::DECIMAL18, common::DATE)) + (FromTo(common::DECIMAL18, common::TIME)) + (FromTo(common::DECIMAL18, common::TIMESTAMP)) + (FromTo(common::DECIMAL18, common::INTERVAL)) + (FromTo(common::DECIMAL18, common::FLOAT4)) + (FromTo(common::DECIMAL18, common::FLOAT8)) + (FromTo(common::DECIMAL18, common::BIT)) + (FromTo(common::DECIMAL18, common::VARCHAR)) + (FromTo(common::DECIMAL18, common::VAR16CHAR)) + (FromTo(common::DECIMAL18, common::VARBINARY)) + (FromTo(common::DECIMAL18, common::INTERVALYEAR)) + (FromTo(common::DECIMAL18, common::INTERVALDAY)) + (FromTo(common::DECIMAL28SPARSE, common::INT)) + (FromTo(common::DECIMAL28SPARSE, common::BIGINT)) + (FromTo(common::DECIMAL28SPARSE, common::DECIMAL9)) + (FromTo(common::DECIMAL28SPARSE, common::DECIMAL18)) + (FromTo(common::DECIMAL28SPARSE, common::DECIMAL28SPARSE)) + (FromTo(common::DECIMAL28SPARSE, common::DECIMAL38SPARSE)) + (FromTo(common::DECIMAL28SPARSE, common::DATE)) + (FromTo(common::DECIMAL28SPARSE, common::TIME)) + (FromTo(common::DECIMAL28SPARSE, common::TIMESTAMP)) + (FromTo(common::DECIMAL28SPARSE, common::INTERVAL)) + (FromTo(common::DECIMAL28SPARSE, common::FLOAT4)) + (FromTo(common::DECIMAL28SPARSE, common::FLOAT8)) + (FromTo(common::DECIMAL28SPARSE, common::BIT)) + (FromTo(common::DECIMAL28SPARSE, common::VARCHAR)) + (FromTo(common::DECIMAL28SPARSE, common::VAR16CHAR)) + (FromTo(common::DECIMAL28SPARSE, common::VARBINARY)) + (FromTo(common::DECIMAL28SPARSE, common::INTERVALYEAR)) + (FromTo(common::DECIMAL28SPARSE, common::INTERVALDAY)) + (FromTo(common::DECIMAL38SPARSE, common::INT)) + (FromTo(common::DECIMAL38SPARSE, common::BIGINT)) + (FromTo(common::DECIMAL38SPARSE, common::DECIMAL9)) + (FromTo(common::DECIMAL38SPARSE, common::DECIMAL18)) + (FromTo(common::DECIMAL38SPARSE, common::DECIMAL28SPARSE)) + (FromTo(common::DECIMAL38SPARSE, common::DECIMAL38SPARSE)) + (FromTo(common::DECIMAL38SPARSE, common::DATE)) + (FromTo(common::DECIMAL38SPARSE, common::TIME)) + (FromTo(common::DECIMAL38SPARSE, common::TIMESTAMP)) + (FromTo(common::DECIMAL38SPARSE, common::INTERVAL)) + (FromTo(common::DECIMAL38SPARSE, common::FLOAT4)) + (FromTo(common::DECIMAL38SPARSE, common::FLOAT8)) + (FromTo(common::DECIMAL38SPARSE, common::BIT)) + (FromTo(common::DECIMAL38SPARSE, common::VARCHAR)) + (FromTo(common::DECIMAL38SPARSE, common::VAR16CHAR)) + (FromTo(common::DECIMAL38SPARSE, common::VARBINARY)) + (FromTo(common::DECIMAL38SPARSE, common::INTERVALYEAR)) + (FromTo(common::DECIMAL38SPARSE, common::INTERVALDAY)) + (FromTo(common::MONEY, common::INT)) + (FromTo(common::MONEY, common::BIGINT)) + (FromTo(common::MONEY, common::DECIMAL9)) + (FromTo(common::MONEY, common::DECIMAL18)) + (FromTo(common::MONEY, common::DECIMAL28SPARSE)) + (FromTo(common::MONEY, common::DECIMAL38SPARSE)) + (FromTo(common::MONEY, common::DATE)) + (FromTo(common::MONEY, common::TIME)) + (FromTo(common::MONEY, common::TIMESTAMP)) + (FromTo(common::MONEY, common::INTERVAL)) + (FromTo(common::MONEY, common::FLOAT4)) + (FromTo(common::MONEY, common::FLOAT8)) + (FromTo(common::MONEY, common::BIT)) + (FromTo(common::MONEY, common::VARCHAR)) + (FromTo(common::MONEY, common::VAR16CHAR)) + (FromTo(common::MONEY, common::VARBINARY)) + (FromTo(common::MONEY, common::INTERVALYEAR)) + (FromTo(common::MONEY, common::INTERVALDAY)) + (FromTo(common::DATE, common::INT)) + (FromTo(common::DATE, common::BIGINT)) + (FromTo(common::DATE, common::DECIMAL9)) + (FromTo(common::DATE, common::DECIMAL18)) + (FromTo(common::DATE, common::DECIMAL28SPARSE)) + (FromTo(common::DATE, common::DECIMAL38SPARSE)) + (FromTo(common::DATE, common::DATE)) + (FromTo(common::DATE, common::TIME)) + (FromTo(common::DATE, common::TIMESTAMP)) + (FromTo(common::DATE, common::INTERVAL)) + (FromTo(common::DATE, common::FLOAT4)) + (FromTo(common::DATE, common::FLOAT8)) + (FromTo(common::DATE, common::BIT)) + (FromTo(common::DATE, common::VARCHAR)) + (FromTo(common::DATE, common::VAR16CHAR)) + (FromTo(common::DATE, common::VARBINARY)) + (FromTo(common::DATE, common::INTERVALYEAR)) + (FromTo(common::DATE, common::INTERVALDAY)) + (FromTo(common::TIME, common::INT)) + (FromTo(common::TIME, common::BIGINT)) + (FromTo(common::TIME, common::DECIMAL9)) + (FromTo(common::TIME, common::DECIMAL18)) + (FromTo(common::TIME, common::DECIMAL28SPARSE)) + (FromTo(common::TIME, common::DECIMAL38SPARSE)) + (FromTo(common::TIME, common::DATE)) + (FromTo(common::TIME, common::TIME)) + (FromTo(common::TIME, common::TIMESTAMP)) + (FromTo(common::TIME, common::INTERVAL)) + (FromTo(common::TIME, common::FLOAT4)) + (FromTo(common::TIME, common::FLOAT8)) + (FromTo(common::TIME, common::BIT)) + (FromTo(common::TIME, common::VARCHAR)) + (FromTo(common::TIME, common::VAR16CHAR)) + (FromTo(common::TIME, common::VARBINARY)) + (FromTo(common::TIME, common::INTERVALYEAR)) + (FromTo(common::TIME, common::INTERVALDAY)) + (FromTo(common::TIMESTAMPTZ, common::INT)) + (FromTo(common::TIMESTAMPTZ, common::BIGINT)) + (FromTo(common::TIMESTAMPTZ, common::DECIMAL9)) + (FromTo(common::TIMESTAMPTZ, common::DECIMAL18)) + (FromTo(common::TIMESTAMPTZ, common::DECIMAL28SPARSE)) + (FromTo(common::TIMESTAMPTZ, common::DECIMAL38SPARSE)) + (FromTo(common::TIMESTAMPTZ, common::DATE)) + (FromTo(common::TIMESTAMPTZ, common::TIME)) + (FromTo(common::TIMESTAMPTZ, common::TIMESTAMP)) + (FromTo(common::TIMESTAMPTZ, common::INTERVAL)) + (FromTo(common::TIMESTAMPTZ, common::FLOAT4)) + (FromTo(common::TIMESTAMPTZ, common::FLOAT8)) + (FromTo(common::TIMESTAMPTZ, common::BIT)) + (FromTo(common::TIMESTAMPTZ, common::VARCHAR)) + (FromTo(common::TIMESTAMPTZ, common::VAR16CHAR)) + (FromTo(common::TIMESTAMPTZ, common::VARBINARY)) + (FromTo(common::TIMESTAMPTZ, common::INTERVALYEAR)) + (FromTo(common::TIMESTAMPTZ, common::INTERVALDAY)) + (FromTo(common::TIMESTAMP, common::INT)) + (FromTo(common::TIMESTAMP, common::BIGINT)) + (FromTo(common::TIMESTAMP, common::DECIMAL9)) + (FromTo(common::TIMESTAMP, common::DECIMAL18)) + (FromTo(common::TIMESTAMP, common::DECIMAL28SPARSE)) + (FromTo(common::TIMESTAMP, common::DECIMAL38SPARSE)) + (FromTo(common::TIMESTAMP, common::DATE)) + (FromTo(common::TIMESTAMP, common::TIME)) + (FromTo(common::TIMESTAMP, common::TIMESTAMP)) + (FromTo(common::TIMESTAMP, common::INTERVAL)) + (FromTo(common::TIMESTAMP, common::FLOAT4)) + (FromTo(common::TIMESTAMP, common::FLOAT8)) + (FromTo(common::TIMESTAMP, common::BIT)) + (FromTo(common::TIMESTAMP, common::VARCHAR)) + (FromTo(common::TIMESTAMP, common::VAR16CHAR)) + (FromTo(common::TIMESTAMP, common::VARBINARY)) + (FromTo(common::TIMESTAMP, common::INTERVALYEAR)) + (FromTo(common::TIMESTAMP, common::INTERVALDAY)) + (FromTo(common::INTERVAL, common::INT)) + (FromTo(common::INTERVAL, common::BIGINT)) + (FromTo(common::INTERVAL, common::DECIMAL9)) + (FromTo(common::INTERVAL, common::DECIMAL18)) + (FromTo(common::INTERVAL, common::DECIMAL28SPARSE)) + (FromTo(common::INTERVAL, common::DECIMAL38SPARSE)) + (FromTo(common::INTERVAL, common::DATE)) + (FromTo(common::INTERVAL, common::TIME)) + (FromTo(common::INTERVAL, common::TIMESTAMP)) + (FromTo(common::INTERVAL, common::INTERVAL)) + (FromTo(common::INTERVAL, common::FLOAT4)) + (FromTo(common::INTERVAL, common::FLOAT8)) + (FromTo(common::INTERVAL, common::BIT)) + (FromTo(common::INTERVAL, common::VARCHAR)) + (FromTo(common::INTERVAL, common::VAR16CHAR)) + (FromTo(common::INTERVAL, common::VARBINARY)) + (FromTo(common::INTERVAL, common::INTERVALYEAR)) + (FromTo(common::INTERVAL, common::INTERVALDAY)) + (FromTo(common::FLOAT4, common::INT)) + (FromTo(common::FLOAT4, common::BIGINT)) + (FromTo(common::FLOAT4, common::DECIMAL9)) + (FromTo(common::FLOAT4, common::DECIMAL18)) + (FromTo(common::FLOAT4, common::DECIMAL28SPARSE)) + (FromTo(common::FLOAT4, common::DECIMAL38SPARSE)) + (FromTo(common::FLOAT4, common::DATE)) + (FromTo(common::FLOAT4, common::TIME)) + (FromTo(common::FLOAT4, common::TIMESTAMP)) + (FromTo(common::FLOAT4, common::INTERVAL)) + (FromTo(common::FLOAT4, common::FLOAT4)) + (FromTo(common::FLOAT4, common::FLOAT8)) + (FromTo(common::FLOAT4, common::BIT)) + (FromTo(common::FLOAT4, common::VARCHAR)) + (FromTo(common::FLOAT4, common::VAR16CHAR)) + (FromTo(common::FLOAT4, common::VARBINARY)) + (FromTo(common::FLOAT4, common::INTERVALYEAR)) + (FromTo(common::FLOAT4, common::INTERVALDAY)) + (FromTo(common::FLOAT8, common::INT)) + (FromTo(common::FLOAT8, common::BIGINT)) + (FromTo(common::FLOAT8, common::DECIMAL9)) + (FromTo(common::FLOAT8, common::DECIMAL18)) + (FromTo(common::FLOAT8, common::DECIMAL28SPARSE)) + (FromTo(common::FLOAT8, common::DECIMAL38SPARSE)) + (FromTo(common::FLOAT8, common::DATE)) + (FromTo(common::FLOAT8, common::TIME)) + (FromTo(common::FLOAT8, common::TIMESTAMP)) + (FromTo(common::FLOAT8, common::INTERVAL)) + (FromTo(common::FLOAT8, common::FLOAT4)) + (FromTo(common::FLOAT8, common::FLOAT8)) + (FromTo(common::FLOAT8, common::BIT)) + (FromTo(common::FLOAT8, common::VARCHAR)) + (FromTo(common::FLOAT8, common::VAR16CHAR)) + (FromTo(common::FLOAT8, common::VARBINARY)) + (FromTo(common::FLOAT8, common::INTERVALYEAR)) + (FromTo(common::FLOAT8, common::INTERVALDAY)) + (FromTo(common::BIT, common::TINYINT)) + (FromTo(common::BIT, common::INT)) + (FromTo(common::BIT, common::BIGINT)) + (FromTo(common::BIT, common::DECIMAL9)) + (FromTo(common::BIT, common::DECIMAL18)) + (FromTo(common::BIT, common::DECIMAL28SPARSE)) + (FromTo(common::BIT, common::DECIMAL38SPARSE)) + (FromTo(common::BIT, common::DATE)) + (FromTo(common::BIT, common::TIME)) + (FromTo(common::BIT, common::TIMESTAMP)) + (FromTo(common::BIT, common::INTERVAL)) + (FromTo(common::BIT, common::FLOAT4)) + (FromTo(common::BIT, common::FLOAT8)) + (FromTo(common::BIT, common::BIT)) + (FromTo(common::BIT, common::VARCHAR)) + (FromTo(common::BIT, common::VAR16CHAR)) + (FromTo(common::BIT, common::VARBINARY)) + (FromTo(common::BIT, common::INTERVALYEAR)) + (FromTo(common::BIT, common::INTERVALDAY)) + (FromTo(common::FIXEDCHAR, common::TINYINT)) + (FromTo(common::FIXEDCHAR, common::INT)) + (FromTo(common::FIXEDCHAR, common::BIGINT)) + (FromTo(common::FIXEDCHAR, common::DECIMAL9)) + (FromTo(common::FIXEDCHAR, common::DECIMAL18)) + (FromTo(common::FIXEDCHAR, common::DECIMAL28SPARSE)) + (FromTo(common::FIXEDCHAR, common::DECIMAL38SPARSE)) + (FromTo(common::FIXEDCHAR, common::DATE)) + (FromTo(common::FIXEDCHAR, common::TIME)) + (FromTo(common::FIXEDCHAR, common::TIMESTAMP)) + (FromTo(common::FIXEDCHAR, common::INTERVAL)) + (FromTo(common::FIXEDCHAR, common::FLOAT4)) + (FromTo(common::FIXEDCHAR, common::FLOAT8)) + (FromTo(common::FIXEDCHAR, common::BIT)) + (FromTo(common::FIXEDCHAR, common::VARCHAR)) + (FromTo(common::FIXEDCHAR, common::VAR16CHAR)) + (FromTo(common::FIXEDCHAR, common::VARBINARY)) + (FromTo(common::FIXEDCHAR, common::INTERVALYEAR)) + (FromTo(common::FIXEDCHAR, common::INTERVALDAY)) + (FromTo(common::FIXED16CHAR, common::TINYINT)) + (FromTo(common::FIXED16CHAR, common::INT)) + (FromTo(common::FIXED16CHAR, common::BIGINT)) + (FromTo(common::FIXED16CHAR, common::DECIMAL9)) + (FromTo(common::FIXED16CHAR, common::DECIMAL18)) + (FromTo(common::FIXED16CHAR, common::DECIMAL28SPARSE)) + (FromTo(common::FIXED16CHAR, common::DECIMAL38SPARSE)) + (FromTo(common::FIXED16CHAR, common::DATE)) + (FromTo(common::FIXED16CHAR, common::TIME)) + (FromTo(common::FIXED16CHAR, common::TIMESTAMP)) + (FromTo(common::FIXED16CHAR, common::INTERVAL)) + (FromTo(common::FIXED16CHAR, common::FLOAT4)) + (FromTo(common::FIXED16CHAR, common::FLOAT8)) + (FromTo(common::FIXED16CHAR, common::BIT)) + (FromTo(common::FIXED16CHAR, common::VARCHAR)) + (FromTo(common::FIXED16CHAR, common::VAR16CHAR)) + (FromTo(common::FIXED16CHAR, common::VARBINARY)) + (FromTo(common::FIXED16CHAR, common::INTERVALYEAR)) + (FromTo(common::FIXED16CHAR, common::INTERVALDAY)) + (FromTo(common::FIXEDBINARY, common::INT)) + (FromTo(common::FIXEDBINARY, common::BIGINT)) + (FromTo(common::FIXEDBINARY, common::DECIMAL9)) + (FromTo(common::FIXEDBINARY, common::DECIMAL18)) + (FromTo(common::FIXEDBINARY, common::DECIMAL28SPARSE)) + (FromTo(common::FIXEDBINARY, common::DECIMAL38SPARSE)) + (FromTo(common::FIXEDBINARY, common::DATE)) + (FromTo(common::FIXEDBINARY, common::TIME)) + (FromTo(common::FIXEDBINARY, common::TIMESTAMP)) + (FromTo(common::FIXEDBINARY, common::INTERVAL)) + (FromTo(common::FIXEDBINARY, common::FLOAT4)) + (FromTo(common::FIXEDBINARY, common::FLOAT8)) + (FromTo(common::FIXEDBINARY, common::BIT)) + (FromTo(common::FIXEDBINARY, common::VARCHAR)) + (FromTo(common::FIXEDBINARY, common::VAR16CHAR)) + (FromTo(common::FIXEDBINARY, common::VARBINARY)) + (FromTo(common::FIXEDBINARY, common::INTERVALYEAR)) + (FromTo(common::FIXEDBINARY, common::INTERVALDAY)) + (FromTo(common::VARCHAR, common::TINYINT)) + (FromTo(common::VARCHAR, common::INT)) + (FromTo(common::VARCHAR, common::BIGINT)) + (FromTo(common::VARCHAR, common::DECIMAL9)) + (FromTo(common::VARCHAR, common::DECIMAL18)) + (FromTo(common::VARCHAR, common::DECIMAL28SPARSE)) + (FromTo(common::VARCHAR, common::DECIMAL38SPARSE)) + (FromTo(common::VARCHAR, common::DATE)) + (FromTo(common::VARCHAR, common::TIME)) + (FromTo(common::VARCHAR, common::TIMESTAMP)) + (FromTo(common::VARCHAR, common::INTERVAL)) + (FromTo(common::VARCHAR, common::FLOAT4)) + (FromTo(common::VARCHAR, common::FLOAT8)) + (FromTo(common::VARCHAR, common::BIT)) + (FromTo(common::VARCHAR, common::VARCHAR)) + (FromTo(common::VARCHAR, common::VAR16CHAR)) + (FromTo(common::VARCHAR, common::VARBINARY)) + (FromTo(common::VARCHAR, common::INTERVALYEAR)) + (FromTo(common::VARCHAR, common::INTERVALDAY)) + (FromTo(common::VAR16CHAR, common::TINYINT)) + (FromTo(common::VAR16CHAR, common::INT)) + (FromTo(common::VAR16CHAR, common::BIGINT)) + (FromTo(common::VAR16CHAR, common::DECIMAL9)) + (FromTo(common::VAR16CHAR, common::DECIMAL18)) + (FromTo(common::VAR16CHAR, common::DECIMAL28SPARSE)) + (FromTo(common::VAR16CHAR, common::DECIMAL38SPARSE)) + (FromTo(common::VAR16CHAR, common::DATE)) + (FromTo(common::VAR16CHAR, common::TIME)) + (FromTo(common::VAR16CHAR, common::TIMESTAMP)) + (FromTo(common::VAR16CHAR, common::INTERVAL)) + (FromTo(common::VAR16CHAR, common::FLOAT4)) + (FromTo(common::VAR16CHAR, common::FLOAT8)) + (FromTo(common::VAR16CHAR, common::BIT)) + (FromTo(common::VAR16CHAR, common::VARCHAR)) + (FromTo(common::VAR16CHAR, common::VARBINARY)) + (FromTo(common::VAR16CHAR, common::INTERVALYEAR)) + (FromTo(common::VAR16CHAR, common::INTERVALDAY)) + (FromTo(common::VARBINARY, common::TINYINT)) + (FromTo(common::VARBINARY, common::INT)) + (FromTo(common::VARBINARY, common::BIGINT)) + (FromTo(common::VARBINARY, common::DECIMAL9)) + (FromTo(common::VARBINARY, common::DECIMAL18)) + (FromTo(common::VARBINARY, common::DECIMAL28SPARSE)) + (FromTo(common::VARBINARY, common::DECIMAL38SPARSE)) + (FromTo(common::VARBINARY, common::DATE)) + (FromTo(common::VARBINARY, common::TIME)) + (FromTo(common::VARBINARY, common::TIMESTAMP)) + (FromTo(common::VARBINARY, common::INTERVAL)) + (FromTo(common::VARBINARY, common::FLOAT4)) + (FromTo(common::VARBINARY, common::FLOAT8)) + (FromTo(common::VARBINARY, common::BIT)) + (FromTo(common::VARBINARY, common::VARCHAR)) + (FromTo(common::VARBINARY, common::VAR16CHAR)) + (FromTo(common::VARBINARY, common::VARBINARY)) + (FromTo(common::VARBINARY, common::INTERVALYEAR)) + (FromTo(common::VARBINARY, common::INTERVALDAY)) + (FromTo(common::UINT1, common::INT)) + (FromTo(common::UINT1, common::BIGINT)) + (FromTo(common::UINT1, common::DECIMAL9)) + (FromTo(common::UINT1, common::DECIMAL18)) + (FromTo(common::UINT1, common::DECIMAL28SPARSE)) + (FromTo(common::UINT1, common::DECIMAL38SPARSE)) + (FromTo(common::UINT1, common::DATE)) + (FromTo(common::UINT1, common::TIME)) + (FromTo(common::UINT1, common::TIMESTAMP)) + (FromTo(common::UINT1, common::INTERVAL)) + (FromTo(common::UINT1, common::FLOAT4)) + (FromTo(common::UINT1, common::FLOAT8)) + (FromTo(common::UINT1, common::BIT)) + (FromTo(common::UINT1, common::VARCHAR)) + (FromTo(common::UINT1, common::VAR16CHAR)) + (FromTo(common::UINT1, common::VARBINARY)) + (FromTo(common::UINT1, common::INTERVALYEAR)) + (FromTo(common::UINT1, common::INTERVALDAY)) + (FromTo(common::UINT2, common::INT)) + (FromTo(common::UINT2, common::BIGINT)) + (FromTo(common::UINT2, common::DECIMAL9)) + (FromTo(common::UINT2, common::DECIMAL18)) + (FromTo(common::UINT2, common::DECIMAL28SPARSE)) + (FromTo(common::UINT2, common::DECIMAL38SPARSE)) + (FromTo(common::UINT2, common::DATE)) + (FromTo(common::UINT2, common::TIME)) + (FromTo(common::UINT2, common::TIMESTAMP)) + (FromTo(common::UINT2, common::INTERVAL)) + (FromTo(common::UINT2, common::FLOAT4)) + (FromTo(common::UINT2, common::FLOAT8)) + (FromTo(common::UINT2, common::BIT)) + (FromTo(common::UINT2, common::VARCHAR)) + (FromTo(common::UINT2, common::VAR16CHAR)) + (FromTo(common::UINT2, common::VARBINARY)) + (FromTo(common::UINT2, common::INTERVALYEAR)) + (FromTo(common::UINT2, common::INTERVALDAY)) + (FromTo(common::UINT4, common::INT)) + (FromTo(common::UINT4, common::BIGINT)) + (FromTo(common::UINT4, common::DECIMAL9)) + (FromTo(common::UINT4, common::DECIMAL18)) + (FromTo(common::UINT4, common::DECIMAL28SPARSE)) + (FromTo(common::UINT4, common::DECIMAL38SPARSE)) + (FromTo(common::UINT4, common::DATE)) + (FromTo(common::UINT4, common::TIME)) + (FromTo(common::UINT4, common::TIMESTAMP)) + (FromTo(common::UINT4, common::INTERVAL)) + (FromTo(common::UINT4, common::FLOAT4)) + (FromTo(common::UINT4, common::FLOAT8)) + (FromTo(common::UINT4, common::BIT)) + (FromTo(common::UINT4, common::VARCHAR)) + (FromTo(common::UINT4, common::VAR16CHAR)) + (FromTo(common::UINT4, common::VARBINARY)) + (FromTo(common::UINT4, common::INTERVALYEAR)) + (FromTo(common::UINT4, common::INTERVALDAY)) + (FromTo(common::UINT8, common::INT)) + (FromTo(common::UINT8, common::BIGINT)) + (FromTo(common::UINT8, common::DECIMAL9)) + (FromTo(common::UINT8, common::DECIMAL18)) + (FromTo(common::UINT8, common::DECIMAL28SPARSE)) + (FromTo(common::UINT8, common::DECIMAL38SPARSE)) + (FromTo(common::UINT8, common::DATE)) + (FromTo(common::UINT8, common::TIME)) + (FromTo(common::UINT8, common::TIMESTAMP)) + (FromTo(common::UINT8, common::INTERVAL)) + (FromTo(common::UINT8, common::FLOAT4)) + (FromTo(common::UINT8, common::FLOAT8)) + (FromTo(common::UINT8, common::BIT)) + (FromTo(common::UINT8, common::VARCHAR)) + (FromTo(common::UINT8, common::VAR16CHAR)) + (FromTo(common::UINT8, common::VARBINARY)) + (FromTo(common::UINT8, common::INTERVALYEAR)) + (FromTo(common::UINT8, common::INTERVALDAY)) + (FromTo(common::DECIMAL28DENSE, common::INT)) + (FromTo(common::DECIMAL28DENSE, common::BIGINT)) + (FromTo(common::DECIMAL28DENSE, common::DECIMAL9)) + (FromTo(common::DECIMAL28DENSE, common::DECIMAL18)) + (FromTo(common::DECIMAL28DENSE, common::DECIMAL28SPARSE)) + (FromTo(common::DECIMAL28DENSE, common::DECIMAL38SPARSE)) + (FromTo(common::DECIMAL28DENSE, common::DATE)) + (FromTo(common::DECIMAL28DENSE, common::TIME)) + (FromTo(common::DECIMAL28DENSE, common::TIMESTAMP)) + (FromTo(common::DECIMAL28DENSE, common::INTERVAL)) + (FromTo(common::DECIMAL28DENSE, common::FLOAT4)) + (FromTo(common::DECIMAL28DENSE, common::FLOAT8)) + (FromTo(common::DECIMAL28DENSE, common::BIT)) + (FromTo(common::DECIMAL28DENSE, common::VARCHAR)) + (FromTo(common::DECIMAL28DENSE, common::VAR16CHAR)) + (FromTo(common::DECIMAL28DENSE, common::VARBINARY)) + (FromTo(common::DECIMAL28DENSE, common::INTERVALYEAR)) + (FromTo(common::DECIMAL28DENSE, common::INTERVALDAY)) + (FromTo(common::DECIMAL38DENSE, common::INT)) + (FromTo(common::DECIMAL38DENSE, common::BIGINT)) + (FromTo(common::DECIMAL38DENSE, common::DECIMAL9)) + (FromTo(common::DECIMAL38DENSE, common::DECIMAL18)) + (FromTo(common::DECIMAL38DENSE, common::DECIMAL28SPARSE)) + (FromTo(common::DECIMAL38DENSE, common::DECIMAL38SPARSE)) + (FromTo(common::DECIMAL38DENSE, common::DATE)) + (FromTo(common::DECIMAL38DENSE, common::TIME)) + (FromTo(common::DECIMAL38DENSE, common::TIMESTAMP)) + (FromTo(common::DECIMAL38DENSE, common::INTERVAL)) + (FromTo(common::DECIMAL38DENSE, common::FLOAT4)) + (FromTo(common::DECIMAL38DENSE, common::FLOAT8)) + (FromTo(common::DECIMAL38DENSE, common::BIT)) + (FromTo(common::DECIMAL38DENSE, common::VARCHAR)) + (FromTo(common::DECIMAL38DENSE, common::VAR16CHAR)) + (FromTo(common::DECIMAL38DENSE, common::VARBINARY)) + (FromTo(common::DECIMAL38DENSE, common::INTERVALYEAR)) + (FromTo(common::DECIMAL38DENSE, common::INTERVALDAY)) + (FromTo(common::DM_UNKNOWN, common::TINYINT)) + (FromTo(common::DM_UNKNOWN, common::INT)) + (FromTo(common::DM_UNKNOWN, common::BIGINT)) + (FromTo(common::DM_UNKNOWN, common::DECIMAL9)) + (FromTo(common::DM_UNKNOWN, common::DECIMAL18)) + (FromTo(common::DM_UNKNOWN, common::DECIMAL28SPARSE)) + (FromTo(common::DM_UNKNOWN, common::DECIMAL38SPARSE)) + (FromTo(common::DM_UNKNOWN, common::DATE)) + (FromTo(common::DM_UNKNOWN, common::TIME)) + (FromTo(common::DM_UNKNOWN, common::TIMESTAMP)) + (FromTo(common::DM_UNKNOWN, common::INTERVAL)) + (FromTo(common::DM_UNKNOWN, common::FLOAT4)) + (FromTo(common::DM_UNKNOWN, common::FLOAT8)) + (FromTo(common::DM_UNKNOWN, common::BIT)) + (FromTo(common::DM_UNKNOWN, common::VARCHAR)) + (FromTo(common::DM_UNKNOWN, common::VAR16CHAR)) + (FromTo(common::DM_UNKNOWN, common::VARBINARY)) + (FromTo(common::DM_UNKNOWN, common::INTERVALYEAR)) + (FromTo(common::DM_UNKNOWN, common::INTERVALDAY)) + (FromTo(common::INTERVALYEAR, common::INT)) + (FromTo(common::INTERVALYEAR, common::BIGINT)) + (FromTo(common::INTERVALYEAR, common::DECIMAL9)) + (FromTo(common::INTERVALYEAR, common::DECIMAL18)) + (FromTo(common::INTERVALYEAR, common::DECIMAL28SPARSE)) + (FromTo(common::INTERVALYEAR, common::DECIMAL38SPARSE)) + (FromTo(common::INTERVALYEAR, common::DATE)) + (FromTo(common::INTERVALYEAR, common::TIME)) + (FromTo(common::INTERVALYEAR, common::TIMESTAMP)) + (FromTo(common::INTERVALYEAR, common::INTERVAL)) + (FromTo(common::INTERVALYEAR, common::FLOAT4)) + (FromTo(common::INTERVALYEAR, common::FLOAT8)) + (FromTo(common::INTERVALYEAR, common::BIT)) + (FromTo(common::INTERVALYEAR, common::VARCHAR)) + (FromTo(common::INTERVALYEAR, common::VAR16CHAR)) + (FromTo(common::INTERVALYEAR, common::VARBINARY)) + (FromTo(common::INTERVALYEAR, common::INTERVALYEAR)) + (FromTo(common::INTERVALYEAR, common::INTERVALDAY)) + (FromTo(common::INTERVALDAY, common::INT)) + (FromTo(common::INTERVALDAY, common::BIGINT)) + (FromTo(common::INTERVALDAY, common::DECIMAL9)) + (FromTo(common::INTERVALDAY, common::DECIMAL18)) + (FromTo(common::INTERVALDAY, common::DECIMAL28SPARSE)) + (FromTo(common::INTERVALDAY, common::DECIMAL38SPARSE)) + (FromTo(common::INTERVALDAY, common::DATE)) + (FromTo(common::INTERVALDAY, common::TIME)) + (FromTo(common::INTERVALDAY, common::TIMESTAMP)) + (FromTo(common::INTERVALDAY, common::INTERVAL)) + (FromTo(common::INTERVALDAY, common::FLOAT4)) + (FromTo(common::INTERVALDAY, common::FLOAT8)) + (FromTo(common::INTERVALDAY, common::BIT)) + (FromTo(common::INTERVALDAY, common::VARCHAR)) + (FromTo(common::INTERVALDAY, common::VAR16CHAR)) + (FromTo(common::INTERVALDAY, common::VARBINARY)) + (FromTo(common::INTERVALDAY, common::INTERVALYEAR)) + (FromTo(common::INTERVALDAY, common::INTERVALDAY)); +} // anonymous namespace + +// Conversion scalar function support +bool DrillMetadata::isConvertSupported(common::MinorType from, common::MinorType to) const { + return s_convertMap.find(FromTo(from,to)) != s_convertMap.end(); +} + +const std::string& DrillMetadata::getServerName() const { + return m_client.getServerInfos().name(); +} +const std::string& DrillMetadata::getServerVersion() const { + return m_client.getServerInfos().version(); +} +uint32_t DrillMetadata::getServerMajorVersion() const { + return m_client.getServerInfos().majorversion(); +} + +uint32_t DrillMetadata::getServerMinorVersion() const { + return m_client.getServerInfos().minorversion(); +} + +uint32_t DrillMetadata::getServerPatchVersion() const { + return m_client.getServerInfos().patchversion(); +} + +status_t DrillMetadata::getCatalogs(const std::string& catalogPattern, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle) { + DrillClientCatalogResult* result = m_client.getCatalogs(catalogPattern, listener, listenerCtx); + if(result==NULL){ + *qHandle=NULL; + return static_cast<status_t>(m_client.getError()->status); + } + *qHandle=reinterpret_cast<QueryHandle_t>(result); + return QRY_SUCCESS; +} +status_t DrillMetadata::getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle) { + DrillClientSchemaResult* result = m_client.getSchemas(catalogPattern, schemaPattern, listener, listenerCtx); + if(result==NULL){ + *qHandle=NULL; + return static_cast<status_t>(m_client.getError()->status); + } + *qHandle=reinterpret_cast<QueryHandle_t>(result); + return QRY_SUCCESS; +} +status_t DrillMetadata::getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, Metadata::pfnTableMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle) { + DrillClientTableResult* result = m_client.getTables(catalogPattern, schemaPattern, tablePattern, tableTypes, listener, listenerCtx); + if(result==NULL){ + *qHandle=NULL; + return static_cast<status_t>(m_client.getError()->status); + } + *qHandle=reinterpret_cast<QueryHandle_t>(result); + return QRY_SUCCESS; +} +status_t DrillMetadata::getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std:: string& tablePattern, const std::string& columnPattern, Metadata::pfnColumnMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle) { + DrillClientColumnResult* result = m_client.getColumns(catalogPattern, schemaPattern, tablePattern, columnPattern, listener, listenerCtx); + if(result==NULL){ + *qHandle=NULL; + return static_cast<status_t>(m_client.getError()->status); + } + *qHandle=reinterpret_cast<QueryHandle_t>(result); + return QRY_SUCCESS; +} +} // namespace meta +} // namespace Drill diff --git a/contrib/native/client/src/clientlib/metadata.hpp b/contrib/native/client/src/clientlib/metadata.hpp new file mode 100644 index 000000000..0cc8987ed --- /dev/null +++ b/contrib/native/client/src/clientlib/metadata.hpp @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef DRILL_METADATA_H +#define DRILL_METADATA_H + +#include <boost/ref.hpp> + +#include "drill/common.hpp" +#include "drill/drillClient.hpp" +#include "env.h" +#include "User.pb.h" + +namespace Drill { +class DrillClientImpl; + +namespace meta { + class DrillCatalogMetadata: public meta::CatalogMetadata { + public: + DrillCatalogMetadata(const ::exec::user::CatalogMetadata& metadata): + meta::CatalogMetadata(), + m_pMetadata(metadata){ + } + + bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } + const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); } + + bool hasDescription() const { return m_pMetadata.get().has_description(); } + const std::string& getDescription() const { return m_pMetadata.get().description(); } + + bool hasConnect() const { return m_pMetadata.get().has_connect(); } + const std::string& getConnect() const { return m_pMetadata.get().connect(); } + + private: + boost::reference_wrapper<const ::exec::user::CatalogMetadata> m_pMetadata; + }; + + class DrillSchemaMetadata: public meta::SchemaMetadata { + public: + DrillSchemaMetadata(const ::exec::user::SchemaMetadata& metadata): + meta::SchemaMetadata(), + m_pMetadata(metadata){ + } + + bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } + const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); } + + bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); } + const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); } + + bool hasOwnerName() const { return m_pMetadata.get().has_owner(); } + const std::string& getOwner() const { return m_pMetadata.get().owner(); } + + bool hasType() const { return m_pMetadata.get().has_type(); } + const std::string& getType() const { return m_pMetadata.get().type(); } + + bool hasMutable() const { return m_pMetadata.get().has_mutable_(); } + const std::string& getMutable() const { return m_pMetadata.get().mutable_(); } + + private: + boost::reference_wrapper<const ::exec::user::SchemaMetadata> m_pMetadata; + }; + + class DrillTableMetadata: public meta::TableMetadata { + public: + DrillTableMetadata(const ::exec::user::TableMetadata& metadata): + meta::TableMetadata(), + m_pMetadata(metadata){ + } + + bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } + const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); } + + bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); } + const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); } + + bool hasTableName() const { return m_pMetadata.get().has_table_name(); } + const std::string& getTableName() const { return m_pMetadata.get().table_name(); } + + bool hasType() const { return m_pMetadata.get().has_type(); } + const std::string& getType() const { return m_pMetadata.get().type(); } + + private: + boost::reference_wrapper<const ::exec::user::TableMetadata> m_pMetadata; + }; + + class DrillColumnMetadata: public meta::ColumnMetadata { + public: + DrillColumnMetadata(const ::exec::user::ColumnMetadata& metadata): + meta::ColumnMetadata(), + m_pMetadata(metadata){ + } + + bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } + const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); } + + bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); } + const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); } + + bool hasTableName() const { return m_pMetadata.get().has_table_name(); } + const std::string& getTableName() const { return m_pMetadata.get().table_name(); } + + bool hasColumnName() const { return m_pMetadata.get().has_column_name(); } + const std::string& getColumnName() const { return m_pMetadata.get().column_name(); } + + bool hasOrdinalPosition() const { return m_pMetadata.get().has_ordinal_position(); } + std::size_t getOrdinalPosition() const { return m_pMetadata.get().ordinal_position(); } + + bool hasDefaultValue() const { return m_pMetadata.get().has_default_value(); } + const std::string& getDefaultValue() const { return m_pMetadata.get().default_value(); } + + bool hasNullable() const { return m_pMetadata.get().has_is_nullable(); } + bool isNullable() const { return m_pMetadata.get().is_nullable(); } + + bool hasDataType() const { return m_pMetadata.get().has_data_type(); } + const std::string& getDataType() const { return m_pMetadata.get().data_type(); } + + bool hasColumnSize() const { return m_pMetadata.get().has_column_size(); } + std::size_t getColumnSize() const { return m_pMetadata.get().column_size(); } + + bool hasCharMaxLength() const { return m_pMetadata.get().has_char_max_length(); } + std::size_t getCharMaxLength() const { return m_pMetadata.get().char_max_length(); } + + bool hasCharOctetLength() const { return m_pMetadata.get().has_char_octet_length(); } + std::size_t getCharOctetLength() const { return m_pMetadata.get().char_octet_length(); } + + bool hasNumericPrecision() const { return m_pMetadata.get().has_numeric_precision(); } + int32_t getNumericPrecision() const { return m_pMetadata.get().numeric_precision(); } + + bool hasNumericRadix() const { return m_pMetadata.get().has_numeric_precision_radix(); } + int32_t getNumericRadix() const { return m_pMetadata.get().numeric_precision_radix(); } + + bool hasNumericScale() const { return m_pMetadata.get().has_numeric_scale(); } + int32_t getNumericScale() const { return m_pMetadata.get().numeric_scale(); } + + bool hasIntervalType() const { return m_pMetadata.get().has_interval_type(); } + const std::string& getIntervalType() const { return m_pMetadata.get().interval_type(); } + + bool hasIntervalPrecision() const { return m_pMetadata.get().has_interval_precision(); } + int32_t getIntervalPrecision() const { return m_pMetadata.get().interval_precision(); } + + private: + boost::reference_wrapper<const ::exec::user::ColumnMetadata> m_pMetadata; + }; + + class DrillMetadata: public Metadata { + public: + static const std::string s_connectorName; + static const std::string s_connectorVersion; + + static const std::string s_serverName; + static const std::string s_serverVersion; + + static const std::string s_catalogSeparator; + static const std::string s_catalogTerm; + + static const std::string s_identifierQuoteString; + static const std::vector<std::string> s_sqlKeywords; + static const std::vector<std::string> s_numericFunctions; + static const std::string s_schemaTerm; + static const std::string s_searchEscapeString; + static const std::string s_specialCharacters; + static const std::vector<std::string> s_stringFunctions; + static const std::vector<std::string> s_systemFunctions; + static const std::string s_tableTerm; + static const std::vector<std::string> s_dateTimeFunctions; + + DrillMetadata(DrillClientImpl& client): Metadata(), m_client(client) {} + ~DrillMetadata() {} + + DrillClientImpl& client() { return m_client; } + + const std::string& getConnectorName() const { return s_connectorName; }; + const std::string& getConnectorVersion() const { return s_connectorVersion; } + uint32_t getConnectorMajorVersion() const { return DRILL_VERSION_MAJOR; } + uint32_t getConnectorMinorVersion() const { return DRILL_VERSION_MINOR; } + uint32_t getConnectorPatchVersion() const { return DRILL_VERSION_PATCH; } + + const std::string& getServerName() const; + const std::string& getServerVersion() const; + uint32_t getServerMajorVersion() const; + uint32_t getServerMinorVersion() const; + uint32_t getServerPatchVersion() const; + + status_t getCatalogs(const std::string& catalogPattern, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); + status_t getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); + status_t getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, Metadata::pfnTableMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); + status_t getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std:: string& tablePattern, const std::string& columnPattern, Metadata::pfnColumnMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); + + bool areAllTableSelectable() const { return false; } + bool isCatalogAtStart() const { return true; } + const std::string& getCatalogSeparator() const { return s_catalogSeparator; } + const std::string& getCatalogTerm() const { return s_catalogTerm; } + bool isColumnAliasingSupported() const { return true; } + bool isNullPlusNonNullNull() const { return true; } + bool isConvertSupported(common::MinorType from, common::MinorType to) const; + meta::CorrelationNamesSupport getCorrelationNames() const { return meta::CN_ANY_NAMES; } + bool isReadOnly() const { return false; } + meta::DateTimeLiteralSupport getDateTimeLiteralsSupport() const { + return DL_DATE + | DL_TIME + | DL_TIMESTAMP + | DL_INTERVAL_YEAR + | DL_INTERVAL_MONTH + | DL_INTERVAL_DAY + | DL_INTERVAL_HOUR + | DL_INTERVAL_MINUTE + | DL_INTERVAL_SECOND + | DL_INTERVAL_YEAR_TO_MONTH + | DL_INTERVAL_DAY_TO_HOUR + | DL_INTERVAL_DAY_TO_MINUTE + | DL_INTERVAL_DAY_TO_SECOND + | DL_INTERVAL_HOUR_TO_MINUTE + | DL_INTERVAL_HOUR_TO_SECOND + | DL_INTERVAL_MINUTE_TO_SECOND; + } + + meta::CollateSupport getCollateSupport() const { return meta::C_NONE; }// supported? + meta::GroupBySupport getGroupBySupport() const { return meta::GB_UNRELATED; } + meta::IdentifierCase getIdentifierCase() const { return meta::IC_STORES_UPPER; } // to check? + + const std::string& getIdentifierQuoteString() const { return s_identifierQuoteString; } + const std::vector<std::string>& getSQLKeywords() const { return s_sqlKeywords; } + bool isLikeEscapeClauseSupported() const { return true; } + std::size_t getMaxBinaryLiteralLength() const { return 0; } + std::size_t getMaxCatalogNameLength() const { return 0; } + std::size_t getMaxCharLiteralLength() const { return 0; } + std::size_t getMaxColumnNameLength() const { return 0; } + std::size_t getMaxColumnsInGroupBy() const { return 0; } + std::size_t getMaxColumnsInOrderBy() const { return 0; } + std::size_t getMaxColumnsInSelect() const { return 0; } + std::size_t getMaxCursorNameLength() const { return 0; } + std::size_t getMaxLogicalLobSize() const { return 0; } + std::size_t getMaxStatements() const { return 0; } + std::size_t getMaxRowSize() const { return 0; } + bool isBlobIncludedInMaxRowSize() const { return true; } + std::size_t getMaxSchemaNameLength() const { return 0; } + std::size_t getMaxStatementLength() const { return 0; } + std::size_t getMaxTableNameLength() const { return 0; } + std::size_t getMaxTablesInSelect() const { return 0; } + std::size_t getMaxUserNameLength() const { return 0; } + meta::NullCollation getNullCollation() const { return meta::NC_AT_END; } + const std::vector<std::string>& getNumericFunctions() const { return s_numericFunctions; } + meta::OuterJoinSupport getOuterJoinSupport() const { return meta::OJ_LEFT + | meta::OJ_RIGHT + | meta::OJ_FULL; + } + bool isUnrelatedColumnsInOrderBySupported() const { return true; } + meta::QuotedIdentifierCase getQuotedIdentifierCase() const { return meta::QIC_SUPPORTS_MIXED; } + const std::string& getSchemaTerm() const { return s_schemaTerm; } + const std::string& getSearchEscapeString() const { return s_searchEscapeString; } + const std::string& getSpecialCharacters() const { return s_specialCharacters; } + const std::vector<std::string>& getStringFunctions() const { return s_stringFunctions; } + meta::SubQuerySupport getSubQuerySupport() const { return SQ_CORRELATED + | SQ_IN_COMPARISON + | SQ_IN_EXISTS + | SQ_IN_QUANTIFIED; + } + const std::vector<std::string>& getSystemFunctions() const { return s_systemFunctions; } + const std::string& getTableTerm() const { return s_tableTerm; } + const std::vector<std::string>& getDateTimeFunctions() const { return s_dateTimeFunctions; } + bool isTransactionSupported() const { return false; } + meta::UnionSupport getUnionSupport() const { return meta::U_UNION | meta::U_UNION_ALL; } + bool isSelectForUpdateSupported() const { return false; } + + private: + DrillClientImpl& m_client; + }; +} // namespace meta +} // namespace Drill + +#endif // DRILL_METADATA diff --git a/contrib/native/client/src/clientlib/recordBatch.cpp b/contrib/native/client/src/clientlib/recordBatch.cpp index c6c033b79..6e1329314 100644 --- a/contrib/native/client/src/clientlib/recordBatch.cpp +++ b/contrib/native/client/src/clientlib/recordBatch.cpp @@ -17,6 +17,7 @@ */ #include "drill/common.hpp" +#include "drill/fieldmeta.hpp" #include "drill/recordBatch.hpp" #include "utils.hpp" #include "../protobuf/User.pb.h" @@ -403,17 +404,6 @@ bool RecordBatch::isLastChunk(){ -void FieldMetadata::set(const exec::shared::SerializedField& f){ - m_name=f.name_part().name(); - m_minorType=f.major_type().minor_type(); - m_dataMode=f.major_type().mode(); - m_valueCount=f.value_count(); - m_scale=f.major_type().scale(); - m_precision=f.major_type().precision(); - m_bufferLength=f.buffer_length(); -} - - void DateHolder::load(){ m_year=1970; m_month=1; diff --git a/contrib/native/client/src/clientlib/rpcDecoder.cpp b/contrib/native/client/src/clientlib/rpcDecoder.cpp deleted file mode 100644 index d3cf50cb0..000000000 --- a/contrib/native/client/src/clientlib/rpcDecoder.cpp +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#include <iostream> -#include <google/protobuf/io/coded_stream.h> -#include "drill/common.hpp" -#include "rpcEncoder.hpp" -#include "rpcDecoder.hpp" -#include "rpcMessage.hpp" - -namespace Drill{ - -// return the number of bytes we have read -int RpcDecoder::LengthDecode(const uint8_t* buf, uint32_t* p_length) { - - using google::protobuf::io::CodedInputStream; - - // read the frame to get the length of the message and then - - CodedInputStream* cis = new CodedInputStream(buf, 5); // read 5 bytes at most - - int pos0 = cis->CurrentPosition(); // for debugging - cis->ReadVarint32(p_length); - - #ifdef CODER_DEBUG - cerr << "p_length = " << *p_length << endl; - #endif - - int pos1 = cis->CurrentPosition(); - - #ifdef CODER_DEBUG - cerr << "Reading full length " << *p_length << endl; - #endif - assert( (pos1-pos0) == getRawVarintSize(*p_length)); - delete cis; - return (pos1-pos0); -} - -// TODO: error handling -// -// - assume that the entire message is in the buffer and the buffer is constrained to this message -// - easy to handle with raw arry in C++ -int RpcDecoder::Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) { - using google::protobuf::io::CodedInputStream; - - // if(!ctx.channel().isOpen()){ return; } - - #ifdef EXTRA_DEBUGGING - std::cerr << "\nInbound rpc message received." << std::endl; - #endif - - CodedInputStream* cis = new CodedInputStream(buf, length); - - - int pos0 = cis->CurrentPosition(); // for debugging - - int len_limit = cis->PushLimit(length); - - uint32_t header_length = 0; - cis->ExpectTag(RpcEncoder::HEADER_TAG); - cis->ReadVarint32(&header_length); - - #ifdef CODER_DEBUG - cerr << "Reading header length " << header_length << ", post read index " << cis->CurrentPosition() << endl; - #endif - - exec::rpc::RpcHeader header; - int header_limit = cis->PushLimit(header_length); - header.ParseFromCodedStream(cis); - cis->PopLimit(header_limit); - msg.m_has_mode = header.has_mode(); - msg.m_mode = header.mode(); - msg.m_coord_id = header.coordination_id(); - msg.m_has_rpc_type = header.has_rpc_type(); - msg.m_rpc_type = header.rpc_type(); - - //if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex()); - - // read the protobuf body into a buffer. - cis->ExpectTag(RpcEncoder::PROTOBUF_BODY_TAG); - uint32_t p_body_length = 0; - cis->ReadVarint32(&p_body_length); - - #ifdef CODER_DEBUG - cerr << "Reading protobuf body length " << p_body_length << ", post read index " << cis->CurrentPosition() << endl; - #endif - - msg.m_pbody.resize(p_body_length); - cis->ReadRaw(msg.m_pbody.data(),p_body_length); - - - // read the data body. - if (cis->BytesUntilLimit() > 0 ) { - #ifdef CODER_DEBUG - cerr << "Reading raw body, buffer has "<< cis->BytesUntilLimit() << " bytes available, current possion "<< cis->CurrentPosition() << endl; - #endif - cis->ExpectTag(RpcEncoder::RAW_BODY_TAG); - uint32_t d_body_length = 0; - cis->ReadVarint32(&d_body_length); - - if(cis->BytesUntilLimit() != d_body_length) { - #ifdef CODER_DEBUG - cerr << "Expected to receive a raw body of " << d_body_length << " bytes but received a buffer with " <<cis->BytesUntilLimit() << " bytes." << endl; - #endif - } - //msg.m_dbody.resize(d_body_length); - //cis->ReadRaw(msg.m_dbody.data(), d_body_length); - uint32_t currPos=cis->CurrentPosition(); - cis->GetDirectBufferPointer((const void**)&msg.m_dbody, (int*)&d_body_length); - assert(msg.m_dbody==buf+currPos); - cis->Skip(d_body_length); - #ifdef CODER_DEBUG - cerr << "Read raw body of " << d_body_length << " bytes" << endl; - #endif - } else { - #ifdef CODER_DEBUG - cerr << "No need to read raw body, no readable bytes left." << endl; - #endif - } - cis->PopLimit(len_limit); - - - // return the rpc message. - // move the reader index forward so the next rpc call won't try to work with it. - // buffer.skipBytes(dBodyLength); - // messageCounter.incrementAndGet(); - #ifdef CODER_DEBUG - cerr << "Inbound Rpc Message Decoded " << msg << endl; - #endif - - int pos1 = cis->CurrentPosition(); - assert((pos1-pos0) == length); - delete cis; - return (pos1-pos0); -} - -}//namespace Drill diff --git a/contrib/native/client/src/clientlib/rpcEncoder.cpp b/contrib/native/client/src/clientlib/rpcEncoder.cpp deleted file mode 100644 index 2f354d7a7..000000000 --- a/contrib/native/client/src/clientlib/rpcEncoder.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> -#include <google/protobuf/message_lite.h> -#include <google/protobuf/wire_format_lite.h> - -#include "drill/common.hpp" -#include "rpcEncoder.hpp" -#include "rpcMessage.hpp" - -namespace Drill{ - -using google::protobuf::internal::WireFormatLite; -using exec::rpc::CompleteRpcMessage; - -const uint32_t RpcEncoder::HEADER_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); -const uint32_t RpcEncoder::PROTOBUF_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); -const uint32_t RpcEncoder::RAW_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); -const uint32_t RpcEncoder::HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG); -const uint32_t RpcEncoder::PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG); -const uint32_t RpcEncoder::RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG); - - -bool RpcEncoder::Encode(DataBuf& buf, OutBoundRpcMessage& msg) { - using exec::rpc::RpcHeader; - using google::protobuf::io::CodedOutputStream; - using google::protobuf::io::ArrayOutputStream; - // Todo: - // - // - let a context manager to allocate a buffer `ByteBuf buf = ctx.alloc().buffer();` - // - builder pattern - // - #ifdef CODER_DEBUG - cerr << "\nEncoding outbound message " << msg << endl; - #endif - - RpcHeader header; - header.set_mode(msg.m_mode); - header.set_coordination_id(msg.m_coord_id); - header.set_rpc_type(msg.m_rpc_type); - - // calcute the length of the message - int header_length = header.ByteSize(); - int proto_body_length = msg.m_pbody->ByteSize(); - int full_length = HEADER_TAG_LENGTH + getRawVarintSize(header_length) + header_length + \ - PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(proto_body_length) + proto_body_length; - - /* - if(raw_body_length > 0) { - full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length) + raw_body_length); - } - */ - - buf.resize(full_length + getRawVarintSize(full_length)); - ArrayOutputStream* os = new ArrayOutputStream(buf.data(), buf.size()); - CodedOutputStream* cos = new CodedOutputStream(os); - - - #ifdef CODER_DEBUG - cerr << "Writing full length " << full_length << endl; - #endif - - // write full length first (this is length delimited stream). - cos->WriteVarint32(full_length); - - #ifdef CODER_DEBUG - cerr << "Writing header length " << header_length << endl; - #endif - - cos->WriteVarint32(HEADER_TAG); - cos->WriteVarint32(header_length); - - header.SerializeToCodedStream(cos); - - // write protobuf body length and body - #ifdef CODER_DEBUG - cerr << "Writing protobuf body length " << proto_body_length << endl; - #endif - - cos->WriteVarint32(PROTOBUF_BODY_TAG); - cos->WriteVarint32(proto_body_length); - msg.m_pbody->SerializeToCodedStream(cos); - - delete os; - delete cos; - - // Done! no read to write data body for client - return true; -} - -} // namespace Drill diff --git a/contrib/native/client/src/clientlib/rpcEncoder.hpp b/contrib/native/client/src/clientlib/rpcEncoder.hpp deleted file mode 100644 index a4a721613..000000000 --- a/contrib/native/client/src/clientlib/rpcEncoder.hpp +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef RPC_ENCODER_H -#define RPC_ENCODER_H - -#include "rpcMessage.hpp" - -namespace Drill { - -class RpcEncoder { - public: - RpcEncoder() {} - ~RpcEncoder() { } - bool Encode(DataBuf& buf,OutBoundRpcMessage& msg); - static const uint32_t HEADER_TAG; - static const uint32_t PROTOBUF_BODY_TAG; - static const uint32_t RAW_BODY_TAG; - static const uint32_t HEADER_TAG_LENGTH; - static const uint32_t PROTOBUF_BODY_TAG_LENGTH; - static const uint32_t RAW_BODY_TAG_LENGTH; -}; - -// copy from java code -inline int getRawVarintSize(uint32_t value) { - int count = 0; - while (true) { - if ((value & ~0x7F) == 0) { - count++; - return count; - } else { - count++; - value >>= 7; - } - } -} - -} // namespace Drill -#endif diff --git a/contrib/native/client/src/clientlib/rpcMessage.cpp b/contrib/native/client/src/clientlib/rpcMessage.cpp new file mode 100644 index 000000000..13cd7a82e --- /dev/null +++ b/contrib/native/client/src/clientlib/rpcMessage.cpp @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> +#include <google/protobuf/message_lite.h> +#include <google/protobuf/wire_format_lite.h> + +#include "drill/common.hpp" +#include "rpcMessage.hpp" + +namespace Drill{ +namespace rpc { + + +namespace { +using google::protobuf::internal::WireFormatLite; +using google::protobuf::io::CodedOutputStream; +using exec::rpc::CompleteRpcMessage; + +static const uint32_t HEADER_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); +static const uint32_t PROTOBUF_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); +static const uint32_t RAW_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); +static const uint32_t HEADER_TAG_LENGTH = CodedOutputStream::VarintSize32(HEADER_TAG); +static const uint32_t PROTOBUF_BODY_TAG_LENGTH = CodedOutputStream::VarintSize32(PROTOBUF_BODY_TAG); +} + +std::size_t lengthDecode(const uint8_t* buf, uint32_t& length) { + using google::protobuf::io::CodedInputStream; + using google::protobuf::io::CodedOutputStream; + + // read the frame to get the length of the message and then + + CodedInputStream cis(buf, 5); // read 5 bytes at most + + int startPos(cis.CurrentPosition()); // for debugging + if (!cis.ReadVarint32(&length)) { + return -1; + } + + #ifdef CODER_DEBUG + std::cerr << "length = " << length << std::endl; + #endif + + int endPos(cis.CurrentPosition()); + + assert((endPos-startPos) == CodedOutputStream::VarintSize32(length)); + return (endPos-startPos); +} + +// TODO: error handling +// +// - assume that the entire message is in the buffer and the buffer is constrained to this message +// - easy to handle with raw array in C++ +bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) { + using google::protobuf::io::CodedInputStream; + + CodedInputStream cis(buf, length); + + int startPos(cis.CurrentPosition()); // for debugging + + CodedInputStream::Limit len_limit(cis.PushLimit(length)); + + uint32_t header_length(0); + + if (!cis.ExpectTag(HEADER_TAG)) { + return false; + } + + if (!cis.ReadVarint32(&header_length)) { + return false; + } + + #ifdef CODER_DEBUG + std::cerr << "Reading header length " << header_length << ", post read index " << cis.CurrentPosition() << std::endl; + #endif + + exec::rpc::RpcHeader header; + CodedInputStream::Limit header_limit(cis.PushLimit(header_length)); + + if (!header.ParseFromCodedStream(&cis)) { + return false; + } + cis.PopLimit(header_limit); + + msg.m_has_mode = header.has_mode(); + msg.m_mode = header.mode(); + msg.m_coord_id = header.coordination_id(); + msg.m_has_rpc_type = header.has_rpc_type(); + msg.m_rpc_type = header.rpc_type(); + + // read the protobuf body into a buffer. + if (!cis.ExpectTag(PROTOBUF_BODY_TAG)) { + return false; + } + + uint32_t pbody_length(0); + if (!cis.ReadVarint32(&pbody_length)) { + return false; + } + + #ifdef CODER_DEBUG + std::cerr << "Reading protobuf body length " << pbody_length << ", post read index " << cis.CurrentPosition() << std::endl; + #endif + + msg.m_pbody.resize(pbody_length); + if (!cis.ReadRaw(msg.m_pbody.data(), pbody_length)) { + return false; + } + + // read the data body. + if (cis.BytesUntilLimit() > 0 ) { + #ifdef CODER_DEBUG + std::cerr << "Reading raw body, buffer has "<< std::cis->BytesUntilLimit() << " bytes available, current possion "<< cis.CurrentPosition() << endl; + #endif + if (!cis.ExpectTag(RAW_BODY_TAG)) { + return false; + } + + uint32_t dbody_length = 0; + if (!cis.ReadVarint32(&dbody_length)) { + return false; + } + + if(cis.BytesUntilLimit() != dbody_length) { + #ifdef CODER_DEBUG + cerr << "Expected to receive a raw body of " << dbody_length << " bytes but received a buffer with " <<cis->BytesUntilLimit() << " bytes." << endl; + #endif + return false; + } + + int currPos(cis.CurrentPosition()); + int size; + cis.GetDirectBufferPointer(const_cast<const void**>(reinterpret_cast<void**>(&msg.m_dbody)), &size); + cis.Skip(size); + + assert(dbody_length == size); + assert(msg.m_dbody==buf+currPos); + #ifdef CODER_DEBUG + cerr << "Read raw body of " << dbody_length << " bytes" << endl; + #endif + } else { + #ifdef CODER_DEBUG + cerr << "No need to read raw body, no readable bytes left." << endl; + #endif + } + cis.PopLimit(len_limit); + + + // return the rpc message. + // move the reader index forward so the next rpc call won't try to work with it. + // buffer.skipBytes(dBodyLength); + // messageCounter.incrementAndGet(); + #ifdef CODER_DEBUG + std::cerr << "Inbound Rpc Message Decoded " << msg << std::endl; + #endif + + int endPos = cis.CurrentPosition(); + assert((endPos-startPos) == length); + return true; +} + + +bool encode(DataBuf& buf, const OutBoundRpcMessage& msg) { + using exec::rpc::RpcHeader; + using google::protobuf::io::CodedOutputStream; + // Todo: + // + // - let a context manager to allocate a buffer `ByteBuf buf = ctx.alloc().buffer();` + // - builder pattern + // + #ifdef CODER_DEBUG + std::cerr << "Encoding outbound message " << msg << std::endl; + #endif + + RpcHeader header; + header.set_mode(msg.m_mode); + header.set_coordination_id(msg.m_coord_id); + header.set_rpc_type(msg.m_rpc_type); + + // calcute the length of the message + int header_length = header.ByteSize(); + int proto_body_length = msg.m_pbody->ByteSize(); + int full_length = HEADER_TAG_LENGTH + CodedOutputStream::VarintSize32(header_length) + header_length + \ + PROTOBUF_BODY_TAG_LENGTH + CodedOutputStream::VarintSize32(proto_body_length) + proto_body_length; + + /* + if(raw_body_length > 0) { + full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length) + raw_body_length); + } + */ + + buf.resize(full_length + CodedOutputStream::VarintSize32(full_length)); + + uint8_t* data = buf.data(); + + #ifdef CODER_DEBUG + std::cerr << "Writing full length " << full_length << std::endl; + #endif + + data = CodedOutputStream::WriteVarint32ToArray(full_length, data); + + #ifdef CODER_DEBUG + std::cerr << "Writing header length " << header_length << std::endl; + #endif + + data = CodedOutputStream::WriteVarint32ToArray(HEADER_TAG, data); + data = CodedOutputStream::WriteVarint32ToArray(header_length, data); + + data = header.SerializeWithCachedSizesToArray(data); + + // write protobuf body length and body + #ifdef CODER_DEBUG + std::cerr << "Writing protobuf body length " << proto_body_length << std::endl; + #endif + + data = CodedOutputStream::WriteVarint32ToArray(PROTOBUF_BODY_TAG, data); + data = CodedOutputStream::WriteVarint32ToArray(proto_body_length, data); + msg.m_pbody->SerializeWithCachedSizesToArray(data); + + // Done! no read to write data body for client + return true; +} +} // namespace rpc +} // namespace Drill diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp b/contrib/native/client/src/clientlib/rpcMessage.hpp index 669697130..15487e9ba 100644 --- a/contrib/native/client/src/clientlib/rpcMessage.hpp +++ b/contrib/native/client/src/clientlib/rpcMessage.hpp @@ -25,8 +25,8 @@ #include "GeneralRPC.pb.h" namespace Drill { - -class InBoundRpcMessage { +namespace rpc { +struct InBoundRpcMessage { public: exec::rpc::RpcMode m_mode; int m_rpc_type; @@ -39,7 +39,7 @@ class InBoundRpcMessage { bool has_rpc_type() { return m_has_rpc_type; }; }; -class OutBoundRpcMessage { +struct OutBoundRpcMessage { public: exec::rpc::RpcMode m_mode; int m_rpc_type; @@ -49,6 +49,13 @@ class OutBoundRpcMessage { m_mode(mode), m_rpc_type(rpc_type), m_coord_id(coord_id), m_pbody(pbody) { } }; -} +std::size_t lengthDecode(const uint8_t* buf, uint32_t& length); + +bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg); + +bool encode(DataBuf& buf, const OutBoundRpcMessage& msg); + +} // namespace rpc +} // namespace Drill #endif diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp index 1e6a8774e..d3c8f08d7 100644 --- a/contrib/native/client/src/clientlib/utils.cpp +++ b/contrib/native/client/src/clientlib/utils.cpp @@ -22,6 +22,13 @@ #include "logger.hpp" #include "drill/common.hpp" +#if defined _WIN32 || defined _WIN64 +//Windows header files redefine 'max' +#ifdef max +#undef max +#endif +#endif + namespace Drill{ diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp index 3237aa333..4cd8fa5e8 100644 --- a/contrib/native/client/src/clientlib/utils.hpp +++ b/contrib/native/client/src/clientlib/utils.hpp @@ -31,7 +31,6 @@ #undef random #endif #endif -#include <boost/asio/deadline_timer.hpp> #include <boost/random/mersenne_twister.hpp> // for mt19937 #include <boost/random/random_device.hpp> #include <boost/random/uniform_int.hpp> diff --git a/contrib/native/client/src/clientlib/y2038/time64.c b/contrib/native/client/src/clientlib/y2038/time64.c index e0d61c851..bbbabe274 100644 --- a/contrib/native/client/src/clientlib/y2038/time64.c +++ b/contrib/native/client/src/clientlib/y2038/time64.c @@ -110,15 +110,15 @@ static const int safe_years_low[SOLAR_CYCLE_LENGTH] = { }; /* This isn't used, but it's handy to look at */ -static const char dow_year_start[SOLAR_CYCLE_LENGTH] = { - 5, 0, 1, 2, /* 0 2016 - 2019 */ - 3, 5, 6, 0, /* 4 */ - 1, 3, 4, 5, /* 8 1996 - 1998, 1971*/ - 6, 1, 2, 3, /* 12 1972 - 1975 */ - 4, 6, 0, 1, /* 16 */ - 2, 4, 5, 6, /* 20 2036, 2037, 2010, 2011 */ - 0, 2, 3, 4 /* 24 2012, 2013, 2014, 2015 */ -}; +//static const char dow_year_start[SOLAR_CYCLE_LENGTH] = { +// 5, 0, 1, 2, /* 0 2016 - 2019 */ +// 3, 5, 6, 0, /* 4 */ +// 1, 3, 4, 5, /* 8 1996 - 1998, 1971*/ +// 6, 1, 2, 3, /* 12 1972 - 1975 */ +// 4, 6, 0, 1, /* 16 */ +// 2, 4, 5, 6, /* 20 2036, 2037, 2010, 2011 */ +// 0, 2, 3, 4 /* 24 2012, 2013, 2014, 2015 */ +//}; /* Let's assume people are going to be looking for dates in the future. Let's provide some cheats so you can skip ahead. diff --git a/contrib/native/client/src/clientlib/zookeeperClient.cpp b/contrib/native/client/src/clientlib/zookeeperClient.cpp new file mode 100644 index 000000000..535bebcad --- /dev/null +++ b/contrib/native/client/src/clientlib/zookeeperClient.cpp @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <boost/bind.hpp> +#include <drill/drillClient.hpp> +#include "zookeeperClient.hpp" + +#include "errmsgs.hpp" +#include "logger.hpp" + +namespace Drill { +std::string ZookeeperClient::s_defaultDrillPath("/drill/drillbits1"); +static void watcherCallback(zhandle_t *zzh, int type, int state, const char *path, void* context) { + static_cast<ZookeeperClient*>(context)->watcher(zzh, type, state, path, context); +} + +ZookeeperClient::ZookeeperClient(const std::string& drillPath) +: p_zh(), m_state(), m_path(drillPath) { + m_bConnecting=true; + memset(&m_id, 0, sizeof(m_id)); +} + +ZookeeperClient::~ZookeeperClient(){ +} + +ZooLogLevel ZookeeperClient::getZkLogLevel(){ + //typedef enum {ZOO_LOG_LEVEL_ERROR=1, + // ZOO_LOG_LEVEL_WARN=2, + // ZOO_LOG_LEVEL_INFO=3, + // ZOO_LOG_LEVEL_DEBUG=4 + //} ZooLogLevel; + switch(DrillClientConfig::getLogLevel()){ + case LOG_TRACE: + case LOG_DEBUG: + return ZOO_LOG_LEVEL_DEBUG; + case LOG_INFO: + return ZOO_LOG_LEVEL_INFO; + case LOG_WARNING: + return ZOO_LOG_LEVEL_WARN; + case LOG_ERROR: + case LOG_FATAL: + default: + return ZOO_LOG_LEVEL_ERROR; + } + return ZOO_LOG_LEVEL_ERROR; +} + +void ZookeeperClient::watcher(zhandle_t *zzh, int type, int state, const char *path, void*) { + //From cli.c + + /* Be careful using zh here rather than zzh - as this may be mt code + * the client lib may call the watcher before zookeeper_init returns */ + + this->m_state=state; + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_CONNECTED_STATE) { + } else if (state == ZOO_AUTH_FAILED_STATE) { + this->m_err= getMessage(ERR_CONN_ZKNOAUTH); + this->close(); + } else if (state == ZOO_EXPIRED_SESSION_STATE) { + this->m_err= getMessage(ERR_CONN_ZKEXP); + this->close(); + } + } + // signal the cond var + { + if (state == ZOO_CONNECTED_STATE){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;) + } + boost::lock_guard<boost::mutex> bufferLock(this->m_cvMutex); + this->m_bConnecting=false; + } + this->m_cv.notify_one(); +} + +int ZookeeperClient::getAllDrillbits(const std::string& connectStr, std::vector<std::string>& drillbits){ + uint32_t waitTime=30000; // 10 seconds + zoo_set_debug_level(getZkLogLevel()); + zoo_deterministic_conn_order(1); // enable deterministic order + + p_zh = boost::shared_ptr<zhandle_t>(zookeeper_init(connectStr.c_str(), &watcherCallback, waitTime, &m_id, this, 0), zookeeper_close); + if(!p_zh) { + m_err = getMessage(ERR_CONN_ZKFAIL); + return -1; + } + + m_err=""; + //Wait for the completion handler to signal successful connection + boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex); + boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime); + while(this->m_bConnecting) { + if(!this->m_cv.timed_wait(bufferLock, timeout)){ + m_err = getMessage(ERR_CONN_ZKTIMOUT); + return -1; + } + } + + if(m_state!=ZOO_CONNECTED_STATE){ + return -1; + } + + int rc = ZOK; + + struct String_vector drillbitsVector; + rc=zoo_get_children(p_zh.get(), m_path.c_str(), 0, &drillbitsVector); + if(rc!=ZOK){ + m_err=getMessage(ERR_CONN_ZKERR, rc); + p_zh.reset(); + return -1; + } + + // Make sure we deallocate drillbitsVector properly when we exit + boost::shared_ptr<String_vector> guard(&drillbitsVector, deallocate_String_vector); + + if(drillbitsVector.count > 0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << drillbitsVector.count << " drillbits in cluster (" + << connectStr << "/" << m_path + << ")." <<std::endl;) + for(int i=0; i<drillbitsVector.count; i++){ + drillbits.push_back(drillbitsVector.data[i]); + } + for(int i=0; i<drillbits.size(); i++){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;) + } + } + return 0; +} + +int ZookeeperClient::getEndPoint(const std::string& drillbit, exec::DrillbitEndpoint& endpoint){ + int rc = ZOK; + // pick the drillbit at 'index' + std::string s(m_path + "/" + drillbit); + int buffer_len=MAX_CONNECT_STR; + char buffer[MAX_CONNECT_STR+1]; + struct Stat stat; + buffer[MAX_CONNECT_STR]=0; + rc= zoo_get(p_zh.get(), s.c_str(), 0, buffer, &buffer_len, &stat); + if(rc!=ZOK){ + m_err=getMessage(ERR_CONN_ZKDBITERR, rc); + return -1; + } + exec::DrillServiceInstance drillServiceInstance; + drillServiceInstance.ParseFromArray(buffer, buffer_len); + endpoint=drillServiceInstance.endpoint(); + + return 0; +} + +void ZookeeperClient::close(){ + p_zh.reset(); +} + +} /* namespace Drill */ diff --git a/contrib/native/client/src/clientlib/zookeeperClient.hpp b/contrib/native/client/src/clientlib/zookeeperClient.hpp new file mode 100644 index 000000000..25d6af5a5 --- /dev/null +++ b/contrib/native/client/src/clientlib/zookeeperClient.hpp @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include <zookeeper.h> +#else +#include <zookeeper/zookeeper.h> +#endif + +#include <boost/shared_ptr.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> + +#include "UserBitShared.pb.h" + + +#ifndef ZOOKEEPER_CLIENT_H +#define ZOOKEEPER_CLIENT_H + +namespace Drill { +class ZookeeperClient{ + public: + static std::string s_defaultDrillPath; + + ZookeeperClient(const std::string& drillPath = s_defaultDrillPath); + ~ZookeeperClient(); + static ZooLogLevel getZkLogLevel(); + // comma separated host:port pairs, each corresponding to a zk + // server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002 + void close(); + const std::string& getError() const{return m_err;} + // return unshuffled list of drillbits + int getAllDrillbits(const std::string& connectStr, std::vector<std::string>& drillbits); + // picks the index drillbit and returns the corresponding endpoint object + int getEndPoint(const std::string& drillbit, exec::DrillbitEndpoint& endpoint); + + void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context); + + private: + boost::shared_ptr<zhandle_t> p_zh; + clientid_t m_id; + int m_state; + std::string m_err; + + boost::mutex m_cvMutex; + // Condition variable to signal connection callback has been processed + boost::condition_variable m_cv; + bool m_bConnecting; + std::string m_path; + +}; +} /* namespace Drill */ + + + +#endif /* ZOOKEEPER_H */ diff --git a/contrib/native/client/src/include/drill/collections.hpp b/contrib/native/client/src/include/drill/collections.hpp new file mode 100644 index 000000000..9fbfcc5e2 --- /dev/null +++ b/contrib/native/client/src/include/drill/collections.hpp @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _DRILL_COLLECTIONS_H +#define _DRILL_COLLECTIONS_H + +#include <iterator> + +#include <boost/noncopyable.hpp> +#include <boost/shared_ptr.hpp> + +namespace Drill { +namespace impl { + +/** + * Interface for internal iterators + */ +template<typename T> +class DrillIteratorImpl: private boost::noncopyable { +public: + typedef DrillIteratorImpl<T> iterator; + typedef boost::shared_ptr<iterator> iterator_ptr; + + typedef T value_type; + typedef value_type& reference; + typedef value_type* pointer; + + virtual ~DrillIteratorImpl() {}; + + // To allow conversion from non-const to const types + virtual operator typename DrillIteratorImpl<const T>::iterator_ptr() const = 0; + + virtual reference operator*() const = 0; + virtual pointer operator->() const = 0; + + virtual iterator& operator++() = 0; + + virtual bool operator==(const iterator& x) const = 0; + virtual bool operator!=(const iterator& x) const = 0; +}; + +/** + * Interface for internal collections + */ +template<typename T> +class DrillCollectionImpl: private boost::noncopyable { +public: + // STL-like iterator typedef + typedef DrillIteratorImpl<T> iterator; + typedef boost::shared_ptr<iterator> iterator_ptr; + typedef DrillIteratorImpl<const T> const_iterator; + typedef boost::shared_ptr<const_iterator> const_iterator_ptr; + + typedef T value_type; + typedef value_type& reference; + typedef const value_type& const_reference; + typedef value_type* pointer; + typedef const value_type* const_pointer; + typedef int size_type; + + virtual ~DrillCollectionImpl() {} + + virtual iterator_ptr begin() = 0; + virtual const_iterator_ptr begin() const = 0; + virtual iterator_ptr end() = 0; + virtual const_iterator_ptr end() const = 0; +}; +} // namespace internal + +template<typename T> +class DrillCollection; + +template<typename T> +class DrillIterator: public std::iterator<std::input_iterator_tag, T> { +public: + typedef impl::DrillIteratorImpl<T> Impl; + typedef boost::shared_ptr<Impl> ImplPtr; + + typedef DrillIterator<T> iterator; + typedef std::iterator<std::input_iterator_tag, T> superclass; + typedef typename superclass::reference reference; + typedef typename superclass::pointer pointer; + + // Default constructor + DrillIterator(): m_pImpl() {}; + ~DrillIterator() {} + + // Iterators are CopyConstructible and CopyAssignable + DrillIterator(const iterator& it): m_pImpl(it.m_pImpl) {} + iterator& operator=(const iterator& it) { + m_pImpl = it.m_pImpl; + return *this; + } + + template<typename U> + DrillIterator(const DrillIterator<U>& it): m_pImpl(*it.m_pImpl) {} + + reference operator*() const { return m_pImpl->operator*(); } + pointer operator->() const { return m_pImpl->operator->(); } + + iterator& operator++() { m_pImpl->operator++(); return *this; } + + bool operator==(const iterator& x) const { + if (m_pImpl == x.m_pImpl) { + return true; + } + return m_pImpl && m_pImpl->operator==(*x.m_pImpl); + } + + bool operator!=(const iterator& x) const { + if (m_pImpl == x.m_pImpl) { + return false; + } + return !m_pImpl || m_pImpl->operator!=(*x.m_pImpl); + } + +private: + template<typename U> + friend class DrillCollection; + template<typename U> + friend class DrillIterator; + + ImplPtr m_pImpl; + + template<typename U> + DrillIterator(const boost::shared_ptr<impl::DrillIteratorImpl<U> >& pImpl): m_pImpl(pImpl) {} +}; + +template<typename T> +class DrillCollection { +public: + typedef impl::DrillCollectionImpl<T> Impl; + typedef boost::shared_ptr<Impl> ImplPtr; + + // STL-like iterator typedef + typedef DrillIterator<T> iterator; + typedef DrillIterator<const T> const_iterator; + typedef T value_type; + typedef value_type& reference; + typedef const value_type& const_reference; + typedef value_type* pointer; + typedef const value_type* const_pointer; + typedef int size_type; + + iterator begin() { return iterator(m_pImpl->begin()); } + const_iterator begin() const { return const_iterator(boost::const_pointer_cast<const Impl>(m_pImpl)->begin()); } + iterator end() { return iterator(m_pImpl->end()); } + const_iterator end() const { return const_iterator(boost::const_pointer_cast<const Impl>(m_pImpl)->end()); } + +protected: + DrillCollection(const ImplPtr& impl): m_pImpl(impl) {} + + Impl& operator*() { return *m_pImpl; } + const Impl& operator*() const { return *m_pImpl; } + Impl* operator->() { return m_pImpl.get(); } + const Impl* operator->() const { return m_pImpl.get(); } + +private: + ImplPtr m_pImpl; +}; + + +} /* namespace Drill */ +#endif /* _DRILL_COLLECTIONS_H */ diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp index a617dc71f..6d3816e7c 100644 --- a/contrib/native/client/src/include/drill/common.hpp +++ b/contrib/native/client/src/include/drill/common.hpp @@ -20,6 +20,24 @@ #ifndef _COMMON_H_ #define _COMMON_H_ +#if defined _WIN32 || defined __CYGWIN__ + #ifdef DRILL_CLIENT_EXPORTS + #define DECLSPEC_DRILL_CLIENT __declspec(dllexport) + #else + #ifdef USE_STATIC_LIBDRILL + #define DECLSPEC_DRILL_CLIENT + #else + #define DECLSPEC_DRILL_CLIENT __declspec(dllimport) + #endif + #endif +#else + #if __GNUC__ >= 4 + #define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default"))) + #else + #define DECLSPEC_DRILL_CLIENT + #endif +#endif + #ifdef _WIN32 // The order of inclusion is important. Including winsock2 before everything else // ensures that the correct typedefs are defined and that the older typedefs defined diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp index a74f4bdc7..5e59885d3 100644 --- a/contrib/native/client/src/include/drill/drillClient.hpp +++ b/contrib/native/client/src/include/drill/drillClient.hpp @@ -23,27 +23,9 @@ #include <vector> #include <boost/thread.hpp> #include "drill/common.hpp" +#include "drill/collections.hpp" #include "drill/protobuf/Types.pb.h" - -#if defined _WIN32 || defined __CYGWIN__ - #ifdef DRILL_CLIENT_EXPORTS - #define DECLSPEC_DRILL_CLIENT __declspec(dllexport) - #else - #ifdef USE_STATIC_LIBDRILL - #define DECLSPEC_DRILL_CLIENT - #else - #define DECLSPEC_DRILL_CLIENT __declspec(dllimport) - #endif - #endif -#else - #if __GNUC__ >= 4 - #define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default"))) - #else - #define DECLSPEC_DRILL_CLIENT - #endif -#endif - namespace exec{ namespace shared{ class DrillPBError; @@ -57,6 +39,7 @@ class DrillClientImplBase; class DrillClientImpl; class DrillClientQueryResult; class FieldMetadata; +class PreparedStatement; class RecordBatch; class SchemaDef; @@ -188,6 +171,17 @@ typedef status_t (*pfnQueryResultsListener)(QueryHandle_t ctx, RecordBatch* b, D */ typedef status_t (*pfnSchemaListener)(void* ctx, FieldDefPtr f, DrillClientError* err); +/** + * The prepared statement creation listener + * + * This function is called when a prepared statement is created, or if an error occurs during the prepared statement creation. + * This callback is only invoked once. + * @param[in] ctx the listener context provided to getColumns + * @param[in] pstmt the prepared statement handle, NULL in case of error + * @param[in] err an error object, NULL in case of success + */ +typedef status_t (*pfnPreparedStatementListener)(void* ctx, PreparedStatement* pstmt, DrillClientError* err); + /* * A Record Iterator instance is returned by the SubmitQuery class. Calls block until some data * is available, or until all data has been returned. @@ -244,6 +238,938 @@ class DECLSPEC_DRILL_CLIENT RecordIterator{ // first record batch with this definition }; +namespace meta { + // Set of template functions to create bitmasks + template<typename T> + inline T + operator&(T __a, T __b) + { return T(static_cast<int>(__a) & static_cast<int>(__b)); } + template<typename T> + inline T + operator|(T __a, T __b) + { return T(static_cast<int>(__a) | static_cast<int>(__b)); } + template<typename T> + inline T + operator^(T __a, T __b) + { return T(static_cast<int>(__a) ^ static_cast<int>(__b)); } + template<typename T> + inline T& + operator|=(T& __a, T __b) + { return __a = __a | __b; } + template<typename T> + inline T& + operator&=(T& __a, T __b) + { return __a = __a & __b; } + template<typename T> + inline T& + operator^=(T& __a, T __b) + { return __a = __a ^ __b; } + template<typename T> + inline T + operator~(T __a) + { return T(~static_cast<int>(__a)); } + + /* + * Internal type for Date/Time literals support + */ + enum _DateTimeLiteralSupport { + _DL_NONE = 0, + _DL_DATE = 1 << 1L, + _DL_TIME = 1 << 2L, + _DL_TIMESTAMP = 1 << 3L, + _DL_INTERVAL_YEAR = 1 << 4L, + _DL_INTERVAL_MONTH = 1 << 5L, + _DL_INTERVAL_DAY = 1 << 6L, + _DL_INTERVAL_HOUR = 1 << 7L, + _DL_INTERVAL_MINUTE = 1 << 8L, + _DL_INTERVAL_SECOND = 1 << 9L, + _DL_INTERVAL_YEAR_TO_MONTH = 1 << 10L, + _DL_INTERVAL_DAY_TO_HOUR = 1 << 11L, + _DL_INTERVAL_DAY_TO_MINUTE = 1 << 12L, + _DL_INTERVAL_DAY_TO_SECOND = 1 << 13L, + _DL_INTERVAL_HOUR_TO_MINUTE = 1 << 14L, + _DL_INTERVAL_HOUR_TO_SECOND = 1 << 15L, + _DL_INTERVAL_MINUTE_TO_SECOND = 1 << 16L + }; + + template inline _DateTimeLiteralSupport operator&(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b); + template inline _DateTimeLiteralSupport operator|(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b); + template inline _DateTimeLiteralSupport operator^(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b); + + template inline _DateTimeLiteralSupport& operator&=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b); + template inline _DateTimeLiteralSupport& operator|=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b); + template inline _DateTimeLiteralSupport& operator^=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b); + + template inline _DateTimeLiteralSupport operator~(_DateTimeLiteralSupport __a); + + /** + * Date time literal support flags + */ + typedef _DateTimeLiteralSupport DateTimeLiteralSupport; + + /** Does not support Date/Time literals */ + static const DateTimeLiteralSupport DL_NONE = _DL_NONE; + /** Supports DATE literal */ + static const DateTimeLiteralSupport DL_DATE = _DL_DATE; + /** Supports TIME literal */ + static const DateTimeLiteralSupport DL_TIME = _DL_TIME; + /** Supports TIMESTAMP literal */ + static const DateTimeLiteralSupport DL_TIMESTAMP = _DL_TIMESTAMP; + /** Supports INTERVAL YEAR literal */ + static const DateTimeLiteralSupport DL_INTERVAL_YEAR = _DL_INTERVAL_YEAR; + /** Supports INTERVAL MONTH literal */ + static const DateTimeLiteralSupport DL_INTERVAL_MONTH = _DL_INTERVAL_MONTH; + /** Supports INTERVAL DAY literal */ + static const DateTimeLiteralSupport DL_INTERVAL_DAY = _DL_INTERVAL_DAY; + /** Supports INTERVAL HOUR literal */ + static const DateTimeLiteralSupport DL_INTERVAL_HOUR = _DL_INTERVAL_HOUR; + /** Supports INTERVAL MINUTE literal */ + static const DateTimeLiteralSupport DL_INTERVAL_MINUTE = _DL_INTERVAL_MINUTE; + /** Supports INTERVAL SECOND literal */ + static const DateTimeLiteralSupport DL_INTERVAL_SECOND = _DL_INTERVAL_SECOND; + /** Supports INTERVAL YEAR TO MONTH literal */ + static const DateTimeLiteralSupport DL_INTERVAL_YEAR_TO_MONTH = _DL_INTERVAL_YEAR_TO_MONTH; + /** Supports INTERVAL DAY TO HOUR literal */ + static const DateTimeLiteralSupport DL_INTERVAL_DAY_TO_HOUR = _DL_INTERVAL_DAY_TO_HOUR; + /** Supports INTERVAL DAY TO MINUTE literal */ + static const DateTimeLiteralSupport DL_INTERVAL_DAY_TO_MINUTE = _DL_INTERVAL_DAY_TO_MINUTE; + /** Supports INTERVAL DAY TO SECOND literal */ + static const DateTimeLiteralSupport DL_INTERVAL_DAY_TO_SECOND = _DL_INTERVAL_DAY_TO_SECOND; + /** Supports INTERVAL HOUR TO MINUTE literal */ + static const DateTimeLiteralSupport DL_INTERVAL_HOUR_TO_MINUTE = _DL_INTERVAL_HOUR_TO_MINUTE; + /** Supports INTERVAL HOUR TO SECOND literal */ + static const DateTimeLiteralSupport DL_INTERVAL_HOUR_TO_SECOND = _DL_INTERVAL_HOUR_TO_SECOND; + /** Supports INTERVAL MINUTE TO SECOND literal */ + static const DateTimeLiteralSupport DL_INTERVAL_MINUTE_TO_SECOND = _DL_INTERVAL_MINUTE_TO_SECOND; + + /* + * Internal type for COLLATE support + */ + enum _CollateSupport { + _C_NONE = 0, + _C_GROUPBY = 1 << 1L + }; + + template inline _CollateSupport operator&(_CollateSupport __a, _CollateSupport __b); + template inline _CollateSupport operator|(_CollateSupport __a, _CollateSupport __b); + template inline _CollateSupport operator^(_CollateSupport __a, _CollateSupport __b); + + template inline _CollateSupport& operator&=(_CollateSupport& __a, _CollateSupport __b); + template inline _CollateSupport& operator|=(_CollateSupport& __a, _CollateSupport __b); + template inline _CollateSupport& operator^=(_CollateSupport& __a, _CollateSupport __b); + + template inline _CollateSupport operator~(_CollateSupport __a); + + + /** + * COLLATE support flags + */ + typedef _CollateSupport CollateSupport; + static const CollateSupport C_NONE = _C_NONE; /**< COLLATE clauses are not supported */ + static const CollateSupport C_GROUPBY = _C_GROUPBY; /**< a COLLATE clause can be added after each grouping column */ + + /** + * Correlation names support flags + */ + enum CorrelationNamesSupport { + CN_NONE = 1, /**< Correlation names are not supported */ + CN_DIFFERENT_NAMES = 2, /**< Correlation names are supported, but names have to be different + from the tables they represent */ + CN_ANY_NAMES = 3 /**< Correlation names are supported with no restriction on names */ + }; + + /** + * Group by support + */ + enum GroupBySupport { + GB_NONE, /**< Do not support GROUP BY */ + GB_SELECT_ONLY, /**< Only support GROUP BY clause with non aggregated columns in the select list */ + GB_BEYOND_SELECT,/**< Support GROUP BY clauses with columns absent from the select list + if all the non-aggregated column from the select list are also added. */ + GB_UNRELATED /** Support GROUP BY clauses with columns absent from the select list */ + }; + + /** + * Identified case support + */ + enum IdentifierCase { + IC_STORES_LOWER, /**< Mixed case unquoted SQL identifier are treated as + case insensitive and stored in lower case */ + IC_STORES_MIXED, /**< Mixed case unquoted SQL identifier are treated as + case insensitive and stored in mixed case */ + IC_STORES_UPPER, /**< Mixed case unquoted SQL identifier are treated as + case insensitive and stored in upper case */ + IC_SUPPORTS_MIXED /**< Mixed case unquoted SQL identifier are treated as + case sensitive and stored in mixed case */ + }; + + /** + * Null collation support + */ + enum NullCollation { + NC_AT_START,/**< NULL values are sorted at the start regardless of the order*/ + NC_AT_END, /**< NULL values are sorted at the end regardless of the order*/ + NC_HIGH, /**< NULL is the highest value */ + NC_LOW /**< NULL is the lowest value */ + }; + + + /* + * Internal type for Outer join support flags + */ + enum _OuterJoinSupport { + _OJ_NONE = 0, //!< _OJ_NONE + _OJ_LEFT = 1 << 1L,//!< _OJ_LEFT + _OJ_RIGHT = 1 << 2L,//!< _OJ_RIGHT + _OJ_FULL = 1 << 3L,//!< _OJ_FULL + _OJ_NESTED = 1 << 4L,//!< _OJ_NESTED + _OJ_NOT_ORDERED = 1 << 5L,//!< _OJ_NOT_ORDERED + _OJ_INNER = 1 << 6L,//!< _OJ_INNER + _OJ_ALL_COMPARISON_OPS = 1 << 7L //!< _OJ_ALL_COMPARISON_OPS + }; + + template inline _OuterJoinSupport operator&(_OuterJoinSupport __a, _OuterJoinSupport __b); + template inline _OuterJoinSupport operator|(_OuterJoinSupport __a, _OuterJoinSupport __b); + template inline _OuterJoinSupport operator^(_OuterJoinSupport __a, _OuterJoinSupport __b); + + template inline _OuterJoinSupport& operator&=(_OuterJoinSupport& __a, _OuterJoinSupport __b); + template inline _OuterJoinSupport& operator|=(_OuterJoinSupport& __a, _OuterJoinSupport __b); + template inline _OuterJoinSupport& operator^=(_OuterJoinSupport& __a, _OuterJoinSupport __b); + + template inline _OuterJoinSupport operator~(_OuterJoinSupport __a); + + /** + * Outer join support flags + */ + typedef _OuterJoinSupport OuterJoinSupport; + /** Outer join is not supported */ + static const OuterJoinSupport OJ_NONE = _OJ_NONE; + /** Left outer join is supported */ + static const OuterJoinSupport OJ_LEFT = _OJ_LEFT; + /** Right outer join is supported */ + static const OuterJoinSupport OJ_RIGHT = _OJ_RIGHT; + /** Full outer join is supported */ + static const OuterJoinSupport OJ_FULL = _OJ_FULL; + /** Nested outer join is supported */ + static const OuterJoinSupport OJ_NESTED = _OJ_NESTED; + /** + * The columns names in the ON clause of a outer join don't have to share the same + * order as their respective table names in the OUTER JOIN clause + */ + static const OuterJoinSupport OJ_NOT_ORDERED = _OJ_NOT_ORDERED; + /** + * The inner table can also be used in an inner join + */ + static const OuterJoinSupport OJ_INNER = _OJ_INNER; + /** + * Any comparison operator in supported in the ON clause. + */ + static const OuterJoinSupport OJ_ALL_COMPARISON_OPS = _OJ_ALL_COMPARISON_OPS; + + /** + * Quoted Identified case support + */ + enum QuotedIdentifierCase { + QIC_STORES_LOWER, /**< Mixed case quoted SQL identifier are treated as + case insensitive and stored in lower case */ + QIC_STORES_MIXED, /**< Mixed case quoted SQL identifier are treated as + case insensitive and stored in mixed case */ + QIC_STORES_UPPER, /**< Mixed case quoted SQL identifier are treated as + case insensitive and stored in upper case */ + QIC_SUPPORTS_MIXED /**< Mixed case quoted SQL identifier are treated as + case sensitive and stored in mixed case */ + }; + + /* + * Internal Subquery support flags type + */ + enum _SubQuerySupport { + _SQ_NONE = 0, + _SQ_CORRELATED = 1 << 1L, + _SQ_IN_COMPARISON = 1 << 2L, + _SQ_IN_EXISTS = 1 << 3L, + _SQ_IN_INSERT = 1 << 4L, + _SQ_IN_QUANTIFIED = 1 << 5L + }; + + template inline _SubQuerySupport operator&(_SubQuerySupport __a, _SubQuerySupport __b); + template inline _SubQuerySupport operator|(_SubQuerySupport __a, _SubQuerySupport __b); + template inline _SubQuerySupport operator^(_SubQuerySupport __a, _SubQuerySupport __b); + + template inline _SubQuerySupport& operator&=(_SubQuerySupport& __a, _SubQuerySupport __b); + template inline _SubQuerySupport& operator|=(_SubQuerySupport& __a, _SubQuerySupport __b); + template inline _SubQuerySupport& operator^=(_SubQuerySupport& __a, _SubQuerySupport __b); + + template inline _SubQuerySupport operator~(_SubQuerySupport __a); + + /** + * SubQuery support flags + */ + typedef _SubQuerySupport SubQuerySupport; + /** + * Subqueries are not supported + */ + static const SubQuerySupport SQ_NONE = _SQ_NONE; + /** Correlated subqueries are supported */ + static const SubQuerySupport SQ_CORRELATED = _SQ_CORRELATED; + /** Subqueries in comparison expressions are supported */ + static const SubQuerySupport SQ_IN_COMPARISON = _SQ_IN_COMPARISON; + /** Subqueries in EXISTS expressions are supported */ + static const SubQuerySupport SQ_IN_EXISTS = _SQ_IN_EXISTS; + /** Subqueries in INSERT expressions are supported */ + static const SubQuerySupport SQ_IN_INSERT = _SQ_IN_INSERT; + /** Subqueries in quantified expressions are supported */ + static const SubQuerySupport SQ_IN_QUANTIFIED = _SQ_IN_QUANTIFIED; + + /* + * Internal Union support flags type + */ + enum _UnionSupport { + _U_NONE = 0, //!< _U_NONE + _U_UNION = 1 << 1L,//!< _U_UNION + _U_UNION_ALL = 1 << 2L //!< _U_UNION_ALL + }; + + template inline _UnionSupport operator&(_UnionSupport __a, _UnionSupport __b); + template inline _UnionSupport operator|(_UnionSupport __a, _UnionSupport __b); + template inline _UnionSupport operator^(_UnionSupport __a, _UnionSupport __b); + + template inline _UnionSupport& operator&=(_UnionSupport& __a, _UnionSupport __b); + template inline _UnionSupport& operator|=(_UnionSupport& __a, _UnionSupport __b); + template inline _UnionSupport& operator^=(_UnionSupport& __a, _UnionSupport __b); + + template inline _UnionSupport operator~(_UnionSupport __a); + + /** + * Union support flags + */ + typedef _UnionSupport UnionSupport; + /** Union is not supported */ + static const UnionSupport U_NONE = _U_NONE; + /** UNION is supported */ + static const UnionSupport U_UNION = _U_UNION; + /** UNION ALL is supported */ + static const UnionSupport U_UNION_ALL = _U_UNION_ALL; + + class DECLSPEC_DRILL_CLIENT CatalogMetadata { + protected: + CatalogMetadata() {}; + public: + virtual ~CatalogMetadata() {}; + + virtual bool hasCatalogName() const = 0; + virtual const std::string& getCatalogName() const = 0; + + virtual bool hasDescription() const = 0; + virtual const std::string& getDescription() const = 0; + + virtual bool hasConnect() const = 0; + virtual const std::string& getConnect() const = 0; + }; + + class DECLSPEC_DRILL_CLIENT SchemaMetadata { + protected: + SchemaMetadata() {}; + + public: + virtual ~SchemaMetadata() {}; + + virtual bool hasCatalogName() const = 0; + virtual const std::string& getCatalogName() const = 0; + + virtual bool hasSchemaName() const = 0; + virtual const std::string& getSchemaName() const = 0; + + virtual bool hasOwnerName() const = 0; + virtual const std::string& getOwner() const = 0; + + virtual bool hasType() const = 0; + virtual const std::string& getType() const = 0; + + virtual bool hasMutable() const = 0; + virtual const std::string& getMutable() const = 0; + }; + + class DECLSPEC_DRILL_CLIENT TableMetadata { + protected: + TableMetadata() {}; + + public: + virtual ~TableMetadata() {}; + + virtual bool hasCatalogName() const = 0; + virtual const std::string& getCatalogName() const = 0; + + virtual bool hasSchemaName() const = 0; + virtual const std::string& getSchemaName() const = 0; + + virtual bool hasTableName() const = 0; + virtual const std::string& getTableName() const = 0; + + virtual bool hasType() const = 0; + virtual const std::string& getType() const = 0; + }; + + class DECLSPEC_DRILL_CLIENT ColumnMetadata { + protected: + ColumnMetadata() {}; + + public: + virtual ~ColumnMetadata() {}; + + virtual bool hasCatalogName() const = 0; + virtual const std::string& getCatalogName() const = 0; + + virtual bool hasSchemaName() const = 0; + virtual const std::string& getSchemaName() const = 0; + + virtual bool hasTableName() const = 0; + virtual const std::string& getTableName() const = 0; + + virtual bool hasColumnName() const = 0; + virtual const std::string& getColumnName() const = 0; + + virtual bool hasOrdinalPosition() const = 0; + virtual std::size_t getOrdinalPosition() const = 0; + + virtual bool hasDefaultValue() const = 0; + virtual const std::string& getDefaultValue() const = 0; + + virtual bool hasNullable() const = 0; + virtual bool isNullable() const = 0; + + virtual bool hasDataType() const = 0; + virtual const std::string& getDataType() const = 0; + + virtual bool hasColumnSize() const = 0; + virtual std::size_t getColumnSize() const = 0; + + virtual bool hasCharMaxLength() const = 0; + virtual std::size_t getCharMaxLength() const = 0; + + virtual bool hasCharOctetLength() const = 0; + virtual std::size_t getCharOctetLength() const = 0; + + virtual bool hasNumericPrecision() const = 0; + virtual int32_t getNumericPrecision() const = 0; + + virtual bool hasNumericRadix() const = 0; + virtual int32_t getNumericRadix() const = 0; + + virtual bool hasNumericScale() const = 0; + virtual int32_t getNumericScale() const = 0; + + virtual bool hasIntervalType() const = 0; + virtual const std::string& getIntervalType() const = 0; + + virtual bool hasIntervalPrecision() const = 0; + virtual int32_t getIntervalPrecision() const = 0; + }; +} + +class DECLSPEC_DRILL_CLIENT Metadata { + public: + virtual ~Metadata() {}; + + /** + * Returns the connector name + * + * @return the connector name + */ + virtual const std::string& getConnectorName() const = 0; + + /** + * Returns the connector version string + * + * @return the connector version string + */ + virtual const std::string& getConnectorVersion() const = 0; + + /** + * Returns the connector major version + * + * @return the connector major version + */ + virtual uint32_t getConnectorMajorVersion() const = 0; + + /** + * Returns the connector minor version + * + * @return the connector minor version + */ + virtual uint32_t getConnectorMinorVersion() const = 0; + + /** + * Returns the connector patch version + * + * @return the connector patch version + */ + virtual uint32_t getConnectorPatchVersion() const = 0; + + /** + * Returns the server name + * + * @return the server name + */ + virtual const std::string& getServerName() const = 0; + + /** + * Returns the server version string + * + * @return the server version string + */ + virtual const std::string& getServerVersion() const = 0; + + /** + * Returns the server major version + * + * @return the server major version + */ + virtual uint32_t getServerMajorVersion() const = 0; + + /** + * Returns the server minor version + * + * @return the server minor version + */ + virtual uint32_t getServerMinorVersion() const = 0; + + /** + * Returns the server patch version + * + * @return the server patch version + */ + virtual uint32_t getServerPatchVersion() const = 0; + + /** + * Callback function invoked by getCatalogs when receiving results + * + * This callback is only invoked once. + * @param[in] ctx the listener context provided to getCatalogs + * @param[in] metadata the catalog metadata, or NULL in case of error + * @param[in] err an error object, NULL in case of success + */ + typedef status_t (*pfnCatalogMetadataListener)(void* ctx, const DrillCollection<meta::CatalogMetadata>* metadata, DrillClientError* err); + + /** + * Get a list of catalogPattern available to the current connection. + * Only catalogs matching the catalogPattern LIKE expression are returned. + * + * @param[in] catalogPattern a catalog pattern + * @param[in] listener a metadata listener + * @param[in] context to be passed to the listener + * @param[out] the query handle + */ + virtual status_t getCatalogs(const std::string& catalogPattern, pfnCatalogMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle) = 0; + + /** + * Callback function invoked by getSchemas when receiving results + * + * This callback is only invoked once. + * @param[in] ctx the listener context provided to getSchemas + * @param[in] metadata the schema metadata, or NULL in case of error + * @param[in] err an error object, NULL in case of success + */ + typedef status_t (*pfnSchemaMetadataListener)(void* ctx, const DrillCollection<meta::SchemaMetadata>* metadata, DrillClientError* err); + + /** + * Get a list of schemas available to the current connection. + * Only schemas matching the catalogPattern and schemaPattern LIKE expressions are returned. + * + * @param[in] catalogPattern a catalog pattern + * @param[in] schemaPattern a schema pattern + * @param[in] listener a metadata query listener + * @param[in] context to be passed to the listener + * @param[out] the query handle + */ + virtual status_t getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, pfnSchemaMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle) = 0; + + /** + * Callback function invoked by getTables when receiving results + * + * This callback is only invoked once. + * @param[in] ctx the listener context provided to getTables + * @param[in] metadata the table metadata, or NULL in case of error + * @param[in] err an error object, NULL in case of success + */ + typedef status_t (*pfnTableMetadataListener)(void* ctx, const DrillCollection<meta::TableMetadata>* metadata, DrillClientError* err); + + /** + * Get a list of tables available to the current connection. + * Only tables matching the catalogPattern, schemaPattern and tablePattern LIKE expressions are returned. + * + * @param[in] catalogPattern a catalog pattern + * @param[in] schemaPattern a schema pattern + * @param[in] tablePattern a table pattern + * @param[in] tableTypes a list of table types to look for. Pass NULL to not filter + * @param[in] listener a metadata query listener + * @param[in] context to be passed to the listener + * @param[out] the query handle + */ + virtual status_t getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, + pfnTableMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle) = 0; + + /** + * Callback function invoked by getColumns when receiving results + * + * This callback is only invoked once. + * @param[in] ctx the listener context provided to getColumns + * @param[in] metadata the columns metadata, or NULL in case of error + * @param[in] err an error object, NULL in case of success + */ + typedef status_t (*pfnColumnMetadataListener)(void* ctx, const DrillCollection<meta::ColumnMetadata>* metadata, DrillClientError* err); + + /** + * Get a list of columns available to the current connection. + * Only columns matching the catalogPattern, schemaPattern, tablePattern and columnPattern LIKE expressions are returned. + * + * @param[in] catalogPattern a catalog pattern + * @param[in] schemaPattern a schema pattern + * @param[in] tablePattern a table pattern + * @param[in] columnPattern a colum name pattern + * @param[in] listener a metadata query listener + * @param[in] context to be passed to the listener + * @param[out] the query handle + */ + virtual status_t getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std:: string& tablePattern, const std::string& columnPattern, pfnColumnMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle) = 0; + + // Capabilities + /** + * Return if the current user can use all tables returned by the getTables method + * + * @result true if the user can select any table, false otherwise + */ + virtual bool areAllTableSelectable() const = 0; + + /** + * Return if the catalog name is at the start of a fully qualified table name + * + * @return true if the catalog name is at the start, false otherwise. + */ + virtual bool isCatalogAtStart() const = 0; + + /** + * Return the string used as a separator between the catalog and the table name + * + * @return the catalog separator + */ + virtual const std::string& getCatalogSeparator() const = 0; + + /** + * Return the term used by the server to designate a catalog + * + * @return the catalog term + */ + virtual const std::string& getCatalogTerm() const = 0; + + /** + * Return if the server supports column aliasing + * + * @return true if the server supports column aliasing, false otherwise + */ + virtual bool isColumnAliasingSupported() const = 0; + + /** + * Return if the result of a NULL and a non-NULL values concatenation is NULL + * + * @return true if the result is NULL, false otherwise + */ + virtual bool isNullPlusNonNullNull() const = 0; + + /** + * Return if the CONVERT function supports conversion for the given types + * + * @return true if the conversion is supported, false otherwise + */ + virtual bool isConvertSupported(common::MinorType from, common::MinorType to) const = 0; + + /** + * Return what kind of correlation name support the server provides + * + * @return the correlation name supported by the server + */ + virtual meta::CorrelationNamesSupport getCorrelationNames() const = 0; + + /** + * Returns if the connection to the server is read-only + * + * @return true if the connection is read-only, false otherwise + */ + virtual bool isReadOnly() const = 0; + + /** + * Return what kind of date time literals the server supports + * + * @return a bitmask of supported date/time literals + */ + virtual meta::DateTimeLiteralSupport getDateTimeLiteralsSupport() const = 0; + + /** + * Return what kind of COLLATE expressions are supported + */ + virtual meta::CollateSupport getCollateSupport() const = 0; + + /** + * Return what kind of GROUP BY support the server provides + * + * @return the group by support + */ + virtual meta::GroupBySupport getGroupBySupport() const = 0; + + /** + * Returns how unquoted identifier are stored + * + * @return the unquoted identifier storage policy + */ + virtual meta::IdentifierCase getIdentifierCase() const = 0; + + /** + * Returns the string used to quote SQL identifiers + * + * @return the quote string + */ + virtual const std::string& getIdentifierQuoteString() const = 0; + + /** + * Returns the list of SQL keywords supported by the database + * + * @return a list of keywords + */ + virtual const std::vector<std::string>& getSQLKeywords() const = 0; + + /** + * Returns if LIKE operator supports an escape clause + * + * @return true if escape claused are supported + */ + virtual bool isLikeEscapeClauseSupported() const = 0; + + /** + * Returns the maximum number of hexa characters supported for binary literals + * + * @return the length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxBinaryLiteralLength() const = 0; + + /** + * Returns the maximum length of catalog names + * + * @return the length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxCatalogNameLength() const = 0; + + /** + * Returns the maximum number of characters for string literals + * + * @return the length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxCharLiteralLength() const = 0; + + /** + * Returns the maximum length of column names + * + * @return the length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxColumnNameLength() const = 0; + + /** + * Returns the maximum number of columns in GROUP BY expressions + * + * @return the maximum number, 0 if unlimited or unknown + */ + virtual std::size_t getMaxColumnsInGroupBy() const = 0; + + /** + * Returns the maximum number of columns in ORDER BY expressions + * + * @return the maximum number, 0 if unlimited or unknown + */ + virtual std::size_t getMaxColumnsInOrderBy() const = 0; + + /** + * Returns the maximum number of columns in a SELECT list + * + * @return the maximum number, 0 if unlimited or unknown + */ + virtual std::size_t getMaxColumnsInSelect() const = 0; + + /** + * Returns the maximum length for cursor names + * + * @return the maximum length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxCursorNameLength() const = 0; + + /** + * Returns the maximum logical size for LOB types + * + * @return the maximum size, 0 if unlimited or unknown + */ + virtual std::size_t getMaxLogicalLobSize() const = 0; + + /** + * Returns the maximum number of statements + * + * @return the maximum number, 0 if unlimited or unknown + */ + virtual std::size_t getMaxStatements() const = 0; + + /** + * Returns the maximum number of bytes for a single row + * @return the maximum size, 0 if unlimited or unknown + */ + virtual std::size_t getMaxRowSize() const = 0; + + /** + * Returns if BLOB types are included in the maximum row size + * + * @return true if BLOB are included + */ + virtual bool isBlobIncludedInMaxRowSize() const = 0; + + /** + * Returns the maximum length for schema names + * @return the maximum length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxSchemaNameLength() const = 0; + + /** + * Returns the maximum length for statements + * @return the maximum length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxStatementLength() const = 0; + + /** + * Returns the maximum length for table names + * @return the maximum length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxTableNameLength() const = 0; + + /** + * Returns the maximum number of tables in a SELECT expression + * @return the maximum number, 0 if unlimited or unknown + */ + virtual std::size_t getMaxTablesInSelect() const = 0; + + /** + * Returns the maximum length for user names + * @return the maximum length, 0 if unlimited or unknown + */ + virtual std::size_t getMaxUserNameLength() const = 0; + + /** + * Returns how NULL are sorted + * + * @return the NULL collation policy + */ + virtual meta::NullCollation getNullCollation() const = 0; + + /** + * Returns the list of supported numeric functions + * @return a list of function names + */ + virtual const std::vector<std::string>& getNumericFunctions() const = 0; + + /** + * Returns how outer joins are supported + * @return outer join support (as flags) + */ + virtual meta::OuterJoinSupport getOuterJoinSupport() const = 0; + + /** + * Returns if columns not in the SELECT column lists can be used + * in the ORDER BY expression + * + * @return true if unrelated columns are supported in ORDER BY + */ + virtual bool isUnrelatedColumnsInOrderBySupported() const = 0; + + /** + * Returns how quoted identifier are stored + * + * @return the quoted identifier storage policy + */ + virtual meta::QuotedIdentifierCase getQuotedIdentifierCase() const = 0; + + /** + * Returns the term used to designate schemas + * + * @return the term + */ + virtual const std::string& getSchemaTerm() const = 0; + + /** + * Return the string for escaping patterns in metadata queries + * + * @return the characters for escaping, empty if not supported + */ + virtual const std::string& getSearchEscapeString() const = 0; + + /** + * Returns the list of extra characters that can be used in identifier names + * + * Extra characters are those characters beyond a-z, A-Z, 0-9 and '_' (underscore) + * + * @return a list of characters + */ + virtual const std::string& getSpecialCharacters() const = 0; + + /** + * Returns the list of supported string functions + * + * @return a list of function names + */ + virtual const std::vector<std::string>& getStringFunctions() const = 0; + + /** + * Returns how subqueries are supported + * + * @return the subqueries support (as flags) + */ + virtual meta::SubQuerySupport getSubQuerySupport() const = 0; + + /** + * Returns the list of supported system functions + * + * @return a list of function names + */ + virtual const std::vector<std::string>& getSystemFunctions() const = 0; + + /** + * Returns the term used to designate tables + * + * @return the term + */ + virtual const std::string& getTableTerm() const = 0; + + /** + * Returns the list of supported date/time functions + * + * @return a list of function names + */ + virtual const std::vector<std::string>& getDateTimeFunctions() const = 0; + + /** + * Returns if transactions are supported + * @return true if transactions are supported + */ + virtual bool isTransactionSupported() const = 0; + + /** + * Returns how unions are supported + * + * @return the union support (as flags) + */ + virtual meta::UnionSupport getUnionSupport() const = 0; + + /** + * Returns if SELECT FOR UPDATE expressions are supported + * + * @return true if SELECT FOR UPDATE is supported + */ + virtual bool isSelectForUpdateSupported() const = 0; +}; + class DECLSPEC_DRILL_CLIENT DrillClient{ public: /* @@ -273,7 +1199,7 @@ class DECLSPEC_DRILL_CLIENT DrillClient{ */ DEPRECATED connectionStatus_t connect(const char* connectStr, const char* defaultSchema=NULL); - /* + /* * Connect the client to a Drillbit using connection string and a set of user properties. * The connection string format can be found in comments of * [DRILL-780](https://issues.apache.org/jira/browse/DRILL-780) @@ -325,10 +1251,30 @@ class DECLSPEC_DRILL_CLIENT DrillClient{ /* * Submit a query asynchronously and wait for results to be returned through an iterator that returns - * results synchronously. The client app needs to call delete on the iterator when done. + * results synchronously. The client app needs to call freeQueryIterator on the iterator when done. */ RecordIterator* submitQuery(Drill::QueryType t, const std::string& plan, DrillClientError* err); + /** + * Prepare a query. + * + * @param[in] sql the query to prepare + * @param[in] listener a callback to be notified when the prepared statement is created, or if an error occured + * @param[in] user context to provide to the callback + * @param[out] a handle on the query + */ + status_t prepareQuery(const std::string& sql, pfnPreparedStatementListener listener, void* listenerCtx, QueryHandle_t* qHandle); + + /* + * Execute a prepared statement. + * + * @param[in] pstmt the prepared statement to execute + * @param[in] listener a callback to be notified when results have arrived, or if an error occured + * @param[in] user context to provide to the callback + * @param[out] a handle on the query + */ + status_t executeQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle); + /* * The client application should call this function to wait for results if it has registered a * listener. @@ -360,7 +1306,7 @@ class DECLSPEC_DRILL_CLIENT DrillClient{ * Applications using the sync query submit method should call freeQueryIterator to free up resources * once the RecordIterator is no longer being processed. */ - void freeQueryIterator(RecordIterator** pIter){ delete *pIter; *pIter=NULL;}; + void freeQueryIterator(RecordIterator** pIter){ delete *pIter; *pIter=NULL;} /* * Applications using the async query submit method should call freeRecordBatch to free up resources @@ -368,7 +1314,15 @@ class DECLSPEC_DRILL_CLIENT DrillClient{ */ void freeRecordBatch(RecordBatch* pRecordBatch); + /** + * Get access to the server metadata + */ + Metadata* getMetadata(); + /** + * Free resources associated with the metadata object + */ + void freeMetadata(Metadata** metadata); private: static DrillClientInitializer s_init; diff --git a/contrib/native/client/src/include/drill/drillc.hpp b/contrib/native/client/src/include/drill/drillc.hpp index 3697ee8cd..c8593f599 100644 --- a/contrib/native/client/src/include/drill/drillc.hpp +++ b/contrib/native/client/src/include/drill/drillc.hpp @@ -21,6 +21,8 @@ #include "drill/common.hpp" #include "drill/drillClient.hpp" +#include "drill/fieldmeta.hpp" +#include "drill/preparedStatement.hpp" #include "drill/recordBatch.hpp" #include "drill/protobuf/Types.pb.h" diff --git a/contrib/native/client/src/include/drill/fieldmeta.hpp b/contrib/native/client/src/include/drill/fieldmeta.hpp new file mode 100644 index 000000000..40c9cca9b --- /dev/null +++ b/contrib/native/client/src/include/drill/fieldmeta.hpp @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIELDMETA_H +#define FIELDMETA_H + +#include "drill/common.hpp" +#include "drill/protobuf/Types.pb.h" + +namespace exec{ + namespace shared{ + class SerializedField; + }; + namespace user{ + class ResultColumnMetadata; + }; +}; + + +namespace Drill { + +class DECLSPEC_DRILL_CLIENT FieldMetadata{ + public: + enum ColumnSearchability { UNKNOWN_SEARCHABILITY = 0, NONE = 1, CHAR = 2, NUMBER = 3, ALL = 4 }; + enum ColumnUpdatability { UNKNOWN_UPDATABILITY = 0, READ_ONLY = 1, WRITABLE = 2 }; + + FieldMetadata(){}; + void set(const exec::shared::SerializedField& f); + void set(const exec::user::ResultColumnMetadata& m); + const std::string& getName() const{ return m_name;} + common::MinorType getMinorType() const{ return m_minorType;} + common::DataMode getDataMode() const{return m_dataMode;} + uint32_t getValueCount() const{return m_valueCount;} + uint32_t getScale() const{return m_scale;} + uint32_t getPrecision() const{return m_precision;} + uint32_t getBufferLength() const{return m_bufferLength;} + const std::string& getCatalogName() const{return m_catalogName;} + const std::string& getSchemaName() const{return m_schemaName;} + const std::string& getTableName() const{return m_tableName;} + const std::string& getLabel() const{return m_label;} + const std::string& getSQLType() const{return m_sqlType;} + bool isNullable() const{return m_nullable;} + bool isSigned() const{return m_signed;} + uint32_t getDisplaySize() const{return m_displaySize;} + bool isAliased() const{return m_aliased;} + ColumnSearchability getSearchability() const{return m_searchability;} + ColumnUpdatability getUpdatability() const{return m_updatability;} + bool isAutoIncremented() const{return m_autoIncremented;} + bool isCaseSensitive() const{return m_caseSensitive;} + bool isSortable() const{return m_sortable;} + bool isCurrency() const{return m_currency;} + void copy(Drill::FieldMetadata& f){ + m_name=f.m_name; + m_minorType=f.m_minorType; + m_dataMode=f.m_dataMode; + m_valueCount=f.m_valueCount; + m_scale=f.m_scale; + m_precision=f.m_precision; + m_bufferLength=f.m_bufferLength; + m_catalogName=f.m_catalogName; + m_schemaName=f.m_schemaName; + m_tableName=f.m_tableName; + m_label=f.m_label; + m_sqlType=f.m_sqlType; + m_nullable=f.m_nullable; + m_signed=f.m_signed; + m_displaySize=f.m_displaySize; + m_aliased=f.m_aliased; + m_searchability=f.m_searchability; + m_updatability=f.m_updatability; + m_autoIncremented=f.m_autoIncremented; + m_caseSensitive=f.m_caseSensitive; + m_sortable=f.m_sortable; + m_currency=f.m_currency; + m_columnSize=f.m_columnSize; + } + + private: + std::string m_name; + common::MinorType m_minorType; + common::DataMode m_dataMode; + uint32_t m_valueCount; + uint32_t m_scale; + uint32_t m_precision; + uint32_t m_bufferLength; + std::string m_catalogName; + std::string m_schemaName; + std::string m_tableName; + std::string m_label; + std::string m_sqlType; + bool m_nullable; + bool m_signed; + uint32_t m_displaySize; + bool m_aliased; + ColumnSearchability m_searchability; + ColumnUpdatability m_updatability; + bool m_autoIncremented; + bool m_caseSensitive; + bool m_sortable; + bool m_currency; + uint32_t m_columnSize; + +}; +} // namespace + +#endif + diff --git a/contrib/native/client/src/clientlib/rpcDecoder.hpp b/contrib/native/client/src/include/drill/preparedStatement.hpp index dca49f7ed..2a7d15a6a 100644 --- a/contrib/native/client/src/clientlib/rpcDecoder.hpp +++ b/contrib/native/client/src/include/drill/preparedStatement.hpp @@ -16,23 +16,23 @@ * limitations under the License. */ +#ifndef PREPAREDSTATEMENT_H +#define PREPAREDSTATEMENT_H -#ifndef RPC_DECODER_H -#define RPC_DECODER_H - -#include "rpcMessage.hpp" +#include <cstddef> +#include <string> +#include <vector> namespace Drill { +class DECLSPEC_DRILL_CLIENT PreparedStatement{ +public: + virtual std::size_t getNumFields() const = 0; + virtual const Drill::FieldMetadata& getFieldMetadata(std::size_t index) const = 0; -class RpcDecoder { - public: - RpcDecoder() { } - ~RpcDecoder() { } - // bool Decode(const DataBuf& buf); - // bool Decode(const DataBuf& buf, InBoundRpcMessage& msg); - static int LengthDecode(const uint8_t* buf, uint32_t* length); // read the length prefix (at most 4 bytes) - static int Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg); + virtual ~PreparedStatement() {}; }; } // namespace Drill -#endif + +#endif // PREPAREDSTATEMENT_H + diff --git a/contrib/native/client/src/include/drill/recordBatch.hpp b/contrib/native/client/src/include/drill/recordBatch.hpp index 12cbad46d..8d1a0a3e6 100644 --- a/contrib/native/client/src/include/drill/recordBatch.hpp +++ b/contrib/native/client/src/include/drill/recordBatch.hpp @@ -647,7 +647,7 @@ template <class VALUEHOLDER_CLASS_TYPE, class VALUE_VECTOR_TYPE> sstr<<"NULL"; strncpy(buf, sstr.str().c_str(), nChars); }else{ - return m_pVector->getValueAt(index, buf, nChars); + m_pVector->getValueAt(index, buf, nChars); } } @@ -786,39 +786,6 @@ typedef NullableValueVectorTyped<IntervalHolder, ValueVectorInterval> NullableV typedef NullableValueVectorTyped<IntervalDayHolder, ValueVectorIntervalDay> NullableValueVectorIntervalDay; typedef NullableValueVectorTyped<IntervalYearHolder, ValueVectorIntervalYear> NullableValueVectorIntervalYear; -class DECLSPEC_DRILL_CLIENT FieldMetadata{ - public: - - FieldMetadata(){}; - void set(const exec::shared::SerializedField& f); - const std::string& getName() const{ return m_name;} - common::MinorType getMinorType() const{ return m_minorType;} - common::DataMode getDataMode() const{return m_dataMode;} - uint32_t getValueCount() const{return m_valueCount;} - uint32_t getScale() const{return m_scale;} - uint32_t getPrecision() const{return m_precision;} - uint32_t getBufferLength() const{return m_bufferLength;} - void copy(Drill::FieldMetadata& f){ - m_name=f.m_name; - m_minorType=f.m_minorType; - m_dataMode=f.m_dataMode; - m_valueCount=f.m_valueCount; - m_scale=f.m_scale; - m_precision=f.m_precision; - m_bufferLength=f.m_bufferLength; - } - - private: - //exec::shared::FieldMetadata* m_pFieldMetadata; - std::string m_name; - common::MinorType m_minorType; - common::DataMode m_dataMode; - uint32_t m_valueCount; - uint32_t m_scale; - uint32_t m_precision; - uint32_t m_bufferLength; -}; - class FieldBatch{ public: FieldBatch(const Drill::FieldMetadata& fmd, const ByteBuf_t data, size_t start, size_t length): diff --git a/contrib/native/client/src/test/CMakeLists.txt b/contrib/native/client/src/test/CMakeLists.txt index 892b58c46..523734bad 100644 --- a/contrib/native/client/src/test/CMakeLists.txt +++ b/contrib/native/client/src/test/CMakeLists.txt @@ -18,6 +18,7 @@ # Drill Client unit tests set (TESTS_SRC_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/CollectionsTest.cpp ${CMAKE_CURRENT_SOURCE_DIR}/UtilsTest.cpp ) diff --git a/contrib/native/client/src/test/CollectionsTest.cpp b/contrib/native/client/src/test/CollectionsTest.cpp new file mode 100644 index 000000000..ebac941c7 --- /dev/null +++ b/contrib/native/client/src/test/CollectionsTest.cpp @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string> +#include <vector> + +#include <boost/assign.hpp> +#include <boost/shared_ptr.hpp> + +#include <cppunit/TestFixture.h> +#include <cppunit/extensions/HelperMacros.h> + +#include "drill/collections.hpp" +#include "collectionsImpl.hpp" + +namespace { +template<typename T, typename Iter> +class DrillVectorIteratorImpl: public Drill::impl::DrillIteratorImpl<T> { +public: + typedef DrillVectorIteratorImpl<T, Iter> type; + typedef Drill::impl::DrillIteratorImpl<T> supertype; + + DrillVectorIteratorImpl(const Iter& it): m_it(it) {}; + + T& operator*() const { return m_it.operator *();} + T* operator->() const { return m_it.operator->(); } + + operator typename Drill::impl::DrillIteratorImpl<const T>::iterator_ptr() const { return typename Drill::impl::DrillIteratorImpl<const T>::iterator_ptr(new DrillVectorIteratorImpl<const T, Iter>(m_it)); } + + DrillVectorIteratorImpl& operator++() { + m_it++; return *this; + } + + bool operator==(const supertype& x) const { + const type& other(dynamic_cast<const type&>(x)); + return m_it == other.m_it; + } + + bool operator!=(const supertype& x) const { return !(*this==x); } + +private: + Iter m_it; +}; + +template<typename T> +class DrillVectorImpl: public Drill::impl::DrillCollectionImpl<T> { +public: + typedef Drill::impl::DrillCollectionImpl<T> supertype; + + typedef typename supertype::iterator_ptr iterator_ptr; + typedef typename supertype::const_iterator_ptr const_iterator_ptr; + + DrillVectorImpl() {} + DrillVectorImpl(const std::vector<T>& v): m_vector(v) {}; + + iterator_ptr begin() { return iterator_ptr(new IteratorImpl(m_vector.begin()));} + const_iterator_ptr begin() const { return const_iterator_ptr(new ConstIteratorImpl(m_vector.begin()));} + iterator_ptr end() { return iterator_ptr(new IteratorImpl(m_vector.end()));} + const_iterator_ptr end() const { return const_iterator_ptr(new ConstIteratorImpl(m_vector.end()));} + +private: + typedef DrillVectorIteratorImpl<T, typename std::vector<T>::iterator> IteratorImpl; + typedef DrillVectorIteratorImpl<const T, typename std::vector<T>::const_iterator> ConstIteratorImpl; + std::vector<T> m_vector; +}; + +template<typename T> +class DrillVector: public Drill::DrillCollection<T> { +public: + DrillVector(const std::vector<T>& v): Drill::DrillCollection<T>(typename Drill::DrillCollection<T>::ImplPtr(new DrillVectorImpl<T>(v))) {} +}; + + +struct SimpleInterface { + virtual ~SimpleInterface() {} + virtual const std::string& foo() const = 0; + virtual std::string bar() = 0; +}; + +class SimpleImplementation: public SimpleInterface { +public: + SimpleImplementation(const std::string& foo, const std::string& bar): m_foo(foo), m_bar(bar) {} + + const std::string& foo() const { return m_foo; } + std::string bar() { return m_bar; } + +private: + std::string m_foo; + std::string m_bar; +}; + +} // anonymous namespace + +class CollectionsTest: public CppUnit::TestFixture { +public: + CollectionsTest() {} + + + CPPUNIT_TEST_SUITE( CollectionsTest ); + CPPUNIT_TEST( testSimpleCollection ); + CPPUNIT_TEST( testSimpleConstCollection ); + CPPUNIT_TEST( testDrillVectorConstIterator ); + CPPUNIT_TEST( testDrillVectorIterator ); + CPPUNIT_TEST( testDrillVectorConstPointer ); + CPPUNIT_TEST_SUITE_END(); + + void testSimpleCollection() { + // basic test/proof of concept for collections.hpp + + std::vector<std::string> v = boost::assign::list_of("foo")("bar"); + + DrillVector<std::string> drillCollection(v); + std::vector<std::string> result; + for(DrillVector<std::string>::const_iterator it = drillCollection.begin(); it != drillCollection.end(); ++it) { + result.push_back(*it); + } + + CPPUNIT_ASSERT(result == v); + } + + void testSimpleConstCollection() { + std::vector<std::string> v = boost::assign::list_of("foo")("bar"); + + const DrillVector<std::string> drillCollection(v); + std::vector<std::string> result; + for(DrillVector<std::string>::const_iterator it = drillCollection.begin(); it != drillCollection.end(); ++it) { + result.push_back(*it); + } + + CPPUNIT_ASSERT(result == v); + } + + void testDrillVectorConstIterator() { + typedef Drill::DrillVector<SimpleInterface, SimpleImplementation> SimpleInterfaceVector; + SimpleInterfaceVector v; + + v.push_back(SimpleImplementation("foo1", "bar1")); + v.push_back(SimpleImplementation("foo2", "bar2")); + + std::vector<std::string> resultFoo; + SimpleInterfaceVector::const_iterator it(v.begin()); + for(; it != v.end(); ++it) { + resultFoo.push_back(it->foo()); + // const-correctness: The following line does not compile if uncommented! + // resultBar.push_back(it->bar()); + } + + std::vector<std::string> expectedFoo = boost::assign::list_of("foo1")("foo2"); + + CPPUNIT_ASSERT(resultFoo == expectedFoo); + } + + void testDrillVectorIterator() { + typedef Drill::DrillVector<SimpleInterface, SimpleImplementation> SimpleInterfaceVector; + SimpleInterfaceVector v; + + v.push_back(SimpleImplementation("foo1", "bar1")); + v.push_back(SimpleImplementation("foo2", "bar2")); + + std::vector<std::string> resultFoo; + std::vector<std::string> resultBar; + SimpleInterfaceVector::iterator it; + for(it = v.begin(); it != v.end(); ++it) { + resultFoo.push_back(it->foo()); + resultBar.push_back(it->bar()); + } + + std::vector<std::string> expectedFoo = boost::assign::list_of("foo1")("foo2"); + std::vector<std::string> expectedBar = boost::assign::list_of("bar1")("bar2"); + + CPPUNIT_ASSERT(resultFoo == expectedFoo); + CPPUNIT_ASSERT(resultBar == expectedBar); + } + + // Check some const-correctness issues + // by getting iterators of a const collection + void testDrillVectorConstPointer() { + typedef Drill::DrillVector<SimpleInterface, SimpleImplementation> SimpleInterfaceVector; + boost::shared_ptr<SimpleInterfaceVector> v(new SimpleInterfaceVector); + + const SimpleInterfaceVector* vv(v.get()); + + v->push_back(SimpleImplementation("foo1", "bar1")); + v->push_back(SimpleImplementation("foo2", "bar2")); + + std::vector<std::string> resultFoo; + std::vector<std::string> resultBar; + SimpleInterfaceVector::const_iterator it; + for(it = vv->begin(); it != vv->end(); ++it) { + resultFoo.push_back(it->foo()); + } + + std::vector<std::string> expectedFoo = boost::assign::list_of("foo1")("foo2"); + + CPPUNIT_ASSERT(resultFoo == expectedFoo); + } + +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( CollectionsTest ); diff --git a/protocol/src/main/protobuf/User.proto b/protocol/src/main/protobuf/User.proto index f78ea2bdf..96d0477c2 100644 --- a/protocol/src/main/protobuf/User.proto +++ b/protocol/src/main/protobuf/User.proto @@ -361,7 +361,7 @@ message ResultColumnMetadata { optional ColumnSearchability searchability = 13; /* - * Defaults to READ_ONLU + * Defaults to READ_ONLY */ optional ColumnUpdatability updatability = 14; |