persist execution id and protect ddl start using execution id
This commit is contained in:
		
				
					committed by
					
						
						wangzelin.wzl
					
				
			
			
				
	
			
			
			
						parent
						
							798d3f50d6
						
					
				
				
					commit
					6fc4bc4a26
				
			@ -259,6 +259,7 @@ int ObDDLTask::convert_to_record(
 | 
			
		||||
  task_record.task_id_ = get_task_id();
 | 
			
		||||
  task_record.parent_task_id_ = get_parent_task_id();
 | 
			
		||||
  task_record.task_version_ = get_task_version();
 | 
			
		||||
  task_record.execution_id_ = get_execution_id();
 | 
			
		||||
  task_record.ret_code_ = get_ret_code();
 | 
			
		||||
  const ObString &ddl_stmt_str = get_ddl_stmt_str();
 | 
			
		||||
  if (serialize_param_size > 0) {
 | 
			
		||||
@ -313,7 +314,8 @@ int ObDDLTask::switch_status(ObDDLTaskStatus new_status, const int ret_code)
 | 
			
		||||
    LOG_WARN("start transaction failed", K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    int64_t table_task_status = 0;
 | 
			
		||||
    if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status))) {
 | 
			
		||||
    int64_t execution_id = 0;
 | 
			
		||||
    if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, execution_id))) {
 | 
			
		||||
      LOG_WARN("select for update failed", K(ret), K(task_id_));
 | 
			
		||||
    } else if (old_status != task_status_) {
 | 
			
		||||
      ret = OB_EAGAIN;
 | 
			
		||||
@ -551,6 +553,60 @@ int ObDDLTask::batch_release_snapshot(
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDDLTask::check_is_latest_execution_id(const int64_t execution_id, bool &is_latest)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  is_latest = true;
 | 
			
		||||
  ObMySQLTransaction trans;
 | 
			
		||||
  ObRootService *root_service = nullptr;
 | 
			
		||||
  int64_t table_task_status = 0;
 | 
			
		||||
  int64_t table_execution_id = 0;
 | 
			
		||||
  if (OB_ISNULL(root_service = GCTX.root_service_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("error unexpected, root service must not be nullptr", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id_))) {
 | 
			
		||||
    LOG_WARN("start transaction failed", K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, table_execution_id))) {
 | 
			
		||||
      LOG_WARN("select for update failed", K(ret), K(task_id_));
 | 
			
		||||
    } else if (table_execution_id > execution_id) {
 | 
			
		||||
      is_latest = false;
 | 
			
		||||
    }
 | 
			
		||||
    trans.end(false);// abort
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDDLTask::push_execution_id()
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObMySQLTransaction trans;
 | 
			
		||||
  ObRootService *root_service = nullptr;
 | 
			
		||||
  int64_t table_task_status = 0;
 | 
			
		||||
  int64_t table_execution_id = 0;
 | 
			
		||||
  if (OB_ISNULL(root_service = GCTX.root_service_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("error unexpected, root service must not be nullptr", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), tenant_id_))) {
 | 
			
		||||
    LOG_WARN("start transaction failed", K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, table_execution_id))) {
 | 
			
		||||
      LOG_WARN("select for update failed", K(ret), K(task_id_));
 | 
			
		||||
    } else if (OB_FAIL(ObDDLTaskRecordOperator::update_execution_id(trans, tenant_id_, task_id_, table_execution_id + 1))) {
 | 
			
		||||
      LOG_WARN("update task status failed", K(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      execution_id_ = table_execution_id + 1;
 | 
			
		||||
    }
 | 
			
		||||
    bool commit = (OB_SUCCESS == ret);
 | 
			
		||||
    int tmp_ret = trans.end(commit);
 | 
			
		||||
    if (OB_SUCCESS != tmp_ret) {
 | 
			
		||||
      LOG_WARN("fail to end trans", K(tmp_ret));
 | 
			
		||||
      ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void ObDDLTask::calc_next_schedule_ts(int ret_code)
 | 
			
		||||
{
 | 
			
		||||
  if (OB_TIMEOUT == ret_code) {
 | 
			
		||||
@ -1382,7 +1438,8 @@ bool ObDDLTaskRecord::is_valid() const
 | 
			
		||||
    && task_version_ > 0
 | 
			
		||||
    && OB_INVALID_ID != object_id_
 | 
			
		||||
    && schema_version_ > 0
 | 
			
		||||
    && ret_code_ >= 0;
 | 
			
		||||
    && ret_code_ >= 0
 | 
			
		||||
    && execution_id_ >= 0;
 | 
			
		||||
  return is_valid;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1401,6 +1458,7 @@ void ObDDLTaskRecord::reset()
 | 
			
		||||
  message_.reset();
 | 
			
		||||
  task_version_ = 0;
 | 
			
		||||
  ret_code_ = OB_SUCCESS;
 | 
			
		||||
  execution_id_ = 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1477,6 +1535,30 @@ int ObDDLTaskRecordOperator::update_ret_code(
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDDLTaskRecordOperator::update_execution_id(
 | 
			
		||||
    common::ObISQLClient &sql_client,
 | 
			
		||||
    const uint64_t tenant_id,
 | 
			
		||||
    const int64_t task_id,
 | 
			
		||||
    const int64_t execution_id)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObSqlString sql_string;
 | 
			
		||||
  int64_t affected_rows = 0;
 | 
			
		||||
  if (OB_ISNULL(sql_client.get_pool()) || OB_UNLIKELY(task_id <= 0 || tenant_id <= 0 || execution_id <= 0)) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid arg", K(ret), K(tenant_id), K(task_id));
 | 
			
		||||
  } else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET execution_id=%lu WHERE task_id=%lu ",
 | 
			
		||||
          OB_ALL_DDL_TASK_STATUS_TNAME, execution_id, task_id))) {
 | 
			
		||||
    LOG_WARN("assign sql string failed", K(ret), K(execution_id), K(task_id));
 | 
			
		||||
  } else if (OB_FAIL(sql_client.write(tenant_id, sql_string.ptr(), affected_rows))) {
 | 
			
		||||
    LOG_WARN("update snapshot_version of ddl task record failed", K(ret), K(sql_string));
 | 
			
		||||
  } else if (OB_UNLIKELY(affected_rows < 0)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDDLTaskRecordOperator::update_message(
 | 
			
		||||
    common::ObISQLClient &proxy,
 | 
			
		||||
    const uint64_t tenant_id,
 | 
			
		||||
@ -1541,7 +1623,7 @@ int ObDDLTaskRecordOperator::check_is_adding_constraint(
 | 
			
		||||
    SMART_VAR(ObMySQLProxy::MySQLResult, res) {
 | 
			
		||||
      sqlclient::ObMySQLResult *result = NULL;
 | 
			
		||||
      if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, "
 | 
			
		||||
          "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version,"
 | 
			
		||||
          "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
 | 
			
		||||
          "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s "
 | 
			
		||||
          "WHERE object_id = %" PRIu64 " && ddl_type IN (%d, %d, %d)", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME,
 | 
			
		||||
          object_id, DDL_CHECK_CONSTRAINT, DDL_FOREIGN_KEY_CONSTRAINT, DDL_ADD_NOT_NULL_COLUMN))) {
 | 
			
		||||
@ -1633,7 +1715,7 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl(
 | 
			
		||||
    SMART_VAR(ObMySQLProxy::MySQLResult, res) {
 | 
			
		||||
      sqlclient::ObMySQLResult *result = nullptr;
 | 
			
		||||
      if (OB_FAIL(sql_string.assign_fmt("SELECT tenant_id, task_id, object_id, target_object_id, ddl_type,"
 | 
			
		||||
          "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version,"
 | 
			
		||||
          "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id,"
 | 
			
		||||
          "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s "
 | 
			
		||||
          "WHERE tenant_id = %lu AND object_id = %lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME,
 | 
			
		||||
          tenant_id, table_id))) {
 | 
			
		||||
@ -1689,7 +1771,7 @@ int ObDDLTaskRecordOperator::get_all_record(
 | 
			
		||||
    SMART_VAR(ObMySQLProxy::MySQLResult, res) {
 | 
			
		||||
      sqlclient::ObMySQLResult *result = NULL;
 | 
			
		||||
      if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, "
 | 
			
		||||
          "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version,"
 | 
			
		||||
          "schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
 | 
			
		||||
          "UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s ", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME))) {
 | 
			
		||||
        LOG_WARN("assign sql string failed", K(ret));
 | 
			
		||||
      } else if (OB_FAIL(proxy.read(res, sql_string.ptr()))) {
 | 
			
		||||
@ -1758,11 +1840,11 @@ int ObDDLTaskRecordOperator::insert_record(
 | 
			
		||||
    } else if (OB_FAIL(to_hex_str(record.message_, message_string))) {
 | 
			
		||||
      LOG_WARN("append hex escaped string failed", K(ret));
 | 
			
		||||
    } else if (OB_FAIL(sql_string.assign_fmt(
 | 
			
		||||
            " INSERT INTO %s (task_id, parent_task_id, tenant_id, object_id, schema_version, target_object_id, ddl_type, trace_id, status, task_version, ret_code, ddl_stmt_str, message) "
 | 
			
		||||
            " VALUES (%lu, %lu, %lu, %lu, %lu, %lu, %d, '%s', %ld, %lu, %lu, '%.*s', \"%.*s\") ",
 | 
			
		||||
            " INSERT INTO %s (task_id, parent_task_id, tenant_id, object_id, schema_version, target_object_id, ddl_type, trace_id, status, task_version, execution_id, ret_code, ddl_stmt_str, message) "
 | 
			
		||||
            " VALUES (%lu, %lu, %lu, %lu, %lu, %lu, %d, '%s', %ld, %lu, %lu, %lu, '%.*s', \"%.*s\") ",
 | 
			
		||||
            OB_ALL_DDL_TASK_STATUS_TNAME, record.task_id_, record.parent_task_id_,
 | 
			
		||||
            ObSchemaUtils::get_extract_tenant_id(record.tenant_id_, record.tenant_id_), record.object_id_, record.schema_version_,
 | 
			
		||||
            get_record_id(record.ddl_type_, record.target_object_id_), record.ddl_type_, trace_id_str, record.task_status_, record.task_version_, record.ret_code_,
 | 
			
		||||
            get_record_id(record.ddl_type_, record.target_object_id_), record.ddl_type_, trace_id_str, record.task_status_, record.task_version_, record.execution_id_, record.ret_code_,
 | 
			
		||||
            static_cast<int>(ddl_stmt_string.length()), ddl_stmt_string.ptr(), static_cast<int>(message_string.length()), message_string.ptr()))) {
 | 
			
		||||
      LOG_WARN("assign sql string failed", K(ret), K(record));
 | 
			
		||||
    } else if (OB_FAIL(proxy.write(record.tenant_id_, sql_string.ptr(), affected_rows))) {
 | 
			
		||||
@ -1803,6 +1885,7 @@ int ObDDLTaskRecordOperator::fill_task_record(
 | 
			
		||||
    EXTRACT_UINT_FIELD_MYSQL(*result_row, "snapshot_version", task_record.snapshot_version_, uint64_t);
 | 
			
		||||
    EXTRACT_INT_FIELD_MYSQL(*result_row, "task_version", task_record.task_version_, int64_t);
 | 
			
		||||
    EXTRACT_INT_FIELD_MYSQL(*result_row, "ret_code", task_record.ret_code_, int64_t);
 | 
			
		||||
    EXTRACT_INT_FIELD_MYSQL(*result_row, "execution_id", task_record.execution_id_, int64_t);
 | 
			
		||||
    EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "message_unhex", task_message);
 | 
			
		||||
    EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "ddl_stmt_str_unhex", ddl_stmt_str);
 | 
			
		||||
    if (OB_SUCC(ret)) {
 | 
			
		||||
@ -1849,18 +1932,20 @@ int ObDDLTaskRecordOperator::select_for_update(
 | 
			
		||||
    common::ObMySQLTransaction &trans,
 | 
			
		||||
    const uint64_t tenant_id,
 | 
			
		||||
    const int64_t task_id,
 | 
			
		||||
    int64_t &task_status)
 | 
			
		||||
    int64_t &task_status,
 | 
			
		||||
    int64_t &execution_id)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObSqlString sql_string;
 | 
			
		||||
  task_status = 0;
 | 
			
		||||
  execution_id = 0;
 | 
			
		||||
  if (OB_UNLIKELY(task_id <= 0 || tenant_id <= 0)) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("not init", K(ret), K(tenant_id), K(task_id));
 | 
			
		||||
  } else {
 | 
			
		||||
    SMART_VAR(ObMySQLProxy::MySQLResult, res) {
 | 
			
		||||
      sqlclient::ObMySQLResult *result = NULL;
 | 
			
		||||
      if (OB_FAIL(sql_string.assign_fmt("SELECT status FROM %s WHERE task_id = %lu FOR UPDATE",
 | 
			
		||||
      if (OB_FAIL(sql_string.assign_fmt("SELECT status, execution_id FROM %s WHERE task_id = %lu FOR UPDATE",
 | 
			
		||||
          OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) {
 | 
			
		||||
        LOG_WARN("assign sql string failed", K(ret), K(task_id), K(tenant_id));
 | 
			
		||||
      } else if (OB_FAIL(trans.read(res, tenant_id, sql_string.ptr()))) {
 | 
			
		||||
@ -1872,6 +1957,7 @@ int ObDDLTaskRecordOperator::select_for_update(
 | 
			
		||||
        LOG_WARN("fail to get next row", K(ret));
 | 
			
		||||
      } else {
 | 
			
		||||
        EXTRACT_INT_FIELD_MYSQL(*result, "status", task_status, int64_t);
 | 
			
		||||
        EXTRACT_INT_FIELD_MYSQL(*result, "execution_id", execution_id, int64_t);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user