[fix](brpc-attachment) Fix bug that may cause BE crash when transfer_data_by_brpc_attachment is enabled (#7921)

This PR mainly includes the following changes:

1. Fix a bug when `transfer_data_by_brpc_attachment` is enabled

    In `data_stream_sender`, one serialized `PRowBatch` is sent to multiple Channels.
    When `transfer_data_by_brpc_attachment` is enabled, the data in the `PRowBatch` is mistakenly
    cleared after it is sent to the first Channel, so the remaining Channels cannot receive the
    correct data and the query fails.

    So I use a separate buffer, instead of `tuple_data` in `PRowBatch`, to store the serialized
    data and reuse it across all channels (see the first sketch after this list).

2. Fix a bug where the offset in a serialized row batch may overflow

    Use int64 offsets instead of int32. For compatibility, a new field `new_tuple_offsets` is
    added to `PRowBatch` (see the second sketch after this list).
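
A minimal sketch of the fixed broadcast path for change 1, condensed from the `DataStreamSender::send()` hunk further down (the names are taken from the diff; this is only the non-local broadcast branch, not the full function):

```cpp
// With transfer_data_by_brpc_attachment enabled, serialize_batch() writes the tuple
// data into the sender-owned _tuple_data_buffer (via _tuple_data_buffer_ptr) instead
// of PRowBatch.tuple_data, so every remote channel can copy the same buffer into its
// own brpc attachment without corrupting the shared request.
RETURN_IF_ERROR(serialize_batch(batch, _cur_pb_batch, _channels.size()));
for (auto channel : _channels) {
    if (channel->is_local()) {
        RETURN_IF_ERROR(channel->send_local_batch(batch, false));
    } else {
        // copies _tuple_data_buffer into this channel's RPC attachment
        RETURN_IF_ERROR(channel->send_batch(_cur_pb_batch));
    }
}
_roll_pb_batch(); // alternate _pb_batch1/_pb_batch2 for the next serialization
```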
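
For change 2, the receiver-side compatibility logic mirrors the `RowBatch` constructor change in the diff below: the new int64 offsets are preferred, and the legacy int32 field is only read when the batch comes from an older sender.

```cpp
// convert the serialized offsets back into tuple pointers
int tuple_idx = 0;
if (input_batch.new_tuple_offsets_size() > 0) {
    // new senders fill new_tuple_offsets (int64), safe for batches larger than 2GB
    for (int64_t offset : input_batch.new_tuple_offsets()) {
        _tuple_ptrs[tuple_idx++] =
                (offset == -1) ? nullptr : convert_to<Tuple*>(tuple_data + offset);
    }
} else {
    // legacy senders only fill tuple_offsets (int32), which may have overflowed
    for (int32_t offset : input_batch.tuple_offsets()) {
        _tuple_ptrs[tuple_idx++] =
                (offset == -1) ? nullptr : convert_to<Tuple*>(tuple_data + offset);
    }
}
```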
Author: Mingyu Chen
Date: 2022-02-01 08:51:16 +08:00
Committed by: GitHub
Parent: 58ad8b7ec9
Commit: 82f421a019
18 changed files with 206 additions and 343 deletions


@ -44,7 +44,12 @@ namespace stream_load {
NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id,
int32_t schema_hash)
: _parent(parent), _index_channel(index_channel), _node_id(node_id), _schema_hash(schema_hash) {}
: _parent(parent), _index_channel(index_channel), _node_id(node_id), _schema_hash(schema_hash) {
if (_parent->_transfer_data_by_brpc_attachment) {
_tuple_data_buffer_ptr = &_tuple_data_buffer;
}
}
NodeChannel::~NodeChannel() {
if (_open_closure != nullptr) {
@ -447,10 +452,16 @@ void NodeChannel::try_send_batch() {
request.set_packet_seq(_next_packet_seq);
if (row_batch->num_rows() > 0) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
row_batch->serialize(request.mutable_row_batch());
if (request.row_batch().ByteSizeLong() >= double(config::brpc_max_body_size) * 0.95f) {
size_t uncompressed_bytes = 0, compressed_bytes = 0;
Status st = row_batch->serialize(request.mutable_row_batch(), &uncompressed_bytes, &compressed_bytes, _tuple_data_buffer_ptr);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
return;
}
if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
LOG(WARNING) << "send batch too large, this rpc may failed. send size: "
<< request.row_batch().ByteSizeLong() << ", " << channel_info();
<< compressed_bytes << ", threshold: " << config::brpc_max_body_size
<< ", " << channel_info();
}
}
@ -459,6 +470,7 @@ void NodeChannel::try_send_batch() {
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
cancel(fmt::format("{}, err: timeout", channel_info()));
return;
} else {
remain_ms = config::min_load_rpc_timeout_ms;
}
@ -479,9 +491,11 @@ void NodeChannel::try_send_batch() {
DCHECK(_pending_batches_num == 0);
}
request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
ReusableClosure<PTabletWriterAddBatchResult>>(
&request, _add_batch_closure);
if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) {
request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
ReusableClosure<PTabletWriterAddBatchResult>>(
&request, _tuple_data_buffer, _add_batch_closure);
}
_add_batch_closure->set_in_flight();
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, &_add_batch_closure->result,
_add_batch_closure);
@ -625,6 +639,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
*status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs);
}
_name = "OlapTableSink";
_transfer_data_by_brpc_attachment = config::transfer_data_by_brpc_attachment;
}
OlapTableSink::~OlapTableSink() {


@ -265,6 +265,15 @@ private:
std::atomic<int64_t> _mem_exceeded_block_ns {0};
std::atomic<int64_t> _queue_push_lock_ns {0};
std::atomic<int64_t> _actual_consume_ns {0};
// buffer for saving serialized row batch data.
// In the non-attachment approach, we need to use two PRowBatch structures alternately
// so that when one PRowBatch is sent, the other PRowBatch can be used for the serialization of the next RowBatch.
// This is not necessary with the attachment approach, because the memory structures
// are already copied into attachment memory before sending, and will wait for
// the previous RPC to be fully completed before the next copy.
std::string _tuple_data_buffer;
std::string* _tuple_data_buffer_ptr = nullptr;
};
class IndexChannel {
@ -448,6 +457,9 @@ protected:
bool _is_closed = false;
// Save the status of close() method
Status _close_status;
// TODO(cmy): this should be removed after we switch to rpc attachment by default.
bool _transfer_data_by_brpc_attachment = false;
};
} // namespace stream_load


@ -62,11 +62,11 @@ DataStreamSender::Channel::Channel(DataStreamSender* parent, const RowDescriptor
_row_desc(row_desc),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
_num_data_bytes_sent(0),
_packet_seq(0),
_need_close(false),
_be_number(0),
_brpc_dest_addr(brpc_dest),
_ch_cur_pb_batch(&_ch_pb_batch1),
_is_transfer_chain(is_transfer_chain),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
std::string localhost = BackendOptions::get_localhost();
@ -146,9 +146,12 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
_closure->ref();
_closure->cntl.set_timeout_ms(_brpc_timeout_ms);
request_row_batch_transfer_attachment<PTransmitDataParams,
RefCountClosure<PTransmitDataResult>>(&_brpc_request,
_closure);
if (_parent->_transfer_data_by_brpc_attachment && _brpc_request.has_row_batch()) {
request_row_batch_transfer_attachment<PTransmitDataParams,
RefCountClosure<PTransmitDataResult>>(&_brpc_request, _parent->_tuple_data_buffer,
_closure);
}
_brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
if (batch != nullptr) {
_brpc_request.release_row_batch();
@ -190,17 +193,17 @@ Status DataStreamSender::Channel::send_current_batch(bool eos) {
if (is_local()) {
return send_local_batch(eos);
}
{
SCOPED_TIMER(_parent->_serialize_batch_timer);
size_t uncompressed_bytes = _batch->serialize(&_pb_batch);
COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch));
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes);
}
RETURN_IF_ERROR(_parent->serialize_batch(_batch.get(), _ch_cur_pb_batch));
_batch->reset();
RETURN_IF_ERROR(send_batch(&_pb_batch, eos));
RETURN_IF_ERROR(send_batch(_ch_cur_pb_batch, eos));
ch_roll_pb_batch();
return Status::OK();
}
void DataStreamSender::Channel::ch_roll_pb_batch() {
_ch_cur_pb_batch = (_ch_cur_pb_batch == &_ch_pb_batch1 ? &_ch_pb_batch2 : &_ch_pb_batch1);
}
Status DataStreamSender::Channel::send_local_batch(bool eos) {
std::shared_ptr<DataStreamRecvr> recvr = _parent->state()->exec_env()->stream_mgr()->find_recvr(
_fragment_instance_id, _dest_node_id);
@ -264,12 +267,18 @@ Status DataStreamSender::Channel::close_wait(RuntimeState* state) {
DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc)
: _row_desc(row_desc),
_current_pb_batch(&_pb_batch1),
_cur_pb_batch(&_pb_batch1),
_pool(pool),
_sender_id(sender_id),
_serialize_batch_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_bytes_send_counter(nullptr) {}
_local_bytes_send_counter(nullptr),
_transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
if (_transfer_data_by_brpc_attachment) {
_tuple_data_buffer_ptr = &_tuple_data_buffer;
}
}
DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
const TDataStreamSink& sink,
@ -278,7 +287,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes
bool send_query_statistics_with_every_batch)
: _row_desc(row_desc),
_profile(nullptr),
_current_pb_batch(&_pb_batch1),
_cur_pb_batch(&_pb_batch1),
_pool(pool),
_sender_id(sender_id),
_serialize_batch_timer(nullptr),
@ -287,7 +296,13 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes
_current_channel_idx(0),
_part_type(sink.output_partition.type),
_ignore_not_found(sink.__isset.ignore_not_found ? sink.ignore_not_found : true),
_dest_node_id(sink.dest_node_id) {
_dest_node_id(sink.dest_node_id),
_transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
if (_transfer_data_by_brpc_attachment) {
_tuple_data_buffer_ptr = &_tuple_data_buffer;
}
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@ -440,15 +455,16 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
RETURN_IF_ERROR(channel->send_local_batch(batch, false));
}
} else {
RETURN_IF_ERROR(serialize_batch(batch, _current_pb_batch, _channels.size()));
RETURN_IF_ERROR(serialize_batch(batch, _cur_pb_batch, _channels.size()));
for (auto channel : _channels) {
if (channel->is_local()) {
RETURN_IF_ERROR(channel->send_local_batch(batch, false));
} else {
RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
RETURN_IF_ERROR(channel->send_batch(_cur_pb_batch));
}
}
_current_pb_batch = (_current_pb_batch == &_pb_batch1 ? &_pb_batch2 : &_pb_batch1);
// rollover
_roll_pb_batch();
}
} else if (_part_type == TPartitionType::RANDOM) {
// Round-robin batches among channels. Wait for the current channel to finish its
@ -457,8 +473,9 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
if (current_channel->is_local()) {
RETURN_IF_ERROR(current_channel->send_local_batch(batch, false));
} else {
RETURN_IF_ERROR(serialize_batch(batch, current_channel->pb_batch()));
RETURN_IF_ERROR(current_channel->send_batch(current_channel->pb_batch()));
RETURN_IF_ERROR(serialize_batch(batch, current_channel->ch_cur_pb_batch()));
RETURN_IF_ERROR(current_channel->send_batch(current_channel->ch_cur_pb_batch()));
current_channel->ch_roll_pb_batch();
}
_current_channel_idx = (_current_channel_idx + 1) % _channels.size();
} else if (_part_type == TPartitionType::HASH_PARTITIONED) {
@ -520,6 +537,10 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
return Status::OK();
}
void DataStreamSender::_roll_pb_batch() {
_cur_pb_batch = (_cur_pb_batch == &_pb_batch1 ? &_pb_batch2 : &_pb_batch1);
}
int DataStreamSender::binary_find_partition(const PartRangeKey& key) const {
int low = 0;
int high = _partition_infos.size() - 1;
@ -643,38 +664,16 @@ Status DataStreamSender::close(RuntimeState* state, Status exec_status) {
return final_st;
}
template <typename T>
Status DataStreamSender::serialize_batch(RowBatch* src, T* dest, int num_receivers) {
VLOG_ROW << "serializing " << src->num_rows() << " rows";
Status DataStreamSender::serialize_batch(RowBatch* src, PRowBatch* dest, int num_receivers) {
{
// TODO(zc)
// SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_serialize_batch_timer);
// TODO(zc)
// RETURN_IF_ERROR(src->serialize(dest));
size_t uncompressed_bytes = src->serialize(dest);
size_t bytes = RowBatch::get_batch_size(*dest);
// TODO(zc)
// int uncompressed_bytes = bytes - dest->tuple_data.size() + dest->uncompressed_size;
// The size output_batch would be if we didn't compress tuple_data (will be equal to
// actual batch size if tuple_data isn't compressed)
COUNTER_UPDATE(_bytes_sent_counter, bytes * num_receivers);
size_t uncompressed_bytes = 0, compressed_bytes = 0;
RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, _tuple_data_buffer_ptr));
COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
}
return Status::OK();
}
int64_t DataStreamSender::get_num_data_bytes_sent() const {
// TODO: do we need synchronization here or are reads & writes to 8-byte ints
// atomic?
int64_t result = 0;
for (int i = 0; i < _channels.size(); ++i) {
result += _channels[i]->num_data_bytes_sent();
}
return result;
}
} // namespace doris


@ -95,13 +95,10 @@ public:
/// Serializes the src batch into the dest thrift batch. Maintains metrics.
/// num_receivers is the number of receivers this batch will be sent to. Only
/// used to maintain metrics.
template <class T>
Status serialize_batch(RowBatch* src, T* dest, int num_receivers = 1);
Status serialize_batch(RowBatch* src, PRowBatch* dest, int num_receivers = 1);
// Return total number of bytes sent in TRowBatch.data. If batches are
// Return total number of bytes sent in RowBatch.data. If batches are
// broadcast to multiple receivers, they are counted once per receiver.
int64_t get_num_data_bytes_sent() const;
virtual RuntimeProfile* profile() { return _profile; }
RuntimeState* state() { return _state; }
@ -112,7 +109,7 @@ protected:
// to a single destination ipaddress/node.
// It has a fixed-capacity buffer and allows the caller either to add rows to
// that buffer individually (AddRow()), or circumvent the buffer altogether and send
// TRowBatches directly (SendBatch()). Either way, there can only be one in-flight RPC
// PRowBatches directly (SendBatch()). Either way, there can only be one in-flight RPC
// at any one time (ie, sending will block if the most recent rpc hasn't finished,
// which allows the receiver node to throttle the sender by withholding acks).
// *Not* thread-safe.
@ -151,9 +148,7 @@ protected:
// Get close wait's response, to finish channel close operation.
Status close_wait(RuntimeState* state);
int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
PRowBatch* pb_batch() { return &_pb_batch; }
PRowBatch* ch_cur_pb_batch() { return _ch_cur_pb_batch; }
std::string get_fragment_instance_id_str() {
UniqueId uid(_fragment_instance_id);
@ -182,6 +177,8 @@ protected:
// Returns send_batch() status.
Status send_current_batch(bool eos = false);
Status close_internal();
// this must be called after calling `send_batch()`
void ch_roll_pb_batch();
DataStreamSender* _parent;
int _buffer_size;
@ -190,8 +187,6 @@ protected:
TUniqueId _fragment_instance_id;
PlanNodeId _dest_node_id;
// the number of TRowBatch.data bytes sent successfully
int64_t _num_data_bytes_sent;
int64_t _packet_seq;
// we're accumulating rows into this batch
@ -204,7 +199,15 @@ protected:
// TODO(zc): initused for brpc
PUniqueId _finst_id;
PRowBatch _pb_batch;
// serialized batches for broadcasting; we need two so we can write
// one while the other one is still being sent.
// This is for the same reason as `_cur_pb_batch`, `_pb_batch1` and `_pb_batch2`
// in DataStreamSender.
PRowBatch* _ch_cur_pb_batch;
PRowBatch _ch_pb_batch1;
PRowBatch _ch_pb_batch2;
PTransmitDataParams _brpc_request;
std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
RefCountClosure<PTransmitDataResult>* _closure = nullptr;
@ -216,7 +219,7 @@ protected:
};
RuntimeProfile* _profile; // Allocated from _pool
PRowBatch* _current_pb_batch;
PRowBatch* _cur_pb_batch;
std::shared_ptr<MemTracker> _mem_tracker;
ObjectPool* _pool;
// Sender instance id, unique within a fragment.
@ -242,6 +245,8 @@ private:
Status process_distribute(RuntimeState* state, TupleRow* row, const PartitionInfo* part,
size_t* hash_val);
void _roll_pb_batch();
int _current_channel_idx; // index of current channel to send to if _random == true
TPartitionType::type _part_type;
@ -252,6 +257,14 @@ private:
PRowBatch _pb_batch1;
PRowBatch _pb_batch2;
// This buffer is used to store the serialized rowbatch data.
// Only works when `config::transfer_data_by_brpc_attachment` is true.
// The data in the buffer is copied to the attachment of the brpc when it is sent,
// to avoid an extra pb serialization in the brpc.
// _tuple_data_buffer_ptr will point to _tuple_data_buffer if `config::transfer_data_by_brpc_attachment` is true.
std::string _tuple_data_buffer;
std::string* _tuple_data_buffer_ptr = nullptr;
std::vector<ExprContext*> _partition_expr_ctxs; // compute per-row partition values
// map from range value to partition_id
@ -265,6 +278,8 @@ private:
// Identifier of the destination plan node.
PlanNodeId _dest_node_id;
bool _transfer_data_by_brpc_attachment = false;
};
} // namespace doris


@ -246,7 +246,7 @@ private:
class TupleDescriptor {
public:
// virtual ~TupleDescriptor() {}
int byte_size() const { return _byte_size; }
int64_t byte_size() const { return _byte_size; }
int num_materialized_slots() const { return _num_materialized_slots; }
int num_null_slots() const { return _num_null_slots; }
int num_null_bytes() const { return _num_null_bytes; }
@ -289,7 +289,7 @@ private:
const TupleId _id;
TableDescriptor* _table_desc;
int _byte_size;
int64_t _byte_size;
int _num_null_slots;
int _num_null_bytes;
int _num_materialized_slots;


@ -48,9 +48,6 @@ class TupleRow;
// used to push data to blocking queue
class MemoryScratchSink : public DataSink {
public:
// construct a buffer for the result need send to blocking queue.
// row_desc used for convert RowBatch to TRowBatch
// buffer_size is the buffer size allocated to each scan
MemoryScratchSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,
const TMemoryScratchSink& sink);


@ -160,9 +160,9 @@ Status ResultFileSink::close(RuntimeState* state, Status exec_status) {
state->fragment_instance_id());
} else {
if (final_status.ok()) {
RETURN_IF_ERROR(serialize_batch(_output_batch, _current_pb_batch, _channels.size()));
RETURN_IF_ERROR(serialize_batch(_output_batch, _cur_pb_batch, _channels.size()));
for (auto channel : _channels) {
RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
RETURN_IF_ERROR(channel->send_batch(_cur_pb_batch));
}
}
Status final_st = Status::OK();


@ -39,9 +39,6 @@ class ResultFileOptions;
class ResultFileSink : public DataStreamSender {
public:
// construct a buffer for the result need send to coordinator.
// row_desc used for convert RowBatch to TRowBatch
// buffer_size is the buffer size allocated to each query
ResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,
const TResultFileSink& sink);
ResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,


@ -42,9 +42,6 @@ class VExprContext;
class ResultSink : public DataSink {
public:
// construct a buffer for the result need send to coordinator.
// row_desc used for convert RowBatch to TRowBatch
// buffer_size is the buffer size allocated to each query
ResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,
const TResultSink& sink, int buffer_size);
virtual ~ResultSink();


@ -118,11 +118,26 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
// convert input_batch.tuple_offsets into pointers
int tuple_idx = 0;
for (auto offset : input_batch.tuple_offsets()) {
if (offset == -1) {
_tuple_ptrs[tuple_idx++] = nullptr;
} else {
_tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
// For historical reasons, the original offset was stored using int32,
// so that if a rowbatch is larger than 2GB, the passed offset may generate an error due to value overflow.
// So in the new version, a new_tuple_offsets structure is added to store offsets using int64.
// Here, to maintain compatibility, both versions of offsets are used, with preference given to new_tuple_offsets.
// TODO(cmy): in the next version, the original tuple_offsets should be removed.
if (input_batch.new_tuple_offsets_size() > 0) {
for (int64_t offset : input_batch.new_tuple_offsets()) {
if (offset == -1) {
_tuple_ptrs[tuple_idx++] = nullptr;
} else {
_tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
}
}
} else {
for (int32_t offset : input_batch.tuple_offsets()) {
if (offset == -1) {
_tuple_ptrs[tuple_idx++] = nullptr;
} else {
_tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
}
}
}
@ -200,138 +215,6 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
}
}
// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
// global runtime memory segment; how do we get thrift to allocate it from there?
// maybe change line (in Data_types.cc generated from Data.thrift)
// xfer += iprot->readString(this->tuple_data[_i9]);
// to allocated string data in special mempool
// (change via python script that runs over Data_types.cc)
RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch, MemTracker* tracker)
: _mem_tracker(tracker),
_has_in_flight_row(false),
_num_rows(input_batch.num_rows),
_num_uncommitted_rows(0),
_capacity(_num_rows),
_flush(FlushMode::NO_FLUSH_RESOURCES),
_needs_deep_copy(false),
_num_tuples_per_row(input_batch.row_tuples.size()),
_row_desc(row_desc),
_auxiliary_mem_usage(0),
_need_to_return(false),
_tuple_data_pool(_mem_tracker) {
DCHECK(_mem_tracker != nullptr);
_tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
_tuple_ptrs = (Tuple**)malloc(_tuple_ptrs_size);
DCHECK(_tuple_ptrs != nullptr);
} else {
_tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
}
char* tuple_data = nullptr;
if (input_batch.is_compressed) {
// Decompress tuple data into data pool
const char* compressed_data = input_batch.tuple_data.c_str();
size_t compressed_size = input_batch.tuple_data.size();
size_t uncompressed_size = 0;
bool success =
snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
DCHECK(success) << "snappy::RawUncompress failed";
} else {
// Tuple data uncompressed, copy directly into data pool
tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data.size());
memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
}
// convert input_batch.tuple_offsets into pointers
int tuple_idx = 0;
for (auto offset : input_batch.tuple_offsets) {
if (offset == -1) {
_tuple_ptrs[tuple_idx++] = nullptr;
} else {
_tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
}
}
// Check whether we have slots that require offset-to-pointer conversion.
if (!_row_desc.has_varlen_slots()) {
return;
}
const auto& tuple_descs = _row_desc.tuple_descriptors();
// For every unique tuple, convert string offsets contained in tuple data into
// pointers. Tuples were serialized in the order we are deserializing them in,
// so the first occurrence of a tuple will always have a higher offset than any tuple
// we already converted.
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
for (size_t j = 0; j < tuple_descs.size(); ++j) {
auto desc = tuple_descs[j];
if (desc->string_slots().empty() && desc->collection_slots().empty()) {
continue;
}
Tuple* tuple = row->get_tuple(j);
if (tuple == nullptr) {
continue;
}
for (auto slot : desc->string_slots()) {
DCHECK(slot->type().is_string_type());
StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
int offset = convert_to<int>(string_val->ptr);
string_val->ptr = tuple_data + offset;
// Why we do this mask? Field len of StringValue is changed from int to size_t in
// Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
// this works fine in version 0.10, however in 0.11 this will lead to an invalid
// length. So we make the high bits zero here.
string_val->len &= 0x7FFFFFFFL;
}
// copy collection slot
for (auto slot_collection : desc->collection_slots()) {
DCHECK(slot_collection->type().is_collection_type());
CollectionValue* array_val =
tuple->get_collection_slot(slot_collection->tuple_offset());
int offset = convert_to<int>(array_val->data());
array_val->set_data(tuple_data + offset);
int null_offset = convert_to<int>(array_val->null_signs());
array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
const TypeDescriptor& item_type = slot_collection->type().children.at(0);
if (!item_type.is_string_type()) {
continue;
}
// copy string item
for (size_t k = 0; k < array_val->length(); ++k) {
if (array_val->is_null_at(k)) {
continue;
}
StringValue* dst_item_v = convert_to<StringValue*>(
(uint8_t*)array_val->data() + k * item_type.get_slot_size());
if (dst_item_v->len != 0) {
int offset = convert_to<int>(dst_item_v->ptr);
dst_item_v->ptr = tuple_data + offset;
}
}
}
}
}
}
void RowBatch::clear() {
if (_cleared) {
return;
@ -364,93 +247,39 @@ RowBatch::~RowBatch() {
clear();
}
size_t RowBatch::serialize(TRowBatch* output_batch) {
// why does Thrift not generate a Clear() function?
output_batch->row_tuples.clear();
output_batch->tuple_offsets.clear();
output_batch->is_compressed = false;
output_batch->num_rows = _num_rows;
_row_desc.to_thrift(&output_batch->row_tuples);
output_batch->tuple_offsets.reserve(_num_rows * _num_tuples_per_row);
size_t size = total_byte_size();
output_batch->tuple_data.resize(size);
// Copy tuple data, including strings, into output_batch (converting string
// pointers into offsets in the process)
int offset = 0; // current offset into output_batch->tuple_data
char* tuple_data = output_batch->tuple_data.data();
const auto& tuple_descs = _row_desc.tuple_descriptors();
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
for (size_t j = 0; j < tuple_descs.size(); ++j) {
auto desc = tuple_descs[j];
if (row->get_tuple(j) == nullptr) {
// NULLs are encoded as -1
output_batch->tuple_offsets.push_back(-1);
continue;
}
// Record offset before creating copy (which increments offset and tuple_data)
output_batch->tuple_offsets.push_back(offset);
row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
DCHECK_LE(offset, size);
}
}
DCHECK_EQ(offset, size);
if (config::compress_rowbatches && size > 0) {
// Try compressing tuple_data to _compression_scratch, swap if compressed data is
// smaller
size_t max_compressed_size = snappy::MaxCompressedLength(size);
if (_compression_scratch.size() < max_compressed_size) {
_compression_scratch.resize(max_compressed_size);
}
size_t compressed_size = 0;
char* compressed_output = _compression_scratch.data();
snappy::RawCompress(output_batch->tuple_data.c_str(), size, compressed_output,
&compressed_size);
if (LIKELY(compressed_size < size)) {
_compression_scratch.resize(compressed_size);
output_batch->tuple_data.swap(_compression_scratch);
output_batch->is_compressed = true;
}
VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
}
// The size output_batch would be if we didn't compress tuple_data (will be equal to
// actual batch size if tuple_data isn't compressed)
return get_batch_size(*output_batch) - output_batch->tuple_data.size() + size;
}
size_t RowBatch::serialize(PRowBatch* output_batch) {
Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t* compressed_size,
std::string* allocated_buf) {
// num_rows
output_batch->set_num_rows(_num_rows);
// row_tuples
_row_desc.to_protobuf(output_batch->mutable_row_tuples());
// tuple_offsets: must clear before reserve
output_batch->clear_tuple_offsets();
output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
output_batch->clear_new_tuple_offsets();
output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
// is_compressed
output_batch->set_is_compressed(false);
// tuple data
size_t size = total_byte_size();
auto mutable_tuple_data = output_batch->mutable_tuple_data();
mutable_tuple_data->resize(size);
std::string* mutable_tuple_data = nullptr;
if (allocated_buf != nullptr) {
allocated_buf->resize(size);
// all tuple data will be written in the allocated_buf
// instead of tuple_data in PRowBatch
mutable_tuple_data = allocated_buf;
// tuple_data is a required field
output_batch->set_tuple_data("");
} else {
mutable_tuple_data = output_batch->mutable_tuple_data();
mutable_tuple_data->resize(size);
}
// Copy tuple data, including strings, into output_batch (converting string
// pointers into offsets in the process)
int offset = 0; // current offset into output_batch->tuple_data
int64_t offset = 0; // current offset into output_batch->tuple_data
char* tuple_data = mutable_tuple_data->data();
const auto& tuple_descs = _row_desc.tuple_descriptors();
const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
const auto& mutable_tuple_offsets = output_batch->mutable_new_tuple_offsets();
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
@ -464,11 +293,10 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
// Record offset before creating copy (which increments offset and tuple_data)
mutable_tuple_offsets->Add(offset);
row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
DCHECK_LE(offset, size);
CHECK_LE(offset, size);
}
}
DCHECK_EQ(offset, size);
CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size;
if (config::compress_rowbatches && size > 0) {
// Try compressing tuple_data to _compression_scratch, swap if compressed data is
@ -492,9 +320,21 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
}
// The size output_batch would be if we didn't compress tuple_data (will be equal to
// actual batch size if tuple_data isn't compressed)
return get_batch_size(*output_batch) - mutable_tuple_data->size() + size;
// return compressed and uncompressed size
size_t pb_size = get_batch_size(*output_batch);
if (allocated_buf == nullptr) {
*uncompressed_size = pb_size - mutable_tuple_data->size() + size;
*compressed_size = pb_size;
if (pb_size > std::numeric_limits<int32_t>::max()) {
// the protobuf has a hard limit of 2GB for serialized data.
return Status::InternalError(fmt::format("The rowbatch is larger than 2GB({}), cannot be sent by Protobuf. "
"Please set BE config 'transfer_data_by_brpc_attachment' to true and restart BE.", pb_size));
}
} else {
*uncompressed_size = pb_size + size;
*compressed_size = pb_size + mutable_tuple_data->size();
}
return Status::OK();
}
// when row from files can't fill into tuple with schema limitation, increase the _num_uncommitted_rows in row batch,
@ -676,13 +516,6 @@ vectorized::Block RowBatch::convert_to_vec_block() const {
return {columns_with_type_and_name};
}
size_t RowBatch::get_batch_size(const TRowBatch& batch) {
size_t result = batch.tuple_data.size();
result += batch.row_tuples.size() * sizeof(TTupleId);
result += batch.tuple_offsets.size() * sizeof(int32_t);
return result;
}
size_t RowBatch::get_batch_size(const PRowBatch& batch) {
size_t result = batch.tuple_data().size();
result += batch.row_tuples().size() * sizeof(int32_t);


@ -37,7 +37,6 @@ class Block;
namespace doris {
class BufferedTupleStream2;
class TRowBatch;
class Tuple;
class TupleRow;
class TupleDescriptor;
@ -55,7 +54,7 @@ class PRowBatch;
// the data is in an io buffer that may not be attached to this row batch. The
// creator of that row batch has to make sure that the io buffer is not recycled
// until all batches that reference the memory have been consumed.
// In order to minimize memory allocations, RowBatches and TRowBatches that have been
// In order to minimize memory allocations, RowBatches and PRowBatches that have been
// serialized and sent over the wire should be reused (this prevents _compression_scratch
// from being needlessly reallocated).
//
@ -93,8 +92,6 @@ public:
// in the data back into pointers.
// TODO: figure out how to transfer the data from input_batch to this RowBatch
// (so that we don't need to make yet another copy)
RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch, MemTracker* tracker);
RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch, MemTracker* tracker);
// Releases all resources accumulated at this row batch. This includes
@ -356,11 +353,12 @@ public:
// This function does not reset().
// Returns the uncompressed serialized size (this will be the true size of output_batch
// if tuple_data is actually uncompressed).
size_t serialize(TRowBatch* output_batch);
size_t serialize(PRowBatch* output_batch);
// if allocated_buf is not null, the serialized tuple data will be saved in this buf
// instead of `tuple_data` in PRowBatch.
Status serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t* compressed_size,
std::string* allocated_buf = nullptr);
// Utility function: returns total size of batch.
static size_t get_batch_size(const TRowBatch& batch);
static size_t get_batch_size(const PRowBatch& batch);
vectorized::Block convert_to_vec_block() const;
@ -475,10 +473,10 @@ private:
std::vector<BufferedBlockMgr2::Block*> _blocks;
// String to write compressed tuple data to in serialize().
// This is a string so we can swap() with the string in the TRowBatch we're serializing
// to (we don't compress directly into the TRowBatch in case the compressed data is
// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch and
// avoids excess memory allocations: since we reuse RowBatches and TRowBatchs, and
// This is a string so we can swap() with the string in the PRowBatch we're serializing
// to (we don't compress directly into the PRowBatch in case the compressed data is
// longer than the uncompressed data). Swapping avoids copying data to the PRowBatch and
// avoids excess memory allocations: since we reuse RowBatches and PRowBatchs, and
// assuming all row batches are roughly the same size, all strings will eventually be
// allocated to the right size.
std::string _compression_scratch;


@ -75,7 +75,7 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
StringValue* string_v = dst->get_string_slot(string_slot->tuple_offset());
if (!dst->is_null(string_slot->null_indicator_offset())) {
if (string_v->len != 0) {
int offset = pool->total_allocated_bytes();
int64_t offset = pool->total_allocated_bytes();
char* string_copy = (char*)(pool->allocate(string_v->len));
memory_copy(string_copy, string_v->ptr, string_v->len);
string_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
@ -101,7 +101,7 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
int coll_byte_size = cv->length() * item_type.get_slot_size();
int nulls_size = cv->length() * sizeof(bool);
int offset = pool->total_allocated_bytes();
int64_t offset = pool->total_allocated_bytes();
char* coll_data = (char*)(pool->allocate(coll_byte_size + nulls_size));
// copy data and null_signs
@ -130,7 +130,7 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
}
StringValue* dst_item_v = convert_to<StringValue*>(coll_data + item_offset);
if (dst_item_v->len != 0) {
int offset = pool->total_allocated_bytes();
int64_t offset = pool->total_allocated_bytes();
char* string_copy = (char*)(pool->allocate(dst_item_v->len));
memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len);
dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
@ -181,7 +181,7 @@ int64_t Tuple::release_string(const TupleDescriptor& desc) {
return bytes;
}
void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, bool convert_ptrs) {
void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int64_t* offset, bool convert_ptrs) {
Tuple* dst = (Tuple*)(*data);
memory_copy(dst, this, desc.byte_size());
*data += desc.byte_size();
@ -231,7 +231,7 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
// when item is string type, copy every item
char* base_data = *data;
int base_offset = *offset;
int64_t base_offset = *offset;
*data += coll_byte_size + nulls_size;
*offset += coll_byte_size + nulls_size;


@ -99,8 +99,8 @@ public:
// If 'convert_ptrs' is true, converts pointers that are part of the tuple
// into offsets in data, based on the provided offset. Otherwise they will be
// pointers directly into data.
void deep_copy(const TupleDescriptor& desc, char** data, int* offset, bool convert_ptrs);
void deep_copy(const TupleDescriptor& desc, char** data, int* offset) {
void deep_copy(const TupleDescriptor& desc, char** data, int64_t* offset, bool convert_ptrs);
void deep_copy(const TupleDescriptor& desc, char** data, int64_t* offset) {
deep_copy(desc, data, offset, false);
}


@ -17,35 +17,33 @@
#pragma once
#include "util/stack_util.h"
namespace doris {
// Transfer RowBatch in ProtoBuf Request to Controller Attachment.
// This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
// and it is expected that performance can be improved.
template <typename Params, typename Closure>
inline void request_row_batch_transfer_attachment(Params* brpc_request, Closure* closure) {
if (brpc_request->has_row_batch() && config::transfer_data_by_brpc_attachment == true) {
butil::IOBuf attachment;
auto row_batch = brpc_request->mutable_row_batch();
attachment.append(row_batch->tuple_data());
row_batch->clear_tuple_data();
row_batch->set_tuple_data("");
closure->cntl.request_attachment().swap(attachment);
brpc_request->set_transfer_by_attachment(true);
}
inline void request_row_batch_transfer_attachment(Params* brpc_request, const std::string& tuple_data, Closure* closure) {
auto row_batch = brpc_request->mutable_row_batch();
row_batch->set_tuple_data("");
brpc_request->set_transfer_by_attachment(true);
butil::IOBuf attachment;
attachment.append(tuple_data);
closure->cntl.request_attachment().swap(attachment);
}
// Controller Attachment transferred to RowBatch in ProtoBuf Request.
template <typename Params>
inline void attachment_transfer_request_row_batch(const Params* brpc_request,
brpc::Controller* cntl) {
inline void attachment_transfer_request_row_batch(const Params* brpc_request, brpc::Controller* cntl) {
Params* req = const_cast<Params*>(brpc_request);
if (req->has_row_batch() && req->transfer_by_attachment()) {
auto rb = req->mutable_row_batch();
DCHECK(cntl->request_attachment().size() > 0);
const butil::IOBuf& io_buf = cntl->request_attachment();
CHECK(io_buf.size() > 0) << io_buf.size() << ", row num: " << req->row_batch().num_rows();
io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0);
}
}
} // namespace doris
} // namespace doris


@ -34,9 +34,6 @@ class VExprContext;
class VResultSink : public DataSink {
public:
// construct a buffer for the result need send to coordinator.
// row_desc used for convert RowBatch to TRowBatch
// buffer_size is the buffer size allocated to each query
VResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,
const TResultSink& sink, int buffer_size);


@ -249,7 +249,7 @@ private:
TUniqueId _fragment_instance_id;
PlanNodeId _dest_node_id;
// the number of TRowBatch.data bytes sent successfully
// the number of RowBatch.data bytes sent successfully
int64_t _num_data_bytes_sent;
int64_t _packet_seq;


@ -121,6 +121,9 @@ public:
}
private:
size_t uncompressed_size = 0;
size_t compressed_size = 0;
};
TEST_F(LoadChannelMgrTest, check_builder) {
@ -256,8 +259,7 @@ TEST_F(LoadChannelMgrTest, normal) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
// google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
PTabletWriterAddBatchResult response;
auto st = mgr.add_batch(request, &response);
request.release_id();
@ -423,7 +425,7 @@ TEST_F(LoadChannelMgrTest, add_failed) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
// DeltaWriter's write will return -215
add_status = OLAP_ERR_TABLE_NOT_FOUND;
PTabletWriterAddBatchResult response;
@ -516,7 +518,7 @@ TEST_F(LoadChannelMgrTest, close_failed) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
close_status = OLAP_ERR_TABLE_NOT_FOUND;
PTabletWriterAddBatchResult response;
auto st = mgr.add_batch(request, &response);
@ -605,7 +607,7 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
PTabletWriterAddBatchResult response;
auto st = mgr.add_batch(request, &response);
request.release_id();
@ -691,7 +693,7 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) = 76543234567;
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, &compressed_size);
PTabletWriterAddBatchResult response;
auto st = mgr.add_batch(request, &response);
ASSERT_TRUE(st.ok());


@ -37,9 +37,12 @@ message PQueryStatistics {
message PRowBatch {
required int32 num_rows = 1;
repeated int32 row_tuples = 2;
// Should be deprecated after v1.2.0
repeated int32 tuple_offsets = 3;
required bytes tuple_data = 4;
required bool is_compressed = 5;
// This is used to replace "tuple_offsets"
repeated int64 new_tuple_offsets = 6;
}
message PColumn {