diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 6f4a383748..f8590c7ae2 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -22,9 +22,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/exec") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exec") set(EXEC_FILES - arrow/arrow_reader.cpp - arrow/parquet_reader.cpp - base_scanner.cpp data_sink.cpp decompressor.cpp exec_node.cpp diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp deleted file mode 100644 index 8e0559ca78..0000000000 --- a/be/src/exec/arrow/arrow_reader.cpp +++ /dev/null @@ -1,262 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#include "exec/arrow/arrow_reader.h" - -#include - -#include -// IWYU pragma: no_include -#include // IWYU pragma: keep -#include -#include - -// IWYU pragma: no_include -#include "common/compiler_util.h" // IWYU pragma: keep -#include "common/logging.h" -#include "io/fs/file_reader.h" -#include "runtime/descriptors.h" -#include "runtime/runtime_state.h" -#include "util/slice.h" -#include "util/string_util.h" -#include "vec/core/block.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/utils/arrow_column_to_doris_column.h" - -namespace doris { - -ArrowReaderWrap::ArrowReaderWrap(RuntimeState* state, - const std::vector& file_slot_descs, - io::FileReaderSPtr file_reader, int32_t num_of_columns_from_file, - bool case_sensitive) - : _state(state), - _file_slot_descs(file_slot_descs), - _num_of_columns_from_file(num_of_columns_from_file), - _case_sensitive(case_sensitive) { - _arrow_file = std::shared_ptr(new ArrowFile(file_reader)); - _rb_reader = nullptr; - _total_groups = 0; - _current_group = 0; - _statistics = std::make_shared(); -} - -ArrowReaderWrap::~ArrowReaderWrap() { - close(); - _closed = true; - _queue_writer_cond.notify_one(); - if (_thread.joinable()) { - _thread.join(); - } -} - -void ArrowReaderWrap::close() { - arrow::Status st = _arrow_file->Close(); - if (!st.ok()) { - LOG(WARNING) << "close file error: " << st.ToString(); - } -} - -Status ArrowReaderWrap::column_indices() { - _include_column_ids.clear(); - _include_cols.clear(); - for (auto& slot_desc : _file_slot_descs) { - // Get the Column Reader for the boolean column - auto iter = _map_column.find(slot_desc->col_name()); - if (iter != _map_column.end()) { - _include_column_ids.emplace_back(iter->second); - _include_cols.push_back(slot_desc->col_name()); - } else { - _missing_cols.push_back(slot_desc->col_name()); - } - } - return Status::OK(); -} - -int ArrowReaderWrap::get_column_index(std::string column_name) { - std::string real_column_name = _case_sensitive ? 
column_name : to_lower(column_name); - auto iter = _map_column.find(real_column_name); - if (iter != _map_column.end()) { - return iter->second; - } else { - std::stringstream str_error; - str_error << "Invalid Column Name:" << real_column_name; - LOG(WARNING) << str_error.str(); - return -1; - } -} - -Status ArrowReaderWrap::get_next_block(vectorized::Block* block, size_t* read_row, bool* eof) { - size_t rows = 0; - bool tmp_eof = false; - do { - if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { - RETURN_IF_ERROR(next_batch(&_batch, &tmp_eof)); - // We need to make sure the eof is set to true iff block is empty. - if (tmp_eof) { - *eof = (rows == 0); - break; - } - } - - size_t num_elements = std::min((_state->batch_size() - block->rows()), - (_batch->num_rows() - _arrow_batch_cur_idx)); - for (auto i = 0; i < _file_slot_descs.size(); ++i) { - SlotDescriptor* slot_desc = _file_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - std::string real_column_name = - is_case_sensitive() ? slot_desc->col_name() : slot_desc->col_name_lower_case(); - auto* array = _batch->GetColumnByName(real_column_name).get(); - auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); - RETURN_IF_ERROR(arrow_column_to_doris_column( - array, _arrow_batch_cur_idx, column_with_type_and_name.column, - column_with_type_and_name.type, num_elements, _state->timezone_obj())); - } - rows += num_elements; - _arrow_batch_cur_idx += num_elements; - } while (!tmp_eof && rows < _state->batch_size()); - *read_row = rows; - return Status::OK(); -} - -Status ArrowReaderWrap::next_batch(std::shared_ptr* batch, bool* eof) { - std::unique_lock lock(_mtx); - while (!_closed && _queue.empty()) { - if (_batch_eof) { - _include_column_ids.clear(); - _include_cols.clear(); - *eof = true; - return Status::OK(); - } - _queue_reader_cond.wait_for(lock, std::chrono::seconds(1)); - } - if (UNLIKELY(_closed)) { - return Status::InternalError(_status.message()); - } - *batch = _queue.front(); - _queue.pop_front(); - _queue_writer_cond.notify_one(); - _arrow_batch_cur_idx = 0; - return Status::OK(); -} - -void ArrowReaderWrap::prefetch_batch() { - auto insert_batch = [this](const auto& batch) { - std::unique_lock lock(_mtx); - while (!_closed && _queue.size() == _max_queue_size) { - _queue_writer_cond.wait_for(lock, std::chrono::seconds(1)); - } - if (UNLIKELY(_closed)) { - return; - } - _queue.push_back(batch); - _queue_reader_cond.notify_one(); - }; - int current_group = _current_group; - int total_groups = _total_groups; - while (true) { - if (_closed || current_group >= total_groups) { - _batch_eof = true; - _queue_reader_cond.notify_one(); - return; - } - if (filter_row_group(current_group)) { - current_group++; - continue; - } - - arrow::RecordBatchVector batches; - read_batches(batches, current_group); - if (!_status.ok()) { - _closed = true; - return; - } - std::for_each(batches.begin(), batches.end(), insert_batch); - current_group++; - } -} - -ArrowFile::ArrowFile(io::FileReaderSPtr file_reader) : _file_reader(file_reader) {} - -ArrowFile::~ArrowFile() { - arrow::Status st = Close(); - if (!st.ok()) { - LOG(WARNING) << "close file error: " << st.ToString(); - } -} - -arrow::Status ArrowFile::Close() { - return arrow::Status::OK(); -} - -bool ArrowFile::closed() const { - return _file_reader->closed(); -} - -arrow::Result ArrowFile::Read(int64_t nbytes, void* buffer) { - return ReadAt(_pos, nbytes, buffer); -} - -arrow::Result ArrowFile::ReadAt(int64_t position, int64_t nbytes, void* 
out) { - int64_t bytes_read = 0; - _pos = position; - while (bytes_read < nbytes) { - size_t reads = 0; - Slice file_slice((uint8_t*)out, nbytes); - Status result = _file_reader->read_at(_pos, file_slice, &reads); - if (!result.ok()) { - return arrow::Status::IOError("Readat failed."); - } - if (reads == 0) { - break; - } - bytes_read += reads; // total read bytes - _pos += reads; - out = (char*)out + reads; - } - return bytes_read; -} - -arrow::Result ArrowFile::GetSize() { - return _file_reader->size(); -} - -arrow::Status ArrowFile::Seek(int64_t position) { - _pos = position; - // NOTE: Only readat operation is used, so _file seek is not called here. - return arrow::Status::OK(); -} - -arrow::Result ArrowFile::Tell() const { - return _pos; -} - -arrow::Result> ArrowFile::Read(int64_t nbytes) { - auto buffer = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool()); - ARROW_RETURN_NOT_OK(buffer); - std::shared_ptr read_buf = std::move(buffer.ValueOrDie()); - auto bytes_read = ReadAt(_pos, nbytes, read_buf->mutable_data()); - ARROW_RETURN_NOT_OK(bytes_read); - // If bytes_read is equal with read_buf's capacity, we just assign - if (bytes_read.ValueOrDie() == nbytes) { - return read_buf; - } else { - return arrow::SliceBuffer(read_buf, 0, bytes_read.ValueOrDie()); - } -} - -} // namespace doris diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h deleted file mode 100644 index ad4c40d26b..0000000000 --- a/be/src/exec/arrow/arrow_reader.h +++ /dev/null @@ -1,144 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#pragma once - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "common/config.h" -#include "common/status.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "vec/exec/format/generic_reader.h" - -namespace arrow { -class RecordBatch; -class RecordBatchReader; -} // namespace arrow - -namespace doris { - -class RuntimeState; -class SlotDescriptor; -class TupleDescriptor; - -namespace vectorized { -class Block; -} // namespace vectorized - -struct Statistics { - int32_t filtered_row_groups = 0; - int32_t total_groups = 0; - int64_t filtered_rows = 0; - int64_t total_rows = 0; - int64_t filtered_total_bytes = 0; - int64_t total_bytes = 0; -}; - -class ArrowFile : public arrow::io::RandomAccessFile { -public: - ArrowFile(io::FileReaderSPtr file_reader); - virtual ~ArrowFile(); - arrow::Result Read(int64_t nbytes, void* buffer) override; - arrow::Result ReadAt(int64_t position, int64_t nbytes, void* out) override; - arrow::Result GetSize() override; - arrow::Status Seek(int64_t position) override; - arrow::Result> Read(int64_t nbytes) override; - arrow::Result Tell() const override; - arrow::Status Close() override; - bool closed() const override; - -private: - io::FileReaderSPtr _file_reader; - size_t _pos = 0; -}; - -// base of arrow reader -class ArrowReaderWrap : public vectorized::GenericReader { -public: - ArrowReaderWrap(RuntimeState* state, const std::vector& file_slot_descs, - io::FileReaderSPtr file_reader, int32_t num_of_columns_from_file, - bool caseSensitive); - virtual ~ArrowReaderWrap(); - - virtual Status init_reader(const TupleDescriptor* tuple_desc, const std::string& timezone) = 0; - - // for vec - Status get_next_block(vectorized::Block* block, size_t* read_row, bool* eof) override; - // This method should be deprecated once the old scanner is removed. - // And user should use "get_next_block" instead. - Status next_batch(std::shared_ptr* batch, bool* eof); - - std::shared_ptr& statistics() { return _statistics; } - void close(); - virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); } - int get_column_index(std::string column_name); - - void prefetch_batch(); - bool is_case_sensitive() { return _case_sensitive; } - -protected: - virtual Status column_indices(); - virtual void read_batches(arrow::RecordBatchVector& batches, int current_group) = 0; - virtual bool filter_row_group(int current_group) = 0; - -protected: - RuntimeState* _state; - std::vector _file_slot_descs; - - const int32_t _num_of_columns_from_file; - std::shared_ptr _arrow_file; - std::shared_ptr<::arrow::RecordBatchReader> _rb_reader; - int _total_groups; // num of groups(stripes) of a parquet(orc) file - int _current_group; // current group(stripe) - std::map _map_column; // column-name <---> column-index - std::vector _include_column_ids; // columns that need to get from file - std::vector _include_cols; // columns that need to get from file - std::shared_ptr _statistics; - - std::atomic _closed = false; - std::atomic _batch_eof = false; - arrow::Status _status; - std::mutex _mtx; - std::condition_variable _queue_reader_cond; - std::condition_variable _queue_writer_cond; - std::list> _queue; - const size_t _max_queue_size = config::parquet_reader_max_buffer_size; - std::thread _thread; - bool _case_sensitive; - - // The following fields are only valid when using "get_block()" interface. 
- std::shared_ptr _batch; - size_t _arrow_batch_cur_idx = 0; - // Save col names which need to be read but does not exist in file - std::vector _missing_cols; -}; - -} // namespace doris diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp deleted file mode 100644 index 4b924c0275..0000000000 --- a/be/src/exec/arrow/parquet_reader.cpp +++ /dev/null @@ -1,211 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#include "exec/arrow/parquet_reader.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -// IWYU pragma: no_include -#include // IWYU pragma: keep -#include -#include -#include -#include -#include -#include - -// IWYU pragma: no_include -#include "common/compiler_util.h" // IWYU pragma: keep -#include "common/logging.h" -#include "common/status.h" -#include "util/string_util.h" - -namespace doris { -class TupleDescriptor; - -// Broker -ParquetReaderWrap::ParquetReaderWrap(RuntimeState* state, - const std::vector& file_slot_descs, - io::FileReaderSPtr file_reader, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size, bool case_sensitive) - : ArrowReaderWrap(state, file_slot_descs, file_reader, num_of_columns_from_file, - case_sensitive), - _rows_of_group(0), - _current_line_of_group(0), - _current_line_of_batch(0) {} - -Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, - const std::string& timezone) { - try { - parquet::ArrowReaderProperties arrow_reader_properties = - parquet::default_arrow_reader_properties(); - arrow_reader_properties.set_pre_buffer(true); - arrow_reader_properties.set_use_threads(true); - // Open Parquet file reader - auto reader_builder = parquet::arrow::FileReaderBuilder(); - reader_builder.properties(arrow_reader_properties); - - auto st = reader_builder.Open(_arrow_file); - - if (!st.ok()) { - LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString(); - return Status::InternalError("Failed to create file reader"); - } - - st = reader_builder.Build(&_reader); - - if (!st.ok()) { - LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString(); - return Status::InternalError("Failed to create file reader"); - } - - _file_metadata = _reader->parquet_reader()->metadata(); - // initial members - _total_groups = _file_metadata->num_row_groups(); - if (_total_groups == 0) { - return Status::EndOfFile("Empty Parquet File"); - } - _rows_of_group = _file_metadata->RowGroup(0)->num_rows(); - - // map - auto* schemaDescriptor = _file_metadata->schema(); - for (int i = 0; i < _file_metadata->num_columns(); ++i) { - std::string schemaName; - // Get the Column Reader for the boolean column - if 
(schemaDescriptor->Column(i)->max_definition_level() > 1) { - schemaName = schemaDescriptor->Column(i)->path()->ToDotVector()[0]; - } else { - schemaName = schemaDescriptor->Column(i)->name(); - } - _map_column.emplace(_case_sensitive ? schemaName : to_lower(schemaName), i); - } - - _timezone = timezone; - - RETURN_IF_ERROR(column_indices()); - _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this); - return Status::OK(); - } catch (parquet::ParquetException& e) { - std::stringstream str_error; - str_error << "Init parquet reader fail. " << e.what(); - LOG(WARNING) << str_error.str(); - return Status::InternalError(str_error.str()); - } -} - -Status ParquetReaderWrap::size(int64_t* size) { - arrow::Result result = _arrow_file->GetSize(); - if (result.ok()) { - *size = result.ValueOrDie(); - return Status::OK(); - } else { - return Status::InternalError(result.status().ToString()); - } -} - -Status ParquetReaderWrap::read_record_batch(bool* eof) { - if (_current_line_of_group >= _rows_of_group) { // read next row group - VLOG_DEBUG << "read_record_batch, current group id:" << _current_group - << " current line of group:" << _current_line_of_group - << " is larger than rows group size:" << _rows_of_group - << ". start to read next row group"; - _current_group++; - if (_current_group >= _total_groups) { // read completed. - _include_column_ids.clear(); - *eof = true; - return Status::OK(); - } - _current_line_of_group = 0; - _rows_of_group = _file_metadata->RowGroup(_current_group) - ->num_rows(); //get rows of the current row group - // read batch - RETURN_IF_ERROR(read_next_batch()); - _current_line_of_batch = 0; - } else if (_current_line_of_batch >= _batch->num_rows()) { - VLOG_DEBUG << "read_record_batch, current group id:" << _current_group - << " current line of batch:" << _current_line_of_batch - << " is larger than batch size:" << _batch->num_rows() - << ". start to read next batch"; - // read batch - RETURN_IF_ERROR(read_next_batch()); - _current_line_of_batch = 0; - } - return Status::OK(); -} - -Status ParquetReaderWrap::init_parquet_type() { - // read batch - RETURN_IF_ERROR(read_next_batch()); - _current_line_of_batch = 0; - if (_batch == nullptr) { - return Status::OK(); - } - //save column type - std::shared_ptr field_schema = _batch->schema(); - for (int i = 0; i < _include_column_ids.size(); i++) { - std::shared_ptr field = field_schema->field(i); - if (!field) { - LOG(WARNING) << "Get field schema failed. 
Column order:" << i; - return Status::InternalError(_status.ToString()); - } - _parquet_column_type.emplace_back(field->type()->id()); - } - return Status::OK(); -} - -Status ParquetReaderWrap::read_next_batch() { - std::unique_lock lock(_mtx); - while (!_closed && _queue.empty()) { - if (_batch_eof) { - return Status::OK(); - } - _queue_reader_cond.wait_for(lock, std::chrono::seconds(1)); - } - - if (UNLIKELY(_closed)) { - return Status::InternalError(_status.message()); - } - - _batch = _queue.front(); - _queue.pop_front(); - _queue_writer_cond.notify_one(); - return Status::OK(); -} - -void ParquetReaderWrap::read_batches(arrow::RecordBatchVector& batches, int current_group) { - _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader); - if (!_status.ok()) { - _closed = true; - return; - } - _status = _rb_reader->ReadAll(&batches); -} - -bool ParquetReaderWrap::filter_row_group(int current_group) { - return false; -} - -} // namespace doris diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h deleted file mode 100644 index ac60a25eae..0000000000 --- a/be/src/exec/arrow/parquet_reader.h +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include -#include -#include - -#include "common/status.h" -#include "exec/arrow/arrow_reader.h" -#include "io/fs/file_reader_writer_fwd.h" - -namespace arrow { -class RecordBatch; -} // namespace arrow -namespace parquet { -class FileMetaData; -} // namespace parquet - -namespace doris { - -class RuntimeState; -class SlotDescriptor; -class TupleDescriptor; - -// Reader of parquet file -class ParquetReaderWrap final : public ArrowReaderWrap { -public: - // batch_size is not use here - ParquetReaderWrap(RuntimeState* state, const std::vector& file_slot_descs, - io::FileReaderSPtr file_reader, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size, bool case_sensitive = true); - ~ParquetReaderWrap() override = default; - - Status size(int64_t* size) override; - Status init_reader(const TupleDescriptor* tuple_desc, const std::string& timezone) override; - Status init_parquet_type(); - -private: - Status read_record_batch(bool* eof); - -private: - Status read_next_batch(); - void read_batches(arrow::RecordBatchVector& batches, int current_group) override; - bool filter_row_group(int current_group) override; - -private: - // parquet file reader object - std::shared_ptr _batch; - std::unique_ptr _reader; - std::shared_ptr _file_metadata; - std::vector _parquet_column_type; - - int _rows_of_group; // rows in a group. 
- int _current_line_of_group; - int _current_line_of_batch; - std::string _timezone; -}; - -} // namespace doris diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp deleted file mode 100644 index ffe6bc1815..0000000000 --- a/be/src/exec/base_scanner.cpp +++ /dev/null @@ -1,429 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "base_scanner.h" - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -// IWYU pragma: no_include -#include "common/compiler_util.h" // IWYU pragma: keep -#include "common/consts.h" -#include "gutil/casts.h" -#include "runtime/define_primitive_type.h" -#include "runtime/descriptors.h" -#include "runtime/runtime_state.h" -#include "runtime/types.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_vector.h" -#include "vec/columns/columns_number.h" -#include "vec/common/string_ref.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/data_types/data_type.h" -#include "vec/data_types/data_type_factory.hpp" -#include "vec/data_types/data_type_number.h" -#include "vec/exprs/vexpr_context.h" - -namespace doris { -class TColumn; -class TNetworkAddress; - -BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter) - : _state(state), - _params(params), - _ranges(ranges), - _broker_addresses(broker_addresses), - _next_range(0), - _counter(counter), - _dest_tuple_desc(nullptr), - _pre_filter_texprs(pre_filter_texprs), - _strict_mode(false), - _line_counter(0), - _profile(profile), - _rows_read_counter(nullptr), - _read_timer(nullptr), - _materialize_timer(nullptr), - _success(false), - _scanner_eof(false) {} - -Status BaseScanner::open() { - _full_base_schema_view = vectorized::schema_util::FullBaseSchemaView::create_unique(); - RETURN_IF_ERROR(init_expr_ctxes()); - if (_params.__isset.strict_mode) { - _strict_mode = _params.strict_mode; - } - if (_strict_mode && !_params.__isset.dest_sid_to_src_sid_without_trans) { - return Status::InternalError("Slot map of dest to src must be set in strict mode"); - } - _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); - _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)"); - _materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)"); - - DCHECK(!_ranges.empty()); - const auto& range = _ranges[0]; - _num_of_columns_from_file = range.__isset.num_of_columns_from_file - ? 
implicit_cast(range.num_of_columns_from_file) - : implicit_cast(_src_slot_descs.size()); - - // check consistency - if (range.__isset.num_of_columns_from_file) { - int size = range.columns_from_path.size(); - for (const auto& r : _ranges) { - if (r.columns_from_path.size() != size) { - return Status::InternalError("ranges have different number of columns."); - } - } - } - return Status::OK(); -} - -Status BaseScanner::init_expr_ctxes() { - // Construct _src_slot_descs - const TupleDescriptor* src_tuple_desc = - _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id); - if (src_tuple_desc == nullptr) { - return Status::InternalError("Unknown source tuple descriptor, tuple_id={}", - _params.src_tuple_id); - } - - std::map src_slot_desc_map; - std::unordered_map src_slot_desc_to_index {}; - for (int i = 0, len = src_tuple_desc->slots().size(); i < len; ++i) { - auto* slot_desc = src_tuple_desc->slots()[i]; - src_slot_desc_to_index.emplace(slot_desc, i); - src_slot_desc_map.emplace(slot_desc->id(), slot_desc); - } - for (auto slot_id : _params.src_slot_ids) { - auto it = src_slot_desc_map.find(slot_id); - if (it == std::end(src_slot_desc_map)) { - return Status::InternalError("Unknown source slot descriptor, slot_id={}", slot_id); - } - _src_slot_descs.emplace_back(it->second); - - if (it->second->type().is_variant_type() && - it->second->col_name() == BeConsts::DYNAMIC_COLUMN_NAME) { - _is_dynamic_schema = true; - } - } - _row_desc.reset(new RowDescriptor(_state->desc_tbl(), - std::vector({_params.src_tuple_id}), - std::vector({false}))); - - // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor - if (!_pre_filter_texprs.empty()) { - // for vectorized, preceding filter exprs should be compounded to one passed from fe. 
- DCHECK(_pre_filter_texprs.size() == 1); - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( - _state->obj_pool(), _pre_filter_texprs[0], &_vpre_filter_ctx_ptr)); - RETURN_IF_ERROR(_vpre_filter_ctx_ptr->prepare(_state, *_row_desc)); - RETURN_IF_ERROR(_vpre_filter_ctx_ptr->open(_state)); - } - - // Construct dest slots information - _dest_tuple_desc = _state->desc_tbl().get_tuple_descriptor(_params.dest_tuple_id); - if (_dest_tuple_desc == nullptr) { - return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}", - _params.dest_tuple_id); - } - - bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans; - for (auto slot_desc : _dest_tuple_desc->slots()) { - if (!slot_desc->is_materialized()) { - continue; - } - auto it = _params.expr_of_dest_slot.find(slot_desc->id()); - if (it == std::end(_params.expr_of_dest_slot)) { - return Status::InternalError("No expr for dest slot, id={}, name={}", slot_desc->id(), - slot_desc->col_name()); - } - - vectorized::VExprContext* ctx = nullptr; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); - RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get())); - RETURN_IF_ERROR(ctx->open(_state)); - _dest_vexpr_ctx.emplace_back(ctx); - if (has_slot_id_map) { - auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); - if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) { - _src_slot_descs_order_by_dest.emplace_back(nullptr); - } else { - auto _src_slot_it = src_slot_desc_map.find(it1->second); - if (_src_slot_it == std::end(src_slot_desc_map)) { - return Status::InternalError("No src slot {} in src slot descs", it1->second); - } - _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(), - src_slot_desc_to_index[_src_slot_it->second]); - _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second); - } - } - } - if (_dest_tuple_desc->table_desc()) { - _full_base_schema_view->db_name = _dest_tuple_desc->table_desc()->database(); - _full_base_schema_view->table_name = _dest_tuple_desc->table_desc()->name(); - _full_base_schema_view->table_id = _dest_tuple_desc->table_desc()->table_id(); - } - return Status::OK(); -} - -// need exception safety -Status BaseScanner::_filter_src_block() { - auto origin_column_num = _src_block.columns(); - // filter block - auto old_rows = _src_block.rows(); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, &_src_block, - origin_column_num)); - _counter->num_rows_unselected += old_rows - _src_block.rows(); - return Status::OK(); -} - -Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { - // Do vectorized expr here - int ctx_idx = 0; - size_t rows = _src_block.rows(); - auto filter_column = vectorized::ColumnUInt8::create(rows, 1); - auto& filter_map = filter_column->get_data(); - auto origin_column_num = _src_block.columns(); - - for (auto slot_desc : _dest_tuple_desc->slots()) { - if (!slot_desc->is_materialized()) { - continue; - } - if (slot_desc->type().is_variant_type()) { - continue; - } - int dest_index = ctx_idx++; - - auto* ctx = _dest_vexpr_ctx[dest_index]; - int result_column_id = -1; - // PT1 => dest primitive type - RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id)); - bool is_origin_column = result_column_id < origin_column_num; - auto column_ptr = - is_origin_column && _src_block_mem_reuse - ? 
_src_block.get_by_position(result_column_id).column->clone_resized(rows) - : _src_block.get_by_position(result_column_id).column; - - DCHECK(column_ptr != nullptr); - - // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr - // is likely to be nullable - if (LIKELY(column_ptr->is_nullable())) { - auto nullable_column = - reinterpret_cast(column_ptr.get()); - for (int i = 0; i < rows; ++i) { - if (filter_map[i] && nullable_column->is_null_at(i)) { - if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) && - !_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index]) - .column->is_null_at(i)) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return _src_block.dump_one_line(i, _num_of_columns_from_file); - }, - [&]() -> std::string { - auto raw_value = - _src_block.get_by_position(ctx_idx).column->get_data_at( - i); - std::string raw_string = raw_value.to_string(); - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, - "column({}) value is incorrect while strict " - "mode is {}, " - "src value is {}", - slot_desc->col_name(), _strict_mode, raw_string); - return fmt::to_string(error_msg); - }, - &_scanner_eof)); - filter_map[i] = false; - } else if (!slot_desc->is_nullable()) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return _src_block.dump_one_line(i, _num_of_columns_from_file); - }, - [&]() -> std::string { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, - "column({}) values is null while columns is not " - "nullable", - slot_desc->col_name()); - return fmt::to_string(error_msg); - }, - &_scanner_eof)); - filter_map[i] = false; - } - } - } - if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr(); - } else if (slot_desc->is_nullable()) { - column_ptr = vectorized::make_nullable(column_ptr); - } - dest_block->insert(vectorized::ColumnWithTypeAndName( - std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); - } - - // handle dynamic generated columns - if (!_full_base_schema_view->empty()) { - assert(_is_dynamic_schema); - for (size_t x = dest_block->columns(); x < _src_block.columns(); ++x) { - auto& column_type_name = _src_block.get_by_position(x); - const TColumn& tcolumn = - _full_base_schema_view->column_name_to_column[column_type_name.name]; - auto original_type = vectorized::DataTypeFactory::instance().create_data_type(tcolumn); - // type conflict free path, always cast to original type - if (!column_type_name.type->equals(*original_type)) { - vectorized::ColumnPtr column_ptr; - RETURN_IF_ERROR(vectorized::schema_util::cast_column(column_type_name, - original_type, &column_ptr)); - column_type_name.column = column_ptr; - column_type_name.type = original_type; - } - dest_block->insert(vectorized::ColumnWithTypeAndName(std::move(column_type_name.column), - std::move(column_type_name.type), - column_type_name.name)); - } - } - - // after do the dest block insert operation, clear _src_block to remove the reference of origin column - if (_src_block_mem_reuse) { - _src_block.clear_column_data(origin_column_num); - } else { - _src_block.clear(); - } - - size_t dest_size = dest_block->columns(); - // do filter - dest_block->insert(vectorized::ColumnWithTypeAndName( - std::move(filter_column), std::make_shared(), - "filter column")); - RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size)); - _counter->num_rows_filtered += rows - dest_block->rows(); - - return Status::OK(); 
-} - -// TODO: opt the reuse of src_block or dest_block column. some case we have to -// shallow copy the column of src_block to dest block -Status BaseScanner::_init_src_block() { - if (_src_block.is_empty_column()) { - for (auto i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - auto data_type = slot_desc->get_data_type_ptr(); - auto column_ptr = data_type->create_column(); - column_ptr->reserve(_state->batch_size()); - _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(column_ptr), data_type, - slot_desc->col_name())); - } - } - - return Status::OK(); -} - -// need exception safety -Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) { - *eof = _scanner_eof; - _fill_columns_from_path(); - if (LIKELY(_src_block.rows() > 0)) { - RETURN_IF_ERROR(BaseScanner::_filter_src_block()); - RETURN_IF_ERROR(BaseScanner::_materialize_dest_block(dest_block)); - } - - return Status::OK(); -} - -void BaseScanner::close() { - if (_vpre_filter_ctx_ptr) { - _vpre_filter_ctx_ptr->close(_state); - } -} - -void BaseScanner::_fill_columns_from_path() { - const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); - if (range.__isset.num_of_columns_from_file) { - size_t start = range.num_of_columns_from_file; - size_t rows = _src_block.rows(); - - for (size_t i = 0; i < range.columns_from_path.size(); ++i) { - auto slot_desc = _src_slot_descs.at(i + start); - if (slot_desc == nullptr) continue; - auto is_nullable = slot_desc->is_nullable(); - auto data_type = vectorized::DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, - is_nullable); - auto data_column = data_type->create_column(); - const std::string& column_from_path = range.columns_from_path[i]; - for (size_t j = 0; j < rows; ++j) { - data_column->insert_data(const_cast(column_from_path.c_str()), - column_from_path.size()); - } - _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), data_type, - slot_desc->col_name())); - } - } -} - -bool BaseScanner::is_null(const Slice& slice) { - return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N'; -} - -bool BaseScanner::is_array(const Slice& slice) { - return slice.size > 1 && slice.data[0] == '[' && slice.data[slice.size - 1] == ']'; -} - -bool BaseScanner::check_array_format(std::vector& split_values) { - // if not the array format, filter this line and return error url - auto dest_slot_descs = _dest_tuple_desc->slots(); - for (int j = 0; j < split_values.size() && j < dest_slot_descs.size(); ++j) { - auto dest_slot_desc = dest_slot_descs[j]; - if (!dest_slot_desc->is_materialized()) { - continue; - } - const Slice& value = split_values[j]; - if (dest_slot_desc->type().is_array_type() && !is_null(value) && !is_array(value)) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { return std::string(value.data, value.size); }, - [&]() -> std::string { - fmt::memory_buffer err_msg; - fmt::format_to(err_msg, "Invalid format for array column({})", - dest_slot_desc->col_name()); - return fmt::to_string(err_msg); - }, - &_scanner_eof)); - _counter->num_rows_filtered++; - return false; - } - } - return true; -} - -} // namespace doris diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h deleted file mode 100644 index 0aad811667..0000000000 --- a/be/src/exec/base_scanner.h +++ /dev/null @@ -1,152 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license 
agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include -#include -#include - -#include "common/global_types.h" -#include "common/status.h" -#include "runtime/descriptors.h" -#include "util/runtime_profile.h" -#include "util/slice.h" -#include "vec/columns/column.h" -#include "vec/common/schema_util.h" -#include "vec/core/block.h" -#include "vec/exprs/vexpr.h" - -namespace doris { - -class RuntimeState; -class TBrokerRangeDesc; -class TBrokerScanRangeParams; -class TNetworkAddress; - -namespace vectorized { -class VExprContext; - -using MutableColumnPtr = IColumn::MutablePtr; -} // namespace vectorized - -// The counter will be passed to each scanner. -// Note that this struct is not thread safe. -// So if we support concurrent scan in the future, we need to modify this struct. -struct ScannerCounter { - ScannerCounter() : num_rows_filtered(0), num_rows_unselected(0) {} - - int64_t num_rows_filtered; // unqualified rows (unmatched the dest schema, or no partition) - int64_t num_rows_unselected; // rows filtered by predicates -}; - -class BaseScanner { -public: - BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter); - - virtual ~BaseScanner() { vectorized::VExpr::close(_dest_vexpr_ctx, _state); } - - virtual Status init_expr_ctxes(); - // Open this scanner, will initialize information need to - virtual Status open(); - - // Get next block - virtual Status get_next(vectorized::Block* block, bool* eof) { - return Status::NotSupported("Not Implemented get block"); - } - - // Close this scanner - virtual void close() = 0; - - bool is_dynamic_schema() const { return _is_dynamic_schema; } - -protected: - Status _fill_dest_block(vectorized::Block* dest_block, bool* eof); - virtual Status _init_src_block(); - - bool is_null(const Slice& slice); - bool is_array(const Slice& slice); - bool check_array_format(std::vector& split_values); - - RuntimeState* _state; - const TBrokerScanRangeParams& _params; - - //const TBrokerScanRangeParams& _params; - const std::vector& _ranges; - const std::vector& _broker_addresses; - int _next_range; - // used for process stat - ScannerCounter* _counter; - - // Used for constructing tuple - // slots for value read from broker file - std::vector _src_slot_descs; - std::unique_ptr _row_desc; - - // Dest tuple descriptor and dest expr context - const TupleDescriptor* _dest_tuple_desc; - // the map values of dest slot id to src slot desc - // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr - std::vector _src_slot_descs_order_by_dest; - - // dest slot desc index to src slot desc index - std::unordered_map _dest_slot_to_src_slot_index; - - // to filter 
src tuple directly - // the `_pre_filter_texprs` is the origin thrift exprs passed from scan node. - const std::vector _pre_filter_texprs; - - bool _strict_mode; - - int32_t _line_counter; - // Profile - RuntimeProfile* _profile; - RuntimeProfile::Counter* _rows_read_counter; - RuntimeProfile::Counter* _read_timer; - RuntimeProfile::Counter* _materialize_timer; - - // Used to record whether a row of data is successfully read. - bool _success = false; - bool _scanner_eof = false; - - // for vectorized load - std::vector _dest_vexpr_ctx; - vectorized::VExprContext* _vpre_filter_ctx_ptr = nullptr; - vectorized::Block _src_block; - bool _src_block_mem_reuse = false; - int _num_of_columns_from_file; - - // slot_ids for parquet predicate push down are in tuple desc - TupleId _tupleId = -1; - - bool _is_dynamic_schema = false; - // for tracing dynamic schema - std::unique_ptr _full_base_schema_view; - -private: - Status _filter_src_block(); - void _fill_columns_from_path(); - Status _materialize_dest_block(vectorized::Block* output_block); -}; - -} /* namespace doris */ diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 482ffe578d..0379e33616 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -54,7 +54,10 @@ #include "util/runtime_profile.h" #include "util/time.h" #include "vec/core/block.h" -#include "vec/exec/vparquet_scanner.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/exec/format/parquet/vparquet_reader.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/functions/simple_function_factory.h" namespace doris { using namespace ErrorCode; @@ -239,13 +242,6 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur } // For push load, this tablet maybe not need push data, so that the path maybe empty if (!path.empty()) { - std::unique_ptr reader(new (std::nothrow) PushBrokerReader()); - if (reader == nullptr) { - LOG(WARNING) << "fail to create reader. tablet=" << cur_tablet->full_name(); - res = Status::Error(); - break; - } - // init schema std::unique_ptr schema(new (std::nothrow) Schema(tablet_schema)); if (schema == nullptr) { @@ -255,8 +251,10 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur } // init Reader - if (!(res = reader->init(schema.get(), _request.broker_scan_range, - _request.desc_tbl))) { + std::unique_ptr reader = PushBrokerReader::create_unique( + schema.get(), _request.broker_scan_range, _request.desc_tbl); + res = reader->init(); + if (reader == nullptr || !res.ok()) { LOG(WARNING) << "fail to init reader. 
res=" << res << ", tablet=" << cur_tablet->full_name(); res = Status::Error(); @@ -312,8 +310,46 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur return res; } -Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_scan_range, - const TDescriptorTable& t_desc_tbl) { +PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange& t_scan_range, + const TDescriptorTable& t_desc_tbl) + : _ready(false), + _eof(false), + _next_range(0), + _t_desc_tbl(t_desc_tbl), + _cur_reader_eof(false), + _params(t_scan_range.params), + _ranges(t_scan_range.ranges) { + // change broker params to file params + if (0 == _ranges.size()) { + return; + } + _file_params.file_type = _ranges[0].file_type; + _file_params.format_type = _ranges[0].format_type; + _file_params.src_tuple_id = _params.src_tuple_id; + _file_params.dest_tuple_id = _params.dest_tuple_id; + _file_params.num_of_columns_from_file = _ranges[0].num_of_columns_from_file; + _file_params.properties = _params.properties; + _file_params.expr_of_dest_slot = _params.expr_of_dest_slot; + _file_params.dest_sid_to_src_sid_without_trans = _params.dest_sid_to_src_sid_without_trans; + _file_params.strict_mode = _params.strict_mode; + _file_params.__isset.broker_addresses = true; + _file_params.broker_addresses = t_scan_range.broker_addresses; + + for (int i = 0; i < _ranges.size(); ++i) { + TFileRangeDesc file_range; + file_range.load_id = _ranges[i].load_id; + file_range.path = _ranges[i].path; + file_range.start_offset = _ranges[i].start_offset; + file_range.__isset.size = true; + file_range.size = _ranges[i].size; + file_range.__isset.file_size = true; + file_range.file_size = _ranges[i].file_size; + file_range.columns_from_path = _ranges[i].columns_from_path; + _file_ranges.push_back(file_range); + } +} + +Status PushBrokerReader::init() { // init runtime state, runtime profile, counter TUniqueId dummy_id; dummy_id.hi = 0; @@ -329,7 +365,7 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc _runtime_state = RuntimeState::create_unique(params, query_options, query_globals, ExecEnv::GetInstance()); DescriptorTbl* desc_tbl = nullptr; - Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl); + Status status = DescriptorTbl::create(_runtime_state->obj_pool(), _t_desc_tbl, &desc_tbl); if (UNLIKELY(!status.ok())) { LOG(WARNING) << "Failed to create descriptor table, msg: " << status; return Status::Error(); @@ -343,27 +379,18 @@ Status PushBrokerReader::init(const Schema* schema, const TBrokerScanRange& t_sc _runtime_profile = _runtime_state->runtime_profile(); _runtime_profile->set_name("PushBrokerReader"); - _counter.reset(new ScannerCounter()); + _file_cache_statistics.reset(new io::FileCacheStatistics()); + _io_ctx.reset(new io::IOContext()); + _io_ctx->file_cache_stats = _file_cache_statistics.get(); + _io_ctx->query_id = &_runtime_state->query_id(); - // init scanner - BaseScanner* scanner = nullptr; - switch (t_scan_range.ranges[0].format_type) { - case TFileFormatType::FORMAT_PARQUET: - scanner = new vectorized::VParquetScanner( - _runtime_state.get(), _runtime_profile, t_scan_range.params, t_scan_range.ranges, - t_scan_range.broker_addresses, _pre_filter_texprs, _counter.get()); - break; - default: - LOG(WARNING) << "Unsupported file format type: " << t_scan_range.ranges[0].format_type; - return Status::Error(); - } - _scanner.reset(scanner); - status = _scanner->open(); - if (UNLIKELY(!status.ok())) { - 
LOG(WARNING) << "Failed to open scanner, msg: " << status; - return Status::Error(); + auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots(); + for (int i = 0; i < slot_descs.size(); i++) { + _all_col_names.push_back(slot_descs[i]->col_name()); } + RETURN_IF_ERROR(_init_expr_ctxes()); + _ready = true; return Status::OK(); } @@ -372,7 +399,151 @@ Status PushBrokerReader::next(vectorized::Block* block) { if (!_ready || block == nullptr) { return Status::Error(); } - _scanner->get_next(block, &_eof); + if (_cur_reader == nullptr || _cur_reader_eof) { + RETURN_IF_ERROR(_get_next_reader()); + if (_eof) { + return Status::OK(); + } + } + RETURN_IF_ERROR(_init_src_block()); + size_t read_rows = 0; + RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof)); + if (read_rows > 0) { + RETURN_IF_ERROR(_cast_to_input_block()); + RETURN_IF_ERROR(_convert_to_output_block(block)); + } + return Status::OK(); +} + +Status PushBrokerReader::close() { + _ready = false; + for (auto ctx : _dest_vexpr_ctx) { + if (ctx != nullptr) { + ctx->close(_runtime_state.get()); + } + } + + if (_push_down_expr) { + _push_down_expr->close(_runtime_state.get()); + } + + for (auto& [k, v] : _slot_id_to_filter_conjuncts) { + for (auto& ctx : v) { + if (ctx != nullptr) { + ctx->close(_runtime_state.get()); + } + } + } + + for (auto* ctx : _not_single_slot_filter_conjuncts) { + if (ctx != nullptr) { + ctx->close(_runtime_state.get()); + } + } + return Status::OK(); +} + +Status PushBrokerReader::_init_src_block() { + _src_block.clear(); + int idx = 0; + for (auto& slot : _src_slot_descs) { + vectorized::DataTypePtr data_type; + auto it = _name_to_col_type.find(slot->col_name()); + if (it == _name_to_col_type.end() || _is_dynamic_schema) { + // not exist in file, using type from _input_tuple_desc + data_type = vectorized::DataTypeFactory::instance().create_data_type( + slot->type(), slot->is_nullable()); + } else { + data_type = vectorized::DataTypeFactory::instance().create_data_type(it->second, true); + } + if (data_type == nullptr) { + return Status::NotSupported("Not support data type {} for column {}", + it == _name_to_col_type.end() ? 
slot->type().debug_string() + : it->second.debug_string(), + slot->col_name()); + } + vectorized::MutableColumnPtr data_column = data_type->create_column(); + _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), data_type, + slot->col_name())); + _src_block_name_to_idx.emplace(slot->col_name(), idx++); + } + _src_block_ptr = &_src_block; + return Status::OK(); +} + +Status PushBrokerReader::_cast_to_input_block() { + if (_is_dynamic_schema) { + return Status::OK(); + } + size_t idx = 0; + for (auto& slot_desc : _src_slot_descs) { + if (_name_to_col_type.find(slot_desc->col_name()) == _name_to_col_type.end()) { + continue; + } + if (slot_desc->type().is_variant_type()) { + continue; + } + auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name()); + // remove nullable here, let the get_function decide whether nullable + auto return_type = slot_desc->get_data_type_ptr(); + vectorized::ColumnsWithTypeAndName arguments { + arg, + {vectorized::DataTypeString().create_column_const( + arg.column->size(), remove_nullable(return_type)->get_family_name()), + std::make_shared(), ""}}; + auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function( + "CAST", arguments, return_type); + idx = _src_block_name_to_idx[slot_desc->col_name()]; + RETURN_IF_ERROR( + func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size())); + _src_block_ptr->get_by_position(idx).type = std::move(return_type); + } + return Status::OK(); +} + +Status PushBrokerReader::_convert_to_output_block(vectorized::Block* block) { + block->clear(); + + int ctx_idx = 0; + size_t rows = _src_block.rows(); + auto filter_column = vectorized::ColumnUInt8::create(rows, 1); + + for (auto slot_desc : _dest_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + int dest_index = ctx_idx++; + vectorized::ColumnPtr column_ptr; + + auto* ctx = _dest_vexpr_ctx[dest_index]; + int result_column_id = -1; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id)); + column_ptr = _src_block.get_by_position(result_column_id).column; + // column_ptr maybe a ColumnConst, convert it to a normal column + column_ptr = column_ptr->convert_to_full_column_if_const(); + DCHECK(column_ptr != nullptr); + + // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr + // is likely to be nullable + if (LIKELY(column_ptr->is_nullable())) { + if (!slot_desc->is_nullable()) { + column_ptr = remove_nullable(column_ptr); + } + } else if (slot_desc->is_nullable()) { + column_ptr = make_nullable(column_ptr); + } + block->insert(dest_index, + vectorized::ColumnWithTypeAndName(column_ptr, slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + _src_block.clear(); + + size_t dest_size = block->columns(); + block->insert(vectorized::ColumnWithTypeAndName(std::move(filter_column), + std::make_shared(), + "filter column")); + RETURN_IF_ERROR(vectorized::Block::filter_block(block, dest_size, dest_size)); return Status::OK(); } @@ -382,6 +553,131 @@ void PushBrokerReader::print_profile() { LOG(INFO) << ss.str(); } +Status PushBrokerReader::_init_expr_ctxes() { + // Construct _src_slot_descs + const TupleDescriptor* src_tuple_desc = + _runtime_state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id); + if (src_tuple_desc == nullptr) { + return Status::InternalError("Unknown source tuple descriptor, tuple_id={}", + _params.src_tuple_id); + } + + std::map src_slot_desc_map; + std::unordered_map src_slot_desc_to_index {}; + for 
(int i = 0, len = src_tuple_desc->slots().size(); i < len; ++i) { + auto* slot_desc = src_tuple_desc->slots()[i]; + src_slot_desc_to_index.emplace(slot_desc, i); + src_slot_desc_map.emplace(slot_desc->id(), slot_desc); + } + for (auto slot_id : _params.src_slot_ids) { + auto it = src_slot_desc_map.find(slot_id); + if (it == std::end(src_slot_desc_map)) { + return Status::InternalError("Unknown source slot descriptor, slot_id={}", slot_id); + } + _src_slot_descs.emplace_back(it->second); + + if (it->second->type().is_variant_type() && + it->second->col_name() == BeConsts::DYNAMIC_COLUMN_NAME) { + _is_dynamic_schema = true; + } + } + _row_desc.reset(new RowDescriptor(_runtime_state->desc_tbl(), + std::vector({_params.src_tuple_id}), + std::vector({false}))); + + if (!_pre_filter_texprs.empty()) { + DCHECK(_pre_filter_texprs.size() == 1); + _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( + _runtime_state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_runtime_state.get(), *_row_desc)); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_runtime_state.get())); + } + + _dest_tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(_params.dest_tuple_id); + if (_dest_tuple_desc == nullptr) { + return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}", + _params.dest_tuple_id); + } + bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans; + for (auto slot_desc : _dest_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + auto it = _params.expr_of_dest_slot.find(slot_desc->id()); + if (it == std::end(_params.expr_of_dest_slot)) { + return Status::InternalError("No expr for dest slot, id={}, name={}", slot_desc->id(), + slot_desc->col_name()); + } + + vectorized::VExprContext* ctx = nullptr; + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_tree(_runtime_state->obj_pool(), it->second, &ctx)); + RETURN_IF_ERROR(ctx->prepare(_runtime_state.get(), *_row_desc.get())); + RETURN_IF_ERROR(ctx->open(_runtime_state.get())); + _dest_vexpr_ctx.emplace_back(ctx); + if (has_slot_id_map) { + auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); + if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) { + _src_slot_descs_order_by_dest.emplace_back(nullptr); + } else { + auto _src_slot_it = src_slot_desc_map.find(it1->second); + if (_src_slot_it == std::end(src_slot_desc_map)) { + return Status::InternalError("No src slot {} in src slot descs", it1->second); + } + _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(), + src_slot_desc_to_index[_src_slot_it->second]); + _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second); + } + } + } + return Status::OK(); +} + +Status PushBrokerReader::_get_next_reader() { + _cur_reader.reset(nullptr); + if (_next_range >= _file_ranges.size()) { + _eof = true; + return Status::OK(); + } + const TFileRangeDesc& range = _file_ranges[_next_range++]; + Status init_status; + switch (_file_params.format_type) { + case TFileFormatType::FORMAT_PARQUET: { + std::unique_ptr parquet_reader = + vectorized::ParquetReader::create_unique( + _runtime_profile, _file_params, range, + _runtime_state->query_options().batch_size, + const_cast(&_runtime_state->timezone_obj()), + _io_ctx.get(), _runtime_state.get()); + + RETURN_IF_ERROR(parquet_reader->open()); + std::vector place_holder; + init_status = parquet_reader->init_reader( + 
+
+Status PushBrokerReader::_get_next_reader() {
+    _cur_reader.reset(nullptr);
+    if (_next_range >= _file_ranges.size()) {
+        _eof = true;
+        return Status::OK();
+    }
+    const TFileRangeDesc& range = _file_ranges[_next_range++];
+    Status init_status;
+    switch (_file_params.format_type) {
+    case TFileFormatType::FORMAT_PARQUET: {
+        std::unique_ptr<vectorized::ParquetReader> parquet_reader =
+                vectorized::ParquetReader::create_unique(
+                        _runtime_profile, _file_params, range,
+                        _runtime_state->query_options().batch_size,
+                        const_cast<cctz::time_zone*>(&_runtime_state->timezone_obj()),
+                        _io_ctx.get(), _runtime_state.get());
+
+        RETURN_IF_ERROR(parquet_reader->open());
+        std::vector<std::string> place_holder;
+        init_status = parquet_reader->init_reader(
+                _all_col_names, place_holder, _colname_to_value_range, _push_down_expr,
+                _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
+                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, false);
+        _cur_reader = std::move(parquet_reader);
+        if (!init_status.ok()) {
+            return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
+                                         init_status.to_string());
+        }
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                partition_columns;
+        std::unordered_map<std::string, vectorized::VExprContext*> missing_columns;
+        _cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
+        _cur_reader->set_fill_columns(partition_columns, missing_columns);
+        break;
+    }
+    default:
+        LOG(WARNING) << "Unsupported file format type: " << _file_params.format_type;
+        return Status::Error();
+    }
+    _cur_reader_eof = false;
+
+    return Status::OK();
+}
+
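`_get_next_reader` implements a simple rotation contract: one reader per file range, `_next_range` advances on every call, and `_eof` is raised only when all ranges are exhausted. The same contract in a standalone sketch; `Range`, `Reader`, and `RangeIterator` are illustrative stand-ins, not Doris types.

// Sketch of the reader-rotation pattern behind _get_next_reader.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Range { std::string path; };
struct Reader {
    explicit Reader(std::string p) : path(std::move(p)) {}
    std::string path;
};

class RangeIterator {
public:
    explicit RangeIterator(std::vector<Range> ranges) : _ranges(std::move(ranges)) {}

    // Returns false once every range is consumed (the _eof case).
    bool next_reader(std::unique_ptr<Reader>* out) {
        out->reset();
        if (_next_range >= _ranges.size()) return false;
        *out = std::make_unique<Reader>(_ranges[_next_range++].path);
        return true;
    }

private:
    std::vector<Range> _ranges;
    size_t _next_range = 0;
};

int main() {
    RangeIterator it({{"part-0.parquet"}, {"part-1.parquet"}});
    std::unique_ptr<Reader> reader;
    while (it.next_reader(&reader)) std::cout << "open " << reader->path << '\n';
}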
 std::string PushHandler::_debug_version_list(const Versions& versions) const {
     std::ostringstream txt;
     txt << "Versions: ";
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index 9f6a91a3d5..de6dd857e9 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -26,14 +26,16 @@
 #include
 #include
 
+#include "common/factory_creator.h"
 #include "common/object_pool.h"
 #include "common/status.h"
-#include "exec/base_scanner.h"
+#include "exec/olap_common.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset.h"
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
 #include "runtime/runtime_state.h"
+#include "vec/exec/format/generic_reader.h"
 
 namespace doris {
 
@@ -46,6 +48,8 @@ class TTabletInfo;
 
 namespace vectorized {
 class Block;
+class GenericReader;
+class VExprContext;
 } // namespace vectorized
 
 class PushHandler {
@@ -83,28 +87,73 @@ private:
 };
 
 class PushBrokerReader {
-public:
-    PushBrokerReader() : _ready(false), _eof(false) {}
-    ~PushBrokerReader() = default;
+    ENABLE_FACTORY_CREATOR(PushBrokerReader);
 
-    Status init(const Schema* schema, const TBrokerScanRange& t_scan_range,
-                const TDescriptorTable& t_desc_tbl);
+public:
+    PushBrokerReader(const Schema* schema, const TBrokerScanRange& t_scan_range,
+                     const TDescriptorTable& t_desc_tbl);
+    ~PushBrokerReader() = default;
+    Status init();
     Status next(vectorized::Block* block);
     void print_profile();
 
-    Status close() {
-        _ready = false;
-        return Status::OK();
-    }
+    Status close();
     bool eof() const { return _eof; }
 
+protected:
+    Status _get_next_reader();
+    Status _init_src_block();
+    Status _cast_to_input_block();
+    Status _convert_to_output_block(vectorized::Block* block);
+    Status _init_expr_ctxes();
+
 private:
     bool _ready;
     bool _eof;
+    int _next_range;
+    vectorized::Block* _src_block_ptr;
+    vectorized::Block _src_block;
+    const TDescriptorTable& _t_desc_tbl;
+    std::unordered_map<std::string, TypeDescriptor> _name_to_col_type;
+    std::unordered_set<std::string> _missing_cols;
+    std::unordered_map<std::string, size_t> _src_block_name_to_idx;
+    std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
+    std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
+    bool _is_dynamic_schema = false;
+    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
+    std::unordered_map<int, int> _dest_slot_to_src_slot_index;
+
+    std::vector<SlotDescriptor*> _src_slot_descs;
+    std::unique_ptr<RowDescriptor> _row_desc;
+    const TupleDescriptor* _dest_tuple_desc;
+
+    std::unique_ptr<RuntimeState> _runtime_state;
     RuntimeProfile* _runtime_profile;
-    std::unique_ptr<ScannerCounter> _counter;
-    std::unique_ptr<BaseScanner> _scanner;
+    std::unique_ptr<vectorized::GenericReader> _cur_reader;
+    bool _cur_reader_eof;
+    const TBrokerScanRangeParams& _params;
+    const std::vector<TBrokerRangeDesc>& _ranges;
+    TFileScanRangeParams _file_params;
+    std::vector<TFileRangeDesc> _file_ranges;
+
+    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
+    std::unique_ptr<io::IOContext> _io_ctx;
+
+    // col names from _slot_descs
+    std::vector<std::string> _all_col_names;
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    vectorized::VExprContext* _push_down_expr = nullptr;
+    const std::unordered_map<std::string, int>* _col_name_to_slot_id;
+    // single slot filter conjuncts
+    std::unordered_map<int, std::vector<vectorized::VExprContext*>> _slot_id_to_filter_conjuncts;
+    // not single(zero or multi) slot filter conjuncts
+    std::vector<vectorized::VExprContext*> _not_single_slot_filter_conjuncts;
+    // File source slot descriptors
+    std::vector<SlotDescriptor*> _file_slot_descs;
+    // row desc for default exprs
+    std::unique_ptr<RowDescriptor> _default_val_row_desc;
+    const TupleDescriptor* _real_tuple_desc = nullptr;
+    // Not used, just for placeholding
+    std::vector<TExpr> _pre_filter_texprs;
 };
 
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 4d22b3da4d..2d5bc7f177 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -124,7 +124,6 @@ set(VEC_FILES
     data_types/data_type_time.cpp
     data_types/data_type_object.cpp
     exec/vaggregation_node.cpp
-    exec/varrow_scanner.cpp
     exec/vsort_node.cpp
     exec/vexchange_node.cpp
     exec/vset_operation_node.cpp
@@ -137,7 +136,6 @@ set(VEC_FILES
     exec/vrepeat_node.cpp
     exec/vtable_function_node.cpp
     exec/vjdbc_connector.cpp
-    exec/vparquet_scanner.cpp
     exec/join/vhash_join_node.cpp
     exec/join/vjoin_node_base.cpp
     exec/join/vnested_loop_join_node.cpp
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
deleted file mode 100644
index b8ce90dfef..0000000000
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ /dev/null
@@ -1,343 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -#include "vec/exec/varrow_scanner.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "exec/arrow/arrow_reader.h" -#include "io/file_factory.h" -#include "io/fs/file_reader.h" -#include "runtime/descriptors.h" -#include "runtime/runtime_state.h" -#include "runtime/thread_context.h" -#include "vec/aggregate_functions/aggregate_function.h" -#include "vec/columns/column.h" -#include "vec/core/block.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/core/columns_with_type_and_name.h" -#include "vec/core/field.h" -#include "vec/data_types/data_type.h" -#include "vec/data_types/data_type_factory.hpp" -#include "vec/data_types/data_type_nullable.h" -#include "vec/data_types/data_type_string.h" -#include "vec/functions/function.h" -#include "vec/functions/simple_function_factory.h" -#include "vec/utils/arrow_column_to_doris_column.h" - -namespace doris { -class TExpr; -namespace io { -enum class FileCachePolicy : uint8_t; -} // namespace io -} // namespace doris - -namespace doris::vectorized { -using namespace ErrorCode; - -VArrowScanner::VArrowScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter) - : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), - // _splittable(params.splittable), - _cur_file_reader(nullptr), - _cur_file_eof(false), - _batch(nullptr), - _arrow_batch_cur_idx(0) { - _filtered_row_groups_counter = ADD_COUNTER(_profile, "FileFilteredRowGroups", TUnit::UNIT); - _filtered_rows_counter = ADD_COUNTER(_profile, "FileFilteredRows", TUnit::UNIT); - _filtered_bytes_counter = ADD_COUNTER(_profile, "FileFilteredBytes", TUnit::BYTES); - _total_rows_counter = ADD_COUNTER(_profile, "FileTotalRows", TUnit::UNIT); - _total_groups_counter = ADD_COUNTER(_profile, "FileTotalRowGroups", TUnit::UNIT); -} - -VArrowScanner::~VArrowScanner() { - close(); -} - -void VArrowScanner::_init_system_properties(const TBrokerRangeDesc& range) { - _system_properties.system_type = range.file_type; - _system_properties.properties = _params.properties; - _system_properties.hdfs_params = range.hdfs_params; - _system_properties.broker_addresses.assign(_broker_addresses.begin(), _broker_addresses.end()); -} - -void VArrowScanner::_init_file_description(const TBrokerRangeDesc& range) { - _file_description.path = range.path; - _file_description.start_offset = range.start_offset; - _file_description.file_size = range.__isset.file_size ? 
range.file_size : 0; -} - -Status VArrowScanner::_open_next_reader() { - // open_file_reader - if (_cur_file_reader != nullptr) { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } - - while (true) { - if (_next_range >= _ranges.size()) { - _scanner_eof = true; - return Status::OK(); - } - const TBrokerRangeDesc& range = _ranges[_next_range++]; - io::FileReaderSPtr file_reader; - _init_system_properties(range); - _init_file_description(range); - io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, - _file_description, &_file_system, - &file_reader, reader_options)); - - if (file_reader->size() == 0) { - continue; - } - - int32_t num_of_columns_from_file = _src_slot_descs.size(); - if (range.__isset.num_of_columns_from_file) { - num_of_columns_from_file = range.num_of_columns_from_file; - } - _cur_file_reader = _new_arrow_reader(_src_slot_descs, file_reader, num_of_columns_from_file, - range.start_offset, range.size); - auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); - Status status = _cur_file_reader->init_reader(tuple_desc, _state->timezone()); - - if (status.is()) { - continue; - } else { - if (!status.ok()) { - return Status::InternalError(" file: {} error:{}", range.path, status.to_string()); - } else { - update_profile(_cur_file_reader->statistics()); - return status; - } - } - } -} - -void VArrowScanner::update_profile(std::shared_ptr& statistics) { - COUNTER_UPDATE(_total_groups_counter, statistics->total_groups); - COUNTER_UPDATE(_filtered_row_groups_counter, statistics->filtered_row_groups); - COUNTER_UPDATE(_total_rows_counter, statistics->total_rows); - COUNTER_UPDATE(_filtered_rows_counter, statistics->filtered_rows); - COUNTER_UPDATE(_filtered_bytes_counter, statistics->filtered_total_bytes); -} - -Status VArrowScanner::open() { - RETURN_IF_ERROR(BaseScanner::open()); - if (_ranges.empty()) { - return Status::OK(); - } - return Status::OK(); -} - -// get next available arrow batch -Status VArrowScanner::_next_arrow_batch() { - _arrow_batch_cur_idx = 0; - // first, init file reader - if (_cur_file_reader == nullptr || _cur_file_eof) { - RETURN_IF_ERROR(_open_next_reader()); - _cur_file_eof = false; - } - // second, loop until find available arrow batch or EOF - while (!_scanner_eof) { - RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof)); - if (_cur_file_eof) { - RETURN_IF_ERROR(_open_next_reader()); - _cur_file_eof = false; - continue; - } - if (_batch->num_rows() == 0) { - continue; - } - return Status::OK(); - } - return Status::EndOfFile("EOF"); -} - -Status VArrowScanner::_init_arrow_batch_if_necessary() { - // 1. init batch if first time - // 2. 
reset reader if end of file - Status status = Status::OK(); - if (_scanner_eof) { - return Status::EndOfFile("EOF"); - } - if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { - return _next_arrow_batch(); - } - return status; -} - -Status VArrowScanner::_init_src_block() { - size_t batch_pos = 0; - _src_block.clear(); - if (_batch->num_columns() < _num_of_columns_from_file) { - LOG(WARNING) << "some columns not found in the file, num_columns_obtained: " - << _batch->num_columns() - << " num_columns_required: " << _num_of_columns_from_file; - return Status::InvalidArgument("some columns not found in the file"); - } - for (auto i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - auto* array = _batch->column(batch_pos++).get(); - // let src column be nullable for simplify converting - // TODO, support not nullable for exec efficiently - auto is_nullable = true; - DataTypePtr data_type = - DataTypeFactory::instance().create_data_type(array->type().get(), is_nullable); - if (data_type == nullptr) { - return Status::NotSupported( - fmt::format("Not support arrow type:{}", array->type()->name())); - } - MutableColumnPtr data_column = data_type->create_column(); - _src_block.insert( - ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); - } - return Status::OK(); -} - -Status VArrowScanner::get_next(vectorized::Block* block, bool* eof) { - // overall of type converting: - // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> - // primitive type(PT1) ==materialize_block==> dest primitive type - - // first, we need to convert the arrow type to the corresponding internal type, - // such as arrow::INT16 to TYPE_SMALLINT(PT0). - // why need first step? we cannot convert the arrow type to type in src desc directly, - // it's too hard to achieve. - - // second, convert PT0 to the type in src desc, such as TYPE_SMALLINT to TYPE_VARCHAR.(PT1) - // why need second step? the materialize step only accepts types specified in src desc. - - // finally, through the materialized, convert to the type in dest desc, such as TYPE_DATETIME. 
- SCOPED_TIMER(_read_timer); - // init arrow batch - { - Status st = _init_arrow_batch_if_necessary(); - if (!st.ok()) { - if (!st.is()) { - return st; - } - *eof = true; - return Status::OK(); - } - } - - RETURN_IF_ERROR(_init_src_block()); - // convert arrow batch to block until reach the batch_size - while (!_scanner_eof) { - // cast arrow type to PT0 and append it to src block - // for example: arrow::Type::INT16 => TYPE_SMALLINT - RETURN_IF_ERROR(_append_batch_to_src_block(&_src_block)); - // finalize the src block if full - if (_src_block.rows() >= _state->batch_size()) { - break; - } - auto status = _next_arrow_batch(); - // if ok, append the batch to the src columns - if (status.ok()) { - continue; - } - // return error if not EOF - if (!status.is()) { - return status; - } - _cur_file_eof = true; - break; - } - COUNTER_UPDATE(_rows_read_counter, _src_block.rows()); - SCOPED_TIMER(_materialize_timer); - // cast PT0 => PT1 - // for example: TYPE_SMALLINT => TYPE_VARCHAR - RETURN_IF_ERROR(_cast_src_block(&_src_block)); - - // materialize, src block => dest columns - RETURN_IF_CATCH_EXCEPTION({ return _fill_dest_block(block, eof); }); -} - -// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> -// primitive type(PT1) ==materialize_block==> dest primitive type -Status VArrowScanner::_cast_src_block(Block* block) { - // cast primitive type(PT0) to primitive type(PT1) - for (size_t i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - auto& arg = block->get_by_name(slot_desc->col_name()); - // remove nullable here, let the get_function decide whether nullable - auto return_type = slot_desc->get_data_type_ptr(); - ColumnsWithTypeAndName arguments { - arg, - {DataTypeString().create_column_const( - arg.column->size(), remove_nullable(return_type)->get_family_name()), - std::make_shared(), ""}}; - auto func_cast = - SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type); - RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size())); - block->get_by_position(i).type = std::move(return_type); - } - return Status::OK(); -} - -Status VArrowScanner::_append_batch_to_src_block(Block* block) { - size_t num_elements = std::min((_state->batch_size() - block->rows()), - (_batch->num_rows() - _arrow_batch_cur_idx)); - size_t column_pos = 0; - for (auto i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - auto* array = _batch->column(column_pos++).get(); - auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); - RETURN_IF_ERROR(arrow_column_to_doris_column( - array, _arrow_batch_cur_idx, column_with_type_and_name.column, - column_with_type_and_name.type, num_elements, _state->timezone_obj())); - } - - _arrow_batch_cur_idx += num_elements; - return Status::OK(); -} - -void VArrowScanner::close() { - BaseScanner::close(); - if (_cur_file_reader != nullptr) { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } -} - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h deleted file mode 100644 index e8d55542d6..0000000000 --- a/be/src/vec/exec/varrow_scanner.h +++ /dev/null @@ -1,109 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include -#include - -#include "common/status.h" -#include "exec/base_scanner.h" -#include "io/file_factory.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "util/runtime_profile.h" - -namespace arrow { -class RecordBatch; -} // namespace arrow -namespace doris { -class ArrowReaderWrap; -class RuntimeState; -class SlotDescriptor; -class TBrokerRangeDesc; -class TBrokerScanRangeParams; -class TExpr; -class TNetworkAddress; - -namespace io { -class FileSystem; -} // namespace io -namespace vectorized { -class Block; -} // namespace vectorized -struct Statistics; -} // namespace doris - -namespace doris::vectorized { - -// VArrow scanner convert the data read from orc|parquet to doris's columns. -class VArrowScanner : public BaseScanner { -public: - VArrowScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter); - - virtual ~VArrowScanner(); - - // Open this scanner, will initialize information need to - virtual Status open() override; - - virtual Status get_next(Block* block, bool* eof) override; - - // Update file predicate filter profile - void update_profile(std::shared_ptr& statistics); - - virtual void close() override; - -protected: - virtual ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, - io::FileReaderSPtr file_reader, - int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size) = 0; - -private: - // Read next buffer from reader - Status _open_next_reader(); - Status _next_arrow_batch(); - Status _init_arrow_batch_if_necessary(); - Status _init_src_block() override; - Status _append_batch_to_src_block(Block* block); - Status _cast_src_block(Block* block); - void _init_system_properties(const TBrokerRangeDesc& range); - void _init_file_description(const TBrokerRangeDesc& range); - -private: - // Reader - ArrowReaderWrap* _cur_file_reader; - bool _cur_file_eof; // is read over? 
- std::shared_ptr _batch; - size_t _arrow_batch_cur_idx; - FileSystemProperties _system_properties; - FileDescription _file_description; - std::shared_ptr _file_system; - - RuntimeProfile::Counter* _filtered_row_groups_counter; - RuntimeProfile::Counter* _filtered_rows_counter; - RuntimeProfile::Counter* _filtered_bytes_counter; - RuntimeProfile::Counter* _total_rows_counter; - RuntimeProfile::Counter* _total_groups_counter; -}; - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp deleted file mode 100644 index 70d8bd1408..0000000000 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/vparquet_scanner.h" - -#include "exec/arrow/parquet_reader.h" -#include "vec/exec/varrow_scanner.h" - -namespace doris { -class ArrowReaderWrap; -class RuntimeProfile; -class RuntimeState; -class SlotDescriptor; -class TBrokerRangeDesc; -class TBrokerScanRangeParams; -class TExpr; -class TNetworkAddress; -struct ScannerCounter; -} // namespace doris - -namespace doris::vectorized { - -VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, - ScannerCounter* counter) - : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, - counter) {} - -ArrowReaderWrap* VParquetScanner::_new_arrow_reader( - const std::vector& file_slot_descs, io::FileReaderSPtr file_reader, - int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { - return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, - range_start_offset, range_size); -} - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h deleted file mode 100644 index dc71f43b9e..0000000000 --- a/be/src/vec/exec/vparquet_scanner.h +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include - -#include "io/fs/file_reader_writer_fwd.h" -#include "vec/exec/varrow_scanner.h" - -namespace doris { -class ArrowReaderWrap; -class RuntimeProfile; -class RuntimeState; -class SlotDescriptor; -class TBrokerRangeDesc; -class TBrokerScanRangeParams; -class TExpr; -class TNetworkAddress; -struct ScannerCounter; -} // namespace doris - -namespace doris::vectorized { - -// VParquet scanner convert the data read from Parquet to doris's columns. -class VParquetScanner final : public VArrowScanner { -public: - VParquetScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter); - - ~VParquetScanner() override = default; - -protected: - ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, - io::FileReaderSPtr file_reader, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) override; -}; - -} // namespace doris::vectorized diff --git a/fe/pom.xml b/fe/pom.xml index 2cccf1064c..bea94f8e7b 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -998,7 +998,7 @@ under the License. org.apache.spark spark-launcher_2.12 ${spark.version} - provided + diff --git a/regression-test/data/load_p0/spark_load/all_types1.txt b/regression-test/data/load_p0/spark_load/all_types1.txt new file mode 100644 index 0000000000..3b4e1ad6dc --- /dev/null +++ b/regression-test/data/load_p0/spark_load/all_types1.txt @@ -0,0 +1,11 @@ +10000,aa,北京,0,11,4444,5555555,41232314,3.14,123.3423,111.111,111.111,2017-10-01,2017-10-01,2017-10-01 06:00:00,2017-10-01 06:00:00 +10001,bb,北京,0,22,3333,666,2768658,5.32,123111.3242,222.222,222.222,2017-10-02,2017-10-02,2017-10-02 07:00:00,2017-10-02 07:00:00 +10002,cc,北京,1,33,2222,453,5463456,4.321,11111.23423,333.333,333.333,2017-10-03,2017-10-03,2017-10-03 17:05:45,2017-10-03 17:05:45 +10003,dd,上海,1,44,1111,-3241,-45235,1.34,54626.324,444.444,444.444,2017-10-04,2017-10-04,2017-10-04 12:59:12,2017-10-04 12:59:12 +10004,ee,成都,0,55,-9999,21342,4513456,1.22,111.33,555.555,555.555,2017-10-05,2017-10-05,2017-10-05 11:20:00,2017-10-05 11:20:00 +10005,ff,西安,0,66,8888,64562,4356,9.133,23423.45,666.666,666.666,2017-10-06,2017-10-06,2017-10-06 12:00:15,2017-10-06 12:00:15 +10006,gg,深圳,1,77,-7777,-12313342,34534,8.100,12,777.777,777.777,2017-10-07,2017-10-07,2017-10-07 13:20:22,2017-10-07 13:20:22 +10007,hh,杭州,0,88,6666,314234,43535356,34.124,324,888.888,888.888,2017-10-08,2017-10-08,2017-10-08 14:58:10,2017-10-08 14:58:10 +10008,ii,上海,1,99,-5555,1341,23434534,342.120,34234.1,999.999,999.999,2017-10-09,2017-10-09,2017-10-09 25:12:22,2017-10-09 25:12:22 +10009,jj,南京,0,11,4444,-123,53623567,11.22,324.33,111.111,111.111,2017-10-10,2017-10-10,2017-10-10 16:25:42,2017-10-10 16:25:42 +10010,kk,成都,0,22,-3333,12314,674567,13,45464.435,222.222,222.222,2017-10-11,2017-10-11,2017-10-11 17:22:24,2017-10-11 17:22:24 \ No newline at end of file diff --git a/regression-test/data/load_p0/spark_load/all_types2.txt 
b/regression-test/data/load_p0/spark_load/all_types2.txt new file mode 100644 index 0000000000..429b1d99d2 --- /dev/null +++ b/regression-test/data/load_p0/spark_load/all_types2.txt @@ -0,0 +1,11 @@ +10011|aa|北京|0|11|4444|5555555|41232314|3.14|123.3423|111.111|111.111|2017-10-01|2017-10-01|2017-10-01 06:00:00|2017-10-01 06:00:00 +10012|bb|北京|0|22|3333|666|2768658|5.32|123111.3242|222.222|222.222|2017-10-02|2017-10-02|2017-10-02 07:00:00|2017-10-02 07:00:00 +10013|cc|北京|1|33|2222|453|5463456|4.321|11111.23423|333.333|333.333|2017-10-03|2017-10-03|2017-10-03 17:05:45|2017-10-03 17:05:45 +10014|dd|上海|1|44|1111|-3241|-45235|1.34|54626.324|444.444|444.444|2017-10-04|2017-10-04|2017-10-04 12:59:12|2017-10-04 12:59:12 +10015|ee|成都|0|55|-9999|21342|4513456|1.22|111.33|555.555|555.555|2017-10-05|2017-10-05|2017-10-05 11:20:00|2017-10-05 11:20:00 +10016|ff|西安|0|66|8888|64562|4356|9.133|23423.45|666.666|666.666|2017-10-06|2017-10-06|2017-10-06 12:00:15|2017-10-06 12:00:15 +10017|gg|深圳|1|77|-7777|-12313342|34534|8.100|12|777.777|777.777|2017-10-07|2017-10-07|2017-10-07 13:20:22|2017-10-07 13:20:22 +10018|hh|杭州|0|88|6666|314234|43535356|34.124|324|888.888|888.888|2017-10-08|2017-10-08|2017-10-08 14:58:10|2017-10-08 14:58:10 +10019|ii|上海|1|99|-5555|1341|23434534|342.120|34234.1|999.999|999.999|2017-10-09|2017-10-09|2017-10-09 25:12:22|2017-10-09 25:12:22 +10020|jj|南京|0|11|4444|-123|53623567|11.22|324.33|111.111|111.111|2017-10-10|2017-10-10|2017-10-10 16:25:42|2017-10-10 16:25:42 +10021|kk|成都|0|22|-3333|12314|674567|13|45464.435|222.222|222.222|2017-10-11|2017-10-11|2017-10-11 17:22:24|2017-10-11 17:22:24 \ No newline at end of file diff --git a/regression-test/data/load_p0/spark_load/test_spark_load.out b/regression-test/data/load_p0/spark_load/test_spark_load.out new file mode 100644 index 0000000000..aa4d662c23 --- /dev/null +++ b/regression-test/data/load_p0/spark_load/test_spark_load.out @@ -0,0 +1,37 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select -- +10000 aa 北京 false 11 4444 5555555 41232314 3.14 123.3423 111.111 111.111 2017-10-01 2017-10-01 2017-10-01T06:00 2017-10-01T06:00 +10001 bb 北京 false 22 3333 666 2768658 5.32 123111.3242 222.222 222.222 2017-10-02 2017-10-02 2017-10-02T07:00 2017-10-02T07:00 +10002 cc 北京 true 33 2222 453 5463456 4.321 11111.23423 333.333 333.333 2017-10-03 2017-10-03 2017-10-03T17:05:45 2017-10-03T17:05:45 +10003 dd 上海 true 44 1111 -3241 -45235 1.34 54626.324 444.444 444.444 2017-10-04 2017-10-04 2017-10-04T12:59:12 2017-10-04T12:59:12 +10004 ee 成都 false 55 -9999 21342 4513456 1.22 111.33 555.555 555.555 2017-10-05 2017-10-05 2017-10-05T11:20 2017-10-05T11:20 +10005 ff 西安 false 66 8888 64562 4356 9.133 23423.45 666.666 666.666 2017-10-06 2017-10-06 2017-10-06T12:00:15 2017-10-06T12:00:15 +10006 gg 深圳 true 77 -7777 -12313342 34534 8.1 12.0 777.777 777.777 2017-10-07 2017-10-07 2017-10-07T13:20:22 2017-10-07T13:20:22 +10007 hh 杭州 false 88 6666 314234 43535356 34.124 324.0 888.888 888.888 2017-10-08 2017-10-08 2017-10-08T14:58:10 2017-10-08T14:58:10 +10008 ii 上海 true 99 -5555 1341 23434534 342.12 34234.1 999.999 999.999 2017-10-09 2017-10-09 \N \N +10009 jj 南京 false 11 4444 -123 53623567 11.22 324.33 111.111 111.111 2017-10-10 2017-10-10 2017-10-10T16:25:42 2017-10-10T16:25:42 +10010 kk 成都 false 22 -3333 12314 674567 13.0 45464.435 222.222 222.222 2017-10-11 2017-10-11 2017-10-11T17:22:24 2017-10-11T17:22:24 +10011 aa 北京 false 11 4444 5555555 41232314 3.14 123.3423 111.111 111.111 2017-10-01 2017-10-01 2017-10-01T06:00 2017-10-01T06:00 +10012 bb 北京 false 22 3333 666 2768658 5.32 123111.3242 222.222 222.222 2017-10-02 2017-10-02 2017-10-02T07:00 2017-10-02T07:00 +10013 cc 北京 true 33 2222 453 5463456 4.321 11111.23423 333.333 333.333 2017-10-03 2017-10-03 2017-10-03T17:05:45 2017-10-03T17:05:45 +10014 dd 上海 true 44 1111 -3241 -45235 1.34 54626.324 444.444 444.444 2017-10-04 2017-10-04 2017-10-04T12:59:12 2017-10-04T12:59:12 +10015 ee 成都 false 55 -9999 21342 4513456 1.22 111.33 555.555 555.555 2017-10-05 2017-10-05 2017-10-05T11:20 2017-10-05T11:20 +10016 ff 西安 false 66 8888 64562 4356 9.133 23423.45 666.666 666.666 2017-10-06 2017-10-06 2017-10-06T12:00:15 2017-10-06T12:00:15 +10017 gg 深圳 true 77 -7777 -12313342 34534 8.1 12.0 777.777 777.777 2017-10-07 2017-10-07 2017-10-07T13:20:22 2017-10-07T13:20:22 +10018 hh 杭州 false 88 6666 314234 43535356 34.124 324.0 888.888 888.888 2017-10-08 2017-10-08 2017-10-08T14:58:10 2017-10-08T14:58:10 +10019 ii 上海 true 99 -5555 1341 23434534 342.12 34234.1 999.999 999.999 2017-10-09 2017-10-09 \N \N +10020 jj 南京 false 11 4444 -123 53623567 11.22 324.33 111.111 111.111 2017-10-10 2017-10-10 2017-10-10T16:25:42 2017-10-10T16:25:42 +10021 kk 成都 false 22 -3333 12314 674567 13.0 45464.435 222.222 222.222 2017-10-11 2017-10-11 2017-10-11T17:22:24 2017-10-11T17:22:24 + +-- !select -- +10000 aa 北京 false 11 4444 5555555 41232314 3.14 123.3423 111.111 111.111 2017-10-01 2017-10-01 2017-10-01T06:00 2017-10-01T06:00 +10001 bb 北京 false 22 3333 666 2768658 5.32 123111.3242 222.222 222.222 2017-10-02 2017-10-02 2017-10-02T07:00 2017-10-02T07:00 +10002 cc 北京 true 33 2222 453 5463456 4.321 11111.23423 333.333 333.333 2017-10-03 2017-10-03 2017-10-03T17:05:45 2017-10-03T17:05:45 +10003 dd 上海 true 44 1111 -3241 -45235 1.34 54626.324 444.444 444.444 2017-10-04 2017-10-04 2017-10-04T12:59:12 2017-10-04T12:59:12 +10004 ee 成都 false 55 -9999 21342 4513456 1.22 111.33 555.555 555.555 2017-10-05 2017-10-05 2017-10-05T11:20 
2017-10-05T11:20 +10005 ff 西安 false 66 8888 64562 4356 9.133 23423.45 666.666 666.666 2017-10-06 2017-10-06 2017-10-06T12:00:15 2017-10-06T12:00:15 +10006 gg 深圳 true 77 -7777 -12313342 34534 8.1 12.0 777.777 777.777 2017-10-07 2017-10-07 2017-10-07T13:20:22 2017-10-07T13:20:22 +10007 hh 杭州 false 88 6666 314234 43535356 34.124 324.0 888.888 888.888 2017-10-08 2017-10-08 2017-10-08T14:58:10 2017-10-08T14:58:10 +10008 ii 上海 true 99 -5555 1341 23434534 342.12 34234.1 999.999 999.999 2017-10-09 2017-10-09 \N \N +10009 jj 南京 false 11 4444 -123 53623567 11.22 324.33 111.111 111.111 2017-10-10 2017-10-10 2017-10-10T16:25:42 2017-10-10T16:25:42 +10010 kk 成都 false 22 -3333 12314 674567 13.0 45464.435 222.222 222.222 2017-10-11 2017-10-11 2017-10-11T17:22:24 2017-10-11T17:22:24 \ No newline at end of file diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy index 33e2bc8ff5..e86b61a237 100644 --- a/regression-test/pipeline/p0/conf/regression-conf.groovy +++ b/regression-test/pipeline/p0/conf/regression-conf.groovy @@ -48,7 +48,7 @@ testDirectories = "" // this groups will not be executed excludeGroups = "" // this suites will not be executed -excludeSuites = "test_broker_load" +excludeSuites = "test_broker_load,test_spark_load" // this directories will not be executed excludeDirectories = "" diff --git a/regression-test/suites/load_p0/spark_load/test_spark_load.groovy b/regression-test/suites/load_p0/spark_load/test_spark_load.groovy new file mode 100644 index 0000000000..6cc3df7381 --- /dev/null +++ b/regression-test/suites/load_p0/spark_load/test_spark_load.groovy @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_spark_load", "p0") { + // Need spark cluster, upload data file to hdfs + def testTable = "tbl_test_spark_load" + def testTable2 = "tbl_test_spark_load2" + def testResource = "spark_resource" + def yarnAddress = "master:8032" + def hdfsAddress = "hdfs://master:9000" + def hdfsWorkingDir = "hdfs://master:9000/doris" + brokerName =getBrokerName() + hdfsUser = getHdfsUser() + hdfsPasswd = getHdfsPasswd() + + def create_test_table = {testTablex -> + def result1 = sql """ + CREATE TABLE IF NOT EXISTS ${testTablex} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + c_tinyint tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL + ) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + assertTrue(result1.size() == 1) + assertTrue(result1[0].size() == 1) + assertTrue(result1[0][0] == 0, "Create table should update 0 rows") + } + + def create_spark_resource = {sparkType, sparkMaster, sparkQueue -> + def result1 = sql """ + CREATE EXTERNAL RESOURCE "${testResource}" + PROPERTIES + ( + "type" = "spark", + "spark.master" = "yarn", + "spark.submit.deployMode" = "cluster", + "spark.executor.memory" = "1g", + "spark.yarn.queue" = "default", + "spark.hadoop.yarn.resourcemanager.address" = "${yarnAddress}", + "spark.hadoop.fs.defaultFS" = "${hdfsAddress}", + "working_dir" = "${hdfsWorkingDir}", + "broker" = "${brokerName}", + "broker.username" = "${hdfsUser}", + "broker.password" = "${hdfsPasswd}" + ); + """ + + // DDL/DML return 1 row and 3 column, the only value is update row count + assertTrue(result1.size() == 1) + assertTrue(result1[0].size() == 1) + assertTrue(result1[0][0] == 0, "Create resource should update 0 rows") + } + + def load_from_hdfs_use_spark = {testTablex, testTablex2, label, hdfsFilePath1, hdfsFilePath2 -> + def result1= sql """ + LOAD LABEL ${label} + ( + DATA INFILE("${hdfsFilePath1}") + INTO TABLE ${testTablex} + COLUMNS TERMINATED BY ",", + DATA INFILE("${hdfsFilePath2}") + INTO TABLE ${testTablex2} + COLUMNS TERMINATED BY "|" + ) + WITH RESOURCE '${testResource}' + ( + "spark.executor.memory" = "2g", + "spark.shuffle.compress" = "true" + ) + PROPERTIES + ( + "timeout" = "3600" + ); + """ + + assertTrue(result1.size() == 1) + assertTrue(result1[0].size() == 1) + assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected") + } + + def check_load_result = {checklabel, testTablex, testTablex2 -> + max_try_milli_secs = 10000 + while(max_try_milli_secs) { + result = sql "show load where label = '${checklabel}'" + if(result[0][2] == "FINISHED") { + sql "sync" + qt_select "select * from ${testTablex} order by c_int" + qt_select "select * from ${testTablex2} order by c_int" + break + } else { + sleep(1000) // wait 1 second every time + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertEquals(1, 2) + } + } + } + } + + // if 'enableHdfs' in regression-conf.groovy has been set to true, + if (enableHdfs()) { + def hdfs_txt_file_path1 = uploadToHdfs "spark_load/all_types1.txt" + def hdfs_txt_file_path2 = uploadToHdfs "spark_load/all_types2.txt" + try { + sql "DROP TABLE IF EXISTS ${testTable}" + sql "DROP TABLE IF EXISTS ${testTable2}" + create_test_table.call(testTable) + create_test_table.call(testTable2) + def 
+            def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
+            create_spark_resource.call("spark", "yarn", "default")
+            load_from_hdfs_use_spark.call(testTable, testTable2, test_load_label,
+                                          hdfs_txt_file_path1, hdfs_txt_file_path2)
+            check_load_result.call(test_load_label, testTable, testTable2)
+
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+            try_sql("DROP TABLE IF EXISTS ${testTable2}")
+        }
+    }
+}
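check_load_result above polls SHOW LOAD once a second until the job reaches FINISHED or a budget of milliseconds is spent. The same poll-with-deadline pattern in a standalone C++ sketch; job_state is a stub standing in for querying the FE, not a real API.

// Sketch of the poll-until-FINISHED-or-deadline pattern used by the test.
#include <chrono>
#include <iostream>
#include <string>
#include <thread>

std::string job_state(int poll) { return poll < 3 ? "LOADING" : "FINISHED"; } // stub

int main() {
    using namespace std::chrono;
    auto deadline = steady_clock::now() + seconds(10);
    int poll = 0;
    while (steady_clock::now() < deadline) {
        if (job_state(poll++) == "FINISHED") {
            std::cout << "load finished\n";
            return 0;
        }
        std::this_thread::sleep_for(seconds(1)); // wait 1 second between polls
    }
    std::cerr << "load timed out\n";
    return 1;
}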