diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 202ae767b0..d93c394047 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -22,6 +22,9 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/exec") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exec") set(EXEC_FILES + arrow/arrow_reader.cpp + arrow/orc_reader.cpp + arrow/parquet_reader.cpp analytic_eval_node.cpp blocking_join_node.cpp broker_scan_node.cpp @@ -94,7 +97,6 @@ set(EXEC_FILES local_file_writer.cpp broker_writer.cpp parquet_scanner.cpp - parquet_reader.cpp parquet_writer.cpp orc_scanner.cpp odbc_connector.cpp diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp new file mode 100644 index 0000000000..789c66be2c --- /dev/null +++ b/be/src/exec/arrow/arrow_reader.cpp @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#include "exec/arrow/arrow_reader.h"
+
+#include
+#include
+#include
+
+#include "common/logging.h"
+#include "exec/file_reader.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/TPaloBrokerService.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
+#include "runtime/tuple.h"
+#include "util/thrift_util.h"
+
+namespace doris {
+
+// Wraps `file_reader` in an ArrowFile and resets the group (stripe) counters.
+// The ArrowFile deletes `file_reader` when it is closed.
+ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
+                                 int32_t num_of_columns_from_file)
+        : _batch_size(batch_size), _num_of_columns_from_file(num_of_columns_from_file) {
+    _arrow_file = std::shared_ptr(new ArrowFile(file_reader));
+    _rb_reader = nullptr;
+    _total_groups = 0;
+    _current_group = 0;
+}
+
+ArrowReaderWrap::~ArrowReaderWrap() {
+    // Qualified on purpose: virtual dispatch is not active inside a destructor,
+    // so only this class's close() can run here. A derived class that overrides
+    // close() has to invoke its own override from its own destructor.
+    ArrowReaderWrap::close();
+}
+
+// Closes the underlying arrow file; a failure is logged, not propagated.
+void ArrowReaderWrap::close() {
+    arrow::Status st = _arrow_file->Close();
+    if (!st.ok()) {
+        LOG(WARNING) << "close file error: " << st.ToString();
+    }
+}
+
+// Rebuilds _include_column_ids from the first `_num_of_columns_from_file` slots:
+// each slot's column name is resolved through _map_column (name -> index in file).
+// Returns InvalidArgument for a slot whose name is not present in the file.
+Status ArrowReaderWrap::column_indices(const std::vector& tuple_slot_descs) {
+    DCHECK(_num_of_columns_from_file <= tuple_slot_descs.size());
+    _include_column_ids.clear();
+    for (int i = 0; i < _num_of_columns_from_file; i++) {
+        auto slot_desc = tuple_slot_descs.at(i);
+        // Look up the file column index registered under the slot's column name
+        auto iter = _map_column.find(slot_desc->col_name());
+        if (iter != _map_column.end()) {
+            _include_column_ids.emplace_back(iter->second);
+        } else {
+            std::stringstream str_error;
+            str_error << "Invalid Column Name:" << slot_desc->col_name();
+            LOG(WARNING) << str_error.str();
+            return Status::InvalidArgument(str_error.str());
+        }
+    }
+    return Status::OK();
+}
+
+// Takes ownership of `file`; it is closed and deleted in Close().
+ArrowFile::ArrowFile(FileReader* file) : _file(file) {}
+
+ArrowFile::~ArrowFile() {
+    arrow::Status st = Close();
+    if (!st.ok()) {
+        LOG(WARNING) << "close file error: " << st.ToString();
+    }
+}
+
+// Closes and deletes the wrapped reader. Safe to call more than once:
+// after the first call _file is null and the call is a no-op.
+arrow::Status ArrowFile::Close() {
+    if (_file != nullptr) {
+        _file->close();
+        delete _file;
+        _file = nullptr;
+    }
+    return arrow::Status::OK();
+}
+
+// A file whose reader has already been released counts as closed.
+bool ArrowFile::closed() const {
+    if (_file != nullptr) {
+        return _file->closed();
+    } else {
+        return true;
+    }
+}
+
+// Sequential read: reads from the current position and advances it.
+arrow::Result ArrowFile::Read(int64_t nbytes, void* buffer) {
+    return ReadAt(_pos, nbytes, buffer);
+}
+
+// Reads up to `nbytes` bytes starting at `position` into `out`, looping until the
+// request is satisfied or the reader reports 0 bytes. Returns the number of bytes
+// actually read; _pos is left just past the last byte read.
+arrow::Result ArrowFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
+    // Close() releases _file, so a read after close must fail instead of
+    // dereferencing a null pointer.
+    if (_file == nullptr) {
+        return arrow::Status::IOError("File is closed.");
+    }
+    int64_t reads = 0;
+    int64_t bytes_read = 0;
+    _pos = position;
+    while (nbytes > 0) {
+        Status result = _file->readat(_pos, nbytes, &reads, out);
+        if (!result.ok()) {
+            return arrow::Status::IOError("Readat failed.");
+        }
+        if (reads == 0) {
+            break;
+        }
+        bytes_read += reads; // total read bytes
+        nbytes -= reads;     // remained bytes
+        _pos += reads;
+        out = static_cast<char*>(out) + reads;
+    }
+    return bytes_read;
+}
+
+arrow::Result ArrowFile::GetSize() {
+    // Same guard as ReadAt(): the reader no longer exists after Close().
+    if (_file == nullptr) {
+        return arrow::Status::IOError("File is closed.");
+    }
+    return _file->size();
+}
+
+arrow::Status ArrowFile::Seek(int64_t position) {
+    _pos = position;
+    // NOTE: Only readat operation is used, so _file seek is not called here.
+    return arrow::Status::OK();
+}
+
+arrow::Result ArrowFile::Tell() const {
+    return _pos;
+}
+
+// Allocating read: returns a buffer holding the bytes read from the current
+// position; the buffer is sliced down when fewer than `nbytes` were available.
+arrow::Result> ArrowFile::Read(int64_t nbytes) {
+    auto buffer = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool());
+    ARROW_RETURN_NOT_OK(buffer);
+    std::shared_ptr read_buf = std::move(buffer.ValueOrDie());
+    auto bytes_read = ReadAt(_pos, nbytes, read_buf->mutable_data());
+    ARROW_RETURN_NOT_OK(bytes_read);
+    // If bytes_read is equal with read_buf's capacity, we just assign
+    if (bytes_read.ValueOrDie() == nbytes) {
+        return std::move(read_buf);
+    } else {
+        return arrow::SliceBuffer(read_buf, 0, bytes_read.ValueOrDie());
+    }
+}
+
+} // namespace doris
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
new file mode 100644
index 0000000000..4149f02493
--- /dev/null
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include "common/status.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+
+namespace doris {
+
+class ExecEnv;
+class TBrokerRangeDesc;
+class TNetworkAddress;
+class RuntimeState;
+class Tuple;
+class SlotDescriptor;
+class MemPool;
+class FileReader;
+
+// Adapts a doris FileReader to arrow's RandomAccessFile interface.
+// The adapter owns the FileReader it is given: Close() (also run by the
+// destructor) closes and deletes it.
+class ArrowFile : public arrow::io::RandomAccessFile {
+public:
+    // explicit: construction transfers ownership of `file`, so it must never
+    // happen through an implicit pointer conversion.
+    explicit ArrowFile(FileReader* file);
+    ~ArrowFile() override;
+    arrow::Result Read(int64_t nbytes, void* buffer) override;
+    arrow::Result ReadAt(int64_t position, int64_t nbytes, void* out) override;
+    arrow::Result GetSize() override;
+    arrow::Status Seek(int64_t position) override;
+    arrow::Result> Read(int64_t nbytes) override;
+    arrow::Result Tell() const override;
+    arrow::Status Close() override;
+    bool closed() const override;
+
+private:
+    FileReader* _file; // owned; null once Close() has run
+    int64_t _pos = 0;  // position used by the sequential Read() overloads
+};
+
+// base of arrow reader
+class ArrowReaderWrap {
+public:
+    ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
+    // NOTE: the destructor runs ArrowReaderWrap::close() only; an override of
+    // close() in a derived class is not invoked during destruction.
+    virtual ~ArrowReaderWrap();
+
+    virtual Status init_reader(const std::vector& tuple_slot_descs,
+                               const std::string& timezone) = 0;
+    // for row
+    virtual Status read(Tuple* tuple, const std::vector& tuple_slot_descs,
+                        MemPool* mem_pool, bool* eof) {
+        return Status::NotSupported("Not Implemented read");
+    }
+    // for vec
+    virtual Status next_batch(std::shared_ptr* batch, bool* eof) = 0;
+    virtual void close();
+    virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); }
+
+protected:
+    // Fills _include_column_ids by resolving slot column names through _map_column
+    virtual Status column_indices(const std::vector& tuple_slot_descs);
+
+protected:
+    const int64_t _batch_size;
+    const int32_t _num_of_columns_from_file;
+    std::shared_ptr _arrow_file;
+    std::shared_ptr<::arrow::RecordBatchReader> _rb_reader;
+    int _total_groups;                   // num of groups(stripes) of a parquet(orc) file
+    int _current_group;                  // current group(stripe)
+    std::map _map_column;                // column-name <---> column-index
+    std::vector _include_column_ids;     // columns that need to get from file
+};
+
+} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
new file mode 100644
index 0000000000..5815f008df
--- /dev/null
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "exec/arrow/orc_reader.h"
+
+#include
+#include
+#include
+
+#include "common/logging.h"
+#include "exec/file_reader.h"
+#include "runtime/mem_pool.h"
+#include "runtime/tuple.h"
+
+namespace doris {
+
+ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
+                             int32_t num_of_columns_from_file)
+        : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file) {
+    _reader = nullptr;
+    _cur_file_eof = false;
+}
+
+// Opens the ORC file, records the column-name -> column-index mapping from the
+// file schema, resolves the columns requested by `tuple_slot_descs` and opens the
+// reader of the first stripe.
+// Returns EndOfFile for a file without stripes, InvalidArgument (from
+// column_indices) when a requested column is missing in the file, and
+// InternalError when arrow fails to open the file or read its schema.
+// `timezone` is not used by this reader.
+Status ORCReaderWrap::init_reader(const std::vector& tuple_slot_descs,
+                                  const std::string& timezone) {
+    // Open ORC file reader
+    auto maybe_reader =
+            arrow::adapters::orc::ORCFileReader::Open(_arrow_file, arrow::default_memory_pool());
+    if (!maybe_reader.ok()) {
+        // Handle error instantiating file reader...
+        LOG(WARNING) << "failed to create orc file reader, errmsg=" << maybe_reader.status();
+        return Status::InternalError("Failed to create orc file reader");
+    }
+    _reader = std::move(maybe_reader.ValueOrDie());
+    _total_groups = _reader->NumberOfStripes();
+    if (_total_groups == 0) {
+        return Status::EndOfFile("Empty Orc File");
+    }
+
+    // map: column name -> column index in the file schema
+    arrow::Result> maybe_schema = _reader->ReadSchema();
+    if (!maybe_schema.ok()) {
+        // Handle error reading the file schema...
+        LOG(WARNING) << "failed to read schema, errmsg=" << maybe_schema.status();
+        return Status::InternalError("Failed to read orc file schema");
+    }
+    std::shared_ptr schema = maybe_schema.ValueOrDie();
+    for (int i = 0; i < schema->num_fields(); ++i) {
+        _map_column.emplace(schema->field(i)->name(), i);
+    }
+
+    // The column selection has to be resolved BEFORE the first stripe reader is
+    // created: _next_stripe_reader() hands _include_column_ids to arrow, and an
+    // unresolved (empty) list would open the first stripe with no columns selected.
+    RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+
+    bool eof = false;
+    RETURN_IF_ERROR(_next_stripe_reader(&eof));
+    if (eof) {
+        return Status::EndOfFile("end of file");
+    }
+
+    return Status::OK();
+}
+
+// Replaces _rb_reader with a reader over the next stripe and advances
+// _current_group. Sets *eof to true (leaving _rb_reader untouched) once every
+// stripe has been handed out.
+Status ORCReaderWrap::_next_stripe_reader(bool* eof) {
+    if (_current_group >= _total_groups) {
+        *eof = true;
+        return Status::OK();
+    }
+    // Get a stripe level record batch iterator.
+    // record batch will have up to batch_size rows.
+    // NextStripeReader serves as a fine grained alternative to ReadStripe
+    // which may cause OOM issues by loading the whole stripe into memory.
+    // Note this will only read rows for the current stripe, not the entire file.
+    arrow::Result> maybe_rb_reader =
+            _reader->NextStripeReader(_batch_size, _include_column_ids);
+    if (!maybe_rb_reader.ok()) {
+        LOG(WARNING) << "Get RecordBatch Failed. " << maybe_rb_reader.status();
+        return Status::InternalError(maybe_rb_reader.status().ToString());
+    }
+    _rb_reader = maybe_rb_reader.ValueOrDie();
+    _current_group++;
+    return Status::OK();
+}
+
+// Stores the next record batch in *batch. When the current stripe is exhausted
+// (ReadNext yields a null batch) the next stripe is opened and the read is
+// retried; *eof is set once no stripe is left, in which case *batch is null.
+Status ORCReaderWrap::next_batch(std::shared_ptr* batch, bool* eof) {
+    *eof = false;
+    do {
+        auto st = _rb_reader->ReadNext(batch);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to get next batch, errmsg=" << st;
+            return Status::InternalError(st.ToString());
+        }
+        if (*batch == nullptr) {
+            // try next stripe
+            RETURN_IF_ERROR(_next_stripe_reader(eof));
+            if (*eof) {
+                break;
+            }
+        }
+    } while (*batch == nullptr);
+    return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
new file mode 100644
index 0000000000..5213a18dcf
--- /dev/null
+++ b/be/src/exec/arrow/orc_reader.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include "common/status.h"
+#include "exec/arrow/arrow_reader.h"
+namespace doris {
+
+// ArrowReaderWrap implementation for ORC files: record batches are produced
+// stripe by stripe.
+class ORCReaderWrap final : public ArrowReaderWrap {
+public:
+    ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
+    ~ORCReaderWrap() override = default;
+
+    Status init_reader(const std::vector& tuple_slot_descs,
+                       const std::string& timezone) override;
+    Status next_batch(std::shared_ptr* batch, bool* eof) override;
+
+private:
+    // Switches to the reader of the next stripe; *eof is set when none is left
+    Status _next_stripe_reader(bool* eof);
+
+    // orc file reader object
+    std::unique_ptr _reader;
+    bool _cur_file_eof; // is read over?
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
similarity index 84%
rename from be/src/exec/parquet_reader.cpp
rename to be/src/exec/arrow/parquet_reader.cpp
index 53880cdbb3..5c57efc4be 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -14,7 +14,7 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "exec/parquet_reader.h" +#include "exec/arrow/parquet_reader.h" #include #include @@ -42,21 +42,15 @@ namespace doris { // Broker -ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_columns_from_file) - : _num_of_columns_from_file(num_of_columns_from_file), - _total_groups(0), - _current_group(0), +ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) + : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file), _rows_of_group(0), _current_line_of_group(0), - _current_line_of_batch(0) { - _parquet = std::shared_ptr(new ParquetFile(file_reader)); -} + _current_line_of_batch(0) {} -ParquetReaderWrap::~ParquetReaderWrap() { - close(); -} -Status ParquetReaderWrap::init_parquet_reader(const std::vector& tuple_slot_descs, - const std::string& timezone) { +Status ParquetReaderWrap::init_reader(const std::vector& tuple_slot_descs, + const std::string& timezone) { try { parquet::ArrowReaderProperties arrow_reader_properties = parquet::default_arrow_reader_properties(); @@ -66,7 +60,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector auto reader_builder = parquet::arrow::FileReaderBuilder(); reader_builder.properties(arrow_reader_properties); - auto st = reader_builder.Open(_parquet); + auto st = reader_builder.Open(_arrow_file); if (!st.ok()) { LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString(); @@ -111,7 +105,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector _current_line_of_batch = 0; //save column type std::shared_ptr field_schema = _batch->schema(); - for (int i = 0; i < _parquet_column_ids.size(); i++) { + for (int i = 0; i < _include_column_ids.size(); i++) { std::shared_ptr field = field_schema->field(i); if (!field) { LOG(WARNING) << "Get field schema failed. 
Column order:" << i; @@ -131,14 +125,11 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector void ParquetReaderWrap::close() { _closed = true; _queue_writer_cond.notify_one(); - arrow::Status st = _parquet->Close(); - if (!st.ok()) { - LOG(WARNING) << "close parquet file error: " << st.ToString(); - } + ArrowReaderWrap::close(); } Status ParquetReaderWrap::size(int64_t* size) { - arrow::Result result = _parquet->GetSize(); + arrow::Result result = _arrow_file->GetSize(); if (result.ok()) { *size = result.ValueOrDie(); return Status::OK(); @@ -158,24 +149,6 @@ inline void ParquetReaderWrap::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc return; } -Status ParquetReaderWrap::column_indices(const std::vector& tuple_slot_descs) { - _parquet_column_ids.clear(); - for (int i = 0; i < _num_of_columns_from_file; i++) { - auto slot_desc = tuple_slot_descs.at(i); - // Get the Column Reader for the boolean column - auto iter = _map_column.find(slot_desc->col_name()); - if (iter != _map_column.end()) { - _parquet_column_ids.emplace_back(iter->second); - } else { - std::stringstream str_error; - str_error << "Invalid Column Name:" << slot_desc->col_name(); - LOG(WARNING) << str_error.str(); - return Status::InvalidArgument(str_error.str()); - } - } - return Status::OK(); -} - inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc) { if (!slot_desc->is_nullable()) { std::stringstream str_error; @@ -188,8 +161,7 @@ inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescript return Status::OK(); } -Status ParquetReaderWrap::read_record_batch(const std::vector& tuple_slot_descs, - bool* eof) { +Status ParquetReaderWrap::read_record_batch(bool* eof) { if (_current_line_of_group >= _rows_of_group) { // read next row group VLOG_DEBUG << "read_record_batch, current group id:" << _current_group << " current line of group:" << _current_line_of_group @@ -197,7 +169,7 @@ Status 
ParquetReaderWrap::read_record_batch(const std::vector& << ". start to read next row group"; _current_group++; if (_current_group >= _total_groups) { // read completed. - _parquet_column_ids.clear(); + _include_column_ids.clear(); *eof = true; return Status::OK(); } @@ -219,11 +191,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& return Status::OK(); } -Status ParquetReaderWrap::next_batch(std::shared_ptr* batch, - const std::vector& tuple_slot_descs, - bool* eof) { +Status ParquetReaderWrap::next_batch(std::shared_ptr* batch, bool* eof) { if (_batch->num_rows() == 0 || _current_line_of_batch != 0 || _current_line_of_group != 0) { - RETURN_IF_ERROR(read_record_batch(tuple_slot_descs, eof)); + RETURN_IF_ERROR(read_record_batch(eof)); } *batch = get_batch(); return Status::OK(); @@ -281,7 +251,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& const uint8_t* value = nullptr; int column_index = 0; try { - size_t slots = _parquet_column_ids.size(); + size_t slots = _include_column_ids.size(); for (size_t i = 0; i < slots; ++i) { auto slot_desc = tuple_slot_descs[i]; column_index = i; // column index in batch record @@ -550,7 +520,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& // update data value ++_current_line_of_group; ++_current_line_of_batch; - return read_record_batch(tuple_slot_descs, eof); + return read_record_batch(eof); } void ParquetReaderWrap::prefetch_batch() { @@ -570,13 +540,13 @@ void ParquetReaderWrap::prefetch_batch() { if (_closed || current_group >= _total_groups) { return; } - _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch); + _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader); if (!_status.ok()) { _closed = true; return; } arrow::RecordBatchVector batches; - _status = _rb_batch->ReadAll(&batches); + _status = _rb_reader->ReadAll(&batches); if (!_status.ok()) { _closed = true; return; @@ -602,83 +572,4 @@ 
Status ParquetReaderWrap::read_next_batch() { return Status::OK(); } -ParquetFile::ParquetFile(FileReader* file) : _file(file) {} - -ParquetFile::~ParquetFile() { - arrow::Status st = Close(); - if (!st.ok()) { - LOG(WARNING) << "close parquet file error: " << st.ToString(); - } -} - -arrow::Status ParquetFile::Close() { - if (_file != nullptr) { - _file->close(); - delete _file; - _file = nullptr; - } - return arrow::Status::OK(); -} - -bool ParquetFile::closed() const { - if (_file != nullptr) { - return _file->closed(); - } else { - return true; - } -} - -arrow::Result ParquetFile::Read(int64_t nbytes, void* buffer) { - return ReadAt(_pos, nbytes, buffer); -} - -arrow::Result ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) { - int64_t reads = 0; - int64_t bytes_read = 0; - _pos = position; - while (nbytes > 0) { - Status result = _file->readat(_pos, nbytes, &reads, out); - if (!result.ok()) { - bytes_read = 0; - return arrow::Status::IOError("Readat failed."); - } - if (reads == 0) { - break; - } - bytes_read += reads; // total read bytes - nbytes -= reads; // remained bytes - _pos += reads; - out = (char*)out + reads; - } - return bytes_read; -} - -arrow::Result ParquetFile::GetSize() { - return _file->size(); -} - -arrow::Status ParquetFile::Seek(int64_t position) { - _pos = position; - // NOTE: Only readat operation is used, so _file seek is not called here. 
- return arrow::Status::OK(); -} - -arrow::Result ParquetFile::Tell() const { - return _pos; -} - -arrow::Result> ParquetFile::Read(int64_t nbytes) { - auto buffer = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool()); - ARROW_RETURN_NOT_OK(buffer); - std::shared_ptr read_buf = std::move(buffer.ValueOrDie()); - auto bytes_read = ReadAt(_pos, nbytes, read_buf->mutable_data()); - ARROW_RETURN_NOT_OK(bytes_read); - // If bytes_read is equal with read_buf's capacity, we just assign - if (bytes_read.ValueOrDie() == nbytes) { - return std::move(read_buf); - } else { - return arrow::SliceBuffer(read_buf, 0, bytes_read.ValueOrDie()); - } -} - } // namespace doris diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h similarity index 64% rename from be/src/exec/parquet_reader.h rename to be/src/exec/arrow/parquet_reader.h index 77b946b691..597dea2feb 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/arrow/parquet_reader.h @@ -40,6 +40,7 @@ #include "common/config.h" #include "common/status.h" +#include "exec/arrow/arrow_reader.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/Types_types.h" @@ -55,46 +56,28 @@ class SlotDescriptor; class MemPool; class FileReader; -class ParquetFile : public arrow::io::RandomAccessFile { +// Reader of parquet file +class ParquetReaderWrap final : public ArrowReaderWrap { public: - ParquetFile(FileReader* file); - ~ParquetFile() override; - arrow::Result Read(int64_t nbytes, void* buffer) override; - arrow::Result ReadAt(int64_t position, int64_t nbytes, void* out) override; - arrow::Result GetSize() override; - arrow::Status Seek(int64_t position) override; - arrow::Result> Read(int64_t nbytes) override; - arrow::Result Tell() const override; - arrow::Status Close() override; - bool closed() const override; - -private: - FileReader* _file; - int64_t _pos = 0; -}; - -// Reader of broker parquet file -class ParquetReaderWrap { -public: - 
ParquetReaderWrap(FileReader* file_reader, int32_t num_of_columns_from_file); - virtual ~ParquetReaderWrap(); + // batch_size is not use here + ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file); + ~ParquetReaderWrap() override = default; // Read Status read(Tuple* tuple, const std::vector& tuple_slot_descs, - MemPool* mem_pool, bool* eof); - void close(); - Status size(int64_t* size); - Status init_parquet_reader(const std::vector& tuple_slot_descs, - const std::string& timezone); - Status next_batch(std::shared_ptr* batch, - const std::vector& tuple_slot_descs, bool* eof); + MemPool* mem_pool, bool* eof) override; + Status size(int64_t* size) override; + Status init_reader(const std::vector& tuple_slot_descs, + const std::string& timezone) override; + Status next_batch(std::shared_ptr* batch, bool* eof) override; + void close() override; private: void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); - Status column_indices(const std::vector& tuple_slot_descs); Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc); - Status read_record_batch(const std::vector& tuple_slot_descs, bool* eof); + Status read_record_batch(bool* eof); const std::shared_ptr& get_batch(); Status handle_timestamp(const std::shared_ptr& ts_array, uint8_t* buf, int32_t* wbtyes); @@ -104,19 +87,11 @@ private: Status read_next_batch(); private: - const int32_t _num_of_columns_from_file; - std::shared_ptr _parquet; - // parquet file reader object - std::unique_ptr<::arrow::RecordBatchReader> _rb_batch; std::shared_ptr _batch; std::unique_ptr _reader; std::shared_ptr _file_metadata; - std::map _map_column; // column-name <---> column-index - std::vector _parquet_column_ids; std::vector _parquet_column_type; - int _total_groups; // groups in a parquet file - int _current_group; int _rows_of_group; // rows in a group. 
int _current_line_of_group; diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index f2f3742cf8..a378c41203 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -33,6 +33,7 @@ #include "util/thread.h" #include "vec/exec/vbroker_scanner.h" #include "vec/exec/vjson_scanner.h" +#include "vec/exec/vorc_scanner.h" #include "vec/exec/vparquet_scanner.h" namespace doris { @@ -237,9 +238,15 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan } break; case TFileFormatType::FORMAT_ORC: - scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs, - counter); + if (_vectorized) { + scan = new vectorized::VORCScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, scan_range.broker_addresses, + _pre_filter_texprs, counter); + } else { + scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, scan_range.broker_addresses, + _pre_filter_texprs, counter); + } break; case TFileFormatType::FORMAT_JSON: if (_vectorized) { diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index c6cb02e8c2..880313b6f0 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -17,12 +17,12 @@ #include "exec/parquet_scanner.h" +#include "exec/arrow/parquet_reader.h" #include "exec/broker_reader.h" #include "exec/buffered_reader.h" #include "exec/decompressor.h" #include "exec/hdfs_reader_writer.h" #include "exec/local_file_reader.h" -#include "exec/parquet_reader.h" #include "exec/s3_reader.h" #include "exec/text_converter.h" #include "runtime/exec_env.h" @@ -141,14 +141,14 @@ Status ParquetScanner::open_next_reader() { file_reader->close(); continue; } + int32_t num_of_columns_from_file = _src_slot_descs.size(); if (range.__isset.num_of_columns_from_file) { - _cur_file_reader = - new 
ParquetReaderWrap(file_reader.release(), range.num_of_columns_from_file); - } else { - _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _src_slot_descs.size()); + num_of_columns_from_file = range.num_of_columns_from_file; } + _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(), + num_of_columns_from_file); - Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs, _state->timezone()); + Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone()); if (status.is_end_of_file()) { continue; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index b9087990ca..28bfafedf6 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -101,6 +101,8 @@ static TFileFormatType::type parse_format(const std::string& format_str, } } else if (iequal(format_str, "PARQUET")) { format_type = TFileFormatType::FORMAT_PARQUET; + } else if (iequal(format_str, "ORC")) { + format_type = TFileFormatType::FORMAT_ORC; } return format_type; } diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 265d6fd884..49f92ddf79 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -77,6 +77,7 @@ set(VEC_FILES data_types/data_type_date.cpp data_types/data_type_date_time.cpp exec/vaggregation_node.cpp + exec/varrow_scanner.cpp exec/ves_http_scan_node.cpp exec/ves_http_scanner.cpp exec/volap_scan_node.cpp @@ -103,6 +104,7 @@ set(VEC_FILES exec/vbroker_scanner.cpp exec/vjson_scanner.cpp exec/vparquet_scanner.cpp + exec/vorc_scanner.cpp exec/join/vhash_join_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp index 557b978c8e..2071b8cedb 100644 --- a/be/src/vec/data_types/data_type_factory.cpp +++ b/be/src/vec/data_types/data_type_factory.cpp @@ -260,9 +260,9 @@ DataTypePtr 
DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) { return nested; } -DataTypePtr DataTypeFactory::create_data_type(const arrow::Type::type& type, bool is_nullable) { +DataTypePtr DataTypeFactory::create_data_type(const arrow::DataType* type, bool is_nullable) { DataTypePtr nested = nullptr; - switch (type) { + switch (type->id()) { case ::arrow::Type::BOOL: nested = std::make_shared(); break; @@ -310,10 +310,10 @@ DataTypePtr DataTypeFactory::create_data_type(const arrow::Type::type& type, boo nested = std::make_shared(); break; case ::arrow::Type::DECIMAL: - nested = std::make_shared>(27, 9); + nested = std::make_shared>(); break; default: - DCHECK(false) << "invalid arrow type:" << (int)type; + DCHECK(false) << "invalid arrow type:" << (int)(type->id()); break; } diff --git a/be/src/vec/data_types/data_type_factory.hpp b/be/src/vec/data_types/data_type_factory.hpp index 3b667c6f72..968617414c 100644 --- a/be/src/vec/data_types/data_type_factory.hpp +++ b/be/src/vec/data_types/data_type_factory.hpp @@ -88,7 +88,7 @@ public: DataTypePtr create_data_type(const PColumnMeta& pcolumn); - DataTypePtr create_data_type(const arrow::Type::type& type, bool is_nullable); + DataTypePtr create_data_type(const arrow::DataType* type, bool is_nullable); private: DataTypePtr _create_primitive_data_type(const FieldType& type) const; diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp new file mode 100644 index 0000000000..b44475ce19 --- /dev/null +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -0,0 +1,310 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/arrow/parquet_reader.h"
+#include "exec/broker_reader.h"
+#include "exec/buffered_reader.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_reader.h"
+#include "exec/s3_reader.h"
+#include "exprs/expr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/vorc_scanner.h"
+#include "vec/functions/simple_function_factory.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+VArrowScanner::VArrowScanner(RuntimeState* state, RuntimeProfile* profile,
+                             const TBrokerScanRangeParams& params,
+                             const std::vector& ranges,
+                             const std::vector& broker_addresses,
+                             const std::vector& pre_filter_texprs, ScannerCounter* counter)
+        : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
+          // _splittable(params.splittable),
+          _cur_file_reader(nullptr),
+          _cur_file_eof(false),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0) {}
+
+VArrowScanner::~VArrowScanner() {
+    close();
+}
+
+// Releases the current arrow reader and opens one for the next non-empty range.
+// Ranges whose file is empty (size 0, or init_reader reports end-of-file) are
+// skipped. When no range is left, _scanner_eof is set and OK is returned.
+// Any other init_reader failure is returned as InternalError carrying the path.
+Status VArrowScanner::_open_next_reader() {
+    // open_file_reader
+    if (_cur_file_reader != nullptr) {
+        delete _cur_file_reader;
+        _cur_file_reader = nullptr;
+    }
+
+    while (true) {
+        if (_next_range >= _ranges.size()) {
+            _scanner_eof = true;
+            return Status::OK();
+        }
+        const TBrokerRangeDesc& range = _ranges[_next_range++];
+        std::unique_ptr file_reader;
+        switch (range.file_type) {
+        case TFileType::FILE_LOCAL: {
+            file_reader.reset(new LocalFileReader(range.path, range.start_offset));
+            break;
+        }
+        case TFileType::FILE_HDFS: {
+            FileReader* reader;
+            RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+                                                            range.start_offset, &reader));
+            file_reader.reset(reader);
+            break;
+        }
+        case TFileType::FILE_BROKER: {
+            int64_t file_size = 0;
+            // for compatibility
+            if (range.__isset.file_size) {
+                file_size = range.file_size;
+            }
+            file_reader.reset(new BufferedReader(
+                    _profile,
+                    new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
+                                     range.path, range.start_offset, file_size)));
+            break;
+        }
+        case TFileType::FILE_S3: {
+            file_reader.reset(new BufferedReader(
+                    _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
+            break;
+        }
+        default: {
+            std::stringstream ss;
+            ss << "Unknown file type, type=" << range.file_type;
+            return Status::InternalError(ss.str());
+        }
+        }
+        RETURN_IF_ERROR(file_reader->open());
+        if (file_reader->size() == 0) {
+            file_reader->close();
+            continue;
+        }
+
+        int32_t num_of_columns_from_file = _src_slot_descs.size();
+        if (range.__isset.num_of_columns_from_file) {
+            num_of_columns_from_file = range.num_of_columns_from_file;
+        }
+        _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
+                                             num_of_columns_from_file);
+
+        Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
+
+        if (status.is_end_of_file()) {
+            // Nothing to read in this file. Release its reader before trying the
+            // next range; otherwise the next iteration overwrites the raw pointer
+            // and the reader is leaked.
+            delete _cur_file_reader;
+            _cur_file_reader = nullptr;
+            continue;
+        }
+        if (!status.ok()) {
+            std::stringstream ss;
+            ss << " file: " << range.path << " error:" << status.get_error_msg();
+            return Status::InternalError(ss.str());
+        }
+        return status;
+    }
+}
+
+// Readers are opened lazily by _next_arrow_batch(), so only the base scanner
+// needs to be opened here.
+Status VArrowScanner::open() {
+    RETURN_IF_ERROR(BaseScanner::open());
+    return Status::OK();
+}
+
+// get next available arrow batch
+// Returns OK with a non-empty _batch, or EndOfFile once every range is consumed.
+Status VArrowScanner::_next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(_open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    // (_scanner_eof is checked before every use of _cur_file_reader, which is
+    // null when _open_next_reader() ran out of ranges)
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(_open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status VArrowScanner::_init_arrow_batch_if_necessary() {
+    // 1. init batch if first time
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof) {
+        return Status::EndOfFile("EOF");
+    }
+    if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        return _next_arrow_batch();
+    }
+    return status;
+}
+
+// Rebuilds _src_block with one empty, nullable column per source slot; the
+// column type is derived from the arrow type of the matching column in _batch.
+// Returns NotSupported for an arrow type that has no doris data type.
+Status VArrowScanner::_init_src_block() {
+    size_t batch_pos = 0;
+    _src_block.clear();
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto* array = _batch->column(batch_pos++).get();
+        // let src column be nullable for simplify converting
+        // TODO, support not nullable for exec efficiently
+        auto is_nullable = true;
+        DataTypePtr data_type =
+                DataTypeFactory::instance().create_data_type(array->type().get(), is_nullable);
+        if (data_type == nullptr) {
+            return Status::NotSupported(
+                    fmt::format("Not support arrow type:{}", array->type()->name()));
+        }
+        MutableColumnPtr data_column = data_type->create_column();
+        _src_block.insert(
+                ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+    }
+    return Status::OK();
+}
+
+Status VArrowScanner::get_next(vectorized::Block* block, bool* eof) {
+    // overall of type converting:
+    // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
+    // primitive type(PT1) ==materialize_block==> dest primitive type
+
+    // first, we need to convert the arrow type to the corresponding internal type,
+    // such as arrow::INT16 to
TYPE_SMALLINT(PT0). + // why need first step? we cannot convert the arrow type to type in src desc directly, + // it's too hard to achieve. + + // second, convert PT0 to the type in src desc, such as TYPE_SMALLINT to TYPE_VARCHAR.(PT1) + // why need second step? the materialize step only accepts types specified in src desc. + + // finally, through the materialized, convert to the type in dest desc, such as TYPE_DATETIME. + SCOPED_TIMER(_read_timer); + // init arrow batch + { + Status st = _init_arrow_batch_if_necessary(); + if (!st.ok()) { + if (!st.is_end_of_file()) { + return st; + } + *eof = true; + return Status::OK(); + } + } + + RETURN_IF_ERROR(_init_src_block()); + // convert arrow batch to block until reach the batch_size + while (!_scanner_eof) { + // cast arrow type to PT0 and append it to src block + // for example: arrow::Type::INT16 => TYPE_SMALLINT + RETURN_IF_ERROR(_append_batch_to_src_block(&_src_block)); + // finalize the src block if full + if (_src_block.rows() >= _state->batch_size()) { + break; + } + auto status = _next_arrow_batch(); + // if ok, append the batch to the src columns + if (status.ok()) { + continue; + } + // return error if not EOF + if (!status.is_end_of_file()) { + return status; + } + _cur_file_eof = true; + break; + } + COUNTER_UPDATE(_rows_read_counter, _src_block.rows()); + SCOPED_TIMER(_materialize_timer); + // cast PT0 => PT1 + // for example: TYPE_SMALLINT => TYPE_VARCHAR + RETURN_IF_ERROR(_cast_src_block(&_src_block)); + + // materialize, src block => dest columns + return _fill_dest_block(block, eof); +} + +// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> +// primitive type(PT1) ==materialize_block==> dest primitive type +Status VArrowScanner::_cast_src_block(Block* block) { + // cast primitive type(PT0) to primitive type(PT1) + for (size_t i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _src_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + 
auto& arg = block->get_by_name(slot_desc->col_name()); + // remove nullable here, let the get_function decide whether nullable + auto return_type = slot_desc->get_data_type_ptr(); + ColumnsWithTypeAndName arguments { + arg, + {DataTypeString().create_column_const( + arg.column->size(), remove_nullable(return_type)->get_family_name()), + std::make_shared(), ""}}; + auto func_cast = + SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type); + RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size())); + block->get_by_position(i).type = std::move(return_type); + } + return Status::OK(); +} + +Status VArrowScanner::_append_batch_to_src_block(Block* block) { + size_t num_elements = std::min((_state->batch_size() - block->rows()), + (_batch->num_rows() - _arrow_batch_cur_idx)); + size_t column_pos = 0; + for (auto i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _src_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto* array = _batch->column(column_pos++).get(); + auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); + RETURN_IF_ERROR(arrow_column_to_doris_column(array, _arrow_batch_cur_idx, + column_with_type_and_name, num_elements, + _state->timezone())); + } + + _arrow_batch_cur_idx += num_elements; + return Status::OK(); +} + +void VArrowScanner::close() { + BaseScanner::close(); + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + } +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h new file mode 100644 index 0000000000..c3740e0d3e --- /dev/null +++ b/be/src/vec/exec/varrow_scanner.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "gen_cpp/Types_types.h" +#include "runtime/mem_pool.h" +#include "util/runtime_profile.h" + +namespace doris::vectorized { + +// VArrow scanner convert the data read from orc|parquet to doris's columns. +class VArrowScanner : public BaseScanner { +public: + VArrowScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + virtual ~VArrowScanner(); + + // Open this scanner, will initialize information need to + virtual Status open() override; + + virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof, + bool* fill_tuple) override { + return Status::NotSupported("Not Implemented get next"); + } + + virtual Status get_next(Block* block, bool* eof) override; + + virtual void close() override; + +protected: + virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) = 0; + +private: + // Read next buffer from reader + Status _open_next_reader(); + Status _next_arrow_batch(); + Status _init_arrow_batch_if_necessary(); + Status _init_src_block(); + Status 
_append_batch_to_src_block(Block* block); + Status _cast_src_block(Block* block); + +private: + // Reader + ArrowReaderWrap* _cur_file_reader; + bool _cur_file_eof; // is read over? + std::shared_ptr _batch; + size_t _arrow_batch_cur_idx; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp new file mode 100644 index 0000000000..7521634183 --- /dev/null +++ b/be/src/vec/exec/vorc_scanner.cpp @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/vorc_scanner.h" + +#include + +namespace doris::vectorized { + +VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter) + : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, + counter) {} + +ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) { + return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/vorc_scanner.h b/be/src/vec/exec/vorc_scanner.h new file mode 100644 index 0000000000..12510e9731 --- /dev/null +++ b/be/src/vec/exec/vorc_scanner.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "gen_cpp/Types_types.h" +#include "runtime/mem_pool.h" +#include "util/runtime_profile.h" + +namespace doris::vectorized { + +// VOrc scanner convert the data read from Orc to doris's columns. +class VORCScanner final : public VArrowScanner { +public: + VORCScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + ~VORCScanner() override = default; + +protected: + ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) override; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index 037bc15028..cb59ae60bc 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -17,13 +17,7 @@ #include "vec/exec/vparquet_scanner.h" -#include "exec/parquet_reader.h" -#include "exprs/expr.h" -#include "runtime/descriptors.h" -#include "runtime/exec_env.h" -#include "vec/data_types/data_type_factory.hpp" -#include "vec/functions/simple_function_factory.h" -#include "vec/utils/arrow_column_to_doris_column.h" +#include "exec/arrow/parquet_reader.h" namespace doris::vectorized { @@ -33,185 +27,12 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter) - : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, - counter), - _batch(nullptr), - _arrow_batch_cur_idx(0) {} + : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, + counter) {} -VParquetScanner::~VParquetScanner() = default; - 
-Status VParquetScanner::open() { - RETURN_IF_ERROR(ParquetScanner::open()); - if (_ranges.empty()) { - return Status::OK(); - } - return Status::OK(); -} - -// get next available arrow batch -Status VParquetScanner::_next_arrow_batch() { - _arrow_batch_cur_idx = 0; - // first, init file reader - if (_cur_file_reader == nullptr || _cur_file_eof) { - RETURN_IF_ERROR(open_next_reader()); - _cur_file_eof = false; - } - // second, loop until find available arrow batch or EOF - while (!_scanner_eof) { - RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof)); - if (_cur_file_eof) { - RETURN_IF_ERROR(open_next_reader()); - _cur_file_eof = false; - continue; - } - if (_batch->num_rows() == 0) { - continue; - } - return Status::OK(); - } - return Status::EndOfFile("EOF"); -} - -Status VParquetScanner::_init_arrow_batch_if_necessary() { - // 1. init batch if first time - // 2. reset reader if end of file - Status status; - if (_scanner_eof) { - return Status::EndOfFile("EOF"); - } - if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { - return _next_arrow_batch(); - } - return status; -} - -Status VParquetScanner::_init_src_block() { - size_t batch_pos = 0; - _src_block.clear(); - for (auto i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - auto* array = _batch->column(batch_pos++).get(); - // let src column be nullable for simplify converting - // TODO, support not nullable for exec efficiently - auto is_nullable = true; - DataTypePtr data_type = - DataTypeFactory::instance().create_data_type(array->type()->id(), is_nullable); - if (data_type == nullptr) { - return Status::NotSupported( - fmt::format("Not support arrow type:{}", array->type()->name())); - } - MutableColumnPtr data_column = data_type->create_column(); - _src_block.insert( - ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); - } - return 
Status::OK(); -} - -Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) { - // overall of type converting: - // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> - // primitive type(PT1) ==materialize_block==> dest primitive type - - // first, we need to convert the arrow type to the corresponding internal type, - // such as arrow::INT16 to TYPE_SMALLINT(PT0). - // why need first step? we cannot convert the arrow type to type in src desc directly, - // it's too hard to achieve. - - // second, convert PT0 to the type in src desc, such as TYPE_SMALLINT to TYPE_VARCHAR.(PT1) - // why need second step? the materialize step only accepts types specified in src desc. - - // finally, through the materialized, convert to the type in dest desc, such as TYPE_DATETIME. - SCOPED_TIMER(_read_timer); - // init arrow batch - { - Status st = _init_arrow_batch_if_necessary(); - if (!st.ok()) { - if (!st.is_end_of_file()) { - return st; - } - *eof = true; - return Status::OK(); - } - } - - RETURN_IF_ERROR(_init_src_block()); - // convert arrow batch to block until reach the batch_size - while (!_scanner_eof) { - // cast arrow type to PT0 and append it to src block - // for example: arrow::Type::INT16 => TYPE_SMALLINT - RETURN_IF_ERROR(_append_batch_to_src_block(&_src_block)); - // finalize the src block if full - if (_src_block.rows() >= _state->batch_size()) { - break; - } - auto status = _next_arrow_batch(); - // if ok, append the batch to the src columns - if (status.ok()) { - continue; - } - // return error if not EOF - if (!status.is_end_of_file()) { - return status; - } - _cur_file_eof = true; - break; - } - COUNTER_UPDATE(_rows_read_counter, _src_block.rows()); - SCOPED_TIMER(_materialize_timer); - // cast PT0 => PT1 - // for example: TYPE_SMALLINT => TYPE_VARCHAR - RETURN_IF_ERROR(_cast_src_block(&_src_block)); - - // materialize, src block => dest columns - return _fill_dest_block(block, eof); -} - -// arrow type 
==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> -// primitive type(PT1) ==materialize_block==> dest primitive type -Status VParquetScanner::_cast_src_block(Block* block) { - // cast primitive type(PT0) to primitive type(PT1) - for (size_t i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - auto& arg = block->get_by_name(slot_desc->col_name()); - // remove nullable here, let the get_function decide whether nullable - auto return_type = slot_desc->get_data_type_ptr(); - ColumnsWithTypeAndName arguments { - arg, - {DataTypeString().create_column_const( - arg.column->size(), remove_nullable(return_type)->get_family_name()), - std::make_shared(), ""}}; - auto func_cast = - SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type); - RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size())); - block->get_by_position(i).type = std::move(return_type); - } - return Status::OK(); -} - -Status VParquetScanner::_append_batch_to_src_block(Block* block) { - size_t num_elements = std::min((_state->batch_size() - block->rows()), - (_batch->num_rows() - _arrow_batch_cur_idx)); - size_t column_pos = 0; - for (auto i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - auto* array = _batch->column(column_pos++).get(); - auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); - RETURN_IF_ERROR(arrow_column_to_doris_column(array, _arrow_batch_cur_idx, - column_with_type_and_name, num_elements, - _state->timezone())); - } - - _arrow_batch_cur_idx += num_elements; - return Status::OK(); +ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) { + return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file); } } // namespace 
doris::vectorized diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h index 72ac280989..367e2e7472 100644 --- a/be/src/vec/exec/vparquet_scanner.h +++ b/be/src/vec/exec/vparquet_scanner.h @@ -18,7 +18,7 @@ #pragma once #include -#include +#include #include #include @@ -36,7 +36,7 @@ namespace doris::vectorized { // VParquet scanner convert the data read from Parquet to doris's columns. -class VParquetScanner : public ParquetScanner { +class VParquetScanner final : public VArrowScanner { public: VParquetScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, @@ -44,23 +44,11 @@ public: const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); - ~VParquetScanner() override; + ~VParquetScanner() override = default; - // Open this scanner, will initialize information need to - Status open() override; - - Status get_next(Block* block, bool* eof) override; - -private: - Status _next_arrow_batch(); - Status _init_arrow_batch_if_necessary(); - Status _init_src_block() override; - Status _append_batch_to_src_block(Block* block); - Status _cast_src_block(Block* block); - -private: - std::shared_ptr _batch; - size_t _arrow_batch_cur_idx; +protected: + ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + int32_t num_of_columns_from_file) override; }; } // namespace doris::vectorized diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 20052e823e..48d37e56a1 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -338,6 +338,8 @@ set(VEC_TEST_FILES vec/exec/vbroker_scanner_test.cpp vec/exec/vjson_scanner_test.cpp vec/exec/vtablet_sink_test.cpp + vec/exec/vorc_scanner_test.cpp + vec/exec/vparquet_scanner_test.cpp vec/exprs/vexpr_test.cpp vec/function/function_array_element_test.cpp vec/function/function_array_index_test.cpp diff --git a/be/test/olap/hll_test.cpp b/be/test/olap/hll_test.cpp index 
3131efdeb2..5843a076cb 100644 --- a/be/test/olap/hll_test.cpp +++ b/be/test/olap/hll_test.cpp @@ -34,7 +34,7 @@ static uint64_t hash(uint64_t value) { } // keep logic same with java version in fe when you change hll_test.cpp,see HllTest.java TEST_F(TestHll, Normal) { - uint8_t buf[HLL_REGISTERS_COUNT + 1]; + uint8_t buf[HLL_REGISTERS_COUNT + 1] = {0}; // empty { diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp new file mode 100644 index 0000000000..f5f8bf522c --- /dev/null +++ b/be/test/vec/exec/vorc_scanner_test.cpp @@ -0,0 +1,892 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/vorc_scanner.h" + +#include +#include +#include + +#include +#include +#include + +#include "common/object_pool.h" +#include "exec/local_file_reader.h" +#include "exec/orc_scanner.h" +#include "exprs/cast_functions.h" +#include "exprs/decimalv2_operators.h" +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/descriptors.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "runtime/tuple.h" +#include "runtime/user_function_cache.h" +#include "vec/exec/vbroker_scan_node.h" + +namespace doris { +namespace vectorized { + +class VOrcScannerTest : public testing::Test { +public: + VOrcScannerTest() : _runtime_state(TQueryGlobals()) { + _profile = _runtime_state.runtime_profile(); + _runtime_state._instance_mem_tracker.reset(new MemTracker()); + _runtime_state._query_options.enable_vectorized_engine = true; + } + ~VOrcScannerTest() {} + + static void SetUpTestCase() { + UserFunctionCache::instance()->init( + "./be/test/runtime/test_data/user_function_cache/normal"); + CastFunctions::init(); + DecimalV2Operators::init(); + } + +protected: + virtual void SetUp() {} + + virtual void TearDown() {} + +private: + RuntimeState _runtime_state; + RuntimeProfile* _profile; + ObjectPool _obj_pool; + DescriptorTbl* _desc_tbl; + std::vector _addresses; + ScannerCounter _counter; + std::vector _pre_filter; + bool _fill_tuple; +}; + +TEST_F(VOrcScannerTest, normal) { + TBrokerScanRangeParams params; + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + 
int_type.types.push_back(node); + } + + TTypeDesc big_int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + big_int_type.types.push_back(node); + } + + TTypeDesc float_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::FLOAT); + node.__set_scalar_type(scalar_type); + float_type.types.push_back(node); + } + + TTypeDesc double_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::DOUBLE); + node.__set_scalar_type(scalar_type); + double_type.types.push_back(node); + } + + TTypeDesc date_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::DATE); + node.__set_scalar_type(scalar_type); + date_type.types.push_back(node); + } + + //col1 varchar -> bigint + { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = big_int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttobigint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = big_int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_big_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 0; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + 
expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(8, expr); + params.src_slot_ids.push_back(0); + } + //col2, col3 + for (int i = 1; i <= 2; i++) { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = i; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(8 + i, expr); + params.src_slot_ids.push_back(i); + } + + //col5 varchar -> double + { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = double_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodouble"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = double_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_double_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 3; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(11, expr); + params.src_slot_ids.push_back(3); + } + + //col6 varchar -> float + { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = float_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + 
cast_expr.fn.name.function_name = "casttofloat"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = float_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_float_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 4; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(12, expr); + params.src_slot_ids.push_back(4); + } + //col7,col8 + for (int i = 5; i <= 6; i++) { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttoint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = i; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(8 + i, expr); + params.src_slot_ids.push_back(i); + } + + //col9 varchar -> var + { + TExprNode slot_ref; + 
slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 7; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(15, expr); + params.src_slot_ids.push_back(7); + } + + params.__set_src_tuple_id(0); + params.__set_dest_tuple_id(1); + + //init_desc_table + TDescriptorTable t_desc_table; + + // table descriptors + TTableDescriptor t_table_desc; + + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::BROKER_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + + TDescriptorTableBuilder dtb; + TTupleDescriptorBuilder src_tuple_builder; + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col1") + .column_pos(1) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col2") + .column_pos(2) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col3") + .column_pos(3) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col5") + .column_pos(4) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col6") + .column_pos(5) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col7") + .column_pos(6) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col8") + .column_pos(7) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + 
.column_name("col9") + .column_pos(8) + .build()); + src_tuple_builder.build(&dtb); + + TTupleDescriptorBuilder dest_tuple_builder; + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("col1").column_pos(1).build()); + dest_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col2") + .column_pos(2) + .build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().string_type(65535).column_name("col3").column_pos(3).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_DOUBLE).column_name("col5").column_pos(4).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_FLOAT).column_name("col6").column_pos(5).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_INT).column_name("col7").column_pos(6).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_INT).column_name("col8").column_pos(7).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().string_type(65535).column_name("col9").column_pos(8).build()); + dest_tuple_builder.build(&dtb); + t_desc_table = dtb.desc_tbl(); + + DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); + _runtime_state.set_desc_tbl(_desc_tbl); + + std::vector ranges; + TBrokerRangeDesc rangeDesc; + rangeDesc.start_offset = 0; + rangeDesc.size = -1; + rangeDesc.format_type = TFileFormatType::FORMAT_ORC; + rangeDesc.splittable = false; + + rangeDesc.path = "./be/test/exec/test_data/orc_scanner/my-file.orc"; + rangeDesc.file_type = TFileType::FILE_LOCAL; + ranges.push_back(rangeDesc); + + VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter, + &_counter); + EXPECT_TRUE(scanner.open().ok()); + + //auto tracker = std::make_shared(); + //MemPool tuple_pool(tracker.get()); + + //Tuple* tuple = (Tuple*)tuple_pool.allocate(_desc_tbl->get_tuple_descriptor(1)->byte_size()); + vectorized::Block block; + bool eof = 
false; + + EXPECT_TRUE(scanner.get_next(&block, &eof).ok()); + EXPECT_TRUE(eof); + EXPECT_TRUE(scanner.get_next(&block, &eof).ok()); + EXPECT_TRUE(eof); + scanner.close(); +} + +TEST_F(VOrcScannerTest, normal2) { + TBrokerScanRangeParams params; + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 1; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(3, expr); + params.src_slot_ids.push_back(0); + params.src_slot_ids.push_back(1); + params.src_slot_ids.push_back(2); + } + params.__set_src_tuple_id(0); + params.__set_dest_tuple_id(1); + + //init_desc_table + TDescriptorTable t_desc_table; + + // table descriptors + TTableDescriptor t_table_desc; + + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::BROKER_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + + TDescriptorTableBuilder dtb; + TTupleDescriptorBuilder src_tuple_builder; + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col1") + .column_pos(1) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col2") + 
.column_pos(2) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col3") + .column_pos(3) + .build()); + src_tuple_builder.build(&dtb); + TTupleDescriptorBuilder dest_tuple_builder; + dest_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .column_name("value_from_col2") + .column_pos(1) + .build()); + + dest_tuple_builder.build(&dtb); + t_desc_table = dtb.desc_tbl(); + + DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); + _runtime_state.set_desc_tbl(_desc_tbl); + + std::vector ranges; + TBrokerRangeDesc rangeDesc; + rangeDesc.start_offset = 0; + rangeDesc.size = -1; + rangeDesc.format_type = TFileFormatType::FORMAT_ORC; + rangeDesc.splittable = false; + + rangeDesc.path = "./be/test/exec/test_data/orc_scanner/my-file.orc"; + rangeDesc.file_type = TFileType::FILE_LOCAL; + ranges.push_back(rangeDesc); + + VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter, + &_counter); + EXPECT_TRUE(scanner.open().ok()); + + bool eof = false; + vectorized::Block block; + EXPECT_TRUE(scanner.get_next(&block, &eof).ok()); + EXPECT_EQ(10, block.rows()); + EXPECT_TRUE(eof); + scanner.close(); +} + +TEST_F(VOrcScannerTest, normal3) { + TBrokerScanRangeParams params; + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + + TTypeDesc decimal_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::DECIMALV2); + scalar_type.__set_precision(64); + scalar_type.__set_scale(64); + node.__set_scalar_type(scalar_type); + decimal_type.types.push_back(node); + } + + TTypeDesc tinyint_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + 
TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::TINYINT); + node.__set_scalar_type(scalar_type); + tinyint_type.types.push_back(node); + } + + TTypeDesc datetime_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::DATETIME); + node.__set_scalar_type(scalar_type); + datetime_type.types.push_back(node); + } + + TTypeDesc date_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::DATE); + node.__set_scalar_type(scalar_type); + date_type.types.push_back(node); + } + + { + for (int i = 0; i < 5; ++i) { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = decimal_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodecimalv2"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = decimal_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("cast_to_decimalv2_val(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = i; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(9 + i, expr); + params.src_slot_ids.push_back(i); + } + + { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = tinyint_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + 
cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttotinyint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = tinyint_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("cast_to_tiny_int_val(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_tiny_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 5; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(14, expr); + params.src_slot_ids.push_back(5); + } + + { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = datetime_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodatetime"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = datetime_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("cast_to_datetime_val(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_datetime_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 6; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(15, expr); + params.src_slot_ids.push_back(6); + 
} + { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = date_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodate"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = date_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_date_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 7; + slot_ref.slot_ref.tuple_id = 0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(16, expr); + params.src_slot_ids.push_back(7); + } + { + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = decimal_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodecimalv2"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = decimal_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("cast_to_decimalv2_val(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = 8; + slot_ref.slot_ref.tuple_id = 
0; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + params.expr_of_dest_slot.emplace(17, expr); + params.src_slot_ids.push_back(8); + } + } + params.__set_src_tuple_id(0); + params.__set_dest_tuple_id(1); + + //init_desc_table + TDescriptorTable t_desc_table; + + // table descriptors + TTableDescriptor t_table_desc; + + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::BROKER_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + + TDescriptorTableBuilder dtb; + TTupleDescriptorBuilder src_tuple_builder; + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col1") + .column_pos(1) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col2") + .column_pos(2) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col3") + .column_pos(3) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col4") + .column_pos(4) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col5") + .column_pos(5) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col6") + .column_pos(6) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col7") + .column_pos(7) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col8") + .column_pos(8) + .build()); + src_tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(65535) + .nullable(true) + .column_name("col9") + .column_pos(9) + 
.build()); + src_tuple_builder.build(&dtb); + + TTupleDescriptorBuilder dest_tuple_builder; + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().decimal_type(10, 9).column_name("col1").column_pos(1).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().decimal_type(7, 5).column_name("col2").column_pos(2).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().decimal_type(10, 9).column_name("col3").column_pos(3).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().decimal_type(10, 5).column_name("col4").column_pos(4).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().decimal_type(10, 5).column_name("col5").column_pos(5).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("col6").column_pos(6).build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("col7").column_pos(7).build()); + dest_tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_DATE) + .nullable(true) + .column_name("col8") + .column_pos(8) + .build()); + dest_tuple_builder.add_slot( + TSlotDescriptorBuilder().decimal_type(27, 9).column_name("col9").column_pos(9).build()); + + dest_tuple_builder.build(&dtb); + t_desc_table = dtb.desc_tbl(); + + DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); + _runtime_state.set_desc_tbl(_desc_tbl); + + std::vector ranges; + TBrokerRangeDesc rangeDesc; + rangeDesc.start_offset = 0; + rangeDesc.size = -1; + rangeDesc.format_type = TFileFormatType::FORMAT_ORC; + rangeDesc.splittable = false; + + rangeDesc.path = "./be/test/exec/test_data/orc_scanner/decimal_and_timestamp.orc"; + rangeDesc.file_type = TFileType::FILE_LOCAL; + ranges.push_back(rangeDesc); + + VORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, _pre_filter, + &_counter); + EXPECT_TRUE(scanner.open().ok()); + + bool eof = false; + vectorized::Block block; + EXPECT_TRUE(scanner.get_next(&block, &eof).ok()); + 
EXPECT_EQ(1, block.rows()); + EXPECT_TRUE(eof); + scanner.close(); +} + +} // namespace vectorized +} // namespace doris diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp new file mode 100644 index 0000000000..ba8f70ce70 --- /dev/null +++ b/be/test/vec/exec/vparquet_scanner_test.cpp @@ -0,0 +1,499 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include + +#include +#include +#include + +#include "common/object_pool.h" +#include "exec/local_file_reader.h" +#include "exprs/cast_functions.h" +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/descriptors.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "runtime/tuple.h" +#include "runtime/user_function_cache.h" +#include "vec/exec/vbroker_scan_node.h" + +namespace doris { +namespace vectorized { + +class VParquetScannerTest : public testing::Test { +public: + VParquetScannerTest() : _runtime_state(TQueryGlobals()) { + init(); + _runtime_state._instance_mem_tracker.reset(new MemTracker()); + _runtime_state._query_options.enable_vectorized_engine = true; + } + ~VParquetScannerTest() {} + void init(); + static void SetUpTestCase() { + UserFunctionCache::instance()->init( + "./be/test/runtime/test_data/user_function_cache/normal"); + CastFunctions::init(); + } + +protected: + virtual void SetUp() {} + virtual void TearDown() {} + +private: + int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + void create_expr_info(); + void init_desc_table(); + RuntimeState _runtime_state; + ObjectPool _obj_pool; + std::map _slots_map; + TBrokerScanRangeParams _params; + DescriptorTbl* _desc_tbl; + TPlanNode _tnode; +}; + +#define TUPLE_ID_DST 0 +#define TUPLE_ID_SRC 1 +#define COLUMN_NUMBERS 20 +#define DST_TUPLE_SLOT_ID_START 1 +#define SRC_TUPLE_SLOT_ID_START 21 +int VParquetScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + const char* columnNames[] = { + "log_version", "log_time", "log_time_stamp", "js_version", + "vst_cookie", "vst_ip", "vst_user_id", "vst_user_agent", + "device_resolution", "page_url", "page_refer_url", "page_yyid", + "page_type", "pos_type", "content_id", "media_id", + "spm_cnt", "spm_pre", "scm_cnt", "partition_column"}; + for (int i = 
0; i < COLUMN_NUMBERS; i++) { + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 1; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = i; + // Skip the first 8 bytes These 8 bytes are used to indicate whether the field is a null value + slot_desc.byteOffset = i * 16 + 8; + slot_desc.nullIndicatorByte = i / 8; + slot_desc.nullIndicatorBit = i % 8; + slot_desc.colName = columnNames[i]; + slot_desc.slotIdx = i + 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + + { + // TTupleDescriptor source + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_SRC; + //Here 8 bytes in order to handle null values + t_tuple_desc.byteSize = COLUMN_NUMBERS * 16 + 8; + t_tuple_desc.numNullBytes = 0; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +int VParquetScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + int32_t byteOffset = + 8; // Skip the first 8 bytes These 8 bytes are used to indicate whether the field is a null value + { //log_version + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); //parquet::Type::BYTE + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 0; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 0; + slot_desc.colName = 
"log_version"; + slot_desc.slotIdx = 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // log_time + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); //parquet::Type::INT64 + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 1; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 1; + slot_desc.colName = "log_time"; + slot_desc.slotIdx = 2; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 8; + { // log_time_stamp + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); //parquet::Type::INT32 + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 2; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 2; + slot_desc.colName = "log_time_stamp"; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 8; + const char* columnNames[] = { + "log_version", "log_time", "log_time_stamp", "js_version", + "vst_cookie", "vst_ip", "vst_user_id", "vst_user_agent", + "device_resolution", "page_url", "page_refer_url", "page_yyid", + "page_type", "pos_type", "content_id", "media_id", + "spm_cnt", "spm_pre", "scm_cnt", "partition_column"}; + for (int i = 3; i < COLUMN_NUMBERS; i++, byteOffset += 16) { + TSlotDescriptor slot_desc; + + slot_desc.id = 
next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); //parquet::Type::BYTE + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = i; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = i / 8; + slot_desc.nullIndicatorBit = i % 8; + slot_desc.colName = columnNames[i]; + slot_desc.slotIdx = i + 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + + t_desc_table.__isset.slotDescriptors = true; + { + // TTupleDescriptor dest + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_DST; + t_tuple_desc.byteSize = byteOffset + 8; //Here 8 bytes in order to handle null values + t_tuple_desc.numNullBytes = 0; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +void VParquetScannerTest::init_desc_table() { + TDescriptorTable t_desc_table; + + // table descriptors + TTableDescriptor t_table_desc; + + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::BROKER_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + + int next_slot_id = 1; + + next_slot_id = create_dst_tuple(t_desc_table, next_slot_id); + + next_slot_id = create_src_tuple(t_desc_table, next_slot_id); + + DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); + + _runtime_state.set_desc_tbl(_desc_tbl); +} + +void VParquetScannerTest::create_expr_info() { + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(5000); + 
node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + // log_version VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START; // log_time id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START); + } + // log_time VARCHAR --> BIGINT + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttoint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_big_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 1; // log_time id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 1, expr); + 
_params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 1); + } + // log_time_stamp VARCHAR --> BIGINT + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttoint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_big_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 2; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 2, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 2); + } + // couldn't convert type + for (int i = 3; i < COLUMN_NUMBERS; i++) { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + i; // log_time id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + i, expr); + 
_params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + i); + } + + // _params.__isset.expr_of_dest_slot = true; + _params.__set_dest_tuple_id(TUPLE_ID_DST); + _params.__set_src_tuple_id(TUPLE_ID_SRC); +} + +void VParquetScannerTest::init() { + create_expr_info(); + init_desc_table(); + + // Node Id + _tnode.node_id = 0; + _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE; + _tnode.num_children = 0; + _tnode.limit = -1; + _tnode.row_tuples.push_back(0); + _tnode.nullable_tuples.push_back(false); + _tnode.broker_scan_node.tuple_id = 0; + _tnode.__isset.broker_scan_node = true; +} + +TEST_F(VParquetScannerTest, normal) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + // set scan range + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_PARQUET; + range.splittable = true; + + std::vector columns_from_path {"value"}; + range.__set_columns_from_path(columns_from_path); + range.__set_num_of_columns_from_file(19); +#if 1 + range.path = "./be/test/exec/test_data/parquet_scanner/localfile.parquet"; + range.file_type = TFileType::FILE_LOCAL; +#else + range.path = "hdfs://ip:8020/user/xxxx.parq"; + range.file_type = TFileType::FILE_BROKER; + TNetworkAddress addr; + addr.__set_hostname("127.0.0.1"); + addr.__set_port(8000); + broker_scan_range.broker_addresses.push_back(addr); +#endif + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + // Get block + vectorized::Block block; + bool eof = false; + for (int i = 0; i 
< 14; i++) { + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2048, block.rows()); + EXPECT_FALSE(eof); + block.clear(); + } + + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(1328, block.rows()); + EXPECT_TRUE(eof); + block.clear(); + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(0, block.rows()); + EXPECT_TRUE(eof); + + scan_node.close(&_runtime_state); + { + std::stringstream ss; + scan_node.runtime_profile()->pretty_print(&ss); + LOG(INFO) << ss.str(); + } +} + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index d29ae4bf8e..6b96aa271c 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -360,9 +360,12 @@ build_gtest() { # rapidjson build_rapidjson() { check_if_source_exist $RAPIDJSON_SOURCE - - rm -rf $TP_INSTALL_DIR/rapidjson - cp -r $TP_SOURCE_DIR/$RAPIDJSON_SOURCE/include/rapidjson $TP_INCLUDE_DIR/ + cd $TP_SOURCE_DIR/$RAPIDJSON_SOURCE + mkdir -p $BUILD_DIR && cd $BUILD_DIR + rm -rf CMakeCache.txt CMakeFiles/ + ${CMAKE_CMD} ../ -DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR -DRAPIDJSON_BUILD_DOC=OFF \ + -DRAPIDJSON_BUILD_EXAMPLES=OFF -DRAPIDJSON_BUILD_TESTS=OFF + make -j $PARALLEL && make install } # snappy @@ -373,7 +376,7 @@ build_snappy() { mkdir -p $BUILD_DIR && cd $BUILD_DIR rm -rf CMakeCache.txt CMakeFiles/ CFLAGS="-O3" CXXFLAGS="-O3" ${CMAKE_CMD} -G "${GENERATOR}" -DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR \ - -DCMAKE_POSITION_INDEPENDENT_CODE=On \ + -DCMAKE_POSITION_INDEPENDENT_CODE=ON \ -DCMAKE_INSTALL_INCLUDEDIR=$TP_INCLUDE_DIR/snappy \ -DSNAPPY_BUILD_TESTS=0 ../ ${BUILD_SYSTEM} -j $PARALLEL && ${BUILD_SYSTEM} install @@ -643,20 +646,23 @@ build_arrow() { export ARROW_SNAPPY_URL=${TP_SOURCE_DIR}/${SNAPPY_NAME} export ARROW_ZLIB_URL=${TP_SOURCE_DIR}/${ZLIB_NAME} 
export ARROW_XSIMD_URL=${TP_SOURCE_DIR}/${XSIMD_NAME} + export ARROW_ORC_URL=${TP_SOURCE_DIR}/${ORC_NAME} LDFLAGS="-L${TP_LIB_DIR} -static-libstdc++ -static-libgcc" \ ${CMAKE_CMD} -G "${GENERATOR}" -DARROW_PARQUET=ON -DARROW_IPC=ON -DARROW_BUILD_SHARED=OFF \ -DARROW_BUILD_STATIC=ON -DARROW_WITH_BROTLI=ON -DARROW_WITH_LZ4=ON -DARROW_USE_GLOG=ON \ -DARROW_WITH_SNAPPY=ON -DARROW_WITH_ZLIB=ON -DARROW_WITH_ZSTD=ON -DARROW_JSON=ON \ - -DARROW_WITH_UTF8PROC=OFF -DARROW_WITH_RE2=OFF \ + -DARROW_WITH_UTF8PROC=OFF -DARROW_WITH_RE2=ON -DARROW_ORC=ON \ -DCMAKE_INSTALL_PREFIX=$TP_INSTALL_DIR \ -DCMAKE_INSTALL_LIBDIR=lib64 \ -DARROW_BOOST_USE_SHARED=OFF \ -DARROW_GFLAGS_USE_SHARED=OFF \ -Dgflags_ROOT=$TP_INSTALL_DIR \ -DGLOG_ROOT=$TP_INSTALL_DIR \ + -DRE2_ROOT=$TP_INSTALL_DIR \ -DZLIB_LIBRARY=$TP_INSTALL_DIR/lib/libz.a -DZLIB_INCLUDE_DIR=$TP_INSTALL_DIR/include \ -DRapidJSON_ROOT=$TP_INSTALL_DIR \ + -DORC_ROOT=$TP_INSTALL_DIR \ -DBrotli_SOURCE=BUNDLED \ -DLZ4_LIB=$TP_INSTALL_DIR/lib/liblz4.a -DLZ4_INCLUDE_DIR=$TP_INSTALL_DIR/include/lz4 \ -DLz4_SOURCE=SYSTEM \ @@ -664,7 +670,6 @@ build_arrow() { -Dzstd_SOURCE=SYSTEM \ -DSnappy_LIB=$TP_INSTALL_DIR/lib/libsnappy.a -DSnappy_INCLUDE_DIR=$TP_INSTALL_DIR/include \ -DSnappy_SOURCE=SYSTEM \ - -DBoost_INCLUDE_DIR=$TP_INSTALL_DIR/include \ -DThrift_ROOT=$TP_INSTALL_DIR .. ${BUILD_SYSTEM} -j $PARALLEL && ${BUILD_SYSTEM} install @@ -1011,6 +1016,7 @@ build_rocksdb build_cyrus_sasl build_librdkafka build_flatbuffers +build_orc build_arrow build_s2 build_bitshuffle @@ -1019,7 +1025,6 @@ build_fmt build_parallel_hashmap build_pdqsort build_libdivide -build_orc build_cctz build_tsan_header build_mysql