From 94a75c27e702b5d9065a126a8892d1cb02e8c35a Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Wed, 13 Mar 2024 11:29:03 +0800 Subject: [PATCH] [feature](pipelineX) support paritition tablet sink shuffle (#31689) --- be/src/io/fs/hdfs_file_system.h | 2 +- .../pipeline/exec/exchange_sink_operator.cpp | 142 ++++++++++++++++-- be/src/pipeline/exec/exchange_sink_operator.h | 34 ++++- be/src/pipeline/task_scheduler.cpp | 9 +- be/src/vec/functions/function_utility.cpp | 12 +- be/src/vec/sink/load_stream_stub.cpp | 22 ++- be/src/vec/sink/load_stream_stub.h | 2 +- be/src/vec/sink/vdata_stream_sender.cpp | 44 +++--- .../vec/sink/writer/async_result_writer.cpp | 7 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +- .../org/apache/doris/qe/SessionVariable.java | 2 +- .../test_memtable_flush_fault.groovy | 3 +- .../insert_into_table/no_partition.groovy | 11 -- 13 files changed, 226 insertions(+), 66 deletions(-) diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index c4e23f2010..db854cafa9 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -51,7 +51,7 @@ public: _invalid(false) {} ~HdfsFileSystemHandle() { - DCHECK(_ref_cnt == 0); + DCHECK(_ref_cnt == 0) << _ref_cnt; if (hdfs_fs != nullptr) { // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed" // even if we create a new one diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 1220230a34..333abdbcc0 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include "common/status.h" @@ -130,7 +131,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf ADD_CHILD_TIMER_WITH_LEVEL(_profile, "WaitForRpcBufferQueue", timer_name, 1); auto& p = _parent->cast(); - + _part_type = p._part_type; std::map fragment_id_to_channel_index; for (int i = 0; i < p._dests.size(); ++i) { const auto& fragment_instance_id = p._dests[i].fragment_instance_id; @@ -156,7 +157,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf local_size++; } } - if (p._part_type == TPartitionType::UNPARTITIONED || p._part_type == TPartitionType::RANDOM) { + if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) { std::random_device rd; std::mt19937 g(rd()); shuffle(channels.begin(), channels.end(), g); @@ -174,7 +175,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "ExchangeSinkQueueDependency", true, state->get_query_ctx()); _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); - if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && + if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { _broadcast_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), @@ -191,7 +192,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf size_t dep_id = 0; _local_channels_dependency.resize(local_size); _wait_channel_timer.resize(local_size); - for (auto channel : channels) { + for (auto* channel : channels) { if (channel->is_local()) { _local_channels_dependency[dep_id] = channel->get_local_channel_dependency(); DCHECK(_local_channels_dependency[dep_id] != nullptr); @@ -201,7 +202,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf } } } - if (p._part_type == TPartitionType::HASH_PARTITIONED) { + if (_part_type == TPartitionType::HASH_PARTITIONED) { _partition_count = channels.size(); _partitioner.reset( new vectorized::Crc32HashPartitioner(channels.size())); @@ -209,7 +210,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); _profile->add_info_string("Partitioner", fmt::format("Crc32HashPartitioner({})", _partition_count)); - } else if (p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = channel_shared_ptrs.size(); _partitioner.reset(new vectorized::Crc32HashPartitioner( channel_shared_ptrs.size())); @@ -217,6 +218,37 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); _profile->add_info_string("Partitioner", fmt::format("Crc32HashPartitioner({})", _partition_count)); + } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + _partition_count = channels.size(); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + _txn_id = p._tablet_sink_txn_id; + _schema = std::make_shared(); + RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema)); + _vpartition = std::make_unique(_schema, p._tablet_sink_partition); + RETURN_IF_ERROR(_vpartition->init()); + auto find_tablet_mode = vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; + _tablet_finder = + std::make_unique(_vpartition.get(), find_tablet_mode); + _tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id); + _tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); + //_block_convertor no need init_autoinc_info here + _block_convertor = + std::make_unique(_tablet_sink_tuple_desc); + _location = p._pool->add(new OlapTableLocationParam(p._tablet_sink_location)); + _row_distribution.init( + {.state = _state, + .block_convertor = _block_convertor.get(), + .tablet_finder = _tablet_finder.get(), + .vpartition = _vpartition.get(), + .add_partition_request_timer = _add_partition_request_timer, + .txn_id = _txn_id, + .pool = p._pool.get(), + .location = _location, + .vec_output_expr_ctxs = &_fake_expr_ctxs, + .schema = _schema, + .caller = (void*)this, + .create_partition_callback = &ExchangeSinkLocalState::empty_callback_function}); } _finish_dependency->block(); @@ -226,10 +258,31 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf Status ExchangeSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); - auto& p = _parent->cast(); - if (p._part_type == TPartitionType::HASH_PARTITIONED || - p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + if (_part_type == TPartitionType::HASH_PARTITIONED || + _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { RETURN_IF_ERROR(_partitioner->open(state)); + } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc)); + } + return Status::OK(); +} + +Status ExchangeSinkLocalState::_send_new_partition_batch() { + if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time + RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); + vectorized::Block tmp_block = + _row_distribution._batching_block->to_block(); // Borrow out, for lval ref + auto& p = _parent->cast(); + // these order is only. + // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. + // 2. deal batched block + // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. + _row_distribution.clear_batching_stats(); + RETURN_IF_ERROR(p.sink(_state, &tmp_block, false)); + // Recovery back + _row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns()); + _row_distribution._batching_block->clear_column_data(); + _row_distribution._deal_batched = false; } return Status::OK(); } @@ -255,14 +308,21 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _part_type(sink.output_partition.type), _dests(destinations), _dest_node_id(sink.dest_node_id), - _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) { + _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc), + _tablet_sink_schema(sink.tablet_sink_schema), + _tablet_sink_partition(sink.tablet_sink_partition), + _tablet_sink_location(sink.tablet_sink_location), + _tablet_sink_tuple_id(sink.tablet_sink_tuple_id), + _tablet_sink_txn_id(sink.tablet_sink_txn_id) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || + sink.output_partition.type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); _name = "ExchangeSinkOperatorX"; + _pool = std::make_shared(); } Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { @@ -300,7 +360,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block SCOPED_TIMER(local_state.exec_time_counter()); local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption()); bool all_receiver_eof = true; - for (auto channel : local_state.channels) { + for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { all_receiver_eof = false; break; @@ -317,7 +377,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (local_state.only_local_exchange) { if (!block->empty()) { Status status; - for (auto channel : local_state.channels) { + for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { status = channel->send_local_block(block); HANDLE_CHANNEL_STATUS(state, channel, status); @@ -342,7 +402,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } else { block_holder->get_block()->Clear(); } - for (auto channel : local_state.channels) { + for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { Status status; if (channel->is_local()) { @@ -398,6 +458,41 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block state, local_state.channel_shared_ptrs, local_state._partition_count, (uint32_t*)local_state._partitioner->get_channel_ids(), rows, block, eos)); } + } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + // check out of limit + RETURN_IF_ERROR(local_state._send_new_partition_batch()); + std::shared_ptr convert_block = std::make_shared(); + const auto& num_channels = local_state._partition_count; + std::vector> channel2rows; + channel2rows.resize(num_channels); + auto input_rows = block->rows(); + + if (input_rows > 0) { + bool has_filtered_rows = false; + int64_t filtered_rows = 0; + local_state._number_input_rows += input_rows; + + RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution( + *block, convert_block, filtered_rows, has_filtered_rows, + local_state._row_part_tablet_ids, local_state._number_input_rows)); + + const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids; + const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids; + for (int idx = 0; idx < row_ids.size(); ++idx) { + const auto& row = row_ids[idx]; + const auto& tablet_id_hash = + HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0); + channel2rows[tablet_id_hash % num_channels].emplace_back(row); + } + } + + if (eos) { + local_state._row_distribution._deal_batched = true; + RETURN_IF_ERROR(local_state._send_new_partition_batch()); + } + RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, + channel2rows, convert_block.get(), eos)); + } else { // Range partition // 1. calculate range @@ -463,13 +558,22 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch int num_channels, const HashValueType* __restrict channel_ids, int rows, vectorized::Block* block, bool eos) { - std::vector channel2rows[num_channels]; - + std::vector> channel2rows; + channel2rows.resize(num_channels); for (uint32_t i = 0; i < rows; i++) { channel2rows[channel_ids[i]].emplace_back(i); } - Status status; + RETURN_IF_ERROR( + channel_add_rows_with_idx(state, channels, num_channels, channel2rows, block, eos)); + return Status::OK(); +} + +template +Status ExchangeSinkOperatorX::channel_add_rows_with_idx( + RuntimeState* state, Channels& channels, int num_channels, + std::vector>& channel2rows, vectorized::Block* block, bool eos) { + Status status = Status::OK(); for (int i = 0; i < num_channels; ++i) { if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) { status = channels[i]->add_rows(block, channel2rows[i], false); @@ -502,6 +606,12 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + + _tablet_finder->num_filtered_rows()); + _state->update_num_rows_load_unselected( + _tablet_finder->num_immutable_partition_filtered_rows()); + } SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time()); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 4a2f1a3dfd..9384adbe5f 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -122,7 +122,10 @@ public: std::string name_suffix() override; segment_v2::CompressionTypePB compression_type() const; std::string debug_string(int indentation_level) const override; - + static Status empty_callback_function(void* sender, TCreatePartitionResult* result) { + return Status::OK(); + } + Status _send_new_partition_batch(); std::vector*> channels; std::vector>> channel_shared_ptrs; @@ -190,6 +193,22 @@ private: int _partition_count; std::shared_ptr _finish_dependency; + + // for shuffle data by partition and tablet + int64_t _txn_id = -1; + vectorized::VExprContextSPtrs _fake_expr_ctxs; + std::unique_ptr _vpartition = nullptr; + std::unique_ptr _tablet_finder = nullptr; + std::shared_ptr _schema = nullptr; + std::unique_ptr _block_convertor = nullptr; + TupleDescriptor* _tablet_sink_tuple_desc = nullptr; + RowDescriptor* _tablet_sink_row_desc = nullptr; + OlapTableLocationParam* _location = nullptr; + vectorized::VRowDistribution _row_distribution; + RuntimeProfile::Counter* _add_partition_request_timer = nullptr; + std::vector _row_part_tablet_ids; + int64_t _number_input_rows = 0; + TPartitionType::type _part_type; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { @@ -219,6 +238,11 @@ private: Status channel_add_rows(RuntimeState* state, Channels& channels, int num_channels, const HashValueType* channel_ids, int rows, vectorized::Block* block, bool eos); + + template + Status channel_add_rows_with_idx(RuntimeState* state, Channels& channels, int num_channels, + std::vector>& channel2rows, + vectorized::Block* block, bool eos); RuntimeState* _state = nullptr; const std::vector& _texprs; @@ -242,6 +266,14 @@ private: bool _transfer_large_data_by_brpc = false; segment_v2::CompressionTypePB _compression_type; + + // for tablet sink shuffle + const TOlapTableSchemaParam& _tablet_sink_schema; + const TOlapTablePartitionParam& _tablet_sink_partition; + const TOlapTableLocationParam& _tablet_sink_location; + const TTupleId& _tablet_sink_tuple_id; + int64_t _tablet_sink_txn_id = -1; + std::shared_ptr _pool; }; } // namespace pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index e30a5b2d26..3e03f3636f 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -237,8 +237,13 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status // for pending finish now. So that could call close directly. Status status = task->close(exec_status); if (!status.ok() && state != PipelineTaskState::CANCELED) { - task->query_context()->cancel(true, status.to_string(), - Status::Cancelled(status.to_string())); + if (task->is_pipelineX()) { //should call fragment context cancel, in it will call query context cancel + task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, + std::string(status.msg())); + } else { + task->query_context()->cancel(true, status.to_string(), + Status::Cancelled(status.to_string())); + } state = PipelineTaskState::CANCELED; } task->set_state(state); diff --git a/be/src/vec/functions/function_utility.cpp b/be/src/vec/functions/function_utility.cpp index 454e5afb8c..ea2cfe6615 100644 --- a/be/src/vec/functions/function_utility.cpp +++ b/be/src/vec/functions/function_utility.cpp @@ -70,13 +70,23 @@ public: bool use_default_implementation_for_nulls() const override { return false; } + bool use_default_implementation_for_constants() const override { return false; } + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, size_t input_rows_count) const override { ColumnPtr& argument_column = block.get_by_position(arguments[0]).column; auto res_column = ColumnUInt8::create(); - if (auto* nullable_column = check_and_get_column(*argument_column)) { + if (is_column_const(*argument_column)) { + Int64 seconds = argument_column->get_int(0); + for (int i = 0; i < input_rows_count; i++) { + std::this_thread::sleep_for(std::chrono::seconds(seconds)); + res_column->insert(1); + } + + block.replace_by_position(result, std::move(res_column)); + } else if (auto* nullable_column = check_and_get_column(*argument_column)) { auto null_map_column = ColumnUInt8::create(); auto nested_column = nullable_column->get_nested_column_ptr(); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index bb12f4fdf0..da0276fdd0 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -20,6 +20,7 @@ #include #include "olap/rowset/rowset_writer.h" +#include "runtime/query_context.h" #include "util/brpc_client_cache.h" #include "util/debug_points.h" #include "util/network_util.h" @@ -306,7 +307,7 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i return Status::OK(); } -Status LoadStreamStub::close_wait(int64_t timeout_ms) { +Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", { while (true) { }; @@ -321,11 +322,20 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) { DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0"; std::unique_lock lock(_close_mutex); if (!_is_closed.load()) { - int ret = _close_cv.wait_for(lock, timeout_ms * 1000); - if (ret != 0) { - return Status::InternalError( - "stream close_wait timeout, error={}, load_id={}, dst_id={}, stream_id={}", ret, - print_id(_load_id), _dst_id, _stream_id); + auto timeout_sec = timeout_ms / 1000; + while (!state->get_query_ctx()->is_cancelled() && timeout_sec > 0) { + //the query maybe cancel, so need check after wait 1s + timeout_sec = timeout_sec - 1; + int ret = _close_cv.wait_for(lock, 1000000); + if (ret == 0) { + break; + } + if (timeout_sec <= 0) { + return Status::InternalError( + "stream close_wait timeout, timeout_ms={}, load_id={}, dst_id={}, " + "stream_id={}", + timeout_ms, print_id(_load_id), _dst_id, _stream_id); + } } } RETURN_IF_ERROR(_check_cancel()); diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 3bc6331dc0..f20b0e6ea3 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -150,7 +150,7 @@ public: // wait remote to close stream, // remote will close stream when it receives CLOSE_LOAD - Status close_wait(int64_t timeout_ms = 0); + Status close_wait(RuntimeState* state, int64_t timeout_ms = 0); // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled void cancel(Status reason); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index b6e4d0b962..e34f739156 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -687,35 +687,37 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { // check out of limit RETURN_IF_ERROR(_send_new_partition_batch()); - if (UNLIKELY(block->rows() == 0)) { - return Status::OK(); - } - std::shared_ptr convert_block; - bool has_filtered_rows = false; - int64_t filtered_rows = 0; - _number_input_rows += block->rows(); - RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( - *block, convert_block, filtered_rows, has_filtered_rows, _row_part_tablet_ids, - _number_input_rows)); - - const auto& row_ids = _row_part_tablet_ids[0].row_ids; - const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids; + std::shared_ptr convert_block = std::make_shared(); const auto& num_channels = _channels.size(); std::vector> channel2rows; channel2rows.resize(num_channels); - for (int idx = 0; idx < row_ids.size(); ++idx) { - const auto& row = row_ids[idx]; - const auto& tablet_id = tablet_ids[idx]; - channel2rows[tablet_id % num_channels].emplace_back(row); - } + auto input_rows = block->rows(); - RETURN_IF_ERROR(channel_add_rows_with_idx(state, _channels, num_channels, channel2rows, - convert_block.get(), - _enable_pipeline_exec ? eos : false)); + if (input_rows > 0) { + bool has_filtered_rows = false; + int64_t filtered_rows = 0; + _number_input_rows += input_rows; + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + *block, convert_block, filtered_rows, has_filtered_rows, _row_part_tablet_ids, + _number_input_rows)); + + const auto& row_ids = _row_part_tablet_ids[0].row_ids; + const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids; + for (int idx = 0; idx < row_ids.size(); ++idx) { + const auto& row = row_ids[idx]; + const auto& tablet_id_hash = + HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0); + channel2rows[tablet_id_hash % num_channels].emplace_back(row); + } + } if (eos) { _row_distribution._deal_batched = true; RETURN_IF_ERROR(_send_new_partition_batch()); } + RETURN_IF_ERROR(channel_add_rows_with_idx(state, _channels, num_channels, channel2rows, + convert_block.get(), + _enable_pipeline_exec ? eos : false)); + } else { // Range partition // 1. calculate range diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index fc431b6e86..2658f371a6 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -163,6 +163,10 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi if (_writer_status.ok() && _eos) { _writer_status = finish(state); } + // should set _finish_dependency first, as close function maybe blocked by wait_close of execution_timeout + if (_finish_dependency) { + _finish_dependency->set_ready(); + } Status close_st = close(_writer_status); // If it is already failed before, then not update the write status so that we could get @@ -171,9 +175,6 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi _writer_status = close_st; } _writer_thread_closed = true; - if (_finish_dependency) { - _finish_dependency->set_ready(); - } } Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_block, diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 6499ae76d3..68506ca161 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -560,7 +560,7 @@ Status VTabletWriterV2::close(Status exec_status) { << print_id(_load_id); return Status::TimedOut("load timed out before close waiting"); } - RETURN_IF_ERROR(stream->close_wait(remain_ms)); + RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 202dcfa167..705e1ea656 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -860,7 +860,7 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_NEREIDS_DML_WITH_PIPELINE, needForward = true, varType = VariableAnnotation.EXPERIMENTAL, description = {"在新优化器中,使用pipeline引擎执行DML", "execute DML with pipeline engine in Nereids"}) - public boolean enableNereidsDmlWithPipeline = false; + public boolean enableNereidsDmlWithPipeline = true; @VariableMgr.VarAttr(name = ENABLE_STRICT_CONSISTENCY_DML, needForward = true) public boolean enableStrictConsistencyDml = false; diff --git a/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy b/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy index a1a7be381e..5b954f1171 100644 --- a/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy +++ b/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy @@ -56,7 +56,8 @@ suite("test_memtable_flush_fault", "nonConcurrent") { sql insert_sql sql "sync" } catch (Exception e){ - assertTrue(e.getMessage().contains("[IO_ERROR]dbug_be_memtable_submit_flush_error")) + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error")) } finally { GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error") } diff --git a/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy b/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy index c68ac24c74..f53627dc70 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy @@ -26,17 +26,6 @@ suite('nereids_insert_no_partition') { sql 'set enable_nereids_dml=true' sql 'set enable_strict_consistency_dml=true' - explain { - // TODO: test turn off pipeline when dml, remove it if pipeline sink is ok - sql ''' - insert into uni_light_sc_mow_not_null_nop_t with t as( - select * except(kaint, kmintint, kjson) from src where id is not null) - select * from t left semi join t t2 on t.id = t2.id; - ''' - - notContains("MultiCastDataSinks") - } - sql '''insert into agg_nop_t select * except(kaint, kmintint, kjson) from src''' sql 'sync'