placeholder for join filter opt

This commit is contained in:
obdev 2024-10-09 20:46:05 +00:00 committed by ob-robot
parent 3bce0a45f5
commit 515e76aeca
7 changed files with 36 additions and 6 deletions

View File

@ -129,6 +129,7 @@ WAIT_EVENT_DEF(STORAGE_HA_FINISH_TRANSFER, 20006, "sleep: finish transfer sleep
WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_IO_TASK_WAIT, 20007, "latch: log external storage io task wait", "", "", "", SYSTEM_IO, true, true)
WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_RW_WAIT, 20008, "latch: log external storage handler rw wait", "", "", "", CONCURRENCY, true, false)
WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_WAIT, 20009, "latch: log external storage handler spin wait", "", "", "", CONCURRENCY, true, false)
WAIT_EVENT_DEF(DH_LOCAL_SYNC_COND_WAIT, 20010, "datahub local sync conditional wait", "address", "", "", CONCURRENCY, true, true)
// share storage 21001-21999
WAIT_EVENT_DEF(ZONE_STORAGE_MANAGER_LOCK_WAIT, 21001, "latch: zone storage manager maintaince lock wait", "address", "number", "tries", CONCURRENCY, true, false)

View File

@ -20,6 +20,7 @@ namespace oceanbase
namespace sql
{
OB_SERIALIZE_MEMBER(ObJoinFilterMaterialControlInfo, enable_material_, hash_id_, is_controller_,
join_filter_count_, extra_hash_count_, each_sqc_has_full_data_);
join_filter_count_, extra_hash_count_, each_sqc_has_full_data_,
need_sync_row_count_);
}
}

View File

@ -36,6 +36,8 @@ public:
uint16_t join_filter_count_{0}; // total join filter count in the left side of a hash join
uint16_t extra_hash_count_{0}; // hash value count(one for hash join, several for join filter)
bool each_sqc_has_full_data_{false}; // mark whether each sqc has complete data
bool need_sync_row_count_{false}; // if at least one join filter is shared join filter, we need to
// send datahub msg to synchronize row count
};

View File

@ -55,7 +55,8 @@ OB_SERIALIZE_MEMBER(ObJoinFilterRuntimeConfig,
runtime_filter_wait_time_ms_,
runtime_filter_max_in_num_,
runtime_bloom_filter_max_size_,
px_message_compression_);
px_message_compression_,
build_send_opt_);
OB_SERIALIZE_MEMBER(ObRuntimeFilterInfo,
filter_expr_id_,

View File

@ -92,7 +92,8 @@ public:
runtime_filter_wait_time_ms_(0),
runtime_filter_max_in_num_(0),
runtime_bloom_filter_max_size_(0),
px_message_compression_(false) {}
px_message_compression_(false),
build_send_opt_{false} {}
double bloom_filter_ratio_;
int64_t each_group_size_;
int64_t bf_piece_size_; // how many int64_t a piece bloom filter contains
@ -100,6 +101,7 @@ public:
int64_t runtime_filter_max_in_num_;
int64_t runtime_bloom_filter_max_size_;
bool px_message_compression_;
bool build_send_opt_;
};
class ObJoinFilterOpInput : public ObOpInput

View File

@ -23,9 +23,11 @@
using namespace oceanbase::sql;
using namespace oceanbase::common;
OB_SERIALIZE_MEMBER(ObJoinFilterNdv, valid_, count_);
OB_SERIALIZE_MEMBER((ObJoinFilterCountRowPieceMsg, ObDatahubPieceMsg), each_sqc_has_full_data_, sqc_id_,
total_rows_);
OB_SERIALIZE_MEMBER((ObJoinFilterCountRowWholeMsg, ObDatahubWholeMsg), total_rows_);
total_rows_, ndv_info_);
OB_SERIALIZE_MEMBER((ObJoinFilterCountRowWholeMsg, ObDatahubWholeMsg), total_rows_, ndv_info_);
int ObJoinFilterCountRowPieceMsgListener::on_message(ObJoinFilterCountRowPieceMsgCtx &piece_ctx,
common::ObIArray<ObPxSqcMeta *> &sqcs,

View File

@ -29,6 +29,26 @@ class ObJoinFilterCountRowPieceMsgListener;
class ObJoinFilterCountRowPieceMsgCtx;
class ObPxCoordInfo;
class ObJoinFilterNdv final
{
OB_UNIS_VERSION(1);
public:
static void gather_piece_ndv(const ObJoinFilterNdv &piece_ndv, ObJoinFilterNdv &total_ndv);
public:
~ObJoinFilterNdv() = default;
ObJoinFilterNdv &operator=(const ObJoinFilterNdv &r) = default;
inline void reset() {
valid_ = true;
count_ = 0;
}
TO_STRING_KV(K(valid_), K(count_));
bool valid_{true};
uint64_t count_{0};
};
typedef ObSEArray<ObJoinFilterNdv, 4> ObJoinFilterNdvInfo;
class ObJoinFilterCountRowPieceMsg
: public ObDatahubPieceMsg<dtl::ObDtlMsgType::DH_JOIN_FILTER_COUNT_ROW_PIECE_MSG>
{
@ -56,7 +76,7 @@ public:
bool each_sqc_has_full_data_{false}; // True iff in shared hash join
int64_t sqc_id_{OB_INVALID}; // From which sqc
int64_t total_rows_{0}; // row count of one thread
ObJoinFilterNdvInfo ndv_info_;
private:
DISALLOW_COPY_AND_ASSIGN(ObJoinFilterCountRowPieceMsg);
};
@ -84,6 +104,7 @@ public:
}
VIRTUAL_TO_STRING_KV(K_(total_rows));
int64_t total_rows_{0};
ObJoinFilterNdvInfo ndv_info_;
};
struct JoinFilterSqcRowInfo