fix gi split task randomly bug
@@ -86,7 +86,8 @@ OB_SERIALIZE_MEMBER(ObPxRpcInitTaskResponse,
 OB_SERIALIZE_MEMBER(ObPxRpcInitSqcResponse,
                     rc_,
                     reserved_thread_count_,
-                    partitions_info_);
+                    partitions_info_,
+                    sqc_order_gi_tasks_);
 OB_SERIALIZE_MEMBER(ObSqcTableLocationKey,
                     table_location_key_,
                     ref_table_id_,
@@ -647,6 +648,7 @@ OB_DEF_SERIALIZE(ObPxRpcInitSqcArgs)
   }
   // can reuse cache from now on
   (const_cast<ObSqcSerializeCache &>(ser_cache_)).cache_serialized_ = ser_cache_.enable_serialize_cache_;
+  LST_DO_CODE(OB_UNIS_ENCODE, qc_order_gi_tasks_);
   LOG_TRACE("serialize sqc", K_(sqc));
   LOG_DEBUG("end trace sqc args", K(pos), K(buf_len), K(this->get_serialize_size()));
   return ret;
@@ -696,6 +698,7 @@ OB_DEF_SERIALIZE_SIZE(ObPxRpcInitSqcArgs)
     }
     // always serialize
     LST_DO_CODE(OB_UNIS_ADD_LEN, sqc_);
+    LST_DO_CODE(OB_UNIS_ADD_LEN, qc_order_gi_tasks_);
   }
   return len;
 }
@@ -777,6 +780,12 @@ int ObPxRpcInitSqcArgs::do_deserialize(int64_t &pos, const char *net_buf, int64_
       }
     }
   }
+  if (OB_SUCC(ret)) {
+    // if version of qc is old, qc_order_gi_tasks_ will not be serialized and the value will be false.
+    qc_order_gi_tasks_ = false;
+    LST_DO_CODE(OB_UNIS_DECODE, qc_order_gi_tasks_);
+    LOG_TRACE("deserialize qc order gi tasks", K(qc_order_gi_tasks_), K(sqc_), K(this));
+  }
   return ret;
 }
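
The reset-before-decode step above is what keeps the new field compatible with an older QC: when the sender did not serialize qc_order_gi_tasks_, the decode has no bytes left for it and the pre-assigned false survives. A minimal sketch of that idea (illustrative only; the helper below is hypothetical and stands in for the real OB_UNIS_* machinery):

#include <cstdint>

// Hypothetical helper: decode a trailing optional bool, keeping the caller's
// default when the sender (an older version) did not serialize the field.
static bool decode_bool_if_present(const char *buf, int64_t buf_len, int64_t &pos, bool &field)
{
  if (pos >= buf_len) {
    return false;             // old sender: nothing left in the buffer, keep the default
  }
  field = (buf[pos++] != 0);  // new sender: read the serialized value
  return true;
}

// usage sketch, mirroring do_deserialize():
//   qc_order_gi_tasks_ = false;                                   // default for old senders
//   decode_bool_if_present(buf, data_len, pos, qc_order_gi_tasks_);
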

@@ -244,7 +244,8 @@ public:
       access_external_table_files_(),
       px_detectable_ids_(),
       interrupt_by_dm_(false),
-      p2p_dh_map_info_()
+      p2p_dh_map_info_(),
+      sqc_order_gi_tasks_(false)
   {}
   ~ObPxSqcMeta() = default;
   int assign(const ObPxSqcMeta &other);
@@ -363,6 +364,8 @@ public:
   ObP2PDhMapInfo &get_p2p_dh_map_info() { return p2p_dh_map_info_;};
   void set_sqc_count(int64_t sqc_cnt) { sqc_count_ = sqc_cnt; }
   int64_t get_sqc_count() const { return sqc_count_;}
+  void set_sqc_order_gi_tasks(bool v) { sqc_order_gi_tasks_ = v; }
+  bool sqc_order_gi_tasks() const { return sqc_order_gi_tasks_; }
   ObQCMonitoringInfo &get_monitoring_info() { return monitoring_info_; }
   const ObQCMonitoringInfo &get_monitoring_info() const { return monitoring_info_; }
   TO_STRING_KV(K_(need_report), K_(execution_id), K_(qc_id), K_(sqc_id), K_(dfo_id), K_(exec_addr), K_(qc_addr),
@@ -370,7 +373,8 @@ public:
                K_(task_count), K_(max_task_count), K_(min_task_count),
                K_(thread_inited), K_(thread_finish), K_(px_int_id),
                K_(transmit_use_interm_result),
-               K_(recieve_use_interm_result), K_(serial_receive_channels));
+               K_(recieve_use_interm_result), K_(serial_receive_channels),
+               K_(sqc_order_gi_tasks));
 private:
   uint64_t execution_id_;
   uint64_t qc_id_;
@@ -438,6 +442,7 @@ private:
   // for p2p dh msg
   ObP2PDhMapInfo p2p_dh_map_info_;
   int64_t sqc_count_;
+  bool sqc_order_gi_tasks_;
 };

 class ObDfo
@@ -809,7 +814,8 @@ public:
       static_engine_root_(nullptr),
       des_allocator_(NULL),
       sqc_handler_(NULL),
-      scan_spec_ops_()
+      scan_spec_ops_(),
+      qc_order_gi_tasks_(true)
   {}
   ~ObPxRpcInitSqcArgs() = default;

@@ -845,6 +851,8 @@ public:
   ObPxSqcHandler *sqc_handler_;
   ObSEArray<const ObTableScanSpec*, 8> scan_spec_ops_;
   ObSqcSerializeCache ser_cache_;
+  // whether qc support order gi tasks. default value is true and set false before deserialize.
+  bool qc_order_gi_tasks_;
 };

 struct ObPxCleanDtlIntermResInfo
@@ -1143,13 +1151,15 @@ public:
   ObPxRpcInitSqcResponse()
     : rc_(common::OB_NOT_INIT),
       reserved_thread_count_(0),
-      partitions_info_()
+      partitions_info_(),
+      sqc_order_gi_tasks_(false)
   {}
   TO_STRING_KV(K_(rc), K_(reserved_thread_count));
 public:
   int rc_;
   int64_t reserved_thread_count_;
   ObSEArray<ObPxTabletInfo, 8> partitions_info_;
+  bool sqc_order_gi_tasks_;
 };

 class ObPxWorkerEnvArgs

@@ -1230,6 +1230,7 @@ int ObParallelDfoScheduler::dispatch_sqc(ObExecContext &exec_ctx,
   pkt.sqc_id_ = sqc.get_sqc_id();
   pkt.rc_ = resp.rc_;
   pkt.task_count_ = resp.reserved_thread_count_;
+  pkt.sqc_order_gi_tasks_ = resp.sqc_order_gi_tasks_;
   if (resp.reserved_thread_count_ < sqc.get_max_task_count()) {
     LOG_TRACE("SQC don`t have enough thread or thread auto scaling, Downgraded thread allocation",
               K(resp), K(sqc));

@@ -21,6 +21,7 @@
 #include "share/schema/ob_part_mgr_util.h"
 #include "sql/engine/dml/ob_table_modify_op.h"
 #include "sql/engine/ob_engine_op_traits.h"
+#include "sql/engine/px/ob_px_sqc_handler.h"

 namespace oceanbase
 {
@@ -1045,15 +1046,18 @@ int ObAffinitizeGranuleSplitter::split_tasks_affinity(ObExecContext &ctx,
   int ret = OB_SUCCESS;
   ObSchemaGetterGuard schema_guard;
   const ObTableSchema *table_schema = NULL;
-  ObPxAffinityByRandom affinitize_rule;
   ObSQLSessionInfo *my_session = NULL;
   ObPxTabletInfo partition_row_info;
   ObTabletIdxMap idx_map;
-  if (OB_ISNULL(my_session = GET_MY_SESSION(ctx))) {
+  bool qc_order_gi_tasks = false;
+  if (OB_ISNULL(my_session = GET_MY_SESSION(ctx)) || OB_ISNULL(ctx.get_sqc_handler())) {
     ret = OB_ERR_UNEXPECTED;
-    LOG_WARN("fail to get my session", K(ret));
+    LOG_WARN("fail to get my session", K(ret), K(my_session), K(ctx.get_sqc_handler()));
+  } else {
+    qc_order_gi_tasks = ctx.get_sqc_handler()->get_sqc_init_arg().qc_order_gi_tasks_;
   }
   int64_t cur_idx = -1;
+  ObPxAffinityByRandom affinitize_rule(qc_order_gi_tasks);
   ARRAY_FOREACH_X(taskset.gi_task_set_, idx, cnt, OB_SUCC(ret)) {
     if (cur_idx != taskset.gi_task_set_.at(idx).idx_) {
       cur_idx = taskset.gi_task_set_.at(idx).idx_; // get all different parition key in Affinitize
@@ -1096,6 +1100,7 @@ int ObAffinitizeGranuleSplitter::split_tasks_affinity(ObExecContext &ctx,
       } else if (OB_FAIL(affinitize_rule.add_partition(tablet_loc.tablet_id_.id(),
                                                        tablet_idx,
                                                        parallelism,
+                                                       my_session->get_effective_tenant_id(),
                                                        partition_row_info))) {
         LOG_WARN("Failed to get affinitize taskid" , K(ret));
       }

@@ -360,7 +360,8 @@ public:
       sqc_id_(common::OB_INVALID_ID),
       rc_(common::OB_SUCCESS),
       task_count_(0),
-      err_msg_() {}
+      err_msg_(),
+      sqc_order_gi_tasks_(false) {}
   virtual ~ObPxInitSqcResultMsg() = default;
   void reset() {}
   TO_STRING_KV(K_(dfo_id), K_(sqc_id), K_(rc), K_(task_count));
@@ -372,6 +373,7 @@ public:
   ObPxUserErrorMsg err_msg_; // for error msg & warning msg
   // No need to serialize
   ObSEArray<ObPxTabletInfo, 8> tablets_info_;
+  bool sqc_order_gi_tasks_;
 };

@@ -66,7 +66,7 @@ int ObInitSqcP::process()
   int ret = OB_SUCCESS;
   LOG_TRACE("receive dfo", K_(arg));
   ObPxSqcHandler *sqc_handler = arg_.sqc_handler_;
-
+  result_.sqc_order_gi_tasks_ = true;
   /**
    * As long as process() is entered, after_process() is guaranteed to be called,
    * so the interrupt can cover the entire lifetime of the SQC.

@@ -184,6 +184,9 @@ int ObPxMsgProc::on_sqc_init_msg(ObExecContext &ctx, const ObPxInitSqcResultMsg
   } else {
     sqc->set_task_count(pkt.task_count_);
     sqc->set_thread_inited(true);
+    sqc->set_sqc_order_gi_tasks(pkt.sqc_order_gi_tasks_);
+    LOG_TRACE("set sqc support_order_gi_tasks", K(sqc->get_dfo_id()), K(sqc->get_sqc_id()),
+              K(pkt.sqc_order_gi_tasks_));
   }
 }
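
Together with result_.sqc_order_gi_tasks_ = true in ObInitSqcP::process() and the copy into pkt in dispatch_sqc(), this amounts to a simple capability handshake: the flag defaults to false in both message structs, so an SQC running an old version that never sets it is treated as not supporting ordered GI tasks. A hedged sketch of the pattern (hypothetical stand-in types, not the actual OceanBase structs):

// Hypothetical mirror of the handshake: both sides default to "unsupported".
struct InitSqcResponse { bool sqc_order_gi_tasks_ = false; };  // an old SQC never sets this
struct SqcMeta         { bool sqc_order_gi_tasks_ = false; };

// SQC side: a new server advertises support when building its response.
void build_response(InitSqcResponse &resp) { resp.sqc_order_gi_tasks_ = true; }

// QC side: record per-SQC support; ordered splitting is used only for SQCs
// whose response carried the flag.
void on_sqc_init(const InitSqcResponse &resp, SqcMeta &sqc)
{
  sqc.sqc_order_gi_tasks_ = resp.sqc_order_gi_tasks_;
}
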

@@ -163,7 +163,6 @@ int ObPxTaskProcess::process()
-  arg_.exec_ctx_->set_sqc_handler(arg_.sqc_handler_);
   arg_.exec_ctx_->set_px_task_id(arg_.task_.get_task_id());
   arg_.exec_ctx_->set_px_sqc_id(arg_.task_.get_sqc_id());

   ObMaxWaitGuard max_wait_guard(enable_perf_event ? &max_wait_desc : NULL);
   ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL);

@@ -268,7 +267,7 @@ int ObPxTaskProcess::execute(ObOpSpec &root_spec)
     need_fill_batch_info = true;
   }
   LOG_TRACE("trace run op spec root", K(&ctx), K(ctx.get_frames()),
-            K(batch_count), K(need_fill_batch_info));
+            K(batch_count), K(need_fill_batch_info), K(root_spec.get_id()), K(&(root->get_exec_ctx())));
   CK(IS_PX_TRANSMIT(root_spec.get_type()));
   for (int i = 0; i < batch_count && OB_SUCC(ret); ++i) {
     if (need_fill_batch_info) {

@@ -2607,15 +2607,22 @@ int ObPxChannelUtil::sqcs_channles_asyn_wait(ObIArray<ObPxSqcMeta *> &sqcs)
 int ObPxAffinityByRandom::add_partition(int64_t tablet_id,
                                         int64_t tablet_idx,
                                         int64_t worker_cnt,
+                                        uint64_t tenant_id,
                                         ObPxTabletInfo &partition_row_info)
 {
   int ret = OB_SUCCESS;
+  LOG_TRACE("add partition", K(tablet_id), K(tablet_idx), K(worker_cnt), K(this), K(order_partitions_));
   if (0 >= worker_cnt) {
     ret = OB_ERR_UNEXPECTED;
     LOG_WARN("The worker cnt is invalid", K(ret), K(worker_cnt));
   } else {
     TabletHashValue part_hash_value;
     part_hash_value.hash_value_ = 0;
+    if (order_partitions_) {
+      part_hash_value.hash_value_ = 0;
+    } else {
       uint64_t value = (tenant_id << 32 | tablet_idx);
       part_hash_value.hash_value_ = common::murmurhash(&value, sizeof(value), worker_cnt);
+    }
     part_hash_value.tablet_idx_ = tablet_idx;
     part_hash_value.tablet_id_ = tablet_id;
     part_hash_value.partition_info_ = partition_row_info;
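
With order_partitions_ set, add_partition() leaves the hash at zero and defers the decision to do_random(); otherwise each tablet immediately gets a hash of (tenant_id << 32 | tablet_idx). A rough sketch of the two modes, with std::hash standing in for common::murmurhash:

#include <cstdint>
#include <functional>

// Stand-in for common::murmurhash(&value, sizeof(value), seed); illustrative only.
static uint64_t hash_u64(uint64_t value, uint64_t seed)
{
  return std::hash<uint64_t>{}(value ^ (seed * 0x9e3779b97f4a7c15ULL));
}

static uint64_t tablet_hash(bool order_partitions, uint64_t tenant_id,
                            int64_t tablet_idx, int64_t worker_cnt)
{
  if (order_partitions) {
    return 0;  // decided later in do_random(), from a relative index
  }
  const uint64_t value = (tenant_id << 32) | static_cast<uint64_t>(tablet_idx);
  return hash_u64(value, static_cast<uint64_t>(worker_cnt));
}
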

@@ -2648,23 +2655,25 @@ int ObPxAffinityByRandom::do_random(bool use_partition_info, uint64_t tenant_id)
       && (tablet_hash_values_.at(0).tablet_idx_ > tablet_hash_values_.at(1).tablet_idx_)) {
     asc_order = false;
   }
-  // in partition wise affinity scenario, partition_idx of a pair of partitions may be different.
-  // for example, T1 consists of p0, p1, p2 and T2 consists of p1, p2
-  // T1.p1 <===> T2.p1 and T1.p2 <===> T2.p2
-  // The partition_idx of T1.p1 is 1 and the partition_idx of T2.p1 is 0.
-  // If we calculate hash value of partition_idx and sort partitions by the hash value,
-  // T1.p1 and T2.p1 may be assigned to different worker.
-  // So we sort partitions by partition_idx and generate a relative_idx which starts from zero.
-  // Then calculate hash value with the relative_idx
-  auto part_idx_compare_fun = [](TabletHashValue a, TabletHashValue b) -> bool { return a.tablet_idx_ > b.tablet_idx_; };
-  std::sort(tablet_hash_values_.begin(),
-            tablet_hash_values_.end(),
-            part_idx_compare_fun);
-  int64_t relative_idx = 0;
-  for (int64_t i = 0; i < tablet_hash_values_.count(); i++) {
-    uint64_t value = ((tenant_id << 32) | relative_idx);
-    tablet_hash_values_.at(i).hash_value_ = common::murmurhash(&value, sizeof(value), worker_cnt_);
-    relative_idx++;
+  if (order_partitions_) {
+    // in partition wise affinity scenario, partition_idx of a pair of partitions may be different.
+    // for example, T1 consists of p0, p1, p2 and T2 consists of p1, p2
+    // T1.p1 <===> T2.p1 and T1.p2 <===> T2.p2
+    // The partition_idx of T1.p1 is 1 and the partition_idx of T2.p1 is 0.
+    // If we calculate hash value of partition_idx and sort partitions by the hash value,
+    // T1.p1 and T2.p1 may be assigned to different worker.
+    // So we sort partitions by partition_idx and generate a relative_idx which starts from zero.
+    // Then calculate hash value with the relative_idx
+    auto part_idx_compare_fun = [](TabletHashValue a, TabletHashValue b) -> bool { return a.tablet_idx_ > b.tablet_idx_; };
+    std::sort(tablet_hash_values_.begin(),
+              tablet_hash_values_.end(),
+              part_idx_compare_fun);
+    int64_t relative_idx = 0;
+    for (int64_t i = 0; i < tablet_hash_values_.count(); i++) {
+      uint64_t value = ((tenant_id << 32) | relative_idx);
+      tablet_hash_values_.at(i).hash_value_ = common::murmurhash(&value, sizeof(value), worker_cnt_);
+      relative_idx++;
+    }
   }

   // first shuffle the order of all of them
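
The comment block above carries the core of the fix: in a partition-wise join the two sides can have different absolute partition_idx values (T1 = {p0, p1, p2} vs. T2 = {p1, p2}), so hashing the absolute index may send T1.p1 and T2.p1 to different workers. Sorting by tablet_idx and hashing a relative index that starts from zero on both sides keeps the matched partitions together. A small illustrative sketch (std::hash standing in for common::murmurhash):

#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

// Illustrative only: derive a hash per tablet from its *relative* position after
// sorting by tablet_idx, so two partition-wise-joined tables agree on the mapping.
static std::vector<uint64_t> relative_hashes(std::vector<int64_t> tablet_idxs,
                                             uint64_t tenant_id, int64_t worker_cnt)
{
  std::sort(tablet_idxs.begin(), tablet_idxs.end(), std::greater<int64_t>());
  std::vector<uint64_t> hashes(tablet_idxs.size());
  for (size_t relative_idx = 0; relative_idx < tablet_idxs.size(); ++relative_idx) {
    const uint64_t value = (tenant_id << 32) | relative_idx;
    hashes[relative_idx] = std::hash<uint64_t>{}(value ^ static_cast<uint64_t>(worker_cnt));
  }
  return hashes;
}

// T1 = {p0, p1, p2} -> tablet_idx {0, 1, 2}; T2 = {p1, p2} -> tablet_idx {0, 1}.
// After sorting, T1.p2 and T2.p2 both get relative_idx 0, and T1.p1 and T2.p1 both
// get relative_idx 1, so matched partitions hash identically and land on the same worker.
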

@@ -2672,7 +2681,7 @@ int ObPxAffinityByRandom::do_random(bool use_partition_info, uint64_t tenant_id)
   std::sort(tablet_hash_values_.begin(),
             tablet_hash_values_.end(),
             compare_fun);
-  LOG_TRACE("after sort partition_hash_values randomly", K(tablet_hash_values_));
+  LOG_TRACE("after sort partition_hash_values randomly", K(tablet_hash_values_), K(this), K(order_partitions_));

   // if there is no partition statistics info, place them round-robin
   if (!use_partition_info) {

@@ -3383,8 +3392,9 @@ int ObSlaveMapUtil::build_ppwj_ch_mn_map(ObExecContext &ctx, ObDfo &parent, ObDf
   DASTabletLocArray locations;
   ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
     // all affinitize calculations are local to the SQC, not global
-    ObPxAffinityByRandom affinitize_rule;
     ObPxSqcMeta &sqc = *sqcs.at(idx);
+    ObPxAffinityByRandom affinitize_rule(sqc.sqc_order_gi_tasks());
+    LOG_TRACE("build ppwj_ch_mn_map", K(sqc));
     ObPxTabletInfo partition_row_info;
     locations.reset();
     if (OB_FAIL(get_pkey_table_locations(child.get_pkey_table_loc_id(), sqc, locations))) {
@@ -3423,6 +3433,7 @@ int ObSlaveMapUtil::build_ppwj_ch_mn_map(ObExecContext &ctx, ObDfo &parent, ObDf
       } else if (OB_FAIL(affinitize_rule.add_partition(location.tablet_id_.id(),
                                                        tablet_idx,
                                                        sqc.get_task_count(),
+                                                       ctx.get_my_session()->get_effective_tenant_id(),
                                                        partition_row_info))) {
         LOG_WARN("fail calc task_id", K(location.tablet_id_), K(sqc), K(ret));
       }

@@ -436,13 +436,14 @@ public:
   TO_STRING_KV(K_(tablet_id), K_(tablet_idx), K_(hash_value), K_(worker_id), K_(partition_info));
 };
 public:
-  ObPxAffinityByRandom() :
-    worker_cnt_(0), tablet_hash_values_() {}
+  ObPxAffinityByRandom(bool order_partitions) :
+    worker_cnt_(0), tablet_hash_values_(), order_partitions_(order_partitions) {}
   virtual ~ObPxAffinityByRandom() = default;
   int reserve(int64_t size) { return tablet_hash_values_.reserve(size); }
   int add_partition(int64_t tablet_id,
                     int64_t tablet_idx,
                     int64_t worker_cnt,
+                    uint64_t tenant_id,
                     ObPxTabletInfo &partition_row_info);
   int do_random(bool use_partition_info, uint64_t tenant_id);
   const ObIArray<TabletHashValue> &get_result() { return tablet_hash_values_; }
@@ -450,6 +451,7 @@ public:
 private:
   int64_t worker_cnt_;
   ObSEArray<TabletHashValue, 8> tablet_hash_values_;
+  bool order_partitions_;
 };

 class ObSlaveMapUtil

@@ -49,10 +49,10 @@ TEST_F(ObRandomAffiTaskSplitTest, split_task_test) {
   int64_t parallel = 3;
   int64_t tenant_id = 1;
   ObPxTabletInfo px_part_info;
-  ObPxAffinityByRandom affinitize_rule;
+  ObPxAffinityByRandom affinitize_rule(true);
   for (int i = 0; i < 5; ++i) {
     px_part_info.physical_row_count_ = (10 - i) * 100;
-    affinitize_rule.add_partition(i,i,parallel,px_part_info);
+    affinitize_rule.add_partition(i,i,parallel,tenant_id,px_part_info);
   }
   affinitize_rule.do_random(true, tenant_id);
   const common::ObIArray<ObPxAffinityByRandom::TabletHashValue>& result = affinitize_rule.get_result();
@@ -70,18 +70,18 @@ TEST_F(ObRandomAffiTaskSplitTest, split_task_test) {
   int64_t parallel = 16;
   int64_t tenant_id = 1;
   ObPxTabletInfo px_part_info;
-  ObPxAffinityByRandom affinitize_rule;
+  ObPxAffinityByRandom affinitize_rule(true);

   px_part_info.physical_row_count_ = 3000;
-  affinitize_rule.add_partition(0,0,parallel,px_part_info);
+  affinitize_rule.add_partition(0,0,parallel,tenant_id,px_part_info);
   px_part_info.physical_row_count_ = 1000;
-  affinitize_rule.add_partition(1,1,parallel,px_part_info);
+  affinitize_rule.add_partition(1,1,parallel,tenant_id,px_part_info);
   px_part_info.physical_row_count_ = 2500;
-  affinitize_rule.add_partition(2,2,parallel,px_part_info);
+  affinitize_rule.add_partition(2,2,parallel,tenant_id,px_part_info);
   px_part_info.physical_row_count_ = 3500;
-  affinitize_rule.add_partition(3,3,parallel,px_part_info);
+  affinitize_rule.add_partition(3,3,parallel,tenant_id,px_part_info);
   px_part_info.physical_row_count_ = 2000;
-  affinitize_rule.add_partition(4,4,parallel,px_part_info);
+  affinitize_rule.add_partition(4,4,parallel,tenant_id,px_part_info);

   affinitize_rule.do_random(true, tenant_id);

@@ -100,18 +100,18 @@ TEST_F(ObRandomAffiTaskSplitTest, split_task_test) {
   int64_t parallel = 3;
   int64_t tenant_id = 1;
   ObPxTabletInfo px_part_info;
-  ObPxAffinityByRandom affinitize_rule;
+  ObPxAffinityByRandom affinitize_rule(true);

   px_part_info.physical_row_count_ = 3000;
-  affinitize_rule.add_partition(0,0,parallel,px_part_info);
+  affinitize_rule.add_partition(0,0,parallel,tenant_id,px_part_info);
   px_part_info.physical_row_count_ = 4000;
-  affinitize_rule.add_partition(1,1,parallel,px_part_info);
+  affinitize_rule.add_partition(1,1,parallel,tenant_id,px_part_info);
   px_part_info.physical_row_count_ = 2500;
-  affinitize_rule.add_partition(2,2,parallel,px_part_info);
+  affinitize_rule.add_partition(2,2,parallel,tenant_id,px_part_info);
   px_part_info.physical_row_count_ = 1500;
-  affinitize_rule.add_partition(3,3,parallel,px_part_info);
+  affinitize_rule.add_partition(3,3,parallel,tenant_id,px_part_info);
   px_part_info.physical_row_count_ = 2000;
-  affinitize_rule.add_partition(4,4,parallel,px_part_info);
+  affinitize_rule.add_partition(4,4,parallel,tenant_id,px_part_info);

   affinitize_rule.do_random(true, tenant_id);