[Fix](Outfile) Fix missing error report when exporting a table to S3 with an incorrect ak/sk/bucket (#23441)

Problem:
It returns a successful result even though a wrong ak/sk/bucket name is used, for example:
```sql
mysql> select * from demo.student
    -> into outfile "s3://xxxx/exp_"
    -> format as csv
    -> properties(
    ->   "s3.endpoint" = "https://cos.ap-beijing.myqcloud.com",
    ->   "s3.region" = "ap-beijing",
    ->   "s3.access_key"= "xxx",
    ->   "s3.secret_key" = "yyyy"
    -> );
+------------+-----------+----------+----------------------------------------------------------------------------------------------------+
| FileNumber | TotalRows | FileSize | URL                                                                                                |
+------------+-----------+----------+----------------------------------------------------------------------------------------------------+
|          1 |         3 |       26 | s3://xxxx/exp_2ae166e2981d4c08-b577290f93aa82ba_ |
+------------+-----------+----------+----------------------------------------------------------------------------------------------------+
1 row in set (0.15 sec)
```

The reason for this is that we did not catch the error returned during the `close()` phase.
This commit is contained in:
Tiewei Fang
2023-08-26 00:19:30 +08:00
committed by GitHub
parent f66f161017
commit f32efe5758
11 changed files with 367 additions and 11 deletions

View File

@ -50,8 +50,8 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
void reserve_buffer(Slice s) { _buf = s; }
// apend data into the memory buffer inside or into the file cache
// if the buffer has no memory buffer
// append data into the memory buffer inside
// or into the file cache if the buffer has no memory buffer
void append_data(const Slice& data);
// upload to S3 and file cache in async threadpool
void submit();

View File

@ -377,6 +377,7 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
static_cast<int>(response.GetError().GetResponseCode()));
buf._on_failed(_st);
LOG(WARNING) << _st;
return;
}
_bytes_written += buf.get_size();
s3_file_created_total << 1;

View File

@ -466,7 +466,7 @@ Status VFileResultWriter::_create_new_file_if_exceed_size() {
Status VFileResultWriter::_close_file_writer(bool done) {
if (_vfile_writer) {
_vfile_writer->close();
RETURN_IF_ERROR(_vfile_writer->close());
// we can not use _current_written_bytes to COUNTER_UPDATE(_written_data_bytes, _current_written_bytes)
// because it will call `write()` function of orc/parquet function in `_vfile_writer->close()`
// and the real written_len will increase
@ -474,7 +474,7 @@ Status VFileResultWriter::_close_file_writer(bool done) {
COUNTER_UPDATE(_written_data_bytes, _vfile_writer->written_len());
_vfile_writer.reset(nullptr);
} else if (_file_writer_impl) {
_file_writer_impl->close();
RETURN_IF_ERROR(_file_writer_impl->close());
}
if (!done) {

View File

@ -61,10 +61,11 @@ VOrcOutputStream::~VOrcOutputStream() {
void VOrcOutputStream::close() {
if (!_is_closed) {
Status st = _file_writer->close();
_is_closed = true;
if (!st.ok()) {
LOG(WARNING) << "close orc output stream failed: " << st;
throw std::runtime_error(st.to_string());
}
_is_closed = true;
}
}
@ -115,10 +116,15 @@ int64_t VOrcWriterWrapper::written_len() {
return _output_stream->getLength();
}
void VOrcWriterWrapper::close() {
Status VOrcWriterWrapper::close() {
if (_writer != nullptr) {
_writer->close();
try {
_writer->close();
} catch (const std::exception& e) {
return Status::IOError(e.what());
}
}
return Status::OK();
}
#define RETURN_WRONG_TYPE \

View File

@ -84,7 +84,7 @@ public:
Status write(const Block& block) override;
void close() override;
Status close() override;
int64_t written_len() override;

View File

@ -937,7 +937,7 @@ int64_t VParquetWriterWrapper::written_len() {
return _outstream->get_written_len();
}
void VParquetWriterWrapper::close() {
Status VParquetWriterWrapper::close() {
try {
if (_rg_writer != nullptr) {
_rg_writer->Close();
@ -949,11 +949,14 @@ void VParquetWriterWrapper::close() {
arrow::Status st = _outstream->Close();
if (!st.ok()) {
LOG(WARNING) << "close parquet file error: " << st.ToString();
return Status::IOError(st.ToString());
}
} catch (const std::exception& e) {
_rg_writer = nullptr;
LOG(WARNING) << "Parquet writer close error: " << e.what();
return Status::IOError(e.what());
}
return Status::OK();
}
} // namespace doris::vectorized

View File

@ -103,7 +103,7 @@ public:
virtual Status write(const Block& block) = 0;
virtual void close() = 0;
virtual Status close() = 0;
virtual int64_t written_len() = 0;
@ -129,7 +129,7 @@ public:
Status write(const Block& block) override;
void close() override;
Status close() override;
int64_t written_len() override;

View File

@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N
2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9

View File

@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N
2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9

View File

@ -0,0 +1,159 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
// Regression test for #23441: SELECT ... INTO OUTFILE to S3 must report an
// error (instead of a success result) when the bucket / credentials are wrong.
// The writer's close() phase is where the failure surfaces, so each export
// below is expected to raise, for every supported output format.
suite("test_outfile_exception") {
    def tableName = "outfile_exception_test"

    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
            `user_id` LARGEINT NOT NULL COMMENT "用户id",
            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
            `date_1` DATEV2 NOT NULL COMMENT "",
            `datetime_1` DATETIMEV2 NOT NULL COMMENT "",
            `datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
            `datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
            `city` VARCHAR(20) COMMENT "用户所在城市",
            `age` SMALLINT COMMENT "用户年龄",
            `sex` TINYINT COMMENT "用户性别",
            `bool_col` boolean COMMENT "",
            `int_col` int COMMENT "",
            `bigint_col` bigint COMMENT "",
            `largeint_col` largeint COMMENT "",
            `float_col` float COMMENT "",
            `double_col` double COMMENT "",
            `char_col` CHAR(10) COMMENT "",
            `decimal_col` decimal COMMENT ""
        )
        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
    """

    // Rows 1..9 fully populated; row 10 has NULLs in every nullable column
    // to exercise NULL handling in each export format.
    StringBuilder sb = new StringBuilder()
    int i = 1
    for (; i < 10; i ++) {
        sb.append("""
            (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, ${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
        """)
    }
    sb.append("""
        (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
    """)
    sql """ INSERT INTO ${tableName} VALUES
            ${sb.toString()}
        """
    order_qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """

    // Every format's writer must surface the S3 failure from close();
    // before #23441 some writers swallowed it and reported success.
    // NOTE(review): this relies on the COS endpoint being reachable and on
    // the provider returning exactly this NoSuchBucket message — confirm
    // the environment when the assertion starts flaking.
    def formats = ["parquet", "orc", "csv", "csv_with_names", "csv_with_names_and_types"]
    for (def fileFormat : formats) {
        // check ${fileFormat}
        test {
            sql """
                select * from ${tableName} t ORDER BY user_id
                into outfile "s3://ftw-datalake-test/test_outfile/exp_"
                format as ${fileFormat}
                properties(
                    "s3.endpoint" = "https://cos.ap-beijing.myqcloud.com",
                    "s3.region" = "ap-beijing",
                    "s3.access_key"= "xx",
                    "s3.secret_key" = "yy"
                );
            """
            // check exception
            exception "NoSuchBucket:The specified bucket does not exist"
        }
    }
}

View File

@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
// Nereids-planner variant of the regression test for #23441: SELECT ... INTO
// OUTFILE to S3 must report an error (instead of a success result) when the
// bucket / credentials are wrong, for every supported output format.
suite("test_outfile_exception") {
    // Force the Nereids planner with no fallback, so the new planner's
    // outfile path is what gets exercised.
    sql 'set enable_nereids_planner=true'
    sql 'set enable_fallback_to_original_planner=false'

    def tableName = "outfile_exception_test"

    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
            `user_id` LARGEINT NOT NULL COMMENT "用户id",
            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
            `date_1` DATEV2 NOT NULL COMMENT "",
            `datetime_1` DATETIMEV2 NOT NULL COMMENT "",
            `datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
            `datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
            `city` VARCHAR(20) COMMENT "用户所在城市",
            `age` SMALLINT COMMENT "用户年龄",
            `sex` TINYINT COMMENT "用户性别",
            `bool_col` boolean COMMENT "",
            `int_col` int COMMENT "",
            `bigint_col` bigint COMMENT "",
            `largeint_col` largeint COMMENT "",
            `float_col` float COMMENT "",
            `double_col` double COMMENT "",
            `char_col` CHAR(10) COMMENT "",
            `decimal_col` decimal COMMENT ""
        )
        DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
    """

    // Rows 1..9 fully populated; row 10 has NULLs in every nullable column
    // to exercise NULL handling in each export format.
    StringBuilder sb = new StringBuilder()
    int i = 1
    for (; i < 10; i ++) {
        sb.append("""
            (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, ${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
        """)
    }
    sb.append("""
        (${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
    """)
    sql """ INSERT INTO ${tableName} VALUES
            ${sb.toString()}
        """
    order_qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """

    // Every format's writer must surface the S3 failure from close();
    // before #23441 some writers swallowed it and reported success.
    // NOTE(review): this relies on the COS endpoint being reachable and on
    // the provider returning exactly this NoSuchBucket message — confirm
    // the environment when the assertion starts flaking.
    def formats = ["parquet", "orc", "csv", "csv_with_names", "csv_with_names_and_types"]
    for (def fileFormat : formats) {
        // check ${fileFormat}
        test {
            sql """
                select * from ${tableName} t ORDER BY user_id
                into outfile "s3://ftw-datalake-test/test_outfile/exp_"
                format as ${fileFormat}
                properties(
                    "s3.endpoint" = "https://cos.ap-beijing.myqcloud.com",
                    "s3.region" = "ap-beijing",
                    "s3.access_key"= "xx",
                    "s3.secret_key" = "yy"
                );
            """
            // check exception
            exception "NoSuchBucket:The specified bucket does not exist"
        }
    }
}