fix runtime bloom filter config error
This commit is contained in:
parent
0921188899
commit
925bc82bef
@ -3217,6 +3217,18 @@ int ObStaticEngineCG::generate_spec(ObLogJoinFilter &op, ObJoinFilterSpec &spec,
|
||||
spec.set_shared_filter_type(op.get_filter_type());
|
||||
spec.is_shuffle_ = op.is_use_filter_shuffle();
|
||||
bool enable_rich_format = spec.use_rich_format_;
|
||||
|
||||
uint64_t min_ver = GET_MIN_CLUSTER_VERSION();
|
||||
if (min_ver >= CLUSTER_VERSION_4_3_0_1 || min_ver >= MOCK_CLUSTER_VERSION_4_2_3_0
|
||||
|| (min_ver > MOCK_CLUSTER_VERSION_4_2_1_4 && min_ver < CLUSTER_VERSION_4_2_2_0)) {
|
||||
spec.bloom_filter_ratio_ = GCONF._bloom_filter_ratio;
|
||||
spec.send_bloom_filter_size_ = GCONF._send_bloom_filter_size;
|
||||
} else {
|
||||
// for compatibility, if the cluseter is upgrading, set them as default value 0
|
||||
spec.bloom_filter_ratio_ = 0;
|
||||
spec.send_bloom_filter_size_ = 0;
|
||||
}
|
||||
|
||||
if (OB_FAIL(spec.join_keys_.init(op.get_join_exprs().count()))) {
|
||||
LOG_WARN("failed to init join keys", K(ret));
|
||||
} else if (OB_NOT_NULL(op.get_tablet_id_expr()) &&
|
||||
|
@ -72,7 +72,9 @@ OB_SERIALIZE_MEMBER((ObJoinFilterSpec, ObOpSpec),
|
||||
each_group_size_,
|
||||
rf_build_cmp_infos_,
|
||||
rf_probe_cmp_infos_,
|
||||
px_query_range_info_);
|
||||
px_query_range_info_,
|
||||
bloom_filter_ratio_,
|
||||
send_bloom_filter_size_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObJoinFilterOpInput,
|
||||
share_info_,
|
||||
@ -119,8 +121,19 @@ bool ObJoinFilterOpInput::check_release()
|
||||
int ObJoinFilterOpInput::load_runtime_config(const ObJoinFilterSpec &spec, ObExecContext &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
config_.bloom_filter_ratio_ = ((double)GCONF._bloom_filter_ratio / 100.0);
|
||||
config_.bf_piece_size_ = GCONF._send_bloom_filter_size;
|
||||
if (0 == spec.bloom_filter_ratio_ && 0 == spec.send_bloom_filter_size_) {
|
||||
// bloom_filter_ratio_ and send_bloom_filter_size_ are default value, which indicates the
|
||||
// cluster is upgrading. for compatibility, use the value from GCONF
|
||||
config_.bloom_filter_ratio_ = ((double)GCONF._bloom_filter_ratio / 100.0);
|
||||
config_.bf_piece_size_ = GCONF._send_bloom_filter_size;
|
||||
} else {
|
||||
// bf_piece_size_ means how many int64_t a piece bloom filter contains
|
||||
// we expect to split bloom filter into k pieces with 1MB = 2^20B
|
||||
// so a piece bloom filter should contain
|
||||
// 1024(send_bloom_filter_size_) * 128 = 131,072 int64_t, i.e. 1MB
|
||||
config_.bloom_filter_ratio_ = ((double)spec.bloom_filter_ratio_ / 100.0);
|
||||
config_.bf_piece_size_ = spec.send_bloom_filter_size_ * 128;
|
||||
}
|
||||
config_.each_group_size_ = spec.each_group_size_;
|
||||
config_.runtime_filter_wait_time_ms_ = ctx.get_my_session()->
|
||||
get_runtime_filter_wait_time_ms();
|
||||
@ -425,7 +438,9 @@ ObJoinFilterSpec::ObJoinFilterSpec(common::ObIAllocator &alloc, const ObPhyOpera
|
||||
each_group_size_(OB_INVALID_ID),
|
||||
rf_build_cmp_infos_(alloc),
|
||||
rf_probe_cmp_infos_(alloc),
|
||||
px_query_range_info_(alloc)
|
||||
px_query_range_info_(alloc),
|
||||
bloom_filter_ratio_(0),
|
||||
send_bloom_filter_size_(0)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,7 @@ public:
|
||||
px_message_compression_(false) {}
|
||||
double bloom_filter_ratio_;
|
||||
int64_t each_group_size_;
|
||||
int64_t bf_piece_size_;
|
||||
int64_t bf_piece_size_; // how many int64_t a piece bloom filter contains
|
||||
int64_t runtime_filter_wait_time_ms_;
|
||||
int64_t runtime_filter_max_in_num_;
|
||||
int64_t runtime_bloom_filter_max_size_;
|
||||
@ -177,8 +177,8 @@ class ObJoinFilterSpec : public ObOpSpec
|
||||
public:
|
||||
ObJoinFilterSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type);
|
||||
|
||||
INHERIT_TO_STRING_KV("op_spec", ObOpSpec,
|
||||
K_(mode), K_(filter_id), K_(filter_len), K_(rf_infos));
|
||||
INHERIT_TO_STRING_KV("op_spec", ObOpSpec, K_(mode), K_(filter_id), K_(filter_len), K_(rf_infos),
|
||||
K_(bloom_filter_ratio), K_(send_bloom_filter_size));
|
||||
|
||||
inline void set_mode(JoinFilterMode mode) { mode_ = mode; }
|
||||
inline JoinFilterMode get_mode() const { return mode_; }
|
||||
@ -212,6 +212,8 @@ public:
|
||||
common::ObFixedArray<ObRFCmpInfo, common::ObIAllocator> rf_build_cmp_infos_;
|
||||
common::ObFixedArray<ObRFCmpInfo, common::ObIAllocator> rf_probe_cmp_infos_;
|
||||
ObPxQueryRangeInfo px_query_range_info_;
|
||||
int64_t bloom_filter_ratio_;
|
||||
int64_t send_bloom_filter_size_; // how many KB a piece bloom filter has
|
||||
};
|
||||
|
||||
class ObJoinFilterOp : public ObOperator
|
||||
|
Loading…
x
Reference in New Issue
Block a user