[feature](replica version) Add admin set replica version statement (#23706)

This commit is contained in:
yujun
2023-09-14 21:12:00 +08:00
committed by GitHub
parent d20365cdcf
commit 07720d3ff9
14 changed files with 749 additions and 0 deletions

View File

@ -7236,6 +7236,10 @@ admin_stmt ::=
{:
RESULT = new AdminSetReplicaStatusStmt(prop);
:}
| KW_ADMIN KW_SET KW_REPLICA KW_VERSION KW_PROPERTIES LPAREN key_value_map:prop RPAREN
{:
RESULT = new AdminSetReplicaVersionStmt(prop);
:}
| KW_ADMIN KW_REPAIR KW_TABLE base_table_ref:table_ref
{:
RESULT = new AdminRepairTableStmt(table_ref);

View File

@ -0,0 +1,151 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import java.util.Map;
/*
* admin set replicas status properties ("key" = "val", ..);
* Required:
* "tablet_id" = "10010",
* "backend_id" = "10001",
* Optional:
* "version" = "100",
* "last_success_version" = "100",
* "last_failed_version" = "-1",
*/
public class AdminSetReplicaVersionStmt extends DdlStmt {
public static final String TABLET_ID = "tablet_id";
public static final String BACKEND_ID = "backend_id";
public static final String VERSION = "version";
public static final String LAST_SUCCESS_VERSION = "last_success_version";
public static final String LAST_FAILED_VERSION = "last_failed_version";
private Map<String, String> properties;
private long tabletId = -1;
private long backendId = -1;
private Long version = null;
private Long lastSuccessVersion = null;
private Long lastFailedVersion = null;
public AdminSetReplicaVersionStmt(Map<String, String> properties) {
this.properties = properties;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
checkProperties();
}
private void checkProperties() throws AnalysisException {
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
String val = entry.getValue();
if (key.equalsIgnoreCase(TABLET_ID)) {
try {
tabletId = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid tablet id format: " + val);
}
} else if (key.equalsIgnoreCase(BACKEND_ID)) {
try {
backendId = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid backend id format: " + val);
}
} else if (key.equalsIgnoreCase(VERSION)) {
try {
version = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid version format: " + val);
}
if (version <= 0) {
throw new AnalysisException("Required version > 0");
}
} else if (key.equalsIgnoreCase(LAST_SUCCESS_VERSION)) {
try {
lastSuccessVersion = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid last success version format: " + val);
}
if (lastSuccessVersion <= 0) {
throw new AnalysisException("Required last success version > 0");
}
} else if (key.equalsIgnoreCase(LAST_FAILED_VERSION)) {
try {
lastFailedVersion = Long.valueOf(val);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid last failed version format: " + val);
}
if (lastFailedVersion <= 0 && lastFailedVersion != -1) {
throw new AnalysisException("Required last failed version > 0 or == -1");
}
} else {
throw new AnalysisException("Unknown property: " + key);
}
}
if (tabletId == -1 || backendId == -1
|| (version == null && lastSuccessVersion == null && lastFailedVersion == null)) {
throw new AnalysisException("Should add following properties: TABLET_ID, BACKEND_ID, "
+ "VERSION, LAST_SUCCESS_VERSION, LAST_FAILED_VERSION");
}
}
public long getTabletId() {
return tabletId;
}
public long getBackendId() {
return backendId;
}
public Long getVersion() {
return version;
}
public Long getLastSuccessVersion() {
return lastSuccessVersion;
}
public Long getLastFailedVersion() {
return lastFailedVersion;
}
@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_WITH_SYNC;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.doris.analysis.AdminCompactTableStmt;
import org.apache.doris.analysis.AdminSetConfigStmt;
import org.apache.doris.analysis.AdminSetPartitionVersionStmt;
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
import org.apache.doris.analysis.AdminSetReplicaVersionStmt;
import org.apache.doris.analysis.AdminSetTableStatusStmt;
import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
@ -195,6 +196,7 @@ import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.SetPartitionVersionOperationLog;
import org.apache.doris.persist.SetReplicaStatusOperationLog;
import org.apache.doris.persist.SetReplicaVersionOperationLog;
import org.apache.doris.persist.SetTableStatusOperationLog;
import org.apache.doris.persist.Storage;
import org.apache.doris.persist.StorageInfo;
@ -5417,6 +5419,56 @@ public class Env {
}
}
// Set specified replica's version. If replica does not exist, just ignore it.
public void setReplicaVersion(AdminSetReplicaVersionStmt stmt) throws MetaNotFoundException {
long tabletId = stmt.getTabletId();
long backendId = stmt.getBackendId();
Long version = stmt.getVersion();
Long lastSuccessVersion = stmt.getLastSuccessVersion();
Long lastFailedVersion = stmt.getLastFailedVersion();
long updateTime = System.currentTimeMillis();
setReplicaVersionInternal(tabletId, backendId, version, lastSuccessVersion, lastFailedVersion,
updateTime, false);
}
public void replaySetReplicaVersion(SetReplicaVersionOperationLog log) throws MetaNotFoundException {
setReplicaVersionInternal(log.getTabletId(), log.getBackendId(), log.getVersion(),
log.getLastSuccessVersion(), log.getLastFailedVersion(), log.getUpdateTime(), true);
}
private void setReplicaVersionInternal(long tabletId, long backendId, Long version, Long lastSuccessVersion,
Long lastFailedVersion, long updateTime, boolean isReplay)
throws MetaNotFoundException {
try {
TabletMeta meta = tabletInvertedIndex.getTabletMeta(tabletId);
if (meta == null) {
throw new MetaNotFoundException("tablet does not exist");
}
Database db = getInternalCatalog().getDbOrMetaException(meta.getDbId());
Table table = db.getTableOrMetaException(meta.getTableId());
table.writeLockOrMetaException();
try {
Replica replica = tabletInvertedIndex.getReplica(tabletId, backendId);
if (replica == null) {
throw new MetaNotFoundException("replica does not exist on backend, beId=" + backendId);
}
replica.adminUpdateVersionInfo(version, lastFailedVersion, lastSuccessVersion, updateTime);
if (!isReplay) {
SetReplicaVersionOperationLog log = new SetReplicaVersionOperationLog(backendId, tabletId,
version, lastSuccessVersion, lastFailedVersion, updateTime);
getEditLog().logSetReplicaVersion(log);
}
LOG.info("set replica {} of tablet {} on backend {} as version {}, last success version {} ,"
+ ", last failed version {}, update time {}. is replay: {}", replica.getId(), tabletId,
backendId, version, lastSuccessVersion, lastFailedVersion, updateTime, isReplay);
} finally {
table.writeUnlock();
}
} catch (MetaNotFoundException e) {
throw new MetaNotFoundException("set replica version failed, tabletId=" + tabletId, e);
}
}
public void eraseDatabase(long dbId, boolean needEditLog) {
// remove jobs
Env.getCurrentEnv().getLoadInstance().removeDbLoadJob(dbId);

View File

@ -298,6 +298,34 @@ public class Replica implements Writable {
updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, dataSize, remoteDataSize, rowCount);
}
public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion,
long updateTime) {
if (version != null) {
this.version = version;
}
if (lastSuccessVersion != null) {
this.lastSuccessVersion = lastSuccessVersion;
}
if (lastFailedVersion != null) {
if (this.lastFailedVersion < lastFailedVersion) {
this.lastFailedTimestamp = updateTime;
}
this.lastFailedVersion = lastFailedVersion;
}
if (this.lastFailedVersion < this.version) {
this.lastFailedVersion = -1;
this.lastFailedTimestamp = -1;
this.lastFailedVersionHash = 0;
}
if (this.lastFailedVersion > 0
&& this.lastSuccessVersion > this.lastFailedVersion) {
this.lastSuccessVersion = this.version;
}
if (this.lastSuccessVersion < this.version) {
this.lastSuccessVersion = this.version;
}
}
/* last failed version: LFV
* last success version: LSV
* version: V

View File

@ -109,6 +109,7 @@ import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.persist.SetPartitionVersionOperationLog;
import org.apache.doris.persist.SetReplicaStatusOperationLog;
import org.apache.doris.persist.SetReplicaVersionOperationLog;
import org.apache.doris.persist.SetTableStatusOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
@ -630,6 +631,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_SET_REPLICA_VERSION: {
data = SetReplicaVersionOperationLog.read(in);
isRead = true;
break;
}
case OperationType.OP_SET_PARTITION_VERSION: {
data = SetPartitionVersionOperationLog.read(in);
isRead = true;

View File

@ -821,6 +821,11 @@ public class EditLog {
env.replaySetReplicaStatus(log);
break;
}
case OperationType.OP_SET_REPLICA_VERSION: {
SetReplicaVersionOperationLog log = (SetReplicaVersionOperationLog) journal.getData();
env.replaySetReplicaVersion(log);
break;
}
case OperationType.OP_REMOVE_ALTER_JOB_V2: {
RemoveAlterJobV2OperationLog log = (RemoveAlterJobV2OperationLog) journal.getData();
switch (log.getType()) {
@ -1752,6 +1757,10 @@ public class EditLog {
logEdit(OperationType.OP_SET_REPLICA_STATUS, log);
}
public void logSetReplicaVersion(SetReplicaVersionOperationLog log) {
logEdit(OperationType.OP_SET_REPLICA_VERSION, log);
}
public void logRemoveExpiredAlterJobV2(RemoveAlterJobV2OperationLog log) {
logEdit(OperationType.OP_REMOVE_ALTER_JOB_V2, log);
}

View File

@ -187,6 +187,9 @@ public class OperationType {
public static final short OP_ADD_GLOBAL_FUNCTION = 132;
public static final short OP_DROP_GLOBAL_FUNCTION = 133;
// modify database/table/tablet/replica meta
public static final short OP_SET_REPLICA_VERSION = 141;
// routine load 200
public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200;
public static final short OP_CHANGE_ROUTINE_LOAD_JOB = 201;

View File

@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class SetReplicaVersionOperationLog implements Writable {
@SerializedName(value = "backendId")
private long backendId;
@SerializedName(value = "tabletId")
private long tabletId;
@SerializedName(value = "version")
private Long version = null;
@SerializedName(value = "lastSuccessVersion")
private Long lastSuccessVersion = null;
@SerializedName(value = "lastFailedVersion")
private Long lastFailedVersion = null;
@SerializedName(value = "updateTime")
private long updateTime;
public SetReplicaVersionOperationLog(long backendId, long tabletId, Long version,
Long lastSuccessVersion, Long lastFailedVersion, long updateTime) {
this.backendId = backendId;
this.tabletId = tabletId;
this.version = version;
this.lastSuccessVersion = lastSuccessVersion;
this.lastFailedVersion = lastFailedVersion;
this.updateTime = updateTime;
}
public long getTabletId() {
return tabletId;
}
public long getBackendId() {
return backendId;
}
public Long getVersion() {
return version;
}
public Long getLastSuccessVersion() {
return lastSuccessVersion;
}
public Long getLastFailedVersion() {
return lastFailedVersion;
}
public long getUpdateTime() {
return updateTime;
}
public static SetReplicaVersionOperationLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, SetReplicaVersionOperationLog.class);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof SetReplicaVersionOperationLog)) {
return false;
}
SetReplicaVersionOperationLog other = (SetReplicaVersionOperationLog) obj;
if (version == null) {
if (other.version != null) {
return false;
}
} else if (!version.equals(other.version)) {
return false;
}
if (lastSuccessVersion == null) {
if (other.lastSuccessVersion != null) {
return false;
}
} else if (!lastSuccessVersion.equals(other.lastSuccessVersion)) {
return false;
}
if (lastFailedVersion == null) {
if (other.lastFailedVersion != null) {
return false;
}
} else if (!lastFailedVersion.equals(other.lastFailedVersion)) {
return false;
}
return backendId == other.backendId && tabletId == other.tabletId
&& updateTime == other.updateTime;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}

View File

@ -27,6 +27,7 @@ import org.apache.doris.analysis.AdminRepairTableStmt;
import org.apache.doris.analysis.AdminSetConfigStmt;
import org.apache.doris.analysis.AdminSetPartitionVersionStmt;
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
import org.apache.doris.analysis.AdminSetReplicaVersionStmt;
import org.apache.doris.analysis.AdminSetTableStatusStmt;
import org.apache.doris.analysis.AlterCatalogNameStmt;
import org.apache.doris.analysis.AlterCatalogPropertyStmt;
@ -271,6 +272,8 @@ public class DdlExecutor {
env.checkTablets((AdminCheckTabletsStmt) ddlStmt);
} else if (ddlStmt instanceof AdminSetReplicaStatusStmt) {
env.setReplicaStatus((AdminSetReplicaStatusStmt) ddlStmt);
} else if (ddlStmt instanceof AdminSetReplicaVersionStmt) {
env.setReplicaVersion((AdminSetReplicaVersionStmt) ddlStmt);
} else if (ddlStmt instanceof AdminSetPartitionVersionStmt) {
env.setPartitionVersion((AdminSetPartitionVersionStmt) ddlStmt);
} else if (ddlStmt instanceof CreateResourceStmt) {

View File

@ -18,12 +18,14 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
import org.apache.doris.analysis.AdminSetReplicaVersionStmt;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.Replica.ReplicaStatus;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.persist.SetPartitionVersionOperationLog;
import org.apache.doris.persist.SetReplicaStatusOperationLog;
import org.apache.doris.persist.SetReplicaVersionOperationLog;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Lists;
@ -60,6 +62,16 @@ public class AdminStmtTest extends TestWithFeService {
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ");");
// for test set replica version
createTable("CREATE TABLE test.tbl3 (\n"
+ " `id` int(11) NULL COMMENT \"\",\n"
+ " `name` varchar(20) NULL\n"
+ ") ENGINE=OLAP\n"
+ "DUPLICATE KEY(`id`, `name`)\n"
+ "DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ");");
}
@Test
@ -102,6 +114,111 @@ public class AdminStmtTest extends TestWithFeService {
Assertions.assertFalse(replica.isBad());
}
@Test
public void testAdminSetReplicaVersion() throws Exception {
Database db = Env.getCurrentInternalCatalog().getDbNullable("default_cluster:test");
Assertions.assertNotNull(db);
OlapTable tbl = (OlapTable) db.getTableNullable("tbl3");
Assertions.assertNotNull(tbl);
// tablet id, backend id
List<Pair<Long, Long>> tabletToBackendList = Lists.newArrayList();
for (Partition partition : tbl.getPartitions()) {
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
tabletToBackendList.add(Pair.of(tablet.getId(), replica.getBackendId()));
}
}
}
}
Assertions.assertEquals(3, tabletToBackendList.size());
long tabletId = tabletToBackendList.get(0).first;
long backendId = tabletToBackendList.get(0).second;
Replica replica = Env.getCurrentInvertedIndex().getReplica(tabletId, backendId);
String adminStmt = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'version' = '10', 'last_failed_version' = '100');";
AdminSetReplicaVersionStmt stmt = (AdminSetReplicaVersionStmt) parseAndAnalyzeStmt(adminStmt);
Env.getCurrentEnv().setReplicaVersion(stmt);
Assertions.assertEquals(10L, replica.getVersion());
Assertions.assertEquals(10L, replica.getLastSuccessVersion());
Assertions.assertEquals(100L, replica.getLastFailedVersion());
adminStmt = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'version' = '50');";
stmt = (AdminSetReplicaVersionStmt) parseAndAnalyzeStmt(adminStmt);
Env.getCurrentEnv().setReplicaVersion(stmt);
Assertions.assertEquals(50L, replica.getVersion());
Assertions.assertEquals(50L, replica.getLastSuccessVersion());
Assertions.assertEquals(100L, replica.getLastFailedVersion());
adminStmt = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'version' = '200');";
stmt = (AdminSetReplicaVersionStmt) parseAndAnalyzeStmt(adminStmt);
Env.getCurrentEnv().setReplicaVersion(stmt);
Assertions.assertEquals(200L, replica.getVersion());
Assertions.assertEquals(200L, replica.getLastSuccessVersion());
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
adminStmt = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'last_failed_version' = '300');";
stmt = (AdminSetReplicaVersionStmt) parseAndAnalyzeStmt(adminStmt);
Env.getCurrentEnv().setReplicaVersion(stmt);
Assertions.assertEquals(300L, replica.getLastFailedVersion());
adminStmt = "admin set replica version properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
+ backendId + "', 'last_failed_version' = '-1');";
stmt = (AdminSetReplicaVersionStmt) parseAndAnalyzeStmt(adminStmt);
Env.getCurrentEnv().setReplicaVersion(stmt);
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
}
@Test
public void testSetReplicaVersionOperationLog() throws IOException, AnalysisException {
String fileName = "./SetReplicaVersionOperationLog";
Path path = Paths.get(fileName);
List<Long> versions = Lists.newArrayList(null, 10L, 1000L);
for (int i = 0; i < versions.size(); i++) {
for (int j = 0; j < versions.size(); j++) {
for (int k = 0; k < versions.size(); k++) {
try {
// 1. Write objects to file
Files.createFile(path);
DataOutputStream out = new DataOutputStream(Files.newOutputStream(path));
Long version = versions.get(i);
Long lastSuccessVersion = versions.get(j);
Long lastFailedVersion = versions.get(k);
if (version != null) {
version = version + 1;
}
if (lastSuccessVersion != null) {
lastSuccessVersion = lastSuccessVersion + 2;
}
if (lastFailedVersion != null) {
lastFailedVersion = lastFailedVersion + 3;
}
SetReplicaVersionOperationLog log = new SetReplicaVersionOperationLog(123L, 567L, version,
lastSuccessVersion, lastFailedVersion, 101112L);
log.write(out);
out.flush();
out.close();
// 2. Read objects from file
DataInputStream in = new DataInputStream(Files.newInputStream(path));
SetReplicaVersionOperationLog readLog = SetReplicaVersionOperationLog.read(in);
Assertions.assertEquals(log, readLog);
in.close();
} finally {
Files.deleteIfExists(path);
}
}
}
}
}
@Test
public void testSetReplicaStatusOperationLog() throws IOException, AnalysisException {
String fileName = "./SetReplicaStatusOperationLog";