diff --git a/src/sql/engine/px/exchange/ob_px_dist_transmit_op.cpp b/src/sql/engine/px/exchange/ob_px_dist_transmit_op.cpp index ca078e7dbc..8e90fd8824 100644 --- a/src/sql/engine/px/exchange/ob_px_dist_transmit_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_dist_transmit_op.cpp @@ -418,7 +418,8 @@ int ObPxDistTransmitOp::do_range_dist() } } if (OB_SUCC(ret)) { - range = ctx_.get_partition_ranges().empty() ? NULL : &ctx_.get_partition_ranges().at(0); + ObPxSqcHandler *handler = ctx_.get_sqc_handler(); + range = handler->get_partition_ranges().empty() ? NULL : &handler->get_partition_ranges().at(0); ObRangeSliceIdCalc slice_id_calc(ctx_.get_allocator(), task_channels_.count(), range, &MY_SPEC.dist_exprs_, MY_SPEC.sort_cmp_funs_, MY_SPEC.sort_collations_); if (ObPxSampleType::OBJECT_SAMPLE == MY_SPEC.sample_type_) { diff --git a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp index 5ec67fef3a..fbc99c1f7c 100644 --- a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp @@ -1399,10 +1399,11 @@ int ObPxTransmitOp::do_datahub_dynamic_sample(int64_t op_id, ObDynamicSamplePiec } else if (OB_ISNULL(temp_whole_msg)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("whole msg is unexpected", K(ret)); - } else if (OB_FAIL(ctx_.set_partition_ranges(temp_whole_msg->part_ranges_))) { - LOG_WARN("set partition ranges failed", K(ret), K(*temp_whole_msg)); + } else if (OB_FAIL(handler->set_partition_ranges(temp_whole_msg->part_ranges_))) { + LOG_WARN("set partition ranges failed", K(ret), K(piece_msg), K(*temp_whole_msg)); } else { - LOG_INFO("dynamic sample succ", K(ret), K(piece_msg), K(*temp_whole_msg), K(ctx_.get_partition_ranges())); + LOG_INFO("dynamic sample succ", K(ret), K(piece_msg), K(*temp_whole_msg), + K(handler->get_partition_ranges())); } } return ret; diff --git a/src/sql/engine/px/ob_px_sqc_handler.cpp b/src/sql/engine/px/ob_px_sqc_handler.cpp index a2e0704e95..3f604cae64 100644 --- a/src/sql/engine/px/ob_px_sqc_handler.cpp +++ b/src/sql/engine/px/ob_px_sqc_handler.cpp @@ -218,6 +218,7 @@ void ObPxSqcHandler::reset() process_flags_ = 0; end_ret_ = OB_SUCCESS; reference_count_ = 1; + part_ranges_.reset(); call_dtor(sub_coord_); call_dtor(sqc_init_args_); call_dtor(des_phy_plan_); @@ -437,5 +438,41 @@ int ObPxSqcHandler::thread_count_auto_scaling(int64_t &reserved_px_thread_count) return ret; } +int ObPxSqcHandler::set_partition_ranges(const Ob2DArray &part_ranges, + char *buf, int64_t size) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(part_ranges.count() <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("part ranges is empty", K(ret), K(part_ranges.count())); + } else { + bool part_ranges_empty = false; + { + SpinRLockGuard rlock_guard(part_ranges_spin_lock_); + part_ranges_empty = part_ranges_.empty(); + } + // If the worker thread has already set the part_ranges_; + // there is no need to repeat the setup. + if (part_ranges_empty) { + SpinWLockGuard wlock_guard(part_ranges_spin_lock_); + if (part_ranges_.empty()) { + int64_t pos = 0; + ObPxTabletRange tmp_range; + for (int64_t i = 0; OB_SUCC(ret) && i < part_ranges.count(); ++i) { + const ObPxTabletRange &cur_range = part_ranges.at(i); + if (0 == size && OB_FAIL(tmp_range.deep_copy_from(cur_range, get_safe_allocator(), buf, size, pos))) { + LOG_WARN("deep copy partition range failed", K(ret), K(cur_range)); + } else if (0 != size && OB_FAIL(tmp_range.deep_copy_from(cur_range, get_safe_allocator(), buf, size, pos))) { + LOG_WARN("deep copy partition range failed", K(ret), K(cur_range)); + } else if (OB_FAIL(part_ranges_.push_back(tmp_range))) { + LOG_WARN("push back partition range failed", K(ret), K(tmp_range)); + } + } + } + } + } + return ret; +} + } // sql } // oceanbase diff --git a/src/sql/engine/px/ob_px_sqc_handler.h b/src/sql/engine/px/ob_px_sqc_handler.h index 4bb60aadde..c9d6513880 100644 --- a/src/sql/engine/px/ob_px_sqc_handler.h +++ b/src/sql/engine/px/ob_px_sqc_handler.h @@ -64,7 +64,8 @@ public: mem_context_(NULL), tenant_id_(UINT64_MAX), reserved_px_thread_count_(0), process_flags_(0), end_ret_(OB_SUCCESS), reference_count_(1), notifier_(nullptr), exec_ctx_(nullptr), des_phy_plan_(nullptr), sqc_init_args_(nullptr), sub_coord_(nullptr), rpc_level_(INT32_MAX), - node_sequence_id_(0), has_interrupted_(false) { + node_sequence_id_(0), has_interrupted_(false), + part_ranges_spin_lock_(common::ObLatchIds::PX_TENANT_TARGET_LOCK) { } ~ObPxSqcHandler() = default; static constexpr const char *OP_LABEL = ObModIds::ObModIds::OB_SQL_SQC_HANDLER; @@ -127,6 +128,9 @@ public: void set_node_sequence_id(uint64_t node_sequence_id) { node_sequence_id_ = node_sequence_id; } int thread_count_auto_scaling(int64_t &reserved_px_thread_count); bool has_interrupted() const { return has_interrupted_; } + const Ob2DArray &get_partition_ranges() const { return part_ranges_; } + int set_partition_ranges(const Ob2DArray &part_ranges, + char *buf = NULL, int64_t max_size = 0); TO_STRING_KV(K_(tenant_id), K_(reserved_px_thread_count), KP_(notifier), K_(exec_ctx), K_(des_phy_plan), K_(sqc_init_args), KP_(sub_coord), K_(rpc_level)); @@ -158,6 +162,8 @@ private: * 2. worker register interruption first, then check has_interrupted_, skip execution if has_interrupted_ = true. */ bool has_interrupted_; + Ob2DArray part_ranges_; + SpinRWLock part_ranges_spin_lock_; }; } diff --git a/src/sql/executor/ob_slice_calc.cpp b/src/sql/executor/ob_slice_calc.cpp index 6e5915d3c6..124dc2b804 100644 --- a/src/sql/executor/ob_slice_calc.cpp +++ b/src/sql/executor/ob_slice_calc.cpp @@ -21,6 +21,7 @@ #include "common/row/ob_row.h" #include "lib/ob_define.h" #include "share/schema/ob_part_mgr_util.h" +#include "sql/engine/px/ob_px_sqc_handler.h" using namespace oceanbase::sql; using namespace oceanbase::common; @@ -1269,7 +1270,8 @@ int ObSlaveMapPkeyRangeIdxCalc::build_partition_range_channel_map( { int ret = OB_SUCCESS; part_range_channel_map.destroy(); - const Ob2DArray &part_ranges = exec_ctx_.get_partition_ranges(); + ObPxSqcHandler *handler = exec_ctx_.get_sqc_handler(); + const Ob2DArray &part_ranges = handler->get_partition_ranges(); if (OB_FAIL(part_range_channel_map.create(DEFAULT_PARTITION_COUNT, common::ObModIds::OB_SQL_PX))) { LOG_WARN("create part range map failed", K(ret)); } else {