[fix](move-memtable) pass num local sink to backends (#26897)

This commit is contained in:
Kaijie Chen
2023-11-14 08:28:49 +08:00
committed by GitHub
parent de62c00f4e
commit b19abac5e2
19 changed files with 177 additions and 76 deletions

View File

@ -100,6 +100,9 @@ Status DeltaWriterV2::init() {
return Status::OK();
}
// build tablet schema in request level
if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
}
_build_current_tablet_schema(_req.index_id, _req.table_schema_param,
*_streams[0]->tablet_schema(_req.index_id));
RowsetWriterContext context;

View File

@ -310,6 +310,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(

View File

@ -203,6 +203,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();

View File

@ -214,6 +214,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
_runtime_state->set_num_per_fragment_instances(params.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);
// set up sink, if required
if (request.fragment.__isset.output_sink) {

View File

@ -319,6 +319,10 @@ public:
int total_load_streams() const { return _total_load_streams; }
void set_num_local_sink(int num_local_sink) { _num_local_sink = num_local_sink; }
int num_local_sink() const { return _num_local_sink; }
bool disable_stream_preaggregations() const {
return _query_options.disable_stream_preaggregations;
}
@ -553,6 +557,7 @@ private:
int _num_per_fragment_instances = 0;
int _load_stream_per_node = 0;
int _total_load_streams = 0;
int _num_local_sink = 0;
// The backend id on which this fragment instance runs
int64_t _backend_id = -1;

View File

@ -25,7 +25,8 @@ class TExpr;
namespace vectorized {
DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), _use_cnt(1) {}
DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2Pool* pool)
: _load_id(load_id), _use_cnt(num_use), _pool(pool) {}
DeltaWriterV2Map::~DeltaWriterV2Map() = default;
@ -38,9 +39,15 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create(
}
Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
if (--_use_cnt > 0) {
int num_use = --_use_cnt;
if (num_use > 0) {
LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " , use_cnt = " << num_use;
return Status::OK();
}
LOG(INFO) << "closing DeltaWriterV2Map " << _load_id;
if (_pool != nullptr) {
_pool->erase(_load_id);
}
Status status = Status::OK();
_map.for_each([&status](auto& entry) {
if (status.ok()) {
@ -59,6 +66,11 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
}
void DeltaWriterV2Map::cancel(Status status) {
int num_use = --_use_cnt;
LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = " << num_use;
if (num_use == 0 && _pool != nullptr) {
_pool->erase(_load_id);
}
_map.for_each([&status](auto& entry) {
static_cast<void>(entry.second->cancel_with_status(status));
});
@ -68,23 +80,24 @@ DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id) {
std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id,
int num_sink) {
UniqueId id {load_id};
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<DeltaWriterV2Map> map = _pool[id].lock();
std::shared_ptr<DeltaWriterV2Map> map = _pool[id];
if (map) {
map->grab();
return map;
}
auto deleter = [this](DeltaWriterV2Map* m) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(m->unique_id());
delete m;
};
map = std::shared_ptr<DeltaWriterV2Map>(new DeltaWriterV2Map(id), deleter);
map = std::make_shared<DeltaWriterV2Map>(id, num_sink, this);
_pool[id] = map;
return map;
}
void DeltaWriterV2Pool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id;
_pool.erase(load_id);
}
} // namespace vectorized
} // namespace doris

View File

@ -58,26 +58,24 @@ class RuntimeProfile;
namespace vectorized {
class DeltaWriterV2Pool;
class DeltaWriterV2Map {
public:
DeltaWriterV2Map(UniqueId load_id);
DeltaWriterV2Map(UniqueId load_id, int num_use = 1, DeltaWriterV2Pool* pool = nullptr);
~DeltaWriterV2Map();
void grab() { ++_use_cnt; }
// get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map
DeltaWriterV2* get_or_create(int64_t tablet_id,
std::function<std::unique_ptr<DeltaWriterV2>()> creator);
// close all delta writers in this DeltaWriterV2Map if there is no other users
Status close(RuntimeProfile* profile);
Status close(RuntimeProfile* profile = nullptr);
// cancel all delta writers in this DeltaWriterV2Map
void cancel(Status status);
UniqueId unique_id() const { return _load_id; }
size_t size() const { return _map.size(); }
private:
@ -89,6 +87,7 @@ private:
UniqueId _load_id;
TabletToDeltaWriterV2Map _map;
std::atomic<int> _use_cnt;
DeltaWriterV2Pool* _pool;
};
class DeltaWriterV2Pool {
@ -97,7 +96,9 @@ public:
~DeltaWriterV2Pool();
std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id);
std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id, int num_sink = 1);
void erase(UniqueId load_id);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
@ -106,7 +107,7 @@ public:
private:
std::mutex _mutex;
std::unordered_map<UniqueId, std::weak_ptr<DeltaWriterV2Map>> _pool;
std::unordered_map<UniqueId, std::shared_ptr<DeltaWriterV2Map>> _pool;
};
} // namespace vectorized

View File

@ -83,14 +83,16 @@ void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
_close_cv.notify_all();
}
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
: _load_id(load_id),
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
: _use_cnt(num_use),
_load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {};
LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
: _load_id(stub._load_id),
: _use_cnt(stub._use_cnt.load()),
_load_id(stub._load_id),
_src_id(stub._src_id),
_tablet_schema_for_index(stub._tablet_schema_for_index),
_enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
@ -107,7 +109,6 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile) {
_num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_init.load()) {
return Status::OK();
@ -190,15 +191,23 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
if (--_num_open > 0) {
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());
}
if (--_use_cnt > 0) {
return Status::OK();
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets_to_commit() = tablet;
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
for (const auto& tablet : _tablets_to_commit) {
*header.add_tablets_to_commit() = tablet;
}
}
return _encode_and_send(header);
}

View File

@ -134,7 +134,7 @@ private:
public:
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id);
LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
// copy constructor, shared_ptr members are shared
LoadStreamStub(LoadStreamStub& stub);
@ -177,7 +177,7 @@ public:
}
std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
return _tablet_schema_for_index->at(index_id);
return (*_tablet_schema_for_index)[index_id];
}
bool enable_unique_mow(int64_t index_id) const {
@ -203,7 +203,10 @@ protected:
std::atomic<bool> _is_init;
bthread::Mutex _mutex;
std::atomic<int> _num_open;
std::atomic<int> _use_cnt;
std::mutex _tablets_to_commit_mutex;
std::vector<PTabletID> _tablets_to_commit;
std::mutex _buffer_mutex;
std::mutex _send_mutex;

View File

@ -24,33 +24,50 @@ class TExpr;
namespace stream_load {
LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool)
: _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
void LoadStreams::release() {
int num_use = --_use_cnt;
if (num_use == 0) {
LOG(INFO) << "releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id;
_pool->erase(_load_id, _dst_id);
} else {
LOG(INFO) << "no releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id
<< ", use_cnt = " << num_use;
}
}
LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamStubPool::~LoadStreamStubPool() = default;
std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id, int num_streams) {
std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id, int num_streams,
int num_sink) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _pool[key].lock();
std::shared_ptr<LoadStreams> streams = _pool[key];
if (streams) {
return streams;
}
DCHECK(num_streams > 0) << "stream num should be greater than 0";
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id});
auto deleter = [this, key](Streams* s) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(key);
_template_stubs.erase(key.first);
delete s;
};
streams = std::shared_ptr<Streams>(new Streams(), deleter);
DCHECK(num_sink > 0) << "sink num should be greater than 0";
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink});
streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
for (int32_t i = 0; i < num_streams; i++) {
// copy construct, internal tablet schema map will be shared among all stubs
streams->emplace_back(new LoadStreamStub {*it->second});
streams->streams().emplace_back(new LoadStreamStub {*it->second});
}
_pool[key] = streams;
return streams;
}
void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(std::make_pair(load_id, dst_id));
_template_stubs.erase(load_id);
}
} // namespace stream_load
} // namespace doris

View File

@ -70,16 +70,36 @@ class LoadStreamStub;
namespace stream_load {
class LoadStreamStubPool;
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
class LoadStreams {
public:
LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool);
void release();
Streams& streams() { return _streams; }
private:
Streams _streams;
UniqueId _load_id;
int64_t _dst_id;
std::atomic<int> _use_cnt;
LoadStreamStubPool* _pool;
};
class LoadStreamStubPool {
public:
LoadStreamStubPool();
~LoadStreamStubPool();
std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
int num_streams);
std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
int num_streams, int num_sink);
void erase(UniqueId load_id, int64_t dst_id);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
@ -95,7 +115,7 @@ public:
private:
std::mutex _mutex;
std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> _template_stubs;
std::unordered_map<std::pair<UniqueId, int64_t>, std::weak_ptr<Streams>> _pool;
std::unordered_map<std::pair<UniqueId, int64_t>, std::shared_ptr<LoadStreams>> _pool;
};
} // namespace stream_load

View File

@ -155,10 +155,12 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
_num_senders = state->num_per_fragment_instances();
_stream_per_node = state->load_stream_per_node();
_total_streams = state->total_load_streams();
_num_local_sink = state->num_local_sink();
DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0";
DCHECK(_total_streams > 0) << "total load streams should be greator than 0";
DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node
<< ", total_streams " << _total_streams;
<< ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
_is_high_priority =
(state->execution_timeout() <= config::load_task_high_priority_threshold_second);
@ -197,8 +199,8 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
if (config::share_delta_writers) {
_delta_writer_for_tablet =
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(_load_id);
_delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
_load_id, _num_local_sink);
} else {
_delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
}
@ -226,18 +228,17 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", dst_id);
}
std::shared_ptr<Streams> streams;
streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, src_id, dst_id, _stream_per_node);
auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
// get tablet schema from each backend only in the 1st stream
for (auto& stream : *streams | std::ranges::views::take(1)) {
for (auto& stream : streams->streams() | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, tablets_for_schema,
_total_streams, _state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : *streams | std::ranges::views::drop(1)) {
for (auto& stream : streams->streams() | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, {}, _total_streams,
_state->enable_profile()));
@ -300,7 +301,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) {
return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
}
for (auto& node_id : location->node_ids) {
streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
streams.emplace_back(_streams_for_node[node_id]->streams().at(_stream_index));
}
_stream_index = (_stream_index + 1) % _stream_per_node;
return Status::OK();
@ -393,6 +394,9 @@ Status VOlapTableSinkV2::_cancel(Status status) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
for (const auto& [_, streams] : _streams_for_node) {
streams->release();
}
return Status::OK();
}
@ -415,6 +419,11 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
// release streams from the pool first, to prevent memory leak
for (const auto& [_, streams] : _streams_for_node) {
streams->release();
}
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
@ -425,14 +434,14 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
{
// send CLOSE_LOAD to all streams, return ERROR if any
for (const auto& [_, streams] : _streams_for_node) {
RETURN_IF_ERROR(_close_load(*streams));
RETURN_IF_ERROR(_close_load(streams->streams()));
}
}
{
SCOPED_TIMER(_close_load_timer);
for (const auto& [_, streams] : _streams_for_node) {
for (const auto& stream : *streams) {
for (const auto& stream : streams->streams()) {
RETURN_IF_ERROR(stream->close_wait());
}
}
@ -440,7 +449,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
for (const auto& [node_id, streams] : _streams_for_node) {
for (const auto& stream : *streams) {
for (const auto& stream : streams->streams()) {
for (auto tablet_id : stream->success_tablets()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;

View File

@ -77,6 +77,10 @@ class TExpr;
class TabletSchema;
class TupleDescriptor;
namespace stream_load {
class LoadStreams;
}
namespace vectorized {
class OlapTableBlockConvertor;
@ -156,8 +160,9 @@ private:
// To support multiple senders, we maintain a channel for each sender.
int _sender_id = -1;
int _num_senders = -1;
int _stream_per_node = 0;
int _total_streams = 0;
int _stream_per_node = -1;
int _total_streams = -1;
int _num_local_sink = -1;
bool _is_high_priority = false;
bool _write_file_cache = false;
@ -204,7 +209,9 @@ private:
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
std::unordered_map<int64_t, std::shared_ptr<::doris::stream_load::LoadStreams>>
_streams_for_node;
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;

View File

@ -51,7 +51,7 @@ static std::atomic<int64_t> g_num_request;
class StreamSinkFileWriterTest : public testing::Test {
class MockStreamStub : public LoadStreamStub {
public:
MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id) {};
MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {};
virtual ~MockStreamStub() = default;

View File

@ -42,9 +42,9 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) {
EXPECT_EQ(2, pool.size());
EXPECT_EQ(map, map3);
EXPECT_NE(map, map2);
map.reset();
map2.reset();
map3.reset();
EXPECT_TRUE(map->close().ok());
EXPECT_TRUE(map2->close().ok());
EXPECT_TRUE(map3->close().ok());
EXPECT_EQ(0, pool.size());
}
@ -62,7 +62,7 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);
map.reset();
static_cast<void>(map->close());
EXPECT_EQ(0, pool.size());
}

View File

@ -36,16 +36,16 @@ TEST_F(LoadStreamStubPoolTest, test) {
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_hi(2);
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5);
auto streams2 = pool.get_or_create(load_id, src_id, 102, 5);
auto streams3 = pool.get_or_create(load_id, src_id, 101, 5);
auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
EXPECT_EQ(2, pool.size());
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);
EXPECT_NE(streams1, streams2);
streams1.reset();
streams2.reset();
streams3.reset();
streams1->release();
streams2->release();
streams3->release();
EXPECT_EQ(0, pool.size());
EXPECT_EQ(0, pool.templates_size());
}

View File

@ -300,6 +300,7 @@ public class StreamLoadPlanner {
params.setParams(execParams);
params.setLoadStreamPerNode(taskInfo.getStreamPerNode());
params.setTotalLoadStreams(taskInfo.getStreamPerNode());
params.setNumLocalSink(1);
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setQueryType(TQueryType.LOAD);
queryOptions.setQueryTimeout(timeout);
@ -503,6 +504,7 @@ public class StreamLoadPlanner {
pipParams.setNumSenders(1);
pipParams.setLoadStreamPerNode(taskInfo.getStreamPerNode());
pipParams.setTotalLoadStreams(taskInfo.getStreamPerNode());
pipParams.setNumLocalSink(1);
TPipelineInstanceParams localParams = new TPipelineInstanceParams();
localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + fragmentInstanceIdIndex));

View File

@ -683,7 +683,7 @@ public class Coordinator implements CoordInterface {
int backendIdx = 0;
int profileFragmentId = 0;
long memoryLimit = queryOptions.getMemLimit();
Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
Map<Long, Integer> numSinkOnBackend = Maps.newHashMap();
beToExecStates.clear();
// If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
// else use exec_plan_fragments directly.
@ -753,7 +753,7 @@ public class Coordinator implements CoordInterface {
states.addState(execState);
if (tParam.getFragment().getOutputSink() != null
&& tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
backendsWithOlapTableSink.add(execState.backend.getId());
numSinkOnBackend.merge(execState.backend.getId(), 1, Integer::sum);
}
++backendIdx;
}
@ -765,7 +765,10 @@ public class Coordinator implements CoordInterface {
if (tParam.getFragment().getOutputSink() != null
&& tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
tParam.setLoadStreamPerNode(loadStreamPerNode);
tParam.setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode);
tParam.setTotalLoadStreams(numSinkOnBackend.size() * loadStreamPerNode);
tParam.setNumLocalSink(numSinkOnBackend.get(tParam.getBackendId()));
LOG.info("num local sink for backend {} is {}", tParam.getBackendId(),
numSinkOnBackend.get(tParam.getBackendId()));
}
}
profileFragmentId += 1;
@ -844,7 +847,7 @@ public class Coordinator implements CoordInterface {
}
}
Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
int numBackendsWithSink = 0;
// 3. group PipelineExecContext by BE.
// So that we can use one RPC to send all fragment instances of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
@ -881,7 +884,7 @@ public class Coordinator implements CoordInterface {
if (entry.getValue().getFragment().getOutputSink() != null
&& entry.getValue().getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
backendsWithOlapTableSink.add(backendId);
numBackendsWithSink++;
}
++backendIdx;
}
@ -894,7 +897,10 @@ public class Coordinator implements CoordInterface {
&& entry.getValue().getFragment().getOutputSink().getType()
== TDataSinkType.OLAP_TABLE_SINK) {
entry.getValue().setLoadStreamPerNode(loadStreamPerNode);
entry.getValue().setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode);
entry.getValue().setTotalLoadStreams(numBackendsWithSink * loadStreamPerNode);
entry.getValue().setNumLocalSink(entry.getValue().getLocalParams().size());
LOG.info("num local sink for backend {} is {}", entry.getValue().getBackendId(),
entry.getValue().getNumLocalSink());
}
}

View File

@ -461,6 +461,8 @@ struct TExecPlanFragmentParams {
// total num of load streams the downstream backend will see
27: optional i32 total_load_streams
28: optional i32 num_local_sink
}
struct TExecPlanFragmentParamsList {
@ -678,6 +680,7 @@ struct TPipelineFragmentParams {
30: optional bool group_commit = false;
31: optional i32 load_stream_per_node // num load stream for each sink backend
32: optional i32 total_load_streams // total num of load streams the downstream backend will see
33: optional i32 num_local_sink
}
struct TPipelineFragmentParamsList {