From ad5fdf9fea34a2b86f3d6120fc93320d00a1d9e7 Mon Sep 17 00:00:00 2001 From: simonjoylet Date: Thu, 10 Aug 2023 08:42:33 +0000 Subject: [PATCH] remove core at keybtree, adjust sort keys for unique index --- src/sql/optimizer/ob_del_upd_log_plan.cpp | 93 +++++++++++++++++-- src/sql/optimizer/ob_del_upd_log_plan.h | 6 +- src/storage/memtable/mvcc/ob_keybtree.cpp | 3 +- src/storage/memtable/mvcc/ob_query_engine.cpp | 7 +- 4 files changed, 95 insertions(+), 14 deletions(-) diff --git a/src/sql/optimizer/ob_del_upd_log_plan.cpp b/src/sql/optimizer/ob_del_upd_log_plan.cpp index 40713c196d..0932b23a7b 100644 --- a/src/sql/optimizer/ob_del_upd_log_plan.cpp +++ b/src/sql/optimizer/ob_del_upd_log_plan.cpp @@ -629,15 +629,39 @@ int ObDelUpdLogPlan::compute_exchange_info_for_pdml_insert(const ObShardingInfo } if (OB_SUCC(ret)) { if (get_optimizer_context().is_online_ddl() && !get_optimizer_context().is_heap_table_ddl()) { + int64_t sample_sort_column_count = 0; + ObArray ddl_sort_keys; if (OB_UNLIKELY(!get_stmt()->is_insert_stmt())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected error", K(get_stmt()), K(ret)); - } else if (OB_FAIL(static_cast(get_stmt())->get_ddl_sort_keys(exch_info.sort_keys_))) { + } else if (OB_FAIL(static_cast(get_stmt())->get_ddl_sort_keys(ddl_sort_keys))) { LOG_WARN("fail to get ddl sort key", K(ret)); - } else if (exch_info.dist_method_ == ObPQDistributeMethod::PARTITION_RANGE && - OB_FAIL(exch_info.repart_all_tablet_ids_.assign(target_sharding.get_all_tablet_ids()))) { - LOG_WARN("failed to get all partition ids", K(ret)); - } else { /*do nothing*/ } + } else if (OB_FAIL(get_ddl_sample_sort_column_count(sample_sort_column_count))) { + LOG_WARN("get ddl sample sort column count failed", K(ret)); + } else if (sample_sort_column_count > ddl_sort_keys.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sample sort column count is larger than ddl sort keys", K(ret), K(ddl_sort_keys.count()), K(sample_sort_column_count)); + } else { + exch_info.sort_keys_.reset(); + if (sample_sort_column_count > 0) { + if (OB_FAIL(exch_info.sort_keys_.reserve(sample_sort_column_count))) { + LOG_WARN("reserve sort keys array failed", K(ret), K(sample_sort_column_count)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < sample_sort_column_count; ++i) { + if (OB_FAIL(exch_info.sort_keys_.push_back(ddl_sort_keys.at(i)))) { + LOG_WARN("push back sort item failed", K(ret), K(i), K(sample_sort_column_count)); + } + } + } else if (OB_FAIL(exch_info.sort_keys_.assign(ddl_sort_keys))) { + LOG_WARN("assign ddl sort keys failed", K(ret), K(ddl_sort_keys.count())); + } + if (OB_SUCC(ret)) { + if (exch_info.dist_method_ == ObPQDistributeMethod::PARTITION_RANGE && + OB_FAIL(exch_info.repart_all_tablet_ids_.assign(target_sharding.get_all_tablet_ids()))) { + LOG_WARN("failed to get all partition ids", K(ret)); + } else { /*do nothing*/ } + } + } } else { if (OB_FAIL(compute_hash_dist_exprs_for_pdml_insert(exch_info, index_dml_info))) { @@ -1044,6 +1068,7 @@ int ObDelUpdLogPlan::candi_allocate_one_pdml_insert(bool is_index_maintenance, int ret = OB_SUCCESS; ObExchangeInfo exch_info; ObSEArray sort_keys; + ObSEArray sample_sort_keys; ObSEArray px_coord_sort_keys; ObSEArray sharding_conditions; ObShardingInfo *target_sharding = NULL; @@ -1082,10 +1107,10 @@ int ObDelUpdLogPlan::candi_allocate_one_pdml_insert(bool is_index_maintenance, exch_info))) { LOG_WARN("failed to compute exchange info for insert", K(ret)); } else if (get_optimizer_context().is_online_ddl() && !get_optimizer_context().is_heap_table_ddl() && - OB_FAIL(get_ddl_sort_keys_with_part_expr(exch_info, sort_keys))) { + OB_FAIL(get_ddl_sort_keys_with_part_expr(exch_info, sort_keys, sample_sort_keys))) { LOG_WARN("failed to get ddl sort keys", K(ret)); - } else if (!sort_keys.empty() && OB_FAIL(gen_px_coord_sampling_sort_keys( - sort_keys, px_coord_sort_keys))) { + } else if (!sample_sort_keys.empty() && OB_FAIL(gen_px_coord_sampling_sort_keys( + sample_sort_keys, px_coord_sort_keys))) { LOG_WARN("generate px coord sort order items failed", K(ret)); } else { bool need_partition_id = target_sharding->get_part_level() == share::schema::PARTITION_LEVEL_ONE || @@ -1097,6 +1122,7 @@ int ObDelUpdLogPlan::candi_allocate_one_pdml_insert(bool is_index_maintenance, exch_info, target_table_partition, sort_keys, + sample_sort_keys, px_coord_sort_keys, is_index_maintenance, is_last_dml_op, @@ -1201,6 +1227,7 @@ int ObDelUpdLogPlan::create_online_ddl_plan(ObLogicalOperator *&top, const ObExchangeInfo &exch_info, ObTablePartitionInfo *table_partition_info, const ObIArray &sort_keys, + const ObIArray &sample_sort_keys, const ObIArray &px_coord_sort_keys, bool is_index_maintenance, bool is_last_dml_op, @@ -1229,7 +1256,7 @@ int ObDelUpdLogPlan::create_online_ddl_plan(ObLogicalOperator *&top, } } else { if (OB_FAIL(allocate_stat_collector_as_top(top, - ObStatCollectorType::SAMPLE_SORT, sort_keys, + ObStatCollectorType::SAMPLE_SORT, sample_sort_keys, table_partition_info->get_part_level()))) { LOG_WARN("fail to allocate stat collector as top", K(ret)); } else if (OB_FAIL(allocate_exchange_as_top(top, exch_info))) { @@ -1257,31 +1284,77 @@ int ObDelUpdLogPlan::create_online_ddl_plan(ObLogicalOperator *&top, return ret; } +int ObDelUpdLogPlan::get_ddl_sample_sort_column_count(int64_t &sample_sort_column_count) +{ + int ret = OB_SUCCESS; + sample_sort_column_count = 0; + const ObInsertStmt *ins_stmt = static_cast(get_stmt()); + if (OB_ISNULL(ins_stmt)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("insert stmt is null", K(ret), KP(ins_stmt)); + } else if (2 != ins_stmt->get_table_size()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("error unexpected, table item size is not as expected", K(ret), "table_item_size", ins_stmt->get_table_size()); + } else { + TableItem* table_item = ins_stmt->get_table_item_by_id(ins_stmt->get_insert_table_info().table_id_); + const uint64_t tenant_id = optimizer_context_.get_session_info()->get_effective_tenant_id(); + ObSchemaGetterGuard *schema_guard = nullptr; + const ObTableSchema *table_schema = nullptr; + if (OB_ISNULL(schema_guard = optimizer_context_.get_schema_guard())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema_guard is null", K(ret), KP(schema_guard)); + } else if (OB_ISNULL(table_item)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table item of insert stmt is null", K(ret)); + } else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_item->ddl_table_id_, table_schema))) { + LOG_WARN("get target table schema failed", K(ret), K(tenant_id), KPC(table_item)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("target table not exist", K(ret), K(tenant_id), KPC(table_item)); + } else if (table_schema->is_unique_index()) { + sample_sort_column_count = table_schema->get_index_column_num(); + } + } + return ret; +} + int ObDelUpdLogPlan::get_ddl_sort_keys_with_part_expr(ObExchangeInfo &exch_info, - common::ObIArray &sort_keys) + common::ObIArray &sort_keys, + common::ObIArray &sample_sort_keys) { int ret = OB_SUCCESS; sort_keys.reset(); + sample_sort_keys.reset(); ObArray tmp_sort_keys; const ObInsertStmt *ins_stmt = static_cast(get_stmt()); + int64_t sample_sort_column_count = 0; if (2 != ins_stmt->get_table_size()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, table item size is not as expected", K(ret), "table_item_size", ins_stmt->get_table_size()); } else if (OB_FAIL(ins_stmt->get_ddl_sort_keys(tmp_sort_keys))) { LOG_WARN("get ddl sort keys failed", K(ret)); + } else if (OB_FAIL(get_ddl_sample_sort_column_count(sample_sort_column_count))) { + LOG_WARN("get ddl sample sort column count failed", K(ret)); } else if (OB_NOT_NULL(exch_info.calc_part_id_expr_)) { OrderItem item; item.expr_ = exch_info.calc_part_id_expr_; item.order_type_ = NULLS_FIRST_ASC; if (OB_FAIL(sort_keys.push_back(item))) { LOG_WARN("push back sort keys with part failed", K(ret)); + } else if (OB_FAIL(sample_sort_keys.push_back(item))) { + LOG_WARN("push back sample sort keys with part failed", K(ret)); } } for (int64_t i = 0; OB_SUCC(ret) && i < tmp_sort_keys.count(); ++i) { if (OB_FAIL(sort_keys.push_back(tmp_sort_keys.at(i)))) { LOG_WARN("push back sort keys failed", K(ret)); + } else if (sample_sort_column_count > 0 && i >= sample_sort_column_count) { + // skip + } else if (OB_FAIL(sample_sort_keys.push_back(tmp_sort_keys.at(i)))) { + LOG_WARN("push back sample sort keys failed", K(ret)); } } + LOG_INFO("get ddl sort keys and sample sort keys", K(ret), K(sort_keys.count()), K(sample_sort_keys.count()), K(sample_sort_column_count)); return ret; } diff --git a/src/sql/optimizer/ob_del_upd_log_plan.h b/src/sql/optimizer/ob_del_upd_log_plan.h index d37317d6af..70083e42ca 100644 --- a/src/sql/optimizer/ob_del_upd_log_plan.h +++ b/src/sql/optimizer/ob_del_upd_log_plan.h @@ -131,6 +131,7 @@ public: const ObExchangeInfo &exch_info, ObTablePartitionInfo *table_location, const ObIArray &sort_keys, + const ObIArray &sample_sort_keys, const ObIArray &px_coord_sort_keys, bool is_index_maintenance, bool is_last_dml_op, @@ -138,8 +139,11 @@ public: bool is_pdml_update_split, IndexDMLInfo *index_dml_info); + int get_ddl_sample_sort_column_count(int64_t &sample_sort_column_count); + int get_ddl_sort_keys_with_part_expr(ObExchangeInfo &exch_info, - common::ObIArray &sort_keys); + common::ObIArray &sort_keys, + common::ObIArray &sample_sort_keys); int replace_exch_info_exprs(ObExchangeInfo &exch_info); diff --git a/src/storage/memtable/mvcc/ob_keybtree.cpp b/src/storage/memtable/mvcc/ob_keybtree.cpp index ed33e1f55c..d6349442ae 100644 --- a/src/storage/memtable/mvcc/ob_keybtree.cpp +++ b/src/storage/memtable/mvcc/ob_keybtree.cpp @@ -631,8 +631,7 @@ int WriteHandle::insert_and_split_upward(BtreeKey key, Btree ret = OB_ENTRY_EXIST; BtreeVal old_val = val; val = old_node->get_val(pos, index); - OB_LOG(ERROR, "duplicate key", K(old_node->get_key(pos, index)), K(key), K(old_node->get_val(pos, index)), K(val), K(old_val)); - ob_abort(); + OB_LOG(WARN, "duplicate key", K(old_node->get_key(pos, index)), K(key), K(old_node->get_val(pos, index)), K(val), K(old_val)); } else { ret = insert_into_node(old_node, pos, key, val, new_node_1, new_node_2); } diff --git a/src/storage/memtable/mvcc/ob_query_engine.cpp b/src/storage/memtable/mvcc/ob_query_engine.cpp index 71ab60c537..c94fd33dab 100644 --- a/src/storage/memtable/mvcc/ob_query_engine.cpp +++ b/src/storage/memtable/mvcc/ob_query_engine.cpp @@ -320,7 +320,12 @@ int ObQueryEngine::ensure(const ObMemtableKey *key, ObMvccRow *value) } else { ObStoreRowkeyWrapper key_wrapper(key->get_rowkey()); if (OB_FAIL(node_ptr->get_keybtree().insert(key_wrapper, value))) { - TRANS_LOG(WARN, "ensure keybtree fail", KR(ret), K(*key)); + if (OB_ENTRY_EXIST == ret) { + TRANS_LOG(ERROR, "ensure keybtree fail", KR(ret), K(*key)); + ob_abort(); + } else { + TRANS_LOG(WARN, "ensure keybtree fail", KR(ret), K(*key)); + } } else { value->set_btree_indexed(); }