[fix](move-memtable) pass load stream num to backends (#26198)

Author: Kaijie Chen
Date: 2023-11-08 16:16:33 +08:00
Committed by: GitHub
Parent: 6637f9c15f
Commit: 58bf79f79e
25 changed files with 313 additions and 82 deletions
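
In short: the number of load streams is now decided on the frontend and passed down to the backends, instead of being read from the backend config num_streams_per_load. The session variable load_stream_per_node (default 20, also settable per stream load via stream_per_node) is forwarded to the sink as load_stream_per_node in the fragment params, and total_load_streams is set to the number of backends running an OLAP table sink multiplied by load_stream_per_node, i.e. the total number of streams each destination backend will see. The sink passes the total on in POpenLoadStreamRequest.total_streams, and LoadStream uses it to decide when the last CLOSE_LOAD has arrived (the commit point) and when the last rpc stream has closed (the cleanup point).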

View File

@ -742,8 +742,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// share delta writers when memtable_on_sink_node = true
DEFINE_Bool(share_delta_writers, "true");
// number of brpc stream per load
DEFINE_Int32(num_streams_per_load, "5");
// timeout for open load stream rpc in ms
DEFINE_Int64(open_load_stream_timeout_ms, "500");

View File

@ -799,8 +799,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
// share delta writers when memtable_on_sink_node = true
DECLARE_Bool(share_delta_writers);
// number of brpc stream per load
DECLARE_Int32(num_streams_per_load);
// timeout for open load stream rpc in ms
DECLARE_Int64(open_load_stream_timeout_ms);

View File

@ -318,6 +318,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(

View File

@ -211,6 +211,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}
_runtime_state->set_desc_tbl(_desc_tbl);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();

View File

@ -30,6 +30,7 @@
#include "runtime/load_channel.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/load_stream_writer.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
@ -251,6 +252,8 @@ LoadStream::~LoadStream() {
Status LoadStream::init(const POpenLoadStreamRequest* request) {
_txn_id = request->txn_id();
_total_streams = request->total_streams();
DCHECK(_total_streams > 0) << "total streams should be greater than 0";
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
@ -265,41 +268,49 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) {
Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids,
std::vector<int64_t>* failed_tablet_ids) {
std::lock_guard lock_guard(_lock);
std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
// we do nothing until we receive CLOSE_LOAD from all streams, to ensure all data is handled before ack
_open_streams[src_id]--;
LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
<< _open_streams[src_id] << " streams";
if (_open_streams[src_id] == 0) {
_open_streams.erase(src_id);
}
_close_load_cnt++;
LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
<< _total_streams - _close_load_cnt << " senders";
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());
if (_close_load_cnt < _total_streams) {
// do not return commit info if there are remaining streams.
return Status::OK();
}
Status st = Status::OK();
if (_open_streams.size() == 0) {
{
bthread::Mutex mutex;
std::unique_lock<bthread::Mutex> lock(mutex);
bthread::ConditionVariable cond;
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer([this, &success_tablet_ids,
&failed_tablet_ids,
&tablets_to_commit, &mutex,
&cond, &st]() {
signal::set_signal_task_id(_load_id);
for (auto& it : _index_streams_map) {
st = it.second->close(tablets_to_commit, success_tablet_ids, failed_tablet_ids);
if (!st.ok()) {
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
[this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() {
signal::set_signal_task_id(_load_id);
for (auto& it : _index_streams_map) {
st = it.second->close(_tablets_to_commit, success_tablet_ids,
failed_tablet_ids);
if (!st.ok()) {
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
return;
}
}
LOG(INFO) << "close load " << *this
<< ", failed_tablet_num=" << failed_tablet_ids->size()
<< ", success_tablet_num=" << success_tablet_ids->size();
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
return;
}
}
LOG(INFO) << "close load " << *this
<< ", failed_tablet_num=" << failed_tablet_ids->size()
<< ", success_tablet_num=" << success_tablet_ids->size();
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
});
});
if (ret) {
cond.wait(lock);
} else {
@ -307,24 +318,21 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
"there is not enough thread resource for close load");
}
}
// do not return commit info for non-last one.
return st;
}
void LoadStream::_report_result(StreamId stream, Status& st,
std::vector<int64_t>* success_tablet_ids,
std::vector<int64_t>* failed_tablet_ids) {
LOG(INFO) << "report result, success tablet num " << success_tablet_ids->size()
<< ", failed tablet num " << failed_tablet_ids->size();
void LoadStream::_report_result(StreamId stream, const Status& st,
const std::vector<int64_t>& success_tablet_ids,
const std::vector<int64_t>& failed_tablet_ids) {
LOG(INFO) << "report result, success tablet num " << success_tablet_ids.size()
<< ", failed tablet num " << failed_tablet_ids.size();
butil::IOBuf buf;
PWriteStreamSinkResponse response;
st.to_protobuf(response.mutable_status());
for (auto& id : *success_tablet_ids) {
for (auto& id : success_tablet_ids) {
response.add_success_tablet_ids(id);
}
for (auto& id : *failed_tablet_ids) {
for (auto& id : failed_tablet_ids) {
response.add_failed_tablet_ids(id);
}
@ -421,18 +429,20 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[]
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
<< " with tablet " << hdr.tablet_id();
if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid load id {}, expected {}",
UniqueId(hdr.load_id()).to_string(),
UniqueId(_load_id).to_string());
_report_failure(id, st, hdr);
return;
}
{
std::lock_guard lock_guard(_lock);
if (!_open_streams.contains(hdr.src_id())) {
std::vector<int64_t> success_tablet_ids;
std::vector<int64_t> failed_tablet_ids;
if (hdr.has_tablet_id()) {
failed_tablet_ids.push_back(hdr.tablet_id());
}
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open stream from source {}",
hdr.src_id());
_report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
_report_failure(id, st, hdr);
return;
}
}
@ -442,10 +452,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
case PStreamHeader::APPEND_DATA: {
auto st = _append_data(hdr, data);
if (!st.ok()) {
std::vector<int64_t> success_tablet_ids;
std::vector<int64_t> failed_tablet_ids;
failed_tablet_ids.push_back(hdr.tablet_id());
_report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
_report_failure(id, st, hdr);
}
} break;
case PStreamHeader::CLOSE_LOAD: {
@ -454,7 +461,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<PTabletID> tablets_to_commit(hdr.tablets_to_commit().begin(),
hdr.tablets_to_commit().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablet_ids);
_report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
_report_result(id, st, success_tablet_ids, failed_tablet_ids);
brpc::StreamClose(id);
} break;
default:
@ -468,9 +475,9 @@ void LoadStream::on_idle_timeout(StreamId id) {
}
void LoadStream::on_closed(StreamId id) {
auto remaining_rpc_stream = remove_rpc_stream();
LOG(INFO) << "stream closed " << id << ", remaining_rpc_stream=" << remaining_rpc_stream;
if (remaining_rpc_stream == 0) {
auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1;
LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << remaining_streams;
if (remaining_streams == 0) {
_load_stream_mgr->clear_load(_load_id);
}
}
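For reference, a minimal standalone sketch of the counting scheme introduced above (a hypothetical, simplified class; member names follow the diff, and the real LoadStream::close() additionally collects _tablets_to_commit and runs the per-index close on a work pool):

#include <atomic>
#include <cstdint>

// Hypothetical illustration of how _total_streams gates commit and cleanup.
class CloseCounters {
public:
    explicit CloseCounters(int32_t total_streams) : _total_streams(total_streams) {}

    // Called once per CLOSE_LOAD; only the last caller is allowed to commit.
    bool last_close_load() { return ++_close_load_cnt == _total_streams; }

    // Called once per closed rpc stream; true once every stream is gone
    // and the load can be cleared from the manager.
    bool last_stream_closed() { return _close_rpc_cnt.fetch_add(1) + 1 == _total_streams; }

private:
    const int32_t _total_streams;
    int32_t _close_load_cnt = 0;              // guarded by _lock in the real code
    std::atomic<int32_t> _close_rpc_cnt {0};
};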

View File

@ -113,9 +113,6 @@ public:
_open_streams[src_id]++;
}
uint32_t add_rpc_stream() { return ++_num_rpc_streams; }
uint32_t remove_rpc_stream() { return --_num_rpc_streams; }
Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, std::vector<int64_t>* failed_tablet_ids);
@ -130,16 +127,31 @@ private:
void _parse_header(butil::IOBuf* const message, PStreamHeader& hdr);
void _dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data);
Status _append_data(const PStreamHeader& header, butil::IOBuf* data);
void _report_result(StreamId stream, Status& st, std::vector<int64_t>* success_tablet_ids,
std::vector<int64_t>* failed_tablet_ids);
void _report_result(StreamId stream, const Status& st,
const std::vector<int64_t>& success_tablet_ids,
const std::vector<int64_t>& failed_tablet_ids);
// report failure for one message
void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) {
std::vector<int64_t> success; // empty
std::vector<int64_t> failure;
if (header.has_tablet_id()) {
failure.push_back(header.tablet_id());
}
_report_result(stream, status, success, failure);
}
private:
PUniqueId _load_id;
std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
std::atomic<uint32_t> _num_rpc_streams;
int32_t _total_streams = 0;
int32_t _close_load_cnt = 0;
std::atomic<int32_t> _close_rpc_cnt = 0;
std::vector<PTabletID> _tablets_to_commit;
bthread::Mutex _lock;
std::unordered_map<int64_t, int32_t> _open_streams;
int64_t _txn_id;
int64_t _txn_id = 0;
std::shared_ptr<OlapTableSchemaParam> _schema;
bool _enable_profile = false;
std::unique_ptr<RuntimeProfile> _profile;

View File

@ -224,6 +224,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
_runtime_state->set_per_fragment_instance_idx(params.sender_id);
_runtime_state->set_num_per_fragment_instances(params.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
// set up sink, if required
if (request.fragment.__isset.output_sink) {

View File

@ -309,6 +309,18 @@ public:
int num_per_fragment_instances() const { return _num_per_fragment_instances; }
void set_load_stream_per_node(int load_stream_per_node) {
_load_stream_per_node = load_stream_per_node;
}
int load_stream_per_node() const { return _load_stream_per_node; }
void set_total_load_streams(int total_load_streams) {
_total_load_streams = total_load_streams;
}
int total_load_streams() const { return _total_load_streams; }
bool disable_stream_preaggregations() const {
return _query_options.disable_stream_preaggregations;
}
@ -545,6 +557,8 @@ private:
int _per_fragment_instance_idx;
int _num_per_fragment_instances = 0;
int _load_stream_per_node = 0;
int _total_load_streams = 0;
// The backend id on which this fragment instance runs
int64_t _backend_id = -1;

View File

@ -401,7 +401,6 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
return;
}
load_stream->add_rpc_stream();
VLOG_DEBUG << "get streamid =" << streamid;
st.to_protobuf(response->mutable_status());
});

View File

@ -102,11 +102,11 @@ LoadStreamStub::~LoadStreamStub() {
}
// open_load_stream
// tablets means
Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, bool enable_profile) {
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile) {
_num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_init.load()) {
@ -130,6 +130,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
request.set_src_id(_src_id);
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
request.set_total_streams(total_streams);
schema.to_protobuf(request.mutable_schema());
for (auto& tablet : tablets_for_schema) {
*request.add_tablets() = tablet;

View File

@ -148,7 +148,8 @@ public:
// open_load_stream
Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, bool enable_profile);
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile);
// for mock this class in UT
#ifdef BE_TEST

View File

@ -28,14 +28,14 @@ LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamStubPool::~LoadStreamStubPool() = default;
std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id) {
int64_t dst_id, int num_streams) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _pool[key].lock();
if (streams) {
return streams;
}
int32_t num_streams = std::max(1, config::num_streams_per_load);
DCHECK(num_streams > 0) << "stream num should be greater than 0";
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id});
auto deleter = [this, key](Streams* s) {
std::lock_guard<std::mutex> lock(_mutex);

View File

@ -78,7 +78,8 @@ public:
~LoadStreamStubPool();
std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id);
std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
int num_streams);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);

View File

@ -152,6 +152,12 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
_stream_per_node = state->load_stream_per_node();
_total_streams = state->total_load_streams();
DCHECK(_stream_per_node > 0) << "load stream per node should be greater than 0";
DCHECK(_total_streams > 0) << "total load streams should be greater than 0";
LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node
<< ", total_streams " << _total_streams;
_is_high_priority =
(state->execution_timeout() <= config::load_task_high_priority_threshold_second);
@ -218,19 +224,19 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
return Status::InternalError("Unknown node {} in tablet location", dst_id);
}
std::shared_ptr<Streams> streams;
streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(_load_id, src_id,
dst_id);
streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, src_id, dst_id, _stream_per_node);
// get tablet schema from each backend only in the 1st stream
for (auto& stream : *streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, tablets_for_schema,
_state->enable_profile()));
_total_streams, _state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : *streams | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, {},
*node_info, _txn_id, *_schema, {}, _total_streams,
_state->enable_profile()));
}
_streams_for_node[dst_id] = streams;
@ -293,7 +299,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) {
for (auto& node_id : location->node_ids) {
streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
}
_stream_index = (_stream_index + 1) % config::num_streams_per_load;
_stream_index = (_stream_index + 1) % _stream_per_node;
return Status::OK();
}
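In short, VOlapTableSinkV2 now takes both counts from the runtime state rather than from backend config: it asks the stub pool for _stream_per_node streams per destination backend, passes _total_streams on every LoadStreamStub::open(), and rotates tablets across _stream_per_node streams instead of the removed config::num_streams_per_load.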

View File

@ -155,6 +155,8 @@ private:
// To support multiple senders, we maintain a channel for each sender.
int _sender_id = -1;
int _num_senders = -1;
int _stream_per_node = 0;
int _total_streams = 0;
bool _is_high_priority = false;
bool _write_file_cache = false;

View File

@ -67,6 +67,7 @@ const uint32_t ABNORMAL_SENDER_ID = 10000;
const int64_t NORMAL_TXN_ID = 600001;
const UniqueId NORMAL_LOAD_ID(1, 1);
const UniqueId ABNORMAL_LOAD_ID(1, 0);
std::string NORMAL_STRING("normal");
std::string ABNORMAL_STRING("abnormal");
void construct_schema(OlapTableSchemaParam* schema) {
@ -374,6 +375,8 @@ public:
}
LoadStreamSharedPtr load_stream;
LOG(INFO) << "total streams: " << request->total_streams();
EXPECT_GT(request->total_streams(), 0);
auto st = _load_stream_mgr->open_load_stream(request, load_stream);
stream_options.handler = load_stream.get();
@ -387,8 +390,6 @@ public:
return;
}
load_stream->add_rpc_stream();
status->set_status_code(TStatusCode::OK);
response->set_allocated_status(status.get());
static_cast<void>(response->release_status());
@ -417,7 +418,7 @@ public:
std::function<void()> _cb;
};
Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID) {
Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID, int total_streams = 1) {
brpc::Channel channel;
std::cerr << "connect_stream" << std::endl;
// Initialize the channel, NULL means using default options.
@ -450,6 +451,7 @@ public:
*request.mutable_load_id() = id;
request.set_txn_id(NORMAL_TXN_ID);
request.set_src_id(sender_id);
request.set_total_streams(total_streams);
auto ptablet = request.add_tablets();
ptablet->set_tablet_id(NORMAL_TABLET_ID);
ptablet->set_index_id(NORMAL_INDEX_ID);
@ -491,7 +493,7 @@ public:
: _heavy_work_pool(4, 32, "load_stream_test_heavy"),
_light_work_pool(4, 32, "load_stream_test_light") {}
void close_load(MockSinkClient& client, uint32_t sender_id) {
void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) {
butil::IOBuf append_buf;
PStreamHeader header;
header.mutable_load_id()->set_hi(1);
@ -535,6 +537,11 @@ public:
static_cast<void>(client.send(&append_buf));
}
void write_normal(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, 0, NORMAL_STRING, true);
}
void write_abnormal_load(MockSinkClient& client) {
write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
@ -657,25 +664,52 @@ public:
// <client, index, bucket>
// one client
TEST_F(LoadStreamMgrTest, one_client_normal) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
write_normal(client);
reset_response_stat();
close_load(client, ABNORMAL_SENDER_ID);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
close_load(client);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
write_abnormal_load(client);
// TODO check abnormal load id
reset_response_stat();
close_load(client, 1);
write_abnormal_load(client);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
close_load(client, 0);
close_load(client);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
@ -1063,7 +1097,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
MockSinkClient clients[2];
for (int i = 0; i < 2; i++) {
auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i);
auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i, 2);
EXPECT_TRUE(st.ok());
}
reset_response_stat();
@ -1132,4 +1166,77 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
MockSinkClient clients[2];
EXPECT_TRUE(clients[0].connect_stream(NORMAL_SENDER_ID, 2).ok());
reset_response_stat();
std::vector<std::string> segment_data;
segment_data.resize(6);
for (int32_t segid = 2; segid >= 0; segid--) {
for (int i = 0; i < 2; i++) {
std::string data = "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid);
segment_data[i * 3 + segid] = data;
LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data;
}
}
for (int32_t segid = 2; segid >= 0; segid--) {
int i = 0;
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true);
}
EXPECT_EQ(g_response_stat.num, 0);
// CLOSE_LOAD
close_load(clients[0], 0);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// sender 0 closed before sender 1 opened; the load stream should still be open
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
EXPECT_TRUE(clients[1].connect_stream(NORMAL_SENDER_ID + 1, 2).ok());
for (int32_t segid = 2; segid >= 0; segid--) {
int i = 1;
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true);
}
close_load(clients[1], 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
size_t sender_pos = written_data.find('=');
size_t sender_end = written_data.find(',');
EXPECT_NE(sender_pos, std::string::npos);
EXPECT_NE(sender_end, std::string::npos);
auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos);
LOG(INFO) << "sender_str " << sender_str;
uint32_t sender_id = std::stoi(sender_str);
for (int i = 0; i < 3; i++) {
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i);
EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
}
sender_id = (sender_id + 1) % 2;
for (int i = 0; i < 3; i++) {
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3);
EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
}
}
} // namespace doris

View File

@ -36,9 +36,9 @@ TEST_F(LoadStreamStubPoolTest, test) {
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_lo(2);
auto streams1 = pool.get_or_create(load_id, src_id, 101);
auto streams2 = pool.get_or_create(load_id, src_id, 102);
auto streams3 = pool.get_or_create(load_id, src_id, 101);
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5);
auto streams2 = pool.get_or_create(load_id, src_id, 102, 5);
auto streams3 = pool.get_or_create(load_id, src_id, 101, 5);
EXPECT_EQ(2, pool.size());
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);

View File

@ -298,6 +298,8 @@ public class StreamLoadPlanner {
perNodeScanRange.put(scanNode.getId().asInt(), scanRangeParams);
execParams.setPerNodeScanRanges(perNodeScanRange);
params.setParams(execParams);
params.setLoadStreamPerNode(taskInfo.getStreamPerNode());
params.setTotalLoadStreams(taskInfo.getStreamPerNode());
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setQueryType(TQueryType.LOAD);
queryOptions.setQueryTimeout(timeout);
@ -499,6 +501,8 @@ public class StreamLoadPlanner {
pipParams.per_exch_num_senders = Maps.newHashMap();
pipParams.destinations = Lists.newArrayList();
pipParams.setNumSenders(1);
pipParams.setLoadStreamPerNode(taskInfo.getStreamPerNode());
pipParams.setTotalLoadStreams(taskInfo.getStreamPerNode());
TPipelineInstanceParams localParams = new TPipelineInstanceParams();
localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + fragmentInstanceIdIndex));

View File

@ -78,6 +78,7 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TErrorTabletInfo;
@ -688,6 +689,7 @@ public class Coordinator implements CoordInterface {
int backendIdx = 0;
int profileFragmentId = 0;
long memoryLimit = queryOptions.getMemLimit();
Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
beToExecStates.clear();
// If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
// else use exec_plan_fragments directly.
@ -755,8 +757,23 @@ public class Coordinator implements CoordInterface {
beToExecStates.putIfAbsent(execState.backend.getId(), states);
}
states.addState(execState);
if (tParam.getFragment().getOutputSink() != null
&& tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
backendsWithOlapTableSink.add(execState.backend.getId());
}
++backendIdx;
}
int loadStreamPerNode = 1;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
}
for (TExecPlanFragmentParams tParam : tParams) {
if (tParam.getFragment().getOutputSink() != null
&& tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
tParam.setLoadStreamPerNode(loadStreamPerNode);
tParam.setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode);
}
}
profileFragmentId += 1;
} // end for fragments
@ -845,6 +862,7 @@ public class Coordinator implements CoordInterface {
}
}
Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
// 3. group PipelineExecContext by BE.
// So that we can use one RPC to send all fragment instances of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
@ -878,8 +896,25 @@ public class Coordinator implements CoordInterface {
}
ctxs.addContext(pipelineExecContext);
if (entry.getValue().getFragment().getOutputSink() != null
&& entry.getValue().getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
backendsWithOlapTableSink.add(backendId);
}
++backendIdx;
}
int loadStreamPerNode = 1;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
}
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
if (entry.getValue().getFragment().getOutputSink() != null
&& entry.getValue().getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
entry.getValue().setLoadStreamPerNode(loadStreamPerNode);
entry.getValue().setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode);
}
}
profileFragmentId += 1;
} // end for fragments
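For example (hypothetical numbers): with three backends running the OLAP table sink and load_stream_per_node = 20, each of those backends receives total_load_streams = 60 in its fragment params.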

View File

@ -411,6 +411,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_MEMTABLE_ON_SINK_NODE =
"enable_memtable_on_sink_node";
public static final String LOAD_STREAM_PER_NODE = "load_stream_per_node";
public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update";
public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold";
@ -1236,6 +1238,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward = true)
public boolean enableMemtableOnSinkNode = false;
@VariableMgr.VarAttr(name = LOAD_STREAM_PER_NODE)
public int loadStreamPerNode = 20;
@VariableMgr.VarAttr(name = ENABLE_INSERT_GROUP_COMMIT)
public boolean enableInsertGroupCommit = false;
@ -2405,6 +2410,14 @@ public class SessionVariable implements Serializable, Writable {
this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate;
}
public int getLoadStreamPerNode() {
return loadStreamPerNode;
}
public void setLoadStreamPerNode(int loadStreamPerNode) {
this.loadStreamPerNode = loadStreamPerNode;
}
/**
* Serialize to thrift object.
* Used for rest api.

View File

@ -125,6 +125,10 @@ public interface LoadTaskInfo {
return false;
}
default int getStreamPerNode() {
return 20;
}
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;

View File

@ -89,6 +89,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private boolean enableProfile = false;
private boolean memtableOnSinkNode = false;
private int streamPerNode = 20;
private byte enclose = 0;
@ -309,6 +310,15 @@ public class StreamLoadTask implements LoadTaskInfo {
this.memtableOnSinkNode = memtableOnSinkNode;
}
@Override
public int getStreamPerNode() {
return streamPerNode;
}
public void setStreamPerNode(int streamPerNode) {
this.streamPerNode = streamPerNode;
}
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@ -447,6 +457,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetMemtableOnSinkNode()) {
this.memtableOnSinkNode = request.isMemtableOnSinkNode();
}
if (request.isSetStreamPerNode()) {
this.streamPerNode = request.getStreamPerNode();
}
}
// used for stream load

View File

@ -749,6 +749,7 @@ message POpenLoadStreamRequest {
optional POlapTableSchemaParam schema = 4;
repeated PTabletID tablets = 5;
optional bool enable_profile = 6 [default = false];
optional int64 total_streams = 7;
}
message PTabletSchemaWithIndex {

View File

@ -641,6 +641,7 @@ struct TStreamLoadPutRequest {
52: optional i8 escape
53: optional bool memtable_on_sink_node;
54: optional bool group_commit
55: optional i32 stream_per_node;
}
struct TStreamLoadPutResult {

View File

@ -455,6 +455,12 @@ struct TExecPlanFragmentParams {
24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
25: optional i64 wal_id
// number of load streams for each sink backend
26: optional i32 load_stream_per_node
// total num of load streams the downstream backend will see
27: optional i32 total_load_streams
}
struct TExecPlanFragmentParamsList {
@ -670,6 +676,8 @@ struct TPipelineFragmentParams {
// scan node id -> scan range params, only for external file scan
29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
30: optional bool group_commit = false;
31: optional i32 load_stream_per_node // number of load streams for each sink backend
32: optional i32 total_load_streams // total num of load streams the downstream backend will see
}
struct TPipelineFragmentParamsList {