diff --git a/deps/init/oceanbase.el7.aarch64.deps b/deps/init/oceanbase.el7.aarch64.deps index 0d7a5fa21..01601d065 100644 --- a/deps/init/oceanbase.el7.aarch64.deps +++ b/deps/init/oceanbase.el7.aarch64.deps @@ -33,6 +33,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el7.aarch64.rpm devdeps-protobuf-c-1.4.1-100000072023102410.el7.aarch64.rpm devdeps-roaringbitmap-croaring-3.0.0-42024042816.el7.aarch64.rpm devdeps-apache-arrow-9.0.0-302024052920.el7.aarch64.rpm +devdeps-apache-orc-1.8.3-202024072510.el7.aarch64.rpm [tools] obdevtools-binutils-2.30-12022100413.el7.aarch64.rpm diff --git a/deps/init/oceanbase.el7.x86_64.deps b/deps/init/oceanbase.el7.x86_64.deps index 168f4725f..1f3f350de 100644 --- a/deps/init/oceanbase.el7.x86_64.deps +++ b/deps/init/oceanbase.el7.x86_64.deps @@ -36,6 +36,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el7.x86_64.rpm devdeps-protobuf-c-1.4.1-100000062023102016.el7.x86_64.rpm devdeps-roaringbitmap-croaring-3.0.0-42024042816.el7.x86_64.rpm devdeps-apache-arrow-9.0.0-222024052223.el7.x86_64.rpm +devdeps-apache-orc-1.8.3-202024072510.el7.x86_64.rpm [tools] obdevtools-binutils-2.30-12022100413.el7.x86_64.rpm diff --git a/deps/init/oceanbase.el8.aarch64.deps b/deps/init/oceanbase.el8.aarch64.deps index 25f5f5b46..7f32f9b7f 100644 --- a/deps/init/oceanbase.el8.aarch64.deps +++ b/deps/init/oceanbase.el8.aarch64.deps @@ -33,6 +33,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.aarch64.rpm devdeps-protobuf-c-1.4.1-100000072023102410.el8.aarch64.rpm devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.aarch64.rpm devdeps-apache-arrow-9.0.0-322024052923.el8.aarch64.rpm +devdeps-apache-orc-1.8.3-202024072510.el8.aarch64.rpm [tools] obdevtools-binutils-2.30-12022100413.el8.aarch64.rpm diff --git a/deps/init/oceanbase.el8.x86_64.deps b/deps/init/oceanbase.el8.x86_64.deps index 78c1eeb55..0373426cd 100644 --- a/deps/init/oceanbase.el8.x86_64.deps +++ b/deps/init/oceanbase.el8.x86_64.deps @@ -35,6 +35,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.x86_64.rpm devdeps-protobuf-c-1.4.1-100000062023102016.el8.x86_64.rpm devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.x86_64.rpm devdeps-apache-arrow-9.0.0-172024052218.el8.x86_64.rpm +devdeps-apache-orc-1.8.3-202024072510.el8.x86_64.rpm [tools] obdevtools-binutils-2.30-12022100413.el8.x86_64.rpm diff --git a/deps/init/oceanbase.el9.aarch64.deps b/deps/init/oceanbase.el9.aarch64.deps index f7616be6e..dc3f98ec1 100644 --- a/deps/init/oceanbase.el9.aarch64.deps +++ b/deps/init/oceanbase.el9.aarch64.deps @@ -37,6 +37,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.aarch64.rpm devdeps-protobuf-c-1.4.1-100000072023102410.el8.aarch64.rpm devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.aarch64.rpm devdeps-apache-arrow-9.0.0-322024052923.el8.aarch64.rpm +devdeps-apache-orc-1.8.3-202024072510.el8.aarch64.rpm [deps-el9] devdeps-apr-1.6.5-232023090616.el9.aarch64.rpm target=el9 diff --git a/deps/init/oceanbase.el9.x86_64.deps b/deps/init/oceanbase.el9.x86_64.deps index c4dcc9b53..e049cb055 100644 --- a/deps/init/oceanbase.el9.x86_64.deps +++ b/deps/init/oceanbase.el9.x86_64.deps @@ -39,6 +39,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.x86_64.rpm devdeps-protobuf-c-1.4.1-100000062023102016.el8.x86_64.rpm devdeps-apache-arrow-9.0.0-172024052218.el8.x86_64.rpm devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.x86_64.rpm +devdeps-apache-orc-1.8.3-202024072510.el8.x86_64.rpm [deps-el9] devdeps-apr-1.6.5-232023090616.el9.x86_64.rpm target=el9 diff --git a/deps/oblib/src/CMakeLists.txt b/deps/oblib/src/CMakeLists.txt index 540df4b68..93e6171aa 100644 --- a/deps/oblib/src/CMakeLists.txt +++ b/deps/oblib/src/CMakeLists.txt @@ -21,6 +21,7 @@ target_include_directories( ${DEP_DIR}/include/apr-1/ ${DEP_DIR}/include/icu/common ${DEP_DIR}/include/apache-arrow + ${DEP_DIR}/include/apache-orc ${USSL_INCLUDE_DIRS} ) @@ -208,6 +209,12 @@ target_link_libraries(oblib_base_base_base ${DEP_DIR}/lib64/libarrow.a ${DEP_DIR}/lib64/libparquet.a ${DEP_DIR}/lib64/libarrow_bundled_dependencies.a + ${DEP_DIR}/lib64/liborc.a + ${DEP_DIR}/lib64/libsnappy.a + ${DEP_DIR}/lib64/libprotoc.a + ${DEP_DIR}/lib64/libprotobuf.a + ${DEP_DIR}/lib64/liblz4.a + ${DEP_DIR}/lib64/libzstd.a -L${DEP_DIR}/var/usr/lib64 -L${DEP_DIR}/var/usr/lib -L${DEP_3RD_DIR}/usr/lib @@ -237,6 +244,12 @@ target_link_libraries(oblib_base_base_base ${DEP_DIR}/lib64/libarrow.a ${DEP_DIR}/lib64/libparquet.a ${DEP_DIR}/lib64/libarrow_bundled_dependencies.a + ${DEP_DIR}/lib64/liborc.a + ${DEP_DIR}/lib64/libsnappy.a + ${DEP_DIR}/lib64/libprotoc.a + ${DEP_DIR}/lib64/libprotobuf.a + ${DEP_DIR}/lib64/liblz4.a + ${DEP_DIR}/lib64/libzstd.a -L${DEP_DIR}/var/usr/lib64 -L${DEP_DIR}/var/usr/lib -L${DEP_3RD_DIR}/usr/lib diff --git a/deps/oblib/unittest/lib/CMakeLists.txt b/deps/oblib/unittest/lib/CMakeLists.txt index 601238a7c..a7bb13cba 100644 --- a/deps/oblib/unittest/lib/CMakeLists.txt +++ b/deps/oblib/unittest/lib/CMakeLists.txt @@ -6,6 +6,7 @@ # oblib_addtest(time/test_ob_time_utility.cpp) # oblib_addtest(timezone/test_ob_timezone_utils.cpp) oblib_addtest(parquet/test_parquet.cpp) +oblib_addtest(orc/test_orc.cpp) oblib_addtest(alloc/test_alloc_struct.cpp) oblib_addtest(alloc/test_block_set.cpp) oblib_addtest(alloc/test_chunk_mgr.cpp) diff --git a/deps/oblib/unittest/lib/orc/test_orc.cpp b/deps/oblib/unittest/lib/orc/test_orc.cpp new file mode 100644 index 000000000..39ae5fb16 --- /dev/null +++ b/deps/oblib/unittest/lib/orc/test_orc.cpp @@ -0,0 +1,603 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#define USING_LOG_PREFIX SQL + +#include "gtest/gtest.h" +#include "lib/oblog/ob_log.h" +#include "lib/oblog/ob_log_module.h" + +#include +#include +#include +#include +#include + +#include +#include + + +#include "lib/allocator/page_arena.h" +#include "lib/file/ob_file.h" +#include "lib/file/file_directory_utils.h" +#include "lib/charset/ob_template_helper.h" +#include "lib/net/ob_net_util.h" + +#define USING_LOG_PREFIX SQL + +using namespace oceanbase::common; + +class TestOrc: public ::testing::Test +{ +public: + TestOrc() {}; + virtual ~TestOrc() {}; + virtual void SetUp(); + virtual void TearDown(); +}; + + + +void TestOrc::SetUp() +{ +} + +void TestOrc::TearDown() +{ +} + +class OrcMemoryPool : public ::orc::MemoryPool +{ + public: + virtual char* malloc(uint64_t size) override { + return (char* )alloc_.alloc(size); + } + virtual void free(char* p) override { + //do nothing + } + private: + oceanbase::common::ObArenaAllocator alloc_; +}; + + +// Result> GetOrcType(const DataType& type) { +// Type::type kind = type.id(); +// switch (kind) { +// case Type::type::BOOL: +// return liborc::createPrimitiveType(liborc::TypeKind::BOOLEAN); +// case Type::type::INT8: +// return liborc::createPrimitiveType(liborc::TypeKind::BYTE); +// case Type::type::INT16: +// return liborc::createPrimitiveType(liborc::TypeKind::SHORT); +// case Type::type::INT32: +// return liborc::createPrimitiveType(liborc::TypeKind::INT); +// case Type::type::INT64: +// return liborc::createPrimitiveType(liborc::TypeKind::LONG); +// case Type::type::FLOAT: +// return liborc::createPrimitiveType(liborc::TypeKind::FLOAT); +// case Type::type::DOUBLE: +// return liborc::createPrimitiveType(liborc::TypeKind::DOUBLE); +// // Use STRING instead of VARCHAR for now, both use UTF-8 +// case Type::type::STRING: +// case Type::type::LARGE_STRING: +// return liborc::createPrimitiveType(liborc::TypeKind::STRING); +// case Type::type::BINARY: +// case Type::type::LARGE_BINARY: +// case Type::type::FIXED_SIZE_BINARY: +// return liborc::createPrimitiveType(liborc::TypeKind::BINARY); +// case Type::type::DATE32: +// return liborc::createPrimitiveType(liborc::TypeKind::DATE); +// case Type::type::DATE64: +// return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP); +// case Type::type::TIMESTAMP: { +// const auto& timestamp_type = checked_cast(type); +// if (!timestamp_type.timezone().empty()) { +// // The timestamp values stored in the arrow array are normalized to UTC. +// // TIMESTAMP_INSTANT type is always preferred over TIMESTAMP type. +// return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP_INSTANT); +// } +// // The timestamp values stored in the arrow array can be in any timezone. +// return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP); +// } +// case Type::type::DECIMAL128: { +// const uint64_t precision = +// static_cast(checked_cast(type).precision()); +// const uint64_t scale = +// static_cast(checked_cast(type).scale()); +// return liborc::createDecimalType(precision, scale); +// } +// case Type::type::LIST: +// case Type::type::FIXED_SIZE_LIST: +// case Type::type::LARGE_LIST: { +// const auto& value_field = checked_cast(type).value_field(); +// ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*value_field->type())); +// SetAttributes(value_field, orc_subtype.get()); +// return liborc::createListType(std::move(orc_subtype)); +// } +// case Type::type::STRUCT: { +// std::unique_ptr out_type = liborc::createStructType(); +// std::vector> arrow_fields = +// checked_cast(type).fields(); +// for (auto it = arrow_fields.begin(); it != arrow_fields.end(); ++it) { +// std::string field_name = (*it)->name(); +// ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*(*it)->type())); +// SetAttributes(*it, orc_subtype.get()); +// out_type->addStructField(field_name, std::move(orc_subtype)); +// } +// return out_type; +// } +// case Type::type::MAP: { +// const auto& key_field = checked_cast(type).key_field(); +// const auto& item_field = checked_cast(type).item_field(); +// ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_field->type())); +// ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_field->type())); +// SetAttributes(key_field, key_orc_type.get()); +// SetAttributes(item_field, item_orc_type.get()); +// return liborc::createMapType(std::move(key_orc_type), std::move(item_orc_type)); +// } +// case Type::type::DENSE_UNION: +// case Type::type::SPARSE_UNION: { +// std::unique_ptr out_type = liborc::createUnionType(); +// std::vector> arrow_fields = +// checked_cast(type).fields(); +// for (const auto& arrow_field : arrow_fields) { +// std::shared_ptr arrow_child_type = arrow_field->type(); +// ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type)); +// SetAttributes(arrow_field, orc_subtype.get()); +// out_type->addUnionChild(std::move(orc_subtype)); +// } +// return out_type; +// } +// default: { +// return Status::NotImplemented("Unknown or unsupported Arrow type: ", +// type.ToString()); +// } +// } +// } + + // class ObOrcRandomAccess : public :orc::InputStream { + // public: + // ObOrcRandomAccess(oceanbase::sql::ObExternalDataAccessDriver &file_reader, const char* file_name, orc::MemoryPool *pool) + // : file_reader_(file_reader), file_name_(file_name), pool_(pool) { + + // } + + // uint64_t getLength() const override { + // return totalLength; + // } + + // uint64_t getNaturalReadSize() const override { + // return 128 * 1024; + // } + + // void read(void* buf, + // uint64_t length, + // uint64_t offset) override { + // int64_t bytesRead = 0; + // int ret = file_reader_.pread(buf, length, offset, bytesRead); + // totalLength += bytesRead; + // if (ret != OB_SUCCESS) { + // throw orc::ParseError("Bad read of " + std::string(file_name_)); + // } + // } + + // const std::string& getName() const override { + // return file_name_; + // } + // private: + // oceanbase::sql::ObExternalDataAccessDriver &file_reader_; + // const std::string file_name_; + // orc::MemoryPool *pool_; + // uint64_t totalLength; + // }; + + +void wirte_orc_file() { + std::unique_ptr outStream = orc::writeLocalFile("my-file.orc"); + //std::unique_ptr schema(orc::Type::buildTypeFromString("struct")); + std::unique_ptr schema = orc::createStructType(); + schema->addStructField("x", orc::createPrimitiveType(orc::TypeKind::INT)); + schema->addStructField("y", orc::createPrimitiveType(orc::TypeKind::BOOLEAN)); + std::unique_ptr sub_schema = orc::createStructType(); + sub_schema->addStructField("z", orc::createPrimitiveType(orc::TypeKind::FLOAT)); + sub_schema->addStructField("d", orc::createPrimitiveType(orc::TypeKind::DATE)); + schema->addStructField("S2", std::move(sub_schema)); + orc::WriterOptions options; + OrcMemoryPool pool; + options.setMemoryPool(&pool); + options.setCompression(orc::CompressionKind::CompressionKind_ZLIB); + std::unique_ptr writer = orc::createWriter(*schema, outStream.get(), options); + + uint64_t batchSize = 8, rowCount = 100; + std::unique_ptr batch = + writer->createRowBatch(batchSize); + orc::StructVectorBatch *root = + dynamic_cast(batch.get()); + orc::LongVectorBatch *x = + dynamic_cast(root->fields[0]); + orc::LongVectorBatch *y = + dynamic_cast(root->fields[1]); + orc::DoubleVectorBatch *z = + dynamic_cast(dynamic_cast(root->fields[2])->fields[0]); + + uint64_t rows = 0; + for (uint64_t i = 0; i < rowCount; ++i) { + if (i % 5 == 0) { + x->notNull[rows] = 0; + y->notNull[rows] = 0; + z->notNull[rows] = 0; + x->hasNulls = true; + y->hasNulls = true; + z->hasNulls = true; + x->data[rows] = 0; + y->data[rows] = 0; + z->data[rows] = i * 1.1 + 0.01; + rows++; + } else { + x->notNull[rows] = true; + y->notNull[rows] = true; + z->notNull[rows] = true; + x->data[rows] = i + 1; + y->data[rows] = i * 3 + 1; + z->data[rows] = i * 1.1 + 0.01; + rows++; + } + + + if (rows == batchSize) { + root->numElements = rows; + x->numElements = rows; + y->numElements = rows; + z->numElements = rows; + + writer->add(*batch); + rows = 0; + } + } + + if (rows != 0) { + root->numElements = rows; + x->numElements = rows; + y->numElements = rows; + //z->numElements = rows; + + writer->add(*batch); + rows = 0; + } + + writer->close(); +} + +void read_orc_file() { + std::unique_ptr inStream = orc::readLocalFile("my-file.orc"); + orc::ReaderOptions options; + OrcMemoryPool pool; + options.setMemoryPool(pool); + std::unique_ptr reader = orc::createReader(std::move(inStream), options); + + orc::RowReaderOptions rowReaderOptions; + std::unique_ptr rowReader = reader->createRowReader(rowReaderOptions); + std::unique_ptr batch = rowReader->createRowBatch(2); + + //std::cout <<"root field size: " << root->fields.size() << std::endl; + while (rowReader->next(*batch)) { + std::cout<<"column batch:" << batch->toString() <<"\n"; + for (uint64_t r = 0; r < batch->numElements; ++r) { + orc::StructVectorBatch *root = + dynamic_cast(batch.get()); + orc::ColumnVectorBatch *col[10] = {NULL}; + int k = 0; + std::cout<< "row:" << r ; + for (int i = 0; i < reader->getType().getSubtypeCount(); i++) { + const uint8_t* valid_bytes = NULL; + switch (reader->getType().getSubtype(i)->getKind()) { + case orc::TypeKind::BOOLEAN: + case orc::TypeKind::BYTE: + case orc::TypeKind::SHORT: + case orc::TypeKind::INT: + case orc::TypeKind::LONG: + case orc::TypeKind::DATE: + //valid_bytes = reinterpret_cast( dynamic_cast(root->fields[i])->notNull.data()) + r; + std::cout<<" col" << i <<":" << dynamic_cast(root->fields[i])->data[r] << " is not null:"<< bool(root->fields[i]->notNull[r]); + break; + case orc::TypeKind::FLOAT: + case orc::TypeKind::DOUBLE: + std::cout<< " col" << i <<":" << dynamic_cast(root->fields[i])->data[r] ; + std::cout<<" has NULL:"<<(root->fields[i])->hasNulls<<" is not null:"<< bool(root->fields[i]->notNull[r]); + break; + case orc::TypeKind::STRING: + case orc::TypeKind::VARCHAR: + case orc::TypeKind::CHAR: + case orc::TypeKind::BINARY: + std::cout<< " col" << i <<":" << dynamic_cast(root->fields[i])->data[r] <<" is not null:"<< bool(root->fields[i]->notNull[r]); + break; + case orc::TypeKind::LIST: + case orc::TypeKind::MAP: + case orc::TypeKind::UNION: + //not supported + break; + case orc::TypeKind::DECIMAL: + //std::cout<< "row:" << r << " col" << i <<":" << dynamic_cast(root->fields[i])->data[r] <<" is not null:"<< (bool)dynamic_cast(root->fields[i])->notNull[r]; + break; + case orc::TypeKind::TIMESTAMP: + case orc::TypeKind::TIMESTAMP_INSTANT: + //std::cout<< "row:" << r << " col" << i <<":" << dynamic_cast(root->fields[i])->data[r] <<" is not null:"<< (bool)dynamic_cast(root->fields[i])->notNull[r]; + break; + case orc::TypeKind::STRUCT: + std::cout<< " col" << i <<":" << dynamic_cast(dynamic_cast(root->fields[i])->fields[0])->data[r]; + break; + default: + //error + break; + } + // if (reader->getType().getColumnId() == reader->getType().getMaximumColumnId()) {//is primitive + // col[k++] = x; + // } + } + std::cout<<"\n"; + + } + } +} + +void printType(const orc::Type &type) { + std::cout << " type:" << type.toString() <<" type subTypeCount:" << type.getSubtypeCount()<< " typeKind: " << type.getKind() << " ColumnId:" << type.getColumnId() << " maxColumnId:" << type.getMaximumColumnId() << "\n"; + for (int i = 0; i < type.getSubtypeCount(); i++) { + printType(*type.getSubtype(i)); + } +} + + +bool is_primitive_Type(const orc::Type &type) { + if (type.getColumnId() == type.getMaximumColumnId()) { + return true; + } + return false; +} +void read_file_footer() { + std::unique_ptr inStream = orc::readLocalFile("my-file.orc"); + orc::ReaderOptions options; + OrcMemoryPool pool; + //options.setMemoryPool(pool); + std::unique_ptr reader = orc::createReader(std::move(inStream), options); + const orc::Type& type = reader->getType(); + printType(type); + for (int i = 0; i < reader->getType().getSubtypeCount(); i++) { + std::cout << "Subfield" << i << ": " << type.getFieldName(i); + printType(*type.getSubtype(i)); + } + for (int i = 0; i <= reader->getType().getMaximumColumnId(); i++) { + std::cout<<"column" <getColumnStatistics(i)->toString() <<"\n"; + } +} + +// Result> GetArrowSchema(const liborc::Type& type) { +// if (type.getKind() != liborc::STRUCT) { +// return Status::NotImplemented( +// "Only ORC files with a top-level struct " +// "can be handled"); +// } +// int size = static_cast(type.getSubtypeCount()); +// std::vector> fields; +// fields.reserve(size); +// for (int child = 0; child < size; ++child) { +// const std::string& name = type.getFieldName(child); +// ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField(name, type.getSubtype(child))); +// fields.push_back(std::move(elem_field)); +// } +// ARROW_ASSIGN_OR_RAISE(auto metadata, ReadMetadata()); +// return std::make_shared(std::move(fields), std::move(metadata)); +// } + +// Result> ReadMetadata() { +// const std::list keys = reader_->getMetadataKeys(); +// auto metadata = std::make_shared(); +// for (const auto& key : keys) { +// metadata->Append(key, reader_->getMetadataValue(key)); +// } +// return std::const_pointer_cast(metadata); +// } + +void read_column() { + std::cout<<"=================== test read column ===================\n"; + std::unique_ptr inStream = orc::readLocalFile("my-file.orc"); + orc::ReaderOptions options; + // OrcMemoryPool pool; + // options.setMemoryPool(pool); + std::list include_names_list; + include_names_list.push_front(std::string("y")); + include_names_list.push_front(std::string("S2.z")); + include_names_list.push_front(std::string("x")); + std::unique_ptr reader = orc::createReader(std::move(inStream), options); + orc::RowReaderOptions rowReaderOptions; + rowReaderOptions.include(include_names_list); + std::unique_ptr rowReader = reader->createRowReader(rowReaderOptions); + std::unique_ptr batch = rowReader->createRowBatch(1024); + std::cout<<"column batch:" << batch->toString() <<"\n"; + printType(rowReader->getSelectedType()); + const orc::Type &type = rowReader->getSelectedType(); + int size = static_cast(type.getSubtypeCount()); + for (int child = 0; child < size; ++child) { + const std::string& name = type.getFieldName(child); + std::cout<<"field "<< child << " name:" << name < keys = reader->getMetadataKeys(); + // auto metadata = std::make_shared(); + // for (const auto& key : keys) { + // metadata->Append(key, reader_->getMetadataValue(key)); + // } + // const orc::Type& type = reader->getType(); + // printType(type); + // for (int i = 0; i < reader->getType().getSubtypeCount(); i++) { + // std::cout << "Subfield" << i << ": " << type.getFieldName(i); + // printType(*type.getSubtype(i)); + // } + // for (int i = 0; i <= reader->getType().getMaximumColumnId(); i++) { + // std::cout<<"column" <getColumnStatistics(i)->toString() <<"\n"; + // } + +} + +void read_schema(std::unique_ptr &rowReader) { + const orc::Type &type = rowReader->getSelectedType(); + if (type.getKind() != orc::STRUCT) { + throw std::runtime_error("Only ORC files with a top-level struct can be handled"); + } else { + + } +} + +void read_stripe() { + // int64_t nstripes = reader_->getNumberOfStripes(); + // stripes_.resize(static_cast(nstripes)); + // std::unique_ptr stripe; + // uint64_t first_row_of_stripe = 0; + // for (int i = 0; i < nstripes; ++i) { + // stripe = reader_->getStripe(i); + // stripes_[i] = StripeInformation({static_cast(stripe->getOffset()), + // static_cast(stripe->getLength()), + // static_cast(stripe->getNumberOfRows()), + // static_cast(first_row_of_stripe)}); + // first_row_of_stripe += stripe->getNumberOfRows(); + // } +} + +void generate_orc_file(const bool output_null) { + std::unique_ptr outStream = orc::writeLocalFile("test1.orc"); + //std::unique_ptr schema(orc::Type::buildTypeFromString("struct")); + //std::unique_ptr schema = orc::createStructType(); + // schema->addStructField("age", orc::createPrimitiveType(orc::TypeKind::INT)) + // ->addStructField("is_male", orc::createPrimitiveType(orc::TypeKind::BOOLEAN)) + // ->addStructField("name", orc::createPrimitiveType(orc::TypeKind::Varchar)) + // ->addStructField("comment", orc::createPrimitiveType(orc::TypeKind::String)); + // std::unique_ptr schema(Type::buildTypeFromString(" + // struct")); + // std::unique_ptr sub_schema = orc::createStructType(); + // sub_schema->addStructField("z", orc::createPrimitiveType(orc::TypeKind::FLOAT)); + // schema->addStructField("S2", std::move(sub_schema)); + // orc::WriterOptions options; + // OrcMemoryPool pool; + // options.setCompression(orc::CompressionKind::CompressionKind_SNAPPY); + // //options.setStripeSize(1024); + // options.setCompressionBlockSize(1024); + // options.setMemoryPool(&pool); + // std::unique_ptr writer = orc::createWriter(*schema, outStream.get(), options); + + // uint64_t batchSize = 128, rowCount = 12800; + // std::unique_ptr batch = writer->createRowBatch(batchSize); + // orc::StructVectorBatch *root = dynamic_cast(batch.get()); + + // uint64_t rows = 0; + // for (uint64_t i = 0; i < rowCount; ++i) { + // if (output_null) { + // int put_null = std::rand() % 10; + // if (put_null == 1) { + + // } + // } + + // if (i % 5 == 0) { + // x->notNull[rows] = 0; + // y->notNull[rows] = 0; + // z->notNull[rows] = 0; + // x->hasNulls = true; + // y->hasNulls = true; + // z->hasNulls = true; + // x->data[rows] = 0; + // y->data[rows] = 0; + // z->data[rows] = i * 1.1 + 0.01; + // rows++; + // } else { + // x->notNull[rows] = true; + // y->notNull[rows] = true; + // z->notNull[rows] = true; + // x->data[rows] = i + 1; + // y->data[rows] = i * 3 + 1; + // z->data[rows] = i * 1.1 + 0.01; + // rows++; + // } + + + // if (rows == batchSize) { + // root->numElements = rows; + // x->numElements = rows; + // y->numElements = rows; + // z->numElements = rows; + + // writer->add(*batch); + // rows = 0; + // } + // } + + // if (rows != 0) { + // root->numElements = rows; + // x->numElements = rows; + // y->numElements = rows; + // //z->numElements = rows; + + // writer->add(*batch); + // rows = 0; + // } + + // writer->close(); +} + +void read_test_orc_file() +{ + std::cout<<"=================== test read orc file column ===================\n"; + std::unique_ptr inStream = orc::readLocalFile("/data/1/mingye.swj/work/support_master_orc/data/t.orc"); + orc::ReaderOptions options; + // OrcMemoryPool pool; + // options.setMemoryPool(pool); + // std::list include_names_list; + // include_names_list.push_front(std::string("S2.z")); + // include_names_list.push_front(std::string("x")); + std::unique_ptr reader = orc::createReader(std::move(inStream), options); + orc::RowReaderOptions rowReaderOptions; + //rowReaderOptions.include(include_names_list); + std::unique_ptr rowReader = reader->createRowReader(rowReaderOptions); + std::unique_ptr batch = rowReader->createRowBatch(256); + while (rowReader->next(*batch)) { + std::cout<<"column batch:" << batch->toString() <<"\n"; + for (uint64_t r = 0; r < batch->numElements; ++r) { + orc::StructVectorBatch *root = + dynamic_cast(batch.get()); + std::cout<<" row" << r <<":" << dynamic_cast(root->fields[0])->data[r] << " is not null:"<< bool(root->fields[0]->notNull[r]); + } + } + printType(rowReader->getSelectedType()); + const orc::Type &type = rowReader->getSelectedType(); + int size = static_cast(type.getSubtypeCount()); + for (int child = 0; child < size; ++child) { + const std::string& name = type.getFieldName(child); + printType(*type.getSubtype(child)); + } +} + +TEST_F(TestOrc, read_write_orc_file_test) +{ + wirte_orc_file(); + read_orc_file(); + read_file_footer(); + read_column(); + //read_test_orc_file(); +} + +int main(int argc, char **argv) +{ + OB_LOGGER.set_log_level("INFO"); + testing::InitGoogleTest(&argc,argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index dcccf6446..f5fee5e8d 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -889,6 +889,7 @@ ob_set_subtarget(ob_sql engine_table engine/table/ob_index_lookup_op_impl.cpp engine/table/ob_table_scan_with_index_back_op.cpp engine/table/ob_external_table_access_service.cpp + engine/table/ob_orc_table_row_iter.cpp engine/table/ob_parquet_table_row_iter.cpp ) diff --git a/src/sql/engine/cmd/ob_load_data_parser.cpp b/src/sql/engine/cmd/ob_load_data_parser.cpp index 7a2f6b54c..32038712a 100644 --- a/src/sql/engine/cmd/ob_load_data_parser.cpp +++ b/src/sql/engine/cmd/ob_load_data_parser.cpp @@ -32,10 +32,9 @@ const char INVALID_TERM_CHAR = '\xff'; const char * ObExternalFileFormat::FORMAT_TYPE_STR[] = { "CSV", "PARQUET", + "ORC", }; - -static_assert(array_elements(ObExternalFileFormat::FORMAT_TYPE_STR) == ObExternalFileFormat::MAX_FORMAT, - "Not enough initializer for ObExternalFileFormat"); +static_assert(array_elements(ObExternalFileFormat::FORMAT_TYPE_STR) == ObExternalFileFormat::MAX_FORMAT, "Not enough initializer for ObExternalFileFormat"); int ObCSVGeneralFormat::init_format(const ObDataInFileStruct &format, int64_t file_column_nums, @@ -391,7 +390,7 @@ int64_t ObExternalFileFormat::to_string(char *buf, const int64_t buf_len) const J_OBJ_START(); - databuff_print_kv(buf, buf_len, pos, "\"TYPE\"", is_valid_format ? FORMAT_TYPE_STR[format_type_] : "INVALID"); + databuff_print_kv(buf, buf_len, pos, "\"TYPE\"", is_valid_format ? ObExternalFileFormat::FORMAT_TYPE_STR[format_type_] : "INVALID"); switch (format_type_) { case CSV_FORMAT: @@ -434,8 +433,8 @@ int ObExternalFileFormat::load_from_string(const ObString &str, ObIAllocator &al LOG_WARN("unexpected json format", K(ret), K(str)); } else { ObString format_type_str = format_type_node->value_->get_string(); - for (int i = 0; i < array_elements(FORMAT_TYPE_STR); ++i) { - if (format_type_str.case_compare(FORMAT_TYPE_STR[i]) == 0) { + for (int i = 0; i < array_elements(ObExternalFileFormat::FORMAT_TYPE_STR); ++i) { + if (format_type_str.case_compare(ObExternalFileFormat::FORMAT_TYPE_STR[i]) == 0) { format_type_ = static_cast(i); break; } @@ -447,6 +446,7 @@ int ObExternalFileFormat::load_from_string(const ObString &str, ObIAllocator &al OZ (origin_file_format_str_.load_from_json_data(format_type_node, allocator)); break; case PARQUET_FORMAT: + case ORC_FORMAT: break; default: ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/engine/cmd/ob_load_data_parser.h b/src/sql/engine/cmd/ob_load_data_parser.h index bf422b472..d8ef54a21 100644 --- a/src/sql/engine/cmd/ob_load_data_parser.h +++ b/src/sql/engine/cmd/ob_load_data_parser.h @@ -524,6 +524,7 @@ struct ObExternalFileFormat INVALID_FORMAT = -1, CSV_FORMAT, PARQUET_FORMAT, + ORC_FORMAT, MAX_FORMAT }; @@ -543,7 +544,6 @@ struct ObExternalFileFormat sql::ObCSVGeneralFormat csv_format_; ObLoadCompressionFormat compression_format_; uint64_t options_; - static const char *FORMAT_TYPE_STR[]; }; diff --git a/src/sql/engine/table/ob_external_table_access_service.cpp b/src/sql/engine/table/ob_external_table_access_service.cpp index 673b3b707..937c24eeb 100644 --- a/src/sql/engine/table/ob_external_table_access_service.cpp +++ b/src/sql/engine/table/ob_external_table_access_service.cpp @@ -23,6 +23,7 @@ #include "lib/utility/ob_macro_utils.h" #include "sql/engine/table/ob_parquet_table_row_iter.h" #include "sql/engine/cmd/ob_load_data_file_reader.h" +#include "sql/engine/table/ob_orc_table_row_iter.h" namespace oceanbase { @@ -558,13 +559,18 @@ int ObExternalTableAccessService::table_scan( LOG_WARN("alloc memory failed", K(ret)); } break; - case ObExternalFileFormat::PARQUET_FORMAT: if (OB_ISNULL(row_iter = OB_NEWx(ObParquetTableRowIterator, (scan_param.allocator_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc memory failed", K(ret)); } break; + case ObExternalFileFormat::ORC_FORMAT: + if (OB_ISNULL(row_iter = OB_NEWx(ObOrcTableRowIterator, (scan_param.allocator_)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc memory failed", K(ret)); + } + break; default: ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected format", K(ret), "format", param.external_file_format_.format_type_); @@ -594,6 +600,7 @@ int ObExternalTableAccessService::table_rescan(ObVTableScanParam ¶m, ObNewRo switch (param.external_file_format_.format_type_) { case ObExternalFileFormat::CSV_FORMAT: case ObExternalFileFormat::PARQUET_FORMAT: + case ObExternalFileFormat::ORC_FORMAT: result->reset(); break; default: diff --git a/src/sql/engine/table/ob_orc_table_row_iter.cpp b/src/sql/engine/table/ob_orc_table_row_iter.cpp new file mode 100644 index 000000000..c1b4996a6 --- /dev/null +++ b/src/sql/engine/table/ob_orc_table_row_iter.cpp @@ -0,0 +1,1382 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + #define USING_LOG_PREFIX SQL_ENG +#include "ob_orc_table_row_iter.h" +#include "sql/engine/expr/ob_expr_get_path.h" +#include "share/external_table/ob_external_table_utils.h" +#include "sql/engine/expr/ob_datum_cast.h" +#include "sql/engine/ob_exec_context.h" + + + +namespace oceanbase +{ +using namespace share::schema; +using namespace common; +using namespace share; +namespace sql { + +int ObOrcTableRowIterator::to_dot_column_path(ObIArray &col_names, ObString &path) +{ + int ret = OB_SUCCESS; + ObSqlString tmp_string; + for (int64_t i = 0; OB_SUCC(ret) && i < col_names.count(); i++) { + if (i > 0) { + OZ (tmp_string.append(".")); + } + OZ (tmp_string.append(col_names.at(i))); + } + OZ (ob_write_string(allocator_, tmp_string.string(), path)); + return ret; +} + + +/** + * Recurses over a type tree and build two maps + * map, map + */ +int ObOrcTableRowIterator::build_type_name_id_map(const orc::Type* type, ObIArray &col_names) +{ + int ret = OB_SUCCESS; + CK (type != nullptr); + OZ (id_to_type_.set_refactored(type->getColumnId(), type, 0)); + if (OB_FAIL(ret)) { + } else if (orc::TypeKind::STRUCT == type->getKind()) { + for (size_t i = 0; OB_SUCC(ret) && i < type->getSubtypeCount(); ++i) { + const std::string& cpp_field_name = type->getFieldName(i); + ObString field_name; + OZ (ob_write_string(allocator_, ObString(cpp_field_name.c_str()), field_name)); + OZ (col_names.push_back(field_name)); + ObString path; + OZ (to_dot_column_path(col_names, path)); + OZ (name_to_id_.set_refactored(path, type->getSubtype(i)->getColumnId(), 0)); + OZ (build_type_name_id_map(type->getSubtype(i), col_names)); + if (OB_FAIL(ret)) { + } else if (col_names.count() > 0) { + col_names.pop_back(); + } + } + } else { + // other non-primitive type + for (size_t j = 0; OB_SUCC(ret) && j < type->getSubtypeCount(); ++j) { + OZ (build_type_name_id_map(type->getSubtype(j), col_names)); + } + } + return ret; +} + +int ObOrcTableRowIterator::init(const storage::ObTableScanParam *scan_param) +{ + int ret = OB_SUCCESS; + + CK (scan_param != nullptr); + CK (scan_param->op_ != nullptr); + CK (scan_param->ext_column_convert_exprs_ != nullptr); + if (OB_SUCC(ret)) { + ObEvalCtx &eval_ctx = scan_param->op_->get_eval_ctx(); + int64_t column_cnt = scan_param->ext_column_convert_exprs_->count(); + mem_attr_ = ObMemAttr(MTL_ID(), "OrcRowIter"); + allocator_.set_attr(mem_attr_); + orc_alloc_.init(MTL_ID()); + OZ (id_to_type_.create(512, mem_attr_)); + OZ (name_to_id_.create(512, mem_attr_)); + OZ (ObExternalTableRowIterator::init(scan_param)); + OZ (data_access_driver_.init(scan_param->external_file_location_, + scan_param->external_file_access_info_)); + + if (OB_SUCC(ret) && OB_ISNULL(bit_vector_cache_)) { + void *mem = nullptr; + if (OB_ISNULL(mem = allocator_.alloc(ObBitVector::memory_size(eval_ctx.max_batch_size_)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc memory for skip", K(ret), K(eval_ctx.max_batch_size_)); + } else { + bit_vector_cache_ = to_bit_vector(mem); + bit_vector_cache_->reset(eval_ctx.max_batch_size_); + } + } + + ObArray file_column_exprs; + ObArray file_meta_column_exprs; + for (int i = 0; OB_SUCC(ret) && i < scan_param->ext_file_column_exprs_->count(); i++) { + ObExpr* ext_file_column_expr = scan_param->ext_file_column_exprs_->at(i); + if (OB_ISNULL(ext_file_column_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected ptr", K(ret)); + } else if (ext_file_column_expr->type_ == T_PSEUDO_EXTERNAL_FILE_URL + || ext_file_column_expr->type_ == T_PSEUDO_PARTITION_LIST_COL) { + OZ (file_meta_column_exprs.push_back(ext_file_column_expr)); + } else if (ext_file_column_expr->type_ == T_PSEUDO_EXTERNAL_FILE_COL) { + OZ (file_column_exprs.push_back(ext_file_column_expr)); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected expr", KPC(ext_file_column_expr)); + } + } + OZ (file_column_exprs_.assign(file_column_exprs)); + OZ (file_meta_column_exprs_.assign(file_meta_column_exprs)); + + if (OB_SUCC(ret) && file_column_exprs_.count() > 0) { + OZ (column_indexs_.allocate_array(allocator_, file_column_exprs_.count())); + //OZ (column_readers_.allocate_array(allocator_, file_column_exprs_.count())); + OZ (load_funcs_.allocate_array(allocator_, file_column_exprs_.count())); + } + + if (OB_SUCC(ret)) { + OZ (file_url_ptrs_.allocate_array(allocator_, eval_ctx.max_batch_size_)); + OZ (file_url_lens_.allocate_array(allocator_, eval_ctx.max_batch_size_)); + } + } + return ret; +} + + +int ObOrcTableRowIterator::next_stripe() +{ + int ret = OB_SUCCESS; + //init all meta + if (state_.cur_stripe_idx_ > state_.end_stripe_idx_) { + if (OB_FAIL(next_file())) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to get next srtipe", K(ret)); + } + } + } + if (OB_SUCC(ret)) { + int64_t cur_stripe = (state_.cur_stripe_idx_++) - 1; + CK (cur_stripe < stripes_.count()); + if (OB_SUCC(ret)) { + LOG_TRACE("show current stripe info", K(stripes_.at(cur_stripe))); + try { + // for (int i = 0; OB_SUCC(ret) && i < column_readers_.count(); i++) { + // if (column_readers_.at(i)) { + // column_readers_.at(i)->seekToRow(stripes_.at(cur_stripe).first_row_id); + // } else { + // ret = OB_ERR_UNEXPECTED; + // LOG_WARN("column reader is null", K(ret)); + // } + // } + if (row_reader_) { + row_reader_->seekToRow(stripes_.at(cur_stripe).first_row_id); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("column reader is null", K(ret)); + } + state_.cur_stripe_read_row_count_ = 0; + state_.cur_stripe_row_count_ = stripes_.at(cur_stripe).num_rows; + } catch(const std::exception& e) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected index", K(ret), "Info", e.what(), K(cur_stripe), K(column_indexs_)); + } catch(...) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected index", K(ret), K(cur_stripe), K(column_indexs_)); + } + } + } + return ret; +} + +int ObOrcTableRowIterator::next_file() +{ + int ret = OB_SUCCESS; + CK (scan_param_ != nullptr); + CK (scan_param_->op_ != nullptr); + if (OB_SUCC(ret)) { + ObEvalCtx &eval_ctx = scan_param_->op_->get_eval_ctx(); + ObString location = scan_param_->external_file_location_; + int64_t task_idx = 0; + int64_t file_size = 0; + if (data_access_driver_.is_opened()) { + data_access_driver_.close(); + } + + do { + if ((task_idx = state_.file_idx_++) >= scan_param_->key_ranges_.count()) { + ret = OB_ITER_END; + } else { + state_.cur_file_url_ = scan_param_->key_ranges_.at(task_idx).get_start_key().get_obj_ptr()[ObExternalTableUtils::FILE_URL].get_string(); + url_.reuse(); + const char *split_char = "/"; + OZ (url_.append_fmt("%.*s%s%.*s", location.length(), location.ptr(), + (location.empty() || location[location.length() - 1] == '/') ? "" : split_char, + state_.cur_file_url_.length(), state_.cur_file_url_.ptr())); + OZ (data_access_driver_.get_file_size(url_.string(), file_size)); + + if (OB_SUCC(ret)) { + ObString expr_file_url; + if (data_access_driver_.get_storage_type() == OB_STORAGE_FILE) { + ObSqlString full_name; + if (ip_port_.empty()) { + OZ(gen_ip_port(allocator_)); + } + OZ (full_name.append_fmt("%.*s%%%.*s", ip_port_.length(), ip_port_.ptr(), + state_.cur_file_url_.length(), state_.cur_file_url_.ptr())); + OZ (ob_write_string(allocator_, full_name.string(), expr_file_url)); + } else { + expr_file_url = state_.cur_file_url_; + } + for (int i = 0; OB_SUCC(ret) && i < eval_ctx.max_batch_size_; i++) { + file_url_ptrs_.at(i) = expr_file_url.ptr(); + file_url_lens_.at(i) = expr_file_url.length(); + } + } + LOG_DEBUG("current external file", K(url_), K(file_size)); + } + } while (OB_SUCC(ret) && OB_UNLIKELY(0 >= file_size)); //skip not exist or empty file + + if (OB_SUCC(ret)) { + int64_t part_id = scan_param_->key_ranges_.at(task_idx).get_start_key().get_obj_ptr()[ObExternalTableUtils::PARTITION_ID].get_int(); + if (part_id != 0 && state_.part_id_ != part_id) { + state_.part_id_ = part_id; + OZ (calc_file_partition_list_value(part_id, allocator_, state_.part_list_val_)); + } + + state_.cur_file_id_ = scan_param_->key_ranges_.at(task_idx).get_start_key().get_obj_ptr()[ObExternalTableUtils::FILE_ID].get_int(); + OZ (ObExternalTableUtils::resolve_line_number_range(scan_param_->key_ranges_.at(task_idx), + ObExternalTableUtils::ROW_GROUP_NUMBER, + state_.cur_stripe_idx_, + state_.end_stripe_idx_)); + + try { + OZ (data_access_driver_.open(url_.ptr()), url_); + std::unique_ptr inStream(new ObOrcFileAccess(data_access_driver_, + url_.ptr(), file_size)); + orc::ReaderOptions options; + options.setMemoryPool(orc_alloc_); + std::unique_ptr reader = orc::createReader(std::move(inStream), options); + if (!reader) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("orc create reader failed", K(ret)); + throw std::bad_exception(); + } + int64_t nstripes = reader->getNumberOfStripes(); + LOG_TRACE("read file access: number of stipes", K(nstripes), K(url_)); + OZ (stripes_.allocate_array(allocator_, static_cast(nstripes))); + state_.end_stripe_idx_ = std::min(nstripes, state_.end_stripe_idx_); + std::unique_ptr stripe; + uint64_t first_row_of_stripe = 0; + for (int i = 0; OB_SUCC(ret) && i < nstripes; i++) { + stripe = reader->getStripe(i); + if (!stripe) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get orc file stripe failed", K(ret)); + throw std::bad_exception(); + } + stripes_.at(i) = StripeInformation({static_cast(stripe->getOffset()), + static_cast(stripe->getLength()), + static_cast(stripe->getNumberOfRows()), + static_cast(first_row_of_stripe)}); + first_row_of_stripe += stripe->getNumberOfRows(); + } + + std::list include_names_list; + for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); i++) { + ObDataAccessPathExtraInfo *data_access_info = + static_cast(file_column_exprs_.at(i)->extra_info_); + CK (data_access_info != nullptr); + CK (data_access_info->data_access_path_.ptr() != nullptr); + CK (data_access_info->data_access_path_.length() != 0); + if (OB_SUCC(ret)) { + include_names_list.push_front(std::string(data_access_info->data_access_path_.ptr(), + data_access_info->data_access_path_.length())); //x.y.z -> column_id + } + } + orc::RowReaderOptions rowReaderOptions; + rowReaderOptions.include(include_names_list); + row_reader_ = reader->createRowReader(rowReaderOptions); + if (OB_FAIL(ret)) { + } else if (!row_reader_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("create row reader failed", K(ret)); + } else { + //column_indexs_.at(i) = rowReader->getSelectedType().getColumnId(); + //column_readers_.at(i) = std::move(rowReader); + ObArray col_names; + id_to_type_.reuse(); + name_to_id_.reuse(); + OZ (SMART_CALL(build_type_name_id_map(&row_reader_->getSelectedType(), col_names))); + for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); i++) { + ObDataAccessPathExtraInfo *data_access_info = + static_cast(file_column_exprs_.at(i)->extra_info_); + CK (data_access_info != nullptr); + CK (data_access_info->data_access_path_.ptr() != nullptr); + CK (data_access_info->data_access_path_.length() != 0); + int64_t col_id = -1; + OZ (name_to_id_.get_refactored(ObString(data_access_info->data_access_path_.length(), data_access_info->data_access_path_.ptr()), col_id)); + CK (col_id != -1); + const orc::Type *type = nullptr; + OZ (id_to_type_.get_refactored(col_id, type)); + CK (type != nullptr); + if (OB_SUCC(ret)) { + load_funcs_.at(i) = DataLoader::select_load_function(file_column_exprs_.at(i)->datum_meta_, + *type); + } + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(load_funcs_.at(i))) { + ret = OB_ERR_INVALID_TYPE_FOR_OP; + if (i >= row_reader_->getSelectedType().getSubtypeCount()) { + //error for report + LOG_WARN("not supported type", K(ret), K(file_column_exprs_.at(i)->datum_meta_)); + const char *ob_type = ob_obj_type_str(file_column_exprs_.at(i)->datum_meta_.type_); + LOG_USER_ERROR(OB_EXTERNAL_FILE_COLUMN_TYPE_MISMATCH, "", ob_type); + } else { + std::string p_type = row_reader_->getSelectedType().getSubtype(i) == nullptr ? + "INVALID ORC TYPE" : row_reader_->getSelectedType().getSubtype(i)->toString(); + int64_t pos = 0; + ObArrayWrap buf; + ObDatumMeta &meta = file_column_exprs_.at(i)->datum_meta_; + const char *ob_type = ob_obj_type_str(file_column_exprs_.at(i)->datum_meta_.type_); + if (OB_SUCCESS == buf.allocate_array(allocator_, 100)) { + ob_sql_type_str(buf.get_data(), buf.count(), pos, meta.type_, + OB_MAX_VARCHAR_LENGTH, meta.precision_, meta.scale_, meta.cs_type_); + if (pos < buf.count()) { + buf.at(pos++) = '\0'; + ob_type = buf.get_data(); + } + } + LOG_WARN("not supported type", K(ret), K(file_column_exprs_.at(i)->datum_meta_), + K(ObString(p_type.length(), p_type.data()))); + LOG_USER_ERROR(OB_EXTERNAL_FILE_COLUMN_TYPE_MISMATCH, p_type.c_str(), ob_type); + } + } + } + } + } catch(const std::exception& e) { + if (OB_SUCC(ret)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected error", K(ret), "Info", e.what()); + } + } catch(...) { + if (OB_SUCC(ret)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected error", K(ret)); + } + } + } + } + return ret; +} + +bool ObOrcTableRowIterator::DataLoader::is_orc_read_utc() +{ + return true; +} + +bool ObOrcTableRowIterator::DataLoader::is_ob_type_store_utc(const ObDatumMeta &meta) +{ + return ObTimestampType == meta.type_ || ObDateType == meta.type_ + || (lib::is_mysql_mode() && ObDateTimeType == meta.type_) + || (lib::is_mysql_mode() && ObTimeType == meta.type_); +} + +int64_t ObOrcTableRowIterator::DataLoader::calc_tz_adjust_us() +{ + int64_t res = 0; + int ret = OB_SUCCESS; + bool is_utc_src = is_orc_read_utc(); + bool is_utc_dst = is_ob_type_store_utc(file_col_expr_->datum_meta_); + if (is_utc_src != is_utc_dst) { + int32_t tmp_offset = 0; + if (OB_NOT_NULL(eval_ctx_.exec_ctx_.get_my_session()) + && OB_NOT_NULL(eval_ctx_.exec_ctx_.get_my_session()->get_timezone_info()) + && OB_SUCCESS == eval_ctx_.exec_ctx_.get_my_session()->get_timezone_info()->get_timezone_offset(0, tmp_offset)) { + res = SEC_TO_USEC(tmp_offset) * (is_utc_dst ? 1 : -1); + } + } + LOG_DEBUG("tz adjust", K(is_utc_src), K(is_utc_dst), K(res), K(file_col_expr_->datum_meta_)); + return res; +} + +int ObOrcTableRowIterator::DataLoader::load_data_for_col(LOAD_FUNC &func) +{ + return (this->*func)(); +} + +ObOrcTableRowIterator::DataLoader::LOAD_FUNC ObOrcTableRowIterator::DataLoader::select_load_function( + const ObDatumMeta &datum_type, const orc::Type &type) +{ + LOAD_FUNC func = NULL; + + // int size = static_cast(type.getSubtypeCount()); + // if (size > 1) { + // LOG_USER_ERROR(OB_NOT_SUPPORTED, "Non-primitive type now are"); + // throw std::invalid_argument(type.toString()); + // } + const orc::Type* col_desc = &type; + orc::TypeKind type_kind = col_desc->getKind(); + if (ob_is_integer_type(datum_type.type_)) { + switch (type_kind) { + case orc::TypeKind::BOOLEAN: + case orc::TypeKind::BYTE: + case orc::TypeKind::SHORT: + case orc::TypeKind::INT: + case orc::TypeKind::LONG: + func = &DataLoader::load_int64_vec; + break; + case orc::TypeKind::DATE: + func = &DataLoader::load_int32_vec; + break; + default: + func = NULL; + } + } else if (ob_is_string_type(datum_type.type_) || ob_is_enum_or_set_type(datum_type.type_)) { + //convert parquet enum/string to enum/string vector + switch (type_kind) { + case orc::TypeKind::STRING: + case orc::TypeKind::VARCHAR: + case orc::TypeKind::BINARY: + case orc::TypeKind::CHAR: + func = &DataLoader::load_string_col; + break; + default: + func = NULL; + } + } else if (ob_is_number_or_decimal_int_tc(datum_type.type_)) { + //convert parquet int storing as int32/int64 to number/decimal vector + if (type_kind == orc::TypeKind::DECIMAL) { + if (col_desc->getPrecision() != datum_type.precision_ || col_desc->getScale() != datum_type.scale_) { + func = NULL; + } else if (col_desc->getPrecision() == 0 || col_desc->getPrecision() > 18) { + func = &DataLoader::load_dec128_vec; + } else { + func = &DataLoader::load_dec64_vec; + } + } else { + func = NULL; + } + } else if (ob_is_date_tc(datum_type.type_) || + ob_is_datetime(datum_type.type_) || + ob_is_time_tc(datum_type.type_) || + ob_is_otimestamp_type(datum_type.type_) || + ObTimestampType == datum_type.type_) { + switch (type_kind) { + // Values of TIMESTAMP type are stored in the writer timezone in the Orc file. + // Values are read back in the reader timezone. However, the writer timezone + // information in the Orc stripe footer is optional and may be missing. What is + // more, stripes in the same Orc file may have different writer timezones (though + // unlikely). So we cannot tell the exact timezone of values read back. In the adapter + // implementations, we set both writer and + // reader timezone to UTC to avoid any conversion so users can get the same values + // as written. To get rid of this burden, TIMESTAMP_INSTANT type is always preferred + // over TIMESTAMP type. + case orc::TypeKind::TIMESTAMP: + case orc::TypeKind::TIMESTAMP_INSTANT: + LOG_DEBUG("show type kind", K(type_kind), K(orc::TypeKind::TIMESTAMP_INSTANT)); + if (ob_is_date_tc(datum_type.type_) || + ob_is_datetime(datum_type.type_) || + ob_is_time_tc(datum_type.type_) || + ObTimestampType == datum_type.type_ || + ObTimestampLTZType == datum_type.type_) { + func = &DataLoader::load_timestamp_vec; + } + break; + case orc::TypeKind::DATE: + if (ob_is_date_tc(datum_type.type_)) { + func = &DataLoader::load_int32_vec; + } else if (ob_is_datetime(datum_type.type_) || + ob_is_time_tc(datum_type.type_) || + ObTimestampType == datum_type.type_ || + ObTimestampLTZType == datum_type.type_) { + func = &DataLoader::load_date_to_time_or_stamp; + } + break; + default: + func = NULL; + } + } else if (orc::TypeKind::FLOAT == type_kind && ObFloatType == datum_type.type_) { + func = &DataLoader::load_float; + } else if (orc::TypeKind::DOUBLE == type_kind && ObDoubleType == datum_type.type_) { + func = &DataLoader::load_double; + } + return func; +} + +int ObOrcTableRowIterator::get_data_column_batch_idxs(const orc::Type *type, const int col_id, ObIArray &idxs) +{ + int ret = OB_SUCCESS; + CK (type != NULL); + bool found = false; + while (OB_SUCC(ret) && !found) { + const orc::Type *cur = type; + for (int i = 0; OB_SUCC(ret) && !found && i < type->getSubtypeCount(); i++) { + if (type->getSubtype(i)->getColumnId() == col_id) { + OZ (idxs.push_back(i)); + found = true; + } else if (type->getSubtype(i)->getColumnId() < col_id && col_id < type->getSubtype(i)->getMaximumColumnId()) { + OZ (idxs.push_back(i)); + type = type->getSubtype(i); + } else { + //do nothing + } + } + if (OB_FAIL(ret)) { + } else if (cur == type && !found) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get data colum batch failed", K(ret)); + } + } + return ret; +} + +int ObOrcTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) +{ + int ret = OB_SUCCESS; + ObEvalCtx &eval_ctx = scan_param_->op_->get_eval_ctx(); + const ExprFixedArray &column_conv_exprs = *(scan_param_->ext_column_convert_exprs_); + int64_t read_count = 0; + ObMallocHookAttrGuard guard(mem_attr_); + + if (OB_SUCC(ret) && state_.cur_stripe_read_row_count_ >= state_.cur_stripe_row_count_) { + if (OB_FAIL(next_stripe())) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to next row group", K(ret)); + } + } + } + if (OB_FAIL(ret)) { + } else if (!file_column_exprs_.count()) { + read_count = std::min(capacity, state_.cur_stripe_row_count_ - state_.cur_stripe_read_row_count_); + } else if (!row_reader_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("row reader is null", K(ret)); + } else { + std::unique_ptr batch = row_reader_->createRowBatch(capacity); + if (!batch) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("create orc row batch failed", K(ret)); + } else if (row_reader_->next(*batch)) { + //ok + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read next batch failed", K(ret), K(state_.cur_stripe_read_row_count_), K(state_.cur_stripe_row_count_)); + } + //load vec data from parquet file to file column expr + for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) { + if (OB_ISNULL(file_column_exprs_.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("file column expr is null", K(ret)); + } else { + int idx = -1; + int64_t col_id = -1; + ObDataAccessPathExtraInfo *data_access_info = + static_cast(file_column_exprs_.at(i)->extra_info_); + CK (data_access_info != nullptr); + CK (data_access_info->data_access_path_.ptr() != nullptr); + CK (data_access_info->data_access_path_.length() != 0); + OZ (name_to_id_.get_refactored(ObString(data_access_info->data_access_path_.length(), data_access_info->data_access_path_.ptr()), col_id)); + ObArray idxs; + OZ (get_data_column_batch_idxs(&row_reader_->getSelectedType(), col_id, idxs)); + DataLoader loader(eval_ctx, file_column_exprs_.at(i), batch, capacity, idxs, read_count); + OZ (file_column_exprs_.at(i)->init_vector_for_write( + eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_)); + OZ (loader.load_data_for_col(load_funcs_.at(i))); + if (OB_SUCC(ret)) { + file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx); + } + if (OB_SUCC(ret) && read_count == 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read result count is zero", K(ret)); + } + } + } + } + if (OB_SUCC(ret) && read_count > 0) { + //fill expr results from metadata + for (int i = 0; OB_SUCC(ret) && i < file_meta_column_exprs_.count(); i++) { + ObExpr *meta_expr = file_meta_column_exprs_.at(i); + CK (OB_NOT_NULL(meta_expr)); + if (OB_FAIL(ret)) { + } else if (meta_expr->type_ == T_PSEUDO_EXTERNAL_FILE_URL) { + StrDiscVec *text_vec = static_cast(meta_expr->get_vector(eval_ctx)); + CK (OB_NOT_NULL(text_vec)); + OZ (meta_expr->init_vector_for_write(eval_ctx, VEC_DISCRETE, read_count)); + if (OB_SUCC(ret)) { + text_vec->set_ptrs(file_url_ptrs_.get_data()); + text_vec->set_lens(file_url_lens_.get_data()); + } + } else if (meta_expr->type_ == T_PSEUDO_PARTITION_LIST_COL) { + OZ (meta_expr->init_vector_for_write(eval_ctx, VEC_UNIFORM, read_count)); + OZ (fill_file_partition_expr(meta_expr, state_.part_list_val_, read_count)); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected expr", KPC(meta_expr)); + } + meta_expr->set_evaluated_projected(eval_ctx); + } + + for (int i = 0; OB_SUCC(ret) && i < column_exprs_.count(); i++) { + //column_conv_exprs is 1-1 mapped to column_exprs + //calc gen column exprs + CK (OB_NOT_NULL(column_conv_exprs.at(i))); + if (OB_FAIL(ret)) { + } else if (!column_conv_exprs.at(i)->get_eval_info(eval_ctx).evaluated_) { + OZ (column_conv_exprs.at(i)->init_vector_default(eval_ctx, read_count)); + CK (OB_NOT_NULL(bit_vector_cache_)); + OZ (column_conv_exprs.at(i)->eval_vector(eval_ctx, *bit_vector_cache_, read_count, true)); + OX (column_conv_exprs.at(i)->set_evaluated_projected(eval_ctx)); + } + //assign gen column exprs value to column exprs(output exprs) + if (OB_SUCC(ret)) { + ObExpr *to = column_exprs_.at(i); + ObExpr *from = column_conv_exprs.at(i); + CK (OB_NOT_NULL(to)); + CK (OB_NOT_NULL(from)); + VectorHeader &to_vec_header = to->get_vector_header(eval_ctx); + VectorHeader &from_vec_header = from->get_vector_header(eval_ctx); + if (OB_FAIL(ret)) { + } else if (from_vec_header.format_ == VEC_UNIFORM_CONST) { + ObDatum *from_datum = + static_cast(from->get_vector(eval_ctx))->get_datums(); + CK (OB_NOT_NULL(from_datum)); + OZ(to->init_vector(eval_ctx, VEC_UNIFORM, read_count)); + ObUniformBase *to_vec = static_cast(to->get_vector(eval_ctx)); + CK (OB_NOT_NULL(to_vec)); + if (OB_SUCC(ret)) { + ObDatum *to_datums = to_vec->get_datums(); + CK (OB_NOT_NULL(to_datums)); + for (int64_t j = 0; j < read_count && OB_SUCC(ret); j++) { + to_datums[j] = *from_datum; + } + } + } else if (from_vec_header.format_ == VEC_UNIFORM) { + ObUniformBase *uni_vec = static_cast(from->get_vector(eval_ctx)); + CK (OB_NOT_NULL(uni_vec)); + if (OB_SUCC(ret)) { + ObDatum *src = uni_vec->get_datums(); + ObDatum *dst = to->locate_batch_datums(eval_ctx); + CK (OB_NOT_NULL(src)); + CK (OB_NOT_NULL(dst)); + if (OB_SUCC(ret) && src != dst) { + MEMCPY(dst, src, read_count * sizeof(ObDatum)); + } + OZ(to->init_vector(eval_ctx, VEC_UNIFORM, read_count)); + } + } else { + to_vec_header = from_vec_header; + } + column_exprs_.at(i)->set_evaluated_projected(eval_ctx); + } + } + } + if (OB_SUCC(ret)) { + state_.cur_stripe_read_row_count_ += read_count; + count = read_count; + } + return ret; +} + +int ObOrcTableRowIterator::DataLoader::load_int64_vec() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + row_count_ = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + CK (OB_NOT_NULL(file_col_expr_)); + if (OB_SUCC(ret)) { + ObFixedLengthBase *int64_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + CK (OB_NOT_NULL(int64_vec)); + CK (VEC_FIXED == int64_vec->get_format()); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + orc::LongVectorBatch *long_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (!long_batch) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc type failed", K(ret)); + } else if (!long_batch->hasNulls) { + CK (OB_NOT_NULL(long_batch->data.data())); + CK (OB_NOT_NULL(int64_vec->get_data())); + if (OB_SUCC(ret)) { + MEMCPY(pointer_cast(int64_vec->get_data()), long_batch->data.data(), sizeof(int64_t) * row_count_); + } + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + CK (OB_NOT_NULL(long_batch->notNull.data())); + if (OB_SUCC(ret)) { + const uint8_t* valid_bytes = reinterpret_cast(long_batch->notNull.data()) + i; + if (OB_ISNULL(valid_bytes)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("orc not null batch valid bytes is null", K(ret)); + } else if (*valid_bytes == 1) { + int64_vec->set_int(i, long_batch->data[i]); + } else { + int64_vec->set_null(i); + } + } + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + } + LOG_DEBUG("load int64 vec", K(ret), K(row_count_)); + return ret; +} + +int ObOrcTableRowIterator::DataLoader::load_int32_vec() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + CK (OB_NOT_NULL(file_col_expr_)); + if (OB_SUCC(ret)) { + ObFixedLengthBase *int32_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + CK (OB_NOT_NULL(int32_vec)); + CK (VEC_FIXED == int32_vec->get_format()); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + orc::LongVectorBatch *long_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (!long_batch) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc type failed", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + if (long_batch->hasNulls) { + CK (OB_NOT_NULL(long_batch->notNull.data())); + if (OB_SUCC(ret)) { + const uint8_t* valid_bytes = reinterpret_cast(long_batch->notNull.data()) + i; + if (OB_ISNULL(valid_bytes)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("orc not null batch valid bytes is null", K(ret)); + } else if (*valid_bytes == 1) { + int32_vec->set_int32(i, (int32_t)long_batch->data[i]); + } else { + int32_vec->set_null(i); + } + } + } else { + int32_vec->set_int32(i, (int32_t)long_batch->data[i]); + } + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + } + LOG_DEBUG("load int32 vec", K(ret), K(row_count_)); + return ret; +} + +int ObOrcTableRowIterator::DataLoader::load_string_col() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + CK (OB_NOT_NULL(file_col_expr_)); + if (OB_SUCC(ret)) { + StrDiscVec *text_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + ObArrayWrap values; + CK (OB_NOT_NULL(text_vec)); + CK (VEC_DISCRETE == text_vec->get_format()); + OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_)); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + bool is_oracle_mode = lib::is_oracle_mode(); + bool is_byte_length = is_oracle_byte_length( + is_oracle_mode, file_col_expr_->datum_meta_.length_semantics_); + orc::StringVectorBatch *string_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (!string_batch) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc type failed", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + if (string_batch->hasNulls) { + CK (OB_NOT_NULL(string_batch->data.data())); + CK (OB_NOT_NULL(string_batch->notNull.data())); + if (OB_SUCC(ret)) { + const uint8_t* valid_bytes = reinterpret_cast(string_batch->notNull.data()) + i; + if (OB_ISNULL(valid_bytes)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("orc not null batch valid bytes is null", K(ret)); + } else if (*valid_bytes == 1) { + if (string_batch->length[i] == 0 && is_oracle_mode) { + text_vec->set_null(i); + } else { + if (OB_UNLIKELY(string_batch->length[i] > file_col_expr_->max_length_ + && (is_byte_length || ObCharset::strlen_char(CS_TYPE_UTF8MB4_BIN, + pointer_cast(string_batch->data[i]), + string_batch->length[i]) > file_col_expr_->max_length_))) { + ret = OB_ERR_DATA_TOO_LONG; + LOG_WARN("data too long", K(ret)); + } else { + values.at(i) = std::string(string_batch->data[i], string_batch->data[i] + string_batch->length[i]); + text_vec->set_string(i, values.at(i).data(), values.at(i).length()); + } + } + } else { + text_vec->set_null(i); + } + } + } else { + CK (OB_NOT_NULL(string_batch->length.data())); + if (OB_FAIL(ret)) { + } else if (string_batch->length[i] == 0 && is_oracle_mode) { + text_vec->set_null(i); + } else { + CK (OB_NOT_NULL(string_batch->data.data())); + if (OB_FAIL(ret)) { + } else if (OB_UNLIKELY(string_batch->length[i] > file_col_expr_->max_length_ + && (is_byte_length || ObCharset::strlen_char(CS_TYPE_UTF8MB4_BIN, + pointer_cast(string_batch->data[i]), + string_batch->length[i]) > file_col_expr_->max_length_))) { + ret = OB_ERR_DATA_TOO_LONG; + LOG_WARN("data too long", K(ret)); + } else { + values.at(i) = std::string(string_batch->data[i], string_batch->data[i] + string_batch->length[i]); + text_vec->set_string(i, values.at(i).data(), values.at(i).length()); + } + } + } + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + } + return ret; +} + +int ObOrcTableRowIterator::DataLoader::load_timestamp_vec() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + CK (OB_NOT_NULL(file_col_expr_)); + if (OB_SUCC(ret)) { + ObFixedLengthBase *dec_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + int64_t adjust_us = calc_tz_adjust_us(); + LOG_DEBUG("adjust value", K(adjust_us)); + CK (OB_NOT_NULL(dec_vec)); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + row_count_ = batch_->numElements; + orc::TimestampVectorBatch *timestamp_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (!timestamp_batch) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc type failed", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + const uint8_t* valid_bytes = nullptr; + CK (OB_NOT_NULL(timestamp_batch->data.data())); + CK (OB_NOT_NULL(timestamp_batch->nanoseconds.data())); + if (OB_SUCC(ret) && timestamp_batch->hasNulls) { + CK (OB_NOT_NULL(timestamp_batch->notNull.data())); + if (OB_SUCC(ret)) { + valid_bytes = reinterpret_cast(timestamp_batch->notNull.data()) + i; + } + } + if (OB_FAIL(ret)) { + } else if (!timestamp_batch->hasNulls || *valid_bytes == 1) { + int64_t adjusted_value = timestamp_batch->data[i] * USECS_PER_SEC + timestamp_batch->nanoseconds[i] / NSECS_PER_USEC + adjust_us; + if (ObTimestampType == file_col_expr_->datum_meta_.type_) { + dec_vec->set_timestamp(i, adjusted_value); + } else if (ObDateTimeType == file_col_expr_->datum_meta_.type_) { + dec_vec->set_datetime(i, adjusted_value); + } else if (ObDateType == file_col_expr_->datum_meta_.type_) { + dec_vec->set_date(i, adjusted_value / USECS_PER_DAY); + } else if (ObTimeType == file_col_expr_->datum_meta_.type_) { + dec_vec->set_time(i, adjusted_value); + } else { + ObOTimestampData data; + data.time_us_ = adjusted_value; + data.time_ctx_.set_tail_nsec(timestamp_batch->nanoseconds[i] % NSECS_PER_USEC); + dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data)); + } + } else { + dec_vec->set_null(i); + } + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + } + return ret; +} + + +int ObOrcTableRowIterator::DataLoader::load_date_to_time_or_stamp() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + CK (OB_NOT_NULL(file_col_expr_)); + if (OB_SUCC(ret)) { + ObFixedLengthBase *dec_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + int64_t adjust_us = calc_tz_adjust_us(); + LOG_DEBUG("show adjust value in date to ts", K(adjust_us)); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + row_count_ = batch_->numElements; + orc::LongVectorBatch *date_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (OB_ISNULL(date_batch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic dec64 batch cast failed", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + const uint8_t* valid_bytes = nullptr; + CK (OB_NOT_NULL(date_batch->data.data())); + if (OB_SUCC(ret)) { + if (date_batch->hasNulls) { + CK (OB_NOT_NULL(date_batch->notNull.data())); + if (OB_SUCC(ret)) { + valid_bytes = reinterpret_cast(date_batch->notNull.data()) + i; + } + } + if (OB_FAIL(ret)) { + } else if (!date_batch->hasNulls || *valid_bytes == 1) { + int64_t adjusted_value = date_batch->data[i] * USECS_PER_DAY + adjust_us; + if (ObTimestampType == file_col_expr_->datum_meta_.type_) { + dec_vec->set_timestamp(i, adjusted_value); + } else if (ObDateTimeType == file_col_expr_->datum_meta_.type_) { + dec_vec->set_datetime(i, date_batch->data[i] * USECS_PER_DAY); + } else if (ObDateType == file_col_expr_->datum_meta_.type_) { + dec_vec->set_date(i, adjusted_value / USECS_PER_DAY); + } else if (ObTimeType == file_col_expr_->datum_meta_.type_) { + dec_vec->set_time(i, adjusted_value); + } else { + ObOTimestampData data; + data.time_us_ = adjusted_value; + dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data)); + } + } else { + dec_vec->set_null(i); + } + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + } + return ret; +} + +int ObOrcTableRowIterator::DataLoader::load_dec64_vec() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + CK (OB_NOT_NULL(file_col_expr_)); + if (OB_SUCC(ret)) { + ObFixedLengthBase *vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + CK (OB_NOT_NULL(vec)); + CK (VEC_FIXED == vec->get_format()); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + orc::Decimal64VectorBatch *dec64_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (OB_ISNULL(dec64_batch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic dec64 batch cast failed", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + CK (OB_NOT_NULL(dec64_batch->values.data())); + if (OB_SUCC(ret)) { + const uint8_t* valid_bytes = nullptr; + if (dec64_batch->hasNulls) { + CK (OB_NOT_NULL(dec64_batch->notNull.data())); + if (OB_SUCC(ret)) { + valid_bytes = reinterpret_cast(dec64_batch->notNull.data()) + i; + } + } + if (OB_FAIL(ret)) { + } else if (!dec64_batch->hasNulls || *valid_bytes == 1) { + if (ObDecimalIntType == file_col_expr_->datum_meta_.type_) { + ObDecimalInt *decint = NULL; + int32_t int_bytes = 0; + if (OB_FAIL(wide::from_integer(dec64_batch->values[i], tmp_alloc_g.get_allocator(), decint, + int_bytes, file_col_expr_->datum_meta_.precision_))) { + LOG_WARN("fail to from integer", K(ret)); + } else if (OB_ISNULL(decint)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("int to dec failed", K(ret)); + } else { + vec->set_decimal_int(i, decint, int_bytes); + } + } else if (ObNumberType == file_col_expr_->datum_meta_.type_) { + ObDiscreteBase *vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + number::ObNumber res_nmb; + if (OB_FAIL(res_nmb.from(dec64_batch->values[i], tmp_alloc_g.get_allocator()))) { + LOG_WARN("fail to from number", K(ret)); + } else { + vec->set_number(i, res_nmb); + } + } + } else { + file_col_expr_->get_vector(eval_ctx_)->set_null(i); + } + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + } + return ret; +} + +int ObOrcTableRowIterator::DataLoader::load_dec128_vec() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + orc::Decimal128VectorBatch *dec128_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (OB_ISNULL(dec128_batch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic dec128 batch cast failed", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + CK (OB_NOT_NULL(dec128_batch->values.data())); + if (OB_SUCC(ret)) { + const uint8_t* valid_bytes = nullptr; + if (dec128_batch->hasNulls) { + CK (OB_NOT_NULL(dec128_batch->notNull.data())); + if (OB_SUCC(ret)) { + valid_bytes = reinterpret_cast(dec128_batch->notNull.data()) + i; + } + } + if (OB_FAIL(ret)) { + } else if (!dec128_batch->hasNulls || *valid_bytes == 1) { + ObDecimalInt *decint = NULL; + int32_t int_bytes = sizeof(int128_t); + int128_t val = dec128_batch->values[i].getHighBits(); + val = val << 64; + val = val | dec128_batch->values[i].getLowBits(); + void *data = nullptr; + + if (OB_ISNULL(data = tmp_alloc_g.get_allocator().alloc(int_bytes))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + COMMON_LOG(WARN, "allocate memory failed", K(ret), K(int_bytes)); + } else { + decint = reinterpret_cast(data); + *decint->int128_v_ = val; + } + if (OB_FAIL(ret)) { + } else if (ObDecimalIntType == file_col_expr_->datum_meta_.type_) { + ObFixedLengthBase *vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + CK (OB_NOT_NULL(vec)); + CK (VEC_FIXED == vec->get_format()); + if (OB_SUCC(ret)) { + vec->set_decimal_int(i, decint, int_bytes); + } + } else if (ObNumberType == file_col_expr_->datum_meta_.type_) { + ObDiscreteBase *vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + CK (OB_NOT_NULL(vec)); + if (OB_SUCC(ret)) { + number::ObNumber res_nmb; + if (OB_FAIL(wide::to_number(decint, int_bytes, file_col_expr_->datum_meta_.scale_, + tmp_alloc_g.get_allocator(), res_nmb))) { + LOG_WARN("fail to from", K(ret)); + } else { + vec->set_number(i, res_nmb); + } + } + } + } else { + file_col_expr_->get_vector(eval_ctx_)->set_null(i); + } + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + return ret; +} + +int ObOrcTableRowIterator::DataLoader::load_float() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + ObFixedLengthBase *float_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + CK (OB_NOT_NULL(float_vec)); + CK (VEC_FIXED == float_vec->get_format()); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + orc::DoubleVectorBatch *double_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (OB_ISNULL(double_batch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic double batch cast failed", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + CK (OB_NOT_NULL(double_batch->data.data())); + if (OB_FAIL(ret)) { + } else if (double_batch->hasNulls) { + CK (OB_NOT_NULL(double_batch->notNull.data())); + if (OB_SUCC(ret)) { + const uint8_t* valid_bytes = reinterpret_cast(double_batch->notNull.data()) + i; + if (*valid_bytes == 1) { + float_vec->set_float(i, (float)double_batch->data[i]); + } else { + float_vec->set_null(i); + } + } + } else { + float_vec->set_float(i, (float)double_batch->data[i]); + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + return ret; +} + +int ObOrcTableRowIterator::DataLoader::load_double() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + ObFixedLengthBase *double_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + CK (OB_NOT_NULL(double_vec)); + CK (VEC_FIXED == double_vec->get_format()); + if (OB_SUCC(ret)) { + if (batch_) { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + orc::DoubleVectorBatch *double_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (OB_ISNULL(double_batch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic double batch cast failed", K(ret)); + } + if (OB_FAIL(ret)) { + } else if (!double_batch->hasNulls) { + CK (OB_NOT_NULL(double_batch->data.data())); + CK (OB_NOT_NULL(double_vec->get_data())); + if (OB_SUCC(ret)) { + MEMCPY(pointer_cast(double_vec->get_data()), double_batch->data.data(), sizeof(double) * row_count_); + } + } else { + for (int64_t i = 0; i < row_count_; i++) { + CK (OB_NOT_NULL(double_batch->notNull.data())); + CK (OB_NOT_NULL(double_batch->data.data())); + if (OB_SUCC(ret)) { + const uint8_t* valid_bytes = reinterpret_cast(double_batch->notNull.data()) + i; + CK (OB_NOT_NULL(valid_bytes)); + if (OB_FAIL(ret)) { + } else if (*valid_bytes == 1) { + double_vec->set_double(i, double_batch->data[i]); + } else { + double_vec->set_null(i); + } + } + } + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } + } + return ret; +} + + +int ObOrcTableRowIterator::get_next_row() +{ + int ret = OB_NOT_SUPPORTED; + return ret; +} + +void ObOrcTableRowIterator::reset() { + // reset state_ to initial values for rescan + state_.reuse(); +} + +} +} \ No newline at end of file diff --git a/src/sql/engine/table/ob_orc_table_row_iter.h b/src/sql/engine/table/ob_orc_table_row_iter.h new file mode 100644 index 000000000..2b75531e5 --- /dev/null +++ b/src/sql/engine/table/ob_orc_table_row_iter.h @@ -0,0 +1,235 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OB_ORC_TABLE_ROW_ITER_H +#define OB_ORC_TABLE_ROW_ITER_H + +#include "share/ob_i_tablet_scan.h" +#include "lib/file/ob_file.h" +#include "common/row/ob_row_iterator.h" +#include "storage/access/ob_dml_param.h" +#include "common/storage/ob_io_device.h" +#include "share/backup/ob_backup_struct.h" +#include "sql/engine/table/ob_external_table_access_service.h" +#include +#include +#include +#include +#include +#include + +namespace oceanbase { +namespace sql { + class ObOrcMemPool : public orc::MemoryPool { + public: + void init(uint64_t tenant_id) { + mem_attr_ = ObMemAttr(tenant_id, "OrcMemPool"); + } + + virtual char* malloc(uint64_t size) override { + int ret = OB_SUCCESS; + void *buf = ob_malloc_align(64, size, mem_attr_); + if (OB_ISNULL(buf)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate memory", K(size), K(lbt())); + throw std::bad_alloc(); + } + return (char*)buf; + } + + virtual void free(char* p) override { + if (OB_ISNULL(p)) { + throw std::bad_exception(); + } + ob_free_align(p); + } + + private: + common::ObMemAttr mem_attr_; + }; + + class ObOrcFileAccess : public orc::InputStream { + public: + ObOrcFileAccess(ObExternalDataAccessDriver &file_reader, const char* file_name, int64_t len) + : file_reader_(file_reader), file_name_(file_name), total_length_(len), fileName_(file_name) { + } + + uint64_t getLength() const { + return total_length_; + } + + uint64_t getNaturalReadSize() const { + return 128 * 1024; + } + + void read(void* buf, + uint64_t length, + uint64_t offset) override { + int64_t bytesRead = 0; + int ret = file_reader_.pread(buf, length, offset, bytesRead); + if (ret != OB_SUCCESS) { + throw std::bad_exception(); + } + LOG_TRACE("read file access", K(file_name_), K(bytesRead)); + } + + const std::string& getName() const override { + return fileName_; + } + + private: + ObExternalDataAccessDriver &file_reader_; + const char* file_name_; + uint64_t total_length_; + std::string fileName_; + }; + + struct StripeInformation { + /// \brief Offset of the stripe from the start of the file, in bytes + int64_t offset; + /// \brief Length of the stripe, in bytes + int64_t length; + /// \brief Number of rows in the stripe + int64_t num_rows; + /// \brief Index of the first row of the stripe + int64_t first_row_id; + + TO_STRING_KV(K(offset), K(length), K(num_rows), K(first_row_id)); + }; + + class ObOrcTableRowIterator : public ObExternalTableRowIterator { + public: + struct StateValues { + StateValues() : + file_idx_(0), + part_id_(0), + cur_file_id_(0), + cur_file_url_(), + cur_stripe_idx_(0), + end_stripe_idx_(-1), + cur_stripe_read_row_count_(0), + cur_stripe_row_count_(0), + batch_size_(128), + part_list_val_() {} + void reuse() { + file_idx_ = 0; + part_id_ = 0; + cur_file_id_ = 0; + cur_stripe_idx_ = 0; + end_stripe_idx_ = -1; + cur_stripe_read_row_count_ = 0; + cur_stripe_row_count_ = 0; + cur_file_url_.reset(); + part_list_val_.reset(); + } + int64_t file_idx_; + int64_t part_id_; + int64_t cur_file_id_; + ObString cur_file_url_; + int64_t cur_stripe_idx_; + int64_t end_stripe_idx_; + int64_t cur_stripe_read_row_count_; + int64_t cur_stripe_row_count_; + int64_t batch_size_; + ObNewRow part_list_val_; + }; + public: + ObOrcTableRowIterator() : file_column_exprs_(allocator_), file_meta_column_exprs_(allocator_), bit_vector_cache_(NULL) {} + virtual ~ObOrcTableRowIterator() { + + } + + int init(const storage::ObTableScanParam *scan_param) override; + + virtual int get_next_row(ObNewRow *&row) override { + UNUSED(row); + return common::OB_ERR_UNEXPECTED; + } + + int get_next_row() override; + int get_next_rows(int64_t &count, int64_t capacity) override; + + virtual void reset() override; +private: + // load vec data from orc file to expr mem + struct DataLoader { + DataLoader(ObEvalCtx &eval_ctx, + ObExpr *file_col_expr, + std::unique_ptr &batch, + const int64_t batch_size, + const ObIArray &idxs, + int64_t &row_count): + eval_ctx_(eval_ctx), + file_col_expr_(file_col_expr), + batch_(batch), + batch_size_(batch_size), + idxs_(idxs), + row_count_(row_count) + {} + typedef int (DataLoader::*LOAD_FUNC)(); + static LOAD_FUNC select_load_function(const ObDatumMeta &datum_type, + const orc::Type &type); + int load_data_for_col(LOAD_FUNC &func); + int load_string_col(); + int load_int32_vec(); + int load_int64_vec(); + int load_timestamp_vec(); + int load_date_to_time_or_stamp(); + int load_float(); + int load_double(); + int load_dec128_vec(); + int load_dec64_vec(); + + bool is_orc_read_utc(); + bool is_ob_type_store_utc(const ObDatumMeta &meta); + + int64_t calc_tz_adjust_us(); + ObEvalCtx &eval_ctx_; + ObExpr *file_col_expr_; + std::unique_ptr &batch_; + const int64_t batch_size_; + const ObIArray &idxs_; + int64_t &row_count_; + }; + private: + int next_file(); + int next_stripe(); + int build_type_name_id_map(const orc::Type* type, ObIArray &col_names); + int to_dot_column_path(ObIArray &col_names, ObString &path); + int get_data_column_batch_idxs(const orc::Type *type, const int col_id, ObIArray &idxs); + private: + + StateValues state_; + lib::ObMemAttr mem_attr_; + ObArenaAllocator allocator_; + ObOrcMemPool orc_alloc_; + std::unique_ptr reader_; + std::unique_ptr row_reader_; + common::ObArrayWrap stripes_; + ObExternalDataAccessDriver data_access_driver_; + common::ObArrayWrap column_indexs_; //for getting statistics, may useless now. + ExprFixedArray file_column_exprs_; //column value from parquet file + ExprFixedArray file_meta_column_exprs_; //column value from file meta + common::ObArrayWrap load_funcs_; + ObSqlString url_; + ObBitVector *bit_vector_cache_; + common::ObArrayWrap file_url_ptrs_; //for file url expr + common::ObArrayWrap file_url_lens_; //for file url expr + hash::ObHashMap id_to_type_; + hash::ObHashMap name_to_id_; + +}; + +} +} + +#endif \ No newline at end of file diff --git a/src/sql/resolver/dml/ob_dml_resolver.cpp b/src/sql/resolver/dml/ob_dml_resolver.cpp index c7aaa4234..618b9852f 100755 --- a/src/sql/resolver/dml/ob_dml_resolver.cpp +++ b/src/sql/resolver/dml/ob_dml_resolver.cpp @@ -8247,9 +8247,20 @@ int ObDMLResolver::resolve_external_table_generated_column( ObExternalFileFormat format; if (OB_FAIL(format.load_from_string(table_schema->get_external_file_format(), *params_.allocator_))) { LOG_WARN("load from string failed", K(ret)); + } else if (format.format_type_ == ObExternalFileFormat::ORC_FORMAT && lib::is_oracle_mode()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("not support orc in oracle mode", K(ret)); + LOG_USER_WARN(OB_NOT_SUPPORTED, "orc in oracle mode"); } else if (format.format_type_ != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) { - ret = OB_WRONG_COLUMN_NAME; - LOG_USER_ERROR(OB_WRONG_COLUMN_NAME, col.col_name_.length(), col.col_name_.ptr()); + if (format.format_type_ == ObExternalFileFormat::ORC_FORMAT && + ObExternalFileFormat::PARQUET_FORMAT != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) { + ret = OB_WRONG_COLUMN_NAME; + LOG_WARN("wrong column name", K(format.format_type_)); + LOG_USER_ERROR(OB_WRONG_COLUMN_NAME, col.col_name_.length(), col.col_name_.ptr()); + } + } + + if (OB_FAIL(ret)) { } else if (ObExternalFileFormat::CSV_FORMAT == format.format_type_) { if (OB_FAIL(ObResolverUtils::calc_file_column_idx(col.col_name_, file_column_idx))) { LOG_WARN("fail to calc file column idx", K(ret)); @@ -8263,7 +8274,8 @@ int ObDMLResolver::resolve_external_table_generated_column( LOG_WARN("fail to build external table file column expr", K(ret)); } } - } else if (ObExternalFileFormat::PARQUET_FORMAT == format.format_type_) { + } else if (ObExternalFileFormat::PARQUET_FORMAT == format.format_type_ || + ObExternalFileFormat::ORC_FORMAT == format.format_type_ ) { ObRawExpr *cast_expr = NULL; ObRawExpr *get_path_expr = NULL; ObRawExpr *cast_type_expr = NULL;