Unify the processing logic of PDML and regular DML

This commit is contained in:
leslieyuchen
2024-02-08 11:54:53 +00:00
committed by ob-robot
parent 84e929fe7b
commit 535924fba1
11 changed files with 94 additions and 197 deletions

View File

@ -36,11 +36,9 @@ int ObPDMLOpRowIterator::get_next_row(const ObExprPtrIArray &row)
if (OB_ISNULL(eval_ctx_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("not init the eval_ctx", K(ret));
} else if (OB_SUCC(row_store_it_.get_next_row(row, *eval_ctx_))) {
// we should do uniq row checking after data being stored in row_store
// rather than before this. because we need to return all rows unfiltered to upper ops.
if (uniq_row_checker_) {
ret = uniq_row_checker_->check_rowkey_distinct(row, is_distinct);
} else if (OB_FAIL(row_store_it_.get_next_row(row, *eval_ctx_))) {
if (OB_ITER_END != ret) {
LOG_WARN("get next row from row store iter failed", K(ret));
}
}
} while (OB_SUCC(ret) && !is_distinct);

View File

@ -29,12 +29,6 @@ class ObNewRow;
namespace sql
{
struct ObDASTabletLoc;
class ObDMLOpUniqueRowChecker
{
public:
virtual int check_rowkey_distinct(const ObExprPtrIArray &row, bool &is_distinct) = 0;
};
class ObExecContext;
// 单个分区的新引擎数据缓存器
@ -42,22 +36,18 @@ class ObPDMLOpRowIterator
{
public:
friend class ObPDMLOpBatchRowCache;
ObPDMLOpRowIterator() : eval_ctx_(nullptr), uniq_row_checker_(nullptr) {}
ObPDMLOpRowIterator() : eval_ctx_(nullptr) {}
virtual ~ObPDMLOpRowIterator() = default;
// 获得row_store_it_中的下一行数据
// 返回的数据是对应存储数据的exprs
int get_next_row(const ObExprPtrIArray &row);
void set_uniq_row_checker(ObDMLOpUniqueRowChecker *uniq_row_checker)
{ uniq_row_checker_ = uniq_row_checker; }
void close()
{ row_store_it_.reset(); }
void close() { row_store_it_.reset(); }
private:
int init_data_source(ObChunkDatumStore &row_datum_store,
ObEvalCtx *eval_ctx);
private:
ObChunkDatumStore::Iterator row_store_it_;
ObEvalCtx *eval_ctx_;
ObDMLOpUniqueRowChecker *uniq_row_checker_;
DISALLOW_COPY_AND_ASSIGN(ObPDMLOpRowIterator);
};
@ -132,7 +122,8 @@ public:
// 一般来说,分区id存储在行中的一个伪列里
virtual int read_row(ObExecContext &ctx,
const ObExprPtrIArray *&row,
common::ObTabletID &tablet_id) = 0;
common::ObTabletID &tablet_id,
bool &is_skipped) = 0;
};
class ObDMLOpDataWriter

View File

@ -35,7 +35,6 @@ int ObPDMLOpDataDriver::init(const ObTableModifySpec &spec,
ObDMLBaseRtDef &dml_rtdef,
ObDMLOpDataReader *reader,
ObDMLOpDataWriter *writer,
ObDMLOpUniqueRowChecker *uniq_checker,
const bool is_heap_table_insert,
const bool with_barrier/*false*/)
{
@ -53,7 +52,6 @@ int ObPDMLOpDataDriver::init(const ObTableModifySpec &spec,
} else {
reader_ = reader;
writer_ = writer;
uniq_checker_ = uniq_checker;
dml_rtdef_ = &dml_rtdef;
is_heap_table_insert_ = is_heap_table_insert;
with_barrier_ = with_barrier;
@ -179,13 +177,16 @@ int ObPDMLOpDataDriver::fill_cache_unitl_cache_full_or_child_iter_end(ObExecCont
do {
const ObExprPtrIArray *row = nullptr;
ObTabletID tablet_id;
if (OB_FAIL(reader_->read_row(ctx, row, tablet_id))) {
bool is_skipped = false;
if (OB_FAIL(reader_->read_row(ctx, row, tablet_id, is_skipped))) {
if (OB_ITER_END == ret) {
// 当前reader的数据已经读取结束
// do nothing
} else {
LOG_WARN("failed to read row from reader", K(ret));
}
} else if (is_skipped) {
//need to skip this row
} else if (is_heap_table_insert_ && OB_FAIL(set_heap_table_hidden_pk(row, tablet_id))) {
LOG_WARN("fail to set heap table hidden pk", K(ret), K(*row), K(tablet_id));
} else if (OB_FAIL(cache_.add_row(*row, tablet_id))) {
@ -247,8 +248,6 @@ int ObPDMLOpDataDriver::write_partitions(ObExecContext &ctx)
LOG_WARN("fail get row iterator", K(tablet_id), K(ret));
} else if (OB_FAIL(DAS_CTX(ctx).extended_tablet_loc(*table_loc, tablet_id, tablet_loc))) {
LOG_WARN("extended tablet location failed", K(ret));
} else if (FALSE_IT(row_iter->set_uniq_row_checker(uniq_checker_))) {
// nop
} else if (OB_FAIL(writer_->write_rows(ctx, tablet_loc, *row_iter))) {
LOG_WARN("fail write rows", K(tablet_id), K(ret));
}
@ -392,7 +391,6 @@ int ObPDMLOpDataDriver::switch_row_iter_to_next_partition()
LOG_WARN("failed to get next partition iterator", K(ret),
"part_id", returning_ctx_.tablet_id_array_.at(next_idx), K(next_idx));
} else {
returning_ctx_.row_iter_->set_uniq_row_checker(nullptr);
returning_ctx_.next_idx_++;
}
return ret;

View File

@ -40,7 +40,6 @@ public:
cache_(eval_ctx, op_monitor_info),
reader_(nullptr),
writer_(nullptr),
uniq_checker_(nullptr),
dml_rtdef_(nullptr),
state_(FILL_CACHE),
eval_ctx_(eval_ctx),
@ -61,7 +60,6 @@ public:
ObDMLBaseRtDef &dml_rtdef,
ObDMLOpDataReader *reader,
ObDMLOpDataWriter *writer,
ObDMLOpUniqueRowChecker *uniq_checker,
const bool is_heap_table_insert,
const bool with_barrier = false);
@ -143,7 +141,6 @@ private:
ObPDMLOpBatchRowCache cache_; // 用于缓存数据,需要在init函数中初始化,并且分配alloctor
ObDMLOpDataReader *reader_;
ObDMLOpDataWriter *writer_;
ObDMLOpUniqueRowChecker *uniq_checker_;
ObDMLBaseRtDef *dml_rtdef_;
DriverState state_; // Driver 当前状态:读写数据状态、向上返回数据状态

View File

@ -65,7 +65,7 @@ int ObPxMultiPartDeleteOp::inner_open()
LOG_WARN("failed to inner open", K(ret));
} else if (OB_FAIL(ObDMLService::init_del_rtdef(dml_rtctx_, del_rtdef_, MY_SPEC.del_ctdef_))) {
LOG_WARN("init delete rtdef failed", K(ret));
} else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), del_rtdef_, this, this, this, false, MY_SPEC.with_barrier_))) {
} else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), del_rtdef_, this, this, false, MY_SPEC.with_barrier_))) {
LOG_WARN("failed to init data driver", K(ret));
} else if (MY_SPEC.with_barrier_) {
if (OB_ISNULL(input_)) {
@ -81,24 +81,6 @@ int ObPxMultiPartDeleteOp::inner_open()
return ret;
}
int ObPxMultiPartDeleteOp::check_rowkey_distinct(const ObExprPtrIArray &row,
bool &is_distinct)
{
int ret = OB_SUCCESS;
if (DistinctType::T_DISTINCT_NONE != MY_SPEC.del_ctdef_.distinct_algo_) {
ret = ObDMLService::check_rowkey_whether_distinct(row,
MY_SPEC.del_ctdef_.distinct_algo_,
eval_ctx_,
ctx_,
del_rtdef_.table_rowkey_,
del_rtdef_.se_rowkey_dist_ctx_,
is_distinct);
} else {
is_distinct = true;
}
return ret;
}
int ObPxMultiPartDeleteOp::inner_get_next_row()
{
int ret = OB_SUCCESS;
@ -149,7 +131,8 @@ int ObPxMultiPartDeleteOp::inner_close()
//////////// pdml data interface implementation: reader & writer ////////////
int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx,
const ObExprPtrIArray *&row,
ObTabletID &tablet_id)
ObTabletID &tablet_id,
bool &is_skipped)
{
int ret = OB_SUCCESS;
UNUSED(ctx);
@ -164,6 +147,9 @@ int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx,
} else {
// 每一次从child节点获得新的数据都需要进行清除计算标记
clear_evaluated_flag();
if (OB_FAIL(ObDMLService::process_delete_row(MY_SPEC.del_ctdef_, del_rtdef_, is_skipped, *this))) {
LOG_WARN("process delete row failed", K(ret));
} else if (!is_skipped) {
// 通过partition id expr获得对应行对应的分区
const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index();
// 返回的值是child的output exprs
@ -184,6 +170,7 @@ int ObPxMultiPartDeleteOp::read_row(ObExecContext &ctx,
LOG_DEBUG("get the part id", K(ret), K(expr_datum));
}
}
}
return ret;
}

View File

@ -76,7 +76,6 @@ public:
class ObPxMultiPartDeleteOp : public ObDMLOpDataReader,
public ObDMLOpDataWriter,
public ObDMLOpUniqueRowChecker,
public ObTableModifyOp
{
OB_UNIS_VERSION(1);
@ -96,7 +95,8 @@ public:
// 同时还负责计算出这一行对应的 partition_id
int read_row(ObExecContext &ctx,
const ObExprPtrIArray *&row,
common::ObTabletID &tablet_id) override;
common::ObTabletID &tablet_id,
bool &is_skipped) override;
// impl. ObDMLDataWriter
// 将缓存的数据批量写入到存储层
int write_rows(ObExecContext &ctx,
@ -107,9 +107,6 @@ public:
virtual int inner_get_next_row();
virtual int inner_open();
virtual int inner_close();
private:
int check_rowkey_distinct(const ObExprPtrIArray &row, bool &is_distinct) override;
private:
ObPDMLOpDataDriver data_driver_;
ObDelRtDef del_rtdef_;

View File

@ -46,7 +46,7 @@ int ObPxMultiPartInsertOp::inner_open()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table or row desc is invalid", K(ret), K(MY_SPEC.row_desc_));
} else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), ins_rtdef_, this, this,
nullptr, MY_SPEC.ins_ctdef_.is_heap_table_))) {
MY_SPEC.ins_ctdef_.is_heap_table_))) {
LOG_WARN("failed to init data driver", K(ret));
}
if (OB_SUCC(ret)) {
@ -126,28 +126,11 @@ int ObPxMultiPartInsertOp::inner_close()
return ret;
}
int ObPxMultiPartInsertOp::process_row()
{
int ret = OB_SUCCESS;
bool is_filtered = false;
OZ (ObDMLService::check_row_null(MY_SPEC.ins_ctdef_.new_row_,
eval_ctx_,
ins_rtdef_.cur_row_num_,
MY_SPEC.ins_ctdef_.column_infos_,
MY_SPEC.ins_ctdef_.das_ctdef_,
MY_SPEC.ins_ctdef_.is_single_value_,
*this));
OZ(ObDMLService::filter_row_for_view_check(MY_SPEC.ins_ctdef_.view_check_exprs_, eval_ctx_, is_filtered));
OV(!is_filtered, OB_ERR_CHECK_OPTION_VIOLATED);
OZ(ObDMLService::filter_row_for_check_cst(MY_SPEC.ins_ctdef_.check_cst_exprs_, eval_ctx_, is_filtered));
OV(!is_filtered, OB_ERR_CHECK_CONSTRAINT_VIOLATED);
return ret;
}
//////////// pdml data interface implementation: reader & writer ////////////
int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx,
const ObExprPtrIArray *&row,
common::ObTabletID &tablet_id)
common::ObTabletID &tablet_id,
bool &is_skipped)
{
int ret = OB_SUCCESS;
UNUSED(ctx);
@ -162,6 +145,9 @@ int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx,
} else {
// 每一次从child节点获得新的数据都需要进行清除计算标记
clear_evaluated_flag();
if (OB_FAIL(ObDMLService::process_insert_row(MY_SPEC.ins_ctdef_, ins_rtdef_, *this, is_skipped))) {
LOG_WARN("process insert row failed", K(ret));
} else if (!is_skipped) {
// 通过partition id expr获得对应行对应的分区
const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index();
// 返回的值是child的output exprs
@ -181,13 +167,9 @@ int ObPxMultiPartInsertOp::read_row(ObExecContext &ctx,
LOG_DEBUG("get the part id", K(ret), K(expr_datum));
}
}
if (!MY_SPEC.is_pdml_index_maintain_ && OB_SUCC(ret)) {
if (OB_FAIL(process_row())) {
LOG_WARN("fail process row", K(ret));
}
}
if (OB_SUCC(ret)) {
LOG_TRACE("read row from pdml cache", "read_row", ROWEXPR2STR(eval_ctx_, *row), K(tablet_id));
LOG_TRACE("read row from pdml cache", "read_row", ROWEXPR2STR(eval_ctx_, *row), K(tablet_id), K(is_skipped));
}
return ret;
}

View File

@ -93,7 +93,10 @@ public:
public:
virtual bool has_foreign_key() const { return false; } // 默认实现,先不考虑外键的问题
int read_row(ObExecContext &ctx, const ObExprPtrIArray *&row, common::ObTabletID &tablet_id) override;
int read_row(ObExecContext &ctx,
const ObExprPtrIArray *&row,
common::ObTabletID &tablet_id,
bool &is_skipped) override;
int write_rows(ObExecContext &ctx,
const ObDASTabletLoc *tablet_loc,
ObPDMLOpRowIterator &iterator) override;
@ -101,8 +104,6 @@ public:
virtual int inner_get_next_row();
virtual int inner_open();
virtual int inner_close();
private:
int process_row();
protected:
ObPDMLOpDataDriver data_driver_;
ObInsRtDef ins_rtdef_;

View File

@ -36,8 +36,7 @@ int ObPxMultiPartUpdateOp::inner_open()
} else if (!(MY_SPEC.row_desc_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table or row desc is invalid", K(ret), K(MY_SPEC.row_desc_));
} else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), upd_rtdef_, this, this,
nullptr, false))) {
} else if (OB_FAIL(data_driver_.init(get_spec(), ctx_.get_allocator(), upd_rtdef_, this, this, false))) {
LOG_WARN("failed to init data driver", K(ret));
} else if (OB_FAIL(ObDMLService::init_upd_rtdef(dml_rtctx_,
upd_rtdef_,
@ -100,17 +99,9 @@ int ObPxMultiPartUpdateOp::inner_close()
int ObPxMultiPartUpdateOp::update_row_to_das(const ObDASTabletLoc *tablet_loc)
{
int ret = OB_SUCCESS;
bool is_skipped = false;
ObChunkDatumStore::StoredRow* stored_row = nullptr;
++upd_rtdef_.cur_row_num_;
if (OB_FAIL(ObDMLService::process_update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, is_skipped, *this))) {
LOG_WARN("process update row failed", K(ret));
} else if (is_skipped) {
//do nothing
} else if (OB_FAIL(ObDMLService::update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, tablet_loc, tablet_loc, dml_rtctx_, stored_row, stored_row, stored_row))) {
if (OB_FAIL(ObDMLService::update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, tablet_loc, tablet_loc, dml_rtctx_, stored_row, stored_row, stored_row))) {
LOG_WARN("insert row with das failed", K(ret));
} else {
++upd_rtdef_.found_rows_;
}
return ret;
}
@ -118,12 +109,12 @@ int ObPxMultiPartUpdateOp::update_row_to_das(const ObDASTabletLoc *tablet_loc)
//////////// pdml data interface implementation: reader & writer ////////////
int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx,
const ObExprPtrIArray *&row,
common::ObTabletID &tablet_id)
common::ObTabletID &tablet_id,
bool &is_skipped)
{
// 从child中读取数据,数据存储在child的output exprs中
int ret = OB_SUCCESS;
ObPhysicalPlanCtx *plan_ctx = NULL;
bool is_update_timestamp = false;
if (OB_ISNULL(plan_ctx = ctx.get_physical_plan_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get physical plan context failed", K(ret));
@ -137,7 +128,12 @@ int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx,
} else {
// 每一次从child节点获得新的数据都需要进行清除计算标记
clear_evaluated_flag();
++upd_rtdef_.cur_row_num_;
if (OB_FAIL(ObDMLService::process_update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, is_skipped, *this))) {
LOG_WARN("process update row failed", K(ret));
} else if (!is_skipped) {
// 通过partition id expr获得对应行对应的分区
++upd_rtdef_.found_rows_;
const int64_t part_id_idx = MY_SPEC.row_desc_.get_part_id_index();
// 返回的值是child的output exprs
row = &child_->get_spec().output_;
@ -155,60 +151,11 @@ int ObPxMultiPartUpdateOp::read_row(ObExecContext &ctx,
tablet_id = expr_datum.get_int();
LOG_DEBUG("get the part id", K(ret), K(expr_datum));
}
// 做auto_failed_timestamp的检查
// 如果assign_column包含了ON UPDATE CURRENT_TIMESTAMP类型的时间戳列
// 第一步先检查主表是否发生update,如果主表当前行没有发生update,那么吧old_row中的当前列拷贝到new_row中
// pdml的索引表在检查当前行是否发生update时,不会跳过ON UPDATE CURRENT_TIMESTAMP列,会比较value
if (MY_SPEC.upd_ctdef_.is_primary_index_
&& lib::is_mysql_mode()
&& is_update_auto_filled_timestamp()) {
// update the timestamp column, then check whether row is updated
if (OB_FAIL(ObDMLService::check_row_whether_changed(MY_SPEC.upd_ctdef_, upd_rtdef_, get_eval_ctx()))) {
LOG_WARN("fail to check_row_whether_changed", K(ret));
} else if (!upd_rtdef_.is_row_changed_) {
// copy old_row timestamp column as new_row timestamp column
const ObExprPtrIArray &old_row = MY_SPEC.upd_ctdef_.old_row_;
const ObExprPtrIArray &new_row = MY_SPEC.upd_ctdef_.new_row_;
FOREACH_CNT_X(info, MY_SPEC.upd_ctdef_.assign_columns_, OB_SUCC(ret)) {
const uint64_t idx = info->projector_index_;
if (info->auto_filled_timestamp_) {
ObDatum *old_datum = NULL;
ObDatum *new_datum = NULL;
if (OB_FAIL(old_row.at(idx)->eval(get_eval_ctx(), old_datum))
|| OB_FAIL(new_row.at(idx)->eval(get_eval_ctx(), new_datum))) {
LOG_WARN("evaluate value failed", K(ret));
} else if (OB_ISNULL(old_datum) || OB_ISNULL(new_datum)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("datum is null, unexpected", K(ret), KPC(old_datum), KPC(new_datum));
} else {
new_row.at(idx)->locate_datum_for_write(get_eval_ctx()) = *old_datum;
new_row.at(idx)->set_evaluated_projected(get_eval_ctx());
}
LOG_TRACE("after copy timestamp column to new_row", K(ret), K(idx),
"old_row", ROWEXPR2STR(get_eval_ctx(), old_row),
"new_row", ROWEXPR2STR(get_eval_ctx(), new_row));
}
}
}
}
}
return ret;
}
bool ObPxMultiPartUpdateOp::is_update_auto_filled_timestamp()
{
bool updated = false;
FOREACH_CNT_X(info, MY_SPEC.upd_ctdef_.assign_columns_, !updated) {
const uint64_t idx = info->projector_index_;
if (info->auto_filled_timestamp_) {
updated = true;
}
}
return updated;
}
int ObPxMultiPartUpdateOp::write_rows(ObExecContext &ctx,
const ObDASTabletLoc *tablet_loc,

View File

@ -79,14 +79,13 @@ public:
public:
int read_row(ObExecContext &ctx,
const ObExprPtrIArray *&row,
common::ObTabletID &tablet_id) override;
common::ObTabletID &tablet_id,
bool &is_skipped) override;
int write_rows(ObExecContext &ctx,
const ObDASTabletLoc *tablet_loc,
ObPDMLOpRowIterator &iterator) override;
bool is_update_auto_filled_timestamp();
virtual int inner_get_next_row();
virtual int inner_open();
virtual int inner_close();

View File

@ -264,8 +264,8 @@ int ObDelUpdLogPlan::check_table_rowkey_distinct(
if (OB_ISNULL(index_dml_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("index dml info is null", K(ret));
} else if (!use_pdml() && !index_dml_info->is_primary_index_) {
// for PDML, primary table & index table both need unique checker.
} else if (!index_dml_info->is_primary_index_) {
// only primary table need unique checker.
} else if (OB_FAIL(index_dml_info->get_rowkey_exprs(rowkey_exprs))) {
LOG_WARN("failed to get rowkey exprs", K(ret));
} else if (OB_FAIL(ObOptimizerUtil::is_exprs_unique(rowkey_exprs,