From 46272a562143572f5dae48beb1425eecbb8fdbce Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 21 Apr 2020 08:24:10 +0800 Subject: [PATCH] [Bug] Fix bug of TransactionState SerDe error (#3356) The TransactionState's coordinator should be created when deserialized from old meta. --- .../doris/transaction/TransactionState.java | 24 ++++-- .../transaction/TransactionStateTest.java | 81 +++++++++++++++++++ 2 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 fe/src/test/java/org/apache/doris/transaction/TransactionStateTest.java diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 670f29a75a..4ddbfaf5ca 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -17,8 +17,6 @@ package org.apache.doris.transaction; -import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.Config; @@ -32,9 +30,11 @@ import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Joiner; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -212,7 +212,7 @@ public class TransactionState implements Writable { this.transactionId = -1; this.label = ""; this.idToTableCommitInfos = Maps.newHashMap(); - this.txnCoordinator = new TxnCoordinator(); + this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); // mocked, to avoid NPE this.transactionStatus = TransactionStatus.PREPARE; this.sourceType = LoadJobSourceType.FRONTEND; this.prepareTime = -1; @@ -602,8 +602,22 @@ public class TransactionState implements Writable { String ip = Text.readString(in); txnCoordinator = new TxnCoordinator(sourceType, ip); } else { - // to compatible old version - Text.readString(in); + // to compatible old version, the old txn coordinator looks like: "BE: 192.186.1.1" + String coordStr = Text.readString(in); + String[] parts = coordStr.split(":"); + if (parts.length != 2) { + // should not happen, just create a mocked TxnCoordinator + txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); + } else { + if (parts[0].trim().equalsIgnoreCase("FE")) { + txnCoordinator = new TxnCoordinator(TxnSourceType.FE, parts[1].trim()); + } else if (parts[0].trim().equalsIgnoreCase("BE")) { + txnCoordinator = new TxnCoordinator(TxnSourceType.BE, parts[1].trim()); + } else { + // unknown format, should not happen, just create a mocked TxnCoordinator + txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); + } + } } transactionStatus = TransactionStatus.valueOf(in.readInt()); sourceType = LoadJobSourceType.valueOf(in.readInt()); diff --git a/fe/src/test/java/org/apache/doris/transaction/TransactionStateTest.java b/fe/src/test/java/org/apache/doris/transaction/TransactionStateTest.java new file mode 100644 index 0000000000..93ad56539b --- /dev/null +++ b/fe/src/test/java/org/apache/doris/transaction/TransactionStateTest.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.meta.MetaContext; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; + +import com.google.common.collect.Lists; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.UUID; + +public class TransactionStateTest { + + private static String fileName = "./TransactionStateTest"; + + @After + public void tearDown() { + File file = new File(fileName); + file.delete(); + } + + @Test + public void testSerDe() throws IOException { + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_83); + metaContext.setThreadLocalInfo(); + + // 1. Write objects to file + File file = new File(fileName); + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); + + UUID uuid = UUID.randomUUID(); + TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L), + 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), + LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L, + 60 * 1000L); + + transactionState.write(out); + out.flush(); + out.close(); + + // 2. Read objects from file + DataInputStream in = new DataInputStream(new FileInputStream(file)); + TransactionState readTransactionState = new TransactionState(); + readTransactionState.readFields(in); + + Assert.assertEquals(transactionState.getCoordinator().ip, readTransactionState.getCoordinator().ip); + in.close(); + } + +}