fix bug:fill in the correct tenant data version for direct_load
This commit is contained in:
@ -194,15 +194,19 @@ int ObTableLoadBeginP::create_table_ctx(const ObTableLoadParam ¶m,
|
|||||||
// start redef table
|
// start redef table
|
||||||
ObTableLoadRedefTableStartArg start_arg;
|
ObTableLoadRedefTableStartArg start_arg;
|
||||||
ObTableLoadRedefTableStartRes start_res;
|
ObTableLoadRedefTableStartRes start_res;
|
||||||
|
uint64_t data_version = 0;
|
||||||
start_arg.tenant_id_ = param.tenant_id_;
|
start_arg.tenant_id_ = param.tenant_id_;
|
||||||
start_arg.table_id_ = param.table_id_;
|
start_arg.table_id_ = param.table_id_;
|
||||||
start_arg.parallelism_ = param.session_count_;
|
start_arg.parallelism_ = param.session_count_;
|
||||||
if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, session_info))) {
|
if (OB_FAIL(GET_MIN_DATA_VERSION(param.tenant_id_, data_version))) {
|
||||||
|
LOG_WARN("fail to get tenant data version", KR(ret));
|
||||||
|
} else if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, session_info))) {
|
||||||
LOG_WARN("fail to start redef table", KR(ret), K(start_arg));
|
LOG_WARN("fail to start redef table", KR(ret), K(start_arg));
|
||||||
} else {
|
} else {
|
||||||
ddl_param.dest_table_id_ = start_res.dest_table_id_;
|
ddl_param.dest_table_id_ = start_res.dest_table_id_;
|
||||||
ddl_param.task_id_ = start_res.task_id_;
|
ddl_param.task_id_ = start_res.task_id_;
|
||||||
ddl_param.schema_version_ = start_res.schema_version_;
|
ddl_param.schema_version_ = start_res.schema_version_;
|
||||||
|
ddl_param.data_version_ = data_version;
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
|
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
|
||||||
@ -271,9 +275,11 @@ int ObTableLoadPreBeginPeerP::process()
|
|||||||
param.online_opt_stat_gather_ = arg_.online_opt_stat_gather_;
|
param.online_opt_stat_gather_ = arg_.online_opt_stat_gather_;
|
||||||
param.dup_action_ = arg_.dup_action_;
|
param.dup_action_ = arg_.dup_action_;
|
||||||
ObTableLoadDDLParam ddl_param;
|
ObTableLoadDDLParam ddl_param;
|
||||||
|
uint64_t data_version = 0;
|
||||||
ddl_param.dest_table_id_ = arg_.dest_table_id_;
|
ddl_param.dest_table_id_ = arg_.dest_table_id_;
|
||||||
ddl_param.task_id_ = arg_.task_id_;
|
ddl_param.task_id_ = arg_.task_id_;
|
||||||
ddl_param.schema_version_ = arg_.schema_version_;
|
ddl_param.schema_version_ = arg_.schema_version_;
|
||||||
|
ddl_param.data_version_ = arg_.data_version_;
|
||||||
if (OB_FAIL(create_table_ctx(param, ddl_param, table_ctx))) {
|
if (OB_FAIL(create_table_ctx(param, ddl_param, table_ctx))) {
|
||||||
LOG_WARN("fail to create table ctx", KR(ret));
|
LOG_WARN("fail to create table ctx", KR(ret));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -228,6 +228,7 @@ int ObTableLoadCoordinator::pre_begin_peers()
|
|||||||
request.dest_table_id_ = ctx_->ddl_param_.dest_table_id_;
|
request.dest_table_id_ = ctx_->ddl_param_.dest_table_id_;
|
||||||
request.task_id_ = ctx_->ddl_param_.task_id_;
|
request.task_id_ = ctx_->ddl_param_.task_id_;
|
||||||
request.schema_version_ = ctx_->ddl_param_.schema_version_;
|
request.schema_version_ = ctx_->ddl_param_.schema_version_;
|
||||||
|
request.data_version_ = ctx_->ddl_param_.data_version_;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < all_leader_info_array.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < all_leader_info_array.count(); ++i) {
|
||||||
const ObTableLoadPartitionLocation::LeaderInfo &leader_info = all_leader_info_array.at(i);
|
const ObTableLoadPartitionLocation::LeaderInfo &leader_info = all_leader_info_array.at(i);
|
||||||
const ObTableLoadPartitionLocation::LeaderInfo &target_leader_info = target_all_leader_info_array.at(i);
|
const ObTableLoadPartitionLocation::LeaderInfo &target_leader_info = target_all_leader_info_array.at(i);
|
||||||
|
|||||||
@ -101,15 +101,19 @@ int ObTableLoadInstance::create_table_ctx(ObTableLoadParam ¶m,
|
|||||||
// start redef table
|
// start redef table
|
||||||
ObTableLoadRedefTableStartArg start_arg;
|
ObTableLoadRedefTableStartArg start_arg;
|
||||||
ObTableLoadRedefTableStartRes start_res;
|
ObTableLoadRedefTableStartRes start_res;
|
||||||
|
uint64_t data_version = 0;
|
||||||
start_arg.tenant_id_ = param.tenant_id_;
|
start_arg.tenant_id_ = param.tenant_id_;
|
||||||
start_arg.table_id_ = param.table_id_;
|
start_arg.table_id_ = param.table_id_;
|
||||||
start_arg.parallelism_ = param.session_count_;
|
start_arg.parallelism_ = param.session_count_;
|
||||||
if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, *session_info_))) {
|
if (OB_FAIL(GET_MIN_DATA_VERSION(param.tenant_id_, data_version))) {
|
||||||
|
LOG_WARN("fail to get tenant data version", KR(ret));
|
||||||
|
} else if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, *session_info_))) {
|
||||||
LOG_WARN("fail to start redef table", KR(ret), K(start_arg));
|
LOG_WARN("fail to start redef table", KR(ret), K(start_arg));
|
||||||
} else {
|
} else {
|
||||||
ddl_param.dest_table_id_ = start_res.dest_table_id_;
|
ddl_param.dest_table_id_ = start_res.dest_table_id_;
|
||||||
ddl_param.task_id_ = start_res.task_id_;
|
ddl_param.task_id_ = start_res.task_id_;
|
||||||
ddl_param.schema_version_ = start_res.schema_version_;
|
ddl_param.schema_version_ = start_res.schema_version_;
|
||||||
|
ddl_param.data_version_ = data_version;
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
|
if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
|
||||||
|
|||||||
@ -75,6 +75,7 @@ int ObTableLoadStoreCtx::init(
|
|||||||
insert_table_param.snapshot_version_ = ObTimeUtil::current_time_ns();
|
insert_table_param.snapshot_version_ = ObTimeUtil::current_time_ns();
|
||||||
insert_table_param.ddl_task_id_ = ctx_->ddl_param_.task_id_;
|
insert_table_param.ddl_task_id_ = ctx_->ddl_param_.task_id_;
|
||||||
insert_table_param.execution_id_ = 1; //仓氐说暂时设置为1,不然后面检测过不了
|
insert_table_param.execution_id_ = 1; //仓氐说暂时设置为1,不然后面检测过不了
|
||||||
|
insert_table_param.data_version_ = ctx_->ddl_param_.data_version_;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < partition_id_array.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < partition_id_array.count(); ++i) {
|
||||||
const ObLSID &ls_id = partition_id_array[i].ls_id_;
|
const ObLSID &ls_id = partition_id_array[i].ls_id_;
|
||||||
const ObTableLoadPartitionId &part_tablet_id = partition_id_array[i].part_tablet_id_;
|
const ObTableLoadPartitionId &part_tablet_id = partition_id_array[i].part_tablet_id_;
|
||||||
|
|||||||
@ -152,22 +152,27 @@ public:
|
|||||||
struct ObTableLoadDDLParam
|
struct ObTableLoadDDLParam
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObTableLoadDDLParam() : dest_table_id_(common::OB_INVALID_ID), task_id_(0), schema_version_(0) {}
|
ObTableLoadDDLParam()
|
||||||
|
: dest_table_id_(common::OB_INVALID_ID), task_id_(0), schema_version_(0), data_version_(0) {}
|
||||||
void reset()
|
void reset()
|
||||||
{
|
{
|
||||||
dest_table_id_ = common::OB_INVALID_ID;
|
dest_table_id_ = common::OB_INVALID_ID;
|
||||||
task_id_ = 0;
|
task_id_ = 0;
|
||||||
schema_version_ = 0;
|
schema_version_ = 0;
|
||||||
|
data_version_ = 0;
|
||||||
}
|
}
|
||||||
bool is_valid() const
|
bool is_valid() const
|
||||||
{
|
{
|
||||||
return common::OB_INVALID_ID != dest_table_id_ && 0 != task_id_ && 0 != schema_version_;
|
return common::OB_INVALID_ID != dest_table_id_ && 0 != task_id_ && 0 != schema_version_ &&
|
||||||
|
0 != data_version_;
|
||||||
}
|
}
|
||||||
TO_STRING_KV(K_(dest_table_id), K_(task_id), K_(schema_version));
|
TO_STRING_KV(K_(dest_table_id), K_(task_id), K_(schema_version), K_(data_version));
|
||||||
|
|
||||||
public:
|
public:
|
||||||
uint64_t dest_table_id_;
|
uint64_t dest_table_id_;
|
||||||
int64_t task_id_;
|
int64_t task_id_;
|
||||||
int64_t schema_version_;
|
int64_t schema_version_;
|
||||||
|
int64_t data_version_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObTableLoadMutexGuard
|
class ObTableLoadMutexGuard
|
||||||
|
|||||||
@ -41,6 +41,7 @@ OB_SERIALIZE_MEMBER(ObTableLoadPreBeginPeerRequest,
|
|||||||
dest_table_id_,
|
dest_table_id_,
|
||||||
task_id_,
|
task_id_,
|
||||||
schema_version_,
|
schema_version_,
|
||||||
|
data_version_,
|
||||||
partition_id_array_,
|
partition_id_array_,
|
||||||
target_partition_id_array_);
|
target_partition_id_array_);
|
||||||
|
|
||||||
|
|||||||
@ -66,7 +66,8 @@ public:
|
|||||||
online_opt_stat_gather_(false),
|
online_opt_stat_gather_(false),
|
||||||
dest_table_id_(common::OB_INVALID_ID),
|
dest_table_id_(common::OB_INVALID_ID),
|
||||||
task_id_(0),
|
task_id_(0),
|
||||||
schema_version_(0)
|
schema_version_(0),
|
||||||
|
data_version_(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
TO_STRING_KV(K_(table_id),
|
TO_STRING_KV(K_(table_id),
|
||||||
@ -78,6 +79,7 @@ public:
|
|||||||
K_(dest_table_id),
|
K_(dest_table_id),
|
||||||
K_(task_id),
|
K_(task_id),
|
||||||
K_(schema_version),
|
K_(schema_version),
|
||||||
|
K_(data_version),
|
||||||
K_(partition_id_array),
|
K_(partition_id_array),
|
||||||
K_(target_partition_id_array));
|
K_(target_partition_id_array));
|
||||||
public:
|
public:
|
||||||
@ -92,6 +94,7 @@ public:
|
|||||||
uint64_t dest_table_id_;
|
uint64_t dest_table_id_;
|
||||||
int64_t task_id_;
|
int64_t task_id_;
|
||||||
int64_t schema_version_;
|
int64_t schema_version_;
|
||||||
|
int64_t data_version_;
|
||||||
// partition info
|
// partition info
|
||||||
ObTableLoadArray<ObTableLoadLSIdAndPartitionId> partition_id_array_;//orig table
|
ObTableLoadArray<ObTableLoadLSIdAndPartitionId> partition_id_array_;//orig table
|
||||||
ObTableLoadArray<ObTableLoadLSIdAndPartitionId> target_partition_id_array_;//FIXME: target table
|
ObTableLoadArray<ObTableLoadLSIdAndPartitionId> target_partition_id_array_;//FIXME: target table
|
||||||
|
|||||||
@ -91,7 +91,7 @@ int ObDirectLoadInsertTableContext::init(const ObDirectLoadInsertTableParam &par
|
|||||||
table_insert_param.write_major_ = true;
|
table_insert_param.write_major_ = true;
|
||||||
table_insert_param.execution_id_ = param.execution_id_;
|
table_insert_param.execution_id_ = param.execution_id_;
|
||||||
table_insert_param.ddl_task_id_ = param.ddl_task_id_;
|
table_insert_param.ddl_task_id_ = param.ddl_task_id_;
|
||||||
table_insert_param.data_format_version_ = 1;
|
table_insert_param.data_format_version_ = param.data_version_;
|
||||||
for (int64_t i = 0; i < param.ls_partition_ids_.count(); ++i) {
|
for (int64_t i = 0; i < param.ls_partition_ids_.count(); ++i) {
|
||||||
const ObTableLoadLSIdAndPartitionId &ls_partition_id = param.ls_partition_ids_.at(i);
|
const ObTableLoadLSIdAndPartitionId &ls_partition_id = param.ls_partition_ids_.at(i);
|
||||||
if (OB_FAIL(table_insert_param.ls_tablet_ids_.push_back(
|
if (OB_FAIL(table_insert_param.ls_tablet_ids_.push_back(
|
||||||
|
|||||||
@ -27,6 +27,7 @@ public:
|
|||||||
int64_t snapshot_version_;
|
int64_t snapshot_version_;
|
||||||
int64_t execution_id_;
|
int64_t execution_id_;
|
||||||
int64_t ddl_task_id_;
|
int64_t ddl_task_id_;
|
||||||
|
int64_t data_version_;
|
||||||
common::ObArray<table::ObTableLoadLSIdAndPartitionId> ls_partition_ids_;
|
common::ObArray<table::ObTableLoadLSIdAndPartitionId> ls_partition_ids_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user