revert create table wait leader
This commit is contained in:
		@ -10834,7 +10834,6 @@ int ObDDLService::construct_create_partition_creator(const common::ObPartitionKe
 | 
				
			|||||||
          // there is no persistent member list, standby_restore will have problems
 | 
					          // there is no persistent member list, standby_restore will have problems
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
          int64_t restore = REPLICA_NOT_RESTORE;
 | 
					          int64_t restore = REPLICA_NOT_RESTORE;
 | 
				
			||||||
          common::ObRole role = FOLLOWER;
 | 
					 | 
				
			||||||
          if (OB_CREATE_TABLE_MODE_RESTORE == create_mode &&
 | 
					          if (OB_CREATE_TABLE_MODE_RESTORE == create_mode &&
 | 
				
			||||||
              ObReplicaTypeCheck::is_replica_with_ssstore(a->replica_type_)) {
 | 
					              ObReplicaTypeCheck::is_replica_with_ssstore(a->replica_type_)) {
 | 
				
			||||||
            // logical restore
 | 
					            // logical restore
 | 
				
			||||||
@ -10849,9 +10848,7 @@ int ObDDLService::construct_create_partition_creator(const common::ObPartitionKe
 | 
				
			|||||||
          } else {
 | 
					          } else {
 | 
				
			||||||
            restore = REPLICA_NOT_RESTORE;
 | 
					            restore = REPLICA_NOT_RESTORE;
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
          if (OB_FAIL(set_flag_role(a->initial_leader_, is_standby, restore, table_id, role))) {
 | 
					          if (OB_FAIL(fill_create_partition_arg(table_id,
 | 
				
			||||||
            LOG_WARN("fail to set flag role", KR(ret), K(is_standby), K(restore), K(table_id), K(role));
 | 
					 | 
				
			||||||
          } else if (OB_FAIL(fill_create_partition_arg(table_id,
 | 
					 | 
				
			||||||
                  partition_cnt,
 | 
					                  partition_cnt,
 | 
				
			||||||
                  paxos_replica_num,
 | 
					                  paxos_replica_num,
 | 
				
			||||||
                  non_paxos_replica_num,
 | 
					                  non_paxos_replica_num,
 | 
				
			||||||
@ -10874,7 +10871,7 @@ int ObDDLService::construct_create_partition_creator(const common::ObPartitionKe
 | 
				
			|||||||
                *a,
 | 
					                *a,
 | 
				
			||||||
                "timestamp",
 | 
					                "timestamp",
 | 
				
			||||||
                now);
 | 
					                now);
 | 
				
			||||||
          } else if (OB_FAIL(fill_flag_replica(table_id, partition_cnt, partition_id, arg, *a, role, flag_replica))) {
 | 
					          } else if (OB_FAIL(fill_flag_replica(table_id, partition_cnt, partition_id, arg, *a, flag_replica))) {
 | 
				
			||||||
            LOG_WARN("fail to fill flag replica",
 | 
					            LOG_WARN("fail to fill flag replica",
 | 
				
			||||||
                K(ret),
 | 
					                K(ret),
 | 
				
			||||||
                K(table_id),
 | 
					                K(table_id),
 | 
				
			||||||
@ -11030,27 +11027,8 @@ int ObDDLService::fill_create_partition_arg(const uint64_t table_id, const int64
 | 
				
			|||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObDDLService::set_flag_role(const bool initial_leader, const bool is_standby, const int64_t restore,
 | 
					 | 
				
			||||||
    const uint64_t table_id, common::ObRole& role)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					 | 
				
			||||||
  if (OB_FAIL(check_inner_stat())) {
 | 
					 | 
				
			||||||
    LOG_WARN("variable is not init", KR(ret));
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    if (initial_leader && !is_standby && REPLICA_NOT_RESTORE == restore && !is_sys_table(table_id)) {
 | 
					 | 
				
			||||||
      // When setting the flag role, only the non-restore leader of the primary cluster is considered,
 | 
					 | 
				
			||||||
      // and the rest are reported normally
 | 
					 | 
				
			||||||
      role = LEADER;
 | 
					 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      role = FOLLOWER;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
  return ret;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObDDLService::fill_flag_replica(const uint64_t table_id, const int64_t partition_cnt, const int64_t partition_id,
 | 
					int ObDDLService::fill_flag_replica(const uint64_t table_id, const int64_t partition_cnt, const int64_t partition_id,
 | 
				
			||||||
    const ObCreatePartitionArg& arg, const ObReplicaAddr& replica_addr, const common::ObRole role,
 | 
					    const ObCreatePartitionArg& arg, const ObReplicaAddr& replica_addr, share::ObPartitionReplica& flag_replica)
 | 
				
			||||||
    share::ObPartitionReplica& flag_replica)
 | 
					 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  if (OB_FAIL(check_inner_stat())) {
 | 
					  if (OB_FAIL(check_inner_stat())) {
 | 
				
			||||||
@ -11088,7 +11066,6 @@ int ObDDLService::fill_flag_replica(const uint64_t table_id, const int64_t parti
 | 
				
			|||||||
          replica_addr.replica_type_);
 | 
					          replica_addr.replica_type_);
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      flag_replica.is_restore_ = arg.restore_;
 | 
					      flag_replica.is_restore_ = arg.restore_;
 | 
				
			||||||
      flag_replica.role_ = role;
 | 
					 | 
				
			||||||
      if (arg.ignore_member_list_) {
 | 
					      if (arg.ignore_member_list_) {
 | 
				
			||||||
        // not fill member list
 | 
					        // not fill member list
 | 
				
			||||||
      } else if (REPLICA_RESTORE_DATA == flag_replica.is_restore_ ||
 | 
					      } else if (REPLICA_RESTORE_DATA == flag_replica.is_restore_ ||
 | 
				
			||||||
@ -15121,7 +15098,6 @@ int ObDDLService::create_tablegroup_partitions_for_create(const share::schema::O
 | 
				
			|||||||
  int64_t non_paxos_replica_num = OB_INVALID_COUNT;
 | 
					  int64_t non_paxos_replica_num = OB_INVALID_COUNT;
 | 
				
			||||||
  const uint64_t tenant_id = tablegroup_schema.get_tenant_id();
 | 
					  const uint64_t tenant_id = tablegroup_schema.get_tenant_id();
 | 
				
			||||||
  const uint64_t tablegroup_id = tablegroup_schema.get_tablegroup_id();
 | 
					  const uint64_t tablegroup_id = tablegroup_schema.get_tablegroup_id();
 | 
				
			||||||
  bool is_standby = false;
 | 
					 | 
				
			||||||
  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || !frozen_status.is_valid())) {
 | 
					  if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || !frozen_status.is_valid())) {
 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
    LOG_WARN("invalid argument", K(ret), K(tenant_id), K(frozen_status));
 | 
					    LOG_WARN("invalid argument", K(ret), K(tenant_id), K(frozen_status));
 | 
				
			||||||
@ -15139,8 +15115,6 @@ int ObDDLService::create_tablegroup_partitions_for_create(const share::schema::O
 | 
				
			|||||||
    ret = OB_ERR_UNEXPECTED;
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
    LOG_WARN(
 | 
					    LOG_WARN(
 | 
				
			||||||
        "array count unexpected", K(ret), "tg_addr_count", tablegroup_addr.count(), "part_ids_count", part_ids.count());
 | 
					        "array count unexpected", K(ret), "tg_addr_count", tablegroup_addr.count(), "part_ids_count", part_ids.count());
 | 
				
			||||||
  } else if (OB_FAIL(get_is_standby_cluster(is_standby))) {
 | 
					 | 
				
			||||||
    LOG_WARN("fail to get cluster", KR(ret), K(is_standby), K(tablegroup_schema));
 | 
					 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    obrpc::ObCreatePartitionArg arg;
 | 
					    obrpc::ObCreatePartitionArg arg;
 | 
				
			||||||
    share::ObSplitPartition split_info;
 | 
					    share::ObSplitPartition split_info;
 | 
				
			||||||
@ -15165,7 +15139,6 @@ int ObDDLService::create_tablegroup_partitions_for_create(const share::schema::O
 | 
				
			|||||||
        FOREACH_CNT_X(a, part_addr, OB_SUCC(ret))
 | 
					        FOREACH_CNT_X(a, part_addr, OB_SUCC(ret))
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
          ObPartitionReplica flag_replica;
 | 
					          ObPartitionReplica flag_replica;
 | 
				
			||||||
          common::ObRole role = FOLLOWER;
 | 
					 | 
				
			||||||
          if (OB_UNLIKELY(nullptr == a || nullptr == rpc_proxy_)) {
 | 
					          if (OB_UNLIKELY(nullptr == a || nullptr == rpc_proxy_)) {
 | 
				
			||||||
            ret = OB_ERR_UNEXPECTED;
 | 
					            ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
            LOG_WARN("addr or rpc_proxy is null", K(ret));
 | 
					            LOG_WARN("addr or rpc_proxy is null", K(ret));
 | 
				
			||||||
@ -15181,9 +15154,7 @@ int ObDDLService::create_tablegroup_partitions_for_create(const share::schema::O
 | 
				
			|||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
              restore = REPLICA_NOT_RESTORE;
 | 
					              restore = REPLICA_NOT_RESTORE;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            if (OB_FAIL(set_flag_role(a->initial_leader_, is_standby, restore, tablegroup_id, role))) {
 | 
					            if (OB_FAIL(fill_create_partition_arg(tablegroup_id,
 | 
				
			||||||
              LOG_WARN("fail to set flag role", KR(ret));
 | 
					 | 
				
			||||||
            } else if (OB_FAIL(fill_create_partition_arg(tablegroup_id,
 | 
					 | 
				
			||||||
                    tablegroup_schema.get_partition_cnt(),
 | 
					                    tablegroup_schema.get_partition_cnt(),
 | 
				
			||||||
                    paxos_replica_num,
 | 
					                    paxos_replica_num,
 | 
				
			||||||
                    non_paxos_replica_num,
 | 
					                    non_paxos_replica_num,
 | 
				
			||||||
@ -15201,7 +15172,6 @@ int ObDDLService::create_tablegroup_partitions_for_create(const share::schema::O
 | 
				
			|||||||
                           partition_id,
 | 
					                           partition_id,
 | 
				
			||||||
                           arg,
 | 
					                           arg,
 | 
				
			||||||
                           *a,
 | 
					                           *a,
 | 
				
			||||||
                           role,
 | 
					 | 
				
			||||||
                           flag_replica))) {
 | 
					                           flag_replica))) {
 | 
				
			||||||
              LOG_WARN("fail to fill flag replica", K(ret), K(tablegroup_id));
 | 
					              LOG_WARN("fail to fill flag replica", K(ret), K(tablegroup_id));
 | 
				
			||||||
            } else if (OB_FAIL(creator.add_flag_replica(flag_replica))) {
 | 
					            } else if (OB_FAIL(creator.add_flag_replica(flag_replica))) {
 | 
				
			||||||
@ -15266,7 +15236,6 @@ int ObDDLService::create_partitions_for_physical_restore(ObSchemaGetterGuard& sc
 | 
				
			|||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  const uint64_t schema_id = restore_arg.schema_id_;
 | 
					  const uint64_t schema_id = restore_arg.schema_id_;
 | 
				
			||||||
  const ObCreateTableMode create_mode = ObCreateTableMode::OB_CREATE_TABLE_MODE_PHYSICAL_RESTORE;
 | 
					  const ObCreateTableMode create_mode = ObCreateTableMode::OB_CREATE_TABLE_MODE_PHYSICAL_RESTORE;
 | 
				
			||||||
  bool is_standby = false;
 | 
					 | 
				
			||||||
  if (OB_FAIL(check_inner_stat())) {
 | 
					  if (OB_FAIL(check_inner_stat())) {
 | 
				
			||||||
    LOG_WARN("variable is not init", K(ret));
 | 
					    LOG_WARN("variable is not init", K(ret));
 | 
				
			||||||
  } else if (!restore_arg.is_valid() || addrs.count() <= 0 || last_schema_version <= OB_CORE_SCHEMA_VERSION) {
 | 
					  } else if (!restore_arg.is_valid() || addrs.count() <= 0 || last_schema_version <= OB_CORE_SCHEMA_VERSION) {
 | 
				
			||||||
@ -15275,8 +15244,6 @@ int ObDDLService::create_partitions_for_physical_restore(ObSchemaGetterGuard& sc
 | 
				
			|||||||
  } else if (!partition_schema.has_self_partition()) {
 | 
					  } else if (!partition_schema.has_self_partition()) {
 | 
				
			||||||
    ret = OB_NOT_SUPPORTED;
 | 
					    ret = OB_NOT_SUPPORTED;
 | 
				
			||||||
    LOG_WARN("table/tablegroup should has self partition", K(ret), K(schema_id));
 | 
					    LOG_WARN("table/tablegroup should has self partition", K(ret), K(schema_id));
 | 
				
			||||||
  } else if (OB_FAIL(get_is_standby_cluster(is_standby))) {
 | 
					 | 
				
			||||||
    LOG_WARN("fail to get standby cluster", KR(ret), K(partition_schema));
 | 
					 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    ObPartIdsGenerator gen(partition_schema);
 | 
					    ObPartIdsGenerator gen(partition_schema);
 | 
				
			||||||
    ObArray<int64_t> part_ids;
 | 
					    ObArray<int64_t> part_ids;
 | 
				
			||||||
@ -15328,12 +15295,9 @@ int ObDDLService::create_partitions_for_physical_restore(ObSchemaGetterGuard& sc
 | 
				
			|||||||
          FOREACH_CNT_X(a, part_addr, OB_SUCC(ret))
 | 
					          FOREACH_CNT_X(a, part_addr, OB_SUCC(ret))
 | 
				
			||||||
          {
 | 
					          {
 | 
				
			||||||
            ObPartitionReplica flag_replica;
 | 
					            ObPartitionReplica flag_replica;
 | 
				
			||||||
            common::ObRole role = FOLLOWER;
 | 
					 | 
				
			||||||
            if (OB_UNLIKELY(nullptr == a || nullptr == rpc_proxy_)) {
 | 
					            if (OB_UNLIKELY(nullptr == a || nullptr == rpc_proxy_)) {
 | 
				
			||||||
              ret = OB_ERR_UNEXPECTED;
 | 
					              ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
              LOG_WARN("addr or rpc_proxy is null", K(ret));
 | 
					              LOG_WARN("addr or rpc_proxy is null", K(ret));
 | 
				
			||||||
            } else if (OB_FAIL(set_flag_role(a->initial_leader_, is_standby, restore, partition_id, role))) {
 | 
					 | 
				
			||||||
              LOG_WARN("fail to set flag role", KR(ret));
 | 
					 | 
				
			||||||
            } else if (OB_FAIL(fill_create_partition_arg(schema_id,
 | 
					            } else if (OB_FAIL(fill_create_partition_arg(schema_id,
 | 
				
			||||||
                           partition_schema.get_partition_cnt(),
 | 
					                           partition_schema.get_partition_cnt(),
 | 
				
			||||||
                           paxos_replica_num,
 | 
					                           paxos_replica_num,
 | 
				
			||||||
@ -15347,13 +15311,8 @@ int ObDDLService::create_partitions_for_physical_restore(ObSchemaGetterGuard& sc
 | 
				
			|||||||
                           frozen_status,
 | 
					                           frozen_status,
 | 
				
			||||||
                           arg))) {
 | 
					                           arg))) {
 | 
				
			||||||
              LOG_WARN("fail to fill ObCreatePartitionArg", K(ret), K(schema_id));
 | 
					              LOG_WARN("fail to fill ObCreatePartitionArg", K(ret), K(schema_id));
 | 
				
			||||||
            } else if (OB_FAIL(fill_flag_replica(schema_id,
 | 
					            } else if (OB_FAIL(fill_flag_replica(
 | 
				
			||||||
                           partition_schema.get_partition_cnt(),
 | 
					                           schema_id, partition_schema.get_partition_cnt(), partition_id, arg, *a, flag_replica))) {
 | 
				
			||||||
                           partition_id,
 | 
					 | 
				
			||||||
                           arg,
 | 
					 | 
				
			||||||
                           *a,
 | 
					 | 
				
			||||||
                           role,
 | 
					 | 
				
			||||||
                           flag_replica))) {
 | 
					 | 
				
			||||||
              LOG_WARN("fail to fill flag replica", K(ret), K(schema_id));
 | 
					              LOG_WARN("fail to fill flag replica", K(ret), K(schema_id));
 | 
				
			||||||
            } else if (OB_FAIL(creator.add_flag_replica(flag_replica))) {
 | 
					            } else if (OB_FAIL(creator.add_flag_replica(flag_replica))) {
 | 
				
			||||||
              LOG_WARN("fail to add flag replica to partition creator", K(ret), K(flag_replica));
 | 
					              LOG_WARN("fail to add flag replica to partition creator", K(ret), K(flag_replica));
 | 
				
			||||||
 | 
				
			|||||||
@ -802,10 +802,8 @@ private:
 | 
				
			|||||||
      const int64_t non_paxos_replica_num, const int64_t partition_id, const ObReplicaAddr& replica_addr,
 | 
					      const int64_t non_paxos_replica_num, const int64_t partition_id, const ObReplicaAddr& replica_addr,
 | 
				
			||||||
      const int64_t lease_start_ts, const bool is_bootstrap, const bool is_standby, const int64_t restore,
 | 
					      const int64_t lease_start_ts, const bool is_bootstrap, const bool is_standby, const int64_t restore,
 | 
				
			||||||
      const share::ObSimpleFrozenStatus& frozen_status, obrpc::ObCreatePartitionArg& arg);
 | 
					      const share::ObSimpleFrozenStatus& frozen_status, obrpc::ObCreatePartitionArg& arg);
 | 
				
			||||||
  int set_flag_role(const bool initial_leader, const bool is_standby, const int64_t restore, const uint64_t table_id,
 | 
					 | 
				
			||||||
      common::ObRole& role);
 | 
					 | 
				
			||||||
  int fill_flag_replica(const uint64_t table_id, const int64_t partition_cnt, const int64_t partition_id,
 | 
					  int fill_flag_replica(const uint64_t table_id, const int64_t partition_cnt, const int64_t partition_id,
 | 
				
			||||||
      const obrpc::ObCreatePartitionArg& arg, const ObReplicaAddr& replica_addr, const common::ObRole role,
 | 
					      const obrpc::ObCreatePartitionArg& arg, const ObReplicaAddr& replica_addr,
 | 
				
			||||||
      share::ObPartitionReplica& flag_replica);
 | 
					      share::ObPartitionReplica& flag_replica);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  int try_modify_tenant_primary_zone_entity_count(common::ObMySQLTransaction& trans,
 | 
					  int try_modify_tenant_primary_zone_entity_count(common::ObMySQLTransaction& trans,
 | 
				
			||||||
 | 
				
			|||||||
@ -568,22 +568,12 @@ int ObPersistentPartitionTable::update(const ObPartitionReplica& replica)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        if (OB_SUCC(ret)) {
 | 
					        if (OB_SUCC(ret)) {
 | 
				
			||||||
          if (new_replica.is_leader_like()) {
 | 
					          if (new_replica.is_leader_like()) {
 | 
				
			||||||
            if (!new_replica.is_flag_replica()) {
 | 
					 | 
				
			||||||
            if (new_replica.data_version_ <= 0) {
 | 
					            if (new_replica.data_version_ <= 0) {
 | 
				
			||||||
              ret = OB_INVALID_ARGUMENT;
 | 
					              ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
              LOG_WARN("data_version must > 0 for leader replica", K(ret), K(new_replica));
 | 
					              LOG_WARN("data_version must > 0 for leader replica", K(ret), K(new_replica));
 | 
				
			||||||
            } else if (new_replica.to_leader_time_ <= 0) {
 | 
					            } else if (new_replica.to_leader_time_ <= 0) {
 | 
				
			||||||
              ret = OB_INVALID_ARGUMENT;
 | 
					              ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
              LOG_WARN("change to leader time must > 0 for leader replica", K(ret), K(new_replica));
 | 
					              LOG_WARN("change to leader time must > 0 for leader replica", K(ret), K(new_replica));
 | 
				
			||||||
              }
 | 
					 | 
				
			||||||
            } else {
 | 
					 | 
				
			||||||
              // flag replica with only the role of leader, to_leader_time_ and data_version_ are not set
 | 
					 | 
				
			||||||
              // flag is set only by rs,observer shall not set the flag replica
 | 
					 | 
				
			||||||
              // observer will update the partition table asynchronously by invoking
 | 
					 | 
				
			||||||
              // ObPGPartitionMTUpdateOperator::batch_update
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            if (OB_FAIL(ret)) {
 | 
					 | 
				
			||||||
              // nothing todo
 | 
					 | 
				
			||||||
            } else if (OB_FAIL(update_leader_replica(*proxy, partition, new_replica))) {
 | 
					            } else if (OB_FAIL(update_leader_replica(*proxy, partition, new_replica))) {
 | 
				
			||||||
              LOG_WARN("update leader replica failed", K(ret), K(new_replica));
 | 
					              LOG_WARN("update leader replica failed", K(ret), K(new_replica));
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user