fix parallel drop column checksum error
This commit is contained in:
committed by
wangzelin.wzl
parent
3b348dd951
commit
594d885e2c
@ -53,6 +53,8 @@ int ObDDLSingleReplicaExecutor::build(const ObDDLSingleReplicaExecutorParam &par
|
|||||||
build_info_tmp.stat_ = ObPartitionBuildStat::BUILD_INIT;
|
build_info_tmp.stat_ = ObPartitionBuildStat::BUILD_INIT;
|
||||||
if (OB_FAIL(build_infos.push_back(build_info_tmp))) {
|
if (OB_FAIL(build_infos.push_back(build_info_tmp))) {
|
||||||
LOG_WARN("fail to push back build info", K(ret));
|
LOG_WARN("fail to push back build info", K(ret));
|
||||||
|
} else if (OB_FAIL(tablet_task_ids_.push_back(i + 1))) {
|
||||||
|
LOG_WARN("fail to push tablet task id", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // timeout, need reset task status
|
} else { // timeout, need reset task status
|
||||||
@ -115,6 +117,7 @@ int ObDDLSingleReplicaExecutor::schedule_task()
|
|||||||
arg.task_id_ = task_id_;
|
arg.task_id_ = task_id_;
|
||||||
arg.parallelism_ = parallelism_;
|
arg.parallelism_ = parallelism_;
|
||||||
arg.execution_id_ = execution_id_;
|
arg.execution_id_ = execution_id_;
|
||||||
|
arg.tablet_task_id_ = tablet_task_ids_.at(i);
|
||||||
if (OB_FAIL(location_service->get(tenant_id_, arg.source_tablet_id_,
|
if (OB_FAIL(location_service->get(tenant_id_, arg.source_tablet_id_,
|
||||||
expire_renew_time, is_cache_hit, ls_id))) {
|
expire_renew_time, is_cache_hit, ls_id))) {
|
||||||
LOG_WARN("get ls failed", K(ret), K(arg.source_tablet_id_));
|
LOG_WARN("get ls failed", K(ret), K(arg.source_tablet_id_));
|
||||||
|
|||||||
@ -103,6 +103,7 @@ private:
|
|||||||
share::ObDDLType type_;
|
share::ObDDLType type_;
|
||||||
common::ObArray<common::ObTabletID> source_tablet_ids_;
|
common::ObArray<common::ObTabletID> source_tablet_ids_;
|
||||||
common::ObArray<common::ObTabletID> dest_tablet_ids_;
|
common::ObArray<common::ObTabletID> dest_tablet_ids_;
|
||||||
|
common::ObArray<int64_t> tablet_task_ids_;
|
||||||
int64_t source_table_id_;
|
int64_t source_table_id_;
|
||||||
int64_t dest_table_id_;
|
int64_t dest_table_id_;
|
||||||
int64_t schema_version_;
|
int64_t schema_version_;
|
||||||
|
|||||||
@ -674,7 +674,6 @@ int ObIndexBuildTask::wait_data_complement()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret) && state_finished) {
|
if (OB_SUCC(ret) && state_finished) {
|
||||||
uint64_t execution_id = OB_INVALID_ID;
|
|
||||||
bool dummy_equal = false;
|
bool dummy_equal = false;
|
||||||
if (OB_FAIL(ObDDLChecksumOperator::check_column_checksum(
|
if (OB_FAIL(ObDDLChecksumOperator::check_column_checksum(
|
||||||
tenant_id_, redefinition_execution_id_, object_id_, index_table_id_, task_id_, dummy_equal, root_service_->get_sql_proxy()))) {
|
tenant_id_, redefinition_execution_id_, object_id_, index_table_id_, task_id_, dummy_equal, root_service_->get_sql_proxy()))) {
|
||||||
|
|||||||
@ -5571,7 +5571,8 @@ OB_SERIALIZE_MEMBER(ObLogReqLoadProxyProgressRequest, agency_addr_seq_, principa
|
|||||||
OB_SERIALIZE_MEMBER(ObLogReqLoadProxyProgressResponse, err_, progress_);
|
OB_SERIALIZE_MEMBER(ObLogReqLoadProxyProgressResponse, err_, progress_);
|
||||||
|
|
||||||
OB_SERIALIZE_MEMBER(ObDDLBuildSingleReplicaRequestArg, tenant_id_, ls_id_, source_tablet_id_, dest_tablet_id_,
|
OB_SERIALIZE_MEMBER(ObDDLBuildSingleReplicaRequestArg, tenant_id_, ls_id_, source_tablet_id_, dest_tablet_id_,
|
||||||
source_table_id_, dest_schema_id_, schema_version_, snapshot_version_, ddl_type_, task_id_, execution_id_, parallelism_);
|
source_table_id_, dest_schema_id_, schema_version_, snapshot_version_, ddl_type_, task_id_, execution_id_,
|
||||||
|
parallelism_, tablet_task_id_);
|
||||||
|
|
||||||
int ObDDLBuildSingleReplicaRequestArg::assign(const ObDDLBuildSingleReplicaRequestArg &other)
|
int ObDDLBuildSingleReplicaRequestArg::assign(const ObDDLBuildSingleReplicaRequestArg &other)
|
||||||
{
|
{
|
||||||
@ -5586,6 +5587,9 @@ int ObDDLBuildSingleReplicaRequestArg::assign(const ObDDLBuildSingleReplicaReque
|
|||||||
snapshot_version_ = other.snapshot_version_;
|
snapshot_version_ = other.snapshot_version_;
|
||||||
ddl_type_ = other.ddl_type_;
|
ddl_type_ = other.ddl_type_;
|
||||||
task_id_ = other.task_id_;
|
task_id_ = other.task_id_;
|
||||||
|
parallelism_ = other.parallelism_;
|
||||||
|
execution_id_ = other.execution_id_;
|
||||||
|
tablet_task_id_ = other.tablet_task_id_;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -7115,16 +7115,19 @@ struct ObDDLBuildSingleReplicaRequestArg final
|
|||||||
{
|
{
|
||||||
OB_UNIS_VERSION(1);
|
OB_UNIS_VERSION(1);
|
||||||
public:
|
public:
|
||||||
ObDDLBuildSingleReplicaRequestArg() : tenant_id_(OB_INVALID_ID), ls_id_(), source_tablet_id_(), dest_tablet_id_(), source_table_id_(OB_INVALID_ID),
|
ObDDLBuildSingleReplicaRequestArg() : tenant_id_(OB_INVALID_ID), ls_id_(), source_tablet_id_(), dest_tablet_id_(),
|
||||||
dest_schema_id_(OB_INVALID_ID), schema_version_(0), snapshot_version_(0), ddl_type_(0), task_id_(0), parallelism_(0), execution_id_(0) {}
|
source_table_id_(OB_INVALID_ID), dest_schema_id_(OB_INVALID_ID),
|
||||||
|
schema_version_(0), snapshot_version_(0), ddl_type_(0), task_id_(0),
|
||||||
|
parallelism_(0), execution_id_(0), tablet_task_id_(0) {}
|
||||||
bool is_valid() const {
|
bool is_valid() const {
|
||||||
return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid() && dest_tablet_id_.is_valid()
|
return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid() && dest_tablet_id_.is_valid()
|
||||||
&& OB_INVALID_ID != source_table_id_ && OB_INVALID_ID != dest_schema_id_ && schema_version_ > 0 && snapshot_version_ > 0
|
&& OB_INVALID_ID != source_table_id_ && OB_INVALID_ID != dest_schema_id_ && schema_version_ > 0 && snapshot_version_ > 0
|
||||||
&& task_id_ > 0 && parallelism_ > 0;
|
&& task_id_ > 0 && parallelism_ > 0 && tablet_task_id_ > 0;
|
||||||
}
|
}
|
||||||
int assign(const ObDDLBuildSingleReplicaRequestArg &other);
|
int assign(const ObDDLBuildSingleReplicaRequestArg &other);
|
||||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id),
|
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id),
|
||||||
K_(source_table_id), K_(dest_schema_id), K_(schema_version), K_(snapshot_version), K_(task_id), K_(parallelism), K_(execution_id));
|
K_(source_table_id), K_(dest_schema_id), K_(schema_version), K_(snapshot_version),
|
||||||
|
K_(task_id), K_(parallelism), K_(execution_id), K_(tablet_task_id));
|
||||||
public:
|
public:
|
||||||
uint64_t tenant_id_;
|
uint64_t tenant_id_;
|
||||||
share::ObLSID ls_id_;
|
share::ObLSID ls_id_;
|
||||||
@ -7138,6 +7141,7 @@ public:
|
|||||||
int64_t task_id_;
|
int64_t task_id_;
|
||||||
int64_t parallelism_;
|
int64_t parallelism_;
|
||||||
int64_t execution_id_;
|
int64_t execution_id_;
|
||||||
|
int64_t tablet_task_id_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ObDDLBuildSingleReplicaRequestResult final
|
struct ObDDLBuildSingleReplicaRequestResult final
|
||||||
|
|||||||
@ -114,6 +114,7 @@ int ObComplementDataParam::init(const ObDDLBuildSingleReplicaRequestArg &arg)
|
|||||||
schema_version_ = arg.schema_version_;
|
schema_version_ = arg.schema_version_;
|
||||||
task_id_ = arg.task_id_;
|
task_id_ = arg.task_id_;
|
||||||
execution_id_ = arg.execution_id_;
|
execution_id_ = arg.execution_id_;
|
||||||
|
tablet_task_id_ = arg.tablet_task_id_;
|
||||||
FLOG_INFO("succeed to init ObComplementDataParam", K(ret), K(is_inited_), K(tenant_id_), K(ls_id_),
|
FLOG_INFO("succeed to init ObComplementDataParam", K(ret), K(is_inited_), K(tenant_id_), K(ls_id_),
|
||||||
K(source_tablet_id_), K(dest_tablet_id_), K(schema_version_), K(task_id_), K(arg), K(concurrent_cnt_));
|
K(source_tablet_id_), K(dest_tablet_id_), K(schema_version_), K(task_id_), K(arg), K(concurrent_cnt_));
|
||||||
}
|
}
|
||||||
@ -1094,7 +1095,7 @@ int ObComplementWriteTask::append_row(ObLocalScan &local_scan)
|
|||||||
report_col_checksums,
|
report_col_checksums,
|
||||||
report_col_ids,
|
report_col_ids,
|
||||||
1/*execution_id*/,
|
1/*execution_id*/,
|
||||||
param_->source_tablet_id_.id()/*task_id*/,
|
param_->tablet_task_id_ << 48 | task_id_,
|
||||||
*GCTX.sql_proxy_))) {
|
*GCTX.sql_proxy_))) {
|
||||||
LOG_WARN("fail to report origin table checksum", K(ret));
|
LOG_WARN("fail to report origin table checksum", K(ret));
|
||||||
} else {/* do nothing. */}
|
} else {/* do nothing. */}
|
||||||
|
|||||||
@ -41,7 +41,8 @@ public:
|
|||||||
source_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID),
|
source_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID),
|
||||||
data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("CompleteDataPar"),
|
data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("CompleteDataPar"),
|
||||||
row_store_type_(common::ENCODING_ROW_STORE), schema_version_(0), snapshot_version_(0),
|
row_store_type_(common::ENCODING_ROW_STORE), schema_version_(0), snapshot_version_(0),
|
||||||
concurrent_cnt_(0), task_id_(0), execution_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID)
|
concurrent_cnt_(0), task_id_(0), execution_id_(0), tablet_task_id_(0),
|
||||||
|
compat_mode_(lib::Worker::CompatMode::INVALID)
|
||||||
{}
|
{}
|
||||||
~ObComplementDataParam() { destroy(); }
|
~ObComplementDataParam() { destroy(); }
|
||||||
int init(const ObDDLBuildSingleReplicaRequestArg &arg);
|
int init(const ObDDLBuildSingleReplicaRequestArg &arg);
|
||||||
@ -52,7 +53,7 @@ public:
|
|||||||
return common::OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid()
|
return common::OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid()
|
||||||
&& dest_tablet_id_.is_valid() && OB_NOT_NULL(data_table_schema_) && OB_NOT_NULL(hidden_table_schema_)
|
&& dest_tablet_id_.is_valid() && OB_NOT_NULL(data_table_schema_) && OB_NOT_NULL(hidden_table_schema_)
|
||||||
&& 0 != concurrent_cnt_ && snapshot_version_ > 0 && compat_mode_ != lib::Worker::CompatMode::INVALID
|
&& 0 != concurrent_cnt_ && snapshot_version_ > 0 && compat_mode_ != lib::Worker::CompatMode::INVALID
|
||||||
&& execution_id_ > 0;
|
&& execution_id_ > 0 && tablet_task_id_ > 0;
|
||||||
}
|
}
|
||||||
int get_hidden_table_key(ObITable::TableKey &table_key) const;
|
int get_hidden_table_key(ObITable::TableKey &table_key) const;
|
||||||
void destroy()
|
void destroy()
|
||||||
@ -78,10 +79,11 @@ public:
|
|||||||
concurrent_cnt_ = 0;
|
concurrent_cnt_ = 0;
|
||||||
task_id_ = 0;
|
task_id_ = 0;
|
||||||
execution_id_ = 0;
|
execution_id_ = 0;
|
||||||
|
tablet_task_id_ = 0;
|
||||||
compat_mode_ = lib::Worker::CompatMode::INVALID;
|
compat_mode_ = lib::Worker::CompatMode::INVALID;
|
||||||
}
|
}
|
||||||
TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id),
|
TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id),
|
||||||
KPC_(data_table_schema), KPC_(hidden_table_schema), K_(schema_version),
|
KPC_(data_table_schema), KPC_(hidden_table_schema), K_(schema_version), K_(tablet_task_id),
|
||||||
K_(snapshot_version), K_(concurrent_cnt), K_(task_id), K_(execution_id), K_(compat_mode));
|
K_(snapshot_version), K_(concurrent_cnt), K_(task_id), K_(execution_id), K_(compat_mode));
|
||||||
public:
|
public:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
@ -98,6 +100,7 @@ public:
|
|||||||
int64_t concurrent_cnt_;
|
int64_t concurrent_cnt_;
|
||||||
int64_t task_id_;
|
int64_t task_id_;
|
||||||
int64_t execution_id_;
|
int64_t execution_id_;
|
||||||
|
int64_t tablet_task_id_;
|
||||||
lib::Worker::CompatMode compat_mode_;
|
lib::Worker::CompatMode compat_mode_;
|
||||||
ObSEArray<common::ObStoreRange, 32> ranges_;
|
ObSEArray<common::ObStoreRange, 32> ranges_;
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user