fix ddl task when the tenant switch to standby

This commit is contained in:
simonjoylet
2023-03-03 09:13:52 +00:00
committed by ob-robot
parent 730345eeb7
commit c6e23516d2
23 changed files with 259 additions and 91 deletions

View File

@ -97,7 +97,7 @@ int ObColumnRedefinitionTask::init(const ObDDLTaskRecord &task_record)
} else if (!task_record.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret));
} else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) {
} else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) {
LOG_WARN("deserialize params from message failed", K(ret));
} else if (OB_FAIL(set_ddl_stmt_str(task_record.ddl_stmt_str_))) {
LOG_WARN("set ddl stmt str failed", K(ret));
@ -494,17 +494,19 @@ int ObColumnRedefinitionTask::serialize_params_to_message(char *buf, const int64
return ret;
}
int ObColumnRedefinitionTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos)
int ObColumnRedefinitionTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
obrpc::ObAlterTableArg tmp_arg;
if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) {
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len));
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) {
LOG_WARN("deep copy table arg failed", K(ret));
} else {

View File

@ -63,7 +63,7 @@ private:
int copy_table_constraints();
int copy_table_foreign_keys();
virtual int serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const override;
virtual int deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) override;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) override;
virtual int64_t get_serialize_param_size() const override;
private:
static const int64_t OB_COLUMN_REDEFINITION_TASK_VERSION = 1L;

View File

@ -550,7 +550,7 @@ int ObConstraintTask::init(const ObDDLTaskRecord &task_record)
} else if (OB_ISNULL(root_service)) {
ret = OB_ERR_SYS;
LOG_WARN("error sys, root service must not be nullptr", K(ret));
} else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) {
} else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) {
LOG_WARN("deserialize params from message failed", K(ret));
} else {
object_id_ = table_id;
@ -1819,17 +1819,19 @@ int ObConstraintTask::serialize_params_to_message(char *buf, const int64_t buf_l
return ret;
}
int ObConstraintTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos)
int ObConstraintTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
ObAlterTableArg tmp_arg;
if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) {
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len));
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) {
LOG_WARN("deep copy table arg failed", K(ret));
}

View File

@ -104,7 +104,7 @@ public:
virtual int process() override;
int update_check_constraint_finish(const int ret_code);
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override;
virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int64_t get_serialize_param_size() const override;
virtual void flt_set_task_span_tag() const override;
virtual void flt_set_status_span_tag() const override;

View File

@ -216,7 +216,7 @@ int ObDDLRetryTask::init(const ObDDLTaskRecord &task_record)
is_schema_change_done_ = false; // do not worry about it, check_schema_change_done will correct it.
if (nullptr != task_record.message_) {
int64_t pos = 0;
if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) {
if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) {
LOG_WARN("fail to deserialize params from message", K(ret));
}
}
@ -614,18 +614,20 @@ int ObDDLRetryTask::serialize_params_to_message(char *buf, const int64_t buf_siz
return ret;
}
int ObDDLRetryTask::deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos)
int ObDDLRetryTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) {
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_size));
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(buf_size));
} else if (OB_FAIL(serialization::decode_i64(buf, buf_size, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (ObDDLType::DDL_DROP_DATABASE == task_type_) {
obrpc::ObDropDatabaseArg tmp_arg;
if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_ddl_arg(allocator_, task_type_, &tmp_arg))) {
LOG_WARN("deep copy table arg failed", K(ret));
}
@ -633,6 +635,8 @@ int ObDDLRetryTask::deserlize_params_from_message(const char *buf, const int64_t
obrpc::ObDropTableArg tmp_arg;
if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_ddl_arg(allocator_, task_type_, &tmp_arg))) {
LOG_WARN("deep copy table arg failed", K(ret));
}
@ -640,6 +644,8 @@ int ObDDLRetryTask::deserlize_params_from_message(const char *buf, const int64_t
obrpc::ObTruncateTableArg tmp_arg;
if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_ddl_arg(allocator_, task_type_, &tmp_arg))) {
LOG_WARN("deep copy table arg failed", K(ret));
}
@ -650,6 +656,8 @@ int ObDDLRetryTask::deserlize_params_from_message(const char *buf, const int64_t
obrpc::ObAlterTableArg tmp_arg;
if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_ddl_arg(allocator_, task_type_, &tmp_arg))) {
LOG_WARN("deep copy table arg failed", K(ret));
}

View File

@ -39,7 +39,7 @@ public:
virtual int process() override;
virtual bool is_valid() const override;
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override;
virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int64_t get_serialize_param_size() const override;
static int update_task_status_wait_child_task_finish(
common::ObMySQLTransaction &trans,

View File

@ -1524,6 +1524,18 @@ int ObDDLScheduler::insert_task_record(
return ret;
}
static bool is_tenant_primary(const ObIArray<uint64_t> &primary_tenant_ids, const uint64_t tenant_id)
{
bool is_primary = false;
for (int64_t i = 0; i < primary_tenant_ids.count(); ++i) {
if (primary_tenant_ids.at(i) == tenant_id) {
is_primary = true;
break;
}
}
return is_primary;
}
int ObDDLScheduler::recover_task()
{
int ret = OB_SUCCESS;
@ -1533,12 +1545,15 @@ int ObDDLScheduler::recover_task()
} else {
ObSqlString sql_string;
ObArray<ObDDLTaskRecord> task_records;
ObArray<uint64_t> primary_tenant_ids;
ObArenaAllocator allocator(lib::ObLabel("DdlTasRecord"));
share::schema::ObMultiVersionSchemaService &schema_service = root_service_->get_schema_service();
if (OB_FAIL(ObDDLTaskRecordOperator::get_all_ddl_task_record(root_service_->get_sql_proxy(), allocator, task_records))) {
LOG_WARN("get task record failed", K(ret), K(sql_string));
} else if (OB_FAIL(ObAllTenantInfoProxy::get_primary_tenant_ids(&root_service_->get_sql_proxy(), primary_tenant_ids))) {
LOG_WARN("get primary tenant id failed", K(ret));
}
LOG_INFO("start processing ddl recovery", K(task_records));
LOG_INFO("start processing ddl recovery", K(task_records), K(primary_tenant_ids));
for (int64_t i = 0; OB_SUCC(ret) && i < task_records.count(); ++i) {
const ObDDLTaskRecord &cur_record = task_records.at(i);
int64_t tenant_schema_version = 0;
@ -1547,8 +1562,11 @@ int ObDDLScheduler::recover_task()
ObMySQLTransaction trans;
if (OB_FAIL(schema_service.get_tenant_schema_version(cur_record.tenant_id_, tenant_schema_version))) {
LOG_WARN("failed to get tenant schema version", K(ret), K(cur_record));
} else if (!is_tenant_primary(primary_tenant_ids, cur_record.tenant_id_)) {
LOG_INFO("tenant not primary, skip schedule ddl task", K(cur_record));
} else if (tenant_schema_version < cur_record.schema_version_) {
// schema has not publish, by pass now
LOG_INFO("skip schedule ddl task, because tenant schema version too old", K(tenant_schema_version), K(cur_record));
} else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), cur_record.tenant_id_))) {
LOG_WARN("start transaction failed", K(ret));
} else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans,
@ -1616,21 +1634,14 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record)
LOG_WARN("ddl task record is invalid", K(ret), K(record));
} else {
switch (record.ddl_type_) {
case ObDDLType::DDL_CREATE_INDEX: {
if (OB_FAIL(schedule_build_index_task(record))) {
LOG_WARN("schedule global index task failed", K(ret), K(record));
}
case ObDDLType::DDL_CREATE_INDEX:
ret = schedule_build_index_task(record);
break;
}
case ObDDLType::DDL_DROP_INDEX:
if (OB_FAIL(schedule_drop_index_task(record))) {
LOG_WARN("schedule drop index task failed", K(ret));
}
ret = schedule_drop_index_task(record);
break;
case DDL_DROP_PRIMARY_KEY:
if (OB_FAIL(schedule_drop_primary_key_task(record))) {
LOG_WARN("schedule drop primary key task failed", K(ret));
}
ret = schedule_drop_primary_key_task(record);
break;
case DDL_MODIFY_COLUMN:
case DDL_ADD_PRIMARY_KEY:
@ -1639,29 +1650,20 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record)
case DDL_CONVERT_TO_CHARACTER:
case DDL_TABLE_REDEFINITION:
case DDL_DIRECT_LOAD:
if (OB_FAIL(schedule_table_redefinition_task(record))) {
LOG_WARN("schedule table redefinition task failed", K(ret));
}
ret = schedule_table_redefinition_task(record);
break;
case DDL_DROP_COLUMN:
case DDL_ADD_COLUMN_OFFLINE:
case DDL_COLUMN_REDEFINITION:
if(OB_FAIL(schedule_column_redefinition_task(record))) {
LOG_WARN("schedule column redefinition task failed", K(ret));
}
ret = schedule_column_redefinition_task(record);
break;
case DDL_CHECK_CONSTRAINT:
case DDL_FOREIGN_KEY_CONSTRAINT:
case DDL_ADD_NOT_NULL_COLUMN:
if (OB_FAIL(schedule_constraint_task(record))) {
LOG_WARN("schedule constraint task failed", K(ret));
}
ret = schedule_constraint_task(record);
break;
case DDL_MODIFY_AUTO_INCREMENT:
if (OB_FAIL(schedule_modify_autoinc_task(record))) {
LOG_WARN("schedule modify autoinc task failed", K(ret));
}
ret = schedule_modify_autoinc_task(record);
break;
case DDL_DROP_DATABASE:
case DDL_DROP_TABLE:
@ -1670,9 +1672,7 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record)
case DDL_DROP_SUB_PARTITION:
case DDL_TRUNCATE_PARTITION:
case DDL_TRUNCATE_SUB_PARTITION:
if (OB_FAIL(schedule_ddl_retry_task(record))) {
LOG_WARN("schedule ddl retry task failed", K(ret));
}
ret = schedule_ddl_retry_task(record);
break;
default: {
ret = OB_NOT_SUPPORTED;
@ -1681,6 +1681,9 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record)
}
}
LOG_INFO("schedule ddl task", K(ret), K(record));
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
}
return ret;
}
@ -1711,9 +1714,6 @@ int ObDDLScheduler::schedule_build_index_task(
allocator_.free(build_index_task);
build_index_task = nullptr;
}
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}
@ -1740,9 +1740,6 @@ int ObDDLScheduler::schedule_drop_primary_key_task(const ObDDLTaskRecord &task_r
allocator_.free(drop_pk_task);
drop_pk_task = nullptr;
}
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}
@ -1772,9 +1769,6 @@ int ObDDLScheduler::schedule_table_redefinition_task(const ObDDLTaskRecord &task
allocator_.free(redefinition_task);
redefinition_task = nullptr;
}
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}
@ -1801,9 +1795,6 @@ int ObDDLScheduler::schedule_column_redefinition_task(const ObDDLTaskRecord &tas
allocator_.free(redefinition_task);
redefinition_task = nullptr;
}
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}
@ -1830,9 +1821,6 @@ int ObDDLScheduler::schedule_ddl_retry_task(const ObDDLTaskRecord &task_record)
allocator_.free(ddl_retry_task);
ddl_retry_task = nullptr;
}
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}
@ -1859,9 +1847,6 @@ int ObDDLScheduler::schedule_constraint_task(const ObDDLTaskRecord &task_record)
allocator_.free(constraint_task);
constraint_task = nullptr;
}
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}
@ -1888,9 +1873,6 @@ int ObDDLScheduler::schedule_modify_autoinc_task(const ObDDLTaskRecord &task_rec
allocator_.free(modify_autoinc_task);
modify_autoinc_task = nullptr;
}
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}
@ -1917,9 +1899,6 @@ int ObDDLScheduler::schedule_drop_index_task(const ObDDLTaskRecord &task_record)
allocator_.free(drop_index_task);
drop_index_task = nullptr;
}
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
}
return ret;
}

View File

@ -820,6 +820,7 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_cancel = false;
bool is_standby_tenant = false;
int real_ret_code = ret_code;
bool is_tenant_dropped = false;
ObDDLTaskStatus real_new_status = new_status;
@ -847,6 +848,11 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable
} else if (is_tenant_dropped) {
need_retry_ = false;
LOG_INFO("tenant has been dropped, exit anyway", K(ret), K(tenant_id_));
} else if (OB_FAIL(ObAllTenantInfoProxy::is_standby_tenant(&root_service->get_sql_proxy(), tenant_id_, is_standby_tenant))) {
LOG_WARN("check is standby tenant failed", K(ret), K(tenant_id_));
} else if (is_standby_tenant) {
need_retry_ = false;
LOG_INFO("tenant is standby, exit anyway", K(ret), K(tenant_id_));
} else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id_))) {
LOG_WARN("start transaction failed", K(ret));
} else {

View File

@ -424,7 +424,7 @@ public:
int64_t get_data_format_version() const { return data_format_version_; }
static int fetch_new_task_id(ObMySQLProxy &sql_proxy, int64_t &new_task_id);
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const = 0;
virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) = 0;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) = 0;
virtual int64_t get_serialize_param_size() const = 0;
const ObString &get_ddl_stmt_str() const { return ddl_stmt_str_; }
int set_ddl_stmt_str(const ObString &ddl_stmt_str);

View File

@ -90,7 +90,7 @@ int ObDropIndexTask::init(
ret_code_ = task_record.ret_code_;
if (nullptr != task_record.message_.ptr()) {
int64_t pos = 0;
if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) {
if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) {
LOG_WARN("deserialize params from message failed", K(ret));
}
}
@ -418,15 +418,17 @@ int ObDropIndexTask::serialize_params_to_message(char *buf, const int64_t buf_si
return ret;
}
int ObDropIndexTask::deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos)
int ObDropIndexTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos)
{
int ret = OB_SUCCESS;
obrpc::ObDropIndexArg tmp_drop_index_arg;
if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) {
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), KP(buf), K(buf_size));
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(buf_size));
} else if (OB_FAIL(tmp_drop_index_arg.deserialize(buf, buf_size, pos))) {
LOG_WARN("deserialize failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_drop_index_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_drop_index_arg));
} else if (OB_FAIL(deep_copy_index_arg(allocator_, tmp_drop_index_arg, drop_index_arg_))) {
LOG_WARN("deep copy drop index arg failed", K(ret));
}

View File

@ -37,7 +37,7 @@ public:
virtual int process() override;
virtual bool is_valid() const override;
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override;
virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int64_t get_serialize_param_size() const override;
INHERIT_TO_STRING_KV("ObDDLTask", ObDDLTask, KP_(root_service));
virtual void flt_set_task_span_tag() const override;

View File

@ -400,7 +400,7 @@ int ObIndexBuildTask::init(const ObDDLTaskRecord &task_record)
} else if (!task_record.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(task_record));
} else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) {
} else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) {
LOG_WARN("deserialize params from message failed", K(ret));
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(
task_record.tenant_id_, schema_guard, schema_version))) {
@ -1469,17 +1469,19 @@ int ObIndexBuildTask::serialize_params_to_message(char *buf, const int64_t buf_l
return ret;
}
int ObIndexBuildTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos)
int ObIndexBuildTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
ObCreateIndexArg tmp_arg;
if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) {
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len));
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, create_index_arg_))) {
LOG_WARN("deep copy create index arg failed", K(ret));
} else {

View File

@ -107,7 +107,7 @@ public:
virtual bool is_valid() const override;
virtual int collect_longops_stat(share::ObLongopsValue &value) override;
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override;
virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int64_t get_serialize_param_size() const override;
virtual bool support_longops_monitoring() const override { return true; }
static int deep_copy_index_arg(common::ObIAllocator &allocator, const obrpc::ObCreateIndexArg &source_arg, obrpc::ObCreateIndexArg &dest_arg);

View File

@ -198,7 +198,7 @@ int ObModifyAutoincTask::init(const ObDDLTaskRecord &task_record)
} else if (OB_UNLIKELY(!task_record.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(task_record));
} else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) {
} else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) {
LOG_WARN("deserialize params from message failed", K(ret));
} else if (OB_FAIL(set_ddl_stmt_str(task_record.ddl_stmt_str_))) {
LOG_WARN("set ddl stmt str failed", K(ret));
@ -599,17 +599,19 @@ int ObModifyAutoincTask::serialize_params_to_message(char *buf, const int64_t bu
return ret;
}
int ObModifyAutoincTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos)
int ObModifyAutoincTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
obrpc::ObAlterTableArg tmp_arg;
if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) {
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len));
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) {
LOG_WARN("deep copy table arg failed", K(ret));
}

View File

@ -64,7 +64,7 @@ public:
int init(const ObDDLTaskRecord &task_record);
virtual int process() override;
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const override;
virtual int deserlize_params_from_message(const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override;
virtual int64_t get_serialize_param_size() const override;
int notify_update_autoinc_finish(const uint64_t autoinc_val, const int ret_code);
virtual void flt_set_task_span_tag() const override;

View File

@ -99,7 +99,7 @@ int ObTableRedefinitionTask::init(const ObDDLTaskRecord &task_record)
} else if (!task_record.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(task_record));
} else if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) {
} else if (OB_FAIL(deserlize_params_from_message(task_record.tenant_id_, task_record.message_.ptr(), task_record.message_.length(), pos))) {
LOG_WARN("deserialize params from message failed", K(ret), K(task_record.message_), K(common::lbt()));
} else if (OB_FAIL(set_ddl_stmt_str(task_record.ddl_stmt_str_))) {
LOG_WARN("set ddl stmt str failed", K(ret));
@ -962,7 +962,7 @@ int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_
return ret;
}
int ObTableRedefinitionTask::deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos)
int ObTableRedefinitionTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
int8_t copy_indexes = 0;
@ -972,13 +972,15 @@ int ObTableRedefinitionTask::deserlize_params_from_message(const char *buf, cons
int8_t ignore_errors = 0;
int8_t do_finish = 0;
obrpc::ObAlterTableArg tmp_arg;
if (OB_UNLIKELY(nullptr == buf || data_len <= 0)) {
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(data_len));
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) {
LOG_WARN("deep copy table arg failed", K(ret));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &parallelism_))) {

View File

@ -57,7 +57,7 @@ public:
inline void set_is_ignore_errors(const bool is_ignore_errors) {is_ignore_errors_ = is_ignore_errors;}
inline void set_is_do_finish(const bool is_do_finish) {is_do_finish_ = is_do_finish;}
virtual int serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const override;
virtual int deserlize_params_from_message(const char *buf, const int64_t data_len, int64_t &pos) override;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t data_len, int64_t &pos) override;
virtual int64_t get_serialize_param_size() const override;
int assign(const ObTableRedefinitionTask *table_redef_task);
virtual int collect_longops_stat(share::ObLongopsValue &value) override;

View File

@ -1070,7 +1070,7 @@ int ObDDLUtil::get_data_format_version(
EXTRACT_VARCHAR_FIELD_MYSQL(*result, "message_unhex", task_message);
if (ObDDLType::DDL_CREATE_INDEX == ddl_type) {
SMART_VAR(rootserver::ObIndexBuildTask, task) {
if (OB_FAIL(task.deserlize_params_from_message(task_message.ptr(), task_message.length(), pos))) {
if (OB_FAIL(task.deserlize_params_from_message(tenant_id, task_message.ptr(), task_message.length(), pos))) {
LOG_WARN("deserialize from msg failed", K(ret));
} else {
data_format_version = task.get_data_format_version();
@ -1078,7 +1078,7 @@ int ObDDLUtil::get_data_format_version(
}
} else {
SMART_VAR(rootserver::ObTableRedefinitionTask, task) {
if (OB_FAIL(task.deserlize_params_from_message(task_message.ptr(), task_message.length(), pos))) {
if (OB_FAIL(task.deserlize_params_from_message(tenant_id, task_message.ptr(), task_message.length(), pos))) {
LOG_WARN("deserialize from msg failed", K(ret));
} else {
data_format_version = task.get_data_format_version();
@ -1091,6 +1091,75 @@ int ObDDLUtil::get_data_format_version(
return ret;
}
static inline void try_replace_user_tenant_id(const uint64_t user_tenant_id, uint64_t &check_tenant_id)
{
check_tenant_id = !is_user_tenant(check_tenant_id) ? check_tenant_id : user_tenant_id;
}
int ObDDLUtil::replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObAlterTableArg &alter_table_arg)
{
int ret = OB_SUCCESS;
if (!is_user_tenant(tenant_id)) {
LOG_TRACE("not user tenant, no need to replace", K(tenant_id));
} else {
try_replace_user_tenant_id(tenant_id, alter_table_arg.exec_tenant_id_);
for (int64_t i = 0; OB_SUCC(ret) && i < alter_table_arg.index_arg_list_.count(); ++i) {
obrpc::ObIndexArg *index_arg = alter_table_arg.index_arg_list_.at(i);
try_replace_user_tenant_id(tenant_id, index_arg->exec_tenant_id_);
try_replace_user_tenant_id(tenant_id, index_arg->tenant_id_);
}
for (int64_t i = 0; OB_SUCC(ret) && i < alter_table_arg.foreign_key_arg_list_.count(); ++i) {
obrpc::ObCreateForeignKeyArg &fk_arg = alter_table_arg.foreign_key_arg_list_.at(i);
try_replace_user_tenant_id(tenant_id, fk_arg.exec_tenant_id_);
try_replace_user_tenant_id(tenant_id, fk_arg.tenant_id_);
}
if (is_user_tenant(alter_table_arg.alter_table_schema_.get_tenant_id())) {
alter_table_arg.alter_table_schema_.set_tenant_id(tenant_id);
}
try_replace_user_tenant_id(tenant_id, alter_table_arg.sequence_ddl_arg_.exec_tenant_id_);
if (is_user_tenant(alter_table_arg.sequence_ddl_arg_.seq_schema_.get_tenant_id())) {
alter_table_arg.sequence_ddl_arg_.seq_schema_.set_tenant_id(tenant_id);
}
}
return ret;
}
int ObDDLUtil::replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObCreateIndexArg &create_index_arg)
{
int ret = OB_SUCCESS;
if (!is_user_tenant(tenant_id)) {
LOG_TRACE("not user tenant, no need to replace", K(tenant_id));
} else {
try_replace_user_tenant_id(tenant_id, create_index_arg.exec_tenant_id_);
try_replace_user_tenant_id(tenant_id, create_index_arg.tenant_id_);
if (is_user_tenant(create_index_arg.index_schema_.get_tenant_id())) {
create_index_arg.index_schema_.set_tenant_id(tenant_id);
}
}
return ret;
}
#define REPLACE_DDL_ARG_FUNC(ArgType) \
int ObDDLUtil::replace_user_tenant_id(const uint64_t tenant_id, ArgType &ddl_arg) \
{ \
int ret = OB_SUCCESS; \
if (!is_user_tenant(tenant_id)) { \
LOG_TRACE("not user tenant, no need to replace", K(tenant_id)); \
} else { \
try_replace_user_tenant_id(tenant_id, ddl_arg.exec_tenant_id_); \
try_replace_user_tenant_id(tenant_id, ddl_arg.tenant_id_); \
} \
return ret; \
}
REPLACE_DDL_ARG_FUNC(obrpc::ObDropDatabaseArg)
REPLACE_DDL_ARG_FUNC(obrpc::ObDropTableArg)
REPLACE_DDL_ARG_FUNC(obrpc::ObDropIndexArg)
REPLACE_DDL_ARG_FUNC(obrpc::ObTruncateTableArg)
#undef REPLACE_DDL_ARG_FUNC
/****************** ObCheckTabletDataComplementOp *************/
int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(

View File

@ -24,7 +24,12 @@ namespace oceanbase
namespace obrpc
{
class ObSrvRpcProxy;
class ObAlterTableArg;
struct ObAlterTableArg;
struct ObDropDatabaseArg;
struct ObDropTableArg;
struct ObDropIndexArg;
struct ObTruncateTableArg;
struct ObCreateIndexArg;
}
namespace sql
{
@ -328,6 +333,13 @@ public:
const uint64_t task_id,
int64_t &data_format_version);
static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObAlterTableArg &alter_table_arg);
static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObDropDatabaseArg &drop_db_arg);
static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObDropTableArg &drop_table_arg);
static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObDropIndexArg &drop_index_arg);
static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObTruncateTableArg &trucnate_table_arg);
static int replace_user_tenant_id(const uint64_t tenant_id, obrpc::ObCreateIndexArg &create_index_arg);
private:
static int generate_column_name_str(
const common::ObIArray<ObColumnNameInfo> &column_names,

View File

@ -270,6 +270,53 @@ int ObAllTenantInfoProxy::is_standby_tenant(
return ret;
}
int ObAllTenantInfoProxy::get_primary_tenant_ids(
ObISQLClient *proxy,
ObIArray<uint64_t> &tenant_ids)
{
int ret = OB_SUCCESS;
tenant_ids.reset();
ObSqlString sql;
ObTenantRole primary_role(ObTenantRole::PRIMARY_TENANT);
if (OB_ISNULL(proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("proxy is null", KR(ret), KP(proxy));
} else if (OB_FAIL(sql.append_fmt("select tenant_id from %s where tenant_role = '%s'",
OB_ALL_VIRTUAL_TENANT_INFO_TNAME, primary_role.to_str()))) {
LOG_WARN("gnenerate sql failed", K(ret));
} else {
HEAP_VAR(ObMySQLProxy::MySQLResult, res) {
common::sqlclient::ObMySQLResult *result = NULL;
if (OB_FAIL(proxy->read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
LOG_WARN("failed to read", KR(ret), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get sql result", KR(ret));
} else {
while (OB_SUCC(ret)) {
if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next sql result failed", K(ret));
} else {
ret = OB_SUCCESS;
break;;
}
} else {
uint64_t tenant_id = OB_INVALID_TENANT_ID;
EXTRACT_INT_FIELD_MYSQL(*result, "tenant_id", tenant_id, uint64_t);
if (OB_FAIL(ret)) {
LOG_WARN("failed to get result", KR(ret));
} else if (OB_FAIL(tenant_ids.push_back(tenant_id))) {
LOG_WARN("push back tenant id failed", K(ret), K(tenant_id), K(tenant_ids.count()));
}
}
}
}
}
}
return ret;
}
int ObAllTenantInfoProxy::load_tenant_info(const uint64_t tenant_id,
ObISQLClient *proxy,
const bool for_update,

View File

@ -156,6 +156,10 @@ public:
ObISQLClient *proxy,
const uint64_t tenant_id,
bool &is_standby);
static int get_primary_tenant_ids(
ObISQLClient *proxy,
ObIArray<uint64_t> &tenant_ids);
/**
* @description: get target tenant's tenant_info from inner table
* @param[in] tenant_id

View File

@ -64,6 +64,7 @@ int ObDDLExecutorUtil::wait_ddl_finish(
} else {
int tmp_ret = OB_SUCCESS;
bool is_tenant_dropped = false;
bool is_tenant_standby = false;
while (OB_SUCC(ret)) {
if (OB_SUCCESS == ObDDLErrorMessageTableOperator::get_ddl_error_message(
tenant_id, task_id, -1 /* target_object_id */, unused_addr, false /* is_ddl_retry_task */, *GCTX.sql_proxy_, error_message, unused_user_msg_len)) {
@ -79,6 +80,13 @@ int ObDDLExecutorUtil::wait_ddl_finish(
ret = OB_TENANT_HAS_BEEN_DROPPED;
LOG_WARN("tenant has been dropped", K(ret), K(tenant_id));
break;
} else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) {
LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id));
} else if (is_tenant_standby) {
ret = OB_STANDBY_READ_ONLY;
FORWARD_USER_ERROR(ret, "DDL not finish, need check");
LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id));
break;
} else if (OB_FAIL(handle_session_exception(session))) {
LOG_WARN("session exeception happened", K(ret), K(is_support_cancel));
if (is_support_cancel && OB_TMP_FAIL(cancel_ddl_task(tenant_id, common_rpc_proxy))) {
@ -100,6 +108,7 @@ int ObDDLExecutorUtil::wait_build_index_finish(const uint64_t tenant_id, const i
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_tenant_dropped = false;
bool is_tenant_standby = false;
ObAddr unused_addr;
int64_t unused_user_msg_len = 0;
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + OB_MAX_USER_SPECIFIED_TIMEOUT);
@ -122,6 +131,12 @@ int ObDDLExecutorUtil::wait_build_index_finish(const uint64_t tenant_id, const i
} else if (is_tenant_dropped) {
ret = OB_TENANT_HAS_BEEN_DROPPED;
LOG_WARN("tenant has been dropped", K(ret), K(tenant_id));
} else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) {
LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id));
} else if (is_tenant_standby) {
ret = OB_STANDBY_READ_ONLY;
FORWARD_USER_ERROR(ret, "DDL not finish, need check");
LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id));
}
return ret;
}
@ -146,6 +161,7 @@ int ObDDLExecutorUtil::wait_ddl_retry_task_finish(
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), KP(common_rpc_proxy));
} else {
bool is_tenant_dropped = false;
bool is_tenant_standby = false;
int tmp_ret = OB_SUCCESS;
while (OB_SUCC(ret)) {
if (OB_SUCCESS == ObDDLErrorMessageTableOperator::get_ddl_error_message(
@ -189,6 +205,13 @@ int ObDDLExecutorUtil::wait_ddl_retry_task_finish(
ret = OB_TENANT_HAS_BEEN_DROPPED;
LOG_WARN("tenant has been dropped", K(ret), K(tenant_id));
break;
} else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) {
LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id));
} else if (is_tenant_standby) {
ret = OB_STANDBY_READ_ONLY;
FORWARD_USER_ERROR(ret, "DDL not finish, need check");
LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id));
break;
} else if (OB_FAIL(handle_session_exception(session))) {
LOG_WARN("session exeception happened", K(ret));
if (OB_TMP_FAIL(cancel_ddl_task(tenant_id, common_rpc_proxy))) {

View File

@ -293,6 +293,7 @@ int ObDropIndexExecutor::wait_drop_index_finish(
while (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
bool is_tenant_dropped = false;
bool is_tenant_standby = false;
if (OB_SUCCESS == share::ObDDLErrorMessageTableOperator::get_ddl_error_message(
tenant_id, task_id, -1 /* target_object_id */, unused_addr, false /* is_ddl_retry_task */, *GCTX.sql_proxy_, error_message, unused_user_msg_len)) {
ret = error_message.ret_code_;
@ -307,6 +308,13 @@ int ObDropIndexExecutor::wait_drop_index_finish(
ret = OB_TENANT_HAS_BEEN_DROPPED;
LOG_WARN("tenant has been dropped", K(ret), K(tenant_id));
break;
} else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) {
LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id));
} else if (is_tenant_standby) {
ret = OB_STANDBY_READ_ONLY;
FORWARD_USER_ERROR(ret, "DDL not finish, need check");
LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id));
break;
} else if (OB_FAIL(session.check_session_status())) {
LOG_WARN("session exeception happened", K(ret));
} else {