/** * Copyright (c) 2023 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX SQL #include "gtest/gtest.h" #include #include #include #include #include #include #include #include "lib/allocator/page_arena.h" #include "lib/file/ob_file.h" #include "lib/file/file_directory_utils.h" #define USING_LOG_PREFIX SQL using namespace oceanbase::common; class TestParquet: public ::testing::Test { public: TestParquet(); virtual ~TestParquet(); virtual void SetUp(); virtual void TearDown(); }; TestParquet::TestParquet() { } TestParquet::~TestParquet() { } void TestParquet::SetUp() { } void TestParquet::TearDown() { } constexpr int NUM_ROWS_PER_ROW_GROUP = 500; const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet"; // #0 Build dummy data to pass around // To have some input data, we first create an Arrow Table that holds // some data. std::shared_ptr generate_table() { arrow::Int64Builder i64builder; PARQUET_THROW_NOT_OK(i64builder.AppendValues({1, 2, 3, 4, 5})); std::shared_ptr i64array; PARQUET_THROW_NOT_OK(i64builder.Finish(&i64array)); arrow::StringBuilder strbuilder; PARQUET_THROW_NOT_OK(strbuilder.Append("some")); PARQUET_THROW_NOT_OK(strbuilder.Append("string")); PARQUET_THROW_NOT_OK(strbuilder.Append("content")); PARQUET_THROW_NOT_OK(strbuilder.Append("in")); PARQUET_THROW_NOT_OK(strbuilder.Append("rows")); std::shared_ptr strarray; PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray)); std::shared_ptr schema = arrow::schema( {arrow::field("int", arrow::int64()), arrow::field("str", arrow::utf8())}); return arrow::Table::Make(schema, {i64array, strarray}); } // #1 Write out the data as a Parquet file void write_parquet_file(const arrow::Table& table) { std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW( outfile, arrow::io::FileOutputStream::Open("parquet-arrow-example.parquet")); // The last argument to the function call is the size of the RowGroup in // the parquet file. Normally you would choose this to be rather large but // for the example, we use a small value to have multiple RowGroups. PARQUET_THROW_NOT_OK( parquet::arrow::WriteTable(table, arrow::default_memory_pool(), outfile, 3)); } // #2: Fully read in the file void read_whole_file() { std::cout << "Reading parquet-arrow-example.parquet at once" << std::endl; std::shared_ptr infile; PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open("parquet-arrow-example.parquet", arrow::default_memory_pool())); std::unique_ptr reader; PARQUET_THROW_NOT_OK( parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); std::shared_ptr table; PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns() << " columns." << std::endl; } // #3: Read only a single RowGroup of the parquet file void read_single_rowgroup() { std::cout << "Reading first RowGroup of parquet-arrow-example.parquet" << std::endl; std::shared_ptr infile; PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open("parquet-arrow-example.parquet", arrow::default_memory_pool())); std::unique_ptr reader; PARQUET_THROW_NOT_OK( parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); std::shared_ptr table; PARQUET_THROW_NOT_OK(reader->RowGroup(0)->ReadTable(&table)); std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns() << " columns." << std::endl; } // #4: Read only a single column of the whole parquet file void read_single_column() { std::cout << "Reading first column of parquet-arrow-example.parquet" << std::endl; std::shared_ptr infile; PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open("parquet-arrow-example.parquet", arrow::default_memory_pool())); std::unique_ptr reader; PARQUET_THROW_NOT_OK( parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); std::shared_ptr array; PARQUET_THROW_NOT_OK(reader->ReadColumn(0, &array)); PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout)); std::cout << std::endl; } // #5: Read only a single column of a RowGroup (this is known as ColumnChunk) // from the Parquet file. void read_single_column_chunk() { std::cout << "Reading first ColumnChunk of the first RowGroup of " "parquet-arrow-example.parquet" << std::endl; std::shared_ptr infile; PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open("parquet-arrow-example.parquet", arrow::default_memory_pool())); std::unique_ptr reader; PARQUET_THROW_NOT_OK( parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); std::shared_ptr array; PARQUET_THROW_NOT_OK(reader->RowGroup(0)->Column(0)->Read(&array)); PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout)); std::cout << std::endl; } class ObParquetAllocator : public ::arrow::MemoryPool { public: /// Allocate a new memory region of at least size bytes. /// /// The allocated region shall be 64-byte aligned. virtual arrow::Status Allocate(int64_t size, uint8_t** out) override { arrow::Status ret = arrow::Status::OK(); void *buf = alloc_.alloc_aligned(size, 64); if (OB_ISNULL(buf)) { ret = arrow::Status::Invalid("allocate memory failed"); } else { *out = static_cast(buf); } std::cout << "Allocing : " << size << std::endl; return arrow::Status::OK(); } /// Resize an already allocated memory section. /// /// As by default most default allocators on a platform don't support aligned /// reallocation, this function can involve a copy of the underlying data. virtual arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { std::cout << "Reallocing : " << old_size << ',' << new_size << std::endl; return Allocate(new_size, ptr); } /// Free an allocated region. /// /// @param buffer Pointer to the start of the allocated memory region /// @param size Allocated size located at buffer. An allocator implementation /// may use this for tracking the amount of allocated bytes as well as for /// faster deallocation if supported by its backend. virtual void Free(uint8_t* buffer, int64_t size) { std::cout << "Freed : " << size << std::endl; alloc_.free(buffer); } /// Return unused memory to the OS /// /// Only applies to allocators that hold onto unused memory. This will be /// best effort, a memory pool may not implement this feature or may be /// unable to fulfill the request due to fragmentation. virtual void ReleaseUnused() { std::cout << "ReleaseUnused" << std::endl; } /// The number of bytes that were allocated and not yet free'd through /// this allocator. virtual int64_t bytes_allocated() const override { std::cout << "bytes_allocated()" << std::endl; return alloc_.total(); } /// Return peak memory allocation in this memory pool /// /// \return Maximum bytes allocated. If not known (or not implemented), /// returns -1 virtual int64_t max_memory() const override { return -1; } /// The name of the backend used by this MemoryPool (e.g. "system" or "jemalloc"). virtual std::string backend_name() const override { return "Parquet"; } private: ObArenaAllocator alloc_; arrow::internal::MemoryPoolStats stats_; }; class ObExternalFileReader : public arrow::io::RandomAccessFile { public: ObExternalFileReader(const char*file_name, arrow::MemoryPool *pool) { file_reader_.open(file_name, false); pool_ = pool; file_name_ = file_name; } ~ObExternalFileReader() override {} virtual arrow::Status Close() override; virtual bool closed() const override; virtual arrow::Result Read(int64_t nbytes, void* out) override; virtual arrow::Result> Read(int64_t nbytes) override; virtual arrow::Result ReadAt(int64_t position, int64_t nbytes, void* out) override; virtual arrow::Result> ReadAt(int64_t position, int64_t nbytes) override; virtual arrow::Status Seek(int64_t position) override; virtual arrow::Result Tell() const override; virtual arrow::Result GetSize() override; private: ObFileReader file_reader_; int64_t position_; arrow::MemoryPool *pool_; const char* file_name_; }; arrow::Status ObExternalFileReader::Seek(int64_t position) { std::cout<< "ObExternalFileReader::Seek" << std::endl; position_ = position; return arrow::Status::OK(); } arrow::Result ObExternalFileReader::Read(int64_t nbytes, void *out) { std::cout<< "ObExternalFileReader::Read(int64_t nbytes, void *out)" << std::endl; int64_t read_size = -1; file_reader_.pread(out, nbytes, position_, read_size); position_ += read_size; return read_size; } arrow::Result> ObExternalFileReader::Read(int64_t nbytes) { std::cout<< "ObExternalFileReader::Read(int64_t nbytes)" << std::endl; ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())); if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } return std::move(buffer); } arrow::Result ObExternalFileReader::ReadAt(int64_t position, int64_t nbytes, void* out) { std::cout<< "ObExternalFileReader::ReadAt(int64_t position, int64_t nbytes, void* out)" << std::endl; int64_t read_size = -1; file_reader_.pread(out, nbytes, position, read_size); position_ = position + read_size; return read_size; } arrow::Result> ObExternalFileReader::ReadAt(int64_t position, int64_t nbytes) { std::cout<< "ObExternalFileReader::ReadAt(int64_t position, int64_t nbytes)" << std::endl; ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, buffer->mutable_data())); if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); buffer->ZeroPadding(); } return std::move(buffer); } arrow::Result ObExternalFileReader::Tell() const { std::cout<< "ObExternalFileReader::Tell()" << std::endl; return position_; } arrow::Result ObExternalFileReader::GetSize() { std::cout<< "ObExternalFileReader::GetSize()" << std::endl; int64_t file_size = 0; FileDirectoryUtils::get_file_size(file_name_, file_size); return file_size; } arrow::Status ObExternalFileReader::Close() { std::cout<< "ObExternalFileReader::Close()" << std::endl; file_reader_.close(); return arrow::Status::OK(); } bool ObExternalFileReader::closed() const { std::cout<< "ObExternalFileReader::closed()" << std::endl; return !file_reader_.is_opened(); } void read_column_schema() { std::cout << "Reading column schema " "parquet-arrow-example.parquet" << std::endl; ObParquetAllocator alloc; parquet::ReaderProperties read_props(&alloc); std::cout<< "create parquet_file : " << std::endl; std::shared_ptr reader = std::make_shared("parquet-arrow-example.parquet", &alloc); std::cout<< "create file reader : " << std::endl; std::unique_ptr parquet_reader = parquet::ParquetFileReader::Open(reader, read_props); // Get the File MetaData std::shared_ptr file_metadata = parquet_reader->metadata(); int num_row_groups = file_metadata->num_row_groups(); int num_columns = file_metadata->num_columns(); std::cout<< "num_row_groups : " << num_row_groups << std::endl; std::cout<< "num_columns : " << num_columns << std::endl; for (int i = 0; i < num_columns; i++) { std::cout<<"Path="<schema()->Column(i)->path()->ToDotString()<RowGroup(i)->ColumnChunk(j)->type(); std::cout<<"ColumnType="< row_group_reader = parquet_reader->RowGroup(r); std::shared_ptr column_reader; column_reader = row_group_reader->Column(0); parquet::Int64Reader* int64_reader = static_cast(column_reader.get()); while (int64_reader->HasNext()) { std::cout << "before int64: " << std::endl; rows_read = int64_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); std::cout << "read int64: " << value << std::endl; std::cout << "read rows: " << rows_read << std::endl; std::cout << "read values_read: " << values_read << std::endl; } column_reader = row_group_reader->Column(1); parquet::ByteArrayReader* ba_reader = static_cast(column_reader.get()); while (ba_reader->HasNext()) { parquet::ByteArray value; std::cout << "before bytes: " << std::endl; rows_read = ba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); std::cout << "read bytes: " << std::string(pointer_cast(value.ptr), value.len) << std::endl; std::cout << "read rows: " << rows_read << std::endl; std::cout << "read values_read: " << values_read << std::endl; } } } using parquet::ConvertedType; using parquet::Repetition; using parquet::Type; using parquet::schema::GroupNode; using parquet::schema::PrimitiveNode; constexpr int FIXED_LENGTH = 10; constexpr int FIXED_LENGTH_DEC = 14; static std::shared_ptr SetupSchema() { parquet::schema::NodeVector fields; fields.push_back(PrimitiveNode::Make("int", Repetition::OPTIONAL, parquet::LogicalType::Int(64, true), Type::INT64)); fields.push_back(PrimitiveNode::Make("string", Repetition::OPTIONAL, parquet::LogicalType::String(), Type::BYTE_ARRAY)); fields.push_back(PrimitiveNode::Make("decimal32", Repetition::OPTIONAL, parquet::LogicalType::Decimal(6, 3), Type::INT32)); fields.push_back(PrimitiveNode::Make("decimal64", Repetition::OPTIONAL, parquet::LogicalType::Decimal(10, 0), Type::INT64)); fields.push_back(PrimitiveNode::Make("decimalbytearr", Repetition::OPTIONAL, parquet::LogicalType::Decimal(20, 3), Type::BYTE_ARRAY)); fields.push_back(PrimitiveNode::Make("date", Repetition::OPTIONAL, parquet::LogicalType::Date(), Type::INT32)); fields.push_back(PrimitiveNode::Make("timestamp", Repetition::OPTIONAL, parquet::LogicalType::Timestamp(true, parquet::LogicalType::TimeUnit::MICROS), Type::INT64)); return std::static_pointer_cast( GroupNode::Make("schema", Repetition::REQUIRED, fields)); } void gen_test_parquet() { /********************************************************************************** PARQUET WRITER EXAMPLE **********************************************************************************/ // parquet::REQUIRED fields do not need definition and repetition level values // parquet::OPTIONAL fields require only definition level values // parquet::REPEATED fields require both definition and repetition level values try { // Create a local file output stream instance. using FileClass = ::arrow::io::FileOutputStream; std::shared_ptr out_file; PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(PARQUET_FILENAME)); // Setup the parquet schema std::shared_ptr schema = SetupSchema(); // Add writer properties parquet::WriterProperties::Builder builder; builder.compression(parquet::Compression::SNAPPY); std::shared_ptr props = builder.build(); // Create a ParquetFileWriter instance std::shared_ptr file_writer = parquet::ParquetFileWriter::Open(out_file, schema, props); // Append a RowGroup with a specific number of rows. parquet::RowGroupWriter* rg_writer = NULL; /* // Write the Bool column parquet::BoolWriter* bool_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { bool value = ((i % 2) == 0) ? true : false; bool_writer->WriteBatch(1, nullptr, nullptr, &value); } // Write the Int32 column parquet::Int32Writer* int32_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { int32_t value = i; int32_writer->WriteBatch(1, nullptr, nullptr, &value); } // Write the Int64 column. Each row has repeats twice. parquet::Int64Writer* int64_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) { int64_t value = i * 1000 * 1000; value *= 1000 * 1000; int16_t definition_level = 1; int16_t repetition_level = 0; if ((i % 2) == 0) { repetition_level = 1; // start of a new record } int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value); } // Write the INT96 column. parquet::Int96Writer* int96_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { parquet::Int96 value; value.value[0] = i; value.value[1] = i + 1; value.value[2] = i + 2; int96_writer->WriteBatch(1, nullptr, nullptr, &value); } // Write the Float column parquet::FloatWriter* float_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { float value = static_cast(i) * 1.1f; float_writer->WriteBatch(1, nullptr, nullptr, &value); } // Write the Double column parquet::DoubleWriter* double_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { double value = i * 1.1111111; double_writer->WriteBatch(1, nullptr, nullptr, &value); } // Write the ByteArray column. Make every alternate values NULL parquet::ByteArrayWriter* ba_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { parquet::ByteArray value; char hello[FIXED_LENGTH] = "parquet"; hello[7] = static_cast(static_cast('0') + i / 100); hello[8] = static_cast(static_cast('0') + (i / 10) % 10); hello[9] = static_cast(static_cast('0') + i % 10); if (i % 2 == 0) { int16_t definition_level = 1; value.ptr = reinterpret_cast(&hello[0]); value.len = FIXED_LENGTH; ba_writer->WriteBatch(1, &definition_level, nullptr, &value); } else { int16_t definition_level = 0; ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); } } // Write the FixedLengthByteArray column parquet::FixedLenByteArrayWriter* flba_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { parquet::FixedLenByteArray value; char v = static_cast(i); char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; value.ptr = reinterpret_cast(&flba[0]); flba_writer->WriteBatch(1, nullptr, nullptr, &value); } */ #define HAS_NULL 1 for (int j = 0; j < 10; j++) { rg_writer = file_writer->AppendRowGroup(); parquet::Int64Writer* int64_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { if (HAS_NULL && i % 2 == 0) { int16_t definition_level = 0; int64_writer->WriteBatch(1, &definition_level, nullptr, NULL); } else { int64_t value = i * 1000 * 1000 + j; value *= 1000 * 1000; int16_t definition_level = 1; int64_writer->WriteBatch(1, &definition_level, nullptr, &value); } } parquet::ByteArrayWriter* ba_writer = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { parquet::ByteArray value; char hello[FIXED_LENGTH] = "parquet"; hello[7] = static_cast(static_cast('0') + i / 100); hello[8] = static_cast(static_cast('0') + (i / 10) % 10); hello[9] = static_cast(static_cast('0') + i % 10); if (HAS_NULL && i % 2 == 1) { int16_t definition_level = 0; ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); } else { int16_t definition_level = 1; value.ptr = reinterpret_cast(&hello[0]); value.len = FIXED_LENGTH; ba_writer->WriteBatch(1, &definition_level, nullptr, &value); } } parquet::Int32Writer* int32_writer2 = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { if (HAS_NULL && i % 3 == 0) { int16_t definition_level = 0; int32_writer2->WriteBatch(1, &definition_level, nullptr, NULL); } else { int32_t value = j * 10000 + i; int16_t definition_level = 1; int32_writer2->WriteBatch(1, &definition_level, nullptr, &value); } } parquet::Int64Writer* int64_writer2 = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { if (HAS_NULL && i % 3 == 1) { int16_t definition_level = 0; int64_writer2->WriteBatch(1, &definition_level, nullptr, NULL); } else { int64_t value = (j * 10000 + i) * 10000; int16_t definition_level = 1; int64_writer2->WriteBatch(1, &definition_level, nullptr, &value); } } parquet::ByteArrayWriter* ba_writer2 = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { parquet::ByteArray value; char hello[FIXED_LENGTH_DEC] = "1234567890."; hello[11] = static_cast(static_cast('0') + i / 100); hello[12] = static_cast(static_cast('0') + (i / 10) % 10); hello[13] = static_cast(static_cast('0') + i % 10); if (HAS_NULL && i % 5 == 0) { int16_t definition_level = 0; ba_writer2->WriteBatch(1, &definition_level, nullptr, nullptr); } else { int16_t definition_level = 1; value.ptr = reinterpret_cast(&hello[0]); value.len = FIXED_LENGTH_DEC; ba_writer2->WriteBatch(1, &definition_level, nullptr, &value); } } parquet::Int32Writer* int32_writer3 = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { if (HAS_NULL && i % 6 == 0) { int16_t definition_level = 0; int32_writer3->WriteBatch(1, &definition_level, nullptr, NULL); } else { int32_t value = 19857 + 365 * 10 * j + i; int16_t definition_level = 1; int32_writer3->WriteBatch(1, &definition_level, nullptr, &value); } } parquet::Int64Writer* int64_writer3 = static_cast(rg_writer->NextColumn()); for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { if (HAS_NULL && i % 3 == 1) { int16_t definition_level = 0; int64_writer3->WriteBatch(1, &definition_level, nullptr, NULL); } else { int64_t value = (1716887565LL + i * 3600) * 1000 * 1000; int16_t definition_level = 1; int64_writer3->WriteBatch(1, &definition_level, nullptr, &value); } } } // Close the ParquetFileWriter file_writer->Close(); // Write the bytes to file DCHECK(out_file->Close().ok()); } catch (const std::exception& e) { std::cerr << "Parquet write error: " << e.what() << std::endl; } } TEST_F(TestParquet, example1) { std::shared_ptr table = generate_table(); write_parquet_file(*table); read_whole_file(); read_single_rowgroup(); read_single_column(); read_single_column_chunk(); read_column_schema(); gen_test_parquet(); } int main(int argc, char **argv) { OB_LOGGER.set_log_level("INFO"); testing::InitGoogleTest(&argc,argv); return RUN_ALL_TESTS(); }