diff --git a/deps/oblib/src/lib/mysqlclient/ob_isql_client.h b/deps/oblib/src/lib/mysqlclient/ob_isql_client.h index e90e81791e..27012dd0a1 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_isql_client.h +++ b/deps/oblib/src/lib/mysqlclient/ob_isql_client.h @@ -53,10 +53,12 @@ public: virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) = 0; virtual int read(ReadResult &res, const int64_t cluster_id, const uint64_t tenant_id, const char *sql) = 0; virtual int read(ReadResult &res, const char *sql) { return this->read(res, OB_SYS_TENANT_ID, sql); } + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const int32_t group_id) = 0; // execute update sql virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) = 0; virtual int write(const char *sql, int64_t &affected_rows) { return this->write(OB_SYS_TENANT_ID, sql, affected_rows); } + virtual int write(const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) = 0; // executor execute int execute(const uint64_t tenant_id, sqlclient::ObIExecutor &executor) diff --git a/deps/oblib/src/lib/mysqlclient/ob_isql_connection_pool.h b/deps/oblib/src/lib/mysqlclient/ob_isql_connection_pool.h index 520bf54e05..0fd2fb7201 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_isql_connection_pool.h +++ b/deps/oblib/src/lib/mysqlclient/ob_isql_connection_pool.h @@ -116,9 +116,9 @@ public: // acquired connection must be released virtual int acquire(ObISQLConnection *&conn, ObISQLClient *client_addr) { - return this->acquire(OB_INVALID_TENANT_ID, conn, client_addr); + return this->acquire(OB_INVALID_TENANT_ID, conn, client_addr, 0); } - virtual int acquire(const uint64_t tenant_id, ObISQLConnection *&conn, ObISQLClient *client_addr) = 0; + virtual int acquire(const uint64_t tenant_id, ObISQLConnection *&conn, ObISQLClient *client_addr, const int32_t group_id) = 0; virtual int release(ObISQLConnection *conn, const bool success) = 0; virtual int on_client_inactive(ObISQLClient *client_addr) = 0; virtual ObSQLConnPoolType get_type() = 0; diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection_pool.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection_pool.cpp index 520be905db..969308ad83 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection_pool.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection_pool.cpp @@ -698,10 +698,11 @@ void ObMySQLConnectionPool::runTimerTask() } } -int ObMySQLConnectionPool::acquire(const uint64_t tenant_id, ObISQLConnection *&conn, ObISQLClient *client_addr) +int ObMySQLConnectionPool::acquire(const uint64_t tenant_id, ObISQLConnection *&conn, ObISQLClient *client_addr, const int32_t group_id) { int ret = OB_SUCCESS; UNUSED(client_addr); + UNUSED(group_id); ObMySQLConnection *mysql_conn = NULL; conn = NULL; if (OB_FAIL(acquire(tenant_id, mysql_conn))) { diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection_pool.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection_pool.h index 5289ab161d..ecec768c41 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection_pool.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection_pool.h @@ -140,7 +140,7 @@ public: virtual int escape(const char *from, const int64_t from_size, char *to, const int64_t to_size, int64_t &out_size); - virtual int acquire(const uint64_t tenant_id, ObISQLConnection *&conn, ObISQLClient *client_addr) override; + virtual int acquire(const uint64_t tenant_id, ObISQLConnection *&conn, ObISQLClient *client_addr, const int32_t group_id) override; virtual int release(ObISQLConnection *conn, const bool success) override; virtual int on_client_inactive(ObISQLClient *client_addr) override diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp index a2a7ad413c..aa0a6e214f 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp @@ -56,11 +56,11 @@ void ObCommonSqlProxy::operator=(const ObCommonSqlProxy &o) pool_ = o.pool_; } -int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const char *sql) +int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const char *sql, const int32_t group_id) { int ret = OB_SUCCESS; ObISQLConnection *conn = NULL; - if (OB_FAIL(acquire(tenant_id, conn))) { + if (OB_FAIL(acquire(tenant_id, conn, group_id))) { LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (OB_FAIL(read(conn, result, tenant_id, sql))) { LOG_WARN("read failed", K(ret)); @@ -76,7 +76,7 @@ int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const c if (OB_ISNULL(exec_sql_addr)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("read with typically exec addr failed", K(ret), K(exec_sql_addr)); - } else if (OB_FAIL(acquire(tenant_id, conn))) { + } else if (OB_FAIL(acquire(tenant_id, conn, 0/*group_id*/))) { LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (OB_FAIL(read(conn, result, tenant_id, sql, exec_sql_addr))) { LOG_WARN("read failed", K(ret)); @@ -89,7 +89,7 @@ int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const c { int ret = OB_SUCCESS; ObISQLConnection *conn = NULL; - if (OB_FAIL(acquire(tenant_id, conn))) { + if (OB_FAIL(acquire(tenant_id, conn, 0/*group_id*/))) { LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (nullptr != session_param) { conn->set_ddl_info(&session_param->ddl_info_); @@ -133,7 +133,7 @@ int ObCommonSqlProxy::read(ObISQLConnection *conn, ReadResult &result, return ret; } -int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) +int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) { int ret = OB_SUCCESS; int64_t start = ::oceanbase::common::ObTimeUtility::current_time(); @@ -141,7 +141,7 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, int64_t & if (OB_ISNULL(sql)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("empty sql"); - } else if (OB_FAIL(acquire(tenant_id, conn))) { + } else if (OB_FAIL(acquire(tenant_id, conn, group_id))) { LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (OB_ISNULL(conn)) { ret = OB_INNER_STAT_ERROR; @@ -171,7 +171,7 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const ObString sql, if (OB_UNLIKELY(sql.empty())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("empty sql"); - } else if (OB_FAIL(acquire(tenant_id, conn))) { + } else if (OB_FAIL(acquire(tenant_id, conn, 0/*group_id*/))) { LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (OB_ISNULL(conn)) { ret = OB_INNER_STAT_ERROR; @@ -254,7 +254,7 @@ int ObCommonSqlProxy::execute(const uint64_t tenant_id, ObIExecutor &executor) int ret = OB_SUCCESS; int64_t start = ::oceanbase::common::ObTimeUtility::current_time(); ObISQLConnection *conn = NULL; - if (OB_FAIL(acquire(tenant_id, conn))) { + if (OB_FAIL(acquire(tenant_id, conn, 0/*group_id*/))) { LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (OB_ISNULL(conn)) { ret = OB_INNER_STAT_ERROR; @@ -336,13 +336,13 @@ int ObCommonSqlProxy::add_slashes(const char *from, const int64_t from_size, return ret; } -int ObCommonSqlProxy::acquire(const uint64_t tenant_id, sqlclient::ObISQLConnection *&conn) +int ObCommonSqlProxy::acquire(const uint64_t tenant_id, sqlclient::ObISQLConnection *&conn, const int32_t group_id) { int ret = OB_SUCCESS; if (!is_inited()) { ret = OB_NOT_INIT; LOG_WARN("mysql proxy not inited", K(ret)); - } else if (OB_FAIL(pool_->acquire(tenant_id, conn, this))) { + } else if (OB_FAIL(pool_->acquire(tenant_id, conn, this, group_id))) { LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (OB_ISNULL(conn)) { ret = OB_ERR_UNEXPECTED; @@ -367,7 +367,7 @@ int ObCommonSqlProxy::read( } else if (NULL == sql) { ret = OB_INVALID_ARGUMENT; LOG_WARN("empty sql"); - } else if (OB_FAIL(acquire(tenant_id, conn))) { + } else if (OB_FAIL(acquire(tenant_id, conn, 0/*group_id*/))) { LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (NULL == conn) { ret = OB_INNER_STAT_ERROR; diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h index a5924e8f2d..380c48d59c 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h @@ -107,9 +107,10 @@ public: virtual int escape(const char *from, const int64_t from_size, char *to, const int64_t to_size, int64_t &out_size) override; // execute query and return data result - virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override; + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override { return this->read(res, tenant_id, sql, 0/*group_id*/); } int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const ObSessionParam *session_param); int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const common::ObAddr *sql_exec_addr); + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const int32_t group_id) override; //only for across cluster //cluster_id can not GCONF.cluster_id virtual int read(ReadResult &res, @@ -118,7 +119,8 @@ public: const char *sql) override; using ObISQLClient::read; // execute update sql - virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override; + virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override { return this->write(tenant_id, sql, 0/**/, affected_rows); } + virtual int write(const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) override; int write(const uint64_t tenant_id, const ObString sql, int64_t &affected_rows, int64_t compatibility_mode, const ObSessionParam *session_param = nullptr, const common::ObAddr *sql_exec_addr = nullptr); @@ -137,8 +139,8 @@ public: int execute(const uint64_t tenant_id, sqlclient::ObIExecutor &executor); protected: - int acquire(sqlclient::ObISQLConnection *&conn) { return this->acquire(OB_INVALID_TENANT_ID, conn); } - int acquire(const uint64_t tenant_id, sqlclient::ObISQLConnection *&conn); + int acquire(sqlclient::ObISQLConnection *&conn) { return this->acquire(OB_INVALID_TENANT_ID, conn, 0); } + int acquire(const uint64_t tenant_id, sqlclient::ObISQLConnection *&conn, const int32_t group_id); int read(sqlclient::ObISQLConnection *conn, ReadResult &result, const uint64_t tenant_id, const char *sql, const common::ObAddr *sql_exec_addr = nullptr); diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_transaction.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_transaction.cpp index 4555911852..41ece19084 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_transaction.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_transaction.cpp @@ -66,11 +66,12 @@ int ObMySQLTransaction::start_transaction( int ObMySQLTransaction::start( ObISQLClient *sql_client, const uint64_t tenant_id, - bool with_snapshot/* = false*/) + bool with_snapshot/* = false*/, + const int32_t group_id /* = 0*/) { int ret = OB_SUCCESS; start_time_ = ::oceanbase::common::ObTimeUtility::current_time(); - if (OB_FAIL(connect(tenant_id, sql_client))) { + if (OB_FAIL(connect(tenant_id, group_id, sql_client))) { LOG_WARN("failed to init", K(ret), K(tenant_id)); } else if (enable_query_stash_ && OB_FAIL(query_stash_desc_.create(1024, "BucketQueryS", "NodeQueryS"))) { LOG_WARN("failed to init map", K(ret), K(tenant_id)); diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_transaction.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_transaction.h index 9a802a8d6d..5a64435b5b 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_transaction.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_transaction.h @@ -63,7 +63,8 @@ public: // start transaction virtual int start(ObISQLClient *proxy, const uint64_t tenant_id, - bool with_snapshot = false); + bool with_snapshot = false, + const int32_t group_id = 0); virtual int start(ObISQLClient *proxy, const uint64_t &tenant_id, const int64_t &refreshed_schema_version, diff --git a/deps/oblib/src/lib/mysqlclient/ob_single_connection_proxy.cpp b/deps/oblib/src/lib/mysqlclient/ob_single_connection_proxy.cpp index 4c7c63d9d6..0542ee31d4 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_single_connection_proxy.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_single_connection_proxy.cpp @@ -33,12 +33,12 @@ ObSingleConnectionProxy::~ObSingleConnectionProxy() (void)close(); } -int ObSingleConnectionProxy::connect(const uint64_t tenant_id, ObISQLClient *sql_client) +int ObSingleConnectionProxy::connect(const uint64_t tenant_id, const int32_t group_id, ObISQLClient *sql_client) { int ret = OB_SUCCESS; - if (NULL == sql_client || NULL == sql_client->get_pool()) { + if (NULL == sql_client || NULL == sql_client->get_pool() || group_id < 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(sql_client)); + LOG_WARN("invalid argument", K(sql_client), K(group_id)); } else if (NULL != pool_ || NULL != conn_) { ret = OB_INNER_STAT_ERROR; LOG_WARN("transaction can only be started once", K(tenant_id), K(pool_), K(conn_)); @@ -46,7 +46,7 @@ int ObSingleConnectionProxy::connect(const uint64_t tenant_id, ObISQLClient *sql oracle_mode_ = sql_client->is_oracle_mode(); pool_ = sql_client->get_pool(); - if (OB_FAIL(pool_->acquire(tenant_id, conn_, sql_client))) { + if (OB_FAIL(pool_->acquire(tenant_id, conn_, sql_client, group_id))) { LOG_WARN("acquire connection failed", K(ret), K(tenant_id), K(pool_)); } else if (NULL == conn_) { ret = OB_INNER_STAT_ERROR; @@ -72,10 +72,11 @@ int ObSingleConnectionProxy::connect(const uint64_t tenant_id, ObISQLClient *sql } int ObSingleConnectionProxy::read(ReadResult &res, - const uint64_t tenant_id, const char *sql) + const uint64_t tenant_id, const char *sql, const int32_t group_id) { int ret = OB_SUCCESS; res.reset(); + UNUSED(group_id); if (!check_inner_stat()) { ret = OB_INNER_STAT_ERROR; LOG_WARN("check inner stat failed"); @@ -92,6 +93,7 @@ int ObSingleConnectionProxy::read(ReadResult &res, LOG_TRACE("execute sql", KCSTRING(sql), K(ret)); return ret; } + int ObSingleConnectionProxy::read(ReadResult &res, const int64_t cluster_id, const uint64_t tenant_id, const char *sql) @@ -116,9 +118,10 @@ int ObSingleConnectionProxy::read(ReadResult &res, } int ObSingleConnectionProxy::write( - const uint64_t tenant_id, const char *sql, int64_t &affected_rows) + const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) { int ret = OB_SUCCESS; + UNUSED(group_id); if (!check_inner_stat()) { ret = OB_INNER_STAT_ERROR; LOG_WARN("check inner stat failed"); diff --git a/deps/oblib/src/lib/mysqlclient/ob_single_connection_proxy.h b/deps/oblib/src/lib/mysqlclient/ob_single_connection_proxy.h index 5635fc22d4..a7f467601d 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_single_connection_proxy.h +++ b/deps/oblib/src/lib/mysqlclient/ob_single_connection_proxy.h @@ -37,13 +37,15 @@ public: virtual int escape(const char *from, const int64_t from_size, char *to, const int64_t to_size, int64_t &out_size) override; // %res should be destructed before execute other sql - virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override; + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override { return this->read(res, tenant_id, sql, 0/*group_id*/); } + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const int32_t group_id) override; virtual int read(ReadResult &res, const int64_t cluster_id, const uint64_t tenant_id, const char *sql) override; - virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override; + virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override { return this->write(tenant_id, sql, 0/*group_id*/, affected_rows); } + virtual int write(const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) override; using ObISQLClient::read; using ObISQLClient::write; - int connect(const uint64_t tenant_id, ObISQLClient *sql_client); + int connect(const uint64_t tenant_id, const int32_t group_id, ObISQLClient *sql_client); virtual sqlclient::ObISQLConnectionPool *get_pool() override { return pool_; } virtual sqlclient::ObISQLConnection *get_connection() override { return conn_; } diff --git a/mittest/simple_server/storage_ha/test_transfer_lock_info_operator.cpp b/mittest/simple_server/storage_ha/test_transfer_lock_info_operator.cpp index 977a69d94b..2c078bf59f 100644 --- a/mittest/simple_server/storage_ha/test_transfer_lock_info_operator.cpp +++ b/mittest/simple_server/storage_ha/test_transfer_lock_info_operator.cpp @@ -48,6 +48,7 @@ TEST_F(TestTransferLockInfoOperator, TransferLockInfo) src_ls_id_ = ObLSID(1001); dest_ls_id_ = ObLSID(1002); task_id_ = 1; + const int32_t group_id = 0; ObTransferLockStatus start_status = ObTransferLockStatus(ObTransferLockStatus::START); ObTransferLockStatus doing_status = ObTransferLockStatus(ObTransferLockStatus::DOING); int64_t start_src_lock_owner = 111; @@ -63,8 +64,8 @@ TEST_F(TestTransferLockInfoOperator, TransferLockInfo) common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); // insert - ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::insert(start_src_lock_info, sql_proxy)); - ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::insert(start_dest_lock_info, sql_proxy)); + ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::insert(start_src_lock_info, group_id, sql_proxy)); + ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::insert(start_dest_lock_info, group_id, sql_proxy)); // select ObTransferLockInfoRowKey src_row_key; @@ -77,8 +78,8 @@ TEST_F(TestTransferLockInfoOperator, TransferLockInfo) ObTransferTaskLockInfo new_start_src_lock_info; ObTransferTaskLockInfo new_start_dest_lock_info; - ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::get(src_row_key, task_id_, start_status, false, new_start_src_lock_info, sql_proxy)); - ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::get(dest_row_key, task_id_, start_status, false, new_start_dest_lock_info, sql_proxy)); + ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::get(src_row_key, task_id_, start_status, false, group_id, new_start_src_lock_info, sql_proxy)); + ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::get(dest_row_key, task_id_, start_status, false, group_id, new_start_dest_lock_info, sql_proxy)); LOG_INFO("[MITTEST]transfer_lock_info", K(new_start_src_lock_info)); ASSERT_EQ(new_start_src_lock_info.tenant_id_, start_src_lock_info.tenant_id_); @@ -95,11 +96,11 @@ TEST_F(TestTransferLockInfoOperator, TransferLockInfo) ASSERT_EQ(new_start_dest_lock_info.lock_owner_, start_dest_lock_info.lock_owner_); // remove - ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::remove(tenant_id_, src_ls_id_, task_id_, start_status, sql_proxy)); - ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferLockInfoOperator::get(src_row_key, task_id_, start_status, false, new_start_src_lock_info, sql_proxy)); + ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::remove(tenant_id_, src_ls_id_, task_id_, start_status, group_id, sql_proxy)); + ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferLockInfoOperator::get(src_row_key, task_id_, start_status, false, group_id, new_start_src_lock_info, sql_proxy)); - ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::remove(tenant_id_, dest_ls_id_, task_id_, start_status, sql_proxy)); - ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferLockInfoOperator::get(src_row_key, task_id_, start_status, false, new_start_dest_lock_info, sql_proxy)); + ASSERT_EQ(OB_SUCCESS, ObTransferLockInfoOperator::remove(tenant_id_, dest_ls_id_, task_id_, start_status, group_id, sql_proxy)); + ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferLockInfoOperator::get(src_row_key, task_id_, start_status, false, group_id, new_start_dest_lock_info, sql_proxy)); } } // namespace diff --git a/mittest/simple_server/test_ob_tablet_to_ls_operator.cpp b/mittest/simple_server/test_ob_tablet_to_ls_operator.cpp index 16fc588a41..cc4e5c305d 100644 --- a/mittest/simple_server/test_ob_tablet_to_ls_operator.cpp +++ b/mittest/simple_server/test_ob_tablet_to_ls_operator.cpp @@ -56,6 +56,7 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) int64_t new_transfer_seq = 1; int64_t ls_id = ObLSID::INVALID_LS_ID; int64_t transfer_seq = -1; + const int32_t group_id = 0; info.reset(); info.init(t1, ls1, table_id, old_transfer_seq); ls_infos.push_back(info); @@ -80,7 +81,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); // test OB_INVALID_ARGUMENT: invalid old_transfer_seq ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -90,7 +92,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) -1, ls1, new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); // test OB_INVALID_ARGUMENT: invalid new_transfer_seq ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -100,7 +103,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, -1, - ls2); + ls2, + group_id); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); // test OB_INVALID_ARGUMENT: old_transfer_seq == new_transfer_seq ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -110,7 +114,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, old_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); // test OB_INVALID_ARGUMENT: invalid tablet_id ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -120,7 +125,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); // test OB_INVALID_ARGUMENT: invalid old_ls_id ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -130,7 +136,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ObLSID(), new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); // test OB_INVALID_ARGUMENT: invalid new_ls_id ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -140,7 +147,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, new_transfer_seq, - ObLSID()); + ObLSID(), + group_id); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); // test OB_INVALID_ARGUMENT: old_ls_id == new_ls_id ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -150,7 +158,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, new_transfer_seq, - ls1); + ls1, + group_id); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); // test OB_ENTRY_NOT_EXIST: t1 is not in ls3 @@ -161,7 +170,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls3, new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_ENTRY_NOT_EXIST, ret); // test OB_ENTRY_NOT_EXIST: t1's transfer_seq should be 0, but here it's 3 old_transfer_seq = 3; @@ -172,7 +182,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_ENTRY_NOT_EXIST, ret); // test OB_SUCCESS: transfer t1 from ls1 to ls2 @@ -184,7 +195,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_SUCCESS, ret); // test OB_ENTRY_NOT_EXIST: t1 is not in ls1 ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -194,7 +206,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_ENTRY_NOT_EXIST, ret); // test OB_SUCCESS: transfer t1 from ls2 to ls3 old_transfer_seq = new_transfer_seq; @@ -206,7 +219,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls2, new_transfer_seq, - ls3); + ls3, + group_id); ASSERT_EQ(OB_SUCCESS, ret); // test OB_SUCCESS: transfer t2 from ls2 to ls1 old_transfer_seq = 0; @@ -218,7 +232,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls2, new_transfer_seq, - ls1); + ls1, + group_id); ASSERT_EQ(OB_SUCCESS, ret); // test OB_ENTRY_NOT_EXIST: t2 is not in ls2 ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -228,7 +243,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls2, new_transfer_seq, - ls1); + ls1, + group_id); ASSERT_EQ(OB_ENTRY_NOT_EXIST, ret); // test OB_SUCCESS: transfer t2 from ls1 to ls2 old_transfer_seq = new_transfer_seq; @@ -240,7 +256,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls1, new_transfer_seq, - ls2); + ls2, + group_id); ASSERT_EQ(OB_SUCCESS, ret); // test OB_SUCCESS: transfer t3 from ls3 to ls1 old_transfer_seq = 0; @@ -252,7 +269,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls3, new_transfer_seq, - ls1); + ls1, + group_id); ASSERT_EQ(OB_SUCCESS, ret); // test OB_ENTRY_NOT_EXIST: t3 is not in ls3 ret = ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( @@ -262,7 +280,8 @@ TEST_F(TestTabletToLSOperator, UpdateLSAndTransSeq) old_transfer_seq, ls3, new_transfer_seq, - ls1); + ls1, + group_id); ASSERT_EQ(OB_ENTRY_NOT_EXIST, ret); // test final result in table __all_tablet_to_ls diff --git a/mittest/simple_server/test_tenant_transfer_service.cpp b/mittest/simple_server/test_tenant_transfer_service.cpp index bcb25b8e36..0bdd557e4d 100644 --- a/mittest/simple_server/test_tenant_transfer_service.cpp +++ b/mittest/simple_server/test_tenant_transfer_service.cpp @@ -176,7 +176,7 @@ TEST_F(TestTenantTransferService, test_service) } } ObTransferTask task; - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/)); ASSERT_TRUE(task.get_status().is_init_status() || task.get_status().is_start_status()); ASSERT_TRUE(task.get_part_list().count() == g_part_list.count()); ARRAY_FOREACH(g_part_list, idx) { @@ -191,7 +191,7 @@ TEST_F(TestTenantTransferService, test_service) task.reset(); ObArenaAllocator allocator; ObString tablet_list_str; - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/)); LOG_INFO("generate tablet list", K(task)); ASSERT_TRUE(task.is_valid()); const ObTransferTabletList &tablet_list = task.get_tablet_list(); @@ -213,11 +213,11 @@ TEST_F(TestTenantTransferService, test_service) ASSERT_EQ(OB_SUCCESS, tenant_transfer->try_cancel_transfer_task(ObTransferTaskID(555))); // task which does not exist will be canceled successfully ASSERT_EQ(OB_OP_NOT_ALLOW, tenant_transfer->try_cancel_transfer_task(aborted_task_id)); ASSERT_EQ(OB_SUCCESS, tenant_transfer->try_cancel_transfer_task(init_task_id)); - ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, init_task_id, false, init_task)); + ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, init_task_id, false, init_task, 0/*group_id*/)); // try clear transfer task task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/)); ASSERT_EQ(OB_SUCCESS, ret); sql.reset(); ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("update oceanbase.__all_transfer_task set status = 'COMPLETED' where task_id = %ld", task.get_task_id().id())); @@ -241,7 +241,7 @@ TEST_F(TestTenantTransferService, test_service) ASSERT_EQ(OB_SUCCESS, finished_part_list.to_display_str(allocator, finished_part_list_str)); LOG_WARN("finished_part_list", K(finished_part_list_str)); ASSERT_TRUE(0 == finished_part_list_str.case_compare("500002:500003,500002:500004,500016:500014,500016:500015")); - ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task)); + ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/)); create_time = OB_INVALID_TIMESTAMP; finish_time = OB_INVALID_TIMESTAMP; ObTransferTask history_task; @@ -274,7 +274,7 @@ TEST_F(TestTenantTransferService, test_batch_part_list) } } ObTransferTask task; - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, batch_task_id, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, batch_task_id, false, task, 0/*group_id*/)); ASSERT_TRUE(ObTenantTransferService::PART_COUNT_IN_A_TRANSFER == task.get_part_list().count()); ARRAY_FOREACH(task.get_part_list(), idx) { ASSERT_TRUE(is_contain(g_batch_part_list, task.get_part_list().at(idx))); diff --git a/mittest/simple_server/test_transfer_task_operator.cpp b/mittest/simple_server/test_transfer_task_operator.cpp index 45164bb034..21cd6bee0c 100644 --- a/mittest/simple_server/test_transfer_task_operator.cpp +++ b/mittest/simple_server/test_transfer_task_operator.cpp @@ -237,16 +237,16 @@ TEST_F(TestTransferTaskOperator, test_operator) // get ObTransferTask task; - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task, 0/*group_id*/)); ASSERT_TRUE(task.get_task_id() == task_id_); ASSERT_TRUE(task.get_tablet_list().empty()); ASSERT_TRUE(0 == strcmp(transfer_task_comment_to_str(task.get_comment()), "Task canceled")); LOG_INFO("get from table", K(task)); task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, true, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, true, task, 0/*group_id*/)); LOG_INFO("get from table", K(task)); ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(sql_proxy, tenant_id_, - ObTransferTaskID(555), false, task)); + ObTransferTaskID(555), false, task, 0/*group_id*/)); // get_task_with_time int64_t create_time = OB_INVALID_TIMESTAMP; @@ -266,66 +266,66 @@ TEST_F(TestTransferTaskOperator, test_operator) // get by dest_ls task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_by_dest_ls(sql_proxy, tenant_id_, dest_ls_, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_by_dest_ls(sql_proxy, tenant_id_, dest_ls_, task, 0/*group_id*/)); ASSERT_TRUE(task_id_ == task.get_task_id()); - ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get_by_dest_ls(sql_proxy, tenant_id_, ObLSID(555), task)); + ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get_by_dest_ls(sql_proxy, tenant_id_, ObLSID(555), task, 0/*group_id*/)); ObTransferTask dup_dest_ls_task; ASSERT_EQ(OB_SUCCESS, dup_dest_ls_task.init(ObTransferTaskID(2223), ObLSID(1003), ObLSID(1004), part_list_, ObTransferStatus(ObTransferStatus::INIT), trace_id_, ObBalanceTaskID(2))); ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::insert(sql_proxy, tenant_id_, dup_dest_ls_task)); task.reset(); - ASSERT_EQ(OB_ERR_UNEXPECTED, ObTransferTaskOperator::get_by_dest_ls(sql_proxy, tenant_id_, ObLSID(1004), task)); + ASSERT_EQ(OB_ERR_UNEXPECTED, ObTransferTaskOperator::get_by_dest_ls(sql_proxy, tenant_id_, ObLSID(1004), task, 0/*group_id*/)); // update_to_start_status ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::update_to_start_status(sql_proxy, tenant_id_,task_id_, ObTransferStatus(ObTransferStatus::INIT), part_list_, not_exist_part_list_, lock_conflict_part_list_, table_lock_tablet_list_, tablet_list_, ObTransferStatus(ObTransferStatus::START), lock_owner_id_)); task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task, 0/*group_id*/)); ASSERT_TRUE(!task.get_tablet_list().empty()); LOG_INFO("update to start status", K(task)); ASSERT_EQ(OB_STATE_NOT_MATCH, ObTransferTaskOperator::update_to_start_status(sql_proxy, tenant_id_,task_id_, ObTransferStatus(ObTransferStatus::ABORTED), part_list_, not_exist_part_list_, lock_conflict_part_list_, table_lock_tablet_list_, tablet_list_, ObTransferStatus(ObTransferStatus::START), lock_owner_id_)); // update start_scn - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::update_start_scn(sql_proxy, tenant_id_, task_id_, ObTransferStatus(ObTransferStatus::START), start_scn_)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::update_start_scn(sql_proxy, tenant_id_, task_id_, ObTransferStatus(ObTransferStatus::START), start_scn_, 0/*group_id*/)); task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task, 0/*group_id*/)); ASSERT_TRUE(task.get_start_scn() == start_scn_); //ASSERT_EQ(OB_STATE_NOT_MATCH, ObTransferTaskOperator::update_start_scn(sql_proxy, tenant_id_, task_id_, ObTransferStatus(ObTransferStatus::ABORTED), start_scn_)); // update status ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::update_status_and_result(sql_proxy, tenant_id_, task_id_, - ObTransferStatus(ObTransferStatus::START), ObTransferStatus(ObTransferStatus::DOING), OB_SUCCESS)); + ObTransferStatus(ObTransferStatus::START), ObTransferStatus(ObTransferStatus::DOING), OB_SUCCESS, 0/*group_id*/)); task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task, 0/*group_id*/)); ASSERT_TRUE(task.get_status().status() == ObTransferStatus::DOING); //ASSERT_EQ(OB_STATE_NOT_MATCH, ObTransferTaskOperator::update_status(sql_proxy, tenant_id_, task_id_, // ObTransferStatus(ObTransferStatus::ABORTED), ObTransferStatus(ObTransferStatus::ABORTED))); // finish task ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::finish_task(sql_proxy, tenant_id_, other_task_id, - ObTransferStatus(ObTransferStatus::INIT), ObTransferStatus(ObTransferStatus::COMPLETED), OB_SUCCESS, ObTransferTaskComment::EMPTY_COMMENT)); + ObTransferStatus(ObTransferStatus::INIT), ObTransferStatus(ObTransferStatus::COMPLETED), OB_SUCCESS, ObTransferTaskComment::EMPTY_COMMENT, 0/*group_id*/)); task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, other_task_id, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, other_task_id, false, task, 0/*group_id*/)); ASSERT_TRUE(task.get_status().status() == ObTransferStatus::COMPLETED); ASSERT_EQ(OB_STATE_NOT_MATCH, ObTransferTaskOperator::finish_task(sql_proxy, tenant_id_, other_task_id, - ObTransferStatus(ObTransferStatus::ABORTED), ObTransferStatus(ObTransferStatus::ABORTED), OB_SUCCESS, ObTransferTaskComment::EMPTY_COMMENT)); + ObTransferStatus(ObTransferStatus::ABORTED), ObTransferStatus(ObTransferStatus::ABORTED), OB_SUCCESS, ObTransferTaskComment::EMPTY_COMMENT, 0/*group_id*/)); // finish task from init ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::finish_task_from_init(sql_proxy, tenant_id_, dup_dest_ls_task.get_task_id(), ObTransferStatus(ObTransferStatus::INIT), part_list_, not_exist_part_list_, lock_conflict_part_list_, ObTransferStatus(ObTransferStatus::COMPLETED), OB_SUCCESS, ObTransferTaskComment::TASK_COMPLETED_AS_NO_VALID_PARTITION)); task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, other_task_id, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, other_task_id, false, task, 0/*group_id*/)); ASSERT_TRUE(task.get_status().status() == ObTransferStatus::COMPLETED); ASSERT_EQ(OB_STATE_NOT_MATCH, ObTransferTaskOperator::finish_task_from_init(sql_proxy, tenant_id_, dup_dest_ls_task.get_task_id(), ObTransferStatus(ObTransferStatus::ABORTED), part_list_, not_exist_part_list_, lock_conflict_part_list_, ObTransferStatus(ObTransferStatus::COMPLETED), OB_SUCCESS, ObTransferTaskComment::TASK_COMPLETED_AS_NO_VALID_PARTITION)); // update finish_scn - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::update_finish_scn(sql_proxy, tenant_id_, task_id_, ObTransferStatus(ObTransferStatus::DOING), finish_scn_)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::update_finish_scn(sql_proxy, tenant_id_, task_id_, ObTransferStatus(ObTransferStatus::DOING), finish_scn_, 0/*group_id*/)); task.reset(); - ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(sql_proxy, tenant_id_, task_id_, false, task, 0/*group_id*/)); ASSERT_TRUE(task.get_finish_scn() == finish_scn_); - ASSERT_EQ(OB_STATE_NOT_MATCH, ObTransferTaskOperator::update_finish_scn(sql_proxy, tenant_id_, task_id_, ObTransferStatus(ObTransferStatus::ABORTED), finish_scn_)); + ASSERT_EQ(OB_STATE_NOT_MATCH, ObTransferTaskOperator::update_finish_scn(sql_proxy, tenant_id_, task_id_, ObTransferStatus(ObTransferStatus::ABORTED), finish_scn_, 0/*group_id*/)); // get_all_task_status task_status.reset(); diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index e4128f4eae..9bd0c48601 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -156,7 +156,8 @@ ObInnerSQLConnection::ObInnerSQLConnection() last_query_timestamp_(0), force_remote_execute_(false), force_no_reuse_(false), - use_external_session_(false) + use_external_session_(false), + group_id_(0) { } @@ -187,7 +188,8 @@ int ObInnerSQLConnection::init(ObInnerSQLConnectionPool *pool, ObISQLClient *client_addr, /* = NULL */ ObRestoreSQLModifier *sql_modifier /* = NULL */, const bool use_static_engine /* = false */, - const bool is_oracle_mode /* = false */) + const bool is_oracle_mode /* = false */, + const int32_t group_id /* = 0*/) { int ret = OB_SUCCESS; if (inited_) { @@ -222,6 +224,7 @@ int ObInnerSQLConnection::init(ObInnerSQLConnectionPool *pool, if (OB_FAIL(init_session(extern_session, use_static_engine))) { LOG_WARN("init session failed", K(ret)); } else { + group_id_ = group_id; inited_ = true; } } @@ -679,6 +682,7 @@ int ObInnerSQLConnection::do_query(sqlclient::ObIExecutor &executor, ObInnerSQLR LOG_WARN("executor execute failed", K(ret)); } else { ObSQLSessionInfo &session = res.result_set().get_session(); + session.set_expect_group_id(group_id_); if (OB_ISNULL(res.sql_ctx().schema_guard_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("schema guard is null"); @@ -1044,7 +1048,7 @@ int ObInnerSQLConnection::start_transaction_inner( LOG_WARN("fail to set tz info wrap", K(ret)); } else if (FALSE_IT(handler->get_result()->set_conn_id(OB_INVALID_ID))) { } else if (OB_FAIL(GCTX.inner_sql_rpc_proxy_->to(resource_server_addr).by(tenant_id). - timeout(query_timeout). + timeout(query_timeout).group_id(group_id_). inner_sql_sync_transmit( arg, *(handler->get_result()), handler->get_handle()))) { LOG_WARN("inner_sql_sync_transmit process failed", K(ret), K(tenant_id)); @@ -1224,6 +1228,7 @@ int ObInnerSQLConnection::forward_request_(const uint64_t tenant_id, } else if (OB_FAIL(GCTX.inner_sql_rpc_proxy_->to(get_resource_svr()) .by(tenant_id) .timeout(query_timeout) + .group_id(group_id_) .inner_sql_sync_transmit(arg, *(handler->get_result()), handler->get_handle()))) { @@ -1294,6 +1299,7 @@ int ObInnerSQLConnection::rollback() LOG_WARN("fail to set tz info wrap", K(ret)); } else if (OB_FAIL(GCTX.inner_sql_rpc_proxy_->to(get_resource_svr()).by(OB_SYS_TENANT_ID). timeout(query_timeout). + group_id(group_id_). inner_sql_sync_transmit( arg, *(handler->get_result()), handler->get_handle()))) { LOG_WARN("inner_sql_sync_transmit process failed", @@ -1364,7 +1370,7 @@ int ObInnerSQLConnection::commit() } else if (OB_FAIL(arg.set_tz_info_wrap(get_session().get_tz_info_wrap()))) { LOG_WARN("fail to set tz info wrap", K(ret)); } else if (OB_FAIL(GCTX.inner_sql_rpc_proxy_->to(get_resource_svr()).by(OB_SYS_TENANT_ID). - timeout(query_timeout). + timeout(query_timeout).group_id(group_id_). inner_sql_sync_transmit( arg, *(handler->get_result()), handler->get_handle()))) { LOG_WARN("inner_sql_sync_transmit process failed", @@ -1503,13 +1509,14 @@ int ObInnerSQLConnection::execute_write_inner(const uint64_t tenant_id, const Ob get_session().get_local_nls_timestamp_format(), get_session().get_local_nls_timestamp_tz_format()); ObInnerSqlRpcStreamHandle *handler = res.remote_result_set().get_stream_handler(); + if (OB_ISNULL(handler)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("handler is null ptr", K(ret)); } else if (OB_FAIL(arg.set_tz_info_wrap(get_session().get_tz_info_wrap()))) { LOG_WARN("fail to set tz info wrap", K(ret)); } else if (OB_FAIL(GCTX.inner_sql_rpc_proxy_->to(get_resource_svr()).by(tenant_id). - timeout(query_timeout).inner_sql_sync_transmit( + timeout(query_timeout).group_id(group_id_).inner_sql_sync_transmit( arg, *(handler->get_result()), handler->get_handle()))) { // complement data for offline ddl may exceed rpc default timeout, thus need to set it to a bigger value for this proxy. LOG_WARN("inner_sql_sync_transmit process failed", K(ret), K(tenant_id)); @@ -1618,6 +1625,7 @@ int ObInnerSQLConnection::execute_read_inner(const int64_t cluster_id, static_assert(ctx_size <= ObISQLClient::ReadResult::BUF_SIZE, "buffer not enough"); ObSqlQueryExecutor executor(sql); const bool local_execute = is_local_execute(cluster_id, tenant_id); + if (!inited_) { ret = OB_NOT_INIT; LOG_WARN("connection not inited", K(ret)); @@ -1704,7 +1712,7 @@ int ObInnerSQLConnection::execute_read_inner(const int64_t cluster_id, } else if (OB_FAIL(arg.set_tz_info_wrap(get_session().get_tz_info_wrap()))) { LOG_WARN("fail to set tz info wrap", K(ret)); } else if (OB_FAIL(GCTX.inner_sql_rpc_proxy_->to(get_resource_svr()). - dst_cluster_id(cluster_id).by(tenant_id).timeout(query_timeout). + dst_cluster_id(cluster_id).by(tenant_id).timeout(query_timeout).group_id(group_id_). inner_sql_sync_transmit( arg, *(handler->get_result()), handler->get_handle()))) { LOG_WARN("inner_sql_sync_transmit process failed", K(ret), K(tenant_id), K(cluster_id)); diff --git a/src/observer/ob_inner_sql_connection.h b/src/observer/ob_inner_sql_connection.h index 8d2a9ab532..6af20ef4ac 100644 --- a/src/observer/ob_inner_sql_connection.h +++ b/src/observer/ob_inner_sql_connection.h @@ -141,7 +141,8 @@ public: ObISQLClient *client_addr = NULL, ObRestoreSQLModifier *sql_modifer = NULL, const bool use_static_engine = false, - const bool is_oracle_mode = false); + const bool is_oracle_mode = false, + const int32_t group_id = 0); int destroy(void); inline void reset() { destroy(); } virtual int execute_read(const uint64_t tenant_id, const char *sql, @@ -414,6 +415,7 @@ private: // ask the inner sql connection to use external session instead of internal one // this enables show session / kill session using sql query command bool use_external_session_; + int32_t group_id_; DISABLE_COPY_ASSIGN(ObInnerSQLConnection); }; diff --git a/src/observer/ob_inner_sql_connection_pool.cpp b/src/observer/ob_inner_sql_connection_pool.cpp index 3662bbf997..1dce1ffe4f 100644 --- a/src/observer/ob_inner_sql_connection_pool.cpp +++ b/src/observer/ob_inner_sql_connection_pool.cpp @@ -91,7 +91,7 @@ int ObInnerSQLConnectionPool::init(ObMultiVersionSchemaService *schema_service, return ret; } -int ObInnerSQLConnectionPool::acquire(const uint64_t tenant_id, common::sqlclient::ObISQLConnection *&conn, ObISQLClient *client_addr) +int ObInnerSQLConnectionPool::acquire(const uint64_t tenant_id, common::sqlclient::ObISQLConnection *&conn, ObISQLClient *client_addr, const int32_t group_id) { int ret = OB_SUCCESS; UNUSED(tenant_id); @@ -102,7 +102,8 @@ int ObInnerSQLConnectionPool::acquire(const uint64_t tenant_id, common::sqlclien } else if (OB_FAIL(alloc_conn(inner_sql_conn))) { LOG_WARN("alloc connection from pool failed", K(ret)); } else if (OB_FAIL(inner_sql_conn->init(this, schema_service_, ob_sql_, vt_iter_creator_, - config_, nullptr /* session_info */, client_addr, nullptr/*sql modifer*/, is_ddl_))) { + config_, nullptr /* session_info */, client_addr, nullptr/*sql modifer*/, is_ddl_, + false /*is_oracle_mode*/, group_id))) { LOG_WARN("init connection failed", K(ret)); } else if (OB_FAIL(add_to_used_conn_list(inner_sql_conn))) { LOG_WARN("add_to_used_conn_list failed", K(ret)); diff --git a/src/observer/ob_inner_sql_connection_pool.h b/src/observer/ob_inner_sql_connection_pool.h index 4a4de39f47..a09a3d4540 100644 --- a/src/observer/ob_inner_sql_connection_pool.h +++ b/src/observer/ob_inner_sql_connection_pool.h @@ -74,7 +74,7 @@ public: char *to, const int64_t to_size, int64_t &out_size); // acquired connection must be released - virtual int acquire(const uint64_t tenant_id, common::sqlclient::ObISQLConnection *&conn, ObISQLClient *client_addr) override; + virtual int acquire(const uint64_t tenant_id, common::sqlclient::ObISQLConnection *&conn, ObISQLClient *client_addr, const int32_t group_id) override; virtual int release(common::sqlclient::ObISQLConnection *conn, const bool success); int acquire_spi_conn(sql::ObSQLSessionInfo *session_info, observer::ObInnerSQLConnection *&conn); int acquire(sql::ObSQLSessionInfo *session_info, diff --git a/src/observer/ob_sql_client_decorator.cpp b/src/observer/ob_sql_client_decorator.cpp index ec78a9e256..57c3a6d14a 100644 --- a/src/observer/ob_sql_client_decorator.cpp +++ b/src/observer/ob_sql_client_decorator.cpp @@ -39,30 +39,30 @@ int ObSQLClientRetry::read(ReadResult &res, const int64_t cluster_id, const uint return OB_NOT_SUPPORTED; } -int ObSQLClientRetry::read(ReadResult &res, const uint64_t tenant_id, const char *sql) +int ObSQLClientRetry::read(ReadResult &res, const uint64_t tenant_id, const char *sql, const int32_t group_id) { int ret = OB_SUCCESS; if (OB_ISNULL(sql_client_)) { ret = OB_INNER_STAT_ERROR; } else { - ret = sql_client_->read(res, tenant_id, sql); + ret = sql_client_->read(res, tenant_id, sql, group_id); if (OB_FAIL(ret)) { for (int32_t retry = 0; retry < retry_limit_ && OB_SUCCESS != ret; retry++) { LOG_WARN("retry execute query when failed", K(ret), K(retry), K_(retry_limit), K(sql)); - ret = sql_client_->read(res, tenant_id, sql); + ret = sql_client_->read(res, tenant_id, sql, group_id); } } } return ret; } -int ObSQLClientRetry::write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) +int ObSQLClientRetry::write(const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) { int ret = OB_SUCCESS; if (OB_ISNULL(sql_client_)) { ret = OB_INNER_STAT_ERROR; } else { - ret = sql_client_->write(tenant_id, sql, affected_rows); + ret = sql_client_->write(tenant_id, sql, group_id, affected_rows); } return ret; } @@ -139,7 +139,7 @@ int ObSQLClientRetryWeak::read(ReadResult &res, const int64_t cluster_id, const return OB_NOT_SUPPORTED; } -int ObSQLClientRetryWeak::read(ReadResult &res, const uint64_t tenant_id, const char *sql) +int ObSQLClientRetryWeak::read(ReadResult &res, const uint64_t tenant_id, const char *sql, const int32_t group_id) { int ret = OB_SUCCESS; if (OB_ISNULL(sql_client_)) { @@ -147,13 +147,13 @@ int ObSQLClientRetryWeak::read(ReadResult &res, const uint64_t tenant_id, const } else { // normal read if (check_sys_variable_) { - ret = sql_client_->read(res, tenant_id, sql); + ret = sql_client_->read(res, tenant_id, sql, group_id); } else { sqlclient::ObISQLConnection *conn = sql_client_->get_connection(); ObSingleConnectionProxy single_conn_proxy; if (OB_NOT_NULL(conn)) { // for transaction - } else if (OB_FAIL(single_conn_proxy.connect(tenant_id, sql_client_))) { + } else if (OB_FAIL(single_conn_proxy.connect(tenant_id, group_id, sql_client_))) { LOG_WARN("failed to get mysql connect", KR(ret), K(tenant_id)); } else { conn = single_conn_proxy.get_connection(); @@ -166,13 +166,13 @@ int ObSQLClientRetryWeak::read(ReadResult &res, const uint64_t tenant_id, const return ret; } -int ObSQLClientRetryWeak::write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) +int ObSQLClientRetryWeak::write(const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) { int ret = OB_SUCCESS; if (OB_ISNULL(sql_client_)) { ret = OB_INNER_STAT_ERROR; } else { - ret = sql_client_->write(tenant_id, sql, affected_rows); + ret = sql_client_->write(tenant_id, sql, group_id, affected_rows); } return ret; } diff --git a/src/observer/ob_sql_client_decorator.h b/src/observer/ob_sql_client_decorator.h index 0c1a2cab2b..dbaab24d87 100644 --- a/src/observer/ob_sql_client_decorator.h +++ b/src/observer/ob_sql_client_decorator.h @@ -30,9 +30,12 @@ public: virtual int escape(const char *from, const int64_t from_size, char *to, const int64_t to_size, int64_t &out_size) override; - virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override; + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override { return this->read(res, tenant_id, sql, 0 /*group_id*/); } + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const int32_t group_id) override; virtual int read(ReadResult &res, const int64_t cluster_id, const uint64_t tenant_id, const char *sql) override; - virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override; + virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override { return this->write(tenant_id, sql, 0/*group_id*/, affected_rows); } + virtual int write(const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) override; + virtual sqlclient::ObISQLConnectionPool *get_pool() override; virtual sqlclient::ObISQLConnection *get_connection() override; using ObISQLClient::read; @@ -86,9 +89,11 @@ public: virtual int escape(const char *from, const int64_t from_size, char *to, const int64_t to_size, int64_t &out_size) override; - virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override; + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override { return this->read(res, tenant_id, sql, 0 /*group_id*/); } + virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const int32_t group_id) override; virtual int read(ReadResult &res, const int64_t cluster_id, const uint64_t tenant_id, const char *sql) override; - virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override; + virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override { return this->write(tenant_id, sql, 0/*group_id*/, affected_rows); } + virtual int write(const uint64_t tenant_id, const char *sql, const int32_t group_id, int64_t &affected_rows) override; using ObISQLClient::read; using ObISQLClient::write; diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index 7be38fa1ce..8baeced910 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -32974,10 +32974,11 @@ int ObDDLSQLTransaction::start(ObISQLClient *proxy, int ObDDLSQLTransaction::start( ObISQLClient *proxy, const uint64_t tenant_id, - bool with_snapshot /*= false*/) + bool with_snapshot /*= false*/, + const int32_t group_id /* = 0*/) { int ret = OB_NOT_SUPPORTED; - UNUSEDx(proxy, with_snapshot, tenant_id); + UNUSEDx(proxy, with_snapshot, tenant_id, group_id); return ret; } diff --git a/src/rootserver/ob_ddl_service.h b/src/rootserver/ob_ddl_service.h index b1603bab89..edfb832799 100644 --- a/src/rootserver/ob_ddl_service.h +++ b/src/rootserver/ob_ddl_service.h @@ -2614,7 +2614,8 @@ public: bool with_snapshot = false) override; virtual int start(ObISQLClient *proxy, const uint64_t tenant_id, - bool with_snapshot = false) override; + bool with_snapshot = false, + const int32_t group_id = 0) override; static int lock_all_ddl_operation( common::ObMySQLTransaction &trans, const uint64_t tenant_id); diff --git a/src/rootserver/ob_tenant_transfer_service.cpp b/src/rootserver/ob_tenant_transfer_service.cpp index 53f77d1e68..f073386f9f 100644 --- a/src/rootserver/ob_tenant_transfer_service.cpp +++ b/src/rootserver/ob_tenant_transfer_service.cpp @@ -205,7 +205,8 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id) tenant_id_, task_id, true/*for_update*/, - task))) { + task, + 0/*group_id*/))) { LOG_WARN("fail to get task", KR(ret), K_(tenant_id), K(task_id), K(task)); } else if (OB_UNLIKELY(!task.is_valid())) { ret = OB_INVALID_ARGUMENT; @@ -1104,7 +1105,8 @@ int ObTenantTransferService::try_cancel_transfer_task(const ObTransferTaskID tas tenant_id_, task_id, true/*for_update*/, - task))) { + task, + 0/*group_id*/))) { if (OB_ENTRY_NOT_EXIST != ret) { LOG_WARN("fail to get task", KR(ret), K_(tenant_id), K(task_id), K(task)); } else { diff --git a/src/share/ob_max_id_fetcher.cpp b/src/share/ob_max_id_fetcher.cpp index cfc154c7e8..3247138e06 100755 --- a/src/share/ob_max_id_fetcher.cpp +++ b/src/share/ob_max_id_fetcher.cpp @@ -87,7 +87,14 @@ const char *ObMaxIdFetcher::max_id_name_info_[OB_MAX_ID_TYPE][2] = { lib::ObMutex ObMaxIdFetcher::mutex_bucket_[MAX_TENANT_MUTEX_BUCKET_CNT]; ObMaxIdFetcher::ObMaxIdFetcher(ObMySQLProxy &proxy) - : proxy_(proxy) + : proxy_(proxy), + group_id_(0) +{ +} + +ObMaxIdFetcher::ObMaxIdFetcher(ObMySQLProxy &proxy, const int32_t group_id) + : proxy_(proxy), + group_id_(group_id) { } @@ -432,7 +439,7 @@ int ObMaxIdFetcher::update_max_id(ObISQLClient &sql_client, const uint64_t tenan zone.ptr(), id_name, ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id)))) { LOG_WARN("sql_string append format string failed", K(ret)); - } else if (OB_FAIL(sql_client.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("sql client write fail", K(sql), K(affected_rows), K(ret)); } else if (!is_single_row(affected_rows)) { ret = OB_INNER_STAT_ERROR; @@ -535,7 +542,7 @@ int ObMaxIdFetcher::insert_initial_value(common::ObISQLClient &sql_client, uint6 zone.ptr(), name, obj.get_type(), static_cast(value), info))) { LOG_WARN("sql string assign failed", K(ret)); - } else if (OB_FAIL(sql_client.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("execute sql failed", K(ret)); } return ret; diff --git a/src/share/ob_max_id_fetcher.h b/src/share/ob_max_id_fetcher.h index 33f61e4971..f55fef8310 100755 --- a/src/share/ob_max_id_fetcher.h +++ b/src/share/ob_max_id_fetcher.h @@ -82,6 +82,7 @@ class ObMaxIdFetcher { public: explicit ObMaxIdFetcher(common::ObMySQLProxy &proxy); + explicit ObMaxIdFetcher(common::ObMySQLProxy &proxy, const int32_t group_id); virtual ~ObMaxIdFetcher(); // For generate new object_ids @@ -114,6 +115,7 @@ private: private: common::ObMySQLProxy &proxy_; static lib::ObMutex mutex_bucket_[MAX_TENANT_MUTEX_BUCKET_CNT]; + int32_t group_id_; DISALLOW_COPY_AND_ASSIGN(ObMaxIdFetcher); }; diff --git a/src/share/ob_primary_standby_service.cpp b/src/share/ob_primary_standby_service.cpp index 75e0224029..7b0b006dac 100644 --- a/src/share/ob_primary_standby_service.cpp +++ b/src/share/ob_primary_standby_service.cpp @@ -447,6 +447,7 @@ int ObPrimaryStandbyService::switch_to_standby( { int ret = OB_SUCCESS; ObAllTenantInfo tenant_info; + const int32_t group_id = 0; if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret), K_(inited)); @@ -505,7 +506,7 @@ int ObPrimaryStandbyService::switch_to_standby( } else if (compat_version < DATA_VERSION_4_2_0_0) { //Regardless of the data_version change and switchover concurrency scenario, //if there is concurrency, the member_list lock that has not been released by the operation and maintenance process - } else if (OB_FAIL(ObMemberListLockUtils::unlock_member_list_when_switch_to_standby(tenant_id, *sql_proxy_))) { + } else if (OB_FAIL(ObMemberListLockUtils::unlock_member_list_when_switch_to_standby(tenant_id, group_id, *sql_proxy_))) { LOG_WARN("failed to unlock member list when switch to standby", K(ret), K(tenant_id)); } if (FAILEDx(role_transition_service.switchover_update_tenant_status(tenant_id, diff --git a/src/share/resource_manager/ob_group_list.h b/src/share/resource_manager/ob_group_list.h index 1059233792..81519e4961 100644 --- a/src/share/resource_manager/ob_group_list.h +++ b/src/share/resource_manager/ob_group_list.h @@ -16,4 +16,6 @@ CGID_DEF(OBCG_MYSQL_LOGIN, 12) CGID_DEF(OBCG_CDCSERVICE, 13) CGID_DEF(OBCG_DIAG_TENANT, 14) CGID_DEF(OBCG_WR, 15) +CGID_DEF(OBCG_STORAGE_HA_LEVEL1, 16) +CGID_DEF(OBCG_STORAGE_HA_LEVEL2, 17) CGID_DEF(OBCG_LQ, 100) diff --git a/src/share/tablet/ob_tablet_to_ls_operator.cpp b/src/share/tablet/ob_tablet_to_ls_operator.cpp index 485f130a9c..50522517a1 100644 --- a/src/share/tablet/ob_tablet_to_ls_operator.cpp +++ b/src/share/tablet/ob_tablet_to_ls_operator.cpp @@ -420,7 +420,8 @@ int ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( const int64_t old_transfer_seq, const ObLSID &old_ls_id, const int64_t new_transfer_seq, - const ObLSID &new_ls_id) + const ObLSID &new_ls_id, + const int32_t group_id) { int ret = OB_SUCCESS; ObSqlString sql; @@ -431,10 +432,11 @@ int ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( || !tablet_id.is_valid() || !old_ls_id.is_valid() || !new_ls_id.is_valid() - || old_ls_id == new_ls_id)) { + || old_ls_id == new_ls_id + || group_id < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), - K(tablet_id), K(old_transfer_seq), K(new_transfer_seq), K(old_ls_id), K(new_ls_id)); + K(tablet_id), K(old_transfer_seq), K(new_transfer_seq), K(old_ls_id), K(new_ls_id), K(group_id)); } else if (OB_FAIL(sql.append_fmt( "UPDATE %s SET transfer_seq = %ld, ls_id = %ld " "WHERE tablet_id = %lu AND transfer_seq = %ld AND ls_id = %ld", @@ -448,7 +450,7 @@ int ObTabletToLSTableOperator::update_ls_id_and_transfer_seq( K(tablet_id), K(old_ls_id), K(new_ls_id), K(old_transfer_seq), K(new_transfer_seq)); } else { int64_t affected_rows = 0; - if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), affected_rows))) { + if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), group_id, affected_rows))) { LOG_WARN("fail to write sql", KR(ret), K(sql), K(tenant_id)); } else if (0 == affected_rows) { ret = OB_ENTRY_NOT_EXIST; diff --git a/src/share/tablet/ob_tablet_to_ls_operator.h b/src/share/tablet/ob_tablet_to_ls_operator.h index 1815549607..d33d4cf7c1 100644 --- a/src/share/tablet/ob_tablet_to_ls_operator.h +++ b/src/share/tablet/ob_tablet_to_ls_operator.h @@ -116,6 +116,7 @@ public: // @param[in] old_ls_id old LS ID // @param[in] new_transfer_seq new Transfer Sequence // @param[in] new_ls_id new LS ID + // @param[in] group_id rpc queue id // // @ret OB_SUCCESS the updation is successful // @ret OB_ENTRY_NOT_EXIST affected rows = 0, @@ -130,7 +131,8 @@ public: const int64_t old_transfer_seq, const ObLSID &old_ls_id, const int64_t new_transfer_seq, - const ObLSID &new_ls_id); + const ObLSID &new_ls_id, + const int32_t group_id); // Get rows from __all_tablet_to_ls according to ObTableIDs // // @param [in] sql_proxy, ObMySQLProxy or ObMySQLTransaction diff --git a/src/share/transfer/ob_transfer_task_operator.cpp b/src/share/transfer/ob_transfer_task_operator.cpp index d30436ed50..4eabd062f8 100644 --- a/src/share/transfer/ob_transfer_task_operator.cpp +++ b/src/share/transfer/ob_transfer_task_operator.cpp @@ -30,19 +30,20 @@ int ObTransferTaskOperator::get( const uint64_t tenant_id, const ObTransferTaskID task_id, const bool for_update, - ObTransferTask &task) + ObTransferTask &task, + const int32_t group_id) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !task_id.is_valid())) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !task_id.is_valid() || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(task_id), K(for_update)); + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(task_id), K(for_update), K(group_id)); } else { ObSqlString sql; SMART_VAR(ObISQLClient::ReadResult, result) { if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE task_id = %ld%s", OB_ALL_TRANSFER_TASK_TNAME, task_id.id(), for_update ? " FOR UPDATE" : ""))) { LOG_WARN("fail to assign sql", KR(ret), K(task_id), K(for_update)); - } else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) { + } else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr(), group_id))) { LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); } else if (OB_ISNULL(result.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -171,14 +172,15 @@ int ObTransferTaskOperator::get_by_src_ls( common::ObISQLClient &sql_proxy, const uint64_t tenant_id, const ObLSID &src_ls, - ObTransferTask &task) + ObTransferTask &task, + const int32_t group_id) { int ret = OB_SUCCESS; const bool is_src_ls = true; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !src_ls.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(src_ls)); - } else if (OB_FAIL(get_by_ls_id_(sql_proxy, tenant_id, src_ls, is_src_ls, task))) { + } else if (OB_FAIL(get_by_ls_id_(sql_proxy, tenant_id, src_ls, is_src_ls, group_id, task))) { LOG_WARN("failed to get by ls id", K(ret), K(tenant_id), K(src_ls)); } return ret; @@ -188,14 +190,15 @@ int ObTransferTaskOperator::get_by_dest_ls( common::ObISQLClient &sql_proxy, const uint64_t tenant_id, const ObLSID &dest_ls, - ObTransferTask &task) + ObTransferTask &task, + const int32_t group_id) { int ret = OB_SUCCESS; const bool is_src_ls = false; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !dest_ls.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(dest_ls)); - } else if (OB_FAIL(get_by_ls_id_(sql_proxy, tenant_id, dest_ls, is_src_ls, task))) { + } else if (OB_FAIL(get_by_ls_id_(sql_proxy, tenant_id, dest_ls, is_src_ls, group_id, task))) { LOG_WARN("failed to get by ls id", K(ret), K(tenant_id), K(dest_ls)); } return ret; @@ -395,7 +398,8 @@ int ObTransferTaskOperator::finish_task( const ObTransferStatus &old_status, const ObTransferStatus &new_status, const int result, - const ObTransferTaskComment &comment) + const ObTransferTaskComment &comment, + const int32_t group_id) { int ret = OB_SUCCESS; bool can_change = false; @@ -432,7 +436,7 @@ int ObTransferTaskOperator::finish_task( LOG_WARN("fail to splice update sql", KR(ret), K(tenant_id), K(sql)); } else if (OB_FAIL(sql.append_fmt(" AND status='%s'", old_status.str()))) { LOG_WARN("fail to append fmt", KR(ret), K(tenant_id), K(sql), K(old_status)); - } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), group_id, affected_rows))) { LOG_WARN("fail to write sql", KR(ret), K(tenant_id), K(sql), K(affected_rows)); } else if (OB_UNLIKELY(1 != affected_rows)) { ret = OB_STATE_NOT_MATCH; @@ -552,14 +556,16 @@ int ObTransferTaskOperator::update_status_and_result( const ObTransferTaskID task_id, const ObTransferStatus &old_status, const ObTransferStatus &new_status, - const int result) + const int result, + const int32_t group_id) { int ret = OB_SUCCESS; bool can_change = false; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) - || !task_id.is_valid())) { + || !task_id.is_valid() + || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(task_id)); + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(task_id), K(group_id)); } else if (OB_FAIL(ObTransferStatusHelper::check_can_change_status( old_status, new_status, @@ -584,7 +590,7 @@ int ObTransferTaskOperator::update_status_and_result( LOG_WARN("fail to splice update sql", KR(ret), K(tenant_id), K(sql)); } else if (OB_FAIL(sql.append_fmt(" AND status='%s'", old_status.str()))) { LOG_WARN("fail to append fmt", KR(ret), K(tenant_id), K(sql), K(old_status)); - } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), group_id, affected_rows))) { LOG_WARN("fail to write sql", KR(ret), K(tenant_id), K(sql), K(affected_rows)); } else if (OB_UNLIKELY(1 != affected_rows && 0 != affected_rows)) { ret = OB_STATE_NOT_MATCH; @@ -603,16 +609,18 @@ int ObTransferTaskOperator::update_start_scn( const uint64_t tenant_id, const ObTransferTaskID task_id, const ObTransferStatus &old_status, - const share::SCN &start_scn) + const share::SCN &start_scn, + const int32_t group_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !task_id.is_valid() || !old_status.is_valid() - || !start_scn.is_valid())) { + || !start_scn.is_valid() + || group_id < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), - K(tenant_id), K(task_id), K(old_status), K(start_scn)); + K(tenant_id), K(task_id), K(old_status), K(start_scn), K(group_id)); } else { ObSqlString sql; ObDMLSqlSplicer dml_splicer; @@ -627,7 +635,7 @@ int ObTransferTaskOperator::update_start_scn( LOG_WARN("fail to splice update sql", KR(ret), K(tenant_id), K(sql)); } else if (OB_FAIL(sql.append_fmt(" AND status='%s'", old_status.str()))) { LOG_WARN("fail to append fmt", KR(ret), K(tenant_id), K(sql), K(old_status)); - } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), group_id, affected_rows))) { LOG_WARN("fail to write sql", KR(ret), K(tenant_id), K(sql), K(affected_rows)); } else if (OB_UNLIKELY(1 != affected_rows && 0 != affected_rows)) { ret = OB_STATE_NOT_MATCH; @@ -645,16 +653,18 @@ int ObTransferTaskOperator::update_finish_scn( const uint64_t tenant_id, const ObTransferTaskID task_id, const ObTransferStatus &old_status, - const share::SCN &finish_scn) + const share::SCN &finish_scn, + const int32_t group_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !task_id.is_valid() || !old_status.is_valid() - || !finish_scn.is_valid())) { + || !finish_scn.is_valid() + || group_id < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), - K(tenant_id), K(task_id), K(old_status), K(finish_scn)); + K(tenant_id), K(task_id), K(old_status), K(finish_scn), K(group_id)); } else { ObSqlString sql; ObDMLSqlSplicer dml_splicer; @@ -669,7 +679,7 @@ int ObTransferTaskOperator::update_finish_scn( LOG_WARN("fail to splice update sql", KR(ret), K(tenant_id), K(sql)); } else if (OB_FAIL(sql.append_fmt(" AND status='%s'", old_status.str()))) { LOG_WARN("fail to append fmt", KR(ret), K(tenant_id), K(sql), K(old_status)); - } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), group_id, affected_rows))) { LOG_WARN("fail to write sql", KR(ret), K(tenant_id), K(sql), K(affected_rows)); } else if (OB_UNLIKELY(1 != affected_rows)) { ret = OB_STATE_NOT_MATCH; @@ -687,12 +697,13 @@ int ObTransferTaskOperator::get_by_ls_id_( const uint64_t tenant_id, const ObLSID &ls_id, const bool is_src_ls, + const int32_t group_id, ObTransferTask &task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ls_id.is_valid())) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ls_id.is_valid() || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id)); + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(group_id)); } else { ObSqlString sql; SMART_VAR(ObISQLClient::ReadResult, result) { @@ -708,7 +719,7 @@ int ObTransferTaskOperator::get_by_ls_id_( } } - if (FAILEDx(sql_proxy.read(result, tenant_id, sql.ptr()))) { + if (FAILEDx(sql_proxy.read(result, tenant_id, sql.ptr(), group_id))) { LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); } else if (OB_ISNULL(result.get_result())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/share/transfer/ob_transfer_task_operator.h b/src/share/transfer/ob_transfer_task_operator.h index e2a9614e26..de17c19936 100644 --- a/src/share/transfer/ob_transfer_task_operator.h +++ b/src/share/transfer/ob_transfer_task_operator.h @@ -46,6 +46,7 @@ public: * @param [in] task_id: target task_id * @param [in] for_update: select for update * @param [out] task: transfer task + * @param [in] group_id: rpc queue id * @return * - OB_ENTRY_NOT_EXIST: not found * - OB_SUCCESS: successful @@ -56,7 +57,8 @@ public: const uint64_t tenant_id, const ObTransferTaskID task_id, const bool for_update, - ObTransferTask &task); + ObTransferTask &task, + const int32_t group_id); /* * get a transfer task by task_id with create_time and finish_time @@ -123,6 +125,7 @@ public: * @param [in] tenant_id: target tenant_id * @param [in] src_ls: src ls_id * @param [out] task: transfer task + * @param [in] group_id: rpc queue id * @return * - OB_ENTRY_NOT_EXIST: not found * - OB_ERR_UNEXPECTED: more than 1 transfer task on a ls @@ -133,7 +136,8 @@ public: common::ObISQLClient &sql_proxy, const uint64_t tenant_id, const ObLSID &src_ls, - ObTransferTask &task); + ObTransferTask &task, + const int32_t group_id); /* * get transfer task by dest ls (there is no more than 1 transfer task on a ls) @@ -142,6 +146,7 @@ public: * @param [in] tenant_id: target tenant_id * @param [in] dest_ls: destination ls_id * @param [out] task: transfer task + * @param [in] group_id: rpc queue id * @return * - OB_ENTRY_NOT_EXIST: not found * - OB_ERR_UNEXPECTED: more than 1 transfer task on a ls @@ -152,7 +157,8 @@ public: common::ObISQLClient &sql_proxy, const uint64_t tenant_id, const ObLSID &dest_ls, - ObTransferTask &task); + ObTransferTask &task, + const int32_t group_id); /* * insert task @@ -211,6 +217,7 @@ public: * @param [in] new_status: new task status * @param [in] result: return code for the transfer process * @param [in] comment: information for task finish + * @param [in] group_id: rpc queue id * @return * - OB_STATE_NOT_MATCH: task not found or task status mismatch * - OB_SUCCESS: successful @@ -223,7 +230,8 @@ public: const ObTransferStatus &old_status, const ObTransferStatus &new_status, const int result, - const ObTransferTaskComment &comment); + const ObTransferTaskComment &comment, + const int32_t group_id); /* * finish task from INIT status to COMPLETED when part_list is all unreachable @@ -280,6 +288,7 @@ public: * @param [in] old_status: old task status * @param [in] new_status: new task status * @param [in] result: task result + * @param [in] group_id: rpc queue id * @return * - OB_STATE_NOT_MATCH: task not found or task status mismatch * - OB_SUCCESS: successful @@ -291,7 +300,8 @@ public: const ObTransferTaskID task_id, const ObTransferStatus &old_status, const ObTransferStatus &new_status, - const int result); + const int result, + const int32_t group_id); /* @@ -302,6 +312,7 @@ public: * @param [in] task_id: target task_id * @param [in] old_status: old task status * @param [in] start_scn: start scn + * @param [in] group_id: rpc queue id, default is 0 * @return * - OB_STATE_NOT_MATCH: task not found or task status mismatch * - OB_SUCCESS: successful @@ -312,7 +323,8 @@ public: const uint64_t tenant_id, const ObTransferTaskID task_id, const ObTransferStatus &old_status, - const share::SCN &start_scn); + const share::SCN &start_scn, + const int32_t group_id); /* * update finish_scn @@ -322,6 +334,7 @@ public: * @param [in] task_id: target task_id * @param [in] old_status: old task status * @param [in] finish_scn: finish scn + * @param [in] group_id: rpc queue id, default is 0 * @return * - OB_STATE_NOT_MATCH: task not found or task status mismatch * - OB_SUCCESS: successful @@ -332,7 +345,8 @@ public: const uint64_t tenant_id, const ObTransferTaskID, const ObTransferStatus &old_status, - const share::SCN &finish_scn); + const share::SCN &finish_scn, + const int32_t group_id); /* * record transfer task in __all_transfer_task_history @@ -415,6 +429,7 @@ private: const uint64_t tenant_id, const ObLSID &ls_id, const bool is_src_ls, + const int32_t group_id, ObTransferTask &task); static int construct_transfer_tasks_( common::sqlclient::ObMySQLResult &res, diff --git a/src/sql/executor/ob_direct_receive_op.cpp b/src/sql/executor/ob_direct_receive_op.cpp index 09b4a11b31..9ead7afd7f 100644 --- a/src/sql/executor/ob_direct_receive_op.cpp +++ b/src/sql/executor/ob_direct_receive_op.cpp @@ -145,12 +145,14 @@ int ObDirectReceiveOp::inner_close() LOG_WARN("session or plan ctx or rpc is NULL", K(ret)); } else { ObQueryRetryInfo retry_info; + const int32_t group_id = OB_INVALID_ID == session->get_expect_group_id() ? 0 : session->get_expect_group_id(); ObExecutorRpcCtx rpc_ctx(session->get_effective_tenant_id(), plan_ctx->get_timeout_timestamp(), ctx_.get_task_exec_ctx().get_min_cluster_version(), &retry_info, session, - plan_ctx->is_plain_select_stmt()); + plan_ctx->is_plain_select_stmt(), + group_id); int tmp_ret = rpc->task_kill(rpc_ctx, resp_handler->get_task_id(), resp_handler->get_dst_addr()); if (OB_SUCCESS != tmp_ret) { LOG_WARN("kill task failed", K(tmp_ret), diff --git a/src/sql/executor/ob_executor_rpc_impl.cpp b/src/sql/executor/ob_executor_rpc_impl.cpp index b8299edabd..0eac7e2789 100644 --- a/src/sql/executor/ob_executor_rpc_impl.cpp +++ b/src/sql/executor/ob_executor_rpc_impl.cpp @@ -54,6 +54,7 @@ int ObExecutorRpcImpl::task_execute(ObExecutorRpcCtx &rpc_ctx, RemoteStreamHandle &real_handler = handler.get_remote_stream_handle(); RemoteStreamHandle::MyHandle &h = real_handler.get_handle(); int64_t timeout_timestamp = rpc_ctx.get_timeout_timestamp(); + const int32_t group_id = rpc_ctx.get_group_id(); if (OB_ISNULL(proxy_) || OB_ISNULL(real_handler.get_result())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("not init", K(ret), K_(proxy), "result", real_handler.get_result()); @@ -71,6 +72,7 @@ int ObExecutorRpcImpl::task_execute(ObExecutorRpcCtx &rpc_ctx, } else if (OB_FAIL(to_proxy .by(tenant_id) .timeout(timeout) + .group_id(group_id) .task_execute(task, *real_handler.get_result(), h))) { LOG_WARN("rpc task_execute fail", K(ret), @@ -121,6 +123,7 @@ int ObExecutorRpcImpl::task_execute_v2(ObExecutorRpcCtx &rpc_ctx, RemoteStreamHandleV2 &real_handler = handler.get_remote_stream_handle_v2(); RemoteStreamHandleV2::MyHandle &h = real_handler.get_handle(); int64_t timeout_timestamp = rpc_ctx.get_timeout_timestamp(); + const int32_t group_id = rpc_ctx.get_group_id(); if (OB_ISNULL(proxy_) || OB_ISNULL(real_handler.get_result())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("not init", K(ret), K_(proxy), "result", real_handler.get_result()); @@ -135,6 +138,7 @@ int ObExecutorRpcImpl::task_execute_v2(ObExecutorRpcCtx &rpc_ctx, } else if (OB_FAIL(to_proxy .by(tenant_id) .timeout(timeout) + .group_id(group_id) .remote_task_execute(task, *real_handler.get_result(), h))) { LOG_WARN("rpc task_execute fail", K(ret), K(tenant_id), K(svr), K(timeout), K(timeout_timestamp)); diff --git a/src/sql/executor/ob_executor_rpc_impl.h b/src/sql/executor/ob_executor_rpc_impl.h index 08146aff97..f730b854a8 100644 --- a/src/sql/executor/ob_executor_rpc_impl.h +++ b/src/sql/executor/ob_executor_rpc_impl.h @@ -233,13 +233,15 @@ public: uint64_t min_cluster_version, ObQueryRetryInfo *retry_info, ObSQLSessionInfo *session, - bool is_plain_select) + bool is_plain_select, + int32_t group_id) : rpc_tenant_id_(rpc_tenant_id), timeout_timestamp_(timeout_timestamp), min_cluster_version_(min_cluster_version), retry_info_(retry_info), session_(session), - is_plain_select_(is_plain_select) + is_plain_select_(is_plain_select), + group_id_(group_id) { } ~ObExecutorRpcCtx() {} @@ -256,11 +258,13 @@ public: inline ObQueryRetryInfo *get_retry_info_for_update() const { return retry_info_; } bool is_retry_for_rpc_timeout() const { return is_plain_select_; } int check_status() const; + int32_t get_group_id() const { return group_id_; } TO_STRING_KV(K_(rpc_tenant_id), K_(timeout_timestamp), K_(min_cluster_version), K_(retry_info), - K_(is_plain_select)); + K_(is_plain_select), + K_(group_id)); private: uint64_t rpc_tenant_id_; int64_t timeout_timestamp_; @@ -269,6 +273,7 @@ private: ObQueryRetryInfo *retry_info_; const ObSQLSessionInfo *session_;//该类中的变量会并发访问,注意session成功并发访问是否正确 bool is_plain_select_;//stmt_type == T_SELECT && not select...for update + int32_t group_id_; private: DISALLOW_COPY_AND_ASSIGN(ObExecutorRpcCtx); }; diff --git a/src/sql/executor/ob_remote_scheduler.cpp b/src/sql/executor/ob_remote_scheduler.cpp index ddca225e56..d24d9d298b 100644 --- a/src/sql/executor/ob_remote_scheduler.cpp +++ b/src/sql/executor/ob_remote_scheduler.cpp @@ -233,12 +233,14 @@ int ObRemoteScheduler::execute_with_sql(ObExecContext &ctx, ObPhysicalPlan *phy_ } if (OB_SUCC(ret)) { ObScanner *scanner = NULL; + const int32_t group_id = OB_INVALID_ID == session->get_expect_group_id() ? 0 : session->get_expect_group_id(); ObExecutorRpcCtx rpc_ctx(session->get_rpc_tenant_id(), plan_ctx->get_timeout_timestamp(), ctx.get_task_exec_ctx().get_min_cluster_version(), retry_info, ctx.get_my_session(), - plan_ctx->is_plain_select_stmt()); + plan_ctx->is_plain_select_stmt(), + group_id); if (OB_FAIL(rpc->task_execute_v2(rpc_ctx, task, task.get_runner_svr(), diff --git a/src/sql/executor/ob_remote_task_executor.cpp b/src/sql/executor/ob_remote_task_executor.cpp index 622028e615..672fc0a302 100644 --- a/src/sql/executor/ob_remote_task_executor.cpp +++ b/src/sql/executor/ob_remote_task_executor.cpp @@ -63,12 +63,14 @@ int ObRemoteTaskExecutor::execute(ObExecContext &query_ctx, ObJob *job, ObTaskIn } else { // 将task_info设成OB_TASK_STATE_RUNNING状态,后面如果重试可能会用到该状态 task_info->set_state(OB_TASK_STATE_RUNNING); + const int32_t group_id = OB_INVALID_ID == session->get_expect_group_id() ? 0 : session->get_expect_group_id(); ObExecutorRpcCtx rpc_ctx(session->get_rpc_tenant_id(), plan_ctx->get_timeout_timestamp(), query_ctx.get_task_exec_ctx().get_min_cluster_version(), retry_info, query_ctx.get_my_session(), - plan_ctx->is_plain_select_stmt()); + plan_ctx->is_plain_select_stmt(), + group_id); if (OB_FAIL(rpc->task_execute(rpc_ctx, task, task_info->get_task_location().get_server(), diff --git a/src/storage/high_availability/ob_finish_transfer.cpp b/src/storage/high_availability/ob_finish_transfer.cpp index df2a115587..3081c9b5f9 100644 --- a/src/storage/high_availability/ob_finish_transfer.cpp +++ b/src/storage/high_availability/ob_finish_transfer.cpp @@ -373,7 +373,7 @@ int ObTxFinishTransfer::get_transfer_tablet_info_from_inner_table_( if (!task_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(task_id)); - } else if (OB_FAIL(ObTransferTaskOperator::get(*sql_proxy_, tenant_id, task_id, for_update, transfer_task))) { + } else if (OB_FAIL(ObTransferTaskOperator::get(*sql_proxy_, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE_HA_LEVEL2))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(task_id)); } else if (OB_FAIL(tablet_list.assign(transfer_task.get_tablet_list()))) { LOG_WARN("failed to assign tablet_list", KR(ret), K(transfer_task)); @@ -788,7 +788,7 @@ int ObTxFinishTransfer::unlock_ls_member_list_(const uint64_t tenant_id, const s { int ret = OB_SUCCESS; if (OB_FAIL(ObMemberListLockUtils::unlock_ls_member_list( - tenant_id, ls_id, task_id_.id(), member_list, status, *sql_proxy_))) { + tenant_id, ls_id, task_id_.id(), member_list, status, share::OBCG_STORAGE_HA_LEVEL2, *sql_proxy_))) { LOG_WARN("failed to unlock ls member list", K(ret), K(tenant_id), K(ls_id), K_(task_id), K(status)); } return ret; @@ -799,7 +799,7 @@ int ObTxFinishTransfer::lock_ls_member_list_(const uint64_t tenant_id, const sha { int ret = OB_SUCCESS; if (OB_FAIL(ObMemberListLockUtils::lock_ls_member_list( - tenant_id, ls_id, task_id_.id(), member_list, status, *sql_proxy_))) { + tenant_id, ls_id, task_id_.id(), member_list, status, share::OBCG_STORAGE_HA_LEVEL2, *sql_proxy_))) { LOG_WARN("failed to unlock ls member list", K(ret), K(ls_id), K(member_list)); } else { #ifdef ERRSIM @@ -888,16 +888,16 @@ int ObTxFinishTransfer::update_transfer_task_result_(const ObTransferTaskID &tas const bool for_update = true; ObTransferStatus next_status; next_status = OB_SUCCESS == result ? ObTransferStatus::COMPLETED : ObTransferStatus::DOING; - if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task))) { + if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE_HA_LEVEL2))) { LOG_WARN("failed to get transfer task", K(ret), K(task_id), K(tenant_id)); } else if (transfer_task.get_start_scn() >= finish_scn) { ret = OB_ERR_UNEXPECTED; LOG_WARN("finish scn not expected", K(ret), K(transfer_task), K(finish_scn)); } else if (OB_FAIL(ObTransferTaskOperator::update_finish_scn( - trans, tenant_id, task_id, transfer_task.get_status(), finish_scn))) { + trans, tenant_id, task_id, transfer_task.get_status(), finish_scn, share::OBCG_STORAGE_HA_LEVEL2))) { LOG_WARN("failed to update finish scn", K(ret), K(tenant_id), K(task_id), K(finish_scn)); } else if (OB_FAIL(ObTransferTaskOperator::finish_task( - trans, tenant_id, task_id, transfer_task.get_status(), next_status, result, ObTransferTaskComment::EMPTY_COMMENT))) { + trans, tenant_id, task_id, transfer_task.get_status(), next_status, result, ObTransferTaskComment::EMPTY_COMMENT, share::OBCG_STORAGE_HA_LEVEL2))) { LOG_WARN("failed to finish task", K(ret), K(tenant_id), K(task_id)); } #ifdef ERRSIM @@ -1046,7 +1046,7 @@ int ObTxFinishTransfer::start_trans_( LOG_WARN("fail to set trx timeout", K(ret), K(stmt_timeout)); } else if (OB_FAIL(timeout_ctx.set_timeout(stmt_timeout))) { LOG_WARN("set timeout context failed", K(ret)); - } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id))) { + } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, false/*with_snapshot*/, share::OBCG_STORAGE_HA_LEVEL2))) { LOG_WARN("failed to start trans", K(ret), K(tenant_id)); } else { LOG_INFO("start trans", K(tenant_id)); @@ -1087,7 +1087,7 @@ int ObTxFinishTransfer::select_transfer_task_for_update_(const ObTransferTaskID if (!task_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid arg", K(ret), K(task_id)); - } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task))) { + } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task, share::OBCG_STORAGE_HA_LEVEL2))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(task_id)); } else if (!task.get_status().is_doing_status()) { ret = OB_STATE_NOT_MATCH; diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index 3ef69cb0a9..ad5fc4e22e 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -170,7 +170,7 @@ int ObTransferHandler::get_transfer_task_from_inner_table_( if (! task_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid arg", K(ret), K(task_id)); - } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task))) { + } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task, share::OBCG_STORAGE_HA_LEVEL1))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(task_id)); } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { LOG_WARN("failed to convert from transfer task", K(ret), K(task)); @@ -218,7 +218,7 @@ int ObTransferHandler::fetch_transfer_task_from_inner_table_by_src_ls_( const ObLSID &src_ls_id = ls_->get_ls_id(); ObTransferTask task; if (OB_FAIL(ObTransferTaskOperator::get_by_src_ls( - *sql_proxy_, tenant_id, src_ls_id, task))) { + *sql_proxy_, tenant_id, src_ls_id, task, share::OBCG_STORAGE_HA_LEVEL2))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(src_ls_id)); } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { LOG_WARN("failed to convert from transfer task", K(ret), K(task)); @@ -246,7 +246,7 @@ int ObTransferHandler::fetch_transfer_task_from_inner_table_by_dest_ls_( const ObLSID &dest_ls_id = ls_->get_ls_id(); ObTransferTask task; if (OB_FAIL(ObTransferTaskOperator::get_by_dest_ls( - *sql_proxy_, tenant_id, dest_ls_id, task))) { + *sql_proxy_, tenant_id, dest_ls_id, task, share::OBCG_STORAGE_HA_LEVEL2))) { LOG_WARN("failed to get transfer task by dest ls", K(ret), K(tenant_id), K(dest_ls_id)); } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { LOG_WARN("failed to convert from transfer task", K(ret), K(task)); @@ -565,7 +565,7 @@ int ObTransferHandler::lock_src_and_dest_ls_member_list_( } else if (OB_FAIL(lock_ls_list.push_back(dest_ls_id))) { LOG_WARN("failed to push back", K(ret), K(dest_ls_id)); } else if (OB_FAIL(ObMemberListLockUtils::batch_lock_ls_member_list(tenant_id, task_id, - lock_ls_list, member_list, status, *sql_proxy_))) { + lock_ls_list, member_list, status, share::OBCG_STORAGE_HA_LEVEL2, *sql_proxy_))) { LOG_WARN("failed to batch lock ls member list", K(ret)); } else if (OB_FAIL(check_ls_member_list_same_(src_ls_id, dest_ls_id, member_list, is_same))) { LOG_WARN("failed to check ls member listsame", K(ret), K(src_ls_id), K(dest_ls_id)); @@ -647,7 +647,7 @@ int ObTransferHandler::inner_unlock_ls_member_list_( const int64_t task_id = task_info.task_id_.id(); const ObTransferLockStatus status(ObTransferLockStatus::START); if (OB_FAIL(ObMemberListLockUtils::unlock_ls_member_list( - tenant_id, ls_id, task_id, member_list, status, *sql_proxy_))) { + tenant_id, ls_id, task_id, member_list, status, share::OBCG_STORAGE_HA_LEVEL2, *sql_proxy_))) { LOG_WARN("failed to lock ls member list", K(ret), K(task_info), K(ls_id), K(member_list)); } return ret; @@ -926,6 +926,8 @@ int ObTransferHandler::start_trans_( omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); int64_t stmt_timeout = 10_s; const int64_t LOCK_MEMBER_LIST_TIMEOUT = 10_s; + const bool with_snapshot = false; + const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL1; if (tenant_config.is_valid()) { stmt_timeout = tenant_config->_transfer_start_trans_timeout + LOCK_MEMBER_LIST_TIMEOUT; if (tenant_config->_enable_balance_kill_transaction) { @@ -941,7 +943,7 @@ int ObTransferHandler::start_trans_( LOG_WARN("fail to set trx timeout", K(ret), K(stmt_timeout)); } else if (OB_FAIL(timeout_ctx.set_timeout(stmt_timeout))) { LOG_WARN("set timeout context failed", K(ret)); - } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id))) { + } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, with_snapshot, group_id))) { LOG_WARN("failed to start trans", K(ret)); } return ret; @@ -1508,7 +1510,7 @@ int ObTransferHandler::update_all_tablet_to_ls_( const ObTransferTabletInfo &tablet_info = task_info.tablet_list_.at(i); if (OB_FAIL(ObTabletToLSTableOperator::update_ls_id_and_transfer_seq(trans, task_info.tenant_id_, tablet_info.tablet_id_, tablet_info.transfer_seq_, task_info.src_ls_id_, - tablet_info.transfer_seq_ + 1, task_info.dest_ls_id_))) { + tablet_info.transfer_seq_ + 1, task_info.dest_ls_id_, share::OBCG_STORAGE_HA_LEVEL1))) { LOG_WARN("failed to update ls id and transfer seq", K(ret), K(tablet_info), K(task_info)); } } @@ -1577,7 +1579,7 @@ int ObTransferHandler::update_transfer_status_( } else if (!task_info.is_valid() || !next_status.is_valid()) { LOG_WARN("update transfer status get invalid argument", K(ret), K(task_info), K(next_status)); } else { - if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task))) { + if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE_HA_LEVEL1))) { LOG_WARN("failed to get transfer task", K(ret), K(task_id), K(tenant_id)); } else if (task_info.status_ != transfer_task.get_status() || task_info.src_ls_id_ != transfer_task.get_src_ls() @@ -1586,10 +1588,10 @@ int ObTransferHandler::update_transfer_status_( LOG_WARN("task info in not equal to inner table transfer task, unexpected", K(ret), K(task_info), K(transfer_task)); } else if (start_scn.is_valid() && OB_FAIL(ObTransferTaskOperator::update_start_scn( - trans, tenant_id, task_id, transfer_task.get_status(), start_scn))) { + trans, tenant_id, task_id, transfer_task.get_status(), start_scn, share::OBCG_STORAGE_HA_LEVEL1))) { LOG_WARN("failed to update finish scn", K(ret), K(tenant_id), K(task_id), K(start_scn)); } else if (OB_FAIL(ObTransferTaskOperator::update_status_and_result( - trans, tenant_id, task_id, transfer_task.get_status(), next_status, result))) { + trans, tenant_id, task_id, transfer_task.get_status(), next_status, result, share::OBCG_STORAGE_HA_LEVEL1))) { LOG_WARN("failed to finish task", K(ret), K(tenant_id), K(task_id)); } else { #ifdef ERRSIM diff --git a/src/storage/high_availability/ob_transfer_lock_info_operator.cpp b/src/storage/high_availability/ob_transfer_lock_info_operator.cpp index f15fa34a1e..fdfae61e1f 100644 --- a/src/storage/high_availability/ob_transfer_lock_info_operator.cpp +++ b/src/storage/high_availability/ob_transfer_lock_info_operator.cpp @@ -27,13 +27,14 @@ namespace storage { /* ObTransferLockInfoOperator */ -int ObTransferLockInfoOperator::insert(const ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy) +int ObTransferLockInfoOperator::insert(const ObTransferTaskLockInfo &lock_info, + const int32_t group_id, common::ObISQLClient &sql_proxy) { int ret = OB_SUCCESS; const uint64_t tenant_id = lock_info.tenant_id_; - if (!lock_info.is_valid()) { + if (!lock_info.is_valid() || group_id < 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tenant_id), K(lock_info)); + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(lock_info), K(group_id)); } else { ObSqlString sql; ObDMLSqlSplicer dml_splicer; @@ -44,7 +45,7 @@ int ObTransferLockInfoOperator::insert(const ObTransferTaskLockInfo &lock_info, LOG_WARN("failed to finish row", K(ret), K(tenant_id), K(lock_info)); } else if (OB_FAIL(dml_splicer.splice_insert_sql(OB_ALL_LS_TRANSFER_MEMBER_LIST_LOCK_INFO_TNAME, sql))) { LOG_WARN("failed to splice insert sql", K(ret), K(tenant_id), K(sql)); - } else if (OB_FAIL(sql_proxy.write(gen_meta_tenant_id(tenant_id), sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_proxy.write(gen_meta_tenant_id(tenant_id), sql.ptr(), group_id, affected_rows))) { if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) { ret = OB_ENTRY_EXIST; } else { @@ -67,14 +68,14 @@ int ObTransferLockInfoOperator::insert(const ObTransferTaskLockInfo &lock_info, } int ObTransferLockInfoOperator::remove(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const ObTransferLockStatus &status, common::ObISQLClient &sql_proxy) + const ObTransferLockStatus &status, const int32_t group_id, common::ObISQLClient &sql_proxy) { int ret = OB_SUCCESS; ObSqlString sql; int64_t affected_rows = 0; - if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ls_id.is_valid())) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ls_id.is_valid() || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id)); + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), K(group_id)); } else if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE tenant_id = %lu and ls_id = %ld" " and task_id = %ld and status = '%s'", OB_ALL_LS_TRANSFER_MEMBER_LIST_LOCK_INFO_TNAME, @@ -83,7 +84,7 @@ int ObTransferLockInfoOperator::remove(const uint64_t tenant_id, const share::Ob task_id, status.str()))) { LOG_WARN("failed to assign sql", K(ret), K(tenant_id), K(ls_id), K(task_id), K(status)); - } else if (OB_FAIL(sql_proxy.write(gen_meta_tenant_id(tenant_id), sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_proxy.write(gen_meta_tenant_id(tenant_id), sql.ptr(), group_id, affected_rows))) { LOG_WARN("failed to write sql", K(ret), K(tenant_id), K(sql), K(affected_rows)); } else if (OB_UNLIKELY(0 == affected_rows)) { ret = OB_ENTRY_NOT_EXIST; @@ -106,14 +107,14 @@ int ObTransferLockInfoOperator::remove(const uint64_t tenant_id, const share::Ob } int ObTransferLockInfoOperator::get(const ObTransferLockInfoRowKey &row_key, const int64_t task_id, - const ObTransferLockStatus &status, const bool for_update, ObTransferTaskLockInfo &lock_info, + const ObTransferLockStatus &status, const bool for_update, const int32_t group_id, ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy) { int ret = OB_SUCCESS; const uint64_t tenant_id = row_key.tenant_id_; - if (OB_UNLIKELY(!row_key.is_valid())) { + if (OB_UNLIKELY(!row_key.is_valid() || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), K(for_update)); + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), K(for_update), K(group_id)); } else { ObSqlString sql; SMART_VAR(ObISQLClient::ReadResult, result) @@ -127,7 +128,7 @@ int ObTransferLockInfoOperator::get(const ObTransferLockInfoRowKey &row_key, con status.str(), for_update ? " FOR UPDATE" : ""))) { LOG_WARN("fail to assign sql", K(ret), K(tenant_id), K(row_key), K(task_id), K(status), K(for_update)); - } else if (OB_FAIL(sql_proxy.read(result, gen_meta_tenant_id(tenant_id), sql.ptr()))) { + } else if (OB_FAIL(sql_proxy.read(result, gen_meta_tenant_id(tenant_id), sql.ptr(), group_id))) { LOG_WARN("execute sql failed", K(ret), K(tenant_id), K(sql)); } else if (OB_ISNULL(result.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -143,12 +144,12 @@ int ObTransferLockInfoOperator::get(const ObTransferLockInfoRowKey &row_key, con } int ObTransferLockInfoOperator::fetch_all(common::ObISQLClient &sql_proxy, const uint64_t tenant_id, - common::ObArray &lock_infos) + const int32_t group_id, common::ObArray &lock_infos) { int ret = OB_SUCCESS; - if (OB_INVALID_ID == tenant_id) { + if (OB_INVALID_ID == tenant_id || group_id < 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tenant_id)); + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(group_id)); } else { ObSqlString sql; SMART_VAR(ObISQLClient::ReadResult, result) @@ -157,7 +158,7 @@ int ObTransferLockInfoOperator::fetch_all(common::ObISQLClient &sql_proxy, const OB_ALL_LS_TRANSFER_MEMBER_LIST_LOCK_INFO_TNAME, tenant_id))) { LOG_WARN("fail to assign sql", K(ret), K(tenant_id)); - } else if (OB_FAIL(sql_proxy.read(result, gen_meta_tenant_id(tenant_id), sql.ptr()))) { + } else if (OB_FAIL(sql_proxy.read(result, gen_meta_tenant_id(tenant_id), sql.ptr(), group_id))) { LOG_WARN("execute sql failed", K(ret), K(tenant_id), K(sql)); } else if (OB_ISNULL(result.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -275,4 +276,4 @@ int ObTransferLockInfoOperator::parse_sql_results_( } } // namespace storage -} // namespace oceanbase \ No newline at end of file +} // namespace oceanbase diff --git a/src/storage/high_availability/ob_transfer_lock_info_operator.h b/src/storage/high_availability/ob_transfer_lock_info_operator.h index 61bda8ebe2..170d5e556b 100644 --- a/src/storage/high_availability/ob_transfer_lock_info_operator.h +++ b/src/storage/high_availability/ob_transfer_lock_info_operator.h @@ -28,12 +28,12 @@ public: ObTransferLockInfoOperator() = default; ~ObTransferLockInfoOperator() = default; - static int insert(const ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy); + static int insert(const ObTransferTaskLockInfo &lock_info, const int32_t group_id, common::ObISQLClient &sql_proxy); static int remove(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const ObTransferLockStatus &status, common::ObISQLClient &sql_proxy); + const ObTransferLockStatus &status, const int32_t group_id, common::ObISQLClient &sql_proxy); static int get(const ObTransferLockInfoRowKey &row_key, const int64_t task_id, const ObTransferLockStatus &status, - const bool for_update, ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy); - static int fetch_all(common::ObISQLClient &sql_proxy, const uint64_t tenant_id, + const bool for_update, const int32_t group_id, ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy); + static int fetch_all(common::ObISQLClient &sql_proxy, const uint64_t tenant_id, const int32_t group_id, common::ObArray &lock_infos); private: @@ -46,4 +46,4 @@ private: } // namespace storage } // namespace oceanbase -#endif \ No newline at end of file +#endif diff --git a/src/storage/high_availability/ob_transfer_lock_utils.cpp b/src/storage/high_availability/ob_transfer_lock_utils.cpp index ac94dd6f3f..dca1369324 100644 --- a/src/storage/high_availability/ob_transfer_lock_utils.cpp +++ b/src/storage/high_availability/ob_transfer_lock_utils.cpp @@ -53,7 +53,7 @@ static int get_ls_handle(const uint64_t tenant_id, const share::ObLSID &ls_id, s int ObMemberListLockUtils::batch_lock_ls_member_list(const uint64_t tenant_id, const int64_t task_id, const common::ObArray &lock_ls_list, const common::ObMemberList &member_list, - const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy) + const ObTransferLockStatus &status, const int32_t group_id, common::ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; ObArray sorted_ls_list; @@ -63,7 +63,7 @@ int ObMemberListLockUtils::batch_lock_ls_member_list(const uint64_t tenant_id, c std::sort(sorted_ls_list.begin(), sorted_ls_list.end()); for (int64_t i = 0; OB_SUCC(ret) && i < sorted_ls_list.count(); ++i) { const share::ObLSID &ls_id = sorted_ls_list.at(i); - if (OB_FAIL(lock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, sql_proxy))) { + if (OB_FAIL(lock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, group_id, sql_proxy))) { LOG_WARN("failed to lock ls member list", K(ret), K(ls_id), K(member_list)); } } @@ -73,7 +73,7 @@ int ObMemberListLockUtils::batch_lock_ls_member_list(const uint64_t tenant_id, c int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, const common::ObMemberList &member_list, const ObTransferLockStatus &status, - common::ObMySQLProxy &sql_proxy) + const int32_t group_id, common::ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; int64_t lock_owner = -1; @@ -83,14 +83,14 @@ int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const s if (!ls_id.is_valid() || !member_list.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(ls_id)); - } else if (OB_FAIL(unlock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, sql_proxy))) { + } else if (OB_FAIL(unlock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, group_id, sql_proxy))) { LOG_WARN("failed to unlock ls member list", K(ret)); - } else if (OB_FAIL(get_lock_owner(tenant_id, ls_id, task_id, status, sql_proxy, lock_owner))) { + } else if (OB_FAIL(get_lock_owner(tenant_id, ls_id, task_id, status, group_id, sql_proxy, lock_owner))) { LOG_WARN("failed to get lock owner", K(ret), K(tenant_id), K(ls_id), K(task_id), K(status)); } else if (OB_FAIL(get_member_list_str(member_list, member_list_str))) { LOG_WARN("failed to get member list str", K(ret), K(member_list)); } else if (OB_FAIL( - insert_lock_info(tenant_id, ls_id, task_id, status, lock_owner, member_list_str.ptr(), real_lock_owner, sql_proxy))) { + insert_lock_info(tenant_id, ls_id, task_id, status, lock_owner, member_list_str.ptr(), group_id, real_lock_owner, sql_proxy))) { LOG_WARN("failed to insert lock info", K(ret), K(ls_id), K(task_id)); } else { #ifdef ERRSIM @@ -105,11 +105,11 @@ int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const s bool need_lock_palf = false; if (OB_FAIL(lock_info.set(tenant_id, ls_id, task_id, status, real_lock_owner, member_list_str.ptr()))) { LOG_WARN("failed to set lock info", K(ret), K(tenant_id), K(ls_id), K(task_id), K(status), K(real_lock_owner)); - } else if (OB_FAIL(get_config_change_lock_stat_(lock_info, palf_lock_owner, palf_is_locked))) { + } else if (OB_FAIL(get_config_change_lock_stat_(lock_info, group_id, palf_lock_owner, palf_is_locked))) { LOG_WARN("failed to get config change lock stat", K(ret), K(tenant_id), K(ls_id)); } else if (OB_FAIL(check_lock_status_(palf_is_locked, palf_lock_owner, real_lock_owner, need_lock_palf))) { LOG_WARN("failed to check lock status", K(ret), K(palf_is_locked), K(palf_lock_owner), K(real_lock_owner)); - } else if (need_lock_palf && OB_FAIL(try_lock_config_change_(lock_info, lock_timeout))) { + } else if (need_lock_palf && OB_FAIL(try_lock_config_change_(lock_info, lock_timeout, group_id))) { LOG_WARN("failed to try lock config config", K(ret), K(tenant_id), @@ -128,7 +128,7 @@ int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const s int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, const common::ObMemberList &member_list, const ObTransferLockStatus &status, - common::ObMySQLProxy &sql_proxy) + const int32_t group_id, common::ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -142,7 +142,7 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id); row_key.tenant_id_ = tenant_id; row_key.ls_id_ = ls_id; - if (OB_FAIL(trans.start(&sql_proxy, meta_tenant_id))) { + if (OB_FAIL(trans.start(&sql_proxy, meta_tenant_id, false/*with_snapshot*/, group_id))) { LOG_WARN("failed to start trans", K(ret), K(meta_tenant_id)); } else { int64_t lock_owner = -1; @@ -153,7 +153,7 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const bool need_unlock = true; bool need_relock_before_unlock = false; - if (OB_FAIL(ObTransferLockInfoOperator::get(row_key, task_id, status, for_update, lock_info, trans))) { + if (OB_FAIL(ObTransferLockInfoOperator::get(row_key, task_id, status, for_update, group_id, lock_info, trans))) { if (OB_ENTRY_NOT_EXIST == ret) { // palf need to be unlocked ret = OB_SUCCESS; LOG_INFO("member list already unlocked", @@ -166,18 +166,18 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const } else { LOG_WARN("failed to get lock info", K(ret), K(tenant_id), K(row_key)); } - } else if (OB_FAIL(get_config_change_lock_stat_(lock_info, palf_lock_owner, palf_is_locked))) { + } else if (OB_FAIL(get_config_change_lock_stat_(lock_info, group_id, palf_lock_owner, palf_is_locked))) { LOG_WARN("failed to get config change lock stat"); } else if (OB_FAIL(check_unlock_status_(palf_is_locked, palf_lock_owner, lock_info.lock_owner_, need_unlock, need_relock_before_unlock))) { LOG_WARN("failed to check unlock status", K(ret), K(palf_is_locked), K(palf_lock_owner), K(lock_info)); } else if (FALSE_IT(lock_owner = lock_info.lock_owner_)) { // assign lock owner - } else if (need_relock_before_unlock && OB_FAIL(relock_before_unlock_(lock_info, palf_lock_owner, lock_timeout))) { + } else if (need_relock_before_unlock && OB_FAIL(relock_before_unlock_(lock_info, palf_lock_owner, lock_timeout, group_id))) { LOG_WARN("failed to relock config change", K(ret), K(lock_info), K(palf_lock_owner)); - } else if (need_unlock && OB_FAIL(unlock_config_change_(lock_info, lock_timeout))) { + } else if (need_unlock && OB_FAIL(unlock_config_change_(lock_info, lock_timeout, group_id))) { LOG_WARN("failed to get paxos member list", K(ret), K(lock_info)); - } else if (OB_FAIL(ObTransferLockInfoOperator::remove(tenant_id, ls_id, task_id, status, trans))) { + } else if (OB_FAIL(ObTransferLockInfoOperator::remove(tenant_id, ls_id, task_id, status, group_id, trans))) { LOG_WARN("failed to update lock info", K(ret), K(row_key), K(lock_info), K(palf_lock_owner), K(palf_is_locked)); } else { #ifdef ERRSIM @@ -228,11 +228,12 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const return ret; } -int ObMemberListLockUtils::unlock_member_list_when_switch_to_standby(const uint64_t tenant_id, common::ObMySQLProxy &sql_proxy) +int ObMemberListLockUtils::unlock_member_list_when_switch_to_standby( + const uint64_t tenant_id, const int32_t group_id, common::ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; ObArray lock_infos; - if (OB_FAIL(ObTransferLockInfoOperator::fetch_all(sql_proxy, tenant_id, lock_infos))) { + if (OB_FAIL(ObTransferLockInfoOperator::fetch_all(sql_proxy, tenant_id, group_id, lock_infos))) { LOG_WARN("failed to fetch all lock info", K(ret), K(tenant_id)); } else if (lock_infos.empty()) { LOG_INFO("no need unlock member list when switch to standby", K(tenant_id)); @@ -249,6 +250,7 @@ int ObMemberListLockUtils::unlock_member_list_when_switch_to_standby(const uint6 task_id, fake_member_list, status, + group_id, sql_proxy))) { LOG_WARN("failed to unlock ls member list", K(ret), K(lock_info)); } @@ -258,7 +260,7 @@ int ObMemberListLockUtils::unlock_member_list_when_switch_to_standby(const uint6 } int ObMemberListLockUtils::try_lock_config_change_( - const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout) + const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, const int32_t group_id) { int ret = OB_SUCCESS; bool ls_exist = false; @@ -270,7 +272,7 @@ int ObMemberListLockUtils::try_lock_config_change_( storage::ObStorageRpc storage_rpc; if (OB_FAIL(init_storage_rpc_(storage_svr_rpc_proxy, storage_rpc))) { LOG_WARN("failed to init storage rpc", K(ret)); - } else if (OB_FAIL(inner_try_lock_config_change_(lock_info, lock_timeout, storage_rpc))) { + } else if (OB_FAIL(inner_try_lock_config_change_(lock_info, lock_timeout, group_id, storage_rpc))) { LOG_WARN("failed to try lock config change fallback", K(ret), K(lock_info)); } else { LOG_INFO("try lock config change fallback", K(lock_info), K(lock_timeout)); @@ -289,7 +291,7 @@ int ObMemberListLockUtils::try_lock_config_change_( } int ObMemberListLockUtils::inner_try_lock_config_change_( - const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, + const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, const int32_t group_id, storage::ObStorageRpc &storage_rpc) { int ret = OB_SUCCESS; @@ -301,14 +303,14 @@ int ObMemberListLockUtils::inner_try_lock_config_change_( } else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) { LOG_WARN("failed to get ls leader", K(ret), K(lock_info)); } else if (OB_FAIL(storage_rpc.lock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_, - lock_info.lock_owner_, lock_timeout))) { + lock_info.lock_owner_, lock_timeout, group_id))) { LOG_WARN("failed to try lock config config", K(ret), K(lock_info)); } return ret; } int ObMemberListLockUtils::get_config_change_lock_stat_( - const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked) + const ObTransferTaskLockInfo &lock_info, const int32_t group_id, int64_t &palf_lock_owner, bool &is_locked) { int ret = OB_SUCCESS; bool ls_exist = false; @@ -319,7 +321,7 @@ int ObMemberListLockUtils::get_config_change_lock_stat_( storage::ObStorageRpc storage_rpc; if (OB_FAIL(init_storage_rpc_(storage_svr_rpc_proxy, storage_rpc))) { LOG_WARN("failed to init storage rpc", K(ret)); - } else if (OB_FAIL(get_config_change_lock_stat_fallback_(lock_info, palf_lock_owner, is_locked, storage_rpc))) { + } else if (OB_FAIL(get_config_change_lock_stat_fallback_(lock_info, group_id, palf_lock_owner, is_locked, storage_rpc))) { LOG_WARN("failed to get lock config change fallback", K(ret), K(lock_info)); } else { LOG_INFO("get lock config change stat fallback", K(lock_info), K(palf_lock_owner), K(is_locked)); @@ -338,7 +340,7 @@ int ObMemberListLockUtils::get_config_change_lock_stat_( } int ObMemberListLockUtils::get_config_change_lock_stat_fallback_( - const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, + const ObTransferTaskLockInfo &lock_info, const int32_t group_id, int64_t &palf_lock_owner, bool &is_locked, storage::ObStorageRpc &storage_rpc) { int ret = OB_SUCCESS; @@ -352,14 +354,14 @@ int ObMemberListLockUtils::get_config_change_lock_stat_fallback_( } else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) { LOG_WARN("failed to get ls leader", K(ret), K(lock_info)); } else if (OB_FAIL(storage_rpc.get_config_change_lock_stat(lock_info.tenant_id_, src_info, - lock_info.ls_id_, palf_lock_owner, is_locked))) { + lock_info.ls_id_, group_id, palf_lock_owner, is_locked))) { LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info)); } return ret; } int ObMemberListLockUtils::unlock_config_change_( - const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout) + const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, const int32_t group_id) { int ret = OB_SUCCESS; bool ls_exist = false; @@ -371,7 +373,7 @@ int ObMemberListLockUtils::unlock_config_change_( storage::ObStorageRpc storage_rpc; if (OB_FAIL(init_storage_rpc_(storage_svr_rpc_proxy, storage_rpc))) { LOG_WARN("failed to init storage rpc", K(ret)); - } else if (OB_FAIL(unlock_config_change_fallback_(lock_info, lock_timeout, storage_rpc))) { + } else if (OB_FAIL(unlock_config_change_fallback_(lock_info, lock_timeout, group_id, storage_rpc))) { LOG_WARN("failed to try lock config change fallback", K(ret), K(lock_info)); } else { LOG_INFO("unlock lock config change fallback", K(lock_info), K(lock_timeout)); @@ -390,7 +392,8 @@ int ObMemberListLockUtils::unlock_config_change_( } int ObMemberListLockUtils::unlock_config_change_fallback_( - const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, storage::ObStorageRpc &storage_rpc) + const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, const int32_t group_id, + storage::ObStorageRpc &storage_rpc) { int ret = OB_SUCCESS; ObStorageHASrcInfo src_info; @@ -401,7 +404,7 @@ int ObMemberListLockUtils::unlock_config_change_fallback_( } else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) { LOG_WARN("failed to get ls leader", K(ret), K(lock_info)); } else if (OB_FAIL(storage_rpc.unlock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_, - lock_info.lock_owner_, lock_timeout))) { + lock_info.lock_owner_, lock_timeout, group_id))) { LOG_WARN("failed to try lock config config", K(ret), K(lock_info)); } return ret; @@ -410,7 +413,7 @@ int ObMemberListLockUtils::unlock_config_change_fallback_( /* ObMemberListLockUtils */ int ObMemberListLockUtils::get_lock_owner(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const ObTransferLockStatus &status, common::ObMySQLProxy &proxy, int64_t &lock_owner) + const ObTransferLockStatus &status, const int32_t group_id, common::ObMySQLProxy &proxy, int64_t &lock_owner) { int ret = OB_SUCCESS; share::ObCommonID common_id; @@ -419,7 +422,7 @@ int ObMemberListLockUtils::get_lock_owner(const uint64_t tenant_id, const share: if (!is_valid_tenant_id(tenant_id) || !ls_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("tenant id not valid", K(ret), K(tenant_id)); - } else if (OB_FAIL(ObCommonIDUtils::gen_monotonic_id(meta_tenant_id, id_type, proxy, common_id))) { + } else if (OB_FAIL(ObCommonIDUtils::gen_monotonic_id(meta_tenant_id, id_type, group_id, proxy, common_id))) { LOG_WARN("failed to gen monotonic id", K(ret), K(meta_tenant_id)); } else { lock_owner = common_id.id(); @@ -454,7 +457,7 @@ int ObMemberListLockUtils::get_member_list_str( int ObMemberListLockUtils::insert_lock_info(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, const ObTransferLockStatus &status, const int64_t lock_owner, const common::ObString &comment, - int64_t &real_lock_owner, common::ObISQLClient &sql_proxy) + const int32_t group_id, int64_t &real_lock_owner, common::ObISQLClient &sql_proxy) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -465,7 +468,7 @@ int ObMemberListLockUtils::insert_lock_info(const uint64_t tenant_id, const shar const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id); if (OB_FAIL(lock_info.set(tenant_id, ls_id, task_id, status, lock_owner, comment))) { LOG_WARN("failed to set lock info", K(ret), K(tenant_id), K(ls_id), K(status), K(real_lock_owner)); - } else if (OB_FAIL(trans.start(&sql_proxy, meta_tenant_id))) { + } else if (OB_FAIL(trans.start(&sql_proxy, meta_tenant_id, false/*with_snapshot*/, group_id))) { LOG_WARN("failed to start trans", K(ret), K(meta_tenant_id)); } else { if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, &trans, true/*for_update*/, tenant_info))) { @@ -473,11 +476,11 @@ int ObMemberListLockUtils::insert_lock_info(const uint64_t tenant_id, const shar } else if (!tenant_info.is_primary()) { ret = OB_OP_NOT_ALLOW; LOG_WARN("tenant is not primary, do not allow insert lock info", K(tenant_id), K(tenant_info)); - } else if (OB_FAIL(ObTransferLockInfoOperator::insert(lock_info, trans))) { + } else if (OB_FAIL(ObTransferLockInfoOperator::insert(lock_info, group_id, trans))) { if (OB_ENTRY_EXIST == ret) { ret = OB_SUCCESS; ObTransferTaskLockInfo tmp_lock_info; - if (OB_FAIL(get_lock_info(tenant_id, ls_id, task_id, status, tmp_lock_info, trans))) { + if (OB_FAIL(get_lock_info(tenant_id, ls_id, task_id, status, group_id, tmp_lock_info, trans))) { LOG_WARN("failed to get lock info", K(ret), K(ls_id)); } else { real_lock_owner = tmp_lock_info.lock_owner_; @@ -497,7 +500,7 @@ int ObMemberListLockUtils::insert_lock_info(const uint64_t tenant_id, const shar } int ObMemberListLockUtils::get_lock_info(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const ObTransferLockStatus &status, ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy) + const ObTransferLockStatus &status, const int32_t group_id, ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy) { int ret = OB_SUCCESS; lock_info.reset(); @@ -508,7 +511,7 @@ int ObMemberListLockUtils::get_lock_info(const uint64_t tenant_id, const share:: if (!row_key.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(row_key)); - } else if (OB_FAIL(ObTransferLockInfoOperator::get(row_key, task_id, status, for_update, lock_info, sql_proxy))) { + } else if (OB_FAIL(ObTransferLockInfoOperator::get(row_key, task_id, status, for_update, group_id, lock_info, sql_proxy))) { LOG_WARN("failed to get lock info", K(ret), K(row_key), K(task_id), K(status)); } return ret; @@ -588,10 +591,10 @@ int ObMemberListLockUtils::check_unlock_status_( } int ObMemberListLockUtils::relock_before_unlock_(const ObTransferTaskLockInfo &lock_info, - const int64_t palf_lock_owner, const int64_t lock_timeout) + const int64_t palf_lock_owner, const int64_t lock_timeout, const int32_t group_id) { int ret = OB_SUCCESS; - if (OB_FAIL(try_lock_config_change_(lock_info, lock_timeout))) { + if (OB_FAIL(try_lock_config_change_(lock_info, lock_timeout, group_id))) { LOG_WARN("failed to try lock config change", K(ret), K(lock_info)); } else { LOG_WARN("relock before unlock", K(ret), K(lock_info)); diff --git a/src/storage/high_availability/ob_transfer_lock_utils.h b/src/storage/high_availability/ob_transfer_lock_utils.h index 8fdda729f4..9c422c7ddd 100644 --- a/src/storage/high_availability/ob_transfer_lock_utils.h +++ b/src/storage/high_availability/ob_transfer_lock_utils.h @@ -27,42 +27,42 @@ public: /* member list*/ static int batch_lock_ls_member_list(const uint64_t tenant_id, const int64_t task_id, const common::ObArray &lock_ls_list, const common::ObMemberList &member_list, - const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy); + const ObTransferLockStatus &status, const int32_t group_id, common::ObMySQLProxy &sql_proxy); static int lock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy); + const common::ObMemberList &member_list, const ObTransferLockStatus &status, const int32_t group_id, common::ObMySQLProxy &sql_proxy); static int unlock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy); + const common::ObMemberList &member_list, const ObTransferLockStatus &status, const int32_t group_id, common::ObMySQLProxy &sql_proxy); static int unlock_for_ob_admin(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t lock_id); public: /* interface used for primary switch over to standby */ - static int unlock_member_list_when_switch_to_standby(const uint64_t tenant_id, common::ObMySQLProxy &sql_proxy); + static int unlock_member_list_when_switch_to_standby(const uint64_t tenant_id, const int32_t group_id, common::ObMySQLProxy &sql_proxy); private: /* sql operator */ static int insert_lock_info(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, const ObTransferLockStatus &status, const int64_t lock_owner, const common::ObString &comment, - int64_t &real_lock_owner, common::ObISQLClient &sql_proxy); + const int32_t group_id, int64_t &real_lock_owner, common::ObISQLClient &sql_proxy); static int get_lock_info(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const ObTransferLockStatus &status, ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy); + const ObTransferLockStatus &status, const int32_t group_id, ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy); private: /* monotonic id*/ static int get_lock_owner(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, - const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy, int64_t &lock_owner); + const ObTransferLockStatus &status, const int32_t group_id, common::ObMySQLProxy &sql_proxy, int64_t &lock_owner); static int get_member_list_str(const common::ObMemberList &member_list, ObSqlString &member_list_str); private: /* palf lock config*/ - static int try_lock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout); + static int try_lock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, const int32_t group_id); static int inner_try_lock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, - storage::ObStorageRpc &storage_rpc); - static int get_config_change_lock_stat_(const ObTransferTaskLockInfo &lock_info, + const int32_t group_id, storage::ObStorageRpc &storage_rpc); + static int get_config_change_lock_stat_(const ObTransferTaskLockInfo &lock_info, const int32_t group_id, int64_t &palf_lock_owner, bool &is_locked); - static int get_config_change_lock_stat_fallback_(const ObTransferTaskLockInfo &lock_info, + static int get_config_change_lock_stat_fallback_(const ObTransferTaskLockInfo &lock_info, const int32_t group_id, int64_t &palf_lock_owner, bool &is_locked, storage::ObStorageRpc &storage_rpc); - static int unlock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout); - static int unlock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, + static int unlock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, const int32_t group_id); + static int unlock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, const int32_t group_id, storage::ObStorageRpc &storage_rpc); private: @@ -71,7 +71,7 @@ private: static int check_unlock_status_(const bool palf_is_locked, const int64_t palf_lock_owner, const int64_t inner_table_lock_owner, bool &need_unlock, bool &need_relock_before_unlock); static int relock_before_unlock_(const ObTransferTaskLockInfo &lock_info, const int64_t palf_lock_owner, - const int64_t lock_timeout); + const int64_t lock_timeout, const int32_t group_id); static int init_storage_rpc_(obrpc::ObStorageRpcProxy &storage_svr_rpc_proxy, storage::ObStorageRpc &storage_rpc); static void destory_storage_rpc_(obrpc::ObStorageRpcProxy &storage_svr_rpc_proxy, storage::ObStorageRpc &storage_rpc); diff --git a/src/storage/ob_common_id_utils.cpp b/src/storage/ob_common_id_utils.cpp index 2b8379dfe7..6aa847e254 100644 --- a/src/storage/ob_common_id_utils.cpp +++ b/src/storage/ob_common_id_utils.cpp @@ -70,12 +70,13 @@ int ObCommonIDUtils::gen_unique_id_by_rpc(const uint64_t tenant_id, ObCommonID & int ObCommonIDUtils::gen_monotonic_id(const uint64_t tenant_id, const ObMaxIdType id_type, + const int32_t group_id, common::ObMySQLProxy &proxy, share::ObCommonID &id) { int ret = OB_SUCCESS; uint64_t ret_id = OB_INVALID_ID; - ObMaxIdFetcher id_fetcher(proxy); + ObMaxIdFetcher id_fetcher(proxy, group_id); id.reset(); diff --git a/src/storage/ob_common_id_utils.h b/src/storage/ob_common_id_utils.h index 47dff0b281..1aae4bde9e 100644 --- a/src/storage/ob_common_id_utils.h +++ b/src/storage/ob_common_id_utils.h @@ -60,6 +60,7 @@ public: // @param [out] id generated monotonically increasing ID static int gen_monotonic_id(const uint64_t tenant_id, const share::ObMaxIdType id_type, + const int32_t group_id, common::ObMySQLProxy &proxy, share::ObCommonID &id); }; diff --git a/src/storage/ob_storage_rpc.cpp b/src/storage/ob_storage_rpc.cpp index 82a049de3b..a7cb3b6db0 100644 --- a/src/storage/ob_storage_rpc.cpp +++ b/src/storage/ob_storage_rpc.cpp @@ -3288,6 +3288,7 @@ int ObStorageRpc::check_start_transfer_tablets( } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) + .group_id(share::OBCG_STORAGE_HA_LEVEL2) .check_start_transfer_tablets(arg))) { LOG_WARN("failed to check src transfer tablets", K(ret), K(src_info), K(arg)); } @@ -3317,6 +3318,7 @@ int ObStorageRpc::get_ls_active_trans_count( if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) + .group_id(share::OBCG_STORAGE_HA_LEVEL2) .get_ls_active_trans_count(arg, res))) { LOG_WARN("failed to get ls active trans count", K(ret), K(src_info), K(arg)); } else { @@ -3358,6 +3360,7 @@ int ObStorageRpc::get_transfer_start_scn( .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) .timeout(get_transfer_start_scn_timeout) + .group_id(share::OBCG_STORAGE_HA_LEVEL1) .get_transfer_start_scn(arg, res))) { LOG_WARN("failed to get transfer start scn", K(ret), K(src_info), K(arg)); } else { @@ -3389,6 +3392,7 @@ int ObStorageRpc::fetch_ls_replay_scn( if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) + .group_id(share::OBCG_STORAGE_HA_LEVEL2) .fetch_ls_replay_scn(arg, res))) { LOG_WARN("failed to fetch ls replay scn", K(ret), K(src_info), K(arg)); } else { @@ -3422,6 +3426,7 @@ int ObStorageRpc::check_tablets_logical_table_replaced( } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) + .group_id(share::OBCG_STORAGE_HA_LEVEL2) .check_transfer_tablet_backfill_completed(arg, res))) { LOG_WARN("failed to check tablets backfill completed", K(ret), K(src_info), K(arg)); } else { @@ -3475,15 +3480,16 @@ int ObStorageRpc::lock_config_change( const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, const int64_t lock_owner, - const int64_t lock_timeout) + const int64_t lock_timeout, + const int32_t group_id) { int ret = OB_SUCCESS; if (!is_inited_) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); - } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) { + } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid() || group_id < 0) { ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id)); + STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id), K(group_id)); } else { ObStorageConfigChangeOpArg arg; ObStorageConfigChangeOpRes res; @@ -3497,6 +3503,7 @@ int ObStorageRpc::lock_config_change( .by(tenant_id) .timeout(timeout) .dst_cluster_id(src_info.cluster_id_) + .group_id(group_id) .lock_config_change(arg, res))) { LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg)); } @@ -3509,15 +3516,16 @@ int ObStorageRpc::unlock_config_change( const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, const int64_t lock_owner, - const int64_t lock_timeout) + const int64_t lock_timeout, + const int32_t group_id) { int ret = OB_SUCCESS; if (!is_inited_) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); - } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) { + } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid() || group_id < 0) { ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id)); + STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id), K(group_id)); } else { ObStorageConfigChangeOpArg arg; ObStorageConfigChangeOpRes res; @@ -3531,6 +3539,7 @@ int ObStorageRpc::unlock_config_change( .by(tenant_id) .timeout(timeout) .dst_cluster_id(src_info.cluster_id_) + .group_id(group_id) .unlock_config_change(arg, res))) { LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg)); } @@ -3542,6 +3551,7 @@ int ObStorageRpc::get_config_change_lock_stat( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, + const int32_t group_id, int64_t &palf_lock_owner, bool &is_locked) { @@ -3549,9 +3559,9 @@ int ObStorageRpc::get_config_change_lock_stat( if (!is_inited_) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); - } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) { + } else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid() || group_id < 0) { ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id)); + STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id), K(group_id)); } else { ObStorageConfigChangeOpArg arg; ObStorageConfigChangeOpRes res; @@ -3563,6 +3573,7 @@ int ObStorageRpc::get_config_change_lock_stat( .by(tenant_id) .timeout(timeout) .dst_cluster_id(src_info.cluster_id_) + .group_id(group_id) .get_config_change_lock_stat(arg, res))) { LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg)); } else { @@ -3591,6 +3602,7 @@ int ObStorageRpc::wakeup_transfer_service( if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) + .group_id(share::OBCG_STORAGE_HA_LEVEL2) .wakeup_transfer_service(arg))) { LOG_WARN("failed to wakeup transfer service", K(ret), K(src_info), K(arg)); } diff --git a/src/storage/ob_storage_rpc.h b/src/storage/ob_storage_rpc.h index 76a883098a..12a2c22fde 100755 --- a/src/storage/ob_storage_rpc.h +++ b/src/storage/ob_storage_rpc.h @@ -1138,17 +1138,20 @@ public: const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, const int64_t lock_owner, - const int64_t lock_timeout) = 0; + const int64_t lock_timeout, + const int32_t group_id) = 0; virtual int unlock_config_change( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, const int64_t lock_owner, - const int64_t lock_timeout) = 0; + const int64_t lock_timeout, + const int32_t group_id) = 0; virtual int get_config_change_lock_stat( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, + const int32_t group_id, int64_t &palf_lock_owner, bool &is_locked) = 0; virtual int wakeup_transfer_service( @@ -1249,17 +1252,20 @@ public: const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, const int64_t lock_owner, - const int64_t lock_timeout); + const int64_t lock_timeout, + const int32_t group_id); virtual int unlock_config_change( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, const int64_t lock_owner, - const int64_t lock_timeout); + const int64_t lock_timeout, + const int32_t group_id); virtual int get_config_change_lock_stat( const uint64_t tenant_id, const ObStorageHASrcInfo &src_info, const share::ObLSID &ls_id, + const int32_t group_id, int64_t &palf_lock_owner, bool &is_locked); virtual int wakeup_transfer_service(