diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 17f69c3d36..0ccb48f116 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -742,8 +742,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); // share delta writers when memtable_on_sink_node = true DEFINE_Bool(share_delta_writers, "true"); -// number of brpc stream per load -DEFINE_Int32(num_streams_per_load, "5"); // timeout for open load stream rpc in ms DEFINE_Int64(open_load_stream_timeout_ms, "500"); diff --git a/be/src/common/config.h b/be/src/common/config.h index cfcd09c198..f0fdf58558 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -799,8 +799,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); // share delta writers when memtable_on_sink_node = true DECLARE_Bool(share_delta_writers); -// number of brpc stream per load -DECLARE_Int32(num_streams_per_load); // timeout for open load stream rpc in ms DECLARE_Int64(open_load_stream_timeout_ms); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 67d0e6045b..62bc3ca8de 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -318,6 +318,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state->set_per_fragment_instance_idx(local_params.sender_id); _runtime_state->set_num_per_fragment_instances(request.num_senders); + _runtime_state->set_load_stream_per_node(request.load_stream_per_node); + _runtime_state->set_total_load_streams(request.total_load_streams); if (request.fragment.__isset.output_sink) { RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 95cdf47bec..fee49621f0 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -211,6 +211,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r } _runtime_state->set_desc_tbl(_desc_tbl); _runtime_state->set_num_per_fragment_instances(request.num_senders); + _runtime_state->set_load_stream_per_node(request.load_stream_per_node); + _runtime_state->set_total_load_streams(request.total_load_streams); // 2. Build pipelines with operators in this fragment. 
auto root_pipeline = add_pipeline(); diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index b9cc5891b4..6f66e7239d 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -30,6 +30,7 @@ #include "runtime/load_channel.h" #include "runtime/load_stream_mgr.h" #include "runtime/load_stream_writer.h" +#include "util/runtime_profile.h" #include "util/thrift_util.h" #include "util/uid_util.h" @@ -251,6 +252,8 @@ LoadStream::~LoadStream() { Status LoadStream::init(const POpenLoadStreamRequest* request) { _txn_id = request->txn_id(); + _total_streams = request->total_streams(); + DCHECK(_total_streams > 0) << "total streams should be greator than 0"; _schema = std::make_shared(); RETURN_IF_ERROR(_schema->init(request->schema())); @@ -265,41 +268,49 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) { Status LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, std::vector* success_tablet_ids, std::vector* failed_tablet_ids) { - std::lock_guard lock_guard(_lock); + std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); // we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack _open_streams[src_id]--; - LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining " - << _open_streams[src_id] << " streams"; if (_open_streams[src_id] == 0) { _open_streams.erase(src_id); } + _close_load_cnt++; + LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining " + << _total_streams - _close_load_cnt << " senders"; + + _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), + tablets_to_commit.end()); + + if (_close_load_cnt < _total_streams) { + // do not return commit info if there is remaining streams. + return Status::OK(); + } Status st = Status::OK(); - if (_open_streams.size() == 0) { + { bthread::Mutex mutex; std::unique_lock lock(mutex); bthread::ConditionVariable cond; - bool ret = _load_stream_mgr->heavy_work_pool()->try_offer([this, &success_tablet_ids, - &failed_tablet_ids, - &tablets_to_commit, &mutex, - &cond, &st]() { - signal::set_signal_task_id(_load_id); - for (auto& it : _index_streams_map) { - st = it.second->close(tablets_to_commit, success_tablet_ids, failed_tablet_ids); - if (!st.ok()) { + bool ret = _load_stream_mgr->heavy_work_pool()->try_offer( + [this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() { + signal::set_signal_task_id(_load_id); + for (auto& it : _index_streams_map) { + st = it.second->close(_tablets_to_commit, success_tablet_ids, + failed_tablet_ids); + if (!st.ok()) { + std::unique_lock lock(mutex); + cond.notify_one(); + return; + } + } + LOG(INFO) << "close load " << *this + << ", failed_tablet_num=" << failed_tablet_ids->size() + << ", success_tablet_num=" << success_tablet_ids->size(); std::unique_lock lock(mutex); cond.notify_one(); - return; - } - } - LOG(INFO) << "close load " << *this - << ", failed_tablet_num=" << failed_tablet_ids->size() - << ", success_tablet_num=" << success_tablet_ids->size(); - std::unique_lock lock(mutex); - cond.notify_one(); - }); + }); if (ret) { cond.wait(lock); } else { @@ -307,24 +318,21 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t "there is not enough thread resource for close load"); } } - - // do not return commit info for non-last one. 
return st; } -void LoadStream::_report_result(StreamId stream, Status& st, - std::vector* success_tablet_ids, - std::vector* failed_tablet_ids) { - LOG(INFO) << "report result, success tablet num " << success_tablet_ids->size() - << ", failed tablet num " << failed_tablet_ids->size(); +void LoadStream::_report_result(StreamId stream, const Status& st, + const std::vector& success_tablet_ids, + const std::vector& failed_tablet_ids) { + LOG(INFO) << "report result, success tablet num " << success_tablet_ids.size() + << ", failed tablet num " << failed_tablet_ids.size(); butil::IOBuf buf; PWriteStreamSinkResponse response; st.to_protobuf(response.mutable_status()); - for (auto& id : *success_tablet_ids) { + for (auto& id : success_tablet_ids) { response.add_success_tablet_ids(id); } - - for (auto& id : *failed_tablet_ids) { + for (auto& id : failed_tablet_ids) { response.add_failed_tablet_ids(id); } @@ -421,18 +429,20 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[] void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) { VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id() << " with tablet " << hdr.tablet_id(); + if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) { + Status st = Status::Error("invalid load id {}, expected {}", + UniqueId(hdr.load_id()).to_string(), + UniqueId(_load_id).to_string()); + _report_failure(id, st, hdr); + return; + } { std::lock_guard lock_guard(_lock); if (!_open_streams.contains(hdr.src_id())) { - std::vector success_tablet_ids; - std::vector failed_tablet_ids; - if (hdr.has_tablet_id()) { - failed_tablet_ids.push_back(hdr.tablet_id()); - } Status st = Status::Error("no open stream from source {}", hdr.src_id()); - _report_result(id, st, &success_tablet_ids, &failed_tablet_ids); + _report_failure(id, st, hdr); return; } } @@ -442,10 +452,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* case PStreamHeader::APPEND_DATA: { auto st = _append_data(hdr, data); if (!st.ok()) { - std::vector success_tablet_ids; - std::vector failed_tablet_ids; - failed_tablet_ids.push_back(hdr.tablet_id()); - _report_result(id, st, &success_tablet_ids, &failed_tablet_ids); + _report_failure(id, st, hdr); } } break; case PStreamHeader::CLOSE_LOAD: { @@ -454,7 +461,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* std::vector tablets_to_commit(hdr.tablets_to_commit().begin(), hdr.tablets_to_commit().end()); auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablet_ids); - _report_result(id, st, &success_tablet_ids, &failed_tablet_ids); + _report_result(id, st, success_tablet_ids, failed_tablet_ids); brpc::StreamClose(id); } break; default: @@ -468,9 +475,9 @@ void LoadStream::on_idle_timeout(StreamId id) { } void LoadStream::on_closed(StreamId id) { - auto remaining_rpc_stream = remove_rpc_stream(); - LOG(INFO) << "stream closed " << id << ", remaining_rpc_stream=" << remaining_rpc_stream; - if (remaining_rpc_stream == 0) { + auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1; + LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams; + if (remaining_streams == 0) { _load_stream_mgr->clear_load(_load_id); } } diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index fe7d90d502..1c16c086e2 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -113,9 +113,6 @@ public: _open_streams[src_id]++; } - 
uint32_t add_rpc_stream() { return ++_num_rpc_streams; } - uint32_t remove_rpc_stream() { return --_num_rpc_streams; } - Status close(int64_t src_id, const std::vector& tablets_to_commit, std::vector* success_tablet_ids, std::vector* failed_tablet_ids); @@ -130,16 +127,31 @@ private: void _parse_header(butil::IOBuf* const message, PStreamHeader& hdr); void _dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data); Status _append_data(const PStreamHeader& header, butil::IOBuf* data); - void _report_result(StreamId stream, Status& st, std::vector* success_tablet_ids, - std::vector* failed_tablet_ids); + + void _report_result(StreamId stream, const Status& st, + const std::vector& success_tablet_ids, + const std::vector& failed_tablet_ids); + + // report failure for one message + void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) { + std::vector success; // empty + std::vector failure; + if (header.has_tablet_id()) { + failure.push_back(header.tablet_id()); + } + _report_result(stream, status, success, failure); + } private: PUniqueId _load_id; std::unordered_map _index_streams_map; - std::atomic _num_rpc_streams; + int32_t _total_streams = 0; + int32_t _close_load_cnt = 0; + std::atomic _close_rpc_cnt = 0; + std::vector _tablets_to_commit; bthread::Mutex _lock; std::unordered_map _open_streams; - int64_t _txn_id; + int64_t _txn_id = 0; std::shared_ptr _schema; bool _enable_profile = false; std::unique_ptr _profile; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 9344378e91..39277c1a42 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -224,6 +224,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { _runtime_state->set_per_fragment_instance_idx(params.sender_id); _runtime_state->set_num_per_fragment_instances(params.num_senders); + _runtime_state->set_load_stream_per_node(request.load_stream_per_node); + _runtime_state->set_total_load_streams(request.total_load_streams); // set up sink, if required if (request.fragment.__isset.output_sink) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index cba5142c15..e990540d2f 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -309,6 +309,18 @@ public: int num_per_fragment_instances() const { return _num_per_fragment_instances; } + void set_load_stream_per_node(int load_stream_per_node) { + _load_stream_per_node = load_stream_per_node; + } + + int load_stream_per_node() const { return _load_stream_per_node; } + + void set_total_load_streams(int total_load_streams) { + _total_load_streams = total_load_streams; + } + + int total_load_streams() const { return _total_load_streams; } + bool disable_stream_preaggregations() const { return _query_options.disable_stream_preaggregations; } @@ -545,6 +557,8 @@ private: int _per_fragment_instance_idx; int _num_per_fragment_instances = 0; + int _load_stream_per_node = 0; + int _total_load_streams = 0; // The backend id on which this fragment instance runs int64_t _backend_id = -1; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 01151dcf34..d6846c07a1 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -401,7 +401,6 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con return; } - load_stream->add_rpc_stream(); VLOG_DEBUG << "get streamid =" << 
streamid; st.to_protobuf(response->mutable_status()); }); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index fe9887b3a3..c2f7f246f3 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -102,11 +102,11 @@ LoadStreamStub::~LoadStreamStub() { } // open_load_stream -// tablets means Status LoadStreamStub::open(BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, - const std::vector& tablets_for_schema, bool enable_profile) { + const std::vector& tablets_for_schema, int total_streams, + bool enable_profile) { _num_open++; std::unique_lock lock(_mutex); if (_is_init.load()) { @@ -130,6 +130,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, request.set_src_id(_src_id); request.set_txn_id(txn_id); request.set_enable_profile(enable_profile); + request.set_total_streams(total_streams); schema.to_protobuf(request.mutable_schema()); for (auto& tablet : tablets_for_schema) { *request.add_tablets() = tablet; diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 2db9ffcd95..3650b2aeae 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -148,7 +148,8 @@ public: // open_load_stream Status open(BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, - const std::vector& tablets_for_schema, bool enable_profile); + const std::vector& tablets_for_schema, int total_streams, + bool enable_profile); // for mock this class in UT #ifdef BE_TEST diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index 834c152386..5848ca5a51 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -28,14 +28,14 @@ LoadStreamStubPool::LoadStreamStubPool() = default; LoadStreamStubPool::~LoadStreamStubPool() = default; std::shared_ptr LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id, - int64_t dst_id) { + int64_t dst_id, int num_streams) { auto key = std::make_pair(UniqueId(load_id), dst_id); std::lock_guard lock(_mutex); std::shared_ptr streams = _pool[key].lock(); if (streams) { return streams; } - int32_t num_streams = std::max(1, config::num_streams_per_load); + DCHECK(num_streams > 0) << "stream num should be greater than 0"; auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id}); auto deleter = [this, key](Streams* s) { std::lock_guard lock(_mutex); diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index ae550340d2..73b41fdd61 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -78,7 +78,8 @@ public: ~LoadStreamStubPool(); - std::shared_ptr get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id); + std::shared_ptr get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id, + int num_streams); size_t size() { std::lock_guard lock(_mutex); diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 69a372e0e3..6e610ee717 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -152,6 +152,12 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) { _sender_id = state->per_fragment_instance_idx(); _num_senders = state->num_per_fragment_instances(); + _stream_per_node = state->load_stream_per_node(); + _total_streams = 
state->total_load_streams(); + DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0"; + DCHECK(_total_streams > 0) << "total load streams should be greator than 0"; + LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node + << ", total_streams " << _total_streams; _is_high_priority = (state->execution_timeout() <= config::load_task_high_priority_threshold_second); @@ -218,19 +224,19 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) { return Status::InternalError("Unknown node {} in tablet location", dst_id); } std::shared_ptr streams; - streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(_load_id, src_id, - dst_id); + streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_id, src_id, dst_id, _stream_per_node); // get tablet schema from each backend only in the 1st stream for (auto& stream : *streams | std::ranges::views::take(1)) { const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, - _state->enable_profile())); + _total_streams, _state->enable_profile())); } // for the rest streams, open without getting tablet schema for (auto& stream : *streams | std::ranges::views::drop(1)) { RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), - *node_info, _txn_id, *_schema, {}, + *node_info, _txn_id, *_schema, {}, _total_streams, _state->enable_profile())); } _streams_for_node[dst_id] = streams; @@ -293,7 +299,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) { for (auto& node_id : location->node_ids) { streams.emplace_back(_streams_for_node[node_id]->at(_stream_index)); } - _stream_index = (_stream_index + 1) % config::num_streams_per_load; + _stream_index = (_stream_index + 1) % _stream_per_node; return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index a67c4e65cd..1a67ea581e 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -155,6 +155,8 @@ private: // To support multiple senders, we maintain a channel for each sender. 
int _sender_id = -1; int _num_senders = -1; + int _stream_per_node = 0; + int _total_streams = 0; bool _is_high_priority = false; bool _write_file_cache = false; diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index bdd0ace9a8..05dfa2ee1a 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -67,6 +67,7 @@ const uint32_t ABNORMAL_SENDER_ID = 10000; const int64_t NORMAL_TXN_ID = 600001; const UniqueId NORMAL_LOAD_ID(1, 1); const UniqueId ABNORMAL_LOAD_ID(1, 0); +std::string NORMAL_STRING("normal"); std::string ABNORMAL_STRING("abnormal"); void construct_schema(OlapTableSchemaParam* schema) { @@ -374,6 +375,8 @@ public: } LoadStreamSharedPtr load_stream; + LOG(INFO) << "total streams: " << request->total_streams(); + EXPECT_GT(request->total_streams(), 0); auto st = _load_stream_mgr->open_load_stream(request, load_stream); stream_options.handler = load_stream.get(); @@ -387,8 +390,6 @@ public: return; } - load_stream->add_rpc_stream(); - status->set_status_code(TStatusCode::OK); response->set_allocated_status(status.get()); static_cast(response->release_status()); @@ -417,7 +418,7 @@ public: std::function _cb; }; - Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID) { + Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID, int total_streams = 1) { brpc::Channel channel; std::cerr << "connect_stream" << std::endl; // Initialize the channel, NULL means using default options. @@ -450,6 +451,7 @@ public: *request.mutable_load_id() = id; request.set_txn_id(NORMAL_TXN_ID); request.set_src_id(sender_id); + request.set_total_streams(total_streams); auto ptablet = request.add_tablets(); ptablet->set_tablet_id(NORMAL_TABLET_ID); ptablet->set_index_id(NORMAL_INDEX_ID); @@ -491,7 +493,7 @@ public: : _heavy_work_pool(4, 32, "load_stream_test_heavy"), _light_work_pool(4, 32, "load_stream_test_light") {} - void close_load(MockSinkClient& client, uint32_t sender_id) { + void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) { butil::IOBuf append_buf; PStreamHeader header; header.mutable_load_id()->set_hi(1); @@ -535,6 +537,11 @@ public: static_cast(client.send(&append_buf)); } + void write_normal(MockSinkClient& client) { + write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, + NORMAL_TABLET_ID, 0, NORMAL_STRING, true); + } + void write_abnormal_load(MockSinkClient& client) { write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true); @@ -657,25 +664,52 @@ public: // // one client +TEST_F(LoadStreamMgrTest, one_client_normal) { + MockSinkClient client; + auto st = client.connect_stream(); + EXPECT_TRUE(st.ok()); + + write_normal(client); + + reset_response_stat(); + close_load(client, ABNORMAL_SENDER_ID); + wait_for_ack(1); + EXPECT_EQ(g_response_stat.num, 1); + EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); + EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); + + close_load(client); + wait_for_ack(2); + EXPECT_EQ(g_response_stat.num, 2); + EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); + EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); + + // server will close stream on CLOSE_LOAD + wait_for_close(); + EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); +} + TEST_F(LoadStreamMgrTest, one_client_abnormal_load) { MockSinkClient client; auto st = 
client.connect_stream(); EXPECT_TRUE(st.ok()); - write_abnormal_load(client); - // TODO check abnormal load id - reset_response_stat(); - close_load(client, 1); + write_abnormal_load(client); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); + EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); - close_load(client, 0); + close_load(client); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); - EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); + EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); // server will close stream on CLOSE_LOAD @@ -1063,7 +1097,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { MockSinkClient clients[2]; for (int i = 0; i < 2; i++) { - auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i); + auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i, 2); EXPECT_TRUE(st.ok()); } reset_response_stat(); @@ -1132,4 +1166,77 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); } +TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { + MockSinkClient clients[2]; + + EXPECT_TRUE(clients[0].connect_stream(NORMAL_SENDER_ID, 2).ok()); + + reset_response_stat(); + + std::vector segment_data; + segment_data.resize(6); + for (int32_t segid = 2; segid >= 0; segid--) { + for (int i = 0; i < 2; i++) { + std::string data = "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid); + segment_data[i * 3 + segid] = data; + LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data; + } + } + + for (int32_t segid = 2; segid >= 0; segid--) { + int i = 0; + write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, + NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true); + } + + EXPECT_EQ(g_response_stat.num, 0); + // CLOSE_LOAD + close_load(clients[0], 0); + wait_for_ack(1); + EXPECT_EQ(g_response_stat.num, 1); + EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); + + // sender 0 closed, before open sender 1, load stream should still be open + EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); + + EXPECT_TRUE(clients[1].connect_stream(NORMAL_SENDER_ID + 1, 2).ok()); + + for (int32_t segid = 2; segid >= 0; segid--) { + int i = 1; + write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, + NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true); + } + + close_load(clients[1], 1); + wait_for_ack(2); + EXPECT_EQ(g_response_stat.num, 2); + EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); + EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); + + // server will close stream on CLOSE_LOAD + wait_for_close(); + EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); + + auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); + size_t sender_pos = written_data.find('='); + size_t sender_end = written_data.find(','); + EXPECT_NE(sender_pos, std::string::npos); + EXPECT_NE(sender_end, std::string::npos); + auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos); + LOG(INFO) << 
"sender_str " << sender_str; + uint32_t sender_id = std::stoi(sender_str); + + for (int i = 0; i < 3; i++) { + auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i); + EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); + } + sender_id = (sender_id + 1) % 2; + for (int i = 0; i < 3; i++) { + auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3); + EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); + } +} + } // namespace doris diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp index f1ccb70bee..929b906aab 100644 --- a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -36,9 +36,9 @@ TEST_F(LoadStreamStubPoolTest, test) { PUniqueId load_id; load_id.set_hi(1); load_id.set_hi(2); - auto streams1 = pool.get_or_create(load_id, src_id, 101); - auto streams2 = pool.get_or_create(load_id, src_id, 102); - auto streams3 = pool.get_or_create(load_id, src_id, 101); + auto streams1 = pool.get_or_create(load_id, src_id, 101, 5); + auto streams2 = pool.get_or_create(load_id, src_id, 102, 5); + auto streams3 = pool.get_or_create(load_id, src_id, 101, 5); EXPECT_EQ(2, pool.size()); EXPECT_EQ(1, pool.templates_size()); EXPECT_EQ(streams1, streams3); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 934bca7ac0..f4131235da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -298,6 +298,8 @@ public class StreamLoadPlanner { perNodeScanRange.put(scanNode.getId().asInt(), scanRangeParams); execParams.setPerNodeScanRanges(perNodeScanRange); params.setParams(execParams); + params.setLoadStreamPerNode(taskInfo.getStreamPerNode()); + params.setTotalLoadStreams(taskInfo.getStreamPerNode()); TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setQueryType(TQueryType.LOAD); queryOptions.setQueryTimeout(timeout); @@ -499,6 +501,8 @@ public class StreamLoadPlanner { pipParams.per_exch_num_senders = Maps.newHashMap(); pipParams.destinations = Lists.newArrayList(); pipParams.setNumSenders(1); + pipParams.setLoadStreamPerNode(taskInfo.getStreamPerNode()); + pipParams.setTotalLoadStreams(taskInfo.getStreamPerNode()); TPipelineInstanceParams localParams = new TPipelineInstanceParams(); localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + fragmentInstanceIdIndex)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9aaca2f8e2..56ca362e69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -78,6 +78,7 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TBrokerScanRange; +import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDescriptorTable; import org.apache.doris.thrift.TDetailedReportParams; import org.apache.doris.thrift.TErrorTabletInfo; @@ -688,6 +689,7 @@ public class Coordinator implements CoordInterface { int backendIdx = 0; int profileFragmentId = 0; long memoryLimit = queryOptions.getMemLimit(); + Set backendsWithOlapTableSink = 
Sets.newHashSet(); beToExecStates.clear(); // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, // else use exec_plan_fragments directly. @@ -755,8 +757,23 @@ public class Coordinator implements CoordInterface { beToExecStates.putIfAbsent(execState.backend.getId(), states); } states.addState(execState); + if (tParam.getFragment().getOutputSink() != null + && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) { + backendsWithOlapTableSink.add(execState.backend.getId()); + } ++backendIdx; } + int loadStreamPerNode = 1; + if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { + loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode(); + } + for (TExecPlanFragmentParams tParam : tParams) { + if (tParam.getFragment().getOutputSink() != null + && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) { + tParam.setLoadStreamPerNode(loadStreamPerNode); + tParam.setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode); + } + } profileFragmentId += 1; } // end for fragments @@ -845,6 +862,7 @@ public class Coordinator implements CoordInterface { } } + Set backendsWithOlapTableSink = Sets.newHashSet(); // 3. group PipelineExecContext by BE. // So that we can use one RPC to send all fragment instances of a BE. for (Map.Entry entry : tParams.entrySet()) { @@ -878,8 +896,25 @@ public class Coordinator implements CoordInterface { } ctxs.addContext(pipelineExecContext); + if (entry.getValue().getFragment().getOutputSink() != null + && entry.getValue().getFragment().getOutputSink().getType() + == TDataSinkType.OLAP_TABLE_SINK) { + backendsWithOlapTableSink.add(backendId); + } ++backendIdx; } + int loadStreamPerNode = 1; + if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { + loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode(); + } + for (Map.Entry entry : tParams.entrySet()) { + if (entry.getValue().getFragment().getOutputSink() != null + && entry.getValue().getFragment().getOutputSink().getType() + == TDataSinkType.OLAP_TABLE_SINK) { + entry.getValue().setLoadStreamPerNode(loadStreamPerNode); + entry.getValue().setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode); + } + } profileFragmentId += 1; } // end for fragments diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 3698057003..955450a15c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -411,6 +411,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_MEMTABLE_ON_SINK_NODE = "enable_memtable_on_sink_node"; + public static final String LOAD_STREAM_PER_NODE = "load_stream_per_node"; + public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update"; public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold"; @@ -1236,6 +1238,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward = true) public boolean enableMemtableOnSinkNode = false; + @VariableMgr.VarAttr(name = LOAD_STREAM_PER_NODE) + public int loadStreamPerNode = 20; + @VariableMgr.VarAttr(name = 
ENABLE_INSERT_GROUP_COMMIT) public boolean enableInsertGroupCommit = false; @@ -2405,6 +2410,14 @@ public class SessionVariable implements Serializable, Writable { this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate; } + public int getLoadStreamPerNode() { + return loadStreamPerNode; + } + + public void setLoadStreamPerNode(int loadStreamPerNode) { + this.loadStreamPerNode = loadStreamPerNode; + } + /** * Serialize to thrift object. * Used for rest api. diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index 3174e4d5c6..610e243cd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -125,6 +125,10 @@ public interface LoadTaskInfo { return false; } + default int getStreamPerNode() { + return 20; + } + class ImportColumnDescs { public List descs = Lists.newArrayList(); public boolean isColumnDescsRewrited = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 485a3599b3..53a47d385b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -89,6 +89,7 @@ public class StreamLoadTask implements LoadTaskInfo { private boolean enableProfile = false; private boolean memtableOnSinkNode = false; + private int streamPerNode = 20; private byte enclose = 0; @@ -309,6 +310,15 @@ public class StreamLoadTask implements LoadTaskInfo { this.memtableOnSinkNode = memtableOnSinkNode; } + @Override + public int getStreamPerNode() { + return streamPerNode; + } + + public void setStreamPerNode(int streamPerNode) { + this.streamPerNode = streamPerNode; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType(), @@ -447,6 +457,9 @@ public class StreamLoadTask implements LoadTaskInfo { if (request.isSetMemtableOnSinkNode()) { this.memtableOnSinkNode = request.isMemtableOnSinkNode(); } + if (request.isSetStreamPerNode()) { + this.streamPerNode = request.getStreamPerNode(); + } } // used for stream load diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 9a36916317..f9c2603cb9 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -749,6 +749,7 @@ message POpenLoadStreamRequest { optional POlapTableSchemaParam schema = 4; repeated PTabletID tablets = 5; optional bool enable_profile = 6 [default = false]; + optional int64 total_streams = 7; } message PTabletSchemaWithIndex { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 1e0ad2d462..04eaa57824 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -641,6 +641,7 @@ struct TStreamLoadPutRequest { 52: optional i8 escape 53: optional bool memtable_on_sink_node; 54: optional bool group_commit + 55: optional i32 stream_per_node; } struct TStreamLoadPutResult { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index cf38f51c05..e55ab89e32 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -455,6 +455,12 @@ struct 
TExecPlanFragmentParams {
   24: optional map file_scan_params
   25: optional i64 wal_id
+
+  // num load stream for each sink backend
+  26: optional i32 load_stream_per_node
+
+  // total num of load streams the downstream backend will see
+  27: optional i32 total_load_streams
 }

 struct TExecPlanFragmentParamsList {
@@ -670,6 +676,8 @@ struct TPipelineFragmentParams {
   // scan node id -> scan range params, only for external file scan
   29: optional map file_scan_params
   30: optional bool group_commit = false;
+  31: optional i32 load_stream_per_node // num load stream for each sink backend
+  32: optional i32 total_load_streams // total num of load streams the downstream backend will see
 }

 struct TPipelineFragmentParamsList {
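
Taken together, the FE side of this patch replaces the per-BE config num_streams_per_load with the session variable load_stream_per_node (default 20) and forwards two numbers with each sink fragment: the per-node stream count, and a global total computed as the number of backends whose fragment carries an OLAP_TABLE_SINK multiplied by load_stream_per_node. On the BE, LoadStream::close() only commits once its CLOSE_LOAD count reaches that total. Below is a minimal sketch of the arithmetic, assuming a standalone helper; the class name, method name, and backend IDs are illustrative and not part of the patch.

// Minimal sketch of the stream fan-out arithmetic introduced by this patch.
// LoadStreamFanoutSketch / totalLoadStreams are illustrative names, not code from the diff.
import java.util.Set;

public class LoadStreamFanoutSketch {
    // Each backend running an OLAP_TABLE_SINK fragment opens load_stream_per_node
    // streams to every destination backend; a destination LoadStream commits only
    // after it has received CLOSE_LOAD on total_streams streams (see LoadStream::close).
    static int totalLoadStreams(Set<Long> backendsWithOlapTableSink, int loadStreamPerNode) {
        return backendsWithOlapTableSink.size() * loadStreamPerNode;
    }

    public static void main(String[] args) {
        // Hypothetical example: three sink backends, session-variable default of 20.
        int total = totalLoadStreams(Set.of(10001L, 10002L, 10003L), 20);
        System.out.println(total); // 60, carried to each BE as total_load_streams
    }
}

Computing the total on the FE keeps every destination LoadStream in agreement on how many CLOSE_LOAD messages to wait for before committing, something the removed BE-local num_streams_per_load config could not guarantee across nodes.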