diff --git a/src/observer/table/ob_table_cg_service.cpp b/src/observer/table/ob_table_cg_service.cpp index 5379c5cc49..65c5840cf8 100644 --- a/src/observer/table/ob_table_cg_service.cpp +++ b/src/observer/table/ob_table_cg_service.cpp @@ -919,7 +919,7 @@ int ObTableExprCgService::refresh_exprs_frame(ObTableCtx &ctx, /* refresh rowkey frame 1. The number of entity's rowkey may not equal the number of schema's rowkey - when there is auto_increment column in primary keys. + when there is auto_increment column in primary keys or default current timestamp column. 2. auto_increment expr tree is autoinc_nextval_expr - column_conv_expr, we need to fill value to column_conv_expr when user had set value. 3. "IS_DEFAULT_NOW_OBJ(item.default_value_)" means default current_timestamp in column, @@ -935,6 +935,7 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx, const int64_t entity_rowkey_cnt = rowkey.count(); bool is_full_filled = (schema_rowkey_cnt == entity_rowkey_cnt); // did user fill all rowkey columns or not ObEvalCtx eval_ctx(ctx.get_exec_ctx()); + int64_t skip_pos = 0; // skip columns that do not need to be filled if (exprs.count() < schema_rowkey_cnt) { ret = OB_ERR_UNDEFINED; @@ -948,7 +949,7 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx, // e.g., create table test(a varchar(1024), b int primary key); for (int64_t i = 0; OB_SUCC(ret) && i < items.count(); i++) { const ObTableColumnItem &item = items.at(i); - int64_t rowkey_position = item.rowkey_position_; + int64_t rowkey_position = item.rowkey_position_; // rowkey_position start from 1 if (rowkey_position <= 0) { // normal column, do nothing } else { @@ -961,7 +962,13 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx, } else { ObObj null_obj; null_obj.set_null(); - const ObObj *tmp_obj = is_full_filled ? &rowkey.at(rowkey_position-1) : &null_obj; + const ObObj *tmp_obj = nullptr; + if (!is_full_filled) { + tmp_obj = &null_obj; + skip_pos++; + } else { + tmp_obj = &rowkey.at(rowkey_position-1); + } if (OB_FAIL(write_autoinc_datum(ctx, *expr, eval_ctx, *tmp_obj))) { LOG_WARN("fail to write auto increment datum", K(ret), K(is_full_filled), K(*expr), K(*tmp_obj)); } @@ -970,16 +977,19 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx, ObDatum *tmp_datum = nullptr; if (OB_FAIL(expr->eval(eval_ctx, tmp_datum))) { LOG_WARN("fail to eval current timestamp expr", K(ret)); + } else { + skip_pos++; } } else { - if (rowkey_position > entity_rowkey_cnt) { + int64_t pos = rowkey_position - 1 - skip_pos; + if (pos >= entity_rowkey_cnt) { ret = OB_INDEX_OUT_OF_RANGE; - LOG_WARN("idx out of range", K(ret), K(i), K(entity_rowkey_cnt), K(rowkey_position)); + LOG_WARN("idx out of range", K(ret), K(i), K(entity_rowkey_cnt), K(rowkey_position), K(skip_pos)); } else if (OB_ISNULL(expr)) { ret = OB_ERR_UNDEFINED; LOG_WARN("expr is null", K(ret)); - } else if (OB_FAIL(write_datum(ctx, ctx.get_allocator(), *expr, eval_ctx, rowkey.at(rowkey_position-1)))) { - LOG_WARN("fail to write datum", K(ret), K(rowkey_position), K(rowkey.at(rowkey_position-1)), K(*expr)); + } else if (OB_FAIL(write_datum(ctx, ctx.get_allocator(), *expr, eval_ctx, rowkey.at(pos)))) { + LOG_WARN("fail to write datum", K(ret), K(rowkey_position), K(rowkey.at(pos)), K(*expr), K(pos)); } } } diff --git a/src/observer/table/ob_table_context.cpp b/src/observer/table/ob_table_context.cpp index d23b5991c4..b42757e0f1 100644 --- a/src/observer/table/ob_table_context.cpp +++ b/src/observer/table/ob_table_context.cpp @@ -272,6 +272,42 @@ int ObTableCtx::get_expr_from_assignments(const ObString &col_name, ObRawExpr *& return ret; } +/* + check insert up operation can use put implement or not + 1. can not have any index. + 2. all column must be filled. +*/ +int ObTableCtx::check_insert_up_can_use_put(bool &use_put) +{ + int ret = OB_SUCCESS; + use_put = true; + + if (is_inc_or_append()) { // increment or append operarion need old value to calculate, can not use put + use_put = false; + } else if (ObTableOperationType::INSERT_OR_UPDATE != operation_type_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid operation type", K(ret), K_(operation_type)); + } else if (is_htable()) { // htable has no index and alway full filled. + use_put = true; + } else if (!related_index_ids_.empty()) { // has index, can not use put + use_put = false; + } else if (OB_ISNULL(table_schema_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("table schema is null", K(ret)); + } else { + if (OB_ISNULL(entity_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("entity is null", K(ret)); + } else if (table_schema_->get_column_count() - table_schema_->get_rowkey_column_num() <= entity_->get_properties_count()) { // all columns are filled + use_put = true; + } else { // some columns are missing + use_put = false; + } + } + + return ret; +} + /* 1. ObConflictChecker need ObPhysicalPlanCtx. 2. now() expr need ObPhysicalPlanCtx.cur_time_. diff --git a/src/observer/table/ob_table_context.h b/src/observer/table/ob_table_context.h index 2992f62d8e..9f30ed30a2 100644 --- a/src/observer/table/ob_table_context.h +++ b/src/observer/table/ob_table_context.h @@ -351,6 +351,7 @@ public: int get_column_item_by_expr(sql::ObColumnRefRawExpr *expr, const ObTableColumnItem *&item) const; int get_expr_from_column_items(const common::ObString &col_name, sql::ObRawExpr *&expr) const; int get_expr_from_assignments(const common::ObString &col_name, sql::ObRawExpr *&expr) const; + int check_insert_up_can_use_put(bool &use_put); public: // convert lob的allocator需要保证obj写入表达式后才能析构 static int convert_lob(common::ObIAllocator &allocator, ObObj &obj); diff --git a/src/observer/table/ob_table_insert_up_executor.cpp b/src/observer/table/ob_table_insert_up_executor.cpp index 2164d62282..2c135d6c43 100644 --- a/src/observer/table/ob_table_insert_up_executor.cpp +++ b/src/observer/table/ob_table_insert_up_executor.cpp @@ -27,14 +27,22 @@ int ObTableApiInsertUpExecutor::generate_insert_up_rtdef(const ObTableInsUpdCtDe ObTableInsUpdRtDef &rtdef) { int ret = OB_SUCCESS; + bool use_put = false; if (OB_FAIL(generate_ins_rtdef(ctdef.ins_ctdef_, rtdef.ins_rtdef_))) { LOG_WARN("fail to generate insert rtdef", K(ret)); } else if (OB_FAIL(generate_upd_rtdef(ctdef.upd_ctdef_, rtdef.upd_rtdef_))) { LOG_WARN("fail to generate update rtdef", K(ret)); + } else if (OB_FAIL(tb_ctx_.check_insert_up_can_use_put(use_put))) { + LOG_WARN("fail to check insert up use put", K(ret)); } else { rtdef.ins_rtdef_.das_rtdef_.table_loc_->is_writing_ = true; + rtdef.ins_rtdef_.das_rtdef_.use_put_ = use_put; + } + + if (!use_put) { + set_need_fetch_conflict(upd_rtctx_, rtdef.ins_rtdef_); } return ret; @@ -288,8 +296,6 @@ int ObTableApiInsertUpExecutor::get_next_row() int ret = OB_SUCCESS; transaction::ObTxSEQ savepoint_no; - // must set conflict_row fetch flag - set_need_fetch_conflict(upd_rtctx_, insert_up_rtdef_.ins_rtdef_); if (OB_FAIL(ObSqlTransControl::create_anonymous_savepoint(exec_ctx_, savepoint_no))) { LOG_WARN("fail to create save_point", K(ret)); } else if (OB_FAIL(try_insert_row())) { diff --git a/src/sql/das/ob_das_dml_ctx_define.h b/src/sql/das/ob_das_dml_ctx_define.h index de7f253963..6b4b7e3986 100644 --- a/src/sql/das/ob_das_dml_ctx_define.h +++ b/src/sql/das/ob_das_dml_ctx_define.h @@ -157,13 +157,15 @@ public: : ObDASDMLBaseRtDef(DAS_OP_TABLE_INSERT), need_fetch_conflict_(false), is_duplicated_(false), - direct_insert_task_id_(0) + direct_insert_task_id_(0), + use_put_(false) { } INHERIT_TO_STRING_KV("ObDASBaseRtDef", ObDASDMLBaseRtDef, K_(need_fetch_conflict), K_(is_duplicated), - K_(direct_insert_task_id)); + K_(direct_insert_task_id), + K_(use_put)); // used to check whether need to fetch_duplicate_key, will set in table_replace_op bool need_fetch_conflict_; @@ -172,6 +174,8 @@ public: bool is_duplicated_; // used in direct-insert mode int64_t direct_insert_task_id_; + // use put, only use in obkv for overlay writting. + bool use_put_; }; typedef DASDMLRtDefArray DASInsRtDefArray; diff --git a/src/sql/das/ob_das_insert_op.cpp b/src/sql/das/ob_das_insert_op.cpp index e781150c46..1125e63ff3 100644 --- a/src/sql/das/ob_das_insert_op.cpp +++ b/src/sql/das/ob_das_insert_op.cpp @@ -53,13 +53,24 @@ int ObDASIndexDMLAdaptor::write_rows(cons int ret = OB_SUCCESS; ObAccessService *as = MTL(ObAccessService *); dml_param_.direct_insert_task_id_ = rtdef.direct_insert_task_id_; - if (OB_FAIL(as->insert_rows(ls_id, - tablet_id, - *tx_desc_, - dml_param_, - ctdef.column_ids_, - &iter, - affected_rows))) { + if (rtdef.use_put_) { + ret = as->put_rows(ls_id, + tablet_id, + *tx_desc_, + dml_param_, + ctdef.column_ids_, + &iter, + affected_rows); + } else { + ret = as->insert_rows(ls_id, + tablet_id, + *tx_desc_, + dml_param_, + ctdef.column_ids_, + &iter, + affected_rows); + } + if (OB_FAIL(ret)) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { LOG_WARN("insert rows to access service failed", K(ret)); }