[improvement](spill) improve cancel of spill and improve log printing (#33229)
* [improvement](spill) improve cancel of spill and improve log printing * fix
This commit is contained in:
@ -148,9 +148,6 @@ Status PartitionedAggSinkOperatorX::open(RuntimeState* state) {
|
||||
return _agg_sink_operator->open(state);
|
||||
}
|
||||
|
||||
Status PartitionedAggSinkOperatorX::close(RuntimeState* state) {
|
||||
return _agg_sink_operator->close(state);
|
||||
}
|
||||
Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
|
||||
bool eos) {
|
||||
auto& local_state = get_local_state(state);
|
||||
@ -227,8 +224,9 @@ Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
|
||||
}
|
||||
|
||||
Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
LOG(INFO) << "agg node " << Base::_parent->id() << " revoke_memory"
|
||||
<< ", eos: " << _eos;
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << Base::_parent->id()
|
||||
<< " revoke_memory"
|
||||
<< ", eos: " << _eos;
|
||||
RETURN_IF_ERROR(Base::_shared_state->sink_status);
|
||||
if (!_shared_state->is_spilled) {
|
||||
_shared_state->is_spilled = true;
|
||||
@ -258,28 +256,33 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
[this, &parent, state, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
LOG(INFO) << "query " << print_id(state->query_id())
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return Status::Cancelled("Cancelled");
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
SCOPED_TIMER(Base::_spill_timer);
|
||||
Defer defer {[&]() {
|
||||
if (!Base::_shared_state->sink_status.ok()) {
|
||||
LOG(WARNING)
|
||||
<< "agg node " << Base::_parent->id()
|
||||
<< " revoke_memory error: " << Base::_shared_state->sink_status;
|
||||
} else {
|
||||
LOG(INFO) << " agg node " << Base::_parent->id() << " revoke_memory finish"
|
||||
<< ", eos: " << _eos;
|
||||
}
|
||||
{
|
||||
if (_eos) {
|
||||
Base::_dependency->set_ready_to_read();
|
||||
_finish_dependency->set_ready();
|
||||
} else {
|
||||
Base::_dependency->Dependency::set_ready();
|
||||
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
|
||||
if (!_shared_state->sink_status.ok()) {
|
||||
LOG(WARNING)
|
||||
<< "query " << print_id(state->query_id()) << " agg node "
|
||||
<< Base::_parent->id()
|
||||
<< " revoke_memory error: " << Base::_shared_state->sink_status;
|
||||
}
|
||||
_shared_state->close();
|
||||
} else {
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
|
||||
<< Base::_parent->id() << " revoke_memory finish"
|
||||
<< ", eos: " << _eos;
|
||||
}
|
||||
|
||||
if (_eos) {
|
||||
Base::_dependency->set_ready_to_read();
|
||||
_finish_dependency->set_ready();
|
||||
} else {
|
||||
Base::_dependency->Dependency::set_ready();
|
||||
}
|
||||
}};
|
||||
auto* runtime_state = _runtime_state.get();
|
||||
|
||||
@ -71,7 +71,8 @@ public:
|
||||
std::vector<TmpSpillInfo<typename HashTableType::key_type>> spill_infos(
|
||||
Base::_shared_state->partition_count);
|
||||
auto& iter = Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator;
|
||||
while (iter != Base::_shared_state->in_mem_shared_state->aggregate_data_container->end()) {
|
||||
while (iter != Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() &&
|
||||
!state->is_cancelled()) {
|
||||
const auto& key = iter.template get_key<typename HashTableType::key_type>();
|
||||
auto partition_index = Base::_shared_state->get_partition_index(hash_table.hash(key));
|
||||
spill_infos[partition_index].keys_.emplace_back(key);
|
||||
@ -93,7 +94,7 @@ public:
|
||||
++iter;
|
||||
}
|
||||
auto hash_null_key_data = hash_table.has_null_key_data();
|
||||
for (int i = 0; i < Base::_shared_state->partition_count; ++i) {
|
||||
for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled(); ++i) {
|
||||
auto spill_null_key_data =
|
||||
(hash_null_key_data && i == Base::_shared_state->partition_count - 1);
|
||||
if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) {
|
||||
@ -160,7 +161,7 @@ public:
|
||||
SCOPED_TIMER(_spill_write_disk_timer);
|
||||
Status status;
|
||||
Defer defer {[&]() { spill_stream->end_spill(status); }};
|
||||
status = spill_stream->spill_block(block_, false);
|
||||
status = spill_stream->spill_block(state, block_, false);
|
||||
return status;
|
||||
});
|
||||
if (!status.ok()) {
|
||||
@ -320,8 +321,6 @@ public:
|
||||
|
||||
Status open(RuntimeState* state) override;
|
||||
|
||||
Status close(RuntimeState* state) override;
|
||||
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
|
||||
|
||||
DataDistribution required_data_distribution() const override {
|
||||
|
||||
@ -24,7 +24,6 @@
|
||||
#include "common/status.h"
|
||||
#include "pipeline/exec/operator.h"
|
||||
#include "util/runtime_profile.h"
|
||||
#include "vec//utils/util.hpp"
|
||||
#include "vec/spill/spill_stream_manager.h"
|
||||
|
||||
namespace doris::pipeline {
|
||||
@ -123,12 +122,19 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
|
||||
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
|
||||
bool* eos) {
|
||||
auto& local_state = get_local_state(state);
|
||||
Defer defer {[&]() {
|
||||
if (!local_state._status.ok() || *eos) {
|
||||
local_state._shared_state->close();
|
||||
}
|
||||
}};
|
||||
|
||||
local_state.inc_running_big_mem_op_num(state);
|
||||
SCOPED_TIMER(local_state.exec_time_counter());
|
||||
RETURN_IF_ERROR(local_state._status);
|
||||
|
||||
if (local_state._shared_state->is_spilled) {
|
||||
RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state));
|
||||
local_state._status = local_state.initiate_merge_spill_partition_agg_data(state);
|
||||
RETURN_IF_ERROR(local_state._status);
|
||||
|
||||
/// When `_is_merging` is true means we are reading spilled data and merging the data into hash table.
|
||||
if (local_state._is_merging) {
|
||||
@ -138,7 +144,8 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
|
||||
|
||||
// not spilled in sink or current partition still has data
|
||||
auto* runtime_state = local_state._runtime_state.get();
|
||||
RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block, eos));
|
||||
local_state._status = _agg_source_operator->get_block(runtime_state, block, eos);
|
||||
RETURN_IF_ERROR(local_state._status);
|
||||
if (local_state._runtime_state) {
|
||||
auto* source_local_state =
|
||||
local_state._runtime_state->get_local_state(_agg_source_operator->operator_id());
|
||||
@ -190,7 +197,8 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
|
||||
}
|
||||
|
||||
_is_merging = true;
|
||||
LOG(INFO) << "agg node " << _parent->node_id() << " merge spilled agg data";
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << _parent->node_id()
|
||||
<< " merge spilled agg data";
|
||||
|
||||
RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
|
||||
_dependency->Dependency::block();
|
||||
@ -206,7 +214,8 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
|
||||
[this, state, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
LOG(INFO) << "query " << print_id(state->query_id())
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
// FIXME: return status is meaningless?
|
||||
return Status::Cancelled("Cancelled");
|
||||
}
|
||||
@ -214,12 +223,17 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_status.ok()) {
|
||||
LOG(WARNING) << "agg node " << _parent->node_id()
|
||||
<< " merge spilled agg data error: " << _status;
|
||||
if (!_status.ok() || state->is_cancelled()) {
|
||||
if (!_status.ok()) {
|
||||
LOG(WARNING) << "query " << print_id(state->query_id())
|
||||
<< " agg node " << _parent->node_id()
|
||||
<< " merge spilled agg data error: " << _status;
|
||||
}
|
||||
_shared_state->close();
|
||||
} else if (_shared_state->spill_partitions.empty()) {
|
||||
LOG(INFO) << "agg node " << _parent->node_id()
|
||||
<< " merge spilled agg data finish";
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id())
|
||||
<< " agg node " << _parent->node_id()
|
||||
<< " merge spilled agg data finish";
|
||||
}
|
||||
Base::_shared_state->in_mem_shared_state->aggregate_data_container
|
||||
->init_once();
|
||||
@ -237,7 +251,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
|
||||
Base::_spill_read_bytes, Base::_spill_read_wait_io_timer);
|
||||
vectorized::Block block;
|
||||
bool eos = false;
|
||||
while (!eos) {
|
||||
while (!eos && !state->is_cancelled()) {
|
||||
{
|
||||
SCOPED_TIMER(Base::_spill_recover_time);
|
||||
_status = stream->read_next_block_sync(&block, &eos);
|
||||
|
||||
@ -195,7 +195,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
|
||||
if (_spill_status_ok) {
|
||||
auto build_block = mutable_block->to_block();
|
||||
DCHECK_EQ(mutable_block->rows(), 0);
|
||||
auto st = build_spilling_stream->spill_block(build_block, false);
|
||||
auto st = build_spilling_stream->spill_block(state, build_block, false);
|
||||
if (!st.ok()) {
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_spill_status_ok = false;
|
||||
@ -262,7 +262,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
|
||||
auto block = std::move(blocks.back());
|
||||
blocks.pop_back();
|
||||
if (_spill_status_ok) {
|
||||
auto st = spilling_stream->spill_block(block, false);
|
||||
auto st = spilling_stream->spill_block(state, block, false);
|
||||
if (!st.ok()) {
|
||||
std::unique_lock<std::mutex> lock(_spill_lock);
|
||||
_spill_status_ok = false;
|
||||
|
||||
@ -231,7 +231,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
|
||||
if (_spill_status_ok) {
|
||||
auto block = partitioned_block->to_block();
|
||||
partitioned_block = vectorized::MutableBlock::create_unique(block.clone_empty());
|
||||
auto st = spilling_stream->spill_block(block, false);
|
||||
auto st = spilling_stream->spill_block(state(), block, false);
|
||||
if (!st.ok()) {
|
||||
_spill_status_ok = false;
|
||||
std::lock_guard<std::mutex> l(_spill_status_lock);
|
||||
|
||||
@ -203,8 +203,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
profile()->add_info_string("Spilled", "true");
|
||||
}
|
||||
|
||||
LOG(INFO) << "sort node " << Base::_parent->id() << " revoke_memory"
|
||||
<< ", eos: " << _eos;
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << Base::_parent->id()
|
||||
<< " revoke_memory"
|
||||
<< ", eos: " << _eos;
|
||||
RETURN_IF_ERROR(Base::_shared_state->sink_status);
|
||||
|
||||
auto status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
|
||||
@ -234,73 +235,78 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
|
||||
MonotonicStopWatch submit_timer;
|
||||
submit_timer.start();
|
||||
|
||||
status =
|
||||
ExecEnv::GetInstance()
|
||||
->spill_stream_mgr()
|
||||
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
|
||||
->submit_func([this, state, &parent, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
return Status::OK();
|
||||
}
|
||||
status = ExecEnv::GetInstance()
|
||||
->spill_stream_mgr()
|
||||
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
|
||||
->submit_func([this, state, &parent, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "query " << print_id(state->query_id())
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_shared_state->sink_status.ok()) {
|
||||
LOG(WARNING)
|
||||
<< "sort node " << _parent->id()
|
||||
<< " revoke memory error: " << _shared_state->sink_status;
|
||||
} else {
|
||||
LOG(INFO)
|
||||
<< "sort node " << _parent->id() << " revoke memory finish";
|
||||
}
|
||||
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
|
||||
if (!_shared_state->sink_status.ok()) {
|
||||
LOG(WARNING) << "query " << print_id(state->query_id())
|
||||
<< " sort node " << _parent->id()
|
||||
<< " revoke memory error: "
|
||||
<< _shared_state->sink_status;
|
||||
}
|
||||
_shared_state->close();
|
||||
} else {
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id())
|
||||
<< " sort node " << _parent->id()
|
||||
<< " revoke memory finish";
|
||||
}
|
||||
|
||||
_spilling_stream->end_spill(_shared_state->sink_status);
|
||||
if (!_shared_state->sink_status.ok()) {
|
||||
_shared_state->clear();
|
||||
}
|
||||
_spilling_stream->end_spill(_shared_state->sink_status);
|
||||
if (!_shared_state->sink_status.ok()) {
|
||||
_shared_state->close();
|
||||
}
|
||||
|
||||
_spilling_stream.reset();
|
||||
if (_eos) {
|
||||
_dependency->set_ready_to_read();
|
||||
_finish_dependency->set_ready();
|
||||
} else {
|
||||
_dependency->Dependency::set_ready();
|
||||
}
|
||||
}};
|
||||
_spilling_stream.reset();
|
||||
if (_eos) {
|
||||
_dependency->set_ready_to_read();
|
||||
_finish_dependency->set_ready();
|
||||
} else {
|
||||
_dependency->Dependency::set_ready();
|
||||
}
|
||||
}};
|
||||
|
||||
_shared_state->sink_status =
|
||||
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
|
||||
RETURN_IF_ERROR(_shared_state->sink_status);
|
||||
_shared_state->sink_status = parent._sort_sink_operator->prepare_for_spill(
|
||||
_runtime_state.get());
|
||||
RETURN_IF_ERROR(_shared_state->sink_status);
|
||||
|
||||
auto* sink_local_state = _runtime_state->get_sink_local_state();
|
||||
update_profile(sink_local_state->profile());
|
||||
auto* sink_local_state = _runtime_state->get_sink_local_state();
|
||||
update_profile(sink_local_state->profile());
|
||||
|
||||
bool eos = false;
|
||||
vectorized::Block block;
|
||||
while (!eos && !state->is_cancelled()) {
|
||||
{
|
||||
SCOPED_TIMER(_spill_merge_sort_timer);
|
||||
_shared_state->sink_status =
|
||||
parent._sort_sink_operator->merge_sort_read_for_spill(
|
||||
_runtime_state.get(), &block,
|
||||
_shared_state->spill_block_batch_row_count, &eos);
|
||||
}
|
||||
RETURN_IF_ERROR(_shared_state->sink_status);
|
||||
{
|
||||
SCOPED_TIMER(Base::_spill_timer);
|
||||
_shared_state->sink_status =
|
||||
_spilling_stream->spill_block(block, eos);
|
||||
}
|
||||
RETURN_IF_ERROR(_shared_state->sink_status);
|
||||
block.clear_column_data();
|
||||
}
|
||||
parent._sort_sink_operator->reset(_runtime_state.get());
|
||||
bool eos = false;
|
||||
vectorized::Block block;
|
||||
while (!eos && !state->is_cancelled()) {
|
||||
{
|
||||
SCOPED_TIMER(_spill_merge_sort_timer);
|
||||
_shared_state->sink_status =
|
||||
parent._sort_sink_operator->merge_sort_read_for_spill(
|
||||
_runtime_state.get(), &block,
|
||||
_shared_state->spill_block_batch_row_count, &eos);
|
||||
}
|
||||
RETURN_IF_ERROR(_shared_state->sink_status);
|
||||
{
|
||||
SCOPED_TIMER(Base::_spill_timer);
|
||||
_shared_state->sink_status =
|
||||
_spilling_stream->spill_block(state, block, eos);
|
||||
}
|
||||
RETURN_IF_ERROR(_shared_state->sink_status);
|
||||
block.clear_column_data();
|
||||
}
|
||||
parent._sort_sink_operator->reset(_runtime_state.get());
|
||||
|
||||
return Status::OK();
|
||||
});
|
||||
return Status::OK();
|
||||
});
|
||||
if (!status.ok()) {
|
||||
_spilling_stream->end_spill(status);
|
||||
|
||||
|
||||
@ -72,7 +72,8 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
|
||||
}
|
||||
Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) {
|
||||
auto& parent = Base::_parent->template cast<Parent>();
|
||||
LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data";
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->node_id()
|
||||
<< " merge spill data";
|
||||
_dependency->Dependency::block();
|
||||
|
||||
Status status;
|
||||
@ -91,7 +92,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
|
||||
auto spill_func = [this, state, &parent, execution_context, submit_timer] {
|
||||
auto execution_context_lock = execution_context.lock();
|
||||
if (!execution_context_lock) {
|
||||
LOG(INFO) << "execution_context released, maybe query was cancelled.";
|
||||
LOG(INFO) << "query " << print_id(state->query_id())
|
||||
<< " execution_context released, maybe query was cancelled.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -99,21 +101,30 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
|
||||
SCOPED_TIMER(_spill_merge_sort_timer);
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
Defer defer {[&]() {
|
||||
if (!_status.ok()) {
|
||||
LOG(WARNING) << "sort node " << _parent->node_id()
|
||||
<< " merge spill data error: " << _status;
|
||||
if (!_status.ok() || state->is_cancelled()) {
|
||||
if (!_status.ok()) {
|
||||
LOG(WARNING) << "query " << print_id(state->query_id()) << " sort node "
|
||||
<< _parent->node_id() << " merge spill data error: " << _status;
|
||||
}
|
||||
_shared_state->close();
|
||||
for (auto& stream : _current_merging_streams) {
|
||||
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
|
||||
}
|
||||
_current_merging_streams.clear();
|
||||
} else {
|
||||
LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data finish";
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node "
|
||||
<< _parent->node_id() << " merge spill data finish";
|
||||
}
|
||||
_dependency->Dependency::set_ready();
|
||||
}};
|
||||
vectorized::Block merge_sorted_block;
|
||||
vectorized::SpillStreamSPtr tmp_stream;
|
||||
while (true) {
|
||||
while (!state->is_cancelled()) {
|
||||
int max_stream_count = _calc_spill_blocks_to_merge();
|
||||
LOG(INFO) << "sort node " << _parent->id() << " merge spill streams, streams count: "
|
||||
<< _shared_state->sorted_streams.size()
|
||||
<< ", curren merge max stream count: " << max_stream_count;
|
||||
VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->id()
|
||||
<< " merge spill streams, streams count: "
|
||||
<< _shared_state->sorted_streams.size()
|
||||
<< ", curren merge max stream count: " << max_stream_count;
|
||||
{
|
||||
SCOPED_TIMER(Base::_spill_recover_time);
|
||||
_status = _create_intermediate_merger(
|
||||
@ -150,7 +161,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
|
||||
_status = _merger->get_next(&merge_sorted_block, &eos);
|
||||
}
|
||||
RETURN_IF_ERROR(_status);
|
||||
_status = tmp_stream->spill_block(merge_sorted_block, eos);
|
||||
_status = tmp_stream->spill_block(state, merge_sorted_block, eos);
|
||||
RETURN_IF_ERROR(_status);
|
||||
}
|
||||
}
|
||||
@ -159,7 +170,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
|
||||
}
|
||||
_current_merging_streams.clear();
|
||||
}
|
||||
DCHECK(false);
|
||||
return Status::OK();
|
||||
};
|
||||
return ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
|
||||
@ -242,6 +252,15 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) {
|
||||
Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
|
||||
bool* eos) {
|
||||
auto& local_state = get_local_state(state);
|
||||
Defer defer {[&]() {
|
||||
if (!local_state._status.ok() || *eos) {
|
||||
local_state._shared_state->close();
|
||||
for (auto& stream : local_state._current_merging_streams) {
|
||||
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
|
||||
}
|
||||
local_state._current_merging_streams.clear();
|
||||
}
|
||||
}};
|
||||
if (local_state.Base::_shared_state->enable_spill) {
|
||||
local_state.inc_running_big_mem_op_num(state);
|
||||
}
|
||||
@ -250,13 +269,16 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc
|
||||
|
||||
if (local_state.Base::_shared_state->enable_spill && local_state._shared_state->is_spilled) {
|
||||
if (!local_state._merger) {
|
||||
return local_state.initiate_merge_sort_spill_streams(state);
|
||||
local_state._status = local_state.initiate_merge_sort_spill_streams(state);
|
||||
return local_state._status;
|
||||
} else {
|
||||
RETURN_IF_ERROR(local_state._merger->get_next(block, eos));
|
||||
local_state._status = local_state._merger->get_next(block, eos);
|
||||
RETURN_IF_ERROR(local_state._status);
|
||||
}
|
||||
} else {
|
||||
RETURN_IF_ERROR(
|
||||
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos));
|
||||
local_state._status =
|
||||
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos);
|
||||
RETURN_IF_ERROR(local_state._status);
|
||||
}
|
||||
local_state.reached_limit(block, eos);
|
||||
return Status::OK();
|
||||
|
||||
@ -231,11 +231,27 @@ void AggSpillPartition::close() {
|
||||
}
|
||||
|
||||
void PartitionedAggSharedState::close() {
|
||||
// need to use CAS instead of only `if (!is_closed)` statement,
|
||||
// to avoid concurrent entry of close() both pass the if statement
|
||||
bool false_close = false;
|
||||
if (!is_closed.compare_exchange_strong(false_close, true)) {
|
||||
return;
|
||||
}
|
||||
DCHECK(!false_close && is_closed);
|
||||
for (auto partition : spill_partitions) {
|
||||
partition->close();
|
||||
}
|
||||
spill_partitions.clear();
|
||||
}
|
||||
void SpillSortSharedState::clear() {
|
||||
|
||||
void SpillSortSharedState::close() {
|
||||
// need to use CAS instead of only `if (!is_closed)` statement,
|
||||
// to avoid concurrent entry of close() both pass the if statement
|
||||
bool false_close = false;
|
||||
if (!is_closed.compare_exchange_strong(false_close, true)) {
|
||||
return;
|
||||
}
|
||||
DCHECK(!false_close && is_closed);
|
||||
for (auto& stream : sorted_streams) {
|
||||
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
|
||||
}
|
||||
|
||||
@ -398,6 +398,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
|
||||
size_t max_partition_index;
|
||||
Status sink_status;
|
||||
bool is_spilled = false;
|
||||
std::atomic_bool is_closed = false;
|
||||
std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
|
||||
|
||||
size_t get_partition_index(size_t hash_value) const {
|
||||
@ -467,11 +468,12 @@ struct SpillSortSharedState : public BasicSharedState,
|
||||
LOG(INFO) << "spill sort block batch row count: " << spill_block_batch_row_count;
|
||||
}
|
||||
}
|
||||
void clear();
|
||||
void close();
|
||||
|
||||
SortSharedState* in_mem_shared_state = nullptr;
|
||||
bool enable_spill = false;
|
||||
bool is_spilled = false;
|
||||
std::atomic_bool is_closed = false;
|
||||
Status sink_status;
|
||||
std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
|
||||
|
||||
|
||||
@ -312,7 +312,7 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
|
||||
wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark);
|
||||
if (is_wg_mem_high_water_mark) {
|
||||
if (revocable_mem_bytes > min_revocable_mem_bytes) {
|
||||
LOG_EVERY_N(INFO, 5) << "revoke memory, hight water mark";
|
||||
LOG_EVERY_N(INFO, 10) << "revoke memory, hight water mark";
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
@ -332,11 +332,12 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
|
||||
mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
|
||||
}
|
||||
|
||||
LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, revocable_mem_bytes: "
|
||||
<< PrettyPrinter::print_bytes(revocable_mem_bytes)
|
||||
<< ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op)
|
||||
<< ", min_revocable_mem_bytes: "
|
||||
<< PrettyPrinter::print_bytes(min_revocable_mem_bytes);
|
||||
LOG_EVERY_N(INFO, 10) << "revoke memory, low water mark, revocable_mem_bytes: "
|
||||
<< PrettyPrinter::print_bytes(revocable_mem_bytes)
|
||||
<< ", mem_limit_of_op: "
|
||||
<< PrettyPrinter::print_bytes(mem_limit_of_op)
|
||||
<< ", min_revocable_mem_bytes: "
|
||||
<< PrettyPrinter::print_bytes(min_revocable_mem_bytes);
|
||||
return (revocable_mem_bytes > mem_limit_of_op ||
|
||||
revocable_mem_bytes > min_revocable_mem_bytes);
|
||||
} else {
|
||||
|
||||
@ -580,19 +580,23 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
|
||||
Status st = Status::OK();
|
||||
|
||||
const bool has_cancel_reason = request->has_cancel_reason();
|
||||
LOG(INFO) << fmt::format("Cancel instance {}, reason: {}", print_id(tid),
|
||||
has_cancel_reason
|
||||
? PPlanFragmentCancelReason_Name(request->cancel_reason())
|
||||
: "INTERNAL_ERROR");
|
||||
if (request->has_fragment_id()) {
|
||||
TUniqueId query_id;
|
||||
query_id.__set_hi(request->query_id().hi());
|
||||
query_id.__set_lo(request->query_id().lo());
|
||||
LOG(INFO) << fmt::format(
|
||||
"Cancel query {}, reason: {}", print_id(query_id),
|
||||
has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason())
|
||||
: "INTERNAL_ERROR");
|
||||
_exec_env->fragment_mgr()->cancel_fragment(
|
||||
query_id, request->fragment_id(),
|
||||
has_cancel_reason ? request->cancel_reason()
|
||||
: PPlanFragmentCancelReason::INTERNAL_ERROR);
|
||||
} else {
|
||||
LOG(INFO) << fmt::format(
|
||||
"Cancel instance {}, reason: {}", print_id(tid),
|
||||
has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason())
|
||||
: "INTERNAL_ERROR");
|
||||
_exec_env->fragment_mgr()->cancel_instance(
|
||||
tid, has_cancel_reason ? request->cancel_reason()
|
||||
: PPlanFragmentCancelReason::INTERNAL_ERROR);
|
||||
|
||||
@ -46,6 +46,16 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d
|
||||
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(data_dir->path());
|
||||
}
|
||||
|
||||
SpillStream::~SpillStream() {
|
||||
bool exists = false;
|
||||
auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
|
||||
if (status.ok() && exists) {
|
||||
auto gc_dir = fmt::format("{}/{}/{}", get_data_dir()->path(), SPILL_GC_DIR_PREFIX,
|
||||
std::filesystem::path(spill_dir_).filename().string());
|
||||
(void)io::global_local_filesystem()->rename(spill_dir_, gc_dir);
|
||||
}
|
||||
}
|
||||
|
||||
Status SpillStream::prepare() {
|
||||
writer_ = std::make_unique<SpillWriter>(stream_id_, batch_rows_, data_dir_, spill_dir_);
|
||||
|
||||
@ -103,9 +113,9 @@ Status SpillStream::wait_spill() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SpillStream::spill_block(const Block& block, bool eof) {
|
||||
Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) {
|
||||
size_t written_bytes = 0;
|
||||
RETURN_IF_ERROR(writer_->write(block, written_bytes));
|
||||
RETURN_IF_ERROR(writer_->write(state, block, written_bytes));
|
||||
if (eof) {
|
||||
RETURN_IF_ERROR(writer_->close());
|
||||
writer_.reset();
|
||||
|
||||
@ -40,6 +40,8 @@ public:
|
||||
std::string spill_dir, size_t batch_rows, size_t batch_bytes,
|
||||
RuntimeProfile* profile);
|
||||
|
||||
~SpillStream();
|
||||
|
||||
int64_t id() const { return stream_id_; }
|
||||
|
||||
SpillDataDir* get_data_dir() const { return data_dir_; }
|
||||
@ -51,7 +53,7 @@ public:
|
||||
|
||||
Status prepare_spill();
|
||||
|
||||
Status spill_block(const Block& block, bool eof);
|
||||
Status spill_block(RuntimeState* state, const Block& block, bool eof);
|
||||
|
||||
void end_spill(const Status& status);
|
||||
|
||||
|
||||
@ -198,8 +198,6 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
|
||||
}
|
||||
|
||||
void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
|
||||
stream->close();
|
||||
|
||||
auto gc_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(), SPILL_GC_DIR_PREFIX,
|
||||
std::filesystem::path(stream->get_spill_dir()).filename().string());
|
||||
(void)io::global_local_filesystem()->rename(stream->get_spill_dir(), gc_dir);
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include "io/fs/local_file_system.h"
|
||||
#include "io/fs/local_file_writer.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "vec/spill/spill_stream_manager.h"
|
||||
|
||||
@ -36,12 +37,6 @@ Status SpillWriter::open() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
SpillWriter::~SpillWriter() {
|
||||
if (!closed_) {
|
||||
(void)Status::Error<ErrorCode::INTERNAL_ERROR>("spill writer not closed correctly");
|
||||
}
|
||||
}
|
||||
|
||||
Status SpillWriter::close() {
|
||||
if (closed_ || !file_writer_) {
|
||||
return Status::OK();
|
||||
@ -68,7 +63,7 @@ Status SpillWriter::close() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SpillWriter::write(const Block& block, size_t& written_bytes) {
|
||||
Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& written_bytes) {
|
||||
written_bytes = 0;
|
||||
DCHECK(file_writer_);
|
||||
auto rows = block.rows();
|
||||
@ -79,7 +74,7 @@ Status SpillWriter::write(const Block& block, size_t& written_bytes) {
|
||||
auto tmp_block = block.clone_empty();
|
||||
const auto& src_data = block.get_columns_with_type_and_name();
|
||||
|
||||
for (size_t row_idx = 0; row_idx < rows;) {
|
||||
for (size_t row_idx = 0; row_idx < rows && !state->is_cancelled();) {
|
||||
tmp_block.clear_column_data();
|
||||
|
||||
auto& dst_data = tmp_block.get_columns_with_type_and_name();
|
||||
|
||||
@ -25,6 +25,7 @@
|
||||
#include "util/runtime_profile.h"
|
||||
#include "vec/core/block.h"
|
||||
namespace doris {
|
||||
class RuntimeState;
|
||||
|
||||
namespace vectorized {
|
||||
class SpillDataDir;
|
||||
@ -35,13 +36,11 @@ public:
|
||||
file_path_ = dir + "/" + std::to_string(file_index_);
|
||||
}
|
||||
|
||||
~SpillWriter();
|
||||
|
||||
Status open();
|
||||
|
||||
Status close();
|
||||
|
||||
Status write(const Block& block, size_t& written_bytes);
|
||||
Status write(RuntimeState* state, const Block& block, size_t& written_bytes);
|
||||
|
||||
int64_t get_id() const { return stream_id_; }
|
||||
|
||||
|
||||
@ -1726,7 +1726,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20;
|
||||
@VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
|
||||
checker = "checkExternalAggPartitionBits", fuzzy = true)
|
||||
public int externalAggPartitionBits = 8; // means that the hash table will be partitioned into 256 blocks.
|
||||
public int externalAggPartitionBits = 5; // means that the hash table will be partitioned into 32 blocks.
|
||||
|
||||
public boolean isEnableJoinSpill() {
|
||||
return enableJoinSpill;
|
||||
|
||||
Reference in New Issue
Block a user