diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 30f5675af2..9804fdc8d0 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -17,14 +17,13 @@ #include "olap/memtable.h" -#include "common/object_pool.h" +#include "common/logging.h" #include "olap/rowset/column_data_writer.h" #include "olap/rowset/rowset_writer.h" #include "olap/row_cursor.h" #include "olap/row.h" #include "olap/schema.h" #include "runtime/tuple.h" -#include "util/runtime_profile.h" #include "util/debug_util.h" namespace doris { @@ -43,9 +42,9 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet _schema_size = _schema->schema_size(); _mem_tracker.reset(new MemTracker(-1, "memtable", mem_tracker)); - _mem_pool.reset(new MemPool(_mem_tracker.get())); - _tuple_buf = _mem_pool->allocate(_schema_size); - _skip_list = new Table(_row_comparator, _mem_pool.get()); + _buffer_mem_pool.reset(new MemPool(_mem_tracker.get())); + _table_mem_pool.reset(new MemPool(_mem_tracker.get())); + _skip_list = new Table(_row_comparator, _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS); } MemTable::~MemTable() { @@ -61,27 +60,56 @@ int MemTable::RowCursorComparator::operator()(const char* left, const char* righ return compare_row(lhs_row, rhs_row); } -size_t MemTable::memory_usage() { - return _mem_pool->mem_tracker()->consumption(); +void MemTable::insert(const Tuple* tuple) { + bool overwritten = false; + uint8_t* _tuple_buf = nullptr; + if (_keys_type == KeysType::DUP_KEYS) { + // Will insert directly, so use memory from _table_mem_pool + _tuple_buf = _table_mem_pool->allocate(_schema_size); + ContiguousRow row(_schema, _tuple_buf); + _tuple_to_row(tuple, &row, _table_mem_pool.get()); + _skip_list->Insert((TableKey)_tuple_buf, &overwritten); + DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList"; + return; + } + + // For non-DUP models, for the data rows passed from the upper layer, when copying the data, + // we first 
allocate from _buffer_mem_pool, and then check whether it already exists in + // _skiplist. If it exists, we aggregate the new row into the row in skiplist. + // Otherwise, we need to copy it into _table_mem_pool before we can insert it. + _tuple_buf = _buffer_mem_pool->allocate(_schema_size); + ContiguousRow src_row(_schema, _tuple_buf); + _tuple_to_row(tuple, &src_row, _buffer_mem_pool.get()); + + bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint); + if (is_exist) { + _aggregate_two_row(src_row, _hint.curr->key); + } else { + _tuple_buf = _table_mem_pool->allocate(_schema_size); + ContiguousRow dst_row(_schema, _tuple_buf); + copy_row(&dst_row, src_row, _table_mem_pool.get()); + _skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint); + } + + // Make the MemPool reusable; this does not free its memory + _buffer_mem_pool->clear(); } -void MemTable::insert(Tuple* tuple) { - ContiguousRow row(_schema, _tuple_buf); - +void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) { for (size_t i = 0; i < _slot_descs->size(); ++i) { - auto cell = row.cell(i); + auto cell = row->cell(i); const SlotDescriptor* slot = (*_slot_descs)[i]; bool is_null = tuple->is_null(slot->null_indicator_offset()); - void* value = tuple->get_slot(slot->tuple_offset()); - _schema->column(i)->consume(&cell, (const char *)value, is_null, _mem_pool.get(), &_agg_object_pool); + const void* value = tuple->get_slot(slot->tuple_offset()); + _schema->column(i)->consume( + &cell, (const char*)value, is_null, mem_pool, &_agg_object_pool); } +} - bool overwritten = false; - _skip_list->Insert((char*)_tuple_buf, &overwritten, _keys_type); - if (!overwritten) { - _tuple_buf = _mem_pool->allocate(_schema_size); - } +void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) { + ContiguousRow dst_row(_schema, row_in_skiplist); + agg_update_row(&dst_row, src_row, _table_mem_pool.get()); } OLAPStatus MemTable::flush() { @@ 
-92,7 +120,7 @@ OLAPStatus MemTable::flush() { for (it.SeekToFirst(); it.Valid(); it.Next()) { char* row = (char*)it.key(); ContiguousRow dst_row(_schema, row); - agg_finalize_row(&dst_row, _mem_pool.get()); + agg_finalize_row(&dst_row, _table_mem_pool.get()); RETURN_NOT_OK(_rowset_writer->add_row(dst_row)); } RETURN_NOT_OK(_rowset_writer->flush()); diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index b7e6933c76..3a8a6a68dd 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -18,14 +18,20 @@ #ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H #define DORIS_BE_SRC_OLAP_MEMTABLE_H +#include "common/object_pool.h" #include "olap/skiplist.h" +#include "olap/olap_define.h" +#include "runtime/mem_tracker.h" namespace doris { +class ContiguousRow; class RowsetWriter; -class ObjectPool; class Schema; +class SlotDescriptor; +class TabletSchema; class Tuple; +class TupleDescriptor; class MemTable { public: @@ -33,13 +39,30 @@ public: const std::vector* slot_descs, TupleDescriptor* tuple_desc, KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker); ~MemTable(); + int64_t tablet_id() { return _tablet_id; } - size_t memory_usage(); - void insert(Tuple* tuple); + size_t memory_usage() { + return _mem_tracker->consumption(); + } + void insert(const Tuple* tuple); OLAPStatus flush(); OLAPStatus close(); private: + class RowCursorComparator { + public: + RowCursorComparator(const Schema* schema); + int operator()(const char* left, const char* right) const; + + private: + const Schema* _schema; + }; + typedef SkipList Table; + typedef Table::key_type TableKey; + + void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool); + void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist); + int64_t _tablet_id; Schema* _schema; const TabletSchema* _tablet_schema; @@ -48,21 +71,20 @@ private: const std::vector* _slot_descs; KeysType _keys_type; - struct RowCursorComparator { - const Schema* _schema; - 
RowCursorComparator(const Schema* schema); - int operator()(const char* left, const char* right) const; - }; - RowCursorComparator _row_comparator; std::unique_ptr _mem_tracker; - std::unique_ptr _mem_pool; + // This is a buffer, to hold the memory referenced by the rows that have not + // been inserted into the SkipList + std::unique_ptr _buffer_mem_pool; + // Only the rows will be inserted into SkipList can allocate memory from _table_mem_pool. + // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually + // reduce the number of segment files that are generated by current load + std::unique_ptr _table_mem_pool; ObjectPool _agg_object_pool; - typedef SkipList Table; - u_int8_t* _tuple_buf; size_t _schema_size; Table* _skip_list; + Table::Hint _hint; RowsetWriter* _rowset_writer; diff --git a/be/src/olap/skiplist.h b/be/src/olap/skiplist.h index 9f1a5d0f87..a019032f38 100644 --- a/be/src/olap/skiplist.h +++ b/be/src/olap/skiplist.h @@ -33,7 +33,6 @@ #include "gen_cpp/olap_file.pb.h" #include "runtime/mem_pool.h" #include "util/random.h" -#include "olap/row.h" namespace doris { @@ -41,20 +40,45 @@ template class SkipList { private: struct Node; + enum { kMaxHeight = 12 }; public: + typedef Key key_type; + // One Hint object is to show position info of one row. + // It is used in the following scenarios: + // // 1. check for existence + // bool is_exist = skiplist->Find(key, &hint); + // // 2. Do something separately based on the value of is_exist + // if (is_exist) { + //     do_something1 (); + // } else { + //     do_something2 (); + // skiplist->InsertWithHint(key, is_exist, hint); + // } + // + // Note: The user should guarantee that there must not be any other insertion + // between calling Find() and InsertWithHint(). + struct Hint { + Node* curr; + Node* prev[kMaxHeight]; + }; + // Create a new SkipList object that will use "cmp" for comparing keys, - // and will allocate memory using "*mem_pool". 
Objects allocated in the mem_pool - // must remain allocated for the lifetime of the skiplist object. - explicit SkipList(Comparator cmp, MemPool* mem_pool); + // and will allocate memory using "*mem_pool". + // NOTE: Objects allocated in the mem_pool must remain allocated for + // the lifetime of the skiplist object. + explicit SkipList(Comparator cmp, MemPool* mem_pool, bool can_dup); // Insert key into the list. - // REQUIRES: nothing that compares equal to key is currently in the list. - void Insert(const Key& key, bool* overwritten, KeysType keys_type); - void Aggregate(const Key& k1, const Key& k2); + void Insert(const Key& key, bool* overwritten); + // Use hint to insert a key. The hint is from a previous Find() + void InsertWithHint(const Key& key, bool is_exist, Hint* hint); // Returns true iff an entry that compares equal to key is in the list. bool Contains(const Key& key) const; + // Like Contains(), but it will return the position info as a hint. We can use this + // position info to insert directly using InsertWithHint(). + bool Find(const Key& key, Hint* hint) const; // Iteration over the contents of a skip list class Iterator { @@ -96,10 +120,10 @@ public: }; private: - enum { kMaxHeight = 12 }; - // Immutable after construction Comparator const compare_; + // When true, indicates that duplicate values are allowed. 
+ bool _can_dup; MemPool* const _mem_pool; // MemPool used for allocations of nodes Node* const head_; @@ -322,8 +346,9 @@ SkipList::FindLast() const { } template -SkipList::SkipList(Comparator cmp, MemPool* mem_pool) - : compare_(cmp), +SkipList::SkipList(Comparator cmp, MemPool* mem_pool, bool can_dup) : + compare_(cmp), + _can_dup(can_dup), _mem_pool(mem_pool), head_(NewNode(0 /* any key will do */, kMaxHeight)), max_height_(1), @@ -334,22 +359,15 @@ SkipList::SkipList(Comparator cmp, MemPool* mem_pool) } template -void SkipList::Aggregate(const Key& k1, const Key& k2) { - ContiguousRow dst_row(compare_._schema, k1); - ContiguousRow src_row(compare_._schema, k2); - agg_update_row(&dst_row, src_row, _mem_pool); -} - -template -void SkipList::Insert(const Key& key, bool* overwritten, KeysType keys_type) { +void SkipList::Insert(const Key& key, bool* overwritten) { // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual() // here since Insert() is externally synchronized. Node* prev[kMaxHeight]; Node* x = FindGreaterOrEqual(key, prev); #ifndef BE_TEST - if (x != nullptr && keys_type != KeysType::DUP_KEYS && Equal(key, x->key)) { - Aggregate(x->key, key); + // The key already exists and duplicate keys are not allowed, so we need to aggregate them + if (!_can_dup && x != nullptr && Equal(key, x->key)) { *overwritten = true; return; } @@ -383,6 +401,47 @@ void SkipList::Insert(const Key& key, bool* overwritten, KeysTyp } } +// NOTE: The caller has already checked whether the row exists. 
+template +void SkipList::InsertWithHint(const Key& key, bool is_exist, Hint* hint) { + Node* x = hint->curr; + DCHECK(!is_exist || x) << "curr pointer must not be null if row exists"; + +#ifndef BE_TEST + // The key already exists and duplicate keys are not allowed, so we need to aggregate them + if (!_can_dup && is_exist) { + return; + } +#endif + + Node** prev = hint->prev; + // Our data structure does not allow duplicate insertion + int height = RandomHeight(); + if (height > GetMaxHeight()) { + for (int i = GetMaxHeight(); i < height; i++) { + prev[i] = head_; + } + //fprintf(stderr, "Change height from %d to %d\n", max_height_, height); + + // It is ok to mutate max_height_ without any synchronization + // with concurrent readers. A concurrent reader that observes + // the new value of max_height_ will see either the old value of + // new level pointers from head_ (NULL), or a new value set in + // the loop below. In the former case the reader will + // immediately drop to the next level since NULL sorts after all + // keys. In the latter case the reader will use the new node. + max_height_.store(height, std::memory_order_relaxed); + } + + x = NewNode(key, height); + for (int i = 0; i < height; i++) { + // NoBarrier_SetNext() suffices since we will add a barrier when + // we publish a pointer to "x" in prev[i]. 
+ x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); + prev[i]->SetNext(i, x); + } +} + template bool SkipList::Contains(const Key& key) const { Node* x = FindGreaterOrEqual(key, NULL); @@ -393,6 +452,17 @@ bool SkipList::Contains(const Key& key) const { } } +template +bool SkipList::Find(const Key& key, Hint* hint) const { + Node* x = FindGreaterOrEqual(key, hint->prev); + hint->curr = x; + if (x != NULL && Equal(key, x->key)) { + return true; + } else { + return false; + } +} + } // namespace doris #endif // DORIS_BE_SRC_OLAP_SKIPLIST_H diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp index 5b7373451f..8a95dbefd8 100644 --- a/be/test/olap/skiplist_test.cpp +++ b/be/test/olap/skiplist_test.cpp @@ -53,7 +53,7 @@ TEST_F(SkipTest, Empty) { std::unique_ptr mem_pool(new MemPool(tracker.get())); TestComparator cmp; - SkipList list(cmp, mem_pool.get()); + SkipList list(cmp, mem_pool.get(), false); ASSERT_TRUE(!list.Contains(10)); SkipList::Iterator iter(&list); @@ -75,12 +75,12 @@ TEST_F(SkipTest, InsertAndLookup) { Random rnd(1000); std::set keys; TestComparator cmp; - SkipList list(cmp, mem_pool.get()); + SkipList list(cmp, mem_pool.get(), false); for (int i = 0; i < N; i++) { Key key = rnd.Next() % R; if (keys.insert(key).second) { bool overwritten = false; - list.Insert(key, &overwritten, KeysType::AGG_KEYS); + list.Insert(key, &overwritten); } } @@ -147,6 +147,38 @@ TEST_F(SkipTest, InsertAndLookup) { } } +// Only non-DUP model will use Find() and InsertWithHint(). 
+TEST_F(SkipTest, InsertWithHintNoneDupModel) { + std::unique_ptr tracker(new MemTracker(-1)); + std::unique_ptr mem_pool(new MemPool(tracker.get())); + + const int N = 2000; + const int R = 5000; + Random rnd(1000); + std::set keys; + TestComparator cmp; + SkipList list(cmp, mem_pool.get(), false); + SkipList::Hint hint; + for (int i = 0; i < N; i++) { + Key key = rnd.Next() % R; + bool is_exist = list.Find(key, &hint); + if (keys.insert(key).second) { + ASSERT_FALSE(is_exist); + list.InsertWithHint(key, is_exist, &hint); + } else { + ASSERT_TRUE(is_exist); + } + } + + for (int i = 0; i < R; i++) { + if (list.Contains(i)) { + ASSERT_EQ(keys.count(i), 1); + } else { + ASSERT_EQ(keys.count(i), 0); + } + } +} + // We want to make sure that with a single writer and multiple // concurrent readers (with no synchronization other than when a // reader's iterator is created), the reader always observes all the @@ -238,7 +270,7 @@ public: ConcurrentTest(): _mem_tracker(new MemTracker(-1)), _mem_pool(new MemPool(_mem_tracker.get())), - _list(TestComparator(), _mem_pool.get()){} + _list(TestComparator(), _mem_pool.get(), false) { } // REQUIRES: External synchronization void write_step(Random* rnd) { @@ -246,7 +278,7 @@ public: const int g = _current.get(k) + 1; const Key new_key = make_key(k, g); bool overwritten = false; - _list.Insert(new_key, &overwritten, KeysType::AGG_KEYS); + _list.Insert(new_key, &overwritten); _current.set(k, g); }