[improvement](spill) optimize the spilling logic of hash join operator (#32202)

This commit is contained in:
Jerry Hu
2024-03-20 08:20:40 +08:00
committed by yiguolei
parent e892774c9a
commit 612d3595e4
6 changed files with 121 additions and 15 deletions

View File

@ -58,6 +58,21 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
_recovery_probe_blocks =
ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, "SpillAndPartition");
_spill_serialize_block_timer = ADD_CHILD_TIMER_WITH_LEVEL(
Base::profile(), "SpillSerializeBlockTime", "SpillAndPartition", 1);
_spill_write_disk_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime",
"SpillAndPartition", 1);
_spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize",
TUnit::BYTES, "SpillAndPartition", 1);
_spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount",
TUnit::UNIT, "SpillAndPartition", 1);
_spill_read_data_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDataTime",
"SpillAndPartition", 1);
_spill_deserialize_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime",
"SpillAndPartition", 1);
_spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize",
TUnit::BYTES, "SpillAndPartition", 1);
// Build phase
_build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
_build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows", TUnit::UNIT, "BuildPhase");
@ -141,7 +156,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
uint32_t partition_index) {
auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
auto& mutable_block = partitioned_build_blocks[partition_index];
if (!mutable_block || mutable_block->rows() == 0) {
if (!mutable_block || mutable_block->rows() < state->batch_size()) {
--_spilling_task_count;
return Status::OK();
}
@ -153,6 +168,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
_parent->id(), std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _runtime_profile.get()));
RETURN_IF_ERROR(build_spilling_stream->prepare_spill());
build_spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
_spill_data_size, _spill_write_disk_timer);
}
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
@ -191,18 +208,28 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(),
_runtime_profile.get()));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
_spill_data_size, _spill_write_disk_timer);
}
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
spilling_stream->get_spill_root_dir());
auto& blocks = _probe_blocks[partition_index];
auto& partitioned_block = _partitioned_blocks[partition_index];
if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
blocks.emplace_back(partitioned_block->to_block());
partitioned_block.reset();
}
if (!blocks.empty()) {
return spill_io_pool->submit_func([state, &blocks, &spilling_stream, this] {
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
for (auto& block : blocks) {
COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
while (!blocks.empty()) {
auto block = std::move(blocks.back());
blocks.pop_back();
if (_spill_status_ok) {
auto st = spilling_stream->spill_block(block, false);
if (!st.ok()) {
@ -217,8 +244,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
}
}
COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
blocks.clear();
--_spilling_task_count;
if (_spilling_task_count == 0) {
@ -241,6 +266,8 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
if (build_spilling_stream) {
build_spilling_stream->end_spill(Status::OK());
RETURN_IF_ERROR(build_spilling_stream->spill_eof());
build_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time,
_spill_read_bytes);
}
auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
@ -248,6 +275,8 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
if (probe_spilling_stream) {
probe_spilling_stream->end_spill(Status::OK());
RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
probe_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time,
_spill_read_bytes);
}
return Status::OK();
@ -259,6 +288,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
LOG(INFO) << "no data need to recovery for partition: " << partition_index
<< ", node id: " << _parent->id() << ", task id: " << state->task_id();
return Status::OK();
}
@ -288,6 +319,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
continue;
}
DCHECK_EQ(mutable_block->columns(), block.columns());
if (mutable_block->empty()) {
*mutable_block = std::move(block);
} else {
@ -301,6 +333,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
}
}
LOG(INFO) << "recovery data done for partition: " << spilled_stream->get_spill_dir();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
_dependency->set_ready();
@ -350,6 +383,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
}
if (eos) {
LOG(INFO) << "recovery probe data done: " << spilled_stream->get_spill_dir();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
}
@ -401,10 +435,17 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt
return _probe_operator->init(tnode_, state);
}
Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorXBase::prepare(state));
// NOTE: do NOT call `OperatorXBase::prepare(state)` here.
// RETURN_IF_ERROR(OperatorXBase::prepare(state));
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc));
RETURN_IF_ERROR(_probe_operator->set_child(_child_x));
RETURN_IF_ERROR(_probe_operator->set_child(_build_side_child));
DCHECK(_build_side_child != nullptr);
_probe_operator->set_build_side_child(_build_side_child);
RETURN_IF_ERROR(_sink_operator->set_child(_build_side_child));
RETURN_IF_ERROR(_probe_operator->prepare(state));
RETURN_IF_ERROR(_sink_operator->prepare(state));
@ -524,6 +565,9 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
partitioned_block.reset();
}
RETURN_IF_ERROR(_sink_operator->sink(local_state._runtime_state.get(), &block, true));
LOG(INFO) << "internal build operator finished, node id: " << id()
<< ", task id: " << state->task_id()
<< ", partition: " << local_state._partition_cursor;
return Status::OK();
}
@ -615,18 +659,25 @@ bool PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* st
// Returns the number of bytes (as reported by allocated_bytes()) held by the
// in-memory blocks of the partitions this operator would consider for spilling.
//
// For every partition index in the scanned range the sum includes:
//   * the partition's build block, subject to a row-count threshold,
//   * every block queued in `_probe_blocks` for that partition,
//   * the partition's pending `_partitioned_blocks` entry, but only once it
//     holds at least `state->batch_size()` rows.
//
// NOTE(review): this text comes from a diff view without +/- markers, so two
// statements below appear twice (old and new form back to back) — confirm
// which form is live before relying on the exact thresholds.
size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state) const {
auto& local_state = get_local_state(state);
size_t mem_size = 0;
// Once the child reached EOS only partitions after the cursor are scanned;
// before that, scanning starts at partition 0.
uint32_t spilling_start = local_state._child_eos ? local_state._partition_cursor + 1 : 0;
// When `_child_eos` is false, `spilling_start` is 0, so this check can only
// hold if `_partition_cursor` is 0 at that point.
DCHECK_GE(spilling_start, local_state._partition_cursor);
auto& partitioned_build_blocks = local_state._shared_state->partitioned_build_blocks;
auto& probe_blocks = local_state._probe_blocks;
// NOTE(review): the next two `for` headers look like the removed and added
// versions of the same loop; the second one uses `spilling_start`.
for (uint32_t i = local_state._partition_cursor + 1; i < _partition_count; ++i) {
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
auto& build_block = partitioned_build_blocks[i];
// NOTE(review): likewise, the next two `if` headers look like the old
// (`rows() > 0`) and new (`rows() >= batch_size()`) forms of one condition.
if (build_block && build_block->rows() > 0) {
if (build_block && build_block->rows() >= state->batch_size()) {
mem_size += build_block->allocated_bytes();
}
// Queued probe blocks are counted unconditionally, whatever their row count.
for (auto& block : probe_blocks[i]) {
mem_size += block.allocated_bytes();
}
// The pending partitioned block is counted only once it holds at least
// `batch_size()` rows.
auto& partitioned_block = local_state._partitioned_blocks[i];
if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
mem_size += partitioned_block->allocated_bytes();
}
}
return mem_size;
}
@ -634,14 +685,16 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bool& wait_for_io) {
auto& local_state = get_local_state(state);
wait_for_io = false;
if (_partition_count > (local_state._partition_cursor + 1)) {
local_state._spilling_task_count =
(_partition_count - local_state._partition_cursor - 1) * 2;
uint32_t spilling_start = local_state._child_eos ? local_state._partition_cursor + 1 : 0;
DCHECK_GE(spilling_start, local_state._partition_cursor);
if (_partition_count > spilling_start) {
local_state._spilling_task_count = (_partition_count - spilling_start) * 2;
} else {
return Status::OK();
}
for (uint32_t i = local_state._partition_cursor + 1; i < _partition_count; ++i) {
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
RETURN_IF_ERROR(local_state.spill_build_block(state, i));
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
}
@ -657,6 +710,14 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
}
bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const {
auto& local_state = get_local_state(state);
if (local_state._shared_state->need_to_spill) {
const auto revocable_size = revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
return revocable_size > min_revocable_size;
}
auto sys_mem_available = MemInfo::sys_mem_available();
auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark();
@ -692,6 +753,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
bool wait_for_io = false;
RETURN_IF_ERROR(_revoke_memory(state, wait_for_io));
if (wait_for_io) {
local_state._shared_state->need_to_spill = true;
return Status::OK();
}
}

View File

@ -100,6 +100,14 @@ private:
RuntimeProfile::Counter* _recovery_probe_rows = nullptr;
RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
RuntimeProfile::Counter* _spill_read_data_time = nullptr;
RuntimeProfile::Counter* _spill_deserialize_time = nullptr;
RuntimeProfile::Counter* _spill_read_bytes = nullptr;
RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
RuntimeProfile::Counter* _spill_data_size = nullptr;
RuntimeProfile::Counter* _spill_block_count = nullptr;
RuntimeProfile::Counter* _build_phase_label = nullptr;
RuntimeProfile::Counter* _build_rows_counter = nullptr;
RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;

View File

@ -36,6 +36,11 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
_partition_timer = ADD_TIMER(profile(), "PartitionTime");
_partition_shuffle_timer = ADD_TIMER(profile(), "PartitionShuffleTime");
_spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillSerializeBlockTime", 1);
_spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWriteDiskTime", 1);
_spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize", TUnit::BYTES, 1);
_spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteBlockCount", TUnit::UNIT, 1);
return _partitioner->prepare(state, p._child_x->row_desc());
}
@ -51,7 +56,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];
if (!mutable_block || mutable_block->rows() == 0) {
if (!mutable_block || mutable_block->rows() < state->batch_size()) {
--_spilling_streams_count;
continue;
}
@ -61,6 +67,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
_parent->id(), std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _profile));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
_spill_data_size, _spill_write_disk_timer);
}
auto* spill_io_pool =
@ -79,9 +87,14 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
}
if (_spilling_streams_count > 0) {
_shared_state->need_to_spill = true;
std::unique_lock<std::mutex> lock(_spill_lock);
if (_spilling_streams_count > 0) {
_dependency->block();
} else if (_child_eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id()
<< ", task id: " << state->task_id();
_dependency->set_ready_to_read();
}
}
return Status::OK();
@ -108,6 +121,11 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
if (_spilling_streams_count == 0) {
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
if (_child_eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id()
<< ", task id: " << state()->task_id();
_dependency->set_ready_to_read();
}
}
}
@ -157,6 +175,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
return local_state._spill_status;
}
local_state._child_eos = eos;
const auto rows = in_block->rows();
if (rows > 0) {
@ -190,9 +210,18 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
partitioned_blocks[i]->add_rows(in_block, &(partition_indexes[i][0]),
&(partition_indexes[i][count]));
}
if (local_state._shared_state->need_to_spill) {
const auto revocable_size = revocable_mem_size(state);
if (revocable_size > state->min_revocable_mem()) {
return local_state.revoke_memory(state);
}
}
}
if (eos) {
LOG(INFO) << "sink eos, set_ready_to_read, node id: " << id()
<< ", task id: " << state->task_id();
local_state._dependency->set_ready_to_read();
}
@ -207,7 +236,7 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
size_t mem_size = 0;
for (uint32_t i = 0; i != _partition_count; ++i) {
auto& block = partitioned_blocks[i];
if (block) {
if (block && block->rows() >= state->batch_size()) {
mem_size += block->allocated_bytes();
}
}

View File

@ -61,6 +61,8 @@ protected:
std::atomic<bool> _spill_status_ok {true};
std::mutex _spill_lock;
bool _child_eos {false};
Status _spill_status;
std::mutex _spill_status_lock;
@ -68,6 +70,10 @@ protected:
RuntimeProfile::Counter* _partition_timer = nullptr;
RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
RuntimeProfile::Counter* _spill_data_size = nullptr;
RuntimeProfile::Counter* _spill_block_count = nullptr;
};
class PartitionedHashJoinSinkOperatorX

View File

@ -578,6 +578,7 @@ struct HashJoinSharedState : public JoinSharedState {
// State shared between the partitioned hash join sink and probe operators;
// both sides access it through `_shared_state`.
struct PartitionedHashJoinSharedState : public HashJoinSharedState {
// Build-side blocks, indexed by partition index. An entry may be null; users
// check the pointer before reading `rows()`.
std::vector<std::unique_ptr<vectorized::MutableBlock>> partitioned_build_blocks;
// Build-side spill streams, indexed by partition index. An entry may be null;
// the recovery path treats a null stream as "no data to recover".
std::vector<vectorized::SpillStreamSPtr> spilled_streams;
// Set to true by the sink's revoke_memory() when spill tasks are outstanding,
// and by the probe's get_block() when a revoke has to wait for I/O. While it
// is true, both sides compare their revocable size against
// `min_revocable_mem()` to decide whether to spill. Nothing visible here
// resets it to false.
// NOTE(review): plain (non-atomic) bool written from both the sink and the
// probe side — confirm these accesses cannot race.
bool need_to_spill = false;
};
struct NestedLoopJoinSharedState : public JoinSharedState {

View File

@ -1029,7 +1029,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
tnode.hash_join_node.is_broadcast_join;
const auto enable_join_spill = _runtime_state->enable_join_spill();
if (enable_join_spill && !is_broadcast_join) {
const uint32_t partition_count = 16;
const uint32_t partition_count = 32;
op.reset(new PartitionedHashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs,
partition_count));
RETURN_IF_ERROR(cur_pipe->add_operator(op));