[chore](spill) add timers for performance tuning (#33185)
This commit is contained in:
@ -252,13 +252,16 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
|
||||
[this, &parent, state, execution_context] {
|
||||
[this, &parent, state, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return Status::Cancelled("Cancelled");
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
SCOPED_TIMER(Base::_spill_timer);
|
||||
Defer defer {[&]() {
|
||||
|
||||
@ -197,9 +197,13 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
RETURN_IF_ERROR(
|
||||
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
|
||||
[this, state, execution_context] {
|
||||
[this, state, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
@ -207,6 +211,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
|
||||
return Status::Cancelled("Cancelled");
|
||||
}
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_status.ok()) {
|
||||
|
||||
@ -25,10 +25,11 @@ namespace doris::pipeline {
|
||||
|
||||
PartitionedHashJoinProbeLocalState::PartitionedHashJoinProbeLocalState(RuntimeState* state,
|
||||
OperatorXBase* parent)
|
||||
: JoinProbeLocalState(state, parent) {}
|
||||
: PipelineXSpillLocalState(state, parent),
|
||||
_child_block(vectorized::Block::create_unique()) {}
|
||||
|
||||
Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
|
||||
RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
|
||||
RETURN_IF_ERROR(PipelineXSpillLocalState::init(state, info));
|
||||
_internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
|
||||
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
|
||||
|
||||
@ -38,45 +39,32 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
|
||||
RETURN_IF_ERROR(_partitioner->init(p._probe_exprs));
|
||||
RETURN_IF_ERROR(_partitioner->prepare(state, p._child_x->row_desc()));
|
||||
|
||||
_spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "SpillAndPartition");
|
||||
_partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", "SpillAndPartition");
|
||||
_partition_shuffle_timer =
|
||||
ADD_CHILD_TIMER(profile(), "PartitionShuffleTime", "SpillAndPartition");
|
||||
_spill_build_rows =
|
||||
ADD_CHILD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT, "SpillAndPartition");
|
||||
_recovery_build_rows =
|
||||
ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows", TUnit::UNIT, "SpillAndPartition");
|
||||
_spill_probe_rows =
|
||||
ADD_CHILD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT, "SpillAndPartition");
|
||||
_recovery_probe_rows =
|
||||
ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows", TUnit::UNIT, "SpillAndPartition");
|
||||
_spill_build_blocks =
|
||||
ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT, "SpillAndPartition");
|
||||
_spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "Partition");
|
||||
_partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", "Partition");
|
||||
_partition_shuffle_timer = ADD_CHILD_TIMER(profile(), "PartitionShuffleTime", "Partition");
|
||||
_spill_build_rows = ADD_CHILD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT, "Spill");
|
||||
_spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1);
|
||||
_recovery_build_rows = ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows", TUnit::UNIT, "Spill");
|
||||
_recovery_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "RecoveryBuildTime", "Spill", 1);
|
||||
_spill_probe_rows = ADD_CHILD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT, "Spill");
|
||||
_recovery_probe_rows = ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows", TUnit::UNIT, "Spill");
|
||||
_spill_build_blocks = ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT, "Spill");
|
||||
_recovery_build_blocks =
|
||||
ADD_CHILD_COUNTER(profile(), "RecoveryBuildBlocks", TUnit::UNIT, "SpillAndPartition");
|
||||
_spill_probe_blocks =
|
||||
ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks", TUnit::UNIT, "SpillAndPartition");
|
||||
ADD_CHILD_COUNTER(profile(), "RecoveryBuildBlocks", TUnit::UNIT, "Spill");
|
||||
_spill_probe_blocks = ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks", TUnit::UNIT, "Spill");
|
||||
_spill_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", "Spill", 1);
|
||||
_recovery_probe_blocks =
|
||||
ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, "SpillAndPartition");
|
||||
ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, "Spill");
|
||||
_recovery_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "RecoveryProbeTime", "Spill", 1);
|
||||
|
||||
_spill_serialize_block_timer = ADD_CHILD_TIMER_WITH_LEVEL(
|
||||
Base::profile(), "SpillSerializeBlockTime", "SpillAndPartition", 1);
|
||||
_spill_write_disk_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime",
|
||||
"SpillAndPartition", 1);
|
||||
_spill_serialize_block_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", "Spill", 1);
|
||||
_spill_write_disk_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", "Spill", 1);
|
||||
_spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize",
|
||||
TUnit::BYTES, "SpillAndPartition", 1);
|
||||
TUnit::BYTES, "Spill", 1);
|
||||
_spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount",
|
||||
TUnit::UNIT, "SpillAndPartition", 1);
|
||||
_spill_read_data_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDataTime",
|
||||
"SpillAndPartition", 1);
|
||||
_spill_deserialize_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime",
|
||||
"SpillAndPartition", 1);
|
||||
_spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize",
|
||||
TUnit::BYTES, "SpillAndPartition", 1);
|
||||
_spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime",
|
||||
"SpillAndPartition", 1);
|
||||
_spill_read_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime",
|
||||
"SpillAndPartition", 1);
|
||||
TUnit::UNIT, "Spill", 1);
|
||||
|
||||
// Build phase
|
||||
_build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
|
||||
@ -109,6 +97,10 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
|
||||
_process_other_join_conjunct_timer =
|
||||
ADD_CHILD_TIMER(profile(), "OtherJoinConjunctTime", "ProbePhase");
|
||||
_init_probe_side_timer = ADD_CHILD_TIMER(profile(), "InitProbeSideTime", "ProbePhase");
|
||||
_probe_timer = ADD_CHILD_TIMER(profile(), "ProbeTime", "ProbePhase");
|
||||
_join_filter_timer = ADD_CHILD_TIMER(profile(), "JoinFilterTimer", "ProbePhase");
|
||||
_build_output_block_timer = ADD_CHILD_TIMER(profile(), "BuildOutputBlock", "ProbePhase");
|
||||
_probe_rows_counter = ADD_CHILD_COUNTER(profile(), "ProbeRows", TUnit::UNIT, "ProbePhase");
|
||||
return Status::OK();
|
||||
}
|
||||
#define UPDATE_PROFILE(counter, name) \
|
||||
@ -149,7 +141,7 @@ void PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* ch
|
||||
#undef UPDATE_PROFILE
|
||||
|
||||
Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(PipelineXLocalStateBase::open(state));
|
||||
RETURN_IF_ERROR(PipelineXSpillLocalState::open(state));
|
||||
return _partitioner->open(state);
|
||||
}
|
||||
Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
|
||||
@ -157,7 +149,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
dec_running_big_mem_op_num(state);
|
||||
RETURN_IF_ERROR(JoinProbeLocalState::close(state));
|
||||
RETURN_IF_ERROR(PipelineXSpillLocalState::close(state));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -187,13 +179,17 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
|
||||
build_spilling_stream->get_spill_root_dir());
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
return spill_io_pool->submit_func(
|
||||
[execution_context, state, &build_spilling_stream, &mutable_block, this] {
|
||||
[execution_context, state, &build_spilling_stream, &mutable_block, submit_timer, this] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_build_timer);
|
||||
(void)state; // avoid ut compile error
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
if (_spill_status_ok) {
|
||||
@ -248,14 +244,18 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
|
||||
if (!blocks.empty()) {
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
return spill_io_pool->submit_func(
|
||||
[execution_context, state, &blocks, spilling_stream, this] {
|
||||
[execution_context, state, &blocks, spilling_stream, submit_timer, this] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
_dependency->set_ready();
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_probe_timer);
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
|
||||
while (!blocks.empty() && !state->is_cancelled()) {
|
||||
@ -329,12 +329,19 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
auto read_func = [this, state, &spilled_stream, &mutable_block, execution_context] {
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
auto read_func = [this, state, &spilled_stream, &mutable_block, execution_context,
|
||||
submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_recovery_build_timer);
|
||||
Defer defer([this] { --_spilling_task_count; });
|
||||
(void)state; // avoid ut compile error
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
@ -403,12 +410,19 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
|
||||
/// TODO: maybe recovery more blocks each time.
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
auto read_func = [this, execution_context, state, &spilled_stream, &blocks] {
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
auto read_func = [this, execution_context, state, &spilled_stream, &blocks, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_recovery_probe_timer);
|
||||
Defer defer([this] { --_spilling_task_count; });
|
||||
(void)state; // avoid ut compile error
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
@ -827,4 +841,4 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace doris::pipeline
|
||||
} // namespace doris::pipeline
|
||||
|
||||
@ -38,16 +38,13 @@ using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
|
||||
class PartitionedHashJoinProbeOperatorX;
|
||||
|
||||
class PartitionedHashJoinProbeLocalState final
|
||||
: public JoinProbeLocalState<PartitionedHashJoinSharedState,
|
||||
PartitionedHashJoinProbeLocalState> {
|
||||
: public PipelineXSpillLocalState<PartitionedHashJoinSharedState> {
|
||||
public:
|
||||
using Parent = PartitionedHashJoinProbeOperatorX;
|
||||
ENABLE_FACTORY_CREATOR(PartitionedHashJoinProbeLocalState);
|
||||
PartitionedHashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent);
|
||||
~PartitionedHashJoinProbeLocalState() override = default;
|
||||
|
||||
void add_tuple_is_null_column(vectorized::Block* block) override {}
|
||||
|
||||
Status init(RuntimeState* state, LocalStateInfo& info) override;
|
||||
Status open(RuntimeState* state) override;
|
||||
Status close(RuntimeState* state) override;
|
||||
@ -68,9 +65,15 @@ public:
|
||||
friend class PartitionedHashJoinProbeOperatorX;
|
||||
|
||||
private:
|
||||
template <typename LocalStateType>
|
||||
friend class StatefulOperatorX;
|
||||
|
||||
std::shared_ptr<BasicSharedState> _in_mem_shared_state_sptr;
|
||||
uint32_t _partition_cursor {0};
|
||||
|
||||
std::unique_ptr<vectorized::Block> _child_block;
|
||||
bool _child_eos {false};
|
||||
|
||||
std::mutex _spill_lock;
|
||||
Status _spill_status;
|
||||
|
||||
@ -98,22 +101,21 @@ private:
|
||||
RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_build_rows = nullptr;
|
||||
RuntimeProfile::Counter* _spill_build_blocks = nullptr;
|
||||
RuntimeProfile::Counter* _spill_build_timer = nullptr;
|
||||
RuntimeProfile::Counter* _recovery_build_rows = nullptr;
|
||||
RuntimeProfile::Counter* _recovery_build_blocks = nullptr;
|
||||
RuntimeProfile::Counter* _recovery_build_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_probe_rows = nullptr;
|
||||
RuntimeProfile::Counter* _spill_probe_blocks = nullptr;
|
||||
RuntimeProfile::Counter* _spill_probe_timer = nullptr;
|
||||
RuntimeProfile::Counter* _recovery_probe_rows = nullptr;
|
||||
RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
|
||||
RuntimeProfile::Counter* _recovery_probe_timer = nullptr;
|
||||
|
||||
RuntimeProfile::Counter* _spill_read_data_time = nullptr;
|
||||
RuntimeProfile::Counter* _spill_deserialize_time = nullptr;
|
||||
RuntimeProfile::Counter* _spill_read_bytes = nullptr;
|
||||
RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_data_size = nullptr;
|
||||
RuntimeProfile::Counter* _spill_block_count = nullptr;
|
||||
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
|
||||
|
||||
RuntimeProfile::Counter* _build_phase_label = nullptr;
|
||||
RuntimeProfile::Counter* _build_rows_counter = nullptr;
|
||||
@ -137,6 +139,10 @@ private:
|
||||
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
|
||||
RuntimeProfile::Counter* _build_side_output_timer = nullptr;
|
||||
RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr;
|
||||
RuntimeProfile::Counter* _probe_timer = nullptr;
|
||||
RuntimeProfile::Counter* _probe_rows_counter = nullptr;
|
||||
RuntimeProfile::Counter* _join_filter_timer = nullptr;
|
||||
RuntimeProfile::Counter* _build_output_block_timer = nullptr;
|
||||
};
|
||||
|
||||
class PartitionedHashJoinProbeOperatorX final
|
||||
|
||||
@ -25,7 +25,7 @@ namespace doris::pipeline {
|
||||
|
||||
Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
|
||||
doris::pipeline::LocalSinkStateInfo& info) {
|
||||
RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
|
||||
RETURN_IF_ERROR(PipelineXSpillSinkLocalState::init(state, info));
|
||||
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
|
||||
_shared_state->partitioned_build_blocks.resize(p._partition_count);
|
||||
_shared_state->spilled_streams.resize(p._partition_count);
|
||||
@ -33,30 +33,26 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
|
||||
_partitioner = std::make_unique<PartitionerType>(p._partition_count);
|
||||
RETURN_IF_ERROR(_partitioner->init(p._build_exprs));
|
||||
|
||||
_partition_timer = ADD_TIMER(profile(), "PartitionTime");
|
||||
_partition_shuffle_timer = ADD_TIMER(profile(), "PartitionShuffleTime");
|
||||
|
||||
_spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillSerializeBlockTime", 1);
|
||||
_spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWriteDiskTime", 1);
|
||||
_spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize", TUnit::BYTES, 1);
|
||||
_spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteBlockCount", TUnit::UNIT, 1);
|
||||
_spill_write_wait_io_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWriteWaitIOTime", 1);
|
||||
_partition_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionTime", "Spill", 1);
|
||||
_partition_shuffle_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionShuffleTime", "Spill", 1);
|
||||
_spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1);
|
||||
|
||||
return _partitioner->prepare(state, p._child_x->row_desc());
|
||||
}
|
||||
|
||||
Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(PipelineXSinkLocalState::open(state));
|
||||
RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
|
||||
return _partitioner->open(state);
|
||||
}
|
||||
Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) {
|
||||
SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter());
|
||||
SCOPED_TIMER(PipelineXSinkLocalState::_close_timer);
|
||||
if (PipelineXSinkLocalState::_closed) {
|
||||
SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter());
|
||||
SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer);
|
||||
if (PipelineXSpillSinkLocalState::_closed) {
|
||||
return Status::OK();
|
||||
}
|
||||
dec_running_big_mem_op_num(state);
|
||||
return PipelineXSinkLocalState::close(state, exec_status);
|
||||
return PipelineXSpillSinkLocalState::close(state, exec_status);
|
||||
}
|
||||
|
||||
Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
@ -90,16 +86,23 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
DCHECK(spill_io_pool != nullptr);
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
auto st = spill_io_pool->submit_func([this, execution_context, state, spilling_stream, i] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
(void)state; // avoid ut compile error
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
_spill_to_disk(i, spilling_stream);
|
||||
});
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
auto st = spill_io_pool->submit_func(
|
||||
[this, execution_context, state, spilling_stream, i, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return;
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_build_timer);
|
||||
(void)state; // avoid ut compile error
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
_spill_to_disk(i, spilling_stream);
|
||||
});
|
||||
|
||||
if (!st.ok()) {
|
||||
--_spilling_streams_count;
|
||||
@ -274,4 +277,4 @@ Status PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) {
|
||||
return local_state.revoke_memory(state);
|
||||
}
|
||||
|
||||
} // namespace doris::pipeline
|
||||
} // namespace doris::pipeline
|
||||
|
||||
@ -39,7 +39,7 @@ using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
|
||||
class PartitionedHashJoinSinkOperatorX;
|
||||
|
||||
class PartitionedHashJoinSinkLocalState
|
||||
: public PipelineXSinkLocalState<PartitionedHashJoinSharedState> {
|
||||
: public PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState> {
|
||||
public:
|
||||
using Parent = PartitionedHashJoinSinkOperatorX;
|
||||
ENABLE_FACTORY_CREATOR(PartitionedHashJoinSinkLocalState);
|
||||
@ -51,7 +51,7 @@ public:
|
||||
|
||||
protected:
|
||||
PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
|
||||
: PipelineXSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
|
||||
: PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
|
||||
|
||||
void _spill_to_disk(uint32_t partition_index,
|
||||
const vectorized::SpillStreamSPtr& spilling_stream);
|
||||
@ -76,11 +76,7 @@ protected:
|
||||
|
||||
RuntimeProfile::Counter* _partition_timer = nullptr;
|
||||
RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_data_size = nullptr;
|
||||
RuntimeProfile::Counter* _spill_block_count = nullptr;
|
||||
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_build_timer = nullptr;
|
||||
};
|
||||
|
||||
class PartitionedHashJoinSinkOperatorX
|
||||
@ -139,4 +135,4 @@ private:
|
||||
};
|
||||
|
||||
} // namespace pipeline
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
@ -55,6 +55,9 @@ void SpillSortSinkLocalState::_init_counters() {
|
||||
|
||||
_spill_merge_sort_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", "Spill", 1);
|
||||
|
||||
_spill_wait_in_queue_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", "Spill", 1);
|
||||
}
|
||||
#define UPDATE_PROFILE(counter, name) \
|
||||
do { \
|
||||
@ -227,17 +230,22 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
status =
|
||||
ExecEnv::GetInstance()
|
||||
->spill_stream_mgr()
|
||||
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
|
||||
->submit_func([this, state, &parent, execution_context] {
|
||||
->submit_func([this, state, &parent, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_shared_state->sink_status.ok()) {
|
||||
|
||||
@ -43,6 +43,8 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
|
||||
TUnit::BYTES, "Spill", 1);
|
||||
_spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount",
|
||||
TUnit::UNIT, "Spill", 1);
|
||||
_spill_wait_in_queue_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", "Spill", 1);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -82,13 +84,18 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
|
||||
|
||||
auto execution_context = state->get_task_execution_context();
|
||||
_shared_state_holder = _shared_state->shared_from_this();
|
||||
auto spill_func = [this, state, &parent, execution_context] {
|
||||
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
auto spill_func = [this, state, &parent, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_TIMER(_spill_merge_sort_timer);
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
|
||||
@ -455,6 +455,8 @@ public:
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", "Spill", 1);
|
||||
_spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize",
|
||||
TUnit::BYTES, "Spill", 1);
|
||||
_spill_wait_in_queue_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", "Spill", 1);
|
||||
_spill_write_wait_io_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", "Spill", 1);
|
||||
_spill_read_wait_io_timer =
|
||||
@ -469,6 +471,7 @@ public:
|
||||
RuntimeProfile::Counter* _spill_read_bytes;
|
||||
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
|
||||
};
|
||||
|
||||
class DataSinkOperatorXBase;
|
||||
@ -776,6 +779,8 @@ public:
|
||||
TUnit::BYTES, "Spill", 1);
|
||||
_spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount",
|
||||
TUnit::UNIT, "Spill", 1);
|
||||
_spill_wait_in_queue_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", "Spill", 1);
|
||||
_spill_write_wait_io_timer =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", "Spill", 1);
|
||||
_spill_read_wait_io_timer =
|
||||
@ -789,6 +794,7 @@ public:
|
||||
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_data_size = nullptr;
|
||||
RuntimeProfile::Counter* _spill_block_count = nullptr;
|
||||
RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
|
||||
RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user