aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src
diff options
context:
space:
mode:
author: Jacques Nadeau <jacques@apache.org> 2014-08-27 10:31:56 -0700
committer: Jacques Nadeau <jacques@apache.org> 2014-08-27 13:33:48 -0700
commit: 5a7feb9dca73bc11249210e7a241a11533f6944e (patch)
tree: 86e8b629889c7e417895bf4282a76e06763a6e3c /contrib/native/client/src
parent: cd9eaa88fee4b01706a9237fb160aad5cb59f9c8 (diff)
DRILL-998: Limit amount of memory used by drill C++ client API
Diffstat (limited to 'contrib/native/client/src')
-rw-r--r-- contrib/native/client/src/clientlib/CMakeLists.txt | 1
-rw-r--r-- contrib/native/client/src/clientlib/drillClient.cpp | 11
-rw-r--r-- contrib/native/client/src/clientlib/drillClientImpl.cpp | 50
-rw-r--r-- contrib/native/client/src/clientlib/drillClientImpl.hpp | 12
-rw-r--r-- contrib/native/client/src/clientlib/recordBatch.cpp | 2
-rw-r--r-- contrib/native/client/src/clientlib/utils.cpp | 68
-rw-r--r-- contrib/native/client/src/clientlib/utils.hpp | 31
-rw-r--r-- contrib/native/client/src/include/drill/common.hpp | 6
-rw-r--r-- contrib/native/client/src/include/drill/recordBatch.hpp | 8
9 files changed, 146 insertions, 43 deletions
diff --git a/contrib/native/client/src/clientlib/CMakeLists.txt b/contrib/native/client/src/clientlib/CMakeLists.txt
index 37f473477..dc8f03267 100644
--- a/contrib/native/client/src/clientlib/CMakeLists.txt
+++ b/contrib/native/client/src/clientlib/CMakeLists.txt
@@ -27,6 +27,7 @@ set (CLIENTLIB_SRC_FILES
${CMAKE_CURRENT_SOURCE_DIR}/rpcDecoder.cpp
${CMAKE_CURRENT_SOURCE_DIR}/errmsgs.cpp
${CMAKE_CURRENT_SOURCE_DIR}/logger.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
)
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include )
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index a7489bc35..6611332b0 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -43,7 +43,7 @@ DrillClientInitializer::~DrillClientInitializer(){
}
logLevel_t DrillClientConfig::s_logLevel=LOG_ERROR;
-uint64_t DrillClientConfig::s_bufferLimit=-1;
+uint64_t DrillClientConfig::s_bufferLimit=MAX_MEM_ALLOC_SIZE;
int32_t DrillClientConfig::s_socketTimeout=180;
boost::mutex DrillClientConfig::s_mutex;
@@ -157,10 +157,6 @@ FieldDefPtr RecordIterator::getColDefs(){
status_t RecordIterator::next(){
status_t ret=QRY_SUCCESS;
- this->m_pQueryResult->waitForData();
- if(m_pQueryResult->hasError()){
- return m_pQueryResult->getErrorStatus();
- }
this->m_currentRecord++;
if(!this->m_pQueryResult->isCancelled()){
@@ -169,8 +165,13 @@ status_t RecordIterator::next(){
if(this->m_pCurrentRecordBatch !=NULL){
DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;
delete this->m_pCurrentRecordBatch; //free the previous record batch
+ this->m_pCurrentRecordBatch=NULL;
}
this->m_currentRecord=0;
+ this->m_pQueryResult->waitForData();
+ if(m_pQueryResult->hasError()){
+ return m_pQueryResult->getErrorStatus();
+ }
this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext();
if(this->m_pCurrentRecordBatch != NULL){
DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 54dcdd0f1..0ea4897fa 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -175,7 +175,7 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
DRILL_LOG(LOG_DEBUG) << "Sent handshake read request to server" << std::endl;
m_io_service.run();
if(m_rbuf!=NULL){
- Utils::freeBuffer(m_rbuf); m_rbuf=NULL;
+ Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
}
return CONN_SUCCESS;
}
@@ -332,6 +332,13 @@ void DrillClientImpl::getNextResult(){
// This call is always made from within a function where the mutex has already been acquired
//boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+ {
+ boost::unique_lock<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
+ DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;
+ while(AllocatedBuffer::s_isBufferLimitReached){
+ AllocatedBuffer::s_memCV.wait(memLock);
+ }
+ }
//use free, not delete to free
ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
@@ -362,10 +369,13 @@ void DrillClientImpl::waitForResults(){
delete this->m_pListenerThread; this->m_pListenerThread=NULL;
}
-status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, InBoundRpcMessage& msg, boost::system::error_code& error){
+status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
+ AllocatedBufferPtr* allocatedBuffer,
+ InBoundRpcMessage& msg,
+ boost::system::error_code& error){
size_t leftover=0;
uint32_t rmsgLen;
- ByteBuf_t currentBuffer;
+ AllocatedBufferPtr currentBuffer;
*allocatedBuffer=NULL;
{
// We need to protect the readLength and read buffer, and the pending requests counter,
@@ -379,18 +389,18 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, In
leftover = LEN_PREFIX_BUFLEN - bytes_read;
// Allocate a buffer
DRILL_LOG(LOG_TRACE) << "Allocated and locked buffer." << std::endl;
- currentBuffer=Utils::allocateBuffer(rmsgLen);
+ currentBuffer=new AllocatedBuffer(rmsgLen);
if(currentBuffer==NULL){
- Utils::freeBuffer(_buf);
+ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
}
*allocatedBuffer=currentBuffer;
if(leftover){
- memcpy(currentBuffer, _buf + bytes_read, leftover);
+ memcpy(currentBuffer->m_pBuffer, _buf + bytes_read, leftover);
}
DRILL_LOG(LOG_TRACE) << "reading data (rmsgLen - leftover) : "
<< (rmsgLen - leftover) << std::endl;
- ByteBuf_t b=currentBuffer + leftover;
+ ByteBuf_t b=currentBuffer->m_pBuffer + leftover;
size_t bytesToRead=rmsgLen - leftover;
while(1){
size_t dataBytesRead=this->m_socket.read_some(
@@ -404,24 +414,24 @@ status_t DrillClientImpl::readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, In
}
if(!error){
// read data successfully
- DrillClientImpl::s_decoder.Decode(currentBuffer, rmsgLen, msg);
+ DrillClientImpl::s_decoder.Decode(currentBuffer->m_pBuffer, rmsgLen, msg);
DRILL_LOG(LOG_TRACE) << "Done decoding chunk. Coordination id: " <<msg.m_coord_id<< std::endl;
}else{
- Utils::freeBuffer(_buf);
+ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_COMM_ERROR,
getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
}
}else{
// got a message with an invalid read length.
- Utils::freeBuffer(_buf);
+ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
}
}
- Utils::freeBuffer(_buf);
+ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
return QRY_SUCCESS;
}
-status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
{
@@ -453,14 +463,14 @@ status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, InBoundR
//Check QueryResult.queryState. QueryResult could have an error.
if(qr->query_state() == exec::shared::QueryResult_QueryState_FAILED){
status_t ret=handleQryError(QRY_FAILURE, qr->error(0), pDrillClientQueryResult);
- Utils::freeBuffer(allocatedBuffer);
+ delete allocatedBuffer;
delete qr;
return ret;
}
//Validate the RPC message
std::string valErr;
if( (ret=validateMessage(msg, *qr, valErr)) != QRY_SUCCESS){
- Utils::freeBuffer(allocatedBuffer);
+ delete allocatedBuffer;
delete qr;
return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
}
@@ -521,7 +531,7 @@ status_t DrillClientImpl::processQueryResult(ByteBuf_t allocatedBuffer, InBoundR
return ret;
}
-status_t DrillClientImpl::processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg ){
+status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;
status_t ret=QRY_SUCCESS;
@@ -539,10 +549,10 @@ status_t DrillClientImpl::processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMe
//save queryId allocated here so we can free it later
pDrillClientQueryResult->setQueryId(qid);
}else{
- Utils::freeBuffer(allocatedBuffer);
+ delete allocatedBuffer;
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
- Utils::freeBuffer(allocatedBuffer);
+ delete allocatedBuffer;
return ret;
}
@@ -580,7 +590,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
InBoundRpcMessage msg;
DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;
- ByteBuf_t allocatedBuffer=NULL;
+ AllocatedBufferPtr allocatedBuffer=NULL;
if(readMsg(_buf, &allocatedBuffer, msg, error)!=QRY_SUCCESS){
if(m_pendingRequests!=0){
@@ -628,7 +638,7 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
}
}else{
// boost error
- Utils::freeBuffer(_buf);
+ Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
@@ -828,7 +838,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
return QRY_SUCCESS;
}
-RecordBatch* DrillClientQueryResult::peekNext() {
+RecordBatch* DrillClientQueryResult::peekNext(){
RecordBatch* pRecordBatch=NULL;
//if no more data, return NULL;
if(!m_bIsQueryPending) return NULL;
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index e40b2147c..3ac0b20ce 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -198,7 +198,7 @@ class DrillClientImpl{
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ignorederr);
m_socket.close();
if(m_rbuf!=NULL){
- Utils::freeBuffer(m_rbuf); m_rbuf=NULL;
+ Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
}
if(m_pError!=NULL){
delete m_pError; m_pError=NULL;
@@ -244,9 +244,13 @@ class DrillClientImpl{
void handleHShakeReadTimeout(const boost::system::error_code & err);
// Query results
void getNextResult();
- status_t readMsg(ByteBuf_t _buf, ByteBuf_t* allocatedBuffer, InBoundRpcMessage& msg, boost::system::error_code& error);
- status_t processQueryResult(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg);
- status_t processQueryId(ByteBuf_t allocatedBuffer, InBoundRpcMessage& msg );
+ status_t readMsg(
+ ByteBuf_t _buf,
+ AllocatedBufferPtr* allocatedBuffer,
+ InBoundRpcMessage& msg,
+ boost::system::error_code& error);
+ status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
+ status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg );
void handleReadTimeout(const boost::system::error_code & err);
void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
status_t validateMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError);
diff --git a/contrib/native/client/src/clientlib/recordBatch.cpp b/contrib/native/client/src/clientlib/recordBatch.cpp
index 17073bd62..4c55f0482 100644
--- a/contrib/native/client/src/clientlib/recordBatch.cpp
+++ b/contrib/native/client/src/clientlib/recordBatch.cpp
@@ -312,7 +312,7 @@ RecordBatch::~RecordBatch(){
}
m_fieldDefs->clear();
delete m_pQueryResult;
- Utils::freeBuffer(m_allocatedBuffer);
+ delete m_allocatedBuffer;
}
ret_t RecordBatch::build(){
diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp
new file mode 100644
index 000000000..f1f03a1e3
--- /dev/null
+++ b/contrib/native/client/src/clientlib/utils.cpp
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include "utils.hpp"
+#include "drill/common.hpp"
+
+namespace Drill{
+
+
+boost::mutex AllocatedBuffer::s_memCVMutex;
+boost::condition_variable AllocatedBuffer::s_memCV;
+size_t AllocatedBuffer::s_allocatedMem=0;
+bool AllocatedBuffer::s_isBufferLimitReached=false;
+
+// Allocates a zero-initialized buffer of len bytes and charges it against the
+// tracked total in AllocatedBuffer::s_allocatedMem.
+// Returns NULL when calloc fails; a failed allocation is not charged, and
+// freeBuffer(NULL, len) is a matching no-op, so the counter stays balanced.
+// Sets s_isBufferLimitReached once the tracked total reaches
+// DrillClientConfig::getBufferLimit() - MEM_CHUNK_SIZE.
+ByteBuf_t Utils::allocateBuffer(size_t len){
+    boost::lock_guard<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
+    //http://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc
+    ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t));
+    if(b==NULL){
+        // Never account for memory that was not obtained: AllocatedBuffer
+        // records a size of 0 on failure, so a charge made here would never
+        // be credited back.
+        return NULL;
+    }
+    AllocatedBuffer::s_allocatedMem+=len;
+    // NOTE(review): if the configured limit is smaller than MEM_CHUNK_SIZE this
+    // unsigned subtraction presumably wraps, which would effectively disable
+    // the limit - confirm against DrillClientConfig.
+    size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE;
+    if(AllocatedBuffer::s_allocatedMem >= safeSize){
+        AllocatedBuffer::s_isBufferLimitReached=true;
+    }
+    return b;
+}
+
+// Releases a buffer obtained from allocateBuffer() and credits len bytes back
+// to the tracked total. len should be the size that was passed to
+// allocateBuffer() for this buffer. A NULL buffer is ignored.
+// When the total drops below the threshold, s_isBufferLimitReached is cleared
+// and one thread waiting on s_memCV is woken.
+void Utils::freeBuffer(ByteBuf_t b, size_t len){
+    if(b==NULL){
+        // Nothing was allocated, so nothing was charged.
+        return;
+    }
+    boost::lock_guard<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
+    free(b);
+    // Clamp at zero instead of letting the unsigned counter wrap around when
+    // a caller passes a len larger than what is currently tracked; a wrapped
+    // counter would keep the limit flag set forever.
+    if(len < AllocatedBuffer::s_allocatedMem){
+        AllocatedBuffer::s_allocatedMem-=len;
+    }else{
+        AllocatedBuffer::s_allocatedMem=0;
+    }
+    size_t safeSize= DrillClientConfig::getBufferLimit()-MEM_CHUNK_SIZE;
+    if(AllocatedBuffer::s_allocatedMem < safeSize){
+        AllocatedBuffer::s_isBufferLimitReached=false;
+        //signal any waiting threads
+        AllocatedBuffer::s_memCV.notify_one();
+    }
+}
+
+
+// Obtains l bytes through Utils::allocateBuffer so the allocation is tracked.
+// When the allocation fails, m_pBuffer is NULL and m_bufSize is recorded as 0.
+AllocatedBuffer::AllocatedBuffer(size_t l){
+    m_pBuffer=Utils::allocateBuffer(l);
+    if(m_pBuffer==NULL){
+        m_bufSize=0;
+    }else{
+        m_bufSize=l;
+    }
+}
+
+// Returns the owned buffer through Utils::freeBuffer together with its
+// recorded size, leaving the members reset to NULL / 0.
+AllocatedBuffer::~AllocatedBuffer(){
+    ByteBuf_t released=m_pBuffer;
+    size_t releasedSize=m_bufSize;
+    m_pBuffer=NULL;
+    m_bufSize=0;
+    Utils::freeBuffer(released, releasedSize);
+}
+
+} // namespace
diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp
index 9def9b4da..0f26ad69c 100644
--- a/contrib/native/client/src/clientlib/utils.hpp
+++ b/contrib/native/client/src/clientlib/utils.hpp
@@ -23,24 +23,37 @@
#include <ostream>
#include <fstream>
#include <string>
-#include <stdlib.h>
+#include <boost/thread.hpp>
#include "drill/common.hpp"
+#include "drill/drillClient.hpp"
namespace Drill{
-class Utils{
+// Wrapper class that holds one buffer obtained from Utils::allocateBuffer and hands it back to Utils::freeBuffer in its destructor, so the memory is tracked
+class AllocatedBuffer{
public:
+ AllocatedBuffer(size_t l); // allocates l bytes via Utils::allocateBuffer
+ ~AllocatedBuffer(); // passes m_pBuffer and m_bufSize to Utils::freeBuffer
- //allocate memory for Record Batches
- static ByteBuf_t allocateBuffer(size_t len){
- //http://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc
- ByteBuf_t b = (ByteBuf_t)calloc(len, sizeof(Byte_t)); return b;
- }
- static void freeBuffer(ByteBuf_t b){ free(b); }
+ ByteBuf_t m_pBuffer; // buffer returned by Utils::allocateBuffer; NULL if the allocation failed
+ size_t m_bufSize; // size requested in the constructor, or 0 when m_pBuffer is NULL
+
+ // Keep track of allocated memory. The client lib blocks reading further
+ // results once we have allocated up to a limit (defined in DrillClientConfig).
+ static boost::mutex s_memCVMutex; // held while the two counters below are read or updated
+ static boost::condition_variable s_memCV; // waited on in DrillClientImpl::getNextResult; notified from Utils::freeBuffer
+ static size_t s_allocatedMem; // byte count maintained by Utils::allocateBuffer / Utils::freeBuffer
+ static bool s_isBufferLimitReached; // set by Utils::allocateBuffer, cleared by Utils::freeBuffer
-}; // Utils
+};
+class Utils{
+ public:
+ //allocate and free memory for Record Batches, keeping AllocatedBuffer::s_allocatedMem up to date
+ static ByteBuf_t allocateBuffer(size_t len); // returns a calloc'ed buffer of len bytes (NULL if calloc fails)
+ static void freeBuffer(ByteBuf_t b, size_t len); // frees b; len is subtracted from the tracked total, so pass the size used at allocation
+}; // Utils
} // namespace Drill
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index 2113ce5e1..59734dc5b 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -33,6 +33,9 @@
#define MAX_CONNECT_STR 4096
#define MAX_SOCK_RD_BUFSIZE 1024
+// Sizes used by the client memory limiter. The bodies are parenthesized and
+// carry no trailing semicolon so the macros expand safely inside expressions.
+#define MEM_CHUNK_SIZE (64*1024) // 64K
+#define MAX_MEM_ALLOC_SIZE (256*1024*1024) // 256 MB
+
#ifdef _DEBUG
#define EXTRA_DEBUGGING
#define CODER_DEBUGGING
@@ -48,6 +51,9 @@ typedef Byte_t * ByteBuf_t;
class FieldMetadata;
typedef boost::shared_ptr< std::vector<Drill::FieldMetadata*> > FieldDefPtr;
+class AllocatedBuffer;
+typedef AllocatedBuffer* AllocatedBufferPtr;
+
typedef enum{
QRY_SUCCESS=0,
QRY_FAILURE=1,
diff --git a/contrib/native/client/src/include/drill/recordBatch.hpp b/contrib/native/client/src/include/drill/recordBatch.hpp
index e9298bf17..9a3df2b6f 100644
--- a/contrib/native/client/src/include/drill/recordBatch.hpp
+++ b/contrib/native/client/src/include/drill/recordBatch.hpp
@@ -836,10 +836,10 @@ class ValueVectorFactory{
class DECLSPEC_DRILL_CLIENT RecordBatch{
public:
- //m_allocatedBuffer is the memory block allocated to hold the incoming RPC message. Record BAtches operate on
- //slices of the allcoated buffer. The first slice (the first Field Batch), begins at m_buffer. Data in the
+ //m_allocatedBuffer is the memory block allocated to hold the incoming RPC message. Record Batches operate on
+ //slices of the allocated buffer. The first slice (the first Field Batch), begins at m_buffer. Data in the
//allocated buffer before m_buffer is mostly the RPC header, and the QueryResult object.
- RecordBatch(exec::shared::QueryResult* pResult, ByteBuf_t r, ByteBuf_t b)
+ RecordBatch(exec::shared::QueryResult* pResult, AllocatedBufferPtr r, ByteBuf_t b)
:m_fieldDefs(new(std::vector<Drill::FieldMetadata*>)){
m_pQueryResult=pResult;
m_pRecordBatchDef=&pResult->def();
@@ -892,7 +892,7 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{
private:
const exec::shared::QueryResult* m_pQueryResult;
const exec::shared::RecordBatchDef* m_pRecordBatchDef;
- ByteBuf_t m_allocatedBuffer;
+ AllocatedBufferPtr m_allocatedBuffer;
ByteBuf_t m_buffer;
//build the current schema out of the field metadata
FieldDefPtr m_fieldDefs;