[fix](Export) Fixed the bug that would cause a core dump when exporting large amounts of data (#21761)

A heap-buffer-overflow occurs when exporting large amounts of data in ORC format.
Reserve 40 bytes at the tail of the conversion buffer to avoid this problem.
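For reference, a minimal standalone sketch of the growth check this patch introduces. BUFFER_UNIT_SIZE and BUFFER_RESERVED_SIZE mirror the constants added below; ensure_capacity is a hypothetical helper used only for illustration, since the real change lives inside the writer macros in the diff.

#include <cstddef>
#include <cstdlib>
#include <cstring>

// Constants mirroring this patch.
static constexpr size_t BUFFER_UNIT_SIZE = 4064 * 40;
static constexpr size_t BUFFER_RESERVED_SIZE = 40;

// Hypothetical helper: grow the buffer until offset + len fits, while always
// keeping BUFFER_RESERVED_SIZE bytes of headroom at the tail so a value
// written near the end can never run past the allocation.
static void ensure_capacity(char*& data, size_t& size, size_t offset, size_t len) {
    while (size - BUFFER_RESERVED_SIZE < offset + len) {
        char* new_ptr = (char*)malloc(size + BUFFER_UNIT_SIZE);
        memcpy(new_ptr, data, size);
        free(data);
        data = new_ptr;
        size += BUFFER_UNIT_SIZE;
    }
}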
Author: Tiewei Fang
Date: 2023-07-18 00:06:38 +08:00
Committed by: GitHub
Parent: 05cf095506
Commit: 12784f863d
3 changed files with 144 additions and 13 deletions


@@ -167,7 +167,7 @@ void VOrcWriterWrapper::close() {
#define WRITE_LARGEINT_STRING_INTO_BATCH(VECTOR_BATCH, COLUMN) \
VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]); \
- size_t begin_off = offset; \
+ const size_t begin_off = offset; \
if (null_map != nullptr) { \
cur_batch->hasNulls = true; \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
@@ -179,7 +179,7 @@ void VOrcWriterWrapper::close() {
auto value = assert_cast<const COLUMN&>(*col).get_data()[row_id]; \
std::string value_str = fmt::format("{}", value); \
size_t len = value_str.size(); \
- while (buffer.size < offset + len) { \
+ while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
memcpy(new_ptr, buffer.data, buffer.size); \
free(const_cast<char*>(buffer.data)); \
@@ -205,7 +205,7 @@ void VOrcWriterWrapper::close() {
auto value = not_null_column->get_data()[row_id]; \
std::string value_str = fmt::format("{}", value); \
size_t len = value_str.size(); \
- while (buffer.size < offset + len) { \
+ while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
memcpy(new_ptr, buffer.data, buffer.size); \
free(const_cast<char*>(buffer.data)); \
@@ -227,7 +227,7 @@ void VOrcWriterWrapper::close() {
#define WRITE_DATE_STRING_INTO_BATCH(FROM, TO) \
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(root->fields[i]); \
- size_t begin_off = offset; \
+ const size_t begin_off = offset; \
if (null_map != nullptr) { \
cur_batch->hasNulls = true; \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
@@ -239,7 +239,7 @@ void VOrcWriterWrapper::close() {
int len = binary_cast<FROM, TO>( \
assert_cast<const ColumnVector<FROM>&>(*col).get_data()[row_id]) \
.to_buffer(const_cast<char*>(buffer.data) + offset); \
- while (buffer.size < offset + len) { \
+ while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
memcpy(new_ptr, buffer.data, buffer.size); \
free(const_cast<char*>(buffer.data)); \
@@ -264,7 +264,7 @@ void VOrcWriterWrapper::close() {
for (size_t row_id = 0; row_id < sz; row_id++) { \
int len = binary_cast<FROM, TO>(not_null_column->get_data()[row_id]) \
.to_buffer(const_cast<char*>(buffer.data) + offset); \
- while (buffer.size < offset + len) { \
+ while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
memcpy(new_ptr, buffer.data, buffer.size); \
free(const_cast<char*>(buffer.data)); \
@@ -285,7 +285,7 @@ void VOrcWriterWrapper::close() {
#define WRITE_DATETIMEV2_STRING_INTO_BATCH(FROM, TO) \
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(root->fields[i]); \
- size_t begin_off = offset; \
+ const size_t begin_off = offset; \
if (null_map != nullptr) { \
cur_batch->hasNulls = true; \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
@@ -299,7 +299,7 @@ void VOrcWriterWrapper::close() {
binary_cast<FROM, TO>( \
assert_cast<const ColumnVector<FROM>&>(*col).get_data()[row_id]) \
.to_buffer(const_cast<char*>(buffer.data) + offset, output_scale); \
- while (buffer.size < offset + len) { \
+ while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
memcpy(new_ptr, buffer.data, buffer.size); \
free(const_cast<char*>(buffer.data)); \
@@ -325,7 +325,7 @@ void VOrcWriterWrapper::close() {
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; \
int len = binary_cast<FROM, TO>(not_null_column->get_data()[row_id]) \
.to_buffer(const_cast<char*>(buffer.data) + offset, output_scale); \
- while (buffer.size < offset + len) { \
+ while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
memcpy(new_ptr, buffer.data, buffer.size); \
free(const_cast<char*>(buffer.data)); \
@@ -397,7 +397,7 @@ Status VOrcWriterWrapper::write(const Block& block) {
return Status::OK();
}
- // Buffer used by date type
+ // Buffer used by date/datetime/datev2/datetimev2/largeint type
char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
StringRef buffer(ptr, BUFFER_UNIT_SIZE);
size_t offset = 0;
@@ -584,7 +584,7 @@ Status VOrcWriterWrapper::write(const Block& block) {
}
}
} catch (const std::exception& e) {
LOG(WARNING) << "Parquet write error: " << e.what();
LOG(WARNING) << "Orc write error: " << e.what();
return Status::InternalError(e.what());
}
root->numElements = sz;


@@ -98,7 +98,14 @@ private:
std::unique_ptr<orc::Type> _schema;
std::unique_ptr<orc::Writer> _writer;
- static constexpr size_t BUFFER_UNIT_SIZE = 4096;
+ // Buffer used by the date/datetime/datev2/datetimev2/largeint types
+ // Values of these types are converted to string bytes and stored in the buffer
+ // The minimum largeint value takes 40 bytes after conversion to string (the '-' of a negative number occupies one byte)
+ // date/datetime/datev2/datetimev2 values take fewer bytes than largeint after conversion
+ // Because a block holds 4064 rows by default, the buffer is sized at 4064 * 40 bytes
+ static constexpr size_t BUFFER_UNIT_SIZE = 4064 * 40;
+ // The buffer additionally reserves 40 bytes; the reserved space exists to prevent heap-buffer-overflow
+ static constexpr size_t BUFFER_RESERVED_SIZE = 40;
};
} // namespace doris::vectorized
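As a sanity check on the 40-byte figure in the comment above: the most negative 128-bit integer prints as 39 digits plus a leading '-', i.e. exactly 40 characters. A small sketch, assuming fmt's built-in 128-bit integer formatting (which the fmt::format calls in the diff rely on):

#include <fmt/format.h>
#include <cassert>
#include <string>

int main() {
    // Build -2^127, the most negative 128-bit value, without signed overflow.
    unsigned __int128 two_pow_127 = (unsigned __int128)1 << 127;
    __int128 min_largeint = -(__int128)(two_pow_127 - 1) - 1;

    // 39 digits plus the sign: "-170141183460469231731687303715884105728"
    std::string s = fmt::format("{}", min_largeint);
    assert(s.size() == 40);
    return 0;
}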


@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
suite("test_export_big_data", "p2") {
// check whether the FE config 'enable_outfile_to_local' is true
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")
String command = strBuilder.toString()
def process = command.toString().execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def response = parseJson(out.trim())
assertEquals(response.code, 0)
assertEquals(response.msg, "success")
def configJson = response.data.rows
boolean enableOutfileToLocal = false
for (Object conf: configJson) {
assert conf instanceof Map
if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
enableOutfileToLocal = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
}
}
if (!enableOutfileToLocal) {
logger.warn("Please set enable_outfile_to_local to true to run test_outfile")
return
}
String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");
def check_path_exists = { dir_path ->
File path = new File(dir_path)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${dir_path} already exists! """)
}
}
def table_export_name = "test_export_big_data"
// create table and insert
sql """ DROP TABLE IF EXISTS ${table_export_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_export_name} (
`id` int(11) NULL,
`name` string NULL,
`age` largeint(11) NULL,
`dt` date NULL,
`dt2` datev2 NULL,
`dtime` datetime NULL,
`dtime2` datetimev2 NULL
)
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
sql """ INSERT INTO ${table_export_name} select * from s3(
"uri" = "https://${bucket}.${s3_endpoint}/regression/export_p2/export_orc/test_export_big_data_dataset.csv",
"s3.access_key"= "${ak}",
"s3.secret_key" = "${sk}",
"format" = "csv");
"""
def outfile_path_prefix = """/mnt/datadisk1/fangtiewei/tmpdata/test_export"""
def uuid = UUID.randomUUID().toString()
def outFilePath = """${outfile_path_prefix}_${uuid}"""
// check export path
check_path_exists.call("${outFilePath}")
// exec export
sql """
EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
PROPERTIES(
"label" = "${uuid}",
"format" = "orc",
"column_separator"=","
);
"""
while (true) {
def res = sql """ show export where label = "${uuid}" """
logger.info("export state: " + res[0][2])
if (res[0][2] == "FINISHED") {
def json = parseJson(res[0][11])
assert json instanceof List
assertEquals("1", json.fileNumber[0])
log.info("outfile_path: ${json.url[0]}")
return json.url[0];
} else if (res[0][2] == "CANCELLED") {
throw new IllegalStateException("""export failed: ${res[0][10]}""")
} else {
sleep(5000)
}
}
}