fix load data bug

This commit is contained in:
wjhh2008 2024-02-18 09:27:06 +00:00 committed by ob-robot
parent d934a2ebd3
commit 2f81e754dc
2 changed files with 24 additions and 14 deletions

View File

@ -932,15 +932,10 @@ ObShuffleTaskHandle::~ObShuffleTaskHandle()
}
}
int ObShuffleTaskHandle::expand_buf(const int64_t max_size)
int ObShuffleTaskHandle::expand_buf(const int64_t max_size, const int64_t to_buffer_size)
{
int ret = OB_SUCCESS;
int64_t new_size = 0;
if (OB_ISNULL(data_buffer)) {
new_size = ObLoadFileBuffer::MAX_BUFFER_SIZE;
} else {
new_size = (sizeof(ObLoadFileBuffer) + data_buffer->get_buffer_size()) * 2;
}
int64_t new_size = to_buffer_size;
if (new_size > max_size) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("buffer size not enough", K(ret));
@ -1379,12 +1374,16 @@ int ObLoadDataSPImpl::next_file_buffer(ObExecContext &ctx,
int64_t limit)
{
int ret = OB_SUCCESS;
bool has_valid_data = false;
bool has_valid_data = true;
CK (OB_NOT_NULL(handle) && OB_NOT_NULL(handle->data_buffer));
do {
if (OB_UNLIKELY(handle->data_buffer->get_struct_size() < box.data_trimer.get_buffer_size())) {
OZ (handle->expand_buf(box.batch_buffer_size, box.data_trimer.get_buffer_size()));
}
// Restore into the buffer the data left over from the previous read, as kept by data_trimer
OZ (box.data_trimer.recover_incomplate_data(*handle->data_buffer));
@ -1431,7 +1430,6 @@ int ObLoadDataSPImpl::next_file_buffer(ObExecContext &ctx,
}
}
} while (OB_SUCC(ret) && !has_valid_data && !box.read_cursor.is_end_file_
&& OB_SUCC(handle->expand_buf(box.batch_buffer_size))
&& OB_SUCC(box.data_trimer.expand_buf(ctx.get_allocator())));
return ret;
}
@ -2020,8 +2018,10 @@ int ObLoadFileDataTrimer::recover_incomplate_data(ObLoadFileBuffer &buffer)
{
int ret = OB_SUCCESS;
char *buf = NULL;
if (OB_ISNULL(buf = buffer.begin_ptr())) {
if (OB_ISNULL(buf = buffer.begin_ptr())
|| OB_UNLIKELY(buffer.get_buffer_size() < incomplate_data_len_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(buffer.get_buffer_size()));
} else if (incomplate_data_len_ > 0) {
MEMCPY(buf, incomplate_data_, incomplate_data_len_);
buffer.update_pos(incomplate_data_len_);
@ -2342,7 +2342,14 @@ int ObLoadFileDataTrimer::expand_buf(ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
int64_t new_buf_len = incomplate_data_buf_len_ * (NULL != incomplate_data_ ? 2 : 1);
int64_t new_buf_len = 0;
if (NULL == incomplate_data_) {
new_buf_len = ObLoadFileBuffer::MAX_BUFFER_SIZE;
} else {
new_buf_len = incomplate_data_buf_len_ * 2;
}
char *new_buf = NULL;
if (OB_ISNULL(new_buf = static_cast<char*>(allocator.alloc(new_buf_len)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -2949,7 +2956,7 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
data_frag_mgr, string_type_column_bitset, tenant_id))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc", K(ret));
} else if (OB_FAIL(temp_handle->expand_buf(batch_buffer_size))) {
} else if (OB_FAIL(temp_handle->expand_buf(batch_buffer_size, ObLoadFileBuffer::MAX_BUFFER_SIZE))) {
LOG_WARN("fail to expand buf", K(ret));
}
}
@ -2963,7 +2970,7 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc", K(ret));
} else {
if (OB_FAIL(handle->expand_buf(batch_buffer_size))) {
if (OB_FAIL(handle->expand_buf(batch_buffer_size, ObLoadFileBuffer::MAX_BUFFER_SIZE))) {
LOG_WARN("fail to expand buf", K(ret));
} else if (OB_FAIL(handle->exec_ctx.deserialize(exec_ctx_serialized_data.ptr(),
exec_ctx_serialized_data.length(), pos))) {
@ -3067,6 +3074,7 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
if (OB_SUCC(ret)) {
const int64_t fake_file_size = (file_size > 0) ? file_size : (2 << 30); // use 2G as default in load local mode
int64_t max_task_count = (fake_file_size / ObLoadFileBuffer::MAX_BUFFER_SIZE + 1) * 2;
file_buf_row_num.set_attr(ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA));
if (OB_FAIL(file_buf_row_num.reserve(max_task_count))) {
LOG_WARN("fail to reserve", K(ret));
}

View File

@ -350,6 +350,7 @@ public:
// Difference between the buffer size and the current position.
int64_t get_remain_len()
{
  return buffer_size_ - pos_;
}

// Current position, reported as the data length.
int64_t get_data_len()
{
  return pos_;
}

// Value of buffer_size_ as stored in this object.
int64_t get_buffer_size()
{
  return buffer_size_;
}

// sizeof(ObLoadFileBuffer) plus buffer_size_.
int64_t get_struct_size()
{
  return sizeof(ObLoadFileBuffer) + buffer_size_;
}

// Address of the position field, so a caller can read or advance it in place.
int64_t *get_pos()
{
  return &pos_;
}

// Rewinds the position to zero; buffer_size_ is left untouched.
void reset()
{
  pos_ = 0;
}
TO_STRING_KV(K_(pos), K_(buffer_size));
@ -399,6 +400,7 @@ public:
// Running line counter held in lines_cnt_.
int64_t get_lines_count() const
{
  return lines_cnt_;
}

// Adds line_cnt to the running line counter.
void commit_line_cnt(int64_t line_cnt)
{
  lines_cnt_ += line_cnt;
}

// Declared here, defined out of line; takes the allocator to use.
int expand_buf(common::ObIAllocator &allocator);

// Value of incomplate_data_buf_len_.
int64_t get_buffer_size() const
{
  return incomplate_data_buf_len_;
}
private:
ObCSVFormats formats_;
char *incomplate_data_;
@ -496,7 +498,7 @@ struct ObShuffleTaskHandle {
uint64_t tenant_id);
~ObShuffleTaskHandle();
int expand_buf(const int64_t max_size);
int expand_buf(const int64_t max_size, const int64_t to_buffer_size);
ObArenaAllocator allocator;
ObDesExecContext exec_ctx;