[to #47632376]add pipe row memory limit
This commit is contained in:
@ -2400,6 +2400,13 @@ int ObPLExecState::final(int ret)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret) && func_.get_ret_type().is_composite_type() && result_.is_ext() && func_.is_pipelined()) {
|
||||||
|
tmp_ret = ObUserDefinedType::destruct_obj(result_, ctx_.exec_ctx_->get_my_session());
|
||||||
|
if (OB_SUCCESS != tmp_ret) {
|
||||||
|
LOG_WARN("failed to destruct pl object", K(tmp_ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (OB_NOT_NULL(top_context_)
|
if (OB_NOT_NULL(top_context_)
|
||||||
&& top_context_->get_exec_stack().count() > 0
|
&& top_context_->get_exec_stack().count() > 0
|
||||||
&& top_context_->get_exec_stack().at(
|
&& top_context_->get_exec_stack().at(
|
||||||
|
|||||||
@ -22,11 +22,16 @@ namespace oceanbase
|
|||||||
{
|
{
|
||||||
namespace pl
|
namespace pl
|
||||||
{
|
{
|
||||||
|
/* ObPLAllocator can be dynamically scaled, to reduce memory consumption introduced by deep copy
|
||||||
|
* when curr_.used > next_threshold_ or (src copy size) * 2
|
||||||
|
* we alloc a new mem and reset old
|
||||||
|
*
|
||||||
|
*/
|
||||||
class ObPLAllocator : public common::ObIAllocator
|
class ObPLAllocator : public common::ObIAllocator
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObPLAllocator()
|
ObPLAllocator()
|
||||||
: allocator1_(ObModIds::OB_PL_TEMP, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
: allocator1_(ObModIds::OB_PL_TEMP, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), // PLAllocator use current tenant memory set by MTL_ID()
|
||||||
allocator2_(ObModIds::OB_PL_TEMP, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
allocator2_(ObModIds::OB_PL_TEMP, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||||
curr_(&allocator1_),
|
curr_(&allocator1_),
|
||||||
backup_(&allocator2_),
|
backup_(&allocator2_),
|
||||||
@ -55,6 +60,8 @@ public:
|
|||||||
|
|
||||||
ObIAllocator* get_allocator() { return curr_; }
|
ObIAllocator* get_allocator() { return curr_; }
|
||||||
|
|
||||||
|
int64_t get_used() { return allocator1_.used() + allocator2_.used(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const static int PL_ALLOC_THRESHOLD = 1024 * 1024; // 1M
|
const static int PL_ALLOC_THRESHOLD = 1024 * 1024; // 1M
|
||||||
|
|
||||||
|
|||||||
@ -1397,3 +1397,6 @@ DEF_BOOL(_enable_transaction_internal_routing, OB_TENANT_PARAMETER, "True",
|
|||||||
DEF_STR(_load_tde_encrypt_engine, OB_CLUSTER_PARAMETER, "NONE",
|
DEF_STR(_load_tde_encrypt_engine, OB_CLUSTER_PARAMETER, "NONE",
|
||||||
"load the engine that meet the security classification requirement to encrypt data. default NONE",
|
"load the engine that meet the security classification requirement to encrypt data. default NONE",
|
||||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||||
|
DEF_INT(_pipelined_table_function_memory_limit, OB_TENANT_PARAMETER, "524288000", "[1024,18446744073709551615]",
|
||||||
|
"pipeline table function result set memory size limit. default 524288000 (500M), Range: [1024,18446744073709551615]",
|
||||||
|
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||||
|
|||||||
@ -303,6 +303,7 @@ _ob_trans_rpc_timeout
|
|||||||
_parallel_max_active_sessions
|
_parallel_max_active_sessions
|
||||||
_parallel_min_message_pool
|
_parallel_min_message_pool
|
||||||
_parallel_server_sleep_time
|
_parallel_server_sleep_time
|
||||||
|
_pipelined_table_function_memory_limit
|
||||||
_print_sample_ppm
|
_print_sample_ppm
|
||||||
_private_buffer_size
|
_private_buffer_size
|
||||||
_pushdown_storage_level
|
_pushdown_storage_level
|
||||||
|
|||||||
Reference in New Issue
Block a user