diff --git a/src/sql/engine/pdml/static/ob_pdml_op_batch_row_cache.cpp b/src/sql/engine/pdml/static/ob_pdml_op_batch_row_cache.cpp index 979c630554..436aefbdc6 100644 --- a/src/sql/engine/pdml/static/ob_pdml_op_batch_row_cache.cpp +++ b/src/sql/engine/pdml/static/ob_pdml_op_batch_row_cache.cpp @@ -36,11 +36,9 @@ int ObPDMLOpRowIterator::get_next_row(const ObExprPtrIArray &row) if (OB_ISNULL(eval_ctx_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("not init the eval_ctx", K(ret)); - } else if (OB_SUCC(row_store_it_.get_next_row(row, *eval_ctx_))) { - // we should do uniq row checking after data being stored in row_store - // rather than before this. because we need to return all rows unfiltered to upper ops. - if (uniq_row_checker_) { - ret = uniq_row_checker_->check_rowkey_distinct(row, is_distinct); + } else if (OB_FAIL(row_store_it_.get_next_row(row, *eval_ctx_))) { + if (OB_ITER_END != ret) { + LOG_WARN("get next row from row store iter failed", K(ret)); } } } while (OB_SUCC(ret) && !is_distinct); diff --git a/src/sql/engine/pdml/static/ob_pdml_op_batch_row_cache.h b/src/sql/engine/pdml/static/ob_pdml_op_batch_row_cache.h index d14a18414e..44d538b9f0 100644 --- a/src/sql/engine/pdml/static/ob_pdml_op_batch_row_cache.h +++ b/src/sql/engine/pdml/static/ob_pdml_op_batch_row_cache.h @@ -29,12 +29,6 @@ class ObNewRow; namespace sql { struct ObDASTabletLoc; -class ObDMLOpUniqueRowChecker -{ -public: - virtual int check_rowkey_distinct(const ObExprPtrIArray &row, bool &is_distinct) = 0; -}; - class ObExecContext; // 单个分区的新引擎数据缓存器 @@ -42,22 +36,18 @@ class ObPDMLOpRowIterator { public: friend class ObPDMLOpBatchRowCache; - ObPDMLOpRowIterator() : eval_ctx_(nullptr), uniq_row_checker_(nullptr) {} + ObPDMLOpRowIterator() : eval_ctx_(nullptr) {} virtual ~ObPDMLOpRowIterator() = default; // 获得row_store_it_中的下一行数据 // 返回的数据是对应存储数据的exprs int get_next_row(const ObExprPtrIArray &row); - void set_uniq_row_checker(ObDMLOpUniqueRowChecker *uniq_row_checker) - { uniq_row_checker_ = uniq_row_checker; } - void close() - { row_store_it_.reset(); } + void close() { row_store_it_.reset(); } private: int init_data_source(ObChunkDatumStore &row_datum_store, ObEvalCtx *eval_ctx); private: ObChunkDatumStore::Iterator row_store_it_; ObEvalCtx *eval_ctx_; - ObDMLOpUniqueRowChecker *uniq_row_checker_; DISALLOW_COPY_AND_ASSIGN(ObPDMLOpRowIterator); }; @@ -132,7 +122,8 @@ public: // 一般来说,分区id存储在行中的一个伪列里 virtual int read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, - common::ObTabletID &tablet_id) = 0; + common::ObTabletID &tablet_id, + bool &is_skipped) = 0; }; class ObDMLOpDataWriter diff --git a/src/sql/engine/pdml/static/ob_pdml_op_data_driver.cpp b/src/sql/engine/pdml/static/ob_pdml_op_data_driver.cpp index fbba1eb6c9..a513185d37 100644 --- a/src/sql/engine/pdml/static/ob_pdml_op_data_driver.cpp +++ b/src/sql/engine/pdml/static/ob_pdml_op_data_driver.cpp @@ -35,7 +35,6 @@ int ObPDMLOpDataDriver::init(const ObTableModifySpec &spec, ObDMLBaseRtDef &dml_rtdef, ObDMLOpDataReader *reader, ObDMLOpDataWriter *writer, - ObDMLOpUniqueRowChecker *uniq_checker, const bool is_heap_table_insert, const bool with_barrier/*false*/) { @@ -53,7 +52,6 @@ int ObPDMLOpDataDriver::init(const ObTableModifySpec &spec, } else { reader_ = reader; writer_ = writer; - uniq_checker_ = uniq_checker; dml_rtdef_ = &dml_rtdef; is_heap_table_insert_ = is_heap_table_insert; with_barrier_ = with_barrier; @@ -179,13 +177,16 @@ int ObPDMLOpDataDriver::fill_cache_unitl_cache_full_or_child_iter_end(ObExecCont do { const ObExprPtrIArray *row = nullptr; ObTabletID tablet_id; - if (OB_FAIL(reader_->read_row(ctx, row, tablet_id))) { + bool is_skipped = false; + if (OB_FAIL(reader_->read_row(ctx, row, tablet_id, is_skipped))) { if (OB_ITER_END == ret) { // 当前reader的数据已经读取结束 // do nothing } else { LOG_WARN("failed to read row from reader", K(ret)); } + } else if (is_skipped) { + //need to skip this row } else if (is_heap_table_insert_ && OB_FAIL(set_heap_table_hidden_pk(row, tablet_id))) { LOG_WARN("fail to set heap table hidden pk", K(ret), K(*row), K(tablet_id)); } else if (OB_FAIL(cache_.add_row(*row, tablet_id))) { @@ -247,8 +248,6 @@ int ObPDMLOpDataDriver::write_partitions(ObExecContext &ctx) LOG_WARN("fail get row iterator", K(tablet_id), K(ret)); } else if (OB_FAIL(DAS_CTX(ctx).extended_tablet_loc(*table_loc, tablet_id, tablet_loc))) { LOG_WARN("extended tablet location failed", K(ret)); - } else if (FALSE_IT(row_iter->set_uniq_row_checker(uniq_checker_))) { - // nop } else if (OB_FAIL(writer_->write_rows(ctx, tablet_loc, *row_iter))) { LOG_WARN("fail write rows", K(tablet_id), K(ret)); } @@ -392,7 +391,6 @@ int ObPDMLOpDataDriver::switch_row_iter_to_next_partition() LOG_WARN("failed to get next partition iterator", K(ret), "part_id", returning_ctx_.tablet_id_array_.at(next_idx), K(next_idx)); } else { - returning_ctx_.row_iter_->set_uniq_row_checker(nullptr); returning_ctx_.next_idx_++; } return ret; diff --git a/src/sql/engine/pdml/static/ob_pdml_op_data_driver.h b/src/sql/engine/pdml/static/ob_pdml_op_data_driver.h index dfd57f742d..1e404b66f1 100644 --- a/src/sql/engine/pdml/static/ob_pdml_op_data_driver.h +++ b/src/sql/engine/pdml/static/ob_pdml_op_data_driver.h @@ -40,7 +40,6 @@ public: cache_(eval_ctx, op_monitor_info), reader_(nullptr), writer_(nullptr), - uniq_checker_(nullptr), dml_rtdef_(nullptr), state_(FILL_CACHE), eval_ctx_(eval_ctx), @@ -61,7 +60,6 @@ public: ObDMLBaseRtDef &dml_rtdef, ObDMLOpDataReader *reader, ObDMLOpDataWriter *writer, - ObDMLOpUniqueRowChecker *uniq_checker, const bool is_heap_table_insert, const bool with_barrier = false); @@ -143,7 +141,6 @@ private: ObPDMLOpBatchRowCache cache_; // 用于缓存数据,需要在init函数中初始化,并且分配alloctor ObDMLOpDataReader *reader_; ObDMLOpDataWriter *writer_; - ObDMLOpUniqueRowChecker *uniq_checker_; ObDMLBaseRtDef *dml_rtdef_; DriverState state_; // Driver 当前状态:读写数据状态、向上返回数据状态 diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.cpp b/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.cpp index e832c48377..9db86bc3fe 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.cpp @@ -65,7 +65,7 @@ int ObPxMultiPartDeleteOp::inner_open() LOG_WARN("failed to inner open", K(ret)); } else if (OB_FAIL(ObDMLService::init_del_rtdef(dml_rtctx_, del_rtdef_, MY_SPEC.del_ctdef_))) { LOG_WARN("init delete rtdef failed", K(ret)); - } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), del_rtdef_, this, this, this, false, MY_SPEC.with_barrier_))) { + } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), del_rtdef_, this, this, false, MY_SPEC.with_barrier_))) { LOG_WARN("failed to init data driver", K(ret)); } else if (MY_SPEC.with_barrier_) { if (OB_ISNULL(input_)) { @@ -81,24 +81,6 @@ int ObPxMultiPartDeleteOp::inner_open() return ret; } -int ObPxMultiPartDeleteOp::check_rowkey_distinct(const ObExprPtrIArray &row, - bool &is_distinct) -{ - int ret = OB_SUCCESS; - if (DistinctType::T_DISTINCT_NONE != MY_SPEC.del_ctdef_.distinct_algo_) { - ret = ObDMLService::check_rowkey_whether_distinct(row, - MY_SPEC.del_ctdef_.distinct_algo_, - eval_ctx_, - ctx_, - del_rtdef_.table_rowkey_, - del_rtdef_.se_rowkey_dist_ctx_, - is_distinct); - } else { - is_distinct = true; - } - return ret; -} - int ObPxMultiPartDeleteOp::inner_get_next_row() { int ret = OB_SUCCESS; @@ -149,7 +131,8 @@ int ObPxMultiPartDeleteOp::inner_close() //////////// pdml data interface implementation: reader & writer //////////// int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, - ObTabletID &tablet_id) + ObTabletID &tablet_id, + bool &is_skipped) { int ret = OB_SUCCESS; UNUSED(ctx); @@ -164,24 +147,28 @@ int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx, } else { // 每一次从child节点获得新的数据都需要进行清除计算标记 clear_evaluated_flag(); - // 通过partition id expr获得对应行对应的分区 - const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); - // 返回的值是child的output exprs - row = &child_->get_spec().output_; - if (NO_PARTITION_ID_FLAG == part_id_idx) { - // 如果row中没有partition id expr对应的cell,默认partition id为0 - ObDASTableLoc *table_loc = del_rtdef_.das_rtdef_.table_loc_; - if (OB_ISNULL(table_loc) || table_loc->get_tablet_locs().size() != 1) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("insert table location is invalid", K(ret), KPC(table_loc)); - } else { - tablet_id = table_loc->get_first_tablet_loc()->tablet_id_; + if (OB_FAIL(ObDMLService::process_delete_row(MY_SPEC.del_ctdef_, del_rtdef_, is_skipped, *this))) { + LOG_WARN("process delete row failed", K(ret)); + } else if (!is_skipped) { + // 通过partition id expr获得对应行对应的分区 + const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); + // 返回的值是child的output exprs + row = &child_->get_spec().output_; + if (NO_PARTITION_ID_FLAG == part_id_idx) { + // 如果row中没有partition id expr对应的cell,默认partition id为0 + ObDASTableLoc *table_loc = del_rtdef_.das_rtdef_.table_loc_; + if (OB_ISNULL(table_loc) || table_loc->get_tablet_locs().size() != 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("insert table location is invalid", K(ret), KPC(table_loc)); + } else { + tablet_id = table_loc->get_first_tablet_loc()->tablet_id_; + } + } else if (child_->get_spec().output_.count() > part_id_idx) { + ObExpr *expr = child_->get_spec().output_.at(part_id_idx); + ObDatum &expr_datum = expr->locate_expr_datum(get_eval_ctx()); + tablet_id = expr_datum.get_int(); + LOG_DEBUG("get the part id", K(ret), K(expr_datum)); } - } else if (child_->get_spec().output_.count() > part_id_idx) { - ObExpr *expr = child_->get_spec().output_.at(part_id_idx); - ObDatum &expr_datum = expr->locate_expr_datum(get_eval_ctx()); - tablet_id = expr_datum.get_int(); - LOG_DEBUG("get the part id", K(ret), K(expr_datum)); } } return ret; diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.h b/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.h index dae641309f..501a67c916 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.h +++ b/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.h @@ -76,7 +76,6 @@ public: class ObPxMultiPartDeleteOp : public ObDMLOpDataReader, public ObDMLOpDataWriter, - public ObDMLOpUniqueRowChecker, public ObTableModifyOp { OB_UNIS_VERSION(1); @@ -96,7 +95,8 @@ public: // 同时还负责计算出这一行对应的 partition_id int read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, - common::ObTabletID &tablet_id) override; + common::ObTabletID &tablet_id, + bool &is_skipped) override; // impl. ObDMLDataWriter // 将缓存的数据批量写入到存储层 int write_rows(ObExecContext &ctx, @@ -107,9 +107,6 @@ public: virtual int inner_get_next_row(); virtual int inner_open(); virtual int inner_close(); - -private: - int check_rowkey_distinct(const ObExprPtrIArray &row, bool &is_distinct) override; private: ObPDMLOpDataDriver data_driver_; ObDelRtDef del_rtdef_; diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp index ef097295c4..6200051162 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp @@ -46,7 +46,7 @@ int ObPxMultiPartInsertOp::inner_open() ret = OB_ERR_UNEXPECTED; LOG_WARN("table or row desc is invalid", K(ret), K(MY_SPEC.row_desc_)); } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), ins_rtdef_, this, this, - nullptr, MY_SPEC.ins_ctdef_.is_heap_table_))) { + MY_SPEC.ins_ctdef_.is_heap_table_))) { LOG_WARN("failed to init data driver", K(ret)); } if (OB_SUCC(ret)) { @@ -126,28 +126,11 @@ int ObPxMultiPartInsertOp::inner_close() return ret; } -int ObPxMultiPartInsertOp::process_row() -{ - int ret = OB_SUCCESS; - bool is_filtered = false; - OZ (ObDMLService::check_row_null(MY_SPEC.ins_ctdef_.new_row_, - eval_ctx_, - ins_rtdef_.cur_row_num_, - MY_SPEC.ins_ctdef_.column_infos_, - MY_SPEC.ins_ctdef_.das_ctdef_, - MY_SPEC.ins_ctdef_.is_single_value_, - *this)); - OZ(ObDMLService::filter_row_for_view_check(MY_SPEC.ins_ctdef_.view_check_exprs_, eval_ctx_, is_filtered)); - OV(!is_filtered, OB_ERR_CHECK_OPTION_VIOLATED); - OZ(ObDMLService::filter_row_for_check_cst(MY_SPEC.ins_ctdef_.check_cst_exprs_, eval_ctx_, is_filtered)); - OV(!is_filtered, OB_ERR_CHECK_CONSTRAINT_VIOLATED); - return ret; -} - //////////// pdml data interface implementation: reader & writer //////////// int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, - common::ObTabletID &tablet_id) + common::ObTabletID &tablet_id, + bool &is_skipped) { int ret = OB_SUCCESS; UNUSED(ctx); @@ -162,32 +145,31 @@ int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx, } else { // 每一次从child节点获得新的数据都需要进行清除计算标记 clear_evaluated_flag(); - // 通过partition id expr获得对应行对应的分区 - const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); - // 返回的值是child的output exprs - row = &child_->get_spec().output_; - if (NO_PARTITION_ID_FLAG == part_id_idx) { - ObDASTableLoc *table_loc = ins_rtdef_.das_rtdef_.table_loc_; - if (OB_ISNULL(table_loc) || table_loc->get_tablet_locs().size() != 1) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("insert table location is invalid", K(ret), KPC(table_loc)); - } else { - tablet_id = table_loc->get_first_tablet_loc()->tablet_id_; + if (OB_FAIL(ObDMLService::process_insert_row(MY_SPEC.ins_ctdef_, ins_rtdef_, *this, is_skipped))) { + LOG_WARN("process insert row failed", K(ret)); + } else if (!is_skipped) { + // 通过partition id expr获得对应行对应的分区 + const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); + // 返回的值是child的output exprs + row = &child_->get_spec().output_; + if (NO_PARTITION_ID_FLAG == part_id_idx) { + ObDASTableLoc *table_loc = ins_rtdef_.das_rtdef_.table_loc_; + if (OB_ISNULL(table_loc) || table_loc->get_tablet_locs().size() != 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("insert table location is invalid", K(ret), KPC(table_loc)); + } else { + tablet_id = table_loc->get_first_tablet_loc()->tablet_id_; + } + } else if (child_->get_spec().output_.count() > part_id_idx) { + ObExpr *expr = child_->get_spec().output_.at(part_id_idx); + ObDatum &expr_datum = expr->locate_expr_datum(get_eval_ctx()); + tablet_id = expr_datum.get_int(); + LOG_DEBUG("get the part id", K(ret), K(expr_datum)); } - } else if (child_->get_spec().output_.count() > part_id_idx) { - ObExpr *expr = child_->get_spec().output_.at(part_id_idx); - ObDatum &expr_datum = expr->locate_expr_datum(get_eval_ctx()); - tablet_id = expr_datum.get_int(); - LOG_DEBUG("get the part id", K(ret), K(expr_datum)); - } - } - if (!MY_SPEC.is_pdml_index_maintain_ && OB_SUCC(ret)) { - if (OB_FAIL(process_row())) { - LOG_WARN("fail process row", K(ret)); } } if (OB_SUCC(ret)) { - LOG_TRACE("read row from pdml cache", "read_row", ROWEXPR2STR(eval_ctx_, *row), K(tablet_id)); + LOG_TRACE("read row from pdml cache", "read_row", ROWEXPR2STR(eval_ctx_, *row), K(tablet_id), K(is_skipped)); } return ret; } diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.h b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.h index eac99d0ba8..6c7f4ffbd8 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.h +++ b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.h @@ -93,7 +93,10 @@ public: public: virtual bool has_foreign_key() const { return false; } // 默认实现,先不考虑外键的问题 - int read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, common::ObTabletID &tablet_id) override; + int read_row(ObExecContext &ctx, + const ObExprPtrIArray *&row, + common::ObTabletID &tablet_id, + bool &is_skipped) override; int write_rows(ObExecContext &ctx, const ObDASTabletLoc *tablet_loc, ObPDMLOpRowIterator &iterator) override; @@ -101,8 +104,6 @@ public: virtual int inner_get_next_row(); virtual int inner_open(); virtual int inner_close(); -private: - int process_row(); protected: ObPDMLOpDataDriver data_driver_; ObInsRtDef ins_rtdef_; diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_update_op.cpp b/src/sql/engine/pdml/static/ob_px_multi_part_update_op.cpp index 47d180905e..eee919ab46 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_update_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_multi_part_update_op.cpp @@ -36,8 +36,7 @@ int ObPxMultiPartUpdateOp::inner_open() } else if (!(MY_SPEC.row_desc_.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("table or row desc is invalid", K(ret), K(MY_SPEC.row_desc_)); - } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), upd_rtdef_, this, this, - nullptr, false))) { + } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), upd_rtdef_, this, this, false))) { LOG_WARN("failed to init data driver", K(ret)); } else if (OB_FAIL(ObDMLService::init_upd_rtdef(dml_rtctx_, upd_rtdef_, @@ -100,17 +99,9 @@ int ObPxMultiPartUpdateOp::inner_close() int ObPxMultiPartUpdateOp::update_row_to_das(const ObDASTabletLoc *tablet_loc) { int ret = OB_SUCCESS; - bool is_skipped = false; ObChunkDatumStore::StoredRow* stored_row = nullptr; - ++upd_rtdef_.cur_row_num_; - if (OB_FAIL(ObDMLService::process_update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, is_skipped, *this))) { - LOG_WARN("process update row failed", K(ret)); - } else if (is_skipped) { - //do nothing - } else if (OB_FAIL(ObDMLService::update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, tablet_loc, tablet_loc, dml_rtctx_, stored_row, stored_row, stored_row))) { + if (OB_FAIL(ObDMLService::update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, tablet_loc, tablet_loc, dml_rtctx_, stored_row, stored_row, stored_row))) { LOG_WARN("insert row with das failed", K(ret)); - } else { - ++upd_rtdef_.found_rows_; } return ret; } @@ -118,12 +109,12 @@ int ObPxMultiPartUpdateOp::update_row_to_das(const ObDASTabletLoc *tablet_loc) //////////// pdml data interface implementation: reader & writer //////////// int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, - common::ObTabletID &tablet_id) + common::ObTabletID &tablet_id, + bool &is_skipped) { // 从child中读取数据,数据存储在child的output exprs中 int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = NULL; - bool is_update_timestamp = false; if (OB_ISNULL(plan_ctx = ctx.get_physical_plan_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get physical plan context failed", K(ret)); @@ -137,78 +128,34 @@ int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx, } else { // 每一次从child节点获得新的数据都需要进行清除计算标记 clear_evaluated_flag(); - // 通过partition id expr获得对应行对应的分区 - const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); - // 返回的值是child的output exprs - row = &child_->get_spec().output_; - if (NO_PARTITION_ID_FLAG == part_id_idx) { - ObDASTableLoc *table_loc = upd_rtdef_.dupd_rtdef_.table_loc_; - if (OB_ISNULL(table_loc) || table_loc->get_tablet_locs().size() != 1) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("insert table location is invalid", K(ret), KPC(table_loc)); - } else { - tablet_id = table_loc->get_first_tablet_loc()->tablet_id_; - } - } else if (child_->get_spec().output_.count() > part_id_idx) { - ObExpr *expr = child_->get_spec().output_.at(part_id_idx); - ObDatum &expr_datum = expr->locate_expr_datum(get_eval_ctx()); - tablet_id = expr_datum.get_int(); - LOG_DEBUG("get the part id", K(ret), K(expr_datum)); - } - - // 做auto_failed_timestamp的检查 - // 如果assign_column包含了ON UPDATE CURRENT_TIMESTAMP类型的时间戳列 - // 第一步先检查主表是否发生update,如果主表当前行没有发生update,那么吧old_row中的当前列拷贝到new_row中 - // pdml的索引表在检查当前行是否发生update时,不会跳过ON UPDATE CURRENT_TIMESTAMP列,会比较value - if (MY_SPEC.upd_ctdef_.is_primary_index_ - && lib::is_mysql_mode() - && is_update_auto_filled_timestamp()) { - // update the timestamp column, then check whether row is updated - if (OB_FAIL(ObDMLService::check_row_whether_changed(MY_SPEC.upd_ctdef_, upd_rtdef_, get_eval_ctx()))) { - LOG_WARN("fail to check_row_whether_changed", K(ret)); - } else if (!upd_rtdef_.is_row_changed_) { - // copy old_row timestamp column as new_row timestamp column - const ObExprPtrIArray &old_row = MY_SPEC.upd_ctdef_.old_row_; - const ObExprPtrIArray &new_row = MY_SPEC.upd_ctdef_.new_row_; - FOREACH_CNT_X(info, MY_SPEC.upd_ctdef_.assign_columns_, OB_SUCC(ret)) { - const uint64_t idx = info->projector_index_; - if (info->auto_filled_timestamp_) { - ObDatum *old_datum = NULL; - ObDatum *new_datum = NULL; - if (OB_FAIL(old_row.at(idx)->eval(get_eval_ctx(), old_datum)) - || OB_FAIL(new_row.at(idx)->eval(get_eval_ctx(), new_datum))) { - LOG_WARN("evaluate value failed", K(ret)); - } else if (OB_ISNULL(old_datum) || OB_ISNULL(new_datum)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("datum is null, unexpected", K(ret), KPC(old_datum), KPC(new_datum)); - } else { - new_row.at(idx)->locate_datum_for_write(get_eval_ctx()) = *old_datum; - new_row.at(idx)->set_evaluated_projected(get_eval_ctx()); - } - LOG_TRACE("after copy timestamp column to new_row", K(ret), K(idx), - "old_row", ROWEXPR2STR(get_eval_ctx(), old_row), - "new_row", ROWEXPR2STR(get_eval_ctx(), new_row)); - } + ++upd_rtdef_.cur_row_num_; + if (OB_FAIL(ObDMLService::process_update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, is_skipped, *this))) { + LOG_WARN("process update row failed", K(ret)); + } else if (!is_skipped) { + // 通过partition id expr获得对应行对应的分区 + ++upd_rtdef_.found_rows_; + const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); + // 返回的值是child的output exprs + row = &child_->get_spec().output_; + if (NO_PARTITION_ID_FLAG == part_id_idx) { + ObDASTableLoc *table_loc = upd_rtdef_.dupd_rtdef_.table_loc_; + if (OB_ISNULL(table_loc) || table_loc->get_tablet_locs().size() != 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("insert table location is invalid", K(ret), KPC(table_loc)); + } else { + tablet_id = table_loc->get_first_tablet_loc()->tablet_id_; } + } else if (child_->get_spec().output_.count() > part_id_idx) { + ObExpr *expr = child_->get_spec().output_.at(part_id_idx); + ObDatum &expr_datum = expr->locate_expr_datum(get_eval_ctx()); + tablet_id = expr_datum.get_int(); + LOG_DEBUG("get the part id", K(ret), K(expr_datum)); } } - } return ret; } -bool ObPxMultiPartUpdateOp::is_update_auto_filled_timestamp() -{ - bool updated = false; - FOREACH_CNT_X(info, MY_SPEC.upd_ctdef_.assign_columns_, !updated) { - const uint64_t idx = info->projector_index_; - if (info->auto_filled_timestamp_) { - updated = true; - } - } - return updated; -} - int ObPxMultiPartUpdateOp::write_rows(ObExecContext &ctx, const ObDASTabletLoc *tablet_loc, diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_update_op.h b/src/sql/engine/pdml/static/ob_px_multi_part_update_op.h index 2864fc32e9..d6cd4cbed0 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_update_op.h +++ b/src/sql/engine/pdml/static/ob_px_multi_part_update_op.h @@ -79,14 +79,13 @@ public: public: int read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, - common::ObTabletID &tablet_id) override; + common::ObTabletID &tablet_id, + bool &is_skipped) override; int write_rows(ObExecContext &ctx, const ObDASTabletLoc *tablet_loc, ObPDMLOpRowIterator &iterator) override; - bool is_update_auto_filled_timestamp(); - virtual int inner_get_next_row(); virtual int inner_open(); virtual int inner_close(); diff --git a/src/sql/optimizer/ob_del_upd_log_plan.cpp b/src/sql/optimizer/ob_del_upd_log_plan.cpp index 68167752b0..8a6c3b1a7e 100644 --- a/src/sql/optimizer/ob_del_upd_log_plan.cpp +++ b/src/sql/optimizer/ob_del_upd_log_plan.cpp @@ -264,8 +264,8 @@ int ObDelUpdLogPlan::check_table_rowkey_distinct( if (OB_ISNULL(index_dml_info)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("index dml info is null", K(ret)); - } else if (!use_pdml() && !index_dml_info->is_primary_index_) { - // for PDML, primary table & index table both need unique checker. + } else if (!index_dml_info->is_primary_index_) { + // only primary table need unique checker. } else if (OB_FAIL(index_dml_info->get_rowkey_exprs(rowkey_exprs))) { LOG_WARN("failed to get rowkey exprs", K(ret)); } else if (OB_FAIL(ObOptimizerUtil::is_exprs_unique(rowkey_exprs,