Unify the processing logic of PDML and regular DML
This commit is contained in:
		| @ -36,11 +36,9 @@ int ObPDMLOpRowIterator::get_next_row(const ObExprPtrIArray &row) | |||||||
|     if (OB_ISNULL(eval_ctx_)) { |     if (OB_ISNULL(eval_ctx_)) { | ||||||
|       ret = OB_ERR_UNEXPECTED; |       ret = OB_ERR_UNEXPECTED; | ||||||
|       LOG_WARN("not init the eval_ctx", K(ret)); |       LOG_WARN("not init the eval_ctx", K(ret)); | ||||||
|     } else if (OB_SUCC(row_store_it_.get_next_row(row, *eval_ctx_))) { |     } else if (OB_FAIL(row_store_it_.get_next_row(row, *eval_ctx_))) { | ||||||
|       // we should do uniq row checking after data being stored in row_store |       if (OB_ITER_END != ret) { | ||||||
|       // rather than before this. because we need to return all rows unfiltered to upper ops. |         LOG_WARN("get next row from row store iter failed", K(ret)); | ||||||
|       if (uniq_row_checker_) { |  | ||||||
|         ret = uniq_row_checker_->check_rowkey_distinct(row, is_distinct); |  | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|   } while (OB_SUCC(ret) && !is_distinct); |   } while (OB_SUCC(ret) && !is_distinct); | ||||||
|  | |||||||
| @ -29,12 +29,6 @@ class ObNewRow; | |||||||
| namespace sql | namespace sql | ||||||
| { | { | ||||||
| struct ObDASTabletLoc; | struct ObDASTabletLoc; | ||||||
| class ObDMLOpUniqueRowChecker |  | ||||||
| { |  | ||||||
| public: |  | ||||||
|   virtual int check_rowkey_distinct(const ObExprPtrIArray &row, bool &is_distinct) = 0; |  | ||||||
| }; |  | ||||||
|  |  | ||||||
| class ObExecContext; | class ObExecContext; | ||||||
|  |  | ||||||
| // 单个分区的新引擎数据缓存器 | // 单个分区的新引擎数据缓存器 | ||||||
| @ -42,22 +36,18 @@ class ObPDMLOpRowIterator | |||||||
| { | { | ||||||
| public: | public: | ||||||
|     friend class ObPDMLOpBatchRowCache; |     friend class ObPDMLOpBatchRowCache; | ||||||
|     ObPDMLOpRowIterator() : eval_ctx_(nullptr), uniq_row_checker_(nullptr) {} |     ObPDMLOpRowIterator() : eval_ctx_(nullptr) {} | ||||||
|     virtual ~ObPDMLOpRowIterator() = default; |     virtual ~ObPDMLOpRowIterator() = default; | ||||||
|     // 获得row_store_it_中的下一行数据 |     // 获得row_store_it_中的下一行数据 | ||||||
|     // 返回的数据是对应存储数据的exprs |     // 返回的数据是对应存储数据的exprs | ||||||
|     int get_next_row(const ObExprPtrIArray &row); |     int get_next_row(const ObExprPtrIArray &row); | ||||||
|     void set_uniq_row_checker(ObDMLOpUniqueRowChecker *uniq_row_checker) |     void close() { row_store_it_.reset(); } | ||||||
|     { uniq_row_checker_ = uniq_row_checker; } |  | ||||||
|     void close() |  | ||||||
|     { row_store_it_.reset(); } |  | ||||||
| private: | private: | ||||||
|     int init_data_source(ObChunkDatumStore &row_datum_store, |     int init_data_source(ObChunkDatumStore &row_datum_store, | ||||||
|                          ObEvalCtx *eval_ctx); |                          ObEvalCtx *eval_ctx); | ||||||
| private: | private: | ||||||
|   ObChunkDatumStore::Iterator row_store_it_; |   ObChunkDatumStore::Iterator row_store_it_; | ||||||
|   ObEvalCtx *eval_ctx_; |   ObEvalCtx *eval_ctx_; | ||||||
|   ObDMLOpUniqueRowChecker *uniq_row_checker_; |  | ||||||
|   DISALLOW_COPY_AND_ASSIGN(ObPDMLOpRowIterator); |   DISALLOW_COPY_AND_ASSIGN(ObPDMLOpRowIterator); | ||||||
| }; | }; | ||||||
|  |  | ||||||
| @ -132,7 +122,8 @@ public: | |||||||
|   // 一般来说,分区id存储在行中的一个伪列里 |   // 一般来说,分区id存储在行中的一个伪列里 | ||||||
|   virtual int read_row(ObExecContext &ctx, |   virtual int read_row(ObExecContext &ctx, | ||||||
|                        const ObExprPtrIArray *&row, |                        const ObExprPtrIArray *&row, | ||||||
|                        common::ObTabletID &tablet_id) = 0; |                        common::ObTabletID &tablet_id, | ||||||
|  |                        bool &is_skipped) = 0; | ||||||
| }; | }; | ||||||
|  |  | ||||||
| class ObDMLOpDataWriter | class ObDMLOpDataWriter | ||||||
|  | |||||||
| @ -35,7 +35,6 @@ int ObPDMLOpDataDriver::init(const ObTableModifySpec &spec, | |||||||
|                              ObDMLBaseRtDef &dml_rtdef, |                              ObDMLBaseRtDef &dml_rtdef, | ||||||
|                              ObDMLOpDataReader *reader, |                              ObDMLOpDataReader *reader, | ||||||
|                              ObDMLOpDataWriter *writer, |                              ObDMLOpDataWriter *writer, | ||||||
|                              ObDMLOpUniqueRowChecker *uniq_checker, |  | ||||||
|                              const bool is_heap_table_insert, |                              const bool is_heap_table_insert, | ||||||
|                              const bool with_barrier/*false*/) |                              const bool with_barrier/*false*/) | ||||||
| { | { | ||||||
| @ -53,7 +52,6 @@ int ObPDMLOpDataDriver::init(const ObTableModifySpec &spec, | |||||||
|   } else { |   } else { | ||||||
|     reader_ = reader; |     reader_ = reader; | ||||||
|     writer_ = writer; |     writer_ = writer; | ||||||
|     uniq_checker_ = uniq_checker; |  | ||||||
|     dml_rtdef_ = &dml_rtdef; |     dml_rtdef_ = &dml_rtdef; | ||||||
|     is_heap_table_insert_ = is_heap_table_insert; |     is_heap_table_insert_ = is_heap_table_insert; | ||||||
|     with_barrier_ = with_barrier; |     with_barrier_ = with_barrier; | ||||||
| @ -179,13 +177,16 @@ int ObPDMLOpDataDriver::fill_cache_unitl_cache_full_or_child_iter_end(ObExecCont | |||||||
|     do { |     do { | ||||||
|       const ObExprPtrIArray *row = nullptr; |       const ObExprPtrIArray *row = nullptr; | ||||||
|       ObTabletID tablet_id; |       ObTabletID tablet_id; | ||||||
|       if (OB_FAIL(reader_->read_row(ctx, row, tablet_id))) { |       bool is_skipped = false; | ||||||
|  |       if (OB_FAIL(reader_->read_row(ctx, row, tablet_id, is_skipped))) { | ||||||
|         if (OB_ITER_END == ret) { |         if (OB_ITER_END == ret) { | ||||||
|           // 当前reader的数据已经读取结束 |           // 当前reader的数据已经读取结束 | ||||||
|           // do nothing |           // do nothing | ||||||
|         } else { |         } else { | ||||||
|           LOG_WARN("failed to read row from reader", K(ret)); |           LOG_WARN("failed to read row from reader", K(ret)); | ||||||
|         } |         } | ||||||
|  |       } else if (is_skipped) { | ||||||
|  |         //need to skip this row | ||||||
|       } else if (is_heap_table_insert_ && OB_FAIL(set_heap_table_hidden_pk(row, tablet_id))) { |       } else if (is_heap_table_insert_ && OB_FAIL(set_heap_table_hidden_pk(row, tablet_id))) { | ||||||
|         LOG_WARN("fail to set heap table hidden pk", K(ret), K(*row), K(tablet_id)); |         LOG_WARN("fail to set heap table hidden pk", K(ret), K(*row), K(tablet_id)); | ||||||
|       } else if (OB_FAIL(cache_.add_row(*row, tablet_id))) { |       } else if (OB_FAIL(cache_.add_row(*row, tablet_id))) { | ||||||
| @ -247,8 +248,6 @@ int ObPDMLOpDataDriver::write_partitions(ObExecContext &ctx) | |||||||
|         LOG_WARN("fail get row iterator", K(tablet_id), K(ret)); |         LOG_WARN("fail get row iterator", K(tablet_id), K(ret)); | ||||||
|       } else if (OB_FAIL(DAS_CTX(ctx).extended_tablet_loc(*table_loc, tablet_id, tablet_loc))) { |       } else if (OB_FAIL(DAS_CTX(ctx).extended_tablet_loc(*table_loc, tablet_id, tablet_loc))) { | ||||||
|         LOG_WARN("extended tablet location failed", K(ret)); |         LOG_WARN("extended tablet location failed", K(ret)); | ||||||
|       } else if (FALSE_IT(row_iter->set_uniq_row_checker(uniq_checker_))) { |  | ||||||
|         // nop |  | ||||||
|       } else if (OB_FAIL(writer_->write_rows(ctx, tablet_loc, *row_iter))) { |       } else if (OB_FAIL(writer_->write_rows(ctx, tablet_loc, *row_iter))) { | ||||||
|         LOG_WARN("fail write rows", K(tablet_id), K(ret)); |         LOG_WARN("fail write rows", K(tablet_id), K(ret)); | ||||||
|       } |       } | ||||||
| @ -392,7 +391,6 @@ int ObPDMLOpDataDriver::switch_row_iter_to_next_partition() | |||||||
|     LOG_WARN("failed to get next partition iterator", K(ret), |     LOG_WARN("failed to get next partition iterator", K(ret), | ||||||
|         "part_id", returning_ctx_.tablet_id_array_.at(next_idx), K(next_idx)); |         "part_id", returning_ctx_.tablet_id_array_.at(next_idx), K(next_idx)); | ||||||
|   } else { |   } else { | ||||||
|     returning_ctx_.row_iter_->set_uniq_row_checker(nullptr); |  | ||||||
|     returning_ctx_.next_idx_++; |     returning_ctx_.next_idx_++; | ||||||
|   } |   } | ||||||
|   return ret; |   return ret; | ||||||
|  | |||||||
| @ -40,7 +40,6 @@ public: | |||||||
|       cache_(eval_ctx, op_monitor_info), |       cache_(eval_ctx, op_monitor_info), | ||||||
|       reader_(nullptr), |       reader_(nullptr), | ||||||
|       writer_(nullptr), |       writer_(nullptr), | ||||||
|       uniq_checker_(nullptr), |  | ||||||
|       dml_rtdef_(nullptr), |       dml_rtdef_(nullptr), | ||||||
|       state_(FILL_CACHE), |       state_(FILL_CACHE), | ||||||
|       eval_ctx_(eval_ctx), |       eval_ctx_(eval_ctx), | ||||||
| @ -61,7 +60,6 @@ public: | |||||||
|            ObDMLBaseRtDef &dml_rtdef, |            ObDMLBaseRtDef &dml_rtdef, | ||||||
|            ObDMLOpDataReader *reader, |            ObDMLOpDataReader *reader, | ||||||
|            ObDMLOpDataWriter *writer, |            ObDMLOpDataWriter *writer, | ||||||
|            ObDMLOpUniqueRowChecker *uniq_checker, |  | ||||||
|            const bool is_heap_table_insert, |            const bool is_heap_table_insert, | ||||||
|            const bool with_barrier = false); |            const bool with_barrier = false); | ||||||
|  |  | ||||||
| @ -143,7 +141,6 @@ private: | |||||||
|   ObPDMLOpBatchRowCache cache_; // 用于缓存数据,需要在init函数中初始化,并且分配alloctor |   ObPDMLOpBatchRowCache cache_; // 用于缓存数据,需要在init函数中初始化,并且分配alloctor | ||||||
|   ObDMLOpDataReader *reader_; |   ObDMLOpDataReader *reader_; | ||||||
|   ObDMLOpDataWriter *writer_; |   ObDMLOpDataWriter *writer_; | ||||||
|   ObDMLOpUniqueRowChecker *uniq_checker_; |  | ||||||
|   ObDMLBaseRtDef *dml_rtdef_; |   ObDMLBaseRtDef *dml_rtdef_; | ||||||
|   DriverState state_; // Driver 当前状态:读写数据状态、向上返回数据状态 |   DriverState state_; // Driver 当前状态:读写数据状态、向上返回数据状态 | ||||||
|  |  | ||||||
|  | |||||||
| @ -65,7 +65,7 @@ int ObPxMultiPartDeleteOp::inner_open() | |||||||
|     LOG_WARN("failed to inner open", K(ret)); |     LOG_WARN("failed to inner open", K(ret)); | ||||||
|   } else if (OB_FAIL(ObDMLService::init_del_rtdef(dml_rtctx_, del_rtdef_, MY_SPEC.del_ctdef_))) { |   } else if (OB_FAIL(ObDMLService::init_del_rtdef(dml_rtctx_, del_rtdef_, MY_SPEC.del_ctdef_))) { | ||||||
|     LOG_WARN("init delete rtdef failed", K(ret)); |     LOG_WARN("init delete rtdef failed", K(ret)); | ||||||
|   } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), del_rtdef_, this, this, this, false, MY_SPEC.with_barrier_))) { |   } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), del_rtdef_, this, this, false, MY_SPEC.with_barrier_))) { | ||||||
|     LOG_WARN("failed to init data driver", K(ret)); |     LOG_WARN("failed to init data driver", K(ret)); | ||||||
|   } else if (MY_SPEC.with_barrier_) { |   } else if (MY_SPEC.with_barrier_) { | ||||||
|     if (OB_ISNULL(input_)) { |     if (OB_ISNULL(input_)) { | ||||||
| @ -81,24 +81,6 @@ int ObPxMultiPartDeleteOp::inner_open() | |||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| int ObPxMultiPartDeleteOp::check_rowkey_distinct(const ObExprPtrIArray &row, |  | ||||||
|                                                  bool &is_distinct) |  | ||||||
| { |  | ||||||
|   int ret = OB_SUCCESS; |  | ||||||
|   if (DistinctType::T_DISTINCT_NONE != MY_SPEC.del_ctdef_.distinct_algo_) { |  | ||||||
|     ret = ObDMLService::check_rowkey_whether_distinct(row, |  | ||||||
|                                                       MY_SPEC.del_ctdef_.distinct_algo_, |  | ||||||
|                                                       eval_ctx_, |  | ||||||
|                                                       ctx_, |  | ||||||
|                                                       del_rtdef_.table_rowkey_, |  | ||||||
|                                                       del_rtdef_.se_rowkey_dist_ctx_, |  | ||||||
|                                                       is_distinct); |  | ||||||
|   } else { |  | ||||||
|     is_distinct = true; |  | ||||||
|   } |  | ||||||
|   return ret; |  | ||||||
| } |  | ||||||
|  |  | ||||||
| int ObPxMultiPartDeleteOp::inner_get_next_row() | int ObPxMultiPartDeleteOp::inner_get_next_row() | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
| @ -149,7 +131,8 @@ int ObPxMultiPartDeleteOp::inner_close() | |||||||
| //////////// pdml data interface implementation: reader & writer //////////// | //////////// pdml data interface implementation: reader & writer //////////// | ||||||
| int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx, | int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx, | ||||||
|                                     const ObExprPtrIArray *&row, |                                     const ObExprPtrIArray *&row, | ||||||
|                                     ObTabletID &tablet_id) |                                     ObTabletID &tablet_id, | ||||||
|  |                                     bool &is_skipped) | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
|   UNUSED(ctx); |   UNUSED(ctx); | ||||||
| @ -164,6 +147,9 @@ int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx, | |||||||
|   } else { |   } else { | ||||||
|     // 每一次从child节点获得新的数据都需要进行清除计算标记 |     // 每一次从child节点获得新的数据都需要进行清除计算标记 | ||||||
|     clear_evaluated_flag(); |     clear_evaluated_flag(); | ||||||
|  |     if (OB_FAIL(ObDMLService::process_delete_row(MY_SPEC.del_ctdef_, del_rtdef_, is_skipped, *this))) { | ||||||
|  |       LOG_WARN("process delete row failed", K(ret)); | ||||||
|  |     } else if (!is_skipped) { | ||||||
|       // 通过partition id expr获得对应行对应的分区 |       // 通过partition id expr获得对应行对应的分区 | ||||||
|       const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); |       const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); | ||||||
|       // 返回的值是child的output exprs |       // 返回的值是child的output exprs | ||||||
| @ -184,6 +170,7 @@ int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx, | |||||||
|         LOG_DEBUG("get the part id", K(ret), K(expr_datum)); |         LOG_DEBUG("get the part id", K(ret), K(expr_datum)); | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|  |   } | ||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | |||||||
| @ -76,7 +76,6 @@ public: | |||||||
|  |  | ||||||
| class ObPxMultiPartDeleteOp : public ObDMLOpDataReader, | class ObPxMultiPartDeleteOp : public ObDMLOpDataReader, | ||||||
|                             public ObDMLOpDataWriter, |                             public ObDMLOpDataWriter, | ||||||
|                             public ObDMLOpUniqueRowChecker, |  | ||||||
|                             public ObTableModifyOp |                             public ObTableModifyOp | ||||||
| { | { | ||||||
|   OB_UNIS_VERSION(1); |   OB_UNIS_VERSION(1); | ||||||
| @ -96,7 +95,8 @@ public: | |||||||
|   // 同时还负责计算出这一行对应的 partition_id |   // 同时还负责计算出这一行对应的 partition_id | ||||||
|   int read_row(ObExecContext &ctx, |   int read_row(ObExecContext &ctx, | ||||||
|                const ObExprPtrIArray *&row, |                const ObExprPtrIArray *&row, | ||||||
|                common::ObTabletID &tablet_id) override; |                common::ObTabletID &tablet_id, | ||||||
|  |                bool &is_skipped) override; | ||||||
|   // impl. ObDMLDataWriter |   // impl. ObDMLDataWriter | ||||||
|   // 将缓存的数据批量写入到存储层 |   // 将缓存的数据批量写入到存储层 | ||||||
|   int write_rows(ObExecContext &ctx, |   int write_rows(ObExecContext &ctx, | ||||||
| @ -107,9 +107,6 @@ public: | |||||||
|   virtual int inner_get_next_row(); |   virtual int inner_get_next_row(); | ||||||
|   virtual int inner_open(); |   virtual int inner_open(); | ||||||
|   virtual int inner_close(); |   virtual int inner_close(); | ||||||
|  |  | ||||||
| private: |  | ||||||
|   int check_rowkey_distinct(const ObExprPtrIArray &row, bool &is_distinct) override; |  | ||||||
| private: | private: | ||||||
|   ObPDMLOpDataDriver data_driver_; |   ObPDMLOpDataDriver data_driver_; | ||||||
|   ObDelRtDef del_rtdef_; |   ObDelRtDef del_rtdef_; | ||||||
|  | |||||||
| @ -46,7 +46,7 @@ int ObPxMultiPartInsertOp::inner_open() | |||||||
|     ret = OB_ERR_UNEXPECTED; |     ret = OB_ERR_UNEXPECTED; | ||||||
|     LOG_WARN("table or row desc is invalid", K(ret), K(MY_SPEC.row_desc_)); |     LOG_WARN("table or row desc is invalid", K(ret), K(MY_SPEC.row_desc_)); | ||||||
|   } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), ins_rtdef_, this, this, |   } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), ins_rtdef_, this, this, | ||||||
|                                        nullptr, MY_SPEC.ins_ctdef_.is_heap_table_))) { |                                        MY_SPEC.ins_ctdef_.is_heap_table_))) { | ||||||
|     LOG_WARN("failed to init data driver", K(ret)); |     LOG_WARN("failed to init data driver", K(ret)); | ||||||
|   } |   } | ||||||
|   if (OB_SUCC(ret)) { |   if (OB_SUCC(ret)) { | ||||||
| @ -126,28 +126,11 @@ int ObPxMultiPartInsertOp::inner_close() | |||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| int ObPxMultiPartInsertOp::process_row() |  | ||||||
| { |  | ||||||
|   int ret = OB_SUCCESS; |  | ||||||
|   bool is_filtered = false; |  | ||||||
|   OZ (ObDMLService::check_row_null(MY_SPEC.ins_ctdef_.new_row_, |  | ||||||
|                                    eval_ctx_, |  | ||||||
|                                    ins_rtdef_.cur_row_num_, |  | ||||||
|                                    MY_SPEC.ins_ctdef_.column_infos_, |  | ||||||
|                                    MY_SPEC.ins_ctdef_.das_ctdef_, |  | ||||||
|                                    MY_SPEC.ins_ctdef_.is_single_value_, |  | ||||||
|                                    *this)); |  | ||||||
|   OZ(ObDMLService::filter_row_for_view_check(MY_SPEC.ins_ctdef_.view_check_exprs_, eval_ctx_, is_filtered)); |  | ||||||
|   OV(!is_filtered, OB_ERR_CHECK_OPTION_VIOLATED); |  | ||||||
|   OZ(ObDMLService::filter_row_for_check_cst(MY_SPEC.ins_ctdef_.check_cst_exprs_, eval_ctx_, is_filtered)); |  | ||||||
|   OV(!is_filtered, OB_ERR_CHECK_CONSTRAINT_VIOLATED); |  | ||||||
|   return ret; |  | ||||||
| } |  | ||||||
|  |  | ||||||
| //////////// pdml data interface implementation: reader & writer //////////// | //////////// pdml data interface implementation: reader & writer //////////// | ||||||
| int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx, | int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx, | ||||||
|                                     const ObExprPtrIArray *&row, |                                     const ObExprPtrIArray *&row, | ||||||
|                                     common::ObTabletID &tablet_id) |                                     common::ObTabletID &tablet_id, | ||||||
|  |                                     bool &is_skipped) | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
|   UNUSED(ctx); |   UNUSED(ctx); | ||||||
| @ -162,6 +145,9 @@ int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx, | |||||||
|   } else { |   } else { | ||||||
|     // 每一次从child节点获得新的数据都需要进行清除计算标记 |     // 每一次从child节点获得新的数据都需要进行清除计算标记 | ||||||
|     clear_evaluated_flag(); |     clear_evaluated_flag(); | ||||||
|  |     if (OB_FAIL(ObDMLService::process_insert_row(MY_SPEC.ins_ctdef_, ins_rtdef_, *this, is_skipped))) { | ||||||
|  |       LOG_WARN("process insert row failed", K(ret)); | ||||||
|  |     } else if (!is_skipped) { | ||||||
|       // 通过partition id expr获得对应行对应的分区 |       // 通过partition id expr获得对应行对应的分区 | ||||||
|       const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); |       const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); | ||||||
|       // 返回的值是child的output exprs |       // 返回的值是child的output exprs | ||||||
| @ -181,13 +167,9 @@ int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx, | |||||||
|         LOG_DEBUG("get the part id", K(ret), K(expr_datum)); |         LOG_DEBUG("get the part id", K(ret), K(expr_datum)); | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|   if (!MY_SPEC.is_pdml_index_maintain_ && OB_SUCC(ret)) { |  | ||||||
|     if (OB_FAIL(process_row())) { |  | ||||||
|       LOG_WARN("fail process row", K(ret)); |  | ||||||
|     } |  | ||||||
|   } |   } | ||||||
|   if (OB_SUCC(ret)) { |   if (OB_SUCC(ret)) { | ||||||
|     LOG_TRACE("read row from pdml cache", "read_row", ROWEXPR2STR(eval_ctx_, *row), K(tablet_id)); |     LOG_TRACE("read row from pdml cache", "read_row", ROWEXPR2STR(eval_ctx_, *row), K(tablet_id), K(is_skipped)); | ||||||
|   } |   } | ||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  | |||||||
| @ -93,7 +93,10 @@ public: | |||||||
| public: | public: | ||||||
|   virtual bool has_foreign_key() const  { return false; } // 默认实现,先不考虑外键的问题 |   virtual bool has_foreign_key() const  { return false; } // 默认实现,先不考虑外键的问题 | ||||||
|  |  | ||||||
|   int read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, common::ObTabletID &tablet_id) override; |   int read_row(ObExecContext &ctx, | ||||||
|  |                const ObExprPtrIArray *&row, | ||||||
|  |                common::ObTabletID &tablet_id, | ||||||
|  |                bool &is_skipped) override; | ||||||
|   int write_rows(ObExecContext &ctx, |   int write_rows(ObExecContext &ctx, | ||||||
|                  const ObDASTabletLoc *tablet_loc, |                  const ObDASTabletLoc *tablet_loc, | ||||||
|                  ObPDMLOpRowIterator &iterator) override; |                  ObPDMLOpRowIterator &iterator) override; | ||||||
| @ -101,8 +104,6 @@ public: | |||||||
|   virtual int inner_get_next_row(); |   virtual int inner_get_next_row(); | ||||||
|   virtual int inner_open(); |   virtual int inner_open(); | ||||||
|   virtual int inner_close(); |   virtual int inner_close(); | ||||||
| private: |  | ||||||
|   int process_row(); |  | ||||||
| protected: | protected: | ||||||
|   ObPDMLOpDataDriver data_driver_; |   ObPDMLOpDataDriver data_driver_; | ||||||
|   ObInsRtDef ins_rtdef_; |   ObInsRtDef ins_rtdef_; | ||||||
|  | |||||||
| @ -36,8 +36,7 @@ int ObPxMultiPartUpdateOp::inner_open() | |||||||
|   } else if (!(MY_SPEC.row_desc_.is_valid())) { |   } else if (!(MY_SPEC.row_desc_.is_valid())) { | ||||||
|     ret = OB_ERR_UNEXPECTED; |     ret = OB_ERR_UNEXPECTED; | ||||||
|     LOG_WARN("table or row desc is invalid", K(ret), K(MY_SPEC.row_desc_)); |     LOG_WARN("table or row desc is invalid", K(ret), K(MY_SPEC.row_desc_)); | ||||||
|   } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), upd_rtdef_, this, this, |   } else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), upd_rtdef_, this, this, false))) { | ||||||
|                                        nullptr, false))) { |  | ||||||
|     LOG_WARN("failed to init data driver", K(ret)); |     LOG_WARN("failed to init data driver", K(ret)); | ||||||
|   } else if (OB_FAIL(ObDMLService::init_upd_rtdef(dml_rtctx_, |   } else if (OB_FAIL(ObDMLService::init_upd_rtdef(dml_rtctx_, | ||||||
|                                                   upd_rtdef_, |                                                   upd_rtdef_, | ||||||
| @ -100,17 +99,9 @@ int ObPxMultiPartUpdateOp::inner_close() | |||||||
| int ObPxMultiPartUpdateOp::update_row_to_das(const ObDASTabletLoc *tablet_loc) | int ObPxMultiPartUpdateOp::update_row_to_das(const ObDASTabletLoc *tablet_loc) | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
|   bool is_skipped = false; |  | ||||||
|   ObChunkDatumStore::StoredRow* stored_row = nullptr; |   ObChunkDatumStore::StoredRow* stored_row = nullptr; | ||||||
|   ++upd_rtdef_.cur_row_num_; |   if (OB_FAIL(ObDMLService::update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, tablet_loc, tablet_loc, dml_rtctx_, stored_row, stored_row, stored_row))) { | ||||||
|   if (OB_FAIL(ObDMLService::process_update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, is_skipped, *this))) { |  | ||||||
|     LOG_WARN("process update row failed", K(ret)); |  | ||||||
|   } else if (is_skipped) { |  | ||||||
|     //do nothing |  | ||||||
|   } else if (OB_FAIL(ObDMLService::update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, tablet_loc, tablet_loc, dml_rtctx_, stored_row, stored_row, stored_row))) { |  | ||||||
|     LOG_WARN("insert row with das failed", K(ret)); |     LOG_WARN("insert row with das failed", K(ret)); | ||||||
|   } else { |  | ||||||
|     ++upd_rtdef_.found_rows_; |  | ||||||
|   } |   } | ||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
| @ -118,12 +109,12 @@ int ObPxMultiPartUpdateOp::update_row_to_das(const ObDASTabletLoc *tablet_loc) | |||||||
| //////////// pdml data interface implementation: reader & writer //////////// | //////////// pdml data interface implementation: reader & writer //////////// | ||||||
| int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx, | int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx, | ||||||
|                                     const ObExprPtrIArray *&row, |                                     const ObExprPtrIArray *&row, | ||||||
|                                     common::ObTabletID &tablet_id) |                                     common::ObTabletID &tablet_id, | ||||||
|  |                                     bool &is_skipped) | ||||||
| { | { | ||||||
|   // 从child中读取数据,数据存储在child的output exprs中 |   // 从child中读取数据,数据存储在child的output exprs中 | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
|   ObPhysicalPlanCtx *plan_ctx = NULL; |   ObPhysicalPlanCtx *plan_ctx = NULL; | ||||||
|   bool is_update_timestamp = false; |  | ||||||
|   if (OB_ISNULL(plan_ctx = ctx.get_physical_plan_ctx())) { |   if (OB_ISNULL(plan_ctx = ctx.get_physical_plan_ctx())) { | ||||||
|     ret = OB_ERR_UNEXPECTED; |     ret = OB_ERR_UNEXPECTED; | ||||||
|     LOG_WARN("get physical plan context failed", K(ret)); |     LOG_WARN("get physical plan context failed", K(ret)); | ||||||
| @ -137,7 +128,12 @@ int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx, | |||||||
|   } else { |   } else { | ||||||
|     // 每一次从child节点获得新的数据都需要进行清除计算标记 |     // 每一次从child节点获得新的数据都需要进行清除计算标记 | ||||||
|     clear_evaluated_flag(); |     clear_evaluated_flag(); | ||||||
|  |     ++upd_rtdef_.cur_row_num_; | ||||||
|  |     if (OB_FAIL(ObDMLService::process_update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, is_skipped, *this))) { | ||||||
|  |       LOG_WARN("process update row failed", K(ret)); | ||||||
|  |     } else if (!is_skipped) { | ||||||
|       // 通过partition id expr获得对应行对应的分区 |       // 通过partition id expr获得对应行对应的分区 | ||||||
|  |       ++upd_rtdef_.found_rows_; | ||||||
|       const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); |       const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index(); | ||||||
|       // 返回的值是child的output exprs |       // 返回的值是child的output exprs | ||||||
|       row = &child_->get_spec().output_; |       row = &child_->get_spec().output_; | ||||||
| @ -155,60 +151,11 @@ int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx, | |||||||
|         tablet_id = expr_datum.get_int(); |         tablet_id = expr_datum.get_int(); | ||||||
|         LOG_DEBUG("get the part id", K(ret), K(expr_datum)); |         LOG_DEBUG("get the part id", K(ret), K(expr_datum)); | ||||||
|       } |       } | ||||||
|  |  | ||||||
|     // 做auto_failed_timestamp的检查 |  | ||||||
|     // 如果assign_column包含了ON UPDATE CURRENT_TIMESTAMP类型的时间戳列 |  | ||||||
|     // 第一步先检查主表是否发生update,如果主表当前行没有发生update,那么吧old_row中的当前列拷贝到new_row中 |  | ||||||
|     // pdml的索引表在检查当前行是否发生update时,不会跳过ON UPDATE CURRENT_TIMESTAMP列,会比较value |  | ||||||
|     if (MY_SPEC.upd_ctdef_.is_primary_index_ |  | ||||||
|         && lib::is_mysql_mode() |  | ||||||
|         && is_update_auto_filled_timestamp()) { |  | ||||||
|       // update the timestamp column, then check whether row is updated |  | ||||||
|       if (OB_FAIL(ObDMLService::check_row_whether_changed(MY_SPEC.upd_ctdef_, upd_rtdef_, get_eval_ctx()))) { |  | ||||||
|         LOG_WARN("fail to check_row_whether_changed", K(ret)); |  | ||||||
|       } else if (!upd_rtdef_.is_row_changed_) { |  | ||||||
|         // copy old_row timestamp column as new_row timestamp column |  | ||||||
|         const ObExprPtrIArray &old_row = MY_SPEC.upd_ctdef_.old_row_; |  | ||||||
|         const ObExprPtrIArray &new_row = MY_SPEC.upd_ctdef_.new_row_; |  | ||||||
|         FOREACH_CNT_X(info, MY_SPEC.upd_ctdef_.assign_columns_, OB_SUCC(ret)) { |  | ||||||
|           const uint64_t idx = info->projector_index_; |  | ||||||
|           if (info->auto_filled_timestamp_) { |  | ||||||
|             ObDatum *old_datum = NULL; |  | ||||||
|             ObDatum *new_datum = NULL; |  | ||||||
|             if (OB_FAIL(old_row.at(idx)->eval(get_eval_ctx(), old_datum)) |  | ||||||
|                 || OB_FAIL(new_row.at(idx)->eval(get_eval_ctx(), new_datum))) { |  | ||||||
|               LOG_WARN("evaluate value failed", K(ret)); |  | ||||||
|             } else if (OB_ISNULL(old_datum) || OB_ISNULL(new_datum)) { |  | ||||||
|               ret = OB_ERR_UNEXPECTED; |  | ||||||
|               LOG_WARN("datum is null, unexpected", K(ret), KPC(old_datum), KPC(new_datum)); |  | ||||||
|             } else { |  | ||||||
|               new_row.at(idx)->locate_datum_for_write(get_eval_ctx()) = *old_datum; |  | ||||||
|               new_row.at(idx)->set_evaluated_projected(get_eval_ctx()); |  | ||||||
|     } |     } | ||||||
|             LOG_TRACE("after copy timestamp column to new_row", K(ret), K(idx), |  | ||||||
|                       "old_row", ROWEXPR2STR(get_eval_ctx(), old_row), |  | ||||||
|                       "new_row", ROWEXPR2STR(get_eval_ctx(), new_row)); |  | ||||||
|           } |  | ||||||
|         } |  | ||||||
|       } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|   } |   } | ||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| bool ObPxMultiPartUpdateOp::is_update_auto_filled_timestamp() |  | ||||||
| { |  | ||||||
|   bool updated = false; |  | ||||||
|   FOREACH_CNT_X(info, MY_SPEC.upd_ctdef_.assign_columns_, !updated) { |  | ||||||
|     const uint64_t idx = info->projector_index_; |  | ||||||
|     if (info->auto_filled_timestamp_) { |  | ||||||
|       updated = true; |  | ||||||
|     } |  | ||||||
|   } |  | ||||||
|   return updated; |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |  | ||||||
| int ObPxMultiPartUpdateOp::write_rows(ObExecContext &ctx, | int ObPxMultiPartUpdateOp::write_rows(ObExecContext &ctx, | ||||||
|                                       const ObDASTabletLoc *tablet_loc, |                                       const ObDASTabletLoc *tablet_loc, | ||||||
|  | |||||||
| @ -79,14 +79,13 @@ public: | |||||||
| public: | public: | ||||||
|   int read_row(ObExecContext &ctx, |   int read_row(ObExecContext &ctx, | ||||||
|                const ObExprPtrIArray *&row, |                const ObExprPtrIArray *&row, | ||||||
|                common::ObTabletID &tablet_id) override; |                common::ObTabletID &tablet_id, | ||||||
|  |                bool &is_skipped) override; | ||||||
|  |  | ||||||
|   int write_rows(ObExecContext &ctx, |   int write_rows(ObExecContext &ctx, | ||||||
|                  const ObDASTabletLoc *tablet_loc, |                  const ObDASTabletLoc *tablet_loc, | ||||||
|                  ObPDMLOpRowIterator &iterator) override; |                  ObPDMLOpRowIterator &iterator) override; | ||||||
|  |  | ||||||
|   bool is_update_auto_filled_timestamp(); |  | ||||||
|  |  | ||||||
|   virtual int inner_get_next_row(); |   virtual int inner_get_next_row(); | ||||||
|   virtual int inner_open(); |   virtual int inner_open(); | ||||||
|   virtual int inner_close(); |   virtual int inner_close(); | ||||||
|  | |||||||
| @ -264,8 +264,8 @@ int ObDelUpdLogPlan::check_table_rowkey_distinct( | |||||||
|       if (OB_ISNULL(index_dml_info)) { |       if (OB_ISNULL(index_dml_info)) { | ||||||
|         ret = OB_ERR_UNEXPECTED; |         ret = OB_ERR_UNEXPECTED; | ||||||
|         LOG_WARN("index dml info is null", K(ret)); |         LOG_WARN("index dml info is null", K(ret)); | ||||||
|       } else if (!use_pdml() && !index_dml_info->is_primary_index_) { |       } else if (!index_dml_info->is_primary_index_) { | ||||||
|         // for PDML, primary table & index table both need unique checker. |         // only primary table need unique checker. | ||||||
|       } else if (OB_FAIL(index_dml_info->get_rowkey_exprs(rowkey_exprs))) { |       } else if (OB_FAIL(index_dml_info->get_rowkey_exprs(rowkey_exprs))) { | ||||||
|         LOG_WARN("failed to get rowkey exprs", K(ret)); |         LOG_WARN("failed to get rowkey exprs", K(ret)); | ||||||
|       } else if (OB_FAIL(ObOptimizerUtil::is_exprs_unique(rowkey_exprs, |       } else if (OB_FAIL(ObOptimizerUtil::is_exprs_unique(rowkey_exprs, | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 leslieyuchen
					leslieyuchen