From 4b316e4c3fea76ff32af14c8c33c3e12fd6fd16b Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Sun, 14 Mar 2021 15:39:46 +0800
Subject: [PATCH] [Outfile] Support exporting query result to local disk
 (#5489)

1. Users can export query results to the local disk, e.g.:
   `select * from tbl into outfile "file:///disk1/result_";`
   The return result is also modified to show the details of the export:

   ```
   mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
   +------------+-----------+----------+--------------+
   | FileNumber | TotalRows | FileSize | URL          |
   +------------+-----------+----------+--------------+
   | 1          | 2         | 8        | 192.168.1.10 |
   +------------+-----------+----------+--------------+
   ```

2. Support creating a mark file after the export finishes successfully.
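
A fuller sketch combining both features (table name, path, and property values are illustrative; `success_file_name` is the property added by this patch, and the `file:///` scheme additionally requires the new FE config `enable_outfile_to_local = true`):

```
select k1, k2 from tbl1
into outfile "file:///home/work/path/result_"
FORMAT AS CSV
PROPERTIES
(
    "column_separator" = ",",
    "line_delimiter" = "\n",
    "max_file_size" = "1024MB",
    "success_file_name" = "SUCCESS"
);
```

Because the mark file name is appended directly to the file path prefix, a successful run should leave data files such as `result_0.csv` plus an empty `result_SUCCESS` mark file on the executing Backend.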
Co-authored-by: chenmingyu
---
 be/src/runtime/file_result_writer.cpp         | 113 ++++++++++++++++--
 be/src/runtime/file_result_writer.h           |  29 ++++-
 be/src/runtime/result_sink.cpp                |   2 +-
 be/src/runtime/result_writer.h                |   2 +-
 docs/en/administrator-guide/outfile.md        |  31 +++--
 docs/zh-CN/administrator-guide/outfile.md     |  60 ++++++----
 .../apache/doris/analysis/OutFileClause.java  |  67 +++++++++--
 .../java/org/apache/doris/common/Config.java  |   6 +
 .../org/apache/doris/qe/StmtExecutor.java     |  41 ++++---
 gensrc/thrift/DataSinks.thrift                |   1 +
 10 files changed, 279 insertions(+), 73 deletions(-)

diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp
index f68f50c989..758f852871 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -23,11 +23,15 @@
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 #include "gen_cpp/PaloInternalService_types.h"
+#include "runtime/buffer_control_block.h"
 #include "runtime/primitive_type.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple_row.h"
+#include "service/backend_options.h"
 #include "util/date_func.h"
+#include "util/file_utils.h"
+#include "util/mysql_row_buffer.h"
 #include "util/types.h"
 #include "util/uid_util.h"
 
@@ -37,10 +41,12 @@ const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
 
 FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
                                    const std::vector<ExprContext*>& output_expr_ctxs,
-                                   RuntimeProfile* parent_profile)
+                                   RuntimeProfile* parent_profile,
+                                   BufferControlBlock* sinker)
         : _file_opts(file_opts),
           _output_expr_ctxs(output_expr_ctxs),
-          _parent_profile(parent_profile) {}
+          _parent_profile(parent_profile),
+          _sinker(sinker) {}
 
 FileResultWriter::~FileResultWriter() {
     _close_file_writer(true);
@@ -50,7 +56,7 @@ Status FileResultWriter::init(RuntimeState* state) {
     _state = state;
     _init_profile();
 
-    RETURN_IF_ERROR(_create_file_writer());
+    RETURN_IF_ERROR(_create_next_file_writer());
     return Status::OK();
 }
 
@@ -64,8 +70,40 @@ void FileResultWriter::_init_profile() {
     _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
 }
 
-Status FileResultWriter::_create_file_writer() {
-    std::string file_name = _get_next_file_name();
+Status FileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    RETURN_IF_ERROR(_close_file_writer(true, true));
+    return Status::OK();
+}
+
+Status FileResultWriter::_get_success_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << _file_opts->success_file_name;
+    *file_name = ss.str();
+    if (_file_opts->is_local_file) {
+        // For the local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently specified arbitrarily by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting an existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+Status FileResultWriter::_create_next_file_writer() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_next_file_name(&file_name));
+    return _create_file_writer(file_name);
+}
+
+Status FileResultWriter::_create_file_writer(const std::string& file_name) {
     if (_file_opts->is_local_file) {
         _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
     } else {
@@ -92,10 +130,23 @@
 }
 
 // file name format as: my_prefix_0.csv
-std::string FileResultWriter::_get_next_file_name() {
+Status FileResultWriter::_get_next_file_name(std::string* file_name) {
     std::stringstream ss;
     ss << _file_opts->file_path << (_file_idx++) << "." << _file_format_to_name();
-    return ss.str();
+    *file_name = ss.str();
+    if (_file_opts->is_local_file) {
+        // For the local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently specified arbitrarily by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting an existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
 }
 
 std::string FileResultWriter::_file_format_to_name() {
@@ -291,7 +342,7 @@ Status FileResultWriter::_create_new_file_if_exceed_size() {
     return Status::OK();
 }
 
-Status FileResultWriter::_close_file_writer(bool done) {
+Status FileResultWriter::_close_file_writer(bool done, bool only_close) {
     if (_parquet_writer != nullptr) {
         _parquet_writer->close();
         delete _parquet_writer;
@@ -305,13 +356,57 @@
         _file_writer = nullptr;
     }
 
+    if (only_close) {
+        return Status::OK();
+    }
+
     if (!done) {
         // not finished, create a new file writer for the next file
-        RETURN_IF_ERROR(_create_file_writer());
+        RETURN_IF_ERROR(_create_next_file_writer());
+    } else {
+        // All data has been written to files, send the statistic result
+        if (_file_opts->success_file_name != "") {
+            // write the success file; we just need to touch an empty file
+            RETURN_IF_ERROR(_create_success_file());
+        }
+        RETURN_IF_ERROR(_send_result());
     }
 
     return Status::OK();
 }
 
+Status FileResultWriter::_send_result() {
+    if (_is_result_sent) {
+        return Status::OK();
+    }
+    _is_result_sent = true;
+
+    // The final stat result includes:
+    // FileNumber, TotalRows, FileSize and URL.
+    // The types of these fields should be consistent with the types defined
+    // in OutFileClause.java in the FE.
+    MysqlRowBuffer row_buffer;
+    row_buffer.push_int(_file_idx);                         // file number
+    row_buffer.push_bigint(_written_rows_counter->value()); // total rows
+    row_buffer.push_bigint(_written_data_bytes->value());   // file size
+    std::string localhost = BackendOptions::get_localhost();
+    row_buffer.push_string(localhost.c_str(), localhost.length()); // url
+
+    TFetchDataResult* result = new (std::nothrow) TFetchDataResult();
+    result->result_batch.rows.resize(1);
+    result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
+
+    Status st = _sinker->add_batch(result);
+    if (st.ok()) {
+        result = nullptr;
+    } else {
+        LOG(WARNING) << "failed to send outfile result: " << st.get_error_msg();
+    }
+
+    delete result;
+    result = nullptr;
+    return st;
+}
+
 Status FileResultWriter::close() {
     // the following 2 profile "_written_rows_counter" and "_writer_close_timer"
     // must be outside the `_close_file_writer()`.
diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h
index 72a6aee92c..f264c6d840 100644
--- a/be/src/runtime/file_result_writer.h
+++ b/be/src/runtime/file_result_writer.h
@@ -39,6 +39,7 @@ struct ResultFileOptions {
     size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
     std::vector<TNetworkAddress> broker_addresses;
     std::map<std::string, std::string> broker_properties;
+    std::string success_file_name = "";
 
     ResultFileOptions(const TResultFileSinkOptions& t_opt) {
         file_path = t_opt.file_path;
@@ -56,21 +57,29 @@ struct ResultFileOptions {
         if (t_opt.__isset.broker_properties) {
             broker_properties = t_opt.broker_properties;
         }
+        if (t_opt.__isset.success_file_name) {
+            success_file_name = t_opt.success_file_name;
+        }
     }
 };
 
+class BufferControlBlock;
+
 // write result to file
 class FileResultWriter final : public ResultWriter {
 public:
     FileResultWriter(const ResultFileOptions* file_option,
                      const std::vector<ExprContext*>& output_expr_ctxs,
-                     RuntimeProfile* parent_profile);
+                     RuntimeProfile* parent_profile,
+                     BufferControlBlock* sinker);
     virtual ~FileResultWriter();
 
     virtual Status init(RuntimeState* state) override;
     virtual Status append_row_batch(const RowBatch* batch) override;
    virtual Status close() override;
 
+    // the file result writer always returns its statistic result as one row
+    virtual int64_t get_written_rows() const { return 1; }
+
 private:
     Status _write_csv_file(const RowBatch& batch);
     Status _write_one_row_as_csv(TupleRow* row);
@@ -80,14 +89,20 @@ private:
     Status _flush_plain_text_outstream(bool eos);
     void _init_profile();
 
-    Status _create_file_writer();
+    Status _create_file_writer(const std::string& file_name);
+    Status _create_next_file_writer();
+    Status _create_success_file();
     // get next export file name
-    std::string _get_next_file_name();
+    Status _get_next_file_name(std::string* file_name);
+    Status _get_success_file_name(std::string* file_name);
     std::string _file_format_to_name();
-    // close file writer, and if !done, it will create new writer for next file
-    Status _close_file_writer(bool done);
+    // Close the file writer; if !done, create a new writer for the next file.
+    // If only_close is true, this method just closes the file writer and returns.
+    Status _close_file_writer(bool done, bool only_close = false);
     // create a new file if current file size exceed limit
     Status _create_new_file_if_exceed_size();
+    // send the final statistic result
+    Status _send_result();
 
 private:
     RuntimeState* _state; // not owned, set when init
@@ -126,6 +141,10 @@ private:
     RuntimeProfile::Counter* _written_rows_counter = nullptr;
     // bytes of written data
     RuntimeProfile::Counter* _written_data_bytes = nullptr;
+
+    BufferControlBlock* _sinker;
+    // set to true once the final statistic result has been sent
+    bool _is_result_sent = false;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index e536fc5f3e..939f21827b 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -81,7 +81,7 @@ Status ResultSink::prepare(RuntimeState* state) {
     case TResultSinkType::FILE:
         CHECK(_file_opts.get() != nullptr);
         _writer.reset(new (std::nothrow)
-                FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile));
+                FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile, _sender.get()));
         break;
     default:
         return Status::InternalError("Unknown result sink type");
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index 8fbd1bed2f..418e7b644e 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -38,7 +38,7 @@ public:
 
     virtual Status close() = 0;
 
-    int64_t get_written_rows() const { return _written_rows; }
+    virtual int64_t get_written_rows() const { return _written_rows; }
 
     static const std::string NULL_IN_CSV;
 
diff --git a/docs/en/administrator-guide/outfile.md b/docs/en/administrator-guide/outfile.md
index 691ed3b045..7f31b531b6 100644
--- a/docs/en/administrator-guide/outfile.md
+++ b/docs/en/administrator-guide/outfile.md
@@ -160,20 +160,22 @@ WITH BROKER `broker_name`
 4. Example 4
 
     Export simple query results to the file `cos://${bucket_name}/path/result.txt`. Specify the export format as CSV.
+    And create a mark file after the export finishes.
 
     ```
-    select k1,k2,v1 from tbl1 limit 100000
-    into outfile "s3a://my_bucket/export/my_file_"
-    FORMAT AS CSV
-    PROPERTIES
-    (
+    select k1,k2,v1 from tbl1 limit 100000
+    into outfile "s3a://my_bucket/export/my_file_"
+    FORMAT AS CSV
+    PROPERTIES
+    (
        "broker.name" = "hdfs_broker",
        "broker.fs.s3a.access.key" = "xxx",
        "broker.fs.s3a.secret.key" = "xxxx",
        "broker.fs.s3a.endpoint" = "https://cos.xxxxxx.myqcloud.com/",
        "column_separator" = ",",
        "line_delimiter" = "\n",
-       "max_file_size" = "1024MB"
+       "max_file_size" = "1024MB",
+       "success_file_name" = "SUCCESS"
     )
     ```
@@ -188,19 +190,30 @@ WITH BROKER `broker_name`
 ## Return result
 
 The command is a synchronization command. The command returns, which means the operation is over.
+At the same time, a row of results is returned to show the details of the export.
 
 If it exports and returns normally, the result is as follows:
 
 ```
-mysql> SELECT * FROM tbl INTO OUTFILE ...
-Query OK, 100000 row affected (5.86 sec)
+mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
++------------+-----------+----------+--------------+
+| FileNumber | TotalRows | FileSize | URL          |
++------------+-----------+----------+--------------+
+| 1          | 2         | 8        | 192.168.1.10 |
++------------+-----------+----------+--------------+
+1 row in set (0.05 sec)
 ```
 
-`100000 row affected` Indicates the size of the exported result set.
+* FileNumber: The number of files generated.
+* TotalRows: The number of rows in the result set.
+* FileSize: The total size of the exported files, in bytes.
+* URL: If the result is exported to a local disk, this shows which Compute Node it was exported to.
 
 If the execution is incorrect, an error message will be returned, such as:
 
 ```
-mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
+mysql> SELECT * FROM tbl INTO OUTFILE ...
+ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
 ```
 
 ## Notice
diff --git a/docs/zh-CN/administrator-guide/outfile.md b/docs/zh-CN/administrator-guide/outfile.md
index c40af67766..04ea0e7dbb 100644
--- a/docs/zh-CN/administrator-guide/outfile.md
+++ b/docs/zh-CN/administrator-guide/outfile.md
@@ -136,6 +136,7 @@ INTO OUTFILE "file_path"
 3. 示例3
 
     将 UNION 语句的查询结果导出到文件 `bos://bucket/result.txt`。指定导出格式为 PARQUET。使用 `my_broker` 并设置 hdfs 高可用信息。PARQUET 格式无需指定列分割符。
+    导出完成后,生成一个标识文件。
 
     ```
     SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1
@@ -156,46 +157,59 @@ INTO OUTFILE "file_path"
 
 4. 示例4
 
-    将 select 语句的查询结果导出到文件 `cos://${bucket_name}/path/result.txt`。指定导出格式为 csv。
-    ```
-    select k1,k2,v1 from tbl1 limit 100000
-    into outfile "s3a://my_bucket/export/my_file_"
-    FORMAT AS CSV
-    PROPERTIES
-    (
-        "broker.name" = "hdfs_broker",
-        "broker.fs.s3a.access.key" = "xxx",
-        "broker.fs.s3a.secret.key" = "xxxx",
-        "broker.fs.s3a.endpoint" = "https://cos.xxxxxx.myqcloud.com/",
-        "column_separator" = ",",
-        "line_delimiter" = "\n",
-        "max_file_size" = "1024MB"
+    将 select 语句的查询结果导出到文件 `cos://${bucket_name}/path/result.txt`。指定导出格式为 csv。
+    导出完成后,生成一个标识文件。
+
+    ```
+    select k1,k2,v1 from tbl1 limit 100000
+    into outfile "s3a://my_bucket/export/my_file_"
+    FORMAT AS CSV
+    PROPERTIES
+    (
+        "broker.name" = "hdfs_broker",
+        "broker.fs.s3a.access.key" = "xxx",
+        "broker.fs.s3a.secret.key" = "xxxx",
+        "broker.fs.s3a.endpoint" = "https://cos.xxxxxx.myqcloud.com/",
+        "column_separator" = ",",
+        "line_delimiter" = "\n",
+        "max_file_size" = "1024MB",
+        "success_file_name" = "SUCCESS"
     )
     ```
 
-    最终生成文件如如果不大于 1GB,则为:`my_file_0.csv`。
-
-    如果大于 1GB,则可能为 `my_file_0.csv, result_1.csv, ...`。
-
-    在cos上验证
-    1. 不存在的path会自动创建
-    2. access.key/secret.key/endpoint需要和cos的同学确认。尤其是endpoint的值,不需要填写bucket_name。
+    最终生成文件如果不大于 1GB,则为:`my_file_0.csv`。
+
+    如果大于 1GB,则可能为 `my_file_0.csv, result_1.csv, ...`。
+
+    在cos上验证
+    1. 不存在的path会自动创建
+    2. access.key/secret.key/endpoint需要和cos的同学确认。尤其是endpoint的值,不需要填写bucket_name。
 
 ## 返回结果
 
-导出命令为同步命令。命令返回,即表示操作结束。
+导出命令为同步命令。命令返回,即表示操作结束。同时会返回一行结果来展示导出的执行结果。
 
 如果正常导出并返回,则结果如下:
 
 ```
-mysql> SELECT * FROM tbl INTO OUTFILE ...
-Query OK, 100000 row affected (5.86 sec)
+mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
++------------+-----------+----------+--------------+
+| FileNumber | TotalRows | FileSize | URL          |
++------------+-----------+----------+--------------+
+| 1          | 2         | 8        | 192.168.1.10 |
++------------+-----------+----------+--------------+
+1 row in set (0.05 sec)
 ```
 
-其中 `100000 row affected` 表示导出的结果集行数。
+* FileNumber:最终生成的文件个数。
+* TotalRows:结果集行数。
+* FileSize:导出文件总大小。单位:字节。
+* URL:如果是导出到本地磁盘,则这里显示具体导出到哪个 Compute Node。
 
 如果执行错误,则会返回错误信息,如:
 
 ```
-mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
+mysql> SELECT * FROM tbl INTO OUTFILE ...
+ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
 ```
 
 ## 注意事项
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 778ec5b93b..05f975ddcb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -17,20 +17,25 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TResultFileSinkOptions;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.clearspring.analytics.util.Lists;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -39,12 +44,28 @@ import java.util.stream.Collectors;
 public class OutFileClause {
     private static final Logger LOG = LogManager.getLogger(OutFileClause.class);
 
+    public static final List<String> RESULT_COL_NAMES = Lists.newArrayList();
+    public static final List<PrimitiveType> RESULT_COL_TYPES = Lists.newArrayList();
+
+    static {
+        RESULT_COL_NAMES.add("FileNumber");
+        RESULT_COL_NAMES.add("TotalRows");
+        RESULT_COL_NAMES.add("FileSize");
+        RESULT_COL_NAMES.add("URL");
+
+        RESULT_COL_TYPES.add(PrimitiveType.INT);
+        RESULT_COL_TYPES.add(PrimitiveType.BIGINT);
+        RESULT_COL_TYPES.add(PrimitiveType.BIGINT);
+        RESULT_COL_TYPES.add(PrimitiveType.VARCHAR);
+    }
+
     public static final String LOCAL_FILE_PREFIX = "file:///";
     private static final String BROKER_PROP_PREFIX = "broker.";
     private static final String PROP_BROKER_NAME = "broker.name";
     private static final String PROP_COLUMN_SEPARATOR = "column_separator";
     private static final String PROP_LINE_DELIMITER = "line_delimiter";
     private static final String PROP_MAX_FILE_SIZE = "max_file_size";
+    private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
 
     private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
     private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB
@@ -60,6 +81,10 @@ public class OutFileClause {
     private TFileFormatType fileFormatType;
     private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
     private BrokerDesc brokerDesc = null;
+    // True if result is written to local disk.
+    // If set to true, the brokerDesc must be null.
+    private boolean isLocalOutput = false;
+    private String successFileName = "";
 
     public OutFileClause(String filePath, String format, Map<String, String> properties) {
         this.filePath = filePath;
@@ -94,9 +119,7 @@ public class OutFileClause {
     }
 
     public void analyze(Analyzer analyzer) throws AnalysisException {
-        if (Strings.isNullOrEmpty(filePath)) {
-            throw new AnalysisException("Must specify file in OUTFILE clause");
-        }
+        analyzeFilePath();
 
         if (!format.equals("csv")) {
             throw new AnalysisException("Only support CSV format");
@@ -105,11 +128,29 @@ public class OutFileClause {
 
         analyzeProperties();
 
-        if (brokerDesc == null) {
+        if (brokerDesc != null && isLocalOutput) {
+            throw new AnalysisException("No need to specify BROKER properties in OUTFILE clause for local file output");
+        } else if (brokerDesc == null && !isLocalOutput) {
             throw new AnalysisException("Must specify BROKER properties in OUTFILE clause");
         }
     }
 
+    private void analyzeFilePath() throws AnalysisException {
+        if (Strings.isNullOrEmpty(filePath)) {
+            throw new AnalysisException("Must specify file in OUTFILE clause");
+        }
+
+        if (filePath.startsWith(LOCAL_FILE_PREFIX)) {
+            if (!Config.enable_outfile_to_local) {
+                throw new AnalysisException("Exporting results to local disk is not allowed.");
+            }
+            isLocalOutput = true;
+            filePath = filePath.substring(LOCAL_FILE_PREFIX.length() - 1); // leave the last '/'
+        } else {
+            isLocalOutput = false;
+        }
+    }
+
     private void analyzeProperties() throws AnalysisException {
         if (properties == null || properties.isEmpty()) {
             return;
@@ -117,9 +158,6 @@ public class OutFileClause {
 
         Set<String> processedPropKeys = Sets.newHashSet();
         getBrokerProperties(processedPropKeys);
-        if (brokerDesc == null) {
-            return;
-        }
 
         if (properties.containsKey(PROP_COLUMN_SEPARATOR)) {
             if (!isCsvFormat()) {
@@ -145,6 +183,12 @@ public class OutFileClause {
             processedPropKeys.add(PROP_MAX_FILE_SIZE);
         }
 
+        if (properties.containsKey(PROP_SUCCESS_FILE_NAME)) {
+            successFileName = properties.get(PROP_SUCCESS_FILE_NAME);
+            FeNameFormat.checkCommonName("file name", successFileName);
+            processedPropKeys.add(PROP_SUCCESS_FILE_NAME);
+        }
+
         if (processedPropKeys.size() != properties.size()) {
             LOG.debug("{} vs {}", processedPropKeys, properties);
             throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
@@ -210,6 +254,9 @@ public class OutFileClause {
             // broker_addresses of sinkOptions will be set in Coordinator.
             // Because we need to choose the nearest broker with the result sink node.
         }
+        if (!Strings.isNullOrEmpty(successFileName)) {
+            sinkOptions.setSuccessFileName(successFileName);
+        }
         return sinkOptions;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 91000e4802..0f98a889b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1349,4 +1349,10 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean enable_access_file_without_broker = false;
+
+    /**
+     * Whether to allow the outfile function to export the results to the local disk.
+     */
+    @ConfField
+    public static boolean enable_outfile_to_local = false;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 0a991effd7..8a125a1ff4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.ExportStmt;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.KillStmt;
+import org.apache.doris.analysis.OutFileClause;
 import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.analysis.SelectStmt;
@@ -41,6 +42,7 @@ import org.apache.doris.analysis.UnsupportedStmt;
 import org.apache.doris.analysis.UseStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Table.TableType;
@@ -102,6 +104,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 // Do one COM_QUERY process.
 // first: Parse receive byte array to statement struct.
@@ -626,7 +629,7 @@ public class StmtExecutor {
                 batch = value.getRowBatch();
                 if (!isSend) {
                     // send meta fields before sending first data batch.
-                    sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs());
+                    sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
                     isSend = true;
                 }
                 for (ByteBuffer row : batch.getBatch().getRows()) {
@@ -640,7 +643,7 @@ public class StmtExecutor {
             statisticsForAuditLog = batch.getQueryStatistics();
         }
         if (!isSend) {
-            sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs());
+            sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
             isSend = true;
         }
         context.getState().setEof();
@@ -687,7 +690,7 @@ public class StmtExecutor {
             if (batch.getBatch() != null) {
                 cacheAnalyzer.copyRowBatch(batch);
                 if (!isSendFields) {
-                    sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs());
+                    sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs()));
                     isSendFields = true;
                 }
                 for (ByteBuffer row : batch.getBatch().getRows()) {
@@ -707,7 +710,7 @@ public class StmtExecutor {
         cacheAnalyzer.updateCache();
 
         if (!isSendFields) {
-            sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs());
+            sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs()));
             isSendFields = true;
         }
 
@@ -765,11 +768,15 @@ public class StmtExecutor {
         while (true) {
             batch = coord.getNext();
             // for outfile query, there will be only one empty batch send back with eos flag
-            if (batch.getBatch() != null && !isOutfileQuery) {
+            if (batch.getBatch() != null) {
                 // For some language driver, getting error packet after fields packet will be recognized as a success result
                 // so We need to send fields after first batch arrived
                 if (!isSendFields) {
-                    sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
+                    if (!isOutfileQuery) {
+                        sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
+                    } else {
+                        sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
+                    }
                     isSendFields = true;
                 }
                 for (ByteBuffer row : batch.getBatch().getRows()) {
@@ -781,16 +788,16 @@ public class StmtExecutor {
                 break;
             }
         }
-        if (!isSendFields && !isOutfileQuery) {
-            sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
+        if (!isSendFields) {
+            if (!isOutfileQuery) {
+                sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
+            } else {
+                sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
+            }
         }
 
         statisticsForAuditLog = batch.getQueryStatistics();
-        if (!isOutfileQuery) {
-            context.getState().setEof();
-        } else {
-            context.getState().setOk(statisticsForAuditLog.returned_rows, 0, "");
-        }
+        context.getState().setEof();
 
         plannerProfile.setQueryFetchResultFinishTime();
     }
@@ -993,7 +1000,7 @@ public class StmtExecutor {
         context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
     }
 
-    private void sendFields(List<String> colNames, List<Expr> exprs) throws IOException {
+    private void sendFields(List<String> colNames, List<PrimitiveType> types) throws IOException {
         // sends how many columns
         serializer.reset();
         serializer.writeVInt(colNames.size());
@@ -1001,7 +1008,7 @@ public class StmtExecutor {
         // send field one by one
         for (int i = 0; i < colNames.size(); ++i) {
             serializer.reset();
-            serializer.writeField(colNames.get(i), exprs.get(i).getType().getPrimitiveType());
+            serializer.writeField(colNames.get(i), types.get(i));
             context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
         }
         // send EOF
@@ -1112,5 +1119,9 @@ public class StmtExecutor {
         }
         return statisticsForAuditLog;
     }
+
+    private List<PrimitiveType> exprToType(List<Expr> exprs) {
+        return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList());
+    }
 }
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index ebf4371307..c3ebdf0256 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -48,6 +48,7 @@ struct TResultFileSinkOptions {
     5: optional i64 max_file_size_bytes
     6: optional list<Types.TNetworkAddress> broker_addresses; // only for remote file
     7: optional map<string, string> broker_properties // only for remote file
+    8: optional string success_file_name
 }
 
 struct TMemoryScratchSink {
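
Note: local export is disabled by default, gated by the `enable_outfile_to_local` FE config added above (the field is not marked `mutable`, so it should take effect on FE restart). With the default value, analysis fails before any file is written. A hypothetical session (the error text comes from `OutFileClause.analyzeFilePath`; the exact client rendering may differ):

```
mysql> select * from tbl1 into outfile "file:///home/work/path/result_";
ERROR 1064 (HY000): errCode = 2, detailMessage = Exporting results to local disk is not allowed.
```

After setting `enable_outfile_to_local = true` in fe.conf, the same statement succeeds and returns the one-row statistic result shown in the commit message.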