aboutsummaryrefslogtreecommitdiff
path: root/contrib/native/client/src/clientlib/rpcDecoder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/native/client/src/clientlib/rpcDecoder.cpp')
-rw-r--r--contrib/native/client/src/clientlib/rpcDecoder.cpp153
1 files changed, 0 insertions, 153 deletions
diff --git a/contrib/native/client/src/clientlib/rpcDecoder.cpp b/contrib/native/client/src/clientlib/rpcDecoder.cpp
deleted file mode 100644
index d3cf50cb0..000000000
--- a/contrib/native/client/src/clientlib/rpcDecoder.cpp
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#include <iostream>
-#include <google/protobuf/io/coded_stream.h>
-#include "drill/common.hpp"
-#include "rpcEncoder.hpp"
-#include "rpcDecoder.hpp"
-#include "rpcMessage.hpp"
-
-namespace Drill{
-
-// return the number of bytes we have read
-int RpcDecoder::LengthDecode(const uint8_t* buf, uint32_t* p_length) {
-
- using google::protobuf::io::CodedInputStream;
-
- // read the frame to get the length of the message and then
-
- CodedInputStream* cis = new CodedInputStream(buf, 5); // read 5 bytes at most
-
- int pos0 = cis->CurrentPosition(); // for debugging
- cis->ReadVarint32(p_length);
-
- #ifdef CODER_DEBUG
- cerr << "p_length = " << *p_length << endl;
- #endif
-
- int pos1 = cis->CurrentPosition();
-
- #ifdef CODER_DEBUG
- cerr << "Reading full length " << *p_length << endl;
- #endif
- assert( (pos1-pos0) == getRawVarintSize(*p_length));
- delete cis;
- return (pos1-pos0);
-}
-
-// TODO: error handling
-//
-// - assume that the entire message is in the buffer and the buffer is constrained to this message
-// - easy to handle with raw arry in C++
-int RpcDecoder::Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
- using google::protobuf::io::CodedInputStream;
-
- // if(!ctx.channel().isOpen()){ return; }
-
- #ifdef EXTRA_DEBUGGING
- std::cerr << "\nInbound rpc message received." << std::endl;
- #endif
-
- CodedInputStream* cis = new CodedInputStream(buf, length);
-
-
- int pos0 = cis->CurrentPosition(); // for debugging
-
- int len_limit = cis->PushLimit(length);
-
- uint32_t header_length = 0;
- cis->ExpectTag(RpcEncoder::HEADER_TAG);
- cis->ReadVarint32(&header_length);
-
- #ifdef CODER_DEBUG
- cerr << "Reading header length " << header_length << ", post read index " << cis->CurrentPosition() << endl;
- #endif
-
- exec::rpc::RpcHeader header;
- int header_limit = cis->PushLimit(header_length);
- header.ParseFromCodedStream(cis);
- cis->PopLimit(header_limit);
- msg.m_has_mode = header.has_mode();
- msg.m_mode = header.mode();
- msg.m_coord_id = header.coordination_id();
- msg.m_has_rpc_type = header.has_rpc_type();
- msg.m_rpc_type = header.rpc_type();
-
- //if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex());
-
- // read the protobuf body into a buffer.
- cis->ExpectTag(RpcEncoder::PROTOBUF_BODY_TAG);
- uint32_t p_body_length = 0;
- cis->ReadVarint32(&p_body_length);
-
- #ifdef CODER_DEBUG
- cerr << "Reading protobuf body length " << p_body_length << ", post read index " << cis->CurrentPosition() << endl;
- #endif
-
- msg.m_pbody.resize(p_body_length);
- cis->ReadRaw(msg.m_pbody.data(),p_body_length);
-
-
- // read the data body.
- if (cis->BytesUntilLimit() > 0 ) {
- #ifdef CODER_DEBUG
- cerr << "Reading raw body, buffer has "<< cis->BytesUntilLimit() << " bytes available, current possion "<< cis->CurrentPosition() << endl;
- #endif
- cis->ExpectTag(RpcEncoder::RAW_BODY_TAG);
- uint32_t d_body_length = 0;
- cis->ReadVarint32(&d_body_length);
-
- if(cis->BytesUntilLimit() != d_body_length) {
- #ifdef CODER_DEBUG
- cerr << "Expected to receive a raw body of " << d_body_length << " bytes but received a buffer with " <<cis->BytesUntilLimit() << " bytes." << endl;
- #endif
- }
- //msg.m_dbody.resize(d_body_length);
- //cis->ReadRaw(msg.m_dbody.data(), d_body_length);
- uint32_t currPos=cis->CurrentPosition();
- cis->GetDirectBufferPointer((const void**)&msg.m_dbody, (int*)&d_body_length);
- assert(msg.m_dbody==buf+currPos);
- cis->Skip(d_body_length);
- #ifdef CODER_DEBUG
- cerr << "Read raw body of " << d_body_length << " bytes" << endl;
- #endif
- } else {
- #ifdef CODER_DEBUG
- cerr << "No need to read raw body, no readable bytes left." << endl;
- #endif
- }
- cis->PopLimit(len_limit);
-
-
- // return the rpc message.
- // move the reader index forward so the next rpc call won't try to work with it.
- // buffer.skipBytes(dBodyLength);
- // messageCounter.incrementAndGet();
- #ifdef CODER_DEBUG
- cerr << "Inbound Rpc Message Decoded " << msg << endl;
- #endif
-
- int pos1 = cis->CurrentPosition();
- assert((pos1-pos0) == length);
- delete cis;
- return (pos1-pos0);
-}
-
-}//namespace Drill