[New Stmt] Support setting replica status manually (#1522)
Sometimes a replica is broken on BE, but FE does not notice that.
In this case, we have to manually delete that replica on BE.
If there are hundreds of replicas need to be handled, this is a disaster.
So I add a new stmt:
ADMIN SET REPLICA STATUS
which support setting tablet on specified BE as BAD or OK.
This commit is contained in:
@ -260,17 +260,17 @@ curl --location-trusted -u test:test -H "label:table1_20170707" -H "column_separ
|
||||
示例2: 以 "table2_20170707" 为 Label,使用本地文件 table2_data 导入 table2 表。
|
||||
|
||||
```
|
||||
curl --location-trusted -u test:test -H "label:table2_20170707" -H "column_separator:\t" -T table2_data http://127.0.0.1:8030/api/example_db/table2/_stream_load
|
||||
curl --location-trusted -u test:test -H "label:table2_20170707" -H "column_separator:|" -T table1_data http://127.0.0.1:8030/api/example_db/table2/_stream_load
|
||||
```
|
||||
|
||||
本地文件 `table2_data` 以 `\t` 作为数据之间的分隔,具体内容如下:
|
||||
本地文件 `table2_data` 以 `|` 作为数据之间的分隔,具体内容如下:
|
||||
|
||||
```
|
||||
2017-07-03 1 1 jim 2
|
||||
2017-07-05 2 1 grace 2
|
||||
2017-07-12 3 2 tom 2
|
||||
2017-07-15 4 3 bush 3
|
||||
2017-07-12 5 3 helen 3
|
||||
2017-07-03|1|1|jim|2
|
||||
2017-07-05|2|1|grace|2
|
||||
2017-07-12|3|2|tom|2
|
||||
2017-07-15|4|3|bush|3
|
||||
2017-07-12|5|3|helen|3
|
||||
```
|
||||
|
||||
> 注意事项:
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# ADMIN SET REPLICA STATUS
|
||||
## description
|
||||
|
||||
该语句用于设置指定副本的状态。
|
||||
该命令目前仅用于手动将某些副本状态设置为 BAD 或 OK,从而使得系统能够自动修复这些副本。
|
||||
|
||||
语法:
|
||||
|
||||
ADMIN SET REPLICA STATUS
|
||||
PROPERTIES ("key" = "value", ...);
|
||||
|
||||
目前支持如下属性:
|
||||
"tablet_id":必需。指定一个 Tablet Id.
|
||||
"backend_id":必需。指定 Backend Id.
|
||||
"status":必需。指定状态。当前仅支持 "bad" 或 "ok"
|
||||
|
||||
如果指定的副本不存在,或状态已经是 bad,则会被忽略。
|
||||
|
||||
注意:
|
||||
|
||||
设置为 Bad 状态的副本可能立刻被删除,请谨慎操作。
|
||||
|
||||
## example
|
||||
|
||||
1. 设置 tablet 10003 在 BE 10001 上的副本状态为 bad。
|
||||
|
||||
ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "10003", "backend_id" = "10001", "status" = "bad");
|
||||
|
||||
2. 设置 tablet 10003 在 BE 10001 上的副本状态为 ok。
|
||||
|
||||
ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "10003", "backend_id" = "10001", "status" = "ok");
|
||||
|
||||
## keyword
|
||||
|
||||
ADMIN,SET,REPLICA,STATUS
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# ADMIN SET REPLICA STATUS
|
||||
## description
|
||||
|
||||
This commend is used to set the status of the specified replica.
|
||||
This command is currently only used to manually set the status of some replicas to BAD or OK, allowing the system to automatically repair these replicas.
|
||||
|
||||
Syntax:
|
||||
|
||||
ADMIN SET REPLICA STATUS
|
||||
PROPERTIES ("key" = "value", ...);
|
||||
|
||||
The following attributes are currently supported:
|
||||
"tablet_id": required. Specify a Tablet Id.
|
||||
"backend_id": required. Specify a Backend Id.
|
||||
"status": required. Specify the status. Only "bad" and "ok" are currently supported.
|
||||
|
||||
If the specified replica does not exist or the status is already bad or ok, it will be ignored.
|
||||
|
||||
Notice:
|
||||
|
||||
Replica set to Bad status may be dropped immediately, please proceed with caution.
|
||||
|
||||
## example
|
||||
|
||||
1. Set the replica status of tablet 10003 on BE 10001 to bad.
|
||||
|
||||
ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "10003", "backend_id" = "10001", "status" = "bad");
|
||||
|
||||
2. Set the replica status of tablet 10003 on BE 10001 to ok.
|
||||
|
||||
ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "10003", "backend_id" = "10001", "status" = "ok");
|
||||
|
||||
## keyword
|
||||
|
||||
ADMIN,SET,REPLICA,STATUS
|
||||
|
||||
@ -4202,6 +4202,10 @@ admin_stmt ::=
|
||||
{:
|
||||
RESULT = new AdminShowReplicaDistributionStmt(table_ref);
|
||||
:}
|
||||
| KW_ADMIN KW_SET KW_REPLICA KW_STATUS KW_PROPERTIES LPAREN key_value_map:prop RPAREN
|
||||
{:
|
||||
RESULT = new AdminSetReplicaStatusStmt(prop);
|
||||
:}
|
||||
| KW_ADMIN KW_REPAIR KW_TABLE base_table_ref:table_ref
|
||||
{:
|
||||
RESULT = new AdminRepairTableStmt(table_ref);
|
||||
|
||||
@ -0,0 +1,113 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Replica.ReplicaStatus;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/*
|
||||
* admin set replicas status properties ("key" = "val", ..);
|
||||
* Required:
|
||||
* "tablet_id" = "10010",
|
||||
* "backend_id" = "10001"
|
||||
* "status" = "bad"/"ok"
|
||||
*/
|
||||
public class AdminSetReplicaStatusStmt extends DdlStmt {
|
||||
|
||||
public static final String TABLET_ID = "tablet_id";
|
||||
public static final String BACKEND_ID = "backend_id";
|
||||
public static final String STATUS = "status";
|
||||
|
||||
private Map<String, String> properties;
|
||||
private long tabletId = -1;
|
||||
private long backendId = -1;
|
||||
private ReplicaStatus status;
|
||||
|
||||
public AdminSetReplicaStatusStmt(Map<String, String> properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
|
||||
super.analyze(analyzer);
|
||||
|
||||
// check auth
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
|
||||
}
|
||||
|
||||
checkProperties();
|
||||
}
|
||||
|
||||
private void checkProperties() throws AnalysisException {
|
||||
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
String val = entry.getValue();
|
||||
|
||||
if (key.equalsIgnoreCase(TABLET_ID)) {
|
||||
try {
|
||||
tabletId = Long.valueOf(val);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new AnalysisException("Invalid tablet id format: " + val);
|
||||
}
|
||||
} else if (key.equalsIgnoreCase(BACKEND_ID)) {
|
||||
try {
|
||||
backendId = Long.valueOf(val);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new AnalysisException("Invalid backend id format: " + val);
|
||||
}
|
||||
} else if (key.equalsIgnoreCase(STATUS)) {
|
||||
status = ReplicaStatus.valueOf(val.toUpperCase());
|
||||
if (status != ReplicaStatus.BAD && status != ReplicaStatus.OK) {
|
||||
throw new AnalysisException("Do not support setting replica status as " + val);
|
||||
}
|
||||
} else {
|
||||
throw new AnalysisException("Unknown property: " + key);
|
||||
}
|
||||
}
|
||||
|
||||
if (tabletId == -1 || backendId == -1 || status == null) {
|
||||
throw new AnalysisException("Should add following properties: TABLET_ID, BACKEND_ID and STATUS");
|
||||
}
|
||||
}
|
||||
|
||||
public long getTabletId() {
|
||||
return tabletId;
|
||||
}
|
||||
|
||||
public long getBackendId() {
|
||||
return backendId;
|
||||
}
|
||||
|
||||
public ReplicaStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
return RedirectStatus.FORWARD_WITH_SYNC;
|
||||
}
|
||||
}
|
||||
@ -30,6 +30,7 @@ import org.apache.doris.analysis.AddRollupClause;
|
||||
import org.apache.doris.analysis.AdminCheckTabletsStmt;
|
||||
import org.apache.doris.analysis.AdminCheckTabletsStmt.CheckType;
|
||||
import org.apache.doris.analysis.AdminSetConfigStmt;
|
||||
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
|
||||
import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.AlterClusterStmt;
|
||||
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
|
||||
@ -88,6 +89,7 @@ import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
|
||||
import org.apache.doris.catalog.MaterializedIndex.IndexState;
|
||||
import org.apache.doris.catalog.OlapTable.OlapTableState;
|
||||
import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.catalog.Replica.ReplicaStatus;
|
||||
import org.apache.doris.catalog.Table.TableType;
|
||||
import org.apache.doris.clone.ColocateTableBalancer;
|
||||
import org.apache.doris.clone.DynamicPartitionScheduler;
|
||||
@ -171,6 +173,7 @@ import org.apache.doris.persist.PartitionPersistInfo;
|
||||
import org.apache.doris.persist.RecoverInfo;
|
||||
import org.apache.doris.persist.ReplacePartitionOperationLog;
|
||||
import org.apache.doris.persist.ReplicaPersistInfo;
|
||||
import org.apache.doris.persist.SetReplicaStatusOperationLog;
|
||||
import org.apache.doris.persist.Storage;
|
||||
import org.apache.doris.persist.StorageInfo;
|
||||
import org.apache.doris.persist.TableInfo;
|
||||
@ -6523,5 +6526,51 @@ public class Catalog {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Set specified replica's status. If replica does not exist, just ignore it.
|
||||
public void setReplicaStatus(AdminSetReplicaStatusStmt stmt) {
|
||||
long tabletId = stmt.getTabletId();
|
||||
long backendId = stmt.getBackendId();
|
||||
ReplicaStatus status = stmt.getStatus();
|
||||
setReplicaStatusInternal(tabletId, backendId, status, false);
|
||||
}
|
||||
|
||||
public void replaySetReplicaStatus(SetReplicaStatusOperationLog log) {
|
||||
setReplicaStatusInternal(log.getTabletId(), log.getBackendId(), log.getReplicaStatus(), true);
|
||||
}
|
||||
|
||||
private void setReplicaStatusInternal(long tabletId, long backendId, ReplicaStatus status, boolean isReplay) {
|
||||
TabletMeta meta = tabletInvertedIndex.getTabletMeta(tabletId);
|
||||
if (meta == null) {
|
||||
LOG.info("tablet {} does not exist", tabletId);
|
||||
return;
|
||||
}
|
||||
long dbId = meta.getDbId();
|
||||
Database db = getDb(dbId);
|
||||
if (db == null) {
|
||||
LOG.info("database {} of tablet {} does not exist", dbId, tabletId);
|
||||
return;
|
||||
}
|
||||
db.writeLock();
|
||||
try {
|
||||
Replica replica = tabletInvertedIndex.getReplica(tabletId, backendId);
|
||||
if (replica == null) {
|
||||
LOG.info("replica of tablet {} does not exist", tabletId);
|
||||
return;
|
||||
}
|
||||
if (status == ReplicaStatus.BAD || status == ReplicaStatus.OK) {
|
||||
if (replica.setBad(status == ReplicaStatus.BAD)) {
|
||||
if (!isReplay) {
|
||||
SetReplicaStatusOperationLog log = new SetReplicaStatusOperationLog(backendId, tabletId, status);
|
||||
getEditLog().logSetReplicaStatus(log);
|
||||
}
|
||||
LOG.info("set replica {} of tablet {} on backend {} as {}. is replay: {}",
|
||||
replica.getId(), tabletId, backendId, status, isReplay);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -63,7 +63,8 @@ public class Replica implements Writable {
|
||||
DEAD, // backend is not available
|
||||
VERSION_ERROR, // missing version
|
||||
MISSING, // replica does not exist
|
||||
SCHEMA_ERROR // replica's schema hash does not equal to index's schema hash
|
||||
SCHEMA_ERROR, // replica's schema hash does not equal to index's schema hash
|
||||
BAD // replica is broken.
|
||||
}
|
||||
|
||||
@SerializedName(value = "id")
|
||||
|
||||
@ -725,6 +725,11 @@ public class EditLog {
|
||||
catalog.replayReplaceTempPartition(replaceTempPartitionLog);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_SET_REPLICA_STATUS: {
|
||||
SetReplicaStatusOperationLog log = (SetReplicaStatusOperationLog) journal.getData();
|
||||
catalog.replaySetReplicaStatus(log);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
IOException e = new IOException();
|
||||
LOG.error("UNKNOWN Operation Type {}", opCode, e);
|
||||
@ -1250,4 +1255,8 @@ public class EditLog {
|
||||
public void logReplaceTempPartition(ReplacePartitionOperationLog info) {
|
||||
logEdit(OperationType.OP_REPLACE_TEMP_PARTITION, info);
|
||||
}
|
||||
|
||||
public void logSetReplicaStatus(SetReplicaStatusOperationLog log) {
|
||||
logEdit(OperationType.OP_SET_REPLICA_STATUS, log);
|
||||
}
|
||||
}
|
||||
|
||||
@ -78,6 +78,7 @@ public class OperationType {
|
||||
public static final short OP_FINISH_ASYNC_DELETE = 44;
|
||||
public static final short OP_UPDATE_REPLICA = 45;
|
||||
public static final short OP_BACKEND_TABLETS_INFO = 46;
|
||||
public static final short OP_SET_REPLICA_STATUS = 47;
|
||||
|
||||
public static final short OP_ADD_BACKEND = 50;
|
||||
public static final short OP_DROP_BACKEND = 51;
|
||||
|
||||
@ -0,0 +1,68 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.persist;
|
||||
|
||||
import org.apache.doris.catalog.Replica.ReplicaStatus;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
public class SetReplicaStatusOperationLog implements Writable {
|
||||
|
||||
@SerializedName(value = "backendId")
|
||||
private long backendId;
|
||||
@SerializedName(value = "tabletId")
|
||||
private long tabletId;
|
||||
@SerializedName(value = "replicaStatus")
|
||||
private ReplicaStatus replicaStatus;
|
||||
|
||||
public SetReplicaStatusOperationLog(long backendId, long tabletId, ReplicaStatus replicaStatus) {
|
||||
this.backendId = backendId;
|
||||
this.tabletId = tabletId;
|
||||
this.replicaStatus = replicaStatus;
|
||||
}
|
||||
|
||||
public long getTabletId() {
|
||||
return tabletId;
|
||||
}
|
||||
|
||||
public long getBackendId() {
|
||||
return backendId;
|
||||
}
|
||||
|
||||
public ReplicaStatus getReplicaStatus() {
|
||||
return replicaStatus;
|
||||
}
|
||||
|
||||
public static SetReplicaStatusOperationLog read(DataInput in) throws IOException {
|
||||
String json = Text.readString(in);
|
||||
return GsonUtils.GSON.fromJson(json, SetReplicaStatusOperationLog.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
Text.writeString(out, json);
|
||||
}
|
||||
}
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.analysis.AdminCancelRepairTableStmt;
|
||||
import org.apache.doris.analysis.AdminCheckTabletsStmt;
|
||||
import org.apache.doris.analysis.AdminRepairTableStmt;
|
||||
import org.apache.doris.analysis.AdminSetConfigStmt;
|
||||
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
|
||||
import org.apache.doris.analysis.AlterClusterStmt;
|
||||
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
|
||||
import org.apache.doris.analysis.AlterDatabaseRename;
|
||||
@ -207,6 +208,8 @@ public class DdlExecutor {
|
||||
catalog.getSmallFileMgr().dropFile((DropFileStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AdminCheckTabletsStmt) {
|
||||
catalog.checkTablets((AdminCheckTabletsStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AdminSetReplicaStatusStmt) {
|
||||
catalog.setReplicaStatus((AdminSetReplicaStatusStmt) ddlStmt);
|
||||
} else {
|
||||
throw new DdlException("Unknown statement.");
|
||||
}
|
||||
|
||||
154
fe/src/test/java/org/apache/doris/catalog/AdminStmtTest.java
Normal file
154
fe/src/test/java/org/apache/doris/catalog/AdminStmtTest.java
Normal file
@ -0,0 +1,154 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
|
||||
import org.apache.doris.catalog.Replica.ReplicaStatus;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.persist.SetReplicaStatusOperationLog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.utframe.UtFrameUtils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class AdminStmtTest {
|
||||
|
||||
// use a unique dir so that it won't be conflict with other unit test which
|
||||
// may also start a Mocked Frontend
|
||||
private static String runningDir = "fe/mocked/AdminStmtTest/" + UUID.randomUUID().toString() + "/";
|
||||
|
||||
private static ConnectContext connectContext;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
UtFrameUtils.createMinDorisCluster(runningDir);
|
||||
|
||||
// create connect context
|
||||
connectContext = UtFrameUtils.createDefaultCtx();
|
||||
// create database
|
||||
String createDbStmtStr = "create database test;";
|
||||
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
|
||||
Catalog.getCurrentCatalog().createDb(createDbStmt);
|
||||
|
||||
String sql = "CREATE TABLE test.tbl1 (\n" +
|
||||
" `id` int(11) NULL COMMENT \"\",\n" +
|
||||
" `id2` bitmap bitmap_union NULL\n" +
|
||||
") ENGINE=OLAP\n" +
|
||||
"AGGREGATE KEY(`id`)\n" +
|
||||
"DISTRIBUTED BY HASH(`id`) BUCKETS 3\n" +
|
||||
"PROPERTIES (\n" +
|
||||
" \"replication_num\" = \"1\"\n" +
|
||||
");";
|
||||
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
|
||||
Catalog.getCurrentCatalog().createTable(createTableStmt);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
File file = new File(runningDir);
|
||||
file.delete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAdminSetReplicaStatus() throws Exception {
|
||||
Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
|
||||
Assert.assertNotNull(db);
|
||||
OlapTable tbl = (OlapTable) db.getTable("tbl1");
|
||||
Assert.assertNotNull(tbl);
|
||||
// tablet id, backend id
|
||||
List<Pair<Long, Long>> tabletToBackendList = Lists.newArrayList();
|
||||
for (Partition partition : tbl.getPartitions()) {
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
tabletToBackendList.add(Pair.create(tablet.getId(), replica.getBackendId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(3, tabletToBackendList.size());
|
||||
long tabletId = tabletToBackendList.get(0).first;
|
||||
long backendId = tabletToBackendList.get(0).second;
|
||||
Replica replica = Catalog.getCurrentInvertedIndex().getReplica(tabletId, backendId);
|
||||
Assert.assertFalse(replica.isBad());
|
||||
|
||||
// set replica to bad
|
||||
String adminStmt = "admin set replica status properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
|
||||
+ backendId + "', 'status' = 'bad');";
|
||||
AdminSetReplicaStatusStmt stmt = (AdminSetReplicaStatusStmt) UtFrameUtils.parseAndAnalyzeStmt(adminStmt, connectContext);
|
||||
Catalog.getCurrentCatalog().setReplicaStatus(stmt);
|
||||
replica = Catalog.getCurrentInvertedIndex().getReplica(tabletId, backendId);
|
||||
Assert.assertTrue(replica.isBad());
|
||||
|
||||
// set replica to ok
|
||||
adminStmt = "admin set replica status properties ('tablet_id' = '" + tabletId + "', 'backend_id' = '"
|
||||
+ backendId + "', 'status' = 'ok');";
|
||||
stmt = (AdminSetReplicaStatusStmt) UtFrameUtils.parseAndAnalyzeStmt(adminStmt, connectContext);
|
||||
Catalog.getCurrentCatalog().setReplicaStatus(stmt);
|
||||
replica = Catalog.getCurrentInvertedIndex().getReplica(tabletId, backendId);
|
||||
Assert.assertFalse(replica.isBad());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetReplicaStatusOperationLog() throws IOException, AnalysisException {
|
||||
String fileName = "./SetReplicaStatusOperationLog";
|
||||
try {
|
||||
// 1. Write objects to file
|
||||
File file = new File(fileName);
|
||||
file.createNewFile();
|
||||
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
|
||||
|
||||
SetReplicaStatusOperationLog log = new SetReplicaStatusOperationLog(10000, 100001, ReplicaStatus.BAD);
|
||||
log.write(out);
|
||||
out.flush();
|
||||
out.close();
|
||||
|
||||
// 2. Read objects from file
|
||||
DataInputStream in = new DataInputStream(new FileInputStream(file));
|
||||
|
||||
SetReplicaStatusOperationLog readLog = SetReplicaStatusOperationLog.read(in);
|
||||
Assert.assertEquals(log.getBackendId(), readLog.getBackendId());
|
||||
Assert.assertEquals(log.getTabletId(), readLog.getTabletId());
|
||||
Assert.assertEquals(log.getReplicaStatus(), readLog.getReplicaStatus());
|
||||
|
||||
in.close();
|
||||
} finally {
|
||||
File file = new File(fileName);
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user