diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 384e7606f9..73d642d677 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -164,7 +164,7 @@ private: int64_t _packet_seq; // we're accumulating rows into this batch - std::unique_ptr _batch; + boost::scoped_ptr _batch; bool _need_close; int _be_number; @@ -228,9 +228,6 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { _brpc_request.set_eos(eos); if (batch != nullptr) { - butil::IOBuf& io_buf = _closure->cntl.request_attachment(); - io_buf.append(batch->tuple_data()); - batch->set_tuple_data(""); // to padding the required tuple_data field in PB _brpc_request.set_allocated_row_batch(batch); } _brpc_request.set_packet_seq(_packet_seq++); @@ -273,7 +270,12 @@ Status DataStreamSender::Channel::add_row(TupleRow* row) { } Status DataStreamSender::Channel::send_current_batch(bool eos) { - _parent->serialize_batch(_batch.get(), &_pb_batch, 1); + { + SCOPED_TIMER(_parent->_serialize_batch_timer); + int uncompressed_bytes = _batch->serialize(&_pb_batch); + COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch)); + COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes); + } _batch->reset(); RETURN_IF_ERROR(send_batch(&_pb_batch, eos)); return Status::OK(); diff --git a/be/src/service/brpc.h b/be/src/service/brpc.h index c9325d31e8..d3fa30f481 100644 --- a/be/src/service/brpc.h +++ b/be/src/service/brpc.h @@ -56,4 +56,3 @@ #include #include #include -#include diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 07372ba01e..68dfdfd638 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -49,13 +49,6 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cnt google::protobuf::Closure* done) { VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " node=" << request->node_id(); - brpc::Controller* cntl = static_cast(cntl_base); - if (cntl->request_attachment().size() > 0) { - PRowBatch* batch = (const_cast(request))->mutable_row_batch(); - butil::IOBuf& io_buf = cntl->request_attachment(); - std::string* tuple_data = batch->mutable_tuple_data(); - io_buf.copy_to(tuple_data); - } _exec_env->stream_mgr()->transmit_data(request, &done); if (done != nullptr) { done->Run();