Fix bugs in DDL reaping of old replica build tasks
This commit is contained in:
		@ -2238,7 +2238,7 @@ int ObSyncTabletAutoincSeqCtx::call_and_process_all_tablet_autoinc_seqs(P &proxy
 | 
				
			|||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObDDLRedefinitionTask::try_reap_old_replica_build_task()
 | 
					int ObDDLRedefinitionTask::reap_old_replica_build_task(bool &need_exec_new_inner_sql)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  ObSchemaGetterGuard schema_guard;
 | 
					  ObSchemaGetterGuard schema_guard;
 | 
				
			||||||
@ -2261,17 +2261,19 @@ int ObDDLRedefinitionTask::try_reap_old_replica_build_task()
 | 
				
			|||||||
    const ObTabletID unused_tablet_id;
 | 
					    const ObTabletID unused_tablet_id;
 | 
				
			||||||
    const ObDDLTaskInfo unused_addition_info;
 | 
					    const ObDDLTaskInfo unused_addition_info;
 | 
				
			||||||
    const int old_ret_code = OB_SUCCESS;
 | 
					    const int old_ret_code = OB_SUCCESS;
 | 
				
			||||||
    bool need_exec_new_inner_sql = true;
 | 
					 | 
				
			||||||
    ObAddr invalid_addr;
 | 
					    ObAddr invalid_addr;
 | 
				
			||||||
    (void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id,
 | 
					    if (old_execution_id < 0) {
 | 
				
			||||||
 | 
					      need_exec_new_inner_sql = true;
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id,
 | 
				
			||||||
        task_id_, old_execution_id, invalid_addr, trace_id_,
 | 
					        task_id_, old_execution_id, invalid_addr, trace_id_,
 | 
				
			||||||
        table_schema->get_schema_version(), snapshot_version_, need_exec_new_inner_sql);
 | 
					        table_schema->get_schema_version(), snapshot_version_, need_exec_new_inner_sql))) {
 | 
				
			||||||
    if (!need_exec_new_inner_sql) {
 | 
					      if (OB_EAGAIN != ret) {
 | 
				
			||||||
      if (OB_FAIL(update_complete_sstable_job_status(unused_tablet_id, snapshot_version_, old_execution_id, old_ret_code, unused_addition_info))) {
 | 
					        LOG_WARN("failed to check and wait old complement task", K(ret));
 | 
				
			||||||
        LOG_INFO("succ to wait and complete old task finished!", K(ret));
 | 
					      }
 | 
				
			||||||
 | 
					    } else if (!need_exec_new_inner_sql) {
 | 
				
			||||||
 | 
					      if (OB_FAIL(update_complete_sstable_job_status(unused_tablet_id, snapshot_version_, old_execution_id, old_ret_code, unused_addition_info))) {
 | 
				
			||||||
 | 
					        LOG_WARN("failed to wait and complete old task finished!", K(ret));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      ret = OB_ENTRY_NOT_EXIST;
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
 | 
				
			|||||||
@ -126,7 +126,7 @@ public:
 | 
				
			|||||||
  virtual void flt_set_task_span_tag() const = 0;
 | 
					  virtual void flt_set_task_span_tag() const = 0;
 | 
				
			||||||
  virtual void flt_set_status_span_tag() const = 0;
 | 
					  virtual void flt_set_status_span_tag() const = 0;
 | 
				
			||||||
  virtual int cleanup_impl() override;
 | 
					  virtual int cleanup_impl() override;
 | 
				
			||||||
  int try_reap_old_replica_build_task();
 | 
					  int reap_old_replica_build_task(bool &need_exec_new_inner_sql);
 | 
				
			||||||
  INHERIT_TO_STRING_KV("ObDDLTask", ObDDLTask,
 | 
					  INHERIT_TO_STRING_KV("ObDDLTask", ObDDLTask,
 | 
				
			||||||
      K(wait_trans_ctx_), K(sync_tablet_autoinc_seq_ctx_), K(build_replica_request_time_),
 | 
					      K(wait_trans_ctx_), K(sync_tablet_autoinc_seq_ctx_), K(build_replica_request_time_),
 | 
				
			||||||
      K(complete_sstable_job_ret_code_), K(snapshot_held_), K(has_synced_autoincrement_),
 | 
					      K(complete_sstable_job_ret_code_), K(snapshot_held_), K(has_synced_autoincrement_),
 | 
				
			||||||
 | 
				
			|||||||
@ -720,7 +720,7 @@ int ObIndexBuildTask::release_snapshot(const int64_t snapshot)
 | 
				
			|||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObIndexBuildTask::try_reap_old_replica_build_task()
 | 
					int ObIndexBuildTask::reap_old_replica_build_task(bool &need_exec_new_inner_sql)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  ObSchemaGetterGuard schema_guard;
 | 
					  ObSchemaGetterGuard schema_guard;
 | 
				
			||||||
@ -743,17 +743,19 @@ int ObIndexBuildTask::try_reap_old_replica_build_task()
 | 
				
			|||||||
    const ObTabletID unused_tablet_id;
 | 
					    const ObTabletID unused_tablet_id;
 | 
				
			||||||
    const ObDDLTaskInfo unused_addition_info;
 | 
					    const ObDDLTaskInfo unused_addition_info;
 | 
				
			||||||
    const int old_ret_code = OB_SUCCESS;
 | 
					    const int old_ret_code = OB_SUCCESS;
 | 
				
			||||||
    bool need_exec_new_inner_sql = true;
 | 
					 | 
				
			||||||
    ObAddr invalid_addr;
 | 
					    ObAddr invalid_addr;
 | 
				
			||||||
    (void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id,
 | 
					    if (old_execution_id < 0) {
 | 
				
			||||||
 | 
					      need_exec_new_inner_sql = true;
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id,
 | 
				
			||||||
        task_id_, old_execution_id, invalid_addr, trace_id_,
 | 
					        task_id_, old_execution_id, invalid_addr, trace_id_,
 | 
				
			||||||
        table_schema->get_schema_version(), snapshot_version_, need_exec_new_inner_sql);
 | 
					        table_schema->get_schema_version(), snapshot_version_, need_exec_new_inner_sql))) {
 | 
				
			||||||
    if (!need_exec_new_inner_sql) {
 | 
					      if (OB_EAGAIN != ret) {
 | 
				
			||||||
 | 
					        LOG_WARN("failed to check and wait old complement task", K(ret));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } else if (!need_exec_new_inner_sql) {
 | 
				
			||||||
      if (OB_FAIL(update_complete_sstable_job_status(unused_tablet_id, snapshot_version_, old_execution_id, old_ret_code, unused_addition_info))) {
 | 
					      if (OB_FAIL(update_complete_sstable_job_status(unused_tablet_id, snapshot_version_, old_execution_id, old_ret_code, unused_addition_info))) {
 | 
				
			||||||
        LOG_INFO("succ to wait and complete old task finished!", K(ret));
 | 
					        LOG_INFO("succ to wait and complete old task finished!", K(ret));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      ret = OB_ENTRY_NOT_EXIST;
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
@ -855,7 +857,14 @@ int ObIndexBuildTask::wait_data_complement()
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  // submit a job to complete sstable for the index table on snapshot_version
 | 
					  // submit a job to complete sstable for the index table on snapshot_version
 | 
				
			||||||
  if (OB_SUCC(ret) && !state_finished && !is_sstable_complete_task_submitted_) {
 | 
					  if (OB_SUCC(ret) && !state_finished && !is_sstable_complete_task_submitted_) {
 | 
				
			||||||
    if (OB_SUCCESS == try_reap_old_replica_build_task()) {
 | 
					    bool need_exec_new_inner_sql = false;
 | 
				
			||||||
 | 
					    if (OB_FAIL(reap_old_replica_build_task(need_exec_new_inner_sql))) {
 | 
				
			||||||
 | 
					      if (OB_EAGAIN == ret) {
 | 
				
			||||||
 | 
					        ret = OB_SUCCESS; // retry
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        LOG_WARN("failed to reap old task", K(ret));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } else if (!need_exec_new_inner_sql) {
 | 
				
			||||||
      state_finished = true;
 | 
					      state_finished = true;
 | 
				
			||||||
    } else if (OB_FAIL(send_build_single_replica_request())) {
 | 
					    } else if (OB_FAIL(send_build_single_replica_request())) {
 | 
				
			||||||
      LOG_WARN("fail to send build single replica request", K(ret));
 | 
					      LOG_WARN("fail to send build single replica request", K(ret));
 | 
				
			||||||
 | 
				
			|||||||
@ -127,7 +127,7 @@ private:
 | 
				
			|||||||
      const share::schema::ObTableSchema &index_schema,
 | 
					      const share::schema::ObTableSchema &index_schema,
 | 
				
			||||||
      const share::schema::ObIndexStatus new_status);
 | 
					      const share::schema::ObIndexStatus new_status);
 | 
				
			||||||
  int check_health();
 | 
					  int check_health();
 | 
				
			||||||
  int try_reap_old_replica_build_task();
 | 
					  int reap_old_replica_build_task(bool &need_exec_new_inner_sql);
 | 
				
			||||||
  int send_build_single_replica_request();
 | 
					  int send_build_single_replica_request();
 | 
				
			||||||
  int check_build_single_replica(bool &is_end);
 | 
					  int check_build_single_replica(bool &is_end);
 | 
				
			||||||
  int check_need_verify_checksum(bool &need_verify);
 | 
					  int check_need_verify_checksum(bool &need_verify);
 | 
				
			||||||
 | 
				
			|||||||
@ -323,7 +323,14 @@ int ObTableRedefinitionTask::table_redefinition(const ObDDLTaskStatus next_task_
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (OB_SUCC(ret) && !is_build_replica_end && 0 == build_replica_request_time_) {
 | 
					  if (OB_SUCC(ret) && !is_build_replica_end && 0 == build_replica_request_time_) {
 | 
				
			||||||
    if (OB_SUCCESS == try_reap_old_replica_build_task()) {
 | 
					    bool need_exec_new_inner_sql = false;
 | 
				
			||||||
 | 
					    if (OB_FAIL(reap_old_replica_build_task(need_exec_new_inner_sql))) {
 | 
				
			||||||
 | 
					      if (OB_EAGAIN == ret) {
 | 
				
			||||||
 | 
					        ret = OB_SUCCESS; // retry
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        LOG_WARN("failed to reap old task", K(ret));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } else if (!need_exec_new_inner_sql) {
 | 
				
			||||||
      is_build_replica_end = true;
 | 
					      is_build_replica_end = true;
 | 
				
			||||||
    } else if (OB_FAIL(send_build_replica_request())) {
 | 
					    } else if (OB_FAIL(send_build_replica_request())) {
 | 
				
			||||||
      LOG_WARN("fail to send build replica request", K(ret));
 | 
					      LOG_WARN("fail to send build replica request", K(ret));
 | 
				
			||||||
 | 
				
			|||||||
@ -1178,7 +1178,7 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
 | 
				
			|||||||
  if (OB_ISNULL(root_service = GCTX.root_service_)) {
 | 
					  if (OB_ISNULL(root_service = GCTX.root_service_)) {
 | 
				
			||||||
    ret = OB_ERR_SYS;
 | 
					    ret = OB_ERR_SYS;
 | 
				
			||||||
    LOG_WARN("fail to get sql proxy, root service is null.!");
 | 
					    LOG_WARN("fail to get sql proxy, root service is null.!");
 | 
				
			||||||
  } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || trace_id.is_invalid() || !inner_sql_exec_addr.is_valid())) {
 | 
					  } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || trace_id.is_invalid())) {
 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
    LOG_WARN("invalid argument", K(ret), K(tenant_id), K(trace_id), K(inner_sql_exec_addr));
 | 
					    LOG_WARN("invalid argument", K(ret), K(tenant_id), K(trace_id), K(inner_sql_exec_addr));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
@ -1240,8 +1240,6 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
 | 
				
			|||||||
        while (OB_SUCC(ret)) {
 | 
					        while (OB_SUCC(ret)) {
 | 
				
			||||||
          if (OB_FAIL(result->next())) {
 | 
					          if (OB_FAIL(result->next())) {
 | 
				
			||||||
            if (OB_ITER_END == ret) {
 | 
					            if (OB_ITER_END == ret) {
 | 
				
			||||||
              LOG_INFO("success to get result, and no inner sql task", K(ret), K(sql_string.ptr()),
 | 
					 | 
				
			||||||
                K(ip_str), K(trace_id_str), K(tenant_id), K(sql_string));
 | 
					 | 
				
			||||||
              ret = OB_SUCCESS;
 | 
					              ret = OB_SUCCESS;
 | 
				
			||||||
              break;
 | 
					              break;
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
@ -1250,8 +1248,6 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
 | 
				
			|||||||
          } else {
 | 
					          } else {
 | 
				
			||||||
            is_old_task_session_exist =  true;
 | 
					            is_old_task_session_exist =  true;
 | 
				
			||||||
            EXTRACT_UINT_FIELD_MYSQL(*result, "session_id", session_id, uint64_t);
 | 
					            EXTRACT_UINT_FIELD_MYSQL(*result, "session_id", session_id, uint64_t);
 | 
				
			||||||
            LOG_INFO("succ to match inner sql session in trace id", K(ret), K(sql_string.ptr()),
 | 
					 | 
				
			||||||
              K(session_id), K(tenant_id), K(ip_str), K(trace_id_str), K(sql_string));
 | 
					 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
@ -1659,8 +1655,8 @@ int ObCheckTabletDataComplementOp::check_tablet_checksum_update_status(
 | 
				
			|||||||
      if (report_checksum_cnt == tablet_count) {
 | 
					      if (report_checksum_cnt == tablet_count) {
 | 
				
			||||||
        is_checksums_all_report = true;
 | 
					        is_checksums_all_report = true;
 | 
				
			||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
        ret = OB_EAGAIN;
 | 
					        is_checksums_all_report = false;
 | 
				
			||||||
        LOG_INFO("not all tablet has update checksum, will re-check",
 | 
					        LOG_INFO("not all tablet has update checksum",
 | 
				
			||||||
          K(ret), K(tablet_idx), K(tablet_count), K(is_checksums_all_report));
 | 
					          K(ret), K(tablet_idx), K(tablet_count), K(is_checksums_all_report));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -1728,7 +1724,7 @@ int ObCheckTabletDataComplementOp::check_finish_report_checksum(
 | 
				
			|||||||
  } else if (OB_FAIL(check_tablet_checksum_update_status(tenant_id, index_table_id, ddl_task_id, execution_id, dest_tablet_ids, is_checksums_all_report))) {
 | 
					  } else if (OB_FAIL(check_tablet_checksum_update_status(tenant_id, index_table_id, ddl_task_id, execution_id, dest_tablet_ids, is_checksums_all_report))) {
 | 
				
			||||||
    LOG_WARN("fail to check tablet checksum update status, maybe EAGAIN", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
 | 
					    LOG_WARN("fail to check tablet checksum update status, maybe EAGAIN", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
 | 
				
			||||||
  } else if (!is_checksums_all_report) {
 | 
					  } else if (!is_checksums_all_report) {
 | 
				
			||||||
    ret = OB_ERR_UNEXPECTED;
 | 
					    ret = OB_EAGAIN;
 | 
				
			||||||
    LOG_WARN("tablets checksum not all report!", K(is_checksums_all_report), K(ret));
 | 
					    LOG_WARN("tablets checksum not all report!", K(is_checksums_all_report), K(ret));
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
@ -1761,15 +1757,12 @@ int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
 | 
				
			|||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
    LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(table_id));
 | 
					    LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(table_id));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    LOG_INFO("start to check and wait complement task", K(tenant_id), K(table_id), K(inner_sql_exec_addr), K(trace_id));
 | 
					 | 
				
			||||||
    while (OB_SUCC(ret) && is_old_task_session_exist) {
 | 
					 | 
				
			||||||
    if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, execution_id, scn, is_old_task_session_exist))) {
 | 
					    if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, execution_id, scn, is_old_task_session_exist))) {
 | 
				
			||||||
      LOG_WARN("fail check task inner sql session status", K(ret), K(trace_id), K(inner_sql_exec_addr));
 | 
					      LOG_WARN("fail check task inner sql session status", K(ret), K(trace_id), K(inner_sql_exec_addr));
 | 
				
			||||||
      } else if (!is_old_task_session_exist) {
 | 
					    } else if (is_old_task_session_exist) {
 | 
				
			||||||
        LOG_WARN("old inner sql session is not exist.", K(ret));
 | 
					      ret = OB_EAGAIN;
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
        usleep(10 * 1000); // sleep 10ms
 | 
					      LOG_INFO("old inner sql session is not exist.", K(ret));
 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // After old session exits, the rule of retry is specified as follows
 | 
					    // After old session exits, the rule of retry is specified as follows
 | 
				
			||||||
@ -1787,6 +1780,8 @@ int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    ObArray<ObTabletID> dest_tablet_ids;
 | 
					    ObArray<ObTabletID> dest_tablet_ids;
 | 
				
			||||||
    if (OB_FAIL(ret)) {
 | 
					    if (OB_FAIL(ret)) {
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id, table_id, dest_tablet_ids))) {
 | 
				
			||||||
 | 
					      LOG_WARN("fail to get tablets", K(ret), K(tenant_id), K(table_id));
 | 
				
			||||||
    } else if (OB_FAIL(check_tablet_checksum_update_status(tenant_id, table_id, ddl_task_id, execution_id, dest_tablet_ids, is_dst_checksums_all_report))) {
 | 
					    } else if (OB_FAIL(check_tablet_checksum_update_status(tenant_id, table_id, ddl_task_id, execution_id, dest_tablet_ids, is_dst_checksums_all_report))) {
 | 
				
			||||||
      LOG_WARN("fail to check tablet checksum update status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
 | 
					      LOG_WARN("fail to check tablet checksum update status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(execution_id));
 | 
				
			||||||
    } else if (is_dst_checksums_all_report) {
 | 
					    } else if (is_dst_checksums_all_report) {
 | 
				
			||||||
@ -1794,8 +1789,9 @@ int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
 | 
				
			|||||||
      LOG_INFO("no need execute because all tablet sstable has build finished", K(need_exec_new_inner_sql));
 | 
					      LOG_INFO("no need execute because all tablet sstable has build finished", K(need_exec_new_inner_sql));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					  if (OB_EAGAIN != ret) {
 | 
				
			||||||
    LOG_INFO("end to check and wait complement task", K(ret),
 | 
					    LOG_INFO("end to check and wait complement task", K(ret),
 | 
				
			||||||
      K(table_id), K(is_old_task_session_exist), K(is_dst_checksums_all_report), K(need_exec_new_inner_sql));
 | 
					      K(table_id), K(is_old_task_session_exist), K(is_dst_checksums_all_report), K(need_exec_new_inner_sql));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user