From dad56a7695ba6dab40ecaa0beaa29f95267d904f Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 6 Feb 2023 23:02:50 +0800 Subject: [PATCH] [CP]htable support query async --- src/libtable/src/ob_table_rpc_impl.cpp | 8 +- src/libtable/test/ob_batch_execute_test.cpp | 100 ++++++++++++++++++ src/libtable/test/run_test_table_api.sh | 2 + .../table/ob_htable_filter_operator.cpp | 81 +++++++------- .../table/ob_htable_filter_operator.h | 5 +- .../table/ob_table_query_sync_processor.cpp | 98 +++-------------- .../table/ob_table_query_sync_processor.h | 12 +-- src/observer/table/ob_table_service.h | 4 +- src/share/table/ob_table_rpc_struct.h | 12 ++- 9 files changed, 183 insertions(+), 139 deletions(-) diff --git a/src/libtable/src/ob_table_rpc_impl.cpp b/src/libtable/src/ob_table_rpc_impl.cpp index 75f2002662..df0bb7b7db 100644 --- a/src/libtable/src/ob_table_rpc_impl.cpp +++ b/src/libtable/src/ob_table_rpc_impl.cpp @@ -210,7 +210,7 @@ int ObTableRpcImpl::aggregate_tablet_by_server(const ObIArray if (OB_FAIL(server_tablet_map.foreach_refactored(free_idx_array_fn))) { LOG_WARN("failed to do foreach", K(ret)); } - ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret; + ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret; return ret; } @@ -560,7 +560,7 @@ int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestO ObTableQuerySyncRequest request; request.query_ = query; request.table_name_ = table_name_; - request.table_id_ = table_id_; + request.table_id_ = table_id_; request.tablet_id_ = tablet_id; request.credential_ = client_->get_credential(); request.entity_type_ = this->entity_type_; @@ -586,7 +586,7 @@ int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestO } } } - return ret; + return ret; } int ObTableRpcImpl::query_next(const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) { @@ -621,5 +621,5 @@ int ObTableRpcImpl::query_next(const ObTableRequestOptions &request_options, ObT LOG_WARN("failed to execute rpc call for query next", K(ret)); } } - return ret; + return ret; } diff --git a/src/libtable/test/ob_batch_execute_test.cpp b/src/libtable/test/ob_batch_execute_test.cpp index 581a5d1900..bf757f26af 100644 --- a/src/libtable/test/ob_batch_execute_test.cpp +++ b/src/libtable/test/ob_batch_execute_test.cpp @@ -8835,6 +8835,106 @@ TEST_F(TestBatchExecute, query_sync_multi_batch) the_table = NULL; } +// create table if not exists htable1_query_sync ( +// K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), +// primary key(K, Q, T)); +TEST_F(TestBatchExecute, htble_query_sync) +{ + // setup + ObTable *the_table = NULL; + int ret = service_client_->alloc_table(ObString::make_string("htable1_query_sync"), the_table); + ASSERT_EQ(OB_SUCCESS, ret); + the_table->set_entity_type(ObTableEntityType::ET_HKV); // important + ObTableEntityFactory entity_factory; + ObTableBatchOperation batch_operation; + ObITableEntity *entity = NULL; + //////////////////////////////////////////////////////////////// + static constexpr int64_t VERSIONS_COUNT = 10; + DefaultBuf *rows = new (std::nothrow) DefaultBuf[BATCH_SIZE]; + ASSERT_TRUE(NULL != rows); + ObObj key1, key2, key3; + ObObj value; + for (int64_t i = 0; i < BATCH_SIZE; ++i) { + sprintf(rows[i], "row%ld", i); + key1.set_varbinary(ObString::make_string(rows[i])); + key2.set_varbinary(ObString::make_string("")); // empty qualifier + for (int64_t k = 0; k < VERSIONS_COUNT; ++k) + { + key3.set_int(k); + entity = entity_factory.alloc(); + ASSERT_TRUE(NULL != entity); + ASSERT_EQ(OB_SUCCESS, entity->add_rowkey_value(key1)); + ASSERT_EQ(OB_SUCCESS, entity->add_rowkey_value(key2)); + ASSERT_EQ(OB_SUCCESS, entity->add_rowkey_value(key3)); + value.set_varbinary(ObString::make_string("value_string")); + ASSERT_EQ(OB_SUCCESS, entity->set_property(V, value)); + ASSERT_EQ(OB_SUCCESS, batch_operation.insert(*entity)); + } // end for + } // end for + + ASSERT_TRUE(!batch_operation.is_readonly()); + ASSERT_TRUE(batch_operation.is_same_type()); + ASSERT_TRUE(batch_operation.is_same_properties_names()); + ObTableBatchOperationResult result; + ASSERT_EQ(OB_SUCCESS, the_table->batch_execute(batch_operation, result)); + OB_LOG(INFO, "batch execute result", K(result)); + ASSERT_EQ(BATCH_SIZE*VERSIONS_COUNT, result.count()); + //////////////////////////////////////////////////////////////// + ObTableQuery query; + ASSERT_EQ(OB_SUCCESS, query.add_select_column(K)); + ASSERT_EQ(OB_SUCCESS, query.add_select_column(Q)); + ASSERT_EQ(OB_SUCCESS, query.add_select_column(T)); + ASSERT_EQ(OB_SUCCESS, query.add_select_column(V)); + ObObj pk_objs_start[3]; + pk_objs_start[0].set_min_value(); + pk_objs_start[1].set_min_value(); + pk_objs_start[2].set_min_value(); + ObObj pk_objs_end[3]; + pk_objs_end[0].set_max_value(); + pk_objs_end[1].set_max_value(); + pk_objs_end[2].set_max_value(); + ObNewRange range; + range.start_key_.assign(pk_objs_start, 3); + range.end_key_.assign(pk_objs_end, 3); + range.border_flag_.set_inclusive_start(); + range.border_flag_.set_inclusive_end(); + ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range)); + + ObHTableFilter &htable_filter = query.htable_filter(); + ASSERT_EQ(OB_SUCCESS, htable_filter.add_column(ObString::make_string(""))); + htable_filter.set_max_versions(2); + htable_filter.set_valid(true); + const int64_t query_round = 4; + query.set_batch(50); + + ObTableQuerySyncResult *iter = nullptr; + const ObITableEntity *result_entity = NULL; + uint64_t result_cnt = 0; + uint64_t round = 0; + ASSERT_EQ(OB_SUCCESS, the_table->query_start(query, iter)); + while (OB_SUCC(iter->get_next_entity(result_entity))) { + ++result_cnt; + } + ASSERT_EQ(OB_ITER_END, ret); + while (OB_SUCC(the_table->query_next(iter))) { + ++round; + // printf("iterator row count: %ld\n", iter->get_row_count()); + while (OB_SUCC(iter->get_next_entity(result_entity))) { + ++result_cnt; + } + printf("round: %ld\n", round); + ASSERT_EQ(OB_ITER_END, ret); + } + ASSERT_EQ(OB_ITER_END, ret); + ASSERT_EQ(200, result_cnt); // set_max_versions(2),故只有200条 + ASSERT_EQ(round, query_round - 1); // start已经扫描了一次 + //////////////////////////////////////////////////////////////// + // teardown + service_client_->free_table(the_table); + the_table = NULL; + delete [] rows; +} + // create table if not exists large_scan_query_sync_test (C1 bigint primary key, C2 bigint, C3 varchar(100)); TEST_F(TestBatchExecute, large_scan_query_sync) { diff --git a/src/libtable/test/run_test_table_api.sh b/src/libtable/test/run_test_table_api.sh index aa30e1960f..95579f7aed 100755 --- a/src/libtable/test/run_test_table_api.sh +++ b/src/libtable/test/run_test_table_api.sh @@ -81,6 +81,7 @@ mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_increment; mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_increment_empty; create table if not exists htable1_cf1_increment_empty like htable1_cf1" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_append; create table if not exists htable1_cf1_append like htable1_cf1" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_empty_cq; create table if not exists htable1_cf1_empty_cq (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_query_sync; create table if not exists htable1_query_sync (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db # run ./test_table_api "$HOST" "$PORT" "$tenant_name" "$user_name" "$passwd" "$db" "$table_name" $RPCPORT @@ -137,6 +138,7 @@ mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_append; cr mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_reverse; create table if not exists htable1_cf1_reverse like htable1_cf1" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_ttl; create table if not exists htable1_cf1_ttl (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T)) comment='{\"HColumnDescriptor\": {\"TimeToLive\": 5}}'" $db mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_empty_cq; create table if not exists htable1_cf1_empty_cq (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db +mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_query_sync; create table if not exists htable1_query_sync (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db # run ./test_table_api "$HOST" "$PORT" "$tenant_name" "$user_name" "$passwd" "$db" "$table_name" $RPCPORT diff --git a/src/observer/table/ob_htable_filter_operator.cpp b/src/observer/table/ob_htable_filter_operator.cpp index a652637048..eaca1e9ffb 100644 --- a/src/observer/table/ob_htable_filter_operator.cpp +++ b/src/observer/table/ob_htable_filter_operator.cpp @@ -615,25 +615,26 @@ int ObHTableScanMatcher::set_to_new_row(const ObHTableCell &arg_curr_row) //////////////////////////////////////////////////////////////// ObHTableRowIterator::ObHTableRowIterator(const ObTableQuery &query) - :child_op_(NULL), - htable_filter_(query.get_htable_filter()), - hfilter_(NULL), - limit_per_row_per_cf_(htable_filter_.get_max_results_per_column_family()), - offset_per_row_per_cf_(htable_filter_.get_row_offset_per_column_family()), - max_result_size_(query.get_max_result_size()), - batch_size_(query.get_batch()), - time_to_live_(0), - curr_cell_(), - allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), - column_tracker_(NULL), - matcher_(NULL), - column_tracker_wildcard_(), - column_tracker_explicit_(), - matcher_impl_(htable_filter_), - scan_order_(query.get_scan_order()), - cell_count_(0), - count_per_row_(0), - has_more_cells_(true) + : ObTableQueryResultIterator(&query), + child_op_(NULL), + htable_filter_(query.get_htable_filter()), + hfilter_(NULL), + limit_per_row_per_cf_(htable_filter_.get_max_results_per_column_family()), + offset_per_row_per_cf_(htable_filter_.get_row_offset_per_column_family()), + max_result_size_(query.get_max_result_size()), + batch_size_(query.get_batch()), + time_to_live_(0), + curr_cell_(), + allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), + column_tracker_(NULL), + matcher_(NULL), + column_tracker_wildcard_(), + column_tracker_explicit_(), + matcher_impl_(htable_filter_), + scan_order_(query.get_scan_order()), + cell_count_(0), + count_per_row_(0), + has_more_cells_(true) {} ObHTableRowIterator::~ObHTableRowIterator() @@ -1027,14 +1028,14 @@ void ObHTableRowIterator::set_ttl(int32_t ttl_value) //////////////////////////////////////////////////////////////// ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query, table::ObTableQueryResult &one_result) - :query_(query), - row_iterator_(query), - one_result_(one_result), - hfilter_(NULL), - batch_size_(query.get_batch()), - max_result_size_(std::min(query.get_max_result_size(), - static_cast(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))), - is_first_result_(true) + : ObTableQueryResultIterator(&query), + row_iterator_(query), + one_result_(&one_result), + hfilter_(NULL), + batch_size_(query.get_batch()), + max_result_size_(std::min(query.get_max_result_size(), + static_cast(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))), + is_first_result_(true) { } @@ -1042,26 +1043,28 @@ ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query, int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result) { int ret = OB_SUCCESS; - if (is_first_result_) { - is_first_result_ = false; - if (0 != one_result_.get_property_count()) { + if (is_first_result_ || is_query_sync_) { + if (is_first_result_) { + is_first_result_ = false; + } + if (0 != one_result_->get_property_count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("property should be empty", K(ret)); } - const ObIArray &select_columns = query_.get_select_columns(); + const ObIArray &select_columns = query_->get_select_columns(); const int64_t N = select_columns.count(); for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { - if (OB_FAIL(one_result_.add_property_name(select_columns.at(i)))) { + if (OB_FAIL(one_result_->add_property_name(select_columns.at(i)))) { LOG_WARN("failed to copy name", K(ret)); } } // end for } else { - one_result_.reset_except_property(); + one_result_->reset_except_property(); } if (OB_SUCC(ret)) { bool has_filter_row = (NULL != hfilter_) && (hfilter_->has_filter_row()); - next_result = &one_result_; + next_result = one_result_; ObTableQueryResult *htable_row = nullptr; // ObNewRow first_entity; // ObObj first_entity_cells[4]; @@ -1116,14 +1119,14 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result) } /* @todo check batch limit and size limit */ // We have got one hbase row, store it to this batch - if (OB_FAIL(one_result_.add_all_row(*htable_row))) { + if (OB_FAIL(one_result_->add_all_row(*htable_row))) { LOG_WARN("failed to add cells to row", K(ret)); } if (NULL != hfilter_) { hfilter_->reset(); } if (OB_SUCC(ret)) { - if (one_result_.reach_batch_size_or_result_size(batch_size_, max_result_size_)) { + if (one_result_->reach_batch_size_or_result_size(batch_size_, max_result_size_)) { break; } } @@ -1133,17 +1136,17 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result) } } if (OB_ITER_END == ret - && one_result_.get_row_count() > 0) { + && one_result_->get_row_count() > 0) { ret = OB_SUCCESS; } - LOG_DEBUG("[yzfdebug] get_next_result", K(ret), "row_count", one_result_.get_row_count()); + LOG_DEBUG("[yzfdebug] get_next_result", K(ret), "row_count", one_result_->get_row_count()); return ret; } int ObHTableFilterOperator::parse_filter_string(common::ObArenaAllocator* allocator) { int ret = OB_SUCCESS; - const ObString &hfilter_string = query_.get_htable_filter().get_filter(); + const ObString &hfilter_string = query_->get_htable_filter().get_filter(); if (hfilter_string.empty()) { hfilter_ = NULL; } else if (NULL == allocator) { diff --git a/src/observer/table/ob_htable_filter_operator.h b/src/observer/table/ob_htable_filter_operator.h index 1b2b6ba0b8..f45db8e1e1 100644 --- a/src/observer/table/ob_htable_filter_operator.h +++ b/src/observer/table/ob_htable_filter_operator.h @@ -233,14 +233,15 @@ public: /// Fetch next batch result virtual int get_next_result(ObTableQueryResult *&one_result) override; virtual bool has_more_result() const override { return row_iterator_.has_more_result(); } + virtual table::ObTableQueryResult *get_one_result() override { return one_result_; } + virtual void set_one_result(ObTableQueryResult *result) override {one_result_ = result;} void set_scan_result(table::ObTableApiScanRowIterator *scan_result) { row_iterator_.set_scan_result(scan_result); } void set_ttl(int32_t ttl_value) { row_iterator_.set_ttl(ttl_value); } // parse the filter string int parse_filter_string(common::ObArenaAllocator* allocator); private: - const ObTableQuery &query_; ObHTableRowIterator row_iterator_; - table::ObTableQueryResult &one_result_; + table::ObTableQueryResult *one_result_; table::ObHTableFilterParser filter_parser_; table::hfilter::Filter *hfilter_; int32_t batch_size_; diff --git a/src/observer/table/ob_table_query_sync_processor.cpp b/src/observer/table/ob_table_query_sync_processor.cpp index 33bf182101..361f119f90 100644 --- a/src/observer/table/ob_table_query_sync_processor.cpp +++ b/src/observer/table/ob_table_query_sync_processor.cpp @@ -34,7 +34,7 @@ using namespace oceanbase::sql; /** * ---------------------------------------- ObTableQuerySyncSession ---------------------------------------- */ -void ObTableQuerySyncSession::set_result_iterator(ObNormalTableQueryResultIterator *query_result) +void ObTableQuerySyncSession::set_result_iterator(ObTableQueryResultIterator *query_result) { result_iterator_ = query_result; if (OB_NOT_NULL(result_iterator_)) { @@ -336,32 +336,6 @@ ObTableAPITransCb *ObTableQuerySyncP::new_callback(rpc::ObRequest *req) return nullptr; } -int ObTableQuerySyncP::get_tablet_ids(uint64_t table_id, ObIArray &tablet_ids) -{ - int ret = OB_SUCCESS; - ObTabletID tablet_id = arg_.tablet_id_; - share::schema::ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = NULL; - if (!tablet_id.is_valid()) { - if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) { - LOG_WARN("failed to get schema guard", K(ret)); - } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { - LOG_WARN("failed to get table schema", K(ret), K(table_id), K(table_schema)); - } else if (!table_schema->is_partitioned_table()) { - tablet_id = table_schema->get_tablet_id(); - } else { - ret = OB_NOT_SUPPORTED; - LOG_WARN("partitioned table not supported", K(ret), K(table_id)); - } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(tablet_ids.push_back(tablet_id))) { - LOG_WARN("failed to push back", K(ret)); - } - } - return ret; -} - int ObTableQuerySyncP::get_session_id(uint64_t &real_sessid, uint64_t arg_sessid) { int ret = OB_SUCCESS; @@ -414,59 +388,6 @@ int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSessio return ret; } -int ObTableQuerySyncP::query_scan_with_old_context(const int64_t timeout) -{ - int ret = OB_SUCCESS; - ObTableQueryResultIterator *result_iterator = query_session_->get_result_iterator(); - if (OB_ISNULL(result_iterator)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("query result iterator null", K(ret)); - } else { - ObTableQueryResult *query_result = nullptr; - result_iterator->set_one_result(&result_); // set result_ as container - if (ObTimeUtility::current_time() > timeout) { - ret = OB_TRANS_TIMEOUT; - LOG_WARN("exceed operatiton timeout", K(ret)); - } else if (OB_FAIL(result_iterator->get_next_result(query_result))) { - if (OB_ITER_END == ret) { - result_.is_end_ = true; // set scan end - ret = OB_SUCCESS; - } else { - LOG_WARN("fail to scan result", K(ret)); - } - } else { - result_.is_end_ = !result_iterator->has_more_result(); - } - } - return ret; -} - -int ObTableQuerySyncP::query_scan_with_new_context( - ObTableQuerySyncSession *query_session, table::ObTableQueryResultIterator *result_iterator, const int64_t timeout) -{ - int ret = OB_SUCCESS; - ObTableQueryResult *query_result = nullptr; - if (ObTimeUtility::current_time() > timeout) { - ret = OB_TRANS_TIMEOUT; - LOG_WARN("exceed operatiton timeout", K(ret), K(rpc_pkt_)->get_timeout()); - } else if (OB_ISNULL(result_iterator)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected null result iterator", K(ret)); - } else if (OB_FAIL(result_iterator->get_next_result(query_result))) { - if (OB_ITER_END == ret) { // scan to end - ret = OB_SUCCESS; - result_.is_end_ = true; - } - } else if (result_iterator->has_more_result()){ - result_.is_end_ = false; - query_session->set_result_iterator(dynamic_cast(result_iterator)); - query_session->set_trans_desc(trans_desc_); // save processor's trans_desc_ to query session - } else { - result_.is_end_ = true; - } - return ret; -} - int ObTableQuerySyncP::init_tb_ctx(ObTableCtx &ctx) { int ret = OB_SUCCESS; @@ -526,6 +447,19 @@ int ObTableQuerySyncP::execute_query(ObTableQuerySyncSession &query_session) result_iter = htable_result_iter; } } + + if (OB_SUCC(ret)) { + // hbase model, compress the result packet + ObCompressorType compressor_type = INVALID_COMPRESSOR; + if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type( + GCONF.tableapi_transport_compress_func, compressor_type))) { + compressor_type = INVALID_COMPRESSOR; + } else if (NONE_COMPRESSOR == compressor_type) { + compressor_type = INVALID_COMPRESSOR; + } + this->set_result_compress_type(compressor_type); + ret = OB_SUCCESS; // reset ret + } } else { normal_result_iter = OB_NEWx(ObNormalTableQueryResultIterator, (allocator), query, result_); if (OB_ISNULL(normal_result_iter)) { @@ -571,7 +505,7 @@ int ObTableQuerySyncP::execute_query(ObTableQuerySyncSession &query_session) // 4. do scan and save result iter if (OB_SUCC(ret)) { ObTableQueryResult *one_result = nullptr; - query_session.set_result_iterator(dynamic_cast(result_iter)); + query_session.set_result_iterator(result_iter); if (ObTimeUtility::current_time() > timeout_ts_) { ret = OB_TRANS_TIMEOUT; LOG_WARN("exceed operatiton timeout", K(ret)); @@ -760,7 +694,7 @@ int ObTableQuerySyncP::deep_copy_result_property_names() { int ret = OB_SUCCESS; ObTableQuery &query = query_session_->get_query(); - ObNormalTableQueryResultIterator *result_iterator = query_session_->get_result_iterator(); + ObTableQueryResultIterator *result_iterator = query_session_->get_result_iterator(); if ((OB_NOT_NULL(result_iterator))) { const ObIArray &select_columns = query.get_select_columns(); ObTableQueryResult *one_result = result_iterator->get_one_result(); diff --git a/src/observer/table/ob_table_query_sync_processor.h b/src/observer/table/ob_table_query_sync_processor.h index d6046ae34b..0eb74bf2ad 100644 --- a/src/observer/table/ob_table_query_sync_processor.h +++ b/src/observer/table/ob_table_query_sync_processor.h @@ -74,13 +74,13 @@ public: {} ~ObTableQuerySyncSession(); - void set_result_iterator(ObNormalTableQueryResultIterator* iter); + void set_result_iterator(table::ObTableQueryResultIterator* iter); void set_in_use(bool in_use) {in_use_ = in_use;} bool is_in_use() {return in_use_;} int init(); void set_timout_ts(uint64_t timeout_ts) { timeout_ts_ = timeout_ts; } - ObNormalTableQueryResultIterator *get_result_iterator() { return result_iterator_; } + table::ObTableQueryResultIterator *get_result_iterator() { return result_iterator_; } ObArenaAllocator *get_allocator() {return &allocator_;} common::ObObjectID get_tenant_id() { return tenant_id_; } table::ObTableQuery &get_query() { return query_; } @@ -95,7 +95,7 @@ private: uint64_t timeout_ts_; common::ObObjectID tenant_id_; ObTableQuery query_; // deep copy from arg_.query_ - ObNormalTableQueryResultIterator *result_iterator_; + table::ObTableQueryResultIterator *result_iterator_; ObArenaAllocator allocator_; ObTableQuerySyncCtx query_ctx_; lib::MemoryContext iterator_mementity_; @@ -199,22 +199,16 @@ protected: private: int process_query_start(); int process_query_next(); - int process_query_end(); int destory_query_session(bool need_rollback_trans); DISALLOW_COPY_AND_ASSIGN(ObTableQuerySyncP); private: - int get_tablet_ids(uint64_t table_id, ObIArray &tablet_ids); int get_session_id(uint64_t &real_sessid, uint64_t arg_sessid); int get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session); int query_scan_with_init(); int query_scan_without_init(); - int query_scan_with_old_context(const int64_t timeout); - int query_scan_with_new_context(ObTableQuerySyncSession * session_ctx, table::ObTableQueryResultIterator *result_iterator, - const int64_t timeout); private: - void set_trans_from_session(ObTableQuerySyncSession *query_session); int check_query_type(); int init_tb_ctx(table::ObTableCtx &ctx); int execute_query(ObTableQuerySyncSession &query_session); diff --git a/src/observer/table/ob_table_service.h b/src/observer/table/ob_table_service.h index f3cb964e2e..3e10939da2 100644 --- a/src/observer/table/ob_table_service.h +++ b/src/observer/table/ob_table_service.h @@ -134,8 +134,8 @@ public: virtual int get_next_result(table::ObTableQueryResult *&one_result) override; virtual bool has_more_result() const override; void set_scan_result(table::ObTableApiScanRowIterator *scan_result) { scan_result_ = scan_result; } - virtual void set_one_result(ObTableQueryResult *result) {one_result_ = result;} - table::ObTableQueryResult *get_one_result() { return one_result_; } + virtual void set_one_result(ObTableQueryResult *result) override {one_result_ = result;} + virtual table::ObTableQueryResult *get_one_result() override { return one_result_; } void set_query(const ObTableQuery *query) {query_ = query;} void set_query_sync() { is_query_sync_ = true ; } private: diff --git a/src/share/table/ob_table_rpc_struct.h b/src/share/table/ob_table_rpc_struct.h index 37d358b302..669e874e10 100644 --- a/src/share/table/ob_table_rpc_struct.h +++ b/src/share/table/ob_table_rpc_struct.h @@ -209,11 +209,21 @@ public: class ObTableQueryResultIterator { public: - ObTableQueryResultIterator() {} + ObTableQueryResultIterator(const ObTableQuery *query = nullptr) + : query_(query), + is_query_sync_(false) + { + } virtual ~ObTableQueryResultIterator() {} virtual int get_next_result(ObTableQueryResult *&one_result) = 0; virtual bool has_more_result() const = 0; virtual void set_one_result(ObTableQueryResult *result){ UNUSED(result); } + virtual ObTableQueryResult *get_one_result() { return nullptr; } + virtual void set_query(const ObTableQuery *query) { query_ = query; }; + virtual void set_query_sync() { is_query_sync_ = true; } +protected: + const ObTableQuery *query_; + bool is_query_sync_; }; class ObTableQueryAndMutateRequest final