[feature](move-memtable) support pipelineX in sink v2 (#27067)

This commit is contained in:
Kaijie Chen
2023-11-16 15:00:55 +08:00
committed by GitHub
parent 54989175fb
commit e29d8cb110
10 changed files with 948 additions and 677 deletions

View File

@ -146,15 +146,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
}
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
@ -301,15 +299,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
}
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {

View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap_table_sink_v2_operator.h"
#include "common/status.h"
namespace doris::pipeline {
// Builds a shared OlapTableSinkV2Operator from this builder and its `_sink`.
OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
    OperatorPtr built_operator = std::make_shared<OlapTableSinkV2Operator>(this, _sink);
    return built_operator;
}
// Initializes the local sink state: runs the base-class init first, then
// passes the parent operator's object pool and group-commit flag to the
// writer. Returns the first non-OK status encountered.
Status OlapTableSinkV2LocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_open_timer);
    auto& parent_op = _parent->cast<Parent>();
    // The writer's status is the result of init, so hand it back directly.
    return _writer->init_properties(parent_op._pool, parent_op._group_commit);
}
// Closes the local sink state.
// Returns OK immediately when the base state is already marked closed;
// otherwise delegates to Base::close() and caches its result in
// `_close_status` before returning it.
Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status exec_status) {
if (Base::_closed) {
return Status::OK();
}
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
// NOTE(review): the class declaration visible in this change adds no `_closed` member of its
// own, so this presumably tests the same flag as `Base::_closed` above and would never be
// taken — confirm against the base class before relying on the cached-status path.
if (_closed) {
return _close_status;
}
_close_status = Base::close(state, exec_status);
return _close_status;
}
} // namespace doris::pipeline

View File

@ -18,6 +18,7 @@
#pragma once
#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/sink/vtablet_sink_v2.h"
namespace doris {
@ -41,9 +42,76 @@ public:
bool can_write() override { return true; } // TODO: need use mem_limit
};
// Builds a shared OlapTableSinkV2Operator from this builder and its `_sink`.
OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
}
class OlapTableSinkV2OperatorX;
// Local sink state for OlapTableSinkV2OperatorX.
// Specializes AsyncWriterSink with VTabletWriterV2 as the writer type and
// remembers the status produced by close().
class OlapTableSinkV2LocalState final
: public AsyncWriterSink<vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX> {
public:
using Base = AsyncWriterSink<vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>;
using Parent = OlapTableSinkV2OperatorX;
ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState);
// Forwards the owning operator and the runtime state to the base class.
OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {};
// Defined out of line.
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
// Opens the base sink while the exec-time and open timers are running.
Status open(RuntimeState* state) override {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
return Base::open(state);
}
// Defined out of line.
Status close(RuntimeState* state, Status exec_status) override;
friend class OlapTableSinkV2OperatorX;
private:
// Status recorded by close(); starts out as OK.
Status _close_status = Status::OK();
};
// PipelineX sink operator for the V2 OLAP table sink.
// Holds the output expression contexts built from the thrift expressions and
// forwards every incoming block to its OlapTableSinkV2LocalState.
class OlapTableSinkV2OperatorX final : public DataSinkOperatorX<OlapTableSinkV2LocalState> {
public:
    using Base = DataSinkOperatorX<OlapTableSinkV2LocalState>;

    // `row_desc` and `t_output_expr` are kept as references, not copied.
    // NOTE(review): presumably both must outlive this operator — confirm with the caller.
    OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
                             const std::vector<TExpr>& t_output_expr, bool group_commit)
            : Base(operator_id, 0),
              _row_desc(row_desc),
              _t_output_expr(t_output_expr),
              _group_commit(group_commit),
              _pool(pool) {}

    // Runs the base init, then turns the thrift expressions into expression contexts.
    Status init(const TDataSink& thrift_sink) override {
        RETURN_IF_ERROR(Base::init(thrift_sink));
        return vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs);
    }

    // Runs the base prepare, then prepares the output expressions against `_row_desc`.
    Status prepare(RuntimeState* state) override {
        RETURN_IF_ERROR(Base::prepare(state));
        return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
    }

    // Runs the base open, then opens the output expressions.
    Status open(RuntimeState* state) override {
        RETURN_IF_ERROR(Base::open(state));
        return vectorized::VExpr::open(_output_vexpr_ctxs, state);
    }

    // Adds the block's row count to the input-rows counter and passes the
    // block on to the local state's sink().
    Status sink(RuntimeState* state, vectorized::Block* in_block,
                SourceState source_state) override {
        auto& sink_state = get_local_state(state);
        SCOPED_TIMER(sink_state.exec_time_counter());
        COUNTER_UPDATE(sink_state.rows_input_counter(), static_cast<int64_t>(in_block->rows()));
        return sink_state.sink(state, in_block, source_state);
    }

private:
    friend class OlapTableSinkV2LocalState;
    template <typename Writer, typename Parent>
    friend class AsyncWriterSink;

    const RowDescriptor& _row_desc;
    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
    const std::vector<TExpr>& _t_output_expr;
    const bool _group_commit;
    ObjectPool* _pool;
};
} // namespace pipeline
} // namespace doris

View File

@ -46,6 +46,7 @@
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/olap_table_sink_v2_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
@ -544,6 +545,7 @@ DECLARE_OPERATOR_X(ResultSinkLocalState)
DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(LocalExchangeSinkLocalState)
@ -624,5 +626,6 @@ template class PipelineXLocalState<LocalExchangeDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>;
} // namespace doris::pipeline

View File

@ -62,6 +62,7 @@
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/olap_table_sink_v2_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
@ -268,9 +269,10 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
if (state->query_options().enable_memtable_on_sink_node) {
return Status::InternalError(
"Unsuported OLAP_TABLE_SINK with enable_memtable_on_sink_node ");
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
_sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(), row_desc,
output_exprs, false));
} else {
_sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), row_desc, output_exprs,
false));
@ -412,6 +414,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
_runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
_runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
_runtime_states[i]->set_total_load_streams(request.total_load_streams);
_runtime_states[i]->set_num_local_sink(request.num_local_sink);
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto task = std::make_unique<PipelineXTask>(
@ -1005,4 +1010,23 @@ Status PipelineXFragmentContext::send_report(bool done) {
std::placeholders::_2)},
shared_from_this());
}
// Reports whether the sink's table schema is a partial update or contains at
// least one index of type INVERTED. A schema that fails to initialize yields
// false.
bool PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) {
    OlapTableSchemaParam schema_param;
    const bool initialized = schema_param.init(sink.schema).ok();
    if (initialized) {
        if (schema_param.is_partial_update()) {
            return true;
        }
        for (const auto& index_schema : schema_param.indexes()) {
            for (const auto& idx : index_schema->indexes) {
                if (idx->index_type() == INVERTED) {
                    return true;
                }
            }
        }
    }
    return false;
}
} // namespace doris::pipeline

View File

@ -145,6 +145,9 @@ private:
const TPipelineFragmentParams& params, const RowDescriptor& row_desc,
RuntimeState* state, DescriptorTbl& desc_tbl,
PipelineId cur_pipeline_id);
bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
OperatorXPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;

View File

@ -17,47 +17,22 @@
#include "vec/sink/vtablet_sink_v2.h"
#include <brpc/uri.h>
#include <bthread/bthread.h>
#include <fmt/format.h>
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <algorithm>
#include <execution>
#include <mutex>
#include <ranges>
#include <string>
#include <unordered_map>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "olap/delta_writer_v2.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
#include "util/network_util.h"
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
namespace doris {
class TExpr;
@ -65,391 +40,16 @@ class TExpr;
namespace vectorized {
VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status)
: DataSink(row_desc), _pool(pool) {
// From the thrift expressions create the real exprs.
*status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
_name = "VOlapTableSinkV2";
}
const std::vector<TExpr>& texprs, bool group_commit)
: AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, texprs),
_pool(pool),
_group_commit(group_commit) {}
VOlapTableSinkV2::~VOlapTableSinkV2() = default;
// Handles newly created partitions: registers their tablet locations and
// nodes, then opens streams for the new partitions.
Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) {
    // The location table refers to the vector by address, so the copy is handed to the pool.
    auto* pooled_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets));
    _location->add_locations(*pooled_locations);
    // Record the nodes that came with the new partitions.
    _nodes_info->add_nodes(result->nodes);
    // Incrementally open streams; its status is the result of this call.
    return _incremental_open_streams(result->partitions);
}
// Free-function trampoline: casts the opaque `writer` pointer to
// VOlapTableSinkV2 and forwards to its member of the same name.
static Status on_partitions_created(void* writer, TCreatePartitionResult* result) {
    auto* sink = static_cast<VOlapTableSinkV2*>(writer);
    return sink->on_partitions_created(result);
}
// Extends the tablet/node bookkeeping for newly created `partitions` and
// opens load streams to every node that has no entry in `_streams_for_node`.
// Returns the first error from partition generation or stream opening.
Status VOlapTableSinkV2::_incremental_open_streams(
const std::vector<TOlapTablePartition>& partitions) {
// Same bookkeeping as _build_tablet_node_mapping(), applied to the new partitions only.
// Each index id is recorded in `_indexes_from_node` at most once per call.
std::unordered_set<int64_t> known_indexes;
std::unordered_set<int64_t> new_backends;
for (const auto& t_partition : partitions) {
VOlapTablePartition* partition = nullptr;
RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition));
for (const auto& index : partition->indexes) {
for (const auto& tablet_id : index.tablets) {
// NOTE(review): the result of find_tablet() is dereferenced without a null check here,
// while _select_streams() does check it — confirm it cannot be null at this point.
auto nodes = _location->find_tablet(tablet_id)->node_ids;
for (auto& node : nodes) {
PTabletID tablet;
tablet.set_partition_id(partition->id);
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
// A node without streams yet is remembered so streams get opened below.
if (!_streams_for_node.contains(node)) {
new_backends.insert(node);
}
_tablets_for_node[node].emplace(tablet_id, tablet);
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
// First tablet seen for this index id: record it for this node.
_indexes_from_node[node].emplace_back(tablet);
known_indexes.insert(index.index_id);
}
}
}
}
// Obtain and open streams for the nodes collected above.
for (int64_t node_id : new_backends) {
auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, _backend_id, node_id, _stream_per_node, _num_local_sink);
RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
_streams_for_node[node_id] = load_streams;
}
return Status::OK();
}
// Fills a VRowDistributionContext from this sink's members, initializes
// `_row_distribution` with it and opens it against the output row descriptor.
Status VOlapTableSinkV2::_init_row_distribution() {
    VRowDistributionContext dist_ctx;
    dist_ctx.state = _state;
    dist_ctx.block_convertor = _block_convertor.get();
    dist_ctx.tablet_finder = _tablet_finder.get();
    dist_ctx.vpartition = _vpartition;
    dist_ctx.add_partition_request_timer = _add_partition_request_timer;
    dist_ctx.txn_id = _txn_id;
    dist_ctx.pool = _pool;
    dist_ctx.location = _location;
    dist_ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs;
    // The callback receives `caller` back as its opaque first argument.
    dist_ctx.on_partitions_created = &vectorized::on_partitions_created;
    dist_ctx.caller = static_cast<void*>(this);
    dist_ctx.schema = _schema;

    _row_distribution.init(&dist_ctx);
    return _row_distribution.open(_output_row_desc);
}
// Reads the OLAP table sink parameters from the thrift description: load id,
// txn id, replica count, tuple id, file-cache flag, schema, tablet locations,
// node info and partition info. Also picks the tablet-finding mode.
Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
    DCHECK(t_sink.__isset.olap_table_sink);
    const auto& sink_params = t_sink.olap_table_sink;

    _load_id.set_hi(sink_params.load_id.hi);
    _load_id.set_lo(sink_params.load_id.lo);
    _txn_id = sink_params.txn_id;
    _num_replicas = sink_params.num_replicas;
    _tuple_desc_id = sink_params.tuple_id;
    _write_file_cache = sink_params.write_file_cache;

    _schema.reset(new OlapTableSchemaParam());
    RETURN_IF_ERROR(_schema->init(sink_params.schema));
    _location = _pool->add(new OlapTableLocationParam(sink_params.location));
    _nodes_info = _pool->add(new DorisNodesInfo(sink_params.nodes_info));

    // An empty distributed-column list means the tablet uses random distribution.
    // In that case a tablet is picked once per sink when load_to_single_tablet is
    // set and true, and once per batch otherwise. With distributed columns the
    // tablet is looked up for every row.
    using FindMode = OlapTabletFinder::FindTabletMode;
    FindMode find_tablet_mode = FindMode::FIND_TABLET_EVERY_ROW;
    if (sink_params.partition.distributed_columns.empty()) {
        const bool single_tablet =
                sink_params.__isset.load_to_single_tablet && sink_params.load_to_single_tablet;
        find_tablet_mode = single_tablet ? FindMode::FIND_TABLET_EVERY_SINK
                                         : FindMode::FIND_TABLET_EVERY_BATCH;
    }

    _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, sink_params.partition));
    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
    return _vpartition->init();
}
// Prepares the sink for execution: copies per-fragment settings from `state`,
// creates the profile, memory tracker, block convertor and output row
// descriptor, registers all counters/timers, prepares the output expressions
// and obtains the delta-writer map.
// Returns InternalError when the destination tuple descriptor is unknown.
Status VOlapTableSinkV2::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
_state = state;
_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
_backend_id = state->backend_id();
_stream_per_node = state->load_stream_per_node();
_total_streams = state->total_load_streams();
_num_local_sink = state->num_local_sink();
// All three stream settings are expected to be positive (debug-checked only).
DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0";
DCHECK(_total_streams > 0) << "total load streams should be greator than 0";
DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node
<< ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
// High priority when the execution timeout is at or below the configured threshold.
_is_high_priority =
(state->execution_timeout() <= config::load_task_high_priority_threshold_second);
// The profile must be added to the state's object pool.
_profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2"));
_mem_tracker = std::make_shared<MemTracker>("VOlapTableSinkV2:" +
std::to_string(state->load_job_id()));
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// Look up the destination table's tuple descriptor.
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
if (_output_tuple_desc == nullptr) {
return Status::InternalError("unknown destination tuple descriptor, id = {}",
_tuple_desc_id);
}
_block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
_block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
_state->batch_size());
_output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false));
// Register all counters and timers on the profile.
_input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
_filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT);
_send_data_timer = ADD_TIMER(_profile, "SendDataTime");
_wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime");
_row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime");
_write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime", "SendDataTime");
_validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
_open_timer = ADD_TIMER(_profile, "OpenTime");
_close_timer = ADD_TIMER(_profile, "CloseWaitTime");
_close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime");
_close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime");
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
// With share_delta_writers the map comes from the process-wide pool keyed by load id;
// otherwise this sink gets a map of its own.
if (config::share_delta_writers) {
_delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
_load_id, _num_local_sink);
} else {
_delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
}
return Status::OK();
}
// Opens the sink: opens the output expressions, builds the tablet-to-node
// mapping, opens load streams from this backend and finally sets up row
// distribution. Returns the first non-OK status.
Status VOlapTableSinkV2::open(RuntimeState* state) {
    // The expressions are opened before any timer or tracker is started.
    RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));

    SCOPED_TIMER(_profile->total_time_counter());
    SCOPED_TIMER(_open_timer);
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
    signal::set_signal_task_id(_load_id);

    _build_tablet_node_mapping();
    RETURN_IF_ERROR(_open_streams(_backend_id));
    return _init_row_distribution();
}
// For every destination node present in `_tablets_for_node`, obtains the
// stream group from the load-stream stub pool, opens it and stores it in
// `_streams_for_node`. Stops at the first error.
Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
    for (auto& node_entry : _tablets_for_node) {
        const auto& dst_id = node_entry.first;
        auto stream_group = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *stream_group));
        _streams_for_node[dst_id] = stream_group;
    }
    return Status::OK();
}
// Opens every stream in `streams` towards backend `dst_id`.
// Only the first stream is opened with the list of tablets recorded in
// `_indexes_from_node`; the remaining streams are opened with an empty list.
// Returns InternalError when `dst_id` is not found in `_nodes_info`.
Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id,
::doris::stream_load::LoadStreams& streams) {
auto node_info = _nodes_info->find_node(dst_id);
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", dst_id);
}
// get tablet schema from each backend only in the 1st stream
for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info,
_txn_id, *_schema, tablets_for_schema, _total_streams,
_state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info,
_txn_id, *_schema, {}, _total_streams,
_state->enable_profile()));
}
return Status::OK();
}
// Walks every partition/index/tablet known to `_vpartition` and records, per
// replica node, the tablets it hosts (`_tablets_for_node`). The first tablet
// encountered for each index id is additionally appended to
// `_indexes_from_node` for the node it was seen on.
void VOlapTableSinkV2::_build_tablet_node_mapping() {
    std::unordered_set<int64_t> seen_index_ids;
    for (const auto& partition : _vpartition->get_partitions()) {
        for (const auto& index : partition->indexes) {
            for (const auto& tablet_id : index.tablets) {
                auto replica_nodes = _location->find_tablet(tablet_id)->node_ids;
                for (auto& node_id : replica_nodes) {
                    PTabletID tablet;
                    tablet.set_partition_id(partition->id);
                    tablet.set_index_id(index.index_id);
                    tablet.set_tablet_id(tablet_id);
                    _tablets_for_node[node_id].emplace(tablet_id, tablet);

                    const bool first_sighting = !seen_index_ids.contains(index.index_id);
                    if (first_sighting) [[unlikely]] {
                        _indexes_from_node[node_id].emplace_back(tablet);
                        seen_index_ids.insert(index.index_id);
                    }
                }
            }
        }
    }
}
void VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids,
RowsForTablet& rows_for_tablet) {
for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) {
auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
for (int i = 0; i < row_ids.size(); i++) {
auto& tablet_id = tablet_ids[i];
auto it = rows_for_tablet.find(tablet_id);
if (it == rows_for_tablet.end()) {
Rows rows;
rows.partition_id = partition_ids[i];
rows.index_id = _schema->indexes()[index_idx]->index_id;
rows.row_idxes.reserve(row_ids.size());
auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
it = tmp_it;
}
it->second.row_idxes.push_back(row_ids[i]);
_number_output_rows++;
}
}
}
// Collects into `streams` one load stream per replica node of `tablet_id`,
// using the stream at the current `_stream_index` on each node, then advances
// `_stream_index` round-robin over `_stream_per_node`.
// Also records the tablet under each node in `_tablets_for_node`.
// Returns InternalError when the tablet has no known location.
Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
Streams& streams) {
auto location = _location->find_tablet(tablet_id);
if (location == nullptr) {
return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
}
for (auto& node_id : location->node_ids) {
PTabletID tablet;
tablet.set_partition_id(partition_id);
tablet.set_index_id(index_id);
tablet.set_tablet_id(tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index));
// NOTE(review): this waits on streams[0] on every iteration, not on the stream that was
// just appended — confirm whether waiting on the first stream only is intended.
RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id));
}
_stream_index = (_stream_index + 1) % _stream_per_node;
return Status::OK();
}
// Distributes the rows of `input_block` to tablets and writes each tablet's
// rows to its delta writer.
// Returns OK without doing anything for dry-run queries and for empty blocks.
// The `eos` parameter is not read in this function.
Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_block, bool eos) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
if (state->query_options().dry_run_query) {
return status;
}
auto input_rows = input_block->rows();
auto input_bytes = input_block->bytes();
if (UNLIKELY(input_rows == 0)) {
return status;
}
SCOPED_TIMER(_profile->total_time_counter());
_number_input_rows += input_rows;
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(input_rows);
state->update_num_bytes_load_total(input_bytes);
DorisMetrics::instance()->load_rows->increment(input_rows);
DorisMetrics::instance()->load_bytes->increment(input_bytes);
bool has_filtered_rows = false;
int64_t filtered_rows = 0;
SCOPED_RAW_TIMER(&_send_data_ns);
// This is just for passing compilation.
_row_distribution_watch.start();
// `block` receives the block produced by row distribution; it is what gets written below.
std::shared_ptr<vectorized::Block> block;
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
*input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids));
RowsForTablet rows_for_tablet;
_generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
_row_distribution_watch.stop();
// For each tablet, send its input_rows from block to delta writer
for (const auto& [tablet_id, rows] : rows_for_tablet) {
Streams streams;
RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, rows.index_id, streams));
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
}
return Status::OK();
}
// Writes the rows selected by `rows.row_idxes` from `block` into the delta
// writer of `tablet_id`. The writer is fetched from `_delta_writer_for_tablet`
// and opened on first use with a request built from this sink's settings and
// the given `streams`.
Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> block,
                                         int64_t tablet_id, const Rows& rows,
                                         const Streams& streams) {
    DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
        WriteRequest req {
                .tablet_id = tablet_id,
                .txn_id = _txn_id,
                .index_id = rows.index_id,
                .partition_id = rows.partition_id,
                .load_id = _load_id,
                .tuple_desc = _output_tuple_desc,
                .table_schema_param = _schema.get(),
                .is_high_priority = _is_high_priority,
                .write_file_cache = _write_file_cache,
        };
        // Take slots and schema hash from the first schema index whose id matches.
        for (auto& index : _schema->indexes()) {
            if (index->index_id != rows.index_id) {
                continue;
            }
            req.slots = &index->slots;
            req.schema_hash = index->schema_hash;
            break;
        }
        return DeltaWriterV2::open(&req, streams);
    });

    {
        // Time spent in the memtable memory limiter is accounted separately.
        SCOPED_TIMER(_wait_mem_limit_timer);
        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
    }

    SCOPED_TIMER(_write_memtable_timer);
    return delta_writer->write(block.get(), rows.row_idxes, false);
}
Status VOlapTableSinkV2::_cancel(Status status) {
LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", due to error: " << status;
if (_delta_writer_for_tablet) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
for (const auto& [_, streams] : _streams_for_node) {
streams->release();
}
RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
RETURN_IF_ERROR(_writer->init_properties(_pool, _group_commit));
return Status::OK();
}
@ -457,97 +57,8 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return _close_status;
}
SCOPED_TIMER(_close_timer);
Status status = exec_status;
if (status.ok()) {
// only if status is ok can we call this _profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that _profile is null
SCOPED_TIMER(_profile->total_time_counter());
COUNTER_SET(_input_rows_counter, _number_input_rows);
COUNTER_SET(_output_rows_counter, _number_output_rows);
COUNTER_SET(_filtered_rows_counter,
_block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
// release streams from the pool first, to prevent memory leak
for (const auto& [_, streams] : _streams_for_node) {
streams->release();
}
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
_delta_writer_for_tablet.reset();
}
{
// send CLOSE_LOAD to all streams, return ERROR if any
for (const auto& [_, streams] : _streams_for_node) {
RETURN_IF_ERROR(_close_load(streams->streams()));
}
}
{
SCOPED_TIMER(_close_load_timer);
for (const auto& [_, streams] : _streams_for_node) {
for (const auto& stream : streams->streams()) {
RETURN_IF_ERROR(stream->close_wait());
}
}
}
std::vector<TTabletCommitInfo> tablet_commit_infos;
for (const auto& [node_id, streams] : _streams_for_node) {
for (const auto& stream : streams->streams()) {
for (auto tablet_id : stream->success_tablets()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;
commit_info.backendId = node_id;
tablet_commit_infos.emplace_back(std::move(commit_info));
}
}
}
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
std::make_move_iterator(tablet_commit_infos.end()));
_streams_for_node.clear();
// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
state->num_rows_load_unselected();
state->set_num_rows_load_total(num_rows_load_total);
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
_tablet_finder->num_filtered_rows());
state->update_num_rows_load_unselected(
_tablet_finder->num_immutable_partition_filtered_rows());
LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id;
} else {
RETURN_IF_ERROR(_cancel(status));
}
_close_status = status;
RETURN_IF_ERROR(DataSink::close(state, exec_status));
return status;
}
Status VOlapTableSinkV2::_close_load(const Streams& streams) {
auto node_id = streams[0]->dst_id();
std::vector<PTabletID> tablets_to_commit;
for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
tablets_to_commit.push_back(tablet);
}
}
for (const auto& stream : streams) {
RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
}
return Status::OK();
_close_status = AsyncWriterSink::close(state, exec_status);
return _close_status;
}
} // namespace vectorized

View File

@ -26,204 +26,46 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <cstddef>
#include <cstdint>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/tablet_info.h"
#include "gutil/ref_counted.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "util/countdown_latch.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/common/allocator.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/sink/vrow_distribution.h"
#include "vec/sink/async_writer_sink.h"
#include "vec/sink/writer/vtablet_writer_v2.h"
namespace doris {
class DeltaWriterV2;
class LoadStreamStub;
class ObjectPool;
class RowDescriptor;
class RuntimeState;
class TDataSink;
class TExpr;
class TabletSchema;
class TupleDescriptor;
namespace stream_load {
class LoadStreams;
}
namespace vectorized {
class OlapTableBlockConvertor;
class OlapTabletFinder;
class VOlapTableSinkV2;
class DeltaWriterV2Map;
inline constexpr char VOLAP_TABLE_SINK_V2[] = "VOlapTableSinkV2";
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
// Rows of one block that are destined for a single tablet.
// The ids default to -1 so that a default-constructed value never carries
// indeterminate ids.
struct Rows {
    // Partition the tablet belongs to.
    int64_t partition_id = -1;
    // Id of the schema index the rows are written for.
    int64_t index_id = -1;
    // Row ids within the block, in the order they were added.
    std::vector<int32_t> row_idxes;
};
using RowsForTablet = std::unordered_map<int64_t, Rows>;
// Write block data to Olap Table.
// When OlapTableSink::open() is called, a consumer thread starts running in the background.
// When you call VOlapTableSinkV2::send(), you are the producer that produces pending batches.
// The consumer thread is joined in close().
// Sink that writes blocks to an OLAP table.
// NOTE(review): this span contains two class heads, two constructor parameter tails, two
// close() overrides and two _close_status members. It looks like the removed and the added
// lines of a diff shown together -- confirm against the real header before relying on it.
class VOlapTableSinkV2 final : public DataSink {
class VOlapTableSinkV2 final : public AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2> {
public:
// Construct from thrift struct which is generated by FE.
VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status);
const std::vector<TExpr>& texprs, bool group_commit);
~VOlapTableSinkV2() override;
// Initializes the sink from the thrift sink description.
Status init(const TDataSink& sink) override;
// TODO: unify the code of prepare/open/close with result sink
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status close_status) override;
// Sends one block to the table; `eos` defaults to false.
Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
// Takes the result of a partition-creation request.
Status on_partitions_created(TCreatePartitionResult* result);
Status close(RuntimeState* state, Status exec_status) override;
private:
Status _init_row_distribution();
Status _open_streams(int64_t src_id);
Status _open_streams_to_backend(int64_t dst_id, ::doris::stream_load::LoadStreams& streams);
Status _incremental_open_streams(const std::vector<TOlapTablePartition>& partitions);
void _build_tablet_node_mapping();
void _generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids,
RowsForTablet& rows_for_tablet);
Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
const Rows& rows, const Streams& streams);
Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
Streams& streams);
Status _close_load(const Streams& streams);
Status _cancel(Status status);
std::shared_ptr<MemTracker> _mem_tracker;
// NOTE(review): unlike the other pointer members, _pool has no default initializer.
ObjectPool* _pool;
// unique load id
PUniqueId _load_id;
int64_t _txn_id = -1;
int _num_replicas = -1;
int _tuple_desc_id = -1;
bool _group_commit = false;
// this is tuple descriptor of destination OLAP table
TupleDescriptor* _output_tuple_desc = nullptr;
RowDescriptor* _output_row_desc = nullptr;
// number of senders used to insert into OlapTable, if we only support single node insert,
// all data from select should be collected and then sent to OlapTable.
// To support multiple senders, we maintain a channel for each sender.
int _sender_id = -1;
int _num_senders = -1;
int64_t _backend_id = -1;
int _stream_per_node = -1;
int _total_streams = -1;
int _num_local_sink = -1;
bool _is_high_priority = false;
bool _write_file_cache = false;
// TODO(zc): think about cache this data
std::shared_ptr<OlapTableSchemaParam> _schema;
OlapTableLocationParam* _location = nullptr;
DorisNodesInfo* _nodes_info = nullptr;
std::unique_ptr<OlapTabletFinder> _tablet_finder;
std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
// Stats for this
int64_t _send_data_ns = 0;
int64_t _number_input_rows = 0;
int64_t _number_output_rows = 0;
MonotonicStopWatch _row_distribution_watch;
RuntimeProfile::Counter* _input_rows_counter = nullptr;
RuntimeProfile::Counter* _output_rows_counter = nullptr;
RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
RuntimeProfile::Counter* _send_data_timer = nullptr;
RuntimeProfile::Counter* _row_distribution_timer = nullptr;
RuntimeProfile::Counter* _write_memtable_timer = nullptr;
RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
RuntimeProfile::Counter* _validate_data_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _close_writer_timer = nullptr;
RuntimeProfile::Counter* _close_load_timer = nullptr;
RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
// Save the status of close() method
Status _close_status;
VOlapTablePartitionParam* _vpartition = nullptr;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
RuntimeState* _state = nullptr;
std::unordered_set<int64_t> _opened_partitions;
// Keyed by node id, then by tablet id.
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
std::unordered_map<int64_t, std::shared_ptr<::doris::stream_load::LoadStreams>>
_streams_for_node;
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
VRowDistribution _row_distribution;
// reuse to avoid frequent memory allocation and release.
std::vector<RowPartTabletIds> _row_part_tablet_ids;
Status _close_status = Status::OK();
};
} // namespace vectorized

View File

@ -0,0 +1,541 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/writer/vtablet_writer_v2.h"
#include <brpc/uri.h>
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <execution>
#include <mutex>
#include <ranges>
#include <string>
#include <unordered_map>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/object_pool.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "olap/delta_writer_v2.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
namespace doris {
namespace vectorized {
// Creates a writer from the FE-generated thrift sink description.
// `t_sink` is stored in _t_sink and read later by _init(); `output_exprs` is handed to the
// AsyncResultWriter base.
VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs)
        : AsyncResultWriter(output_exprs),
          _t_sink(t_sink) {
    // Debug-only sanity check: the olap_table_sink member must be present.
    DCHECK(t_sink.__isset.olap_table_sink);
}
// Defaulted destructor: no explicit teardown is performed here.
VTabletWriterV2::~VTabletWriterV2() = default;
// Integrates the result of a partition-creation request into this writer:
// registers the new tablet locations and nodes, then opens streams for the new partitions.
// Returns the status of the incremental stream opening.
Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) {
    // Per the original note the locations are used by address, so the copy is placed in the
    // object pool to keep it alive.
    auto* pooled_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets));
    _location->add_locations(*pooled_locations);

    // Make the nodes hosting the new tablets known.
    _nodes_info->add_nodes(result->nodes);

    // Open streams for whatever the new partitions require.
    return _incremental_open_streams(result->partitions);
}
// Free-function trampoline: `writer` is cast back to VTabletWriterV2 and the call is forwarded
// to its member function. Registered as the row-distribution callback together with `caller`.
static Status on_partitions_created(void* writer, TCreatePartitionResult* result) {
    auto* self = static_cast<VTabletWriterV2*>(writer);
    return self->on_partitions_created(result);
}
// Extends the tablet/node bookkeeping for newly created partitions and opens streams to every
// backend that has no entry in _streams_for_node yet.
//
// Mirrors _build_tablet_node_mapping() for the given partitions. Indexes do not change when a
// partition is created, so they are orthogonal to partitions.
//
// Returns an error if a partition cannot be generated, if a tablet has no known location, or
// if opening streams to a new backend fails.
Status VTabletWriterV2::_incremental_open_streams(
        const std::vector<TOlapTablePartition>& partitions) {
    std::unordered_set<int64_t> known_indexes;
    std::unordered_set<int64_t> new_backends;
    for (const auto& t_partition : partitions) {
        VOlapTablePartition* partition = nullptr;
        RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition));
        for (const auto& index : partition->indexes) {
            for (const auto& tablet_id : index.tablets) {
                // find_tablet() may return nullptr (see _select_streams); report it instead
                // of dereferencing a null pointer.
                auto location = _location->find_tablet(tablet_id);
                if (location == nullptr) {
                    return Status::InternalError("unknown tablet location, tablet id = {}",
                                                 tablet_id);
                }
                // Iterate the node list in place; no copy is needed.
                for (const auto& node : location->node_ids) {
                    PTabletID tablet;
                    tablet.set_partition_id(partition->id);
                    tablet.set_index_id(index.index_id);
                    tablet.set_tablet_id(tablet_id);
                    if (!_streams_for_node.contains(node)) {
                        new_backends.insert(node);
                    }
                    _tablets_for_node[node].emplace(tablet_id, tablet);
                    // Each index is recorded only once, on the first node that shows it.
                    if (known_indexes.contains(index.index_id)) [[likely]] {
                        continue;
                    }
                    _indexes_from_node[node].emplace_back(tablet);
                    known_indexes.insert(index.index_id);
                }
            }
        }
    }
    for (int64_t node_id : new_backends) {
        auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
                _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink);
        RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
        _streams_for_node[node_id] = load_streams;
    }
    return Status::OK();
}
// Fills a VRowDistributionContext from this writer's members, initializes _row_distribution
// with it and opens the distribution on the output row descriptor.
// Returns the status of VRowDistribution::open().
Status VTabletWriterV2::_init_row_distribution() {
    VRowDistributionContext dist_ctx;

    // Runtime handles and identifiers.
    dist_ctx.state = _state;
    dist_ctx.pool = _pool;
    dist_ctx.txn_id = _txn_id;
    dist_ctx.schema = _schema;

    // Partitioning / tablet lookup helpers.
    dist_ctx.vpartition = _vpartition;
    dist_ctx.location = _location;
    dist_ctx.tablet_finder = _tablet_finder.get();
    dist_ctx.block_convertor = _block_convertor.get();
    dist_ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs;

    // Profiling counter for partition-creation requests.
    dist_ctx.add_partition_request_timer = _add_partition_request_timer;

    // Callback fired when partitions are created; `caller` is passed back as its first argument.
    dist_ctx.on_partitions_created = &vectorized::on_partitions_created;
    dist_ctx.caller = static_cast<void*>(this);

    _row_distribution.init(&dist_ctx);
    return _row_distribution.open(_output_row_desc);
}
// Stores the object pool and the group-commit flag for later use. Always returns OK.
Status VTabletWriterV2::init_properties(ObjectPool* pool, bool group_commit) {
    _group_commit = group_commit;
    _pool = pool;
    return Status::OK();
}
// Reads the thrift sink description and the runtime state into this writer's members and
// creates every helper object and profile counter that the rest of the writer uses.
//
// state:   runtime state of the fragment instance (not owned).
// profile: profile handed in by the caller. NOTE(review): it is stored and then replaced by a
//          freshly created "VTabletWriterV2" profile a few lines below, so all counters land
//          on the new profile -- confirm whether that is intended.
//
// Returns an error if the schema or partition parameters fail to initialize, or if the
// destination tuple descriptor is unknown.
Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
    auto& table_sink = _t_sink.olap_table_sink;
    _load_id.set_hi(table_sink.load_id.hi);
    _load_id.set_lo(table_sink.load_id.lo);
    signal::set_signal_task_id(_load_id);
    _txn_id = table_sink.txn_id;
    _num_replicas = table_sink.num_replicas;
    _tuple_desc_id = table_sink.tuple_id;
    _write_file_cache = table_sink.write_file_cache;
    _schema = std::make_shared<OlapTableSchemaParam>();
    RETURN_IF_ERROR(_schema->init(table_sink.schema));
    _location = _pool->add(new OlapTableLocationParam(table_sink.location));
    _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));

    // if distributed column list is empty, we can ensure that tablet is with random distribution info
    // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition
    // for the whole olap table sink
    auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
    if (table_sink.partition.distributed_columns.empty()) {
        if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
        } else {
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
        }
    }
    _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
    RETURN_IF_ERROR(_vpartition->init());

    _state = state;
    _profile = profile;
    _sender_id = state->per_fragment_instance_idx();
    _num_senders = state->num_per_fragment_instances();
    _backend_id = state->backend_id();
    _stream_per_node = state->load_stream_per_node();
    _total_streams = state->total_load_streams();
    _num_local_sink = state->num_local_sink();
    DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0";
    DCHECK(_total_streams > 0) << "total load streams should be greator than 0";
    DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
    LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node
              << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
    _is_high_priority =
            (state->execution_timeout() <= config::load_task_high_priority_threshold_second);

    // profile must add to state's object pool
    _profile = state->obj_pool()->add(new RuntimeProfile("VTabletWriterV2"));
    _mem_tracker =
            std::make_shared<MemTracker>("VTabletWriterV2:" + std::to_string(state->load_job_id()));
    SCOPED_TIMER(_profile->total_time_counter());
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    // get table's tuple descriptor
    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
    if (_output_tuple_desc == nullptr) {
        return Status::InternalError("unknown destination tuple descriptor, id = {}",
                                     _tuple_desc_id);
    }
    _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
                                        _state->batch_size());
    _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false));

    // add all counter
    _input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
    _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
    _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT);
    _send_data_timer = ADD_TIMER(_profile, "SendDataTime");
    _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime");
    _row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime");
    _write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime", "SendDataTime");
    _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
    _open_timer = ADD_TIMER(_profile, "OpenTime");
    _close_timer = ADD_TIMER(_profile, "CloseWaitTime");
    _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime");
    _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime");
    // This counter is handed to the row distribution in _init_row_distribution(); create it
    // here so that it is not passed on as nullptr and the measurement is not lost.
    _add_partition_request_timer = ADD_TIMER(_profile, "AddPartitionRequestTime");

    // Delta writers are either shared through the global pool or private to this writer.
    if (config::share_delta_writers) {
        _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
                _load_id, _num_local_sink);
    } else {
        _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
    }
    return Status::OK();
}
// Opens the writer: initializes all members via _init(), builds the tablet -> node mapping,
// opens the load streams originating from this backend and finally sets up row distribution.
// Returns the first error encountered, or the status of _init_row_distribution().
Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
    // _profile, _open_timer and _mem_tracker are created by _init(), so it must run first.
    RETURN_IF_ERROR(_init(state, profile));

    SCOPED_TIMER(_profile->total_time_counter());
    SCOPED_TIMER(_open_timer);
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    _build_tablet_node_mapping();
    RETURN_IF_ERROR(_open_streams(_backend_id));
    return _init_row_distribution();
}
// Gets (or creates) the load streams from `src_id` to every node present in
// _tablets_for_node, opens them and remembers them in _streams_for_node.
// Stops at and returns the first error.
Status VTabletWriterV2::_open_streams(int64_t src_id) {
    for (const auto& node_entry : _tablets_for_node) {
        const int64_t dst_id = node_entry.first;
        auto node_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *node_streams));
        _streams_for_node.insert_or_assign(dst_id, node_streams);
    }
    return Status::OK();
}
// Opens every stream in `streams` towards backend `dst_id`.
// Only the first stream is opened with the list from _indexes_from_node, so the tablet schema
// is requested once per backend; the remaining streams are opened with an empty tablet list.
// Returns InternalError if the node is unknown, otherwise the first failing open() status.
Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id,
                                                 ::doris::stream_load::LoadStreams& streams) {
    const auto* node_info = _nodes_info->find_node(dst_id);
    if (node_info == nullptr) {
        return Status::InternalError("Unknown node {} in tablet location", dst_id);
    }

    bool is_first_stream = true;
    for (auto& stream : streams.streams()) {
        if (is_first_stream) {
            is_first_stream = false;
            // get tablet schema from each backend only in the 1st stream
            const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
            RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
                                         *node_info, _txn_id, *_schema, tablets_for_schema,
                                         _total_streams, _state->enable_profile()));
        } else {
            // for the rest streams, open without getting tablet schema
            RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
                                         *node_info, _txn_id, *_schema, {}, _total_streams,
                                         _state->enable_profile()));
        }
    }
    return Status::OK();
}
void VTabletWriterV2::_build_tablet_node_mapping() {
std::unordered_set<int64_t> known_indexes;
for (const auto& partition : _vpartition->get_partitions()) {
for (const auto& index : partition->indexes) {
for (const auto& tablet_id : index.tablets) {
auto nodes = _location->find_tablet(tablet_id)->node_ids;
for (auto& node : nodes) {
PTabletID tablet;
tablet.set_partition_id(partition->id);
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
_tablets_for_node[node].emplace(tablet_id, tablet);
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
_indexes_from_node[node].emplace_back(tablet);
known_indexes.insert(index.index_id);
}
}
}
}
}
void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids,
RowsForTablet& rows_for_tablet) {
for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) {
auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
for (int i = 0; i < row_ids.size(); i++) {
auto& tablet_id = tablet_ids[i];
auto it = rows_for_tablet.find(tablet_id);
if (it == rows_for_tablet.end()) {
Rows rows;
rows.partition_id = partition_ids[i];
rows.index_id = _schema->indexes()[index_idx]->index_id;
rows.row_idxes.reserve(row_ids.size());
auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
it = tmp_it;
}
it->second.row_idxes.push_back(row_ids[i]);
_number_output_rows++;
}
}
}
// Picks, for every node that hosts `tablet_id`, the stream at the current _stream_index and
// appends it to `streams`; the tablet is also recorded in _tablets_for_node for that node.
// After each append the first selected stream is asked to wait for the tablet's schema.
// _stream_index then advances round-robin over _stream_per_node.
// Returns InternalError if the tablet location is unknown, or the wait_for_schema() error.
Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
                                        Streams& streams) {
    auto tablet_location = _location->find_tablet(tablet_id);
    if (tablet_location == nullptr) {
        return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
    }

    // The descriptor is the same for every replica node, so it is built once.
    PTabletID tablet_desc;
    tablet_desc.set_partition_id(partition_id);
    tablet_desc.set_index_id(index_id);
    tablet_desc.set_tablet_id(tablet_id);

    for (auto& node_id : tablet_location->node_ids) {
        _tablets_for_node[node_id].emplace(tablet_id, tablet_desc);
        const auto& node_streams = _streams_for_node.at(node_id)->streams();
        streams.emplace_back(node_streams.at(_stream_index));
        // NOTE(review): always waits on streams[0], not on the stream just added.
        RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id));
    }

    _stream_index = (_stream_index + 1) % _stream_per_node;
    return Status::OK();
}
// Writes one input block: updates load statistics, distributes the rows to tablets and writes
// each tablet's rows through its delta writer.
// Returns OK immediately for dry-run queries and for empty blocks; otherwise returns the first
// error from row distribution, stream selection or memtable writing.
Status VTabletWriterV2::append_block(Block& input_block) {
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
    if (_state->query_options().dry_run_query) {
        return Status::OK();
    }

    const auto input_rows = input_block.rows();
    const auto input_bytes = input_block.bytes();
    if (UNLIKELY(input_rows == 0)) {
        return Status::OK();
    }

    SCOPED_TIMER(_profile->total_time_counter());
    _number_input_rows += input_rows;

    // update incrementally so that FE can get the progress.
    // the real 'num_rows_load_total' will be set when sink being closed.
    _state->update_num_rows_load_total(input_rows);
    _state->update_num_bytes_load_total(input_bytes);
    DorisMetrics::instance()->load_rows->increment(input_rows);
    DorisMetrics::instance()->load_bytes->increment(input_bytes);

    // Output arguments of generate_rows_distribution(); they are not read afterwards here.
    bool has_filtered_rows = false;
    int64_t filtered_rows = 0;

    SCOPED_RAW_TIMER(&_send_data_ns);

    // Compute which rows go to which tablet; the stopwatch covers only this part.
    _row_distribution_watch.start();
    std::shared_ptr<vectorized::Block> block;
    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
            input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids));
    RowsForTablet rows_for_tablet;
    _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
    _row_distribution_watch.stop();

    // For each tablet, send its input_rows from block to delta writer
    for (const auto& tablet_entry : rows_for_tablet) {
        const auto& tablet_id = tablet_entry.first;
        const Rows& tablet_rows = tablet_entry.second;
        Streams tablet_streams;
        RETURN_IF_ERROR(_select_streams(tablet_id, tablet_rows.partition_id, tablet_rows.index_id,
                                        tablet_streams));
        RETURN_IF_ERROR(_write_memtable(block, tablet_id, tablet_rows, tablet_streams));
    }
    return Status::OK();
}
// Writes the selected rows of `block` to the delta writer of `tablet_id`.
// The delta writer is looked up in _delta_writer_for_tablet; the creation callback passed to
// get_or_create() builds a WriteRequest for the tablet and opens the writer on `streams`.
// Before writing, the memtable memory limiter gets a chance to flush.
// Returns the status of DeltaWriterV2::write().
Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
                                        const Rows& rows, const Streams& streams) {
    DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
        WriteRequest request {
                .tablet_id = tablet_id,
                .txn_id = _txn_id,
                .index_id = rows.index_id,
                .partition_id = rows.partition_id,
                .load_id = _load_id,
                .tuple_desc = _output_tuple_desc,
                .table_schema_param = _schema.get(),
                .is_high_priority = _is_high_priority,
                .write_file_cache = _write_file_cache,
        };
        // Take slots and schema hash from the schema index that matches the rows' index id.
        for (const auto& schema_index : _schema->indexes()) {
            if (schema_index->index_id != rows.index_id) {
                continue;
            }
            request.slots = &schema_index->slots;
            request.schema_hash = schema_index->schema_hash;
            break;
        }
        return DeltaWriterV2::open(&request, streams);
    });

    {
        // Time spent in the memory limiter's flush handling is accounted separately.
        SCOPED_TIMER(_wait_mem_limit_timer);
        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
    }

    SCOPED_TIMER(_write_memtable_timer);
    return delta_writer->write(block.get(), rows.row_idxes, false);
}
// Cancels the load: logs the reason, cancels and drops the delta writers (if any) and releases
// every stream set in _streams_for_node. Always returns OK.
Status VTabletWriterV2::_cancel(Status status) {
    LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
              << ", txn_id=" << _txn_id << ", due to error: " << status;

    if (_delta_writer_for_tablet != nullptr) {
        _delta_writer_for_tablet->cancel(status);
        _delta_writer_for_tablet = nullptr;
    }

    for (const auto& node_and_streams : _streams_for_node) {
        node_and_streams.second->release();
    }
    return Status::OK();
}
// Closes the writer. Idempotent: the first call does the work and stores its result; later
// calls return the stored status.
//
// exec_status: status of the execution so far. If it is OK the load is finished (counters are
//              published, delta writers and streams are closed, commit infos are reported to
//              the runtime state); otherwise the load is cancelled.
//
// Returns exec_status if it was an error, otherwise the first error met while finishing the
// load, or OK.
//
// Fix over the previous version: an error while finishing the load used to return early
// without marking the writer closed, so a second close() re-ran the sequence on
// half-torn-down state (streams already released, delta writer map possibly reset). The
// outcome is now always recorded.
Status VTabletWriterV2::close(Status exec_status) {
    std::lock_guard<std::mutex> close_lock(_close_mutex);
    if (_is_closed) {
        return _close_status;
    }
    SCOPED_TIMER(_close_timer);
    Status status = exec_status;
    if (status.ok()) {
        // The finishing sequence lives in a lambda so that every early error return still
        // falls through to the bookkeeping at the end of close().
        auto finish_load = [this]() -> Status {
            // only if status is ok can we call this _profile->total_time_counter().
            // if status is not ok, this sink may not be prepared, so that _profile is null
            SCOPED_TIMER(_profile->total_time_counter());

            COUNTER_SET(_input_rows_counter, _number_input_rows);
            COUNTER_SET(_output_rows_counter, _number_output_rows);
            COUNTER_SET(_filtered_rows_counter, _block_convertor->num_filtered_rows() +
                                                        _tablet_finder->num_filtered_rows());
            COUNTER_SET(_send_data_timer, _send_data_ns);
            COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
            COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());

            // release streams from the pool first, to prevent memory leak
            for (const auto& [_, streams] : _streams_for_node) {
                streams->release();
            }

            {
                SCOPED_TIMER(_close_writer_timer);
                // close all delta writers if this is the last user
                RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
                _delta_writer_for_tablet.reset();
            }

            {
                // send CLOSE_LOAD to all streams, return ERROR if any
                for (const auto& [_, streams] : _streams_for_node) {
                    RETURN_IF_ERROR(_close_load(streams->streams()));
                }
            }

            {
                SCOPED_TIMER(_close_load_timer);
                for (const auto& [_, streams] : _streams_for_node) {
                    for (const auto& stream : streams->streams()) {
                        RETURN_IF_ERROR(stream->close_wait());
                    }
                }
            }

            // Report every successfully written tablet, per backend, to the runtime state.
            std::vector<TTabletCommitInfo> tablet_commit_infos;
            for (const auto& [node_id, streams] : _streams_for_node) {
                for (const auto& stream : streams->streams()) {
                    for (auto tablet_id : stream->success_tablets()) {
                        TTabletCommitInfo commit_info;
                        commit_info.tabletId = tablet_id;
                        commit_info.backendId = node_id;
                        tablet_commit_infos.emplace_back(std::move(commit_info));
                    }
                }
            }
            _state->tablet_commit_infos().insert(
                    _state->tablet_commit_infos().end(),
                    std::make_move_iterator(tablet_commit_infos.begin()),
                    std::make_move_iterator(tablet_commit_infos.end()));
            _streams_for_node.clear();

            // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
            int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
                                          _state->num_rows_load_unselected();
            _state->set_num_rows_load_total(num_rows_load_total);
            _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
                                                  _tablet_finder->num_filtered_rows());
            _state->update_num_rows_load_unselected(
                    _tablet_finder->num_immutable_partition_filtered_rows());

            LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
                      << ", txn_id=" << _txn_id;
            return Status::OK();
        };
        // _cancel() is deliberately not called on failure here: the streams have already been
        // released above and _cancel() would release them a second time.
        status = finish_load();
    } else {
        RETURN_IF_ERROR(_cancel(status));
    }
    _is_closed = true;
    _close_status = status;
    return status;
}
// Sends close_load to every stream in `streams` (all streams towards one backend).
// The tablets to commit are the ones recorded for that backend in _tablets_for_node whose
// partition id is contained in _tablet_finder->partition_ids().
// Returns OK for an empty stream list, otherwise the first failing close_load() status.
Status VTabletWriterV2::_close_load(const Streams& streams) {
    // Nothing to close; also avoids reading streams[0] of an empty vector.
    if (streams.empty()) {
        return Status::OK();
    }
    const auto node_id = streams[0]->dst_id();

    std::vector<PTabletID> tablets_to_commit;
    // find() instead of operator[] so that no empty entry is inserted for an unknown node.
    if (auto node_it = _tablets_for_node.find(node_id); node_it != _tablets_for_node.end()) {
        const auto& written_partitions = _tablet_finder->partition_ids();
        // Bind by reference: copying a PTabletID per iteration is unnecessary.
        for (const auto& tablet_entry : node_it->second) {
            const PTabletID& tablet = tablet_entry.second;
            if (written_partitions.contains(tablet.partition_id())) {
                tablets_to_commit.push_back(tablet);
            }
        }
    }

    for (const auto& stream : streams) {
        RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
    }
    return Status::OK();
}
} // namespace vectorized
} // namespace doris

View File

@ -0,0 +1,233 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <brpc/controller.h>
#include <bthread/types.h>
#include <butil/errno.h>
#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <atomic>
#include <cstddef>
#include <cstdint>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "gutil/ref_counted.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "util/countdown_latch.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/common/allocator.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/sink/vrow_distribution.h"
#include "vec/sink/writer/async_result_writer.h"
namespace doris {
class DeltaWriterV2;
class LoadStreamStub;
class ObjectPool;
class RowDescriptor;
class RuntimeState;
class TDataSink;
class TExpr;
class TabletSchema;
class TupleDescriptor;
namespace stream_load {
class LoadStreams;
}
namespace vectorized {
class OlapTableBlockConvertor;
class OlapTabletFinder;
class VTabletWriterV2;
class DeltaWriterV2Map;
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
// Rows of one block that are routed to a single tablet.
struct Rows {
    // Id of the partition that owns the tablet. Defaults to 0 so that a default-constructed
    // Rows never carries an indeterminate value (same result as value-initialization).
    int64_t partition_id = 0;
    // Id of the index (rollup / materialized view) the tablet belongs to.
    int64_t index_id = 0;
    // Row positions inside the block that must be written to the tablet.
    std::vector<int32_t> row_idxes;
};
using RowsForTablet = std::unordered_map<int64_t, Rows>;
// Write block data to Olap Table.
// When OlapTableSink::open() is called, there will be a consumer thread running in the background.
// When you call VTabletWriterV2::send(), you will be the producer who produces pending batches.
// Join the consumer thread in close().
class VTabletWriterV2 final : public AsyncResultWriter {
public:
    // Construct from thrift struct which is generated by FE.
    VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);

    ~VTabletWriterV2() override;

    // Stores the object pool and the group-commit flag. Must be called before open(), which
    // allocates from the pool.
    Status init_properties(ObjectPool* pool, bool group_commit);

    // Distributes the rows of `block` to tablets and writes them through delta writers.
    Status append_block(Block& block) override;

    // Initializes the writer from the thrift description and `state`, opens the load streams
    // and sets up row distribution.
    Status open(RuntimeState* state, RuntimeProfile* profile) override;

    // Finishes the load if `close_status` is OK, cancels it otherwise. Repeated calls return
    // the status stored by the first one.
    Status close(Status close_status) override;

    // Registers tablets and nodes of newly created partitions and opens streams for them.
    Status on_partitions_created(TCreatePartitionResult* result);

private:
    Status _init_row_distribution();

    Status _init(RuntimeState* state, RuntimeProfile* profile);

    Status _open_streams(int64_t src_id);

    Status _open_streams_to_backend(int64_t dst_id, ::doris::stream_load::LoadStreams& streams);

    Status _incremental_open_streams(const std::vector<TOlapTablePartition>& partitions);

    void _build_tablet_node_mapping();

    void _generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                   RowsForTablet& rows_for_tablet);

    Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
                           const Rows& rows, const Streams& streams);

    Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
                           Streams& streams);

    Status _close_load(const Streams& streams);

    Status _cancel(Status status);

    std::shared_ptr<MemTracker> _mem_tracker;

    // Copy of the thrift sink description; read in _init().
    TDataSink _t_sink;

    // Not owned, set by init_properties(). Initialized to nullptr like every other pointer
    // member so that it never holds an indeterminate value.
    ObjectPool* _pool = nullptr;

    // unique load id
    PUniqueId _load_id;
    int64_t _txn_id = -1;
    int _num_replicas = -1;
    int _tuple_desc_id = -1;

    // this is tuple descriptor of destination OLAP table
    TupleDescriptor* _output_tuple_desc = nullptr;
    RowDescriptor* _output_row_desc = nullptr;

    // number of senders used to insert into OlapTable, if we only support single node insert,
    // all data from select should be collected and then sent to OlapTable.
    // To support multiple senders, we maintain a channel for each sender.
    int _sender_id = -1;
    int _num_senders = -1;

    int64_t _backend_id = -1;
    int _stream_per_node = -1;
    int _total_streams = -1;
    int _num_local_sink = -1;
    bool _is_high_priority = false;
    bool _write_file_cache = false;

    // TODO(zc): think about cache this data
    std::shared_ptr<OlapTableSchemaParam> _schema;
    OlapTableLocationParam* _location = nullptr;
    DorisNodesInfo* _nodes_info = nullptr;

    std::unique_ptr<OlapTabletFinder> _tablet_finder;

    std::unique_ptr<OlapTableBlockConvertor> _block_convertor;

    // Stats for this
    int64_t _send_data_ns = 0;
    int64_t _number_input_rows = 0;
    int64_t _number_output_rows = 0;

    MonotonicStopWatch _row_distribution_watch;

    RuntimeProfile::Counter* _input_rows_counter = nullptr;
    RuntimeProfile::Counter* _output_rows_counter = nullptr;
    RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
    RuntimeProfile::Counter* _send_data_timer = nullptr;
    RuntimeProfile::Counter* _row_distribution_timer = nullptr;
    RuntimeProfile::Counter* _write_memtable_timer = nullptr;
    RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
    RuntimeProfile::Counter* _validate_data_timer = nullptr;
    RuntimeProfile::Counter* _open_timer = nullptr;
    RuntimeProfile::Counter* _close_timer = nullptr;
    RuntimeProfile::Counter* _close_writer_timer = nullptr;
    RuntimeProfile::Counter* _close_load_timer = nullptr;
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;

    // Guards close(); _is_closed and _close_status are read and written under it.
    std::mutex _close_mutex;
    bool _is_closed = false;
    // Save the status of close() method
    Status _close_status;

    VOlapTablePartitionParam* _vpartition = nullptr;

    RuntimeState* _state = nullptr;     // not owned, set when open
    RuntimeProfile* _profile = nullptr; // not owned, set when open
    bool _group_commit = false;

    std::unordered_set<int64_t> _opened_partitions;

    // node id -> (tablet id -> tablet descriptor)
    std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node;
    // node id -> tablets passed to the first stream's open() for that node
    std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;

    // node id -> load streams towards that node
    std::unordered_map<int64_t, std::shared_ptr<::doris::stream_load::LoadStreams>>
            _streams_for_node;

    // Index of the stream picked next by _select_streams(); advances round-robin.
    size_t _stream_index = 0;
    std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;

    VRowDistribution _row_distribution;
    // reuse to avoid frequent memory allocation and release.
    std::vector<RowPartTabletIds> _row_part_tablet_ids;
};
} // namespace vectorized
} // namespace doris