diff --git a/fe/src/main/java/org/apache/doris/catalog/FsBroker.java b/fe/src/main/java/org/apache/doris/catalog/FsBroker.java index fbda405f27..12e69ecad7 100644 --- a/fe/src/main/java/org/apache/doris/catalog/FsBroker.java +++ b/fe/src/main/java/org/apache/doris/catalog/FsBroker.java @@ -17,23 +17,31 @@ package org.apache.doris.catalog; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.system.BrokerHbResponse; import org.apache.doris.system.HeartbeatResponse.HbStatus; +import com.google.gson.annotations.SerializedName; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FsBroker implements Writable, Comparable { + @SerializedName(value = "ip") public String ip; + @SerializedName(value = "port") public int port; // msg for ping result public String heartbeatErrMsg = ""; - public long lastUpdateTime; + public long lastUpdateTime = -1; + + @SerializedName(value = "lastStartTime") public long lastStartTime = -1; - + @SerializedName(value = "isAlive") public boolean isAlive; public FsBroker() { @@ -107,11 +115,11 @@ public class FsBroker implements Writable, Comparable { @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, ip); - out.writeInt(port); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } - public void readFields(DataInput in) throws IOException { + private void readFields(DataInput in) throws IOException { ip = Text.readString(in); port = in.readInt(); } @@ -122,9 +130,14 @@ public class FsBroker implements Writable, Comparable { } public static FsBroker readIn(DataInput in) throws IOException { - FsBroker broker = new FsBroker(); - broker.readFields(in); - return broker; + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_73) { + FsBroker broker = new FsBroker(); + broker.readFields(in); + return broker; + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, FsBroker.class); + } } } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 0931cd23b4..daddd351ca 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -155,6 +155,8 @@ public final class FeMetaVersion { public static final int VERSION_71 = 71; // in memory table public static final int VERSION_72 = 72; + // broker persist isAlive + public static final int VERSION_73 = 73; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_72; + public static final int VERSION_CURRENT = VERSION_73; } diff --git a/fe/src/test/java/org/apache/doris/persist/FsBrokerTest.java b/fe/src/test/java/org/apache/doris/persist/FsBrokerTest.java new file mode 100644 index 0000000000..0b0f02704f --- /dev/null +++ b/fe/src/test/java/org/apache/doris/persist/FsBrokerTest.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.catalog.FsBroker; +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.meta.MetaContext; +import org.apache.doris.system.BrokerHbResponse; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; + +public class FsBrokerTest { + + private static String fileName1 = "./FsBrokerTest1"; + private static String fileName2 = "./FsBrokerTest2"; + + @BeforeClass + public static void setup() { + MetaContext context = new MetaContext(); + context.setMetaVersion(FeMetaVersion.VERSION_73); + context.setThreadLocalInfo(); + } + + @AfterClass + public static void tear() { + new File(fileName1).delete(); + new File(fileName2).delete(); + } + + @Test + public void testHearbeatOk() throws Exception { + // 1. Write objects to file + File file = new File(fileName1); + file.createNewFile(); + DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); + + FsBroker fsBroker = new FsBroker("127.0.0.1", 8118); + long time = System.currentTimeMillis(); + BrokerHbResponse hbResponse = new BrokerHbResponse("broker", "127.0.0.1", 8118, time); + fsBroker.handleHbResponse(hbResponse); + fsBroker.write(dos); + dos.flush(); + dos.close(); + + // 2. Read objects from file + DataInputStream dis = new DataInputStream(new FileInputStream(file)); + + FsBroker readBroker = FsBroker.readIn(dis); + Assert.assertEquals(fsBroker.ip, readBroker.ip); + Assert.assertEquals(fsBroker.port, readBroker.port); + Assert.assertEquals(fsBroker.isAlive, readBroker.isAlive); + Assert.assertTrue(fsBroker.isAlive); + Assert.assertEquals(time, readBroker.lastStartTime); + Assert.assertEquals(-1, readBroker.lastUpdateTime); + dis.close(); + } + + @Test + public void testHeartbeatFailed() throws Exception { + // 1. Write objects to file + File file = new File(fileName2); + file.createNewFile(); + DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); + + FsBroker fsBroker = new FsBroker("127.0.0.1", 8118); + long time = System.currentTimeMillis(); + BrokerHbResponse hbResponse = new BrokerHbResponse("broker", "127.0.0.1", 8118, "got exception"); + fsBroker.handleHbResponse(hbResponse); + fsBroker.write(dos); + dos.flush(); + dos.close(); + + // 2. Read objects from file + DataInputStream dis = new DataInputStream(new FileInputStream(file)); + + FsBroker readBroker = FsBroker.readIn(dis); + Assert.assertEquals(fsBroker.ip, readBroker.ip); + Assert.assertEquals(fsBroker.port, readBroker.port); + Assert.assertEquals(fsBroker.isAlive, readBroker.isAlive); + Assert.assertFalse(fsBroker.isAlive); + Assert.assertEquals(-1, readBroker.lastStartTime); + Assert.assertEquals(-1, readBroker.lastUpdateTime); + dis.close(); + } +}