337 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			337 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| #include <gtest/gtest.h>
 | |
| #include <thread>
 | |
| #include <iostream>
 | |
| #define protected public
 | |
| #define private public
 | |
| #include "env/ob_simple_cluster_test_base.h"
 | |
| #include "storage/compaction/ob_compaction_diagnose.h"
 | |
| #include "storage/compaction/ob_schedule_dag_func.h"
 | |
| #include "storage/ls/ob_ls.h"
 | |
| #include "storage/tx_storage/ob_ls_handle.h"
 | |
| #include "storage/tx_storage/ob_ls_service.h"
 | |
| #include "storage/tx/ob_tx_data_functor.h"
 | |
| #include "storage/tablet/ob_tablet.h"
 | |
| #include "storage/ob_relative_table.h"
 | |
| #include "storage/ob_dml_running_ctx.h"
 | |
| #include "storage/access/ob_rows_info.h"
 | |
// Rendezvous counters shared between the hooked
// ObStorageTableGuard::refresh_and_protect_table() in this file and the test
// body. They are touched from several threads.
//
//   qcc  - incremented by the odd-tx-id writer once its table refresh has
//          succeeded; the test's freeze thread polls it and the test finally
//          asserts that it equals 1.
//   qcc2 - incremented by the even-tx-id writer when it enters the hook; the
//          odd-tx-id writer waits until it becomes non-zero.
//   qcc3 - incremented together with qcc; while it is zero the odd-tx-id
//          branches of the hook are still armed.
static int qcc{0};
static int qcc2{0};
static int qcc3{0};
 | |
| namespace oceanbase
 | |
| {
 | |
| namespace storage
 | |
| {
 | |
| int ObLSTabletService::insert_tablet_rows(
 | |
|     const int64_t row_count,
 | |
|     ObTabletHandle &tablet_handle,
 | |
|     ObDMLRunningCtx &run_ctx,
 | |
|     ObStoreRow *rows,
 | |
|     ObRowsInfo &rows_info)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObRelativeTable &table = run_ctx.relative_table_;
 | |
|   const bool check_exists = !table.is_storage_index_table() || table.is_unique_index();
 | |
|   bool exists = false;
 | |
|   // // 1. Defensive checking of new rows.
 | |
|   // if (GCONF.enable_defensive_check()) {
 | |
|   //   for (int64_t i = 0; OB_SUCC(ret) && i < row_count; i++) {
 | |
|   //     ObStoreRow &tbl_row = rows[i];
 | |
|   //     if (OB_FAIL(check_new_row_legitimacy(run_ctx, tbl_row.row_val_))) {
 | |
|   //       LOG_WARN("Failed to check new row legitimacy", K(ret), K_(tbl_row.row_val));
 | |
|   //     }
 | |
|   //   }
 | |
|   // }
 | |
| 
 | |
|   // 2. Check uniqueness constraint in memetable only(active + frozen).
 | |
|   // It would be more efficient and elegant to completely merge the uniqueness constraint
 | |
|   // and write conflict checking, but the implementation currently is to minimize intrusion
 | |
|   // into the memtable.
 | |
|   // if (check_exists && OB_FAIL(tablet_handle.get_obj()->rowkeys_exists(run_ctx.store_ctx_, table,
 | |
|   //                                                                     rows_info, exists))) {
 | |
|   //   LOG_WARN("Failed to check the uniqueness constraint", K(ret), K(rows_info));
 | |
|   // } else if (exists) {
 | |
|   //   ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
 | |
|   //   blocksstable::ObDatumRowkey &duplicate_rowkey = rows_info.get_conflict_rowkey();
 | |
|   //   LOG_WARN("Rowkey already exist", K(ret), K(table), K(duplicate_rowkey));
 | |
|   // }
 | |
| 
 | |
|   // 3. Insert rows with uniqueness constraint and write conflict checking.
 | |
|   // Check write conflict in memtable + sstable.
 | |
|   // Check uniqueness constraint in sstable only.
 | |
|   if (OB_SUCC(ret)) {
 | |
|     if (OB_FAIL(tablet_handle.get_obj()->insert_rows(table, run_ctx.store_ctx_, rows, rows_info,
 | |
|         check_exists, *run_ctx.col_descs_, row_count, run_ctx.dml_param_.encrypt_meta_))) {
 | |
|       if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) {
 | |
|         blocksstable::ObDatumRowkey &duplicate_rowkey = rows_info.get_conflict_rowkey();
 | |
|         TRANS_LOG(WARN, "Rowkey already exist", K(ret), K(table), K(duplicate_rowkey),
 | |
|                  K(rows_info.get_conflict_idx()));
 | |
|       } else if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
 | |
|         TRANS_LOG(WARN, "Failed to insert rows to tablet", K(ret), K(rows_info));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // 4. Log user error message if rowkey is duplicate.
 | |
|   if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret && !run_ctx.dml_param_.is_ignore_) {
 | |
|     int tmp_ret = OB_SUCCESS;
 | |
|     char rowkey_buffer[OB_TMP_BUF_SIZE_256];
 | |
|     ObString index_name = "PRIMARY";
 | |
|     if (OB_TMP_FAIL(extract_rowkey(table, rows_info.get_conflict_rowkey(),
 | |
|          rowkey_buffer, OB_TMP_BUF_SIZE_256, run_ctx.dml_param_.tz_info_))) {
 | |
|       TRANS_LOG(WARN, "Failed to extract rowkey", K(ret), K(tmp_ret));
 | |
|     }
 | |
|     if (table.is_index_table()) {
 | |
|       if (OB_TMP_FAIL(table.get_index_name(index_name))) {
 | |
|         TRANS_LOG(WARN, "Failed to get index name", K(ret), K(tmp_ret));
 | |
|       }
 | |
|     } else if (lib::is_oracle_mode() && OB_TMP_FAIL(table.get_primary_key_name(index_name))) {
 | |
|       TRANS_LOG(WARN, "Failed to get pk name", K(ret), K(tmp_ret));
 | |
|     }
 | |
|     LOG_USER_ERROR(OB_ERR_PRIMARY_KEY_DUPLICATE, rowkey_buffer, index_name.length(), index_name.ptr());
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int ObStorageTableGuard::refresh_and_protect_table(ObRelativeTable &relative_table)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObTabletTableIterator &iter = relative_table.tablet_iter_;
 | |
|   const share::ObLSID &ls_id = tablet_->get_tablet_meta().ls_id_;
 | |
|   const common::ObTabletID &tablet_id = tablet_->get_tablet_meta().tablet_id_;
 | |
|   bool need_print = false;
 | |
|   if (tablet_id.id() == 200001 && store_ctx_.mvcc_acc_ctx_.tx_id_.get_id() % 2 == 0 && qcc2 == 0) {
 | |
|     need_print = true;
 | |
|     qcc2++;
 | |
|     TRANS_LOG(INFO, "qc debug", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable()));
 | |
|     usleep(1 * 1000 * 1000);
 | |
|     TRANS_LOG(INFO, "qc debug", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable()));
 | |
|   }
 | |
|   if (tablet_id.id() == 200001 && store_ctx_.mvcc_acc_ctx_.tx_id_.get_id() % 2 == 1 && qcc3 == 0) {
 | |
|     while (qcc2 == 0) {
 | |
|       usleep(1000);
 | |
|     }
 | |
|   }
 | |
|   while (OB_SUCC(ret) && need_to_refresh_table(*iter.table_iter())) {
 | |
|     if (OB_FAIL(store_ctx_.ls_->get_tablet_svr()->get_read_tables(
 | |
|         tablet_id,
 | |
|         ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US,
 | |
|         store_ctx_.mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx(),
 | |
|         iter,
 | |
|         relative_table.allow_not_ready()))) {
 | |
|       TRANS_LOG(WARN, "fail to get", K(store_ctx_.mvcc_acc_ctx_.tx_id_), K(ret));
 | |
|     } else {
 | |
|       // no worry. iter will hold tablet reference and its life cycle is longer than guard
 | |
|       tablet_ = iter.get_tablet();
 | |
|       if (store_ctx_.timeout_ > 0) {
 | |
|         const int64_t query_left_time = store_ctx_.timeout_ - ObTimeUtility::current_time();
 | |
|         if (query_left_time <= 0) {
 | |
|           ret = OB_TRANS_STMT_TIMEOUT;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   if (need_print) {
 | |
|     TRANS_LOG(INFO, "qc debug", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable()));
 | |
|   }
 | |
|   if (OB_SUCC(ret)) {
 | |
|     if (tablet_id.id() == 200001 && store_ctx_.mvcc_acc_ctx_.tx_id_.get_id() % 2 == 1 && qcc3 == 0) {
 | |
|       qcc++;
 | |
|       qcc3++;
 | |
|       TRANS_LOG(INFO, "qc debug2", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable()));
 | |
|       usleep(2 * 1000 * 1000);
 | |
|       TRANS_LOG(INFO, "qc debug2", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable()));
 | |
|     }
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| }
 | |
| namespace unittest
 | |
| {
 | |
| using namespace oceanbase::transaction;
 | |
| using namespace oceanbase::storage;
 | |
| using namespace oceanbase::memtable;
 | |
| using namespace oceanbase::storage::checkpoint;
 | |
// SQL helper macros for the test bodies below.
//
// They expand to gtest fatal assertions, so they may only be used inside
// functions returning void. Each one expects these names at the call site:
//   sql            - an ObSqlString used as scratch buffer
//   affected_rows  - an int64_t receiving the write result (write macros)
//   sql_proxy      - the proxy to write through (EXE_SQL / EXE_SQL_FMT)
//   read_res       - the result object to read into (READ_SQL_BY_CONN)
//
// Every macro is wrapped in do { } while (0) so that it behaves as a single
// statement, e.g. under an unbraced if/else. Call sites must end with ';'.

// Execute the literal statement `sql_str` through `sql_proxy`.
#define EXE_SQL(sql_str)                                              \
  do {                                                                \
    ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));                       \
    ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); \
  } while (0)

// Same as EXE_SQL, with the statement built printf-style from the arguments.
#define EXE_SQL_FMT(...)                                              \
  do {                                                                \
    ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__));               \
    ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); \
  } while (0)

// Execute the literal statement `sql_str` as a write on connection `conn`.
#define WRITE_SQL_BY_CONN(conn, sql_str)                                                    \
  do {                                                                                      \
    ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));                                             \
    ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); \
  } while (0)

// Same as WRITE_SQL_BY_CONN, with the statement built printf-style.
#define WRITE_SQL_FMT_BY_CONN(conn, ...)                                                    \
  do {                                                                                      \
    ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__));                                     \
    ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); \
  } while (0)

// Execute the literal statement `sql_str` as a read on connection `conn`.
#define READ_SQL_BY_CONN(conn, sql_str)                                               \
  do {                                                                                \
    ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));                                       \
    ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res)); \
  } while (0)
 | |
| class ObCallbackReverseTest : public ObSimpleClusterTestBase
 | |
| {
 | |
| public:
 | |
|   ObCallbackReverseTest() : ObSimpleClusterTestBase("callbacks_with_reverse_order", "200G", "40G") {}
 | |
|   void prepare_tenant_env()
 | |
|   {
 | |
|     common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
 | |
|     int64_t affected_rows = 0;
 | |
|     ObSqlString sql;
 | |
|     sqlclient::ObISQLConnection *connection = nullptr;
 | |
|     ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
 | |
|     ASSERT_NE(nullptr, connection);
 | |
|     WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_timeout = 10000000000");
 | |
|     WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_idle_timeout = 10000000000");
 | |
|     WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000");
 | |
|     WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;");
 | |
|     WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;");
 | |
|     sleep(5);
 | |
|   }
 | |
|   void create_test_tenant(uint64_t &tenant_id)
 | |
|   {
 | |
|     TRANS_LOG(INFO, "create_tenant start");
 | |
|     ASSERT_EQ(OB_SUCCESS, create_tenant("tt1", "20G", "100G"));
 | |
|     ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id));
 | |
|     ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
 | |
|     TRANS_LOG(INFO, "create_tenant end", K(tenant_id));
 | |
|   }
 | |
|   // you should use single partition when using it
 | |
|   void get_tablet_id_with_table_name(const char *name,
 | |
|                                      ObTabletID &tablet)
 | |
|   {
 | |
|     common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy();
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObSqlString sql;
 | |
|     int64_t affected_rows = 0;
 | |
|     int64_t tablet_id = 0;
 | |
|     ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("select tablet_id from oceanbase.__all_virtual_table where table_name=%s", name));
 | |
|     SMART_VAR(ObMySQLProxy::MySQLResult, res) {
 | |
|       ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr()));
 | |
|       sqlclient::ObMySQLResult *result = res.get_result();
 | |
|       ASSERT_NE(nullptr, result);
 | |
|       ASSERT_EQ(OB_SUCCESS, result->next());
 | |
|       ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", tablet_id));
 | |
|     }
 | |
|     tablet = (uint64_t)tablet_id;
 | |
|   }
 | |
|   void minor_freeze_data()
 | |
|   {
 | |
|     common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
 | |
|     sqlclient::ObISQLConnection *connection = nullptr;
 | |
|     ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObSqlString sql;
 | |
|     int64_t affected_rows = 0;
 | |
|     WRITE_SQL_BY_CONN(connection, "alter system minor freeze;");
 | |
|   }
 | |
|   void get_ls(uint64_t tenant_id, ObLS *&ls)
 | |
|   {
 | |
|     ls = nullptr;
 | |
|     share::ObTenantSwitchGuard tenant_guard;
 | |
|     ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id));
 | |
|     ObLSService *ls_svr = MTL(ObLSService*);
 | |
|     ASSERT_NE(nullptr, ls_svr);
 | |
|     ObLSHandle handle;
 | |
|     share::ObLSID ls_id(1001);
 | |
|     ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, handle, ObLSGetMod::STORAGE_MOD));
 | |
|     ASSERT_NE(nullptr, ls = handle.get_ls());
 | |
|   }
 | |
|   void get_memtable(const ObTabletID tablet_id,
 | |
|                     ObTableHandleV2 &handle)
 | |
|   {
 | |
|     ObLS *ls = NULL;
 | |
|     get_ls(1002, ls);
 | |
|     ObTabletHandle tablet_handle;
 | |
|     ObTablet *tablet = nullptr;
 | |
|     ASSERT_EQ(OB_SUCCESS, ls->get_tablet_svr()->get_tablet(tablet_id, tablet_handle));
 | |
|     tablet = tablet_handle.get_obj();
 | |
|     ASSERT_EQ(OB_SUCCESS, tablet->get_active_memtable(handle));
 | |
|   }
 | |
| private:
 | |
| };
 | |
| TEST_F(ObCallbackReverseTest, callback_reverse_test)
 | |
| {
 | |
|   ObSqlString sql;
 | |
|   int64_t affected_rows = 0;
 | |
|   // ============================== Phase1. create tenant and table ==============================
 | |
|   TRANS_LOG(INFO, "create tenant start");
 | |
|   uint64_t tenant_id = 0;
 | |
|   create_test_tenant(tenant_id);
 | |
|   TRANS_LOG(INFO, "create tenant end");
 | |
|   share::ObTenantSwitchGuard tenant_guard;
 | |
|   ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id));
 | |
|   TRANS_LOG(INFO, "create table start");
 | |
|   common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
 | |
|   EXE_SQL("create table qcc (a int primary key)");
 | |
|   usleep(10 * 1000 * 1000);
 | |
|   TRANS_LOG(INFO, "create_table end");
 | |
|   prepare_tenant_env();
 | |
|   std::thread t1(
 | |
|     [this]() {
 | |
|       ObSqlString sql;
 | |
|       int64_t affected_rows = 0;
 | |
|       common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
 | |
|       sqlclient::ObISQLConnection *connection = nullptr;
 | |
|       ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
 | |
|       ASSERT_NE(nullptr, connection);
 | |
|       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_timeout = 10000000000");
 | |
|       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_idle_timeout = 10000000000");
 | |
|       WRITE_SQL_BY_CONN(connection, "set SESSION ob_query_timeout = 10000000000");
 | |
|       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_lock_timeout = 0");
 | |
|       TRANS_LOG(INFO, "insert data start1");
 | |
|       WRITE_SQL_BY_CONN(connection, "begin;");
 | |
|       WRITE_SQL_FMT_BY_CONN(connection, "insert into qcc values(1);");
 | |
|       WRITE_SQL_BY_CONN(connection, "commit;");
 | |
|       TRANS_LOG(INFO, "insert data end1");
 | |
|     });
 | |
|   std::thread t2(
 | |
|     [this]() {
 | |
|       ObSqlString sql;
 | |
|       int64_t affected_rows = 0;
 | |
|       common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
 | |
|       sqlclient::ObISQLConnection *connection = nullptr;
 | |
|       ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
 | |
|       ASSERT_NE(nullptr, connection);
 | |
|       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_timeout = 10000000000");
 | |
|       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_idle_timeout = 10000000000");
 | |
|       WRITE_SQL_BY_CONN(connection, "set SESSION ob_query_timeout = 10000000000");
 | |
|       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_lock_timeout = 0");
 | |
|       TRANS_LOG(INFO, "insert data start2");
 | |
|       WRITE_SQL_BY_CONN(connection, "begin;");
 | |
|       WRITE_SQL_FMT_BY_CONN(connection, "insert into qcc values(1);");
 | |
|       WRITE_SQL_BY_CONN(connection, "commit;");
 | |
|       TRANS_LOG(INFO, "insert data end2");
 | |
|     });
 | |
|   std::thread t3(
 | |
|     [this]() {
 | |
|       while (qcc == 0) {
 | |
|         TRANS_LOG(INFO, "qcc is not increased", K(qcc));
 | |
|         usleep(100 * 1000);
 | |
|       }
 | |
|       minor_freeze_data();
 | |
|     });
 | |
|   t1.join();
 | |
|   t2.join();
 | |
|   t3.join();
 | |
|   ASSERT_EQ(1, qcc);
 | |
| }
 | |
| } // namespace unittest
 | |
| } // namespace oceanbase
 | |
| int main(int argc, char **argv)
 | |
| {
 | |
|   using namespace oceanbase::unittest;
 | |
|   oceanbase::unittest::init_log_and_gtest(argc, argv);
 | |
|   OB_LOGGER.set_log_level("info");
 | |
|   ::testing::InitGoogleTest(&argc, argv);
 | |
|   return RUN_ALL_TESTS();
 | |
| }
 | 
