add _max_rpc_packet_size parameter

This commit is contained in:
liucc1997 2023-07-11 21:18:03 +00:00 committed by ob-robot
parent 1337d35972
commit a8b370a2be
18 changed files with 54 additions and 36 deletions

View File

@ -450,7 +450,7 @@ const int64_t OB_MAX_PACKET_DECODE_TS = 10 * 1000L;
* -----------------------------------
*/
const uint32_t OB_NET_HEADER_LENGTH = 16; // 16 bytes packet header
const uint32_t OB_MAX_RPC_PACKET_LENGTH = (2L << 30) - (1<<20);
const uint32_t OB_MAX_RPC_PACKET_LENGTH = (1L << 24);
const int OB_TBNET_PACKET_FLAG = 0x416e4574;
const int OB_SERVER_ADDR_STR_LEN = 128; //used for buffer size of easy_int_addr_to_str

View File

@ -33,6 +33,9 @@ ObListener* global_ob_listener;
bool __attribute__((weak)) enable_pkt_nio() {
return false;
}
int64_t __attribute__((weak)) get_max_rpc_packet_size() {
return OB_MAX_RPC_PACKET_LENGTH;
}
}; // end namespace obrpc
}; // end namespace oceanbase

View File

@ -77,6 +77,7 @@ private:
extern ObPocRpcServer global_poc_server;
extern ObListener* global_ob_listener;
int64_t get_max_rpc_packet_size();
extern "C" {
int dispatch_to_ob_listener(int accept_fd);
int tranlate_to_ob_error(int err);

View File

@ -21,6 +21,7 @@ namespace oceanbase
{
namespace obrpc
{
extern int64_t get_max_rpc_packet_size();
class ObRpcProxy;
int64_t calc_extra_payload_size();
int fill_extra_payload(ObRpcPacket& pkt, char* buf, int64_t len, int64_t pos);
@ -54,10 +55,10 @@ template <typename T>
if (NULL == header_buf) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
RPC_OBRPC_LOG(WARN, "alloc buffer fail", K(payload_sz));
} else if (payload_sz > common::OB_MAX_RPC_PACKET_LENGTH) {
} else if (payload_sz > get_max_rpc_packet_size()) {
ret = common::OB_RPC_PACKET_TOO_LONG;
RPC_OBRPC_LOG(WARN, "obrpc packet payload execced its limit",
K(payload_sz), "limit", common::OB_MAX_RPC_PACKET_LENGTH, K(ret));
RPC_OBRPC_LOG(ERROR, "obrpc packet payload execced its limit",
K(payload_sz), "limit", get_max_rpc_packet_size(), K(ret));
} else if (OB_FAIL(common::serialization::encode(
payload_buf, payload_sz, pos, args))) {
RPC_OBRPC_LOG(WARN, "serialize argument fail", K(pos), K(payload_sz), K(ret));

View File

@ -23,6 +23,7 @@
#include "lib/allocator/ob_tc_malloc.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "rpc/obrpc/ob_virtual_rpc_protocol_processor.h"
#include "rpc/obrpc/ob_poc_rpc_server.h"
#include "common/ob_clock_generator.h"
using namespace oceanbase::common;
@ -97,9 +98,9 @@ int ObRpcNetHandler::try_decode_keepalive(easy_message_t *ms, int &result)
chid = bswap_32(*((uint32_t *)(net_header + 8)));
full_demanded_len = common::OB_NET_HEADER_LENGTH + dlen;
if (dlen > OB_MAX_RPC_PACKET_LENGTH) {
if (dlen > get_max_rpc_packet_size()) {
result = OB_RPC_PACKET_TOO_LONG;
LOG_WARN("obrpc packet payload exceeds its limit", K(magic), K(result), K(dlen), "limit", OB_MAX_RPC_PACKET_LENGTH);
LOG_WARN("obrpc packet payload exceeds its limit", K(magic), K(result), K(dlen), "limit", get_max_rpc_packet_size());
} else if (recv_len < full_demanded_len) {
//data is not enough
result = 0;

View File

@ -29,6 +29,7 @@
#include "rpc/obrpc/ob_irpc_extra_payload.h"
#include "rpc/obrpc/ob_rpc_processor_base.h"
#include "rpc/obrpc/ob_rpc_net_handler.h"
#include "rpc/obrpc/ob_poc_rpc_server.h"
using namespace oceanbase::common;
@ -415,9 +416,9 @@ int ObRpcProcessorBase::part_response(const int retcode, bool is_last)
char *tmp_buf = NULL;
if (OB_FAIL(ret)) {
//do nothing
} else if (content_size + max_overflow_size > common::OB_MAX_PACKET_LENGTH) {
} else if (content_size + max_overflow_size > get_max_rpc_packet_size()) {
ret = common::OB_RPC_PACKET_TOO_LONG;
RPC_OBRPC_LOG(WARN, "response content size bigger than OB_MAX_PACKET_LENGTH", K(ret));
RPC_OBRPC_LOG(ERROR, "response content size bigger than max_rpc_packet_size", K(ret), "limit", get_max_rpc_packet_size());
} else {
/*
* RPC response packet buffer format

View File

@ -177,10 +177,10 @@ int ObRpcProtocolProcessor::resolve_packet_type(ObTimeGuard &timeguard,
// net header length plus packet content length
const uint32_t full_len = OB_NET_HEADER_LENGTH + plen;
if (plen > OB_MAX_RPC_PACKET_LENGTH) {
if (plen > get_max_rpc_packet_size()) {
ret = OB_RPC_PACKET_TOO_LONG;
LOG_WARN("obrpc packet payload exceed its limit",
K(ret), K(plen), "limit", OB_MAX_RPC_PACKET_LENGTH);
K(ret), K(plen), "limit", get_max_rpc_packet_size());
} else if (recv_len < full_len) {
if (NULL != ms->c && NULL != ms->c->sc) {
//ssl will decrypt data into ssl buff, we should not set next_read_len here

View File

@ -96,10 +96,10 @@ int ObRpcProxy::rpc_post(
const int64_t payload = calc_payload_size(0);
ObReqTransport::Request req;
if (OB_FAIL(ret)) {
} else if (payload > OB_MAX_RPC_PACKET_LENGTH) {
} else if (payload > get_max_rpc_packet_size()) {
ret = OB_RPC_PACKET_TOO_LONG;
LOG_WARN("obrpc packet payload execced its limit",
K(ret), K(payload), "limit", OB_MAX_RPC_PACKET_LENGTH);
K(ret), K(payload), "limit", get_max_rpc_packet_size());
} else if (OB_ISNULL(transport_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("transport shoul not be NULL", K(ret));

View File

@ -430,10 +430,10 @@ int ObRpcProxy::rpc_call(ObRpcPacketCode pcode, const Input &args,
ObReqTransport::Request req;
if (OB_FAIL(ret)) {
} else if (payload > OB_MAX_RPC_PACKET_LENGTH) {
} else if (payload > get_max_rpc_packet_size()) {
ret = OB_RPC_PACKET_TOO_LONG;
RPC_OBRPC_LOG(WARN, "obrpc packet payload execced its limit",
K(payload), "limit", OB_MAX_RPC_PACKET_LENGTH,
K(payload), "limit", get_max_rpc_packet_size(),
K(ret));
} else if (OB_ISNULL(transport_)) {
ret = OB_ERR_UNEXPECTED;
@ -620,10 +620,10 @@ int ObRpcProxy::rpc_post(const typename pcodeStruct::Request &args,
}
if (OB_FAIL(ret)) {
} else if (payload > OB_MAX_RPC_PACKET_LENGTH) {
} else if (payload > get_max_rpc_packet_size()) {
ret = OB_RPC_PACKET_TOO_LONG;
RPC_OBRPC_LOG(WARN, "obrpc packet payload execced its limit",
K(ret), K(payload), "limit", OB_MAX_RPC_PACKET_LENGTH);
K(ret), K(payload), "limit", get_max_rpc_packet_size());
} else if (OB_ISNULL(transport_)) {
ret = OB_ERR_UNEXPECTED;
RPC_OBRPC_LOG(WARN, "transport_ should not be NULL", K(ret), KP_(transport));

View File

@ -297,11 +297,11 @@ int ObVirtualRpcProtocolProcessor::encode_raw_rpc_packet(ObTimeGuard &timeguard,
uint32_t ez_pkt_size = OB_NET_HEADER_LENGTH + rpc_packet_size;
uint32_t ez_rpc_header_size = OB_NET_HEADER_LENGTH + rpc_header_size;
if (ez_pkt_size > OB_MAX_RPC_PACKET_LENGTH) {
if (ez_pkt_size > get_max_rpc_packet_size()) {
// We find out the packet size would beyond max size of limitation
// in OceanBase.
ret = OB_ERR_UNEXPECTED;
LOG_WARN("packet size beyond limit", K(ret), K(ez_pkt_size), "limit", OB_MAX_RPC_PACKET_LENGTH);
LOG_WARN("packet size beyond limit", K(ret), K(ez_pkt_size), "limit", get_max_rpc_packet_size());
} else {
timeguard.click();
ObRpcProxy::PCodeGuard pcode_guard(OB_RPC_STREAM_TEST_ENCODE_RAW_PCODE);
@ -401,9 +401,9 @@ int ObVirtualRpcProtocolProcessor::decode_compressed_packet_data(ObTimeGuard &ti
timeguard.click();
if (OB_FAIL(header.decode(data, data_len, pos))) {
LOG_WARN("failed to decode", K(ret));
} else if (header.origin_size_ > OB_MAX_RPC_PACKET_LENGTH) {
} else if (header.origin_size_ > get_max_rpc_packet_size()) {
ret = OB_RPC_PACKET_TOO_LONG;
LOG_WARN("obrpc packet payload exceed its limit", K(ret), K(header.origin_size_), "limit", OB_MAX_RPC_PACKET_LENGTH);
LOG_WARN("obrpc packet payload exceed its limit", K(ret), K(header.origin_size_), "limit", get_max_rpc_packet_size());
} else {
timeguard.click();
int32_t full_size = header.full_size_;
@ -440,9 +440,9 @@ int ObVirtualRpcProtocolProcessor::decode_compressed_packet_data(ObTimeGuard &ti
timeguard.click();
if (OB_FAIL(header.decode(data, data_len, pos))) {
LOG_WARN("failed to decode HeadPacketHeader", K(ret));
} else if (header.total_data_len_before_compress_ > OB_MAX_RPC_PACKET_LENGTH) {
} else if (header.total_data_len_before_compress_ > get_max_rpc_packet_size()) {
ret = OB_RPC_PACKET_TOO_LONG;
LOG_WARN("obrpc packet payload exceed its limit", K(ret), K(header.total_data_len_before_compress_), "limit", OB_MAX_RPC_PACKET_LENGTH);
LOG_WARN("obrpc packet payload exceed its limit", K(ret), K(header.total_data_len_before_compress_), "limit", get_max_rpc_packet_size());
} else if (OB_UNLIKELY(data_len != header.full_size_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid data", K(data_len), K(pos), K(header), K(ret));
@ -612,9 +612,9 @@ int ObVirtualRpcProtocolProcessor::decode_raw_net_rpc_packet(common::ObTimeGuard
ret = OB_ERR_UNEXPECTED;
LOG_WARN("packet magic flag mismatch, close connection", K(ret));
ms->status = EASY_ERROR;
} else if (plen > OB_MAX_RPC_PACKET_LENGTH) {
} else if (plen > get_max_rpc_packet_size()) {
ret = OB_RPC_PACKET_TOO_LONG;
LOG_WARN("obrpc packet payload exceed its limit", K(ret), K(plen), "limit", OB_MAX_RPC_PACKET_LENGTH);
LOG_WARN("obrpc packet payload exceed its limit", K(ret), K(plen), "limit", get_max_rpc_packet_size());
} else if (recv_len < full_demanded_len) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid recv_len ", K(recv_len), K(full_demanded_len), K(ret));

View File

@ -3436,9 +3436,9 @@ TEST(TestQueryResult, alloc_memory_if_need)
printf("allocator: %ld, result_buf: %ld\n", query_result.allocator_.total(), query_result.buf_.get_capacity());
}
}
ASSERT_EQ(query_result.buf_.get_capacity(), ObTableQueryResult::MAX_BUF_BLOCK_SIZE * 1);
ASSERT_GT(query_result.allocator_.total(), ObTableQueryResult::MAX_BUF_BLOCK_SIZE * 1);
ASSERT_LE(query_result.allocator_.total(), ObTableQueryResult::MAX_BUF_BLOCK_SIZE * 3);
ASSERT_EQ(query_result.buf_.get_capacity(), ObTableQueryResult::get_max_buf_block_size() * 1);
ASSERT_GT(query_result.allocator_.total(), ObTableQueryResult::get_max_buf_block_size() * 1);
ASSERT_LE(query_result.allocator_.total(), ObTableQueryResult::get_max_buf_block_size() * 3);
}
TEST_F(TestBatchExecute, update_table_with_index_by_lowercase_rowkey)

View File

@ -1035,7 +1035,7 @@ ObHTableFilterOperator::ObHTableFilterOperator(const ObTableQuery &query,
hfilter_(NULL),
batch_size_(query.get_batch()),
max_result_size_(std::min(query.get_max_result_size(),
static_cast<int64_t>(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))),
static_cast<int64_t>(ObTableQueryResult::get_max_packet_buffer_length() - 1024))),
is_first_result_(true)
{
}

View File

@ -161,7 +161,7 @@ public:
last_row_(NULL),
batch_size_(query.get_batch()),
max_result_size_(std::min(query.get_max_result_size(),
static_cast<int64_t>(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))),
static_cast<int64_t>(ObTableQueryResult::get_max_packet_buffer_length() - 1024))),
scan_result_(NULL),
is_first_result_(true),
has_more_rows_(true)
@ -199,7 +199,7 @@ public:
tfilter_(NULL),
batch_size_(query.get_batch()),
max_result_size_(std::min(query.get_max_result_size(),
static_cast<int64_t>(common::OB_MAX_PACKET_BUFFER_LENGTH-1024))),
static_cast<int64_t>(ObTableQueryResult::get_max_packet_buffer_length() - 1024))),
is_first_result_(true),
has_more_rows_(true),
row_idx_(0)

View File

@ -419,6 +419,10 @@ bool enable_pkt_nio() {
&& (!OBSERVER.is_arbitration_mode())
&& GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0;
}
int64_t get_max_rpc_packet_size()
{
return GCONF._max_rpc_packet_size;
}
}
} // end of namespace oceanbase

View File

@ -727,6 +727,9 @@ DEF_INT(rpc_memory_limit_percentage, OB_TENANT_PARAMETER, "0", "[0,100]",
"maximum memory for rpc in a tenant, as a percentage of total tenant memory, "
"and 0 means no limit to rpc memory",
ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_CAP(_max_rpc_packet_size, OB_CLUSTER_PARAMETER, "16MB", "[2M,2047M]",
"the max rpc packet size when sending RPC or responding RPC results",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_CAP(standby_fetch_log_bandwidth_limit, OB_CLUSTER_PARAMETER, "0MB", "[0M,10000G]",
"the max bandwidth in bytes per second that can be occupied by the sum of the synchronizing log from primary cluster of all servers in the standby cluster",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -1381,9 +1381,9 @@ int ObTableQueryResult::assign_property_names(const ObIArray<ObString> &other)
int ObTableQueryResult::alloc_buf_if_need(const int64_t need_size)
{
int ret = OB_SUCCESS;
if (need_size <= 0 || need_size > MAX_BUF_BLOCK_SIZE) {
if (need_size <= 0 || need_size > get_max_buf_block_size()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(need_size), LITERAL_K(MAX_BUF_BLOCK_SIZE));
LOG_WARN("invalid argument", K(need_size), LITERAL_K(get_max_buf_block_size()));
} else if (NULL == buf_.get_data()) { // first alloc
int64_t actual_size = 0;
if (need_size <= DEFAULT_BUF_BLOCK_SIZE) {
@ -1399,12 +1399,12 @@ int ObTableQueryResult::alloc_buf_if_need(const int64_t need_size)
buf_.set_data(tmp_buf, actual_size);
}
} else if (buf_.get_remain() < need_size) {
if (need_size + buf_.get_position() > MAX_BUF_BLOCK_SIZE) { // check max buf size when expand buf
if (need_size + buf_.get_position() > get_max_buf_block_size()) { // check max buf size when expand buf
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("will exceed max buf need_size", K(ret), K(need_size), K(buf_.get_position()), LITERAL_K(MAX_BUF_BLOCK_SIZE));
LOG_WARN("will exceed max buf need_size", K(ret), K(need_size), K(buf_.get_position()), LITERAL_K(get_max_buf_block_size()));
} else {
int64_t actual_size = MAX(need_size + buf_.get_position(), 2 * buf_.get_capacity());
actual_size = MIN(actual_size, MAX_BUF_BLOCK_SIZE);
actual_size = MIN(actual_size, get_max_buf_block_size());
char *tmp_buf = static_cast<char*>(allocator_.alloc(actual_size));
if (NULL == tmp_buf) {
ret = OB_ALLOCATE_MEMORY_FAILED;

View File

@ -23,6 +23,8 @@
#include "lib/list/ob_dlist.h"
#include "common/ob_common_types.h"
#include "common/ob_range.h"
#include "rpc/obrpc/ob_poc_rpc_server.h"
namespace oceanbase
{
namespace common
@ -787,8 +789,9 @@ public:
bool reach_batch_size_or_result_size(const int32_t batch_count,
const int64_t max_result_size);
const common::ObIArray<common::ObString>& get_select_columns() const { return properties_names_; };
static int64_t get_max_packet_buffer_length() { return obrpc::get_max_rpc_packet_size() - (1<<20); }
static int64_t get_max_buf_block_size() { return get_max_packet_buffer_length() - (1024*1024LL); }
private:
static const int64_t MAX_BUF_BLOCK_SIZE = common::OB_MAX_PACKET_BUFFER_LENGTH - (1024*1024LL);
static const int64_t DEFAULT_BUF_BLOCK_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE - (1024*1024LL);
int alloc_buf_if_need(const int64_t size);
private:

View File

@ -294,6 +294,7 @@ _load_tde_encrypt_engine
_log_writer_parallelism
_max_elr_dependent_trx_count
_max_malloc_sample_interval
_max_rpc_packet_size
_max_schema_slot_num
_max_tablet_cnt_per_gb
_memory_large_chunk_cache_size