Changing the response packet size of remote SQL to 2MB

This commit is contained in:
leslieyuchen
2023-10-31 20:39:28 +00:00
committed by ob-robot
parent 34c3e633b0
commit c91f22632a
5 changed files with 36 additions and 9 deletions

View File

@ -120,7 +120,7 @@ void ObScanner::reuse()
fb_info_.reset();
}
int ObScanner::init()
int ObScanner::init(int64_t mem_size_limit /*= DEFAULT_MAX_SERIALIZE_SIZE*/)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
@ -137,6 +137,7 @@ int ObScanner::init()
// } else {
// is_inited_ = true;
// }
mem_size_limit_ = mem_size_limit;
is_inited_ = true;
}
return ret;
@ -171,7 +172,19 @@ int ObScanner::add_row(const ObNewRow &row)
LOG_WARN("fail to add_row to row store.", K(ret));
} else if (row_store_.get_data_size() > mem_size_limit_) {
LOG_WARN("row store data size", "rowstore_data_size", row_store_.get_data_size(), K_(mem_size_limit), K(ret));
if (OB_FAIL(row_store_.rollback_last_row())) {
if (row_store_.get_row_count() == 1 && row_store_.get_data_size() <= DEFAULT_MAX_SERIALIZE_SIZE) {
/**
* The default size of ObScanner is 64MB.
* Previously, when using ObScanner as an RPC transport carrier,
* the default limit of 64MB was used.
* Now, with remote execution, the unit of RPC packets has been changed to 2MB.
* This may cause previously oversized rows (greater than 2MB) to be unable to be written.
* Therefore, an additional processing is added in the "add_row" function to ensure that
* the row length is within 64MB.
* This allows the row to be written even if it exceeds the memory limit.
* */
LOG_INFO("add a large row, exceeds the memory limit", "row_len", row_store_.get_data_size(), K_(mem_size_limit));
} else if (OB_FAIL(row_store_.rollback_last_row())) {
LOG_WARN("fail to rollback last row", K(ret));
} else {
ret = OB_SIZE_OVERFLOW;
@ -187,7 +200,21 @@ int ObScanner::try_add_row(const common::ObIArray<sql::ObExpr *> &exprs,
int ret = OB_SUCCESS;
row_added = false;
if (OB_FAIL(datum_store_.try_add_row(exprs, ctx, mem_size_limit_, row_added))) {
LOG_WARN("fail to add_row to row store.", K(ret));
LOG_WARN("fail to add_row to chunk datum store.", K(ret));
} else if (!row_added && datum_store_.get_row_cnt() <= 0) {
/**
* The default size of ObScanner is 64MB.
* Previously, when using ObScanner as an RPC transport carrier,
* the default limit of 64MB was used.
* Now, with remote execution, the unit of RPC packets has been changed to 2MB.
* This may cause previously oversized rows (greater than 2MB) to be unable to be written.
* Therefore, an additional processing is added in the "add_row" function to ensure that
* the row length is within 64MB.
* This allows the row to be written even if it exceeds the memory limit.
* */
if (OB_FAIL(datum_store_.try_add_row(exprs, ctx, DEFAULT_MAX_SERIALIZE_SIZE, row_added))) {
LOG_WARN("try to add row to chunk datum store failed", K(ret));
}
}
return ret;

View File

@ -53,7 +53,7 @@ public:
uint64_t tenant_id = common::OB_SERVER_TENANT_ID,
bool use_row_compact = true);
virtual ~ObScanner();
int init();
int init(int64_t mem_size_limit = DEFAULT_MAX_SERIALIZE_SIZE);
void reuse();
void reset();
@ -92,7 +92,7 @@ public:
void set_mem_block_size(int64_t block_size) { row_store_.set_block_size(block_size); }
int64_t get_mem_block_size() const { return row_store_.get_block_size(); }
void set_mem_size_limit(int64_t limit) { mem_size_limit_ = limit; }
//Do not modify the mem_size_limit during the usage of the scanner
int64_t get_mem_size_limit() const { return mem_size_limit_; }
void set_affected_rows(int64_t affacted_rows) { affected_rows_ = affacted_rows; }

View File

@ -842,7 +842,7 @@ int ObRpcRemoteExecuteP::init()
mtl_id);
result_.set_tenant_id(mtl_id);
if (OB_FAIL(result_.init())) {
if (OB_FAIL(result_.init(DEFAULT_MAX_REMOTE_EXEC_PACKET_LENGTH))) {
LOG_WARN("fail to init result", K(ret));
} else {
arg_.set_deserialize_param(exec_ctx_, phy_plan_);
@ -1038,7 +1038,7 @@ int ObRpcRemoteSyncExecuteP::init()
}
if (OB_SUCC(ret)) {
result_.set_tenant_id(MTL_ID());
if (OB_FAIL(result_.init())) {
if (OB_FAIL(result_.init(DEFAULT_MAX_REMOTE_EXEC_PACKET_LENGTH))) {
LOG_WARN("fail to init result", K(ret));
} else if (OB_FAIL(exec_ctx_.create_physical_plan_ctx())) {
LOG_WARN("create physical plan ctx failed", K(ret));

View File

@ -27,6 +27,7 @@ template <typename T>
class ObRemoteBaseExecuteP : public obrpc::ObRpcProcessor<T>
{
public:
static const int64_t DEFAULT_MAX_REMOTE_EXEC_PACKET_LENGTH = (1 << 22) - (1 << 13); //2MB - 8K
ObRemoteBaseExecuteP(const observer::ObGlobalContext &gctx, bool is_execute_remote_plan = false)
: obrpc::ObRpcProcessor<T>(),
gctx_(gctx),

View File

@ -79,7 +79,6 @@ TEST_F(TestScanner, basic_test)
TEST_F(TestScanner, serialization)
{
ObScanner scanner;
scanner.set_mem_size_limit(1024);
scanner.set_affected_rows(10);
scanner.set_last_insert_id_to_client(111);
scanner.set_last_insert_id_session(121);
@ -91,7 +90,7 @@ TEST_F(TestScanner, serialization)
scanner.set_row_duplicated_count(2000);
scanner.set_extend_info("fine,thank you, and you");
scanner.set_is_result_accurate(false);
scanner.init();
ASSERT_EQ(OB_SUCCESS, scanner.init(1024));
ObObj objs[3];
ObNewRow row;