From 324f1b8f519b11f8a1393b98f95ab1ad14039fdc Mon Sep 17 00:00:00 2001 From: LingBin Date: Thu, 28 Nov 2019 04:48:52 -0600 Subject: [PATCH] Unify the type of path_hash to `size_t` (#2324) The type of path hash should be `size_t` (the type returned by `hash_of_path()`), but the current code mixes `int64_t`, `int32_t` and `size_t` --- be/src/olap/data_dir.cpp | 12 ++--- be/src/olap/data_dir.h | 4 +- be/src/olap/delta_writer.cpp | 22 ++++----- be/src/olap/delta_writer.h | 2 +- be/src/olap/memtable_flush_executor.cpp | 20 ++++----- be/src/olap/memtable_flush_executor.h | 45 ++++++++++--------- be/src/olap/olap_common.h | 8 ++-- be/src/util/string_util.cpp | 4 +- be/src/util/string_util.h | 2 +- be/test/olap/memtable_flush_executor_test.cpp | 2 +- 10 files changed, 58 insertions(+), 63 deletions(-) diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index e851f27a86..3d210f92f6 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -918,17 +918,17 @@ void DataDir::perform_path_scan() { Status ret = FileUtils::list_dirs_files(data_path, &shards, nullptr, Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. path=[" + data_path + LOG(WARNING) << "fail to walk dir. path=[" + data_path << "] error[" << ret.to_string() << "]"; return ; } - + for (const auto& shard : shards) { std::string shard_path = data_path + "/" + shard; std::set tablet_ids; ret = FileUtils::list_dirs_files(shard_path, &tablet_ids, nullptr, Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. [path=" << shard_path + LOG(WARNING) << "fail to walk dir. 
[path=" << shard_path << "] error[" << ret.to_string() << "]"; continue; } @@ -947,11 +947,11 @@ void DataDir::perform_path_scan() { std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash; _all_check_paths.insert(tablet_schema_hash_path); std::set rowset_files; - + ret = FileUtils::list_dirs_files(tablet_schema_hash_path, nullptr, &rowset_files, - Env::Default()); + Env::Default()); if (!ret.ok()) { - LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path + LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path << "] error[" << ret.to_string() << "]"; continue; } diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index e7873d8ab1..e672b0f439 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -45,7 +45,7 @@ public: Status init(); const std::string& path() const { return _path; } - const size_t path_hash() const { return _path_hash; } + size_t path_hash() const { return _path_hash; } bool is_used() const { return _is_used; } void set_is_used(bool is_used) { _is_used = is_used; } int32_t cluster_id() const { return _cluster_id; } @@ -151,7 +151,7 @@ private: private: std::string _path; - int64_t _path_hash; + size_t _path_hash; // user specified capacity int64_t _capacity_bytes; // the actual avaiable capacity of the disk of this data dir diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 57179b72c1..8df038b0c4 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -33,17 +33,11 @@ OLAPStatus DeltaWriter::open(WriteRequest* req, MemTracker* mem_tracker, DeltaWr return OLAP_SUCCESS; } -DeltaWriter::DeltaWriter( - WriteRequest* req, - MemTracker* mem_tracker, - StorageEngine* storage_engine) - : _req(*req), _tablet(nullptr), - _cur_rowset(nullptr), _new_rowset(nullptr), _new_tablet(nullptr), - _rowset_writer(nullptr), _schema(nullptr), _tablet_schema(nullptr), - _delta_written_success(false), - _storage_engine(storage_engine) { - - 
_mem_tracker.reset(new MemTracker(-1, "delta writer", mem_tracker)); +DeltaWriter::DeltaWriter(WriteRequest* req, MemTracker* parent, StorageEngine* storage_engine) : + _req(*req), _tablet(nullptr), _cur_rowset(nullptr), _new_rowset(nullptr), + _new_tablet(nullptr), _rowset_writer(nullptr), _schema(nullptr), _tablet_schema(nullptr), + _delta_written_success(false), _storage_engine(storage_engine) { + _mem_tracker.reset(new MemTracker(-1, "delta writer", parent)); } DeltaWriter::~DeltaWriter() { @@ -75,7 +69,7 @@ void DeltaWriter::_garbage_collection() { rollback_status = _storage_engine->txn_manager()->rollback_txn(_req.partition_id, _tablet, _req.txn_id); } // has to check rollback status, because the rowset maybe committed in this thread and - // published in another thread, then rollback will failed + // published in another thread, then rollback will failed. // when rollback failed should not delete rowset if (rollback_status == OLAP_SUCCESS) { _storage_engine->add_unused_rowset(_cur_rowset); @@ -183,7 +177,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() { << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id); _flush_memtable_async(); _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots, - _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get())); + _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get())); } else { DCHECK(mem_consumption() > _mem_table->memory_usage()); // this means there should be at least one memtable in flush queue. 
@@ -218,7 +212,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrFieldtxn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, + OLAPStatus res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, _req.load_id, _cur_rowset, false); if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) { LOG(WARNING) << "commit txn: " << _req.txn_id diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index f9349a2f6e..46726a071b 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -58,7 +58,7 @@ class DeltaWriter { public: static OLAPStatus open(WriteRequest* req, MemTracker* mem_tracker, DeltaWriter** writer); - DeltaWriter(WriteRequest* req, MemTracker* mem_tracker, StorageEngine* storage_engine); + DeltaWriter(WriteRequest* req, MemTracker* parent, StorageEngine* storage_engine); OLAPStatus init(); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 728ebf0acc..f98d3ee07b 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -30,9 +30,9 @@ OLAPStatus FlushHandler::submit(std::shared_ptr memtable) { MemTableFlushContext ctx; ctx.memtable = memtable; ctx.flush_handler = this->shared_from_this(); - _counter_cond.inc(); + _counter_cond.inc(); _flush_executor->_push_memtable(_flush_queue_idx, ctx); - return OLAP_SUCCESS; + return OLAP_SUCCESS; } OLAPStatus FlushHandler::wait() { @@ -55,8 +55,8 @@ void FlushHandler::on_flush_finished(const FlushResult& res) { } OLAPStatus MemTableFlushExecutor::create_flush_handler( - int64_t path_hash, std::shared_ptr* flush_handler) { - int32_t flush_queue_idx = _get_queue_idx(path_hash); + size_t path_hash, std::shared_ptr* flush_handler) { + size_t flush_queue_idx = _get_queue_idx(path_hash); flush_handler->reset(new FlushHandler(flush_queue_idx, this)); return OLAP_SUCCESS; } @@ -99,7 +99,7 @@ 
MemTableFlushExecutor::~MemTableFlushExecutor() { _flush_pool->shutdown(); _flush_pool->join(); - // delete queue + // delete queue for (auto queue : _flush_queues) { delete queue; } @@ -108,11 +108,11 @@ MemTableFlushExecutor::~MemTableFlushExecutor() { delete _flush_pool; } -int32_t MemTableFlushExecutor::_get_queue_idx(size_t path_hash) { +size_t MemTableFlushExecutor::_get_queue_idx(size_t path_hash) { std::lock_guard l(_lock); - int32_t cur_idx = _path_map[path_hash]; - int32_t group = cur_idx / _thread_num_per_store; - int32_t next_idx = group * _thread_num_per_store + ((cur_idx + 1) % _thread_num_per_store); + size_t cur_idx = _path_map[path_hash]; + size_t group = cur_idx / _thread_num_per_store; + size_t next_idx = group * _thread_num_per_store + ((cur_idx + 1) % _thread_num_per_store); DCHECK(next_idx < _num_threads); _path_map[path_hash] = next_idx; return cur_idx; @@ -123,7 +123,7 @@ void MemTableFlushExecutor::_push_memtable(int32_t queue_idx, MemTableFlushConte } void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) { - while(true) { + while (true) { MemTableFlushContext ctx; if (!_flush_queues[queue_idx]->blocking_get(&ctx)) { // queue is empty and shutdown, end of thread diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 421040f113..76f91fef2d 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -25,12 +25,11 @@ #include #include -#include "olap/olap_define.h" -#include "runtime/mem_tracker.h" #include "util/blocking_queue.hpp" #include "util/counter_cond_variable.hpp" #include "util/spinlock.h" #include "util/thread_pool.hpp" +#include "olap/olap_define.h" namespace doris { @@ -64,13 +63,14 @@ struct FlushStatistic { std::atomic flush_time_ns = {0}; std::atomic flush_count= {0}; }; + std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); +class MemTableFlushExecutor; + // flush handler is for flushing memtables in a delta writer // 
This class must be wrapped by std::shared_ptr, or you will get bad_weak_ptr exception // when calling submit(); -class MemTableFlushExecutor; -class MemTracker; class FlushHandler : public std::enable_shared_from_this { public: FlushHandler(int32_t flush_queue_idx, MemTableFlushExecutor* flush_executor): @@ -81,9 +81,9 @@ public: _is_cancelled(false) { } - // submit a memtable to flush. return error if some previous submitted MemTable has failed + // submit a memtable to flush. return error if some previous submitted MemTable has failed OLAPStatus submit(std::shared_ptr memtable); - // wait for all submitted memtable finished. + // wait for all memtables submitted by itself to be finished. OLAPStatus wait(); // get flush operations' statistics const FlushStatistic& get_stats() const { return _stats; } @@ -98,6 +98,7 @@ public: bool is_cancelled() { return _last_flush_status.load() != OLAP_SUCCESS || _is_cancelled.load(); } void cancel() { _is_cancelled.store(true); } + private: // flush queue idx in memtable flush executor int32_t _flush_queue_idx; @@ -114,53 +115,53 @@ private: std::atomic _is_cancelled; }; -// MemTableFlushExecutor is for flushing memtables to disk. -// Each data directory has a specified number of worker threads and a corresponding number of flush queues. -// Each worker thread only takes memtable from the corresponding flush queue and writes it to disk. -// User SHOULD NOT call method of this class directly, use pattern should be: +// MemTableFlushExecutor is responsible for flushing memtables to disk. +// Each data directory has a specified number of worker threads and each thread will correspond +// to a queue. The only job of each worker thread is to take memtable from its corresponding +// flush queue and writes the data to disk. // +// NOTE: User SHOULD NOT call method of this class directly, use pattern should be: // ... 
// std::shared_ptr flush_handler; -// memTableFlushExecutor.create_flush_handler(path_hash, mem_tracker, &flush_handler); -// ... +// memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler); +// ... // flush_handler->submit(memtable) // ... class MemTableFlushExecutor { public: MemTableFlushExecutor() {} + ~MemTableFlushExecutor(); + // init should be called after storage engine is opened, // because it needs path hash of each data dir. void init(const std::vector& data_dirs); - ~MemTableFlushExecutor(); - // create a flush handler to access the flush executor - OLAPStatus create_flush_handler(int64_t path_hash, std::shared_ptr* flush_handler); + OLAPStatus create_flush_handler(size_t path_hash, std::shared_ptr* flush_handler); private: + friend class FlushHandler; + // given the path hash, return the next idx of flush queue. // eg. // path A is mapped to idx 0 and 1, so each time get_queue_idx(A) is called, // 0 and 1 will returned alternately. - int32_t _get_queue_idx(size_t path_hash); + size_t _get_queue_idx(size_t path_hash); // push the memtable to specified flush queue void _push_memtable(int32_t queue_idx, MemTableFlushContext& ctx); void _flush_memtable(int32_t queue_idx); -private: - friend class FlushHandler; - int32_t _thread_num_per_store; int32_t _num_threads; ThreadPool* _flush_pool; - // the size of this vector should equals to _num_threads + // the size of this vector should equal to _num_threads std::vector*> _flush_queues; - // lock to protect path_map + // lock to protect _path_map SpinLock _lock; // path hash -> queue idx of _flush_queues; - std::unordered_map _path_map; + std::unordered_map _path_map; }; } // end namespace diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index ed99d23072..5bd1385682 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -62,7 +62,7 @@ struct DataDirInfo { is_used(false) { } std::string path; - int64_t path_hash; + size_t path_hash; int64_t capacity; // 
总空间,单位字节 int64_t available; // 可用空间,单位字节 int64_t data_used_capacity; @@ -73,7 +73,7 @@ struct DataDirInfo { struct TabletInfo { TabletInfo( TTabletId in_tablet_id, - TSchemaHash in_schema_hash, + TSchemaHash in_schema_hash, UniqueId in_uid) : tablet_id(in_tablet_id), schema_hash(in_schema_hash), @@ -185,7 +185,7 @@ enum ReaderType { //using Version = std::pair; struct Version { - int64_t first; + int64_t first; int64_t second; Version(int64_t first_, int64_t second_) : first(first_), second(second_) {} @@ -261,7 +261,7 @@ typedef std::set UniqueIdSet; // Column unique Id -> column id map typedef std::map UniqueIdToColumnIdMap; -// 8 bit rowset id version +// 8 bit rowset id version // 56 bit, inc number from 0 // 128 bit backend uid, it is a uuid bit, id version struct RowsetId { diff --git a/be/src/util/string_util.cpp b/be/src/util/string_util.cpp index b61dbcea7a..76e5b45d87 100644 --- a/be/src/util/string_util.cpp +++ b/be/src/util/string_util.cpp @@ -21,8 +21,8 @@ namespace doris { -std::size_t hash_of_path(const std::string& identifier, const std::string& path) { - std::size_t hash = std::hash()(identifier); +size_t hash_of_path(const std::string& identifier, const std::string& path) { + size_t hash = std::hash()(identifier); std::vector path_parts = strings::Split(path, "/", strings::SkipWhitespace()); for (auto& part : path_parts) { boost::hash_combine(hash, part); diff --git a/be/src/util/string_util.h b/be/src/util/string_util.h index ccd12a3d87..712501ad92 100644 --- a/be/src/util/string_util.h +++ b/be/src/util/string_util.h @@ -60,7 +60,7 @@ public: } }; -std::size_t hash_of_path(const std::string& identifier, const std::string& path); +size_t hash_of_path(const std::string& identifier, const std::string& path); using StringCaseSet = std::set; using StringCaseUnorderedSet = std::unordered_set; diff --git a/be/test/olap/memtable_flush_executor_test.cpp b/be/test/olap/memtable_flush_executor_test.cpp index d028fe636e..b7408de615 100644 --- 
a/be/test/olap/memtable_flush_executor_test.cpp +++ b/be/test/olap/memtable_flush_executor_test.cpp @@ -95,7 +95,7 @@ public: TEST_F(TestMemTableFlushExecutor, create_flush_handler) { std::vector data_dir = k_engine->get_stores(); - int64_t path_hash = data_dir[0]->path_hash(); + size_t path_hash = data_dir[0]->path_hash(); std::shared_ptr flush_handler; k_flush_executor->create_flush_handler(path_hash, &flush_handler);