diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index b51fb27aba..678b865b1c 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -63,7 +63,6 @@ set(EXEC_FILES odbc_connector.cpp table_connector.cpp schema_scanner.cpp - parquet_scanner.cpp ) if (WITH_LZO) diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp deleted file mode 100644 index 2714ffb785..0000000000 --- a/be/src/exec/parquet_scanner.cpp +++ /dev/null @@ -1,141 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/parquet_scanner.h" - -#include "exec/arrow/parquet_reader.h" -#include "io/file_factory.h" -#include "runtime/descriptors.h" -#include "runtime/exec_env.h" -#include "runtime/stream_load/stream_load_pipe.h" - -namespace doris { -using namespace ErrorCode; - -ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter) - : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), - // _splittable(params.splittable), - _cur_file_reader(nullptr), - _cur_file_eof(false) {} - -ParquetScanner::~ParquetScanner() { - close(); -} - -Status ParquetScanner::open() { - return BaseScanner::open(); -} - -Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) { - SCOPED_TIMER(_read_timer); - // Get one line - while (!_scanner_eof) { - if (_cur_file_reader == nullptr || _cur_file_eof) { - RETURN_IF_ERROR(open_next_reader()); - // If there isn't any more reader, break this - if (_scanner_eof) { - continue; - } - _cur_file_eof = false; - } - RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, tuple_pool, &_cur_file_eof)); - // range of current file - const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); - if (range.__isset.num_of_columns_from_file) { - fill_slots_of_columns_from_path(range.num_of_columns_from_file, - range.columns_from_path); - } - - COUNTER_UPDATE(_rows_read_counter, 1); - SCOPED_TIMER(_materialize_timer); - // TODO(weixiang): check whether shallow copy is enough - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); - break; // break always - } - - *eof = _scanner_eof; - return Status::OK(); -} - -Status ParquetScanner::open_next_reader() { - // open_file_reader - if (_cur_file_reader != nullptr) { - if (_stream_load_pipe != nullptr) { - _stream_load_pipe.reset(); - _cur_file_reader = nullptr; - } else { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } - } - - while (true) { - if (_next_range >= _ranges.size()) { - _scanner_eof = true; - return Status::OK(); - } - const TBrokerRangeDesc& range = _ranges[_next_range++]; - std::unique_ptr file_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader( - range.file_type, _state->exec_env(), _profile, _broker_addresses, - _params.properties, range, range.start_offset, file_reader)); - RETURN_IF_ERROR(file_reader->open()); - - if (file_reader->size() == 0) { - file_reader->close(); - continue; - } - int32_t num_of_columns_from_file = _src_slot_descs.size(); - if (range.__isset.num_of_columns_from_file) { - num_of_columns_from_file = range.num_of_columns_from_file; - } - _cur_file_reader = new ParquetReaderWrap(_state, _src_slot_descs, file_reader.release(), - num_of_columns_from_file, 0, 0); - auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); - Status status = - _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone()); - if (status.is()) { - continue; - } else { - if (!status.ok()) { - return Status::InternalError("file: {}, error:{}", range.path, status.to_string()); - } else { - RETURN_IF_ERROR(_cur_file_reader->init_parquet_type()); - return status; - } - } - } -} - -void ParquetScanner::close() { - BaseScanner::close(); - if (_cur_file_reader != nullptr) { - if (_stream_load_pipe != nullptr) { - _stream_load_pipe.reset(); - _cur_file_reader = nullptr; - } else { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } - } -} - -} // namespace doris diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h deleted file mode 100644 index 3c0ca48eae..0000000000 --- a/be/src/exec/parquet_scanner.h +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include -#include - -#include "common/status.h" -#include "exec/base_scanner.h" -#include "gen_cpp/PlanNodes_types.h" -#include "gen_cpp/Types_types.h" -#include "runtime/mem_pool.h" -#include "util/runtime_profile.h" -#include "util/slice.h" - -namespace doris { - -class Tuple; -class SlotDescriptor; -struct Slice; -class ParquetReaderWrap; -class RuntimeState; -class ExprContext; -class TupleDescriptor; -class TupleRow; -class RowDescriptor; -class RuntimeProfile; -class StreamLoadPipe; - -// Broker scanner convert the data read from broker to doris's tuple. -class ParquetScanner : public BaseScanner { -public: - ParquetScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter); - - ~ParquetScanner() override; - - // Open this scanner, will initialize information need to - Status open() override; - - // Get next tuple - Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override; - - Status get_next(vectorized::Block* block, bool* eof) override { - return Status::NotSupported("Not Implemented get block"); - } - - // Close this scanner - void close() override; - -protected: - // Read next buffer from reader - Status open_next_reader(); - - // Reader - ParquetReaderWrap* _cur_file_reader; - bool _cur_file_eof; // is read over? - - // used to hold current StreamLoadPipe - std::shared_ptr _stream_load_pipe; -}; - -} // namespace doris diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index 3d3dbfd802..6d1298f003 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -34,6 +34,7 @@ BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Pat _broker_addr(broker_addr), _fd(fd), _fs(std::move(fs)) { + _fs->get_client(&_client); DorisMetrics::instance()->broker_file_open_reading->increment(1); DorisMetrics::instance()->broker_file_reader_total->increment(1); } diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 49b01f6418..2f23acf534 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -24,7 +24,6 @@ #include "common/object_pool.h" #include "common/status.h" -#include "exec/parquet_scanner.h" #include "olap/row.h" #include "olap/rowset/rowset_id_generator.h" #include "olap/rowset/rowset_meta_manager.h" @@ -33,6 +32,7 @@ #include "olap/tablet.h" #include "olap/tablet_schema.h" #include "runtime/exec_env.h" +#include "vec/exec/vparquet_scanner.h" namespace doris { using namespace ErrorCode; @@ -243,12 +243,10 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur break; } - // 3. Init Row - std::unique_ptr tuple_buf(new uint8_t[schema->schema_size()]); - ContiguousRow row(schema.get(), tuple_buf.get()); + // 3. Init Block + vectorized::Block block; // 4. Read data from broker and write into cur_tablet - // Convert from raw to delta VLOG_NOTICE << "start to convert etl file to delta."; while (!reader->eof()) { if (reader->mem_pool()->mem_tracker()->consumption() > @@ -256,7 +254,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur RETURN_NOT_OK(rowset_writer->flush()); reader->mem_pool()->free_all(); } - res = reader->next(&row); + res = reader->next(&block); if (!res.ok()) { LOG(WARNING) << "read next row failed." << " res=" << res << " read_rows=" << num_rows; @@ -265,12 +263,8 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur if (reader->eof()) { break; } - //if read row but fill tuple fails, - if (!reader->is_fill_tuple()) { - break; - } - if (!(res = rowset_writer->add_row(row))) { - LOG(WARNING) << "fail to attach row to rowset_writer. " + if (!(res = rowset_writer->add_block(&block))) { + LOG(WARNING) << "fail to attach block to rowset_writer. " << "res=" << res << ", tablet=" << cur_tablet->full_name() << ", read_rows=" << num_rows; break; @@ -802,9 +796,6 @@ Status LzoBinaryReader::_next_block() { Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_scan_range, const TDescriptorTable& t_desc_tbl) { - // init schema - _schema = schema; - // init runtime state, runtime profile, counter TUniqueId dummy_id; dummy_id.hi = 0; @@ -842,9 +833,9 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc BaseScanner* scanner = nullptr; switch (t_scan_range.ranges[0].format_type) { case TFileFormatType::FORMAT_PARQUET: - scanner = new ParquetScanner(_runtime_state.get(), _runtime_profile, t_scan_range.params, - t_scan_range.ranges, t_scan_range.broker_addresses, - _pre_filter_texprs, _counter.get()); + scanner = new vectorized::VParquetScanner( + _runtime_state.get(), _runtime_profile, t_scan_range.params, t_scan_range.ranges, + t_scan_range.broker_addresses, _pre_filter_texprs, _counter.get()); break; default: LOG(WARNING) << "Unsupported file format type: " << t_scan_range.ranges[0].format_type; @@ -857,23 +848,6 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc return Status::Error(); } - // init tuple - auto tuple_id = t_scan_range.params.dest_tuple_id; - _tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(tuple_id); - if (_tuple_desc == nullptr) { - std::stringstream ss; - LOG(WARNING) << "Failed to get tuple descriptor, tuple_id: " << tuple_id; - return Status::Error(); - } - - int tuple_buffer_size = _tuple_desc->byte_size(); - void* tuple_buffer = _tuple_buffer_pool->allocate(tuple_buffer_size); - if (tuple_buffer == nullptr) { - LOG(WARNING) << "Allocate memory for tuple failed"; - return Status::Error(); - } - _tuple = reinterpret_cast(tuple_buffer); - _ready = true; return Status::OK(); } @@ -944,40 +918,11 @@ Status PushBrokerReader::fill_field_row(RowCursorCell* dst, const char* src, boo return Status::OK(); } -Status PushBrokerReader::next(ContiguousRow* row) { - if (!_ready || row == nullptr) { +Status PushBrokerReader::next(vectorized::Block* block) { + if (!_ready || block == nullptr) { return Status::Error(); } - - memset(_tuple, 0, _tuple_desc->num_null_bytes()); - // Get from scanner - Status status = _scanner->get_next(_tuple, _mem_pool.get(), &_eof, &_fill_tuple); - if (UNLIKELY(!status.ok())) { - LOG(WARNING) << "Scanner get next tuple failed"; - return Status::Error(); - } - if (_eof || !_fill_tuple) { - return Status::OK(); - } - - auto slot_descs = _tuple_desc->slots(); - // finalize row - for (size_t i = 0; i < slot_descs.size(); ++i) { - auto cell = row->cell(i); - const SlotDescriptor* slot = slot_descs[i]; - bool is_null = _tuple->is_null(slot->null_indicator_offset()); - const void* value = _tuple->get_slot(slot->tuple_offset()); - - FieldType type = _schema->column(i)->type(); - Status field_status = - fill_field_row(&cell, (const char*)value, is_null, _mem_pool.get(), type); - if (field_status != Status::OK()) { - LOG(WARNING) << "fill field row failed in spark load, slot index: " << i - << ", type: " << type; - return Status::Error(); - } - } - + _scanner->get_next(block, &_eof); return Status::OK(); } diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index 1fe1d42823..2e1ddc237c 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -178,12 +178,12 @@ private: class PushBrokerReader { public: - PushBrokerReader() : _ready(false), _eof(false), _fill_tuple(false) {} + PushBrokerReader() : _ready(false), _eof(false) {} ~PushBrokerReader() = default; Status init(const Schema* schema, const TBrokerScanRange& t_scan_range, const TDescriptorTable& t_desc_tbl); - Status next(ContiguousRow* row); + Status next(vectorized::Block* block); void print_profile(); Status close() { @@ -191,7 +191,6 @@ public: return Status::OK(); } bool eof() const { return _eof; } - bool is_fill_tuple() const { return _fill_tuple; } MemPool* mem_pool() { return _mem_pool.get(); } private: @@ -199,10 +198,6 @@ private: FieldType type); bool _ready; bool _eof; - bool _fill_tuple; - TupleDescriptor* _tuple_desc; - Tuple* _tuple; - const Schema* _schema; std::unique_ptr _runtime_state; RuntimeProfile* _runtime_profile; std::unique_ptr _mem_pool;