diff options
author | Jacques Nadeau <jacques@apache.org> | 2014-08-27 10:31:56 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-08-27 13:33:48 -0700 |
commit | 5a7feb9dca73bc11249210e7a241a11533f6944e (patch) | |
tree | 86e8b629889c7e417895bf4282a76e06763a6e3c /contrib/native/client/src | |
parent | cd9eaa88fee4b01706a9237fb160aad5cb59f9c8 (diff) |
DRILL-998: Limit amount of memory used by drill C++ client API
Diffstat (limited to 'contrib/native/client/src')
9 files changed, 146 insertions, 43 deletions
diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt b/contrib/native/client/src/clientlib/CMakeLists.txt index 37f473477..dc8f03267 100644 --- a/contrib/native/client/src/clientlib/CMakeLists.txt +++ b/contrib/native/client/src/clientlib/CMakeLists.txt @@ -27,6 +27,7 @@ set (CLIENTLIB_SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp ${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/logger.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp ) include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include ) diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp index a7489bc35..6611332b0 100644 --- a/contrib/native/client/src/clientlib/drillClient.cpp +++ b/contrib/native/client/src/clientlib/drillClient.cpp @@ -43,7 +43,7 @@ DrillClientInitializer::~DrillClientInitializer(){ } logLevel_t DrillClientConfig::s_logLevel=LOG_ERROR; -uint64_t DrillClientConfig::s_bufferLimit=-1; +uint64_t DrillClientConfig::s_bufferLimit=MAX_MEM_ALLOC_SIZE; int32_t DrillClientConfig::s_socketTimeout=180; boost::mutex DrillClientConfig::s_mutex; @@ -157,10 +157,6 @@ FieldDefPtr RecordIterator::getColDefs(){ status_t RecordIterator::next(){ status_t ret=QRY_SUCCESS; - this->m_pQueryResult->waitForData(); - if(m_pQueryResult->hasError()){ - return m_pQueryResult->getErrorStatus(); - } this->m_currentRecord++; if(!this->m_pQueryResult->isCancelled()){ @@ -169,8 +165,13 @@ status_t RecordIterator::next(){ if(this->m_pCurrentRecordBatch !=NULL){ DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl; delete this->m_pCurrentRecordBatch; //free the previous record batch + this->m_pCurrentRecordBatch=NULL; } this->m_currentRecord=0; + this->m_pQueryResult->waitForData(); + if(m_pQueryResult->hasError()){ + return m_pQueryResult->getErrorStatus(); + } this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext(); if(this->m_pCurrentRecordBatch != NULL){ DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl; diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index 54dcdd0f1..0ea4897fa 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -175,7 +175,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){ DRILL_LOG(LOG_DEBUG) << "Sent handshake read request to server" << std::endl; m_io_service.run(); if(m_rbuf!=NULL){ - Utils::freeBuffer(m_rbuf); m_rbuf=NULL; + Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL; } return CONN_SUCCESS; } @@ -332,6 +332,13 @@ void DrillClientImpl::getNextResult(){ // This call is always made from within a function where the mutex has already been acquired //boost::lock_guard<boost::mutex> lock(this->m_dcMutex); + { + boost::unique_lock<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex); + DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl; + while(AllocatedBuffer::s_isBufferLimitReached){ + AllocatedBuffer::s_memCV.wait(memLock); + } + } //use free, not delete to free ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN); @@ -362,10 +369,13 @@ void DrillClientImpl::waitForResults(){ delete this->m_pListenerThread; this->m_pListenerThread=NULL; } -status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, InBoundRpcMessage& msg, boost::system::error_code& error){ +status_t DrillClientImpl::readMsg(ByteBuf_t _buf, + AllocatedBufferPtr* allocatedBuffer, + InBoundRpcMessage& msg, + boost::system::error_code& error){ size_t leftover=0; uint32_t rmsgLen; - ByteBuf_t currentBuffer; + AllocatedBufferPtr currentBuffer; *allocatedBuffer=NULL; { // We need to protect the readLength and read buffer, and the pending requests counter, @@ -379,18 +389,18 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, In leftover = LEN_PREFIX_BUFLEN - bytes_read; // Allocate a buffer DRILL_LOG(LOG_TRACE) << "Allocated and locked buffer." << std::endl; - currentBuffer=Utils::allocateBuffer(rmsgLen); + currentBuffer=new AllocatedBuffer(rmsgLen); if(currentBuffer==NULL){ - Utils::freeBuffer(_buf); + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL); } *allocatedBuffer=currentBuffer; if(leftover){ - memcpy(currentBuffer, _buf + bytes_read, leftover); + memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover); } DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : " << (rmsgLen - leftover) << std::endl; - ByteBuf_t b=currentBuffer + leftover; + ByteBuf_t b=currentBuffer->m_pBuffer + leftover; size_t bytesToRead=rmsgLen - leftover; while(1){ size_t dataBytesRead=this->m_socket.read_some( @@ -404,24 +414,24 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, In } if(!error){ // read data successfully - DrillClientImpl::s_decoder.Decode(currentBuffer, rmsgLen, msg); + DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg); DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl; }else{ - Utils::freeBuffer(_buf); + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); } }else{ // got a message with an invalid read length. - Utils::freeBuffer(_buf); + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL); } } - Utils::freeBuffer(_buf); + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); return QRY_SUCCESS; } -status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ){ +status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; status_t ret=QRY_SUCCESS; { @@ -453,14 +463,14 @@ status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, InBoundR //Check QueryResult.queryState. QueryResult could have an error. if(qr->query_state() == exec::shared::QueryResult_QueryState_FAILED){ status_t ret=handleQryError(QRY_FAILURE, qr->error(0), pDrillClientQueryResult); - Utils::freeBuffer(allocatedBuffer); + delete allocatedBuffer; delete qr; return ret; } //Validate the RPC message std::string valErr; if( (ret=validateMessage(msg, *qr, valErr)) != QRY_SUCCESS){ - Utils::freeBuffer(allocatedBuffer); + delete allocatedBuffer; delete qr; return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult); } @@ -521,7 +531,7 @@ status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, InBoundR return ret; } -status_t DrillClientImpl::processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ){ +status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){ DrillClientQueryResult* pDrillClientQueryResult=NULL; DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl; status_t ret=QRY_SUCCESS; @@ -539,10 +549,10 @@ status_t DrillClientImpl::processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMe //save queryId allocated here so we can free it later pDrillClientQueryResult->setQueryId(qid); }else{ - Utils::freeBuffer(allocatedBuffer); + delete allocatedBuffer; return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL); } - Utils::freeBuffer(allocatedBuffer); + delete allocatedBuffer; return ret; } @@ -580,7 +590,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, InBoundRpcMessage msg; DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl; - ByteBuf_t allocatedBuffer=NULL; + AllocatedBufferPtr allocatedBuffer=NULL; if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){ if(m_pendingRequests!=0){ @@ -628,7 +638,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, } }else{ // boost error - Utils::freeBuffer(_buf); + Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN); boost::lock_guard<boost::mutex> lock(this->m_dcMutex); handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); return; @@ -828,7 +838,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx, return QRY_SUCCESS; } -RecordBatch* DrillClientQueryResult::peekNext() { +RecordBatch* DrillClientQueryResult::peekNext(){ RecordBatch* pRecordBatch=NULL; //if no more data, return NULL; if(!m_bIsQueryPending) return NULL; diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp index e40b2147c..3ac0b20ce 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.hpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp @@ -198,7 +198,7 @@ class DrillClientImpl{ m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ignorederr); m_socket.close(); if(m_rbuf!=NULL){ - Utils::freeBuffer(m_rbuf); m_rbuf=NULL; + Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL; } if(m_pError!=NULL){ delete m_pError; m_pError=NULL; @@ -244,9 +244,13 @@ class DrillClientImpl{ void handleHShakeReadTimeout(const boost::system::error_code & err); // Query results void getNextResult(); - status_t readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, InBoundRpcMessage& msg, boost::system::error_code& error); - status_t processQueryResult(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg); - status_t processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ); + status_t readMsg( + ByteBuf_t _buf, + AllocatedBufferPtr* allocatedBuffer, + InBoundRpcMessage& msg, + boost::system::error_code& error); + status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg); + status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ); void handleReadTimeout(const boost::system::error_code & err); void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ; status_t validateMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError); diff --git a/contrib/native/client/src/clientlib/recordBatch.cpp b/contrib/native/client/src/clientlib/recordBatch.cpp index 17073bd62..4c55f0482 100644 --- a/contrib/native/client/src/clientlib/recordBatch.cpp +++ b/contrib/native/client/src/clientlib/recordBatch.cpp @@ -312,7 +312,7 @@ RecordBatch::~RecordBatch(){ } m_fieldDefs->clear(); delete m_pQueryResult; - Utils::freeBuffer(m_allocatedBuffer); + delete m_allocatedBuffer; } ret_t RecordBatch::build(){ diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp new file mode 100644 index 000000000..f1f03a1e3 --- /dev/null +++ b/contrib/native/client/src/clientlib/utils.cpp @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdlib.h> +#include "utils.hpp" +#include "drill/common.hpp" + +namespace Drill{ + + +boost::mutex AllocatedBuffer::s_memCVMutex; +boost::condition_variable AllocatedBuffer::s_memCV; +size_t AllocatedBuffer::s_allocatedMem=0; +bool AllocatedBuffer::s_isBufferLimitReached=false; + +ByteBuf_t Utils::allocateBuffer(size_t len){ + boost::lock_guard<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex); + AllocatedBuffer::s_allocatedMem+=len; + //http://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc + ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t)); + size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE; + if(b!=NULL && AllocatedBuffer::s_allocatedMem >= safeSize){ + AllocatedBuffer::s_isBufferLimitReached=true; + } + return b; +} + +void Utils::freeBuffer(ByteBuf_t b, size_t len){ + boost::lock_guard<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex); + AllocatedBuffer::s_allocatedMem-=len; + free(b); + size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE; + if(b!=NULL && AllocatedBuffer::s_allocatedMem < safeSize){ + AllocatedBuffer::s_isBufferLimitReached=false; + //signal any waiting threads + AllocatedBuffer::s_memCV.notify_one(); + } +} + + +AllocatedBuffer::AllocatedBuffer(size_t l){ + m_pBuffer=NULL; + m_pBuffer=Utils::allocateBuffer(l); + m_bufSize=m_pBuffer!=NULL?l:0; +} + +AllocatedBuffer::~AllocatedBuffer(){ + Utils::freeBuffer(m_pBuffer, m_bufSize); + m_pBuffer=NULL; + m_bufSize=0; +} + +} // namespace diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp index 9def9b4da..0f26ad69c 100644 --- a/contrib/native/client/src/clientlib/utils.hpp +++ b/contrib/native/client/src/clientlib/utils.hpp @@ -23,24 +23,37 @@ #include <ostream> #include <fstream> #include <string> -#include <stdlib.h> +#include <boost/thread.hpp> #include "drill/common.hpp" +#include "drill/drillClient.hpp" namespace Drill{ -class Utils{ +// Wrapper Class to keep track of allocated memory +class AllocatedBuffer{ public: + AllocatedBuffer(size_t l); + ~AllocatedBuffer(); - //allocate memory for Record Batches - static ByteBuf_t allocateBuffer(size_t len){ - //http://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc - ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t)); return b; - } - static void freeBuffer(ByteBuf_t b){ free(b); } + ByteBuf_t m_pBuffer; + size_t m_bufSize; + + // keep track of allocated memory. The client lib blocks + // if we have allocated up to a limit (defined in drillClientConfig). + static boost::mutex s_memCVMutex; + static boost::condition_variable s_memCV; + static size_t s_allocatedMem; + static bool s_isBufferLimitReached; -}; // Utils +}; +class Utils{ + public: + //allocate memory for Record Batches + static ByteBuf_t allocateBuffer(size_t len); + static void freeBuffer(ByteBuf_t b, size_t len); +}; // Utils } // namespace Drill diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp index 2113ce5e1..59734dc5b 100644 --- a/contrib/native/client/src/include/drill/common.hpp +++ b/contrib/native/client/src/include/drill/common.hpp @@ -33,6 +33,9 @@ #define MAX_CONNECT_STR 4096 #define MAX_SOCK_RD_BUFSIZE 1024 +#define MEM_CHUNK_SIZE 64*1024; // 64K +#define MAX_MEM_ALLOC_SIZE 256*1024*1024; // 256 MB + #ifdef _DEBUG #define EXTRA_DEBUGGING #define CODER_DEBUGGING @@ -48,6 +51,9 @@ typedef Byte_t * ByteBuf_t; class FieldMetadata; typedef boost::shared_ptr< std::vector<Drill::FieldMetadata*> > FieldDefPtr; +class AllocatedBuffer; +typedef AllocatedBuffer* AllocatedBufferPtr; + typedef enum{ QRY_SUCCESS=0, QRY_FAILURE=1, diff --git a/contrib/native/client/src/include/drill/recordBatch.hpp b/contrib/native/client/src/include/drill/recordBatch.hpp index e9298bf17..9a3df2b6f 100644 --- a/contrib/native/client/src/include/drill/recordBatch.hpp +++ b/contrib/native/client/src/include/drill/recordBatch.hpp @@ -836,10 +836,10 @@ class ValueVectorFactory{ class DECLSPEC_DRILL_CLIENT RecordBatch{ public: - //m_allocatedBuffer is the memory block allocated to hold the incoming RPC message. Record BAtches operate on - //slices of the allcoated buffer. The first slice (the first Field Batch), begins at m_buffer. Data in the + //m_allocatedBuffer is the memory block allocated to hold the incoming RPC message. Record Batches operate on + //slices of the allocated buffer. The first slice (the first Field Batch), begins at m_buffer. Data in the //allocated buffer before m_buffer is mostly the RPC header, and the QueryResult object. - RecordBatch(exec::shared::QueryResult* pResult, ByteBuf_t r, ByteBuf_t b) + RecordBatch(exec::shared::QueryResult* pResult, AllocatedBufferPtr r, ByteBuf_t b) :m_fieldDefs(new(std::vector<Drill::FieldMetadata*>)){ m_pQueryResult=pResult; m_pRecordBatchDef=&pResult->def(); @@ -892,7 +892,7 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{ private: const exec::shared::QueryResult* m_pQueryResult; const exec::shared::RecordBatchDef* m_pRecordBatchDef; - ByteBuf_t m_allocatedBuffer; + AllocatedBufferPtr m_allocatedBuffer; ByteBuf_t m_buffer; //build the current schema out of the field metadata FieldDefPtr m_fieldDefs; |