From 00632ae17c708fbe3f6165eaa516781e3f7ed23f Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Fri, 5 Jan 2024 10:57:04 +0000 Subject: [PATCH] [CP] [Bugfix]support put from client --- src/libtable/src/ob_table_rpc_impl.cpp | 4 +- .../ob_table_batch_execute_processor.cpp | 20 ++--- src/observer/table/ob_table_context.cpp | 88 +++++++++++-------- src/observer/table/ob_table_context.h | 10 ++- .../table/ob_table_execute_processor.cpp | 20 ++--- .../ob_table_query_and_mutate_processor.cpp | 2 +- src/share/table/ob_table.cpp | 2 +- src/share/table/ob_table.h | 17 +++- src/share/table/ob_table_rpc_struct.cpp | 4 +- src/share/table/ob_table_rpc_struct.h | 25 ++++-- .../observer/table/test_create_executor.cpp | 2 +- 11 files changed, 115 insertions(+), 79 deletions(-) diff --git a/src/libtable/src/ob_table_rpc_impl.cpp b/src/libtable/src/ob_table_rpc_impl.cpp index a0748440e..c6614497a 100644 --- a/src/libtable/src/ob_table_rpc_impl.cpp +++ b/src/libtable/src/ob_table_rpc_impl.cpp @@ -96,7 +96,7 @@ int ObTableRpcImpl::execute(const ObTableOperation &table_operation, const ObTab request.consistency_level_ = request_options.consistency_level(); request.returning_affected_rows_ = request_options.returning_affected_rows(); request.returning_affected_entity_ = request_options.returning_affected_entity(); - request.returning_rowkey_ = request_options.returning_rowkey(); + request.option_flag_ = request_options.get_option_flag(); ret = rpc_proxy_-> timeout(request_options.server_timeout()) @@ -286,7 +286,7 @@ int ObTableRpcImpl::batch_execute(const ObTableBatchOperation &batch_operation, request.consistency_level_ = request_options.consistency_level(); request.returning_affected_rows_ = request_options.returning_affected_rows(); request.returning_affected_entity_ = request_options.returning_affected_entity(); - request.returning_rowkey_ = request_options.returning_rowkey(); + request.option_flag_ = request_options.get_option_flag(); request.batch_operation_as_atomic_ = request_options.batch_operation_as_atomic(); if (REACH_TIME_INTERVAL(10*1000*1000)) { // TODO: we can not print the tenat memory usage now. diff --git a/src/observer/table/ob_table_batch_execute_processor.cpp b/src/observer/table/ob_table_batch_execute_processor.cpp index 8791f188a..00b7f3be6 100644 --- a/src/observer/table/ob_table_batch_execute_processor.cpp +++ b/src/observer/table/ob_table_batch_execute_processor.cpp @@ -76,12 +76,12 @@ int ObTableBatchExecuteP::check_arg() int ObTableBatchExecuteP::check_arg2() const { int ret = OB_SUCCESS; - if (arg_.returning_rowkey_ - || arg_.returning_affected_entity_) { + if (arg_.returning_rowkey() + || arg_.returning_affected_entity()) { ret = OB_NOT_SUPPORTED; LOG_WARN("some options not supported yet", K(ret), - "returning_rowkey", arg_.returning_rowkey_, - "returning_affected_entity", arg_.returning_affected_entity_); + "returning_rowkey", arg_.returning_rowkey(), + "returning_affected_entity", arg_.returning_affected_entity()); } return ret; } @@ -110,7 +110,7 @@ uint64_t ObTableBatchExecuteP::get_request_checksum() const uint64_t op_checksum = arg_.batch_operation_.get_checksum(); checksum = ob_crc64(checksum, &op_checksum, sizeof(op_checksum)); checksum = ob_crc64(checksum, &arg_.consistency_level_, sizeof(arg_.consistency_level_)); - checksum = ob_crc64(checksum, &arg_.returning_rowkey_, sizeof(arg_.returning_rowkey_)); + checksum = ob_crc64(checksum, &arg_.option_flag_, sizeof(arg_.option_flag_)); checksum = ob_crc64(checksum, &arg_.returning_affected_entity_, sizeof(arg_.returning_affected_entity_)); checksum = ob_crc64(checksum, &arg_.returning_affected_rows_, sizeof(arg_.returning_affected_rows_)); checksum = ob_crc64(checksum, &arg_.binlog_row_image_type_, sizeof(arg_.binlog_row_image_type_)); @@ -848,7 +848,7 @@ int ObTableBatchExecuteP::init_single_op_tb_ctx(table::ObTableCtx &ctx, break; } case ObTableOperationType::INSERT_OR_UPDATE: { - if (OB_FAIL(ctx.init_insert_up())) { + if (OB_FAIL(ctx.init_insert_up(arg_.use_put()))) { LOG_WARN("fail to init insert up ctx", K(ret), K(ctx)); } break; @@ -860,15 +860,15 @@ int ObTableBatchExecuteP::init_single_op_tb_ctx(table::ObTableCtx &ctx, break; } case ObTableOperationType::APPEND: { - if (OB_FAIL(ctx.init_append(arg_.returning_affected_entity_, - arg_.returning_rowkey_))) { + if (OB_FAIL(ctx.init_append(arg_.returning_affected_entity(), + arg_.returning_rowkey()))) { LOG_WARN("fail to init append ctx", K(ret), K(ctx)); } break; } case ObTableOperationType::INCREMENT: { - if (OB_FAIL(ctx.init_increment(arg_.returning_affected_entity_, - arg_.returning_rowkey_))) { + if (OB_FAIL(ctx.init_increment(arg_.returning_affected_entity(), + arg_.returning_rowkey()))) { LOG_WARN("fail to init increment ctx", K(ret), K(ctx)); } break; diff --git a/src/observer/table/ob_table_context.cpp b/src/observer/table/ob_table_context.cpp index 309cbb46e..56c38a1f3 100644 --- a/src/observer/table/ob_table_context.cpp +++ b/src/observer/table/ob_table_context.cpp @@ -272,42 +272,6 @@ int ObTableCtx::get_expr_from_assignments(const ObString &col_name, ObRawExpr *& return ret; } -/* - check insert up operation can use put implement or not - 1. can not have any index. - 2. all column must be filled. -*/ -int ObTableCtx::check_insert_up_can_use_put(bool &use_put) -{ - int ret = OB_SUCCESS; - use_put = true; - - if (is_inc_or_append()) { // increment or append operarion need old value to calculate, can not use put - use_put = false; - } else if (ObTableOperationType::INSERT_OR_UPDATE != operation_type_) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid operation type", K(ret), K_(operation_type)); - } else if (is_htable()) { // htable has no index and alway full filled. - use_put = true; - } else if (!related_index_ids_.empty()) { // has index, can not use put - use_put = false; - } else if (OB_ISNULL(table_schema_)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("table schema is null", K(ret)); - } else { - if (OB_ISNULL(entity_)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("entity is null", K(ret)); - } else if (table_schema_->get_column_count() - table_schema_->get_rowkey_column_num() <= entity_->get_properties_count()) { // all columns are filled - use_put = true; - } else { // some columns are missing - use_put = false; - } - } - - return ret; -} - /* 1. ObConflictChecker need ObPhysicalPlanCtx. 2. now() expr need ObPhysicalPlanCtx.cur_time_. @@ -1196,10 +1160,11 @@ int ObTableCtx::init_replace() 1. init update 2. reset for is_for_update_ flag, cause init_update() had set is_for_update_=true */ -int ObTableCtx::init_insert_up() +int ObTableCtx::init_insert_up(bool is_client_set_put) { int ret = OB_SUCCESS; is_for_insertup_ = true; + is_client_set_put_ = is_client_set_put; if (OB_FAIL(init_update())) { LOG_WARN("fail to init update", K(ret)); @@ -1271,7 +1236,7 @@ int ObTableCtx::init_append(bool return_affected_entity, bool return_rowkey) return_affected_entity_ = return_affected_entity; return_rowkey_ = return_rowkey; - if (OB_FAIL(init_insert_up())) { + if (OB_FAIL(init_insert_up(false))) { LOG_WARN("fail to init insert up", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < assigns_.count(); i++) { @@ -1334,7 +1299,7 @@ int ObTableCtx::init_increment(bool return_affected_entity, bool return_rowkey) return_affected_entity_ = return_affected_entity; return_rowkey_ = return_rowkey; - if (OB_FAIL(init_insert_up())) { + if (OB_FAIL(init_insert_up(false))) { LOG_WARN("fail to init insert up", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < assigns_.count(); i++) { @@ -1730,5 +1695,50 @@ int ObTableCtx::get_related_tablet_id(const share::schema::ObTableSchema &index_ return ret; } +/* + check insert up operation can use put implement or not + 1. can not have any index. + 2. all column must be filled. +*/ +int ObTableCtx::check_insert_up_can_use_put(bool &use_put) +{ + int ret = OB_SUCCESS; + use_put = true; + + if (is_inc_or_append()) { // increment or append operarion need old value to calculate, can not use put + use_put = false; + } else if (ObTableOperationType::INSERT_OR_UPDATE != operation_type_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid operation type", K(ret), K_(operation_type)); + } else if (is_htable()) { // htable has no index and alway full filled. + use_put = true; + } else if (is_client_set_put_ && !related_index_ids_.empty()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("client set use_put flag, but has local index is not support", K(ret), K_(related_index_ids)); + } else if (!related_index_ids_.empty()) { // has index, can not use put + use_put = false; + } else if (OB_ISNULL(table_schema_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("table schema is null", K(ret)); + } else { + bool is_all_columns_filled = false; + if (OB_ISNULL(entity_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("entity is null", K(ret)); + } else if (FALSE_IT(is_all_columns_filled = table_schema_->get_column_count() + - table_schema_->get_rowkey_column_num() <= entity_->get_properties_count())) { // all columns are filled + } else if (is_client_set_put_ && !is_all_columns_filled) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("client set use_put flag, but not fill all columns is not support", K(ret), KPC_(table_schema), KPC_(entity)); + } else if (is_client_set_put_ || is_all_columns_filled) { + use_put = true; + } else { // some columns are missing + use_put = false; + } + } + + return ret; +} + } // namespace table } // namespace oceanbase diff --git a/src/observer/table/ob_table_context.h b/src/observer/table/ob_table_context.h index 0a7f88172..bcfa34959 100644 --- a/src/observer/table/ob_table_context.h +++ b/src/observer/table/ob_table_context.h @@ -151,6 +151,7 @@ public: cur_cluster_version_ = GET_MIN_CLUSTER_VERSION(); is_ttl_table_ = false; is_skip_scan_ = false; + is_client_set_put_ = false; } virtual ~ObTableCtx() {} @@ -184,7 +185,8 @@ public: K_(entity_type), K_(cur_cluster_version), K_(is_ttl_table), - K_(is_skip_scan)); + K_(is_skip_scan), + K_(is_client_set_put)); public: //////////////////////////////////////// getter //////////////////////////////////////////////// // for common @@ -303,6 +305,8 @@ public: // for delete OB_INLINE void set_skip_scan(bool skip_scan) { is_skip_scan_ = skip_scan; } OB_INLINE bool is_skip_scan() { return is_skip_scan_; } + // for put + OB_INLINE void set_client_use_put(bool is_client_use_put) { is_client_set_put_ = is_client_use_put; } public: // 基于 table name 初始化common部分(不包括expr_info_, exec_ctx_) int init_common(ObTableApiCredential &credential, @@ -327,7 +331,7 @@ public: // 初始化replace相关 int init_replace(); // 初始化insert_up相关 - int init_insert_up(); + int init_insert_up(bool is_client_set_put); // 初始化get相关 int init_get(); // 初始化increment相关 @@ -477,6 +481,8 @@ private: bool is_ttl_table_; // for delete skip scan bool is_skip_scan_; + // for put + bool is_client_set_put_; private: DISALLOW_COPY_AND_ASSIGN(ObTableCtx); }; diff --git a/src/observer/table/ob_table_execute_processor.cpp b/src/observer/table/ob_table_execute_processor.cpp index e7402882e..80b7ae294 100644 --- a/src/observer/table/ob_table_execute_processor.cpp +++ b/src/observer/table/ob_table_execute_processor.cpp @@ -95,11 +95,11 @@ int ObTableApiExecuteP::check_arg2() const if (ObTableOperationType::Type::APPEND != op_type && ObTableOperationType::Type::INCREMENT != op_type) { - if (arg_.returning_rowkey_ || arg_.returning_affected_entity_) { + if (arg_.returning_rowkey() || arg_.returning_affected_entity()) { ret = OB_NOT_SUPPORTED; LOG_WARN("some options not supported yet", K(ret), - "returning_rowkey", arg_.returning_rowkey_, - "returning_affected_entity", arg_.returning_affected_entity_, + "returning_rowkey", arg_.returning_rowkey(), + "returning_affected_entity", arg_.returning_affected_entity(), "operation_type", op_type); } } @@ -127,7 +127,7 @@ int ObTableApiExecuteP::init_tb_ctx() switch(op_type) { case ObTableOperationType::INSERT: { if (tb_ctx_.is_ttl_table()) { - if (OB_FAIL(tb_ctx_.init_insert_up())) { + if (OB_FAIL(tb_ctx_.init_insert_up(arg_.use_put()))) { LOG_WARN("fail to init insert up ctx", K(ret), K(tb_ctx_)); } } else { @@ -156,21 +156,21 @@ int ObTableApiExecuteP::init_tb_ctx() break; } case ObTableOperationType::INSERT_OR_UPDATE: { - if (OB_FAIL(tb_ctx_.init_insert_up())) { + if (OB_FAIL(tb_ctx_.init_insert_up(arg_.use_put()))) { LOG_WARN("fail to init insert up ctx", K(ret), K(tb_ctx_)); } break; } case ObTableOperationType::APPEND: { - if (OB_FAIL(tb_ctx_.init_append(arg_.returning_affected_entity_, - arg_.returning_rowkey_))) { + if (OB_FAIL(tb_ctx_.init_append(arg_.returning_affected_entity(), + arg_.returning_rowkey()))) { LOG_WARN("fail to init append ctx", K(ret), K(tb_ctx_)); } break; } case ObTableOperationType::INCREMENT: { - if (OB_FAIL(tb_ctx_.init_increment(arg_.returning_affected_entity_, - arg_.returning_rowkey_))) { + if (OB_FAIL(tb_ctx_.init_increment(arg_.returning_affected_entity(), + arg_.returning_rowkey()))) { LOG_WARN("fail to init increment ctx", K(ret), K(tb_ctx_)); } break; @@ -316,7 +316,7 @@ uint64_t ObTableApiExecuteP::get_request_checksum() uint64_t checksum = 0; checksum = ob_crc64(checksum, arg_.table_name_.ptr(), arg_.table_name_.length()); checksum = ob_crc64(checksum, &arg_.consistency_level_, sizeof(arg_.consistency_level_)); - checksum = ob_crc64(checksum, &arg_.returning_rowkey_, sizeof(arg_.returning_rowkey_)); + checksum = ob_crc64(checksum, &arg_.option_flag_, sizeof(arg_.option_flag_)); checksum = ob_crc64(checksum, &arg_.returning_affected_entity_, sizeof(arg_.returning_affected_entity_)); checksum = ob_crc64(checksum, &arg_.returning_affected_rows_, sizeof(arg_.returning_affected_rows_)); checksum = ob_crc64(checksum, &arg_.binlog_row_image_type_, sizeof(arg_.binlog_row_image_type_)); diff --git a/src/observer/table/ob_table_query_and_mutate_processor.cpp b/src/observer/table/ob_table_query_and_mutate_processor.cpp index 11f91b44a..ec731a9dc 100644 --- a/src/observer/table/ob_table_query_and_mutate_processor.cpp +++ b/src/observer/table/ob_table_query_and_mutate_processor.cpp @@ -184,7 +184,7 @@ int ObTableQueryAndMutateP::init_tb_ctx(table::ObTableCtx &ctx, break; } case ObTableOperationType::INSERT_OR_UPDATE: { - if (OB_FAIL(ctx.init_insert_up())) { + if (OB_FAIL(ctx.init_insert_up(false))) { LOG_WARN("fail to init insert up ctx", K(ret), K(ctx)); } break; diff --git a/src/share/table/ob_table.cpp b/src/share/table/ob_table.cpp index 20704ec52..ee046e9b4 100644 --- a/src/share/table/ob_table.cpp +++ b/src/share/table/ob_table.cpp @@ -542,7 +542,7 @@ ObTableRequestOptions::ObTableRequestOptions() max_execution_time_us_(10*1000*1000), retry_policy_(NULL), returning_affected_rows_(false), - returning_rowkey_(false), + option_flag_(OB_TABLE_OPTION_DEFAULT), returning_affected_entity_(false), batch_operation_as_atomic_(false), binlog_row_image_type_(ObBinlogRowImageType::FULL) diff --git a/src/share/table/ob_table.h b/src/share/table/ob_table.h index 296926f32..eddb4f4e4 100644 --- a/src/share/table/ob_table.h +++ b/src/share/table/ob_table.h @@ -38,6 +38,11 @@ class ObNewRow; namespace table { + +#define OB_TABLE_OPTION_DEFAULT INT64_C(0) +#define OB_TABLE_OPTION_RETURNING_ROWKEY (INT64_C(1) << 0) +#define OB_TABLE_OPTION_USE_PUT (INT64_C(1) << 1) + using common::ObString; using common::ObRowkey; using common::ObObj; @@ -469,21 +474,27 @@ public: ObIRetryPolicy* retry_policy() { return retry_policy_; } void set_returning_affected_rows(bool returning) { returning_affected_rows_ = returning; } bool returning_affected_rows() const { return returning_affected_rows_; } - void set_returning_rowkey(bool returning) { returning_rowkey_ = returning; } - bool returning_rowkey() const { return returning_rowkey_; } + void set_returning_rowkey(bool returning) + { + if (returning) { + option_flag_ |= OB_TABLE_OPTION_RETURNING_ROWKEY; + } + } + bool returning_rowkey() const { return option_flag_ & OB_TABLE_OPTION_RETURNING_ROWKEY; } void set_returning_affected_entity(bool returning) { returning_affected_entity_ = returning; } bool returning_affected_entity() const { return returning_affected_entity_; } void set_batch_operation_as_atomic(bool atomic) { batch_operation_as_atomic_ = atomic; } bool batch_operation_as_atomic() const { return batch_operation_as_atomic_; } void set_binlog_row_image_type(ObBinlogRowImageType type) { binlog_row_image_type_ = type; } ObBinlogRowImageType binlog_row_image_type() const { return binlog_row_image_type_; } + uint8_t get_option_flag() const { return option_flag_; } private: ObTableConsistencyLevel consistency_level_; int64_t server_timeout_us_; int64_t max_execution_time_us_; ObIRetryPolicy *retry_policy_; bool returning_affected_rows_; // default: false - bool returning_rowkey_; // default: false + uint8_t option_flag_; // default: 0 bool returning_affected_entity_; // default: false bool batch_operation_as_atomic_; // default: false // int route_policy diff --git a/src/share/table/ob_table_rpc_struct.cpp b/src/share/table/ob_table_rpc_struct.cpp index 4c8fe6327..270158d22 100644 --- a/src/share/table/ob_table_rpc_struct.cpp +++ b/src/share/table/ob_table_rpc_struct.cpp @@ -49,7 +49,7 @@ OB_SERIALIZE_MEMBER(ObTableOperationRequest, entity_type_, table_operation_, consistency_level_, - returning_rowkey_, + option_flag_, returning_affected_entity_, returning_affected_rows_, binlog_row_image_type_); @@ -61,7 +61,7 @@ OB_SERIALIZE_MEMBER(ObTableBatchOperationRequest, entity_type_, batch_operation_, consistency_level_, - returning_rowkey_, + option_flag_, returning_affected_entity_, returning_affected_rows_, tablet_id_, diff --git a/src/share/table/ob_table_rpc_struct.h b/src/share/table/ob_table_rpc_struct.h index 586fc07d8..c350a5aa8 100644 --- a/src/share/table/ob_table_rpc_struct.h +++ b/src/share/table/ob_table_rpc_struct.h @@ -88,7 +88,7 @@ class ObTableOperationRequest final public: ObTableOperationRequest() : credential_(), table_name_(), table_id_(common::OB_INVALID_ID), tablet_id_(), entity_type_(), table_operation_(), - consistency_level_(), returning_rowkey_(false), returning_affected_entity_(false), + consistency_level_(), option_flag_(OB_TABLE_OPTION_DEFAULT), returning_affected_entity_(false), returning_affected_rows_(false), binlog_row_image_type_(ObBinlogRowImageType::FULL) {} @@ -101,9 +101,14 @@ public: K_(entity_type), K_(table_operation), K_(consistency_level), - K_(returning_rowkey), + K_(option_flag), K_(returning_affected_entity), K_(returning_affected_rows)); +public: + OB_INLINE bool use_put() const { return option_flag_ & OB_TABLE_OPTION_USE_PUT; } + OB_INLINE bool returning_rowkey() const { return option_flag_ & OB_TABLE_OPTION_RETURNING_ROWKEY; } + OB_INLINE uint8_t get_option_flag() const { return option_flag_; } + OB_INLINE bool returning_affected_entity() const { return returning_affected_entity_; } public: /// the credential returned when login. ObString credential_; @@ -119,8 +124,8 @@ public: ObTableOperation table_operation_; /// read consistency level. currently only support STRONG. ObTableConsistencyLevel consistency_level_; - /// Whether return the rowkey, currently the value MUST be false (In the case of Append/Increment the value could be true). - bool returning_rowkey_; + /// option flag, specific option switch. + uint8_t option_flag_; /// Whether return the row which has been modified, currently the value MUST be false (In the case of Append/Increment, the value could be true) bool returning_affected_entity_; /// Whether return affected_rows @@ -144,7 +149,7 @@ public: entity_type_(), batch_operation_(), consistency_level_(), - returning_rowkey_(false), + option_flag_(OB_TABLE_OPTION_DEFAULT), returning_affected_entity_(false), returning_affected_rows_(false), batch_operation_as_atomic_(false), @@ -159,10 +164,14 @@ public: K_(entity_type), K_(batch_operation), K_(consistency_level), - K_(returning_rowkey), + K_(option_flag), K_(returning_affected_entity), K_(returning_affected_rows), K_(batch_operation_as_atomic)); +public: + OB_INLINE bool use_put() const { return option_flag_ & OB_TABLE_OPTION_USE_PUT; } + OB_INLINE bool returning_rowkey() const { return option_flag_ & OB_TABLE_OPTION_RETURNING_ROWKEY; } + OB_INLINE bool returning_affected_entity() const { return returning_affected_entity_; } public: ObString credential_; ObString table_name_; @@ -173,8 +182,8 @@ public: ObTableBatchOperation batch_operation_; // Only support STRONG ObTableConsistencyLevel consistency_level_; - // Only support false (Support true for only Append/Increment) - bool returning_rowkey_; + // option flag, specific option switch. + uint8_t option_flag_; // Only support false (Support true for only Append/Increment) bool returning_affected_entity_; /// whether return affected_rows diff --git a/unittest/observer/table/test_create_executor.cpp b/unittest/observer/table/test_create_executor.cpp index af57a03a4..49867ed04 100644 --- a/unittest/observer/table/test_create_executor.cpp +++ b/unittest/observer/table/test_create_executor.cpp @@ -353,7 +353,7 @@ TEST_F(TestCreateExecutor, insertup) fake_ctx.set_entity(&entity); schema_service_.get_schema_guard(fake_ctx.schema_guard_, 1); fake_ctx_init_common(fake_ctx, &table_schema_); - ASSERT_EQ(OB_SUCCESS, fake_ctx.init_insert_up()); + ASSERT_EQ(OB_SUCCESS, fake_ctx.init_insert_up(false)); ASSERT_EQ(OB_SUCCESS, ObTableExprCgService::generate_exprs(fake_ctx, allocator_, fake_expr_info)); fake_ctx.set_expr_info(&fake_expr_info); ASSERT_EQ(4, fake_ctx.get_all_exprs().get_expr_array().count());