From 3ffc447b38e054ed1827d2ed9e451c87e6a3a4dd Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Mon, 25 May 2020 21:24:56 +0800 Subject: [PATCH] [OUTFILE] Support `INTO OUTFILE` to export query result (#3584) This CL mainly changes: 1. Support `SELECT INTO OUTFILE` command. 2. Support export query result to a file via Broker. 3. Support CSV export format with specified column separator and line delimiter. --- be/src/exec/CMakeLists.txt | 1 + be/src/exec/broker_writer.h | 2 +- be/src/exec/local_file_writer.h | 4 +- be/src/exec/parquet_writer.cpp | 91 +++++ be/src/exec/parquet_writer.h | 81 +++++ be/src/runtime/CMakeLists.txt | 4 +- be/src/runtime/buffer_control_block.h | 9 + be/src/runtime/file_result_writer.cpp | 327 ++++++++++++++++++ be/src/runtime/file_result_writer.h | 132 +++++++ be/src/runtime/mysql_result_writer.cpp | 256 ++++++++++++++ be/src/runtime/mysql_result_writer.h | 69 ++++ be/src/runtime/query_statistics.h | 11 +- be/src/runtime/result_sink.cpp | 46 ++- be/src/runtime/result_sink.h | 5 + be/src/runtime/result_writer.cpp | 215 +----------- be/src/runtime/result_writer.h | 39 +-- be/test/runtime/CMakeLists.txt | 1 + docs/.vuepress/sidebar/en.js | 1 + docs/.vuepress/sidebar/zh-CN.js | 1 + docs/en/administrator-guide/outfile.md | 186 ++++++++++ docs/zh-CN/administrator-guide/outfile.md | 182 ++++++++++ fe/src/main/cup/sql_parser.cup | 20 +- .../apache/doris/analysis/BaseViewStmt.java | 12 +- .../org/apache/doris/analysis/BrokerDesc.java | 11 + .../apache/doris/analysis/OutFileClause.java | 216 ++++++++++++ .../org/apache/doris/analysis/QueryStmt.java | 23 +- .../org/apache/doris/analysis/SelectStmt.java | 4 + .../apache/doris/common/util/ParseUtil.java | 3 +- .../apache/doris/planner/PlanFragment.java | 13 +- .../org/apache/doris/planner/Planner.java | 18 +- .../org/apache/doris/planner/ResultSink.java | 47 ++- .../java/org/apache/doris/qe/Coordinator.java | 15 +- .../org/apache/doris/qe/StmtExecutor.java | 26 +- fe/src/main/jflex/sql_scanner.flex | 1 + gensrc/proto/data.proto | 1 + gensrc/thrift/DataSinks.thrift | 19 +- 36 files changed, 1821 insertions(+), 271 deletions(-) create mode 100644 be/src/exec/parquet_writer.cpp create mode 100644 be/src/exec/parquet_writer.h create mode 100644 be/src/runtime/file_result_writer.cpp create mode 100644 be/src/runtime/file_result_writer.h create mode 100644 be/src/runtime/mysql_result_writer.cpp create mode 100644 be/src/runtime/mysql_result_writer.h create mode 100644 docs/en/administrator-guide/outfile.md create mode 100644 docs/zh-CN/administrator-guide/outfile.md create mode 100644 fe/src/main/java/org/apache/doris/analysis/OutFileClause.java diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index f949ef3f5a..4844338cae 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -92,6 +92,7 @@ set(EXEC_FILES broker_writer.cpp parquet_scanner.cpp parquet_reader.cpp + parquet_writer.cpp orc_scanner.cpp json_scanner.cpp assert_num_rows_node.cpp diff --git a/be/src/exec/broker_writer.h b/be/src/exec/broker_writer.h index f5efe7b20e..1bb5b757fa 100644 --- a/be/src/exec/broker_writer.h +++ b/be/src/exec/broker_writer.h @@ -40,7 +40,7 @@ public: BrokerWriter(ExecEnv* env, const std::vector& broker_addresses, const std::map& properties, - const std::string& dir, + const std::string& path, int64_t start_offset); virtual ~BrokerWriter(); diff --git a/be/src/exec/local_file_writer.h b/be/src/exec/local_file_writer.h index dc116698e0..a5d2b4d82d 100644 --- a/be/src/exec/local_file_writer.h +++ 
b/be/src/exec/local_file_writer.h @@ -24,12 +24,14 @@ namespace doris { +class RuntimeState; + class LocalFileWriter : public FileWriter { public: LocalFileWriter(const std::string& path, int64_t start_offset); virtual ~LocalFileWriter(); - Status open() override; + Status open() override; virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override; diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp new file mode 100644 index 0000000000..6c9f403316 --- /dev/null +++ b/be/src/exec/parquet_writer.cpp @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/parquet_writer.h" + +#include +#include +#include + +#include "exec/file_writer.h" +#include "common/logging.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/mem_pool.h" +#include "util/thrift_util.h" + +namespace doris { + +/// ParquetOutputStream +ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer): _file_writer(file_writer) { + set_mode(arrow::io::FileMode::WRITE); +} + +ParquetOutputStream::~ParquetOutputStream() { + Close(); +} + +arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) { + size_t written_len = 0; + Status st = _file_writer->write(reinterpret_cast(data), nbytes, &written_len); + if (!st.ok()) { + return arrow::Status::IOError(st.get_error_msg()); + } + _cur_pos += written_len; + return arrow::Status::OK(); +} + +arrow::Status ParquetOutputStream::Tell(int64_t* position) const { + *position = _cur_pos; + return arrow::Status::OK(); +} + +arrow::Status ParquetOutputStream::Close() { + Status st = _file_writer->close(); + if (!st.ok()) { + return arrow::Status::IOError(st.get_error_msg()); + } + _is_closed = true; + return arrow::Status::OK(); +} + +/// ParquetWriterWrapper +ParquetWriterWrapper::ParquetWriterWrapper(FileWriter *file_writer, const std::vector& output_expr_ctxs) : + _output_expr_ctxs(output_expr_ctxs) { + // TODO(cmy): implement + _outstream = new ParquetOutputStream(file_writer); +} + +Status ParquetWriterWrapper::write(const RowBatch& row_batch) { + // TODO(cmy): implement + return Status::OK(); +} + +void ParquetWriterWrapper::close() { + // TODO(cmy): implement +} + +ParquetWriterWrapper::~ParquetWriterWrapper() { + close(); +} + +} // end namespace diff --git a/be/src/exec/parquet_writer.h b/be/src/exec/parquet_writer.h new file mode 100644 index 0000000000..5147d2105a --- /dev/null +++ b/be/src/exec/parquet_writer.h @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "gen_cpp/Types_types.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/PlanNodes_types.h" + +namespace doris { + +class ExprContext; +class FileWriter; +class RowBatch; + +class ParquetOutputStream : public arrow::io::OutputStream { +public: + ParquetOutputStream(FileWriter* file_writer); + virtual ~ParquetOutputStream(); + + arrow::Status Write(const void* data, int64_t nbytes) override; + // return the current write position of the stream + arrow::Status Tell(int64_t* position) const override; + arrow::Status Close() override; + + bool closed() const override { + return _is_closed; + } +private: + FileWriter* _file_writer; // not owned + int64_t _cur_pos; // current write position + bool _is_closed = false; +}; + +// a wrapper of parquet output stream +class ParquetWriterWrapper { +public: + ParquetWriterWrapper(FileWriter *file_writer, const std::vector& output_expr_ctxs); + virtual ~ParquetWriterWrapper(); + + Status write(const RowBatch& row_batch); + + void close(); + +private: + ParquetOutputStream* _outstream; + const std::vector& _output_expr_ctxs; +}; + +} + diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 40fa551741..642668ef59 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -44,8 +44,8 @@ set(RUNTIME_FILES raw_value.cpp raw_value_ir.cpp result_sink.cpp - result_writer.cpp result_buffer_mgr.cpp + result_writer.cpp row_batch.cpp runtime_state.cpp string_value.cpp @@ -105,6 +105,8 @@ set(RUNTIME_FILES result_queue_mgr.cpp memory_scratch_sink.cpp external_scan_context_mgr.cpp + file_result_writer.cpp + mysql_result_writer.cpp memory/system_allocator.cpp memory/chunk_allocator.cpp ) diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 71682287e0..cd3f80f7fb 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -84,6 +84,15 @@ public: void set_query_statistics(std::shared_ptr statistics) { _query_statistics = statistics; } + + void update_num_written_rows(int64_t num_rows) { + // _query_statistics may be null when the result sink init failed + // or some other failure. + // and the number of written rows is only needed when all things go well. 
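+        // Note: the value recorded here is carried back to the FE in
+        // PQueryStatistics.returned_rows (see the QueryStatistics changes in this
+        // patch), so the number of rows exported by `SELECT INTO OUTFILE`
+        // can be reported back when the query finishes.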
+ if (_query_statistics.get() != nullptr) { + _query_statistics->set_returned_rows(num_rows); + } + } private: typedef std::list ResultQueue; diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp new file mode 100644 index 0000000000..2cc6102d81 --- /dev/null +++ b/be/src/runtime/file_result_writer.cpp @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/file_result_writer.h" + +#include "exec/broker_writer.h" +#include "exec/local_file_writer.h" +#include "exec/parquet_writer.h" +#include "exprs/expr.h" +#include "runtime/primitive_type.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" +#include "runtime/runtime_state.h" +#include "util/types.h" +#include "util/date_func.h" +#include "util/uid_util.h" + +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; + +FileResultWriter::FileResultWriter( + const ResultFileOptions* file_opts, + const std::vector& output_expr_ctxs, + RuntimeProfile* parent_profile) : + _file_opts(file_opts), + _output_expr_ctxs(output_expr_ctxs), + _parent_profile(parent_profile) { +} + +FileResultWriter::~FileResultWriter() { + _close_file_writer(true); +} + +Status FileResultWriter::init(RuntimeState* state) { + _state = state; + _init_profile(); + + RETURN_IF_ERROR(_create_file_writer()); + return Status::OK(); +} + +void FileResultWriter::_init_profile() { + RuntimeProfile* profile = _parent_profile->create_child("FileResultWriter", true, true); + _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); + _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); + _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime"); + _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime"); + _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT); + _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES); +} + +Status FileResultWriter::_create_file_writer() { + std::string file_name = _get_next_file_name(); + if (_file_opts->is_local_file) { + _file_writer = new LocalFileWriter(file_name, 0 /* start offset */); + } else { + _file_writer = new BrokerWriter(_state->exec_env(), + _file_opts->broker_addresses, + _file_opts->broker_properties, + file_name, + 0 /*start offset*/); + } + RETURN_IF_ERROR(_file_writer->open()); + + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + // just use file writer is enough + break; + case TFileFormatType::FORMAT_PARQUET: + _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs); + break; + default: + return Status::InternalError(strings::Substitute("unsupport 
file format: $0", _file_opts->file_format)); + } + LOG(INFO) << "create file for exporting query result. file name: " << file_name + << ". query id: " << print_id(_state->query_id()); + return Status::OK(); +} + +// file name format as: my_prefix_0.csv +std::string FileResultWriter::_get_next_file_name() { + std::stringstream ss; + ss << _file_opts->file_path << (_file_idx++) << "." << _file_format_to_name(); + return ss.str(); +} + +std::string FileResultWriter::_file_format_to_name() { + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + return "csv"; + case TFileFormatType::FORMAT_PARQUET: + return "parquet"; + default: + return "unknown"; + } +} + +Status FileResultWriter::append_row_batch(const RowBatch* batch) { + if (nullptr == batch || 0 == batch->num_rows()) { + return Status::OK(); + } + + SCOPED_TIMER(_append_row_batch_timer); + if (_parquet_writer != nullptr) { + RETURN_IF_ERROR(_parquet_writer->write(*batch)); + } else { + RETURN_IF_ERROR(_write_csv_file(*batch)); + } + + _written_rows += batch->num_rows(); + return Status::OK(); +} + +Status FileResultWriter::_write_csv_file(const RowBatch& batch) { + int num_rows = batch.num_rows(); + for (int i = 0; i < num_rows; ++i) { + TupleRow* row = batch.get_row(i); + RETURN_IF_ERROR(_write_one_row_as_csv(row)); + } + _flush_plain_text_outstream(true); + return Status::OK(); +} + +// actually, this logic is same as `ExportSink::gen_row_buffer` +// TODO(cmy): find a way to unify them. +Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) { + { + SCOPED_TIMER(_convert_tuple_timer); + int num_columns = _output_expr_ctxs.size(); + for (int i = 0; i < num_columns; ++i) { + void* item = _output_expr_ctxs[i]->get_value(row); + + if (item == nullptr) { + _plain_text_outstream << NULL_IN_CSV; + continue; + } + + switch (_output_expr_ctxs[i]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + _plain_text_outstream << (int)*static_cast(item); + break; + case TYPE_SMALLINT: + _plain_text_outstream << *static_cast(item); + break; + case TYPE_INT: + _plain_text_outstream << *static_cast(item); + break; + case TYPE_BIGINT: + _plain_text_outstream << *static_cast(item); + break; + case TYPE_LARGEINT: + _plain_text_outstream << reinterpret_cast(item)->value; + break; + case TYPE_FLOAT: { + char buffer[MAX_FLOAT_STR_LENGTH + 2]; + float float_value = *static_cast(item); + buffer[0] = '\0'; + int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DOUBLE: { + // To prevent loss of precision on float and double types, + // they are converted to strings before output. 
+ // For example: For a double value 27361919854.929001, + // the direct output of using std::stringstream is 2.73619e+10, + // and after conversion to a string, it outputs 27361919854.929001 + char buffer[MAX_DOUBLE_STR_LENGTH + 2]; + double double_value = *static_cast(item); + buffer[0] = '\0'; + int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DATE: + case TYPE_DATETIME: { + char buf[64]; + const DateTimeValue* time_val = (const DateTimeValue*)(item); + time_val->to_string(buf); + _plain_text_outstream << buf; + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: { + const StringValue* string_val = (const StringValue*)(item); + if (string_val->ptr == NULL) { + if (string_val->len != 0) { + _plain_text_outstream << NULL_IN_CSV; + } + } else { + _plain_text_outstream << std::string(string_val->ptr, string_val->len); + } + break; + } + case TYPE_DECIMAL: { + const DecimalValue* decimal_val = reinterpret_cast(item); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val->to_string(output_scale); + } else { + decimal_str = decimal_val->to_string(); + } + _plain_text_outstream << decimal_str; + break; + } + case TYPE_DECIMALV2: { + const DecimalV2Value decimal_val(reinterpret_cast(item)->value); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val.to_string(output_scale); + } else { + decimal_str = decimal_val.to_string(); + } + _plain_text_outstream << decimal_str; + break; + } + default: { + // not supported type, like BITMAP, HLL, just export null + _plain_text_outstream << NULL_IN_CSV; + } + } + if (i < num_columns - 1) { + _plain_text_outstream << _file_opts->column_separator; + } + } // end for columns + _plain_text_outstream << _file_opts->line_delimiter; + } + + // write one line to file + return _flush_plain_text_outstream(false); +} + +Status FileResultWriter::_flush_plain_text_outstream(bool eos) { + SCOPED_TIMER(_file_write_timer); + size_t pos = _plain_text_outstream.tellp(); + if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) { + return Status::OK(); + } + + const std::string& buf = _plain_text_outstream.str(); + size_t written_len = 0; + RETURN_IF_ERROR(_file_writer->write(reinterpret_cast(buf.c_str()), + buf.size(), &written_len)); + COUNTER_UPDATE(_written_data_bytes, written_len); + _current_written_bytes += written_len; + + // clear the stream + _plain_text_outstream.str(""); + _plain_text_outstream.clear(); + + // split file if exceed limit + RETURN_IF_ERROR(_create_new_file_if_exceed_size()); + + return Status::OK(); +} + +Status FileResultWriter::_create_new_file_if_exceed_size() { + if (_current_written_bytes < _file_opts->max_file_size_bytes) { + return Status::OK(); + } + // current file size exceed the max file size. 
close this file + // and create new one + { + SCOPED_TIMER(_writer_close_timer); + RETURN_IF_ERROR(_close_file_writer(false)); + } + _current_written_bytes = 0; + return Status::OK(); +} + +Status FileResultWriter::_close_file_writer(bool done) { + if (_parquet_writer != nullptr) { + _parquet_writer->close(); + delete _parquet_writer; + _parquet_writer = nullptr; + if (!done) { + //TODO(cmy): implement parquet writer later + } + } else if (_file_writer != nullptr) { + _file_writer->close(); + delete _file_writer; + _file_writer = nullptr; + } + + if (!done) { + // not finished, create new file writer for next file + RETURN_IF_ERROR(_create_file_writer()); + } + return Status::OK(); +} + +Status FileResultWriter::close() { + // the following 2 profile "_written_rows_counter" and "_writer_close_timer" + // must be outside the `_close_file_writer()`. + // because `_close_file_writer()` may be called in deconstructor, + // at that time, the RuntimeState may already been deconstructed, + // so does the profile in RuntimeState. + COUNTER_SET(_written_rows_counter, _written_rows); + SCOPED_TIMER(_writer_close_timer); + RETURN_IF_ERROR(_close_file_writer(true)); + return Status::OK(); +} + +} + diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h new file mode 100644 index 0000000000..bc0040327e --- /dev/null +++ b/be/src/runtime/file_result_writer.h @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/result_writer.h" +#include "runtime/runtime_state.h" +#include "gen_cpp/DataSinks_types.h" + +namespace doris { + +class ExprContext; +class FileWriter; +class ParquetWriterWrapper; +class RowBatch; +class RuntimeProfile; +class TupleRow; + +struct ResultFileOptions { + bool is_local_file; + std::string file_path; + TFileFormatType::type file_format; + std::string column_separator; + std::string line_delimiter; + size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB + std::vector broker_addresses; + std::map broker_properties; + + ResultFileOptions(const TResultFileSinkOptions& t_opt) { + file_path = t_opt.file_path; + file_format = t_opt.file_format; + column_separator = t_opt.__isset.column_separator ? t_opt.column_separator : "\t"; + line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : "\n"; + max_file_size_bytes = t_opt.__isset.max_file_size_bytes ? 
+ t_opt.max_file_size_bytes : max_file_size_bytes; + + is_local_file = true; + if (t_opt.__isset.broker_addresses) { + broker_addresses = t_opt.broker_addresses; + is_local_file = false; + } + if (t_opt.__isset.broker_properties) { + broker_properties = t_opt.broker_properties; + } + } +}; + +// write result to file +class FileResultWriter final : public ResultWriter { +public: + FileResultWriter(const ResultFileOptions* file_option, + const std::vector& output_expr_ctxs, + RuntimeProfile* parent_profile); + virtual ~FileResultWriter(); + + virtual Status init(RuntimeState* state) override; + virtual Status append_row_batch(const RowBatch* batch) override; + virtual Status close() override; + +private: + Status _write_csv_file(const RowBatch& batch); + Status _write_one_row_as_csv(TupleRow* row); + + // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer + // if eos, write the data even if buffer is not full. + Status _flush_plain_text_outstream(bool eos); + void _init_profile(); + + Status _create_file_writer(); + // get next export file name + std::string _get_next_file_name(); + std::string _file_format_to_name(); + // close file writer, and if !done, it will create new writer for next file + Status _close_file_writer(bool done); + // create a new file if current file size exceed limit + Status _create_new_file_if_exceed_size(); + +private: + RuntimeState* _state; // not owned, set when init + const ResultFileOptions* _file_opts; + const std::vector& _output_expr_ctxs; + + // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. + // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. + FileWriter* _file_writer = nullptr; + // parquet file writer + ParquetWriterWrapper* _parquet_writer = nullptr; + // Used to buffer the export data of plain text + // TODO(cmy): I simply use a stringstrteam to buffer the data, to avoid calling + // file writer's write() for every single row. + // But this cannot solve the problem of a row of data that is too large. + // For exampel: bitmap_to_string() may return large volumn of data. + // And the speed is relative low, in my test, is about 6.5MB/s. + std::stringstream _plain_text_outstream; + static const size_t OUTSTREAM_BUFFER_SIZE_BYTES; + + // current written bytes, used for split data + int64_t _current_written_bytes = 0; + // the suffix idx of export file name, start at 0 + int _file_idx = 0; + + RuntimeProfile* _parent_profile; // profile from result sink, not owned + // total time cost on append batch opertion + RuntimeProfile::Counter* _append_row_batch_timer = nullptr; + // tuple convert timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _convert_tuple_timer = nullptr; + // file write timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _file_write_timer = nullptr; + // time of closing the file writer + RuntimeProfile::Counter* _writer_close_timer = nullptr; + // number of written rows + RuntimeProfile::Counter* _written_rows_counter = nullptr; + // bytes of written data + RuntimeProfile::Counter* _written_data_bytes = nullptr; +}; + +} // end of namespace + diff --git a/be/src/runtime/mysql_result_writer.cpp b/be/src/runtime/mysql_result_writer.cpp new file mode 100644 index 0000000000..a0d2426478 --- /dev/null +++ b/be/src/runtime/mysql_result_writer.cpp @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/mysql_result_writer.h" + +#include "exprs/expr.h" +#include "runtime/primitive_type.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" +#include "runtime/result_buffer_mgr.h" +#include "runtime/buffer_control_block.h" +#include "util/mysql_row_buffer.h" +#include "util/types.h" +#include "util/date_func.h" + +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +MysqlResultWriter::MysqlResultWriter( + BufferControlBlock* sinker, + const std::vector& output_expr_ctxs, + RuntimeProfile* parent_profile) : + _sinker(sinker), + _output_expr_ctxs(output_expr_ctxs), + _row_buffer(NULL), + _parent_profile(parent_profile) { +} + +MysqlResultWriter::~MysqlResultWriter() { + delete _row_buffer; +} + +Status MysqlResultWriter::init(RuntimeState* state) { + _init_profile(); + if (NULL == _sinker) { + return Status::InternalError("sinker is NULL pointer."); + } + + _row_buffer = new(std::nothrow) MysqlRowBuffer(); + + if (NULL == _row_buffer) { + return Status::InternalError("no memory to alloc."); + } + + return Status::OK(); +} + +void MysqlResultWriter::_init_profile() { + RuntimeProfile* profile = _parent_profile->create_child("MySQLResultWriter", true, true); + _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); + _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); + _result_send_timer = ADD_CHILD_TIMER(profile, "ResultRendTime", "AppendBatchTime"); + _sent_rows_counter = ADD_COUNTER(profile, "NumSentRows", TUnit::UNIT); +} + +Status MysqlResultWriter::_add_one_row(TupleRow* row) { + SCOPED_TIMER(_convert_tuple_timer); + _row_buffer->reset(); + int num_columns = _output_expr_ctxs.size(); + int buf_ret = 0; + + for (int i = 0; 0 == buf_ret && i < num_columns; ++i) { + void* item = _output_expr_ctxs[i]->get_value(row); + + if (NULL == item) { + buf_ret = _row_buffer->push_null(); + continue; + } + + switch (_output_expr_ctxs[i]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + buf_ret = _row_buffer->push_tinyint(*static_cast(item)); + break; + + case TYPE_SMALLINT: + buf_ret = _row_buffer->push_smallint(*static_cast(item)); + break; + + case TYPE_INT: + buf_ret = _row_buffer->push_int(*static_cast(item)); + break; + + case TYPE_BIGINT: + buf_ret = _row_buffer->push_bigint(*static_cast(item)); + break; + + case TYPE_LARGEINT: { + char buf[48]; + int len = 48; + char* v = LargeIntValue::to_string( + reinterpret_cast(item)->value, buf, &len); + buf_ret = _row_buffer->push_string(v, len); + break; + } + + case TYPE_FLOAT: + buf_ret = _row_buffer->push_float(*static_cast(item)); + break; + + case TYPE_DOUBLE: + buf_ret = _row_buffer->push_double(*static_cast(item)); + break; + + case TYPE_TIME: { + double time = *static_cast(item); + std::string time_str = time_str_from_double(time); + buf_ret = 
_row_buffer->push_string(time_str.c_str(), time_str.size()); + break; + } + + case TYPE_DATE: + case TYPE_DATETIME: { + char buf[64]; + const DateTimeValue* time_val = (const DateTimeValue*)(item); + // TODO(zhaochun), this function has core risk + char* pos = time_val->to_string(buf); + buf_ret = _row_buffer->push_string(buf, pos - buf - 1); + break; + } + + case TYPE_HLL: + case TYPE_OBJECT: { + buf_ret = _row_buffer->push_null(); + break; + } + + case TYPE_VARCHAR: + case TYPE_CHAR: { + const StringValue* string_val = (const StringValue*)(item); + + if (string_val->ptr == NULL) { + if (string_val->len == 0) { + // 0x01 is a magic num, not usefull actually, just for present "" + char* tmp_val = reinterpret_cast(0x01); + buf_ret = _row_buffer->push_string(tmp_val, string_val->len); + } else { + buf_ret = _row_buffer->push_null(); + } + } else { + buf_ret = _row_buffer->push_string(string_val->ptr, string_val->len); + } + + break; + } + + case TYPE_DECIMAL: { + const DecimalValue* decimal_val = reinterpret_cast(item); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val->to_string(output_scale); + } else { + decimal_str = decimal_val->to_string(); + } + + buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length()); + break; + } + + case TYPE_DECIMALV2: { + DecimalV2Value decimal_val(reinterpret_cast(item)->value); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val.to_string(output_scale); + } else { + decimal_str = decimal_val.to_string(); + } + + buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length()); + break; + } + + default: + LOG(WARNING) << "can't convert this type to mysql type. 
type = " << + _output_expr_ctxs[i]->root()->type(); + buf_ret = -1; + break; + } + } + + if (0 != buf_ret) { + return Status::InternalError("pack mysql buffer failed."); + } + + return Status::OK(); +} + +Status MysqlResultWriter::append_row_batch(const RowBatch* batch) { + SCOPED_TIMER(_append_row_batch_timer); + if (NULL == batch || 0 == batch->num_rows()) { + return Status::OK(); + } + + Status status; + // convert one batch + TFetchDataResult* result = new(std::nothrow) TFetchDataResult(); + int num_rows = batch->num_rows(); + result->result_batch.rows.resize(num_rows); + + for (int i = 0; status.ok() && i < num_rows; ++i) { + TupleRow* row = batch->get_row(i); + status = _add_one_row(row); + + if (status.ok()) { + result->result_batch.rows[i].assign(_row_buffer->buf(), _row_buffer->length()); + } else { + LOG(WARNING) << "convert row to mysql result failed."; + break; + } + } + + if (status.ok()) { + SCOPED_TIMER(_sent_rows_counter); + // push this batch to back + status = _sinker->add_batch(result); + + if (status.ok()) { + result = NULL; + _written_rows += num_rows; + } else { + LOG(WARNING) << "append result batch to sink failed."; + } + } + + delete result; + result = NULL; + + return status; +} + +Status MysqlResultWriter::close() { + COUNTER_SET(_sent_rows_counter, _written_rows); + return Status::OK(); +} + +} + diff --git a/be/src/runtime/mysql_result_writer.h b/be/src/runtime/mysql_result_writer.h new file mode 100644 index 0000000000..57134afc89 --- /dev/null +++ b/be/src/runtime/mysql_result_writer.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/result_writer.h" +#include "runtime/runtime_state.h" + +namespace doris { + +class TupleRow; +class RowBatch; +class ExprContext; +class MysqlRowBuffer; +class BufferControlBlock; +class RuntimeProfile; + +// convert the row batch to mysql protol row +class MysqlResultWriter final : public ResultWriter { +public: + MysqlResultWriter(BufferControlBlock* sinker, + const std::vector& output_expr_ctxs, + RuntimeProfile* parent_profile); + virtual ~MysqlResultWriter(); + + virtual Status init(RuntimeState* state) override; + // convert one row batch to mysql result and + // append this batch to the result sink + virtual Status append_row_batch(const RowBatch* batch) override; + + virtual Status close() override; + +private: + void _init_profile(); + // convert one tuple row + Status _add_one_row(TupleRow* row); + +private: + BufferControlBlock* _sinker; + const std::vector& _output_expr_ctxs; + MysqlRowBuffer* _row_buffer; + + RuntimeProfile* _parent_profile; // parent profile from result sink. 
not owned + // total time cost on append batch opertion + RuntimeProfile::Counter* _append_row_batch_timer = nullptr; + // tuple convert timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _convert_tuple_timer = nullptr; + // file write timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _result_send_timer = nullptr; + // number of sent rows + RuntimeProfile::Counter* _sent_rows_counter = nullptr; +}; + +} // end of namespace + diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index d0a247d1a5..a110653618 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -33,7 +33,7 @@ class QueryStatisticsRecvr; class QueryStatistics { public: - QueryStatistics() : scan_rows(0), scan_bytes(0) { + QueryStatistics() : scan_rows(0), scan_bytes(0), returned_rows(0) { } void merge(const QueryStatistics& other) { @@ -49,17 +49,23 @@ public: this->scan_bytes += scan_bytes; } + void set_returned_rows(int64_t num_rows) { + this->returned_rows = num_rows; + } + void merge(QueryStatisticsRecvr* recvr); void clear() { scan_rows = 0; scan_bytes = 0; + returned_rows = 0; } void to_pb(PQueryStatistics* statistics) { DCHECK(statistics != nullptr); statistics->set_scan_rows(scan_rows); statistics->set_scan_bytes(scan_bytes); + statistics->set_returned_rows(returned_rows); } void merge_pb(const PQueryStatistics& statistics) { @@ -71,6 +77,9 @@ private: int64_t scan_rows; int64_t scan_bytes; + // number rows returned by query. + // only set once by result sink when closing. + int64_t returned_rows; }; // It is used for collecting sub plan query statistics in DataStreamRecvr. diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index 85b724d6ad..be51a5b991 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -25,7 +25,8 @@ #include "runtime/exec_env.h" #include "runtime/result_buffer_mgr.h" #include "runtime/buffer_control_block.h" -#include "runtime/result_writer.h" +#include "runtime/file_result_writer.h" +#include "runtime/mysql_result_writer.h" #include "runtime/mem_tracker.h" namespace doris { @@ -35,6 +36,17 @@ ResultSink::ResultSink(const RowDescriptor& row_desc, const std::vector& : _row_desc(row_desc), _t_output_expr(t_output_expr), _buf_size(buffer_size) { + + if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) { + _sink_type = TResultSinkType::MYSQL_PROTOCAL; + } else { + _sink_type = sink.type; + } + + if (_sink_type == TResultSinkType::FILE) { + CHECK(sink.__isset.file_options); + _file_opts.reset(new ResultFileOptions(sink.file_options)); + } } ResultSink::~ResultSink() { @@ -58,13 +70,25 @@ Status ResultSink::prepare(RuntimeState* state) { _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str())); // prepare output_expr RETURN_IF_ERROR(prepare_exprs(state)); + // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), _buf_size, &_sender)); - // create writer - _writer.reset(new(std::nothrow) ResultWriter(_sender.get(), _output_expr_ctxs)); - RETURN_IF_ERROR(_writer->init(state)); + state->fragment_instance_id(), _buf_size, &_sender)); + + // create writer based on sink type + switch (_sink_type) { + case TResultSinkType::MYSQL_PROTOCAL: + _writer.reset(new(std::nothrow) MysqlResultWriter(_sender.get(), _output_expr_ctxs, _profile)); + break; + case TResultSinkType::FILE: + CHECK(_file_opts.get() != nullptr); + _writer.reset(new(std::nothrow) 
FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile)); + break; + default: + return Status::InternalError("Unknown result sink type"); + } + RETURN_IF_ERROR(_writer->init(state)); return Status::OK(); } @@ -80,9 +104,19 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } + + Status final_status = exec_status; + // close the writer + Status st = _writer->close(); + if (!st.ok() && exec_status.ok()) { + // close file writer failed, should return this error to client + final_status = st; + } + // close sender, this is normal path end if (_sender) { - _sender->close(exec_status); + _sender->update_num_written_rows(_writer->get_written_rows()); + _sender->close(final_status); } state->exec_env()->result_mgr()->cancel_at_time(time(NULL) + config::result_buffer_cancelled_interval_time, state->fragment_instance_id()); diff --git a/be/src/runtime/result_sink.h b/be/src/runtime/result_sink.h index 53739740bd..bf732b1c5d 100644 --- a/be/src/runtime/result_sink.h +++ b/be/src/runtime/result_sink.h @@ -35,6 +35,7 @@ class BufferControlBlock; class ExprContext; class ResultWriter; class MemTracker; +class ResultFileOptions; class ResultSink : public DataSink { public: @@ -60,6 +61,9 @@ public: private: Status prepare_exprs(RuntimeState* state); + TResultSinkType::type _sink_type; + // set file options when sink type is FILE + std::unique_ptr _file_opts; ObjectPool* _obj_pool; // Owned by the RuntimeState. @@ -73,6 +77,7 @@ private: boost::shared_ptr _writer; RuntimeProfile* _profile; // Allocated from _pool int _buf_size; // Allocated from _pool + }; } diff --git a/be/src/runtime/result_writer.cpp b/be/src/runtime/result_writer.cpp index 6d16348a34..b5537e486c 100644 --- a/be/src/runtime/result_writer.cpp +++ b/be/src/runtime/result_writer.cpp @@ -15,222 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-#include "result_writer.h" - -#include "exprs/expr.h" -#include "runtime/primitive_type.h" -#include "runtime/row_batch.h" -#include "runtime/tuple_row.h" -#include "runtime/result_buffer_mgr.h" -#include "runtime/buffer_control_block.h" -#include "util/mysql_row_buffer.h" -#include "util/types.h" -#include "util/date_func.h" - -#include "gen_cpp/PaloInternalService_types.h" +#include "runtime/result_writer.h" namespace doris { -ResultWriter::ResultWriter( - BufferControlBlock* sinker, - const std::vector& output_expr_ctxs) : - _sinker(sinker), - _output_expr_ctxs(output_expr_ctxs), - _row_buffer(NULL) { -} - -ResultWriter::~ResultWriter() { - delete _row_buffer; -} - -Status ResultWriter::init(RuntimeState* state) { - if (NULL == _sinker) { - return Status::InternalError("sinker is NULL pointer."); - } - - _row_buffer = new(std::nothrow) MysqlRowBuffer(); - - if (NULL == _row_buffer) { - return Status::InternalError("no memory to alloc."); - } - - return Status::OK(); -} - -Status ResultWriter::add_one_row(TupleRow* row) { - _row_buffer->reset(); - int num_columns = _output_expr_ctxs.size(); - int buf_ret = 0; - - for (int i = 0; 0 == buf_ret && i < num_columns; ++i) { - void* item = _output_expr_ctxs[i]->get_value(row); - - if (NULL == item) { - buf_ret = _row_buffer->push_null(); - continue; - } - - switch (_output_expr_ctxs[i]->root()->type().type) { - case TYPE_BOOLEAN: - case TYPE_TINYINT: - buf_ret = _row_buffer->push_tinyint(*static_cast(item)); - break; - - case TYPE_SMALLINT: - buf_ret = _row_buffer->push_smallint(*static_cast(item)); - break; - - case TYPE_INT: - buf_ret = _row_buffer->push_int(*static_cast(item)); - break; - - case TYPE_BIGINT: - buf_ret = _row_buffer->push_bigint(*static_cast(item)); - break; - - case TYPE_LARGEINT: { - char buf[48]; - int len = 48; - char* v = LargeIntValue::to_string( - reinterpret_cast(item)->value, buf, &len); - buf_ret = _row_buffer->push_string(v, len); - break; - } - - case TYPE_FLOAT: - buf_ret = _row_buffer->push_float(*static_cast(item)); - break; - - case TYPE_DOUBLE: - buf_ret = _row_buffer->push_double(*static_cast(item)); - break; - - case TYPE_TIME: { - double time = *static_cast(item); - std::string time_str = time_str_from_double(time); - buf_ret = _row_buffer->push_string(time_str.c_str(), time_str.size()); - break; - } - - case TYPE_DATE: - case TYPE_DATETIME: { - char buf[64]; - const DateTimeValue* time_val = (const DateTimeValue*)(item); - // TODO(zhaochun), this function has core risk - char* pos = time_val->to_string(buf); - buf_ret = _row_buffer->push_string(buf, pos - buf - 1); - break; - } - - case TYPE_HLL: - case TYPE_OBJECT: { - buf_ret = _row_buffer->push_null(); - break; - } - - case TYPE_VARCHAR: - case TYPE_CHAR: { - const StringValue* string_val = (const StringValue*)(item); - - if (string_val->ptr == NULL) { - if (string_val->len == 0) { - // 0x01 is a magic num, not usefull actually, just for present "" - char* tmp_val = reinterpret_cast(0x01); - buf_ret = _row_buffer->push_string(tmp_val, string_val->len); - } else { - buf_ret = _row_buffer->push_null(); - } - } else { - buf_ret = _row_buffer->push_string(string_val->ptr, string_val->len); - } - - break; - } - - case TYPE_DECIMAL: { - const DecimalValue* decimal_val = reinterpret_cast(item); - std::string decimal_str; - int output_scale = _output_expr_ctxs[i]->root()->output_scale(); - - if (output_scale > 0 && output_scale <= 30) { - decimal_str = decimal_val->to_string(output_scale); - } else { - decimal_str = decimal_val->to_string(); - } - - buf_ret = 
_row_buffer->push_string(decimal_str.c_str(), decimal_str.length()); - break; - } - - case TYPE_DECIMALV2: { - DecimalV2Value decimal_val(reinterpret_cast(item)->value); - std::string decimal_str; - int output_scale = _output_expr_ctxs[i]->root()->output_scale(); - - if (output_scale > 0 && output_scale <= 30) { - decimal_str = decimal_val.to_string(output_scale); - } else { - decimal_str = decimal_val.to_string(); - } - - buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length()); - break; - } - - default: - LOG(WARNING) << "can't convert this type to mysql type. type = " << - _output_expr_ctxs[i]->root()->type(); - buf_ret = -1; - break; - } - } - - if (0 != buf_ret) { - return Status::InternalError("pack mysql buffer failed."); - } - - return Status::OK(); -} - -Status ResultWriter::append_row_batch(RowBatch* batch) { - if (NULL == batch || 0 == batch->num_rows()) { - return Status::OK(); - } - - Status status; - // convert one batch - TFetchDataResult* result = new(std::nothrow) TFetchDataResult(); - int num_rows = batch->num_rows(); - result->result_batch.rows.resize(num_rows); - - for (int i = 0; status.ok() && i < num_rows; ++i) { - TupleRow* row = batch->get_row(i); - status = add_one_row(row); - - if (status.ok()) { - result->result_batch.rows[i].assign(_row_buffer->buf(), _row_buffer->length()); - } else { - LOG(WARNING) << "convert row to mysql result failed."; - break; - } - } - - if (status.ok()) { - // push this batch to back - status = _sinker->add_batch(result); - - if (status.ok()) { - result = NULL; - } else { - LOG(WARNING) << "append result batch to sink failed."; - } - } - - delete result; - result = NULL; - - return status; -} +const std::string ResultWriter::NULL_IN_CSV = "\\N"; } diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h index 6a70157fb1..a6b461f588 100644 --- a/be/src/runtime/result_writer.h +++ b/be/src/runtime/result_writer.h @@ -15,43 +15,36 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_RUNTIME_RESULT_WRITER_H -#define DORIS_BE_RUNTIME_RESULT_WRITER_H - -#include +#pragma once #include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" namespace doris { -class TupleRow; +class Status; class RowBatch; -class ExprContext; -class MysqlRowBuffer; -class BufferControlBlock; class RuntimeState; -//convert the row batch to mysql protol row +// abstract class of the result writer class ResultWriter { public: - ResultWriter(BufferControlBlock* sinker, const std::vector& output_expr_ctxs); - ~ResultWriter(); + ResultWriter() {}; + ~ResultWriter() {}; - Status init(RuntimeState* state); - // convert one row batch to mysql result and - // append this batch to the result sink - Status append_row_batch(RowBatch* batch); + virtual Status init(RuntimeState* state) = 0; + // convert and write one row batch + virtual Status append_row_batch(const RowBatch* batch) = 0; -private: - // convert one tuple row - Status add_one_row(TupleRow* row); + virtual Status close() = 0; - // The expressions that are run to create tuples to be written to hbase. 
-    BufferControlBlock* _sinker;
-    const std::vector<ExprContext*>& _output_expr_ctxs;
-    MysqlRowBuffer* _row_buffer;
+    int64_t get_written_rows() const { return _written_rows; }
+
+    static const std::string NULL_IN_CSV;
+
+protected:
+    int64_t _written_rows = 0; // number of rows written
 };

 }
-#endif
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index dd2471adf1..e2574b3129 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -60,5 +60,6 @@ ADD_BE_TEST(heartbeat_flags_test)
 ADD_BE_TEST(result_queue_mgr_test)
 ADD_BE_TEST(memory_scratch_sink_test)
 ADD_BE_TEST(external_scan_context_mgr_test)
+
 ADD_BE_TEST(memory/chunk_allocator_test)
 ADD_BE_TEST(memory/system_allocator_test)
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 5efadc566c..db21dc81a0 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -106,6 +106,7 @@ module.exports = [
         "colocation-join",
         "dynamic-partition",
         "export_manual",
+        "outfile",
         "privilege",
         "small-file-mgr",
         "sql-mode",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index e2d3bd570e..f6fa16d463 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -114,6 +114,7 @@ module.exports = [
         "colocation-join",
         "dynamic-partition",
         "export-manual",
+        "outfile",
         "privilege",
         "segment-v2-usage",
         "small-file-mgr",
diff --git a/docs/en/administrator-guide/outfile.md b/docs/en/administrator-guide/outfile.md
new file mode 100644
index 0000000000..77a535d8e7
--- /dev/null
+++ b/docs/en/administrator-guide/outfile.md
@@ -0,0 +1,186 @@
+---
+{
+    "title": "Export Query Result",
+    "language": "en"
+}
+---
+
+
+
+# Export Query Result
+
+This document describes how to use the `SELECT INTO OUTFILE` command to export query results.
+
+## Syntax
+
+The `SELECT INTO OUTFILE` statement can export the query results to a file. Currently, it only supports exporting to remote storage, such as HDFS, S3, or BOS, through the Broker process. The syntax is as follows:
+
+```
+query_stmt
+INTO OUTFILE "file_path"
+[format_as]
+[properties]
+```
+
+* `file_path`
+
+    `file_path` specifies the file path and file name prefix, e.g. `hdfs://path/to/my_file_`.
+
+    The final file name is assembled from `my_file_`, a file sequence number, and the format suffix. The sequence number starts from 0, and the number of files equals the number of times the result was split. For example:
+
+    ```
+    my_file_0.csv
+    my_file_1.csv
+    my_file_2.csv
+    ```
+
+* `[format_as]`
+
+    ```
+    FORMAT AS CSV
+    ```
+
+    Specifies the export format. The default is CSV.
+
+* `[properties]`
+
+    Specifies the relevant properties. Currently, only exporting through the Broker process is supported. Broker-related properties must be prefixed with `broker.`. For details, please refer to [Broker](./broker.html).
+
+    ```
+    PROPERTIES
+    ("broker.prop_key" = "broker.prop_val", ...)
+    ```
+
+    Other properties:
+
+    ```
+    PROPERTIES
+    ("key1" = "val1", "key2" = "val2", ...)
+    ```
+
+    The following properties are currently supported:
+
+    * `column_separator`: Column separator, only applicable to the CSV format. The default is `\t`.
+    * `line_delimiter`: Line delimiter, only applicable to the CSV format. The default is `\n`.
+    * `max_file_size`: The maximum size of a single file. The default is 1GB. The value ranges from 5MB to 2GB. Files exceeding this size will be split.
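+
+As a quick reference before the full examples, here is a minimal sketch of an export that relies on all of the defaults (CSV format, `\t` column separator, `\n` line delimiter, 1GB `max_file_size`). The table name, path, and broker name are illustrative, and any authentication properties your broker requires would be added alongside `broker.name`:
+
+```
+SELECT k1, k2 FROM tbl
+INTO OUTFILE "hdfs://path/to/result_"
+PROPERTIES
+(
+    "broker.name" = "my_broker"
+);
+```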
+
+1. Example 1
+
+    Export simple query results to `hdfs:/path/to/result_`. Specify the export format as CSV. Use `my_broker` and set Kerberos authentication information. Specify the column separator as `,` and the line delimiter as `\n`.
+
+    ```
+    SELECT * FROM tbl
+    INTO OUTFILE "hdfs:/path/to/result_"
+    FORMAT AS CSV
+    PROPERTIES
+    (
+        "broker.name" = "my_broker",
+        "broker.hadoop.security.authentication" = "kerberos",
+        "broker.kerberos_principal" = "doris@YOUR.COM",
+        "broker.kerberos_keytab" = "/home/doris/my.keytab",
+        "column_separator" = ",",
+        "line_delimiter" = "\n",
+        "max_file_size" = "100MB"
+    );
+    ```
+
+    If the result is smaller than 100MB, the file will be: `result_0.csv`.
+
+    If it is larger than 100MB, the files may be: `result_0.csv, result_1.csv, ...`.
+
+2. Example 2
+
+    Export the query result of the CTE statement to `hdfs:/path/to/result_`. The default export format is CSV. Use `my_broker` and set the HDFS high availability information. Use the default column separator and line delimiter.
+
+    ```
+    WITH
+    x1 AS
+    (SELECT k1, k2 FROM tbl1),
+    x2 AS
+    (SELECT k3 FROM tbl2)
+    SELECT k1 FROM x1 UNION SELECT k3 FROM x2
+    INTO OUTFILE "hdfs:/path/to/result_"
+    PROPERTIES
+    (
+        "broker.name" = "my_broker",
+        "broker.username"="user",
+        "broker.password"="passwd",
+        "broker.dfs.nameservices" = "my_ha",
+        "broker.dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+        "broker.dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+        "broker.dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+        "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+    );
+    ```
+
+    If the result is smaller than 1GB, the file will be: `result_0.csv`.
+
+    If it is larger than 1GB, the files may be: `result_0.csv, result_1.csv, ...`.
+
+3. Example 3
+
+    Export the query results of the UNION statement to `bos://bucket/result_`. Specify the export format as PARQUET. Use `my_broker` and set the BOS access information. The PARQUET format does not require a column separator or line delimiter.
+
+    ```
+    SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1
+    INTO OUTFILE "bos://bucket/result_"
+    FORMAT AS PARQUET
+    PROPERTIES
+    (
+        "broker.name" = "my_broker",
+        "broker.bos_endpoint" = "http://bj.bcebos.com",
+        "broker.bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
+        "broker.bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
+    );
+    ```
+
+    If the result is smaller than 1GB, the file will be: `result_0.parquet`.
+
+    If it is larger than 1GB, the files may be: `result_0.parquet, result_1.parquet, ...`.
+
+## Return result
+
+The command is a synchronous command. When the command returns, the operation has finished.
+
+If the export completes normally, the result is as follows:
+
+```
+mysql> SELECT * FROM tbl INTO OUTFILE ...
+Query OK, 100000 rows affected (5.86 sec)
+```
+
+`100000 rows affected` indicates the number of rows in the exported result set.
+
+If the execution fails, an error message is returned, such as:
+
+```
+mysql> SELECT * FROM tbl INTO OUTFILE ...
+ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
+```
+
+## Notice
+
+* The CSV format does not support exporting binary types, such as the BITMAP and HLL types. These types will be output as `\N`, which is null.
+* The query results are exported by a single BE node in a single thread. Therefore, the export time is positively correlated with the size of the exported result set.
+* The export command does not check whether the file or the file path exists. Whether the path will be created automatically, or whether an existing file will be overwritten, is entirely determined by the semantics of the remote storage system.
+* If an error occurs during the export, the exported files may remain on the remote storage system. Doris will not clean up these files; the user needs to remove them manually.
+* The timeout of the export command is the same as the timeout of the query. It can be set with `SET query_timeout = xxx` (see the example below).
+* For a query with an empty result set, an empty file will still be generated.
+* File splitting ensures that each row of data is stored completely in a single file. Therefore, the size of a file is not strictly equal to `max_file_size`.
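+
+For example, assuming a large export needs more time than the default query timeout allows, the timeout can be raised for the current session before running the export. The one-hour value, table name, path, and broker name below are illustrative:
+
+```
+SET query_timeout = 3600;
+
+SELECT * FROM tbl
+INTO OUTFILE "hdfs:/path/to/result_"
+PROPERTIES
+(
+    "broker.name" = "my_broker"
+);
+```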
diff --git a/docs/zh-CN/administrator-guide/outfile.md b/docs/zh-CN/administrator-guide/outfile.md new file mode 100644 index 0000000000..7a11aca301 --- /dev/null +++ b/docs/zh-CN/administrator-guide/outfile.md @@ -0,0 +1,182 @@ +--- +{ + "title": "导出查询结果集", + "language": "zh-CN" +} +--- + + + +# 导出查询结果集 + +本文档介绍如何使用 `SELECT INTO OUTFILE` 命令进行查询结果的导出操作。 + +## 语法 + +`SELECT INTO OUTFILE` 语句可以将查询结果导出到文件中。目前仅支持通过 Broker 进程导出到远端存储,如 HDFS,S3,BOS 上。语法如下: + +``` +query_stmt +INTO OUTFILE "file_path" +[format_as] +[properties] +``` + +* `file_path` + + `file_path` 指向文件存储的路径以及文件前缀。如 `hdfs://path/to/my_file_`。 + + 最终的文件名将由 `my_file_`,文件序号以及文件格式后缀组成。其中文件序号由0开始,数量为文件被分割的数量。如: + + ``` + my_file_0.csv + my_file_1.csv + my_file_2.csv + ``` + +* `[format_as]` + + ``` + FORMAT AS CSV + ``` + + 指定导出格式。默认为 CSV。 + + +* `[properties]` + + 指定相关属性。目前仅支持通过 Broker 进程进行导出。Broker 相关属性需加前缀 `broker.`。具体参阅[Broker 文档](./broker.html)。 + + ``` + ("broker.prop_key" = "broker.prop_val", ...) + ``` + + 其他属性: + + ``` + ("key1" = "val1", "key2" = "val2", ...) + ``` + + 目前支持以下属性: + + * `column_separator`:列分隔符,仅对 CSV 格式适用。默认为 `\t`。 + * `line_delimiter`:行分隔符,仅对 CSV 格式适用。默认为 `\n`。 + * `max_file_size`:单个文件的最大大小。默认为 1GB。取值范围在 5MB 到 2GB 之间。超过这个大小的文件将会被切分。 + +1. 示例1 + + 将简单查询结果导出到文件 `hdfs:/path/to/result.txt`。指定导出格式为 CSV。使用 `my_broker` 并设置 kerberos 认证信息。指定列分隔符为 `,`,行分隔符为 `\n`。 + + ``` + SELECT * FROM tbl + INTO OUTFILE "hdfs:/path/to/result_" + FORMAT AS CSV + PROPERTIES + ( + "broker.name" = "my_broker", + "broker.hadoop.security.authentication" = "kerberos", + "broker.kerberos_principal" = "doris@YOUR.COM", + "broker.kerberos_keytab" = "/home/doris/my.keytab", + "column_separator" = ",", + "line_delimiter" = "\n", + "max_file_size" = "100MB" + ); + ``` + + 最终生成文件如果不大于 100MB,则为:`result_0.csv`。 + + 如果大于 100MB,则可能为 `result_0.csv, result_1.csv, ...`。 + +2. 示例2 + + 将 CTE 语句的查询结果导出到文件 `hdfs:/path/to/result.txt`。默认导出格式为 CSV。使用 `my_broker` 并设置 hdfs 高可用信息。使用默认的行列分隔符。 + + ``` + WITH + x1 AS + (SELECT k1, k2 FROM tbl1), + x2 AS + (SELECT k3 FROM tbl2) + SELECT k1 FROM x1 UNION SELECT k3 FROM x2 + INTO OUTFILE "hdfs:/path/to/result_" + PROPERTIES + ( + "broker.name" = "my_broker", + "broker.username"="user", + "broker.password"="passwd", + "broker.dfs.nameservices" = "my_ha", + "broker.dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", + "broker.dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", + "broker.dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", + "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + ``` + + 最终生成文件如果不大于 1GB,则为:`result_0.csv`。 + + 如果大于 1GB,则可能为 `result_0.csv, result_1.csv, ...`。 + +3. 示例3 + + 将 UNION 语句的查询结果导出到文件 `bos://bucket/result.parquet`。指定导出格式为 PARQUET。使用 `my_broker` 并设置 BOS 访问信息。PARQUET 格式无需指定列分隔符和行分隔符。 + + ``` + SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1 + INTO OUTFILE "bos://bucket/result_" + FORMAT AS PARQUET + PROPERTIES + ( + "broker.name" = "my_broker", + "broker.bos_endpoint" = "http://bj.bcebos.com", + "broker.bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", + "broker.bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy" + ); + ``` + + 最终生成文件如果不大于 1GB,则为:`result_0.parquet`。 + + 如果大于 1GB,则可能为 `result_0.parquet, result_1.parquet, ...`。 + +## 返回结果 + +导出命令为同步命令。命令返回,即表示操作结束。 + +如果正常导出并返回,则结果如下: + +``` +mysql> SELECT * FROM tbl INTO OUTFILE ... Query OK, 100000 row affected (5.86 sec) +``` + +其中 `100000 row affected` 表示导出的结果集行数。 + +如果执行错误,则会返回错误信息,如: + +``` +mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ... +``` + +## 注意事项 + +* 查询结果是由单个 BE 节点,单线程导出的。因此导出时间和导出结果集大小正相关。 +* 导出命令不会检查文件及文件路径是否存在。是否会自动创建路径、或是否会覆盖已存在文件,完全由远端存储系统的语义决定。 +* 如果在导出过程中出现错误,可能会有导出文件残留在远端存储系统上。Doris 不会清理这些文件。需要用户手动清理。 +* 导出命令的超时时间同查询的超时时间。可以通过 `SET query_timeout=xxx` 进行设置。 +* 对于结果集为空的查询,依然会产生一个大小为0的文件。 +* 文件切分会保证一行数据完整的存储在单一文件中。因此文件的大小并不严格等于 `max_file_size`。 diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 5c66fe3f6c..17806d0312 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -246,7 +246,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_LOCAL, KW_LOCATION, KW_MAX, KW_MAX_VALUE, KW_MERGE, KW_MIN, KW_MINUTE, KW_MINUS, KW_MIGRATE, KW_MIGRATIONS, KW_MODIFY, KW_MONTH, KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS, - KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OVER, + KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OUTFILE, KW_OVER, KW_PARTITION, KW_PARTITIONS, KW_PASSWORD, KW_PATH, KW_PAUSE, KW_PIPE, KW_PRECEDING, KW_PLUGIN, KW_PLUGINS, KW_PRIMARY, @@ -469,6 +469,8 @@ nonterminal Boolean opt_builtin; nonterminal Boolean opt_tmp; +nonterminal OutFileClause opt_outfile; + precedence left KW_FULL, KW_MERGE; precedence left DOT; precedence left SET_VAR; @@ -2465,7 +2467,7 @@ delete_stmt ::= // even if the union has order by and limit. // ORDER BY and LIMIT bind to the preceding select statement by default.
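+// In contrast, an INTO OUTFILE clause (opt_outfile below) is parsed at the query_stmt level and applies to the whole query expression.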
query_stmt ::= - opt_with_clause:w set_operand_list:operands + opt_with_clause:w set_operand_list:operands opt_outfile:outfile {: QueryStmt queryStmt = null; if (operands.size() == 1) { @@ -2474,15 +2476,27 @@ query_stmt ::= queryStmt = new SetOperationStmt(operands, null, LimitElement.NO_LIMIT); } queryStmt.setWithClause(w); + queryStmt.setOutFileClause(outfile); RESULT = queryStmt; :} - | opt_with_clause:w set_operation_with_order_by_or_limit:set_operation + | opt_with_clause:w set_operation_with_order_by_or_limit:set_operation opt_outfile:outfile {: set_operation.setWithClause(w); + set_operation.setOutFileClause(outfile); RESULT = set_operation; :} ; +opt_outfile ::= + {: + RESULT = null; + :} + | KW_INTO KW_OUTFILE STRING_LITERAL:file opt_file_format:fileFormat opt_properties:properties + {: + RESULT = new OutFileClause(file, fileFormat, properties); + :} + ; + opt_with_clause ::= KW_WITH with_view_def_list:list {: RESULT = new WithClause(list); :} diff --git a/fe/src/main/java/org/apache/doris/analysis/BaseViewStmt.java b/fe/src/main/java/org/apache/doris/analysis/BaseViewStmt.java index 49049bf816..d78d8373f0 100644 --- a/fe/src/main/java/org/apache/doris/analysis/BaseViewStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/BaseViewStmt.java @@ -17,9 +17,6 @@ package org.apache.doris.analysis; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; @@ -27,6 +24,11 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -141,5 +143,9 @@ public class BaseViewStmt extends DdlStmt { @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); + + if (viewDefStmt.hasOutFileClause()) { + throw new AnalysisException("Not support OUTFILE clause in CREATE VIEW statement"); + } } } diff --git a/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java index a098ca69a9..8b3bac1cbc 100644 --- a/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java +++ b/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java @@ -19,6 +19,7 @@ package org.apache.doris.analysis; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.PrintableMap; import com.google.common.collect.Maps; @@ -78,4 +79,14 @@ public class BrokerDesc implements Writable { desc.readFields(in); return desc; } + + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append(" WITH BROKER ").append(name); + if (properties != null && !properties.isEmpty()) { + PrintableMap printableMap = new PrintableMap<>(properties, " = ", true, false); + sb.append(" (").append(printableMap.toString()).append(")"); + } + return sb.toString(); + } } diff --git a/fe/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/src/main/java/org/apache/doris/analysis/OutFileClause.java new file mode 100644 index 0000000000..1d2a9057af --- /dev/null +++ 
b/fe/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.ParseUtil; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TResultFileSinkOptions; + +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +// For syntax select * from tbl INTO OUTFILE xxxx +public class OutFileClause { + private static final Logger LOG = LogManager.getLogger(OutFileClause.class); + + private static final String BROKER_PROP_PREFIX = "broker."; + private static final String PROP_BROKER_NAME = "broker.name"; + private static final String PROP_COLUMN_SEPARATOR = "column_separator"; + private static final String PROP_LINE_DELIMITER = "line_delimiter"; + private static final String PROP_MAX_FILE_SIZE = "max_file_size"; + + private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB + private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB + private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB + + private String filePath; + private String format; + private Map properties; + + // set following members after analyzing + private String columnSeparator = "\t"; + private String lineDelimiter = "\n"; + private TFileFormatType fileFormatType; + private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES; + private BrokerDesc brokerDesc = null; + + public OutFileClause(String filePath, String format, Map properties) { + this.filePath = filePath; + this.format = Strings.isNullOrEmpty(format) ? "csv" : format.toLowerCase(); + this.properties = properties; + } + + public OutFileClause(OutFileClause other) { + this.filePath = other.filePath; + this.format = other.format; + this.properties = other.properties == null ? 
null : Maps.newHashMap(other.properties); + } + + public String getColumnSeparator() { + return columnSeparator; + } + + public String getLineDelimiter() { + return lineDelimiter; + } + + public TFileFormatType getFileFormatType() { + return fileFormatType; + } + + public long getMaxFileSizeBytes() { + return maxFileSizeBytes; + } + + public BrokerDesc getBrokerDesc() { + return brokerDesc; + } + + public void analyze(Analyzer analyzer) throws AnalysisException { + if (Strings.isNullOrEmpty(filePath)) { + throw new AnalysisException("Must specify file in OUTFILE clause"); + } + + if (!format.equals("csv")) { + throw new AnalysisException("Only support CSV format"); + } + fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; + + analyzeProperties(); + + if (brokerDesc == null) { + throw new AnalysisException("Must specify BROKER properties in OUTFILE clause"); + } + } + + private void analyzeProperties() throws AnalysisException { + if (properties == null || properties.isEmpty()) { + return; + } + + Set processedPropKeys = Sets.newHashSet(); + getBrokerProperties(processedPropKeys); + if (brokerDesc == null) { + return; + } + + if (properties.containsKey(PROP_COLUMN_SEPARATOR)) { + if (!isCsvFormat()) { + throw new AnalysisException(PROP_COLUMN_SEPARATOR + " is only for CSV format"); + } + columnSeparator = properties.get(PROP_COLUMN_SEPARATOR); + processedPropKeys.add(PROP_COLUMN_SEPARATOR); + } + + if (properties.containsKey(PROP_LINE_DELIMITER)) { + if (!isCsvFormat()) { + throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format"); + } + lineDelimiter = properties.get(PROP_LINE_DELIMITER); + processedPropKeys.add(PROP_LINE_DELIMITER); + } + + if (properties.containsKey(PROP_MAX_FILE_SIZE)) { + maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE)); + if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) { + throw new AnalysisException("max file size should between 5MB and 2GB. 
Given: " + maxFileSizeBytes); + } + processedPropKeys.add(PROP_MAX_FILE_SIZE); + } + + if (processedPropKeys.size() != properties.size()) { + LOG.debug("{} vs {}", processedPropKeys, properties); + throw new AnalysisException("Unknown properties: " + properties.keySet().stream() + .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList())); + } + } + + private void getBrokerProperties(Set processedPropKeys) { + if (!properties.containsKey(PROP_BROKER_NAME)) { + return; + } + String brokerName = properties.get(PROP_BROKER_NAME); + processedPropKeys.add(PROP_BROKER_NAME); + + Map brokerProps = Maps.newHashMap(); + Iterator> iter = properties.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) { + brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue()); + processedPropKeys.add(entry.getKey()); + } + } + + brokerDesc = new BrokerDesc(brokerName, brokerProps); + } + + private boolean isCsvFormat() { + return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2 + || fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE + || fileFormatType == TFileFormatType.FORMAT_CSV_GZ + || fileFormatType == TFileFormatType.FORMAT_CSV_LZ4FRAME + || fileFormatType == TFileFormatType.FORMAT_CSV_LZO + || fileFormatType == TFileFormatType.FORMAT_CSV_LZOP + || fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN; + } + + @Override + public OutFileClause clone() { + return new OutFileClause(this); + } + + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ").append(format); + if (properties != null && !properties.isEmpty()) { + sb.append(" PROPERTIES("); + sb.append(new PrintableMap<>(properties, " = ", true, false)); + sb.append(")"); + } + return sb.toString(); + } + + public TResultFileSinkOptions toSinkOptions() { + TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath, fileFormatType); + if (isCsvFormat()) { + sinkOptions.setColumn_separator(columnSeparator); + sinkOptions.setLine_delimiter(lineDelimiter); + } + sinkOptions.setMax_file_size_bytes(maxFileSizeBytes); + if (brokerDesc != null) { + sinkOptions.setBroker_properties(brokerDesc.getProperties()); + // broker_addresses of sinkOptions will be set in Coordinator. + // Because we need to choose the nearest broker with the result sink node. 
+ } + return sinkOptions; + } +} + + diff --git a/fe/src/main/java/org/apache/doris/analysis/QueryStmt.java b/fe/src/main/java/org/apache/doris/analysis/QueryStmt.java index e6f0bfb0a7..4da74e12d5 100644 --- a/fe/src/main/java/org/apache/doris/analysis/QueryStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/QueryStmt.java @@ -23,12 +23,12 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.doris.qe.ConnectContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -110,6 +110,9 @@ public abstract class QueryStmt extends StatementBase { ///////////////////////////////////////// // END: Members that need to be reset() + // represent the "INTO OUTFILE" clause + protected OutFileClause outFileClause; + QueryStmt(ArrayList orderByElements, LimitElement limitElement) { this.orderByElements = orderByElements; this.limitElement = limitElement; @@ -124,6 +127,7 @@ public abstract class QueryStmt extends StatementBase { super.analyze(analyzer); analyzeLimit(analyzer); if (hasWithClause()) withClause_.analyze(analyzer); + if (hasOutFileClause()) outFileClause.analyze(analyzer); } private void analyzeLimit(Analyzer analyzer) throws AnalysisException { @@ -553,12 +557,17 @@ public abstract class QueryStmt extends StatementBase { return withClause_ != null ? withClause_.clone() : null; } + public OutFileClause cloneOutfileCluse() { + return outFileClause != null ? outFileClause.clone() : null; + } + /** * C'tor for cloning. 
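+     * The optional OUTFILE clause is copied as well, so a cloned statement keeps its INTO OUTFILE target.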
*/ protected QueryStmt(QueryStmt other) { super(other); withClause_ = other.cloneWithClause(); + outFileClause = other.cloneOutfileCluse(); orderByElements = other.cloneOrderByElements(); limitElement = other.limitElement.clone(); resultExprs = Expr.cloneList(other.resultExprs); @@ -597,4 +606,16 @@ public abstract class QueryStmt extends StatementBase { public abstract void substituteSelectList(Analyzer analyzer, List newColLabels) throws AnalysisException, UserException; + + public void setOutFileClause(OutFileClause outFileClause) { + this.outFileClause = outFileClause; + } + + public OutFileClause getOutFileClause() { + return outFileClause; + } + + public boolean hasOutFileClause() { + return outFileClause != null; + } } diff --git a/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java index 122c5e5d94..98f21029e8 100644 --- a/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -1540,6 +1540,10 @@ public class SelectStmt extends QueryStmt { if (hasLimitClause()) { strBuilder.append(limitElement.toSql()); } + + if (hasOutFileClause()) { + strBuilder.append(outFileClause.toSql()); + } return strBuilder.toString(); } diff --git a/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java b/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java index e51aeede69..ed3c4947fe 100644 --- a/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java +++ b/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java @@ -18,7 +18,6 @@ package org.apache.doris.common.util; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.UserException; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -42,7 +41,7 @@ public class ParseUtil { private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)"); - public static long analyzeDataVolumn(String dataVolumnStr) throws UserException { + public static long analyzeDataVolumn(String dataVolumnStr) throws AnalysisException { long dataVolumn = 0; Matcher m = dataVolumnPattern.matcher(dataVolumnStr); if (m.matches()) { diff --git a/fe/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/src/main/java/org/apache/doris/planner/PlanFragment.java index 8131b81982..ecff1a5401 100644 --- a/fe/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -17,19 +17,22 @@ package org.apache.doris.planner; -import org.apache.commons.collections.CollectionUtils; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; -import org.apache.doris.common.UserException; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.TreeNode; +import org.apache.doris.common.UserException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanFragment; +import org.apache.doris.thrift.TResultSinkType; + import com.google.common.base.Preconditions; -import org.apache.logging.log4j.Logger; + +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -174,8 +177,8 @@ public class PlanFragment extends TreeNode { } // add ResultSink Preconditions.checkState(sink == null); - // we're streaming 
to an exchange node - ResultSink bufferSink = new ResultSink(planRoot.getId()); + // we're streaming to a result sink + ResultSink bufferSink = new ResultSink(planRoot.getId(), TResultSinkType.MYSQL_PROTOCAL); sink = bufferSink; } } diff --git a/fe/src/main/java/org/apache/doris/planner/Planner.java b/fe/src/main/java/org/apache/doris/planner/Planner.java index 0e82bd4bb3..1b283ec54a 100644 --- a/fe/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/src/main/java/org/apache/doris/planner/Planner.java @@ -195,9 +195,10 @@ public class Planner { for (PlanFragment fragment : fragments) { fragment.finalize(analyzer, !queryOptions.allow_unsupported_formats); } - Collections.reverse(fragments); + setOutfileSink(queryStmt); + if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { @@ -210,6 +211,21 @@ } } + // if query stmt has OUTFILE clause, set info into ResultSink. + // this should be done after fragments are generated. + private void setOutfileSink(QueryStmt queryStmt) { + if (!queryStmt.hasOutFileClause()) { + return; + } + PlanFragment topFragment = fragments.get(0); + if (!(topFragment.getSink() instanceof ResultSink)) { + return; + } + + ResultSink resultSink = (ResultSink) topFragment.getSink(); + resultSink.setOutfileInfo(queryStmt.getOutFileClause()); + } + /** * If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise * returns root unchanged. diff --git a/fe/src/main/java/org/apache/doris/planner/ResultSink.java b/fe/src/main/java/org/apache/doris/planner/ResultSink.java index e71242e9b4..688c2cbe8b 100644 --- a/fe/src/main/java/org/apache/doris/planner/ResultSink.java +++ b/fe/src/main/java/org/apache/doris/planner/ResultSink.java @@ -17,19 +17,33 @@ package org.apache.doris.planner; +import org.apache.doris.analysis.OutFileClause; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TResultFileSinkOptions; import org.apache.doris.thrift.TResultSink; +import org.apache.doris.thrift.TResultSinkType; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; /** - * Data sink that forwards data to an exchange node. + * Result sink that forwards data to + * 1. the FE data receiver, which returns the final query result to the user client. Or, + * 2.
files that save the result data */ public class ResultSink extends DataSink { private final PlanNodeId exchNodeId; + private TResultSinkType sinkType; + private String brokerName; + private TResultFileSinkOptions fileSinkOptions; - public ResultSink(PlanNodeId exchNodeId) { + public ResultSink(PlanNodeId exchNodeId, TResultSinkType sinkType) { this.exchNodeId = exchNodeId; + this.sinkType = sinkType; } @Override @@ -43,6 +57,10 @@ public class ResultSink extends DataSink { protected TDataSink toThrift() { TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK); TResultSink tResultSink = new TResultSink(); + tResultSink.setType(sinkType); + if (fileSinkOptions != null) { + tResultSink.setFile_options(fileSinkOptions); + } result.setResult_sink(tResultSink); return result; } @@ -56,6 +74,27 @@ public class ResultSink extends DataSink { public DataPartition getOutputPartition() { return null; } -} -/* vim: set ts=4 sw=4 sts=4 tw=100 noet: */ + public boolean isOutputFileSink() { + return sinkType == TResultSinkType.FILE; + } + + public boolean needBroker() { + return !Strings.isNullOrEmpty(brokerName); + } + + public String getBrokerName() { + return brokerName; + } + + public void setOutfileInfo(OutFileClause outFileClause) { + sinkType = TResultSinkType.FILE; + fileSinkOptions = outFileClause.toSinkOptions(); + brokerName = outFileClause.getBrokerDesc() == null ? null : outFileClause.getBrokerDesc().getName(); + } + + public void setBrokerAddr(String ip, int port) { + Preconditions.checkNotNull(fileSinkOptions); + fileSinkOptions.setBroker_addresses(Lists.newArrayList(new TNetworkAddress(ip, port))); + } +} diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index ac123b646d..36f9a0d557 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -20,6 +20,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.Config; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; @@ -401,11 +402,21 @@ public class Coordinator { PlanFragmentId topId = fragments.get(0).getFragmentId(); FragmentExecParams topParams = fragmentExecParamsMap.get(topId); if (topParams.fragment.getSink() instanceof ResultSink) { + TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; receiver = new ResultReceiver( topParams.instanceExecParams.get(0).instanceId, - addressToBackendID.get(topParams.instanceExecParams.get(0).host), - toBrpcHost(topParams.instanceExecParams.get(0).host), + addressToBackendID.get(execBeAddr), + toBrpcHost(execBeAddr), queryOptions.query_timeout * 1000); + + // set the broker address for OUTFILE sink + ResultSink resultSink = (ResultSink) topParams.fragment.getSink(); + if (resultSink.isOutputFileSink() && resultSink.needBroker()) { + FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(resultSink.getBrokerName(), + execBeAddr.getHostname()); + resultSink.setBrokerAddr(broker.ip, broker.port); + } + } else { // This is a load process. 
this.queryOptions.setIs_report_success(true); diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 9baea6eca4..90133e2458 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -578,12 +578,23 @@ public class StmtExecutor { // so We need to send fields after first batch arrived // send result + // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx, + // We will not send real query result to client. Instead, we only send OK to client with + // number of rows selected. For example: + // mysql> select * from tbl1 into outfile xxx; + // Query OK, 10 rows affected (0.01 sec) + // + // 2. If this is a query, send the result expr fields first, and send result data back to client. RowBatch batch; MysqlChannel channel = context.getMysqlChannel(); - sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); + boolean isOutfileQuery = queryStmt.hasOutFileClause(); + if (!isOutfileQuery) { + sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); + } while (true) { batch = coord.getNext(); - if (batch.getBatch() != null) { + // for outfile query, there will be only one empty batch send back with eos flag + if (batch.getBatch() != null && !isOutfileQuery) { for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); } @@ -595,7 +606,11 @@ public class StmtExecutor { } statisticsForAuditLog = batch.getQueryStatistics(); - context.getState().setEof(); + if (!isOutfileQuery) { + context.getState().setEof(); + } else { + context.getState().setOk(statisticsForAuditLog.returned_rows, 0, ""); + } } // Process a select statement. @@ -612,6 +627,10 @@ public class StmtExecutor { insertStmt = (InsertStmt) parsedStmt; } + if (insertStmt.getQueryStmt().hasOutFileClause()) { + throw new DdlException("Not support OUTFILE clause in INSERT statement"); + } + // assign query id before explain query return UUID uuid = insertStmt.getUUID(); context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); @@ -913,3 +932,4 @@ public class StmtExecutor { return statisticsForAuditLog; } } + diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex index ec53beaef0..e6d4ea0a1a 100644 --- a/fe/src/main/jflex/sql_scanner.flex +++ b/fe/src/main/jflex/sql_scanner.flex @@ -264,6 +264,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("or", new Integer(SqlParserSymbols.KW_OR)); keywordMap.put("order", new Integer(SqlParserSymbols.KW_ORDER)); keywordMap.put("outer", new Integer(SqlParserSymbols.KW_OUTER)); + keywordMap.put("outfile", new Integer(SqlParserSymbols.KW_OUTFILE)); keywordMap.put("over", new Integer(SqlParserSymbols.KW_OVER)); keywordMap.put("partition", new Integer(SqlParserSymbols.KW_PARTITION)); keywordMap.put("partitions", new Integer(SqlParserSymbols.KW_PARTITIONS)); diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index ef13a40598..09af10c3c1 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto"; message PQueryStatistics { optional int64 scan_rows = 1; optional int64 scan_bytes = 2; + optional int64 returned_rows = 3; } message PRowBatch { diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 2792a07276..343cbbb7ca 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -22,6 
+22,7 @@ include "Exprs.thrift" include "Types.thrift" include "Descriptors.thrift" include "Partitions.thrift" +include "PlanNodes.thrift" enum TDataSinkType { DATA_STREAM_SINK, @@ -33,6 +34,21 @@ enum TDataSinkType { MEMORY_SCRATCH_SINK } +enum TResultSinkType { + MYSQL_PROTOCAL, + FILE +} + +struct TResultFileSinkOptions { + 1: required string file_path + 2: required PlanNodes.TFileFormatType file_format + 3: optional string column_separator // only for csv + 4: optional string line_delimiter // only for csv + 5: optional i64 max_file_size_bytes + 6: optional list broker_addresses; // only for remote file + 7: optional map broker_properties // only for remote file +} + struct TMemoryScratchSink { } @@ -52,8 +68,9 @@ struct TDataStreamSink { 3: optional bool ignore_not_found } -// Reserved for struct TResultSink { + 1: optional TResultSinkType type; + 2: optional TResultFileSinkOptions file_options; } struct TMysqlTableSink {