[FEAT MERGE] transaction pdml support optimization

Co-authored-by: yyy-hust <yyy.hust@gmail.com>
Co-authored-by: SanmuWangZJU <sanmuwang.ws@gmail.com>
This commit is contained in:
chinaxing
2023-12-16 12:42:52 +00:00
committed by ant-ob-hengtang
parent eaa353f503
commit 438a70b2b8
169 changed files with 8960 additions and 3921 deletions

View File

@ -1141,6 +1141,7 @@ int ObDMLService::delete_row(const ObDASDelCtDef &das_del_ctdef,
int ObDMLService::init_dml_param(const ObDASDMLBaseCtDef &base_ctdef,
ObDASDMLBaseRtDef &base_rtdef,
transaction::ObTxReadSnapshot &snapshot,
const int16_t write_branch_id,
ObIAllocator &das_alloc,
storage::ObDMLBaseParam &dml_param)
{
@ -1157,6 +1158,7 @@ int ObDMLService::init_dml_param(const ObDASDMLBaseCtDef &base_ctdef,
dml_param.is_batch_stmt_ = base_ctdef.is_batch_stmt_;
dml_param.dml_allocator_ = &das_alloc;
dml_param.snapshot_ = snapshot;
dml_param.branch_id_ = write_branch_id;
if (base_ctdef.is_batch_stmt_) {
dml_param.write_flag_.set_is_dml_batch_opt();
}

View File

@ -140,6 +140,7 @@ public:
static int init_dml_param(const ObDASDMLBaseCtDef &base_ctdef,
ObDASDMLBaseRtDef &base_rtdef,
transaction::ObTxReadSnapshot &snapshot,
const int16_t write_branch_id,
common::ObIAllocator &das_alloc,
storage::ObDMLBaseParam &dml_param);
static int init_das_dml_rtdef(ObDMLRtCtx &dml_rtctx,
@ -281,6 +282,7 @@ public:
ObDASIndexDMLAdaptor()
: tx_desc_(nullptr),
snapshot_(nullptr),
write_branch_id_(0),
ctdef_(nullptr),
rtdef_(nullptr),
related_ctdefs_(nullptr),
@ -302,6 +304,7 @@ public:
public:
transaction::ObTxDesc *tx_desc_;
transaction::ObTxReadSnapshot *snapshot_;
int16_t write_branch_id_;
const CtDefType *ctdef_;
RtDefType *rtdef_;
const DASCtDefFixedArray *related_ctdefs_;
@ -325,7 +328,7 @@ int ObDASIndexDMLAdaptor<N, DMLIterator>::write_tablet(DMLIterator &iter, int64_
if (OB_FAIL(write_tablet_with_ignore(iter, affected_rows))) {
LOG_WARN("write tablet with ignore failed", K(ret));
}
} else if (OB_FAIL(ObDMLService::init_dml_param(*ctdef_, *rtdef_, *snapshot_, *das_allocator_, dml_param_))) {
} else if (OB_FAIL(ObDMLService::init_dml_param(*ctdef_, *rtdef_, *snapshot_, write_branch_id_, *das_allocator_, dml_param_))) {
SQL_DAS_LOG(WARN, "init dml param failed", K(ret), K(ctdef_->table_id_), K(ctdef_->index_tid_));
} else if (OB_FAIL(write_rows(ls_id_, tablet_id_, *ctdef_, *rtdef_, iter, affected_rows))) {
SQL_DAS_LOG(WARN, "write rows failed", K(ret),
@ -341,7 +344,7 @@ int ObDASIndexDMLAdaptor<N, DMLIterator>::write_tablet(DMLIterator &iter, int64_
K(ls_id_), K(related_tablet_id), K(related_ctdef->table_id_), K(related_ctdef->index_tid_));
if (OB_FAIL(iter.rewind(related_ctdef))) {
SQL_DAS_LOG(WARN, "rewind iterator failed", K(ret));
} else if (OB_FAIL(ObDMLService::init_dml_param(*related_ctdef, *related_rtdef, *snapshot_, *das_allocator_, dml_param_))) {
} else if (OB_FAIL(ObDMLService::init_dml_param(*related_ctdef, *related_rtdef, *snapshot_, write_branch_id_, *das_allocator_, dml_param_))) {
SQL_DAS_LOG(WARN, "init index dml param failed", K(ret),
K(related_ctdef->table_id_), K(related_ctdef->index_tid_));
} else if (OB_FAIL(write_rows(ls_id_,
@ -398,7 +401,7 @@ int ObDASIndexDMLAdaptor<N, DMLIterator>::write_tablet_with_ignore(DMLIterator &
SQL_DAS_LOG(TRACE, "write table dml row with ignore", KPC(dml_row), K(ls_id_), K(tablet_id_),
K(ctdef_->table_id_), K(ctdef_->index_tid_));
DMLIterator single_row_iter(ctdef_, single_row_buffer, *das_allocator_);
if (OB_FAIL(ObDMLService::init_dml_param(*ctdef_, *rtdef_, *snapshot_, *das_allocator_, dml_param_))) {
if (OB_FAIL(ObDMLService::init_dml_param(*ctdef_, *rtdef_, *snapshot_, write_branch_id_, *das_allocator_, dml_param_))) {
SQL_DAS_LOG(WARN, "init dml param failed", K(ret), KPC_(ctdef), KPC_(rtdef));
} else if (OB_FAIL(write_rows(ls_id_,
tablet_id_,
@ -422,6 +425,7 @@ int ObDASIndexDMLAdaptor<N, DMLIterator>::write_tablet_with_ignore(DMLIterator &
} else if (OB_FAIL(ObDMLService::init_dml_param(*related_ctdef,
*related_rtdef,
*snapshot_,
write_branch_id_,
*das_allocator_,
dml_param_))) {
SQL_DAS_LOG(WARN, "init index dml param failed", K(ret),

View File

@ -747,6 +747,7 @@ int ObTableModifyOp::inner_open()
} else {
init_das_dml_ctx();
}
LOG_TRACE("table_modify_op", K(execute_single_row_));
return ret;
}

View File

@ -291,7 +291,8 @@ public:
bool has_non_trivial_expr_op_ctx() const { return has_non_trivial_expr_op_ctx_; }
void set_non_trivial_expr_op_ctx(bool v) { has_non_trivial_expr_op_ctx_ = v; }
inline bool &get_tmp_alloc_used() { return tmp_alloc_used_; }
// set write branch id for DML write
void set_branch_id(const int16_t branch_id) { das_ctx_.set_write_branch_id(branch_id); }
VIRTUAL_NEED_SERIALIZE_AND_DESERIALIZE;
protected:
uint64_t get_ser_version() const;

View File

@ -180,7 +180,7 @@ int ObPDMLOpBatchRowCache::add_row(const ObExprPtrIArray &row, ObTabletID tablet
// the 2MB config is tested optimal under PDML concurrency=4 and concurrency=8 cases
// TODO: maybe we can introduce a dynamic control policy
// concidering the tenant overall access behavior to storage
constexpr int64_t max_pdml_cache_size_per_thread = 2 * 1024 * 1024;
const int64_t max_pdml_cache_size_per_thread = GCONF._pdml_thread_cache_size;
if (!with_barrier_ && cached_rows_size_ > max_pdml_cache_size_per_thread) {
ret = OB_EXCEED_MEM_LIMIT;
} else if (OB_FAIL(process_dump())) {

View File

@ -66,7 +66,8 @@ OB_SERIALIZE_MEMBER(ObPxSqcMeta,
px_detectable_ids_,
p2p_dh_map_info_,
sqc_count_,
monitoring_info_);
monitoring_info_,
branch_id_base_);
OB_SERIALIZE_MEMBER(ObPxTask,
qc_id_,
dfo_id_,
@ -79,7 +80,8 @@ OB_SERIALIZE_MEMBER(ObPxTask,
exec_addr_,
execution_id_,
px_int_id_,
is_fulltree_);
is_fulltree_,
branch_id_);
OB_SERIALIZE_MEMBER(ObPxRpcInitTaskResponse,
task_co_id_);
@ -155,6 +157,7 @@ int ObPxSqcMeta::assign(const ObPxSqcMeta &other)
qc_id_ = other.qc_id_;
dfo_id_ = other.dfo_id_;
sqc_id_ = other.sqc_id_;
branch_id_base_ = other.branch_id_base_;
thread_inited_ = other.thread_inited_;
thread_finish_ = other.thread_finish_;
exec_addr_ = other.exec_addr_;
@ -1000,11 +1003,7 @@ int ObPxRpcInitTaskArgs::deep_copy_assign(ObPxRpcInitTaskArgs &src,
} else if (ser_pos != des_pos) {
ret = OB_DESERIALIZE_ERROR;
LOG_WARN("data_len and pos mismatch", K(ser_arg_len), K(ser_pos), K(des_pos), K(ret));
} else {
// PLACE_HOLDER: if want multiple px worker share trans_desc
// set exec_ctx_->session->set_effective_trans_desc(src.exec_ctx_->session->get_effective_trans_desc());
}
return ret;
}

View File

@ -207,6 +207,7 @@ public:
qc_id_(common::OB_INVALID_ID),
sqc_id_(common::OB_INVALID_ID),
dfo_id_(common::OB_INVALID_ID),
branch_id_base_(0),
access_table_locations_(),
qc_ch_info_(),
sqc_ch_info_(),
@ -368,8 +369,10 @@ public:
bool sqc_order_gi_tasks() const { return sqc_order_gi_tasks_; }
ObQCMonitoringInfo &get_monitoring_info() { return monitoring_info_; }
const ObQCMonitoringInfo &get_monitoring_info() const { return monitoring_info_; }
void set_branch_id_base(const int16_t branch_id_base) { branch_id_base_ = branch_id_base; }
int16_t get_branch_id_base() const { return branch_id_base_; }
TO_STRING_KV(K_(need_report), K_(execution_id), K_(qc_id), K_(sqc_id), K_(dfo_id), K_(exec_addr), K_(qc_addr),
K_(qc_ch_info), K_(sqc_ch_info),
K_(branch_id_base), K_(qc_ch_info), K_(sqc_ch_info),
K_(task_count), K_(max_task_count), K_(min_task_count),
K_(thread_inited), K_(thread_finish), K_(px_int_id),
K_(transmit_use_interm_result),
@ -380,6 +383,9 @@ private:
uint64_t qc_id_;
int64_t sqc_id_;
int64_t dfo_id_;
// branch id is used to distinguish datas written concurrently by px-workers
// for replace and insert update operator, they need branch_id to rollback writes by one px-worker
int16_t branch_id_base_;
ObQCMonitoringInfo monitoring_info_;
// The partition location information of the all table_scan op and dml op
// used for px worker execution
@ -472,6 +478,7 @@ public:
child_dfos_(),
has_scan_(false),
has_dml_op_(false),
has_need_branch_id_op_(false),
has_temp_scan_(false),
is_active_(false),
is_scheduled_(false),
@ -541,6 +548,8 @@ public:
inline bool has_scan_op() const { return has_scan_; }
inline void set_dml_op(bool has_dml_op) { has_dml_op_ = has_dml_op; }
inline bool has_dml_op() { return has_dml_op_; }
inline void set_need_branch_id_op(bool has_need_branch_id_op) { has_need_branch_id_op_ = has_need_branch_id_op; }
inline bool has_need_branch_id_op() const { return has_need_branch_id_op_; }
inline void set_temp_table_scan(bool has_scan) { has_temp_scan_ = has_scan; }
inline bool has_temp_table_scan() const { return has_temp_scan_; }
inline bool is_fast_dfo() const { return is_prealloc_receive_channel() || is_prealloc_transmit_channel(); }
@ -741,6 +750,7 @@ private:
common::ObSEArray<ObDfo *, 4> child_dfos_;
bool has_scan_; // DFO 中包含至少一个 scan 算子,或者仅仅包含一个dml
bool has_dml_op_; // DFO中可能包含一个dml
bool has_need_branch_id_op_; // DFO 中有算子需要分配branch_id
bool has_temp_scan_;
bool is_active_;
bool is_scheduled_;
@ -906,6 +916,7 @@ public:
dfo_id_(0),
sqc_id_(0),
task_id_(-1),
branch_id_(0),
execution_id_(0),
task_channel_(NULL),
sqc_channel_(NULL),
@ -933,6 +944,7 @@ public:
dfo_id_ = other.dfo_id_;
sqc_id_ = other.sqc_id_;
task_id_ = other.task_id_;
branch_id_ = other.branch_id_;
execution_id_ = other.execution_id_;
sqc_ch_info_ = other.sqc_ch_info_;
task_ch_info_ = other.task_ch_info_;
@ -958,6 +970,7 @@ public:
K_(dfo_id),
K_(sqc_id),
K_(task_id),
K_(branch_id),
K_(execution_id),
K_(sqc_ch_info),
K_(task_ch_info),
@ -986,6 +999,8 @@ public:
inline bool is_task_state_set(int32_t flag) const { return 0 != (state_ & flag); }
inline void set_task_id(int64_t task_id) { task_id_ = task_id; }
inline int64_t get_task_id() const { return task_id_; }
inline void set_branch_id(int16_t branch_id) { branch_id_ = branch_id; }
inline int16_t get_branch_id() const { return branch_id_; }
inline void set_qc_id(uint64_t qc_id) { qc_id_ = qc_id; }
inline int64_t get_qc_id() const { return qc_id_; }
inline void set_sqc_id(int64_t sqc_id) { sqc_id_ = sqc_id; }
@ -1029,6 +1044,7 @@ public:
int64_t dfo_id_;
int64_t sqc_id_;
int64_t task_id_;
int16_t branch_id_;
int64_t execution_id_;
dtl::ObDtlChannelInfo sqc_ch_info_;
dtl::ObDtlChannelInfo task_ch_info_;

View File

@ -502,6 +502,11 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
} else if (phy_op->is_dml_operator() && NULL != parent_dfo) {
// 当前op是一个dml算子,需要设置dfo的属性
parent_dfo->set_dml_op(true);
const ObPhyOperatorType op_type = phy_op->get_type();
LOG_TRACE("set DFO need_branch_id", K(op_type));
parent_dfo->set_need_branch_id_op(op_type == PHY_INSERT_ON_DUP
|| op_type == PHY_REPLACE
|| op_type == PHY_LOCK);
} else if (phy_op->get_type() == PHY_TEMP_TABLE_ACCESS && NULL != parent_dfo) {
parent_dfo->set_temp_table_scan(true);
const ObTempTableAccessOpSpec *access = static_cast<const ObTempTableAccessOpSpec*>(phy_op);

View File

@ -584,6 +584,19 @@ int ObSerialDfoScheduler::do_schedule_dfo(ObExecContext &ctx, ObDfo &dfo) const
}
}
// 2. allocate branch_id for DML: replace, insert update, select for update
if (OB_SUCC(ret) && dfo.has_need_branch_id_op()) {
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
int16_t branch_id = 0;
const int64_t max_task_count = sqcs.at(idx)->get_max_task_count();
if (OB_FAIL(ObSqlTransControl::alloc_branch_id(ctx, max_task_count, branch_id))) {
LOG_WARN("alloc branch id fail", KR(ret), K(max_task_count));
} else {
sqcs.at(idx)->set_branch_id_base(branch_id);
LOG_TRACE("alloc branch id", K(max_task_count), K(branch_id), KPC(sqcs.at(idx)));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(dispatch_sqcs(ctx, dfo, sqcs))) {
@ -755,6 +768,20 @@ int ObParallelDfoScheduler::do_schedule_dfo(ObExecContext &exec_ctx, ObDfo &dfo)
}
}
// 2. allocate branch_id for DML: replace, insert update, select for update
if (OB_SUCC(ret) && dfo.has_need_branch_id_op()) {
ARRAY_FOREACH_X(sqcs, idx, cnt, OB_SUCC(ret)) {
int16_t branch_id = 0;
const int64_t max_task_count = sqcs.at(idx)->get_max_task_count();
if (OB_FAIL(ObSqlTransControl::alloc_branch_id(exec_ctx, max_task_count, branch_id))) {
LOG_WARN("alloc branch id fail", KR(ret), K(max_task_count));
} else {
sqcs.at(idx)->set_branch_id_base(branch_id);
LOG_TRACE("alloc branch id", K(max_task_count), K(branch_id), KPC(sqcs.at(idx)));
}
}
}
if (OB_SUCC(ret)) {
// 下面的逻辑处理握手阶段超时的情况
// - 目的: 为了防止死锁

View File

@ -526,6 +526,9 @@ int ObPxSubCoord::create_tasks(ObPxRpcInitSqcArgs &sqc_arg, ObSqcCtx &sqc_ctx, b
const ObAddr &task_exec_addr = sqc.get_exec_addr();
const ObAddr &qc_exec_addr = sqc.get_qc_addr();
task.set_task_id(i);
if (sqc.get_branch_id_base()) {
task.set_branch_id(sqc.get_branch_id_base() + i);
}
task.set_sqc_addr(sqc_exec_addr);
task.set_exec_addr(task_exec_addr);
task.set_qc_addr(qc_exec_addr);

View File

@ -163,6 +163,7 @@ int ObPxTaskProcess::process()
arg_.exec_ctx_->set_sqc_handler(arg_.sqc_handler_);
arg_.exec_ctx_->set_px_task_id(arg_.task_.get_task_id());
arg_.exec_ctx_->set_px_sqc_id(arg_.task_.get_sqc_id());
arg_.exec_ctx_->set_branch_id(arg_.task_.get_branch_id());
ObMaxWaitGuard max_wait_guard(enable_perf_event ? &max_wait_desc : NULL);
ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL);