This reverts commit 120f30bcaec5ba8318ba1849b513b5d06d8df281.
This commit is contained in:
@ -164,7 +164,7 @@ private:
|
||||
int64_t _packet_seq;
|
||||
|
||||
// we're accumulating rows into this batch
|
||||
std::unique_ptr<RowBatch> _batch;
|
||||
boost::scoped_ptr<RowBatch> _batch;
|
||||
|
||||
bool _need_close;
|
||||
int _be_number;
|
||||
@ -228,9 +228,6 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
|
||||
|
||||
_brpc_request.set_eos(eos);
|
||||
if (batch != nullptr) {
|
||||
butil::IOBuf& io_buf = _closure->cntl.request_attachment();
|
||||
io_buf.append(batch->tuple_data());
|
||||
batch->set_tuple_data(""); // to padding the required tuple_data field in PB
|
||||
_brpc_request.set_allocated_row_batch(batch);
|
||||
}
|
||||
_brpc_request.set_packet_seq(_packet_seq++);
|
||||
@ -273,7 +270,12 @@ Status DataStreamSender::Channel::add_row(TupleRow* row) {
|
||||
}
|
||||
|
||||
Status DataStreamSender::Channel::send_current_batch(bool eos) {
|
||||
_parent->serialize_batch(_batch.get(), &_pb_batch, 1);
|
||||
{
|
||||
SCOPED_TIMER(_parent->_serialize_batch_timer);
|
||||
int uncompressed_bytes = _batch->serialize(&_pb_batch);
|
||||
COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch));
|
||||
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes);
|
||||
}
|
||||
_batch->reset();
|
||||
RETURN_IF_ERROR(send_batch(&_pb_batch, eos));
|
||||
return Status::OK();
|
||||
|
||||
@ -56,4 +56,3 @@
|
||||
#include <brpc/closure_guard.h>
|
||||
#include <brpc/reloadable_flags.h>
|
||||
#include <brpc/protocol.h>
|
||||
#include <butil/strings/string_piece.h>
|
||||
|
||||
@ -49,13 +49,6 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
|
||||
google::protobuf::Closure* done) {
|
||||
VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
|
||||
<< " node=" << request->node_id();
|
||||
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
|
||||
if (cntl->request_attachment().size() > 0) {
|
||||
PRowBatch* batch = (const_cast<PTransmitDataParams*>(request))->mutable_row_batch();
|
||||
butil::IOBuf& io_buf = cntl->request_attachment();
|
||||
std::string* tuple_data = batch->mutable_tuple_data();
|
||||
io_buf.copy_to(tuple_data);
|
||||
}
|
||||
_exec_env->stream_mgr()->transmit_data(request, &done);
|
||||
if (done != nullptr) {
|
||||
done->Run();
|
||||
|
||||
Reference in New Issue
Block a user