[BUG] donot allow create memtable when the previous one has active write ref
This commit is contained in:
		| @ -105,6 +105,7 @@ ob_unittest_observer(test_ls_replica test_ls_replica.cpp) | |||||||
| ob_unittest_observer(test_create_clone_tenant_resource_pool test_create_clone_tenant_resource_pool.cpp) | ob_unittest_observer(test_create_clone_tenant_resource_pool test_create_clone_tenant_resource_pool.cpp) | ||||||
| ob_unittest_observer(test_tablet_autoinc_mgr test_tablet_autoinc_mgr.cpp) | ob_unittest_observer(test_tablet_autoinc_mgr test_tablet_autoinc_mgr.cpp) | ||||||
| ob_unittest_observer(test_tenant_snapshot_service test_tenant_snapshot_service.cpp) | ob_unittest_observer(test_tenant_snapshot_service test_tenant_snapshot_service.cpp) | ||||||
|  | ob_unittest_observer(test_callbacks_with_reverse_order test_callbacks_with_reverse_order.cpp) | ||||||
| # TODO(muwei.ym): open later | # TODO(muwei.ym): open later | ||||||
| ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp) | ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp) | ||||||
| ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp) | ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp) | ||||||
|  | |||||||
							
								
								
									
										336
									
								
								mittest/simple_server/test_callbacks_with_reverse_order.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										336
									
								
								mittest/simple_server/test_callbacks_with_reverse_order.cpp
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,336 @@ | |||||||
|  | /** | ||||||
|  |  * Copyright (c) 2021 OceanBase | ||||||
|  |  * OceanBase CE is licensed under Mulan PubL v2. | ||||||
|  |  * You can use this software according to the terms and conditions of the Mulan PubL v2. | ||||||
|  |  * You may obtain a copy of Mulan PubL v2 at: | ||||||
|  |  *          http://license.coscl.org.cn/MulanPubL-2.0 | ||||||
|  |  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | ||||||
|  |  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | ||||||
|  |  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | ||||||
|  |  * See the Mulan PubL v2 for more details. | ||||||
|  |  */ | ||||||
|  | #include <gtest/gtest.h> | ||||||
|  | #include <thread> | ||||||
|  | #include <iostream> | ||||||
|  | #define protected public | ||||||
|  | #define private public | ||||||
|  | #include "env/ob_simple_cluster_test_base.h" | ||||||
|  | #include "storage/compaction/ob_compaction_diagnose.h" | ||||||
|  | #include "storage/compaction/ob_schedule_dag_func.h" | ||||||
|  | #include "storage/ls/ob_ls.h" | ||||||
|  | #include "storage/tx_storage/ob_ls_handle.h" | ||||||
|  | #include "storage/tx_storage/ob_ls_service.h" | ||||||
|  | #include "storage/tx/ob_tx_data_functor.h" | ||||||
|  | #include "storage/tablet/ob_tablet.h" | ||||||
|  | #include "storage/ob_relative_table.h" | ||||||
|  | #include "storage/ob_dml_running_ctx.h" | ||||||
|  | #include "storage/access/ob_rows_info.h" | ||||||
|  | static int qcc = 0; | ||||||
|  | static int qcc2 = 0; | ||||||
|  | static int qcc3 = 0; | ||||||
|  | namespace oceanbase | ||||||
|  | { | ||||||
|  | namespace storage | ||||||
|  | { | ||||||
|  | int ObLSTabletService::insert_tablet_rows( | ||||||
|  |     const int64_t row_count, | ||||||
|  |     ObTabletHandle &tablet_handle, | ||||||
|  |     ObDMLRunningCtx &run_ctx, | ||||||
|  |     ObStoreRow *rows, | ||||||
|  |     ObRowsInfo &rows_info) | ||||||
|  | { | ||||||
|  |   int ret = OB_SUCCESS; | ||||||
|  |   ObRelativeTable &table = run_ctx.relative_table_; | ||||||
|  |   const bool check_exists = !table.is_storage_index_table() || table.is_unique_index(); | ||||||
|  |   bool exists = false; | ||||||
|  |   // // 1. Defensive checking of new rows. | ||||||
|  |   // if (GCONF.enable_defensive_check()) { | ||||||
|  |   //   for (int64_t i = 0; OB_SUCC(ret) && i < row_count; i++) { | ||||||
|  |   //     ObStoreRow &tbl_row = rows[i]; | ||||||
|  |   //     if (OB_FAIL(check_new_row_legitimacy(run_ctx, tbl_row.row_val_))) { | ||||||
|  |   //       LOG_WARN("Failed to check new row legitimacy", K(ret), K_(tbl_row.row_val)); | ||||||
|  |   //     } | ||||||
|  |   //   } | ||||||
|  |   // } | ||||||
|  |  | ||||||
|  |   // 2. Check uniqueness constraint in memetable only(active + frozen). | ||||||
|  |   // It would be more efficient and elegant to completely merge the uniqueness constraint | ||||||
|  |   // and write conflict checking, but the implementation currently is to minimize intrusion | ||||||
|  |   // into the memtable. | ||||||
|  |   // if (check_exists && OB_FAIL(tablet_handle.get_obj()->rowkeys_exists(run_ctx.store_ctx_, table, | ||||||
|  |   //                                                                     rows_info, exists))) { | ||||||
|  |   //   LOG_WARN("Failed to check the uniqueness constraint", K(ret), K(rows_info)); | ||||||
|  |   // } else if (exists) { | ||||||
|  |   //   ret = OB_ERR_PRIMARY_KEY_DUPLICATE; | ||||||
|  |   //   blocksstable::ObDatumRowkey &duplicate_rowkey = rows_info.get_conflict_rowkey(); | ||||||
|  |   //   LOG_WARN("Rowkey already exist", K(ret), K(table), K(duplicate_rowkey)); | ||||||
|  |   // } | ||||||
|  |  | ||||||
|  |   // 3. Insert rows with uniqueness constraint and write conflict checking. | ||||||
|  |   // Check write conflict in memtable + sstable. | ||||||
|  |   // Check uniqueness constraint in sstable only. | ||||||
|  |   if (OB_SUCC(ret)) { | ||||||
|  |     if (OB_FAIL(tablet_handle.get_obj()->insert_rows(table, run_ctx.store_ctx_, rows, rows_info, | ||||||
|  |         check_exists, *run_ctx.col_descs_, row_count, run_ctx.dml_param_.encrypt_meta_))) { | ||||||
|  |       if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) { | ||||||
|  |         blocksstable::ObDatumRowkey &duplicate_rowkey = rows_info.get_conflict_rowkey(); | ||||||
|  |         TRANS_LOG(WARN, "Rowkey already exist", K(ret), K(table), K(duplicate_rowkey), | ||||||
|  |                  K(rows_info.get_conflict_idx())); | ||||||
|  |       } else if (OB_TRY_LOCK_ROW_CONFLICT != ret) { | ||||||
|  |         TRANS_LOG(WARN, "Failed to insert rows to tablet", K(ret), K(rows_info)); | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   // 4. Log user error message if rowkey is duplicate. | ||||||
|  |   if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret && !run_ctx.dml_param_.is_ignore_) { | ||||||
|  |     int tmp_ret = OB_SUCCESS; | ||||||
|  |     char rowkey_buffer[OB_TMP_BUF_SIZE_256]; | ||||||
|  |     ObString index_name = "PRIMARY"; | ||||||
|  |     if (OB_TMP_FAIL(extract_rowkey(table, rows_info.get_conflict_rowkey(), | ||||||
|  |          rowkey_buffer, OB_TMP_BUF_SIZE_256, run_ctx.dml_param_.tz_info_))) { | ||||||
|  |       TRANS_LOG(WARN, "Failed to extract rowkey", K(ret), K(tmp_ret)); | ||||||
|  |     } | ||||||
|  |     if (table.is_index_table()) { | ||||||
|  |       if (OB_TMP_FAIL(table.get_index_name(index_name))) { | ||||||
|  |         TRANS_LOG(WARN, "Failed to get index name", K(ret), K(tmp_ret)); | ||||||
|  |       } | ||||||
|  |     } else if (lib::is_oracle_mode() && OB_TMP_FAIL(table.get_primary_key_name(index_name))) { | ||||||
|  |       TRANS_LOG(WARN, "Failed to get pk name", K(ret), K(tmp_ret)); | ||||||
|  |     } | ||||||
|  |     LOG_USER_ERROR(OB_ERR_PRIMARY_KEY_DUPLICATE, rowkey_buffer, index_name.length(), index_name.ptr()); | ||||||
|  |   } | ||||||
|  |   return ret; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | int ObStorageTableGuard::refresh_and_protect_table(ObRelativeTable &relative_table) | ||||||
|  | { | ||||||
|  |   int ret = OB_SUCCESS; | ||||||
|  |   ObTabletTableIterator &iter = relative_table.tablet_iter_; | ||||||
|  |   const share::ObLSID &ls_id = tablet_->get_tablet_meta().ls_id_; | ||||||
|  |   const common::ObTabletID &tablet_id = tablet_->get_tablet_meta().tablet_id_; | ||||||
|  |   bool need_print = false; | ||||||
|  |   if (tablet_id.id() == 200001 && store_ctx_.mvcc_acc_ctx_.tx_id_.get_id() % 2 == 0 && qcc2 == 0) { | ||||||
|  |     need_print = true; | ||||||
|  |     qcc2++; | ||||||
|  |     TRANS_LOG(INFO, "qc debug", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable())); | ||||||
|  |     usleep(1 * 1000 * 1000); | ||||||
|  |     TRANS_LOG(INFO, "qc debug", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable())); | ||||||
|  |   } | ||||||
|  |   if (tablet_id.id() == 200001 && store_ctx_.mvcc_acc_ctx_.tx_id_.get_id() % 2 == 1 && qcc3 == 0) { | ||||||
|  |     while (qcc2 == 0) { | ||||||
|  |       usleep(1000); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |   while (OB_SUCC(ret) && need_to_refresh_table(*iter.table_iter())) { | ||||||
|  |     if (OB_FAIL(store_ctx_.ls_->get_tablet_svr()->get_read_tables( | ||||||
|  |         tablet_id, | ||||||
|  |         ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US, | ||||||
|  |         store_ctx_.mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx(), | ||||||
|  |         iter, | ||||||
|  |         relative_table.allow_not_ready()))) { | ||||||
|  |       TRANS_LOG(WARN, "fail to get", K(store_ctx_.mvcc_acc_ctx_.tx_id_), K(ret)); | ||||||
|  |     } else { | ||||||
|  |       // no worry. iter will hold tablet reference and its life cycle is longer than guard | ||||||
|  |       tablet_ = iter.get_tablet(); | ||||||
|  |       if (store_ctx_.timeout_ > 0) { | ||||||
|  |         const int64_t query_left_time = store_ctx_.timeout_ - ObTimeUtility::current_time(); | ||||||
|  |         if (query_left_time <= 0) { | ||||||
|  |           ret = OB_TRANS_STMT_TIMEOUT; | ||||||
|  |         } | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |   if (need_print) { | ||||||
|  |     TRANS_LOG(INFO, "qc debug", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable())); | ||||||
|  |   } | ||||||
|  |   if (OB_SUCC(ret)) { | ||||||
|  |     if (tablet_id.id() == 200001 && store_ctx_.mvcc_acc_ctx_.tx_id_.get_id() % 2 == 1 && qcc3 == 0) { | ||||||
|  |       qcc++; | ||||||
|  |       qcc3++; | ||||||
|  |       TRANS_LOG(INFO, "qc debug2", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable())); | ||||||
|  |       usleep(2 * 1000 * 1000); | ||||||
|  |       TRANS_LOG(INFO, "qc debug2", K(store_ctx_.mvcc_acc_ctx_.tx_id_), KPC(iter.table_iter()->get_last_memtable())); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |   return ret; | ||||||
|  | } | ||||||
|  | } | ||||||
|  | namespace unittest | ||||||
|  | { | ||||||
|  | using namespace oceanbase::transaction; | ||||||
|  | using namespace oceanbase::storage; | ||||||
|  | using namespace oceanbase::memtable; | ||||||
|  | using namespace oceanbase::storage::checkpoint; | ||||||
|  | #define EXE_SQL(sql_str)                                            \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));                       \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); | ||||||
|  | #define EXE_SQL_FMT(...)                                            \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__));               \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); | ||||||
|  | #define WRITE_SQL_BY_CONN(conn, sql_str)                                \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));                           \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); | ||||||
|  | #define WRITE_SQL_FMT_BY_CONN(conn, ...)                                \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__));                   \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); | ||||||
|  | #define READ_SQL_BY_CONN(conn, sql_str)         \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));                           \ | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res)); | ||||||
|  | class ObCallbackReverseTest : public ObSimpleClusterTestBase | ||||||
|  | { | ||||||
|  | public: | ||||||
|  |   ObCallbackReverseTest() : ObSimpleClusterTestBase("callbacks_with_reverse_order", "200G", "40G") {} | ||||||
|  |   void prepare_tenant_env() | ||||||
|  |   { | ||||||
|  |     common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); | ||||||
|  |     int64_t affected_rows = 0; | ||||||
|  |     ObSqlString sql; | ||||||
|  |     sqlclient::ObISQLConnection *connection = nullptr; | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection)); | ||||||
|  |     ASSERT_NE(nullptr, connection); | ||||||
|  |     WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_timeout = 10000000000"); | ||||||
|  |     WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_idle_timeout = 10000000000"); | ||||||
|  |     WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000"); | ||||||
|  |     WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;"); | ||||||
|  |     WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;"); | ||||||
|  |     sleep(5); | ||||||
|  |   } | ||||||
|  |   void create_test_tenant(uint64_t &tenant_id) | ||||||
|  |   { | ||||||
|  |     TRANS_LOG(INFO, "create_tenant start"); | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, create_tenant("tt1", "20G", "100G")); | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id)); | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); | ||||||
|  |     TRANS_LOG(INFO, "create_tenant end", K(tenant_id)); | ||||||
|  |   } | ||||||
|  |   // you should use single partition when using it | ||||||
|  |   void get_tablet_id_with_table_name(const char *name, | ||||||
|  |                                      ObTabletID &tablet) | ||||||
|  |   { | ||||||
|  |     common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy(); | ||||||
|  |     int ret = OB_SUCCESS; | ||||||
|  |     ObSqlString sql; | ||||||
|  |     int64_t affected_rows = 0; | ||||||
|  |     int64_t tablet_id = 0; | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("select tablet_id from oceanbase.__all_virtual_table where table_name=%s", name)); | ||||||
|  |     SMART_VAR(ObMySQLProxy::MySQLResult, res) { | ||||||
|  |       ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr())); | ||||||
|  |       sqlclient::ObMySQLResult *result = res.get_result(); | ||||||
|  |       ASSERT_NE(nullptr, result); | ||||||
|  |       ASSERT_EQ(OB_SUCCESS, result->next()); | ||||||
|  |       ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", tablet_id)); | ||||||
|  |     } | ||||||
|  |     tablet = (uint64_t)tablet_id; | ||||||
|  |   } | ||||||
|  |   void minor_freeze_data() | ||||||
|  |   { | ||||||
|  |     common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); | ||||||
|  |     sqlclient::ObISQLConnection *connection = nullptr; | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection)); | ||||||
|  |     int ret = OB_SUCCESS; | ||||||
|  |     ObSqlString sql; | ||||||
|  |     int64_t affected_rows = 0; | ||||||
|  |     WRITE_SQL_BY_CONN(connection, "alter system minor freeze;"); | ||||||
|  |   } | ||||||
|  |   void get_ls(uint64_t tenant_id, ObLS *&ls) | ||||||
|  |   { | ||||||
|  |     ls = nullptr; | ||||||
|  |     share::ObTenantSwitchGuard tenant_guard; | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id)); | ||||||
|  |     ObLSService *ls_svr = MTL(ObLSService*); | ||||||
|  |     ASSERT_NE(nullptr, ls_svr); | ||||||
|  |     ObLSHandle handle; | ||||||
|  |     share::ObLSID ls_id(1001); | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, handle, ObLSGetMod::STORAGE_MOD)); | ||||||
|  |     ASSERT_NE(nullptr, ls = handle.get_ls()); | ||||||
|  |   } | ||||||
|  |   void get_memtable(const ObTabletID tablet_id, | ||||||
|  |                     ObTableHandleV2 &handle) | ||||||
|  |   { | ||||||
|  |     ObLS *ls = NULL; | ||||||
|  |     get_ls(1002, ls); | ||||||
|  |     ObTabletHandle tablet_handle; | ||||||
|  |     ObTablet *tablet = nullptr; | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, ls->get_tablet_svr()->get_tablet(tablet_id, tablet_handle)); | ||||||
|  |     tablet = tablet_handle.get_obj(); | ||||||
|  |     ASSERT_EQ(OB_SUCCESS, tablet->get_active_memtable(handle)); | ||||||
|  |   } | ||||||
|  | private: | ||||||
|  | }; | ||||||
|  | TEST_F(ObCallbackReverseTest, callback_reverse_test) | ||||||
|  | { | ||||||
|  |   ObSqlString sql; | ||||||
|  |   int64_t affected_rows = 0; | ||||||
|  |   // ============================== Phase1. create tenant and table ============================== | ||||||
|  |   TRANS_LOG(INFO, "create tenant start"); | ||||||
|  |   uint64_t tenant_id = 0; | ||||||
|  |   create_test_tenant(tenant_id); | ||||||
|  |   TRANS_LOG(INFO, "create tenant end"); | ||||||
|  |   share::ObTenantSwitchGuard tenant_guard; | ||||||
|  |   ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id)); | ||||||
|  |   TRANS_LOG(INFO, "create table start"); | ||||||
|  |   common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); | ||||||
|  |   EXE_SQL("create table qcc (a int primary key)"); | ||||||
|  |   usleep(10 * 1000 * 1000); | ||||||
|  |   TRANS_LOG(INFO, "create_table end"); | ||||||
|  |   prepare_tenant_env(); | ||||||
|  |   std::thread t1( | ||||||
|  |     [this]() { | ||||||
|  |       ObSqlString sql; | ||||||
|  |       int64_t affected_rows = 0; | ||||||
|  |       common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); | ||||||
|  |       sqlclient::ObISQLConnection *connection = nullptr; | ||||||
|  |       ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection)); | ||||||
|  |       ASSERT_NE(nullptr, connection); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_timeout = 10000000000"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_idle_timeout = 10000000000"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "set SESSION ob_query_timeout = 10000000000"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_lock_timeout = 0"); | ||||||
|  |       TRANS_LOG(INFO, "insert data start1"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "begin;"); | ||||||
|  |       WRITE_SQL_FMT_BY_CONN(connection, "insert into qcc values(1);"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "commit;"); | ||||||
|  |       TRANS_LOG(INFO, "insert data end1"); | ||||||
|  |     }); | ||||||
|  |   std::thread t2( | ||||||
|  |     [this]() { | ||||||
|  |       ObSqlString sql; | ||||||
|  |       int64_t affected_rows = 0; | ||||||
|  |       common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); | ||||||
|  |       sqlclient::ObISQLConnection *connection = nullptr; | ||||||
|  |       ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection)); | ||||||
|  |       ASSERT_NE(nullptr, connection); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_timeout = 10000000000"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_idle_timeout = 10000000000"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "set SESSION ob_query_timeout = 10000000000"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "set SESSION ob_trx_lock_timeout = 0"); | ||||||
|  |       TRANS_LOG(INFO, "insert data start2"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "begin;"); | ||||||
|  |       WRITE_SQL_FMT_BY_CONN(connection, "insert into qcc values(1);"); | ||||||
|  |       WRITE_SQL_BY_CONN(connection, "commit;"); | ||||||
|  |       TRANS_LOG(INFO, "insert data end2"); | ||||||
|  |     }); | ||||||
|  |   std::thread t3( | ||||||
|  |     [this]() { | ||||||
|  |       while (qcc == 0) { | ||||||
|  |         TRANS_LOG(INFO, "qcc is not increased", K(qcc)); | ||||||
|  |         usleep(100 * 1000); | ||||||
|  |       } | ||||||
|  |       minor_freeze_data(); | ||||||
|  |     }); | ||||||
|  |   t1.join(); | ||||||
|  |   t2.join(); | ||||||
|  |   t3.join(); | ||||||
|  |   ASSERT_EQ(1, qcc); | ||||||
|  | } | ||||||
|  | } // namespace unittest | ||||||
|  | } // namespace oceanbase | ||||||
|  | int main(int argc, char **argv) | ||||||
|  | { | ||||||
|  |   using namespace oceanbase::unittest; | ||||||
|  |   oceanbase::unittest::init_log_and_gtest(argc, argv); | ||||||
|  |   OB_LOGGER.set_log_level("info"); | ||||||
|  |   ::testing::InitGoogleTest(&argc, argv); | ||||||
|  |   return RUN_ALL_TESTS(); | ||||||
|  | } | ||||||
| @ -165,7 +165,7 @@ int ObTabletMemtableMgr::reset_storage_recorder() | |||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| inline int ObTabletMemtableMgr::try_resolve_boundary_on_create_memtable_( | inline int ObTabletMemtableMgr::try_resolve_boundary_on_create_memtable_for_leader_( | ||||||
|   memtable::ObMemtable *last_frozen_memtable, |   memtable::ObMemtable *last_frozen_memtable, | ||||||
|   memtable::ObMemtable *new_memtable) |   memtable::ObMemtable *new_memtable) | ||||||
| { | { | ||||||
| @ -192,14 +192,23 @@ inline int ObTabletMemtableMgr::try_resolve_boundary_on_create_memtable_( | |||||||
|     double_check = !double_check; |     double_check = !double_check; | ||||||
|   } while (!can_resolve && double_check); |   } while (!can_resolve && double_check); | ||||||
|  |  | ||||||
|   if (can_resolve) { |  | ||||||
|  |   if (write_ref > 0) { | ||||||
|  |     // NB: for the leader, if the write ref on the frozen memtable is greater | ||||||
|  |     // than 0, we cannot create a new memtable. Otherwise we may finish the | ||||||
|  |     // write on the new memtable before finishing the write on the frozen | ||||||
|  |     // memtable and cause the writes and callbacks on memtable_ctx out of order. | ||||||
|  |     ret = OB_EAGAIN; | ||||||
|  |     TRANS_LOG(INFO, "last frozen's write flag is not 0 during create new memtable", | ||||||
|  |               KPC(last_frozen_memtable), KPC(new_memtable)); | ||||||
|  |   } else if (can_resolve) { | ||||||
|     last_frozen_memtable->set_resolved_active_memtable_left_boundary(true); |     last_frozen_memtable->set_resolved_active_memtable_left_boundary(true); | ||||||
|     last_frozen_memtable->resolve_right_boundary(); |     last_frozen_memtable->resolve_right_boundary(); | ||||||
|     TRANS_LOG(INFO, "[resolve_right_boundary] in create_memtable on leader", KPC(last_frozen_memtable)); |     TRANS_LOG(INFO, "[resolve_right_boundary] in create_memtable on leader", KPC(last_frozen_memtable)); | ||||||
|     if (new_memtable != last_frozen_memtable) { |     if (new_memtable != last_frozen_memtable) { | ||||||
|       new_memtable->resolve_left_boundary(last_frozen_memtable->get_end_scn()); |       new_memtable->resolve_left_boundary(last_frozen_memtable->get_end_scn()); | ||||||
|     } |     } | ||||||
|   } else if (unsubmitted_cnt > 0 || write_ref > 0) { |   } else if (unsubmitted_cnt > 0) { | ||||||
|     new_memtable->set_logging_blocked(); |     new_memtable->set_logging_blocked(); | ||||||
|     TRANS_LOG(INFO, "set new memtable logging blocked", KPC(last_frozen_memtable), KPC(new_memtable)); |     TRANS_LOG(INFO, "set new memtable logging blocked", KPC(last_frozen_memtable), KPC(new_memtable)); | ||||||
|   } |   } | ||||||
| @ -307,7 +316,7 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn, | |||||||
|           } |           } | ||||||
|         } |         } | ||||||
|         // for leader, decide the right boundary of frozen memtable |         // for leader, decide the right boundary of frozen memtable | ||||||
|         else if (OB_FAIL(try_resolve_boundary_on_create_memtable_(last_frozen_memtable, memtable))) { |         else if (OB_FAIL(try_resolve_boundary_on_create_memtable_for_leader_(last_frozen_memtable, memtable))) { | ||||||
|           TRANS_LOG(WARN, "try resolve boundary fail", K(ret)); |           TRANS_LOG(WARN, "try resolve boundary fail", K(ret)); | ||||||
|         } |         } | ||||||
|       } else { |       } else { | ||||||
|  | |||||||
| @ -126,7 +126,7 @@ private: | |||||||
|   int get_first_frozen_memtable_(ObTableHandleV2 &handle) const; |   int get_first_frozen_memtable_(ObTableHandleV2 &handle) const; | ||||||
|   void clean_tail_memtable_(); |   void clean_tail_memtable_(); | ||||||
|   int get_last_frozen_memtable_(ObTableHandleV2 &handle) const; |   int get_last_frozen_memtable_(ObTableHandleV2 &handle) const; | ||||||
|   int try_resolve_boundary_on_create_memtable_(memtable::ObMemtable *last_frozen_memtable, |   int try_resolve_boundary_on_create_memtable_for_leader_(memtable::ObMemtable *last_frozen_memtable, | ||||||
|                                                memtable::ObMemtable *new_memtable); |                                                memtable::ObMemtable *new_memtable); | ||||||
|   int resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable, |   int resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable, | ||||||
|                                                 share::SCN start_scn, |                                                 share::SCN start_scn, | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 Handora
					Handora