[feature](table-valued-function) Support S3 tvf (#13959)

This PR does three things:

1. Refactored the table-valued-function (TVF) framework.
2. Added BE support for the `fetch_table_schema` RPC.
3. Implemented the `S3(path, AK, SK, format)` table-valued function (see the sketch below).
Tiewei Fang
2022-11-06 11:04:26 +08:00
committed by GitHub
parent fb5a3e118a
commit 27549564a7
33 changed files with 1156 additions and 266 deletions
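For context: the new `ExternalFileTableValuedFunction` base class introduced in this diff holds the file path, connection properties, and format, and derives the table schema by calling the new `fetch_table_schema` RPC on a live BE. The concrete `S3` TVF class itself is not included in this excerpt, so the following is only a minimal, hypothetical sketch of how such a subclass could plug into that base class; the class name, constructor, property keys, and `BrokerDesc` usage are assumptions for illustration.

```java
// Hypothetical sketch only; not the actual implementation from this commit.
package org.apache.doris.tablefunction;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;

import java.util.Map;

public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
    private final String filePath;

    // "uri" is an assumed parameter name; access key, secret key and format would arrive the same way.
    public S3TableValuedFunction(Map<String, String> properties) throws UserException {
        this.filePath = properties.get("uri");
        this.locationProperties = properties;                    // forwarded to the BE as S3 connection properties
        this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;  // simplified: the BE schema fetch currently supports csv formats only
        parseFile();                                             // base-class helper: lists the files under the given path
    }

    @Override
    public TFileType getTFileType() {
        return TFileType.FILE_S3;
    }

    @Override
    public String getFilePath() {
        return filePath;
    }

    @Override
    public BrokerDesc getBrokerDesc() {
        // Assumed: reuse the broker-style file interface with the S3 storage type.
        return new BrokerDesc("S3TvfBroker", StorageBackend.StorageType.S3, locationProperties);
    }
}
```

With such a subclass registered under a name like `s3` in `TableValuedFunctionIf.getTableFunction`, `getTableColumns()` (shown near the end of this diff) asks a live BE for the csv schema via the new `fetch_table_schema` RPC, and the planner builds an `ExternalFileScanNode` over the returned columns.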

View File

@ -69,6 +69,7 @@
#include "vec/exec/vanalytic_eval_node.h"
#include "vec/exec/vassert_num_rows_node.h"
#include "vec/exec/vbroker_scan_node.h"
#include "vec/exec/vdata_gen_scan_node.h"
#include "vec/exec/vempty_set_node.h"
#include "vec/exec/ves_http_scan_node.h"
#include "vec/exec/vexcept_node.h"
@ -84,7 +85,6 @@
#include "vec/exec/vselect_node.h"
#include "vec/exec/vsort_node.h"
#include "vec/exec/vtable_function_node.h"
#include "vec/exec/vtable_valued_function_scannode.h"
#include "vec/exec/vunion_node.h"
#include "vec/exprs/vexpr.h"
@ -417,7 +417,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
case TPlanNodeType::REPEAT_NODE:
case TPlanNodeType::TABLE_FUNCTION_NODE:
case TPlanNodeType::BROKER_SCAN_NODE:
case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE:
case TPlanNodeType::DATA_GEN_SCAN_NODE:
case TPlanNodeType::FILE_SCAN_NODE:
case TPlanNodeType::JDBC_SCAN_NODE:
break;
@ -650,9 +650,9 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
}
return Status::OK();
case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE:
case TPlanNodeType::DATA_GEN_SCAN_NODE:
if (state->enable_vectorized_exec()) {
*node = pool->add(new vectorized::VTableValuedFunctionScanNode(pool, tnode, descs));
*node = pool->add(new vectorized::VDataGenFunctionScanNode(pool, tnode, descs));
return Status::OK();
} else {
error_msg << "numbers table function only support vectorized execution";
@ -721,7 +721,7 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes);
}

View File

@ -748,8 +748,8 @@ bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) {
type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE ||
type == TPlanNodeType::BROKER_SCAN_NODE || type == TPlanNodeType::ES_SCAN_NODE ||
type == TPlanNodeType::ES_HTTP_SCAN_NODE || type == TPlanNodeType::ODBC_SCAN_NODE ||
type == TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE ||
type == TPlanNodeType::FILE_SCAN_NODE || type == TPlanNodeType::JDBC_SCAN_NODE;
type == TPlanNodeType::DATA_GEN_SCAN_NODE || type == TPlanNodeType::FILE_SCAN_NODE ||
type == TPlanNodeType::JDBC_SCAN_NODE;
}
Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason,

View File

@ -44,11 +44,14 @@
#include "util/md5.h"
#include "util/proto_util.h"
#include "util/ref_count_closure.h"
#include "util/s3_uri.h"
#include "util/string_util.h"
#include "util/telemetry/brpc_carrier.h"
#include "util/telemetry/telemetry.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris {
@ -409,6 +412,77 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller,
const PFetchTableSchemaRequest* request,
PFetchTableSchemaResult* result,
google::protobuf::Closure* done) {
VLOG_RPC << "fetch table schema";
brpc::ClosureGuard closure_guard(done);
TFileScanRange file_scan_range;
Status st = Status::OK();
{
const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
uint32_t len = request->file_scan_range().size();
st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
if (!st.ok()) {
LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg();
st.to_protobuf(result->mutable_status());
return;
}
}
if (file_scan_range.__isset.ranges == false) {
st = Status::InternalError("can not get TFileRangeDesc.");
st.to_protobuf(result->mutable_status());
return;
}
if (file_scan_range.__isset.params == false) {
st = Status::InternalError("can not get TFileScanRangeParams.");
st.to_protobuf(result->mutable_status());
return;
}
const TFileRangeDesc& range = file_scan_range.ranges.at(0);
const TFileScanRangeParams& params = file_scan_range.params;
// file_slots is not used here
std::vector<SlotDescriptor*> file_slots;
std::unique_ptr<vectorized::GenericReader> reader(nullptr);
std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema"));
switch (params.format_type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
case TFileFormatType::FORMAT_CSV_GZ:
case TFileFormatType::FORMAT_CSV_BZ2:
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE: {
reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots));
break;
}
default:
st = Status::InternalError("Not supported file format in fetch table schema: {}",
params.format_type);
st.to_protobuf(result->mutable_status());
return;
}
std::unordered_map<std::string, TypeDescriptor> name_to_col_type;
std::vector<std::string> col_names;
std::vector<TypeDescriptor> col_types;
st = reader->get_parsered_schema(&col_names, &col_types);
if (!st.ok()) {
LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg();
st.to_protobuf(result->mutable_status());
return;
}
result->set_column_nums(col_names.size());
for (size_t idx = 0; idx < col_names.size(); ++idx) {
result->add_column_names(col_names[idx]);
}
for (size_t idx = 0; idx < col_types.size(); ++idx) {
PTypeDesc* type_desc = result->add_column_types();
col_types[idx].to_protobuf(type_desc);
}
st.to_protobuf(result->mutable_status());
}
void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
const PProxyRequest* request, PProxyResult* response,
google::protobuf::Closure* done) {

View File

@ -68,6 +68,11 @@ public:
void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request,
PFetchDataResult* result, google::protobuf::Closure* done) override;
void fetch_table_schema(google::protobuf::RpcController* controller,
const PFetchTableSchemaRequest* request,
PFetchTableSchemaResult* result,
google::protobuf::Closure* done) override;
void tablet_writer_open(google::protobuf::RpcController* controller,
const PTabletWriterOpenRequest* request,
PTabletWriterOpenResult* response,

View File

@ -116,8 +116,8 @@ set(VEC_FILES
exec/vparquet_scanner.cpp
exec/vorc_scanner.cpp
exec/join/vhash_join_node.cpp
exec/tablefunction/vnumbers_tbf.cpp
exec/vtable_valued_function_scannode.cpp
exec/data_gen_functions/vnumbers_tvf.cpp
exec/vdata_gen_scan_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
exprs/vexpr.cpp

View File

@ -30,12 +30,12 @@ class Status;
namespace vectorized {
class VTableValuedFunctionInf {
class VDataGenFunctionInf {
public:
VTableValuedFunctionInf(TupleId tuple_id, const TupleDescriptor* tuple_desc)
VDataGenFunctionInf(TupleId tuple_id, const TupleDescriptor* tuple_desc)
: _tuple_id(tuple_id), _tuple_desc(tuple_desc) {}
virtual ~VTableValuedFunctionInf() = default;
virtual ~VDataGenFunctionInf() = default;
// Should set function parameters in this method
virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) = 0;

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/tablefunction/vnumbers_tbf.h"
#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
#include <sstream>
@ -29,10 +29,10 @@
namespace doris::vectorized {
VNumbersTBF::VNumbersTBF(TupleId tuple_id, const TupleDescriptor* tuple_desc)
: VTableValuedFunctionInf(tuple_id, tuple_desc) {}
VNumbersTVF::VNumbersTVF(TupleId tuple_id, const TupleDescriptor* tuple_desc)
: VDataGenFunctionInf(tuple_id, tuple_desc) {}
Status VNumbersTBF::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
Status VNumbersTVF::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
bool mem_reuse = block->mem_reuse();
DCHECK(block->rows() == 0);
std::vector<vectorized::MutableColumnPtr> columns(_slot_num);
@ -75,8 +75,9 @@ Status VNumbersTBF::get_next(RuntimeState* state, vectorized::Block* block, bool
return Status::OK();
}
Status VNumbersTBF::set_scan_ranges(const std::vector<TScanRangeParams>& scan_range_params) {
_total_numbers = scan_range_params[0].scan_range.tvf_scan_range.numbers_params.totalNumbers;
Status VNumbersTVF::set_scan_ranges(const std::vector<TScanRangeParams>& scan_range_params) {
_total_numbers =
scan_range_params[0].scan_range.data_gen_scan_range.numbers_params.totalNumbers;
return Status::OK();
}

View File

@ -20,7 +20,7 @@
#include <memory>
#include "runtime/descriptors.h"
#include "vec/exec/tablefunction/vtable_valued_function_inf.h"
#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
namespace doris {
@ -33,10 +33,10 @@ class Status;
namespace vectorized {
class VNumbersTBF : public VTableValuedFunctionInf {
class VNumbersTVF : public VDataGenFunctionInf {
public:
VNumbersTBF(TupleId tuple_id, const TupleDescriptor* tuple_desc);
~VNumbersTBF() = default;
VNumbersTVF(TupleId tuple_id, const TupleDescriptor* tuple_desc);
~VNumbersTVF() = default;
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;

View File

@ -59,6 +59,23 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
_split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
}
CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs)
: _state(nullptr),
_profile(profile),
_params(params),
_range(range),
_file_slot_descs(file_slot_descs),
_line_reader(nullptr),
_line_reader_eof(false),
_text_converter(nullptr),
_decompressor(nullptr) {
_file_format_type = _params.format_type;
_file_compress_type = _params.compress_type;
_size = _range.size;
}
CsvReader::~CsvReader() {}
Status CsvReader::init_reader(bool is_load) {
@ -185,6 +202,32 @@ Status CsvReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* n
return Status::OK();
}
Status CsvReader::get_parsered_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
size_t read_line = 0;
bool is_parse_name = false;
RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name));
if (read_line == 1) {
if (!is_parse_name) { //parse csv file without names and types
size_t col_nums = 0;
RETURN_IF_ERROR(_parse_col_nums(&col_nums));
for (size_t i = 0; i < col_nums; ++i) {
col_names->emplace_back("c" + std::to_string(i + 1));
}
} else { // parse csv file with names
RETURN_IF_ERROR(_parse_col_names(col_names));
}
for (size_t j = 0; j < col_names->size(); ++j) {
col_types->emplace_back(TypeDescriptor::create_string_type());
}
} else { // parse csv file with names and types
RETURN_IF_ERROR(_parse_col_names(col_names));
RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types));
}
return Status::OK();
}
Status CsvReader::_create_decompressor() {
CompressType compress_type;
if (_file_compress_type != TFileCompressType::UNKNOWN) {
@ -362,7 +405,8 @@ void CsvReader::_split_line(const Slice& line) {
// Match a separator
non_space = curpos;
// Trim trailing spaces. Be consistent with hive and trino's behavior.
if (_state->trim_tailing_spaces_for_external_table_query()) {
if (_state != nullptr &&
_state->trim_tailing_spaces_for_external_table_query()) {
while (non_space > start && *(value + non_space - 1) == ' ') {
non_space--;
}
@ -378,7 +422,7 @@ void CsvReader::_split_line(const Slice& line) {
CHECK(curpos == line.size) << curpos << " vs " << line.size;
non_space = curpos;
if (_state->trim_tailing_spaces_for_external_table_query()) {
if (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()) {
while (non_space > start && *(value + non_space - 1) == ' ') {
non_space--;
}
@ -422,4 +466,104 @@ bool CsvReader::_is_array(const Slice& slice) {
return slice.size > 1 && slice.data[0] == '[' && slice.data[slice.size - 1] == ']';
}
Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
int64_t start_offset = _range.start_offset;
if (start_offset != 0) {
return Status::InvalidArgument(
"start offset of TFileRangeDesc must be zero in get parsered schema");
}
if (_params.file_type == TFileType::FILE_STREAM ||
_params.file_type == TFileType::FILE_BROKER) {
return Status::InternalError(
"Getting parsered schema from csv file do not support stream load and broker "
"load.");
}
// csv file without a names line or a types line.
*read_line = 1;
*is_parse_name = false;
if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
_params.file_attributes.header_type.size() > 0) {
std::string header_type = to_lower(_params.file_attributes.header_type);
if (header_type == BeConsts::CSV_WITH_NAMES) {
*is_parse_name = true;
} else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
*read_line = 2;
*is_parse_name = true;
}
}
// create and open file reader
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, _range.path, start_offset,
_range.file_size, 0, _file_reader));
RETURN_IF_ERROR(_file_reader->open());
if (_file_reader->size() == 0) {
return Status::EndOfFile("Empty File");
}
// get column_separator and line_delimiter
_value_separator = _params.file_attributes.text_params.column_separator;
_value_separator_length = _value_separator.size();
_line_delimiter = _params.file_attributes.text_params.line_delimiter;
_line_delimiter_length = _line_delimiter.size();
// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(_create_decompressor());
_line_reader.reset(new PlainTextLineReader(_profile, _file_reader.get(), _decompressor.get(),
_size, _line_delimiter, _line_delimiter_length));
return Status::OK();
}
Status CsvReader::_parse_col_nums(size_t* col_nums) {
const uint8_t* ptr = nullptr;
size_t size = 0;
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof));
if (size == 0) {
return Status::InternalError("The first line is empty, can not parse column numbers");
}
if (!validate_utf8(const_cast<char*>(reinterpret_cast<const char*>(ptr)), size)) {
return Status::InternalError("Only support csv data in utf8 codec");
}
_split_line(Slice(ptr, size));
*col_nums = _split_values.size();
return Status::OK();
}
Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) {
const uint8_t* ptr = nullptr;
size_t size = 0;
// _line_reader_eof is not used here
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof));
if (size == 0) {
return Status::InternalError("The first line is empty, can not parse column names");
}
if (!validate_utf8(const_cast<char*>(reinterpret_cast<const char*>(ptr)), size)) {
return Status::InternalError("Only support csv data in utf8 codec");
}
_split_line(Slice(ptr, size));
for (size_t idx = 0; idx < _split_values.size(); ++idx) {
col_names->emplace_back(_split_values[idx].to_string());
}
return Status::OK();
}
// TODO(ftw): parse type
Status CsvReader::_parse_col_types(size_t col_nums, std::vector<TypeDescriptor>* col_types) {
// Placeholder: treat every column as string until type parsing is implemented.
for (size_t i = 0; i < col_nums; ++i) {
col_types->emplace_back(TypeDescriptor::create_string_type());
}
// 1. check _line_reader_eof
// 2. read line
// 3. check utf8
// 4. check size
// 5. check _split_values.size must equal to col_nums.
// 6. fill col_types
return Status::OK();
}
} // namespace doris::vectorized

View File

@ -34,6 +34,9 @@ public:
CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs);
CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs);
~CsvReader() override;
Status init_reader(bool is_query);
@ -41,7 +44,16 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
// Get the schema of a csv file from the first one or two lines.
// If the file format is FORMAT_CSV_DEFLATE:
// 1. header_type is empty, get schema from the first line.
// 2. header_type is CSV_WITH_NAMES, get schema from the first line.
// 3. header_type is CSV_WITH_NAMES_AND_TYPES, get schema from the first two lines.
Status get_parsered_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
private:
// used for stream/broker load of csv file.
Status _create_decompressor();
Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows);
Status _line_split_to_values(const Slice& line, bool* success);
@ -50,6 +62,13 @@ private:
bool _is_null(const Slice& slice);
bool _is_array(const Slice& slice);
// used to parse the table schema of a csv file.
Status _prepare_parse(size_t* read_line, bool* is_parse_name);
Status _parse_col_nums(size_t* col_nums);
Status _parse_col_names(std::vector<std::string>* col_names);
// TODO(ftw): parse type
Status _parse_col_types(size_t col_nums, std::vector<TypeDescriptor>* col_types);
private:
RuntimeState* _state;
RuntimeProfile* _profile;

View File

@ -37,6 +37,11 @@ public:
std::unordered_set<std::string>* missing_cols) {
return Status::NotSupported("get_columns is not implemented");
}
virtual Status get_parsered_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
return Status::NotSupported("get_parser_schema is not implemented for this reader.");
}
virtual ~GenericReader() = default;
};

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/vtable_valued_function_scannode.h"
#include "vec/exec/vdata_gen_scan_node.h"
#include <sstream>
@ -26,28 +26,28 @@
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
#include "util/runtime_profile.h"
#include "vec/exec/tablefunction/vnumbers_tbf.h"
#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
namespace doris::vectorized {
VTableValuedFunctionScanNode::VTableValuedFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
VDataGenFunctionScanNode::VDataGenFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
_is_init(false),
_tuple_id(tnode.table_valued_func_scan_node.tuple_id),
_tuple_id(tnode.data_gen_scan_node.tuple_id),
_tuple_desc(nullptr) {
// set _table_func here
switch (tnode.table_valued_func_scan_node.func_name) {
case TTVFunctionName::NUMBERS:
_table_func = std::make_shared<VNumbersTBF>(_tuple_id, _tuple_desc);
switch (tnode.data_gen_scan_node.func_name) {
case TDataGenFunctionName::NUMBERS:
_table_func = std::make_shared<VNumbersTVF>(_tuple_id, _tuple_desc);
break;
default:
LOG(FATAL) << "Unsupported function type";
}
}
Status VTableValuedFunctionScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << "VTableValuedFunctionScanNode::Prepare";
Status VDataGenFunctionScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << "VDataGenFunctionScanNode::Prepare";
if (_is_init) {
return Status::OK();
@ -70,7 +70,7 @@ Status VTableValuedFunctionScanNode::prepare(RuntimeState* state) {
return Status::OK();
}
Status VTableValuedFunctionScanNode::open(RuntimeState* state) {
Status VDataGenFunctionScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
if (nullptr == state) {
@ -86,13 +86,13 @@ Status VTableValuedFunctionScanNode::open(RuntimeState* state) {
return Status::OK();
}
Status VTableValuedFunctionScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
LOG(FATAL) << "VTableValuedFunctionScanNode only support vectorized execution";
Status VDataGenFunctionScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
LOG(FATAL) << "VDataGenFunctionScanNode only support vectorized execution";
return Status::OK();
}
Status VTableValuedFunctionScanNode::get_next(RuntimeState* state, vectorized::Block* block,
bool* eos) {
Status VDataGenFunctionScanNode::get_next(RuntimeState* state, vectorized::Block* block,
bool* eos) {
if (state == nullptr || block == nullptr || eos == nullptr) {
return Status::InternalError("input is NULL pointer");
}
@ -103,7 +103,7 @@ Status VTableValuedFunctionScanNode::get_next(RuntimeState* state, vectorized::B
return res;
}
Status VTableValuedFunctionScanNode::close(RuntimeState* state) {
Status VDataGenFunctionScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
@ -113,8 +113,7 @@ Status VTableValuedFunctionScanNode::close(RuntimeState* state) {
return ExecNode::close(state);
}
Status VTableValuedFunctionScanNode::set_scan_ranges(
const std::vector<TScanRangeParams>& scan_ranges) {
Status VDataGenFunctionScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
return _table_func->set_scan_ranges(scan_ranges);
}

View File

@ -21,7 +21,7 @@
#include "exec/scan_node.h"
#include "runtime/descriptors.h"
#include "vec/exec/tablefunction/vtable_valued_function_inf.h"
#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
namespace doris {
@ -34,11 +34,10 @@ class Status;
namespace vectorized {
class VTableValuedFunctionScanNode : public ScanNode {
class VDataGenFunctionScanNode : public ScanNode {
public:
VTableValuedFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
~VTableValuedFunctionScanNode() override = default;
VDataGenFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~VDataGenFunctionScanNode() override = default;
// initialize _mysql_scanner, and create _text_converter.
Status prepare(RuntimeState* state) override;
@ -59,7 +58,7 @@ public:
Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
protected:
std::shared_ptr<VTableValuedFunctionInf> _table_func;
std::shared_ptr<VDataGenFunctionInf> _table_func;
bool _is_init;
// Tuple id resolved in prepare() to set _tuple_desc;
TupleId _tuple_id;

View File

@ -19,18 +19,20 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.UserException;
import org.apache.doris.tablefunction.TableValuedFunctionInf;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.List;
public class TableValuedFunctionRef extends TableRef {
private Table table;
private TableValuedFunctionInf tableFunction;
private TableValuedFunctionIf tableFunction;
public TableValuedFunctionRef(String funcName, String alias, List<String> params) throws UserException {
super(new TableName(null, null, "_table_valued_function_" + funcName), alias);
this.tableFunction = TableValuedFunctionInf.getTableFunction(funcName, params);
this.tableFunction = TableValuedFunctionIf.getTableFunction(funcName, params);
if (hasExplicitAlias()) {
return;
}
@ -70,7 +72,11 @@ public class TableValuedFunctionRef extends TableRef {
analyzeJoin(analyzer);
}
public TableValuedFunctionInf getTableFunction() {
public ScanNode getScanNode(PlanNodeId id) {
return tableFunction.getScanNode(id, desc);
}
public TableValuedFunctionIf getTableFunction() {
return tableFunction;
}

View File

@ -17,12 +17,21 @@
package org.apache.doris.catalog;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.List;
public class FunctionGenTable extends Table {
private TableValuedFunctionIf tvf;
public FunctionGenTable(long id, String tableName, TableType type, List<Column> fullSchema) {
public FunctionGenTable(long id, String tableName, TableType type, List<Column> fullSchema,
TableValuedFunctionIf tvf) {
super(id, tableName, type, fullSchema);
this.tvf = tvf;
}
public TableValuedFunctionIf getTvf() {
return tvf;
}
}

View File

@ -36,8 +36,8 @@ import java.util.Set;
*/
public class S3URI {
private static final String SCHEME_DELIM = "://";
private static final String PATH_DELIM = "/";
public static final String SCHEME_DELIM = "://";
public static final String PATH_DELIM = "/";
private static final String QUERY_DELIM = "\\?";
private static final String FRAGMENT_DELIM = "#";
private static final Set<String> VALID_SCHEMES = ImmutableSet.of("http", "https", "s3", "s3a", "s3n", "bos");

View File

@ -22,14 +22,14 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.tablefunction.TableValuedFunctionInf;
import org.apache.doris.tablefunction.DataGenTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionTask;
import org.apache.doris.thrift.TDataGenScanNode;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableValuedFunctionScanNode;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
@ -38,17 +38,17 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* This scan node is used for table valued function.
* This scan node is used for data sources generated from memory.
*/
public class TableValuedFunctionScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(TableValuedFunctionScanNode.class.getName());
public class DataGenScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(DataGenScanNode.class.getName());
private List<TScanRangeLocations> shardScanRanges;
private TableValuedFunctionInf tvf;
private DataGenTableValuedFunction tvf;
private boolean isFinalized = false;
public TableValuedFunctionScanNode(PlanNodeId id, TupleDescriptor desc,
String planNodeName, TableValuedFunctionInf tvf) {
public DataGenScanNode(PlanNodeId id, TupleDescriptor desc,
String planNodeName, DataGenTableValuedFunction tvf) {
super(id, desc, planNodeName, StatisticalType.TABLE_VALUED_FUNCTION_NODE);
this.tvf = tvf;
}
@ -85,11 +85,11 @@ public class TableValuedFunctionScanNode extends ScanNode {
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.TABLE_VALUED_FUNCTION_SCAN_NODE;
TTableValuedFunctionScanNode tvfScanNode = new TTableValuedFunctionScanNode();
tvfScanNode.setTupleId(desc.getId().asInt());
tvfScanNode.setFuncName(tvf.getFuncName());
msg.table_valued_func_scan_node = tvfScanNode;
msg.node_type = TPlanNodeType.DATA_GEN_SCAN_NODE;
TDataGenScanNode dataGenScanNode = new TDataGenScanNode();
dataGenScanNode.setTupleId(desc.getId().asInt());
dataGenScanNode.setFuncName(tvf.getDataGenFunctionName());
msg.data_gen_scan_node = dataGenScanNode;
}
private List<TScanRangeLocations> getShardLocations() throws AnalysisException {

View File

@ -275,7 +275,7 @@ public class DistributedPlanner {
return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
} else if (node instanceof SchemaScanNode) {
return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM);
} else if (node instanceof TableValuedFunctionScanNode) {
} else if (node instanceof DataGenScanNode) {
return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM);
} else if (node instanceof OlapScanNode) {
// olap scan node

View File

@ -1935,8 +1935,7 @@ public class SingleNodePlanner {
scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (JdbcTable) tblRef.getTable());
break;
case TABLE_VALUED_FUNCTION:
scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
"TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction());
scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId());
break;
case HMS_EXTERNAL_TABLE:
scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc());

View File

@ -32,6 +32,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
@ -44,6 +45,7 @@ import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
@ -162,30 +164,13 @@ public class ExternalFileScanNode extends ExternalScanNode {
switch (type) {
case QUERY:
HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable();
Preconditions.checkNotNull(hmsTable);
if (hmsTable.isView()) {
throw new AnalysisException(
String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(),
hmsTable.getDbName(), hmsTable.getName()));
if (this.desc.getTable() instanceof HMSExternalTable) {
HMSExternalTable hmsTable = (HMSExternalTable) this.desc.getTable();
initHMSExternalTable(hmsTable);
} else if (this.desc.getTable() instanceof FunctionGenTable) {
FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf());
}
FileScanProviderIf scanProvider;
switch (hmsTable.getDlaType()) {
case HUDI:
scanProvider = new HudiScanProvider(hmsTable, desc);
break;
case ICEBERG:
scanProvider = new IcebergScanProvider(hmsTable, desc);
break;
case HIVE:
scanProvider = new HiveScanProvider(hmsTable, desc);
break;
default:
throw new UserException("Unknown table type: " + hmsTable.getDlaType());
}
this.scanProviders.add(scanProvider);
break;
case LOAD:
for (FileGroupInfo fileGroupInfo : fileGroupInfos) {
@ -202,6 +187,38 @@ public class ExternalFileScanNode extends ExternalScanNode {
initParamCreateContexts(analyzer);
}
private void initHMSExternalTable(HMSExternalTable hmsTable) throws UserException {
Preconditions.checkNotNull(hmsTable);
if (hmsTable.isView()) {
throw new AnalysisException(
String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(),
hmsTable.getDbName(), hmsTable.getName()));
}
FileScanProviderIf scanProvider;
switch (hmsTable.getDlaType()) {
case HUDI:
scanProvider = new HudiScanProvider(hmsTable, desc);
break;
case ICEBERG:
scanProvider = new IcebergScanProvider(hmsTable, desc);
break;
case HIVE:
scanProvider = new HiveScanProvider(hmsTable, desc);
break;
default:
throw new UserException("Unknown table type: " + hmsTable.getDlaType());
}
this.scanProviders.add(scanProvider);
}
private void initFunctionGenTable(FunctionGenTable table, ExternalFileTableValuedFunction tvf) {
Preconditions.checkNotNull(table);
FileScanProviderIf scanProvider = new TVFScanProvider(table, desc, tvf);
this.scanProviders.add(scanProvider);
}
// For each scan provider, create a corresponding ParamCreateContext
private void initParamCreateContexts(Analyzer analyzer) throws UserException {
for (FileScanProviderIf scanProvider : scanProviders) {

View File

@ -24,13 +24,11 @@ import org.apache.hadoop.hive.metastore.api.Table;
import java.util.Map;
/**
* An interface for the hms table scan node to get the needed information.
*/
public interface HMSTableScanProviderIf extends FileScanProviderIf {
String getMetaStoreUrl();
public abstract class HMSTableScanProvider extends QueryScanProvider {
Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
public abstract String getMetaStoreUrl();
Map<String, String> getTableProperties() throws MetaNotFoundException;
public abstract Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
public abstract Map<String, String> getTableProperties() throws MetaNotFoundException;
}

View File

@ -29,27 +29,14 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
@ -59,7 +46,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@ -75,7 +61,7 @@ import java.util.stream.Collectors;
/**
* A HiveScanProvider to get information for scan node.
*/
public class HiveScanProvider implements HMSTableScanProviderIf {
public class HiveScanProvider extends HMSTableScanProvider {
private static final Logger LOG = LogManager.getLogger(HiveScanProvider.class);
private static final String PROP_FIELD_DELIMITER = "field.delim";
@ -84,8 +70,6 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
protected HMSExternalTable hmsTable;
protected int inputSplitNum = 0;
protected long inputFileSize = 0;
protected final TupleDescriptor desc;
public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) {
@ -257,128 +241,19 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
}
@Override
public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {
try {
List<InputSplit> inputSplits = getSplits(context.conjuncts);
this.inputSplitNum = inputSplits.size();
if (inputSplits.isEmpty()) {
return;
}
String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString();
String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath();
String fsName = fullPath.replace(filePath, "");
TFileType locationType = getLocationType();
context.params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
context.params.setFormatType(getFileFormatType());
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
String columnSeparator
= hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER);
textParams.setColumnSeparator(columnSeparator);
textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
TFileAttributes fileAttributes = new TFileAttributes();
fileAttributes.setTextParams(textParams);
context.params.setFileAttributes(fileAttributes);
}
// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
if (locationType == TFileType.FILE_HDFS) {
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
context.params.setHdfsParams(tHdfsParams);
} else if (locationType == TFileType.FILE_S3) {
context.params.setProperties(locationProperties);
}
TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
for (InputSplit split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
List<String> pathPartitionKeys = getPathPartitionKeys();
List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys, false);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
LOG.info(
"Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: "
+ fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"
+ " loaction: " + Joiner.on("|").join(split.getLocations()));
fileSplitStrategy.update(fileSplit);
// Add a new location when it's can be split
if (fileSplitStrategy.hasNext()) {
scanRangeLocations.add(curLocations);
curLocations = newLocations(context.params, backendPolicy);
fileSplitStrategy.next();
}
this.inputFileSize += fileSplit.getLength();
}
if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
scanRangeLocations.add(curLocations);
}
} catch (IOException e) {
throw new UserException(e);
}
public String getColumnSeparator() throws UserException {
return hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER);
}
@Override
public int getInputSplitNum() {
return this.inputSplitNum;
public String getLineSeparator() {
return DEFAULT_LINE_DELIMITER;
}
@Override
public long getInputFileSize() {
return this.inputFileSize;
}
private TScanRangeLocations newLocations(TFileScanRangeParams params, BackendPolicy backendPolicy) {
// Generate on file scan range
TFileScanRange fileScanRange = new TFileScanRange();
fileScanRange.setParams(params);
// Scan range
TExternalScanRange externalScanRange = new TExternalScanRange();
externalScanRange.setFileScanRange(fileScanRange);
TScanRange scanRange = new TScanRange();
scanRange.setExtScanRange(externalScanRange);
// Locations
TScanRangeLocations locations = new TScanRangeLocations();
locations.setScanRange(scanRange);
TScanRangeLocation location = new TScanRangeLocation();
Backend selectedBackend = backendPolicy.getNextBe();
location.setBackendId(selectedBackend.getId());
location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
locations.addToLocations(location);
return locations;
}
private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
List<String> columnsFromPathKeys)
throws DdlException, MetaNotFoundException {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
if (getLocationType() == TFileType.FILE_HDFS) {
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
} else if (getLocationType() == TFileType.FILE_S3) {
rangeDesc.setPath(fileSplit.getPath().toString());
}
return rangeDesc;
public String getHeaderType() {
return "";
}
}

View File

@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Joiner;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public abstract class QueryScanProvider implements FileScanProviderIf {
public static final Logger LOG = LogManager.getLogger(QueryScanProvider.class);
private int inputSplitNum = 0;
private long inputFileSize = 0;
public abstract String getColumnSeparator() throws UserException;
public abstract String getLineSeparator();
public abstract String getHeaderType();
@Override
public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {
try {
List<InputSplit> inputSplits = getSplits(context.conjuncts);
this.inputSplitNum = inputSplits.size();
if (inputSplits.isEmpty()) {
return;
}
String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString();
String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath();
String fsName = fullPath.replace(filePath, "");
TFileType locationType = getLocationType();
context.params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
context.params.setFormatType(getFileFormatType());
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
textParams.setColumnSeparator(getColumnSeparator());
textParams.setLineDelimiter(getLineSeparator());
TFileAttributes fileAttributes = new TFileAttributes();
fileAttributes.setTextParams(textParams);
fileAttributes.setHeaderType(getHeaderType());
context.params.setFileAttributes(fileAttributes);
}
// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
if (locationType == TFileType.FILE_HDFS) {
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
context.params.setHdfsParams(tHdfsParams);
} else if (locationType == TFileType.FILE_S3) {
context.params.setProperties(locationProperties);
}
TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
for (InputSplit split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
List<String> pathPartitionKeys = getPathPartitionKeys();
List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys, false);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
LOG.info(
"Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: "
+ fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"
+ " loaction: " + Joiner.on("|").join(split.getLocations()));
fileSplitStrategy.update(fileSplit);
// Add a new location when it can be split
if (fileSplitStrategy.hasNext()) {
scanRangeLocations.add(curLocations);
curLocations = newLocations(context.params, backendPolicy);
fileSplitStrategy.next();
}
this.inputFileSize += fileSplit.getLength();
}
if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
scanRangeLocations.add(curLocations);
}
} catch (IOException e) {
throw new UserException(e);
}
}
@Override
public int getInputSplitNum() {
return this.inputSplitNum;
}
@Override
public long getInputFileSize() {
return this.inputFileSize;
}
private TScanRangeLocations newLocations(TFileScanRangeParams params, BackendPolicy backendPolicy) {
// Generate one file scan range
TFileScanRange fileScanRange = new TFileScanRange();
fileScanRange.setParams(params);
// Scan range
TExternalScanRange externalScanRange = new TExternalScanRange();
externalScanRange.setFileScanRange(fileScanRange);
TScanRange scanRange = new TScanRange();
scanRange.setExtScanRange(externalScanRange);
// Locations
TScanRangeLocations locations = new TScanRangeLocations();
locations.setScanRange(scanRange);
TScanRangeLocation location = new TScanRangeLocation();
Backend selectedBackend = backendPolicy.getNextBe();
location.setBackendId(selectedBackend.getId());
location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
locations.addToLocations(location);
return locations;
}
private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
List<String> columnsFromPathKeys)
throws DdlException, MetaNotFoundException {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
if (getLocationType() == TFileType.FILE_HDFS) {
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
} else if (getLocationType() == TFileType.FILE_S3) {
rangeDesc.setPath(fileSplit.getPath().toString());
}
return rangeDesc;
}
}

View File

@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class TVFScanProvider extends QueryScanProvider {
private FunctionGenTable tvfTable;
private final TupleDescriptor desc;
private ExternalFileTableValuedFunction tableValuedFunction;
public TVFScanProvider(FunctionGenTable tvfTable, TupleDescriptor desc,
ExternalFileTableValuedFunction tableValuedFunction) {
this.tvfTable = tvfTable;
this.desc = desc;
this.tableValuedFunction = tableValuedFunction;
}
// =========== implement abstract methods of QueryScanProvider =================
@Override
public String getColumnSeparator() throws UserException {
return tableValuedFunction.getColumnSeparator();
}
@Override
public String getLineSeparator() {
return tableValuedFunction.getLineSeparator();
}
@Override
public String getHeaderType() {
return tableValuedFunction.getHeaderType();
}
// =========== implement interface methods of FileScanProviderIf ================
@Override
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
return tableValuedFunction.getTFileFormatType();
}
@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return tableValuedFunction.getTFileType();
}
@Override
public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException {
List<InputSplit> splits = Lists.newArrayList();
List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
for (TBrokerFileStatus fileStatus : fileStatuses) {
Path path = new Path(fileStatus.getPath());
FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getSize(), new String[0]);
splits.add(fileSplit);
}
return splits;
}
@Override
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
return tableValuedFunction.getLocationProperties();
}
@Override
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
return Lists.newArrayList();
}
@Override
public ParamCreateContext createContext(Analyzer analyzer) throws UserException {
ParamCreateContext context = new ParamCreateContext();
context.params = new TFileScanRangeParams();
context.destTupleDescriptor = desc;
context.params.setDestTupleId(desc.getId().asInt());
// Not used; set only to avoid a null pointer exception.
context.fileGroup = new BrokerFileGroup(tvfTable.getId(), "", "");
// Hive tables must extract partition values from the path, while hudi/iceberg tables keep
// partition fields in the file.
List<String> partitionKeys = getPathPartitionKeys();
List<Column> columns = tvfTable.getBaseSchema(false);
context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
for (SlotDescriptor slot : desc.getSlots()) {
if (!slot.isMaterialized()) {
continue;
}
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
slotInfo.setSlotId(slot.getId().asInt());
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
context.params.addToRequiredSlots(slotInfo);
}
return context;
}
@Override
public TableIf getTargetTable() {
return tvfTable;
}
}

View File

@ -93,6 +93,11 @@ public class BackendServiceClient {
return blockingStub.fetchData(request);
}
public Future<InternalService.PFetchTableSchemaResult> fetchTableStructureAsync(
InternalService.PFetchTableSchemaRequest request) {
return stub.fetchTableSchema(request);
}
public Future<InternalService.PCacheResponse> updateCache(InternalService.PUpdateCacheRequest request) {
return stub.updateCache(request);
}

View File

@ -189,6 +189,18 @@ public class BackendServiceProxy {
}
}
public Future<InternalService.PFetchTableSchemaResult> fetchTableStructureAsync(
TNetworkAddress address, InternalService.PFetchTableSchemaRequest request) throws RpcException {
try {
final BackendServiceClient client = getProxy(address);
return client.fetchTableStructureAsync(request);
} catch (Throwable e) {
LOG.warn("fetch table structure catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}
public Future<InternalService.PCacheResponse> updateCache(
TNetworkAddress address, InternalService.PUpdateCacheRequest request) throws RpcException {
try {

View File

@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.thrift.TDataGenFunctionName;
import java.util.List;
public abstract class DataGenTableValuedFunction extends TableValuedFunctionIf {
public abstract List<TableValuedFunctionTask> getTasks() throws AnalysisException;
public abstract TDataGenFunctionName getDataGenFunctionName();
}

View File

@ -0,0 +1,233 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.external.ExternalFileScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
import org.apache.doris.proto.Types.PScalarType;
import org.apache.doris.proto.Types.PTypeDesc;
import org.apache.doris.proto.Types.PTypeNode;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* ExternalFileTableValuedFunction is used for S3/HDFS/LOCAL table-valued functions.
*/
public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf {
public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class);
public static final String DEFAULT_COLUMN_SEPARATOR = ",";
public static final String DEFAULT_LINE_DELIMITER = "\n";
protected List<Column> columns = null;
protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
protected Map<String, String> locationProperties;
protected TFileFormatType fileFormatType;
protected String headerType = "";
protected String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
protected String lineDelimiter = DEFAULT_LINE_DELIMITER;
public abstract TFileType getTFileType();
public abstract String getFilePath();
public abstract BrokerDesc getBrokerDesc();
public TFileFormatType getTFileFormatType() {
return fileFormatType;
}
public Map<String, String> getLocationProperties() {
return locationProperties;
}
public String getColumnSeparator() {
return columnSeparator;
}
public String getLineSeparator() {
return lineDelimiter;
}
public String getHeaderType() {
return headerType;
}
public void parseFile() throws UserException {
String path = getFilePath();
BrokerDesc brokerDesc = getBrokerDesc();
BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
}
public List<TBrokerFileStatus> getFileStatuses() {
return fileStatuses;
}
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
return new ExternalFileScanNode(id, desc);
}
@Override
public List<Column> getTableColumns() throws AnalysisException {
if (this.columns != null) {
return columns;
}
// get one BE address
TNetworkAddress address = null;
columns = Lists.newArrayList();
for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) {
if (be.isAlive()) {
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
break;
}
}
if (address == null) {
throw new AnalysisException("No Alive backends");
}
try {
PFetchTableSchemaRequest request = getFetchTableStructureRequest();
Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance()
.fetchTableStructureAsync(address, request);
InternalService.PFetchTableSchemaResult result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
String errMsg;
if (code != TStatusCode.OK) {
if (!result.getStatus().getErrorMsgsList().isEmpty()) {
errMsg = result.getStatus().getErrorMsgsList().get(0);
} else {
errMsg = "fetchTableStructureAsync failed. backend address: "
+ address.getHostname() + ":" + address.getPort();
}
throw new AnalysisException(errMsg);
}
fillColumns(result);
} catch (RpcException e) {
throw new AnalysisException("fetchTableStructureResult rpc exception", e);
} catch (InterruptedException e) {
throw new AnalysisException("fetchTableStructureResult interrupted exception", e);
} catch (ExecutionException e) {
throw new AnalysisException("fetchTableStructureResult exception", e);
} catch (TException e) {
throw new AnalysisException("getFetchTableStructureRequest exception", e);
}
return columns;
}
private void fillColumns(InternalService.PFetchTableSchemaResult result)
throws AnalysisException {
if (result.getColumnNums() == 0) {
throw new AnalysisException("The amount of column is 0");
}
for (int idx = 0; idx < result.getColumnNums(); ++idx) {
PTypeDesc type = result.getColumnTypes(idx);
String colName = result.getColumnNames(idx);
for (PTypeNode typeNode : type.getTypesList()) {
// only ScalarType is supported.
PScalarType scalarType = typeNode.getScalarType();
TPrimitiveType tPrimitiveType = TPrimitiveType.findByValue(scalarType.getType());
columns.add(new Column(colName, PrimitiveType.fromThrift(tPrimitiveType), true));
}
}
}
private PFetchTableSchemaRequest getFetchTableStructureRequest() throws AnalysisException, TException {
// set TFileScanRangeParams
TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
fileScanRangeParams.setFileType(getTFileType());
fileScanRangeParams.setFormatType(fileFormatType);
fileScanRangeParams.setProperties(locationProperties);
fileScanRangeParams.setFileAttributes(getFileAttributes());
// get first file, used to parse table schema
TBrokerFileStatus firstFile = null;
for (TBrokerFileStatus fileStatus : fileStatuses) {
if (fileStatus.isIsDir()) {
continue;
}
firstFile = fileStatus;
break;
}
if (firstFile == null) {
throw new AnalysisException("Can not get first file, please check s3 uri.");
}
// set TFileRangeDesc
TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
fileRangeDesc.setPath(firstFile.getPath());
fileRangeDesc.setStartOffset(0);
fileRangeDesc.setSize(firstFile.getSize());
// set TFileScanRange
TFileScanRange fileScanRange = new TFileScanRange();
fileScanRange.addToRanges(fileRangeDesc);
fileScanRange.setParams(fileScanRangeParams);
return InternalService.PFetchTableSchemaRequest.newBuilder()
.setFileScanRange(ByteString.copyFrom(new TSerializer().serialize(fileScanRange))).build();
}
private TFileAttributes getFileAttributes() {
TFileAttributes fileAttributes = new TFileAttributes();
if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
fileAttributes.setTextParams(fileTextScanRangeParams);
fileAttributes.setHeaderType(this.headerType);
}
return fileAttributes;
}
}
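
To make the contract of the class above concrete, here is a minimal sketch of a hypothetical HDFS-backed subclass (the Javadoc mentions S3/HDFS/LOCAL, but only the S3 variant is added in this commit). The class name, its two-parameter layout, the `fs.defaultFS` property key, and the use of `StorageType.HDFS` / `TFileType.FILE_HDFS` are assumptions made for illustration, not code from this change; only the overridden hooks and the inherited `parseFile()` call mirror the flow implemented above.

```java
package org.apache.doris.tablefunction;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;

import com.google.common.collect.Maps;

import java.util.List;

// Hypothetical subclass, shown only to illustrate the abstract hooks above.
public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
    private final String filePath;

    public HdfsTableValuedFunction(List<String> params) throws UserException {
        if (params.size() != 2) {
            throw new UserException("expects 2 params: hdfs(path, fs.defaultFS)");
        }
        this.filePath = params.get(0);
        this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; // csv only, to keep the sketch short
        this.locationProperties = Maps.newHashMap();
        this.locationProperties.put("fs.defaultFS", params.get(1)); // assumed property key
        parseFile(); // inherited: lists the matching files into fileStatuses via BrokerUtil
    }

    @Override
    public TFileType getTFileType() {
        return TFileType.FILE_HDFS;
    }

    @Override
    public String getFilePath() {
        return filePath;
    }

    @Override
    public BrokerDesc getBrokerDesc() {
        return new BrokerDesc("HdfsTvfBroker", StorageType.HDFS, locationProperties);
    }

    @Override
    public String getTableName() {
        return "HdfsTableValuedFunction";
    }
}
```

With only these hooks provided, `getTableColumns()` (the fetch_table_schema RPC) and `getScanNode()` come for free from the base class, exactly as they do for the S3 implementation below.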

View File

@ -17,21 +17,22 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.DataGenScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TDataGenFunctionName;
import org.apache.doris.thrift.TDataGenScanRange;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TTVFNumbersScanRange;
import org.apache.doris.thrift.TTVFScanRange;
import org.apache.doris.thrift.TTVFunctionName;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
@ -43,9 +44,8 @@ import java.util.List;
/**
* The implementation of table-valued function numbers(N, M).
*/
public class NumbersTableValuedFunction extends TableValuedFunctionInf {
public class NumbersTableValuedFunction extends DataGenTableValuedFunction {
public static final String NAME = "numbers";
private static final Logger LOG = LogManager.getLogger(PlanNode.class);
// The total number of rows that will be generated.
private long totalNumbers;
// The total number of backends that will serve it.
@ -71,8 +71,8 @@ public class NumbersTableValuedFunction extends TableValuedFunctionInf {
}
@Override
public TTVFunctionName getFuncName() {
return TTVFunctionName.NUMBERS;
public TDataGenFunctionName getDataGenFunctionName() {
return TDataGenFunctionName.NUMBERS;
}
@Override
@ -102,13 +102,18 @@ public class NumbersTableValuedFunction extends TableValuedFunctionInf {
List<TableValuedFunctionTask> res = Lists.newArrayList();
for (int i = 0; i < tabletsNum; ++i) {
TScanRange scanRange = new TScanRange();
TTVFScanRange tvfScanRange = new TTVFScanRange();
TDataGenScanRange dataGenScanRange = new TDataGenScanRange();
TTVFNumbersScanRange tvfNumbersScanRange = new TTVFNumbersScanRange();
tvfNumbersScanRange.setTotalNumbers(totalNumbers);
tvfScanRange.setNumbersParams(tvfNumbersScanRange);
scanRange.setTvfScanRange(tvfScanRange);
dataGenScanRange.setNumbersParams(tvfNumbersScanRange);
scanRange.setDataGenScanRange(dataGenScanRange);
res.add(new TableValuedFunctionTask(backendList.get(i % backendList.size()), scanRange));
}
return res;
}
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
return new DataGenScanNode(id, desc, "DataGenScanNode", this);
}
}
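
`getTasks()` above builds one scan range per tablet and assigns them to the alive backends round-robin. A standalone sketch of just that indexing, with made-up backend names and counts:

```java
import java.util.Arrays;
import java.util.List;

public class RoundRobinAssignment {
    public static void main(String[] args) {
        List<String> backends = Arrays.asList("be-0", "be-1", "be-2"); // stand-ins for the alive backends
        int tabletsNum = 5;                                            // one scan range per tablet
        for (int i = 0; i < tabletsNum; ++i) {
            // same indexing as getTasks(): backendList.get(i % backendList.size())
            System.out.println("scan range " + i + " -> " + backends.get(i % backends.size()));
        }
        // prints: 0 -> be-0, 1 -> be-1, 2 -> be-2, 3 -> be-0, 4 -> be-1
    }
}
```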

View File

@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* The implementation of table-valued function S3(path, AK, SK, format).
*/
public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
public static final Logger LOG = LogManager.getLogger(S3TableValuedFunction.class);
public static final String NAME = "s3";
public static final String S3_AK = "AWS_ACCESS_KEY";
public static final String S3_SK = "AWS_SECRET_KEY";
public static final String S3_ENDPOINT = "AWS_ENDPOINT";
public static final String S3_REGION = "AWS_REGION";
public static final String USE_PATH_STYLE = "use_path_style";
private S3URI s3uri;
private String s3AK;
private String s3SK;
public S3TableValuedFunction(List<String> params) throws UserException {
if (params.size() != 4) {
throw new UserException(
"s3 table function only support 4 params now: S3(path, AK, SK, format)");
}
s3uri = S3URI.create(params.get(0));
s3AK = params.get(1);
s3SK = params.get(2);
String formatString = params.get(3).toLowerCase();
switch (formatString) {
case "csv":
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "csv_with_names":
this.headerType = FeConstants.csv_with_names;
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "csv_with_names_and_types":
this.headerType = FeConstants.csv_with_names_and_types;
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
default:
throw new AnalysisException("format:" + formatString + " is not supported.");
}
// set S3 location properties
locationProperties = Maps.newHashMap();
locationProperties.put(S3_ENDPOINT, s3uri.getBucketScheme());
locationProperties.put(S3_AK, s3AK);
locationProperties.put(S3_SK, s3SK);
locationProperties.put(S3_REGION, "");
locationProperties.put(USE_PATH_STYLE, "true");
parseFile();
}
// =========== implement abstract methods of ExternalFileTableValuedFunction =================
@Override
public TFileType getTFileType() {
return TFileType.FILE_S3;
}
@Override
public String getFilePath() {
// must be "s3://..."
return NAME + S3URI.SCHEME_DELIM + s3uri.getKey();
}
@Override
public BrokerDesc getBrokerDesc() {
return new BrokerDesc("S3TvfBroker", StorageType.S3, locationProperties);
}
// =========== implement abstract methods of TableValuedFunctionIf =================
@Override
public String getTableName() {
return "S3TableValuedFunction";
}
}
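
At the SQL layer this commit exposes the function as `S3(path, AK, SK, format)`; inside the FE, the sketch below shows roughly how the class above is driven. It is illustrative only: the endpoint, bucket, object key, and credentials are placeholders, the exact URI layout accepted by `S3URI.create` is not shown in this diff, and the `PlanNodeId`/`TupleDescriptor` arguments are assumed to come from the planner.

```java
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;

import com.google.common.collect.Lists;

import java.util.List;

public class S3TvfSketch {
    // id and desc are supplied by the planner; all literal values below are placeholders.
    static ScanNode planS3Tvf(PlanNodeId id, TupleDescriptor desc) throws UserException {
        List<String> params = Lists.newArrayList(
                "http://s3.example.com/my-bucket/input/part-0.csv", // path
                "my_access_key",                                    // AK
                "my_secret_key",                                    // SK
                "csv_with_names");                                  // format
        S3TableValuedFunction tvf = new S3TableValuedFunction(params); // parses the URI and lists the objects
        List<TBrokerFileStatus> files = tvf.getFileStatuses();   // filled by parseFile() in the constructor
        List<Column> columns = tvf.getTableColumns();             // fetch_table_schema RPC against one alive BE
        return tvf.getScanNode(id, desc);                         // ExternalFileScanNode over the same files
    }
}
```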

View File

@ -17,36 +17,43 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TTVFunctionName;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import java.util.List;
public abstract class TableValuedFunctionInf {
public abstract class TableValuedFunctionIf {
private FunctionGenTable table = null;
public abstract TTVFunctionName getFuncName();
public FunctionGenTable getTable() {
FunctionGenTable table = new FunctionGenTable(-1, getTableName(), TableIf.TableType.TABLE_VALUED_FUNCTION,
getTableColumns());
public FunctionGenTable getTable() throws AnalysisException {
if (table == null) {
table = new FunctionGenTable(-1, getTableName(), TableIf.TableType.TABLE_VALUED_FUNCTION,
getTableColumns(), this);
}
return table;
}
// All table functions should be registered here
public static TableValuedFunctionInf getTableFunction(String funcName, List<String> params) throws UserException {
if (funcName.equalsIgnoreCase(NumbersTableValuedFunction.NAME)) {
return new NumbersTableValuedFunction(params);
public static TableValuedFunctionIf getTableFunction(String funcName, List<String> params) throws UserException {
switch (funcName.toLowerCase()) {
case NumbersTableValuedFunction.NAME:
return new NumbersTableValuedFunction(params);
case S3TableValuedFunction.NAME:
return new S3TableValuedFunction(params);
default:
throw new UserException("Could not find table function " + funcName);
}
throw new UserException("Could not find table function " + funcName);
}
public abstract String getTableName();
public abstract List<Column> getTableColumns();
public abstract List<Column> getTableColumns() throws AnalysisException;
public abstract List<TableValuedFunctionTask> getTasks() throws AnalysisException;
public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
}
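
The switch above is the single registration point, so callers are expected to go through `getTableFunction` rather than constructing implementations directly. A minimal sketch of that usage follows; the parameter values are placeholders, and the lookup is case-insensitive because of the `toLowerCase` call above.

```java
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.common.UserException;
import org.apache.doris.tablefunction.TableValuedFunctionIf;

import com.google.common.collect.Lists;

import java.util.List;

public class TvfRegistrySketch {
    static FunctionGenTable resolve() throws UserException {
        List<String> params = Lists.newArrayList("s3://my-bucket/a.csv", "ak", "sk", "csv"); // placeholders
        TableValuedFunctionIf tvf = TableValuedFunctionIf.getTableFunction("S3", params);
        return tvf.getTable(); // built once from getTableColumns(), then cached on the instance
    }
}
```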

View File

@ -525,6 +525,17 @@ message PTabletWriteSlaveDoneResult {
optional PStatus status = 1;
};
message PFetchTableSchemaRequest {
optional bytes file_scan_range = 1;
};
message PFetchTableSchemaResult {
optional PStatus status = 1;
optional int32 column_nums = 2;
repeated string column_names = 3;
repeated PTypeDesc column_types = 4;
};
service PBackendService {
rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@ -558,5 +569,6 @@ service PBackendService {
rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse);
rpc request_slave_tablet_pull_rowset(PTabletWriteSlaveRequest) returns (PTabletWriteSlaveResult);
rpc response_slave_tablet_pull_rowset(PTabletWriteSlaveDoneRequest) returns (PTabletWriteSlaveDoneResult);
rpc fetch_table_schema(PFetchTableSchemaRequest) returns (PFetchTableSchemaResult);
};

View File

@ -53,7 +53,7 @@ enum TPlanNodeType {
EXCEPT_NODE,
ODBC_SCAN_NODE,
TABLE_FUNCTION_NODE,
TABLE_VALUED_FUNCTION_SCAN_NODE,
DATA_GEN_SCAN_NODE,
FILE_SCAN_NODE,
JDBC_SCAN_NODE,
}
@ -324,7 +324,7 @@ struct TExternalScanRange {
// TODO: add more scan range type?
}
enum TTVFunctionName {
enum TDataGenFunctionName {
NUMBERS = 0,
}
@ -334,7 +334,7 @@ struct TTVFNumbersScanRange {
1: optional i64 totalNumbers
}
struct TTVFScanRange {
struct TDataGenScanRange {
1: optional TTVFNumbersScanRange numbers_params
}
@ -347,7 +347,7 @@ struct TScanRange {
6: optional TBrokerScanRange broker_scan_range
7: optional TEsScanRange es_scan_range
8: optional TExternalScanRange ext_scan_range
9: optional TTVFScanRange tvf_scan_range
9: optional TDataGenScanRange data_gen_scan_range
}
struct TMySQLScanNode {
@ -903,9 +903,9 @@ struct TRuntimeFilterDesc {
9: optional i64 bloom_filter_size_bytes
}
struct TTableValuedFunctionScanNode {
struct TDataGenScanNode {
1: optional Types.TTupleId tuple_id
2: optional TTVFunctionName func_name
2: optional TDataGenFunctionName func_name
}
// This is essentially a union of all messages corresponding to subclasses
@ -959,7 +959,7 @@ struct TPlanNode {
// output column
42: optional list<Types.TSlotId> output_slot_ids
43: optional TTableValuedFunctionScanNode table_valued_func_scan_node
43: optional TDataGenScanNode data_gen_scan_node
// file scan node
44: optional TFileScanNode file_scan_node