From b87473dac26c92bda28bb441be15ae8847896d2c Mon Sep 17 00:00:00 2001 From: Hongqin-Li Date: Sun, 29 Jan 2023 16:04:46 +0800 Subject: [PATCH] Fix update execution id core --- .../ddl_task/ob_ddl_redefinition_task.cpp | 54 +++- .../ddl_task/ob_ddl_redefinition_task.h | 4 +- src/rootserver/ddl_task/ob_ddl_scheduler.cpp | 306 ++++++++---------- src/rootserver/ddl_task/ob_ddl_scheduler.h | 9 +- src/rootserver/ddl_task/ob_ddl_task.cpp | 22 +- src/rootserver/ddl_task/ob_ddl_task.h | 9 +- .../ddl_task/ob_index_build_task.cpp | 73 +++-- src/rootserver/ddl_task/ob_index_build_task.h | 1 + .../ddl_task/ob_table_redefinition_task.cpp | 14 +- src/share/ob_ddl_common.cpp | 60 ++-- src/share/ob_ddl_common.h | 2 +- 11 files changed, 298 insertions(+), 256 deletions(-) diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp index fee314e25..8b9756990 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp @@ -105,20 +105,7 @@ int ObDDLRedefinitionSSTableBuildTask::process() } else if (OB_FAIL(sys_variable_schema->get_oracle_mode(oracle_mode))) { LOG_WARN("get oracle mode failed", K(ret)); } else { - (void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, - dest_table_id_, - task_id_, - execution_id_, - inner_sql_exec_addr_, - trace_id_, - schema_version_, - snapshot_version_, - need_exec_new_inner_sql); - if (!need_exec_new_inner_sql) { - LOG_INFO("succ to wait and complete old task finished!", K(ret)); - } else if (OB_FAIL(root_service_->get_ddl_scheduler().on_update_execution_id(task_id_, execution_id_))) { // genenal new ObIndexSSTableBuildTask::execution_id_ and persist to inner table - LOG_WARN("failed to update execution id", K(ret)); - } else if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_, + if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_, data_table_id_, dest_table_id_, schema_version_, @@ -2202,3 +2189,42 @@ int ObSyncTabletAutoincSeqCtx::call_and_process_all_tablet_autoinc_seqs(P &proxy } return ret; } + +int ObDDLRedefinitionTask::try_reap_old_replica_build_task() +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + const int64_t data_table_id = object_id_; + const int64_t dest_table_id = target_object_id_; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("ObIndexBuildTask has not been inited", K(ret)); + } else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard( + tenant_id_, schema_guard))) { + LOG_WARN("fail to get tenant schema guard", K(ret), K(data_table_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, data_table_id, table_schema))) { + LOG_WARN("get table schema failed", K(ret), K(tenant_id_), K(data_table_id)); + } else if (OB_UNLIKELY(nullptr == table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("error unexpected, table schema must not be nullptr", K(ret)); + } else { + const int64_t old_execution_id = get_execution_id(); + const ObTabletID unused_tablet_id; + const ObDDLTaskInfo unused_addition_info; + const int old_ret_code = OB_SUCCESS; + bool need_exec_new_inner_sql = true; + ObAddr invalid_addr; + (void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id, + task_id_, old_execution_id, invalid_addr, trace_id_, + table_schema->get_schema_version(), snapshot_version_, need_exec_new_inner_sql); + if (!need_exec_new_inner_sql) { + if (OB_FAIL(update_complete_sstable_job_status(unused_tablet_id, snapshot_version_, old_execution_id, old_ret_code, unused_addition_info))) { + LOG_INFO("succ to wait and complete old task finished!", K(ret)); + } + } else { + ret = OB_ENTRY_NOT_EXIST; + } + } + return ret; +} diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h index 81c3240f9..31b1395d1 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h @@ -106,7 +106,7 @@ class ObDDLRedefinitionTask : public ObDDLTask { public: explicit ObDDLRedefinitionTask(const share::ObDDLType task_type): - ObDDLTask(task_type), lock_(), wait_trans_ctx_(), sync_tablet_autoinc_seq_ctx_(), + ObDDLTask(task_type), wait_trans_ctx_(), sync_tablet_autoinc_seq_ctx_(), build_replica_request_time_(0), complete_sstable_job_ret_code_(INT64_MAX), alter_table_arg_(), dependent_task_result_map_(), snapshot_held_(false), has_synced_autoincrement_(false), has_synced_stats_info_(false), update_autoinc_job_ret_code_(INT64_MAX), update_autoinc_job_time_(0), @@ -126,6 +126,7 @@ public: virtual void flt_set_task_span_tag() const = 0; virtual void flt_set_status_span_tag() const = 0; virtual int cleanup_impl() override; + int try_reap_old_replica_build_task(); protected: int prepare(const share::ObDDLTaskStatus next_task_status); int lock_table(const share::ObDDLTaskStatus next_task_status); @@ -208,7 +209,6 @@ protected: static const int64_t MAX_DEPEND_OBJECT_COUNT = 100L; static const int64_t RETRY_INTERVAL = 1 * 1000 * 1000; // 1s static const int64_t RETRY_LIMIT = 100; - common::TCRWLock lock_; ObDDLWaitTransEndCtx wait_trans_ctx_; ObSyncTabletAutoincSeqCtx sync_tablet_autoinc_seq_ctx_; int64_t build_replica_request_time_; diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 4654edd7b..055dcbd8f 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -184,10 +184,12 @@ int ObDDLTaskQueue::add_task_to_last(ObDDLTask *task) return ret; } -int ObDDLTaskQueue::get_task(const ObDDLTaskKey &task_key, ObDDLTask *&task) +template +int ObDDLTaskQueue::modify_task(const ObDDLTaskKey &task_key, F &&op) { int ret = OB_SUCCESS; common::ObSpinLockGuard guard(lock_); + ObDDLTask *task = nullptr; if (OB_UNLIKELY(!is_inited_)) { ret = common::OB_NOT_INIT; LOG_WARN("ObDDLTaskQueue has not been inited", K(ret)); @@ -197,14 +199,21 @@ int ObDDLTaskQueue::get_task(const ObDDLTaskKey &task_key, ObDDLTask *&task) } else if (OB_FAIL(task_map_.get_refactored(task_key, task))) { ret = OB_HASH_NOT_EXIST == ret ? OB_ENTRY_NOT_EXIST : ret; LOG_WARN("get from task map failed", K(ret), K(task_key)); + } else if (OB_ISNULL(task)) { + ret = OB_ERR_SYS; + LOG_WARN("invalid task", K(ret), K(task_key)); + } else if (OB_FAIL(op(*task))) { + LOG_WARN("failed to modify task", K(ret)); } return ret; } -int ObDDLTaskQueue::get_task(const int64_t task_id, ObDDLTask *&task) +template +int ObDDLTaskQueue::modify_task(const int64_t task_id, F &&op) { int ret = OB_SUCCESS; common::ObSpinLockGuard guard(lock_); + ObDDLTask *task = nullptr; if (OB_UNLIKELY(!is_inited_)) { ret = common::OB_NOT_INIT; LOG_WARN("ObDDLTaskQueue has not been inited", K(ret)); @@ -214,6 +223,11 @@ int ObDDLTaskQueue::get_task(const int64_t task_id, ObDDLTask *&task) } else if (OB_FAIL(task_id_map_.get_refactored(task_id, task))) { ret = OB_HASH_NOT_EXIST == ret ? OB_ENTRY_NOT_EXIST : ret; LOG_WARN("get from task map failed", K(ret), K(task_id)); + } else if (OB_ISNULL(task)) { + ret = OB_ERR_SYS; + LOG_WARN("invalid task", K(ret), K(task_id)); + } else if (OB_FAIL(op(*task))) { + LOG_WARN("failed to modify task", K(ret), K(task_id)); } return ret; } @@ -900,7 +914,6 @@ int ObDDLScheduler::copy_table_dependents(const int64_t task_id, { int ret = OB_SUCCESS; ObDDLTask *task = nullptr; - ObTableRedefinitionTask *table_redefinition_task = nullptr; int64_t table_task_status = 0; int64_t table_execution_id = 0; int64_t pos = 0; @@ -917,21 +930,23 @@ int ObDDLScheduler::copy_table_dependents(const int64_t task_id, table_task_status, table_execution_id))) { LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id)); - } else if (OB_FAIL(task_queue_.get_task(task_id, task))) { - LOG_WARN("get task fail", K(ret)); - } else if (OB_ISNULL(table_redefinition_task = static_cast(task))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to get task", K(ret)); } else { HEAP_VAR(ObTableRedefinitionTask, redefinition_task) { ObDDLTaskRecord task_record; common::ObArenaAllocator allocator(lib::ObLabel("copy_table_dep")); task_record.reset(); - if (OB_UNLIKELY(!table_redefinition_task->is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("table rdefinition task is not valid", K(ret)); - } else if (OB_FAIL(table_redefinition_task->convert_to_record(task_record, allocator))) { - LOG_WARN("convert to ddl task record failed", K(ret), K(*table_redefinition_task)); + if (OB_FAIL(task_queue_.modify_task(task_id, [&task_record, &allocator](ObDDLTask &task) -> int { + int ret = OB_SUCCESS; + ObTableRedefinitionTask *table_redefinition_task = static_cast(&task); + if (OB_UNLIKELY(!table_redefinition_task->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("table rdefinition task is not valid", K(ret)); + } else if (OB_FAIL(table_redefinition_task->convert_to_record(task_record, allocator))) { + LOG_WARN("convert to ddl task record failed", K(ret), K(*table_redefinition_task)); + } + return ret; + }))) { + LOG_WARN("failed to modify task", K(ret)); } else if (OB_FAIL(redefinition_task.init(task_record))) { LOG_WARN("init table redefinition task failed", K(ret)); } else if (OB_FAIL(redefinition_task.set_trace_id(task_record.trace_id_))) { @@ -979,7 +994,6 @@ int ObDDLScheduler::finish_redef_table(const int64_t task_id, const uint64_t ten { int ret = OB_SUCCESS; ObDDLTask *task = nullptr; - ObTableRedefinitionTask *table_redefinition_task = nullptr; int64_t table_task_status = 0; int64_t table_execution_id = 0; int64_t pos = 0; @@ -996,21 +1010,23 @@ int ObDDLScheduler::finish_redef_table(const int64_t task_id, const uint64_t ten table_task_status, table_execution_id))) { LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id)); - } else if (OB_FAIL(task_queue_.get_task(task_id, task))) { - LOG_WARN("get task fail", K(ret)); - } else if (OB_ISNULL(table_redefinition_task = static_cast(task))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to get task", K(ret)); } else { HEAP_VAR(ObTableRedefinitionTask, redefinition_task) { ObDDLTaskRecord task_record; common::ObArenaAllocator allocator(lib::ObLabel("finish_redef")); task_record.reset(); - if (OB_UNLIKELY(!table_redefinition_task->is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("table rdefinition task is not valid", K(ret)); - } else if (OB_FAIL(table_redefinition_task->convert_to_record(task_record, allocator))) { - LOG_WARN("convert to ddl task record failed", K(ret), K(*table_redefinition_task)); + if (OB_FAIL(task_queue_.modify_task(task_id, [&task_record, &allocator](ObDDLTask &task) -> int { + int ret = OB_SUCCESS; + ObTableRedefinitionTask *table_redefinition_task = static_cast(&task); + if (OB_UNLIKELY(!table_redefinition_task->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("table rdefinition task is not valid", K(ret)); + } else if (OB_FAIL(table_redefinition_task->convert_to_record(task_record, allocator))) { + LOG_WARN("convert to ddl task record failed", K(ret), K(*table_redefinition_task)); + } + return ret; + }))) { + LOG_WARN("failed to modify task", K(ret)); } else if (OB_FAIL(redefinition_task.init(task_record))) { LOG_WARN("init table redefinition task failed", K(ret)); } else if (OB_FAIL(redefinition_task.set_trace_id(task_record.trace_id_))) { @@ -1939,53 +1955,17 @@ int ObDDLScheduler::on_column_checksum_calc_reply( } else if (OB_UNLIKELY(!(task_key.is_valid() && tablet_id.is_valid()))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(task_key), K(tablet_id), K(ret_code)); - } else { - ObDDLTask *ddl_task = nullptr; - if (OB_FAIL(task_queue_.get_task(task_key, ddl_task))) { - if (OB_ENTRY_NOT_EXIST != ret) { - LOG_WARN("get task failed", K(ret), K(task_key)); - } - } else if (OB_ISNULL(ddl_task)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("index task is null", K(ret)); - } else if (ObDDLType::DDL_CREATE_INDEX != ddl_task->get_task_type()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ddl task type not global index", K(ret), K(ddl_task)); - } else if (OB_FAIL(reinterpret_cast(ddl_task)->update_column_checksum_calc_status(tablet_id, ret_code))) { - LOG_WARN("update column checksum calc status failed", K(ret)); - } - } - return ret; -} - -int ObDDLScheduler::on_update_execution_id( - const int64_t task_id, - int64_t &ret_execution_id) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!is_inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("not init", K(ret)); - } else if (OB_UNLIKELY(task_id <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(task_id)); - } else { - ObDDLTask *ddl_task = nullptr; - if (OB_FAIL(task_queue_.get_task(task_id, ddl_task))) { - LOG_WARN("get task failed", K(ret), K(task_id)); - } else if (OB_ISNULL(ddl_task)) { - ret = OB_ERR_SYS; - LOG_WARN("ddl task must not be nullptr", K(ret)); - } else if (OB_FAIL(ddl_task->get_task_id() != task_id)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("update task id is diff from ddl task id", - K(ret), K(task_id), KPC(ddl_task)); - } else if (OB_FAIL(ddl_task->push_execution_id())) { - LOG_WARN("fail to push execution id", K(ret), KPC(ddl_task)); - } - if (nullptr != ddl_task) { - ret_execution_id = ddl_task->get_execution_id(); // ignore ret, if fail, take old execution id - } + } else if (OB_FAIL(task_queue_.modify_task(task_key, [&tablet_id, &ret_code](ObDDLTask &task) -> int { + int ret = OB_SUCCESS; + if (OB_UNLIKELY(ObDDLType::DDL_CREATE_INDEX != task.get_task_type())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ddl task type not global index", K(ret), K(task)); + } else if (OB_FAIL(reinterpret_cast(&task)->update_column_checksum_calc_status(tablet_id, ret_code))) { + LOG_WARN("update column checksum calc status failed", K(ret)); + } + return ret; + }))) { + LOG_WARN("failed to modify task", K(ret)); } return ret; } @@ -2005,59 +1985,53 @@ int ObDDLScheduler::on_sstable_complement_job_reply( } else if (OB_UNLIKELY(!(task_key.is_valid() && snapshot_version > 0 && execution_id >= 0))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(task_key), K(snapshot_version), K(execution_id), K(ret_code)); - } else { - ObDDLTask *ddl_task = nullptr; - if (OB_FAIL(task_queue_.get_task(task_key, ddl_task))) { - if (OB_ENTRY_NOT_EXIST != ret) { - LOG_WARN("get task failed", K(ret), K(task_key)); - } - } else if (OB_ISNULL(ddl_task)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("index task is null", K(ret)); - } else { - const int64_t task_type = ddl_task->get_task_type(); - switch (task_type) { - case ObDDLType::DDL_CREATE_INDEX: - if (OB_FAIL(static_cast(ddl_task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { - LOG_WARN("update complete sstable job status failed", K(ret)); - } - break; - case ObDDLType::DDL_DROP_PRIMARY_KEY: - if (OB_FAIL(static_cast(ddl_task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { - LOG_WARN("update complete sstable job status", K(ret)); - } - break; - case ObDDLType::DDL_ADD_PRIMARY_KEY: - case ObDDLType::DDL_ALTER_PRIMARY_KEY: - case ObDDLType::DDL_ALTER_PARTITION_BY: - case ObDDLType::DDL_MODIFY_COLUMN: - case ObDDLType::DDL_CONVERT_TO_CHARACTER: - case ObDDLType::DDL_TABLE_REDEFINITION: - case ObDDLType::DDL_DIRECT_LOAD: - if (OB_FAIL(static_cast(ddl_task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { - LOG_WARN("update complete sstable job status", K(ret)); - } - break; - case ObDDLType::DDL_CHECK_CONSTRAINT: - case ObDDLType::DDL_FOREIGN_KEY_CONSTRAINT: - case ObDDLType::DDL_ADD_NOT_NULL_COLUMN: - if (OB_FAIL(static_cast(ddl_task)->update_check_constraint_finish(ret_code))) { - LOG_WARN("update check constraint finish", K(ret)); - } - break; - case ObDDLType::DDL_DROP_COLUMN: - case ObDDLType::DDL_ADD_COLUMN_OFFLINE: - case ObDDLType::DDL_COLUMN_REDEFINITION: - if (OB_FAIL(static_cast(ddl_task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { - LOG_WARN("update complete sstable job status", K(ret), K(tablet_id), K(snapshot_version), K(ret_code)); - } - break; - default: - ret = OB_NOT_SUPPORTED; - LOG_WARN("not supported ddl task", K(ret), K(*ddl_task)); - break; - } - } + } else if (OB_FAIL(task_queue_.modify_task(task_key, [&tablet_id, &snapshot_version, &execution_id, &ret_code, &addition_info](ObDDLTask &task) -> int { + int ret = OB_SUCCESS; + const int64_t task_type = task.get_task_type(); + switch (task_type) { + case ObDDLType::DDL_CREATE_INDEX: + if (OB_FAIL(static_cast(&task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { + LOG_WARN("update complete sstable job status failed", K(ret)); + } + break; + case ObDDLType::DDL_DROP_PRIMARY_KEY: + if (OB_FAIL(static_cast(&task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { + LOG_WARN("update complete sstable job status", K(ret)); + } + break; + case ObDDLType::DDL_ADD_PRIMARY_KEY: + case ObDDLType::DDL_ALTER_PRIMARY_KEY: + case ObDDLType::DDL_ALTER_PARTITION_BY: + case ObDDLType::DDL_MODIFY_COLUMN: + case ObDDLType::DDL_CONVERT_TO_CHARACTER: + case ObDDLType::DDL_TABLE_REDEFINITION: + case ObDDLType::DDL_DIRECT_LOAD: + if (OB_FAIL(static_cast(&task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { + LOG_WARN("update complete sstable job status", K(ret)); + } + break; + case ObDDLType::DDL_CHECK_CONSTRAINT: + case ObDDLType::DDL_FOREIGN_KEY_CONSTRAINT: + case ObDDLType::DDL_ADD_NOT_NULL_COLUMN: + if (OB_FAIL(static_cast(&task)->update_check_constraint_finish(ret_code))) { + LOG_WARN("update check constraint finish", K(ret)); + } + break; + case ObDDLType::DDL_DROP_COLUMN: + case ObDDLType::DDL_ADD_COLUMN_OFFLINE: + case ObDDLType::DDL_COLUMN_REDEFINITION: + if (OB_FAIL(static_cast(&task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { + LOG_WARN("update complete sstable job status", K(ret), K(tablet_id), K(snapshot_version), K(ret_code)); + } + break; + default: + ret = OB_NOT_SUPPORTED; + LOG_WARN("not supported ddl task", K(ret), K(task)); + break; + } + return ret; + }))) { + LOG_WARN("failed to modify task", K(ret)); } return ret; } @@ -2077,29 +2051,11 @@ int ObDDLScheduler::on_ddl_task_finish( LOG_WARN("invalid arguments", K(ret), K(parent_task_id), K(child_task_key)); } else { ObDDLTask *ddl_task = nullptr; - ObDDLRedefinitionTask *redefinition_task = nullptr; - if (OB_FAIL(task_queue_.get_task(parent_task_id, ddl_task))) { - if (OB_ENTRY_NOT_EXIST == ret) { - bool is_cancel = false; - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(SYS_TASK_STATUS_MGR.is_task_cancel(parent_task_trace_id, is_cancel))) { - LOG_WARN("check task is canceled", K(tmp_ret), K(parent_task_trace_id)); - } - if (is_cancel || OB_ENTRY_NOT_EXIST == tmp_ret) { - ret = OB_CANCELED; - LOG_INFO("parent task is canceled, return to cleaup child task", - K(ret), K(ret_code), K(parent_task_id), K(child_task_key), K(parent_task_trace_id)); - } - } - if (OB_FAIL(ret) && OB_CANCELED != ret) { - LOG_WARN("get from task map failed", K(ret), K(parent_task_id), K(parent_task_trace_id)); - } - } else if (OB_ISNULL(ddl_task)) { - ret = OB_ERR_SYS; - LOG_WARN("ddl task must not be nullptr", K(ret)); - } else if (FALSE_IT(redefinition_task = static_cast(ddl_task))) { - } else if (OB_FAIL(redefinition_task->on_child_task_finish(child_task_key.object_id_, ret_code))) { - LOG_WARN("on child task finish failed", K(ret), K(child_task_key)); + if (OB_FAIL(task_queue_.modify_task(parent_task_id, [&child_task_key, &ret_code](ObDDLTask &task) -> int { + ObDDLRedefinitionTask *redefinition_task = static_cast(&task); + return redefinition_task->on_child_task_finish(child_task_key.object_id_, ret_code); + }))) { + LOG_WARN("failed to modify task", K(ret)); } } return ret; @@ -2116,37 +2072,31 @@ int ObDDLScheduler::notify_update_autoinc_end(const ObDDLTaskKey &task_key, } else if (OB_UNLIKELY(!task_key.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(task_key), K(ret_code)); - } else { - ObDDLTask *ddl_task = nullptr; - if (OB_FAIL(task_queue_.get_task(task_key, ddl_task))) { - if (OB_ENTRY_NOT_EXIST != ret) { - LOG_WARN("get task failed", K(ret), K(task_key)); - } - } else if (OB_ISNULL(ddl_task)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("index task is null", K(ret)); - } else { - const int64_t task_type = ddl_task->get_task_type(); - switch (task_type) { - case ObDDLType::DDL_MODIFY_COLUMN: - case ObDDLType::DDL_ALTER_PARTITION_BY: - case ObDDLType::DDL_TABLE_REDEFINITION: - case ObDDLType::DDL_DIRECT_LOAD: - if (OB_FAIL(static_cast(ddl_task)->notify_update_autoinc_finish(autoinc_val, ret_code))) { - LOG_WARN("update complete sstable job status", K(ret)); - } - break; - case ObDDLType::DDL_MODIFY_AUTO_INCREMENT: - if (OB_FAIL(static_cast(ddl_task)->notify_update_autoinc_finish(autoinc_val, ret_code))) { - LOG_WARN("update complete sstable job status", K(ret), K(ret_code)); - } - break; - default: - ret = OB_NOT_SUPPORTED; - LOG_WARN("not supported ddl task", K(ret), K(*ddl_task)); - break; - } - } + } else if (OB_FAIL(task_queue_.modify_task(task_key, [&autoinc_val, &ret_code](ObDDLTask &task) -> int { + int ret = OB_SUCCESS; + const int64_t task_type = task.get_task_type(); + switch (task_type) { + case ObDDLType::DDL_MODIFY_COLUMN: + case ObDDLType::DDL_ALTER_PARTITION_BY: + case ObDDLType::DDL_TABLE_REDEFINITION: + case ObDDLType::DDL_DIRECT_LOAD: + if (OB_FAIL(static_cast(&task)->notify_update_autoinc_finish(autoinc_val, ret_code))) { + LOG_WARN("update complete sstable job status", K(ret)); + } + break; + case ObDDLType::DDL_MODIFY_AUTO_INCREMENT: + if (OB_FAIL(static_cast(&task)->notify_update_autoinc_finish(autoinc_val, ret_code))) { + LOG_WARN("update complete sstable job status", K(ret), K(ret_code)); + } + break; + default: + ret = OB_NOT_SUPPORTED; + LOG_WARN("not supported ddl task", K(ret), K(task)); + break; + } + return ret; + }))) { + LOG_WARN("failed to modify task", K(ret)); } return ret; } diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index 2d812b480..f2d1fa45d 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -59,8 +59,10 @@ public: int get_next_task(ObDDLTask *&task); int remove_task(ObDDLTask *task); int add_task_to_last(ObDDLTask *task); - int get_task(const ObDDLTaskKey &task_key, ObDDLTask *&task); - int get_task(const int64_t task_id, ObDDLTask *&task); + template + int modify_task(const ObDDLTaskKey &task_key, F &&op); + template + int modify_task(const int64_t task_id, F &&op); int update_task_copy_deps_setting(const int64_t task_id, const bool is_copy_constraints, const bool is_copy_indexes, @@ -207,9 +209,6 @@ public: int start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res); int update_ddl_task_active_time(const int64_t task_id); - int on_update_execution_id( - const int64_t task_id, - int64_t &ret_execution_id); int prepare_alter_table_arg(const ObPrepareAlterTableArgParam ¶m, const ObTableSchema *target_table_schema, obrpc::ObAlterTableArg &alter_table_arg); diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 1781012f7..82597995d 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -1154,22 +1154,28 @@ int ObDDLTask::copy_longops_stat(ObLongopsValue &value) return ret; } -int ObDDLTask::push_execution_id() +int64_t ObDDLTask::get_execution_id() const +{ + TCRLockGuard guard(lock_); + return execution_id_; +} + +int ObDDLTask::push_execution_id(const uint64_t tenant_id, const int64_t task_id, int64_t &new_execution_id) { int ret = OB_SUCCESS; ObMySQLTransaction trans; ObRootService *root_service = nullptr; - int64_t table_task_status = 0; - int64_t table_execution_id = 0; + int64_t task_status = 0; + int64_t execution_id = 0; if (OB_ISNULL(root_service = GCTX.root_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, root service must not be nullptr", K(ret)); - } else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id_))) { + } else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id))) { LOG_WARN("start transaction failed", K(ret)); } else { - if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, table_execution_id))) { + if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id, task_id, task_status, execution_id))) { LOG_WARN("select for update failed", K(ret), K(task_id)); - } else if (OB_FAIL(ObDDLTaskRecordOperator::update_execution_id(trans, tenant_id_, task_id_, table_execution_id + 1))) { + } else if (OB_FAIL(ObDDLTaskRecordOperator::update_execution_id(trans, tenant_id, task_id, execution_id + 1))) { LOG_WARN("update task status failed", K(ret)); } bool commit = (OB_SUCCESS == ret); @@ -1179,9 +1185,7 @@ int ObDDLTask::push_execution_id() ret = (OB_SUCCESS == ret) ? tmp_ret : ret; } if (OB_SUCC(ret)) { - execution_id_ = table_execution_id + 1; - } else { - execution_id_ = execution_id_ >= 0 ? execution_id_ : 0; // use old execution or inner table default value(0) + new_execution_id = execution_id + 1; } } return ret; diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index 9ecc32cc6..1197fce8d 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -373,7 +373,7 @@ class ObDDLTask : public common::ObDLinkBase { public: explicit ObDDLTask(const share::ObDDLType task_type) - : ddl_tracing_(this), is_inited_(false), need_retry_(true), is_running_(false), + : lock_(), ddl_tracing_(this), is_inited_(false), need_retry_(true), is_running_(false), task_type_(task_type), trace_id_(), tenant_id_(0), object_id_(0), schema_version_(0), target_object_id_(0), task_status_(share::ObDDLTaskStatus::PREPARE), snapshot_version_(0), ret_code_(OB_SUCCESS), task_id_(0), parent_task_id_(0), parent_task_key_(), task_version_(0), parallelism_(0), @@ -403,7 +403,6 @@ public: ObDDLTaskKey get_task_key() const { return ObDDLTaskKey(target_object_id_, schema_version_); } int64_t get_parent_task_id() const { return parent_task_id_; } int64_t get_task_version() const { return task_version_; } - int64_t get_execution_id() const { return execution_id_; } int64_t get_parallelism() const { return parallelism_; } static int deep_copy_table_arg(common::ObIAllocator &allocator, const obrpc::ObDDLArg &source_arg, @@ -438,7 +437,8 @@ public: void calc_next_schedule_ts(const int ret_code, const int64_t total_task_cnt); bool need_schedule() { return next_schedule_ts_ <= ObTimeUtility::current_time(); } bool is_replica_build_need_retry(const int ret_code); - int push_execution_id(); + int64_t get_execution_id() const; + static int push_execution_id(const uint64_t tenant_id, const int64_t task_id, int64_t &new_execution_id); virtual bool support_longops_monitoring() const { return false; } int cleanup(); virtual int cleanup_impl() = 0; @@ -488,6 +488,7 @@ protected: int init_ddl_task_monitor_info(const ObTableSchema *target_schema); protected: static const int64_t MAX_ERR_TOLERANCE_CNT = 3L; // Max torlerance count for error code. + common::TCRWLock lock_; ObDDLTracing ddl_tracing_; bool is_inited_; bool need_retry_; @@ -515,7 +516,7 @@ protected: ObDDLTaskStatInfo stat_info_; int64_t delay_schedule_time_; int64_t next_schedule_ts_; - int64_t execution_id_; + int64_t execution_id_; // guarded by lock_ common::ObAddr sql_exec_addr_; int64_t cluster_version_; }; diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index 9507a923f..b7d892c0b 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -82,19 +82,7 @@ int ObIndexSSTableBuildTask::process() ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, table schema must not be nullptr", K(ret)); } else { - (void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id_, - task_id_, - execution_id_, - inner_sql_exec_addr_, - trace_id_, - table_schema->get_schema_version(), - snapshot_version_, - need_exec_new_inner_sql); - if (!need_exec_new_inner_sql) { - LOG_INFO("succ to wait and complete old task finished!", K(ret)); - } else if (OB_FAIL(root_service_->get_ddl_scheduler().on_update_execution_id(task_id_, execution_id_))) { // genenal new ObIndexSSTableBuildTask::execution_id_ and persist to inner table - LOG_WARN("failed to update execution id", K(ret)); - } else if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_, data_table_id_, + if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_, data_table_id_, dest_table_id_, table_schema->get_schema_version(), snapshot_version_, @@ -723,13 +711,55 @@ int ObIndexBuildTask::release_snapshot(const int64_t snapshot) return ret; } +int ObIndexBuildTask::try_reap_old_replica_build_task() +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + const int64_t data_table_id = object_id_; + const int64_t dest_table_id = target_object_id_; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("ObIndexBuildTask has not been inited", K(ret)); + } else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard( + tenant_id_, schema_guard))) { + LOG_WARN("fail to get tenant schema guard", K(ret), K(data_table_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, data_table_id, table_schema))) { + LOG_WARN("get table schema failed", K(ret), K(tenant_id_), K(data_table_id)); + } else if (OB_UNLIKELY(nullptr == table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("error unexpected, table schema must not be nullptr", K(ret)); + } else { + const int64_t old_execution_id = get_execution_id(); + const ObTabletID unused_tablet_id; + const ObDDLTaskInfo unused_addition_info; + const int old_ret_code = OB_SUCCESS; + bool need_exec_new_inner_sql = true; + ObAddr invalid_addr; + (void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id, + task_id_, old_execution_id, invalid_addr, trace_id_, + table_schema->get_schema_version(), snapshot_version_, need_exec_new_inner_sql); + if (!need_exec_new_inner_sql) { + if (OB_FAIL(update_complete_sstable_job_status(unused_tablet_id, snapshot_version_, old_execution_id, old_ret_code, unused_addition_info))) { + LOG_INFO("succ to wait and complete old task finished!", K(ret)); + } + } else { + ret = OB_ENTRY_NOT_EXIST; + } + } + return ret; +} + // construct ObIndexSSTableBuildTask build task int ObIndexBuildTask::send_build_single_replica_request() { int ret = OB_SUCCESS; + int64_t new_execution_id = 0; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObIndexBuildTask has not been inited", K(ret)); + } else if (OB_FAIL(ObDDLTask::push_execution_id(tenant_id_, task_id_, new_execution_id))) { + LOG_WARN("failed to fetch new execution id", K(ret)); } else { if (OB_FAIL(ObDDLUtil::get_sys_ls_leader_addr(GCONF.cluster_id, tenant_id_, create_index_arg_.inner_sql_exec_addr_))) { LOG_WARN("get sys ls leader addr fail", K(ret), K(tenant_id_)); @@ -746,7 +776,7 @@ int ObIndexBuildTask::send_build_single_replica_request() target_object_id_, schema_version_, snapshot_version_, - execution_id_, + new_execution_id, trace_id_, parallelism_, root_service_, @@ -770,6 +800,7 @@ int ObIndexBuildTask::check_build_single_replica(bool &is_end) { int ret = OB_SUCCESS; is_end = false; + TCWLockGuard guard(lock_); if (INT64_MAX == complete_sstable_job_ret_code_) { // not complete } else if (OB_SUCCESS == complete_sstable_job_ret_code_) { @@ -812,7 +843,9 @@ int ObIndexBuildTask::wait_data_complement() // submit a job to complete sstable for the index table on snapshot_version if (OB_SUCC(ret) && !state_finished && !is_sstable_complete_task_submitted_) { - if (OB_FAIL(send_build_single_replica_request())) { + if (OB_SUCCESS == try_reap_old_replica_build_task()) { + state_finished = true; + } else if (OB_FAIL(send_build_single_replica_request())) { LOG_WARN("fail to send build single replica request", K(ret)); } } @@ -830,7 +863,7 @@ int ObIndexBuildTask::wait_data_complement() if (OB_SUCC(ret) && state_finished && !create_index_arg_.is_spatial_index()) { bool dummy_equal = false; if (OB_FAIL(ObDDLChecksumOperator::check_column_checksum( - tenant_id_, execution_id_, object_id_, index_table_id_, task_id_, false/*index build*/, dummy_equal, root_service_->get_sql_proxy()))) { + tenant_id_, get_execution_id(), object_id_, index_table_id_, task_id_, false/*index build*/, dummy_equal, root_service_->get_sql_proxy()))) { if (OB_ITER_END != ret) { LOG_WARN("fail to check column checksum", K(ret), K(index_table_id_), K(object_id_), K(task_id_)); state_finished = true; @@ -973,7 +1006,7 @@ int ObIndexBuildTask::verify_checksum() bool is_column_checksum_ready = false; bool dummy_equal = false; if (!wait_column_checksum_ctx_.is_inited() && OB_FAIL(wait_column_checksum_ctx_.init( - task_id_, tenant_id_, object_id_, index_table_id_, schema_version_, check_unique_snapshot_, execution_id_, checksum_wait_timeout))) { + task_id_, tenant_id_, object_id_, index_table_id_, schema_version_, check_unique_snapshot_, get_execution_id(), checksum_wait_timeout))) { LOG_WARN("init context of wait column checksum failed", K(ret), K(object_id_), K(index_table_id_)); } else { if (OB_FAIL(wait_column_checksum_ctx_.try_wait(is_column_checksum_ready))) { @@ -986,7 +1019,7 @@ int ObIndexBuildTask::verify_checksum() // do nothing } else { if (OB_FAIL(ObDDLChecksumOperator::check_column_checksum( - tenant_id_, execution_id_, object_id_, index_table_id_, task_id_, true/*check unique index*/, dummy_equal, root_service_->get_sql_proxy()))) { + tenant_id_, get_execution_id(), object_id_, index_table_id_, task_id_, true/*check unique index*/, dummy_equal, root_service_->get_sql_proxy()))) { if (OB_CHECKSUM_ERROR == ret && is_unique_index_) { ret = OB_ERR_DUPLICATED_UNIQUE_KEY; } @@ -1035,6 +1068,7 @@ int ObIndexBuildTask::update_complete_sstable_job_status( { int ret = OB_SUCCESS; UNUSED(addition_info); + TCWLockGuard guard(lock_); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); @@ -1048,7 +1082,8 @@ int ObIndexBuildTask::update_complete_sstable_job_status( ret = OB_ERR_UNEXPECTED; LOG_WARN("snapshot version not match", K(ret), K(snapshot_version), K(snapshot_version_)); } else if (execution_id < execution_id_) { - LOG_INFO("receive a mismatch execution result, ignore", K(ret_code), K(execution_id), K(execution_id_)); + ret = OB_TASK_EXPIRED; + LOG_WARN("receive a mismatch execution result", K(ret), K(ret_code), K(execution_id), K(execution_id_)); } else { complete_sstable_job_ret_code_ = ret_code; sstable_complete_ts_ = ObTimeUtility::current_time(); diff --git a/src/rootserver/ddl_task/ob_index_build_task.h b/src/rootserver/ddl_task/ob_index_build_task.h index 0af8242df..e040ebd50 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.h +++ b/src/rootserver/ddl_task/ob_index_build_task.h @@ -127,6 +127,7 @@ private: const share::schema::ObTableSchema &index_schema, const share::schema::ObIndexStatus new_status); int check_health(); + int try_reap_old_replica_build_task(); int send_build_single_replica_request(); int check_build_single_replica(bool &is_end); int check_need_verify_checksum(bool &need_verify); diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 6d1505fa9..6d537394c 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -148,7 +148,8 @@ int ObTableRedefinitionTask::update_complete_sstable_job_status(const common::Ob ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, snapshot version is not equal", K(ret), K(snapshot_version_), K(snapshot_version)); } else if (execution_id < execution_id_) { - LOG_INFO("receive a mismatch execution result, ignore", K(ret_code), K(execution_id), K(execution_id_)); + ret = OB_TASK_EXPIRED; + LOG_WARN("receive a mismatch execution result, ignore", K(ret_code), K(execution_id), K(execution_id_)); } else { complete_sstable_job_ret_code_ = ret_code; execution_id_ = execution_id; // update ObTableRedefinitionTask::execution_id_ from ObDDLRedefinitionSSTableBuildTask::execution_id_ @@ -185,6 +186,7 @@ int ObTableRedefinitionTask::send_build_replica_request_by_sql() bool modify_autoinc = false; bool use_heap_table_ddl_plan = false; ObRootService *root_service = GCTX.root_service_; + int64_t new_execution_id = 0; if (OB_ISNULL(root_service)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, root service must not be nullptr", K(ret)); @@ -192,6 +194,8 @@ int ObTableRedefinitionTask::send_build_replica_request_by_sql() LOG_WARN("failed to check modify autoinc", K(ret)); } else if (OB_FAIL(check_use_heap_table_ddl_plan(use_heap_table_ddl_plan))) { LOG_WARN("fail to check heap table ddl plan", K(ret)); + } else if (OB_FAIL(ObDDLTask::push_execution_id(tenant_id_, task_id_, new_execution_id))) { + LOG_WARN("failed to fetch new execution id", K(ret)); } else { ObSQLMode sql_mode = alter_table_arg_.sql_mode_; if (!modify_autoinc) { @@ -213,7 +217,7 @@ int ObTableRedefinitionTask::send_build_replica_request_by_sql() target_object_id_, schema_version_, snapshot_version_, - execution_id_, // will init in ObDDLRedefinitionSSTableBuildTask::process + new_execution_id, sql_mode, trace_id_, parallelism_, @@ -307,7 +311,9 @@ int ObTableRedefinitionTask::table_redefinition(const ObDDLTaskStatus next_task_ } if (OB_SUCC(ret) && !is_build_replica_end && 0 == build_replica_request_time_) { - if (OB_FAIL(send_build_replica_request())) { + if (OB_SUCCESS == try_reap_old_replica_build_task()) { + is_build_replica_end = true; + } else if (OB_FAIL(send_build_replica_request())) { LOG_WARN("fail to send build replica request", K(ret)); } else { build_replica_request_time_ = ObTimeUtility::current_time(); @@ -348,7 +354,7 @@ int ObTableRedefinitionTask::replica_end_check(const int ret_code) break; } default : { - if (OB_FAIL(check_data_dest_tables_columns_checksum(execution_id_))) { + if (OB_FAIL(check_data_dest_tables_columns_checksum(get_execution_id()))) { LOG_WARN("fail to check the columns checksum of data table and destination table", K(ret)); } break; diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index 62e6a1361..2d08e807f 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -1019,7 +1019,7 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status( const common::ObAddr &inner_sql_exec_addr, const common::ObCurTraceId::TraceId &trace_id, const uint64_t tenant_id, - const int64_t schema_version, + const int64_t execution_id, const int64_t scn, bool &is_old_task_session_exist) { @@ -1034,9 +1034,6 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status( } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || trace_id.is_invalid() || !inner_sql_exec_addr.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(tenant_id), K(trace_id), K(inner_sql_exec_addr)); - } else if (!inner_sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ip to string failed", K(ret), K(inner_sql_exec_addr)); } else { ret = OB_SUCCESS; common::ObMySQLProxy &proxy = root_service->get_sql_proxy(); @@ -1048,21 +1045,44 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status( if (OB_UNLIKELY(0 > trace_id.to_string(trace_id_str, sizeof(trace_id_str)))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get trace id string failed", K(ret), K(trace_id)); - } else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" " - " and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%cINTO%cSELECT%c%ld%c%ld%c\" ", - OB_ALL_VIRTUAL_SESSION_INFO_TNAME, - trace_id_str, - ip_str, - inner_sql_exec_addr.get_port(), - charater, - charater, - charater, - charater, - schema_version, - charater, - scn, - charater ))) { - LOG_WARN("assign sql string failed", K(ret)); + } else if (!inner_sql_exec_addr.is_valid()) { + if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" " + " and info like \"%cINSERT%c('ddl_execution_id', %ld)%cINTO%cSELECT%c%ld%c\" ", + OB_ALL_VIRTUAL_SESSION_INFO_TNAME, + trace_id_str, + charater, + charater, + execution_id, + charater, + charater, + charater, + scn, + charater ))) { + LOG_WARN("assign sql string failed", K(ret)); + } + } else { + if (!inner_sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ip to string failed", K(ret), K(inner_sql_exec_addr)); + } else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" " + " and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%c('ddl_execution_id', %ld)%cINTO%cSELECT%c%ld%c\" ", + OB_ALL_VIRTUAL_SESSION_INFO_TNAME, + trace_id_str, + ip_str, + inner_sql_exec_addr.get_port(), + charater, + charater, + execution_id, + charater, + charater, + charater, + scn, + charater ))) { + LOG_WARN("assign sql string failed", K(ret)); + } + } + + if (OB_FAIL(ret)) { } else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &inner_sql_exec_addr))) { LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); } else if (OB_ISNULL((result = res.get_result()))) { @@ -1569,7 +1589,7 @@ int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task( } else if (is_all_sstable_build_finished) { LOG_INFO("all tablet sstable has build finished"); } else { - if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, schema_version, scn, is_old_task_session_exist))) { + if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, execution_id, scn, is_old_task_session_exist))) { LOG_WARN("fail check task inner sql session status", K(ret), K(trace_id), K(inner_sql_exec_addr)); } else if (!is_old_task_session_exist) { LOG_WARN("old inner sql session is not exist.", K(ret)); diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index 1731566d4..1f9b88f03 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -374,7 +374,7 @@ private: const common::ObAddr &inner_sql_exec_addr, const common::ObCurTraceId::TraceId &trace_id, const uint64_t tenant_id, - const int64_t schema_version, + const int64_t execution_id, const int64_t scn, bool &is_old_task_session_exist);