[Enhancement](Push Handle) use VParquetScanner in PushHandle (#15980)

* use VParquetScanner in PushHandle

* delete ParquetScanner
Author: Tiewei Fang (committed by GitHub)
Date: 2023-01-17 16:21:04 +08:00
Parent: 151ae71761
Commit: bbdf40b6bd
6 changed files with 15 additions and 300 deletions
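In outline, the change moves the push handler's broker load path onto the vectorized scanner: PushBrokerReader::next() now fills a vectorized::Block via VParquetScanner::get_next() instead of materializing a Tuple and copying it into a ContiguousRow, and PushHandler::_convert_v2() appends whole blocks to the rowset writer. The sketch below shows only that control flow; Status, Block, VScannerStub and ReaderStub are simplified stand-ins written for illustration, not the actual Doris classes.

#include <cstdio>
#include <vector>

// Simplified stand-in for doris::Status: only tracks success/failure.
struct Status {
    bool ok_ = true;
    static Status OK() { return Status{}; }
    static Status Error() { Status s; s.ok_ = false; return s; }
    bool ok() const { return ok_; }
};

// Simplified stand-in for vectorized::Block.
struct Block {
    std::vector<int> rows;                        // placeholder for real columns
    size_t rows_count() const { return rows.size(); }
    void clear() { rows.clear(); }
};

// Stand-in for vectorized::VParquetScanner: yields a few blocks, then EOF.
class VScannerStub {
public:
    Status get_next(Block* block, bool* eof) {
        if (batches_left_ == 0) {
            *eof = true;
            return Status::OK();
        }
        block->rows.assign(4, 42);                // pretend 4 decoded rows
        --batches_left_;
        *eof = false;
        return Status::OK();
    }
private:
    int batches_left_ = 3;
};

// Stand-in for PushBrokerReader: next() just forwards a Block to the scanner
// and records EOF, mirroring the simplified body in the diff below.
class ReaderStub {
public:
    Status next(Block* block) {
        if (block == nullptr) return Status::Error();
        return scanner_.get_next(block, &eof_);
    }
    bool eof() const { return eof_; }
private:
    VScannerStub scanner_;
    bool eof_ = false;
};

int main() {
    // Shape of the new _convert_v2 loop: read a block, then hand the whole
    // block to the rowset writer (printf stands in for add_block()).
    ReaderStub reader;
    Block block;
    while (!reader.eof()) {
        Status res = reader.next(&block);
        if (!res.ok()) {
            std::fprintf(stderr, "read next block failed\n");
            break;
        }
        if (reader.eof()) break;
        std::printf("add_block: %zu rows\n", block.rows_count());
        block.clear();
    }
    return 0;
}

The design point is simply that one get_next() call now yields a batch (a block) rather than a single tuple, so the per-row fill and finalize code in the old next() implementation disappears.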


@@ -63,7 +63,6 @@ set(EXEC_FILES
odbc_connector.cpp
table_connector.cpp
schema_scanner.cpp
parquet_scanner.cpp
)
if (WITH_LZO)


@@ -1,141 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/parquet_scanner.h"
#include "exec/arrow/parquet_reader.h"
#include "io/file_factory.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_pipe.h"
namespace doris {
using namespace ErrorCode;
ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
// _splittable(params.splittable),
_cur_file_reader(nullptr),
_cur_file_eof(false) {}
ParquetScanner::~ParquetScanner() {
close();
}
Status ParquetScanner::open() {
return BaseScanner::open();
}
Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) {
SCOPED_TIMER(_read_timer);
// Get one line
while (!_scanner_eof) {
if (_cur_file_reader == nullptr || _cur_file_eof) {
RETURN_IF_ERROR(open_next_reader());
// If there isn't any more reader, break this
if (_scanner_eof) {
continue;
}
_cur_file_eof = false;
}
RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, tuple_pool, &_cur_file_eof));
// range of current file
const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.num_of_columns_from_file) {
fill_slots_of_columns_from_path(range.num_of_columns_from_file,
range.columns_from_path);
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
// TODO(weixiang): check whether shallow copy is enough
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break; // break always
}
*eof = _scanner_eof;
return Status::OK();
}
Status ParquetScanner::open_next_reader() {
// open_file_reader
if (_cur_file_reader != nullptr) {
if (_stream_load_pipe != nullptr) {
_stream_load_pipe.reset();
_cur_file_reader = nullptr;
} else {
delete _cur_file_reader;
_cur_file_reader = nullptr;
}
}
while (true) {
if (_next_range >= _ranges.size()) {
_scanner_eof = true;
return Status::OK();
}
const TBrokerRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(
range.file_type, _state->exec_env(), _profile, _broker_addresses,
_params.properties, range, range.start_offset, file_reader));
RETURN_IF_ERROR(file_reader->open());
if (file_reader->size() == 0) {
file_reader->close();
continue;
}
int32_t num_of_columns_from_file = _src_slot_descs.size();
if (range.__isset.num_of_columns_from_file) {
num_of_columns_from_file = range.num_of_columns_from_file;
}
_cur_file_reader = new ParquetReaderWrap(_state, _src_slot_descs, file_reader.release(),
num_of_columns_from_file, 0, 0);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status =
_cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone());
if (status.is<END_OF_FILE>()) {
continue;
} else {
if (!status.ok()) {
return Status::InternalError("file: {}, error:{}", range.path, status.to_string());
} else {
RETURN_IF_ERROR(_cur_file_reader->init_parquet_type());
return status;
}
}
}
}
void ParquetScanner::close() {
BaseScanner::close();
if (_cur_file_reader != nullptr) {
if (_stream_load_pipe != nullptr) {
_stream_load_pipe.reset();
_cur_file_reader = nullptr;
} else {
delete _cur_file_reader;
_cur_file_reader = nullptr;
}
}
}
} // namespace doris


@@ -1,84 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
#include "common/status.h"
#include "exec/base_scanner.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/mem_pool.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
namespace doris {
class Tuple;
class SlotDescriptor;
struct Slice;
class ParquetReaderWrap;
class RuntimeState;
class ExprContext;
class TupleDescriptor;
class TupleRow;
class RowDescriptor;
class RuntimeProfile;
class StreamLoadPipe;
// Broker scanner convert the data read from broker to doris's tuple.
class ParquetScanner : public BaseScanner {
public:
ParquetScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
~ParquetScanner() override;
// Open this scanner, will initialize information need to
Status open() override;
// Get next tuple
Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override;
Status get_next(vectorized::Block* block, bool* eof) override {
return Status::NotSupported("Not Implemented get block");
}
// Close this scanner
void close() override;
protected:
// Read next buffer from reader
Status open_next_reader();
// Reader
ParquetReaderWrap* _cur_file_reader;
bool _cur_file_eof; // is read over?
// used to hold current StreamLoadPipe
std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
};
} // namespace doris


@@ -34,6 +34,7 @@ BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Pat
_broker_addr(broker_addr),
_fd(fd),
_fs(std::move(fs)) {
_fs->get_client(&_client);
DorisMetrics::instance()->broker_file_open_reading->increment(1);
DorisMetrics::instance()->broker_file_reader_total->increment(1);
}


@@ -24,7 +24,6 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "exec/parquet_scanner.h"
#include "olap/row.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/rowset/rowset_meta_manager.h"
@@ -33,6 +32,7 @@
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "vec/exec/vparquet_scanner.h"
namespace doris {
using namespace ErrorCode;
@@ -243,12 +243,10 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
break;
}
// 3. Init Row
std::unique_ptr<uint8_t[]> tuple_buf(new uint8_t[schema->schema_size()]);
ContiguousRow row(schema.get(), tuple_buf.get());
// 3. Init Block
vectorized::Block block;
// 4. Read data from broker and write into cur_tablet
// Convert from raw to delta
VLOG_NOTICE << "start to convert etl file to delta.";
while (!reader->eof()) {
if (reader->mem_pool()->mem_tracker()->consumption() >
@@ -256,7 +254,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
RETURN_NOT_OK(rowset_writer->flush());
reader->mem_pool()->free_all();
}
res = reader->next(&row);
res = reader->next(&block);
if (!res.ok()) {
LOG(WARNING) << "read next row failed."
<< " res=" << res << " read_rows=" << num_rows;
@@ -265,12 +263,8 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
if (reader->eof()) {
break;
}
//if read row but fill tuple fails,
if (!reader->is_fill_tuple()) {
break;
}
if (!(res = rowset_writer->add_row(row))) {
LOG(WARNING) << "fail to attach row to rowset_writer. "
if (!(res = rowset_writer->add_block(&block))) {
LOG(WARNING) << "fail to attach block to rowset_writer. "
<< "res=" << res << ", tablet=" << cur_tablet->full_name()
<< ", read_rows=" << num_rows;
break;
@@ -802,9 +796,6 @@ Status LzoBinaryReader::_next_block() {
Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_scan_range,
const TDescriptorTable& t_desc_tbl) {
// init schema
_schema = schema;
// init runtime state, runtime profile, counter
TUniqueId dummy_id;
dummy_id.hi = 0;
@@ -842,9 +833,9 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc
BaseScanner* scanner = nullptr;
switch (t_scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET:
scanner = new ParquetScanner(_runtime_state.get(), _runtime_profile, t_scan_range.params,
t_scan_range.ranges, t_scan_range.broker_addresses,
_pre_filter_texprs, _counter.get());
scanner = new vectorized::VParquetScanner(
_runtime_state.get(), _runtime_profile, t_scan_range.params, t_scan_range.ranges,
t_scan_range.broker_addresses, _pre_filter_texprs, _counter.get());
break;
default:
LOG(WARNING) << "Unsupported file format type: " << t_scan_range.ranges[0].format_type;
@@ -857,23 +848,6 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc
return Status::Error<PUSH_INIT_ERROR>();
}
// init tuple
auto tuple_id = t_scan_range.params.dest_tuple_id;
_tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(tuple_id);
if (_tuple_desc == nullptr) {
std::stringstream ss;
LOG(WARNING) << "Failed to get tuple descriptor, tuple_id: " << tuple_id;
return Status::Error<PUSH_INIT_ERROR>();
}
int tuple_buffer_size = _tuple_desc->byte_size();
void* tuple_buffer = _tuple_buffer_pool->allocate(tuple_buffer_size);
if (tuple_buffer == nullptr) {
LOG(WARNING) << "Allocate memory for tuple failed";
return Status::Error<PUSH_INIT_ERROR>();
}
_tuple = reinterpret_cast<Tuple*>(tuple_buffer);
_ready = true;
return Status::OK();
}
@@ -944,40 +918,11 @@ Status PushBrokerReader::fill_field_row(RowCursorCell* dst, const char* src, boo
return Status::OK();
}
Status PushBrokerReader::next(ContiguousRow* row) {
if (!_ready || row == nullptr) {
Status PushBrokerReader::next(vectorized::Block* block) {
if (!_ready || block == nullptr) {
return Status::Error<INVALID_ARGUMENT>();
}
memset(_tuple, 0, _tuple_desc->num_null_bytes());
// Get from scanner
Status status = _scanner->get_next(_tuple, _mem_pool.get(), &_eof, &_fill_tuple);
if (UNLIKELY(!status.ok())) {
LOG(WARNING) << "Scanner get next tuple failed";
return Status::Error<PUSH_INPUT_DATA_ERROR>();
}
if (_eof || !_fill_tuple) {
return Status::OK();
}
auto slot_descs = _tuple_desc->slots();
// finalize row
for (size_t i = 0; i < slot_descs.size(); ++i) {
auto cell = row->cell(i);
const SlotDescriptor* slot = slot_descs[i];
bool is_null = _tuple->is_null(slot->null_indicator_offset());
const void* value = _tuple->get_slot(slot->tuple_offset());
FieldType type = _schema->column(i)->type();
Status field_status =
fill_field_row(&cell, (const char*)value, is_null, _mem_pool.get(), type);
if (field_status != Status::OK()) {
LOG(WARNING) << "fill field row failed in spark load, slot index: " << i
<< ", type: " << type;
return Status::Error<SCHEMA_SCHEMA_FIELD_INVALID>();
}
}
_scanner->get_next(block, &_eof);
return Status::OK();
}


@@ -178,12 +178,12 @@ private:
class PushBrokerReader {
public:
PushBrokerReader() : _ready(false), _eof(false), _fill_tuple(false) {}
PushBrokerReader() : _ready(false), _eof(false) {}
~PushBrokerReader() = default;
Status init(const Schema* schema, const TBrokerScanRange& t_scan_range,
const TDescriptorTable& t_desc_tbl);
Status next(ContiguousRow* row);
Status next(vectorized::Block* block);
void print_profile();
Status close() {
@@ -191,7 +191,6 @@ public:
return Status::OK();
}
bool eof() const { return _eof; }
bool is_fill_tuple() const { return _fill_tuple; }
MemPool* mem_pool() { return _mem_pool.get(); }
private:
@@ -199,10 +198,6 @@ private:
FieldType type);
bool _ready;
bool _eof;
bool _fill_tuple;
TupleDescriptor* _tuple_desc;
Tuple* _tuple;
const Schema* _schema;
std::unique_ptr<RuntimeState> _runtime_state;
RuntimeProfile* _runtime_profile;
std::unique_ptr<MemPool> _mem_pool;