[Fix](multi-catalog) Make BE selection policy works fine when enable prefer_compute_node_for_external_table (#19346)

This commit is contained in:
Mingyu Chen
2023-05-12 15:32:50 +08:00
committed by GitHub
parent 860ce97622
commit 26e930eed1
9 changed files with 146 additions and 111 deletions

View File

@ -58,11 +58,13 @@ public final class FeMetaVersion {
public static final int VERSION_118 = 118;
// TablePropertyInfo add db id
public static final int VERSION_119 = 119;
// For export job
public static final int VERSION_120 = 120;
// For BackendHbResponse node type
public static final int VERSION_121 = 121;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_120;
public static final int VERSION_CURRENT = VERSION_121;
// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...

View File

@ -64,6 +64,10 @@ import org.apache.doris.load.sync.canal.CanalSyncJob;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.RowPolicy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.system.BackendHbResponse;
import org.apache.doris.system.BrokerHbResponse;
import org.apache.doris.system.FrontendHbResponse;
import org.apache.doris.system.HeartbeatResponse;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@ -190,13 +194,19 @@ public class GsonUtils {
.registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName());
private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
TableIf.class, "clazz")
.registerSubtype(ExternalTable.class, ExternalTable.class.getSimpleName())
TableIf.class, "clazz").registerSubtype(ExternalTable.class, ExternalTable.class.getSimpleName())
.registerSubtype(EsExternalTable.class, EsExternalTable.class.getSimpleName())
.registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName())
.registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName())
.registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName());
// runtime adapter for class "HeartbeatResponse"
private static RuntimeTypeAdapterFactory<HeartbeatResponse> hbResponseTypeAdapterFactory
= RuntimeTypeAdapterFactory.of(HeartbeatResponse.class, "clazz")
.registerSubtype(BackendHbResponse.class, BackendHbResponse.class.getSimpleName())
.registerSubtype(FrontendHbResponse.class, FrontendHbResponse.class.getSimpleName())
.registerSubtype(BrokerHbResponse.class, BrokerHbResponse.class.getSimpleName());
// the builder of GSON instance.
// Add any other adapters if necessary.
private static final GsonBuilder GSON_BUILDER = new GsonBuilder().addSerializationExclusionStrategy(
@ -210,10 +220,9 @@ public class GsonUtils {
.registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
.registerTypeAdapterFactory(syncJobTypeAdapterFactory)
.registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory)
.registerTypeAdapterFactory(policyTypeAdapterFactory)
.registerTypeAdapterFactory(dsTypeAdapterFactory)
.registerTypeAdapterFactory(dbTypeAdapterFactory)
.registerTypeAdapterFactory(tblTypeAdapterFactory)
.registerTypeAdapterFactory(policyTypeAdapterFactory).registerTypeAdapterFactory(dsTypeAdapterFactory)
.registerTypeAdapterFactory(dbTypeAdapterFactory).registerTypeAdapterFactory(tblTypeAdapterFactory)
.registerTypeAdapterFactory(hbResponseTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter());

View File

@ -20,29 +20,35 @@ package org.apache.doris.system;
import org.apache.doris.common.io.Writable;
import org.apache.doris.resource.Tag;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Backend heartbeat response contains Backend's be port, http port and brpc port
*/
public class BackendHbResponse extends HeartbeatResponse implements Writable {
@SerializedName(value = "beId")
private long beId;
@SerializedName(value = "bePort")
private int bePort;
@SerializedName(value = "httpPort")
private int httpPort;
@SerializedName(value = "brpcPort")
private int brpcPort;
@SerializedName(value = "nodeRole")
private String nodeRole = Tag.VALUE_MIX;
private long beStartTime;
private String host;
private String version = "";
private String nodeRole = Tag.VALUE_MIX;
public BackendHbResponse() {
super(HeartbeatResponse.Type.BACKEND);
}
public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort,
long hbTime, long beStartTime, String version, String nodeRole) {
public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime,
String version, String nodeRole) {
super(HeartbeatResponse.Type.BACKEND);
this.beId = beId;
this.status = HbStatus.OK;
@ -98,23 +104,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
return nodeRole;
}
public static BackendHbResponse read(DataInput in) throws IOException {
BackendHbResponse result = new BackendHbResponse();
result.readFields(in);
return result;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeLong(beId);
out.writeInt(bePort);
out.writeInt(httpPort);
out.writeInt(brpcPort);
}
@Override
public void readFields(DataInput in) throws IOException {
protected void readFields(DataInput in) throws IOException {
super.readFields(in);
beId = in.readLong();
bePort = in.readInt();

View File

@ -22,6 +22,8 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
@ -33,6 +35,7 @@ import java.util.stream.Collectors;
* Selection policy for building BE nodes
*/
public class BeSelectionPolicy {
private static final Logger LOG = LogManager.getLogger(BeSelectionPolicy.class);
public String cluster = SystemInfoService.DEFAULT_CLUSTER;
public boolean needScheduleAvailable = false;
public boolean needQueryAvailable = false;
@ -125,6 +128,9 @@ public class BeSelectionPolicy {
private boolean isMatch(Backend backend) {
// Compute node is only used when preferComputeNode is set.
if (!preferComputeNode && backend.isComputeNode()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backend [{}] is not match by ComputeNode rule, policy: [{}]", backend.getIp(), this);
}
return false;
}
@ -132,14 +138,24 @@ public class BeSelectionPolicy {
|| needLoadAvailable && !backend.isLoadAvailable() || !resourceTags.isEmpty() && !resourceTags.contains(
backend.getLocationTag()) || storageMedium != null && !backend.hasSpecifiedStorageMedium(
storageMedium)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backend [{}] is not match by Other rules, policy: [{}]", backend.getIp(), this);
}
return false;
}
if (checkDiskUsage) {
if (storageMedium == null && backend.diskExceedLimit()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backend [{}] is not match by diskExceedLimit rule, policy: [{}]", backend.getIp(), this);
}
return false;
}
if (storageMedium != null && backend.diskExceedLimitByStorageMedium(storageMedium)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backend [{}] is not match by diskExceedLimitByStorageMedium rule, policy: [{}]",
backend.getIp(), this);
}
return false;
}
}
@ -186,7 +202,8 @@ public class BeSelectionPolicy {
@Override
public String toString() {
return String.format("cluster=%s | query=%s | load=%s | schedule=%s | tags=%s | medium=%s",
cluster, needLoadAvailable, needLoadAvailable, needScheduleAvailable, resourceTags, storageMedium);
return String.format("computeNode=%s | cluster=%s | query=%s | load=%s | schedule=%s | tags=%s | medium=%s",
preferComputeNode, cluster, needQueryAvailable, needLoadAvailable, needScheduleAvailable,
resourceTags.stream().map(tag -> tag.toString()).collect(Collectors.joining(",")), storageMedium);
}
}

View File

@ -20,8 +20,9 @@ package org.apache.doris.system;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*
@ -29,8 +30,11 @@ import java.io.IOException;
*/
public class BrokerHbResponse extends HeartbeatResponse implements Writable {
@SerializedName(value = "name")
private String name;
@SerializedName(value = "host")
private String host;
@SerializedName(value = "port")
private int port;
public BrokerHbResponse() {
@ -67,22 +71,8 @@ public class BrokerHbResponse extends HeartbeatResponse implements Writable {
return port;
}
public static BrokerHbResponse read(DataInput in) throws IOException {
BrokerHbResponse result = new BrokerHbResponse();
result.readFields(in);
return result;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, name);
Text.writeString(out, host);
out.writeInt(port);
}
@Override
public void readFields(DataInput in) throws IOException {
protected void readFields(DataInput in) throws IOException {
super.readFields(in);
name = Text.readString(in);
host = Text.readString(in);

View File

@ -20,8 +20,9 @@ package org.apache.doris.system;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
@ -29,10 +30,13 @@ import java.io.IOException;
* (http port is supposed to the same, so no need to be carried on heartbeat response)
*/
public class FrontendHbResponse extends HeartbeatResponse implements Writable {
@SerializedName(value = "name")
private String name;
@SerializedName(value = "queryPort")
private int queryPort;
@SerializedName(value = "rpcPort")
private int rpcPort;
@SerializedName(value = "replayedJournalId")
private long replayedJournalId;
private String version;
@ -79,21 +83,6 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
return version;
}
public static FrontendHbResponse read(DataInput in) throws IOException {
FrontendHbResponse result = new FrontendHbResponse();
result.readFields(in);
return result;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, name);
out.writeInt(queryPort);
out.writeInt(rpcPort);
out.writeLong(replayedJournalId);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);

View File

@ -17,8 +17,13 @@
package org.apache.doris.system;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
@ -38,10 +43,12 @@ public class HeartbeatResponse implements Writable {
OK, BAD
}
@SerializedName(value = "type")
protected Type type;
protected boolean isTypeRead = false;
@SerializedName(value = "status")
protected HbStatus status;
@Deprecated
protected boolean isTypeRead = false;
/**
* msg and hbTime are no need to be synchronized to other Frontends,
@ -75,36 +82,41 @@ public class HeartbeatResponse implements Writable {
}
public static HeartbeatResponse read(DataInput in) throws IOException {
HeartbeatResponse result = null;
Type type = Type.valueOf(Text.readString(in));
if (type == Type.FRONTEND) {
result = new FrontendHbResponse();
} else if (type == Type.BACKEND) {
result = new BackendHbResponse();
} else if (type == Type.BROKER) {
result = new BrokerHbResponse();
} else {
throw new IOException("Unknown job type: " + type.name());
}
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_121) {
HeartbeatResponse result = null;
Type type = Type.valueOf(Text.readString(in));
if (type == Type.FRONTEND) {
result = new FrontendHbResponse();
} else if (type == Type.BACKEND) {
result = new BackendHbResponse();
} else if (type == Type.BROKER) {
result = new BrokerHbResponse();
} else {
throw new IOException("Unknown job type: " + type.name());
}
result.setTypeRead(true);
result.readFields(in);
return result;
result.setTypeRead(true);
result.readFields(in);
return result;
} else {
return GsonUtils.GSON.fromJson(Text.readString(in), HeartbeatResponse.class);
}
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, type.name());
Text.writeString(out, status.name());
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public void readFields(DataInput in) throws IOException {
if (!isTypeRead) {
type = Type.valueOf(Text.readString(in));
isTypeRead = true;
@Deprecated
protected void readFields(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_121) {
if (!isTypeRead) {
type = Type.valueOf(Text.readString(in));
isTypeRead = true;
}
status = HbStatus.valueOf(Text.readString(in));
}
status = HbStatus.valueOf(Text.readString(in));
}
@Override

View File

@ -20,7 +20,8 @@ package org.apache.doris.system;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;
@ -28,43 +29,26 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class SystemInfoServiceTest {
@Mocked
private Env env;
@Mocked
private EditLog editLog;
private SystemInfoService infoService;
@Before
public void setUp() {
new Expectations() {
{
env.getEditLog();
minTimes = 0;
result = editLog;
editLog.logAddBackend((Backend) any);
minTimes = 0;
Env.getCurrentEnv();
minTimes = 0;
result = env;
}
};
infoService = new SystemInfoService();
}
@ -73,6 +57,46 @@ public class SystemInfoServiceTest {
infoService.addBackend(backend);
}
@Test
public void testBackendHbResponseSerialization() throws IOException {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
metaContext.setThreadLocalInfo();
System.out.println(Env.getCurrentEnvJournalVersion());
BackendHbResponse writeResponse = new BackendHbResponse(1L, 1234, 1234, 1234, 1234, 1234, "test",
Tag.VALUE_COMPUTATION);
// Write objects to file
File file1 = new File("./BackendHbResponseSerialization");
try {
file1.createNewFile();
try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(file1))) {
writeResponse.write(dos);
dos.flush();
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
// Read objects from file
try (DataInputStream dis = new DataInputStream(new FileInputStream(file1))) {
BackendHbResponse readResponse = (BackendHbResponse) HeartbeatResponse.read(dis);
// Before meta version 121, nodeRole will not be read, so readResponse is not equal to writeResponse
Assert.assertTrue(readResponse.toString().equals(writeResponse.toString()));
Assert.assertTrue(Tag.VALUE_COMPUTATION.equals(readResponse.getNodeRole()));
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
} finally {
file1.delete();
}
}
@Test
public void testSelectBackendIdsByPolicy() throws Exception {
// 1. no backend

View File

@ -30,6 +30,7 @@ export DORIS_HOME="${ROOT}"
echo "Build generated code"
cd "${DORIS_HOME}/gensrc"
rm -rf "${DORIS_HOME}/gensrc/build"
# DO NOT using parallel make(-j) for gensrc
make
rm -rf "${DORIS_HOME}/fe/fe-common/src/main/java/org/apache/doris/thrift ${DORIS_HOME}/fe/fe-common/src/main/java/org/apache/parquet"