[Outfile] Support exporting query result to local disk (#5489)

1.
Users can export query results to local disk, e.g.:

`select * from tbl into outfile "file:///disk1/result_";`

And modify the return result to show the details of the export:

```
mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
+------------+-----------+----------+--------------+
| FileNumber | TotalRows | FileSize | URL          |
+------------+-----------+----------+--------------+
|          1 |         2 |        8 | 192.168.1.10 |
+------------+-----------+----------+--------------+
```

2.
Support creating a mark file after the export successfully finishes.
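
The mark file is requested via the new `success_file_name` property. A minimal sketch, assuming the property can be combined with a local `file:///` path the same way the documentation below combines it with a broker path (local export also requires the new FE config `enable_outfile_to_local` to be true):

```
select * from tbl into outfile "file:///disk1/result_"
PROPERTIES ("success_file_name" = "SUCCESS");
```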

Co-authored-by: chenmingyu <chenmingyu@baidu.com>

Author: Mingyu Chen
Date: 2021-03-14 15:39:46 +08:00
Committed by: GitHub
Parent: e9a73ee278
Commit: 4b316e4c3f
10 changed files with 279 additions and 73 deletions


@ -23,11 +23,15 @@
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "runtime/buffer_control_block.h"
#include "runtime/primitive_type.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple_row.h"
#include "service/backend_options.h"
#include "util/date_func.h"
#include "util/file_utils.h"
#include "util/mysql_row_buffer.h"
#include "util/types.h"
#include "util/uid_util.h"
@ -37,10 +41,12 @@ const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile)
RuntimeProfile* parent_profile,
BufferControlBlock* sinker)
: _file_opts(file_opts),
_output_expr_ctxs(output_expr_ctxs),
_parent_profile(parent_profile) {}
_parent_profile(parent_profile),
_sinker(sinker) {}
FileResultWriter::~FileResultWriter() {
_close_file_writer(true);
@ -50,7 +56,7 @@ Status FileResultWriter::init(RuntimeState* state) {
_state = state;
_init_profile();
RETURN_IF_ERROR(_create_file_writer());
RETURN_IF_ERROR(_create_next_file_writer());
return Status::OK();
}
@ -64,8 +70,40 @@ void FileResultWriter::_init_profile() {
_written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
}
Status FileResultWriter::_create_file_writer() {
std::string file_name = _get_next_file_name();
Status FileResultWriter::_create_success_file() {
std::string file_name;
RETURN_IF_ERROR(_get_success_file_name(&file_name));
RETURN_IF_ERROR(_create_file_writer(file_name));
RETURN_IF_ERROR(_close_file_writer(true, true));
return Status::OK();
}
Status FileResultWriter::_get_success_file_name(std::string* file_name) {
std::stringstream ss;
ss << _file_opts->file_path << _file_opts->success_file_name;
*file_name = ss.str();
if (_file_opts->is_local_file) {
// For local file writer, the file_path is a local dir.
// Here we do a simple security verification by checking whether the file exists.
// Because the file path is currently arbitrarily specified by the user,
// Doris is not responsible for ensuring the correctness of the path.
// This is just to prevent overwriting the existing file.
if (FileUtils::check_exist(*file_name)) {
return Status::InternalError("File already exists: " + *file_name
+ ". Host: " + BackendOptions::get_localhost());
}
}
return Status::OK();
}
Status FileResultWriter::_create_next_file_writer() {
std::string file_name;
RETURN_IF_ERROR(_get_next_file_name(&file_name));
return _create_file_writer(file_name);
}
Status FileResultWriter::_create_file_writer(const std::string& file_name) {
if (_file_opts->is_local_file) {
_file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
} else {
@ -92,10 +130,23 @@ Status FileResultWriter::_create_file_writer() {
}
// file name format: my_prefix_0.csv
std::string FileResultWriter::_get_next_file_name() {
Status FileResultWriter::_get_next_file_name(std::string* file_name) {
std::stringstream ss;
ss << _file_opts->file_path << (_file_idx++) << "." << _file_format_to_name();
return ss.str();
*file_name = ss.str();
if (_file_opts->is_local_file) {
// For local file writer, the file_path is a local dir.
// Here we do a simple security verification by checking whether the file exists.
// Because the file path is currently arbitrarily specified by the user,
// Doris is not responsible for ensuring the correctness of the path.
// This is just to prevent overwriting the existing file.
if (FileUtils::check_exist(*file_name)) {
return Status::InternalError("File already exists: " + *file_name
+ ". Host: " + BackendOptions::get_localhost());
}
}
return Status::OK();
}
std::string FileResultWriter::_file_format_to_name() {
@ -291,7 +342,7 @@ Status FileResultWriter::_create_new_file_if_exceed_size() {
return Status::OK();
}
Status FileResultWriter::_close_file_writer(bool done) {
Status FileResultWriter::_close_file_writer(bool done, bool only_close) {
if (_parquet_writer != nullptr) {
_parquet_writer->close();
delete _parquet_writer;
@ -305,13 +356,57 @@ Status FileResultWriter::_close_file_writer(bool done) {
_file_writer = nullptr;
}
if (only_close) {
return Status::OK();
}
if (!done) {
// not finished, create new file writer for next file
RETURN_IF_ERROR(_create_file_writer());
RETURN_IF_ERROR(_create_next_file_writer());
} else {
// All data is written to file, send statistic result
if (_file_opts->success_file_name != "") {
// write success file, just need to touch an empty file
RETURN_IF_ERROR(_create_success_file());
}
RETURN_IF_ERROR(_send_result());
}
return Status::OK();
}
Status FileResultWriter::_send_result() {
if (_is_result_sent) {
return Status::OK();
}
_is_result_sent = true;
// The final stat result includes:
// FileNumber, TotalRows, FileSize and URL.
// The types of these fields should be consistent with the types defined
// in OutFileClause.java of FE.
MysqlRowBuffer row_buffer;
row_buffer.push_int(_file_idx); // file number
row_buffer.push_bigint(_written_rows_counter->value()); // total rows
row_buffer.push_bigint(_written_data_bytes->value()); // file size
std::string localhost = BackendOptions::get_localhost();
row_buffer.push_string(localhost.c_str(), localhost.length()); // url
TFetchDataResult* result = new (std::nothrow) TFetchDataResult();
result->result_batch.rows.resize(1);
result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
Status st = _sinker->add_batch(result);
if (st.ok()) {
result = nullptr;
} else {
LOG(WARNING) << "failed to send outfile result: " << st.get_error_msg();
}
delete result;
result = nullptr;
return st;
}
Status FileResultWriter::close() {
// the following 2 profile "_written_rows_counter" and "_writer_close_timer"
// must be outside the `_close_file_writer()`.


@ -39,6 +39,7 @@ struct ResultFileOptions {
size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
std::vector<TNetworkAddress> broker_addresses;
std::map<std::string, std::string> broker_properties;
std::string success_file_name = "";
ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
@ -56,21 +57,29 @@ struct ResultFileOptions {
if (t_opt.__isset.broker_properties) {
broker_properties = t_opt.broker_properties;
}
if (t_opt.__isset.success_file_name) {
success_file_name = t_opt.success_file_name;
}
}
};
class BufferControlBlock;
// write result to file
class FileResultWriter final : public ResultWriter {
public:
FileResultWriter(const ResultFileOptions* file_option,
const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile);
RuntimeProfile* parent_profile,
BufferControlBlock* sinker);
virtual ~FileResultWriter();
virtual Status init(RuntimeState* state) override;
virtual Status append_row_batch(const RowBatch* batch) override;
virtual Status close() override;
// file result writer always returns the statistic result in one row
virtual int64_t get_written_rows() const { return 1; }
private:
Status _write_csv_file(const RowBatch& batch);
Status _write_one_row_as_csv(TupleRow* row);
@ -80,14 +89,20 @@ private:
Status _flush_plain_text_outstream(bool eos);
void _init_profile();
Status _create_file_writer();
Status _create_file_writer(const std::string& file_name);
Status _create_next_file_writer();
Status _create_success_file();
// get next export file name
std::string _get_next_file_name();
Status _get_next_file_name(std::string* file_name);
Status _get_success_file_name(std::string* file_name);
std::string _file_format_to_name();
// close file writer, and if !done, it will create new writer for next file
Status _close_file_writer(bool done);
// close the file writer, and if !done, create a new writer for the next file.
// if only_close is true, this method will just close the file writer and return.
Status _close_file_writer(bool done, bool only_close = false);
// create a new file if current file size exceed limit
Status _create_new_file_if_exceed_size();
// send the final statistic result
Status _send_result();
private:
RuntimeState* _state; // not owned, set when init
@ -126,6 +141,10 @@ private:
RuntimeProfile::Counter* _written_rows_counter = nullptr;
// bytes of written data
RuntimeProfile::Counter* _written_data_bytes = nullptr;
BufferControlBlock* _sinker;
// set to true if the final statistic result is sent
bool _is_result_sent = false;
};
} // namespace doris


@ -81,7 +81,7 @@ Status ResultSink::prepare(RuntimeState* state) {
case TResultSinkType::FILE:
CHECK(_file_opts.get() != nullptr);
_writer.reset(new (std::nothrow)
FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile));
FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile, _sender.get()));
break;
default:
return Status::InternalError("Unknown result sink type");


@ -38,7 +38,7 @@ public:
virtual Status close() = 0;
int64_t get_written_rows() const { return _written_rows; }
virtual int64_t get_written_rows() const { return _written_rows; }
static const std::string NULL_IN_CSV;


@ -160,20 +160,22 @@ WITH BROKER `broker_name`
4. Example 4
Export simple query results to the file `cos://${bucket_name}/path/result.txt`. Specify the export format as CSV.
And create a mark file after the export is finished.
```
select k1,k2,v1 from tbl1 limit 100000
into outfile "s3a://my_bucket/export/my_file_"
FORMAT AS CSV
PROPERTIES
(
select k1,k2,v1 from tbl1 limit 100000
into outfile "s3a://my_bucket/export/my_file_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "hdfs_broker",
"broker.fs.s3a.access.key" = "xxx",
"broker.fs.s3a.secret.key" = "xxxx",
"broker.fs.s3a.endpoint" = "https://cos.xxxxxx.myqcloud.com/",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "1024MB"
"max_file_size" = "1024MB",
"success_file_name" = "SUCCESS"
)
```
@ -188,19 +190,30 @@ WITH BROKER `broker_name`
## Return result
The command is a synchronous command. When the command returns, the operation is finished.
At the same time, a row of results will be returned to show the result of the export execution.
If the export completes and returns normally, the result is as follows:
```
mysql> SELECT * FROM tbl INTO OUTFILE ... Query OK, 100000 row affected (5.86 sec)
mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
+------------+-----------+----------+--------------+
| FileNumber | TotalRows | FileSize | URL          |
+------------+-----------+----------+--------------+
|          1 |         2 |        8 | 192.168.1.10 |
+------------+-----------+----------+--------------+
1 row in set (0.05 sec)
```
`100000 row affected` Indicates the size of the exported result set.
* FileNumber: The number of files finally generated.
* TotalRows: The number of rows in the result set.
* FileSize: The total size of the exported files, in bytes.
* URL: If it is exported to a local disk, the Compute Node to which it is exported is displayed here.
If the execution is incorrect, an error message will be returned, such as:
```
mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
mysql> SELECT * FROM tbl INTO OUTFILE ...
ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
```
## Notice


@ -136,6 +136,7 @@ INTO OUTFILE "file_path"
3. Example 3
Export the query result of a UNION statement to the file `bos://bucket/result.txt`. Specify the export format as PARQUET. Use `my_broker` and set the HDFS high-availability information. The PARQUET format does not require a column separator to be specified.
A mark file is generated after the export is finished.
```
SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1
@ -156,46 +157,59 @@ INTO OUTFILE "file_path"
4. Example 4
Export the query result of a SELECT statement to the file `cos://${bucket_name}/path/result.txt`. Specify the export format as CSV.
```
select k1,k2,v1 from tbl1 limit 100000
into outfile "s3a://my_bucket/export/my_file_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "hdfs_broker",
"broker.fs.s3a.access.key" = "xxx",
"broker.fs.s3a.secret.key" = "xxxx",
"broker.fs.s3a.endpoint" = "https://cos.xxxxxx.myqcloud.com/",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "1024MB"
Export the query result of a SELECT statement to the file `cos://${bucket_name}/path/result.txt`. Specify the export format as CSV.
A mark file is generated after the export is finished.
```
select k1,k2,v1 from tbl1 limit 100000
into outfile "s3a://my_bucket/export/my_file_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "hdfs_broker",
"broker.fs.s3a.access.key" = "xxx",
"broker.fs.s3a.secret.key" = "xxxx",
"broker.fs.s3a.endpoint" = "https://cos.xxxxxx.myqcloud.com/",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "1024MB",
"success_file_name" = "SUCCESS"
)
```
If the finally generated file is not larger than 1GB, it will be: `my_file_0.csv`.
If the finally generated file is not larger than 1GB, it will be: `my_file_0.csv`.
If it is larger than 1GB, it may be `my_file_0.csv, result_1.csv, ...`.
If it is larger than 1GB, it may be `my_file_0.csv, result_1.csv, ...`.
Verified on COS:
1. A path that does not exist will be created automatically.
2. access.key/secret.key/endpoint need to be confirmed with the COS team. In particular, the value of endpoint does not need to include the bucket_name.
Verified on COS:
1. A path that does not exist will be created automatically.
2. access.key/secret.key/endpoint need to be confirmed with the COS team. In particular, the value of endpoint does not need to include the bucket_name.
## Return result
The export command is a synchronous command. When the command returns, the operation is finished.
The export command is a synchronous command. When the command returns, the operation is finished. At the same time, a row of results will be returned to show the result of the export execution.
If the export completes and returns normally, the result is as follows:
```
mysql> SELECT * FROM tbl INTO OUTFILE ... Query OK, 100000 row affected (5.86 sec)
mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
+------------+-----------+----------+--------------+
| FileNumber | TotalRows | FileSize | URL          |
+------------+-----------+----------+--------------+
|          1 |         2 |        8 | 192.168.1.10 |
+------------+-----------+----------+--------------+
1 row in set (0.05 sec)
```
`100000 row affected` indicates the number of rows in the exported result set.
* FileNumber: The number of files finally generated.
* TotalRows: The number of rows in the result set.
* FileSize: The total size of the exported files, in bytes.
* URL: If it is exported to a local disk, the Compute Node to which it is exported is displayed here.
If the execution is incorrect, an error message will be returned, such as:
```
mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
mysql> SELECT * FROM tbl INTO OUTFILE ...
ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
```
## Notice


@ -17,20 +17,25 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TResultFileSinkOptions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.clearspring.analytics.util.Lists;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -39,12 +44,28 @@ import java.util.stream.Collectors;
public class OutFileClause {
private static final Logger LOG = LogManager.getLogger(OutFileClause.class);
public static final List<String> RESULT_COL_NAMES = Lists.newArrayList();
public static final List<PrimitiveType> RESULT_COL_TYPES = Lists.newArrayList();
static {
RESULT_COL_NAMES.add("FileNumber");
RESULT_COL_NAMES.add("TotalRows");
RESULT_COL_NAMES.add("FileSize");
RESULT_COL_NAMES.add("URL");
RESULT_COL_TYPES.add(PrimitiveType.INT);
RESULT_COL_TYPES.add(PrimitiveType.BIGINT);
RESULT_COL_TYPES.add(PrimitiveType.BIGINT);
RESULT_COL_TYPES.add(PrimitiveType.VARCHAR);
}
public static final String LOCAL_FILE_PREFIX = "file:///";
private static final String BROKER_PROP_PREFIX = "broker.";
private static final String PROP_BROKER_NAME = "broker.name";
private static final String PROP_COLUMN_SEPARATOR = "column_separator";
private static final String PROP_LINE_DELIMITER = "line_delimiter";
private static final String PROP_MAX_FILE_SIZE = "max_file_size";
private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB
@ -60,6 +81,10 @@ public class OutFileClause {
private TFileFormatType fileFormatType;
private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
private BrokerDesc brokerDesc = null;
// True if result is written to local disk.
// If set to true, the brokerDesc must be null.
private boolean isLocalOutput = false;
private String successFileName = "";
public OutFileClause(String filePath, String format, Map<String, String> properties) {
this.filePath = filePath;
@ -94,9 +119,7 @@ public class OutFileClause {
}
public void analyze(Analyzer analyzer) throws AnalysisException {
if (Strings.isNullOrEmpty(filePath)) {
throw new AnalysisException("Must specify file in OUTFILE clause");
}
analyzeFilePath();
if (!format.equals("csv")) {
throw new AnalysisException("Only support CSV format");
@ -105,11 +128,29 @@ public class OutFileClause {
analyzeProperties();
if (brokerDesc == null) {
if (brokerDesc != null && isLocalOutput) {
throw new AnalysisException("No need to specify BROKER properties in OUTFILE clause for local file output");
} else if (brokerDesc == null && !isLocalOutput) {
throw new AnalysisException("Must specify BROKER properties in OUTFILE clause");
}
}
private void analyzeFilePath() throws AnalysisException {
if (Strings.isNullOrEmpty(filePath)) {
throw new AnalysisException("Must specify file in OUTFILE clause");
}
if (filePath.startsWith(LOCAL_FILE_PREFIX)) {
if (!Config.enable_outfile_to_local) {
throw new AnalysisException("Exporting results to local disk is not allowed.");
}
isLocalOutput = true;
filePath = filePath.substring(LOCAL_FILE_PREFIX.length() - 1); // keep the last '/' so the local path starts with '/'
} else {
isLocalOutput = false;
}
}
private void analyzeProperties() throws AnalysisException {
if (properties == null || properties.isEmpty()) {
return;
@ -117,9 +158,6 @@ public class OutFileClause {
Set<String> processedPropKeys = Sets.newHashSet();
getBrokerProperties(processedPropKeys);
if (brokerDesc == null) {
return;
}
if (properties.containsKey(PROP_COLUMN_SEPARATOR)) {
if (!isCsvFormat()) {
@ -145,6 +183,12 @@ public class OutFileClause {
processedPropKeys.add(PROP_MAX_FILE_SIZE);
}
if (properties.containsKey(PROP_SUCCESS_FILE_NAME)) {
successFileName = properties.get(PROP_SUCCESS_FILE_NAME);
FeNameFormat.checkCommonName("file name", successFileName);
processedPropKeys.add(PROP_SUCCESS_FILE_NAME);
}
if (processedPropKeys.size() != properties.size()) {
LOG.debug("{} vs {}", processedPropKeys, properties);
throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
@ -210,6 +254,9 @@ public class OutFileClause {
// broker_addresses of sinkOptions will be set in Coordinator.
// Because we need to choose the nearest broker with the result sink node.
}
if (!Strings.isNullOrEmpty(successFileName)) {
sinkOptions.setSuccessFileName(successFileName);
}
return sinkOptions;
}
}


@ -1349,4 +1349,10 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_access_file_without_broker = false;
/**
* Whether to allow the outfile function to export the results to the local disk.
*/
@ConfField
public static boolean enable_outfile_to_local = false;
}
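
Since `enable_outfile_to_local` is declared with a plain `@ConfField` (no `mutable = true`), it presumably has to be set in the FE configuration file and is only picked up at FE startup. A minimal sketch, assuming the usual `fe.conf`:

```
# fe.conf -- assumed way to allow SELECT ... INTO OUTFILE "file:///..." on this FE
enable_outfile_to_local = true
```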


@ -26,6 +26,7 @@ import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.SelectStmt;
@ -41,6 +42,7 @@ import org.apache.doris.analysis.UnsupportedStmt;
import org.apache.doris.analysis.UseStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
@ -102,6 +104,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
// Do one COM_QUERY process.
// first: Parse receive byte array to statement struct.
@ -626,7 +629,7 @@ public class StmtExecutor {
batch = value.getRowBatch();
if (!isSend) {
// send meta fields before sending first data batch.
sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs());
sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
isSend = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
@ -640,7 +643,7 @@ public class StmtExecutor {
statisticsForAuditLog = batch.getQueryStatistics();
}
if (!isSend) {
sendFields(selectStmt.getColLabels(), selectStmt.getResultExprs());
sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
isSend = true;
}
context.getState().setEof();
@ -687,7 +690,7 @@ public class StmtExecutor {
if (batch.getBatch() != null) {
cacheAnalyzer.copyRowBatch(batch);
if (!isSendFields) {
sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs());
sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs()));
isSendFields = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
@ -707,7 +710,7 @@ public class StmtExecutor {
cacheAnalyzer.updateCache();
if (!isSendFields) {
sendFields(newSelectStmt.getColLabels(), newSelectStmt.getResultExprs());
sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs()));
isSendFields = true;
}
@ -765,11 +768,15 @@ public class StmtExecutor {
while (true) {
batch = coord.getNext();
// for outfile query, there will be only one empty batch sent back with the eos flag
if (batch.getBatch() != null && !isOutfileQuery) {
if (batch.getBatch() != null) {
// For some language drivers, getting an error packet after the fields packet will be recognized as a successful result,
// so we need to send fields after the first batch arrives
if (!isSendFields) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
} else {
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
}
isSendFields = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
@ -781,16 +788,16 @@ public class StmtExecutor {
break;
}
}
if (!isSendFields && !isOutfileQuery) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
if (!isSendFields) {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
} else {
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
}
}
statisticsForAuditLog = batch.getQueryStatistics();
if (!isOutfileQuery) {
context.getState().setEof();
} else {
context.getState().setOk(statisticsForAuditLog.returned_rows, 0, "");
}
context.getState().setEof();
plannerProfile.setQueryFetchResultFinishTime();
}
@ -993,7 +1000,7 @@ public class StmtExecutor {
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
private void sendFields(List<String> colNames, List<Expr> exprs) throws IOException {
private void sendFields(List<String> colNames, List<PrimitiveType> types) throws IOException {
// sends how many columns
serializer.reset();
serializer.writeVInt(colNames.size());
@ -1001,7 +1008,7 @@ public class StmtExecutor {
// send field one by one
for (int i = 0; i < colNames.size(); ++i) {
serializer.reset();
serializer.writeField(colNames.get(i), exprs.get(i).getType().getPrimitiveType());
serializer.writeField(colNames.get(i), types.get(i));
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
// send EOF
@ -1112,5 +1119,9 @@ public class StmtExecutor {
}
return statisticsForAuditLog;
}
private List<PrimitiveType> exprToType(List<Expr> exprs) {
return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList());
}
}


@ -48,6 +48,7 @@ struct TResultFileSinkOptions {
5: optional i64 max_file_size_bytes
6: optional list<Types.TNetworkAddress> broker_addresses; // only for remote file
7: optional map<string, string> broker_properties // only for remote file
8: optional string success_file_name
}
struct TMemoryScratchSink {