From c643cbd30c9d28e1f3ac7c2fb68e8bd67d9d70f8 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 25 Sep 2019 13:49:32 +0800 Subject: [PATCH] Optimize the load performance for large file (#1798) The current load process is: Tablet Sink -> Tablet Channel Mgr -> Tablets Channel -> Delta Writer -> MemTable -> Flush to disk. In the path of Tablets Channel -> DeltaWriter -> MemTable -> Flush to disk, the following operations are performed: Insert tuples into different memtables according to tablet ID. When the memtable size reaches the threshold, it is written to disk. The above operations are equivalent to single-threaded execution for a single load task. In fact, the insertion into the memtable and the flush of the memtable can be executed asynchronously. Performing these operations in separate threads prevents the insertion into the memtable from being delayed by slow disk writes. In the new implementation, I added a MemTableFlushExecutor class with a set of flush queues and corresponding worker threads. By default, each data directory uses two worker threads for flush, which can be modified by the parameter flush_thread_num_per_store of BE. DeltaWriter will push the full memtable to MemTableFlushExecutor for the flush operation and generate a new memtable for receiving new data. This design can improve the performance of loading large files. In single host testing, the time to load a 1GB text file is reduced from 48 seconds to 29 seconds. 
--- be/src/common/config.h | 2 + be/src/olap/CMakeLists.txt | 1 + be/src/olap/delta_writer.cpp | 99 +++++--- be/src/olap/delta_writer.h | 25 +- be/src/olap/memtable.cpp | 21 +- be/src/olap/memtable.h | 17 +- be/src/olap/memtable_flush_executor.cpp | 154 ++++++++++++ be/src/olap/memtable_flush_executor.h | 156 ++++++++++++ be/src/olap/storage_engine.cpp | 7 + be/src/olap/storage_engine.h | 7 +- be/src/runtime/CMakeLists.txt | 163 ++++++------- be/src/runtime/tablet_writer_mgr.cpp | 228 +----------------- be/src/runtime/tablet_writer_mgr.h | 26 +- be/src/runtime/tablets_channel.cpp | 216 +++++++++++++++++ be/src/runtime/tablets_channel.h | 120 +++++++++ be/src/util/counter_cond_variable.hpp | 92 +++++++ be/src/util/semaphore.hpp | 6 +- be/test/olap/CMakeLists.txt | 1 + be/test/olap/delta_writer_test.cpp | 12 +- be/test/olap/memtable_flush_executor_test.cpp | 138 +++++++++++ .../external_scan_context_mgr_test.cpp | 4 +- be/test/runtime/tablet_writer_mgr_test.cpp | 15 +- be/test/runtime/user_function_cache_test.cpp | 16 +- be/test/util/CMakeLists.txt | 1 + be/test/util/counter_cond_variable_test.cpp | 86 +++++++ .../transaction/GlobalTransactionMgr.java | 2 +- run-ut.sh | 2 + 27 files changed, 1204 insertions(+), 413 deletions(-) create mode 100644 be/src/olap/memtable_flush_executor.cpp create mode 100644 be/src/olap/memtable_flush_executor.h create mode 100644 be/src/runtime/tablets_channel.cpp create mode 100644 be/src/runtime/tablets_channel.h create mode 100644 be/src/util/counter_cond_variable.hpp create mode 100644 be/test/olap/memtable_flush_executor_test.cpp create mode 100644 be/test/util/counter_cond_variable_test.cpp diff --git a/be/src/common/config.h b/be/src/common/config.h index b386d213b3..b37e788f65 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -465,6 +465,8 @@ namespace config { CONF_Int32(storage_flood_stage_usage_percent, "95"); // 95% // The min bytes that should be left of a data dir 
CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824") // 1GB + // number of thread for flushing memtable per store + CONF_Int32(flush_thread_num_per_store, "2"); } // namespace config } // namespace doris diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 2ae5cc895a..a6ae95e68d 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -45,6 +45,7 @@ add_library(Olap STATIC key_coder.cpp lru_cache.cpp memtable.cpp + memtable_flush_executor.cpp merger.cpp new_status.cpp null_predicate.cpp diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 6499981607..0fea80a05d 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -17,33 +17,38 @@ #include "olap/delta_writer.h" -#include "olap/schema.h" -#include "olap/memtable.h" #include "olap/data_dir.h" +#include "olap/memtable.h" +#include "olap/memtable_flush_executor.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/rowset/rowset_id_generator.h" +#include "olap/schema.h" +#include "olap/storage_engine.h" namespace doris { OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) { - *writer = new DeltaWriter(req); - return OLAP_SUCCESS; + *writer = new DeltaWriter(req, StorageEngine::instance()); + return (*writer)->init(); } -DeltaWriter::DeltaWriter(WriteRequest* req) +DeltaWriter::DeltaWriter( + WriteRequest* req, + StorageEngine* storage_engine) : _req(*req), _tablet(nullptr), _cur_rowset(nullptr), _new_rowset(nullptr), _new_tablet(nullptr), - _rowset_writer(nullptr), _mem_table(nullptr), - _schema(nullptr), _tablet_schema(nullptr), - _delta_written_success(false) {} + _rowset_writer(nullptr), _schema(nullptr), _tablet_schema(nullptr), + _delta_written_success(false), + _storage_engine(storage_engine) { +} DeltaWriter::~DeltaWriter() { if (!_delta_written_success) { _garbage_collection(); } - SAFE_DELETE(_mem_table); + _mem_table.reset(); 
SAFE_DELETE(_schema); if (_rowset_writer != nullptr) { _rowset_writer->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + _rowset_writer->rowset_id().to_string()); @@ -53,26 +58,26 @@ DeltaWriter::~DeltaWriter() { void DeltaWriter::_garbage_collection() { OLAPStatus rollback_status = OLAP_SUCCESS; if (_tablet != nullptr) { - rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn(_req.partition_id, + rollback_status = _storage_engine->txn_manager()->rollback_txn(_req.partition_id, _req.txn_id,_req.tablet_id, _req.schema_hash, _tablet->tablet_uid()); } // has to check rollback status, because the rowset maybe committed in this thread and // published in another thread, then rollback will failed // when rollback failed should not delete rowset if (rollback_status == OLAP_SUCCESS) { - StorageEngine::instance()->add_unused_rowset(_cur_rowset); + _storage_engine->add_unused_rowset(_cur_rowset); } if (_new_tablet != nullptr) { - rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn(_req.partition_id, _req.txn_id, + rollback_status = _storage_engine->txn_manager()->rollback_txn(_req.partition_id, _req.txn_id, _new_tablet->tablet_id(), _new_tablet->schema_hash(), _new_tablet->tablet_uid()); if (rollback_status == OLAP_SUCCESS) { - StorageEngine::instance()->add_unused_rowset(_new_rowset); + _storage_engine->add_unused_rowset(_new_rowset); } } } OLAPStatus DeltaWriter::init() { - _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(_req.tablet_id, _req.schema_hash); + _tablet = _storage_engine->tablet_manager()->get_tablet(_req.tablet_id, _req.schema_hash); if (_tablet == nullptr) { LOG(WARNING) << "tablet_id: " << _req.tablet_id << ", " << "schema_hash: " << _req.schema_hash << " not found"; @@ -85,7 +90,7 @@ OLAPStatus DeltaWriter::init() { return OLAP_ERR_RWLOCK_ERROR; } MutexLock push_lock(_tablet->get_push_lock()); - RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn( + 
RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn( _req.partition_id, _req.txn_id, _req.tablet_id, _req.schema_hash, _tablet->tablet_uid(), _req.load_id)); if (_req.need_gen_rollup) { @@ -98,7 +103,7 @@ OLAPStatus DeltaWriter::init() { << "new_tablet_id: " << new_tablet_id << ", " << "new_schema_hash: " << new_schema_hash << ", " << "transaction_id: " << _req.txn_id; - _new_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(new_tablet_id, new_schema_hash); + _new_tablet = _storage_engine->tablet_manager()->get_tablet(new_tablet_id, new_schema_hash); if (_new_tablet == nullptr) { LOG(WARNING) << "find alter task, but could not find new tablet tablet_id: " << new_tablet_id << ", schema_hash: " << new_schema_hash; @@ -108,7 +113,7 @@ OLAPStatus DeltaWriter::init() { if (!new_migration_rlock.own_lock()) { return OLAP_ERR_RWLOCK_ERROR; } - StorageEngine::instance()->txn_manager()->prepare_txn( + _storage_engine->txn_manager()->prepare_txn( _req.partition_id, _req.txn_id, new_tablet_id, new_schema_hash, _new_tablet->tablet_uid(), _req.load_id); } @@ -116,7 +121,7 @@ OLAPStatus DeltaWriter::init() { } RowsetWriterContext writer_context; - writer_context.rowset_id = StorageEngine::instance()->next_rowset_id(); + writer_context.rowset_id = _storage_engine->next_rowset_id(); writer_context.tablet_uid = _tablet->tablet_uid(); writer_context.tablet_id = _req.tablet_id; writer_context.partition_id = _req.partition_id; @@ -132,48 +137,58 @@ OLAPStatus DeltaWriter::init() { _tablet_schema = &(_tablet->tablet_schema()); _schema = new Schema(*_tablet_schema); - _mem_table = new MemTable(_schema, _tablet_schema, _req.slots, - _req.tuple_desc, _tablet->keys_type()); + _mem_table = std::make_shared(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots, + _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get()); + + // create flush handler + RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_handler(_tablet->data_dir()->path_hash(), 
&_flush_handler)); + _is_init = true; return OLAP_SUCCESS; } OLAPStatus DeltaWriter::write(Tuple* tuple) { if (!_is_init) { - auto st = init(); - if (st != OLAP_SUCCESS) { - return st; - } + RETURN_NOT_OK(init()); } _mem_table->insert(tuple); - if (_mem_table->memory_usage() >= config::write_buffer_size) { - RETURN_NOT_OK(_mem_table->flush(_rowset_writer.get())); - SAFE_DELETE(_mem_table); - _mem_table = new MemTable(_schema, _tablet_schema, _req.slots, - _req.tuple_desc, _tablet->keys_type()); + // if memtable is full, push it to the flush executor, + // and create a new memtable for incoming data + if (_mem_table->memory_usage() >= config::write_buffer_size) { + RETURN_NOT_OK(_flush_memtable_async()); + // create a new memtable for new incoming data + _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots, + _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get())); } return OLAP_SUCCESS; } -OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* tablet_vec) { - if (!_is_init) { - auto st = init(); - if (st != OLAP_SUCCESS) { - return st; - } - } - RETURN_NOT_OK(_mem_table->close(_rowset_writer.get())); +OLAPStatus DeltaWriter::_flush_memtable_async() { + return _flush_handler->submit(_mem_table); +} + +OLAPStatus DeltaWriter::close() { + if (!_is_init) { + RETURN_NOT_OK(init()); + } + + RETURN_NOT_OK(_flush_memtable_async()); + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec) { + // return error if previous flush failed + RETURN_NOT_OK(_flush_handler->wait()); - OLAPStatus res = OLAP_SUCCESS; // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); if (_cur_rowset == nullptr) { LOG(WARNING) << "fail to build rowset"; return OLAP_ERR_MALLOC_ERROR; } - res = StorageEngine::instance()->txn_manager()->commit_txn(_tablet->data_dir()->get_meta(), + OLAPStatus res = _storage_engine->txn_manager()->commit_txn(_tablet->data_dir()->get_meta(), 
_req.partition_id, _req.txn_id,_req.tablet_id, _req.schema_hash, _tablet->tablet_uid(), _req.load_id, _cur_rowset, false); if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) { @@ -194,7 +209,7 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* t return res; } - res = StorageEngine::instance()->txn_manager()->commit_txn(_new_tablet->data_dir()->get_meta(), + res = _storage_engine->txn_manager()->commit_txn(_new_tablet->data_dir()->get_meta(), _req.partition_id, _req.txn_id, _new_tablet->tablet_id(), _new_tablet->schema_hash(), _new_tablet->tablet_uid(), _req.load_id, _new_rowset, false); @@ -218,6 +233,10 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* t #endif _delta_written_success = true; + + const FlushStatistic& stat = _flush_handler->get_stats(); + LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() + << ", stats: " << stat; return OLAP_SUCCESS; } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index f9d1587f5e..073f5e5f1e 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -18,19 +18,21 @@ #ifndef DORIS_BE_SRC_DELTA_WRITER_H #define DORIS_BE_SRC_DELTA_WRITER_H -#include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/schema_change.h" #include "runtime/descriptors.h" #include "runtime/tuple.h" #include "gen_cpp/internal_service.pb.h" #include "olap/rowset/rowset_writer.h" +#include "util/blocking_queue.hpp" namespace doris { -class SegmentGroup; +class FlushHandler; class MemTable; class Schema; +class SegmentGroup; +class StorageEngine; enum WriteType { LOAD = 1, @@ -54,17 +56,27 @@ struct WriteRequest { class DeltaWriter { public: static OLAPStatus open(WriteRequest* req, DeltaWriter** writer); + + DeltaWriter(WriteRequest* req, StorageEngine* storage_engine); + OLAPStatus init(); - DeltaWriter(WriteRequest* req); + ~DeltaWriter(); + OLAPStatus write(Tuple* tuple); - OLAPStatus 
close(google::protobuf::RepeatedPtrField* tablet_vec); + // flush the last memtable to flush queue, must call it before close_wait() + OLAPStatus close(); + // wait for all memtables being flushed + OLAPStatus close_wait(google::protobuf::RepeatedPtrField* tablet_vec); OLAPStatus cancel(); int64_t partition_id() const { return _req.partition_id; } private: + // push a full memtable to flush executor + OLAPStatus _flush_memtable_async(); + void _garbage_collection(); private: @@ -75,10 +87,13 @@ private: RowsetSharedPtr _new_rowset; TabletSharedPtr _new_tablet; std::unique_ptr _rowset_writer; - MemTable* _mem_table; + std::shared_ptr _mem_table; Schema* _schema; const TabletSchema* _tablet_schema; bool _delta_written_success; + + StorageEngine* _storage_engine; + std::shared_ptr _flush_handler; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index a0054a89de..5a1b240ae2 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -26,15 +26,17 @@ namespace doris { -MemTable::MemTable(Schema* schema, const TabletSchema* tablet_schema, +MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - KeysType keys_type) - : _schema(schema), + KeysType keys_type, RowsetWriter* rowset_writer) + : _tablet_id(tablet_id), + _schema(schema), _tablet_schema(tablet_schema), _tuple_desc(tuple_desc), _slot_descs(slot_descs), _keys_type(keys_type), - _row_comparator(_schema) { + _row_comparator(_schema), + _rowset_writer(rowset_writer) { _schema_size = _schema->schema_size(); _tuple_buf = _arena.Allocate(_schema_size); _skip_list = new Table(_row_comparator, &_arena); @@ -59,6 +61,7 @@ size_t MemTable::memory_usage() { void MemTable::insert(Tuple* tuple) { ContiguousRow row(_schema, _tuple_buf); + for (size_t i = 0; i < _slot_descs->size(); ++i) { auto cell = row.cell(i); const SlotDescriptor* slot = (*_slot_descs)[i]; @@ -75,7 +78,7 @@ void 
MemTable::insert(Tuple* tuple) { } } -OLAPStatus MemTable::flush(RowsetWriter* rowset_writer) { +OLAPStatus MemTable::flush() { int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); @@ -84,17 +87,17 @@ OLAPStatus MemTable::flush(RowsetWriter* rowset_writer) { char* row = (char*)it.key(); ContiguousRow dst_row(_schema, row); agg_finalize_row(&dst_row, _skip_list->arena()); - RETURN_NOT_OK(rowset_writer->add_row(dst_row)); + RETURN_NOT_OK(_rowset_writer->add_row(dst_row)); } - RETURN_NOT_OK(rowset_writer->flush()); + RETURN_NOT_OK(_rowset_writer->flush()); } DorisMetrics::memtable_flush_total.increment(1); DorisMetrics::memtable_flush_duration_us.increment(duration_ns / 1000); return OLAP_SUCCESS; } -OLAPStatus MemTable::close(RowsetWriter* rowset_writer) { - return flush(rowset_writer); +OLAPStatus MemTable::close() { + return flush(); } } // namespace doris diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 1a262ce3dd..80be744cf7 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -18,8 +18,6 @@ #ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H #define DORIS_BE_SRC_OLAP_MEMTABLE_H -#include - #include "olap/schema.h" #include "olap/skiplist.h" #include "runtime/tuple.h" @@ -28,18 +26,22 @@ namespace doris { class RowCursor; +class RowsetWriter; class MemTable { public: - MemTable(Schema* schema, const TabletSchema* tablet_schema, + MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - KeysType keys_type); + KeysType keys_type, RowsetWriter* rowset_writer); ~MemTable(); + int64_t tablet_id() { return _tablet_id; } size_t memory_usage(); void insert(Tuple* tuple); - OLAPStatus flush(RowsetWriter* rowset_writer); - OLAPStatus close(RowsetWriter* rowset_writer); + OLAPStatus flush(); + OLAPStatus close(); + private: + int64_t _tablet_id; Schema* _schema; const TabletSchema* _tablet_schema; TupleDescriptor* _tuple_desc; @@ -60,6 +62,9 @@ private: char* 
_tuple_buf; size_t _schema_size; Table* _skip_list; + + RowsetWriter* _rowset_writer; + }; // class MemTable } // namespace doris diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp new file mode 100644 index 0000000000..a2a24485a7 --- /dev/null +++ b/be/src/olap/memtable_flush_executor.cpp @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/memtable_flush_executor.h" + +#include "olap/data_dir.h" +#include "olap/delta_writer.h" +#include "olap/memtable.h" +#include "runtime/exec_env.h" + +namespace doris { + +OLAPStatus FlushHandler::submit(std::shared_ptr memtable) { + RETURN_NOT_OK(_last_flush_status.load()); + MemTableFlushContext ctx; + ctx.memtable = memtable; + ctx.flush_handler = this->shared_from_this(); + _counter_cond.inc(); + _flush_executor->_push_memtable(_flush_queue_idx, ctx); + return OLAP_SUCCESS; +} + +OLAPStatus FlushHandler::wait() { + // wait util encoutering error, or all submitted memtables are finished + RETURN_NOT_OK(_last_flush_status.load()); + _counter_cond.block_wait(); + return _last_flush_status.load(); +} + +void FlushHandler::on_flush_finished(const FlushResult& res) { + if (res.flush_status != OLAP_SUCCESS) { + _last_flush_status.store(res.flush_status); + // if one failed, all other memtables no need to flush + _counter_cond.dec_to_zero(); + } else { + _stats.flush_time_ns.fetch_add(res.flush_time_ns); + _stats.flush_count.fetch_add(1); + _counter_cond.dec(); + } +} + +OLAPStatus MemTableFlushExecutor::create_flush_handler(int64_t path_hash, std::shared_ptr* flush_handler) { + int32_t flush_queue_idx = _get_queue_idx(path_hash); + flush_handler->reset(new FlushHandler(flush_queue_idx, this)); + return OLAP_SUCCESS; +} + +void MemTableFlushExecutor::init(const std::vector& data_dirs) { + int32_t data_dir_num = data_dirs.size(); + _thread_num_per_store = std::max(1, config::flush_thread_num_per_store); + _num_threads = data_dir_num * _thread_num_per_store; + + // create flush queues + for (int i = 0; i < _num_threads; ++i) { + BlockingQueue* queue = new BlockingQueue(10); + _flush_queues.push_back(queue); + } + // create thread pool + _flush_pool = new ThreadPool(_num_threads, 1); + for (int32_t i = 0; i < _num_threads; ++i) { + _flush_pool->offer(boost::bind(&MemTableFlushExecutor::_flush_memtable, this, i)); + } + + // _path_map saves the path hash 
to current idx of flush queue. + // eg. + // there are 4 data stores, each store has 2 work thread. + // so there are 8(= 4 * 2) queues in _flush_queues. + // and the path hash of the 4 paths are mapped to idx 0, 2, 4, 6. + int32_t group = 0; + for (auto store : data_dirs) { + _path_map[store->path_hash()] = group; + group += _thread_num_per_store; + } +} + +MemTableFlushExecutor::~MemTableFlushExecutor() { + // shutdown queues + for (auto queue : _flush_queues) { + queue->shutdown(); + } + + // shutdown thread pool + _flush_pool->shutdown(); + _flush_pool->join(); + + // delete queue + for (auto queue : _flush_queues) { + delete queue; + } + _flush_queues.clear(); + + delete _flush_pool; +} + +int32_t MemTableFlushExecutor::_get_queue_idx(size_t path_hash) { + std::lock_guard l(_lock); + int32_t cur_idx = _path_map[path_hash]; + int32_t group = cur_idx / _thread_num_per_store; + int32_t next_idx = group * _thread_num_per_store + ((cur_idx + 1) % _thread_num_per_store); + DCHECK(next_idx < _num_threads); + _path_map[path_hash] = next_idx; + return cur_idx; +} + +void MemTableFlushExecutor::_push_memtable(int32_t queue_idx, MemTableFlushContext& ctx) { + _flush_queues[queue_idx]->blocking_put(ctx); +} + +void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) { + while(true) { + MemTableFlushContext ctx; + if (!_flush_queues[queue_idx]->blocking_get(&ctx)) { + // queue is empty and shutdown, end of thread + return; + } + + // if last flush of this tablet already failed, just skip + if (ctx.flush_handler->is_cancelled()) { + ctx.flush_handler->on_flush_cancelled(); + continue; + } + + // flush the memtable + FlushResult res; + MonotonicStopWatch timer; + timer.start(); + res.flush_status = ctx.memtable->flush(); + res.flush_time_ns = timer.elapsed_time(); + // callback + ctx.flush_handler->on_flush_finished(res); + } +} + +std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { + os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000 + 
<< ", flush count=" << stat.flush_count << ")"; + return os; +} + +} // end of namespac diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h new file mode 100644 index 0000000000..dc5b5d24c0 --- /dev/null +++ b/be/src/olap/memtable_flush_executor.h @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "olap/olap_define.h" +#include "util/blocking_queue.hpp" +#include "util/counter_cond_variable.hpp" +#include "util/spinlock.h" +#include "util/thread_pool.hpp" + +namespace doris { + +class DataDir; +class DeltaWriter; +class ExecEnv; +class MemTable; + +// The context for a memtable to be flushed. +class FlushHandler; +struct MemTableFlushContext { + // memtable to be flushed + std::shared_ptr memtable; + // flush handler from a delta writer. + // use shared ptr because flush_handler may be deleted before this + // memtable being flushed. so we need to make sure the flush_handler + // is alive until this memtable being flushed. 
+ std::shared_ptr flush_handler; +}; + +// the flush result of a single memtable flush +struct FlushResult { + OLAPStatus flush_status; + int64_t flush_time_ns; +}; + +// the statistic of a certain flush handler. +// use atomic because it may be updated by multi threads +struct FlushStatistic { + std::atomic_int64_t flush_time_ns = {0}; + std::atomic_int64_t flush_count= {0}; +}; +std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); + +// flush handler is for flushing memtables in a delta writer +// This class must be wrapped by std::shared_ptr, or you will get bad_weak_ptr exception +// when calling submit(); +class MemTableFlushExecutor; +class FlushHandler : public std::enable_shared_from_this { +public: + FlushHandler(int32_t flush_queue_idx, MemTableFlushExecutor* flush_executor): + _flush_queue_idx(flush_queue_idx), + _last_flush_status(OLAP_SUCCESS), + _counter_cond(0), + _flush_executor(flush_executor) { + } + + // submit a memtable to flush. return error if some previous submitted MemTable has failed + OLAPStatus submit(std::shared_ptr memtable); + // wait for all submitted memtable finished. + OLAPStatus wait(); + // get flush operations' statistics + const FlushStatistic& get_stats() const { return _stats; } + // called when a memtable is finished by executor. + void on_flush_finished(const FlushResult& res); + // called when a flush memtable execution is cancelled + void on_flush_cancelled() { + // for now, if one memtable cancelled, no more memtables will be flushed, so dec to zero. 
+ _counter_cond.dec_to_zero(); + } + + bool is_cancelled() { return _last_flush_status.load() != OLAP_SUCCESS; } +private: + // flush queue idx in memtable flush executor + int32_t _flush_queue_idx; + // the flush status of last memtable + std::atomic _last_flush_status; + // used to wait/notify the memtable flush execution + CounterCondVariable _counter_cond; + + FlushStatistic _stats; + MemTableFlushExecutor* _flush_executor; +}; + +// MemTableFlushExecutor is for flushing memtables to disk. +// Each data directory has a specified number of worker threads and a corresponding number of flush queues. +// Each worker thread only takes memtable from the corresponding flush queue and writes it to disk. +// User SHOULD NOT call method of this class directly, use pattern should be: +// +// ... +// std::shared_ptr flush_handler; +// memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler); +// ... +// flush_handler->submit(memtable) +// ... +class MemTableFlushExecutor { +public: + MemTableFlushExecutor() {} + // init should be called after storage engine is opened, + // because it needs path hash of each data dir. + void init(const std::vector& data_dirs); + + ~MemTableFlushExecutor(); + + // create a flush handler to access the flush executor + OLAPStatus create_flush_handler(int64_t path_hash, std::shared_ptr* flush_handler); + +private: + // given the path hash, return the next idx of flush queue. + // eg. + // path A is mapped to idx 0 and 1, so each time get_queue_idx(A) is called, + // 0 and 1 will returned alternately. 
+ int32_t _get_queue_idx(size_t path_hash); + + // push the memtable to specified flush queue + void _push_memtable(int32_t queue_idx, MemTableFlushContext& ctx); + + void _flush_memtable(int32_t queue_idx); + +private: + friend class FlushHandler; + + int32_t _thread_num_per_store; + int32_t _num_threads; + ThreadPool* _flush_pool; + // the size of this vector should equals to _num_threads + std::vector*> _flush_queues; + // lock to protect path_map + SpinLock _lock; + // path hash -> queue idx of _flush_queues; + std::unordered_map _path_map; +}; + +} // end namespace diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 11ed82d604..1f670d01cd 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -36,6 +36,7 @@ #include "olap/base_compaction.h" #include "olap/cumulative_compaction.h" #include "olap/lru_cache.h" +#include "olap/memtable_flush_executor.h" #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/push_handler.h" @@ -222,6 +223,9 @@ OLAPStatus StorageEngine::open() { // 取消未完成的SchemaChange任务 _tablet_manager->cancel_unfinished_schema_change(); + _memtable_flush_executor = new MemTableFlushExecutor(); + _memtable_flush_executor->init(dirs); + return OLAP_SUCCESS; } @@ -492,6 +496,9 @@ OLAPStatus StorageEngine::clear() { store_pair.second = nullptr; } _store_map.clear(); + + delete _memtable_flush_executor; + return OLAP_SUCCESS; } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index cfbd995b24..7f96fa4bb0 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -51,9 +51,10 @@ namespace doris { -class Tablet; class DataDir; class EngineTask; +class MemTableFlushExecutor; +class Tablet; // StorageEngine singleton to manage all Table pointers. // Providing add/drop/get operations. 
@@ -203,6 +204,8 @@ public: void release_rowset_id(const RowsetId& rowset_id) { return _rowset_id_generator->release_id(rowset_id); }; + MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor; } + private: OLAPStatus _check_file_descriptor_number(); @@ -345,6 +348,8 @@ private: std::unique_ptr _rowset_id_generator; + MemTableFlushExecutor* _memtable_flush_executor; + DISALLOW_COPY_AND_ASSIGN(StorageEngine); }; diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index e46afeef3e..ae89fca119 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -24,87 +24,88 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/runtime") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/runtime") set(RUNTIME_FILES - broker_mgr.cpp - buffered_block_mgr.cpp - buffered_tuple_stream.cpp - buffered_tuple_stream_ir.cpp - buffer_control_block.cpp - merge_sorter.cpp - client_cache.cpp - data_stream_mgr.cpp - data_stream_sender.cpp - datetime_value.cpp - descriptors.cpp - exec_env.cpp - exec_env_init.cpp - user_function_cache.cpp - mem_pool.cpp - plan_fragment_executor.cpp - primitive_type.cpp - pull_load_task_mgr.cpp - raw_value.cpp - raw_value_ir.cpp - result_sink.cpp - result_writer.cpp - result_buffer_mgr.cpp - row_batch.cpp - runtime_state.cpp - string_value.cpp - thread_resource_mgr.cpp - # timestamp_value.cpp - decimal_value.cpp - decimalv2_value.cpp - large_int_value.cpp - tuple.cpp - tuple_row.cpp - vectorized_row_batch.cpp - dpp_writer.cpp - qsorter.cpp - fragment_mgr.cpp - dpp_sink_internal.cpp - data_spliter.cpp - dpp_sink.cpp - etl_job_mgr.cpp - load_path_mgr.cpp - types.cpp - tmp_file_mgr.cc - disk_io_mgr.cc - disk_io_mgr_reader_context.cc - disk_io_mgr_scan_range.cc - buffered_block_mgr2.cc - test_env.cc - mem_tracker.cpp - spill_sorter.cc - sorted_run_merger.cc - data_stream_recvr.cc - buffered_tuple_stream2.cc - buffered_tuple_stream2_ir.cc - buffered_tuple_stream3.cc - # export_task_mgr.cpp - export_sink.cpp - 
tablet_writer_mgr.cpp - bufferpool/buffer_allocator.cc - bufferpool/buffer_pool.cc - bufferpool/reservation_tracker.cc - bufferpool/reservation_util.cc - bufferpool/suballocator.cc - bufferpool/system_allocator.cc - initial_reservations.cc - snapshot_loader.cpp - query_statistics.cpp - message_body_sink.cpp - stream_load/stream_load_context.cpp - stream_load/stream_load_executor.cpp - routine_load/data_consumer.cpp - routine_load/data_consumer_group.cpp - routine_load/data_consumer_pool.cpp - routine_load/routine_load_task_executor.cpp - small_file_mgr.cpp - result_queue_mgr.cpp - memory_scratch_sink.cpp - external_scan_context_mgr.cpp - memory/system_allocator.cpp - memory/chunk_allocator.cpp + broker_mgr.cpp + buffered_block_mgr.cpp + buffered_tuple_stream.cpp + buffered_tuple_stream_ir.cpp + buffer_control_block.cpp + merge_sorter.cpp + client_cache.cpp + data_stream_mgr.cpp + data_stream_sender.cpp + datetime_value.cpp + descriptors.cpp + exec_env.cpp + exec_env_init.cpp + user_function_cache.cpp + mem_pool.cpp + plan_fragment_executor.cpp + primitive_type.cpp + pull_load_task_mgr.cpp + raw_value.cpp + raw_value_ir.cpp + result_sink.cpp + result_writer.cpp + result_buffer_mgr.cpp + row_batch.cpp + runtime_state.cpp + string_value.cpp + thread_resource_mgr.cpp + # timestamp_value.cpp + decimal_value.cpp + decimalv2_value.cpp + large_int_value.cpp + tuple.cpp + tuple_row.cpp + vectorized_row_batch.cpp + dpp_writer.cpp + qsorter.cpp + fragment_mgr.cpp + dpp_sink_internal.cpp + data_spliter.cpp + dpp_sink.cpp + etl_job_mgr.cpp + load_path_mgr.cpp + types.cpp + tmp_file_mgr.cc + disk_io_mgr.cc + disk_io_mgr_reader_context.cc + disk_io_mgr_scan_range.cc + buffered_block_mgr2.cc + test_env.cc + mem_tracker.cpp + spill_sorter.cc + sorted_run_merger.cc + data_stream_recvr.cc + buffered_tuple_stream2.cc + buffered_tuple_stream2_ir.cc + buffered_tuple_stream3.cc + # export_task_mgr.cpp + export_sink.cpp + tablet_writer_mgr.cpp + tablets_channel.cpp + 
bufferpool/buffer_allocator.cc + bufferpool/buffer_pool.cc + bufferpool/reservation_tracker.cc + bufferpool/reservation_util.cc + bufferpool/suballocator.cc + bufferpool/system_allocator.cc + initial_reservations.cc + snapshot_loader.cpp + query_statistics.cpp + message_body_sink.cpp + stream_load/stream_load_context.cpp + stream_load/stream_load_executor.cpp + routine_load/data_consumer.cpp + routine_load/data_consumer_group.cpp + routine_load/data_consumer_pool.cpp + routine_load/routine_load_task_executor.cpp + small_file_mgr.cpp + result_queue_mgr.cpp + memory_scratch_sink.cpp + external_scan_context_mgr.cpp + memory/system_allocator.cpp + memory/chunk_allocator.cpp ) if (WITH_MYSQL) diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index 0d30dbbab6..20593ce192 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -24,232 +24,17 @@ #include "common/object_pool.h" #include "exec/tablet_info.h" #include "runtime/descriptors.h" +#include "runtime/exec_env.h" #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" #include "service/backend_options.h" -#include "util/bitmap.h" #include "util/stopwatch.hpp" #include "olap/delta_writer.h" #include "olap/lru_cache.h" namespace doris { -// channel that process all data for this load -class TabletsChannel { -public: - TabletsChannel(const TabletsChannelKey& key) : _key(key), _closed_senders(64) { } - ~TabletsChannel(); - - Status open(const PTabletWriterOpenRequest& params); - - Status add_batch(const PTabletWriterAddBatchRequest& batch); - - Status close(int sender_id, bool* finished, - const google::protobuf::RepeatedField& partition_ids, - google::protobuf::RepeatedPtrField* tablet_vec); - - time_t last_updated_time() { - return _last_updated_time; - } - -private: - // open all writer - Status _open_all_writers(const PTabletWriterOpenRequest& params); - -private: - // id of this load channel, just 
for - TabletsChannelKey _key; - - // make execute sequece - std::mutex _lock; - - // initialized in open function - int64_t _txn_id = -1; - int64_t _index_id = -1; - OlapTableSchemaParam* _schema = nullptr; - TupleDescriptor* _tuple_desc = nullptr; - // row_desc used to construct - RowDescriptor* _row_desc = nullptr; - bool _opened = false; - - // next sequence we expect - int _num_remaining_senders = 0; - std::vector _next_seqs; - Bitmap _closed_senders; - Status _close_status; - - // tablet_id -> TabletChannel - std::unordered_map _tablet_writers; - - std::unordered_set _partition_ids; - - // TODO(zc): to add this tracker to somewhere - MemTracker _mem_tracker; - - //use to erase timeout TabletsChannel in _tablets_channels - time_t _last_updated_time; -}; - -TabletsChannel::~TabletsChannel() { - for (auto& it : _tablet_writers) { - delete it.second; - } - delete _row_desc; - delete _schema; -} - -Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { - std::lock_guard l(_lock); - if (_opened) { - // Normal case, already open by other sender - return Status::OK(); - } - LOG(INFO) << "open tablets channel: " << _key; - _txn_id = params.txn_id(); - _index_id = params.index_id(); - _schema = new OlapTableSchemaParam(); - RETURN_IF_ERROR(_schema->init(params.schema())); - _tuple_desc = _schema->tuple_desc(); - _row_desc = new RowDescriptor(_tuple_desc, false); - - _num_remaining_senders = params.num_senders(); - _next_seqs.resize(_num_remaining_senders, 0); - _closed_senders.Reset(_num_remaining_senders); - - RETURN_IF_ERROR(_open_all_writers(params)); - - _opened = true; - _last_updated_time = time(nullptr); - return Status::OK(); -} - -Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { - DCHECK(params.tablet_ids_size() == params.row_batch().num_rows()); - std::lock_guard l(_lock); - DCHECK(_opened); - auto next_seq = _next_seqs[params.sender_id()]; - // check packet - if (params.packet_seq() < next_seq) { - LOG(INFO) << 
"packet has already recept before, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::OK(); - } else if (params.packet_seq() > next_seq) { - LOG(WARNING) << "lost data packet, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::InternalError("lost data packet"); - } - - RowBatch row_batch(*_row_desc, params.row_batch(), &_mem_tracker); - - // iterator all data - for (int i = 0; i < params.tablet_ids_size(); ++i) { - auto tablet_id = params.tablet_ids(i); - auto it = _tablet_writers.find(tablet_id); - if (it == std::end(_tablet_writers)) { - std::stringstream ss; - ss << "unknown tablet to append data, tablet=" << tablet_id; - return Status::InternalError(ss.str()); - } - auto st = it->second->write(row_batch.get_row(i)->get_tuple(0)); - if (st != OLAP_SUCCESS) { - std::stringstream ss; - ss << "tablet writer write failed, tablet_id=" << it->first - << ", transaction_id=" << _txn_id << ", err=" << st; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - } - _next_seqs[params.sender_id()]++; - _last_updated_time = time(nullptr); - return Status::OK(); -} - -Status TabletsChannel::close(int sender_id, bool* finished, - const google::protobuf::RepeatedField& partition_ids, - google::protobuf::RepeatedPtrField* tablet_vec) { - std::lock_guard l(_lock); - if (_closed_senders.Get(sender_id)) { - // Double close from one sender, just return OK - *finished = (_num_remaining_senders == 0); - return _close_status; - } - LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id; - for (auto pid : partition_ids) { - _partition_ids.emplace(pid); - } - _closed_senders.Set(sender_id, true); - _num_remaining_senders--; - *finished = (_num_remaining_senders == 0); - if (*finished) { - // All senders are closed - for (auto& it : _tablet_writers) { - if (_partition_ids.count(it.second->partition_id()) > 0) { - auto st = it.second->close(tablet_vec); - if (st != 
OLAP_SUCCESS) { - std::stringstream ss; - ss << "close tablet writer failed, tablet_id=" << it.first - << ", transaction_id=" << _txn_id << ", err=" << st; - LOG(WARNING) << ss.str(); - _close_status = Status::InternalError(ss.str()); - return _close_status; - } - } else { - auto st = it.second->cancel(); - if (st != OLAP_SUCCESS) { - LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << it.first - << ", transaction_id=" << _txn_id; - } - } - } - } - return Status::OK(); -} - -Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) { - std::vector* index_slots = nullptr; - int32_t schema_hash = 0; - for (auto& index : _schema->indexes()) { - if (index->index_id == _index_id) { - index_slots = &index->slots; - schema_hash = index->schema_hash; - break; - } - } - if (index_slots == nullptr) { - std::stringstream ss; - ss << "unknown index id, key=" << _key; - return Status::InternalError(ss.str()); - } - for (auto& tablet : params.tablets()) { - WriteRequest request; - request.tablet_id = tablet.tablet_id(); - request.schema_hash = schema_hash; - request.write_type = LOAD; - request.txn_id = _txn_id; - request.partition_id = tablet.partition_id(); - request.load_id = params.id(); - request.need_gen_rollup = params.need_gen_rollup(); - request.tuple_desc = _tuple_desc; - request.slots = index_slots; - - DeltaWriter* writer = nullptr; - auto st = DeltaWriter::open(&request, &writer); - if (st != OLAP_SUCCESS) { - std::stringstream ss; - ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() - << ", txn_id=" << _txn_id - << ", partition_id=" << tablet.partition_id() - << ", err=" << st; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - _tablet_writers.emplace(tablet.tablet_id(), writer); - } - DCHECK(_tablet_writers.size() == params.tablets_size()); - return Status::OK(); -} - TabletWriterMgr::TabletWriterMgr(ExecEnv* exec_env) :_exec_env(exec_env) { _tablets_channels.init(2011); 
_lastest_success_channel = new_lru_cache(1024); @@ -381,15 +166,4 @@ Status TabletWriterMgr::_start_tablets_channel_clean() { return Status::OK(); } -std::string TabletsChannelKey::to_string() const { - std::stringstream ss; - ss << *this; - return ss.str(); -} - -std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { - os << "(id=" << key.id << ",index_id=" << key.index_id << ")"; - return os; -} - } diff --git a/be/src/runtime/tablet_writer_mgr.h b/be/src/runtime/tablet_writer_mgr.h index f5714dade4..8a7f1e192e 100644 --- a/be/src/runtime/tablet_writer_mgr.h +++ b/be/src/runtime/tablet_writer_mgr.h @@ -29,6 +29,7 @@ #include "gen_cpp/Types_types.h" #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/internal_service.pb.h" +#include "runtime/tablets_channel.h" #include "util/hash_util.hpp" #include "util/uid_util.h" @@ -37,29 +38,6 @@ namespace doris { class ExecEnv; -class TabletsChannel; - -struct TabletsChannelKey { - UniqueId id; - int64_t index_id; - - TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) - : id(pid), index_id(index_id_) { } - ~TabletsChannelKey() noexcept { } - - bool operator==(const TabletsChannelKey& rhs) const noexcept { - return index_id == rhs.index_id && id == rhs.id; - } - - std::string to_string() const; -}; - -struct TabletsChannelKeyHasher { - std::size_t operator()(const TabletsChannelKey& key) const { - size_t seed = key.id.hash(); - return doris::HashUtil::hash(&key.index_id, sizeof(key.index_id), seed); - } -}; class Cache; @@ -104,6 +82,4 @@ private: Status _start_tablets_channel_clean(); }; -std::ostream& operator<<(std::ostream& os, const TabletsChannelKey&); - } diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp new file mode 100644 index 0000000000..b52f1aa028 --- /dev/null +++ b/be/src/runtime/tablets_channel.cpp @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/tablets_channel.h" + +#include "exec/tablet_info.h" +#include "olap/delta_writer.h" +#include "olap/memtable.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" + +namespace doris { + +TabletsChannel::TabletsChannel(const TabletsChannelKey& key): + _key(key), _closed_senders(64) { + // _last_updated_time should be set before being inserted to + // _tablet_channels in tablet_channel_mgr, or it may be erased + // immediately by gc thread. 
+ _last_updated_time = time(nullptr); +} + +TabletsChannel::~TabletsChannel() { + for (auto& it : _tablet_writers) { + delete it.second; + } + delete _row_desc; + delete _schema; +} + +Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { + std::lock_guard l(_lock); + if (_opened) { + // Normal case, already open by other sender + return Status::OK(); + } + LOG(INFO) << "open tablets channel: " << _key; + _txn_id = params.txn_id(); + _index_id = params.index_id(); + _schema = new OlapTableSchemaParam(); + RETURN_IF_ERROR(_schema->init(params.schema())); + _tuple_desc = _schema->tuple_desc(); + _row_desc = new RowDescriptor(_tuple_desc, false); + + _num_remaining_senders = params.num_senders(); + _next_seqs.resize(_num_remaining_senders, 0); + _closed_senders.Reset(_num_remaining_senders); + + RETURN_IF_ERROR(_open_all_writers(params)); + + _opened = true; + _last_updated_time = time(nullptr); + return Status::OK(); +} + +Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { + DCHECK(params.tablet_ids_size() == params.row_batch().num_rows()); + std::lock_guard l(_lock); + DCHECK(_opened); + auto next_seq = _next_seqs[params.sender_id()]; + // check packet + if (params.packet_seq() < next_seq) { + LOG(INFO) << "packet has already recept before, expect_seq=" << next_seq + << ", recept_seq=" << params.packet_seq(); + return Status::OK(); + } else if (params.packet_seq() > next_seq) { + LOG(WARNING) << "lost data packet, expect_seq=" << next_seq + << ", recept_seq=" << params.packet_seq(); + return Status::InternalError("lost data packet"); + } + + RowBatch row_batch(*_row_desc, params.row_batch(), &_mem_tracker); + + // iterator all data + for (int i = 0; i < params.tablet_ids_size(); ++i) { + auto tablet_id = params.tablet_ids(i); + auto it = _tablet_writers.find(tablet_id); + if (it == std::end(_tablet_writers)) { + std::stringstream ss; + ss << "unknown tablet to append data, tablet=" << tablet_id; + return 
Status::InternalError(ss.str()); + } + auto st = it->second->write(row_batch.get_row(i)->get_tuple(0)); + if (st != OLAP_SUCCESS) { + std::stringstream ss; + ss << "tablet writer write failed, tablet_id=" << it->first + << ", transaction_id=" << _txn_id << ", err=" << st; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + } + _next_seqs[params.sender_id()]++; + _last_updated_time = time(nullptr); + return Status::OK(); +} + +Status TabletsChannel::close(int sender_id, bool* finished, + const google::protobuf::RepeatedField& partition_ids, + google::protobuf::RepeatedPtrField* tablet_vec) { + std::lock_guard l(_lock); + if (_closed_senders.Get(sender_id)) { + // Double close from one sender, just return OK + *finished = (_num_remaining_senders == 0); + return _close_status; + } + LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id; + for (auto pid : partition_ids) { + _partition_ids.emplace(pid); + } + _closed_senders.Set(sender_id, true); + _num_remaining_senders--; + *finished = (_num_remaining_senders == 0); + if (*finished) { + // All senders are closed + // 1. close all delta writers + std::vector need_wait_writers; + for (auto& it : _tablet_writers) { + if (_partition_ids.count(it.second->partition_id()) > 0) { + auto st = it.second->close(); + if (st != OLAP_SUCCESS) { + LOG(WARNING) << "close tablet writer failed, tablet_id=" << it.first + << ", transaction_id=" << _txn_id << ", err=" << st; + // just skip this tablet(writer) and continue to close others + continue; + } + need_wait_writers.push_back(it.second); + } else { + auto st = it.second->cancel(); + if (st != OLAP_SUCCESS) { + LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << it.first + << ", transaction_id=" << _txn_id; + // just skip this tablet(writer) and continue to close others + continue; + } + } + } + + // 2. 
wait delta writers and build the tablet vector + for (auto writer : need_wait_writers) { + // close may return failed, but no need to handle it here. + // tablet_vec will only contains success tablet, and then let FE judge it. + writer->close_wait(tablet_vec); + } + } + return Status::OK(); +} + +Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) { + std::vector* index_slots = nullptr; + int32_t schema_hash = 0; + for (auto& index : _schema->indexes()) { + if (index->index_id == _index_id) { + index_slots = &index->slots; + schema_hash = index->schema_hash; + break; + } + } + if (index_slots == nullptr) { + std::stringstream ss; + ss << "unknown index id, key=" << _key; + return Status::InternalError(ss.str()); + } + for (auto& tablet : params.tablets()) { + WriteRequest request; + request.tablet_id = tablet.tablet_id(); + request.schema_hash = schema_hash; + request.write_type = LOAD; + request.txn_id = _txn_id; + request.partition_id = tablet.partition_id(); + request.load_id = params.id(); + request.need_gen_rollup = params.need_gen_rollup(); + request.tuple_desc = _tuple_desc; + request.slots = index_slots; + + DeltaWriter* writer = nullptr; + auto st = DeltaWriter::open(&request, &writer); + if (st != OLAP_SUCCESS) { + std::stringstream ss; + ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() + << ", txn_id=" << _txn_id + << ", partition_id=" << tablet.partition_id() + << ", err=" << st; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + _tablet_writers.emplace(tablet.tablet_id(), writer); + } + DCHECK(_tablet_writers.size() == params.tablets_size()); + return Status::OK(); +} + +std::string TabletsChannelKey::to_string() const { + std::stringstream ss; + ss << *this; + return ss.str(); +} + +std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { + os << "(id=" << key.id << ",index_id=" << key.index_id << ")"; + return os; +} + +} diff --git 
a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h new file mode 100644 index 0000000000..b893af870b --- /dev/null +++ b/be/src/runtime/tablets_channel.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include + +#include "runtime/descriptors.h" +#include "runtime/mem_tracker.h" +#include "util/bitmap.h" +#include "util/thread_pool.hpp" +#include "util/uid_util.h" + +#include "gen_cpp/Types_types.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "gen_cpp/internal_service.pb.h" + +namespace doris { + +struct TabletsChannelKey { + UniqueId id; + int64_t index_id; + + TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) + : id(pid), index_id(index_id_) { } + + ~TabletsChannelKey() noexcept { } + + bool operator==(const TabletsChannelKey& rhs) const noexcept { + return index_id == rhs.index_id && id == rhs.id; + } + + std::string to_string() const; +}; + +struct TabletsChannelKeyHasher { + std::size_t operator()(const TabletsChannelKey& key) const { + size_t seed = key.id.hash(); + return doris::HashUtil::hash(&key.index_id, sizeof(key.index_id), seed); + } +}; + +class DeltaWriter; +class OlapTableSchemaParam; + +// channel that processes all data for this load +class TabletsChannel { +public: + TabletsChannel(const TabletsChannelKey& key); + + ~TabletsChannel(); + + Status open(const PTabletWriterOpenRequest& params); + + Status add_batch(const PTabletWriterAddBatchRequest& batch); + + Status close(int sender_id, bool* finished, + const google::protobuf::RepeatedField& partition_ids, + google::protobuf::RepeatedPtrField* tablet_vec); + + time_t last_updated_time() { + return _last_updated_time; + } + +private: + // open all writers + Status _open_all_writers(const PTabletWriterOpenRequest& params); + +private: + // id of this load channel + TabletsChannelKey _key; + + // lock that serializes open/add_batch/close + std::mutex _lock; + + // initialized in open function + int64_t _txn_id = -1; + int64_t _index_id = -1; + OlapTableSchemaParam* _schema = nullptr; + TupleDescriptor* _tuple_desc = nullptr; + // row_desc used to construct the RowBatch in add_batch + RowDescriptor* _row_desc = nullptr; + bool _opened = false; + + // next sequence we expect + int 
_num_remaining_senders = 0; + std::vector _next_seqs; + Bitmap _closed_senders; + Status _close_status; + + // tablet_id -> DeltaWriter + std::unordered_map _tablet_writers; + + std::unordered_set _partition_ids; + + // TODO(zc): to add this tracker to somewhere + MemTracker _mem_tracker; + + // used to erase timed-out TabletsChannel in _tablets_channels + time_t _last_updated_time; +}; + +std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key); + +} // end namespace diff --git a/be/src/util/counter_cond_variable.hpp b/be/src/util/counter_cond_variable.hpp new file mode 100644 index 0000000000..9048037826 --- /dev/null +++ b/be/src/util/counter_cond_variable.hpp @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace doris { + +// used for submitter/worker/waiter pattern +// submitter: +// one or more submitters submit tasks and call inc() +// worker: +// one or more workers do the task and call dec() after finishing the task +// waiter: +// one or more waiters call block_wait() to wait until the counter drops to zero (all tasks finished, or a worker called dec_to_zero()). +// Use pattern: +// thread1(submitter): +// CounterCondVariable cond(0); +// ... submit task ... 
+// cond.inc(); +// ... submit task ... +// cond.inc(); +// +// thread2(worker): +// ... do work... +// cond.dec(); +// ... do work... +// cond.dec(); +// or +// ... failed ... +// cond.dec_to_zero(); +// +// thread3(waiter): +// cond.block_wait(); + +class CounterCondVariable { +public: + explicit CounterCondVariable(int init = 0) : _count(init) { + } + + // increase the counter + void inc(int inc = 1) { + std::unique_lock lock(_lock); + _count += inc; + } + + // decrease the counter, and notify all waiters + void dec(int dec = 1) { + std::unique_lock lock(_lock); + _count -= dec; + _cv.notify_all(); + } + + // decrease the counter to zero + void dec_to_zero() { + std::unique_lock lock(_lock); + _count = 0; + _cv.notify_all(); + } + + // wait until count down to zero + void block_wait() { + std::unique_lock lock(_lock); + if (_count <= 0) { + return; + } + _cv.wait(lock, [this] { return _count <= 0; }); + } + +private: + std::mutex _lock; + std::condition_variable _cv; + int _count; +}; + +} // end namespace diff --git a/be/src/util/semaphore.hpp b/be/src/util/semaphore.hpp index 398c5ae3ed..dac287d604 100644 --- a/be/src/util/semaphore.hpp +++ b/be/src/util/semaphore.hpp @@ -31,13 +31,13 @@ class Semaphore { void signal() { std::unique_lock lock(_mutex); - ++count_; - cv_.notify_one(); + ++_count; + _cv.notify_one(); } void wait() { std::unique_lock lock(_mutex); - cv_.wait(lock, [=] { return _count > 0; }); + _cv.wait(lock, [=] { return _count > 0; }); --_count; } diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 9d671d6f30..3f248e6c30 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -71,3 +71,4 @@ ADD_BE_TEST(key_coder_test) ADD_BE_TEST(short_key_index_test) ADD_BE_TEST(page_cache_test) ADD_BE_TEST(hll_test) +ADD_BE_TEST(memtable_flush_executor_test) diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index ab4e0aba36..9685aeb3cf 100644 --- 
a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -30,6 +30,7 @@ #include "olap/utils.h" #include "runtime/tuple.h" #include "runtime/descriptor_helper.h" +#include "runtime/exec_env.h" #include "util/logging.h" #include "olap/options.h" #include "olap/tablet_meta_manager.h" @@ -56,6 +57,9 @@ void set_up() { doris::EngineOptions options; options.store_paths = paths; doris::StorageEngine::open(options, &k_engine); + + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_storage_engine(k_engine); } void tear_down() { @@ -301,7 +305,9 @@ TEST_F(TestDeltaWriter, open) { DeltaWriter* delta_writer = nullptr; DeltaWriter::open(&write_req, &delta_writer); ASSERT_NE(delta_writer, nullptr); - res = delta_writer->close(nullptr); + res = delta_writer->close(); + ASSERT_EQ(OLAP_SUCCESS, res); + res = delta_writer->close_wait(nullptr); ASSERT_EQ(OLAP_SUCCESS, res); SAFE_DELETE(delta_writer); @@ -391,7 +397,9 @@ TEST_F(TestDeltaWriter, write) { ASSERT_EQ(OLAP_SUCCESS, res); } - res = delta_writer->close(nullptr); + res = delta_writer->close(); + ASSERT_EQ(OLAP_SUCCESS, res); + res = delta_writer->close_wait(nullptr); ASSERT_EQ(OLAP_SUCCESS, res); // publish version success diff --git a/be/test/olap/memtable_flush_executor_test.cpp b/be/test/olap/memtable_flush_executor_test.cpp new file mode 100644 index 0000000000..cfbcab538c --- /dev/null +++ b/be/test/olap/memtable_flush_executor_test.cpp @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memtable_flush_executor.h" + +#include +#include +#include + +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "gen_cpp/Types_types.h" +#include "olap/delta_writer.h" +#include "olap/field.h" +#include "olap/memtable.h" +#include "olap/schema.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/utils.h" +#include "runtime/tuple.h" +#include "runtime/descriptor_helper.h" +#include "runtime/exec_env.h" +#include "util/logging.h" +#include "olap/options.h" +#include "olap/tablet_meta_manager.h" + +namespace doris { + +StorageEngine* k_engine = nullptr; +MemTableFlushExecutor* k_flush_executor = nullptr; + +void set_up() { + char buffer[1024]; + getcwd(buffer, 1024); + config::storage_root_path = std::string(buffer) + "/flush_test"; + remove_all_dir(config::storage_root_path); + create_dir(config::storage_root_path); + std::vector paths; + paths.emplace_back(config::storage_root_path, -1); + + doris::EngineOptions options; + options.store_paths = paths; + doris::StorageEngine::open(options, &k_engine); + + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_storage_engine(k_engine); + + k_flush_executor = k_engine->memtable_flush_executor(); +} + +void tear_down() { + delete k_engine; + k_engine = nullptr; + system("rm -rf ./flush_test"); + remove_all_dir(std::string(getenv("DORIS_HOME")) + UNUSED_PREFIX); +} + +Schema create_schema() { + std::vector col_schemas; + col_schemas.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_SMALLINT, 
true);
+    col_schemas.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_INT, true);
+    col_schemas.emplace_back(OLAP_FIELD_AGGREGATION_SUM, OLAP_FIELD_TYPE_BIGINT, true);
+    Schema schema(col_schemas, 2);
+    return schema;
+}
+
+// Fixture for the MemTableFlushExecutor tests. It keeps no state of its own;
+// the tests operate on the globals k_engine and k_flush_executor.
+class TestMemTableFlushExecutor : public ::testing::Test {
+public:
+    TestMemTableFlushExecutor() { }
+    ~TestMemTableFlushExecutor() { }
+
+    // Only traces that the fixture is being entered.
+    void SetUp() override {
+        std::cout << "setup" << std::endl;
+    }
+
+    // Only traces that the fixture is being left.
+    void TearDown() override {
+        std::cout << "tear down" << std::endl;
+    }
+};
+
+// Creates a flush handler for the first data dir and checks how it accounts
+// for one successful and one failed flush result.
+TEST_F(TestMemTableFlushExecutor, create_flush_handler) {
+    auto data_dir = k_engine->get_stores();
+    // Indexing an empty vector is undefined behaviour; fail the test instead.
+    ASSERT_FALSE(data_dir.empty());
+    int64_t path_hash = data_dir[0]->path_hash();
+
+    // NOTE(review): the template argument of std::shared_ptr is missing in
+    // this copy of the patch; restore it from olap/memtable_flush_executor.h.
+    std::shared_ptr flush_handler;
+    k_flush_executor->create_flush_handler(path_hash, &flush_handler);
+    ASSERT_NE(nullptr, flush_handler.get());
+
+    // A successful flush is added to the stats and does not cancel the handler.
+    FlushResult res;
+    res.flush_status = OLAP_SUCCESS;
+    res.flush_time_ns = 100;
+    flush_handler->on_flush_finished(res);
+    ASSERT_FALSE(flush_handler->is_cancelled());
+    ASSERT_EQ(100, flush_handler->get_stats().flush_time_ns);
+    ASSERT_EQ(1, flush_handler->get_stats().flush_count);
+
+    // A failed flush cancels the handler and leaves the stats unchanged.
+    FlushResult res2;
+    res2.flush_status = OLAP_ERR_OTHER_ERROR;
+    flush_handler->on_flush_finished(res2);
+    ASSERT_TRUE(flush_handler->is_cancelled());
+    ASSERT_EQ(100, flush_handler->get_stats().flush_time_ns);
+    ASSERT_EQ(1, flush_handler->get_stats().flush_count);
+
+    // wait() reports the status of the failed flush.
+    ASSERT_EQ(OLAP_ERR_OTHER_ERROR, flush_handler->wait());
+}
+
+} // namespace doris
+
+// Test entry point: loads $DORIS_HOME/conf/be.conf, initialises logging and
+// gtest, then runs all tests between doris::set_up() and doris::tear_down().
+// Returns -1 when the environment or the config file is unusable, otherwise
+// the result of RUN_ALL_TESTS().
+int main(int argc, char** argv) {
+    // getenv() returns nullptr when DORIS_HOME is unset; building a
+    // std::string from nullptr is undefined behaviour, so stop with a message.
+    const char* doris_home = getenv("DORIS_HOME");
+    if (doris_home == nullptr) {
+        fprintf(stderr, "DORIS_HOME is not set. \n");
+        return -1;
+    }
+    std::string conffile = std::string(doris_home) + "/conf/be.conf";
+    if (!doris::config::init(conffile.c_str(), false)) {
+        fprintf(stderr, "error read config file. \n");
+        return -1;
+    }
+    doris::init_glog("be-test");
+    testing::InitGoogleTest(&argc, argv);
+    doris::CpuInfo::init();
+    doris::set_up();
+    int ret = RUN_ALL_TESTS();
+    doris::tear_down();
+    google::protobuf::ShutdownProtobufLibrary();
+    return ret;
+}
diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp b/be/test/runtime/external_scan_context_mgr_test.cpp index 8ab4202245..8cb542f4cc 100644 --- a/be/test/runtime/external_scan_context_mgr_test.cpp +++ b/be/test/runtime/external_scan_context_mgr_test.cpp @@ -106,8 +106,10 @@ int main(int argc, char** argv) { fprintf(stderr, "error read config file. \n"); return -1; } + + doris::config::scan_context_gc_interval_min = 1; // doris::init_glog("be-test"); ::testing::InitGoogleTest(&argc, argv); doris::CpuInfo::init(); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/be/test/runtime/tablet_writer_mgr_test.cpp b/be/test/runtime/tablet_writer_mgr_test.cpp index 69fce8f919..d42ad849b2 100644 --- a/be/test/runtime/tablet_writer_mgr_test.cpp +++ b/be/test/runtime/tablet_writer_mgr_test.cpp @@ -32,6 +32,7 @@ #include "runtime/descriptor_helper.h" #include "util/thrift_util.h" #include "olap/delta_writer.h" +#include "olap/storage_engine.h" namespace doris { @@ -42,7 +43,7 @@ OLAPStatus close_status; int64_t wait_lock_time_ns; // mock -DeltaWriter::DeltaWriter(WriteRequest* req) : _req(*req) { +DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine) : _req(*req) { } DeltaWriter::~DeltaWriter() { @@ -56,7 +57,7 @@ OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) { if (open_status != OLAP_SUCCESS) { return open_status; } - *writer = new DeltaWriter(req); + *writer = new DeltaWriter(req, nullptr); return open_status; } @@ -69,7 +70,11 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) { return add_status; } -OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* tablet_vec) { +OLAPStatus DeltaWriter::close() { + return 
OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec) { return close_status; } @@ -479,7 +484,9 @@ TEST_F(TabletWriterMgrTest, close_failed) { google::protobuf::RepeatedPtrField tablet_vec; auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); request.release_id(); - ASSERT_FALSE(st.ok()); + // even if delta close failed, the return status is still ok, but tablet_vec is empty + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(tablet_vec.empty()); } } diff --git a/be/test/runtime/user_function_cache_test.cpp b/be/test/runtime/user_function_cache_test.cpp index b125938f32..a4dc1fc317 100644 --- a/be/test/runtime/user_function_cache_test.cpp +++ b/be/test/runtime/user_function_cache_test.cpp @@ -86,7 +86,7 @@ public: UserFunctionCacheTest() { } virtual ~UserFunctionCacheTest() { } static void SetUpTestCase() { - s_server = new EvHttpServer(29999); + s_server = new EvHttpServer(29987); s_server->register_handler(GET, "/{FILE}", &s_test_handler); s_server->start(); @@ -130,7 +130,7 @@ TEST_F(UserFunctionCacheTest, download_normal) { // get my_add st = cache.get_function_ptr(1, "_Z6my_addv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_TRUE(st.ok()); ASSERT_TRUE(k_is_downloaded); @@ -140,7 +140,7 @@ TEST_F(UserFunctionCacheTest, download_normal) { // get my_del st = cache.get_function_ptr(1, "_Z6my_delv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_TRUE(st.ok()); ASSERT_NE(nullptr, fn_ptr); @@ -149,7 +149,7 @@ TEST_F(UserFunctionCacheTest, download_normal) { // get my_mul st = cache.get_function_ptr(1, "_Z6my_mulv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_FALSE(st.ok()); @@ -165,7 +165,7 @@ TEST_F(UserFunctionCacheTest, load_normal) { UserFunctionCacheEntry* entry = nullptr; st = cache.get_function_ptr(1, 
"_Z6my_addv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_TRUE(st.ok()); ASSERT_FALSE(k_is_downloaded); @@ -183,7 +183,7 @@ TEST_F(UserFunctionCacheTest, download_fail) { UserFunctionCacheEntry* entry = nullptr; st = cache.get_function_ptr(2, "_Z6my_delv", - "http://127.0.0.1:29999/my_del.so", + "http://127.0.0.1:29987/my_del.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_FALSE(st.ok()); } @@ -199,7 +199,7 @@ TEST_F(UserFunctionCacheTest, md5_fail) { UserFunctionCacheEntry* entry = nullptr; st = cache.get_function_ptr(1, "_Z6my_addv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", "1234", &fn_ptr, &entry); ASSERT_FALSE(st.ok()); } @@ -218,7 +218,7 @@ TEST_F(UserFunctionCacheTest, bad_so) { UserFunctionCacheEntry* entry = nullptr; st = cache.get_function_ptr(2, "_Z6my_addv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", "abc", &fn_ptr, &entry); ASSERT_FALSE(st.ok()); } diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 1955a00452..ff910ddfa2 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -47,3 +47,4 @@ ADD_BE_TEST(tdigest_test) ADD_BE_TEST(block_compression_test) ADD_BE_TEST(arrow/arrow_row_block_test) ADD_BE_TEST(arrow/arrow_row_batch_test) +ADD_BE_TEST(counter_cond_variable_test) diff --git a/be/test/util/counter_cond_variable_test.cpp b/be/test/util/counter_cond_variable_test.cpp new file mode 100644 index 0000000000..b64110af54 --- /dev/null +++ b/be/test/util/counter_cond_variable_test.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/counter_cond_variable.hpp"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <iostream>
+#include <mutex>
+#include <thread>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// Counter shared by all threads of the test below.
+CounterCondVariable g_cond;
+// Serialises writes to std::cout so that output of different threads does
+// not interleave.
+std::mutex g_io_mu;
+
+// Number of inc()/dec() calls issued by each submitter/worker thread.
+static const int k_loop_count = 10;
+// Pause between two decrements of a worker. It only has to give the other
+// threads a chance to run; a long sleep would just slow the unit test down.
+static const int k_worker_pause_ms = 10;
+
+class CounterCondVariableTest : public testing::Test {
+public:
+    CounterCondVariableTest() { }
+    virtual ~CounterCondVariableTest() { }
+};
+
+// Increments the shared counter k_loop_count times, one step per call.
+void submitter() {
+    for (int i = 0; i < k_loop_count; ++i) {
+        g_cond.inc();
+    }
+}
+
+// Logs the iteration number, pauses briefly and decrements the shared
+// counter; repeated k_loop_count times.
+void worker() {
+    for (int i = 0; i < k_loop_count; ++i) {
+        {
+            std::lock_guard<std::mutex> lock(g_io_mu);
+            std::cout << "worker " << i << std::endl;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(k_worker_pause_ms));
+        g_cond.dec();
+    }
+}
+
+// Blocks in block_wait() on the shared counter and logs once it returns.
+void waiter() {
+    g_cond.block_wait();
+    // Take the same output lock as worker() so the two messages cannot interleave.
+    std::lock_guard<std::mutex> lock(g_io_mu);
+    std::cout << "wait finished" << std::endl;
+}
+
+// Drives CounterCondVariable from the test thread first and then from five
+// concurrent threads. There are no explicit assertions: the test passes when
+// every block_wait() returns and all threads can be joined, so a waiter that
+// is never released shows up as a hang.
+TEST_F(CounterCondVariableTest, test) {
+    // Single-threaded: block_wait() has to return both on the untouched
+    // counter and after a balanced inc(10)/dec(10).
+    g_cond.block_wait();
+    g_cond.inc(10);
+    g_cond.dec(10);
+    g_cond.block_wait();
+
+    // NOTE(review): the two workers issue 20 decrements against the
+    // submitter's 10 increments, so this phase relies on the counter
+    // tolerating more dec() than inc() calls - confirm against
+    // util/counter_cond_variable.hpp.
+    std::thread submit(submitter);
+    std::thread wait1(waiter);
+    std::thread wait2(waiter);
+    std::thread work1(worker);
+    std::thread work2(worker);
+
+    submit.join();
+    wait1.join();
+    wait2.join();
+    work1.join();
+    work2.join();
+
+    // block_wait() is expected to return once dec_to_zero() has been called.
+    g_cond.inc(10);
+    g_cond.dec_to_zero();
+    g_cond.block_wait();
+}
+
+} // namespace doris
+
+int main(int argc, char* argv[]) {
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index c83aa89a74..a0331d432d 100644 --- 
a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -395,7 +395,7 @@ public class GlobalTransactionMgr { } if (successReplicaNum < quorumReplicaNum) { - LOG.warn("Failed to commit txn []. " + LOG.warn("Failed to commit txn [{}]. " + "Tablet [{}] success replica num is {} < quorum replica num {} " + "while error backends {}", transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, diff --git a/run-ut.sh b/run-ut.sh index c816e0a19e..95a9456e65 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -158,6 +158,7 @@ ${DORIS_TEST_BINARY_DIR}/util/tdigest_test ${DORIS_TEST_BINARY_DIR}/util/block_compression_test ${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_block_test ${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_batch_test +${DORIS_TEST_BINARY_DIR}/util/counter_cond_variable_test # Running common Unittest ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test @@ -241,6 +242,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/column_reader_test ${DORIS_TEST_BINARY_DIR}/olap/row_cursor_test ${DORIS_TEST_BINARY_DIR}/olap/skiplist_test ${DORIS_TEST_BINARY_DIR}/olap/serialize_test +${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test