add standby rpc compression

This commit is contained in:
zhjc1124
2023-05-26 10:11:15 +00:00
committed by ob-robot
parent c5cf21da3e
commit 9077dc1f23
17 changed files with 219 additions and 26 deletions

View File

@ -24,6 +24,11 @@ namespace oceanbase
namespace obrpc
{
const int easy_head_size = 16;
common::ObCompressorType get_proxy_compressor_type(ObRpcProxy& proxy) {
return proxy.get_compressor_type();
}
int ObSyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
{
if (PNIO_OK != io_err) {

View File

@ -26,7 +26,7 @@ int64_t calc_extra_payload_size();
int fill_extra_payload(ObRpcPacket& pkt, char* buf, int64_t len, int64_t pos);
int init_packet(ObRpcProxy& proxy, ObRpcPacket& pkt, ObRpcPacketCode pcode, const ObRpcOpts &opts,
const bool unneed_response);
common::ObCompressorType get_proxy_compressor_type(ObRpcProxy& proxy);
template <typename T>
int rpc_encode_req(
ObRpcProxy& proxy,
@ -45,7 +45,7 @@ template <typename T>
int ret = common::OB_SUCCESS;
ObRpcPacket pkt;
const int64_t header_sz = pkt.get_header_size();
const int64_t payload_sz = calc_extra_payload_size() + common::serialization::encoded_length(args);
int64_t payload_sz = calc_extra_payload_size() + common::serialization::encoded_length(args);
const int64_t reserve_bytes_for_pnio = 0;
char* header_buf = (char*)pool.alloc(reserve_bytes_for_pnio + header_sz + payload_sz) + reserve_bytes_for_pnio;
char* payload_buf = header_buf + header_sz;
@ -64,6 +64,41 @@ template <typename T>
} else if (OB_FAIL(fill_extra_payload(pkt, payload_buf, payload_sz, pos))) {
RPC_OBRPC_LOG(WARN, "fill extra payload fail", K(ret), K(pos), K(payload_sz));
} else {
const common::ObCompressorType &compressor_type = get_proxy_compressor_type(proxy);
bool need_compressed = ObCompressorPool::get_instance().need_common_compress(compressor_type);
if (need_compressed) {
// compress
int tmp_ret = OB_SUCCESS;
common::ObCompressor *compressor = NULL;
char *compressed_buf = NULL;
int64_t dst_data_size = 0;
int64_t max_overflow_size = 0;
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(compressor_type, compressor))) {
RPC_OBRPC_LOG(WARN, "get_compressor failed", K(ret), K(compressor_type));
} else if (OB_FAIL(compressor->get_max_overflow_size(payload_sz, max_overflow_size))) {
RPC_OBRPC_LOG(WARN, "get_max_overflow_size failed", K(ret), K(payload_sz), K(max_overflow_size));
} else if (NULL == (compressed_buf = static_cast<char *>(
common::ob_malloc(payload_sz + max_overflow_size, common::ObModIds::OB_RPC_PROCESSOR)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_OBRPC_LOG(WARN, "Allocate memory failed", K(ret));
} else if (OB_SUCCESS !=
(tmp_ret = compressor->compress(
payload_buf, payload_sz, compressed_buf, payload_sz + max_overflow_size, dst_data_size))) {
RPC_OBRPC_LOG(WARN, "compress failed", K(tmp_ret));
} else if (dst_data_size >= payload_sz) {
} else {
RPC_OBRPC_LOG(DEBUG, "compress request success", K(compressor_type), K(dst_data_size), K(payload_sz));
// replace buf
pkt.set_compressor_type(compressor_type);
pkt.set_original_len(static_cast<int32_t>(payload_sz));
memcpy(payload_buf, compressed_buf, dst_data_size);
payload_sz = dst_data_size;
}
if (NULL != compressed_buf) {
ob_free(compressed_buf);
compressed_buf = NULL;
}
}
int64_t header_pos = 0;
pkt.set_content(payload_buf, payload_sz);
if (OB_FAIL(init_packet(proxy, pkt, pcode, opts, unneed_resp))) {

View File

@ -138,6 +138,7 @@ public:
uint64_t get_tenant() const { return tenant_id_; }
void set_tenant(uint64_t tenant_id) { tenant_id_ = tenant_id; }
int32_t get_group_id() const { return group_id_; }
common::ObCompressorType get_compressor_type() const { return compressor_type_; }
void set_group_id(int32_t group_id) { group_id_ = group_id; }
void set_priv_tenant(uint64_t tenant_id) { priv_tenant_id_ = tenant_id; }
void set_server(const common::ObAddr &dst) { dst_ = dst; }

View File

@ -296,25 +296,59 @@ int ObRpcProxy::AsyncCB<pcodeStruct>::decode(void *pkt)
}
if (OB_SUCC(ret)) {
ObRpcPacket *rpkt = reinterpret_cast<ObRpcPacket*>(pkt);
const char *buf = rpkt->get_cdata();
const int64_t len = rpkt->get_clen();
int64_t pos = 0;
ObRpcPacket *rpkt = reinterpret_cast<ObRpcPacket*>(pkt);
const char *buf = rpkt->get_cdata();
int64_t len = rpkt->get_clen();
int64_t pos = 0;
UNIS_VERSION_GUARD(rpkt->get_unis_version());
char *uncompressed_buf = NULL;
if (OB_FAIL(rpkt->verify_checksum())) {
RPC_OBRPC_LOG(ERROR, "verify checksum fail", K(*rpkt), K(ret));
} else if (OB_FAIL(rcode_.deserialize(buf, len, pos))) {
RPC_OBRPC_LOG(WARN, "decode result code fail", K(*rpkt), K(ret));
} else if (rcode_.rcode_ != OB_SUCCESS) {
// RPC_OBRPC_LOG(WARN, "execute rpc fail", K_(rcode));
} else if (OB_FAIL(result_.deserialize(buf, len, pos))) {
RPC_OBRPC_LOG(WARN, "decode packet fail", K(ret));
} else {
//do nothing
}
if (OB_SUCC(ret)) {
const common::ObCompressorType &compressor_type = rpkt->get_compressor_type();
if (common::INVALID_COMPRESSOR != compressor_type) {
// uncompress
const int32_t original_len = rpkt->get_original_len();
common::ObCompressor *compressor = NULL;
int64_t dst_data_size = 0;
if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type, compressor))) {
RPC_OBRPC_LOG(WARN, "get_compressor failed", K(ret), K(compressor_type));
} else if (NULL == (uncompressed_buf =
static_cast<char *>(common::ob_malloc(original_len, common::ObModIds::OB_RPC)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_OBRPC_LOG(WARN, "Allocate memory failed", K(ret));
} else if (OB_FAIL(compressor->decompress(buf, len, uncompressed_buf, original_len, dst_data_size))) {
RPC_OBRPC_LOG(WARN, "decompress failed", K(ret));
} else if (dst_data_size != original_len) {
ret = common::OB_ERR_UNEXPECTED;
RPC_OBRPC_LOG(ERROR, "decompress len not match", K(ret), K(dst_data_size), K(original_len));
} else {
RPC_OBRPC_LOG(DEBUG, "uncompress result success", K(compressor_type), K(len), K(original_len));
// replace buf
buf = uncompressed_buf;
len = original_len;
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(rcode_.deserialize(buf, len, pos))) {
RPC_OBRPC_LOG(WARN, "decode result code fail", K(*rpkt), K(ret));
} else if (rcode_.rcode_ != OB_SUCCESS) {
// RPC_OBRPC_LOG(WARN, "execute rpc fail", K_(rcode));
} else if (OB_FAIL(result_.deserialize(buf, len, pos))) {
RPC_OBRPC_LOG(WARN, "decode packet fail", K(ret));
} else {
// do nothing
}
}
// free the uncompress buffer
if (NULL != uncompressed_buf) {
common::ob_free(uncompressed_buf);
uncompressed_buf = NULL;
}
}
return ret;
}
template <class pcodeStruct>
@ -446,7 +480,7 @@ int ObRpcProxy::rpc_call(ObRpcPacketCode pcode, const Input &args,
ret = common::OB_ERR_UNEXPECTED;
RPC_OBRPC_LOG(ERROR, "decompress len not match", K(ret), K(dst_data_size), K(original_len));
} else {
RPC_OBRPC_LOG(DEBUG, "uncompress result success", K(compressor_type), K(len), K(original_len));
RPC_OBRPC_LOG(INFO, "uncompress result success", K(compressor_type), K(len), K(original_len));
// replace buf
buf = uncompressed_buf;
len = original_len;

View File

@ -249,7 +249,7 @@ OB_DEF_DESERIALIZE(ObCdcReqStartLSNByTsResp)
*
*/
OB_SERIALIZE_MEMBER(ObCdcLSFetchLogReq, rpc_ver_, ls_id_, start_lsn_,
upper_limit_ts_, client_pid_, client_id_, progress_, flag_);
upper_limit_ts_, client_pid_, client_id_, progress_, flag_, compressor_type_);
OB_SERIALIZE_MEMBER(ObCdcFetchStatus,
is_reach_max_lsn_,
is_reach_upper_limit_ts_,
@ -475,7 +475,7 @@ void ObCdcLSFetchLogResp::reset()
*/
OB_SERIALIZE_MEMBER(ObCdcLSFetchMissLogReq::MissLogParam, miss_lsn_);
OB_SERIALIZE_MEMBER(ObCdcLSFetchMissLogReq, rpc_ver_, ls_id_, miss_log_array_,
client_pid_, client_id_, flag_);
client_pid_, client_id_, flag_, compressor_type_);
void ObCdcLSFetchMissLogReq::reset()
{

View File

@ -15,6 +15,7 @@
#include "observer/ob_server_struct.h" // GCTX
#include "share/ob_ls_id.h" // ObLSID
#include "lib/compress/ob_compress_util.h" // ObCompressorType
#include "logservice/palf/lsn.h" // LSN
#include "logservice/palf/log_group_entry.h" // LogGroupEntry
#include "logservice/palf/log_entry.h" // LogEntry
@ -236,6 +237,9 @@ public:
void set_flag(int8_t flag) { flag_ |= flag; }
int8_t get_flag() const { return flag_; }
void set_compressor_type(const common::ObCompressorType &compressor_type) { compressor_type_ = compressor_type; }
common::ObCompressorType get_compressor_type() const { return compressor_type_; }
TO_STRING_KV(K_(rpc_ver),
K_(ls_id),
K_(start_lsn),
@ -243,7 +247,8 @@ public:
K_(client_pid),
K_(client_id),
K_(progress),
K_(flag));
K_(flag),
K_(compressor_type));
OB_UNIS_VERSION(1);
@ -262,6 +267,7 @@ private:
// server B can hardly locate log in archive.
int64_t progress_;
int8_t flag_;
common::ObCompressorType compressor_type_;
};
// Statistics for LS
@ -478,11 +484,15 @@ public:
void set_flag(int8_t flag) { flag_ |= flag; }
int8_t get_flag() const { return flag_; }
void set_compressor_type(const common::ObCompressorType &compressor_type) { compressor_type_ = compressor_type; }
common::ObCompressorType get_compressor_type() const { return compressor_type_; }
TO_STRING_KV(K_(rpc_ver),
K_(ls_id),
K_(miss_log_array),
K_(client_pid),
K_(flag));
K_(flag),
K_(compressor_type));
OB_UNIS_VERSION(1);
private:
@ -492,6 +502,7 @@ private:
uint64_t client_pid_; // Process ID.
ObCdcRpcId client_id_;
int8_t flag_;
common::ObCompressorType compressor_type_;
};
} // namespace obrpc

View File

@ -72,6 +72,7 @@ int ObCdcLSFetchLogP::process()
ret = OB_ERR_UNEXPECTED;
EXTLOG_LOG(ERROR, "cdc_service is null", KR(ret));
} else {
set_result_compress_type(req.get_compressor_type());
ret = cdc_service->fetch_log(req, resp, get_send_timestamp(), get_receive_timestamp());
}
@ -92,6 +93,7 @@ int ObCdcLSFetchMissingLogP::process()
ret = OB_ERR_UNEXPECTED;
EXTLOG_LOG(ERROR, "cdc_service is null", KR(ret));
} else {
set_result_compress_type(req.get_compressor_type());
ret = cdc_service->fetch_missing_log(req, resp, get_send_timestamp(), get_receive_timestamp());
}

View File

@ -521,6 +521,20 @@ int ObLogFetcher::update_fetching_log_upper_limit(const share::SCN &upper_limit_
return ret;
}
int ObLogFetcher::update_compressor_type(const common::ObCompressorType &compressor_type)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("LogFetcher is not inited", KR(ret));
} else if (OB_FAIL(rpc_.update_compressor_type(compressor_type))) {
LOG_WARN("ObLogRpc update_compressor_type failed", K(compressor_type));
}
return ret;
}
int ObLogFetcher::get_progress_info(ProgressInfo &progress_info)
{
int ret = OB_SUCCESS;

View File

@ -16,6 +16,7 @@
#define OCEANBASE_LOG_FETCHER_LOG_FETCHER_H_
#include "share/scn.h" // SCN
#include "lib/compress/ob_compress_util.h" // ObCompressorType
#include "ob_log_ls_fetch_mgr.h" // ObLogLSFetchMgr
#include "ob_log_fetch_stream_container_mgr.h" // ObFsContainerMgr
#include "ob_log_rpc.h" // ObLogRpc
@ -113,6 +114,8 @@ public:
// @retval Other error codes Failed
virtual int update_fetching_log_upper_limit(const share::SCN &upper_limit_scn) = 0;
virtual int update_compressor_type(const common::ObCompressorType &compressor_type) = 0;
virtual int get_progress_info(ProgressInfo &progress_info) = 0;
virtual int wait_for_all_ls_to_be_removed(const int64_t timeout) = 0;
@ -220,6 +223,8 @@ public:
virtual int update_fetching_log_upper_limit(const share::SCN &upper_limit_scn);
virtual int update_compressor_type(const common::ObCompressorType &compressor_type);
virtual int get_progress_info(ProgressInfo &progress_info);
virtual int wait_for_all_ls_to_be_removed(const int64_t timeout);

View File

@ -20,7 +20,7 @@
#include "ob_log_config.h" // ObLogFetcherConfig
#include "observer/ob_srv_network_frame.h"
#include "logservice/data_dictionary/ob_data_dict_utils.h"
/// The rpc proxy executes the RPC function with two error codes:
/// 1. proxy function return value ret
@ -51,9 +51,11 @@
int64_t max_rpc_proc_time = \
ATOMIC_LOAD(&ObLogRpc::g_rpc_process_handler_time_upper_limit); \
proxy.set_server((SVR)); \
if (OB_FAIL(proxy.dst_cluster_id(cluster_id_).by(tenant_id).group_id(share::OBCG_CDCSERVICE).trace_time(true).timeout((TIMEOUT))\
.max_process_handler_time(static_cast<int32_t>(max_rpc_proc_time))\
.RPC((REQ), (ARG)))) { \
if (OB_FAIL(proxy.dst_cluster_id(cluster_id_).by(tenant_id).group_id(share::OBCG_CDCSERVICE) \
.compressed(ATOMIC_LOAD(&compressor_type_)) \
.trace_time(true).timeout((TIMEOUT))\
.max_process_handler_time(static_cast<int32_t>(max_rpc_proc_time))\
.RPC((REQ), (ARG)))) { \
LOG_ERROR("rpc fail: " #RPC, "tenant_id", tenant_id, "svr", (SVR), "rpc_ret", ret, \
"result_code", proxy.get_result_code().rcode_, "req", (REQ)); \
} \
@ -80,7 +82,8 @@ ObLogRpc::ObLogRpc() :
ssl_key_expired_time_(0),
client_id_(),
cfg_(nullptr),
external_info_val_()
external_info_val_(),
compressor_type_(common::INVALID_COMPRESSOR)
{
external_info_val_[0] = '\0';
}
@ -120,6 +123,7 @@ int ObLogRpc::async_stream_fetch_log(const uint64_t tenant_id,
if (1 == cfg_->test_mode_switch_fetch_mode) {
req.set_flag(ObCdcRpcTestFlag::OBCDC_RPC_TEST_SWITCH_MODE);
}
req.set_compressor_type(ATOMIC_LOAD(&compressor_type_));
SEND_RPC(async_stream_fetch_log, tenant_id, svr, timeout, req, &cb);
LOG_TRACE("rpc: async fetch stream log", KR(ret), K(svr), K(timeout), K(req));
return ret;
@ -136,6 +140,7 @@ int ObLogRpc::async_stream_fetch_missing_log(const uint64_t tenant_id,
if (1 == cfg_->test_mode_force_fetch_archive) {
req.set_flag(ObCdcRpcTestFlag::OBCDC_RPC_FETCH_ARCHIVE);
}
req.set_compressor_type(ATOMIC_LOAD(&compressor_type_));
SEND_RPC(async_stream_fetch_miss_log, tenant_id, svr, timeout, req, &cb);
LOG_TRACE("rpc: async fetch stream missing_log", KR(ret), K(svr), K(timeout), K(req));
return ret;
@ -184,6 +189,7 @@ void ObLogRpc::destroy()
client_id_.reset();
cfg_ = nullptr;
external_info_val_[0] = '\0';
compressor_type_ = common::INVALID_COMPRESSOR;
}
int ObLogRpc::reload_ssl_config()
@ -278,11 +284,37 @@ void ObLogRpc::configure(const ObLogFetcherConfig &cfg)
LOG_INFO("[CONFIG]", K(rpc_process_handler_time_upper_limit_msec));
}
int ObLogRpc::init_client_id_() {
int ObLogRpc::update_compressor_type(const common::ObCompressorType &compressor_type)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("ObLogRpc is not inited", KR(ret), K(cluster_id_));
} else {
ATOMIC_SET(&compressor_type_, compressor_type);
if (REACH_TIME_INTERVAL_THREAD_LOCAL(10 * _SEC_)) {
const char *compressor_type_name = nullptr;
if (compressor_type_ < common::MAX_COMPRESSOR) {
compressor_type_name = common::all_compressor_name[compressor_type];
}
LOG_INFO("update compressor type success", K_(compressor_type), K(compressor_type_name));
}
}
return ret;
}
int ObLogRpc::init_client_id_()
{
int ret = OB_SUCCESS;
if (OB_FAIL(client_id_.init(getpid(), get_self_addr()))) {
LOG_ERROR("init client id failed", KR(ret));
}
return ret;
}

View File

@ -14,6 +14,7 @@
#define OCEANBASE_LOG_FETCHER_RPC_H_
#include "lib/net/ob_addr.h" // ObAddr
#include "lib/compress/ob_compress_util.h" // ObCompressorType
#include "rpc/obrpc/ob_net_client.h" // ObNetClient
#include "rpc/obrpc/ob_rpc_packet.h" // OB_LOG_OPEN_STREAM
#include "rpc/obrpc/ob_rpc_proxy.h" // ObRpcProxy
@ -102,6 +103,7 @@ public:
const ObLogFetcherConfig &cfg);
void destroy();
int reload_ssl_config();
int update_compressor_type(const common::ObCompressorType &compressor_type);
private:
int init_client_id_();
@ -115,6 +117,7 @@ private:
ObCdcRpcId client_id_;
const ObLogFetcherConfig *cfg_;
char external_info_val_[OB_MAX_CONFIG_VALUE_LEN];
common::ObCompressorType compressor_type_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogRpc);

View File

@ -571,6 +571,19 @@ int ObLogRestoreNetDriver::set_restore_log_upper_limit()
return ret;
}
int ObLogRestoreNetDriver::set_compressor_type(const common::ObCompressorType &compressor_type)
{
int ret = OB_SUCCESS;
if (NULL == fetcher_) {
// do nothing
} else if (OB_FAIL(fetcher_->update_compressor_type(compressor_type))) {
LOG_WARN("ObLogFetcher update_compressor_type failed", K(compressor_type));
}
return ret;
}
int ObLogRestoreNetDriver::LogErrHandler::init(storage::ObLSService *ls_svr)
{
int ret = OB_SUCCESS;

View File

@ -16,6 +16,7 @@
#include "lib/container/ob_iarray.h" // Array
#include "lib/ob_errno.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/compress/ob_compress_util.h" // ObCompressorType
#include "logservice/logfetcher/ob_log_fetcher_ls_ctx_additional_info_factory.h"
#include "logservice/logfetcher/ob_log_fetcher_err_handler.h"
#include "logservice/logfetcher/ob_log_fetcher_ls_ctx_default_factory.h"
@ -81,6 +82,8 @@ public:
// set the max scn can be restored
int set_restore_log_upper_limit();
int set_compressor_type(const common::ObCompressorType &compressor_type);
private:
// TODO LogFetcher如何区分LogRestoreSource变化了, 比如从cluster 1的tenant A, 变为了cluster 2的tenant B
// LogFetcher需要提供接口, 区分不同cluster_id, tenant_id

View File

@ -201,6 +201,8 @@ void ObLogRestoreService::do_thread_task_()
last_normal_work_ts_ = common::ObTimeUtility::fast_current_time();
}
update_restore_upper_limit_();
set_compressor_type_();
}
}
@ -239,6 +241,31 @@ void ObLogRestoreService::update_restore_upper_limit_()
fetch_log_impl_.update_restore_upper_limit();
}
void ObLogRestoreService::set_compressor_type_()
{
int ret = OB_SUCCESS;
logservice::ObLogService* log_service = MTL(logservice::ObLogService*);
palf::PalfOptions options;
common::ObCompressorType compressor_type = INVALID_COMPRESSOR;
if (OB_NOT_NULL(log_service)) {
if (OB_FAIL(log_service->get_palf_options(options))) {
LOG_WARN("log_service get_palf_options failed", KR(ret));
} else {
if (options.compress_options_.enable_transport_compress_) {
compressor_type = options.compress_options_.transport_compress_func_;
fetch_log_impl_.set_compressor_type(compressor_type);
} else {
// close
fetch_log_impl_.set_compressor_type(common::INVALID_COMPRESSOR);
}
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log_service is nullptr", KR(ret));
}
}
bool ObLogRestoreService::need_schedule_() const
{
return common::ObTimeUtility::fast_current_time() - last_normal_work_ts_ > SCHEDULE_INTERVAL;

View File

@ -80,6 +80,7 @@ private:
void clean_resource_();
void report_error_();
void update_restore_upper_limit_();
void set_compressor_type_();
bool need_schedule_() const;
private:

View File

@ -102,5 +102,10 @@ void ObRemoteFetchLogImpl::update_restore_upper_limit()
{
(void)net_driver_->set_restore_log_upper_limit();
}
void ObRemoteFetchLogImpl::set_compressor_type(const common::ObCompressorType &compressor_type)
{
(void)net_driver_->set_compressor_type(compressor_type);
}
} // namespace logservice
} // namespace oceanbase

View File

@ -16,6 +16,7 @@
#include <cstdint>
#include "common/ob_role.h" // ObRole
#include "lib/utility/ob_macro_utils.h"
#include "lib/compress/ob_compress_util.h"
#include "share/restore/ob_log_restore_source.h"
namespace oceanbase
@ -39,6 +40,7 @@ public:
int do_schedule(const share::ObLogRestoreSourceItem &source);
void clean_resource();
void update_restore_upper_limit();
void set_compressor_type(const common::ObCompressorType &compressor_type);
private:
bool inited_;