[opt](load) optimize the performance of row distribution (#25546)

For non-pipeline non-sinkv2:
before: 14s
now: 6s-
For pipeline + sinkv2:
before: 230ms *48 instances
now: 38ms *48 instances
This commit is contained in:
zclllyybb
2023-11-07 10:04:59 +08:00
committed by GitHub
parent fa7a38b587
commit 16644eff7f
8 changed files with 287 additions and 213 deletions

View File

@ -17,7 +17,6 @@
#include "exec/tablet_info.h"
#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Types_types.h>
@ -26,6 +25,7 @@
#include <stddef.h>
#include <algorithm>
#include <memory>
#include <ostream>
#include <tuple>
@ -324,49 +324,20 @@ Status VOlapTablePartitionParam::init() {
}
}
_partitions_map.reset(
new std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>(
VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs)));
_partitions_map = std::make_unique<
std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>(
VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs));
if (_t_param.__isset.distributed_columns) {
for (auto& col : _t_param.distributed_columns) {
RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed"));
}
}
if (_distributed_slot_locs.empty()) {
_compute_tablet_index = [](BlockRow* key,
const VOlapTablePartition& partition) -> uint32_t {
if (partition.load_tablet_idx == -1) {
// load_to_single_tablet = false, just do random
return butil::fast_rand() % partition.num_buckets;
}
// load_to_single_tablet = ture, do round-robin
return partition.load_tablet_idx % partition.num_buckets;
};
} else {
_compute_tablet_index = [this](BlockRow* key,
const VOlapTablePartition& partition) -> uint32_t {
uint32_t hash_val = 0;
for (int i = 0; i < _distributed_slot_locs.size(); ++i) {
auto slot_desc = _slots[_distributed_slot_locs[i]];
auto& column = key->first->get_by_position(_distributed_slot_locs[i]).column;
auto val = column->get_data_at(key->second);
if (val.data != nullptr) {
hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type,
hash_val);
} else {
hash_val = HashUtil::zlib_crc_hash_null(hash_val);
}
}
return hash_val % partition.num_buckets;
};
}
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
// initial partitions
for (int i = 0; i < _t_param.partitions.size(); ++i) {
const TOlapTablePartition& t_part = _t_param.partitions[i];
for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
@ -385,26 +356,6 @@ Status VOlapTablePartitionParam::init() {
return Status::OK();
}
bool VOlapTablePartitionParam::find_partition(BlockRow* block_row,
const VOlapTablePartition** partition) const {
// block_row is gave by inserting process. So try to use transformed index.
auto it =
_is_in_partition
? _partitions_map->find(std::tuple {block_row->first, block_row->second, true})
: _partitions_map->upper_bound(
std::tuple {block_row->first, block_row->second, true});
// for list partition it might result in default partition
if (_is_in_partition) {
*partition = (it != _partitions_map->end()) ? it->second : _default_partition;
it = _partitions_map->end();
}
if (it != _partitions_map->end() &&
_part_contains(it->second, std::tuple {block_row->first, block_row->second, true})) {
*partition = it->second;
}
return (*partition != nullptr);
}
bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
BlockRowWithIndicator key) const {
// start_key.second == -1 means only single partition
@ -413,11 +364,6 @@ bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
!comparator(key, std::tuple {part->start_key.first, part->start_key.second, false});
}
uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row,
const VOlapTablePartition& partition) const {
return _compute_tablet_index(block_row, partition);
}
Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs,
BlockRow* part_key) {
for (int i = 0; i < t_exprs.size(); i++) {

View File

@ -17,6 +17,7 @@
#pragma once
#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/descriptors.pb.h>
@ -33,6 +34,8 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
@ -162,9 +165,78 @@ public:
int64_t version() const { return _t_param.version; }
// return true if we found this block_row in partition
bool find_partition(BlockRow* block_row, const VOlapTablePartition** partition) const;
//TODO: use virtual function to refactor it
ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
VOlapTablePartition*& partition) const {
auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true})
: _partitions_map->upper_bound(std::tuple {block, row, true});
// for list partition it might result in default partition
if (_is_in_partition) {
partition = (it != _partitions_map->end()) ? it->second : _default_partition;
it = _partitions_map->end();
}
if (it != _partitions_map->end() &&
_part_contains(it->second, std::tuple {block, row, true})) {
partition = it->second;
}
return (partition != nullptr);
}
uint32_t find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const;
ALWAYS_INLINE void find_tablets(
vectorized::Block* block, const std::vector<uint32_t>& indexes,
const std::vector<VOlapTablePartition*>& partitions,
std::vector<uint32_t>& tablet_indexes /*result*/,
/*TODO: check if flat hash map will be better*/
std::map<int64_t, int64_t>* partition_tablets_buffer = nullptr) const {
std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)>
compute_function;
if (!_distributed_slot_locs.empty()) {
//TODO: refactor by saving the hash values. then we can calculate in columnwise.
compute_function = [this](vectorized::Block* block, uint32_t row,
const VOlapTablePartition& partition) -> uint32_t {
uint32_t hash_val = 0;
for (unsigned short _distributed_slot_loc : _distributed_slot_locs) {
auto* slot_desc = _slots[_distributed_slot_loc];
auto& column = block->get_by_position(_distributed_slot_loc).column;
auto val = column->get_data_at(row);
if (val.data != nullptr) {
hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type,
hash_val);
} else {
hash_val = HashUtil::zlib_crc_hash_null(hash_val);
}
}
return hash_val % partition.num_buckets;
};
} else { // random distribution
compute_function = [](vectorized::Block* block, uint32_t row,
const VOlapTablePartition& partition) -> uint32_t {
if (partition.load_tablet_idx == -1) {
// load_to_single_tablet = false, just do random
return butil::fast_rand() % partition.num_buckets;
}
// load_to_single_tablet = ture, do round-robin
return partition.load_tablet_idx % partition.num_buckets;
};
}
if (partition_tablets_buffer == nullptr) {
for (auto index : indexes) {
tablet_indexes[index] = compute_function(block, index, *partitions[index]);
}
} else { // use buffer
for (auto index : indexes) {
auto& partition_id = partitions[index]->id;
if (auto it = partition_tablets_buffer->find(partition_id);
it != partition_tablets_buffer->end()) {
tablet_indexes[index] = it->second; // tablet
}
// compute and save in buffer
(*partition_tablets_buffer)[partition_id] = tablet_indexes[index] =
compute_function(block, index, *partitions[index]);
}
}
}
const std::vector<VOlapTablePartition*>& get_partitions() const { return _partitions; }
@ -193,8 +265,6 @@ private:
Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos);
std::function<uint32_t(BlockRow*, const VOlapTablePartition&)> _compute_tablet_index;
// check if this partition contain this key
bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const;

View File

@ -41,21 +41,24 @@
#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows,
std::vector<VOlapTablePartition*>& partitions,
std::vector<uint32_t>& tablet_index, bool& stop_processing,
std::vector<bool>& skip, std::vector<int64_t>* miss_rows) {
for (int index = 0; index < rows; index++) {
_vpartition->find_partition(block, index, partitions[index]);
}
Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int row_index,
const VOlapTablePartition** partition, uint32_t& tablet_index,
bool& stop_processing, bool& is_continue,
bool* missing_partition) {
Status status = Status::OK();
*partition = nullptr;
tablet_index = 0;
BlockRow block_row;
block_row = {block, row_index};
if (!_vpartition->find_partition(&block_row, partition)) {
if (missing_partition != nullptr) { // auto partition table
*missing_partition = true;
return status;
} else {
std::vector<uint32_t> qualified_rows;
qualified_rows.reserve(rows);
for (int row_index = 0; row_index < rows; row_index++) {
if (partitions[row_index] == nullptr) [[unlikely]] {
if (miss_rows != nullptr) { // auto partition table
miss_rows->push_back(row_index); // already reserve memory outside
skip[row_index] = true;
continue;
}
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
@ -70,33 +73,34 @@ Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int row_
if (stop_processing) {
return Status::EndOfFile("Encountered unqualified data, stop processing");
}
is_continue = true;
return status;
skip[row_index] = true;
continue;
}
}
if (!(*partition)->is_mutable) {
_num_immutable_partition_filtered_rows++;
is_continue = true;
return status;
}
if ((*partition)->num_buckets <= 0) {
std::stringstream ss;
ss << "num_buckets must be greater than 0, num_buckets=" << (*partition)->num_buckets;
return Status::InternalError(ss.str());
}
_partition_ids.emplace((*partition)->id);
if (_find_tablet_mode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) {
tablet_index = _vpartition->find_tablet(&block_row, **partition);
_partition_to_tablet_map.emplace((*partition)->id, tablet_index);
} else {
tablet_index = _partition_to_tablet_map[(*partition)->id];
if (!partitions[row_index]->is_mutable) [[unlikely]] {
_num_immutable_partition_filtered_rows++;
skip[row_index] = true;
continue;
}
} else {
tablet_index = _vpartition->find_tablet(&block_row, **partition);
if (partitions[row_index]->num_buckets <= 0) [[unlikely]] {
std::stringstream ss;
ss << "num_buckets must be greater than 0, num_buckets="
<< partitions[row_index]->num_buckets;
return Status::InternalError(ss.str());
}
_partition_ids.emplace(partitions[row_index]->id);
qualified_rows.push_back(row_index);
}
return status;
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index);
} else {
_vpartition->find_tablets(block, qualified_rows, partitions, tablet_index,
&_partition_to_tablet_map);
}
return Status::OK();
}
} // namespace doris::vectorized

View File

@ -18,10 +18,12 @@
#pragma once
#include <map>
#include <unordered_set>
#include "common/status.h"
#include "exec/tablet_info.h"
#include "util/bitmap.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/core/block.h"
namespace doris::vectorized {
@ -39,9 +41,10 @@ public:
OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
: _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {};
Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
const VOlapTablePartition** partition, uint32_t& tablet_index,
bool& filtered, bool& is_continue, bool* missing_partition = nullptr);
Status find_tablets(RuntimeState* state, vectorized::Block* block, int rows,
std::vector<VOlapTablePartition*>& partitions,
std::vector<uint32_t>& tablet_index, bool& filtered,
std::vector<bool>& is_continue, std::vector<int64_t>* miss_rows = nullptr);
bool is_find_tablet_every_sink() {
return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
@ -55,7 +58,7 @@ public:
bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }
const std::set<int64_t>& partition_ids() { return _partition_ids; }
const vectorized::flat_hash_set<int64_t>& partition_ids() { return _partition_ids; }
int64_t num_filtered_rows() const { return _num_filtered_rows; }
@ -69,7 +72,7 @@ private:
VOlapTablePartitionParam* _vpartition;
FindTabletMode _find_tablet_mode;
std::map<int64_t, int64_t> _partition_to_tablet_map;
std::set<int64_t> _partition_ids;
vectorized::flat_hash_set<int64_t> _partition_ids;
int64_t _num_filtered_rows = 0;
int64_t _num_immutable_partition_filtered_rows = 0;

View File

@ -221,20 +221,32 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() {
}
}
void VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
const VOlapTablePartition* partition,
uint32_t tablet_index, int row_idx) {
// Generate channel payload for sinking data to each tablet
for (const auto& index : partition->indexes) {
auto tablet_id = index.tablets[tablet_index];
if (rows_for_tablet.count(tablet_id) == 0) {
Rows rows;
rows.partition_id = partition->id;
rows.index_id = index.index_id;
rows_for_tablet.insert({tablet_id, rows});
void VOlapTableSinkV2::_generate_rows_for_tablet(
RowsForTablet& rows_for_tablet, const std::vector<VOlapTablePartition*>& partitions,
const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& skip,
size_t row_cnt) {
for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
if (skip[row_idx]) {
continue;
}
auto& partition = partitions[row_idx];
auto& tablet_index = tablet_indexes[row_idx];
for (const auto& index : partition->indexes) {
auto tablet_id = index.tablets[tablet_index];
auto it = rows_for_tablet.find(tablet_id);
if (it == rows_for_tablet.end()) {
Rows rows;
rows.partition_id = partition->id;
rows.index_id = index.index_id;
rows.row_idxes.reserve(row_cnt);
auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
it = tmp_it;
}
it->second.row_idxes.push_back(row_idx);
_number_output_rows++;
}
rows_for_tablet[tablet_id].row_idxes.push_back(row_idx);
_number_output_rows++;
}
}
@ -288,20 +300,22 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc
_row_distribution_watch.start();
const auto num_rows = input_rows;
const auto* __restrict filter_map = _block_convertor->filter_map();
for (int i = 0; i < num_rows; ++i) {
if (UNLIKELY(has_filtered_rows) && filter_map[i]) {
continue;
//reuse vars
_partitions.assign(num_rows, nullptr);
_skip.assign(num_rows, false);
_tablet_indexes.assign(num_rows, 0);
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
_skip[i] = _skip[i] || filter_map[i];
}
const VOlapTablePartition* partition = nullptr;
bool is_continue = false;
uint32_t tablet_index = 0;
RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block.get(), i, &partition, tablet_index,
stop_processing, is_continue));
if (is_continue) {
continue;
}
_generate_rows_for_tablet(rows_for_tablet, partition, tablet_index, i);
}
_generate_rows_for_tablet(rows_for_tablet, _partitions, _tablet_indexes, _skip, num_rows);
_row_distribution_watch.stop();
// For each tablet, send its input_rows from block to delta writer

View File

@ -60,6 +60,7 @@
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/common/allocator.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
@ -137,8 +138,9 @@ private:
void _build_tablet_node_mapping();
void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
const VOlapTablePartition* partition, uint32_t tablet_index,
int row_idx);
const std::vector<VOlapTablePartition*>& partitions,
const std::vector<uint32_t>& tablet_indexes,
const std::vector<bool>& skip, size_t row_cnt);
Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
const Rows& rows, const Streams& streams);
@ -184,6 +186,11 @@ private:
int64_t _number_input_rows = 0;
int64_t _number_output_rows = 0;
// reuse for find_tablet
std::vector<VOlapTablePartition*> _partitions;
std::vector<bool> _skip;
std::vector<uint32_t> _tablet_indexes;
MonotonicStopWatch _row_distribution_watch;
RuntimeProfile::Counter* _input_rows_counter = nullptr;

View File

@ -44,6 +44,7 @@
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "olap/wal_manager.h"
#include "util/runtime_profile.h"
@ -421,7 +422,6 @@ Status VNodeChannel::open_wait() {
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
open_closure->cntl.remote_side());
}
_cancelled = true;
auto error_code = open_closure->cntl.ErrorCode();
auto error_text = open_closure->cntl.ErrorText();
@ -1337,50 +1337,80 @@ Status VTabletWriter::_incremental_open_node_channel(
return Status::OK();
}
// Generate channel payload for sinking data to differenct node channel
// Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, std::vector<int64_t>>;
// first = row_id, second = vector<tablet_id>
void VTabletWriter::_generate_row_distribution_payload(
ChannelDistributionPayload& channel_to_payload, const VOlapTablePartition* partition,
uint32_t tablet_index, int row_idx, size_t row_cnt) {
// Generate channel payload for sinking data to differenct node channel
for (int j = 0; j < partition->indexes.size(); ++j) {
auto tid = partition->indexes[j].tablets[tablet_index];
auto it = _channels[j]->_channels_by_tablet.find(tid);
DCHECK(it != _channels[j]->_channels_by_tablet.end())
<< "unknown tablet, tablet_id=" << tablet_index;
for (const auto& channel : it->second) {
if (channel_to_payload[j].count(channel.get()) < 1) {
channel_to_payload[j].insert(
{channel.get(), Payload {std::unique_ptr<vectorized::IColumn::Selector>(
new vectorized::IColumn::Selector()),
std::vector<int64_t>()}});
}
channel_to_payload[j][channel.get()].first->push_back(row_idx);
channel_to_payload[j][channel.get()].second.push_back(tid);
ChannelDistributionPayload& channel_to_payload,
const std::vector<VOlapTablePartition*>& partitions,
const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& skip,
size_t row_cnt) {
for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
if (skip[row_idx]) {
continue;
}
const auto& partition = partitions[row_idx];
const auto& tablet_index = tablet_indexes[row_idx];
for (int index_num = 0; index_num < partition->indexes.size();
++index_num) { // partition->indexes = [index, tablets...]
auto tablet_id = partition->indexes[index_num].tablets[tablet_index];
auto it = _channels[index_num]->_channels_by_tablet.find(
tablet_id); // (tablet_id, VNodeChannel) where this tablet locate
DCHECK(it != _channels[index_num]->_channels_by_tablet.end())
<< "unknown tablet, tablet_id=" << tablet_index;
std::vector<std::shared_ptr<VNodeChannel>>& tablet_locations = it->second;
std::unordered_map<VNodeChannel*, Payload>& payloads_this_index =
channel_to_payload[index_num]; // payloads of this index in every node
for (const auto& locate_node : tablet_locations) {
auto payload_it =
payloads_this_index.find(locate_node.get()); // <VNodeChannel*, Payload>
if (payload_it == payloads_this_index.end()) {
auto [tmp_it, _] = payloads_this_index.emplace(
locate_node.get(),
Payload {std::make_unique<vectorized::IColumn::Selector>(),
std::vector<int64_t>()});
payload_it = tmp_it;
payload_it->second.first->reserve(row_cnt);
payload_it->second.second.reserve(row_cnt);
}
payload_it->second.first->push_back(row_idx);
payload_it->second.second.push_back(tablet_id);
}
_number_output_rows++;
}
_number_output_rows += row_cnt;
}
}
Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized::Block* block,
ChannelDistributionPayload& channel_to_payload,
size_t num_rows, bool has_filtered_rows) {
// only need to calculate one value for single partition.
std::vector<VOlapTablePartition*> partitions(1, nullptr);
std::vector<bool> skip(1, false);
std::vector<uint32_t> tablet_indexes(1, 0);
bool stop_processing = false;
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, tablet_indexes,
stop_processing, skip));
const VOlapTablePartition* partition = nullptr;
uint32_t tablet_index = 0;
bool stop_processing = false;
for (int32_t i = 0; i < num_rows; ++i) {
if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) {
continue;
for (size_t i = 0; i < num_rows; i++) {
if (!skip[i]) {
partition = partitions[i];
tablet_index = tablet_indexes[i];
break;
}
bool is_continue = false;
RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block, i, &partition, tablet_index,
stop_processing, is_continue));
if (is_continue) {
continue;
}
break;
}
if (partition == nullptr) {
return Status::OK();
}
for (int j = 0; j < partition->indexes.size(); ++j) {
auto tid = partition->indexes[j].tablets[tablet_index];
auto it = _channels[j]->_channels_by_tablet.find(tid);
@ -1388,10 +1418,9 @@ Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized
<< "unknown tablet, tablet_id=" << tablet_index;
int64_t row_cnt = 0;
for (const auto& channel : it->second) {
if (channel_to_payload[j].count(channel.get()) < 1) {
if (!channel_to_payload[j].contains(channel.get())) {
channel_to_payload[j].insert(
{channel.get(), Payload {std::unique_ptr<vectorized::IColumn::Selector>(
new vectorized::IColumn::Selector()),
{channel.get(), Payload {std::make_unique<vectorized::IColumn::Selector>(),
std::vector<int64_t>()}});
}
auto& selector = channel_to_payload[j][channel.get()].first;
@ -1535,10 +1564,15 @@ Status VTabletWriter::close(Status exec_status) {
auto status = Status::OK();
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0,
total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns = 0,
total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, total_add_batch_num = 0,
num_node_channels = 0;
int64_t serialize_batch_ns = 0;
int64_t queue_push_lock_ns = 0;
int64_t actual_consume_ns = 0;
int64_t total_add_batch_exec_time_ns = 0;
int64_t max_add_batch_exec_time_ns = 0;
int64_t total_wait_exec_time_ns = 0;
int64_t max_wait_exec_time_ns = 0;
int64_t total_add_batch_num = 0;
int64_t num_node_channels = 0;
VNodeChannelStat channel_stat;
for (const auto& index_channel : _channels) {
@ -1665,7 +1699,7 @@ Status VTabletWriter::close(Status exec_status) {
[](const std::shared_ptr<VNodeChannel>& ch) { ch->clear_all_blocks(); });
}
if (_wal_writer.get() != nullptr) {
if (_wal_writer != nullptr) {
static_cast<void>(_wal_writer->finalize());
}
return _close_status;
@ -1703,7 +1737,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
SCOPED_RAW_TIMER(&_send_data_ns);
// This is just for passing compilation.
bool stop_processing = false;
std::vector<std::unordered_map<VNodeChannel*, Payload>> channel_to_payload;
ChannelDistributionPayload channel_to_payload;
channel_to_payload.resize(_channels.size());
_tablet_finder->clear_for_new_batch();
_row_distribution_watch.start();
@ -1737,34 +1771,30 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
missing_map.reserve(partition_col.column->size());
// try to find tablet and save missing value
for (int i = 0; i < num_rows; ++i) {
if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) {
continue;
}
const VOlapTablePartition* partition = nullptr;
bool is_continue = false;
uint32_t tablet_index = 0;
bool missing_this = false;
RETURN_IF_ERROR(_tablet_finder->find_tablet(_state, block.get(), i, &partition,
tablet_index, stop_processing,
is_continue, &missing_this));
if (missing_this) {
missing_map.push_back(i);
} else {
_generate_row_distribution_payload(channel_to_payload, partition, tablet_index,
i, 1);
}
}
missing_map.shrink_to_fit();
std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
std::vector<bool> skip(num_rows, false);
std::vector<uint32_t> tablet_indexes(num_rows, 0);
// for missing partition keys, calc the missing partition and save in _partitions_need_create
auto type = partition_col.type;
if (missing_map.size() > 0) {
//TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time.
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions,
tablet_indexes, stop_processing, skip,
&missing_map));
if (missing_map.empty()) {
// we don't calculate it distribution when have missing values
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
skip[i] = skip[i] || _block_convertor->filter_map()[i];
}
}
_generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes,
skip, num_rows);
} else { // for missing partition keys, calc the missing partition and save in _partitions_need_create
auto return_type = part_func->data_type();
// expose the data column
vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column;
if (auto* nullable =
if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
range_left_col = nullable->get_nested_column_ptr();
return_type =
@ -1786,23 +1816,20 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
return Status::NeedSendAgain("");
} // creating done
} else { // not auto partition
for (int i = 0; i < num_rows; ++i) {
if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) {
continue;
std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
std::vector<bool> skip(num_rows, false);
std::vector<uint32_t> tablet_indexes(num_rows, 0);
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions,
tablet_indexes, stop_processing, skip));
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
skip[i] = skip[i] || _block_convertor->filter_map()[i];
}
const VOlapTablePartition* partition = nullptr;
bool is_continue = false;
uint32_t tablet_index = 0;
RETURN_IF_ERROR(_tablet_finder->find_tablet(_state, block.get(), i, &partition,
tablet_index, stop_processing,
is_continue));
if (is_continue) {
continue;
}
// each row
_generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i,
1);
}
_generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip,
num_rows);
}
}
_row_distribution_watch.stop();

View File

@ -553,10 +553,13 @@ private:
using ChannelDistributionPayload = std::vector<std::unordered_map<VNodeChannel*, Payload>>;
Status _init(RuntimeState* state, RuntimeProfile* profile);
// payload for each row
void _generate_row_distribution_payload(ChannelDistributionPayload& payload,
const VOlapTablePartition* partition,
uint32_t tablet_index, int row_idx, size_t row_cnt);
// payload for every row
void _generate_row_distribution_payload(ChannelDistributionPayload& channel_to_payload,
const std::vector<VOlapTablePartition*>& partitions,
const std::vector<uint32_t>& tablet_indexes,
const std::vector<bool>& skip, size_t row_cnt);
Status _single_partition_generate(RuntimeState* state, vectorized::Block* block,
ChannelDistributionPayload& channel_to_payload,
size_t num_rows, bool has_filtered_rows);