[CP] [Fix] table async query
This commit is contained in:
@ -65,11 +65,22 @@ int ObTableQuerySyncSession::init()
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObTableQuerySyncSession::~ObTableQuerySyncSession()
|
||||
int ObTableQuerySyncSession::deep_copy_select_columns(const common::ObIArray<common::ObString> &query_cols_names_,
|
||||
const common::ObIArray<common::ObString> &tb_ctx_cols_names_)
|
||||
{
|
||||
if (OB_NOT_NULL(iterator_mementity_)) {
|
||||
DESTROY_CONTEXT(iterator_mementity_);
|
||||
int ret = OB_SUCCESS;
|
||||
// use column names specified in the query if provided
|
||||
// otherwise default to column names from the table context
|
||||
const common::ObIArray<common::ObString> &source_cols = query_cols_names_.count() == 0 ? tb_ctx_cols_names_ : query_cols_names_;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < source_cols.count(); i++) {
|
||||
common::ObString select_column;
|
||||
if (OB_FAIL(ob_write_string(allocator_, source_cols.at(i), select_column))) {
|
||||
LOG_WARN("fail to deep copy select column", K(ret), K(select_columns_.at(i)));
|
||||
} else if (OB_FAIL(select_columns_.push_back(select_column))) {
|
||||
LOG_WARN("fail to push back select column", K(ret), K(select_column));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -515,6 +526,8 @@ int ObTableQuerySyncP::query_scan_with_init()
|
||||
LOG_WARN("fail to deep copy query", K(ret), K(arg_.query_));
|
||||
} else if (OB_FAIL(init_tb_ctx(tb_ctx))) {
|
||||
LOG_WARN("fail to init table ctx", K(ret));
|
||||
} else if (OB_FAIL(query_session_->deep_copy_select_columns(query.get_select_columns(), tb_ctx.get_query_col_names()))) {
|
||||
LOG_WARN("fail to deep copy select columns from table ctx", K(ret));
|
||||
} else if (OB_FAIL(start_trans(true, /* is_readonly */
|
||||
sql::stmt::T_SELECT,
|
||||
arg_.consistency_level_,
|
||||
@ -545,7 +558,7 @@ int ObTableQuerySyncP::query_scan_without_init()
|
||||
if (OB_ISNULL(result_iter)) {
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
LOG_WARN("unexpected null result iterator", K(ret));
|
||||
} else if (OB_FAIL(result_.deep_copy_property_names(query_session_->get_query().get_select_columns()))) {
|
||||
} else if (OB_FAIL(result_.deep_copy_property_names(query_session_->get_select_columns()))) {
|
||||
LOG_WARN("fail to deep copy property names to one result", K(ret), K(query_session_->get_query()));
|
||||
} else {
|
||||
ObTableQueryResult *query_result = nullptr;
|
||||
@ -606,27 +619,38 @@ int ObTableQuerySyncP::try_process()
|
||||
} else if (FALSE_IT(table_id_ = arg_.table_id_)) {
|
||||
} else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) {
|
||||
} else {
|
||||
if (ObQueryOperationType::QUERY_START == arg_.query_type_) {
|
||||
ret = process_query_start();
|
||||
} else if(ObQueryOperationType::QUERY_NEXT == arg_.query_type_) {
|
||||
ret = process_query_next();
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("query execution failed, need rollback", K(ret));
|
||||
int tmp_ret = ret;
|
||||
if (OB_FAIL(destory_query_session(true))) {
|
||||
LOG_WARN("faild to destory query session", K(ret));
|
||||
WITH_CONTEXT(query_session_->get_memory_ctx()) {
|
||||
if (ObQueryOperationType::QUERY_START == arg_.query_type_) {
|
||||
ret = process_query_start();
|
||||
} else if (ObQueryOperationType::QUERY_NEXT == arg_.query_type_) {
|
||||
ret = process_query_next();
|
||||
}
|
||||
ret = tmp_ret;
|
||||
} else if (result_.is_end_) {
|
||||
if (OB_FAIL(destory_query_session(false))) {
|
||||
LOG_WARN("fail to destory query session", K(ret), K(query_session_id_));
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("query execution failed, need rollback", K(ret));
|
||||
int tmp_ret = ret;
|
||||
if (OB_FAIL(destory_query_session(true))) {
|
||||
LOG_WARN("faild to destory query session", K(ret));
|
||||
}
|
||||
ret = tmp_ret;
|
||||
} else if (result_.is_end_) {
|
||||
if (OB_FAIL(destory_query_session(false))) {
|
||||
LOG_WARN("fail to destory query session", K(ret), K(query_session_id_));
|
||||
}
|
||||
} else {
|
||||
query_session_->set_in_use(false);
|
||||
}
|
||||
} else {
|
||||
query_session_->set_in_use(false);
|
||||
}
|
||||
}
|
||||
LOG_INFO("one query sync finish", K(result_.is_end_));
|
||||
|
||||
#ifndef NDEBUG
|
||||
// debug mode
|
||||
LOG_INFO("[TABLE] execute query", K(ret), K_(arg), K(result_),
|
||||
K_(retry_count), K_(result_row_count));
|
||||
#else
|
||||
// release mode
|
||||
LOG_TRACE("[TABLE] execute query", K(ret), K_(arg), K_(timeout_ts), K_(retry_count), K(result_.is_end_),
|
||||
"receive_ts", get_receive_timestamp(), K_(result_row_count));
|
||||
#endif
|
||||
|
||||
stat_event_type_ = ObTableProccessType::TABLE_API_TABLE_QUERY_SYNC; // table querysync
|
||||
return ret;
|
||||
|
||||
@ -54,6 +54,24 @@ struct ObTableQuerySyncCtx
|
||||
table::ObTableApiScanRowIterator row_iter_;
|
||||
};
|
||||
|
||||
/**
|
||||
* ---------------------------------------- ObTableQueryAsyncEntifyDestroyGuard ----------------------------------------
|
||||
*/
|
||||
class ObTableQueryAsyncEntifyDestroyGuard
|
||||
{
|
||||
public:
|
||||
ObTableQueryAsyncEntifyDestroyGuard(lib::MemoryContext &entity) : ref_(entity) {}
|
||||
~ObTableQueryAsyncEntifyDestroyGuard()
|
||||
{
|
||||
if (OB_NOT_NULL(ref_)) {
|
||||
DESTROY_CONTEXT(ref_);
|
||||
ref_ = NULL;
|
||||
}
|
||||
}
|
||||
private:
|
||||
lib::MemoryContext &ref_;
|
||||
};
|
||||
|
||||
/**
|
||||
* ---------------------------------------- ObTableQuerySyncSession ----------------------------------------
|
||||
*/
|
||||
@ -65,14 +83,16 @@ public:
|
||||
explicit ObTableQuerySyncSession()
|
||||
: in_use_(true),
|
||||
timeout_ts_(10000000),
|
||||
iterator_mementity_(nullptr),
|
||||
iterator_mementity_destroy_guard_(iterator_mementity_),
|
||||
allocator_("TbAQueryP", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
tenant_id_(MTL_ID()),
|
||||
query_(),
|
||||
select_columns_(),
|
||||
result_iterator_(nullptr),
|
||||
allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
query_ctx_(allocator_),
|
||||
iterator_mementity_(nullptr)
|
||||
query_ctx_(allocator_)
|
||||
{}
|
||||
~ObTableQuerySyncSession();
|
||||
~ObTableQuerySyncSession() {}
|
||||
|
||||
void set_result_iterator(table::ObTableQueryResultIterator* iter);
|
||||
table::ObTableQueryResultIterator *get_result_iter() { return result_iterator_; };
|
||||
@ -82,9 +102,14 @@ public:
|
||||
|
||||
void set_timout_ts(uint64_t timeout_ts) { timeout_ts_ = timeout_ts; }
|
||||
table::ObTableQueryResultIterator *get_result_iterator() { return result_iterator_; }
|
||||
lib::MemoryContext &get_memory_ctx() { return iterator_mementity_; }
|
||||
ObArenaAllocator *get_allocator() {return &allocator_;}
|
||||
common::ObObjectID get_tenant_id() { return tenant_id_; }
|
||||
table::ObTableQuery &get_query() { return query_; }
|
||||
common::ObIArray<common::ObString> &get_select_columns() { return select_columns_; }
|
||||
int deep_copy_select_columns(const common::ObIArray<common::ObString> &query_cols_names_,
|
||||
const common::ObIArray<common::ObString> &tb_ctx_cols_names_);
|
||||
|
||||
ObTableQuerySyncCtx &get_query_ctx() { return query_ctx_; }
|
||||
public:
|
||||
sql::TransState* get_trans_state() {return &trans_state_;}
|
||||
@ -93,12 +118,14 @@ public:
|
||||
private:
|
||||
bool in_use_;
|
||||
uint64_t timeout_ts_;
|
||||
lib::MemoryContext iterator_mementity_;
|
||||
ObTableQueryAsyncEntifyDestroyGuard iterator_mementity_destroy_guard_;
|
||||
ObArenaAllocator allocator_;
|
||||
common::ObObjectID tenant_id_;
|
||||
table::ObTableQuery query_; // deep copy from arg_.query_
|
||||
ObSEArray<ObString, 16> select_columns_; // deep copy from tb_ctx or query, which includes all the actual col names the user acquired
|
||||
table::ObTableQueryResultIterator *result_iterator_;
|
||||
ObArenaAllocator allocator_;
|
||||
ObTableQuerySyncCtx query_ctx_;
|
||||
lib::MemoryContext iterator_mementity_;
|
||||
|
||||
private:
|
||||
// txn control
|
||||
|
||||
@ -957,6 +957,8 @@ public:
|
||||
query_session_id_(0)
|
||||
{}
|
||||
virtual ~ObTableQuerySyncResult() {}
|
||||
public:
|
||||
INHERIT_TO_STRING_KV("ObTableQueryResult", ObTableQueryResult, K_(is_end), K_(query_session_id));
|
||||
public:
|
||||
bool is_end_;
|
||||
uint64_t query_session_id_; // from server gen
|
||||
|
||||
Reference in New Issue
Block a user