PX memory inflation during the creation of index scenarios.
This commit is contained in:
@ -418,7 +418,8 @@ int ObPxDistTransmitOp::do_range_dist()
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
range = ctx_.get_partition_ranges().empty() ? NULL : &ctx_.get_partition_ranges().at(0);
|
||||
ObPxSqcHandler *handler = ctx_.get_sqc_handler();
|
||||
range = handler->get_partition_ranges().empty() ? NULL : &handler->get_partition_ranges().at(0);
|
||||
ObRangeSliceIdCalc slice_id_calc(ctx_.get_allocator(), task_channels_.count(),
|
||||
range, &MY_SPEC.dist_exprs_, MY_SPEC.sort_cmp_funs_, MY_SPEC.sort_collations_);
|
||||
if (ObPxSampleType::OBJECT_SAMPLE == MY_SPEC.sample_type_) {
|
||||
|
||||
@ -1399,10 +1399,11 @@ int ObPxTransmitOp::do_datahub_dynamic_sample(int64_t op_id, ObDynamicSamplePiec
|
||||
} else if (OB_ISNULL(temp_whole_msg)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("whole msg is unexpected", K(ret));
|
||||
} else if (OB_FAIL(ctx_.set_partition_ranges(temp_whole_msg->part_ranges_))) {
|
||||
LOG_WARN("set partition ranges failed", K(ret), K(*temp_whole_msg));
|
||||
} else if (OB_FAIL(handler->set_partition_ranges(temp_whole_msg->part_ranges_))) {
|
||||
LOG_WARN("set partition ranges failed", K(ret), K(piece_msg), K(*temp_whole_msg));
|
||||
} else {
|
||||
LOG_INFO("dynamic sample succ", K(ret), K(piece_msg), K(*temp_whole_msg), K(ctx_.get_partition_ranges()));
|
||||
LOG_INFO("dynamic sample succ", K(ret), K(piece_msg), K(*temp_whole_msg),
|
||||
K(handler->get_partition_ranges()));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -218,6 +218,7 @@ void ObPxSqcHandler::reset()
|
||||
process_flags_ = 0;
|
||||
end_ret_ = OB_SUCCESS;
|
||||
reference_count_ = 1;
|
||||
part_ranges_.reset();
|
||||
call_dtor(sub_coord_);
|
||||
call_dtor(sqc_init_args_);
|
||||
call_dtor(des_phy_plan_);
|
||||
@ -437,5 +438,41 @@ int ObPxSqcHandler::thread_count_auto_scaling(int64_t &reserved_px_thread_count)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPxSqcHandler::set_partition_ranges(const Ob2DArray<ObPxTabletRange> &part_ranges,
|
||||
char *buf, int64_t size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(part_ranges.count() <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("part ranges is empty", K(ret), K(part_ranges.count()));
|
||||
} else {
|
||||
bool part_ranges_empty = false;
|
||||
{
|
||||
SpinRLockGuard rlock_guard(part_ranges_spin_lock_);
|
||||
part_ranges_empty = part_ranges_.empty();
|
||||
}
|
||||
// If the worker thread has already set the part_ranges_;
|
||||
// there is no need to repeat the setup.
|
||||
if (part_ranges_empty) {
|
||||
SpinWLockGuard wlock_guard(part_ranges_spin_lock_);
|
||||
if (part_ranges_.empty()) {
|
||||
int64_t pos = 0;
|
||||
ObPxTabletRange tmp_range;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < part_ranges.count(); ++i) {
|
||||
const ObPxTabletRange &cur_range = part_ranges.at(i);
|
||||
if (0 == size && OB_FAIL(tmp_range.deep_copy_from<true>(cur_range, get_safe_allocator(), buf, size, pos))) {
|
||||
LOG_WARN("deep copy partition range failed", K(ret), K(cur_range));
|
||||
} else if (0 != size && OB_FAIL(tmp_range.deep_copy_from<false>(cur_range, get_safe_allocator(), buf, size, pos))) {
|
||||
LOG_WARN("deep copy partition range failed", K(ret), K(cur_range));
|
||||
} else if (OB_FAIL(part_ranges_.push_back(tmp_range))) {
|
||||
LOG_WARN("push back partition range failed", K(ret), K(tmp_range));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // sql
|
||||
} // oceanbase
|
||||
|
||||
@ -64,7 +64,8 @@ public:
|
||||
mem_context_(NULL), tenant_id_(UINT64_MAX), reserved_px_thread_count_(0), process_flags_(0),
|
||||
end_ret_(OB_SUCCESS), reference_count_(1), notifier_(nullptr), exec_ctx_(nullptr),
|
||||
des_phy_plan_(nullptr), sqc_init_args_(nullptr), sub_coord_(nullptr), rpc_level_(INT32_MAX),
|
||||
node_sequence_id_(0), has_interrupted_(false) {
|
||||
node_sequence_id_(0), has_interrupted_(false),
|
||||
part_ranges_spin_lock_(common::ObLatchIds::PX_TENANT_TARGET_LOCK) {
|
||||
}
|
||||
~ObPxSqcHandler() = default;
|
||||
static constexpr const char *OP_LABEL = ObModIds::ObModIds::OB_SQL_SQC_HANDLER;
|
||||
@ -127,6 +128,9 @@ public:
|
||||
void set_node_sequence_id(uint64_t node_sequence_id) { node_sequence_id_ = node_sequence_id; }
|
||||
int thread_count_auto_scaling(int64_t &reserved_px_thread_count);
|
||||
bool has_interrupted() const { return has_interrupted_; }
|
||||
const Ob2DArray<ObPxTabletRange> &get_partition_ranges() const { return part_ranges_; }
|
||||
int set_partition_ranges(const Ob2DArray<ObPxTabletRange> &part_ranges,
|
||||
char *buf = NULL, int64_t max_size = 0);
|
||||
TO_STRING_KV(K_(tenant_id), K_(reserved_px_thread_count), KP_(notifier),
|
||||
K_(exec_ctx), K_(des_phy_plan), K_(sqc_init_args), KP_(sub_coord), K_(rpc_level));
|
||||
|
||||
@ -158,6 +162,8 @@ private:
|
||||
* 2. worker register interruption first, then check has_interrupted_, skip execution if has_interrupted_ = true.
|
||||
*/
|
||||
bool has_interrupted_;
|
||||
Ob2DArray<ObPxTabletRange> part_ranges_;
|
||||
SpinRWLock part_ranges_spin_lock_;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
#include "common/row/ob_row.h"
|
||||
#include "lib/ob_define.h"
|
||||
#include "share/schema/ob_part_mgr_util.h"
|
||||
#include "sql/engine/px/ob_px_sqc_handler.h"
|
||||
|
||||
using namespace oceanbase::sql;
|
||||
using namespace oceanbase::common;
|
||||
@ -1269,7 +1270,8 @@ int ObSlaveMapPkeyRangeIdxCalc::build_partition_range_channel_map(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
part_range_channel_map.destroy();
|
||||
const Ob2DArray<ObPxTabletRange> &part_ranges = exec_ctx_.get_partition_ranges();
|
||||
ObPxSqcHandler *handler = exec_ctx_.get_sqc_handler();
|
||||
const Ob2DArray<ObPxTabletRange> &part_ranges = handler->get_partition_ranges();
|
||||
if (OB_FAIL(part_range_channel_map.create(DEFAULT_PARTITION_COUNT, common::ObModIds::OB_SQL_PX))) {
|
||||
LOG_WARN("create part range map failed", K(ret));
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user