Acquire table lock before create hidden table

This commit is contained in:
Hongqin-Li
2023-06-29 12:48:11 +00:00
committed by ob-robot
parent 07a56f4636
commit 379fbf754c
5 changed files with 53 additions and 43 deletions

View File

@ -881,6 +881,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam &param,
param.dest_table_schema_,
param.parallelism_,
param.consumer_group_id_,
param.task_id_,
static_cast<const obrpc::ObAlterTableArg *>(param.ddl_arg_),
*param.allocator_,
task_record))) {
@ -895,6 +896,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam &param,
param.dest_table_schema_,
param.parallelism_,
param.consumer_group_id_,
param.task_id_,
alter_table_arg,
*param.allocator_,
task_record))) {
@ -926,6 +928,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam &param,
param.dest_table_schema_,
param.parallelism_,
param.consumer_group_id_,
param.task_id_,
static_cast<const obrpc::ObAlterTableArg *>(param.ddl_arg_),
*param.allocator_,
task_record))) {
@ -938,6 +941,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam &param,
param.src_table_schema_->get_table_id(),
param.schema_version_,
param.consumer_group_id_,
param.task_id_,
static_cast<const obrpc::ObAlterTableArg *>(param.ddl_arg_),
*param.allocator_,
task_record))) {
@ -1430,21 +1434,19 @@ int ObDDLScheduler::create_table_redefinition_task(
const share::schema::ObTableSchema *dest_schema,
const int64_t parallelism,
const int64_t consumer_group_id,
const int64_t task_id,
const obrpc::ObAlterTableArg *alter_table_arg,
ObIAllocator &allocator,
ObDDLTaskRecord &task_record)
{
int ret = OB_SUCCESS;
int64_t task_id = 0;
SMART_VAR(ObTableRedefinitionTask, redefinition_task) {
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLScheduler has not been inited", K(ret));
} else if (OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) {
} else if (OB_UNLIKELY(0 == task_id) || OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(alter_table_arg), KP(src_schema), KP(dest_schema));
} else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), src_schema->get_tenant_id(), task_id))) {
LOG_WARN("fetch new task id failed", K(ret));
LOG_WARN("invalid arguments", K(ret), K(task_id), KP(alter_table_arg), KP(src_schema), KP(dest_schema));
} else if (OB_FAIL(redefinition_task.init(src_schema->get_tenant_id(),
task_id,
type,
@ -1472,21 +1474,19 @@ int ObDDLScheduler::create_drop_primary_key_task(
const ObTableSchema *dest_schema,
const int64_t parallelism,
const int64_t consumer_group_id,
const int64_t task_id,
const obrpc::ObAlterTableArg *alter_table_arg,
ObIAllocator &allocator,
ObDDLTaskRecord &task_record)
{
int ret = OB_SUCCESS;
int64_t task_id = 0;
SMART_VAR(ObDropPrimaryKeyTask, drop_pk_task) {
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLScheduler has not been inited", K(ret));
} else if (OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) {
} else if (OB_UNLIKELY(0 == task_id) || OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(alter_table_arg), KP(src_schema), KP(dest_schema));
} else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), src_schema->get_tenant_id(), task_id))) {
LOG_WARN("fetch new task id failed", K(ret));
LOG_WARN("invalid arguments", K(ret), K(task_id), KP(alter_table_arg), KP(src_schema), KP(dest_schema));
} else if (OB_FAIL(drop_pk_task.init(src_schema->get_tenant_id(),
task_id,
type,
@ -1514,21 +1514,19 @@ int ObDDLScheduler::create_column_redefinition_task(
const share::schema::ObTableSchema *dest_schema,
const int64_t parallelism,
const int64_t consumer_group_id,
const int64_t task_id,
const obrpc::ObAlterTableArg *alter_table_arg,
ObIAllocator &allocator,
ObDDLTaskRecord &task_record)
{
int ret = OB_SUCCESS;
int64_t task_id = 0;
SMART_VAR(ObColumnRedefinitionTask, redefinition_task) {
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLScheduler has not been inited", K(ret));
} else if (OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) {
} else if (OB_UNLIKELY(0 == task_id) || OB_ISNULL(alter_table_arg) || OB_ISNULL(src_schema) || OB_ISNULL(dest_schema)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(alter_table_arg), KP(src_schema), KP(dest_schema));
} else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), src_schema->get_tenant_id(), task_id))) {
LOG_WARN("fetch new task id failed", K(ret));
LOG_WARN("invalid arguments", K(ret), K(task_id), KP(alter_table_arg), KP(src_schema), KP(dest_schema));
} else if (OB_FAIL(redefinition_task.init(src_schema->get_tenant_id(),
task_id,
type,
@ -1555,22 +1553,20 @@ int ObDDLScheduler::create_modify_autoinc_task(
const int64_t table_id,
const int64_t schema_version,
const int64_t consumer_group_id,
const int64_t task_id,
const obrpc::ObAlterTableArg *arg,
ObIAllocator &allocator,
ObDDLTaskRecord &task_record)
{
int ret = OB_SUCCESS;
SMART_VAR(ObModifyAutoincTask, modify_autoinc_task) {
int64_t task_id = 0;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id
|| schema_version <= 0 || nullptr == arg || !arg->is_valid())) {
|| schema_version <= 0 || 0 == task_id || nullptr == arg || !arg->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(table_id), K(schema_version), K(arg));
} else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), tenant_id, task_id))) {
LOG_WARN("fetch new task id failed", K(ret));
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(table_id), K(schema_version), K(task_id), K(arg));
} else if (OB_FAIL(modify_autoinc_task.init(tenant_id, task_id, table_id, schema_version, consumer_group_id, *arg))) {
LOG_WARN("init global index task failed", K(ret), K(table_id), K(arg));
} else if (OB_FAIL(modify_autoinc_task.set_trace_id(*ObCurTraceId::get_trace_id()))) {

View File

@ -346,6 +346,7 @@ private:
const share::schema::ObTableSchema *dest_schema,
const int64_t parallelism,
const int64_t consumer_group_id,
const int64_t task_id,
const obrpc::ObAlterTableArg *alter_table_arg,
ObIAllocator &allocator,
ObDDLTaskRecord &task_record);
@ -357,6 +358,7 @@ private:
const ObTableSchema *dest_schema,
const int64_t parallelism,
const int64_t consumer_group_id,
const int64_t task_id,
const obrpc::ObAlterTableArg *alter_table_arg,
ObIAllocator &allocator,
ObDDLTaskRecord &task_record);
@ -368,6 +370,7 @@ private:
const share::schema::ObTableSchema *dest_schema,
const int64_t parallelism,
const int64_t consumer_group_id,
const int64_t task_id,
const obrpc::ObAlterTableArg *alter_table_arg,
ObIAllocator &allocator,
ObDDLTaskRecord &task_record);
@ -378,6 +381,7 @@ private:
const int64_t table_id,
const int64_t schema_version,
const int64_t consumer_group_id,
const int64_t task_id,
const obrpc::ObAlterTableArg *alter_table_arg,
ObIAllocator &allocator,
ObDDLTaskRecord &task_record);

View File

@ -157,7 +157,7 @@ OB_SERIALIZE_MEMBER(ObDDLTaskSerializeField,
is_abort_);
ObCreateDDLTaskParam::ObCreateDDLTaskParam()
: tenant_id_(OB_INVALID_ID), object_id_(OB_INVALID_ID), schema_version_(0), parallelism_(0), consumer_group_id_(0), parent_task_id_(0),
: tenant_id_(OB_INVALID_ID), object_id_(OB_INVALID_ID), schema_version_(0), parallelism_(0), consumer_group_id_(0), parent_task_id_(0), task_id_(0),
type_(DDL_INVALID), src_table_schema_(nullptr), dest_table_schema_(nullptr), ddl_arg_(nullptr), allocator_(nullptr)
{
}
@ -172,9 +172,10 @@ ObCreateDDLTaskParam::ObCreateDDLTaskParam(const uint64_t tenant_id,
const int64_t consumer_group_id,
ObIAllocator *allocator,
const obrpc::ObDDLArg *ddl_arg,
const int64_t parent_task_id)
const int64_t parent_task_id,
const int64_t task_id)
: tenant_id_(tenant_id), object_id_(object_id), schema_version_(schema_version), parallelism_(parallelism), consumer_group_id_(consumer_group_id),
parent_task_id_(parent_task_id), type_(type), src_table_schema_(src_table_schema), dest_table_schema_(dest_table_schema),
parent_task_id_(parent_task_id), task_id_(task_id), type_(type), src_table_schema_(src_table_schema), dest_table_schema_(dest_table_schema),
ddl_arg_(ddl_arg), allocator_(allocator)
{
}

View File

@ -143,11 +143,12 @@ public:
const int64_t consumer_group_id,
ObIAllocator *allocator,
const obrpc::ObDDLArg *ddl_arg = nullptr,
const int64_t parent_task_id = 0);
const int64_t parent_task_id = 0,
const int64_t task_id = 0);
~ObCreateDDLTaskParam() = default;
bool is_valid() const { return OB_INVALID_ID != tenant_id_ && type_ > share::DDL_INVALID
&& type_ < share::DDL_MAX && nullptr != allocator_; }
TO_STRING_KV(K_(tenant_id), K_(object_id), K_(schema_version), K_(parallelism), K_(consumer_group_id), K_(parent_task_id),
TO_STRING_KV(K_(tenant_id), K_(object_id), K_(schema_version), K_(parallelism), K_(consumer_group_id), K_(parent_task_id), K_(task_id),
K_(type), KPC_(src_table_schema), KPC_(dest_table_schema), KPC_(ddl_arg));
public:
uint64_t tenant_id_;
@ -156,6 +157,7 @@ public:
int64_t parallelism_;
int64_t consumer_group_id_;
int64_t parent_task_id_;
int64_t task_id_;
share::ObDDLType type_;
const ObTableSchema *src_table_schema_;
const ObTableSchema *dest_table_schema_;

View File

@ -11213,12 +11213,20 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar
} else {
ObDDLSQLTransaction trans(schema_service_);
ObDDLTaskRecord task_record;
int64_t task_id = 0;
int64_t refreshed_schema_version = 0;
bool with_primary_key_operation = false;
if (OB_FAIL(schema_guard.get_schema_version(tenant_id, refreshed_schema_version))) {
LOG_WARN("failed to get tenant schema version", KR(ret), K(tenant_id));
} else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, refreshed_schema_version))) {
LOG_WARN("start transaction failed", KR(ret), K(tenant_id), K(refreshed_schema_version));
} else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service->get_sql_proxy(), tenant_id, task_id))) {
LOG_WARN("fetch new task id failed", K(ret));
} else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(*orig_table_schema,
nullptr,
ObTableLockOwnerID(task_id),
trans))) {
LOG_WARN("failed to lock ddl lock", K(ret));
}
// TODO yiren, refactor it, create user hidden table after alter index/column/part/cst...
if (OB_FAIL(ret)) {
@ -11369,7 +11377,9 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar
alter_table_arg.parallelism_,
alter_table_arg.consumer_group_id_,
&alter_table_arg.allocator_,
&alter_table_arg);
&alter_table_arg,
0/*parent_task_id*/,
task_id);
if (orig_table_schema->is_tmp_table() || orig_table_schema->is_external_table()) {
ret = OB_OP_NOT_ALLOW;
char err_msg[OB_MAX_ERROR_MSG_LEN] = {0};
@ -11383,11 +11393,6 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar
LOG_WARN("failed to alter table that has conflict ddl", K(ret), K(orig_table_schema->get_table_id()));
} else if (OB_FAIL(root_service->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) {
LOG_WARN("submit ddl task failed", K(ret));
} else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(*orig_table_schema,
bind_tablets ? &new_table_schema : nullptr,
ObTableLockOwnerID(task_record.task_id_),
trans))) {
LOG_WARN("failed to lock ddl lock", K(ret));
} else {
res.task_id_ = task_record.task_id_;
}
@ -11401,14 +11406,11 @@ int ObDDLService::do_offline_ddl_in_trans(obrpc::ObAlterTableArg &alter_table_ar
alter_table_arg.parallelism_,
alter_table_arg.consumer_group_id_,
&alter_table_arg.allocator_,
&alter_table_arg);
&alter_table_arg,
0/*parent_task_id*/,
task_id);
if (OB_FAIL(root_service->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) {
LOG_WARN("submit ddl task failed", K(ret));
} else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(new_table_schema,
nullptr,
ObTableLockOwnerID(task_record.task_id_),
trans))) {
LOG_WARN("failed to lock ddl lock", K(ret));
} else {
res.task_id_ = task_record.task_id_;
}
@ -11480,6 +11482,7 @@ int ObDDLService::create_hidden_table(
ObDDLSQLTransaction trans(schema_service_);
common::ObArenaAllocator allocator;
ObDDLTaskRecord task_record;
int64_t task_id = 0;
int64_t refreshed_schema_version = 0;
new_table_schema.set_tenant_id(dest_tenant_id);
new_table_schema.set_table_state_flag(ObTableStateFlag::TABLE_STATE_OFFLINE_DDL);
@ -11487,6 +11490,13 @@ int ObDDLService::create_hidden_table(
LOG_WARN("failed to get tenant schema version", KR(ret), K(tenant_id));
} else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, refreshed_schema_version))) {
LOG_WARN("start transaction failed", KR(ret), K(tenant_id), K(refreshed_schema_version));
} else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service->get_sql_proxy(), tenant_id, task_id))) {
LOG_WARN("fetch new task id failed", K(ret));
} else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(*orig_table_schema,
nullptr,
ObTableLockOwnerID(task_id),
trans))) {
LOG_WARN("failed to lock ddl lock", K(ret));
} else if (OB_FAIL(create_user_hidden_table(
*orig_table_schema,
new_table_schema,
@ -11527,14 +11537,11 @@ int ObDDLService::create_hidden_table(
create_hidden_table_arg.parallelism_,
create_hidden_table_arg.consumer_group_id_,
&allocator_for_redef,
&alter_table_arg);
&alter_table_arg,
0,
task_id);
if (OB_FAIL(root_service->get_ddl_scheduler().create_ddl_task(param, trans, task_record))) {
LOG_WARN("submit ddl task failed", K(ret));
} else if (OB_FAIL(ObDDLLock::lock_for_offline_ddl(*orig_table_schema,
bind_tablets ? &new_table_schema : nullptr,
ObTableLockOwnerID(task_record.task_id_),
trans))) {
LOG_WARN("failed to lock ddl lock", K(ret));
} else if (orig_table_schema->get_table_state_flag() == ObTableStateFlag::TABLE_STATE_OFFLINE_DDL) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("offline ddl is being executed, other ddl operations are not allowed, create hidden table fail", K(ret), K(create_hidden_table_arg));