[refactor](load) move find_tablet out of VOlapTableSink (#21462)

This commit is contained in:
Kaijie Chen
2023-07-06 16:51:32 +08:00
committed by GitHub
parent 2e651bbc9a
commit 457de3fc55
4 changed files with 186 additions and 85 deletions

View File

@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/vtablet_finder.h"
#include <fmt/format.h>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "exec/tablet_info.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
namespace doris {
namespace stream_load {
Status OlapTabletFinder::find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
const VOlapTablePartition** partition, uint32_t& tablet_index,
bool& stop_processing, bool& is_continue) {
Status status = Status::OK();
*partition = nullptr;
tablet_index = 0;
BlockRow block_row;
block_row = {block, row_index};
if (!_vpartition->find_partition(&block_row, partition)) {
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
fmt::memory_buffer buf;
fmt::format_to(buf, "no partition for this tuple. tuple={}",
block->dump_data(row_index, 1));
return fmt::to_string(buf);
},
&stop_processing));
_num_filtered_rows++;
if (stop_processing) {
return Status::EndOfFile("Encountered unqualified data, stop processing");
}
is_continue = true;
return status;
}
if (!(*partition)->is_mutable) {
_num_immutable_partition_filtered_rows++;
is_continue = true;
return status;
}
if ((*partition)->num_buckets <= 0) {
std::stringstream ss;
ss << "num_buckets must be greater than 0, num_buckets=" << (*partition)->num_buckets;
return Status::InternalError(ss.str());
}
_partition_ids.emplace((*partition)->id);
if (_find_tablet_mode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) {
tablet_index = _vpartition->find_tablet(&block_row, **partition);
_partition_to_tablet_map.emplace((*partition)->id, tablet_index);
} else {
tablet_index = _partition_to_tablet_map[(*partition)->id];
}
} else {
tablet_index = _vpartition->find_tablet(&block_row, **partition);
}
return status;
}
} // namespace stream_load
} // namespace doris

View File

@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <map>
#include "common/status.h"
#include "exec/tablet_info.h"
#include "vec/core/block.h"
namespace doris {
namespace stream_load {
class OlapTabletFinder {
public:
// FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we
// should compute tablet index for every row
// FIND_TABLET_EVERY_BATCH is only used for random distribution info, which indicates that we should
// compute tablet index for every row batch
// FIND_TABLET_EVERY_SINK is only used for random distribution info, which indicates that we should
// only compute tablet index in the corresponding partition once for the whole time in olap table sink
enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK };
OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
: _vpartition(vpartition), _find_tablet_mode(mode) {};
Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
const VOlapTablePartition** partition, uint32_t& tablet_index,
bool& filtered, bool& is_continue);
bool is_find_tablet_every_sink() {
return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
}
void clear_for_new_batch() {
if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
_partition_to_tablet_map.clear();
}
}
bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }
const std::set<int64_t>& partition_ids() { return _partition_ids; }
int64_t num_filtered_rows() const { return _num_filtered_rows; }
int64_t num_immutable_partition_filtered_rows() const {
return _num_immutable_partition_filtered_rows;
}
private:
VOlapTablePartitionParam* _vpartition;
FindTabletMode _find_tablet_mode;
std::map<int64_t, int64_t> _partition_to_tablet_map;
std::set<int64_t> _partition_ids;
int64_t _num_filtered_rows = 0;
int64_t _num_immutable_partition_filtered_rows = 0;
};
} // namespace stream_load
} // namespace doris

View File

@ -90,6 +90,7 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
namespace doris {
class TExpr;
@ -798,7 +799,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
}
if (request.eos()) {
for (auto pid : _parent->_partition_ids) {
for (auto pid : _parent->_tablet_finder->partition_ids()) {
request.add_partition_ids(pid);
}
@ -1031,14 +1032,16 @@ Status VOlapTableSink::init(const TDataSink& t_sink) {
// if distributed column list is empty, we can ensure that tablet is with random distribution info
// and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition
// for the whole olap table sink
auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
if (table_sink.partition.distributed_columns.empty()) {
if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
findTabletMode = FindTabletMode::FIND_TABLET_EVERY_SINK;
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
} else {
findTabletMode = FindTabletMode::FIND_TABLET_EVERY_BATCH;
find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
}
}
_vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
_tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
return _vpartition->init();
}
@ -1221,56 +1224,6 @@ size_t VOlapTableSink::get_pending_bytes() const {
return mem_consumption;
}
Status VOlapTableSink::find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
const VOlapTablePartition** partition, uint32_t& tablet_index,
bool& stop_processing, bool& is_continue) {
Status status = Status::OK();
*partition = nullptr;
tablet_index = 0;
BlockRow block_row;
block_row = {block, row_index};
if (!_vpartition->find_partition(&block_row, partition)) {
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
fmt::memory_buffer buf;
fmt::format_to(buf, "no partition for this tuple. tuple={}",
block->dump_data(row_index, 1));
return fmt::to_string(buf);
},
&stop_processing));
_number_filtered_rows++;
if (stop_processing) {
return Status::EndOfFile("Encountered unqualified data, stop processing");
}
is_continue = true;
return status;
}
if (!(*partition)->is_mutable) {
_number_immutable_partition_filtered_rows++;
is_continue = true;
return status;
}
if ((*partition)->num_buckets <= 0) {
std::stringstream ss;
ss << "num_buckets must be greater than 0, num_buckets=" << (*partition)->num_buckets;
return Status::InternalError(ss.str());
}
_partition_ids.emplace((*partition)->id);
if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) {
tablet_index = _vpartition->find_tablet(&block_row, **partition);
_partition_to_tablet_map.emplace((*partition)->id, tablet_index);
} else {
tablet_index = _partition_to_tablet_map[(*partition)->id];
}
} else {
tablet_index = _vpartition->find_tablet(&block_row, **partition);
}
return status;
}
void VOlapTableSink::_generate_row_distribution_payload(
ChannelDistributionPayload& channel_to_payload, const VOlapTablePartition* partition,
uint32_t tablet_index, int row_idx, size_t row_cnt) {
@ -1305,8 +1258,8 @@ Status VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize
continue;
}
bool is_continue = false;
RETURN_IF_ERROR(find_tablet(state, block, i, &partition, tablet_index, stop_processing,
is_continue));
RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block, i, &partition, tablet_index,
stop_processing, is_continue));
if (is_continue) {
continue;
}
@ -1375,14 +1328,11 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
bool stop_processing = false;
std::vector<std::unordered_map<VNodeChannel*, Payload>> channel_to_payload;
channel_to_payload.resize(_channels.size());
if (findTabletMode == FIND_TABLET_EVERY_BATCH) {
// Recaculate is needed
_partition_to_tablet_map.clear();
}
_tablet_finder->clear_for_new_batch();
_row_distribution_watch.start();
auto num_rows = block->rows();
size_t partition_num = _vpartition->get_partitions().size();
if (partition_num == 1 && findTabletMode == FindTabletMode::FIND_TABLET_EVERY_SINK) {
if (partition_num == 1 && _tablet_finder->is_find_tablet_every_sink()) {
RETURN_IF_ERROR(_single_partition_generate(state, block.get(), channel_to_payload, num_rows,
has_filtered_rows));
} else {
@ -1393,8 +1343,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
const VOlapTablePartition* partition = nullptr;
bool is_continue = false;
uint32_t tablet_index = 0;
RETURN_IF_ERROR(find_tablet(state, block.get(), i, &partition, tablet_index,
stop_processing, is_continue));
RETURN_IF_ERROR(_tablet_finder->find_tablet(
state, block.get(), i, &partition, tablet_index, stop_processing, is_continue));
if (is_continue) {
continue;
}
@ -1411,7 +1361,7 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
// Random distribution and the block belongs to a single tablet, we could optimize to append the whole
// block into node channel.
bool load_block_to_single_tablet =
!_schema->is_dynamic_schema() && _partition_to_tablet_map.size() == 1;
!_schema->is_dynamic_schema() && _tablet_finder->is_single_tablet();
if (load_block_to_single_tablet) {
SCOPED_RAW_TIMER(&_filter_ns);
// Filter block
@ -1606,8 +1556,8 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
COUNTER_SET(_input_rows_counter, _number_input_rows);
COUNTER_SET(_output_rows_counter, _number_output_rows);
COUNTER_SET(_filtered_rows_counter,
_block_convertor->num_filtered_rows() + _number_filtered_rows);
COUNTER_SET(_filtered_rows_counter, _block_convertor->num_filtered_rows() +
_tablet_finder->num_filtered_rows());
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_filter_timer, _filter_ns);
@ -1628,8 +1578,9 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
state->num_rows_load_unselected();
state->set_num_rows_load_total(num_rows_load_total);
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
_number_filtered_rows);
state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
_tablet_finder->num_filtered_rows());
state->update_num_rows_load_unselected(
_tablet_finder->num_immutable_partition_filtered_rows());
// print log of add batch time of all node, for tracing load performance easily
std::stringstream ss;

View File

@ -82,6 +82,7 @@ class RefCountClosure;
namespace stream_load {
class OlapTableBlockConvertor;
class OlapTabletFinder;
class OpenPartitionClosure;
// The counter of add_batch rpc of a single node
@ -517,10 +518,6 @@ private:
ChannelDistributionPayload& channel_to_payload,
size_t num_rows, bool has_filtered_rows);
Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
const VOlapTablePartition** partition, uint32_t& tablet_index,
bool& stop_processing, bool& is_continue);
void _open_partition(const VOlapTablePartition* partition);
Status _cancel_channel_and_check_intolerable_failure(Status status, const std::string& err_msg,
@ -560,9 +557,7 @@ private:
RuntimeProfile* _profile = nullptr;
std::set<int64_t> _partition_ids;
// only used for partition with random distribution
std::map<int64_t, int64_t> _partition_to_tablet_map;
std::unique_ptr<OlapTabletFinder> _tablet_finder;
// index_channel
std::vector<std::shared_ptr<IndexChannel>> _channels;
@ -575,8 +570,6 @@ private:
int64_t _send_data_ns = 0;
int64_t _number_input_rows = 0;
int64_t _number_output_rows = 0;
int64_t _number_filtered_rows = 0;
int64_t _number_immutable_partition_filtered_rows = 0;
int64_t _filter_ns = 0;
MonotonicStopWatch _row_distribution_watch;
@ -617,15 +610,6 @@ private:
// User can change this config at runtime, avoid it being modified during query or loading process.
bool _transfer_large_data_by_brpc = false;
// FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we
// should compute tablet index for every row
// FIND_TABLET_EVERY_BATCH is only used for random distribution info, which indicates that we should
// compute tablet index for every row batch
// FIND_TABLET_EVERY_SINK is only used for random distribution info, which indicates that we should
// only compute tablet index in the corresponding partition once for the whole time in olap table sink
enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK };
FindTabletMode findTabletMode = FindTabletMode::FIND_TABLET_EVERY_ROW;
VOlapTablePartitionParam* _vpartition = nullptr;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;