fixed invalid PalfEnvKey when handle message on arbserver

This commit is contained in:
simonjoylet
2023-02-07 20:27:24 +08:00
committed by ob-robot
parent a00463ed37
commit 65e0d25980
9 changed files with 28 additions and 12 deletions

View File

@ -47,6 +47,13 @@ Handle::Handle()
int ObRpcProxy::init(const ObReqTransport *transport, int ObRpcProxy::init(const ObReqTransport *transport,
const oceanbase::common::ObAddr &dst) const oceanbase::common::ObAddr &dst)
{
return init(transport, ObRpcNetHandler::CLUSTER_ID, dst);
}
int ObRpcProxy::init(const ObReqTransport *transport,
const int64_t src_cluster_id,
const oceanbase::common::ObAddr &dst)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -58,6 +65,7 @@ int ObRpcProxy::init(const ObReqTransport *transport,
LOG_WARN("invalid argument", K(ret), KP(transport)); LOG_WARN("invalid argument", K(ret), KP(transport));
} else { } else {
transport_ = transport; transport_ = transport;
src_cluster_id_ = src_cluster_id;
dst_ = dst; dst_ = dst;
init_ = true; init_ = true;
} }
@ -250,7 +258,7 @@ int ObRpcProxy::init_pkt(
pkt->set_timestamp(ObTimeUtility::current_time()); pkt->set_timestamp(ObTimeUtility::current_time());
pkt->set_dst_cluster_id(dst_cluster_id_); pkt->set_dst_cluster_id(dst_cluster_id_);
// For request, src_cluster_id must be the cluster_id of this cluster, directly hard-coded // For request, src_cluster_id must be the cluster_id of this cluster, directly hard-coded
pkt->set_src_cluster_id(ObRpcNetHandler::CLUSTER_ID); pkt->set_src_cluster_id(src_cluster_id_);
pkt->set_unis_version(opts.unis_version_); pkt->set_unis_version(opts.unis_version_);
pkt->set_group_id((0 != get_group_id()) ? get_group_id() : this_worker().get_group_id()); pkt->set_group_id((0 != get_group_id()) ? get_group_id() : this_worker().get_group_id());
if (need_increment_request_level(pcode)) { if (need_increment_request_level(pcode)) {

View File

@ -117,12 +117,16 @@ public:
tenant_id_(common::OB_SYS_TENANT_ID), group_id_(0), tenant_id_(common::OB_SYS_TENANT_ID), group_id_(0),
priv_tenant_id_(common::OB_INVALID_TENANT_ID), priv_tenant_id_(common::OB_INVALID_TENANT_ID),
max_process_handler_time_(0), compressor_type_(common::INVALID_COMPRESSOR), max_process_handler_time_(0), compressor_type_(common::INVALID_COMPRESSOR),
src_cluster_id_(common::OB_INVALID_CLUSTER_ID),
dst_cluster_id_(common::OB_INVALID_CLUSTER_ID), init_(false), dst_cluster_id_(common::OB_INVALID_CLUSTER_ID), init_(false),
active_(true), is_trace_time_(false), do_ratelimit_(false), is_bg_flow_(0), rcode_() {} active_(true), is_trace_time_(false), do_ratelimit_(false), is_bg_flow_(0), rcode_() {}
virtual ~ObRpcProxy() = default; virtual ~ObRpcProxy() = default;
int init(const rpc::frame::ObReqTransport *transport, int init(const rpc::frame::ObReqTransport *transport,
const common::ObAddr &dst = common::ObAddr()); const common::ObAddr &dst = common::ObAddr());
int init(const rpc::frame::ObReqTransport *transport,
const int64_t src_cluster_id,
const common::ObAddr &dst = common::ObAddr());
void destroy() { init_ = false; } void destroy() { init_ = false; }
bool is_inited() const { return init_; } bool is_inited() const { return init_; }
void set_timeout(int64_t timeout) { timeout_ = timeout; } void set_timeout(int64_t timeout) { timeout_ = timeout; }
@ -253,6 +257,7 @@ protected:
uint64_t priv_tenant_id_; uint64_t priv_tenant_id_;
uint32_t max_process_handler_time_; uint32_t max_process_handler_time_;
common::ObCompressorType compressor_type_; common::ObCompressorType compressor_type_;
int64_t src_cluster_id_;
int64_t dst_cluster_id_; int64_t dst_cluster_id_;
bool init_; bool init_;
bool active_; bool active_;

View File

@ -11,9 +11,11 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <signal.h> #include <signal.h>
#define private public #define private public
#define protected public
#include "env/ob_simple_log_cluster_env.h" #include "env/ob_simple_log_cluster_env.h"
#undef private
#include "rootserver/ob_rs_async_rpc_proxy.h" #include "rootserver/ob_rs_async_rpc_proxy.h"
#undef private
#undef protected
const std::string TEST_NAME = "mutil_arb_server"; const std::string TEST_NAME = "mutil_arb_server";
@ -289,7 +291,8 @@ TEST_F(TestObSimpleMutilArbServer, test_gc)
for (auto cluster_id : gc_cluster_ids) { for (auto cluster_id : gc_cluster_ids) {
ObArbGCNotifyArg arg; ObArbGCNotifyArg arg;
ObArbGCNotifyResult result; ObArbGCNotifyResult result;
ObRpcNetHandler::CLUSTER_ID = cluster_id; //ObRpcNetHandler::CLUSTER_ID = cluster_id;
rpc_proxy.src_cluster_id_ = cluster_id;
array.set_max_tenant_id(max_tenant_id); array.set_max_tenant_id(max_tenant_id);
EXPECT_EQ(OB_SUCCESS, arg.init(epoch, array)); EXPECT_EQ(OB_SUCCESS, arg.init(epoch, array));
EXPECT_EQ(OB_SUCCESS, rpc_proxy.to(dst_addr).arb_gc_notify(arg, result)); EXPECT_EQ(OB_SUCCESS, rpc_proxy.to(dst_addr).arb_gc_notify(arg, result));

View File

@ -35,13 +35,14 @@ LogRpc::~LogRpc()
} }
int LogRpc::init(const ObAddr &self, int LogRpc::init(const ObAddr &self,
const int64_t cluster_id,
const int64_t tenant_id, const int64_t tenant_id,
rpc::frame::ObReqTransport *transport) rpc::frame::ObReqTransport *transport)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (IS_INIT) { if (IS_INIT) {
ret = OB_INIT_TWICE; ret = OB_INIT_TWICE;
} else if (OB_FAIL(rpc_proxy_.init(transport))) { } else if (OB_FAIL(rpc_proxy_.init(transport, cluster_id))) {
PALF_LOG(ERROR, "LogRpcProxyV2 init failed", K(ret)); PALF_LOG(ERROR, "LogRpcProxyV2 init failed", K(ret));
} else { } else {
self_ = self; self_ = self;

View File

@ -70,6 +70,7 @@ public:
LogRpc(); LogRpc();
~LogRpc(); ~LogRpc();
int init(const common::ObAddr &self, int init(const common::ObAddr &self,
const int64_t cluster_id,
const int64_t tenant_id, const int64_t tenant_id,
rpc::frame::ObReqTransport *transport); rpc::frame::ObReqTransport *transport);
void destroy(); void destroy();

View File

@ -16,6 +16,7 @@
#include "palf_env_impl.h" #include "palf_env_impl.h"
#include "palf_handle_impl.h" #include "palf_handle_impl.h"
#include "rpc/frame/ob_req_transport.h" #include "rpc/frame/ob_req_transport.h"
#include "rpc/obrpc/ob_rpc_net_handler.h"
#include "share/allocator/ob_tenant_mutil_allocator.h" #include "share/allocator/ob_tenant_mutil_allocator.h"
#include "palf_handle.h" #include "palf_handle.h"
#include "palf_options.h" #include "palf_options.h"
@ -52,7 +53,8 @@ int PalfEnv::create_palf_env(
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_FAIL(FileDirectoryUtils::delete_tmp_file_or_directory_at(base_dir))) { } else if (OB_FAIL(FileDirectoryUtils::delete_tmp_file_or_directory_at(base_dir))) {
CLOG_LOG(WARN, "delete_tmp_file_or_directory_at failed", K(ret), K(base_dir)); CLOG_LOG(WARN, "delete_tmp_file_or_directory_at failed", K(ret), K(base_dir));
} else if (OB_FAIL(palf_env->palf_env_impl_.init(options, base_dir, self, MTL_ID(), transport, } else if (OB_FAIL(palf_env->palf_env_impl_.init(options, base_dir, self, obrpc::ObRpcNetHandler::CLUSTER_ID,
MTL_ID(), transport,
log_alloc_mgr, log_block_pool))) { log_alloc_mgr, log_block_pool))) {
PALF_LOG(WARN, "PalfEnvImpl init failed", K(ret), K(base_dir)); PALF_LOG(WARN, "PalfEnvImpl init failed", K(ret), K(base_dir));
} else if (OB_FAIL(palf_env->start_())) { } else if (OB_FAIL(palf_env->start_())) {

View File

@ -162,6 +162,7 @@ PalfEnvImpl::~PalfEnvImpl()
int PalfEnvImpl::init( int PalfEnvImpl::init(
const PalfOptions &options, const PalfOptions &options,
const char *base_dir, const ObAddr &self, const char *base_dir, const ObAddr &self,
const int64_t cluster_id,
const int64_t tenant_id, const int64_t tenant_id,
rpc::frame::ObReqTransport *transport, rpc::frame::ObReqTransport *transport,
common::ObILogAllocator *log_alloc_mgr, common::ObILogAllocator *log_alloc_mgr,
@ -185,7 +186,7 @@ int PalfEnvImpl::init(
KP(log_alloc_mgr), KP(log_block_pool)); KP(log_alloc_mgr), KP(log_block_pool));
} else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) { } else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) {
PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret)); PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret));
} else if (OB_FAIL(log_rpc_.init(self, tenant_id, transport))) { } else if (OB_FAIL(log_rpc_.init(self, cluster_id, tenant_id, transport))) {
PALF_LOG(ERROR, "LogRpc init failed", K(ret)); PALF_LOG(ERROR, "LogRpc init failed", K(ret));
} else if (OB_FAIL(cb_thread_pool_.init(io_cb_num, this))) { } else if (OB_FAIL(cb_thread_pool_.init(io_cb_num, this))) {
PALF_LOG(ERROR, "LogIOTaskThreadPool init failed", K(ret)); PALF_LOG(ERROR, "LogIOTaskThreadPool init failed", K(ret));

View File

@ -176,6 +176,7 @@ public:
int init(const PalfOptions &options, int init(const PalfOptions &options,
const char *base_dir, const char *base_dir,
const common::ObAddr &self, const common::ObAddr &self,
const int64_t cluster_id,
const int64_t tenant_id, const int64_t tenant_id,
rpc::frame::ObReqTransport *transport, rpc::frame::ObReqTransport *transport,
common::ObILogAllocator *alloc_mgr, common::ObILogAllocator *alloc_mgr,

View File

@ -158,12 +158,6 @@ void ObBlockMetaTree::destroy()
macro_blocks_.reset(); macro_blocks_.reset();
block_tree_.destroy(); block_tree_.destroy();
data_desc_.reset(); data_desc_.reset();
for (int64_t i = 0; i < sorted_rowkeys_.count(); ++i) {
const ObDataMacroBlockMeta *cur_meta = sorted_rowkeys_.at(i).block_meta_;
if (OB_NOT_NULL(cur_meta)) {
cur_meta->~ObDataMacroBlockMeta();
}
}
sorted_rowkeys_.reset(); sorted_rowkeys_.reset();
tree_allocator_.reset(); tree_allocator_.reset();
arena_.reset(); arena_.reset();