diff --git a/src/sql/engine/cmd/ob_load_data_impl.cpp b/src/sql/engine/cmd/ob_load_data_impl.cpp index 052a31e098..73879841c6 100644 --- a/src/sql/engine/cmd/ob_load_data_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_impl.cpp @@ -44,6 +44,7 @@ #include "storage/tx_storage/ob_tenant_freezer.h" #include "sql/rewrite/ob_transform_utils.h" #include "observer/omt/ob_tenant_timezone_mgr.h" +#include "share/config/ob_config_helper.h" using namespace oceanbase::sql; using namespace oceanbase::common; @@ -910,26 +911,15 @@ void ObCSVFormats::init(const ObDataInFileStruct &file_formats) ObShuffleTaskHandle::ObShuffleTaskHandle(ObDataFragMgr &main_datafrag_mgr, ObBitSet<> &main_string_values, uint64_t tenant_id) - : exec_ctx(allocator, GCTX.session_mgr_), + : allocator(ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)), + exec_ctx(allocator, GCTX.session_mgr_), data_buffer(NULL), escape_buffer(NULL), calc_tablet_id_expr(NULL), datafrag_mgr(main_datafrag_mgr), string_values(main_string_values) { - const int64_t FILE_BUFFER_SIZE = ObLoadFileBuffer::MAX_BUFFER_SIZE; - ObMemAttr attr(tenant_id, ObModIds::OB_SQL_LOAD_DATA); - void *buf = NULL; - buf = ob_malloc(FILE_BUFFER_SIZE, attr); - if (OB_NOT_NULL(buf)) { - data_buffer = new(buf) ObLoadFileBuffer( - FILE_BUFFER_SIZE - sizeof(ObLoadFileBuffer)); - } - buf = ob_malloc(FILE_BUFFER_SIZE, attr); - if (OB_NOT_NULL(buf)) { - escape_buffer = new(buf) ObLoadFileBuffer( - FILE_BUFFER_SIZE - sizeof(ObLoadFileBuffer)); - } + attr = ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA); } ObShuffleTaskHandle::~ObShuffleTaskHandle() @@ -942,6 +932,41 @@ ObShuffleTaskHandle::~ObShuffleTaskHandle() } } +int ObShuffleTaskHandle::expand_buf(const int64_t max_size) +{ + int ret = OB_SUCCESS; + int64_t new_size = 0; + if (OB_ISNULL(data_buffer)) { + new_size = ObLoadFileBuffer::MAX_BUFFER_SIZE; + } else { + new_size = (sizeof(ObLoadFileBuffer) + data_buffer->get_buffer_size()) * 2; + } + if (new_size > max_size) { + ret = OB_SIZE_OVERFLOW; + LOG_WARN("buffer size not enough", K(ret)); + } else { + void *buf1 = NULL; + void *buf2 = NULL; + if (OB_ISNULL(buf1 = ob_malloc(new_size, attr)) + || OB_ISNULL(buf2 = ob_malloc(new_size, attr))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + } else { + if (OB_NOT_NULL(data_buffer)) { + ob_free(data_buffer); + } + data_buffer = new(buf1) ObLoadFileBuffer( + new_size - sizeof(ObLoadFileBuffer)); + if (OB_NOT_NULL(escape_buffer)) { + ob_free(escape_buffer); + } + escape_buffer = new(buf2) ObLoadFileBuffer( + new_size - sizeof(ObLoadFileBuffer)); + } + } + LOG_DEBUG("expand buf to", K(new_size)); + return ret; +} + int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle) { int ret = OB_SUCCESS; @@ -982,6 +1007,7 @@ int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle) return true; }; + const int64_t buf_len = handle->data_buffer->get_buffer_size() + sizeof(ObLoadFileBuffer); if (OB_ISNULL(handle) || OB_ISNULL(handle->data_buffer) || OB_ISNULL(handle->escape_buffer) @@ -998,14 +1024,13 @@ int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle) handle->generator.get_insert_exprs().count()))) { LOG_WARN("fail to prealloc", K(ret), "insert values count", handle->generator.get_insert_exprs().count()); - } else if (OB_ISNULL(expr_buf = ob_malloc(ObLoadFileBuffer::MAX_BUFFER_SIZE, + } else if (OB_ISNULL(expr_buf = ob_malloc(handle->data_buffer->get_buffer_size() + sizeof(ObLoadFileBuffer), ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("not enough memory", K(ret)); } else { handle->err_records.reuse(); - expr_buffer = new(expr_buf) ObLoadFileBuffer(ObLoadFileBuffer::MAX_BUFFER_SIZE - - sizeof(ObLoadFileBuffer)); + expr_buffer = new(expr_buf) ObLoadFileBuffer(handle->data_buffer->get_buffer_size()); ObSEArray err_records; ObSEArray parse_result; @@ -1348,62 +1373,74 @@ int ObLoadDataSPImpl::handle_returned_shuffle_task(ToolBox &box, ObShuffleTaskHa return ret; } -int ObLoadDataSPImpl::next_file_buffer(ToolBox &box, - ObLoadFileBuffer &data_buffer, +int ObLoadDataSPImpl::next_file_buffer(ObExecContext &ctx, + ToolBox &box, + ObShuffleTaskHandle *handle, int64_t limit) { int ret = OB_SUCCESS; + bool has_valid_data = false; - //从data_trimer中恢复出上次读取剩下的数据 - OZ (box.data_trimer.recover_incomplate_data(data_buffer)); + CK (OB_NOT_NULL(handle) && OB_NOT_NULL(handle->data_buffer)); - if (ObLoadFileLocation::SERVER_DISK == box.load_file_storage) { - OZ (box.file_reader.pread(data_buffer.current_ptr(), - data_buffer.get_remain_len(), - box.read_cursor.file_offset_, - box.read_cursor.read_size_)); - } else { - OZ (box.device_handle_->pread(box.fd_, box.read_cursor.file_offset_, - data_buffer.get_remain_len(), - data_buffer.current_ptr(), + do { + + //从data_trimer中恢复出上次读取剩下的数据 + OZ (box.data_trimer.recover_incomplate_data(*handle->data_buffer)); + + if (ObLoadFileLocation::SERVER_DISK == box.load_file_storage) { + OZ (box.file_reader.pread(handle->data_buffer->current_ptr(), + handle->data_buffer->get_remain_len(), + box.read_cursor.file_offset_, box.read_cursor.read_size_)); - } - - if (OB_SUCC(ret)) { - if (OB_UNLIKELY(0 == box.read_cursor.read_size_)) { - box.read_cursor.is_end_file_ = true; - LOG_DEBUG("LOAD DATA reach file end", K(box.read_cursor)); } else { - data_buffer.update_pos(box.read_cursor.read_size_); //更新buffer中数据长度 - int64_t last_proccessed_GBs = box.read_cursor.get_total_read_GBs(); - box.read_cursor.commit_read(); - int64_t processed_GBs = box.read_cursor.get_total_read_GBs(); - if (processed_GBs != last_proccessed_GBs) { - LOG_INFO("LOAD DATA file read progress: ", K(processed_GBs)); + OZ (box.device_handle_->pread(box.fd_, box.read_cursor.file_offset_, + handle->data_buffer->get_remain_len(), + handle->data_buffer->current_ptr(), + box.read_cursor.read_size_)); + } + + if (OB_SUCC(ret)) { + if (OB_UNLIKELY(0 == box.read_cursor.read_size_)) { + box.read_cursor.is_end_file_ = true; + LOG_DEBUG("LOAD DATA reach file end", K(box.read_cursor)); + } else { + handle->data_buffer->update_pos(box.read_cursor.read_size_); //更新buffer中数据长度 + int64_t last_proccessed_GBs = box.read_cursor.get_total_read_GBs(); + box.read_cursor.commit_read(); + int64_t processed_GBs = box.read_cursor.get_total_read_GBs(); + if (processed_GBs != last_proccessed_GBs) { + LOG_INFO("LOAD DATA file read progress: ", K(processed_GBs)); + } + + box.job_status->read_bytes_ += box.read_cursor.read_size_; } - - box.job_status->read_bytes_ += data_buffer.get_data_len(); } - } - //从buffer中找出完整的行,剩下的备份到 data_trimer - if (OB_SUCC(ret) && OB_LIKELY(data_buffer.is_valid())) { - int64_t complete_cnt = limit; - int64_t complete_len = 0; - if (OB_FAIL(pre_parse_lines(data_buffer, box.parser, - box.read_cursor.is_end_file(), - complete_len, complete_cnt))) { - LOG_WARN("fail to fast_lines_parse", K(ret)); - } else if (OB_FAIL(box.data_trimer.backup_incomplate_data(data_buffer, - complete_len))) { - LOG_WARN("fail to back up data", K(ret)); - } else { - box.data_trimer.commit_line_cnt(complete_cnt); - LOG_DEBUG("LOAD DATA", - "split offset", box.read_cursor.file_offset_ - box.data_trimer.get_incomplate_data_string().length(), - "incomplate data", box.data_trimer.get_incomplate_data_string()); + //从buffer中找出完整的行,剩下的备份到 data_trimer + if (OB_SUCC(ret) && OB_LIKELY(handle->data_buffer->is_valid())) { + int64_t complete_cnt = limit; + int64_t complete_len = 0; + if (OB_FAIL(pre_parse_lines(*handle->data_buffer, box.parser, + box.read_cursor.is_end_file(), + complete_len, complete_cnt))) { + LOG_WARN("fail to fast_lines_parse", K(ret)); + } else if (OB_FAIL(box.data_trimer.backup_incomplate_data(*handle->data_buffer, + complete_len))) { + LOG_WARN("fail to back up data", K(ret)); + } else { + box.data_trimer.commit_line_cnt(complete_cnt); + has_valid_data = complete_cnt > 0; + LOG_DEBUG("LOAD DATA", + "split offset", box.read_cursor.file_offset_ - box.data_trimer.get_incomplate_data_string().length(), + K(complete_len), K(complete_cnt), + "incomplate data length", box.data_trimer.get_incomplate_data_string().length(), + "incomplate data", box.data_trimer.get_incomplate_data_string()); + } } - } + } while (OB_SUCC(ret) && !has_valid_data && !box.read_cursor.is_end_file_ + && OB_SUCC(handle->expand_buf(box.batch_buffer_size)) + && OB_SUCC(box.data_trimer.expand_buf(ctx.get_allocator()))); return ret; } @@ -1448,7 +1485,7 @@ int ObLoadDataSPImpl::shuffle_task_gen_and_dispatch(ObExecContext &ctx, ToolBox if (OB_SUCC(ret)) { if (OB_FAIL(box.file_buf_row_num.push_back(box.data_trimer.get_lines_count()))) { LOG_WARN("fail to push back", K(ret)); - } else if (OB_FAIL(next_file_buffer(box, *(handle->data_buffer)))) { + } else if (OB_FAIL(next_file_buffer(ctx, box, handle))) { LOG_WARN("fail get next file buffer", K(ret)); } } @@ -1928,8 +1965,7 @@ int ObLoadDataSPImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) while (OB_SUCC(ret) && !box.read_cursor.is_end_file() && box.data_trimer.get_lines_count() < box.ignore_rows) { - box.expr_buffer->reset(); - OZ (next_file_buffer(box, *box.expr_buffer, + OZ (next_file_buffer(ctx, box, box.temp_handle, box.ignore_rows - box.data_trimer.get_lines_count())); LOG_DEBUG("LOAD DATA ignore rows", K(box.ignore_rows), K(box.data_trimer.get_lines_count())); } @@ -2005,9 +2041,9 @@ int ObLoadFileDataTrimer::backup_incomplate_data(ObLoadFileBuffer &buffer, int64 { int ret = OB_SUCCESS; incomplate_data_len_ = buffer.get_data_len() - valid_data_len; - if (incomplate_data_len_ > ObLoadFileBuffer::MAX_BUFFER_SIZE) { + if (incomplate_data_len_ > incomplate_data_buf_len_) { ret = OB_SIZE_OVERFLOW; - LOG_WARN("size over flow", K(ret), K(incomplate_data_len_)); + LOG_WARN("size over flow", K(ret), K(incomplate_data_len_), K(incomplate_data_buf_len_)); } else if (incomplate_data_len_ > 0 && NULL != incomplate_data_) { MEMCPY(incomplate_data_, buffer.begin_ptr() + valid_data_len, incomplate_data_len_); buffer.update_pos(-incomplate_data_len_); @@ -2310,18 +2346,31 @@ int ObPartDataFragMgr::update_part_location(ObExecContext &ctx) return ret; } -int ObLoadFileDataTrimer::init(ObIAllocator &allocator, const ObCSVFormats &formats) +int ObLoadFileDataTrimer::expand_buf(ObIAllocator &allocator) { int ret = OB_SUCCESS; - if (OB_ISNULL(incomplate_data_ - = static_cast(allocator.alloc(ObLoadFileBuffer::MAX_BUFFER_SIZE)))) { + + int64_t new_buf_len = incomplate_data_buf_len_ * (NULL != incomplate_data_ ? 2 : 1); + char *new_buf = NULL; + if (OB_ISNULL(new_buf = static_cast(allocator.alloc(new_buf_len)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("no memory", K(ret)); + } else { + if (NULL != incomplate_data_) { + MEMCPY(new_buf, incomplate_data_, incomplate_data_len_); + } + incomplate_data_ = new_buf; + incomplate_data_buf_len_ = new_buf_len; } - formats_ = formats; return ret; } +int ObLoadFileDataTrimer::init(ObIAllocator &allocator, const ObCSVFormats &formats) +{ + formats_ = formats; + return expand_buf(allocator); +} + int ObLoadDataSPImpl::ToolBox::release_resources() { int ret = OB_SUCCESS; @@ -2424,6 +2473,10 @@ int ObLoadDataSPImpl::ToolBox::release_resources() common::ObDeviceManager::get_instance().release_device(device_handle_); } + if (OB_NOT_NULL(temp_handle)) { + temp_handle->~ObShuffleTaskHandle(); + } + return ret; } @@ -2825,6 +2878,8 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm if (OB_SUCC(ret)) { int64_t hint_batch_size = 0; + int64_t hint_max_batch_buffer_size = 0; + ObString hint_batch_buffer_size_str; if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_SIZE, hint_batch_size))) { LOG_WARN("fail to get value", K(ret)); } else if (0 == hint_batch_size) { @@ -2832,7 +2887,22 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm } else { batch_row_count = std::max(1L, std::min(DEFAULT_BUFFERRED_ROW_COUNT, hint_batch_size)); } - LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count)); + if (OB_SUCC(ret)) { + if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_BUFFER_SIZE, hint_batch_buffer_size_str))) { + LOG_WARN("fail to get value", K(ret)); + } else { + bool is_valid = false; + hint_batch_buffer_size_str = hint_batch_buffer_size_str.trim(); + if (!hint_batch_buffer_size_str.empty()) { + hint_max_batch_buffer_size = ObConfigCapacityParser::get(to_cstring(hint_batch_buffer_size_str), is_valid); + } + if (!is_valid) { + hint_max_batch_buffer_size = 1L << 30; // 1G + } + batch_buffer_size = MAX(ObLoadFileBuffer::MAX_BUFFER_SIZE, hint_max_batch_buffer_size); + } + } + LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count), K(batch_buffer_size)); } if (OB_SUCC(ret)) { @@ -2882,18 +2952,31 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm } */ + + + if (OB_SUCC(ret)) { + if (OB_ISNULL(temp_handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()), + data_frag_mgr, string_type_column_bitset, tenant_id))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("Failed to alloc", K(ret)); + } else if (OB_FAIL(temp_handle->expand_buf(batch_buffer_size))) { + LOG_WARN("fail to expand buf", K(ret)); + } + } + for (int i = 0; OB_SUCC(ret) && i < shuffle_task_controller.get_max_parallelism(); ++i) { ObShuffleTaskHandle *handle = nullptr; int64_t pos = 0; if (OB_ISNULL(handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()), - data_frag_mgr, string_type_column_bitset, tenant_id)) - || OB_ISNULL(handle->data_buffer)) { + data_frag_mgr, string_type_column_bitset, tenant_id))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Failed to alloc", K(ret)); } else { - if (OB_FAIL(handle->exec_ctx.deserialize(exec_ctx_serialized_data.ptr(), - exec_ctx_serialized_data.length(), pos))) { + if (OB_FAIL(handle->expand_buf(batch_buffer_size))) { + LOG_WARN("fail to expand buf", K(ret)); + } else if (OB_FAIL(handle->exec_ctx.deserialize(exec_ctx_serialized_data.ptr(), + exec_ctx_serialized_data.length(), pos))) { LOG_WARN("fail to deserialize", K(ret)); } else if (OB_FAIL(handle->parser.init(file_formats, num_of_file_column, load_args.file_cs_type_))) { LOG_WARN("fail to init parser", K(ret)); diff --git a/src/sql/engine/cmd/ob_load_data_impl.h b/src/sql/engine/cmd/ob_load_data_impl.h index b6f5081496..e872ec3025 100644 --- a/src/sql/engine/cmd/ob_load_data_impl.h +++ b/src/sql/engine/cmd/ob_load_data_impl.h @@ -384,7 +384,8 @@ struct ObCSVFormats { class ObLoadFileDataTrimer { public: - ObLoadFileDataTrimer() : incomplate_data_(NULL), incomplate_data_len_(0), lines_cnt_(0) {} + ObLoadFileDataTrimer() : incomplate_data_(NULL), incomplate_data_len_(0), + incomplate_data_buf_len_(ObLoadFileBuffer::MAX_BUFFER_SIZE), lines_cnt_(0) {} int init(common::ObIAllocator &allocator, const ObCSVFormats &formats); //for debug ObString get_incomplate_data_string() { @@ -396,10 +397,12 @@ public: bool has_incomplate_data() const { return incomplate_data_len_ > 0; } int64_t get_lines_count() const { return lines_cnt_; } void commit_line_cnt(int64_t line_cnt) { lines_cnt_ += line_cnt; } + int expand_buf(common::ObIAllocator &allocator); private: - ObCSVFormats formats_;//TODO [load data] change to ObInverseParser(formats) + ObCSVFormats formats_; char *incomplate_data_; int64_t incomplate_data_len_; + int64_t incomplate_data_buf_len_; int64_t lines_cnt_; }; @@ -491,6 +494,9 @@ struct ObShuffleTaskHandle { common::ObBitSet<> &main_string_values, uint64_t tenant_id); ~ObShuffleTaskHandle(); + + int expand_buf(const int64_t max_size); + ObArenaAllocator allocator; ObDesExecContext exec_ctx; ObLoadFileBuffer *data_buffer; @@ -505,6 +511,7 @@ struct ObShuffleTaskHandle { common::ObBitSet<> &string_values; ObShuffleResult result; ObSEArray err_records; + common::ObMemAttr attr; TO_STRING_KV("task_id", result.task_id_); }; @@ -711,6 +718,7 @@ public: int64_t num_of_table_column; int64_t parallel; int64_t batch_row_count; + int64_t batch_buffer_size; int64_t data_frag_mem_usage_limit; //limit = data_frag_mem_usage_limit * MAX_BUFFER_SIZE int64_t file_size; int64_t ignore_rows; @@ -737,6 +745,7 @@ public: common::ObSEArray file_buf_row_num; int64_t last_session_check_ts; common::ObSEArray insert_infos; + ObShuffleTaskHandle *temp_handle; //debug values int64_t insert_dispatch_rows; @@ -759,7 +768,7 @@ public: int execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt); int shuffle_task_gen_and_dispatch(ObExecContext &ctx, ToolBox &box); - int next_file_buffer(ToolBox &box, ObLoadFileBuffer &data_buffer, int64_t limit = INT64_MAX); + int next_file_buffer(ObExecContext &ctx, ToolBox &box, ObShuffleTaskHandle *handle, int64_t limit = INT64_MAX); int handle_returned_shuffle_task(ToolBox &box, ObShuffleTaskHandle &handle); int wait_shuffle_task_return(ToolBox &box); diff --git a/src/sql/parser/sql_parser_mysql_mode.y b/src/sql/parser/sql_parser_mysql_mode.y index 84d8780008..e8faf4fa5a 100644 --- a/src/sql/parser/sql_parser_mysql_mode.y +++ b/src/sql/parser/sql_parser_mysql_mode.y @@ -9325,7 +9325,11 @@ READ_CONSISTENCY '(' consistency_level ')' } | LOAD_BATCH_SIZE '(' INTNUM ')' { - malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 1, $3); + malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 2, $3, NULL); +} +| LOAD_BATCH_SIZE '(' INTNUM ',' STRING_VALUE ')' +{ + malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 2, $3, $5); } | DIRECT '(' BOOL_VALUE ',' INTNUM ')' { diff --git a/src/sql/resolver/cmd/ob_load_data_resolver.cpp b/src/sql/resolver/cmd/ob_load_data_resolver.cpp index 4ac39ca337..cc0353bb18 100644 --- a/src/sql/resolver/cmd/ob_load_data_resolver.cpp +++ b/src/sql/resolver/cmd/ob_load_data_resolver.cpp @@ -501,7 +501,7 @@ int ObLoadDataResolver::resolve_hints(const ParseNode &node) break; } case T_LOAD_BATCH_SIZE: { - if (1 != hint_node->num_child_) { + if (2 != hint_node->num_child_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("max concurrent node should have 1 child", K(ret)); } else if (OB_ISNULL(hint_node->children_[0])) { @@ -510,6 +510,11 @@ int ObLoadDataResolver::resolve_hints(const ParseNode &node) } else if (OB_FAIL(stmt_hints.set_value( ObLoadDataHint::BATCH_SIZE, hint_node->children_[0]->value_))) { LOG_WARN("fail to set concurrent value", K(ret)); + } else if (OB_NOT_NULL(hint_node->children_[1]) + && OB_FAIL(stmt_hints.set_value(ObLoadDataHint::BATCH_BUFFER_SIZE, + ObString(hint_node->children_[1]->str_len_, + hint_node->children_[1]->str_value_)))) { + LOG_WARN("fail to set concurrent value", K(ret)); } break; } diff --git a/src/sql/resolver/cmd/ob_load_data_stmt.h b/src/sql/resolver/cmd/ob_load_data_stmt.h index 680ec6c236..eb3b82e33d 100644 --- a/src/sql/resolver/cmd/ob_load_data_stmt.h +++ b/src/sql/resolver/cmd/ob_load_data_stmt.h @@ -190,6 +190,7 @@ public: }; enum StringHintItem { LOG_LEVEL, + BATCH_BUFFER_SIZE, TOTAL_STRING_ITEM }; void reset()