aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/include
diff options
context:
space:
mode:
authorParth Chandra <pchandra@maprtech.com>2014-05-30 11:17:40 -0700
committerJacques Nadeau <jacques@apache.org>2014-06-19 20:29:53 -0700
commitaaa4db74b215e03ad0e1334cfc18964972d93a3b (patch)
treeb880e6c31c48f5f79037976edf1a2b27576e4030 /contrib/native/client/src/include
parentff39fb8383e038aadbf4810a6b4ad5f22d25a181 (diff)
DRILL-875: Fixes for DRILL-707, DRILL-780, DRILL-835 (Schema change), DRILL-852, DRILL-876, DRILL_877, DRILL-878, DRILL-890
Diffstat (limited to 'contrib/native/client/src/include')
-rw-r--r--contrib/native/client/src/include/drill/common.hpp25
-rw-r--r--contrib/native/client/src/include/drill/drillClient.hpp73
-rw-r--r--contrib/native/client/src/include/drill/protobuf/User.pb.h40
-rw-r--r--contrib/native/client/src/include/drill/recordBatch.hpp100
4 files changed, 118 insertions, 120 deletions
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index 436805861..2113ce5e1 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -23,13 +23,19 @@
#include <stdint.h>
#include <string>
#include <vector>
+#include <boost/shared_ptr.hpp>
+
+#define DRILL_RPC_VERSION 1
#define LENGTH_PREFIX_MAX_LENGTH 5
#define LEN_PREFIX_BUFLEN LENGTH_PREFIX_MAX_LENGTH
+#define MAX_CONNECT_STR 4096
+#define MAX_SOCK_RD_BUFSIZE 1024
+
#ifdef _DEBUG
#define EXTRA_DEBUGGING
-#define CODER_DEBUGGING
+#define CODER_DEBUGGING
#endif
namespace Drill {
@@ -39,12 +45,15 @@ typedef std::vector<uint8_t> DataBuf;
typedef uint8_t Byte_t;
typedef Byte_t * ByteBuf_t;
+class FieldMetadata;
+typedef boost::shared_ptr< std::vector<Drill::FieldMetadata*> > FieldDefPtr;
+
typedef enum{
- QRY_SUCCESS=0,
- QRY_FAILURE=1,
- QRY_SUCCESS_WITH_INFO=2,
- QRY_NO_MORE_DATA=3,
- QRY_CANCEL=4,
+ QRY_SUCCESS=0,
+ QRY_FAILURE=1,
+ QRY_SUCCESS_WITH_INFO=2,
+ QRY_NO_MORE_DATA=3,
+ QRY_CANCEL=4,
QRY_OUT_OF_BOUNDS=5,
QRY_CLIENT_OUTOFMEM=6,
QRY_INTERNAL_ERROR=7,
@@ -52,8 +61,8 @@ typedef enum{
} status_t;
typedef enum{
- CONN_SUCCESS=0,
- CONN_FAILURE=1,
+ CONN_SUCCESS=0,
+ CONN_FAILURE=1,
CONN_HANDSHAKE_FAILED=2,
CONN_INVALID_INPUT=3,
CONN_ZOOKEEPER_ERROR=4
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index 0e85dcc93..6d59afb5d 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -21,8 +21,6 @@
#define DRILL_CLIENT_H
#include <vector>
-#include <boost/log/core.hpp>
-#include <boost/log/trivial.hpp>
#include <boost/thread.hpp>
#include "drill/common.hpp"
#include "drill/protobuf/User.pb.h"
@@ -42,7 +40,7 @@
#if __GNUC__ >= 4
#define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default")))
#else
- #define DECLSPEC_DRILL_CLIENT
+ #define DECLSPEC_DRILL_CLIENT
#endif
#endif
@@ -65,7 +63,7 @@ class DECLSPEC_DRILL_CLIENT DrillClientError{
static DrillClientError* getErrorObject(const exec::shared::DrillPBError& e);
- // To get the error number we add a error range start number to
+ // To get the error number we add a error range start number to
// the status code returned (either status_t or connectionStatus_t)
uint32_t status; // could be either status_t or connectionStatus_t
uint32_t errnum;
@@ -83,10 +81,13 @@ class DECLSPEC_DRILL_CLIENT DrillClientInitializer{
class DECLSPEC_DRILL_CLIENT DrillClientConfig{
public:
DrillClientConfig();
+ ~DrillClientConfig();
static void initLogging(const char* path);
static void setLogLevel(logLevel_t l);
static void setBufferLimit(uint64_t l);
static uint64_t getBufferLimit();
+ static void setSocketTimeout(int32_t l);
+ static int32_t getSocketTimeout();
static logLevel_t getLogLevel();
private:
// The logging level
@@ -94,7 +95,9 @@ class DECLSPEC_DRILL_CLIENT DrillClientConfig{
// The total amount of memory to be allocated by an instance of DrillClient.
// For future use. Currently, not enforced.
static uint64_t s_bufferLimit;
- static boost::mutex s_mutex;
+ // Timeout (in seconds) for asynchronous read operations. Default is 180 seconds
+ static int32_t s_socketTimeout;
+ static boost::mutex s_mutex;
};
@@ -104,8 +107,8 @@ class DECLSPEC_DRILL_CLIENT DrillClientConfig{
typedef void* QueryHandle_t;
/*
- * Query Results listener callback. This function is called for every record batch after it has
- * been received and decoded. The listener function should return a status.
+ * Query Results listener callback. This function is called for every record batch after it has
+ * been received and decoded. The listener function should return a status.
* If the listener returns failure, the query will be canceled.
*
* DrillClientQueryResult will hold a listener & listener contxt for the call back function
@@ -114,13 +117,13 @@ typedef status_t (*pfnQueryResultsListener)(QueryHandle_t ctx, RecordBatch* b, D
/*
* The schema change listener callback. This function is called if the record batch detects a
- * change in the schema. The client application can call getColDefs in the RecordIterator or
+ * change in the schema. The client application can call getColDefs in the RecordIterator or
* get the field information from the RecordBatch itself and handle the change appropriately.
*/
-typedef uint32_t (*pfnSchemaListener)(void* ctx, SchemaDef* s, DrillClientError* err);
+typedef status_t (*pfnSchemaListener)(void* ctx, FieldDefPtr f, DrillClientError* err);
-/*
- * A Record Iterator instance is returned by the SubmitQuery class. Calls block until some data
+/*
+ * A Record Iterator instance is returned by the SubmitQuery class. Calls block until some data
* is available, or until all data has been returned.
*/
@@ -129,12 +132,12 @@ class DECLSPEC_DRILL_CLIENT RecordIterator{
public:
~RecordIterator();
- /*
- * Returns a vector of column(i.e. field) definitions. The returned reference is guaranteed to be valid till the
- * end of the query or until a schema change event is received. If a schema change event is received by the
- * application, the application should discard the reference it currently holds and call this function again.
+ /*
+ * Returns a vector of column(i.e. field) definitions. The returned reference is guaranteed to be valid till the
+ * end of the query or until a schema change event is received. If a schema change event is received by the
+ * application, the application should discard the reference it currently holds and call this function again.
*/
- std::vector<Drill::FieldMetadata*>& getColDefs();
+ FieldDefPtr getColDefs();
/* Move the current pointer to the next record. */
status_t next();
@@ -148,26 +151,29 @@ class DECLSPEC_DRILL_CLIENT RecordIterator{
/* Cancels the query. */
status_t cancel();
- void registerSchemaChangeListener(pfnSchemaListener* l);
+ /* Returns true is the schem has changed from the previous record. Returns false for the first record. */
+ bool hasSchemaChanged();
+
+ void registerSchemaChangeListener(pfnSchemaListener l);
/*
* Returns the last error message
*/
- std::string& getError();
+ const std::string& getError();
private:
RecordIterator(DrillClientQueryResult* pResult){
this->m_currentRecord=-1;
this->m_pCurrentRecordBatch=NULL;
this->m_pQueryResult=pResult;
- m_pColDefs=NULL;
+ //m_pColDefs=NULL;
}
DrillClientQueryResult* m_pQueryResult;
size_t m_currentRecord;
RecordBatch* m_pCurrentRecordBatch;
- boost::mutex m_recordBatchMutex;
- std::vector<Drill::FieldMetadata*>* m_pColDefs; // Copy of the latest column defs made from the
+ boost::mutex m_recordBatchMutex;
+ FieldDefPtr m_pColDefs; // Copy of the latest column defs made from the
// first record batch with this definition
};
@@ -189,19 +195,19 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
void close() ;
/*
- * Submit a query asynchronously and wait for results to be returned thru a callback. A query context handle is passed
+ * Submit a query asynchronously and wait for results to be returned thru a callback. A query context handle is passed
* back. The listener callback will return the handle in the ctx parameter.
*/
- status_t submitQuery(exec::user::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle);
+ status_t submitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle);
/*
* Submit a query asynchronously and wait for results to be returned thru an iterator that returns
* results synchronously. The client app needs to call delete on the iterator when done.
*/
- RecordIterator* submitQuery(exec::user::QueryType t, const std::string& plan, DrillClientError* err);
+ RecordIterator* submitQuery(::exec::shared::QueryType t, const std::string& plan, DrillClientError* err);
- /*
- * The client application should call this function to wait for results if it has registered a
+ /*
+ * The client application should call this function to wait for results if it has registered a
* listener.
*/
void waitForResults();
@@ -212,10 +218,21 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
std::string& getError();
/*
- * Applications using the async query submit method should call freeQueryResources to free up resources
+ * Applications using the async query submit method can register a listener for schema changes
+ *
+ */
+ void registerSchemaChangeListener(QueryHandle_t* handle, pfnSchemaListener l);
+
+ /*
+ * Applications using the async query submit method should call freeQueryResources to free up resources
* once the query is no longer being processed.
- * */
+ */
void freeQueryResources(QueryHandle_t* handle);
+
+ /*
+ * Applications using the sync query submit method should call freeQueryIterator to free up resources
+ * once the RecordIterator is no longer being processed.
+ */
void freeQueryIterator(RecordIterator** pIter){ delete *pIter; *pIter=NULL;};
private:
diff --git a/contrib/native/client/src/include/drill/protobuf/User.pb.h b/contrib/native/client/src/include/drill/protobuf/User.pb.h
index 3ed024963..982d34fbd 100644
--- a/contrib/native/client/src/include/drill/protobuf/User.pb.h
+++ b/contrib/native/client/src/include/drill/protobuf/User.pb.h
@@ -96,26 +96,6 @@ inline bool RpcType_Parse(
return ::google::protobuf::internal::ParseNamedEnum<RpcType>(
RpcType_descriptor(), name, value);
}
-enum QueryType {
- SQL = 1,
- LOGICAL = 2,
- PHYSICAL = 3
-};
-bool QueryType_IsValid(int value);
-const QueryType QueryType_MIN = SQL;
-const QueryType QueryType_MAX = PHYSICAL;
-const int QueryType_ARRAYSIZE = QueryType_MAX + 1;
-
-const ::google::protobuf::EnumDescriptor* QueryType_descriptor();
-inline const ::std::string& QueryType_Name(QueryType value) {
- return ::google::protobuf::internal::NameOfEnum(
- QueryType_descriptor(), value);
-}
-inline bool QueryType_Parse(
- const ::std::string& name, QueryType* value) {
- return ::google::protobuf::internal::ParseNamedEnum<QueryType>(
- QueryType_descriptor(), name, value);
-}
enum QueryResultsMode {
STREAM_FULL = 1
};
@@ -604,12 +584,12 @@ class RunQuery : public ::google::protobuf::Message {
inline ::exec::user::QueryResultsMode results_mode() const;
inline void set_results_mode(::exec::user::QueryResultsMode value);
- // optional .exec.user.QueryType type = 2;
+ // optional .exec.shared.QueryType type = 2;
inline bool has_type() const;
inline void clear_type();
static const int kTypeFieldNumber = 2;
- inline ::exec::user::QueryType type() const;
- inline void set_type(::exec::user::QueryType value);
+ inline ::exec::shared::QueryType type() const;
+ inline void set_type(::exec::shared::QueryType value);
// optional string plan = 3;
inline bool has_plan() const;
@@ -1456,7 +1436,7 @@ inline void RunQuery::set_results_mode(::exec::user::QueryResultsMode value) {
results_mode_ = value;
}
-// optional .exec.user.QueryType type = 2;
+// optional .exec.shared.QueryType type = 2;
inline bool RunQuery::has_type() const {
return (_has_bits_[0] & 0x00000002u) != 0;
}
@@ -1470,11 +1450,11 @@ inline void RunQuery::clear_type() {
type_ = 1;
clear_has_type();
}
-inline ::exec::user::QueryType RunQuery::type() const {
- return static_cast< ::exec::user::QueryType >(type_);
+inline ::exec::shared::QueryType RunQuery::type() const {
+ return static_cast< ::exec::shared::QueryType >(type_);
}
-inline void RunQuery::set_type(::exec::user::QueryType value) {
- assert(::exec::user::QueryType_IsValid(value));
+inline void RunQuery::set_type(::exec::shared::QueryType value) {
+ assert(::exec::shared::QueryType_IsValid(value));
set_has_type();
type_ = value;
}
@@ -1927,10 +1907,6 @@ inline const EnumDescriptor* GetEnumDescriptor< ::exec::user::RpcType>() {
return ::exec::user::RpcType_descriptor();
}
template <>
-inline const EnumDescriptor* GetEnumDescriptor< ::exec::user::QueryType>() {
- return ::exec::user::QueryType_descriptor();
-}
-template <>
inline const EnumDescriptor* GetEnumDescriptor< ::exec::user::QueryResultsMode>() {
return ::exec::user::QueryResultsMode_descriptor();
}
diff --git a/contrib/native/client/src/include/drill/recordBatch.hpp b/contrib/native/client/src/include/drill/recordBatch.hpp
index c40327bfd..4ed1e3139 100644
--- a/contrib/native/client/src/include/drill/recordBatch.hpp
+++ b/contrib/native/client/src/include/drill/recordBatch.hpp
@@ -21,6 +21,7 @@
#include <assert.h>
+#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <ostream>
@@ -45,7 +46,7 @@
#if __GNUC__ >= 4
#define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default")))
#else
- #define DECLSPEC_DRILL_CLIENT
+ #define DECLSPEC_DRILL_CLIENT
#endif
#endif
@@ -55,7 +56,7 @@ namespace Drill {
class FieldBatch;
class ValueVectorBase;
-//TODO: The base classes for value vectors should have abstract functions instead of implementations
+//TODO: The base classes for value vectors should have abstract functions instead of implementations
//that return 'NOT IMPLEMENTED YET'
// A Read Only Sliced byte buffer
@@ -111,7 +112,7 @@ class SlicedByteBuf{
ByteBuf_t getSliceStart(){ return this->m_buffer+this->m_start;}
// accessor functions
- //
+ //
// TYPE getTYPE(size_t index){
// if(index>=m_length) return 0;
// return (TYPE) m_buffer[offset+index];
@@ -122,8 +123,8 @@ class SlicedByteBuf{
// Type T can only be an integer type
// Type T cannot be a struct of fixed size
// Because struct alignment is compiler dependent
- // we can end up with a struct size that is larger
- // than the buffer in the sliced buf.
+ // we can end up with a struct size that is larger
+ // than the buffer in the sliced buf.
assert((index + sizeof(T) <= this->m_length));
if(index + sizeof(T) <= this->m_length)
return *((T*)(this->m_buffer+this->m_start+index));
@@ -145,7 +146,7 @@ class SlicedByteBuf{
ByteBuf_t getAt(size_t index){
return this->m_buffer+m_start+index;
- }
+ }
bool getBit(size_t index){
// refer to BitVector.java http://bit.ly/Py1jof
@@ -202,7 +203,7 @@ class DECLSPEC_DRILL_CLIENT ValueVectorUnimplemented:public ValueVectorBase{
const char* get(size_t index) const { return 0;};
virtual void getValueAt(size_t index, char* buf, size_t nChars) const{
*buf=0; return;
- }
+ }
virtual uint32_t getSize(size_t index) const{ return 0;};
@@ -284,7 +285,7 @@ class DECLSPEC_DRILL_CLIENT ValueVectorBit:public ValueVectorFixedWidth{
template <int DECIMAL_DIGITS, int WIDTH_IN_BYTES, bool IS_SPARSE, int MAX_PRECISION = 0 >
class ValueVectorDecimal: public ValueVectorFixedWidth {
public:
- ValueVectorDecimal(SlicedByteBuf* b, size_t rowCount, int32_t scale):
+ ValueVectorDecimal(SlicedByteBuf* b, size_t rowCount, int32_t scale):
ValueVectorFixedWidth(b, rowCount),
m_scale(scale)
{
@@ -319,7 +320,7 @@ template <int DECIMAL_DIGITS, int WIDTH_IN_BYTES, bool IS_SPARSE, int MAX_PRECIS
template<typename VALUE_TYPE>
class ValueVectorDecimalTrivial: public ValueVectorFixedWidth {
public:
- ValueVectorDecimalTrivial(SlicedByteBuf* b, size_t rowCount, int32_t scale):
+ ValueVectorDecimalTrivial(SlicedByteBuf* b, size_t rowCount, int32_t scale):
ValueVectorFixedWidth(b, rowCount),
m_scale(scale)
{
@@ -329,7 +330,7 @@ template<typename VALUE_TYPE>
DecimalValue get(size_t index) const {
return DecimalValue(
m_pBuffer->readAt<VALUE_TYPE>(index * sizeof(VALUE_TYPE)),
- m_scale);
+ m_scale);
}
void getValueAt(size_t index, char* buf, size_t nChars) const {
@@ -355,7 +356,7 @@ template <typename VALUE_TYPE>
{
public:
NullableValueVectorFixed(SlicedByteBuf *b, size_t rowCount):ValueVectorBase(b, rowCount){
- size_t offsetEnd = rowCount/8 + 1;
+ size_t offsetEnd = (size_t)ceil(rowCount/8.0);
this->m_pBitmap= new SlicedByteBuf(*b, 0, offsetEnd);
this->m_pData= new SlicedByteBuf(*b, offsetEnd, b->getLength());
// TODO: testing boundary case(null columns)
@@ -372,7 +373,7 @@ template <typename VALUE_TYPE>
}
VALUE_TYPE get(size_t index) const {
- // it should not be called if the value is null
+ // it should not be called if the value is null
assert( "value is null" && !isNull(index));
return m_pData->readAt<VALUE_TYPE>(index * sizeof(VALUE_TYPE));
}
@@ -390,14 +391,14 @@ template <typename VALUE_TYPE>
return sizeof(VALUE_TYPE);
}
private:
- SlicedByteBuf* m_pBitmap;
+ SlicedByteBuf* m_pBitmap;
SlicedByteBuf* m_pData;
};
// The 'holder' classes are (by contract) simple structs with primitive members and no dynamic allocations.
-// The template classes create an instance of the class and return it to the caller in the 'get' routines.
-// The compiler will create a copy and return it to the caller. If the object is more complex than a struct of
-// primitives, the class _must_ provide a copy constructor.
+// The template classes create an instance of the class and return it to the caller in the 'get' routines.
+// The compiler will create a copy and return it to the caller. If the object is more complex than a struct of
+// primitives, the class _must_ provide a copy constructor.
// We don't really need a destructor here, but we declare a virtual dtor in the base class in case we ever get
// more complex and start doing dynamic allocations in these classes.
@@ -490,11 +491,11 @@ struct IntervalHolder{
};
/*
- * VALUEHOLDER_CLASS_TYPE is a struct with a constructor that takes a parameter of type VALUE_VECTOR_TYPE
+ * VALUEHOLDER_CLASS_TYPE is a struct with a constructor that takes a parameter of type VALUE_VECTOR_TYPE
* (a primitive type)
* VALUEHOLDER_CLASS_TYPE implements a toString function
- * Note that VALUEHOLDER_CLASS_TYPE is created on the stack and the copy returned in the get function.
- * So the class needs to have the appropriate copy constructor or the default bitwise copy should work
+ * Note that VALUEHOLDER_CLASS_TYPE is created on the stack and the copy returned in the get function.
+ * So the class needs to have the appropriate copy constructor or the default bitwise copy should work
* correctly.
*/
template <class VALUEHOLDER_CLASS_TYPE, typename VALUE_TYPE>
@@ -552,7 +553,7 @@ template <class VALUEHOLDER_CLASS_TYPE, class VALUE_VECTOR_TYPE>
public:
NullableValueVectorTyped(SlicedByteBuf *b, size_t rowCount):ValueVectorBase(b, rowCount){
- size_t offsetEnd = rowCount/8 + 1;
+ size_t offsetEnd = (size_t)ceil(rowCount/8.0);
this->m_pBitmap= new SlicedByteBuf(*b, 0, offsetEnd);
this->m_pData= new SlicedByteBuf(*b, offsetEnd, b->getLength()-offsetEnd);
this->m_pVector= new VALUE_VECTOR_TYPE(m_pData, rowCount);
@@ -575,7 +576,7 @@ template <class VALUEHOLDER_CLASS_TYPE, class VALUE_VECTOR_TYPE>
void getValueAt(size_t index, char* buf, size_t nChars) const{
std::stringstream sstr;
- if(this->isNull(index)){
+ if(this->isNull(index)){
sstr<<"NULL";
strncpy(buf, sstr.str().c_str(), nChars);
}else{
@@ -589,7 +590,7 @@ template <class VALUEHOLDER_CLASS_TYPE, class VALUE_VECTOR_TYPE>
}
private:
- SlicedByteBuf* m_pBitmap;
+ SlicedByteBuf* m_pBitmap;
SlicedByteBuf* m_pData;
VALUE_VECTOR_TYPE* m_pVector;
};
@@ -617,10 +618,10 @@ class DECLSPEC_DRILL_CLIENT ValueVectorVarWidth:public ValueVectorBase{
size_t endIdx = this->m_pOffsetArray->getUint32((index+1)*sizeof(uint32_t));
size_t length = endIdx - startIdx;
assert(length >= 0);
- // Return an object created on the stack. The compiler will return a
- // copy and destroy the stack object. The optimizer will hopefully
+ // Return an object created on the stack. The compiler will return a
+ // copy and destroy the stack object. The optimizer will hopefully
// elide this so we can return an object with no extra memory allocation
- // and no copies.(SEE: http://en.wikipedia.org/wiki/Return_value_optimization)
+ // and no copies.(SEE: http://en.wikipedia.org/wiki/Return_value_optimization)
VarWidthHolder dst;
dst.data=this->m_pData->getSliceStart()+startIdx;
dst.size=length;
@@ -673,9 +674,9 @@ class DECLSPEC_DRILL_CLIENT ValueVectorVarBinary:public ValueVectorVarWidth{
}
};
//
-//TODO: For windows, we have to export instantiations of the template class.
+//TODO: For windows, we have to export instantiations of the template class.
//see: http://msdn.microsoft.com/en-us/library/twa2aw10.aspx
-//for example:
+//for example:
//template class __declspec(dllexport) B<int>;
//class __declspec(dllexport) D : public B<int> { }
//
@@ -686,7 +687,7 @@ typedef NullableValueVectorTyped<int, ValueVectorBit > NullableValueVectorBit;
// Aliases for Decimal Types:
// The definitions for decimal digits, width, max precision are defined in
// /exec/java-exec/src/main/codegen/data/ValueVectorTypes.tdd
-//
+//
// Decimal9 and Decimal18 could be optimized, maybe write seperate classes?
typedef ValueVectorDecimalTrivial<int32_t> ValueVectorDecimal9;
typedef ValueVectorDecimalTrivial<int64_t> ValueVectorDecimal18;
@@ -778,7 +779,7 @@ class FieldBatch{
ret_t load();
const ValueVectorBase * getVector(){
- return m_pValueVector;
+ return m_pValueVector;
}
private:
@@ -795,33 +796,27 @@ class ValueVectorFactory{
class DECLSPEC_DRILL_CLIENT RecordBatch{
public:
- RecordBatch(exec::user::QueryResult* pResult, ByteBuf_t b){
- m_pQueryResult=pResult;
+
+ //m_allocatedBuffer is the memory block allocated to hold the incoming RPC message. Record BAtches operate on
+ //slices of the allcoated buffer. The first slice (the first Field Batch), begins at m_buffer. Data in the
+ //allocated buffer before m_buffer is mostly the RPC header, and the QueryResult object.
+ RecordBatch(exec::user::QueryResult* pResult, ByteBuf_t r, ByteBuf_t b)
+ :m_fieldDefs(new(std::vector<Drill::FieldMetadata*>)){
+ m_pQueryResult=pResult;
m_pRecordBatchDef=&pResult->def();
m_numRecords=pResult->row_count();
+ m_allocatedBuffer=r;
m_buffer=b;
m_numFields=pResult->def().field_size();
m_bHasSchemaChanged=false;
}
- ~RecordBatch(){
- m_buffer=NULL;
- //free memory allocated for FieldBatch objects saved in m_fields;
- for(std::vector<FieldBatch*>::iterator it = m_fields.begin(); it != m_fields.end(); ++it){
- delete *it;
- }
- m_fields.clear();
- for(std::vector<Drill::FieldMetadata*>::iterator it = m_fieldDefs.begin(); it != m_fieldDefs.end(); ++it){
- delete *it;
- }
- m_fieldDefs.clear();
- delete m_pQueryResult;
- }
+ ~RecordBatch();
// get the ith field metadata
const Drill::FieldMetadata& getFieldMetadata(size_t index){
//return this->m_pRecordBatchDef->field(index);
- return *(m_fieldDefs[index]);
+ return *(m_fieldDefs->at(index));
}
size_t getNumRecords(){ return m_numRecords;}
@@ -829,13 +824,13 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{
size_t getNumFields() { return m_pRecordBatchDef->field_size(); }
bool isLastChunk() { return m_pQueryResult->is_last_chunk(); }
- std::vector<Drill::FieldMetadata*>& getColumnDefs(){ return m_fieldDefs;}
+ boost::shared_ptr<std::vector<Drill::FieldMetadata*> > getColumnDefs(){ return m_fieldDefs;}
- //
+ //
// build the record batch: i.e. fill up the value vectors from the buffer.
- // On fetching the data from the server, the caller creates a RecordBatch
- // object then calls build() to build the value vectors.The caller saves the
- // Record Batch and is responsible for freeing both the RecordBatch and the
+ // On fetching the data from the server, the caller creates a RecordBatch
+ // object then calls build() to build the value vectors.The caller saves the
+ // Record Batch and is responsible for freeing both the RecordBatch and the
// raw buffer memory
//
ret_t build();
@@ -843,7 +838,7 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{
void print(std::ostream& s, size_t num);
const ValueVectorBase * getVector(size_t index){
- return m_fields[index]->getVector();
+ return m_fields[index]->getVector();
}
void schemaChanged(bool b){
@@ -858,9 +853,10 @@ class DECLSPEC_DRILL_CLIENT RecordBatch{
private:
const exec::user::QueryResult* m_pQueryResult;
const exec::shared::RecordBatchDef* m_pRecordBatchDef;
+ ByteBuf_t m_allocatedBuffer;
ByteBuf_t m_buffer;
//build the current schema out of the field metadata
- std::vector<Drill::FieldMetadata*> m_fieldDefs;
+ FieldDefPtr m_fieldDefs;
std::vector<FieldBatch*> m_fields;
size_t m_numFields;
size_t m_numRecords;