From 8700ec2bbea5874a3cfb8ddd84cfbe6cea080f05 Mon Sep 17 00:00:00 2001 From: "shenyunlong.syl" Date: Tue, 14 May 2024 16:44:55 +0000 Subject: [PATCH] [CP] [OBKV] add obkv error code for filter syntax error && rename query_sync to query_async && error code is overwritten --- src/libtable/src/ob_table.h | 12 +- src/libtable/src/ob_table_impl.cpp | 4 +- src/libtable/src/ob_table_impl.h | 4 +- src/libtable/src/ob_table_rpc_impl.cpp | 38 ++--- src/libtable/src/ob_table_rpc_impl.h | 18 +-- src/libtable/test/ob_batch_execute_test.cpp | 44 +++--- src/libtable/test/run_test_table_api.sh | 20 +-- src/observer/CMakeLists.txt | 2 +- src/observer/ob_srv_xlator_partition.cpp | 4 +- .../table/ob_htable_filter_parser.cpp | 3 +- src/observer/table/ob_table_filter.h | 6 +- .../ob_table_query_and_mutate_processor.cpp | 4 +- ...cpp => ob_table_query_async_processor.cpp} | 142 +++++++++--------- ...sor.h => ob_table_query_async_processor.h} | 82 +++++----- .../table/ob_table_rpc_processor_util.h | 8 +- src/share/table/ob_table.cpp | 2 +- src/share/table/ob_table.h | 6 +- src/share/table/ob_table_rpc_proxy.h | 2 +- src/share/table/ob_table_rpc_struct.cpp | 2 +- src/share/table/ob_table_rpc_struct.h | 6 +- unittest/observer/table/hfilter_parser.test | 20 +-- 21 files changed, 216 insertions(+), 213 deletions(-) rename src/observer/table/{ob_table_query_sync_processor.cpp => ob_table_query_async_processor.cpp} (82%) rename src/observer/table/{ob_table_query_sync_processor.h => ob_table_query_async_processor.h} (73%) diff --git a/src/libtable/src/ob_table.h b/src/libtable/src/ob_table.h index 74ba615f68..87d81c6db1 100644 --- a/src/libtable/src/ob_table.h +++ b/src/libtable/src/ob_table.h @@ -45,10 +45,10 @@ public: virtual int execute_query_and_mutate(const ObTableQueryAndMutate &query_and_mutate, const ObTableRequestOptions &request_options, ObTableQueryAndMutateResult &result) = 0; int execute_query_and_mutate(const ObTableQueryAndMutate &query_and_mutate, ObTableQueryAndMutateResult &result); /// executes a sync query on a table - virtual int query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) = 0; - int query_start(const ObTableQuery& query, ObTableQuerySyncResult *&result); - virtual int query_next(const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) = 0; - int query_next(ObTableQuerySyncResult *&result); + virtual int query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) = 0; + int query_start(const ObTableQuery& query, ObTableQueryAsyncResult *&result); + virtual int query_next(const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) = 0; + int query_next(ObTableQueryAsyncResult *&result); private: DISALLOW_COPY_AND_ASSIGN(ObTable); protected: @@ -81,12 +81,12 @@ inline int ObTable::execute_query_and_mutate(const ObTableQueryAndMutate &query_ return this->execute_query_and_mutate(query_and_mutate, request_options_, result); } -inline int ObTable::query_start(const ObTableQuery &query, ObTableQuerySyncResult *&result) +inline int ObTable::query_start(const ObTableQuery &query, ObTableQueryAsyncResult *&result) { return query_start(query, request_options_, result); } -inline int ObTable::query_next(ObTableQuerySyncResult *&result) +inline int ObTable::query_next(ObTableQueryAsyncResult *&result) { return query_next(request_options_, result); } diff --git a/src/libtable/src/ob_table_impl.cpp b/src/libtable/src/ob_table_impl.cpp index d11c657a16..3ee50828a5 100644 --- a/src/libtable/src/ob_table_impl.cpp +++ b/src/libtable/src/ob_table_impl.cpp @@ -719,7 +719,7 @@ int ObTableImpl::execute_query_and_mutate(const ObTableQueryAndMutate &query_and return ret; } -int ObTableImpl::query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) +int ObTableImpl::query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) { int ret = OB_NOT_IMPLEMENT; UNUSED(query); @@ -728,7 +728,7 @@ int ObTableImpl::query_start(const ObTableQuery& query, const ObTableRequestOpti return ret; } -int ObTableImpl::query_next(const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) +int ObTableImpl::query_next(const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) { int ret = OB_NOT_IMPLEMENT; UNUSED(request_options); diff --git a/src/libtable/src/ob_table_impl.h b/src/libtable/src/ob_table_impl.h index f3f0fae382..343466222a 100644 --- a/src/libtable/src/ob_table_impl.h +++ b/src/libtable/src/ob_table_impl.h @@ -48,8 +48,8 @@ public: int execute_query(const ObTableQuery &query, const ObTableRequestOptions &request_options, ObTableEntityIterator *&result) override; int execute_query_and_mutate(const ObTableQueryAndMutate &query_and_mutate, const ObTableRequestOptions &request_options, ObTableQueryAndMutateResult &result) override; /// execute a sync query on a table - int query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) override; - int query_next(const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) override; + int query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) override; + int query_next(const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) override; private: friend class ObTableServiceClientImpl; ObTableImpl(); diff --git a/src/libtable/src/ob_table_rpc_impl.cpp b/src/libtable/src/ob_table_rpc_impl.cpp index 30a8791998..3222e4cf50 100644 --- a/src/libtable/src/ob_table_rpc_impl.cpp +++ b/src/libtable/src/ob_table_rpc_impl.cpp @@ -537,7 +537,7 @@ int ObTableRpcImpl::execute_query_and_mutate(const ObTableQueryAndMutate &query_ return ret; } -int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) +int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) { int ret = OB_SUCCESS; if (!inited_) { @@ -559,7 +559,7 @@ int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestO } else if (OB_FAIL(tablet_loc.get_leader(leader_loc))) { LOG_WARN("failed to find leader location", K(ret), K(tablet_loc)); } else { - ObTableQuerySyncRequest request; + ObTableQueryAsyncRequest request; request.query_ = query; request.table_name_ = table_name_; request.table_id_ = table_id_; @@ -569,17 +569,17 @@ int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestO request.consistency_level_ = request_options.consistency_level(); request.query_session_id_ = 0; request.query_type_ = ObQueryOperationType::QUERY_START; - query_sync_multi_result_.reset(); - result = &query_sync_multi_result_.get_one_result(); + query_async_multi_result_.reset(); + result = &query_async_multi_result_.get_one_result(); ret = rpc_proxy_-> timeout(request_options.server_timeout()) .to(leader_loc.get_server()) - .execute_query_sync(request, *result); + .execute_query_async(request, *result); if (OB_SUCC(ret)) { - query_sync_multi_result_.server_addr_ = leader_loc.get_server(); - query_sync_multi_result_.has_more_ = !result->is_end_; - query_sync_multi_result_.session_id_ = result->query_session_id_; - query_sync_multi_result_.result_packet_count_++; + query_async_multi_result_.server_addr_ = leader_loc.get_server(); + query_async_multi_result_.has_more_ = !result->is_end_; + query_async_multi_result_.session_id_ = result->query_session_id_; + query_async_multi_result_.result_packet_count_++; if (result->is_end_ && result->get_row_count() == 0) { ret = OB_ITER_END; } @@ -591,31 +591,31 @@ int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestO return ret; } -int ObTableRpcImpl::query_next(const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) { +int ObTableRpcImpl::query_next(const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) { int ret = OB_SUCCESS; if (!inited_) { ret = OB_NOT_INIT; LOG_WARN("not init already", K(ret)); - } else if(!query_sync_multi_result_.has_more_) { + } else if(!query_async_multi_result_.has_more_) { ret = OB_ITER_END; LOG_DEBUG("no more result or query_start hasn't been executed yet", K(ret)); } else { - ObTableQuerySyncRequest request; + ObTableQueryAsyncRequest request; request.credential_ = client_->get_credential(); request.entity_type_ = this->entity_type_; request.consistency_level_ = request_options.consistency_level(); - request.query_session_id_ = query_sync_multi_result_.session_id_; + request.query_session_id_ = query_async_multi_result_.session_id_; request.query_type_ = ObQueryOperationType::QUERY_NEXT; - query_sync_multi_result_.has_more_ = false; - result = &query_sync_multi_result_.get_one_result(); + query_async_multi_result_.has_more_ = false; + result = &query_async_multi_result_.get_one_result(); result->reset(); ret = rpc_proxy_-> timeout(request_options.server_timeout()) - .to(query_sync_multi_result_.server_addr_) - .execute_query_sync(request, *result); + .to(query_async_multi_result_.server_addr_) + .execute_query_async(request, *result); if (OB_SUCC(ret)) { - query_sync_multi_result_.has_more_ = !result->is_end_; - query_sync_multi_result_.result_packet_count_++; + query_async_multi_result_.has_more_ = !result->is_end_; + query_async_multi_result_.result_packet_count_++; if (result->is_end_ && result->get_row_count() == 0) { ret = OB_ITER_END; } diff --git a/src/libtable/src/ob_table_rpc_impl.h b/src/libtable/src/ob_table_rpc_impl.h index 1c58015854..2d1c458507 100644 --- a/src/libtable/src/ob_table_rpc_impl.h +++ b/src/libtable/src/ob_table_rpc_impl.h @@ -33,8 +33,8 @@ public: int execute_query(const ObTableQuery &query, const ObTableRequestOptions &request_options, ObTableEntityIterator *&result) override; int execute_query_and_mutate(const ObTableQueryAndMutate &query_and_mutate, const ObTableRequestOptions &request_options, ObTableQueryAndMutateResult &result) override; /// executes a sync query on a table - int query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) override; - int query_next(const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) override; + int query_start(const ObTableQuery& query, const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) override; + int query_next(const ObTableRequestOptions &request_options, ObTableQueryAsyncResult *&result) override; private: friend class ObTableServiceClientImpl; typedef ObSEArray IdxArray; @@ -63,13 +63,13 @@ private: DISALLOW_COPY_AND_ASSIGN(QueryMultiResult); }; - class QuerySyncMultiResult: public ObTableEntityIterator + class QueryAsyncMultiResult: public ObTableEntityIterator { public: - QuerySyncMultiResult() + QueryAsyncMultiResult() :result_packet_count_(0) {} - virtual ~QuerySyncMultiResult() { reset(); } + virtual ~QueryAsyncMultiResult() { reset(); } virtual int get_next_entity(const ObITableEntity *&entity) override { int ret = OB_NOT_IMPLEMENT; @@ -84,16 +84,16 @@ private: result_packet_count_ = 0; server_pkt_ts_ = -1; } - ObTableQuerySyncResult &get_one_result() { return one_result_; } + ObTableQueryAsyncResult &get_one_result() { return one_result_; } int64_t get_result_count() const { return result_packet_count_; } public: - ObTableQuerySyncResult one_result_; + ObTableQueryAsyncResult one_result_; int64_t result_packet_count_; int64_t server_pkt_ts_; // for packet validation common::ObAddr server_addr_; int64_t session_id_; bool has_more_; - DISALLOW_COPY_AND_ASSIGN(QuerySyncMultiResult); + DISALLOW_COPY_AND_ASSIGN(QueryAsyncMultiResult); }; private: @@ -118,7 +118,7 @@ private: common::ObArenaAllocator arena_; obrpc::ObTableRpcProxy *rpc_proxy_; QueryMultiResult query_multi_result_; - QuerySyncMultiResult query_sync_multi_result_; + QueryAsyncMultiResult query_async_multi_result_; // disallow copy DISALLOW_COPY_AND_ASSIGN(ObTableRpcImpl); }; diff --git a/src/libtable/test/ob_batch_execute_test.cpp b/src/libtable/test/ob_batch_execute_test.cpp index 53c3912835..ad9a81a0e3 100644 --- a/src/libtable/test/ob_batch_execute_test.cpp +++ b/src/libtable/test/ob_batch_execute_test.cpp @@ -9078,12 +9078,12 @@ TEST_F(TestBatchExecute, htable_append) the_table = NULL; } -// create table if not exists query_sync_multi_batch_test (PK1 bigint, PK2 bigint, C1 bigint, C2 varchar(100), C3 bigint, PRIMARY KEY(PK1, PK2), INDEX idx1(C1, C2)); -TEST_F(TestBatchExecute, query_sync_multi_batch) +// create table if not exists query_async_multi_batch_test (PK1 bigint, PK2 bigint, C1 bigint, C2 varchar(100), C3 bigint, PRIMARY KEY(PK1, PK2), INDEX idx1(C1, C2)); +TEST_F(TestBatchExecute, query_async_multi_batch) { // setup ObTable *the_table = NULL; - int ret = service_client_->alloc_table(ObString::make_string("query_sync_multi_batch_test"), the_table); + int ret = service_client_->alloc_table(ObString::make_string("query_async_multi_batch_test"), the_table); ASSERT_EQ(OB_SUCCESS, ret); ObTableEntityFactory entity_factory; ObTableBatchOperation batch_operation; @@ -9108,7 +9108,7 @@ TEST_F(TestBatchExecute, query_sync_multi_batch) ASSERT_TRUE(batch_operation.is_same_type()); ASSERT_TRUE(batch_operation.is_same_properties_names()); ObTableBatchOperationResult result; - printf("insert data into query_sync_multi_batch_test using batch_execute...\n"); + printf("insert data into query_async_multi_batch_test using batch_execute...\n"); ASSERT_EQ(OB_SUCCESS, the_table->batch_execute(batch_operation, result)); ASSERT_EQ(BATCH_SIZE, result.count()); for (int64_t i = 0; i < BATCH_SIZE; ++i) @@ -9151,7 +9151,7 @@ TEST_F(TestBatchExecute, query_sync_multi_batch) { // two scan order query.set_scan_order(scan_orders[k]); - ObTableQuerySyncResult *iter = nullptr; + ObTableQueryAsyncResult *iter = nullptr; ASSERT_EQ(OB_SUCCESS, the_table->query_start(query, iter)); const ObITableEntity *result_entity = NULL; int result_cnt = 0; @@ -9198,14 +9198,14 @@ TEST_F(TestBatchExecute, query_sync_multi_batch) the_table = NULL; } -// create table if not exists htable1_query_sync ( +// create table if not exists htable1_query_async ( // K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), // primary key(K, Q, T)); -TEST_F(TestBatchExecute, htble_query_sync) +TEST_F(TestBatchExecute, htble_query_async) { // setup ObTable *the_table = NULL; - int ret = service_client_->alloc_table(ObString::make_string("htable1_query_sync"), the_table); + int ret = service_client_->alloc_table(ObString::make_string("htable1_query_async"), the_table); ASSERT_EQ(OB_SUCCESS, ret); the_table->set_entity_type(ObTableEntityType::ET_HKV); // important ObTableEntityFactory entity_factory; @@ -9270,7 +9270,7 @@ TEST_F(TestBatchExecute, htble_query_sync) const int64_t query_round = 4; query.set_batch(50); - ObTableQuerySyncResult *iter = nullptr; + ObTableQueryAsyncResult *iter = nullptr; const ObITableEntity *result_entity = NULL; uint64_t result_cnt = 0; uint64_t round = 0; @@ -9298,11 +9298,11 @@ TEST_F(TestBatchExecute, htble_query_sync) delete [] rows; } -// create table if not exists large_scan_query_sync_test (C1 bigint primary key, C2 bigint, C3 varchar(100)); -TEST_F(TestBatchExecute, large_scan_query_sync) +// create table if not exists large_scan_query_async_test (C1 bigint primary key, C2 bigint, C3 varchar(100)); +TEST_F(TestBatchExecute, large_scan_query_async) { ObTable *the_table = NULL; - int ret = service_client_->alloc_table(ObString::make_string("large_scan_query_sync_test"), the_table); + int ret = service_client_->alloc_table(ObString::make_string("large_scan_query_async_test"), the_table); ASSERT_EQ(OB_SUCCESS, ret); ObTableEntityFactory entity_factory; ObTableOperationResult r; @@ -9360,7 +9360,7 @@ TEST_F(TestBatchExecute, large_scan_query_sync) query.set_scan_order(ObQueryFlag::Forward); query.set_batch(large_batch_size / query_round); - ObTableQuerySyncResult *iter = nullptr; + ObTableQueryAsyncResult *iter = nullptr; uint64_t result_cnt = 0; ASSERT_EQ(OB_SUCCESS, the_table->query_start(query, iter)); @@ -9392,21 +9392,21 @@ TEST_F(TestBatchExecute, large_scan_query_sync) the_table = NULL; } -// create table if not exists query_sync_with_index_test +// create table if not exists query_async_with_index_test // (C1 bigint, C2 bigint, C3 bigint, // primary key(C1, C2), KEY idx_c2 (C2), KEY idx_c3 (C3), KEY idx_c2c3(C2, C3)); -TEST_F(TestBatchExecute, query_sync_with_index) +TEST_F(TestBatchExecute, query_async_with_index) { // setup ObTable *the_table = NULL; - int ret = service_client_->alloc_table(ObString::make_string("query_sync_with_index_test"), the_table); + int ret = service_client_->alloc_table(ObString::make_string("query_async_with_index_test"), the_table); ASSERT_EQ(OB_SUCCESS, ret); ObTableEntityFactory entity_factory; ObTableOperationResult r; ObITableEntity *entity = NULL; const ObITableEntity *result_entity = NULL; ObTableOperation table_operation; - ObTableQuerySyncResult *iter = nullptr; + ObTableQueryAsyncResult *iter = nullptr; entity = entity_factory.alloc(); ASSERT_TRUE(NULL != entity); @@ -9576,9 +9576,9 @@ TEST_F(TestBatchExecute, query_sync_with_index) the_table = NULL; } -// create table if not exists query_sync_multi_task_test +// create table if not exists query_async_multi_task_test // (C1 bigint primary key, C2 bigint, C3 varchar(100)); -TEST_F(TestBatchExecute, query_sync_multi_task) +TEST_F(TestBatchExecute, query_async_multi_task) { // setup constexpr int64_t thread_num = 10; @@ -9594,7 +9594,7 @@ TEST_F(TestBatchExecute, query_sync_multi_task) request_options.set_server_timeout(120*1000*1000); // 120s for (int64_t i = 0; i < thread_num; ++i) { the_table = NULL; - int ret = service_client_->alloc_table(ObString::make_string("query_sync_multi_task_test"), the_table); + int ret = service_client_->alloc_table(ObString::make_string("query_async_multi_task_test"), the_table); OB_LOG(INFO, "alloc_table succeed", K(i)); ASSERT_EQ(OB_SUCCESS, ret); the_table->set_default_request_options(request_options); @@ -9675,7 +9675,7 @@ TEST_F(TestBatchExecute, query_sync_multi_task) auto task = [&](ObTable * one_table) { int ret; - ObTableQuerySyncResult *iter = nullptr; + ObTableQueryAsyncResult *iter = nullptr; uint64_t result_cnt = 0; ObObj one_value; const ObITableEntity *one_result_entity = NULL; @@ -10494,7 +10494,7 @@ TEST_F(TestBatchExecute, table_query_with_filter) ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range)); ASSERT_EQ(OB_SUCCESS, query.set_scan_index(ObString::make_string("primary"))); ASSERT_EQ(OB_SUCCESS, query.set_filter(ObString::make_string("TableCompareFilter(=, 'C3:hello\'quote')"))); - ASSERT_EQ(OB_ERR_PARSER_SYNTAX, the_table->execute_query(query, iter)); + ASSERT_EQ(OB_KV_FILTER_PARSE_ERROR, the_table->execute_query(query, iter)); } // end case 14 { // case 15 filter and diff --git a/src/libtable/test/run_test_table_api.sh b/src/libtable/test/run_test_table_api.sh index 7a50f5995a..77e1d30409 100755 --- a/src/libtable/test/run_test_table_api.sh +++ b/src/libtable/test/run_test_table_api.sh @@ -55,10 +55,10 @@ mysql -h $HOST -P $PORT -u $user -e "drop table if exists store_generate_col_tes mysql -h $HOST -P $PORT -u $user -e "drop table if exists check_scan_range_test; create table if not exists check_scan_range_test (C1 bigint, C2 varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin, C3 bigint, PRIMARY KEY(C1, C2), KEY idx_c3 (C3));" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists multi_insert_test; create table if not exists multi_insert_test (C1 bigint primary key, C2 bigint, C3 varchar(100)) PARTITION BY KEY(C1) PARTITIONS 16" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists multi_delete_test; create table if not exists multi_delete_test (C1 bigint primary key, C2 bigint, C3 varchar(100)) PARTITION BY KEY(C1) PARTITIONS 16" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_sync_multi_batch_test; create table if not exists query_sync_multi_batch_test (PK1 bigint, PK2 bigint, C1 bigint, C2 varchar(100), C3 bigint, PRIMARY KEY(PK1, PK2), INDEX idx1(C1, C2));" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists large_scan_query_sync_test; create table if not exists large_scan_query_sync_test (C1 bigint primary key, C2 bigint, C3 varchar(100));" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_sync_with_index_test; create table if not exists query_sync_with_index_test (C1 bigint, C2 bigint, C3 bigint, primary key(C1, C2), KEY idx_c2 (C2), KEY idx_c3 (C3), KEY idx_c2c3(C2, C3));" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_sync_multi_task_test; create table if not exists query_sync_multi_task_test (C1 bigint primary key, C2 bigint, C3 varchar(100));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_async_multi_batch_test; create table if not exists query_async_multi_batch_test (PK1 bigint, PK2 bigint, C1 bigint, C2 varchar(100), C3 bigint, PRIMARY KEY(PK1, PK2), INDEX idx1(C1, C2));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists large_scan_query_async_test; create table if not exists large_scan_query_async_test (C1 bigint primary key, C2 bigint, C3 varchar(100));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_async_with_index_test; create table if not exists query_async_with_index_test (C1 bigint, C2 bigint, C3 bigint, primary key(C1, C2), KEY idx_c2 (C2), KEY idx_c3 (C3), KEY idx_c2c3(C2, C3));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_async_multi_task_test; create table if not exists query_async_multi_task_test (C1 bigint primary key, C2 bigint, C3 varchar(100));" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_with_filter; create table if not exists query_with_filter (C1 bigint primary key, C2 bigint, C3 varchar(100), C4 double default 0);" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_and_mutate; create table if not exists query_and_mutate (C1 bigint primary key, C2 bigint, C3 varchar(100), C4 double default 0);" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists atomic_batch_ops; create table if not exists atomic_batch_ops (C1 bigint, C2 varchar(128), C3 varbinary(1024) default null, C4 bigint not null default -1, primary key(C1), UNIQUE KEY idx_c2c4 (C2, C4));" $db @@ -80,7 +80,7 @@ mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_increment; mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_increment_empty; create table if not exists htable1_cf1_increment_empty like htable1_cf1" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_append; create table if not exists htable1_cf1_append like htable1_cf1" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_empty_cq; create table if not exists htable1_cf1_empty_cq (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_query_sync; create table if not exists htable1_query_sync (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_query_async; create table if not exists htable1_query_async (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_check_and_put; create table if not exists htable1_cf1_check_and_put like htable1_cf1" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_check_and_put_put; create table if not exists htable1_cf1_check_and_put_put like htable1_cf1" $db @@ -118,10 +118,10 @@ mysql -h $HOST -P $PORT -u $user -e "drop table if exists virtual_generate_col_t mysql -h $HOST -P $PORT -u $user -e "drop table if exists store_generate_col_test; create table if not exists store_generate_col_test (C1 bigint primary key, C2 varchar(10), C3 varchar(10), GEN varchar(30) generated always as (concat(C2,C3)) stored)" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists multi_insert_test; create table if not exists multi_insert_test (C1 bigint primary key, C2 bigint, C3 varchar(100), index i1(c2) local, index i2(c3) local, index i3(c2, c3)) PARTITION BY KEY(C1) PARTITIONS 16" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists multi_delete_test; create table if not exists multi_delete_test (C1 bigint primary key, C2 bigint, C3 varchar(100), index i1(c2) local, index i2(c3) local, index i3(c2, c3)) PARTITION BY KEY(C1) PARTITIONS 16" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_sync_multi_batch_test; create table if not exists query_sync_multi_batch_test (PK1 bigint, PK2 bigint, C1 bigint, C2 varchar(100), C3 bigint, PRIMARY KEY(PK1, PK2), INDEX idx1(C1, C2));" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists large_scan_query_sync_test; create table if not exists large_scan_query_sync_test (C1 bigint primary key, C2 bigint, C3 varchar(100));" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_sync_with_index_test; create table if not exists query_sync_with_index_test (C1 bigint, C2 bigint, C3 bigint, primary key(C1, C2), KEY idx_c2 (C2), KEY idx_c3 (C3), KEY idx_c2c3(C2, C3));" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_sync_multi_task_test; create table if not exists query_sync_multi_task_test (C1 bigint primary key, C2 bigint, C3 varchar(100));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_async_multi_batch_test; create table if not exists query_async_multi_batch_test (PK1 bigint, PK2 bigint, C1 bigint, C2 varchar(100), C3 bigint, PRIMARY KEY(PK1, PK2), INDEX idx1(C1, C2));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists large_scan_query_async_test; create table if not exists large_scan_query_async_test (C1 bigint primary key, C2 bigint, C3 varchar(100));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_async_with_index_test; create table if not exists query_async_with_index_test (C1 bigint, C2 bigint, C3 bigint, primary key(C1, C2), KEY idx_c2 (C2), KEY idx_c3 (C3), KEY idx_c2c3(C2, C3));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_async_multi_task_test; create table if not exists query_async_multi_task_test (C1 bigint primary key, C2 bigint, C3 varchar(100));" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_with_filter; create table if not exists query_with_filter (C1 bigint primary key, C2 bigint, C3 varchar(100), C4 double default 0);" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists query_and_mutate; create table if not exists query_and_mutate (C1 bigint primary key, C2 bigint, C3 varchar(100), C4 double default 0);" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists atomic_batch_ops; create table if not exists atomic_batch_ops (C1 bigint, C2 varchar(128), C3 varbinary(1024) default null, C4 bigint not null default -1, primary key(C1), UNIQUE KEY idx_c2c4 (C2, C4));" $db @@ -142,7 +142,7 @@ mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_append; cr mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_reverse; create table if not exists htable1_cf1_reverse like htable1_cf1" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_ttl; create table if not exists htable1_cf1_ttl (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T)) kv_attributes='{\"Hbase\": {\"TimeToLive\": 5}}'" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_empty_cq; create table if not exists htable1_cf1_empty_cq (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db -mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_query_sync; create table if not exists htable1_query_sync (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_query_async; create table if not exists htable1_query_async (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_check_and_put; create table if not exists htable1_cf1_check_and_put like htable1_cf1" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_check_and_put_put; create table if not exists htable1_cf1_check_and_put_put like htable1_cf1" $db diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 8bf13a12c8..d0d26454ce 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -141,7 +141,7 @@ ob_set_subtarget(ob_server table table/ob_table_query_and_mutate_processor.cpp table/ob_table_query_processor.cpp table/ob_table_rpc_processor.cpp - table/ob_table_query_sync_processor.cpp + table/ob_table_query_async_processor.cpp table/ob_table_service.cpp table/ob_htable_lock_mgr.cpp table/ob_table_context.cpp diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index 4c945a6006..83e584ade0 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -45,7 +45,7 @@ #include "observer/table/ob_table_batch_execute_processor.h" #include "observer/table/ob_table_query_processor.h" #include "observer/table/ob_table_query_and_mutate_processor.h" -#include "observer/table/ob_table_query_sync_processor.h" +#include "observer/table/ob_table_query_async_processor.h" #include "observer/table/ob_table_direct_load_processor.h" #include "storage/ob_storage_rpc.h" @@ -205,7 +205,7 @@ void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObTableBatchExecuteP, gctx_); RPC_PROCESSOR(ObTableQueryP, gctx_); RPC_PROCESSOR(ObTableQueryAndMutateP, gctx_); - RPC_PROCESSOR(ObTableQuerySyncP, gctx_); + RPC_PROCESSOR(ObTableQueryAsyncP, gctx_); RPC_PROCESSOR(ObTableDirectLoadP, gctx_); RPC_PROCESSOR(ObTenantTTLP, gctx_); diff --git a/src/observer/table/ob_htable_filter_parser.cpp b/src/observer/table/ob_htable_filter_parser.cpp index f1325a0dd9..3aac7cbc57 100644 --- a/src/observer/table/ob_htable_filter_parser.cpp +++ b/src/observer/table/ob_htable_filter_parser.cpp @@ -88,7 +88,8 @@ int ObHTableFilterParser::parse_filter(const ObString &filter_string, hfilter::F filter = result_filter_; } else { if (1 == ret) { // syntax error - ret = OB_ERR_PARSER_SYNTAX; + ret = OB_KV_FILTER_PARSE_ERROR; + LOG_USER_ERROR(OB_KV_FILTER_PARSE_ERROR, filter_string.length(), filter_string.ptr()); } LOG_WARN("failed to parse filter", K(ret), K_(error_msg), K(filter_string), K_(error_code)); } diff --git a/src/observer/table/ob_table_filter.h b/src/observer/table/ob_table_filter.h index f64f374c5f..9123754bd6 100644 --- a/src/observer/table/ob_table_filter.h +++ b/src/observer/table/ob_table_filter.h @@ -131,7 +131,7 @@ class ObTableQueryResultIterator public: ObTableQueryResultIterator(const ObTableQuery *query = nullptr) : query_(query), - is_query_sync_(false) + is_query_async_(false) { } virtual ~ObTableQueryResultIterator() {} @@ -141,10 +141,10 @@ public: virtual void set_scan_result(table::ObTableApiScanRowIterator *scan_result) = 0; virtual ObTableQueryResult *get_one_result() { return nullptr; } virtual void set_query(const ObTableQuery *query) { query_ = query; }; - virtual void set_query_sync() { is_query_sync_ = true; } + virtual void set_query_async() { is_query_async_ = true; } protected: const ObTableQuery *query_; - bool is_query_sync_; + bool is_query_async_; private: DISALLOW_COPY_AND_ASSIGN(ObTableQueryResultIterator); }; diff --git a/src/observer/table/ob_table_query_and_mutate_processor.cpp b/src/observer/table/ob_table_query_and_mutate_processor.cpp index fb97f9f281..56958fce63 100644 --- a/src/observer/table/ob_table_query_and_mutate_processor.cpp +++ b/src/observer/table/ob_table_query_and_mutate_processor.cpp @@ -905,8 +905,8 @@ int ObTableQueryAndMutateP::execute_table_mutation(ObTableQueryResultIterator *r } else if (OB_FAIL(execute_one_mutation(*one_result, rowkey_column_names, affected_rows))) { LOG_WARN("fail to execute one mutation", K(ret), K(rowkey_column_names)); } else if (arg_.query_and_mutate_.return_affected_entity()) { - if (OB_FAIL(result_.affected_entity_.get_property_count() <= 0 - && result_.affected_entity_.add_all_property(*one_result))) { + if (result_.affected_entity_.get_property_count() <= 0 + && OB_FAIL(result_.affected_entity_.add_all_property(*one_result))) { LOG_WARN("fail to add property", K(ret)); } else if (OB_FAIL(result_.affected_entity_.add_all_row(*one_result))) { LOG_WARN("fail to add all rows", K(ret)); diff --git a/src/observer/table/ob_table_query_sync_processor.cpp b/src/observer/table/ob_table_query_async_processor.cpp similarity index 82% rename from src/observer/table/ob_table_query_sync_processor.cpp rename to src/observer/table/ob_table_query_async_processor.cpp index b2252352f2..7ddbd7d473 100644 --- a/src/observer/table/ob_table_query_sync_processor.cpp +++ b/src/observer/table/ob_table_query_async_processor.cpp @@ -11,7 +11,7 @@ */ #define USING_LOG_PREFIX SERVER -#include "ob_table_query_sync_processor.h" +#include "ob_table_query_async_processor.h" #include "ob_table_rpc_processor_util.h" #include "observer/ob_service.h" #include "ob_table_end_trans_cb.h" @@ -32,18 +32,18 @@ using namespace oceanbase::share; using namespace oceanbase::sql; /** - * ---------------------------------------- ObTableQuerySyncSession ---------------------------------------- + * ---------------------------------------- ObTableQueryAsyncSession ---------------------------------------- */ -void ObTableQuerySyncSession::set_result_iterator(ObTableQueryResultIterator *query_result) +void ObTableQueryAsyncSession::set_result_iterator(ObTableQueryResultIterator *query_result) { result_iterator_ = query_result; if (OB_NOT_NULL(result_iterator_)) { result_iterator_->set_query(&query_); - result_iterator_->set_query_sync(); + result_iterator_->set_query_async(); } } -int ObTableQuerySyncSession::init() +int ObTableQueryAsyncSession::init() { int ret = OB_SUCCESS; lib::MemoryContext mem_context = nullptr; @@ -65,7 +65,7 @@ int ObTableQuerySyncSession::init() return ret; } -int ObTableQuerySyncSession::deep_copy_select_columns(const common::ObIArray &query_cols_names_, +int ObTableQueryAsyncSession::deep_copy_select_columns(const common::ObIArray &query_cols_names_, const common::ObIArray &tb_ctx_cols_names_) { int ret = OB_SUCCESS; @@ -84,37 +84,37 @@ int ObTableQuerySyncSession::deep_copy_select_columns(const common::ObIArrayinit()) { - LOG_WARN_RET(OB_ERROR, "failed to init ObQuerySyncMgr instance"); - OB_DELETE(ObQuerySyncMgr, ObModIds::TABLE_PROC, instance); + LOG_WARN_RET(OB_ERROR, "failed to init ObQueryAsyncMgr instance"); + OB_DELETE(ObQueryAsyncMgr, ObModIds::TABLE_PROC, instance); instance = NULL; ATOMIC_BCAS(&once_, 1, 0); } else { @@ -126,10 +126,10 @@ ObQuerySyncMgr &ObQuerySyncMgr::get_instance() } } } - return *(ObQuerySyncMgr *)instance_; + return *(ObQueryAsyncMgr *)instance_; } -int ObQuerySyncMgr::init() +int ObQueryAsyncMgr::init() { int ret = OB_SUCCESS; for (int64_t i = 0; i < DEFAULT_LOCK_ARR_SIZE; ++i) { @@ -148,12 +148,12 @@ int ObQuerySyncMgr::init() return ret; } -uint64_t ObQuerySyncMgr::generate_query_sessid() +uint64_t ObQueryAsyncMgr::generate_query_sessid() { return ATOMIC_AAF(&session_id_, 1); } -int ObQuerySyncMgr::get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session) +int ObQueryAsyncMgr::get_query_session(uint64_t sessid, ObTableQueryAsyncSession *&query_session) { int ret = OB_SUCCESS; get_locker(sessid).lock(); @@ -175,7 +175,7 @@ int ObQuerySyncMgr::get_query_session(uint64_t sessid, ObTableQuerySyncSession * return ret; } -int ObQuerySyncMgr::set_query_session(uint64_t sessid, ObTableQuerySyncSession *query_session) +int ObQueryAsyncMgr::set_query_session(uint64_t sessid, ObTableQueryAsyncSession *query_session) { int ret = OB_SUCCESS; bool force = false; @@ -185,7 +185,7 @@ int ObQuerySyncMgr::set_query_session(uint64_t sessid, ObTableQuerySyncSession * return ret; } -void ObQuerySyncMgr::clean_timeout_query_session() +void ObQueryAsyncMgr::clean_timeout_query_session() { int ret = OB_SUCCESS; common::ObSEArray session_id_array; @@ -195,7 +195,7 @@ void ObQuerySyncMgr::clean_timeout_query_session() } else { for (int64_t i = 0; i < session_id_array.count(); i++) { uint64_t sess_id = session_id_array.at(i); - ObTableQuerySyncSession *query_session = nullptr; + ObTableQueryAsyncSession *query_session = nullptr; get_locker(sess_id).lock(); if (OB_FAIL(query_session_map_.get_refactored(sess_id, query_session))) { LOG_DEBUG("query session already deleted by worker", K(ret), K(sess_id)); @@ -213,7 +213,7 @@ void ObQuerySyncMgr::clean_timeout_query_session() LOG_WARN("failed to rollback trans for query session", K(ret), K(sess_id)); } (void)query_session_map_.erase_refactored(sess_id); - OB_DELETE(ObTableQuerySyncSession, ObModIds::TABLE_PROC, query_session); + OB_DELETE(ObTableQueryAsyncSession, ObModIds::TABLE_PROC, query_session); // connection loses or bug exists LOG_WARN("clean timeout query session success", K(ret), K(sess_id)); } else { @@ -225,7 +225,7 @@ void ObQuerySyncMgr::clean_timeout_query_session() } } -int ObQuerySyncMgr::rollback_trans(ObTableQuerySyncSession &query_session) +int ObQueryAsyncMgr::rollback_trans(ObTableQueryAsyncSession &query_session) { int ret = OB_SUCCESS; sql::TransState &trans_state = query_session.trans_state_; @@ -247,32 +247,32 @@ int ObQuerySyncMgr::rollback_trans(ObTableQuerySyncSession &query_session) NG_TRACE(T_end_trans_end); } } - LOG_DEBUG("ObQuerySyncMgr::rollback_trans", KR(ret)); + LOG_DEBUG("ObQueryAsyncMgr::rollback_trans", KR(ret)); query_session.trans_desc_ = NULL; trans_state.reset(); return ret; } -ObQuerySyncMgr::ObQueryHashMap *ObQuerySyncMgr::get_query_session_map() +ObQueryAsyncMgr::ObQueryHashMap *ObQueryAsyncMgr::get_query_session_map() { return &query_session_map_; } -ObTableQuerySyncSession *ObQuerySyncMgr::alloc_query_session() +ObTableQueryAsyncSession *ObQueryAsyncMgr::alloc_query_session() { int ret = OB_SUCCESS; - ObTableQuerySyncSession *query_session = OB_NEW(ObTableQuerySyncSession, ObModIds::TABLE_PROC); + ObTableQueryAsyncSession *query_session = OB_NEW(ObTableQueryAsyncSession, ObModIds::TABLE_PROC); if (OB_ISNULL(query_session)) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate ObTableQuerySyncSession", K(ret)); + LOG_WARN("failed to allocate ObTableQueryAsyncSession", K(ret)); } else if (OB_FAIL(query_session->init())) { LOG_WARN("failed to init query session", K(ret)); - OB_DELETE(ObTableQuerySyncSession, ObModIds::TABLE_PROC, query_session); + OB_DELETE(ObTableQueryAsyncSession, ObModIds::TABLE_PROC, query_session); } return query_session; } -int ObQuerySyncMgr::ObGetAllSessionIdOp::operator()(QuerySessionPair &entry) { +int ObQueryAsyncMgr::ObGetAllSessionIdOp::operator()(QuerySessionPair &entry) { int ret = OB_SUCCESS; if (OB_FAIL(session_id_array_.push_back(entry.first))) { LOG_WARN("fail to push back query session id", K(ret)); @@ -281,9 +281,9 @@ int ObQuerySyncMgr::ObGetAllSessionIdOp::operator()(QuerySessionPair &entry) { } /** - * -------------------------------------- ObTableQuerySyncP ---------------------------------------- + * -------------------------------------- ObTableQueryAsyncP ---------------------------------------- */ -ObTableQuerySyncP::ObTableQuerySyncP(const ObGlobalContext &gctx) +ObTableQueryAsyncP::ObTableQueryAsyncP(const ObGlobalContext &gctx) : ObTableRpcProcessor(gctx), result_row_count_(0), query_session_id_(0), @@ -292,13 +292,13 @@ ObTableQuerySyncP::ObTableQuerySyncP(const ObGlobalContext &gctx) is_full_table_scan_(false) {} -int ObTableQuerySyncP::deserialize() +int ObTableQueryAsyncP::deserialize() { arg_.query_.set_deserialize_allocator(&allocator_); return ParentType::deserialize(); } -int ObTableQuerySyncP::check_arg() +int ObTableQueryAsyncP::check_arg() { int ret = OB_SUCCESS; if (arg_.query_type_ == ObQueryOperationType::QUERY_START && !arg_.query_.is_valid() && @@ -314,7 +314,7 @@ int ObTableQuerySyncP::check_arg() return ret; } -void ObTableQuerySyncP::audit_on_finish() +void ObTableQueryAsyncP::audit_on_finish() { audit_record_.consistency_level_ = ObTableConsistencyLevel::STRONG == arg_.consistency_level_ ? ObConsistencyLevel::STRONG @@ -325,7 +325,7 @@ void ObTableQuerySyncP::audit_on_finish() audit_record_.try_cnt_ = retry_count_ + 1; } -uint64_t ObTableQuerySyncP::get_request_checksum() +uint64_t ObTableQueryAsyncP::get_request_checksum() { uint64_t checksum = 0; checksum = ob_crc64(checksum, arg_.table_name_.ptr(), arg_.table_name_.length()); @@ -335,50 +335,50 @@ uint64_t ObTableQuerySyncP::get_request_checksum() return checksum; } -void ObTableQuerySyncP::reset_ctx() +void ObTableQueryAsyncP::reset_ctx() { result_row_count_ = 0; query_session_ = nullptr; ObTableApiProcessorBase::reset_ctx(); } -ObTableAPITransCb *ObTableQuerySyncP::new_callback(rpc::ObRequest *req) +ObTableAPITransCb *ObTableQueryAsyncP::new_callback(rpc::ObRequest *req) { UNUSED(req); return nullptr; } -int ObTableQuerySyncP::get_session_id(uint64_t &real_sessid, uint64_t arg_sessid) +int ObTableQueryAsyncP::get_session_id(uint64_t &real_sessid, uint64_t arg_sessid) { int ret = OB_SUCCESS; real_sessid = arg_sessid; if (ObQueryOperationType::QUERY_START == arg_.query_type_) { - real_sessid = ObQuerySyncMgr::get_instance().generate_query_sessid(); + real_sessid = ObQueryAsyncMgr::get_instance().generate_query_sessid(); } - if (OB_UNLIKELY(real_sessid == ObQuerySyncMgr::INVALID_SESSION_ID)) { + if (OB_UNLIKELY(real_sessid == ObQueryAsyncMgr::INVALID_SESSION_ID)) { ret = OB_ERR_UNKNOWN_SESSION_ID; LOG_WARN("session id is invalid", K(ret), K(real_sessid), K(arg_.query_type_)); } return ret; } -int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session) +int ObTableQueryAsyncP::get_query_session(uint64_t sessid, ObTableQueryAsyncSession *&query_session) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(sessid == ObQuerySyncMgr::INVALID_SESSION_ID)) { + if (OB_UNLIKELY(sessid == ObQueryAsyncMgr::INVALID_SESSION_ID)) { ret = OB_ERR_UNKNOWN_SESSION_ID; LOG_WARN("fail to get query session, session id is invalid", K(ret), K(sessid)); } else if (ObQueryOperationType::QUERY_START == arg_.query_type_) { // query start - query_session = ObQuerySyncMgr::get_instance().alloc_query_session(); + query_session = ObQueryAsyncMgr::get_instance().alloc_query_session(); if (OB_ISNULL(query_session)) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to allocate ObTableQuerySyncSession", K(ret), K(sessid)); - } else if (OB_FAIL(ObQuerySyncMgr::get_instance().set_query_session(sessid, query_session))) { + LOG_WARN("fail to allocate ObTableQueryAsyncSession", K(ret), K(sessid)); + } else if (OB_FAIL(ObQueryAsyncMgr::get_instance().set_query_session(sessid, query_session))) { LOG_WARN("fail to insert session to query map", K(ret), K(sessid)); - OB_DELETE(ObTableQuerySyncSession, ObModIds::TABLE_PROC, query_session); + OB_DELETE(ObTableQueryAsyncSession, ObModIds::TABLE_PROC, query_session); } else {} } else if (ObQueryOperationType::QUERY_NEXT == arg_.query_type_) { - if (OB_FAIL(ObQuerySyncMgr::get_instance().get_query_session(sessid, query_session))) { + if (OB_FAIL(ObQueryAsyncMgr::get_instance().get_query_session(sessid, query_session))) { LOG_WARN("fail to get query session from query sync mgr", K(ret), K(sessid)); } else if (OB_ISNULL(query_session)) { ret = OB_ERR_UNEXPECTED; @@ -400,11 +400,11 @@ int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSessio return ret; } -int ObTableQuerySyncP::init_tb_ctx(ObTableCtx &ctx) +int ObTableQueryAsyncP::init_tb_ctx(ObTableCtx &ctx) { int ret = OB_SUCCESS; ObIAllocator &allocator = *query_session_->get_allocator(); - ObTableQuerySyncCtx &query_ctx = query_session_->get_query_ctx(); + ObTableQueryAsyncCtx &query_ctx = query_session_->get_query_ctx(); ObExprFrameInfo &expr_frame_info = query_ctx.expr_frame_info_; bool is_weak_read = arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL; ctx.set_scan(true); @@ -440,11 +440,11 @@ int ObTableQuerySyncP::init_tb_ctx(ObTableCtx &ctx) return ret; } -int ObTableQuerySyncP::execute_query() +int ObTableQueryAsyncP::execute_query() { int ret = OB_SUCCESS; ObIAllocator *allocator = query_session_->get_allocator(); - ObTableQuerySyncCtx &query_ctx = query_session_->get_query_ctx(); + ObTableQueryAsyncCtx &query_ctx = query_session_->get_query_ctx(); ObTableQuery &query = query_session_->get_query(); ObTableApiSpec *spec = nullptr; ObTableApiExecutor *executor = nullptr; @@ -517,11 +517,11 @@ int ObTableQuerySyncP::execute_query() return ret; } -int ObTableQuerySyncP::query_scan_with_init() +int ObTableQueryAsyncP::query_scan_with_init() { int ret = OB_SUCCESS; ObArenaAllocator *allocator = query_session_->get_allocator(); - ObTableQuerySyncCtx &query_ctx = query_session_->get_query_ctx(); + ObTableQueryAsyncCtx &query_ctx = query_session_->get_query_ctx(); ObTableCtx &tb_ctx = query_ctx.tb_ctx_; ObTableQuery &query = query_session_->get_query(); @@ -551,11 +551,11 @@ int ObTableQuerySyncP::query_scan_with_init() return ret; } -int ObTableQuerySyncP::query_scan_without_init() +int ObTableQueryAsyncP::query_scan_without_init() { int ret = OB_SUCCESS; ObTableQueryResultIterator *result_iter = query_session_->get_result_iterator(); - ObTableQuerySyncCtx &query_ctx = query_session_->get_query_ctx(); + ObTableQueryAsyncCtx &query_ctx = query_session_->get_query_ctx(); ObTableCtx &tb_ctx = query_ctx.tb_ctx_; if (OB_ISNULL(result_iter)) { @@ -587,7 +587,7 @@ int ObTableQuerySyncP::query_scan_without_init() return ret; } -int ObTableQuerySyncP::process_query_start() +int ObTableQueryAsyncP::process_query_start() { int ret = OB_SUCCESS; if (OB_FAIL(query_scan_with_init())) { @@ -598,7 +598,7 @@ int ObTableQuerySyncP::process_query_start() return ret; } -int ObTableQuerySyncP::process_query_next() +int ObTableQueryAsyncP::process_query_next() { int ret = OB_SUCCESS; if (OB_FAIL(query_scan_without_init())) { @@ -609,7 +609,7 @@ int ObTableQuerySyncP::process_query_next() return ret; } -int ObTableQuerySyncP::try_process() +int ObTableQueryAsyncP::try_process() { int ret = OB_SUCCESS; if (OB_FAIL(check_query_type())) { @@ -655,12 +655,12 @@ int ObTableQuerySyncP::try_process() "receive_ts", get_receive_timestamp(), K_(result_row_count)); #endif - stat_event_type_ = ObTableProccessType::TABLE_API_TABLE_QUERY_SYNC; // table querysync + stat_event_type_ = ObTableProccessType::TABLE_API_TABLE_QUERY_ASYNC; // table querysync return ret; } // session.in_use_ must be true -int ObTableQuerySyncP::destory_query_session(bool need_rollback_trans) +int ObTableQueryAsyncP::destory_query_session(bool need_rollback_trans) { int ret = OB_SUCCESS; if (OB_FAIL(end_trans(need_rollback_trans, req_, timeout_ts_))) { @@ -668,25 +668,25 @@ int ObTableQuerySyncP::destory_query_session(bool need_rollback_trans) } int tmp_ret = ret; - ObQuerySyncMgr::get_instance().get_locker(query_session_id_).lock(); + ObQueryAsyncMgr::get_instance().get_locker(query_session_id_).lock(); if (OB_ISNULL(query_session_)) { //overwrite ret ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected null value", K(ret), KP_(query_session)); - } else if (OB_FAIL(ObQuerySyncMgr::get_instance().get_query_session_map()->erase_refactored(query_session_id_))) { + } else if (OB_FAIL(ObQueryAsyncMgr::get_instance().get_query_session_map()->erase_refactored(query_session_id_))) { LOG_WARN("fail to erase query session from query sync mgr", K(ret)); } else { ObTableQueryUtils::destroy_result_iterator(query_session_->get_result_iter()); - OB_DELETE(ObTableQuerySyncSession, ObModIds::TABLE_PROC, query_session_); + OB_DELETE(ObTableQueryAsyncSession, ObModIds::TABLE_PROC, query_session_); LOG_DEBUG("destory query session success", K(ret), K(query_session_id_)); } - ObQuerySyncMgr::get_instance().get_locker(query_session_id_).unlock(); + ObQueryAsyncMgr::get_instance().get_locker(query_session_id_).unlock(); ret = (OB_SUCCESS == ret) ? tmp_ret : ret; return ret; } -int ObTableQuerySyncP::check_query_type() +int ObTableQueryAsyncP::check_query_type() { int ret = OB_SUCCESS; if (arg_.query_type_ != table::ObQueryOperationType::QUERY_START && diff --git a/src/observer/table/ob_table_query_sync_processor.h b/src/observer/table/ob_table_query_async_processor.h similarity index 73% rename from src/observer/table/ob_table_query_sync_processor.h rename to src/observer/table/ob_table_query_async_processor.h index 99a85a5e98..b416cce31c 100644 --- a/src/observer/table/ob_table_query_sync_processor.h +++ b/src/observer/table/ob_table_query_async_processor.h @@ -10,8 +10,8 @@ * See the Mulan PubL v2 for more details. */ -#ifndef _OB_TABLE_QUERY_SYNC_PROCESSOR_H -#define _OB_TABLE_QUERY_SYNC_PROCESSOR_H 1 +#ifndef _OB_TABLE_QUERY_ASYNC_PROCESSOR_H +#define _OB_TABLE_QUERY_ASYNC_PROCESSOR_H 1 #include "rpc/obrpc/ob_rpc_proxy.h" #include "rpc/obrpc/ob_rpc_processor.h" #include "share/table/ob_table_rpc_proxy.h" @@ -30,17 +30,17 @@ namespace observer { /** - * ---------------------------------------- ObTableQuerySyncCtx ---------------------------------------- + * ---------------------------------------- ObTableQueryAsyncCtx ---------------------------------------- */ -struct ObTableQuerySyncCtx +struct ObTableQueryAsyncCtx { - explicit ObTableQuerySyncCtx(common::ObIAllocator &allocator) + explicit ObTableQueryAsyncCtx(common::ObIAllocator &allocator) : tb_ctx_(allocator), expr_frame_info_(allocator), spec_(nullptr), executor_(nullptr) {} - virtual ~ObTableQuerySyncCtx() + virtual ~ObTableQueryAsyncCtx() { row_iter_.close(); if (OB_NOT_NULL(spec_) && OB_NOT_NULL(executor_)) { @@ -73,14 +73,14 @@ private: }; /** - * ---------------------------------------- ObTableQuerySyncSession ---------------------------------------- + * ---------------------------------------- ObTableQueryAsyncSession ---------------------------------------- */ -class ObTableQuerySyncSession final +class ObTableQueryAsyncSession final { - friend class ObQuerySyncMgr; + friend class ObQueryAsyncMgr; public: - explicit ObTableQuerySyncSession() + explicit ObTableQueryAsyncSession() : in_use_(true), timeout_ts_(10000000), iterator_mementity_(nullptr), @@ -92,7 +92,7 @@ public: result_iterator_(nullptr), query_ctx_(allocator_) {} - ~ObTableQuerySyncSession() {} + ~ObTableQueryAsyncSession() {} void set_result_iterator(table::ObTableQueryResultIterator* iter); table::ObTableQueryResultIterator *get_result_iter() { return result_iterator_; }; @@ -110,7 +110,7 @@ public: int deep_copy_select_columns(const common::ObIArray &query_cols_names_, const common::ObIArray &tb_ctx_cols_names_); - ObTableQuerySyncCtx &get_query_ctx() { return query_ctx_; } + ObTableQueryAsyncCtx &get_query_ctx() { return query_ctx_; } public: sql::TransState* get_trans_state() {return &trans_state_;} transaction::ObTxDesc* get_trans_desc() {return trans_desc_;} @@ -125,7 +125,7 @@ private: table::ObTableQuery query_; // deep copy from arg_.query_ ObSEArray select_columns_; // deep copy from tb_ctx or query, which includes all the actual col names the user acquired table::ObTableQueryResultIterator *result_iterator_; - ObTableQuerySyncCtx query_ctx_; + ObTableQueryAsyncCtx query_ctx_; private: // txn control @@ -134,33 +134,33 @@ private: }; /** - * ------------------------------------ ObQuerySyncSessionRecycle ------------------------------------ + * ------------------------------------ ObQueryAsyncSessionRecycle ------------------------------------ */ -class ObQuerySyncSessionRecycle : public common::ObTimerTask +class ObQueryAsyncSessionRecycle : public common::ObTimerTask { public: - ObQuerySyncSessionRecycle(){} - virtual ~ObQuerySyncSessionRecycle(){} + ObQueryAsyncSessionRecycle(){} + virtual ~ObQueryAsyncSessionRecycle(){} virtual void runTimerTask(); private: void query_session_recycle(); - DISALLOW_COPY_AND_ASSIGN(ObQuerySyncSessionRecycle); + DISALLOW_COPY_AND_ASSIGN(ObQueryAsyncSessionRecycle); }; /** - * -----------------------------------Singleton ObQuerySyncMgr ------------------------------------- + * -----------------------------------Singleton ObQueryAsyncMgr ------------------------------------- */ -class ObQuerySyncMgr final +class ObQueryAsyncMgr final { - friend class ObTableQuerySyncP; + friend class ObTableQueryAsyncP; public: using ObQueryHashMap = - common::hash::ObHashMap; - using QuerySessionPair = common::hash::HashMapPair; - ~ObQuerySyncMgr() {} - static ObQuerySyncMgr &get_instance(); + common::hash::ObHashMap; + using QuerySessionPair = common::hash::HashMapPair; + ~ObQueryAsyncMgr() {} + static ObQueryAsyncMgr &get_instance(); struct ObGetAllSessionIdOp { explicit ObGetAllSessionIdOp(common::ObIArray& session_id_array) : session_id_array_(session_id_array) @@ -170,21 +170,21 @@ public: }; public: - int get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_sess_ctx); - int set_query_session(uint64_t sessid, ObTableQuerySyncSession *query_sess_ctx); + int get_query_session(uint64_t sessid, ObTableQueryAsyncSession *&query_sess_ctx); + int set_query_session(uint64_t sessid, ObTableQueryAsyncSession *query_sess_ctx); void clean_timeout_query_session(); public: ObQueryHashMap *get_query_session_map(); - ObTableQuerySyncSession *alloc_query_session(); + ObTableQueryAsyncSession *alloc_query_session(); uint64_t generate_query_sessid(); lib::ObMutex& get_locker(uint64_t sessid) { return locker_arr_[sessid % DEFAULT_LOCK_ARR_SIZE];} private: int init(); - int rollback_trans(ObTableQuerySyncSession &query_session); - ObQuerySyncMgr(); - DISALLOW_COPY_AND_ASSIGN(ObQuerySyncMgr); + int rollback_trans(ObTableQueryAsyncSession &query_session); + ObQueryAsyncMgr(); + DISALLOW_COPY_AND_ASSIGN(ObQueryAsyncMgr); private: static const uint64_t INVALID_SESSION_ID = 0; @@ -194,25 +194,25 @@ private: private: static int64_t once_; // for creating singleton instance - static ObQuerySyncMgr *instance_; + static ObQueryAsyncMgr *instance_; int64_t session_id_; ObQueryHashMap query_session_map_; lib::ObMutex locker_arr_[DEFAULT_LOCK_ARR_SIZE]; - ObQuerySyncSessionRecycle query_session_recycle_; + ObQueryAsyncSessionRecycle query_session_recycle_; common::ObTimer timer_; }; /** - * -------------------------------------- ObTableQuerySyncP ---------------------------------------- + * -------------------------------------- ObTableQueryAsyncP ---------------------------------------- */ -class ObTableQuerySyncP : +class ObTableQueryAsyncP : public ObTableRpcProcessor > { typedef ObTableRpcProcessor> ParentType; public: - explicit ObTableQuerySyncP(const ObGlobalContext &gctx); - virtual ~ObTableQuerySyncP() {} + explicit ObTableQueryAsyncP(const ObGlobalContext &gctx); + virtual ~ObTableQueryAsyncP() {} virtual int deserialize() override; protected: virtual int check_arg() override; @@ -227,11 +227,11 @@ private: int process_query_start(); int process_query_next(); int destory_query_session(bool need_rollback_trans); - DISALLOW_COPY_AND_ASSIGN(ObTableQuerySyncP); + DISALLOW_COPY_AND_ASSIGN(ObTableQueryAsyncP); private: int get_session_id(uint64_t &real_sessid, uint64_t arg_sessid); - int get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session); + int get_query_session(uint64_t sessid, ObTableQueryAsyncSession *&query_session); int query_scan_with_init(); int query_scan_without_init(); @@ -244,7 +244,7 @@ private: int64_t result_row_count_; uint64_t query_session_id_; ObArenaAllocator allocator_; - ObTableQuerySyncSession *query_session_; + ObTableQueryAsyncSession *query_session_; int64_t timeout_ts_; bool is_full_table_scan_; }; @@ -252,4 +252,4 @@ private: } // end namespace observer } // end namespace oceanbase -#endif /* _OB_TABLE_QUERY_SYNC_PROCESSOR_H */ +#endif /* _OB_TABLE_QUERY_ASYNC_PROCESSOR_H */ diff --git a/src/observer/table/ob_table_rpc_processor_util.h b/src/observer/table/ob_table_rpc_processor_util.h index 5fced5b298..76b1e4a0fa 100644 --- a/src/observer/table/ob_table_rpc_processor_util.h +++ b/src/observer/table/ob_table_rpc_processor_util.h @@ -34,6 +34,7 @@ enum ObTableProccessType TABLE_API_SINGLE_REPLACE, TABLE_API_SINGLE_INCREMENT, TABLE_API_SINGLE_APPEND, + TABLE_API_SINGLE_PUT, // table batch mutate TABLE_API_MULTI_INSERT, @@ -46,6 +47,7 @@ enum ObTableProccessType TABLE_API_MULTI_APPEND, TABLE_API_BATCH_RETRIVE, TABLE_API_BATCH_HYBRID, + TABLE_API_MULTI_PUT, // hbase mutate TABLE_API_HBASE_DELETE, @@ -59,7 +61,7 @@ enum ObTableProccessType // query TABLE_API_TABLE_QUERY, TABLE_API_HBASE_QUERY, - TABLE_API_TABLE_QUERY_SYNC, + TABLE_API_TABLE_QUERY_ASYNC, // query_and_mutate TABLE_API_QUERY_AND_MUTATE, @@ -269,11 +271,11 @@ public: break; // table query sync - case ObTableProccessType::TABLE_API_TABLE_QUERY_SYNC: + case ObTableProccessType::TABLE_API_TABLE_QUERY_ASYNC: EVENT_INC(TABLEAPI_QUERY_COUNT); EVENT_ADD(TABLEAPI_QUERY_TIME, elapsed_us); EVENT_ADD(TABLEAPI_QUERY_ROW, rows); - SET_AUDIT_SQL_STRING(table_query_sync); + SET_AUDIT_SQL_STRING(table_query_async); break; // table query_and_mutate diff --git a/src/share/table/ob_table.cpp b/src/share/table/ob_table.cpp index 189bcab719..4fb56c3d18 100644 --- a/src/share/table/ob_table.cpp +++ b/src/share/table/ob_table.cpp @@ -1701,7 +1701,7 @@ OB_SERIALIZE_MEMBER(ObTableQueryAndMutateResult, affected_rows_, affected_entity_); -OB_SERIALIZE_MEMBER((ObTableQuerySyncResult, ObTableQueryResult), +OB_SERIALIZE_MEMBER((ObTableQueryAsyncResult, ObTableQueryResult), is_end_, query_session_id_ ); diff --git a/src/share/table/ob_table.h b/src/share/table/ob_table.h index b7d185e57b..3c3804d48a 100644 --- a/src/share/table/ob_table.h +++ b/src/share/table/ob_table.h @@ -949,15 +949,15 @@ public: ObTableQueryResult affected_entity_; }; -class ObTableQuerySyncResult: public ObTableQueryResult +class ObTableQueryAsyncResult: public ObTableQueryResult { OB_UNIS_VERSION(1); public: - ObTableQuerySyncResult() + ObTableQueryAsyncResult() : is_end_(false), query_session_id_(0) {} - virtual ~ObTableQuerySyncResult() {} + virtual ~ObTableQueryAsyncResult() {} public: INHERIT_TO_STRING_KV("ObTableQueryResult", ObTableQueryResult, K_(is_end), K_(query_session_id)); public: diff --git a/src/share/table/ob_table_rpc_proxy.h b/src/share/table/ob_table_rpc_proxy.h index 294724ea1f..e69bfed33f 100644 --- a/src/share/table/ob_table_rpc_proxy.h +++ b/src/share/table/ob_table_rpc_proxy.h @@ -30,7 +30,7 @@ public: RPC_S(PR5 batch_execute, obrpc::OB_TABLE_API_BATCH_EXECUTE, (table::ObTableBatchOperationRequest), table::ObTableBatchOperationResult); RPC_SS(PR5 execute_query, obrpc::OB_TABLE_API_EXECUTE_QUERY, (table::ObTableQueryRequest), table::ObTableQueryResult); RPC_S(PR5 query_and_mutate, obrpc::OB_TABLE_API_QUERY_AND_MUTATE, (table::ObTableQueryAndMutateRequest), table::ObTableQueryAndMutateResult); - RPC_S(PR5 execute_query_sync, obrpc::OB_TABLE_API_EXECUTE_QUERY_ASYNC, (table::ObTableQuerySyncRequest), table::ObTableQuerySyncResult); + RPC_S(PR5 execute_query_async, obrpc::OB_TABLE_API_EXECUTE_QUERY_ASYNC, (table::ObTableQueryAsyncRequest), table::ObTableQueryAsyncResult); RPC_S(PR5 direct_load, obrpc::OB_TABLE_API_DIRECT_LOAD, (table::ObTableDirectLoadRequest), table::ObTableDirectLoadResult); }; diff --git a/src/share/table/ob_table_rpc_struct.cpp b/src/share/table/ob_table_rpc_struct.cpp index 270158d221..72f921b1e3 100644 --- a/src/share/table/ob_table_rpc_struct.cpp +++ b/src/share/table/ob_table_rpc_struct.cpp @@ -87,7 +87,7 @@ OB_SERIALIZE_MEMBER(ObTableQueryAndMutateRequest, query_and_mutate_, binlog_row_image_type_); -OB_SERIALIZE_MEMBER((ObTableQuerySyncRequest, ObTableQueryRequest), +OB_SERIALIZE_MEMBER((ObTableQueryAsyncRequest, ObTableQueryRequest), query_session_id_, query_type_ ); diff --git a/src/share/table/ob_table_rpc_struct.h b/src/share/table/ob_table_rpc_struct.h index c350a5aa8e..d603c1a8f1 100644 --- a/src/share/table/ob_table_rpc_struct.h +++ b/src/share/table/ob_table_rpc_struct.h @@ -252,15 +252,15 @@ public: ObBinlogRowImageType binlog_row_image_type_; }; -class ObTableQuerySyncRequest : public ObTableQueryRequest +class ObTableQueryAsyncRequest : public ObTableQueryRequest { OB_UNIS_VERSION(1); public: - ObTableQuerySyncRequest() + ObTableQueryAsyncRequest() :query_session_id_(0), query_type_(ObQueryOperationType::QUERY_MAX) {} - virtual ~ObTableQuerySyncRequest(){} + virtual ~ObTableQueryAsyncRequest(){} INHERIT_TO_STRING_KV("ObTableQueryRequest", ObTableQueryRequest, K_(query_session_id), K_(query_type)); diff --git a/unittest/observer/table/hfilter_parser.test b/unittest/observer/table/hfilter_parser.test index 6c467f2eb3..0feb1361d4 100644 --- a/unittest/observer/table/hfilter_parser.test +++ b/unittest/observer/table/hfilter_parser.test @@ -12,25 +12,25 @@ RowFilter(<=, 'binaryprefix:abc') AND RowFilter(!=, 'binaryprefix:abc') OR Value RowFilter(<=, 'binaryprefix:abc') OR RowFilter(!=, 'binaryprefix:abc') AND ValueFilter(=, 'substring:abc*123') (RowFilter(<=, 'binaryprefix:abc') OR RowFilter(!=, 'binaryprefix:abc')) AND ValueFilter(=, 'substring:abc*123') Skip While Skip RowFilter(<=, 'binaryprefix:abc') OR RowFilter(!=, 'binaryprefix:abc') AND ValueFilter(=, 'substring:abc*123') ---error 5006 +--error 10514 RowFilter(<=>, 'binary:abc') ---error 5006 +--error 10514 RowF(<, 'binary:abc') ---error 5006 +--error 10514 RowFilter(<, 'binary ---error 5006 +--error 10514 RowFilter(<, 'binary'') ---error 5006 +--error 10514 RowFilter(<, 'binary''') ---error 5006 +--error 10514 RowFilter(<, 'binary:abc')) ---error 5006 +--error 10514 (RowFilter(<, 'binary:abc') ---error 5006 +--error 10514 RowFilter(>, 'substring:abc') ---error 5006 +--error 10514 RowFilter(>=, 'regexstring:abc') ---error 5006 +--error 10514 ( singlecolumnvaluefilter ( !=, 'substring:abc', 'cf1', 'c1') ) ( singlecolumnvaluefilter ( 'cf1', 'c1', !=, 'substring:abc') ) PrefixFilter ( 'abc' )