diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet.h index a469410726..9c22f1df74 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet.h @@ -150,7 +150,7 @@ public: static const uint16_t DISABLE_DEBUGSYNC_FLAG = 1 << 12; static const uint16_t CONTEXT_FLAG = 1 << 11; static const uint16_t UNNEED_RESPONSE_FLAG = 1 << 10; - static const uint16_t BAD_ROUTING_FLAG = 1 << 9; + static const uint16_t REQUIRE_REROUTING_FLAG = 1 << 9; static const uint16_t ENABLE_RATELIMIT_FLAG = 1 << 8; static const uint16_t BACKGROUND_FLOW_FLAG = 1 << 7; static const uint16_t TRACE_INFO_FLAG = 1 << 6; @@ -269,8 +269,8 @@ public: inline void unset_stream(); inline void set_unneed_response(); inline bool unneed_response() const; - inline void set_bad_routing(); - inline bool bad_routing() const; + inline void set_require_rerouting(); + inline bool require_rerouting() const; inline bool ratelimit_enabled() const; inline void enable_ratelimit(); @@ -536,14 +536,14 @@ bool ObRpcPacket::unneed_response() const return hdr_.flags_ & ObRpcPacketHeader::UNNEED_RESPONSE_FLAG; } -void ObRpcPacket::set_bad_routing() +void ObRpcPacket::set_require_rerouting() { - hdr_.flags_ |= ObRpcPacketHeader::BAD_ROUTING_FLAG; + hdr_.flags_ |= ObRpcPacketHeader::REQUIRE_REROUTING_FLAG; } -bool ObRpcPacket::bad_routing() const +bool ObRpcPacket::require_rerouting() const { - return hdr_.flags_ & ObRpcPacketHeader::BAD_ROUTING_FLAG; + return hdr_.flags_ & ObRpcPacketHeader::REQUIRE_REROUTING_FLAG; } bool ObRpcPacket::ratelimit_enabled() const diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index cbfdc60532..753a0e12d0 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -859,7 +859,6 @@ PCODE_DEF(OB_TTL_RESPONSE, 0x1109) // PCODE_DEF(OB_TABLE_API_LOAD_GET_TRANS_STATUS_PEER, 0x1121) // not supported on 4.2 PCODE_DEF(OB_TABLE_API_EXECUTE_GROUP_UPDATE, 0x1122) - PCODE_DEF(OB_TABLE_API_DIRECT_LOAD, 0x1123) PCODE_DEF(OB_TABLE_API_MOVE, 0x1124) diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp index e2691b76f7..fe0d625536 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp @@ -311,8 +311,8 @@ int ObRpcProcessorBase::do_response(const Response &rsp) packet->set_stream_last(); } } - if (rsp.bad_routing_) { - packet->set_bad_routing(); + if (rsp.require_rerouting_) { + packet->set_require_rerouting(); } packet->set_unis_version(0); packet->calc_checksum(); @@ -480,7 +480,7 @@ int ObRpcProcessorBase::part_response(const int retcode, bool is_last) if (OB_SUCC(ret)) { const int64_t sessid = sc_ ? sc_->sessid() : 0; ObRpcPacket *pkt = new (pkt_buf) ObRpcPacket(); - Response rsp(sessid, is_stream_, is_last, bad_routing_, pkt); + Response rsp(sessid, is_stream_, is_last, require_rerouting_, pkt); if ((need_compressed) && NULL != tmp_buf) { // compress the serialized result buffer char *dst_buf = pkt_buf + sizeof(ObRpcPacket) + ez_rpc_header_size; @@ -528,7 +528,7 @@ int ObRpcProcessorBase::part_response_error(rpc::ObRequest* req, const int retco } else { ObRpcPacket pkt; pkt.set_content(tbuf, pos); - Response err_rsp(sessid, is_stream_, is_last, bad_routing_, &pkt); + Response err_rsp(sessid, is_stream_, is_last, require_rerouting_, &pkt); if (OB_FAIL(do_response(err_rsp))) { RPC_OBRPC_LOG(WARN, "response data fail", K(ret)); } diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h index 378907e254..ce30c3eec6 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h @@ -34,7 +34,7 @@ public: public: ObRpcProcessorBase() : rpc_pkt_(NULL), sh_(NULL), sc_(NULL), is_stream_(false), is_stream_end_(false), - bad_routing_(false), preserve_recv_data_(false), preserved_buf_(NULL), + require_rerouting_(false), preserve_recv_data_(false), preserved_buf_(NULL), uncompressed_buf_(NULL), using_buffer_(NULL), send_timestamp_(0), pkt_size_(0), tenant_id_(0), result_compress_type_(common::INVALID_COMPRESSOR) {} @@ -82,12 +82,12 @@ protected: Response(int64_t sessid, bool is_stream, bool is_stream_last, - bool bad_routing, + bool require_rerouting, ObRpcPacket *pkt) : sessid_(sessid), is_stream_(is_stream), is_stream_last_(is_stream_last), - bad_routing_(bad_routing), + require_rerouting_(require_rerouting), pkt_(pkt) { } @@ -97,11 +97,11 @@ protected: bool is_stream_last_; // for routing check - bool bad_routing_; + bool require_rerouting_; ObRpcPacket *pkt_; - TO_STRING_KV(K(sessid_), K(is_stream_), K(is_stream_last_), K_(bad_routing)); + TO_STRING_KV(K_(sessid), K_(is_stream), K_(is_stream_last), K_(require_rerouting)); }; void reuse(); @@ -152,9 +152,8 @@ protected: // invalid, so the stream is end. bool is_stream_end_; - // If this request accidently should not been handled by this server, - // mark the flag so that the client can refresh location cache. - bool bad_routing_; + // For rerouting in obkv + bool require_rerouting_; // The flag marks received data must copy out from `easy buffer' // before we response packet back. Typical case is when we use diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 2e41316f34..adabca15e7 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -161,6 +161,7 @@ ob_set_subtarget(ob_server table table/ttl/ob_tenant_tablet_ttl_mgr.cpp table/ttl/ob_table_ttl_task.cpp table/ttl/ob_table_ttl_executor.cpp + table/ob_table_move_response.cpp ) ob_set_subtarget(ob_server table_load diff --git a/src/observer/table/ob_table_batch_execute_processor.cpp b/src/observer/table/ob_table_batch_execute_processor.cpp index 12ea21fe23..de46dd2a1e 100644 --- a/src/observer/table/ob_table_batch_execute_processor.cpp +++ b/src/observer/table/ob_table_batch_execute_processor.cpp @@ -21,6 +21,7 @@ #include "ob_htable_utils.h" #include "ob_table_cg_service.h" #include "observer/ob_req_time_service.h" +#include "ob_table_move_response.h" using namespace oceanbase::observer; using namespace oceanbase::common; @@ -119,9 +120,9 @@ uint64_t ObTableBatchExecuteP::get_request_checksum() int ObTableBatchExecuteP::response(const int retcode) { int ret = OB_SUCCESS; - if (!need_retry_in_queue_ && !did_async_end_trans()) { + if (!need_retry_in_queue_ && !had_do_response()) { // For HKV table, modify the value of timetamp to be positive - if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) { + if (ObTableEntityType::ET_HKV == arg_.entity_type_) { const int64_t N = result_.count(); for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i) { @@ -133,7 +134,21 @@ int ObTableBatchExecuteP::response(const int retcode) } } // end for } - if (OB_SUCC(ret)) { + + // return the package even if negate_htable_timestamp fails + const obrpc::ObRpcPacket *rpc_pkt = &reinterpret_cast(req_->get_packet()); + if (is_require_rerouting_err(retcode) && rpc_pkt->require_rerouting()) { + // response rerouting packet + ObTableMoveResponseSender sender(req_, retcode); + if (OB_FAIL(sender.init(arg_.table_id_, arg_.tablet_id_, *gctx_.schema_service_))) { + LOG_WARN("fail to init move response sender", K(ret), K_(arg)); + } else if (OB_FAIL(sender.response())) { + LOG_WARN("fail to do move response", K(ret)); + } + if (OB_FAIL(ret)) { + ret = ObRpcProcessor::response(retcode); // do common response when do move response failed + } + } else { ret = ObRpcProcessor::response(retcode); } } @@ -161,8 +176,10 @@ int ObTableBatchExecuteP::try_process() LOG_WARN("no operation in the batch", K(ret)); } else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) { LOG_WARN("failed to get table id", K(ret)); - } else if (OB_FAIL(check_table_index_supported(table_id, is_index_supported))) { - LOG_WARN("fail to check index supported", K(ret), K(table_id)); + } else if (FALSE_IT(table_id_ = arg_.table_id_)) { + } else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) { + } else if (OB_FAIL(check_table_index_supported(table_id_, is_index_supported))) { + LOG_WARN("fail to check index supported", K(ret), K(table_id_)); } else if (OB_UNLIKELY(!is_index_supported)) { ret = OB_NOT_SUPPORTED; LOG_WARN("index type is not supported by table api", K(ret)); diff --git a/src/observer/table/ob_table_end_trans_cb.h b/src/observer/table/ob_table_end_trans_cb.h index 302e391486..f6d707ed3b 100644 --- a/src/observer/table/ob_table_end_trans_cb.h +++ b/src/observer/table/ob_table_end_trans_cb.h @@ -12,7 +12,7 @@ #ifndef _OB_TABLE_END_TRANS_CB_H #define _OB_TABLE_END_TRANS_CB_H 1 -#include "ob_rpc_async_response.h" +#include "ob_table_rpc_response_sender.h" #include "sql/ob_end_trans_callback.h" #include "share/table/ob_table.h" #include "ob_htable_lock_mgr.h" @@ -61,7 +61,7 @@ private: ObTableEntity result_entity_; common::ObArenaAllocator allocator_; ObTableOperationResult result_; - obrpc::ObRpcAsyncResponse response_sender_; + obrpc::ObTableRpcResponseSender response_sender_; }; class ObTableBatchExecuteEndTransCb: public ObTableAPITransCb @@ -88,7 +88,7 @@ private: common::ObArenaAllocator allocator_; table::ObTableEntityFactory entity_factory_; ObTableBatchOperationResult result_; - obrpc::ObRpcAsyncResponse response_sender_; + obrpc::ObTableRpcResponseSender response_sender_; ObTableOperationType::Type table_operation_type_; }; diff --git a/src/observer/table/ob_table_execute_processor.cpp b/src/observer/table/ob_table_execute_processor.cpp index 4640010188..ef395d9941 100644 --- a/src/observer/table/ob_table_execute_processor.cpp +++ b/src/observer/table/ob_table_execute_processor.cpp @@ -20,6 +20,7 @@ #include "ob_table_scan_executor.h" #include "ob_table_cg_service.h" #include "observer/ob_req_time_service.h" +#include "ob_table_move_response.h" using namespace oceanbase::observer; using namespace oceanbase::common; @@ -206,10 +207,12 @@ int ObTableApiExecuteP::try_process() uint64_t table_id = arg_.table_id_; bool is_index_supported = true; const ObTableOperation &table_operation = arg_.table_operation_; - if (ObTableOperationType::GET != table_operation.type()) { - if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) { - LOG_WARN("failed to get table id", K(ret)); - } else if (OB_FAIL(check_table_index_supported(table_id, is_index_supported))) { + if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) { + LOG_WARN("failed to get table id", K(ret)); + } else if (FALSE_IT(table_id_ = arg_.table_id_)) { + } else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) { + } else if (ObTableOperationType::GET != table_operation.type()) { + if (OB_FAIL(check_table_index_supported(table_id, is_index_supported))) { LOG_WARN("fail to check index supported", K(ret), K(table_id)); } } @@ -324,12 +327,26 @@ uint64_t ObTableApiExecuteP::get_request_checksum() int ObTableApiExecuteP::response(const int retcode) { int ret = OB_SUCCESS; - if (!need_retry_in_queue_ && !did_async_end_trans()) { + if (!need_retry_in_queue_ && !had_do_response()) { if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) { // @note modify the value of timestamp to be positive ret = ObTableRpcProcessorUtil::negate_htable_timestamp(result_entity_); } - if (OB_SUCC(ret)) { + + // return the package even if negate_htable_timestamp fails + const ObRpcPacket *rpc_pkt = &reinterpret_cast(req_->get_packet()); + if (is_require_rerouting_err(retcode) && rpc_pkt->require_rerouting()) { + // response rerouting packet + ObTableMoveResponseSender sender(req_, retcode); + if (OB_FAIL(sender.init(arg_.table_id_, arg_.tablet_id_, *gctx_.schema_service_))) { + LOG_WARN("fail to init move response sender", K(ret), K_(arg)); + } else if (OB_FAIL(sender.response())) { + LOG_WARN("fail to do move response", K(ret)); + } + if (OB_FAIL(ret)) { + ret = ObRpcProcessor::response(retcode); // do common response when do move response failed + } + } else { ret = ObRpcProcessor::response(retcode); } } @@ -432,7 +449,7 @@ ObTableAPITransCb *ObTableApiExecuteP::new_callback(rpc::ObRequest *req) int ObTableApiExecuteP::before_response(int error_code) { // NOTE: when check_timeout failed, the result.entity_ is null, and serialize result cause coredump - if (!did_async_end_trans() && OB_ISNULL(result_.get_entity())) { + if (!had_do_response() && OB_ISNULL(result_.get_entity())) { result_.set_entity(result_entity_); } return ObTableRpcProcessor::before_response(error_code); diff --git a/src/observer/table/ob_table_move_response.cpp b/src/observer/table/ob_table_move_response.cpp new file mode 100644 index 0000000000..c011b413a8 --- /dev/null +++ b/src/observer/table/ob_table_move_response.cpp @@ -0,0 +1,82 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SERVER +#include "ob_table_move_response.h" +#include "share/schema/ob_schema_getter_guard.h" +#include "observer/ob_server_struct.h" +#include "share/partition_table/ob_partition_location.h" + +using namespace oceanbase::observer; +using namespace oceanbase::common; +using namespace oceanbase::table; +//////////////////////////////////////////////////////////////// + +int ObTableMoveResponseSender::get_replica(const uint64_t table_id, + const common::ObTabletID &tablet_id, + table::ObTableMoveReplicaInfo &replica) +{ + int ret = OB_SUCCESS; + bool is_cache_hit = false; + int64_t expire_renew_time = INT64_MAX; //对于get接口,需要传一个最大值,表示需要拿最新的location cache,并让老的失效掉 + share::ObLSID ls_id; + share::ObLSLocation ls_loc; + share::ObLSReplicaLocation replica_loc; + + if (OB_FAIL(GCTX.location_service_->get(MTL_ID(), tablet_id, expire_renew_time, is_cache_hit, ls_id))) { + LOG_WARN("fail to get partition", K(ret), K(table_id), K(tablet_id)); + } else if (OB_FAIL(GCTX.location_service_->get(GCONF.cluster_id, MTL_ID(), ls_id, expire_renew_time, is_cache_hit, ls_loc))) { + LOG_WARN("fail get partition", K(ret), K(table_id), K(tablet_id)); + } else if (OB_FAIL(ls_loc.get_leader(replica_loc))) { + LOG_WARN("fail to get strong leader replica", K(ret)); + } else { + replica.server_ = replica_loc.get_server(); + replica.role_ = replica_loc.get_role(); + replica.replica_type_ = replica_loc.get_replica_type(); + replica.part_renew_time_ = ls_loc.get_renew_time(); + } + + return ret; +} + +int ObTableMoveResponseSender::init(const uint64_t table_id, + const common::ObTabletID &tablet_id, + share::schema::ObMultiVersionSchemaService &schema_service) +{ + int ret = OB_SUCCESS; + + ObTableMoveReplicaInfo &replica = result_.get_replica_info(); + if (OB_FAIL(get_replica(table_id, tablet_id, replica))) { + LOG_WARN("fail to get partition info", K(ret), K(table_id), K(tablet_id)); + } else { + share::schema::ObSchemaGetterGuard schema_guard; + const share::schema::ObTableSchema *table_schema = nullptr; + if (OB_FAIL(schema_service.get_tenant_schema_guard(MTL_ID(), schema_guard))) { + LOG_WARN("fail to get schema guard", K(ret)); + } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { + LOG_WARN("fail to get table schema", K(table_id), K(ret)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("NULL ptr", K(ret), K(table_id)); + } else { + replica.set_table_id(table_id); + replica.set_schema_version(table_schema->get_schema_version()); + replica.set_tablet_id(tablet_id); + + // set move pcode + response_sender_.set_pcode(obrpc::OB_TABLE_API_MOVE); + LOG_DEBUG("move response init successfully", K(replica)); + } + } + + return ret; +} diff --git a/src/observer/table/ob_table_move_response.h b/src/observer/table/ob_table_move_response.h new file mode 100644 index 0000000000..937a444fb4 --- /dev/null +++ b/src/observer/table/ob_table_move_response.h @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef _OB_TABLE_MOVE_RESPONSE_H +#define _OB_TABLE_MOVE_RESPONSE_H 1 +#include "ob_table_rpc_response_sender.h" +#include "share/table/ob_table.h" + +namespace oceanbase +{ +namespace observer +{ +class ObTableMoveResponseSender +{ +public: + ObTableMoveResponseSender(rpc::ObRequest *req, const int ret_code) + :response_sender_(req, result_, ret_code) + { + } + virtual ~ObTableMoveResponseSender() = default; + OB_INLINE table::ObTableMoveResult& get_result() { return result_; } + int init(const uint64_t table_id, + const common::ObTabletID &tablet_id, + share::schema::ObMultiVersionSchemaService &schema_service); + int response() { return response_sender_.response(common::OB_SUCCESS); }; +private: + int get_replica(const uint64_t table_id, + const common::ObTabletID &tablet_id, + table::ObTableMoveReplicaInfo &replica); +private: + table::ObTableMoveResult result_; + obrpc::ObTableRpcResponseSender response_sender_; +private: + DISALLOW_COPY_AND_ASSIGN(ObTableMoveResponseSender); +}; + +} // end namespace server +} // end namespace oceanbase + +#endif /* _OB_TABLE_MOVE_RESPONSE_H */ diff --git a/src/observer/table/ob_table_query_and_mutate_processor.cpp b/src/observer/table/ob_table_query_and_mutate_processor.cpp index 6095cba14e..f9e783e5de 100644 --- a/src/observer/table/ob_table_query_and_mutate_processor.cpp +++ b/src/observer/table/ob_table_query_and_mutate_processor.cpp @@ -955,19 +955,19 @@ int ObTableQueryAndMutateP::try_process() int64_t affected_rows = 0; const bool is_hkv = (ObTableEntityType::ET_HKV == arg_.entity_type_); ObHTableLockHandle *lock_handle = nullptr; - uint64_t table_id = OB_INVALID_ID; if (OB_FAIL(init_scan_tb_ctx(cache_guard))) { LOG_WARN("fail to init scan table ctx", K(ret)); - } else if (FALSE_IT(table_id = tb_ctx_.get_ref_table_id())) { + } else if (FALSE_IT(table_id_ = arg_.table_id_)) { + } else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) { } else if (is_hkv && OB_FAIL(HTABLE_LOCK_MGR->acquire_handle(lock_handle))) { LOG_WARN("fail to get htable lock handle", K(ret)); - } else if (is_hkv && OB_FAIL(ObHTableUtils::lock_htable_row(table_id, query, *lock_handle, ObHTableLockMode::EXCLUSIVE))) { - LOG_WARN("fail to lock htable row", K(ret), K(table_id), K(query)); + } else if (is_hkv && OB_FAIL(ObHTableUtils::lock_htable_row(table_id_, query, *lock_handle, ObHTableLockMode::EXCLUSIVE))) { + LOG_WARN("fail to lock htable row", K(ret), K_(table_id), K(query)); } else if (OB_FAIL(start_trans(false, /* is_readonly */ sql::stmt::T_UPDATE, consistency_level, - table_id, + table_id_, tb_ctx_.get_ls_id(), get_timeout_ts()))) { LOG_WARN("fail to start readonly transaction", K(ret)); diff --git a/src/observer/table/ob_table_query_processor.cpp b/src/observer/table/ob_table_query_processor.cpp index 3a9ba46dd3..0896494075 100644 --- a/src/observer/table/ob_table_query_processor.cpp +++ b/src/observer/table/ob_table_query_processor.cpp @@ -269,6 +269,8 @@ int ObTableQueryP::try_process() LOG_WARN("fail to get spec from cache", K(ret)); } else if (OB_FAIL(spec->create_executor(tb_ctx_, executor))) { LOG_WARN("fail to generate executor", K(ret), K(tb_ctx_)); + } else if (FALSE_IT(table_id_ = arg_.table_id_)) { + } else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) { } else if (OB_FAIL(start_trans(true, /* is_readonly */ sql::stmt::T_SELECT, arg_.consistency_level_, diff --git a/src/observer/table/ob_table_query_sync_processor.cpp b/src/observer/table/ob_table_query_sync_processor.cpp index 024b6290f8..efd3464523 100644 --- a/src/observer/table/ob_table_query_sync_processor.cpp +++ b/src/observer/table/ob_table_query_sync_processor.cpp @@ -598,6 +598,8 @@ int ObTableQuerySyncP::try_process() } else if (OB_FAIL(get_query_session(query_session_id_, query_session_))) { LOG_WARN("fail to get query session", K(ret), K(query_session_id_)); } else if (FALSE_IT(timeout_ts_ = get_timeout_ts())) { + } else if (FALSE_IT(table_id_ = arg_.table_id_)) { + } else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) { } else { if (ObQueryOperationType::QUERY_START == arg_.query_type_) { ret = process_query_start(); diff --git a/src/observer/table/ob_table_query_sync_processor.h b/src/observer/table/ob_table_query_sync_processor.h index f5d816cb76..e39c758e72 100644 --- a/src/observer/table/ob_table_query_sync_processor.h +++ b/src/observer/table/ob_table_query_sync_processor.h @@ -186,7 +186,6 @@ public: explicit ObTableQuerySyncP(const ObGlobalContext &gctx); virtual ~ObTableQuerySyncP() {} virtual int deserialize() override; - protected: virtual int check_arg() override; virtual int try_process() override; @@ -195,6 +194,7 @@ protected: virtual uint64_t get_request_checksum() override; virtual table::ObTableAPITransCb *new_callback(rpc::ObRequest *req) override; + private: int process_query_start(); int process_query_next(); diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index dc8e4fe5c4..bb75c6af53 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -29,6 +29,7 @@ #include "storage/tx/ob_trans_service.h" #include "ob_table_session_pool.h" #include "storage/tx/wrs/ob_weak_read_util.h" +#include "ob_table_move_response.h" using namespace oceanbase::observer; using namespace oceanbase::common; @@ -89,9 +90,9 @@ int ObTableLoginP::process() } } // whether the client should refresh location cache - if (OB_SUCCESS != ret && is_bad_routing_err(ret)) { - ObRpcProcessor::bad_routing_ = true; - LOG_WARN("[TABLE] login bad routing", K(ret), "bad_routing", ObRpcProcessor::bad_routing_); + if (OB_SUCCESS != ret && is_require_rerouting_err(ret)) { + ObRpcProcessor::require_rerouting_ = true; + LOG_WARN("[TABLE] login require rerouting", K(ret), "require_rerouting", ObRpcProcessor::require_rerouting_); } ObTenantStatEstGuard stat_guard(result_.tenant_id_); #ifndef NDEBUG @@ -223,7 +224,7 @@ ObTableApiProcessorBase::ObTableApiProcessorBase(const ObGlobalContext &gctx) need_retry_in_queue_(false), retry_count_(0), trans_desc_(NULL), - did_async_end_trans_(false) + had_do_response_(false) { need_audit_ = GCONF.enable_sql_audit; trans_state_ptr_ = &trans_state_; @@ -233,7 +234,7 @@ void ObTableApiProcessorBase::reset_ctx() { trans_state_ptr_->reset(); trans_desc_ = NULL; - did_async_end_trans_ = false; + had_do_response_ = false; } int ObTableApiProcessorBase::get_ls_id(const ObTabletID &tablet_id, ObLSID &ls_id) @@ -568,7 +569,7 @@ int ObTableApiProcessorBase::async_commit_trans(rpc::ObRequest *req, int64_t tim callback.callback(ret); } // ignore the return code of end_trans - did_async_end_trans_ = true; // don't send response in this worker thread + had_do_response_ = true; // don't send response in this worker thread // @note the req_ may be freed, req_processor can not be read any more. // The req_has_wokenup_ MUST set to be true, otherwise req_processor will invoke req_->set_process_start_end_diff, cause memory core // @see ObReqProcessor::run() req_->set_process_start_end_diff(ObTimeUtility::current_time()); @@ -880,15 +881,15 @@ int ObTableRpcProcessor::process() if (OB_FAIL(process_with_retry(RpcProcessor::arg_.credential_, get_timeout_ts()))) { if (OB_NOT_NULL(request_string_)) { // request_string_ has been generated if enable sql_audit LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), K(request_string_)); - } else if (did_async_end_trans()) { // req_ may be freed + } else if (had_do_response()) { // req_ may be freed LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_)); } else { LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), "request", RpcProcessor::arg_); } - // whether the client should refresh location cache - if (is_bad_routing_err(ret)) { - ObRpcProcessor::bad_routing_ = true; - LOG_WARN("table_api request bad routing", K(ret), "bad_routing", ObRpcProcessor::bad_routing_); + // whether the client should refresh location cache and retry + if (is_require_rerouting_err(ret)) { + ObRpcProcessor::require_rerouting_ = true; + LOG_WARN("table_api request require rerouting", K(ret), "require_rerouting", ObRpcProcessor::require_rerouting_); } } return ret; @@ -910,12 +911,26 @@ int ObTableRpcProcessor::before_response(int error_code) } template -int ObTableRpcProcessor::response(const int retcode) +int ObTableRpcProcessor::response(int error_code) { int ret = OB_SUCCESS; // if it is waiting for retry in queue, the response can NOT be sent. - if (!need_retry_in_queue_) { - ret = RpcProcessor::response(retcode); + if (!need_retry_in_queue_ && !had_do_response()) { + const ObRpcPacket *rpc_pkt = &reinterpret_cast(this->req_->get_packet()); + if (is_require_rerouting_err(error_code) && rpc_pkt->require_rerouting()) { + // response rerouting packet + ObTableMoveResponseSender sender(this->req_, error_code); + if (OB_FAIL(sender.init(ObTableApiProcessorBase::table_id_, ObTableApiProcessorBase::tablet_id_, *gctx_.schema_service_))) { + LOG_WARN("fail to init move response sender", K(ret), K(RpcProcessor::arg_)); + } else if (OB_FAIL(sender.response())) { + LOG_WARN("fail to do move response", K(ret)); + } + if (OB_FAIL(ret)) { + ret = RpcProcessor::response(error_code); // do common response when do move response failed + } + } else { + ret = RpcProcessor::response(error_code); + } } return ret; } @@ -978,9 +993,9 @@ void ObTableRpcProcessor::generate_sql_id() snprintf(audit_record_.sql_id_, (int32_t)sizeof(audit_record_.sql_id_), "TABLEAPI0x%04Xvv%016lX", RpcProcessor::PCODE, checksum); } -bool oceanbase::observer::is_bad_routing_err(const int err) +bool oceanbase::observer::is_require_rerouting_err(const int err) { - // bad routing check : whether client should refresh location cache + // rerouting: whether client should refresh location cache and retry // Now, following the same logic as in ../mysql/ob_query_retry_ctrl.cpp return (is_master_changed_error(err) || is_server_down_error(err) diff --git a/src/observer/table/ob_table_rpc_processor.h b/src/observer/table/ob_table_rpc_processor.h index 8c586a6acd..eeb37b87d6 100644 --- a/src/observer/table/ob_table_rpc_processor.h +++ b/src/observer/table/ob_table_rpc_processor.h @@ -134,11 +134,11 @@ public: const share::ObLSID &ls_id, int64_t timeout_ts); void release_read_trans(); - inline bool did_async_end_trans() const { return did_async_end_trans_; } inline transaction::ObTxDesc *get_trans_desc() { return trans_desc_; } int get_tablet_by_rowkey(uint64_t table_id, const ObIArray &rowkeys, ObIArray &tablet_ids); inline transaction::ObTxReadSnapshot &get_tx_snapshot() { return tx_snapshot_; } + inline bool had_do_response() const { return had_do_response_; } int get_table_id(const ObString &table_name, const uint64_t arg_table_id, uint64_t &real_table_id) const; protected: virtual int check_arg() = 0; @@ -181,11 +181,13 @@ protected: ObTableRetryPolicy retry_policy_; bool need_retry_in_queue_; int32_t retry_count_; + uint64_t table_id_; + ObTabletID tablet_id_; protected: // trans control sql::TransState trans_state_; transaction::ObTxDesc *trans_desc_; - bool did_async_end_trans_; + bool had_do_response_; // asynchronous transactions return packet in advance sql::TransState *trans_state_ptr_; transaction::ObTxReadSnapshot tx_snapshot_; }; diff --git a/src/observer/table/ob_table_rpc_processor_util.h b/src/observer/table/ob_table_rpc_processor_util.h index e44d69473d..350771664c 100644 --- a/src/observer/table/ob_table_rpc_processor_util.h +++ b/src/observer/table/ob_table_rpc_processor_util.h @@ -276,7 +276,7 @@ private: ~ObTableRpcProcessorUtil() = delete; }; -bool is_bad_routing_err(const int err); +bool is_require_rerouting_err(const int err); } // end namespace observer } // end namespace oceanbase diff --git a/src/observer/table/ob_rpc_async_response.h b/src/observer/table/ob_table_rpc_response_sender.h similarity index 77% rename from src/observer/table/ob_rpc_async_response.h rename to src/observer/table/ob_table_rpc_response_sender.h index b5b1aee043..e687f6419f 100644 --- a/src/observer/table/ob_rpc_async_response.h +++ b/src/observer/table/ob_table_rpc_response_sender.h @@ -10,8 +10,8 @@ * See the Mulan PubL v2 for more details. */ -#ifndef _OB_RPC_ASYNC_RESPONSE_H -#define _OB_RPC_ASYNC_RESPONSE_H 1 +#ifndef _OB_TABLE_RPC_RESPONSE_SENDER_H +#define _OB_TABLE_RPC_RESPONSE_SENDER_H 1 #include "rpc/ob_request.h" #include "rpc/obrpc/ob_rpc_packet.h" #include "rpc/frame/ob_req_processor.h" @@ -25,30 +25,40 @@ namespace obrpc { // this class is copied from ObRpcProcessor template -class ObRpcAsyncResponse +class ObTableRpcResponseSender { public: - ObRpcAsyncResponse(rpc::ObRequest *req, T &result) + ObTableRpcResponseSender(rpc::ObRequest *req, T &result, const int exec_ret_code = common::OB_SUCCESS) :req_(req), result_(result), + exec_ret_code_(exec_ret_code), + pcode_(ObRpcPacketCode::OB_INVALID_RPC_CODE), using_buffer_(NULL) - {} - virtual ~ObRpcAsyncResponse() = default; - int response(const int retcode); + { + if (OB_NOT_NULL(req_)) { + const ObRpcPacket *rpc_pkt = &reinterpret_cast(req_->get_packet()); + pcode_ = rpc_pkt->get_pcode(); + } + } + virtual ~ObTableRpcResponseSender() = default; + int response(const int cb_param); + OB_INLINE void set_pcode(ObRpcPacketCode pcode) { pcode_ = pcode; } private: int serialize(); - int do_response(ObRpcPacket *response_pkt, bool bad_routing); + int do_response(ObRpcPacket *response_pkt, bool require_rerouting); char *easy_alloc(int64_t size) const; // disallow copy - DISALLOW_COPY_AND_ASSIGN(ObRpcAsyncResponse); + DISALLOW_COPY_AND_ASSIGN(ObTableRpcResponseSender); private: rpc::ObRequest *req_; T &result_; + const int exec_ret_code_; // processor执行的返回码 + ObRpcPacketCode pcode_; common::ObDataBuffer *using_buffer_; }; template -char *ObRpcAsyncResponse::easy_alloc(int64_t size) const +char *ObTableRpcResponseSender::easy_alloc(int64_t size) const { void *buf = NULL; if (OB_ISNULL(req_)) { @@ -60,7 +70,7 @@ char *ObRpcAsyncResponse::easy_alloc(int64_t size) const } template -int ObRpcAsyncResponse::serialize() +int ObTableRpcResponseSender::serialize() { int ret = common::OB_SUCCESS; if (OB_ISNULL(using_buffer_)) { @@ -77,12 +87,15 @@ int ObRpcAsyncResponse::serialize() } template -int ObRpcAsyncResponse::do_response(ObRpcPacket *response_pkt, bool bad_routing) +int ObTableRpcResponseSender::do_response(ObRpcPacket *response_pkt, bool require_rerouting) { int ret = common::OB_SUCCESS; if (OB_ISNULL(req_)) { ret = common::OB_ERR_NULL_VALUE; RPC_OBRPC_LOG(WARN, "req is NULL", K(ret)); + } else if (ObRpcPacketCode::OB_INVALID_RPC_CODE == pcode_) { + ret = common::OB_ERR_UNEXPECTED; + RPC_OBRPC_LOG(WARN, "pcode is invalid", K(ret), K_(pcode), KPC_(req)); } else { const ObRpcPacket *rpc_pkt = &reinterpret_cast(req_->get_packet()); // TODO: fufeng, make force_destroy_second as a configure item @@ -94,12 +107,11 @@ int ObRpcAsyncResponse::do_response(ObRpcPacket *response_pkt, bool bad_routi // _OB_LOG(ERROR, "pkt process too long time: pkt_receive_ts=%ld, pkt_code=%d", rts, pcode); // } //copy packet into req buffer - ObRpcPacketCode pcode = rpc_pkt->get_pcode(); ObRpcPacket *packet = NULL; req_->set_trace_point(rpc::ObRequest::OB_EASY_REQUEST_RPC_ASYNC_RSP); if (OB_SUCC(ret)) { packet = response_pkt; - packet->set_pcode(pcode); + packet->set_pcode(pcode_); packet->set_chid(rpc_pkt->get_chid()); packet->set_session_id(0); // not stream packet->set_trace_id(rpc_pkt->get_trace_id()); @@ -115,8 +127,8 @@ int ObRpcAsyncResponse::do_response(ObRpcPacket *response_pkt, bool bad_routi packet->set_pop_process_start_diff(req_->get_pop_process_start_diff()); packet->set_process_start_end_diff(req_->get_process_start_end_diff()); packet->set_process_end_response_diff(req_->get_process_end_response_diff()); - if (bad_routing) { - packet->set_bad_routing(); + if (require_rerouting) { + packet->set_require_rerouting(); } packet->calc_checksum(); } @@ -127,9 +139,10 @@ int ObRpcAsyncResponse::do_response(ObRpcPacket *response_pkt, bool bad_routi } template -int ObRpcAsyncResponse::response(const int retcode) +int ObTableRpcResponseSender::response(const int cb_param) { int ret = common::OB_SUCCESS; + int retcode = (cb_param == OB_SUCCESS ? exec_ret_code_ : cb_param); if (OB_ISNULL(req_)) { ret = common::OB_INVALID_ARGUMENT; RPC_OBRPC_LOG(WARN, "invalid req, maybe stream rpc timeout", K(ret), K(retcode), @@ -202,21 +215,22 @@ int ObRpcAsyncResponse::response(const int retcode) using_buffer_->get_position()))) { RPC_OBRPC_LOG(WARN, "serialize result code fail", K(ret)); } else { - // also send result if process successfully. - if (common::OB_SUCCESS == retcode) { + // 1. send result if process successfully. + // 2. send result if require rerouting + if (common::OB_SUCCESS == retcode || observer::is_require_rerouting_err(retcode)) { if (OB_FAIL(serialize())) { RPC_OBRPC_LOG(WARN, "serialize result fail", K(ret)); } } } - // routing check : whether client should refresh location cache and retry + // rerouting: whether client should refresh location cache and retry // Now, following the same logic as in ../mysql/ob_query_retry_ctrl.cpp - bool bad_routing = false; + bool require_rerouting = false; if (OB_SUCC(ret)) { - if (common::OB_SUCCESS != retcode && observer::is_bad_routing_err(retcode)) { - bad_routing = true; - RPC_OBRPC_LOG(WARN, "bad routing", K(retcode), K(bad_routing)); + if (common::OB_SUCCESS != retcode && observer::is_require_rerouting_err(retcode)) { + require_rerouting = true; + RPC_OBRPC_LOG(INFO, "require rerouting", K(retcode), K(require_rerouting)); } } @@ -224,8 +238,8 @@ int ObRpcAsyncResponse::response(const int retcode) ObRpcPacket *pkt = new (pkt_buf) ObRpcPacket(); //Response rsp(sessid, is_stream_, is_last, pkt); pkt->set_content(using_buffer_->get_data(), using_buffer_->get_position()); - if (OB_FAIL(do_response(pkt, bad_routing))) { - RPC_OBRPC_LOG(WARN, "response data fail", K(ret)); + if (OB_FAIL(do_response(pkt, require_rerouting))) { + RPC_OBRPC_LOG(WARN, "response data fail", K(ret), K(retcode)); } } @@ -236,4 +250,4 @@ int ObRpcAsyncResponse::response(const int retcode) } // end namespace obrpc } // end namespace oceanbase -#endif /* _OB_RPC_ASYNC_RESPONSE_H */ +#endif /* _OB_TABLE_RPC_RESPONSE_SENDER_H */ diff --git a/src/observer/table/ob_table_scan_executor.cpp b/src/observer/table/ob_table_scan_executor.cpp index 2ce3e5318e..4c2d1baf95 100644 --- a/src/observer/table/ob_table_scan_executor.cpp +++ b/src/observer/table/ob_table_scan_executor.cpp @@ -118,7 +118,7 @@ int ObTableApiScanExecutor::prepare_das_task() scan_op = static_cast(task_op); scan_op->set_scan_ctdef(&scan_spec_.get_ctdef().scan_ctdef_); scan_op->set_scan_rtdef(&tsc_rtdef_.scan_rtdef_); - scan_op->set_can_part_retry(nullptr == tsc_rtdef_.scan_rtdef_.sample_info_); + scan_op->set_can_part_retry(false); tsc_rtdef_.scan_rtdef_.table_loc_->is_reading_ = true; if (scan_spec_.get_ctdef().lookup_ctdef_ != nullptr) { //is local index lookup, need to set the lookup ctdef to the das scan op diff --git a/src/share/table/ob_table.cpp b/src/share/table/ob_table.cpp index c82cf40296..96212a62c3 100644 --- a/src/share/table/ob_table.cpp +++ b/src/share/table/ob_table.cpp @@ -1716,4 +1716,21 @@ int ObTableAggregation::deep_copy(ObIAllocator &allocator, ObTableAggregation &d OB_SERIALIZE_MEMBER(ObTableAggregation, type_, - column_); \ No newline at end of file + column_); + +//////////////////////////////////////////////////////////////// +OB_SERIALIZE_MEMBER(ObTableMoveReplicaInfo, + table_id_, + schema_version_, + tablet_id_, + server_, + role_, + replica_type_, + part_renew_time_, + reserved_); + + + +OB_SERIALIZE_MEMBER(ObTableMoveResult, + replica_info_, + reserved_); \ No newline at end of file diff --git a/src/share/table/ob_table.h b/src/share/table/ob_table.h index 47c081d4c0..e24fd40996 100644 --- a/src/share/table/ob_table.h +++ b/src/share/table/ob_table.h @@ -21,12 +21,14 @@ #include "lib/container/ob_se_array.h" #include "lib/hash/ob_hashmap.h" #include "lib/list/ob_dlist.h" +#include "lib/net/ob_addr.h" #include "common/ob_common_types.h" #include "common/ob_range.h" #include "rpc/obrpc/ob_poc_rpc_server.h" #include "share/table/ob_table_ttl_common.h" #include "common/rowkey/ob_rowkey.h" +#include "common/ob_role.h" namespace oceanbase { namespace common @@ -957,6 +959,59 @@ public: common::ObString end_rowkey_; }; +struct ObTableMoveReplicaInfo final +{ + OB_UNIS_VERSION(1); +public: + ObTableMoveReplicaInfo() + : table_id_(common::OB_INVALID_ID), + schema_version_(common::OB_INVALID_VERSION), + tablet_id_(common::ObTabletID::INVALID_TABLET_ID), + role_(common::ObRole::INVALID_ROLE), + replica_type_(common::ObReplicaType::REPLICA_TYPE_MAX), + part_renew_time_(0), + reserved_(0) + {} + virtual ~ObTableMoveReplicaInfo() {} + TO_STRING_KV(K_(table_id), + K_(schema_version), + K_(part_renew_time), + K_(tablet_id), + K_(server), + K_(role), + K_(replica_type), + K_(reserved)); + OB_INLINE void set_table_id(const uint64_t table_id) { table_id_ = table_id; } + OB_INLINE void set_schema_version(const uint64_t schema_version) { schema_version_ = schema_version; } + OB_INLINE void set_tablet_id(const common::ObTabletID &tablet_id) { tablet_id_ = tablet_id; } +public: + uint64_t table_id_; + uint64_t schema_version_; + common::ObTabletID tablet_id_; + common::ObAddr server_; + common::ObRole role_; + common::ObReplicaType replica_type_; + int64_t part_renew_time_; + uint64_t reserved_; +}; + +class ObTableMoveResult final +{ + OB_UNIS_VERSION(1); +public: + ObTableMoveResult() + : reserved_(0) + {} + virtual ~ObTableMoveResult() {} + TO_STRING_KV(K_(replica_info), + K_(reserved)); + + OB_INLINE ObTableMoveReplicaInfo& get_replica_info() { return replica_info_; } +private: + ObTableMoveReplicaInfo replica_info_; + uint64_t reserved_; +}; + } // end namespace table } // end namespace oceanbase