From e29d8cb110849637fc7beca5b5d76b103b9c95bf Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 16 Nov 2023 15:00:55 +0800 Subject: [PATCH] [feature](move-memtable) support pipelineX in sink v2 (#27067) --- be/src/exec/data_sink.cpp | 8 +- .../exec/olap_table_sink_v2_operator.cpp | 50 ++ .../exec/olap_table_sink_v2_operator.h | 74 ++- be/src/pipeline/pipeline_x/operator.cpp | 3 + .../pipeline_x_fragment_context.cpp | 30 +- .../pipeline_x/pipeline_x_fragment_context.h | 3 + be/src/vec/sink/vtablet_sink_v2.cpp | 505 +--------------- be/src/vec/sink/vtablet_sink_v2.h | 178 +----- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 541 ++++++++++++++++++ be/src/vec/sink/writer/vtablet_writer_v2.h | 233 ++++++++ 10 files changed, 948 insertions(+), 677 deletions(-) create mode 100644 be/src/pipeline/exec/olap_table_sink_v2_operator.cpp create mode 100644 be/src/vec/sink/writer/vtablet_writer_v2.cpp create mode 100644 be/src/vec/sink/writer/vtablet_writer_v2.h diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index f849dc84a2..970e7a3a18 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -146,15 +146,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink break; } case TDataSinkType::OLAP_TABLE_SINK: { - Status status = Status::OK(); DCHECK(thrift_sink.__isset.olap_table_sink); if (state->query_options().enable_memtable_on_sink_node && !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { - sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status)); + sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false)); } else { sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false)); } - RETURN_IF_ERROR(status); break; } case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: { @@ -301,15 +299,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink break; } case TDataSinkType::OLAP_TABLE_SINK: { - Status status = Status::OK(); DCHECK(thrift_sink.__isset.olap_table_sink); if (state->query_options().enable_memtable_on_sink_node && !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { - sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status)); + sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false)); } else { sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false)); } - RETURN_IF_ERROR(status); break; } case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp new file mode 100644 index 0000000000..99efc1d752 --- /dev/null +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap_table_sink_v2_operator.h" + +#include "common/status.h" + +namespace doris::pipeline { + +OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() { + return std::make_shared(this, _sink); +} + +Status OlapTableSinkV2LocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + auto& p = _parent->cast(); + RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit)); + return Status::OK(); +} + +Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status exec_status) { + if (Base::_closed) { + return Status::OK(); + } + SCOPED_TIMER(_close_timer); + SCOPED_TIMER(exec_time_counter()); + if (_closed) { + return _close_status; + } + _close_status = Base::close(state, exec_status); + return _close_status; +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h index f280e856f0..5fb8f64dd3 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h @@ -18,6 +18,7 @@ #pragma once #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/sink/vtablet_sink_v2.h" namespace doris { @@ -41,9 +42,76 @@ public: bool can_write() override { return true; } // TODO: need use mem_limit }; -OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() { - return std::make_shared(this, _sink); -} +class OlapTableSinkV2OperatorX; + +class OlapTableSinkV2LocalState final + : public AsyncWriterSink { +public: + using Base = AsyncWriterSink; + using Parent = OlapTableSinkV2OperatorX; + ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState); + OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + + Status close(RuntimeState* state, Status exec_status) override; + friend class OlapTableSinkV2OperatorX; + +private: + Status _close_status = Status::OK(); +}; + +class OlapTableSinkV2OperatorX final : public DataSinkOperatorX { +public: + using Base = DataSinkOperatorX; + OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, + const std::vector& t_output_expr, bool group_commit) + : Base(operator_id, 0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _group_commit(group_commit), + _pool(pool) {}; + + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + return vectorized::VExpr::open(_output_vexpr_ctxs, state); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + return local_state.sink(state, in_block, source_state); + } + +private: + friend class OlapTableSinkV2LocalState; + template + friend class AsyncWriterSink; + const RowDescriptor& _row_desc; + vectorized::VExprContextSPtrs _output_vexpr_ctxs; + const std::vector& _t_output_expr; + const bool _group_commit; + ObjectPool* _pool; +}; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index f8430a5715..9e6df06da0 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -46,6 +46,7 @@ #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/olap_table_sink_operator.h" +#include "pipeline/exec/olap_table_sink_v2_operator.h" #include "pipeline/exec/partition_sort_sink_operator.h" #include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" @@ -544,6 +545,7 @@ DECLARE_OPERATOR_X(ResultSinkLocalState) DECLARE_OPERATOR_X(JdbcTableSinkLocalState) DECLARE_OPERATOR_X(ResultFileSinkLocalState) DECLARE_OPERATOR_X(OlapTableSinkLocalState) +DECLARE_OPERATOR_X(OlapTableSinkV2LocalState) DECLARE_OPERATOR_X(AnalyticSinkLocalState) DECLARE_OPERATOR_X(SortSinkLocalState) DECLARE_OPERATOR_X(LocalExchangeSinkLocalState) @@ -624,5 +626,6 @@ template class PipelineXLocalState; template class AsyncWriterSink; template class AsyncWriterSink; template class AsyncWriterSink; +template class AsyncWriterSink; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 200ac23504..7113989ee1 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -62,6 +62,7 @@ #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/olap_table_sink_operator.h" +#include "pipeline/exec/olap_table_sink_v2_operator.h" #include "pipeline/exec/partition_sort_sink_operator.h" #include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" @@ -268,9 +269,10 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData break; } case TDataSinkType::OLAP_TABLE_SINK: { - if (state->query_options().enable_memtable_on_sink_node) { - return Status::InternalError( - "Unsuported OLAP_TABLE_SINK with enable_memtable_on_sink_node "); + if (state->query_options().enable_memtable_on_sink_node && + !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { + _sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(), row_desc, + output_exprs, false)); } else { _sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), row_desc, output_exprs, false)); @@ -412,6 +414,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id); _runtime_states[i]->set_num_per_fragment_instances(request.num_senders); _runtime_states[i]->resize_op_id_to_local_state(max_operator_id()); + _runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node); + _runtime_states[i]->set_total_load_streams(request.total_load_streams); + _runtime_states[i]->set_num_local_sink(request.num_local_sink); std::map pipeline_id_to_task; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto task = std::make_unique( @@ -1005,4 +1010,23 @@ Status PipelineXFragmentContext::send_report(bool done) { std::placeholders::_2)}, shared_from_this()); } + +bool PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) { + OlapTableSchemaParam schema; + if (!schema.init(sink.schema).ok()) { + return false; + } + if (schema.is_partial_update()) { + return true; + } + for (const auto& index_schema : schema.indexes()) { + for (const auto& index : index_schema->indexes) { + if (index->index_type() == INVERTED) { + return true; + } + } + } + return false; +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 4d2a59277e..6fa91aedf1 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -145,6 +145,9 @@ private: const TPipelineFragmentParams& params, const RowDescriptor& row_desc, RuntimeState* state, DescriptorTbl& desc_tbl, PipelineId cur_pipeline_id); + + bool _has_inverted_index_or_partial_update(TOlapTableSink sink); + OperatorXPtr _root_op = nullptr; // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. std::vector>> _tasks; diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index ac9be0e7fb..9385bd9320 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -17,47 +17,22 @@ #include "vec/sink/vtablet_sink_v2.h" -#include -#include -#include #include #include -#include -#include -#include -#include -#include -#include #include -#include #include #include "common/compiler_util.h" // IWYU pragma: keep -#include "common/logging.h" #include "common/object_pool.h" -#include "common/signal_handler.h" #include "common/status.h" -#include "exec/tablet_info.h" #include "olap/delta_writer_v2.h" #include "runtime/descriptors.h" -#include "runtime/exec_env.h" #include "runtime/runtime_state.h" -#include "runtime/thread_context.h" -#include "service/brpc.h" -#include "util/brpc_client_cache.h" #include "util/doris_metrics.h" -#include "util/network_util.h" -#include "util/threadpool.h" -#include "util/thrift_util.h" -#include "util/uid_util.h" -#include "vec/core/block.h" -#include "vec/exprs/vexpr.h" #include "vec/sink/delta_writer_v2_pool.h" #include "vec/sink/load_stream_stub.h" #include "vec/sink/load_stream_stub_pool.h" -#include "vec/sink/vtablet_block_convertor.h" -#include "vec/sink/vtablet_finder.h" namespace doris { class TExpr; @@ -65,391 +40,16 @@ class TExpr; namespace vectorized { VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector& texprs, Status* status) - : DataSink(row_desc), _pool(pool) { - // From the thrift expressions create the real exprs. - *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs); - _name = "VOlapTableSinkV2"; -} + const std::vector& texprs, bool group_commit) + : AsyncWriterSink(row_desc, texprs), + _pool(pool), + _group_commit(group_commit) {} VOlapTableSinkV2::~VOlapTableSinkV2() = default; -Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) { - // add new tablet locations. it will use by address. so add to pool - auto* new_locations = _pool->add(new std::vector(result->tablets)); - _location->add_locations(*new_locations); - - // update new node info - _nodes_info->add_nodes(result->nodes); - - // incremental open stream - RETURN_IF_ERROR(_incremental_open_streams(result->partitions)); - - return Status::OK(); -} - -static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { - return static_cast(writer)->on_partitions_created(result); -} - -Status VOlapTableSinkV2::_incremental_open_streams( - const std::vector& partitions) { - // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. - std::unordered_set known_indexes; - std::unordered_set new_backends; - for (const auto& t_partition : partitions) { - VOlapTablePartition* partition = nullptr; - RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition)); - for (const auto& index : partition->indexes) { - for (const auto& tablet_id : index.tablets) { - auto nodes = _location->find_tablet(tablet_id)->node_ids; - for (auto& node : nodes) { - PTabletID tablet; - tablet.set_partition_id(partition->id); - tablet.set_index_id(index.index_id); - tablet.set_tablet_id(tablet_id); - if (!_streams_for_node.contains(node)) { - new_backends.insert(node); - } - _tablets_for_node[node].emplace(tablet_id, tablet); - if (known_indexes.contains(index.index_id)) [[likely]] { - continue; - } - _indexes_from_node[node].emplace_back(tablet); - known_indexes.insert(index.index_id); - } - } - } - } - for (int64_t node_id : new_backends) { - auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink); - RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams)); - _streams_for_node[node_id] = load_streams; - } - return Status::OK(); -} - -Status VOlapTableSinkV2::_init_row_distribution() { - VRowDistributionContext ctx; - - ctx.state = _state; - ctx.block_convertor = _block_convertor.get(); - ctx.tablet_finder = _tablet_finder.get(); - ctx.vpartition = _vpartition; - ctx.add_partition_request_timer = _add_partition_request_timer; - ctx.txn_id = _txn_id; - ctx.pool = _pool; - ctx.location = _location; - ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs; - ctx.on_partitions_created = &vectorized::on_partitions_created; - ctx.caller = (void*)this; - ctx.schema = _schema; - - _row_distribution.init(&ctx); - - RETURN_IF_ERROR(_row_distribution.open(_output_row_desc)); - - return Status::OK(); -} - Status VOlapTableSinkV2::init(const TDataSink& t_sink) { - DCHECK(t_sink.__isset.olap_table_sink); - auto& table_sink = t_sink.olap_table_sink; - _load_id.set_hi(table_sink.load_id.hi); - _load_id.set_lo(table_sink.load_id.lo); - _txn_id = table_sink.txn_id; - _num_replicas = table_sink.num_replicas; - _tuple_desc_id = table_sink.tuple_id; - _write_file_cache = table_sink.write_file_cache; - _schema.reset(new OlapTableSchemaParam()); - RETURN_IF_ERROR(_schema->init(table_sink.schema)); - _location = _pool->add(new OlapTableLocationParam(table_sink.location)); - _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); - - // if distributed column list is empty, we can ensure that tablet is with random distribution info - // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition - // for the whole olap table sink - auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; - if (table_sink.partition.distributed_columns.empty()) { - if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { - find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; - } else { - find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; - } - } - _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition)); - _tablet_finder = std::make_unique(_vpartition, find_tablet_mode); - RETURN_IF_ERROR(_vpartition->init()); - - return Status::OK(); -} - -Status VOlapTableSinkV2::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::prepare(state)); - - _state = state; - - _sender_id = state->per_fragment_instance_idx(); - _num_senders = state->num_per_fragment_instances(); - _backend_id = state->backend_id(); - _stream_per_node = state->load_stream_per_node(); - _total_streams = state->total_load_streams(); - _num_local_sink = state->num_local_sink(); - DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0"; - DCHECK(_total_streams > 0) << "total load streams should be greator than 0"; - DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0"; - LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node - << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink; - _is_high_priority = - (state->execution_timeout() <= config::load_task_high_priority_threshold_second); - - // profile must add to state's object pool - _profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2")); - _mem_tracker = std::make_shared("VOlapTableSinkV2:" + - std::to_string(state->load_job_id())); - SCOPED_TIMER(_profile->total_time_counter()); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - - // get table's tuple descriptor - _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); - if (_output_tuple_desc == nullptr) { - return Status::InternalError("unknown destination tuple descriptor, id = {}", - _tuple_desc_id); - } - _block_convertor = std::make_unique(_output_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); - _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); - - // add all counter - _input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); - _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT); - _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT); - _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); - _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime"); - _row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime"); - _write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime", "SendDataTime"); - _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); - _open_timer = ADD_TIMER(_profile, "OpenTime"); - _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); - _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime"); - _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime"); - - // Prepare the exprs to run. - RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); - if (config::share_delta_writers) { - _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create( - _load_id, _num_local_sink); - } else { - _delta_writer_for_tablet = std::make_shared(_load_id); - } - return Status::OK(); -} - -Status VOlapTableSinkV2::open(RuntimeState* state) { - // Prepare the exprs to run. - RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); - SCOPED_TIMER(_profile->total_time_counter()); - SCOPED_TIMER(_open_timer); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - signal::set_signal_task_id(_load_id); - - _build_tablet_node_mapping(); - RETURN_IF_ERROR(_open_streams(_backend_id)); - RETURN_IF_ERROR(_init_row_distribution()); - - return Status::OK(); -} - -Status VOlapTableSinkV2::_open_streams(int64_t src_id) { - for (auto& [dst_id, _] : _tablets_for_node) { - auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, src_id, dst_id, _stream_per_node, _num_local_sink); - RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); - _streams_for_node[dst_id] = streams; - } - return Status::OK(); -} - -Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id, - ::doris::stream_load::LoadStreams& streams) { - auto node_info = _nodes_info->find_node(dst_id); - if (node_info == nullptr) { - return Status::InternalError("Unknown node {} in tablet location", dst_id); - } - // get tablet schema from each backend only in the 1st stream - for (auto& stream : streams.streams() | std::ranges::views::take(1)) { - const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, - _txn_id, *_schema, tablets_for_schema, _total_streams, - _state->enable_profile())); - } - // for the rest streams, open without getting tablet schema - for (auto& stream : streams.streams() | std::ranges::views::drop(1)) { - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, - _txn_id, *_schema, {}, _total_streams, - _state->enable_profile())); - } - return Status::OK(); -} - -void VOlapTableSinkV2::_build_tablet_node_mapping() { - std::unordered_set known_indexes; - for (const auto& partition : _vpartition->get_partitions()) { - for (const auto& index : partition->indexes) { - for (const auto& tablet_id : index.tablets) { - auto nodes = _location->find_tablet(tablet_id)->node_ids; - for (auto& node : nodes) { - PTabletID tablet; - tablet.set_partition_id(partition->id); - tablet.set_index_id(index.index_id); - tablet.set_tablet_id(tablet_id); - _tablets_for_node[node].emplace(tablet_id, tablet); - if (known_indexes.contains(index.index_id)) [[likely]] { - continue; - } - _indexes_from_node[node].emplace_back(tablet); - known_indexes.insert(index.index_id); - } - } - } - } -} - -void VOlapTableSinkV2::_generate_rows_for_tablet(std::vector& row_part_tablet_ids, - RowsForTablet& rows_for_tablet) { - for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) { - auto& row_ids = row_part_tablet_ids[index_idx].row_ids; - auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids; - auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids; - - for (int i = 0; i < row_ids.size(); i++) { - auto& tablet_id = tablet_ids[i]; - auto it = rows_for_tablet.find(tablet_id); - if (it == rows_for_tablet.end()) { - Rows rows; - rows.partition_id = partition_ids[i]; - rows.index_id = _schema->indexes()[index_idx]->index_id; - rows.row_idxes.reserve(row_ids.size()); - auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows}); - it = tmp_it; - } - it->second.row_idxes.push_back(row_ids[i]); - _number_output_rows++; - } - } -} - -Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams) { - auto location = _location->find_tablet(tablet_id); - if (location == nullptr) { - return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); - } - for (auto& node_id : location->node_ids) { - PTabletID tablet; - tablet.set_partition_id(partition_id); - tablet.set_index_id(index_id); - tablet.set_tablet_id(tablet_id); - _tablets_for_node[node_id].emplace(tablet_id, tablet); - streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index)); - RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); - } - _stream_index = (_stream_index + 1) % _stream_per_node; - return Status::OK(); -} - -Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_block, bool eos) { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - Status status = Status::OK(); - - if (state->query_options().dry_run_query) { - return status; - } - - auto input_rows = input_block->rows(); - auto input_bytes = input_block->bytes(); - if (UNLIKELY(input_rows == 0)) { - return status; - } - SCOPED_TIMER(_profile->total_time_counter()); - _number_input_rows += input_rows; - // update incrementally so that FE can get the progress. - // the real 'num_rows_load_total' will be set when sink being closed. - state->update_num_rows_load_total(input_rows); - state->update_num_bytes_load_total(input_bytes); - DorisMetrics::instance()->load_rows->increment(input_rows); - DorisMetrics::instance()->load_bytes->increment(input_bytes); - - bool has_filtered_rows = false; - int64_t filtered_rows = 0; - - SCOPED_RAW_TIMER(&_send_data_ns); - // This is just for passing compilation. - _row_distribution_watch.start(); - - std::shared_ptr block; - RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( - *input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); - RowsForTablet rows_for_tablet; - _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet); - - _row_distribution_watch.stop(); - - // For each tablet, send its input_rows from block to delta writer - for (const auto& [tablet_id, rows] : rows_for_tablet) { - Streams streams; - RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, rows.index_id, streams)); - RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams)); - } - - return Status::OK(); -} - -Status VOlapTableSinkV2::_write_memtable(std::shared_ptr block, - int64_t tablet_id, const Rows& rows, - const Streams& streams) { - DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { - WriteRequest req { - .tablet_id = tablet_id, - .txn_id = _txn_id, - .index_id = rows.index_id, - .partition_id = rows.partition_id, - .load_id = _load_id, - .tuple_desc = _output_tuple_desc, - .table_schema_param = _schema.get(), - .is_high_priority = _is_high_priority, - .write_file_cache = _write_file_cache, - }; - for (auto& index : _schema->indexes()) { - if (index->index_id == rows.index_id) { - req.slots = &index->slots; - req.schema_hash = index->schema_hash; - break; - } - } - return DeltaWriterV2::open(&req, streams); - }); - { - SCOPED_TIMER(_wait_mem_limit_timer); - ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(); - } - SCOPED_TIMER(_write_memtable_timer); - auto st = delta_writer->write(block.get(), rows.row_idxes, false); - return st; -} - -Status VOlapTableSinkV2::_cancel(Status status) { - LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", due to error: " << status; - if (_delta_writer_for_tablet) { - _delta_writer_for_tablet->cancel(status); - _delta_writer_for_tablet.reset(); - } - for (const auto& [_, streams] : _streams_for_node) { - streams->release(); - } + RETURN_IF_ERROR(AsyncWriterSink::init(t_sink)); + RETURN_IF_ERROR(_writer->init_properties(_pool, _group_commit)); return Status::OK(); } @@ -457,97 +57,8 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { if (_closed) { return _close_status; } - SCOPED_TIMER(_close_timer); - Status status = exec_status; - if (status.ok()) { - // only if status is ok can we call this _profile->total_time_counter(). - // if status is not ok, this sink may not be prepared, so that _profile is null - SCOPED_TIMER(_profile->total_time_counter()); - - COUNTER_SET(_input_rows_counter, _number_input_rows); - COUNTER_SET(_output_rows_counter, _number_output_rows); - COUNTER_SET(_filtered_rows_counter, - _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows()); - COUNTER_SET(_send_data_timer, _send_data_ns); - COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); - COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); - - // release streams from the pool first, to prevent memory leak - for (const auto& [_, streams] : _streams_for_node) { - streams->release(); - } - - { - SCOPED_TIMER(_close_writer_timer); - // close all delta writers if this is the last user - RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile)); - _delta_writer_for_tablet.reset(); - } - - { - // send CLOSE_LOAD to all streams, return ERROR if any - for (const auto& [_, streams] : _streams_for_node) { - RETURN_IF_ERROR(_close_load(streams->streams())); - } - } - - { - SCOPED_TIMER(_close_load_timer); - for (const auto& [_, streams] : _streams_for_node) { - for (const auto& stream : streams->streams()) { - RETURN_IF_ERROR(stream->close_wait()); - } - } - } - - std::vector tablet_commit_infos; - for (const auto& [node_id, streams] : _streams_for_node) { - for (const auto& stream : streams->streams()) { - for (auto tablet_id : stream->success_tablets()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_id; - commit_info.backendId = node_id; - tablet_commit_infos.emplace_back(std::move(commit_info)); - } - } - } - state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), - std::make_move_iterator(tablet_commit_infos.begin()), - std::make_move_iterator(tablet_commit_infos.end())); - _streams_for_node.clear(); - - // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node - int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + - state->num_rows_load_unselected(); - state->set_num_rows_load_total(num_rows_load_total); - state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + - _tablet_finder->num_filtered_rows()); - state->update_num_rows_load_unselected( - _tablet_finder->num_immutable_partition_filtered_rows()); - - LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id; - } else { - RETURN_IF_ERROR(_cancel(status)); - } - - _close_status = status; - RETURN_IF_ERROR(DataSink::close(state, exec_status)); - return status; -} - -Status VOlapTableSinkV2::_close_load(const Streams& streams) { - auto node_id = streams[0]->dst_id(); - std::vector tablets_to_commit; - for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) { - if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { - tablets_to_commit.push_back(tablet); - } - } - for (const auto& stream : streams) { - RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); - } - return Status::OK(); + _close_status = AsyncWriterSink::close(state, exec_status); + return _close_status; } } // namespace vectorized diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 1f317420de..cef4659bdd 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -26,204 +26,46 @@ #include #include #include -#include -#include #include +#include +#include // IWYU pragma: no_include #include // IWYU pragma: keep -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include "common/config.h" #include "common/status.h" -#include "exec/data_sink.h" -#include "exec/tablet_info.h" -#include "gutil/ref_counted.h" #include "runtime/exec_env.h" -#include "runtime/memory/mem_tracker.h" -#include "runtime/thread_context.h" #include "runtime/types.h" -#include "util/countdown_latch.h" -#include "util/runtime_profile.h" -#include "util/stopwatch.hpp" -#include "vec/columns/column.h" -#include "vec/common/allocator.h" -#include "vec/common/hash_table/phmap_fwd_decl.h" -#include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" -#include "vec/sink/vrow_distribution.h" +#include "vec/sink/async_writer_sink.h" +#include "vec/sink/writer/vtablet_writer_v2.h" namespace doris { -class DeltaWriterV2; -class LoadStreamStub; -class ObjectPool; -class RowDescriptor; -class RuntimeState; -class TDataSink; -class TExpr; -class TabletSchema; -class TupleDescriptor; - -namespace stream_load { -class LoadStreams; -} namespace vectorized { -class OlapTableBlockConvertor; -class OlapTabletFinder; -class VOlapTableSinkV2; -class DeltaWriterV2Map; +inline constexpr char VOLAP_TABLE_SINK_V2[] = "VOlapTableSinkV2"; -using Streams = std::vector>; - -struct Rows { - int64_t partition_id; - int64_t index_id; - std::vector row_idxes; -}; - -using RowsForTablet = std::unordered_map; - -// Write block data to Olap Table. -// When OlapTableSink::open() called, there will be a consumer thread running in the background. -// When you call VOlapTableSinkV2::send(), you will be the producer who products pending batches. -// Join the consumer thread in close(). -class VOlapTableSinkV2 final : public DataSink { +class VOlapTableSinkV2 final : public AsyncWriterSink { public: // Construct from thrift struct which is generated by FE. VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector& texprs, Status* status); + const std::vector& texprs, bool group_commit); ~VOlapTableSinkV2() override; Status init(const TDataSink& sink) override; - // TODO: unify the code of prepare/open/close with result sink - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; - - Status close(RuntimeState* state, Status close_status) override; - - Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; - - Status on_partitions_created(TCreatePartitionResult* result); + Status close(RuntimeState* state, Status exec_status) override; private: - Status _init_row_distribution(); - - Status _open_streams(int64_t src_id); - - Status _open_streams_to_backend(int64_t dst_id, ::doris::stream_load::LoadStreams& streams); - - Status _incremental_open_streams(const std::vector& partitions); - - void _build_tablet_node_mapping(); - - void _generate_rows_for_tablet(std::vector& row_part_tablet_ids, - RowsForTablet& rows_for_tablet); - - Status _write_memtable(std::shared_ptr block, int64_t tablet_id, - const Rows& rows, const Streams& streams); - - Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams); - - Status _close_load(const Streams& streams); - - Status _cancel(Status status); - - std::shared_ptr _mem_tracker; - ObjectPool* _pool; - // unique load id - PUniqueId _load_id; - int64_t _txn_id = -1; - int _num_replicas = -1; - int _tuple_desc_id = -1; + bool _group_commit = false; - // this is tuple descriptor of destination OLAP table - TupleDescriptor* _output_tuple_desc = nullptr; - RowDescriptor* _output_row_desc = nullptr; - - // number of senders used to insert into OlapTable, if we only support single node insert, - // all data from select should collectted and then send to OlapTable. - // To support multiple senders, we maintain a channel for each sender. - int _sender_id = -1; - int _num_senders = -1; - int64_t _backend_id = -1; - int _stream_per_node = -1; - int _total_streams = -1; - int _num_local_sink = -1; - bool _is_high_priority = false; - bool _write_file_cache = false; - - // TODO(zc): think about cache this data - std::shared_ptr _schema; - OlapTableLocationParam* _location = nullptr; - DorisNodesInfo* _nodes_info = nullptr; - - std::unique_ptr _tablet_finder; - - std::unique_ptr _block_convertor; - - // Stats for this - int64_t _send_data_ns = 0; - int64_t _number_input_rows = 0; - int64_t _number_output_rows = 0; - - MonotonicStopWatch _row_distribution_watch; - - RuntimeProfile::Counter* _input_rows_counter = nullptr; - RuntimeProfile::Counter* _output_rows_counter = nullptr; - RuntimeProfile::Counter* _filtered_rows_counter = nullptr; - RuntimeProfile::Counter* _send_data_timer = nullptr; - RuntimeProfile::Counter* _row_distribution_timer = nullptr; - RuntimeProfile::Counter* _write_memtable_timer = nullptr; - RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr; - RuntimeProfile::Counter* _validate_data_timer = nullptr; - RuntimeProfile::Counter* _open_timer = nullptr; - RuntimeProfile::Counter* _close_timer = nullptr; - RuntimeProfile::Counter* _close_writer_timer = nullptr; - RuntimeProfile::Counter* _close_load_timer = nullptr; - RuntimeProfile::Counter* _add_partition_request_timer = nullptr; - - // Save the status of close() method - Status _close_status; - - VOlapTablePartitionParam* _vpartition = nullptr; - vectorized::VExprContextSPtrs _output_vexpr_ctxs; - - RuntimeState* _state = nullptr; - - std::unordered_set _opened_partitions; - - std::unordered_map> _tablets_for_node; - std::unordered_map> _indexes_from_node; - - std::unordered_map> - _streams_for_node; - - size_t _stream_index = 0; - std::shared_ptr _delta_writer_for_tablet; - - VRowDistribution _row_distribution; - // reuse to avoid frequent memory allocation and release. - std::vector _row_part_tablet_ids; + Status _close_status = Status::OK(); }; } // namespace vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp new file mode 100644 index 0000000000..7cf553fdda --- /dev/null +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -0,0 +1,541 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/vtablet_writer_v2.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/object_pool.h" +#include "common/signal_handler.h" +#include "common/status.h" +#include "exec/tablet_info.h" +#include "olap/delta_writer_v2.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "runtime/thread_context.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/doris_metrics.h" +#include "util/threadpool.h" +#include "util/thrift_util.h" +#include "util/uid_util.h" +#include "vec/core/block.h" +#include "vec/sink/delta_writer_v2_pool.h" +#include "vec/sink/load_stream_stub.h" +#include "vec/sink/load_stream_stub_pool.h" +#include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris { + +namespace vectorized { + +VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) + : AsyncResultWriter(output_exprs), _t_sink(t_sink) { + DCHECK(t_sink.__isset.olap_table_sink); +} + +VTabletWriterV2::~VTabletWriterV2() = default; + +Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) { + // add new tablet locations. it will use by address. so add to pool + auto* new_locations = _pool->add(new std::vector(result->tablets)); + _location->add_locations(*new_locations); + + // update new node info + _nodes_info->add_nodes(result->nodes); + + // incremental open stream + RETURN_IF_ERROR(_incremental_open_streams(result->partitions)); + + return Status::OK(); +} + +static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { + return static_cast(writer)->on_partitions_created(result); +} + +Status VTabletWriterV2::_incremental_open_streams( + const std::vector& partitions) { + // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. + std::unordered_set known_indexes; + std::unordered_set new_backends; + for (const auto& t_partition : partitions) { + VOlapTablePartition* partition = nullptr; + RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition)); + for (const auto& index : partition->indexes) { + for (const auto& tablet_id : index.tablets) { + auto nodes = _location->find_tablet(tablet_id)->node_ids; + for (auto& node : nodes) { + PTabletID tablet; + tablet.set_partition_id(partition->id); + tablet.set_index_id(index.index_id); + tablet.set_tablet_id(tablet_id); + if (!_streams_for_node.contains(node)) { + new_backends.insert(node); + } + _tablets_for_node[node].emplace(tablet_id, tablet); + if (known_indexes.contains(index.index_id)) [[likely]] { + continue; + } + _indexes_from_node[node].emplace_back(tablet); + known_indexes.insert(index.index_id); + } + } + } + } + for (int64_t node_id : new_backends) { + auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink); + RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams)); + _streams_for_node[node_id] = load_streams; + } + return Status::OK(); +} + +Status VTabletWriterV2::_init_row_distribution() { + VRowDistributionContext ctx; + + ctx.state = _state; + ctx.block_convertor = _block_convertor.get(); + ctx.tablet_finder = _tablet_finder.get(); + ctx.vpartition = _vpartition; + ctx.add_partition_request_timer = _add_partition_request_timer; + ctx.txn_id = _txn_id; + ctx.pool = _pool; + ctx.location = _location; + ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs; + ctx.on_partitions_created = &vectorized::on_partitions_created; + ctx.caller = (void*)this; + ctx.schema = _schema; + + _row_distribution.init(&ctx); + + RETURN_IF_ERROR(_row_distribution.open(_output_row_desc)); + + return Status::OK(); +} + +Status VTabletWriterV2::init_properties(ObjectPool* pool, bool group_commit) { + _pool = pool; + _group_commit = group_commit; + return Status::OK(); +} + +Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { + auto& table_sink = _t_sink.olap_table_sink; + _load_id.set_hi(table_sink.load_id.hi); + _load_id.set_lo(table_sink.load_id.lo); + signal::set_signal_task_id(_load_id); + _txn_id = table_sink.txn_id; + _num_replicas = table_sink.num_replicas; + _tuple_desc_id = table_sink.tuple_id; + _write_file_cache = table_sink.write_file_cache; + _schema.reset(new OlapTableSchemaParam()); + RETURN_IF_ERROR(_schema->init(table_sink.schema)); + _location = _pool->add(new OlapTableLocationParam(table_sink.location)); + _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); + + // if distributed column list is empty, we can ensure that tablet is with random distribution info + // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition + // for the whole olap table sink + auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; + if (table_sink.partition.distributed_columns.empty()) { + if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { + find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; + } else { + find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; + } + } + _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition)); + _tablet_finder = std::make_unique(_vpartition, find_tablet_mode); + RETURN_IF_ERROR(_vpartition->init()); + + _state = state; + _profile = profile; + + _sender_id = state->per_fragment_instance_idx(); + _num_senders = state->num_per_fragment_instances(); + _backend_id = state->backend_id(); + _stream_per_node = state->load_stream_per_node(); + _total_streams = state->total_load_streams(); + _num_local_sink = state->num_local_sink(); + DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0"; + DCHECK(_total_streams > 0) << "total load streams should be greator than 0"; + DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0"; + LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node + << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink; + _is_high_priority = + (state->execution_timeout() <= config::load_task_high_priority_threshold_second); + + // profile must add to state's object pool + _profile = state->obj_pool()->add(new RuntimeProfile("VTabletWriterV2")); + _mem_tracker = + std::make_shared("VTabletWriterV2:" + std::to_string(state->load_job_id())); + SCOPED_TIMER(_profile->total_time_counter()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + + // get table's tuple descriptor + _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); + if (_output_tuple_desc == nullptr) { + return Status::InternalError("unknown destination tuple descriptor, id = {}", + _tuple_desc_id); + } + _block_convertor = std::make_unique(_output_tuple_desc); + _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), + _state->batch_size()); + _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); + + // add all counter + _input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); + _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT); + _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT); + _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); + _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime"); + _row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", "SendDataTime"); + _write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime", "SendDataTime"); + _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); + _open_timer = ADD_TIMER(_profile, "OpenTime"); + _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); + _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime"); + _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime"); + + if (config::share_delta_writers) { + _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create( + _load_id, _num_local_sink); + } else { + _delta_writer_for_tablet = std::make_shared(_load_id); + } + return Status::OK(); +} + +Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { + RETURN_IF_ERROR(_init(state, profile)); + SCOPED_TIMER(_profile->total_time_counter()); + SCOPED_TIMER(_open_timer); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + + _build_tablet_node_mapping(); + RETURN_IF_ERROR(_open_streams(_backend_id)); + RETURN_IF_ERROR(_init_row_distribution()); + + return Status::OK(); +} + +Status VTabletWriterV2::_open_streams(int64_t src_id) { + for (auto& [dst_id, _] : _tablets_for_node) { + auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_id, src_id, dst_id, _stream_per_node, _num_local_sink); + RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); + _streams_for_node[dst_id] = streams; + } + return Status::OK(); +} + +Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, + ::doris::stream_load::LoadStreams& streams) { + const auto* node_info = _nodes_info->find_node(dst_id); + if (node_info == nullptr) { + return Status::InternalError("Unknown node {} in tablet location", dst_id); + } + // get tablet schema from each backend only in the 1st stream + for (auto& stream : streams.streams() | std::ranges::views::take(1)) { + const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + _txn_id, *_schema, tablets_for_schema, _total_streams, + _state->enable_profile())); + } + // for the rest streams, open without getting tablet schema + for (auto& stream : streams.streams() | std::ranges::views::drop(1)) { + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + _txn_id, *_schema, {}, _total_streams, + _state->enable_profile())); + } + return Status::OK(); +} + +void VTabletWriterV2::_build_tablet_node_mapping() { + std::unordered_set known_indexes; + for (const auto& partition : _vpartition->get_partitions()) { + for (const auto& index : partition->indexes) { + for (const auto& tablet_id : index.tablets) { + auto nodes = _location->find_tablet(tablet_id)->node_ids; + for (auto& node : nodes) { + PTabletID tablet; + tablet.set_partition_id(partition->id); + tablet.set_index_id(index.index_id); + tablet.set_tablet_id(tablet_id); + _tablets_for_node[node].emplace(tablet_id, tablet); + if (known_indexes.contains(index.index_id)) [[likely]] { + continue; + } + _indexes_from_node[node].emplace_back(tablet); + known_indexes.insert(index.index_id); + } + } + } + } +} + +void VTabletWriterV2::_generate_rows_for_tablet(std::vector& row_part_tablet_ids, + RowsForTablet& rows_for_tablet) { + for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) { + auto& row_ids = row_part_tablet_ids[index_idx].row_ids; + auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids; + auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids; + + for (int i = 0; i < row_ids.size(); i++) { + auto& tablet_id = tablet_ids[i]; + auto it = rows_for_tablet.find(tablet_id); + if (it == rows_for_tablet.end()) { + Rows rows; + rows.partition_id = partition_ids[i]; + rows.index_id = _schema->indexes()[index_idx]->index_id; + rows.row_idxes.reserve(row_ids.size()); + auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows}); + it = tmp_it; + } + it->second.row_idxes.push_back(row_ids[i]); + _number_output_rows++; + } + } +} + +Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, + Streams& streams) { + auto location = _location->find_tablet(tablet_id); + if (location == nullptr) { + return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); + } + for (auto& node_id : location->node_ids) { + PTabletID tablet; + tablet.set_partition_id(partition_id); + tablet.set_index_id(index_id); + tablet.set_tablet_id(tablet_id); + _tablets_for_node[node_id].emplace(tablet_id, tablet); + streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index)); + RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); + } + _stream_index = (_stream_index + 1) % _stream_per_node; + return Status::OK(); +} + +Status VTabletWriterV2::append_block(Block& input_block) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + Status status = Status::OK(); + + if (_state->query_options().dry_run_query) { + return status; + } + + auto input_rows = input_block.rows(); + auto input_bytes = input_block.bytes(); + if (UNLIKELY(input_rows == 0)) { + return status; + } + SCOPED_TIMER(_profile->total_time_counter()); + _number_input_rows += input_rows; + // update incrementally so that FE can get the progress. + // the real 'num_rows_load_total' will be set when sink being closed. + _state->update_num_rows_load_total(input_rows); + _state->update_num_bytes_load_total(input_bytes); + DorisMetrics::instance()->load_rows->increment(input_rows); + DorisMetrics::instance()->load_bytes->increment(input_bytes); + + bool has_filtered_rows = false; + int64_t filtered_rows = 0; + + SCOPED_RAW_TIMER(&_send_data_ns); + // This is just for passing compilation. + _row_distribution_watch.start(); + + std::shared_ptr block; + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); + RowsForTablet rows_for_tablet; + _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet); + + _row_distribution_watch.stop(); + + // For each tablet, send its input_rows from block to delta writer + for (const auto& [tablet_id, rows] : rows_for_tablet) { + Streams streams; + RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, rows.index_id, streams)); + RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams)); + } + + return Status::OK(); +} + +Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t tablet_id, + const Rows& rows, const Streams& streams) { + DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { + WriteRequest req { + .tablet_id = tablet_id, + .txn_id = _txn_id, + .index_id = rows.index_id, + .partition_id = rows.partition_id, + .load_id = _load_id, + .tuple_desc = _output_tuple_desc, + .table_schema_param = _schema.get(), + .is_high_priority = _is_high_priority, + .write_file_cache = _write_file_cache, + }; + for (auto& index : _schema->indexes()) { + if (index->index_id == rows.index_id) { + req.slots = &index->slots; + req.schema_hash = index->schema_hash; + break; + } + } + return DeltaWriterV2::open(&req, streams); + }); + { + SCOPED_TIMER(_wait_mem_limit_timer); + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(); + } + SCOPED_TIMER(_write_memtable_timer); + auto st = delta_writer->write(block.get(), rows.row_idxes, false); + return st; +} + +Status VTabletWriterV2::_cancel(Status status) { + LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id) + << ", txn_id=" << _txn_id << ", due to error: " << status; + if (_delta_writer_for_tablet) { + _delta_writer_for_tablet->cancel(status); + _delta_writer_for_tablet.reset(); + } + for (const auto& [_, streams] : _streams_for_node) { + streams->release(); + } + return Status::OK(); +} + +Status VTabletWriterV2::close(Status exec_status) { + std::lock_guard close_lock(_close_mutex); + if (_is_closed) { + return _close_status; + } + SCOPED_TIMER(_close_timer); + Status status = exec_status; + if (status.ok()) { + // only if status is ok can we call this _profile->total_time_counter(). + // if status is not ok, this sink may not be prepared, so that _profile is null + SCOPED_TIMER(_profile->total_time_counter()); + + COUNTER_SET(_input_rows_counter, _number_input_rows); + COUNTER_SET(_output_rows_counter, _number_output_rows); + COUNTER_SET(_filtered_rows_counter, + _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows()); + COUNTER_SET(_send_data_timer, _send_data_ns); + COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); + COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); + + // release streams from the pool first, to prevent memory leak + for (const auto& [_, streams] : _streams_for_node) { + streams->release(); + } + + { + SCOPED_TIMER(_close_writer_timer); + // close all delta writers if this is the last user + RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile)); + _delta_writer_for_tablet.reset(); + } + + { + // send CLOSE_LOAD to all streams, return ERROR if any + for (const auto& [_, streams] : _streams_for_node) { + RETURN_IF_ERROR(_close_load(streams->streams())); + } + } + + { + SCOPED_TIMER(_close_load_timer); + for (const auto& [_, streams] : _streams_for_node) { + for (const auto& stream : streams->streams()) { + RETURN_IF_ERROR(stream->close_wait()); + } + } + } + + std::vector tablet_commit_infos; + for (const auto& [node_id, streams] : _streams_for_node) { + for (const auto& stream : streams->streams()) { + for (auto tablet_id : stream->success_tablets()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_id; + commit_info.backendId = node_id; + tablet_commit_infos.emplace_back(std::move(commit_info)); + } + } + } + _state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(), + std::make_move_iterator(tablet_commit_infos.begin()), + std::make_move_iterator(tablet_commit_infos.end())); + _streams_for_node.clear(); + + // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node + int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() + + _state->num_rows_load_unselected(); + _state->set_num_rows_load_total(num_rows_load_total); + _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + + _tablet_finder->num_filtered_rows()); + _state->update_num_rows_load_unselected( + _tablet_finder->num_immutable_partition_filtered_rows()); + + LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id) + << ", txn_id=" << _txn_id; + } else { + RETURN_IF_ERROR(_cancel(status)); + } + + _is_closed = true; + _close_status = status; + return status; +} + +Status VTabletWriterV2::_close_load(const Streams& streams) { + auto node_id = streams[0]->dst_id(); + std::vector tablets_to_commit; + for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) { + if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { + tablets_to_commit.push_back(tablet); + } + } + for (const auto& stream : streams) { + RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); + } + return Status::OK(); +} + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h new file mode 100644 index 0000000000..d4ccf7b652 --- /dev/null +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -0,0 +1,233 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +// IWYU pragma: no_include +#include // IWYU pragma: keep +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "common/status.h" +#include "exec/tablet_info.h" +#include "gutil/ref_counted.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "runtime/thread_context.h" +#include "runtime/types.h" +#include "util/countdown_latch.h" +#include "util/runtime_profile.h" +#include "util/stopwatch.hpp" +#include "vec/columns/column.h" +#include "vec/common/allocator.h" +#include "vec/common/hash_table/phmap_fwd_decl.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/vrow_distribution.h" +#include "vec/sink/writer/async_result_writer.h" + +namespace doris { +class DeltaWriterV2; +class LoadStreamStub; +class ObjectPool; +class RowDescriptor; +class RuntimeState; +class TDataSink; +class TExpr; +class TabletSchema; +class TupleDescriptor; + +namespace stream_load { +class LoadStreams; +} + +namespace vectorized { + +class OlapTableBlockConvertor; +class OlapTabletFinder; +class VTabletWriterV2; +class DeltaWriterV2Map; + +using Streams = std::vector>; + +struct Rows { + int64_t partition_id; + int64_t index_id; + std::vector row_idxes; +}; + +using RowsForTablet = std::unordered_map; + +// Write block data to Olap Table. +// When OlapTableSink::open() called, there will be a consumer thread running in the background. +// When you call VTabletWriterV2::send(), you will be the producer who products pending batches. +// Join the consumer thread in close(). +class VTabletWriterV2 final : public AsyncResultWriter { +public: + // Construct from thrift struct which is generated by FE. + VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + + ~VTabletWriterV2() override; + + Status init_properties(ObjectPool* pool, bool group_commit); + + Status append_block(Block& block) override; + + Status open(RuntimeState* state, RuntimeProfile* profile) override; + + Status close(Status close_status) override; + + Status on_partitions_created(TCreatePartitionResult* result); + +private: + Status _init_row_distribution(); + + Status _init(RuntimeState* state, RuntimeProfile* profile); + + Status _open_streams(int64_t src_id); + + Status _open_streams_to_backend(int64_t dst_id, ::doris::stream_load::LoadStreams& streams); + + Status _incremental_open_streams(const std::vector& partitions); + + void _build_tablet_node_mapping(); + + void _generate_rows_for_tablet(std::vector& row_part_tablet_ids, + RowsForTablet& rows_for_tablet); + + Status _write_memtable(std::shared_ptr block, int64_t tablet_id, + const Rows& rows, const Streams& streams); + + Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, + Streams& streams); + + Status _close_load(const Streams& streams); + + Status _cancel(Status status); + + std::shared_ptr _mem_tracker; + + TDataSink _t_sink; + ObjectPool* _pool; + + // unique load id + PUniqueId _load_id; + int64_t _txn_id = -1; + int _num_replicas = -1; + int _tuple_desc_id = -1; + + // this is tuple descriptor of destination OLAP table + TupleDescriptor* _output_tuple_desc = nullptr; + RowDescriptor* _output_row_desc = nullptr; + + // number of senders used to insert into OlapTable, if we only support single node insert, + // all data from select should collectted and then send to OlapTable. + // To support multiple senders, we maintain a channel for each sender. + int _sender_id = -1; + int _num_senders = -1; + int64_t _backend_id = -1; + int _stream_per_node = -1; + int _total_streams = -1; + int _num_local_sink = -1; + bool _is_high_priority = false; + bool _write_file_cache = false; + + // TODO(zc): think about cache this data + std::shared_ptr _schema; + OlapTableLocationParam* _location = nullptr; + DorisNodesInfo* _nodes_info = nullptr; + + std::unique_ptr _tablet_finder; + + std::unique_ptr _block_convertor; + + // Stats for this + int64_t _send_data_ns = 0; + int64_t _number_input_rows = 0; + int64_t _number_output_rows = 0; + + MonotonicStopWatch _row_distribution_watch; + + RuntimeProfile::Counter* _input_rows_counter = nullptr; + RuntimeProfile::Counter* _output_rows_counter = nullptr; + RuntimeProfile::Counter* _filtered_rows_counter = nullptr; + RuntimeProfile::Counter* _send_data_timer = nullptr; + RuntimeProfile::Counter* _row_distribution_timer = nullptr; + RuntimeProfile::Counter* _write_memtable_timer = nullptr; + RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr; + RuntimeProfile::Counter* _validate_data_timer = nullptr; + RuntimeProfile::Counter* _open_timer = nullptr; + RuntimeProfile::Counter* _close_timer = nullptr; + RuntimeProfile::Counter* _close_writer_timer = nullptr; + RuntimeProfile::Counter* _close_load_timer = nullptr; + RuntimeProfile::Counter* _add_partition_request_timer = nullptr; + + std::mutex _close_mutex; + bool _is_closed = false; + // Save the status of close() method + Status _close_status; + + VOlapTablePartitionParam* _vpartition = nullptr; + + RuntimeState* _state = nullptr; // not owned, set when open + RuntimeProfile* _profile = nullptr; // not owned, set when open + bool _group_commit = false; + + std::unordered_set _opened_partitions; + + std::unordered_map> _tablets_for_node; + std::unordered_map> _indexes_from_node; + + std::unordered_map> + _streams_for_node; + + size_t _stream_index = 0; + std::shared_ptr _delta_writer_for_tablet; + + VRowDistribution _row_distribution; + // reuse to avoid frequent memory allocation and release. + std::vector _row_part_tablet_ids; +}; + +} // namespace vectorized +} // namespace doris