Adjust mem attr for load data

This commit is contained in:
wjhh2008
2023-06-08 10:47:46 +00:00
committed by ob-robot
parent a587fa8e2e
commit 9a4cf24b71
4 changed files with 33 additions and 16 deletions

View File

@ -51,14 +51,14 @@ public:
ObArrayHashMap(): lock_(common::ObLatchIds::HASH_MAP_LOCK), size_(0), capacity_(0), items_(NULL) {}
~ObArrayHashMap() { destroy(); }
int init(const lib::ObLabel &label, int64_t capacity)
int init(const lib::ObMemAttr attr, int64_t capacity)
{
int ret = OB_SUCCESS;
if (!label.is_valid() || capacity <= 0) {
if (capacity <= 0) {
ret = OB_INVALID_ARGUMENT;
} else if (is_inited()) {
ret = OB_INIT_TWICE;
} else if (NULL == (items_ = static_cast<Item *>(ob_malloc(capacity * sizeof(Item), label)))) {
} else if (NULL == (items_ = static_cast<Item *>(ob_malloc(capacity * sizeof(Item), attr)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
capacity_ = capacity;
@ -67,6 +67,13 @@ public:
return ret;
}
int init(const lib::ObLabel label, int64_t capacity)
{
ObMemAttr attr;
attr.label_ = label;
return init(attr, capacity);
}
void destroy()
{
if (NULL != items_) {
@ -129,7 +136,7 @@ public:
}
}
}
int64_t to_string(char *buf, int64_t buf_len) const
{
int64_t pos = 0;

View File

@ -37,7 +37,13 @@ public:
virtual ~ObSqlString();
bool is_valid() const;
void set_label(const lib::ObLabel &label) { allocator_.set_label(label); }
void set_attr(const lib::ObMemAttr &attr) { allocator_.set_attr(attr); }
void set_label(const lib::ObLabel &label)
{
lib::ObMemAttr attr;
attr.label_ = label;
allocator_.set_attr(attr);
}
void reset();
void reuse();
int reserve(const int64_t size);

View File

@ -906,7 +906,8 @@ void ObCSVFormats::init(const ObDataInFileStruct &file_formats)
}
ObShuffleTaskHandle::ObShuffleTaskHandle(ObDataFragMgr &main_datafrag_mgr,
ObBitSet<> &main_string_values)
ObBitSet<> &main_string_values,
uint64_t tenant_id)
: exec_ctx(allocator, GCTX.session_mgr_),
data_buffer(NULL),
escape_buffer(NULL),
@ -915,13 +916,14 @@ ObShuffleTaskHandle::ObShuffleTaskHandle(ObDataFragMgr &main_datafrag_mgr,
string_values(main_string_values)
{
const int64_t FILE_BUFFER_SIZE = ObLoadFileBuffer::MAX_BUFFER_SIZE;
ObMemAttr attr(tenant_id, ObModIds::OB_SQL_LOAD_DATA);
void *buf = NULL;
buf = ob_malloc(FILE_BUFFER_SIZE, ObModIds::OB_SQL_LOAD_DATA);
buf = ob_malloc(FILE_BUFFER_SIZE, attr);
if (OB_NOT_NULL(buf)) {
data_buffer = new(buf) ObLoadFileBuffer(
FILE_BUFFER_SIZE - sizeof(ObLoadFileBuffer));
}
buf = ob_malloc(FILE_BUFFER_SIZE, ObModIds::OB_SQL_LOAD_DATA);
buf = ob_malloc(FILE_BUFFER_SIZE, attr);
if (OB_NOT_NULL(buf)) {
escape_buffer = new(buf) ObLoadFileBuffer(
FILE_BUFFER_SIZE - sizeof(ObLoadFileBuffer));
@ -985,14 +987,14 @@ int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle)
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KP(handle));
// } else if (FALSE_IT(handle->exec_ctx.get_allocator().reuse())) {
} else if (OB_FAIL(part_buf_mgr.init(ObModIds::OB_SQL_LOAD_DATA,
} else if (FALSE_IT(tenant_id = handle->exec_ctx.get_my_session()->get_effective_tenant_id())) {
} else if (OB_FAIL(part_buf_mgr.init(ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA),
handle->datafrag_mgr.get_total_part_cnt()))) {
LOG_WARN("fail to init part buf mgr", K(ret));
} else if (OB_FAIL(insert_values.prepare_allocate(
handle->generator.get_insert_exprs().count()))) {
LOG_WARN("fail to prealloc", K(ret),
"insert values count", handle->generator.get_insert_exprs().count());
} else if (FALSE_IT(tenant_id = handle->exec_ctx.get_my_session()->get_effective_tenant_id())) {
} else if (OB_ISNULL(expr_buf = ob_malloc(ObLoadFileBuffer::MAX_BUFFER_SIZE,
ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -1161,9 +1163,10 @@ int ObLoadDataSPImpl::exec_insert(ObInsertTask &task, ObInsertResult& result)
int64_t sql_buff_len_init = OB_MALLOC_BIG_BLOCK_SIZE; //2M
int64_t field_buf_len = OB_MAX_VARCHAR_LENGTH;
char *field_buff = NULL;
ObSqlString sql_str(ObModIds::OB_SQL_LOAD_DATA);
ObSEArray<ObString, 1> single_row_values;
ObMemAttr attr(task.tenant_id_, ObModIds::OB_SQL_LOAD_DATA);
ObSqlString sql_str;
ObSEArray<ObString, 1> single_row_values;
sql_str.set_attr(attr);
#ifdef TEST_MODE
delay_process_by_probability(INSERT_TASK_DROP_RATE);
@ -2737,7 +2740,7 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
} else if (OB_FAIL(field_values_in_file.prepare_allocate(num_of_file_column))) {
LOG_WARN("fail to reserve array", K(ret));
} else if (OB_ISNULL(buf = ob_malloc(ObLoadFileBuffer::MAX_BUFFER_SIZE,
ObModIds::OB_SQL_LOAD_DATA))) {
ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else if (FALSE_IT(expr_buffer = new(buf) ObLoadFileBuffer(
@ -2868,7 +2871,7 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
int64_t pos = 0;
if (OB_ISNULL(handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()),
data_frag_mgr, string_type_column_bitset))
data_frag_mgr, string_type_column_bitset, tenant_id))
|| OB_ISNULL(handle->data_buffer)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc", K(ret));
@ -2933,7 +2936,7 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(server_last_available_ts.init(ObModIds::OB_SQL_LOAD_DATA, MAX_SERVER_COUNT))) {
if (OB_FAIL(server_last_available_ts.init(ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA), MAX_SERVER_COUNT))) {
LOG_WARN("fail to create server map", K(ret));
}
}

View File

@ -488,7 +488,8 @@ struct ObParserErrRec {
struct ObShuffleTaskHandle {
ObShuffleTaskHandle(ObDataFragMgr &main_datafrag_mgr,
common::ObBitSet<> &main_string_values);
common::ObBitSet<> &main_string_values,
uint64_t tenant_id);
~ObShuffleTaskHandle();
ObArenaAllocator allocator;
ObDesExecContext exec_ctx;