[CP]htable support query async
This commit is contained in:
@ -210,7 +210,7 @@ int ObTableRpcImpl::aggregate_tablet_by_server(const ObIArray<ObTabletLocation>
|
||||
if (OB_FAIL(server_tablet_map.foreach_refactored(free_idx_array_fn))) {
|
||||
LOG_WARN("failed to do foreach", K(ret));
|
||||
}
|
||||
ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret;
|
||||
ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -560,7 +560,7 @@ int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestO
|
||||
ObTableQuerySyncRequest request;
|
||||
request.query_ = query;
|
||||
request.table_name_ = table_name_;
|
||||
request.table_id_ = table_id_;
|
||||
request.table_id_ = table_id_;
|
||||
request.tablet_id_ = tablet_id;
|
||||
request.credential_ = client_->get_credential();
|
||||
request.entity_type_ = this->entity_type_;
|
||||
@ -586,7 +586,7 @@ int ObTableRpcImpl::query_start(const ObTableQuery& query, const ObTableRequestO
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableRpcImpl::query_next(const ObTableRequestOptions &request_options, ObTableQuerySyncResult *&result) {
|
||||
@ -621,5 +621,5 @@ int ObTableRpcImpl::query_next(const ObTableRequestOptions &request_options, ObT
|
||||
LOG_WARN("failed to execute rpc call for query next", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return ret;
|
||||
}
|
||||
|
@ -8835,6 +8835,106 @@ TEST_F(TestBatchExecute, query_sync_multi_batch)
|
||||
the_table = NULL;
|
||||
}
|
||||
|
||||
// create table if not exists htable1_query_sync (
|
||||
// K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024),
|
||||
// primary key(K, Q, T));
|
||||
TEST_F(TestBatchExecute, htble_query_sync)
|
||||
{
|
||||
// setup
|
||||
ObTable *the_table = NULL;
|
||||
int ret = service_client_->alloc_table(ObString::make_string("htable1_query_sync"), the_table);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
the_table->set_entity_type(ObTableEntityType::ET_HKV); // important
|
||||
ObTableEntityFactory<ObTableEntity> entity_factory;
|
||||
ObTableBatchOperation batch_operation;
|
||||
ObITableEntity *entity = NULL;
|
||||
////////////////////////////////////////////////////////////////
|
||||
static constexpr int64_t VERSIONS_COUNT = 10;
|
||||
DefaultBuf *rows = new (std::nothrow) DefaultBuf[BATCH_SIZE];
|
||||
ASSERT_TRUE(NULL != rows);
|
||||
ObObj key1, key2, key3;
|
||||
ObObj value;
|
||||
for (int64_t i = 0; i < BATCH_SIZE; ++i) {
|
||||
sprintf(rows[i], "row%ld", i);
|
||||
key1.set_varbinary(ObString::make_string(rows[i]));
|
||||
key2.set_varbinary(ObString::make_string("")); // empty qualifier
|
||||
for (int64_t k = 0; k < VERSIONS_COUNT; ++k)
|
||||
{
|
||||
key3.set_int(k);
|
||||
entity = entity_factory.alloc();
|
||||
ASSERT_TRUE(NULL != entity);
|
||||
ASSERT_EQ(OB_SUCCESS, entity->add_rowkey_value(key1));
|
||||
ASSERT_EQ(OB_SUCCESS, entity->add_rowkey_value(key2));
|
||||
ASSERT_EQ(OB_SUCCESS, entity->add_rowkey_value(key3));
|
||||
value.set_varbinary(ObString::make_string("value_string"));
|
||||
ASSERT_EQ(OB_SUCCESS, entity->set_property(V, value));
|
||||
ASSERT_EQ(OB_SUCCESS, batch_operation.insert(*entity));
|
||||
} // end for
|
||||
} // end for
|
||||
|
||||
ASSERT_TRUE(!batch_operation.is_readonly());
|
||||
ASSERT_TRUE(batch_operation.is_same_type());
|
||||
ASSERT_TRUE(batch_operation.is_same_properties_names());
|
||||
ObTableBatchOperationResult result;
|
||||
ASSERT_EQ(OB_SUCCESS, the_table->batch_execute(batch_operation, result));
|
||||
OB_LOG(INFO, "batch execute result", K(result));
|
||||
ASSERT_EQ(BATCH_SIZE*VERSIONS_COUNT, result.count());
|
||||
////////////////////////////////////////////////////////////////
|
||||
ObTableQuery query;
|
||||
ASSERT_EQ(OB_SUCCESS, query.add_select_column(K));
|
||||
ASSERT_EQ(OB_SUCCESS, query.add_select_column(Q));
|
||||
ASSERT_EQ(OB_SUCCESS, query.add_select_column(T));
|
||||
ASSERT_EQ(OB_SUCCESS, query.add_select_column(V));
|
||||
ObObj pk_objs_start[3];
|
||||
pk_objs_start[0].set_min_value();
|
||||
pk_objs_start[1].set_min_value();
|
||||
pk_objs_start[2].set_min_value();
|
||||
ObObj pk_objs_end[3];
|
||||
pk_objs_end[0].set_max_value();
|
||||
pk_objs_end[1].set_max_value();
|
||||
pk_objs_end[2].set_max_value();
|
||||
ObNewRange range;
|
||||
range.start_key_.assign(pk_objs_start, 3);
|
||||
range.end_key_.assign(pk_objs_end, 3);
|
||||
range.border_flag_.set_inclusive_start();
|
||||
range.border_flag_.set_inclusive_end();
|
||||
ASSERT_EQ(OB_SUCCESS, query.add_scan_range(range));
|
||||
|
||||
ObHTableFilter &htable_filter = query.htable_filter();
|
||||
ASSERT_EQ(OB_SUCCESS, htable_filter.add_column(ObString::make_string("")));
|
||||
htable_filter.set_max_versions(2);
|
||||
htable_filter.set_valid(true);
|
||||
const int64_t query_round = 4;
|
||||
query.set_batch(50);
|
||||
|
||||
ObTableQuerySyncResult *iter = nullptr;
|
||||
const ObITableEntity *result_entity = NULL;
|
||||
uint64_t result_cnt = 0;
|
||||
uint64_t round = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, the_table->query_start(query, iter));
|
||||
while (OB_SUCC(iter->get_next_entity(result_entity))) {
|
||||
++result_cnt;
|
||||
}
|
||||
ASSERT_EQ(OB_ITER_END, ret);
|
||||
while (OB_SUCC(the_table->query_next(iter))) {
|
||||
++round;
|
||||
// printf("iterator row count: %ld\n", iter->get_row_count());
|
||||
while (OB_SUCC(iter->get_next_entity(result_entity))) {
|
||||
++result_cnt;
|
||||
}
|
||||
printf("round: %ld\n", round);
|
||||
ASSERT_EQ(OB_ITER_END, ret);
|
||||
}
|
||||
ASSERT_EQ(OB_ITER_END, ret);
|
||||
ASSERT_EQ(200, result_cnt); // set_max_versions(2),故只有200条
|
||||
ASSERT_EQ(round, query_round - 1); // start已经扫描了一次
|
||||
////////////////////////////////////////////////////////////////
|
||||
// teardown
|
||||
service_client_->free_table(the_table);
|
||||
the_table = NULL;
|
||||
delete [] rows;
|
||||
}
|
||||
|
||||
// create table if not exists large_scan_query_sync_test (C1 bigint primary key, C2 bigint, C3 varchar(100));
|
||||
TEST_F(TestBatchExecute, large_scan_query_sync)
|
||||
{
|
||||
|
@ -81,6 +81,7 @@ mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_increment;
|
||||
mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_increment_empty; create table if not exists htable1_cf1_increment_empty like htable1_cf1" $db
|
||||
mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_append; create table if not exists htable1_cf1_append like htable1_cf1" $db
|
||||
mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_empty_cq; create table if not exists htable1_cf1_empty_cq (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db
|
||||
mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_query_sync; create table if not exists htable1_query_sync (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db
|
||||
|
||||
# run
|
||||
./test_table_api "$HOST" "$PORT" "$tenant_name" "$user_name" "$passwd" "$db" "$table_name" $RPCPORT
|
||||
@ -137,6 +138,7 @@ mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_append; cr
|
||||
mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_reverse; create table if not exists htable1_cf1_reverse like htable1_cf1" $db
|
||||
mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_ttl; create table if not exists htable1_cf1_ttl (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T)) comment='{\"HColumnDescriptor\": {\"TimeToLive\": 5}}'" $db
|
||||
mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_cf1_empty_cq; create table if not exists htable1_cf1_empty_cq (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db
|
||||
mysql -h $HOST -P $PORT -u $user -e "drop table if exists htable1_query_sync; create table if not exists htable1_query_sync (K varbinary(1024), Q varbinary(256), T bigint, V varbinary(1024), primary key(K, Q, T));" $db
|
||||
|
||||
# run
|
||||
./test_table_api "$HOST" "$PORT" "$tenant_name" "$user_name" "$passwd" "$db" "$table_name" $RPCPORT
|
||||
|
@ -615,25 +615,26 @@ int ObHTableScanMatcher::set_to_new_row(const ObHTableCell &arg_curr_row)
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
ObHTableRowIterator::ObHTableRowIterator(const ObTableQuery &query)
|
||||
:child_op_(NULL),
|
||||
htable_filter_(query.get_htable_filter()),
|
||||
hfilter_(NULL),
|
||||
limit_per_row_per_cf_(htable_filter_.get_max_results_per_column_family()),
|
||||
offset_per_row_per_cf_(htable_filter_.get_row_offset_per_column_family()),
|
||||
max_result_size_(query.get_max_result_size()),
|
||||
batch_size_(query.get_batch()),
|
||||
time_to_live_(0),
|
||||
curr_cell_(),
|
||||
allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
column_tracker_(NULL),
|
||||
matcher_(NULL),
|
||||
column_tracker_wildcard_(),
|
||||
column_tracker_explicit_(),
|
||||
matcher_impl_(htable_filter_),
|
||||
scan_order_(query.get_scan_order()),
|
||||
cell_count_(0),
|
||||
count_per_row_(0),
|
||||
has_more_cells_(true)
|
||||
: ObTableQueryResultIterator(&query),
|
||||
child_op_(NULL),
|
||||
htable_filter_(query.get_htable_filter()),
|
||||
hfilter_(NULL),
|
||||
limit_per_row_per_cf_(htable_filter_.get_max_results_per_column_family()),
|
||||
offset_per_row_per_cf_(htable_filter_.get_row_offset_per_column_family()),
|
||||
max_result_size_(query.get_max_result_size()),
|
||||
batch_size_(query.get_batch()),
|
||||
time_to_live_(0),
|
||||
curr_cell_(),
|
||||
allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
column_tracker_(NULL),
|
||||
matcher_(NULL),
|
||||
column_tracker_wildcard_(),
|
||||
column_tracker_explicit_(),
|
||||
matcher_impl_(htable_filter_),
|
||||
scan_order_(query.get_scan_order()),
|
||||
cell_count_(0),
|
||||
count_per_row_(0),
|
||||
has_more_cells_(true)
|
||||
{}
|
||||
|
||||
ObHTableRowIterator::~ObHTableRowIterator()
|
||||
@ -1027,14 +1028,14 @@ void ObHTableRowIterator::set_ttl(int32_t ttl_value)
|
||||
////////////////////////////////////////////////////////////////
|
||||
ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query,
|
||||
table::ObTableQueryResult &one_result)
|
||||
:query_(query),
|
||||
row_iterator_(query),
|
||||
one_result_(one_result),
|
||||
hfilter_(NULL),
|
||||
batch_size_(query.get_batch()),
|
||||
max_result_size_(std::min(query.get_max_result_size(),
|
||||
static_cast<int64_t>(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))),
|
||||
is_first_result_(true)
|
||||
: ObTableQueryResultIterator(&query),
|
||||
row_iterator_(query),
|
||||
one_result_(&one_result),
|
||||
hfilter_(NULL),
|
||||
batch_size_(query.get_batch()),
|
||||
max_result_size_(std::min(query.get_max_result_size(),
|
||||
static_cast<int64_t>(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))),
|
||||
is_first_result_(true)
|
||||
{
|
||||
}
|
||||
|
||||
@ -1042,26 +1043,28 @@ ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query,
|
||||
int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_first_result_) {
|
||||
is_first_result_ = false;
|
||||
if (0 != one_result_.get_property_count()) {
|
||||
if (is_first_result_ || is_query_sync_) {
|
||||
if (is_first_result_) {
|
||||
is_first_result_ = false;
|
||||
}
|
||||
if (0 != one_result_->get_property_count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("property should be empty", K(ret));
|
||||
}
|
||||
const ObIArray<ObString> &select_columns = query_.get_select_columns();
|
||||
const ObIArray<ObString> &select_columns = query_->get_select_columns();
|
||||
const int64_t N = select_columns.count();
|
||||
for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i)
|
||||
{
|
||||
if (OB_FAIL(one_result_.add_property_name(select_columns.at(i)))) {
|
||||
if (OB_FAIL(one_result_->add_property_name(select_columns.at(i)))) {
|
||||
LOG_WARN("failed to copy name", K(ret));
|
||||
}
|
||||
} // end for
|
||||
} else {
|
||||
one_result_.reset_except_property();
|
||||
one_result_->reset_except_property();
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
bool has_filter_row = (NULL != hfilter_) && (hfilter_->has_filter_row());
|
||||
next_result = &one_result_;
|
||||
next_result = one_result_;
|
||||
ObTableQueryResult *htable_row = nullptr;
|
||||
// ObNewRow first_entity;
|
||||
// ObObj first_entity_cells[4];
|
||||
@ -1116,14 +1119,14 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
|
||||
}
|
||||
/* @todo check batch limit and size limit */
|
||||
// We have got one hbase row, store it to this batch
|
||||
if (OB_FAIL(one_result_.add_all_row(*htable_row))) {
|
||||
if (OB_FAIL(one_result_->add_all_row(*htable_row))) {
|
||||
LOG_WARN("failed to add cells to row", K(ret));
|
||||
}
|
||||
if (NULL != hfilter_) {
|
||||
hfilter_->reset();
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (one_result_.reach_batch_size_or_result_size(batch_size_, max_result_size_)) {
|
||||
if (one_result_->reach_batch_size_or_result_size(batch_size_, max_result_size_)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -1133,17 +1136,17 @@ int ObHTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
|
||||
}
|
||||
}
|
||||
if (OB_ITER_END == ret
|
||||
&& one_result_.get_row_count() > 0) {
|
||||
&& one_result_->get_row_count() > 0) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
LOG_DEBUG("[yzfdebug] get_next_result", K(ret), "row_count", one_result_.get_row_count());
|
||||
LOG_DEBUG("[yzfdebug] get_next_result", K(ret), "row_count", one_result_->get_row_count());
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObHTableFilterOperator::parse_filter_string(common::ObArenaAllocator* allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObString &hfilter_string = query_.get_htable_filter().get_filter();
|
||||
const ObString &hfilter_string = query_->get_htable_filter().get_filter();
|
||||
if (hfilter_string.empty()) {
|
||||
hfilter_ = NULL;
|
||||
} else if (NULL == allocator) {
|
||||
|
@ -233,14 +233,15 @@ public:
|
||||
/// Fetch next batch result
|
||||
virtual int get_next_result(ObTableQueryResult *&one_result) override;
|
||||
virtual bool has_more_result() const override { return row_iterator_.has_more_result(); }
|
||||
virtual table::ObTableQueryResult *get_one_result() override { return one_result_; }
|
||||
virtual void set_one_result(ObTableQueryResult *result) override {one_result_ = result;}
|
||||
void set_scan_result(table::ObTableApiScanRowIterator *scan_result) { row_iterator_.set_scan_result(scan_result); }
|
||||
void set_ttl(int32_t ttl_value) { row_iterator_.set_ttl(ttl_value); }
|
||||
// parse the filter string
|
||||
int parse_filter_string(common::ObArenaAllocator* allocator);
|
||||
private:
|
||||
const ObTableQuery &query_;
|
||||
ObHTableRowIterator row_iterator_;
|
||||
table::ObTableQueryResult &one_result_;
|
||||
table::ObTableQueryResult *one_result_;
|
||||
table::ObHTableFilterParser filter_parser_;
|
||||
table::hfilter::Filter *hfilter_;
|
||||
int32_t batch_size_;
|
||||
|
@ -34,7 +34,7 @@ using namespace oceanbase::sql;
|
||||
/**
|
||||
* ---------------------------------------- ObTableQuerySyncSession ----------------------------------------
|
||||
*/
|
||||
void ObTableQuerySyncSession::set_result_iterator(ObNormalTableQueryResultIterator *query_result)
|
||||
void ObTableQuerySyncSession::set_result_iterator(ObTableQueryResultIterator *query_result)
|
||||
{
|
||||
result_iterator_ = query_result;
|
||||
if (OB_NOT_NULL(result_iterator_)) {
|
||||
@ -336,32 +336,6 @@ ObTableAPITransCb *ObTableQuerySyncP::new_callback(rpc::ObRequest *req)
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
int ObTableQuerySyncP::get_tablet_ids(uint64_t table_id, ObIArray<ObTabletID> &tablet_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTabletID tablet_id = arg_.tablet_id_;
|
||||
share::schema::ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = NULL;
|
||||
if (!tablet_id.is_valid()) {
|
||||
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) {
|
||||
LOG_WARN("failed to get schema guard", K(ret));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
|
||||
LOG_WARN("failed to get table schema", K(ret), K(table_id), K(table_schema));
|
||||
} else if (!table_schema->is_partitioned_table()) {
|
||||
tablet_id = table_schema->get_tablet_id();
|
||||
} else {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("partitioned table not supported", K(ret), K(table_id));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(tablet_ids.push_back(tablet_id))) {
|
||||
LOG_WARN("failed to push back", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableQuerySyncP::get_session_id(uint64_t &real_sessid, uint64_t arg_sessid)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -414,59 +388,6 @@ int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSessio
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableQuerySyncP::query_scan_with_old_context(const int64_t timeout)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableQueryResultIterator *result_iterator = query_session_->get_result_iterator();
|
||||
if (OB_ISNULL(result_iterator)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("query result iterator null", K(ret));
|
||||
} else {
|
||||
ObTableQueryResult *query_result = nullptr;
|
||||
result_iterator->set_one_result(&result_); // set result_ as container
|
||||
if (ObTimeUtility::current_time() > timeout) {
|
||||
ret = OB_TRANS_TIMEOUT;
|
||||
LOG_WARN("exceed operatiton timeout", K(ret));
|
||||
} else if (OB_FAIL(result_iterator->get_next_result(query_result))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
result_.is_end_ = true; // set scan end
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to scan result", K(ret));
|
||||
}
|
||||
} else {
|
||||
result_.is_end_ = !result_iterator->has_more_result();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableQuerySyncP::query_scan_with_new_context(
|
||||
ObTableQuerySyncSession *query_session, table::ObTableQueryResultIterator *result_iterator, const int64_t timeout)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableQueryResult *query_result = nullptr;
|
||||
if (ObTimeUtility::current_time() > timeout) {
|
||||
ret = OB_TRANS_TIMEOUT;
|
||||
LOG_WARN("exceed operatiton timeout", K(ret), K(rpc_pkt_)->get_timeout());
|
||||
} else if (OB_ISNULL(result_iterator)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected null result iterator", K(ret));
|
||||
} else if (OB_FAIL(result_iterator->get_next_result(query_result))) {
|
||||
if (OB_ITER_END == ret) { // scan to end
|
||||
ret = OB_SUCCESS;
|
||||
result_.is_end_ = true;
|
||||
}
|
||||
} else if (result_iterator->has_more_result()){
|
||||
result_.is_end_ = false;
|
||||
query_session->set_result_iterator(dynamic_cast<ObNormalTableQueryResultIterator *>(result_iterator));
|
||||
query_session->set_trans_desc(trans_desc_); // save processor's trans_desc_ to query session
|
||||
} else {
|
||||
result_.is_end_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableQuerySyncP::init_tb_ctx(ObTableCtx &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -526,6 +447,19 @@ int ObTableQuerySyncP::execute_query(ObTableQuerySyncSession &query_session)
|
||||
result_iter = htable_result_iter;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
// hbase model, compress the result packet
|
||||
ObCompressorType compressor_type = INVALID_COMPRESSOR;
|
||||
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type(
|
||||
GCONF.tableapi_transport_compress_func, compressor_type))) {
|
||||
compressor_type = INVALID_COMPRESSOR;
|
||||
} else if (NONE_COMPRESSOR == compressor_type) {
|
||||
compressor_type = INVALID_COMPRESSOR;
|
||||
}
|
||||
this->set_result_compress_type(compressor_type);
|
||||
ret = OB_SUCCESS; // reset ret
|
||||
}
|
||||
} else {
|
||||
normal_result_iter = OB_NEWx(ObNormalTableQueryResultIterator, (allocator), query, result_);
|
||||
if (OB_ISNULL(normal_result_iter)) {
|
||||
@ -571,7 +505,7 @@ int ObTableQuerySyncP::execute_query(ObTableQuerySyncSession &query_session)
|
||||
// 4. do scan and save result iter
|
||||
if (OB_SUCC(ret)) {
|
||||
ObTableQueryResult *one_result = nullptr;
|
||||
query_session.set_result_iterator(dynamic_cast<ObNormalTableQueryResultIterator *>(result_iter));
|
||||
query_session.set_result_iterator(result_iter);
|
||||
if (ObTimeUtility::current_time() > timeout_ts_) {
|
||||
ret = OB_TRANS_TIMEOUT;
|
||||
LOG_WARN("exceed operatiton timeout", K(ret));
|
||||
@ -760,7 +694,7 @@ int ObTableQuerySyncP::deep_copy_result_property_names()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableQuery &query = query_session_->get_query();
|
||||
ObNormalTableQueryResultIterator *result_iterator = query_session_->get_result_iterator();
|
||||
ObTableQueryResultIterator *result_iterator = query_session_->get_result_iterator();
|
||||
if ((OB_NOT_NULL(result_iterator))) {
|
||||
const ObIArray<ObString> &select_columns = query.get_select_columns();
|
||||
ObTableQueryResult *one_result = result_iterator->get_one_result();
|
||||
|
@ -74,13 +74,13 @@ public:
|
||||
{}
|
||||
~ObTableQuerySyncSession();
|
||||
|
||||
void set_result_iterator(ObNormalTableQueryResultIterator* iter);
|
||||
void set_result_iterator(table::ObTableQueryResultIterator* iter);
|
||||
void set_in_use(bool in_use) {in_use_ = in_use;}
|
||||
bool is_in_use() {return in_use_;}
|
||||
int init();
|
||||
|
||||
void set_timout_ts(uint64_t timeout_ts) { timeout_ts_ = timeout_ts; }
|
||||
ObNormalTableQueryResultIterator *get_result_iterator() { return result_iterator_; }
|
||||
table::ObTableQueryResultIterator *get_result_iterator() { return result_iterator_; }
|
||||
ObArenaAllocator *get_allocator() {return &allocator_;}
|
||||
common::ObObjectID get_tenant_id() { return tenant_id_; }
|
||||
table::ObTableQuery &get_query() { return query_; }
|
||||
@ -95,7 +95,7 @@ private:
|
||||
uint64_t timeout_ts_;
|
||||
common::ObObjectID tenant_id_;
|
||||
ObTableQuery query_; // deep copy from arg_.query_
|
||||
ObNormalTableQueryResultIterator *result_iterator_;
|
||||
table::ObTableQueryResultIterator *result_iterator_;
|
||||
ObArenaAllocator allocator_;
|
||||
ObTableQuerySyncCtx query_ctx_;
|
||||
lib::MemoryContext iterator_mementity_;
|
||||
@ -199,22 +199,16 @@ protected:
|
||||
private:
|
||||
int process_query_start();
|
||||
int process_query_next();
|
||||
int process_query_end();
|
||||
int destory_query_session(bool need_rollback_trans);
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTableQuerySyncP);
|
||||
|
||||
private:
|
||||
int get_tablet_ids(uint64_t table_id, ObIArray<ObTabletID> &tablet_ids);
|
||||
int get_session_id(uint64_t &real_sessid, uint64_t arg_sessid);
|
||||
int get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session);
|
||||
int query_scan_with_init();
|
||||
int query_scan_without_init();
|
||||
int query_scan_with_old_context(const int64_t timeout);
|
||||
int query_scan_with_new_context(ObTableQuerySyncSession * session_ctx, table::ObTableQueryResultIterator *result_iterator,
|
||||
const int64_t timeout);
|
||||
|
||||
private:
|
||||
void set_trans_from_session(ObTableQuerySyncSession *query_session);
|
||||
int check_query_type();
|
||||
int init_tb_ctx(table::ObTableCtx &ctx);
|
||||
int execute_query(ObTableQuerySyncSession &query_session);
|
||||
|
@ -134,8 +134,8 @@ public:
|
||||
virtual int get_next_result(table::ObTableQueryResult *&one_result) override;
|
||||
virtual bool has_more_result() const override;
|
||||
void set_scan_result(table::ObTableApiScanRowIterator *scan_result) { scan_result_ = scan_result; }
|
||||
virtual void set_one_result(ObTableQueryResult *result) {one_result_ = result;}
|
||||
table::ObTableQueryResult *get_one_result() { return one_result_; }
|
||||
virtual void set_one_result(ObTableQueryResult *result) override {one_result_ = result;}
|
||||
virtual table::ObTableQueryResult *get_one_result() override { return one_result_; }
|
||||
void set_query(const ObTableQuery *query) {query_ = query;}
|
||||
void set_query_sync() { is_query_sync_ = true ; }
|
||||
private:
|
||||
|
@ -209,11 +209,21 @@ public:
|
||||
class ObTableQueryResultIterator
|
||||
{
|
||||
public:
|
||||
ObTableQueryResultIterator() {}
|
||||
ObTableQueryResultIterator(const ObTableQuery *query = nullptr)
|
||||
: query_(query),
|
||||
is_query_sync_(false)
|
||||
{
|
||||
}
|
||||
virtual ~ObTableQueryResultIterator() {}
|
||||
virtual int get_next_result(ObTableQueryResult *&one_result) = 0;
|
||||
virtual bool has_more_result() const = 0;
|
||||
virtual void set_one_result(ObTableQueryResult *result){ UNUSED(result); }
|
||||
virtual ObTableQueryResult *get_one_result() { return nullptr; }
|
||||
virtual void set_query(const ObTableQuery *query) { query_ = query; };
|
||||
virtual void set_query_sync() { is_query_sync_ = true; }
|
||||
protected:
|
||||
const ObTableQuery *query_;
|
||||
bool is_query_sync_;
|
||||
};
|
||||
|
||||
class ObTableQueryAndMutateRequest final
|
||||
|
Reference in New Issue
Block a user