fix mysqltest

This commit is contained in:
obdev 2024-02-19 08:05:55 +00:00 committed by ob-robot
parent 8e44725dfb
commit 5896db45f9
8 changed files with 63 additions and 14 deletions

View File

@ -1189,10 +1189,6 @@ int ObLogCommitter::after_trans_handled_(PartTransTask *participants)
// Decrement the reference count after the Commit message is updated
// If the reference count is 0, the partition transaction is recycled
else if (0 == part_trans_task->dec_ref_cnt()) {
if (is_ddl_trans && ! part_trans_task->is_sys_ls_part_trans()) {
// mark user_ls part_trans_task in ddl_trans not served, and will recycle redo of the part_trans_task in resource_collector.
part_trans_task->set_unserved();
}
if (OB_FAIL(resource_collector_->revert(part_trans_task))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("revert PartTransTask fail", KR(ret), K(part_trans_task));

View File

@ -529,6 +529,8 @@ int ObLogResourceCollector::handle(void *data,
if (OB_FAIL(trans_ctx_mgr_->get_trans_ctx(tenant_id, trans_id, trans_ctx, enable_create))) {
LOG_ERROR("get trans_ctx fail", KR(ret), K(tenant_id), K(trans_id), K(*task));
} else if (task->is_dml_trans() && trans_ctx->has_ddl_participant() && OB_FAIL(recycle_stored_redo_(*task))) {
LOG_ERROR("recycle stored redo for dml_participant of dist ddl trans failed", KR(ret), KPC(task), KPC(trans_ctx));
}
// Increase the number of participants that can be recycled
else if (OB_FAIL(trans_ctx->inc_revertable_participant_count(all_participant_revertable))) {
@ -868,7 +870,7 @@ void ObLogResourceCollector::get_task_count(int64_t &part_trans_task_count, int6
br_count = ATOMIC_LOAD(&br_count_);
}
int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t thread_idx, PartTransTask &task)
int ObLogResourceCollector::recycle_stored_redo_(PartTransTask &task)
{
int ret = OB_SUCCESS;
const logservice::TenantLSID &tenant_ls_id = task.get_tls_id();
@ -899,14 +901,23 @@ int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t threa
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(recycle_part_trans_task_(thread_idx, &task))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("recycle_part_trans_task_ failed", KR(ret), K(thread_idx), K(task));
}
} else {
// no more access to task
return ret;
}
int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t thread_idx, PartTransTask &task)
{
int ret = OB_SUCCESS;
if (OB_FAIL(recycle_stored_redo_(task))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("recycle_stored_redo_ failed", KR(ret), K(thread_idx), K(task));
}
} else if (OB_FAIL(recycle_part_trans_task_(thread_idx, &task))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("recycle_part_trans_task_ failed", KR(ret), K(thread_idx), K(task));
}
} else {
// no more access to task
}
return ret;

View File

@ -140,6 +140,7 @@ private:
int revert_log_entry_task_(ObLogEntryTask *log_entry_task);
int del_store_service_data_(const uint64_t tenant_id,
std::string &key);
int recycle_stored_redo_(PartTransTask &task);
int revert_unserved_part_trans_task_(const int64_t thread_idx, PartTransTask &task);
void do_stat_(PartTransTask &task,

View File

@ -765,6 +765,20 @@ int TransCtx::get_tenant_id(uint64_t &tenant_id) const
return ret;
}
bool TransCtx::has_ddl_participant() const
{
bool has_ddl_part = false;
PartTransTask* participant = ready_participant_objs_;
while (! has_ddl_part && OB_NOT_NULL(participant)) {
PartTransTask* next = participant->next_task();
has_ddl_part = participant->is_ddl_trans();
participant = next;
}
return has_ddl_part;
}
int TransCtx::init_participant_array_(const int64_t part_count)
{
int ret = OB_SUCCESS;

View File

@ -227,6 +227,7 @@ public:
int lock();
int unlock();
int get_tenant_id(uint64_t &tenant_id) const;
bool has_ddl_participant() const;
// for unittest start
int set_trans_id(const transaction::ObTransID &trans_id);

View File

@ -221,7 +221,7 @@ int ObMergeLogPlan::create_merge_plans(ObIArray<CandidatePlan> &candi_plans,
} else if (is_multi_part_dml && force_no_multi_part) {
/*do nothing*/
} else if (candi_plan.plan_tree_->is_sharding()
&& (is_multi_part_dml || insert_sharding->is_local())
&& (is_multi_part_dml || (insert_sharding != NULL && insert_sharding->is_local()))
&& OB_FAIL(allocate_exchange_as_top(candi_plan.plan_tree_, exch_info))) {
LOG_WARN("failed to allocate exchange as top", K(ret));
} else if (OB_FAIL(allocate_merge_as_top(candi_plan.plan_tree_, insert_table_part,

View File

@ -36,7 +36,12 @@ int ObTransformOrExpansion::transform_one_stmt(ObIArray<ObParentDMLStmt> &parent
{
int ret = OB_SUCCESS;
trans_happened = false;
if (OB_FAIL(transform_in_joined_table(parent_stmts, stmt, trans_happened))) {
bool is_stmt_valid = false;
if (OB_FAIL(check_stmt_valid_for_expansion(stmt, is_stmt_valid))) {
LOG_WARN("failed to check stmt valid for expansion", K(ret));
} else if (!is_stmt_valid) {
/* do nothing */
} else if (OB_FAIL(transform_in_joined_table(parent_stmts, stmt, trans_happened))) {
LOG_WARN("failed to do or expansion in joined condition", K(ret));
} else if (trans_happened) {
/* do nothing */
@ -3340,5 +3345,25 @@ int ObTransformOrExpansion::check_left_bottom_table(ObSelectStmt &stmt,
return ret;
}
int ObTransformOrExpansion::check_stmt_valid_for_expansion(ObDMLStmt *stmt, bool &is_stmt_valid)
{
int ret = OB_SUCCESS;
is_stmt_valid = true;
if (OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected param", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && is_stmt_valid && i < stmt->get_table_size(); i++) {
TableItem *table_item = stmt->get_table_items().at(i);
if (OB_ISNULL(table_item)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret));
} else if (table_item->is_fake_cte_table()) {
is_stmt_valid = false;
}
}
return ret;
}
} /* namespace sql */
} /* namespace oceanbase */

View File

@ -391,6 +391,7 @@ private:
TableItem *rel_table,
TableItem *table,
bool &left_bottom);
int check_stmt_valid_for_expansion(ObDMLStmt *stmt, bool &is_stmt_valid);
DISALLOW_COPY_AND_ASSIGN(ObTransformOrExpansion);
private:
int64_t try_times_;