[FEAT MERGE] merge external table orc read feature

This commit is contained in:
SevenJ-swj 2024-08-26 06:32:54 +00:00 committed by ob-robot
parent 695a8e0b54
commit ae6a932613
16 changed files with 2271 additions and 11 deletions

View File

@ -33,6 +33,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el7.aarch64.rpm
devdeps-protobuf-c-1.4.1-100000072023102410.el7.aarch64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el7.aarch64.rpm
devdeps-apache-arrow-9.0.0-302024052920.el7.aarch64.rpm
devdeps-apache-orc-1.8.3-202024072510.el7.aarch64.rpm
[tools]
obdevtools-binutils-2.30-12022100413.el7.aarch64.rpm

View File

@ -36,6 +36,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el7.x86_64.rpm
devdeps-protobuf-c-1.4.1-100000062023102016.el7.x86_64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el7.x86_64.rpm
devdeps-apache-arrow-9.0.0-222024052223.el7.x86_64.rpm
devdeps-apache-orc-1.8.3-202024072510.el7.x86_64.rpm
[tools]
obdevtools-binutils-2.30-12022100413.el7.x86_64.rpm

View File

@ -33,6 +33,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.aarch64.rpm
devdeps-protobuf-c-1.4.1-100000072023102410.el8.aarch64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.aarch64.rpm
devdeps-apache-arrow-9.0.0-322024052923.el8.aarch64.rpm
devdeps-apache-orc-1.8.3-202024072510.el8.aarch64.rpm
[tools]
obdevtools-binutils-2.30-12022100413.el8.aarch64.rpm

View File

@ -35,6 +35,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.x86_64.rpm
devdeps-protobuf-c-1.4.1-100000062023102016.el8.x86_64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.x86_64.rpm
devdeps-apache-arrow-9.0.0-172024052218.el8.x86_64.rpm
devdeps-apache-orc-1.8.3-202024072510.el8.x86_64.rpm
[tools]
obdevtools-binutils-2.30-12022100413.el8.x86_64.rpm

View File

@ -37,6 +37,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.aarch64.rpm
devdeps-protobuf-c-1.4.1-100000072023102410.el8.aarch64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.aarch64.rpm
devdeps-apache-arrow-9.0.0-322024052923.el8.aarch64.rpm
devdeps-apache-orc-1.8.3-202024072510.el8.aarch64.rpm
[deps-el9]
devdeps-apr-1.6.5-232023090616.el9.aarch64.rpm target=el9

View File

@ -39,6 +39,7 @@ devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.x86_64.rpm
devdeps-protobuf-c-1.4.1-100000062023102016.el8.x86_64.rpm
devdeps-apache-arrow-9.0.0-172024052218.el8.x86_64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.x86_64.rpm
devdeps-apache-orc-1.8.3-202024072510.el8.x86_64.rpm
[deps-el9]
devdeps-apr-1.6.5-232023090616.el9.x86_64.rpm target=el9

View File

@ -21,6 +21,7 @@ target_include_directories(
${DEP_DIR}/include/apr-1/
${DEP_DIR}/include/icu/common
${DEP_DIR}/include/apache-arrow
${DEP_DIR}/include/apache-orc
${USSL_INCLUDE_DIRS}
)
@ -208,6 +209,12 @@ target_link_libraries(oblib_base_base_base
${DEP_DIR}/lib64/libarrow.a
${DEP_DIR}/lib64/libparquet.a
${DEP_DIR}/lib64/libarrow_bundled_dependencies.a
${DEP_DIR}/lib64/liborc.a
${DEP_DIR}/lib64/libsnappy.a
${DEP_DIR}/lib64/libprotoc.a
${DEP_DIR}/lib64/libprotobuf.a
${DEP_DIR}/lib64/liblz4.a
${DEP_DIR}/lib64/libzstd.a
-L${DEP_DIR}/var/usr/lib64
-L${DEP_DIR}/var/usr/lib
-L${DEP_3RD_DIR}/usr/lib
@ -237,6 +244,12 @@ target_link_libraries(oblib_base_base_base
${DEP_DIR}/lib64/libarrow.a
${DEP_DIR}/lib64/libparquet.a
${DEP_DIR}/lib64/libarrow_bundled_dependencies.a
${DEP_DIR}/lib64/liborc.a
${DEP_DIR}/lib64/libsnappy.a
${DEP_DIR}/lib64/libprotoc.a
${DEP_DIR}/lib64/libprotobuf.a
${DEP_DIR}/lib64/liblz4.a
${DEP_DIR}/lib64/libzstd.a
-L${DEP_DIR}/var/usr/lib64
-L${DEP_DIR}/var/usr/lib
-L${DEP_3RD_DIR}/usr/lib

View File

@ -6,6 +6,7 @@
# oblib_addtest(time/test_ob_time_utility.cpp)
# oblib_addtest(timezone/test_ob_timezone_utils.cpp)
oblib_addtest(parquet/test_parquet.cpp)
oblib_addtest(orc/test_orc.cpp)
oblib_addtest(alloc/test_alloc_struct.cpp)
oblib_addtest(alloc/test_block_set.cpp)
oblib_addtest(alloc/test_chunk_mgr.cpp)

603
deps/oblib/unittest/lib/orc/test_orc.cpp vendored Normal file
View File

@ -0,0 +1,603 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL
#include "gtest/gtest.h"
#include "lib/oblog/ob_log.h"
#include "lib/oblog/ob_log_module.h"
#include <orc/OrcFile.hh>
#include <orc/MemoryPool.hh>
#include <orc/Writer.hh>
#include <orc/Reader.hh>
#include <orc/Vector.hh>
#include <iostream>
#include <cstdlib>
#include "lib/allocator/page_arena.h"
#include "lib/file/ob_file.h"
#include "lib/file/file_directory_utils.h"
#include "lib/charset/ob_template_helper.h"
#include "lib/net/ob_net_util.h"
#define USING_LOG_PREFIX SQL
using namespace oceanbase::common;
class TestOrc: public ::testing::Test
{
public:
TestOrc() {};
virtual ~TestOrc() {};
virtual void SetUp();
virtual void TearDown();
};
void TestOrc::SetUp()
{
}
void TestOrc::TearDown()
{
}
class OrcMemoryPool : public ::orc::MemoryPool
{
public:
virtual char* malloc(uint64_t size) override {
return (char* )alloc_.alloc(size);
}
virtual void free(char* p) override {
//do nothing
}
private:
oceanbase::common::ObArenaAllocator alloc_;
};
// Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
// Type::type kind = type.id();
// switch (kind) {
// case Type::type::BOOL:
// return liborc::createPrimitiveType(liborc::TypeKind::BOOLEAN);
// case Type::type::INT8:
// return liborc::createPrimitiveType(liborc::TypeKind::BYTE);
// case Type::type::INT16:
// return liborc::createPrimitiveType(liborc::TypeKind::SHORT);
// case Type::type::INT32:
// return liborc::createPrimitiveType(liborc::TypeKind::INT);
// case Type::type::INT64:
// return liborc::createPrimitiveType(liborc::TypeKind::LONG);
// case Type::type::FLOAT:
// return liborc::createPrimitiveType(liborc::TypeKind::FLOAT);
// case Type::type::DOUBLE:
// return liborc::createPrimitiveType(liborc::TypeKind::DOUBLE);
// // Use STRING instead of VARCHAR for now, both use UTF-8
// case Type::type::STRING:
// case Type::type::LARGE_STRING:
// return liborc::createPrimitiveType(liborc::TypeKind::STRING);
// case Type::type::BINARY:
// case Type::type::LARGE_BINARY:
// case Type::type::FIXED_SIZE_BINARY:
// return liborc::createPrimitiveType(liborc::TypeKind::BINARY);
// case Type::type::DATE32:
// return liborc::createPrimitiveType(liborc::TypeKind::DATE);
// case Type::type::DATE64:
// return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP);
// case Type::type::TIMESTAMP: {
// const auto& timestamp_type = checked_cast<const TimestampType&>(type);
// if (!timestamp_type.timezone().empty()) {
// // The timestamp values stored in the arrow array are normalized to UTC.
// // TIMESTAMP_INSTANT type is always preferred over TIMESTAMP type.
// return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP_INSTANT);
// }
// // The timestamp values stored in the arrow array can be in any timezone.
// return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP);
// }
// case Type::type::DECIMAL128: {
// const uint64_t precision =
// static_cast<uint64_t>(checked_cast<const Decimal128Type&>(type).precision());
// const uint64_t scale =
// static_cast<uint64_t>(checked_cast<const Decimal128Type&>(type).scale());
// return liborc::createDecimalType(precision, scale);
// }
// case Type::type::LIST:
// case Type::type::FIXED_SIZE_LIST:
// case Type::type::LARGE_LIST: {
// const auto& value_field = checked_cast<const BaseListType&>(type).value_field();
// ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*value_field->type()));
// SetAttributes(value_field, orc_subtype.get());
// return liborc::createListType(std::move(orc_subtype));
// }
// case Type::type::STRUCT: {
// std::unique_ptr<liborc::Type> out_type = liborc::createStructType();
// std::vector<std::shared_ptr<Field>> arrow_fields =
// checked_cast<const StructType&>(type).fields();
// for (auto it = arrow_fields.begin(); it != arrow_fields.end(); ++it) {
// std::string field_name = (*it)->name();
// ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*(*it)->type()));
// SetAttributes(*it, orc_subtype.get());
// out_type->addStructField(field_name, std::move(orc_subtype));
// }
// return out_type;
// }
// case Type::type::MAP: {
// const auto& key_field = checked_cast<const MapType&>(type).key_field();
// const auto& item_field = checked_cast<const MapType&>(type).item_field();
// ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_field->type()));
// ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_field->type()));
// SetAttributes(key_field, key_orc_type.get());
// SetAttributes(item_field, item_orc_type.get());
// return liborc::createMapType(std::move(key_orc_type), std::move(item_orc_type));
// }
// case Type::type::DENSE_UNION:
// case Type::type::SPARSE_UNION: {
// std::unique_ptr<liborc::Type> out_type = liborc::createUnionType();
// std::vector<std::shared_ptr<Field>> arrow_fields =
// checked_cast<const UnionType&>(type).fields();
// for (const auto& arrow_field : arrow_fields) {
// std::shared_ptr<DataType> arrow_child_type = arrow_field->type();
// ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type));
// SetAttributes(arrow_field, orc_subtype.get());
// out_type->addUnionChild(std::move(orc_subtype));
// }
// return out_type;
// }
// default: {
// return Status::NotImplemented("Unknown or unsupported Arrow type: ",
// type.ToString());
// }
// }
// }
// class ObOrcRandomAccess : public :orc::InputStream {
// public:
// ObOrcRandomAccess(oceanbase::sql::ObExternalDataAccessDriver &file_reader, const char* file_name, orc::MemoryPool *pool)
// : file_reader_(file_reader), file_name_(file_name), pool_(pool) {
// }
// uint64_t getLength() const override {
// return totalLength;
// }
// uint64_t getNaturalReadSize() const override {
// return 128 * 1024;
// }
// void read(void* buf,
// uint64_t length,
// uint64_t offset) override {
// int64_t bytesRead = 0;
// int ret = file_reader_.pread(buf, length, offset, bytesRead);
// totalLength += bytesRead;
// if (ret != OB_SUCCESS) {
// throw orc::ParseError("Bad read of " + std::string(file_name_));
// }
// }
// const std::string& getName() const override {
// return file_name_;
// }
// private:
// oceanbase::sql::ObExternalDataAccessDriver &file_reader_;
// const std::string file_name_;
// orc::MemoryPool *pool_;
// uint64_t totalLength;
// };
void wirte_orc_file() {
std::unique_ptr<orc::OutputStream> outStream = orc::writeLocalFile("my-file.orc");
//std::unique_ptr<orc::Type> schema(orc::Type::buildTypeFromString("struct<x:int,y:int>"));
std::unique_ptr<orc::Type> schema = orc::createStructType();
schema->addStructField("x", orc::createPrimitiveType(orc::TypeKind::INT));
schema->addStructField("y", orc::createPrimitiveType(orc::TypeKind::BOOLEAN));
std::unique_ptr<orc::Type> sub_schema = orc::createStructType();
sub_schema->addStructField("z", orc::createPrimitiveType(orc::TypeKind::FLOAT));
sub_schema->addStructField("d", orc::createPrimitiveType(orc::TypeKind::DATE));
schema->addStructField("S2", std::move(sub_schema));
orc::WriterOptions options;
OrcMemoryPool pool;
options.setMemoryPool(&pool);
options.setCompression(orc::CompressionKind::CompressionKind_ZLIB);
std::unique_ptr<orc::Writer> writer = orc::createWriter(*schema, outStream.get(), options);
uint64_t batchSize = 8, rowCount = 100;
std::unique_ptr<orc::ColumnVectorBatch> batch =
writer->createRowBatch(batchSize);
orc::StructVectorBatch *root =
dynamic_cast<orc::StructVectorBatch *>(batch.get());
orc::LongVectorBatch *x =
dynamic_cast<orc::LongVectorBatch *>(root->fields[0]);
orc::LongVectorBatch *y =
dynamic_cast<orc::LongVectorBatch *>(root->fields[1]);
orc::DoubleVectorBatch *z =
dynamic_cast<orc::DoubleVectorBatch *>(dynamic_cast<orc::StructVectorBatch *>(root->fields[2])->fields[0]);
uint64_t rows = 0;
for (uint64_t i = 0; i < rowCount; ++i) {
if (i % 5 == 0) {
x->notNull[rows] = 0;
y->notNull[rows] = 0;
z->notNull[rows] = 0;
x->hasNulls = true;
y->hasNulls = true;
z->hasNulls = true;
x->data[rows] = 0;
y->data[rows] = 0;
z->data[rows] = i * 1.1 + 0.01;
rows++;
} else {
x->notNull[rows] = true;
y->notNull[rows] = true;
z->notNull[rows] = true;
x->data[rows] = i + 1;
y->data[rows] = i * 3 + 1;
z->data[rows] = i * 1.1 + 0.01;
rows++;
}
if (rows == batchSize) {
root->numElements = rows;
x->numElements = rows;
y->numElements = rows;
z->numElements = rows;
writer->add(*batch);
rows = 0;
}
}
if (rows != 0) {
root->numElements = rows;
x->numElements = rows;
y->numElements = rows;
//z->numElements = rows;
writer->add(*batch);
rows = 0;
}
writer->close();
}
void read_orc_file() {
std::unique_ptr<orc::InputStream> inStream = orc::readLocalFile("my-file.orc");
orc::ReaderOptions options;
OrcMemoryPool pool;
options.setMemoryPool(pool);
std::unique_ptr<orc::Reader> reader = orc::createReader(std::move(inStream), options);
orc::RowReaderOptions rowReaderOptions;
std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader(rowReaderOptions);
std::unique_ptr<orc::ColumnVectorBatch> batch = rowReader->createRowBatch(2);
//std::cout <<"root field size: " << root->fields.size() << std::endl;
while (rowReader->next(*batch)) {
std::cout<<"column batch:" << batch->toString() <<"\n";
for (uint64_t r = 0; r < batch->numElements; ++r) {
orc::StructVectorBatch *root =
dynamic_cast<orc::StructVectorBatch *>(batch.get());
orc::ColumnVectorBatch *col[10] = {NULL};
int k = 0;
std::cout<< "row:" << r ;
for (int i = 0; i < reader->getType().getSubtypeCount(); i++) {
const uint8_t* valid_bytes = NULL;
switch (reader->getType().getSubtype(i)->getKind()) {
case orc::TypeKind::BOOLEAN:
case orc::TypeKind::BYTE:
case orc::TypeKind::SHORT:
case orc::TypeKind::INT:
case orc::TypeKind::LONG:
case orc::TypeKind::DATE:
//valid_bytes = reinterpret_cast<const uint8_t*>( dynamic_cast<orc::LongVectorBatch *>(root->fields[i])->notNull.data()) + r;
std::cout<<" col" << i <<":" << dynamic_cast<orc::LongVectorBatch *>(root->fields[i])->data[r] << " is not null:"<< bool(root->fields[i]->notNull[r]);
break;
case orc::TypeKind::FLOAT:
case orc::TypeKind::DOUBLE:
std::cout<< " col" << i <<":" << dynamic_cast<orc::DoubleVectorBatch *>(root->fields[i])->data[r] ;
std::cout<<" has NULL:"<<(root->fields[i])->hasNulls<<" is not null:"<< bool(root->fields[i]->notNull[r]);
break;
case orc::TypeKind::STRING:
case orc::TypeKind::VARCHAR:
case orc::TypeKind::CHAR:
case orc::TypeKind::BINARY:
std::cout<< " col" << i <<":" << dynamic_cast<orc::StringVectorBatch *>(root->fields[i])->data[r] <<" is not null:"<< bool(root->fields[i]->notNull[r]);
break;
case orc::TypeKind::LIST:
case orc::TypeKind::MAP:
case orc::TypeKind::UNION:
//not supported
break;
case orc::TypeKind::DECIMAL:
//std::cout<< "row:" << r << " col" << i <<":" << dynamic_cast<orc::Decimal128VectorBatch *>(root->fields[i])->data[r] <<" is not null:"<< (bool)dynamic_cast<orc::LongVectorBatch *>(root->fields[i])->notNull[r];
break;
case orc::TypeKind::TIMESTAMP:
case orc::TypeKind::TIMESTAMP_INSTANT:
//std::cout<< "row:" << r << " col" << i <<":" << dynamic_cast<orc::TimestampVectorBatch *>(root->fields[i])->data[r] <<" is not null:"<< (bool)dynamic_cast<orc::LongVectorBatch *>(root->fields[i])->notNull[r];
break;
case orc::TypeKind::STRUCT:
std::cout<< " col" << i <<":" << dynamic_cast<orc::DoubleVectorBatch *>(dynamic_cast<orc::StructVectorBatch *>(root->fields[i])->fields[0])->data[r];
break;
default:
//error
break;
}
// if (reader->getType().getColumnId() == reader->getType().getMaximumColumnId()) {//is primitive
// col[k++] = x;
// }
}
std::cout<<"\n";
}
}
}
void printType(const orc::Type &type) {
std::cout << " type:" << type.toString() <<" type subTypeCount:" << type.getSubtypeCount()<< " typeKind: " << type.getKind() << " ColumnId:" << type.getColumnId() << " maxColumnId:" << type.getMaximumColumnId() << "\n";
for (int i = 0; i < type.getSubtypeCount(); i++) {
printType(*type.getSubtype(i));
}
}
bool is_primitive_Type(const orc::Type &type) {
if (type.getColumnId() == type.getMaximumColumnId()) {
return true;
}
return false;
}
void read_file_footer() {
std::unique_ptr<orc::InputStream> inStream = orc::readLocalFile("my-file.orc");
orc::ReaderOptions options;
OrcMemoryPool pool;
//options.setMemoryPool(pool);
std::unique_ptr<orc::Reader> reader = orc::createReader(std::move(inStream), options);
const orc::Type& type = reader->getType();
printType(type);
for (int i = 0; i < reader->getType().getSubtypeCount(); i++) {
std::cout << "Subfield" << i << ": " << type.getFieldName(i);
printType(*type.getSubtype(i));
}
for (int i = 0; i <= reader->getType().getMaximumColumnId(); i++) {
std::cout<<"column" <<i<<": " << reader->getColumnStatistics(i)->toString() <<"\n";
}
}
// Result<std::shared_ptr<Schema>> GetArrowSchema(const liborc::Type& type) {
// if (type.getKind() != liborc::STRUCT) {
// return Status::NotImplemented(
// "Only ORC files with a top-level struct "
// "can be handled");
// }
// int size = static_cast<int>(type.getSubtypeCount());
// std::vector<std::shared_ptr<Field>> fields;
// fields.reserve(size);
// for (int child = 0; child < size; ++child) {
// const std::string& name = type.getFieldName(child);
// ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField(name, type.getSubtype(child)));
// fields.push_back(std::move(elem_field));
// }
// ARROW_ASSIGN_OR_RAISE(auto metadata, ReadMetadata());
// return std::make_shared<Schema>(std::move(fields), std::move(metadata));
// }
// Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() {
// const std::list<std::string> keys = reader_->getMetadataKeys();
// auto metadata = std::make_shared<KeyValueMetadata>();
// for (const auto& key : keys) {
// metadata->Append(key, reader_->getMetadataValue(key));
// }
// return std::const_pointer_cast<const KeyValueMetadata>(metadata);
// }
void read_column() {
std::cout<<"=================== test read column ===================\n";
std::unique_ptr<orc::InputStream> inStream = orc::readLocalFile("my-file.orc");
orc::ReaderOptions options;
// OrcMemoryPool pool;
// options.setMemoryPool(pool);
std::list<std::string> include_names_list;
include_names_list.push_front(std::string("y"));
include_names_list.push_front(std::string("S2.z"));
include_names_list.push_front(std::string("x"));
std::unique_ptr<orc::Reader> reader = orc::createReader(std::move(inStream), options);
orc::RowReaderOptions rowReaderOptions;
rowReaderOptions.include(include_names_list);
std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader(rowReaderOptions);
std::unique_ptr<orc::ColumnVectorBatch> batch = rowReader->createRowBatch(1024);
std::cout<<"column batch:" << batch->toString() <<"\n";
printType(rowReader->getSelectedType());
const orc::Type &type = rowReader->getSelectedType();
int size = static_cast<int>(type.getSubtypeCount());
for (int child = 0; child < size; ++child) {
const std::string& name = type.getFieldName(child);
std::cout<<"field "<< child << " name:" << name <<std::endl;
printType(*type.getSubtype(child));
}
// const std::list<std::string> keys = reader->getMetadataKeys();
// auto metadata = std::make_shared<KeyValueMetadata>();
// for (const auto& key : keys) {
// metadata->Append(key, reader_->getMetadataValue(key));
// }
// const orc::Type& type = reader->getType();
// printType(type);
// for (int i = 0; i < reader->getType().getSubtypeCount(); i++) {
// std::cout << "Subfield" << i << ": " << type.getFieldName(i);
// printType(*type.getSubtype(i));
// }
// for (int i = 0; i <= reader->getType().getMaximumColumnId(); i++) {
// std::cout<<"column" <<i<<": " << reader->getColumnStatistics(i)->toString() <<"\n";
// }
}
void read_schema(std::unique_ptr<orc::RowReader> &rowReader) {
const orc::Type &type = rowReader->getSelectedType();
if (type.getKind() != orc::STRUCT) {
throw std::runtime_error("Only ORC files with a top-level struct can be handled");
} else {
}
}
void read_stripe() {
// int64_t nstripes = reader_->getNumberOfStripes();
// stripes_.resize(static_cast<size_t>(nstripes));
// std::unique_ptr<liborc::StripeInformation> stripe;
// uint64_t first_row_of_stripe = 0;
// for (int i = 0; i < nstripes; ++i) {
// stripe = reader_->getStripe(i);
// stripes_[i] = StripeInformation({static_cast<int64_t>(stripe->getOffset()),
// static_cast<int64_t>(stripe->getLength()),
// static_cast<int64_t>(stripe->getNumberOfRows()),
// static_cast<int64_t>(first_row_of_stripe)});
// first_row_of_stripe += stripe->getNumberOfRows();
// }
}
void generate_orc_file(const bool output_null) {
std::unique_ptr<orc::OutputStream> outStream = orc::writeLocalFile("test1.orc");
//std::unique_ptr<orc::Type> schema(orc::Type::buildTypeFromString("struct<x:int,y:int>"));
//std::unique_ptr<orc::Type> schema = orc::createStructType();
// schema->addStructField("age", orc::createPrimitiveType(orc::TypeKind::INT))
// ->addStructField("is_male", orc::createPrimitiveType(orc::TypeKind::BOOLEAN))
// ->addStructField("name", orc::createPrimitiveType(orc::TypeKind::Varchar))
// ->addStructField("comment", orc::createPrimitiveType(orc::TypeKind::String));
// std::unique_ptr<Type> schema(Type::buildTypeFromString("
// struct<name:varchar(100), age:int,
// sex:char(1), comment:string, in_date:date, job_time:timestamp,
// language:binary, has_house:boolean, money:bigint,
// height:decimal(5, 2), weight:double, width:float>"));
// std::unique_ptr<orc::Type> sub_schema = orc::createStructType();
// sub_schema->addStructField("z", orc::createPrimitiveType(orc::TypeKind::FLOAT));
// schema->addStructField("S2", std::move(sub_schema));
// orc::WriterOptions options;
// OrcMemoryPool pool;
// options.setCompression(orc::CompressionKind::CompressionKind_SNAPPY);
// //options.setStripeSize(1024);
// options.setCompressionBlockSize(1024);
// options.setMemoryPool(&pool);
// std::unique_ptr<orc::Writer> writer = orc::createWriter(*schema, outStream.get(), options);
// uint64_t batchSize = 128, rowCount = 12800;
// std::unique_ptr<orc::ColumnVectorBatch> batch = writer->createRowBatch(batchSize);
// orc::StructVectorBatch *root = dynamic_cast<orc::StructVectorBatch *>(batch.get());
// uint64_t rows = 0;
// for (uint64_t i = 0; i < rowCount; ++i) {
// if (output_null) {
// int put_null = std::rand() % 10;
// if (put_null == 1) {
// }
// }
// if (i % 5 == 0) {
// x->notNull[rows] = 0;
// y->notNull[rows] = 0;
// z->notNull[rows] = 0;
// x->hasNulls = true;
// y->hasNulls = true;
// z->hasNulls = true;
// x->data[rows] = 0;
// y->data[rows] = 0;
// z->data[rows] = i * 1.1 + 0.01;
// rows++;
// } else {
// x->notNull[rows] = true;
// y->notNull[rows] = true;
// z->notNull[rows] = true;
// x->data[rows] = i + 1;
// y->data[rows] = i * 3 + 1;
// z->data[rows] = i * 1.1 + 0.01;
// rows++;
// }
// if (rows == batchSize) {
// root->numElements = rows;
// x->numElements = rows;
// y->numElements = rows;
// z->numElements = rows;
// writer->add(*batch);
// rows = 0;
// }
// }
// if (rows != 0) {
// root->numElements = rows;
// x->numElements = rows;
// y->numElements = rows;
// //z->numElements = rows;
// writer->add(*batch);
// rows = 0;
// }
// writer->close();
}
void read_test_orc_file()
{
std::cout<<"=================== test read orc file column ===================\n";
std::unique_ptr<orc::InputStream> inStream = orc::readLocalFile("/data/1/mingye.swj/work/support_master_orc/data/t.orc");
orc::ReaderOptions options;
// OrcMemoryPool pool;
// options.setMemoryPool(pool);
// std::list<std::string> include_names_list;
// include_names_list.push_front(std::string("S2.z"));
// include_names_list.push_front(std::string("x"));
std::unique_ptr<orc::Reader> reader = orc::createReader(std::move(inStream), options);
orc::RowReaderOptions rowReaderOptions;
//rowReaderOptions.include(include_names_list);
std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader(rowReaderOptions);
std::unique_ptr<orc::ColumnVectorBatch> batch = rowReader->createRowBatch(256);
while (rowReader->next(*batch)) {
std::cout<<"column batch:" << batch->toString() <<"\n";
for (uint64_t r = 0; r < batch->numElements; ++r) {
orc::StructVectorBatch *root =
dynamic_cast<orc::StructVectorBatch *>(batch.get());
std::cout<<" row" << r <<":" << dynamic_cast<orc::LongVectorBatch *>(root->fields[0])->data[r] << " is not null:"<< bool(root->fields[0]->notNull[r]);
}
}
printType(rowReader->getSelectedType());
const orc::Type &type = rowReader->getSelectedType();
int size = static_cast<int>(type.getSubtypeCount());
for (int child = 0; child < size; ++child) {
const std::string& name = type.getFieldName(child);
printType(*type.getSubtype(child));
}
}
TEST_F(TestOrc, read_write_orc_file_test)
{
wirte_orc_file();
read_orc_file();
read_file_footer();
read_column();
//read_test_orc_file();
}
int main(int argc, char **argv)
{
OB_LOGGER.set_log_level("INFO");
testing::InitGoogleTest(&argc,argv);
return RUN_ALL_TESTS();
}

View File

@ -889,6 +889,7 @@ ob_set_subtarget(ob_sql engine_table
engine/table/ob_index_lookup_op_impl.cpp
engine/table/ob_table_scan_with_index_back_op.cpp
engine/table/ob_external_table_access_service.cpp
engine/table/ob_orc_table_row_iter.cpp
engine/table/ob_parquet_table_row_iter.cpp
)

View File

@ -32,10 +32,9 @@ const char INVALID_TERM_CHAR = '\xff';
const char * ObExternalFileFormat::FORMAT_TYPE_STR[] = {
"CSV",
"PARQUET",
"ORC",
};
static_assert(array_elements(ObExternalFileFormat::FORMAT_TYPE_STR) == ObExternalFileFormat::MAX_FORMAT,
"Not enough initializer for ObExternalFileFormat");
static_assert(array_elements(ObExternalFileFormat::FORMAT_TYPE_STR) == ObExternalFileFormat::MAX_FORMAT, "Not enough initializer for ObExternalFileFormat");
int ObCSVGeneralFormat::init_format(const ObDataInFileStruct &format,
int64_t file_column_nums,
@ -391,7 +390,7 @@ int64_t ObExternalFileFormat::to_string(char *buf, const int64_t buf_len) const
J_OBJ_START();
databuff_print_kv(buf, buf_len, pos, "\"TYPE\"", is_valid_format ? FORMAT_TYPE_STR[format_type_] : "INVALID");
databuff_print_kv(buf, buf_len, pos, "\"TYPE\"", is_valid_format ? ObExternalFileFormat::FORMAT_TYPE_STR[format_type_] : "INVALID");
switch (format_type_) {
case CSV_FORMAT:
@ -434,8 +433,8 @@ int ObExternalFileFormat::load_from_string(const ObString &str, ObIAllocator &al
LOG_WARN("unexpected json format", K(ret), K(str));
} else {
ObString format_type_str = format_type_node->value_->get_string();
for (int i = 0; i < array_elements(FORMAT_TYPE_STR); ++i) {
if (format_type_str.case_compare(FORMAT_TYPE_STR[i]) == 0) {
for (int i = 0; i < array_elements(ObExternalFileFormat::FORMAT_TYPE_STR); ++i) {
if (format_type_str.case_compare(ObExternalFileFormat::FORMAT_TYPE_STR[i]) == 0) {
format_type_ = static_cast<FormatType>(i);
break;
}
@ -447,6 +446,7 @@ int ObExternalFileFormat::load_from_string(const ObString &str, ObIAllocator &al
OZ (origin_file_format_str_.load_from_json_data(format_type_node, allocator));
break;
case PARQUET_FORMAT:
case ORC_FORMAT:
break;
default:
ret = OB_ERR_UNEXPECTED;

View File

@ -524,6 +524,7 @@ struct ObExternalFileFormat
INVALID_FORMAT = -1,
CSV_FORMAT,
PARQUET_FORMAT,
ORC_FORMAT,
MAX_FORMAT
};
@ -543,7 +544,6 @@ struct ObExternalFileFormat
sql::ObCSVGeneralFormat csv_format_;
ObLoadCompressionFormat compression_format_;
uint64_t options_;
static const char *FORMAT_TYPE_STR[];
};

View File

@ -23,6 +23,7 @@
#include "lib/utility/ob_macro_utils.h"
#include "sql/engine/table/ob_parquet_table_row_iter.h"
#include "sql/engine/cmd/ob_load_data_file_reader.h"
#include "sql/engine/table/ob_orc_table_row_iter.h"
namespace oceanbase
{
@ -558,13 +559,18 @@ int ObExternalTableAccessService::table_scan(
LOG_WARN("alloc memory failed", K(ret));
}
break;
case ObExternalFileFormat::PARQUET_FORMAT:
if (OB_ISNULL(row_iter = OB_NEWx(ObParquetTableRowIterator, (scan_param.allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
}
break;
case ObExternalFileFormat::ORC_FORMAT:
if (OB_ISNULL(row_iter = OB_NEWx(ObOrcTableRowIterator, (scan_param.allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
}
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected format", K(ret), "format", param.external_file_format_.format_type_);
@ -594,6 +600,7 @@ int ObExternalTableAccessService::table_rescan(ObVTableScanParam &param, ObNewRo
switch (param.external_file_format_.format_type_) {
case ObExternalFileFormat::CSV_FORMAT:
case ObExternalFileFormat::PARQUET_FORMAT:
case ObExternalFileFormat::ORC_FORMAT:
result->reset();
break;
default:

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,235 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_ORC_TABLE_ROW_ITER_H
#define OB_ORC_TABLE_ROW_ITER_H
#include "share/ob_i_tablet_scan.h"
#include "lib/file/ob_file.h"
#include "common/row/ob_row_iterator.h"
#include "storage/access/ob_dml_param.h"
#include "common/storage/ob_io_device.h"
#include "share/backup/ob_backup_struct.h"
#include "sql/engine/table/ob_external_table_access_service.h"
#include <orc/OrcFile.hh>
#include <orc/MemoryPool.hh>
#include <orc/Writer.hh>
#include <orc/Reader.hh>
#include <orc/Int128.hh>
#include <orc/Common.hh>
namespace oceanbase {
namespace sql {
class ObOrcMemPool : public orc::MemoryPool {
public:
void init(uint64_t tenant_id) {
mem_attr_ = ObMemAttr(tenant_id, "OrcMemPool");
}
virtual char* malloc(uint64_t size) override {
int ret = OB_SUCCESS;
void *buf = ob_malloc_align(64, size, mem_attr_);
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", K(size), K(lbt()));
throw std::bad_alloc();
}
return (char*)buf;
}
virtual void free(char* p) override {
if (OB_ISNULL(p)) {
throw std::bad_exception();
}
ob_free_align(p);
}
private:
common::ObMemAttr mem_attr_;
};
class ObOrcFileAccess : public orc::InputStream {
public:
ObOrcFileAccess(ObExternalDataAccessDriver &file_reader, const char* file_name, int64_t len)
: file_reader_(file_reader), file_name_(file_name), total_length_(len), fileName_(file_name) {
}
uint64_t getLength() const {
return total_length_;
}
uint64_t getNaturalReadSize() const {
return 128 * 1024;
}
void read(void* buf,
uint64_t length,
uint64_t offset) override {
int64_t bytesRead = 0;
int ret = file_reader_.pread(buf, length, offset, bytesRead);
if (ret != OB_SUCCESS) {
throw std::bad_exception();
}
LOG_TRACE("read file access", K(file_name_), K(bytesRead));
}
const std::string& getName() const override {
return fileName_;
}
private:
ObExternalDataAccessDriver &file_reader_;
const char* file_name_;
uint64_t total_length_;
std::string fileName_;
};
struct StripeInformation {
/// \brief Offset of the stripe from the start of the file, in bytes
int64_t offset;
/// \brief Length of the stripe, in bytes
int64_t length;
/// \brief Number of rows in the stripe
int64_t num_rows;
/// \brief Index of the first row of the stripe
int64_t first_row_id;
TO_STRING_KV(K(offset), K(length), K(num_rows), K(first_row_id));
};
class ObOrcTableRowIterator : public ObExternalTableRowIterator {
public:
struct StateValues {
StateValues() :
file_idx_(0),
part_id_(0),
cur_file_id_(0),
cur_file_url_(),
cur_stripe_idx_(0),
end_stripe_idx_(-1),
cur_stripe_read_row_count_(0),
cur_stripe_row_count_(0),
batch_size_(128),
part_list_val_() {}
void reuse() {
file_idx_ = 0;
part_id_ = 0;
cur_file_id_ = 0;
cur_stripe_idx_ = 0;
end_stripe_idx_ = -1;
cur_stripe_read_row_count_ = 0;
cur_stripe_row_count_ = 0;
cur_file_url_.reset();
part_list_val_.reset();
}
int64_t file_idx_;
int64_t part_id_;
int64_t cur_file_id_;
ObString cur_file_url_;
int64_t cur_stripe_idx_;
int64_t end_stripe_idx_;
int64_t cur_stripe_read_row_count_;
int64_t cur_stripe_row_count_;
int64_t batch_size_;
ObNewRow part_list_val_;
};
public:
ObOrcTableRowIterator() : file_column_exprs_(allocator_), file_meta_column_exprs_(allocator_), bit_vector_cache_(NULL) {}
virtual ~ObOrcTableRowIterator() {
}
int init(const storage::ObTableScanParam *scan_param) override;
virtual int get_next_row(ObNewRow *&row) override {
UNUSED(row);
return common::OB_ERR_UNEXPECTED;
}
int get_next_row() override;
int get_next_rows(int64_t &count, int64_t capacity) override;
virtual void reset() override;
private:
// load vec data from orc file to expr mem
struct DataLoader {
DataLoader(ObEvalCtx &eval_ctx,
ObExpr *file_col_expr,
std::unique_ptr<orc::ColumnVectorBatch> &batch,
const int64_t batch_size,
const ObIArray<int> &idxs,
int64_t &row_count):
eval_ctx_(eval_ctx),
file_col_expr_(file_col_expr),
batch_(batch),
batch_size_(batch_size),
idxs_(idxs),
row_count_(row_count)
{}
typedef int (DataLoader::*LOAD_FUNC)();
static LOAD_FUNC select_load_function(const ObDatumMeta &datum_type,
const orc::Type &type);
int load_data_for_col(LOAD_FUNC &func);
int load_string_col();
int load_int32_vec();
int load_int64_vec();
int load_timestamp_vec();
int load_date_to_time_or_stamp();
int load_float();
int load_double();
int load_dec128_vec();
int load_dec64_vec();
bool is_orc_read_utc();
bool is_ob_type_store_utc(const ObDatumMeta &meta);
int64_t calc_tz_adjust_us();
ObEvalCtx &eval_ctx_;
ObExpr *file_col_expr_;
std::unique_ptr<orc::ColumnVectorBatch> &batch_;
const int64_t batch_size_;
const ObIArray<int> &idxs_;
int64_t &row_count_;
};
private:
int next_file();
int next_stripe();
int build_type_name_id_map(const orc::Type* type, ObIArray<ObString> &col_names);
int to_dot_column_path(ObIArray<ObString> &col_names, ObString &path);
int get_data_column_batch_idxs(const orc::Type *type, const int col_id, ObIArray<int> &idxs);
private:
StateValues state_;
lib::ObMemAttr mem_attr_;
ObArenaAllocator allocator_;
ObOrcMemPool orc_alloc_;
std::unique_ptr<orc::Reader> reader_;
std::unique_ptr<orc::RowReader> row_reader_;
common::ObArrayWrap<StripeInformation> stripes_;
ObExternalDataAccessDriver data_access_driver_;
common::ObArrayWrap<int> column_indexs_; //for getting statistics, may useless now.
ExprFixedArray file_column_exprs_; //column value from parquet file
ExprFixedArray file_meta_column_exprs_; //column value from file meta
common::ObArrayWrap<DataLoader::LOAD_FUNC> load_funcs_;
ObSqlString url_;
ObBitVector *bit_vector_cache_;
common::ObArrayWrap<char *> file_url_ptrs_; //for file url expr
common::ObArrayWrap<ObLength> file_url_lens_; //for file url expr
hash::ObHashMap<int64_t, const orc::Type*> id_to_type_;
hash::ObHashMap<ObString, int64_t> name_to_id_;
};
}
}
#endif

View File

@ -8247,9 +8247,20 @@ int ObDMLResolver::resolve_external_table_generated_column(
ObExternalFileFormat format;
if (OB_FAIL(format.load_from_string(table_schema->get_external_file_format(), *params_.allocator_))) {
LOG_WARN("load from string failed", K(ret));
} else if (format.format_type_ == ObExternalFileFormat::ORC_FORMAT && lib::is_oracle_mode()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support orc in oracle mode", K(ret));
LOG_USER_WARN(OB_NOT_SUPPORTED, "orc in oracle mode");
} else if (format.format_type_ != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) {
ret = OB_WRONG_COLUMN_NAME;
LOG_USER_ERROR(OB_WRONG_COLUMN_NAME, col.col_name_.length(), col.col_name_.ptr());
if (format.format_type_ == ObExternalFileFormat::ORC_FORMAT &&
ObExternalFileFormat::PARQUET_FORMAT != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) {
ret = OB_WRONG_COLUMN_NAME;
LOG_WARN("wrong column name", K(format.format_type_));
LOG_USER_ERROR(OB_WRONG_COLUMN_NAME, col.col_name_.length(), col.col_name_.ptr());
}
}
if (OB_FAIL(ret)) {
} else if (ObExternalFileFormat::CSV_FORMAT == format.format_type_) {
if (OB_FAIL(ObResolverUtils::calc_file_column_idx(col.col_name_, file_column_idx))) {
LOG_WARN("fail to calc file column idx", K(ret));
@ -8263,7 +8274,8 @@ int ObDMLResolver::resolve_external_table_generated_column(
LOG_WARN("fail to build external table file column expr", K(ret));
}
}
} else if (ObExternalFileFormat::PARQUET_FORMAT == format.format_type_) {
} else if (ObExternalFileFormat::PARQUET_FORMAT == format.format_type_ ||
ObExternalFileFormat::ORC_FORMAT == format.format_type_ ) {
ObRawExpr *cast_expr = NULL;
ObRawExpr *get_path_expr = NULL;
ObRawExpr *cast_type_expr = NULL;