transfer errsim module

This commit is contained in:
godyangfight
2023-08-15 11:44:33 +00:00
committed by ob-robot
parent f10a34e1d2
commit b5fcf8a442
45 changed files with 1222 additions and 35 deletions

View File

@ -59,4 +59,12 @@ ob_set_subtarget(oblib_common storage
storage/ob_sequence.cpp
)
if (OB_ERRSIM)
ob_set_subtarget(oblib_common errsim_module
errsim_module/ob_errsim_module_type.cpp
errsim_module/ob_tenant_errsim_event.cpp
errsim_module/ob_errsim_module_interface.cpp
)
endif()
ob_lib_add_target(oblib_common)

View File

@ -0,0 +1,61 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX COMMON
#include "ob_errsim_module_interface.h"
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include "lib/ob_define.h"
using namespace oceanbase::common;
namespace oceanbase {
namespace common {
int __attribute__((weak)) build_tenant_errsim_moulde(
const uint64_t tenant_id,
const int64_t config_version,
const common::ObArray<ObFixedLengthString<ObErrsimModuleTypeHelper::MAX_TYPE_NAME_LENGTH>> &module_array,
const int64_t percentage)
{
int ret = OB_SUCCESS;
UNUSED(tenant_id);
UNUSED(config_version);
UNUSED(module_array);
UNUSED(percentage);
return ret;
}
bool __attribute__((weak)) is_errsim_module(
const uint64_t tenant_id,
const ObErrsimModuleType::TYPE &type)
{
bool b_ret = false;
UNUSED(tenant_id);
UNUSED(type);
return b_ret;
}
int __attribute__((weak)) add_tenant_errsim_event(
const uint64_t tenant_id,
const ObTenantErrsimEvent &event)
{
int ret = OB_SUCCESS;
UNUSED(tenant_id);
UNUSED(event);
return ret;
}
} // common
} // oceanbase

View File

@ -0,0 +1,43 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_COMMON_ERRSIM_MODULE_OB_ERRSIM_MODULE_INTERFACE_H_
#define OCEANBASE_COMMON_ERRSIM_MODULE_OB_ERRSIM_MODULE_INTERFACE_H_
#include <stdint.h>
#include "lib/utility/ob_macro_utils.h"
#include "lib/utility/utility.h"
#include "ob_errsim_module_type.h"
#include "lib/container/ob_array.h"
#include "ob_tenant_errsim_event.h"
namespace oceanbase
{
namespace common
{
int build_tenant_errsim_moulde(
const uint64_t tenant_id,
const int64_t config_version,
const common::ObArray<ObFixedLengthString<ObErrsimModuleTypeHelper::MAX_TYPE_NAME_LENGTH>> &module_array,
const int64_t percentage);
bool is_errsim_module(
const uint64_t tenant_id,
const ObErrsimModuleType::TYPE &type);
int add_tenant_errsim_event(
const uint64_t tenant_id,
const ObTenantErrsimEvent &event);
} // namespace common
} // namespace oceanbase
#endif // OCEANBASE_COMMON_ERRSIM_MODULE_OB_ERRSIM_MODULE_INTERFACE_H_

View File

@ -0,0 +1,106 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX COMMON
#include "ob_errsim_module_type.h"
namespace oceanbase
{
using namespace lib;
namespace common
{
static const char *OB_ERRSIM_MODULE_TYPES[] = {
"NONE",
"ALL",
"MIGRATION",
"TRANSFER",
};
void ObErrsimModuleType::reset()
{
type_ = ObErrsimModuleType::ERRSIM_MODULE_MAX;
}
bool ObErrsimModuleType::is_valid() const
{
return ObErrsimModuleTypeHelper::is_valid(type_);
}
const char *ObErrsimModuleType::get_str()
{
return ObErrsimModuleTypeHelper::get_str(type_);
}
bool ObErrsimModuleType::operator == (const ObErrsimModuleType &other) const
{
bool is_same = true;
if (this == &other) {
// same
} else {
is_same = type_ == other.type_;
}
return is_same;
}
int64_t ObErrsimModuleType::hash() const
{
int64_t hash_value = 0;
hash_value = common::murmurhash(
&type_, sizeof(type_), hash_value);
return hash_value;
}
int ObErrsimModuleType::hash(uint64_t &hash_val) const
{
hash_val = hash();
return OB_SUCCESS;
}
OB_SERIALIZE_MEMBER(ObErrsimModuleType, type_);
const char *ObErrsimModuleTypeHelper::get_str(const ObErrsimModuleType::TYPE &type)
{
const char *str = nullptr;
if (!is_valid(type)) {
str = "UNKNOWN";
} else {
str = OB_ERRSIM_MODULE_TYPES[type];
}
return str;
}
ObErrsimModuleType::TYPE ObErrsimModuleTypeHelper::get_type(const char *type_str)
{
ObErrsimModuleType::TYPE type = ObErrsimModuleType::ERRSIM_MODULE_MAX;
const int64_t count = ARRAYSIZEOF(OB_ERRSIM_MODULE_TYPES);
STATIC_ASSERT(static_cast<int64_t>(ObErrsimModuleType::ERRSIM_MODULE_MAX) == count, "type count mismatch");
for (int64_t i = 0; i < count; ++i) {
if (0 == strcmp(type_str, OB_ERRSIM_MODULE_TYPES[i])) {
type = static_cast<ObErrsimModuleType::TYPE>(i);
break;
}
}
return type;
}
bool ObErrsimModuleTypeHelper::is_valid(const ObErrsimModuleType::TYPE &type)
{
return type >= ObErrsimModuleType::ERRSIM_MODULE_NONE
&& type < ObErrsimModuleType::ERRSIM_MODULE_MAX;
}
} //common
} //oceanbase

View File

@ -0,0 +1,60 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef SRC_SHARE_ERRSIM_MODULE_OB_ERRSIM_MODULE_TYPE_H_
#define SRC_SHARE_ERRSIM_MODULE_OB_ERRSIM_MODULE_TYPE_H_
#include "lib/ob_define.h"
#include "lib/utility/ob_print_utils.h"
namespace oceanbase
{
namespace common
{
struct ObErrsimModuleType final
{
OB_UNIS_VERSION(1);
public:
enum TYPE
{
ERRSIM_MODULE_NONE = 0,
ERRSIM_MODULE_ALL = 1,
ERRSIM_MODULE_MIGRATION = 2,
ERRSIM_MODULE_TRANSFER = 3,
ERRSIM_MODULE_MAX
};
ObErrsimModuleType() : type_(ERRSIM_MODULE_NONE) {}
explicit ObErrsimModuleType(const ObErrsimModuleType::TYPE &type) : type_(type) {}
~ObErrsimModuleType() = default;
void reset();
bool is_valid() const;
const char *get_str();
bool operator == (const ObErrsimModuleType &other) const;
int hash(uint64_t &hash_val) const;
int64_t hash() const;
TO_STRING_KV(K_(type));
TYPE type_;
};
struct ObErrsimModuleTypeHelper final
{
static const char *get_str(const ObErrsimModuleType::TYPE &type);
static ObErrsimModuleType::TYPE get_type(const char *type_str);
static bool is_valid(const ObErrsimModuleType::TYPE &type);
static const int64_t MAX_TYPE_NAME_LENGTH = 16;
};
} // namespace common
} // namespace oceanbase
#endif

View File

@ -0,0 +1,58 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX COMMON
#include "ob_tenant_errsim_event.h"
namespace oceanbase
{
using namespace lib;
namespace common
{
ObTenantErrsimEvent::ObTenantErrsimEvent()
: timestamp_(0),
type_(),
errsim_error_(OB_SUCCESS),
backtrace_()
{
}
void ObTenantErrsimEvent::reset()
{
timestamp_ = 0;
type_.reset();
errsim_error_ = OB_SUCCESS;
backtrace_.reset();
}
bool ObTenantErrsimEvent::is_valid() const
{
return timestamp_ > 0 && type_.is_valid() && !backtrace_.is_empty();
}
void ObTenantErrsimEvent::build_event(const int32_t result)
{
timestamp_ = ObTimeUtil::current_time();
errsim_error_ = result;
lbt(backtrace_.ptr(), backtrace_.capacity());
#ifdef ERRSIM
type_ = THIS_WORKER.get_module_type();
#else
ObErrsimModuleType tmp_type(ObErrsimModuleType::ERRSIM_MODULE_NONE);
type_ = tmp_type;
#endif
}
} //common
} //oceanbase

View File

@ -0,0 +1,48 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef SRC_COMMON_ERRSIM_OB_TENANT_ERRSIM_MODULE_MGR_H_
#define SRC_COMMON_ERRSIM_OB_TENANT_ERRSIM_MODULE_MGR_H_
#include "lib/ob_define.h"
#include "lib/utility/ob_print_utils.h"
#include "ob_errsim_module_type.h"
#include "lib/hash/ob_hashset.h"
#include "lib/hash/ob_hashmap.h"
#include "lib/lock/ob_bucket_lock.h"
#include "lib/utility/ob_backtrace.h"
namespace oceanbase
{
namespace common
{
struct ObTenantErrsimEvent final
{
ObTenantErrsimEvent();
~ObTenantErrsimEvent() = default;
void reset();
bool is_valid() const;
void build_event(const int32_t result);
typedef ObFixedLengthString<LBT_BUFFER_LENGTH> Lbt;
TO_STRING_KV(K_(timestamp), K_(type), K_(errsim_error), K_(backtrace));
int64_t timestamp_;
ObErrsimModuleType type_;
int32_t errsim_error_;
Lbt backtrace_;
};
} // namespace common
} // namespace oceanbase
#endif

View File

@ -23,6 +23,7 @@
#include "lib/oblog/ob_log.h"
#include "common/ob_smart_var.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "common/errsim_module/ob_errsim_module_interface.h"
using namespace oceanbase::lib;
using namespace oceanbase::common;
@ -401,16 +402,33 @@ void* ObTenantCtxAllocator::common_alloc(const int64_t size, const ObMemAttr &at
{
SANITY_DISABLE_CHECK_RANGE(); // prevent sanity_check_range
void *ret = nullptr;
AObject *obj = nullptr;
bool sample_allowed = false;
bool is_errsim = false;
if (!attr.label_.is_valid()) {
LIB_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "OB_MOD_DO_NOT_USE_ME ALLOC", K(size));
}
bool sample_allowed = ObMallocSampleLimiter::malloc_sample_allowed(size, attr);
const int64_t alloc_size = sample_allowed ? (size + AOBJECT_BACKTRACE_SIZE) : size;
AObject *obj = allocator.alloc_object(alloc_size, attr);
if (OB_ISNULL(obj) && g_alloc_failed_ctx().need_wash()) {
int64_t total_size = ta.sync_wash();
obj = allocator.alloc_object(alloc_size, attr);
#ifdef ERRSIM
const ObErrsimModuleType type = THIS_WORKER.get_module_type();
if (is_errsim_module(ta.get_tenant_id(), type.type_)) {
//errsim alloc memory failed.
obj = nullptr;
is_errsim = true;
}
#endif
if (OB_UNLIKELY(is_errsim)) {
} else {
sample_allowed = ObMallocSampleLimiter::malloc_sample_allowed(size, attr);
const int64_t alloc_size = sample_allowed ? (size + AOBJECT_BACKTRACE_SIZE) : size;
obj = allocator.alloc_object(alloc_size, attr);
if (OB_ISNULL(obj) && g_alloc_failed_ctx().need_wash()) {
int64_t total_size = ta.sync_wash();
obj = allocator.alloc_object(alloc_size, attr);
}
}
if (NULL != obj) {
obj->on_malloc_sample_ = sample_allowed;
ob_malloc_sample_backtrace(obj, size);
@ -425,7 +443,7 @@ void* ObTenantCtxAllocator::common_alloc(const int64_t size, const ObMemAttr &at
int level = ObFreeLogPrinter::get_level();
ObFreeLogPrinter::get_instance().enable_free_log(attr.tenant_id_,
attr.ctx_id_, level);
const char *msg = alloc_failed_msg();
const char *msg = is_errsim ? "[ERRSIM] errsim inject memory error" : alloc_failed_msg();
LOG_DBA_WARN(OB_ALLOCATE_MEMORY_FAILED, "[OOPS]", "alloc failed reason", KCSTRING(msg));
_OB_LOG_RET(WARN, OB_ALLOCATE_MEMORY_FAILED, "oops, alloc failed, tenant_id=%ld, ctx_id=%ld, ctx_name=%s, ctx_hold=%ld, "
"ctx_limit=%ld, tenant_hold=%ld, tenant_limit=%ld",
@ -448,7 +466,10 @@ void* ObTenantCtxAllocator::common_realloc(const void *ptr, const int64_t size,
if (!attr.label_.is_valid()) {
LIB_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "OB_MOD_DO_NOT_USE_ME REALLOC", K(size));
}
AObject *obj = NULL;
bool sample_allowed = false;
bool is_errsim = false;
if (NULL != ptr) {
obj = reinterpret_cast<AObject*>((char*)ptr - AOBJECT_HEADER_SIZE);
abort_unless(obj->is_valid());
@ -458,13 +479,27 @@ void* ObTenantCtxAllocator::common_realloc(const void *ptr, const int64_t size,
SANITY_POISON(obj->data_, obj->alloc_bytes_);
get_mem_leak_checker().on_free(*obj);
}
bool sample_allowed = ObMallocSampleLimiter::malloc_sample_allowed(size, attr);
const int64_t alloc_size = sample_allowed ? (size + AOBJECT_BACKTRACE_SIZE) : size;
obj = allocator.realloc_object(obj, alloc_size, attr);
if (OB_ISNULL(obj) && g_alloc_failed_ctx().need_wash()) {
int64_t total_size = ta.sync_wash();
obj = allocator.realloc_object(obj, alloc_size, attr);
#ifdef ERRSIM
const ObErrsimModuleType type = THIS_WORKER.get_module_type();
if (is_errsim_module(ta.get_tenant_id(), type.type_)) {
//errsim alloc memory failed.
obj = nullptr;
is_errsim = true;
}
#endif
if (OB_UNLIKELY(is_errsim)) {
} else {
sample_allowed = ObMallocSampleLimiter::malloc_sample_allowed(size, attr);
const int64_t alloc_size = sample_allowed ? (size + AOBJECT_BACKTRACE_SIZE) : size;
obj = allocator.realloc_object(obj, alloc_size, attr);
if(OB_ISNULL(obj) && g_alloc_failed_ctx().need_wash()) {
int64_t total_size = ta.sync_wash();
obj = allocator.realloc_object(obj, alloc_size, attr);
}
}
if (obj != NULL) {
obj->on_malloc_sample_ = sample_allowed;
ob_malloc_sample_backtrace(obj, size);
@ -478,7 +513,7 @@ void* ObTenantCtxAllocator::common_realloc(const void *ptr, const int64_t size,
int level = ObFreeLogPrinter::get_level();
ObFreeLogPrinter::get_instance().enable_free_log(attr.tenant_id_,
attr.ctx_id_, level);
const char *msg = alloc_failed_msg();
const char *msg = is_errsim ? "[ERRSIM] errsim inject memory error" : alloc_failed_msg();
LOG_DBA_WARN(OB_ALLOCATE_MEMORY_FAILED, "[OOPS]", "alloc failed reason", KCSTRING(msg));
_OB_LOG_RET(WARN, OB_ALLOCATE_MEMORY_FAILED, "oops, alloc failed, tenant_id=%ld, ctx_id=%ld, ctx_name=%s, ctx_hold=%ld, "
"ctx_limit=%ld, tenant_hold=%ld, tenant_limit=%ld",

View File

@ -22,7 +22,12 @@
using namespace oceanbase::common;
using namespace oceanbase::lib;
OB_SERIALIZE_MEMBER(ObRuntimeContext, compat_mode_);
#ifdef ERRSIM
OB_SERIALIZE_MEMBER(ObRuntimeContext, compat_mode_, module_type_);
#else
OB_SERIALIZE_MEMBER(ObRuntimeContext, compat_mode_);
#endif
namespace oceanbase {
namespace lib {

View File

@ -17,6 +17,7 @@
#include "lib/ob_define.h"
#include "lib/rc/context.h"
#include "lib/runtime.h"
#include "common/errsim_module/ob_errsim_module_type.h"
namespace oceanbase
{
@ -115,6 +116,11 @@ public:
static Worker& self();
static void set_worker_to_thread_local(Worker *worker);
#ifdef ERRSIM
static void set_module_type(const ObErrsimModuleType &module_type);
static ObErrsimModuleType get_module_type();
#endif
public:
static __thread Worker *self_;
@ -213,6 +219,29 @@ private:
Worker::CompatMode last_compat_mode_;
};
#ifdef ERRSIM
//set current errsim module in code snippet and set last errsim module when guard destructor
class ErrsimModuleGuard final
{
public:
ErrsimModuleGuard(ObErrsimModuleType::TYPE type)
{
last_type_ = THIS_WORKER.get_module_type().type_;
ObErrsimModuleType curr_type(type);
THIS_WORKER.set_module_type(curr_type);
}
~ErrsimModuleGuard()
{
ObErrsimModuleType curr_type(last_type_);
THIS_WORKER.set_module_type(curr_type);
}
private:
ObErrsimModuleType::TYPE last_type_;
};
#endif
// used to check compatibility mode.
class ObRuntimeContext
{
@ -222,6 +251,9 @@ public:
: compat_mode_(Worker::CompatMode::MYSQL)
{}
Worker::CompatMode compat_mode_;
#ifdef ERRSIM
ObErrsimModuleType module_type_;
#endif
};
inline ObRuntimeContext &get_ob_runtime_context()
@ -259,6 +291,19 @@ OB_INLINE Worker::CompatMode Worker::get_compatibility_mode()
return get_compat_mode();
}
#ifdef ERRSIM
OB_INLINE void Worker::set_module_type(const ObErrsimModuleType &module_type)
{
get_ob_runtime_context().module_type_ = module_type;
}
OB_INLINE ObErrsimModuleType Worker::get_module_type()
{
return get_ob_runtime_context().module_type_;
}
#endif
} // end of namespace lib
} // end of namespace oceanbase

View File

@ -120,6 +120,10 @@ int ObReqQueue::process_task(void *task)
} else {
ObCurTraceId::set(trace_id);
}
#ifdef ERRSIM
THIS_WORKER.set_module_type(packet.get_module_type());
#endif
// Do not set thread local log level while log level upgrading (OB_LOGGER.is_info_as_wdiag)
if (OB_LOGGER.is_info_as_wdiag()) {
ObThreadLogLevelUtils::clear();

View File

@ -129,7 +129,12 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
ucb_->on_invalid();
RPC_LOG(WARN, "rpc_decode_ob_packet fail", K(ret));
} else if (OB_FALSE_IT(ObCurTraceId::set(ret_pkt->get_trace_id()))) {
} else if (OB_FAIL(ucb_->decode(ret_pkt))) {
}
#ifdef ERRSIM
else if (OB_FALSE_IT(THIS_WORKER.set_module_type(ret_pkt->get_module_type()))) {
}
#endif
else if (OB_FAIL(ucb_->decode(ret_pkt))) {
ucb_->on_invalid();
RPC_LOG(WARN, "ucb.decode fail", K(ret));
} else {

View File

@ -66,6 +66,11 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
timeguard.click();
ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size);
void *temp = NULL;
#ifdef ERRSIM
THIS_WORKER.set_module_type(tmp_pkt.get_module_type());
#endif
if (OB_ISNULL(pool)) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_LOG(WARN, "create memory pool failed", K(tenant_id), K(pcode_label));
@ -154,6 +159,9 @@ int ObPocServerHandleContext::resp_error(uint64_t resp_id, int err_code, const c
res_pkt.set_dst_cluster_id(recv_pkt.get_src_cluster_id());
int64_t receive_ts = ObTimeUtility::current_time();
res_pkt.set_request_arrival_time(receive_ts);
#ifdef ERRSIM
res_pkt.set_module_type(recv_pkt.get_module_type());
#endif
}
}
res_pkt.set_resp();

View File

@ -85,6 +85,11 @@ int ObRpcPacketHeader::serialize(char* buf, const int64_t buf_len, int64_t& pos)
LOG_WARN("Encode error", K(ret), KP(buf), K(buf_len), K(pos));
} else {
//do nothing
#ifdef ERRSIM
if (OB_FAIL(encode_i64(buf, buf_len, pos, static_cast<int64_t>(module_type_.type_)))) {
LOG_WARN("Encode error", K(ret), KP(buf), K(buf_len), K(pos));
}
#endif
}
} else {
ret = OB_BUF_NOT_ENOUGH;
@ -165,6 +170,15 @@ int ObRpcPacketHeader::deserialize(const char* buf, const int64_t data_len, int6
} else if (hlen_ > pos && OB_FAIL(decode_i64(buf, hlen_, pos, reinterpret_cast<int64_t*>(&cluster_name_hash_)))) {
LOG_WARN("Decode error", K(ret), KP(buf), K(hlen_), K(pos));
} else {
#ifdef ERRSIM
int64_t type = 0;
if (hlen_ > pos && OB_FAIL(decode_i64(buf, hlen_, pos, &type))) {
LOG_WARN("Decode error", K(ret), KP(buf), K(hlen_), K(pos));
} else {
module_type_.type_ = static_cast<ObErrsimModuleType::TYPE>(type);
}
#endif
}
ObSequence::update_max_seq_no(seq_no_);
LOG_DEBUG("rpc receive seq_no ", K_(seq_no), K(ObSequence::get_max_seq_no()));

View File

@ -20,6 +20,7 @@
#include "lib/compress/ob_compressor_pool.h"
#include "rpc/obrpc/ob_rpc_time.h"
#include "rpc/ob_packet.h"
#include "common/errsim_module/ob_errsim_module_type.h"
namespace oceanbase
{
@ -138,7 +139,11 @@ private:
class ObRpcPacketHeader
{
public:
#ifdef ERRSIM
static const uint8_t HEADER_SIZE = 144; // add 8 bit for errsim module
#else
static const uint8_t HEADER_SIZE = 136; // 112 -> 128: add 16 bytes for trace_id ipv6 extension. (Note yanyuan.cxf: but you should never change it)
#endif
static const uint16_t RESP_FLAG = 1 << 15;
static const uint16_t STREAM_FLAG = 1 << 14;
static const uint16_t STREAM_LAST_FLAG = 1 << 13;
@ -173,6 +178,10 @@ public:
int32_t group_id_;
uint64_t cluster_name_hash_;
#ifdef ERRSIM
ObErrsimModuleType module_type_;
#endif
int serialize(char* buf, const int64_t buf_len, int64_t& pos);
int deserialize(const char* buf, const int64_t data_len, int64_t& pos);
@ -335,6 +344,11 @@ public:
}
static uint64_t get_self_cluster_name_hash();
#ifdef ERRSIM
inline void set_module_type(const ObErrsimModuleType &module_type);
inline const ObErrsimModuleType get_module_type() const;
#endif
private:
ObRpcPacketHeader hdr_;
const char *cdata_;
@ -855,6 +869,18 @@ inline ObRpcCheckSumCheckLevel get_rpc_checksum_check_level_from_string(
return ret_type;
}
#ifdef ERRSIM
void ObRpcPacket::set_module_type(const ObErrsimModuleType &module_type)
{
hdr_.module_type_ = module_type;
}
const ObErrsimModuleType ObRpcPacket::get_module_type() const
{
return hdr_.module_type_;
}
#endif
} // end of namespace rpc
} // end of namespace oceanbase

View File

@ -294,6 +294,10 @@ int ObRpcProcessorBase::do_response(const Response &rsp)
// The cluster_id of the response must be the src_cluster_id of the request
packet->set_dst_cluster_id(rpc_pkt_->get_src_cluster_id());
#ifdef ERRSIM
packet->set_module_type(THIS_WORKER.get_module_type());
#endif
packet->set_request_arrival_time(req_->get_request_arrival_time());
packet->set_arrival_push_diff(req_->get_arrival_push_diff());
packet->set_push_pop_diff(req_->get_push_pop_diff());

View File

@ -153,6 +153,10 @@ int ObRpcProxy::init_pkt(
pkt->set_trace_id(common::ObCurTraceId::get());
}
#ifdef ERRSIM
pkt->set_module_type(THIS_WORKER.get_module_type());
#endif
if (OB_SUCC(ret)) {
pkt->set_pcode(pcode);
//Assign a channel id to this new packet

View File

@ -89,6 +89,9 @@ int ObRpcRequest::do_flush_buffer(common::ObDataBuffer *buffer, const ObRpcPacke
res_packet->set_session_id(sess_id);
res_packet->set_trace_id(common::ObCurTraceId::get());
res_packet->set_resp();
#ifdef ERRSIM
res_packet->set_module_type(THIS_WORKER.get_module_type());
#endif
res_packet->calc_checksum();
}
RPC_REQ_OP.response_result(this, res_packet);