[Improvement](load) single partition load optimize (#20876)

1. When the table has a single partition, the partition and tablet are no longer looked up for each row of data.
2. This optimization only applies to tables that are DISTRIBUTED BY RANDOM.
This commit is contained in:
zzzxl
2023-06-20 20:29:39 +08:00
committed by GitHub
parent 493f9f563c
commit 190debaac9
2 changed files with 79 additions and 18 deletions

View File

@ -36,9 +36,11 @@
#include <algorithm>
#include <iterator>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
@ -1276,6 +1278,56 @@ void VOlapTableSink::_generate_row_distribution_payload(
}
}
// Fast path for a block whose rows all belong to one partition and one tablet.
//
// The partition and tablet index are resolved once, from the first row that is
// neither filtered out nor skipped by find_tablet(). Every unfiltered row of the
// block is then appended to the payload of each channel serving that tablet,
// for every index of the partition.
//
// state / block:       forwarded to find_tablet().
// channel_to_payload:  per-index map from channel to (row selector, tablet ids);
//                      entries are created on demand.
// num_rows:            number of rows in `block`.
// filtered_rows:       count of rows marked in _filter_bitmap; when it is not
//                      positive the bitmap is not consulted.
//
// Returns the first error reported by find_tablet(), otherwise Status::OK().
Status VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorized::Block* block,
                                                  ChannelDistributionPayload& channel_to_payload,
                                                  size_t num_rows, int32_t filtered_rows) {
    const bool has_filtered_rows = filtered_rows > 0;
    const auto row_count = static_cast<int32_t>(num_rows);

    const VOlapTablePartition* partition = nullptr;
    uint32_t tablet_index = 0;
    bool stop_processing = false;
    for (int32_t i = 0; i < row_count; ++i) {
        if (UNLIKELY(has_filtered_rows) && _filter_bitmap.Get(i)) {
            continue;
        }
        bool is_continue = false;
        RETURN_IF_ERROR(find_tablet(state, block, i, &partition, tablet_index, stop_processing,
                                    is_continue));
        if (is_continue) {
            continue;
        }
        if (config::enable_lazy_open_partition) {
            _open_partition(partition);
        }
        // One successful lookup is enough: all rows share this partition/tablet.
        break;
    }

    // Every row was filtered or skipped, so no partition was resolved and there
    // is nothing to distribute. Without this guard the loop below would
    // dereference a null pointer.
    if (partition == nullptr) {
        return Status::OK();
    }

    for (size_t j = 0; j < partition->indexes.size(); ++j) {
        auto tid = partition->indexes[j].tablets[tablet_index];
        auto it = _channels[j]->_channels_by_tablet.find(tid);
        DCHECK(it != _channels[j]->_channels_by_tablet.end())
                << "unknown tablet, tablet_id=" << tid;
        auto& payload_by_channel = channel_to_payload[j];
        int64_t row_cnt = 0;
        for (const auto& channel : it->second) {
            // Single lookup; insert an empty payload only when the channel is new.
            auto payload_it = payload_by_channel.find(channel.get());
            if (payload_it == payload_by_channel.end()) {
                payload_it = payload_by_channel
                                     .insert({channel.get(),
                                              Payload {std::make_unique<
                                                               vectorized::IColumn::Selector>(),
                                                       std::vector<int64_t>()}})
                                     .first;
            }
            auto& selector = payload_it->second.first;
            auto& tablet_ids = payload_it->second.second;
            for (int32_t i = 0; i < row_count; ++i) {
                if (UNLIKELY(has_filtered_rows) && _filter_bitmap.Get(i)) {
                    continue;
                }
                selector->push_back(i);
            }
            // Keep tablet_ids parallel to the selector: one tablet id per selected row.
            tablet_ids.resize(selector->size(), tid);
            row_cnt = selector->size();
        }
        _number_output_rows += row_cnt;
    }
    return Status::OK();
}
Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
@ -1328,24 +1380,30 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
_partition_to_tablet_map.clear();
}
_row_distribution_watch.start();
for (int i = 0; i < num_rows; ++i) {
if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
continue;
}
const VOlapTablePartition* partition = nullptr;
bool is_continue = false;
uint32_t tablet_index = 0;
RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing,
is_continue));
if (is_continue) {
continue;
}
// each row
_generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i, 1);
// open partition
if (config::enable_lazy_open_partition) {
// async open operation, don't block send operation
_open_partition(partition);
size_t partition_num = _vpartition->get_partitions().size();
if (partition_num == 1 && findTabletMode == FindTabletMode::FIND_TABLET_EVERY_SINK) {
RETURN_IF_ERROR(_single_partition_generate(state, &block, channel_to_payload, num_rows,
filtered_rows));
} else {
for (int i = 0; i < num_rows; ++i) {
if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
continue;
}
const VOlapTablePartition* partition = nullptr;
bool is_continue = false;
uint32_t tablet_index = 0;
RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing,
is_continue));
if (is_continue) {
continue;
}
// each row
_generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i, 1);
// open partition
if (config::enable_lazy_open_partition) {
// async open operation, don't block send operation
_open_partition(partition);
}
}
}
_row_distribution_watch.stop();

View File

@ -490,6 +490,9 @@ private:
void _generate_row_distribution_payload(ChannelDistributionPayload& payload,
const VOlapTablePartition* partition,
uint32_t tablet_index, int row_idx, size_t row_cnt);
// Single-partition fast path called from send() when the table has exactly one
// partition and the find-tablet mode is FIND_TABLET_EVERY_SINK: resolves the
// partition and tablet index once for the block via find_tablet(), then adds
// every row not marked in the filter bitmap to the payload of each channel
// serving that tablet. Returns any error reported by find_tablet().
Status _single_partition_generate(RuntimeState* state, vectorized::Block* block,
ChannelDistributionPayload& channel_to_payload,
size_t num_rows, int32_t filtered_rows);
// make input data valid for OLAP table
// return number of invalid/filtered rows.