/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #define private public #include "ob_dml_check.h" #include "storage/ob_partition_service.h" #include "storage/transaction/ob_trans_service.h" #include "mock_ob_iterator.h" namespace oceanbase { using namespace common; using namespace storage; using namespace transaction; using namespace share; using namespace share::schema; using namespace sql; namespace unittest { void check_result_iter(ObNewRowIterator& result, ObStoreRowIterator& expected) { int ret1 = OB_SUCCESS; int ret2 = OB_SUCCESS; int i = 0; ObNewRow* row1 = NULL; const ObStoreRow* row2 = NULL; ret1 = result.get_next_row(row1); ret2 = expected.get_next_row(row2); while (OB_SUCCESS == ret1 && OB_SUCCESS == ret2) { ASSERT_EQ(row1->count_, row2->row_val_.count_); for (int j = 0; j < row1->count_; ++j) { ASSERT_EQ(row1->cells_[j], row2->row_val_.cells_[j]) << "row:" << i << " col:" << j; } ++i; ret1 = result.get_next_row(row1); ret2 = expected.get_next_row(row2); } ASSERT_EQ(ret1, ret2); ASSERT_EQ(OB_ITER_END, ret2); } void do_scan_check( const common::ObPartitionKey& pkey, const char* scan_str, ObTableScanParam& scan_param, MockObServer& server) { int ret = OB_SUCCESS; ObTransDesc trans_desc; uint64_t tenant_id = OB_SYS_TENANT_ID; const int64_t thread_id = 100; bool rollback = false; int64_t timeout = 1000000000; int64_t idle_timeout = 500000000; int64_t trans_expired_time = ObTimeUtility::current_time() + timeout; int64_t stmt_expired_time = ObTimeUtility::current_time() + idle_timeout; ObPartitionService* partition_service = server.get_partition_service(); EXPECT_TRUE(NULL != partition_service); ObStartTransParam trans_param; trans_param.set_access_mode(ObTransAccessMode::READ_WRITE); trans_param.set_type(ObTransType::TRANS_USER); trans_param.set_isolation(ObTransIsolation::READ_COMMITED); trans_param.set_cluster_version(GET_MIN_CLUSTER_VERSION()); ObPartitionArray participants; ObPartitionArray discard_participants; EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ObStmtDesc& stmt_desc = trans_desc.get_cur_stmt_desc(); stmt_desc.phy_plan_type_ = OB_PHY_PLAN_DISTRIBUTED; stmt_desc.stmt_type_ = stmt::T_SELECT; stmt_desc.is_sfu_ = false; stmt_desc.consistency_level_ = ObTransConsistencyLevel::STRONG; ObPartitionLeaderArray arr; ObStmtParam stmt_param; EXPECT_EQ(OB_SUCCESS, stmt_param.init(tenant_id, stmt_expired_time, false)); sleep(1); // begine transaction ret = partition_service->start_trans(tenant_id, thread_id, trans_param, trans_expired_time, 0, 0, trans_desc); EXPECT_EQ(OB_SUCCESS, ret); EXPECT_EQ(OB_SUCCESS, arr.push(pkey, trans_desc.get_scheduler())); TRANS_LOG(INFO, "start statment", K(tenant_id), K(trans_desc), K(participants)); ret = partition_service->start_stmt(stmt_param, trans_desc, arr, participants); TRANS_LOG(INFO, "start statment ok"); EXPECT_EQ(OB_SUCCESS, ret); EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ObPartitionEpochArray partition_epoch_arr; ret = partition_service->start_participant(trans_desc, participants, partition_epoch_arr); EXPECT_EQ(OB_SUCCESS, ret); TRANS_LOG(INFO, "start participant ok"); // scan rows scan_param.trans_desc_ = &trans_desc; scan_param.pkey_ = pkey; scan_param.timeout_ = ObTimeUtility::current_time() + 1000000000; scan_param.scan_flag_.flag_ = 0; // no flag scan_param.reserved_cell_count_ = 6; scan_param.schema_version_ = ObRestoreSchema::RESTORE_SCHEMA_VERSION; ObNewRowIterator* scan_iter = NULL; ret = partition_service->table_scan(scan_param, scan_iter); EXPECT_EQ(OB_SUCCESS, ret); ObMockIterator scan_expect; ASSERT_EQ(OB_SUCCESS, scan_expect.from(scan_str)); check_result_iter(*scan_iter, scan_expect); ret = partition_service->revert_scan_iter(scan_iter); EXPECT_EQ(OB_SUCCESS, ret); EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ret = partition_service->end_participant(rollback, trans_desc, participants); EXPECT_EQ(OB_SUCCESS, ret); // end scan statement transaction::ObPartitionEpochArray epoch_arr; bool incomplete = false; ret = partition_service->end_stmt(rollback, incomplete, participants, epoch_arr, discard_participants, arr, trans_desc); EXPECT_EQ(OB_SUCCESS, ret); // end transaction static MockObEndTransCallback callback; ret = partition_service->end_trans(rollback, trans_desc, callback, ObTimeUtility::current_time() + 100000000); EXPECT_EQ(OB_SUCCESS, ret); } // void do_create_check(const common::ObPartitionKey &pkey, // const ObVersion &version, // const ObMemberList &member_list, // const int64_t replica_num, // MockObServer &server) //{ // int ret = OB_SUCCESS; // ObArray schemas; // ObTableSchema table_schema; // ObPartitionService *partition_service = server.get_partition_service(); // EXPECT_TRUE(NULL != partition_service); // int64_t now = ObTimeUtility::current_time(); // int64_t schema_version = 0; // ret = ObInnerTableSchema::all_core_table_schema(table_schema); // EXPECT_EQ(OB_SUCCESS, ret); // table_schema.set_table_id(pkey.get_table_id()); // ret = schemas.push_back(table_schema); // EXPECT_EQ(OB_SUCCESS, ret); // ret = partition_service->create_partition(pkey, // schema_version, // version, // replica_num, // member_list, // server.get_self(), // now, // now, // schemas); // EXPECT_EQ(OB_SUCCESS, ret); // sleep(1); //} void do_insert_check(const common::ObPartitionKey& pkey, const char* ins_str, const common::ObIArray& column_ids, MockObServer& server) { int ret = OB_SUCCESS; ObTransDesc trans_desc; uint64_t tenant_id = OB_SYS_TENANT_ID; const int64_t thread_id = 100; bool rollback = false; int64_t timeout = 1000000000; int64_t idle_timeout = 500000000; int64_t trans_expired_time = ObTimeUtility::current_time() + timeout; int64_t stmt_expired_time = ObTimeUtility::current_time() + idle_timeout; int64_t affected_rows = 0; ObPartitionService* partition_service = server.get_partition_service(); EXPECT_TRUE(NULL != partition_service); ObStartTransParam trans_param; trans_param.set_access_mode(ObTransAccessMode::READ_WRITE); trans_param.set_type(ObTransType::TRANS_USER); trans_param.set_isolation(ObTransIsolation::READ_COMMITED); trans_param.set_cluster_version(GET_MIN_CLUSTER_VERSION()); ObPartitionArray participants; ObPartitionArray discard_participants; EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ObStmtDesc& stmt_desc = trans_desc.get_cur_stmt_desc(); stmt_desc.phy_plan_type_ = OB_PHY_PLAN_DISTRIBUTED; stmt_desc.stmt_type_ = stmt::T_INSERT; stmt_desc.is_sfu_ = false; stmt_desc.consistency_level_ = ObTransConsistencyLevel::STRONG; ObPartitionLeaderArray arr; ObStmtParam stmt_param; EXPECT_EQ(OB_SUCCESS, stmt_param.init(tenant_id, stmt_expired_time, false)); sleep(1); // begine transaction ret = partition_service->start_trans(tenant_id, thread_id, trans_param, trans_expired_time, 0, 0, trans_desc); EXPECT_EQ(OB_SUCCESS, ret); EXPECT_EQ(OB_SUCCESS, arr.push(pkey, trans_desc.get_scheduler())); TRANS_LOG(INFO, "start statment", K(tenant_id), K(trans_desc), K(participants)); ret = partition_service->start_stmt(stmt_param, trans_desc, arr, participants); EXPECT_EQ(OB_SUCCESS, ret); TRANS_LOG(INFO, "start statment ok"); EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ObPartitionEpochArray partition_epoch_arr; ret = partition_service->start_participant(trans_desc, participants, partition_epoch_arr); EXPECT_EQ(OB_SUCCESS, ret); TRANS_LOG(INFO, "start participant ok"); // insert values ObMockNewRowIterator ins_iter; ASSERT_EQ(OB_SUCCESS, ins_iter.from(ins_str)); ObDMLBaseParam dml_param; dml_param.timeout_ = ObTimeUtility::current_time() + timeout; dml_param.schema_version_ = ObRestoreSchema::RESTORE_SCHEMA_VERSION; ret = partition_service->insert_rows(trans_desc, dml_param, pkey, column_ids, &ins_iter, affected_rows); EXPECT_EQ(OB_SUCCESS, ret); EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ret = partition_service->end_participant(rollback, trans_desc, participants); EXPECT_EQ(OB_SUCCESS, ret); // end insert statement transaction::ObPartitionEpochArray epoch_arr; bool incomplete = false; ret = partition_service->end_stmt(rollback, incomplete, participants, epoch_arr, discard_participants, arr, trans_desc); EXPECT_EQ(OB_SUCCESS, ret); // end transaction static MockObEndTransCallback callback; ret = partition_service->end_trans(rollback, trans_desc, callback, ObTimeUtility::current_time() + 100000000); EXPECT_EQ(OB_SUCCESS, ret); } void do_insert_duplicate_check(const common::ObPartitionKey& pkey, const common::ObIArray& column_ids, const common::ObIArray& duplicated_column_ids, const common::ObNewRow& row, const ObInsertFlag flag, const char* duplicate_str, // NULL if no confliction MockObServer& server) { int ret = OB_SUCCESS; ObTransDesc trans_desc; uint64_t tenant_id = OB_SYS_TENANT_ID; const int64_t thread_id = 100; bool rollback = false; int64_t timeout = 1000000000; int64_t idle_timeout = 500000000; int64_t trans_expired_time = ObTimeUtility::current_time() + timeout; int64_t stmt_expired_time = ObTimeUtility::current_time() + idle_timeout; int64_t affected_rows = 0; ObPartitionService* partition_service = server.get_partition_service(); EXPECT_TRUE(NULL != partition_service); ObStartTransParam trans_param; trans_param.set_access_mode(ObTransAccessMode::READ_WRITE); trans_param.set_type(ObTransType::TRANS_USER); trans_param.set_isolation(ObTransIsolation::READ_COMMITED); trans_param.set_cluster_version(GET_MIN_CLUSTER_VERSION()); ObPartitionArray participants; ObPartitionArray discard_participants; EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ObStmtDesc& stmt_desc = trans_desc.get_cur_stmt_desc(); stmt_desc.phy_plan_type_ = OB_PHY_PLAN_DISTRIBUTED; stmt_desc.stmt_type_ = stmt::T_INSERT; stmt_desc.is_sfu_ = false; stmt_desc.consistency_level_ = ObTransConsistencyLevel::STRONG; ObPartitionLeaderArray arr; ObStmtParam stmt_param; EXPECT_EQ(OB_SUCCESS, stmt_param.init(tenant_id, stmt_expired_time, false)); // begine transaction ret = partition_service->start_trans(tenant_id, thread_id, trans_param, trans_expired_time, 0, 0, trans_desc); EXPECT_EQ(OB_SUCCESS, ret); EXPECT_EQ(OB_SUCCESS, arr.push(pkey, trans_desc.get_scheduler())); TRANS_LOG(INFO, "start statment", K(tenant_id), K(trans_desc), K(participants)); ret = partition_service->start_stmt(stmt_param, trans_desc, arr, participants); EXPECT_EQ(OB_SUCCESS, ret); TRANS_LOG(INFO, "start statment ok"); EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ObPartitionEpochArray partition_epoch_arr; ret = partition_service->start_participant(trans_desc, participants, partition_epoch_arr); EXPECT_EQ(OB_SUCCESS, ret); TRANS_LOG(INFO, "start participant ok"); // insert values ObNewRowIterator* duplicated_rows = NULL; ObDMLBaseParam dml_param; dml_param.timeout_ = ObTimeUtility::current_time() + timeout; dml_param.schema_version_ = ObRestoreSchema::RESTORE_SCHEMA_VERSION; ret = partition_service->insert_row( trans_desc, dml_param, pkey, column_ids, duplicated_column_ids, row, flag, affected_rows, duplicated_rows); if (NULL != duplicated_rows) { EXPECT_EQ(OB_ERR_PRIMARY_KEY_DUPLICATE, ret); } else { EXPECT_EQ(OB_SUCCESS, ret); } if (NULL != duplicate_str) { EXPECT_TRUE(NULL != duplicated_rows); ObMockIterator dup_iter; ASSERT_EQ(OB_SUCCESS, dup_iter.from(duplicate_str)); check_result_iter(*duplicated_rows, dup_iter); ASSERT_EQ(OB_SUCCESS, partition_service->revert_insert_iter(pkey, duplicated_rows)); } EXPECT_EQ(OB_SUCCESS, participants.push_back(pkey)); ret = partition_service->end_participant(rollback, trans_desc, participants); EXPECT_EQ(OB_SUCCESS, ret); // end insert statement transaction::ObPartitionEpochArray epoch_arr; bool incomplete = false; ret = partition_service->end_stmt(rollback, incomplete, participants, epoch_arr, discard_participants, arr, trans_desc); EXPECT_EQ(OB_SUCCESS, ret); // end transaction static MockObEndTransCallback callback; ret = partition_service->end_trans(rollback, trans_desc, callback, ObTimeUtility::current_time() + 100000000); EXPECT_EQ(OB_SUCCESS, ret); } } // namespace unittest } // namespace oceanbase