From 0d1e89d206398a903eb703572c1a0ed68e7a0388 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 7 Nov 2024 08:47:38 +0000 Subject: [PATCH] fix ddl hold snapshot repeatedly. --- .../ddl_task/ob_constraint_task.cpp | 23 +++++++--- src/rootserver/ddl_task/ob_constraint_task.h | 2 +- .../ddl_task/ob_ddl_redefinition_task.cpp | 2 +- .../ddl_task/ob_index_build_task.cpp | 23 +++++++--- src/rootserver/ddl_task/ob_index_build_task.h | 2 +- src/rootserver/ob_snapshot_info_manager.cpp | 43 +++++++++++++------ src/rootserver/ob_snapshot_info_manager.h | 8 ++++ src/share/ob_ddl_common.cpp | 18 ++++++-- src/share/ob_ddl_common.h | 1 + 9 files changed, 93 insertions(+), 29 deletions(-) diff --git a/src/rootserver/ddl_task/ob_constraint_task.cpp b/src/rootserver/ddl_task/ob_constraint_task.cpp index 9490f23c9..38ba65638 100755 --- a/src/rootserver/ddl_task/ob_constraint_task.cpp +++ b/src/rootserver/ddl_task/ob_constraint_task.cpp @@ -633,7 +633,9 @@ int ObConstraintTask::init(const ObDDLTaskRecord &task_record) return ret; } -int ObConstraintTask::hold_snapshot(const int64_t snapshot_version) +int ObConstraintTask::hold_snapshot( + common::ObMySQLTransaction &trans, + const int64_t snapshot_version) { int ret = OB_SUCCESS; ObDDLService &ddl_service = root_service_->get_ddl_service(); @@ -668,7 +670,7 @@ int ObConstraintTask::hold_snapshot(const int64_t snapshot_version) OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, table_schema->get_aux_lob_piece_tid(), tablet_ids))) { LOG_WARN("failed to get data lob piece table snapshot", K(ret)); } else if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot( - ddl_service.get_sql_proxy(), SNAPSHOT_FOR_DDL, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) { + trans, SNAPSHOT_FOR_DDL, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) { LOG_WARN("acquire snapshot failed", K(ret), K(tablet_ids)); } else { snapshot_version_ = snapshot_version; @@ -742,14 +744,16 @@ int ObConstraintTask::wait_trans_end() DEBUG_SYNC(CONSTRAINT_WAIT_TRANS_END); if (OB_SUCC(ret) && new_status != CHECK_CONSTRAINT_VALID && snapshot_version_ > 0 && !snapshot_held_) { - if (OB_FAIL(ObDDLTaskRecordOperator::update_snapshot_version(root_service_->get_sql_proxy(), + ObMySQLTransaction trans; + if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), tenant_id_))) { + LOG_WARN("fail to start trans", K(ret), K(tenant_id_)); + } else if (OB_FAIL(ObDDLTaskRecordOperator::update_snapshot_version(trans, tenant_id_, task_id_, snapshot_version_))) { LOG_WARN("update snapshot version failed", K(ret), K(task_id_)); - } else if (OB_FAIL(hold_snapshot(snapshot_version_))) { + } else if (OB_FAIL(hold_snapshot(trans, snapshot_version_))) { if (OB_SNAPSHOT_DISCARDED == ret) { - ret = OB_SUCCESS; snapshot_held_ = false; snapshot_version_ = 0; wait_trans_ctx_.reset(); @@ -760,6 +764,15 @@ int ObConstraintTask::wait_trans_end() new_status = CHECK_CONSTRAINT_VALID; snapshot_held_ = true; } + if (trans.is_started()) { + bool need_commit = (ret == OB_SUCCESS); + int tmp_ret = trans.end(need_commit); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("fail to end trans", K(ret), K(tmp_ret), K(need_commit)); + } + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + ret = OB_SNAPSHOT_DISCARDED == ret ? OB_SUCCESS : ret; } if (OB_FAIL(switch_status(new_status, true, ret))) { diff --git a/src/rootserver/ddl_task/ob_constraint_task.h b/src/rootserver/ddl_task/ob_constraint_task.h index 0a0e5c361..68b4d1324 100644 --- a/src/rootserver/ddl_task/ob_constraint_task.h +++ b/src/rootserver/ddl_task/ob_constraint_task.h @@ -109,7 +109,7 @@ public: virtual int deserialize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) override; virtual int64_t get_serialize_param_size() const override; private: - int hold_snapshot(const int64_t snapshot_version); + int hold_snapshot(common::ObMySQLTransaction &trans, const int64_t snapshot_version); int release_snapshot(const int64_t snapshot_version); int wait_trans_end(); int validate_constraint_valid(); diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp index 2a7ea04cc..52437a4c2 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp @@ -410,7 +410,7 @@ int ObDDLRedefinitionTask::hold_snapshot_for_major_refresh_mv_(const int64_t sna } if (OB_SUCC(ret)) { ObDDLService &ddl_service = root_service->get_ddl_service(); - if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot( + if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot_in_trans( ddl_service.get_sql_proxy(), SNAPSHOT_FOR_MAJOR_REFRESH_MV, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) { LOG_WARN("batch acquire snapshot failed", K(ret), K(tablet_ids)); diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index 99309ff2b..94d68e527 100755 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -678,14 +678,16 @@ int ObIndexBuildTask::wait_trans_end() // persistent snapshot_version into inner table and hold snapshot of data_table and index table if (OB_SUCC(ret) && !state_finished && snapshot_version_ > 0 && !snapshot_held_) { - if (OB_FAIL(ObDDLTaskRecordOperator::update_snapshot_version(root_service_->get_sql_proxy(), + ObMySQLTransaction trans; + if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), tenant_id_))) { + LOG_WARN("fail to start trans", K(ret), K(tenant_id_)); + } else if (OB_FAIL(ObDDLTaskRecordOperator::update_snapshot_version(trans, tenant_id_, task_id_, snapshot_version_))) { LOG_WARN("update snapshot version failed", K(ret), K(task_id_), K(snapshot_version_)); - } else if (OB_FAIL(hold_snapshot(snapshot_version_))) { + } else if (OB_FAIL(hold_snapshot(trans, snapshot_version_))) { if (OB_SNAPSHOT_DISCARDED == ret) { - ret = OB_SUCCESS; snapshot_version_ = 0; snapshot_held_ = false; LOG_INFO("snapshot discarded, need retry waiting trans", K(ret)); @@ -696,6 +698,15 @@ int ObIndexBuildTask::wait_trans_end() snapshot_held_ = true; state_finished = true; } + if (trans.is_started()) { + bool need_commit = (ret == OB_SUCCESS); + int tmp_ret = trans.end(need_commit); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("fail to end trans", K(ret), K(tmp_ret), K(need_commit)); + } + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + ret = OB_SNAPSHOT_DISCARDED == ret ? OB_SUCCESS : ret; } if (state_finished || OB_FAIL(ret)) { @@ -708,7 +719,9 @@ int ObIndexBuildTask::wait_trans_end() return ret; } -int ObIndexBuildTask::hold_snapshot(const int64_t snapshot) +int ObIndexBuildTask::hold_snapshot( + common::ObMySQLTransaction &trans, + const int64_t snapshot) { int ret = OB_SUCCESS; SCN snapshot_scn; @@ -752,7 +765,7 @@ int ObIndexBuildTask::hold_snapshot(const int64_t snapshot) OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, data_table_schema->get_aux_lob_piece_tid(), tablet_ids))) { LOG_WARN("failed to get data lob piece table snapshot", K(ret)); } else if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot( - ddl_service.get_sql_proxy(), SNAPSHOT_FOR_DDL, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) { + trans, SNAPSHOT_FOR_DDL, tenant_id_, schema_version_, snapshot_scn, nullptr, tablet_ids))) { LOG_WARN("batch acquire snapshot failed", K(ret), K(tablet_ids)); } } diff --git a/src/rootserver/ddl_task/ob_index_build_task.h b/src/rootserver/ddl_task/ob_index_build_task.h index 2e24df9a8..6644e3bc1 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.h +++ b/src/rootserver/ddl_task/ob_index_build_task.h @@ -137,7 +137,7 @@ private: int clean_on_failed(); int succ(); virtual int cleanup_impl() override; - int hold_snapshot(const int64_t snapshot); + int hold_snapshot(common::ObMySQLTransaction &trans, const int64_t snapshot); int release_snapshot(const int64_t snapshot); int update_index_status_in_schema( const share::schema::ObTableSchema &index_schema, diff --git a/src/rootserver/ob_snapshot_info_manager.cpp b/src/rootserver/ob_snapshot_info_manager.cpp index 8cb71d63b..005ccfe35 100644 --- a/src/rootserver/ob_snapshot_info_manager.cpp +++ b/src/rootserver/ob_snapshot_info_manager.cpp @@ -56,7 +56,7 @@ int ObSnapshotInfoManager::acquire_snapshot( return ret; } -int ObSnapshotInfoManager::batch_acquire_snapshot( +int ObSnapshotInfoManager::batch_acquire_snapshot_in_trans( common::ObMySQLProxy &proxy, share::ObSnapShotType snapshot_type, const uint64_t tenant_id, @@ -67,6 +67,33 @@ int ObSnapshotInfoManager::batch_acquire_snapshot( { int ret = OB_SUCCESS; ObMySQLTransaction trans; + if (OB_FAIL(trans.start(&proxy, tenant_id))) { + LOG_WARN("fail to start trans", K(ret), K(tenant_id)); + } else if (OB_FAIL(batch_acquire_snapshot(trans, snapshot_type, + tenant_id, schema_version, snapshot_scn, comment, tablet_ids))) { + LOG_WARN("batch add snapshot failed", K(ret)); + } + if (trans.is_started()) { + bool need_commit = (ret == OB_SUCCESS); + int tmp_ret = trans.end(need_commit); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("fail to end trans", K(tmp_ret), K(need_commit)); + } + ret = OB_SUCC(ret) ? tmp_ret : ret; + } + return ret; +} + +int ObSnapshotInfoManager::batch_acquire_snapshot( + common::ObMySQLTransaction &trans, + share::ObSnapShotType snapshot_type, + const uint64_t tenant_id, + const int64_t schema_version, + const SCN &snapshot_scn, + const char *comment, + const common::ObIArray &tablet_ids) +{ + int ret = OB_SUCCESS; ObSnapshotTableProxy snapshot_proxy; ObSnapshotInfo snapshot; ObTimeoutCtx timeout_ctx; @@ -75,9 +102,9 @@ int ObSnapshotInfoManager::batch_acquire_snapshot( snapshot.snapshot_scn_ = snapshot_scn; snapshot.schema_version_ = schema_version; snapshot.comment_ = comment; - if (OB_UNLIKELY(!snapshot.is_valid() || tablet_ids.count() <= 0)) { + if (OB_UNLIKELY(!trans.is_started() || !snapshot.is_valid() || tablet_ids.count() <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tablet_ids.count())); + LOG_WARN("invalid argument", K(ret), "trans_started", trans.is_started(), K(snapshot), K(tablet_ids.count())); } else { int64_t rpc_timeout = 0; int64_t trx_timeout = 0; @@ -89,20 +116,10 @@ int ObSnapshotInfoManager::batch_acquire_snapshot( LOG_WARN("set trx timeout failed", K(ret), K(trx_timeout)); } else if (OB_FAIL(timeout_ctx.set_timeout(rpc_timeout))) { LOG_WARN("set timeout failed", K(ret), K(rpc_timeout)); - } else if (OB_FAIL(trans.start(&proxy, tenant_id))) { - LOG_WARN("fail to start trans", K(ret), K(tenant_id)); } else if (OB_FAIL(snapshot_proxy.batch_add_snapshot(trans, snapshot_type, tenant_id, schema_version, snapshot.snapshot_scn_, comment, tablet_ids))) { LOG_WARN("batch add snapshot failed", K(ret)); } - if (trans.is_started()) { - bool need_commit = (ret == OB_SUCCESS); - int tmp_ret = trans.end(need_commit); - if (OB_SUCCESS != tmp_ret) { - LOG_WARN("fail to end trans", K(tmp_ret), K(need_commit)); - } - ret = OB_SUCC(ret) ? tmp_ret : ret; - } ROOTSERVICE_EVENT_ADD("snapshot", "batch_acquire_snapshot", K(ret), K(snapshot), "rs_addr", self_addr_); } diff --git a/src/rootserver/ob_snapshot_info_manager.h b/src/rootserver/ob_snapshot_info_manager.h index 43d94cbd5..a54df667f 100644 --- a/src/rootserver/ob_snapshot_info_manager.h +++ b/src/rootserver/ob_snapshot_info_manager.h @@ -62,6 +62,14 @@ public: share::ObSnapShotType snapshot_type, int64_t &count); int batch_acquire_snapshot( + common::ObMySQLTransaction &trans, + share::ObSnapShotType snapshot_type, + const uint64_t tenant_id, + const int64_t schema_version, + const share::SCN &snapshot_scn, + const char *comment, + const common::ObIArray &tablet_ids); + int batch_acquire_snapshot_in_trans( common::ObMySQLProxy &proxy, share::ObSnapShotType snapshot_type, const uint64_t tenant_id, diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index d52a686ea..83ac5f125 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -1435,12 +1435,15 @@ int ObDDLUtil::obtain_snapshot( ret = OB_ERR_UNEXPECTED; LOG_WARN("snapshot version is invalid", K(ret), KPC(wait_trans_ctx)); } else if (snapshot_version > 0 && !snapshot_held) { - if (OB_FAIL(rootserver::ObDDLTaskRecordOperator::update_snapshot_version(root_service->get_sql_proxy(), + ObMySQLTransaction trans; + if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id))) { + LOG_WARN("fail to start trans", K(ret), K(tenant_id)); + } else if (OB_FAIL(rootserver::ObDDLTaskRecordOperator::update_snapshot_version(trans, tenant_id, task->get_task_id(), snapshot_version))) { LOG_WARN("update snapshot version failed", K(ret), K(task->get_task_id()), K(tenant_id)); - } else if (OB_FAIL(hold_snapshot(task, table_id, target_table_id, root_service, snapshot_version))) { + } else if (OB_FAIL(hold_snapshot(trans, task, table_id, target_table_id, root_service, snapshot_version))) { if (OB_SNAPSHOT_DISCARDED == ret) { snapshot_version = 0; snapshot_held = false; @@ -1451,6 +1454,14 @@ int ObDDLUtil::obtain_snapshot( } else { snapshot_held = true; } + if (trans.is_started()) { + bool need_commit = (ret == OB_SUCCESS); + int tmp_ret = trans.end(need_commit); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("fail to end trans", K(ret), K(tmp_ret), K(need_commit)); + } + ret = OB_SUCC(ret) ? tmp_ret : ret; + } } if (OB_FAIL(ret)) { @@ -1475,6 +1486,7 @@ int ObDDLUtil::obtain_snapshot( } int ObDDLUtil::hold_snapshot( + common::ObMySQLTransaction &trans, rootserver::ObDDLTask* task, const uint64_t table_id, const uint64_t target_table_id, @@ -1532,7 +1544,7 @@ int ObDDLUtil::hold_snapshot( } else { rootserver::ObDDLService &ddl_service = root_service->get_ddl_service(); if (OB_FAIL(ddl_service.get_snapshot_mgr().batch_acquire_snapshot( - ddl_service.get_sql_proxy(), SNAPSHOT_FOR_DDL, tenant_id, schema_version, snapshot_scn, nullptr, tablet_ids))) { + trans, SNAPSHOT_FOR_DDL, tenant_id, schema_version, snapshot_scn, nullptr, tablet_ids))) { LOG_WARN("batch acquire snapshot failed", K(ret), K(tablet_ids)); } } diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index aa8e554eb..caf88c144 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -906,6 +906,7 @@ public: bool &all_dag_exit); private: static int hold_snapshot( + common::ObMySQLTransaction &trans, rootserver::ObDDLTask* task, const uint64_t table_id, const uint64_t target_table_id,