Load Rowset only once in a thread-safe manner (#2022)

[Storage]
This PR implements thread-safe `Rowset::load()` for both AlphaRowset and BetaRowset. The main changes are 

1. Introduce `DorisCallOnce<ReturnType>` to be the replacement for `DorisInitOnce` . It works for both Status and OLAPStatus.
2. `segment_v2::ColumnReader::init()` is now implemented by DorisCallOnce.
3. `segment_v2::Segment` is now created by a factory open() method. This guarantees all Segment instances are in opened state.
4. `segment_v2::Segment::_load_index()` is now implemented by DorisCallOnce.
5. Implement thread-safe load() for AlphaRowset and BetaRowset
This commit is contained in:
Dayue Gao
2019-10-21 16:05:12 +08:00
committed by ZHAO Chun
parent 58c882fa2a
commit 8aa2cbe12d
14 changed files with 147 additions and 126 deletions

View File

@ -31,20 +31,7 @@ AlphaRowset::AlphaRowset(const TabletSchema* schema,
: Rowset(schema, std::move(rowset_path), data_dir, std::move(rowset_meta)) {
}
OLAPStatus AlphaRowset::init() {
if (is_inited()) {
return OLAP_SUCCESS;
}
OLAPStatus status = _init_segment_groups();
set_inited(true);
return status;
}
OLAPStatus AlphaRowset::load(bool use_cache) {
DCHECK(is_inited()) << "should init() rowset " << unique_id() << " before load()";
if (is_loaded()) {
return OLAP_SUCCESS;
}
OLAPStatus AlphaRowset::do_load_once(bool use_cache) {
for (auto& segment_group: _segment_groups) {
// validate segment group
if (segment_group->validate() != OLAP_SUCCESS) {
@ -62,17 +49,11 @@ OLAPStatus AlphaRowset::load(bool use_cache) {
return res;
}
}
set_loaded(true);
return OLAP_SUCCESS;
}
OLAPStatus AlphaRowset::create_reader(std::shared_ptr<RowsetReader>* result) {
if (!is_loaded()) {
OLAPStatus status = load();
if (status != OLAP_SUCCESS) {
return OLAP_ERR_ROWSET_CREATE_READER;
}
}
RETURN_NOT_OK(load());
result->reset(new AlphaRowsetReader(
_schema->num_rows_per_row_block(), std::static_pointer_cast<AlphaRowset>(shared_from_this())));
return OLAP_SUCCESS;
@ -290,7 +271,7 @@ bool AlphaRowset::check_path(const std::string& path) {
return valid_paths.find(path) != valid_paths.end();
}
OLAPStatus AlphaRowset::_init_segment_groups() {
OLAPStatus AlphaRowset::init() {
std::vector<SegmentGroupPB> segment_group_metas;
AlphaRowsetMetaSharedPtr _alpha_rowset_meta = std::dynamic_pointer_cast<AlphaRowsetMeta>(_rowset_meta);
_alpha_rowset_meta->get_segment_groups(&segment_group_metas);
@ -369,9 +350,7 @@ std::shared_ptr<SegmentGroup> AlphaRowset::_segment_group_with_largest_size() {
}
OLAPStatus AlphaRowset::reset_sizeinfo() {
if (!is_loaded()) {
RETURN_NOT_OK(load());
}
RETURN_NOT_OK(load());
std::vector<SegmentGroupPB> segment_group_metas;
AlphaRowsetMetaSharedPtr alpha_rowset_meta = std::dynamic_pointer_cast<AlphaRowsetMeta>(_rowset_meta);
alpha_rowset_meta->get_segment_groups(&segment_group_metas);

View File

@ -40,10 +40,6 @@ class AlphaRowset : public Rowset {
public:
virtual ~AlphaRowset() {}
// this api is for lazy loading data
// always means that there are some io
OLAPStatus load(bool use_cache = true) override;
OLAPStatus create_reader(std::shared_ptr<RowsetReader>* result) override;
OLAPStatus split_range(const RowCursor& start_key,
@ -79,14 +75,15 @@ protected:
DataDir* data_dir,
RowsetMetaSharedPtr rowset_meta);
// init segment groups
OLAPStatus init() override;
OLAPStatus do_load_once(bool use_cache) override ;
// add custom logic when rowset is published
void make_visible_extra(Version version, VersionHash version_hash) override;
private:
OLAPStatus _init_segment_groups();
std::shared_ptr<SegmentGroup> _segment_group_with_largest_size();
private:

View File

@ -38,42 +38,28 @@ BetaRowset::BetaRowset(const TabletSchema* schema,
}
OLAPStatus BetaRowset::init() {
if (is_inited()) {
return OLAP_SUCCESS;
}
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
std::string seg_path = segment_file_path(_rowset_path, rowset_id(), seg_id);
_segments.emplace_back(new segment_v2::Segment(seg_path, seg_id, _schema));
}
set_inited(true);
return OLAP_SUCCESS;
return OLAP_SUCCESS; // no op
}
// `use_cache` is ignored because beta rowset doesn't support fd cache now
OLAPStatus BetaRowset::load(bool use_cache) {
DCHECK(is_inited()) << "should init() rowset " << unique_id() << " before load()";
if (is_loaded()) {
return OLAP_SUCCESS;
}
for (auto& seg : _segments) {
auto s = seg->open();
OLAPStatus BetaRowset::do_load_once(bool use_cache) {
// open all segments under the current rowset
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
std::string seg_path = segment_file_path(_rowset_path, rowset_id(), seg_id);
std::shared_ptr<segment_v2::Segment> segment;
auto s = segment_v2::Segment::open(seg_path, seg_id, _schema, &segment);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment " << seg->id() << " under rowset " << unique_id()
LOG(WARNING) << "failed to open segment " << seg_path << " under rowset " << unique_id()
<< " : " << s.to_string();
return OLAP_ERR_ROWSET_LOAD_FAILED;
}
_segments.push_back(std::move(segment));
}
set_loaded(true);
return OLAP_SUCCESS;
}
OLAPStatus BetaRowset::create_reader(RowsetReaderSharedPtr* result) {
if (!is_loaded()) {
OLAPStatus status = load();
if (status != OLAP_SUCCESS) {
return OLAP_ERR_ROWSET_CREATE_READER;
}
}
RETURN_NOT_OK(load());
result->reset(new BetaRowsetReader(std::static_pointer_cast<BetaRowset>(shared_from_this())));
return OLAP_SUCCESS;
}

View File

@ -38,8 +38,6 @@ public:
static std::string segment_file_path(const std::string& segment_dir, const RowsetId& rowset_id, int segment_id);
OLAPStatus load(bool use_cache = true) override;
OLAPStatus create_reader(RowsetReaderSharedPtr* result) override;
OLAPStatus split_range(const RowCursor& start_key,
@ -70,6 +68,8 @@ protected:
OLAPStatus init() override;
OLAPStatus do_load_once(bool use_cache) override ;
private:
friend class BetaRowsetReader;
std::vector<segment_v2::SegmentSharedPtr> _segments;

View File

@ -37,6 +37,10 @@ Rowset::Rowset(const TabletSchema *schema,
}
}
OLAPStatus Rowset::load(bool use_cache) {
return _load_once.call([this, use_cache] { return do_load_once(use_cache); });
}
void Rowset::make_visible(Version version, VersionHash version_hash) {
_is_pending = false;
_rowset_meta->set_version(version);

View File

@ -22,8 +22,10 @@
#include <vector>
#include "gen_cpp/olap_file.pb.h"
#include "gutil/macros.h"
#include "olap/new_status.h"
#include "olap/rowset/rowset_meta.h"
#include "util/once.h"
namespace doris {
@ -45,7 +47,10 @@ public:
// Open all segment files in this rowset and load necessary metadata.
// - `use_cache` : whether to use fd cache, only applicable to alpha rowset now
virtual OLAPStatus load(bool use_cache = true) = 0;
//
// May be called multiple times, subsequent calls will no-op.
// Derived class implements the load logic by overriding the `do_load_once()` method.
OLAPStatus load(bool use_cache = true);
// returns OLAP_ERR_ROWSET_CREATE_READER when failed to create reader
virtual OLAPStatus create_reader(std::shared_ptr<RowsetReader>* result) = 0;
@ -129,6 +134,7 @@ public:
protected:
friend class RowsetFactory;
DISALLOW_COPY_AND_ASSIGN(Rowset);
// this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
Rowset(const TabletSchema* schema,
std::string rowset_path,
@ -138,10 +144,8 @@ protected:
// this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
virtual OLAPStatus init() = 0;
bool is_inited() const { return _is_inited; }
void set_inited(bool inited) { _is_inited = inited; }
bool is_loaded() const { return _is_loaded; }
void set_loaded(bool loaded) { _is_loaded= loaded; }
// The actual implementation of load(). Guaranteed by to called exactly once.
virtual OLAPStatus do_load_once(bool use_cache) = 0;
// allow subclass to add custom logic when rowset is being published
virtual void make_visible_extra(Version version, VersionHash version_hash) {}
@ -154,8 +158,7 @@ protected:
bool _is_pending; // rowset is pending iff it's not in visible state
bool _is_cumulative; // rowset is cumulative iff it's visible and start version < end version
bool _is_inited = false;
bool _is_loaded = false;
DorisCallOnce<OLAPStatus> _load_once;
bool _need_delete_file = false;
};

View File

@ -85,6 +85,10 @@ ColumnReader::~ColumnReader() {
}
Status ColumnReader::init() {
return _init_once.call([this] { return _do_init_once(); });
}
Status ColumnReader::_do_init_once() {
_type_info = get_type_info((FieldType)_meta.type());
if (_type_info == nullptr) {
return Status::NotSupported(Substitute("unsupported typeinfo, type=$0", _meta.type()));

View File

@ -30,6 +30,7 @@
#include "olap/rowset/segment_v2/column_zone_map.h" // for ColumnZoneMap
#include "olap/rowset/segment_v2/row_ranges.h" // for RowRanges
#include "olap/rowset/segment_v2/page_handle.h" // for PageHandle
#include "util/once.h"
namespace doris {
@ -61,6 +62,7 @@ public:
uint64_t num_rows, RandomAccessFile* file);
~ColumnReader();
// May be called multiple times, subsequent calls will no op.
Status init();
// create a new column iterator. Client should delete returned iterator
@ -88,6 +90,8 @@ public:
PagePointer get_dict_page_pointer() const;
private:
Status _do_init_once();
Status _init_ordinal_index();
Status _init_column_zone_map();
@ -103,6 +107,7 @@ private:
uint64_t _num_rows;
RandomAccessFile* _file = nullptr;
DorisCallOnce<Status> _init_once;
const TypeInfo* _type_info = nullptr;
const EncodingInfo* _encoding_info = nullptr;
const BlockCompressionCodec* _compress_codec = nullptr;

View File

@ -33,13 +33,22 @@ namespace segment_v2 {
using strings::Substitute;
Status Segment::open(std::string filename,
uint32_t segment_id,
const TabletSchema* tablet_schema,
std::shared_ptr<Segment>* output) {
std::shared_ptr<Segment> segment(new Segment(std::move(filename), segment_id, tablet_schema));
RETURN_IF_ERROR(segment->_open());
output->swap(segment);
return Status::OK();
}
Segment::Segment(
std::string fname, uint32_t segment_id,
const TabletSchema* tablet_schema)
: _fname(std::move(fname)),
_segment_id(segment_id),
_tablet_schema(tablet_schema),
_index_loaded(false) {
_tablet_schema(tablet_schema) {
}
Segment::~Segment() {
@ -48,11 +57,12 @@ Segment::~Segment() {
}
}
Status Segment::open() {
Status Segment::_open() {
RETURN_IF_ERROR(Env::Default()->new_random_access_file(_fname, &_input_file));
// parse footer to get meta
RETURN_IF_ERROR(_parse_footer());
// initial all column reader
RETURN_IF_ERROR(_initial_column_readers());
return Status::OK();
}
@ -61,6 +71,7 @@ Status Segment::new_iterator(
const StorageReadOptions& read_options,
std::unique_ptr<RowwiseIterator>* iter) {
// trying to prune the current segment by segment-level zone map
if (read_options.conditions != nullptr) {
for (auto& column_condition : read_options.conditions->columns()) {
int32_t column_id = column_condition.first;
@ -104,14 +115,7 @@ Status Segment::new_iterator(
}
}
if(!_index_loaded) {
// parse short key index
RETURN_IF_ERROR(_parse_index());
// initial all column reader
RETURN_IF_ERROR(_initial_column_readers());
_index_loaded = true;
}
RETURN_IF_ERROR(_load_index());
iter->reset(new SegmentIterator(this->shared_from_this(), schema));
iter->get()->init(read_options);
return Status::OK();
@ -165,17 +169,18 @@ Status Segment::_parse_footer() {
return Status::OK();
}
// load and parse short key index
Status Segment::_parse_index() {
// read short key index content
_sk_index_buf.resize(_footer.short_key_index_page().size());
Slice slice(_sk_index_buf.data(), _sk_index_buf.size());
RETURN_IF_ERROR(_input_file->read_at(_footer.short_key_index_page().offset(), slice));
Status Segment::_load_index() {
return _load_index_once.call([this] {
// read short key index content
_sk_index_buf.resize(_footer.short_key_index_page().size());
Slice slice(_sk_index_buf.data(), _sk_index_buf.size());
RETURN_IF_ERROR(_input_file->read_at(_footer.short_key_index_page().offset(), slice));
// Parse short key index
_sk_index_decoder.reset(new ShortKeyIndexDecoder(_sk_index_buf));
RETURN_IF_ERROR(_sk_index_decoder->parse());
return Status::OK();
// Parse short key index
_sk_index_decoder.reset(new ShortKeyIndexDecoder(_sk_index_buf));
RETURN_IF_ERROR(_sk_index_decoder->parse());
return Status::OK();
});
}
Status Segment::_initial_column_readers() {

View File

@ -24,11 +24,13 @@
#include "common/status.h" // Status
#include "gen_cpp/segment_v2.pb.h"
#include "gutil/macros.h"
#include "olap/iterators.h"
#include "olap/rowset/segment_v2/common.h" // rowid_t
#include "olap/short_key_index.h"
#include "olap/tablet_schema.h"
#include "util/faststring.h"
#include "util/once.h"
namespace doris {
@ -57,11 +59,12 @@ using SegmentSharedPtr = std::shared_ptr<Segment>;
// change finished, client should disable all cached Segment for old TabletSchema.
class Segment : public std::enable_shared_from_this<Segment> {
public:
Segment(std::string fname, uint32_t segment_id,
const TabletSchema* tablet_schema);
~Segment();
static Status open(std::string filename,
uint32_t segment_id,
const TabletSchema* tablet_schema,
std::shared_ptr<Segment>* output);
Status open();
~Segment();
Status new_iterator(
const Schema& schema,
@ -75,18 +78,30 @@ public:
private:
friend class SegmentIterator;
Status new_column_iterator(uint32_t cid, ColumnIterator** iter);
uint32_t num_rows_per_block() const { return _sk_index_decoder->num_rows_per_block(); }
size_t num_short_keys() const { return _tablet_schema->num_short_key_columns(); }
DISALLOW_COPY_AND_ASSIGN(Segment);
Segment(std::string fname, uint32_t segment_id, const TabletSchema* tablet_schema);
// open segment file and read the minimum amount of necessary information (footer)
Status _open();
Status _parse_footer();
Status _parse_index();
Status _initial_column_readers();
Status new_column_iterator(uint32_t cid, ColumnIterator** iter);
size_t num_short_keys() const { return _tablet_schema->num_short_key_columns(); }
// Load and decode short key index.
// May be called multiple times, subsequent calls will no op.
Status _load_index();
uint32_t num_rows_per_block() const {
DCHECK(_load_index_once.has_called() && _load_index_once.stored_result().ok());
return _sk_index_decoder->num_rows_per_block();
}
ShortKeyIndexIterator lower_bound(const Slice& key) const {
DCHECK(_load_index_once.has_called() && _load_index_once.stored_result().ok());
return _sk_index_decoder->lower_bound(key);
}
ShortKeyIndexIterator upper_bound(const Slice& key) const {
DCHECK(_load_index_once.has_called() && _load_index_once.stored_result().ok());
return _sk_index_decoder->upper_bound(key);
}
@ -94,6 +109,7 @@ private:
// NOTE: Before call this function , client should assure that
// this segment is not empty.
uint32_t last_block() const {
DCHECK(_load_index_once.has_called() && _load_index_once.stored_result().ok());
DCHECK(num_rows() > 0);
return _sk_index_decoder->num_items() - 1;
}
@ -111,14 +127,13 @@ private:
// after this segment is generated.
std::vector<ColumnReader*> _column_readers;
// used to guarantee that short key index will be loaded at most once in a thread-safe way
DorisCallOnce<Status> _load_index_once;
// used to store short key index
faststring _sk_index_buf;
// short key index decoder
std::unique_ptr<ShortKeyIndexDecoder> _sk_index_decoder;
bool _index_loaded;
// Map from column unique id to column ordinal in footer's ColumnMetaPB
// If we can't find unique id from it, it means this segment is created
// with an old schema.

View File

@ -125,7 +125,7 @@ OLAPStatus Tablet::_init_once_action() {
}
OLAPStatus Tablet::init() {
return _init_once.init([this] { return _init_once_action(); });
return _init_once.call([this] { return _init_once_action(); });
}
bool Tablet::is_used() {

View File

@ -264,7 +264,7 @@ private:
std::string _tablet_path;
RowsetGraph _rs_graph;
DorisInitOnce _init_once;
DorisCallOnce<OLAPStatus> _init_once;
RWMutex _meta_lock;
// meta store lock is used for prevent 2 threads do checkpoint concurrently
// it will be used in econ-mode in the future
@ -286,7 +286,7 @@ private:
};
inline bool Tablet::init_succeeded() {
return _init_once.init_succeeded();
return _init_once.has_called() && _init_once.stored_result() == OLAP_SUCCESS;
}
inline DataDir* Tablet::data_dir() const {

View File

@ -24,36 +24,59 @@
namespace doris {
// Similar to the KuduOnceDynamic class, but accepts a lambda function.
class DorisInitOnce {
// Utility class for implementing thread-safe call-once semantics.
//
// call() will return stored result regardless of whether the first invocation
// returns a success status or not.
//
// Example:
// class Resource {
// public:
// Status init() {
// _init_once.call([this] { return _do_init(); });
// }
//
// bool is_inited() const {
// return _init_once.has_called() && _init_once.stored_result().ok();
// }
// private:
// Status _do_init() { /* init logic here */ }
// DorisCallOnce<Status> _init_once;
// };
template<typename ReturnType>
class DorisCallOnce {
public:
DorisInitOnce()
: _init_succeeded(false) {}
DorisCallOnce()
: _has_called(false) {}
// If the underlying `once_flag` has yet to be invoked, invokes the provided
// lambda and stores its return value. Otherwise, returns the stored Status.
template<typename Fn>
OLAPStatus init(Fn fn) {
ReturnType call(Fn fn) {
std::call_once(_once_flag, [this, fn] {
_status = fn();
if (OLAP_SUCCESS == _status) {
_init_succeeded.store(true, std::memory_order_release);
}
_has_called.store(true, std::memory_order_release);
});
return _status;
}
// std::memory_order_acquire here and std::memory_order_release in
// init(), taken together, mean that threads can safely synchronize on
// _init_succeeded.
bool init_succeeded() const {
return _init_succeeded.load(std::memory_order_acquire);
// Return whether `call` has been invoked or not.
bool has_called() const {
// std::memory_order_acquire here and std::memory_order_release in
// init(), taken together, mean that threads can safely synchronize on
// _has_called.
return _has_called.load(std::memory_order_acquire);
}
// Return the stored result. The result is only meaningful when `has_called() == true`.
ReturnType stored_result() const {
return _status;
}
private:
std::atomic<bool> _init_succeeded;
std::atomic<bool> _has_called;
std::once_flag _once_flag;
OLAPStatus _status;
ReturnType _status;
};
} // namespace doris

View File

@ -92,8 +92,8 @@ TEST_F(SegmentReaderWriterTest, normal) {
ASSERT_TRUE(st.ok());
// reader
{
std::shared_ptr<Segment> segment(new Segment(fname, 0, tablet_schema.get()));
st = segment->open();
std::shared_ptr<Segment> segment;
st = Segment::open(fname, 0, tablet_schema.get(), &segment);
LOG(INFO) << "segment open, msg=" << st.to_string();
ASSERT_TRUE(st.ok());
ASSERT_EQ(4096, segment->num_rows());
@ -277,8 +277,8 @@ TEST_F(SegmentReaderWriterTest, TestZoneMap) {
// reader with condition
{
std::shared_ptr<Segment> segment(new Segment(fname, 0, tablet_schema.get()));
st = segment->open();
std::shared_ptr<Segment> segment;
st = Segment::open(fname, 0, tablet_schema.get(), &segment);
ASSERT_TRUE(st.ok());
ASSERT_EQ(64 * 1024, segment->num_rows());
Schema schema(*tablet_schema);
@ -541,8 +541,8 @@ TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
new_tablet_schema_1->_cols.push_back(
create_int_value(5, OLAP_FIELD_AGGREGATION_SUM, true, "NULL"));
std::shared_ptr<Segment> segment(new Segment(fname, 0, new_tablet_schema_1.get()));
st = segment->open();
std::shared_ptr<Segment> segment;
st = Segment::open(fname, 0, new_tablet_schema_1.get(), &segment);
ASSERT_TRUE(st.ok());
ASSERT_EQ(4096, segment->num_rows());
Schema schema(*new_tablet_schema_1);
@ -596,8 +596,8 @@ TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
new_tablet_schema_1->_cols.push_back(create_int_value(4));
new_tablet_schema_1->_cols.push_back(create_int_value(5, OLAP_FIELD_AGGREGATION_SUM, true, "10086"));
std::shared_ptr<Segment> segment(new Segment(fname, 0, new_tablet_schema_1.get()));
st = segment->open();
std::shared_ptr<Segment> segment;
st = Segment::open(fname, 0, new_tablet_schema_1.get(), &segment);
ASSERT_TRUE(st.ok());
ASSERT_EQ(4096, segment->num_rows());
Schema schema(*new_tablet_schema_1);
@ -691,8 +691,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
ASSERT_TRUE(st.ok());
{
std::shared_ptr<Segment> segment(new Segment(fname, 0, tablet_schema.get()));
st = segment->open();
std::shared_ptr<Segment> segment;
st = Segment::open(fname, 0, tablet_schema.get(), &segment);
ASSERT_TRUE(st.ok());
ASSERT_EQ(4096, segment->num_rows());
Schema schema(*tablet_schema);