/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include "drill/common.hpp" #include "drill/drillClient.hpp" #include "drill/fieldmeta.hpp" #include "drill/recordBatch.hpp" #include "drill/userProperties.hpp" #include "drillClientImpl.hpp" #include "env.h" #include "errmsgs.hpp" #include "logger.hpp" #include "Types.pb.h" namespace Drill{ DrillClientInitializer::DrillClientInitializer(){ GOOGLE_PROTOBUF_VERIFY_VERSION; srand(time(NULL)); } DrillClientInitializer::~DrillClientInitializer(){ google::protobuf::ShutdownProtobufLibrary(); } RecordIterator::~RecordIterator(){ if(m_pColDefs!=NULL){ for(std::vector::iterator it=m_pColDefs->begin(); it!=m_pColDefs->end(); ++it){ delete *it; } } delete this->m_pQueryResult; this->m_pQueryResult=NULL; if(this->m_pCurrentRecordBatch!=NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl;) delete this->m_pCurrentRecordBatch; this->m_pCurrentRecordBatch=NULL; } } FieldDefPtr RecordIterator::getColDefs(){ if(m_pQueryResult->hasError()){ return DrillClientQueryResult::s_emptyColDefs; } if (this->m_pColDefs != NULL && !this->hasSchemaChanged()) { return this->m_pColDefs; } //NOTE: if query is cancelled, return whatever you have. Client applications job to deal with it. if(this->m_pCurrentRecordBatch==NULL){ this->m_pQueryResult->waitForData(); if(m_pQueryResult->hasError()){ return DrillClientQueryResult::s_emptyColDefs; } } if(this->hasSchemaChanged()){ if(m_pColDefs!=NULL){ for(std::vector::iterator it=m_pColDefs->begin(); it!=m_pColDefs->end(); ++it){ delete *it; } m_pColDefs->clear(); //delete m_pColDefs; m_pColDefs=NULL; } } FieldDefPtr pColDefs( new std::vector); { //lock after we come out of the wait. boost::lock_guard bufferLock(this->m_recordBatchMutex); boost::shared_ptr< std::vector > currentColDefs=DrillClientQueryResult::s_emptyColDefs; if(this->m_pCurrentRecordBatch!=NULL){ currentColDefs=this->m_pCurrentRecordBatch->getColumnDefs(); }else{ // This is reached only when the first results have been received but // the getNext call has not been made to retrieve the record batch RecordBatch* pR=this->m_pQueryResult->peekNext(); if(pR!=NULL){ currentColDefs=pR->getColumnDefs(); } } for(std::vector::const_iterator it=currentColDefs->begin(); it!=currentColDefs->end(); ++it){ Drill::FieldMetadata* fmd= new Drill::FieldMetadata; fmd->copy(*(*it));//Yup, that's 2 stars pColDefs->push_back(fmd); } } this->m_pColDefs = pColDefs; return this->m_pColDefs; } status_t RecordIterator::next(){ status_t ret=QRY_SUCCESS; this->m_currentRecord++; if(this->m_pQueryResult->isCancelled()){ return QRY_CANCEL; } if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){ boost::lock_guard bufferLock(this->m_recordBatchMutex); if(this->m_pCurrentRecordBatch !=NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl;) delete this->m_pCurrentRecordBatch; //free the previous record batch this->m_pCurrentRecordBatch=NULL; } this->m_currentRecord=0; this->m_pQueryResult->waitForData(); if(m_pQueryResult->hasError()){ return m_pQueryResult->getErrorStatus(); } this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext(); if(this->m_pCurrentRecordBatch != NULL){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl;) }else{ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl;) } if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){ DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more data." << std::endl;) ret = QRY_NO_MORE_DATA; }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){ ret=QRY_SUCCESS_WITH_INFO; } } return ret; } /* Gets the ith column in the current record. */ status_t RecordIterator::getCol(size_t i, void** b, size_t* sz){ //TODO: check fields out of bounds without calling getColDefs //if(i>=getColDefs().size()) return QRY_OUT_OF_BOUNDS; //return raw byte buffer if(this->m_pQueryResult->isCancelled()){ return QRY_CANCEL; } const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector(); if(!pVector->isNull(this->m_currentRecord)){ *b=pVector->getRaw(this->m_currentRecord); *sz=pVector->getSize(this->m_currentRecord); }else{ *b=NULL; *sz=0; } return QRY_SUCCESS; } /* true if ith column in the current record is NULL. */ bool RecordIterator::isNull(size_t i){ if(this->m_pQueryResult->isCancelled()){ return false; } const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector(); return pVector->isNull(this->m_currentRecord); } status_t RecordIterator::cancel(){ this->m_pQueryResult->cancel(); return QRY_CANCEL; } bool RecordIterator::hasSchemaChanged(){ return m_currentRecord==0 && m_pCurrentRecordBatch!=NULL && m_pCurrentRecordBatch->hasSchemaChanged(); } void RecordIterator::registerSchemaChangeListener(pfnSchemaListener l){ assert(m_pQueryResult!=NULL); this->m_pQueryResult->registerSchemaChangeListener(l); } bool RecordIterator::hasError(){ return m_pQueryResult->hasError(); } const std::string& RecordIterator::getError(){ return m_pQueryResult->getError()->msg; } DrillClientInitializer DrillClient::s_init; DrillClientConfig DrillClient::s_config; void DrillClient::initLogging(const char* path, logLevel_t l){ if(path!=NULL) s_config.initLogging(path); s_config.setLogLevel(l); } DrillClient::DrillClient(){ const char* enablePooledClient=std::getenv(ENABLE_CONNECTION_POOL_ENV); if(enablePooledClient!=NULL && atoi(enablePooledClient)!=0){ this->m_pImpl=new PooledDrillClientImpl; }else{ this->m_pImpl=new DrillClientImpl; } } DrillClient::~DrillClient(){ delete this->m_pImpl; } connectionStatus_t DrillClient::connect(const char* connectStr, const char* defaultSchema){ DrillUserProperties props; std::string schema(defaultSchema); props.setProperty(USERPROP_SCHEMA, schema); if (defaultSchema != NULL) { return connect(connectStr, static_cast(NULL)); } else { return connect(connectStr, &props); } } connectionStatus_t DrillClient::connect(const char* connectStr, DrillUserProperties* properties){ connectionStatus_t ret=CONN_SUCCESS; ret=this->m_pImpl->connect(connectStr, properties); if(ret==CONN_SUCCESS){ ret=this->m_pImpl->validateHandshake(properties); } return ret; } bool DrillClient::isActive(){ return this->m_pImpl->Active(); } void DrillClient::close() { this->m_pImpl->Close(); } status_t DrillClient::submitQuery(Drill::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle){ ::exec::shared::QueryType castedType = static_cast< ::exec::shared::QueryType> (t); DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, plan, listener, listenerCtx); *qHandle=static_cast(pResult); if(pResult==NULL){ return (status_t)this->m_pImpl->getError()->status; } return QRY_SUCCESS; } RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string& plan, DrillClientError* err){ RecordIterator* pIter=NULL; ::exec::shared::QueryType castedType = static_cast< ::exec::shared::QueryType> (t); DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, plan, NULL, NULL); if(pResult){ pIter=new RecordIterator(pResult); } return pIter; } status_t DrillClient::prepareQuery(const std::string& sql, pfnPreparedStatementListener listener, void* listenerCtx, QueryHandle_t* qHandle) { DrillClientPrepareHandle* pResult=this->m_pImpl->PrepareQuery(sql, listener, listenerCtx); *qHandle=static_cast(pResult); if(pResult==NULL){ return static_cast(this->m_pImpl->getError()->status); } return QRY_SUCCESS; } status_t DrillClient::executeQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle) { DrillClientQueryResult* pResult=this->m_pImpl->ExecuteQuery(pstmt, listener, listenerCtx); *qHandle=static_cast(pResult); if(pResult==NULL){ return static_cast(this->m_pImpl->getError()->status); } return QRY_SUCCESS; } void DrillClient::cancelQuery(QueryHandle_t handle) { if (!handle) { return; } DrillClientQueryHandle* pHandle = static_cast(handle); pHandle->cancel(); } void* DrillClient::getApplicationContext(QueryHandle_t handle){ assert(handle!=NULL); return (static_cast(handle))->getApplicationContext(); } status_t DrillClient::getQueryStatus(QueryHandle_t handle){ assert(handle!=NULL); return static_cast(handle)->getQueryStatus(); } std::string& DrillClient::getError(){ return m_pImpl->getError()->msg; } const std::string& DrillClient::getError(QueryHandle_t handle){ return static_cast(handle)->getError()->msg; } void DrillClient::waitForResults(){ this->m_pImpl->waitForResults(); } void DrillClient::registerSchemaChangeListener(QueryHandle_t* handle, pfnSchemaListener l){ if (!handle) { return; } // Let's ensure that handle is really an instance of DrillClientQueryResult // by using dynamic_cast to verify. Since void is not a class, we first have // to static_cast to a DrillClientQueryHandle DrillClientQueryHandle* pHandle = static_cast(*handle); DrillClientQueryResult* result = dynamic_cast(pHandle); if (result) { result->registerSchemaChangeListener(l); } } void DrillClient::freeQueryResources(QueryHandle_t* handle){ this->m_pImpl->freeQueryResources(static_cast(*handle)); *handle=NULL; } void DrillClient::freeRecordBatch(RecordBatch* pRecordBatch){ delete pRecordBatch; } Metadata* DrillClient::getMetadata() { return this->m_pImpl->getMetadata(); } void DrillClient::freeMetadata(Metadata** metadata) { this->m_pImpl->freeMetadata(static_cast(*metadata)); *metadata = NULL; } } // namespace Drill