From 515e76aeca005b56c7f69370dee256a7a7d6ceb6 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 9 Oct 2024 20:46:05 +0000 Subject: [PATCH] placeholder for join filter opt --- deps/oblib/src/lib/wait_event/ob_wait_event.h | 1 + .../ob_join_filter_material_control_info.cpp | 3 ++- .../ob_join_filter_material_control_info.h | 2 ++ src/sql/engine/join/ob_join_filter_op.cpp | 3 ++- src/sql/engine/join/ob_join_filter_op.h | 4 +++- .../ob_dh_join_filter_count_row.cpp | 6 +++-- .../components/ob_dh_join_filter_count_row.h | 23 ++++++++++++++++++- 7 files changed, 36 insertions(+), 6 deletions(-) diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event.h b/deps/oblib/src/lib/wait_event/ob_wait_event.h index 9ed75c3a5..cd9fc12f0 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event.h +++ b/deps/oblib/src/lib/wait_event/ob_wait_event.h @@ -129,6 +129,7 @@ WAIT_EVENT_DEF(STORAGE_HA_FINISH_TRANSFER, 20006, "sleep: finish transfer sleep WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_IO_TASK_WAIT, 20007, "latch: log external storage io task wait", "", "", "", SYSTEM_IO, true, true) WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_RW_WAIT, 20008, "latch: log external storage handler rw wait", "", "", "", CONCURRENCY, true, false) WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_WAIT, 20009, "latch: log external storage handler spin wait", "", "", "", CONCURRENCY, true, false) +WAIT_EVENT_DEF(DH_LOCAL_SYNC_COND_WAIT, 20010, "datahub local sync conditional wait", "address", "", "", CONCURRENCY, true, true) // share storage 21001-21999 WAIT_EVENT_DEF(ZONE_STORAGE_MANAGER_LOCK_WAIT, 21001, "latch: zone storage manager maintaince lock wait", "address", "number", "tries", CONCURRENCY, true, false) diff --git a/src/sql/engine/join/ob_join_filter_material_control_info.cpp b/src/sql/engine/join/ob_join_filter_material_control_info.cpp index 4015bf05d..5ff640de1 100644 --- a/src/sql/engine/join/ob_join_filter_material_control_info.cpp +++ b/src/sql/engine/join/ob_join_filter_material_control_info.cpp @@ -20,6 +20,7 @@ namespace oceanbase namespace sql { OB_SERIALIZE_MEMBER(ObJoinFilterMaterialControlInfo, enable_material_, hash_id_, is_controller_, - join_filter_count_, extra_hash_count_, each_sqc_has_full_data_); + join_filter_count_, extra_hash_count_, each_sqc_has_full_data_, + need_sync_row_count_); } } \ No newline at end of file diff --git a/src/sql/engine/join/ob_join_filter_material_control_info.h b/src/sql/engine/join/ob_join_filter_material_control_info.h index 9a5c4195f..9822a00cb 100644 --- a/src/sql/engine/join/ob_join_filter_material_control_info.h +++ b/src/sql/engine/join/ob_join_filter_material_control_info.h @@ -36,6 +36,8 @@ public: uint16_t join_filter_count_{0}; // total join filter count in the left side of a hash join uint16_t extra_hash_count_{0}; // hash value count(one for hash join, several for join filter) bool each_sqc_has_full_data_{false}; // mark whether each sqc has complete data + bool need_sync_row_count_{false}; // if at least one join filter is shared join filter, we need to + // send datahub msg to synchronize row count }; diff --git a/src/sql/engine/join/ob_join_filter_op.cpp b/src/sql/engine/join/ob_join_filter_op.cpp index 683dfe928..183776b35 100644 --- a/src/sql/engine/join/ob_join_filter_op.cpp +++ b/src/sql/engine/join/ob_join_filter_op.cpp @@ -55,7 +55,8 @@ OB_SERIALIZE_MEMBER(ObJoinFilterRuntimeConfig, runtime_filter_wait_time_ms_, runtime_filter_max_in_num_, runtime_bloom_filter_max_size_, - px_message_compression_); + px_message_compression_, + build_send_opt_); OB_SERIALIZE_MEMBER(ObRuntimeFilterInfo, filter_expr_id_, diff --git a/src/sql/engine/join/ob_join_filter_op.h b/src/sql/engine/join/ob_join_filter_op.h index f67290d78..591090cc5 100644 --- a/src/sql/engine/join/ob_join_filter_op.h +++ b/src/sql/engine/join/ob_join_filter_op.h @@ -92,7 +92,8 @@ public: runtime_filter_wait_time_ms_(0), runtime_filter_max_in_num_(0), runtime_bloom_filter_max_size_(0), - px_message_compression_(false) {} + px_message_compression_(false), + build_send_opt_{false} {} double bloom_filter_ratio_; int64_t each_group_size_; int64_t bf_piece_size_; // how many int64_t a piece bloom filter contains @@ -100,6 +101,7 @@ public: int64_t runtime_filter_max_in_num_; int64_t runtime_bloom_filter_max_size_; bool px_message_compression_; + bool build_send_opt_; }; class ObJoinFilterOpInput : public ObOpInput diff --git a/src/sql/engine/px/datahub/components/ob_dh_join_filter_count_row.cpp b/src/sql/engine/px/datahub/components/ob_dh_join_filter_count_row.cpp index 80d253126..c667cf78a 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_join_filter_count_row.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_join_filter_count_row.cpp @@ -23,9 +23,11 @@ using namespace oceanbase::sql; using namespace oceanbase::common; +OB_SERIALIZE_MEMBER(ObJoinFilterNdv, valid_, count_); OB_SERIALIZE_MEMBER((ObJoinFilterCountRowPieceMsg, ObDatahubPieceMsg), each_sqc_has_full_data_, sqc_id_, - total_rows_); -OB_SERIALIZE_MEMBER((ObJoinFilterCountRowWholeMsg, ObDatahubWholeMsg), total_rows_); + total_rows_, ndv_info_); +OB_SERIALIZE_MEMBER((ObJoinFilterCountRowWholeMsg, ObDatahubWholeMsg), total_rows_, ndv_info_); + int ObJoinFilterCountRowPieceMsgListener::on_message(ObJoinFilterCountRowPieceMsgCtx &piece_ctx, common::ObIArray &sqcs, diff --git a/src/sql/engine/px/datahub/components/ob_dh_join_filter_count_row.h b/src/sql/engine/px/datahub/components/ob_dh_join_filter_count_row.h index 63eb3f0f5..a061ded87 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_join_filter_count_row.h +++ b/src/sql/engine/px/datahub/components/ob_dh_join_filter_count_row.h @@ -29,6 +29,26 @@ class ObJoinFilterCountRowPieceMsgListener; class ObJoinFilterCountRowPieceMsgCtx; class ObPxCoordInfo; +class ObJoinFilterNdv final +{ + OB_UNIS_VERSION(1); +public: + static void gather_piece_ndv(const ObJoinFilterNdv &piece_ndv, ObJoinFilterNdv &total_ndv); +public: + ~ObJoinFilterNdv() = default; + ObJoinFilterNdv &operator=(const ObJoinFilterNdv &r) = default; + inline void reset() { + valid_ = true; + count_ = 0; + } + + TO_STRING_KV(K(valid_), K(count_)); + bool valid_{true}; + uint64_t count_{0}; +}; + +typedef ObSEArray ObJoinFilterNdvInfo; + class ObJoinFilterCountRowPieceMsg : public ObDatahubPieceMsg { @@ -56,7 +76,7 @@ public: bool each_sqc_has_full_data_{false}; // True iff in shared hash join int64_t sqc_id_{OB_INVALID}; // From which sqc int64_t total_rows_{0}; // row count of one thread - + ObJoinFilterNdvInfo ndv_info_; private: DISALLOW_COPY_AND_ASSIGN(ObJoinFilterCountRowPieceMsg); }; @@ -84,6 +104,7 @@ public: } VIRTUAL_TO_STRING_KV(K_(total_rows)); int64_t total_rows_{0}; + ObJoinFilterNdvInfo ndv_info_; }; struct JoinFilterSqcRowInfo