diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index 738af8ff13..96b0520e3c 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -562,6 +562,7 @@ class EventTable EN_DAS_SIMULATE_VT_CREATE_ERROR = 305, EN_DAS_SIMULATE_LOOKUPOP_INIT_ERROR = 306, EN_DAS_SIMULATE_ASYNC_RPC_TIMEOUT = 307, + EN_DAS_SIMULATE_DUMP_WRITE_BUFFER = 308, EN_REPLAY_STORAGE_SCHEMA_FAILURE = 351, EN_SKIP_GET_STORAGE_SCHEMA = 352, diff --git a/src/objit/include/objit/common/ob_item_type.h b/src/objit/include/objit/common/ob_item_type.h index 8a9fa43e5f..1387e9d633 100755 --- a/src/objit/include/objit/common/ob_item_type.h +++ b/src/objit/include/objit/common/ob_item_type.h @@ -833,6 +833,7 @@ typedef enum ObItemType T_PSEUDO_STMT_ID = 3009, T_PSEUDO_RANDOM = 3010, T_INNER_WF_AGGR_STAUTS = 3011, + T_PSEUDO_ROW_TRANS_INFO_COLUMN = 3012, // trans_info, only for defensive check T_PSEUDO_GROUP_PARAM = 3040, T_PSEUDO_EXTERNAL_FILE_COL = 3041, diff --git a/src/observer/mysql/obmp_base.cpp b/src/observer/mysql/obmp_base.cpp index 379fe1fb98..f6b559bb92 100644 --- a/src/observer/mysql/obmp_base.cpp +++ b/src/observer/mysql/obmp_base.cpp @@ -350,7 +350,9 @@ int ObMPBase::init_process_var(sql::ObSqlCtx &ctx, ctx.can_reroute_sql_ = (pkt.can_reroute_pkt() && get_conn()->is_support_proxy_reroute()); } ctx.is_protocol_weak_read_ = pkt.is_weak_read(); - LOG_TRACE("protocol flag info", K(ctx.can_reroute_sql_), K(ctx.is_protocol_weak_read_)); + ctx.is_strict_defensive_check_ = GCONF.enable_strict_defensive_check(); + LOG_TRACE("protocol flag info", K(ctx.can_reroute_sql_), K(ctx.is_protocol_weak_read_), + K(ctx.is_strict_defensive_check_)); } return ret; } diff --git a/src/sql/code_generator/ob_dml_cg_service.cpp b/src/sql/code_generator/ob_dml_cg_service.cpp index f717fb089e..4889586e94 100644 --- a/src/sql/code_generator/ob_dml_cg_service.cpp +++ b/src/sql/code_generator/ob_dml_cg_service.cpp @@ -275,7 +275,8 @@ int ObDmlCgService::generate_delete_ctdef(ObLogDelUpd &op, ObSEArray new_row; if (OB_FAIL(old_row.assign(index_dml_info.column_old_values_exprs_))) { LOG_WARN("fail to assign delete old row", K(ret)); - } else if (OB_FAIL(generate_dml_base_ctdef(op, index_dml_info, + } else if (OB_FAIL(generate_dml_base_ctdef(op, + index_dml_info, del_ctdef, ObTriggerEvents::get_delete_event(), old_row, @@ -988,7 +989,8 @@ int ObDmlCgService::generate_scan_ctdef(ObLogInsert &op, false))) { LOG_WARN("generate calc exprs failed", K(ret)); } else if (OB_FAIL(cg_.tsc_cg_service_.generate_das_result_output(scan_ctdef.access_column_ids_, - scan_ctdef))) { + scan_ctdef, + nullptr))) { LOG_WARN("generate das result output failed", K(ret)); } } @@ -1626,6 +1628,20 @@ int ObDmlCgService::generate_dml_base_ctdef(ObLogicalOperator &op, } } + if (OB_SUCC(ret) && + op.is_dml_operator() && + OB_NOT_NULL(index_dml_info.trans_info_expr_)) { + ObLogDelUpd &dml_op = static_cast(op); + // Cg is only needed when the current trans_info_expr_ has a producer operator + if (has_exist_in_array(dml_op.get_produced_trans_exprs(), index_dml_info.trans_info_expr_)) { + if (cg_.generate_rt_expr(*index_dml_info.trans_info_expr_, dml_base_ctdef.trans_info_expr_)) { + LOG_WARN("fail to cg trans_info expr", K(ret), KPC(index_dml_info.trans_info_expr_)); + } + } else { + LOG_TRACE("this trans_info_expr not produced", K(ret), K(index_dml_info)); + } + } + if (OB_SUCC(ret) && log_op_def::LOG_INSERT == op.get_type()) { ObLogInsert &log_ins_op = static_cast(op); @@ -1633,7 +1649,6 @@ int ObDmlCgService::generate_dml_base_ctdef(ObLogicalOperator &op, dml_base_ctdef.das_base_ctdef_.is_insert_up_ = true; } } - return ret; } diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 4c54f76166..a1cc180d76 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -346,6 +346,7 @@ int ObStaticEngineCG::check_expr_columnlized(const ObRawExpr *expr) // T_TABLET_AUTOINC_NEXTVAL is the hidden_pk for heap_table // this column is an pseudo column || T_TABLET_AUTOINC_NEXTVAL == expr->get_expr_type() + || T_PSEUDO_ROW_TRANS_INFO_COLUMN == expr->get_expr_type() || expr->is_set_op_expr() || (expr->is_sys_func_expr() && 0 == expr->get_param_count()) // sys func with no param || expr->is_query_ref_expr() @@ -5099,7 +5100,7 @@ int ObStaticEngineCG::generate_spec(ObLogDelete &op, } else { spec.row_desc_.set_part_id_index(partition_expr_idx); } - LOG_TRACE("pdml static cg information", K(ret), K(partition_expr_idx), K(index_dml_info)); + LOG_TRACE("pdml static cg information", K(ret), K(index_dml_info), K(partition_expr_idx)); } return ret; } @@ -5188,7 +5189,7 @@ int ObStaticEngineCG::generate_spec(ObLogUpdate &op, } else { spec.row_desc_.set_part_id_index(partition_expr_idx); } - LOG_TRACE("pdml static cg information", K(ret), K(partition_expr_idx), K(index_dml_info)); + LOG_TRACE("pdml static cg information", K(ret), K(index_dml_info), K(partition_expr_idx)); // table columns exprs in dml need to set IS_COLUMNLIZED flag OZ(mark_expr_self_produced(index_dml_info.column_exprs_)); } diff --git a/src/sql/code_generator/ob_tsc_cg_service.cpp b/src/sql/code_generator/ob_tsc_cg_service.cpp index d591def226..ca8a10e709 100644 --- a/src/sql/code_generator/ob_tsc_cg_service.cpp +++ b/src/sql/code_generator/ob_tsc_cg_service.cpp @@ -224,7 +224,7 @@ int ObTscCgService::generate_table_param(const ObLogTableScan &op, ObDASScanCtDe scan_ctdef.aggregate_column_ids_))) { LOG_WARN("convert agg failed", K(ret), K(*table_schema), K(scan_ctdef.aggregate_column_ids_), K(op.get_index_back())); - } else if (OB_FAIL(generate_das_result_output(tsc_out_cols, scan_ctdef, pd_agg))) { + } else if (OB_FAIL(generate_das_result_output(tsc_out_cols, scan_ctdef, op.get_trans_info_expr(), pd_agg))) { LOG_WARN("failed to init result outputs", K(ret)); } return ret; @@ -232,7 +232,8 @@ int ObTscCgService::generate_table_param(const ObLogTableScan &op, ObDASScanCtDe int ObTscCgService::generate_das_result_output(const ObIArray &output_cids, ObDASScanCtDef &scan_ctdef, - const bool include_agg) + const ObRawExpr *trans_info_expr, + const bool include_agg) { int ret = OB_SUCCESS; ExprFixedArray &access_exprs = scan_ctdef.pd_expr_spec_.access_exprs_; @@ -240,10 +241,11 @@ int ObTscCgService::generate_das_result_output(const ObIArray &output_ int64_t access_column_cnt = scan_ctdef.access_column_ids_.count(); int64_t access_expr_cnt = access_exprs.count(); int64_t agg_expr_cnt = include_agg ? agg_exprs.count() : 0; + int64_t trans_expr_cnt = trans_info_expr == nullptr ? 0 : 1; if (OB_UNLIKELY(access_column_cnt != access_expr_cnt)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("access column count is invalid", K(ret), K(access_column_cnt), K(access_expr_cnt)); - } else if (OB_FAIL(scan_ctdef.result_output_.init(output_cids.count() + agg_expr_cnt))) { + } else if (OB_FAIL(scan_ctdef.result_output_.init(output_cids.count() + agg_expr_cnt + trans_expr_cnt))) { LOG_WARN("init result output failed", K(ret)); } for (int64_t i = 0; OB_SUCC(ret) && i < output_cids.count(); ++i) { @@ -264,6 +266,19 @@ int ObTscCgService::generate_das_result_output(const ObIArray &output_ } } } + + // When the lookup occurs, the result_output of the das task + // during index_scan and the main table lookup will have trans_info_expr + if (OB_SUCC(ret) && trans_expr_cnt > 0) { + ObExpr *e = NULL; + if (OB_FAIL(cg_.generate_rt_expr(*trans_info_expr, e))) { + LOG_WARN("fail to generate rt exprt", K(ret), KPC(trans_info_expr)); + } else if (OB_FAIL(scan_ctdef.result_output_.push_back(e))) { + LOG_WARN("fail to push back trans_info expr", K(ret)); + } else { + scan_ctdef.trans_info_expr_ = e; + } + } return ret; } @@ -802,6 +817,18 @@ int ObTscCgService::generate_das_scan_ctdef(const ObLogTableScan &op, LOG_WARN("generate pushdown aggr ctdef failed", K(ret)); } } + + // 3. cg trans_info_expr + if (OB_SUCC(ret)) { + ObRawExpr *trans_info_expr = op.get_trans_info_expr(); + if (OB_NOT_NULL(trans_info_expr)) { + if (OB_FAIL(cg_.generate_rt_expr(*op.get_trans_info_expr(), + scan_ctdef.pd_expr_spec_.trans_info_expr_))) { + LOG_WARN("generate trans info expr failed", K(ret)); + } + } + } + //4. generate batch scan ctdef if (OB_SUCC(ret) && op.use_batch()) { if (OB_FAIL(cg_.generate_rt_expr(*op.get_group_id_expr(), scan_ctdef.group_id_expr_))) { diff --git a/src/sql/code_generator/ob_tsc_cg_service.h b/src/sql/code_generator/ob_tsc_cg_service.h index a19a15406b..9510709cef 100644 --- a/src/sql/code_generator/ob_tsc_cg_service.h +++ b/src/sql/code_generator/ob_tsc_cg_service.h @@ -47,6 +47,7 @@ public: ObDASTableLocMeta &loc_meta); int generate_das_result_output(const common::ObIArray &output_cids, ObDASScanCtDef &scan_ctdef, + const ObRawExpr *trans_info_expr, const bool include_agg = false); private: int generate_access_ctdef(const ObLogTableScan &op, ObDASScanCtDef &scan_ctdef, bool &has_rowscn); diff --git a/src/sql/das/ob_das_delete_op.cpp b/src/sql/das/ob_das_delete_op.cpp index 2872987d1b..6c05727a99 100644 --- a/src/sql/das/ob_das_delete_op.cpp +++ b/src/sql/das/ob_das_delete_op.cpp @@ -138,11 +138,11 @@ int ObDASDeleteOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_mor return ret; } -int ObDASDeleteOp::init_task_info() +int ObDASDeleteOp::init_task_info(uint32_t row_extend_size) { int ret = OB_SUCCESS; if (!write_buffer_.is_inited() - && OB_FAIL(write_buffer_.init(op_alloc_, DAS_ROW_EXTEND_SIZE, tenant_id_, "DASDeleteBuffer"))) { + && OB_FAIL(write_buffer_.init(op_alloc_, row_extend_size, MTL_ID(), "DASDeleteBuffer"))) { LOG_WARN("init delete buffer failed", K(ret)); } return ret; @@ -159,12 +159,18 @@ int ObDASDeleteOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) return ret; } -int ObDASDeleteOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full) +int ObDASDeleteOp::write_row(const ExprFixedArray &row, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *&stored_row, + bool &buffer_full) { int ret = OB_SUCCESS; bool added = false; buffer_full = false; - if (OB_FAIL(write_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { + if (!write_buffer_.is_inited()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("buffer not inited", K(ret)); + } else if (OB_FAIL(write_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { LOG_WARN("try add row to datum store failed", K(ret), K(row), K(write_buffer_)); } else if (!added) { buffer_full = true; diff --git a/src/sql/das/ob_das_delete_op.h b/src/sql/das/ob_das_delete_op.h index 4d277341cd..caa7905d11 100644 --- a/src/sql/das/ob_das_delete_op.h +++ b/src/sql/das/ob_das_delete_op.h @@ -31,11 +31,14 @@ public: virtual int release_op() override; virtual int decode_task_result(ObIDASTaskResult *task_result) override; virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override; - virtual int init_task_info() override; + virtual int init_task_info(uint32_t row_extend_size) override; virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return del_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return del_rtdef_; } - int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full); + int write_row(const ExprFixedArray &row, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *&stored_row, + bool &buffer_full); int64_t get_row_cnt() const { return write_buffer_.get_row_cnt(); } void set_das_ctdef(const ObDASDelCtDef *del_ctdef) { del_ctdef_ = del_ctdef; } void set_das_rtdef(ObDASDelRtDef *del_rtdef) { del_rtdef_ = del_rtdef; } diff --git a/src/sql/das/ob_das_dml_ctx_define.cpp b/src/sql/das/ob_das_dml_ctx_define.cpp index b055cf4aa8..c66d16fb1a 100644 --- a/src/sql/das/ob_das_dml_ctx_define.cpp +++ b/src/sql/das/ob_das_dml_ctx_define.cpp @@ -380,7 +380,7 @@ int ObDASWriteBuffer::init_dml_shadow_row(int64_t column_cnt, bool strip_lob_loc int ObDASWriteBuffer::try_add_row(const ObIArray &exprs, ObEvalCtx *ctx, const int64_t memory_limit, - DmlRow* &stored_row, + DmlRow *&stored_row, bool &row_added, bool strip_lob_locator) { @@ -442,9 +442,9 @@ OB_INLINE int ObDASWriteBuffer::create_link_buffer(int64_t row_size, DmlRow *&ro char *buf = NULL; DmlRow *dml_row = nullptr; int64_t NODE_HEADER_SIZE = sizeof(LinkNode); - // 注意,这里 ObChunkDatumStore::StoredRow的长度包括两部分 - // 本身存储ObDatum的长度row_size + 拓展的row_extend_size_长度 - int64_t buffer_len = NODE_HEADER_SIZE + row_size + row_extend_size_; + // Note that whether it is a deserialization process or a local add_row, the length of row_size here includes two parts + // Store the length of ObDatum itself row_size + extended row_extend_size_length + int64_t buffer_len = NODE_HEADER_SIZE + row_size; if (OB_ISNULL(buf = reinterpret_cast(das_alloc_->alloc(buffer_len, mem_attr_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc buf failed", K(ret), K(buffer_len)); @@ -476,7 +476,7 @@ OB_INLINE int ObDASWriteBuffer::add_row_to_dlist(const ObIArray &exprs, { int ret = OB_SUCCESS; DmlRow *dml_row = nullptr; - if (OB_FAIL(create_link_buffer(row_size, dml_row))) { + if (OB_FAIL(create_link_buffer(row_size + row_extend_size_, dml_row))) { LOG_WARN("create link buffer failed", K(ret)); } else if (OB_FAIL(DmlRow::build(dml_row, exprs, *ctx, (char *)dml_row, row_size))) { LOG_WARN("build stored row failed", K(ret)); @@ -493,7 +493,7 @@ OB_INLINE int ObDASWriteBuffer::add_row_to_dlist(const ObChunkDatumStore::Shadow int ret = OB_SUCCESS; DmlRow *dml_row = nullptr; const DmlRow *lsr = sr.get_store_row(); - if (OB_FAIL(create_link_buffer(lsr->row_size_, dml_row))) { + if (OB_FAIL(create_link_buffer(lsr->row_size_ + row_extend_size_, dml_row))) { LOG_WARN("create link buffer failed", K(ret)); } else { char *buf = dml_row->payload_; @@ -619,13 +619,15 @@ int ObDASWriteBuffer::dump_data(const ObDASDMLBaseCtDef &das_base_ctdef) const const ObChunkDatumStore::StoredRow *store_row = NULL; int64_t rownum = 0; ObArenaAllocator tmp_alloc; - + ObDatum *trans_info_datum = nullptr; ObDASWriteBuffer::Iterator write_iter_tmp; + ObString trans_info_str; + if (OB_FAIL(const_cast(this)->begin(write_iter_tmp))) { LOG_WARN("get write iter failed", K(ret)); } - while (OB_SUCC(ret) && OB_SUCC(write_iter_tmp.get_next_row(store_row))) { + trans_info_str.reset(); if (OB_ISNULL(store_row)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null", K(ret)); @@ -662,10 +664,20 @@ int ObDASWriteBuffer::dump_data(const ObDASDMLBaseCtDef &das_base_ctdef) const } } + if (OB_SUCC(ret)) { + if (row_extend_size_ > ObDASWriteBuffer::DAS_ROW_DEFAULT_EXTEND_SIZE) { + // It means the payload contain trans_info string + char *buf = static_cast(store_row->get_extra_payload()); + int32_t *str_len = reinterpret_cast(store_row->get_extra_payload()); + int64_t pos = sizeof(int32_t); + trans_info_str.assign(buf + pos, *str_len); + } + } + if (OB_SUCC(ret)) { // do print LOG_INFO("DASWriteBuffer dump", K(rownum), "task_type", das_base_ctdef.op_type_, - KPC(new_row), KPC(old_row), KPC(store_row)); + K(trans_info_str), KPC(new_row), KPC(old_row), KPC(store_row)); rownum++; } } // end while diff --git a/src/sql/das/ob_das_dml_ctx_define.h b/src/sql/das/ob_das_dml_ctx_define.h index 40366da173..de7f253963 100644 --- a/src/sql/das/ob_das_dml_ctx_define.h +++ b/src/sql/das/ob_das_dml_ctx_define.h @@ -386,7 +386,7 @@ public: int try_add_row(const common::ObIArray &exprs, ObEvalCtx *ctx, const int64_t memory_limit, - DmlRow* &stored_row, + DmlRow *&stored_row, bool &row_added, bool strip_lob_locator); int try_add_row(const DmlShadowRow &sr, const int64_t memory_limit, bool &row_added, DmlRow **stored_row = nullptr); @@ -394,6 +394,7 @@ public: int begin(NewRowIterator &it, const common::ObIArray &col_types); int dump_data(const ObDASDMLBaseCtDef &das_base_ctdef) const; + uint32_t get_row_extend_size() { return row_extend_size_; } TO_STRING_KV(K_(mem_attr), "buffer_memory", buffer_list_.mem_used_, @@ -421,6 +422,11 @@ private: int deserialize_buffer_list(const char *buf, const int64_t data_len, int64_t &pos); int get_stored_row_size(const common::ObIArray &exprs, ObEvalCtx &ctx, int64_t &size); int init_dml_shadow_row(int64_t column_cnt, bool strip_lob_locator); +public: + const static uint32_t DAS_ROW_DEFAULT_EXTEND_SIZE = 16; + const static uint32_t DAS_ROW_TRANS_STRING_SIZE = 128; + const static uint32_t DAS_WITH_TRANS_INFO_EXTEND_SIZE = + DAS_ROW_DEFAULT_EXTEND_SIZE + sizeof(int32_t) + DAS_ROW_TRANS_STRING_SIZE; private: static const int64_t DAS_WRITE_ROW_LIST_LEN = 128; private: diff --git a/src/sql/das/ob_das_insert_op.cpp b/src/sql/das/ob_das_insert_op.cpp index 6d7c40aa2a..e781150c46 100644 --- a/src/sql/das/ob_das_insert_op.cpp +++ b/src/sql/das/ob_das_insert_op.cpp @@ -340,11 +340,11 @@ int ObDASInsertOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_mor return ret; } -int ObDASInsertOp::init_task_info() +int ObDASInsertOp::init_task_info(uint32_t row_extend_size) { int ret = OB_SUCCESS; if (!insert_buffer_.is_inited() - && OB_FAIL(insert_buffer_.init(op_alloc_, DAS_ROW_EXTEND_SIZE, tenant_id_, "DASInsertBuffer"))) { + && OB_FAIL(insert_buffer_.init(op_alloc_, row_extend_size, MTL_ID(), "DASInsertBuffer"))) { LOG_WARN("init insert buffer failed", K(ret)); } return ret; @@ -361,12 +361,18 @@ int ObDASInsertOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) return ret; } -int ObDASInsertOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full) +int ObDASInsertOp::write_row(const ExprFixedArray &row, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *&stored_row, + bool &buffer_full) { int ret = OB_SUCCESS; bool added = false; buffer_full = false; - if (OB_FAIL(insert_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { + if (!insert_buffer_.is_inited()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("buffer not inited", K(ret)); + } else if (OB_FAIL(insert_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { LOG_WARN("try add row to insert buffer failed", K(ret), K(row), K(insert_buffer_)); } else if (!added) { buffer_full = true; diff --git a/src/sql/das/ob_das_insert_op.h b/src/sql/das/ob_das_insert_op.h index a602a4cdfd..a9e12a9d04 100644 --- a/src/sql/das/ob_das_insert_op.h +++ b/src/sql/das/ob_das_insert_op.h @@ -34,11 +34,14 @@ public: virtual int release_op() override; virtual int decode_task_result(ObIDASTaskResult *task_result) override; virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override; - virtual int init_task_info() override; + virtual int init_task_info(uint32_t row_extend_size) override; virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return ins_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return ins_rtdef_; } - int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full); + int write_row(const ExprFixedArray &row, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *&stored_row, + bool &buffer_full); int64_t get_row_cnt() const { return insert_buffer_.get_row_cnt(); } void set_das_ctdef(const ObDASInsCtDef *ins_ctdef) { ins_ctdef_ = ins_ctdef; } void set_das_rtdef(ObDASInsRtDef *ins_rtdef) { ins_rtdef_ = ins_rtdef; } diff --git a/src/sql/das/ob_das_lock_op.cpp b/src/sql/das/ob_das_lock_op.cpp index 6953279dbf..9f03a38207 100644 --- a/src/sql/das/ob_das_lock_op.cpp +++ b/src/sql/das/ob_das_lock_op.cpp @@ -110,13 +110,13 @@ int ObDASLockOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, return ret; } -int ObDASLockOp::init_task_info() +int ObDASLockOp::init_task_info(uint32_t row_extend_size) { int ret = OB_SUCCESS; if (!lock_buffer_.is_inited() && OB_FAIL(lock_buffer_.init(CURRENT_CONTEXT->get_allocator(), - DAS_ROW_EXTEND_SIZE, - tenant_id_, + row_extend_size, + MTL_ID(), "DASLockBuffer"))) { LOG_WARN("init lock buffer failed", K(ret)); } @@ -134,12 +134,18 @@ int ObDASLockOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) return ret; } -int ObDASLockOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full) +int ObDASLockOp::write_row(const ExprFixedArray &row, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *&stored_row, + bool &buffer_full) { int ret = OB_SUCCESS; bool added = false; buffer_full = false; - if (OB_FAIL(lock_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { + if (!lock_buffer_.is_inited()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("buffer not inited", K(ret)); + } else if (OB_FAIL(lock_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { LOG_WARN("try add row to lock buffer failed", K(ret), K(row), K(lock_buffer_)); } else if (!added) { buffer_full = true; diff --git a/src/sql/das/ob_das_lock_op.h b/src/sql/das/ob_das_lock_op.h index dad51525d5..024738b077 100644 --- a/src/sql/das/ob_das_lock_op.h +++ b/src/sql/das/ob_das_lock_op.h @@ -31,11 +31,14 @@ public: virtual int release_op() override; virtual int decode_task_result(ObIDASTaskResult *task_result) override; virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override; - virtual int init_task_info() override; + virtual int init_task_info(uint32_t row_extend_size) override; virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return lock_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return lock_rtdef_; } - int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full); + int write_row(const ExprFixedArray &row, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *&stored_row, + bool &buffer_full); int64_t get_row_cnt() const { return lock_buffer_.get_row_cnt(); } void set_das_ctdef(const ObDASLockCtDef *del_ctdef) { lock_ctdef_ = del_ctdef; } void set_das_rtdef(ObDASLockRtDef *del_rtdef) { lock_rtdef_ = del_rtdef; } diff --git a/src/sql/das/ob_das_ref.cpp b/src/sql/das/ob_das_ref.cpp index 5dcebc0a87..d598a35036 100644 --- a/src/sql/das/ob_das_ref.cpp +++ b/src/sql/das/ob_das_ref.cpp @@ -530,9 +530,7 @@ int ObDASRef::create_das_task(const ObDASTabletLoc *tablet_loc, task_op->set_tablet_id(tablet_loc->tablet_id_); task_op->set_ls_id(tablet_loc->ls_id_); task_op->set_tablet_loc(tablet_loc); - if (OB_FAIL(task_op->init_task_info())) { - LOG_WARN("init task info failed", K(ret)); - } else if (OB_FAIL(add_aggregated_task(task_op))) { + if (OB_FAIL(add_aggregated_task(task_op))) { LOG_WARN("failed to add aggregated task", KR(ret)); } } diff --git a/src/sql/das/ob_das_scan_op.cpp b/src/sql/das/ob_das_scan_op.cpp index 24fba9f9b1..718bd63689 100644 --- a/src/sql/das/ob_das_scan_op.cpp +++ b/src/sql/das/ob_das_scan_op.cpp @@ -55,7 +55,8 @@ OB_SERIALIZE_MEMBER(ObDASScanCtDef, external_file_location_, external_file_access_info_, external_files_, - external_file_format_str_); + external_file_format_str_, + trans_info_expr_); OB_DEF_SERIALIZE(ObDASScanRtDef) { @@ -148,6 +149,8 @@ int ObDASScanRtDef::init_pd_op(ObExecContext &exec_ctx, scan_ctdef.pd_expr_spec_))) { } else if (OB_FAIL(pd_expr_op_.init_pushdown_storage_filter())) { LOG_WARN("init pushdown storage filter failed", K(ret)); + } else if (OB_NOT_NULL(scan_ctdef.trans_info_expr_)) { + //do nothing } } return ret; @@ -173,6 +176,7 @@ ObDASScanOp::~ObDASScanOp() #endif } scan_param_.destroy(); + trans_info_array_.destroy(); } int ObDASScanOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) @@ -438,6 +442,11 @@ void ObDASScanOp::reset_access_datums_ptr() ObEvalInfo &info = (*e)->get_eval_info(*scan_rtdef_->eval_ctx_); info.point_to_frame_ = true; } + FOREACH_CNT(e, get_result_outputs()) { + (*e)->locate_datums_for_update(*scan_rtdef_->eval_ctx_, scan_rtdef_->eval_ctx_->max_batch_size_); + ObEvalInfo &info = (*e)->get_eval_info(*scan_rtdef_->eval_ctx_); + info.point_to_frame_ = true; + } } if (get_lookup_rtdef() != nullptr && get_lookup_rtdef()->p_pd_expr_op_->is_vectorized()) { FOREACH_CNT(e, get_lookup_ctdef()->pd_expr_spec_.access_exprs_) { @@ -586,6 +595,12 @@ int ObDASScanOp::fill_extra_result() return ret; } +int ObDASScanOp::init_task_info(uint32_t row_extend_size) +{ + UNUSED(row_extend_size); + return OB_SUCCESS; +} + int ObDASScanOp::rescan() { int &ret = errcode_; @@ -889,6 +904,9 @@ int ObLocalIndexLookupOp::process_data_table_rowkey() void *buf = nullptr; common::ObArenaAllocator& lookup_alloc = lookup_memctx_->get_arena_allocator(); ObNewRange lookup_range; + if (index_ctdef_->trans_info_expr_ != nullptr) { + rowkey_cnt = rowkey_cnt - 1; + } if (OB_ISNULL(buf = lookup_alloc.alloc(sizeof(ObObj) * rowkey_cnt))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate buffer failed", K(ret), K(rowkey_cnt)); @@ -900,6 +918,9 @@ int ObLocalIndexLookupOp::process_data_table_rowkey() ObExpr *expr = index_ctdef_->result_output_.at(i); if (T_PSEUDO_GROUP_ID == expr->type_) { // do nothing + } else if (T_PSEUDO_ROW_TRANS_INFO_COLUMN == expr->type_) { + // do nothing + ObDatum &col_datum = expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_); } else { ObDatum &col_datum = expr->locate_expr_datum(*lookup_rtdef_->eval_ctx_); if (OB_FAIL(col_datum.to_obj(tmp_obj, expr->obj_meta_, expr->obj_datum_map_))) { @@ -909,6 +930,23 @@ int ObLocalIndexLookupOp::process_data_table_rowkey() } } } + + if (OB_SUCC(ret) && nullptr != index_ctdef_->trans_info_expr_) { + void *buf = nullptr; + ObDatum *datum_ptr = nullptr; + if (OB_FAIL(build_trans_datum(index_ctdef_->trans_info_expr_, + lookup_rtdef_->eval_ctx_, + lookup_memctx_->get_arena_allocator(), + datum_ptr))) { + + } else if (OB_ISNULL(datum_ptr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null", K(ret)); + } else if (OB_FAIL(trans_info_array_.push_back(datum_ptr))) { + LOG_WARN("fail to push back trans info array", K(ret), KPC(datum_ptr)); + } + } + if (OB_SUCC(ret)) { ObRowkey table_rowkey(obj_ptr, rowkey_cnt); uint64_t ref_table_id = lookup_ctdef_->ref_table_id_; @@ -1056,14 +1094,30 @@ int ObLocalIndexLookupOp::check_lookup_row_cnt() K(ret), K_(lookup_rowkey_cnt), K_(lookup_row_cnt), "index_group_cnt", get_index_group_cnt(), "lookup_group_cnt", get_lookup_group_cnt(), - "scan_range", scan_param_.key_ranges_, "index_table_id", index_ctdef_->ref_table_id_ , "data_table_tablet_id", tablet_id_ , KPC_(tx_desc)); - LOG_ERROR("Fatal Error!!! Catch a defensive error!", - K(ret), KPC_(lookup_ctdef), KPC_(lookup_rtdef)); + if (trans_info_array_.count() == scan_param_.key_ranges_.count()) { + for (int64_t i = 0; i < trans_info_array_.count(); i++) { + LOG_ERROR("dump TableLookup DAS Task trans_info and key_ranges", K(i), + KPC(trans_info_array_.at(i)), K(scan_param_.key_ranges_.at(i))); + } + } else { + for (int64_t i = 0; i < scan_param_.key_ranges_.count(); i++) { + LOG_ERROR("dump TableLookup DAS Task key_ranges", + K(i), K(scan_param_.key_ranges_.at(i))); + } + } } } + + int simulate_error = EVENT_CALL(EventTable::EN_DAS_SIMULATE_DUMP_WRITE_BUFFER); + if (0 != simulate_error) { + for (int64_t i = 0; i < trans_info_array_.count(); i++) { + LOG_INFO("dump TableLookup DAS Task trans info", K(i), KPC(trans_info_array_.at(i))); + } + } + return ret; } @@ -1198,6 +1252,7 @@ int ObLocalIndexLookupOp::reset_lookup_state() int ret = OB_SUCCESS; state_ = INDEX_SCAN; index_end_ = false; + trans_info_array_.reuse(); lookup_rtdef_->stmt_allocator_.set_alloc(index_rtdef_->stmt_allocator_.get_alloc()); // Keep lookup_rtdef_->stmt_allocator_.alloc_ consistent with index_rtdef_->stmt_allocator_.alloc_ // to avoid memory expansion @@ -1225,6 +1280,7 @@ int ObLocalIndexLookupOp::revert_iter() lookup_iter_ = NULL; scan_param_.destroy_schema_guard(); scan_param_.~ObTableScanParam(); + trans_info_array_.destroy(); if (lookup_memctx_ != nullptr) { lookup_memctx_->reset_remain_one_page(); DESTROY_CONTEXT(lookup_memctx_); diff --git a/src/sql/das/ob_das_scan_op.h b/src/sql/das/ob_das_scan_op.h index 01c5c1b112..a2829126f1 100644 --- a/src/sql/das/ob_das_scan_op.h +++ b/src/sql/das/ob_das_scan_op.h @@ -42,7 +42,8 @@ public: external_file_access_info_(alloc), external_file_location_(alloc), external_files_(alloc), - external_file_format_str_(alloc) + external_file_format_str_(alloc), + trans_info_expr_(nullptr) { } //in das scan op, column described with column expr virtual bool has_expr() const override { return true; } @@ -73,7 +74,8 @@ public: K_(is_external_table), K_(external_files), K_(external_file_format_str), - K_(external_file_location)); + K_(external_file_location), + KPC_(trans_info_expr)); common::ObTableID ref_table_id_; UIntFixedArray access_column_ids_; int64_t schema_version_; @@ -91,6 +93,7 @@ public: ObExternalFileFormat::StringData external_file_location_; ExternalFileNameArray external_files_; //for external table scan TODO jim.wjh remove ObExternalFileFormat::StringData external_file_format_str_; + ObExpr *trans_info_expr_; // transaction information pseudo-column }; struct ObDASScanRtDef : ObDASBaseRtDef @@ -170,10 +173,11 @@ public: virtual int release_op() override; storage::ObTableScanParam &get_scan_param() { return scan_param_; } const storage::ObTableScanParam &get_scan_param() const { return scan_param_; } + virtual int decode_task_result(ObIDASTaskResult *task_result) override; virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override; virtual int fill_extra_result() override; - virtual int init_task_info() override { return common::OB_SUCCESS; } + virtual int init_task_info(uint32_t row_extend_size) override; virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return scan_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return scan_rtdef_; } @@ -200,6 +204,7 @@ public: virtual ObLocalIndexLookupOp *get_lookup_op(); ObExpr *get_group_id_expr() { return scan_ctdef_->group_id_expr_; } bool is_group_scan() { return NULL != scan_ctdef_->group_id_expr_; } + bool is_contain_trans_info() {return NULL != scan_ctdef_->trans_info_expr_; } virtual bool need_all_output() { return false; } virtual int switch_scan_group() { return common::OB_SUCCESS; }; virtual int set_scan_group(int64_t group_id) { UNUSED(group_id); return common::OB_NOT_IMPLEMENT; }; @@ -213,6 +218,8 @@ protected: virtual int do_local_index_lookup(); virtual common::ObNewRowIterator *get_storage_scan_iter(); virtual common::ObNewRowIterator *get_output_result_iter() { return result_; } +public: + ObSEArray trans_info_array_; protected: //对于DASScanOp,本质上是对PartitionService的table_scan()接口的封装, //参数为scan_param,结果为result iterator @@ -394,6 +401,8 @@ protected: common::ObTabletID tablet_id_; share::ObLSID ls_id_; storage::ObTableScanParam scan_param_; + + ObSEArray trans_info_array_; lib::MemoryContext lookup_memctx_; union { uint32_t status_; diff --git a/src/sql/das/ob_das_task.cpp b/src/sql/das/ob_das_task.cpp index ba842778c2..861249a865 100644 --- a/src/sql/das/ob_das_task.cpp +++ b/src/sql/das/ob_das_task.cpp @@ -180,6 +180,7 @@ int ObIDASTaskOp::start_das_task() { int &ret = errcode_; int simulate_error = EVENT_CALL(EventTable::EN_DAS_SIMULATE_OPEN_ERROR); + int need_dump = EVENT_CALL(EventTable::EN_DAS_SIMULATE_DUMP_WRITE_BUFFER); if (OB_UNLIKELY(!is_in_retry() && OB_SUCCESS != simulate_error)) { ret = simulate_error; } else { @@ -190,6 +191,8 @@ int ObIDASTaskOp::start_das_task() //dump das task data to help analysis defensive bug dump_data(); } + } else if (OB_SUCCESS != need_dump) { + dump_data(); } } // no need to advance state here because this function could be called on remote executor. @@ -286,7 +289,8 @@ OB_DEF_DESERIALIZE(ObDASTaskArg) for (int64_t i = 0; OB_SUCC(ret) && i < count; i ++) { OB_UNIS_DECODE(op_type); OZ(das_factory->create_das_task_op(op_type, task_op)); - OZ(task_op->init_task_info()); + // Here you must init first, you need to set the allocator + OZ(task_op->init_task_info(ObDASWriteBuffer::DAS_ROW_DEFAULT_EXTEND_SIZE)); if (OB_SUCC(ret)) { task_ops_.at(i) = task_op; } diff --git a/src/sql/das/ob_das_task.h b/src/sql/das/ob_das_task.h index 7ae93306d1..e2d99e4781 100644 --- a/src/sql/das/ob_das_task.h +++ b/src/sql/das/ob_das_task.h @@ -139,7 +139,7 @@ public: { return common::OB_NOT_IMPLEMENT; } - virtual int init_task_info() = 0; + virtual int init_task_info(uint32_t row_extend_size) = 0; virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) = 0; virtual const ObDASBaseCtDef *get_ctdef() const { return nullptr; } virtual ObDASBaseRtDef *get_rtdef() { return nullptr; } @@ -200,6 +200,7 @@ public: ObIDASTaskResult *get_op_result() const { return op_result_; } void set_op_result(ObIDASTaskResult *op_result) { op_result_ = op_result; } + protected: int start_das_task(); int end_das_task(); @@ -241,9 +242,6 @@ protected: ObDasAggregatedTasks *agg_tasks_; // task's agg task, do not serialize DasTaskLinkedList *cur_agg_list_; // task's agg_list, do not serialize ObIDASTaskResult *op_result_; - -public: - const static uint32_t DAS_ROW_EXTEND_SIZE = 16; }; typedef common::ObObjStore DasTaskList; typedef DasTaskList::Iterator DASTaskIter; diff --git a/src/sql/das/ob_das_update_op.cpp b/src/sql/das/ob_das_update_op.cpp index 51a57466cf..1ff157f809 100644 --- a/src/sql/das/ob_das_update_op.cpp +++ b/src/sql/das/ob_das_update_op.cpp @@ -354,11 +354,11 @@ int ObDASUpdateOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_mor return ret; } -int ObDASUpdateOp::init_task_info() +int ObDASUpdateOp::init_task_info(uint32_t row_extend_size) { int ret = OB_SUCCESS; if (!write_buffer_.is_inited() - && OB_FAIL(write_buffer_.init(op_alloc_, DAS_ROW_EXTEND_SIZE, tenant_id_, "DASUpdateBuffer"))) { + && OB_FAIL(write_buffer_.init(op_alloc_, row_extend_size, MTL_ID(), "DASUpdateBuffer"))) { LOG_WARN("init update buffer failed", K(ret)); } return ret; @@ -375,12 +375,18 @@ int ObDASUpdateOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) return ret; } -int ObDASUpdateOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full) +int ObDASUpdateOp::write_row(const ExprFixedArray &row, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *&stored_row, + bool &buffer_full) { int ret = OB_SUCCESS; bool added = false; buffer_full = false; - if (OB_FAIL(write_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { + if (!write_buffer_.is_inited()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("buffer not inited", K(ret)); + } else if (OB_FAIL(write_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { LOG_WARN("try add row to datum store failed", K(ret), K(row), K(write_buffer_)); } else if (!added) { buffer_full = true; diff --git a/src/sql/das/ob_das_update_op.h b/src/sql/das/ob_das_update_op.h index 7506305209..55dbbe05f8 100644 --- a/src/sql/das/ob_das_update_op.h +++ b/src/sql/das/ob_das_update_op.h @@ -31,11 +31,14 @@ public: virtual int release_op() override; virtual int decode_task_result(ObIDASTaskResult *task_result) override; virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override; - virtual int init_task_info() override; + virtual int init_task_info(uint32_t row_extend_size) override; virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return upd_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return upd_rtdef_; } - int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full); + int write_row(const ExprFixedArray &row, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *&stored_row, + bool &buffer_full); int64_t get_row_cnt() const { return write_buffer_.get_row_cnt(); } void set_das_ctdef(const ObDASUpdCtDef *upd_ctdef) { upd_ctdef_ = upd_ctdef; } void set_das_rtdef(ObDASUpdRtDef *upd_rtdef) { upd_rtdef_ = upd_rtdef; } diff --git a/src/sql/das/ob_data_access_service.h b/src/sql/das/ob_data_access_service.h index e2fa589a34..75cba9268d 100644 --- a/src/sql/das/ob_data_access_service.h +++ b/src/sql/das/ob_data_access_service.h @@ -31,8 +31,8 @@ class ObDataAccessService public: ObDataAccessService(); ~ObDataAccessService() = default; - static int mtl_init(ObDataAccessService* &das); - static void mtl_destroy(ObDataAccessService* &das); + static int mtl_init(ObDataAccessService *&das); + static void mtl_destroy(ObDataAccessService *&das); int init(rpc::frame::ObReqTransport *transport, const common::ObAddr &self_addr); //开启DAS Task分区相关的事务控制,并执行task对应的op diff --git a/src/sql/engine/basic/ob_pushdown_filter.cpp b/src/sql/engine/basic/ob_pushdown_filter.cpp index 653912281b..e8eb271fc6 100644 --- a/src/sql/engine/basic/ob_pushdown_filter.cpp +++ b/src/sql/engine/basic/ob_pushdown_filter.cpp @@ -1435,7 +1435,8 @@ ObPushdownExprSpec::ObPushdownExprSpec(ObIAllocator &alloc) pd_storage_filters_(alloc), pd_storage_aggregate_output_(alloc), ext_file_column_exprs_(alloc), - ext_column_convert_exprs_(alloc) + ext_column_convert_exprs_(alloc), + trans_info_expr_(nullptr) { } @@ -1455,7 +1456,8 @@ OB_DEF_SERIALIZE(ObPushdownExprSpec) fake_pd_storage_index_back_filters, //mock a fake filters to compatible with 4.0 pd_storage_aggregate_output_, ext_file_column_exprs_, - ext_column_convert_exprs_); + ext_column_convert_exprs_, + trans_info_expr_); return ret; } @@ -1475,7 +1477,8 @@ OB_DEF_DESERIALIZE(ObPushdownExprSpec) fake_pd_storage_index_back_filters, //mock a fake filters to compatible with 4.0 pd_storage_aggregate_output_, ext_file_column_exprs_, - ext_column_convert_exprs_); + ext_column_convert_exprs_, + trans_info_expr_); return ret; } @@ -1495,7 +1498,8 @@ OB_DEF_SERIALIZE_SIZE(ObPushdownExprSpec) fake_pd_storage_index_back_filters, //mock a fake filters to compatible with 4.0 pd_storage_aggregate_output_, ext_file_column_exprs_, - ext_column_convert_exprs_); + ext_column_convert_exprs_, + trans_info_expr_); return len; } @@ -1525,6 +1529,49 @@ int ObPushdownOperator::init_pushdown_storage_filter() return ret; } +int ObPushdownOperator::reset_trans_info_datum() +{ + int ret = OB_SUCCESS; + if (OB_NOT_NULL(expr_spec_.trans_info_expr_)) { + if (expr_spec_.trans_info_expr_->is_batch_result()) { + ObDatum *datums = expr_spec_.trans_info_expr_->locate_datums_for_update(eval_ctx_, expr_spec_.max_batch_size_); + for (int64_t i = 0; i < expr_spec_.max_batch_size_; i++) { + datums[i].set_null(); + } + } else { + ObDatum &datum = expr_spec_.trans_info_expr_->locate_datum_for_write(eval_ctx_); + datum.set_null(); + } + } + return ret; +} + +int ObPushdownOperator::write_trans_info_datum(blocksstable::ObDatumRow &out_row) +{ + int ret = OB_SUCCESS; + if (OB_NOT_NULL(expr_spec_.trans_info_expr_) && + OB_NOT_NULL(out_row.trans_info_)) { + ObDatum &datum = expr_spec_.trans_info_expr_->locate_datum_for_write(eval_ctx_); + char *dst_ptr = const_cast(datum.ptr_); + int64_t pos = 0; + if (OB_ISNULL(dst_ptr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected nullptr", K(ret)); + } else if (OB_FAIL(databuff_memcpy(dst_ptr, + ObDASWriteBuffer::DAS_ROW_TRANS_STRING_SIZE, + pos, + strlen(out_row.trans_info_), + out_row.trans_info_))) { + LOG_WARN("fail to copy trans info to datum", K(ret)); + } else { + datum.pack_ = pos; + // out_row.trans_info_ must be reset to nullptr to prevent affecting the next row + out_row.trans_info_ = nullptr; + } + } + return ret; +} + int ObPushdownOperator::clear_datum_eval_flag() { int ret = OB_SUCCESS; diff --git a/src/sql/engine/basic/ob_pushdown_filter.h b/src/sql/engine/basic/ob_pushdown_filter.h index b4e5b04db0..496b7ed9b7 100644 --- a/src/sql/engine/basic/ob_pushdown_filter.h +++ b/src/sql/engine/basic/ob_pushdown_filter.h @@ -40,6 +40,7 @@ struct ObTableAccessContext; namespace blocksstable { struct ObStorageDatum; +struct ObDatumRow; }; namespace sql { @@ -532,7 +533,8 @@ public: K_(ext_column_convert_exprs), K_(max_batch_size), K_(pushdown_filters), - K_(pd_storage_flag)); + K_(pd_storage_flag), + KPC_(trans_info_expr)); int set_calc_exprs(const ExprFixedArray &calc_exprs, int64_t max_batch_size) { @@ -554,6 +556,7 @@ public: // used by external table ExprFixedArray ext_file_column_exprs_; ExprFixedArray ext_column_convert_exprs_; + ObExpr *trans_info_expr_; }; //下压到存储层的表达式执行依赖的op ctx @@ -577,10 +580,13 @@ public: // clear eval flag of all datums within a batch int clear_evaluated_flag(); int deep_copy(const sql::ObExprPtrIArray *exprs, const int64_t batch_idx); + int reset_trans_info_datum(); + int write_trans_info_datum(blocksstable::ObDatumRow &out_row); public: ObPushdownFilterExecutor *pd_storage_filters_; ObEvalCtx &eval_ctx_; const ObPushdownExprSpec &expr_spec_; + // The datum of the trans_info expression that records transaction information }; // filter row for storage callback. diff --git a/src/sql/engine/dml/ob_dml_ctx_define.cpp b/src/sql/engine/dml/ob_dml_ctx_define.cpp index 51cfd8610f..2029b1057a 100644 --- a/src/sql/engine/dml/ob_dml_ctx_define.cpp +++ b/src/sql/engine/dml/ob_dml_ctx_define.cpp @@ -208,7 +208,8 @@ OB_SERIALIZE_MEMBER(ObDMLBaseCtDef, view_check_exprs_, is_primary_index_, is_heap_table_, - has_instead_of_trigger_); + has_instead_of_trigger_, + trans_info_expr_); OB_SERIALIZE_MEMBER(ObMultiInsCtDef, calc_part_id_expr_, diff --git a/src/sql/engine/dml/ob_dml_ctx_define.h b/src/sql/engine/dml/ob_dml_ctx_define.h index 8ac466c34c..755704e306 100644 --- a/src/sql/engine/dml/ob_dml_ctx_define.h +++ b/src/sql/engine/dml/ob_dml_ctx_define.h @@ -421,7 +421,8 @@ public: K_(view_check_exprs), K_(is_primary_index), K_(is_heap_table), - K_(has_instead_of_trigger)); + K_(has_instead_of_trigger), + KPC_(trans_info_expr)); ObDMLOpType dml_type_; ExprFixedArray check_cst_exprs_; @@ -444,6 +445,7 @@ public: bool is_primary_index_; bool is_heap_table_; bool has_instead_of_trigger_; + ObExpr *trans_info_expr_; protected: ObDMLBaseCtDef(common::ObIAllocator &alloc, ObDASDMLBaseCtDef &das_base_ctdef, @@ -461,7 +463,8 @@ protected: view_check_exprs_(alloc), is_primary_index_(false), is_heap_table_(false), - has_instead_of_trigger_(false) + has_instead_of_trigger_(false), + trans_info_expr_(nullptr) { } }; diff --git a/src/sql/engine/dml/ob_dml_service.cpp b/src/sql/engine/dml/ob_dml_service.cpp index 4176b54183..72804e0c9d 100644 --- a/src/sql/engine/dml/ob_dml_service.cpp +++ b/src/sql/engine/dml/ob_dml_service.cpp @@ -862,7 +862,7 @@ int ObDMLService::insert_row(const ObInsCtDef &ins_ctdef, ObInsRtDef &ins_rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, - ObChunkDatumStore::StoredRow* &stored_row) + ObChunkDatumStore::StoredRow *&stored_row) { int ret = OB_SUCCESS; if (OB_FAIL(check_dml_tablet_validity(dml_rtctx, @@ -887,10 +887,16 @@ int ObDMLService::insert_row(const ObDASInsCtDef &ins_ctdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, const ExprFixedArray &new_row, - ObChunkDatumStore::StoredRow* &stored_row) + ObChunkDatumStore::StoredRow *&stored_row) { int ret = OB_SUCCESS; - ret = write_row_to_das_op(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx, new_row, stored_row); + ret = write_row_to_das_op(ins_ctdef, + ins_rtdef, + tablet_loc, + dml_rtctx, + new_row, + nullptr, + stored_row); return ret; } @@ -898,7 +904,7 @@ int ObDMLService::delete_row(const ObDelCtDef &del_ctdef, ObDelRtDef &del_rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, - ObChunkDatumStore::StoredRow* &stored_row) + ObChunkDatumStore::StoredRow *&stored_row) { int ret = OB_SUCCESS; if (OB_FAIL(check_dml_tablet_validity(dml_rtctx, @@ -912,6 +918,7 @@ int ObDMLService::delete_row(const ObDelCtDef &del_ctdef, tablet_loc, dml_rtctx, del_ctdef.old_row_, + del_ctdef.trans_info_expr_, stored_row))) { LOG_WARN("delete old row from das failed", K(ret)); } @@ -935,6 +942,7 @@ int ObDMLService::lock_row(const ObLockCtDef &lock_ctdef, tablet_loc, dml_rtctx, lock_ctdef.old_row_, + nullptr, stored_row))) { LOG_WARN("lock row to das failed", K(ret)); } @@ -953,6 +961,7 @@ int ObDMLService::lock_row(const ObDASLockCtDef &dlock_ctdef, tablet_loc, das_rtctx, old_row, + nullptr, stored_row); } @@ -961,9 +970,9 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, const ObDASTabletLoc *old_tablet_loc, const ObDASTabletLoc *new_tablet_loc, ObDMLRtCtx &dml_rtctx, - ObChunkDatumStore::StoredRow* &old_row, - ObChunkDatumStore::StoredRow* &new_row, - ObChunkDatumStore::StoredRow* &full_row) + ObChunkDatumStore::StoredRow *&old_row, + ObChunkDatumStore::StoredRow *&new_row, + ObChunkDatumStore::StoredRow *&full_row) { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(dml_rtctx.get_exec_ctx()); @@ -996,6 +1005,7 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, old_tablet_loc, dml_rtctx, upd_ctdef.old_row_, + nullptr, old_row))) { LOG_WARN("write row to das op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else { @@ -1025,6 +1035,7 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, old_tablet_loc, dml_rtctx, upd_ctdef.old_row_, + upd_ctdef.trans_info_expr_, old_row))) { LOG_WARN("delete row to das op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else if (upd_ctdef.is_heap_table_ && @@ -1037,6 +1048,7 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, new_tablet_loc, dml_rtctx, upd_ctdef.new_row_, + nullptr, new_row))) { LOG_WARN("insert row to das op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else { @@ -1050,6 +1062,7 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, old_tablet_loc, dml_rtctx, upd_ctdef.full_row_, + upd_ctdef.trans_info_expr_, full_row))) { LOG_WARN("write row to das op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else { @@ -1072,6 +1085,7 @@ int ObDMLService::update_row(const ObDASUpdCtDef &ctdef, tablet_loc, dml_rtctx, full_row, + nullptr, stored_row); } @@ -1080,13 +1094,14 @@ int ObDMLService::delete_row(const ObDASDelCtDef &das_del_ctdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &das_rtctx, const ExprFixedArray &old_row, - ObChunkDatumStore::StoredRow* &stored_row) + ObChunkDatumStore::StoredRow *&stored_row) { return write_row_to_das_op(das_del_ctdef, das_del_rtdef, tablet_loc, das_rtctx, old_row, + nullptr, stored_row); } @@ -1438,6 +1453,31 @@ int ObDMLService::init_ob_rowkey( ObIAllocator &allocator, const int64_t rowkey_ return ret; } +int ObDMLService::add_trans_info_datum(ObExpr *trans_info_expr, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *stored_row) +{ + int ret = OB_SUCCESS; + ObDatum *datum = nullptr; + + if (OB_ISNULL(trans_info_expr) || OB_ISNULL(stored_row)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null pointer", K(ret), K(trans_info_expr), K(stored_row)); + } else if (OB_FAIL(trans_info_expr->eval(eval_ctx, datum))) { + LOG_WARN("failed to evaluate expr datum", K(ret)); + } else if (OB_ISNULL(datum)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null pointer", K(ret)); + } else { + char *buf = static_cast(stored_row->get_extra_payload()); + *static_cast(stored_row->get_extra_payload()) = datum->len_; + int64_t pos = sizeof(int32_t); + MEMCPY(buf + pos, datum->ptr_, datum->len_); + } + + return ret; +} + int ObDMLService::init_das_ins_rtdef_for_update(ObDMLRtCtx &dml_rtctx, const ObUpdCtDef &upd_ctdef, ObUpdRtDef &upd_rtdef) @@ -1523,15 +1563,20 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, const ExprFixedArray &row, - ObChunkDatumStore::StoredRow* &stored_row) + ObExpr *trans_info_expr, + ObChunkDatumStore::StoredRow *&stored_row) { int ret = OB_SUCCESS; bool need_retry = false; + bool is_strict_defensive_check = trans_info_expr == nullptr ? false : true; typedef typename das_reg::ObDASOpTypeTraits::DASCtDef CtDefType; typedef typename das_reg::ObDASOpTypeTraits::DASRtDef RtDefType; typedef typename das_reg::ObDASOpTypeTraits::DASOp OpType; OB_ASSERT(typeid(ctdef) == typeid(CtDefType)); OB_ASSERT(typeid(rtdef) == typeid(RtDefType)); + int64_t extend_size = is_strict_defensive_check ? + ObDASWriteBuffer::DAS_WITH_TRANS_INFO_EXTEND_SIZE : + ObDASWriteBuffer::DAS_ROW_DEFAULT_EXTEND_SIZE; do { bool buffer_full = false; need_retry = false; @@ -1540,6 +1585,8 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, if (OB_UNLIKELY(!dml_rtctx.das_ref_.has_das_op(tablet_loc, dml_op))) { if (OB_FAIL(dml_rtctx.das_ref_.prepare_das_task(tablet_loc, dml_op))) { LOG_WARN("prepare das task failed", K(ret), K(N)); + } else if (OB_FAIL(dml_op->init_task_info(extend_size))) { + LOG_WARN("fail to init das write buff", K(ret), K(extend_size)); } else { dml_op->set_das_ctdef(static_cast(&ctdef)); dml_op->set_das_rtdef(static_cast(&rtdef)); @@ -1559,11 +1606,15 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, if (OB_SUCC(ret)) { if (OB_FAIL(dml_op->write_row(row, dml_rtctx.get_eval_ctx(), stored_row, buffer_full))) { LOG_WARN("insert row to das dml op buffer failed", K(ret), K(ctdef), K(rtdef)); + } else if (!buffer_full && + OB_NOT_NULL(trans_info_expr) && + OB_FAIL(ObDMLService::add_trans_info_datum(trans_info_expr, dml_rtctx.get_eval_ctx(), stored_row))) { + LOG_WARN("fail to add trans info datum", K(ret)); } else if (OB_NOT_NULL(stored_row)) { dml_rtctx.add_cached_row_size(stored_row->row_size_); - LOG_DEBUG("write row to das op", K(ret), K(buffer_full), "op_type", N, - "table_id", ctdef.table_id_, "index_tid", ctdef.index_tid_, - "row", ROWEXPR2STR(dml_rtctx.get_eval_ctx(), row), "row_size", stored_row->row_size_); + LOG_DEBUG("write row to das op", K(ret), K(buffer_full), + "op_type", N, "table_id", ctdef.table_id_, "index_tid", ctdef.index_tid_, + "row", ROWEXPR2STR(dml_rtctx.get_eval_ctx(), row), "row_size", stored_row->row_size_); } } //3. if buffer is full, frozen node, create a new das op to add row @@ -2007,7 +2058,7 @@ bool ObDMLService::is_nested_dup_table(const uint64_t table_id, DASDelCtxList& return ret; } -int ObDMLService::get_nested_dup_table_ctx(const uint64_t table_id, DASDelCtxList& del_ctx_list, SeRowkeyDistCtx* &rowkey_dist_ctx) +int ObDMLService::get_nested_dup_table_ctx(const uint64_t table_id, DASDelCtxList& del_ctx_list, SeRowkeyDistCtx *&rowkey_dist_ctx) { int ret = OB_SUCCESS; bool find = false; diff --git a/src/sql/engine/dml/ob_dml_service.h b/src/sql/engine/dml/ob_dml_service.h index 85de9496f8..6f829c8ea4 100644 --- a/src/sql/engine/dml/ob_dml_service.h +++ b/src/sql/engine/dml/ob_dml_service.h @@ -92,18 +92,18 @@ public: ObInsRtDef &ins_rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, - ObChunkDatumStore::StoredRow* &stored_row); + ObChunkDatumStore::StoredRow *&stored_row); static int insert_row(const ObDASInsCtDef &ins_ctdef, ObDASInsRtDef &ins_rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &das_rtctx, const ExprFixedArray &new_row, - ObChunkDatumStore::StoredRow* &stored_row); + ObChunkDatumStore::StoredRow *&stored_row); static int delete_row(const ObDelCtDef &del_ctdef, ObDelRtDef &del_rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, - ObChunkDatumStore::StoredRow* &stored_row); + ObChunkDatumStore::StoredRow *&stored_row); static int update_row(const ObDASUpdCtDef &ctdef, ObDASUpdRtDef &rtdef, const ObDASTabletLoc *tablet_loc, @@ -114,16 +114,16 @@ public: const ObDASTabletLoc *old_tablet_loc, const ObDASTabletLoc *new_tablet_loc, ObDMLRtCtx &dml_rtctx, - ObChunkDatumStore::StoredRow* &old_row, - ObChunkDatumStore::StoredRow* &new_row, - ObChunkDatumStore::StoredRow* &full_row); + ObChunkDatumStore::StoredRow *&old_row, + ObChunkDatumStore::StoredRow *&new_row, + ObChunkDatumStore::StoredRow *&full_row); static int delete_row(const ObDASDelCtDef &ctdef, ObDASDelRtDef &rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &das_rtctx, const ExprFixedArray &old_row, - ObChunkDatumStore::StoredRow* &stored_row); + ObChunkDatumStore::StoredRow *&stored_row); static int lock_row(const ObDASLockCtDef &dlock_ctdef, ObDASLockRtDef &dlock_rtdef, @@ -226,11 +226,15 @@ public: static bool is_nested_dup_table(const uint64_t table_id,DASDelCtxList& del_ctx_list); static int get_nested_dup_table_ctx(const uint64_t table_id, DASDelCtxList& del_ctx_list, - SeRowkeyDistCtx* &rowkey_dist_ctx); + SeRowkeyDistCtx *&rowkey_dist_ctx); static int handle_after_row_processing_single(ObDMLModifyRowsList *dml_modify_rows); static int handle_after_row_processing_batch(ObDMLModifyRowsList *dml_modify_rows); static int handle_after_row_processing(bool execute_single_row, ObDMLModifyRowsList *dml_modify_rows); static int init_ob_rowkey( ObIAllocator &allocator, const int64_t rowkey_cnt, ObRowkey &table_rowkey); + static int add_trans_info_datum(ObExpr *trans_info_expr, + ObEvalCtx &eval_ctx, + ObChunkDatumStore::StoredRow *stored_row); + private: template static int write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, @@ -238,7 +242,8 @@ private: const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, const ExprFixedArray &row, - ObChunkDatumStore::StoredRow* &stored_row); + ObExpr *trans_info_expr, // debug info for 4377 defensive check + ObChunkDatumStore::StoredRow *&stored_row); template static const ObDASTableLocMeta *get_table_loc_meta(const T *multi_ctdef); static int check_nested_sql_legality(ObExecContext &ctx, common::ObTableID ref_table_id); @@ -368,7 +373,7 @@ int ObDASIndexDMLAdaptor::write_tablet_with_ignore(DMLIterator & dsr.store_row_ = const_cast(dml_row); if (OB_FAIL(ObDMLService::create_anonymous_savepoint(*tx_desc_, savepoint_no))) { SQL_DAS_LOG(WARN, "create anonymous savepoint failed", K(ret)); - } else if (OB_FAIL(single_row_buffer.init(*das_allocator_, ObIDASTaskOp::DAS_ROW_EXTEND_SIZE, MTL_ID()))) { + } else if (OB_FAIL(single_row_buffer.init(*das_allocator_, ObDASWriteBuffer::DAS_ROW_DEFAULT_EXTEND_SIZE, MTL_ID()))) { SQL_DAS_LOG(WARN, "init single row buffer failed", K(ret)); } else if (OB_FAIL(single_row_buffer.try_add_row(dsr, das::OB_DAS_MAX_PACKET_SIZE, added, &store_row))) { SQL_DAS_LOG(WARN, "try add row to single row buffer failed", K(ret)); diff --git a/src/sql/engine/table/ob_index_lookup_op_impl.cpp b/src/sql/engine/table/ob_index_lookup_op_impl.cpp index a3755fd485..4f19276d6c 100644 --- a/src/sql/engine/table/ob_index_lookup_op_impl.cpp +++ b/src/sql/engine/table/ob_index_lookup_op_impl.cpp @@ -183,5 +183,34 @@ int ObIndexLookupOpImpl::get_next_rows(int64_t &count, int64_t capacity) return ret; } +int ObIndexLookupOpImpl::build_trans_datum(ObExpr *expr, + ObEvalCtx *eval_ctx, + ObIAllocator &alloc, + ObDatum *&datum_ptr) +{ + int ret = OB_SUCCESS; + datum_ptr = nullptr; + if (OB_ISNULL(expr) || OB_ISNULL(eval_ctx)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected nullptr", K(ret), K(expr), K(eval_ctx)); + } + if (OB_SUCC(ret)) { + void *buf = nullptr; + ObDatum &col_datum = expr->locate_expr_datum(*eval_ctx); + int64_t pos = sizeof(ObDatum); + int64_t len = sizeof(ObDatum) + col_datum.len_; + if (OB_ISNULL(buf = alloc.alloc(len))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate buffer failed", K(ret)); + } else if (FALSE_IT(datum_ptr = new (buf) ObDatum)) { + // do nothing + } else if (OB_FAIL(datum_ptr->deep_copy(col_datum, static_cast(buf), sizeof(ObDatum) + col_datum.len_, pos))) { + LOG_WARN("failed to deep copy datum", K(ret), K(pos), K(len)); + } + } + + return ret; +} + } // end namespace sql } // end namespace oceanbase diff --git a/src/sql/engine/table/ob_index_lookup_op_impl.h b/src/sql/engine/table/ob_index_lookup_op_impl.h index a8faa732c7..992c43b7f3 100644 --- a/src/sql/engine/table/ob_index_lookup_op_impl.h +++ b/src/sql/engine/table/ob_index_lookup_op_impl.h @@ -75,6 +75,7 @@ public: virtual ObEvalCtx & get_eval_ctx() = 0; virtual const ExprFixedArray & get_output_expr() = 0; virtual int switch_index_table_and_rowkey_group_id() { return OB_SUCCESS; } + int build_trans_datum(ObExpr *expr, ObEvalCtx *eval_ctx, ObIAllocator &alloc, ObDatum *&datum_ptr); protected: LookupType lookup_type_; diff --git a/src/sql/engine/table/ob_table_scan_op.cpp b/src/sql/engine/table/ob_table_scan_op.cpp index 20789ac61b..27f0e82ca1 100644 --- a/src/sql/engine/table/ob_table_scan_op.cpp +++ b/src/sql/engine/table/ob_table_scan_op.cpp @@ -3159,6 +3159,22 @@ int ObGlobalIndexLookupOpImpl::process_data_table_rowkey() scan_param.is_get_ = true; } } + + if (OB_SUCC(ret) && get_lookup_ctdef()->trans_info_expr_ != nullptr) { + void *buf = nullptr; + ObDatum *datum_ptr = nullptr; + if (OB_FAIL(build_trans_datum(get_lookup_ctdef()->trans_info_expr_, + &(table_scan_op_->get_eval_ctx()), + lookup_memctx_->get_arena_allocator(), + datum_ptr))) { + + } else if (OB_ISNULL(datum_ptr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null", K(ret)); + } else if (OB_FAIL(das_scan_op->trans_info_array_.push_back(datum_ptr))) { + LOG_WARN("fail to push back trans info array", K(ret), KPC(datum_ptr)); + } + } return ret; } @@ -3322,15 +3338,45 @@ int ObGlobalIndexLookupOpImpl::check_lookup_row_cnt() "index_table_id", table_scan_op_->get_tsc_spec().get_ref_table_id(), KPC(my_session->get_tx_desc())); //now to dump lookup das task info + int64_t rownum = 0; for (DASTaskIter task_iter = das_ref_.begin_task_iter(); !task_iter.is_end(); ++task_iter) { ObDASScanOp *das_op = static_cast(*task_iter); - LOG_INFO("dump TableLookup DAS Task range", - "scan_range", das_op->get_scan_param().key_ranges_, - "range_array_pos", das_op->get_scan_param().range_array_pos_, - "tablet_id", das_op->get_tablet_id()); + if (das_op->trans_info_array_.count() == das_op->get_scan_param().key_ranges_.count()) { + for (int64_t i = 0; i < das_op->trans_info_array_.count(); i++) { + rownum++; + ObDatum *datum = das_op->trans_info_array_.at(i); + LOG_ERROR("dump TableLookup DAS Task range and trans_info", + K(rownum), KPC(datum), + K(das_op->get_scan_param().key_ranges_.at(i)), + K(das_op->get_tablet_id())); + } + } else { + for (int64_t i = 0; i < das_op->get_scan_param().key_ranges_.count(); i++) { + rownum++; + LOG_ERROR("dump TableLookup DAS Task range", + K(rownum), + K(das_op->get_scan_param().key_ranges_.at(i)), + K(das_op->get_tablet_id())); + } + } } } } + + int simulate_error = EVENT_CALL(EventTable::EN_DAS_SIMULATE_DUMP_WRITE_BUFFER); + if (0 != simulate_error) { + for (DASTaskIter task_iter = das_ref_.begin_task_iter(); !task_iter.is_end(); ++task_iter) { + ObDASScanOp *das_op = static_cast(*task_iter); + for (int64_t i = 0; i < das_op->trans_info_array_.count(); i++) { + ObDatum *datum = das_op->trans_info_array_.at(i); + LOG_INFO("dump TableLookup DAS Task trans info", K(i), + KPC(das_op->trans_info_array_.at(i)), + K(das_op->get_scan_param().key_ranges_.at(i)), + K(das_op->get_tablet_id())); + } + } + } + return ret; } diff --git a/src/sql/ob_sql_context.cpp b/src/sql/ob_sql_context.cpp index b5e5025900..bcaec580fd 100644 --- a/src/sql/ob_sql_context.cpp +++ b/src/sql/ob_sql_context.cpp @@ -239,6 +239,7 @@ ObSqlCtx::ObSqlCtx() res_map_rule_param_idx_(OB_INVALID_INDEX), res_map_rule_version_(0), is_text_ps_mode_(false), + is_strict_defensive_check_(false), first_plan_hash_(0), is_bulk_(false), reroute_info_(nullptr) @@ -295,6 +296,7 @@ void ObSqlCtx::reset() cur_plan_ = nullptr; is_execute_call_stmt_ = false; is_text_ps_mode_ = false; + is_strict_defensive_check_ = false; is_bulk_ = false; } @@ -310,6 +312,7 @@ void ObSqlCtx::clear() spm_ctx_.bl_key_.reset(); cur_stmt_ = nullptr; is_text_ps_mode_ = false; + is_strict_defensive_check_ = false; } OB_SERIALIZE_MEMBER(ObSqlCtx, stmt_type_); diff --git a/src/sql/ob_sql_context.h b/src/sql/ob_sql_context.h index 7e75dd346a..1400cb3409 100644 --- a/src/sql/ob_sql_context.h +++ b/src/sql/ob_sql_context.h @@ -523,6 +523,7 @@ public: int64_t res_map_rule_param_idx_; uint64_t res_map_rule_version_; bool is_text_ps_mode_; + bool is_strict_defensive_check_; uint64_t first_plan_hash_; common::ObString first_outline_data_; bool is_bulk_; diff --git a/src/sql/optimizer/ob_del_upd_log_plan.cpp b/src/sql/optimizer/ob_del_upd_log_plan.cpp index 0867ec7adf..34a486050d 100644 --- a/src/sql/optimizer/ob_del_upd_log_plan.cpp +++ b/src/sql/optimizer/ob_del_upd_log_plan.cpp @@ -811,7 +811,7 @@ int ObDelUpdLogPlan::compute_hash_dist_exprs_for_pdml_insert(ObExchangeInfo &exc } int ObDelUpdLogPlan::replace_assignment_expr_from_dml_info(const IndexDMLInfo &dml_info, - ObRawExpr* &expr) + ObRawExpr *&expr) { int ret = OB_SUCCESS; for (int64_t j = 0; OB_SUCC(ret) && j < dml_info.assignments_.count(); ++j) { @@ -1751,6 +1751,35 @@ int ObDelUpdLogPlan::prepare_table_dml_info_basic(const ObDmlTableInfo& table_in } else { table_dml_info->rowkey_cnt_ = index_schema->get_rowkey_column_num(); table_dml_info->is_primary_index_ = true; + ObExecContext *exec_ctx = get_optimizer_context().get_exec_ctx(); + if (OB_ISNULL(exec_ctx)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("exec_cts is null", K(ret)); + } else if (exec_ctx->get_sql_ctx()->is_strict_defensive_check_ && + !(optimizer_context_.get_session_info()->is_inner()) && + (stmt_->is_update_stmt() || stmt_->is_delete_stmt()) && + GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0) { + // 1: Is strict defensive check mode + // 2: Not inner_sql + // 3: Now only support delete and update statement + // 4: disable it when upgrade + // Only when the three conditions are met can the defensive_check information be added + TableItem *table_item = nullptr; + ObOpPseudoColumnRawExpr *trans_info_expr = nullptr; + if (OB_ISNULL(table_item = stmt_->get_table_item_by_id(table_info.table_id_))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret), K(table_info.table_id_), KPC(stmt_)); + } else if (OB_FAIL(ObOptimizerUtil::generate_pseudo_trans_info_expr(get_optimizer_context(), + table_item->get_table_name(), + trans_info_expr))) { + LOG_WARN("fail to generate pseudo trans info expr", K(ret), K(table_item->get_table_name())); + } else if (OB_ISNULL(trans_info_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null pointer", K(ret)); + } else { + table_dml_info->trans_info_expr_ = trans_info_expr; + } + } } } if (OB_SUCC(ret) && !has_tg) { @@ -1783,6 +1812,8 @@ int ObDelUpdLogPlan::prepare_table_dml_info_basic(const ObDmlTableInfo& table_in index_dml_info->ref_table_id_ = index_tid[i]; index_dml_info->rowkey_cnt_ = index_schema->get_rowkey_column_num(); index_dml_info->spk_cnt_ = index_schema->get_shadow_rowkey_column_num(); + // Trans_info_expr_ on the main table is recorded in all index_dml_info + index_dml_info->trans_info_expr_ = table_dml_info->trans_info_expr_; ObSchemaObjVersion table_version; table_version.object_id_ = index_tid[i]; table_version.object_type_ = DEPENDENCY_TABLE; diff --git a/src/sql/optimizer/ob_del_upd_log_plan.h b/src/sql/optimizer/ob_del_upd_log_plan.h index a3f995895e..c188c96101 100644 --- a/src/sql/optimizer/ob_del_upd_log_plan.h +++ b/src/sql/optimizer/ob_del_upd_log_plan.h @@ -87,7 +87,7 @@ public: ObIArray &rowkey_exprs); int replace_assignment_expr_from_dml_info(const IndexDMLInfo &index_dml_info, - ObRawExpr* &expr); + ObRawExpr *&expr); int candi_allocate_one_pdml_delete(bool is_index_maintenance, bool is_last_dml_op, diff --git a/src/sql/optimizer/ob_log_del_upd.cpp b/src/sql/optimizer/ob_log_del_upd.cpp index b2af5b6a02..4ef0c82251 100644 --- a/src/sql/optimizer/ob_log_del_upd.cpp +++ b/src/sql/optimizer/ob_log_del_upd.cpp @@ -56,6 +56,10 @@ int IndexDMLInfo::deep_copy(ObIRawExprCopier &expr_copier, const IndexDMLInfo &o LOG_WARN("failed to copy exprs", K(ret)); } else if (OB_FAIL(part_ids_.assign(other.part_ids_))) { LOG_WARN("failed to assign part ids", K(ret)); + } else if (OB_NOT_NULL(other.trans_info_expr_)) { + if (OB_FAIL(expr_copier.copy(other.trans_info_expr_, trans_info_expr_))) { + LOG_WARN("failed to trans info exprs", K(ret), KPC(other.trans_info_expr_)); + } } for (int64_t i = 0; OB_SUCC(ret) && i < other.assignments_.count(); ++i) { if (OB_FAIL(assignments_.at(i).deep_copy(expr_copier, @@ -79,6 +83,7 @@ int IndexDMLInfo::assign_basic(const IndexDMLInfo &other) is_primary_index_ = other.is_primary_index_; is_update_unique_key_ = other.is_update_unique_key_; is_update_part_key_ = other.is_update_part_key_; + trans_info_expr_ = other.trans_info_expr_; if (OB_FAIL(column_exprs_.assign(other.column_exprs_))) { LOG_WARN("failed to assign column exprs", K(ret)); } else if (OB_FAIL(column_convert_exprs_.assign(other.column_convert_exprs_))) { @@ -286,7 +291,8 @@ ObLogDelUpd::ObLogDelUpd(ObDelUpdLogPlan &plan) pdml_is_returning_(false), err_log_define_(), need_alloc_part_id_expr_(false), - has_instead_of_trigger_(false) + has_instead_of_trigger_(false), + produced_trans_exprs_() { } @@ -550,6 +556,80 @@ int ObLogDelUpd::find_pdml_part_id_producer(ObLogicalOperator &op, return ret; } +int ObLogDelUpd::find_trans_info_producer() { + int ret = OB_SUCCESS; + for (int64_t i = 0 ; OB_SUCC(ret) && i < index_dml_infos_.count(); i++) { + ObLogicalOperator *producer = NULL; + IndexDMLInfo *index_dml_info = index_dml_infos_.at(i); + if (OB_ISNULL(index_dml_info)) { + ret = OB_ERR_UNEXPECTED; + } else if ((!is_pdml() && !index_dml_info->is_primary_index_)) { + // Don't worry about non-pdml and non-main tables + // Every operator in pdml needs to try to press down once + } else if (OB_ISNULL(index_dml_info->trans_info_expr_)) { + // do nothing + } else if (OB_FAIL(find_trans_info_producer(*this, index_dml_info->table_id_, producer))) { + LOG_WARN("fail to find trans info producer", K(ret), KPC(index_dml_info), K(get_name())); + } else if (NULL == producer) { + // No error can be reported here, + // the producer of the corresponding trans_info expression was not found, ignore these + LOG_TRACE("can not found trans debug info expr producer", K(ret), K(index_dml_info->table_id_)); + } else if (OB_FAIL(add_var_to_array_no_dup(produced_trans_exprs_, + index_dml_info->trans_info_expr_))) { + LOG_WARN("fail to push trans_info_expr_", K(ret)); + } else { + if (producer->get_type() == log_op_def::LOG_TABLE_SCAN) { + if (static_cast(producer)->get_trans_info_expr() == index_dml_info->trans_info_expr_) { + LOG_DEBUG("this expr has find the producer", K(ret)); + } else { + static_cast(producer)-> + set_trans_info_expr(static_cast(index_dml_info->trans_info_expr_)); + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected type of pdml partition id producer", K(ret), K(producer)); + } + } + } + return ret; +} + +int ObLogDelUpd::find_trans_info_producer(ObLogicalOperator &op, + const uint64_t tid, + ObLogicalOperator *&producer) +{ + int ret = OB_SUCCESS; + producer = NULL; + if (op.get_type() == log_op_def::LOG_TABLE_SCAN) { + ObLogTableScan &tsc = static_cast(op); + if (tid == tsc.get_table_id()) { + producer = &op; + } + } + + for (int64_t i = 0; OB_SUCC(ret) && NULL == producer && i < op.get_num_of_child(); i++) { + if (OB_ISNULL(op.get_child(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null child", K(ret)); + } else if (log_op_def::LOG_JOIN == op.get_type()) { + ObLogJoin &join_op = static_cast(op); + if (IS_LEFT_SEMI_ANTI_JOIN(join_op.get_join_type()) && + second_child == i) { + continue; + } else if (IS_RIGHT_SEMI_ANTI_JOIN(join_op.get_join_type()) && + first_child == i) { + continue; + } + if (OB_FAIL(SMART_CALL(find_trans_info_producer(*op.get_child(i), tid, producer)))) { + LOG_WARN("find pdml part id producer failed", K(ret)); + } + } else if (OB_FAIL(SMART_CALL(find_trans_info_producer(*op.get_child(i), tid, producer)))) { + LOG_WARN("find pdml part id producer failed", K(ret)); + } + } + return ret; +} + int ObLogDelUpd::inner_get_op_exprs(ObIArray &all_exprs, bool need_column_expr) { int ret = OB_SUCCESS; @@ -558,6 +638,8 @@ int ObLogDelUpd::inner_get_op_exprs(ObIArray &all_exprs, bool need_c } else if (is_pdml() && need_alloc_part_id_expr_ && OB_FAIL(generate_pdml_partition_id_expr())) { LOG_WARN("failed to allocate partition id expr", K(ret)); + } else if (OB_FAIL(find_trans_info_producer())) { + LOG_WARN("failed to find trasn info producer", K(ret)); } else if (OB_FAIL(generate_rowid_expr_for_trigger())) { LOG_WARN("failed to try add rowid col expr for trigger", K(ret)); } else if (NULL != lock_row_flag_expr_ && OB_FAIL(all_exprs.push_back(lock_row_flag_expr_))) { @@ -566,6 +648,8 @@ int ObLogDelUpd::inner_get_op_exprs(ObIArray &all_exprs, bool need_c LOG_WARN("failed to append exprs", K(ret)); } else if (NULL != pdml_partition_id_expr_ && OB_FAIL(all_exprs.push_back(pdml_partition_id_expr_))) { LOG_WARN("failed to push back exprs", K(ret)); + } else if (OB_FAIL(append_array_no_dup(all_exprs, produced_trans_exprs_))) { + LOG_WARN("failed to push back exprs", K(ret), K(produced_trans_exprs_)); } else if (OB_FAIL(get_table_columns_exprs(get_index_dml_infos(), all_exprs, need_column_expr))) { LOG_WARN("failed to add table columns to ctx", K(ret)); } else if (OB_FAIL(ObLogicalOperator::get_op_exprs(all_exprs))) { @@ -1327,8 +1411,10 @@ int ObLogDelUpd::inner_replace_op_exprs( int ret = OB_SUCCESS; if (OB_FAIL(replace_dml_info_exprs(to_replace_exprs, get_index_dml_infos()))) { LOG_WARN("failed to replace dml info exprs", K(ret)); - } else if(OB_FAIL(replace_exprs_action(to_replace_exprs, view_check_exprs_))) { + } else if (OB_FAIL(replace_exprs_action(to_replace_exprs, view_check_exprs_))) { LOG_WARN("failed to replace view check exprs", K(ret)); + } else if (OB_FAIL(replace_exprs_action(to_replace_exprs, produced_trans_exprs_))) { + LOG_WARN("failed to replace produced trans exprs", K(ret)); } else if (NULL != pdml_partition_id_expr_ && OB_FAIL(replace_expr_action(to_replace_exprs, pdml_partition_id_expr_))) { LOG_WARN("failed to replace pdml partition id expr", K(ret)); diff --git a/src/sql/optimizer/ob_log_del_upd.h b/src/sql/optimizer/ob_log_del_upd.h index bb4f07bc66..6037243b56 100644 --- a/src/sql/optimizer/ob_log_del_upd.h +++ b/src/sql/optimizer/ob_log_del_upd.h @@ -42,6 +42,7 @@ public: new_part_id_expr_(NULL), old_rowid_expr_(NULL), new_rowid_expr_(NULL), + trans_info_expr_(NULL), related_index_ids_() { } @@ -69,6 +70,7 @@ public: new_part_id_expr_ = NULL, old_rowid_expr_ = NULL, new_rowid_expr_ = NULL, + trans_info_expr_ = NULL, related_index_ids_.reset(); } int64_t to_explain_string(char *buf, int64_t buf_len, ExplainType type) const; @@ -157,6 +159,9 @@ public: ObRawExpr *new_part_id_expr_; ObRawExpr *old_rowid_expr_; ObRawExpr *new_rowid_expr_; + // When the defensive check level is set to 2, + // the transaction information of the current row is recorded for 4377 diagnosis + ObRawExpr *trans_info_expr_; // for generated column, the diff between column_exprs_ and column_old_values_exprs_ // is virtual generated column is replaced. common::ObSEArray column_old_values_exprs_; @@ -178,7 +183,9 @@ public: K_(ck_cst_exprs), K_(is_update_unique_key), K_(is_update_part_key), - K_(assignments)); + K_(assignments), + K_(distinct_algo), + K_(related_index_ids)); }; class ObDelUpdLogPlan; @@ -225,6 +232,17 @@ public: { return view_check_exprs_; } + + inline const common::ObIArray &get_produced_trans_exprs() const + { + return produced_trans_exprs_; + } + inline common::ObIArray &get_produced_trans_exprs() + { + return produced_trans_exprs_; + } + + virtual bool is_single_value() const { return false; } virtual uint64_t get_hash(uint64_t seed) const { return seed; } virtual uint64_t hash(uint64_t seed) const override; @@ -295,6 +313,10 @@ public: int get_rowid_version(int64_t &rowid_version); virtual int get_op_exprs(ObIArray &all_exprs) override = 0; int inner_get_op_exprs(ObIArray &all_exprs, bool need_column_expr); + int find_trans_info_producer(); + int find_trans_info_producer(ObLogicalOperator &op, + const uint64_t tid, + ObLogicalOperator *&producer); int get_table_columns_exprs(const ObIArray &index_dml_infos, ObIArray &all_exprs, bool need_column_expr); @@ -397,6 +419,11 @@ protected: // 但是对于非分区表,pdml中的dml是需要分配partition id expr bool need_alloc_part_id_expr_; // pdml计划中,用于判断当前dml 算子是否需要分配partition id expr bool has_instead_of_trigger_; + // Only when trans_info_expr can be pushed down to the corresponding table_scan operator, + // the expression will be added to produced_trans_exprs_ + // When trans_info_expr does not find a producer operator, + // the upper layer dml operator cannot consume the expression + common::ObSEArray produced_trans_exprs_; }; } } diff --git a/src/sql/optimizer/ob_log_table_scan.cpp b/src/sql/optimizer/ob_log_table_scan.cpp index c063063238..60c1d66eb5 100644 --- a/src/sql/optimizer/ob_log_table_scan.cpp +++ b/src/sql/optimizer/ob_log_table_scan.cpp @@ -165,6 +165,10 @@ int ObLogTableScan::get_op_exprs(ObIArray &all_exprs) LOG_WARN("failed to push back expr", K(ret)); } else if (NULL != calc_part_id_expr_ && OB_FAIL(all_exprs.push_back(calc_part_id_expr_))) { LOG_WARN("failed to push back expr", K(ret)); + } else if (OB_FAIL(allocate_lookup_trans_info_expr())) { + LOG_WARN("failed to add lookup trans expr", K(ret)); + } else if (NULL != trans_info_expr_ && OB_FAIL(all_exprs.push_back(trans_info_expr_))) { + LOG_WARN("failed to push back expr", K(ret)); } else if (OB_FAIL(append(all_exprs, access_exprs_))) { LOG_WARN("failed to append exprs", K(ret)); } else if (OB_FAIL(append(all_exprs, pushdown_aggr_exprs_))) { @@ -246,6 +250,10 @@ int ObLogTableScan::check_output_dependance(common::ObIArray &child } else if (use_batch() && nullptr != group_id_expr_ && OB_FAIL(add_var_to_array_no_dup(exprs, group_id_expr_))) { LOG_WARN("failed to push back group id expr", K(ret)); + } else if (index_back_ && + nullptr != trans_info_expr_ && + OB_FAIL(add_var_to_array_no_dup(exprs, trans_info_expr_))) { + LOG_WARN("fail to add lookup trans info expr", K(ret)); } else if (OB_FAIL(dep_checker.check(exprs))) { LOG_WARN("failed to check op_exprs", K(ret)); } else { @@ -701,6 +709,38 @@ int ObLogTableScan::get_mbr_column_exprs(const uint64_t table_id, return ret; } +int ObLogTableScan::allocate_lookup_trans_info_expr() +{ + int ret = OB_SUCCESS; + // Is strict defensive check mode + // Is index_back (contain local lookup and global lookup) + // There is no trans_info_expr on the current table_scan operator + // Satisfy the three conditions, add trans_info_expr for lookup + // The result of Index_scan will contain the transaction information corresponding to each row + // The result of the lookup in the data table will also include the trans_info + // of the current row in the data table, But the trans_info will not be output to the upper operator + ObOptimizerContext *opt_ctx = nullptr; + ObOpPseudoColumnRawExpr *tmp_trans_info_expr = nullptr; + if (OB_ISNULL(get_plan())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null", K(ret)); + } else if (OB_ISNULL(opt_ctx = &(get_plan()->get_optimizer_context()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null", K(ret)); + } else if (index_back_ && + opt_ctx->is_strict_defensive_check() && + nullptr == trans_info_expr_) { + if (OB_FAIL(OB_FAIL(ObOptimizerUtil::generate_pseudo_trans_info_expr(*opt_ctx, + index_name_, + tmp_trans_info_expr)))) { + LOG_WARN("fail to generate pseudo trans info expr", K(ret), K(index_name_)); + } else { + trans_info_expr_ = tmp_trans_info_expr; + } + } + return ret; +} + int ObLogTableScan::generate_necessary_rowkey_and_partkey_exprs() { int ret = OB_SUCCESS; diff --git a/src/sql/optimizer/ob_log_table_scan.h b/src/sql/optimizer/ob_log_table_scan.h index 2ad319b980..9da7a619ef 100644 --- a/src/sql/optimizer/ob_log_table_scan.h +++ b/src/sql/optimizer/ob_log_table_scan.h @@ -76,6 +76,7 @@ public: tablet_id_expr_(NULL), tablet_id_type_(0), calc_part_id_expr_(NULL), + trans_info_expr_(NULL), global_index_back_table_partition_info_(NULL), has_index_scan_filter_(false), has_index_lookup_filter_(false), @@ -419,7 +420,9 @@ public: void set_access_path(AccessPath* path) { access_path_ = path; } inline const AccessPath* get_access_path() const { return access_path_; } void set_tablet_id_expr(ObOpPseudoColumnRawExpr *expr) { tablet_id_expr_ = expr; } + void set_trans_info_expr(ObOpPseudoColumnRawExpr *expr) { trans_info_expr_ = expr; } ObOpPseudoColumnRawExpr *get_tablet_id_expr() const { return tablet_id_expr_; } + ObRawExpr *get_trans_info_expr() const { return trans_info_expr_; } void set_tablet_id_type(int64_t type) { tablet_id_type_ = type; } int64_t get_tablet_id_type() const { return tablet_id_type_; } const common::ObIArray &get_rowkey_exprs() const { return rowkey_exprs_; } @@ -467,6 +470,7 @@ private: // member functions int generate_necessary_rowkey_and_partkey_exprs(); int add_mapping_columns_for_vt(ObIArray &access_exprs); int get_mbr_column_exprs(const uint64_t table_id, ObIArray &mbr_exprs); + int allocate_lookup_trans_info_expr(); protected: // memeber variables // basic info uint64_t table_id_; //table id or alias table id @@ -564,6 +568,7 @@ protected: // memeber variables // 0 for tablet id, 1 for logical part id, 2 for logical subpart id int64_t tablet_id_type_; ObRawExpr *calc_part_id_expr_; + ObRawExpr *trans_info_expr_; // begin for global index lookup ObTablePartitionInfo *global_index_back_table_partition_info_; diff --git a/src/sql/optimizer/ob_optimizer_context.h b/src/sql/optimizer/ob_optimizer_context.h index c517c72ffb..e31ce6e667 100644 --- a/src/sql/optimizer/ob_optimizer_context.h +++ b/src/sql/optimizer/ob_optimizer_context.h @@ -254,11 +254,18 @@ ObOptimizerContext(ObSQLSessionInfo *session_info, inline void set_parallel_rule(PXParallelRule rule) { px_parallel_rule_ = rule; } const PXParallelRule& get_parallel_rule() const { return px_parallel_rule_; } inline bool is_batched_multi_stmt() { + bool bret = false; if (NULL != exec_ctx_ && NULL != exec_ctx_->get_sql_ctx()) { - return exec_ctx_->get_sql_ctx()->multi_stmt_item_.is_batched_multi_stmt(); - } else { - return false; + bret = exec_ctx_->get_sql_ctx()->multi_stmt_item_.is_batched_multi_stmt(); } + return bret; + } + inline bool is_strict_defensive_check() { + bool bret = false; + if (NULL != exec_ctx_ && NULL != exec_ctx_->get_sql_ctx()) { + bret = exec_ctx_->get_sql_ctx()->is_strict_defensive_check_; + } + return bret; } void disable_batch_rpc() { enable_batch_opt_ = 0; } bool enable_batch_rpc() diff --git a/src/sql/optimizer/ob_optimizer_util.cpp b/src/sql/optimizer/ob_optimizer_util.cpp index 911a9503b8..e6eeef4a66 100644 --- a/src/sql/optimizer/ob_optimizer_util.cpp +++ b/src/sql/optimizer/ob_optimizer_util.cpp @@ -8636,3 +8636,36 @@ int ObOptimizerUtil::truncate_string_for_opt_stats(const ObObj *old_obj, LOG_TRACE("Succeed to truncate string obj for opt stats", KPC(old_obj), KPC(new_obj), K(is_truncated)); return ret; } + +int ObOptimizerUtil::generate_pseudo_trans_info_expr(ObOptimizerContext &opt_ctx, + const common::ObString &table_name, + ObOpPseudoColumnRawExpr *&expr) +{ + int ret = OB_SUCCESS; + ObExprResType res_type; + char *pseudo_name = nullptr; + res_type.set_type(ObVarcharType); + res_type.set_collation_type(CS_TYPE_BINARY); + res_type.set_accuracy(ObAccuracy::MAX_ACCURACY[ObVarcharType]); + const char *name = ".TRANS_DEBUG_INFO"; + int64_t buf_len = table_name.length()+ STRLEN(name) + 1; + int64_t pos = 0; + if (OB_ISNULL(pseudo_name = + static_cast(opt_ctx.get_allocator().alloc(buf_len)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate name buffer failed", K(ret), K(buf_len)); + } else if (OB_FAIL(databuff_printf(pseudo_name, buf_len, pos, "%.*s", table_name.length(), table_name.ptr()))) { + LOG_WARN("databuff print column name failed", K(ret)); + } else if (OB_FAIL(databuff_printf(pseudo_name, buf_len, pos, "%.*s", static_cast(STRLEN(name)), name))) { + LOG_WARN("databuff print column name failed", K(ret)); + } else if (OB_FAIL(ObRawExprUtils::build_op_pseudo_column_expr(opt_ctx.get_expr_factory(), + T_PSEUDO_ROW_TRANS_INFO_COLUMN, + pseudo_name, + res_type, + expr))) { + LOG_WARN("build operator pseudo column failed", K(ret)); + } else if (OB_FAIL(expr->formalize(opt_ctx.get_session_info()))) { + LOG_WARN("expr formalize failed", K(ret)); + } + return ret; +} diff --git a/src/sql/optimizer/ob_optimizer_util.h b/src/sql/optimizer/ob_optimizer_util.h index bc4ce8614e..86bf5f0a15 100644 --- a/src/sql/optimizer/ob_optimizer_util.h +++ b/src/sql/optimizer/ob_optimizer_util.h @@ -1439,6 +1439,10 @@ public: ObIAllocator &alloc, ObObj *&new_obj); + static int generate_pseudo_trans_info_expr(ObOptimizerContext &opt_ctx, + const common::ObString &table_name, + ObOpPseudoColumnRawExpr *&expr); + private: //disallow construct ObOptimizerUtil(); diff --git a/src/sql/plan_cache/ob_plan_cache_util.cpp b/src/sql/plan_cache/ob_plan_cache_util.cpp index 8e35881b1b..55dfe82987 100644 --- a/src/sql/plan_cache/ob_plan_cache_util.cpp +++ b/src/sql/plan_cache/ob_plan_cache_util.cpp @@ -395,6 +395,7 @@ int ObConfigInfoInPC::load_influence_plan_config() // here to add value of configs that can influence execution plan. enable_px_ordered_coord_ = GCONF._enable_px_ordered_coord; enable_newsort_ = GCONF._enable_newsort; + is_strict_defensive_check_ = GCONF.enable_strict_defensive_check(); is_enable_px_fast_reclaim_ = GCONF._enable_px_fast_reclaim; // For Tenant configs @@ -440,6 +441,9 @@ int ObConfigInfoInPC::serialize_configs(char *buf, int buf_len, int64_t &pos) } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "%d,", px_join_skew_handling_))) { SQL_PC_LOG(WARN, "failed to databuff_printf", K(ret), K(px_join_skew_handling_)); + } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, + "%d,", is_strict_defensive_check_))) { + SQL_PC_LOG(WARN, "failed to databuff_printf", K(ret), K(is_strict_defensive_check_)); } else if (OB_FAIL(databuff_printf(buf, buf_len, pos, "%d,", px_join_skew_minfreq_))) { SQL_PC_LOG(WARN, "failed to databuff_printf", K(ret), K(px_join_skew_minfreq_)); diff --git a/src/sql/plan_cache/ob_plan_cache_util.h b/src/sql/plan_cache/ob_plan_cache_util.h index f2cbe51cb2..b93652b736 100644 --- a/src/sql/plan_cache/ob_plan_cache_util.h +++ b/src/sql/plan_cache/ob_plan_cache_util.h @@ -974,6 +974,7 @@ public: bloom_filter_enabled_(true), enable_newsort_(true), px_join_skew_handling_(true), + is_strict_defensive_check_(true), px_join_skew_minfreq_(30), min_cluster_version_(0), is_enable_px_fast_reclaim_(false), @@ -1014,6 +1015,7 @@ public: bool bloom_filter_enabled_; bool enable_newsort_; bool px_join_skew_handling_; + bool is_strict_defensive_check_; int8_t px_join_skew_minfreq_; uint64_t min_cluster_version_; bool is_enable_px_fast_reclaim_; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 7458491cc5..13b1ecd89d 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -468,6 +468,7 @@ ob_set_subtarget(ob_storage compaction ob_set_subtarget(ob_storage concurrency_control concurrency_control/ob_multi_version_garbage_collector.cpp + concurrency_control/ob_trans_stat_row.cpp ) ob_set_subtarget(ob_storage memtable diff --git a/src/storage/access/ob_aggregated_store.cpp b/src/storage/access/ob_aggregated_store.cpp index d19e7360d1..fd926b6f5a 100644 --- a/src/storage/access/ob_aggregated_store.cpp +++ b/src/storage/access/ob_aggregated_store.cpp @@ -569,7 +569,7 @@ int ObAggRow::init(const ObTableAccessParam ¶m, const int64_t batch_size) OB_ISNULL(cell = new(buf) ObMinMaxAggCell(is_min, col_idx, col_param, expr, allocator_))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Failed to alloc memroy for agg cell", K(ret), K(i)); - } else if (OB_FAIL(static_cast(cell)->init(param.op_, col_expr, batch_size))) { + } else if (OB_FAIL(static_cast(cell)->init(param.get_op(), col_expr, batch_size))) { LOG_WARN("Failed to init ObMinMaxAggCell", K(ret), KPC(cell)); } else if (OB_FAIL(agg_cells_.push_back(cell))) { LOG_WARN("Failed to push back agg cell", K(ret), K(i)); diff --git a/src/storage/access/ob_multiple_merge.cpp b/src/storage/access/ob_multiple_merge.cpp index 02b51e84a9..4f85c12bcf 100644 --- a/src/storage/access/ob_multiple_merge.cpp +++ b/src/storage/access/ob_multiple_merge.cpp @@ -112,10 +112,10 @@ int ObMultipleMerge::init( STORAGE_LOG(WARN, "Failed to init datum row", K(ret)); } else if (OB_FAIL(nop_pos_.init(*context.stmt_allocator_, param.get_max_out_col_cnt()))) { STORAGE_LOG(WARN, "Fail to init nop pos, ", K(ret)); - } else if (NULL != param.op_ && (NULL == param.output_exprs_ || NULL == param.row2exprs_projector_ + } else if (NULL != param.get_op() && (NULL == param.output_exprs_ || NULL == param.row2exprs_projector_ || OB_FAIL(param.row2exprs_projector_->init( *param.output_exprs_, - *param.op_, + *param.get_op(), *param.iter_param_.out_cols_project_)))) { if (OB_SUCCESS == ret) { ret = OB_ERR_UNEXPECTED; @@ -359,6 +359,9 @@ int ObMultipleMerge::get_next_row(ObDatumRow *&row) LOG_WARN("Failed to fill iter idx", K(ret), KPC(access_param_), K(unprojected_row_)); } else if (OB_FAIL(process_fuse_row(not_using_static_engine, unprojected_row_, row))) { LOG_WARN("get row from fuse failed", K(ret), K(unprojected_row_)); + } else if (OB_NOT_NULL(access_param_->get_op()) && + OB_FAIL(access_param_->get_op()->write_trans_info_datum(unprojected_row_))) { + LOG_WARN("write trans_info to expr datum failed", K(ret), K(unprojected_row_)); } else if (nullptr != row) { break; } @@ -400,11 +403,11 @@ int ObMultipleMerge::get_next_rows(int64_t &count, int64_t capacity) } else if (ObQRIterType::T_SINGLE_GET == get_type()) { ObDatumRow *row = nullptr; sql::ObEvalCtx *eval_ctx = nullptr; - if (OB_ISNULL(access_param_->op_)) { + if (OB_ISNULL(access_param_->get_op())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected access param: null op", K(ret)); - } else if (FALSE_IT(eval_ctx = &access_param_->op_->get_eval_ctx())) { - } else if (FALSE_IT(eval_ctx->reuse(min(capacity, access_param_->op_->get_batch_size())))) { + } else if (FALSE_IT(eval_ctx = &access_param_->get_op()->get_eval_ctx())) { + } else if (FALSE_IT(eval_ctx->reuse(min(capacity, access_param_->get_op()->get_batch_size())))) { } else if (OB_FAIL(get_next_row(row))) { if (OB_ITER_END != ret) { LOG_WARN("failed to get single row", K(ret)); @@ -430,13 +433,13 @@ int ObMultipleMerge::get_next_normal_rows(int64_t &count, int64_t capacity) if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("The ObMultipleMerge has not been inited, ", K(ret)); - } else if (OB_UNLIKELY(nullptr == access_param_->op_ - || !access_param_->op_->is_vectorized() + } else if (OB_UNLIKELY(nullptr == access_param_->get_op() + || !access_param_->get_op()->is_vectorized() || !access_param_->iter_param_.vectorized_enabled_ || nullptr == block_row_store_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpect pushdown operator in vectorized", K(ret), K(access_param_->iter_param_.pd_storage_flag_), - K(access_param_->op_), K(access_param_->op_->is_vectorized()), K(block_row_store_), + K(access_param_->get_op()), K(access_param_->get_op()->is_vectorized()), K(block_row_store_), K(access_param_->iter_param_.vectorized_enabled_)); } @@ -445,7 +448,7 @@ int ObMultipleMerge::get_next_normal_rows(int64_t &count, int64_t capacity) LOG_WARN("fail to refresh table on demand", K(ret)); } else { ObVectorStore *vector_store = reinterpret_cast(block_row_store_); - int64_t batch_size = min(capacity, access_param_->op_->get_batch_size()); + int64_t batch_size = min(capacity, access_param_->get_op()->get_batch_size()); vector_store->reuse_capacity(batch_size); if (need_padding_) { padding_allocator_.reuse(); @@ -500,8 +503,10 @@ int ObMultipleMerge::get_next_normal_rows(int64_t &count, int64_t capacity) } else if (OB_FAIL(process_fuse_row(nullptr == access_param_->output_exprs_, unprojected_row_, out_row))) { LOG_WARN("get row from fuse failed", K(ret), K(unprojected_row_)); } else if (nullptr != out_row) { - if (OB_FAIL(access_param_->op_->deep_copy(access_param_->output_exprs_, vector_store->get_row_count()))) { + if (OB_FAIL(access_param_->get_op()->deep_copy(access_param_->output_exprs_, vector_store->get_row_count()))) { LOG_WARN("fail to deep copy row", K(ret)); + } else if (OB_FAIL(access_param_->get_op()->write_trans_info_datum(unprojected_row_))) { + LOG_WARN("write trans_info to expr datum failed", K(ret), K(unprojected_row_)); } else if (OB_FAIL(vector_store->fill_row(unprojected_row_))) { LOG_WARN("fail to aggregate row", K(ret)); } @@ -539,7 +544,7 @@ int ObMultipleMerge::get_next_aggregate_row(ObDatumRow *&row) } else if (OB_UNLIKELY(nullptr == block_row_store_ || access_param_->iter_param_.need_fill_group_idx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected aggregate pushdown status", K(ret), - K(access_param_->op_), K(block_row_store_), K(access_param_->iter_param_.need_fill_group_idx())); + K(access_param_->get_op()), K(block_row_store_), K(access_param_->iter_param_.need_fill_group_idx())); } else if (OB_UNLIKELY(nullptr != access_ctx_->range_array_pos_ && access_ctx_->range_array_pos_->count() > 1)) { ret = OB_ERR_UNEXPECTED; @@ -551,17 +556,17 @@ int ObMultipleMerge::get_next_aggregate_row(ObDatumRow *&row) } else { ObAggregatedStore *agg_row_store = reinterpret_cast(block_row_store_); agg_row_store->reuse_aggregated_row(); - if (OB_NOT_NULL(access_param_->op_)) { - int64_t batch_size = max(1, access_param_->op_->get_batch_size()); - access_param_->op_->get_eval_ctx().reuse(batch_size); + if (OB_NOT_NULL(access_param_->get_op())) { + int64_t batch_size = max(1, access_param_->get_op()->get_batch_size()); + access_param_->get_op()->get_eval_ctx().reuse(batch_size); } reuse_lob_locator(); while (OB_SUCC(ret) && !agg_row_store->is_end()) { bool can_batch = false; // clear evaluated flag for every row // all rows will be touched in this loop - if (NULL != access_param_->op_) { - access_param_->op_->clear_datum_eval_flag(); + if (NULL != access_param_->get_op()) { + access_param_->get_op()->clear_datum_eval_flag(); } if (OB_FAIL(refresh_table_on_demand())) { LOG_WARN("fail to refresh table on demand", K(ret)); @@ -710,8 +715,8 @@ int ObMultipleMerge::process_fuse_row(const bool not_using_static_engine, } else if (nullptr != access_ctx_->limit_param_ && access_ctx_->out_cnt_ < access_ctx_->limit_param_->offset_) { // clear evaluated flag for next row. - if (NULL != access_param_->op_) { - access_param_->op_->clear_datum_eval_flag(); + if (NULL != access_param_->get_op()) { + access_param_->get_op()->clear_datum_eval_flag(); } ++access_ctx_->out_cnt_; } else { @@ -865,8 +870,8 @@ int ObMultipleMerge::alloc_row_store(ObTableAccessContext &context, const ObTabl LOG_WARN("fail to alloc aggregated store", K(ret)); } else { block_row_store_ = new (buf) ObAggregatedStore( - param.iter_param_.vectorized_enabled_ ? param.op_->get_batch_size() : ObAggregatedStore::BATCH_SIZE, - param.op_->get_eval_ctx(), + param.iter_param_.vectorized_enabled_ ? param.get_op()->get_batch_size() : ObAggregatedStore::BATCH_SIZE, + param.get_op()->get_eval_ctx(), context); } } else if (ObQRIterType::T_SINGLE_GET != get_type()) { @@ -876,8 +881,8 @@ int ObMultipleMerge::alloc_row_store(ObTableAccessContext &context, const ObTabl LOG_WARN("fail to alloc vector store", K(ret)); } else { block_row_store_ = new (buf) ObVectorStore( - param.op_->get_batch_size(), - param.op_->get_eval_ctx(), + param.get_op()->get_batch_size(), + param.get_op()->get_eval_ctx(), context); } } else if (param.iter_param_.enable_pd_blockscan()) { @@ -940,9 +945,9 @@ int ObMultipleMerge::fuse_default(ObDatumRow &row) if (!def_cell.is_nop_value()) { sql::ObExpr *expr = access_param_->output_exprs_->at(pos); sql::ObDatum &datum = expr->locate_datum_for_write( - access_param_->op_->get_eval_ctx()); + access_param_->get_op()->get_eval_ctx()); sql::ObEvalInfo &eval_info = expr->get_eval_info( - access_param_->op_->get_eval_ctx()); + access_param_->get_op()->get_eval_ctx()); if (OB_FAIL(datum.from_obj(def_cell, expr->obj_datum_map_))) { LOG_WARN("convert obj to datum failed", K(ret)); } else if (is_lob_storage(def_cell.get_type()) && @@ -1009,10 +1014,10 @@ int ObMultipleMerge::pad_columns(ObDatumRow &row) // do nothing for virtual column with no data read, // datum is filled && padded in fill_virtual_columns(). } else { - if (OB_ISNULL(access_param_->op_)) { + if (OB_ISNULL(access_param_->get_op())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected access param: null op", K(ret)); - } else if (OB_FAIL(pad_column(col_param->get_accuracy(), access_param_->op_->get_eval_ctx(), *e))) { + } else if (OB_FAIL(pad_column(col_param->get_accuracy(), access_param_->get_op()->get_eval_ctx(), *e))) { LOG_WARN("pad column failed", K(ret), K(*col_param)); } } @@ -1024,7 +1029,7 @@ int ObMultipleMerge::pad_columns(ObDatumRow &row) int ObMultipleMerge::fill_virtual_columns(ObDatumRow &row) { int ret = OB_SUCCESS; - if (NULL != access_param_->op_) { + if (NULL != access_param_->get_op()) { for (int64_t i = 0; OB_SUCC(ret) && i < nop_pos_.count(); i++) { int64_t pos = 0; if (OB_FAIL(nop_pos_.get_nop_pos(i, pos))) { @@ -1034,16 +1039,16 @@ int ObMultipleMerge::fill_virtual_columns(ObDatumRow &row) // table scan access exprs is column reference expr, only virtual column has argument. if (expr->arg_cnt_ > 0) { ObDatum *datum = NULL; - access_param_->op_->clear_datum_eval_flag(); - if (OB_FAIL(expr->eval(access_param_->op_->get_eval_ctx(), datum))) { + access_param_->get_op()->clear_datum_eval_flag(); + if (OB_FAIL(expr->eval(access_param_->get_op()->get_eval_ctx(), datum))) { LOG_WARN("evaluate virtual column failed", K(ret)); } else if (need_padding_ && expr->obj_meta_.is_fixed_len_char_type()) { const int64_t col_idx = access_param_->iter_param_.out_cols_project_->at(pos); - if (OB_ISNULL(access_param_->op_)) { + if (OB_ISNULL(access_param_->get_op())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected access param: null op", K(ret)); } else if (OB_FAIL(pad_column(access_param_->iter_param_.get_col_params()->at(col_idx)->get_accuracy(), - access_param_->op_->get_eval_ctx(), + access_param_->get_op()->get_eval_ctx(), *expr))) { LOG_WARN("pad column failed", K(ret)); } @@ -1069,7 +1074,7 @@ int ObMultipleMerge::check_filtered(const ObDatumRow &row, bool &filtered) && !access_param_->op_filters_->empty()) { // Execute filter in sql static typing engine. // %row is already projected to output expressions for main table scan. - if (OB_FAIL(access_param_->op_->filter_row_outside(*access_param_->op_filters_, filtered))) { + if (OB_FAIL(access_param_->get_op()->filter_row_outside(*access_param_->op_filters_, filtered))) { LOG_WARN("filter row failed", K(ret)); } } diff --git a/src/storage/access/ob_single_merge.cpp b/src/storage/access/ob_single_merge.cpp index 7773927521..0557941c6e 100644 --- a/src/storage/access/ob_single_merge.cpp +++ b/src/storage/access/ob_single_merge.cpp @@ -317,6 +317,7 @@ int ObSingleMerge::inner_get_next_row(ObDatumRow &row) } else { row.row_flag_ = full_row_.row_flag_; row.group_idx_ = rowkey_->get_group_idx(); + row.trans_info_ = full_row_.trans_info_; STORAGE_LOG(TRACE, "succ to do single get", K(full_row_), K(row), K(have_uncommited_row), K(cols_index), K(access_param_->iter_param_.table_id_)); } if (OB_FAIL(ret)) { diff --git a/src/storage/access/ob_table_access_param.cpp b/src/storage/access/ob_table_access_param.cpp index ea9257c39f..d109a98e4b 100644 --- a/src/storage/access/ob_table_access_param.cpp +++ b/src/storage/access/ob_table_access_param.cpp @@ -43,6 +43,7 @@ ObTableIterParam::ObTableIterParam() is_for_foreign_check_(false), limit_prefetch_(false), ss_rowkey_prefix_cnt_(0), + op_(nullptr), pd_storage_flag_(0) { } @@ -72,6 +73,7 @@ void ObTableIterParam::reset() has_lob_column_out_ = false; is_for_foreign_check_ = false; limit_prefetch_ = false; + op_ = nullptr; } bool ObTableIterParam::is_valid() const @@ -122,6 +124,15 @@ bool ObTableIterParam::enable_fuse_row_cache(const ObQueryFlag &query_flag) cons return bret; } +bool ObTableIterParam::need_trans_info() const +{ + bool bret = false; + if (OB_NOT_NULL(op_) && OB_NOT_NULL(op_->expr_spec_.trans_info_expr_)) { + bret = true; + } + return bret; +} + DEF_TO_STRING(ObTableIterParam) { int64_t pos = 0; @@ -152,7 +163,6 @@ ObTableAccessParam::ObTableAccessParam() projector_size_(0), output_exprs_(NULL), aggregate_exprs_(NULL), - op_(NULL), op_filters_(NULL), row2exprs_projector_(NULL), output_sel_mask_(NULL), @@ -171,7 +181,6 @@ void ObTableAccessParam::reset() padding_cols_ = NULL; projector_size_ = 0; output_exprs_ = NULL; - op_ = NULL; op_filters_ = NULL; row2exprs_projector_ = NULL; output_sel_mask_ = NULL; @@ -204,7 +213,7 @@ int ObTableAccessParam::init( output_exprs_ = scan_param.output_exprs_; aggregate_exprs_ = scan_param.aggregate_exprs_; - op_ = scan_param.op_; + iter_param_.op_ = scan_param.op_; op_filters_ = scan_param.op_filters_; row2exprs_projector_ = scan_param.row2exprs_projector_; output_sel_mask_ = &table_param.get_output_sel_mask(); @@ -228,7 +237,7 @@ int ObTableAccessParam::init( } iter_param_.has_virtual_columns_ = table_param.has_virtual_column(); // vectorize requires blockscan is enabled(_pushdown_storage_level > 0) - iter_param_.vectorized_enabled_ = nullptr != op_ && op_->is_vectorized(); + iter_param_.vectorized_enabled_ = nullptr != get_op() && get_op()->is_vectorized(); iter_param_.limit_prefetch_ = (nullptr == op_filters_ || op_filters_->empty()); if (OB_FAIL(iter_param_.check_read_info_valid())) { @@ -338,7 +347,6 @@ DEF_TO_STRING(ObTableAccessParam) KPC_(padding_cols), K_(projector_size), KPC_(output_exprs), - KP_(op), KP_(op_filters), KP_(row2exprs_projector), KPC_(output_sel_mask), diff --git a/src/storage/access/ob_table_access_param.h b/src/storage/access/ob_table_access_param.h index b525ae888a..7e0a8b13ad 100644 --- a/src/storage/access/ob_table_access_param.h +++ b/src/storage/access/ob_table_access_param.h @@ -143,6 +143,7 @@ public: { use_iter_pool_ = 1; } OB_INLINE bool has_lob_column_out() const { return has_lob_column_out_; } + bool need_trans_info() const; DECLARE_TO_STRING; public: uint64_t table_id_; @@ -167,6 +168,7 @@ public: bool is_for_foreign_check_; bool limit_prefetch_; int64_t ss_rowkey_prefix_cnt_; + sql::ObPushdownOperator *op_; union { struct { int32_t pd_blockscan_:1; @@ -202,7 +204,9 @@ public: // used for index back when query OB_INLINE int64_t get_out_col_cnt() const { return iter_param_.get_out_col_cnt(); } OB_INLINE int64_t get_max_out_col_cnt() const { return iter_param_.get_max_out_col_cnt(); } - + // get push down operator + OB_INLINE sql::ObPushdownOperator *get_op() { return iter_param_.op_; } + OB_INLINE sql::ObPushdownOperator *get_op() const { return iter_param_.op_; } public: DECLARE_TO_STRING; public: @@ -216,7 +220,6 @@ public: // output for sql static typing engine, NULL for old sql engine scan. const sql::ObExprPtrIArray *output_exprs_; const sql::ObExprPtrIArray *aggregate_exprs_; - sql::ObPushdownOperator *op_; const sql::ObExprPtrIArray *op_filters_; ObRow2ExprsProjector *row2exprs_projector_; const common::ObIArray *output_sel_mask_; diff --git a/src/storage/access/ob_table_scan_iterator.cpp b/src/storage/access/ob_table_scan_iterator.cpp index 6608691339..7fced76586 100644 --- a/src/storage/access/ob_table_scan_iterator.cpp +++ b/src/storage/access/ob_table_scan_iterator.cpp @@ -509,8 +509,10 @@ int ObTableScanIterator::get_next_row(ObNewRow *&row) } else if (OB_ISNULL(main_iter_)) { ret = OB_ITER_END; } else { + ObDatum *trans_info_datums = nullptr; if (scan_param_->op_ != nullptr) { scan_param_->op_->clear_datum_eval_flag(); + scan_param_->op_->reset_trans_info_datum(); } if (OB_FAIL(main_iter_->get_next_row(store_row))) { if (OB_ITER_END != ret) { @@ -540,9 +542,12 @@ int ObTableScanIterator::get_next_rows(int64_t &count, int64_t capacity) } else if (OB_ISNULL(main_iter_)) { ret = OB_ITER_END; } else { + ObDatum *trans_info_datums = nullptr; if (scan_param_->op_ != nullptr) { scan_param_->op_->clear_datum_eval_flag(); + scan_param_->op_->reset_trans_info_datum(); } + if (OB_FAIL(main_iter_->get_next_rows(count, capacity))) { if (OB_ITER_END != ret) { STORAGE_LOG(WARN, "Fail to get next row, ", K(ret), K(*scan_param_), K_(main_table_param), diff --git a/src/storage/blocksstable/ob_datum_row.cpp b/src/storage/blocksstable/ob_datum_row.cpp index 11692a2212..c4cf7924fd 100644 --- a/src/storage/blocksstable/ob_datum_row.cpp +++ b/src/storage/blocksstable/ob_datum_row.cpp @@ -196,7 +196,8 @@ ObDatumRow::ObDatumRow() storage_datums_(nullptr), datum_buffer_(), old_row_(), - obj_buf_() + obj_buf_(), + trans_info_(nullptr) {} ObDatumRow::~ObDatumRow() @@ -204,7 +205,7 @@ ObDatumRow::~ObDatumRow() reset(); } -int ObDatumRow::init(ObIAllocator &allocator, const int64_t capacity) +int ObDatumRow::init(ObIAllocator &allocator, const int64_t capacity, char *trans_info_ptr) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_valid())) { @@ -219,9 +220,11 @@ int ObDatumRow::init(ObIAllocator &allocator, const int64_t capacity) } else { storage_datums_ = datum_buffer_.get_datums(); count_ = capacity; + // trans_info_ptr maybe is nullptr, + // ObDatumRow does not care about the free of trans_info_ptr's memory + trans_info_ = trans_info_ptr; } - return ret; } int ObDatumRow::init(const int64_t capacity) @@ -310,6 +313,7 @@ void ObDatumRow::reset() row_flag_.reset(); count_ = 0; local_allocator_.reset(); + trans_info_ = nullptr; } void ObDatumRow::reuse() @@ -326,6 +330,9 @@ void ObDatumRow::reuse() snapshot_version_ = 0; fast_filter_skipped_ = false; have_uncommited_row_ = false; + if (OB_NOT_NULL(trans_info_)) { + trans_info_[0] = '\0'; + } } int ObDatumRow::deep_copy(const ObDatumRow &src, ObIAllocator &allocator) diff --git a/src/storage/blocksstable/ob_datum_row.h b/src/storage/blocksstable/ob_datum_row.h index 0d956878ea..af77a1d4a6 100644 --- a/src/storage/blocksstable/ob_datum_row.h +++ b/src/storage/blocksstable/ob_datum_row.h @@ -347,7 +347,7 @@ struct ObDatumRow public: ObDatumRow(); ~ObDatumRow(); - int init(common::ObIAllocator &allocator, const int64_t capacity); + int init(common::ObIAllocator &allocator, const int64_t capacity, char *trans_info_ptr = nullptr); int init(const int64_t capacity); void reset(); void reuse(); @@ -408,6 +408,9 @@ public: //TODO @hanhui only for compile common::ObNewRow old_row_; storage::ObObjBufArray obj_buf_; + // add by @zimiao ObDatumRow does not care about the free of trans_info_ptr's memory + // The caller must guarantee the life cycle and release of this memory + char *trans_info_; }; struct ObConstDatumRow diff --git a/src/storage/concurrency_control/ob_trans_stat_row.cpp b/src/storage/concurrency_control/ob_trans_stat_row.cpp new file mode 100644 index 0000000000..7f45b48f69 --- /dev/null +++ b/src/storage/concurrency_control/ob_trans_stat_row.cpp @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "storage/concurrency_control/ob_trans_stat_row.h" +#include "sql/engine/basic/ob_pushdown_filter.h" + +namespace oceanbase +{ +namespace concurrency_control +{ + +void build_trans_stat_datum(const storage::ObTableIterParam *param, + const blocksstable::ObDatumRow &row, + const ObTransStatRow trans_stat_row) +{ + const int64_t MAX_SIZE_FOR_TRANS_STAT_DATUM = 100; + // trans stat datum index for vectorized execution + TRANS_LOG(DEBUG, "memtable try to generate trans_info", + K(trans_stat_row), K(param), K(param->op_), K(row.trans_info_), + K(lbt())); + char *trans_stat_ptr = row.trans_info_; + if (OB_NOT_NULL(param->op_) + && OB_NOT_NULL(trans_stat_ptr)) { + trans_stat_ptr[0] = '\0'; + concurrency_control::build_trans_stat_(trans_stat_row, + MAX_SIZE_FOR_TRANS_STAT_DATUM, + trans_stat_ptr); + TRANS_LOG(DEBUG, "memtable generate trans_info", + K(ObString(strlen(trans_stat_ptr), trans_stat_ptr)), + K(param->op_->is_vectorized()), K(trans_stat_row), K(param), + K(param->op_->eval_ctx_)); + } +} + +void build_trans_stat_(const ObTransStatRow trans_stat_row, + const int64_t trans_stat_len, + char *trans_stat_ptr) +{ + int ret = OB_SUCCESS; + int64_t pos = 0; + if (OB_FAIL(databuff_printf(trans_stat_ptr, + trans_stat_len, + pos, + "[%ld, %ld, %ld, %ld]", + trans_stat_row.trans_version_.get_val_for_tx(), + trans_stat_row.scn_.get_val_for_tx(), + trans_stat_row.trans_id_.get_id(), + trans_stat_row.seq_no_))) { + TRANS_LOG(WARN, "failed to printf", K(ret), K(pos), K(trans_stat_len), K(trans_stat_row)); + trans_stat_ptr[0] = '\0'; + } else { + if (pos > trans_stat_len) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected length for datum", K(pos)); + trans_stat_ptr[0] = '\0'; + } + } +} + + +} // namespace concurrency_control +} // namespace oceanbase diff --git a/src/storage/concurrency_control/ob_trans_stat_row.h b/src/storage/concurrency_control/ob_trans_stat_row.h new file mode 100644 index 0000000000..fcbce3eea8 --- /dev/null +++ b/src/storage/concurrency_control/ob_trans_stat_row.h @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_STORAGE_CONCURRENCY_CONTROL_OB_TRANS_STAT_ROW +#define OCEANBASE_STORAGE_CONCURRENCY_CONTROL_OB_TRANS_STAT_ROW + +#include "share/scn.h" +#include "share/datum/ob_datum.h" +#include "storage/access/ob_table_access_param.h" + +namespace oceanbase +{ +namespace concurrency_control +{ + +class ObTransStatRow +{ +public: + ObTransStatRow() + : trans_version_(share::SCN::max_scn()), + scn_(share::SCN::max_scn()), + trans_id_(), + seq_no_(0) {} + + void set(const share::SCN trans_version, + const share::SCN scn, + const transaction::ObTransID trans_id, + const int64_t seq_no) + { + trans_version_ = trans_version; + scn_ = scn; + trans_id_ = trans_id; + seq_no_ = seq_no; + } + + void reset() + { + trans_version_ = share::SCN::max_scn(); + scn_ = share::SCN::max_scn(); + trans_id_.reset(); + seq_no_ = 0; + } + + TO_STRING_KV(K_(trans_version), K_(scn), K_(trans_id), K_(seq_no)); + share::SCN trans_version_; + share::SCN scn_; + transaction::ObTransID trans_id_; + int64_t seq_no_; +public: + static const int64_t MAX_TRANS_STRING_SIZE = 120; +}; + +void build_trans_stat_datum(const storage::ObTableIterParam *param, + const blocksstable::ObDatumRow &row, + const ObTransStatRow trans_stat_row); + +void build_trans_stat_(const ObTransStatRow trans_stat_row, + const int64_t trans_stat_len, + char *trans_stat_ptr); + +} // namespace concurrency_control +} // namespace oceanbase + +#endif // OCEANBASE_STORAGE_CONCURRENCY_CONTROL_OB_TRANS_STAT_ROW diff --git a/src/storage/lob/ob_lob_locator.cpp b/src/storage/lob/ob_lob_locator.cpp index d18cde490a..db91b7037b 100644 --- a/src/storage/lob/ob_lob_locator.cpp +++ b/src/storage/lob/ob_lob_locator.cpp @@ -165,7 +165,7 @@ int ObLobLocatorHelper::fill_lob_locator(ObDatumRow &row, } else if (!lib::is_oracle_mode() || is_sys_table(access_param.iter_param_.table_id_)) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "Only oracle mode need build lob locator", K(ret)); - } else if (OB_ISNULL(access_param.output_exprs_) || OB_ISNULL(access_param.op_)) { + } else if (OB_ISNULL(access_param.output_exprs_) || OB_ISNULL(access_param.get_op())) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "output expr or op is null", K(ret), K(access_param)); } else { @@ -190,14 +190,14 @@ int ObLobLocatorHelper::fill_lob_locator(ObDatumRow &row, } else if (OB_UNLIKELY(i >= access_param.output_exprs_->count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected index", K(ret), K(i), KPC(access_param.output_exprs_)); - } else if (OB_ISNULL(access_param.op_)) { + } else if (OB_ISNULL(access_param.get_op())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("access_param.op is null", K(ret), K(access_param)); } else { sql::ObExpr *expr = access_param.output_exprs_->at(i); if (is_lob_locator(expr->datum_meta_.type_)) { ObLobLocator *locator = NULL; - sql::ObDatum &datum = expr->locate_expr_datum(access_param.op_->get_eval_ctx()); + sql::ObDatum &datum = expr->locate_expr_datum(access_param.get_op()->get_eval_ctx()); if (datum.is_null()) { // do nothing. } else if (OB_FAIL(build_lob_locator(datum.get_string(), col_descs->at(idx).col_id_, diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp index 1f13195ba2..0d453db99a 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp @@ -250,6 +250,18 @@ int ObMvccValueIterator::try_cleanout_tx_node_(ObMvccTransNode *tnode) return ret; } +void ObMvccValueIterator::get_trans_stat_row(concurrency_control::ObTransStatRow &row) +{ + if (OB_ISNULL(version_iter_)) { + row.reset(); + } else { + row.set(version_iter_->get_tx_version(), + version_iter_->get_scn(), + version_iter_->get_tx_id(), + version_iter_->get_seq_no()); + } +} + int ObMvccValueIterator::get_next_node(const void *&tnode) { int ret = OB_SUCCESS; diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.h b/src/storage/memtable/mvcc/ob_mvcc_iterator.h index 64ef48b15b..899efdfed2 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.h +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.h @@ -18,6 +18,7 @@ #include "storage/memtable/mvcc/ob_mvcc_row.h" #include "storage/memtable/mvcc/ob_query_engine.h" #include "storage/tx/ob_trans_define.h" +#include "storage/concurrency_control/ob_trans_stat_row.h" namespace oceanbase { @@ -139,6 +140,7 @@ public: const ObMvccAccessCtx *get_mvcc_acc_ctx() const { return ctx_; } const ObMvccRow *get_mvcc_row() const { return value_; } const ObMvccTransNode *get_trans_node() const { return version_iter_; } + void get_trans_stat_row(concurrency_control::ObTransStatRow &row); private: int lock_for_read_(const ObQueryFlag &flag); int lock_for_read_inner_(const ObQueryFlag &flag, ObMvccTransNode *&iter); diff --git a/src/storage/memtable/mvcc/ob_mvcc_row.h b/src/storage/memtable/mvcc/ob_mvcc_row.h index a2484b209a..6220fd6a8b 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_row.h +++ b/src/storage/memtable/mvcc/ob_mvcc_row.h @@ -182,7 +182,9 @@ public: tx_end_scn_.atomic_store(tx_end_scn); } } - share::SCN get_tx_end_scn() { return tx_end_scn_.atomic_load(); } + share::SCN get_tx_end_scn() const { return tx_end_scn_.atomic_load(); } + share::SCN get_tx_version() const { return trans_version_.atomic_load(); } + share::SCN get_scn() const { return scn_.atomic_load(); } private: // the row flag of the mvcc tx node diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 3e64e903b4..392dc15865 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -44,6 +44,8 @@ #include "storage/tablet/ob_tablet_memtable_mgr.h" #include "storage/tx_storage/ob_tenant_freezer.h" +#include "storage/concurrency_control/ob_trans_stat_row.h" + namespace oceanbase { using namespace common; @@ -846,12 +848,24 @@ int ObMemtable::get( } } else { if (OB_UNLIKELY(!row.is_valid())) { - if (OB_FAIL(row.init(*context.stmt_allocator_, out_cols.count()))) { - STORAGE_LOG(WARN, "Failed to init datum row", K(ret)); + char *trans_info_ptr = nullptr; + if (param.need_trans_info()) { + int64_t length = concurrency_control::ObTransStatRow::MAX_TRANS_STRING_SIZE; + if (OB_ISNULL(trans_info_ptr = static_cast(context.stmt_allocator_->alloc(length)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to alloc memory", K(ret)); + } + } + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_FAIL(row.init(*context.stmt_allocator_, out_cols.count(), trans_info_ptr))) { + STORAGE_LOG(WARN, "Failed to init datum row", K(ret), K(param.need_trans_info())); } } if (OB_SUCC(ret)) { const ObStoreRowkey *store_rowkey = nullptr; + concurrency_control::ObTransStatRow trans_stat_row; + (void)value_iter.get_trans_stat_row(trans_stat_row); if (NULL != returned_mtk.get_rowkey()) { returned_mtk.get_rowkey(store_rowkey); } else { @@ -872,6 +886,9 @@ int ObMemtable::get( } } } + + // generate trans stat datum for 4377 check + concurrency_control::build_trans_stat_datum(¶m, row, trans_stat_row); } } } diff --git a/src/storage/memtable/ob_memtable_iterator.cpp b/src/storage/memtable/ob_memtable_iterator.cpp index b44acb945c..28b796cfb3 100644 --- a/src/storage/memtable/ob_memtable_iterator.cpp +++ b/src/storage/memtable/ob_memtable_iterator.cpp @@ -73,14 +73,24 @@ int ObMemtableGetIterator::init( ObIMemtable &memtable) { int ret = OB_SUCCESS; + char *trans_info_ptr = nullptr; if (is_inited_) { reset(); } const ObTableReadInfo *read_info = param.get_read_info(context.use_fuse_row_cache_); - if (OB_UNLIKELY(nullptr == read_info || !read_info->is_valid())) { + if (param.need_trans_info()) { + int64_t length = concurrency_control::ObTransStatRow::MAX_TRANS_STRING_SIZE; + if (OB_ISNULL(trans_info_ptr = static_cast(context.stmt_allocator_->alloc(length)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + TRANS_LOG(WARN, "fail to alloc memory", K(ret)); + } + } + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_UNLIKELY(nullptr == read_info || !read_info->is_valid())) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "Unexpected read info", K(ret), KPC(read_info)); - } else if (OB_FAIL(cur_row_.init(*context.stmt_allocator_, read_info->get_request_count()))) { + } else if (OB_FAIL(cur_row_.init(*context.stmt_allocator_, read_info->get_request_count(), trans_info_ptr))) { STORAGE_LOG(WARN, "Failed to init datum row", K(ret)); } else { param_ = ¶m; @@ -189,15 +199,27 @@ int ObMemtableScanIterator::init( storage::ObTableAccessContext &context) { int ret = OB_SUCCESS; + char *trans_info_ptr = nullptr; if (is_inited_) { reset(); } - if (OB_ISNULL(read_info_ = param.get_read_info(false))) { + if (param.need_trans_info()) { + int64_t length = concurrency_control::ObTransStatRow::MAX_TRANS_STRING_SIZE; + if (OB_ISNULL(trans_info_ptr = static_cast(context.stmt_allocator_->alloc(length)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + TRANS_LOG(WARN, "fail to alloc memory", K(ret)); + } + } + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_ISNULL(read_info_ = param.get_read_info(false))) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "Unexpected null read info", K(ret), K(param)); - } else if (OB_FAIL(row_.init(*context.stmt_allocator_, read_info_->get_request_count()))) { - TRANS_LOG(WARN, "Failed to init datum row", K(ret)); + } else if (OB_FAIL(row_.init(*context.stmt_allocator_, + read_info_->get_request_count(), + trans_info_ptr))) { + TRANS_LOG(WARN, "Failed to init datum row", K(ret), K(param.need_trans_info())); } else { TRANS_LOG(DEBUG, "scan iterator init succ", K(param.table_id_)); param_ = ¶m; @@ -348,12 +370,8 @@ int ObMemtableScanIterator::inner_get_next_row(const ObDatumRow *&row) int64_t row_scn = 0; key->get_rowkey(rowkey); - bool is_committed = false; - const ObMvccTransNode *latest_trans_node = value_iter->get_trans_node(); - if (OB_NOT_NULL(latest_trans_node) - && latest_trans_node->is_committed()) { - is_committed = true; - } + concurrency_control::ObTransStatRow trans_stat_row; + (void)value_iter->get_trans_stat_row(trans_stat_row); ObStoreRowLockState lock_state; if (param_->is_for_foreign_check_ && @@ -388,6 +406,9 @@ int ObMemtableScanIterator::inner_get_next_row(const ObDatumRow *&row) } } + // generate trans stat datum for 4377 check + concurrency_control::build_trans_stat_datum(param_, row_, trans_stat_row); + row_.scan_index_ = 0; row = &row_; } @@ -396,7 +417,6 @@ int ObMemtableScanIterator::inner_get_next_row(const ObDatumRow *&row) iter_flag_ = 0; } return ret; - } void ObMemtableScanIterator::reset() @@ -444,14 +464,24 @@ int ObMemtableMGetIterator::init( reset(); } + char *trans_info_ptr = nullptr; const ObTableReadInfo *read_info = param.get_read_info(context.use_fuse_row_cache_); - if (OB_ISNULL(table) || OB_ISNULL(query_range)) { + if (param.need_trans_info()) { + int64_t length = concurrency_control::ObTransStatRow::MAX_TRANS_STRING_SIZE; + if (OB_ISNULL(trans_info_ptr = static_cast(context.stmt_allocator_->alloc(length)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + TRANS_LOG(WARN, "fail to alloc memory", K(ret)); + } + } + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_ISNULL(table) || OB_ISNULL(query_range)) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "table and query_range can not be null", KP(table), KP(query_range), K(ret)); } else if (OB_UNLIKELY(nullptr == read_info || !read_info->is_valid())) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "Unexpected read info", K(ret), KPC(read_info)); - } else if (OB_FAIL(cur_row_.init(*context.stmt_allocator_, read_info->get_request_count()))) { + } else if (OB_FAIL(cur_row_.init(*context.stmt_allocator_, read_info->get_request_count(), trans_info_ptr))) { TRANS_LOG(WARN, "Failed to init datum row", K(ret)); } else { const ObColDescIArray &out_cols = read_info->get_columns_desc(); diff --git a/src/storage/ob_row_fuse.cpp b/src/storage/ob_row_fuse.cpp index 091a7e6a11..674a4db508 100644 --- a/src/storage/ob_row_fuse.cpp +++ b/src/storage/ob_row_fuse.cpp @@ -205,7 +205,12 @@ int ObRowFuse::fuse_row(const blocksstable::ObDatumRow &former, final_result = false; bool first_val = (0 == result.count_ || result.row_flag_.is_not_exist()); int64_t column_cnt = 0; - + // add by zimiao, When result.trans_info_ is nullptr, + // it means that the current row does not have any transaction information, + // so set the first transaction information obtained by the current row + if (OB_ISNULL(result.trans_info_)) { + result.trans_info_ = former.trans_info_; + } if (first_val) { nop_pos.reset(); result.row_flag_ = former.row_flag_;