[CP] [CP] Support OBKV Rerouting
This commit is contained in:
14
deps/oblib/src/rpc/obrpc/ob_rpc_packet.h
vendored
14
deps/oblib/src/rpc/obrpc/ob_rpc_packet.h
vendored
@ -150,7 +150,7 @@ public:
|
|||||||
static const uint16_t DISABLE_DEBUGSYNC_FLAG = 1 << 12;
|
static const uint16_t DISABLE_DEBUGSYNC_FLAG = 1 << 12;
|
||||||
static const uint16_t CONTEXT_FLAG = 1 << 11;
|
static const uint16_t CONTEXT_FLAG = 1 << 11;
|
||||||
static const uint16_t UNNEED_RESPONSE_FLAG = 1 << 10;
|
static const uint16_t UNNEED_RESPONSE_FLAG = 1 << 10;
|
||||||
static const uint16_t BAD_ROUTING_FLAG = 1 << 9;
|
static const uint16_t REQUIRE_REROUTING_FLAG = 1 << 9;
|
||||||
static const uint16_t ENABLE_RATELIMIT_FLAG = 1 << 8;
|
static const uint16_t ENABLE_RATELIMIT_FLAG = 1 << 8;
|
||||||
static const uint16_t BACKGROUND_FLOW_FLAG = 1 << 7;
|
static const uint16_t BACKGROUND_FLOW_FLAG = 1 << 7;
|
||||||
static const uint16_t TRACE_INFO_FLAG = 1 << 6;
|
static const uint16_t TRACE_INFO_FLAG = 1 << 6;
|
||||||
@ -269,8 +269,8 @@ public:
|
|||||||
inline void unset_stream();
|
inline void unset_stream();
|
||||||
inline void set_unneed_response();
|
inline void set_unneed_response();
|
||||||
inline bool unneed_response() const;
|
inline bool unneed_response() const;
|
||||||
inline void set_bad_routing();
|
inline void set_require_rerouting();
|
||||||
inline bool bad_routing() const;
|
inline bool require_rerouting() const;
|
||||||
|
|
||||||
inline bool ratelimit_enabled() const;
|
inline bool ratelimit_enabled() const;
|
||||||
inline void enable_ratelimit();
|
inline void enable_ratelimit();
|
||||||
@ -536,14 +536,14 @@ bool ObRpcPacket::unneed_response() const
|
|||||||
return hdr_.flags_ & ObRpcPacketHeader::UNNEED_RESPONSE_FLAG;
|
return hdr_.flags_ & ObRpcPacketHeader::UNNEED_RESPONSE_FLAG;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObRpcPacket::set_bad_routing()
|
void ObRpcPacket::set_require_rerouting()
|
||||||
{
|
{
|
||||||
hdr_.flags_ |= ObRpcPacketHeader::BAD_ROUTING_FLAG;
|
hdr_.flags_ |= ObRpcPacketHeader::REQUIRE_REROUTING_FLAG;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObRpcPacket::bad_routing() const
|
bool ObRpcPacket::require_rerouting() const
|
||||||
{
|
{
|
||||||
return hdr_.flags_ & ObRpcPacketHeader::BAD_ROUTING_FLAG;
|
return hdr_.flags_ & ObRpcPacketHeader::REQUIRE_REROUTING_FLAG;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObRpcPacket::ratelimit_enabled() const
|
bool ObRpcPacket::ratelimit_enabled() const
|
||||||
|
|||||||
@ -859,7 +859,6 @@ PCODE_DEF(OB_TTL_RESPONSE, 0x1109)
|
|||||||
// PCODE_DEF(OB_TABLE_API_LOAD_GET_TRANS_STATUS_PEER, 0x1121) // not supported on 4.2
|
// PCODE_DEF(OB_TABLE_API_LOAD_GET_TRANS_STATUS_PEER, 0x1121) // not supported on 4.2
|
||||||
|
|
||||||
PCODE_DEF(OB_TABLE_API_EXECUTE_GROUP_UPDATE, 0x1122)
|
PCODE_DEF(OB_TABLE_API_EXECUTE_GROUP_UPDATE, 0x1122)
|
||||||
|
|
||||||
PCODE_DEF(OB_TABLE_API_DIRECT_LOAD, 0x1123)
|
PCODE_DEF(OB_TABLE_API_DIRECT_LOAD, 0x1123)
|
||||||
PCODE_DEF(OB_TABLE_API_MOVE, 0x1124)
|
PCODE_DEF(OB_TABLE_API_MOVE, 0x1124)
|
||||||
|
|
||||||
|
|||||||
@ -311,8 +311,8 @@ int ObRpcProcessorBase::do_response(const Response &rsp)
|
|||||||
packet->set_stream_last();
|
packet->set_stream_last();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (rsp.bad_routing_) {
|
if (rsp.require_rerouting_) {
|
||||||
packet->set_bad_routing();
|
packet->set_require_rerouting();
|
||||||
}
|
}
|
||||||
packet->set_unis_version(0);
|
packet->set_unis_version(0);
|
||||||
packet->calc_checksum();
|
packet->calc_checksum();
|
||||||
@ -480,7 +480,7 @@ int ObRpcProcessorBase::part_response(const int retcode, bool is_last)
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
const int64_t sessid = sc_ ? sc_->sessid() : 0;
|
const int64_t sessid = sc_ ? sc_->sessid() : 0;
|
||||||
ObRpcPacket *pkt = new (pkt_buf) ObRpcPacket();
|
ObRpcPacket *pkt = new (pkt_buf) ObRpcPacket();
|
||||||
Response rsp(sessid, is_stream_, is_last, bad_routing_, pkt);
|
Response rsp(sessid, is_stream_, is_last, require_rerouting_, pkt);
|
||||||
if ((need_compressed) && NULL != tmp_buf) {
|
if ((need_compressed) && NULL != tmp_buf) {
|
||||||
// compress the serialized result buffer
|
// compress the serialized result buffer
|
||||||
char *dst_buf = pkt_buf + sizeof(ObRpcPacket) + ez_rpc_header_size;
|
char *dst_buf = pkt_buf + sizeof(ObRpcPacket) + ez_rpc_header_size;
|
||||||
@ -528,7 +528,7 @@ int ObRpcProcessorBase::part_response_error(rpc::ObRequest* req, const int retco
|
|||||||
} else {
|
} else {
|
||||||
ObRpcPacket pkt;
|
ObRpcPacket pkt;
|
||||||
pkt.set_content(tbuf, pos);
|
pkt.set_content(tbuf, pos);
|
||||||
Response err_rsp(sessid, is_stream_, is_last, bad_routing_, &pkt);
|
Response err_rsp(sessid, is_stream_, is_last, require_rerouting_, &pkt);
|
||||||
if (OB_FAIL(do_response(err_rsp))) {
|
if (OB_FAIL(do_response(err_rsp))) {
|
||||||
RPC_OBRPC_LOG(WARN, "response data fail", K(ret));
|
RPC_OBRPC_LOG(WARN, "response data fail", K(ret));
|
||||||
}
|
}
|
||||||
|
|||||||
15
deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h
vendored
15
deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h
vendored
@ -34,7 +34,7 @@ public:
|
|||||||
public:
|
public:
|
||||||
ObRpcProcessorBase()
|
ObRpcProcessorBase()
|
||||||
: rpc_pkt_(NULL), sh_(NULL), sc_(NULL), is_stream_(false), is_stream_end_(false),
|
: rpc_pkt_(NULL), sh_(NULL), sc_(NULL), is_stream_(false), is_stream_end_(false),
|
||||||
bad_routing_(false), preserve_recv_data_(false), preserved_buf_(NULL),
|
require_rerouting_(false), preserve_recv_data_(false), preserved_buf_(NULL),
|
||||||
uncompressed_buf_(NULL), using_buffer_(NULL), send_timestamp_(0), pkt_size_(0), tenant_id_(0),
|
uncompressed_buf_(NULL), using_buffer_(NULL), send_timestamp_(0), pkt_size_(0), tenant_id_(0),
|
||||||
result_compress_type_(common::INVALID_COMPRESSOR)
|
result_compress_type_(common::INVALID_COMPRESSOR)
|
||||||
{}
|
{}
|
||||||
@ -82,12 +82,12 @@ protected:
|
|||||||
Response(int64_t sessid,
|
Response(int64_t sessid,
|
||||||
bool is_stream,
|
bool is_stream,
|
||||||
bool is_stream_last,
|
bool is_stream_last,
|
||||||
bool bad_routing,
|
bool require_rerouting,
|
||||||
ObRpcPacket *pkt)
|
ObRpcPacket *pkt)
|
||||||
: sessid_(sessid),
|
: sessid_(sessid),
|
||||||
is_stream_(is_stream),
|
is_stream_(is_stream),
|
||||||
is_stream_last_(is_stream_last),
|
is_stream_last_(is_stream_last),
|
||||||
bad_routing_(bad_routing),
|
require_rerouting_(require_rerouting),
|
||||||
pkt_(pkt)
|
pkt_(pkt)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
@ -97,11 +97,11 @@ protected:
|
|||||||
bool is_stream_last_;
|
bool is_stream_last_;
|
||||||
|
|
||||||
// for routing check
|
// for routing check
|
||||||
bool bad_routing_;
|
bool require_rerouting_;
|
||||||
|
|
||||||
ObRpcPacket *pkt_;
|
ObRpcPacket *pkt_;
|
||||||
|
|
||||||
TO_STRING_KV(K(sessid_), K(is_stream_), K(is_stream_last_), K_(bad_routing));
|
TO_STRING_KV(K_(sessid), K_(is_stream), K_(is_stream_last), K_(require_rerouting));
|
||||||
};
|
};
|
||||||
|
|
||||||
void reuse();
|
void reuse();
|
||||||
@ -152,9 +152,8 @@ protected:
|
|||||||
// invalid, so the stream is end.
|
// invalid, so the stream is end.
|
||||||
bool is_stream_end_;
|
bool is_stream_end_;
|
||||||
|
|
||||||
// If this request accidently should not been handled by this server,
|
// For rerouting in obkv
|
||||||
// mark the flag so that the client can refresh location cache.
|
bool require_rerouting_;
|
||||||
bool bad_routing_;
|
|
||||||
|
|
||||||
// The flag marks received data must copy out from `easy buffer'
|
// The flag marks received data must copy out from `easy buffer'
|
||||||
// before we response packet back. Typical case is when we use
|
// before we response packet back. Typical case is when we use
|
||||||
|
|||||||
@ -161,6 +161,7 @@ ob_set_subtarget(ob_server table
|
|||||||
table/ttl/ob_tenant_tablet_ttl_mgr.cpp
|
table/ttl/ob_tenant_tablet_ttl_mgr.cpp
|
||||||
table/ttl/ob_table_ttl_task.cpp
|
table/ttl/ob_table_ttl_task.cpp
|
||||||
table/ttl/ob_table_ttl_executor.cpp
|
table/ttl/ob_table_ttl_executor.cpp
|
||||||
|
table/ob_table_move_response.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
ob_set_subtarget(ob_server table_load
|
ob_set_subtarget(ob_server table_load
|
||||||
|
|||||||
@ -21,6 +21,7 @@
|
|||||||
#include "ob_htable_utils.h"
|
#include "ob_htable_utils.h"
|
||||||
#include "ob_table_cg_service.h"
|
#include "ob_table_cg_service.h"
|
||||||
#include "observer/ob_req_time_service.h"
|
#include "observer/ob_req_time_service.h"
|
||||||
|
#include "ob_table_move_response.h"
|
||||||
|
|
||||||
using namespace oceanbase::observer;
|
using namespace oceanbase::observer;
|
||||||
using namespace oceanbase::common;
|
using namespace oceanbase::common;
|
||||||
@ -119,9 +120,9 @@ uint64_t ObTableBatchExecuteP::get_request_checksum()
|
|||||||
int ObTableBatchExecuteP::response(const int retcode)
|
int ObTableBatchExecuteP::response(const int retcode)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!need_retry_in_queue_ && !did_async_end_trans()) {
|
if (!need_retry_in_queue_ && !had_do_response()) {
|
||||||
// For HKV table, modify the value of timetamp to be positive
|
// For HKV table, modify the value of timetamp to be positive
|
||||||
if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) {
|
if (ObTableEntityType::ET_HKV == arg_.entity_type_) {
|
||||||
const int64_t N = result_.count();
|
const int64_t N = result_.count();
|
||||||
for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i)
|
for (int64_t i = 0; OB_SUCCESS == ret && i < N; ++i)
|
||||||
{
|
{
|
||||||
@ -133,7 +134,21 @@ int ObTableBatchExecuteP::response(const int retcode)
|
|||||||
}
|
}
|
||||||
} // end for
|
} // end for
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
|
// return the package even if negate_htable_timestamp fails
|
||||||
|
const obrpc::ObRpcPacket *rpc_pkt = &reinterpret_cast<const obrpc::ObRpcPacket&>(req_->get_packet());
|
||||||
|
if (is_require_rerouting_err(retcode) && rpc_pkt->require_rerouting()) {
|
||||||
|
// response rerouting packet
|
||||||
|
ObTableMoveResponseSender sender(req_, retcode);
|
||||||
|
if (OB_FAIL(sender.init(arg_.table_id_, arg_.tablet_id_, *gctx_.schema_service_))) {
|
||||||
|
LOG_WARN("fail to init move response sender", K(ret), K_(arg));
|
||||||
|
} else if (OB_FAIL(sender.response())) {
|
||||||
|
LOG_WARN("fail to do move response", K(ret));
|
||||||
|
}
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
ret = ObRpcProcessor::response(retcode); // do common response when do move response failed
|
||||||
|
}
|
||||||
|
} else {
|
||||||
ret = ObRpcProcessor::response(retcode);
|
ret = ObRpcProcessor::response(retcode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -161,8 +176,10 @@ int ObTableBatchExecuteP::try_process()
|
|||||||
LOG_WARN("no operation in the batch", K(ret));
|
LOG_WARN("no operation in the batch", K(ret));
|
||||||
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
|
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
|
||||||
LOG_WARN("failed to get table id", K(ret));
|
LOG_WARN("failed to get table id", K(ret));
|
||||||
} else if (OB_FAIL(check_table_index_supported(table_id, is_index_supported))) {
|
} else if (FALSE_IT(table_id_ = arg_.table_id_)) {
|
||||||
LOG_WARN("fail to check index supported", K(ret), K(table_id));
|
} else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) {
|
||||||
|
} else if (OB_FAIL(check_table_index_supported(table_id_, is_index_supported))) {
|
||||||
|
LOG_WARN("fail to check index supported", K(ret), K(table_id_));
|
||||||
} else if (OB_UNLIKELY(!is_index_supported)) {
|
} else if (OB_UNLIKELY(!is_index_supported)) {
|
||||||
ret = OB_NOT_SUPPORTED;
|
ret = OB_NOT_SUPPORTED;
|
||||||
LOG_WARN("index type is not supported by table api", K(ret));
|
LOG_WARN("index type is not supported by table api", K(ret));
|
||||||
|
|||||||
@ -12,7 +12,7 @@
|
|||||||
|
|
||||||
#ifndef _OB_TABLE_END_TRANS_CB_H
|
#ifndef _OB_TABLE_END_TRANS_CB_H
|
||||||
#define _OB_TABLE_END_TRANS_CB_H 1
|
#define _OB_TABLE_END_TRANS_CB_H 1
|
||||||
#include "ob_rpc_async_response.h"
|
#include "ob_table_rpc_response_sender.h"
|
||||||
#include "sql/ob_end_trans_callback.h"
|
#include "sql/ob_end_trans_callback.h"
|
||||||
#include "share/table/ob_table.h"
|
#include "share/table/ob_table.h"
|
||||||
#include "ob_htable_lock_mgr.h"
|
#include "ob_htable_lock_mgr.h"
|
||||||
@ -61,7 +61,7 @@ private:
|
|||||||
ObTableEntity result_entity_;
|
ObTableEntity result_entity_;
|
||||||
common::ObArenaAllocator allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
ObTableOperationResult result_;
|
ObTableOperationResult result_;
|
||||||
obrpc::ObRpcAsyncResponse<ObTableOperationResult> response_sender_;
|
obrpc::ObTableRpcResponseSender<ObTableOperationResult> response_sender_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObTableBatchExecuteEndTransCb: public ObTableAPITransCb
|
class ObTableBatchExecuteEndTransCb: public ObTableAPITransCb
|
||||||
@ -88,7 +88,7 @@ private:
|
|||||||
common::ObArenaAllocator allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
table::ObTableEntityFactory<table::ObTableEntity> entity_factory_;
|
table::ObTableEntityFactory<table::ObTableEntity> entity_factory_;
|
||||||
ObTableBatchOperationResult result_;
|
ObTableBatchOperationResult result_;
|
||||||
obrpc::ObRpcAsyncResponse<ObTableBatchOperationResult> response_sender_;
|
obrpc::ObTableRpcResponseSender<ObTableBatchOperationResult> response_sender_;
|
||||||
ObTableOperationType::Type table_operation_type_;
|
ObTableOperationType::Type table_operation_type_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -20,6 +20,7 @@
|
|||||||
#include "ob_table_scan_executor.h"
|
#include "ob_table_scan_executor.h"
|
||||||
#include "ob_table_cg_service.h"
|
#include "ob_table_cg_service.h"
|
||||||
#include "observer/ob_req_time_service.h"
|
#include "observer/ob_req_time_service.h"
|
||||||
|
#include "ob_table_move_response.h"
|
||||||
|
|
||||||
using namespace oceanbase::observer;
|
using namespace oceanbase::observer;
|
||||||
using namespace oceanbase::common;
|
using namespace oceanbase::common;
|
||||||
@ -206,10 +207,12 @@ int ObTableApiExecuteP::try_process()
|
|||||||
uint64_t table_id = arg_.table_id_;
|
uint64_t table_id = arg_.table_id_;
|
||||||
bool is_index_supported = true;
|
bool is_index_supported = true;
|
||||||
const ObTableOperation &table_operation = arg_.table_operation_;
|
const ObTableOperation &table_operation = arg_.table_operation_;
|
||||||
if (ObTableOperationType::GET != table_operation.type()) {
|
if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
|
||||||
if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
|
LOG_WARN("failed to get table id", K(ret));
|
||||||
LOG_WARN("failed to get table id", K(ret));
|
} else if (FALSE_IT(table_id_ = arg_.table_id_)) {
|
||||||
} else if (OB_FAIL(check_table_index_supported(table_id, is_index_supported))) {
|
} else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) {
|
||||||
|
} else if (ObTableOperationType::GET != table_operation.type()) {
|
||||||
|
if (OB_FAIL(check_table_index_supported(table_id, is_index_supported))) {
|
||||||
LOG_WARN("fail to check index supported", K(ret), K(table_id));
|
LOG_WARN("fail to check index supported", K(ret), K(table_id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -324,12 +327,26 @@ uint64_t ObTableApiExecuteP::get_request_checksum()
|
|||||||
int ObTableApiExecuteP::response(const int retcode)
|
int ObTableApiExecuteP::response(const int retcode)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!need_retry_in_queue_ && !did_async_end_trans()) {
|
if (!need_retry_in_queue_ && !had_do_response()) {
|
||||||
if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) {
|
if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) {
|
||||||
// @note modify the value of timestamp to be positive
|
// @note modify the value of timestamp to be positive
|
||||||
ret = ObTableRpcProcessorUtil::negate_htable_timestamp(result_entity_);
|
ret = ObTableRpcProcessorUtil::negate_htable_timestamp(result_entity_);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
|
// return the package even if negate_htable_timestamp fails
|
||||||
|
const ObRpcPacket *rpc_pkt = &reinterpret_cast<const ObRpcPacket&>(req_->get_packet());
|
||||||
|
if (is_require_rerouting_err(retcode) && rpc_pkt->require_rerouting()) {
|
||||||
|
// response rerouting packet
|
||||||
|
ObTableMoveResponseSender sender(req_, retcode);
|
||||||
|
if (OB_FAIL(sender.init(arg_.table_id_, arg_.tablet_id_, *gctx_.schema_service_))) {
|
||||||
|
LOG_WARN("fail to init move response sender", K(ret), K_(arg));
|
||||||
|
} else if (OB_FAIL(sender.response())) {
|
||||||
|
LOG_WARN("fail to do move response", K(ret));
|
||||||
|
}
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
ret = ObRpcProcessor::response(retcode); // do common response when do move response failed
|
||||||
|
}
|
||||||
|
} else {
|
||||||
ret = ObRpcProcessor::response(retcode);
|
ret = ObRpcProcessor::response(retcode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -432,7 +449,7 @@ ObTableAPITransCb *ObTableApiExecuteP::new_callback(rpc::ObRequest *req)
|
|||||||
int ObTableApiExecuteP::before_response(int error_code)
|
int ObTableApiExecuteP::before_response(int error_code)
|
||||||
{
|
{
|
||||||
// NOTE: when check_timeout failed, the result.entity_ is null, and serialize result cause coredump
|
// NOTE: when check_timeout failed, the result.entity_ is null, and serialize result cause coredump
|
||||||
if (!did_async_end_trans() && OB_ISNULL(result_.get_entity())) {
|
if (!had_do_response() && OB_ISNULL(result_.get_entity())) {
|
||||||
result_.set_entity(result_entity_);
|
result_.set_entity(result_entity_);
|
||||||
}
|
}
|
||||||
return ObTableRpcProcessor::before_response(error_code);
|
return ObTableRpcProcessor::before_response(error_code);
|
||||||
|
|||||||
82
src/observer/table/ob_table_move_response.cpp
Normal file
82
src/observer/table/ob_table_move_response.cpp
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2023 OceanBase
|
||||||
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
* You may obtain a copy of Mulan PubL v2 at:
|
||||||
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||||
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||||
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the Mulan PubL v2 for more details.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define USING_LOG_PREFIX SERVER
|
||||||
|
#include "ob_table_move_response.h"
|
||||||
|
#include "share/schema/ob_schema_getter_guard.h"
|
||||||
|
#include "observer/ob_server_struct.h"
|
||||||
|
#include "share/partition_table/ob_partition_location.h"
|
||||||
|
|
||||||
|
using namespace oceanbase::observer;
|
||||||
|
using namespace oceanbase::common;
|
||||||
|
using namespace oceanbase::table;
|
||||||
|
////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
int ObTableMoveResponseSender::get_replica(const uint64_t table_id,
|
||||||
|
const common::ObTabletID &tablet_id,
|
||||||
|
table::ObTableMoveReplicaInfo &replica)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
bool is_cache_hit = false;
|
||||||
|
int64_t expire_renew_time = INT64_MAX; //对于get接口,需要传一个最大值,表示需要拿最新的location cache,并让老的失效掉
|
||||||
|
share::ObLSID ls_id;
|
||||||
|
share::ObLSLocation ls_loc;
|
||||||
|
share::ObLSReplicaLocation replica_loc;
|
||||||
|
|
||||||
|
if (OB_FAIL(GCTX.location_service_->get(MTL_ID(), tablet_id, expire_renew_time, is_cache_hit, ls_id))) {
|
||||||
|
LOG_WARN("fail to get partition", K(ret), K(table_id), K(tablet_id));
|
||||||
|
} else if (OB_FAIL(GCTX.location_service_->get(GCONF.cluster_id, MTL_ID(), ls_id, expire_renew_time, is_cache_hit, ls_loc))) {
|
||||||
|
LOG_WARN("fail get partition", K(ret), K(table_id), K(tablet_id));
|
||||||
|
} else if (OB_FAIL(ls_loc.get_leader(replica_loc))) {
|
||||||
|
LOG_WARN("fail to get strong leader replica", K(ret));
|
||||||
|
} else {
|
||||||
|
replica.server_ = replica_loc.get_server();
|
||||||
|
replica.role_ = replica_loc.get_role();
|
||||||
|
replica.replica_type_ = replica_loc.get_replica_type();
|
||||||
|
replica.part_renew_time_ = ls_loc.get_renew_time();
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTableMoveResponseSender::init(const uint64_t table_id,
|
||||||
|
const common::ObTabletID &tablet_id,
|
||||||
|
share::schema::ObMultiVersionSchemaService &schema_service)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
ObTableMoveReplicaInfo &replica = result_.get_replica_info();
|
||||||
|
if (OB_FAIL(get_replica(table_id, tablet_id, replica))) {
|
||||||
|
LOG_WARN("fail to get partition info", K(ret), K(table_id), K(tablet_id));
|
||||||
|
} else {
|
||||||
|
share::schema::ObSchemaGetterGuard schema_guard;
|
||||||
|
const share::schema::ObTableSchema *table_schema = nullptr;
|
||||||
|
if (OB_FAIL(schema_service.get_tenant_schema_guard(MTL_ID(), schema_guard))) {
|
||||||
|
LOG_WARN("fail to get schema guard", K(ret));
|
||||||
|
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
|
||||||
|
LOG_WARN("fail to get table schema", K(table_id), K(ret));
|
||||||
|
} else if (OB_ISNULL(table_schema)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_ERROR("NULL ptr", K(ret), K(table_id));
|
||||||
|
} else {
|
||||||
|
replica.set_table_id(table_id);
|
||||||
|
replica.set_schema_version(table_schema->get_schema_version());
|
||||||
|
replica.set_tablet_id(tablet_id);
|
||||||
|
|
||||||
|
// set move pcode
|
||||||
|
response_sender_.set_pcode(obrpc::OB_TABLE_API_MOVE);
|
||||||
|
LOG_DEBUG("move response init successfully", K(replica));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
49
src/observer/table/ob_table_move_response.h
Normal file
49
src/observer/table/ob_table_move_response.h
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2023 OceanBase
|
||||||
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
* You may obtain a copy of Mulan PubL v2 at:
|
||||||
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||||
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||||
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the Mulan PubL v2 for more details.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _OB_TABLE_MOVE_RESPONSE_H
|
||||||
|
#define _OB_TABLE_MOVE_RESPONSE_H 1
|
||||||
|
#include "ob_table_rpc_response_sender.h"
|
||||||
|
#include "share/table/ob_table.h"
|
||||||
|
|
||||||
|
namespace oceanbase
|
||||||
|
{
|
||||||
|
namespace observer
|
||||||
|
{
|
||||||
|
class ObTableMoveResponseSender
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ObTableMoveResponseSender(rpc::ObRequest *req, const int ret_code)
|
||||||
|
:response_sender_(req, result_, ret_code)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
virtual ~ObTableMoveResponseSender() = default;
|
||||||
|
OB_INLINE table::ObTableMoveResult& get_result() { return result_; }
|
||||||
|
int init(const uint64_t table_id,
|
||||||
|
const common::ObTabletID &tablet_id,
|
||||||
|
share::schema::ObMultiVersionSchemaService &schema_service);
|
||||||
|
int response() { return response_sender_.response(common::OB_SUCCESS); };
|
||||||
|
private:
|
||||||
|
int get_replica(const uint64_t table_id,
|
||||||
|
const common::ObTabletID &tablet_id,
|
||||||
|
table::ObTableMoveReplicaInfo &replica);
|
||||||
|
private:
|
||||||
|
table::ObTableMoveResult result_;
|
||||||
|
obrpc::ObTableRpcResponseSender<table::ObTableMoveResult> response_sender_;
|
||||||
|
private:
|
||||||
|
DISALLOW_COPY_AND_ASSIGN(ObTableMoveResponseSender);
|
||||||
|
};
|
||||||
|
|
||||||
|
} // end namespace server
|
||||||
|
} // end namespace oceanbase
|
||||||
|
|
||||||
|
#endif /* _OB_TABLE_MOVE_RESPONSE_H */
|
||||||
@ -955,19 +955,19 @@ int ObTableQueryAndMutateP::try_process()
|
|||||||
int64_t affected_rows = 0;
|
int64_t affected_rows = 0;
|
||||||
const bool is_hkv = (ObTableEntityType::ET_HKV == arg_.entity_type_);
|
const bool is_hkv = (ObTableEntityType::ET_HKV == arg_.entity_type_);
|
||||||
ObHTableLockHandle *lock_handle = nullptr;
|
ObHTableLockHandle *lock_handle = nullptr;
|
||||||
uint64_t table_id = OB_INVALID_ID;
|
|
||||||
|
|
||||||
if (OB_FAIL(init_scan_tb_ctx(cache_guard))) {
|
if (OB_FAIL(init_scan_tb_ctx(cache_guard))) {
|
||||||
LOG_WARN("fail to init scan table ctx", K(ret));
|
LOG_WARN("fail to init scan table ctx", K(ret));
|
||||||
} else if (FALSE_IT(table_id = tb_ctx_.get_ref_table_id())) {
|
} else if (FALSE_IT(table_id_ = arg_.table_id_)) {
|
||||||
|
} else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) {
|
||||||
} else if (is_hkv && OB_FAIL(HTABLE_LOCK_MGR->acquire_handle(lock_handle))) {
|
} else if (is_hkv && OB_FAIL(HTABLE_LOCK_MGR->acquire_handle(lock_handle))) {
|
||||||
LOG_WARN("fail to get htable lock handle", K(ret));
|
LOG_WARN("fail to get htable lock handle", K(ret));
|
||||||
} else if (is_hkv && OB_FAIL(ObHTableUtils::lock_htable_row(table_id, query, *lock_handle, ObHTableLockMode::EXCLUSIVE))) {
|
} else if (is_hkv && OB_FAIL(ObHTableUtils::lock_htable_row(table_id_, query, *lock_handle, ObHTableLockMode::EXCLUSIVE))) {
|
||||||
LOG_WARN("fail to lock htable row", K(ret), K(table_id), K(query));
|
LOG_WARN("fail to lock htable row", K(ret), K_(table_id), K(query));
|
||||||
} else if (OB_FAIL(start_trans(false, /* is_readonly */
|
} else if (OB_FAIL(start_trans(false, /* is_readonly */
|
||||||
sql::stmt::T_UPDATE,
|
sql::stmt::T_UPDATE,
|
||||||
consistency_level,
|
consistency_level,
|
||||||
table_id,
|
table_id_,
|
||||||
tb_ctx_.get_ls_id(),
|
tb_ctx_.get_ls_id(),
|
||||||
get_timeout_ts()))) {
|
get_timeout_ts()))) {
|
||||||
LOG_WARN("fail to start readonly transaction", K(ret));
|
LOG_WARN("fail to start readonly transaction", K(ret));
|
||||||
|
|||||||
@ -269,6 +269,8 @@ int ObTableQueryP::try_process()
|
|||||||
LOG_WARN("fail to get spec from cache", K(ret));
|
LOG_WARN("fail to get spec from cache", K(ret));
|
||||||
} else if (OB_FAIL(spec->create_executor(tb_ctx_, executor))) {
|
} else if (OB_FAIL(spec->create_executor(tb_ctx_, executor))) {
|
||||||
LOG_WARN("fail to generate executor", K(ret), K(tb_ctx_));
|
LOG_WARN("fail to generate executor", K(ret), K(tb_ctx_));
|
||||||
|
} else if (FALSE_IT(table_id_ = arg_.table_id_)) {
|
||||||
|
} else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) {
|
||||||
} else if (OB_FAIL(start_trans(true, /* is_readonly */
|
} else if (OB_FAIL(start_trans(true, /* is_readonly */
|
||||||
sql::stmt::T_SELECT,
|
sql::stmt::T_SELECT,
|
||||||
arg_.consistency_level_,
|
arg_.consistency_level_,
|
||||||
|
|||||||
@ -598,6 +598,8 @@ int ObTableQuerySyncP::try_process()
|
|||||||
} else if (OB_FAIL(get_query_session(query_session_id_, query_session_))) {
|
} else if (OB_FAIL(get_query_session(query_session_id_, query_session_))) {
|
||||||
LOG_WARN("fail to get query session", K(ret), K(query_session_id_));
|
LOG_WARN("fail to get query session", K(ret), K(query_session_id_));
|
||||||
} else if (FALSE_IT(timeout_ts_ = get_timeout_ts())) {
|
} else if (FALSE_IT(timeout_ts_ = get_timeout_ts())) {
|
||||||
|
} else if (FALSE_IT(table_id_ = arg_.table_id_)) {
|
||||||
|
} else if (FALSE_IT(tablet_id_ = arg_.tablet_id_)) {
|
||||||
} else {
|
} else {
|
||||||
if (ObQueryOperationType::QUERY_START == arg_.query_type_) {
|
if (ObQueryOperationType::QUERY_START == arg_.query_type_) {
|
||||||
ret = process_query_start();
|
ret = process_query_start();
|
||||||
|
|||||||
@ -186,7 +186,6 @@ public:
|
|||||||
explicit ObTableQuerySyncP(const ObGlobalContext &gctx);
|
explicit ObTableQuerySyncP(const ObGlobalContext &gctx);
|
||||||
virtual ~ObTableQuerySyncP() {}
|
virtual ~ObTableQuerySyncP() {}
|
||||||
virtual int deserialize() override;
|
virtual int deserialize() override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual int check_arg() override;
|
virtual int check_arg() override;
|
||||||
virtual int try_process() override;
|
virtual int try_process() override;
|
||||||
@ -195,6 +194,7 @@ protected:
|
|||||||
virtual uint64_t get_request_checksum() override;
|
virtual uint64_t get_request_checksum() override;
|
||||||
virtual table::ObTableAPITransCb *new_callback(rpc::ObRequest *req) override;
|
virtual table::ObTableAPITransCb *new_callback(rpc::ObRequest *req) override;
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int process_query_start();
|
int process_query_start();
|
||||||
int process_query_next();
|
int process_query_next();
|
||||||
|
|||||||
@ -29,6 +29,7 @@
|
|||||||
#include "storage/tx/ob_trans_service.h"
|
#include "storage/tx/ob_trans_service.h"
|
||||||
#include "ob_table_session_pool.h"
|
#include "ob_table_session_pool.h"
|
||||||
#include "storage/tx/wrs/ob_weak_read_util.h"
|
#include "storage/tx/wrs/ob_weak_read_util.h"
|
||||||
|
#include "ob_table_move_response.h"
|
||||||
|
|
||||||
using namespace oceanbase::observer;
|
using namespace oceanbase::observer;
|
||||||
using namespace oceanbase::common;
|
using namespace oceanbase::common;
|
||||||
@ -89,9 +90,9 @@ int ObTableLoginP::process()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// whether the client should refresh location cache
|
// whether the client should refresh location cache
|
||||||
if (OB_SUCCESS != ret && is_bad_routing_err(ret)) {
|
if (OB_SUCCESS != ret && is_require_rerouting_err(ret)) {
|
||||||
ObRpcProcessor::bad_routing_ = true;
|
ObRpcProcessor::require_rerouting_ = true;
|
||||||
LOG_WARN("[TABLE] login bad routing", K(ret), "bad_routing", ObRpcProcessor::bad_routing_);
|
LOG_WARN("[TABLE] login require rerouting", K(ret), "require_rerouting", ObRpcProcessor::require_rerouting_);
|
||||||
}
|
}
|
||||||
ObTenantStatEstGuard stat_guard(result_.tenant_id_);
|
ObTenantStatEstGuard stat_guard(result_.tenant_id_);
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
@ -223,7 +224,7 @@ ObTableApiProcessorBase::ObTableApiProcessorBase(const ObGlobalContext &gctx)
|
|||||||
need_retry_in_queue_(false),
|
need_retry_in_queue_(false),
|
||||||
retry_count_(0),
|
retry_count_(0),
|
||||||
trans_desc_(NULL),
|
trans_desc_(NULL),
|
||||||
did_async_end_trans_(false)
|
had_do_response_(false)
|
||||||
{
|
{
|
||||||
need_audit_ = GCONF.enable_sql_audit;
|
need_audit_ = GCONF.enable_sql_audit;
|
||||||
trans_state_ptr_ = &trans_state_;
|
trans_state_ptr_ = &trans_state_;
|
||||||
@ -233,7 +234,7 @@ void ObTableApiProcessorBase::reset_ctx()
|
|||||||
{
|
{
|
||||||
trans_state_ptr_->reset();
|
trans_state_ptr_->reset();
|
||||||
trans_desc_ = NULL;
|
trans_desc_ = NULL;
|
||||||
did_async_end_trans_ = false;
|
had_do_response_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableApiProcessorBase::get_ls_id(const ObTabletID &tablet_id, ObLSID &ls_id)
|
int ObTableApiProcessorBase::get_ls_id(const ObTabletID &tablet_id, ObLSID &ls_id)
|
||||||
@ -568,7 +569,7 @@ int ObTableApiProcessorBase::async_commit_trans(rpc::ObRequest *req, int64_t tim
|
|||||||
callback.callback(ret);
|
callback.callback(ret);
|
||||||
}
|
}
|
||||||
// ignore the return code of end_trans
|
// ignore the return code of end_trans
|
||||||
did_async_end_trans_ = true; // don't send response in this worker thread
|
had_do_response_ = true; // don't send response in this worker thread
|
||||||
// @note the req_ may be freed, req_processor can not be read any more.
|
// @note the req_ may be freed, req_processor can not be read any more.
|
||||||
// The req_has_wokenup_ MUST set to be true, otherwise req_processor will invoke req_->set_process_start_end_diff, cause memory core
|
// The req_has_wokenup_ MUST set to be true, otherwise req_processor will invoke req_->set_process_start_end_diff, cause memory core
|
||||||
// @see ObReqProcessor::run() req_->set_process_start_end_diff(ObTimeUtility::current_time());
|
// @see ObReqProcessor::run() req_->set_process_start_end_diff(ObTimeUtility::current_time());
|
||||||
@ -880,15 +881,15 @@ int ObTableRpcProcessor<T>::process()
|
|||||||
if (OB_FAIL(process_with_retry(RpcProcessor::arg_.credential_, get_timeout_ts()))) {
|
if (OB_FAIL(process_with_retry(RpcProcessor::arg_.credential_, get_timeout_ts()))) {
|
||||||
if (OB_NOT_NULL(request_string_)) { // request_string_ has been generated if enable sql_audit
|
if (OB_NOT_NULL(request_string_)) { // request_string_ has been generated if enable sql_audit
|
||||||
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), K(request_string_));
|
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), K(request_string_));
|
||||||
} else if (did_async_end_trans()) { // req_ may be freed
|
} else if (had_do_response()) { // req_ may be freed
|
||||||
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_));
|
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_));
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), "request", RpcProcessor::arg_);
|
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), "request", RpcProcessor::arg_);
|
||||||
}
|
}
|
||||||
// whether the client should refresh location cache
|
// whether the client should refresh location cache and retry
|
||||||
if (is_bad_routing_err(ret)) {
|
if (is_require_rerouting_err(ret)) {
|
||||||
ObRpcProcessor<T>::bad_routing_ = true;
|
ObRpcProcessor<T>::require_rerouting_ = true;
|
||||||
LOG_WARN("table_api request bad routing", K(ret), "bad_routing", ObRpcProcessor<T>::bad_routing_);
|
LOG_WARN("table_api request require rerouting", K(ret), "require_rerouting", ObRpcProcessor<T>::require_rerouting_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -910,12 +911,26 @@ int ObTableRpcProcessor<T>::before_response(int error_code)
|
|||||||
}
|
}
|
||||||
|
|
||||||
template<class T>
|
template<class T>
|
||||||
int ObTableRpcProcessor<T>::response(const int retcode)
|
int ObTableRpcProcessor<T>::response(int error_code)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
// if it is waiting for retry in queue, the response can NOT be sent.
|
// if it is waiting for retry in queue, the response can NOT be sent.
|
||||||
if (!need_retry_in_queue_) {
|
if (!need_retry_in_queue_ && !had_do_response()) {
|
||||||
ret = RpcProcessor::response(retcode);
|
const ObRpcPacket *rpc_pkt = &reinterpret_cast<const ObRpcPacket&>(this->req_->get_packet());
|
||||||
|
if (is_require_rerouting_err(error_code) && rpc_pkt->require_rerouting()) {
|
||||||
|
// response rerouting packet
|
||||||
|
ObTableMoveResponseSender sender(this->req_, error_code);
|
||||||
|
if (OB_FAIL(sender.init(ObTableApiProcessorBase::table_id_, ObTableApiProcessorBase::tablet_id_, *gctx_.schema_service_))) {
|
||||||
|
LOG_WARN("fail to init move response sender", K(ret), K(RpcProcessor::arg_));
|
||||||
|
} else if (OB_FAIL(sender.response())) {
|
||||||
|
LOG_WARN("fail to do move response", K(ret));
|
||||||
|
}
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
ret = RpcProcessor::response(error_code); // do common response when do move response failed
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ret = RpcProcessor::response(error_code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -978,9 +993,9 @@ void ObTableRpcProcessor<T>::generate_sql_id()
|
|||||||
snprintf(audit_record_.sql_id_, (int32_t)sizeof(audit_record_.sql_id_),
|
snprintf(audit_record_.sql_id_, (int32_t)sizeof(audit_record_.sql_id_),
|
||||||
"TABLEAPI0x%04Xvv%016lX", RpcProcessor::PCODE, checksum);
|
"TABLEAPI0x%04Xvv%016lX", RpcProcessor::PCODE, checksum);
|
||||||
}
|
}
|
||||||
bool oceanbase::observer::is_bad_routing_err(const int err)
|
bool oceanbase::observer::is_require_rerouting_err(const int err)
|
||||||
{
|
{
|
||||||
// bad routing check : whether client should refresh location cache
|
// rerouting: whether client should refresh location cache and retry
|
||||||
// Now, following the same logic as in ../mysql/ob_query_retry_ctrl.cpp
|
// Now, following the same logic as in ../mysql/ob_query_retry_ctrl.cpp
|
||||||
return (is_master_changed_error(err)
|
return (is_master_changed_error(err)
|
||||||
|| is_server_down_error(err)
|
|| is_server_down_error(err)
|
||||||
|
|||||||
@ -134,11 +134,11 @@ public:
|
|||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
int64_t timeout_ts);
|
int64_t timeout_ts);
|
||||||
void release_read_trans();
|
void release_read_trans();
|
||||||
inline bool did_async_end_trans() const { return did_async_end_trans_; }
|
|
||||||
inline transaction::ObTxDesc *get_trans_desc() { return trans_desc_; }
|
inline transaction::ObTxDesc *get_trans_desc() { return trans_desc_; }
|
||||||
int get_tablet_by_rowkey(uint64_t table_id, const ObIArray<ObRowkey> &rowkeys,
|
int get_tablet_by_rowkey(uint64_t table_id, const ObIArray<ObRowkey> &rowkeys,
|
||||||
ObIArray<ObTabletID> &tablet_ids);
|
ObIArray<ObTabletID> &tablet_ids);
|
||||||
inline transaction::ObTxReadSnapshot &get_tx_snapshot() { return tx_snapshot_; }
|
inline transaction::ObTxReadSnapshot &get_tx_snapshot() { return tx_snapshot_; }
|
||||||
|
inline bool had_do_response() const { return had_do_response_; }
|
||||||
int get_table_id(const ObString &table_name, const uint64_t arg_table_id, uint64_t &real_table_id) const;
|
int get_table_id(const ObString &table_name, const uint64_t arg_table_id, uint64_t &real_table_id) const;
|
||||||
protected:
|
protected:
|
||||||
virtual int check_arg() = 0;
|
virtual int check_arg() = 0;
|
||||||
@ -181,11 +181,13 @@ protected:
|
|||||||
ObTableRetryPolicy retry_policy_;
|
ObTableRetryPolicy retry_policy_;
|
||||||
bool need_retry_in_queue_;
|
bool need_retry_in_queue_;
|
||||||
int32_t retry_count_;
|
int32_t retry_count_;
|
||||||
|
uint64_t table_id_;
|
||||||
|
ObTabletID tablet_id_;
|
||||||
protected:
|
protected:
|
||||||
// trans control
|
// trans control
|
||||||
sql::TransState trans_state_;
|
sql::TransState trans_state_;
|
||||||
transaction::ObTxDesc *trans_desc_;
|
transaction::ObTxDesc *trans_desc_;
|
||||||
bool did_async_end_trans_;
|
bool had_do_response_; // asynchronous transactions return packet in advance
|
||||||
sql::TransState *trans_state_ptr_;
|
sql::TransState *trans_state_ptr_;
|
||||||
transaction::ObTxReadSnapshot tx_snapshot_;
|
transaction::ObTxReadSnapshot tx_snapshot_;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -276,7 +276,7 @@ private:
|
|||||||
~ObTableRpcProcessorUtil() = delete;
|
~ObTableRpcProcessorUtil() = delete;
|
||||||
};
|
};
|
||||||
|
|
||||||
bool is_bad_routing_err(const int err);
|
bool is_require_rerouting_err(const int err);
|
||||||
|
|
||||||
} // end namespace observer
|
} // end namespace observer
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
|||||||
@ -10,8 +10,8 @@
|
|||||||
* See the Mulan PubL v2 for more details.
|
* See the Mulan PubL v2 for more details.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _OB_RPC_ASYNC_RESPONSE_H
|
#ifndef _OB_TABLE_RPC_RESPONSE_SENDER_H
|
||||||
#define _OB_RPC_ASYNC_RESPONSE_H 1
|
#define _OB_TABLE_RPC_RESPONSE_SENDER_H 1
|
||||||
#include "rpc/ob_request.h"
|
#include "rpc/ob_request.h"
|
||||||
#include "rpc/obrpc/ob_rpc_packet.h"
|
#include "rpc/obrpc/ob_rpc_packet.h"
|
||||||
#include "rpc/frame/ob_req_processor.h"
|
#include "rpc/frame/ob_req_processor.h"
|
||||||
@ -25,30 +25,40 @@ namespace obrpc
|
|||||||
{
|
{
|
||||||
// this class is copied from ObRpcProcessor
|
// this class is copied from ObRpcProcessor
|
||||||
template <class T>
|
template <class T>
|
||||||
class ObRpcAsyncResponse
|
class ObTableRpcResponseSender
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObRpcAsyncResponse(rpc::ObRequest *req, T &result)
|
ObTableRpcResponseSender(rpc::ObRequest *req, T &result, const int exec_ret_code = common::OB_SUCCESS)
|
||||||
:req_(req),
|
:req_(req),
|
||||||
result_(result),
|
result_(result),
|
||||||
|
exec_ret_code_(exec_ret_code),
|
||||||
|
pcode_(ObRpcPacketCode::OB_INVALID_RPC_CODE),
|
||||||
using_buffer_(NULL)
|
using_buffer_(NULL)
|
||||||
{}
|
{
|
||||||
virtual ~ObRpcAsyncResponse() = default;
|
if (OB_NOT_NULL(req_)) {
|
||||||
int response(const int retcode);
|
const ObRpcPacket *rpc_pkt = &reinterpret_cast<const ObRpcPacket&>(req_->get_packet());
|
||||||
|
pcode_ = rpc_pkt->get_pcode();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
virtual ~ObTableRpcResponseSender() = default;
|
||||||
|
int response(const int cb_param);
|
||||||
|
OB_INLINE void set_pcode(ObRpcPacketCode pcode) { pcode_ = pcode; }
|
||||||
private:
|
private:
|
||||||
int serialize();
|
int serialize();
|
||||||
int do_response(ObRpcPacket *response_pkt, bool bad_routing);
|
int do_response(ObRpcPacket *response_pkt, bool require_rerouting);
|
||||||
char *easy_alloc(int64_t size) const;
|
char *easy_alloc(int64_t size) const;
|
||||||
// disallow copy
|
// disallow copy
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObRpcAsyncResponse);
|
DISALLOW_COPY_AND_ASSIGN(ObTableRpcResponseSender);
|
||||||
private:
|
private:
|
||||||
rpc::ObRequest *req_;
|
rpc::ObRequest *req_;
|
||||||
T &result_;
|
T &result_;
|
||||||
|
const int exec_ret_code_; // processor执行的返回码
|
||||||
|
ObRpcPacketCode pcode_;
|
||||||
common::ObDataBuffer *using_buffer_;
|
common::ObDataBuffer *using_buffer_;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <class T>
|
template <class T>
|
||||||
char *ObRpcAsyncResponse<T>::easy_alloc(int64_t size) const
|
char *ObTableRpcResponseSender<T>::easy_alloc(int64_t size) const
|
||||||
{
|
{
|
||||||
void *buf = NULL;
|
void *buf = NULL;
|
||||||
if (OB_ISNULL(req_)) {
|
if (OB_ISNULL(req_)) {
|
||||||
@ -60,7 +70,7 @@ char *ObRpcAsyncResponse<T>::easy_alloc(int64_t size) const
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class T>
|
template <class T>
|
||||||
int ObRpcAsyncResponse<T>::serialize()
|
int ObTableRpcResponseSender<T>::serialize()
|
||||||
{
|
{
|
||||||
int ret = common::OB_SUCCESS;
|
int ret = common::OB_SUCCESS;
|
||||||
if (OB_ISNULL(using_buffer_)) {
|
if (OB_ISNULL(using_buffer_)) {
|
||||||
@ -77,12 +87,15 @@ int ObRpcAsyncResponse<T>::serialize()
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class T>
|
template <class T>
|
||||||
int ObRpcAsyncResponse<T>::do_response(ObRpcPacket *response_pkt, bool bad_routing)
|
int ObTableRpcResponseSender<T>::do_response(ObRpcPacket *response_pkt, bool require_rerouting)
|
||||||
{
|
{
|
||||||
int ret = common::OB_SUCCESS;
|
int ret = common::OB_SUCCESS;
|
||||||
if (OB_ISNULL(req_)) {
|
if (OB_ISNULL(req_)) {
|
||||||
ret = common::OB_ERR_NULL_VALUE;
|
ret = common::OB_ERR_NULL_VALUE;
|
||||||
RPC_OBRPC_LOG(WARN, "req is NULL", K(ret));
|
RPC_OBRPC_LOG(WARN, "req is NULL", K(ret));
|
||||||
|
} else if (ObRpcPacketCode::OB_INVALID_RPC_CODE == pcode_) {
|
||||||
|
ret = common::OB_ERR_UNEXPECTED;
|
||||||
|
RPC_OBRPC_LOG(WARN, "pcode is invalid", K(ret), K_(pcode), KPC_(req));
|
||||||
} else {
|
} else {
|
||||||
const ObRpcPacket *rpc_pkt = &reinterpret_cast<const ObRpcPacket&>(req_->get_packet());
|
const ObRpcPacket *rpc_pkt = &reinterpret_cast<const ObRpcPacket&>(req_->get_packet());
|
||||||
// TODO: fufeng, make force_destroy_second as a configure item
|
// TODO: fufeng, make force_destroy_second as a configure item
|
||||||
@ -94,12 +107,11 @@ int ObRpcAsyncResponse<T>::do_response(ObRpcPacket *response_pkt, bool bad_routi
|
|||||||
// _OB_LOG(ERROR, "pkt process too long time: pkt_receive_ts=%ld, pkt_code=%d", rts, pcode);
|
// _OB_LOG(ERROR, "pkt process too long time: pkt_receive_ts=%ld, pkt_code=%d", rts, pcode);
|
||||||
// }
|
// }
|
||||||
//copy packet into req buffer
|
//copy packet into req buffer
|
||||||
ObRpcPacketCode pcode = rpc_pkt->get_pcode();
|
|
||||||
ObRpcPacket *packet = NULL;
|
ObRpcPacket *packet = NULL;
|
||||||
req_->set_trace_point(rpc::ObRequest::OB_EASY_REQUEST_RPC_ASYNC_RSP);
|
req_->set_trace_point(rpc::ObRequest::OB_EASY_REQUEST_RPC_ASYNC_RSP);
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
packet = response_pkt;
|
packet = response_pkt;
|
||||||
packet->set_pcode(pcode);
|
packet->set_pcode(pcode_);
|
||||||
packet->set_chid(rpc_pkt->get_chid());
|
packet->set_chid(rpc_pkt->get_chid());
|
||||||
packet->set_session_id(0); // not stream
|
packet->set_session_id(0); // not stream
|
||||||
packet->set_trace_id(rpc_pkt->get_trace_id());
|
packet->set_trace_id(rpc_pkt->get_trace_id());
|
||||||
@ -115,8 +127,8 @@ int ObRpcAsyncResponse<T>::do_response(ObRpcPacket *response_pkt, bool bad_routi
|
|||||||
packet->set_pop_process_start_diff(req_->get_pop_process_start_diff());
|
packet->set_pop_process_start_diff(req_->get_pop_process_start_diff());
|
||||||
packet->set_process_start_end_diff(req_->get_process_start_end_diff());
|
packet->set_process_start_end_diff(req_->get_process_start_end_diff());
|
||||||
packet->set_process_end_response_diff(req_->get_process_end_response_diff());
|
packet->set_process_end_response_diff(req_->get_process_end_response_diff());
|
||||||
if (bad_routing) {
|
if (require_rerouting) {
|
||||||
packet->set_bad_routing();
|
packet->set_require_rerouting();
|
||||||
}
|
}
|
||||||
packet->calc_checksum();
|
packet->calc_checksum();
|
||||||
}
|
}
|
||||||
@ -127,9 +139,10 @@ int ObRpcAsyncResponse<T>::do_response(ObRpcPacket *response_pkt, bool bad_routi
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class T>
|
template <class T>
|
||||||
int ObRpcAsyncResponse<T>::response(const int retcode)
|
int ObTableRpcResponseSender<T>::response(const int cb_param)
|
||||||
{
|
{
|
||||||
int ret = common::OB_SUCCESS;
|
int ret = common::OB_SUCCESS;
|
||||||
|
int retcode = (cb_param == OB_SUCCESS ? exec_ret_code_ : cb_param);
|
||||||
if (OB_ISNULL(req_)) {
|
if (OB_ISNULL(req_)) {
|
||||||
ret = common::OB_INVALID_ARGUMENT;
|
ret = common::OB_INVALID_ARGUMENT;
|
||||||
RPC_OBRPC_LOG(WARN, "invalid req, maybe stream rpc timeout", K(ret), K(retcode),
|
RPC_OBRPC_LOG(WARN, "invalid req, maybe stream rpc timeout", K(ret), K(retcode),
|
||||||
@ -202,21 +215,22 @@ int ObRpcAsyncResponse<T>::response(const int retcode)
|
|||||||
using_buffer_->get_position()))) {
|
using_buffer_->get_position()))) {
|
||||||
RPC_OBRPC_LOG(WARN, "serialize result code fail", K(ret));
|
RPC_OBRPC_LOG(WARN, "serialize result code fail", K(ret));
|
||||||
} else {
|
} else {
|
||||||
// also send result if process successfully.
|
// 1. send result if process successfully.
|
||||||
if (common::OB_SUCCESS == retcode) {
|
// 2. send result if require rerouting
|
||||||
|
if (common::OB_SUCCESS == retcode || observer::is_require_rerouting_err(retcode)) {
|
||||||
if (OB_FAIL(serialize())) {
|
if (OB_FAIL(serialize())) {
|
||||||
RPC_OBRPC_LOG(WARN, "serialize result fail", K(ret));
|
RPC_OBRPC_LOG(WARN, "serialize result fail", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// routing check : whether client should refresh location cache and retry
|
// rerouting: whether client should refresh location cache and retry
|
||||||
// Now, following the same logic as in ../mysql/ob_query_retry_ctrl.cpp
|
// Now, following the same logic as in ../mysql/ob_query_retry_ctrl.cpp
|
||||||
bool bad_routing = false;
|
bool require_rerouting = false;
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (common::OB_SUCCESS != retcode && observer::is_bad_routing_err(retcode)) {
|
if (common::OB_SUCCESS != retcode && observer::is_require_rerouting_err(retcode)) {
|
||||||
bad_routing = true;
|
require_rerouting = true;
|
||||||
RPC_OBRPC_LOG(WARN, "bad routing", K(retcode), K(bad_routing));
|
RPC_OBRPC_LOG(INFO, "require rerouting", K(retcode), K(require_rerouting));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,8 +238,8 @@ int ObRpcAsyncResponse<T>::response(const int retcode)
|
|||||||
ObRpcPacket *pkt = new (pkt_buf) ObRpcPacket();
|
ObRpcPacket *pkt = new (pkt_buf) ObRpcPacket();
|
||||||
//Response rsp(sessid, is_stream_, is_last, pkt);
|
//Response rsp(sessid, is_stream_, is_last, pkt);
|
||||||
pkt->set_content(using_buffer_->get_data(), using_buffer_->get_position());
|
pkt->set_content(using_buffer_->get_data(), using_buffer_->get_position());
|
||||||
if (OB_FAIL(do_response(pkt, bad_routing))) {
|
if (OB_FAIL(do_response(pkt, require_rerouting))) {
|
||||||
RPC_OBRPC_LOG(WARN, "response data fail", K(ret));
|
RPC_OBRPC_LOG(WARN, "response data fail", K(ret), K(retcode));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,4 +250,4 @@ int ObRpcAsyncResponse<T>::response(const int retcode)
|
|||||||
} // end namespace obrpc
|
} // end namespace obrpc
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
|
||||||
#endif /* _OB_RPC_ASYNC_RESPONSE_H */
|
#endif /* _OB_TABLE_RPC_RESPONSE_SENDER_H */
|
||||||
@ -118,7 +118,7 @@ int ObTableApiScanExecutor::prepare_das_task()
|
|||||||
scan_op = static_cast<ObDASScanOp*>(task_op);
|
scan_op = static_cast<ObDASScanOp*>(task_op);
|
||||||
scan_op->set_scan_ctdef(&scan_spec_.get_ctdef().scan_ctdef_);
|
scan_op->set_scan_ctdef(&scan_spec_.get_ctdef().scan_ctdef_);
|
||||||
scan_op->set_scan_rtdef(&tsc_rtdef_.scan_rtdef_);
|
scan_op->set_scan_rtdef(&tsc_rtdef_.scan_rtdef_);
|
||||||
scan_op->set_can_part_retry(nullptr == tsc_rtdef_.scan_rtdef_.sample_info_);
|
scan_op->set_can_part_retry(false);
|
||||||
tsc_rtdef_.scan_rtdef_.table_loc_->is_reading_ = true;
|
tsc_rtdef_.scan_rtdef_.table_loc_->is_reading_ = true;
|
||||||
if (scan_spec_.get_ctdef().lookup_ctdef_ != nullptr) {
|
if (scan_spec_.get_ctdef().lookup_ctdef_ != nullptr) {
|
||||||
//is local index lookup, need to set the lookup ctdef to the das scan op
|
//is local index lookup, need to set the lookup ctdef to the das scan op
|
||||||
|
|||||||
@ -1717,3 +1717,20 @@ int ObTableAggregation::deep_copy(ObIAllocator &allocator, ObTableAggregation &d
|
|||||||
OB_SERIALIZE_MEMBER(ObTableAggregation,
|
OB_SERIALIZE_MEMBER(ObTableAggregation,
|
||||||
type_,
|
type_,
|
||||||
column_);
|
column_);
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////
|
||||||
|
OB_SERIALIZE_MEMBER(ObTableMoveReplicaInfo,
|
||||||
|
table_id_,
|
||||||
|
schema_version_,
|
||||||
|
tablet_id_,
|
||||||
|
server_,
|
||||||
|
role_,
|
||||||
|
replica_type_,
|
||||||
|
part_renew_time_,
|
||||||
|
reserved_);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
OB_SERIALIZE_MEMBER(ObTableMoveResult,
|
||||||
|
replica_info_,
|
||||||
|
reserved_);
|
||||||
@ -21,12 +21,14 @@
|
|||||||
#include "lib/container/ob_se_array.h"
|
#include "lib/container/ob_se_array.h"
|
||||||
#include "lib/hash/ob_hashmap.h"
|
#include "lib/hash/ob_hashmap.h"
|
||||||
#include "lib/list/ob_dlist.h"
|
#include "lib/list/ob_dlist.h"
|
||||||
|
#include "lib/net/ob_addr.h"
|
||||||
#include "common/ob_common_types.h"
|
#include "common/ob_common_types.h"
|
||||||
#include "common/ob_range.h"
|
#include "common/ob_range.h"
|
||||||
#include "rpc/obrpc/ob_poc_rpc_server.h"
|
#include "rpc/obrpc/ob_poc_rpc_server.h"
|
||||||
|
|
||||||
#include "share/table/ob_table_ttl_common.h"
|
#include "share/table/ob_table_ttl_common.h"
|
||||||
#include "common/rowkey/ob_rowkey.h"
|
#include "common/rowkey/ob_rowkey.h"
|
||||||
|
#include "common/ob_role.h"
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
namespace common
|
namespace common
|
||||||
@ -957,6 +959,59 @@ public:
|
|||||||
common::ObString end_rowkey_;
|
common::ObString end_rowkey_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct ObTableMoveReplicaInfo final
|
||||||
|
{
|
||||||
|
OB_UNIS_VERSION(1);
|
||||||
|
public:
|
||||||
|
ObTableMoveReplicaInfo()
|
||||||
|
: table_id_(common::OB_INVALID_ID),
|
||||||
|
schema_version_(common::OB_INVALID_VERSION),
|
||||||
|
tablet_id_(common::ObTabletID::INVALID_TABLET_ID),
|
||||||
|
role_(common::ObRole::INVALID_ROLE),
|
||||||
|
replica_type_(common::ObReplicaType::REPLICA_TYPE_MAX),
|
||||||
|
part_renew_time_(0),
|
||||||
|
reserved_(0)
|
||||||
|
{}
|
||||||
|
virtual ~ObTableMoveReplicaInfo() {}
|
||||||
|
TO_STRING_KV(K_(table_id),
|
||||||
|
K_(schema_version),
|
||||||
|
K_(part_renew_time),
|
||||||
|
K_(tablet_id),
|
||||||
|
K_(server),
|
||||||
|
K_(role),
|
||||||
|
K_(replica_type),
|
||||||
|
K_(reserved));
|
||||||
|
OB_INLINE void set_table_id(const uint64_t table_id) { table_id_ = table_id; }
|
||||||
|
OB_INLINE void set_schema_version(const uint64_t schema_version) { schema_version_ = schema_version; }
|
||||||
|
OB_INLINE void set_tablet_id(const common::ObTabletID &tablet_id) { tablet_id_ = tablet_id; }
|
||||||
|
public:
|
||||||
|
uint64_t table_id_;
|
||||||
|
uint64_t schema_version_;
|
||||||
|
common::ObTabletID tablet_id_;
|
||||||
|
common::ObAddr server_;
|
||||||
|
common::ObRole role_;
|
||||||
|
common::ObReplicaType replica_type_;
|
||||||
|
int64_t part_renew_time_;
|
||||||
|
uint64_t reserved_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class ObTableMoveResult final
|
||||||
|
{
|
||||||
|
OB_UNIS_VERSION(1);
|
||||||
|
public:
|
||||||
|
ObTableMoveResult()
|
||||||
|
: reserved_(0)
|
||||||
|
{}
|
||||||
|
virtual ~ObTableMoveResult() {}
|
||||||
|
TO_STRING_KV(K_(replica_info),
|
||||||
|
K_(reserved));
|
||||||
|
|
||||||
|
OB_INLINE ObTableMoveReplicaInfo& get_replica_info() { return replica_info_; }
|
||||||
|
private:
|
||||||
|
ObTableMoveReplicaInfo replica_info_;
|
||||||
|
uint64_t reserved_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
} // end namespace table
|
} // end namespace table
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
|||||||
Reference in New Issue
Block a user