[improvement](spill) improve cancel (#34451)
* [improvement](spill) improve cancel * fix
This commit is contained in:
@ -28,9 +28,9 @@ namespace doris::pipeline {
|
||||
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent,
|
||||
RuntimeState* state)
|
||||
: Base(parent, state) {
|
||||
_finish_dependency = std::make_shared<FinishDependency>(
|
||||
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
|
||||
state->get_query_ctx());
|
||||
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
|
||||
parent->get_name() + "_SPILL_DEPENDENCY",
|
||||
true, state->get_query_ctx());
|
||||
}
|
||||
Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
|
||||
doris::pipeline::LocalSinkStateInfo& info) {
|
||||
@ -248,21 +248,31 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
}};
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
|
||||
_shared_state->shared_from_this();
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
|
||||
[this, &parent, state, query_id, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
[this, &parent, state, query_id, mem_tracker, shared_state_holder, execution_context,
|
||||
submit_timer] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock) {
|
||||
LOG(INFO) << "query " << print_id(query_id)
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return Status::Cancelled("Cancelled");
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
SCOPED_TIMER(Base::_spill_timer);
|
||||
Defer defer {[&]() {
|
||||
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
|
||||
|
||||
@ -261,11 +261,6 @@ public:
|
||||
bool _eos = false;
|
||||
std::shared_ptr<Dependency> _finish_dependency;
|
||||
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::shared_ptr<PartitionedAggSharedState> _shared_state_holder;
|
||||
|
||||
// temp structures during spilling
|
||||
vectorized::MutableColumns key_columns_;
|
||||
vectorized::MutableColumns value_columns_;
|
||||
|
||||
@ -204,17 +204,28 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
|
||||
_dependency->Dependency::block();
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<PartitionedAggSharedState> shared_state_holder =
|
||||
_shared_state->shared_from_this();
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
RETURN_IF_ERROR(
|
||||
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
|
||||
[this, state, query_id, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
[this, state, query_id, mem_tracker, shared_state_holder, execution_context,
|
||||
submit_timer] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock) {
|
||||
LOG(INFO) << "query " << print_id(query_id)
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
// FIXME: return status is meaningless?
|
||||
@ -222,7 +233,6 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
|
||||
}
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_status.ok() || state->is_cancelled()) {
|
||||
if (!_status.ok()) {
|
||||
|
||||
@ -61,11 +61,6 @@ protected:
|
||||
bool _current_partition_eos = true;
|
||||
bool _is_merging = false;
|
||||
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::shared_ptr<PartitionedAggSharedState> _shared_state_holder;
|
||||
|
||||
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
|
||||
RuntimeProfile::Counter* _get_results_timer = nullptr;
|
||||
RuntimeProfile::Counter* _serialize_result_timer = nullptr;
|
||||
|
||||
@ -155,7 +155,6 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
|
||||
|
||||
Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state,
|
||||
uint32_t partition_index) {
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
|
||||
auto& mutable_block = partitioned_build_blocks[partition_index];
|
||||
if (!mutable_block ||
|
||||
@ -178,46 +177,58 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
|
||||
|
||||
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
|
||||
_shared_state->shared_from_this();
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
return spill_io_pool->submit_func(
|
||||
[execution_context, state, &build_spilling_stream, &mutable_block, submit_timer, this] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_build_timer);
|
||||
(void)state; // avoid ut compile error
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
if (_spill_status_ok) {
|
||||
auto build_block = mutable_block->to_block();
|
||||
DCHECK_EQ(mutable_block->rows(), 0);
|
||||
auto st = build_spilling_stream->spill_block(state, build_block, false);
|
||||
if (!st.ok()) {
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_spill_status_ok = false;
|
||||
_spill_status = std::move(st);
|
||||
} else {
|
||||
COUNTER_UPDATE(_spill_build_rows, build_block.rows());
|
||||
COUNTER_UPDATE(_spill_build_blocks, 1);
|
||||
}
|
||||
}
|
||||
--_spilling_task_count;
|
||||
return spill_io_pool->submit_func([query_id, mem_tracker, shared_state_holder,
|
||||
execution_context, state, &build_spilling_stream,
|
||||
&mutable_block, submit_timer, this] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock) {
|
||||
LOG(INFO) << "query " << print_id(query_id)
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_build_timer);
|
||||
if (_spill_status_ok) {
|
||||
auto build_block = mutable_block->to_block();
|
||||
DCHECK_EQ(mutable_block->rows(), 0);
|
||||
auto st = build_spilling_stream->spill_block(state, build_block, false);
|
||||
if (!st.ok()) {
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_spill_status_ok = false;
|
||||
_spill_status = std::move(st);
|
||||
} else {
|
||||
COUNTER_UPDATE(_spill_build_rows, build_block.rows());
|
||||
COUNTER_UPDATE(_spill_build_blocks, 1);
|
||||
}
|
||||
}
|
||||
--_spilling_task_count;
|
||||
|
||||
if (_spilling_task_count == 0) {
|
||||
LOG(INFO) << "hash probe " << _parent->id()
|
||||
<< " revoke memory spill_build_block finish";
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_dependency->set_ready();
|
||||
}
|
||||
});
|
||||
if (_spilling_task_count == 0) {
|
||||
LOG(INFO) << "hash probe " << _parent->id()
|
||||
<< " revoke memory spill_build_block finish";
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_dependency->set_ready();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state,
|
||||
uint32_t partition_index) {
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
auto& spilling_stream = _probe_spilling_streams[partition_index];
|
||||
if (!spilling_stream) {
|
||||
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
|
||||
@ -242,45 +253,60 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
|
||||
|
||||
if (!blocks.empty()) {
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
|
||||
_shared_state->shared_from_this();
|
||||
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
return spill_io_pool->submit_func(
|
||||
[execution_context, state, &blocks, spilling_stream, submit_timer, this] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_probe_timer);
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
|
||||
while (!blocks.empty() && !state->is_cancelled()) {
|
||||
auto block = std::move(blocks.back());
|
||||
blocks.pop_back();
|
||||
if (_spill_status_ok) {
|
||||
auto st = spilling_stream->spill_block(state, block, false);
|
||||
if (!st.ok()) {
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_spill_status_ok = false;
|
||||
_spill_status = std::move(st);
|
||||
break;
|
||||
}
|
||||
COUNTER_UPDATE(_spill_probe_rows, block.rows());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
--_spilling_task_count;
|
||||
|
||||
if (_spilling_task_count == 0) {
|
||||
LOG(INFO) << "hash probe " << _parent->id()
|
||||
<< " revoke memory spill_probe_blocks finish";
|
||||
return spill_io_pool->submit_func([query_id, mem_tracker, shared_state_holder,
|
||||
execution_context, state, &blocks, spilling_stream,
|
||||
submit_timer, this] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock) {
|
||||
LOG(INFO) << "query " << print_id(query_id)
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_probe_timer);
|
||||
COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
|
||||
while (!blocks.empty() && !state->is_cancelled()) {
|
||||
auto block = std::move(blocks.back());
|
||||
blocks.pop_back();
|
||||
if (_spill_status_ok) {
|
||||
auto st = spilling_stream->spill_block(state, block, false);
|
||||
if (!st.ok()) {
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_dependency->set_ready();
|
||||
_spill_status_ok = false;
|
||||
_spill_status = std::move(st);
|
||||
break;
|
||||
}
|
||||
});
|
||||
COUNTER_UPDATE(_spill_probe_rows, block.rows());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
--_spilling_task_count;
|
||||
|
||||
if (_spilling_task_count == 0) {
|
||||
LOG(INFO) << "hash probe " << _parent->id()
|
||||
<< " revoke memory spill_probe_blocks finish";
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_dependency->set_ready();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
--_spilling_task_count;
|
||||
if (_spilling_task_count == 0) {
|
||||
@ -313,7 +339,6 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
|
||||
Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState* state,
|
||||
uint32_t partition_index,
|
||||
bool& has_data) {
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
|
||||
has_data = false;
|
||||
if (!spilled_stream) {
|
||||
@ -328,23 +353,35 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
|
||||
}
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
|
||||
_shared_state->shared_from_this();
|
||||
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
auto read_func = [this, state, &spilled_stream, &mutable_block, execution_context,
|
||||
submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock || state->is_cancelled()) {
|
||||
LOG(INFO) << "execution_context released, maybe query was canceled.";
|
||||
auto read_func = [this, query_id, mem_tracker, state, &spilled_stream, &mutable_block,
|
||||
shared_state_holder, execution_context, submit_timer] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) {
|
||||
LOG(INFO) << "query " << print_id(query_id)
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_recovery_build_timer);
|
||||
Defer defer([this] { --_spilling_task_count; });
|
||||
(void)state; // avoid ut compile error
|
||||
DCHECK_EQ(_spill_status_ok.load(), true);
|
||||
|
||||
bool eos = false;
|
||||
@ -369,10 +406,10 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
|
||||
break;
|
||||
}
|
||||
|
||||
DCHECK_EQ(mutable_block->columns(), block.columns());
|
||||
if (mutable_block->empty()) {
|
||||
*mutable_block = std::move(block);
|
||||
} else {
|
||||
DCHECK_EQ(mutable_block->columns(), block.columns());
|
||||
st = mutable_block->merge(std::move(block));
|
||||
if (!st.ok()) {
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
@ -404,7 +441,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
|
||||
Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state,
|
||||
uint32_t partition_index,
|
||||
bool& has_data) {
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
auto& spilled_stream = _probe_spilling_streams[partition_index];
|
||||
has_data = false;
|
||||
if (!spilled_stream) {
|
||||
@ -415,22 +451,32 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
|
||||
|
||||
/// TODO: maybe recover more blocks each time.
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
|
||||
_shared_state->shared_from_this();
|
||||
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
auto read_func = [this, execution_context, state, &spilled_stream, &blocks, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
auto read_func = [this, query_id, mem_tracker, shared_state_holder, execution_context,
|
||||
&spilled_stream, &blocks, submit_timer] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock) {
|
||||
LOG(INFO) << "query " << print_id(query_id)
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_recovery_probe_timer);
|
||||
Defer defer([this] { --_spilling_task_count; });
|
||||
(void)state; // avoid ut compile error
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
DCHECK_EQ(_spill_status_ok.load(), true);
|
||||
|
||||
vectorized::Block block;
|
||||
|
||||
@ -82,11 +82,6 @@ private:
|
||||
std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks;
|
||||
std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;
|
||||
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::shared_ptr<PartitionedHashJoinSharedState> _shared_state_holder;
|
||||
|
||||
std::vector<vectorized::SpillStreamSPtr> _probe_spilling_streams;
|
||||
|
||||
std::unique_ptr<PartitionerType> _partitioner;
|
||||
|
||||
@ -102,6 +102,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
|
||||
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
|
||||
_shared_state->inner_shared_state->hash_table_variants.reset();
|
||||
auto row_desc = p._child_x->row_desc();
|
||||
const auto num_slots = row_desc.num_slots();
|
||||
std::vector<vectorized::Block> build_blocks;
|
||||
auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
|
||||
if (inner_sink_state_) {
|
||||
@ -116,11 +117,18 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
|
||||
}
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
|
||||
_shared_state->shared_from_this();
|
||||
|
||||
_dependency->block();
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
auto spill_func = [execution_context, build_blocks = std::move(build_blocks), state, query_id,
|
||||
mem_tracker, this]() mutable {
|
||||
auto spill_func = [shared_state_holder, execution_context,
|
||||
build_blocks = std::move(build_blocks), state, query_id, mem_tracker,
|
||||
num_slots, this]() mutable {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
Defer defer {[&]() {
|
||||
// need to reset build_block here, or else build_block will be destructed
|
||||
@ -128,8 +136,12 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
|
||||
build_blocks.clear();
|
||||
}};
|
||||
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock || state->is_cancelled()) {
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) {
|
||||
LOG(INFO) << "execution_context released, maybe query was canceled.";
|
||||
return;
|
||||
}
|
||||
@ -163,6 +175,11 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
|
||||
if (UNLIKELY(build_block.empty())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (build_block.columns() > num_slots) {
|
||||
build_block.erase(num_slots);
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TIMER(_partition_timer);
|
||||
(void)_partitioner->do_partitioning(state, &build_block, _mem_tracker.get());
|
||||
@ -213,13 +230,24 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
<< ", eos: " << _child_eos;
|
||||
DCHECK_EQ(_spilling_streams_count, 0);
|
||||
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
if (!_shared_state->need_to_spill) {
|
||||
profile()->add_info_string("Spilled", "true");
|
||||
_shared_state->need_to_spill = true;
|
||||
return _revoke_unpartitioned_block(state);
|
||||
}
|
||||
|
||||
_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
|
||||
_shared_state->shared_from_this();
|
||||
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
|
||||
vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
|
||||
auto& mutable_block = _shared_state->partitioned_build_blocks[i];
|
||||
@ -235,24 +263,26 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
auto* spill_io_pool =
|
||||
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
|
||||
DCHECK(spill_io_pool != nullptr);
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
auto st = spill_io_pool->submit_func(
|
||||
[this, execution_context, state, spilling_stream, i, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_build_timer);
|
||||
(void)state; // avoid ut compile error
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
_spill_to_disk(i, spilling_stream);
|
||||
});
|
||||
auto st = spill_io_pool->submit_func([this, query_id, mem_tracker, shared_state_holder,
|
||||
execution_context, spilling_stream, i, submit_timer] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_build_timer);
|
||||
_spill_to_disk(i, spilling_stream);
|
||||
});
|
||||
|
||||
if (!st.ok()) {
|
||||
--_spilling_streams_count;
|
||||
|
||||
@ -72,11 +72,6 @@ protected:
|
||||
Status _spill_status;
|
||||
std::mutex _spill_status_lock;
|
||||
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::shared_ptr<PartitionedHashJoinSharedState> _shared_state_holder;
|
||||
|
||||
std::unique_ptr<PartitionerType> _partitioner;
|
||||
|
||||
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
|
||||
|
||||
@ -23,9 +23,9 @@
|
||||
namespace doris::pipeline {
|
||||
SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
|
||||
: Base(parent, state) {
|
||||
_finish_dependency = std::make_shared<FinishDependency>(
|
||||
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
|
||||
state->get_query_ctx());
|
||||
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
|
||||
parent->get_name() + "_SPILL_DEPENDENCY",
|
||||
true, state->get_query_ctx());
|
||||
}
|
||||
|
||||
Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
|
||||
@ -226,23 +226,34 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
}
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<SpillSortSharedState> shared_state_holder = _shared_state->shared_from_this();
|
||||
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
|
||||
[this, state, query_id, &parent, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
[this, state, query_id, mem_tracker, shared_state_holder, &parent, execution_context,
|
||||
submit_timer] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock) {
|
||||
LOG(INFO) << "query " << print_id(query_id)
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
|
||||
if (!_shared_state->sink_status.ok()) {
|
||||
|
||||
@ -47,11 +47,6 @@ private:
|
||||
|
||||
friend class SpillSortSinkOperatorX;
|
||||
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::shared_ptr<SpillSortSharedState> _shared_state_holder;
|
||||
|
||||
std::unique_ptr<RuntimeState> _runtime_state;
|
||||
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
|
||||
RuntimeProfile::Counter* _partial_sort_timer = nullptr;
|
||||
|
||||
@ -88,15 +88,25 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
|
||||
}};
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::weak_ptr<SpillSortSharedState> shared_state_holder = _shared_state->shared_from_this();
|
||||
auto query_id = state->query_id();
|
||||
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
auto spill_func = [this, state, query_id, &parent, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
auto spill_func = [this, state, query_id, mem_tracker, &parent, shared_state_holder,
|
||||
execution_context, submit_timer] {
|
||||
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
|
||||
std::shared_ptr<TaskExecutionContext> execution_context_lock;
|
||||
auto shared_state_sptr = shared_state_holder.lock();
|
||||
if (shared_state_sptr) {
|
||||
execution_context_lock = execution_context.lock();
|
||||
}
|
||||
if (!shared_state_sptr || !execution_context_lock) {
|
||||
LOG(INFO) << "query " << print_id(query_id)
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return Status::OK();
|
||||
@ -104,7 +114,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_merge_sort_timer);
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_status.ok() || state->is_cancelled()) {
|
||||
if (!_status.ok()) {
|
||||
|
||||
@ -57,11 +57,6 @@ protected:
|
||||
bool _opened = false;
|
||||
Status _status;
|
||||
|
||||
/// Resources in shared state will be released when the operator is closed,
|
||||
/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts.
|
||||
/// So, we need to hold a pointer to the shared state.
|
||||
std::shared_ptr<SpillSortSharedState> _shared_state_holder;
|
||||
|
||||
int64_t _external_sort_bytes_threshold = 134217728; // 128M
|
||||
std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
|
||||
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
|
||||
|
||||
Reference in New Issue
Block a user