[pipelineX](exchange) Make exchange buffer size configurable (#32201)
This commit is contained in:
@ -264,6 +264,7 @@ DEFINE_mInt32(doris_max_scan_key_num, "48");
|
||||
DEFINE_mInt32(max_pushdown_conditions_per_column, "1024");
|
||||
// (Advanced) Maximum size of per-query receive-side buffer
|
||||
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
|
||||
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
|
||||
|
||||
DEFINE_mInt64(column_dictionary_key_ratio_threshold, "0");
|
||||
DEFINE_mInt64(column_dictionary_key_size_threshold, "0");
|
||||
|
||||
@ -306,6 +306,7 @@ DECLARE_mInt32(doris_max_scan_key_num);
|
||||
DECLARE_mInt32(max_pushdown_conditions_per_column);
|
||||
// (Advanced) Maximum size of per-query receive-side buffer
|
||||
DECLARE_mInt32(exchg_node_buffer_size_bytes);
|
||||
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
|
||||
|
||||
DECLARE_mInt64(column_dictionary_key_ratio_threshold);
|
||||
DECLARE_mInt64(column_dictionary_key_size_threshold);
|
||||
|
||||
@ -111,7 +111,8 @@ void ExchangeSinkBuffer<Parent>::close() {
|
||||
|
||||
template <typename Parent>
|
||||
bool ExchangeSinkBuffer<Parent>::can_write() const {
|
||||
size_t max_package_size = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size();
|
||||
size_t max_package_size =
|
||||
config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size();
|
||||
size_t total_package_size = 0;
|
||||
for (auto& [_, q] : _instance_to_package_queue) {
|
||||
total_package_size += q.size();
|
||||
@ -168,7 +169,8 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id) {
|
||||
std::queue<TransmitInfo<Parent>, std::list<TransmitInfo<Parent>>>();
|
||||
_instance_to_broadcast_package_queue[low_id] =
|
||||
std::queue<BroadcastTransmitInfo<Parent>, std::list<BroadcastTransmitInfo<Parent>>>();
|
||||
_queue_capacity = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size();
|
||||
_queue_capacity =
|
||||
config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size();
|
||||
PUniqueId finst_id;
|
||||
finst_id.set_hi(fragment_instance_id.hi);
|
||||
finst_id.set_lo(fragment_instance_id.lo);
|
||||
|
||||
@ -270,7 +270,6 @@ private:
|
||||
int64_t get_sum_rpc_time();
|
||||
|
||||
std::atomic<int> _total_queue_size = 0;
|
||||
static constexpr int QUEUE_CAPACITY_FACTOR = 64;
|
||||
std::shared_ptr<Dependency> _queue_dependency;
|
||||
std::shared_ptr<Dependency> _finish_dependency;
|
||||
std::atomic<bool> _should_stop {false};
|
||||
|
||||
@ -1413,6 +1413,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
|
||||
if (_scanner_ctx) {
|
||||
_scanner_ctx->stop_scanners(state);
|
||||
}
|
||||
std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
|
||||
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
|
||||
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user