diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 78079a0ddf..b610e1b9ed 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -28,9 +28,9 @@ namespace doris::pipeline { PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) { - _finish_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx()); + _finish_dependency = std::make_shared(parent->operator_id(), parent->node_id(), + parent->get_name() + "_SPILL_DEPENDENCY", + true, state->get_query_ctx()); } Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, doris::pipeline::LocalSinkStateInfo& info) { @@ -248,21 +248,31 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { }}; auto execution_context = state->get_task_execution_context(); - _shared_state_holder = _shared_state->shared_from_this(); + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = + _shared_state->shared_from_this(); auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - [this, &parent, state, query_id, execution_context, submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { + [this, &parent, state, query_id, mem_tracker, shared_state_holder, execution_context, + submit_timer] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { LOG(INFO) << "query " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return Status::Cancelled("Cancelled"); } _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_ATTACH_TASK(state); SCOPED_TIMER(Base::_spill_timer); Defer defer {[&]() { if (!_shared_state->sink_status.ok() || state->is_cancelled()) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 1755cd866f..016869374b 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -261,11 +261,6 @@ public: bool _eos = false; std::shared_ptr _finish_dependency; - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::shared_ptr _shared_state_holder; - // temp structures during spilling vectorized::MutableColumns key_columns_; vectorized::MutableColumns value_columns_; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index ff4795f207..43c805b955 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -204,17 +204,28 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime _dependency->Dependency::block(); auto execution_context = state->get_task_execution_context(); - _shared_state_holder = _shared_state->shared_from_this(); + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = + _shared_state->shared_from_this(); auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); RETURN_IF_ERROR( ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - [this, state, query_id, execution_context, submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { + [this, state, query_id, mem_tracker, shared_state_holder, execution_context, + submit_timer] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { LOG(INFO) << "query " << print_id(query_id) << " execution_context released, maybe query was cancelled."; // FIXME: return status is meaningless? @@ -222,7 +233,6 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime } _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_ATTACH_TASK(state); Defer defer {[&]() { if (!_status.ok() || state->is_cancelled()) { if (!_status.ok()) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index eff1e7179c..488d2defd0 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -61,11 +61,6 @@ protected: bool _current_partition_eos = true; bool _is_merging = false; - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::shared_ptr _shared_state_holder; - std::unique_ptr _internal_runtime_profile; RuntimeProfile::Counter* _get_results_timer = nullptr; RuntimeProfile::Counter* _serialize_result_timer = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 62339bb851..f617ab21b1 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -155,7 +155,6 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state, uint32_t partition_index) { - _shared_state_holder = _shared_state->shared_from_this(); auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks; auto& mutable_block = partitioned_build_blocks[partition_index]; if (!mutable_block || @@ -178,46 +177,58 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto execution_context = state->get_task_execution_context(); + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = + _shared_state->shared_from_this(); + auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; + MonotonicStopWatch submit_timer; submit_timer.start(); - return spill_io_pool->submit_func( - [execution_context, state, &build_spilling_stream, &mutable_block, submit_timer, this] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; - return; - } - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_TIMER(_spill_build_timer); - (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); - if (_spill_status_ok) { - auto build_block = mutable_block->to_block(); - DCHECK_EQ(mutable_block->rows(), 0); - auto st = build_spilling_stream->spill_block(state, build_block, false); - if (!st.ok()) { - std::unique_lock lock(_spill_lock); - _spill_status_ok = false; - _spill_status = std::move(st); - } else { - COUNTER_UPDATE(_spill_build_rows, build_block.rows()); - COUNTER_UPDATE(_spill_build_blocks, 1); - } - } - --_spilling_task_count; + return spill_io_pool->submit_func([query_id, mem_tracker, shared_state_holder, + execution_context, state, &build_spilling_stream, + &mutable_block, submit_timer, this] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return; + } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_spill_build_timer); + if (_spill_status_ok) { + auto build_block = mutable_block->to_block(); + DCHECK_EQ(mutable_block->rows(), 0); + auto st = build_spilling_stream->spill_block(state, build_block, false); + if (!st.ok()) { + std::unique_lock lock(_spill_lock); + _spill_status_ok = false; + _spill_status = std::move(st); + } else { + COUNTER_UPDATE(_spill_build_rows, build_block.rows()); + COUNTER_UPDATE(_spill_build_blocks, 1); + } + } + --_spilling_task_count; - if (_spilling_task_count == 0) { - LOG(INFO) << "hash probe " << _parent->id() - << " revoke memory spill_build_block finish"; - std::unique_lock lock(_spill_lock); - _dependency->set_ready(); - } - }); + if (_spilling_task_count == 0) { + LOG(INFO) << "hash probe " << _parent->id() + << " revoke memory spill_build_block finish"; + std::unique_lock lock(_spill_lock); + _dependency->set_ready(); + } + }); } Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state, uint32_t partition_index) { - _shared_state_holder = _shared_state->shared_from_this(); auto& spilling_stream = _probe_spilling_streams[partition_index]; if (!spilling_stream) { RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( @@ -242,45 +253,60 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat if (!blocks.empty()) { auto execution_context = state->get_task_execution_context(); + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = + _shared_state->shared_from_this(); + + auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; + MonotonicStopWatch submit_timer; submit_timer.start(); - return spill_io_pool->submit_func( - [execution_context, state, &blocks, spilling_stream, submit_timer, this] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; - return; - } - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_TIMER(_spill_probe_timer); - SCOPED_ATTACH_TASK(state); - COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); - while (!blocks.empty() && !state->is_cancelled()) { - auto block = std::move(blocks.back()); - blocks.pop_back(); - if (_spill_status_ok) { - auto st = spilling_stream->spill_block(state, block, false); - if (!st.ok()) { - std::unique_lock lock(_spill_lock); - _spill_status_ok = false; - _spill_status = std::move(st); - break; - } - COUNTER_UPDATE(_spill_probe_rows, block.rows()); - } else { - break; - } - } - - --_spilling_task_count; - - if (_spilling_task_count == 0) { - LOG(INFO) << "hash probe " << _parent->id() - << " revoke memory spill_probe_blocks finish"; + return spill_io_pool->submit_func([query_id, mem_tracker, shared_state_holder, + execution_context, state, &blocks, spilling_stream, + submit_timer, this] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return; + } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_spill_probe_timer); + COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); + while (!blocks.empty() && !state->is_cancelled()) { + auto block = std::move(blocks.back()); + blocks.pop_back(); + if (_spill_status_ok) { + auto st = spilling_stream->spill_block(state, block, false); + if (!st.ok()) { std::unique_lock lock(_spill_lock); - _dependency->set_ready(); + _spill_status_ok = false; + _spill_status = std::move(st); + break; } - }); + COUNTER_UPDATE(_spill_probe_rows, block.rows()); + } else { + break; + } + } + + --_spilling_task_count; + + if (_spilling_task_count == 0) { + LOG(INFO) << "hash probe " << _parent->id() + << " revoke memory spill_probe_blocks finish"; + std::unique_lock lock(_spill_lock); + _dependency->set_ready(); + } + }); } else { --_spilling_task_count; if (_spilling_task_count == 0) { @@ -313,7 +339,6 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data) { - _shared_state_holder = _shared_state->shared_from_this(); auto& spilled_stream = _shared_state->spilled_streams[partition_index]; has_data = false; if (!spilled_stream) { @@ -328,23 +353,35 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti } auto execution_context = state->get_task_execution_context(); + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = + _shared_state->shared_from_this(); + + auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); - auto read_func = [this, state, &spilled_stream, &mutable_block, execution_context, - submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock || state->is_cancelled()) { - LOG(INFO) << "execution_context released, maybe query was canceled."; + auto read_func = [this, query_id, mem_tracker, state, &spilled_stream, &mutable_block, + shared_state_holder, execution_context, submit_timer] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; return; } - SCOPED_ATTACH_TASK(state); _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_recovery_build_timer); Defer defer([this] { --_spilling_task_count; }); - (void)state; // avoid ut compile error DCHECK_EQ(_spill_status_ok.load(), true); bool eos = false; @@ -369,10 +406,10 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti break; } - DCHECK_EQ(mutable_block->columns(), block.columns()); if (mutable_block->empty()) { *mutable_block = std::move(block); } else { + DCHECK_EQ(mutable_block->columns(), block.columns()); st = mutable_block->merge(std::move(block)); if (!st.ok()) { std::unique_lock lock(_spill_lock); @@ -404,7 +441,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data) { - _shared_state_holder = _shared_state->shared_from_this(); auto& spilled_stream = _probe_spilling_streams[partition_index]; has_data = false; if (!spilled_stream) { @@ -415,22 +451,32 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti /// TODO: maybe recovery more blocks each time. auto execution_context = state->get_task_execution_context(); + std::weak_ptr shared_state_holder = + _shared_state->shared_from_this(); + + auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); - auto read_func = [this, execution_context, state, &spilled_stream, &blocks, submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; + auto read_func = [this, query_id, mem_tracker, shared_state_holder, execution_context, + &spilled_stream, &blocks, submit_timer] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; return; } _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_recovery_probe_timer); Defer defer([this] { --_spilling_task_count; }); - (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); DCHECK_EQ(_spill_status_ok.load(), true); vectorized::Block block; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index d650dd1590..3b942d1575 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -82,11 +82,6 @@ private: std::vector> _partitioned_blocks; std::map> _probe_blocks; - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::shared_ptr _shared_state_holder; - std::vector _probe_spilling_streams; std::unique_ptr _partitioner; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 4f6bfe5f8c..8924ee6f77 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -102,6 +102,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta auto& p = _parent->cast(); _shared_state->inner_shared_state->hash_table_variants.reset(); auto row_desc = p._child_x->row_desc(); + const auto num_slots = row_desc.num_slots(); std::vector build_blocks; auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state(); if (inner_sink_state_) { @@ -116,11 +117,18 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta } auto execution_context = state->get_task_execution_context(); + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = + _shared_state->shared_from_this(); + _dependency->block(); auto query_id = state->query_id(); auto mem_tracker = state->get_query_ctx()->query_mem_tracker; - auto spill_func = [execution_context, build_blocks = std::move(build_blocks), state, query_id, - mem_tracker, this]() mutable { + auto spill_func = [shared_state_holder, execution_context, + build_blocks = std::move(build_blocks), state, query_id, mem_tracker, + num_slots, this]() mutable { SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); Defer defer {[&]() { // need to reset build_block here, or else build_block will be destructed @@ -128,8 +136,12 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta build_blocks.clear(); }}; - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock || state->is_cancelled()) { + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { LOG(INFO) << "execution_context released, maybe query was canceled."; return; } @@ -163,6 +175,11 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta if (UNLIKELY(build_block.empty())) { continue; } + + if (build_block.columns() > num_slots) { + build_block.erase(num_slots); + } + { SCOPED_TIMER(_partition_timer); (void)_partitioner->do_partitioning(state, &build_block, _mem_tracker.get()); @@ -213,13 +230,24 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { << ", eos: " << _child_eos; DCHECK_EQ(_spilling_streams_count, 0); - _shared_state_holder = _shared_state->shared_from_this(); if (!_shared_state->need_to_spill) { + profile()->add_info_string("Spilled", "true"); _shared_state->need_to_spill = true; return _revoke_unpartitioned_block(state); } _spilling_streams_count = _shared_state->partitioned_build_blocks.size(); + + auto execution_context = state->get_task_execution_context(); + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = + _shared_state->shared_from_this(); + + auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; + for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) { vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; auto& mutable_block = _shared_state->partitioned_build_blocks[i]; @@ -235,24 +263,26 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); - auto execution_context = state->get_task_execution_context(); MonotonicStopWatch submit_timer; submit_timer.start(); - auto st = spill_io_pool->submit_func( - [this, execution_context, state, spilling_stream, i, submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { - LOG(INFO) << "execution_context released, maybe query was cancelled."; - return; - } - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_TIMER(_spill_build_timer); - (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); - _spill_to_disk(i, spilling_stream); - }); + auto st = spill_io_pool->submit_func([this, query_id, mem_tracker, shared_state_holder, + execution_context, spilling_stream, i, submit_timer] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { + LOG(INFO) << "execution_context released, maybe query was cancelled."; + return; + } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_TIMER(_spill_build_timer); + _spill_to_disk(i, spilling_stream); + }); if (!st.ok()) { --_spilling_streams_count; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 60cb2e8f60..82fe5eacd9 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -72,11 +72,6 @@ protected: Status _spill_status; std::mutex _spill_status_lock; - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::shared_ptr _shared_state_holder; - std::unique_ptr _partitioner; std::unique_ptr _internal_runtime_profile; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 2fdd78b40b..c6a943c59b 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -23,9 +23,9 @@ namespace doris::pipeline { SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) { - _finish_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx()); + _finish_dependency = std::make_shared(parent->operator_id(), parent->node_id(), + parent->get_name() + "_SPILL_DEPENDENCY", + true, state->get_query_ctx()); } Status SpillSortSinkLocalState::init(doris::RuntimeState* state, @@ -226,23 +226,34 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { } auto execution_context = state->get_task_execution_context(); - _shared_state_holder = _shared_state->shared_from_this(); + + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = _shared_state->shared_from_this(); + auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( - [this, state, query_id, &parent, execution_context, submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { + [this, state, query_id, mem_tracker, shared_state_holder, &parent, execution_context, + submit_timer] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { LOG(INFO) << "query " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return Status::OK(); } _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_ATTACH_TASK(state); Defer defer {[&]() { if (!_shared_state->sink_status.ok() || state->is_cancelled()) { if (!_shared_state->sink_status.ok()) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index d552d67570..9382edd693 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -47,11 +47,6 @@ private: friend class SpillSortSinkOperatorX; - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::shared_ptr _shared_state_holder; - std::unique_ptr _runtime_state; std::unique_ptr _internal_runtime_profile; RuntimeProfile::Counter* _partial_sort_timer = nullptr; diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index a086fcc43e..fe6b4ee3ef 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -88,15 +88,25 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat }}; auto execution_context = state->get_task_execution_context(); - _shared_state_holder = _shared_state->shared_from_this(); + /// Resources in shared state will be released when the operator is closed, + /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. + /// So, we need hold the pointer of shared state. + std::weak_ptr shared_state_holder = _shared_state->shared_from_this(); auto query_id = state->query_id(); + auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); - auto spill_func = [this, state, query_id, &parent, execution_context, submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { + auto spill_func = [this, state, query_id, mem_tracker, &parent, shared_state_holder, + execution_context, submit_timer] { + SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); + std::shared_ptr execution_context_lock; + auto shared_state_sptr = shared_state_holder.lock(); + if (shared_state_sptr) { + execution_context_lock = execution_context.lock(); + } + if (!shared_state_sptr || !execution_context_lock) { LOG(INFO) << "query " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return Status::OK(); @@ -104,7 +114,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_spill_merge_sort_timer); - SCOPED_ATTACH_TASK(state); Defer defer {[&]() { if (!_status.ok() || state->is_cancelled()) { if (!_status.ok()) { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index a20eb57889..3a0867afa2 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -57,11 +57,6 @@ protected: bool _opened = false; Status _status; - /// Resources in shared state will be released when the operator is closed, - /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. - /// So, we need hold the pointer of shared state. - std::shared_ptr _shared_state_holder; - int64_t _external_sort_bytes_threshold = 134217728; // 128M std::vector _current_merging_streams; std::unique_ptr _merger;