diff --git a/be/src/common/config.h b/be/src/common/config.h index b386d213b3..b37e788f65 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -465,6 +465,8 @@ namespace config { CONF_Int32(storage_flood_stage_usage_percent, "95"); // 95% // The min bytes that should be left of a data dir CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824") // 1GB + // number of thread for flushing memtable per store + CONF_Int32(flush_thread_num_per_store, "2"); } // namespace config } // namespace doris diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 2ae5cc895a..a6ae95e68d 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -45,6 +45,7 @@ add_library(Olap STATIC key_coder.cpp lru_cache.cpp memtable.cpp + memtable_flush_executor.cpp merger.cpp new_status.cpp null_predicate.cpp diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 6499981607..0fea80a05d 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -17,33 +17,38 @@ #include "olap/delta_writer.h" -#include "olap/schema.h" -#include "olap/memtable.h" #include "olap/data_dir.h" +#include "olap/memtable.h" +#include "olap/memtable_flush_executor.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/rowset/rowset_id_generator.h" +#include "olap/schema.h" +#include "olap/storage_engine.h" namespace doris { OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) { - *writer = new DeltaWriter(req); - return OLAP_SUCCESS; + *writer = new DeltaWriter(req, StorageEngine::instance()); + return (*writer)->init(); } -DeltaWriter::DeltaWriter(WriteRequest* req) +DeltaWriter::DeltaWriter( + WriteRequest* req, + StorageEngine* storage_engine) : _req(*req), _tablet(nullptr), _cur_rowset(nullptr), _new_rowset(nullptr), _new_tablet(nullptr), - _rowset_writer(nullptr), _mem_table(nullptr), - _schema(nullptr), _tablet_schema(nullptr), - _delta_written_success(false) {} + _rowset_writer(nullptr), _schema(nullptr), _tablet_schema(nullptr), + _delta_written_success(false), + _storage_engine(storage_engine) { +} DeltaWriter::~DeltaWriter() { if (!_delta_written_success) { _garbage_collection(); } - SAFE_DELETE(_mem_table); + _mem_table.reset(); SAFE_DELETE(_schema); if (_rowset_writer != nullptr) { _rowset_writer->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + _rowset_writer->rowset_id().to_string()); @@ -53,26 +58,26 @@ DeltaWriter::~DeltaWriter() { void DeltaWriter::_garbage_collection() { OLAPStatus rollback_status = OLAP_SUCCESS; if (_tablet != nullptr) { - rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn(_req.partition_id, + rollback_status = _storage_engine->txn_manager()->rollback_txn(_req.partition_id, _req.txn_id,_req.tablet_id, _req.schema_hash, _tablet->tablet_uid()); } // has to check rollback status, because the rowset maybe committed in this thread and // published in another thread, then rollback will failed // when rollback failed should not delete rowset if (rollback_status == OLAP_SUCCESS) { - StorageEngine::instance()->add_unused_rowset(_cur_rowset); + _storage_engine->add_unused_rowset(_cur_rowset); } if (_new_tablet != nullptr) { - rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn(_req.partition_id, _req.txn_id, + rollback_status = _storage_engine->txn_manager()->rollback_txn(_req.partition_id, _req.txn_id, _new_tablet->tablet_id(), _new_tablet->schema_hash(), _new_tablet->tablet_uid()); if (rollback_status == OLAP_SUCCESS) { - StorageEngine::instance()->add_unused_rowset(_new_rowset); + _storage_engine->add_unused_rowset(_new_rowset); } } } OLAPStatus DeltaWriter::init() { - _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(_req.tablet_id, _req.schema_hash); + _tablet = _storage_engine->tablet_manager()->get_tablet(_req.tablet_id, _req.schema_hash); if (_tablet == nullptr) { LOG(WARNING) << "tablet_id: " << _req.tablet_id << ", " << "schema_hash: " << _req.schema_hash << " not found"; @@ -85,7 +90,7 @@ OLAPStatus DeltaWriter::init() { return OLAP_ERR_RWLOCK_ERROR; } MutexLock push_lock(_tablet->get_push_lock()); - RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn( + RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn( _req.partition_id, _req.txn_id, _req.tablet_id, _req.schema_hash, _tablet->tablet_uid(), _req.load_id)); if (_req.need_gen_rollup) { @@ -98,7 +103,7 @@ OLAPStatus DeltaWriter::init() { << "new_tablet_id: " << new_tablet_id << ", " << "new_schema_hash: " << new_schema_hash << ", " << "transaction_id: " << _req.txn_id; - _new_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(new_tablet_id, new_schema_hash); + _new_tablet = _storage_engine->tablet_manager()->get_tablet(new_tablet_id, new_schema_hash); if (_new_tablet == nullptr) { LOG(WARNING) << "find alter task, but could not find new tablet tablet_id: " << new_tablet_id << ", schema_hash: " << new_schema_hash; @@ -108,7 +113,7 @@ OLAPStatus DeltaWriter::init() { if (!new_migration_rlock.own_lock()) { return OLAP_ERR_RWLOCK_ERROR; } - StorageEngine::instance()->txn_manager()->prepare_txn( + _storage_engine->txn_manager()->prepare_txn( _req.partition_id, _req.txn_id, new_tablet_id, new_schema_hash, _new_tablet->tablet_uid(), _req.load_id); } @@ -116,7 +121,7 @@ OLAPStatus DeltaWriter::init() { } RowsetWriterContext writer_context; - writer_context.rowset_id = StorageEngine::instance()->next_rowset_id(); + writer_context.rowset_id = _storage_engine->next_rowset_id(); writer_context.tablet_uid = _tablet->tablet_uid(); writer_context.tablet_id = _req.tablet_id; writer_context.partition_id = _req.partition_id; @@ -132,48 +137,58 @@ OLAPStatus DeltaWriter::init() { _tablet_schema = &(_tablet->tablet_schema()); _schema = new Schema(*_tablet_schema); - _mem_table = new MemTable(_schema, _tablet_schema, _req.slots, - _req.tuple_desc, _tablet->keys_type()); + _mem_table = std::make_shared(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots, + _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get()); + + // create flush handler + RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_handler(_tablet->data_dir()->path_hash(), &_flush_handler)); + _is_init = true; return OLAP_SUCCESS; } OLAPStatus DeltaWriter::write(Tuple* tuple) { if (!_is_init) { - auto st = init(); - if (st != OLAP_SUCCESS) { - return st; - } + RETURN_NOT_OK(init()); } _mem_table->insert(tuple); - if (_mem_table->memory_usage() >= config::write_buffer_size) { - RETURN_NOT_OK(_mem_table->flush(_rowset_writer.get())); - SAFE_DELETE(_mem_table); - _mem_table = new MemTable(_schema, _tablet_schema, _req.slots, - _req.tuple_desc, _tablet->keys_type()); + // if memtable is full, push it to the flush executor, + // and create a new memtable for incoming data + if (_mem_table->memory_usage() >= config::write_buffer_size) { + RETURN_NOT_OK(_flush_memtable_async()); + // create a new memtable for new incoming data + _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots, + _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get())); } return OLAP_SUCCESS; } -OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* tablet_vec) { - if (!_is_init) { - auto st = init(); - if (st != OLAP_SUCCESS) { - return st; - } - } - RETURN_NOT_OK(_mem_table->close(_rowset_writer.get())); +OLAPStatus DeltaWriter::_flush_memtable_async() { + return _flush_handler->submit(_mem_table); +} + +OLAPStatus DeltaWriter::close() { + if (!_is_init) { + RETURN_NOT_OK(init()); + } + + RETURN_NOT_OK(_flush_memtable_async()); + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec) { + // return error if previous flush failed + RETURN_NOT_OK(_flush_handler->wait()); - OLAPStatus res = OLAP_SUCCESS; // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); if (_cur_rowset == nullptr) { LOG(WARNING) << "fail to build rowset"; return OLAP_ERR_MALLOC_ERROR; } - res = StorageEngine::instance()->txn_manager()->commit_txn(_tablet->data_dir()->get_meta(), + OLAPStatus res = _storage_engine->txn_manager()->commit_txn(_tablet->data_dir()->get_meta(), _req.partition_id, _req.txn_id,_req.tablet_id, _req.schema_hash, _tablet->tablet_uid(), _req.load_id, _cur_rowset, false); if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) { @@ -194,7 +209,7 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* t return res; } - res = StorageEngine::instance()->txn_manager()->commit_txn(_new_tablet->data_dir()->get_meta(), + res = _storage_engine->txn_manager()->commit_txn(_new_tablet->data_dir()->get_meta(), _req.partition_id, _req.txn_id, _new_tablet->tablet_id(), _new_tablet->schema_hash(), _new_tablet->tablet_uid(), _req.load_id, _new_rowset, false); @@ -218,6 +233,10 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* t #endif _delta_written_success = true; + + const FlushStatistic& stat = _flush_handler->get_stats(); + LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() + << ", stats: " << stat; return OLAP_SUCCESS; } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index f9d1587f5e..073f5e5f1e 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -18,19 +18,21 @@ #ifndef DORIS_BE_SRC_DELTA_WRITER_H #define DORIS_BE_SRC_DELTA_WRITER_H -#include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/schema_change.h" #include "runtime/descriptors.h" #include "runtime/tuple.h" #include "gen_cpp/internal_service.pb.h" #include "olap/rowset/rowset_writer.h" +#include "util/blocking_queue.hpp" namespace doris { -class SegmentGroup; +class FlushHandler; class MemTable; class Schema; +class SegmentGroup; +class StorageEngine; enum WriteType { LOAD = 1, @@ -54,17 +56,27 @@ struct WriteRequest { class DeltaWriter { public: static OLAPStatus open(WriteRequest* req, DeltaWriter** writer); + + DeltaWriter(WriteRequest* req, StorageEngine* storage_engine); + OLAPStatus init(); - DeltaWriter(WriteRequest* req); + ~DeltaWriter(); + OLAPStatus write(Tuple* tuple); - OLAPStatus close(google::protobuf::RepeatedPtrField* tablet_vec); + // flush the last memtable to flush queue, must call it before close_wait() + OLAPStatus close(); + // wait for all memtables being flushed + OLAPStatus close_wait(google::protobuf::RepeatedPtrField* tablet_vec); OLAPStatus cancel(); int64_t partition_id() const { return _req.partition_id; } private: + // push a full memtable to flush executor + OLAPStatus _flush_memtable_async(); + void _garbage_collection(); private: @@ -75,10 +87,13 @@ private: RowsetSharedPtr _new_rowset; TabletSharedPtr _new_tablet; std::unique_ptr _rowset_writer; - MemTable* _mem_table; + std::shared_ptr _mem_table; Schema* _schema; const TabletSchema* _tablet_schema; bool _delta_written_success; + + StorageEngine* _storage_engine; + std::shared_ptr _flush_handler; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index a0054a89de..5a1b240ae2 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -26,15 +26,17 @@ namespace doris { -MemTable::MemTable(Schema* schema, const TabletSchema* tablet_schema, +MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - KeysType keys_type) - : _schema(schema), + KeysType keys_type, RowsetWriter* rowset_writer) + : _tablet_id(tablet_id), + _schema(schema), _tablet_schema(tablet_schema), _tuple_desc(tuple_desc), _slot_descs(slot_descs), _keys_type(keys_type), - _row_comparator(_schema) { + _row_comparator(_schema), + _rowset_writer(rowset_writer) { _schema_size = _schema->schema_size(); _tuple_buf = _arena.Allocate(_schema_size); _skip_list = new Table(_row_comparator, &_arena); @@ -59,6 +61,7 @@ size_t MemTable::memory_usage() { void MemTable::insert(Tuple* tuple) { ContiguousRow row(_schema, _tuple_buf); + for (size_t i = 0; i < _slot_descs->size(); ++i) { auto cell = row.cell(i); const SlotDescriptor* slot = (*_slot_descs)[i]; @@ -75,7 +78,7 @@ void MemTable::insert(Tuple* tuple) { } } -OLAPStatus MemTable::flush(RowsetWriter* rowset_writer) { +OLAPStatus MemTable::flush() { int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); @@ -84,17 +87,17 @@ OLAPStatus MemTable::flush(RowsetWriter* rowset_writer) { char* row = (char*)it.key(); ContiguousRow dst_row(_schema, row); agg_finalize_row(&dst_row, _skip_list->arena()); - RETURN_NOT_OK(rowset_writer->add_row(dst_row)); + RETURN_NOT_OK(_rowset_writer->add_row(dst_row)); } - RETURN_NOT_OK(rowset_writer->flush()); + RETURN_NOT_OK(_rowset_writer->flush()); } DorisMetrics::memtable_flush_total.increment(1); DorisMetrics::memtable_flush_duration_us.increment(duration_ns / 1000); return OLAP_SUCCESS; } -OLAPStatus MemTable::close(RowsetWriter* rowset_writer) { - return flush(rowset_writer); +OLAPStatus MemTable::close() { + return flush(); } } // namespace doris diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 1a262ce3dd..80be744cf7 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -18,8 +18,6 @@ #ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H #define DORIS_BE_SRC_OLAP_MEMTABLE_H -#include - #include "olap/schema.h" #include "olap/skiplist.h" #include "runtime/tuple.h" @@ -28,18 +26,22 @@ namespace doris { class RowCursor; +class RowsetWriter; class MemTable { public: - MemTable(Schema* schema, const TabletSchema* tablet_schema, + MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - KeysType keys_type); + KeysType keys_type, RowsetWriter* rowset_writer); ~MemTable(); + int64_t tablet_id() { return _tablet_id; } size_t memory_usage(); void insert(Tuple* tuple); - OLAPStatus flush(RowsetWriter* rowset_writer); - OLAPStatus close(RowsetWriter* rowset_writer); + OLAPStatus flush(); + OLAPStatus close(); + private: + int64_t _tablet_id; Schema* _schema; const TabletSchema* _tablet_schema; TupleDescriptor* _tuple_desc; @@ -60,6 +62,9 @@ private: char* _tuple_buf; size_t _schema_size; Table* _skip_list; + + RowsetWriter* _rowset_writer; + }; // class MemTable } // namespace doris diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp new file mode 100644 index 0000000000..a2a24485a7 --- /dev/null +++ b/be/src/olap/memtable_flush_executor.cpp @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memtable_flush_executor.h" + +#include "olap/data_dir.h" +#include "olap/delta_writer.h" +#include "olap/memtable.h" +#include "runtime/exec_env.h" + +namespace doris { + +OLAPStatus FlushHandler::submit(std::shared_ptr memtable) { + RETURN_NOT_OK(_last_flush_status.load()); + MemTableFlushContext ctx; + ctx.memtable = memtable; + ctx.flush_handler = this->shared_from_this(); + _counter_cond.inc(); + _flush_executor->_push_memtable(_flush_queue_idx, ctx); + return OLAP_SUCCESS; +} + +OLAPStatus FlushHandler::wait() { + // wait util encoutering error, or all submitted memtables are finished + RETURN_NOT_OK(_last_flush_status.load()); + _counter_cond.block_wait(); + return _last_flush_status.load(); +} + +void FlushHandler::on_flush_finished(const FlushResult& res) { + if (res.flush_status != OLAP_SUCCESS) { + _last_flush_status.store(res.flush_status); + // if one failed, all other memtables no need to flush + _counter_cond.dec_to_zero(); + } else { + _stats.flush_time_ns.fetch_add(res.flush_time_ns); + _stats.flush_count.fetch_add(1); + _counter_cond.dec(); + } +} + +OLAPStatus MemTableFlushExecutor::create_flush_handler(int64_t path_hash, std::shared_ptr* flush_handler) { + int32_t flush_queue_idx = _get_queue_idx(path_hash); + flush_handler->reset(new FlushHandler(flush_queue_idx, this)); + return OLAP_SUCCESS; +} + +void MemTableFlushExecutor::init(const std::vector& data_dirs) { + int32_t data_dir_num = data_dirs.size(); + _thread_num_per_store = std::max(1, config::flush_thread_num_per_store); + _num_threads = data_dir_num * _thread_num_per_store; + + // create flush queues + for (int i = 0; i < _num_threads; ++i) { + BlockingQueue* queue = new BlockingQueue(10); + _flush_queues.push_back(queue); + } + // create thread pool + _flush_pool = new ThreadPool(_num_threads, 1); + for (int32_t i = 0; i < _num_threads; ++i) { + _flush_pool->offer(boost::bind(&MemTableFlushExecutor::_flush_memtable, this, i)); + } + + // _path_map saves the path hash to current idx of flush queue. + // eg. + // there are 4 data stores, each store has 2 work thread. + // so there are 8(= 4 * 2) queues in _flush_queues. + // and the path hash of the 4 paths are mapped to idx 0, 2, 4, 6. + int32_t group = 0; + for (auto store : data_dirs) { + _path_map[store->path_hash()] = group; + group += _thread_num_per_store; + } +} + +MemTableFlushExecutor::~MemTableFlushExecutor() { + // shutdown queues + for (auto queue : _flush_queues) { + queue->shutdown(); + } + + // shutdown thread pool + _flush_pool->shutdown(); + _flush_pool->join(); + + // delete queue + for (auto queue : _flush_queues) { + delete queue; + } + _flush_queues.clear(); + + delete _flush_pool; +} + +int32_t MemTableFlushExecutor::_get_queue_idx(size_t path_hash) { + std::lock_guard l(_lock); + int32_t cur_idx = _path_map[path_hash]; + int32_t group = cur_idx / _thread_num_per_store; + int32_t next_idx = group * _thread_num_per_store + ((cur_idx + 1) % _thread_num_per_store); + DCHECK(next_idx < _num_threads); + _path_map[path_hash] = next_idx; + return cur_idx; +} + +void MemTableFlushExecutor::_push_memtable(int32_t queue_idx, MemTableFlushContext& ctx) { + _flush_queues[queue_idx]->blocking_put(ctx); +} + +void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) { + while(true) { + MemTableFlushContext ctx; + if (!_flush_queues[queue_idx]->blocking_get(&ctx)) { + // queue is empty and shutdown, end of thread + return; + } + + // if last flush of this tablet already failed, just skip + if (ctx.flush_handler->is_cancelled()) { + ctx.flush_handler->on_flush_cancelled(); + continue; + } + + // flush the memtable + FlushResult res; + MonotonicStopWatch timer; + timer.start(); + res.flush_status = ctx.memtable->flush(); + res.flush_time_ns = timer.elapsed_time(); + // callback + ctx.flush_handler->on_flush_finished(res); + } +} + +std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { + os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000 + << ", flush count=" << stat.flush_count << ")"; + return os; +} + +} // end of namespac diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h new file mode 100644 index 0000000000..dc5b5d24c0 --- /dev/null +++ b/be/src/olap/memtable_flush_executor.h @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "olap/olap_define.h" +#include "util/blocking_queue.hpp" +#include "util/counter_cond_variable.hpp" +#include "util/spinlock.h" +#include "util/thread_pool.hpp" + +namespace doris { + +class DataDir; +class DeltaWriter; +class ExecEnv; +class MemTable; + +// The context for a memtable to be flushed. +class FlushHandler; +struct MemTableFlushContext { + // memtable to be flushed + std::shared_ptr memtable; + // flush handler from a delta writer. + // use shared ptr because flush_handler may be deleted before this + // memtable being flushed. so we need to make sure the flush_handler + // is alive until this memtable being flushed. + std::shared_ptr flush_handler; +}; + +// the flush result of a single memtable flush +struct FlushResult { + OLAPStatus flush_status; + int64_t flush_time_ns; +}; + +// the statistic of a certain flush handler. +// use atomic because it may be updated by multi threads +struct FlushStatistic { + std::atomic_int64_t flush_time_ns = {0}; + std::atomic_int64_t flush_count= {0}; +}; +std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); + +// flush handler is for flushing memtables in a delta writer +// This class must be wrapped by std::shared_ptr, or you will get bad_weak_ptr exception +// when calling submit(); +class MemTableFlushExecutor; +class FlushHandler : public std::enable_shared_from_this { +public: + FlushHandler(int32_t flush_queue_idx, MemTableFlushExecutor* flush_executor): + _flush_queue_idx(flush_queue_idx), + _last_flush_status(OLAP_SUCCESS), + _counter_cond(0), + _flush_executor(flush_executor) { + } + + // submit a memtable to flush. return error if some previous submitted MemTable has failed + OLAPStatus submit(std::shared_ptr memtable); + // wait for all submitted memtable finished. + OLAPStatus wait(); + // get flush operations' statistics + const FlushStatistic& get_stats() const { return _stats; } + // called when a memtable is finished by executor. + void on_flush_finished(const FlushResult& res); + // called when a flush memtable execution is cancelled + void on_flush_cancelled() { + // for now, if one memtable cancelled, no more memtables will be flushed, so dec to zero. + _counter_cond.dec_to_zero(); + } + + bool is_cancelled() { return _last_flush_status.load() != OLAP_SUCCESS; } +private: + // flush queue idx in memtable flush executor + int32_t _flush_queue_idx; + // the flush status of last memtable + std::atomic _last_flush_status; + // used to wait/notify the memtable flush execution + CounterCondVariable _counter_cond; + + FlushStatistic _stats; + MemTableFlushExecutor* _flush_executor; +}; + +// MemTableFlushExecutor is for flushing memtables to disk. +// Each data directory has a specified number of worker threads and a corresponding number of flush queues. +// Each worker thread only takes memtable from the corresponding flush queue and writes it to disk. +// User SHOULD NOT call method of this class directly, use pattern should be: +// +// ... +// std::shared_ptr flush_handler; +// memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler); +// ... +// flush_handler->submit(memtable) +// ... +class MemTableFlushExecutor { +public: + MemTableFlushExecutor() {} + // init should be called after storage engine is opened, + // because it needs path hash of each data dir. + void init(const std::vector& data_dirs); + + ~MemTableFlushExecutor(); + + // create a flush handler to access the flush executor + OLAPStatus create_flush_handler(int64_t path_hash, std::shared_ptr* flush_handler); + +private: + // given the path hash, return the next idx of flush queue. + // eg. + // path A is mapped to idx 0 and 1, so each time get_queue_idx(A) is called, + // 0 and 1 will returned alternately. + int32_t _get_queue_idx(size_t path_hash); + + // push the memtable to specified flush queue + void _push_memtable(int32_t queue_idx, MemTableFlushContext& ctx); + + void _flush_memtable(int32_t queue_idx); + +private: + friend class FlushHandler; + + int32_t _thread_num_per_store; + int32_t _num_threads; + ThreadPool* _flush_pool; + // the size of this vector should equals to _num_threads + std::vector*> _flush_queues; + // lock to protect path_map + SpinLock _lock; + // path hash -> queue idx of _flush_queues; + std::unordered_map _path_map; +}; + +} // end namespace diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 11ed82d604..1f670d01cd 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -36,6 +36,7 @@ #include "olap/base_compaction.h" #include "olap/cumulative_compaction.h" #include "olap/lru_cache.h" +#include "olap/memtable_flush_executor.h" #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/push_handler.h" @@ -222,6 +223,9 @@ OLAPStatus StorageEngine::open() { // 取消未完成的SchemaChange任务 _tablet_manager->cancel_unfinished_schema_change(); + _memtable_flush_executor = new MemTableFlushExecutor(); + _memtable_flush_executor->init(dirs); + return OLAP_SUCCESS; } @@ -492,6 +496,9 @@ OLAPStatus StorageEngine::clear() { store_pair.second = nullptr; } _store_map.clear(); + + delete _memtable_flush_executor; + return OLAP_SUCCESS; } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index cfbd995b24..7f96fa4bb0 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -51,9 +51,10 @@ namespace doris { -class Tablet; class DataDir; class EngineTask; +class MemTableFlushExecutor; +class Tablet; // StorageEngine singleton to manage all Table pointers. // Providing add/drop/get operations. @@ -203,6 +204,8 @@ public: void release_rowset_id(const RowsetId& rowset_id) { return _rowset_id_generator->release_id(rowset_id); }; + MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor; } + private: OLAPStatus _check_file_descriptor_number(); @@ -345,6 +348,8 @@ private: std::unique_ptr _rowset_id_generator; + MemTableFlushExecutor* _memtable_flush_executor; + DISALLOW_COPY_AND_ASSIGN(StorageEngine); }; diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index e46afeef3e..ae89fca119 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -24,87 +24,88 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/runtime") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/runtime") set(RUNTIME_FILES - broker_mgr.cpp - buffered_block_mgr.cpp - buffered_tuple_stream.cpp - buffered_tuple_stream_ir.cpp - buffer_control_block.cpp - merge_sorter.cpp - client_cache.cpp - data_stream_mgr.cpp - data_stream_sender.cpp - datetime_value.cpp - descriptors.cpp - exec_env.cpp - exec_env_init.cpp - user_function_cache.cpp - mem_pool.cpp - plan_fragment_executor.cpp - primitive_type.cpp - pull_load_task_mgr.cpp - raw_value.cpp - raw_value_ir.cpp - result_sink.cpp - result_writer.cpp - result_buffer_mgr.cpp - row_batch.cpp - runtime_state.cpp - string_value.cpp - thread_resource_mgr.cpp - # timestamp_value.cpp - decimal_value.cpp - decimalv2_value.cpp - large_int_value.cpp - tuple.cpp - tuple_row.cpp - vectorized_row_batch.cpp - dpp_writer.cpp - qsorter.cpp - fragment_mgr.cpp - dpp_sink_internal.cpp - data_spliter.cpp - dpp_sink.cpp - etl_job_mgr.cpp - load_path_mgr.cpp - types.cpp - tmp_file_mgr.cc - disk_io_mgr.cc - disk_io_mgr_reader_context.cc - disk_io_mgr_scan_range.cc - buffered_block_mgr2.cc - test_env.cc - mem_tracker.cpp - spill_sorter.cc - sorted_run_merger.cc - data_stream_recvr.cc - buffered_tuple_stream2.cc - buffered_tuple_stream2_ir.cc - buffered_tuple_stream3.cc - # export_task_mgr.cpp - export_sink.cpp - tablet_writer_mgr.cpp - bufferpool/buffer_allocator.cc - bufferpool/buffer_pool.cc - bufferpool/reservation_tracker.cc - bufferpool/reservation_util.cc - bufferpool/suballocator.cc - bufferpool/system_allocator.cc - initial_reservations.cc - snapshot_loader.cpp - query_statistics.cpp - message_body_sink.cpp - stream_load/stream_load_context.cpp - stream_load/stream_load_executor.cpp - routine_load/data_consumer.cpp - routine_load/data_consumer_group.cpp - routine_load/data_consumer_pool.cpp - routine_load/routine_load_task_executor.cpp - small_file_mgr.cpp - result_queue_mgr.cpp - memory_scratch_sink.cpp - external_scan_context_mgr.cpp - memory/system_allocator.cpp - memory/chunk_allocator.cpp + broker_mgr.cpp + buffered_block_mgr.cpp + buffered_tuple_stream.cpp + buffered_tuple_stream_ir.cpp + buffer_control_block.cpp + merge_sorter.cpp + client_cache.cpp + data_stream_mgr.cpp + data_stream_sender.cpp + datetime_value.cpp + descriptors.cpp + exec_env.cpp + exec_env_init.cpp + user_function_cache.cpp + mem_pool.cpp + plan_fragment_executor.cpp + primitive_type.cpp + pull_load_task_mgr.cpp + raw_value.cpp + raw_value_ir.cpp + result_sink.cpp + result_writer.cpp + result_buffer_mgr.cpp + row_batch.cpp + runtime_state.cpp + string_value.cpp + thread_resource_mgr.cpp + # timestamp_value.cpp + decimal_value.cpp + decimalv2_value.cpp + large_int_value.cpp + tuple.cpp + tuple_row.cpp + vectorized_row_batch.cpp + dpp_writer.cpp + qsorter.cpp + fragment_mgr.cpp + dpp_sink_internal.cpp + data_spliter.cpp + dpp_sink.cpp + etl_job_mgr.cpp + load_path_mgr.cpp + types.cpp + tmp_file_mgr.cc + disk_io_mgr.cc + disk_io_mgr_reader_context.cc + disk_io_mgr_scan_range.cc + buffered_block_mgr2.cc + test_env.cc + mem_tracker.cpp + spill_sorter.cc + sorted_run_merger.cc + data_stream_recvr.cc + buffered_tuple_stream2.cc + buffered_tuple_stream2_ir.cc + buffered_tuple_stream3.cc + # export_task_mgr.cpp + export_sink.cpp + tablet_writer_mgr.cpp + tablets_channel.cpp + bufferpool/buffer_allocator.cc + bufferpool/buffer_pool.cc + bufferpool/reservation_tracker.cc + bufferpool/reservation_util.cc + bufferpool/suballocator.cc + bufferpool/system_allocator.cc + initial_reservations.cc + snapshot_loader.cpp + query_statistics.cpp + message_body_sink.cpp + stream_load/stream_load_context.cpp + stream_load/stream_load_executor.cpp + routine_load/data_consumer.cpp + routine_load/data_consumer_group.cpp + routine_load/data_consumer_pool.cpp + routine_load/routine_load_task_executor.cpp + small_file_mgr.cpp + result_queue_mgr.cpp + memory_scratch_sink.cpp + external_scan_context_mgr.cpp + memory/system_allocator.cpp + memory/chunk_allocator.cpp ) if (WITH_MYSQL) diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index 0d30dbbab6..20593ce192 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -24,232 +24,17 @@ #include "common/object_pool.h" #include "exec/tablet_info.h" #include "runtime/descriptors.h" +#include "runtime/exec_env.h" #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" #include "service/backend_options.h" -#include "util/bitmap.h" #include "util/stopwatch.hpp" #include "olap/delta_writer.h" #include "olap/lru_cache.h" namespace doris { -// channel that process all data for this load -class TabletsChannel { -public: - TabletsChannel(const TabletsChannelKey& key) : _key(key), _closed_senders(64) { } - ~TabletsChannel(); - - Status open(const PTabletWriterOpenRequest& params); - - Status add_batch(const PTabletWriterAddBatchRequest& batch); - - Status close(int sender_id, bool* finished, - const google::protobuf::RepeatedField& partition_ids, - google::protobuf::RepeatedPtrField* tablet_vec); - - time_t last_updated_time() { - return _last_updated_time; - } - -private: - // open all writer - Status _open_all_writers(const PTabletWriterOpenRequest& params); - -private: - // id of this load channel, just for - TabletsChannelKey _key; - - // make execute sequece - std::mutex _lock; - - // initialized in open function - int64_t _txn_id = -1; - int64_t _index_id = -1; - OlapTableSchemaParam* _schema = nullptr; - TupleDescriptor* _tuple_desc = nullptr; - // row_desc used to construct - RowDescriptor* _row_desc = nullptr; - bool _opened = false; - - // next sequence we expect - int _num_remaining_senders = 0; - std::vector _next_seqs; - Bitmap _closed_senders; - Status _close_status; - - // tablet_id -> TabletChannel - std::unordered_map _tablet_writers; - - std::unordered_set _partition_ids; - - // TODO(zc): to add this tracker to somewhere - MemTracker _mem_tracker; - - //use to erase timeout TabletsChannel in _tablets_channels - time_t _last_updated_time; -}; - -TabletsChannel::~TabletsChannel() { - for (auto& it : _tablet_writers) { - delete it.second; - } - delete _row_desc; - delete _schema; -} - -Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { - std::lock_guard l(_lock); - if (_opened) { - // Normal case, already open by other sender - return Status::OK(); - } - LOG(INFO) << "open tablets channel: " << _key; - _txn_id = params.txn_id(); - _index_id = params.index_id(); - _schema = new OlapTableSchemaParam(); - RETURN_IF_ERROR(_schema->init(params.schema())); - _tuple_desc = _schema->tuple_desc(); - _row_desc = new RowDescriptor(_tuple_desc, false); - - _num_remaining_senders = params.num_senders(); - _next_seqs.resize(_num_remaining_senders, 0); - _closed_senders.Reset(_num_remaining_senders); - - RETURN_IF_ERROR(_open_all_writers(params)); - - _opened = true; - _last_updated_time = time(nullptr); - return Status::OK(); -} - -Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { - DCHECK(params.tablet_ids_size() == params.row_batch().num_rows()); - std::lock_guard l(_lock); - DCHECK(_opened); - auto next_seq = _next_seqs[params.sender_id()]; - // check packet - if (params.packet_seq() < next_seq) { - LOG(INFO) << "packet has already recept before, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::OK(); - } else if (params.packet_seq() > next_seq) { - LOG(WARNING) << "lost data packet, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::InternalError("lost data packet"); - } - - RowBatch row_batch(*_row_desc, params.row_batch(), &_mem_tracker); - - // iterator all data - for (int i = 0; i < params.tablet_ids_size(); ++i) { - auto tablet_id = params.tablet_ids(i); - auto it = _tablet_writers.find(tablet_id); - if (it == std::end(_tablet_writers)) { - std::stringstream ss; - ss << "unknown tablet to append data, tablet=" << tablet_id; - return Status::InternalError(ss.str()); - } - auto st = it->second->write(row_batch.get_row(i)->get_tuple(0)); - if (st != OLAP_SUCCESS) { - std::stringstream ss; - ss << "tablet writer write failed, tablet_id=" << it->first - << ", transaction_id=" << _txn_id << ", err=" << st; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - } - _next_seqs[params.sender_id()]++; - _last_updated_time = time(nullptr); - return Status::OK(); -} - -Status TabletsChannel::close(int sender_id, bool* finished, - const google::protobuf::RepeatedField& partition_ids, - google::protobuf::RepeatedPtrField* tablet_vec) { - std::lock_guard l(_lock); - if (_closed_senders.Get(sender_id)) { - // Double close from one sender, just return OK - *finished = (_num_remaining_senders == 0); - return _close_status; - } - LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id; - for (auto pid : partition_ids) { - _partition_ids.emplace(pid); - } - _closed_senders.Set(sender_id, true); - _num_remaining_senders--; - *finished = (_num_remaining_senders == 0); - if (*finished) { - // All senders are closed - for (auto& it : _tablet_writers) { - if (_partition_ids.count(it.second->partition_id()) > 0) { - auto st = it.second->close(tablet_vec); - if (st != OLAP_SUCCESS) { - std::stringstream ss; - ss << "close tablet writer failed, tablet_id=" << it.first - << ", transaction_id=" << _txn_id << ", err=" << st; - LOG(WARNING) << ss.str(); - _close_status = Status::InternalError(ss.str()); - return _close_status; - } - } else { - auto st = it.second->cancel(); - if (st != OLAP_SUCCESS) { - LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << it.first - << ", transaction_id=" << _txn_id; - } - } - } - } - return Status::OK(); -} - -Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) { - std::vector* index_slots = nullptr; - int32_t schema_hash = 0; - for (auto& index : _schema->indexes()) { - if (index->index_id == _index_id) { - index_slots = &index->slots; - schema_hash = index->schema_hash; - break; - } - } - if (index_slots == nullptr) { - std::stringstream ss; - ss << "unknown index id, key=" << _key; - return Status::InternalError(ss.str()); - } - for (auto& tablet : params.tablets()) { - WriteRequest request; - request.tablet_id = tablet.tablet_id(); - request.schema_hash = schema_hash; - request.write_type = LOAD; - request.txn_id = _txn_id; - request.partition_id = tablet.partition_id(); - request.load_id = params.id(); - request.need_gen_rollup = params.need_gen_rollup(); - request.tuple_desc = _tuple_desc; - request.slots = index_slots; - - DeltaWriter* writer = nullptr; - auto st = DeltaWriter::open(&request, &writer); - if (st != OLAP_SUCCESS) { - std::stringstream ss; - ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() - << ", txn_id=" << _txn_id - << ", partition_id=" << tablet.partition_id() - << ", err=" << st; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - _tablet_writers.emplace(tablet.tablet_id(), writer); - } - DCHECK(_tablet_writers.size() == params.tablets_size()); - return Status::OK(); -} - TabletWriterMgr::TabletWriterMgr(ExecEnv* exec_env) :_exec_env(exec_env) { _tablets_channels.init(2011); _lastest_success_channel = new_lru_cache(1024); @@ -381,15 +166,4 @@ Status TabletWriterMgr::_start_tablets_channel_clean() { return Status::OK(); } -std::string TabletsChannelKey::to_string() const { - std::stringstream ss; - ss << *this; - return ss.str(); -} - -std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { - os << "(id=" << key.id << ",index_id=" << key.index_id << ")"; - return os; -} - } diff --git a/be/src/runtime/tablet_writer_mgr.h b/be/src/runtime/tablet_writer_mgr.h index f5714dade4..8a7f1e192e 100644 --- a/be/src/runtime/tablet_writer_mgr.h +++ b/be/src/runtime/tablet_writer_mgr.h @@ -29,6 +29,7 @@ #include "gen_cpp/Types_types.h" #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/internal_service.pb.h" +#include "runtime/tablets_channel.h" #include "util/hash_util.hpp" #include "util/uid_util.h" @@ -37,29 +38,6 @@ namespace doris { class ExecEnv; -class TabletsChannel; - -struct TabletsChannelKey { - UniqueId id; - int64_t index_id; - - TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) - : id(pid), index_id(index_id_) { } - ~TabletsChannelKey() noexcept { } - - bool operator==(const TabletsChannelKey& rhs) const noexcept { - return index_id == rhs.index_id && id == rhs.id; - } - - std::string to_string() const; -}; - -struct TabletsChannelKeyHasher { - std::size_t operator()(const TabletsChannelKey& key) const { - size_t seed = key.id.hash(); - return doris::HashUtil::hash(&key.index_id, sizeof(key.index_id), seed); - } -}; class Cache; @@ -104,6 +82,4 @@ private: Status _start_tablets_channel_clean(); }; -std::ostream& operator<<(std::ostream& os, const TabletsChannelKey&); - } diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp new file mode 100644 index 0000000000..b52f1aa028 --- /dev/null +++ b/be/src/runtime/tablets_channel.cpp @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/tablets_channel.h" + +#include "exec/tablet_info.h" +#include "olap/delta_writer.h" +#include "olap/memtable.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" + +namespace doris { + +TabletsChannel::TabletsChannel(const TabletsChannelKey& key): + _key(key), _closed_senders(64) { + // _last_updated_time should be set before being inserted to + // _tablet_channels in tablet_channel_mgr, or it may be erased + // immediately by gc thread. + _last_updated_time = time(nullptr); +} + +TabletsChannel::~TabletsChannel() { + for (auto& it : _tablet_writers) { + delete it.second; + } + delete _row_desc; + delete _schema; +} + +Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { + std::lock_guard l(_lock); + if (_opened) { + // Normal case, already open by other sender + return Status::OK(); + } + LOG(INFO) << "open tablets channel: " << _key; + _txn_id = params.txn_id(); + _index_id = params.index_id(); + _schema = new OlapTableSchemaParam(); + RETURN_IF_ERROR(_schema->init(params.schema())); + _tuple_desc = _schema->tuple_desc(); + _row_desc = new RowDescriptor(_tuple_desc, false); + + _num_remaining_senders = params.num_senders(); + _next_seqs.resize(_num_remaining_senders, 0); + _closed_senders.Reset(_num_remaining_senders); + + RETURN_IF_ERROR(_open_all_writers(params)); + + _opened = true; + _last_updated_time = time(nullptr); + return Status::OK(); +} + +Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { + DCHECK(params.tablet_ids_size() == params.row_batch().num_rows()); + std::lock_guard l(_lock); + DCHECK(_opened); + auto next_seq = _next_seqs[params.sender_id()]; + // check packet + if (params.packet_seq() < next_seq) { + LOG(INFO) << "packet has already recept before, expect_seq=" << next_seq + << ", recept_seq=" << params.packet_seq(); + return Status::OK(); + } else if (params.packet_seq() > next_seq) { + LOG(WARNING) << "lost data packet, expect_seq=" << next_seq + << ", recept_seq=" << params.packet_seq(); + return Status::InternalError("lost data packet"); + } + + RowBatch row_batch(*_row_desc, params.row_batch(), &_mem_tracker); + + // iterator all data + for (int i = 0; i < params.tablet_ids_size(); ++i) { + auto tablet_id = params.tablet_ids(i); + auto it = _tablet_writers.find(tablet_id); + if (it == std::end(_tablet_writers)) { + std::stringstream ss; + ss << "unknown tablet to append data, tablet=" << tablet_id; + return Status::InternalError(ss.str()); + } + auto st = it->second->write(row_batch.get_row(i)->get_tuple(0)); + if (st != OLAP_SUCCESS) { + std::stringstream ss; + ss << "tablet writer write failed, tablet_id=" << it->first + << ", transaction_id=" << _txn_id << ", err=" << st; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + } + _next_seqs[params.sender_id()]++; + _last_updated_time = time(nullptr); + return Status::OK(); +} + +Status TabletsChannel::close(int sender_id, bool* finished, + const google::protobuf::RepeatedField& partition_ids, + google::protobuf::RepeatedPtrField* tablet_vec) { + std::lock_guard l(_lock); + if (_closed_senders.Get(sender_id)) { + // Double close from one sender, just return OK + *finished = (_num_remaining_senders == 0); + return _close_status; + } + LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id; + for (auto pid : partition_ids) { + _partition_ids.emplace(pid); + } + _closed_senders.Set(sender_id, true); + _num_remaining_senders--; + *finished = (_num_remaining_senders == 0); + if (*finished) { + // All senders are closed + // 1. close all delta writers + std::vector need_wait_writers; + for (auto& it : _tablet_writers) { + if (_partition_ids.count(it.second->partition_id()) > 0) { + auto st = it.second->close(); + if (st != OLAP_SUCCESS) { + LOG(WARNING) << "close tablet writer failed, tablet_id=" << it.first + << ", transaction_id=" << _txn_id << ", err=" << st; + // just skip this tablet(writer) and continue to close others + continue; + } + need_wait_writers.push_back(it.second); + } else { + auto st = it.second->cancel(); + if (st != OLAP_SUCCESS) { + LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << it.first + << ", transaction_id=" << _txn_id; + // just skip this tablet(writer) and continue to close others + continue; + } + } + } + + // 2. wait delta writers and build the tablet vector + for (auto writer : need_wait_writers) { + // close may return failed, but no need to handle it here. + // tablet_vec will only contains success tablet, and then let FE judge it. + writer->close_wait(tablet_vec); + } + } + return Status::OK(); +} + +Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) { + std::vector* index_slots = nullptr; + int32_t schema_hash = 0; + for (auto& index : _schema->indexes()) { + if (index->index_id == _index_id) { + index_slots = &index->slots; + schema_hash = index->schema_hash; + break; + } + } + if (index_slots == nullptr) { + std::stringstream ss; + ss << "unknown index id, key=" << _key; + return Status::InternalError(ss.str()); + } + for (auto& tablet : params.tablets()) { + WriteRequest request; + request.tablet_id = tablet.tablet_id(); + request.schema_hash = schema_hash; + request.write_type = LOAD; + request.txn_id = _txn_id; + request.partition_id = tablet.partition_id(); + request.load_id = params.id(); + request.need_gen_rollup = params.need_gen_rollup(); + request.tuple_desc = _tuple_desc; + request.slots = index_slots; + + DeltaWriter* writer = nullptr; + auto st = DeltaWriter::open(&request, &writer); + if (st != OLAP_SUCCESS) { + std::stringstream ss; + ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() + << ", txn_id=" << _txn_id + << ", partition_id=" << tablet.partition_id() + << ", err=" << st; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + _tablet_writers.emplace(tablet.tablet_id(), writer); + } + DCHECK(_tablet_writers.size() == params.tablets_size()); + return Status::OK(); +} + +std::string TabletsChannelKey::to_string() const { + std::stringstream ss; + ss << *this; + return ss.str(); +} + +std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { + os << "(id=" << key.id << ",index_id=" << key.index_id << ")"; + return os; +} + +} diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h new file mode 100644 index 0000000000..b893af870b --- /dev/null +++ b/be/src/runtime/tablets_channel.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include "runtime/descriptors.h" +#include "runtime/mem_tracker.h" +#include "util/bitmap.h" +#include "util/thread_pool.hpp" +#include "util/uid_util.h" + +#include "gen_cpp/Types_types.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "gen_cpp/internal_service.pb.h" + +namespace doris { + +struct TabletsChannelKey { + UniqueId id; + int64_t index_id; + + TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) + : id(pid), index_id(index_id_) { } + + ~TabletsChannelKey() noexcept { } + + bool operator==(const TabletsChannelKey& rhs) const noexcept { + return index_id == rhs.index_id && id == rhs.id; + } + + std::string to_string() const; +}; + +struct TabletsChannelKeyHasher { + std::size_t operator()(const TabletsChannelKey& key) const { + size_t seed = key.id.hash(); + return doris::HashUtil::hash(&key.index_id, sizeof(key.index_id), seed); + } +}; + +class DeltaWriter; +class OlapTableSchemaParam; + +// channel that process all data for this load +class TabletsChannel { +public: + TabletsChannel(const TabletsChannelKey& key); + + ~TabletsChannel(); + + Status open(const PTabletWriterOpenRequest& params); + + Status add_batch(const PTabletWriterAddBatchRequest& batch); + + Status close(int sender_id, bool* finished, + const google::protobuf::RepeatedField& partition_ids, + google::protobuf::RepeatedPtrField* tablet_vec); + + time_t last_updated_time() { + return _last_updated_time; + } + +private: + // open all writer + Status _open_all_writers(const PTabletWriterOpenRequest& params); + +private: + // id of this load channel + TabletsChannelKey _key; + + // make execute sequece + std::mutex _lock; + + // initialized in open function + int64_t _txn_id = -1; + int64_t _index_id = -1; + OlapTableSchemaParam* _schema = nullptr; + TupleDescriptor* _tuple_desc = nullptr; + // row_desc used to construct + RowDescriptor* _row_desc = nullptr; + bool _opened = false; + + // next sequence we expect + int _num_remaining_senders = 0; + std::vector _next_seqs; + Bitmap _closed_senders; + Status _close_status; + + // tablet_id -> TabletChannel + std::unordered_map _tablet_writers; + + std::unordered_set _partition_ids; + + // TODO(zc): to add this tracker to somewhere + MemTracker _mem_tracker; + + //use to erase timeout TabletsChannel in _tablets_channels + time_t _last_updated_time; +}; + +std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key); + +} // end namespace diff --git a/be/src/util/counter_cond_variable.hpp b/be/src/util/counter_cond_variable.hpp new file mode 100644 index 0000000000..9048037826 --- /dev/null +++ b/be/src/util/counter_cond_variable.hpp @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace doris { + +// used for submitter/worker/waiter pattern +// submitter: +// one or more submitters submit tasks and call inc_count() +// worker: +// one or more workers do the task and call dec_count() after finishing the task +// waiter: +// one or more waiter call xxx_wait() to wait until all or at least one tasks are finished. +// Use pattern: +// thread1(submitter): +// CounterCondVariable cond(0); +// ... submit task ... +// cond.inc(); +// ... submit task ... +// cond.inr(); +// +// thread2(worker): +// ... do work... +// cond.dec(); +// ... do work... +// cond.dec(); +// or +// ... failed ... +// cond.dec_to_zero(); +// +// thread3(waiter): +// cond.block_wait(); + +class CounterCondVariable { +public: + explicit CounterCondVariable(int init = 0) : _count(init) { + } + + // increase the counter + void inc(int inc = 1) { + std::unique_lock lock(_lock); + _count += inc; + } + + // decrease the counter, and notify all waiters + void dec(int dec = 1) { + std::unique_lock lock(_lock); + _count -= dec; + _cv.notify_all(); + } + + // decrease the counter to zero + void dec_to_zero() { + std::unique_lock lock(_lock); + _count = 0; + _cv.notify_all(); + } + + // wait until count down to zero + void block_wait() { + std::unique_lock lock(_lock); + if (_count <= 0) { + return; + } + _cv.wait(lock, [this] { return _count <= 0; }); + } + +private: + std::mutex _lock; + std::condition_variable _cv; + int _count; +}; + +} // end namespace diff --git a/be/src/util/semaphore.hpp b/be/src/util/semaphore.hpp index 398c5ae3ed..dac287d604 100644 --- a/be/src/util/semaphore.hpp +++ b/be/src/util/semaphore.hpp @@ -31,13 +31,13 @@ class Semaphore { void signal() { std::unique_lock lock(_mutex); - ++count_; - cv_.notify_one(); + ++_count; + _cv.notify_one(); } void wait() { std::unique_lock lock(_mutex); - cv_.wait(lock, [=] { return _count > 0; }); + _cv.wait(lock, [=] { return _count > 0; }); --_count; } diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 9d671d6f30..3f248e6c30 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -71,3 +71,4 @@ ADD_BE_TEST(key_coder_test) ADD_BE_TEST(short_key_index_test) ADD_BE_TEST(page_cache_test) ADD_BE_TEST(hll_test) +ADD_BE_TEST(memtable_flush_executor_test) diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index ab4e0aba36..9685aeb3cf 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -30,6 +30,7 @@ #include "olap/utils.h" #include "runtime/tuple.h" #include "runtime/descriptor_helper.h" +#include "runtime/exec_env.h" #include "util/logging.h" #include "olap/options.h" #include "olap/tablet_meta_manager.h" @@ -56,6 +57,9 @@ void set_up() { doris::EngineOptions options; options.store_paths = paths; doris::StorageEngine::open(options, &k_engine); + + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_storage_engine(k_engine); } void tear_down() { @@ -301,7 +305,9 @@ TEST_F(TestDeltaWriter, open) { DeltaWriter* delta_writer = nullptr; DeltaWriter::open(&write_req, &delta_writer); ASSERT_NE(delta_writer, nullptr); - res = delta_writer->close(nullptr); + res = delta_writer->close(); + ASSERT_EQ(OLAP_SUCCESS, res); + res = delta_writer->close_wait(nullptr); ASSERT_EQ(OLAP_SUCCESS, res); SAFE_DELETE(delta_writer); @@ -391,7 +397,9 @@ TEST_F(TestDeltaWriter, write) { ASSERT_EQ(OLAP_SUCCESS, res); } - res = delta_writer->close(nullptr); + res = delta_writer->close(); + ASSERT_EQ(OLAP_SUCCESS, res); + res = delta_writer->close_wait(nullptr); ASSERT_EQ(OLAP_SUCCESS, res); // publish version success diff --git a/be/test/olap/memtable_flush_executor_test.cpp b/be/test/olap/memtable_flush_executor_test.cpp new file mode 100644 index 0000000000..cfbcab538c --- /dev/null +++ b/be/test/olap/memtable_flush_executor_test.cpp @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memtable_flush_executor.h" + +#include +#include +#include + +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "gen_cpp/Types_types.h" +#include "olap/delta_writer.h" +#include "olap/field.h" +#include "olap/memtable.h" +#include "olap/schema.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/utils.h" +#include "runtime/tuple.h" +#include "runtime/descriptor_helper.h" +#include "runtime/exec_env.h" +#include "util/logging.h" +#include "olap/options.h" +#include "olap/tablet_meta_manager.h" + +namespace doris { + +StorageEngine* k_engine = nullptr; +MemTableFlushExecutor* k_flush_executor = nullptr; + +void set_up() { + char buffer[1024]; + getcwd(buffer, 1024); + config::storage_root_path = std::string(buffer) + "/flush_test"; + remove_all_dir(config::storage_root_path); + create_dir(config::storage_root_path); + std::vector paths; + paths.emplace_back(config::storage_root_path, -1); + + doris::EngineOptions options; + options.store_paths = paths; + doris::StorageEngine::open(options, &k_engine); + + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_storage_engine(k_engine); + + k_flush_executor = k_engine->memtable_flush_executor(); +} + +void tear_down() { + delete k_engine; + k_engine = nullptr; + system("rm -rf ./flush_test"); + remove_all_dir(std::string(getenv("DORIS_HOME")) + UNUSED_PREFIX); +} + +Schema create_schema() { + std::vector col_schemas; + col_schemas.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_SMALLINT, true); + col_schemas.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_INT, true); + col_schemas.emplace_back(OLAP_FIELD_AGGREGATION_SUM, OLAP_FIELD_TYPE_BIGINT, true); + Schema schema(col_schemas, 2); + return schema; +} + +class TestMemTableFlushExecutor : public ::testing::Test { +public: + TestMemTableFlushExecutor() { } + ~TestMemTableFlushExecutor() { } + + void SetUp() { + std::cout << "setup" << std::endl; + } + + void TearDown(){ + std::cout << "tear down" << std::endl; + } +}; + +TEST_F(TestMemTableFlushExecutor, create_flush_handler) { + std::vector data_dir = k_engine->get_stores(); + int64_t path_hash = data_dir[0]->path_hash(); + + std::shared_ptr flush_handler; + k_flush_executor->create_flush_handler(path_hash, &flush_handler); + ASSERT_NE(nullptr, flush_handler.get()); + + FlushResult res; + res.flush_status = OLAP_SUCCESS; + res.flush_time_ns = 100; + flush_handler->on_flush_finished(res); + ASSERT_FALSE(flush_handler->is_cancelled()); + ASSERT_EQ(100, flush_handler->get_stats().flush_time_ns); + ASSERT_EQ(1, flush_handler->get_stats().flush_count); + + FlushResult res2; + res2.flush_status = OLAP_ERR_OTHER_ERROR; + flush_handler->on_flush_finished(res2); + ASSERT_TRUE(flush_handler->is_cancelled()); + ASSERT_EQ(100, flush_handler->get_stats().flush_time_ns); + ASSERT_EQ(1, flush_handler->get_stats().flush_count); + + ASSERT_EQ(OLAP_ERR_OTHER_ERROR, flush_handler->wait()); +} + +} // namespace doris + +int main(int argc, char** argv) { + std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; + if (!doris::config::init(conffile.c_str(), false)) { + fprintf(stderr, "error read config file. \n"); + return -1; + } + doris::init_glog("be-test"); + int ret = doris::OLAP_SUCCESS; + testing::InitGoogleTest(&argc, argv); + doris::CpuInfo::init(); + doris::set_up(); + ret = RUN_ALL_TESTS(); + doris::tear_down(); + google::protobuf::ShutdownProtobufLibrary(); + return ret; +} diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp b/be/test/runtime/external_scan_context_mgr_test.cpp index 8ab4202245..8cb542f4cc 100644 --- a/be/test/runtime/external_scan_context_mgr_test.cpp +++ b/be/test/runtime/external_scan_context_mgr_test.cpp @@ -106,8 +106,10 @@ int main(int argc, char** argv) { fprintf(stderr, "error read config file. \n"); return -1; } + + doris::config::scan_context_gc_interval_min = 1; // doris::init_glog("be-test"); ::testing::InitGoogleTest(&argc, argv); doris::CpuInfo::init(); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/be/test/runtime/tablet_writer_mgr_test.cpp b/be/test/runtime/tablet_writer_mgr_test.cpp index 69fce8f919..d42ad849b2 100644 --- a/be/test/runtime/tablet_writer_mgr_test.cpp +++ b/be/test/runtime/tablet_writer_mgr_test.cpp @@ -32,6 +32,7 @@ #include "runtime/descriptor_helper.h" #include "util/thrift_util.h" #include "olap/delta_writer.h" +#include "olap/storage_engine.h" namespace doris { @@ -42,7 +43,7 @@ OLAPStatus close_status; int64_t wait_lock_time_ns; // mock -DeltaWriter::DeltaWriter(WriteRequest* req) : _req(*req) { +DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine) : _req(*req) { } DeltaWriter::~DeltaWriter() { @@ -56,7 +57,7 @@ OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) { if (open_status != OLAP_SUCCESS) { return open_status; } - *writer = new DeltaWriter(req); + *writer = new DeltaWriter(req, nullptr); return open_status; } @@ -69,7 +70,11 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) { return add_status; } -OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* tablet_vec) { +OLAPStatus DeltaWriter::close() { + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec) { return close_status; } @@ -479,7 +484,9 @@ TEST_F(TabletWriterMgrTest, close_failed) { google::protobuf::RepeatedPtrField tablet_vec; auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); request.release_id(); - ASSERT_FALSE(st.ok()); + // even if delta close failed, the return status is still ok, but tablet_vec is empty + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(tablet_vec.empty()); } } diff --git a/be/test/runtime/user_function_cache_test.cpp b/be/test/runtime/user_function_cache_test.cpp index b125938f32..a4dc1fc317 100644 --- a/be/test/runtime/user_function_cache_test.cpp +++ b/be/test/runtime/user_function_cache_test.cpp @@ -86,7 +86,7 @@ public: UserFunctionCacheTest() { } virtual ~UserFunctionCacheTest() { } static void SetUpTestCase() { - s_server = new EvHttpServer(29999); + s_server = new EvHttpServer(29987); s_server->register_handler(GET, "/{FILE}", &s_test_handler); s_server->start(); @@ -130,7 +130,7 @@ TEST_F(UserFunctionCacheTest, download_normal) { // get my_add st = cache.get_function_ptr(1, "_Z6my_addv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_TRUE(st.ok()); ASSERT_TRUE(k_is_downloaded); @@ -140,7 +140,7 @@ TEST_F(UserFunctionCacheTest, download_normal) { // get my_del st = cache.get_function_ptr(1, "_Z6my_delv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_TRUE(st.ok()); ASSERT_NE(nullptr, fn_ptr); @@ -149,7 +149,7 @@ TEST_F(UserFunctionCacheTest, download_normal) { // get my_mul st = cache.get_function_ptr(1, "_Z6my_mulv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_FALSE(st.ok()); @@ -165,7 +165,7 @@ TEST_F(UserFunctionCacheTest, load_normal) { UserFunctionCacheEntry* entry = nullptr; st = cache.get_function_ptr(1, "_Z6my_addv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_TRUE(st.ok()); ASSERT_FALSE(k_is_downloaded); @@ -183,7 +183,7 @@ TEST_F(UserFunctionCacheTest, download_fail) { UserFunctionCacheEntry* entry = nullptr; st = cache.get_function_ptr(2, "_Z6my_delv", - "http://127.0.0.1:29999/my_del.so", + "http://127.0.0.1:29987/my_del.so", my_add_md5sum, &fn_ptr, &entry); ASSERT_FALSE(st.ok()); } @@ -199,7 +199,7 @@ TEST_F(UserFunctionCacheTest, md5_fail) { UserFunctionCacheEntry* entry = nullptr; st = cache.get_function_ptr(1, "_Z6my_addv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", "1234", &fn_ptr, &entry); ASSERT_FALSE(st.ok()); } @@ -218,7 +218,7 @@ TEST_F(UserFunctionCacheTest, bad_so) { UserFunctionCacheEntry* entry = nullptr; st = cache.get_function_ptr(2, "_Z6my_addv", - "http://127.0.0.1:29999/my_add.so", + "http://127.0.0.1:29987/my_add.so", "abc", &fn_ptr, &entry); ASSERT_FALSE(st.ok()); } diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 1955a00452..ff910ddfa2 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -47,3 +47,4 @@ ADD_BE_TEST(tdigest_test) ADD_BE_TEST(block_compression_test) ADD_BE_TEST(arrow/arrow_row_block_test) ADD_BE_TEST(arrow/arrow_row_batch_test) +ADD_BE_TEST(counter_cond_variable_test) diff --git a/be/test/util/counter_cond_variable_test.cpp b/be/test/util/counter_cond_variable_test.cpp new file mode 100644 index 0000000000..b64110af54 --- /dev/null +++ b/be/test/util/counter_cond_variable_test.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/counter_cond_variable.hpp" + +#include +#include + +#include "common/logging.h" + +namespace doris { + +CounterCondVariable g_cond; +std::mutex g_io_mu; + +class CounterCondVariableTest : public testing::Test { +public: + CounterCondVariableTest() { } + virtual ~CounterCondVariableTest() { } +}; + +void submitter() { + for (int i = 0; i < 10; ++i) { + g_cond.inc(); + } +} + +void worker() { + for (int i = 0; i < 10; ++i) { + { + std::unique_lock lock(g_io_mu); + std::cout << "worker " << i << std::endl; + } + sleep(1); + g_cond.dec(); + } +} + +void waiter() { + g_cond.block_wait(); + std::cout << "wait finished" << std::endl; +} + +TEST_F(CounterCondVariableTest, test) { + g_cond.block_wait(); + g_cond.inc(10); + g_cond.dec(10); + g_cond.block_wait(); + + std::thread submit(submitter); + std::thread wait1(waiter); + std::thread wait2(waiter); + std::thread work1(worker); + std::thread work2(worker); + + submit.join(); + wait1.join(); + wait2.join(); + work1.join(); + work2.join(); + + g_cond.inc(10); + g_cond.dec_to_zero(); + g_cond.block_wait(); +} + +} + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index c83aa89a74..a0331d432d 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -395,7 +395,7 @@ public class GlobalTransactionMgr { } if (successReplicaNum < quorumReplicaNum) { - LOG.warn("Failed to commit txn []. " + LOG.warn("Failed to commit txn [{}]. " + "Tablet [{}] success replica num is {} < quorum replica num {} " + "while error backends {}", transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, diff --git a/run-ut.sh b/run-ut.sh index c816e0a19e..95a9456e65 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -158,6 +158,7 @@ ${DORIS_TEST_BINARY_DIR}/util/tdigest_test ${DORIS_TEST_BINARY_DIR}/util/block_compression_test ${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_block_test ${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_batch_test +${DORIS_TEST_BINARY_DIR}/util/counter_cond_variable_test # Running common Unittest ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test @@ -241,6 +242,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/column_reader_test ${DORIS_TEST_BINARY_DIR}/olap/row_cursor_test ${DORIS_TEST_BINARY_DIR}/olap/skiplist_test ${DORIS_TEST_BINARY_DIR}/olap/serialize_test +${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test