[optimize] Optimize spark load/broker load reading parquet format file (#3878)

Add BufferedReader for reading parquet file via broker
This commit is contained in:
xy720
2020-06-23 13:42:22 +08:00
committed by GitHub
parent e5da108110
commit c50a310f8f
11 changed files with 415 additions and 2 deletions

View File

@ -30,6 +30,7 @@ set(EXEC_FILES
blocking_join_node.cpp
broker_scan_node.cpp
broker_reader.cpp
buffered_reader.cpp
base_scanner.cpp
broker_scanner.cpp
cross_join_node.cpp

View File

@ -155,6 +155,8 @@ Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_rea
return status;
}
VLOG_RPC << "send pread request to broker:" << broker_addr << " position:" << position << ", read bytes length:" << nbytes;
try {
client->pread(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
@ -253,3 +255,4 @@ void BrokerReader::close() {
}
} // namespace doris

View File

@ -0,0 +1,152 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/buffered_reader.h"
#include <sstream>
#include <algorithm>
#include "common/logging.h"
namespace doris {
// buffered reader
// Wraps `reader` with an in-memory buffer of `buffer_size` bytes.
// The buffer window starts out empty (offset == limit == 0) and the
// sequential cursor starts at 0; no I/O happens until open() is called.
BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
        : _reader(reader),
          // Initializers follow the member declaration order, so the allocation
          // uses the constructor argument instead of the _buffer_size member,
          // which is not initialized yet at this point.
          _buffer(new char[buffer_size]),
          _buffer_size(buffer_size),
          _buffer_offset(0),
          _buffer_limit(0),
          _cur_offset(0) {}
// Tears the reader down by delegating to close(), which closes the wrapped
// reader and frees the buffer.
BufferedReader::~BufferedReader() {
    this->close();
}
// Opens the wrapped reader and then fills the buffer from the current buffer
// offset (0 right after construction).
// Returns InternalError when no reader was supplied; otherwise propagates any
// failure from the wrapped reader's open() or from the initial fill.
Status BufferedReader::open() {
    if (_reader == nullptr) {
        return Status::InternalError("Open buffered reader failed, reader is null");
    }
    RETURN_IF_ERROR(_reader->open());
    return _fill();
}
// Message-oriented reads are not supported by the buffered reader: this
// always returns NotSupported and never touches `buf` or `length`.
Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
    return Status::NotSupported("Not support");
}
// Sequential read: copies up to *buf_len bytes, starting at the current
// cursor (_cur_offset), into `buf`.
//   buf     - destination, must hold at least *buf_len bytes
//   buf_len - requested length; must be non-zero (checked in debug builds)
//   eof     - set to true when no byte could be read, false otherwise
// *buf_len is only read here, never written, so the number of bytes that were
// actually copied is not reported back to the caller.
// NOTE(review): confirm whether callers expect *buf_len to be updated.
Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
    DCHECK_NE(*buf_len, 0);
    const int64_t wanted = static_cast<int64_t>(*buf_len);
    int64_t got;
    RETURN_IF_ERROR(readat(_cur_offset, wanted, &got, buf));
    *eof = (got == 0);
    return Status::OK();
}
// Positional read: tries to copy `nbytes` bytes starting at `position` into
// `out`, issuing as many _read_once() calls as needed.
//   bytes_read - receives the total number of bytes copied; it is smaller
//                than `nbytes` only when a _read_once() call returned no data
// A non-positive `nbytes` is answered with 0 bytes and no I/O.
Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
    if (nbytes <= 0) {
        *bytes_read = 0;
        return Status::OK();
    }

    // The first chunk is written straight into the caller's counter.
    RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out));
    if (*bytes_read <= 0) {
        // Nothing available at `position`: treated as EOF.
        return Status::OK();
    }

    // Keep pulling chunks until the request is satisfied or the data runs out.
    char* const dst = static_cast<char*>(out);
    while (*bytes_read < nbytes) {
        int64_t chunk;
        RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &chunk,
                                   dst + *bytes_read));
        if (chunk <= 0) {
            // EOF reached before the full request could be served.
            break;
        }
        *bytes_read += chunk;
    }
    return Status::OK();
}
// Serves one chunk of a read request.
//   position   - absolute file offset to read from
//   nbytes     - maximum number of bytes wanted
//   bytes_read - receives the number of bytes copied by this call (may be
//                less than `nbytes`; 0 means nothing is available)
//   out        - destination, must hold at least `nbytes` bytes
//
// When `position` lies inside the buffered window [_buffer_offset,
// _buffer_limit) the bytes are copied from the buffer. Otherwise:
//   * a request larger than the buffer bypasses the buffer and goes directly
//     to the wrapped reader;
//   * any other request re-fills the buffer starting at `position` first.
// Whenever data is returned, the sequential cursor (_cur_offset) is moved to
// the byte following the last one returned.
//
// NOTE(review): _fill() does nothing for a negative _buffer_offset, so after
// a miss with a negative `position` the copy below uses whatever the buffer
// held before. Confirm callers never pass negative positions.
Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
    // requested bytes missed the local buffer
    if (position >= _buffer_limit || position < _buffer_offset) {
        // If the requested length is larger than the capacity of the buffer,
        // there is no point in staging the data in the local buffer.
        if (nbytes > _buffer_size) {
            Status st = _reader->readat(position, nbytes, bytes_read, out);
            // Keep the cursor consistent with the buffered path below;
            // without this a sequential read() larger than the buffer would
            // never advance and would return the same range again.
            if (st.ok() && *bytes_read > 0) {
                _cur_offset = position + *bytes_read;
            }
            return st;
        }
        _buffer_offset = position;
        RETURN_IF_ERROR(_fill());
        if (position >= _buffer_limit) {
            // The re-fill produced no data at `position`: EOF.
            *bytes_read = 0;
            return Status::OK();
        }
    }
    // Copy as much of the request as the buffered window can provide.
    const int64_t len = std::min(_buffer_limit - position, nbytes);
    const int64_t off = position - _buffer_offset;
    memcpy(out, _buffer + off, len);
    *bytes_read = len;
    _cur_offset = position + *bytes_read;
    return Status::OK();
}
// Re-loads the buffer from the wrapped reader, starting at _buffer_offset and
// asking for a full buffer. On success _buffer_limit is set to the offset just
// past the last byte loaded.
// A negative _buffer_offset is a no-op: nothing is read and _buffer_limit is
// left as it was.
Status BufferedReader::_fill() {
    if (_buffer_offset < 0) {
        return Status::OK();
    }

    int64_t filled = 0;
    // A read that returns zero bytes is retried once for new content, so the
    // wrapped reader is asked at most twice.
    for (int attempt = 0; attempt < 2; ++attempt) {
        RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &filled, _buffer));
        if (filled != 0) {
            break;
        }
    }
    _buffer_limit = _buffer_offset + filled;
    return Status::OK();
}
// Reports the size as known by the wrapped reader; nothing is cached here.
int64_t BufferedReader::size() {
    const int64_t underlying_size = _reader->size();
    return underlying_size;
}
// Moves the sequential cursor used by read() to `position`.
// Only the offset is recorded: the value is not validated and no I/O is
// performed, so this always returns OK.
Status BufferedReader::seek(int64_t position) {
    this->_cur_offset = position;
    return Status::OK();
}
// Stores the current sequential cursor in `*position`; always returns OK.
Status BufferedReader::tell(int64_t* position) {
    *position = this->_cur_offset;
    return Status::OK();
}
// Closes the wrapped reader (if there is one) and frees the buffer.
// This is also what the destructor runs, so it must be safe on an object
// whose open() failed because the reader was null.
void BufferedReader::close() {
    // open() reports a null reader as an error instead of crashing; be equally
    // tolerant here, otherwise destroying such an object dereferences null.
    if (_reader != nullptr) {
        _reader->close();
    }
    // NOTE(review): SAFE_DELETE_ARRAY is defined outside this file; presumably
    // it resets the pointer so a repeated close() is harmless - confirm.
    SAFE_DELETE_ARRAY(_buffer);
}
// Reports whether the wrapped reader considers itself closed.
bool BufferedReader::closed() {
    const bool underlying_closed = _reader->closed();
    return underlying_closed;
}
} // namespace doris

View File

@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include "common/status.h"
#include "olap/olap_define.h"
#include "exec/file_reader.h"
namespace doris {
// Buffered Reader
// Adds a cache layer between the caller and an underlying FileReader so that
// many small reads can be served from memory instead of each one reaching the
// underlying reader.
//
// Ownership: the wrapped reader is closed by close() but is never deleted by
// this class, so whoever created it remains responsible for freeing it. The
// internal buffer is owned by this object.
class BufferedReader : public FileReader {
public:
    // reader      - the underlying reader that actually fetches the data.
    // buffer_size - capacity of the internal buffer in bytes (1 MB by default).
    // If the file size is needed, it has to be set when the wrapped FileReader
    // is constructed; there is no other way to set it.
    BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024);
    virtual ~BufferedReader();

    // The object owns a raw buffer that close() frees; a copy would share the
    // same pointer and free it a second time, so copying is disabled.
    BufferedReader(const BufferedReader&) = delete;
    BufferedReader& operator=(const BufferedReader&) = delete;

    // Opens the underlying reader and pre-fills the buffer.
    virtual Status open() override;

    // Sequential read from the current offset; see seek()/tell().
    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
    // Positional read of up to `nbytes` bytes starting at `position`.
    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
    // Declared to satisfy the FileReader interface.
    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
    virtual int64_t size() override;
    // Set / query the offset used by the sequential read().
    virtual Status seek(int64_t position) override;
    virtual Status tell(int64_t* position) override;
    virtual void close() override;
    virtual bool closed() override;

private:
    // Re-loads the buffer from the underlying reader at _buffer_offset.
    Status _fill();
    // Serves a single chunk of a read request.
    Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);

private:
    FileReader* _reader;     // underlying reader, not owned
    char* _buffer;           // cached bytes, owned
    int64_t _buffer_size;    // capacity of _buffer in bytes
    int64_t _buffer_offset;  // file offset of the first byte held in _buffer
    int64_t _buffer_limit;   // file offset just past the last byte held in _buffer
    int64_t _cur_offset;     // offset used by the sequential read()
};
}

View File

@ -158,6 +158,9 @@ inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescript
Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof) {
if (_current_line_of_group >= _rows_of_group) {// read next row group
VLOG(7) << "read_record_batch, current group id:" << _current_group << " current line of group:"
<< _current_line_of_group << " is larger than rows group size:"
<< _rows_of_group << ". start to read next row group";
_current_group++;
if (_current_group >= _total_groups) {// read completed.
_parquet_column_ids.clear();
@ -177,6 +180,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
}
_current_line_of_batch = 0;
} else if (_current_line_of_batch >= _batch->num_rows()) {
VLOG(7) << "read_record_batch, current group id:" << _current_group << " current line of batch:"
<< _current_line_of_batch << " is larger than batch size:"
<< _batch->num_rows() << ". start to read next batch";
arrow::Status status = _rb_batch->ReadNext(&_batch);
if (!status.ok()) {
return Status::InternalError("Read Batch Error With Libarrow.");

View File

@ -29,6 +29,7 @@
#include "exec/text_converter.hpp"
#include "exec/local_file_reader.h"
#include "exec/broker_reader.h"
#include "exec/buffered_reader.h"
#include "exec/decompressor.h"
#include "exec/parquet_reader.h"
@ -119,8 +120,8 @@ Status ParquetScanner::open_next_reader() {
int64_t file_size = 0;
// for compatibility
if (range.__isset.file_size) { file_size = range.file_size; }
file_reader.reset(new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
range.path, range.start_offset, file_size));
file_reader.reset(new BufferedReader(new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
range.path, range.start_offset, file_size)));
break;
}
#if 0

View File

@ -51,6 +51,7 @@ ADD_BE_TEST(broker_scanner_test)
ADD_BE_TEST(broker_scan_node_test)
ADD_BE_TEST(tablet_info_test)
ADD_BE_TEST(tablet_sink_test)
ADD_BE_TEST(buffered_reader_test)
# ADD_BE_TEST(es_scan_node_test)
ADD_BE_TEST(es_http_scan_node_test)
ADD_BE_TEST(es_predicate_test)

View File

@ -0,0 +1,182 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include "exec/local_file_reader.h"
#include "exec/buffered_reader.h"
#include "util/stopwatch.hpp"
namespace doris {
class BufferedReaderTest : public testing::Test {
public:
BufferedReaderTest() {}
protected:
virtual void SetUp() {
}
virtual void TearDown() {
}
};
// Reads a whole 950-byte file through a 1024-byte buffer with one readat()
// call and checks that exactly the file's length is reported.
TEST_F(BufferedReaderTest, normal_use) {
    // buffered_reader_test_file 950 bytes
    LocalFileReader local_reader(
            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
    BufferedReader buffered(&local_reader, 1024);
    Status status = buffered.open();
    ASSERT_TRUE(status.ok());

    uint8_t data[1024];
    MonotonicStopWatch timer;
    timer.start();
    int64_t n_read = 0;
    status = buffered.readat(0, 1024, &n_read, data);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(950, n_read);
    LOG(INFO) << "read bytes " << n_read << " using time " << timer.elapsed_time();
}
// Walks through the 45-byte text file with sequential 10-byte read() calls
// and checks both the returned bytes and the eof flag at every step.
TEST_F(BufferedReaderTest, test_validity) {
    // buffered_reader_test_file.txt 45 bytes
    LocalFileReader local_reader(
            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
    BufferedReader buffered(&local_reader, 64);
    Status status = buffered.open();
    ASSERT_TRUE(status.ok());

    uint8_t chunk[10];
    bool at_eof = false;
    size_t chunk_len = 10;

    // The first four reads each return a full 10-byte chunk.
    const char* const full_chunks[] = {"bdfhjlnprt", "vxzAbCdEfG", "hIj\n\nMnOpQ", "rStUvWxYz\n"};
    for (const char* expected : full_chunks) {
        status = buffered.read(chunk, &chunk_len, &at_eof);
        ASSERT_TRUE(status.ok());
        ASSERT_STREQ(expected, std::string((char*)chunk, chunk_len).c_str());
        ASSERT_FALSE(at_eof);
    }

    // The fifth read is the short tail of the file; only its first four
    // bytes are compared.
    status = buffered.read(chunk, &chunk_len, &at_eof);
    ASSERT_TRUE(status.ok());
    ASSERT_STREQ("IjKl", std::string((char*)chunk, 4).c_str());
    ASSERT_FALSE(at_eof);

    // Nothing is left, so the next read reports eof.
    status = buffered.read(chunk, &chunk_len, &at_eof);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(at_eof);
}
// Checks what a sequential read() returns after seek() to the end of the
// file, to its beginning, and to offsets outside the file.
TEST_F(BufferedReaderTest, test_seek) {
    // buffered_reader_test_file.txt 45 bytes
    LocalFileReader local_reader(
            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
    BufferedReader buffered(&local_reader, 64);
    Status status = buffered.open();
    ASSERT_TRUE(status.ok());

    uint8_t chunk[10];
    bool at_eof = false;
    size_t chunk_len = 10;

    // Seek to the end of the file: the read must report eof.
    status = buffered.seek(45);
    ASSERT_TRUE(status.ok());
    status = buffered.read(chunk, &chunk_len, &at_eof);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(at_eof);

    // Seeking to the beginning of the file (0) and to the invalid negative
    // offsets -1 and -1000 is accepted by seek(); in all three cases the
    // following read is expected to return the first ten bytes of the file.
    const int64_t leading_offsets[] = {0, -1, -1000};
    for (int64_t target : leading_offsets) {
        status = buffered.seek(target);
        ASSERT_TRUE(status.ok());
        status = buffered.read(chunk, &chunk_len, &at_eof);
        ASSERT_TRUE(status.ok());
        ASSERT_STREQ("bdfhjlnprt", std::string((char*)chunk, chunk_len).c_str());
        ASSERT_FALSE(at_eof);
    }

    // Seeking far past the end of the file is accepted too; the read then
    // reports eof.
    status = buffered.seek(1000);
    ASSERT_TRUE(status.ok());
    status = buffered.read(chunk, &chunk_len, &at_eof);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(at_eof);
}
// Exercises positional reads at non-consecutive offsets, plus one request
// that is larger than the 64-byte buffer.
TEST_F(BufferedReaderTest, test_miss) {
    // buffered_reader_test_file.txt 45 bytes
    LocalFileReader local_reader(
            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
    BufferedReader buffered(&local_reader, 64);
    Status status = buffered.open();
    ASSERT_TRUE(status.ok());

    uint8_t data[128];
    int64_t n_read;

    // Ten bytes from the middle of the file.
    status = buffered.readat(20, 10, &n_read, data);
    ASSERT_TRUE(status.ok());
    ASSERT_STREQ("hIj\n\nMnOpQ", std::string((char*)data, (size_t)n_read).c_str());
    ASSERT_EQ(10, n_read);

    // Jump back to the start of the file.
    status = buffered.readat(0, 5, &n_read, data);
    ASSERT_TRUE(status.ok());
    ASSERT_STREQ("bdfhj", std::string((char*)data, (size_t)n_read).c_str());
    ASSERT_EQ(5, n_read);

    // Continue right after the previous range.
    status = buffered.readat(5, 10, &n_read, data);
    ASSERT_TRUE(status.ok());
    ASSERT_STREQ("lnprtvxzAb", std::string((char*)data, (size_t)n_read).c_str());
    ASSERT_EQ(10, n_read);

    // A request larger than the buffer capacity does not need to be staged
    // in the local buffer; the whole 45-byte file must still come back.
    status = buffered.readat(0, 128, &n_read, data);
    ASSERT_TRUE(status.ok());
    ASSERT_STREQ("bdfhjlnprt", std::string((char*)data, 10).c_str());
    ASSERT_EQ(45, n_read);
}
} // end namespace doris
// Test entry point: hands the command line to GoogleTest and returns the
// result of running every registered test as the process exit code.
int main(int argc, char** argv) {
    ::testing::InitGoogleTest(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}

View File

@ -0,0 +1,4 @@
bdfhjlnprtvxzAbCdEfGhIj
MnOpQrStUvWxYz
IjKl

View File

@ -212,6 +212,7 @@ ${DORIS_TEST_BINARY_DIR}/exec/es_scan_reader_test
${DORIS_TEST_BINARY_DIR}/exec/es_query_builder_test
${DORIS_TEST_BINARY_DIR}/exec/tablet_info_test
${DORIS_TEST_BINARY_DIR}/exec/tablet_sink_test
${DORIS_TEST_BINARY_DIR}/exec/buffered_reader_test
# Running runtime Unittest
${DORIS_TEST_BINARY_DIR}/runtime/external_scan_context_mgr_test