diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 1cecd56a82..f0229431b7 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -578,8 +578,6 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode( for (int id : read_params.topn_filter_source_node_ids) { auto& runtime_predicate = read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id); - DCHECK(runtime_predicate.inited()) - << "runtime predicate not inited, source_node_id=" << id; runtime_predicate.set_tablet_schema(_tablet_schema); } } diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 657dd7d62d..687332e1ae 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -154,13 +154,15 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size()); RETURN_IF_CANCELLED(state); - // update runtime predicate if (_use_topn_opt) { - vectorized::Field new_top = local_state._shared_state->sorter->get_top_value(); - if (!new_top.is_null() && new_top != local_state.old_top) { - auto* query_ctx = state->get_query_ctx(); - RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top)); - local_state.old_top = std::move(new_top); + auto& predicate = state->get_query_ctx()->get_runtime_predicate(_node_id); + if (predicate.need_update()) { + vectorized::Field new_top = local_state._shared_state->sorter->get_top_value(); + if (!new_top.is_null() && new_top != local_state.old_top) { + auto* query_ctx = state->get_query_ctx(); + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top)); + local_state.old_top = std::move(new_top); + } } } if (!_reuse_mem) { diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index fcfc9db702..4975b03720 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -46,13 +46,19 @@ public: Status init(PrimitiveType type, bool nulls_first, bool is_asc, const std::string& col_name); - bool inited() { - std::unique_lock wlock(_rwlock); + bool inited() const { + std::shared_lock rlock(_rwlock); return _inited; } + bool need_update() const { + std::shared_lock rlock(_rwlock); + return _inited && _tablet_schema; + } + void set_tablet_schema(TabletSchemaSPtr tablet_schema) { std::unique_lock wlock(_rwlock); + // when sort node and scan node are not in the same backend, predicate will not be initialized if (_tablet_schema || !_inited) { return; } diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index b142f01169..160690f773 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -141,13 +141,15 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool RETURN_IF_ERROR(_sorter->append_block(input_block)); RETURN_IF_CANCELLED(state); - // update runtime predicate if (_use_topn_opt) { - Field new_top = _sorter->get_top_value(); - if (!new_top.is_null() && new_top != old_top) { - auto* query_ctx = state->get_query_ctx(); - RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top)); - old_top = std::move(new_top); + auto& predicate = state->get_query_ctx()->get_runtime_predicate(_id); + if (predicate.need_update()) { + vectorized::Field new_top = _sorter->get_top_value(); + if (!new_top.is_null() && new_top != old_top) { + auto* query_ctx = state->get_query_ctx(); + RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top)); + old_top = std::move(new_top); + } } } if (!_reuse_mem) {