[pipelineX](sort) Support sort operator (#23322)

This commit is contained in:
Gabriel
2023-08-22 19:36:50 +08:00
committed by GitHub
parent 6124eea55e
commit 1609b6cbf2
22 changed files with 452 additions and 73 deletions

View File

@ -59,8 +59,8 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* st
_hash_table_size_counter(nullptr),
_max_row_size_counter(nullptr) {}
Status AggSinkLocalState::init(RuntimeState* state, Dependency* dependency) {
_dependency = (AggDependency*)dependency;
Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_dependency = (AggDependency*)info.dependency;
_shared_state = (AggSharedState*)_dependency->shared_state();
_agg_data = _shared_state->agg_data.get();
_agg_arena_pool = _shared_state->agg_arena_pool.get();
@ -79,8 +79,7 @@ Status AggSinkLocalState::init(RuntimeState* state, Dependency* dependency) {
for (size_t i = 0; i < _shared_state->probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _shared_state->probe_expr_ctxs[i]));
}
std::string title = fmt::format("AggSinkLocalState");
_profile = p._pool->add(new RuntimeProfile(title));
_profile = p._pool->add(new RuntimeProfile("AggSinkLocalState"));
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_hash_table_memory_usage =
ADD_CHILD_COUNTER(profile(), "HashTable", TUnit::BYTES, "MemoryUsage");
@ -712,9 +711,9 @@ Status AggSinkLocalState::try_spill_disk(bool eos) {
_agg_data->method_variant);
}
AggSinkOperatorX::AggSinkOperatorX(const int id, ObjectPool* pool, const TPlanNode& tnode,
AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: DataSinkOperatorX(id),
: DataSinkOperatorX(tnode.node_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
_output_tuple_id(tnode.agg_node.output_tuple_id),
@ -825,7 +824,7 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}
Status AggSinkOperatorX::open(doris::RuntimeState* state) {
Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
@ -855,10 +854,10 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_
return Status::OK();
}
Status AggSinkOperatorX::setup_local_state(RuntimeState* state, Dependency* dependency) {
Status AggSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) {
auto local_state = AggSinkLocalState::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
return local_state->init(state, dependency);
return local_state->init(state, info);
}
Status AggSinkOperatorX::close(RuntimeState* state) {

View File

@ -52,7 +52,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState {
public:
AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state);
Status init(RuntimeState* state, Dependency* dependency) override;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status try_spill_disk(bool eos = false);
@ -312,8 +312,7 @@ private:
class AggSinkOperatorX final : public DataSinkOperatorX {
public:
AggSinkOperatorX(const int id, ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode", _name);
}
@ -322,7 +321,7 @@ public:
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status setup_local_state(RuntimeState* state, Dependency* dependency) override;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
@ -331,7 +330,7 @@ public:
bool can_write(RuntimeState* state) override { return true; }
void get_dependency(DependencySPtr& dependency) override {
dependency.reset(new AggDependency());
dependency.reset(new AggDependency(id()));
}
private:

View File

@ -38,7 +38,7 @@ AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
_dependency = (AggDependency*)info.dependency;
_shared_state = ((AggSharedState*)_dependency->shared_state());
_shared_state = (AggSharedState*)_dependency->shared_state();
_agg_data = _shared_state->agg_data.get();
_get_results_timer = ADD_TIMER(profile(), "GetResultsTime");
_serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime");

View File

@ -101,7 +101,8 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const {
return _parent->cast<ExchangeSinkOperatorX>()._transfer_large_data_by_brpc;
}
Status ExchangeSinkLocalState::init(RuntimeState* state, Dependency* dependency) {
Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_sender_id = info.sender_id;
_broadcast_pb_blocks.resize(config::num_broadcast_buffer);
_broadcast_pb_block_idx = 0;
auto& p = _parent->cast<ExchangeSinkOperatorX>();
@ -187,10 +188,10 @@ segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() {
}
ExchangeSinkOperatorX::ExchangeSinkOperatorX(
const int id, RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context)
: DataSinkOperatorX(id),
: DataSinkOperatorX(sink.dest_node_id),
_context(context),
_pool(pool),
_row_desc(row_desc),
@ -209,10 +210,10 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
}
ExchangeSinkOperatorX::ExchangeSinkOperatorX(
const int id, ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id,
ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context)
: DataSinkOperatorX(id),
: DataSinkOperatorX(dest_node_id),
_context(context),
_pool(pool),
_row_desc(row_desc),
@ -224,11 +225,10 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_name = "ExchangeSinkOperatorX";
}
ExchangeSinkOperatorX::ExchangeSinkOperatorX(const int id, ObjectPool* pool,
const RowDescriptor& row_desc,
ExchangeSinkOperatorX::ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc,
bool send_query_statistics_with_every_batch,
PipelineXFragmentContext* context)
: DataSinkOperatorX(id),
: DataSinkOperatorX(0),
_context(context),
_pool(pool),
_row_desc(row_desc),
@ -253,10 +253,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}
Status ExchangeSinkOperatorX::setup_local_state(RuntimeState* state, Dependency* dependency) {
Status ExchangeSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) {
auto local_state = ExchangeSinkLocalState::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
return local_state->init(state, dependency);
return local_state->init(state, info);
}
Status ExchangeSinkOperatorX::prepare(RuntimeState* state) {
@ -523,8 +523,18 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
Status status;
for (int i = 0; i < num_channels; ++i) {
if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
status = channels[i]->add_rows(block, channel2rows[i], eos);
status = channels[i]->add_rows(block, channel2rows[i], false);
HANDLE_CHANNEL_STATUS(state, channels[i], status);
channel2rows[i].clear();
}
}
if (eos) {
for (int i = 0; i < num_channels; ++i) {
if (!channels[i]->is_receiver_eof()) {
status = channels[i]->add_rows(block, channel2rows[i], true);
HANDLE_CHANNEL_STATUS(state, channels[i], status);
}
}
}
@ -533,8 +543,7 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
Status ExchangeSinkOperatorX::try_close(RuntimeState* state) {
auto& local_state = state->get_sink_local_state(id())->cast<ExchangeSinkLocalState>();
DCHECK(local_state._serializer.get_block() == nullptr ||
local_state._serializer.get_block()->rows() == 0);
local_state._serializer.reset_block();
Status final_st = Status::OK();
for (int i = 0; i < local_state.channels.size(); ++i) {
Status st = local_state.channels[i]->close(state);

View File

@ -81,7 +81,7 @@ public:
only_local_exchange(false),
_serializer(this) {}
Status init(RuntimeState* state, Dependency* dependency) override;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
void register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
@ -155,17 +155,16 @@ private:
class ExchangeSinkOperatorX final : public DataSinkOperatorX {
public:
ExchangeSinkOperatorX(const int id, RuntimeState* state, ObjectPool* pool,
const RowDescriptor& row_desc, const TDataStreamSink& sink,
ExchangeSinkOperatorX(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch,
PipelineXFragmentContext* context);
ExchangeSinkOperatorX(const int id, ObjectPool* pool, const RowDescriptor& row_desc,
PlanNodeId dest_node_id,
ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch,
PipelineXFragmentContext* context);
ExchangeSinkOperatorX(const int id, ObjectPool* pool, const RowDescriptor& row_desc,
ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc,
bool send_query_statistics_with_every_batch,
PipelineXFragmentContext* context);
Status init(const TDataSink& tsink) override;
@ -174,7 +173,7 @@ public:
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status setup_local_state(RuntimeState* state, Dependency* dependency) override;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

View File

@ -41,7 +41,7 @@ bool ExchangeSourceOperator::is_pending_finish() const {
}
ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState(state, parent), num_rows_skipped(0) {}
: PipelineXLocalState(state, parent), num_rows_skipped(0), is_ready(false) {}
Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
if (_init) {
@ -65,7 +65,6 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
: OperatorXBase(pool, tnode, descs, op_name),
_num_senders(num_senders),
_is_merging(tnode.exchange_node.__isset.sort_info),
_is_ready(false),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
@ -108,11 +107,11 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
SourceState& source_state) {
auto& local_state = state->get_local_state(id())->cast<ExchangeLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (_is_merging && !_is_ready) {
if (_is_merging && !local_state.is_ready) {
RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first,
state->batch_size(), _limit, _offset));
_is_ready = true;
local_state.is_ready = true;
return Status::OK();
}
bool eos = false;

View File

@ -59,6 +59,7 @@ class ExchangeLocalState : public PipelineXLocalState {
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
doris::vectorized::VSortExecExprs vsort_exec_exprs;
int64_t num_rows_skipped;
bool is_ready;
};
class ExchangeSourceOperatorX final : public OperatorXBase {
@ -83,7 +84,6 @@ private:
friend class ExchangeLocalState;
const int _num_senders;
const bool _is_merging;
bool _is_ready;
RowDescriptor _input_row_desc;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;

View File

@ -224,6 +224,16 @@ void PipelineXLocalState::reached_limit(vectorized::Block* block, bool* eos) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
void PipelineXLocalState::reached_limit(vectorized::Block* block, SourceState& source_state) {
if (_parent->_limit != -1 and _num_rows_returned + block->rows() >= _parent->_limit) {
block->set_num_rows(_parent->_limit - _num_rows_returned);
source_state = SourceState::FINISHED;
}
_num_rows_returned += block->rows();
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
std::string DataSinkOperatorX::debug_string() const {
std::stringstream ss;
ss << _name << ", is_closed: " << _is_closed;

View File

@ -485,6 +485,12 @@ struct LocalStateInfo {
Dependency* dependency;
};
// This struct is used only for initializing local sink state.
struct LocalSinkStateInfo {
const int sender_id;
Dependency* dependency;
};
class PipelineXLocalState {
public:
PipelineXLocalState(RuntimeState* state, OperatorXBase* parent)
@ -512,6 +518,7 @@ public:
bool reached_limit() const;
void reached_limit(vectorized::Block* block, bool* eos);
void reached_limit(vectorized::Block* block, SourceState& source_state);
RuntimeProfile* profile() { return _runtime_profile.get(); }
MemTracker* mem_tracker() { return _mem_tracker.get(); }
@ -694,7 +701,7 @@ public:
: _parent(parent_), _state(state_) {}
virtual ~PipelineXSinkLocalState() {}
virtual Status init(RuntimeState* state, Dependency* dependency) { return Status::OK(); }
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) { return Status::OK(); }
template <class TARGET>
TARGET& cast() {
DCHECK(dynamic_cast<TARGET*>(this));
@ -734,7 +741,7 @@ public:
virtual Status init(const TDataSink& tsink) override { return Status::OK(); }
virtual Status setup_local_state(RuntimeState* state, Dependency* dependency) = 0;
virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0;
template <class TARGET>
TARGET& cast() {

View File

@ -50,7 +50,7 @@ bool ResultSinkOperator::can_write() {
return _sink->_sender->can_sink();
}
Status ResultSinkLocalState::init(RuntimeState* state, Dependency* dependency) {
Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
auto& p = _parent->cast<ResultSinkOperatorX>();
auto fragment_instance_id = state->fragment_instance_id();
auto title = fmt::format("VDataBufferSender (dst_fragment_instance_id={:x}-{:x})",
@ -79,10 +79,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, Dependency* dependency) {
return Status::OK();
}
ResultSinkOperatorX::ResultSinkOperatorX(const int id, const RowDescriptor& row_desc,
ResultSinkOperatorX::ResultSinkOperatorX(const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr,
const TResultSink& sink, int buffer_size)
: DataSinkOperatorX(id),
: DataSinkOperatorX(0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_buf_size(buffer_size) {
@ -119,10 +119,10 @@ Status ResultSinkOperatorX::open(RuntimeState* state) {
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
Status ResultSinkOperatorX::setup_local_state(RuntimeState* state, Dependency* dependency) {
Status ResultSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) {
auto local_state = ResultSinkLocalState::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
return local_state->init(state, dependency);
return local_state->init(state, info);
}
Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,

View File

@ -48,7 +48,7 @@ public:
ResultSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state)
: PipelineXSinkLocalState(parent, state) {}
Status init(RuntimeState* state, Dependency* dependency) override;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
private:
friend class ResultSinkOperatorX;
@ -62,12 +62,11 @@ private:
class ResultSinkOperatorX final : public DataSinkOperatorX {
public:
ResultSinkOperatorX(const int id, const RowDescriptor& row_desc,
const std::vector<TExpr>& select_exprs, const TResultSink& sink,
int buffer_size);
ResultSinkOperatorX(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,
const TResultSink& sink, int buffer_size);
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status setup_local_state(RuntimeState* state, Dependency* dependency) override;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

View File

@ -20,9 +20,163 @@
#include <string>
#include "pipeline/exec/operator.h"
#include "runtime/query_context.h"
#include "vec/common/sort/heap_sorter.h"
#include "vec/common/sort/topn_sorter.h"
namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator)
// Initializes the sort sink's per-instance state: binds the shared
// SortDependency/SortSharedState, instantiates the sorter implementation
// selected in SortSinkOperatorX::prepare(), and registers profile counters.
Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
auto& p = _parent->cast<SortSinkOperatorX>();
// NOTE(review): C-style casts assume `info.dependency` was produced by
// SortSinkOperatorX::get_dependency(); there is no runtime type check.
_dependency = (SortDependency*)info.dependency;
_shared_state = (SortSharedState*)_dependency->shared_state();
_profile = p._pool->add(new RuntimeProfile("SortSinkLocalState"));
// The sorter is stored in the shared state so the matching
// SortSourceOperatorX can later drain the sorted result.
switch (p._algorithm) {
case SortAlgorithm::HEAP_SORT: {
_shared_state->sorter = vectorized::HeapSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc());
break;
}
case SortAlgorithm::TOPN_SORT: {
_shared_state->sorter = vectorized::TopNSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile);
break;
}
case SortAlgorithm::FULL_SORT: {
_shared_state->sorter = vectorized::FullSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile);
break;
}
default: {
return Status::InvalidArgument("Invalid sort algorithm!");
}
}
_shared_state->sorter->init_profile(_profile);
SCOPED_TIMER(_profile->total_time_counter());
// A sort with limit == -1 is a full (non-TOP-N) sort.
_profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true");
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
_sort_blocks_memory_usage =
ADD_CHILD_COUNTER(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage");
_child_get_next_timer = ADD_TIMER(_profile, "ChildGetResultTime");
_sink_timer = ADD_TIMER(_profile, "PartialSortTotalTime");
// Clone the parent's ordering expressions so this instance owns its own
// expression contexts.
return p._vsort_exec_exprs.clone(state, _vsort_exec_exprs);
}
// Builds the sort sink from the SORT plan node; tnode.node_id keys the
// operator's sink-local-state map.
SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: DataSinkOperatorX(tnode.node_id),
// Skip-offset defaults to 0 when not set in the plan.
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
_pool(pool),
_reuse_mem(true),
_limit(tnode.limit),
_use_topn_opt(tnode.sort_node.use_topn_opt),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {}
// Initializes ordering expressions from the plan node and, when the TOP-N
// optimization is enabled, registers a runtime predicate on the first
// ordering column.
Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool));
_is_asc_order = tnode.sort_node.sort_info.is_asc_order;
_nulls_first = tnode.sort_node.sort_info.nulls_first;
// init runtime predicate
if (_use_topn_opt) {
auto query_ctx = state->get_query_ctx();
// NOTE(review): assumes ordering_exprs (and nulls_first) are non-empty
// whenever use_topn_opt is set — confirm the planner guarantees this.
auto first_sort_expr_node = tnode.sort_node.sort_info.ordering_exprs[0].nodes[0];
if (first_sort_expr_node.node_type == TExprNodeType::SLOT_REF) {
auto first_sort_slot = first_sort_expr_node.slot_ref;
// Find the slot descriptor backing the first ordering expression so the
// predicate knows the column type.
for (auto tuple_desc : _row_descriptor.tuple_descriptors()) {
if (tuple_desc->id() != first_sort_slot.tuple_id) {
continue;
}
for (auto slot : tuple_desc->slots()) {
if (slot->id() == first_sort_slot.slot_id) {
RETURN_IF_ERROR(query_ctx->get_runtime_predicate().init(slot->type().type,
_nulls_first[0]));
break;
}
}
}
}
// A first ordering key that is not a plain slot reference leaves the
// predicate uninitialized; that is treated as an internal error here.
if (!query_ctx->get_runtime_predicate().inited()) {
return Status::InternalError("runtime predicate is not properly initialized");
}
}
return Status::OK();
}
// Chooses the sorter implementation (_algorithm) from limit/offset and the
// child's row layout, then prepares the ordering expressions.
Status SortSinkOperatorX::prepare(RuntimeState* state) {
const auto& row_desc = _child_x->row_desc();
// If `limit` is smaller than HEAP_SORT_THRESHOLD, we consider using heap sort in priority.
// To do heap sorting, each income block will be filtered by heap-top row. There will be some
// `memcpy` operations. To ensure heap sort will not incur performance fallback, we should
// exclude cases which incoming blocks has string column which is sensitive to operations like
// `filter` and `memcpy`
if (_limit > 0 && _limit + _offset < vectorized::HeapSorter::HEAP_SORT_THRESHOLD &&
(_use_two_phase_read || _use_topn_opt || !row_desc.has_varlen_slots())) {
_algorithm = SortAlgorithm::HEAP_SORT;
// Heap sort filters incoming blocks in place, so they cannot be reused.
_reuse_mem = false;
} else if (_limit > 0 && row_desc.has_varlen_slots() &&
_limit + _offset < vectorized::TopNSorter::TOPN_SORT_THRESHOLD) {
_algorithm = SortAlgorithm::TOPN_SORT;
} else {
_algorithm = SortAlgorithm::FULL_SORT;
}
return _vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor);
}
// Opens the ordering expressions before any input blocks are sunk.
Status SortSinkOperatorX::open(RuntimeState* state) {
    Status status = _vsort_exec_exprs.open(state);
    return status;
}
// Creates this operator's per-task sink state, registers it under the
// operator id, then initializes it with the dependency/sender info.
Status SortSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) {
    auto sink_local_state = SortSinkLocalState::create_shared(this, state);
    state->emplace_sink_local_state(id(), sink_local_state);
    return sink_local_state->init(state, info);
}
// Feeds one input block into the shared sorter. On the final block
// (source_state == FINISHED) the sorter is switched to read mode and the
// dependency is marked done so the source side becomes runnable.
Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) {
auto& local_state = state->get_sink_local_state(id())->cast<SortSinkLocalState>();
if (in_block->rows() > 0) {
RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input."));
// update runtime predicate
if (_use_topn_opt) {
vectorized::Field new_top = local_state._shared_state->sorter->get_top_value();
// Only push the predicate down when a non-null heap-top value changed.
if (!new_top.is_null() && new_top != local_state.old_top) {
auto& sort_description = local_state._shared_state->sorter->get_sort_description();
auto col = in_block->get_by_position(sort_description[0].column_number);
// Negative direction on the first key means descending order.
bool is_reverse = sort_description[0].direction < 0;
auto query_ctx = state->get_query_ctx();
RETURN_IF_ERROR(
query_ctx->get_runtime_predicate().update(new_top, col.name, is_reverse));
local_state.old_top = std::move(new_top);
}
}
// Heap sort filtered the block in place (_reuse_mem == false), so clear
// it rather than letting the caller reuse its contents.
if (!_reuse_mem) {
in_block->clear();
}
}
if (source_state == SourceState::FINISHED) {
RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read());
local_state._dependency->set_done();
}
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -20,6 +20,7 @@
#include <stdint.h>
#include "operator.h"
#include "vec/core/field.h"
#include "vec/exec/vsort_node.h"
namespace doris {
@ -43,5 +44,78 @@ public:
bool can_write() override { return true; }
};
enum class SortAlgorithm { HEAP_SORT, TOPN_SORT, FULL_SORT };
class SortSinkOperatorX;
// Per-pipeline-task state for SortSinkOperatorX. Owns the cloned ordering
// expressions and profile counters; the sorter itself lives in the shared
// state so the matching SortSourceOperatorX can consume the sorted result.
class SortSinkLocalState : public PipelineXSinkLocalState {
    ENABLE_FACTORY_CREATOR(SortSinkLocalState);

public:
    SortSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state)
            : PipelineXSinkLocalState(parent, state) {}

    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

private:
    friend class SortSinkOperatorX;

    // Set in init() from LocalSinkStateInfo; not owned. Default to nullptr so
    // an accidental use before init() fails loudly instead of reading an
    // indeterminate pointer.
    SortDependency* _dependency = nullptr;
    SortSharedState* _shared_state = nullptr;

    // Expressions and parameters used for build _sort_description
    vectorized::VSortExecExprs _vsort_exec_exprs;

    // Registered in init(); nullptr until then (consistent for all counters).
    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
    RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr;
    RuntimeProfile::Counter* _child_get_next_timer = nullptr;
    RuntimeProfile::Counter* _sink_timer = nullptr;

    // topn top value
    vectorized::Field old_top {vectorized::Field::Types::Null};
};
// Sink side of the pipelineX sort operator: consumes child blocks into a
// sorter (selected in prepare()) that is handed to SortSourceOperatorX via
// the shared SortDependency state.
class SortSinkOperatorX final : public DataSinkOperatorX {
public:
    SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);

    // This sink is driven by a plan node; reject the TDataSink-based path.
    // (Message fixed: the rejected input here is a TDataSink, not a TPlanNode.)
    Status init(const TDataSink& tsink) override {
        return Status::InternalError("{} should not init with TDataSink", _name);
    }

    Status init(const TPlanNode& tnode, RuntimeState* state) override;

    Status prepare(RuntimeState* state) override;
    Status open(RuntimeState* state) override;
    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;

    Status sink(RuntimeState* state, vectorized::Block* in_block,
                SourceState source_state) override;

    // The sorter buffers input in memory, so the sink can always accept more.
    bool can_write(RuntimeState* state) override { return true; }

    void get_dependency(DependencySPtr& dependency) override {
        dependency.reset(new SortDependency(id()));
    }

private:
    friend class SortSinkLocalState;

    // Number of rows to skip.
    const int64_t _offset;
    ObjectPool* _pool;

    // Expressions and parameters used for build _sort_description
    vectorized::VSortExecExprs _vsort_exec_exprs;
    std::vector<bool> _is_asc_order;
    std::vector<bool> _nulls_first;

    // Cleared for heap sort, which filters incoming blocks in place.
    bool _reuse_mem;
    const int64_t _limit;

    const bool _use_topn_opt;
    // Decided in prepare() before any local state is created.
    SortAlgorithm _algorithm;

    const RowDescriptor _row_descriptor;
    const bool _use_two_phase_read;
};
} // namespace pipeline
} // namespace doris

View File

@ -25,4 +25,52 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(SortSourceOperator, SourceOperator)
// Constructs with the result-fetch timer unset; it is registered in init().
SortLocalState::SortLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState(state, parent), _get_next_timer(nullptr) {}
// Binds this local state to the SortDependency created by the sink side and
// registers the result-fetch timer.
Status SortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
// NOTE(review): C-style casts assume the dependency was created by
// SortSinkOperatorX::get_dependency(); no runtime type check is done.
_dependency = (SortDependency*)info.dependency;
_shared_state = (SortSharedState*)_dependency->shared_state();
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
return Status::OK();
}
// Source side of the sort operator: all sorting happens in the sink; this
// operator only drains the shared sorter once the dependency is done.
SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, std::string op_name)
: OperatorXBase(pool, tnode, descs, op_name) {}
// Pulls the next sorted block from the shared sorter, applies the operator's
// limit, and flips source_state to FINISHED when the sorter is exhausted.
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
                                      SourceState& source_state) {
    auto& local_state = state->get_local_state(id())->cast<SortLocalState>();
    SCOPED_TIMER(local_state._get_next_timer);
    // Fix: initialize `eos` instead of passing an indeterminate bool; do not
    // rely on the sorter writing it on every code path.
    bool eos = false;
    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
            local_state._shared_state->sorter->get_next(state, block, &eos));
    // May truncate the block and set eos when the limit is reached.
    local_state.reached_limit(block, &eos);
    if (eos) {
        _runtime_profile->add_info_string(
                "Spilled", local_state._shared_state->sorter->is_spilled() ? "true" : "false");
        source_state = SourceState::FINISHED;
    }
    return Status::OK();
}
// The source is readable only after the sink side marked the dependency done
// (i.e. the sorter has been prepared for reading).
bool SortSourceOperatorX::can_read(RuntimeState* state) {
    auto& sort_state = state->get_local_state(id())->cast<SortLocalState>();
    return sort_state._dependency->done();
}
// Creates the per-task local state, registers it under the operator id, then
// initializes it from the supplied LocalStateInfo.
Status SortSourceOperatorX::setup_local_state(RuntimeState* state, LocalStateInfo& info) {
    auto sort_local_state = SortLocalState::create_shared(state, this);
    state->emplace_local_state(id(), sort_local_state);
    return sort_local_state->init(state, info);
}
// Releases the shared sorter so its buffered blocks are freed promptly.
Status SortSourceOperatorX::close(doris::RuntimeState* state) {
    auto& sort_state = state->get_local_state(id())->cast<SortLocalState>();
    sort_state._shared_state->sorter.reset();
    return Status::OK();
}
} // namespace doris::pipeline

View File

@ -44,5 +44,41 @@ public:
Status open(RuntimeState*) override { return Status::OK(); }
};
class SortSourceOperatorX;
// Per-pipeline-task state for SortSourceOperatorX: holds the dependency and
// shared-state pointers plus the result-fetch timer.
class SortLocalState : public PipelineXLocalState {
    ENABLE_FACTORY_CREATOR(SortLocalState);

public:
    SortLocalState(RuntimeState* state, OperatorXBase* parent);

    Status init(RuntimeState* state, LocalStateInfo& info) override;

private:
    friend class SortSourceOperatorX;

    // Set in init(); not owned. Default to nullptr so a use before init()
    // fails loudly instead of reading indeterminate pointers.
    SortDependency* _dependency = nullptr;
    SortSharedState* _shared_state = nullptr;
    RuntimeProfile::Counter* _get_next_timer = nullptr;
};
// Source side of the pipelineX sort operator. Produces sorted blocks from the
// sorter built by the paired SortSinkOperatorX.
class SortSourceOperatorX final : public OperatorXBase {
public:
SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
std::string op_name);
// Readable only once the sink has marked the dependency done.
bool can_read(RuntimeState* state) override;
Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
Status close(RuntimeState* state) override;
// This operator has no child in its pipeline; it originates data.
bool is_source() const override { return true; }
private:
friend class SortLocalState;
};
} // namespace pipeline
} // namespace doris
} // namespace doris

View File

@ -17,23 +17,27 @@
#pragma once
#include "vec/common/sort/sorter.h"
#include "vec/exec/vaggregation_node.h"
namespace doris::pipeline {
namespace doris {
namespace pipeline {
class Dependency;
using DependencySPtr = std::shared_ptr<Dependency>;
class Dependency {
public:
Dependency() : _done(false) {}
Dependency(int id) : _id(id), _done(false) {}
virtual ~Dependency() = default;
[[nodiscard]] bool done() const { return _done; }
void set_done() { _done = true; }
virtual void* shared_state() = 0;
[[nodiscard]] int id() const { return _id; }
private:
int _id;
std::atomic<bool> _done;
};
@ -63,7 +67,7 @@ public:
class AggDependency final : public Dependency {
public:
AggDependency() : Dependency() {
AggDependency(int id) : Dependency(id) {
_mem_tracker = std::make_unique<MemTracker>("AggregateOperator:");
}
~AggDependency() override = default;
@ -118,4 +122,21 @@ private:
MemoryRecord _mem_usage_record;
std::unique_ptr<MemTracker> _mem_tracker;
};
} // namespace doris::pipeline
// State shared between the sort sink (producer) and sort source (consumer):
// the sink fills the sorter, the source drains it.
struct SortSharedState {
public:
    std::unique_ptr<vectorized::Sorter> sorter;
};

// Dependency connecting SortSinkOperatorX to SortSourceOperatorX; the sink
// marks it done once the sorter has been prepared for reading.
class SortDependency final : public Dependency {
public:
    SortDependency(int id) : Dependency(id) {}
    ~SortDependency() override = default;
    // Type-erased accessor required by the base class; callers cast the
    // result back to SortSharedState*. (static_cast replaces the original
    // C-style cast; stray semicolon after the body removed.)
    void* shared_state() override { return static_cast<void*>(&_sort_state); }

private:
    SortSharedState _sort_state;
};
} // namespace pipeline
} // namespace doris

View File

@ -49,6 +49,8 @@
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/scan_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@ -235,8 +237,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
_sink.reset(new ExchangeSinkOperatorX(_sink_idx++, state, pool, row_desc,
thrift_sink.stream_sink, params.destinations,
_sink.reset(new ExchangeSinkOperatorX(state, pool, row_desc, thrift_sink.stream_sink,
params.destinations,
send_query_statistics_with_every_batch, this));
break;
}
@ -246,8 +248,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}
// TODO: figure out good buffer size based on size of output row
_sink.reset(new ResultSinkOperatorX(_sink_idx++, row_desc, output_exprs,
thrift_sink.result_sink,
_sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink,
vectorized::RESULT_SINK_BUFFER_SIZE));
break;
}
@ -299,7 +300,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto task = std::make_unique<PipelineXTask>(
_pipelines[pip_id], _total_tasks++, _runtime_states[i].get(), this,
_pipelines[pip_id]->pipeline_profile(), scan_ranges);
_pipelines[pip_id]->pipeline_profile(), scan_ranges, local_params.sender_id);
RETURN_IF_ERROR(task->prepare(_runtime_states[i].get()));
_runtime_profile->add_child(_pipelines[pip_id]->pipeline_profile(), true, nullptr);
if (pip_id < _pipelines.size() - 1) {
@ -471,7 +472,18 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
cur_pipe = add_pipeline();
DataSinkOperatorXPtr sink;
sink.reset(new AggSinkOperatorX(_sink_idx++, pool, tnode, descs));
sink.reset(new AggSinkOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
break;
}
case TPlanNodeType::SORT_NODE: {
op.reset(new SortSourceOperatorX(pool, tnode, descs, "SortSourceXOperator"));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
cur_pipe = add_pipeline();
DataSinkOperatorXPtr sink;
sink.reset(new SortSinkOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
break;

View File

@ -124,8 +124,6 @@ private:
// of it in pipeline task not the fragment_context
DataSinkOperatorXPtr _sink;
size_t _sink_idx = 0;
std::atomic_bool _canceled = false;
};
} // namespace pipeline

View File

@ -32,6 +32,7 @@
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/thread_context.h"
#include "util/container_util.hpp"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
@ -44,16 +45,16 @@ namespace doris::pipeline {
PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
PipelineFragmentContext* fragment_context,
RuntimeProfile* parent_profile,
const std::vector<TScanRangeParams>& scan_ranges)
const std::vector<TScanRangeParams>& scan_ranges, const int sender_id)
: PipelineTask(pipeline, index, state, fragment_context, parent_profile),
_scan_ranges(scan_ranges),
_operators(pipeline->operator_xs()),
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()) {
_sink(pipeline->sink_shared_pointer()),
_sender_id(sender_id) {
_pipeline_task_watcher.start();
_sink->get_dependency(_downstream_dependency);
_upstream_dependency.reset((Dependency*)nullptr);
}
Status PipelineXTask::prepare(RuntimeState* state) {
@ -93,15 +94,19 @@ Status PipelineXTask::_open() {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_open_timer);
LocalStateInfo info {_scan_ranges, _upstream_dependency.get()};
Status st = Status::OK();
for (auto& o : _operators) {
Dependency* dep = _upstream_dependency.find(o->id()) == _upstream_dependency.end()
? (Dependency*)nullptr
: _upstream_dependency.find(o->id())->second.get();
LocalStateInfo info {_scan_ranges, dep};
Status cur_st = o->setup_local_state(_state, info);
if (!cur_st.ok()) {
st = cur_st;
}
}
RETURN_IF_ERROR(_sink->setup_local_state(_state, _downstream_dependency.get()));
LocalSinkStateInfo info {_sender_id, _downstream_dependency.get()};
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
RETURN_IF_ERROR(st);
_opened = true;
return Status::OK();

View File

@ -50,7 +50,7 @@ class PipelineXTask : public PipelineTask {
public:
PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
const std::vector<TScanRangeParams>& scan_ranges);
const std::vector<TScanRangeParams>& scan_ranges, const int sender_id);
Status prepare(RuntimeState* state) override;
@ -96,10 +96,11 @@ public:
DependencySPtr& get_downstream_dependency() { return _downstream_dependency; }
void set_upstream_dependency(DependencySPtr& upstream_dependency) {
_upstream_dependency = upstream_dependency;
_upstream_dependency.insert({upstream_dependency->id(), upstream_dependency});
}
private:
using DependencyMap = std::map<int, DependencySPtr>;
Status _open() override;
const std::vector<TScanRangeParams> _scan_ranges;
@ -109,7 +110,9 @@ private:
OperatorXPtr _root;
DataSinkOperatorXPtr _sink;
DependencySPtr _upstream_dependency;
const int _sender_id;
DependencyMap _upstream_dependency;
DependencySPtr _downstream_dependency;
};
} // namespace doris::pipeline

View File

@ -417,21 +417,25 @@ int64_t RuntimeState::get_load_mem_limit() {
void RuntimeState::emplace_local_state(
int id, std::shared_ptr<doris::pipeline::PipelineXLocalState> state) {
std::unique_lock<std::mutex> l(_local_state_lock);
_op_id_to_local_state.emplace(id, state);
}
std::shared_ptr<doris::pipeline::PipelineXLocalState> RuntimeState::get_local_state(int id) {
std::unique_lock<std::mutex> l(_local_state_lock);
DCHECK(_op_id_to_local_state.find(id) != _op_id_to_local_state.end());
return _op_id_to_local_state[id];
}
void RuntimeState::emplace_sink_local_state(
int id, std::shared_ptr<doris::pipeline::PipelineXSinkLocalState> state) {
std::unique_lock<std::mutex> l(_local_sink_state_lock);
_op_id_to_sink_local_state.emplace(id, state);
}
std::shared_ptr<doris::pipeline::PipelineXSinkLocalState> RuntimeState::get_sink_local_state(
int id) {
std::unique_lock<std::mutex> l(_local_sink_state_lock);
DCHECK(_op_id_to_sink_local_state.find(id) != _op_id_to_sink_local_state.end());
return _op_id_to_sink_local_state[id];
}

View File

@ -547,6 +547,9 @@ private:
std::map<int, std::shared_ptr<doris::pipeline::PipelineXSinkLocalState>>
_op_id_to_sink_local_state;
std::mutex _local_state_lock;
std::mutex _local_sink_state_lock;
QueryContext* _query_ctx = nullptr;
// true if max_filter_ratio is 0