From 51ccd44865662be352bb9f7bd9194d42564be06f Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sun, 7 Feb 2021 22:42:18 +0800 Subject: [PATCH] [Load Parallel][3/3] Support parallel delta writer (#5369) In the previous broker load, multiple OlapTableSinks would send data to the same LoadChannel, and because of the lock granularity problem, LoadChannel could only process these requests serially, which made it impossible to make full use of cluster resources. This CL modifies the related locks so that LoadChannel can process these requests in parallel. In the test, with a data size of 20G, the load time of 334 million rows of data on 3 nodes was reduced from 9min to 5min, and after enabling a concurrency of 2, it can be further reduced to 3min. Also modify the profile of the load job. --- be/src/exec/tablet_sink.cpp | 37 +++++++-- be/src/exec/tablet_sink.h | 26 +++++-- be/src/olap/delta_writer.cpp | 60 +++++++++++++-- be/src/olap/delta_writer.h | 11 ++- be/src/olap/olap_define.h | 2 + .../rowset/unique_rowset_id_generator.cpp | 1 + be/src/olap/tablet_manager.cpp | 2 + be/src/runtime/load_channel_mgr.cpp | 3 +- be/src/runtime/load_channel_mgr.h | 3 +- be/src/runtime/tablets_channel.cpp | 76 +++++++++---------- be/src/service/internal_service.cpp | 4 +- be/test/runtime/load_channel_mgr_test.cpp | 20 +++-- .../java/org/apache/doris/qe/Coordinator.java | 4 +- 13 files changed, 174 insertions(+), 75 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index a8f84b5336..949e0990eb 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -562,12 +562,18 @@ Status OlapTableSink::prepare(RuntimeState* state) { _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT); _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT); _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); + _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime"); _convert_batch_timer = 
ADD_TIMER(_profile, "ConvertBatchTime"); _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); _non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime"); - _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); + _non_blocking_send_work_timer = ADD_CHILD_TIMER(_profile, "NonBlockingSendWorkTime", "NonBlockingSendTime"); + _serialize_batch_timer = ADD_CHILD_TIMER(_profile, "SerializeBatchTime", "NonBlockingSendWorkTime"); + _total_add_batch_exec_timer = ADD_TIMER(_profile, "TotalAddBatchExecTime"); + _max_add_batch_exec_timer = ADD_TIMER(_profile, "MaxAddBatchExecTime"); + _add_batch_number = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT); + _num_node_channels = ADD_COUNTER(_profile, "NumberNodeChannels", TUnit::UNIT); _load_mem_limit = state->get_load_mem_limit(); // open all channels @@ -697,18 +703,23 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // BE id -> add_batch method counter std::unordered_map node_add_batch_counter_map; int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0, - actual_consume_ns = 0; + actual_consume_ns = 0, total_add_batch_exec_time_ns = 0, + max_add_batch_exec_time_ns = 0, + total_add_batch_num = 0, num_node_channels = 0; { SCOPED_TIMER(_close_timer); for (auto index_channel : _channels) { index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); }); + num_node_channels += index_channel->num_node_channels(); } for (auto index_channel : _channels) { + int64_t add_batch_exec_time = 0; index_channel->for_each_node_channel([&status, &state, &node_add_batch_counter_map, &serialize_batch_ns, &mem_exceeded_block_ns, - &queue_push_lock_ns, - &actual_consume_ns](NodeChannel* ch) { + &queue_push_lock_ns, &actual_consume_ns, + &total_add_batch_exec_time_ns, &add_batch_exec_time, + &total_add_batch_num](NodeChannel* ch) { auto s = 
ch->close_wait(state); if (!s.ok()) { // 'status' will store the last non-ok status of all channels @@ -719,8 +730,13 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { } ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, &mem_exceeded_block_ns, &queue_push_lock_ns, - &actual_consume_ns); + &actual_consume_ns, &total_add_batch_exec_time_ns, + &add_batch_exec_time, &total_add_batch_num); }); + + if (add_batch_exec_time > max_add_batch_exec_time_ns) { + max_add_batch_exec_time_ns = add_batch_exec_time; + } } } // TODO need to be improved @@ -732,9 +748,15 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_output_rows_counter, _number_output_rows); COUNTER_SET(_filtered_rows_counter, _number_filtered_rows); COUNTER_SET(_send_data_timer, _send_data_ns); + COUNTER_SET(_wait_mem_limit_timer, mem_exceeded_block_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); + COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns); + COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns); + COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns); + COUNTER_SET(_add_batch_number, total_add_batch_num); + COUNTER_SET(_num_node_channels, num_node_channels); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected(); @@ -744,11 +766,10 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; ss << "finished to close olap table sink. 
load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; + << ", txn_id=" << _txn_id << ", node add batch time(ms)/num: "; for (auto const& pair : node_add_batch_counter_map) { ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) - << ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) << ")(" - << pair.second.add_batch_num << ")} "; + << ")(" << pair.second.add_batch_num << ")} "; } LOG(INFO) << ss.str(); } else { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 8d894c6da7..4a4220278e 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -80,7 +80,8 @@ struct AddBatchCounter { template class ReusableClosure : public google::protobuf::Closure { public: - ReusableClosure() : cid(INVALID_BTHREAD_ID) {} + ReusableClosure() : cid(INVALID_BTHREAD_ID) { + } ~ReusableClosure() { // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. 
join(); @@ -173,12 +174,17 @@ public: void time_report(std::unordered_map* add_batch_counter_map, int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns, - int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) { + int64_t* queue_push_lock_ns, int64_t* actual_consume_ns, + int64_t* total_add_batch_exec_time_ns, int64_t* add_batch_exec_time_ns, + int64_t* total_add_batch_num) { (*add_batch_counter_map)[_node_id] += _add_batch_counter; *serialize_batch_ns += _serialize_batch_ns; *mem_exceeded_block_ns += _mem_exceeded_block_ns; *queue_push_lock_ns += _queue_push_lock_ns; *actual_consume_ns += _actual_consume_ns; + *add_batch_exec_time_ns = (_add_batch_counter.add_batch_execution_time_us * 1000); + *total_add_batch_exec_time_ns += *add_batch_exec_time_ns; + *total_add_batch_num += _add_batch_counter.add_batch_num; } int64_t node_id() const { return _node_id; } @@ -237,10 +243,10 @@ private: std::vector _tablet_commit_infos; AddBatchCounter _add_batch_counter; - std::atomic _serialize_batch_ns; - std::atomic _mem_exceeded_block_ns; - std::atomic _queue_push_lock_ns; - std::atomic _actual_consume_ns; + std::atomic _serialize_batch_ns{0}; + std::atomic _mem_exceeded_block_ns{0}; + std::atomic _queue_push_lock_ns{0}; + std::atomic _actual_consume_ns{0}; }; class IndexChannel { @@ -262,6 +268,8 @@ public: void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); } bool has_intolerable_failure(); + size_t num_node_channels() const { return _node_channels.size(); } + private: OlapTableSink* _parent; int64_t _index_id; @@ -382,12 +390,18 @@ private: RuntimeProfile::Counter* _output_rows_counter = nullptr; RuntimeProfile::Counter* _filtered_rows_counter = nullptr; RuntimeProfile::Counter* _send_data_timer = nullptr; + RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr; RuntimeProfile::Counter* _convert_batch_timer = nullptr; RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; 
RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _non_blocking_send_timer = nullptr; + RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; + RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr; + RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr; + RuntimeProfile::Counter* _add_batch_number = nullptr; + RuntimeProfile::Counter* _num_node_channels = nullptr; // load mem limit is for remote load channel int64_t _load_mem_limit = -1; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1ed44fbf9f..ee80db1e2d 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -176,10 +176,17 @@ OLAPStatus DeltaWriter::init() { } OLAPStatus DeltaWriter::write(Tuple* tuple) { - if (!_is_init) { + std::lock_guard l(_lock); + if (!_is_init && !_is_cancelled) { RETURN_NOT_OK(init()); } + if (_is_cancelled) { + // The writer may be cancelled at any time by other thread. + // just return ERROR if writer is cancelled. + return OLAP_ERR_ALREADY_CANCELLED; + } + _mem_table->insert(tuple); // if memtable is full, push it to the flush executor, @@ -196,7 +203,20 @@ OLAPStatus DeltaWriter::_flush_memtable_async() { return _flush_token->submit(_mem_table); } -OLAPStatus DeltaWriter::flush_memtable_and_wait() { +OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) { + std::lock_guard l(_lock); + if (!_is_init) { + // This writer is not initialized before flushing. Do nothing + // But we return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED, + // Because this method maybe called when trying to reduce mem consumption, + // and at that time, the writer may not be initialized yet and that is a normal case. 
+ return OLAP_SUCCESS; + } + + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + if (mem_consumption() == _mem_table->memory_usage()) { // equal means there is no memtable in flush queue, just flush this memtable VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " @@ -208,7 +228,24 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() { DCHECK(mem_consumption() > _mem_table->memory_usage()); // this means there should be at least one memtable in flush queue. } - // wait all memtables in flush queue to be flushed. + + if (need_wait) { + // wait all memtables in flush queue to be flushed. + RETURN_NOT_OK(_flush_token->wait()); + } + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::wait_flush() { + std::lock_guard l(_lock); + if (!_is_init) { + // return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED for same reason + // as described in flush_memtable_and_wait() + return OLAP_SUCCESS; + } + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } RETURN_NOT_OK(_flush_token->wait()); return OLAP_SUCCESS; } @@ -220,7 +257,8 @@ void DeltaWriter::_reset_mem_table() { } OLAPStatus DeltaWriter::close() { - if (!_is_init) { + std::lock_guard l(_lock); + if (!_is_init && !_is_cancelled) { // if this delta writer is not initialized, but close() is called. // which means this tablet has no data loaded, but at least one tablet // in same partition has data loaded. 
@@ -229,14 +267,24 @@ OLAPStatus DeltaWriter::close() { RETURN_NOT_OK(init()); } + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + RETURN_NOT_OK(_flush_memtable_async()); _mem_table.reset(); return OLAP_SUCCESS; } OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField* tablet_vec) { + std::lock_guard l(_lock); DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called"; + + if (_is_cancelled) { + return OLAP_ERR_ALREADY_CANCELLED; + } + // return error if previous flush failed RETURN_NOT_OK(_flush_token->wait()); DCHECK_EQ(_mem_tracker->consumption(), 0); @@ -295,7 +343,8 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField l(_lock); + if (!_is_init || _is_cancelled) { return OLAP_SUCCESS; } _mem_table.reset(); @@ -304,6 +353,7 @@ OLAPStatus DeltaWriter::cancel() { _flush_token->cancel(); } DCHECK_EQ(_mem_tracker->consumption(), 0); + _is_cancelled = true; return OLAP_SUCCESS; } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index caa0483801..9cf59eb506 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -21,6 +21,7 @@ #include "gen_cpp/internal_service.pb.h" #include "olap/rowset/rowset_writer.h" #include "olap/tablet.h" +#include "util/spinlock.h" namespace doris { @@ -73,12 +74,17 @@ public: // submit current memtable to flush queue, and wait all memtables in flush queue // to be flushed. // This is currently for reducing mem consumption of this delta writer. - OLAPStatus flush_memtable_and_wait(); + // If need_wait is true, it will wait for all memtable in flush queue to be flushed. + // Otherwise, it will just put memtables to the flush queue and return. 
+ OLAPStatus flush_memtable_and_wait(bool need_wait); int64_t partition_id() const; int64_t mem_consumption() const; + // Wait all memtable in flush queue to be flushed + OLAPStatus wait_flush(); + private: DeltaWriter(WriteRequest* req, const std::shared_ptr& parent, StorageEngine* storage_engine); @@ -92,6 +98,7 @@ private: private: bool _is_init = false; + bool _is_cancelled = false; WriteRequest _req; TabletSharedPtr _tablet; RowsetSharedPtr _cur_rowset; @@ -106,6 +113,8 @@ private: StorageEngine* _storage_engine; std::unique_ptr _flush_token; std::shared_ptr _mem_tracker; + + SpinLock _lock; }; } // namespace doris diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 75ad835afa..ef1ec2efce 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -165,6 +165,8 @@ enum OLAPStatus { OLAP_ERR_TOO_MANY_TRANSACTIONS = -233, OLAP_ERR_INVALID_SNAPSHOT_VERSION = -234, OLAP_ERR_TOO_MANY_VERSION = -235, + OLAP_ERR_NOT_INITIALIZED = -236, + OLAP_ERR_ALREADY_CANCELLED = -237, // CommandExecutor // [-300, -400) diff --git a/be/src/olap/rowset/unique_rowset_id_generator.cpp b/be/src/olap/rowset/unique_rowset_id_generator.cpp index 71352ca848..c21b8caf2f 100644 --- a/be/src/olap/rowset/unique_rowset_id_generator.cpp +++ b/be/src/olap/rowset/unique_rowset_id_generator.cpp @@ -19,6 +19,7 @@ #include "util/doris_metrics.h" #include "util/spinlock.h" +#include "util/stack_util.h" #include "util/uid_util.h" namespace doris { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 78baf0f34f..9f19cf4c87 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -632,6 +632,7 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, TTabletId* tablet_id, TSchemaHash* schema_hash) { + // the path like: /data/14/10080/964828783/ static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)"); // 
match tablet schema hash data path, for example, the path is /data/1/16791/29998 // 1 is shard id , 16791 is tablet id, 29998 is schema hash @@ -651,6 +652,7 @@ bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path, } bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) { + // the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*"); string id_str; bool ret = RE2::PartialMatch(path, re, &id_str); diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index c2feebb2f3..2f320d0c31 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -117,8 +117,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { static void dummy_deleter(const CacheKey& key, void* value) {} Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request, - google::protobuf::RepeatedPtrField* tablet_vec, - int64_t* wait_lock_time_ns) { + google::protobuf::RepeatedPtrField* tablet_vec) { UniqueId load_id(request.id()); // 1. 
get load channel std::shared_ptr channel; diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index f0ec6fab07..450c8bf0e2 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -51,8 +51,7 @@ public: Status open(const PTabletWriterOpenRequest& request); Status add_batch(const PTabletWriterAddBatchRequest& request, - google::protobuf::RepeatedPtrField* tablet_vec, - int64_t* wait_lock_time_ns); + google::protobuf::RepeatedPtrField* tablet_vec); // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 76c7166d82..b2a32c0a92 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -77,23 +77,26 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { DCHECK(params.tablet_ids_size() == params.row_batch().num_rows()); - std::lock_guard l(_lock); - if (_state != kOpened) { - return _state == kFinished - ? _close_status - : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", - _key.to_string(), _state)); - } - auto next_seq = _next_seqs[params.sender_id()]; - // check packet - if (params.packet_seq() < next_seq) { - LOG(INFO) << "packet has already recept before, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::OK(); - } else if (params.packet_seq() > next_seq) { - LOG(WARNING) << "lost data packet, expect_seq=" << next_seq - << ", recept_seq=" << params.packet_seq(); - return Status::InternalError("lost data packet"); + int64_t cur_seq; + { + std::lock_guard l(_lock); + if (_state != kOpened) { + return _state == kFinished + ? 
_close_status + : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", + _key.to_string(), _state)); + } + cur_seq = _next_seqs[params.sender_id()]; + // check packet + if (params.packet_seq() < cur_seq) { + LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq + << ", recept_seq=" << params.packet_seq(); + return Status::OK(); + } else if (params.packet_seq() > cur_seq) { + LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq + << ", recept_seq=" << params.packet_seq(); + return Status::InternalError("lost data packet"); + } } RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get()); @@ -115,7 +118,11 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { return Status::InternalError(err_msg); } } - _next_seqs[params.sender_id()]++; + + { + std::lock_guard l(_lock); + _next_seqs[params.sender_id()] = cur_seq + 1; + } return Status::OK(); } @@ -183,30 +190,21 @@ Status TabletsChannel::reduce_mem_usage() { // therefore it's possible for reduce_mem_usage() to be called right after close() return _close_status; } - // find tablet writer with largest mem consumption - int64_t max_consume = 0L; - DeltaWriter* writer = nullptr; + + // Flush all memtables for (auto& it : _tablet_writers) { - if (it.second->mem_consumption() > max_consume) { - max_consume = it.second->mem_consumption(); - writer = it.second; + it.second->flush_memtable_and_wait(false); + } + + for (auto& it : _tablet_writers) { + OLAPStatus st = it.second->wait_flush(); + if (st != OLAP_SUCCESS) { + // flush failed, return error + std::stringstream ss; + ss << "failed to reduce mem consumption by flushing memtable. 
err: " << st; + return Status::InternalError(ss.str()); } } - - if (writer == nullptr || max_consume == 0) { - // barely not happend, just return OK - return Status::OK(); - } - - VLOG_NOTICE << "pick the delte writer to flush, with mem consumption: " << max_consume - << ", channel key: " << _key; - OLAPStatus st = writer->flush_memtable_and_wait(); - if (st != OLAP_SUCCESS) { - // flush failed, return error - std::stringstream ss; - ss << "failed to reduce mem consumption by flushing memtable. err: " << st; - return Status::InternalError(ss.str()); - } return Status::OK(); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 7c85654600..51480fbb03 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -97,11 +97,10 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr _tablet_worker_pool.offer([request, response, done, this]() { brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; - int64_t wait_lock_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); auto st = _exec_env->load_channel_mgr()->add_batch( - *request, response->mutable_tablet_vec(), &wait_lock_time_ns); + *request, response->mutable_tablet_vec()); if (!st.ok()) { LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg() << ", id=" << request->id() << ", index_id=" << request->index_id() @@ -110,7 +109,6 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr st.to_protobuf(response->mutable_status()); } response->set_execution_time_us(execution_time_ns / 1000); - response->set_wait_lock_time_us(wait_lock_time_ns / 1000); }); } diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp index 54b0d3feda..80e03adb58 100644 --- a/be/test/runtime/load_channel_mgr_test.cpp +++ b/be/test/runtime/load_channel_mgr_test.cpp @@ -85,7 +85,11 @@ OLAPStatus DeltaWriter::cancel() { return OLAP_SUCCESS; } 
-OLAPStatus DeltaWriter::flush_memtable_and_wait() { +OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) { + return OLAP_SUCCESS; +} + +OLAPStatus DeltaWriter::wait_flush() { return OLAP_SUCCESS; } @@ -246,7 +250,7 @@ TEST_F(LoadChannelMgrTest, normal) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_TRUE(st.ok()); } @@ -413,7 +417,7 @@ TEST_F(LoadChannelMgrTest, add_failed) { row_batch.serialize(request.mutable_row_batch()); add_status = OLAP_ERR_TABLE_NOT_FOUND; google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_FALSE(st.ok()); } @@ -503,7 +507,7 @@ TEST_F(LoadChannelMgrTest, close_failed) { row_batch.serialize(request.mutable_row_batch()); close_status = OLAP_ERR_TABLE_NOT_FOUND; google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); // even if delta close failed, the return status is still ok, but tablet_vec is empty ASSERT_TRUE(st.ok()); @@ -591,7 +595,7 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_FALSE(st.ok()); } @@ -677,10 +681,10 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) { } row_batch.serialize(request.mutable_row_batch()); google::protobuf::RepeatedPtrField tablet_vec1; - auto st = mgr.add_batch(request, &tablet_vec1, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec1); ASSERT_TRUE(st.ok()); 
google::protobuf::RepeatedPtrField tablet_vec2; - st = mgr.add_batch(request, &tablet_vec2, &wait_lock_time_ns); + st = mgr.add_batch(request, &tablet_vec2); request.release_id(); ASSERT_TRUE(st.ok()); } @@ -693,7 +697,7 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) { request.set_eos(true); request.set_packet_seq(0); google::protobuf::RepeatedPtrField tablet_vec; - auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns); + auto st = mgr.add_batch(request, &tablet_vec); request.release_id(); ASSERT_TRUE(st.ok()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c0fba828b7..64295d5949 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1069,6 +1069,8 @@ public class Coordinator { List> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum); + LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size()); + for (List scanRangeParams : perInstanceScanRanges) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams); @@ -1085,7 +1087,7 @@ public class Coordinator { throw new UserException("there is no scanNode Backend"); } this.addressToBackendID.put(execHostport, backendIdRef.getRef()); - FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, + FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 0, params); params.instanceExecParams.add(instanceParam); }