[FEAT MERGE] Support Parquet Format for External Table

This commit is contained in:
wjhh2008 2024-06-24 06:19:18 +00:00 committed by ob-robot
parent 851ab86cbc
commit d1e91320ec
48 changed files with 3321 additions and 217 deletions

View File

@ -32,6 +32,7 @@ devdeps-cos-c-sdk-5.0.16-52023070517.el7.aarch64.rpm
devdeps-s3-cpp-sdk-1.11.156-102023122011.el7.aarch64.rpm
devdeps-protobuf-c-1.4.1-100000072023102410.el7.aarch64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el7.aarch64.rpm
devdeps-apache-arrow-9.0.0-302024052920.el7.aarch64.rpm
[tools]
obdevtools-binutils-2.30-12022100413.el7.aarch64.rpm

View File

@ -35,6 +35,7 @@ devdeps-cloud-qpl-1.1.0-272023061419.el7.x86_64.rpm
devdeps-s3-cpp-sdk-1.11.156-102023122011.el7.x86_64.rpm
devdeps-protobuf-c-1.4.1-100000062023102016.el7.x86_64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el7.x86_64.rpm
devdeps-apache-arrow-9.0.0-222024052223.el7.x86_64.rpm
[tools]
obdevtools-binutils-2.30-12022100413.el7.x86_64.rpm

View File

@ -32,6 +32,7 @@ devdeps-cos-c-sdk-5.0.16-52023070517.el8.aarch64.rpm
devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.aarch64.rpm
devdeps-protobuf-c-1.4.1-100000072023102410.el8.aarch64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.aarch64.rpm
devdeps-apache-arrow-9.0.0-322024052923.el8.aarch64.rpm
[tools]
obdevtools-binutils-2.30-12022100413.el8.aarch64.rpm

View File

@ -34,6 +34,7 @@ devdeps-cloud-qpl-1.1.0-272023061419.el8.x86_64.rpm
devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.x86_64.rpm
devdeps-protobuf-c-1.4.1-100000062023102016.el8.x86_64.rpm
devdeps-roaringbitmap-croaring-3.0.0-42024042816.el8.x86_64.rpm
devdeps-apache-arrow-9.0.0-172024052218.el8.x86_64.rpm
[tools]
obdevtools-binutils-2.30-12022100413.el8.x86_64.rpm

View File

@ -37,6 +37,7 @@ devdeps-cos-c-sdk-5.0.16-52023070517.el8.x86_64.rpm
devdeps-cloud-qpl-1.1.0-272023061419.el8.x86_64.rpm
devdeps-s3-cpp-sdk-1.11.156-102023122011.el8.x86_64.rpm
devdeps-protobuf-c-1.4.1-100000062023102016.el8.x86_64.rpm
devdeps-apache-arrow-9.0.0-172024052218.el8.x86_64.rpm
[deps-el9]
devdeps-apr-1.6.5-232023090616.el9.x86_64.rpm target=el9

View File

@ -22,6 +22,7 @@ target_include_directories(
${DEP_3RD_DIR}/usr/include/
${DEP_DIR}/include/apr-1/
${DEP_DIR}/include/icu/common
${DEP_DIR}/include/apache-arrow
${USSL_INCLUDE_DIRS}
)
@ -200,6 +201,9 @@ target_link_libraries(oblib_base_base_base
${DEP_DIR}/lib/libicustubdata.a
${DEP_DIR}/lib/libicuuc.a
${DEP_DIR}/lib/libprotobuf-c.a
${DEP_DIR}/lib64/libarrow.a
${DEP_DIR}/lib64/libparquet.a
${DEP_DIR}/lib64/libarrow_bundled_dependencies.a
-L${DEP_DIR}/var/usr/lib64
-L${DEP_DIR}/var/usr/lib
-L${DEP_3RD_DIR}/usr/lib
@ -226,6 +230,9 @@ target_link_libraries(oblib_base_base_base
${DEP_DIR}/lib/libicustubdata.a
${DEP_DIR}/lib/libicuuc.a
${DEP_DIR}/lib/libprotobuf-c.a
${DEP_DIR}/lib64/libarrow.a
${DEP_DIR}/lib64/libparquet.a
${DEP_DIR}/lib64/libarrow_bundled_dependencies.a
-L${DEP_DIR}/var/usr/lib64
-L${DEP_DIR}/var/usr/lib
-L${DEP_3RD_DIR}/usr/lib

View File

@ -1068,6 +1068,8 @@
#define N_EXTERNAL_FILE_COLUMN_PREFIX "metadata$filecol"
#define N_PARTITION_LIST_COL "metadata$partition_list_col"
#define N_EXTERNAL_FILE_URL "metadata$fileurl"
#define N_EXTERNAL_FILE_ROW "external$filerow"
#define N_PREFIX_PATTERN "prefix_pattern"
#define N_PRIV_XML_BINARY "_make_xml_binary"
#define N_SYS_MAKEXML "sys_makexml"
@ -1148,4 +1150,5 @@
#define N_RB_ANDNOT_NULL2EMPTY "rb_andnot_null2empty"
#define N_RB_TO_STRING "rb_to_string"
#define N_RB_FROM_STRING "rb_from_string"
#define N_GET_PATH "get_path"
#endif //OCEANBASE_LIB_OB_NAME_DEF_H_

View File

@ -5,6 +5,7 @@
# oblib_addtest(thread_local/test_itid.cpp)
# oblib_addtest(time/test_ob_time_utility.cpp)
# oblib_addtest(timezone/test_ob_timezone_utils.cpp)
oblib_addtest(parquet/test_parquet.cpp)
oblib_addtest(alloc/test_alloc_struct.cpp)
oblib_addtest(alloc/test_block_set.cpp)
oblib_addtest(alloc/test_chunk_mgr.cpp)

View File

@ -0,0 +1,717 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL
#include "gtest/gtest.h"
#include "lib/oblog/ob_log.h"
#include "lib/oblog/ob_log_module.h"
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include <parquet/api/reader.h>
#include <parquet/api/writer.h>
#include <arrow/util/logging.h>
#include <iostream>
#include "lib/allocator/page_arena.h"
#include "lib/file/ob_file.h"
#include "lib/file/file_directory_utils.h"
#include "lib/charset/ob_template_helper.h"
#include "lib/net/ob_net_util.h"
#define USING_LOG_PREFIX SQL
using namespace oceanbase::common;
class TestParquet: public ::testing::Test
{
public:
TestParquet();
virtual ~TestParquet();
virtual void SetUp();
virtual void TearDown();
};
TestParquet::TestParquet()
{
}
TestParquet::~TestParquet()
{
}
void TestParquet::SetUp()
{
}
void TestParquet::TearDown()
{
}
constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
// #0 Build dummy data to pass around
// To have some input data, we first create an Arrow Table that holds
// some data.
std::shared_ptr<arrow::Table> generate_table() {
arrow::Int64Builder i64builder;
PARQUET_THROW_NOT_OK(i64builder.AppendValues({1, 2, 3, 4, 5}));
std::shared_ptr<arrow::Array> i64array;
PARQUET_THROW_NOT_OK(i64builder.Finish(&i64array));
arrow::StringBuilder strbuilder;
PARQUET_THROW_NOT_OK(strbuilder.Append("some"));
PARQUET_THROW_NOT_OK(strbuilder.Append("string"));
PARQUET_THROW_NOT_OK(strbuilder.Append("content"));
PARQUET_THROW_NOT_OK(strbuilder.Append("in"));
PARQUET_THROW_NOT_OK(strbuilder.Append("rows"));
std::shared_ptr<arrow::Array> strarray;
PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray));
std::shared_ptr<arrow::Schema> schema = arrow::schema(
{arrow::field("int", arrow::int64()), arrow::field("str", arrow::utf8())});
return arrow::Table::Make(schema, {i64array, strarray});
}
// #1 Write out the data as a Parquet file
void write_parquet_file(const arrow::Table& table) {
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open("parquet-arrow-example.parquet"));
// The last argument to the function call is the size of the RowGroup in
// the parquet file. Normally you would choose this to be rather large but
// for the example, we use a small value to have multiple RowGroups.
PARQUET_THROW_NOT_OK(
parquet::arrow::WriteTable(table, arrow::default_memory_pool(), outfile, 3));
}
// #2: Fully read in the file
void read_whole_file() {
std::cout << "Reading parquet-arrow-example.parquet at once" << std::endl;
std::shared_ptr<arrow::io::ReadableFile> infile;
PARQUET_ASSIGN_OR_THROW(infile,
arrow::io::ReadableFile::Open("parquet-arrow-example.parquet",
arrow::default_memory_pool()));
std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns()
<< " columns." << std::endl;
}
// #3: Read only a single RowGroup of the parquet file
void read_single_rowgroup() {
std::cout << "Reading first RowGroup of parquet-arrow-example.parquet" << std::endl;
std::shared_ptr<arrow::io::ReadableFile> infile;
PARQUET_ASSIGN_OR_THROW(infile,
arrow::io::ReadableFile::Open("parquet-arrow-example.parquet",
arrow::default_memory_pool()));
std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->RowGroup(0)->ReadTable(&table));
std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns()
<< " columns." << std::endl;
}
// #4: Read only a single column of the whole parquet file
void read_single_column() {
std::cout << "Reading first column of parquet-arrow-example.parquet" << std::endl;
std::shared_ptr<arrow::io::ReadableFile> infile;
PARQUET_ASSIGN_OR_THROW(infile,
arrow::io::ReadableFile::Open("parquet-arrow-example.parquet",
arrow::default_memory_pool()));
std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
std::shared_ptr<arrow::ChunkedArray> array;
PARQUET_THROW_NOT_OK(reader->ReadColumn(0, &array));
PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout));
std::cout << std::endl;
}
// #5: Read only a single column of a RowGroup (this is known as ColumnChunk)
// from the Parquet file.
void read_single_column_chunk() {
std::cout << "Reading first ColumnChunk of the first RowGroup of "
"parquet-arrow-example.parquet"
<< std::endl;
std::shared_ptr<arrow::io::ReadableFile> infile;
PARQUET_ASSIGN_OR_THROW(infile,
arrow::io::ReadableFile::Open("parquet-arrow-example.parquet",
arrow::default_memory_pool()));
std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
std::shared_ptr<arrow::ChunkedArray> array;
PARQUET_THROW_NOT_OK(reader->RowGroup(0)->Column(0)->Read(&array));
PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout));
std::cout << std::endl;
}
class ObParquetAllocator : public ::arrow::MemoryPool
{
public:
/// Allocate a new memory region of at least size bytes.
///
/// The allocated region shall be 64-byte aligned.
virtual arrow::Status Allocate(int64_t size, uint8_t** out) override
{
arrow::Status ret = arrow::Status::OK();
void *buf = alloc_.alloc_aligned(size, 64);
if (OB_ISNULL(buf)) {
ret = arrow::Status::Invalid("allocate memory failed");
} else {
*out = static_cast<uint8_t*>(buf);
}
std::cout << "Allocing : " << size << std::endl;
return arrow::Status::OK();
}
/// Resize an already allocated memory section.
///
/// As by default most default allocators on a platform don't support aligned
/// reallocation, this function can involve a copy of the underlying data.
virtual arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr)
{
std::cout << "Reallocing : " << old_size << ',' << new_size << std::endl;
return Allocate(new_size, ptr);
}
/// Free an allocated region.
///
/// @param buffer Pointer to the start of the allocated memory region
/// @param size Allocated size located at buffer. An allocator implementation
/// may use this for tracking the amount of allocated bytes as well as for
/// faster deallocation if supported by its backend.
virtual void Free(uint8_t* buffer, int64_t size) {
std::cout << "Freed : " << size << std::endl;
alloc_.free(buffer);
}
/// Return unused memory to the OS
///
/// Only applies to allocators that hold onto unused memory. This will be
/// best effort, a memory pool may not implement this feature or may be
/// unable to fulfill the request due to fragmentation.
virtual void ReleaseUnused() {
std::cout << "ReleaseUnused" << std::endl;
}
/// The number of bytes that were allocated and not yet free'd through
/// this allocator.
virtual int64_t bytes_allocated() const override {
std::cout << "bytes_allocated()" << std::endl;
return alloc_.total();
}
/// Return peak memory allocation in this memory pool
///
/// \return Maximum bytes allocated. If not known (or not implemented),
/// returns -1
virtual int64_t max_memory() const override { return -1; }
/// The name of the backend used by this MemoryPool (e.g. "system" or "jemalloc").
virtual std::string backend_name() const override { return "Parquet"; }
private:
ObArenaAllocator alloc_;
arrow::internal::MemoryPoolStats stats_;
};
class ObExternalFileReader : public arrow::io::RandomAccessFile {
public:
ObExternalFileReader(const char*file_name, arrow::MemoryPool *pool) {
file_reader_.open(file_name, false);
pool_ = pool;
file_name_ = file_name;
}
~ObExternalFileReader() override {}
virtual arrow::Status Close() override;
virtual bool closed() const override;
virtual arrow::Result<int64_t> Read(int64_t nbytes, void* out) override;
virtual arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
virtual arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
virtual arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
virtual arrow::Status Seek(int64_t position) override;
virtual arrow::Result<int64_t> Tell() const override;
virtual arrow::Result<int64_t> GetSize() override;
private:
ObFileReader file_reader_;
int64_t position_;
arrow::MemoryPool *pool_;
const char* file_name_;
};
arrow::Status ObExternalFileReader::Seek(int64_t position) {
std::cout<< "ObExternalFileReader::Seek" << std::endl;
position_ = position;
return arrow::Status::OK();
}
arrow::Result<int64_t> ObExternalFileReader::Read(int64_t nbytes, void *out)
{
std::cout<< "ObExternalFileReader::Read(int64_t nbytes, void *out)" << std::endl;
int64_t read_size = -1;
file_reader_.pread(out, nbytes, position_, read_size);
position_ += read_size;
return read_size;
}
arrow::Result<std::shared_ptr<arrow::Buffer>> ObExternalFileReader::Read(int64_t nbytes)
{
std::cout<< "ObExternalFileReader::Read(int64_t nbytes)" << std::endl;
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes, pool_));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
if (bytes_read < nbytes) {
RETURN_NOT_OK(buffer->Resize(bytes_read));
}
return std::move(buffer);
}
arrow::Result<int64_t> ObExternalFileReader::ReadAt(int64_t position, int64_t nbytes, void* out)
{
std::cout<< "ObExternalFileReader::ReadAt(int64_t position, int64_t nbytes, void* out)" << std::endl;
int64_t read_size = -1;
file_reader_.pread(out, nbytes, position, read_size);
position_ = position + read_size;
return read_size;
}
arrow::Result<std::shared_ptr<arrow::Buffer>> ObExternalFileReader::ReadAt(int64_t position, int64_t nbytes)
{
std::cout<< "ObExternalFileReader::ReadAt(int64_t position, int64_t nbytes)" << std::endl;
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
ReadAt(position, nbytes, buffer->mutable_data()));
if (bytes_read < nbytes) {
RETURN_NOT_OK(buffer->Resize(bytes_read));
buffer->ZeroPadding();
}
return std::move(buffer);
}
arrow::Result<int64_t> ObExternalFileReader::Tell() const
{
std::cout<< "ObExternalFileReader::Tell()" << std::endl;
return position_;
}
arrow::Result<int64_t> ObExternalFileReader::GetSize()
{
std::cout<< "ObExternalFileReader::GetSize()" << std::endl;
int64_t file_size = 0;
FileDirectoryUtils::get_file_size(file_name_, file_size);
return file_size;
}
arrow::Status ObExternalFileReader::Close()
{
std::cout<< "ObExternalFileReader::Close()" << std::endl;
file_reader_.close();
return arrow::Status::OK();
}
bool ObExternalFileReader::closed() const
{
std::cout<< "ObExternalFileReader::closed()" << std::endl;
return !file_reader_.is_opened();
}
void read_column_schema() {
std::cout << "Reading column schema "
"parquet-arrow-example.parquet"
<< std::endl;
ObParquetAllocator alloc;
parquet::ReaderProperties read_props(&alloc);
std::cout<< "create parquet_file : " << std::endl;
std::shared_ptr<ObExternalFileReader> reader =
std::make_shared<ObExternalFileReader>("parquet-arrow-example.parquet", &alloc);
std::cout<< "create file reader : " << std::endl;
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::Open(reader, read_props);
// Get the File MetaData
std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
int num_row_groups = file_metadata->num_row_groups();
int num_columns = file_metadata->num_columns();
std::cout<< "num_row_groups : " << num_row_groups << std::endl;
std::cout<< "num_columns : " << num_columns << std::endl;
for (int i = 0; i < num_columns; i++) {
std::cout<<"Path="<<file_metadata->schema()->Column(i)->path()->ToDotString()<<std::endl;
}
std::cout<<"Path="<<file_metadata->schema()->ColumnIndex(std::string("int"))<<std::endl;
std::cout<<"Path="<<file_metadata->schema()->ColumnIndex(std::string("str"))<<std::endl;
for (int i = 0; i < num_row_groups; i++) {
for (int j = 0; j < num_columns; j++) {
parquet::Type::type col_type = file_metadata->RowGroup(i)->ColumnChunk(j)->type();
std::cout<<"ColumnType="<<col_type<<std::endl;
}
}
int64_t value = 0;
int64_t values_read = 0;
int64_t rows_read = 0;
for (int r = 0; r < num_row_groups; ++r) {
// Get the RowGroup Reader
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);
std::shared_ptr<parquet::ColumnReader> column_reader;
column_reader = row_group_reader->Column(0);
parquet::Int64Reader* int64_reader =
static_cast<parquet::Int64Reader*>(column_reader.get());
while (int64_reader->HasNext()) {
std::cout << "before int64: " << std::endl;
rows_read = int64_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
std::cout << "read int64: " << value << std::endl;
std::cout << "read rows: " << rows_read << std::endl;
std::cout << "read values_read: " << values_read << std::endl;
}
column_reader = row_group_reader->Column(1);
parquet::ByteArrayReader* ba_reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());
while (ba_reader->HasNext()) {
parquet::ByteArray value;
std::cout << "before bytes: " << std::endl;
rows_read =
ba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
std::cout << "read bytes: " << std::string(pointer_cast<const char*>(value.ptr), value.len) << std::endl;
std::cout << "read rows: " << rows_read << std::endl;
std::cout << "read values_read: " << values_read << std::endl;
}
}
}
using parquet::ConvertedType;
using parquet::Repetition;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;
constexpr int FIXED_LENGTH = 10;
constexpr int FIXED_LENGTH_DEC = 14;
static std::shared_ptr<GroupNode> SetupSchema() {
parquet::schema::NodeVector fields;
fields.push_back(PrimitiveNode::Make("int", Repetition::OPTIONAL, parquet::LogicalType::Int(64, true), Type::INT64));
fields.push_back(PrimitiveNode::Make("string", Repetition::OPTIONAL, parquet::LogicalType::String(), Type::BYTE_ARRAY));
fields.push_back(PrimitiveNode::Make("decimal32", Repetition::OPTIONAL, parquet::LogicalType::Decimal(6, 3), Type::INT32));
fields.push_back(PrimitiveNode::Make("decimal64", Repetition::OPTIONAL, parquet::LogicalType::Decimal(10, 0), Type::INT64));
fields.push_back(PrimitiveNode::Make("decimalbytearr", Repetition::OPTIONAL, parquet::LogicalType::Decimal(20, 3), Type::BYTE_ARRAY));
fields.push_back(PrimitiveNode::Make("date", Repetition::OPTIONAL, parquet::LogicalType::Date(), Type::INT32));
fields.push_back(PrimitiveNode::Make("timestamp", Repetition::OPTIONAL, parquet::LogicalType::Timestamp(true, parquet::LogicalType::TimeUnit::MICROS), Type::INT64));
return std::static_pointer_cast<GroupNode>(
GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
void gen_test_parquet() {
/**********************************************************************************
PARQUET WRITER EXAMPLE
**********************************************************************************/
// parquet::REQUIRED fields do not need definition and repetition level values
// parquet::OPTIONAL fields require only definition level values
// parquet::REPEATED fields require both definition and repetition level values
try {
// Create a local file output stream instance.
using FileClass = ::arrow::io::FileOutputStream;
std::shared_ptr<FileClass> out_file;
PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME));
// Setup the parquet schema
std::shared_ptr<GroupNode> schema = SetupSchema();
// Add writer properties
parquet::WriterProperties::Builder builder;
builder.compression(parquet::Compression::SNAPPY);
std::shared_ptr<parquet::WriterProperties> props = builder.build();
// Create a ParquetFileWriter instance
std::shared_ptr<parquet::ParquetFileWriter> file_writer =
parquet::ParquetFileWriter::Open(out_file, schema, props);
// Append a RowGroup with a specific number of rows.
parquet::RowGroupWriter* rg_writer = NULL;
/*
// Write the Bool column
parquet::BoolWriter* bool_writer =
static_cast<parquet::BoolWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
bool value = ((i % 2) == 0) ? true : false;
bool_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int32 column
parquet::Int32Writer* int32_writer =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
int32_t value = i;
int32_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int64 column. Each row has repeats twice.
parquet::Int64Writer* int64_writer =
static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) {
int64_t value = i * 1000 * 1000;
value *= 1000 * 1000;
int16_t definition_level = 1;
int16_t repetition_level = 0;
if ((i % 2) == 0) {
repetition_level = 1; // start of a new record
}
int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value);
}
// Write the INT96 column.
parquet::Int96Writer* int96_writer =
static_cast<parquet::Int96Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
parquet::Int96 value;
value.value[0] = i;
value.value[1] = i + 1;
value.value[2] = i + 2;
int96_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Float column
parquet::FloatWriter* float_writer =
static_cast<parquet::FloatWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
float value = static_cast<float>(i) * 1.1f;
float_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Double column
parquet::DoubleWriter* double_writer =
static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
double value = i * 1.1111111;
double_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the ByteArray column. Make every alternate values NULL
parquet::ByteArrayWriter* ba_writer =
static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
parquet::ByteArray value;
char hello[FIXED_LENGTH] = "parquet";
hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
if (i % 2 == 0) {
int16_t definition_level = 1;
value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
value.len = FIXED_LENGTH;
ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
} else {
int16_t definition_level = 0;
ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
}
}
// Write the FixedLengthByteArray column
parquet::FixedLenByteArrayWriter* flba_writer =
static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
parquet::FixedLenByteArray value;
char v = static_cast<char>(i);
char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
flba_writer->WriteBatch(1, nullptr, nullptr, &value);
}
*/
#define HAS_NULL 1
for (int j = 0; j < 10; j++) {
rg_writer = file_writer->AppendRowGroup();
parquet::Int64Writer* int64_writer =
static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
if (HAS_NULL && i % 2 == 0) {
int16_t definition_level = 0;
int64_writer->WriteBatch(1, &definition_level, nullptr, NULL);
} else {
int64_t value = i * 1000 * 1000 + j;
value *= 1000 * 1000;
int16_t definition_level = 1;
int64_writer->WriteBatch(1, &definition_level, nullptr, &value);
}
}
parquet::ByteArrayWriter* ba_writer =
static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
parquet::ByteArray value;
char hello[FIXED_LENGTH] = "parquet";
hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
if (HAS_NULL && i % 2 == 1) {
int16_t definition_level = 0;
ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
} else {
int16_t definition_level = 1;
value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
value.len = FIXED_LENGTH;
ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
}
}
parquet::Int32Writer* int32_writer2 =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
if (HAS_NULL && i % 3 == 0) {
int16_t definition_level = 0;
int32_writer2->WriteBatch(1, &definition_level, nullptr, NULL);
} else {
int32_t value = j * 10000 + i;
int16_t definition_level = 1;
int32_writer2->WriteBatch(1, &definition_level, nullptr, &value);
}
}
parquet::Int64Writer* int64_writer2 =
static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
if (HAS_NULL && i % 3 == 1) {
int16_t definition_level = 0;
int64_writer2->WriteBatch(1, &definition_level, nullptr, NULL);
} else {
int64_t value = (j * 10000 + i) * 10000;
int16_t definition_level = 1;
int64_writer2->WriteBatch(1, &definition_level, nullptr, &value);
}
}
parquet::ByteArrayWriter* ba_writer2 =
static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
parquet::ByteArray value;
char hello[FIXED_LENGTH_DEC] = "1234567890.";
hello[11] = static_cast<char>(static_cast<int>('0') + i / 100);
hello[12] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
hello[13] = static_cast<char>(static_cast<int>('0') + i % 10);
if (HAS_NULL && i % 5 == 0) {
int16_t definition_level = 0;
ba_writer2->WriteBatch(1, &definition_level, nullptr, nullptr);
} else {
int16_t definition_level = 1;
value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
value.len = FIXED_LENGTH_DEC;
ba_writer2->WriteBatch(1, &definition_level, nullptr, &value);
}
}
parquet::Int32Writer* int32_writer3 =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
if (HAS_NULL && i % 6 == 0) {
int16_t definition_level = 0;
int32_writer3->WriteBatch(1, &definition_level, nullptr, NULL);
} else {
int32_t value = 19857 + 365 * 10 * j + i;
int16_t definition_level = 1;
int32_writer3->WriteBatch(1, &definition_level, nullptr, &value);
}
}
parquet::Int64Writer* int64_writer3 =
static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
if (HAS_NULL && i % 3 == 1) {
int16_t definition_level = 0;
int64_writer3->WriteBatch(1, &definition_level, nullptr, NULL);
} else {
int64_t value = (1716887565LL + i * 3600) * 1000 * 1000;
int16_t definition_level = 1;
int64_writer3->WriteBatch(1, &definition_level, nullptr, &value);
}
}
}
// Close the ParquetFileWriter
file_writer->Close();
// Write the bytes to file
DCHECK(out_file->Close().ok());
} catch (const std::exception& e) {
std::cerr << "Parquet write error: " << e.what() << std::endl;
}
}
TEST_F(TestParquet, example1)
{
std::shared_ptr<arrow::Table> table = generate_table();
write_parquet_file(*table);
read_whole_file();
read_single_rowgroup();
read_single_column();
read_single_column_chunk();
read_column_schema();
gen_test_parquet();
}
int main(int argc, char **argv)
{
OB_LOGGER.set_log_level("INFO");
testing::InitGoogleTest(&argc,argv);
return RUN_ALL_TESTS();
}

View File

@ -263,6 +263,8 @@ int ObExternalTableUtils::make_external_table_scan_range(const common::ObString
obj_start[FILE_ID].set_int(file_id);
obj_end[FILE_ID] = ObObj();
obj_end[FILE_ID].set_int(file_id);
obj_start[ROW_GROUP_NUMBER].set_min_value();
obj_end[ROW_GROUP_NUMBER].set_max_value();
obj_start[LINE_NUMBER] = ObObj();
obj_start[LINE_NUMBER].set_int(first_lineno);
obj_end[LINE_NUMBER] = ObObj();
@ -576,7 +578,7 @@ int ObExternalTableUtils::collect_local_files_on_servers(
context.get_cb_list().at(i)->~ObRpcAsyncLoadExternalTableFileCallBack();
}
}
LOG_TRACE("update external table file list", K(ret), K(file_urls));
LOG_TRACE("update external table file list", K(ret), K(file_urls), K(location), K(pattern), K(all_servers));
return ret;
}

View File

@ -57,6 +57,7 @@ class ObExternalTableUtils {
PARTITION_ID = 0,
FILE_URL,
FILE_ID,
ROW_GROUP_NUMBER,
LINE_NUMBER,
MAX_EXTERNAL_FILE_SCANKEY
};

View File

@ -860,7 +860,7 @@ int ObInnerTableSchema::dba_ob_external_table_files_schema(ObTableSchema &table_
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS TABLE_SCHEMA, 'P0' AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM OCEANBASE.__ALL_EXTERNAL_TABLE_FILE A INNER JOIN OCEANBASE.__ALL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND B.TENANT_ID = 0 INNER JOIN OCEANBASE.__ALL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND C.TENANT_ID = 0 WHERE B.TABLE_TYPE = 14 AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) AND B.TABLE_MODE >> 12 & 15 in (0,1) )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS TABLE_SCHEMA, P.PART_NAME AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM OCEANBASE.__ALL_EXTERNAL_TABLE_FILE A INNER JOIN OCEANBASE.__ALL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND B.TENANT_ID = 0 INNER JOIN OCEANBASE.__ALL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND C.TENANT_ID = 0 LEFT JOIN OCEANBASE.__ALL_PART P ON A.PART_ID = P.PART_ID AND P.TENANT_ID = 0 WHERE B.TABLE_TYPE = 14 AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) AND B.TABLE_MODE >> 12 & 15 in (0,1) )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}
@ -910,7 +910,7 @@ int ObInnerTableSchema::all_ob_external_table_files_schema(ObTableSchema &table_
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS TABLE_SCHEMA, 'P0' AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM OCEANBASE.__ALL_EXTERNAL_TABLE_FILE A INNER JOIN OCEANBASE.__ALL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND B.TENANT_ID = 0 INNER JOIN OCEANBASE.__ALL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND C.TENANT_ID = 0 WHERE B.TABLE_TYPE = 14 AND B.TABLE_MODE >> 12 & 15 in (0,1) AND 0 = sys_privilege_check('table_acc', EFFECTIVE_TENANT_ID(), C.DATABASE_NAME, B.TABLE_NAME) AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS TABLE_SCHEMA, P.PART_NAME AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM OCEANBASE.__ALL_EXTERNAL_TABLE_FILE A INNER JOIN OCEANBASE.__ALL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND B.TENANT_ID = 0 INNER JOIN OCEANBASE.__ALL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND C.TENANT_ID = 0 LEFT JOIN OCEANBASE.__ALL_PART P ON A.PART_ID = P.PART_ID AND P.TENANT_ID = 0 WHERE B.TABLE_TYPE = 14 AND B.TABLE_MODE >> 12 & 15 in (0,1) AND 0 = sys_privilege_check('table_acc', EFFECTIVE_TENANT_ID(), C.DATABASE_NAME, B.TABLE_NAME) AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}
@ -1260,7 +1260,7 @@ int ObInnerTableSchema::cdb_ob_external_table_files_schema(ObTableSchema &table_
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT A.TENANT_ID AS TENANT_ID, B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS TABLE_SCHEMA, 'P0' AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM OCEANBASE.__ALL_VIRTUAL_EXTERNAL_TABLE_FILE A INNER JOIN OCEANBASE.__ALL_VIRTUAL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND A.TENANT_ID=B.TENANT_ID AND B.TABLE_MODE >> 12 & 15 in (0,1) INNER JOIN OCEANBASE.__ALL_VIRTUAL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID=C.TENANT_ID WHERE B.TABLE_TYPE = 14 AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT A.TENANT_ID AS TENANT_ID, B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS TABLE_SCHEMA, P.PART_NAME AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM OCEANBASE.__ALL_VIRTUAL_EXTERNAL_TABLE_FILE A INNER JOIN OCEANBASE.__ALL_VIRTUAL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND A.TENANT_ID=B.TENANT_ID AND B.TABLE_MODE >> 12 & 15 in (0,1) INNER JOIN OCEANBASE.__ALL_VIRTUAL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID=C.TENANT_ID LEFT JOIN OCEANBASE.__ALL_VIRTUAL_PART P ON A.PART_ID = P.PART_ID AND C.TENANT_ID = P.TENANT_ID WHERE B.TABLE_TYPE = 14 AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}

View File

@ -1660,7 +1660,7 @@ int ObInnerTableSchema::dba_ob_external_table_files_ora_schema(ObTableSchema &ta
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS OWNER, 'P0' AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM SYS.ALL_VIRTUAL_EXTERNAL_TABLE_FILE_REAL_AGENT A INNER JOIN SYS.ALL_VIRTUAL_TABLE_REAL_AGENT B ON A.TABLE_ID = B.TABLE_ID AND bitand((B.TABLE_MODE / 4096), 15) IN (0,1) INNER JOIN SYS.ALL_VIRTUAL_DATABASE_REAL_AGENT C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID = C.TENANT_ID WHERE B.TENANT_ID = EFFECTIVE_TENANT_ID() AND B.TABLE_TYPE = 14 AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS OWNER, P.PART_NAME AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM SYS.ALL_VIRTUAL_EXTERNAL_TABLE_FILE_REAL_AGENT A INNER JOIN SYS.ALL_VIRTUAL_TABLE_REAL_AGENT B ON A.TABLE_ID = B.TABLE_ID AND bitand((B.TABLE_MODE / 4096), 15) IN (0,1) INNER JOIN SYS.ALL_VIRTUAL_DATABASE_REAL_AGENT C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID = C.TENANT_ID LEFT JOIN SYS.ALL_VIRTUAL_PART_REAL_AGENT P ON A.PART_ID = P.PART_ID AND P.TENANT_ID = C.TENANT_ID WHERE B.TENANT_ID = EFFECTIVE_TENANT_ID() AND B.TABLE_TYPE = 14 AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}
@ -1710,7 +1710,7 @@ int ObInnerTableSchema::all_ob_external_table_files_ora_schema(ObTableSchema &ta
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS OWNER, 'P0' AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM SYS.ALL_VIRTUAL_EXTERNAL_TABLE_FILE_REAL_AGENT A INNER JOIN SYS.ALL_VIRTUAL_TABLE_REAL_AGENT B ON A.TABLE_ID = B.TABLE_ID AND bitand((B.TABLE_MODE / 4096), 15) IN (0,1) INNER JOIN SYS.ALL_VIRTUAL_DATABASE_REAL_AGENT C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID = C.TENANT_ID WHERE B.TENANT_ID = EFFECTIVE_TENANT_ID() AND B.TABLE_TYPE = 14 AND (C.DATABASE_ID = USERENV('SCHEMAID') OR USER_CAN_ACCESS_OBJ(1, B.TABLE_ID, C.DATABASE_ID) = 1) AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT B.TABLE_NAME AS TABLE_NAME, C.DATABASE_NAME AS OWNER, P.PART_NAME AS PARTITION_NAME, A.FILE_URL AS FILE_URL, A.FILE_SIZE AS FILE_SIZE FROM SYS.ALL_VIRTUAL_EXTERNAL_TABLE_FILE_REAL_AGENT A INNER JOIN SYS.ALL_VIRTUAL_TABLE_REAL_AGENT B ON A.TABLE_ID = B.TABLE_ID AND bitand((B.TABLE_MODE / 4096), 15) IN (0,1) INNER JOIN SYS.ALL_VIRTUAL_DATABASE_REAL_AGENT C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID = C.TENANT_ID LEFT JOIN SYS.ALL_VIRTUAL_PART_REAL_AGENT P ON A.PART_ID = P.PART_ID AND P.TENANT_ID = C.TENANT_ID WHERE B.TENANT_ID = EFFECTIVE_TENANT_ID() AND B.TABLE_TYPE = 14 AND (C.DATABASE_ID = USERENV('SCHEMAID') OR USER_CAN_ACCESS_OBJ(1, B.TABLE_ID, C.DATABASE_ID) = 1) AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION) )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}

View File

@ -31540,13 +31540,14 @@ def_table_schema(
SELECT
B.TABLE_NAME AS TABLE_NAME,
C.DATABASE_NAME AS TABLE_SCHEMA,
'P0' AS PARTITION_NAME,
P.PART_NAME AS PARTITION_NAME,
A.FILE_URL AS FILE_URL,
A.FILE_SIZE AS FILE_SIZE
FROM
OCEANBASE.__ALL_EXTERNAL_TABLE_FILE A
INNER JOIN OCEANBASE.__ALL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND B.TENANT_ID = 0
INNER JOIN OCEANBASE.__ALL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND C.TENANT_ID = 0
LEFT JOIN OCEANBASE.__ALL_PART P ON A.PART_ID = P.PART_ID AND P.TENANT_ID = 0
WHERE B.TABLE_TYPE = 14 AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION)
AND B.TABLE_MODE >> 12 & 15 in (0,1)
""".replace("\n", " ")
@ -31565,13 +31566,14 @@ def_table_schema(
SELECT
B.TABLE_NAME AS TABLE_NAME,
C.DATABASE_NAME AS TABLE_SCHEMA,
'P0' AS PARTITION_NAME,
P.PART_NAME AS PARTITION_NAME,
A.FILE_URL AS FILE_URL,
A.FILE_SIZE AS FILE_SIZE
FROM
OCEANBASE.__ALL_EXTERNAL_TABLE_FILE A
INNER JOIN OCEANBASE.__ALL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND B.TENANT_ID = 0
INNER JOIN OCEANBASE.__ALL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND C.TENANT_ID = 0
LEFT JOIN OCEANBASE.__ALL_PART P ON A.PART_ID = P.PART_ID AND P.TENANT_ID = 0
WHERE B.TABLE_TYPE = 14
AND B.TABLE_MODE >> 12 & 15 in (0,1)
AND 0 = sys_privilege_check('table_acc', EFFECTIVE_TENANT_ID(), C.DATABASE_NAME, B.TABLE_NAME)
@ -31871,13 +31873,14 @@ def_table_schema(
A.TENANT_ID AS TENANT_ID,
B.TABLE_NAME AS TABLE_NAME,
C.DATABASE_NAME AS TABLE_SCHEMA,
'P0' AS PARTITION_NAME,
P.PART_NAME AS PARTITION_NAME,
A.FILE_URL AS FILE_URL,
A.FILE_SIZE AS FILE_SIZE
FROM
OCEANBASE.__ALL_VIRTUAL_EXTERNAL_TABLE_FILE A
INNER JOIN OCEANBASE.__ALL_VIRTUAL_TABLE B ON A.TABLE_ID = B.TABLE_ID AND A.TENANT_ID=B.TENANT_ID AND B.TABLE_MODE >> 12 & 15 in (0,1)
INNER JOIN OCEANBASE.__ALL_VIRTUAL_DATABASE C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID=C.TENANT_ID
LEFT JOIN OCEANBASE.__ALL_VIRTUAL_PART P ON A.PART_ID = P.PART_ID AND C.TENANT_ID = P.TENANT_ID
WHERE B.TABLE_TYPE = 14 AND (A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION)
""".replace("\n", " ")
)
@ -53216,13 +53219,14 @@ def_table_schema(
SELECT
B.TABLE_NAME AS TABLE_NAME,
C.DATABASE_NAME AS OWNER,
'P0' AS PARTITION_NAME,
P.PART_NAME AS PARTITION_NAME,
A.FILE_URL AS FILE_URL,
A.FILE_SIZE AS FILE_SIZE
FROM
SYS.ALL_VIRTUAL_EXTERNAL_TABLE_FILE_REAL_AGENT A
INNER JOIN SYS.ALL_VIRTUAL_TABLE_REAL_AGENT B ON A.TABLE_ID = B.TABLE_ID AND bitand((B.TABLE_MODE / 4096), 15) IN (0,1)
INNER JOIN SYS.ALL_VIRTUAL_DATABASE_REAL_AGENT C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID = C.TENANT_ID
LEFT JOIN SYS.ALL_VIRTUAL_PART_REAL_AGENT P ON A.PART_ID = P.PART_ID AND P.TENANT_ID = C.TENANT_ID
WHERE B.TENANT_ID = EFFECTIVE_TENANT_ID() AND B.TABLE_TYPE = 14 AND
(A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION)
""".replace("\n", " ")
@ -53243,13 +53247,14 @@ def_table_schema(
SELECT
B.TABLE_NAME AS TABLE_NAME,
C.DATABASE_NAME AS OWNER,
'P0' AS PARTITION_NAME,
P.PART_NAME AS PARTITION_NAME,
A.FILE_URL AS FILE_URL,
A.FILE_SIZE AS FILE_SIZE
FROM
SYS.ALL_VIRTUAL_EXTERNAL_TABLE_FILE_REAL_AGENT A
INNER JOIN SYS.ALL_VIRTUAL_TABLE_REAL_AGENT B ON A.TABLE_ID = B.TABLE_ID AND bitand((B.TABLE_MODE / 4096), 15) IN (0,1)
INNER JOIN SYS.ALL_VIRTUAL_DATABASE_REAL_AGENT C ON B.DATABASE_ID = C.DATABASE_ID AND B.TENANT_ID = C.TENANT_ID
LEFT JOIN SYS.ALL_VIRTUAL_PART_REAL_AGENT P ON A.PART_ID = P.PART_ID AND P.TENANT_ID = C.TENANT_ID
WHERE B.TENANT_ID = EFFECTIVE_TENANT_ID() AND B.TABLE_TYPE = 14 AND
(C.DATABASE_ID = USERENV('SCHEMAID') OR USER_CAN_ACCESS_OBJ(1, B.TABLE_ID, C.DATABASE_ID) = 1) AND
(A.DELETE_VERSION = 9223372036854775807 OR A.DELETE_VERSION < A.CREATE_VERSION)

View File

@ -5350,16 +5350,19 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s
ObExternalFileFormat format;
if (OB_FAIL(format.load_from_string(table_schema.get_external_file_format(), allocator))) {
SHARE_SCHEMA_LOG(WARN, "fail to load from json string", K(ret));
} else if (format.format_type_ != ObExternalFileFormat::CSV_FORMAT) {
} else if (!(format.format_type_ > ObExternalFileFormat::INVALID_FORMAT
&& format.format_type_ < ObExternalFileFormat::MAX_FORMAT)) {
ret = OB_NOT_SUPPORTED;
SHARE_SCHEMA_LOG(WARN, "unsupported to print file format", K(ret), K(format.format_type_));
} else {
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\nFORMAT (\n"))) {
SHARE_SCHEMA_LOG(WARN, "fail to print FORMAT (", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, " TYPE = '%s',", ObExternalFileFormat::FORMAT_TYPE_STR[format.format_type_]))) {
SHARE_SCHEMA_LOG(WARN, "fail to print TYPE", K(ret));
}
if (OB_SUCC(ret) && ObExternalFileFormat::CSV_FORMAT == format.format_type_) {
const ObCSVGeneralFormat &csv = format.csv_format_;
const ObOriginFileFormat &origin_format = format.origin_file_format_str_;
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\nFORMAT (\n"))) {
SHARE_SCHEMA_LOG(WARN, "fail to print FORMAT (", K(ret));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos, " TYPE = 'CSV',"))) {
SHARE_SCHEMA_LOG(WARN, "fail to print TYPE", K(ret));
} else if (OB_FAIL(0 != csv.line_term_str_.case_compare(ObDataInFileStruct::DEFAULT_LINE_TERM_STR) &&
if (OB_FAIL(0 != csv.line_term_str_.case_compare(ObDataInFileStruct::DEFAULT_LINE_TERM_STR) &&
databuff_printf(buf, buf_len, pos, "\n LINE_DELIMITER = %.*s,", origin_format.origin_line_term_str_.length(), origin_format.origin_line_term_str_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print LINE_DELIMITER", K(ret));
} else if (OB_FAIL(0 != csv.field_term_str_.case_compare(ObDataInFileStruct::DEFAULT_FIELD_TERM_STR) &&
@ -5388,11 +5391,12 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s
} else if (OB_FAIL(0 != csv.null_if_.count() &&
databuff_printf(buf, buf_len, pos, "\n NULL_IF = (%.*s),", origin_format.origin_null_if_str_.length(), origin_format.origin_null_if_str_.ptr()))) {
SHARE_SCHEMA_LOG(WARN, "fail to print NULL_IF", K(ret));
} else {
--pos;
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n) "))) {
SHARE_SCHEMA_LOG(WARN, "fail to print )", K(ret));
}
}
}
if (OB_SUCC(ret)) {
--pos;
if (OB_FAIL(databuff_printf(buf, buf_len, pos, "\n) "))) {
SHARE_SCHEMA_LOG(WARN, "fail to print )", K(ret));
}
}
}

View File

@ -394,6 +394,7 @@ ob_set_subtarget(ob_sql engine_expr
engine/expr/ob_expr_func_round.cpp
engine/expr/ob_expr_func_sleep.cpp
engine/expr/ob_expr_get_package_var.cpp
engine/expr/ob_expr_get_path.cpp
engine/expr/ob_expr_get_subprogram_var.cpp
engine/expr/ob_expr_get_sys_var.cpp
engine/expr/ob_expr_get_user_var.cpp
@ -877,6 +878,7 @@ ob_set_subtarget(ob_sql engine_table
engine/table/ob_index_lookup_op_impl.cpp
engine/table/ob_table_scan_with_index_back_op.cpp
engine/table/ob_external_table_access_service.cpp
engine/table/ob_parquet_table_row_iter.cpp
)
ob_set_subtarget(ob_sql executor

View File

@ -21,6 +21,7 @@
#include "sql/engine/expr/ob_expr_lob_utils.h"
#include "share/vector/ob_vector_define.h"
#include "sql/engine/expr/ob_datum_cast.h"
#include "sql/engine/expr/ob_expr_get_path.h"
namespace oceanbase
{
@ -501,6 +502,25 @@ int ObStaticEngineExprCG::cg_expr_by_operator(const ObIArray<ObRawExpr *> &raw_e
}
}
}
} else if (T_PSEUDO_EXTERNAL_FILE_COL == raw_expr->get_expr_type()) {
ObIExprExtraInfo *extra_info = nullptr;
ObPseudoColumnRawExpr *column_expr = static_cast<ObPseudoColumnRawExpr*>(raw_expr);
if (OB_FAIL(ObExprExtraInfoFactory::alloc(*op_cg_ctx_.allocator_, rt_expr->type_, extra_info))) {
LOG_WARN("Failed to allocate memory for ObExprOracleLRpadInfo", K(ret));
} else if (OB_ISNULL(extra_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("extra_info should not be nullptr", K(ret));
} else {
ObDataAccessPathExtraInfo *data_access_info = static_cast<ObDataAccessPathExtraInfo *>(extra_info);
if (OB_FAIL(ob_write_string(*op_cg_ctx_.allocator_,
column_expr->get_data_access_path(),
data_access_info->data_access_path_))) {
LOG_WARN("fail to write string", K(ret));
} else {
rt_expr->extra_info_ = extra_info;
LOG_DEBUG("external file col expr", K(ret), "path", data_access_info->data_access_path_);
}
}
} else if (!IS_EXPR_OP(rt_expr->type_) || IS_AGGR_FUN(rt_expr->type_)) {
// do nothing
} else if (OB_FAIL(expr_cg_impl.generate_expr_operator(*raw_expr, expr_op_fetcher))) {

View File

@ -18,6 +18,7 @@
#include "lib/utility/ob_print_utils.h"
#include "lib/string/ob_hex_utils_base.h"
#include "deps/oblib/src/lib/list/ob_dlist.h"
#include "share/schema/ob_column_schema.h"
using namespace oceanbase::sql;
using namespace oceanbase::common;
@ -28,10 +29,13 @@ namespace sql
{
const char INVALID_TERM_CHAR = '\xff';
const char * FORMAT_TYPE_STR[] = {
const char * ObExternalFileFormat::FORMAT_TYPE_STR[] = {
"CSV",
"PARQUET",
};
static_assert(array_elements(FORMAT_TYPE_STR) == ObExternalFileFormat::MAX_FORMAT, "Not enough initializer for ObExternalFileFormat");
static_assert(array_elements(ObExternalFileFormat::FORMAT_TYPE_STR) == ObExternalFileFormat::MAX_FORMAT,
"Not enough initializer for ObExternalFileFormat");
int ObCSVGeneralFormat::init_format(const ObDataInFileStruct &format,
int64_t file_column_nums,
@ -347,7 +351,7 @@ int64_t ObExternalFileFormat::to_string(char *buf, const int64_t buf_len) const
pos += origin_file_format_str_.to_json_kv_string(buf + pos, buf_len - pos);
break;
default:
pos = 0;
pos += 0;
}
J_OBJ_END();
@ -389,6 +393,8 @@ int ObExternalFileFormat::load_from_string(const ObString &str, ObIAllocator &al
OZ (csv_format_.load_from_json_data(format_type_node, allocator));
OZ (origin_file_format_str_.load_from_json_data(format_type_node, allocator));
break;
case PARQUET_FORMAT:
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid format type", K(ret), K(format_type_str));
@ -399,6 +405,45 @@ int ObExternalFileFormat::load_from_string(const ObString &str, ObIAllocator &al
return ret;
}
int ObExternalFileFormat::mock_gen_column_def(
const share::schema::ObColumnSchemaV2 &column,
ObIAllocator &allocator,
ObString &def)
{
int ret = OB_SUCCESS;
ObSqlString temp_str;
switch (format_type_) {
case CSV_FORMAT: {
uint64_t file_column_idx = column.get_column_id() - OB_APP_MIN_COLUMN_ID + 1;
if (OB_FAIL(temp_str.append_fmt("%s%lu", N_EXTERNAL_FILE_COLUMN_PREFIX, file_column_idx))) {
LOG_WARN("fail to append sql str", K(ret));
}
break;
}
case PARQUET_FORMAT: {
if (OB_FAIL(temp_str.append_fmt("get_path(%s, '%.*s')",
N_EXTERNAL_FILE_ROW,
column.get_column_name_str().length(),
column.get_column_name_str().ptr()))) {
LOG_WARN("fail to append sql str", K(ret));
}
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected format", K(ret), K(format_type_));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ob_write_string(allocator, temp_str.string(), def))) {
LOG_WARN("fail to write string", K(ret));
}
}
return ret;
}
int ObExternalFileFormat::StringData::store_str(const ObString &str)
{
return ob_write_string(allocator_, str, str_);

View File

@ -21,6 +21,11 @@
namespace oceanbase
{
namespace share {
namespace schema {
class ObColumnSchemaV2;
}
}
namespace sql
{
class ObDataInFileStruct;
@ -495,18 +500,27 @@ struct ObExternalFileFormat
enum FormatType {
INVALID_FORMAT = -1,
CSV_FORMAT,
PARQUET_FORMAT,
MAX_FORMAT
};
enum Options {
OPT_REPLACE_INVALID_CHARACTERS = 1 << 0,
OPT_BINARY_AS_TEXT = 1 << 1,
};
ObExternalFileFormat() : format_type_(INVALID_FORMAT) {}
int64_t to_string(char* buf, const int64_t buf_len) const;
int load_from_string(const common::ObString &str, ObIAllocator &allocator);
int load_from_string(const common::ObString &str, common::ObIAllocator &allocator);
int mock_gen_column_def(const share::schema::ObColumnSchemaV2 &column, common::ObIAllocator &allocator, common::ObString &def);
ObOriginFileFormat origin_file_format_str_;
FormatType format_type_;
sql::ObCSVGeneralFormat csv_format_;
uint64_t options_;
static const char *FORMAT_TYPE_STR[];
};

View File

@ -614,7 +614,7 @@ int ObExprCast::calc_result_type2(ObExprResType &type,
int ObExprCast::get_cast_type(const bool enable_decimal_int,
const ObExprResType param_type2,
const ObCastMode cast_mode,
ObExprResType &dst_type) const
ObExprResType &dst_type)
{
int ret = OB_SUCCESS;
if (!param_type2.is_int() && !param_type2.get_param().is_int()) {

View File

@ -140,12 +140,12 @@ public:
sql::ObEvalCtx &ctx,
sql::ObDatum &res_datum);
virtual int is_valid_for_generated_column(const ObRawExpr*expr, const common::ObIArray<ObRawExpr *> &exprs, bool &is_valid) const;
static int get_cast_type(const bool enable_decimal_int,
const ObExprResType param_type2,
const ObCastMode cast_mode,
ObExprResType &dst_type);
DECLARE_SET_LOCAL_SESSION_VARS;
private:
int get_cast_type(const bool enable_decimal_int,
const ObExprResType param_type2,
const ObCastMode cast_mode,
ObExprResType &dst_type) const;
int get_explicit_cast_cm(const ObExprResType &src_type,
const ObExprResType &dst_type,
const ObSQLSessionInfo &session,

View File

@ -37,6 +37,7 @@
#include "sql/engine/expr/ob_expr_json_schema_valid.h"
#include "sql/engine/expr/ob_expr_json_schema_validation_report.h"
#include "sql/engine/expr/ob_expr_json_utils.h"
#include "sql/engine/expr/ob_expr_get_path.h"
namespace oceanbase
{
@ -46,18 +47,18 @@ namespace sql
#define REG_EXTRA_INFO(type, ExtraInfoClass) \
do { \
static_assert(type > T_INVALID && type < T_MAX_OP, "invalid expr type for extra info"); \
static_assert(is_valid_item_type(type), "invalid expr type for extra info"); \
ALLOC_FUNS_[type] = ObExprExtraInfoFactory::alloc<ExtraInfoClass>; \
} while(0)
ObExprExtraInfoFactory::AllocExtraInfoFunc ObExprExtraInfoFactory::ALLOC_FUNS_[T_MAX_OP] = { };
ObExprExtraInfoFactory::AllocExtraInfoFunc ObExprExtraInfoFactory::ALLOC_FUNS_[ObExprExtraInfoFactory::MAX_ITEM_ID] = { };
int ObExprExtraInfoFactory::alloc(common::ObIAllocator &alloc,
const ObExprOperatorType &type,
ObIExprExtraInfo *&extra_info)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!(type > T_INVALID && type < T_MAX_OP))) {
if (OB_UNLIKELY(!is_valid_item_type(type))) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", K(ret), K(type));
} else if (OB_ISNULL(ALLOC_FUNS_[type])) {
@ -114,6 +115,7 @@ void ObExprExtraInfoFactory::register_expr_extra_infos()
REG_EXTRA_INFO(T_FUN_SYS_JSON_SCHEMA_VALIDATION_REPORT, ObExprJsonSchemaValidInfo);
REG_EXTRA_INFO(T_FUN_SYS_JSON_VALUE, ObExprJsonQueryParamInfo);
REG_EXTRA_INFO(T_FUN_SYS_JSON_QUERY, ObExprJsonQueryParamInfo);
REG_EXTRA_INFO(T_PSEUDO_EXTERNAL_FILE_COL, ObDataAccessPathExtraInfo);
}
} // end namespace sql

View File

@ -26,6 +26,7 @@ struct ObIExprExtraInfo;
struct ObExprExtraInfoFactory
{
public:
static constexpr int64_t MAX_ITEM_ID = T_DEFAULT;
typedef int (*AllocExtraInfoFunc) (common::ObIAllocator &alloc, ObIExprExtraInfo *&extra_info,
const ObExprOperatorType type);
// allocate extra info
@ -35,9 +36,13 @@ public:
static void register_expr_extra_infos();
inline static constexpr bool is_valid_item_type(const ObExprOperatorType &type) {
return (type > T_INVALID && type < MAX_ITEM_ID);
}
inline static bool is_registered(const ObExprOperatorType &type)
{
return type > T_INVALID && type < T_MAX_OP
return is_valid_item_type(type)
&& NULL != ALLOC_FUNS_[type];
}
@ -47,7 +52,7 @@ private:
const ObExprOperatorType type);
private:
static AllocExtraInfoFunc ALLOC_FUNS_[T_MAX_OP];
static AllocExtraInfoFunc ALLOC_FUNS_[MAX_ITEM_ID];
};
template <typename T>

View File

@ -0,0 +1,45 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/expr/ob_expr_get_path.h"
#include "lib/string/ob_string.h"
namespace oceanbase
{
using namespace common;
namespace sql
{
int ObDataAccessPathExtraInfo::deep_copy(common::ObIAllocator &allocator,
const ObExprOperatorType type,
ObIExprExtraInfo *&copied_info) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObExprExtraInfoFactory::alloc(allocator, type, copied_info))) {
LOG_WARN("Failed to allocate memory for ObExprOracleLRpadInfo", K(ret));
} else if (OB_ISNULL(copied_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("extra_info should not be nullptr", K(ret));
} else {
ObDataAccessPathExtraInfo *other = static_cast<ObDataAccessPathExtraInfo *>(copied_info);
if (OB_FAIL(ob_write_string(allocator, data_access_path_, other->data_access_path_))) {
LOG_WARN("fail to write string", K(ret));
}
}
return ret;
}
OB_SERIALIZE_MEMBER(ObDataAccessPathExtraInfo, data_access_path_);
}
}

View File

@ -0,0 +1,67 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_EXPR_GET_PATH_H
#define OB_EXPR_GET_PATH_H
#include "sql/engine/expr/ob_expr_operator.h"
namespace oceanbase
{
namespace sql
{
struct ObDataAccessPathExtraInfo : public ObIExprExtraInfo
{
OB_UNIS_VERSION(1);
public:
ObDataAccessPathExtraInfo(common::ObIAllocator &alloc, ObExprOperatorType type)
: ObIExprExtraInfo(alloc, type)
{}
virtual ~ObDataAccessPathExtraInfo() {}
virtual int deep_copy(common::ObIAllocator &allocator,
const ObExprOperatorType type,
ObIExprExtraInfo *&copied_info) const override;
TO_STRING_KV(K(type_), K(data_access_path_));
ObString data_access_path_;
};
class ObExprGetPath: public ObFuncExprOperator
{
public:
explicit ObExprGetPath(common::ObIAllocator &alloc)
: ObFuncExprOperator(alloc, T_FUN_SYS_GET_PATH, N_GET_PATH, 2, VALID_FOR_GENERATED_COL, NOT_ROW_DIMENSION) {}
virtual ~ObExprGetPath() {}
virtual int calc_result_type2(ObExprResType &type,
ObExprResType &type1,
ObExprResType &type2,
ObExprTypeCtx &type_ctx) const
{
UNUSED(type1);
UNUSED(type2);
UNUSED(type_ctx);
type.set_varchar();
type.set_collation_type(CS_TYPE_BINARY);
return common::OB_SUCCESS;
}
virtual int cg_expr(ObExprCGCtx &op_cg_ctx,
const ObRawExpr &raw_expr,
ObExpr &rt_expr) const override {
return common::OB_NOT_SUPPORTED;
}
private:
DISALLOW_COPY_AND_ASSIGN(ObExprGetPath);
};
}
}
#endif // OB_EXPR_GET_PATH_H

View File

@ -22,6 +22,7 @@
namespace oceanbase
{
using namespace common;
using namespace share;
namespace sql
{

View File

@ -453,6 +453,7 @@
#include "sql/engine/expr/ob_expr_lock_func.h"
#include "sql/engine/expr/ob_expr_topn_filter.h"
#include "sql/engine/expr/ob_expr_get_path.h"
using namespace oceanbase::common;
namespace oceanbase
@ -1114,6 +1115,7 @@ void ObExprOperatorFactory::register_expr_operators()
REG_OP(ObExprRbAndnotNull2empty);
REG_OP(ObExprRbToString);
REG_OP(ObExprRbFromString);
REG_OP(ObExprGetPath);
}();
// 注册oracle系统函数
REG_OP_ORCL(ObExprSysConnectByPath);
@ -1440,6 +1442,7 @@ void ObExprOperatorFactory::register_expr_operators()
REG_OP_ORCL(ObExprInnerTableSequenceGetter);
// REG_OP_ORCL(ObExprTopNFilter);
REG_OP_ORCL(ObExprSdoRelate);
REG_OP_ORCL(ObExprGetPath);
}
bool ObExprOperatorFactory::is_expr_op_type_valid(ObExprOperatorType type)

View File

@ -23,6 +23,7 @@
namespace oceanbase
{
using namespace common;
using namespace share;
namespace sql
{

View File

@ -206,10 +206,11 @@ ObExpr::EvalVectorFunc VectorCasterUtil::get_vector_cast(const VecValueTypeClass
const ObCastMode cast_mode)
{
ObExpr::EvalVectorFunc ret_func = nullptr;
ObExpr::EvalFunc temp_func = nullptr;
if (is_eval_arg_cast) {
ret_func = CM_IS_EXPLICIT_CAST(cast_mode) ? VECTOR_EVAL_ARG_CAST_FUNCS[in_tc][out_tc][EXPLICIT_CAST_FLAG] :
VECTOR_EVAL_ARG_CAST_FUNCS[in_tc][out_tc][IMPLICIT_CAST_FLAG];
} else if (row_cast_fn == cast_not_expected
} else if (row_cast_fn == (temp_func = cast_not_expected)
|| row_cast_fn == cast_not_support
|| row_cast_fn == cast_inconsistent_types
|| row_cast_fn == cast_inconsistent_types_json

View File

@ -21,6 +21,7 @@
#include "share/external_table/ob_external_table_utils.h"
#include "share/ob_device_manager.h"
#include "lib/utility/ob_macro_utils.h"
#include "sql/engine/table/ob_parquet_table_row_iter.h"
namespace oceanbase
{
@ -49,7 +50,7 @@ void ObExternalDataAccessDriver::close()
}
}
bool ObExternalDataAccessDriver::is_opened()
bool ObExternalDataAccessDriver::is_opened() const
{
return fd_.is_valid();
}
@ -103,13 +104,13 @@ int ObExternalDataAccessDriver::get_file_size(const ObString &url, int64_t &file
return ret;
}
int ObExternalDataAccessDriver::open(const ObString &url)
int ObExternalDataAccessDriver::open(const char *url)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(device_handle_)) {
ret = OB_NOT_INIT;
} else {
ret = device_handle_->open(url.ptr(), -1, 0, fd_, &iod_opts_);
ret = device_handle_->open(url, -1, 0, fd_, &iod_opts_);
}
return ret;
}
@ -347,6 +348,13 @@ int ObExternalTableAccessService::table_scan(
LOG_WARN("alloc memory failed", K(ret));
}
break;
case ObExternalFileFormat::PARQUET_FORMAT:
if (OB_ISNULL(row_iter = OB_NEWx(ObParquetTableRowIterator, (scan_param.allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
}
break;
default:
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected format", K(ret), "format", param.external_file_format_.format_type_);
@ -375,6 +383,7 @@ int ObExternalTableAccessService::table_rescan(ObVTableScanParam &param, ObNewRo
} else {
switch (param.external_file_format_.format_type_) {
case ObExternalFileFormat::CSV_FORMAT:
case ObExternalFileFormat::PARQUET_FORMAT:
result->reset();
break;
default:
@ -407,6 +416,12 @@ int ObExternalTableAccessService::revert_scan_iter(ObNewRowIterator *iter)
return ret;
}
int ObExternalTableRowIterator::init(const ObTableScanParam *scan_param)
{
scan_param_ = scan_param;
return init_exprs(scan_param);
}
ObCSVTableRowIterator::~ObCSVTableRowIterator()
{
release_buf();
@ -478,7 +493,17 @@ int ObCSVTableRowIterator::expand_buf()
return ret;
}
int ObCSVTableRowIterator::init_exprs(const storage::ObTableScanParam *scan_param)
int ObExternalTableRowIterator::gen_ip_port(ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
char buf[MAX_IP_PORT_SQL_LENGTH];
int32_t len = 0;
OZ (GCONF.self_addr_.addr_to_buffer(buf, MAX_IP_PORT_SQL_LENGTH, len));
OZ (ob_write_string(allocator, ObString(len, buf), ip_port_));
return ret;
}
int ObExternalTableRowIterator::init_exprs(const storage::ObTableScanParam *scan_param)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(scan_param)) {
@ -523,7 +548,6 @@ int ObCSVTableRowIterator::init(const storage::ObTableScanParam *scan_param)
arena_alloc_.set_attr(lib::ObMemAttr(scan_param->tenant_id_, "CSVRowIter"));
OZ (ObExternalTableRowIterator::init(scan_param));
OZ (parser_.init(scan_param->external_file_format_.csv_format_));
OZ (init_exprs(scan_param));
OZ (data_access_driver_.init(scan_param_->external_file_location_, scan_param->external_file_access_info_));
OZ (expand_buf());
@ -563,37 +587,64 @@ int ObCSVTableRowIterator::get_next_file_and_line_number(const int64_t task_idx,
return ret;
}
int ObCSVTableRowIterator::update_file_partition_list_value(const int64_t part_id)
int ObExternalTableRowIterator::fill_file_partition_expr(ObExpr *expr, ObNewRow &value, const int64_t row_count)
{
int ret = OB_SUCCESS;
if (part_id != state_.part_id_) {
state_.part_id_ = part_id;
share::schema::ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = NULL;
const ObPartition *partition = NULL;
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
scan_param_->tenant_id_,
schema_guard))) {
LOG_WARN("get_schema_guard failed", K(ret));
} else if (OB_FAIL(schema_guard.get_table_schema(scan_param_->tenant_id_, scan_param_->index_id_, table_schema))) {
LOG_WARN("get table schema failed", K(ret));
} else if (table_schema->is_partitioned_table() && table_schema->is_user_specified_partition_for_external_table()) {
if (OB_FAIL(table_schema->get_partition_by_part_id(part_id, CHECK_PARTITION_MODE_NORMAL, partition))) {
LOG_WARN("get partition failed", K(ret), K(part_id));
} else if (OB_ISNULL(partition) || OB_UNLIKELY(partition->get_list_row_values().count() != 1)
|| partition->get_list_row_values().at(0).get_count() != table_schema->get_partition_key_column_num()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("partition is invalid", K(ret), K(part_id));
} else {
int64_t pos = 0;
int64_t size = partition->get_list_row_values().at(0).get_deep_copy_size();
char *buf = (char *)arena_alloc_.alloc(size);
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate mem failed", K(ret));
}
OZ (state_.part_list_val_.deep_copy(partition->get_list_row_values().at(0), buf, size, pos));
ObEvalCtx &eval_ctx = scan_param_->op_->get_eval_ctx();
ObDatum *datums = expr->locate_batch_datums(eval_ctx);
int64_t loc_idx = expr->extra_ - 1;
if (OB_UNLIKELY(loc_idx < 0 || loc_idx >= value.get_count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("loc idx is out of range", K(loc_idx), K(value), K(ret));
} else {
if (value.get_cell(loc_idx).is_null()) {
for (int j = 0; OB_SUCC(ret) && j < row_count; j++) {
datums[j].set_null();
}
} else {
for (int j = 0; OB_SUCC(ret) && j < row_count; j++) {
CK (OB_NOT_NULL(datums[j].ptr_));
OZ (datums[j].from_obj(value.get_cell(loc_idx)));
}
}
}
return ret;
}
int ObExternalTableRowIterator::calc_file_partition_list_value(const int64_t part_id, ObIAllocator &allocator, ObNewRow &value)
{
int ret = OB_SUCCESS;
share::schema::ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = NULL;
const ObPartition *partition = NULL;
if (OB_ISNULL(GCTX.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error");
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
scan_param_->tenant_id_,
schema_guard))) {
LOG_WARN("get_schema_guard failed", K(ret));
} else if (OB_FAIL(schema_guard.get_table_schema(scan_param_->tenant_id_, scan_param_->index_id_, table_schema))) {
LOG_WARN("get table schema failed", K(ret));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("table not exist", K(scan_param_->index_id_), K(scan_param_->tenant_id_));
} else if (table_schema->is_partitioned_table() && table_schema->is_user_specified_partition_for_external_table()) {
if (OB_FAIL(table_schema->get_partition_by_part_id(part_id, CHECK_PARTITION_MODE_NORMAL, partition))) {
LOG_WARN("get partition failed", K(ret), K(part_id));
} else if (OB_ISNULL(partition) || OB_UNLIKELY(partition->get_list_row_values().count() != 1)
|| partition->get_list_row_values().at(0).get_count() != table_schema->get_partition_key_column_num()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("partition is invalid", K(ret), K(part_id));
} else {
int64_t pos = 0;
int64_t size = partition->get_list_row_values().at(0).get_deep_copy_size();
char *buf = (char *)allocator.alloc(size);
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate mem failed", K(ret));
}
OZ (value.deep_copy(partition->get_list_row_values().at(0), buf, size, pos));
}
}
return ret;
@ -620,7 +671,10 @@ int ObCSVTableRowIterator::open_next_file()
} else if (part_id == 0) {
//empty file do not belong to any partitions
} else {
OZ (update_file_partition_list_value(part_id));
if (part_id != state_.part_id_) {
state_.part_id_ = part_id;
OZ (calc_file_partition_list_value(part_id, arena_alloc_, state_.part_list_val_));
}
}
if (OB_SUCC(ret)) {
if (start_line == MIN_EXTERNAL_TABLE_LINE_NUMBER && end_line == INT64_MAX) {
@ -655,7 +709,7 @@ int ObCSVTableRowIterator::open_next_file()
}
LOG_DEBUG("try next file", K(ret), K(url_), K(file_url), K(state_));
} while (OB_SUCC(ret) && 0 >= state_.file_size_); //skip empty file
OZ (data_access_driver_.open(url_.string()), url_);
OZ (data_access_driver_.open(url_.ptr()), url_);
LOG_DEBUG("open external file", K(ret), K(url_), K(state_.file_size_), K(location));
@ -987,7 +1041,7 @@ int ObCSVTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
OZ (column_convert_expr->eval_batch(eval_ctx, *bit_vector_cache_, returned_row_cnt));
if (OB_SUCC(ret)) {
MEMCPY(column_expr->locate_batch_datums(eval_ctx),
column_convert_expr->locate_batch_datums(eval_ctx), sizeof(ObDatum) * returned_row_cnt);
column_convert_expr->locate_batch_datums(eval_ctx), sizeof(ObDatum) * returned_row_cnt);
column_expr->set_evaluated_flag(eval_ctx);
}
}
@ -1006,6 +1060,5 @@ void ObCSVTableRowIterator::reset()
}
}

View File

@ -22,7 +22,6 @@
#include "common/storage/ob_io_device.h"
#include "share/backup/ob_backup_struct.h"
namespace oceanbase
{
namespace common
@ -38,8 +37,8 @@ public:
ObExternalDataAccessDriver() : storage_type_(common::OB_STORAGE_MAX_TYPE), device_handle_(nullptr) {}
~ObExternalDataAccessDriver();
int init(const common::ObString &location, const ObString &access_info);
int open(const common::ObString &url);
bool is_opened();
int open(const char *url);
bool is_opened() const;
int get_file_size(const common::ObString &url, int64_t &file_size);
int get_file_sizes(const ObString &location, const ObIArray<ObString> &urls, ObIArray<int64_t> &file_sizes);
@ -66,13 +65,24 @@ private:
class ObExternalTableRowIterator : public common::ObNewRowIterator {
public:
ObExternalTableRowIterator() : scan_param_(nullptr) {}
virtual int init(const storage::ObTableScanParam *scan_param) {
scan_param_ = scan_param;
return common::OB_SUCCESS;
}
ObExternalTableRowIterator() :
scan_param_(nullptr), line_number_expr_(NULL), file_id_expr_(NULL), file_name_expr_(NULL)
{}
virtual int init(const storage::ObTableScanParam *scan_param);
protected:
int init_exprs(const storage::ObTableScanParam *scan_param);
int gen_ip_port(common::ObIAllocator &allocator);
int calc_file_partition_list_value(const int64_t part_id, common::ObIAllocator &allocator, common::ObNewRow &value);
int fill_file_partition_expr(ObExpr *expr, common::ObNewRow &value, const int64_t row_count);
protected:
const storage::ObTableScanParam *scan_param_;
//external table column exprs
common::ObSEArray<ObExpr*, 16> column_exprs_;
//hidden columns
ObExpr *line_number_expr_;
ObExpr *file_id_expr_;
ObExpr *file_name_expr_;
common::ObString ip_port_;
};
class ObExternalTableAccessService : public common::ObITabletScan
@ -144,13 +154,11 @@ public:
K(cur_file_name_), K(cur_file_id_), K(cur_line_number_), K(line_count_limit_), K_(part_id), K_(ip_port_len), K_(file_with_url));
};
ObCSVTableRowIterator() : bit_vector_cache_(NULL), line_number_expr_(NULL), file_id_expr_(NULL),
file_name_expr_(NULL) {}
ObCSVTableRowIterator() : bit_vector_cache_(NULL) {}
virtual ~ObCSVTableRowIterator();
virtual int init(const storage::ObTableScanParam *scan_param) override;
int get_next_row() override;
int get_next_rows(int64_t &count, int64_t capacity) override;
int update_file_partition_list_value(const int64_t part_id);
virtual int get_next_row(ObNewRow *&row) override {
UNUSED(row);
@ -172,7 +180,6 @@ private:
int skip_lines();
void release_buf();
void dump_error_log(common::ObIArray<ObCSVGeneralParser::LineErrRec> &error_msgs);
int init_exprs(const storage::ObTableScanParam *scan_param);
private:
ObBitVector *bit_vector_cache_;
StateValues state_;
@ -181,15 +188,11 @@ private:
ObCSVGeneralParser parser_;
ObExternalDataAccessDriver data_access_driver_;
ObSqlString url_;
ObSEArray<ObExpr*, 16> column_exprs_;
ObExpr *line_number_expr_;
ObExpr *file_id_expr_;
ObExpr *file_name_expr_;
};
}
}
#endif // OB_EXTERNAL_TABLE_ACCESS_SERVICE_H_

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,236 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_PARQUET_TABLE_ROW_ITER_H
#define OB_PARQUET_TABLE_ROW_ITER_H
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include "share/ob_i_tablet_scan.h"
#include "lib/file/ob_file.h"
#include "common/row/ob_row_iterator.h"
#include "storage/access/ob_dml_param.h"
#include "common/storage/ob_io_device.h"
#include "share/backup/ob_backup_struct.h"
#include "sql/engine/table/ob_external_table_access_service.h"
namespace oceanbase {
namespace sql {
class ObArrowMemPool : public ::arrow::MemoryPool
{
public:
ObArrowMemPool() : total_alloc_size_(0) {}
void init(uint64_t tenant_id);
virtual arrow::Status Allocate(int64_t size, uint8_t** out) override;
virtual arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
virtual void Free(uint8_t* buffer, int64_t size) override;
virtual void ReleaseUnused() override;
virtual int64_t bytes_allocated() const override;
virtual int64_t max_memory() const override { return -1; }
virtual std::string backend_name() const override { return "Arrow"; }
private:
common::ObArenaAllocator alloc_;
common::ObMemAttr mem_attr_;
arrow::internal::MemoryPoolStats stats_;
int64_t total_alloc_size_;
};
class ObArrowFile : public arrow::io::RandomAccessFile {
public:
ObArrowFile(ObExternalDataAccessDriver &file_reader, const char*file_name, arrow::MemoryPool *pool)
: file_reader_(file_reader), file_name_(file_name), pool_(pool)
{}
~ObArrowFile() override {
file_reader_.close();
}
int open();
virtual arrow::Status Close() override;
virtual bool closed() const override;
virtual arrow::Result<int64_t> Read(int64_t nbytes, void* out) override;
virtual arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
virtual arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
virtual arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
virtual arrow::Status Seek(int64_t position) override;
virtual arrow::Result<int64_t> Tell() const override;
virtual arrow::Result<int64_t> GetSize() override;
private:
ObExternalDataAccessDriver &file_reader_;
const char* file_name_;
arrow::MemoryPool *pool_;
int64_t position_;
};
class ObParquetTableRowIterator : public ObExternalTableRowIterator {
public:
struct StateValues {
StateValues() :
file_idx_(0),
part_id_(0),
row_group_idx_(0),
cur_file_id_(0),
cur_row_group_idx_(0),
end_row_group_idx_(-1),
cur_row_group_read_row_count_(0),
cur_row_group_row_count_(0),
cur_line_number_(0) {}
void reuse() {
file_idx_ = 0;
part_id_ = 0;
row_group_idx_ = 0;
cur_file_id_ = 0;
cur_row_group_idx_ = 0;
end_row_group_idx_ = -1;
cur_row_group_read_row_count_ = 0;
cur_row_group_row_count_ = 0;
cur_line_number_ = 0;
cur_file_url_.reset();
part_list_val_.reset();
}
int64_t file_idx_;
int64_t part_id_;
int64_t row_group_idx_;
int64_t cur_file_id_;
int64_t cur_row_group_idx_;
int64_t end_row_group_idx_;
int64_t cur_row_group_read_row_count_;
int64_t cur_row_group_row_count_;
int64_t cur_line_number_;
ObString cur_file_url_;
ObNewRow part_list_val_;
};
public:
ObParquetTableRowIterator() :
read_props_(&arrow_alloc_),
file_column_exprs_(allocator_),
file_meta_column_exprs_(allocator_),
bit_vector_cache_(NULL) {}
virtual ~ObParquetTableRowIterator() {}
int init(const storage::ObTableScanParam *scan_param) override;
int get_next_row() override;
int get_next_rows(int64_t &count, int64_t capacity) override;
virtual int get_next_row(ObNewRow *&row) override {
UNUSED(row);
return common::OB_ERR_UNEXPECTED;
}
virtual void reset() override;
private:
// load vec data from parquet file to expr mem
struct DataLoader {
DataLoader(ObEvalCtx &eval_ctx,
ObExpr *file_col_expr,
parquet::ColumnReader *reader,
common::ObIArrayWrap<int16_t> &def_levels_buf,
common::ObIArrayWrap<int16_t> &rep_levels_buf,
const int64_t batch_size,
int64_t &row_count):
eval_ctx_(eval_ctx),
file_col_expr_(file_col_expr),
reader_(reader),
batch_size_(batch_size),
row_count_(row_count),
def_levels_buf_(def_levels_buf),
rep_levels_buf_(rep_levels_buf)
{}
typedef int (DataLoader::*LOAD_FUNC)();
static LOAD_FUNC select_load_function(const ObDatumMeta &datum_type,
const parquet::ColumnDescriptor *col_desc);
int16_t get_max_def_level();
int load_data_for_col(LOAD_FUNC &func);
int load_int64_to_int64_vec();
int load_int32_to_int64_vec();
int load_int32_to_int32_vec();
int load_string_col();
int load_fixed_string_col();
int load_decimal_any_col();
//[TODO EXTERNAL TABLE] float16
int load_date_col_to_datetime();
int load_time_millis_col();
int load_time_nanos_col();
int load_timestamp_millis_col();
int load_timestamp_micros_col();
int load_timestamp_nanos_col();
int load_timestamp_hive();
int load_float();
int load_double();
int to_numeric(const int64_t idx, const int64_t int_value);
int to_numeric(const int64_t idx, const char *str, const int32_t length);
int to_numeric_hive(const int64_t idx, const char *str, const int32_t length, char *buf, const int64_t data_len);
int64_t calc_tz_adjust_us();
bool check_char_len(const char *ptr, int32_t len);
static bool is_ob_type_store_utc(const ObDatumMeta &meta);
static bool is_parquet_store_utc(const parquet::LogicalType *logtype);
ObEvalCtx &eval_ctx_;
ObExpr *file_col_expr_;
parquet::ColumnReader *reader_;
const int64_t batch_size_;
int64_t &row_count_;
common::ObIArrayWrap<int16_t> &def_levels_buf_;
common::ObIArrayWrap<int16_t> &rep_levels_buf_;
};
private:
int next_file();
int next_row_group();
int calc_exprs_for_rowid(const int64_t read_count);
int calc_pseudo_exprs(const int64_t read_count);
private:
StateValues state_;
lib::ObMemAttr mem_attr_;
ObArenaAllocator allocator_;
ObArrowMemPool arrow_alloc_;
parquet::ReaderProperties read_props_;
ObExternalDataAccessDriver data_access_driver_;
std::unique_ptr<parquet::ParquetFileReader> file_reader_;
std::shared_ptr<parquet::FileMetaData> file_meta_;
ExprFixedArray file_column_exprs_; //column value from parquet file
ExprFixedArray file_meta_column_exprs_; //column value from file meta
common::ObArrayWrap<int> column_indexs_;
common::ObArrayWrap<std::shared_ptr<parquet::ColumnReader>> column_readers_;
common::ObArrayWrap<DataLoader::LOAD_FUNC> load_funcs_;
ObSqlString url_;
ObBitVector *bit_vector_cache_;
common::ObArrayWrap<int16_t> def_levels_buf_;
common::ObArrayWrap<int16_t> rep_levels_buf_;
common::ObArrayWrap<char *> file_url_ptrs_; //for file url expr
common::ObArrayWrap<ObLength> file_url_lens_; //for file url expr
};
}
}
#endif // OB_PARQUET_TABLE_ROW_ITER_H

View File

@ -4033,6 +4033,7 @@ int ObRawExprPrinter::print(ObPseudoColumnRawExpr *expr)
}
case T_PSEUDO_PARTITION_LIST_COL:
case T_PSEUDO_EXTERNAL_FILE_URL:
case T_PSEUDO_EXTERNAL_FILE_ROW:
case T_PSEUDO_EXTERNAL_FILE_COL: {
if (!expr->get_table_name().empty()) {
PRINT_IDENT(expr->get_table_name());

View File

@ -5468,10 +5468,9 @@ int ObAlterTableResolver::resolve_alter_table_column_definition(AlterColumnSchem
} else if (OB_FAIL(tmp_table_schema.assign(*table_schema_))) {
LOG_WARN("failed to assign a table schema", K(ret));
} else if (OB_FAIL(resolve_column_definition(column, node, stat,
is_modify_column_visibility, pk_name,
is_modify_column_visibility, pk_name, *table_schema_,
is_oracle_temp_table,
false,
false,
allow_has_default))) {
SQL_RESV_LOG(WARN, "resolve column definition failed", K(ret));
} else if (is_mysql_mode()){ // add column level constraint

View File

@ -598,6 +598,13 @@ int ObCreateTableResolver::resolve(const ParseNode &parse_tree)
// do nothing
}
if (OB_SUCC(ret) && is_external_table_) {
//before resolve table elements
if (OB_FAIL(resolve_external_table_format_early(create_table_node->children_[4]))) {
LOG_WARN("fail to resolve external file format", K(ret));
}
}
// 1、 resolve table_id first for check whether is inner_table
if (OB_SUCC(ret) && OB_FAIL(resolve_table_id_pre(create_table_node->children_[4]))) {
SQL_RESV_LOG(WARN, "resolve_table_id_pre failed", K(ret));
@ -1445,9 +1452,9 @@ int ObCreateTableResolver::resolve_table_elements(const ParseNode *node,
if (OB_FAIL(resolve_column_definition(column, element, stat,
is_modify_column_visibility,
pk_name,
table_schema,
is_oracle_temp_table_,
is_create_table_as,
table_schema.is_external_table()))) {
is_create_table_as))) {
SQL_RESV_LOG(WARN, "resolve column definition failed", K(ret));
} else if (!column.is_udt_related_column(lib::is_oracle_mode()) && // udt column will check after hidden column generated
OB_FAIL(check_default_value(column.get_cur_default_value(),
@ -3103,6 +3110,43 @@ int ObCreateTableResolver::resolve_index_name(
return ret;
}
int ObCreateTableResolver::resolve_external_table_format_early(const ParseNode *node)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(node)) {
if (T_TABLE_OPTION_LIST != node->type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid argument.", K(ret));
} else {
ParseNode *option_node = NULL;
int32_t num = node->num_child_;
for (int32_t i = 0; OB_SUCC(ret) && i < num; ++i) {
option_node = node->children_[i];
if (OB_NOT_NULL(option_node) && T_EXTERNAL_FILE_FORMAT == option_node->type_) {
ObExternalFileFormat format;
for (int32_t j = 0; OB_SUCC(ret) && j < option_node->num_child_; ++j) {
if (OB_NOT_NULL(option_node->children_[j])
&& T_EXTERNAL_FILE_FORMAT_TYPE == option_node->children_[j]->type_) {
if (OB_FAIL(resolve_file_format(option_node->children_[j], format))) {
LOG_WARN("fail to resolve file format", K(ret));
} else {
external_table_format_type_ = format.format_type_;
}
}
}
}
}
}
}
if (OB_SUCC(ret) && external_table_format_type_ >= ObExternalFileFormat::PARQUET_FORMAT) {
uint64_t data_version = 0;
CK (OB_NOT_NULL(session_info_));
OZ (GET_MIN_DATA_VERSION(session_info_->get_effective_tenant_id(), data_version));
OV (DATA_VERSION_4_3_2_0 <= data_version, OB_NOT_SUPPORTED, data_version);
}
return ret;
}
int ObCreateTableResolver::resolve_table_charset_info(const ParseNode *node) {
int ret = OB_SUCCESS;
if (NULL != node) {

View File

@ -90,6 +90,7 @@ private:
int set_temp_table_info(share::schema::ObTableSchema &table_schema, ParseNode *commit_option_node);
int resolve_table_charset_info(const ParseNode *node);
int resolve_external_table_format_early(const ParseNode *node);
//index
int add_sort_column(const obrpc::ObColumnSortItem &sort_column);
int generate_index_arg();

View File

@ -118,7 +118,8 @@ ObDDLResolver::ObDDLResolver(ObResolverParams &params)
have_generate_fts_arg_(false),
is_set_lob_inrow_threshold_(false),
lob_inrow_threshold_(OB_DEFAULT_LOB_INROW_THRESHOLD),
auto_increment_cache_size_(0)
auto_increment_cache_size_(0),
external_table_format_type_(ObExternalFileFormat::INVALID_FORMAT)
{
table_mode_.reset();
}
@ -2661,9 +2662,13 @@ int ObDDLResolver::resolve_file_format(const ParseNode *node, ObExternalFileForm
switch (node->type_) {
case T_EXTERNAL_FILE_FORMAT_TYPE: {
ObString string_v = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim_space_only();
if (0 == string_v.case_compare("CSV")) {
format.format_type_ = ObExternalFileFormat::CSV_FORMAT;
} else {
for (int i = 0; i < ObExternalFileFormat::MAX_FORMAT; i++) {
if (0 == string_v.case_compare(ObExternalFileFormat::FORMAT_TYPE_STR[i])) {
format.format_type_ = static_cast<ObExternalFileFormat::FormatType>(i);
break;
}
}
if (ObExternalFileFormat::INVALID_FORMAT == format.format_type_) {
ObSqlString err_msg;
err_msg.append_fmt("format '%.*s'", string_v.length(), string_v.ptr());
ret = OB_NOT_SUPPORTED;
@ -3021,12 +3026,13 @@ int ObDDLResolver::resolve_column_definition(ObColumnSchemaV2 &column,
ObColumnResolveStat &resolve_stat,
bool &is_modify_column_visibility,
common::ObString &pk_name,
const ObTableSchema &table_schema,
const bool is_oracle_temp_table,
const bool is_create_table_as,
const bool is_external_table,
const bool allow_has_default)
{
int ret = OB_SUCCESS;
bool is_external_table = table_schema.is_external_table();
bool is_modify_column = stmt::T_ALTER_TABLE == stmt_->get_stmt_type()
&& OB_DDL_MODIFY_COLUMN == (static_cast<AlterColumnSchema &>(column)).alter_type_;
ParseNode *column_definition_ref_node = NULL;
@ -3331,15 +3337,13 @@ int ObDDLResolver::resolve_column_definition(ObColumnSchemaV2 &column,
}
} else if (is_external_table) {
//mock generated column
uint64_t file_column_idx = column.get_column_id() - OB_APP_MIN_COLUMN_ID + 1;
ObSqlString temp_str;
ObExternalFileFormat format;
format.format_type_ = external_table_format_type_;
ObString mock_gen_column_str;
ObObj default_value;
if (OB_FAIL(temp_str.append_fmt("%s%lu", N_EXTERNAL_FILE_COLUMN_PREFIX, file_column_idx))) {
LOG_WARN("fail to append sql str", K(ret));
} else if (OB_FAIL(ob_write_string(*allocator_, temp_str.string(), mock_gen_column_str))) {
LOG_WARN("fail to write string", K(ret));
if (OB_FAIL(format.mock_gen_column_def(column, *allocator_, mock_gen_column_str))) {
LOG_WARN("fail to mock gen column def", K(ret));
} else {
ObObj default_value;
default_value.set_varchar(mock_gen_column_str);
default_value.set_collation_type(ObCharset::get_system_collation());
if (OB_FAIL(column.set_cur_default_value(default_value))) {

View File

@ -570,9 +570,9 @@ protected:
ObColumnResolveStat &reslove_stat,
bool &is_modify_column_visibility,
common::ObString &pk_name,
const ObTableSchema &table_schema,
const bool is_oracle_temp_table = false,
const bool is_create_table_as = false,
const bool is_external_table = false,
const bool allow_has_default = true);
int resolve_file_prefix(ObString &url, ObSqlString &prefix_str, common::ObStorageType &device_type);
int resolve_uk_name_from_column_attribute(
@ -1030,6 +1030,7 @@ protected:
bool is_set_lob_inrow_threshold_;
int64_t lob_inrow_threshold_;
int64_t auto_increment_cache_size_;
ObExternalFileFormat::FormatType external_table_format_type_;
private:
template <typename STMT>
DISALLOW_COPY_AND_ASSIGN(ObDDLResolver);

View File

@ -8143,6 +8143,133 @@ int ObDMLResolver::add_additional_function_according_to_type(const ColumnItem *c
return ret;
}
int search_parquet_expr(ObRawExpr *root, ObRawExpr *file_row_expr, ObRawExpr *&pattern_expr) {
int ret = OB_SUCCESS;
ObRawExpr *get_path_expr = NULL;
pattern_expr = NULL;
if (OB_ISNULL(root) || OB_ISNULL(file_row_expr)) {
ret = OB_ERR_UNEXPECTED;
} else if (T_FUN_SYS_CAST == root->get_expr_type()) {
if (root->get_param_count() <= 0 || OB_ISNULL(get_path_expr = root->get_param_expr(0))) {
ret = OB_ERR_UNEXPECTED;
} else {
if (T_FUN_SYS_GET_PATH == get_path_expr->get_expr_type()) {
if (get_path_expr->get_param_count() > 0 && get_path_expr->get_param_expr(0) == file_row_expr) {
pattern_expr = root;
}
}
}
}
for (int i = 0; OB_SUCC(ret) && NULL == pattern_expr && i < root->get_param_count(); i++) {
if (OB_FAIL(SMART_CALL(search_parquet_expr(root->get_param_expr(i), file_row_expr, pattern_expr)))) {
LOG_WARN("fail to search parquet column expr", K(ret));
}
}
return ret;
}
int ObDMLResolver::resolve_external_table_generated_column(
ObQualifiedName &col,
const TableItem &table_item,
const ObTableSchema *table_schema,
const ObColumnSchemaV2 *column_schema,
ObRawExpr *&real_ref_expr,
ObRawExpr *&ref_expr)
{
int ret = OB_SUCCESS;
uint64_t file_column_idx = UINT64_MAX;
if (OB_ISNULL(table_schema) || OB_ISNULL(column_schema) || OB_ISNULL(ref_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected arg", KP(table_schema), KP(column_schema), KP(ref_expr));
} else if (0 == col.col_name_.case_compare(N_EXTERNAL_FILE_URL)) {
if (OB_FAIL(ObResolverUtils::build_file_column_expr_for_file_url(
*params_.expr_factory_, *params_.session_info_,
table_item.table_id_, table_item.table_name_,
col.col_name_, real_ref_expr))) {
LOG_WARN("fail to build external table file column expr", K(ret));
}
} else if (col.col_name_.prefix_match_ci(N_PARTITION_LIST_COL)) {
if (OB_FAIL(ObResolverUtils::calc_file_column_idx(col.col_name_, file_column_idx))) {
LOG_WARN("fail to calc file column idx", K(ret));
} else if (nullptr == (real_ref_expr = ObResolverUtils::find_file_column_expr(
pseudo_external_file_col_exprs_, table_item.table_id_, file_column_idx, col.col_name_))) {
if (OB_FAIL(ObResolverUtils::build_file_column_expr_for_partition_list_col(
*params_.expr_factory_, *params_.session_info_,
table_item.table_id_, table_item.table_name_,
col.col_name_, file_column_idx, real_ref_expr,
column_schema))) {
LOG_WARN("fail to build external table file column expr", K(ret));
}
}
} else {
ObExternalFileFormat format;
if (OB_FAIL(format.load_from_string(table_schema->get_external_file_format(), *params_.allocator_))) {
LOG_WARN("load from string failed", K(ret));
} else if (format.format_type_ != ObResolverUtils::resolve_external_file_column_type(col.col_name_)) {
ret = OB_WRONG_COLUMN_NAME;
LOG_USER_ERROR(OB_WRONG_COLUMN_NAME, col.col_name_.length(), col.col_name_.ptr());
} else if (ObExternalFileFormat::CSV_FORMAT == format.format_type_) {
if (OB_FAIL(ObResolverUtils::calc_file_column_idx(col.col_name_, file_column_idx))) {
LOG_WARN("fail to calc file column idx", K(ret));
} else if (nullptr == (real_ref_expr = ObResolverUtils::find_file_column_expr(
pseudo_external_file_col_exprs_, table_item.table_id_, file_column_idx, col.col_name_))) {
if (OB_FAIL(ObResolverUtils::build_file_column_expr_for_csv(
*params_.expr_factory_, *params_.session_info_,
table_item.table_id_, table_item.table_name_,
col.col_name_, file_column_idx,
real_ref_expr, format))) {
LOG_WARN("fail to build external table file column expr", K(ret));
}
}
} else if (ObExternalFileFormat::PARQUET_FORMAT == format.format_type_) {
ObRawExpr *cast_expr = NULL;
ObRawExpr *get_path_expr = NULL;
ObRawExpr *cast_type_expr = NULL;
if (T_FUN_SYS_GET_PATH == ref_expr->get_expr_type()) {
// GET_PATH(N_EXTERNAL_FILE_ROW, 'xxx')
if (ref_expr->get_param_count() > 0 && ref_expr->get_param_expr(0) == col.ref_expr_) {
get_path_expr = ref_expr;
cast_type_expr = NULL; //using column type as result type
}
} else {
// search pattern: cast(GET_PATH(N_EXTERNAL_FILE_ROW, 'xxx') as xxx)
if (OB_FAIL(search_parquet_expr(ref_expr, col.ref_expr_, cast_expr))) {
LOG_WARN("fail to serach parquet path expr", K(ret));
} else if (OB_NOT_NULL(cast_expr)) {
if (cast_expr->get_param_count() != 2
|| OB_ISNULL(get_path_expr = cast_expr->get_param_expr(0))
|| OB_ISNULL(cast_type_expr = cast_expr->get_param_expr(1))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected cast expr", K(ret));
}
}
}
if (OB_SUCC(ret)) {
ObRawExpr *pattern_expr = OB_NOT_NULL(cast_expr) ? cast_expr : get_path_expr;
if (OB_ISNULL(pattern_expr)) {
ret = OB_ERR_UNSUPPORTED_ACTION_ON_GENERATED_COLUMN;
LOG_WARN("invalid generated column define for external table", K(ret));
} else if (OB_FAIL(ObResolverUtils::build_file_column_expr_for_parquet(
*params_.expr_factory_, *params_.session_info_,
table_item.table_id_, table_item.table_name_,
col.col_name_, get_path_expr,
cast_expr, column_schema, real_ref_expr))) {
LOG_WARN("fail to build file column expr", K(ret));
} else if (OB_FAIL(ObRawExprUtils::replace_ref_column(ref_expr, pattern_expr, real_ref_expr))) {
LOG_WARN("replace column reference expr failed", K(ret));
}
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(pseudo_external_file_col_exprs_.push_back(real_ref_expr))) {
LOG_WARN("fail to push back to array", K(ret));
}
}
LOG_TRACE("add external file column", KPC(real_ref_expr), K(col.col_name_));
return ret;
}
int ObDMLResolver::resolve_generated_column_expr(const ObString &expr_str,
const TableItem &table_item, const ObColumnSchemaV2 *column_schema,
const ObColumnRefRawExpr &column, ObRawExpr *&ref_expr,
@ -8265,28 +8392,11 @@ int ObDMLResolver::resolve_generated_column_expr(const ObString &expr_str,
}
}
} else if (table_schema->is_external_table()
&& ObResolverUtils::is_external_file_column_name(columns.at(i).col_name_)) {
uint64_t file_column_idx = UINT64_MAX;
if (OB_ISNULL(stmt)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else if (OB_FAIL(ObResolverUtils::calc_file_column_idx(columns.at(i).col_name_, file_column_idx))) {
LOG_WARN("fail to calc file column idx", K(ret));
} else if (nullptr == (real_ref_expr = ObResolverUtils::find_file_column_expr(
pseudo_external_file_col_exprs_, table_item.table_id_, file_column_idx, columns.at(i).col_name_))) {
ObExternalFileFormat format;
if (OB_FAIL(format.load_from_string(table_schema->get_external_file_format(), *params_.allocator_))) {
LOG_WARN("load from string failed", K(ret));
} else if (OB_FAIL(ObResolverUtils::build_file_column_expr(*params_.expr_factory_, *params_.session_info_,
table_item.table_id_, table_item.alias_name_,
columns.at(i).col_name_, file_column_idx, real_ref_expr,
format.csv_format_.cs_type_, column_schema))) {
LOG_WARN("fail to build external table file column expr", K(ret));
} else if (OB_FAIL(pseudo_external_file_col_exprs_.push_back(real_ref_expr))) {
LOG_WARN("fail to push back to array", K(ret));
}
&& ObResolverUtils::is_external_pseudo_column_name(columns.at(i).col_name_)) {
if (OB_FAIL(resolve_external_table_generated_column(columns.at(i), table_item, table_schema,
column_schema, real_ref_expr, ref_expr))) {
LOG_WARN("fail to resolve external table generated column", K(ret));
}
LOG_TRACE("add external file column", KPC(real_ref_expr), K(columns.at(i).col_name_), K(table_item));
} else {
if (OB_FAIL(resolve_basic_column_item(table_item, columns.at(i).col_name_,
include_hidden, col_item, stmt))) {
@ -13198,10 +13308,10 @@ int ObDMLResolver::resolve_pseudo_column(
} else if (NULL != (real_ref_expr = ObResolverUtils::find_file_column_expr(
pseudo_external_file_col_exprs_, table_item->table_id_, UINT64_MAX, q_name.col_name_))) {
LOG_TRACE("find file name pseudo column", K(*real_ref_expr));
} else if (OB_FAIL(ObResolverUtils::build_file_column_expr(*params_.expr_factory_, *params_.session_info_,
table_item->table_id_, table_item->alias_name_,
q_name.col_name_, UINT64_MAX, real_ref_expr,
CHARSET_UTF8MB4))) {
} else if (OB_FAIL(ObResolverUtils::build_file_column_expr_for_file_url(
*params_.expr_factory_, *params_.session_info_,
table_item->table_id_, table_item->alias_name_,
q_name.col_name_, real_ref_expr))) {
LOG_WARN("fail to build external table file column expr", K(ret));
} else if (OB_FAIL(pseudo_external_file_col_exprs_.push_back(real_ref_expr))) {
LOG_WARN("fail to push back to array", K(ret));

View File

@ -213,6 +213,13 @@ public:
int fill_same_column_to_using(JoinedTable* &joined_table);
int get_columns_from_table_item(const TableItem *table_item, common::ObIArray<common::ObString> &column_names);
int resolve_external_table_generated_column(
ObQualifiedName &col,
const TableItem &table_item,
const share::schema::ObTableSchema *table_schema,
const share::schema::ObColumnSchemaV2 *column_schema,
ObRawExpr *&real_ref_expr,
ObRawExpr *&ref_expr);
int resolve_using_columns(const ParseNode &using_node, common::ObIArray<common::ObString> &column_names);
int transfer_using_to_on_expr(JoinedTable *&joined_table);
int resolve_table_column_expr(const ObQualifiedName &q_name, ObRawExpr *&real_ref_expr);

View File

@ -6179,6 +6179,7 @@ int ObPseudoColumnRawExpr::assign(const ObRawExpr &other)
cte_cycle_default_value_ = tmp.cte_cycle_default_value_;
table_id_ = tmp.table_id_;
table_name_ = tmp.table_name_;
data_access_path_ = tmp.data_access_path_;
}
}
return ret;
@ -6199,7 +6200,8 @@ bool ObPseudoColumnRawExpr::inner_same_as(const ObRawExpr &expr,
{
UNUSED(check_context);
return type_ == expr.get_expr_type() &&
table_id_ == static_cast<const ObPseudoColumnRawExpr&>(expr).get_table_id();
table_id_ == static_cast<const ObPseudoColumnRawExpr&>(expr).get_table_id() &&
0 == data_access_path_.compare(static_cast<const ObPseudoColumnRawExpr&>(expr).get_data_access_path());
}
int ObPseudoColumnRawExpr::do_visit(ObRawExprVisitor &visitor)
@ -6259,6 +6261,7 @@ int ObPseudoColumnRawExpr::get_name_internal(char *buf, const int64_t buf_len, i
case T_PSEUDO_EXTERNAL_FILE_URL:
case T_PSEUDO_PARTITION_LIST_COL:
case T_PSEUDO_EXTERNAL_FILE_COL:
case T_PSEUDO_EXTERNAL_FILE_ROW:
if (!table_name_.empty() && OB_FAIL(BUF_PRINTF("%.*s.", table_name_.length(), table_name_.ptr()))) {
LOG_WARN("failed to print table name", K(ret));
} else if (OB_FAIL(databuff_print_obj(buf, buf_len, pos, expr_name_))) {

View File

@ -4741,18 +4741,22 @@ public:
uint64_t get_table_id() const { return table_id_; }
void set_table_name(const common::ObString &table_name) { table_name_ = table_name; }
const common::ObString & get_table_name() const { return table_name_; }
void set_data_access_path(const common::ObString &data_access_path) { data_access_path_ = data_access_path; }
const common::ObString & get_data_access_path() const { return data_access_path_; }
VIRTUAL_TO_STRING_KV(N_ITEM_TYPE, type_,
N_RESULT_TYPE, result_type_,
N_EXPR_INFO, info_,
N_REL_ID, rel_ids_,
N_TABLE_ID, table_id_,
N_TABLE_NAME, table_name_);
N_TABLE_NAME, table_name_,
K_(data_access_path));
private:
ObRawExpr *cte_cycle_value_;
ObRawExpr *cte_cycle_default_value_;
uint64_t table_id_;
common::ObString table_name_;
common::ObString data_access_path_; //for external table column
DISALLOW_COPY_AND_ASSIGN(ObPseudoColumnRawExpr);
};

View File

@ -2460,7 +2460,7 @@ int ObRawExprUtils::build_generated_column_expr(ObRawExprFactory &expr_factory,
&& true == need_check_simple_column
&& T_REF_COLUMN == expr->get_expr_type()
&& !(columns.count() == 1
&& ObResolverUtils::is_external_file_column_name(columns.at(0).col_name_))) {
&& ObResolverUtils::is_external_pseudo_column_name(columns.at(0).col_name_))) {
ret = OB_ERR_INVALID_COLUMN_EXPRESSION;
LOG_WARN("simple column is not allowed in Oracle mode", K(ret), K(*expr));
}

View File

@ -43,6 +43,7 @@
#include "sql/engine/expr/ob_expr_unistr.h"
#include "sql/resolver/dml/ob_inlist_resolver.h"
#include "lib/charset/ob_ctype.h"
#include "sql/engine/expr/ob_expr_cast.h"
namespace oceanbase
{
@ -4812,24 +4813,54 @@ int ObResolverUtils::resolve_external_table_column_def(ObRawExprFactory &expr_fa
int ret = OB_SUCCESS;
ObRawExpr *file_column_expr = nullptr;
uint64_t file_column_idx = UINT64_MAX;
if (!ObResolverUtils::is_external_file_column_name(q_name.col_name_)) {
if (!ObResolverUtils::is_external_pseudo_column_name(q_name.col_name_)) {
ret = OB_ERR_BAD_FIELD_ERROR;
ObString scope_name = "external file column";
LOG_USER_ERROR(OB_ERR_BAD_FIELD_ERROR, q_name.col_name_.length(), q_name.col_name_.ptr(),
scope_name.length(), scope_name.ptr());
} else if (OB_FAIL(ObResolverUtils::calc_file_column_idx(q_name.col_name_, file_column_idx))) {
LOG_WARN("fail to calc file column idx", K(ret));
} else if (nullptr == (file_column_expr = ObResolverUtils::find_file_column_expr(
real_exprs, OB_INVALID_ID, file_column_idx, q_name.col_name_))) {
ObString table_name;
if (OB_FAIL(ObResolverUtils::build_file_column_expr(expr_factory, session_info, OB_INVALID_ID,
table_name, q_name.col_name_,
file_column_idx, file_column_expr, CHARSET_UTF8MB4, gen_col_schema))) {
LOG_WARN("fail to build external table file column expr", K(ret));
} else if (OB_FAIL(real_exprs.push_back(file_column_expr))) {
LOG_WARN("fail to push back expr", K(ret));
} else {
if (0 == q_name.col_name_.case_compare(N_EXTERNAL_FILE_URL)) {
if (OB_FAIL(ObResolverUtils::build_file_column_expr_for_file_url(expr_factory, session_info,
OB_INVALID_ID, ObString(), q_name.col_name_, file_column_expr))) {
LOG_WARN("fail to build external table file column expr", K(ret));
}
} else if (q_name.col_name_.prefix_match_ci(N_PARTITION_LIST_COL)) {
if (OB_FAIL(ObResolverUtils::calc_file_column_idx(q_name.col_name_, file_column_idx))) {
LOG_WARN("fail to calc file column idx", K(ret));
} else if (nullptr == (file_column_expr = ObResolverUtils::find_file_column_expr(
real_exprs, OB_INVALID_ID, file_column_idx, q_name.col_name_))) {
if (OB_FAIL(ObResolverUtils::build_file_column_expr_for_partition_list_col(expr_factory,
session_info, OB_INVALID_ID, ObString(),
q_name.col_name_, file_column_idx, file_column_expr, gen_col_schema))) {
LOG_WARN("fail to build external table file column expr", K(ret));
}
}
} else if (ObExternalFileFormat::CSV_FORMAT == ObResolverUtils::resolve_external_file_column_type(q_name.col_name_)) {
if (OB_FAIL(ObResolverUtils::calc_file_column_idx(q_name.col_name_, file_column_idx))) {
LOG_WARN("fail to calc file column idx", K(ret));
} else if (nullptr == (file_column_expr = ObResolverUtils::find_file_column_expr(
real_exprs, OB_INVALID_ID, file_column_idx, q_name.col_name_))) {
ObExternalFileFormat temp_format;
temp_format.csv_format_.init_format(ObDataInFileStruct(), 0, CS_TYPE_UTF8MB4_BIN);
if (OB_FAIL(ObResolverUtils::build_file_column_expr_for_csv(expr_factory, session_info,
OB_INVALID_ID, ObString(), q_name.col_name_, file_column_idx, file_column_expr, temp_format))) {
LOG_WARN("fail to build external table file column expr", K(ret));
}
}
} else {
if (OB_FAIL(ObResolverUtils::build_file_row_expr_for_parquet(expr_factory, session_info,
OB_INVALID_ID, ObString(),
q_name.col_name_, file_column_expr))) {
LOG_WARN("fail to build file column expr", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(real_exprs.push_back(file_column_expr))) {
LOG_WARN("fail to push back expr", K(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObTransformUtils::replace_expr(q_name.ref_expr_, file_column_expr, expr))) {
LOG_WARN("fail replace expr", K(ret));
@ -4839,43 +4870,183 @@ int ObResolverUtils::resolve_external_table_column_def(ObRawExprFactory &expr_fa
return ret;
}
bool ObResolverUtils::is_external_file_column_name(const ObString &name)
bool ObResolverUtils::is_external_pseudo_column_name(const ObString &name)
{
return name.prefix_match_ci(N_EXTERNAL_FILE_COLUMN_PREFIX)
return is_external_file_column_name(name)
|| 0 == name.case_compare(N_EXTERNAL_FILE_URL)
|| name.prefix_match_ci(N_PARTITION_LIST_COL);
}
int ObResolverUtils::build_file_column_expr(ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const ObString &table_name,
const ObString &column_name,
int64_t column_idx,
ObRawExpr *&expr,
ObCharsetType cs_type,
const ObColumnSchemaV2 *generated_column)
bool ObResolverUtils::is_external_file_column_name(const ObString &name)
{
ObExternalFileFormat::FormatType type = resolve_external_file_column_type(name);
return (type > ObExternalFileFormat::INVALID_FORMAT && type < ObExternalFileFormat::MAX_FORMAT);
}
ObExternalFileFormat::FormatType ObResolverUtils::resolve_external_file_column_type(const ObString &name)
{
ObExternalFileFormat::FormatType type = ObExternalFileFormat::INVALID_FORMAT;
if (name.prefix_match_ci(N_EXTERNAL_FILE_COLUMN_PREFIX)) {
type = ObExternalFileFormat::CSV_FORMAT;
} else if (0 == name.case_compare(N_EXTERNAL_FILE_ROW)) {
type = ObExternalFileFormat::PARQUET_FORMAT;
}
return type;
}
int ObResolverUtils::build_file_column_expr_for_parquet(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const ObString &table_name,
const ObString &column_name,
ObRawExpr *get_path_expr,
ObRawExpr *cast_expr,
const ObColumnSchemaV2 *generated_column,
ObRawExpr *&expr)
{
int ret = OB_SUCCESS;
ObPseudoColumnRawExpr *file_column_expr = nullptr;
ObItemType type = T_INVALID;
uint64_t extra = UINT64_MAX;
ObRawExpr *path_expr = nullptr;
if (column_name.case_compare(N_EXTERNAL_FILE_URL) == 0) {
type = T_PSEUDO_EXTERNAL_FILE_URL;
extra = UINT64_MAX;
} else if (column_name.prefix_match_ci(N_PARTITION_LIST_COL)) {
type = T_PSEUDO_PARTITION_LIST_COL;
extra = column_idx;
} else if (column_name.prefix_match_ci(N_EXTERNAL_FILE_COLUMN_PREFIX)) {
type = T_PSEUDO_EXTERNAL_FILE_COL;
extra = column_idx;
} else {
if (OB_FAIL(expr_factory.create_raw_expr(T_PSEUDO_EXTERNAL_FILE_COL, file_column_expr))) {
LOG_WARN("create nextval failed", K(ret));
} else if (OB_ISNULL(file_column_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("not valid column type", K(column_name), K(ret));
LOG_WARN("expr is null", K(ret));
} else {
file_column_expr->set_expr_name(column_name);
file_column_expr->set_table_name(table_name);
file_column_expr->set_table_id(table_id);
file_column_expr->set_explicited_reference();
if (OB_ISNULL(get_path_expr) || OB_ISNULL(path_expr = get_path_expr->get_param_expr(1))) {
ret = OB_ERR_UNEXPECTED;
}
if (OB_SUCC(ret)) {
//get type
if (OB_NOT_NULL(cast_expr)) {
bool enable_decimalint = false;
ObExprResType dst_type;
ObConstRawExpr *const_cast_type_expr = static_cast<ObConstRawExpr *>(cast_expr->get_param_expr(1));
if (!const_cast_type_expr->get_value().is_int()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support non-const expr", K(ret), KPC(const_cast_type_expr));
} else if (OB_FAIL(const_cast_type_expr->formalize(&session_info))) {
LOG_WARN("fail to formalize expr", K(ret));
} else if (OB_FAIL(ObSQLUtils::check_enable_decimalint(&session_info, enable_decimalint))) {
LOG_WARN("fail to check_enable_decimalint", K(ret));
} else if (OB_FAIL(ObExprCast::get_cast_type(enable_decimalint,
const_cast_type_expr->get_result_type(),
cast_expr->get_extra(), dst_type))) {
LOG_WARN("get cast dest type failed", K(ret));
} else {
if (dst_type.is_string_or_lob_locator_type()) {
// string data stored in parquet file as UTF8 format
dst_type.set_collation_type(CS_TYPE_UTF8MB4_BIN);
}
file_column_expr->set_result_type(dst_type);
}
} else if (OB_NOT_NULL(generated_column)) {
ObColumnRefRawExpr *column_expr = nullptr;
if (OB_FAIL(ObRawExprUtils::build_column_expr(expr_factory, *generated_column, column_expr))) {
LOG_WARN("failed to build column expr", K(ret));
} else {
file_column_expr->set_accuracy(column_expr->get_accuracy());
file_column_expr->set_data_type(column_expr->get_data_type());
file_column_expr->set_collation_type(column_expr->get_collation_type());
file_column_expr->set_collation_level(column_expr->get_collation_level());
if (column_expr->get_result_type().is_string_or_lob_locator_type()
&& ObCharset::charset_type_by_coll(column_expr->get_collation_type()) != CHARSET_UTF8MB4) {
// string data stored in parquet file as UTF8 format
file_column_expr->set_collation_type(CS_TYPE_UTF8MB4_BIN);
}
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected arg", K(ret));
}
}
if (OB_SUCC(ret)) {
//get path
if (OB_FAIL(path_expr->formalize(&session_info))) {
LOG_WARN("fail to formalize expr", K(ret));
} else if (!path_expr->is_static_const_expr()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support non-const expr", K(ret), KPC(path_expr));
} else {
ObConstRawExpr *const_path_expr = static_cast<ObConstRawExpr *>(path_expr);
if (!const_path_expr->get_value().is_string_type()) {
ret = OB_NOT_SUPPORTED;
} else {
file_column_expr->set_data_access_path(const_path_expr->get_value().get_string());
}
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(expr_factory.create_raw_expr(type, file_column_expr))) {
if (OB_SUCC(ret)) {
if (OB_FAIL(file_column_expr->formalize(&session_info))) {
LOG_WARN("failed to extract info", K(ret));
} else {
expr = file_column_expr;
}
}
return ret;
}
int ObResolverUtils::build_file_row_expr_for_parquet(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const ObString &table_name,
const ObString &column_name,
ObRawExpr *&expr)
{
int ret = OB_SUCCESS;
ObPseudoColumnRawExpr *file_column_expr = nullptr;
if (OB_FAIL(expr_factory.create_raw_expr(T_PSEUDO_EXTERNAL_FILE_ROW, file_column_expr))) {
LOG_WARN("create nextval failed", K(ret));
} else if (OB_ISNULL(file_column_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr is null", K(ret));
} else {
file_column_expr->set_expr_name(column_name);
file_column_expr->set_table_name(table_name);
file_column_expr->set_table_id(table_id);
file_column_expr->set_explicited_reference();
file_column_expr->set_data_type(ObVarcharType);
file_column_expr->set_collation_type(CS_TYPE_BINARY);
}
if (OB_SUCC(ret)) {
if (OB_FAIL(file_column_expr->formalize(&session_info))) {
LOG_WARN("failed to extract info", K(ret));
} else {
expr = file_column_expr;
}
}
return ret;
}
int ObResolverUtils::build_file_column_expr_for_csv(ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const ObString &table_name,
const ObString &column_name,
int64_t column_idx,
ObRawExpr *&expr,
const ObExternalFileFormat &format)
{
int ret = OB_SUCCESS;
ObPseudoColumnRawExpr *file_column_expr = nullptr;
ObItemType type = T_PSEUDO_EXTERNAL_FILE_COL;
uint64_t extra = column_idx;
if (OB_FAIL(expr_factory.create_raw_expr(type, file_column_expr))) {
LOG_WARN("create nextval failed", K(ret));
} else if (OB_ISNULL(file_column_expr)) {
ret = OB_ERR_UNEXPECTED;
@ -4886,36 +5057,106 @@ int ObResolverUtils::build_file_column_expr(ObRawExprFactory &expr_factory,
file_column_expr->set_table_id(table_id);
file_column_expr->set_explicited_reference();
file_column_expr->set_extra(extra);
if (type == T_PSEUDO_PARTITION_LIST_COL) {
if (OB_ISNULL(generated_column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("generated column is null", K(ret));
} else {
const ObAccuracy &accuracy = generated_column->get_accuracy();
file_column_expr->set_data_type(generated_column->get_data_type());
file_column_expr->set_result_flag(ObRawExprUtils::calc_column_result_flag(*generated_column));
file_column_expr->set_accuracy(accuracy);
if (ob_is_string_type(generated_column->get_data_type())
|| ob_is_enumset_tc(generated_column->get_data_type())
|| ob_is_json_tc(generated_column->get_data_type())
|| ob_is_geometry_tc(generated_column->get_data_type())) {
file_column_expr->set_collation_type(generated_column->get_collation_type());
file_column_expr->set_collation_level(CS_LEVEL_IMPLICIT);
} else {
file_column_expr->set_collation_type(CS_TYPE_BINARY);
file_column_expr->set_collation_level(CS_LEVEL_NUMERIC);
}
}
file_column_expr->set_data_type(ObVarcharType);
file_column_expr->set_collation_type(ObCharset::get_default_collation(format.csv_format_.cs_type_));
file_column_expr->set_collation_level(CS_LEVEL_IMPLICIT);
file_column_expr->set_length(OB_MAX_VARCHAR_LENGTH);
if (lib::is_oracle_mode()) {
file_column_expr->set_length_semantics(LS_BYTE);
}
if (OB_FAIL(file_column_expr->formalize(&session_info))) {
LOG_WARN("failed to extract info", K(ret));
} else {
file_column_expr->set_data_type(ObVarcharType);
file_column_expr->set_collation_type(ObCharset::get_default_collation(cs_type));
file_column_expr->set_length(OB_MAX_VARCHAR_LENGTH);
if (lib::is_oracle_mode()) {
file_column_expr->set_length_semantics(LS_BYTE);
expr = file_column_expr;
}
}
return ret;
}
int ObResolverUtils::build_file_column_expr_for_partition_list_col(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const ObString &table_name,
const ObString &column_name,
int64_t column_idx,
ObRawExpr *&expr,
const ObColumnSchemaV2 *generated_column)
{
int ret = OB_SUCCESS;
ObPseudoColumnRawExpr *file_column_expr = nullptr;
ObColumnRefRawExpr *column_expr = nullptr;
ObItemType type = T_PSEUDO_PARTITION_LIST_COL;
uint64_t extra = column_idx;
if (OB_FAIL(expr_factory.create_raw_expr(type, file_column_expr))) {
LOG_WARN("create nextval failed", K(ret));
} else if (OB_ISNULL(file_column_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr is null", K(ret));
} else if (OB_ISNULL(generated_column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("gen column schema is null", K(ret));
} else {
file_column_expr->set_expr_name(column_name);
file_column_expr->set_table_name(table_name);
file_column_expr->set_table_id(table_id);
file_column_expr->set_explicited_reference();
file_column_expr->set_extra(extra);
if (OB_FAIL(ObRawExprUtils::build_column_expr(expr_factory, *generated_column, column_expr))) {
LOG_WARN("failed to build column expr", K(ret));
} else {
file_column_expr->set_accuracy(column_expr->get_accuracy());
file_column_expr->set_data_type(column_expr->get_data_type());
file_column_expr->set_collation_type(column_expr->get_collation_type());
file_column_expr->set_collation_level(column_expr->get_collation_level());
if (OB_FAIL(file_column_expr->formalize(&session_info))) {
LOG_WARN("failed to extract info", K(ret));
} else {
expr = file_column_expr;
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(file_column_expr->formalize(&session_info))) {
}
return ret;
}
int ObResolverUtils::build_file_column_expr_for_file_url(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const ObString &table_name,
const ObString &column_name,
ObRawExpr *&expr)
{
int ret = OB_SUCCESS;
ObPseudoColumnRawExpr *file_column_expr = nullptr;
ObItemType type = T_PSEUDO_EXTERNAL_FILE_URL;
uint64_t extra = UINT64_MAX;
if (OB_FAIL(expr_factory.create_raw_expr(type, file_column_expr))) {
LOG_WARN("create nextval failed", K(ret));
} else if (OB_ISNULL(file_column_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr is null", K(ret));
} else {
file_column_expr->set_expr_name(column_name);
file_column_expr->set_table_name(table_name);
file_column_expr->set_table_id(table_id);
file_column_expr->set_explicited_reference();
file_column_expr->set_extra(extra);
file_column_expr->set_data_type(ObVarcharType);
file_column_expr->set_collation_type(CS_TYPE_UTF8MB4_BIN);
file_column_expr->set_collation_level(CS_LEVEL_IMPLICIT);
file_column_expr->set_length(OB_MAX_VARCHAR_LENGTH);
if (lib::is_oracle_mode()) {
file_column_expr->set_length_semantics(LS_BYTE);
}
if (OB_FAIL(file_column_expr->formalize(&session_info))) {
LOG_WARN("failed to extract info", K(ret));
} else {
expr = file_column_expr;
@ -4963,7 +5204,7 @@ int ObResolverUtils::resolve_generated_column_expr(ObResolverParams &params,
} else if (lib::is_oracle_mode() && q_name.is_pl_udf()) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "using udf as generated column");
LOG_WARN("using udf as generated column is not supported", K(ret));
LOG_WARN("using udf as generated column is not supported", K(ret), K(q_name));
// OZ (ObRawExprUtils::resolve_gen_column_udf_expr(expr,
// const_cast<ObQualifiedName &>(q_name),
// *expr_factory,

View File

@ -774,15 +774,50 @@ public:
int64_t column_idx,
const ObString &expr_name);
static int calc_file_column_idx(const ObString &column_name, uint64_t &file_column_idx);
static int build_file_column_expr(ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const common::ObString &table_name,
const common::ObString &column_name,
int64_t column_idx,
ObRawExpr *&expr,
ObCharsetType cs_type,
const ObColumnSchemaV2 *generated_column = NULL);
static int build_file_column_expr_for_csv(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const common::ObString &table_name,
const common::ObString &column_name,
int64_t column_idx,
ObRawExpr *&expr,
const ObExternalFileFormat &format);
static int build_file_column_expr_for_partition_list_col(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const common::ObString &table_name,
const common::ObString &column_name,
int64_t column_idx,
ObRawExpr *&expr,
const ObColumnSchemaV2 *generated_column);
static int build_file_column_expr_for_file_url(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const common::ObString &table_name,
const common::ObString &column_name,
ObRawExpr *&expr);
static int build_file_row_expr_for_parquet(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const common::ObString &table_name,
const common::ObString &column_name,
ObRawExpr *&expr);
static int build_file_column_expr_for_parquet(
ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const uint64_t table_id,
const common::ObString &table_name,
const common::ObString &column_name,
ObRawExpr *get_path_expr,
ObRawExpr *cast_expr,
const ObColumnSchemaV2 *generated_column,
ObRawExpr *&expr);
//only used for DDL resolver, resolve a PSEUDO column expr for validation and printer not for execution
static int resolve_external_table_column_def(ObRawExprFactory &expr_factory,
const ObSQLSessionInfo &session_info,
const ObQualifiedName &q_name,
@ -790,6 +825,8 @@ public:
ObRawExpr *&expr,
const ObColumnSchemaV2 *gen_col_schema = NULL);
static bool is_external_file_column_name(const common::ObString &name);
static bool is_external_pseudo_column_name(const common::ObString &name);
static ObExternalFileFormat::FormatType resolve_external_file_column_type(const common::ObString &name);
static int resolve_file_format_string_value(const ParseNode *node,
const ObCharsetType &format_charset,

View File

@ -5315,7 +5315,7 @@ desc oceanbase.DBA_OB_EXTERNAL_TABLE_FILES;
Field Type Null Key Default Extra
TABLE_NAME varchar(256) NO
TABLE_SCHEMA varchar(128) NO
PARTITION_NAME varchar(2) NO
PARTITION_NAME varchar(64) NO
FILE_URL varbinary(16384) NO NULL
FILE_SIZE bigint(20) NO NULL
select /*+QUERY_TIMEOUT(60000000)*/ count(*) as cnt from (select * from oceanbase.DBA_OB_EXTERNAL_TABLE_FILES limit 1);
@ -5325,7 +5325,7 @@ desc oceanbase.ALL_OB_EXTERNAL_TABLE_FILES;
Field Type Null Key Default Extra
TABLE_NAME varchar(256) NO
TABLE_SCHEMA varchar(128) NO
PARTITION_NAME varchar(2) NO
PARTITION_NAME varchar(64) NO
FILE_URL varbinary(16384) NO NULL
FILE_SIZE bigint(20) NO NULL
select /*+QUERY_TIMEOUT(60000000)*/ count(*) as cnt from (select * from oceanbase.ALL_OB_EXTERNAL_TABLE_FILES limit 1);

View File

@ -7464,7 +7464,7 @@ desc oceanbase.DBA_OB_EXTERNAL_TABLE_FILES;
Field Type Null Key Default Extra
TABLE_NAME varchar(256) NO
TABLE_SCHEMA varchar(128) NO
PARTITION_NAME varchar(2) NO
PARTITION_NAME varchar(64) NO
FILE_URL varbinary(16384) NO NULL
FILE_SIZE bigint(20) NO NULL
select /*+QUERY_TIMEOUT(60000000)*/ count(*) as cnt from (select * from oceanbase.DBA_OB_EXTERNAL_TABLE_FILES limit 1);
@ -7474,7 +7474,7 @@ desc oceanbase.ALL_OB_EXTERNAL_TABLE_FILES;
Field Type Null Key Default Extra
TABLE_NAME varchar(256) NO
TABLE_SCHEMA varchar(128) NO
PARTITION_NAME varchar(2) NO
PARTITION_NAME varchar(64) NO
FILE_URL varbinary(16384) NO NULL
FILE_SIZE bigint(20) NO NULL
select /*+QUERY_TIMEOUT(60000000)*/ count(*) as cnt from (select * from oceanbase.ALL_OB_EXTERNAL_TABLE_FILES limit 1);
@ -7575,7 +7575,7 @@ Field Type Null Key Default Extra
TENANT_ID bigint(20) NO NULL
TABLE_NAME varchar(256) NO
TABLE_SCHEMA varchar(128) NO
PARTITION_NAME varchar(2) NO
PARTITION_NAME varchar(64) NO
FILE_URL varbinary(16384) NO NULL
FILE_SIZE bigint(20) NO NULL
select /*+QUERY_TIMEOUT(60000000)*/ count(*) as cnt from (select * from oceanbase.CDB_OB_EXTERNAL_TABLE_FILES limit 1);