diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index 4b61fb1ac7..e0935b7bb2 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -580,6 +580,7 @@ PCODE_DEF(OB_TABLE_API_EXECUTE, 0x1102) PCODE_DEF(OB_TABLE_API_BATCH_EXECUTE, 0x1103) PCODE_DEF(OB_TABLE_API_EXECUTE_QUERY, 0x1104) PCODE_DEF(OB_TABLE_API_QUERY_AND_MUTATE, 0x1105) +PCODE_DEF(OB_TABLE_API_EXECUTE_QUERY_SYNC, 0x1106) // Event Job API PCODE_DEF(OB_RUN_EVENT_JOB, 0x1201) diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index e8e789da9e..a4ca55b87b 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -262,6 +262,7 @@ ob_set_subtarget(ob_server table table/ob_htable_filters.cpp table/htable_filter_tab.cxx table/htable_filter_lex.cxx + table/ob_table_query_sync_processor.cpp ) set_source_files_properties(table/htable_filter_lex.cxx PROPERTIES COMPILE_FLAGS -Wno-null-conversion) diff --git a/src/observer/ob_srv_xlator_partition.cpp b/src/observer/ob_srv_xlator_partition.cpp index 85bb7d8547..f36acfbca7 100644 --- a/src/observer/ob_srv_xlator_partition.cpp +++ b/src/observer/ob_srv_xlator_partition.cpp @@ -42,6 +42,7 @@ #include "observer/table/ob_table_batch_execute_processor.h" #include "observer/table/ob_table_query_processor.h" #include "observer/table/ob_table_query_and_mutate_processor.h" +#include "observer/table/ob_table_query_sync_processor.h" using namespace oceanbase; using namespace oceanbase::observer; @@ -138,6 +139,7 @@ void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator* xlator) RPC_PROCESSOR(ObTableBatchExecuteP, gctx_); RPC_PROCESSOR(ObTableQueryP, gctx_); RPC_PROCESSOR(ObTableQueryAndMutateP, gctx_); + RPC_PROCESSOR(ObTableQuerySyncP, gctx_); // HA GTS RPC_PROCESSOR(ObHaGtsPingRequestP, gctx_); diff --git a/src/observer/table/ob_table_query_sync_processor.cpp b/src/observer/table/ob_table_query_sync_processor.cpp new file mode 100644 index 0000000000..5072001669 --- /dev/null +++ b/src/observer/table/ob_table_query_sync_processor.cpp @@ -0,0 +1,589 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SERVER +#include "ob_table_query_sync_processor.h" +#include "ob_table_rpc_processor_util.h" +#include "observer/ob_service.h" +#include "storage/ob_partition_service.h" +#include "ob_table_end_trans_cb.h" +#include "sql/optimizer/ob_table_location.h" // ObTableLocation +#include "lib/stat/ob_diagnose_info.h" +#include "lib/stat/ob_session_stat.h" +#include "observer/ob_server.h" +#include "lib/string/ob_strings.h" + +using namespace oceanbase::observer; +using namespace oceanbase::common; +using namespace oceanbase::table; +using namespace oceanbase::share; +using namespace oceanbase::sql; + +/** + * ---------------------------------------- ObTableQuerySyncSession ---------------------------------------- + */ +int ObTableQuerySyncSession::deep_copy_select_columns(const ObTableQuery &query) +{ + int ret = OB_SUCCESS; + const ObIArray &select_columns = query.get_select_columns(); + const int64_t N = select_columns.count(); + ObString tmp_str; + for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { + if (OB_FAIL(ob_write_string(*get_allocator(), select_columns.at(i), tmp_str))) { + LOG_WARN("failed to copy column name", K(ret)); + break; + } else if (OB_FAIL(query_.add_select_column(tmp_str))) { + LOG_WARN("failed to add column name", K(ret)); + } + } // end for + return ret; +} + +void ObTableQuerySyncSession::set_result_iterator(ObNormalTableQueryResultIterator *query_result) +{ + result_iterator_ = query_result; + if (OB_NOT_NULL(result_iterator_)) { + result_iterator_->set_query(&query_); + result_iterator_->set_query_sync(); + } +} + +int ObTableQuerySyncSession::init() { + int ret = OB_SUCCESS; + lib::MemoryContext mem_context = nullptr; + lib::ContextParam param; + param.set_mem_attr(lib::current_tenant_id(), ObModIds::TABLE_PROC, ObCtxIds::DEFAULT_CTX_ID) + .set_properties(lib::ALLOC_THREAD_SAFE); + + if (OB_FAIL(ROOT_CONTEXT->CREATE_CONTEXT(mem_context, param))) { + LOG_WARN("fail to create mem context", K(ret)); + } else if (OB_ISNULL(mem_context)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null mem context ", K(ret)); + } else if (OB_NOT_NULL(iterator_mementity_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("iterator_mementity_ should be NULL", K(ret)); + } else { + table_service_ctx_.scan_param_.iterator_mementity_ = mem_context; + table_service_ctx_.scan_param_.allocator_ = &mem_context->get_arena_allocator(); + iterator_mementity_ = mem_context; + } + return ret; +} + +ObTableQuerySyncSession::~ObTableQuerySyncSession() +{ + if (OB_NOT_NULL(iterator_mementity_)) { + DESTROY_CONTEXT(iterator_mementity_); + } +} + +/** + * ----------------------------------- ObQuerySyncSessionRecycle ------------------------------------- + */ +void ObQuerySyncSessionRecycle::runTimerTask() +{ + query_session_recycle(); +} + +void ObQuerySyncSessionRecycle::query_session_recycle() +{ + ObQuerySyncMgr::get_instance().clean_timeout_query_session(); +} + +/** + * -----------------------------------Singleton ObQuerySyncMgr ------------------------------------- + */ +int64_t ObQuerySyncMgr::once_ = 0; +ObQuerySyncMgr *ObQuerySyncMgr::instance_ = NULL; + +ObQuerySyncMgr::ObQuerySyncMgr() : session_id_(0) +{} + +ObQuerySyncMgr &ObQuerySyncMgr::get_instance() +{ + ObQuerySyncMgr *instance = NULL; + while (OB_UNLIKELY(once_ < 2)) { + if (ATOMIC_BCAS(&once_, 0, 1)) { + instance = OB_NEW(ObQuerySyncMgr, ObModIds::TABLE_PROC); + if (OB_LIKELY(OB_NOT_NULL(instance))) { + if (common::OB_SUCCESS != instance->init()) { + LOG_WARN("failed to init ObQuerySyncMgr instance"); + OB_DELETE(ObQuerySyncMgr, ObModIds::TABLE_PROC, instance); + instance = NULL; + ATOMIC_BCAS(&once_, 1, 0); + } else { + instance_ = instance; + (void)ATOMIC_BCAS(&once_, 1, 2); + } + } else { + (void)ATOMIC_BCAS(&once_, 1, 0); + } + } + } + return *(ObQuerySyncMgr *)instance_; +} + +int ObQuerySyncMgr::init() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(query_session_map_.create(QUERY_SESSION_MAX_SIZE, ObModIds::TABLE_PROC, ObModIds::TABLE_PROC))) { + LOG_WARN("fail to create query session map", K(ret)); + } else if (OB_FAIL(timer_.init())) { + LOG_WARN("fail to init timer_", K(ret)); + } else if (OB_FAIL(timer_.schedule(query_session_recycle_, QUERY_SESSION_CLEAN_DELAY, true))) { + LOG_WARN("fail to schedule query session clean task. ", K(ret)); + } else if (OB_FAIL(timer_.start())) { + LOG_WARN("fail to start query session clean task timer.", K(ret)); + } + return ret; +} + +uint64_t ObQuerySyncMgr::generate_query_sessid() +{ + return ATOMIC_AAF(&session_id_, 1); +} + +int ObQuerySyncMgr::get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session) +{ + int ret = OB_SUCCESS; + get_locker(sessid).lock(); + if (OB_FAIL(query_session_map_.get_refactored(sessid, query_session))) { + if (OB_HASH_NOT_EXIST != ret) { + ret = OB_ERR_UNEXPECTED; + } + } else if (OB_ISNULL(query_session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Unexpected null query session", K(ret), K(sessid)); + } else if (query_session->is_in_use()) { // one session cannot be held concurrently + ret = OB_ERR_UNEXPECTED; + LOG_WARN("query session already in use", K(sessid)); + } else { + query_session->set_in_use(true); + } + get_locker(sessid).unlock(); + return ret; +} + +int ObQuerySyncMgr::set_query_session(uint64_t sessid, ObTableQuerySyncSession *query_session) +{ + int ret = OB_SUCCESS; + bool force = false; + if (OB_FAIL(query_session_map_.set_refactored(sessid, query_session, false))) { + LOG_WARN("set query session failed", K(ret), K(sessid)); + } + return ret; +} + +void ObQuerySyncMgr::clean_timeout_query_session() +{ + int ret = OB_SUCCESS; + common::ObSEArray session_id_array; + ObGetAllSessionIdOp op(session_id_array); + if (OB_FAIL(query_session_map_.foreach_refactored(op))) { + LOG_WARN("fail to get all session id from query sesion map", K(ret)); + } else { + for (int64_t i = 0; i < session_id_array.count(); i++) { + uint64_t sess_id = session_id_array.at(i); + ObTableQuerySyncSession *query_session = nullptr; + get_locker(sess_id).lock(); + if (OB_FAIL(query_session_map_.get_refactored(sess_id, query_session))) { + LOG_DEBUG("query session already deleted by worker", K(ret), K(sess_id)); + } else if (OB_ISNULL(query_session)) { + ret = OB_ERR_NULL_VALUE; + (void)query_session_map_.erase_refactored(sess_id); + LOG_WARN("unexpected null query sesion", K(ret)); + } else if (query_session->is_in_use()) { + } else if (QUERY_SESSION_TIMEOUT + query_session->get_timestamp() > ObTimeUtility::current_time()) { + } else { + const ObGlobalContext &gctx = ObServer::get_instance().get_gctx(); // get gctx + storage::ObPartitionService *part_service = gctx.par_ser_; // get part_service + if (OB_ISNULL(part_service)) { + ret = OB_ERR_NULL_VALUE; + LOG_WARN("free query session fail, part service null", K(ret)); + } else { + ObTableServiceQueryCtx *table_service_ctx = query_session->get_table_service_ctx(); + if (OB_ISNULL(table_service_ctx)) { + ret = OB_ERR_NULL_VALUE; + LOG_WARN("free query session fail, table service context null", K(ret)); + } else { + table_service_ctx->destroy_result_iterator(part_service); + } + } + (void)query_session_map_.erase_refactored(sess_id); + OB_DELETE(ObTableQuerySyncSession, ObModIds::TABLE_PROC, query_session); + // connection loses or bug exists + LOG_WARN("clean timeout query session success", K(ret), K(sess_id)); + } + get_locker(sess_id).unlock(); + } + + } +} + +ObQuerySyncMgr::ObQueryHashMap *ObQuerySyncMgr::get_query_session_map() +{ + return &query_session_map_; +} + +ObTableQuerySyncSession *ObQuerySyncMgr::alloc_query_session() +{ + int ret = OB_SUCCESS; + ObTableQuerySyncSession *query_session = OB_NEW(ObTableQuerySyncSession, ObModIds::TABLE_PROC); + if (OB_ISNULL(query_session)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate ObTableQuerySyncSession", K(ret)); + } else if (OB_FAIL(query_session->init())) { + LOG_WARN("failed to init query session", K(ret)); + OB_DELETE(ObTableQuerySyncSession, ObModIds::TABLE_PROC, query_session); + } + return query_session; +} + +int ObQuerySyncMgr::ObGetAllSessionIdOp::operator()(QuerySessionPair& entry) { + int ret = OB_SUCCESS; + if (OB_FAIL(session_id_array_.push_back(entry.first))) { + LOG_WARN("fail to push back query session id", K(ret)); + } + return ret; +} + +/** + * -------------------------------------- ObTableQuerySyncP ---------------------------------------- + */ +ObTableQuerySyncP::ObTableQuerySyncP(const ObGlobalContext &gctx) + : ObTableRpcProcessor(gctx), + table_service_ctx_(nullptr), + result_row_count_(0), + query_session_id_(0), + allocator_(ObModIds::TABLE_PROC), + query_session_(nullptr) +{} + +int ObTableQuerySyncP::deserialize() +{ + arg_.query_.set_deserialize_allocator(&allocator_); + return ParentType::deserialize(); +} + +int ObTableQuerySyncP::check_arg() +{ + int ret = OB_SUCCESS; + if (arg_.query_type_ == ObQueryOperationType::QUERY_START && !arg_.query_.is_valid() && + arg_.query_.get_htable_filter().is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid table query request", K(ret), "query", arg_.query_); + } else if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG || + arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("some options not supported yet", K(ret), "consistency_level", arg_.consistency_level_); + } + return ret; +} + +void ObTableQuerySyncP::audit_on_finish() +{ + audit_record_.consistency_level_ = ObTableConsistencyLevel::STRONG == arg_.consistency_level_ + ? ObConsistencyLevel::STRONG + : ObConsistencyLevel::WEAK; + audit_record_.return_rows_ = result_.get_row_count(); + audit_record_.table_scan_ = true; // todo: exact judgement + audit_record_.affected_rows_ = result_.get_row_count(); + audit_record_.try_cnt_ = retry_count_ + 1; +} + +uint64_t ObTableQuerySyncP::get_request_checksum() +{ + uint64_t checksum = 0; + checksum = ob_crc64(checksum, arg_.table_name_.ptr(), arg_.table_name_.length()); + checksum = ob_crc64(checksum, &arg_.consistency_level_, sizeof(arg_.consistency_level_)); + const uint64_t op_checksum = arg_.query_.get_checksum(); + checksum = ob_crc64(checksum, &op_checksum, sizeof(op_checksum)); + return checksum; +} + +void ObTableQuerySyncP::reset_ctx() +{ + result_row_count_ = 0; + query_session_ = nullptr; + table_service_ctx_ = nullptr; + ObTableApiProcessorBase::reset_ctx(); +} + +ObTableAPITransCb *ObTableQuerySyncP::new_callback(rpc::ObRequest *req) +{ + UNUSED(req); + return nullptr; +} + +int ObTableQuerySyncP::get_partition_ids(uint64_t table_id, ObIArray &part_ids) +{ + int ret = OB_SUCCESS; + uint64_t partition_id = arg_.partition_id_; + if (OB_INVALID_ID == partition_id) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("partitioned table not supported", K(ret), K(table_id)); + } else if (OB_FAIL(part_ids.push_back(partition_id))) { + LOG_WARN("failed to push back of partition id", K(ret)); + } + return ret; +} + +int ObTableQuerySyncP::get_session_id(uint64_t &real_sessid, uint64_t arg_sessid) +{ + int ret = OB_SUCCESS; + real_sessid = arg_sessid; + if (ObQueryOperationType::QUERY_START == arg_.query_type_) { + real_sessid = ObQuerySyncMgr::get_instance().generate_query_sessid(); + } + if (OB_UNLIKELY(real_sessid == ObQuerySyncMgr::INVALID_SESSION_ID)) { + ret = OB_ERR_UNKNOWN_SESSION_ID; + LOG_WARN("session id is invalid", K(ret), K(real_sessid), K(arg_.query_type_)); + } + return ret; +} + +int ObTableQuerySyncP::get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(sessid == ObQuerySyncMgr::INVALID_SESSION_ID)) { + ret = OB_ERR_UNKNOWN_SESSION_ID; + LOG_WARN("fail to get query session, session id is invalid", K(ret), K(sessid)); + } else if (ObQueryOperationType::QUERY_START == arg_.query_type_) { + query_session = ObQuerySyncMgr::get_instance().alloc_query_session(); + if (OB_ISNULL(query_session)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate ObTableQuerySyncSession", K(ret), K(sessid)); + } else if (OB_FAIL(ObQuerySyncMgr::get_instance().set_query_session(sessid, query_session))) { + LOG_WARN("fail to insert session to query map", K(ret), K(sessid)); + OB_DELETE(ObTableQuerySyncSession, ObModIds::TABLE_PROC, query_session); + } + } else if (OB_FAIL(ObQuerySyncMgr::get_instance().get_query_session(sessid, query_session))) { + LOG_WARN("fail to get query session from query sync mgr", K(ret), K(sessid)); + } + + if (OB_SUCC(ret)) { + if (OB_ISNULL(query_session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null query session", K(ret), K(sessid)); + } else { + // set trans from session + trans_desc_ptr_ = query_session->get_trans_desc(); + part_epoch_list_ptr_ = query_session->get_part_epoch_list(); + participants_ptr_ = query_session->get_part_leader_list(); + trans_state_ptr_ = query_session->get_trans_state(); + } + } + return ret; +} + +int ObTableQuerySyncP::query_scan_with_old_context(const int64_t timeout) +{ + int ret = OB_SUCCESS; + ObTableQueryResultIterator *result_iterator = query_session_->get_result_iterator(); + if (OB_ISNULL(result_iterator)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("query result iterator null", K(ret)); + } else { + ObTableQueryResult *query_result = nullptr; + result_iterator->set_one_result(&result_); // set result_ as container + if (ObTimeUtility::current_time() > timeout) { + ret = OB_TRANS_TIMEOUT; + LOG_WARN("exceed operatiton timeout", K(ret)); + } else if (OB_FAIL(result_iterator->get_next_result(query_result))) { + if (OB_ITER_END == ret) { + result_.is_end_ = true; // set scan end + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to scan result", K(ret)); + } + } else { + result_.is_end_ = !result_iterator->has_more_result(); + } + } + return ret; +} + +int ObTableQuerySyncP::query_scan_with_new_context( + ObTableQuerySyncSession *query_session, table::ObTableQueryResultIterator *result_iterator, const int64_t timeout) +{ + int ret = OB_SUCCESS; + ObTableQueryResult *query_result = nullptr; + if (ObTimeUtility::current_time() > timeout) { + ret = OB_TRANS_TIMEOUT; + LOG_WARN("exceed operatiton timeout", K(ret), K(rpc_pkt_)->get_timeout()); + } else if (OB_ISNULL(result_iterator)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null result iterator", K(ret)); + } else if (OB_FAIL(result_iterator->get_next_result(query_result))) { + if (OB_ITER_END == ret) { // scan to end + ret = OB_SUCCESS; + result_.is_end_ = true; + } + } else if (result_iterator->has_more_result()){ + result_.is_end_ = false; + query_session->deep_copy_select_columns(arg_.query_); + query_session->set_result_iterator(dynamic_cast(result_iterator)); + } else { + result_.is_end_ = true; + } + return ret; +} + +int ObTableQuerySyncP::query_scan_with_init() +{ + int ret = OB_SUCCESS; + table_service_ctx_ = query_session_->get_table_service_ctx(); + table_service_ctx_->scan_param_.is_thread_scope_ = false; + uint64_t &table_id = table_service_ctx_->param_table_id(); + table_service_ctx_->init_param( + timeout_ts_, + this, + query_session_->get_allocator(), + false /*ignored*/, + arg_.entity_type_, + table::ObBinlogRowImageType::MINIMAL /*ignored*/); + ObSEArray part_ids; + table::ObTableQueryResultIterator *result_iterator = nullptr; + const bool is_readonly = true; + const ObTableConsistencyLevel consistency_level = arg_.consistency_level_; + + if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) { + LOG_WARN("failed to get table id", K(ret)); + } else if (OB_FAIL(get_partition_ids(table_id, part_ids))) { + LOG_WARN("failed to get part id", K(ret)); + } else if (1 != part_ids.count()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("should have one partition", K(ret), K(part_ids)); + } else if (FALSE_IT(table_service_ctx_->param_partition_id() = part_ids.at(0))) { + } else if (OB_FAIL(start_trans( + is_readonly, sql::stmt::T_SELECT, consistency_level, table_id, part_ids, timeout_ts_))) { + LOG_WARN("failed to start readonly transaction", K(ret)); + } else if (OB_FAIL(table_service_->execute_query(*table_service_ctx_, arg_.query_, result_, result_iterator))) { + if (OB_TRY_LOCK_ROW_CONFLICT != ret) { + LOG_WARN("failed to execute query", K(ret), K(table_id)); + } + } else if (OB_FAIL(query_scan_with_new_context(query_session_, result_iterator, timeout_ts_))) { + LOG_WARN("fail to query, need rollback", K(ret)); + } else { + audit_row_count_ = result_.get_row_count(); + result_.query_session_id_ = query_session_id_; + } + + return ret; +} + +int ObTableQuerySyncP::query_scan_without_init() +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(query_session_->get_result_iterator()) || OB_ISNULL(query_session_->get_table_service_ctx())) { + ret = OB_ERR_NULL_VALUE; + LOG_WARN("unexpected null result iterator or table service context", K(ret)); + } else if (OB_FAIL(query_scan_with_old_context(timeout_ts_))) { + LOG_WARN("fail to query scan with old context", K(ret)); + } else { + audit_row_count_ = result_.get_row_count(); + result_.query_session_id_ = query_session_id_; + } + return ret; +} + +int ObTableQuerySyncP::process_query_start() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(query_scan_with_init())) { + LOG_WARN("failed to process query start scan with init", K(ret), K(query_session_id_)); + } else { + LOG_DEBUG("finish query start", K(ret), K(query_session_id_)); + } + return ret; +} + +int ObTableQuerySyncP::process_query_next() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(query_scan_without_init())) { + LOG_WARN("fail to query next scan without init", K(ret), K(query_session_id_)); + } else { + LOG_DEBUG("finish query next", K(ret), K(query_session_id_)); + } + return ret; +} + +int ObTableQuerySyncP::try_process() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_query_type())) { + LOG_WARN("query type is invalid", K(ret), K(arg_.query_type_)); + } else if (OB_FAIL(get_session_id(query_session_id_, arg_.query_session_id_))) { + LOG_WARN("fail to get query session id", K(ret), K(arg_.query_session_id_)); + } else if (OB_FAIL(get_query_session(query_session_id_, query_session_))) { + LOG_WARN("fail to get query session", K(ret), K(query_session_id_)); + } else if (FALSE_IT(table_service_ctx_ = query_session_->get_table_service_ctx())) { + } else if (FALSE_IT(timeout_ts_ = get_timeout_ts())) { + } else { + if (ObQueryOperationType::QUERY_START == arg_.query_type_) { + ret = process_query_start(); + } else if(ObQueryOperationType::QUERY_NEXT == arg_.query_type_) { + ret = process_query_next(); + } + if (OB_FAIL(ret)) { + LOG_WARN("query execution failed, need rollback", K(ret)); + int tmp_ret = ret; + if (OB_FAIL(destory_query_session(true))) { + LOG_WARN("faild to destory query session", K(ret)); + } + ret = tmp_ret; + } else if (result_.is_end_) { + if (OB_FAIL(destory_query_session(false))) { + LOG_WARN("fail to destory query session", K(ret), K(query_session_id_)); + } + } else { + query_session_->set_timestamp(ObTimeUtility::current_time()); + query_session_->set_in_use(false); + } + } + + stat_event_type_ = ObTableProccessType::TABLE_API_TABLE_QUERY_SYNC; // table querysync + return ret; +} + +// session.in_use_ must be true +int ObTableQuerySyncP::destory_query_session(bool need_rollback_trans) +{ + int ret = OB_SUCCESS; + ObQuerySyncMgr::get_instance().get_locker(query_session_id_).lock(); + if (OB_ISNULL(query_session_) || OB_ISNULL(table_service_ctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Unexpected null value", K(ret), K(query_session_), K(table_service_ctx_)); + } else if (OB_FAIL(ObQuerySyncMgr::get_instance().get_query_session_map()->erase_refactored(query_session_id_))) { + LOG_WARN("fail to erase query session from query sync mgr", K(ret)); + } else { + table_service_ctx_->destroy_result_iterator(part_service_); + end_trans(need_rollback_trans, req_, timeout_ts_); + OB_DELETE(ObTableQuerySyncSession, ObModIds::TABLE_PROC, query_session_); + LOG_DEBUG("destory query session success", K(ret), K(query_session_id_), K(need_rollback_trans)); + } + ObQuerySyncMgr::get_instance().get_locker(query_session_id_).unlock(); + return ret; +} + +int ObTableQuerySyncP::check_query_type() +{ + int ret = OB_SUCCESS; + if (arg_.query_type_ != table::ObQueryOperationType::QUERY_START && + arg_.query_type_ != table::ObQueryOperationType::QUERY_NEXT){ + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid query operation type", K(ret), K(arg_.query_type_)); + } + return ret; +} diff --git a/src/observer/table/ob_table_query_sync_processor.h b/src/observer/table/ob_table_query_sync_processor.h new file mode 100644 index 0000000000..aa4471cb9f --- /dev/null +++ b/src/observer/table/ob_table_query_sync_processor.h @@ -0,0 +1,201 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef _OB_TABLE_QUERY_SYNC_PROCESSOR_H +#define _OB_TABLE_QUERY_SYNC_PROCESSOR_H 1 +#include "rpc/obrpc/ob_rpc_proxy.h" +#include "rpc/obrpc/ob_rpc_processor.h" +#include "share/table/ob_table_rpc_proxy.h" +#include "ob_table_rpc_processor.h" +#include "ob_table_service.h" + +namespace oceanbase +{ +namespace observer +{ + +/** + * ---------------------------------------- ObTableQuerySyncSession ---------------------------------------- + */ +class ObTableQuerySyncSession final +{ +public: + explicit ObTableQuerySyncSession() + : in_use_(true), + timestamp_(0), + query_(), + result_iterator_(nullptr), + allocator_(ObModIds::TABLE_PROC), + table_service_ctx_(allocator_), + iterator_mementity_(nullptr) + {} + ~ObTableQuerySyncSession(); + + void set_timestamp(int64_t timestamp) { timestamp_ = timestamp; } + void set_result_iterator(ObNormalTableQueryResultIterator* iter); + int deep_copy_select_columns(const ObTableQuery &query); + void set_in_use(bool in_use) {in_use_ = in_use;} + bool is_in_use() {return in_use_;} + int init(); + + int64_t get_timestamp() { return timestamp_; } + ObTableServiceQueryCtx *get_table_service_ctx() {return &table_service_ctx_;} + ObNormalTableQueryResultIterator *get_result_iterator() { return result_iterator_; } + ObArenaAllocator *get_allocator() {return &allocator_;} + +public: + ObPartitionLeaderArray* get_part_leader_list() {return &participants_;} + sql::TransState* get_trans_state() {return &trans_state_;} + transaction::ObTransDesc* get_trans_desc() {return &trans_desc_;} + transaction::ObPartitionEpochArray* get_part_epoch_list() {return &part_epoch_list_;} + +private: + bool in_use_; + uint64_t timestamp_; + ObTableQuery query_; // only select_columns is correct + ObNormalTableQueryResultIterator *result_iterator_; + ObArenaAllocator allocator_; + ObTableServiceQueryCtx table_service_ctx_; + lib::MemoryContext iterator_mementity_; + +private: + // txn control + ObPartitionLeaderArray participants_; + sql::TransState trans_state_; + transaction::ObTransDesc trans_desc_; + transaction::ObPartitionEpochArray part_epoch_list_; +}; + +/** + * ------------------------------------ ObQuerySyncSessionRecycle ------------------------------------ + */ +class ObQuerySyncSessionRecycle : public common::ObTimerTask +{ +public: + ObQuerySyncSessionRecycle(){} + virtual ~ObQuerySyncSessionRecycle(){} + virtual void runTimerTask(); + +private: + void query_session_recycle(); + DISALLOW_COPY_AND_ASSIGN(ObQuerySyncSessionRecycle); +}; + +/** + * -----------------------------------Singleton ObQuerySyncMgr ------------------------------------- + */ +class ObQuerySyncMgr final +{ + friend class ObTableQuerySyncP; + +public: + using ObQueryHashMap = + common::hash::ObHashMap; + using QuerySessionPair = common::hash::HashMapPair; + ~ObQuerySyncMgr() {} + static ObQuerySyncMgr &get_instance(); + + struct ObGetAllSessionIdOp { + explicit ObGetAllSessionIdOp(common::ObIArray& session_id_array) : session_id_array_(session_id_array) + {} + int operator()(QuerySessionPair& entry); + common::ObIArray& session_id_array_; + }; + +public: + int get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_sess_ctx); + int set_query_session(uint64_t sessid, ObTableQuerySyncSession *query_sess_ctx); + void clean_timeout_query_session(); + +public: + ObQueryHashMap *get_query_session_map(); + ObTableQuerySyncSession *alloc_query_session(); + uint64_t generate_query_sessid(); + lib::ObMutex& get_locker(uint64_t sessid) { return locker_arr_[sessid % DEFAULT_LOCK_ARR_SIZE];} + +private: + int init(); + ObQuerySyncMgr(); + DISALLOW_COPY_AND_ASSIGN(ObQuerySyncMgr); + +private: + static const uint64_t INVALID_SESSION_ID = 0; + static const uint64_t DEFAULT_LOCK_ARR_SIZE = 2000; + static const uint64_t QUERY_SESSION_MAX_SIZE = 1000; + static const uint64_t QUERY_SESSION_TIMEOUT = 120 * 1000 * 1000; // 120s + static const uint64_t QUERY_SESSION_CLEAN_DELAY = 180 * 1000 * 1000; // 180s + +private: + static int64_t once_; // for creating singleton instance + static ObQuerySyncMgr *instance_; + int64_t session_id_; + ObQueryHashMap query_session_map_; + lib::ObMutex locker_arr_[DEFAULT_LOCK_ARR_SIZE]; + ObQuerySyncSessionRecycle query_session_recycle_; + common::ObTimer timer_; +}; + +/** + * -------------------------------------- ObTableQuerySyncP ---------------------------------------- +*/ +class ObTableQuerySyncP : + public ObTableRpcProcessor > +{ + typedef ObTableRpcProcessor> + ParentType; +public: + explicit ObTableQuerySyncP(const ObGlobalContext &gctx); + virtual ~ObTableQuerySyncP() {} + virtual int deserialize() override; + +protected: + virtual int check_arg() override; + virtual int try_process() override; + virtual void reset_ctx() override; + virtual void audit_on_finish() override; + virtual uint64_t get_request_checksum() override; + virtual table::ObTableAPITransCb *new_callback(rpc::ObRequest *req) override; + +private: + int process_query_start(); + int process_query_next(); + int process_query_end(); + int destory_query_session(bool need_rollback_trans); + DISALLOW_COPY_AND_ASSIGN(ObTableQuerySyncP); + +private: + int get_partition_ids(uint64_t table_id, ObIArray &part_ids); + int get_session_id(uint64_t &real_sessid, uint64_t arg_sessid); + int get_query_session(uint64_t sessid, ObTableQuerySyncSession *&query_session); + int query_scan_with_init(); + int query_scan_without_init(); + int query_scan_with_old_context(const int64_t timeout); + int query_scan_with_new_context(ObTableQuerySyncSession * session_ctx, table::ObTableQueryResultIterator *result_iterator, + const int64_t timeout); + +private: + void set_trans_from_session(ObTableQuerySyncSession *query_session); + int check_query_type(); + +private: + ObTableServiceQueryCtx *table_service_ctx_; + int64_t result_row_count_; + uint64_t query_session_id_; + ObArenaAllocator allocator_; + ObTableQuerySyncSession *query_session_; + int64_t timeout_ts_; +}; + +} // end namespace observer +} // end namespace oceanbase + +#endif /* _OB_TABLE_QUERY_SYNC_PROCESSOR_H */ \ No newline at end of file diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index 85ae6f46b7..ed15eeaebc 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -246,14 +246,18 @@ ObTableApiProcessorBase::ObTableApiProcessorBase(const ObGlobalContext &gctx) consistency_level_(ObTableConsistencyLevel::STRONG) { need_audit_ = GCONF.enable_sql_audit; + participants_ptr_ = &participants_; + trans_state_ptr_ = &trans_state_; + trans_desc_ptr_ = &trans_desc_; + part_epoch_list_ptr_ = &part_epoch_list_; } void ObTableApiProcessorBase::reset_ctx() { - participants_.reset(); - trans_state_.reset(); - trans_desc_.reset(); - part_epoch_list_.reset(); + participants_ptr_->reset(); + trans_state_ptr_->reset(); + trans_desc_ptr_->reset(); + part_epoch_list_ptr_->reset(); did_async_end_trans_ = false; } @@ -442,7 +446,7 @@ int ObTableApiProcessorBase::start_trans(bool is_readonly, const sql::stmt::Stmt { int ret = OB_SUCCESS; NG_TRACE(T_start_trans_begin); - if (OB_FAIL(get_participants(table_id, part_ids, participants_))) { + if (OB_FAIL(get_participants(table_id, part_ids, *participants_ptr_))) { LOG_WARN("failed to get participants", K(ret)); } const uint64_t tenant_id = credential_.tenant_id_; @@ -462,11 +466,10 @@ int ObTableApiProcessorBase::start_trans(bool is_readonly, const sql::stmt::Stmt start_trans_param.set_type(transaction::ObTransType::TRANS_USER); start_trans_param.set_isolation(transaction::ObTransIsolation::READ_COMMITED); start_trans_param.set_autocommit(true); - // 设置事务一致性类型 start_trans_param.set_consistency_type(trans_consistency_type); - // 默认只要求语句级别快照 - // 如果要控制其他的语义,参见ObTransConsistencyType和ObTransReadSnapshotType定义 - // SQL层在ObSqlTransControl::decide_trans_read_interface_specs()来决定语义 + // use statement snapshot in default + // see ObTransConsistencyType and ObTransReadSnapshotType for more details + // you can also refer to ObSqlTransControl::decide_trans_read_interface_specs of SQL layer start_trans_param.set_read_snapshot_type(transaction::ObTransReadSnapshotType::STATEMENT_SNAPSHOT); start_trans_param.set_cluster_version(GET_MIN_CLUSTER_VERSION()); @@ -474,7 +477,7 @@ int ObTableApiProcessorBase::start_trans(bool is_readonly, const sql::stmt::Stmt const uint64_t proxy_session_id = 1; // ignore const uint64_t org_cluster_id = ObServerConfig::get_instance().cluster_id; - if (true == trans_state_.is_start_trans_executed()) { + if (true == trans_state_ptr_->is_start_trans_executed()) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("start_trans is executed", K(ret)); } else { @@ -483,16 +486,16 @@ int ObTableApiProcessorBase::start_trans(bool is_readonly, const sql::stmt::Stmt start_trans_param, trans_timeout_ts, session_id, - proxy_session_id, trans_desc_))) { + proxy_session_id, *trans_desc_ptr_))) { LOG_WARN("fail start trans", K(ret), K(start_trans_param)); } - trans_state_.set_start_trans_executed(OB_SUCC(ret)); + trans_state_ptr_->set_start_trans_executed(OB_SUCC(ret)); } } NG_TRACE(T_start_trans_end); // 2. start stmt if (OB_SUCC(ret)) { - transaction::ObStmtDesc &stmt_desc = trans_desc_.get_cur_stmt_desc(); + transaction::ObStmtDesc &stmt_desc = trans_desc_ptr_->get_cur_stmt_desc(); const bool is_sfu = false; stmt_desc.stmt_tenant_id_ = tenant_id; stmt_desc.phy_plan_type_ = sql::OB_PHY_PLAN_LOCAL; @@ -508,31 +511,31 @@ int ObTableApiProcessorBase::start_trans(bool is_readonly, const sql::stmt::Stmt const bool is_retry_sql = false; transaction::ObStmtParam stmt_param; ObPartitionArray unreachable_partitions; - if (true == trans_state_.is_start_stmt_executed()) { + if (true == trans_state_ptr_->is_start_stmt_executed()) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("start_stmt is executed", K(ret)); } else if (OB_FAIL(stmt_param.init(tenant_id, stmt_timeout_ts, is_retry_sql))) { LOG_WARN("ObStmtParam init error", K(ret), K(tenant_id), K(is_retry_sql)); } else if (OB_FAIL(part_service_->start_stmt(stmt_param, - trans_desc_, - participants_, unreachable_partitions))) { + *trans_desc_ptr_, + *participants_ptr_, unreachable_partitions))) { LOG_WARN("failed to start stmt", K(ret), K(stmt_param)); } - trans_state_.set_start_stmt_executed(OB_SUCC(ret)); + trans_state_ptr_->set_start_stmt_executed(OB_SUCC(ret)); } // 3. start participant NG_TRACE(T_start_part_begin); if (OB_SUCC(ret)) { - if (true == trans_state_.is_start_participant_executed()) { + if (true == trans_state_ptr_->is_start_participant_executed()) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("start_participant is executed", K(ret)); - } else if (OB_FAIL(part_service_->start_participant(trans_desc_, - participants_.get_partitions(), - part_epoch_list_))) { + } else if (OB_FAIL(part_service_->start_participant(*trans_desc_ptr_, + participants_ptr_->get_partitions(), + *part_epoch_list_ptr_))) { LOG_WARN("fail start participants", K(ret)); } - trans_state_.set_start_participant_executed(OB_SUCC(ret)); + trans_state_ptr_->set_start_participant_executed(OB_SUCC(ret)); } NG_TRACE(T_start_part_end); return ret; @@ -544,31 +547,31 @@ int ObTableApiProcessorBase::end_trans(bool is_rollback, rpc::ObRequest *req, in int ret = OB_SUCCESS; NG_TRACE(T_end_part_begin); int end_ret = OB_SUCCESS; - if (trans_state_.is_start_participant_executed() && trans_state_.is_start_participant_success()) { + if (trans_state_ptr_->is_start_participant_executed() && trans_state_ptr_->is_start_participant_success()) { if (OB_SUCCESS != (end_ret = part_service_->end_participant( - is_rollback, trans_desc_, participants_.get_partitions()))) { + is_rollback, *trans_desc_ptr_, participants_ptr_->get_partitions()))) { ret = (OB_SUCCESS == ret) ? end_ret : ret; LOG_WARN("fail to end participant", K(ret), K(end_ret), K(is_rollback)); } - trans_state_.clear_start_participant_executed(); + trans_state_ptr_->clear_start_participant_executed(); } NG_TRACE(T_end_part_end); - if (trans_state_.is_start_stmt_executed() && trans_state_.is_start_stmt_success()) { + if (trans_state_ptr_->is_start_stmt_executed() && trans_state_ptr_->is_start_stmt_success()) { is_rollback = (is_rollback || OB_SUCCESS != ret); bool is_incomplete = false; ObPartitionArray discard_partitions; if (OB_SUCCESS != (end_ret = part_service_->end_stmt( - is_rollback, is_incomplete, participants_.get_partitions(), - part_epoch_list_, discard_partitions, participants_, trans_desc_))) { + is_rollback, is_incomplete, participants_ptr_->get_partitions(), + *part_epoch_list_ptr_, discard_partitions, *participants_ptr_, *trans_desc_ptr_))) { ret = (OB_SUCCESS == ret) ? end_ret : ret; LOG_WARN("fail to end stmt", K(ret), K(end_ret), K(is_rollback)); } - trans_state_.clear_start_stmt_executed(); + trans_state_ptr_->clear_start_stmt_executed(); } NG_TRACE(T_end_trans_begin); - if (trans_state_.is_start_trans_executed() && trans_state_.is_start_trans_success()) { - if (trans_desc_.is_readonly() || use_sync) { + if (trans_state_ptr_->is_start_trans_executed() && trans_state_ptr_->is_start_trans_success()) { + if (trans_desc_ptr_->is_readonly() || use_sync) { ret = sync_end_trans(is_rollback, timeout_ts); } else { if (is_rollback) { @@ -577,9 +580,9 @@ int ObTableApiProcessorBase::end_trans(bool is_rollback, rpc::ObRequest *req, in ret = async_commit_trans(req, timeout_ts); } } - trans_state_.clear_start_trans_executed(); + trans_state_ptr_->clear_start_trans_executed(); } - trans_state_.reset(); + trans_state_ptr_->reset(); NG_TRACE(T_end_trans_end); return ret; } @@ -588,7 +591,7 @@ int ObTableApiProcessorBase::sync_end_trans(bool is_rollback, int64_t timeout_ts { int ret = OB_SUCCESS; sql::ObEndTransSyncCallback callback; - if (OB_FAIL(callback.init(&trans_desc_, NULL))) { + if (OB_FAIL(callback.init(trans_desc_ptr_, NULL))) { LOG_WARN("fail init callback", K(ret)); } else { int wait_ret = OB_SUCCESS; @@ -598,13 +601,13 @@ int ObTableApiProcessorBase::sync_end_trans(bool is_rollback, int64_t timeout_ts callback.handout(); const int64_t stmt_timeout_ts = timeout_ts; // whether end_trans is success or not, the callback MUST be invoked - if (OB_FAIL(part_service_->end_trans(is_rollback, trans_desc_, callback, stmt_timeout_ts))) { - LOG_WARN("fail end trans when session terminate", K(ret), K_(trans_desc), K(stmt_timeout_ts)); + if (OB_FAIL(part_service_->end_trans(is_rollback, *trans_desc_ptr_, callback, stmt_timeout_ts))) { + LOG_WARN("fail end trans when session terminate", K(ret), KP_(trans_desc_ptr), K(stmt_timeout_ts)); } // MUST wait here if (OB_UNLIKELY(OB_SUCCESS != (wait_ret = callback.wait()))) { LOG_WARN("sync end trans callback return an error!", K(ret), - K(wait_ret), K_(trans_desc), K(stmt_timeout_ts)); + K(wait_ret), KP_(trans_desc_ptr), K(stmt_timeout_ts)); } ret = OB_SUCCESS != ret? ret : wait_ret; bool has_called_txs_end_trans = false; @@ -640,8 +643,8 @@ int ObTableApiProcessorBase::async_commit_trans(rpc::ObRequest *req, int64_t tim callback.handout(); const int64_t stmt_timeout_ts = timeout_ts; // whether end_trans is success or not, the callback MUST be invoked - if (OB_FAIL(part_service_->end_trans(is_rollback, trans_desc_, callback, stmt_timeout_ts))) { - LOG_WARN("fail end trans when session terminate", K(ret), K_(trans_desc), K(stmt_timeout_ts)); + if (OB_FAIL(part_service_->end_trans(is_rollback, *trans_desc_ptr_, callback, stmt_timeout_ts))) { + LOG_WARN("fail end trans when session terminate", K(ret), KP_(trans_desc_ptr), K(stmt_timeout_ts)); } // ignore the return code of end_trans THIS_WORKER.disable_retry(); // can NOT retry after set end trans async to be true @@ -904,6 +907,7 @@ template class oceanbase::observer::ObTableRpcProcessor >; template class oceanbase::observer::ObTableRpcProcessor >; template class oceanbase::observer::ObTableRpcProcessor >; +template class oceanbase::observer::ObTableRpcProcessor >; template int ObTableRpcProcessor::deserialize() @@ -1044,7 +1048,6 @@ ObHTableDeleteExecutor::ObHTableDeleteExecutor(common::ObArenaAllocator &alloc, mutations_result_.set_entity_factory(&entity_factory_); } -// @see https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Delete.html int ObHTableDeleteExecutor::htable_delete(const ObTableBatchOperation &batch_operation, int64_t &affected_rows) { int ret = OB_SUCCESS; diff --git a/src/observer/table/ob_table_rpc_processor.h b/src/observer/table/ob_table_rpc_processor.h index 89c256fee1..40de14721d 100644 --- a/src/observer/table/ob_table_rpc_processor.h +++ b/src/observer/table/ob_table_rpc_processor.h @@ -135,7 +135,7 @@ public: const common::ObIArray &part_ids, int64_t timeout_ts); int end_trans(bool is_rollback, rpc::ObRequest *req, int64_t timeout_ts, bool use_sync = false); inline bool did_async_end_trans() const { return did_async_end_trans_; } - inline transaction::ObTransDesc& get_trans_desc() { return trans_desc_; } + inline transaction::ObTransDesc& get_trans_desc() { return *trans_desc_ptr_; } int get_partition_by_rowkey(uint64_t table_id, const ObIArray &rowkeys, common::ObIArray &part_ids, common::ObIArray &rowkeys_per_part); @@ -190,7 +190,7 @@ protected: ObTableRetryPolicy retry_policy_; bool need_retry_in_queue_; int32_t retry_count_; -private: +protected: // trans control ObPartitionLeaderArray participants_; sql::TransState trans_state_; @@ -200,6 +200,10 @@ private: transaction::ObPartitionEpochArray part_epoch_list_; bool did_async_end_trans_; ObTableConsistencyLevel consistency_level_; + ObPartitionLeaderArray *participants_ptr_; + sql::TransState *trans_state_ptr_; + transaction::ObTransDesc *trans_desc_ptr_; + transaction::ObPartitionEpochArray *part_epoch_list_ptr_; }; template diff --git a/src/observer/table/ob_table_rpc_processor_util.h b/src/observer/table/ob_table_rpc_processor_util.h index 680ee90b78..20a1f4d49d 100644 --- a/src/observer/table/ob_table_rpc_processor_util.h +++ b/src/observer/table/ob_table_rpc_processor_util.h @@ -57,6 +57,7 @@ enum ObTableProccessType // query TABLE_API_TABLE_QUERY, TABLE_API_HBASE_QUERY, + TABLE_API_TABLE_QUERY_SYNC, TABLE_API_PROCESS_TYPE_MAX }; @@ -237,6 +238,13 @@ public: EVENT_ADD(HBASEAPI_SCAN_ROW, rows); SET_AUDIT_SQL_STRING(hbase_scan); break; + // table query sync + case ObTableProccessType::TABLE_API_TABLE_QUERY_SYNC: + EVENT_INC(TABLEAPI_QUERY_COUNT); + EVENT_ADD(TABLEAPI_QUERY_TIME, elapsed_us); + EVENT_ADD(TABLEAPI_QUERY_ROW, rows); + SET_AUDIT_SQL_STRING(table_query_sync); + break; default: SET_AUDIT_SQL_STRING(unknown); diff --git a/src/observer/table/ob_table_service.cpp b/src/observer/table/ob_table_service.cpp index 355828418c..42be5d4f98 100644 --- a/src/observer/table/ob_table_service.cpp +++ b/src/observer/table/ob_table_service.cpp @@ -111,8 +111,7 @@ int ObTableService::cons_properties_infos(const schema::ObTableSchema &table_sch const schema::ObColumnSchemaV2 *column_schema = NULL; ObExprResType column_type; const int64_t N = properties.count(); - for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) - { + for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { const ObString &cname = properties.at(i); if (NULL == (column_schema = table_schema.get_column_schema(cname))) { ret = OB_ERR_COLUMN_NOT_FOUND; @@ -463,8 +462,7 @@ int ObTableService::do_multi_insert_or_update(ObTableServiceGetCtx &ctx, LOG_WARN("table id is invalid", K(ret), K(table_id)); } else { int64_t N = batch_operation.count(); - for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) - { + for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { const ObTableOperation &table_operation = batch_operation.at(i); ObTableOperationResult op_result; ObITableEntity *result_entity = result.get_entity_factory()->alloc(); @@ -549,8 +547,7 @@ int ObTableService::fill_get_result( const int64_t rowkey_column_cnt = scan_result->get_rowkey_column_cnt(); const int64_t N = row->get_count(); ObObj cell_clone; - for (int64_t i = rowkey_column_cnt; OB_SUCCESS == ret && i < N; ++i) - { + for (int64_t i = rowkey_column_cnt; OB_SUCCESS == ret && i < N; ++i) { const ObString &name = properties.at(i - rowkey_column_cnt); const ObObj &cell = row->get_cell(i); if (OB_FAIL(ob_write_obj(*ctx.param_.allocator_, cell, cell_clone))) { @@ -611,8 +608,7 @@ int ObTableService::fill_multi_get_result( ObNewRow *row = NULL; const int64_t N = batch_operation.count(); bool did_get_next_row = true; - for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) - { + for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { // left join const ObTableEntity &entity = static_cast(batch_operation.at(i).entity()); ObRowkey expected_key = const_cast(entity).get_rowkey(); @@ -647,8 +643,7 @@ int ObTableService::fill_multi_get_result( ObObj cell_clone; if (expected_key.simple_equal(the_key)) { const int64_t N = row->get_count(); - for (int64_t i = rowkey_size; OB_SUCCESS == ret && i < N; ++i) - { + for (int64_t i = rowkey_size; OB_SUCCESS == ret && i < N; ++i) { const ObString &name = scan_result->get_properties().at(i-rowkey_size); ObObj &cell = row->get_cell(i); if (OB_FAIL(ob_write_obj(*ctx.param_.allocator_, cell, cell_clone))) { @@ -717,8 +712,7 @@ int ObTableService::add_index_columns_if_missing(schema::ObSchemaGetterGuard &sc column_id = (*b)->get_column_id(); bool found = false; const int64_t N = column_ids.count(); - for (int64_t i = 0; !found && i < N; ++i) - { + for (int64_t i = 0; !found && i < N; ++i) { if (column_id == column_ids.at(i)) { found = true; } @@ -1386,8 +1380,7 @@ int ObTableService::fill_new_entity( ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected rowkey size", K(ret), K(primary_key_size), KP(new_entity)); } else if (returning_rowkey) { - for (int64_t i = 0; i < primary_key_size && OB_SUCCESS == ret; ++i) - { + for (int64_t i = 0; i < primary_key_size && OB_SUCCESS == ret; ++i) { if (OB_FAIL(ob_write_obj(alloc, row.cells_[i], cell_clone))) { LOG_WARN("failed to copy obj", K(ret)); } else if (OB_FAIL(new_entity->add_rowkey_value(cell_clone))) { @@ -1397,8 +1390,7 @@ int ObTableService::fill_new_entity( } if (OB_SUCC(ret)) { const int64_t N = primary_key_size + properties.count(); - for (int64_t i = primary_key_size, j = 0; OB_SUCCESS == ret && i < N; ++i, ++j) - { + for (int64_t i = primary_key_size, j = 0; OB_SUCCESS == ret && i < N; ++i, ++j) { // deep copy property const ObString &name = properties.at(j); const ObObj &cell = row.cells_[i]; @@ -1592,8 +1584,7 @@ int ObTableService::batch_execute(ObTableServiceGetCtx &ctx, const ObTableBatchO int ret = OB_SUCCESS; int64_t N = batch_operation.count(); ObNewRowIterator *duplicate_row_iter = nullptr; - for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) - { + for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { const ObTableOperation &table_operation = batch_operation.at(i); ObTableOperationResult op_result; ObITableEntity *result_entity = result.get_entity_factory()->alloc(); @@ -1658,8 +1649,7 @@ int ObTableService::cons_index_key_type(schema::ObSchemaGetterGuard &schema_guar ObExprResType column_type; const ObIndexInfo &index_key_info = index_schema->get_index_info(); const int64_t N = index_key_info.get_size(); - for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) - { + for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { if (OB_FAIL(index_key_info.get_column_id(i, column_id))) { LOG_WARN("failed to get index column", K(ret), K(i)); } else if (OB_FAIL(schema_guard.get_column_schema(data_table_id, column_id, column_schema))) { @@ -1788,12 +1778,10 @@ int ObTableService::fill_query_scan_ranges(ObTableServiceCtx &ctx, const ObIArray &scan_ranges = query.get_scan_ranges(); int64_t N = scan_ranges.count(); // check obj type in ranges - for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) // foreach range - { + for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { // foreach range const ObNewRange &range = scan_ranges.at(i); // check column type - for (int64_t j = 0; OB_SUCCESS == ret && j < 2; ++j) - { + for (int64_t j = 0; OB_SUCCESS == ret && j < 2; ++j) { const ObRowkey *p_key = nullptr; if (0 == j) { p_key = &range.get_start_key(); @@ -1808,8 +1796,7 @@ int ObTableService::fill_query_scan_ranges(ObTableServiceCtx &ctx, LOG_WARN("wrong rowkey size", K(ret), K(i), K(j), K(*p_key), K_(ctx.columns_type)); } else { const int64_t M = p_key->get_obj_cnt(); - for (int64_t k = 0; OB_SUCCESS == ret && k < M; ++k) - { + for (int64_t k = 0; OB_SUCCESS == ret && k < M; ++k) { ObObj &obj = const_cast(p_key->get_obj_ptr()[k]); if (obj.is_min_value() || obj.is_max_value()) { continue; @@ -1822,8 +1809,7 @@ int ObTableService::fill_query_scan_ranges(ObTableServiceCtx &ctx, if (OB_UNLIKELY(padding_num > 0)) { // index scan need fill primary key object ObNewRange index_key_range = range; - for (int64_t j = 0; OB_SUCCESS == ret && j < 2; ++j) - { + for (int64_t j = 0; OB_SUCCESS == ret && j < 2; ++j) { const ObRowkey *p_key = nullptr; if (0 == j) { p_key = &range.get_start_key(); @@ -1841,8 +1827,7 @@ int ObTableService::fill_query_scan_ranges(ObTableServiceCtx &ctx, LOG_WARN("no memory", K(ret)); } else { const ObObj *old_objs = p_key->get_obj_ptr(); - for (int64_t k = 0; k < old_objs_num; ++k) - { + for (int64_t k = 0; k < old_objs_num; ++k) { new_objs[k] = old_objs[k]; // shallow copy } // end for if (0 == j) { // padding for startkey @@ -1928,28 +1913,33 @@ int ObTableService::fill_query_scan_param(ObTableServiceCtx &ctx, int ObNormalTableQueryResultIterator::get_next_result(table::ObTableQueryResult *&next_result) { int ret = OB_SUCCESS; - if (is_first_result_) { - is_first_result_ = false; - if (0 != one_result_.get_property_count()) { + if (OB_ISNULL(one_result_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("one_result_ should not be null", K(ret)); + } else if (is_first_result_ || is_query_sync_) { + if (0 != one_result_->get_property_count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("property should be empty", K(ret)); } const ObIArray &select_columns = query_->get_select_columns(); const int64_t N = select_columns.count(); - for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) - { - if (OB_FAIL(one_result_.add_property_name(select_columns.at(i)))) { + for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { + if (OB_FAIL(one_result_->add_property_name(select_columns.at(i)))) { LOG_WARN("failed to copy name", K(ret)); } } // end for - last_row_ = NULL; + + if (is_first_result_) { + last_row_ = NULL; + is_first_result_ = false; + } } else { - one_result_.reset_except_property(); + one_result_->reset_except_property(); } if (OB_SUCC(ret)) { if (NULL != last_row_) { - if (OB_FAIL(one_result_.add_row(*last_row_))) { + if (OB_FAIL(one_result_->add_row(*last_row_))) { LOG_WARN("failed to add row, ", K(ret)); } last_row_ = NULL; @@ -1957,11 +1947,11 @@ int ObNormalTableQueryResultIterator::get_next_result(table::ObTableQueryResult } if (OB_SUCC(ret)) { - next_result = &one_result_; + next_result = one_result_; ObNewRow *row = nullptr; while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) { LOG_DEBUG("[yzfdebug] scan result", "row", *row); - if (OB_FAIL(one_result_.add_row(*row))) { + if (OB_FAIL(one_result_->add_row(*row))) { if (OB_SIZE_OVERFLOW == ret) { ret = OB_SUCCESS; last_row_ = row; @@ -1969,7 +1959,7 @@ int ObNormalTableQueryResultIterator::get_next_result(table::ObTableQueryResult } else { LOG_WARN("failed to add row", K(ret)); } - } else if (one_result_.reach_batch_size_or_result_size(batch_size_, max_result_size_)) { + } else if (one_result_->reach_batch_size_or_result_size(batch_size_, max_result_size_)) { NG_TRACE(tag9); break; } else { @@ -1978,7 +1968,7 @@ int ObNormalTableQueryResultIterator::get_next_result(table::ObTableQueryResult } // end while if (OB_ITER_END == ret) { has_more_rows_ = false; - if (one_result_.get_row_count() > 0) { + if (one_result_->get_row_count() > 0) { ret = OB_SUCCESS; } } diff --git a/src/observer/table/ob_table_service.h b/src/observer/table/ob_table_service.h index d932580801..fca07ffd37 100644 --- a/src/observer/table/ob_table_service.h +++ b/src/observer/table/ob_table_service.h @@ -39,6 +39,7 @@ using table::ObTableBatchOperationResult; using table::ObITableBatchOperationResult; using table::ObTableQuery; using table::ObTableQueryResult; +using table::ObTableQuerySyncResult; class ObTableApiProcessorBase; class ObTableService; class ObTableApiRowIterator; @@ -126,7 +127,7 @@ class ObNormalTableQueryResultIterator: public table::ObTableQueryResultIterator { public: ObNormalTableQueryResultIterator(const ObTableQuery &query, table::ObTableQueryResult &one_result) - :one_result_(one_result), + :one_result_(&one_result), query_(&query), last_row_(NULL), batch_size_(query.get_batch()), @@ -134,15 +135,19 @@ public: static_cast(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))), scan_result_(NULL), is_first_result_(true), - has_more_rows_(true) + has_more_rows_(true), + is_query_sync_(false) { } virtual ~ObNormalTableQueryResultIterator() {} virtual int get_next_result(table::ObTableQueryResult *&one_result) override; virtual bool has_more_result() const override; void set_scan_result(common::ObNewRowIterator *scan_result) { scan_result_ = scan_result; } + virtual void set_one_result(ObTableQueryResult *result) {one_result_ = result;} + void set_query(const ObTableQuery *query) {query_ = query;} + void set_query_sync() { is_query_sync_ = true ; } private: - table::ObTableQueryResult &one_result_; + table::ObTableQueryResult *one_result_; const ObTableQuery *query_; common::ObNewRow *last_row_; int32_t batch_size_; @@ -150,6 +155,7 @@ private: common::ObNewRowIterator *scan_result_; bool is_first_result_; bool has_more_rows_; + bool is_query_sync_; }; struct ObTableServiceQueryCtx: public ObTableServiceGetCtx diff --git a/src/share/table/ob_table.cpp b/src/share/table/ob_table.cpp index 80c398cd6d..5184324d69 100644 --- a/src/share/table/ob_table.cpp +++ b/src/share/table/ob_table.cpp @@ -1546,3 +1546,7 @@ OB_SERIALIZE_MEMBER(ObTableQueryAndMutateResult, affected_rows_, affected_entity_); +OB_SERIALIZE_MEMBER((ObTableQuerySyncResult, ObTableQueryResult), + is_end_, + query_session_id_ +); diff --git a/src/share/table/ob_table.h b/src/share/table/ob_table.h index 6fa3840d3e..a7421bd87c 100644 --- a/src/share/table/ob_table.h +++ b/src/share/table/ob_table.h @@ -214,6 +214,12 @@ void ObTableEntityFactory::free_all() } } +enum class ObQueryOperationType : int { + QUERY_START = 0, + QUERY_NEXT = 1, + QUERY_MAX +}; + /// Table Operation Type struct ObTableOperationType { @@ -389,7 +395,6 @@ class ObNoRetry : public ObIRetryPolicy {}; /// consistency levels -/// @see https://www.atatech.org/articles/102030 enum class ObTableConsistencyLevel { STRONG = 0, @@ -571,7 +576,6 @@ public: int set_row_offset_per_column_family(int32_t offset); /// Apply the specified server-side filter when performing the Query. /// @param filter - a file string using the hbase filter language - /// @see the filter language at https://issues.apache.org/jira/browse/HBASE-4176 int set_filter(const ObString &filter); const ObIArray &get_columns() const { return select_column_qualifier_; } @@ -795,6 +799,20 @@ public: ObTableQueryResult affected_entity_; }; +class ObTableQuerySyncResult: public ObTableQueryResult +{ + OB_UNIS_VERSION(1); +public: + ObTableQuerySyncResult() + : is_end_(false), + query_session_id_(0) + {} + virtual ~ObTableQuerySyncResult() {} +public: + bool is_end_; + uint64_t query_session_id_; // from server gen +}; + } // end namespace table } // end namespace oceanbase diff --git a/src/share/table/ob_table_rpc_proxy.h b/src/share/table/ob_table_rpc_proxy.h index 2cdfce5419..c32d3c45d7 100644 --- a/src/share/table/ob_table_rpc_proxy.h +++ b/src/share/table/ob_table_rpc_proxy.h @@ -30,6 +30,7 @@ public: RPC_S(PR5 batch_execute, obrpc::OB_TABLE_API_BATCH_EXECUTE, (table::ObTableBatchOperationRequest), table::ObTableBatchOperationResult); RPC_SS(PR5 execute_query, obrpc::OB_TABLE_API_EXECUTE_QUERY, (table::ObTableQueryRequest), table::ObTableQueryResult); RPC_S(PR5 query_and_mutate, obrpc::OB_TABLE_API_QUERY_AND_MUTATE, (table::ObTableQueryAndMutateRequest), table::ObTableQueryAndMutateResult); + RPC_S(PR5 execute_query_sync, obrpc::OB_TABLE_API_EXECUTE_QUERY_SYNC, (table::ObTableQuerySyncRequest), table::ObTableQuerySyncResult); }; }; // end namespace obrpc diff --git a/src/share/table/ob_table_rpc_struct.cpp b/src/share/table/ob_table_rpc_struct.cpp index 04895e12a2..696f436683 100644 --- a/src/share/table/ob_table_rpc_struct.cpp +++ b/src/share/table/ob_table_rpc_struct.cpp @@ -84,4 +84,9 @@ OB_SERIALIZE_MEMBER(ObTableQueryAndMutateRequest, table_id_, partition_id_, entity_type_, - query_and_mutate_); \ No newline at end of file + query_and_mutate_); + +OB_SERIALIZE_MEMBER((ObTableQuerySyncRequest, ObTableQueryRequest), + query_session_id_, + query_type_ + ); diff --git a/src/share/table/ob_table_rpc_struct.h b/src/share/table/ob_table_rpc_struct.h index eebaa7a919..e94ff6f17f 100644 --- a/src/share/table/ob_table_rpc_struct.h +++ b/src/share/table/ob_table_rpc_struct.h @@ -176,7 +176,7 @@ public: //////////////////////////////////////////////////////////////// // @see PCODE_DEF(OB_TABLE_API_EXECUTE_QUERY, 0x1104) -class ObTableQueryRequest final +class ObTableQueryRequest { OB_UNIS_VERSION(1); public: @@ -187,7 +187,7 @@ public: consistency_level_(ObTableConsistencyLevel::STRONG) {} - TO_STRING_KV("credential", common::ObHexStringWrap(credential_), + VIRTUAL_TO_STRING_KV("credential", common::ObHexStringWrap(credential_), K_(table_name), K_(table_id), K_(partition_id), @@ -213,6 +213,7 @@ public: virtual ~ObTableQueryResultIterator() {} virtual int get_next_result(ObTableQueryResult *&one_result) = 0; virtual bool has_more_result() const = 0; + virtual void set_one_result(ObTableQueryResult *result){ UNUSED(result); } }; class ObTableQueryAndMutateRequest final @@ -241,6 +242,23 @@ public: ObBinlogRowImageType binlog_row_image_type_; }; +class ObTableQuerySyncRequest : public ObTableQueryRequest +{ + OB_UNIS_VERSION(1); +public: + ObTableQuerySyncRequest() + :query_session_id_(0), + query_type_(ObQueryOperationType::QUERY_MAX) + {} + virtual ~ObTableQuerySyncRequest(){} + INHERIT_TO_STRING_KV("ObTableQueryRequest", ObTableQueryRequest, + K_(query_session_id), + K_(query_type)); +public: + uint64_t query_session_id_; + ObQueryOperationType query_type_; +}; + } // end namespace table } // end namespace oceanbase diff --git a/src/storage/ob_dml_param.h b/src/storage/ob_dml_param.h index 2173600f4d..1c6bb10c58 100644 --- a/src/storage/ob_dml_param.h +++ b/src/storage/ob_dml_param.h @@ -137,7 +137,8 @@ public: block_cache_hit_rate_(0), ref_table_id_(common::OB_INVALID_ID), partition_guard_(NULL), - iterator_mementity_(nullptr) + iterator_mementity_(nullptr), + is_thread_scope_(true) {} explicit ObTableScanParam(transaction::ObTransDesc& trans_desc) : common::ObVTableScanParam(), @@ -152,7 +153,8 @@ public: block_cache_hit_rate_(0), ref_table_id_(common::OB_INVALID_ID), partition_guard_(NULL), - iterator_mementity_(nullptr) + iterator_mementity_(nullptr), + is_thread_scope_(true) {} virtual ~ObTableScanParam() {} @@ -171,6 +173,7 @@ public: uint64_t ref_table_id_; // main table id ObIPartitionGroupGuard* partition_guard_; lib::MemoryContext iterator_mementity_; + bool is_thread_scope_; OB_INLINE virtual bool is_valid() const { return (NULL != trans_desc_ && trans_desc_->is_valid_or_standalone_stmt() && ObVTableScanParam::is_valid()); diff --git a/src/storage/ob_i_store.cpp b/src/storage/ob_i_store.cpp index 56beb1b501..c80e99866e 100644 --- a/src/storage/ob_i_store.cpp +++ b/src/storage/ob_i_store.cpp @@ -991,8 +991,10 @@ int ObTableAccessContext::init(ObTableScanParam& scan_param, const ObStoreCtx& c param .set_mem_attr( scan_param.pkey_.get_tenant_id(), common::ObModIds::OB_TABLE_SCAN_ITER, common::ObCtxIds::DEFAULT_CTX_ID) - .set_properties(lib::USE_TL_PAGE_OPTIONAL) .set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE); + if (scan_param.is_thread_scope_) { + param.set_properties(lib::USE_TL_PAGE_OPTIONAL); + } if (is_inited_) { ret = OB_INIT_TWICE; LOG_WARN("cannot init twice", K(ret)); diff --git a/src/storage/ob_i_store.h b/src/storage/ob_i_store.h index 2904a87f9f..525de25a50 100644 --- a/src/storage/ob_i_store.h +++ b/src/storage/ob_i_store.h @@ -963,6 +963,7 @@ struct ObStoreCtx { snapshot_info_.reset(); trans_table_guard_ = NULL; log_ts_ = INT64_MAX; + is_thread_scope_ = true; } int get_snapshot_info(transaction::ObTransSnapInfo& snap_info) const; bool is_valid() const @@ -975,7 +976,7 @@ struct ObStoreCtx { return common::is_valid_tenant_id(tenant_id_); } TO_STRING_KV(KP_(mem_ctx), KP_(warm_up_ctx), KP_(tables), K_(tenant_id), K_(trans_id), K_(is_sp_trans), K_(isolation), - K_(sql_no), K_(stmt_min_sql_no), K_(snapshot_info), KP_(trans_table_guard)); + K_(sql_no), K_(stmt_min_sql_no), K_(snapshot_info), KP_(trans_table_guard), K_(is_thread_scope)); memtable::ObIMemtableCtx* mem_ctx_; ObWarmUpCtx* warm_up_ctx_; @@ -992,6 +993,8 @@ struct ObStoreCtx { int64_t log_ts_; transaction::ObTransSnapInfo snapshot_info_; transaction::ObTransStateTableGuard* trans_table_guard_; + // storage access lifetime span won't cross thread + bool is_thread_scope_; }; struct ObTableAccessStat { diff --git a/src/storage/ob_partition_storage.cpp b/src/storage/ob_partition_storage.cpp index 2c39ef2d6f..da9db2c2ad 100644 --- a/src/storage/ob_partition_storage.cpp +++ b/src/storage/ob_partition_storage.cpp @@ -2186,14 +2186,14 @@ int ObPartitionStorage::check_new_row_shadow_pk( LOG_WARN( "index column count is invalid", K(ret), K(index_col_cnt), K(rowkey_cnt), K(spk_cnt), K(column_ids.count())); } else if (lib::is_mysql_mode()) { - // mysql兼容:只要unique index key中有null列,则需要填充shadow列 + // mysql compatible: if the unique index key contains a null column, fill the shadow column bool rowkey_has_null = false; for (int64_t i = 0; !rowkey_has_null && i < index_col_cnt; i++) { rowkey_has_null = new_row.get_cell(i).is_null(); } need_spk = rowkey_has_null; } else { - // oracle兼容:只有unique index key全为null列时,才需要填充shadow列 + // oracle compatible: the shadow column needs to be filled only when all unique index keys are null columns bool is_rowkey_all_null = true; for (int64_t i = 0; is_rowkey_all_null && i < index_col_cnt; i++) { is_rowkey_all_null = new_row.get_cell(i).is_null(); @@ -3468,6 +3468,7 @@ int ObPartitionStorage::table_scan( ObTableScanIterator* iter = NULL; ObStoreCtx ctx; ctx.cur_pkey_ = pkey_; + ctx.is_thread_scope_ = param.is_thread_scope_; // STORAGE_LOG(DEBUG, "begin table scan", K(param)); if (OB_UNLIKELY(!is_inited_)) { @@ -3536,6 +3537,7 @@ int ObPartitionStorage::table_scan( ObTableScanIterIterator* iter = NULL; ObStoreCtx ctx; ctx.cur_pkey_ = pkey_; + ctx.is_thread_scope_ = param.is_thread_scope_; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; diff --git a/src/storage/transaction/ob_trans_service.cpp b/src/storage/transaction/ob_trans_service.cpp index 084442bb26..bdac73e6d6 100644 --- a/src/storage/transaction/ob_trans_service.cpp +++ b/src/storage/transaction/ob_trans_service.cpp @@ -883,7 +883,7 @@ int ObTransService::acquire_sche_ctx_(ObTransDesc& trans_desc, ObScheTransCtx*& } else if (!trans_desc.is_nested_stmt()) { TRANS_LOG(WARN, "Non-nested statements should not create a temporary scheduler", K(trans_desc)); } else { - // 构建临时scheduler + // build temporary scheduler const ObTransID& trans_id = trans_desc.get_trans_id(); const bool for_replay = false; bool alloc = true; @@ -6210,12 +6210,12 @@ int ObTransService::init_memtable_ctx_(ObMemtableCtx* mem_ctx, const uint64_t te } int ObTransService::alloc_memtable_ctx_( - const ObPartitionKey& pkey, const bool is_fast_select, const uint64_t tenant_id, ObMemtableCtx*& ctx) + const ObPartitionKey& pkey, const bool tls_enable, const uint64_t tenant_id, ObMemtableCtx*& ctx) { int ret = OB_SUCCESS; ObMemtableCtx* memtable_ctx = NULL; - if (is_fast_select) { + if (!tls_enable) { memtable_ctx = static_cast(mt_ctx_factory_.alloc(tenant_id)); if (NULL != memtable_ctx) { memtable_ctx->set_self_alloc_ctx(true); @@ -6249,7 +6249,7 @@ int ObTransService::alloc_memtable_ctx_( } } else { ret = OB_ALLOCATE_MEMORY_FAILED; - TRANS_LOG(WARN, "allocate memory failed", K(ret), K(pkey), K(is_fast_select)); + TRANS_LOG(WARN, "allocate memory failed", K(ret), K(pkey), K(tls_enable)); } return ret; } @@ -6328,6 +6328,7 @@ int ObTransService::get_store_ctx(const ObTransDesc& trans_desc, const ObPartiti if (trans_desc.is_fast_select() || trans_desc.is_not_create_ctx_participant(pg_key, user_specified_snapshot)) { int64_t part_snapshot_version = ObTransVersion::INVALID_TRANS_VERSION; store_ctx.trans_id_ = trans_desc.get_trans_id(); + bool tls_enable = store_ctx.is_thread_scope_ && !trans_desc.is_fast_select(); if (OB_FAIL(handle_snapshot_for_read_only_participant_( trans_desc, pg_key, user_specified_snapshot, part_snapshot_version))) { TRANS_LOG(WARN, @@ -6336,7 +6337,7 @@ int ObTransService::get_store_ctx(const ObTransDesc& trans_desc, const ObPartiti K(trans_desc), K(pg_key), K(user_specified_snapshot)); - } else if (OB_FAIL(alloc_memtable_ctx_(pg_key, trans_desc.is_fast_select(), pg_key.get_tenant_id(), mt_ctx))) { + } else if (OB_FAIL(alloc_memtable_ctx_(pg_key, tls_enable, pg_key.get_tenant_id(), mt_ctx))) { TRANS_LOG(WARN, "allocate memory failed", K(ret), K(pg_key)); } else if (!mt_ctx->is_self_alloc_ctx() && OB_FAIL(init_memtable_ctx_(mt_ctx, pg_key.get_tenant_id()))) { TRANS_LOG(WARN, "init mem ctx failed", K(ret), K(pg_key)); diff --git a/src/storage/transaction/ob_trans_service.h b/src/storage/transaction/ob_trans_service.h index 5275898a3f..3361a0d21b 100644 --- a/src/storage/transaction/ob_trans_service.h +++ b/src/storage/transaction/ob_trans_service.h @@ -718,7 +718,7 @@ private: int convert_sp_trans_to_dist_trans_(ObTransDesc& trans_desc); int check_snapshot_for_start_stmt_(const ObTransDesc& trans_desc, const ObPartitionLeaderArray& pla); memtable::ObMemtableCtx* alloc_tc_memtable_ctx_(); - int alloc_memtable_ctx_(const common::ObPartitionKey& pg_key, const bool is_fast_select, const uint64_t tenant_id, + int alloc_memtable_ctx_(const common::ObPartitionKey& pg_key, const bool tls_enable /*thread local storage*/, const uint64_t tenant_id, memtable::ObMemtableCtx*& mt_ctx); void release_memtable_ctx_(const common::ObPartitionKey& pg_key, memtable::ObMemtableCtx* mt_ctx); int handle_start_stmt_request_(const ObTransMsg& msg);