[CP] [Bugfix]support put from client

This commit is contained in:
WeiXinChan 2024-01-05 10:57:04 +00:00 committed by ob-robot
parent c19033ab86
commit 00632ae17c
11 changed files with 115 additions and 79 deletions

View File

@ -96,7 +96,7 @@ int ObTableRpcImpl::execute(const ObTableOperation &table_operation, const ObTab
request.consistency_level_ = request_options.consistency_level();
request.returning_affected_rows_ = request_options.returning_affected_rows();
request.returning_affected_entity_ = request_options.returning_affected_entity();
request.returning_rowkey_ = request_options.returning_rowkey();
request.option_flag_ = request_options.get_option_flag();
ret = rpc_proxy_->
timeout(request_options.server_timeout())
@ -286,7 +286,7 @@ int ObTableRpcImpl::batch_execute(const ObTableBatchOperation &batch_operation,
request.consistency_level_ = request_options.consistency_level();
request.returning_affected_rows_ = request_options.returning_affected_rows();
request.returning_affected_entity_ = request_options.returning_affected_entity();
request.returning_rowkey_ = request_options.returning_rowkey();
request.option_flag_ = request_options.get_option_flag();
request.batch_operation_as_atomic_ = request_options.batch_operation_as_atomic();
if (REACH_TIME_INTERVAL(10*1000*1000)) {
// TODO: we can not print the tenat memory usage now.

View File

@ -76,12 +76,12 @@ int ObTableBatchExecuteP::check_arg()
int ObTableBatchExecuteP::check_arg2() const
{
int ret = OB_SUCCESS;
if (arg_.returning_rowkey_
|| arg_.returning_affected_entity_) {
if (arg_.returning_rowkey()
|| arg_.returning_affected_entity()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("some options not supported yet", K(ret),
"returning_rowkey", arg_.returning_rowkey_,
"returning_affected_entity", arg_.returning_affected_entity_);
"returning_rowkey", arg_.returning_rowkey(),
"returning_affected_entity", arg_.returning_affected_entity());
}
return ret;
}
@ -110,7 +110,7 @@ uint64_t ObTableBatchExecuteP::get_request_checksum()
const uint64_t op_checksum = arg_.batch_operation_.get_checksum();
checksum = ob_crc64(checksum, &op_checksum, sizeof(op_checksum));
checksum = ob_crc64(checksum, &arg_.consistency_level_, sizeof(arg_.consistency_level_));
checksum = ob_crc64(checksum, &arg_.returning_rowkey_, sizeof(arg_.returning_rowkey_));
checksum = ob_crc64(checksum, &arg_.option_flag_, sizeof(arg_.option_flag_));
checksum = ob_crc64(checksum, &arg_.returning_affected_entity_, sizeof(arg_.returning_affected_entity_));
checksum = ob_crc64(checksum, &arg_.returning_affected_rows_, sizeof(arg_.returning_affected_rows_));
checksum = ob_crc64(checksum, &arg_.binlog_row_image_type_, sizeof(arg_.binlog_row_image_type_));
@ -848,7 +848,7 @@ int ObTableBatchExecuteP::init_single_op_tb_ctx(table::ObTableCtx &ctx,
break;
}
case ObTableOperationType::INSERT_OR_UPDATE: {
if (OB_FAIL(ctx.init_insert_up())) {
if (OB_FAIL(ctx.init_insert_up(arg_.use_put()))) {
LOG_WARN("fail to init insert up ctx", K(ret), K(ctx));
}
break;
@ -860,15 +860,15 @@ int ObTableBatchExecuteP::init_single_op_tb_ctx(table::ObTableCtx &ctx,
break;
}
case ObTableOperationType::APPEND: {
if (OB_FAIL(ctx.init_append(arg_.returning_affected_entity_,
arg_.returning_rowkey_))) {
if (OB_FAIL(ctx.init_append(arg_.returning_affected_entity(),
arg_.returning_rowkey()))) {
LOG_WARN("fail to init append ctx", K(ret), K(ctx));
}
break;
}
case ObTableOperationType::INCREMENT: {
if (OB_FAIL(ctx.init_increment(arg_.returning_affected_entity_,
arg_.returning_rowkey_))) {
if (OB_FAIL(ctx.init_increment(arg_.returning_affected_entity(),
arg_.returning_rowkey()))) {
LOG_WARN("fail to init increment ctx", K(ret), K(ctx));
}
break;

View File

@ -272,42 +272,6 @@ int ObTableCtx::get_expr_from_assignments(const ObString &col_name, ObRawExpr *&
return ret;
}
/*
check insert up operation can use put implement or not
1. can not have any index.
2. all column must be filled.
*/
int ObTableCtx::check_insert_up_can_use_put(bool &use_put)
{
int ret = OB_SUCCESS;
use_put = true;
if (is_inc_or_append()) { // increment or append operarion need old value to calculate, can not use put
use_put = false;
} else if (ObTableOperationType::INSERT_OR_UPDATE != operation_type_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid operation type", K(ret), K_(operation_type));
} else if (is_htable()) { // htable has no index and alway full filled.
use_put = true;
} else if (!related_index_ids_.empty()) { // has index, can not use put
use_put = false;
} else if (OB_ISNULL(table_schema_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("table schema is null", K(ret));
} else {
if (OB_ISNULL(entity_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("entity is null", K(ret));
} else if (table_schema_->get_column_count() - table_schema_->get_rowkey_column_num() <= entity_->get_properties_count()) { // all columns are filled
use_put = true;
} else { // some columns are missing
use_put = false;
}
}
return ret;
}
/*
1. ObConflictChecker need ObPhysicalPlanCtx.
2. now() expr need ObPhysicalPlanCtx.cur_time_.
@ -1196,10 +1160,11 @@ int ObTableCtx::init_replace()
1. init update
2. reset for is_for_update_ flag, cause init_update() had set is_for_update_=true
*/
int ObTableCtx::init_insert_up()
int ObTableCtx::init_insert_up(bool is_client_set_put)
{
int ret = OB_SUCCESS;
is_for_insertup_ = true;
is_client_set_put_ = is_client_set_put;
if (OB_FAIL(init_update())) {
LOG_WARN("fail to init update", K(ret));
@ -1271,7 +1236,7 @@ int ObTableCtx::init_append(bool return_affected_entity, bool return_rowkey)
return_affected_entity_ = return_affected_entity;
return_rowkey_ = return_rowkey;
if (OB_FAIL(init_insert_up())) {
if (OB_FAIL(init_insert_up(false))) {
LOG_WARN("fail to init insert up", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < assigns_.count(); i++) {
@ -1334,7 +1299,7 @@ int ObTableCtx::init_increment(bool return_affected_entity, bool return_rowkey)
return_affected_entity_ = return_affected_entity;
return_rowkey_ = return_rowkey;
if (OB_FAIL(init_insert_up())) {
if (OB_FAIL(init_insert_up(false))) {
LOG_WARN("fail to init insert up", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < assigns_.count(); i++) {
@ -1730,5 +1695,50 @@ int ObTableCtx::get_related_tablet_id(const share::schema::ObTableSchema &index_
return ret;
}
/*
check insert up operation can use put implement or not
1. can not have any index.
2. all column must be filled.
*/
int ObTableCtx::check_insert_up_can_use_put(bool &use_put)
{
int ret = OB_SUCCESS;
use_put = true;
if (is_inc_or_append()) { // increment or append operarion need old value to calculate, can not use put
use_put = false;
} else if (ObTableOperationType::INSERT_OR_UPDATE != operation_type_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid operation type", K(ret), K_(operation_type));
} else if (is_htable()) { // htable has no index and alway full filled.
use_put = true;
} else if (is_client_set_put_ && !related_index_ids_.empty()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("client set use_put flag, but has local index is not support", K(ret), K_(related_index_ids));
} else if (!related_index_ids_.empty()) { // has index, can not use put
use_put = false;
} else if (OB_ISNULL(table_schema_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("table schema is null", K(ret));
} else {
bool is_all_columns_filled = false;
if (OB_ISNULL(entity_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("entity is null", K(ret));
} else if (FALSE_IT(is_all_columns_filled = table_schema_->get_column_count()
- table_schema_->get_rowkey_column_num() <= entity_->get_properties_count())) { // all columns are filled
} else if (is_client_set_put_ && !is_all_columns_filled) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("client set use_put flag, but not fill all columns is not support", K(ret), KPC_(table_schema), KPC_(entity));
} else if (is_client_set_put_ || is_all_columns_filled) {
use_put = true;
} else { // some columns are missing
use_put = false;
}
}
return ret;
}
} // namespace table
} // namespace oceanbase

View File

@ -151,6 +151,7 @@ public:
cur_cluster_version_ = GET_MIN_CLUSTER_VERSION();
is_ttl_table_ = false;
is_skip_scan_ = false;
is_client_set_put_ = false;
}
virtual ~ObTableCtx()
{}
@ -184,7 +185,8 @@ public:
K_(entity_type),
K_(cur_cluster_version),
K_(is_ttl_table),
K_(is_skip_scan));
K_(is_skip_scan),
K_(is_client_set_put));
public:
//////////////////////////////////////// getter ////////////////////////////////////////////////
// for common
@ -303,6 +305,8 @@ public:
// for delete
OB_INLINE void set_skip_scan(bool skip_scan) { is_skip_scan_ = skip_scan; }
OB_INLINE bool is_skip_scan() { return is_skip_scan_; }
// for put
OB_INLINE void set_client_use_put(bool is_client_use_put) { is_client_set_put_ = is_client_use_put; }
public:
// 基于 table name 初始化common部分(不包括expr_info_, exec_ctx_)
int init_common(ObTableApiCredential &credential,
@ -327,7 +331,7 @@ public:
// 初始化replace相关
int init_replace();
// 初始化insert_up相关
int init_insert_up();
int init_insert_up(bool is_client_set_put);
// 初始化get相关
int init_get();
// 初始化increment相关
@ -477,6 +481,8 @@ private:
bool is_ttl_table_;
// for delete skip scan
bool is_skip_scan_;
// for put
bool is_client_set_put_;
private:
DISALLOW_COPY_AND_ASSIGN(ObTableCtx);
};

View File

@ -95,11 +95,11 @@ int ObTableApiExecuteP::check_arg2() const
if (ObTableOperationType::Type::APPEND != op_type &&
ObTableOperationType::Type::INCREMENT != op_type) {
if (arg_.returning_rowkey_ || arg_.returning_affected_entity_) {
if (arg_.returning_rowkey() || arg_.returning_affected_entity()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("some options not supported yet", K(ret),
"returning_rowkey", arg_.returning_rowkey_,
"returning_affected_entity", arg_.returning_affected_entity_,
"returning_rowkey", arg_.returning_rowkey(),
"returning_affected_entity", arg_.returning_affected_entity(),
"operation_type", op_type);
}
}
@ -127,7 +127,7 @@ int ObTableApiExecuteP::init_tb_ctx()
switch(op_type) {
case ObTableOperationType::INSERT: {
if (tb_ctx_.is_ttl_table()) {
if (OB_FAIL(tb_ctx_.init_insert_up())) {
if (OB_FAIL(tb_ctx_.init_insert_up(arg_.use_put()))) {
LOG_WARN("fail to init insert up ctx", K(ret), K(tb_ctx_));
}
} else {
@ -156,21 +156,21 @@ int ObTableApiExecuteP::init_tb_ctx()
break;
}
case ObTableOperationType::INSERT_OR_UPDATE: {
if (OB_FAIL(tb_ctx_.init_insert_up())) {
if (OB_FAIL(tb_ctx_.init_insert_up(arg_.use_put()))) {
LOG_WARN("fail to init insert up ctx", K(ret), K(tb_ctx_));
}
break;
}
case ObTableOperationType::APPEND: {
if (OB_FAIL(tb_ctx_.init_append(arg_.returning_affected_entity_,
arg_.returning_rowkey_))) {
if (OB_FAIL(tb_ctx_.init_append(arg_.returning_affected_entity(),
arg_.returning_rowkey()))) {
LOG_WARN("fail to init append ctx", K(ret), K(tb_ctx_));
}
break;
}
case ObTableOperationType::INCREMENT: {
if (OB_FAIL(tb_ctx_.init_increment(arg_.returning_affected_entity_,
arg_.returning_rowkey_))) {
if (OB_FAIL(tb_ctx_.init_increment(arg_.returning_affected_entity(),
arg_.returning_rowkey()))) {
LOG_WARN("fail to init increment ctx", K(ret), K(tb_ctx_));
}
break;
@ -316,7 +316,7 @@ uint64_t ObTableApiExecuteP::get_request_checksum()
uint64_t checksum = 0;
checksum = ob_crc64(checksum, arg_.table_name_.ptr(), arg_.table_name_.length());
checksum = ob_crc64(checksum, &arg_.consistency_level_, sizeof(arg_.consistency_level_));
checksum = ob_crc64(checksum, &arg_.returning_rowkey_, sizeof(arg_.returning_rowkey_));
checksum = ob_crc64(checksum, &arg_.option_flag_, sizeof(arg_.option_flag_));
checksum = ob_crc64(checksum, &arg_.returning_affected_entity_, sizeof(arg_.returning_affected_entity_));
checksum = ob_crc64(checksum, &arg_.returning_affected_rows_, sizeof(arg_.returning_affected_rows_));
checksum = ob_crc64(checksum, &arg_.binlog_row_image_type_, sizeof(arg_.binlog_row_image_type_));

View File

@ -184,7 +184,7 @@ int ObTableQueryAndMutateP::init_tb_ctx(table::ObTableCtx &ctx,
break;
}
case ObTableOperationType::INSERT_OR_UPDATE: {
if (OB_FAIL(ctx.init_insert_up())) {
if (OB_FAIL(ctx.init_insert_up(false))) {
LOG_WARN("fail to init insert up ctx", K(ret), K(ctx));
}
break;

View File

@ -542,7 +542,7 @@ ObTableRequestOptions::ObTableRequestOptions()
max_execution_time_us_(10*1000*1000),
retry_policy_(NULL),
returning_affected_rows_(false),
returning_rowkey_(false),
option_flag_(OB_TABLE_OPTION_DEFAULT),
returning_affected_entity_(false),
batch_operation_as_atomic_(false),
binlog_row_image_type_(ObBinlogRowImageType::FULL)

View File

@ -38,6 +38,11 @@ class ObNewRow;
namespace table
{
#define OB_TABLE_OPTION_DEFAULT INT64_C(0)
#define OB_TABLE_OPTION_RETURNING_ROWKEY (INT64_C(1) << 0)
#define OB_TABLE_OPTION_USE_PUT (INT64_C(1) << 1)
using common::ObString;
using common::ObRowkey;
using common::ObObj;
@ -469,21 +474,27 @@ public:
ObIRetryPolicy* retry_policy() { return retry_policy_; }
void set_returning_affected_rows(bool returning) { returning_affected_rows_ = returning; }
bool returning_affected_rows() const { return returning_affected_rows_; }
void set_returning_rowkey(bool returning) { returning_rowkey_ = returning; }
bool returning_rowkey() const { return returning_rowkey_; }
void set_returning_rowkey(bool returning)
{
if (returning) {
option_flag_ |= OB_TABLE_OPTION_RETURNING_ROWKEY;
}
}
bool returning_rowkey() const { return option_flag_ & OB_TABLE_OPTION_RETURNING_ROWKEY; }
void set_returning_affected_entity(bool returning) { returning_affected_entity_ = returning; }
bool returning_affected_entity() const { return returning_affected_entity_; }
void set_batch_operation_as_atomic(bool atomic) { batch_operation_as_atomic_ = atomic; }
bool batch_operation_as_atomic() const { return batch_operation_as_atomic_; }
void set_binlog_row_image_type(ObBinlogRowImageType type) { binlog_row_image_type_ = type; }
ObBinlogRowImageType binlog_row_image_type() const { return binlog_row_image_type_; }
uint8_t get_option_flag() const { return option_flag_; }
private:
ObTableConsistencyLevel consistency_level_;
int64_t server_timeout_us_;
int64_t max_execution_time_us_;
ObIRetryPolicy *retry_policy_;
bool returning_affected_rows_; // default: false
bool returning_rowkey_; // default: false
uint8_t option_flag_; // default: 0
bool returning_affected_entity_; // default: false
bool batch_operation_as_atomic_; // default: false
// int route_policy

View File

@ -49,7 +49,7 @@ OB_SERIALIZE_MEMBER(ObTableOperationRequest,
entity_type_,
table_operation_,
consistency_level_,
returning_rowkey_,
option_flag_,
returning_affected_entity_,
returning_affected_rows_,
binlog_row_image_type_);
@ -61,7 +61,7 @@ OB_SERIALIZE_MEMBER(ObTableBatchOperationRequest,
entity_type_,
batch_operation_,
consistency_level_,
returning_rowkey_,
option_flag_,
returning_affected_entity_,
returning_affected_rows_,
tablet_id_,

View File

@ -88,7 +88,7 @@ class ObTableOperationRequest final
public:
ObTableOperationRequest() : credential_(), table_name_(), table_id_(common::OB_INVALID_ID),
tablet_id_(), entity_type_(), table_operation_(),
consistency_level_(), returning_rowkey_(false), returning_affected_entity_(false),
consistency_level_(), option_flag_(OB_TABLE_OPTION_DEFAULT), returning_affected_entity_(false),
returning_affected_rows_(false),
binlog_row_image_type_(ObBinlogRowImageType::FULL)
{}
@ -101,9 +101,14 @@ public:
K_(entity_type),
K_(table_operation),
K_(consistency_level),
K_(returning_rowkey),
K_(option_flag),
K_(returning_affected_entity),
K_(returning_affected_rows));
public:
OB_INLINE bool use_put() const { return option_flag_ & OB_TABLE_OPTION_USE_PUT; }
OB_INLINE bool returning_rowkey() const { return option_flag_ & OB_TABLE_OPTION_RETURNING_ROWKEY; }
OB_INLINE uint8_t get_option_flag() const { return option_flag_; }
OB_INLINE bool returning_affected_entity() const { return returning_affected_entity_; }
public:
/// the credential returned when login.
ObString credential_;
@ -119,8 +124,8 @@ public:
ObTableOperation table_operation_;
/// read consistency level. currently only support STRONG.
ObTableConsistencyLevel consistency_level_;
/// Whether return the rowkey, currently the value MUST be false (In the case of Append/Increment the value could be true).
bool returning_rowkey_;
/// option flag, specific option switch.
uint8_t option_flag_;
/// Whether return the row which has been modified, currently the value MUST be false (In the case of Append/Increment, the value could be true)
bool returning_affected_entity_;
/// Whether return affected_rows
@ -144,7 +149,7 @@ public:
entity_type_(),
batch_operation_(),
consistency_level_(),
returning_rowkey_(false),
option_flag_(OB_TABLE_OPTION_DEFAULT),
returning_affected_entity_(false),
returning_affected_rows_(false),
batch_operation_as_atomic_(false),
@ -159,10 +164,14 @@ public:
K_(entity_type),
K_(batch_operation),
K_(consistency_level),
K_(returning_rowkey),
K_(option_flag),
K_(returning_affected_entity),
K_(returning_affected_rows),
K_(batch_operation_as_atomic));
public:
OB_INLINE bool use_put() const { return option_flag_ & OB_TABLE_OPTION_USE_PUT; }
OB_INLINE bool returning_rowkey() const { return option_flag_ & OB_TABLE_OPTION_RETURNING_ROWKEY; }
OB_INLINE bool returning_affected_entity() const { return returning_affected_entity_; }
public:
ObString credential_;
ObString table_name_;
@ -173,8 +182,8 @@ public:
ObTableBatchOperation batch_operation_;
// Only support STRONG
ObTableConsistencyLevel consistency_level_;
// Only support false (Support true for only Append/Increment)
bool returning_rowkey_;
// option flag, specific option switch.
uint8_t option_flag_;
// Only support false (Support true for only Append/Increment)
bool returning_affected_entity_;
/// whether return affected_rows

View File

@ -353,7 +353,7 @@ TEST_F(TestCreateExecutor, insertup)
fake_ctx.set_entity(&entity);
schema_service_.get_schema_guard(fake_ctx.schema_guard_, 1);
fake_ctx_init_common(fake_ctx, &table_schema_);
ASSERT_EQ(OB_SUCCESS, fake_ctx.init_insert_up());
ASSERT_EQ(OB_SUCCESS, fake_ctx.init_insert_up(false));
ASSERT_EQ(OB_SUCCESS, ObTableExprCgService::generate_exprs(fake_ctx, allocator_, fake_expr_info));
fake_ctx.set_expr_info(&fake_expr_info);
ASSERT_EQ(4, fake_ctx.get_all_exprs().get_expr_array().count());