Fix load data bug for json type

This commit is contained in:
wjhh2008
2023-12-12 13:42:55 +00:00
committed by ob-robot
parent 29ff204ed8
commit 4235388eaf
5 changed files with 183 additions and 81 deletions

View File

@ -44,6 +44,7 @@
#include "storage/tx_storage/ob_tenant_freezer.h" #include "storage/tx_storage/ob_tenant_freezer.h"
#include "sql/rewrite/ob_transform_utils.h" #include "sql/rewrite/ob_transform_utils.h"
#include "observer/omt/ob_tenant_timezone_mgr.h" #include "observer/omt/ob_tenant_timezone_mgr.h"
#include "share/config/ob_config_helper.h"
using namespace oceanbase::sql; using namespace oceanbase::sql;
using namespace oceanbase::common; using namespace oceanbase::common;
@ -910,26 +911,15 @@ void ObCSVFormats::init(const ObDataInFileStruct &file_formats)
ObShuffleTaskHandle::ObShuffleTaskHandle(ObDataFragMgr &main_datafrag_mgr, ObShuffleTaskHandle::ObShuffleTaskHandle(ObDataFragMgr &main_datafrag_mgr,
ObBitSet<> &main_string_values, ObBitSet<> &main_string_values,
uint64_t tenant_id) uint64_t tenant_id)
: exec_ctx(allocator, GCTX.session_mgr_), : allocator(ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)),
exec_ctx(allocator, GCTX.session_mgr_),
data_buffer(NULL), data_buffer(NULL),
escape_buffer(NULL), escape_buffer(NULL),
calc_tablet_id_expr(NULL), calc_tablet_id_expr(NULL),
datafrag_mgr(main_datafrag_mgr), datafrag_mgr(main_datafrag_mgr),
string_values(main_string_values) string_values(main_string_values)
{ {
const int64_t FILE_BUFFER_SIZE = ObLoadFileBuffer::MAX_BUFFER_SIZE; attr = ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA);
ObMemAttr attr(tenant_id, ObModIds::OB_SQL_LOAD_DATA);
void *buf = NULL;
buf = ob_malloc(FILE_BUFFER_SIZE, attr);
if (OB_NOT_NULL(buf)) {
data_buffer = new(buf) ObLoadFileBuffer(
FILE_BUFFER_SIZE - sizeof(ObLoadFileBuffer));
}
buf = ob_malloc(FILE_BUFFER_SIZE, attr);
if (OB_NOT_NULL(buf)) {
escape_buffer = new(buf) ObLoadFileBuffer(
FILE_BUFFER_SIZE - sizeof(ObLoadFileBuffer));
}
} }
ObShuffleTaskHandle::~ObShuffleTaskHandle() ObShuffleTaskHandle::~ObShuffleTaskHandle()
@ -942,6 +932,41 @@ ObShuffleTaskHandle::~ObShuffleTaskHandle()
} }
} }
/**
 * Create or grow the data/escape buffer pair owned by this handle.
 *
 * On the first call (data_buffer is NULL) each buffer gets a block of
 * ObLoadFileBuffer::MAX_BUFFER_SIZE bytes. On later calls the block size is
 * twice the current one (header + payload). Both buffers are always replaced
 * together so they keep the same capacity. The content of the previous
 * buffers is NOT copied into the new ones.
 *
 * @param max_size  upper bound, in bytes, for the size of one buffer block
 *                  (ObLoadFileBuffer header included).
 * @return OB_SUCCESS on success;
 *         OB_SIZE_OVERFLOW if the next size would exceed max_size;
 *         OB_ALLOCATE_MEMORY_FAILED if either allocation fails.
 *         On any failure the existing buffers are left untouched.
 */
int ObShuffleTaskHandle::expand_buf(const int64_t max_size)
{
  int ret = OB_SUCCESS;
  int64_t new_size = 0;
  if (OB_ISNULL(data_buffer)) {
    new_size = ObLoadFileBuffer::MAX_BUFFER_SIZE;
  } else {
    new_size = (sizeof(ObLoadFileBuffer) + data_buffer->get_buffer_size()) * 2;
  }
  if (new_size > max_size) {
    ret = OB_SIZE_OVERFLOW;
    LOG_WARN("buffer size not enough", K(ret), K(new_size), K(max_size));
  } else {
    // Allocate both blocks before touching the current buffers, so a failure
    // leaves the handle in its previous, still usable state.
    void *buf1 = ob_malloc(new_size, attr);
    void *buf2 = OB_ISNULL(buf1) ? NULL : ob_malloc(new_size, attr);
    if (OB_ISNULL(buf1) || OB_ISNULL(buf2)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("fail to allocate load file buffer", K(ret), K(new_size));
      // The first allocation may have succeeded while the second failed;
      // release it here, otherwise it would be leaked.
      if (OB_NOT_NULL(buf1)) {
        ob_free(buf1);
        buf1 = NULL;
      }
    } else {
      if (OB_NOT_NULL(data_buffer)) {
        ob_free(data_buffer);
      }
      // The ObLoadFileBuffer header lives at the start of the block, so the
      // usable payload is the block size minus the header size.
      data_buffer = new(buf1) ObLoadFileBuffer(
          new_size - sizeof(ObLoadFileBuffer));
      if (OB_NOT_NULL(escape_buffer)) {
        ob_free(escape_buffer);
      }
      escape_buffer = new(buf2) ObLoadFileBuffer(
          new_size - sizeof(ObLoadFileBuffer));
    }
  }
  LOG_DEBUG("expand buf to", K(ret), K(new_size));
  return ret;
}
int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle) int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -982,6 +1007,7 @@ int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle)
return true; return true;
}; };
const int64_t buf_len = handle->data_buffer->get_buffer_size() + sizeof(ObLoadFileBuffer);
if (OB_ISNULL(handle) if (OB_ISNULL(handle)
|| OB_ISNULL(handle->data_buffer) || OB_ISNULL(handle->data_buffer)
|| OB_ISNULL(handle->escape_buffer) || OB_ISNULL(handle->escape_buffer)
@ -998,14 +1024,13 @@ int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle)
handle->generator.get_insert_exprs().count()))) { handle->generator.get_insert_exprs().count()))) {
LOG_WARN("fail to prealloc", K(ret), LOG_WARN("fail to prealloc", K(ret),
"insert values count", handle->generator.get_insert_exprs().count()); "insert values count", handle->generator.get_insert_exprs().count());
} else if (OB_ISNULL(expr_buf = ob_malloc(ObLoadFileBuffer::MAX_BUFFER_SIZE, } else if (OB_ISNULL(expr_buf = ob_malloc(handle->data_buffer->get_buffer_size() + sizeof(ObLoadFileBuffer),
ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)))) { ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("not enough memory", K(ret)); LOG_WARN("not enough memory", K(ret));
} else { } else {
handle->err_records.reuse(); handle->err_records.reuse();
expr_buffer = new(expr_buf) ObLoadFileBuffer(ObLoadFileBuffer::MAX_BUFFER_SIZE expr_buffer = new(expr_buf) ObLoadFileBuffer(handle->data_buffer->get_buffer_size());
- sizeof(ObLoadFileBuffer));
ObSEArray<ObCSVGeneralParser::LineErrRec, 1> err_records; ObSEArray<ObCSVGeneralParser::LineErrRec, 1> err_records;
ObSEArray<ObObj, 32> parse_result; ObSEArray<ObObj, 32> parse_result;
@ -1348,62 +1373,74 @@ int ObLoadDataSPImpl::handle_returned_shuffle_task(ToolBox &box, ObShuffleTaskHa
return ret; return ret;
} }
int ObLoadDataSPImpl::next_file_buffer(ToolBox &box, int ObLoadDataSPImpl::next_file_buffer(ObExecContext &ctx,
ObLoadFileBuffer &data_buffer, ToolBox &box,
ObShuffleTaskHandle *handle,
int64_t limit) int64_t limit)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool has_valid_data = false;
//从data_trimer中恢复出上次读取剩下的数据 CK (OB_NOT_NULL(handle) && OB_NOT_NULL(handle->data_buffer));
OZ (box.data_trimer.recover_incomplate_data(data_buffer));
if (ObLoadFileLocation::SERVER_DISK == box.load_file_storage) { do {
OZ (box.file_reader.pread(data_buffer.current_ptr(),
data_buffer.get_remain_len(), //从data_trimer中恢复出上次读取剩下的数据
box.read_cursor.file_offset_, OZ (box.data_trimer.recover_incomplate_data(*handle->data_buffer));
box.read_cursor.read_size_));
} else { if (ObLoadFileLocation::SERVER_DISK == box.load_file_storage) {
OZ (box.device_handle_->pread(box.fd_, box.read_cursor.file_offset_, OZ (box.file_reader.pread(handle->data_buffer->current_ptr(),
data_buffer.get_remain_len(), handle->data_buffer->get_remain_len(),
data_buffer.current_ptr(), box.read_cursor.file_offset_,
box.read_cursor.read_size_)); box.read_cursor.read_size_));
}
if (OB_SUCC(ret)) {
if (OB_UNLIKELY(0 == box.read_cursor.read_size_)) {
box.read_cursor.is_end_file_ = true;
LOG_DEBUG("LOAD DATA reach file end", K(box.read_cursor));
} else { } else {
data_buffer.update_pos(box.read_cursor.read_size_); //更新buffer中数据长度 OZ (box.device_handle_->pread(box.fd_, box.read_cursor.file_offset_,
int64_t last_proccessed_GBs = box.read_cursor.get_total_read_GBs(); handle->data_buffer->get_remain_len(),
box.read_cursor.commit_read(); handle->data_buffer->current_ptr(),
int64_t processed_GBs = box.read_cursor.get_total_read_GBs(); box.read_cursor.read_size_));
if (processed_GBs != last_proccessed_GBs) { }
LOG_INFO("LOAD DATA file read progress: ", K(processed_GBs));
if (OB_SUCC(ret)) {
if (OB_UNLIKELY(0 == box.read_cursor.read_size_)) {
box.read_cursor.is_end_file_ = true;
LOG_DEBUG("LOAD DATA reach file end", K(box.read_cursor));
} else {
handle->data_buffer->update_pos(box.read_cursor.read_size_); //更新buffer中数据长度
int64_t last_proccessed_GBs = box.read_cursor.get_total_read_GBs();
box.read_cursor.commit_read();
int64_t processed_GBs = box.read_cursor.get_total_read_GBs();
if (processed_GBs != last_proccessed_GBs) {
LOG_INFO("LOAD DATA file read progress: ", K(processed_GBs));
}
box.job_status->read_bytes_ += box.read_cursor.read_size_;
} }
box.job_status->read_bytes_ += data_buffer.get_data_len();
} }
}
//从buffer中找出完整的行,剩下的备份到 data_trimer //从buffer中找出完整的行,剩下的备份到 data_trimer
if (OB_SUCC(ret) && OB_LIKELY(data_buffer.is_valid())) { if (OB_SUCC(ret) && OB_LIKELY(handle->data_buffer->is_valid())) {
int64_t complete_cnt = limit; int64_t complete_cnt = limit;
int64_t complete_len = 0; int64_t complete_len = 0;
if (OB_FAIL(pre_parse_lines(data_buffer, box.parser, if (OB_FAIL(pre_parse_lines(*handle->data_buffer, box.parser,
box.read_cursor.is_end_file(), box.read_cursor.is_end_file(),
complete_len, complete_cnt))) { complete_len, complete_cnt))) {
LOG_WARN("fail to fast_lines_parse", K(ret)); LOG_WARN("fail to fast_lines_parse", K(ret));
} else if (OB_FAIL(box.data_trimer.backup_incomplate_data(data_buffer, } else if (OB_FAIL(box.data_trimer.backup_incomplate_data(*handle->data_buffer,
complete_len))) { complete_len))) {
LOG_WARN("fail to back up data", K(ret)); LOG_WARN("fail to back up data", K(ret));
} else { } else {
box.data_trimer.commit_line_cnt(complete_cnt); box.data_trimer.commit_line_cnt(complete_cnt);
LOG_DEBUG("LOAD DATA", has_valid_data = complete_cnt > 0;
"split offset", box.read_cursor.file_offset_ - box.data_trimer.get_incomplate_data_string().length(), LOG_DEBUG("LOAD DATA",
"incomplate data", box.data_trimer.get_incomplate_data_string()); "split offset", box.read_cursor.file_offset_ - box.data_trimer.get_incomplate_data_string().length(),
K(complete_len), K(complete_cnt),
"incomplate data length", box.data_trimer.get_incomplate_data_string().length(),
"incomplate data", box.data_trimer.get_incomplate_data_string());
}
} }
} } while (OB_SUCC(ret) && !has_valid_data && !box.read_cursor.is_end_file_
&& OB_SUCC(handle->expand_buf(box.batch_buffer_size))
&& OB_SUCC(box.data_trimer.expand_buf(ctx.get_allocator())));
return ret; return ret;
} }
@ -1448,7 +1485,7 @@ int ObLoadDataSPImpl::shuffle_task_gen_and_dispatch(ObExecContext &ctx, ToolBox
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (OB_FAIL(box.file_buf_row_num.push_back(box.data_trimer.get_lines_count()))) { if (OB_FAIL(box.file_buf_row_num.push_back(box.data_trimer.get_lines_count()))) {
LOG_WARN("fail to push back", K(ret)); LOG_WARN("fail to push back", K(ret));
} else if (OB_FAIL(next_file_buffer(box, *(handle->data_buffer)))) { } else if (OB_FAIL(next_file_buffer(ctx, box, handle))) {
LOG_WARN("fail get next file buffer", K(ret)); LOG_WARN("fail get next file buffer", K(ret));
} }
} }
@ -1928,8 +1965,7 @@ int ObLoadDataSPImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
while (OB_SUCC(ret) while (OB_SUCC(ret)
&& !box.read_cursor.is_end_file() && !box.read_cursor.is_end_file()
&& box.data_trimer.get_lines_count() < box.ignore_rows) { && box.data_trimer.get_lines_count() < box.ignore_rows) {
box.expr_buffer->reset(); OZ (next_file_buffer(ctx, box, box.temp_handle,
OZ (next_file_buffer(box, *box.expr_buffer,
box.ignore_rows - box.data_trimer.get_lines_count())); box.ignore_rows - box.data_trimer.get_lines_count()));
LOG_DEBUG("LOAD DATA ignore rows", K(box.ignore_rows), K(box.data_trimer.get_lines_count())); LOG_DEBUG("LOAD DATA ignore rows", K(box.ignore_rows), K(box.data_trimer.get_lines_count()));
} }
@ -2005,9 +2041,9 @@ int ObLoadFileDataTrimer::backup_incomplate_data(ObLoadFileBuffer &buffer, int64
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
incomplate_data_len_ = buffer.get_data_len() - valid_data_len; incomplate_data_len_ = buffer.get_data_len() - valid_data_len;
if (incomplate_data_len_ > ObLoadFileBuffer::MAX_BUFFER_SIZE) { if (incomplate_data_len_ > incomplate_data_buf_len_) {
ret = OB_SIZE_OVERFLOW; ret = OB_SIZE_OVERFLOW;
LOG_WARN("size over flow", K(ret), K(incomplate_data_len_)); LOG_WARN("size over flow", K(ret), K(incomplate_data_len_), K(incomplate_data_buf_len_));
} else if (incomplate_data_len_ > 0 && NULL != incomplate_data_) { } else if (incomplate_data_len_ > 0 && NULL != incomplate_data_) {
MEMCPY(incomplate_data_, buffer.begin_ptr() + valid_data_len, incomplate_data_len_); MEMCPY(incomplate_data_, buffer.begin_ptr() + valid_data_len, incomplate_data_len_);
buffer.update_pos(-incomplate_data_len_); buffer.update_pos(-incomplate_data_len_);
@ -2310,18 +2346,31 @@ int ObPartDataFragMgr::update_part_location(ObExecContext &ctx)
return ret; return ret;
} }
int ObLoadFileDataTrimer::init(ObIAllocator &allocator, const ObCSVFormats &formats) int ObLoadFileDataTrimer::expand_buf(ObIAllocator &allocator)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(incomplate_data_
= static_cast<char*>(allocator.alloc(ObLoadFileBuffer::MAX_BUFFER_SIZE)))) { int64_t new_buf_len = incomplate_data_buf_len_ * (NULL != incomplate_data_ ? 2 : 1);
char *new_buf = NULL;
if (OB_ISNULL(new_buf = static_cast<char*>(allocator.alloc(new_buf_len)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("no memory", K(ret)); LOG_WARN("no memory", K(ret));
} else {
if (NULL != incomplate_data_) {
MEMCPY(new_buf, incomplate_data_, incomplate_data_len_);
}
incomplate_data_ = new_buf;
incomplate_data_buf_len_ = new_buf_len;
} }
formats_ = formats;
return ret; return ret;
} }
/**
 * Initialize the trimer: remember the CSV formats, then let expand_buf()
 * allocate the buffer used to hold incomplete data.
 *
 * @param allocator  allocator handed to expand_buf() for the buffer.
 * @param formats    CSV format description copied into this object.
 * @return the result code of expand_buf().
 */
int ObLoadFileDataTrimer::init(ObIAllocator &allocator, const ObCSVFormats &formats)
{
  formats_ = formats;
  const int ret = expand_buf(allocator);
  return ret;
}
int ObLoadDataSPImpl::ToolBox::release_resources() int ObLoadDataSPImpl::ToolBox::release_resources()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -2424,6 +2473,10 @@ int ObLoadDataSPImpl::ToolBox::release_resources()
common::ObDeviceManager::get_instance().release_device(device_handle_); common::ObDeviceManager::get_instance().release_device(device_handle_);
} }
if (OB_NOT_NULL(temp_handle)) {
temp_handle->~ObShuffleTaskHandle();
}
return ret; return ret;
} }
@ -2825,6 +2878,8 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
int64_t hint_batch_size = 0; int64_t hint_batch_size = 0;
int64_t hint_max_batch_buffer_size = 0;
ObString hint_batch_buffer_size_str;
if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_SIZE, hint_batch_size))) { if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_SIZE, hint_batch_size))) {
LOG_WARN("fail to get value", K(ret)); LOG_WARN("fail to get value", K(ret));
} else if (0 == hint_batch_size) { } else if (0 == hint_batch_size) {
@ -2832,7 +2887,22 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
} else { } else {
batch_row_count = std::max(1L, std::min(DEFAULT_BUFFERRED_ROW_COUNT, hint_batch_size)); batch_row_count = std::max(1L, std::min(DEFAULT_BUFFERRED_ROW_COUNT, hint_batch_size));
} }
LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count)); if (OB_SUCC(ret)) {
if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_BUFFER_SIZE, hint_batch_buffer_size_str))) {
LOG_WARN("fail to get value", K(ret));
} else {
bool is_valid = false;
hint_batch_buffer_size_str = hint_batch_buffer_size_str.trim();
if (!hint_batch_buffer_size_str.empty()) {
hint_max_batch_buffer_size = ObConfigCapacityParser::get(to_cstring(hint_batch_buffer_size_str), is_valid);
}
if (!is_valid) {
hint_max_batch_buffer_size = 1L << 30; // 1G
}
batch_buffer_size = MAX(ObLoadFileBuffer::MAX_BUFFER_SIZE, hint_max_batch_buffer_size);
}
}
LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count), K(batch_buffer_size));
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
@ -2882,18 +2952,31 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
} }
*/ */
if (OB_SUCC(ret)) {
if (OB_ISNULL(temp_handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()),
data_frag_mgr, string_type_column_bitset, tenant_id))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc", K(ret));
} else if (OB_FAIL(temp_handle->expand_buf(batch_buffer_size))) {
LOG_WARN("fail to expand buf", K(ret));
}
}
for (int i = 0; OB_SUCC(ret) && i < shuffle_task_controller.get_max_parallelism(); ++i) { for (int i = 0; OB_SUCC(ret) && i < shuffle_task_controller.get_max_parallelism(); ++i) {
ObShuffleTaskHandle *handle = nullptr; ObShuffleTaskHandle *handle = nullptr;
int64_t pos = 0; int64_t pos = 0;
if (OB_ISNULL(handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()), if (OB_ISNULL(handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()),
data_frag_mgr, string_type_column_bitset, tenant_id)) data_frag_mgr, string_type_column_bitset, tenant_id))) {
|| OB_ISNULL(handle->data_buffer)) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc", K(ret)); LOG_WARN("Failed to alloc", K(ret));
} else { } else {
if (OB_FAIL(handle->exec_ctx.deserialize(exec_ctx_serialized_data.ptr(), if (OB_FAIL(handle->expand_buf(batch_buffer_size))) {
exec_ctx_serialized_data.length(), pos))) { LOG_WARN("fail to expand buf", K(ret));
} else if (OB_FAIL(handle->exec_ctx.deserialize(exec_ctx_serialized_data.ptr(),
exec_ctx_serialized_data.length(), pos))) {
LOG_WARN("fail to deserialize", K(ret)); LOG_WARN("fail to deserialize", K(ret));
} else if (OB_FAIL(handle->parser.init(file_formats, num_of_file_column, load_args.file_cs_type_))) { } else if (OB_FAIL(handle->parser.init(file_formats, num_of_file_column, load_args.file_cs_type_))) {
LOG_WARN("fail to init parser", K(ret)); LOG_WARN("fail to init parser", K(ret));

View File

@ -384,7 +384,8 @@ struct ObCSVFormats {
class ObLoadFileDataTrimer class ObLoadFileDataTrimer
{ {
public: public:
ObLoadFileDataTrimer() : incomplate_data_(NULL), incomplate_data_len_(0), lines_cnt_(0) {} ObLoadFileDataTrimer() : incomplate_data_(NULL), incomplate_data_len_(0),
incomplate_data_buf_len_(ObLoadFileBuffer::MAX_BUFFER_SIZE), lines_cnt_(0) {}
int init(common::ObIAllocator &allocator, const ObCSVFormats &formats); int init(common::ObIAllocator &allocator, const ObCSVFormats &formats);
//for debug //for debug
ObString get_incomplate_data_string() { ObString get_incomplate_data_string() {
@ -396,10 +397,12 @@ public:
bool has_incomplate_data() const { return incomplate_data_len_ > 0; } bool has_incomplate_data() const { return incomplate_data_len_ > 0; }
int64_t get_lines_count() const { return lines_cnt_; } int64_t get_lines_count() const { return lines_cnt_; }
void commit_line_cnt(int64_t line_cnt) { lines_cnt_ += line_cnt; } void commit_line_cnt(int64_t line_cnt) { lines_cnt_ += line_cnt; }
int expand_buf(common::ObIAllocator &allocator);
private: private:
ObCSVFormats formats_;//TODO [load data] change to ObInverseParser(formats) ObCSVFormats formats_;
char *incomplate_data_; char *incomplate_data_;
int64_t incomplate_data_len_; int64_t incomplate_data_len_;
int64_t incomplate_data_buf_len_;
int64_t lines_cnt_; int64_t lines_cnt_;
}; };
@ -491,6 +494,9 @@ struct ObShuffleTaskHandle {
common::ObBitSet<> &main_string_values, common::ObBitSet<> &main_string_values,
uint64_t tenant_id); uint64_t tenant_id);
~ObShuffleTaskHandle(); ~ObShuffleTaskHandle();
int expand_buf(const int64_t max_size);
ObArenaAllocator allocator; ObArenaAllocator allocator;
ObDesExecContext exec_ctx; ObDesExecContext exec_ctx;
ObLoadFileBuffer *data_buffer; ObLoadFileBuffer *data_buffer;
@ -505,6 +511,7 @@ struct ObShuffleTaskHandle {
common::ObBitSet<> &string_values; common::ObBitSet<> &string_values;
ObShuffleResult result; ObShuffleResult result;
ObSEArray<ObParserErrRec, 16> err_records; ObSEArray<ObParserErrRec, 16> err_records;
common::ObMemAttr attr;
TO_STRING_KV("task_id", result.task_id_); TO_STRING_KV("task_id", result.task_id_);
}; };
@ -711,6 +718,7 @@ public:
int64_t num_of_table_column; int64_t num_of_table_column;
int64_t parallel; int64_t parallel;
int64_t batch_row_count; int64_t batch_row_count;
int64_t batch_buffer_size;
int64_t data_frag_mem_usage_limit; //limit = data_frag_mem_usage_limit * MAX_BUFFER_SIZE int64_t data_frag_mem_usage_limit; //limit = data_frag_mem_usage_limit * MAX_BUFFER_SIZE
int64_t file_size; int64_t file_size;
int64_t ignore_rows; int64_t ignore_rows;
@ -737,6 +745,7 @@ public:
common::ObSEArray<int64_t, 1> file_buf_row_num; common::ObSEArray<int64_t, 1> file_buf_row_num;
int64_t last_session_check_ts; int64_t last_session_check_ts;
common::ObSEArray<ObLoadTableColumnDesc, 16> insert_infos; common::ObSEArray<ObLoadTableColumnDesc, 16> insert_infos;
ObShuffleTaskHandle *temp_handle;
//debug values //debug values
int64_t insert_dispatch_rows; int64_t insert_dispatch_rows;
@ -759,7 +768,7 @@ public:
int execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt); int execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt);
int shuffle_task_gen_and_dispatch(ObExecContext &ctx, ToolBox &box); int shuffle_task_gen_and_dispatch(ObExecContext &ctx, ToolBox &box);
int next_file_buffer(ToolBox &box, ObLoadFileBuffer &data_buffer, int64_t limit = INT64_MAX); int next_file_buffer(ObExecContext &ctx, ToolBox &box, ObShuffleTaskHandle *handle, int64_t limit = INT64_MAX);
int handle_returned_shuffle_task(ToolBox &box, ObShuffleTaskHandle &handle); int handle_returned_shuffle_task(ToolBox &box, ObShuffleTaskHandle &handle);
int wait_shuffle_task_return(ToolBox &box); int wait_shuffle_task_return(ToolBox &box);

View File

@ -9325,7 +9325,11 @@ READ_CONSISTENCY '(' consistency_level ')'
} }
| LOAD_BATCH_SIZE '(' INTNUM ')' | LOAD_BATCH_SIZE '(' INTNUM ')'
{ {
malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 1, $3); malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 2, $3, NULL);
}
| LOAD_BATCH_SIZE '(' INTNUM ',' STRING_VALUE ')'
{
malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 2, $3, $5);
} }
| DIRECT '(' BOOL_VALUE ',' INTNUM ')' | DIRECT '(' BOOL_VALUE ',' INTNUM ')'
{ {

View File

@ -501,7 +501,7 @@ int ObLoadDataResolver::resolve_hints(const ParseNode &node)
break; break;
} }
case T_LOAD_BATCH_SIZE: { case T_LOAD_BATCH_SIZE: {
if (1 != hint_node->num_child_) { if (2 != hint_node->num_child_) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("max concurrent node should have 1 child", K(ret)); LOG_WARN("max concurrent node should have 1 child", K(ret));
} else if (OB_ISNULL(hint_node->children_[0])) { } else if (OB_ISNULL(hint_node->children_[0])) {
@ -510,6 +510,11 @@ int ObLoadDataResolver::resolve_hints(const ParseNode &node)
} else if (OB_FAIL(stmt_hints.set_value( } else if (OB_FAIL(stmt_hints.set_value(
ObLoadDataHint::BATCH_SIZE, hint_node->children_[0]->value_))) { ObLoadDataHint::BATCH_SIZE, hint_node->children_[0]->value_))) {
LOG_WARN("fail to set concurrent value", K(ret)); LOG_WARN("fail to set concurrent value", K(ret));
} else if (OB_NOT_NULL(hint_node->children_[1])
&& OB_FAIL(stmt_hints.set_value(ObLoadDataHint::BATCH_BUFFER_SIZE,
ObString(hint_node->children_[1]->str_len_,
hint_node->children_[1]->str_value_)))) {
LOG_WARN("fail to set concurrent value", K(ret));
} }
break; break;
} }

View File

@ -190,6 +190,7 @@ public:
}; };
enum StringHintItem { enum StringHintItem {
LOG_LEVEL, LOG_LEVEL,
BATCH_BUFFER_SIZE,
TOTAL_STRING_ITEM TOTAL_STRING_ITEM
}; };
void reset() void reset()