[OBKV] add put impl for obkv

This commit is contained in:
WeiXinChan
2023-09-12 12:10:22 +00:00
committed by ob-robot
parent f67fe833a9
commit 1225cc3815
6 changed files with 86 additions and 18 deletions

View File

@ -919,7 +919,7 @@ int ObTableExprCgService::refresh_exprs_frame(ObTableCtx &ctx,
/*
refresh rowkey frame
1. The number of entity's rowkey may not equal the number of schema's rowkey
when there is auto_increment column in primary keys.
when there is auto_increment column in primary keys or default current timestamp column.
2. auto_increment expr tree is autoinc_nextval_expr - column_conv_expr,
we need to fill value to column_conv_expr when user had set value.
3. "IS_DEFAULT_NOW_OBJ(item.default_value_)" means default current_timestamp in column,
@ -935,6 +935,7 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx,
const int64_t entity_rowkey_cnt = rowkey.count();
bool is_full_filled = (schema_rowkey_cnt == entity_rowkey_cnt); // did user fill all rowkey columns or not
ObEvalCtx eval_ctx(ctx.get_exec_ctx());
int64_t skip_pos = 0; // skip columns that do not need to be filled
if (exprs.count() < schema_rowkey_cnt) {
ret = OB_ERR_UNDEFINED;
@ -948,7 +949,7 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx,
// e.g., create table test(a varchar(1024), b int primary key);
for (int64_t i = 0; OB_SUCC(ret) && i < items.count(); i++) {
const ObTableColumnItem &item = items.at(i);
int64_t rowkey_position = item.rowkey_position_;
int64_t rowkey_position = item.rowkey_position_; // rowkey_position start from 1
if (rowkey_position <= 0) {
// normal column, do nothing
} else {
@ -961,7 +962,13 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx,
} else {
ObObj null_obj;
null_obj.set_null();
const ObObj *tmp_obj = is_full_filled ? &rowkey.at(rowkey_position-1) : &null_obj;
const ObObj *tmp_obj = nullptr;
if (!is_full_filled) {
tmp_obj = &null_obj;
skip_pos++;
} else {
tmp_obj = &rowkey.at(rowkey_position-1);
}
if (OB_FAIL(write_autoinc_datum(ctx, *expr, eval_ctx, *tmp_obj))) {
LOG_WARN("fail to write auto increment datum", K(ret), K(is_full_filled), K(*expr), K(*tmp_obj));
}
@ -970,16 +977,19 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx,
ObDatum *tmp_datum = nullptr;
if (OB_FAIL(expr->eval(eval_ctx, tmp_datum))) {
LOG_WARN("fail to eval current timestamp expr", K(ret));
} else {
skip_pos++;
}
} else {
if (rowkey_position > entity_rowkey_cnt) {
int64_t pos = rowkey_position - 1 - skip_pos;
if (pos >= entity_rowkey_cnt) {
ret = OB_INDEX_OUT_OF_RANGE;
LOG_WARN("idx out of range", K(ret), K(i), K(entity_rowkey_cnt), K(rowkey_position));
LOG_WARN("idx out of range", K(ret), K(i), K(entity_rowkey_cnt), K(rowkey_position), K(skip_pos));
} else if (OB_ISNULL(expr)) {
ret = OB_ERR_UNDEFINED;
LOG_WARN("expr is null", K(ret));
} else if (OB_FAIL(write_datum(ctx, ctx.get_allocator(), *expr, eval_ctx, rowkey.at(rowkey_position-1)))) {
LOG_WARN("fail to write datum", K(ret), K(rowkey_position), K(rowkey.at(rowkey_position-1)), K(*expr));
} else if (OB_FAIL(write_datum(ctx, ctx.get_allocator(), *expr, eval_ctx, rowkey.at(pos)))) {
LOG_WARN("fail to write datum", K(ret), K(rowkey_position), K(rowkey.at(pos)), K(*expr), K(pos));
}
}
}

View File

@ -272,6 +272,42 @@ int ObTableCtx::get_expr_from_assignments(const ObString &col_name, ObRawExpr *&
return ret;
}
/*
check insert up operation can use put implement or not
1. can not have any index.
2. all column must be filled.
*/
int ObTableCtx::check_insert_up_can_use_put(bool &use_put)
{
int ret = OB_SUCCESS;
use_put = true;
if (is_inc_or_append()) { // increment or append operarion need old value to calculate, can not use put
use_put = false;
} else if (ObTableOperationType::INSERT_OR_UPDATE != operation_type_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid operation type", K(ret), K_(operation_type));
} else if (is_htable()) { // htable has no index and alway full filled.
use_put = true;
} else if (!related_index_ids_.empty()) { // has index, can not use put
use_put = false;
} else if (OB_ISNULL(table_schema_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("table schema is null", K(ret));
} else {
if (OB_ISNULL(entity_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("entity is null", K(ret));
} else if (table_schema_->get_column_count() - table_schema_->get_rowkey_column_num() <= entity_->get_properties_count()) { // all columns are filled
use_put = true;
} else { // some columns are missing
use_put = false;
}
}
return ret;
}
/*
1. ObConflictChecker need ObPhysicalPlanCtx.
2. now() expr need ObPhysicalPlanCtx.cur_time_.

View File

@ -351,6 +351,7 @@ public:
int get_column_item_by_expr(sql::ObColumnRefRawExpr *expr, const ObTableColumnItem *&item) const;
int get_expr_from_column_items(const common::ObString &col_name, sql::ObRawExpr *&expr) const;
int get_expr_from_assignments(const common::ObString &col_name, sql::ObRawExpr *&expr) const;
int check_insert_up_can_use_put(bool &use_put);
public:
// convert lob的allocator需要保证obj写入表达式后才能析构
static int convert_lob(common::ObIAllocator &allocator, ObObj &obj);

View File

@ -27,14 +27,22 @@ int ObTableApiInsertUpExecutor::generate_insert_up_rtdef(const ObTableInsUpdCtDe
ObTableInsUpdRtDef &rtdef)
{
int ret = OB_SUCCESS;
bool use_put = false;
if (OB_FAIL(generate_ins_rtdef(ctdef.ins_ctdef_, rtdef.ins_rtdef_))) {
LOG_WARN("fail to generate insert rtdef", K(ret));
} else if (OB_FAIL(generate_upd_rtdef(ctdef.upd_ctdef_,
rtdef.upd_rtdef_))) {
LOG_WARN("fail to generate update rtdef", K(ret));
} else if (OB_FAIL(tb_ctx_.check_insert_up_can_use_put(use_put))) {
LOG_WARN("fail to check insert up use put", K(ret));
} else {
rtdef.ins_rtdef_.das_rtdef_.table_loc_->is_writing_ = true;
rtdef.ins_rtdef_.das_rtdef_.use_put_ = use_put;
}
if (!use_put) {
set_need_fetch_conflict(upd_rtctx_, rtdef.ins_rtdef_);
}
return ret;
@ -288,8 +296,6 @@ int ObTableApiInsertUpExecutor::get_next_row()
int ret = OB_SUCCESS;
transaction::ObTxSEQ savepoint_no;
// must set conflict_row fetch flag
set_need_fetch_conflict(upd_rtctx_, insert_up_rtdef_.ins_rtdef_);
if (OB_FAIL(ObSqlTransControl::create_anonymous_savepoint(exec_ctx_, savepoint_no))) {
LOG_WARN("fail to create save_point", K(ret));
} else if (OB_FAIL(try_insert_row())) {

View File

@ -157,13 +157,15 @@ public:
: ObDASDMLBaseRtDef(DAS_OP_TABLE_INSERT),
need_fetch_conflict_(false),
is_duplicated_(false),
direct_insert_task_id_(0)
direct_insert_task_id_(0),
use_put_(false)
{ }
INHERIT_TO_STRING_KV("ObDASBaseRtDef", ObDASDMLBaseRtDef,
K_(need_fetch_conflict),
K_(is_duplicated),
K_(direct_insert_task_id));
K_(direct_insert_task_id),
K_(use_put));
// used to check whether need to fetch_duplicate_key, will set in table_replace_op
bool need_fetch_conflict_;
@ -172,6 +174,8 @@ public:
bool is_duplicated_;
// used in direct-insert mode
int64_t direct_insert_task_id_;
// use put, only use in obkv for overlay writting.
bool use_put_;
};
typedef DASDMLRtDefArray DASInsRtDefArray;

View File

@ -53,13 +53,24 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_INSERT, ObDASDMLIterator>::write_rows(cons
int ret = OB_SUCCESS;
ObAccessService *as = MTL(ObAccessService *);
dml_param_.direct_insert_task_id_ = rtdef.direct_insert_task_id_;
if (OB_FAIL(as->insert_rows(ls_id,
tablet_id,
*tx_desc_,
dml_param_,
ctdef.column_ids_,
&iter,
affected_rows))) {
if (rtdef.use_put_) {
ret = as->put_rows(ls_id,
tablet_id,
*tx_desc_,
dml_param_,
ctdef.column_ids_,
&iter,
affected_rows);
} else {
ret = as->insert_rows(ls_id,
tablet_id,
*tx_desc_,
dml_param_,
ctdef.column_ids_,
&iter,
affected_rows);
}
if (OB_FAIL(ret)) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("insert rows to access service failed", K(ret));
}