[OUTFILE] Support INTO OUTFILE to export query result (#3584)

This CL mainly changes:

1. Support the `SELECT INTO OUTFILE` command.
2. Support exporting query results to a file via Broker.
3. Support the CSV export format with a configurable column separator and line delimiter.
Author: Mingyu Chen
Date: 2020-05-25 21:24:56 +08:00
Committed by: GitHub
parent 6788cacb94
commit 3ffc447b38
36 changed files with 1821 additions and 271 deletions


@ -92,6 +92,7 @@ set(EXEC_FILES
broker_writer.cpp
parquet_scanner.cpp
parquet_reader.cpp
parquet_writer.cpp
orc_scanner.cpp
json_scanner.cpp
assert_num_rows_node.cpp


@ -40,7 +40,7 @@ public:
BrokerWriter(ExecEnv* env,
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties,
const std::string& dir,
const std::string& path,
int64_t start_offset);
virtual ~BrokerWriter();


@ -24,12 +24,14 @@
namespace doris {
class RuntimeState;
class LocalFileWriter : public FileWriter {
public:
LocalFileWriter(const std::string& path, int64_t start_offset);
virtual ~LocalFileWriter();
Status open() override;
Status open() override;
virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override;


@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/parquet_writer.h"
#include <time.h>
#include <arrow/status.h>
#include <arrow/array.h>
#include "exec/file_writer.h"
#include "common/logging.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/tuple.h"
#include "runtime/descriptors.h"
#include "runtime/mem_pool.h"
#include "util/thrift_util.h"
namespace doris {
/// ParquetOutputStream
ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer): _file_writer(file_writer) {
set_mode(arrow::io::FileMode::WRITE);
}
ParquetOutputStream::~ParquetOutputStream() {
Close();
}
arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) {
size_t written_len = 0;
Status st = _file_writer->write(reinterpret_cast<const uint8_t*>(data), nbytes, &written_len);
if (!st.ok()) {
return arrow::Status::IOError(st.get_error_msg());
}
_cur_pos += written_len;
return arrow::Status::OK();
}
arrow::Status ParquetOutputStream::Tell(int64_t* position) const {
*position = _cur_pos;
return arrow::Status::OK();
}
arrow::Status ParquetOutputStream::Close() {
Status st = _file_writer->close();
if (!st.ok()) {
return arrow::Status::IOError(st.get_error_msg());
}
_is_closed = true;
return arrow::Status::OK();
}
/// ParquetWriterWrapper
ParquetWriterWrapper::ParquetWriterWrapper(FileWriter *file_writer, const std::vector<ExprContext*>& output_expr_ctxs) :
_output_expr_ctxs(output_expr_ctxs) {
// TODO(cmy): implement
_outstream = new ParquetOutputStream(file_writer);
}
Status ParquetWriterWrapper::write(const RowBatch& row_batch) {
// TODO(cmy): implement
return Status::OK();
}
void ParquetWriterWrapper::close() {
// TODO(cmy): implement
}
ParquetWriterWrapper::~ParquetWriterWrapper() {
close();
}
} // end namespace
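ParquetWriterWrapper above is still a stub (its write() and close() are TODOs). As a rough sketch of the direction it points at, the standalone example below, which is not the Doris implementation and assumes an Arrow release whose builder and writer APIs return arrow::Status (matching the Write()/Tell() signatures used above), builds a one-column arrow::Table and streams it as Parquet to any arrow::io::OutputStream, such as the ParquetOutputStream defined here, via parquet::arrow::WriteTable.
```
// Hypothetical sketch only: shows how row data could reach Parquet through an
// arrow::io::OutputStream once ParquetWriterWrapper::write() is implemented.
#include <memory>
#include <arrow/api.h>
#include <arrow/io/interfaces.h>
#include <parquet/arrow/writer.h>

arrow::Status write_sample_parquet(const std::shared_ptr<arrow::io::OutputStream>& sink) {
    // In Doris this column would be produced by evaluating _output_expr_ctxs
    // over a RowBatch; here it is just three literal values.
    arrow::Int64Builder builder;
    ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
    std::shared_ptr<arrow::Array> array;
    ARROW_RETURN_NOT_OK(builder.Finish(&array));

    auto schema = arrow::schema({arrow::field("k1", arrow::int64())});
    auto table = arrow::Table::Make(schema, {array});

    // Write the whole table to the sink as Parquet, 1024 rows per row group.
    return parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), sink,
                                      /*chunk_size=*/1024);
}
```
In the eventual implementation the columns would be filled from each RowBatch rather than from literals; only the sink side shown here is what ParquetOutputStream provides.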


@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include <string>
#include <map>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
#include <arrow/buffer.h>
#include <parquet/api/reader.h>
#include <parquet/api/writer.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include "common/status.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
namespace doris {
class ExprContext;
class FileWriter;
class RowBatch;
class ParquetOutputStream : public arrow::io::OutputStream {
public:
ParquetOutputStream(FileWriter* file_writer);
virtual ~ParquetOutputStream();
arrow::Status Write(const void* data, int64_t nbytes) override;
// return the current write position of the stream
arrow::Status Tell(int64_t* position) const override;
arrow::Status Close() override;
bool closed() const override {
return _is_closed;
}
private:
FileWriter* _file_writer; // not owned
int64_t _cur_pos = 0; // current write position
bool _is_closed = false;
};
// a wrapper of parquet output stream
class ParquetWriterWrapper {
public:
ParquetWriterWrapper(FileWriter *file_writer, const std::vector<ExprContext*>& output_expr_ctxs);
virtual ~ParquetWriterWrapper();
Status write(const RowBatch& row_batch);
void close();
private:
ParquetOutputStream* _outstream;
const std::vector<ExprContext*>& _output_expr_ctxs;
};
}


@ -44,8 +44,8 @@ set(RUNTIME_FILES
raw_value.cpp
raw_value_ir.cpp
result_sink.cpp
result_writer.cpp
result_buffer_mgr.cpp
result_writer.cpp
row_batch.cpp
runtime_state.cpp
string_value.cpp
@ -105,6 +105,8 @@ set(RUNTIME_FILES
result_queue_mgr.cpp
memory_scratch_sink.cpp
external_scan_context_mgr.cpp
file_result_writer.cpp
mysql_result_writer.cpp
memory/system_allocator.cpp
memory/chunk_allocator.cpp
)


@ -84,6 +84,15 @@ public:
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
_query_statistics = statistics;
}
void update_num_written_rows(int64_t num_rows) {
// _query_statistics may be null if the result sink failed to init
// or some other failure occurred.
// The number of written rows is only needed when everything goes well.
if (_query_statistics.get() != nullptr) {
_query_statistics->set_returned_rows(num_rows);
}
}
private:
typedef std::list<TFetchDataResult*> ResultQueue;


@ -0,0 +1,327 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/file_result_writer.h"
#include "exec/broker_writer.h"
#include "exec/local_file_writer.h"
#include "exec/parquet_writer.h"
#include "exprs/expr.h"
#include "runtime/primitive_type.h"
#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
#include "runtime/runtime_state.h"
#include "util/types.h"
#include "util/date_func.h"
#include "util/uid_util.h"
#include "gen_cpp/PaloInternalService_types.h"
namespace doris {
const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
FileResultWriter::FileResultWriter(
const ResultFileOptions* file_opts,
const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile) :
_file_opts(file_opts),
_output_expr_ctxs(output_expr_ctxs),
_parent_profile(parent_profile) {
}
FileResultWriter::~FileResultWriter() {
_close_file_writer(true);
}
Status FileResultWriter::init(RuntimeState* state) {
_state = state;
_init_profile();
RETURN_IF_ERROR(_create_file_writer());
return Status::OK();
}
void FileResultWriter::_init_profile() {
RuntimeProfile* profile = _parent_profile->create_child("FileResultWriter", true, true);
_append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
_convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime");
_file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime");
_writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime");
_written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT);
_written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
}
Status FileResultWriter::_create_file_writer() {
std::string file_name = _get_next_file_name();
if (_file_opts->is_local_file) {
_file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
} else {
_file_writer = new BrokerWriter(_state->exec_env(),
_file_opts->broker_addresses,
_file_opts->broker_properties,
file_name,
0 /*start offset*/);
}
RETURN_IF_ERROR(_file_writer->open());
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
// the file writer alone is enough
break;
case TFileFormatType::FORMAT_PARQUET:
_parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs);
break;
default:
return Status::InternalError(strings::Substitute("unsupported file format: $0", _file_opts->file_format));
}
LOG(INFO) << "create file for exporting query result. file name: " << file_name
<< ". query id: " << print_id(_state->query_id());
return Status::OK();
}
// file name format as: my_prefix_0.csv
std::string FileResultWriter::_get_next_file_name() {
std::stringstream ss;
ss << _file_opts->file_path << (_file_idx++) << "." << _file_format_to_name();
return ss.str();
}
std::string FileResultWriter::_file_format_to_name() {
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
return "csv";
case TFileFormatType::FORMAT_PARQUET:
return "parquet";
default:
return "unknown";
}
}
Status FileResultWriter::append_row_batch(const RowBatch* batch) {
if (nullptr == batch || 0 == batch->num_rows()) {
return Status::OK();
}
SCOPED_TIMER(_append_row_batch_timer);
if (_parquet_writer != nullptr) {
RETURN_IF_ERROR(_parquet_writer->write(*batch));
} else {
RETURN_IF_ERROR(_write_csv_file(*batch));
}
_written_rows += batch->num_rows();
return Status::OK();
}
Status FileResultWriter::_write_csv_file(const RowBatch& batch) {
int num_rows = batch.num_rows();
for (int i = 0; i < num_rows; ++i) {
TupleRow* row = batch.get_row(i);
RETURN_IF_ERROR(_write_one_row_as_csv(row));
}
_flush_plain_text_outstream(true);
return Status::OK();
}
// actually, this logic is the same as `ExportSink::gen_row_buffer`
// TODO(cmy): find a way to unify them.
Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) {
{
SCOPED_TIMER(_convert_tuple_timer);
int num_columns = _output_expr_ctxs.size();
for (int i = 0; i < num_columns; ++i) {
void* item = _output_expr_ctxs[i]->get_value(row);
if (item == nullptr) {
_plain_text_outstream << NULL_IN_CSV;
continue;
}
switch (_output_expr_ctxs[i]->root()->type().type) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
_plain_text_outstream << (int)*static_cast<int8_t*>(item);
break;
case TYPE_SMALLINT:
_plain_text_outstream << *static_cast<int16_t*>(item);
break;
case TYPE_INT:
_plain_text_outstream << *static_cast<int32_t*>(item);
break;
case TYPE_BIGINT:
_plain_text_outstream << *static_cast<int64_t*>(item);
break;
case TYPE_LARGEINT:
_plain_text_outstream << reinterpret_cast<PackedInt128*>(item)->value;
break;
case TYPE_FLOAT: {
char buffer[MAX_FLOAT_STR_LENGTH + 2];
float float_value = *static_cast<float*>(item);
buffer[0] = '\0';
int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
_plain_text_outstream << buffer;
break;
}
case TYPE_DOUBLE: {
// To prevent loss of precision on float and double types,
// they are converted to strings before output.
// For example: For a double value 27361919854.929001,
// the direct output of using std::stringstream is 2.73619e+10,
// and after conversion to a string, it outputs 27361919854.929001
char buffer[MAX_DOUBLE_STR_LENGTH + 2];
double double_value = *static_cast<double*>(item);
buffer[0] = '\0';
int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
_plain_text_outstream << buffer;
break;
}
case TYPE_DATE:
case TYPE_DATETIME: {
char buf[64];
const DateTimeValue* time_val = (const DateTimeValue*)(item);
time_val->to_string(buf);
_plain_text_outstream << buf;
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR: {
const StringValue* string_val = (const StringValue*)(item);
if (string_val->ptr == NULL) {
if (string_val->len != 0) {
_plain_text_outstream << NULL_IN_CSV;
}
} else {
_plain_text_outstream << std::string(string_val->ptr, string_val->len);
}
break;
}
case TYPE_DECIMAL: {
const DecimalValue* decimal_val = reinterpret_cast<const DecimalValue*>(item);
std::string decimal_str;
int output_scale = _output_expr_ctxs[i]->root()->output_scale();
if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val->to_string(output_scale);
} else {
decimal_str = decimal_val->to_string();
}
_plain_text_outstream << decimal_str;
break;
}
case TYPE_DECIMALV2: {
const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(item)->value);
std::string decimal_str;
int output_scale = _output_expr_ctxs[i]->root()->output_scale();
if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val.to_string(output_scale);
} else {
decimal_str = decimal_val.to_string();
}
_plain_text_outstream << decimal_str;
break;
}
default: {
// unsupported types, such as BITMAP and HLL, are exported as null
_plain_text_outstream << NULL_IN_CSV;
}
}
if (i < num_columns - 1) {
_plain_text_outstream << _file_opts->column_separator;
}
} // end for columns
_plain_text_outstream << _file_opts->line_delimiter;
}
// write one line to file
return _flush_plain_text_outstream(false);
}
Status FileResultWriter::_flush_plain_text_outstream(bool eos) {
SCOPED_TIMER(_file_write_timer);
size_t pos = _plain_text_outstream.tellp();
if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) {
return Status::OK();
}
const std::string& buf = _plain_text_outstream.str();
size_t written_len = 0;
RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const uint8_t*>(buf.c_str()),
buf.size(), &written_len));
COUNTER_UPDATE(_written_data_bytes, written_len);
_current_written_bytes += written_len;
// clear the stream
_plain_text_outstream.str("");
_plain_text_outstream.clear();
// split file if exceed limit
RETURN_IF_ERROR(_create_new_file_if_exceed_size());
return Status::OK();
}
Status FileResultWriter::_create_new_file_if_exceed_size() {
if (_current_written_bytes < _file_opts->max_file_size_bytes) {
return Status::OK();
}
// current file size exceed the max file size. close this file
// and create new one
{
SCOPED_TIMER(_writer_close_timer);
RETURN_IF_ERROR(_close_file_writer(false));
}
_current_written_bytes = 0;
return Status::OK();
}
Status FileResultWriter::_close_file_writer(bool done) {
if (_parquet_writer != nullptr) {
_parquet_writer->close();
delete _parquet_writer;
_parquet_writer = nullptr;
if (!done) {
//TODO(cmy): implement parquet writer later
}
} else if (_file_writer != nullptr) {
_file_writer->close();
delete _file_writer;
_file_writer = nullptr;
}
if (!done) {
// not finished, create new file writer for next file
RETURN_IF_ERROR(_create_file_writer());
}
return Status::OK();
}
Status FileResultWriter::close() {
// The following two profile updates, "_written_rows_counter" and "_writer_close_timer",
// must stay outside `_close_file_writer()`,
// because `_close_file_writer()` may be called from the destructor,
// and at that point the RuntimeState, and therefore the profile it owns,
// may already have been destructed.
COUNTER_SET(_written_rows_counter, _written_rows);
SCOPED_TIMER(_writer_close_timer);
RETURN_IF_ERROR(_close_file_writer(true));
return Status::OK();
}
}
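The CSV path above accumulates rows in a std::stringstream, flushes the buffer to the FileWriter once it reaches OUTSTREAM_BUFFER_SIZE_BYTES (or at end of stream), and rolls over to a new file once max_file_size_bytes is exceeded. Below is a stripped-down, standard-library-only sketch of that same buffer-then-split flow; the class name and the std::ofstream backend are placeholders, not the Doris classes.
```
#include <fstream>
#include <sstream>
#include <string>

class SplittingCsvSink {
public:
    SplittingCsvSink(std::string prefix, size_t flush_bytes, size_t max_file_bytes)
        : _prefix(std::move(prefix)), _flush_bytes(flush_bytes), _max_file_bytes(max_file_bytes) {
        _open_next_file();
    }
    ~SplittingCsvSink() { _flush(true); }

    void append_row(const std::string& row) {
        _buffer << row << '\n';
        _flush(false);
    }

private:
    void _flush(bool eos) {
        size_t pos = static_cast<size_t>(_buffer.tellp());
        // same condition as _flush_plain_text_outstream(): keep buffering until
        // the threshold is reached, unless this is the end of the stream
        if (pos == 0 || (pos < _flush_bytes && !eos)) {
            return;
        }
        const std::string data = _buffer.str();
        _out.write(data.data(), static_cast<std::streamsize>(data.size()));
        _written_bytes += data.size();
        _buffer.str("");
        _buffer.clear();
        // split the output, like _create_new_file_if_exceed_size()
        if (_written_bytes >= _max_file_bytes) {
            _out.close();
            _written_bytes = 0;
            _open_next_file();
        }
    }
    void _open_next_file() {
        _out.open(_prefix + std::to_string(_file_idx++) + ".csv", std::ios::binary);
    }

    std::string _prefix;
    size_t _flush_bytes;
    size_t _max_file_bytes;
    std::stringstream _buffer;
    std::ofstream _out;
    size_t _written_bytes = 0;
    int _file_idx = 0;
};
```
Constructed as `SplittingCsvSink sink("result_", 1 << 20, 100 << 20)`, successive append_row() calls produce result_0.csv, result_1.csv, ... in the same way the exported files are named in the documentation further below.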


@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "runtime/result_writer.h"
#include "runtime/runtime_state.h"
#include "gen_cpp/DataSinks_types.h"
namespace doris {
class ExprContext;
class FileWriter;
class ParquetWriterWrapper;
class RowBatch;
class RuntimeProfile;
class TupleRow;
struct ResultFileOptions {
bool is_local_file;
std::string file_path;
TFileFormatType::type file_format;
std::string column_separator;
std::string line_delimiter;
size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
std::vector<TNetworkAddress> broker_addresses;
std::map<std::string, std::string> broker_properties;
ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
file_format = t_opt.file_format;
column_separator = t_opt.__isset.column_separator ? t_opt.column_separator : "\t";
line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : "\n";
max_file_size_bytes = t_opt.__isset.max_file_size_bytes ?
t_opt.max_file_size_bytes : max_file_size_bytes;
is_local_file = true;
if (t_opt.__isset.broker_addresses) {
broker_addresses = t_opt.broker_addresses;
is_local_file = false;
}
if (t_opt.__isset.broker_properties) {
broker_properties = t_opt.broker_properties;
}
}
};
// write result to file
class FileResultWriter final : public ResultWriter {
public:
FileResultWriter(const ResultFileOptions* file_option,
const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile);
virtual ~FileResultWriter();
virtual Status init(RuntimeState* state) override;
virtual Status append_row_batch(const RowBatch* batch) override;
virtual Status close() override;
private:
Status _write_csv_file(const RowBatch& batch);
Status _write_one_row_as_csv(TupleRow* row);
// If the buffer exceeds the limit, write the data buffered in _plain_text_outstream via the file writer.
// If eos is true, write the buffered data even if the buffer is not full.
Status _flush_plain_text_outstream(bool eos);
void _init_profile();
Status _create_file_writer();
// get next export file name
std::string _get_next_file_name();
std::string _file_format_to_name();
// close the file writer; if !done, a new writer is created for the next file
Status _close_file_writer(bool done);
// create a new file if the current file size exceeds the limit
Status _create_new_file_if_exceed_size();
private:
RuntimeState* _state; // not owned, set when init
const ResultFileOptions* _file_opts;
const std::vector<ExprContext*>& _output_expr_ctxs;
// If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
// If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
FileWriter* _file_writer = nullptr;
// parquet file writer
ParquetWriterWrapper* _parquet_writer = nullptr;
// Used to buffer the export data of plain text
// TODO(cmy): I simply use a stringstream to buffer the data, to avoid calling
// the file writer's write() for every single row.
// But this cannot solve the problem of a single row that is too large.
// For example: bitmap_to_string() may return a large volume of data.
// And the speed is relatively low; in my test, it is about 6.5MB/s.
std::stringstream _plain_text_outstream;
static const size_t OUTSTREAM_BUFFER_SIZE_BYTES;
// bytes written to the current file, used for splitting files
int64_t _current_written_bytes = 0;
// the suffix index of the export file name, starting at 0
int _file_idx = 0;
RuntimeProfile* _parent_profile; // profile from result sink, not owned
// total time cost of the append batch operation
RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
// tuple convert timer, child timer of _append_row_batch_timer
RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
// file write timer, child timer of _append_row_batch_timer
RuntimeProfile::Counter* _file_write_timer = nullptr;
// time of closing the file writer
RuntimeProfile::Counter* _writer_close_timer = nullptr;
// number of written rows
RuntimeProfile::Counter* _written_rows_counter = nullptr;
// bytes of written data
RuntimeProfile::Counter* _written_data_bytes = nullptr;
};
} // end of namespace


@ -0,0 +1,256 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/mysql_result_writer.h"
#include "exprs/expr.h"
#include "runtime/primitive_type.h"
#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/buffer_control_block.h"
#include "util/mysql_row_buffer.h"
#include "util/types.h"
#include "util/date_func.h"
#include "gen_cpp/PaloInternalService_types.h"
namespace doris {
MysqlResultWriter::MysqlResultWriter(
BufferControlBlock* sinker,
const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile) :
_sinker(sinker),
_output_expr_ctxs(output_expr_ctxs),
_row_buffer(NULL),
_parent_profile(parent_profile) {
}
MysqlResultWriter::~MysqlResultWriter() {
delete _row_buffer;
}
Status MysqlResultWriter::init(RuntimeState* state) {
_init_profile();
if (NULL == _sinker) {
return Status::InternalError("sinker is NULL pointer.");
}
_row_buffer = new(std::nothrow) MysqlRowBuffer();
if (NULL == _row_buffer) {
return Status::InternalError("no memory to alloc.");
}
return Status::OK();
}
void MysqlResultWriter::_init_profile() {
RuntimeProfile* profile = _parent_profile->create_child("MySQLResultWriter", true, true);
_append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
_convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime");
_result_send_timer = ADD_CHILD_TIMER(profile, "ResultRendTime", "AppendBatchTime");
_sent_rows_counter = ADD_COUNTER(profile, "NumSentRows", TUnit::UNIT);
}
Status MysqlResultWriter::_add_one_row(TupleRow* row) {
SCOPED_TIMER(_convert_tuple_timer);
_row_buffer->reset();
int num_columns = _output_expr_ctxs.size();
int buf_ret = 0;
for (int i = 0; 0 == buf_ret && i < num_columns; ++i) {
void* item = _output_expr_ctxs[i]->get_value(row);
if (NULL == item) {
buf_ret = _row_buffer->push_null();
continue;
}
switch (_output_expr_ctxs[i]->root()->type().type) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
buf_ret = _row_buffer->push_tinyint(*static_cast<int8_t*>(item));
break;
case TYPE_SMALLINT:
buf_ret = _row_buffer->push_smallint(*static_cast<int16_t*>(item));
break;
case TYPE_INT:
buf_ret = _row_buffer->push_int(*static_cast<int32_t*>(item));
break;
case TYPE_BIGINT:
buf_ret = _row_buffer->push_bigint(*static_cast<int64_t*>(item));
break;
case TYPE_LARGEINT: {
char buf[48];
int len = 48;
char* v = LargeIntValue::to_string(
reinterpret_cast<const PackedInt128*>(item)->value, buf, &len);
buf_ret = _row_buffer->push_string(v, len);
break;
}
case TYPE_FLOAT:
buf_ret = _row_buffer->push_float(*static_cast<float*>(item));
break;
case TYPE_DOUBLE:
buf_ret = _row_buffer->push_double(*static_cast<double*>(item));
break;
case TYPE_TIME: {
double time = *static_cast<double *>(item);
std::string time_str = time_str_from_double(time);
buf_ret = _row_buffer->push_string(time_str.c_str(), time_str.size());
break;
}
case TYPE_DATE:
case TYPE_DATETIME: {
char buf[64];
const DateTimeValue* time_val = (const DateTimeValue*)(item);
// TODO(zhaochun), this function has core risk
char* pos = time_val->to_string(buf);
buf_ret = _row_buffer->push_string(buf, pos - buf - 1);
break;
}
case TYPE_HLL:
case TYPE_OBJECT: {
buf_ret = _row_buffer->push_null();
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR: {
const StringValue* string_val = (const StringValue*)(item);
if (string_val->ptr == NULL) {
if (string_val->len == 0) {
// 0x01 is a magic num, not actually useful, just used to represent ""
char* tmp_val = reinterpret_cast<char*>(0x01);
buf_ret = _row_buffer->push_string(tmp_val, string_val->len);
} else {
buf_ret = _row_buffer->push_null();
}
} else {
buf_ret = _row_buffer->push_string(string_val->ptr, string_val->len);
}
break;
}
case TYPE_DECIMAL: {
const DecimalValue* decimal_val = reinterpret_cast<const DecimalValue*>(item);
std::string decimal_str;
int output_scale = _output_expr_ctxs[i]->root()->output_scale();
if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val->to_string(output_scale);
} else {
decimal_str = decimal_val->to_string();
}
buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length());
break;
}
case TYPE_DECIMALV2: {
DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(item)->value);
std::string decimal_str;
int output_scale = _output_expr_ctxs[i]->root()->output_scale();
if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val.to_string(output_scale);
} else {
decimal_str = decimal_val.to_string();
}
buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length());
break;
}
default:
LOG(WARNING) << "can't convert this type to mysql type. type = " <<
_output_expr_ctxs[i]->root()->type();
buf_ret = -1;
break;
}
}
if (0 != buf_ret) {
return Status::InternalError("pack mysql buffer failed.");
}
return Status::OK();
}
Status MysqlResultWriter::append_row_batch(const RowBatch* batch) {
SCOPED_TIMER(_append_row_batch_timer);
if (NULL == batch || 0 == batch->num_rows()) {
return Status::OK();
}
Status status;
// convert one batch
TFetchDataResult* result = new(std::nothrow) TFetchDataResult();
int num_rows = batch->num_rows();
result->result_batch.rows.resize(num_rows);
for (int i = 0; status.ok() && i < num_rows; ++i) {
TupleRow* row = batch->get_row(i);
status = _add_one_row(row);
if (status.ok()) {
result->result_batch.rows[i].assign(_row_buffer->buf(), _row_buffer->length());
} else {
LOG(WARNING) << "convert row to mysql result failed.";
break;
}
}
if (status.ok()) {
SCOPED_TIMER(_sent_rows_counter);
// push this batch to back
status = _sinker->add_batch(result);
if (status.ok()) {
result = NULL;
_written_rows += num_rows;
} else {
LOG(WARNING) << "append result batch to sink failed.";
}
}
delete result;
result = NULL;
return status;
}
Status MysqlResultWriter::close() {
COUNTER_SET(_sent_rows_counter, _written_rows);
return Status::OK();
}
}


@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "runtime/result_writer.h"
#include "runtime/runtime_state.h"
namespace doris {
class TupleRow;
class RowBatch;
class ExprContext;
class MysqlRowBuffer;
class BufferControlBlock;
class RuntimeProfile;
// convert the row batch to mysql protocol rows
class MysqlResultWriter final : public ResultWriter {
public:
MysqlResultWriter(BufferControlBlock* sinker,
const std::vector<ExprContext*>& output_expr_ctxs,
RuntimeProfile* parent_profile);
virtual ~MysqlResultWriter();
virtual Status init(RuntimeState* state) override;
// convert one row batch to mysql result and
// append this batch to the result sink
virtual Status append_row_batch(const RowBatch* batch) override;
virtual Status close() override;
private:
void _init_profile();
// convert one tuple row
Status _add_one_row(TupleRow* row);
private:
BufferControlBlock* _sinker;
const std::vector<ExprContext*>& _output_expr_ctxs;
MysqlRowBuffer* _row_buffer;
RuntimeProfile* _parent_profile; // parent profile from result sink. not owned
// total time cost of the append batch operation
RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
// tuple convert timer, child timer of _append_row_batch_timer
RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
// result send timer, child timer of _append_row_batch_timer
RuntimeProfile::Counter* _result_send_timer = nullptr;
// number of sent rows
RuntimeProfile::Counter* _sent_rows_counter = nullptr;
};
} // end of namespace


@ -33,7 +33,7 @@ class QueryStatisticsRecvr;
class QueryStatistics {
public:
QueryStatistics() : scan_rows(0), scan_bytes(0) {
QueryStatistics() : scan_rows(0), scan_bytes(0), returned_rows(0) {
}
void merge(const QueryStatistics& other) {
@ -49,17 +49,23 @@ public:
this->scan_bytes += scan_bytes;
}
void set_returned_rows(int64_t num_rows) {
this->returned_rows = num_rows;
}
void merge(QueryStatisticsRecvr* recvr);
void clear() {
scan_rows = 0;
scan_bytes = 0;
returned_rows = 0;
}
void to_pb(PQueryStatistics* statistics) {
DCHECK(statistics != nullptr);
statistics->set_scan_rows(scan_rows);
statistics->set_scan_bytes(scan_bytes);
statistics->set_returned_rows(returned_rows);
}
void merge_pb(const PQueryStatistics& statistics) {
@ -71,6 +77,9 @@ private:
int64_t scan_rows;
int64_t scan_bytes;
// number of rows returned by the query.
// only set once by the result sink when closing.
int64_t returned_rows;
};
// It is used for collecting sub plan query statistics in DataStreamRecvr.


@ -25,7 +25,8 @@
#include "runtime/exec_env.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/buffer_control_block.h"
#include "runtime/result_writer.h"
#include "runtime/file_result_writer.h"
#include "runtime/mysql_result_writer.h"
#include "runtime/mem_tracker.h"
namespace doris {
@ -35,6 +36,17 @@ ResultSink::ResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
: _row_desc(row_desc),
_t_output_expr(t_output_expr),
_buf_size(buffer_size) {
if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) {
_sink_type = TResultSinkType::MYSQL_PROTOCAL;
} else {
_sink_type = sink.type;
}
if (_sink_type == TResultSinkType::FILE) {
CHECK(sink.__isset.file_options);
_file_opts.reset(new ResultFileOptions(sink.file_options));
}
}
ResultSink::~ResultSink() {
@ -58,13 +70,25 @@ Status ResultSink::prepare(RuntimeState* state) {
_profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str()));
// prepare output_expr
RETURN_IF_ERROR(prepare_exprs(state));
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), _buf_size, &_sender));
// create writer
_writer.reset(new(std::nothrow) ResultWriter(_sender.get(), _output_expr_ctxs));
RETURN_IF_ERROR(_writer->init(state));
state->fragment_instance_id(), _buf_size, &_sender));
// create writer based on sink type
switch (_sink_type) {
case TResultSinkType::MYSQL_PROTOCAL:
_writer.reset(new(std::nothrow) MysqlResultWriter(_sender.get(), _output_expr_ctxs, _profile));
break;
case TResultSinkType::FILE:
CHECK(_file_opts.get() != nullptr);
_writer.reset(new(std::nothrow) FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile));
break;
default:
return Status::InternalError("Unknown result sink type");
}
RETURN_IF_ERROR(_writer->init(state));
return Status::OK();
}
@ -80,9 +104,19 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
Status final_status = exec_status;
// close the writer
Status st = _writer->close();
if (!st.ok() && exec_status.ok()) {
// closing the file writer failed; this error should be returned to the client
final_status = st;
}
// close sender, this is normal path end
if (_sender) {
_sender->close(exec_status);
_sender->update_num_written_rows(_writer->get_written_rows());
_sender->close(final_status);
}
state->exec_env()->result_mgr()->cancel_at_time(time(NULL) + config::result_buffer_cancelled_interval_time,
state->fragment_instance_id());
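The close() path above is deliberate about ordering: the writer is closed first so that a failure to flush or close the output file overrides an otherwise successful exec_status, the written-row count is handed to the sender, and the sender is always closed with the final status. A minimal standalone sketch of that error-preference pattern, using hypothetical stand-in types rather than the Doris ones:
```
#include <functional>
#include <string>

struct MyStatus {  // stand-in for doris::Status
    bool ok = true;
    std::string msg;
};

MyStatus close_sink(const MyStatus& exec_status,
                    const std::function<MyStatus()>& close_writer,
                    const std::function<void(const MyStatus&)>& close_sender) {
    MyStatus final_status = exec_status;
    MyStatus writer_status = close_writer();
    if (!writer_status.ok && exec_status.ok) {
        // surface the writer failure (e.g. a broker write error) to the client
        final_status = writer_status;
    }
    close_sender(final_status);  // the sender is closed on every path
    return final_status;
}
```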


@ -35,6 +35,7 @@ class BufferControlBlock;
class ExprContext;
class ResultWriter;
class MemTracker;
class ResultFileOptions;
class ResultSink : public DataSink {
public:
@ -60,6 +61,9 @@ public:
private:
Status prepare_exprs(RuntimeState* state);
TResultSinkType::type _sink_type;
// set file options when sink type is FILE
std::unique_ptr<ResultFileOptions> _file_opts;
ObjectPool* _obj_pool;
// Owned by the RuntimeState.
@ -73,6 +77,7 @@ private:
boost::shared_ptr<ResultWriter> _writer;
RuntimeProfile* _profile; // Allocated from _pool
int _buf_size; // Allocated from _pool
};
}


@ -15,222 +15,11 @@
// specific language governing permissions and limitations
// under the License.
#include "result_writer.h"
#include "exprs/expr.h"
#include "runtime/primitive_type.h"
#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/buffer_control_block.h"
#include "util/mysql_row_buffer.h"
#include "util/types.h"
#include "util/date_func.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "runtime/result_writer.h"
namespace doris {
ResultWriter::ResultWriter(
BufferControlBlock* sinker,
const std::vector<ExprContext*>& output_expr_ctxs) :
_sinker(sinker),
_output_expr_ctxs(output_expr_ctxs),
_row_buffer(NULL) {
}
ResultWriter::~ResultWriter() {
delete _row_buffer;
}
Status ResultWriter::init(RuntimeState* state) {
if (NULL == _sinker) {
return Status::InternalError("sinker is NULL pointer.");
}
_row_buffer = new(std::nothrow) MysqlRowBuffer();
if (NULL == _row_buffer) {
return Status::InternalError("no memory to alloc.");
}
return Status::OK();
}
Status ResultWriter::add_one_row(TupleRow* row) {
_row_buffer->reset();
int num_columns = _output_expr_ctxs.size();
int buf_ret = 0;
for (int i = 0; 0 == buf_ret && i < num_columns; ++i) {
void* item = _output_expr_ctxs[i]->get_value(row);
if (NULL == item) {
buf_ret = _row_buffer->push_null();
continue;
}
switch (_output_expr_ctxs[i]->root()->type().type) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
buf_ret = _row_buffer->push_tinyint(*static_cast<int8_t*>(item));
break;
case TYPE_SMALLINT:
buf_ret = _row_buffer->push_smallint(*static_cast<int16_t*>(item));
break;
case TYPE_INT:
buf_ret = _row_buffer->push_int(*static_cast<int32_t*>(item));
break;
case TYPE_BIGINT:
buf_ret = _row_buffer->push_bigint(*static_cast<int64_t*>(item));
break;
case TYPE_LARGEINT: {
char buf[48];
int len = 48;
char* v = LargeIntValue::to_string(
reinterpret_cast<const PackedInt128*>(item)->value, buf, &len);
buf_ret = _row_buffer->push_string(v, len);
break;
}
case TYPE_FLOAT:
buf_ret = _row_buffer->push_float(*static_cast<float*>(item));
break;
case TYPE_DOUBLE:
buf_ret = _row_buffer->push_double(*static_cast<double*>(item));
break;
case TYPE_TIME: {
double time = *static_cast<double *>(item);
std::string time_str = time_str_from_double(time);
buf_ret = _row_buffer->push_string(time_str.c_str(), time_str.size());
break;
}
case TYPE_DATE:
case TYPE_DATETIME: {
char buf[64];
const DateTimeValue* time_val = (const DateTimeValue*)(item);
// TODO(zhaochun), this function has core risk
char* pos = time_val->to_string(buf);
buf_ret = _row_buffer->push_string(buf, pos - buf - 1);
break;
}
case TYPE_HLL:
case TYPE_OBJECT: {
buf_ret = _row_buffer->push_null();
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR: {
const StringValue* string_val = (const StringValue*)(item);
if (string_val->ptr == NULL) {
if (string_val->len == 0) {
// 0x01 is a magic num, not actually useful, just used to represent ""
char* tmp_val = reinterpret_cast<char*>(0x01);
buf_ret = _row_buffer->push_string(tmp_val, string_val->len);
} else {
buf_ret = _row_buffer->push_null();
}
} else {
buf_ret = _row_buffer->push_string(string_val->ptr, string_val->len);
}
break;
}
case TYPE_DECIMAL: {
const DecimalValue* decimal_val = reinterpret_cast<const DecimalValue*>(item);
std::string decimal_str;
int output_scale = _output_expr_ctxs[i]->root()->output_scale();
if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val->to_string(output_scale);
} else {
decimal_str = decimal_val->to_string();
}
buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length());
break;
}
case TYPE_DECIMALV2: {
DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(item)->value);
std::string decimal_str;
int output_scale = _output_expr_ctxs[i]->root()->output_scale();
if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val.to_string(output_scale);
} else {
decimal_str = decimal_val.to_string();
}
buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length());
break;
}
default:
LOG(WARNING) << "can't convert this type to mysql type. type = " <<
_output_expr_ctxs[i]->root()->type();
buf_ret = -1;
break;
}
}
if (0 != buf_ret) {
return Status::InternalError("pack mysql buffer failed.");
}
return Status::OK();
}
Status ResultWriter::append_row_batch(RowBatch* batch) {
if (NULL == batch || 0 == batch->num_rows()) {
return Status::OK();
}
Status status;
// convert one batch
TFetchDataResult* result = new(std::nothrow) TFetchDataResult();
int num_rows = batch->num_rows();
result->result_batch.rows.resize(num_rows);
for (int i = 0; status.ok() && i < num_rows; ++i) {
TupleRow* row = batch->get_row(i);
status = add_one_row(row);
if (status.ok()) {
result->result_batch.rows[i].assign(_row_buffer->buf(), _row_buffer->length());
} else {
LOG(WARNING) << "convert row to mysql result failed.";
break;
}
}
if (status.ok()) {
// push this batch to back
status = _sinker->add_batch(result);
if (status.ok()) {
result = NULL;
} else {
LOG(WARNING) << "append result batch to sink failed.";
}
}
delete result;
result = NULL;
return status;
}
const std::string ResultWriter::NULL_IN_CSV = "\\N";
}


@ -15,43 +15,36 @@
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_RUNTIME_RESULT_WRITER_H
#define DORIS_BE_RUNTIME_RESULT_WRITER_H
#include <vector>
#pragma once
#include "common/status.h"
#include "gen_cpp/PlanNodes_types.h"
namespace doris {
class TupleRow;
class Status;
class RowBatch;
class ExprContext;
class MysqlRowBuffer;
class BufferControlBlock;
class RuntimeState;
//convert the row batch to mysql protol row
// abstract class of the result writer
class ResultWriter {
public:
ResultWriter(BufferControlBlock* sinker, const std::vector<ExprContext*>& output_expr_ctxs);
~ResultWriter();
ResultWriter() {};
~ResultWriter() {};
Status init(RuntimeState* state);
// convert one row batch to mysql result and
// append this batch to the result sink
Status append_row_batch(RowBatch* batch);
virtual Status init(RuntimeState* state) = 0;
// convert and write one row batch
virtual Status append_row_batch(const RowBatch* batch) = 0;
private:
// convert one tuple row
Status add_one_row(TupleRow* row);
virtual Status close() = 0;
// The expressions that are run to create tuples to be written to hbase.
BufferControlBlock* _sinker;
const std::vector<ExprContext*>& _output_expr_ctxs;
MysqlRowBuffer* _row_buffer;
int64_t get_written_rows() const { return _written_rows; }
static const std::string NULL_IN_CSV;
protected:
int64_t _written_rows = 0; // number of rows written
};
}
#endif
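After this refactor, ResultWriter is a pure interface: MysqlResultWriter and FileResultWriter supply the row conversion, while the base class only carries the _written_rows counter that ResultSink reports to the sender at close. A toy, self-contained sketch of driving an implementation through that interface (the ResultWriterLike and StubWriter names are hypothetical, not Doris classes):
```
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-ins illustrating the new writer hierarchy.
class ResultWriterLike {
public:
    virtual ~ResultWriterLike() = default;
    virtual bool init() = 0;                                      // like ResultWriter::init()
    virtual bool append_rows(const std::vector<int>& batch) = 0;  // like append_row_batch()
    virtual bool close() = 0;
    int64_t get_written_rows() const { return _written_rows; }
protected:
    int64_t _written_rows = 0;  // maintained by subclasses, read by the sink
};

class StubWriter : public ResultWriterLike {
public:
    bool init() override { return true; }
    bool append_rows(const std::vector<int>& batch) override {
        _written_rows += batch.size();
        return true;
    }
    bool close() override { return true; }
};

int main() {
    StubWriter writer;
    writer.init();
    writer.append_rows({1, 2, 3});
    writer.close();
    // the sink would now call _sender->update_num_written_rows(writer.get_written_rows())
    std::cout << writer.get_written_rows() << std::endl;  // prints 3
}
```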


@ -60,5 +60,6 @@ ADD_BE_TEST(heartbeat_flags_test)
ADD_BE_TEST(result_queue_mgr_test)
ADD_BE_TEST(memory_scratch_sink_test)
ADD_BE_TEST(external_scan_context_mgr_test)
ADD_BE_TEST(memory/chunk_allocator_test)
ADD_BE_TEST(memory/system_allocator_test)


@ -106,6 +106,7 @@ module.exports = [
"colocation-join",
"dynamic-partition",
"export_manual",
"outfile",
"privilege",
"small-file-mgr",
"sql-mode",


@ -114,6 +114,7 @@ module.exports = [
"colocation-join",
"dynamic-partition",
"export-manual",
"outfile",
"privilege",
"segment-v2-usage",
"small-file-mgr",


@ -0,0 +1,186 @@
---
{
"title": "Export Query Result",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Export Query Result
This document describes how to use the `SELECT INTO OUTFILE` command to export query results.
## Syntax
The `SELECT INTO OUTFILE` statement can export the query results to a file. Currently, it only supports exporting to remote storage such as HDFS, S3, and BOS through the Broker process. The syntax is as follows:
```
query_stmt
INTO OUTFILE "file_path"
[format_as]
WITH BROKER `broker_name`
[broker_properties]
[other_properties]
```
* `file_path`
`file_path` specifies the file path and file name prefix, such as `hdfs://path/to/my_file_`.
The final file name is composed of `my_file_`, a file sequence number, and the format suffix. The sequence number starts from 0 and is determined by the number of file splits. For example:
```
my_file_0.csv
my_file_1.csv
my_file_2.csv
```
* `[format_as]`
```
FORMAT AS CSV
```
Specifies the export format. The default is CSV.
* `[properties]`
Specifies the relevant properties. Currently only export through the Broker process is supported. Broker-related properties must be prefixed with `broker.`. For details, please refer to [Broker](./broker.html).
```
PROPERTIES
("broker.prop_key" = "broker.prop_val", ...)
```
Other properties:
```
PROPERTIES
("key1" = "val1", "key2" = "val2", ...)
```
The following properties are currently supported:
* `column_separator`: Column separator, only applicable to CSV format. The default is `\t`.
* `line_delimiter`: Line delimiter, only applicable to CSV format. The default is `\n`.
* `max_file_size`: The maximum size of a single file. The default is 1GB; the valid range is 5MB to 2GB. Files exceeding this size will be split.
1. Example 1
Export simple query results to the file `hdfs:/path/to/result.txt`. Specify the export format as CSV. Use `my_broker` and set kerberos authentication information. Specify the column separator as `,` and the line delimiter as `\n`.
```
SELECT * FROM tbl
INTO OUTFILE "hdfs:/path/to/result_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "my_broker",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "doris@YOUR.COM",
"broker.kerberos_keytab" = "/home/doris/my.keytab"
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "100MB"
);
```
If the result is not larger than 100MB, the file will be `result_0.csv`.
If it is larger than 100MB, it may be `result_0.csv, result_1.csv, ...`.
2. Example 2
Export the query result of the CTE statement to the file `hdfs:/path/to/result.txt`. The default export format is CSV. Use `my_broker` and set the HDFS high-availability information. Use the default column separator and line delimiter.
```
WITH
x1 AS
(SELECT k1, k2 FROM tbl1),
x2 AS
(SELECT k3 FROM tbl2)
SELECT k1 FROM x1 UNION SELECT k3 FROM x2
INTO OUTFILE "hdfs:/path/to/result_"
PROPERTIES
(
"broker.name" = "my_broker",
"broker.username"="user",
"broker.password"="passwd",
"broker.dfs.nameservices" = "my_ha",
"broker.dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
"broker.dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"broker.dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
```
If the result is not larger than 1GB, the file will be `result_0.csv`.
If it is larger than 1GB, it may be `result_0.csv, result_1.csv, ...`.
3. Example 3
Export the query result of the UNION statement to the file `bos://bucket/result.parquet`. Specify the export format as PARQUET. Use `my_broker` and set the BOS connection information. The PARQUET format does not need a column separator or line delimiter.
```
SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1
INTO OUTFILE "bos://bucket/result_"
FORMAT AS PARQUET
PROPERTIES
(
"broker.name" = "my_broker",
"broker.bos_endpoint" = "http://bj.bcebos.com",
"broker.bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
"broker.bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
);
```
If the result is not larger than 1GB, the file will be `result_0.parquet`.
If it is larger than 1GB, it may be `result_0.parquet, result_1.parquet, ...`.
## Return result
The export command is synchronous. When the command returns, the operation is complete.
If the export finishes normally, the result is as follows:
```
mysql> SELECT * FROM tbl INTO OUTFILE ... Query OK, 100000 row affected (5.86 sec)
```
`100000 row affected` indicates the number of rows in the exported result set.
If the execution is incorrect, an error message will be returned, such as:
```
mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
```
## Notice
* The CSV format does not support exporting binary types, such as BITMAP and HLL types. These types will be output as `\N`, which is null.
* The query results are exported by a single BE node in a single thread. Therefore, the export time is positively correlated with the size of the result set.
* The export command does not check whether the file and file path exist. Whether the path will be automatically created or whether the existing file will be overwritten is entirely determined by the semantics of the remote storage system.
* If an error occurs during the export process, the exported file may remain on the remote storage system. Doris will not clean these files. The user needs to manually clean up.
* The timeout of the export command is the same as the timeout of the query. It can be set by `SET query_timeout = xxx`.
* A query with an empty result set will still produce an empty file.
* File splitting ensures that a row of data is stored completely in a single file. Therefore, the file size is not strictly equal to `max_file_size`.


@ -0,0 +1,182 @@
---
{
"title": "导出查询结果集",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Export Query Result Set
This document describes how to use the `SELECT INTO OUTFILE` command to export query results.
## Syntax
The `SELECT INTO OUTFILE` statement can export query results to a file. Currently it only supports exporting to remote storage, such as HDFS, S3, and BOS, through the Broker process. The syntax is as follows:
```
query_stmt
INTO OUTFILE "file_path"
[format_as]
[properties]
```
* `file_path`
`file_path` specifies the storage path and the file name prefix, such as `hdfs://path/to/my_file_`.
The final file name is composed of `my_file_`, a file sequence number, and the file format suffix. The sequence number starts from 0 and is determined by the number of file splits. For example:
```
my_file_0.csv
my_file_1.csv
my_file_2.csv
```
* `[format_as]`
```
FORMAT AS CSV
```
Specifies the export format. The default is CSV.
* `[properties]`
Specifies the relevant properties. Currently only export through the Broker process is supported. Broker-related properties must be prefixed with `broker.`. For details, refer to the [Broker documentation](./broker.html).
```
("broker.prop_key" = "broker.prop_val", ...)
```
Other properties:
```
("key1" = "val1", "key2" = "val2", ...)
```
The following properties are currently supported:
* `column_separator`: column separator, only applicable to the CSV format. The default is `\t`.
* `line_delimiter`: line delimiter, only applicable to the CSV format. The default is `\n`.
* `max_file_size`: maximum size of a single file. The default is 1GB; the valid range is 5MB to 2GB. Files exceeding this size will be split.
1. Example 1
Export the result of a simple query to the file `hdfs:/path/to/result.txt`. Specify the export format as CSV. Use `my_broker` and set Kerberos authentication information. Specify the column separator as `,` and the line delimiter as `\n`.
```
SELECT * FROM tbl
INTO OUTFILE "hdfs:/path/to/result_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "my_broker",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "doris@YOUR.COM",
"broker.kerberos_keytab" = "/home/doris/my.keytab",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "100MB"
);
```
If the result is not larger than 100MB, the generated file will be `result_0.csv`.
If it is larger than 100MB, it may be `result_0.csv, result_1.csv, ...`.
2. Example 2
Export the query result of a CTE statement to the file `hdfs:/path/to/result.txt`. The default export format is CSV. Use `my_broker` and set the HDFS high-availability information. Use the default column separator and line delimiter.
```
WITH
x1 AS
(SELECT k1, k2 FROM tbl1),
x2 AS
(SELECT k3 FROM tbl2)
SELECT k1 FROM x1 UNION SELECT k3 FROM x2
INTO OUTFILE "hdfs:/path/to/result_"
PROPERTIES
(
"broker.name" = "my_broker",
"broker.username"="user",
"broker.password"="passwd",
"broker.dfs.nameservices" = "my_ha",
"broker.dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
"broker.dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"broker.dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
```
If the result is not larger than 1GB, the generated file will be `result_0.csv`.
If it is larger than 1GB, it may be `result_0.csv, result_1.csv, ...`.
3. Example 3
Export the query result of a UNION statement to the file `bos://bucket/result.txt`. Specify the export format as PARQUET. Use `my_broker` and set the BOS connection information. The PARQUET format does not require a column separator.
```
SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1
INTO OUTFILE "bos://bucket/result_"
FORMAT AS PARQUET
PROPERTIES
(
"broker.name" = "my_broker",
"broker.bos_endpoint" = "http://bj.bcebos.com",
"broker.bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
"broker.bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
);
```
If the result is not larger than 1GB, the generated file will be `result_0.parquet`.
If it is larger than 1GB, it may be `result_0.parquet, result_1.parquet, ...`.
## Return Result
The export command is synchronous. When the command returns, the operation is complete.
If the export finishes normally, the result is as follows:
```
mysql> SELECT * FROM tbl INTO OUTFILE ... Query OK, 100000 row affected (5.86 sec)
```
`100000 row affected` indicates the number of rows in the exported result set.
If the execution fails, an error message is returned, for example:
```
mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
```
## Notice
* The query result is exported by a single BE node in a single thread, so the export time is positively correlated with the size of the result set.
* The export command does not check whether the file or the file path exists. Whether the path is created automatically or an existing file is overwritten is entirely determined by the semantics of the remote storage system.
* If an error occurs during export, exported files may remain on the remote storage system. Doris does not clean them up; the user needs to remove them manually.
* The timeout of the export command is the same as the query timeout and can be set with `SET query_timeout=xxx`.
* A query with an empty result set still produces a file of size 0.
* File splitting guarantees that a row of data is stored completely in a single file, so the file size is not strictly equal to `max_file_size`.


@ -246,7 +246,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A
KW_LOCAL, KW_LOCATION,
KW_MAX, KW_MAX_VALUE, KW_MERGE, KW_MIN, KW_MINUTE, KW_MINUS, KW_MIGRATE, KW_MIGRATIONS, KW_MODIFY, KW_MONTH,
KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS,
KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OVER,
KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OUTFILE, KW_OVER,
KW_PARTITION, KW_PARTITIONS, KW_PASSWORD, KW_PATH, KW_PAUSE, KW_PIPE, KW_PRECEDING,
KW_PLUGIN, KW_PLUGINS,
KW_PRIMARY,
@ -469,6 +469,8 @@ nonterminal Boolean opt_builtin;
nonterminal Boolean opt_tmp;
nonterminal OutFileClause opt_outfile;
precedence left KW_FULL, KW_MERGE;
precedence left DOT;
precedence left SET_VAR;
@ -2465,7 +2467,7 @@ delete_stmt ::=
// even if the union has order by and limit.
// ORDER BY and LIMIT bind to the preceding select statement by default.
query_stmt ::=
opt_with_clause:w set_operand_list:operands
opt_with_clause:w set_operand_list:operands opt_outfile:outfile
{:
QueryStmt queryStmt = null;
if (operands.size() == 1) {
@ -2474,15 +2476,27 @@ query_stmt ::=
queryStmt = new SetOperationStmt(operands, null, LimitElement.NO_LIMIT);
}
queryStmt.setWithClause(w);
queryStmt.setOutFileClause(outfile);
RESULT = queryStmt;
:}
| opt_with_clause:w set_operation_with_order_by_or_limit:set_operation
| opt_with_clause:w set_operation_with_order_by_or_limit:set_operation opt_outfile:outfile
{:
set_operation.setWithClause(w);
set_operation.setOutFileClause(outfile);
RESULT = set_operation;
:}
;
opt_outfile ::=
{:
RESULT = null;
:}
| KW_INTO KW_OUTFILE STRING_LITERAL:file opt_file_format:fileFormat opt_properties:properties
{:
RESULT = new OutFileClause(file, fileFormat, properties);
:}
;
opt_with_clause ::=
KW_WITH with_view_def_list:list
{: RESULT = new WithClause(list); :}


@ -17,9 +17,6 @@
package org.apache.doris.analysis;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
@ -27,6 +24,11 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -141,5 +143,9 @@ public class BaseViewStmt extends DdlStmt {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
if (viewDefStmt.hasOutFileClause()) {
throw new AnalysisException("Not support OUTFILE clause in CREATE VIEW statement");
}
}
}


@ -19,6 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import com.google.common.collect.Maps;
@ -78,4 +79,14 @@ public class BrokerDesc implements Writable {
desc.readFields(in);
return desc;
}
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append(" WITH BROKER ").append(name);
if (properties != null && !properties.isEmpty()) {
PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false);
sb.append(" (").append(printableMap.toString()).append(")");
}
return sb.toString();
}
}


@ -0,0 +1,216 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TResultFileSinkOptions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
// For syntax select * from tbl INTO OUTFILE xxxx
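// A hedged, illustrative example of the statement this clause belongs to (the path, broker name
// and property values are hypothetical):
//   SELECT * FROM tbl1
//   INTO OUTFILE "hdfs://host:port/path/to/result_"
//   FORMAT AS CSV
//   PROPERTIES (
//       "broker.name" = "my_broker",
//       "column_separator" = ",",
//       "line_delimiter" = "\n",
//       "max_file_size" = "100MB"
//   );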
public class OutFileClause {
private static final Logger LOG = LogManager.getLogger(OutFileClause.class);
private static final String BROKER_PROP_PREFIX = "broker.";
private static final String PROP_BROKER_NAME = "broker.name";
private static final String PROP_COLUMN_SEPARATOR = "column_separator";
private static final String PROP_LINE_DELIMITER = "line_delimiter";
private static final String PROP_MAX_FILE_SIZE = "max_file_size";
private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB
private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB
private String filePath;
private String format;
private Map<String, String> properties;
// the following members are set after analyzing
private String columnSeparator = "\t";
private String lineDelimiter = "\n";
private TFileFormatType fileFormatType;
private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
private BrokerDesc brokerDesc = null;
public OutFileClause(String filePath, String format, Map<String, String> properties) {
this.filePath = filePath;
this.format = Strings.isNullOrEmpty(format) ? "csv" : format.toLowerCase();
this.properties = properties;
}
public OutFileClause(OutFileClause other) {
this.filePath = other.filePath;
this.format = other.format;
this.properties = other.properties == null ? null : Maps.newHashMap(other.properties);
}
public String getColumnSeparator() {
return columnSeparator;
}
public String getLineDelimiter() {
return lineDelimiter;
}
public TFileFormatType getFileFormatType() {
return fileFormatType;
}
public long getMaxFileSizeBytes() {
return maxFileSizeBytes;
}
public BrokerDesc getBrokerDesc() {
return brokerDesc;
}
public void analyze(Analyzer analyzer) throws AnalysisException {
if (Strings.isNullOrEmpty(filePath)) {
throw new AnalysisException("Must specify file in OUTFILE clause");
}
if (!format.equals("csv")) {
throw new AnalysisException("Only support CSV format");
}
fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
analyzeProperties();
if (brokerDesc == null) {
throw new AnalysisException("Must specify BROKER properties in OUTFILE clause");
}
}
private void analyzeProperties() throws AnalysisException {
if (properties == null || properties.isEmpty()) {
return;
}
Set<String> processedPropKeys = Sets.newHashSet();
getBrokerProperties(processedPropKeys);
if (brokerDesc == null) {
return;
}
if (properties.containsKey(PROP_COLUMN_SEPARATOR)) {
if (!isCsvFormat()) {
throw new AnalysisException(PROP_COLUMN_SEPARATOR + " is only for CSV format");
}
columnSeparator = properties.get(PROP_COLUMN_SEPARATOR);
processedPropKeys.add(PROP_COLUMN_SEPARATOR);
}
if (properties.containsKey(PROP_LINE_DELIMITER)) {
if (!isCsvFormat()) {
throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format");
}
lineDelimiter = properties.get(PROP_LINE_DELIMITER);
processedPropKeys.add(PROP_LINE_DELIMITER);
}
if (properties.containsKey(PROP_MAX_FILE_SIZE)) {
maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE));
if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes);
}
processedPropKeys.add(PROP_MAX_FILE_SIZE);
}
if (processedPropKeys.size() != properties.size()) {
LOG.debug("{} vs {}", processedPropKeys, properties);
throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
.filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
}
}
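// A minimal sketch of what getBrokerProperties() expects (the broker name and the extra keys
// below are hypothetical): given properties such as
//   "broker.name" = "my_broker", "broker.username" = "user1", "broker.password" = "passwd"
// it builds a BrokerDesc named "my_broker" whose property map is
//   {"username" = "user1", "password" = "passwd"}, i.e. the "broker." prefix is stripped.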
private void getBrokerProperties(Set<String> processedPropKeys) {
if (!properties.containsKey(PROP_BROKER_NAME)) {
return;
}
String brokerName = properties.get(PROP_BROKER_NAME);
processedPropKeys.add(PROP_BROKER_NAME);
Map<String, String> brokerProps = Maps.newHashMap();
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
processedPropKeys.add(entry.getKey());
}
}
brokerDesc = new BrokerDesc(brokerName, brokerProps);
}
private boolean isCsvFormat() {
return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2
|| fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE
|| fileFormatType == TFileFormatType.FORMAT_CSV_GZ
|| fileFormatType == TFileFormatType.FORMAT_CSV_LZ4FRAME
|| fileFormatType == TFileFormatType.FORMAT_CSV_LZO
|| fileFormatType == TFileFormatType.FORMAT_CSV_LZOP
|| fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN;
}
@Override
public OutFileClause clone() {
return new OutFileClause(this);
}
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ").append(format);
if (properties != null && !properties.isEmpty()) {
sb.append(" PROPERTIES(");
sb.append(new PrintableMap<>(properties, " = ", true, false));
sb.append(")");
}
return sb.toString();
}
public TResultFileSinkOptions toSinkOptions() {
TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath, fileFormatType);
if (isCsvFormat()) {
sinkOptions.setColumn_separator(columnSeparator);
sinkOptions.setLine_delimiter(lineDelimiter);
}
sinkOptions.setMax_file_size_bytes(maxFileSizeBytes);
if (brokerDesc != null) {
sinkOptions.setBroker_properties(brokerDesc.getProperties());
// The broker_addresses of sinkOptions will be set in Coordinator,
// because we need to choose the broker nearest to the result sink node.
}
return sinkOptions;
}
}

View File

@ -23,12 +23,12 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.doris.qe.ConnectContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -110,6 +110,9 @@ public abstract class QueryStmt extends StatementBase {
/////////////////////////////////////////
// END: Members that need to be reset()
// represents the "INTO OUTFILE" clause
protected OutFileClause outFileClause;
QueryStmt(ArrayList<OrderByElement> orderByElements, LimitElement limitElement) {
this.orderByElements = orderByElements;
this.limitElement = limitElement;
@ -124,6 +127,7 @@ public abstract class QueryStmt extends StatementBase {
super.analyze(analyzer);
analyzeLimit(analyzer);
if (hasWithClause()) withClause_.analyze(analyzer);
if (hasOutFileClause()) outFileClause.analyze(analyzer);
}
private void analyzeLimit(Analyzer analyzer) throws AnalysisException {
@ -553,12 +557,17 @@ public abstract class QueryStmt extends StatementBase {
return withClause_ != null ? withClause_.clone() : null;
}
public OutFileClause cloneOutFileClause() {
return outFileClause != null ? outFileClause.clone() : null;
}
/**
* C'tor for cloning.
*/
protected QueryStmt(QueryStmt other) {
super(other);
withClause_ = other.cloneWithClause();
outFileClause = other.cloneOutFileClause();
orderByElements = other.cloneOrderByElements();
limitElement = other.limitElement.clone();
resultExprs = Expr.cloneList(other.resultExprs);
@ -597,4 +606,16 @@ public abstract class QueryStmt extends StatementBase {
public abstract void substituteSelectList(Analyzer analyzer, List<String> newColLabels)
throws AnalysisException, UserException;
public void setOutFileClause(OutFileClause outFileClause) {
this.outFileClause = outFileClause;
}
public OutFileClause getOutFileClause() {
return outFileClause;
}
public boolean hasOutFileClause() {
return outFileClause != null;
}
}

View File

@ -1540,6 +1540,10 @@ public class SelectStmt extends QueryStmt {
if (hasLimitClause()) {
strBuilder.append(limitElement.toSql());
}
if (hasOutFileClause()) {
strBuilder.append(outFileClause.toSql());
}
return strBuilder.toString();
}

View File

@ -18,7 +18,6 @@
package org.apache.doris.common.util;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@ -42,7 +41,7 @@ public class ParseUtil {
private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)");
public static long analyzeDataVolumn(String dataVolumnStr) throws UserException {
public static long analyzeDataVolumn(String dataVolumnStr) throws AnalysisException {
long dataVolumn = 0;
Matcher m = dataVolumnPattern.matcher(dataVolumnStr);
if (m.matches()) {

View File

@ -17,19 +17,22 @@
package org.apache.doris.planner;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.common.UserException;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.TreeNode;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TResultSinkType;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.Logger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@ -174,8 +177,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
}
// add ResultSink
Preconditions.checkState(sink == null);
// we're streaming to an exchange node
ResultSink bufferSink = new ResultSink(planRoot.getId());
// we're streaming to a result sink
ResultSink bufferSink = new ResultSink(planRoot.getId(), TResultSinkType.MYSQL_PROTOCAL);
sink = bufferSink;
}
}

View File

@ -195,9 +195,10 @@ public class Planner {
for (PlanFragment fragment : fragments) {
fragment.finalize(analyzer, !queryOptions.allow_unsupported_formats);
}
Collections.reverse(fragments);
setOutfileSink(queryStmt);
if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
@ -210,6 +211,21 @@ public class Planner {
}
}
// if query stmt has OUTFILE clause, set info into ResultSink.
// this should be done after fragments are generated.
private void setOutfileSink(QueryStmt queryStmt) {
if (!queryStmt.hasOutFileClause()) {
return;
}
PlanFragment topFragment = fragments.get(0);
if (!(topFragment.getSink() instanceof ResultSink)) {
return;
}
ResultSink resultSink = (ResultSink) topFragment.getSink();
resultSink.setOutfileInfo(queryStmt.getOutFileClause());
}
/**
* If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise
* returns root unchanged.

View File

@ -17,19 +17,33 @@
package org.apache.doris.planner;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultFileSinkOptions;
import org.apache.doris.thrift.TResultSink;
import org.apache.doris.thrift.TResultSinkType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
/**
 * Data sink that forwards data to an exchange node.
 * Result sink that forwards data to either:
 * 1. the FE data receiver, which returns the final query result to the user client, or
 * 2. files that store the result data
 */
public class ResultSink extends DataSink {
private final PlanNodeId exchNodeId;
private TResultSinkType sinkType;
private String brokerName;
private TResultFileSinkOptions fileSinkOptions;
public ResultSink(PlanNodeId exchNodeId) {
public ResultSink(PlanNodeId exchNodeId, TResultSinkType sinkType) {
this.exchNodeId = exchNodeId;
this.sinkType = sinkType;
}
@Override
@ -43,6 +57,10 @@ public class ResultSink extends DataSink {
protected TDataSink toThrift() {
TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK);
TResultSink tResultSink = new TResultSink();
tResultSink.setType(sinkType);
if (fileSinkOptions != null) {
tResultSink.setFile_options(fileSinkOptions);
}
result.setResult_sink(tResultSink);
return result;
}
@ -56,6 +74,27 @@ public class ResultSink extends DataSink {
public DataPartition getOutputPartition() {
return null;
}
}
/* vim: set ts=4 sw=4 sts=4 tw=100 noet: */
public boolean isOutputFileSink() {
return sinkType == TResultSinkType.FILE;
}
public boolean needBroker() {
return !Strings.isNullOrEmpty(brokerName);
}
public String getBrokerName() {
return brokerName;
}
public void setOutfileInfo(OutFileClause outFileClause) {
sinkType = TResultSinkType.FILE;
fileSinkOptions = outFileClause.toSinkOptions();
brokerName = outFileClause.getBrokerDesc() == null ? null : outFileClause.getBrokerDesc().getName();
}
public void setBrokerAddr(String ip, int port) {
Preconditions.checkNotNull(fileSinkOptions);
fileSinkOptions.setBroker_addresses(Lists.newArrayList(new TNetworkAddress(ip, port)));
}
}

View File

@ -20,6 +20,7 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.Config;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
@ -401,11 +402,21 @@ public class Coordinator {
PlanFragmentId topId = fragments.get(0).getFragmentId();
FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
if (topParams.fragment.getSink() instanceof ResultSink) {
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
receiver = new ResultReceiver(
topParams.instanceExecParams.get(0).instanceId,
addressToBackendID.get(topParams.instanceExecParams.get(0).host),
toBrpcHost(topParams.instanceExecParams.get(0).host),
addressToBackendID.get(execBeAddr),
toBrpcHost(execBeAddr),
queryOptions.query_timeout * 1000);
// set the broker address for OUTFILE sink
ResultSink resultSink = (ResultSink) topParams.fragment.getSink();
if (resultSink.isOutputFileSink() && resultSink.needBroker()) {
FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(resultSink.getBrokerName(),
execBeAddr.getHostname());
resultSink.setBrokerAddr(broker.ip, broker.port);
}
} else {
// This is a load process.
this.queryOptions.setIs_report_success(true);

View File

@ -578,12 +578,23 @@ public class StmtExecutor {
// so We need to send fields after first batch arrived
// send result
// 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
// We will not send the real query result to the client. Instead, we only send OK to the client
// with the number of rows selected. For example:
// mysql> select * from tbl1 into outfile xxx;
// Query OK, 10 rows affected (0.01 sec)
//
// 2. If this is a query, send the result expr fields first, and send result data back to client.
RowBatch batch;
MysqlChannel channel = context.getMysqlChannel();
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
boolean isOutfileQuery = queryStmt.hasOutFileClause();
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
}
while (true) {
batch = coord.getNext();
if (batch.getBatch() != null) {
// for an outfile query, there will be only one empty batch sent back with the eos flag
if (batch.getBatch() != null && !isOutfileQuery) {
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
@ -595,7 +606,11 @@ public class StmtExecutor {
}
statisticsForAuditLog = batch.getQueryStatistics();
context.getState().setEof();
if (!isOutfileQuery) {
context.getState().setEof();
} else {
context.getState().setOk(statisticsForAuditLog.returned_rows, 0, "");
}
}
// Process a select statement.
@ -612,6 +627,10 @@ public class StmtExecutor {
insertStmt = (InsertStmt) parsedStmt;
}
if (insertStmt.getQueryStmt().hasOutFileClause()) {
throw new DdlException("Not support OUTFILE clause in INSERT statement");
}
// assign query id before explain query return
UUID uuid = insertStmt.getUUID();
context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
@ -913,3 +932,4 @@ public class StmtExecutor {
return statisticsForAuditLog;
}
}

View File

@ -264,6 +264,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("or", new Integer(SqlParserSymbols.KW_OR));
keywordMap.put("order", new Integer(SqlParserSymbols.KW_ORDER));
keywordMap.put("outer", new Integer(SqlParserSymbols.KW_OUTER));
keywordMap.put("outfile", new Integer(SqlParserSymbols.KW_OUTFILE));
keywordMap.put("over", new Integer(SqlParserSymbols.KW_OVER));
keywordMap.put("partition", new Integer(SqlParserSymbols.KW_PARTITION));
keywordMap.put("partitions", new Integer(SqlParserSymbols.KW_PARTITIONS));

View File

@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto";
message PQueryStatistics {
optional int64 scan_rows = 1;
optional int64 scan_bytes = 2;
optional int64 returned_rows = 3;
}
message PRowBatch {

View File

@ -22,6 +22,7 @@ include "Exprs.thrift"
include "Types.thrift"
include "Descriptors.thrift"
include "Partitions.thrift"
include "PlanNodes.thrift"
enum TDataSinkType {
DATA_STREAM_SINK,
@ -33,6 +34,21 @@ enum TDataSinkType {
MEMORY_SCRATCH_SINK
}
enum TResultSinkType {
MYSQL_PROTOCAL,
FILE
}
struct TResultFileSinkOptions {
1: required string file_path
2: required PlanNodes.TFileFormatType file_format
3: optional string column_separator // only for csv
4: optional string line_delimiter // only for csv
5: optional i64 max_file_size_bytes
6: optional list<Types.TNetworkAddress> broker_addresses; // only for remote file
7: optional map<string, string> broker_properties // only for remote file
}
struct TMemoryScratchSink {
}
@ -52,8 +68,9 @@ struct TDataStreamSink {
3: optional bool ignore_not_found
}
// Reserved for
struct TResultSink {
1: optional TResultSinkType type;
2: optional TResultFileSinkOptions file_options;
}
struct TMysqlTableSink {