[Bug] Fix bug of TransactionState SerDe error (#3356)
The TransactionState's coordinator should be created when deserialized from old meta.
This commit is contained in:
@ -17,8 +17,6 @@
|
||||
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -32,9 +30,11 @@ import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -212,7 +212,7 @@ public class TransactionState implements Writable {
|
||||
this.transactionId = -1;
|
||||
this.label = "";
|
||||
this.idToTableCommitInfos = Maps.newHashMap();
|
||||
this.txnCoordinator = new TxnCoordinator();
|
||||
this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); // mocked, to avoid NPE
|
||||
this.transactionStatus = TransactionStatus.PREPARE;
|
||||
this.sourceType = LoadJobSourceType.FRONTEND;
|
||||
this.prepareTime = -1;
|
||||
@ -602,8 +602,22 @@ public class TransactionState implements Writable {
|
||||
String ip = Text.readString(in);
|
||||
txnCoordinator = new TxnCoordinator(sourceType, ip);
|
||||
} else {
|
||||
// to compatible old version
|
||||
Text.readString(in);
|
||||
// to compatible old version, the old txn coordinator looks like: "BE: 192.186.1.1"
|
||||
String coordStr = Text.readString(in);
|
||||
String[] parts = coordStr.split(":");
|
||||
if (parts.length != 2) {
|
||||
// should not happen, just create a mocked TxnCoordinator
|
||||
txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1");
|
||||
} else {
|
||||
if (parts[0].trim().equalsIgnoreCase("FE")) {
|
||||
txnCoordinator = new TxnCoordinator(TxnSourceType.FE, parts[1].trim());
|
||||
} else if (parts[0].trim().equalsIgnoreCase("BE")) {
|
||||
txnCoordinator = new TxnCoordinator(TxnSourceType.BE, parts[1].trim());
|
||||
} else {
|
||||
// unknown format, should not happen, just create a mocked TxnCoordinator
|
||||
txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1");
|
||||
}
|
||||
}
|
||||
}
|
||||
transactionStatus = TransactionStatus.valueOf(in.readInt());
|
||||
sourceType = LoadJobSourceType.valueOf(in.readInt());
|
||||
|
||||
@ -0,0 +1,81 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.meta.MetaContext;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
|
||||
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
public class TransactionStateTest {
|
||||
|
||||
private static String fileName = "./TransactionStateTest";
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
File file = new File(fileName);
|
||||
file.delete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerDe() throws IOException {
|
||||
MetaContext metaContext = new MetaContext();
|
||||
metaContext.setMetaVersion(FeMetaVersion.VERSION_83);
|
||||
metaContext.setThreadLocalInfo();
|
||||
|
||||
// 1. Write objects to file
|
||||
File file = new File(fileName);
|
||||
file.createNewFile();
|
||||
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
|
||||
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L),
|
||||
3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()),
|
||||
LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L,
|
||||
60 * 1000L);
|
||||
|
||||
transactionState.write(out);
|
||||
out.flush();
|
||||
out.close();
|
||||
|
||||
// 2. Read objects from file
|
||||
DataInputStream in = new DataInputStream(new FileInputStream(file));
|
||||
TransactionState readTransactionState = new TransactionState();
|
||||
readTransactionState.readFields(in);
|
||||
|
||||
Assert.assertEquals(transactionState.getCoordinator().ip, readTransactionState.getCoordinator().ip);
|
||||
in.close();
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user