Fix load data bug for json type
This commit is contained in:
		| @ -44,6 +44,7 @@ | |||||||
| #include "storage/tx_storage/ob_tenant_freezer.h" | #include "storage/tx_storage/ob_tenant_freezer.h" | ||||||
| #include "sql/rewrite/ob_transform_utils.h" | #include "sql/rewrite/ob_transform_utils.h" | ||||||
| #include "observer/omt/ob_tenant_timezone_mgr.h" | #include "observer/omt/ob_tenant_timezone_mgr.h" | ||||||
|  | #include "share/config/ob_config_helper.h" | ||||||
|  |  | ||||||
| using namespace oceanbase::sql; | using namespace oceanbase::sql; | ||||||
| using namespace oceanbase::common; | using namespace oceanbase::common; | ||||||
| @ -910,26 +911,15 @@ void ObCSVFormats::init(const ObDataInFileStruct &file_formats) | |||||||
| ObShuffleTaskHandle::ObShuffleTaskHandle(ObDataFragMgr &main_datafrag_mgr, | ObShuffleTaskHandle::ObShuffleTaskHandle(ObDataFragMgr &main_datafrag_mgr, | ||||||
|                                          ObBitSet<> &main_string_values, |                                          ObBitSet<> &main_string_values, | ||||||
|                                          uint64_t tenant_id) |                                          uint64_t tenant_id) | ||||||
|   : exec_ctx(allocator, GCTX.session_mgr_), |   : allocator(ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)), | ||||||
|  |     exec_ctx(allocator, GCTX.session_mgr_), | ||||||
|     data_buffer(NULL), |     data_buffer(NULL), | ||||||
|     escape_buffer(NULL), |     escape_buffer(NULL), | ||||||
|     calc_tablet_id_expr(NULL), |     calc_tablet_id_expr(NULL), | ||||||
|     datafrag_mgr(main_datafrag_mgr), |     datafrag_mgr(main_datafrag_mgr), | ||||||
|     string_values(main_string_values) |     string_values(main_string_values) | ||||||
| { | { | ||||||
|   const int64_t FILE_BUFFER_SIZE = ObLoadFileBuffer::MAX_BUFFER_SIZE; |   attr = ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA); | ||||||
|   ObMemAttr attr(tenant_id, ObModIds::OB_SQL_LOAD_DATA); |  | ||||||
|   void *buf = NULL; |  | ||||||
|   buf = ob_malloc(FILE_BUFFER_SIZE, attr); |  | ||||||
|   if (OB_NOT_NULL(buf)) { |  | ||||||
|     data_buffer = new(buf) ObLoadFileBuffer( |  | ||||||
|           FILE_BUFFER_SIZE - sizeof(ObLoadFileBuffer)); |  | ||||||
|   } |  | ||||||
|   buf = ob_malloc(FILE_BUFFER_SIZE, attr); |  | ||||||
|   if (OB_NOT_NULL(buf)) { |  | ||||||
|     escape_buffer = new(buf) ObLoadFileBuffer( |  | ||||||
|           FILE_BUFFER_SIZE - sizeof(ObLoadFileBuffer)); |  | ||||||
|   } |  | ||||||
| } | } | ||||||
|  |  | ||||||
| ObShuffleTaskHandle::~ObShuffleTaskHandle() | ObShuffleTaskHandle::~ObShuffleTaskHandle() | ||||||
| @ -942,6 +932,41 @@ ObShuffleTaskHandle::~ObShuffleTaskHandle() | |||||||
|   } |   } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | int ObShuffleTaskHandle::expand_buf(const int64_t max_size) | ||||||
|  | { | ||||||
|  |   int ret = OB_SUCCESS; | ||||||
|  |   int64_t new_size = 0; | ||||||
|  |   if (OB_ISNULL(data_buffer)) { | ||||||
|  |     new_size = ObLoadFileBuffer::MAX_BUFFER_SIZE; | ||||||
|  |   } else { | ||||||
|  |     new_size = (sizeof(ObLoadFileBuffer) + data_buffer->get_buffer_size()) * 2; | ||||||
|  |   } | ||||||
|  |   if (new_size > max_size) { | ||||||
|  |     ret = OB_SIZE_OVERFLOW; | ||||||
|  |     LOG_WARN("buffer size not enough", K(ret)); | ||||||
|  |   } else { | ||||||
|  |     void *buf1 = NULL; | ||||||
|  |     void *buf2 = NULL; | ||||||
|  |     if (OB_ISNULL(buf1 = ob_malloc(new_size, attr)) | ||||||
|  |         || OB_ISNULL(buf2 = ob_malloc(new_size, attr))) { | ||||||
|  |       ret = OB_ALLOCATE_MEMORY_FAILED; | ||||||
|  |     } else { | ||||||
|  |       if (OB_NOT_NULL(data_buffer)) { | ||||||
|  |         ob_free(data_buffer); | ||||||
|  |       } | ||||||
|  |       data_buffer = new(buf1) ObLoadFileBuffer( | ||||||
|  |             new_size - sizeof(ObLoadFileBuffer)); | ||||||
|  |       if (OB_NOT_NULL(escape_buffer)) { | ||||||
|  |         ob_free(escape_buffer); | ||||||
|  |       } | ||||||
|  |       escape_buffer = new(buf2) ObLoadFileBuffer( | ||||||
|  |             new_size - sizeof(ObLoadFileBuffer)); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |   LOG_DEBUG("expand buf to", K(new_size)); | ||||||
|  |   return ret; | ||||||
|  | } | ||||||
|  |  | ||||||
| int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle) | int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle) | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
| @ -982,6 +1007,7 @@ int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle) | |||||||
|     return true; |     return true; | ||||||
|   }; |   }; | ||||||
|  |  | ||||||
|  |     const int64_t buf_len = handle->data_buffer->get_buffer_size() + sizeof(ObLoadFileBuffer); | ||||||
|   if (OB_ISNULL(handle) |   if (OB_ISNULL(handle) | ||||||
|       || OB_ISNULL(handle->data_buffer) |       || OB_ISNULL(handle->data_buffer) | ||||||
|       || OB_ISNULL(handle->escape_buffer) |       || OB_ISNULL(handle->escape_buffer) | ||||||
| @ -998,14 +1024,13 @@ int ObLoadDataSPImpl::exec_shuffle(int64_t task_id, ObShuffleTaskHandle *handle) | |||||||
|                        handle->generator.get_insert_exprs().count()))) { |                        handle->generator.get_insert_exprs().count()))) { | ||||||
|     LOG_WARN("fail to prealloc", K(ret), |     LOG_WARN("fail to prealloc", K(ret), | ||||||
|              "insert values count", handle->generator.get_insert_exprs().count()); |              "insert values count", handle->generator.get_insert_exprs().count()); | ||||||
|   } else if (OB_ISNULL(expr_buf = ob_malloc(ObLoadFileBuffer::MAX_BUFFER_SIZE, |   } else if (OB_ISNULL(expr_buf = ob_malloc(handle->data_buffer->get_buffer_size() + sizeof(ObLoadFileBuffer), | ||||||
|                                             ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)))) { |                                             ObMemAttr(tenant_id, ObModIds::OB_SQL_LOAD_DATA)))) { | ||||||
|     ret = OB_ALLOCATE_MEMORY_FAILED; |     ret = OB_ALLOCATE_MEMORY_FAILED; | ||||||
|     LOG_WARN("not enough memory", K(ret)); |     LOG_WARN("not enough memory", K(ret)); | ||||||
|   } else { |   } else { | ||||||
|     handle->err_records.reuse(); |     handle->err_records.reuse(); | ||||||
|     expr_buffer = new(expr_buf) ObLoadFileBuffer(ObLoadFileBuffer::MAX_BUFFER_SIZE |     expr_buffer = new(expr_buf) ObLoadFileBuffer(handle->data_buffer->get_buffer_size()); | ||||||
|                                                  - sizeof(ObLoadFileBuffer)); |  | ||||||
|     ObSEArray<ObCSVGeneralParser::LineErrRec, 1> err_records; |     ObSEArray<ObCSVGeneralParser::LineErrRec, 1> err_records; | ||||||
|     ObSEArray<ObObj, 32> parse_result; |     ObSEArray<ObObj, 32> parse_result; | ||||||
|  |  | ||||||
| @ -1348,24 +1373,30 @@ int ObLoadDataSPImpl::handle_returned_shuffle_task(ToolBox &box, ObShuffleTaskHa | |||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| int ObLoadDataSPImpl::next_file_buffer(ToolBox &box, | int ObLoadDataSPImpl::next_file_buffer(ObExecContext &ctx, | ||||||
|                                        ObLoadFileBuffer &data_buffer, |                                        ToolBox &box, | ||||||
|  |                                        ObShuffleTaskHandle *handle, | ||||||
|                                        int64_t limit) |                                        int64_t limit) | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
|  |   bool has_valid_data = false; | ||||||
|  |  | ||||||
|  |   CK (OB_NOT_NULL(handle) && OB_NOT_NULL(handle->data_buffer)); | ||||||
|  |  | ||||||
|  |   do { | ||||||
|  |  | ||||||
|     //从data_trimer中恢复出上次读取剩下的数据 |     //从data_trimer中恢复出上次读取剩下的数据 | ||||||
|   OZ (box.data_trimer.recover_incomplate_data(data_buffer)); |     OZ (box.data_trimer.recover_incomplate_data(*handle->data_buffer)); | ||||||
|  |  | ||||||
|     if (ObLoadFileLocation::SERVER_DISK == box.load_file_storage) { |     if (ObLoadFileLocation::SERVER_DISK == box.load_file_storage) { | ||||||
|     OZ (box.file_reader.pread(data_buffer.current_ptr(), |       OZ (box.file_reader.pread(handle->data_buffer->current_ptr(), | ||||||
|                               data_buffer.get_remain_len(), |                                 handle->data_buffer->get_remain_len(), | ||||||
|                                 box.read_cursor.file_offset_, |                                 box.read_cursor.file_offset_, | ||||||
|                                 box.read_cursor.read_size_)); |                                 box.read_cursor.read_size_)); | ||||||
|     } else { |     } else { | ||||||
|       OZ (box.device_handle_->pread(box.fd_, box.read_cursor.file_offset_, |       OZ (box.device_handle_->pread(box.fd_, box.read_cursor.file_offset_, | ||||||
|                                 data_buffer.get_remain_len(), |                                   handle->data_buffer->get_remain_len(), | ||||||
|                                 data_buffer.current_ptr(), |                                   handle->data_buffer->current_ptr(), | ||||||
|                                   box.read_cursor.read_size_)); |                                   box.read_cursor.read_size_)); | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @ -1374,7 +1405,7 @@ int ObLoadDataSPImpl::next_file_buffer(ToolBox &box, | |||||||
|         box.read_cursor.is_end_file_ = true; |         box.read_cursor.is_end_file_ = true; | ||||||
|         LOG_DEBUG("LOAD DATA reach file end", K(box.read_cursor)); |         LOG_DEBUG("LOAD DATA reach file end", K(box.read_cursor)); | ||||||
|       } else { |       } else { | ||||||
|       data_buffer.update_pos(box.read_cursor.read_size_); //更新buffer中数据长度 |         handle->data_buffer->update_pos(box.read_cursor.read_size_); //更新buffer中数据长度 | ||||||
|         int64_t last_proccessed_GBs = box.read_cursor.get_total_read_GBs(); |         int64_t last_proccessed_GBs = box.read_cursor.get_total_read_GBs(); | ||||||
|         box.read_cursor.commit_read(); |         box.read_cursor.commit_read(); | ||||||
|         int64_t processed_GBs = box.read_cursor.get_total_read_GBs(); |         int64_t processed_GBs = box.read_cursor.get_total_read_GBs(); | ||||||
| @ -1382,28 +1413,34 @@ int ObLoadDataSPImpl::next_file_buffer(ToolBox &box, | |||||||
|           LOG_INFO("LOAD DATA file read progress: ", K(processed_GBs)); |           LOG_INFO("LOAD DATA file read progress: ", K(processed_GBs)); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|       box.job_status->read_bytes_ += data_buffer.get_data_len(); |         box.job_status->read_bytes_ += box.read_cursor.read_size_; | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     //从buffer中找出完整的行,剩下的备份到 data_trimer |     //从buffer中找出完整的行,剩下的备份到 data_trimer | ||||||
|   if (OB_SUCC(ret) && OB_LIKELY(data_buffer.is_valid())) { |     if (OB_SUCC(ret) && OB_LIKELY(handle->data_buffer->is_valid())) { | ||||||
|       int64_t complete_cnt = limit; |       int64_t complete_cnt = limit; | ||||||
|       int64_t complete_len = 0; |       int64_t complete_len = 0; | ||||||
|     if (OB_FAIL(pre_parse_lines(data_buffer, box.parser, |       if (OB_FAIL(pre_parse_lines(*handle->data_buffer, box.parser, | ||||||
|                                   box.read_cursor.is_end_file(), |                                   box.read_cursor.is_end_file(), | ||||||
|                                   complete_len, complete_cnt))) { |                                   complete_len, complete_cnt))) { | ||||||
|         LOG_WARN("fail to fast_lines_parse", K(ret)); |         LOG_WARN("fail to fast_lines_parse", K(ret)); | ||||||
|     } else if (OB_FAIL(box.data_trimer.backup_incomplate_data(data_buffer, |       } else if (OB_FAIL(box.data_trimer.backup_incomplate_data(*handle->data_buffer, | ||||||
|                                                                 complete_len))) { |                                                                 complete_len))) { | ||||||
|         LOG_WARN("fail to back up data", K(ret)); |         LOG_WARN("fail to back up data", K(ret)); | ||||||
|       } else { |       } else { | ||||||
|         box.data_trimer.commit_line_cnt(complete_cnt); |         box.data_trimer.commit_line_cnt(complete_cnt); | ||||||
|  |         has_valid_data = complete_cnt > 0; | ||||||
|         LOG_DEBUG("LOAD DATA", |         LOG_DEBUG("LOAD DATA", | ||||||
|             "split offset", box.read_cursor.file_offset_ - box.data_trimer.get_incomplate_data_string().length(), |             "split offset", box.read_cursor.file_offset_ - box.data_trimer.get_incomplate_data_string().length(), | ||||||
|  |             K(complete_len), K(complete_cnt), | ||||||
|  |             "incomplate data length", box.data_trimer.get_incomplate_data_string().length(), | ||||||
|             "incomplate data", box.data_trimer.get_incomplate_data_string()); |             "incomplate data", box.data_trimer.get_incomplate_data_string()); | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|  |   } while (OB_SUCC(ret) && !has_valid_data && !box.read_cursor.is_end_file_ | ||||||
|  |            && OB_SUCC(handle->expand_buf(box.batch_buffer_size)) | ||||||
|  |            && OB_SUCC(box.data_trimer.expand_buf(ctx.get_allocator()))); | ||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| @ -1448,7 +1485,7 @@ int ObLoadDataSPImpl::shuffle_task_gen_and_dispatch(ObExecContext &ctx, ToolBox | |||||||
|     if (OB_SUCC(ret)) { |     if (OB_SUCC(ret)) { | ||||||
|       if (OB_FAIL(box.file_buf_row_num.push_back(box.data_trimer.get_lines_count()))) { |       if (OB_FAIL(box.file_buf_row_num.push_back(box.data_trimer.get_lines_count()))) { | ||||||
|         LOG_WARN("fail to push back", K(ret)); |         LOG_WARN("fail to push back", K(ret)); | ||||||
|       } else if (OB_FAIL(next_file_buffer(box, *(handle->data_buffer)))) { |       } else if (OB_FAIL(next_file_buffer(ctx, box, handle))) { | ||||||
|         LOG_WARN("fail get next file buffer", K(ret)); |         LOG_WARN("fail get next file buffer", K(ret)); | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
| @ -1928,8 +1965,7 @@ int ObLoadDataSPImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) | |||||||
|     while (OB_SUCC(ret) |     while (OB_SUCC(ret) | ||||||
|            && !box.read_cursor.is_end_file() |            && !box.read_cursor.is_end_file() | ||||||
|            && box.data_trimer.get_lines_count() < box.ignore_rows) { |            && box.data_trimer.get_lines_count() < box.ignore_rows) { | ||||||
|       box.expr_buffer->reset(); |       OZ (next_file_buffer(ctx, box, box.temp_handle, | ||||||
|       OZ (next_file_buffer(box, *box.expr_buffer, |  | ||||||
|                            box.ignore_rows - box.data_trimer.get_lines_count())); |                            box.ignore_rows - box.data_trimer.get_lines_count())); | ||||||
|       LOG_DEBUG("LOAD DATA ignore rows", K(box.ignore_rows), K(box.data_trimer.get_lines_count())); |       LOG_DEBUG("LOAD DATA ignore rows", K(box.ignore_rows), K(box.data_trimer.get_lines_count())); | ||||||
|     } |     } | ||||||
| @ -2005,9 +2041,9 @@ int ObLoadFileDataTrimer::backup_incomplate_data(ObLoadFileBuffer &buffer, int64 | |||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
|   incomplate_data_len_ = buffer.get_data_len() - valid_data_len; |   incomplate_data_len_ = buffer.get_data_len() - valid_data_len; | ||||||
|   if (incomplate_data_len_ > ObLoadFileBuffer::MAX_BUFFER_SIZE) { |   if (incomplate_data_len_ > incomplate_data_buf_len_) { | ||||||
|     ret = OB_SIZE_OVERFLOW; |     ret = OB_SIZE_OVERFLOW; | ||||||
|     LOG_WARN("size over flow", K(ret), K(incomplate_data_len_)); |     LOG_WARN("size over flow", K(ret), K(incomplate_data_len_), K(incomplate_data_buf_len_)); | ||||||
|   } else if (incomplate_data_len_ > 0 && NULL != incomplate_data_) { |   } else if (incomplate_data_len_ > 0 && NULL != incomplate_data_) { | ||||||
|     MEMCPY(incomplate_data_, buffer.begin_ptr() + valid_data_len, incomplate_data_len_); |     MEMCPY(incomplate_data_, buffer.begin_ptr() + valid_data_len, incomplate_data_len_); | ||||||
|     buffer.update_pos(-incomplate_data_len_); |     buffer.update_pos(-incomplate_data_len_); | ||||||
| @ -2310,18 +2346,31 @@ int ObPartDataFragMgr::update_part_location(ObExecContext &ctx) | |||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| int ObLoadFileDataTrimer::init(ObIAllocator &allocator, const ObCSVFormats &formats) | int ObLoadFileDataTrimer::expand_buf(ObIAllocator &allocator) | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
|   if (OB_ISNULL(incomplate_data_ |  | ||||||
|                 = static_cast<char*>(allocator.alloc(ObLoadFileBuffer::MAX_BUFFER_SIZE)))) { |   int64_t new_buf_len = incomplate_data_buf_len_ * (NULL != incomplate_data_ ? 2 : 1); | ||||||
|  |   char *new_buf = NULL; | ||||||
|  |   if (OB_ISNULL(new_buf = static_cast<char*>(allocator.alloc(new_buf_len)))) { | ||||||
|     ret = OB_ALLOCATE_MEMORY_FAILED; |     ret = OB_ALLOCATE_MEMORY_FAILED; | ||||||
|     LOG_WARN("no memory", K(ret)); |     LOG_WARN("no memory", K(ret)); | ||||||
|  |   } else { | ||||||
|  |     if (NULL != incomplate_data_) { | ||||||
|  |       MEMCPY(new_buf, incomplate_data_, incomplate_data_len_); | ||||||
|  |     } | ||||||
|  |     incomplate_data_ = new_buf; | ||||||
|  |     incomplate_data_buf_len_ = new_buf_len; | ||||||
|   } |   } | ||||||
|   formats_ = formats; |  | ||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | int ObLoadFileDataTrimer::init(ObIAllocator &allocator, const ObCSVFormats &formats) | ||||||
|  | { | ||||||
|  |   formats_ = formats; | ||||||
|  |   return expand_buf(allocator); | ||||||
|  | } | ||||||
|  |  | ||||||
| int ObLoadDataSPImpl::ToolBox::release_resources() | int ObLoadDataSPImpl::ToolBox::release_resources() | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
| @ -2424,6 +2473,10 @@ int ObLoadDataSPImpl::ToolBox::release_resources() | |||||||
|     common::ObDeviceManager::get_instance().release_device(device_handle_); |     common::ObDeviceManager::get_instance().release_device(device_handle_); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|  |   if (OB_NOT_NULL(temp_handle)) { | ||||||
|  |     temp_handle->~ObShuffleTaskHandle(); | ||||||
|  |   } | ||||||
|  |  | ||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| @ -2825,6 +2878,8 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm | |||||||
|  |  | ||||||
|   if (OB_SUCC(ret)) { |   if (OB_SUCC(ret)) { | ||||||
|     int64_t hint_batch_size = 0; |     int64_t hint_batch_size = 0; | ||||||
|  |     int64_t hint_max_batch_buffer_size = 0; | ||||||
|  |     ObString hint_batch_buffer_size_str; | ||||||
|     if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_SIZE, hint_batch_size))) { |     if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_SIZE, hint_batch_size))) { | ||||||
|       LOG_WARN("fail to get value", K(ret)); |       LOG_WARN("fail to get value", K(ret)); | ||||||
|     } else if (0 == hint_batch_size) { |     } else if (0 == hint_batch_size) { | ||||||
| @ -2832,7 +2887,22 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm | |||||||
|     } else { |     } else { | ||||||
|       batch_row_count = std::max(1L, std::min(DEFAULT_BUFFERRED_ROW_COUNT, hint_batch_size)); |       batch_row_count = std::max(1L, std::min(DEFAULT_BUFFERRED_ROW_COUNT, hint_batch_size)); | ||||||
|     } |     } | ||||||
|     LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count)); |     if (OB_SUCC(ret)) { | ||||||
|  |       if (OB_FAIL(hint.get_value(ObLoadDataHint::BATCH_BUFFER_SIZE, hint_batch_buffer_size_str))) { | ||||||
|  |         LOG_WARN("fail to get value", K(ret)); | ||||||
|  |       } else { | ||||||
|  |         bool is_valid = false; | ||||||
|  |         hint_batch_buffer_size_str = hint_batch_buffer_size_str.trim(); | ||||||
|  |         if (!hint_batch_buffer_size_str.empty()) { | ||||||
|  |           hint_max_batch_buffer_size = ObConfigCapacityParser::get(to_cstring(hint_batch_buffer_size_str), is_valid); | ||||||
|  |         } | ||||||
|  |         if (!is_valid) { | ||||||
|  |           hint_max_batch_buffer_size = 1L << 30; // 1G | ||||||
|  |         } | ||||||
|  |         batch_buffer_size = MAX(ObLoadFileBuffer::MAX_BUFFER_SIZE, hint_max_batch_buffer_size); | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  |     LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count), K(batch_buffer_size)); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (OB_SUCC(ret)) { |   if (OB_SUCC(ret)) { | ||||||
| @ -2882,17 +2952,30 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm | |||||||
|     } |     } | ||||||
| */ | */ | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     if (OB_SUCC(ret)) { | ||||||
|  |       if (OB_ISNULL(temp_handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()), | ||||||
|  |                                           data_frag_mgr, string_type_column_bitset, tenant_id))) { | ||||||
|  |         ret = OB_ALLOCATE_MEMORY_FAILED; | ||||||
|  |         LOG_WARN("Failed to alloc", K(ret)); | ||||||
|  |       } else if (OB_FAIL(temp_handle->expand_buf(batch_buffer_size))) { | ||||||
|  |         LOG_WARN("fail to expand buf", K(ret)); | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     for (int i = 0; OB_SUCC(ret) && i < shuffle_task_controller.get_max_parallelism(); ++i) { |     for (int i = 0; OB_SUCC(ret) && i < shuffle_task_controller.get_max_parallelism(); ++i) { | ||||||
|       ObShuffleTaskHandle *handle = nullptr; |       ObShuffleTaskHandle *handle = nullptr; | ||||||
|       int64_t pos = 0; |       int64_t pos = 0; | ||||||
|  |  | ||||||
|       if (OB_ISNULL(handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()), |       if (OB_ISNULL(handle = OB_NEWx(ObShuffleTaskHandle, (&ctx.get_allocator()), | ||||||
|                                      data_frag_mgr, string_type_column_bitset, tenant_id)) |                                      data_frag_mgr, string_type_column_bitset, tenant_id))) { | ||||||
|           || OB_ISNULL(handle->data_buffer)) { |  | ||||||
|         ret = OB_ALLOCATE_MEMORY_FAILED; |         ret = OB_ALLOCATE_MEMORY_FAILED; | ||||||
|         LOG_WARN("Failed to alloc", K(ret)); |         LOG_WARN("Failed to alloc", K(ret)); | ||||||
|       } else { |       } else { | ||||||
|         if (OB_FAIL(handle->exec_ctx.deserialize(exec_ctx_serialized_data.ptr(), |         if (OB_FAIL(handle->expand_buf(batch_buffer_size))) { | ||||||
|  |           LOG_WARN("fail to expand buf", K(ret)); | ||||||
|  |         } else if (OB_FAIL(handle->exec_ctx.deserialize(exec_ctx_serialized_data.ptr(), | ||||||
|                                                         exec_ctx_serialized_data.length(), pos))) { |                                                         exec_ctx_serialized_data.length(), pos))) { | ||||||
|           LOG_WARN("fail to deserialize", K(ret)); |           LOG_WARN("fail to deserialize", K(ret)); | ||||||
|         } else if (OB_FAIL(handle->parser.init(file_formats, num_of_file_column, load_args.file_cs_type_))) { |         } else if (OB_FAIL(handle->parser.init(file_formats, num_of_file_column, load_args.file_cs_type_))) { | ||||||
|  | |||||||
| @ -384,7 +384,8 @@ struct ObCSVFormats { | |||||||
| class ObLoadFileDataTrimer | class ObLoadFileDataTrimer | ||||||
| { | { | ||||||
| public: | public: | ||||||
|   ObLoadFileDataTrimer() : incomplate_data_(NULL), incomplate_data_len_(0), lines_cnt_(0) {} |   ObLoadFileDataTrimer() : incomplate_data_(NULL), incomplate_data_len_(0), | ||||||
|  |     incomplate_data_buf_len_(ObLoadFileBuffer::MAX_BUFFER_SIZE), lines_cnt_(0) {} | ||||||
|   int init(common::ObIAllocator &allocator, const ObCSVFormats &formats); |   int init(common::ObIAllocator &allocator, const ObCSVFormats &formats); | ||||||
|   //for debug |   //for debug | ||||||
|   ObString get_incomplate_data_string() { |   ObString get_incomplate_data_string() { | ||||||
| @ -396,10 +397,12 @@ public: | |||||||
|   bool has_incomplate_data() const { return incomplate_data_len_ > 0; } |   bool has_incomplate_data() const { return incomplate_data_len_ > 0; } | ||||||
|   int64_t get_lines_count() const { return lines_cnt_; } |   int64_t get_lines_count() const { return lines_cnt_; } | ||||||
|   void commit_line_cnt(int64_t line_cnt) { lines_cnt_ += line_cnt; } |   void commit_line_cnt(int64_t line_cnt) { lines_cnt_ += line_cnt; } | ||||||
|  |   int expand_buf(common::ObIAllocator &allocator); | ||||||
| private: | private: | ||||||
|   ObCSVFormats formats_;//TODO [load data] change to ObInverseParser(formats) |   ObCSVFormats formats_; | ||||||
|   char *incomplate_data_; |   char *incomplate_data_; | ||||||
|   int64_t incomplate_data_len_; |   int64_t incomplate_data_len_; | ||||||
|  |   int64_t incomplate_data_buf_len_; | ||||||
|   int64_t lines_cnt_; |   int64_t lines_cnt_; | ||||||
| }; | }; | ||||||
|  |  | ||||||
| @ -491,6 +494,9 @@ struct ObShuffleTaskHandle { | |||||||
|                       common::ObBitSet<> &main_string_values, |                       common::ObBitSet<> &main_string_values, | ||||||
|                       uint64_t tenant_id); |                       uint64_t tenant_id); | ||||||
|   ~ObShuffleTaskHandle(); |   ~ObShuffleTaskHandle(); | ||||||
|  |  | ||||||
|  |   int expand_buf(const int64_t max_size); | ||||||
|  |  | ||||||
|   ObArenaAllocator allocator; |   ObArenaAllocator allocator; | ||||||
|   ObDesExecContext exec_ctx; |   ObDesExecContext exec_ctx; | ||||||
|   ObLoadFileBuffer *data_buffer; |   ObLoadFileBuffer *data_buffer; | ||||||
| @ -505,6 +511,7 @@ struct ObShuffleTaskHandle { | |||||||
|   common::ObBitSet<> &string_values; |   common::ObBitSet<> &string_values; | ||||||
|   ObShuffleResult result; |   ObShuffleResult result; | ||||||
|   ObSEArray<ObParserErrRec, 16> err_records; |   ObSEArray<ObParserErrRec, 16> err_records; | ||||||
|  |   common::ObMemAttr attr; | ||||||
|   TO_STRING_KV("task_id", result.task_id_); |   TO_STRING_KV("task_id", result.task_id_); | ||||||
| }; | }; | ||||||
|  |  | ||||||
| @ -711,6 +718,7 @@ public: | |||||||
|     int64_t num_of_table_column; |     int64_t num_of_table_column; | ||||||
|     int64_t parallel; |     int64_t parallel; | ||||||
|     int64_t batch_row_count; |     int64_t batch_row_count; | ||||||
|  |     int64_t batch_buffer_size; | ||||||
|     int64_t data_frag_mem_usage_limit; //limit = data_frag_mem_usage_limit * MAX_BUFFER_SIZE |     int64_t data_frag_mem_usage_limit; //limit = data_frag_mem_usage_limit * MAX_BUFFER_SIZE | ||||||
|     int64_t file_size; |     int64_t file_size; | ||||||
|     int64_t ignore_rows; |     int64_t ignore_rows; | ||||||
| @ -737,6 +745,7 @@ public: | |||||||
|     common::ObSEArray<int64_t, 1> file_buf_row_num; |     common::ObSEArray<int64_t, 1> file_buf_row_num; | ||||||
|     int64_t last_session_check_ts; |     int64_t last_session_check_ts; | ||||||
|     common::ObSEArray<ObLoadTableColumnDesc, 16> insert_infos; |     common::ObSEArray<ObLoadTableColumnDesc, 16> insert_infos; | ||||||
|  |     ObShuffleTaskHandle *temp_handle; | ||||||
|  |  | ||||||
|     //debug values |     //debug values | ||||||
|     int64_t insert_dispatch_rows; |     int64_t insert_dispatch_rows; | ||||||
| @ -759,7 +768,7 @@ public: | |||||||
|   int execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt); |   int execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt); | ||||||
|  |  | ||||||
|   int shuffle_task_gen_and_dispatch(ObExecContext &ctx, ToolBox &box); |   int shuffle_task_gen_and_dispatch(ObExecContext &ctx, ToolBox &box); | ||||||
|   int next_file_buffer(ToolBox &box, ObLoadFileBuffer &data_buffer, int64_t limit = INT64_MAX); |   int next_file_buffer(ObExecContext &ctx, ToolBox &box, ObShuffleTaskHandle *handle, int64_t limit = INT64_MAX); | ||||||
|   int handle_returned_shuffle_task(ToolBox &box, ObShuffleTaskHandle &handle); |   int handle_returned_shuffle_task(ToolBox &box, ObShuffleTaskHandle &handle); | ||||||
|   int wait_shuffle_task_return(ToolBox &box); |   int wait_shuffle_task_return(ToolBox &box); | ||||||
|  |  | ||||||
|  | |||||||
| @ -9325,7 +9325,11 @@ READ_CONSISTENCY '(' consistency_level ')' | |||||||
| } | } | ||||||
| | LOAD_BATCH_SIZE '(' INTNUM ')' | | LOAD_BATCH_SIZE '(' INTNUM ')' | ||||||
| { | { | ||||||
|   malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 1, $3); |   malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 2, $3, NULL); | ||||||
|  | } | ||||||
|  | | LOAD_BATCH_SIZE '(' INTNUM ',' STRING_VALUE ')' | ||||||
|  | { | ||||||
|  |   malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_BATCH_SIZE, 2, $3, $5); | ||||||
| } | } | ||||||
| | DIRECT '(' BOOL_VALUE ',' INTNUM ')' | | DIRECT '(' BOOL_VALUE ',' INTNUM ')' | ||||||
| { | { | ||||||
|  | |||||||
| @ -501,7 +501,7 @@ int ObLoadDataResolver::resolve_hints(const ParseNode &node) | |||||||
|         break; |         break; | ||||||
|       } |       } | ||||||
|       case T_LOAD_BATCH_SIZE: { |       case T_LOAD_BATCH_SIZE: { | ||||||
|         if (1 != hint_node->num_child_) { |         if (2 != hint_node->num_child_) { | ||||||
|           ret = OB_ERR_UNEXPECTED; |           ret = OB_ERR_UNEXPECTED; | ||||||
|           LOG_WARN("max concurrent node should have 1 child", K(ret)); |           LOG_WARN("max concurrent node should have 1 child", K(ret)); | ||||||
|         } else if (OB_ISNULL(hint_node->children_[0])) { |         } else if (OB_ISNULL(hint_node->children_[0])) { | ||||||
| @ -510,6 +510,11 @@ int ObLoadDataResolver::resolve_hints(const ParseNode &node) | |||||||
|         } else if (OB_FAIL(stmt_hints.set_value( |         } else if (OB_FAIL(stmt_hints.set_value( | ||||||
|                         ObLoadDataHint::BATCH_SIZE, hint_node->children_[0]->value_))) { |                         ObLoadDataHint::BATCH_SIZE, hint_node->children_[0]->value_))) { | ||||||
|           LOG_WARN("fail to set concurrent value", K(ret)); |           LOG_WARN("fail to set concurrent value", K(ret)); | ||||||
|  |         } else if (OB_NOT_NULL(hint_node->children_[1]) | ||||||
|  |                    && OB_FAIL(stmt_hints.set_value(ObLoadDataHint::BATCH_BUFFER_SIZE, | ||||||
|  |                                               ObString(hint_node->children_[1]->str_len_, | ||||||
|  |                                                       hint_node->children_[1]->str_value_)))) { | ||||||
|  |           LOG_WARN("fail to set concurrent value", K(ret)); | ||||||
|         } |         } | ||||||
|         break; |         break; | ||||||
|       } |       } | ||||||
|  | |||||||
| @ -190,6 +190,7 @@ public: | |||||||
|   }; |   }; | ||||||
|   enum StringHintItem { |   enum StringHintItem { | ||||||
|     LOG_LEVEL, |     LOG_LEVEL, | ||||||
|  |     BATCH_BUFFER_SIZE, | ||||||
|     TOTAL_STRING_ITEM |     TOTAL_STRING_ITEM | ||||||
|   }; |   }; | ||||||
|   void reset() |   void reset() | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 wjhh2008
					wjhh2008