From 12784f863d4b84535d0728c74f3fd8d4ea2248fb Mon Sep 17 00:00:00 2001
From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com>
Date: Tue, 18 Jul 2023 00:06:38 +0800
Subject: [PATCH] [fix](Export) Fix the bug that caused a core dump when
 exporting large amounts of data (#21761)

A heap-buffer-overflow error occurs when exporting large amounts of data in
ORC format. Reserve 50B for the buffer to avoid this problem.
---
 be/src/vec/runtime/vorc_writer.cpp            |  22 ++--
 be/src/vec/runtime/vorc_writer.h              |  11 +-
 .../export_p2/test_export_big_data.groovy     | 124 ++++++++++++++++++
 3 files changed, 144 insertions(+), 13 deletions(-)
 create mode 100644 regression-test/suites/export_p2/test_export_big_data.groovy

diff --git a/be/src/vec/runtime/vorc_writer.cpp b/be/src/vec/runtime/vorc_writer.cpp
index 47fc9242f8..293c11b874 100644
--- a/be/src/vec/runtime/vorc_writer.cpp
+++ b/be/src/vec/runtime/vorc_writer.cpp
@@ -167,7 +167,7 @@ void VOrcWriterWrapper::close() {
 #define WRITE_LARGEINT_STRING_INTO_BATCH(VECTOR_BATCH, COLUMN) \
     VECTOR_BATCH* cur_batch = dynamic_cast(root->fields[i]); \
-    size_t begin_off = offset; \
+    const size_t begin_off = offset; \
     if (null_map != nullptr) { \
         cur_batch->hasNulls = true; \
         auto& null_data = assert_cast(*null_map).get_data(); \
@@ -179,7 +179,7 @@ void VOrcWriterWrapper::close() {
                 auto value = assert_cast(*col).get_data()[row_id]; \
                 std::string value_str = fmt::format("{}", value); \
                 size_t len = value_str.size(); \
-                while (buffer.size < offset + len) { \
+                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
                     char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
                     memcpy(new_ptr, buffer.data, buffer.size); \
                     free(const_cast(buffer.data)); \
@@ -205,7 +205,7 @@ void VOrcWriterWrapper::close() {
             auto value = not_null_column->get_data()[row_id]; \
             std::string value_str = fmt::format("{}", value); \
             size_t len = value_str.size(); \
-            while (buffer.size < offset + len) { \
+            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
                memcpy(new_ptr, buffer.data, buffer.size); \
                free(const_cast(buffer.data)); \
@@ -227,7 +227,7 @@ void VOrcWriterWrapper::close() {
 #define WRITE_DATE_STRING_INTO_BATCH(FROM, TO) \
     orc::StringVectorBatch* cur_batch = dynamic_cast(root->fields[i]); \
-    size_t begin_off = offset; \
+    const size_t begin_off = offset; \
     if (null_map != nullptr) { \
         cur_batch->hasNulls = true; \
         auto& null_data = assert_cast(*null_map).get_data(); \
@@ -239,7 +239,7 @@ void VOrcWriterWrapper::close() {
                 int len = binary_cast( \
                     assert_cast&>(*col).get_data()[row_id]) \
                     .to_buffer(const_cast(buffer.data) + offset); \
-                while (buffer.size < offset + len) { \
+                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
                     char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
                     memcpy(new_ptr, buffer.data, buffer.size); \
                     free(const_cast(buffer.data)); \
@@ -264,7 +264,7 @@ void VOrcWriterWrapper::close() {
             for (size_t row_id = 0; row_id < sz; row_id++) { \
                 int len = binary_cast(not_null_column->get_data()[row_id]) \
                     .to_buffer(const_cast(buffer.data) + offset); \
-                while (buffer.size < offset + len) { \
+                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
                     char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
                     memcpy(new_ptr, buffer.data, buffer.size); \
                     free(const_cast(buffer.data)); \
@@ -285,7 +285,7 @@ void VOrcWriterWrapper::close() {
 #define WRITE_DATETIMEV2_STRING_INTO_BATCH(FROM, TO) \
     orc::StringVectorBatch* cur_batch = dynamic_cast(root->fields[i]); \
-    size_t begin_off = offset; \
+    const size_t begin_off = offset; \
     if (null_map != nullptr) { \
         cur_batch->hasNulls = true; \
         auto& null_data = assert_cast(*null_map).get_data(); \
@@ -299,7 +299,7 @@ void VOrcWriterWrapper::close() {
                 binary_cast( \
                     assert_cast&>(*col).get_data()[row_id]) \
                     .to_buffer(const_cast(buffer.data) + offset, output_scale); \
-                while (buffer.size < offset + len) { \
+                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
                     char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
                     memcpy(new_ptr, buffer.data, buffer.size); \
                     free(const_cast(buffer.data)); \
@@ -325,7 +325,7 @@ void VOrcWriterWrapper::close() {
                 int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; \
                 int len = binary_cast(not_null_column->get_data()[row_id]) \
                     .to_buffer(const_cast(buffer.data) + offset, output_scale); \
-                while (buffer.size < offset + len) { \
+                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
                     char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
                     memcpy(new_ptr, buffer.data, buffer.size); \
                     free(const_cast(buffer.data)); \
@@ -397,7 +397,7 @@ Status VOrcWriterWrapper::write(const Block& block) {
         return Status::OK();
     }
 
-    // Buffer used by date type
+    // Buffer used by date/datetime/datev2/datetimev2/largeint type
     char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
     StringRef buffer(ptr, BUFFER_UNIT_SIZE);
     size_t offset = 0;
@@ -584,7 +584,7 @@ Status VOrcWriterWrapper::write(const Block& block) {
             }
         }
     } catch (const std::exception& e) {
-        LOG(WARNING) << "Parquet write error: " << e.what();
+        LOG(WARNING) << "Orc write error: " << e.what();
         return Status::InternalError(e.what());
     }
     root->numElements = sz;
diff --git a/be/src/vec/runtime/vorc_writer.h b/be/src/vec/runtime/vorc_writer.h
index 3a7b6c205f..ba508ea7a1 100644
--- a/be/src/vec/runtime/vorc_writer.h
+++ b/be/src/vec/runtime/vorc_writer.h
@@ -98,7 +98,14 @@ private:
     std::unique_ptr _schema;
     std::unique_ptr _writer;
 
-    static constexpr size_t BUFFER_UNIT_SIZE = 4096;
+    // Buffer used by date/datetime/datev2/datetimev2/largeint types.
+    // These types are converted to string bytes before being stored in the buffer.
+    // The minimum largeint value takes 40 bytes after being converted to a string (the minus sign occupies one byte).
+    // date/datetime/datev2/datetimev2 values take fewer bytes than largeint after conversion.
+    // Because a block holds 4064 rows by default, the buffer unit is 4064 * 40 bytes.
+    static constexpr size_t BUFFER_UNIT_SIZE = 4064 * 40;
+    // The buffer reserves another 40 bytes; the reserved space is just to prevent heap-buffer-overflow.
+    static constexpr size_t BUFFER_RESERVED_SIZE = 40;
 };
 
-} // namespace doris::vectorized
+} // namespace doris::vectorized
\ No newline at end of file
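Every while-loop touched above applies the same pattern: the scratch buffer grows in whole BUFFER_UNIT_SIZE steps, and the last BUFFER_RESERVED_SIZE bytes of each allocation are never handed out, so a value that is serialized into the buffer before the capacity check (as to_buffer() is in the date/datetime macros) still lands inside the allocation. Below is a minimal standalone sketch of that pattern; GrowBuffer, reserve_for, and append are illustrative names only and are not part of the Doris sources.

```cpp
#include <cstdio>
#include <cstdlib>
#include <cstring>

// Sketch only: mirrors the growth strategy used by the patch, with hypothetical names.
constexpr size_t BUFFER_UNIT_SIZE = 4064 * 40; // grow in whole units
constexpr size_t BUFFER_RESERVED_SIZE = 40;    // tail headroom that is never written past

struct GrowBuffer {
    char* data = static_cast<char*>(std::malloc(BUFFER_UNIT_SIZE));
    size_t size = BUFFER_UNIT_SIZE; // current allocation size
    size_t offset = 0;              // bytes written so far

    // Grow until `len` more bytes fit while keeping BUFFER_RESERVED_SIZE bytes spare.
    void reserve_for(size_t len) {
        while (size - BUFFER_RESERVED_SIZE < offset + len) {
            char* new_ptr = static_cast<char*>(std::malloc(size + BUFFER_UNIT_SIZE));
            std::memcpy(new_ptr, data, size);
            std::free(data);
            data = new_ptr;
            size += BUFFER_UNIT_SIZE;
        }
    }

    void append(const char* src, size_t len) {
        reserve_for(len);
        std::memcpy(data + offset, src, len);
        offset += len;
    }

    ~GrowBuffer() { std::free(data); }
};

int main() {
    GrowBuffer buf;
    const char value[] = "-170141183460469231731687303715884105728"; // 40-char worst case
    for (int i = 0; i < 200000; ++i) {
        buf.append(value, sizeof(value) - 1); // exclude the trailing '\0'
    }
    std::printf("wrote %zu bytes, allocation is %zu bytes\n", buf.offset, buf.size);
    return 0;
}
```

realloc() would achieve the same growth in one call; the sketch keeps the malloc/memcpy/free sequence so it matches the code paths the patch changes.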
diff --git a/regression-test/suites/export_p2/test_export_big_data.groovy b/regression-test/suites/export_p2/test_export_big_data.groovy
new file mode 100644
index 0000000000..5c09b83010
--- /dev/null
+++ b/regression-test/suites/export_p2/test_export_big_data.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_export_big_data", "p2") {
+    // check whether the FE config 'enable_outfile_to_local' is true
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
+    strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")
+
+    String command = strBuilder.toString()
+    def process = command.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    def configJson = response.data.rows
+    boolean enableOutfileToLocal = false
+    for (Object conf: configJson) {
+        assert conf instanceof Map
+        if (((Map) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
+            enableOutfileToLocal = ((Map) conf).get("Value").toLowerCase() == "true"
+        }
+    }
+    if (!enableOutfileToLocal) {
+        logger.warn("Please set enable_outfile_to_local to true to run test_export_big_data")
+        return
+    }
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def check_path_exists = { dir_path ->
+        File path = new File(dir_path)
+        if (!path.exists()) {
+            assert path.mkdirs()
+        } else {
+            throw new IllegalStateException("""${dir_path} already exists! """)
""") + } + } + + def table_export_name = "test_export_big_data" + // create table and insert + sql """ DROP TABLE IF EXISTS ${table_export_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_export_name} ( + `id` int(11) NULL, + `name` string NULL, + `age` largeint(11) NULL, + `dt` date NULL, + `dt2` datev2 NULL, + `dtime` datetime NULL, + `dtime2` datetimev2 NULL + ) + DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + """ + + sql """ INSERT INTO ${table_export_name} select * from s3( + "uri" = "https://${bucket}.${s3_endpoint}/regression/export_p2/export_orc/test_export_big_data_dataset.csv", + "s3.access_key"= "${ak}", + "s3.secret_key" = "${sk}", + "format" = "csv"); + """ + + def outfile_path_prefix = """/mnt/datadisk1/fangtiewei/tmpdata/test_export""" + def uuid = UUID.randomUUID().toString() + def outFilePath = """${outfile_path_prefix}_${uuid}""" + + + // check export path + check_path_exists.call("${outFilePath}") + + // exec export + sql """ + EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/" + PROPERTIES( + "label" = "${uuid}", + "format" = "orc", + "column_separator"="," + ); + """ + + while (true) { + def res = sql """ show export where label = "${uuid}" """ + logger.info("export state: " + res[0][2]) + if (res[0][2] == "FINISHED") { + def json = parseJson(res[0][11]) + assert json instanceof List + assertEquals("1", json.fileNumber[0]) + log.info("outfile_path: ${json.url[0]}") + return json.url[0]; + } else if (res[0][2] == "CANCELLED") { + throw new IllegalStateException("""export failed: ${res[0][10]}""") + } else { + sleep(5000) + } + } +}