[feature-wip](new-scan)Add new file scan node (#12048)

Related pr: #11582
This is the new file scan node and scanner for external hms catalog.
This commit is contained in:
Jibing-Li
2022-09-01 10:01:20 +08:00
committed by GitHub
parent 65051d67cf
commit ec4863b63a
12 changed files with 1058 additions and 3 deletions

View File

@ -61,6 +61,7 @@
#include "vec/core/block.h"
#include "vec/exec/file_scan_node.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vanalytic_eval_node.h"
@ -584,7 +585,12 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();
case TPlanNodeType::FILE_SCAN_NODE:
*node = pool->add(new vectorized::FileScanNode(pool, tnode, descs));
// *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs));
if (config::enable_new_scan_node) {
*node = pool->add(new vectorized::NewFileScanNode(pool, tnode, descs));
} else {
*node = pool->add(new vectorized::FileScanNode(pool, tnode, descs));
}
return Status::OK();
@ -704,7 +710,8 @@ void ExecNode::try_do_aggregate_serde_improve() {
// TODO(cmy): should be removed when NewOlapScanNode is ready
ExecNode* child0 = agg_node[0]->_children[0];
if (typeid(*child0) == typeid(vectorized::NewOlapScanNode)) {
if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) ||
typeid(*child0) == typeid(vectorized::NewFileScanNode)) {
vectorized::VScanNode* scan_node =
static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
scan_node->set_no_agg_finalize();

View File

@ -46,6 +46,7 @@
#include "util/telemetry/telemetry.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/vexchange_node.h"
#include "vec/runtime/vdata_stream_mgr.h"
@ -166,7 +167,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
for (int i = 0; i < scan_nodes.size(); ++i) {
// TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode.
ExecNode* node = scan_nodes[i];
if (typeid(*node) == typeid(vectorized::NewOlapScanNode)) {
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
typeid(*node) == typeid(vectorized::NewFileScanNode)) {
vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
const std::vector<TScanRangeParams>& scan_ranges =
find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);

View File

@ -239,6 +239,10 @@ set(VEC_FILES
exec/scan/scanner_scheduler.cpp
exec/scan/new_olap_scan_node.cpp
exec/scan/new_olap_scanner.cpp
exec/scan/new_file_arrow_scanner.cpp
exec/scan/new_file_scan_node.cpp
exec/scan/new_file_scanner.cpp
exec/scan/new_file_text_scanner.cpp
)
add_library(Vec STATIC

View File

@ -0,0 +1,224 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/new_file_arrow_scanner.h"
#include "exec/arrow/orc_reader.h"
#include "exec/arrow/parquet_reader.h"
#include "io/file_factory.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/utils/arrow_column_to_doris_column.h"
namespace doris::vectorized {
NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent,
int64_t limit, const TFileScanRange& scan_range,
MemTracker* tracker, RuntimeProfile* profile)
: NewFileScanner(state, parent, limit, scan_range, tracker, profile),
_cur_file_reader(nullptr),
_cur_file_eof(false),
_batch(nullptr),
_arrow_batch_cur_idx(0) {}
Status NewFileArrowScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(NewFileScanner::open(state));
// SCOPED_TIMER(_parent->_reader_init_timer);
// SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
// _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);
return Status::OK();
}
Status NewFileArrowScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
// init arrow batch
{
Status st = _init_arrow_batch_if_necessary();
if (!st.ok()) {
if (!st.is_end_of_file()) {
return st;
}
*eof = true;
return Status::OK();
}
}
*eof = false;
RETURN_IF_ERROR(init_block(block));
// convert arrow batch to block until reach the batch_size
while (!_scanner_eof) {
// cast arrow type to PT0 and append it to block
// for example: arrow::Type::INT16 => TYPE_SMALLINT
RETURN_IF_ERROR(_append_batch_to_block(block));
// finalize the block if full
if (_rows >= _state->batch_size()) {
break;
}
auto status = _next_arrow_batch();
// if ok, append the batch to the columns
if (status.ok()) {
continue;
}
// return error if not EOF
if (!status.is_end_of_file()) {
return status;
}
_cur_file_eof = true;
break;
}
if (_scanner_eof && block->rows() == 0) {
*eof = true;
}
// return finalize_block(block, eof);
return Status::OK();
}
Status NewFileArrowScanner::_init_arrow_batch_if_necessary() {
// 1. init batch if first time
// 2. reset reader if end of file
Status status = Status::OK();
if (_scanner_eof) {
return Status::EndOfFile("EOF");
}
if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
return _next_arrow_batch();
}
return status;
}
Status NewFileArrowScanner::_append_batch_to_block(Block* block) {
size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
(_batch->num_rows() - _arrow_batch_cur_idx));
for (auto i = 0; i < _file_slot_descs.size(); ++i) {
SlotDescriptor* slot_desc = _file_slot_descs[i];
if (slot_desc == nullptr) {
continue;
}
std::string real_column_name = _cur_file_reader->is_case_sensitive()
? slot_desc->col_name()
: slot_desc->col_name_lower_case();
auto* array = _batch->GetColumnByName(real_column_name).get();
auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
RETURN_IF_ERROR(arrow_column_to_doris_column(
array, _arrow_batch_cur_idx, column_with_type_and_name.column,
column_with_type_and_name.type, num_elements, _state->timezone_obj()));
}
_rows += num_elements;
_arrow_batch_cur_idx += num_elements;
return _fill_columns_from_path(block, num_elements);
}
Status NewFileArrowScanner::_next_arrow_batch() {
_arrow_batch_cur_idx = 0;
// first, init file reader
if (_cur_file_reader == nullptr || _cur_file_eof) {
RETURN_IF_ERROR(_open_next_reader());
_cur_file_eof = false;
}
// second, loop until find available arrow batch or EOF
while (!_scanner_eof) {
RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
if (_cur_file_eof) {
RETURN_IF_ERROR(_open_next_reader());
_cur_file_eof = false;
continue;
}
if (_batch->num_rows() == 0) {
continue;
}
return Status::OK();
}
return Status::EndOfFile("EOF");
}
Status NewFileArrowScanner::_open_next_reader() {
// open_file_reader
if (_cur_file_reader != nullptr) {
delete _cur_file_reader;
_cur_file_reader = nullptr;
}
while (true) {
if (_next_range >= _ranges.size()) {
_scanner_eof = true;
return Status::OK();
}
const TFileRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
range, file_reader));
RETURN_IF_ERROR(file_reader->open());
if (file_reader->size() == 0) {
file_reader->close();
continue;
}
int32_t num_of_columns_from_file = _file_slot_descs.size();
_cur_file_reader =
_new_arrow_reader(file_reader.release(), _state->batch_size(),
num_of_columns_from_file, range.start_offset, range.size);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_parent->output_tuple_id());
// TODO _conjunct_ctxs is empty for now. _conjunct_ctxs is not empty.
Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs,
_state->timezone());
if (status.is_end_of_file()) {
continue;
} else {
if (!status.ok()) {
std::stringstream ss;
ss << " file: " << range.path << " error:" << status.get_error_msg();
return Status::InternalError(ss.str());
} else {
// _update_profile(_cur_file_reader->statistics());
return status;
}
}
}
}
NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent,
int64_t limit, const TFileScanRange& scan_range,
MemTracker* tracker, RuntimeProfile* profile)
: NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile) {
// _init_profiles(profile);
}
ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(FileReader* file_reader,
int64_t batch_size,
int32_t num_of_columns_from_file,
int64_t range_start_offset,
int64_t range_size) {
return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file,
range_start_offset, range_size, false);
}
NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile)
: NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile) {}
ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file,
int64_t range_start_offset,
int64_t range_size) {
return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file, range_start_offset,
range_size, false);
}
} // namespace doris::vectorized

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <exec/arrow/arrow_reader.h>
#include "exprs/bloomfilter_predicate.h"
#include "exprs/function_filter.h"
#include "vec/exec/scan/new_file_scanner.h"
#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
class NewFileArrowScanner : public NewFileScanner {
public:
NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile);
Status open(RuntimeState* state) override;
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file,
int64_t range_start_offset, int64_t range_size) = 0;
private:
Status _open_next_reader();
Status _next_arrow_batch();
Status _init_arrow_batch_if_necessary();
Status _append_batch_to_block(Block* block);
private:
// Reader
ArrowReaderWrap* _cur_file_reader;
bool _cur_file_eof; // is read over?
std::shared_ptr<arrow::RecordBatch> _batch;
size_t _arrow_batch_cur_idx;
};
class NewFileParquetScanner final : public NewFileArrowScanner {
public:
NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile);
~NewFileParquetScanner() override = default;
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file, int64_t range_start_offset,
int64_t range_size) override;
void _init_profiles(RuntimeProfile* profile) override {};
};
class NewFileORCScanner final : public NewFileArrowScanner {
public:
NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile);
~NewFileORCScanner() override = default;
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file, int64_t range_start_offset,
int64_t range_size) override;
void _init_profiles(RuntimeProfile* profile) override {};
};
} // namespace doris::vectorized

View File

@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/columns/column_const.h"
#include "vec/exec/scan/new_file_arrow_scanner.h"
#include "vec/exec/scan/new_file_text_scanner.h"
#include "vec/exec/scan/new_olap_scanner.h"
#include "vec/functions/in.h"
namespace doris::vectorized {
NewFileScanNode::NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: VScanNode(pool, tnode, descs), _file_scan_node(tnode.file_scan_node) {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}
Status NewFileScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
_scanner_mem_tracker = std::make_unique<MemTracker>("NewFileScanners");
return Status::OK();
}
void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
int max_scanners = config::doris_scanner_thread_pool_thread_num;
if (scan_ranges.size() <= max_scanners) {
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of threads in thread pool.
_scan_ranges.clear();
auto range_iter = scan_ranges.begin();
for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
_scan_ranges.push_back(*range_iter);
}
for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
if (i == max_scanners) {
i = 0;
}
auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
}
_scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
}
Status NewFileScanNode::_init_profile() {
VScanNode::_init_profile();
return Status::OK();
}
Status NewFileScanNode::_process_conjuncts() {
RETURN_IF_ERROR(VScanNode::_process_conjuncts());
if (_eos) {
return Status::OK();
}
// TODO: Push conjuncts down to reader.
return Status::OK();
}
Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
return Status::OK();
}
for (auto& scan_range : _scan_ranges) {
VScanner* scanner =
(VScanner*)_create_scanner(scan_range.scan_range.ext_scan_range.file_scan_range);
scanners->push_back(scanner);
}
return Status::OK();
}
VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
NewFileScanner* scanner = nullptr;
switch (scan_range.params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range,
_scanner_mem_tracker.get(), runtime_profile());
break;
case TFileFormatType::FORMAT_ORC:
scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range,
_scanner_mem_tracker.get(), runtime_profile());
break;
default:
scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range,
_scanner_mem_tracker.get(), runtime_profile());
break;
}
_scanner_pool.add(scanner);
scanner->prepare(_vconjunct_ctx_ptr.get());
// TODO: Can we remove _conjunct_ctxs and use _vconjunct_ctx_ptr instead?
scanner->reg_conjunct_ctxs(_conjunct_ctxs);
return scanner;
}
}; // namespace doris::vectorized

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "vec/exec/scan/vscan_node.h"
namespace doris::vectorized {
class NewFileScanNode : public VScanNode {
public:
NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
Status prepare(RuntimeState* state) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
protected:
Status _init_profile() override;
Status _process_conjuncts() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;
private:
VScanner* _create_scanner(const TFileScanRange& scan_range);
private:
std::vector<TScanRangeParams> _scan_ranges;
TFileScanNode _file_scan_node;
std::unique_ptr<MemTracker> _scanner_mem_tracker;
};
} // namespace doris::vectorized

View File

@ -0,0 +1,167 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/new_file_scanner.h"
#include <fmt/format.h>
#include <vec/data_types/data_type_factory.hpp>
#include "common/logging.h"
#include "common/utils.h"
#include "exec/exec_node.h"
#include "exec/text_converter.hpp"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "vec/exec/scan/new_file_scan_node.h"
namespace doris::vectorized {
NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile)
: VScanner(state, static_cast<VScanNode*>(parent), limit, tracker),
_params(scan_range.params),
_ranges(scan_range.ranges),
_next_range(0),
_profile(profile) {}
Status NewFileScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::open(state));
RETURN_IF_ERROR(_init_expr_ctxes());
return Status::OK();
}
Status NewFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
if (vconjunct_ctx_ptr != nullptr) {
// Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
}
return Status::OK();
}
Status NewFileScanner::_init_expr_ctxes() {
const TupleDescriptor* src_tuple_desc =
_state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id);
if (src_tuple_desc == nullptr) {
std::stringstream ss;
ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id;
return Status::InternalError(ss.str());
}
DCHECK(!_ranges.empty());
std::map<SlotId, int> _full_src_index_map;
std::map<SlotId, SlotDescriptor*> _full_src_slot_map;
int index = 0;
for (const auto& slot_desc : src_tuple_desc->slots()) {
_full_src_slot_map.emplace(slot_desc->id(), slot_desc);
_full_src_index_map.emplace(slot_desc->id(), index++);
}
_num_of_columns_from_file = _params.num_of_columns_from_file;
for (const auto& slot_info : _params.required_slots) {
auto slot_id = slot_info.slot_id;
auto it = _full_src_slot_map.find(slot_id);
if (it == std::end(_full_src_slot_map)) {
std::stringstream ss;
ss << "Unknown source slot descriptor, slot_id=" << slot_id;
return Status::InternalError(ss.str());
}
_required_slot_descs.emplace_back(it->second);
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
auto iti = _full_src_index_map.find(slot_id);
_file_slot_index_map.emplace(slot_id, iti->second);
} else {
_partition_slot_descs.emplace_back(it->second);
auto iti = _full_src_index_map.find(slot_id);
_partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
}
}
_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_params.src_tuple_id}),
std::vector<bool>({false})));
// preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
if (!_pre_filter_texprs.empty()) {
// for vectorized, preceding filter exprs should be compounded to one passed from fe.
DCHECK(_pre_filter_texprs.size() == 1);
_vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
_state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc));
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
}
return Status::OK();
}
Status NewFileScanner::init_block(vectorized::Block* block) {
(*block).clear();
_rows = 0;
for (const auto& slot_desc : _required_slot_descs) {
if (slot_desc == nullptr) {
continue;
}
auto is_nullable = slot_desc->is_nullable();
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
is_nullable);
if (data_type == nullptr) {
return Status::NotSupported(
fmt::format("Not support type for column:{}", slot_desc->col_name()));
}
MutableColumnPtr data_column = data_type->create_column();
(*block).insert(
ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
}
return Status::OK();
}
Status NewFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) {
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
if (slot_desc == nullptr) continue;
auto it = _partition_slot_index_map.find(slot_desc->id());
if (it == std::end(_partition_slot_index_map)) {
std::stringstream ss;
ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id();
return Status::InternalError(ss.str());
}
const std::string& column_from_path = range.columns_from_path[it->second];
auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
for (size_t j = 0; j < rows; ++j) {
_text_converter->write_vec_column(slot_desc, col_ptr,
const_cast<char*>(column_from_path.c_str()),
column_from_path.size(), true, false);
}
}
}
return Status::OK();
}
} // namespace doris::vectorized

View File

@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "exec/text_converter.h"
#include "exprs/bloomfilter_predicate.h"
#include "exprs/function_filter.h"
#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
class NewFileScanNode;
class NewFileScanner : public VScanner {
public:
NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile);
Status open(RuntimeState* state) override;
Status prepare(VExprContext** vconjunct_ctx_ptr);
protected:
virtual void _init_profiles(RuntimeProfile* profile) = 0;
Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows);
Status init_block(vectorized::Block* block);
std::unique_ptr<TextConverter> _text_converter;
const TFileScanRangeParams& _params;
const std::vector<TFileRangeDesc>& _ranges;
int _next_range;
// Used for constructing tuple
std::vector<SlotDescriptor*> _required_slot_descs;
// File source slot descriptors
std::vector<SlotDescriptor*> _file_slot_descs;
// File slot id to index map.
std::map<SlotId, int> _file_slot_index_map;
// Partition source slot descriptors
std::vector<SlotDescriptor*> _partition_slot_descs;
// Partition slot id to index map
std::map<SlotId, int> _partition_slot_index_map;
std::unique_ptr<RowDescriptor> _row_desc;
// Profile
RuntimeProfile* _profile;
RuntimeProfile::Counter* _rows_read_counter;
RuntimeProfile::Counter* _read_timer;
bool _scanner_eof = false;
int _rows = 0;
int _num_of_columns_from_file;
private:
Status _init_expr_ctxes();
};
} // namespace doris::vectorized

View File

@ -0,0 +1,251 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/scan/new_file_text_scanner.h"
#include "exec/plain_text_line_reader.h"
#include "io/file_factory.h"
#include "util/utf8_check.h"
#include "vec/exec/scan/vscan_node.h"
namespace doris::vectorized {
NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile)
: NewFileScanner(state, parent, limit, scan_range, tracker, profile),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_line_reader_eof(false),
_skip_lines(0),
_success(false) {}
Status NewFileTextScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(NewFileScanner::open(state));
if (_ranges.empty()) {
return Status::OK();
}
_split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
return Status::OK();
}
Status NewFileTextScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
SCOPED_TIMER(_read_timer);
RETURN_IF_ERROR(init_block(block));
const int batch_size = state->batch_size();
*eof = false;
int current_rows = _rows;
while (_rows < batch_size && !_scanner_eof) {
if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
RETURN_IF_ERROR(_open_next_reader());
// If there isn't any more reader, break this
if (_scanner_eof) {
continue;
}
}
const uint8_t* ptr = nullptr;
size_t size = 0;
RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof));
if (_skip_lines > 0) {
_skip_lines--;
continue;
}
if (size == 0) {
// Read empty row, just continue
continue;
}
{
COUNTER_UPDATE(_rows_read_counter, 1);
RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block));
}
if (_cur_line_reader_eof) {
RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows));
current_rows = _rows;
}
}
if (_scanner_eof && block->rows() == 0) {
*eof = true;
}
return Status::OK();
}
Status NewFileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) {
RETURN_IF_ERROR(_line_split_to_values(line));
if (!_success) {
// If not success, which means we met an invalid row, return.
return Status::OK();
}
for (int i = 0; i < _split_values.size(); ++i) {
auto slot_desc = _file_slot_descs[i];
const Slice& value = _split_values[i];
auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
_text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false);
}
_rows++;
return Status::OK();
}
Status NewFileTextScanner::_line_split_to_values(const Slice& line) {
if (!validate_utf8(line.data, line.size)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[]() -> std::string { return "Unable to display"; },
[]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", "Unable to display");
return fmt::to_string(error_msg);
},
&_scanner_eof));
_success = false;
return Status::OK();
}
RETURN_IF_ERROR(_split_line(line));
_success = true;
return Status::OK();
}
Status NewFileTextScanner::_open_next_reader() {
if (_next_range >= _ranges.size()) {
_scanner_eof = true;
return Status::OK();
}
RETURN_IF_ERROR(_open_file_reader());
RETURN_IF_ERROR(_open_line_reader());
_next_range++;
return Status::OK();
}
Status NewFileTextScanner::_open_file_reader() {
const TFileRangeDesc& range = _ranges[_next_range];
RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
_cur_file_reader));
return _cur_file_reader->open();
}
Status NewFileTextScanner::_open_line_reader() {
if (_cur_line_reader != nullptr) {
delete _cur_line_reader;
_cur_line_reader = nullptr;
}
const TFileRangeDesc& range = _ranges[_next_range];
int64_t size = range.size;
if (range.start_offset != 0) {
if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
std::stringstream ss;
ss << "For now we do not support split compressed file";
return Status::InternalError(ss.str());
}
size += 1;
// not first range will always skip one line
_skip_lines = 1;
}
// open line reader
switch (_params.format_type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
_cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
_line_delimiter, _line_delimiter_length);
break;
default: {
std::stringstream ss;
ss << "Unknown format type, cannot init line reader, type=" << _params.format_type;
return Status::InternalError(ss.str());
}
}
_cur_line_reader_eof = false;
return Status::OK();
}
Status NewFileTextScanner::_split_line(const Slice& line) {
_split_values.clear();
std::vector<Slice> tmp_split_values;
tmp_split_values.reserve(_num_of_columns_from_file);
const char* value = line.data;
size_t start = 0; // point to the start pos of next col value.
size_t curpos = 0; // point to the start pos of separator matching sequence.
size_t p1 = 0; // point to the current pos of separator matching sequence.
size_t non_space = 0; // point to the last pos of non_space charactor.
// Separator: AAAA
//
// p1
// ▼
// AAAA
// 1000AAAA2000AAAA
// ▲ ▲
// Start │
// curpos
while (curpos < line.size) {
if (*(value + curpos + p1) != _value_separator[p1]) {
// Not match, move forward:
curpos += (p1 == 0 ? 1 : p1);
p1 = 0;
} else {
p1++;
if (p1 == _value_separator_length) {
// Match a separator
non_space = curpos;
// Trim tailing spaces. Be consistent with hive and trino's behavior.
if (_state->trim_tailing_spaces_for_external_table_query()) {
while (non_space > start && *(value + non_space - 1) == ' ') {
non_space--;
}
}
tmp_split_values.emplace_back(value + start, non_space - start);
start = curpos + _value_separator_length;
curpos = start;
p1 = 0;
non_space = 0;
}
}
}
CHECK(curpos == line.size) << curpos << " vs " << line.size;
non_space = curpos;
if (_state->trim_tailing_spaces_for_external_table_query()) {
while (non_space > start && *(value + non_space - 1) == ' ') {
non_space--;
}
}
tmp_split_values.emplace_back(value + start, non_space - start);
for (const auto& slot : _file_slot_descs) {
auto it = _file_slot_index_map.find(slot->id());
if (it == std::end(_file_slot_index_map)) {
std::stringstream ss;
ss << "Unknown _file_slot_index_map, slot_id=" << slot->id();
return Status::InternalError(ss.str());
}
int index = it->second;
CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file;
_split_values.emplace_back(tmp_split_values[index]);
}
return Status::OK();
}
} // namespace doris::vectorized

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <exec/arrow/arrow_reader.h>
#include "exec/line_reader.h"
#include "exprs/bloomfilter_predicate.h"
#include "exprs/function_filter.h"
#include "vec/exec/scan/new_file_scanner.h"
#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
class NewFileTextScanner : public NewFileScanner {
public:
NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, MemTracker* tracker,
RuntimeProfile* profile);
Status open(RuntimeState* state) override;
protected:
void _init_profiles(RuntimeProfile* profile) override {}
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
private:
Status _fill_file_columns(const Slice& line, vectorized::Block* _block);
Status _open_next_reader();
Status _open_file_reader();
Status _open_line_reader();
Status _line_split_to_values(const Slice& line);
Status _split_line(const Slice& line);
// Reader
std::shared_ptr<FileReader> _cur_file_reader;
LineReader* _cur_line_reader;
bool _cur_line_reader_eof;
// When we fetch range start from 0, header_type="csv_with_names" skip first line
// When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line
// When we fetch range doesn't start
int _skip_lines;
std::vector<Slice> _split_values;
std::string _value_separator;
std::string _line_delimiter;
int _value_separator_length;
int _line_delimiter_length;
bool _success;
};
} // namespace doris::vectorized

View File

@ -18,6 +18,7 @@
#pragma once
#include "common/status.h"
#include "exprs/expr_context.h"
#include "olap/tablet.h"
#include "runtime/runtime_state.h"
#include "vec/exprs/vexpr_context.h"
@ -62,6 +63,9 @@ protected:
// Filter the output block finally.
Status _filter_output_block(Block* block);
// to filter src tuple directly.
std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
public:
VScanNode* get_parent() { return _parent; }
@ -98,6 +102,10 @@ public:
VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; }
void reg_conjunct_ctxs(const std::vector<ExprContext*>& conjunct_ctxs) {
_conjunct_ctxs = conjunct_ctxs;
}
protected:
void _discard_conjuncts() {
if (_vconjunct_ctx) {
@ -151,6 +159,11 @@ protected:
// watch to count the time wait for scanner thread
MonotonicStopWatch _watch;
int64_t _scanner_wait_worker_timer = 0;
// File formats based push down predicate
std::vector<ExprContext*> _conjunct_ctxs;
const std::vector<TExpr> _pre_filter_texprs;
};
} // namespace doris::vectorized